JavaScriptで作成されたページからPDFダウンロード

JavaScriptで作成されたページからPDFダウンロード

pip install selenium

でインストール

from selenium import webdriver

# Safari WebDriverのインスタンスを作成
driver = webdriver.Safari()

# Googleのホームページにアクセス
driver.get("https://www.google.com")

# ウェブページのタイトルをコンソールに表示
print(driver.title)

# ブラウザを閉じる
driver.quit()

で実行できたので
safariでseleniumで行ってみて
成功したらubuntu でchrome などで行うようにしてみる

とりあえずまずはリンクを表示できるかテスト

まず、SafariのWebDriverを有効にし、SeleniumでSafariを使用する準備を整える必要があることを思い出してください。これには、Safariの「開発」メニューで「リモートオートメーションを許可」を有効にする手順が含まれます。

これは
Safari > 設定 > 詳細タブをクリック
Webデベロッパ用の機能を表示をチェックし

デベロッパタブをクリックし
リモートオートメーションを許可にチェックを入れることでできる

from selenium import webdriver
from selenium.webdriver.common.by import By

# WebDriverの初期化(Safari)
driver = webdriver.Safari()

# ページにアクセス
driver.get("https://message.cocoo.education/message/ファイル番号")

# ページが完全にロードされるまで待機
driver.implicitly_wait(10)  # 秒

# 'プレビュー'リンクを探す
try:
    preview_link = driver.find_element(By.LINK_TEXT, "プレビュー")
    print("プレビューリンク:", preview_link.get_attribute('href'))
except:
    print("プレビューリンクが見つかりませんでした。")

# ブラウザを閉じる
driver.quit()

これでブラウザが立ち上がり
ログにPDFのリンクが表示される

つまりselenium ならURLの取得が可能

なのでPDF取得まで行うようにする
しかし

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Safari()
driver.get("https://message.cocoo.education/message/ファイル番号")

try:
    # 要素が見えるまで最大10秒間待機します。
    preview_button = WebDriverWait(driver, 10).until(
        EC.visibility_of_element_located((By.XPATH, '//button[contains(text(),"プレビュー")]'))
    )
    preview_button.click()
except Exception as e:
    print("エラーが発生しました:", e)
finally:
    driver.quit()

というようにページの動作を待つように変更したが変わらない

このためやはり機能を分割し
まずgmail からURLリンクを取得
次にこのURLからseleniumでプレビューボタンおURLを取得
取得したボタンURLからPDFを取得
としてみる

とりあえずPDFのリンクを取得するため
バックグランドでseleniumを実行し
PDFのリンクをテキストとして入れるメソッドに書き換える

from selenium import webdriver
from selenium.webdriver.common.by import By

# WebDriverの初期化(Safari)
driver = webdriver.Safari()

# ページにアクセス
driver.get("https://message.cocoo.education/message/ファイル番号")

# ページが完全にロードされるまで待機
driver.implicitly_wait(10)  # 秒

# 'プレビュー'リンクを探す
try:
    preview_link = driver.find_element(By.LINK_TEXT, "プレビュー")
    print("プレビューリンク:", preview_link.get_attribute('href'))
except:
    print("プレビューリンクが見つかりませんでした。")

# ブラウザを閉じる
driver.quit()

しかしsafariだとヘッドレスモードがない
まぁ試作品なので
とりあえずこれをモジュールにするだけにしておく

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def find_preview_link(url, link_text="プレビュー"):
    # WebDriverの初期化(Safari)
    driver = webdriver.Safari()
    
    try:
        # ページにアクセス
        driver.get(url)
        
        # ページが完全にロードされるまで待機
        driver.implicitly_wait(10)  # 秒
        
        # 'プレビュー'リンクを探す
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        
        # リンクのURLを返す
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        # ブラウザを閉じる
        driver.quit()

# 使用例
url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223"
link_url = find_preview_link(url)
if link_url:
    print("プレビューリンク:", link_url)
else:
    print("プレビューリンクが見つかりませんでした。")

これでモジュールにできる

次にこのPDFのリンクを使いPDFをダウンロードする

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def find_preview_link(url, link_text="プレビュー"):
    driver = webdriver.Safari()
    try:
        driver.get(url)
        driver.implicitly_wait(10)  # 秒
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        driver.quit()

# 使用例
url = "https://message.cocoo.education/message/ファイル番号"
pdf_url = find_preview_link(url)
if pdf_url:
    print("プレビューリンク:", pdf_url)

    # ダウンロードしたいPDFファイルのURL
    response = requests.get(pdf_url)

    # レスポンスのステータスコードが200(成功)の場合、ファイルを保存
    if response.status_code == 200:
        with open("downloaded_file.pdf", "wb") as f:
            f.write(response.content)
        print("ファイルが正常にダウンロードされました。")
    else:
        print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code)
else:
    print("プレビューリンクが見つかりませんでした。")

これでOK

次にこのメソッドの最初の取得対象URLをgmailから取得する

以前

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 最新の未読メール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 最新のメッセージ(最新の未読メール)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br>" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()

# ラベル ID を指定して最新の未読メール本文を取得
latest_unread_message_body = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)

でURLを削除した本文を表示するものを作成した

これをURLと本文をそれぞれの変数に格納して返すようなメソッドに書き換える

import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを検出してリストに格納する関数
def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

# 最新の未読メール本文とURLを取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content", []

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
print(urls)

これで本文とURLの分離取得ができた

次にこのメソッドで得られたURLからPDFをダウンロードするようにしたい

その前にモジュールかしておく

vim gmail_utils.py

中身を

# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# メール本文のデコード関数
def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

# URLを検出してリストに格納する関数
def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

# 最新の未読メール本文とURLを取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

# メールの受信日時を取得する関数
def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数
def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

として

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body

# Gmail API サービスを初期化
service = gmail_init()

# ラベルIDを指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "INBOX")
print(latest_unread_message_body)
for url in urls:
    print(url)

のINBOXをラベルIDに変える

これで実行すると未読メールが表示される

次にこれを使いPDFのダウンロード

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def download_pdf_from_gmail_links():
    service = gmail_init()  # Gmail API サービスを初期化
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    _, urls = gmail_get_latest_unread_message_body(service, "INBOX")

    if not urls:
        print("メールからURLを抽出できませんでした。")
        return

    for url in urls:
        pdf_url = find_preview_link(url)
        if pdf_url:
            print("プレビューリンク:", pdf_url)
            try:
                response = requests.get(pdf_url)
                if response.status_code == 200:
                    filename = "downloaded_file.pdf"
                    with open(filename, "wb") as f:
                        f.write(response.content)
                    print(f"ファイルが正常にダウンロードされました: {filename}")
                else:
                    print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")
            except Exception as e:
                print(f"ダウンロード中にエラーが発生しました: {e}")
        else:
            print("プレビューリンクが見つかりませんでした。")

download_pdf_from_gmail_links()

コードを大幅に変更

# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service
# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

としたらバグった

とりあえず

gmail_get_text_and_url.py

でのURLと本文の取得

get_pdf.py

でPDFの取得はできた

あとはこの2つを組み合わせるだけ

vim gmail_api_utils.py

でgmailのテキストとURLを取り出す機能をモジュールにして他で呼び出せるようにする

ファイルの内容は

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
print(urls)

より上の部分のみ抽出して保存

main.py

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
for url in urls:
    print(url)

というように

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

でインポートして

service = gmail_init()

で初期化

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
for url in urls:
    print(url)

で呼び出せば同じ効果が出る

同様にPDFダウンロードもモジュールにする

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def find_preview_link(url, link_text="プレビュー"):
    driver = webdriver.Safari()  # またはChrome(), Firefox() など、使用するブラウザに合わせて変更
    try:
        driver.get(url)
        driver.implicitly_wait(10)  # 秒
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        driver.quit()

def download_pdf(url, file_path="downloaded_file.pdf"):
    if url:
        print("プレビューリンク:", url)
        response = requests.get(url)
        if response.status_code == 200:
            with open(file_path, "wb") as f:
                f.write(response.content)
            print("ファイルが正常にダウンロードされました。")
        else:
            print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code)
    else:
        print("プレビューリンクが見つかりませんでした。")

これを

