17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
class TreeSummarize(BaseSynthesizer):
    """
    Response synthesizer that summarizes text chunks as a tree.

    Chunks are merged bottom-up, from the leaves towards a single root
    answer. Every recursive step does the following:

    1. repack the chunks through the prompt helper, so that each chunk fills
       the context window of the LLM;
    2. if exactly one chunk remains, query the LLM once with it and return
       that result as the final response;
    3. otherwise summarize every chunk on its own and apply the same
       procedure to the resulting summaries.

    Only the final single-chunk call honours ``streaming``; the intermediate
    summaries always come from the non-streaming predict calls. When an
    ``output_cls`` is configured, intermediate structured results are
    serialized with ``model_dump_json()`` before the next round.
    """

    def __init__(
        self,
        llm: Optional[LLM] = None,
        callback_manager: Optional[CallbackManager] = None,
        prompt_helper: Optional[PromptHelper] = None,
        summary_template: Optional[BasePromptTemplate] = None,
        output_cls: Optional[BaseModel] = None,
        streaming: bool = False,
        use_async: bool = False,
        verbose: bool = False,
    ) -> None:
        """
        Set up the synthesizer.

        Args:
            llm: Forwarded to ``BaseSynthesizer``.
            callback_manager: Forwarded to ``BaseSynthesizer``.
            prompt_helper: Forwarded to ``BaseSynthesizer``.
            summary_template: Prompt used for every summarization call.
                ``DEFAULT_TREE_SUMMARIZE_PROMPT_SEL`` is used when this is
                not given.
            output_cls: Forwarded to ``BaseSynthesizer``; when set, the
                structured predict calls are used instead of plain predict.
            streaming: Forwarded to ``BaseSynthesizer``; when set, the final
                single-chunk call uses the LLM's streaming method.
            use_async: When set, ``get_response`` summarizes the chunks via
                the LLM's async methods, executed with ``run_async_tasks``.
            verbose: When set, the chunk count after each repacking is
                printed.
        """
        super().__init__(
            llm=llm,
            callback_manager=callback_manager,
            prompt_helper=prompt_helper,
            streaming=streaming,
            output_cls=output_cls,
        )
        self._summary_template = (
            summary_template
            if summary_template
            else DEFAULT_TREE_SUMMARIZE_PROMPT_SEL
        )
        self._use_async = use_async
        self._verbose = verbose

    def _get_prompts(self) -> PromptDictType:
        """Return the prompts of this synthesizer keyed by name."""
        prompts: PromptDictType = {"summary_template": self._summary_template}
        return prompts

    def _update_prompts(self, prompts: PromptDictType) -> None:
        """Replace the summary template if ``prompts`` provides one."""
        if "summary_template" not in prompts:
            return
        self._summary_template = prompts["summary_template"]

    async def aget_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """
        Asynchronously build the tree-summarized response for a query.

        Args:
            query_str: Query that is bound into the summary template.
            text_chunks: Texts to summarize.
            **response_kwargs: Passed through unchanged to every LLM call
                and to the recursive call.

        Returns:
            The result of the final LLM call made on the single remaining
            chunk (streaming, plain or structured, depending on the
            synthesizer's configuration).
        """
        prompt = self._summary_template.partial_format(query_str=query_str)

        # Repack so that each chunk fills the context window.
        packed_chunks = self._prompt_helper.repack(prompt, text_chunks=text_chunks)

        if self._verbose:
            print(f"{len(packed_chunks)} text chunks after repacking")

        # Exactly one chunk left: this call produces the final response.
        if len(packed_chunks) == 1:
            sole_chunk = packed_chunks[0]
            if self._streaming:
                return await self._llm.astream(
                    prompt, context_str=sole_chunk, **response_kwargs
                )
            if self._output_cls is None:
                return await self._llm.apredict(
                    prompt, context_str=sole_chunk, **response_kwargs
                )
            # A pydantic object is returned when output_cls is specified.
            return await self._llm.astructured_predict(  # type: ignore
                self._output_cls,
                prompt,
                context_str=sole_chunk,
                **response_kwargs,
            )

        # Not down to a single chunk yet: summarize every chunk concurrently.
        if self._output_cls is None:
            pending = [
                self._llm.apredict(prompt, context_str=chunk, **response_kwargs)
                for chunk in packed_chunks
            ]
        else:
            pending = [
                self._llm.astructured_predict(  # type: ignore
                    self._output_cls,
                    prompt,
                    context_str=chunk,
                    **response_kwargs,
                )
                for chunk in packed_chunks
            ]
        results = await asyncio.gather(*pending)

        # Structured results are turned into JSON text for the next round.
        summaries = (
            results
            if self._output_cls is None
            else [item.model_dump_json() for item in results]
        )

        # Recurse on the summaries.
        return await self.aget_response(
            query_str=query_str,
            text_chunks=summaries,
            **response_kwargs,
        )

    def get_response(
        self,
        query_str: str,
        text_chunks: Sequence[str],
        **response_kwargs: Any,
    ) -> RESPONSE_TEXT_TYPE:
        """
        Build the tree-summarized response for a query.

        Args:
            query_str: Query that is bound into the summary template.
            text_chunks: Texts to summarize.
            **response_kwargs: Passed through unchanged to every LLM call
                and to the recursive call.

        Returns:
            The result of the final LLM call made on the single remaining
            chunk (streaming, plain or structured, depending on the
            synthesizer's configuration).
        """
        prompt = self._summary_template.partial_format(query_str=query_str)

        # Repack so that each chunk fills the context window.
        packed_chunks = self._prompt_helper.repack(prompt, text_chunks=text_chunks)

        if self._verbose:
            print(f"{len(packed_chunks)} text chunks after repacking")

        # Exactly one chunk left: this call produces the final response.
        if len(packed_chunks) == 1:
            sole_chunk = packed_chunks[0]
            if self._streaming:
                return self._llm.stream(
                    prompt, context_str=sole_chunk, **response_kwargs
                )
            if self._output_cls is None:
                return self._llm.predict(
                    prompt, context_str=sole_chunk, **response_kwargs
                )
            return self._llm.structured_predict(  # type: ignore
                self._output_cls,
                prompt,
                context_str=sole_chunk,
                **response_kwargs,
            )

        # Not down to a single chunk yet: summarize every chunk.
        if self._use_async:
            # Use the async LLM methods and run them via run_async_tasks.
            if self._output_cls is None:
                pending = [
                    self._llm.apredict(prompt, context_str=chunk, **response_kwargs)
                    for chunk in packed_chunks
                ]
            else:
                pending = [
                    self._llm.astructured_predict(  # type: ignore
                        self._output_cls,
                        prompt,
                        context_str=chunk,
                        **response_kwargs,
                    )
                    for chunk in packed_chunks
                ]
            results = run_async_tasks(pending)
            # Structured results are turned into JSON text for the next round.
            summaries = (
                results
                if self._output_cls is None
                else [item.model_dump_json() for item in results]
            )
        elif self._output_cls is None:
            summaries = [
                self._llm.predict(prompt, context_str=chunk, **response_kwargs)
                for chunk in packed_chunks
            ]
        else:
            structured = [
                self._llm.structured_predict(  # type: ignore
                    self._output_cls,
                    prompt,
                    context_str=chunk,
                    **response_kwargs,
                )
                for chunk in packed_chunks
            ]
            # Structured results are turned into JSON text for the next round.
            summaries = [item.model_dump_json() for item in structured]

        # Recurse on the summaries.
        return self.get_response(
            query_str=query_str, text_chunks=summaries, **response_kwargs
        )
|