Skip to content

Index

Pipeline schema.

InputKeys #

Bases: BaseModel

Input keys.

Parameters:

Name Type Description Default
required_keys Set[str]
set()
optional_keys Set[str]
set()
Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class InputKeys(BaseModel):
    """Input keys.

    Holds the names of the inputs a component requires and the names it
    additionally accepts as optional.
    """

    required_keys: Set[str] = Field(default_factory=set)
    optional_keys: Set[str] = Field(default_factory=set)

    @classmethod
    def from_keys(
        cls, required_keys: Set[str], optional_keys: Optional[Set[str]] = None
    ) -> "InputKeys":
        """Build an `InputKeys` from a required set and an optional set.

        A falsy `optional_keys` (`None` or an empty set) is replaced by a
        new empty set.
        """
        optional = optional_keys if optional_keys else set()
        return cls(required_keys=required_keys, optional_keys=optional)

    def validate_keys(self, input_keys: Set[str]) -> None:
        """Check `input_keys` against the required and optional keys.

        Raises:
            ValueError: if a required key is absent from `input_keys`, or if
                `input_keys` holds a key that is neither required nor optional.
        """
        required = self.required_keys
        # every required key has to be supplied
        if not required.issubset(input_keys):
            raise ValueError(
                f"Required keys {required} are not present in input keys {input_keys}"
            )

        # nothing outside required + optional may be supplied
        allowed = required.union(self.optional_keys)
        if input_keys.issubset(allowed):
            return
        raise ValueError(
            f"Input keys {input_keys} contain keys not in required or optional keys {allowed}"
        )

    def __len__(self) -> int:
        """Return the size of the required set plus the size of the optional set."""
        required_count = len(self.required_keys)
        optional_count = len(self.optional_keys)
        return required_count + optional_count

    def all(self) -> Set[str]:
        """Return the union of required and optional keys."""
        required, optional = self.required_keys, self.optional_keys
        return required.union(optional)

from_keys classmethod #

from_keys(required_keys: Set[str], optional_keys: Optional[Set[str]] = None) -> InputKeys

Create InputKeys from a set of required keys and an optional set of optional keys.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
77
78
79
80
81
82
@classmethod
def from_keys(
    cls, required_keys: Set[str], optional_keys: Optional[Set[str]] = None
) -> "InputKeys":
    """Build an `InputKeys` from a required set and an optional set.

    A falsy `optional_keys` (`None` or an empty set) is replaced by a new
    empty set.
    """
    optional = optional_keys if optional_keys else set()
    return cls(required_keys=required_keys, optional_keys=optional)

validate_keys #

validate_keys(input_keys: Set[str]) -> None

Validate input keys.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
84
85
86
87
88
89
90
91
92
93
94
def validate_keys(self, input_keys: Set[str]) -> None:
    """Check `input_keys` against the required and optional keys.

    Raises:
        ValueError: if a required key is absent from `input_keys`, or if
            `input_keys` holds a key that is neither required nor optional.
    """
    required = self.required_keys
    # every required key has to be supplied
    if not required.issubset(input_keys):
        raise ValueError(
            f"Required keys {required} are not present in input keys {input_keys}"
        )

    # nothing outside required + optional may be supplied
    allowed = required.union(self.optional_keys)
    if input_keys.issubset(allowed):
        return
    raise ValueError(
        f"Input keys {input_keys} contain keys not in required or optional keys {allowed}"
    )

all #

all() -> Set[str]

Get all input keys.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
100
101
102
def all(self) -> Set[str]:
    """Return the union of required and optional keys."""
    required, optional = self.required_keys, self.optional_keys
    return required.union(optional)

OutputKeys #

Bases: BaseModel

Output keys.

Parameters:

Name Type Description Default
required_keys Set[str]
set()
Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class OutputKeys(BaseModel):
    """Output keys.

    Holds the exact set of key names a component's output must contain.
    """

    required_keys: Set[str] = Field(default_factory=set)

    @classmethod
    def from_keys(
        cls,
        required_keys: Set[str],
    ) -> "OutputKeys":
        """Build an `OutputKeys` from the set of required key names."""
        instance = cls(required_keys=required_keys)
        return instance

    def validate_keys(self, input_keys: Set[str]) -> None:
        """Check that `input_keys` equals the required keys exactly.

        Raises:
            ValueError: if the two sets differ in any way.
        """
        # an exact match is the only accepted outcome
        if input_keys == self.required_keys:
            return
        raise ValueError(
            f"Input keys {input_keys} do not match required keys {self.required_keys}"
        )

from_keys classmethod #

from_keys(required_keys: Set[str]) -> OutputKeys

Create OutputKeys from a set of required keys.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
110
111
112
113
114
115
116
@classmethod
def from_keys(
    cls,
    required_keys: Set[str],
) -> "OutputKeys":
    """Build an `OutputKeys` from the set of required key names."""
    instance = cls(required_keys=required_keys)
    return instance