from pdf_downloader import find_preview_link, download_pdf

# 使用例
url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223"
pdf_url = find_preview_link(url)
download_pdf(pdf_url)

として保存し実行すればOK

あとは組み合わせる

しかし
プレビューリンクが
https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイル番号
というAmazonS3へのリンク
のはずがgmailから抽出したリンクになっているのでエラー

原因は

            # ここでは、プレビューリンクを直接取得する機能がないため、URLがPDFファイルを指していると仮定しています
            download_pdf(url, file_path="downloaded_file.pdf")

の部分
なのでURLが違う

これを修正する
なおGPTで出力した時にはINBOXになっているため
指定のラベルに変更することが必要になる

main3.py

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

とすることで
メール本文はテキストファイルへ
PDFはダウンロードすることが完了した

指定したURLのプレビューボタンをクリックしPDFをダウンロードするpythonコードの作成(失敗編)

指定したURLのプレビューボタンをクリックしPDFをダウンロードするpythonコードの作成

amazonS3からファイルの取得はできたけど
動的サイトからのリンク取得はこの方法ではできなかった

以下ログ

指定したURLにアクセスし
プレビューと書かれたボタンをクリック、表示されるPDFファイルをダウンロードするpythonコード

chatGPTで作成していく

pip install requests beautifulsoup4

で必要ライブラリのインストール

import requests
from bs4 import BeautifulSoup

# ステップ2: 指定されたURLにGETリクエストを送信し、HTMLを取得
url = "https://任意のURL"
response = requests.get(url)
html = response.content

# ステップ3: BeautifulSoupを使用してHTMLを解析し、プレビューボタンのリンクを見つける
soup = BeautifulSoup(html, 'html.parser')
preview_button = soup.find('a', text='プレビュー')  # ここではaタグでテキストが'プレビュー'の要素を探していますが、実際の構造に応じて変更する必要があります。

# ステップ4: プレビューボタンのリンクを取得し、PDFファイルをダウンロード
if preview_button:
    pdf_url = preview_button['href']  # href属性からPDFのURLを取得
    pdf_response = requests.get(pdf_url)
    with open('document.pdf', 'wb') as f:
        f.write(pdf_response.content)
    print("PDFファイルがダウンロードされました。")
else:
    print("プレビューボタンが見つかりませんでした。")

で実行したが

/Users/snowpool/aw10s/mail_auto/get_pdf.py:11: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  preview_button = soup.find('a', text='プレビュー')  # ここではaタグでテキストが'プレビュー'の要素を探していますが、実際の構造に応じて変更する必要があります。
プレビューボタンが見つかりませんでした。

となる

ChatGPTでこの結果を調べると

警告メッセージに基づき、text引数の代わりにstring引数を使用する必要があります。また、プレビューボタンが見つからなかった問題に関しては、ページのHTML構造を確認せずに正確な解決策を提供するのは難しいですが、一般的なアドバイスをいくつか提供できます。
* 		string引数の使用: findメソッドでtextの代わりにstringを使用してみてください。
* 		正しい要素を特定する: プレビューボタンを特定するために、ページのHTMLソースを確認し、ボタンの正確なタグ名、クラス名、またはIDを特定します。この情報を元に、findまたはfind_allメソッドを適切に使用してください。
* 		動的コンテンツの考慮: もしプレビューボタンがJavaScriptによって動的に生成されている場合、requestsとBeautifulSoupだけでは取得できないかもしれません。この場合は、seleniumのようなブラウザ自動化ツールを使用して実際のブラウザセッションを介してページを読み込み、要素を探す必要があります。

となる

以下は、string引数を使用した修正されたコードの例

import requests
from bs4 import BeautifulSoup

# ステップ2: 指定されたURLにGETリクエストを送信し、HTMLを取得
url = "https://任意のURL"
response = requests.get(url)
html = response.content

# ステップ3: BeautifulSoupを使用してHTMLを解析し、プレビューボタンのリンクを見つける
soup = BeautifulSoup(html, 'html.parser')
preview_button = soup.find('a', string='プレビュー')  # `text`の代わりに`string`を使用

# ステップ4: プレビューボタンのリンクを取得し、PDFファイルをダウンロード
if preview_button:
    pdf_url = preview_button['href']  # href属性からPDFのURLを取得
    pdf_response = requests.get(pdf_url)
    with open('document.pdf', 'wb') as f:
        f.write(pdf_response.content)
    print("PDFファイルがダウンロードされました。")
else:
    print("プレビューボタンが見つかりませんでした。")

でもだめ

seleniumで取得してみる

pip install selenium


seleniumをインストール

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
import time

# Chromeのダウンロード設定
chrome_options = Options()
prefs = {
    "download.default_directory": "/path/to/download/directory",  # ダウンロードフォルダーのパスを設定
    "download.prompt_for_download": False,  # ダウンロード前に確認を求めない
    "download.directory_upgrade": True,
    "plugins.always_open_pdf_externally": True  # PDFファイルを自動的にダウンロード
}
chrome_options.add_experimental_option("prefs", prefs)

# Chrome WebDriverのパス
webdriver_path = '/path/to/chromedriver'  # ChromeDriverのパス

# WebDriverサービスを設定
service = Service(webdriver_path)

# WebDriverの初期化
driver = webdriver.Chrome(service=service, options=chrome_options)

# Webページにアクセス
driver.get("https://任意のURL")

# プレビューボタンを探してクリック(要素のXPathを使用)
# 実際のボタンのXPathに応じて調整してください
try:
    preview_button = driver.find_element(By.XPATH, '//button[contains(text(),"プレビュー")]')
    preview_button.click()
    time.sleep(5)  # ダウンロード完了を待つ
except Exception as e:
    print("エラーが発生しました:", e)

# ブラウザを閉じる
driver.quit()

でもだめ

ならば chrome developer tools で一度サイトの構成を調べる

<a href="https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイルの保存先" target="_blank">プレビュー</a>

となっていたが
これの構成で seleniumでダウンロードしようとしてもエラー

なので

import requests

# PDFファイルのURL
pdf_url = "https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイルの保存先"

# リクエストを送信してPDFファイルを取得
response = requests.get(pdf_url)

# レスポンスのステータスコードが200(成功)の場合、ファイルを保存
if response.status_code == 200:
    with open("downloaded_file.pdf", "wb") as file:
        file.write(response.content)
    print("PDFファイルが正常にダウンロードされました。")
else:
    print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")

として直接リンクのURLからファイルを取得

これは成功

なので、
GMAILからリンクURLを抜き出しサイトへアクセスする機能
次にそのページからリンクボタンのURLを抜き出す機能
最後にPDFをダウンロードする機能
と分けて作成する

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# 'プレビュー'リンクを探す
preview_link = soup.find('a', text='プレビュー')
if preview_link:
    href_value = preview_link['href']
    print("プレビューのリンク:", href_value)
else:
    print("プレビューリンクが見つかりませんでした。")

しかし

/Users/snowpool/aw10s/mail_auto/get_pdf_url.py:15: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  preview_link = soup.find('a', text='プレビュー')
プレビューリンクが見つかりませんでした。

となる

このため
BeautifulSoupの最新バージョンでは、findメソッド(および関連メソッド)でtext引数の代わりにstring引数を使用するよう推奨
とのことのため

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# 'プレビュー'リンクを探す('string'引数を使用)
preview_link = soup.find('a', string='プレビュー')
if preview_link:
    href_value = preview_link['href']
    print("プレビューのリンク:", href_value)
else:
    # 'プレビュー'を含むテキストを持つリンクを探す場合の代替手段
    preview_links = soup.find_all('a')
    for link in preview_links:
        if 'プレビュー' in link.text:
            print("プレビューのリンク:", link['href'])
            break
    else:
        print("プレビューリンクが見つかりませんでした。")

へ変更したが

プレビューリンクが見つかりませんでした。
となる

webページの構造が予想と異なるか、リンクが動的に生成されている可能性があり
のようなケースでは、ブラウザの自動操作が可能なSeleniumのようなツールを使用する必要がある

念の為Javascriptで書かれているかチェック

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# ページ内のすべてのaタグを探索
links = soup.find_all('a')
found = False
for link in links:
    if link.text and 'プレビュー' in link.text:
        print("プレビューのリンク:", link['href'])
        found = True
        break

