Managed Index with Zilliz Cloud Pipelines¶
Zilliz Cloud Pipelines is a scalable API service for retrieval. You can use Zilliz Cloud Pipelines as a managed index in llama-index. This service can transform documents into vector embeddings and store them in Zilliz Cloud for effective semantic search.
Setup¶
- Install llama-index dependencies
%pip install llama-index-indices-managed-zilliz
%pip install llama-index
- Configure the credentials of your Zilliz Cloud account.
from getpass import getpass
ZILLIZ_PROJECT_ID = getpass("Enter your Zilliz Project ID:")
ZILLIZ_CLUSTER_ID = getpass("Enter your Zilliz Cluster ID:")
ZILLIZ_TOKEN = getpass("Enter your Zilliz API Key:")
Indexing documents¶
Adding metadata to each document is optional. The metadata can be used to filter document data during retrieval.
From Signed URL¶
Zilliz Cloud Pipelines accepts files from AWS S3 and Google Cloud Storage. You can generate a presigned URL from the object storage and use from_document_url() to ingest the file. The pipeline automatically indexes the document and stores the document chunks as vectors on Zilliz Cloud.
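For example, a minimal sketch of generating a presigned URL with boto3 (the bucket and object key here are hypothetical placeholders, not part of this notebook):
# Sketch only: generate a presigned S3 URL that from_document_url() can ingest.
# "my-bucket" and "docs/milvus_doc.md" are hypothetical placeholders.
import boto3
s3 = boto3.client("s3")
presigned_url = s3.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-bucket", "Key": "docs/milvus_doc.md"},
    ExpiresIn=3600,  # the URL stays valid for one hour
)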
from llama_index.indices.managed.zilliz import ZillizCloudPipelineIndex
# Create pipelines: skip this step if you have prepared valid pipelines
pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
    project_id=ZILLIZ_PROJECT_ID,
    cluster_id=ZILLIZ_CLUSTER_ID,
    api_key=ZILLIZ_TOKEN,
    data_type="doc",
    collection_name="zcp_llamalection_doc",  # change this value to customize the collection name
    metadata_schema={"user_id": "VarChar"},
)
print(pipeline_ids)
{'INGESTION': 'pipe-d639f220f27320e2e381de', 'SEARCH': 'pipe-47bd43fe8fd54502874a08', 'DELETION': 'pipe-bd434c99e064282f1a28e8'}
zcp_doc_index = ZillizCloudPipelineIndex.from_document_url(
    # a public or pre-signed url of a file stored on AWS S3 or Google Cloud Storage
    url="https://publicdataset.zillizcloud.com/milvus_doc.md",
    pipeline_ids=pipeline_ids,
    api_key=ZILLIZ_TOKEN,
    metadata={"user_id": "user-001"},  # optional, can be used for filtering
)
# # Delete docs by doc name
# zcp_doc_index.delete_by_expression(expression="doc_name == 'milvus_doc_22.md'")
From Document Nodes¶
Zilliz Cloud Pipelines supports text as data input as well. The following example prepares data with a sample document node.
from llama_index.core import Document
from llama_index.indices.managed.zilliz import ZillizCloudPipelineIndex
# prepare documents
documents = [Document(text="The number that is being searched for is ten.")]
# create pipelines: skip this step if you have prepared valid pipelines
pipeline_ids = ZillizCloudPipelineIndex.create_pipelines(
    project_id=ZILLIZ_PROJECT_ID,
    cluster_id=ZILLIZ_CLUSTER_ID,
    api_key=ZILLIZ_TOKEN,
    data_type="text",
    collection_name="zcp_llamalection_text",  # change this value to customize the collection name
)
print(pipeline_ids)
{'INGESTION': 'pipe-2bbab10f273a57eb987024', 'SEARCH': 'pipe-e1914a072ec5e6f83e446a', 'DELETION': 'pipe-72bbabf273a51af0b0c447'}
zcp_text_index = ZillizCloudPipelineIndex.from_documents(
    documents=documents,  # the list of document nodes prepared above
    pipeline_ids=pipeline_ids,
    api_key=ZILLIZ_TOKEN,
)
Working as Query Engine¶
To conduct semantic search with ZillizCloudPipelineIndex, you can use it as a query engine via as_query_engine() by specifying a few parameters (a combined sketch follows the list):
- search_top_k: How many text nodes/chunks to retrieve. Optional, defaults to DEFAULT_SIMILARITY_TOP_K (2).
- filters: Metadata filters. Optional, defaults to None.
- output_metadata: What metadata fields to return with the retrieved text nodes. Optional, defaults to [].
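For instance, a minimal sketch combining all three parameters, assuming the zcp_doc_index built above and the user-001 tag applied at ingestion (the Multi-Tenancy section below covers filtering in detail):
# Sketch only: combine search_top_k, filters, and output_metadata in one call.
# zcp_doc_index and the "user-001" tag come from the ingestion step above.
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters
filtered_query_engine = zcp_doc_index.as_query_engine(
    search_top_k=3,  # retrieve up to 3 chunks
    filters=MetadataFilters(
        filters=[ExactMatchFilter(key="user_id", value="user-001")]
    ),  # only search chunks tagged with user_id == "user-001"
    output_metadata=["user_id"],  # return user_id with each retrieved node
)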
import os
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API Key:")
query_engine = zcp_doc_index.as_query_engine(search_top_k=3)
The query engine is then ready for semantic search or retrieval-augmented generation over the Milvus 2.3 documents:
- Retrieve (Semantic search powered by Zilliz Cloud Pipelines):
question = "Can users delete entities by filtering non-primary fields?"
retrieved_nodes = query_engine.retrieve(question)
print(retrieved_nodes)
[NodeWithScore(node=TextNode(id_='449755997496672548', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, text='# Delete Entities\nThis topic describes how to delete entities in Milvus. \nMilvus supports deleting entities by primary key or complex boolean expressions. Deleting entities by primary key is much faster and lighter than deleting them by complex boolean expressions. This is because Milvus executes queries first when deleting data by complex boolean expressions. \nDeleted entities can still be retrieved immediately after the deletion if the consistency level is set lower than Strong.\nEntities deleted beyond the pre-specified span of time for Time Travel cannot be retrieved again.\nFrequent deletion operations will impact the system performance. \nBefore deleting entities by comlpex boolean expressions, make sure the collection has been loaded.\nDeleting entities by complex boolean expressions is not an atomic operation. Therefore, if it fails halfway through, some data may still be deleted.\nDeleting entities by complex boolean expressions is supported only when the consistency is set to Bounded. For details, see Consistency.\\\n\\\n# Delete Entities\n## Prepare boolean expression\nPrepare the boolean expression that filters the entities to delete. \nMilvus supports deleting entities by primary key or complex boolean expressions. For more information on expression rules and supported operators, see Boolean Expression Rules.\\\n\\\n# Delete Entities\n## Prepare boolean expression\n### Simple boolean expression\nUse a simple expression to filter data with primary key values of 0 and 1: \n```python\nexpr = "book_id in [0,1]"\n```\\\n\\\n# Delete Entities\n## Prepare boolean expression\n### Complex boolean expression\nTo filter entities that meet specific conditions, define complex boolean expressions. \nFilter entities whose word_count is greater than or equal to 11000: \n```python\nexpr = "word_count >= 11000"\n``` \nFilter entities whose book_name is not Unknown: \n```python\nexpr = "book_name != Unknown"\n``` \nFilter entities whose primary key values are greater than 5 and word_count is smaller than or equal to 9999: \n```python\nexpr = "book_id > 5 && word_count <= 9999"\n```', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.742070198059082), NodeWithScore(node=TextNode(id_='449755997496672549', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, text='# Delete Entities\n## Delete entities\nDelete the entities with the boolean expression you created. Milvus returns the ID list of the deleted entities.\n```python\nfrom pymilvus import Collection\ncollection = Collection("book") # Get an existing collection.\ncollection.delete(expr)\n``` \nParameter\tDescription\nexpr\tBoolean expression that specifies the entities to delete.\npartition_name (optional)\tName of the partition to delete entities from.\\\n\\\n# Upsert Entities\nThis topic describes how to upsert entities in Milvus. \nUpserting is a combination of insert and delete operations. In the context of a Milvus vector database, an upsert is a data-level operation that will overwrite an existing entity if a specified field already exists in a collection, and insert a new entity if the specified value doesn’t already exist. 
\nThe following example upserts 3,000 rows of randomly generated data as the example data. When performing upsert operations, it\'s important to note that the operation may compromise performance. This is because the operation involves deleting data during execution.\\\n\\\n# Upsert Entities\n## Prepare data\nFirst, prepare the data to upsert. The type of data to upsert must match the schema of the collection, otherwise Milvus will raise an exception. \nMilvus supports default values for scalar fields, excluding a primary key field. This indicates that some fields can be left empty during data inserts or upserts. For more information, refer to Create a Collection. \n```python\n# Generate data to upsert\n\nimport random\nnb = 3000\ndim = 8\nvectors = [[random.random() for _ in range(dim)] for _ in range(nb)]\ndata = [\n[i for i in range(nb)],\n[str(i) for i in range(nb)],\n[i for i in range(10000, 10000+nb)],\nvectors,\n[str("dy"*i) for i in range(nb)]\n]\n```', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.6409814953804016), NodeWithScore(node=TextNode(id_='449755997496672550', embedding=None, metadata={}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, text='# Upsert Entities\n## Upsert data\nUpsert the data to the collection. \n```python\nfrom pymilvus import Collection\ncollection = Collection("book") # Get an existing collection.\nmr = collection.upsert(data)\n``` \nParameter\tDescription\ndata\tData to upsert into Milvus.\npartition_name (optional)\tName of the partition to upsert data into.\ntimeout (optional)\tAn optional duration of time in seconds to allow for the RPC. If it is set to None, the client keeps waiting until the server responds or error occurs.\nAfter upserting entities into a collection that has previously been indexed, you do not need to re-index the collection, as Milvus will automatically create an index for the newly upserted data. For more information, refer to Can indexes be created after inserting vectors?\\\n\\\n# Upsert Entities\n## Flush data\nWhen data is upserted into Milvus it is updated and inserted into segments. Segments have to reach a certain size to be sealed and indexed. Unsealed segments will be searched brute force. In order to avoid this with any remainder data, it is best to call flush(). The flush() call will seal any remaining segments and send them for indexing. It is important to only call this method at the end of an upsert session. Calling it too often will cause fragmented data that will need to be cleaned later on.\\\n\\\n# Upsert Entities\n## Limits\nUpdating primary key fields is not supported by upsert().\nupsert() is not applicable and an error can occur if autoID is set to True for primary key fields.', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'), score=0.5456743240356445)]
- Query (RAG powered by Zilliz Cloud Pipelines as retriever and OpenAI's LLM):
response = query_engine.query(question)
print(response.response)
Users can delete entities by filtering non-primary fields using complex boolean expressions in Milvus.
Multi-Tenancy¶
With a tenant-specific value (e.g., user id) as metadata, the managed index can achieve multi-tenancy by applying metadata filters.
By specifying a metadata value, each document is tagged with the tenant-specific field at ingestion.
zcp_doc_index._insert_doc_url(
    url="https://publicdataset.zillizcloud.com/milvus_doc_22.md",
    metadata={"user_id": "user_002"},
)
{'token_usage': 984, 'doc_name': 'milvus_doc_22.md', 'num_chunks': 3}
Then the managed index can build a query engine for each tenant by filtering on the tenant-specific field.
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters
query_engine_for_user_002 = zcp_doc_index.as_query_engine(
    search_top_k=3,
    filters=MetadataFilters(
        filters=[ExactMatchFilter(key="user_id", value="user_002")]
    ),
    output_metadata=["user_id"],  # optional, display user_id in outputs
)
Change filters to build query engines with different conditions.
question = "Can I delete entities by filtering non-primary fields?"
# search_results = query_engine_for_user_002.retrieve(question)
response = query_engine_for_user_002.query(question)
print(response.response)
Milvus only supports deleting entities by primary key filtered with boolean expressions. Other operators can be used only in query or scalar filtering in vector search.
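For example, changing the filter to the user-001 tag applied earlier yields a query engine over the Milvus 2.3 document. A minimal sketch (not run in the original notebook):
# Sketch only: same pattern as above, filtered for the user-001 tenant.
query_engine_for_user_001 = zcp_doc_index.as_query_engine(
    search_top_k=3,
    filters=MetadataFilters(
        filters=[ExactMatchFilter(key="user_id", value="user-001")]
    ),
    output_metadata=["user_id"],  # optional, display user_id in outputs
)
# The answer should now come from the Milvus 2.3 document ingested earlier.
response = query_engine_for_user_001.query(question)
print(response.response)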