```python
import os
import openai

os.environ["OPENAI_API_KEY"] = "sk-..."
openai.api_key = os.environ["OPENAI_API_KEY"]

from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

stream = llm.stream("Hi, write a short story")
for r in stream:
    print(r.delta, end="")
```
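The example above uses the completion-style `stream` helper. A minimal sketch of the chat interface, assuming the same `OpenAI` class and `ChatMessage` from `llama_index.core.llms` (the prompt text is illustrative):

```python
from llama_index.core.llms import ChatMessage
from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-3.5-turbo")

messages = [
    ChatMessage(role="system", content="You are a terse assistant."),
    ChatMessage(role="user", content="Name one use of the tiktoken package."),
]

# Blocking chat call; routes to the chat completions endpoint for chat models.
response = llm.chat(messages)
print(response.message.content)

# Streaming variant yields ChatResponse objects with incremental deltas.
for chunk in llm.stream_chat(messages):
    print(chunk.delta, end="")
```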
Source code in llama-index-integrations/llms/llama-index-llms-openai/llama_index/llms/openai/base.py
````python
class OpenAI(FunctionCallingLLM):
    """
    OpenAI LLM.

    Args:
        model: name of the OpenAI model to use.
        temperature: a float from 0 to 1 controlling randomness in generation;
            higher will lead to more creative, less deterministic responses.
        max_tokens: the maximum number of tokens to generate.
        additional_kwargs: Add additional parameters to OpenAI request body.
        max_retries: How many times to retry the API call if it fails.
        timeout: How long to wait, in seconds, for an API call before failing.
        reuse_client: Reuse the OpenAI client between requests. When doing anything with large
            volumes of async API calls, setting this to false can improve stability.
        api_key: Your OpenAI api key
        api_base: The base URL of the API to call
        api_version: the version of the API to call
        callback_manager: the callback manager is used for observability.
        default_headers: override the default headers for API requests.
        http_client: pass in your own httpx.Client instance.
        async_http_client: pass in your own httpx.AsyncClient instance.

    Examples:
        `pip install llama-index-llms-openai`

        ```python
        import os
        import openai

        os.environ["OPENAI_API_KEY"] = "sk-..."
        openai.api_key = os.environ["OPENAI_API_KEY"]

        from llama_index.llms.openai import OpenAI

        llm = OpenAI(model="gpt-3.5-turbo")

        stream = llm.stream("Hi, write a short story")
        for r in stream:
            print(r.delta, end="")
        ```
    """

    model: str = Field(
        default=DEFAULT_OPENAI_MODEL, description="The OpenAI model to use."
    )
    temperature: float = Field(
        default=DEFAULT_TEMPERATURE,
        description="The temperature to use during generation.",
        ge=0.0,
        le=2.0,
    )
    max_tokens: Optional[int] = Field(
        description="The maximum number of tokens to generate.",
        gt=0,
    )
    logprobs: Optional[bool] = Field(
        description="Whether to return logprobs per token.",
        default=None,
    )
    top_logprobs: int = Field(
        description="The number of top token log probs to return.",
        default=0,
        ge=0,
        le=20,
    )
    additional_kwargs: Dict[str, Any] = Field(
        default_factory=dict, description="Additional kwargs for the OpenAI API."
    )
    max_retries: int = Field(
        default=3,
        description="The maximum number of API retries.",
        ge=0,
    )
    timeout: float = Field(
        default=60.0,
        description="The timeout, in seconds, for API requests.",
        ge=0,
    )
    default_headers: Optional[Dict[str, str]] = Field(
        default=None, description="The default headers for API requests."
    )
    reuse_client: bool = Field(
        default=True,
        description=(
            "Reuse the OpenAI client between requests. When doing anything with large "
            "volumes of async API calls, setting this to false can improve stability."
        ),
    )
    api_key: str = Field(default=None, description="The OpenAI API key.")
    api_base: str = Field(description="The base URL for OpenAI API.")
    api_version: str = Field(description="The API version for OpenAI API.")
    strict: bool = Field(
        default=False,
        description="Whether to use strict mode for invoking tools/using schemas.",
    )
    reasoning_effort: Optional[Literal["low", "medium", "high"]] = Field(
        default=None,
        description="The effort to use for reasoning models.",
    )
    modalities: Optional[List[str]] = Field(
        default=None,
        description="The output modalities to use for the model.",
    )
    audio_config: Optional[Dict[str, Any]] = Field(
        default=None,
        description="The audio configuration to use for the model.",
    )

    _client: Optional[SyncOpenAI] = PrivateAttr()
    _aclient: Optional[AsyncOpenAI] = PrivateAttr()
    _http_client: Optional[httpx.Client] = PrivateAttr()
    _async_http_client: Optional[httpx.AsyncClient] = PrivateAttr()

    def __init__(
        self,
        model: str = DEFAULT_OPENAI_MODEL,
        temperature: float = DEFAULT_TEMPERATURE,
        max_tokens: Optional[int] = None,
        additional_kwargs: Optional[Dict[str, Any]] = None,
        max_retries: int = 3,
        timeout: float = 60.0,
        reuse_client: bool = True,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        api_version: Optional[str] = None,
        callback_manager: Optional[CallbackManager] = None,
        default_headers: Optional[Dict[str, str]] = None,
        http_client: Optional[httpx.Client] = None,
        async_http_client: Optional[httpx.AsyncClient] = None,
        openai_client: Optional[SyncOpenAI] = None,
        async_openai_client: Optional[AsyncOpenAI] = None,
        # base class
        system_prompt: Optional[str] = None,
        messages_to_prompt: Optional[Callable[[Sequence[ChatMessage]], str]] = None,
        completion_to_prompt: Optional[Callable[[str], str]] = None,
        pydantic_program_mode: PydanticProgramMode = PydanticProgramMode.DEFAULT,
        output_parser: Optional[BaseOutputParser] = None,
        strict: bool = False,
        reasoning_effort: Optional[Literal["low", "medium", "high"]] = None,
        modalities: Optional[List[str]] = None,
        audio_config: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        # TODO: Support deprecated max_new_tokens
        if "max_new_tokens" in kwargs:
            max_tokens = kwargs["max_new_tokens"]
            del kwargs["max_new_tokens"]

        additional_kwargs = additional_kwargs or {}

        api_key, api_base, api_version = resolve_openai_credentials(
            api_key=api_key,
            api_base=api_base,
            api_version=api_version,
        )

        # TODO: Temp forced to 1.0 for o1
        if model in O1_MODELS:
            temperature = 1.0

        super().__init__(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
            additional_kwargs=additional_kwargs,
            max_retries=max_retries,
            callback_manager=callback_manager,
            api_key=api_key,
            api_version=api_version,
            api_base=api_base,
            timeout=timeout,
            reuse_client=reuse_client,
            default_headers=default_headers,
            system_prompt=system_prompt,
            messages_to_prompt=messages_to_prompt,
            completion_to_prompt=completion_to_prompt,
            pydantic_program_mode=pydantic_program_mode,
            output_parser=output_parser,
            strict=strict,
            reasoning_effort=reasoning_effort,
            modalities=modalities,
            audio_config=audio_config,
            **kwargs,
        )

        self._client = openai_client
        self._aclient = async_openai_client
        self._http_client = http_client
        self._async_http_client = async_http_client

    def _get_client(self) -> SyncOpenAI:
        if not self.reuse_client:
            return SyncOpenAI(**self._get_credential_kwargs())

        if self._client is None:
            self._client = SyncOpenAI(**self._get_credential_kwargs())
        return self._client

    def _get_aclient(self) -> AsyncOpenAI:
        if not self.reuse_client:
            return AsyncOpenAI(**self._get_credential_kwargs(is_async=True))

        if self._aclient is None:
            self._aclient = AsyncOpenAI(**self._get_credential_kwargs(is_async=True))
        return self._aclient

    def _get_model_name(self) -> str:
        model_name = self.model
        if "ft-" in model_name:  # legacy fine-tuning
            model_name = model_name.split(":")[0]
        elif model_name.startswith("ft:"):
            model_name = model_name.split(":")[1]
        return model_name

    def _is_azure_client(self) -> bool:
        return isinstance(self._get_client(), AzureOpenAI)

    @classmethod
    def class_name(cls) -> str:
        return "openai_llm"

    @property
    def _tokenizer(self) -> Optional[Tokenizer]:
        """
        Get a tokenizer for this model, or None if a tokenizing method is unknown.

        OpenAI can do this using the tiktoken package, subclasses may not have
        this convenience.
        """
        return tiktoken.encoding_for_model(self._get_model_name())

    @property
    def metadata(self) -> LLMMetadata:
        return LLMMetadata(
            context_window=openai_modelname_to_contextsize(self._get_model_name()),
            num_output=self.max_tokens or -1,
            is_chat_model=is_chat_model(model=self._get_model_name()),
            is_function_calling_model=is_function_calling_model(
                model=self._get_model_name()
            ),
            model_name=self.model,
            # TODO: Temp for O1 beta
            system_role=MessageRole.USER
            if self.model in O1_MODELS
            else MessageRole.SYSTEM,
        )

    @llm_chat_callback()
    def chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        if self._use_chat_completions(kwargs):
            chat_fn = self._chat
        else:
            chat_fn = completion_to_chat_decorator(self._complete)
        return chat_fn(messages, **kwargs)

    @llm_chat_callback()
    def stream_chat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponseGen:
        if self._use_chat_completions(kwargs):
            stream_chat_fn = self._stream_chat
        else:
            stream_chat_fn = stream_completion_to_chat_decorator(self._stream_complete)
        return stream_chat_fn(messages, **kwargs)

    @llm_completion_callback()
    def complete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        if self.modalities and "audio" in self.modalities:
            raise ValueError(
                "Audio is not supported for completion. Use chat/achat instead."
            )

        if self._use_chat_completions(kwargs):
            complete_fn = chat_to_completion_decorator(self._chat)
        else:
            complete_fn = self._complete
        return complete_fn(prompt, **kwargs)

    @llm_completion_callback()
    def stream_complete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponseGen:
        if self._use_chat_completions(kwargs):
            stream_complete_fn = stream_chat_to_completion_decorator(self._stream_chat)
        else:
            stream_complete_fn = self._stream_complete
        return stream_complete_fn(prompt, **kwargs)

    def _use_chat_completions(self, kwargs: Dict[str, Any]) -> bool:
        if "use_chat_completions" in kwargs:
            return kwargs["use_chat_completions"]
        return self.metadata.is_chat_model

    def _get_credential_kwargs(self, is_async: bool = False) -> Dict[str, Any]:
        return {
            "api_key": self.api_key,
            "base_url": self.api_base,
            "max_retries": self.max_retries,
            "timeout": self.timeout,
            "default_headers": self.default_headers,
            "http_client": self._async_http_client if is_async else self._http_client,
        }

    def _get_model_kwargs(self, **kwargs: Any) -> Dict[str, Any]:
        base_kwargs = {"model": self.model, "temperature": self.temperature, **kwargs}
        if self.max_tokens is not None:
            # If max_tokens is None, don't include in the payload:
            # https://platform.openai.com/docs/api-reference/chat
            # https://platform.openai.com/docs/api-reference/completions
            base_kwargs["max_tokens"] = self.max_tokens

        if self.logprobs is not None and self.logprobs is True:
            if self.metadata.is_chat_model:
                base_kwargs["logprobs"] = self.logprobs
                base_kwargs["top_logprobs"] = self.top_logprobs
            else:
                base_kwargs["logprobs"] = self.top_logprobs  # int in this case

        # can't send stream_options to the API when not streaming
        all_kwargs = {**base_kwargs, **self.additional_kwargs}
        if "stream" not in all_kwargs and "stream_options" in all_kwargs:
            del all_kwargs["stream_options"]

        if self.model in O1_MODELS and base_kwargs.get("max_tokens") is not None:
            # O1 models use max_completion_tokens instead of max_tokens
            all_kwargs["max_completion_tokens"] = all_kwargs.get(
                "max_completion_tokens", all_kwargs["max_tokens"]
            )
            all_kwargs.pop("max_tokens", None)

        if self.model in O1_MODELS and self.reasoning_effort is not None:
            # O1 models support reasoning_effort of low, medium, high
            all_kwargs["reasoning_effort"] = self.reasoning_effort

        if self.modalities is not None:
            all_kwargs["modalities"] = self.modalities

        if self.audio_config is not None:
            all_kwargs["audio"] = self.audio_config

        return all_kwargs

    @llm_retry_decorator
    def _chat(self, messages: Sequence[ChatMessage], **kwargs: Any) -> ChatResponse:
        client = self._get_client()
        message_dicts = to_openai_message_dicts(
            messages,
            model=self.model,
        )

        if self.reuse_client:
            response = client.chat.completions.create(
                messages=message_dicts,
                stream=False,
                **self._get_model_kwargs(**kwargs),
            )
        else:
            with client:
                response = client.chat.completions.create(
                    messages=message_dicts,
                    stream=False,
                    **self._get_model_kwargs(**kwargs),
                )

        openai_message = response.choices[0].message
        message = from_openai_message(
            openai_message, modalities=self.modalities or ["text"]
        )
        openai_token_logprobs = response.choices[0].logprobs
        logprobs = None
        if openai_token_logprobs and openai_token_logprobs.content:
            logprobs = from_openai_token_logprobs(openai_token_logprobs.content)

        return ChatResponse(
            message=message,
            raw=response,
            logprobs=logprobs,
            additional_kwargs=self._get_response_token_counts(response),
        )

    @llm_retry_decorator
    def _stream_chat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponseGen:
        if self.modalities and "audio" in self.modalities:
            raise ValueError("Audio is not supported for chat streaming")

        client = self._get_client()
        message_dicts = to_openai_message_dicts(
            messages,
            model=self.model,
        )

        def gen() -> ChatResponseGen:
            content = ""
            tool_calls: List[ChoiceDeltaToolCall] = []

            is_function = False
            for response in client.chat.completions.create(
                messages=message_dicts,
                **self._get_model_kwargs(stream=True, **kwargs),
            ):
                response = cast(ChatCompletionChunk, response)
                if len(response.choices) > 0:
                    delta = response.choices[0].delta
                else:
                    if self._is_azure_client():
                        continue
                    else:
                        delta = ChoiceDelta()

                if delta is None:
                    continue

                # check if this chunk is the start of a function call
                if delta.tool_calls:
                    is_function = True

                # update using deltas
                role = delta.role or MessageRole.ASSISTANT
                content_delta = delta.content or ""
                content += content_delta

                additional_kwargs = {}
                if is_function:
                    tool_calls = update_tool_calls(tool_calls, delta.tool_calls)
                    if tool_calls:
                        additional_kwargs["tool_calls"] = tool_calls

                yield ChatResponse(
                    message=ChatMessage(
                        role=role,
                        content=content,
                        additional_kwargs=additional_kwargs,
                    ),
                    delta=content_delta,
                    raw=response,
                    additional_kwargs=self._get_response_token_counts(response),
                )

        return gen()

    @llm_retry_decorator
    def _complete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        client = self._get_client()
        all_kwargs = self._get_model_kwargs(**kwargs)
        self._update_max_tokens(all_kwargs, prompt)

        if self.reuse_client:
            response = client.completions.create(
                prompt=prompt,
                stream=False,
                **all_kwargs,
            )
        else:
            with client:
                response = client.completions.create(
                    prompt=prompt,
                    stream=False,
                    **all_kwargs,
                )
        text = response.choices[0].text

        openai_completion_logprobs = response.choices[0].logprobs
        logprobs = None
        if openai_completion_logprobs:
            logprobs = from_openai_completion_logprobs(openai_completion_logprobs)

        return CompletionResponse(
            text=text,
            raw=response,
            logprobs=logprobs,
            additional_kwargs=self._get_response_token_counts(response),
        )

    @llm_retry_decorator
    def _stream_complete(self, prompt: str, **kwargs: Any) -> CompletionResponseGen:
        client = self._get_client()
        all_kwargs = self._get_model_kwargs(stream=True, **kwargs)
        self._update_max_tokens(all_kwargs, prompt)

        def gen() -> CompletionResponseGen:
            text = ""
            for response in client.completions.create(
                prompt=prompt,
                **all_kwargs,
            ):
                if len(response.choices) > 0:
                    delta = response.choices[0].text
                    if delta is None:
                        delta = ""
                else:
                    delta = ""
                text += delta
                yield CompletionResponse(
                    delta=delta,
                    text=text,
                    raw=response,
                    additional_kwargs=self._get_response_token_counts(response),
                )

        return gen()

    def _update_max_tokens(self, all_kwargs: Dict[str, Any], prompt: str) -> None:
        """Infer max_tokens for the payload, if possible."""
        if self.max_tokens is not None or self._tokenizer is None:
            return
        # NOTE: non-chat completion endpoint requires max_tokens to be set
        num_tokens = len(self._tokenizer.encode(prompt))
        max_tokens = self.metadata.context_window - num_tokens
        if max_tokens <= 0:
            raise ValueError(
                f"The prompt has {num_tokens} tokens, which is too long for"
                " the model. Please use a prompt that fits within"
                f" {self.metadata.context_window} tokens."
            )
        all_kwargs["max_tokens"] = max_tokens

    def _get_response_token_counts(self, raw_response: Any) -> dict:
        """Get the token usage reported by the response."""
        if hasattr(raw_response, "usage"):
            try:
                prompt_tokens = raw_response.usage.prompt_tokens
                completion_tokens = raw_response.usage.completion_tokens
                total_tokens = raw_response.usage.total_tokens
            except AttributeError:
                return {}
        elif isinstance(raw_response, dict):
            usage = raw_response.get("usage", {})
            # NOTE: other model providers that use the OpenAI client may not report usage
            if usage is None:
                return {}
            # Backwards compatibility with old dict type
            prompt_tokens = usage.get("prompt_tokens", 0)
            completion_tokens = usage.get("completion_tokens", 0)
            total_tokens = usage.get("total_tokens", 0)
        else:
            return {}

        return {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": total_tokens,
        }

    # ===== Async Endpoints =====

    @llm_chat_callback()
    async def achat(
        self,
        messages: Sequence[ChatMessage],
        **kwargs: Any,
    ) -> ChatResponse:
        achat_fn: Callable[..., Awaitable[ChatResponse]]
        if self._use_chat_completions(kwargs):
            achat_fn = self._achat
        else:
            achat_fn = acompletion_to_chat_decorator(self._acomplete)
        return await achat_fn(messages, **kwargs)

    @llm_chat_callback()
    async def astream_chat(
        self,
        messages: Sequence[ChatMessage],
        **kwargs: Any,
    ) -> ChatResponseAsyncGen:
        astream_chat_fn: Callable[..., Awaitable[ChatResponseAsyncGen]]
        if self._use_chat_completions(kwargs):
            astream_chat_fn = self._astream_chat
        else:
            astream_chat_fn = astream_completion_to_chat_decorator(
                self._astream_complete
            )
        return await astream_chat_fn(messages, **kwargs)

    @llm_completion_callback()
    async def acomplete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponse:
        if self.modalities and "audio" in self.modalities:
            raise ValueError(
                "Audio is not supported for completion. Use chat/achat instead."
            )

        if self._use_chat_completions(kwargs):
            acomplete_fn = achat_to_completion_decorator(self._achat)
        else:
            acomplete_fn = self._acomplete
        return await acomplete_fn(prompt, **kwargs)

    @llm_completion_callback()
    async def astream_complete(
        self, prompt: str, formatted: bool = False, **kwargs: Any
    ) -> CompletionResponseAsyncGen:
        if self._use_chat_completions(kwargs):
            astream_complete_fn = astream_chat_to_completion_decorator(
                self._astream_chat
            )
        else:
            astream_complete_fn = self._astream_complete
        return await astream_complete_fn(prompt, **kwargs)

    @llm_retry_decorator
    async def _achat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponse:
        aclient = self._get_aclient()
        message_dicts = to_openai_message_dicts(
            messages,
            model=self.model,
        )

        if self.reuse_client:
            response = await aclient.chat.completions.create(
                messages=message_dicts, stream=False, **self._get_model_kwargs(**kwargs)
            )
        else:
            async with aclient:
                response = await aclient.chat.completions.create(
                    messages=message_dicts,
                    stream=False,
                    **self._get_model_kwargs(**kwargs),
                )

        openai_message = response.choices[0].message
        message = from_openai_message(
            openai_message, modalities=self.modalities or ["text"]
        )
        openai_token_logprobs = response.choices[0].logprobs
        logprobs = None
        if openai_token_logprobs and openai_token_logprobs.content:
            logprobs = from_openai_token_logprobs(openai_token_logprobs.content)

        return ChatResponse(
            message=message,
            raw=response,
            logprobs=logprobs,
            additional_kwargs=self._get_response_token_counts(response),
        )

    @llm_retry_decorator
    async def _astream_chat(
        self, messages: Sequence[ChatMessage], **kwargs: Any
    ) -> ChatResponseAsyncGen:
        if self.modalities and "audio" in self.modalities:
            raise ValueError("Audio is not supported for chat streaming")

        aclient = self._get_aclient()
        message_dicts = to_openai_message_dicts(
            messages,
            model=self.model,
        )

        async def gen() -> ChatResponseAsyncGen:
            content = ""
            tool_calls: List[ChoiceDeltaToolCall] = []

            is_function = False
            first_chat_chunk = True
            async for response in await aclient.chat.completions.create(
                messages=message_dicts,
                **self._get_model_kwargs(stream=True, **kwargs),
            ):
                response = cast(ChatCompletionChunk, response)
                if len(response.choices) > 0:
                    # check if the first chunk has neither content nor tool_calls
                    # this happens when 1106 models end up calling multiple tools
                    if (
                        first_chat_chunk
                        and response.choices[0].delta.content is None
                        and response.choices[0].delta.tool_calls is None
                    ):
                        first_chat_chunk = False
                        continue
                    delta = response.choices[0].delta
                else:
                    if self._is_azure_client():
                        continue
                    else:
                        delta = ChoiceDelta()
                first_chat_chunk = False

                if delta is None:
                    continue

                # check if this chunk is the start of a function call
                if delta.tool_calls:
                    is_function = True

                # update using deltas
                role = delta.role or MessageRole.ASSISTANT
                content_delta = delta.content or ""
                content += content_delta

                additional_kwargs = {}
                if is_function:
                    tool_calls = update_tool_calls(tool_calls, delta.tool_calls)
                    if tool_calls:
                        additional_kwargs["tool_calls"] = tool_calls

                yield ChatResponse(
                    message=ChatMessage(
                        role=role,
                        content=content,
                        additional_kwargs=additional_kwargs,
                    ),
                    delta=content_delta,
                    raw=response,
                    additional_kwargs=self._get_response_token_counts(response),
                )

        return gen()

    @llm_retry_decorator
    async def _acomplete(self, prompt: str, **kwargs: Any) -> CompletionResponse:
        aclient = self._get_aclient()
        all_kwargs = self._get_model_kwargs(**kwargs)
        self._update_max_tokens(all_kwargs, prompt)

        if self.reuse_client:
            response = await aclient.completions.create(
                prompt=prompt,
                stream=False,
                **all_kwargs,
            )
        else:
            async with aclient:
                response = await aclient.completions.create(
                    prompt=prompt,
                    stream=False,
                    **all_kwargs,
                )
        text = response.choices[0].text
        openai_completion_logprobs = response.choices[0].logprobs
        logprobs = None
        if openai_completion_logprobs:
            logprobs = from_openai_completion_logprobs(openai_completion_logprobs)

        return CompletionResponse(
            text=text,
            raw=response,
            logprobs=logprobs,
            additional_kwargs=self._get_response_token_counts(response),
        )

    @llm_retry_decorator
    async def _astream_complete(
        self, prompt: str, **kwargs: Any
    ) -> CompletionResponseAsyncGen:
        aclient = self._get_aclient()
        all_kwargs = self._get_model_kwargs(stream=True, **kwargs)
        self._update_max_tokens(all_kwargs, prompt)

        async def gen() -> CompletionResponseAsyncGen:
            text = ""
            async for response in await aclient.completions.create(
                prompt=prompt,
                **all_kwargs,
            ):
                if len(response.choices) > 0:
                    delta = response.choices[0].text
                    if delta is None:
                        delta = ""
                else:
                    delta = ""
                text += delta
                yield CompletionResponse(
                    delta=delta,
                    text=text,
                    raw=response,
                    additional_kwargs=self._get_response_token_counts(response),
                )

        return gen()

    def _prepare_chat_with_tools(
        self,
        tools: Sequence["BaseTool"],
        user_msg: Optional[Union[str, ChatMessage]] = None,
        chat_history: Optional[List[ChatMessage]] = None,
        verbose: bool = False,
        allow_parallel_tool_calls: bool = False,
        tool_choice: Union[str, dict] = "auto",
        strict: Optional[bool] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """Predict and call the tool."""
        tool_specs = [tool.metadata.to_openai_tool() for tool in tools]

        # if strict is passed in, use it, else default to the class-level attribute
        if strict is not None:
            strict = strict
        else:
            strict = self.strict

        if self.metadata.is_function_calling_model:
            for tool_spec in tool_specs:
                if tool_spec["type"] == "function":
                    tool_spec["function"]["strict"] = strict
                    # in current openai 1.40.0 it is always false.
                    tool_spec["function"]["parameters"]["additionalProperties"] = False

        if isinstance(user_msg, str):
            user_msg = ChatMessage(role=MessageRole.USER, content=user_msg)

        messages = chat_history or []
        if user_msg:
            messages.append(user_msg)

        return {
            "messages": messages,
            "tools": tool_specs or None,
            "tool_choice": resolve_tool_choice(tool_choice) if tool_specs else None,
            **kwargs,
        }

    def _validate_chat_with_tools_response(
        self,
        response: ChatResponse,
        tools: Sequence["BaseTool"],
        allow_parallel_tool_calls: bool = False,
        **kwargs: Any,
    ) -> ChatResponse:
        """Validate the response from chat_with_tools."""
        if not allow_parallel_tool_calls:
            force_single_tool_call(response)
        return response

    def get_tool_calls_from_response(
        self,
        response: "ChatResponse",
        error_on_no_tool_call: bool = True,
        **kwargs: Any,
    ) -> List[ToolSelection]:
        """Predict and call the tool."""
        tool_calls = response.message.additional_kwargs.get("tool_calls", [])

        if len(tool_calls) < 1:
            if error_on_no_tool_call:
                raise ValueError(
                    f"Expected at least one tool call, but got {len(tool_calls)} tool calls."
                )
            else:
                return []

        tool_selections = []
        for tool_call in tool_calls:
            if not isinstance(tool_call, get_args(OpenAIToolCall)):
                raise ValueError("Invalid tool_call object")
            if tool_call.type != "function":
                raise ValueError("Invalid tool type. Unsupported by OpenAI")

            # this should handle both complete and partial jsons
            try:
                argument_dict = parse_partial_json(tool_call.function.arguments)
            except ValueError:
                argument_dict = {}

            tool_selections.append(
                ToolSelection(
                    tool_id=tool_call.id,
                    tool_name=tool_call.function.name,
                    tool_kwargs=argument_dict,
                )
            )

        return tool_selections

    @dispatcher.span
    def structured_predict(
        self, *args: Any, llm_kwargs: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> BaseModel:
        """Structured predict."""
        llm_kwargs = llm_kwargs or {}

        all_kwargs = {**llm_kwargs, **kwargs}
        llm_kwargs["tool_choice"] = (
            "required" if "tool_choice" not in all_kwargs else all_kwargs["tool_choice"]
        )
        # by default structured prediction uses function calling to extract structured outputs
        # here we force tool_choice to be required
        return super().structured_predict(*args, llm_kwargs=llm_kwargs, **kwargs)

    @dispatcher.span
    async def astructured_predict(
        self, *args: Any, llm_kwargs: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> BaseModel:
        """Structured predict."""
        llm_kwargs = llm_kwargs or {}

        all_kwargs = {**llm_kwargs, **kwargs}
        llm_kwargs["tool_choice"] = (
            "required" if "tool_choice" not in all_kwargs else all_kwargs["tool_choice"]
        )
        # by default structured prediction uses function calling to extract structured outputs
        # here we force tool_choice to be required
        return await super().astructured_predict(*args, llm_kwargs=llm_kwargs, **kwargs)

    @dispatcher.span
    def stream_structured_predict(
        self, *args: Any, llm_kwargs: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Generator[Union[Model, List[Model]], None, None]:
        """Stream structured predict."""
        llm_kwargs = llm_kwargs or {}

        all_kwargs = {**llm_kwargs, **kwargs}
        llm_kwargs["tool_choice"] = (
            "required" if "tool_choice" not in all_kwargs else all_kwargs["tool_choice"]
        )
        # by default structured prediction uses function calling to extract structured outputs
        # here we force tool_choice to be required
        return super().stream_structured_predict(*args, llm_kwargs=llm_kwargs, **kwargs)

    @dispatcher.span
    async def astream_structured_predict(
        self, *args: Any, llm_kwargs: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> Generator[Union[Model, List[Model]], None, None]:
        """Stream structured predict."""
        llm_kwargs = llm_kwargs or {}

        all_kwargs = {**llm_kwargs, **kwargs}
        llm_kwargs["tool_choice"] = (
            "required" if "tool_choice" not in all_kwargs else all_kwargs["tool_choice"]
        )
        # by default structured prediction uses function calling to extract structured outputs
        # here we force tool_choice to be required
        return await super().astream_structured_predict(
            *args, llm_kwargs=llm_kwargs, **kwargs
        )
````
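Since `_prepare_chat_with_tools` and `get_tool_calls_from_response` back OpenAI function calling, a minimal sketch of exercising them through the public `chat_with_tools` API inherited from `FunctionCallingLLM` might look like this (the `multiply` tool and the model name are illustrative):

```python
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI


def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b


# Hypothetical tool built from a plain function; the OpenAI tool schema is
# derived from the signature and docstring.
multiply_tool = FunctionTool.from_defaults(fn=multiply)

llm = OpenAI(model="gpt-4o-mini")

# chat_with_tools uses _prepare_chat_with_tools to build the tool specs
# and tool_choice before calling the chat endpoint.
response = llm.chat_with_tools(
    tools=[multiply_tool],
    user_msg="What is 3 times 7?",
)

# get_tool_calls_from_response parses tool_calls from additional_kwargs into
# ToolSelection objects; with error_on_no_tool_call=False an empty list is
# returned when the model answered directly.
tool_calls = llm.get_tool_calls_from_response(response, error_on_no_tool_call=False)
for tool_call in tool_calls:
    print(tool_call.tool_name, tool_call.tool_kwargs)
```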
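The `structured_predict` overrides above only force `tool_choice="required"`; the extraction itself happens in the base class. A minimal usage sketch, where the `Song` schema and prompt are illustrative and `PromptTemplate` is assumed to come from `llama_index.core.prompts`:

```python
from pydantic import BaseModel

from llama_index.core.prompts import PromptTemplate
from llama_index.llms.openai import OpenAI


class Song(BaseModel):
    """Hypothetical output schema for the prediction."""

    title: str
    artist: str


llm = OpenAI(model="gpt-4o-mini")

# structured_predict fills the prompt, calls the model with
# tool_choice="required" (per the override above), and parses the tool call
# arguments back into the pydantic model.
song = llm.structured_predict(
    Song,
    PromptTemplate("Name a well-known song by {artist_name}."),
    artist_name="The Beatles",
)
print(song.title, "-", song.artist)
```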