if not found:
    print("プレビューリンクが見つかりませんでした。")

の結果

プレビューリンクが見つかりませんでした

となったため
Seleniumを使うことにする

seleniumを使うにはwebdriverが必要

Pdf の中身を取り出す

Gmail でPDFをダウンロードし、PDFを読み上げしたいので
まずPDFの中身を読み出す

まずはPDF単体の取り扱いから

pip install pdfminer.six requests

でインストール

vim text_from_pdf.py

from pdfminer.high_level import extract_text

def extract_text_from_pdf(pdf_file):
    return extract_text(pdf_file)

pdf_text = extract_text_from_pdf("your_file.pdf")
print(pdf_text)  # PDFの内容を表示

ファイル名を

tunagu.pdf

に変えてみる

実行結果は
読み上げできる量を超える可能性が高いため

念の為文字数をカウントする

vim pdf_text.txt

でこの実行結果の文章を保存する

次に

vim count_characters_in_text_file

でファイルの文字数をカウントするようにする

def count_characters_in_text_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
        return len(content)

file_path = 'path/to/your/textfile.txt'
char_count = count_characters_in_text_file(file_path)
print(f"The file has {char_count} characters.")

のファイルを

pdf_text.txt

に変更して実行

The file has 995 characters.

となるため
Voicevox の扱える量を超える
このため対策が必要

あとファイルの取得に関しては
そのまま
メール内の
ファイルリンク
へアクセスしてもできたので
これを直接ダウンロードできるか試す

Gmail未読メールの読み上げ

Gmail未読メールの読み上げ

cp mail_message.py unread_mail_message.py


ファイルをコピーしてから改造
未読の最新の1件のみ取得するようにする

ChatGPTでの出力
最新の未読メール1件のみを取得するようにコードを修正します。
既にmessages.list API呼び出しでmaxResults=1を指定しているため、最新の1件だけを取得しています。
しかし、一番新しいメールが未読でない可能性があるため、
念のためにメッセージのリストをループする部分を削除し、最初のメッセージのみを処理するようする

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 最新の未読メール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 最新のメッセージ(最新の未読メール)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br>" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()

# ラベル ID を指定して最新の未読メール本文を取得
latest_unread_message_body = gmail_get_latest_unread_message_body(service, "Label_4")
print(latest_unread_message_body)

これで最新の1件のみ取得できるようになった
さらに差し出し元などもなく本文のみ取得している

URLの削除もできているが
メルマガによっては
——
などで区切っているところがある
これは読み上げるとどうなるのか?

もしダメなら、削除項目を増やすこと

とりあえず本文を読み上げる

まず支援学校のメールのIDになるように

latest_unread_message_body = gmail_get_latest_unread_message_body(service, "Label_4")

Label_4の部分を変える

次に
本文が

特別支援学校よりメッセージをお預かりしています。
下記URLをクリックして内容を確認してください。



※学校からのお知らせの保存期間は1年間です。
重要なお知らせは、スクリーンショットなどでお手元に保存してください。
※お手元で保存された画像等データのお取り扱いにつきましては、個人情報保護に十分ご配慮ください。

となって取得できているので
記号がどうなるか試すため
Docker を起動して実験

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

あとは
取得したメールの本文が
mail.txt
にあるので中のテキストをもとにして音声を作成する

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@mail.txt > query.json

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > mail_audio.wav

普通に問題なく記号や句読点などは読み上げることなく成功した

支援学校は問題なさそうなので
メルマガで実験する

サーチマン佐藤さんのメルマガを使い実験

mail_searchman.txt

として保存し

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@mail_searchman.txt > query.json

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > mail_searchman_audio.wav

で作成し音声を作成

しかし途中でdocker が過負荷で落ちる
このためおそらくこの方法ではメルマガなどの長文を再生することは出来なさそう

pythonでGmailの最新、または未読の本文を取得する

最新のものだけ取得するようにする

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# 最新のメール本文を取得する関数
def gmail_get_latest_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, maxResults=1).execute()

    if not msg_list['messages']:
        return "No messages found."

    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    if msg["payload"]["body"]["size"] != 0:
        return date + "<br>" + decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                return date + "<br>" + decode(part["body"]["data"])
    return date + "<br> No body content"

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して最新のメール本文を取得
latest_message_body = gmail_get_latest_message_body(service, "Label_4")
print(latest_message_body)

これで最新の1件のメールのみ取得できるようになる

次に未読のものだけ取得するようにする

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# 未読メールの本文を取得する関数
def gmail_get_unread_messages_body(service, labelIdsValue):
    mailBody = []
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread").execute()

    if 'messages' not in msg_list:
        return ["No unread messages found."]

    for msg in msg_list['messages']:
        date = gmail_get_messages_body_date(messages, msg)
        topid = msg['id']
        msg = messages.get(userId='me', id=topid).execute()

        # メールの本文を取得
        if msg["payload"]["body"]["size"] != 0:
            mailBody.append(date + "<br>" + decode(msg["payload"]["body"]["data"]))
        elif 'parts' in msg["payload"]:
            # メール本文が parts 属性にある場合
            for part in msg["payload"]["parts"]:
                if part["body"]["size"] != 0:
                    mailBody.append(date + "<br>" + decode(part["body"]["data"]))
                    break
        else:
            mailBody.append(date + "<br> No body content")

    return mailBody

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して未読メールの本文を取得
unread_messages_body = gmail_get_unread_messages_body(service, "Label_4")
for body in unread_messages_body:
    print(body)

未読のメールのみを表示するようにコードを修正するには
Gmail APIのクエリパラメータに q=”is:unread” を追加する

q=”is:unread” パラメータを messages.list
API呼び出しに追加することで、未読のメールのみがフィルタリングされる

取得したメールの中で
decode 関数を使用してメールの本文をデコードし
必要に応じて parts 属性を確認

未読メールがない場合
“No unread messages found.”
というメッセージが返され
未読メールがある場合は
それぞれのメールについて受信日時と本文が表示される

Label_4
に属する未読メールのみを処理しているので
他のラベルや追加のフィルタリング条件を使用したい場合は
labelIdsValue 引数や q パラメータを適宜変更する

この場合
過去全てからになってしまうので期間を設定する

未読で最新のものを取得するようにする
そしてURLを本文から削除

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 未読で最新のメール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread").execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 未読メッセージのリストを取得し、最初のメッセージ(最新)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()


    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定して未読メールの本文を取得
latest_unread_message_body  = gmail_get_latest_unread_message_body(service, "Label_4")
print(latest_unread_message_body)


しかしこれだと本文の中のURLのみ削除しているため
発信元とかについては削除されていないので
これも対処する必要がある

Gmail 本文取得

pythonでGmail 本文取得

まずラベルを指定して取得するには
ラベルIDが必要になる

vim base.py

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

import base64, email #デコード用
import dateutil.parser

#token.jsonを設定
tokenPath = "token.json"

#credentials.jsonを設定
credentialsPath = "credentials.json"


#メール本文のデコード
def decode(encoded):
   decoded = base64.urlsafe_b64decode(encoded).decode()
   return decoded


#メール本文の内容を配列で取得する関数
def gmail_get_messages_body(service, labelIdsValue):
   mailBody = []
   
   # メッセージの一覧を取得
   messages = service.users().messages()
   msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute() 
   # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定
   
   # 取得したメッセージの一覧を配列に格納
   for msg in msg_list['messages']:
       
       #メールの受信日時を取得
       date = gmail_get_messages_body_date(messages,msg)
       
       topid = msg['id']     
       msg = messages.get(userId='me', id=topid).execute()
       
       if(msg["payload"]["body"]["size"]!=0):
           mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"])) 
       else:
           #メールによっては"parts"属性の中に本文がある場合もある
           mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"])) 

   return mailBody
   

#gmail_get_messages_body関数内で受信日時を取得する関数
def gmail_get_messages_body_date(messages,msg):
   msg_id = msg['id']
   m = messages.get(userId='me', id=msg_id, format='raw').execute()
   raw = base64.urlsafe_b64decode(m['raw'])
   
   # Emailを解析する
   eml = email.message_from_bytes(raw)
   
   date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
   return date

