Skip to content

File

CSVReader #

Bases: BaseReader

CSV parser.

Parameters:

Name Type Description Default
concat_rows bool

whether to concatenate all rows into one document. If set to False, a Document will be created for each row. True by default.

True
Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/tabular/base.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class CSVReader(BaseReader):
    """CSV parser.

    Args:
        concat_rows (bool): whether to concatenate all rows into one document.
            If set to False, a Document will be created for each row.
            True by default.

    """

    def __init__(self, *args: Any, concat_rows: bool = True, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._concat_rows = concat_rows

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse file.

        Returns:
            Union[str, List[str]]: a string or a List of strings.

        """
        try:
            import csv
        except ImportError:
            raise ImportError("csv module is required to read CSV files.")
        text_list = []
        with open(file) as fp:
            csv_reader = csv.reader(fp)
            for row in csv_reader:
                text_list.append(", ".join(row))

        metadata = {"filename": file.name, "extension": file.suffix}
        if extra_info:
            metadata = {**metadata, **extra_info}

        if self._concat_rows:
            return [Document(text="\n".join(text_list), metadata=metadata)]
        else:
            return [Document(text=text, metadata=metadata) for text in text_list]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None) -> List[Document]

Parse file.

Returns:

Type Description
List[Document]

Union[str, List[str]]: a string or a List of strings.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/tabular/base.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse file.

    Returns:
        Union[str, List[str]]: a string or a List of strings.

    """
    try:
        import csv
    except ImportError:
        raise ImportError("csv module is required to read CSV files.")
    text_list = []
    with open(file) as fp:
        csv_reader = csv.reader(fp)
        for row in csv_reader:
            text_list.append(", ".join(row))

    metadata = {"filename": file.name, "extension": file.suffix}
    if extra_info:
        metadata = {**metadata, **extra_info}

    if self._concat_rows:
        return [Document(text="\n".join(text_list), metadata=metadata)]
    else:
        return [Document(text=text, metadata=metadata) for text in text_list]

DocxReader #

Bases: BaseReader

Docx parser.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/docs/base.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class DocxReader(BaseReader):
    """Docx parser."""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        if not isinstance(file, Path):
            file = Path(file)

        try:
            import docx2txt
        except ImportError:
            raise ImportError(
                "docx2txt is required to read Microsoft Word files: "
                "`pip install docx2txt`"
            )

        if fs:
            with fs.open(file) as f:
                text = docx2txt.process(f)
        else:
            text = docx2txt.process(file)
        metadata = {"file_name": file.name}
        if extra_info is not None:
            metadata.update(extra_info)

        return [Document(text=text, metadata=metadata or {})]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/docs/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    if not isinstance(file, Path):
        file = Path(file)

    try:
        import docx2txt
    except ImportError:
        raise ImportError(
            "docx2txt is required to read Microsoft Word files: "
            "`pip install docx2txt`"
        )

    if fs:
        with fs.open(file) as f:
            text = docx2txt.process(f)
    else:
        text = docx2txt.process(file)
    metadata = {"file_name": file.name}
    if extra_info is not None:
        metadata.update(extra_info)

    return [Document(text=text, metadata=metadata or {})]

EpubReader #

Bases: BaseReader

Epub Parser.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/epub/base.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class EpubReader(BaseReader):
    """Epub Parser."""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        try:
            import ebooklib
            import html2text
            from ebooklib import epub
        except ImportError:
            raise ImportError(
                "Please install extra dependencies that are required for "
                "the EpubReader: "
                "`pip install EbookLib html2text`"
            )
        if fs:
            logger.warning(
                "fs was specified but EpubReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        text_list = []
        book = epub.read_epub(file, options={"ignore_ncx": True})

        # Iterate through all chapters.
        for item in book.get_items():
            # Chapters are typically located in epub documents items.
            if item.get_type() == ebooklib.ITEM_DOCUMENT:
                text_list.append(
                    html2text.html2text(item.get_content().decode("utf-8"))
                )

        text = "\n".join(text_list)
        return [Document(text=text, metadata=extra_info or {})]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/epub/base.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    try:
        import ebooklib
        import html2text
        from ebooklib import epub
    except ImportError:
        raise ImportError(
            "Please install extra dependencies that are required for "
            "the EpubReader: "
            "`pip install EbookLib html2text`"
        )
    if fs:
        logger.warning(
            "fs was specified but EpubReader doesn't support loading "
            "from fsspec filesystems. Will load from local filesystem instead."
        )

    text_list = []
    book = epub.read_epub(file, options={"ignore_ncx": True})

    # Iterate through all chapters.
    for item in book.get_items():
        # Chapters are typically located in epub documents items.
        if item.get_type() == ebooklib.ITEM_DOCUMENT:
            text_list.append(
                html2text.html2text(item.get_content().decode("utf-8"))
            )

    text = "\n".join(text_list)
    return [Document(text=text, metadata=extra_info or {})]

FlatReader #

Bases: BaseReader

Flat reader.

Extract raw text from a file and save the file type in the metadata

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/flat/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class FlatReader(BaseReader):
    """Flat reader.

    Extract raw text from a file and save the file type in the metadata
    """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse file into string."""
        with open(file, encoding="utf-8") as f:
            content = f.read()
        metadata = {"filename": file.name, "extension": file.suffix}
        if extra_info:
            metadata = {**metadata, **extra_info}

        return [Document(text=content, metadata=metadata)]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None) -> List[Document]

Parse file into string.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/flat/base.py
23
24
25
26
27
28
29
30
31
32
33
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse file into string."""
    with open(file, encoding="utf-8") as f:
        content = f.read()
    metadata = {"filename": file.name, "extension": file.suffix}
    if extra_info:
        metadata = {**metadata, **extra_info}

    return [Document(text=content, metadata=metadata)]

HTMLTagReader #

Bases: BaseReader

Read HTML files and extract text from a specific tag with BeautifulSoup.

By default, reads the text from the <section> tag.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/html/base.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class HTMLTagReader(BaseReader):
    """
    Read HTML files and extract text from a specific tag with BeautifulSoup.

    By default, reads the text from the ``<section>`` tag.
    """

    def __init__(
        self,
        tag: str = "section",
        ignore_no_id: bool = False,
    ) -> None:
        self._tag = tag
        self._ignore_no_id = ignore_no_id

        super().__init__()

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            raise ImportError("bs4 is required to read HTML files.")

        with open(file, encoding="utf-8") as html_file:
            soup = BeautifulSoup(html_file, "html.parser")

        tags = soup.find_all(self._tag)
        docs = []
        for tag in tags:
            tag_id = tag.get("id")
            tag_text = self._extract_text_from_tag(tag)

            if self._ignore_no_id and not tag_id:
                continue

            metadata = {
                "tag": self._tag,
                "tag_id": tag_id,
                "file_path": str(file),
            }
            metadata.update(extra_info or {})

            doc = Document(
                text=tag_text,
                metadata=metadata,
            )
            docs.append(doc)
        return docs

    def _extract_text_from_tag(self, tag: "Tag") -> str:
        try:
            from bs4 import NavigableString
        except ImportError:
            raise ImportError("bs4 is required to read HTML files.")

        texts = []
        for elem in tag.children:
            if isinstance(elem, NavigableString):
                if elem.strip():
                    texts.append(elem.strip())
            elif elem.name == self._tag:
                continue
            else:
                texts.append(elem.get_text().strip())
        return "\n".join(texts)

HWPReader #

Bases: BaseReader

Hwp Parser.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/docs/base.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class HWPReader(BaseReader):
    """Hwp Parser."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.FILE_HEADER_SECTION = "FileHeader"
        self.HWP_SUMMARY_SECTION = "\x05HwpSummaryInformation"
        self.SECTION_NAME_LENGTH = len("Section")
        self.BODYTEXT_SECTION = "BodyText"
        self.HWP_TEXT_TAGS = [67]
        self.text = ""

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Load data and extract table from Hwp file.

        Args:
            file (Path): Path for the Hwp file.

        Returns:
            List[Document]
        """
        import olefile

        if fs:
            logger.warning(
                "fs was specified but HWPReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        if not isinstance(file, Path):
            file = Path(file)
        load_file = olefile.OleFileIO(file)
        file_dir = load_file.listdir()
        if self.is_valid(file_dir) is False:
            raise Exception("Not Valid HwpFile")

        result_text = self._get_text(load_file, file_dir)
        result = self._text_to_document(text=result_text, extra_info=extra_info)
        return [result]

    def is_valid(self, dirs: List[str]) -> bool:
        if [self.FILE_HEADER_SECTION] not in dirs:
            return False

        return [self.HWP_SUMMARY_SECTION] in dirs

    def get_body_sections(self, dirs: List[str]) -> List[str]:
        m = []
        for d in dirs:
            if d[0] == self.BODYTEXT_SECTION:
                m.append(int(d[1][self.SECTION_NAME_LENGTH :]))

        return ["BodyText/Section" + str(x) for x in sorted(m)]

    def _text_to_document(
        self, text: str, extra_info: Optional[Dict] = None
    ) -> Document:
        return Document(text=text, extra_info=extra_info or {})

    def get_text(self) -> str:
        return self.text

        # ์ „์ฒด text ์ถ”์ถœ

    def _get_text(self, load_file: Any, file_dirs: List[str]) -> str:
        sections = self.get_body_sections(file_dirs)
        text = ""
        for section in sections:
            text += self.get_text_from_section(load_file, section)
            text += "\n"

        self.text = text
        return self.text

    def is_compressed(self, load_file: Any) -> bool:
        header = load_file.openstream("FileHeader")
        header_data = header.read()
        return (header_data[36] & 1) == 1

    def get_text_from_section(self, load_file: Any, section: str) -> str:
        bodytext = load_file.openstream(section)
        data = bodytext.read()

        unpacked_data = (
            zlib.decompress(data, -15) if self.is_compressed(load_file) else data
        )
        size = len(unpacked_data)

        i = 0

        text = ""
        while i < size:
            header = struct.unpack_from("<I", unpacked_data, i)[0]
            rec_type = header & 0x3FF
            (header >> 10) & 0x3FF
            rec_len = (header >> 20) & 0xFFF

            if rec_type in self.HWP_TEXT_TAGS:
                rec_data = unpacked_data[i + 4 : i + 4 + rec_len]
                text += rec_data.decode("utf-16")
                text += "\n"

            i += 4 + rec_len

        return text

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Load data and extract table from Hwp file.

Parameters:

Name Type Description Default
file Path

Path for the Hwp file.

required

Returns:

Type Description
List[Document]

List[Document]

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/docs/base.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Load data and extract table from Hwp file.

    Args:
        file (Path): Path for the Hwp file.

    Returns:
        List[Document]
    """
    import olefile

    if fs:
        logger.warning(
            "fs was specified but HWPReader doesn't support loading "
            "from fsspec filesystems. Will load from local filesystem instead."
        )

    if not isinstance(file, Path):
        file = Path(file)
    load_file = olefile.OleFileIO(file)
    file_dir = load_file.listdir()
    if self.is_valid(file_dir) is False:
        raise Exception("Not Valid HwpFile")

    result_text = self._get_text(load_file, file_dir)
    result = self._text_to_document(text=result_text, extra_info=extra_info)
    return [result]

IPYNBReader #

Bases: BaseReader

Image parser.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/ipynb/base.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class IPYNBReader(BaseReader):
    """Image parser."""

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        concatenate: bool = False,
    ):
        """Init params."""
        self._parser_config = parser_config
        self._concatenate = concatenate

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        if file.name.endswith(".ipynb"):
            try:
                import nbconvert
            except ImportError:
                raise ImportError("Please install nbconvert 'pip install nbconvert' ")
        if fs:
            with fs.open(file, encoding="utf-8") as f:
                string = nbconvert.exporters.ScriptExporter().from_file(f)[0]
        else:
            string = nbconvert.exporters.ScriptExporter().from_file(file)[0]
        # split each In[] cell into a separate string
        splits = re.split(r"In\[\d+\]:", string)
        # remove the first element, which is empty
        splits.pop(0)

        if self._concatenate:
            docs = [Document(text="\n\n".join(splits), metadata=extra_info or {})]
        else:
            docs = [Document(text=s, metadata=extra_info or {}) for s in splits]
        return docs

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/ipynb/base.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    if file.name.endswith(".ipynb"):
        try:
            import nbconvert
        except ImportError:
            raise ImportError("Please install nbconvert 'pip install nbconvert' ")
    if fs:
        with fs.open(file, encoding="utf-8") as f:
            string = nbconvert.exporters.ScriptExporter().from_file(f)[0]
    else:
        string = nbconvert.exporters.ScriptExporter().from_file(file)[0]
    # split each In[] cell into a separate string
    splits = re.split(r"In\[\d+\]:", string)
    # remove the first element, which is empty
    splits.pop(0)

    if self._concatenate:
        docs = [Document(text="\n\n".join(splits), metadata=extra_info or {})]
    else:
        docs = [Document(text=s, metadata=extra_info or {}) for s in splits]
    return docs

ImageCaptionReader #

Bases: BaseReader

Image parser.

Caption image using Blip.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image_caption/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class ImageCaptionReader(BaseReader):
    """Image parser.

    Caption image using Blip.

    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        prompt: Optional[str] = None,
    ):
        """Init params."""
        if parser_config is None:
            """Init parser."""
            try:
                import sentencepiece  # noqa
                import torch
                from PIL import Image  # noqa
                from transformers import BlipForConditionalGeneration, BlipProcessor
            except ImportError:
                raise ImportError(
                    "Please install extra dependencies that are required for "
                    "the ImageCaptionReader: "
                    "`pip install torch transformers sentencepiece Pillow`"
                )

            device = infer_torch_device()
            dtype = torch.float16 if torch.cuda.is_available() else torch.float32

            processor = BlipProcessor.from_pretrained(
                "Salesforce/blip-image-captioning-large"
            )
            model = BlipForConditionalGeneration.from_pretrained(
                "Salesforce/blip-image-captioning-large", torch_dtype=dtype
            )

            parser_config = {
                "processor": processor,
                "model": model,
                "device": device,
                "dtype": dtype,
            }

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._prompt = prompt

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse file."""
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        image = Image.open(file)
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        device = self._parser_config["device"]
        dtype = self._parser_config["dtype"]
        model.to(device)

        # unconditional image captioning

        inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

        out = model.generate(**inputs)
        text_str = processor.decode(out[0], skip_special_tokens=True)

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                image_path=str(file),
                metadata=extra_info or {},
            )
        ]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image_caption/base.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse file."""
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # load document image
    image = Image.open(file)
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Encode image into base64 string and keep in document
    image_str: Optional[str] = None
    if self._keep_image:
        image_str = img_2_b64(image)

    # Parse image into text
    model = self._parser_config["model"]
    processor = self._parser_config["processor"]

    device = self._parser_config["device"]
    dtype = self._parser_config["dtype"]
    model.to(device)

    # unconditional image captioning

    inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

    out = model.generate(**inputs)
    text_str = processor.decode(out[0], skip_special_tokens=True)

    return [
        ImageDocument(
            text=text_str,
            image=image_str,
            image_path=str(file),
            metadata=extra_info or {},
        )
    ]

ImageReader #

Bases: BaseReader

Image parser.

Extract text from images using DONUT or pytesseract.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class ImageReader(BaseReader):
    """Image parser.

    Extract text from images using DONUT or pytesseract.

    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        parse_text: bool = False,
        text_type: str = "text",
        pytesseract_model_kwargs: Dict[str, Any] = {},
    ):
        """Init parser."""
        self._text_type = text_type
        if parser_config is None and parse_text:
            if text_type == "plain_text":
                try:
                    import pytesseract
                except ImportError:
                    raise ImportError(
                        "Please install extra dependencies that are required for "
                        "the ImageReader when text_type is 'plain_text': "
                        "`pip install pytesseract`"
                    )
                processor = None
                model = pytesseract
            else:
                try:
                    import sentencepiece  # noqa
                    import torch  # noqa
                    from PIL import Image  # noqa
                    from transformers import DonutProcessor, VisionEncoderDecoderModel
                except ImportError:
                    raise ImportError(
                        "Please install extra dependencies that are required for "
                        "the ImageCaptionReader: "
                        "`pip install torch transformers sentencepiece Pillow`"
                    )

                processor = DonutProcessor.from_pretrained(
                    "naver-clova-ix/donut-base-finetuned-cord-v2"
                )
                model = VisionEncoderDecoderModel.from_pretrained(
                    "naver-clova-ix/donut-base-finetuned-cord-v2"
                )
            parser_config = {"processor": processor, "model": model}

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._parse_text = parse_text
        self._pytesseract_model_kwargs = pytesseract_model_kwargs

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        if fs:
            with fs.open(path=file) as f:
                image = Image.open(f.read())
        else:
            image = Image.open(file)

        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        text_str: str = ""
        if self._parse_text:
            assert self._parser_config is not None
            model = self._parser_config["model"]
            processor = self._parser_config["processor"]

            if processor:
                device = infer_torch_device()
                model.to(device)

                # prepare decoder inputs
                task_prompt = "<s_cord-v2>"
                decoder_input_ids = processor.tokenizer(
                    task_prompt, add_special_tokens=False, return_tensors="pt"
                ).input_ids

                pixel_values = processor(image, return_tensors="pt").pixel_values

                outputs = model.generate(
                    pixel_values.to(device),
                    decoder_input_ids=decoder_input_ids.to(device),
                    max_length=model.decoder.config.max_position_embeddings,
                    early_stopping=True,
                    pad_token_id=processor.tokenizer.pad_token_id,
                    eos_token_id=processor.tokenizer.eos_token_id,
                    use_cache=True,
                    num_beams=3,
                    bad_words_ids=[[processor.tokenizer.unk_token_id]],
                    return_dict_in_generate=True,
                )

                sequence = processor.batch_decode(outputs.sequences)[0]
                sequence = sequence.replace(processor.tokenizer.eos_token, "").replace(
                    processor.tokenizer.pad_token, ""
                )
                # remove first task start token
                text_str = re.sub(r"<.*?>", "", sequence, count=1).strip()
            else:
                import pytesseract

                model = cast(pytesseract, self._parser_config["model"])
                text_str = model.image_to_string(
                    image, **self._pytesseract_model_kwargs
                )

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                image_path=str(file),
                metadata=extra_info or {},
            )
        ]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image/base.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # load document image
    if fs:
        with fs.open(path=file) as f:
            image = Image.open(f.read())
    else:
        image = Image.open(file)

    if image.mode != "RGB":
        image = image.convert("RGB")

    # Encode image into base64 string and keep in document
    image_str: Optional[str] = None
    if self._keep_image:
        image_str = img_2_b64(image)

    # Parse image into text
    text_str: str = ""
    if self._parse_text:
        assert self._parser_config is not None
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        if processor:
            device = infer_torch_device()
            model.to(device)

            # prepare decoder inputs
            task_prompt = "<s_cord-v2>"
            decoder_input_ids = processor.tokenizer(
                task_prompt, add_special_tokens=False, return_tensors="pt"
            ).input_ids

            pixel_values = processor(image, return_tensors="pt").pixel_values

            outputs = model.generate(
                pixel_values.to(device),
                decoder_input_ids=decoder_input_ids.to(device),
                max_length=model.decoder.config.max_position_embeddings,
                early_stopping=True,
                pad_token_id=processor.tokenizer.pad_token_id,
                eos_token_id=processor.tokenizer.eos_token_id,
                use_cache=True,
                num_beams=3,
                bad_words_ids=[[processor.tokenizer.unk_token_id]],
                return_dict_in_generate=True,
            )

            sequence = processor.batch_decode(outputs.sequences)[0]
            sequence = sequence.replace(processor.tokenizer.eos_token, "").replace(
                processor.tokenizer.pad_token, ""
            )
            # remove first task start token
            text_str = re.sub(r"<.*?>", "", sequence, count=1).strip()
        else:
            import pytesseract

            model = cast(pytesseract, self._parser_config["model"])
            text_str = model.image_to_string(
                image, **self._pytesseract_model_kwargs
            )

    return [
        ImageDocument(
            text=text_str,
            image=image_str,
            image_path=str(file),
            metadata=extra_info or {},
        )
    ]

ImageTabularChartReader #

Bases: BaseReader

Image parser.

Extract tabular data from a chart or figure.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image_deplot/base.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class ImageTabularChartReader(BaseReader):
    """Image parser.

    Extract tabular data from a chart or figure.

    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        max_output_tokens=512,
        prompt: str = "Generate underlying data table of the figure below:",
    ):
        """Init params."""
        if parser_config is None:
            try:
                import torch
                from PIL import Image  # noqa: F401
                from transformers import (
                    Pix2StructForConditionalGeneration,
                    Pix2StructProcessor,
                )
            except ImportError:
                raise ImportError(
                    "Please install extra dependencies that are required for "
                    "the ImageCaptionReader: "
                    "`pip install torch transformers Pillow`"
                )

            device = "cuda" if torch.cuda.is_available() else "cpu"
            dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            processor = Pix2StructProcessor.from_pretrained("google/deplot")
            model = Pix2StructForConditionalGeneration.from_pretrained(
                "google/deplot", torch_dtype=dtype
            )
            parser_config = {
                "processor": processor,
                "model": model,
                "device": device,
                "dtype": dtype,
            }

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._max_output_tokens = max_output_tokens
        self._prompt = prompt

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse file."""
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        image = Image.open(file)
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        device = self._parser_config["device"]
        dtype = self._parser_config["dtype"]
        model.to(device)

        # unconditional image captioning

        inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

        out = model.generate(**inputs, max_new_tokens=self._max_output_tokens)
        text_str = "Figure or chart with tabular data: " + processor.decode(
            out[0], skip_special_tokens=True
        )

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                extra_info=extra_info or {},
            )
        ]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image_deplot/base.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse file."""
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # load document image
    image = Image.open(file)
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Encode image into base64 string and keep in document
    image_str: Optional[str] = None
    if self._keep_image:
        image_str = img_2_b64(image)

    # Parse image into text
    model = self._parser_config["model"]
    processor = self._parser_config["processor"]

    device = self._parser_config["device"]
    dtype = self._parser_config["dtype"]
    model.to(device)

    # unconditional image captioning

    inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

    out = model.generate(**inputs, max_new_tokens=self._max_output_tokens)
    text_str = "Figure or chart with tabular data: " + processor.decode(
        out[0], skip_special_tokens=True
    )

    return [
        ImageDocument(
            text=text_str,
            image=image_str,
            extra_info=extra_info or {},
        )
    ]

ImageVisionLLMReader #

Bases: BaseReader

Image parser.

Caption image using Blip2 (a multimodal VisionLLM similar to GPT4).

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image_vision_llm/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class ImageVisionLLMReader(BaseReader):
    """Image parser.

    Caption image using Blip2 (a multimodal VisionLLM similar to GPT4).

    """

    def __init__(
        self,
        parser_config: Optional[Dict] = None,
        keep_image: bool = False,
        prompt: str = "Question: describe what you see in this image. Answer:",
    ):
        """Init params."""
        if parser_config is None:
            try:
                import sentencepiece  # noqa
                import torch
                from PIL import Image  # noqa
                from transformers import Blip2ForConditionalGeneration, Blip2Processor
            except ImportError:
                raise ImportError(
                    "Please install extra dependencies that are required for "
                    "the ImageCaptionReader: "
                    "`pip install torch transformers sentencepiece Pillow`"
                )

            device = infer_torch_device()
            dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            processor = Blip2Processor.from_pretrained("Salesforce/blip2-opt-2.7b")
            model = Blip2ForConditionalGeneration.from_pretrained(
                "Salesforce/blip2-opt-2.7b", torch_dtype=dtype
            )
            parser_config = {
                "processor": processor,
                "model": model,
                "device": device,
                "dtype": dtype,
            }

        self._parser_config = parser_config
        self._keep_image = keep_image
        self._prompt = prompt

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse file."""
        from llama_index.core.img_utils import img_2_b64
        from PIL import Image

        # load document image
        image = Image.open(file)
        if image.mode != "RGB":
            image = image.convert("RGB")

        # Encode image into base64 string and keep in document
        image_str: Optional[str] = None
        if self._keep_image:
            image_str = img_2_b64(image)

        # Parse image into text
        model = self._parser_config["model"]
        processor = self._parser_config["processor"]

        device = self._parser_config["device"]
        dtype = self._parser_config["dtype"]
        model.to(device)

        # unconditional image captioning

        inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

        out = model.generate(**inputs)
        text_str = processor.decode(out[0], skip_special_tokens=True)

        return [
            ImageDocument(
                text=text_str,
                image=image_str,
                image_path=str(file),
                metadata=extra_info or {},
            )
        ]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/image_vision_llm/base.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def load_data(
    self, file: Path, extra_info: Optional[Dict] = None
) -> List[Document]:
    """Parse file."""
    from llama_index.core.img_utils import img_2_b64
    from PIL import Image

    # load document image
    image = Image.open(file)
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Encode image into base64 string and keep in document
    image_str: Optional[str] = None
    if self._keep_image:
        image_str = img_2_b64(image)

    # Parse image into text
    model = self._parser_config["model"]
    processor = self._parser_config["processor"]

    device = self._parser_config["device"]
    dtype = self._parser_config["dtype"]
    model.to(device)

    # unconditional image captioning

    inputs = processor(image, self._prompt, return_tensors="pt").to(device, dtype)

    out = model.generate(**inputs)
    text_str = processor.decode(out[0], skip_special_tokens=True)

    return [
        ImageDocument(
            text=text_str,
            image=image_str,
            image_path=str(file),
            metadata=extra_info or {},
        )
    ]

MarkdownReader #

Bases: BaseReader

Markdown parser.

Extract text from markdown files. Returns dictionary with keys as headers and values as the text between headers.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/markdown/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class MarkdownReader(BaseReader):
    """Markdown parser.

    Extract text from markdown files.
    Returns dictionary with keys as headers and values as the text between headers.

    """

    def __init__(
        self,
        *args: Any,
        remove_hyperlinks: bool = True,
        remove_images: bool = True,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._remove_hyperlinks = remove_hyperlinks
        self._remove_images = remove_images

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Convert a markdown file to a dictionary.

        The keys are the headers and the values are the text under each header.

        """
        markdown_tups: List[Tuple[Optional[str], str]] = []
        lines = markdown_text.split("\n")

        current_header = None
        current_text = ""

        for line in lines:
            header_match = re.match(r"^#+\s", line)
            if header_match:
                if current_header is not None:
                    if current_text == "" or None:
                        continue
                    markdown_tups.append((current_header, current_text))

                current_header = line
                current_text = ""
            else:
                current_text += line + "\n"
        markdown_tups.append((current_header, current_text))

        if current_header is not None:
            # pass linting, assert keys are defined
            markdown_tups = [
                (re.sub(r"#", "", cast(str, key)).strip(), re.sub(r"<.*?>", "", value))
                for key, value in markdown_tups
            ]
        else:
            markdown_tups = [
                (key, re.sub("<.*?>", "", value)) for key, value in markdown_tups
            ]

        return markdown_tups

    def remove_images(self, content: str) -> str:
        """Remove images in markdown content."""
        pattern = r"!{1}\[\[(.*)\]\]"
        return re.sub(pattern, "", content)

    def remove_hyperlinks(self, content: str) -> str:
        """Remove hyperlinks in markdown content."""
        pattern = r"\[(.*?)\]\((.*?)\)"
        return re.sub(pattern, r"\1", content)

    def _init_parser(self) -> Dict:
        """Initialize the parser with the config."""
        return {}

    def parse_tups(
        self,
        filepath: Path,
        errors: str = "ignore",
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Tuple[Optional[str], str]]:
        """Parse file into tuples."""
        fs = fs or LocalFileSystem()
        with fs.open(filepath, encoding="utf-8") as f:
            content = f.read().decode(encoding="utf-8")
        if self._remove_hyperlinks:
            content = self.remove_hyperlinks(content)
        if self._remove_images:
            content = self.remove_images(content)
        return self.markdown_to_tups(content)

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file into string."""
        tups = self.parse_tups(file, fs=fs)
        results = []
        # TODO: don't include headers right now
        for header, value in tups:
            if header is None:
                results.append(Document(text=value, metadata=extra_info or {}))
            else:
                results.append(
                    Document(text=f"\n\n{header}\n{value}", metadata=extra_info or {})
                )
        return results

markdown_to_tups #

markdown_to_tups(markdown_text: str) -> List[Tuple[Optional[str], str]]

Convert a markdown file to a dictionary.

The keys are the headers and the values are the text under each header.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/markdown/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
    """Convert a markdown file to a dictionary.

    The keys are the headers and the values are the text under each header.

    """
    markdown_tups: List[Tuple[Optional[str], str]] = []
    lines = markdown_text.split("\n")

    current_header = None
    current_text = ""

    for line in lines:
        header_match = re.match(r"^#+\s", line)
        if header_match:
            if current_header is not None:
                if current_text == "" or None:
                    continue
                markdown_tups.append((current_header, current_text))

            current_header = line
            current_text = ""
        else:
            current_text += line + "\n"
    markdown_tups.append((current_header, current_text))

    if current_header is not None:
        # pass linting, assert keys are defined
        markdown_tups = [
            (re.sub(r"#", "", cast(str, key)).strip(), re.sub(r"<.*?>", "", value))
            for key, value in markdown_tups
        ]
    else:
        markdown_tups = [
            (key, re.sub("<.*?>", "", value)) for key, value in markdown_tups
        ]

    return markdown_tups

remove_images #

remove_images(content: str) -> str

Remove images in markdown content.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/markdown/base.py
76
77
78
79
def remove_images(self, content: str) -> str:
    """Remove images in markdown content."""
    pattern = r"!{1}\[\[(.*)\]\]"
    return re.sub(pattern, "", content)
remove_hyperlinks(content: str) -> str

Remove hyperlinks in markdown content.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/markdown/base.py
81
82
83
84
def remove_hyperlinks(self, content: str) -> str:
    """Remove hyperlinks in markdown content."""
    pattern = r"\[(.*?)\]\((.*?)\)"
    return re.sub(pattern, r"\1", content)

parse_tups #

parse_tups(filepath: Path, errors: str = 'ignore', fs: Optional[AbstractFileSystem] = None) -> List[Tuple[Optional[str], str]]

Parse file into tuples.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/markdown/base.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def parse_tups(
    self,
    filepath: Path,
    errors: str = "ignore",
    fs: Optional[AbstractFileSystem] = None,
) -> List[Tuple[Optional[str], str]]:
    """Parse file into tuples."""
    fs = fs or LocalFileSystem()
    with fs.open(filepath, encoding="utf-8") as f:
        content = f.read().decode(encoding="utf-8")
    if self._remove_hyperlinks:
        content = self.remove_hyperlinks(content)
    if self._remove_images:
        content = self.remove_images(content)
    return self.markdown_to_tups(content)

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file into string.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/markdown/base.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file into string."""
    tups = self.parse_tups(file, fs=fs)
    results = []
    # TODO: don't include headers right now
    for header, value in tups:
        if header is None:
            results.append(Document(text=value, metadata=extra_info or {}))
        else:
            results.append(
                Document(text=f"\n\n{header}\n{value}", metadata=extra_info or {})
            )
    return results

MboxReader #

Bases: BaseReader

Mbox parser.

Extract messages from mailbox files. Returns string including date, subject, sender, receiver and content for each message.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/mbox/base.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class MboxReader(BaseReader):
    """Mbox parser.

    Extract messages from mailbox files.
    Returns string including date, subject, sender, receiver and
    content for each message.

    """

    DEFAULT_MESSAGE_FORMAT: str = (
        "Date: {_date}\n"
        "From: {_from}\n"
        "To: {_to}\n"
        "Subject: {_subject}\n"
        "Content: {_content}"
    )

    def __init__(
        self,
        *args: Any,
        max_count: int = 0,
        message_format: str = DEFAULT_MESSAGE_FORMAT,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        try:
            from bs4 import BeautifulSoup  # noqa
        except ImportError:
            raise ImportError(
                "`beautifulsoup4` package not found: `pip install beautifulsoup4`"
            )

        super().__init__(*args, **kwargs)
        self.max_count = max_count
        self.message_format = message_format

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file into string."""
        # Import required libraries
        import mailbox
        from email.parser import BytesParser
        from email.policy import default

        from bs4 import BeautifulSoup

        if fs:
            logger.warning(
                "fs was specified but MboxReader doesn't support loading "
                "from fsspec filesystems. Will load from local filesystem instead."
            )

        i = 0
        results: List[str] = []
        # Load file using mailbox
        bytes_parser = BytesParser(policy=default).parse
        mbox = mailbox.mbox(file, factory=bytes_parser)  # type: ignore

        # Iterate through all messages
        for _, _msg in enumerate(mbox):
            try:
                msg: mailbox.mboxMessage = _msg
                # Parse multipart messages
                if msg.is_multipart():
                    for part in msg.walk():
                        ctype = part.get_content_type()
                        cdispo = str(part.get("Content-Disposition"))
                        if ctype == "text/plain" and "attachment" not in cdispo:
                            content = part.get_payload(decode=True)  # decode
                            break
                # Get plain message payload for non-multipart messages
                else:
                    content = msg.get_payload(decode=True)

                # Parse message HTML content and remove unneeded whitespace
                soup = BeautifulSoup(content)
                stripped_content = " ".join(soup.get_text().split())
                # Format message to include date, sender, receiver and subject
                msg_string = self.message_format.format(
                    _date=msg["date"],
                    _from=msg["from"],
                    _to=msg["to"],
                    _subject=msg["subject"],
                    _content=stripped_content,
                )
                # Add message string to results
                results.append(msg_string)
            except Exception as e:
                logger.warning(f"Failed to parse message:\n{_msg}\n with exception {e}")

            # Increment counter and return if max count is met
            i += 1
            if self.max_count > 0 and i >= self.max_count:
                break

        return [Document(text=result, metadata=extra_info or {}) for result in results]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file into string.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/mbox/base.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file into string."""
    # Import required libraries
    import mailbox
    from email.parser import BytesParser
    from email.policy import default

    from bs4 import BeautifulSoup

    if fs:
        logger.warning(
            "fs was specified but MboxReader doesn't support loading "
            "from fsspec filesystems. Will load from local filesystem instead."
        )

    i = 0
    results: List[str] = []
    # Load file using mailbox
    bytes_parser = BytesParser(policy=default).parse
    mbox = mailbox.mbox(file, factory=bytes_parser)  # type: ignore

    # Iterate through all messages
    for _, _msg in enumerate(mbox):
        try:
            msg: mailbox.mboxMessage = _msg
            # Parse multipart messages
            if msg.is_multipart():
                for part in msg.walk():
                    ctype = part.get_content_type()
                    cdispo = str(part.get("Content-Disposition"))
                    if ctype == "text/plain" and "attachment" not in cdispo:
                        content = part.get_payload(decode=True)  # decode
                        break
            # Get plain message payload for non-multipart messages
            else:
                content = msg.get_payload(decode=True)

            # Parse message HTML content and remove unneeded whitespace
            soup = BeautifulSoup(content)
            stripped_content = " ".join(soup.get_text().split())
            # Format message to include date, sender, receiver and subject
            msg_string = self.message_format.format(
                _date=msg["date"],
                _from=msg["from"],
                _to=msg["to"],
                _subject=msg["subject"],
                _content=stripped_content,
            )
            # Add message string to results
            results.append(msg_string)
        except Exception as e:
            logger.warning(f"Failed to parse message:\n{_msg}\n with exception {e}")

        # Increment counter and return if max count is met
        i += 1
        if self.max_count > 0 and i >= self.max_count:
            break

    return [Document(text=result, metadata=extra_info or {}) for result in results]

PDFReader #

Bases: BaseReader

PDF parser.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/docs/base.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class PDFReader(BaseReader):
    """PDF parser."""

    def __init__(self, return_full_document: Optional[bool] = False) -> None:
        """
        Initialize PDFReader.
        """
        self.return_full_document = return_full_document

    @retry(
        stop=stop_after_attempt(RETRY_TIMES),
    )
    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        if not isinstance(file, Path):
            file = Path(file)

        try:
            import pypdf
        except ImportError:
            raise ImportError(
                "pypdf is required to read PDF files: `pip install pypdf`"
            )
        fs = fs or get_default_fs()
        with fs.open(file, "rb") as fp:
            # Load the file in memory if the filesystem is not the default one to avoid
            # issues with pypdf
            stream = fp if is_default_fs(fs) else io.BytesIO(fp.read())

            # Create a PDF object
            pdf = pypdf.PdfReader(stream)

            # Get the number of pages in the PDF document
            num_pages = len(pdf.pages)

            docs = []

            # This block returns a whole PDF as a single Document
            if self.return_full_document:
                text = ""
                metadata = {"file_name": file.name}

                for page in range(num_pages):
                    # Extract the text from the page
                    page_text = pdf.pages[page].extract_text()
                    text += page_text

                docs.append(Document(text=text, metadata=metadata))

            # This block returns each page of a PDF as its own Document
            else:
                # Iterate over every page

                for page in range(num_pages):
                    # Extract the text from the page
                    page_text = pdf.pages[page].extract_text()
                    page_label = pdf.page_labels[page]

                    metadata = {"page_label": page_label, "file_name": file.name}
                    if extra_info is not None:
                        metadata.update(extra_info)

                    docs.append(Document(text=page_text, metadata=metadata))

            return docs

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/docs/base.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@retry(
    stop=stop_after_attempt(RETRY_TIMES),
)
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    if not isinstance(file, Path):
        file = Path(file)

    try:
        import pypdf
    except ImportError:
        raise ImportError(
            "pypdf is required to read PDF files: `pip install pypdf`"
        )
    fs = fs or get_default_fs()
    with fs.open(file, "rb") as fp:
        # Load the file in memory if the filesystem is not the default one to avoid
        # issues with pypdf
        stream = fp if is_default_fs(fs) else io.BytesIO(fp.read())

        # Create a PDF object
        pdf = pypdf.PdfReader(stream)

        # Get the number of pages in the PDF document
        num_pages = len(pdf.pages)

        docs = []

        # This block returns a whole PDF as a single Document
        if self.return_full_document:
            text = ""
            metadata = {"file_name": file.name}

            for page in range(num_pages):
                # Extract the text from the page
                page_text = pdf.pages[page].extract_text()
                text += page_text

            docs.append(Document(text=text, metadata=metadata))

        # This block returns each page of a PDF as its own Document
        else:
            # Iterate over every page

            for page in range(num_pages):
                # Extract the text from the page
                page_text = pdf.pages[page].extract_text()
                page_label = pdf.page_labels[page]

                metadata = {"page_label": page_label, "file_name": file.name}
                if extra_info is not None:
                    metadata.update(extra_info)

                docs.append(Document(text=page_text, metadata=metadata))

        return docs

PagedCSVReader #

Bases: BaseReader

Paged CSV parser.

Displayed each row in an LLM-friendly format on a separate document.

Parameters:

Name Type Description Default
encoding str

Encoding used to open the file. utf-8 by default.

'utf-8'
Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/paged_csv/base.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class PagedCSVReader(BaseReader):
    """Paged CSV parser.

    Displayed each row in an LLM-friendly format on a separate document.

    Args:
        encoding (str): Encoding used to open the file.
            utf-8 by default.
    """

    def __init__(self, *args: Any, encoding: str = "utf-8", **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._encoding = encoding

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        delimiter: str = ",",
        quotechar: str = '"',
    ) -> List[Document]:
        """Parse file."""
        import csv

        docs = []
        with open(file, encoding=self._encoding) as fp:
            csv_reader = csv.DictReader(f=fp, delimiter=delimiter, quotechar=quotechar)  # type: ignore
            for row in csv_reader:
                docs.append(
                    Document(
                        text="\n".join(
                            f"{k.strip()}: {v.strip()}" for k, v in row.items()
                        ),
                        extra_info=extra_info or {},
                    )
                )
        return docs

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, delimiter: str = ',', quotechar: str = '"') -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/paged_csv/base.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    delimiter: str = ",",
    quotechar: str = '"',
) -> List[Document]:
    """Parse file."""
    import csv

    docs = []
    with open(file, encoding=self._encoding) as fp:
        csv_reader = csv.DictReader(f=fp, delimiter=delimiter, quotechar=quotechar)  # type: ignore
        for row in csv_reader:
            docs.append(
                Document(
                    text="\n".join(
                        f"{k.strip()}: {v.strip()}" for k, v in row.items()
                    ),
                    extra_info=extra_info or {},
                )
            )
    return docs

