Redis Ingestion Pipeline#

This walkthrough shows how to use Redis as the vector store, cache, and docstore in an Ingestion Pipeline.

Dependencies#

Install and start Redis, then set up your OpenAI API key.

!pip install redis
Requirement already satisfied: redis in /home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages (5.0.1)
Requirement already satisfied: async-timeout>=4.0.2 in /home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages (from redis) (4.0.3)

!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b
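Before continuing, you can optionally confirm that the container is reachable. Here is a quick sanity check using the redis client we just installed (the connection details assume the default ports from the docker command above):

import redis

# Ping the local Redis Stack container; this raises if it is unreachable.
redis_client = redis.Redis(host="localhost", port=6379)
print(redis_client.ping())  # True when Redis is up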
import os

os.environ["OPENAI_API_KEY"] = "sk-..."
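If you prefer not to hardcode the key in the notebook, a minimal alternative sketch using the standard-library getpass module:

import getpass
import os

# Prompt for the key at runtime instead of storing it in the notebook.
os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API key: ")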

Create Seed Data#

# Make some test data
!rm -rf test_redis_data
!mkdir -p test_redis_data
!echo "This is a test file: one!" > test_redis_data/test1.txt
!echo "This is a test file: two!" > test_redis_data/test2.txt
from llama_index import SimpleDirectoryReader

# load documents with deterministic IDs
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()
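Because filename_as_id=True, each document's ID is derived from its file path. These deterministic IDs are what allow the docstore to detect duplicates and updates later on:

# Inspect the deterministic document IDs (derived from the file paths).
for doc in documents:
    print(doc.doc_id)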

Run the Redis-Based Ingestion Pipeline#

With a vector store attached, the pipeline will handle upserting data into your vector store.

However, if you only want to skip exact duplicates (and never update changed documents), you can change the strategy to DUPLICATES_ONLY.
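For illustration, here is a minimal sketch of that duplicates-only variant, using an in-memory SimpleDocumentStore just to keep the example self-contained:

from llama_index.ingestion import DocstoreStrategy, IngestionPipeline
from llama_index.storage.docstore import SimpleDocumentStore
from llama_index.text_splitter import SentenceSplitter

# Skip documents whose content hash is already in the docstore;
# never update or delete existing entries.
dedup_only_pipeline = IngestionPipeline(
    transformations=[SentenceSplitter()],
    docstore=SimpleDocumentStore(),
    docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
)

Now, let's build the full Redis-backed pipeline: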

from llama_index.embeddings import HuggingFaceEmbedding
from llama_index.ingestion import (
    DocstoreStrategy,
    IngestionPipeline,
    IngestionCache,
)
from llama_index.ingestion.cache import RedisCache
from llama_index.storage.docstore import RedisDocumentStore
from llama_index.text_splitter import SentenceSplitter
from llama_index.vector_stores import RedisVectorStore

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "localhost", 6379, namespace="document_store"
    ),
    vector_store=RedisVectorStore(
        index_name="redis_vector_store",
        index_prefix="vectore_store",
        redis_url="redis://localhost:6379",
    ),
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port("localhost", 6379),
        collection="redis_cache",
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
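The nodes returned by pipeline.run() keep a reference to their source documents, so you can double-check what was ingested; a short sketch:

# Each ingested node records which source document it came from.
for node in nodes:
    print(node.node_id, "->", node.ref_doc_id)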

Confirm documents are ingested#

We can create a vector index on top of our vector store and quickly ask the LLM which documents it sees.

from llama_index import VectorStoreIndex, ServiceContext

service_context = ServiceContext.from_defaults(embed_model=embed_model)

index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, service_context=service_context
)
print(
    index.as_query_engine(similarity_top_k=10).query(
        "What documents do you see?"
    )
)
I see two documents: "test2.txt" and "test1.txt".

Add Data and Ingest#

Here, we can update an existing file, as well as add a new one!

!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()

nodes = pipeline.run(documents=documents)

print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, service_context=service_context
)

response = index.as_query_engine(similarity_top_k=10).query(
    "What documents do you see?"
)

print(response)

for node in response.source_nodes:
    print(node.get_text())
I see three documents: test3.txt, test1.txt, and test2.txt.
This is a test file: three!
This is a NEW test file: one!
This is a test file: two!

As we can see, the data was deduplicated and upserted correctly! Only three nodes are in the index even though we ran the full pipeline twice: test2.txt was unchanged and skipped, test1.txt was updated in place, and test3.txt was newly added.
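We can also confirm this directly against the docstore, without going through the LLM. A minimal check, assuming the key-value docstore exposes its docs mapping (as RedisDocumentStore does):

# Three unique documents should remain after deduplication and upserts.
print(len(pipeline.docstore.docs))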