Callback handler for storing generation data in OpenInference format.
OpenInference is an open standard for capturing and storing AI model
inferences. It enables production LLM app servers to seamlessly integrate
with LLM observability solutions such as Arize and Phoenix.
For more information on the specification, see
https://github.com/Arize-ai/open-inference-spec
Source code in llama-index-integrations/callbacks/llama-index-callbacks-openinference/llama_index/callbacks/openinference/base.py
class OpenInferenceCallbackHandler(BaseCallbackHandler):
    """Callback handler for storing generation data in OpenInference format.

    OpenInference is an open standard for capturing and storing AI model
    inferences. It enables production LLM app servers to seamlessly integrate
    with LLM observability solutions such as Arize and Phoenix.

    For more information on the specification, see
    https://github.com/Arize-ai/open-inference-spec
    """

    def __init__(
        self,
        callback: Optional[Callable[[List[QueryData], List[NodeData]], None]] = None,
    ) -> None:
        """Initializes the OpenInferenceCallbackHandler.

        Args:
            callback (Optional[Callable[[List[QueryData], List[NodeData]], None]],
                optional): A callback function that will be called when a query
                trace is completed, often used for logging or persisting query
                data.
        """
        super().__init__(event_starts_to_ignore=[], event_ends_to_ignore=[])
        # User-supplied hook fired whenever a query/chat trace completes.
        self._callback = callback
        # Scratch container for the trace currently in flight.
        self._trace_data = TraceData()
        # Completed trace data accumulates here until a flush_* call drains it.
        self._query_data_buffer: List[QueryData] = []
        self._node_data_buffer: List[NodeData] = []

    def start_trace(self, trace_id: Optional[str] = None) -> None:
        """Resets per-trace state and stamps the new query's timestamp and id."""
        if trace_id in ("query", "chat"):
            self._trace_data = TraceData()
            self._trace_data.query_data.timestamp = datetime.now().isoformat()
            self._trace_data.query_data.id = _generate_random_id()

    def end_trace(
        self,
        trace_id: Optional[str] = None,
        trace_map: Optional[Dict[str, List[str]]] = None,
    ) -> None:
        """Moves the finished trace into the buffers and invokes the callback."""
        if trace_id in ("query", "chat"):
            self._query_data_buffer.append(self._trace_data.query_data)
            self._node_data_buffer.extend(self._trace_data.node_datas)
            self._trace_data = TraceData()
            if self._callback is not None:
                self._callback(self._query_data_buffer, self._node_data_buffer)

    def on_event_start(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        parent_id: str = "",
        **kwargs: Any,
    ) -> str:
        """Captures query text and LLM inputs as the corresponding events begin."""
        if payload is not None:
            if event_type is CBEventType.QUERY:
                self._trace_data.query_data.query_text = payload[
                    EventPayload.QUERY_STR
                ]
            elif event_type is CBEventType.LLM:
                if prompt := payload.get(EventPayload.PROMPT, None):
                    self._trace_data.query_data.llm_prompt = prompt
                if messages := payload.get(EventPayload.MESSAGES, None):
                    self._trace_data.query_data.llm_messages = [
                        (m.role.value, m.content) for m in messages
                    ]
                    # For chat engines there is no query event and thus the
                    # query text will be None; in this case we set the query
                    # text to the last message passed to the LLM.
                    if self._trace_data.query_data.query_text is None:
                        self._trace_data.query_data.query_text = messages[-1].content
        return event_id

    def on_event_end(
        self,
        event_type: CBEventType,
        payload: Optional[Dict[str, Any]] = None,
        event_id: str = "",
        **kwargs: Any,
    ) -> None:
        """Records retrieved nodes, LLM output, and embeddings as events finish."""
        if payload is None:
            return
        if event_type is CBEventType.RETRIEVE:
            for node_with_score in payload[EventPayload.NODES]:
                node = node_with_score.node
                score = node_with_score.score
                self._trace_data.query_data.node_ids.append(node.hash)
                self._trace_data.query_data.scores.append(score)
                self._trace_data.node_datas.append(
                    NodeData(
                        id=node.hash,
                        node_text=node.text,
                    )
                )
        elif event_type is CBEventType.LLM:
            # Only the first LLM response of the trace is recorded.
            if self._trace_data.query_data.response_text is None:
                if response := payload.get(EventPayload.RESPONSE, None):
                    if isinstance(response, ChatResponse):
                        # If the response is of class ChatResponse the string
                        # representation has the format "<role>: <message>",
                        # but we want just the message.
                        response_text = response.message.content
                    else:
                        response_text = str(response)
                    self._trace_data.query_data.response_text = response_text
                elif completion := payload.get(EventPayload.COMPLETION, None):
                    self._trace_data.query_data.response_text = str(completion)
        elif event_type is CBEventType.EMBEDDING:
            self._trace_data.query_data.query_embedding = payload[
                EventPayload.EMBEDDINGS
            ][0]

    def flush_query_data_buffer(self) -> List[QueryData]:
        """Clears the query data buffer and returns the data.

        Returns:
            List[QueryData]: The query data.
        """
        drained, self._query_data_buffer = self._query_data_buffer, []
        return drained

    def flush_node_data_buffer(self) -> List[NodeData]:
        """Clears the node data buffer and returns the data.

        Returns:
            List[NodeData]: The node data.
        """
        drained, self._node_data_buffer = self._node_data_buffer, []
        return drained
Clears the query data buffer and returns the data.

Returns:
    List[QueryData]: The query data.
Source code in llama-index-integrations/callbacks/llama-index-callbacks-openinference/llama_index/callbacks/openinference/base.py
Lines 269–277
def flush_query_data_buffer(self) -> List[QueryData]:
    """Clears the query data buffer and returns the data.

    Returns:
        List[QueryData]: The query data.
    """
    # Swap the buffer out for a fresh list and hand the old one back.
    drained, self._query_data_buffer = self._query_data_buffer, []
    return drained
Source code in llama-index-integrations/callbacks/llama-index-callbacks-openinference/llama_index/callbacks/openinference/base.py
Lines 279–287
def flush_node_data_buffer(self) -> List[NodeData]:
    """Clears the node data buffer and returns the data.

    Returns:
        List[NodeData]: The node data.
    """
    # Swap the buffer out for a fresh list and hand the old one back.
    drained, self._node_data_buffer = self._node_data_buffer, []
    return drained