メール読み上げを設定ファイルからに変更

メール読み上げを設定ファイルからに変更

config.ini

を作成し

[Settings]
server_ip = 192.168.1.69:50021
label_id = GmailのラベルID
detection_interval = 1200
notification_sound = notice.wav
pdf_notification_sound = notice_pdf.wav

とする

configparser モジュールを使用して、INIファイルから設定を読みこむ

kao.py

import configparser

を追加

# configparserを使用してINIファイルを読み込む
config = configparser.ConfigParser()
config.read('config.ini')

# 設定を変数に格納
server_ip = config['Settings']['server_ip']
label_id = config['Settings']['label_id']
detection_interval = int(config['Settings']['detection_interval'])

        if lastTime is None or time.perf_counter() - lastTime > 1200:
を
        if lastTime is None or time.perf_counter() - lastTime > detection_interval:

というように設定値の定数へ変更

次にサーバーIPも設定ファイルから読み込むように変更する

email_processor,py

のコードを変更

import configparser

# 設定ファイルの読み込み
config = configparser.ConfigParser()
config.read('config.ini')
server_ip = config['Settings']['server_ip']
label_id = config['Settings']['label_id']
notification_sound = config['Settings']['notification_sound']
pdf_notification_sound = config['Settings']['pdf_notification_sound']

を追記

音声を生成する部部の

    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]
でサーバーIPの部分を
    command_json = [
        "curl", "-s", "-X", "POST",
        f"http://{server_ip}/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", f"http://{server_ip}/synthesis?speaker=1"
    ]

へ変更

また再生する音声を

        playsound('notice.wav')

から

playsound(notification_sound)

へ変更

                        playsound('notice_pdf.wav')

                        playsound(pdf_notification_sound)

へ変更

これで
Voicevox のdocker サーバーIP
お知らせの音声
顔認識してメールを読み上げる時間の指定
Gmailで読み上げるラベルの指定
を設定ファイルで行えるように変更完了

あとは
GithubへSSHでアップできるようにする

顔を認識したらメールを読み上げる

顔を認識したらメールを読み上げる

import fitz
from playsound import playsound
import subprocess
import pygame
import time

