gmailから本文を抽出しカレンダー書き込み

gmailから本文を抽出しカレンダー書き込み

PDFからはできるようになったので
次はGmailから本文を抽出しテキストとして取り出すようにする

以前、gmailの未読の最新1件を取得し
本文とURLを抽出するものを作成したので
このソースを変更し
本文を抽出するようにする

なお、実行前にラベルIDを調べる必要がある
Schoolというラベルを作成し
このIDを調べる

1
2
cd ../mail_auto
python base,py

を実行し
結果の中から
Schoolのものを探す

コードは

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
 
# If modifying these scopes, delete the file token.json.
 
import base64, email #デコード用
import dateutil.parser
 
#token.jsonを設定
tokenPath = "token.json"
 
#credentials.jsonを設定
credentialsPath = "credentials.json"
 
 
#メール本文のデコード
def decode(encoded):
   decoded = base64.urlsafe_b64decode(encoded).decode()
   return decoded
 
 
#メール本文の内容を配列で取得する関数
def gmail_get_messages_body(service, labelIdsValue):
   mailBody = []
    
   # メッセージの一覧を取得
   messages = service.users().messages()
   msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute()
   # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定
    
   # 取得したメッセージの一覧を配列に格納
   for msg in msg_list['messages']:
        
       #メールの受信日時を取得
       date = gmail_get_messages_body_date(messages,msg)
        
       topid = msg['id']    
       msg = messages.get(userId='me', id=topid).execute()
        
       if(msg["payload"]["body"]["size"]!=0):
           mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"]))
       else:
           #メールによっては"parts"属性の中に本文がある場合もある
           mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"]))
 
   return mailBody
    
 
#gmail_get_messages_body関数内で受信日時を取得する関数
def gmail_get_messages_body_date(messages,msg):
   msg_id = msg['id']
   m = messages.get(userId='me', id=msg_id, format='raw').execute()
   raw = base64.urlsafe_b64decode(m['raw'])
    
   # Emailを解析する
   eml = email.message_from_bytes(raw)
    
   date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
   return date
 
#ラベルのIDやnameを表示する関数
def gmail_display_label(service):
   results = service.users().labels().list(userId='me').execute()
   labels = results.get('labels', [])
 
   if not labels:
       print('No labels found.')
   else:
       print('Labels:')
       for label in labels:
           print(label)
 
 
#初期化(from quickstart.py)
def gmail_init():
   creds = None
   # The file token.json stores the user's access and refresh tokens, and is
   # created automatically when the authorization flow completes for the first
   # time.
   if os.path.exists(tokenPath):
       creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
   # If there are no (valid) credentials available, let the user log in.
   if not creds or not creds.valid:
       if creds and creds.expired and creds.refresh_token:
           creds.refresh(Request())
       else:
           flow = InstalledAppFlow.from_client_secrets_file(
               credentialsPath, SCOPES)
           creds = flow.run_local_server(port=0)
       # Save the credentials for the next run
       with open(tokenPath, 'w') as token:
           token.write(creds.to_json())
            
   service = build('gmail', 'v1', credentials=creds)
   return service
 
 
#ここからやりたい処理を書く
service = gmail_init()
 
#quickstart.pyと同じ処理にしてみた
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])
 
if not labels:
   print('No labels found.')
else:
   print('Labels:')
   for label in labels:
       print(label['name'])
        
gmail_display_label(service)

grep を使い

1
2
3
4
5
python base,py| grep School
で探す
 
School
{'id': '', 'name': 'School', 'type': 'user'}

となったので
あとはこれを元にコードを書き換えていく

1
cp ../mail_voice/gmail_utils.py .

でコードをコピー

次にこのコードをURLを抽出せず本文のみ抽出するようにコード変更

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser
 
# スコープの設定
 
def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message
 
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()
 
    if 'messages' not in msg_list:
        return "No unread messages found."
 
    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()
 
    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])
 
    return body  # 本文のみを返す
 
def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date
 
def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

次に main4.pyは Google Drive からPDFを取得し
抽出する機能だったので
新たに main5.pyを作成し
Gmail未読のものを対象にするようにしてみる

1
touch main5.py

GPTで生成したコードからラベルIDを変更する

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# main2.py
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from ollama_module import parse_text_with_ollama
 
# Gmail APIの初期化
service = gmail_init()
 
# 最新の未読メッセージの本文を取得
label_id = ''  # 取得するラベルID
message_body = gmail_get_latest_unread_message_body(service, label_id)
 
if message_body != "No unread messages found.":
    print("メール本文:")
    print(message_body)
     
    # Ollamaでメール本文を解析
    events = parse_text_with_ollama(message_body, model_name='elyza:jp8b')
     
    # 抽出されたイベントを表示
    print("抽出されたイベント:")
    for event in events:
        print(event)
else:
    print("未読メッセージが見つかりませんでした。")

次にGmail でSchoolラベルの中から予定があるメールを試しに1つ未読にして実験する

結果

1
2
3
4
5
6
7
8
9
10
11
12
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/main5.py", line 6, in <module>
    service = gmail_init()
  File "/Users/snowpool/aw10s/week_calendar_voice/gmail_utils.py", line 52, in gmail_init
    creds.refresh(Request())
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

これはtoken.jsonがあってないため
なので以前gmail関連のプロジェクトを作成したので
そちらを使うようにコード変更

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser
 
# スコープの設定
 
def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message
 
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()
 
    if 'messages' not in msg_list:
        return "No unread messages found."
 
    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()
 
    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])
 
    return body  # 本文のみを返す
 
def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date
 
def gmail_init():
    creds = None
    token_path = '../mail_auto/token.json'  # token.jsonのパスを指定
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

再度実行するとイベントの抽出ができた

次にこれをカレンダーに書き込むようにする

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です