PandasCSVReader #

Bases: BaseReader

Pandas-based CSV parser.

Parses CSVs using the separator detection from Pandas read_csvfunction. If special parameters are required, use the pandas_config dict.

Parameters:

Name Type Description Default
concat_rows bool

whether to concatenate all rows into one document. If set to False, a Document will be created for each row. True by default.

True
col_joiner str

Separator to use for joining cols per row. Set to ", " by default.

', '
row_joiner str

Separator to use for joining each row. Only used when concat_rows=True. Set to "\n" by default.

'\n'
pandas_config dict

Options for the pandas.read_csv function call. Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html for more information. Set to empty dict by default, this means pandas will try to figure out the separators, table head, etc. on its own.

{}
Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/tabular/base.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class PandasCSVReader(BaseReader):
    r"""Pandas-based CSV parser.

    Parses CSVs using the separator detection from Pandas `read_csv`function.
    If special parameters are required, use the `pandas_config` dict.

    Args:
        concat_rows (bool): whether to concatenate all rows into one document.
            If set to False, a Document will be created for each row.
            True by default.

        col_joiner (str): Separator to use for joining cols per row.
            Set to ", " by default.

        row_joiner (str): Separator to use for joining each row.
            Only used when `concat_rows=True`.
            Set to "\n" by default.

        pandas_config (dict): Options for the `pandas.read_csv` function call.
            Refer to https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
            for more information.
            Set to empty dict by default, this means pandas will try to figure
            out the separators, table head, etc. on its own.

    """

    def __init__(
        self,
        *args: Any,
        concat_rows: bool = True,
        col_joiner: str = ", ",
        row_joiner: str = "\n",
        pandas_config: dict = {},
        **kwargs: Any
    ) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._concat_rows = concat_rows
        self._col_joiner = col_joiner
        self._row_joiner = row_joiner
        self._pandas_config = pandas_config

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        if fs:
            with fs.open(file) as f:
                df = pd.read_csv(f, **self._pandas_config)
        else:
            df = pd.read_csv(file, **self._pandas_config)

        text_list = df.apply(
            lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1
        ).tolist()

        if self._concat_rows:
            return [
                Document(
                    text=(self._row_joiner).join(text_list), metadata=extra_info or {}
                )
            ]
        else:
            return [
                Document(text=text, metadata=extra_info or {}) for text in text_list
            ]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/tabular/base.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    if fs:
        with fs.open(file) as f:
            df = pd.read_csv(f, **self._pandas_config)
    else:
        df = pd.read_csv(file, **self._pandas_config)

    text_list = df.apply(
        lambda row: (self._col_joiner).join(row.astype(str).tolist()), axis=1
    ).tolist()

    if self._concat_rows:
        return [
            Document(
                text=(self._row_joiner).join(text_list), metadata=extra_info or {}
            )
        ]
    else:
        return [
            Document(text=text, metadata=extra_info or {}) for text in text_list
        ]

