Parallel Execution of Same Event Example¶
In this example, we'll demonstrate how to use the workflow functionality to process multiple events of the same type in parallel. By setting the num_workers parameter in the @step decorator, we can control how many instances of a step run simultaneously, enabling efficient parallel processing.
Installing Dependencies¶
First, we need to install the necessary dependencies:
- LlamaIndex core for most functionalities
- llama-index-utils-workflow for workflow capabilities
!pip install llama-index-core llama-index-utils-workflow
Importing Required Libraries¶
After installing the dependencies, we can import the required libraries:
import asyncio

from llama_index.core.workflow import (
    step,
    Context,
    Workflow,
    Event,
    StartEvent,
    StopEvent,
)
We will create two workflows: one that can process multiple data items in parallel by using the @step(num_workers=N)
decorator, and another without setting num_workers, for comparison.
By using the num_workers
parameter in the @step
decorator, we can limit the number of steps executed simultaneously, thus controlling the level of parallelism. This approach is particularly suitable for scenarios that require processing similar tasks while managing resource usage.
For example, you can execute multiple sub-queries at once, but note that num_workers should not be set arbitrarily high; the right value depends on your workload and any rate or token limits.
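As a rough illustration of that pattern (separate from the example built below), a step capped at four concurrent workers might look like the following sketch. SubQueryEvent, AnswerEvent, and fake_subquery_call are made-up names standing in for your own events and query logic, and the fan-out and result-collection steps are omitted here:

import asyncio

from llama_index.core.workflow import Event, Workflow, step


class SubQueryEvent(Event):
    query: str


class AnswerEvent(Event):
    answer: str


async def fake_subquery_call(query: str) -> str:
    # Stand-in for a real sub-query (e.g. an LLM or retrieval call)
    await asyncio.sleep(0.1)
    return f"answer to {query!r}"


class SubQueryWorkflow(Workflow):
    # At most 4 sub-queries are processed at the same time; choose the cap
    # based on your workload and any rate or token limits.
    @step(num_workers=4)
    async def run_subquery(self, ev: SubQueryEvent) -> AnswerEvent:
        return AnswerEvent(answer=await fake_subquery_call(ev.query))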
Defining Event Types¶
We'll define two event types: one for input events to be processed, and another for processing results:
class ProcessEvent(Event):
    data: str


class ResultEvent(Event):
    result: str
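Workflow events behave like Pydantic models, so they can be instantiated and inspected directly; a quick illustrative check (not required for the workflows below):

# Events carry typed fields that are validated on construction
ev = ProcessEvent(data="A")
print(ev.data)  # A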
Creating Sequential and Parallel Workflows¶
Now, we'll create a SequentialWorkflow and a ParallelWorkflow class, each with three main steps:
- start: Initialize and send multiple parallel events
- process_data: Process data
- combine_results: Collect and merge all processing results
import random
class SequentialWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        await asyncio.sleep(random.randint(1, 2))
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)
class ParallelWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=3)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        await asyncio.sleep(random.randint(1, 2))
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)
In these two workflows:
- The start method initializes and sends multiple ProcessEvent.
- The process_data method uses:
  - only the @step decorator in SequentialWorkflow
  - the @step(num_workers=3) decorator in ParallelWorkflow, to limit the number of simultaneously executing workers to 3
- The combine_results method collects all processing results and merges them.
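Optionally, since we installed llama-index-utils-workflow earlier, we can also render the possible event flows of a workflow to an HTML file; a minimal sketch, with an arbitrary output filename:

from llama_index.utils.workflow import draw_all_possible_flows

# Writes an interactive HTML visualization of the event graph
draw_all_possible_flows(ParallelWorkflow, filename="parallel_workflow.html")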
Running the Workflow¶
Finally, we can run both workflows and compare how long each takes:
import time
sequential_workflow = SequentialWorkflow()
print(
    "Start a sequential workflow without setting num_workers in the step of process_data"
)
start_time = time.time()
result = await sequential_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
print("-" * 30)
parallel_workflow = ParallelWorkflow()
print(
    "Start a parallel workflow with setting num_workers in the step of process_data"
)
start_time = time.time()
result = await parallel_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
Start a sequential workflow without setting num_workers in the step of process_data
Completed processing: A
Completed processing: B
Completed processing: C
Workflow result: Processed: A, Processed: B, Processed: C
Time taken: 4.008663654327393 seconds
------------------------------
Start a parallel workflow with setting num_workers in the step of process_data
Completed processing: C
Completed processing: A
Completed processing: B
Workflow result: Processed: C, Processed: A, Processed: B
Time taken: 2.0040180683135986 seconds
Note¶
- Without setting num_workers, the three items are processed one after another, so the run takes 3 to 6 seconds (each item sleeps 1 to 2 seconds). With num_workers=3, all three items are processed in parallel, so the run takes only as long as the slowest item, about 2 seconds.
- In ParallelWorkflow, the order of the completed results may differ from the input order, depending on the completion time of each task; see the sketch below for one way to restore the input order.
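If the input order matters for your use case, one option (a sketch, not part of the example above) is to carry the original item on the result event and sort the collected events before joining them. OrderedResultEvent and in_input_order are made-up names, and Event is the same class imported earlier:

class OrderedResultEvent(Event):
    data: str  # the original input item, e.g. "A"
    result: str


def in_input_order(
    events: list[OrderedResultEvent], data_list: list[str]
) -> list[OrderedResultEvent]:
    # Sort the collected events back into the order of the original inputs
    position = {item: i for i, item in enumerate(data_list)}
    return sorted(events, key=lambda ev: position[ev.data])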
This example demonstrates the difference in execution speed with and without num_workers, and shows how to implement parallel processing in a workflow. By setting num_workers, we can control the degree of parallelism, which is very useful for scenarios that need to balance performance and resource usage.