```python
class Accumulate(BaseSynthesizer):
    """Accumulate responses from multiple text chunks."""

    def __init__(
        self,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        prompt_helper: Optional[PromptHelper] = None,
        text_qa_template: Optional[BasePromptTemplate] = None,
        output_cls: Optional[Type[BaseModel]] = None,
        streaming: bool = False,
        use_async: bool = False,
    ) -> None:
        super().__init__(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            streaming=streaming,
        )
        self._text_qa_template = text_qa_template or DEFAULT_TEXT_QA_PROMPT_SEL
        self._use_async = use_async
        self._output_cls = output_cls

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {"text_qa_template": self._text_qa_template}

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "text_qa_template" in prompts:
            self._text_qa_template = prompts["text_qa_template"]

    def flatten_list(self, md_array: List[List[Any]]) -> List[Any]:
        return [item for sublist in md_array for item in sublist]

    def _format_response(self, outputs: List[Any], separator: str) -> str:
        responses: List[str] = []
        for response in outputs:
            responses.append(response or "Empty Response")

        return separator.join(
            [f"Response {index + 1}: {item}" for index, item in enumerate(responses)]
        )

    async def aget_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        separator: str = "\n---------------------\n",
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Apply the same prompt to text chunks and return async responses."""
        if self._streaming:
            raise ValueError("Unable to stream in Accumulate response mode")

        tasks = [
            self._give_responses(
                query_str, text_chunk, use_async=True, **response_kwargs
            )
            for text_chunk in text_chunks
        ]

        flattened_tasks = self.flatten_list(tasks)
        outputs = await asyncio.gather(*flattened_tasks)

        return self._format_response(outputs, separator)

    def get_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        separator: str = "\n---------------------\n",
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Apply the same prompt to text chunks and return responses."""
        if self._streaming:
            raise ValueError("Unable to stream in Accumulate response mode")

        tasks = [
            self._give_responses(
                query_str, text_chunk, use_async=self._use_async, **response_kwargs
            )
            for text_chunk in text_chunks
        ]

        outputs = self.flatten_list(tasks)

        if self._use_async:
            outputs = run_async_tasks(outputs)

        return self._format_response(outputs, separator)

    def _give_responses(
        self,
        query_str: str,
        text_chunk: str,
        use_async: bool = False,
        **response_kwargs: Any,
    ) -> List[Any]:
        """Give responses given a query and a corresponding text chunk."""
        text_qa_template = self._text_qa_template.partial_format(query_str=query_str)

        text_chunks = self._prompt_helper.repack(
            text_qa_template, [text_chunk], llm=self._llm
        )

        predictor: Callable
        if self._output_cls is None:
            predictor = self._llm.apredict if use_async else self._llm.predict

            return [
                predictor(
                    text_qa_template,
                    context_str=cur_text_chunk,
                    **response_kwargs,
                )
                for cur_text_chunk in text_chunks
            ]
        else:
            predictor = (
                self._llm.astructured_predict
                if use_async
                else self._llm.structured_predict
            )

            return [
                predictor(
                    self._output_cls,
                    text_qa_template,
                    context_str=cur_text_chunk,
                    **response_kwargs,
                )
                for cur_text_chunk in text_chunks
            ]
```
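For orientation, a minimal usage sketch. The `MockLLM` stand-in and the example chunks are assumptions for illustration; in practice you would pass your own LLM:

```python
# A minimal sketch: MockLLM is a stand-in; swap in a real LLM.
from llama_index.core.llms import MockLLM
from llama_index.core.response_synthesizers import Accumulate

synth = Accumulate(llm=MockLLM())

# Each chunk is answered independently; the answers are numbered and
# joined with the separator rather than merged into one response.
answer = synth.get_response(
    query_str="What color is the sky?",
    text_chunks=[
        "The sky is blue on clear days.",
        "At sunset the sky often turns orange.",
    ],
)
print(answer)
```

Unlike refine- or compact-style synthesizers, `Accumulate` never merges answers: the output is one numbered response per chunk, joined by the separator.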
aget_response

Apply the same prompt to text chunks and return async responses.
Source code in llama-index-core/llama_index/core/response_synthesizers/accumulate.py
```python
async def aget_response(
    self,
    query_str: str,
    text_chunks: Sequence[str],
    separator: str = "\n---------------------\n",
    **response_kwargs: Any,
) -> RESPONSE_TEXT_TYPE:
    """Apply the same prompt to text chunks and return async responses."""
    if self._streaming:
        raise ValueError("Unable to stream in Accumulate response mode")

    tasks = [
        self._give_responses(
            query_str, text_chunk, use_async=True, **response_kwargs
        )
        for text_chunk in text_chunks
    ]

    flattened_tasks = self.flatten_list(tasks)
    outputs = await asyncio.gather(*flattened_tasks)

    return self._format_response(outputs, separator)
```
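A sketch of calling the async path directly, again assuming a `MockLLM` stand-in:

```python
import asyncio

from llama_index.core.llms import MockLLM  # stand-in; swap in a real LLM
from llama_index.core.response_synthesizers import Accumulate


async def main() -> None:
    synth = Accumulate(llm=MockLLM())
    # All per-chunk LLM calls are gathered concurrently with asyncio.gather.
    answer = await synth.aget_response(
        query_str="What color is the sky?",
        text_chunks=["The sky is blue.", "Sunsets are orange."],
    )
    print(answer)


asyncio.run(main())
```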
get_response

Apply the same prompt to text chunks and return responses.

Source code in llama-index-core/llama_index/core/response_synthesizers/accumulate.py

```python
def get_response(
    self,
    query_str: str,
    text_chunks: Sequence[str],
    separator: str = "\n---------------------\n",
    **response_kwargs: Any,
) -> RESPONSE_TEXT_TYPE:
    """Apply the same prompt to text chunks and return responses."""
    if self._streaming:
        raise ValueError("Unable to stream in Accumulate response mode")

    tasks = [
        self._give_responses(
            query_str, text_chunk, use_async=self._use_async, **response_kwargs
        )
        for text_chunk in text_chunks
    ]

    outputs = self.flatten_list(tasks)

    if self._use_async:
        outputs = run_async_tasks(outputs)

    return self._format_response(outputs, separator)
```
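For callers that must stay synchronous, the `use_async=True` constructor flag buys the same concurrency: `get_response` collects one coroutine per chunk and drains them with `run_async_tasks`. A sketch under the same stand-in assumptions:

```python
from llama_index.core.llms import MockLLM  # stand-in; swap in a real LLM
from llama_index.core.response_synthesizers import Accumulate

# With use_async=True, get_response schedules one LLM call per chunk and
# runs them concurrently via run_async_tasks; the call site stays sync.
synth = Accumulate(llm=MockLLM(), use_async=True)
answer = synth.get_response(
    query_str="What color is the sky?",
    text_chunks=["The sky is blue.", "Sunsets are orange."],
)
print(answer)
```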