gmailから本文を抽出しカレンダー書き込み
PDFからはできるようになったので
次はGmailから本文を抽出しテキストとして取り出すようにする
以前、gmailの未読の最新1件を取得し
本文とURLを抽出するものを作成したので
このソースを変更し
本文を抽出するようにする
なお、実行前にラベルIDを調べる必要がある
Schoolというラベルを作成し
このIDを調べる
cd ../mail_auto python base,py
を実行し
結果の中から
Schoolのものを探す
コードは
from __future__ import print_function import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials # If modifying these scopes, delete the file token.json. SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] import base64, email #デコード用 import dateutil.parser #token.jsonを設定 tokenPath = "token.json" #credentials.jsonを設定 credentialsPath = "credentials.json" #メール本文のデコード def decode(encoded): decoded = base64.urlsafe_b64decode(encoded).decode() return decoded #メール本文の内容を配列で取得する関数 def gmail_get_messages_body(service, labelIdsValue): mailBody = [] # メッセージの一覧を取得 messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute() # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定 # 取得したメッセージの一覧を配列に格納 for msg in msg_list['messages']: #メールの受信日時を取得 date = gmail_get_messages_body_date(messages,msg) topid = msg['id'] msg = messages.get(userId='me', id=topid).execute() if(msg["payload"]["body"]["size"]!=0): mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"])) else: #メールによっては"parts"属性の中に本文がある場合もある mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"])) return mailBody #gmail_get_messages_body関数内で受信日時を取得する関数 def gmail_get_messages_body_date(messages,msg): msg_id = msg['id'] m = messages.get(userId='me', id=msg_id, format='raw').execute() raw = base64.urlsafe_b64decode(m['raw']) # Emailを解析する eml = email.message_from_bytes(raw) date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S") return date #ラベルのIDやnameを表示する関数 def gmail_display_label(service): results = service.users().labels().list(userId='me').execute() labels = results.get('labels', []) if not labels: print('No labels found.') else: print('Labels:') for label in labels: print(label) #初期化(from quickstart.py) def gmail_init(): creds = None # The file token.json stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. if os.path.exists(tokenPath): creds = Credentials.from_authorized_user_file(tokenPath, SCOPES) # If there are no (valid) credentials available, let the user log in. if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( credentialsPath, SCOPES) creds = flow.run_local_server(port=0) # Save the credentials for the next run with open(tokenPath, 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service #ここからやりたい処理を書く service = gmail_init() #quickstart.pyと同じ処理にしてみた results = service.users().labels().list(userId='me').execute() labels = results.get('labels', []) if not labels: print('No labels found.') else: print('Labels:') for label in labels: print(label['name']) gmail_display_label(service)
grep を使い
python base,py| grep School で探す School {'id': '', 'name': 'School', 'type': 'user'}
となったので
あとはこれを元にコードを書き換えていく
cp ../mail_voice/gmail_utils.py .
でコードをコピー
次にこのコードをURLを抽出せず本文のみ抽出するようにコード変更
# gmail_utils.py import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def decode(encoded): decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII')) decoded_message = decoded_bytes.decode('utf-8') return decoded_message def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found." msg = msg_list['messages'][0] msg_id = msg['id'] msg = messages.get(userId='me', id=msg_id, format='full').execute() body = "" if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain' and part['body']['size'] > 0: body = decode(part['body']['data']) break else: body = decode(msg['payload']['body']['data']) return body # 本文のみを返す def gmail_get_messages_body_date(msg): headers = msg['payload']['headers'] date_header = next(header['value'] for header in headers if header['name'].lower() == 'date') date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S") return date def gmail_init(): creds = None if os.path.exists('token.json'): creds = Credentials.from_authorized_user_file('token.json', SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) with open('token.json', 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service
次に main4.pyは Google Drive からPDFを取得し
抽出する機能だったので
新たに main5.pyを作成し
Gmail未読のものを対象にするようにしてみる
touch main5.py
GPTで生成したコードからラベルIDを変更する
# main2.py from gmail_utils import gmail_init, gmail_get_latest_unread_message_body from ollama_module import parse_text_with_ollama # Gmail APIの初期化 service = gmail_init() # 最新の未読メッセージの本文を取得 label_id = '' # 取得するラベルID message_body = gmail_get_latest_unread_message_body(service, label_id) if message_body != "No unread messages found.": print("メール本文:") print(message_body) # Ollamaでメール本文を解析 events = parse_text_with_ollama(message_body, model_name='elyza:jp8b') # 抽出されたイベントを表示 print("抽出されたイベント:") for event in events: print(event) else: print("未読メッセージが見つかりませんでした。")
次にGmail でSchoolラベルの中から予定があるメールを試しに1つ未読にして実験する
結果
Traceback (most recent call last): File "/Users/snowpool/aw10s/week_calendar_voice/main5.py", line 6, in <module> service = gmail_init() File "/Users/snowpool/aw10s/week_calendar_voice/gmail_utils.py", line 52, in gmail_init creds.refresh(Request()) File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh ) = reauth.refresh_grant( File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant _client._handle_error_response(response_data, retryable_error) File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response raise exceptions.RefreshError( google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})
となる
これはtoken.jsonがあってないため
なので以前gmail関連のプロジェクトを作成したので
そちらを使うようにコード変更
# gmail_utils.py import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import dateutil.parser # スコープの設定 SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] def decode(encoded): decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII')) decoded_message = decoded_bytes.decode('utf-8') return decoded_message def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service.users().messages() msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found." msg = msg_list['messages'][0] msg_id = msg['id'] msg = messages.get(userId='me', id=msg_id, format='full').execute() body = "" if 'parts' in msg['payload']: for part in msg['payload']['parts']: if part['mimeType'] == 'text/plain' and part['body']['size'] > 0: body = decode(part['body']['data']) break else: body = decode(msg['payload']['body']['data']) return body # 本文のみを返す def gmail_get_messages_body_date(msg): headers = msg['payload']['headers'] date_header = next(header['value'] for header in headers if header['name'].lower() == 'date') date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S") return date def gmail_init(): creds = None token_path = '../mail_auto/token.json' # token.jsonのパスを指定 if os.path.exists(token_path): creds = Credentials.from_authorized_user_file(token_path, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json', SCOPES) creds = flow.run_local_server(port=0) with open('token.json', 'w') as token: token.write(creds.to_json()) service = build('gmail', 'v1', credentials=creds) return service
再度実行するとイベントの抽出ができた
次にこれをカレンダーに書き込むようにする