Skip to content

Context

Context #

A global object representing a context for a given workflow run.

The Context object can be used to store data that needs to be available across iterations during a workflow execution, and across multiple workflow runs. Every context instance offers two type of data storage: a global one, that's shared among all the steps within a workflow, and private one, that's only accessible from a single step.

Both set and get operations on global data are governed by a lock, and considered coroutine-safe.

Source code in llama-index-core/llama_index/core/workflow/context.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
class Context:
    """A global object representing a context for a given workflow run.

    The Context object can be used to store data that needs to be available across iterations during a workflow
    execution, and across multiple workflow runs.
    Every context instance offers two type of data storage: a global one, that's shared among all the steps within a
    workflow, and private one, that's only accessible from a single step.

    Both `set` and `get` operations on global data are governed by a lock, and considered coroutine-safe.
    """

    def __init__(
        self,
        workflow: "Workflow",
        stepwise: bool = False,
    ) -> None:
        self.stepwise = stepwise
        self.is_running = False

        self._workflow = workflow
        # Broker machinery
        self._waiter_id = str(uuid.uuid4())
        self._queues: Dict[str, asyncio.Queue] = {self._waiter_id: asyncio.Queue()}
        self._tasks: Set[asyncio.Task] = set()
        self._broker_log: List[Event] = []
        self._cancel_flag: asyncio.Event = asyncio.Event()
        self._step_flags: Dict[str, asyncio.Event] = {}
        self._step_event_holding: Optional[Event] = None
        self._step_lock: asyncio.Lock = asyncio.Lock()
        self._step_condition: asyncio.Condition = asyncio.Condition(
            lock=self._step_lock
        )
        self._step_event_written: asyncio.Condition = asyncio.Condition(
            lock=self._step_lock
        )
        self._accepted_events: List[Tuple[str, str]] = []
        self._retval: Any = None
        self._in_progress: Dict[str, List[Event]] = defaultdict(list)
        # Streaming machinery
        self._streaming_queue: asyncio.Queue = asyncio.Queue()
        # Global data storage
        self._lock = asyncio.Lock()
        self._globals: Dict[str, Any] = {}
        # Step-specific instance
        self._events_buffer: Dict[Type[Event], List[Event]] = defaultdict(list)

    def _serialize_queue(self, queue: asyncio.Queue, serializer: BaseSerializer) -> str:
        queue_items = list(queue._queue)  # type: ignore
        queue_objs = [serializer.serialize(obj) for obj in queue_items]
        return json.dumps(queue_objs)  # type: ignore

    def _deserialize_queue(
        self,
        queue_str: str,
        serializer: BaseSerializer,
        prefix_queue_objs: List[Any] = [],
    ) -> asyncio.Queue:
        queue_objs = json.loads(queue_str)
        queue_objs = prefix_queue_objs + queue_objs
        queue = asyncio.Queue()  # type: ignore
        for obj in queue_objs:
            event_obj = serializer.deserialize(obj)
            queue.put_nowait(event_obj)
        return queue

    def _serialize_globals(self, serializer: BaseSerializer) -> Dict[str, Any]:
        serialized_globals = {}
        for key, value in self._globals.items():
            try:
                serialized_globals[key] = serializer.serialize(value)
            except Exception as e:
                raise ValueError(f"Failed to serialize value for key {key}: {e}")
        return serialized_globals

    def _deserialize_globals(
        self, serialized_globals: Dict[str, Any], serializer: BaseSerializer
    ) -> Dict[str, Any]:
        deserialized_globals = {}
        for key, value in serialized_globals.items():
            try:
                deserialized_globals[key] = serializer.deserialize(value)
            except Exception as e:
                raise ValueError(f"Failed to deserialize value for key {key}: {e}")
        return deserialized_globals

    def to_dict(self, serializer: Optional[BaseSerializer] = None) -> Dict[str, Any]:
        serializer = serializer or JsonSerializer()

        return {
            "globals": self._serialize_globals(serializer),
            "streaming_queue": self._serialize_queue(self._streaming_queue, serializer),
            "queues": {
                k: self._serialize_queue(v, serializer) for k, v in self._queues.items()
            },
            "stepwise": self.stepwise,
            "events_buffer": {
                k: [serializer.serialize(ev) for ev in v]
                for k, v in self._events_buffer.items()
            },
            "in_progress": {
                k: [serializer.serialize(ev) for ev in v]
                for k, v in self._in_progress.items()
            },
            "accepted_events": self._accepted_events,
            "broker_log": [serializer.serialize(ev) for ev in self._broker_log],
            "waiter_id": self._waiter_id,
            "is_running": self.is_running,
        }

    @classmethod
    def from_dict(
        cls,
        workflow: "Workflow",
        data: Dict[str, Any],
        serializer: Optional[BaseSerializer] = None,
    ) -> "Context":
        serializer = serializer or JsonSerializer()

        context = cls(workflow, stepwise=data["stepwise"])
        context._globals = context._deserialize_globals(data["globals"], serializer)
        context._streaming_queue = context._deserialize_queue(
            data["streaming_queue"], serializer
        )
        context._events_buffer = {
            k: [serializer.deserialize(ev) for ev in v]
            for k, v in data["events_buffer"].items()
        }
        if len(context._events_buffer) == 0:
            context._events_buffer = defaultdict(list)
        context._accepted_events = data["accepted_events"]
        context._waiter_id = data.get("waiter_id", str(uuid.uuid4()))
        context._broker_log = [serializer.deserialize(ev) for ev in data["broker_log"]]
        context.is_running = data["is_running"]
        # load back up whatever was in the queue as well as the events whose steps
        # were in progress when the serialization of the Context took place
        context._queues = {
            k: context._deserialize_queue(
                v, serializer, prefix_queue_objs=data["in_progress"].get(k, [])
            )
            for k, v in data["queues"].items()
        }
        context._in_progress = defaultdict(list)
        return context

    async def set(self, key: str, value: Any, make_private: bool = False) -> None:
        """Store `value` into the Context under `key`.

        Args:
            key: A unique string to identify the value stored.
            value: The data to be stored.

        Raises:
            ValueError: When make_private is True but a key already exists in the global storage.
        """
        if make_private:
            warnings.warn(
                "`make_private` is deprecated and will be ignored", DeprecationWarning
            )

        async with self.lock:
            self._globals[key] = value

    async def mark_in_progress(self, name: str, ev: Event) -> None:
        """Add input event to in_progress dict.

        Args:
            name (str): The name of the step that is in progress.
            ev (Event): The input event that kicked off this step.
        """
        async with self.lock:
            self._in_progress[name].append(ev)

    async def remove_from_in_progress(self, name: str, ev: Event) -> None:
        """Remove input event from active steps.

        Args:
            name (str): The name of the step that has been completed.
            ev (Event): The associated input event that kicked of this completed step.
        """
        async with self.lock:
            events = [e for e in self._in_progress[name] if e != ev]
            self._in_progress[name] = events

    async def get(self, key: str, default: Optional[Any] = Ellipsis) -> Any:
        """Get the value corresponding to `key` from the Context.

        Args:
            key: A unique string to identify the value stored.
            default: The value to return when `key` is missing instead of raising an exception.

        Raises:
            ValueError: When there's not value accessible corresponding to `key`.
        """
        async with self.lock:
            if key in self._globals:
                return self._globals[key]
            elif default is not Ellipsis:
                return default

        msg = f"Key '{key}' not found in Context"
        raise ValueError(msg)

    @property
    def data(self) -> Dict[str, Any]:
        """This property is provided for backward compatibility.

        Use `get` and `set` instead.
        """
        msg = "`data` is deprecated, please use the `get` and `set` method to store data into the Context."
        warnings.warn(msg, DeprecationWarning)
        return self._globals

    @property
    def lock(self) -> asyncio.Lock:
        """Returns a mutex to lock the Context."""
        return self._lock

    @property
    def session(self) -> "Context":
        """This property is provided for backward compatibility."""
        msg = "`session` is deprecated, please use the Context instance directly."
        warnings.warn(msg, DeprecationWarning)
        return self

    def collect_events(
        self, ev: Event, expected: List[Type[Event]]
    ) -> Optional[List[Event]]:
        self._events_buffer[type(ev)].append(ev)

        retval: List[Event] = []
        for e_type in expected:
            e_instance_list = self._events_buffer.get(e_type)
            if e_instance_list:
                retval.append(e_instance_list.pop(0))

        if len(retval) == len(expected):
            return retval

        # put back the events if unable to collect all
        for ev in retval:
            self._events_buffer[type(ev)].append(ev)

        return None

    def send_event(self, message: Event, step: Optional[str] = None) -> None:
        """Sends an event to a specific step in the workflow.

        If step is None, the event is sent to all the receivers and we let
        them discard events they don't want.
        """
        if step is None:
            for queue in self._queues.values():
                queue.put_nowait(message)
        else:
            if step not in self._workflow._get_steps():
                raise WorkflowRuntimeError(f"Step {step} does not exist")

            step_func = self._workflow._get_steps()[step]
            step_config: Optional[StepConfig] = getattr(
                step_func, "__step_config", None
            )

            if step_config and type(message) in step_config.accepted_events:
                self._queues[step].put_nowait(message)
            else:
                raise WorkflowRuntimeError(
                    f"Step {step} does not accept event of type {type(message)}"
                )

        self._broker_log.append(message)

    async def wait_for_event(
        self,
        event_type: Type[T],
        requirements: Optional[Dict[str, Any]] = None,
        timeout: Optional[float] = 2000,
    ) -> T:
        """Asynchronously wait for a specific event type to be received.

        Args:
            event_type: The type of event to wait for
            requirements: Optional dict of requirements the event must match
            timeout: Optional timeout in seconds. Defaults to 2000s.

        Returns:
            The event type that was requested.

        Raises:
            asyncio.TimeoutError: If the timeout is reached before receiving matching event
        """
        requirements = requirements or {}

        while True:
            event = await asyncio.wait_for(
                self._queues[self._waiter_id].get(), timeout=timeout
            )
            if type(event) == event_type:
                if all(
                    event.get(k, default=None) == v for k, v in requirements.items()
                ):
                    return event
                else:
                    continue

    def write_event_to_stream(self, ev: Optional[Event]) -> None:
        self._streaming_queue.put_nowait(ev)

    def get_result(self) -> Any:
        """Returns the result of the workflow."""
        return self._retval

    @property
    def streaming_queue(self) -> asyncio.Queue:
        return self._streaming_queue

