classTogetherEmbedding(BaseEmbedding):api_base:str=Field(default="https://api.together.xyz/v1",description="The base URL for the Together API.",)api_key:str=Field(default="",description="The API key for the Together API. If not set, will attempt to use the TOGETHER_API_KEY environment variable.",)def__init__(self,model_name:str,api_key:Optional[str]=None,api_base:str="https://api.together.xyz/v1",**kwargs:Any,)->None:api_key=api_keyoros.environ.get("TOGETHER_API_KEY",None)super().__init__(model_name=model_name,api_key=api_key,api_base=api_base,**kwargs,)def_generate_embedding(self,text:str,model_api_string:str)->Embedding:"""Generate embeddings from Together API. Args: text: str. An input text sentence or document. model_api_string: str. An API string for a specific embedding model of your choice. Returns: embeddings: a list of float numbers. Embeddings correspond to your given text. """headers={"accept":"application/json","content-type":"application/json","Authorization":f"Bearer {self.api_key}",}session=requests.session()whileTrue:response=session.post(self.api_base.strip("/")+"/embeddings",headers=headers,json={"input":text,"model":model_api_string},)ifresponse.status_code!=200:ifresponse.status_code==429:"""Rate limit exceeded, wait for reset"""reset_time=int(response.headers.get("X-RateLimit-Reset",0))ifreset_time>0:time.sleep(reset_time)continueelse:"""Rate limit reset time has passed, retry immediately"""continue""" Handle other non-200 status codes """raiseValueError(f"Request failed with status code {response.status_code}: {response.text}")returnresponse.json()["data"][0]["embedding"]asyncdef_agenerate_embedding(self,text:str,model_api_string:str)->Embedding:"""Async generate embeddings from Together API. Args: text: str. An input text sentence or document. model_api_string: str. An API string for a specific embedding model of your choice. Returns: embeddings: a list of float numbers. Embeddings correspond to your given text. """headers={"accept":"application/json","content-type":"application/json","Authorization":f"Bearer {self.api_key}",}asyncwithhttpx.AsyncClient()asclient:whileTrue:response=awaitclient.post(self.api_base.strip("/")+"/embeddings",headers=headers,json={"input":text,"model":model_api_string},)ifresponse.status_code!=200:ifresponse.status_code==429:"""Rate limit exceeded, wait for reset"""reset_time=int(response.headers.get("X-RateLimit-Reset",0))ifreset_time>0:awaitasyncio.sleep(reset_time)continueelse:"""Rate limit reset time has passed, retry immediately"""continue""" Handle other non-200 status codes"""raiseValueError(f"Request failed with status code {response.status_code}: {response.text}")returnresponse.json()["data"][0]["embedding"]def_get_text_embedding(self,text:str)->Embedding:"""Get text embedding."""returnself._generate_embedding(text,self.model_name)def_get_query_embedding(self,query:str)->Embedding:"""Get query embedding."""returnself._generate_embedding(query,self.model_name)def_get_text_embeddings(self,texts:List[str])->List[Embedding]:"""Get text embeddings."""return[self._generate_embedding(text,self.model_name)fortextintexts]asyncdef_aget_text_embedding(self,text:str)->Embedding:"""Async get text embedding."""returnawaitself._agenerate_embedding(text,self.model_name)asyncdef_aget_query_embedding(self,query:str)->Embedding:"""Async get query embedding."""returnawaitself._agenerate_embedding(query,self.model_name)asyncdef_aget_text_embeddings(self,texts:List[str])->List[Embedding]:"""Async get text embeddings."""returnawaitasyncio.gather(*[self._agenerate_embedding(text,self.model_name)fortextintexts])