PptxReader #

Bases: BaseReader

Powerpoint parser.

Extract text, caption images, and specify slides.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/slides/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class PptxReader(BaseReader):
    """Powerpoint parser.

    Extract text, caption images, and specify slides.

    """

    def __init__(self) -> None:
        """Init parser."""
        try:
            import torch  # noqa
            from PIL import Image  # noqa
            from pptx import Presentation  # noqa
            from transformers import (
                AutoTokenizer,
                VisionEncoderDecoderModel,
                ViTFeatureExtractor,
            )
        except ImportError:
            raise ImportError(
                "Please install extra dependencies that are required for "
                "the PptxReader: "
                "`pip install torch transformers python-pptx Pillow`"
            )

        model = VisionEncoderDecoderModel.from_pretrained(
            "nlpconnect/vit-gpt2-image-captioning"
        )
        feature_extractor = ViTFeatureExtractor.from_pretrained(
            "nlpconnect/vit-gpt2-image-captioning"
        )
        tokenizer = AutoTokenizer.from_pretrained(
            "nlpconnect/vit-gpt2-image-captioning"
        )

        self.parser_config = {
            "feature_extractor": feature_extractor,
            "model": model,
            "tokenizer": tokenizer,
        }

    def caption_image(self, tmp_image_file: str) -> str:
        """Generate text caption of image."""
        from PIL import Image

        model = self.parser_config["model"]
        feature_extractor = self.parser_config["feature_extractor"]
        tokenizer = self.parser_config["tokenizer"]

        device = infer_torch_device()
        model.to(device)

        max_length = 16
        num_beams = 4
        gen_kwargs = {"max_length": max_length, "num_beams": num_beams}

        i_image = Image.open(tmp_image_file)
        if i_image.mode != "RGB":
            i_image = i_image.convert(mode="RGB")

        pixel_values = feature_extractor(
            images=[i_image], return_tensors="pt"
        ).pixel_values
        pixel_values = pixel_values.to(device)

        output_ids = model.generate(pixel_values, **gen_kwargs)

        preds = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
        return preds[0].strip()

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        from pptx import Presentation

        if fs:
            with fs.open(file) as f:
                presentation = Presentation(f)
        else:
            presentation = Presentation(file)
        result = ""
        for i, slide in enumerate(presentation.slides):
            result += f"\n\nSlide #{i}: \n"
            for shape in slide.shapes:
                if hasattr(shape, "image"):
                    image = shape.image
                    # get image "file" contents
                    image_bytes = image.blob
                    # temporarily save the image to feed into model
                    image_filename = f"tmp_image.{image.ext}"
                    with open(image_filename, "wb") as f:
                        f.write(image_bytes)
                    result += f"\n Image: {self.caption_image(image_filename)}\n\n"

                    os.remove(image_filename)
                if hasattr(shape, "text"):
                    result += f"{shape.text}\n"

        return [Document(text=result, metadata=extra_info or {})]