validate_keys #

validate_keys(input_keys: Set[str]) -> None

Validate that the given keys exactly match the required output keys.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
118
119
120
121
122
123
124
def validate_keys(self, input_keys: Set[str]) -> None:
    """Check that `input_keys` equals the required keys exactly.

    Raises:
        ValueError: if the two sets differ in any way.
    """
    # an exact match is the only accepted outcome
    if input_keys == self.required_keys:
        return
    raise ValueError(
        f"Input keys {input_keys} do not match required keys {self.required_keys}"
    )

ChainableMixin #

Bases: ABC

Chainable mixin.

A module that can produce a QueryComponent from a set of inputs through as_query_component.

If plugged in directly into a QueryPipeline, the ChainableMixin will be converted into a QueryComponent with default parameters.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class ChainableMixin(ABC):
    """Chainable mixin.

    Marks a module as able to turn itself into a `QueryComponent` via
    `as_query_component`.

    When such a module is placed directly into a `QueryPipeline`, it is
    converted into a `QueryComponent` using default parameters.

    """

    @abstractmethod
    def _as_query_component(self, **kwargs: Any) -> "QueryComponent":
        """Build the query component; implemented by subclasses."""

    def as_query_component(
        self, partial: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> "QueryComponent":
        """Build the query component and pre-fill it with partial arguments.

        `kwargs` are forwarded to `_as_query_component`; the entries of
        `partial` (nothing when it is `None` or empty) are then passed to the
        resulting component's `partial` method.
        """
        preset_args = partial if partial else {}
        built = self._as_query_component(**kwargs)
        built.partial(**preset_args)
        return built

as_query_component #

as_query_component(partial: Optional[Dict[str, Any]] = None, **kwargs: Any) -> QueryComponent

Get query component.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
142
143
144
145
146
147
148
def as_query_component(
    self, partial: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> "QueryComponent":
    """Build the query component and pre-fill it with partial arguments.

    `kwargs` are forwarded to `_as_query_component`; the entries of
    `partial` (nothing when it is `None` or empty) are then passed to the
    resulting component's `partial` method.
    """
    preset_args = partial if partial else {}
    built = self._as_query_component(**kwargs)
    built.partial(**preset_args)
    return built

QueryComponent #

Bases: BaseModel

Query component.

Represents a component that can be run in a QueryPipeline.

Parameters:

Name Type Description Default
partial_dict Dict[str, Any]

Partial arguments to run_component

{}
Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
class QueryComponent(BaseModel):
    """Query component.

    A unit that a `QueryPipeline` can run. Subclasses declare `input_keys`
    and `output_keys` and implement `_run_component` / `_arun_component`;
    `run_component` / `arun_component` wrap those with key validation.

    """

    partial_dict: Dict[str, Any] = Field(
        default_factory=dict, description="Partial arguments to run_component"
    )

    # TODO: make this a subclass of BaseComponent (e.g. use Pydantic)

    def partial(self, **kwargs: Any) -> None:
        """Store keyword arguments in `partial_dict`, replacing same-named entries."""
        for name, value in kwargs.items():
            self.partial_dict[name] = value

    @abstractmethod
    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Set callback manager."""
        # TODO: refactor so that callback_manager is always passed in during runtime.

    @property
    def free_req_input_keys(self) -> Set[str]:
        """Required input keys that are not already supplied by `partial_dict`."""
        required = self.input_keys.required_keys
        already_bound = self.partial_dict.keys()
        return required.difference(already_bound)

    @abstractmethod
    def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Component-specific input validation hook used by `validate_component_inputs`."""

    def _validate_component_outputs(self, output: Dict[str, Any]) -> Dict[str, Any]:
        """Component-specific output validation hook; returns `output` unchanged."""
        # subclasses may override to add checks
        return output

    def validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the input key set, then delegate to `_validate_component_inputs`."""
        # the supplied key names must satisfy self.input_keys
        supplied = set(input.keys())
        self.input_keys.validate_keys(supplied)
        return self._validate_component_inputs(input)

    def validate_component_outputs(self, output: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the output key set, then delegate to `_validate_component_outputs`."""
        # the produced key names must satisfy self.output_keys
        produced = set(output.keys())
        self.output_keys.validate_keys(produced)
        return self._validate_component_outputs(output)

    def run_component(self, **kwargs: Any) -> Dict[str, Any]:
        """Run the component synchronously.

        Entries of `partial_dict` are merged over `kwargs` (a partial value
        wins over a same-named keyword), the merged inputs are validated,
        `_run_component` is called, and its outputs are validated.
        """
        merged = dict(kwargs)
        merged.update(self.partial_dict)
        checked_inputs = self.validate_component_inputs(merged)
        raw_outputs = self._run_component(**checked_inputs)
        return self.validate_component_outputs(raw_outputs)

    async def arun_component(self, **kwargs: Any) -> Dict[str, Any]:
        """Run the component asynchronously.

        Same steps as `run_component`, but awaits `_arun_component`.
        """
        merged = dict(kwargs)
        merged.update(self.partial_dict)
        checked_inputs = self.validate_component_inputs(merged)
        raw_outputs = await self._arun_component(**checked_inputs)
        return self.validate_component_outputs(raw_outputs)

    @abstractmethod
    def _run_component(self, **kwargs: Any) -> Dict:
        """Do the component's work; implemented by subclasses."""

    @abstractmethod
    async def _arun_component(self, **kwargs: Any) -> Any:
        """Do the component's work asynchronously; implemented by subclasses."""

    @property
    @abstractmethod
    def input_keys(self) -> InputKeys:
        """Input keys."""

    @property
    @abstractmethod
    def output_keys(self) -> OutputKeys:
        """Output keys."""

    @property
    def sub_query_components(self) -> List["QueryComponent"]:
        """Get sub query components.

        Some components contain other components (for example a query
        pipeline, or an IfElseComponent). This base implementation returns
        an empty list.

        """
        no_children: List["QueryComponent"] = []
        return no_children

free_req_input_keys property #

free_req_input_keys: Set[str]

Get free input keys.

input_keys abstractmethod property #

input_keys: InputKeys

Input keys.

output_keys abstractmethod property #

output_keys: OutputKeys

Output keys.

sub_query_components property #

sub_query_components: List[QueryComponent]

Get sub query components.

Certain query components may have sub query components, e.g. a query pipeline will have sub query components, and so will an IfElseComponent.

partial #

partial(**kwargs: Any) -> None

Update with partial arguments.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
164
165
166
def partial(self, **kwargs: Any) -> None:
    """Store keyword arguments in `partial_dict`, replacing same-named entries."""
    for name, value in kwargs.items():
        self.partial_dict[name] = value

set_callback_manager abstractmethod #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
168
169
170
@abstractmethod
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Set callback manager.

    Abstract: the body is empty here, so each concrete component decides
    what to do with the given `callback_manager`.
    """

validate_component_inputs #

validate_component_inputs(input: Dict[str, Any]) -> Dict[str, Any]

Validate component inputs.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
187
188
189
190
191
def validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the input key set, then delegate to `_validate_component_inputs`."""
    # the supplied key names must satisfy self.input_keys
    supplied = set(input.keys())
    self.input_keys.validate_keys(supplied)
    return self._validate_component_inputs(input)

validate_component_outputs #

validate_component_outputs(output: Dict[str, Any]) -> Dict[str, Any]

Validate component outputs.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
193
194
195
196
197
def validate_component_outputs(self, output: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the output key set, then delegate to `_validate_component_outputs`."""
    # the produced key names must satisfy self.output_keys
    produced = set(output.keys())
    self.output_keys.validate_keys(produced)
    return self._validate_component_outputs(output)

run_component #

run_component(**kwargs: Any) -> Dict[str, Any]

Run component.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
199
200
201
202
203
204
def run_component(self, **kwargs: Any) -> Dict[str, Any]:
    """Run the component synchronously.

    Entries of `partial_dict` are merged over `kwargs` (a partial value wins
    over a same-named keyword), the merged inputs are validated,
    `_run_component` is called, and its outputs are validated.
    """
    merged = dict(kwargs)
    merged.update(self.partial_dict)
    checked_inputs = self.validate_component_inputs(merged)
    raw_outputs = self._run_component(**checked_inputs)
    return self.validate_component_outputs(raw_outputs)

arun_component async #

arun_component(**kwargs: Any) -> Dict[str, Any]

Run component (async).

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
206
207
208
209
210
211
async def arun_component(self, **kwargs: Any) -> Dict[str, Any]:
    """Run the component asynchronously.

    Entries of `partial_dict` are merged over `kwargs` (a partial value wins
    over a same-named keyword), the merged inputs are validated,
    `_arun_component` is awaited, and its outputs are validated.
    """
    merged = dict(kwargs)
    merged.update(self.partial_dict)
    checked_inputs = self.validate_component_inputs(merged)
    raw_outputs = await self._arun_component(**checked_inputs)
    return self.validate_component_outputs(raw_outputs)

CustomQueryComponent #

Bases: QueryComponent

Custom query component.

Parameters:

Name Type Description Default
callback_manager CallbackManager

Callback manager

CallbackManager() (a new instance, created by default_factory)
Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class CustomQueryComponent(QueryComponent):
    """Custom query component.

    Subclasses supply `_input_keys` and `_output_keys` (and optionally
    `_optional_input_keys`); `input_keys` and `output_keys` are built from
    them.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)
    callback_manager: CallbackManager = Field(
        default_factory=CallbackManager, description="Callback manager"
    )

    def set_callback_manager(self, callback_manager: CallbackManager) -> None:
        """Store `callback_manager` on this component."""
        self.callback_manager = callback_manager

    def _validate_component_inputs(self, input: Dict[str, Any]) -> Dict[str, Any]:
        """Return `input` unchanged.

        NOTE: subclasses can override this to add real validation; the
        pass-through default exists for convenience.
        """
        return input

    async def _arun_component(self, **kwargs: Any) -> Any:
        """Async run is unsupported by default; always raises NotImplementedError."""
        raise NotImplementedError("This component does not support async run.")

    @property
    def _input_keys(self) -> Set[str]:
        """Required input key names; must be overridden."""
        raise NotImplementedError("Not implemented yet. Please override this method.")

    @property
    def _optional_input_keys(self) -> Set[str]:
        """Optional input key names; empty unless overridden."""
        return set()

    @property
    def _output_keys(self) -> Set[str]:
        """Output key names; must be overridden."""
        raise NotImplementedError("Not implemented yet. Please override this method.")

    @property
    def input_keys(self) -> InputKeys:
        """Input keys, built from `_input_keys` and `_optional_input_keys`."""
        # NOTE: overriding this property directly is possible, but the
        # underscore properties are what subclasses are expected to implement
        required = self._input_keys
        optional = self._optional_input_keys
        return InputKeys.from_keys(required_keys=required, optional_keys=optional)

    @property
    def output_keys(self) -> OutputKeys:
        """Output keys, built from `_output_keys`."""
        # NOTE: overriding this property directly is possible, but the
        # underscore property is what subclasses are expected to implement
        required = self._output_keys
        return OutputKeys.from_keys(required)

input_keys property #

input_keys: InputKeys

Input keys.

output_keys property #

output_keys: OutputKeys

Output keys.

set_callback_manager #

set_callback_manager(callback_manager: CallbackManager) -> None

Set callback manager.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
251
252
253
def set_callback_manager(self, callback_manager: CallbackManager) -> None:
    """Store `callback_manager` on this component."""
    setattr(self, "callback_manager", callback_manager)

Link #

Bases: BaseModel

Link between two components.

Parameters:

Name Type Description Default
src str

Source component name

required
dest str

Destination component name

required
src_key str | None

Source component output key

None
dest_key str | None

Destination component input key

None
condition_fn Callable | None

Condition to determine if link should be followed

None
input_fn Callable | None

Input to destination component

None
Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class Link(BaseModel):
    """Link between two components.

    Names a source and a destination component, optionally the output key
    and input key to connect, plus an optional condition callable and an
    optional input callable.
    """

    src: str = Field(..., description="Source component name")
    dest: str = Field(..., description="Destination component name")
    src_key: Optional[str] = Field(
        default=None, description="Source component output key"
    )
    dest_key: Optional[str] = Field(
        default=None, description="Destination component input key"
    )

    condition_fn: Optional[Callable] = Field(
        default=None, description="Condition to determine if link should be followed"
    )
    input_fn: Optional[Callable] = Field(
        default=None, description="Input to destination component"
    )

    def __init__(
        self,
        src: str,
        dest: str,
        src_key: Optional[str] = None,
        dest_key: Optional[str] = None,
        condition_fn: Optional[Callable] = None,
        input_fn: Optional[Callable] = None,
    ) -> None:
        """Initialise the link from positional or keyword arguments."""
        # NOTE: this explicit signature exists so that fields can be given
        # positionally; everything is forwarded to the model by keyword.
        field_values = {
            "src": src,
            "dest": dest,
            "src_key": src_key,
            "dest_key": dest_key,
            "condition_fn": condition_fn,
            "input_fn": input_fn,
        }
        super().__init__(**field_values)

ComponentIntermediates #

Component intermediate inputs and outputs.

Source code in llama-index-core/llama_index/core/base/query_pipeline/query.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class ComponentIntermediates:
    """Component intermediate inputs and outputs.

    A plain holder for the input dict and output dict of one component.
    """

    def __init__(
        self,
        inputs: Dict[str, Any],
        outputs: Dict[str, Any],
    ) -> None:
        """Store the given input and output dicts as-is (no copy is made)."""
        self.inputs = inputs
        self.outputs = outputs

    def __repr__(self) -> str:
        """Render both dicts using their `str()` form."""
        inputs_text = str(self.inputs)
        outputs_text = str(self.outputs)
        return f"ComponentIntermediates(inputs={inputs_text}, outputs={outputs_text})"

    def __str__(self) -> str:
        """Same text as `__repr__`."""
        return repr(self)