Redis Ingestion Pipeline
This walkthrough shows how to use Redis as the vector store, cache, and docstore in an Ingestion Pipeline.
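Dependencies
Install and start Redis, then set your OpenAI API key. The key is needed later by the query engine's default LLM; the embeddings in this walkthrough come from a HuggingFace model.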
In [ ]:
%pip install llama-index-storage-docstore-redis
%pip install llama-index-vector-stores-redis
%pip install llama-index-embeddings-huggingface
In [ ]:
!pip install redis
Requirement already satisfied: redis in /home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages (5.0.1)
Requirement already satisfied: async-timeout>=4.0.2 in /home/loganm/.cache/pypoetry/virtualenvs/llama-index-4a-wkI5X-py3.11/lib/python3.11/site-packages (from redis) (4.0.3)
[notice] A new release of pip is available: 23.2.1 -> 23.3.1
[notice] To update, run: pip install --upgrade pip
In [ ]:
!docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
338c889086e8649aa80dfb79ebff4fffc98d72fc6d988ac158c6662e9e0cf04b
In [ ]:
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
Create Seed Data
In [ ]:
# Make some test data
!rm -rf test_redis_data
!mkdir -p test_redis_data
!echo "This is a test file: one!" > test_redis_data/test1.txt
!echo "This is a test file: two!" > test_redis_data/test2.txt
In [ ]:
from llama_index.core import SimpleDirectoryReader

# load documents with deterministic IDs
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()
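Deterministic IDs matter because a docstore can only recognize a re-loaded file as the same document if its ID is stable between loads. The plain-Python sketch below illustrates the idea without LlamaIndex. `random_id` and `path_based_id` are hypothetical helpers, and the actual ID format produced by `SimpleDirectoryReader` with `filename_as_id=True` may differ from the bare path used here.

```python
import uuid


def random_id(path: str) -> str:
    # Hypothetical: a fresh random ID on every load, unrelated to the path
    return str(uuid.uuid4())


def path_based_id(path: str) -> str:
    # Hypothetical: an ID derived only from the file path, so it never changes
    return path


paths = ["test_redis_data/test1.txt", "test_redis_data/test2.txt"]

# Random IDs: two loads of the same files produce unrelated IDs
ids_match_random = [random_id(p) for p in paths] == [random_id(p) for p in paths]

# Path-based IDs: two loads of the same files produce the same IDs
ids_match_path = [path_based_id(p) for p in paths] == [
    path_based_id(p) for p in paths
]

print(ids_match_random, ids_match_path)
```

With random IDs, every reload would look like a set of brand-new documents, so nothing could be matched against what is already stored.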
Run the Redis-Based Ingestion Pipeline
With a vector store attached, the pipeline will handle upserting data into your vector store.
However, if you only want to handle duplicates, you can change the strategy to DUPLICATES_ONLY.
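To make the difference between the two strategies concrete, here is a simplified in-memory model of the bookkeeping. It is a sketch, not LlamaIndex's actual implementation: it assumes each document has a stable ID and a content hash, and `ingest_upserts` and `ingest_duplicates_only` are hypothetical names.

```python
import hashlib


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def ingest_upserts(store: dict, docs: dict) -> list:
    """store maps doc ID -> content hash. Returns the IDs that were (re)ingested."""
    ingested = []
    for doc_id, text in docs.items():
        new_hash = content_hash(text)
        if store.get(doc_id) != new_hash:
            # New ID, or known ID with changed content: insert or replace
            store[doc_id] = new_hash
            ingested.append(doc_id)
    return ingested


def ingest_duplicates_only(seen_hashes: set, docs: dict) -> list:
    """seen_hashes holds every content hash ingested so far."""
    ingested = []
    for doc_id, text in docs.items():
        new_hash = content_hash(text)
        if new_hash not in seen_hashes:
            # Unseen content is added; nothing older is removed
            seen_hashes.add(new_hash)
            ingested.append(doc_id)
    return ingested


v1 = {"a.txt": "hello"}
v2 = {"a.txt": "hello, edited"}

upsert_store = {}
ingest_upserts(upsert_store, v1)
ingest_upserts(upsert_store, v2)

seen = set()
ingest_duplicates_only(seen, v1)
ingest_duplicates_only(seen, v2)

print(len(upsert_store), len(seen))  # prints: 1 2
```

In this model, upserting leaves a single entry for the edited file, while the duplicates-only approach keeps both versions because it only skips content it has already seen.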
In [ ]:
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.ingestion import (
    DocstoreStrategy,
    IngestionPipeline,
    IngestionCache,
)
from llama_index.core.ingestion.cache import RedisCache
from llama_index.storage.docstore.redis import RedisDocumentStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.redis import RedisVectorStore

embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(),
        embed_model,
    ],
    docstore=RedisDocumentStore.from_host_and_port(
        "localhost", 6379, namespace="document_store"
    ),
    vector_store=RedisVectorStore(
        index_name="redis_vector_store",
        index_prefix="vectore_store",
        redis_url="redis://localhost:6379",
    ),
    cache=IngestionCache(
        cache=RedisCache.from_host_and_port("localhost", 6379),
        collection="redis_cache",
    ),
    docstore_strategy=DocstoreStrategy.UPSERTS,
)
In [ ]:
nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
Confirm documents are ingested
We can create a vector index from our vector store and quickly ask which documents it sees.
In [ ]:
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)
In [ ]:
print(
    index.as_query_engine(similarity_top_k=10).query(
        "What documents do you see?"
    )
)
I see two documents: "test2.txt" and "test1.txt".
Add data and Ingest
Here, we can update an existing file, as well as add a new one!
In [ ]:
!echo "This is a test file: three!" > test_redis_data/test3.txt
!echo "This is a NEW test file: one!" > test_redis_data/test1.txt
In [ ]:
documents = SimpleDirectoryReader(
    "./test_redis_data", filename_as_id=True
).load_data()

nodes = pipeline.run(documents=documents)
print(f"Ingested {len(nodes)} Nodes")
Ingested 2 Nodes
In [ ]:
index = VectorStoreIndex.from_vector_store(
    pipeline.vector_store, embed_model=embed_model
)

response = index.as_query_engine(similarity_top_k=10).query(
    "What documents do you see?"
)
print(response)

for node in response.source_nodes:
    print(node.get_text())
I see three documents: test3.txt, test1.txt, and test2.txt.
This is a test file: three!
This is a NEW test file: one!
This is a test file: two!
As we can see, the data was deduplicated and upserted correctly! Only three nodes are in the index, even though we ran the full pipeline twice.
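The node counts reported by the two runs (2, then 2, with 3 nodes in the index) can be reproduced with a small in-memory model of upsert bookkeeping. This is a sketch under two assumptions: each file yields exactly one node, and change detection compares a content hash per document ID. `run_pipeline` is a hypothetical stand-in, not the LlamaIndex API.

```python
import hashlib


def run_pipeline(docstore: dict, index: dict, documents: dict) -> int:
    """Process only new or changed documents; return how many were processed."""
    processed = 0
    for doc_id, text in documents.items():
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if docstore.get(doc_id) == digest:
            continue  # unchanged since the last run: skip it
        docstore[doc_id] = digest  # remember the latest content hash
        index[doc_id] = text  # overwrite any earlier version of this document
        processed += 1
    return processed


docstore, index = {}, {}

first = run_pipeline(
    docstore,
    index,
    {
        "test1.txt": "This is a test file: one!",
        "test2.txt": "This is a test file: two!",
    },
)

second = run_pipeline(
    docstore,
    index,
    {
        "test1.txt": "This is a NEW test file: one!",
        "test2.txt": "This is a test file: two!",
        "test3.txt": "This is a test file: three!",
    },
)

print(first, second, len(index))  # prints: 2 2 3
```

In the second run, test2.txt is skipped because it is unchanged, test1.txt replaces its earlier version, and test3.txt is added, which is why only two nodes are reported while the index holds three.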