caption_image #

caption_image(tmp_image_file: str) -> str

Generate text caption of image.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/slides/base.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def caption_image(self, tmp_image_file: str) -> str:
    """Generate text caption of image."""
    from PIL import Image

    model = self.parser_config["model"]
    feature_extractor = self.parser_config["feature_extractor"]
    tokenizer = self.parser_config["tokenizer"]

    device = infer_torch_device()
    model.to(device)

    max_length = 16
    num_beams = 4
    gen_kwargs = {"max_length": max_length, "num_beams": num_beams}

    i_image = Image.open(tmp_image_file)
    if i_image.mode != "RGB":
        i_image = i_image.convert(mode="RGB")

    pixel_values = feature_extractor(
        images=[i_image], return_tensors="pt"
    ).pixel_values
    pixel_values = pixel_values.to(device)

    output_ids = model.generate(pixel_values, **gen_kwargs)

    preds = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
    return preds[0].strip()

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/slides/base.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    from pptx import Presentation

    if fs:
        with fs.open(file) as f:
            presentation = Presentation(f)
    else:
        presentation = Presentation(file)
    result = ""
    for i, slide in enumerate(presentation.slides):
        result += f"\n\nSlide #{i}: \n"
        for shape in slide.shapes:
            if hasattr(shape, "image"):
                image = shape.image
                # get image "file" contents
                image_bytes = image.blob
                # temporarily save the image to feed into model
                image_filename = f"tmp_image.{image.ext}"
                with open(image_filename, "wb") as f:
                    f.write(image_bytes)
                result += f"\n Image: {self.caption_image(image_filename)}\n\n"

                os.remove(image_filename)
            if hasattr(shape, "text"):
                result += f"{shape.text}\n"

    return [Document(text=result, metadata=extra_info or {})]

