pythonでGmailの最新、または未読の本文を取得する

最新のものだけ取得するようにする

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# 最新のメール本文を取得する関数
def gmail_get_latest_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, maxResults=1).execute()

    if not msg_list['messages']:
        return "No messages found."

    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    if msg["payload"]["body"]["size"] != 0:
        return date + "<br>" + decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                return date + "<br>" + decode(part["body"]["data"])
    return date + "<br> No body content"

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して最新のメール本文を取得
latest_message_body = gmail_get_latest_message_body(service, "Label_4")
print(latest_message_body)

これで最新の1件のメールのみ取得できるようになる

次に未読のものだけ取得するようにする

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# 未読メールの本文を取得する関数
def gmail_get_unread_messages_body(service, labelIdsValue):
    mailBody = []
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread").execute()

    if 'messages' not in msg_list:
        return ["No unread messages found."]

    for msg in msg_list['messages']:
        date = gmail_get_messages_body_date(messages, msg)
        topid = msg['id']
        msg = messages.get(userId='me', id=topid).execute()

        # メールの本文を取得
        if msg["payload"]["body"]["size"] != 0:
            mailBody.append(date + "<br>" + decode(msg["payload"]["body"]["data"]))
        elif 'parts' in msg["payload"]:
            # メール本文が parts 属性にある場合
            for part in msg["payload"]["parts"]:
                if part["body"]["size"] != 0:
                    mailBody.append(date + "<br>" + decode(part["body"]["data"]))
                    break
        else:
            mailBody.append(date + "<br> No body content")

    return mailBody

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して未読メールの本文を取得
unread_messages_body = gmail_get_unread_messages_body(service, "Label_4")
for body in unread_messages_body:
    print(body)

未読のメールのみを表示するようにコードを修正するには
Gmail APIのクエリパラメータに q=”is:unread” を追加する

q=”is:unread” パラメータを messages.list
API呼び出しに追加することで、未読のメールのみがフィルタリングされる

取得したメールの中で
decode 関数を使用してメールの本文をデコードし
必要に応じて parts 属性を確認

未読メールがない場合
“No unread messages found.”
というメッセージが返され
未読メールがある場合は
それぞれのメールについて受信日時と本文が表示される

Label_4
に属する未読メールのみを処理しているので
他のラベルや追加のフィルタリング条件を使用したい場合は
labelIdsValue 引数や q パラメータを適宜変更する

この場合
過去全てからになってしまうので期間を設定する

未読で最新のものを取得するようにする
そしてURLを本文から削除

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 未読で最新のメール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread").execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 未読メッセージのリストを取得し、最初のメッセージ(最新)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()


    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して未読メールの本文を取得
latest_unread_message_body  = gmail_get_latest_unread_message_body(service, "Label_4")
print(latest_unread_message_body)


しかしこれだと本文の中のURLのみ削除しているため
発信元とかについては削除されていないので
これも対処する必要がある

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です