A global object representing a context for a given workflow run.
The Context object can be used to store data that needs to be available across iterations during a workflow
execution, and across multiple workflow runs.
Every context instance offers two type of data storage: a global one, that's shared among all the steps within a
workflow, and private one, that's only accessible from a single step.
Both set
and get
operations on global data are governed by a lock, and considered coroutine-safe.
Source code in llama-index-core/llama_index/core/workflow/context.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 | class Context:
"""A global object representing a context for a given workflow run.
The Context object can be used to store data that needs to be available across iterations during a workflow
execution, and across multiple workflow runs.
Every context instance offers two type of data storage: a global one, that's shared among all the steps within a
workflow, and private one, that's only accessible from a single step.
Both `set` and `get` operations on global data are governed by a lock, and considered coroutine-safe.
"""
def __init__(self, workflow: "Workflow", stepwise: bool = False) -> None:
self.stepwise = stepwise
self._workflow = workflow
# Broker machinery
self._queues: Dict[str, asyncio.Queue] = {}
self._tasks: Set[asyncio.Task] = set()
self._broker_log: List[Event] = []
self._step_flags: Dict[str, asyncio.Event] = {}
self._accepted_events: List[Tuple[str, str]] = []
self._retval: Any = None
# Streaming machinery
self._streaming_queue: asyncio.Queue = asyncio.Queue()
# Global data storage
self._lock = asyncio.Lock()
self._globals: Dict[str, Any] = {}
# Step-specific instance
self._events_buffer: Dict[Type[Event], List[Event]] = defaultdict(list)
async def set(self, key: str, value: Any, make_private: bool = False) -> None:
"""Store `value` into the Context under `key`.
Args:
key: A unique string to identify the value stored.
value: The data to be stored.
Raises:
ValueError: When make_private is True but a key already exists in the global storage.
"""
if make_private:
warnings.warn(
"`make_private` is deprecated and will be ignored", DeprecationWarning
)
async with self.lock:
self._globals[key] = value
async def get(self, key: str, default: Optional[Any] = None) -> Any:
"""Get the value corresponding to `key` from the Context.
Args:
key: A unique string to identify the value stored.
default: The value to return when `key` is missing instead of raising an exception.
Raises:
ValueError: When there's not value accessible corresponding to `key`.
"""
async with self.lock:
if key in self._globals:
return self._globals[key]
elif default is not None:
return default
msg = f"Key '{key}' not found in Context"
raise ValueError(msg)
@property
def data(self) -> Dict[str, Any]:
"""This property is provided for backward compatibility.
Use `get` and `set` instead.
"""
msg = "`data` is deprecated, please use the `get` and `set` method to store data into the Context."
warnings.warn(msg, DeprecationWarning)
return self._globals
@property
def lock(self) -> asyncio.Lock:
"""Returns a mutex to lock the Context."""
return self._lock
@property
def session(self) -> "Context":
"""This property is provided for backward compatibility."""
msg = "`session` is deprecated, please use the Context instance directly."
warnings.warn(msg, DeprecationWarning)
return self
def collect_events(
self, ev: Event, expected: List[Type[Event]]
) -> Optional[List[Event]]:
self._events_buffer[type(ev)].append(ev)
retval: List[Event] = []
for e_type in expected:
e_instance_list = self._events_buffer.get(e_type)
if e_instance_list:
retval.append(e_instance_list.pop(0))
if len(retval) == len(expected):
return retval
# put back the events if unable to collect all
for ev in retval:
self._events_buffer[type(ev)].append(ev)
return None
def send_event(self, message: Event, step: Optional[str] = None) -> None:
"""Sends an event to a specific step in the workflow.
If step is None, the event is sent to all the receivers and we let
them discard events they don't want.
"""
if step is None:
for queue in self._queues.values():
queue.put_nowait(message)
else:
if step not in self._workflow._get_steps():
raise WorkflowRuntimeError(f"Step {step} does not exist")
step_func = self._workflow._get_steps()[step]
step_config: Optional[StepConfig] = getattr(
step_func, "__step_config", None
)
if step_config and type(message) in step_config.accepted_events:
self._queues[step].put_nowait(message)
else:
raise WorkflowRuntimeError(
f"Step {step} does not accept event of type {type(message)}"
)
self._broker_log.append(message)
def write_event_to_stream(self, ev: Optional[Event]) -> None:
self._streaming_queue.put_nowait(ev)
def get_result(self) -> Any:
"""Returns the result of the workflow."""
return self._retval
@property
def streaming_queue(self) -> asyncio.Queue:
return self._streaming_queue
|
data
property
This property is provided for backward compatibility.
Use get
and set
instead.
lock
property
Returns a mutex to lock the Context.
session
property
This property is provided for backward compatibility.
set
async
set(key: str, value: Any, make_private: bool = False) -> None
Store value
into the Context under key
.
Parameters:
Name |
Type |
Description |
Default |
key |
str
|
A unique string to identify the value stored.
|
required
|
value |
Any
|
|
required
|
Raises:
Type |
Description |
ValueError
|
When make_private is True but a key already exists in the global storage.
|
Source code in llama-index-core/llama_index/core/workflow/context.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 | async def set(self, key: str, value: Any, make_private: bool = False) -> None:
"""Store `value` into the Context under `key`.
Args:
key: A unique string to identify the value stored.
value: The data to be stored.
Raises:
ValueError: When make_private is True but a key already exists in the global storage.
"""
if make_private:
warnings.warn(
"`make_private` is deprecated and will be ignored", DeprecationWarning
)
async with self.lock:
self._globals[key] = value
|
get
async
get(key: str, default: Optional[Any] = None) -> Any
Get the value corresponding to key
from the Context.
Parameters:
Name |
Type |
Description |
Default |
key |
str
|
A unique string to identify the value stored.
|
required
|
default |
Optional[Any]
|
The value to return when key is missing instead of raising an exception.
|
None
|
Raises:
Type |
Description |
ValueError
|
When there's not value accessible corresponding to key .
|
Source code in llama-index-core/llama_index/core/workflow/context.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 | async def get(self, key: str, default: Optional[Any] = None) -> Any:
"""Get the value corresponding to `key` from the Context.
Args:
key: A unique string to identify the value stored.
default: The value to return when `key` is missing instead of raising an exception.
Raises:
ValueError: When there's not value accessible corresponding to `key`.
"""
async with self.lock:
if key in self._globals:
return self._globals[key]
elif default is not None:
return default
msg = f"Key '{key}' not found in Context"
raise ValueError(msg)
|
send_event
send_event(message: Event, step: Optional[str] = None) -> None
Sends an event to a specific step in the workflow.
If step is None, the event is sent to all the receivers and we let
them discard events they don't want.
Source code in llama-index-core/llama_index/core/workflow/context.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 | def send_event(self, message: Event, step: Optional[str] = None) -> None:
"""Sends an event to a specific step in the workflow.
If step is None, the event is sent to all the receivers and we let
them discard events they don't want.
"""
if step is None:
for queue in self._queues.values():
queue.put_nowait(message)
else:
if step not in self._workflow._get_steps():
raise WorkflowRuntimeError(f"Step {step} does not exist")
step_func = self._workflow._get_steps()[step]
step_config: Optional[StepConfig] = getattr(
step_func, "__step_config", None
)
if step_config and type(message) in step_config.accepted_events:
self._queues[step].put_nowait(message)
else:
raise WorkflowRuntimeError(
f"Step {step} does not accept event of type {type(message)}"
)
self._broker_log.append(message)
|
get_result
Returns the result of the workflow.
Source code in llama-index-core/llama_index/core/workflow/context.py
| def get_result(self) -> Any:
"""Returns the result of the workflow."""
return self._retval
|