#ラベルのIDやnameを表示する関数
def gmail_display_label(service):
   results = service.users().labels().list(userId='me').execute()
   labels = results.get('labels', [])

   if not labels:
       print('No labels found.')
   else:
       print('Labels:')
       for label in labels:
           print(label)


#初期化(from quickstart.py)
def gmail_init():
   creds = None
   # The file token.json stores the user's access and refresh tokens, and is
   # created automatically when the authorization flow completes for the first
   # time.
   if os.path.exists(tokenPath):
       creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
   # If there are no (valid) credentials available, let the user log in.
   if not creds or not creds.valid:
       if creds and creds.expired and creds.refresh_token:
           creds.refresh(Request())
       else:
           flow = InstalledAppFlow.from_client_secrets_file(
               credentialsPath, SCOPES)
           creds = flow.run_local_server(port=0)
       # Save the credentials for the next run
       with open(tokenPath, 'w') as token:
           token.write(creds.to_json())
           
   service = build('gmail', 'v1', credentials=creds)
   return service


#ここからやりたい処理を書く
service = gmail_init()

#quickstart.pyと同じ処理にしてみた
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])

if not labels:
   print('No labels found.')
else:
   print('Labels:')
   for label in labels:
       print(label['name'])
       
gmail_display_label(service)

として

python base.py

を実行すると
ラベルとID一覧が表示される

これでラベルIDがわかったので本文を取得するようにコード変更

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# メール本文を取得する関数
def gmail_get_messages_body(service, labelIdsValue):
    mailBody = []
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute()

    for msg in msg_list['messages']:
        date = gmail_get_messages_body_date(messages, msg)
        topid = msg['id']
        msg = messages.get(userId='me', id=topid).execute()

        # メールの本文を取得
        if msg["payload"]["body"]["size"] != 0:
            mailBody.append(date + "<br>" + decode(msg["payload"]["body"]["data"]))
        elif 'parts' in msg["payload"]:
            # メール本文が parts 属性にある場合
            for part in msg["payload"]["parts"]:
                if part["body"]["size"] != 0:
                    mailBody.append(date + "<br>" + decode(part["body"]["data"]))
                    break
        else:
            mailBody.append(date + "<br> No body content")

    return mailBody

# メールの受信日時を取得する関数
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# ラベルの表示関数(変更なし)
def gmail_display_label(service):
    results = service.users().labels().list(userId='me').execute()
    labels = results.get('labels', [])

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()
gmail_display_label(service)

# ラベル ID を指定してメール本文を取得
mail_bodies = gmail_get_messages_body(service, "Label_4")
for body in mail_bodies:
    print(body)

これで

python base.py

とすると
Lavel_4のIDのメールの本文を表示することができる

顔認識して天気を答える

顔認識して天気を答える

Weather map api で現在地の天気を取得し
Voicevox へ curl で送って音声を作成し
Pygame でしゃべるまではできたので

次にこれを関数にしてkao.pyで実行するようにする

まずはubuntuへ転送して実験する

pip install pygame
pip install --upgrade deepl

で足りないものを入れる

そして

scp weather_* snowpool@192.168.1.69:/home/snowpool/aw10s/

でコピーして

python weather_voice.py

を実行するとubuntu でもできた

次に、このweather_voide.pyのコードを関数にする

現在の天気を取得するコードは

import requests
import json
from datetime import datetime
import deepl

def get_weather_forecast(latitude, longitude, API_key, deepl_auth_key):
    # DeepL Translatorのインスタンスを生成
    translator = deepl.Translator(deepl_auth_key)

    # OpenWeather APIのURL
    url = "https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude=hourly,minutely&units=metric&lang=ja&appid={API_key}"
    url = url.format(lat=latitude, lon=longitude, API_key=API_key)

    # APIリクエスト
    response = requests.get(url)
    jsondata = response.json()

    # 今日の日付を取得
    today = datetime.now().date()

    # 今日の天気予報を探す
    for daily_forecast in jsondata["daily"]:
        date = datetime.fromtimestamp(daily_forecast["dt"]).date()
        if date == today:
            min_temp = daily_forecast["temp"]["min"]
            max_temp = daily_forecast["temp"]["max"]
            weather = daily_forecast["weather"][0]["main"]
            description = daily_forecast["weather"][0]["description"]
            break

    # 天気をdeeplで日本語に翻訳
    weather_japanese = translator.translate_text(weather, target_lang="JA").text

    # 今日の天気予報をまとめる
    today_weather_repo = f"今日の天気は{weather_japanese}、予想最高気温は{max_temp}度、予想最低気温は{min_temp}度です"
    return today_weather_repo

# 関数を使用して天気予報を取得
latitude = "緯度"
longitude = "経度"
API_key = "open weather map APIキー"
deepl_auth_key = "deepLのAPIキー"

weather_report = get_weather_forecast(latitude, longitude, API_key, deepl_auth_key)

# 天気予報をテキストファイルに保存
with open('weather.txt', 'w') as file:
    file.write(weather_report)

これは
weather_forecast.py
にまとめてある

次に
weather_voice.py
をほかで呼べるように関数にする

現状は

import subprocess
import pygame
import time
from weather_forecast import get_weather_forecast

# 天気予報を取得してファイルに保存
latitude = "緯度"
longitude = "経度"
API_key = "open weather map API キー"
deepl_auth_key = "deepLのAPIキー"

weather_report = get_weather_forecast(latitude, longitude, API_key, deepl_auth_key)

with open('weather.txt', 'w') as file:
    file.write(weather_report)

# JSONファイルを作成するためのcurlコマンド
command_json = [
    "curl", "-s", "-X", "POST",
    "192.168.1.69:50021/audio_query?speaker=1",
    "--get", "--data-urlencode", "text@weather.txt"
]

# 音声ファイルを作成するためのcurlコマンド
command_audio = [
    "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
    "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
]

# 最初のコマンドを実行してJSONファイルを作成
with open('query.json', 'w') as file:
    subprocess.run(command_json, stdout=file)

# 第二のコマンドを実行して音声ファイルを作成
with open('test_audio.wav', 'wb') as file:
    subprocess.run(command_audio, stdout=file)

# Pygameの初期化
pygame.init()
pygame.mixer.init()

# WAVファイルを読み込む
sound = pygame.mixer.Sound("test_audio.wav")

# 再生
sound.play()

# 再生が終了するまで待機
while pygame.mixer.get_busy():
    time.sleep(0.1)

これを関数にする

Chatgpt の答えは

import subprocess
import pygame
import time
from weather_forecast import get_weather_forecast