PyMuPDFReader #

Bases: BaseReader

Read PDF files using PyMuPDF library.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/pymu_pdf/base.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class PyMuPDFReader(BaseReader):
    """Read PDF files using PyMuPDF library."""

    def load_data(
        self,
        file_path: Union[Path, str],
        metadata: bool = True,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Loads list of documents from PDF file and also accepts extra information in dict format."""
        return self.load(file_path, metadata=metadata, extra_info=extra_info)

    def load(
        self,
        file_path: Union[Path, str],
        metadata: bool = True,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Loads list of documents from PDF file and also accepts extra information in dict format.

        Args:
            file_path (Union[Path, str]): file path of PDF file (accepts string or Path).
            metadata (bool, optional): if metadata to be included or not. Defaults to True.
            extra_info (Optional[Dict], optional): extra information related to each document in dict format. Defaults to None.

        Raises:
            TypeError: if extra_info is not a dictionary.
            TypeError: if file_path is not a string or Path.

        Returns:
            List[Document]: list of documents.
        """
        import fitz

        # check if file_path is a string or Path
        if not isinstance(file_path, str) and not isinstance(file_path, Path):
            raise TypeError("file_path must be a string or Path.")

        # open PDF file
        doc = fitz.open(file_path)

        # if extra_info is not None, check if it is a dictionary
        if extra_info:
            if not isinstance(extra_info, dict):
                raise TypeError("extra_info must be a dictionary.")

        # if metadata is True, add metadata to each document
        if metadata:
            if not extra_info:
                extra_info = {}
            extra_info["total_pages"] = len(doc)
            extra_info["file_path"] = str(file_path)

            # return list of documents
            return [
                Document(
                    text=page.get_text().encode("utf-8"),
                    extra_info=dict(
                        extra_info,
                        **{
                            "source": f"{page.number+1}",
                        },
                    ),
                )
                for page in doc
            ]

        else:
            return [
                Document(
                    text=page.get_text().encode("utf-8"), extra_info=extra_info or {}
                )
                for page in doc
            ]

load_data #

load_data(file_path: Union[Path, str], metadata: bool = True, extra_info: Optional[Dict] = None) -> List[Document]

Loads list of documents from PDF file and also accepts extra information in dict format.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/pymu_pdf/base.py
13
14
15
16
17
18
19
20
def load_data(
    self,
    file_path: Union[Path, str],
    metadata: bool = True,
    extra_info: Optional[Dict] = None,
) -> List[Document]:
    """Loads list of documents from PDF file and also accepts extra information in dict format."""
    return self.load(file_path, metadata=metadata, extra_info=extra_info)

load #

load(file_path: Union[Path, str], metadata: bool = True, extra_info: Optional[Dict] = None) -> List[Document]

Loads list of documents from PDF file and also accepts extra information in dict format.

Parameters:

Name Type Description Default
file_path Union[Path, str]

file path of PDF file (accepts string or Path).

required
metadata bool

if metadata to be included or not. Defaults to True.

True
extra_info Optional[Dict]

extra information related to each document in dict format. Defaults to None.

None

Raises:

Type Description
TypeError

if extra_info is not a dictionary.

TypeError

if file_path is not a string or Path.

Returns:

Type Description
List[Document]

List[Document]: list of documents.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/pymu_pdf/base.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def load(
    self,
    file_path: Union[Path, str],
    metadata: bool = True,
    extra_info: Optional[Dict] = None,
) -> List[Document]:
    """Loads list of documents from PDF file and also accepts extra information in dict format.

    Args:
        file_path (Union[Path, str]): file path of PDF file (accepts string or Path).
        metadata (bool, optional): if metadata to be included or not. Defaults to True.
        extra_info (Optional[Dict], optional): extra information related to each document in dict format. Defaults to None.

    Raises:
        TypeError: if extra_info is not a dictionary.
        TypeError: if file_path is not a string or Path.

    Returns:
        List[Document]: list of documents.
    """
    import fitz

    # check if file_path is a string or Path
    if not isinstance(file_path, str) and not isinstance(file_path, Path):
        raise TypeError("file_path must be a string or Path.")

    # open PDF file
    doc = fitz.open(file_path)

    # if extra_info is not None, check if it is a dictionary
    if extra_info:
        if not isinstance(extra_info, dict):
            raise TypeError("extra_info must be a dictionary.")

    # if metadata is True, add metadata to each document
    if metadata:
        if not extra_info:
            extra_info = {}
        extra_info["total_pages"] = len(doc)
        extra_info["file_path"] = str(file_path)

        # return list of documents
        return [
            Document(
                text=page.get_text().encode("utf-8"),
                extra_info=dict(
                    extra_info,
                    **{
                        "source": f"{page.number+1}",
                    },
                ),
            )
            for page in doc
        ]

    else:
        return [
            Document(
                text=page.get_text().encode("utf-8"), extra_info=extra_info or {}
            )
            for page in doc
        ]

RTFReader #

Bases: BaseReader

RTF (Rich Text Format) Reader. Reads rtf file and convert to Document.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/rtf/base.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class RTFReader(BaseReader):
    """RTF (Rich Text Format) Reader. Reads rtf file and convert to Document."""

    def load_data(
        self,
        input_file: Union[Path, str],
        extra_info=Dict[str, Any],
        **load_kwargs: Any
    ) -> List[Document]:
        """Load data from RTF file.

        Args:
            input_file (Path | str): Path for the RTF file.
            extra_info (Dict[str, Any]): Path for the RTF file.

        Returns:
            List[Document]: List of documents.
        """
        try:
            from striprtf.striprtf import rtf_to_text
        except ImportError:
            raise ImportError("striprtf is required to read RTF files.")

        with open(str(input_file)) as f:
            text = rtf_to_text(f.read())
            return [Document(text=text.strip())]

load_data #

load_data(input_file: Union[Path, str], extra_info=Dict[str, Any], **load_kwargs: Any) -> List[Document]

Load data from RTF file.

Parameters:

Name Type Description Default
input_file Path | str

Path for the RTF file.

required
extra_info Dict[str, Any]

Path for the RTF file.

Dict[str, Any]

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/rtf/base.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def load_data(
    self,
    input_file: Union[Path, str],
    extra_info=Dict[str, Any],
    **load_kwargs: Any
) -> List[Document]:
    """Load data from RTF file.

    Args:
        input_file (Path | str): Path for the RTF file.
        extra_info (Dict[str, Any]): Path for the RTF file.

    Returns:
        List[Document]: List of documents.
    """
    try:
        from striprtf.striprtf import rtf_to_text
    except ImportError:
        raise ImportError("striprtf is required to read RTF files.")

    with open(str(input_file)) as f:
        text = rtf_to_text(f.read())
        return [Document(text=text.strip())]

UnstructuredReader #

Bases: BaseReader

General unstructured text reader for a variety of files.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/unstructured/base.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class UnstructuredReader(BaseReader):
    """General unstructured text reader for a variety of files."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Init params."""
        super().__init__(*args)  # not passing kwargs to parent bc it cannot accept it

        self.api = False  # we default to local
        if "url" in kwargs:
            self.server_url = str(kwargs["url"])
            self.api = True  # is url was set, switch to api
        else:
            self.server_url = "http://localhost:8000"

        if "api" in kwargs:
            self.api = kwargs["api"]

        self.api_key = ""
        if "api_key" in kwargs:
            self.api_key = kwargs["api_key"]

        # Prerequisite for Unstructured.io to work
        import nltk

        if not nltk.data.find("tokenizers/punkt"):
            nltk.download("punkt")
        if not nltk.data.find("taggers/averaged_perceptron_tagger"):
            nltk.download("averaged_perceptron_tagger")

    """ Loads data using Unstructured.io py

        Depending on the constructin if url is set or api = True
        it'll parse file using API call, else parse it locally
        extra_info is extended by the returned metadata if
        split_documents is True

        Returns list of documents
    """

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        split_documents: Optional[bool] = False,
    ) -> List[Document]:
        """If api is set, parse through api."""
        if self.api:
            from unstructured.partition.api import partition_via_api

            elements = partition_via_api(
                filename=str(file),
                api_key=self.api_key,
                api_url=self.server_url + "/general/v0/general",
            )
        else:
            """Parse file locally"""
            from unstructured.partition.auto import partition

            elements = partition(filename=str(file))

        """ Process elements """
        docs = []
        if split_documents:
            for node in elements:
                metadata = {}
                if hasattr(node, "metadata"):
                    """Load metadata fields"""
                    for field, val in vars(node.metadata).items():
                        if field == "_known_field_names":
                            continue
                        # removing coordinates because it does not serialize
                        # and dont want to bother with it
                        if field == "coordinates":
                            continue
                        # removing bc it might cause interference
                        if field == "parent_id":
                            continue
                        metadata[field] = val

                if extra_info is not None:
                    metadata.update(extra_info)

                metadata["filename"] = str(file)
                docs.append(Document(text=node.text, extra_info=metadata))

        else:
            text_chunks = [" ".join(str(el).split()) for el in elements]

            metadata = {}

            if extra_info is not None:
                metadata.update(extra_info)

            metadata["filename"] = str(file)
            # Create a single document by joining all the texts
            docs.append(Document(text="\n\n".join(text_chunks), extra_info=metadata))

        return docs

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, split_documents: Optional[bool] = False) -> List[Document]

