Skip to content

Google

GmailReader #

Bases: BaseReader, BaseModel

Gmail reader.

Reads emails

Parameters:

Name Type Description Default
max_results int

Defaults to 10.

required
query str

Gmail query. Defaults to None.

required
service Any

Gmail service. Defaults to None.

required
results_per_page Optional[int]

Max number of results per page. Defaults to 10.

required
use_iterative_parser bool

Use iterative parser. Defaults to False.

required
Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/gmail/base.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class GmailReader(BaseReader, BaseModel):
    """Gmail reader.

    Reads emails

    Args:
        max_results (int): Defaults to 10.
        query (str): Gmail query. Defaults to None.
        service (Any): Gmail service. Defaults to None.
        results_per_page (Optional[int]): Max number of results per page. Defaults to 10.
        use_iterative_parser (bool): Use iterative parser. Defaults to False.
    """

    query: str = None
    use_iterative_parser: bool = False
    max_results: int = 10
    service: Any
    results_per_page: Optional[int]

    def load_data(self) -> List[Document]:
        """Load emails from the user's account."""
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        if not self.service:
            self.service = build("gmail", "v1", credentials=credentials)

        messages = self.search_messages()

        results = []
        for message in messages:
            text = message.pop("body")
            extra_info = message
            results.append(Document(text=text, extra_info=extra_info or {}))

        return results

    def _get_credentials(self) -> Any:
        """Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.
        """
        import os

        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=8080)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def search_messages(self):
        query = self.query

        max_results = self.max_results
        if self.results_per_page:
            max_results = self.results_per_page

        results = (
            self.service.users()
            .messages()
            .list(userId="me", q=query, maxResults=int(max_results))
            .execute()
        )
        messages = results.get("messages", [])

        if len(messages) < self.max_results:
            # paginate if there are more results
            while "nextPageToken" in results:
                page_token = results["nextPageToken"]
                results = (
                    self.service.users()
                    .messages()
                    .list(
                        userId="me",
                        q=query,
                        pageToken=page_token,
                        maxResults=int(max_results),
                    )
                    .execute()
                )
                messages.extend(results["messages"])
                if len(messages) >= self.max_results:
                    break

        result = []
        try:
            for message in messages:
                message_data = self.get_message_data(message)
                if not message_data:
                    continue
                result.append(message_data)
        except Exception as e:
            raise Exception("Can't get message data" + str(e))

        return result

    def get_message_data(self, message):
        message_id = message["id"]
        message_data = (
            self.service.users()
            .messages()
            .get(format="raw", userId="me", id=message_id)
            .execute()
        )
        if self.use_iterative_parser:
            body = self.extract_message_body_iterative(message_data)
        else:
            body = self.extract_message_body(message_data)

        if not body:
            return None

        # https://developers.google.com/gmail/api/reference/rest/v1/users.messages
        return {
            "id": message_data["id"],
            "threadId": message_data["threadId"],
            "snippet": message_data["snippet"],
            "internalDate": message_data["internalDate"],
            "body": body,
        }

    def extract_message_body_iterative(self, message: dict):
        if message["raw"]:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(body)
        else:
            mime_msg = message

        body_text = ""
        if mime_msg.get_content_type() == "text/plain":
            plain_text = mime_msg.get_payload(decode=True)
            charset = mime_msg.get_content_charset("utf-8")
            body_text = plain_text.decode(charset).encode("utf-8").decode("utf-8")

        elif mime_msg.get_content_maintype() == "multipart":
            msg_parts = mime_msg.get_payload()
            for msg_part in msg_parts:
                body_text += self.extract_message_body_iterative(msg_part)

        return body_text

    def extract_message_body(self, message: dict):
        from bs4 import BeautifulSoup

        try:
            body = base64.urlsafe_b64decode(message["raw"].encode("utf-8"))
            mime_msg = email.message_from_bytes(body)

            # If the message body contains HTML, parse it with BeautifulSoup
            if "text/html" in mime_msg:
                soup = BeautifulSoup(body, "html.parser")
                body = soup.get_text()
            return body.decode("utf-8")
        except Exception as e:
            raise Exception("Can't parse message body" + str(e))

load_data #

load_data() -> List[Document]

Load emails from the user's account.

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/gmail/base.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def load_data(self) -> List[Document]:
    """Load emails from the user's account."""
    from googleapiclient.discovery import build

    credentials = self._get_credentials()
    if not self.service:
        self.service = build("gmail", "v1", credentials=credentials)

    messages = self.search_messages()

    results = []
    for message in messages:
        text = message.pop("body")
        extra_info = message
        results.append(Document(text=text, extra_info=extra_info or {}))

    return results

GoogleCalendarReader #