# テキストから音声を生成して再生する関数
def generate_and_play_audio_from_text(text):
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("audio_output.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

# PDFファイルから文字数をカウントする関数
def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

# メールの処理を行う関数
def process_email(service, label_id):
    from gmail_utils import gmail_get_latest_unread_message_body
    from pdf_downloader import find_preview_link, download_pdf

    body, urls = gmail_get_latest_unread_message_body(service, label_id)
    if body:
        playsound('notice.wav')
        generate_and_play_audio_from_text(body)
        if urls:
            for url in urls:
                preview_url = find_preview_link(url)
                if preview_url:
                    download_pdf(preview_url, "downloaded_file.pdf")
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    if char_count >= 100:
                        playsound('notice_pdf.wav')
                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")
    else:
        print("未読メールはありません。")

というようにモジュールにして保存
ファイル名は

email_processor.py

とした

次に
Kao,pyで顔認識したらメールを読み上げるようにする

git clone https://github.com/Snowpooll/face_weather.git

で以前作成した顔認識したら天気を知らせるの中にあるものを

cp ../../face_weather/haarcascade_* .

でコピーして使う

import cv2
import time
from email_processor import process_email
from gmail_utils import gmail_init

# Haar Cascade分類器の読み込み
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

# Webカメラの設定
cap = cv2.VideoCapture(0)  # 0番目のカメラを使用する場合

# Gmailサービスの初期化
service = gmail_init()
label_id = "ラベルID"  # 例として特定のラベルIDを設定

# 最後の顔検出時刻
lastTime = None

# メインループ
while True:
    # カメラからのフレームの取得
    ret, frame = cap.read()
    
    # フレームのグレースケール化
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 顔の検出
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    
    # 検出された顔に対する処理
    for (x, y, w, h) in faces:
        # 検出自の処理(検出から1分たったら再度イベント動かす)
        if lastTime is None or time.perf_counter() - lastTime > 60:
            # 検出時刻更新
            lastTime = time.perf_counter()
            # メール処理関数を呼び出し
            process_email(service, label_id)

# 後処理
cap.release()
cv2.destroyAllWindows()

で実行したら成功したが
間隔を一分にしたため
メールばかり読み上げになる

なので感覚を20分ぐらいに変更する

        # 検出自の処理(検出から20分たったら再度イベント動かす)
        if lastTime is None or time.perf_counter() - lastTime > 1200:

へ変更した

とりあえず動作確認は取れたので
次に設定ファイルを作成し
コードのメンテをしやすくする

とりあえず
docker サーバーIP
gmailのラベル
次の検出までの待ち時間
は設定ファイルにまとめておき
これを編集するだけでできるようにする

ダウンロード機能の修正とモジュール化

ダウンロード機能の修正とモジュール化

import fitz  # PyMuPDF
from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                    
                    # PDFファイルから文字数をカウント
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    print(f"PDF内の文字数: {char_count}")
                    
                    # 文字数が100文字以上の場合は別の音声ファイルを再生
                    if char_count >= 100:
                        playsound('notice_pdf.wav')

                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")

        # メール本文をテキストファイルに保存
        save_email_body_to_text(body)
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

としたらリンクがエラーになる

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

だと問題ない

とりあえず

import fitz  # PyMuPDF
from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf
import subprocess
import pygame
import time

def generate_and_play_audio_from_text(text):
    # JSONファイルを作成するためのcurlコマンド
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]

    # 音声ファイルを作成するためのcurlコマンド
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]

    # JSONファイルと音声ファイルを作成
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)

    # Pygameで音声ファイルを再生
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("audio_output.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')
        generate_and_play_audio_from_text(body)
        

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                    
                    # PDFファイルから文字数をカウント
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    print(f"PDF内の文字数: {char_count}")
                    
                    # 文字数が100文字以上の場合は別の音声ファイルを再生
                    if char_count >= 100:
                        playsound('notice_pdf.wav')

                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

とすることで
メールの読み上げと
PDFのダウンロードができた
また長文のPDFに関してはPDFを見るように促すようにした

あとは顔を認識したら起動するようにする

まずはこれをモジュールにする

import fitz
from playsound import playsound
import subprocess
import pygame
import time

# テキストから音声を生成して再生する関数
def generate_and_play_audio_from_text(text):
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("audio_output.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

# PDFファイルから文字数をカウントする関数
def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

# メールの処理を行う関数
def process_email(service, label_id):
    from gmail_utils import gmail_get_latest_unread_message_body
    from pdf_downloader import find_preview_link, download_pdf

    body, urls = gmail_get_latest_unread_message_body(service, label_id)
    if body:
        playsound('notice.wav')
        generate_and_play_audio_from_text(body)
        if urls:
            for url in urls:
                preview_url = find_preview_link(url)
                if preview_url:
                    download_pdf(preview_url, "downloaded_file.pdf")
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    if char_count >= 100:
                        playsound('notice_pdf.wav')
                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")
    else:
        print("未読メールはありません。")

として

email_processor.py

として保存

次に顔を検出したらこれを呼び出すようにする

Gmail読み上げとメールのお知らせを作る

Gmail読み上げとメールのお知らせを作る

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

でテキストファイルの作成とPDF取得ができているので
お知らせメッセージの
notice.wavを再生する

GPTによれば

pip install playsound

でインストールし
コードを変更する

from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")

        # メール本文をテキストファイルに保存
        save_email_body_to_text(body)
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

変更点は

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

で音声ファイルを読み込んでいること

Main.pyの中で

    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

を追加して音声を出している

次にPDFの文字数を調べて100文字以上なら
音声でPDFの内容を確認するように促す

あとはメール本文の読み上げ処理だけ

PyMuPDFを使い
文字数をカウント、文字数が100文字以上ならPDFを確認する音声を再生するようにコード変更

import fitz  # PyMuPDF
from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                    
                    # PDFファイルから文字数をカウント
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    print(f"PDF内の文字数: {char_count}")
                    
                    # 文字数が100文字以上の場合は別の音声ファイルを再生
                    if char_count >= 100:
                        playsound('notice_pdf.wav')

                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")

        # メール本文をテキストファイルに保存
        save_email_body_to_text(body)
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

あとはvoicevoxの処理のみ

とりあえずdockerを起動させる

docker run -d -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

でやったら普通に音声が再生できたので
単純に負荷でdockerが落ちてたみたい

文章を音声で読み上げしたが
どうやら最初の文章に日付と宛先の名前がくっついている
毎回main3.pyを実行するたびに他のメールでも同じなので
最初の文章を削除する

Gmail未読メールの読み上げ

Gmail未読メールの読み上げ

cp mail_message.py unread_mail_message.py


ファイルをコピーしてから改造
未読の最新の1件のみ取得するようにする

ChatGPTでの出力
最新の未読メール1件のみを取得するようにコードを修正します。
既にmessages.list API呼び出しでmaxResults=1を指定しているため、最新の1件だけを取得しています。
しかし、一番新しいメールが未読でない可能性があるため、
念のためにメッセージのリストをループする部分を削除し、最初のメッセージのみを処理するようする

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 最新の未読メール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 最新のメッセージ(最新の未読メール)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br>" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()

# ラベル ID を指定して最新の未読メール本文を取得
latest_unread_message_body = gmail_get_latest_unread_message_body(service, "Label_4")
print(latest_unread_message_body)

これで最新の1件のみ取得できるようになった
さらに差し出し元などもなく本文のみ取得している

URLの削除もできているが
メルマガによっては
——
などで区切っているところがある
これは読み上げるとどうなるのか?

もしダメなら、削除項目を増やすこと

とりあえず本文を読み上げる

まず支援学校のメールのIDになるように

latest_unread_message_body = gmail_get_latest_unread_message_body(service, "Label_4")

Label_4の部分を変える

次に
本文が

特別支援学校よりメッセージをお預かりしています。
下記URLをクリックして内容を確認してください。



※学校からのお知らせの保存期間は1年間です。
重要なお知らせは、スクリーンショットなどでお手元に保存してください。
※お手元で保存された画像等データのお取り扱いにつきましては、個人情報保護に十分ご配慮ください。

となって取得できているので
記号がどうなるか試すため
Docker を起動して実験

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

あとは
取得したメールの本文が
mail.txt
にあるので中のテキストをもとにして音声を作成する

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@mail.txt > query.json

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > mail_audio.wav

普通に問題なく記号や句読点などは読み上げることなく成功した

支援学校は問題なさそうなので
メルマガで実験する

サーチマン佐藤さんのメルマガを使い実験

mail_searchman.txt

として保存し

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@mail_searchman.txt > query.json

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > mail_searchman_audio.wav

で作成し音声を作成

しかし途中でdocker が過負荷で落ちる
このためおそらくこの方法ではメルマガなどの長文を再生することは出来なさそう

pythonでGmailの最新、または未読の本文を取得する

最新のものだけ取得するようにする

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# 最新のメール本文を取得する関数
def gmail_get_latest_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, maxResults=1).execute()

    if not msg_list['messages']:
        return "No messages found."

    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    if msg["payload"]["body"]["size"] != 0:
        return date + "<br>" + decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                return date + "<br>" + decode(part["body"]["data"])
    return date + "<br> No body content"

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して最新のメール本文を取得
latest_message_body = gmail_get_latest_message_body(service, "Label_4")
print(latest_message_body)

これで最新の1件のメールのみ取得できるようになる

次に未読のものだけ取得するようにする

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# 未読メールの本文を取得する関数
def gmail_get_unread_messages_body(service, labelIdsValue):
    mailBody = []
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread").execute()

    if 'messages' not in msg_list:
        return ["No unread messages found."]

    for msg in msg_list['messages']:
        date = gmail_get_messages_body_date(messages, msg)
        topid = msg['id']
        msg = messages.get(userId='me', id=topid).execute()

        # メールの本文を取得
        if msg["payload"]["body"]["size"] != 0:
            mailBody.append(date + "<br>" + decode(msg["payload"]["body"]["data"]))
        elif 'parts' in msg["payload"]:
            # メール本文が parts 属性にある場合
            for part in msg["payload"]["parts"]:
                if part["body"]["size"] != 0:
                    mailBody.append(date + "<br>" + decode(part["body"]["data"]))
                    break
        else:
            mailBody.append(date + "<br> No body content")

    return mailBody

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して未読メールの本文を取得
unread_messages_body = gmail_get_unread_messages_body(service, "Label_4")
for body in unread_messages_body:
    print(body)

未読のメールのみを表示するようにコードを修正するには
Gmail APIのクエリパラメータに q=”is:unread” を追加する

q=”is:unread” パラメータを messages.list
API呼び出しに追加することで、未読のメールのみがフィルタリングされる

取得したメールの中で
decode 関数を使用してメールの本文をデコードし
必要に応じて parts 属性を確認

未読メールがない場合
“No unread messages found.”
というメッセージが返され
未読メールがある場合は
それぞれのメールについて受信日時と本文が表示される

Label_4
に属する未読メールのみを処理しているので
他のラベルや追加のフィルタリング条件を使用したい場合は
labelIdsValue 引数や q パラメータを適宜変更する

この場合
過去全てからになってしまうので期間を設定する

未読で最新のものを取得するようにする
そしてURLを本文から削除

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 未読で最新のメール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread").execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 未読メッセージのリストを取得し、最初のメッセージ(最新)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()


    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して未読メールの本文を取得
latest_unread_message_body  = gmail_get_latest_unread_message_body(service, "Label_4")
print(latest_unread_message_body)


しかしこれだと本文の中のURLのみ削除しているため
発信元とかについては削除されていないので
これも対処する必要がある

Gmail 本文取得

pythonでGmail 本文取得

まずラベルを指定して取得するには
ラベルIDが必要になる

vim base.py

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

import base64, email #デコード用
import dateutil.parser

#token.jsonを設定
tokenPath = "token.json"

#credentials.jsonを設定
credentialsPath = "credentials.json"


#メール本文のデコード
def decode(encoded):
   decoded = base64.urlsafe_b64decode(encoded).decode()
   return decoded


#メール本文の内容を配列で取得する関数
def gmail_get_messages_body(service, labelIdsValue):
   mailBody = []
   
   # メッセージの一覧を取得
   messages = service.users().messages()
   msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute() 
   # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定
   
   # 取得したメッセージの一覧を配列に格納
   for msg in msg_list['messages']:
       
       #メールの受信日時を取得
       date = gmail_get_messages_body_date(messages,msg)
       
       topid = msg['id']     
       msg = messages.get(userId='me', id=topid).execute()
       
       if(msg["payload"]["body"]["size"]!=0):
           mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"])) 
       else:
           #メールによっては"parts"属性の中に本文がある場合もある
           mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"])) 

   return mailBody
   

#gmail_get_messages_body関数内で受信日時を取得する関数
def gmail_get_messages_body_date(messages,msg):
   msg_id = msg['id']
   m = messages.get(userId='me', id=msg_id, format='raw').execute()
   raw = base64.urlsafe_b64decode(m['raw'])
   
   # Emailを解析する
   eml = email.message_from_bytes(raw)
   
   date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
   return date

#ラベルのIDやnameを表示する関数
def gmail_display_label(service):
   results = service.users().labels().list(userId='me').execute()
   labels = results.get('labels', [])

   if not labels:
       print('No labels found.')
   else:
       print('Labels:')
       for label in labels:
           print(label)


#初期化(from quickstart.py)
def gmail_init():
   creds = None
   # The file token.json stores the user's access and refresh tokens, and is
   # created automatically when the authorization flow completes for the first
   # time.
   if os.path.exists(tokenPath):
       creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
   # If there are no (valid) credentials available, let the user log in.
   if not creds or not creds.valid:
       if creds and creds.expired and creds.refresh_token:
           creds.refresh(Request())
       else:
           flow = InstalledAppFlow.from_client_secrets_file(
               credentialsPath, SCOPES)
           creds = flow.run_local_server(port=0)
       # Save the credentials for the next run
       with open(tokenPath, 'w') as token:
           token.write(creds.to_json())
           
   service = build('gmail', 'v1', credentials=creds)
   return service


#ここからやりたい処理を書く
service = gmail_init()

#quickstart.pyと同じ処理にしてみた
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])

