# # Copyright 2018 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Setup : https://developers.google.com/gmail/api/quickstart/python # Need to have credentials.json were you call main script # # File imported from https://github.com/googleworkspace/python-samples/blob/main/gmail/quickstart/quickstart.py # from __future__ import print_function import os.path from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build # If modifying these scopes, delete the file token.json. SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] creds = None def _initCreds(): global creds if creds: return # The file token.json stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. if os.path.exists('token.json'): creds = Credentials.from_authorized_user_file('./token.json', SCOPES) # If there are no (valid) credentials available, let the user log in. if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) # Save the credentials for the next run with open('token.json', 'w') as token: token.write(creds.to_json()) def getUnreadMails(): """ Get number of unread threads (that may contain multiple messages) """ _initCreds() service = build('gmail', 'v1', credentials=creds) pageToken = '' threads = set() while True: results = service.users().messages().list(userId='me', labelIds=['UNREAD'],\ includeSpamTrash=False, pageToken=pageToken)\ .execute() threads = threads.union(set([k['threadId'] for k in results['messages']])) # Loop over all result pages (100 results per page by default) pageToken = results.get('nextPageToken', '') if not pageToken: break return len(threads)