If api is set, parse through api.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/unstructured/base.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    split_documents: Optional[bool] = False,
) -> List[Document]:
    """If api is set, parse through api."""
    if self.api:
        from unstructured.partition.api import partition_via_api

        elements = partition_via_api(
            filename=str(file),
            api_key=self.api_key,
            api_url=self.server_url + "/general/v0/general",
        )
    else:
        """Parse file locally"""
        from unstructured.partition.auto import partition

        elements = partition(filename=str(file))

    """ Process elements """
    docs = []
    if split_documents:
        for node in elements:
            metadata = {}
            if hasattr(node, "metadata"):
                """Load metadata fields"""
                for field, val in vars(node.metadata).items():
                    if field == "_known_field_names":
                        continue
                    # removing coordinates because it does not serialize
                    # and dont want to bother with it
                    if field == "coordinates":
                        continue
                    # removing bc it might cause interference
                    if field == "parent_id":
                        continue
                    metadata[field] = val

            if extra_info is not None:
                metadata.update(extra_info)

            metadata["filename"] = str(file)
            docs.append(Document(text=node.text, extra_info=metadata))

    else:
        text_chunks = [" ".join(str(el).split()) for el in elements]

        metadata = {}

        if extra_info is not None:
            metadata.update(extra_info)

        metadata["filename"] = str(file)
        # Create a single document by joining all the texts
        docs.append(Document(text="\n\n".join(text_chunks), extra_info=metadata))

    return docs