def generate_and_play_weather_report(latitude, longitude, API_key, deepl_auth_key):
    # 天気予報を取得してファイルに保存
    weather_report = get_weather_forecast(latitude, longitude, API_key, deepl_auth_key)
    with open('weather.txt', 'w') as file:
        file.write(weather_report)

    # JSONファイルを作成するためのcurlコマンド
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", "text@weather.txt"
    ]

    # 音声ファイルを作成するためのcurlコマンド
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]

    # JSONファイルと音声ファイルを作成
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('test_audio.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)

    # Pygameで音声ファイルを再生
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("test_audio.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

変更点を確認しておく

インポート部分は変化なし

import subprocess
import pygame
import time
from weather_forecast import get_weather_forecast


# 天気予報を取得してファイルに保存
latitude = "34.745755"
longitude = "137.91146"
API_key = "open weather map のAPIキー"
deepl_auth_key = "deepLのAPIキー"

の部分を削除しkao.pyへ移動させる

また

import weather_voice

をkao,oyへ追加

そしてprintぶんの下へ

weather_voice.generate_and_play_weather_report(latitude, longitude, API_key, deepl_auth_key)

を追記して保存

実験のため
VNC接続して実行

ubuntuで

tigervncserver -xstartup /usr/bin/gnome-session -geometry 800x600 -localhost no :1 

を実行後

Mac の場合
Finder から
移動 > サーバーに接続で

vnc://192.168.1.69:5901

で接続

これで実験すると顔認識した時に今日の天気を教えてくれるようになった

次は画面表示せずに処理するようにコードを書き換える

これは単純にishowの画面表示部分と
認識した部分を枠で囲む部分を削除するだけでOK

import cv2
import time

import weather_voice

# 天気予報を取得してファイルに保存
latitude = "緯度"
longitude = "経度"
API_key = "open weather map APIキー"
deepl_auth_key = "deepLのAPIキー"

# Haar Cascade分類器の読み込み
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
eye_cascade = cv2.CascadeClassifier('haarcascade_eye.xml')

# Webカメラの設定
cap = cv2.VideoCapture(0)  # 0番目のカメラを使用する場合

# 最後の顔検出時刻
lastTime = None

# メインループ
while True:


    # カメラからのフレームの取得
    ret, frame = cap.read()
    
    # フレームのグレースケール化
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 顔の検出
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    
    # 検出された顔に対する処理
    for (x, y, w, h) in faces:
        # 検出自の処理(検出から1分たったら再度イベント動かす
        if lastTime is None or time.perf_counter() - lastTime > 60:
            # 検出時刻更新
            lastTime = time.perf_counter()
            print("人間発見、警戒せよw")
            weather_voice.generate_and_play_weather_report(latitude, longitude, API_key, deepl_auth_key)
        
        #画像を表示する場合
        #cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
        #roi_gray = gray[y:y+h, x:x+w]
        #roi_color = frame[y:y+h, x:x+w]
        # 以下は目もマークする場合
        # eyes = eye_cascade.detectMultiScale(roi_gray)
        # for (ex, ey, ew, eh) in eyes:
        #     cv2.rectangle(roi_color, (ex, ey), (ex+ew, ey+eh), (255, 0, 0), 2)

    
    # 結果の表示
    #cv2.imshow('Facial Feature Detection', frame)
    
    # 終了のキー入力
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 後処理
cap.release()
cv2.destroyAllWindows()

とすればOK

あとは終了する方法が
Ctrl +c 以外にないので
他に終了させる方法があるか考える

それとスペック不足のためか
顔認識してから音声が出るまでに時間がかかる

とりあえず不要部分を削除したのが

import cv2
import time

import weather_voice

# 天気予報を取得してファイルに保存
latitude = "34.745755"
longitude = "137.91146"
API_key = "1082c12d65462d76f7dd1b7ef93c7849"
deepl_auth_key = "5f169e4d-3701-9eff-08fc-bf6065b64c8f:fx"

# Haar Cascade分類器の読み込み
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
eye_cascade = cv2.CascadeClassifier('haarcascade_eye.xml')

# Webカメラの設定
cap = cv2.VideoCapture(0)  # 0番目のカメラを使用する場合

# 最後の顔検出時刻
lastTime = None

# メインループ
while True:


    # カメラからのフレームの取得
    ret, frame = cap.read()
    
    # フレームのグレースケール化
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 顔の検出
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    
    # 検出された顔に対する処理
    for (x, y, w, h) in faces:
        # 検出自の処理(検出から1分たったら再度イベント動かす
        if lastTime is None or time.perf_counter() - lastTime > 60:
            # 検出時刻更新
            lastTime = time.perf_counter()
            print("今日の天気を顔認識したのでお知らせ")
            weather_voice.generate_and_play_weather_report(latitude, longitude, API_key, deepl_auth_key)
        

# 後処理
cap.release()
cv2.destroyAllWindows()

次にセキュリティのため

vim config.ini

でファイルを作り

[API_KEYS]
OPENWEATHER_API_KEY = open weather map API キー
DEEPL_AUTH_KEY = deepLのAPIキー

というようにしてキーを設定ファイルから読み込むようにする

なおキーを”” で囲むとバグるので注意

あとは
Pythonスクリプトでconfigparserモジュールを使用して設定ファイルから情報を読み込む

import configparser
# 設定ファイルを読み込む
config = configparser.ConfigParser()
config.read('config.ini')

# APIキーを取得
API_key = config['API_KEYS']['OPENWEATHER_API_KEY']
deepl_auth_key = config['API_KEYS']['DEEPL_AUTH_KEY']

これを使うようにコードを変更する

import cv2
import time

import configparser
import weather_voice

# 天気予報を取得してファイルに保存
latitude = "緯度"
longitude = "経度"

# 設定ファイルを読み込む
config = configparser.ConfigParser()
config.read('config.ini')


# APIキーを取得
API_key = config['API_KEYS']['OPENWEATHER_API_KEY']
deepl_auth_key = config['API_KEYS']['DEEPL_AUTH_KEY']

# Haar Cascade分類器の読み込み
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
eye_cascade = cv2.CascadeClassifier('haarcascade_eye.xml')

# Webカメラの設定
cap = cv2.VideoCapture(0)  # 0番目のカメラを使用する場合

# 最後の顔検出時刻
lastTime = None

# メインループ
while True:


    # カメラからのフレームの取得
    ret, frame = cap.read()
    
    # フレームのグレースケール化
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 顔の検出
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    
    # 検出された顔に対する処理
    for (x, y, w, h) in faces:
        # 検出自の処理(検出から1分たったら再度イベント動かす
        if lastTime is None or time.perf_counter() - lastTime > 60:
            # 検出時刻更新
            lastTime = time.perf_counter()
            print("今日の天気を顔認識したのでお知らせ")
            weather_voice.generate_and_play_weather_report(latitude, longitude, API_key, deepl_auth_key)
        

# 後処理
cap.release()
cv2.destroyAllWindows()

とりあえずこれで完成
どのみちバックグランドで動作させるので
停止方法は保留

あとは GitHub へコードをアップする

Pythonでコマンドラインの実行

Pythonでコマンドラインの実行

あとはコマンドラインをpythonで実行するメソッドが必要

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@weather.txt > query.json

でJSONファイルを作成

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > test_audio.wav

で音声ファイルを作成

これを
Pythonで行うようにする

なおcurl の動作をrequest で行えるらしい

pip install requests simpleaudio

でインストール

Chatgpt で

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@weather.txt > query.json
でJSONファイルを作成

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > test_audio.wav
で音声ファイルを作成

これらを python で行い
作成した音声ファイルを再生したい

で調べる

帰ってきた答えは

requests ライブラリを使用して、テキストファイルからJSONファイルを作成し、その後音声ファイルを作成します。次に、pydub や simpleaudio のようなライブラリを使用して音声ファイルを再生

とのこと

生成されたコードは

import requests
import json
import simpleaudio as sa

# テキストファイルを読み込む
with open('weather.txt', 'r') as file:
    text = file.read()

# 最初のPOSTリクエストを行い、JSONを取得
response = requests.post(
    "http://192.168.1.69:50021/audio_query?speaker=1",
    data={'text': text}
)
json_data = response.json()

# JSONデータをファイルに保存
with open('query.json', 'w') as json_file:
    json.dump(json_data, json_file)

# 第二のPOSTリクエストを行い、音声データを取得
response = requests.post(
    "http://192.168.1.69:50021/synthesis?speaker=1",
    json=json_data
)

# 取得した音声データをWAVファイルとして保存
with open('test_audio.wav', 'wb') as audio_file:
    audio_file.write(response.content)

# WAVファイルを読み込み、再生する
wave_obj = sa.WaveObject.from_wave_file('test_audio.wav')
play_obj = wave_obj.play()
play_obj.wait_done()  # 再生が終わるまで待機

これを実験で実行

しかしエラーとなる
原因を調べたら

ChatGPT
このエラーメッセージは、simpleaudio モジュールがあなたのシステムのアーキテクチャと互換性がないことを示しています。エラーによると、インストールされたsimpleaudioはx86_64アーキテクチャ用ですが、あなたのシステムはarm64アーキテクチャを必要

とあるため
pygame や pydub などを使う必要がある

pip install requests pygame

の後に

import requests
import json
import pygame

# テキストファイルの内容を読み込む
with open('weather.txt', 'r') as file:
    text = file.read()

# POSTリクエストでJSONデータを取得
response = requests.post(
    "http://192.168.1.69:50021/audio_query?speaker=1",
    data={'text': text}
)
json_data = response.json()

# JSONデータをファイルに保存
with open('query.json', 'w') as json_file:
    json.dump(json_data, json_file)

# JSONファイルを読み込む
with open('query.json', 'r') as json_file:
    json_payload = json.load(json_file)

# POSTリクエストで音声ファイルを取得
response = requests.post(
    "http://192.168.1.69:50021/synthesis?speaker=1",
    json=json_payload
)

# 音声データをファイルに保存
with open('test_audio.wav', 'wb') as audio_file:
    audio_file.write(response.content)

# Pygameの初期化
pygame.init()
pygame.mixer.init()

# WAVファイルを読み込む
sound = pygame.mixer.Sound("test_audio.wav")

# 再生
sound.play()

# 再生が終了するまで待機
while pygame.mixer.get_busy():
    pygame.time.Clock().tick(10)

で実行したが

pygame 2.5.2 (SDL 2.28.3, Python 3.10.6)
Hello from the pygame community. https://www.pygame.org/contribute.html
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/weather/weather_audio.py", line 39, in <module>
    sound = pygame.mixer.Sound("test_audio.wav")
pygame.error: Unrecognized audio format

となる

とりあえず問題を分割
音声の再生ができるかテスト

cp ../test_audio.wav .

あとは

import pygame

# Pygameの初期化
pygame.init()
pygame.mixer.init()

# WAVファイルを読み込む
sound = pygame.mixer.Sound("test_audio.wav")

# 再生
sound.play()

# 再生が終了するまで待機
while pygame.mixer.get_busy():
    pygame.time.Clock().tick(10)

で音声が再生されるのを確認

となると問題は音声ファイルの作成プロセス

import requests
import json

# テキストファイルを読み込む
with open('weather.txt', 'r') as file:
    text = file.read()

# 最初のPOSTリクエストを行い、JSONを取得
response = requests.post(
    "http://192.168.1.69:50021/audio_query?speaker=1",
    data={'text': text}
)
json_data = response.json()

# JSONデータをファイルに保存
with open('query.json', 'w') as json_file:
    json.dump(json_data, json_file)

# 第二のPOSTリクエストを行い、音声データを取得
response = requests.post(
    "http://192.168.1.69:50021/synthesis?speaker=1",
    json=json_data
)

# 取得した音声データをWAVファイルとして保存
with open('test_audio.wav', 'wb') as audio_file:
    audio_file.write(response.content)

で作成したファイルは音声が再生されない

作成に必要なJSONファイルを見てみると
失敗の方は

{"detail": [{"loc": ["query", "text"], "msg": "field required", "type": "value_error.missing"}]}

成功は

{"accent_phrases":[{"moras":[{"text":"キョ","consonant":"ky","consonant_length":0.12866447865962982,"vowel":"o","vowel_length":0.0973580852150917,"pitch":5.962612628936768},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.09024209529161453,"pitch":6.068655967712402},{"text":"ノ","consonant":"n","consonant_length":0.05692561715841293,"vowel":"o","vowel_length":0.11087840050458908,"pitch":5.8726630210876465}],"accent":1,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"テ","consonant":"t","consonant_length":0.0718907043337822,"vowel":"e","vowel_length":0.14058615267276764,"pitch":5.855612277984619},{"text":"ン","consonant":null,"consonant_length":null,"vowel":"N","vowel_length":0.08981689810752869,"pitch":5.842429161071777},{"text":"キ","consonant":"k","consonant_length":0.07256065309047699,"vowel":"i","vowel_length":0.07487940043210983,"pitch":5.817480087280273},{"text":"ワ","consonant":"w","consonant_length":0.057370081543922424,"vowel":"a","vowel_length":0.12027011066675186,"pitch":5.624467849731445}],"accent":1,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ク","consonant":"k","consonant_length":0.07742901891469955,"vowel":"u","vowel_length":0.0734512135386467,"pitch":5.836529731750488},{"text":"モ","consonant":"m","consonant_length":0.060555391013622284,"vowel":"o","vowel_length":0.18545563519001007,"pitch":5.841731071472168}],"accent":1,"pause_mora":{"text":"、","consonant":null,"consonant_length":null,"vowel":"pau","vowel_length":0.33395129442214966,"pitch":0.0},"is_interrogative":false},{"moras":[{"text":"ヨ","consonant":"y","consonant_length":0.10122014582157135,"vowel":"o","vowel_length":0.08855406194925308,"pitch":5.50715970993042},{"text":"ソ","consonant":"s","consonant_length":0.09714090079069138,"vowel":"o","vowel_length":0.09897967427968979,"pitch":5.924524784088135},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.10465160012245178,"pitch":6.014603614807129},{"text":"サ","consonant":"s","consonant_length":0.09610642492771149,"vowel":"a","vowel_length":0.11944571882486343,"pitch":6.102450370788574},{"text":"イ","consonant":null,"consonant_length":null,"vowel":"i","vowel_length":0.08921179920434952,"pitch":6.081024169921875},{"text":"コ","consonant":"k","consonant_length":0.07877751439809799,"vowel":"o","vowel_length":0.08582849055528641,"pitch":6.140597343444824},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.11994349211454391,"pitch":6.1246795654296875},{"text":"キ","consonant":"k","consonant_length":0.08878674358129501,"vowel":"i","vowel_length":0.09810564666986465,"pitch":6.173953533172607},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.16754235327243805,"pitch":6.181836128234863},{"text":"ン","consonant":null,"consonant_length":null,"vowel":"N","vowel_length":0.06746162474155426,"pitch":6.051656723022461},{"text":"ワ","consonant":"w","consonant_length":0.04867963492870331,"vowel":"a","vowel_length":0.1127525344491005,"pitch":5.779387950897217}],"accent":8,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ジュ","consonant":"j","consonant_length":0.11186956614255905,"vowel":"u","vowel_length":0.10070556402206421,"pitch":5.609426498413086},{"text":"ウ","consonant":null,"consonant_length":null,"vowel":"u","vowel_length":0.08927937597036362,"pitch":5.8536553382873535}],"accent":2,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ゴ","consonant":"g","consonant_length":0.06749280542135239,"vowel":"o","vowel_length":0.09443604201078415,"pitch":6.002788543701172},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.12283030897378922,"pitch":6.167281150817871},{"text":"テ","consonant":"t","consonant_length":0.07367654889822006,"vowel":"e","vowel_length":0.14059318602085114,"pitch":6.100039958953857},{"text":"ン","consonant":null,"consonant_length":null,"vowel":"N","vowel_length":0.08243037015199661,"pitch":5.868035316467285}],"accent":1,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"サ","consonant":"s","consonant_length":0.08632750064134598,"vowel":"a","vowel_length":0.1564868539571762,"pitch":5.76937198638916},{"text":"ン","consonant":null,"consonant_length":null,"vowel":"N","vowel_length":0.0757487341761589,"pitch":5.765336036682129}],"accent":2,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ゴ","consonant":"g","consonant_length":0.05643042176961899,"vowel":"o","vowel_length":0.09696970134973526,"pitch":5.819827079772949},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.10339736193418503,"pitch":5.959120273590088},{"text":"ド","consonant":"d","consonant_length":0.06090632826089859,"vowel":"o","vowel_length":0.17854683101177216,"pitch":5.801456451416016}],"accent":2,"pause_mora":{"text":"、","consonant":null,"consonant_length":null,"vowel":"pau","vowel_length":0.3034582734107971,"pitch":0.0},"is_interrogative":false},{"moras":[{"text":"ヨ","consonant":"y","consonant_length":0.09282273054122925,"vowel":"o","vowel_length":0.08764959871768951,"pitch":5.5444231033325195},{"text":"ソ","consonant":"s","consonant_length":0.09865055978298187,"vowel":"o","vowel_length":0.09965776652097702,"pitch":5.934866905212402},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.10340947657823563,"pitch":6.015321731567383},{"text":"サ","consonant":"s","consonant_length":0.09463881701231003,"vowel":"a","vowel_length":0.11840283870697021,"pitch":6.101940155029297},{"text":"イ","consonant":null,"consonant_length":null,"vowel":"i","vowel_length":0.08852870017290115,"pitch":6.076019287109375},{"text":"テ","consonant":"t","consonant_length":0.0689489021897316,"vowel":"e","vowel_length":0.09846750646829605,"pitch":6.127346038818359},{"text":"エ","consonant":null,"consonant_length":null,"vowel":"e","vowel_length":0.1237698420882225,"pitch":6.114901542663574},{"text":"キ","consonant":"k","consonant_length":0.08733474463224411,"vowel":"i","vowel_length":0.0968116894364357,"pitch":6.151494026184082},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.1778039187192917,"pitch":6.152987480163574},{"text":"ン","consonant":null,"consonant_length":null,"vowel":"N","vowel_length":0.064981609582901,"pitch":6.005949974060059},{"text":"ワ","consonant":"w","consonant_length":0.04751142859458923,"vowel":"a","vowel_length":0.09467842429876328,"pitch":5.725627899169922}],"accent":8,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ロ","consonant":"r","consonant_length":0.0632673129439354,"vowel":"o","vowel_length":0.10540910810232162,"pitch":5.67587947845459},{"text":"ク","consonant":"k","consonant_length":0.06531907618045807,"vowel":"U","vowel_length":0.05817136913537979,"pitch":0.0},{"text":"テ","consonant":"t","consonant_length":0.07730791717767715,"vowel":"e","vowel_length":0.16715875267982483,"pitch":6.10499382019043},{"text":"ン","consonant":null,"consonant_length":null,"vowel":"N","vowel_length":0.06785128265619278,"pitch":5.860898494720459}],"accent":1,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ロ","consonant":"r","consonant_length":0.03526639938354492,"vowel":"o","vowel_length":0.11113513261079788,"pitch":5.707563877105713},{"text":"ク","consonant":"k","consonant_length":0.07907760888338089,"vowel":"u","vowel_length":0.07053706794977188,"pitch":5.830077648162842}],"accent":2,"pause_mora":null,"is_interrogative":false},{"moras":[{"text":"ゴ","consonant":"g","consonant_length":0.072600357234478,"vowel":"o","vowel_length":0.09719936549663544,"pitch":5.857402324676514},{"text":"オ","consonant":null,"consonant_length":null,"vowel":"o","vowel_length":0.09454251080751419,"pitch":6.060022354125977},{"text":"ド","consonant":"d","consonant_length":0.05659743398427963,"vowel":"o","vowel_length":0.08684452623128891,"pitch":6.083237171173096},{"text":"デ","consonant":"d","consonant_length":0.05277804285287857,"vowel":"e","vowel_length":0.14346836507320404,"pitch":5.951061248779297},{"text":"ス","consonant":"s","consonant_length":0.07407647371292114,"vowel":"U","vowel_length":0.12432015687227249,"pitch":0.0}],"accent":2,"pause_mora":null,"is_interrogative":false}],"speedScale":1.0,"pitchScale":0.0,"intonationScale":1.0,"volumeScale":1.0,"prePhonemeLength":0.1,"postPhonemeLength":0.1,"outputSamplingRate":24000,"outputStereo":false,"kana":"キョ'オノ/テ'ンキワ/ク'モ、ヨソオサイコオキ'オンワ/ジュウ'/ゴ'オテン/サン'/ゴオ'ド、ヨソオサイテエキ'オンワ/ロ'_クテン/ロク'/ゴオ'ドデ_ス"}