data property #

data: Dict[str, Any]

This property is provided for backward compatibility.

Use get and set instead.

lock property #

lock: Lock

Returns a mutex to lock the Context.

session property #

session: Context

This property is provided for backward compatibility.

set async #

set(key: str, value: Any, make_private: bool = False) -> None

Store value into the Context under key.

Parameters:

Name Type Description Default
key str

A unique string to identify the value stored.

required
value Any

The data to be stored.

required

Raises:

Type Description
ValueError

When make_private is True but a key already exists in the global storage.

Source code in llama-index-core/llama_index/core/workflow/context.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
async def set(self, key: str, value: Any, make_private: bool = False) -> None:
    """Store `value` into the Context under `key`.

    Args:
        key: A unique string to identify the value stored.
        value: The data to be stored.

    Raises:
        ValueError: When make_private is True but a key already exists in the global storage.
    """
    if make_private:
        warnings.warn(
            "`make_private` is deprecated and will be ignored", DeprecationWarning
        )

    async with self.lock:
        self._globals[key] = value

mark_in_progress async #

mark_in_progress(name: str, ev: Event) -> None

Add input event to in_progress dict.

Parameters:

Name Type Description Default
name str

The name of the step that is in progress.

required
ev Event

The input event that kicked off this step.

required
Source code in llama-index-core/llama_index/core/workflow/context.py
181
182
183
184
185
186
187
188
189
async def mark_in_progress(self, name: str, ev: Event) -> None:
    """Add input event to in_progress dict.

    Args:
        name (str): The name of the step that is in progress.
        ev (Event): The input event that kicked off this step.
    """
    async with self.lock:
        self._in_progress[name].append(ev)

