Open In Colab

Pathway Reader#

Pathway is an open data processing framework. It allows you to easily develop data transformation pipelines and Machine Learning applications that work with live data sources and changing data.

This notebook demonstrates how to set up a live data indexing pipeline. You can query the results of this pipeline from your LLM application in the same manner as you would a regular reader. However, under the hood, Pathway updates the index on each data change giving you always up-to-date answers.

In this notebook, we will use a simple document processing pipeline that:

  1. Monitors several data sources (files, S3 folders, cloud storage) for data changes.

  2. Parses, splits and embeds the documents using Llama-index methods.

  3. Builds a vector index for the data.

We will connect to the index using llama_index.readers.pathway.PathwayReader reader, which implements the load_data interface.

The basic pipeline described in this document allows to effortlessly build a simple index of files stored in a cloud location. However, Pathway provides everything needed to build realtime data pipelines and apps, including SQL-like able operations such as groupby-reductions and joins between disparate data sources, time-based grouping and windowing of data, and a wide array of connectors.

For more details about Pathway data ingestion pipeline and vector store, visit vector store pipeline.

Prerequisites#

Install pathway and llama-index packages. Then download sample data.

%pip install llama-index-readers-pathway
%pip install llama-index-embeddings-openai
!pip install pathway
!pip install llama-index

!mkdir -p 'data/'
!wget 'https://gist.githubusercontent.com/janchorowski/dd22a293f3d99d1b726eedc7d46d2fc0/raw/pathway_readme.md' -O 'data/pathway_readme.md'

Configure logging.

import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.ERROR)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

Set up your OpenAI API key.

import getpass
import os

# omit if embedder of choice is not OpenAI
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

Define data sources tracked by Pathway#

Pathway can listen to many sources simultaneously, such as local files, S3 folders, cloud storage and any data stream for data changes.

See pathway-io for more information.

import pathway as pw

data_sources = []
data_sources.append(
    pw.io.fs.read(
        "./data",
        format="binary",
        mode="streaming",
        with_metadata=True,
    )  # This creates a `pathway` connector that tracks
    # all the files in the ./data directory
)

# This creates a connector that tracks files in Google drive.
# please follow the instructions at https://pathway.com/developers/tutorials/connectors/gdrive-connector/ to get credentials
# data_sources.append(
#     pw.io.gdrive.read(object_id="17H4YpBOAKQzEJ93xmC2z170l0bP2npMy", service_user_credentials_file="credentials.json", with_metadata=True))

Create the document indexing pipeline#

Let us create the document indexing pipeline. The transformations should be a list of TransformComponents ending with an Embedding transformation.

In this example, let’s first split the text first using TokenTextSplitter, then embed with OpenAIEmbedding.

from llama_index.core.retrievers import PathwayVectorServer
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import TokenTextSplitter

embed_model = OpenAIEmbedding(embed_batch_size=10)

transformations_example = [
    TokenTextSplitter(
        chunk_size=150,
        chunk_overlap=10,
        separator=" ",
    ),
    embed_model,
]

processing_pipeline = PathwayVectorServer(
    *data_sources,
    transformations=transformations_example,
)

# Define the Host and port that Pathway will be on
PATHWAY_HOST = "127.0.0.1"
PATHWAY_PORT = 8754

# `threaded` runs pathway in detached mode, we have to set it to False when running from terminal or container
# for more information on `with_cache` check out https://pathway.com/developers/api-docs/persistence-api
processing_pipeline.run_server(
    host=PATHWAY_HOST, port=PATHWAY_PORT, with_cache=False, threaded=True
)
<Thread(Thread-5 (run), started 140336003253824)>
======== Running on http://127.0.0.1:8754 ========
(Press CTRL+C to quit)
148

Create the Reader#

from llama_index.readers.pathway import PathwayReader

