Bases: BaseReader
AirbyteCDKReader reader.
Retrieve documents from an Airbyte source implemented using the CDK.
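Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source_class | Any | The Airbyte source class. | required |
| config | Mapping[str, Any] | The config object for the Airbyte source. | required |
| record_handler | Optional[RecordHandler] | Optional callable that receives each record and its id and returns the Document to emit. | None |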
Source code in llama-index-integrations/readers/llama-index-readers-airbyte-cdk/llama_index/readers/airbyte_cdk/base.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class AirbyteCDKReader(BaseReader):
    """AirbyteCDKReader reader.

    Retrieve documents from an Airbyte source implemented using the CDK.

    Args:
        source_class: The Airbyte source class. It is instantiated with no
            arguments, and its ``__name__`` is used as the runner name.
        config: The config object for the Airbyte source.
        record_handler: Optional callable invoked as
            ``record_handler(record, id)`` for every record; its return value
            is used as the document. When omitted, each record becomes a
            ``Document`` whose text is the JSON dump of ``record.data`` and
            whose ``extra_info`` is ``record.data``.
    """

    def __init__(
        self,
        source_class: Any,
        config: Mapping[str, Any],
        record_handler: Optional[RecordHandler] = None,
    ) -> None:
        """Initialize with parameters."""
        # Imported lazily so that airbyte_cdk is only required when a reader
        # is actually constructed.
        from airbyte_cdk.models.airbyte_protocol import AirbyteRecordMessage
        from airbyte_cdk.sources.embedded.base_integration import (
            BaseEmbeddedIntegration,
        )
        from airbyte_cdk.sources.embedded.runner import CDKRunner

        class CDKIntegration(BaseEmbeddedIntegration):
            """Embedded integration that turns Airbyte records into documents."""

            # Parameter names are kept as in the original so the override's
            # signature is unchanged (``id`` shadows the builtin on purpose).
            def _handle_record(
                self, record: AirbyteRecordMessage, id: Optional[str]
            ) -> Document:
                # Compare against None explicitly: a handler object that
                # happens to evaluate falsy must still be honoured.
                if record_handler is not None:
                    return record_handler(record, id)
                return Document(
                    doc_id=id, text=json.dumps(record.data), extra_info=record.data
                )

        self._integration = CDKIntegration(
            config=config,
            runner=CDKRunner(source=source_class(), name=source_class.__name__),
        )

    def load_data(self, *args: Any, **kwargs: Any) -> List[Document]:
        """Load all documents eagerly.

        All arguments are forwarded unchanged to ``lazy_load_data`` and the
        resulting iterator is materialized into a list.
        """
        return list(self.lazy_load_data(*args, **kwargs))

    def lazy_load_data(self, *args: Any, **kwargs: Any) -> Iterator[Document]:
        """Return an iterator over the documents.

        All arguments are forwarded unchanged to the underlying integration's
        ``_load_data``.
        """
        return self._integration._load_data(*args, **kwargs)

    @property
    def last_state(self) -> Any:
        """Expose the ``last_state`` attribute of the underlying integration."""
        return self._integration.last_state
|