VideoAudioReader #

Bases: BaseReader

Video audio parser.

Extract text from transcript of video/audio files.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/video_audio/base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class VideoAudioReader(BaseReader):
    """Video audio parser.

    Extract text from transcript of video/audio files.

    """

    def __init__(self, *args: Any, model_version: str = "base", **kwargs: Any) -> None:
        """Init parser."""
        super().__init__(*args, **kwargs)
        self._model_version = model_version

        try:
            import whisper
        except ImportError:
            raise ImportError(
                "Please install OpenAI whisper model "
                "'pip install git+https://github.com/openai/whisper.git' "
                "to use the model"
            )

        model = whisper.load_model(self._model_version)

        self.parser_config = {"model": model}

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
        fs: Optional[AbstractFileSystem] = None,
    ) -> List[Document]:
        """Parse file."""
        import whisper

        if file.name.endswith("mp4"):
            try:
                from pydub import AudioSegment
            except ImportError:
                raise ImportError("Please install pydub 'pip install pydub' ")
            if fs:
                with fs.open(file, "rb") as f:
                    video = AudioSegment.from_file(f, format="mp4")
            else:
                # open file
                video = AudioSegment.from_file(file, format="mp4")

            # Extract audio from video
            audio = video.split_to_mono()[0]

            file_str = str(file)[:-4] + ".mp3"
            # export file
            audio.export(file_str, format="mp3")

        model = cast(whisper.Whisper, self.parser_config["model"])
        result = model.transcribe(str(file))

        transcript = result["text"]

        return [Document(text=transcript, metadata=extra_info or {})]

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None, fs: Optional[AbstractFileSystem] = None) -> List[Document]