Bases: BaseReader

Google Calendar reader.

Reads events from Google Calendar

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/calendar/base.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class GoogleCalendarReader(BaseReader):
    """Google Calendar reader.

    Reads events from Google Calendar

    """

    def load_data(
        self,
        number_of_results: Optional[int] = 100,
        start_date: Optional[Union[str, datetime.date]] = None,
    ) -> List[Document]:
        """Load data from user's calendar.

        Args:
            number_of_results (Optional[int]): the number of events to return. Defaults to 100.
            start_date (Optional[Union[str, datetime.date]]): the start date to return events from. Defaults to today.
        """
        from googleapiclient.discovery import build

        credentials = self._get_credentials()
        service = build("calendar", "v3", credentials=credentials)

        if start_date is None:
            start_date = datetime.date.today()
        elif isinstance(start_date, str):
            start_date = datetime.date.fromisoformat(start_date)

        start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
        start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

        events_result = (
            service.events()
            .list(
                calendarId="primary",
                timeMin=start_datetime_utc,
                maxResults=number_of_results,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )

        events = events_result.get("items", [])

        if not events:
            return []

        results = []
        for event in events:
            if "dateTime" in event["start"]:
                start_time = event["start"]["dateTime"]
            else:
                start_time = event["start"]["date"]

            if "dateTime" in event["end"]:
                end_time = event["end"]["dateTime"]
            else:
                end_time = event["end"]["date"]

            event_string = f"Status: {event['status']}, "
            event_string += f"Summary: {event['summary']}, "
            event_string += f"Start time: {start_time}, "
            event_string += f"End time: {end_time}, "

            organizer = event.get("organizer", {})
            display_name = organizer.get("displayName", "N/A")
            email = organizer.get("email", "N/A")
            if display_name != "N/A":
                event_string += f"Organizer: {display_name} ({email})"
            else:
                event_string += f"Organizer: {email}"

            results.append(Document(text=event_string))

        return results

    def _get_credentials(self) -> Any:
        """Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.
        """
        from google_auth_oauthlib.flow import InstalledAppFlow

        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials

        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

load_data #

load_data(number_of_results: Optional[int] = 100, start_date: Optional[Union[str, date]] = None) -> List[Document]

Load data from user's calendar.

Parameters:

Name Type Description Default
number_of_results Optional[int]

the number of events to return. Defaults to 100.

100
start_date Optional[Union[str, date]]

the start date to return events from. Defaults to today.

None
Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/calendar/base.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def load_data(
    self,
    number_of_results: Optional[int] = 100,
    start_date: Optional[Union[str, datetime.date]] = None,
) -> List[Document]:
    """Load data from user's calendar.

    Args:
        number_of_results (Optional[int]): the number of events to return. Defaults to 100.
        start_date (Optional[Union[str, datetime.date]]): the start date to return events from. Defaults to today.
    """
    from googleapiclient.discovery import build

    credentials = self._get_credentials()
    service = build("calendar", "v3", credentials=credentials)

    if start_date is None:
        start_date = datetime.date.today()
    elif isinstance(start_date, str):
        start_date = datetime.date.fromisoformat(start_date)

    start_datetime = datetime.datetime.combine(start_date, datetime.time.min)
    start_datetime_utc = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    events_result = (
        service.events()
        .list(
            calendarId="primary",
            timeMin=start_datetime_utc,
            maxResults=number_of_results,
            singleEvents=True,
            orderBy="startTime",
        )
        .execute()
    )

    events = events_result.get("items", [])

    if not events:
        return []

    results = []
    for event in events:
        if "dateTime" in event["start"]:
            start_time = event["start"]["dateTime"]
        else:
            start_time = event["start"]["date"]

        if "dateTime" in event["end"]:
            end_time = event["end"]["dateTime"]
        else:
            end_time = event["end"]["date"]

        event_string = f"Status: {event['status']}, "
        event_string += f"Summary: {event['summary']}, "
        event_string += f"Start time: {start_time}, "
        event_string += f"End time: {end_time}, "

        organizer = event.get("organizer", {})
        display_name = organizer.get("displayName", "N/A")
        email = organizer.get("email", "N/A")
        if display_name != "N/A":
            event_string += f"Organizer: {display_name} ({email})"
        else:
            event_string += f"Organizer: {email}"

        results.append(Document(text=event_string))

    return results

GoogleDocsReader #

Bases: BasePydanticReader

Google Docs reader.

Reads a page from Google Docs

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/docs/base.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class GoogleDocsReader(BasePydanticReader):
    """Google Docs reader.

    Reads a page from Google Docs

    """

    is_remote: bool = True

    split_on_heading_level: Optional[int] = Field(
        default=None,
        description="If set the document will be split on the specified heading level.",
    )

    include_toc: bool = Field(
        default=True, description="Include table of contents elements."
    )

    @classmethod
    def class_name(cls) -> str:
        return "GoogleDocsReader"

    def load_data(self, document_ids: List[str]) -> List[Document]:
        """Load data from the input directory.

        Args:
            document_ids (List[str]): a list of document ids.
        """
        if document_ids is None:
            raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

        results = []
        for document_id in document_ids:
            docs = self._load_doc(document_id)
            results.extend(docs)

        return results

    def _load_doc(self, document_id: str) -> str:
        """Load a document from Google Docs.

        Args:
            document_id: the document id.

        Returns:
            The document text.
        """
        credentials = self._get_credentials()
        docs_service = discovery.build("docs", "v1", credentials=credentials)
        google_doc = docs_service.documents().get(documentId=document_id).execute()
        google_doc_content = google_doc.get("body").get("content")

        doc_metadata = {"document_id": document_id}

        return self._structural_elements_to_docs(google_doc_content, doc_metadata)

    def _get_credentials(self) -> Any:
        """Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.
        """
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

    def _read_paragraph_element(self, element: Any) -> Any:
        """Return the text in the given ParagraphElement.

        Args:
            element: a ParagraphElement from a Google Doc.
        """
        text_run = element.get("textRun")
        if not text_run:
            return ""
        return text_run.get("content")

    def _read_structural_elements(self, elements: List[Any]) -> Any:
        """Recurse through a list of Structural Elements.

        Read a document's text where text may be in nested elements.

        Args:
            elements: a list of Structural Elements.
        """
        text = ""
        for value in elements:
            if "paragraph" in value:
                elements = value.get("paragraph").get("elements")
                for elem in elements:
                    text += self._read_paragraph_element(elem)
            elif "table" in value:
                # The text in table cells are in nested Structural Elements
                # and tables may be nested.
                table = value.get("table")
                for row in table.get("tableRows"):
                    cells = row.get("tableCells")
                    for cell in cells:
                        text += self._read_structural_elements(cell.get("content"))
            elif "tableOfContents" in value:
                # The text in the TOC is also in a Structural Element.
                toc = value.get("tableOfContents")
                text += self._read_structural_elements(toc.get("content"))
        return text

    def _determine_heading_level(self, element):
        """Extracts the heading level, label, and ID from a document element.

        Args:
            element: a Structural Element.
        """
        level = None
        heading_key = None
        heading_id = None
        if self.split_on_heading_level and "paragraph" in element:
            style = element.get("paragraph").get("paragraphStyle")
            style_type = style.get("namedStyleType", "")
            heading_id = style.get("headingId", None)
            if style_type == "TITLE":
                level = 0
                heading_key = "title"
            elif style_type.startswith("HEADING_"):
                level = int(style_type.split("_")[1])
                if level > self.split_on_heading_level:
                    return None, None, None

                heading_key = f"Header {level}"

        return level, heading_key, heading_id

    def _generate_doc_id(self, metadata: dict):
        if "heading_id" in metadata:
            heading_id = metadata["heading_id"]
        else:
            heading_id = "".join(
                random.choices(string.ascii_letters + string.digits, k=8)
            )
        return f"{metadata['document_id']}_{heading_id}"

    def _structural_elements_to_docs(
        self, elements: List[Any], doc_metadata: dict
    ) -> Any:
        """Recurse through a list of Structural Elements.

        Split documents on heading if split_on_heading_level is set.

        Args:
            elements: a list of Structural Elements.
        """
        docs = []

        current_heading_level = self.split_on_heading_level

        metadata = doc_metadata.copy()
        text = ""
        for value in elements:
            element_text = self._read_structural_elements([value])

            level, heading_key, heading_id = self._determine_heading_level(value)

            if level is not None:
                if level == self.split_on_heading_level:
                    if text.strip():
                        docs.append(
                            Document(
                                id_=self._generate_doc_id(metadata),
                                text=text,
                                metadata=metadata.copy(),
                            )
                        )
                        text = ""
                    if "heading_id" in metadata:
                        metadata["heading_id"] = heading_id
                elif level < current_heading_level:
                    metadata = doc_metadata.copy()

                metadata[heading_key] = element_text
                current_heading_level = level
            else:
                text += element_text

        if text:
            if docs:
                id_ = self._generate_doc_id(metadata)
            else:
                id_ = metadata["document_id"]
            docs.append(Document(id_=id_, text=text, metadata=metadata))

        return docs

load_data #

load_data(document_ids: List[str]) -> List[Document]

Load data from the input directory.

Parameters:

Name Type Description Default
document_ids List[str]

a list of document ids.

required
Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/docs/base.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def load_data(self, document_ids: List[str]) -> List[Document]:
    """Load data from the input directory.

    Args:
        document_ids (List[str]): a list of document ids.
    """
    if document_ids is None:
        raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

    results = []
    for document_id in document_ids:
        docs = self._load_doc(document_id)
        results.extend(docs)

    return results

GoogleDriveReader #

Bases: BaseReader

Google drive reader.

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/drive/base.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
class GoogleDriveReader(BaseReader):
    """Google drive reader."""

    def __init__(
        self,
        credentials_path: str = "credentials.json",
        token_path: str = "token.json",
        pydrive_creds_path: str = "creds.txt",
    ) -> None:
        """Initialize with parameters."""
        self.credentials_path = credentials_path
        self.token_path = token_path
        self.pydrive_creds_path = pydrive_creds_path

        self._creds = None
        self._drive = None

        # Download Google Docs/Slides/Sheets as actual files
        # See https://developers.google.com/drive/v3/web/mime-types
        self._mimetypes = {
            "application/vnd.google-apps.document": {
                "mimetype": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                "extension": ".docx",
            },
            "application/vnd.google-apps.spreadsheet": {
                "mimetype": (
                    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                ),
                "extension": ".xlsx",
            },
            "application/vnd.google-apps.presentation": {
                "mimetype": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
                "extension": ".pptx",
            },
        }

    def _get_credentials(self) -> Tuple[Credentials, GoogleDrive]:
        """Authenticate with Google and save credentials.
        Download the credentials.json file with these instructions: https://developers.google.com/drive/api/v3/quickstart/python.
            Copy credentials.json file and rename it to client_secrets.json file which will be used by pydrive for downloading files.
            So, we need two files:
                1. credentials.json
                2. client_secrets.json
            Both 1, 2 are essentially same but needed with two different names according to google-api-python-client, google-auth-httplib2, google-auth-oauthlib and pydrive libraries.

        Returns:
            credentials, pydrive object.
        """
        from google_auth_oauthlib.flow import InstalledAppFlow
        from pydrive.auth import GoogleAuth

        # First, we need the Google API credentials for the app
        creds = None

        if Path(self.token_path).exists():
            creds = Credentials.from_authorized_user_file(self.token_path, SCOPES)
        elif Path(self.credentials_path).exists():
            creds = service_account.Credentials.from_service_account_file(
                self.credentials_path, scopes=SCOPES
            )
            gauth = GoogleAuth()
            gauth.credentials = creds
            drive = GoogleDrive(gauth)
            return creds, drive

        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.credentials_path, SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(self.token_path, "w", encoding="utf-8") as token:
                token.write(creds.to_json())

        # Next, we need user authentication to download files (via pydrive)
        # Uses client_secrets.json file for authorization.
        gauth = GoogleAuth()
        # Try to load saved client credentials
        gauth.LoadCredentialsFile(self.pydrive_creds_path)
        if gauth.credentials is None:
            # Authenticate if they're not there
            gauth.LocalWebserverAuth()
        elif gauth.access_token_expired:
            # Refresh them if expired
            gauth.Refresh()
        else:
            # Initialize the saved creds
            gauth.Authorize()
        # Save the current credentials to a file so user doesn't have to auth every time
        gauth.SaveCredentialsFile(self.pydrive_creds_path)

        drive = GoogleDrive(gauth)

        return creds, drive

    def _get_fileids_meta(
        self,
        folder_id: Optional[str] = None,
        file_id: Optional[str] = None,
        mime_types: Optional[List[str]] = None,
        query_string: Optional[str] = None,
    ) -> List[List[str]]:
        """Get file ids present in folder/ file id
        Args:
            folder_id: folder id of the folder in google drive.
            file_id: file id of the file in google drive
            mime_types: The mimeTypes you want to allow e.g.: "application/vnd.google-apps.document"
            query_string: A more generic query string to filter the documents, e.g. "name contains 'test'".

        Returns:
            metadata: List of metadata of filde ids.
        """
        from googleapiclient.discovery import build

        try:
            service = build("drive", "v3", credentials=self._creds)
            fileids_meta = []
            if folder_id:
                folder_mime_type = "application/vnd.google-apps.folder"
                query = "('" + folder_id + "' in parents)"

                # Add mimeType filter to query
                if mime_types:
                    if folder_mime_type not in mime_types:
                        mime_types.append(folder_mime_type)  # keep the recursiveness
                    mime_query = " or ".join(
                        [f"mimeType='{mime_type}'" for mime_type in mime_types]
                    )
                    query += f" and ({mime_query})"

                # Add query string filter
                if query_string:
                    # to keep the recursiveness, we need to add folder_mime_type to the mime_types
                    query += (
                        f" and ((mimeType='{folder_mime_type}') or ({query_string}))"
                    )

                items = []
                # get files taking into account that the results are paginated
                while True:
                    results = (
                        service.files()
                        .list(
                            q=query,
                            includeItemsFromAllDrives=True,
                            supportsAllDrives=True,
                            fields="*",
                        )
                        .execute()
                    )
                    items.extend(results.get("files", []))
                    page_token = results.get("nextPageToken", None)
                    if page_token is None:
                        break

                for item in items:
                    if item["mimeType"] == folder_mime_type:
                        fileids_meta.extend(
                            self._get_fileids_meta(
                                folder_id=item["id"],
                                mime_types=mime_types,
                                query_string=query_string,
                            )
                        )
                    else:
                        # Check if file doesn't belong to a Shared Drive. "owners" doesn't exist in a Shared Drive
                        is_shared_drive = "driveId" in item
                        author = (
                            item["owners"][0]["displayName"]
                            if not is_shared_drive
                            else "Shared Drive"
                        )

                        fileids_meta.append(
                            (
                                item["id"],
                                author,
                                item["name"],
                                item["mimeType"],
                                item["createdTime"],
                                item["modifiedTime"],
                            )
                        )
            else:
                # Get the file details
                file = (
                    service.files()
                    .get(fileId=file_id, supportsAllDrives=True, fields="*")
                    .execute()
                )
                # Get metadata of the file
                # Check if file doesn't belong to a Shared Drive. "owners" doesn't exist in a Shared Drive
                is_shared_drive = "driveId" in file
                author = (
                    file["owners"][0]["displayName"]
                    if not is_shared_drive
                    else "Shared Drive"
                )

                fileids_meta.append(
                    (
                        file["id"],
                        author,
                        file["name"],
                        file["mimeType"],
                        file["createdTime"],
                        file["modifiedTime"],
                    )
                )
            return fileids_meta

        except Exception as e:
            logger.error(
                f"An error occurred while getting fileids metadata: {e}", exc_info=True
            )

    def _download_file(self, fileid: str, filename: str) -> str:
        """Download the file with fileid and filename
        Args:
            fileid: file id of the file in google drive
            filename: filename with which it will be downloaded
        Returns:
            The downloaded filename, which which may have a new extension.
        """
        from io import BytesIO

        from googleapiclient.discovery import build
        from googleapiclient.http import MediaIoBaseDownload

        try:
            # Get file details
            service = build("drive", "v3", credentials=self._creds)
            file = service.files().get(fileId=fileid, supportsAllDrives=True).execute()

            if file["mimeType"] in self._mimetypes:
                download_mimetype = self._mimetypes[file["mimeType"]]["mimetype"]
                download_extension = self._mimetypes[file["mimeType"]]["extension"]
                new_file_name = filename + download_extension

                # Download and convert file
                request = service.files().export_media(
                    fileId=fileid, mimeType=download_mimetype
                )
            else:
                new_file_name = filename

                # Download file without conversion
                request = service.files().get_media(fileId=fileid)

            # Download file data
            file_data = BytesIO()
            downloader = MediaIoBaseDownload(file_data, request)
            done = False

            while not done:
                status, done = downloader.next_chunk()

            # Save the downloaded file
            with open(new_file_name, "wb") as f:
                f.write(file_data.getvalue())

            return new_file_name
        except Exception as e:
            logger.error(
                f"An error occurred while downloading file: {e}", exc_info=True
            )

    def _load_data_fileids_meta(self, fileids_meta: List[List[str]]) -> List[Document]:
        """Load data from fileids metadata
        Args:
            fileids_meta: metadata of fileids in google drive.

        Returns:
            Lis[Document]: List of Document of data present in fileids.
        """
        try:
            with tempfile.TemporaryDirectory() as temp_dir:

                def get_metadata(filename):
                    return metadata[filename]

                temp_dir = Path(temp_dir)
                metadata = {}

                for fileid_meta in fileids_meta:
                    # Download files and name them with their fileid
                    fileid = fileid_meta[0]
                    filepath = os.path.join(temp_dir, fileid)
                    final_filepath = self._download_file(fileid, filepath)

                    # Add metadata of the file to metadata dictionary
                    metadata[final_filepath] = {
                        "file id": fileid_meta[0],
                        "author": fileid_meta[1],
                        "file name": fileid_meta[2],
                        "mime type": fileid_meta[3],
                        "created at": fileid_meta[4],
                        "modified at": fileid_meta[5],
                    }
                loader = SimpleDirectoryReader(temp_dir, file_metadata=get_metadata)
                documents = loader.load_data()
                for doc in documents:
                    doc.id_ = doc.metadata.get("file id", doc.id_)

            return documents
        except Exception as e:
            logger.error(
                f"An error occurred while loading data from fileids meta: {e}",
                exc_info=True,
            )

    def _load_from_file_ids(
        self,
        file_ids: List[str],
        mime_types: Optional[List[str]],
        query_string: Optional[str],
    ) -> List[Document]:
        """Load data from file ids
        Args:
            file_ids: File ids of the files in google drive.
            mime_types: The mimeTypes you want to allow e.g.: "application/vnd.google-apps.document"
            query_string: List of query strings to filter the documents, e.g. "name contains 'test'".

        Returns:
            Document: List of Documents of text.
        """
        try:
            fileids_meta = []
            for file_id in file_ids:
                fileids_meta.extend(
                    self._get_fileids_meta(
                        file_id=file_id,
                        mime_types=mime_types,
                        query_string=query_string,
                    )
                )
            return self._load_data_fileids_meta(fileids_meta)
        except Exception as e:
            logger.error(
                f"An error occurred while loading with fileid: {e}", exc_info=True
            )

    def _load_from_folder(
        self,
        folder_id: str,
        mime_types: Optional[List[str]],
        query_string: Optional[str],
    ) -> List[Document]:
        """Load data from folder_id.

        Args:
            folder_id: folder id of the folder in google drive.
            mime_types: The mimeTypes you want to allow e.g.: "application/vnd.google-apps.document"
            query_string: A more generic query string to filter the documents, e.g. "name contains 'test'".

        Returns:
            Document: List of Documents of text.
        """
        try:
            fileids_meta = self._get_fileids_meta(
                folder_id=folder_id,
                mime_types=mime_types,
                query_string=query_string,
            )
            return self._load_data_fileids_meta(fileids_meta)
        except Exception as e:
            logger.error(
                f"An error occurred while loading from folder: {e}", exc_info=True
            )

    def load_data(
        self,
        folder_id: Optional[str] = None,
        file_ids: Optional[List[str]] = None,
        mime_types: Optional[List[str]] = None,  # Deprecated
        query_string: Optional[str] = None,
    ) -> List[Document]:
        """Load data from the folder id or file ids.

        Args:
            folder_id: Folder id of the folder in google drive.
            file_ids: File ids of the files in google drive.
            mime_types: The mimeTypes you want to allow e.g.: "application/vnd.google-apps.document"
            query_string: A more generic query string to filter the documents, e.g. "name contains 'test'".
                It gives more flexibility to filter the documents. More info: https://developers.google.com/drive/api/v3/search-files

        Returns:
            List[Document]: A list of documents.
        """
        self._creds, self._drive = self._get_credentials()

        if folder_id:
            return self._load_from_folder(folder_id, mime_types, query_string)
        elif file_ids:
            return self._load_from_file_ids(file_ids, mime_types, query_string)
        else:
            logger.warning("Either 'folder_id' or 'file_ids' must be provided.")
            return []

load_data #

load_data(folder_id: Optional[str] = None, file_ids: Optional[List[str]] = None, mime_types: Optional[List[str]] = None, query_string: Optional[str] = None) -> List[Document]

Load data from the folder id or file ids.

Parameters:

Name Type Description Default
folder_id Optional[str]

Folder id of the folder in google drive.

None
file_ids Optional[List[str]]

File ids of the files in google drive.

None
mime_types Optional[List[str]]

The mimeTypes you want to allow e.g.: "application/vnd.google-apps.document"

None
query_string Optional[str]

A more generic query string to filter the documents, e.g. "name contains 'test'". It gives more flexibility to filter the documents. More info: https://developers.google.com/drive/api/v3/search-files

None

Returns:

Type Description
List[Document]

List[Document]: A list of documents.

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/drive/base.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
def load_data(
    self,
    folder_id: Optional[str] = None,
    file_ids: Optional[List[str]] = None,
    mime_types: Optional[List[str]] = None,  # Deprecated
    query_string: Optional[str] = None,
) -> List[Document]:
    """Load data from the folder id or file ids.

    Args:
        folder_id: Folder id of the folder in google drive.
        file_ids: File ids of the files in google drive.
        mime_types: The mimeTypes you want to allow e.g.: "application/vnd.google-apps.document"
        query_string: A more generic query string to filter the documents, e.g. "name contains 'test'".
            It gives more flexibility to filter the documents. More info: https://developers.google.com/drive/api/v3/search-files

    Returns:
        List[Document]: A list of documents.
    """
    self._creds, self._drive = self._get_credentials()

    if folder_id:
        return self._load_from_folder(folder_id, mime_types, query_string)
    elif file_ids:
        return self._load_from_file_ids(file_ids, mime_types, query_string)
    else:
        logger.warning("Either 'folder_id' or 'file_ids' must be provided.")
        return []

GoogleKeepReader #

Bases: BaseReader

Google Keep reader.

Reads notes from Google Keep

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/keep/base.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class GoogleKeepReader(BaseReader):
    """Google Keep reader.

    Reads notes from Google Keep

    """

    def load_data(self, document_ids: List[str]) -> List[Document]:
        """Load data from the document_ids.

        Args:
            document_ids (List[str]): a list of note ids.
        """
        keep = self._get_keep()

        if document_ids is None:
            raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

        results = []
        for note_id in document_ids:
            note = keep.get(note_id)
            if note is None:
                raise ValueError(f"Note with id {note_id} not found.")
            text = f"Title: {note.title}\nContent: {note.text}"
            results.append(Document(text=text, extra_info={"note_id": note_id}))
        return results

    def load_all_notes(self) -> List[Document]:
        """Load all notes from Google Keep."""
        keep = self._get_keep()

        notes = keep.all()
        results = []
        for note in notes:
            text = f"Title: {note.title}\nContent: {note.text}"
            results.append(Document(text=text, extra_info={"note_id": note.id}))
        return results

    def _get_keep(self) -> Any:
        import gkeepapi

        """Get a Google Keep object with login."""
        # Read username and password from keep_credentials.json
        if os.path.exists("keep_credentials.json"):
            with open("keep_credentials.json") as f:
                credentials = json.load(f)
        else:
            raise RuntimeError("Failed to load keep_credentials.json.")

        keep = gkeepapi.Keep()

        success = keep.login(credentials["username"], credentials["password"])
        if not success:
            raise RuntimeError("Failed to login to Google Keep.")

        return keep

load_data #

load_data(document_ids: List[str]) -> List[Document]

Load data from the document_ids.

Parameters:

Name Type Description Default
document_ids List[str]

a list of note ids.

required
Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/keep/base.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def load_data(self, document_ids: List[str]) -> List[Document]:
    """Load data from the document_ids.

    Args:
        document_ids (List[str]): a list of note ids.
    """
    keep = self._get_keep()

    if document_ids is None:
        raise ValueError('Must specify a "document_ids" in `load_kwargs`.')

    results = []
    for note_id in document_ids:
        note = keep.get(note_id)
        if note is None:
            raise ValueError(f"Note with id {note_id} not found.")
        text = f"Title: {note.title}\nContent: {note.text}"
        results.append(Document(text=text, extra_info={"note_id": note_id}))
    return results

load_all_notes #

load_all_notes() -> List[Document]

Load all notes from Google Keep.

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/keep/base.py
38
39
40
41
42
43
44
45
46
47
def load_all_notes(self) -> List[Document]:
    """Load all notes from Google Keep."""
    keep = self._get_keep()

    notes = keep.all()
    results = []
    for note in notes:
        text = f"Title: {note.title}\nContent: {note.text}"
        results.append(Document(text=text, extra_info={"note_id": note.id}))
    return results

GoogleSheetsReader #

Bases: BasePydanticReader

Google Sheets reader.

Reads a sheet as TSV from Google Sheets

Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/sheets/base.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class GoogleSheetsReader(BasePydanticReader):
    """Google Sheets reader.

    Reads a sheet as TSV from Google Sheets

    """

    is_remote: bool = True

    def __init__(self) -> None:
        """Initialize with parameters."""
        try:
            import google  # noqa
            import google_auth_oauthlib  # noqa
            import googleapiclient  # noqa
        except ImportError:
            raise ImportError(
                "`google_auth_oauthlib`, `googleapiclient` and `google` "
                "must be installed to use the GoogleSheetsReader.\n"
                "Please run `pip install --upgrade google-api-python-client "
                "google-auth-httplib2 google-auth-oauthlib`."
            )

    @classmethod
    def class_name(cls) -> str:
        return "GoogleSheetsReader"

    def load_data(self, spreadsheet_ids: List[str]) -> List[Document]:
        """Load data from the input directory.

        Args:
            spreadsheet_ids (List[str]): a list of document ids.
        """
        if spreadsheet_ids is None:
            raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

        results = []
        for spreadsheet_id in spreadsheet_ids:
            sheet = self._load_sheet(spreadsheet_id)
            results.append(
                Document(
                    id_=spreadsheet_id,
                    text=sheet,
                    metadata={"spreadsheet_id": spreadsheet_id},
                )
            )
        return results

    def load_data_in_pandas(self, spreadsheet_ids: List[str]) -> List[pd.DataFrame]:
        """Load data from the input directory.

        Args:
            spreadsheet_ids (List[str]): a list of document ids.
        """
        if spreadsheet_ids is None:
            raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

        results = []
        for spreadsheet_id in spreadsheet_ids:
            dataframes = self._load_sheet_in_pandas(spreadsheet_id)
            results.extend(dataframes)
        return results

    def _load_sheet(self, spreadsheet_id: str) -> str:
        """Load a sheet from Google Sheets.

        Args:
            spreadsheet_id: the sheet id.

        Returns:
            The sheet data.
        """
        credentials = self._get_credentials()
        sheets_service = discovery.build("sheets", "v4", credentials=credentials)
        spreadsheet_data = (
            sheets_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
        )
        sheets = spreadsheet_data.get("sheets")
        sheet_text = ""

        for sheet in sheets:
            properties = sheet.get("properties")
            title = properties.get("title")
            sheet_text += title + "\n"
            grid_props = properties.get("gridProperties")
            rows = grid_props.get("rowCount")
            cols = grid_props.get("columnCount")
            range_pattern = f"R1C1:R{rows}C{cols}"
            response = (
                sheets_service.spreadsheets()
                .values()
                .get(spreadsheetId=spreadsheet_id, range=range_pattern)
                .execute()
            )
            sheet_text += (
                "\n".join("\t".join(row) for row in response.get("values", [])) + "\n"
            )
        return sheet_text

    def _load_sheet_in_pandas(self, spreadsheet_id: str) -> List[pd.DataFrame]:
        """Load a sheet from Google Sheets.

        Args:
            spreadsheet_id: the sheet id.
            sheet_name: the sheet name.

        Returns:
            The sheet data.
        """
        credentials = self._get_credentials()
        sheets_service = discovery.build("sheets", "v4", credentials=credentials)
        sheet = sheets_service.spreadsheets()
        spreadsheet_data = sheet.get(spreadsheetId=spreadsheet_id).execute()
        sheets = spreadsheet_data.get("sheets")
        dataframes = []
        for sheet in sheets:
            properties = sheet.get("properties")
            title = properties.get("title")
            grid_props = properties.get("gridProperties")
            rows = grid_props.get("rowCount")
            cols = grid_props.get("columnCount")
            range_pattern = f"{title}!R1C1:R{rows}C{cols}"
            response = (
                sheets_service.spreadsheets()
                .values()
                .get(spreadsheetId=spreadsheet_id, range=range_pattern)
                .execute()
            )
            values = response.get("values", [])
            if not values:
                print(f"No data found in {title}")
            else:
                df = pd.DataFrame(values[1:], columns=values[0])
                dataframes.append(df)
        return dataframes

    def _get_credentials(self) -> Any:
        """Get valid user credentials from storage.

        The file token.json stores the user's access and refresh tokens, and is
        created automatically when the authorization flow completes for the first
        time.

        Returns:
            Credentials, the obtained credential.
        """
        creds = None
        if os.path.exists("token.json"):
            creds = Credentials.from_authorized_user_file("token.json", SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    "credentials.json", SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open("token.json", "w") as token:
                token.write(creds.to_json())

        return creds

load_data #

load_data(spreadsheet_ids: List[str]) -> List[Document]

Load data from the input directory.

Parameters:

Name Type Description Default
spreadsheet_ids List[str]

a list of document ids.

required
Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/sheets/base.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def load_data(self, spreadsheet_ids: List[str]) -> List[Document]:
    """Load data from the input directory.

    Args:
        spreadsheet_ids (List[str]): a list of document ids.
    """
    if spreadsheet_ids is None:
        raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

    results = []
    for spreadsheet_id in spreadsheet_ids:
        sheet = self._load_sheet(spreadsheet_id)
        results.append(
            Document(
                id_=spreadsheet_id,
                text=sheet,
                metadata={"spreadsheet_id": spreadsheet_id},
            )
        )
    return results

load_data_in_pandas #

load_data_in_pandas(spreadsheet_ids: List[str]) -> List[DataFrame]

Load data from the input directory.

Parameters:

Name Type Description Default
spreadsheet_ids List[str]

a list of document ids.

required
Source code in llama-index-integrations/readers/llama-index-readers-google/llama_index/readers/google/sheets/base.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def load_data_in_pandas(self, spreadsheet_ids: List[str]) -> List[pd.DataFrame]:
    """Load data from the input directory.

    Args:
        spreadsheet_ids (List[str]): a list of document ids.
    """
    if spreadsheet_ids is None:
        raise ValueError('Must specify a "spreadsheet_ids" in `load_kwargs`.')

    results = []
    for spreadsheet_id in spreadsheet_ids:
        dataframes = self._load_sheet_in_pandas(spreadsheet_id)
        results.extend(dataframes)
    return results