if not labels:
   print('No labels found.')
else:
   print('Labels:')
   for label in labels:
       print(label['name'])
       
gmail_display_label(service)

として

python base.py

を実行すると
ラベルとID一覧が表示される

これでラベルIDがわかったので本文を取得するようにコード変更

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# メール本文を取得する関数
def gmail_get_messages_body(service, labelIdsValue):
    mailBody = []
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute()

    for msg in msg_list['messages']:
        date = gmail_get_messages_body_date(messages, msg)
        topid = msg['id']
        msg = messages.get(userId='me', id=topid).execute()

        # メールの本文を取得
        if msg["payload"]["body"]["size"] != 0:
            mailBody.append(date + "<br>" + decode(msg["payload"]["body"]["data"]))
        elif 'parts' in msg["payload"]:
            # メール本文が parts 属性にある場合
            for part in msg["payload"]["parts"]:
                if part["body"]["size"] != 0:
                    mailBody.append(date + "<br>" + decode(part["body"]["data"]))
                    break
        else:
            mailBody.append(date + "<br> No body content")

    return mailBody

# メールの受信日時を取得する関数
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定してメール本文を取得
mail_bodies = gmail_get_messages_body(service, "Label_4")
for body in mail_bodies:
    print(body)

これで

python base.py

とすると
Lavel_4のIDのメールの本文を表示することができる