remove_from_in_progress async #

remove_from_in_progress(name: str, ev: Event) -> None

Remove input event from active steps.

Parameters:

Name Type Description Default
name str

The name of the step that has been completed.

required
ev Event

The associated input event that kicked of this completed step.

required
Source code in llama-index-core/llama_index/core/workflow/context.py
191
192
193
194
195
196
197
198
199
200
async def remove_from_in_progress(self, name: str, ev: Event) -> None:
    """Remove input event from active steps.

    Args:
        name (str): The name of the step that has been completed.
        ev (Event): The associated input event that kicked of this completed step.
    """
    async with self.lock:
        events = [e for e in self._in_progress[name] if e != ev]
        self._in_progress[name] = events

get async #

get(key: str, default: Optional[Any] = Ellipsis) -> Any

Get the value corresponding to key from the Context.

Parameters:

Name Type Description Default
key str

A unique string to identify the value stored.

required
default Optional[Any]

The value to return when key is missing instead of raising an exception.

Ellipsis

Raises:

Type Description
ValueError

When there's not value accessible corresponding to key.

Source code in llama-index-core/llama_index/core/workflow/context.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
async def get(self, key: str, default: Optional[Any] = Ellipsis) -> Any:
    """Get the value corresponding to `key` from the Context.

    Args:
        key: A unique string to identify the value stored.
        default: The value to return when `key` is missing instead of raising an exception.

    Raises:
        ValueError: When there's not value accessible corresponding to `key`.
    """
    async with self.lock:
        if key in self._globals:
            return self._globals[key]
        elif default is not Ellipsis:
            return default

    msg = f"Key '{key}' not found in Context"
    raise ValueError(msg)

