class Refine(BaseSynthesizer):
    """Refine a response to a query across text chunks.

    Answers the query with the first chunk, then iteratively refines that
    answer against each subsequent chunk.  Supports streaming responses,
    structured (Pydantic) outputs, and optional structured-answer filtering
    via a pluggable program factory.
    """

    def __init__(
        self,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        prompt_helper: Optional[PromptHelper] = None,
        text_qa_template: Optional[BasePromptTemplate] = None,
        refine_template: Optional[BasePromptTemplate] = None,
        output_cls: Optional[Type[BaseModel]] = None,
        streaming: bool = False,
        verbose: bool = False,
        structured_answer_filtering: bool = False,
        program_factory: Optional[
            Callable[[BasePromptTemplate], BasePydanticProgram]
        ] = None,
    ) -> None:
        """Init params.

        Args:
            llm: LLM used for both initial answers and refinement.
            callback_manager: callback manager forwarded to the base class.
            prompt_helper: helper used to repack chunks to fit prompts.
            text_qa_template: prompt for the initial answer; defaults to
                ``DEFAULT_TEXT_QA_PROMPT_SEL``.
            refine_template: prompt for refinement; defaults to
                ``DEFAULT_REFINE_PROMPT_SEL``.
            output_cls: optional Pydantic class to parse final answers into.
            streaming: stream the final response instead of returning a str.
            verbose: print refine context to stdout.
            structured_answer_filtering: use a structured program to decide
                whether a chunk actually answers the query.
            program_factory: custom factory building the per-prompt program;
                only valid together with ``structured_answer_filtering``.

        Raises:
            ValueError: if streaming is combined with structured answer
                filtering, or a program factory is given without it.
        """
        super().__init__(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            streaming=streaming,
        )
        self._text_qa_template = text_qa_template or DEFAULT_TEXT_QA_PROMPT_SEL
        self._refine_template = refine_template or DEFAULT_REFINE_PROMPT_SEL
        self._verbose = verbose
        self._structured_answer_filtering = structured_answer_filtering
        self._output_cls = output_cls

        if self._streaming and self._structured_answer_filtering:
            raise ValueError(
                "Streaming not supported with structured answer filtering."
            )

        if not self._structured_answer_filtering and program_factory is not None:
            raise ValueError(
                "Program factory not supported without structured answer filtering."
            )
        self._program_factory = program_factory or self._default_program_factory

    def _get_prompts(self) -> PromptDictType:
        """Get prompts."""
        return {
            "text_qa_template": self._text_qa_template,
            "refine_template": self._refine_template,
        }

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Update prompts."""
        if "text_qa_template" in prompts:
            self._text_qa_template = prompts["text_qa_template"]
        if "refine_template" in prompts:
            self._refine_template = prompts["refine_template"]

    @dispatcher.span
    def get_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        prev_response: Optional[RESPONSE_TEXT_TYPE] = None,
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Give response over chunks."""
        dispatcher.event(
            GetResponseStartEvent(query_str=query_str, text_chunks=text_chunks)
        )
        response: Optional[RESPONSE_TEXT_TYPE] = None
        for text_chunk in text_chunks:
            if prev_response is None:
                # if this is the first chunk, and text chunk already
                # is an answer, then return it
                response = self._give_response_single(
                    query_str, text_chunk, **response_kwargs
                )
            else:
                # refine response if possible
                response = self._refine_response_single(
                    prev_response, query_str, text_chunk, **response_kwargs
                )
            prev_response = response
        # FIX: guard empty `text_chunks` (parity with `aget_response`);
        # previously a None response fell through to the Generator cast.
        if response is None:
            response = "Empty Response"
        if isinstance(response, str):
            if self._output_cls is not None:
                try:
                    response = self._output_cls.model_validate_json(response)
                except ValidationError:
                    # best-effort parse: keep the raw string if it is not
                    # valid JSON for the output class
                    pass
            else:
                response = response or "Empty Response"
        else:
            response = cast(Generator, response)
        dispatcher.event(GetResponseEndEvent())
        return response

    @dispatcher.span
    async def aget_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        prev_response: Optional[RESPONSE_TEXT_TYPE] = None,
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Async version of ``get_response``: give response over chunks."""
        dispatcher.event(
            GetResponseStartEvent(query_str=query_str, text_chunks=text_chunks)
        )
        response: Optional[RESPONSE_TEXT_TYPE] = None
        for text_chunk in text_chunks:
            if prev_response is None:
                # if this is the first chunk, and text chunk already
                # is an answer, then return it
                response = await self._agive_response_single(
                    query_str, text_chunk, **response_kwargs
                )
            else:
                response = await self._arefine_response_single(
                    prev_response, query_str, text_chunk, **response_kwargs
                )
            prev_response = response
        if response is None:
            response = "Empty Response"
        if isinstance(response, str):
            if self._output_cls is not None:
                # FIX: tolerate malformed JSON like the sync path does,
                # instead of letting ValidationError escape to the caller.
                try:
                    response = self._output_cls.model_validate_json(response)
                except ValidationError:
                    pass
            else:
                response = response or "Empty Response"
        else:
            response = cast(AsyncGenerator, response)
        dispatcher.event(GetResponseEndEvent())
        return response

    def _default_program_factory(
        self, prompt: BasePromptTemplate
    ) -> BasePydanticProgram:
        """Build the default program for a prompt.

        Uses a structured-filtering program when enabled, otherwise a plain
        refine program that forwards ``output_cls``.
        """
        if self._structured_answer_filtering:
            from llama_index.core.program.utils import get_program_for_llm

            return get_program_for_llm(
                StructuredRefineResponse,
                prompt,
                self._llm,
                verbose=self._verbose,
            )
        else:
            return DefaultRefineProgram(
                prompt=prompt,
                llm=self._llm,
                output_cls=self._output_cls,
            )

    def _give_response_single(
        self,
        query_str: str,
        text_chunk: str,
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Give response given a query and a corresponding text chunk."""
        text_qa_template = self._text_qa_template.partial_format(query_str=query_str)
        text_chunks = self._prompt_helper.repack(
            text_qa_template, [text_chunk], llm=self._llm
        )

        response: Optional[RESPONSE_TEXT_TYPE] = None
        program = self._program_factory(text_qa_template)
        # TODO: consolidate with loop in get_response_default
        for cur_text_chunk in text_chunks:
            query_satisfied = False
            if response is None and not self._streaming:
                try:
                    structured_response = cast(
                        StructuredRefineResponse,
                        program(
                            context_str=cur_text_chunk,
                            **response_kwargs,
                        ),
                    )
                    query_satisfied = structured_response.query_satisfied
                    if query_satisfied:
                        response = structured_response.answer
                except ValidationError as e:
                    logger.warning(
                        f"Validation error on structured response: {e}",
                        exc_info=True,
                    )
            elif response is None and self._streaming:
                response = self._llm.stream(
                    text_qa_template,
                    context_str=cur_text_chunk,
                    **response_kwargs,
                )
                query_satisfied = True
            else:
                # a previous (repacked) chunk already produced an answer;
                # refine it against the current chunk
                response = self._refine_response_single(
                    cast(RESPONSE_TEXT_TYPE, response),
                    query_str,
                    cur_text_chunk,
                    **response_kwargs,
                )
        if response is None:
            response = "Empty Response"
        if isinstance(response, str):
            response = response or "Empty Response"
        else:
            response = cast(Generator, response)
        return response

    def _refine_response_single(
        self,
        response: RESPONSE_TEXT_TYPE,
        query_str: str,
        text_chunk: str,
        **response_kwargs: Any,
    ) -> Optional[RESPONSE_TEXT_TYPE]:
        """Refine an existing response using a new text chunk."""
        # TODO: consolidate with logic in response/schema.py
        if isinstance(response, Generator):
            response = get_response_text(response)

        fmt_text_chunk = truncate_text(text_chunk, 50)
        logger.debug(f"> Refine context: {fmt_text_chunk}")
        if self._verbose:
            print(f"> Refine context: {fmt_text_chunk}")
        # NOTE: partial format refine template with query_str and existing_answer here
        refine_template = self._refine_template.partial_format(
            query_str=query_str, existing_answer=response
        )

        # compute available chunk size to see if there is any available space
        # determine if the refine template is too big (which can happen if
        # prompt template + query + existing answer is too large)
        avail_chunk_size = self._prompt_helper._get_available_chunk_size(
            refine_template
        )
        if avail_chunk_size < 0:
            # if the available chunk size is negative, then the refine template
            # is too big and we just return the original response
            return response

        # obtain text chunks to add to the refine template
        text_chunks = self._prompt_helper.repack(
            refine_template, text_chunks=[text_chunk], llm=self._llm
        )

        program = self._program_factory(refine_template)
        for cur_text_chunk in text_chunks:
            query_satisfied = False
            if not self._streaming:
                try:
                    structured_response = cast(
                        StructuredRefineResponse,
                        program(
                            context_msg=cur_text_chunk,
                            **response_kwargs,
                        ),
                    )
                    query_satisfied = structured_response.query_satisfied
                    if query_satisfied:
                        response = structured_response.answer
                except ValidationError as e:
                    logger.warning(
                        f"Validation error on structured response: {e}",
                        exc_info=True,
                    )
            else:
                # TODO: structured response not supported for streaming
                if isinstance(response, Generator):
                    response = "".join(response)

                refine_template = self._refine_template.partial_format(
                    query_str=query_str, existing_answer=response
                )

                response = self._llm.stream(
                    refine_template,
                    context_msg=cur_text_chunk,
                    **response_kwargs,
                )
        return response

    async def _arefine_response_single(
        self,
        response: RESPONSE_TEXT_TYPE,
        query_str: str,
        text_chunk: str,
        **response_kwargs: Any,
    ) -> Optional[RESPONSE_TEXT_TYPE]:
        """Async version of ``_refine_response_single``."""
        # TODO: consolidate with logic in response/schema.py
        if isinstance(response, AsyncGenerator):
            response = await aget_response_text(response)

        fmt_text_chunk = truncate_text(text_chunk, 50)
        logger.debug(f"> Refine context: {fmt_text_chunk}")
        # NOTE: partial format refine template with query_str and existing_answer here
        refine_template = self._refine_template.partial_format(
            query_str=query_str, existing_answer=response
        )

        # compute available chunk size to see if there is any available space
        # determine if the refine template is too big (which can happen if
        # prompt template + query + existing answer is too large)
        avail_chunk_size = self._prompt_helper._get_available_chunk_size(
            refine_template
        )
        if avail_chunk_size < 0:
            # if the available chunk size is negative, then the refine template
            # is too big and we just return the original response
            return response

        # obtain text chunks to add to the refine template
        text_chunks = self._prompt_helper.repack(
            refine_template, text_chunks=[text_chunk], llm=self._llm
        )

        program = self._program_factory(refine_template)
        for cur_text_chunk in text_chunks:
            query_satisfied = False
            if not self._streaming:
                try:
                    structured_response = await program.acall(
                        context_msg=cur_text_chunk,
                        **response_kwargs,
                    )
                    structured_response = cast(
                        StructuredRefineResponse, structured_response
                    )
                    query_satisfied = structured_response.query_satisfied
                    if query_satisfied:
                        response = structured_response.answer
                except ValidationError as e:
                    logger.warning(
                        f"Validation error on structured response: {e}",
                        exc_info=True,
                    )
            else:
                # TODO: structured response not supported for streaming
                if isinstance(response, Generator):
                    response = "".join(response)
                if isinstance(response, AsyncGenerator):
                    # drain an async stream into a plain string before refining
                    _r = ""
                    async for text in response:
                        _r += text
                    response = _r

                refine_template = self._refine_template.partial_format(
                    query_str=query_str, existing_answer=response
                )

                response = await self._llm.astream(
                    refine_template,
                    context_msg=cur_text_chunk,
                    **response_kwargs,
                )
            if query_satisfied:
                refine_template = self._refine_template.partial_format(
                    query_str=query_str, existing_answer=response
                )
        return response

    async def _agive_response_single(
        self,
        query_str: str,
        text_chunk: str,
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """Give response given a query and a corresponding text chunk."""
        text_qa_template = self._text_qa_template.partial_format(query_str=query_str)
        text_chunks = self._prompt_helper.repack(
            text_qa_template, [text_chunk], llm=self._llm
        )

        response: Optional[RESPONSE_TEXT_TYPE] = None
        program = self._program_factory(text_qa_template)
        # TODO: consolidate with loop in get_response_default
        for cur_text_chunk in text_chunks:
            if response is None and not self._streaming:
                try:
                    structured_response = await program.acall(
                        context_str=cur_text_chunk,
                        **response_kwargs,
                    )
                    structured_response = cast(
                        StructuredRefineResponse, structured_response
                    )
                    query_satisfied = structured_response.query_satisfied
                    if query_satisfied:
                        response = structured_response.answer
                except ValidationError as e:
                    logger.warning(
                        f"Validation error on structured response: {e}",
                        exc_info=True,
                    )
            elif response is None and self._streaming:
                response = await self._llm.astream(
                    text_qa_template,
                    context_str=cur_text_chunk,
                    **response_kwargs,
                )
                query_satisfied = True
            else:
                response = await self._arefine_response_single(
                    cast(RESPONSE_TEXT_TYPE, response),
                    query_str,
                    cur_text_chunk,
                    **response_kwargs,
                )
        if response is None:
            response = "Empty Response"
        if isinstance(response, str):
            response = response or "Empty Response"
        else:
            response = cast(AsyncGenerator, response)
        return response