gmail読み上げ (タイトル取得まで)

gmail読み上げ (タイトル取得まで)

ダウンロードしたファイルをコピーしてリネームする
詳細は
https://developers.google.com/gmail/api/quickstart/python?hl=ja
にあるが
とりあえず
credentials.json
にしておく

cd Downloads
cp client_secret_336287491272-faqvqnb4hjrg4ragurjh8nfhn2s3ujkg.apps.googleusercontent.com.json credentials.json

のあと作業ディレクトリ作成

mkdir -p mail_auto

ここへ認証ファイルの credentials.json をコピーする

あとはライブラリのインストール

pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

次に
quickstart.py
を作成し
コピペして動作確認

from __future__ import print_function

import os.path

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']


def main():
    """Shows basic usage of the Gmail API.
    Lists the user's Gmail labels.
    """
    creds = None
    # The file token.json stores the user's access and refresh tokens, and is
    # created automatically when the authorization flow completes for the first
    # time.
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    # If there are no (valid) credentials available, let the user log in.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # Save the credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    try:
        # Call the Gmail API
        service = build('gmail', 'v1', credentials=creds)
        results = service.users().labels().list(userId='me').execute()
        labels = results.get('labels', [])

        if not labels:
            print('No labels found.')
            return
        print('Labels:')
        for label in labels:
            print(label['name'])

    except HttpError as error:
        # TODO(developer) - Handle errors from gmail API.
        print(f'An error occurred: {error}')


if __name__ == '__main__':
    main()

保存したら

python3 quickstart.py

で実行

この時に認証画面が出るが
セキュリティの危険があるような警告が出る
これはサーバーのオレオレ証明書の時のようなものなので
気にせず続ける

認証が終わると
メールのラベル一覧が表示される