33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 | class SiliconFlowEmbedding(BaseEmbedding):
"""SiliconFlow class for embeddings."""
model: str = Field(
default="BAAI/bge-m3",
description="""\
The name of the embedding model to use.
512 tokens for all models input except `bge-m3` which is 8192.
""",
)
api_key: Optional[str] = Field(
default=None,
description="The SiliconFlow API key.",
)
base_url: str = Field(
default=DEFAULT_SILICONFLOW_API_URL,
description="The base URL for the SiliconFlow API.",
)
encoding_format: str = Field(
default="float",
description="The format to return the embeddings in. Can be either float or base64.",
) # TODO: Consider whether to fix the encoding format as float.
_headers: Any = PrivateAttr()
def __init__(
self,
model: str = "BAAI/bge-m3",
api_key: Optional[str] = None,
base_url: str = DEFAULT_SILICONFLOW_API_URL,
encoding_format: Optional[str] = "float",
callback_manager: Optional[CallbackManager] = None,
**kwargs: Any,
) -> None:
super().__init__(
model=model,
api_key=api_key,
base_url=base_url,
encoding_format=encoding_format,
callback_manager=callback_manager,
**kwargs,
)
assert (
self.encoding_format in VALID_ENCODING
), f"""\
Encoding_format parameter {self.encoding_format} not supported.
Please choose one of {VALID_ENCODING}".
"""
self._headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
@classmethod
def class_name(cls) -> str:
return "SiliconFlowEmbedding"
def _data_formatting(self, response: list) -> List[List[float]]:
results = sorted(response["data"], key=lambda e: e["index"])
if self.encoding_format == "base64":
return [base64_to_float_list(data["embedding"]) for data in results]
else:
return [data["embedding"] for data in results]
def _get_query_embedding(self, query: str) -> List[float]:
"""Get query embedding."""
return self._get_text_embeddings([query])[0]
async def _aget_query_embedding(self, query: str) -> List[float]:
"""The asynchronous version of _get_query_embedding."""
result = await self._aget_text_embeddings([query])
return result[0]
def _get_text_embedding(self, text: str) -> List[float]:
"""Get text embedding."""
return self._get_text_embeddings([text])[0]
async def _aget_text_embedding(self, text: str) -> List[float]:
"""Asynchronously get text embedding."""
result = await self._aget_text_embeddings([text])
return result[0]
def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
with requests.Session() as session:
input_json = {
"model": self.model,
"input": texts,
"encoding_format": self.encoding_format,
}
response = session.post(
self.base_url, json=input_json, headers=self._headers
).json()
if "data" not in response:
raise RuntimeError(response)
return self._data_formatting(response)
async def _aget_text_embeddings(
self,
texts: List[str],
) -> List[List[float]]:
async with aiohttp.ClientSession() as session:
input_json = {
"input": texts,
"model": self.model,
"encoding_format": self.encoding_format,
}
async with session.post(
self.base_url, json=input_json, headers=self._headers
) as response:
response_json = await response.json()
response.raise_for_status()
return self._data_formatting(response_json)
|