17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 | class CodeSplitter(TextSplitter):
"""Split code using a AST parser.
Thank you to Kevin Lu / SweepAI for suggesting this elegant code splitting solution.
https://docs.sweep.dev/blogs/chunking-2m-files
"""
language: str = Field(
description="The programming language of the code being split."
)
chunk_lines: int = Field(
default=DEFAULT_CHUNK_LINES,
description="The number of lines to include in each chunk.",
gt=0,
)
chunk_lines_overlap: int = Field(
default=DEFAULT_LINES_OVERLAP,
description="How many lines of code each chunk overlaps with.",
gt=0,
)
max_chars: int = Field(
default=DEFAULT_MAX_CHARS,
description="Maximum number of characters per chunk.",
gt=0,
)
_parser: Any = PrivateAttr()
def __init__(
self,
language: str,
chunk_lines: int = DEFAULT_CHUNK_LINES,
chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
max_chars: int = DEFAULT_MAX_CHARS,
parser: Any = None,
callback_manager: Optional[CallbackManager] = None,
include_metadata: bool = True,
include_prev_next_rel: bool = True,
id_func: Optional[Callable[[int, Document], str]] = None,
) -> None:
"""Initialize a CodeSplitter."""
from tree_sitter import Parser # pants: no-infer-dep
if parser is None:
try:
import tree_sitter_languages # pants: no-infer-dep
parser = tree_sitter_languages.get_parser(language)
except ImportError:
raise ImportError(
"Please install tree_sitter_languages to use CodeSplitter."
"Or pass in a parser object."
)
except Exception:
print(
f"Could not get parser for language {language}. Check "
"https://github.com/grantjenks/py-tree-sitter-languages#license "
"for a list of valid languages."
)
raise
if not isinstance(parser, Parser):
raise ValueError("Parser must be a tree-sitter Parser object.")
self._parser = parser
callback_manager = callback_manager or CallbackManager([])
id_func = id_func or default_id_func
super().__init__(
language=language,
chunk_lines=chunk_lines,
chunk_lines_overlap=chunk_lines_overlap,
max_chars=max_chars,
callback_manager=callback_manager,
include_metadata=include_metadata,
include_prev_next_rel=include_prev_next_rel,
id_func=id_func,
)
@classmethod
def from_defaults(
cls,
language: str,
chunk_lines: int = DEFAULT_CHUNK_LINES,
chunk_lines_overlap: int = DEFAULT_LINES_OVERLAP,
max_chars: int = DEFAULT_MAX_CHARS,
callback_manager: Optional[CallbackManager] = None,
parser: Any = None,
) -> "CodeSplitter":
"""Create a CodeSplitter with default values."""
return cls(
language=language,
chunk_lines=chunk_lines,
chunk_lines_overlap=chunk_lines_overlap,
max_chars=max_chars,
parser=parser,
)
@classmethod
def class_name(cls) -> str:
return "CodeSplitter"
def _chunk_node(self, node: Any, text: str, last_end: int = 0) -> List[str]:
new_chunks = []
current_chunk = ""
for child in node.children:
if child.end_byte - child.start_byte > self.max_chars:
# Child is too big, recursively chunk the child
if len(current_chunk) > 0:
new_chunks.append(current_chunk)
current_chunk = ""
new_chunks.extend(self._chunk_node(child, text, last_end))
elif (
len(current_chunk) + child.end_byte - child.start_byte > self.max_chars
):
# Child would make the current chunk too big, so start a new chunk
new_chunks.append(current_chunk)
current_chunk = text[last_end : child.end_byte]
else:
current_chunk += text[last_end : child.end_byte]
last_end = child.end_byte
if len(current_chunk) > 0:
new_chunks.append(current_chunk)
return new_chunks
def split_text(self, text: str) -> List[str]:
"""Split incoming code and return chunks using the AST."""
with self.callback_manager.event(
CBEventType.CHUNKING, payload={EventPayload.CHUNKS: [text]}
) as event:
tree = self._parser.parse(bytes(text, "utf-8"))
if (
not tree.root_node.children
or tree.root_node.children[0].type != "ERROR"
):
chunks = [
chunk.strip() for chunk in self._chunk_node(tree.root_node, text)
]
event.on_end(
payload={EventPayload.CHUNKS: chunks},
)
return chunks
else:
raise ValueError(f"Could not parse code with language {self.language}.")
|