となる

つまりJSON作成が失敗している

Request による生成を断念
代わりに subprocess によるLinux コマンドを実行する方法に買える

import subprocess

# curl コマンドを定義
command = [
    "curl", "-s", "-X", "POST",
    "192.168.1.69:50021/audio_query?speaker=1",
    "--get", "--data-urlencode", "text@weather.txt"
]

# コマンドを実行し、出力を query.json にリダイレクト
with open('query.json', 'w') as file:
    subprocess.run(command, stdout=file)

でサブプロセスを使う方法にすると
JSONファイルの作成が成功

これを

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > test_audio.wav

とすると音声ファイルができているのを確認

次に一緒に音声ファイル作成までをpython でやるようにする

import subprocess

# 最初のcurlコマンド(JSONファイルの作成)
command_json = [
    "curl", "-s", "-X", "POST",
    "192.168.1.69:50021/audio_query?speaker=1",
    "--get", "--data-urlencode", "text@weather.txt"
]

# 第二のcurlコマンド(音声ファイルの作成)
command_audio = [
    "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
    "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
]

# 最初のコマンドを実行してJSONファイルを作成
with open('query.json', 'w') as file:
    subprocess.run(command_json, stdout=file)

# 第二のコマンドを実行して音声ファイルを作成
with open('test_audio.wav', 'wb') as file:
    subprocess.run(command_audio, stdout=file)

