gmailから本文を抽出しカレンダー書き込み
PDFからはできるようになったので
次はGmailから本文を抽出しテキストとして取り出すようにする
以前、gmailの未読の最新1件を取得し
本文とURLを抽出するものを作成したので
このソースを変更し
本文を抽出するようにする
なお、実行前にラベルIDを調べる必要がある
Schoolというラベルを作成し
このIDを調べる
1 2 | cd .. /mail_auto python base,py |
を実行し
結果の中から
Schoolのものを探す
コードは
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | from __future__ import print_function import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials # If modifying these scopes, delete the file token.json. import base64 , email #デコード用 import dateutil.parser #token.jsonを設定 tokenPath = "token.json" #credentials.jsonを設定 credentialsPath = "credentials.json" #メール本文のデコード def decode(encoded): decoded = base64 .urlsafe_b64decode(encoded).decode() return decoded #メール本文の内容を配列で取得する関数 def gmail_get_messages_body(service, labelIdsValue): mailBody = [] # メッセージの一覧を取得 messages = service. users ().messages() msg_list = messages.list(userId= 'me' , labelIds=labelIdsValue).execute() # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定 # 取得したメッセージの一覧を配列に格納 for msg in msg_list[ 'messages' ]: #メールの受信日時を取得 date = gmail_get_messages_body_date(messages,msg) topid = msg[ 'id' ] msg = messages.get(userId= 'me' , id =topid).execute() if (msg[ "payload" ][ "body" ][ "size" ]!=0): mailBody.append( date + "<br>" +decode(msg[ "payload" ][ "body" ][ "data" ])) else : #メールによっては"parts"属性の中に本文がある場合もある mailBody.append( date + "<br>" +decode(msg[ "payload" ][ "parts" ][0][ "body" ][ "data" ])) return mailBody #gmail_get_messages_body関数内で受信日時を取得する関数 def gmail_get_messages_body_date(messages,msg): msg_id = msg[ 'id' ] m = messages.get(userId= 'me' , id =msg_id, format = 'raw' ).execute() raw = base64 .urlsafe_b64decode(m[ 'raw' ]) # Emailを解析する eml = email.message_from_bytes(raw) date = dateutil.parser.parse(eml.get( 'Date' )).strftime( "%Y-%m-%d_%H-%M-%S" ) return date #ラベルのIDやnameを表示する関数 def gmail_display_label(service): results = service. users ().labels().list(userId= 'me' ).execute() labels = results.get( 'labels' , []) if not labels: print( 'No labels found.' ) else : print( 'Labels:' ) for label in labels: print(label) #初期化(from quickstart.py) def gmail_init(): creds = None # The file token.json stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. if os.path.exists(tokenPath): creds = Credentials.from_authorized_user_file(tokenPath, SCOPES) # If there are no (valid) credentials available, let the user log in. if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else : flow = InstalledAppFlow.from_client_secrets_file( credentialsPath, SCOPES) creds = flow.run_local_server(port=0) # Save the credentials for the next run with open (tokenPath, 'w' ) as token: token.write(creds.to_json()) service = build( 'gmail' , 'v1' , credentials=creds) return service #ここからやりたい処理を書く service = gmail_init() #quickstart.pyと同じ処理にしてみた results = service. users ().labels().list(userId= 'me' ).execute() labels = results.get( 'labels' , []) if not labels: print( 'No labels found.' ) else : print( 'Labels:' ) for label in labels: print(label[ 'name' ]) gmail_display_label(service) |
grep を使い
1 2 3 4 5 | python base,py| grep School で探す School { 'id' : '' , 'name' : 'School' , 'type' : 'user' } |
となったので
あとはこれを元にコードを書き換えていく
1 | cp .. /mail_voice/gmail_utils .py . |
でコードをコピー
次にこのコードをURLを抽出せず本文のみ抽出するようにコード変更
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | # gmail_utils.py import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import dateutil.parser # スコープの設定 def decode(encoded): decoded_bytes = base64 .urlsafe_b64decode(encoded.encode( 'ASCII' )) decoded_message = decoded_bytes.decode( 'utf-8' ) return decoded_message def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service. users ().messages() msg_list = messages.list(userId= 'me' , labelIds=[labelIdsValue], q= "is:unread" , maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found." msg = msg_list[ 'messages' ][0] msg_id = msg[ 'id' ] msg = messages.get(userId= 'me' , id =msg_id, format = 'full' ).execute() body = "" if 'parts' in msg[ 'payload' ]: for part in msg[ 'payload' ][ 'parts' ]: if part[ 'mimeType' ] == 'text/plain' and part[ 'body' ][ 'size' ] > 0: body = decode(part[ 'body' ][ 'data' ]) break else : body = decode(msg[ 'payload' ][ 'body' ][ 'data' ]) return body # 本文のみを返す def gmail_get_messages_body_date(msg): headers = msg[ 'payload' ][ 'headers' ] date_header = next(header[ 'value' ] for header in headers if header[ 'name' ].lower() == 'date' ) date = dateutil.parser.parse(date_header).strftime( "%Y-%m-%d %H:%M:%S" ) return date def gmail_init(): creds = None if os.path.exists( 'token.json' ): creds = Credentials.from_authorized_user_file( 'token.json' , SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else : flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json' , SCOPES) creds = flow.run_local_server(port=0) with open ( 'token.json' , 'w' ) as token: token.write(creds.to_json()) service = build( 'gmail' , 'v1' , credentials=creds) return service |
次に main4.pyは Google Drive からPDFを取得し
抽出する機能だったので
新たに main5.pyを作成し
Gmail未読のものを対象にするようにしてみる
1 | touch main5.py |
GPTで生成したコードからラベルIDを変更する
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | # main2.py from gmail_utils import gmail_init, gmail_get_latest_unread_message_body from ollama_module import parse_text_with_ollama # Gmail APIの初期化 service = gmail_init() # 最新の未読メッセージの本文を取得 label_id = '' # 取得するラベルID message_body = gmail_get_latest_unread_message_body(service, label_id) if message_body != "No unread messages found." : print( "メール本文:" ) print(message_body) # Ollamaでメール本文を解析 events = parse_text_with_ollama(message_body, model_name= 'elyza:jp8b' ) # 抽出されたイベントを表示 print( "抽出されたイベント:" ) for event in events: print(event) else : print( "未読メッセージが見つかりませんでした。" ) |
次にGmail でSchoolラベルの中から予定があるメールを試しに1つ未読にして実験する
結果
1 2 3 4 5 6 7 8 9 10 11 12 | Traceback (most recent call last): File "/Users/snowpool/aw10s/week_calendar_voice/main5.py" , line 6, in <module> service = gmail_init() File "/Users/snowpool/aw10s/week_calendar_voice/gmail_utils.py" , line 52, in gmail_init creds.refresh(Request()) File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py" , line 335, in refresh ) = reauth.refresh_grant( File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py" , line 351, in refresh_grant _client._handle_error_response(response_data, retryable_error) File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py" , line 73, in _handle_error_response raise exceptions.RefreshError( google.auth.exceptions.RefreshError: ( 'invalid_scope: Bad Request' , { 'error' : 'invalid_scope' , 'error_description' : 'Bad Request' }) |
となる
これはtoken.jsonがあってないため
なので以前gmail関連のプロジェクトを作成したので
そちらを使うようにコード変更
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | # gmail_utils.py import os.path from googleapiclient.discovery import build from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials import base64 import dateutil.parser # スコープの設定 def decode(encoded): decoded_bytes = base64 .urlsafe_b64decode(encoded.encode( 'ASCII' )) decoded_message = decoded_bytes.decode( 'utf-8' ) return decoded_message def gmail_get_latest_unread_message_body(service, labelIdsValue): messages = service. users ().messages() msg_list = messages.list(userId= 'me' , labelIds=[labelIdsValue], q= "is:unread" , maxResults=1).execute() if 'messages' not in msg_list: return "No unread messages found." msg = msg_list[ 'messages' ][0] msg_id = msg[ 'id' ] msg = messages.get(userId= 'me' , id =msg_id, format = 'full' ).execute() body = "" if 'parts' in msg[ 'payload' ]: for part in msg[ 'payload' ][ 'parts' ]: if part[ 'mimeType' ] == 'text/plain' and part[ 'body' ][ 'size' ] > 0: body = decode(part[ 'body' ][ 'data' ]) break else : body = decode(msg[ 'payload' ][ 'body' ][ 'data' ]) return body # 本文のみを返す def gmail_get_messages_body_date(msg): headers = msg[ 'payload' ][ 'headers' ] date_header = next(header[ 'value' ] for header in headers if header[ 'name' ].lower() == 'date' ) date = dateutil.parser.parse(date_header).strftime( "%Y-%m-%d %H:%M:%S" ) return date def gmail_init(): creds = None token_path = '../mail_auto/token.json' # token.jsonのパスを指定 if os.path.exists(token_path): creds = Credentials.from_authorized_user_file(token_path, SCOPES) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else : flow = InstalledAppFlow.from_client_secrets_file( 'credentials.json' , SCOPES) creds = flow.run_local_server(port=0) with open ( 'token.json' , 'w' ) as token: token.write(creds.to_json()) service = build( 'gmail' , 'v1' , credentials=creds) return service |
再度実行するとイベントの抽出ができた
次にこれをカレンダーに書き込むようにする