gmailから本文を抽出しカレンダー書き込み
PDFからはできるようになったので
次はGmailから本文を抽出しテキストとして取り出すようにする
以前、gmailの未読の最新1件を取得し
本文とURLを抽出するものを作成したので
このソースを変更し
本文を抽出するようにする
なお、実行前にラベルIDを調べる必要がある
Schoolというラベルを作成し
このIDを調べる
cd ../mail_auto
python base,py
を実行し
結果の中から
Schoolのものを探す
コードは
from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
import base64, email #デコード用
import dateutil.parser
#token.jsonを設定
tokenPath = "token.json"
#credentials.jsonを設定
credentialsPath = "credentials.json"
#メール本文のデコード
def decode(encoded):
decoded = base64.urlsafe_b64decode(encoded).decode()
return decoded
#メール本文の内容を配列で取得する関数
def gmail_get_messages_body(service, labelIdsValue):
mailBody = []
# メッセージの一覧を取得
messages = service.users().messages()
msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute()
# msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定
# 取得したメッセージの一覧を配列に格納
for msg in msg_list['messages']:
#メールの受信日時を取得
date = gmail_get_messages_body_date(messages,msg)
topid = msg['id']
msg = messages.get(userId='me', id=topid).execute()
if(msg["payload"]["body"]["size"]!=0):
mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"]))
else:
#メールによっては"parts"属性の中に本文がある場合もある
mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"]))
return mailBody
#gmail_get_messages_body関数内で受信日時を取得する関数
def gmail_get_messages_body_date(messages,msg):
msg_id = msg['id']
m = messages.get(userId='me', id=msg_id, format='raw').execute()
raw = base64.urlsafe_b64decode(m['raw'])
# Emailを解析する
eml = email.message_from_bytes(raw)
date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
return date
#ラベルのIDやnameを表示する関数
def gmail_display_label(service):
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])
if not labels:
print('No labels found.')
else:
print('Labels:')
for label in labels:
print(label)
#初期化(from quickstart.py)
def gmail_init():
creds = None
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists(tokenPath):
creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
credentialsPath, SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(tokenPath, 'w') as token:
token.write(creds.to_json())
service = build('gmail', 'v1', credentials=creds)
return service
#ここからやりたい処理を書く
service = gmail_init()
#quickstart.pyと同じ処理にしてみた
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])
if not labels:
print('No labels found.')
else:
print('Labels:')
for label in labels:
print(label['name'])
gmail_display_label(service)
grep を使い
python base,py| grep School
で探す
School
{'id': '', 'name': 'School', 'type': 'user'}
となったので
あとはこれを元にコードを書き換えていく
cp ../mail_voice/gmail_utils.py .
でコードをコピー
次にこのコードをURLを抽出せず本文のみ抽出するようにコード変更
# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser
# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
def decode(encoded):
decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
decoded_message = decoded_bytes.decode('utf-8')
return decoded_message
def gmail_get_latest_unread_message_body(service, labelIdsValue):
messages = service.users().messages()
msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()
if 'messages' not in msg_list:
return "No unread messages found."
msg = msg_list['messages'][0]
msg_id = msg['id']
msg = messages.get(userId='me', id=msg_id, format='full').execute()
body = ""
if 'parts' in msg['payload']:
for part in msg['payload']['parts']:
if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
body = decode(part['body']['data'])
break
else:
body = decode(msg['payload']['body']['data'])
return body # 本文のみを返す
def gmail_get_messages_body_date(msg):
headers = msg['payload']['headers']
date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
return date
def gmail_init():
creds = None
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
with open('token.json', 'w') as token:
token.write(creds.to_json())
service = build('gmail', 'v1', credentials=creds)
return service
次に main4.pyは Google Drive からPDFを取得し
抽出する機能だったので
新たに main5.pyを作成し
Gmail未読のものを対象にするようにしてみる
touch main5.py
GPTで生成したコードからラベルIDを変更する
# main2.py
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from ollama_module import parse_text_with_ollama
# Gmail APIの初期化
service = gmail_init()
# 最新の未読メッセージの本文を取得
label_id = '' # 取得するラベルID
message_body = gmail_get_latest_unread_message_body(service, label_id)
if message_body != "No unread messages found.":
print("メール本文:")
print(message_body)
# Ollamaでメール本文を解析
events = parse_text_with_ollama(message_body, model_name='elyza:jp8b')
# 抽出されたイベントを表示
print("抽出されたイベント:")
for event in events:
print(event)
else:
print("未読メッセージが見つかりませんでした。")
次にGmail でSchoolラベルの中から予定があるメールを試しに1つ未読にして実験する
結果
Traceback (most recent call last):
File "/Users/snowpool/aw10s/week_calendar_voice/main5.py", line 6, in <module>
service = gmail_init()
File "/Users/snowpool/aw10s/week_calendar_voice/gmail_utils.py", line 52, in gmail_init
creds.refresh(Request())
File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
) = reauth.refresh_grant(
File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
_client._handle_error_response(response_data, retryable_error)
File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})
となる
これはtoken.jsonがあってないため
なので以前gmail関連のプロジェクトを作成したので
そちらを使うようにコード変更
# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser
# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
def decode(encoded):
decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
decoded_message = decoded_bytes.decode('utf-8')
return decoded_message
def gmail_get_latest_unread_message_body(service, labelIdsValue):
messages = service.users().messages()
msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()
if 'messages' not in msg_list:
return "No unread messages found."
msg = msg_list['messages'][0]
msg_id = msg['id']
msg = messages.get(userId='me', id=msg_id, format='full').execute()
body = ""
if 'parts' in msg['payload']:
for part in msg['payload']['parts']:
if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
body = decode(part['body']['data'])
break
else:
body = decode(msg['payload']['body']['data'])
return body # 本文のみを返す
def gmail_get_messages_body_date(msg):
headers = msg['payload']['headers']
date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
return date
def gmail_init():
creds = None
token_path = '../mail_auto/token.json' # token.jsonのパスを指定
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
with open('token.json', 'w') as token:
token.write(creds.to_json())
service = build('gmail', 'v1', credentials=creds)
return service
再度実行するとイベントの抽出ができた
次にこれをカレンダーに書き込むようにする