これで音声ファイルができているのを確認できたので
次に
これをpygame. で再生する

import subprocess
import pygame
import time

# JSONファイルを作成するためのcurlコマンド
command_json = [
    "curl", "-s", "-X", "POST",
    "192.168.1.69:50021/audio_query?speaker=1",
    "--get", "--data-urlencode", "text@weather.txt"
]

# 音声ファイルを作成するためのcurlコマンド
command_audio = [
    "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
    "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
]

# 最初のコマンドを実行してJSONファイルを作成
with open('query.json', 'w') as file:
    subprocess.run(command_json, stdout=file)

# 第二のコマンドを実行して音声ファイルを作成
with open('test_audio.wav', 'wb') as file:
    subprocess.run(command_audio, stdout=file)

# Pygameの初期化
pygame.init()
pygame.mixer.init()

# WAVファイルを読み込む
sound = pygame.mixer.Sound("test_audio.wav")

# 再生
sound.play()

# 再生が終了するまで待機
while pygame.mixer.get_busy():
    time.sleep(0.1)

これでようやくcurl でファイルを作成し
Pygame で音声再生ができるようになった

次はテキストファイルではなく
Weather map api へアクセスし
その結果をテキストファイルへ保存
それを変換する
もしくはそのまま処理するようにする

pygame で読み上げ

一度に全て処理しpygame で読み上げ

wavファイルの読み上げを gygame にすることでエラー対処

import requests
import json
from datetime import datetime
import deepl

def get_weather_forecast(latitude, longitude, API_key, deepl_auth_key):
    # DeepL Translatorのインスタンスを生成
    translator = deepl.Translator(deepl_auth_key)

    # OpenWeather APIのURL
    url = "https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude=hourly,minutely&units=metric&lang=ja&appid={API_key}"
    url = url.format(lat=latitude, lon=longitude, API_key=API_key)

    # APIリクエスト
    response = requests.get(url)
    jsondata = response.json()

    # 今日の日付を取得
    today = datetime.now().date()

    # 今日の天気予報を探す
    for daily_forecast in jsondata["daily"]:
        date = datetime.fromtimestamp(daily_forecast["dt"]).date()
        if date == today:
            min_temp = daily_forecast["temp"]["min"]
            max_temp = daily_forecast["temp"]["max"]
            weather = daily_forecast["weather"][0]["main"]
            description = daily_forecast["weather"][0]["description"]
            break

    # 天気をdeeplで日本語に翻訳
    weather_japanese = translator.translate_text(weather, target_lang="JA").text

    # 今日の天気予報をまとめる
    today_weather_repo = f"今日の天気は{weather_japanese}、予想最高気温は{max_temp}度、予想最低気温は{min_temp}度です"
    return today_weather_repo

# 関数を使用して天気予報を取得
latitude = "緯度"
longitude = "経度"
API_key = "weather map apiキー"
deepl_auth_key = "deeplのAPIキー"

weather_report = get_weather_forecast(latitude, longitude, API_key, deepl_auth_key)

# 天気予報をテキストファイルに保存
with open('weather.txt', 'w') as file:
    file.write(weather_report)

として関数にして
weather_forecast.py
として保存

次に

import subprocess
import pygame
import time
from weather_forecast import get_weather_forecast

# 天気予報を取得してファイルに保存
latitude = "緯度"
longitude = "経度"
API_key = "weather map apiキー"
deepl_auth_key = "DeepL APIキー"

weather_report = get_weather_forecast(latitude, longitude, API_key, deepl_auth_key)

with open('weather.txt', 'w') as file:
    file.write(weather_report)

# JSONファイルを作成するためのcurlコマンド
command_json = [
    "curl", "-s", "-X", "POST",
    "192.168.1.69:50021/audio_query?speaker=1",
    "--get", "--data-urlencode", "text@weather.txt"
]

# 音声ファイルを作成するためのcurlコマンド
command_audio = [
    "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
    "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
]

