34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class SentenceSplitter(MetadataAwareTextSplitter):
    """Parse text with a preference for complete sentences.

    In general, this class tries to keep sentences and paragraphs together. Therefore
    compared to the original TokenTextSplitter, there are less likely to be
    hanging sentences or parts of sentences at the end of the node chunk.

    Splitting is a two-phase process: ``_split`` recursively breaks the text
    into pieces no larger than the chunk size, then ``_merge`` greedily packs
    those pieces back into chunks, carrying up to ``chunk_overlap`` tokens from
    the end of one chunk into the start of the next.
    """

    chunk_size: int = Field(
        default=DEFAULT_CHUNK_SIZE,
        description="The token chunk size for each chunk.",
        gt=0,
    )
    chunk_overlap: int = Field(
        default=SENTENCE_CHUNK_OVERLAP,
        description="The token overlap of each chunk when splitting.",
        ge=0,
    )
    separator: str = Field(
        default=" ", description="Default separator for splitting into words"
    )
    paragraph_separator: str = Field(
        default=DEFAULT_PARAGRAPH_SEP, description="Separator between paragraphs."
    )
    secondary_chunking_regex: Optional[str] = Field(
        default=CHUNKING_REGEX, description="Backup regex for splitting into sentences."
    )

    # Sentence-level splitter; tried after the paragraph separator.
    _chunking_tokenizer_fn: Callable[[str], List[str]] = PrivateAttr()
    # Callable whose result length is used as the token count of a string.
    _tokenizer: Callable = PrivateAttr()
    # Splitters whose output is flagged as sentence-level (is_sentence=True).
    _split_fns: List[Callable] = PrivateAttr()
    # Fallback splitters for pieces that are still too large.
    _sub_sentence_split_fns: List[Callable] = PrivateAttr()

    def __init__(
        self,
        separator: str = " ",
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
        chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
        secondary_chunking_regex: Optional[str] = CHUNKING_REGEX,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable] = None,
    ):
        """Initialize with parameters.

        Args:
            separator: Word-level separator used as a sub-sentence fallback.
            chunk_size: Maximum number of tokens per chunk.
            chunk_overlap: Maximum number of tokens repeated between
                consecutive chunks.
            tokenizer: Callable used to count tokens; ``get_tokenizer()`` is
                used when omitted.
            paragraph_separator: Separator tried first when splitting.
            chunking_tokenizer_fn: Sentence splitter;
                ``split_by_sentence_tokenizer()`` is used when omitted.
            secondary_chunking_regex: Optional regex tried before ``separator``
                when a sentence is still too large. Skipped when falsy.
            callback_manager: Callback manager; a fresh empty one is created
                when omitted.
            include_metadata: Forwarded to the base class.
            include_prev_next_rel: Forwarded to the base class.
            id_func: Forwarded to the base class; ``default_id_func`` is used
                when omitted.

        Raises:
            ValueError: If ``chunk_overlap`` is larger than ``chunk_size``.
        """
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        id_func = id_func or default_id_func
        callback_manager = callback_manager or CallbackManager([])
        super().__init__(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            secondary_chunking_regex=secondary_chunking_regex,
            separator=separator,
            paragraph_separator=paragraph_separator,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )

        # Private attributes are assigned only after the base initializer ran.
        self._chunking_tokenizer_fn = (
            chunking_tokenizer_fn or split_by_sentence_tokenizer()
        )
        self._tokenizer = tokenizer or get_tokenizer()

        self._split_fns = [
            split_by_sep(paragraph_separator),
            self._chunking_tokenizer_fn,
        ]

        # The regex splitter is optional; the separator and per-character
        # fallbacks are always present, in this order.
        sub_sentence_split_fns: List[Callable] = []
        if secondary_chunking_regex:
            sub_sentence_split_fns.append(split_by_regex(secondary_chunking_regex))
        sub_sentence_split_fns.append(split_by_sep(separator))
        sub_sentence_split_fns.append(split_by_char())
        self._sub_sentence_split_fns = sub_sentence_split_fns

    @classmethod
    def from_defaults(
        cls,
        separator: str = " ",
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        chunk_overlap: int = SENTENCE_CHUNK_OVERLAP,
        tokenizer: Optional[Callable] = None,
        paragraph_separator: str = DEFAULT_PARAGRAPH_SEP,
        chunking_tokenizer_fn: Optional[Callable[[str], List[str]]] = None,
        secondary_chunking_regex: Optional[str] = CHUNKING_REGEX,
        callback_manager: Optional[CallbackManager] = None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        id_func: Optional[Callable] = None,
    ) -> "SentenceSplitter":
        """Build a splitter, filling in defaults for omitted arguments.

        Accepts the same arguments as ``__init__`` and forwards them
        unchanged, except that a missing ``callback_manager`` is replaced by a
        fresh empty ``CallbackManager``.

        Returns:
            A new instance of ``cls``.
        """
        callback_manager = callback_manager or CallbackManager([])
        return cls(
            separator=separator,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            tokenizer=tokenizer,
            paragraph_separator=paragraph_separator,
            chunking_tokenizer_fn=chunking_tokenizer_fn,
            secondary_chunking_regex=secondary_chunking_regex,
            callback_manager=callback_manager,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            id_func=id_func,
        )

    @classmethod
    def class_name(cls) -> str:
        """Return the name identifying this class."""
        return "SentenceSplitter"

    def split_text_metadata_aware(self, text: str, metadata_str: str) -> List[str]:
        """Split ``text`` while reserving room for ``metadata_str`` in each chunk.

        The token count of ``metadata_str`` is subtracted from ``chunk_size``
        and the remainder is used as the chunk size for splitting.

        Args:
            text: Text to split.
            metadata_str: Metadata string whose token count is reserved.

        Returns:
            The list of chunks.

        Raises:
            ValueError: If the metadata uses up the whole chunk size.
        """
        metadata_len = len(self._tokenizer(metadata_str))
        effective_chunk_size = self.chunk_size - metadata_len
        if effective_chunk_size <= 0:
            raise ValueError(
                f"Metadata length ({metadata_len}) is longer than chunk size "
                f"({self.chunk_size}). Consider increasing the chunk size or "
                "decreasing the size of your metadata to avoid this."
            )
        elif effective_chunk_size < 50:
            # Still usable, but warn that the chunks will be very small.
            print(
                f"Metadata length ({metadata_len}) is close to chunk size "
                f"({self.chunk_size}). Resulting chunks are less than 50 tokens. "
                "Consider increasing the chunk size or decreasing the size of "
                "your metadata to avoid this.",
                flush=True,
            )

        return self._split_text(text, chunk_size=effective_chunk_size)

    def split_text(self, text: str) -> List[str]:
        """Split ``text`` into chunks of at most ``self.chunk_size`` tokens."""
        return self._split_text(text, chunk_size=self.chunk_size)

    def _split_text(self, text: str, chunk_size: int) -> List[str]:
        """
        Split incoming text and return chunks with overlap size.

        Has a preference for complete sentences, phrases, and minimal overlap.

        An empty ``text`` is returned as ``[""]`` without emitting a chunking
        event. Otherwise the work is wrapped in a ``CBEventType.CHUNKING``
        callback event whose end payload carries the produced chunks.
        """
        if text == "":
            return [text]

        with self.callback_manager.event(
            CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
        ) as event:
            splits = self._split(text, chunk_size)
            chunks = self._merge(splits, chunk_size)

            event.on_end(payload={EventPayload.CHUNKS: chunks})

        return chunks

    def _split(self, text: str, chunk_size: int) -> List[_Split]:
        r"""Break text into splits that are smaller than chunk size.

        The order of splitting is:
        1. split by paragraph separator
        2. split by chunking tokenizer (default is nltk sentence tokenizer)
        3. split by second chunking regex (default is "[^,\.;]+[,\.;]?")
        4. split by default separator (" ")
        5. split by ``split_by_char()`` as the last fallback

        Pieces that are still too large are split again recursively. A piece
        that no split function can reduce any further is returned as-is, even
        though it exceeds ``chunk_size``, so that ``_merge`` can reject it
        instead of this method recursing forever.
        """
        token_size = self._token_size(text)
        if token_size <= chunk_size:
            return [_Split(text, is_sentence=True, token_size=token_size)]

        text_splits_by_fns, is_sentence = self._get_splits_by_fns(text)

        text_splits = []
        for text_split_by_fns in text_splits_by_fns:
            token_size = self._token_size(text_split_by_fns)
            # ``text_split_by_fns == text`` means no progress was made:
            # recursing on it would repeat this exact call indefinitely.
            if token_size <= chunk_size or text_split_by_fns == text:
                text_splits.append(
                    _Split(
                        text_split_by_fns,
                        is_sentence=is_sentence,
                        token_size=token_size,
                    )
                )
            else:
                recursive_text_splits = self._split(
                    text_split_by_fns, chunk_size=chunk_size
                )
                text_splits.extend(recursive_text_splits)
        return text_splits

    def _merge(self, splits: List[_Split], chunk_size: int) -> List[str]:
        """Merge splits into chunks.

        Splits are appended to the current chunk until the next one would push
        it past ``chunk_size``; the chunk is then closed and the next chunk is
        seeded with trailing pieces of the closed one, up to
        ``self.chunk_overlap`` tokens. If that carried-over overlap leaves no
        room for the next split, the oldest overlap pieces are dropped until
        the split fits, so the summed token sizes of a chunk never exceed
        ``chunk_size``.

        Args:
            splits: Pieces produced by ``_split``. The list is not modified.
            chunk_size: Maximum summed token size of a chunk.

        Returns:
            The merged chunks, stripped, with whitespace-only chunks removed.

        Raises:
            ValueError: If a single split is larger than ``chunk_size``.
        """
        chunks: List[str] = []
        cur_chunk: List[Tuple[str, int]] = []  # list of (text, length)
        cur_chunk_len = 0
        new_chunk = True

        def close_chunk() -> None:
            nonlocal cur_chunk, cur_chunk_len, new_chunk

            chunks.append("".join(text for text, _ in cur_chunk))
            last_chunk = cur_chunk
            cur_chunk_len = 0
            new_chunk = True

            # Seed the next chunk with the tail of the one just closed: walk
            # backwards while the pieces still fit into the overlap budget.
            overlap: List[Tuple[str, int]] = []
            for text, length in reversed(last_chunk):
                if cur_chunk_len + length > self.chunk_overlap:
                    break
                cur_chunk_len += length
                overlap.append((text, length))
            overlap.reverse()  # restore original reading order
            cur_chunk = overlap

        # Walk the list with a cursor instead of repeatedly popping its head.
        split_idx = 0
        while split_idx < len(splits):
            cur_split = splits[split_idx]
            if cur_split.token_size > chunk_size:
                raise ValueError("Single token exceeded chunk size")

            fits = cur_chunk_len + cur_split.token_size <= chunk_size
            if not fits and not new_chunk:
                # if adding split to current chunk exceeds chunk size: close out
                # chunk and retry the same split against the fresh chunk
                close_chunk()
                continue

            if not fits:
                # Fresh chunk that holds only carried-over overlap: drop the
                # oldest overlap pieces until the split fits. An empty chunk
                # always fits because token_size <= chunk_size was checked.
                drop = 0
                while (
                    drop < len(cur_chunk)
                    and cur_chunk_len + cur_split.token_size > chunk_size
                ):
                    cur_chunk_len -= cur_chunk[drop][1]
                    drop += 1
                del cur_chunk[:drop]

            # add split to chunk (a new chunk always takes at least one split)
            cur_chunk_len += cur_split.token_size
            cur_chunk.append((cur_split.text, cur_split.token_size))
            split_idx += 1
            new_chunk = False

        # handle the last chunk; overlap left over after the final close is
        # discarded because no new split was added to it
        if not new_chunk:
            chunks.append("".join(text for text, _ in cur_chunk))

        # run postprocessing to remove blank spaces
        return self._postprocess_chunks(chunks)

    def _postprocess_chunks(self, chunks: List[str]) -> List[str]:
        """Post-process chunks.

        Remove whitespace only chunks and remove leading and trailing whitespace.
        """
        stripped_chunks = (chunk.strip() for chunk in chunks)
        return [chunk for chunk in stripped_chunks if chunk]

    def _token_size(self, text: str) -> int:
        """Return the number of tokens the configured tokenizer yields for ``text``."""
        return len(self._tokenizer(text))

    def _get_splits_by_fns(self, text: str) -> Tuple[List[str], bool]:
        """Split ``text`` with the first split function that makes progress.

        The sentence-level functions are tried first; the first one returning
        more than one piece wins and its result is flagged ``True``. Otherwise
        the sub-sentence functions are tried in order and the result is
        flagged ``False``. If none of them returns more than one piece, the
        output of the last sub-sentence function is returned.

        Returns:
            A ``(splits, is_sentence)`` tuple.
        """
        for split_fn in self._split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                return splits, True

        for split_fn in self._sub_sentence_split_fns:
            splits = split_fn(text)
            if len(splits) > 1:
                break

        return splits, False
|