reader = PathwayReader(host=PATHWAY_HOST, port=PATHWAY_PORT)
# let us search with some text
reader.load_data(query_text="What is Pathway")
[Document(id_='cf4217ed-6ba8-4ac7-8125-e3ce847244ef', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='92c84c3369fcd527faeee011f1106d054ea385f5dfa725f39999fd1c2f44a3db', text="Pathway is an open framework for high-throughput and low-latency real-time data processing. It is used to create Python code which seamlessly combines batch processing, streaming, and real-time API's for LLM apps. Pathway's distributed runtime (🦀-🐍) provides fresh results of your data pipelines whenever new inputs and requests are received.\n\nIn the first place, Pathway was designed to be a life-saver (or at least a time-saver) for Python developers and ML/AI engineers faced with live data sources, where you need to react quickly to fresh data. Still, Pathway is a powerful tool that can be used for a lot of things. If you want to do streaming in Python,", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'),
 Document(id_='ec25ff55-f59d-406a-8319-a24cccdd9638', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='e72e4a7e14dc2d6d75b1dc2452609d08eb3743fb7c707d88af684870456dd06e', text='Resources\n\nSee also: Pathway Documentation (https://pathway.com/developers/) webpage (including API Docs).\n\n### Videos about Pathway<a id="videos-about-pathway"></a>\n[▶️ Building an LLM Application without a vector database](https://www.youtube.com/watch?v=kcrJSk00duw) - by [Jan Chorowski](https://scholar.google.com/citations?user=Yc94070AAAAJ) (7min 56s)\n\n[▶️ Linear regression on a Kafka Stream](https://vimeo.com/805069039) - by [Richard Pelgrim](https://twitter.com/richardpelgrim) (7min 53s)\n\n[▶️', start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'),
 Document(id_='a4710236-8362-4181-8910-04de01a6f203', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='1f03446cf0d7c1d39ec54932ca40d7fd45715db5ccd14ca93aea0aa20633d59f', text="If you want to do streaming in Python, build an AI data pipeline, or if you are looking for your next Python data processing framework, keep reading.\n\nPathway provides a high-level programming interface in Python for defining data transformations, aggregations, and other operations on data streams.\nWith Pathway, you can effortlessly design and deploy sophisticated data workflows that efficiently handle high volumes of data in real time.\n\nPathway is interoperable with various data sources and sinks such as Kafka, CSV files, SQL/noSQL databases, and REST API's, allowing you to connect and process data from different storage systems.\n\nTypical use-cases of Pathway include realtime data processing, ETL (Extract, Transform, Load) pipelines, data analytics,", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n'),
 Document(id_='668429b4-594f-491f-88c9-3504d2b0cc65', embedding=None, metadata={'created_at': None, 'modified_at': 1703700883, 'owner': 'janek', 'path': '/home/janek/projects/llama_index/docs/examples/data_connectors/data/pathway_readme.md'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='b8e5829ef270c804df92834d70467c328a2c9b7afa3688132c35116be07d6286', text="Get Help\n\nIf you have any questions, issues, or just want to chat about Pathway, we're here to help! Feel free to:\n- Check out the documentation in https://pathway.com/developers/ for detailed information.\n- Reach out to us via email at [email protected].\n\nOur team is always happy to help you and ensure that you get the most out of Pathway.\nIf you would like to better understand how best to use Pathway in your project, please don't hesitate to reach out to us.", start_char_idx=None, end_char_idx=None, text_template='{metadata_str}\n\n{content}', metadata_template='{key}: {value}', metadata_seperator='\n')]

Create a summary index with llama-index#

docs = reader.load_data(query_text="some search input", k=2)
from llama_index.core import SummaryIndex

index = SummaryIndex.from_documents(docs)
query_engine = index.as_query_engine()
response = query_engine.query("What does Pathway do?")
print(response)
Pathway is a platform that offers reactive data processing. It provides detailed information and documentation for users to better understand and utilize its features. Additionally, Pathway has a support team that is available to assist users with any questions or issues they may have.