# 最初のコマンドを実行してJSONファイルを作成
with open('query.json', 'w') as file:
    subprocess.run(command_json, stdout=file)

# 第二のコマンドを実行して音声ファイルを作成
with open('test_audio.wav', 'wb') as file:
    subprocess.run(command_audio, stdout=file)

# Pygameの初期化
pygame.init()
pygame.mixer.init()

# WAVファイルを読み込む
sound = pygame.mixer.Sound("test_audio.wav")

# 再生
sound.play()

# 再生が終了するまで待機
while pygame.mixer.get_busy():
    time.sleep(0.1)

として作成したファイルを元に
Subprocess で
Curl で音声ファイルを作成

それをpygame で読み上げるようにした

とりあえず動作確認はできたので
次に顔認識と合わせてみる

現在地の天気を音声にする

現在地の天気を音声にする

1日の天気を取得するには座標取得が必要
Google で現在地の緯度経度を調べ

for daily_forecast in jsondata["daily"]:
    date = datetime.fromtimestamp(daily_forecast["dt"]).date()
    if date == today:
        min_temp = daily_forecast["temp"]["min"]
        max_temp = daily_forecast["temp"]["max"]
        weather = daily_forecast["weather"][0]["main"]
        description = daily_forecast["weather"][0]["description"]
        break

というように
OpenWeather APIの「Daily Forecast」エンドポイントを使用

ただしそのままだと天気は英語になる

OpenWeather APIで日本語の天気情報を取得するには、APIリクエストにlangパラメータを追加してjaを追加

リクエストURLを

url = "https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude=hourly,minutely&units=metric&lang=ja&appid={API_key}"

とする

これで詳細な天気は日本語になったけど
天気そのものは英語のまま

辞書を作成してもいいけど
おそらく今後天気を追加していくことになるので
面倒なので
DeepLを使って翻訳する

import deepl

auth_key = "取得したDeepLのAPIキー"  # Replace with your key
translator = deepl.Translator(auth_key)

あとは

#天気をdeeplで英語から日本語にする

weather_japanese = translator.translate_text(weather,target_lang="JA")
# 結果の表示
print(f"今日の最高気温: {max_temp}°C")
print(f"今日の最低気温: {min_temp}°C")
print(f"天気: {weather_japanese} ({description})")

とすれば

python current_weather.py
今日の最高気温: 11.26°C
今日の最低気温: 3.72°C
天気: 雲 (薄い雲)

となる

あとはこれをテキストにする

# 今日の天気予報をまとめる
today_weather_repo = f"今日の天気は{weather_japanese}、予想最高気温は{max_temp}°C、予想最低気温は{min_temp}°C です"

# 結果の表示
print(today_weather_repo)

pythonの場合
テキスト内に変数を入れるなら{変数名}で組み込むことができる

これで文章はできたので
次にdockerで voicevox をバックグランドで起動し
ここへテキストを送り音声を作成する

とりあえず

docker run --rm -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

で前回起動しているが

docker run –rm -p
はDockerコンテナを実行する際に使用されるオプション

これらのオプションは以下のような意味を持ち
–rm:
このオプションは、コンテナの実行が終了した後にコンテナを自動的に削除するようDockerに指示
通常、コンテナは停止してもファイルシステムが保持され、
後でdocker startコマンドで再開することができるが、
–rmオプションがあると、コンテナが停止したときにそのファイルシステムも一緒に削除される
一時的なテストや一回きりの実行に便利

-p:
-pオプションはポートマッピングを設定するために使用
このオプションはホストのポートとコンテナのポートを紐
形式は -p ホストのポート:コンテナのポート
例えば、-p 8080:80 と指定すると、ホストの8080ポートがコンテナの80ポートにマッピングされる

これにより、ホストの8080ポートにアクセスすると、そのリクエストがコンテナの80ポートに転送される

これらのオプションを組み合わせることで、特定のポートを公開しながら
コンテナの使用が終了した後には自動的に削除するように設定することができる

例えば、docker run –rm -p 8080:80 nginx というコマンドは
nginxサーバーを実行するコンテナを作成し
ホストの8080ポートをコンテナの80ポートにマッピングする
そして、コンテナの実行が終了すると、そのコンテナは自動的に削除される

これだとコンテナが消える

やりたいのはバックグランドでvoicevoxの起動なので

docker pull voicevox/voicevox_engine:cpu-ubuntu20.04-latest

でコンテナ取得

バックグランドでの起動は
-d オプションをつければいいので

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

でバックグランドで起動

次にMac で

 echo -h "今日の天気は雲、予想最高気温は11.26°C、予想最低気温は3.72°C です" > weather.txt

でテキストを作成

一度停電になり停止したため
再度ubuntu の起動と

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest


バックグランドでdocker の起動

再度

cat weather.txt 
-h 今日の天気は雲、予想最高気温は11.26°C、予想最低気温は3.72°C です

で調べたら
余計なものが入っていた

過去ログを見たら
-n オプションの間違いだった

とりあえずファイルを編集し

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@weather.txt > query.json

でJSONファイルを作成

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > test_audio.wav

で音声ファイルを作成

Mac でFinder から再生すると音声ができていて音声の再生も問題なく実行できたが
Cを度ではなくシーとなったり
曇はくも
と発音される

次にファイルをubuntu にコピーして
Ubuntu から docker で実行できるかテスト

scp weather.txt snowpool@192.168.1.69:/home/snowpool/aw10s/

でコピー

Ubuntu へログインし

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@weather.txt > query.json

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > test_audio.wav

次にリモートで音声ファイルを再生
Aplayを使うことで音声再生が可能

aplay test_audio.wav

しかし
これだとパソコン本体にログインしていないと再生されない

目的はリモート接続での音声再生またはcronなどでのログインしていない除隊でも音声を再生させること

ラズパイで過去にやったような気がするので検索

https://www.sato-susumu.com/entry/2023/01/06/203541#pygamemixerSoundでwav再生
を参考に
mpg123
で再生

sudo apt install mpg123

でインストール

 mpg123 test_audio.wav

を実行したが

High Performance MPEG 1.0/2.0/2.5 Audio Player for Layers 1, 2 and 3
	version 1.29.3; written and copyright by Michael Hipp and others
	free software (LGPL) without any warranty but with best wishes


Terminal control enabled, press 'h' for listing of keys and functions.

Playing MPEG stream 1 of 1: test_audio.wav ...

MPEG 1.0 L I cbr865 44100 stereo
[src/libmpg123/layer1.c:check_balloc():30] error: Illegal bit allocation value.
[src/libmpg123/layer1.c:INT123_do_layer1():228] error: Aborting layer I decoding after step one.
Note: Illegal Audio-MPEG-Header 0xf6fff3ff at offset 5272.
Note: Trying to resync...
Note: Skipped 96 bytes in input.
[src/libmpg123/layer1.c:check_balloc():30] error: Illegal bit allocation value.
[src/libmpg123/layer1.c:INT123_do_layer1():228] error: Aborting layer I decoding after step one.
> 002+612  00:00.01+00:05.32 --- 100=100   0 kb/s  942 B acc  128 clip p+0.000  
MPEG 1.0 L I cbr865 44100 stereo
Note: Illegal Audio-MPEG-Header 0xf6fff7ff at offset 6310.
Note: Trying to resync...
Note: Skipped 922 bytes in input.
[src/libmpg123/layer1.c:check_balloc():30] error: Illegal bit allocation value.
[src/libmpg123/layer1.c:INT123_do_layer1():228] error: Aborting layer I decoding after step one.
> 003+611  00:00.02+00:05.31 --- 100=100   0 kb/s  942 B acc    0 clip p+0.000  
MPEG 1.0 L I cbr865 44100 stereo
Note: Illegal Audio-MPEG-Header 0x6cffcb00 at offset 8174.
Note: Trying to resync...
Note: Skipped 809 bytes in input.

Warning: Big change from first (MPEG version, layer, rate). Frankenstein stream?
> 004+610  00:00.20+00:31.85 --- 100=100   0 kb/s  942 B acc    0 clip p+0.000  
MPEG 2.5 L III cbr144 11025 stereo

Warning: Big change (MPEG version, layer, rate). Frankenstein stream?

というようにバグった