Parse file.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/video_audio/base.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
    fs: Optional[AbstractFileSystem] = None,
) -> List[Document]:
    """Parse file."""
    import whisper

    if file.name.endswith("mp4"):
        try:
            from pydub import AudioSegment
        except ImportError:
            raise ImportError("Please install pydub 'pip install pydub' ")
        if fs:
            with fs.open(file, "rb") as f:
                video = AudioSegment.from_file(f, format="mp4")
        else:
            # open file
            video = AudioSegment.from_file(file, format="mp4")

        # Extract audio from video
        audio = video.split_to_mono()[0]

        file_str = str(file)[:-4] + ".mp3"
        # export file
        audio.export(file_str, format="mp3")

    model = cast(whisper.Whisper, self.parser_config["model"])
    result = model.transcribe(str(file))

    transcript = result["text"]

    return [Document(text=transcript, metadata=extra_info or {})]

XMLReader #

Bases: BaseReader

XML reader.

Reads XML documents with options to help suss out relationships between nodes.

Parameters:

Name Type Description Default
tree_level_split int

From which level in the xml tree we split documents,

0
Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/xml/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class XMLReader(BaseReader):
    """XML reader.

    Reads XML documents with options to help suss out relationships between nodes.

    Args:
        tree_level_split (int): From which level in the xml tree we split documents,
        the default level is the root which is level 0

    """

    def __init__(self, tree_level_split: Optional[int] = 0) -> None:
        """Initialize with arguments."""
        super().__init__()
        self.tree_level_split = tree_level_split

    def _parse_xmlelt_to_document(
        self, root: ET.Element, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the xml object into a list of Documents.

        Args:
            root: The XML Element to be converted.
            extra_info (Optional[Dict]): Additional information. Default is None.

        Returns:
            Document: The documents.
        """
        nodes = _get_leaf_nodes_up_to_level(root, self.tree_level_split)
        documents = []
        for node in nodes:
            content = ET.tostring(node, encoding="utf8").decode("utf-8")
            content = re.sub(r"^<\?xml.*", "", content)
            content = content.strip()
            documents.append(Document(text=content, extra_info=extra_info or {}))

        return documents

    def load_data(
        self,
        file: Path,
        extra_info: Optional[Dict] = None,
    ) -> List[Document]:
        """Load data from the input file.

        Args:
            file (Path): Path to the input file.
            extra_info (Optional[Dict]): Additional information. Default is None.

        Returns:
            List[Document]: List of documents.
        """
        if not isinstance(file, Path):
            file = Path(file)

        tree = ET.parse(file)
        return self._parse_xmlelt_to_document(tree.getroot(), extra_info)

load_data #

load_data(file: Path, extra_info: Optional[Dict] = None) -> List[Document]

Load data from the input file.

Parameters:

Name Type Description Default
file Path

Path to the input file.

required
extra_info Optional[Dict]

Additional information. Default is None.

None

Returns:

Type Description
List[Document]

List[Document]: List of documents.

Source code in llama-index-integrations/readers/llama-index-readers-file/llama_index/readers/file/xml/base.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def load_data(
    self,
    file: Path,
    extra_info: Optional[Dict] = None,
) -> List[Document]:
    """Load data from the input file.

    Args:
        file (Path): Path to the input file.
        extra_info (Optional[Dict]): Additional information. Default is None.

    Returns:
        List[Document]: List of documents.
    """
    if not isinstance(file, Path):
        file = Path(file)

    tree = ET.parse(file)
    return self._parse_xmlelt_to_document(tree.getroot(), extra_info)