send_event #

send_event(message: Event, step: Optional[str] = None) -> None

Sends an event to a specific step in the workflow.

If step is None, the event is sent to all the receivers and we let them discard events they don't want.

Source code in llama-index-core/llama_index/core/workflow/context.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def send_event(self, message: Event, step: Optional[str] = None) -> None:
    """Sends an event to a specific step in the workflow.

    If step is None, the event is sent to all the receivers and we let
    them discard events they don't want.
    """
    if step is None:
        for queue in self._queues.values():
            queue.put_nowait(message)
    else:
        if step not in self._workflow._get_steps():
            raise WorkflowRuntimeError(f"Step {step} does not exist")

        step_func = self._workflow._get_steps()[step]
        step_config: Optional[StepConfig] = getattr(
            step_func, "__step_config", None
        )

        if step_config and type(message) in step_config.accepted_events:
            self._queues[step].put_nowait(message)
        else:
            raise WorkflowRuntimeError(
                f"Step {step} does not accept event of type {type(message)}"
            )

    self._broker_log.append(message)

wait_for_event async #

wait_for_event(event_type: Type[T], requirements: Optional[Dict[str, Any]] = None, timeout: Optional[float] = 2000) -> T

Asynchronously wait for a specific event type to be received.

Parameters:

Name Type Description Default
event_type Type[T]

The type of event to wait for

required
requirements Optional[Dict[str, Any]]

Optional dict of requirements the event must match

None
timeout Optional[float]

Optional timeout in seconds. Defaults to 2000s.

2000

Returns:

Type Description
T

The event type that was requested.

Raises:

Type Description
TimeoutError

If the timeout is reached before receiving matching event

Source code in llama-index-core/llama_index/core/workflow/context.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
async def wait_for_event(
    self,
    event_type: Type[T],
    requirements: Optional[Dict[str, Any]] = None,
    timeout: Optional[float] = 2000,
) -> T:
    """Asynchronously wait for a specific event type to be received.

    Args:
        event_type: The type of event to wait for
        requirements: Optional dict of requirements the event must match
        timeout: Optional timeout in seconds. Defaults to 2000s.

    Returns:
        The event type that was requested.

    Raises:
        asyncio.TimeoutError: If the timeout is reached before receiving matching event
    """
    requirements = requirements or {}

    while True:
        event = await asyncio.wait_for(
            self._queues[self._waiter_id].get(), timeout=timeout
        )
        if type(event) == event_type:
            if all(
                event.get(k, default=None) == v for k, v in requirements.items()
            ):
                return event
            else:
                continue

get_result #

get_result() -> Any

Returns the result of the workflow.

Source code in llama-index-core/llama_index/core/workflow/context.py
326
327
328
def get_result(self) -> Any:
    """Returns the result of the workflow."""
    return self._retval