# Ingestion Pipeline
An `IngestionPipeline` uses a concept of `Transformations` that are applied to input data. The resulting nodes are either returned or inserted into a vector database (if one is given). Each node+transformation pair is cached, so that subsequent runs (if the cache is persisted) with the same node+transformation combination can use the cached result and save you time.
To see an interactive example of `IngestionPipeline` in use, check out the RAG CLI.
## Usage Pattern
The simplest usage is to instantiate an `IngestionPipeline` like so:
```python
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# run the pipeline
nodes = pipeline.run(documents=[Document.example()])
```
Note that in a real-world scenario, you would get your documents from `SimpleDirectoryReader` or another reader from Llama Hub.
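For example, a minimal sketch that loads files from a local folder (the `./data` path here is just an assumption) and runs them through the pipeline defined above might look like:

```python
from llama_index.core import SimpleDirectoryReader

# load documents from a local folder (hypothetical ./data directory)
documents = SimpleDirectoryReader("./data").load_data()

# reuse the pipeline defined above
nodes = pipeline.run(documents=documents)
```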
## Connecting to Vector Databases
When running an ingestion pipeline, you can also choose to automatically insert the resulting nodes into a remote vector store. You can then construct an index from that vector store later on.
```python
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    vector_store=vector_store,
)

# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])

# Create your index
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)
```
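Once the index is constructed, it can be queried like any other vector store index. A minimal sketch (the question is hypothetical, and the default LLM requires an OpenAI API key to be configured):

```python
# query the index built on top of the ingested nodes
query_engine = index.as_query_engine()
response = query_engine.query("What is this document about?")
print(response)
```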
### Calculating embeddings in a pipeline
Note that in the above example, embeddings are calculated as part of the pipeline. If you are connecting your pipeline to a vector store, embeddings must be a stage of your pipeline or your later instantiation of the index will fail.
You can omit embeddings from your pipeline if you are not connecting to a vector store, i.e. just producing a list of nodes.
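For instance, a minimal sketch of a node-only pipeline with no embedding stage (and no vector store) might look like:

```python
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline

# no embedding stage: the pipeline just returns parsed nodes
pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0)]
)
nodes = pipeline.run(documents=[Document.example()])
```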
## Caching
In an `IngestionPipeline`, each node + transformation combination is hashed and cached. This saves time on subsequent runs that use the same data.
The following sections describe some basic usage around caching.
### Local Cache Management
Once you have a pipeline, you may want to store and load the cache.
```python
# save
pipeline.persist("./pipeline_storage")

# load and restore state
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ],
)
new_pipeline.load("./pipeline_storage")

# will run instantly due to the cache
nodes = new_pipeline.run(documents=[Document.example()])
```
If the cache becomes too large, you can clear it:

```python
# delete all contents of the cache
pipeline.cache.clear()
```
### Remote Cache Management
We support multiple remote storage backends for caches:

- `RedisCache`
- `MongoDBCache`
- `FirestoreCache`
Here is an example using the `RedisCache`:
```python
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.ingestion.cache import RedisCache

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    cache=IngestionCache(
        cache=RedisCache(
            redis_uri="redis://127.0.0.1:6379", collection="test_cache"
        )
    ),
)

# run the pipeline; results are cached in the remote Redis collection
nodes = pipeline.run(documents=[Document.example()])
```
Here, no persist step is needed, since everything is cached as you go in the specified remote collection.
## Async Support
The `IngestionPipeline` also has support for async operation:

```python
nodes = await pipeline.arun(documents=documents)
```
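In a standalone script, a minimal sketch might wrap the call in an `asyncio` entry point (this assumes `pipeline` is the `IngestionPipeline` defined in the earlier examples):

```python
import asyncio

from llama_index.core import Document


async def main():
    # async version of pipeline.run()
    nodes = await pipeline.arun(documents=[Document.example()])
    print(len(nodes))


asyncio.run(main())
```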
## Document Management
Attaching a `docstore` to the ingestion pipeline will enable document management.

Using the `document.doc_id` or `node.ref_doc_id` as a grounding point, the ingestion pipeline will actively look for duplicate documents.
It works by:

- Storing a map of `doc_id` -> `document_hash`
- If a vector store is attached:
    - If a duplicate `doc_id` is detected, and the hash has changed, the document will be re-processed and upserted
    - If a duplicate `doc_id` is detected and the hash is unchanged, the node is skipped
- If a vector store is not attached:
    - Checks all existing hashes for each node
    - If a duplicate is found, the node is skipped
    - Otherwise, the node is processed

NOTE: If we do not attach a vector store, we can only check for and remove duplicate inputs.
```python
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[...], docstore=SimpleDocumentStore()
)
```
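As a rough sketch of the resulting behavior (assuming the pipeline above with concrete transformations filled in), re-running on unchanged input skips the work, while changed content under the same `doc_id` is re-processed:

```python
from llama_index.core import Document

doc = Document(text="some text", doc_id="doc_1")

# first run: the document is processed and recorded in the docstore
pipeline.run(documents=[doc])

# second run with identical content: the hash matches, so it is skipped
pipeline.run(documents=[doc])

# same doc_id but new content: the hash differs, so it is re-processed
updated_doc = Document(text="some new text", doc_id="doc_1")
pipeline.run(documents=[updated_doc])
```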
A full walkthrough is found in our demo notebook.
Also check out another guide using Redis as our entire ingestion stack.
## Parallel Processing
The `run` method of `IngestionPipeline` can be executed with parallel processes. It does so by making use of `multiprocessing.Pool`, distributing batches of nodes across processors.

To execute with parallel processing, set `num_workers` to the number of processes you'd like to use:
```python
from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)
```
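Because this relies on `multiprocessing`, on platforms that use the "spawn" start method (e.g. Windows and macOS) the parallel run should typically live under a `__main__` guard. A minimal sketch:

```python
from llama_index.core import Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0)],
)

if __name__ == "__main__":
    # the guard prevents child processes from re-executing the pipeline run
    nodes = pipeline.run(documents=[Document.example()], num_workers=4)
```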