OpenWeather API を使い現在地の今日の天気を取得

OpenWeather API を使い現在地の今日の天気を取得。
一時間以内に雨が降るなら音声で知らせるものを作る

以前
https://github.com/Snowpooll/face_weather
で作成した顔を認識したら今日の天気を知らせる
の応用で
顔を認識した時に1時間以内に雨が降るなら
音声で知らせる
を作成する

GPTで
OpenWeather API を使い現在地の今日の天気を取得。
一時間以内に雨が降るならアラートを表示するpythonコード
としたとき

import requests
import json

# OpenWeather API キーと緯度経度を設定
api_key = 'YOUR_API_KEY'
latitude = 'YOUR_LATITUDE'
longitude = 'YOUR_LONGITUDE'

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

となった

しかし
現在地をどうせなら自動で取得したいので
GPTで
現在地の緯度経度を取得するpythonコード
とすると

ipinfo.ioとNominatim ジオコーダ
2つの取得方法が提示される

なお移動しながらの位置情報取得はGPSをお勧めされた

とりあえず精度などからだと

家庭のように固定された場所の位置情報を取得する場合は、Nominatim ジオコーダを使用するのが一般的に適しています。その理由は以下の通りです:
1. 正確性:Nominatim は OpenStreetMap のデータに基づいており、具体的な住所や場所名から精密な緯度と経度を提供できます。これは家庭や建物の正確な位置を特定するのに最適です。
2. 詳細レベル:Nominatim は詳細な逆ジオコーディングを提供し、具体的な住所情報を含めることができます。これにより、特定の住所の地理的な位置を正確に把握できます。
3. 汎用性:地図上での場所検索や、特定の地点の座標を取得するために、Nominatim は広範なアプリケーションで利用可能です。地図関連の多くの機能を実装する場合にも対応しやすいです。
一方で、ipinfo.io は IP アドレスに基づいて地理的な位置を特定するサービスであり、特定の住所に対してはそれほど詳細な情報を提供することはできません。IP ベースの位置特定は、あくまでおおまかな地域情報を提供するため、家庭のように固定された具体的な位置を必要とする場合には適していません。
したがって、家庭のような固定の場所の正確な位置情報を求める場合は、Nominatim ジオコーダがより適切な選択肢と言えるでしょう。

とのこと

とりあえず

pip install geopy

でライブラリを入れる

しかし

from geopy.geocoders import Nominatim
import geopy

def get_location():
    # ジオロケーターを初期化
    geolocator = Nominatim(user_agent="geoapiExercises")
    
    try:
        # IPアドレスから位置情報を取得
        location = geolocator.geocode("現在地の住所", timeout=10)
        
        # 緯度と経度を取得
        latitude = location.latitude
        longitude = location.longitude
        
        return latitude, longitude
    
    except geopy.exc.GeocoderTimedOut:
        return "Geocoder service timed out"
    
    except AttributeError:
        return "Location not found"

# 現在地の緯度経度を取得
current_location = get_location()
print("Latitude:", current_location[0])
print("Longitude:", current_location[1])

を実行すると

Latitude: L
Longitude: o

となる

ご提示のコードで Latitude: L と Longitude: o という結果が出力されるのは、おそらく何らかのエラーが発生しているため、エラーメッセージの文字列を返している可能性が高いです。get_location 関数では、例外が発生した場合に文字列を返しています。しかし、それが数字のタプルではないため、current_location[0] と current_location[1] で文字列の最初の文字が表示されてしまうのです。
問題を解決するために、エラー発生時のハンドリングを少し変更し、エラーメッセージを確認できるように修正すると良いでしょう。以下に修正したコードを示します。

from geopy.geocoders import Nominatim
import geopy

def get_location():
    # ジオロケーターを初期化
    geolocator = Nominatim(user_agent="geoapiExercises")
    
    try:
        # 住所から位置情報を取得
        location = geolocator.geocode("現在地の住所", timeout=10)
        
        # 緯度と経度を取得
        if location:
            latitude = location.latitude
            longitude = location.longitude
            return (latitude, longitude)
        else:
            return "Location not found", None
    
    except geopy.exc.GeocoderTimedOut:
        return "Geocoder service timed out", None
    
    except Exception as e:
        return str(e), None

# 現在地の緯度経度を取得
current_location = get_location()

if isinstance(current_location, tuple) and None not in current_location:
    print("Latitude:", current_location[0])
    print("Longitude:", current_location[1])
else:
    print(current_location[0])  # エラーメッセージを出力

の結果は

Location not found

住所の入力方法に問題ありかと思って
https://note.com/asahi_ictrad/n/nc948388e7c72#66ef102d-ebb9-421c-9e94-ea6e86c1a063
を参考に

from geopy.geocoders import Nominatim
import geopy

def get_location():
    # ジオロケーターを初期化
    geolocator = Nominatim(user_agent="geoapiExercises")
    
    try:
        # 住所から位置情報を取得
        location = geolocator.geocode("日本、〒郵便番号 住所", timeout=10)
        
        # 緯度と経度を取得
        if location:
            latitude = location.latitude
            longitude = location.longitude
            return (latitude, longitude)
        else:
            return "Location not found", None
    
    except geopy.exc.GeocoderTimedOut:
        return "Geocoder service timed out", None
    
    except Exception as e:
        return str(e), None

# 現在地の緯度経度を取得
current_location = get_location()

if isinstance(current_location, tuple) and None not in current_location:
    print("Latitude:", current_location[0])
    print("Longitude:", current_location[1])
else:
    print(current_location[0])  # エラーメッセージを出力

としたが変わらない

このため

現在地の緯度と経度を取得するには、Pythonで geopy ライブラリを使用するのが一般的です。geopy を使って、デバイスの IP アドレスを基にして位置情報を推測する方法
を試す

pip install geopy requests

でライブラリインポート

from geopy.geocoders import Nominatim
import requests

def get_public_ip():
    # 公開 API を使用して公衆 IP アドレスを取得
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    # IP アドレスから位置情報を推定
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

# 公衆 IP アドレスを取得
public_ip = get_public_ip()

# IP アドレスから位置情報を取得
latitude, longitude = get_location_by_ip(public_ip)
print(f"Latitude: {latitude}, Longitude: {longitude}")

これなら成功

なのでこれをモジュールにして呼び出すようにする

location_utils.py

として

# location_utils.py

from geopy.geocoders import Nominatim
import requests

def get_public_ip():
    """公開 API を使用して公衆 IP アドレスを取得する関数"""
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    """IP アドレスから位置情報(緯度と経度)を推定する関数"""
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

で保存

次に

import requests
import json

# OpenWeather API キーと緯度経度を設定
api_key = 'YOUR_API_KEY'
latitude = 'YOUR_LATITUDE'
longitude = 'YOUR_LONGITUDE'

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

の中で

location_utils.py

を呼び出して緯度経度を自動取得できるようにしたい

location_utils.py モジュールから緯度と経度を取得する機能を weather_check.py に組み込むために、以下のようにコードを修正します。まず、location_utils.py が緯度と経度を直接返すようにし、その後、この情報を weather_check.py で利用して OpenWeather API に問い合わせます。

location_utils.py を以下のように更新します。このコードは緯度と経度を直接取得し、戻り値として返します

# location_utils.py

import requests

def get_public_ip():
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

def get_current_location():
    ip_address = get_public_ip()
    return get_location_by_ip(ip_address)

次に

weather_check.py

location_utils から緯度と経度を取得し、その情報を使って天気情報を取得

# weather_check.py

import requests
import location_utils

# OpenWeather API キー
api_key = 'APIキー'

# 緯度と経度を location_utils から取得
latitude, longitude = location_utils.get_current_location()
if latitude is None or longitude is None:
    print("Failed to get location.")
    exit()

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

これで実行すると

No rain expected in the next hour.

となる

しかしこれだけだと本当に合っているかわからないので
IPアドレスから住所を表示するのと
一時間毎の天気を表示するものを作る

住所を確認したが違ってた

表示された緯度経度が本来の緯度経度とずれている

他のものも試してみた

pip install geocoder

の後に

import geocoder

def get_current_location():
    # 自分の IP アドレスに基づいて現在地の位置情報を取得
    g = geocoder.ip('me')
    if g.ok:
        return g.latlng
    else:
        return None

# 現在地の緯度と経度を取得して表示
location = get_current_location()
if location:
    latitude, longitude = location
    print(f"Latitude: {latitude}, Longitude: {longitude}")
else:
    print("Unable to determine current location")

国土地理院なら

import requests
import urllib

makeUrl = "https://msearch.gsi.go.jp/address-search/AddressSearch?q="
s_quote = urllib.parse.quote('千葉県南房総市富浦町青木123-1')
response = requests.get(makeUrl + s_quote)
print(response.json()[0]["geometry"]["coordinates"])

[137.91481, 34.743805]

取り合えず住所から入力して緯度経度取得にするなどを考えていたが
よくよく考えればどうやって緯度経度を出したか調べたら

import requests
import json
from pprint import pprint
url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&appid={API_key}"
# xxxxx
url = url.format(zip_place = "郵便番号,JP", API_key = "取得したAPIキー")

jsondata = requests.get(url).json()
pprint(jsondata)

print("天気:",jsondata["weather"][0]["main"])
print("天気詳細:",jsondata["weather"][0]["description"])

print("都市名:",jsondata["name"])
print("気温:",jsondata["main"]["temp"])
print("体感気温:",jsondata["main"]["feels_like"])
print("最低気温:",jsondata["main"]["temp_min"])
print("最高気温:",jsondata["main"]["temp_max"])
print("気圧:",jsondata["main"]["pressure"])
print("湿度:",jsondata["main"]["humidity"])

print("風速:",jsondata["wind"]["speed"])
print("風の方角:",jsondata["wind"]["deg"])

で緯度経度を取得していた

つまり

import requests
import json
from pprint import pprint
# url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&appid={API_key}"
url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"

# xxxxx
url = url.format(zip_place = "郵便番号,JP", API_key = "取得したAPIキー")

jsondata = requests.get(url).json()

pprint(jsondata)

の結果で

{'base': 'stations',
 'clouds': {'all': 100},
 'cod': 200,
 'coord': {'lat': 緯度, 'lon': 経度},
 'dt': 1714340555,
 'id': 0,
 'main': {'feels_like': 17.59,
          'grnd_level': 1014,
          'humidity': 77,
          'pressure': 1015,
          'sea_level': 1015,
          'temp': 17.75,
          'temp_max': 17.75,
          'temp_min': 17.75},
 'name': 'Kawai',
 'rain': {'1h': 0.1},
 'sys': {'country': 'JP',
         'id': 2092872,
         'sunrise': 1714334403,
         'sunset': 1714383067,
         'type': 2},
 'timezone': 32400,
 'visibility': 10000,
 'weather': [{'description': '厚い雲',
              'icon': '04d',
              'id': 804,
              'main': 'Clouds'}],
 'wind': {'deg': 255, 'gust': 1.55, 'speed': 0.83}}

の実行結果の中から

 'coord': {'lat': 緯度, 'lon': 経度},


緯度経度のみ抽出して表示したい

これは

# 緯度経度のデータを抽出
latitude = jsondata['coord']['lat']
longitude = jsondata['coord']['lon']

# 緯度経度を表示
print("緯度:", latitude)
print("経度:", longitude)

とすればOK

次にモジュールにする
location_util2.pyの中身の

# location_utils.py

import requests
from geopy.geocoders import Nominatim

def get_public_ip():
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

def get_address_from_coordinates(latitude, longitude):
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), exactly_one=True)
    return location.address if location else "Address not found."

def get_current_location_and_address():
    ip_address = get_public_ip()
    location = get_location_by_ip(ip_address)
    if location:
        latitude, longitude = location
        address = get_address_from_coordinates(latitude, longitude)
        return latitude, longitude, address
    return None, None, "Location not found."

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "取得したAPIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address

として保存

Address.pyの中身を

import location_utils2
latitude, longitude, address = location_utils2.get_current_location_and_address()
if latitude and longitude:
    print(f"Latitude: {latitude}, Longitude: {longitude}")
if address:
    print("Address:", address)
から
import location_utils2

latitude, longitude, address = location_utils2.get_current_location_and_address()
if latitude and longitude:
    print(f"Latitude: {latitude}, Longitude: {longitude}")
if address:
    print("Address:", address)

って同じじゃん

実行し
https://www.geocoding.jp
で調べたら
緯度 経度
が住所の緯度経度

つまり
Nominatim
による住所への変換がダメってことがわかった

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "取得したAPIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address

の内容を

import requests

def get_current_location_and_address():
    # APIキーとZIPコードを設定
    API_key = "YOUR_API_KEY"
    zip_place = "YOUR_ZIP_CODE,JP"
    
    # OpenWeatherMap APIを使用して緯度経度を取得
    weather_url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    response = requests.get(weather_url)
    jsondata = response.json()
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    # 国土地理院APIを使用して住所を取得
    geo_url = f"https://mreversegeocoder.gsi.go.jp/reverse-geocoder/LonLatToAddress?lat={latitude}&lon={longitude}"
    geo_response = requests.get(geo_url)
    address_data = geo_response.json()
    
    # 住所情報を取得(存在する場合)
    address = address_data['results']['lv01Nm'] if 'results' in address_data and 'lv01Nm' in address_data['results'] else None
    
    return latitude, longitude, address

とした結果
とりあえず郵便番号まで取得できて大体の住所がでるなら
Nominatim
で事足りる

重要なのはそこじゃなくて緯度経度が合ってるかどうか
なので
住所検索の機能はオミットする予定だったが

import location_utils2

latitude, longitude, _ = location_utils2.get_current_location_and_address()  # 住所情報は無視
print(f"Latitude: {latitude}, Longitude: {longitude}")

というように別のファイルで呼び出すときに
指定しなければ緯度経度のみ抽出できる

次に

# weather_check.py

import requests
import location_utils

# OpenWeather API キー
api_key = 'APIキー'

# 緯度と経度を location_utils から取得
latitude, longitude = location_utils.get_current_location()
if latitude is None or longitude is None:
    print("Failed to get location.")
    exit()

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

のコードを
これを使って緯度経度取得したものへ変更

# weather_check.py
import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    # OpenWeather One Call API の URL
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# APIキーを設定
api_key = 'APIキー'


# location_utils2から緯度と経度を取得
latitude, longitude, _ = location_utils2.get_current_location_and_address()

# 天気データを取得
weather_data = get_weather_data(api_key, latitude, longitude)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

これを実行すれば雨が降らないので

No rain expected in the next hour.

となる

次に6時間毎の天気を表示する

weather_forecast_6hour.py

import requests

def get_weather_forecast(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&units=metric&lang=ja&appid={api_key}"
    response = requests.get(url)
    return response.json()

# api_key = 'YOUR_API_KEY'  # APIキーを設定
api_key = 'APIキー'

latitude = 35.681236  # 東京駅の緯度
longitude = 139.767125  # 東京駅の経度

weather_data = get_weather_forecast(api_key, latitude, longitude)

# 現在の天気、1時間毎の天気(次の6時間)、1日毎の天気(次の7日間)を表示
print("Current Weather:", weather_data['current'])
print("Hourly Forecast for next 6 hours:")
for hour in weather_data['hourly'][:6]:
    print(hour)
print("Daily Forecast for next 7 days:")
for day in weather_data['daily'][:7]:
    print(day)

としたが

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/rain_alert/weather_forecast_6hour.py", line 17, in <module>
    print("Current Weather:", weather_data['current'])
KeyError: 'current'

となる

「現在・1時間毎・1日毎」の天気予報を取得
https://dev.classmethod.jp/articles/use-open-weather-map/
のコードが古いようだ

次に
https://3pysci.com/openweathermap-5/#index_id5
を参考にまずは全データの取得

だがだめ

公式を確認する
https://openweathermap.org/api/one-call-3#current

https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}
となっていて
	optional	By using this parameter you can exclude some parts of the weather data from the API response. It should be a comma-delimited list (without spaces).
Available values:
	•	current
	•	minutely
	•	hourly
	•	daily
	•	alerts

やるべきことは
現在の天気を取得し
雨以外の天気なら
一時間以内に雨が降るかを調べるものを作成する

雨が降りそうな時のみ音声を流す
これを顔認識したら実行するものを作成することで
洗濯物を取り込めるようにする

出かけたり買い物の時間を考えれば三時間がベストなので

print("天気:",jsondata["current"]["weather"][0]["main"])

で実験したらNG

3時間にしても

    hourly_forecast = weather_data['hourly'][:24]  # 24時間分のデータを取得

の部分が間違っている

つまり時間毎の指定ができないとだめ

https://zenn.dev/daifukuninja/articles/5e696cd0a75ba8#たたいてみた
を参考に
現在地の天気は

curl 'https://api.openweathermap.org/data/2.5/weather?zip=169-0072,JP&appid={your-api-key}&lang=jp'

で取得できるので

curl 'https://api.openweathermap.org/data/2.5/forecast?lon=緯度&lat=経度&appid={your-api-key}&lang=jp'

とりあえず現在の郵便番号から
緯度経度、天気を取得するように

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "APIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    #天気の取得
    weather_description = jsondata['weather'][0]['description']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address, weather_description

へコード変更

メール読み上げを設定ファイルからに変更

メール読み上げを設定ファイルからに変更

config.ini

を作成し

[Settings]
server_ip = 192.168.1.69:50021
label_id = GmailのラベルID
detection_interval = 1200
notification_sound = notice.wav
pdf_notification_sound = notice_pdf.wav

とする

configparser モジュールを使用して、INIファイルから設定を読みこむ

kao.py

import configparser

を追加

# configparserを使用してINIファイルを読み込む
config = configparser.ConfigParser()
config.read('config.ini')

# 設定を変数に格納
server_ip = config['Settings']['server_ip']
label_id = config['Settings']['label_id']
detection_interval = int(config['Settings']['detection_interval'])

        if lastTime is None or time.perf_counter() - lastTime > 1200:
を
        if lastTime is None or time.perf_counter() - lastTime > detection_interval:

というように設定値の定数へ変更

次にサーバーIPも設定ファイルから読み込むように変更する

email_processor,py

のコードを変更

import configparser

# 設定ファイルの読み込み
config = configparser.ConfigParser()
config.read('config.ini')
server_ip = config['Settings']['server_ip']
label_id = config['Settings']['label_id']
notification_sound = config['Settings']['notification_sound']
pdf_notification_sound = config['Settings']['pdf_notification_sound']

を追記

音声を生成する部部の

    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]
でサーバーIPの部分を
    command_json = [
        "curl", "-s", "-X", "POST",
        f"http://{server_ip}/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", f"http://{server_ip}/synthesis?speaker=1"
    ]

へ変更

また再生する音声を

        playsound('notice.wav')

から

playsound(notification_sound)

へ変更

                        playsound('notice_pdf.wav')

                        playsound(pdf_notification_sound)

へ変更

これで
Voicevox のdocker サーバーIP
お知らせの音声
顔認識してメールを読み上げる時間の指定
Gmailで読み上げるラベルの指定
を設定ファイルで行えるように変更完了

あとは
GithubへSSHでアップできるようにする

顔を認識したらメールを読み上げる

顔を認識したらメールを読み上げる

import fitz
from playsound import playsound
import subprocess
import pygame
import time

# テキストから音声を生成して再生する関数
def generate_and_play_audio_from_text(text):
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("audio_output.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

# PDFファイルから文字数をカウントする関数
def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

# メールの処理を行う関数
def process_email(service, label_id):
    from gmail_utils import gmail_get_latest_unread_message_body
    from pdf_downloader import find_preview_link, download_pdf

    body, urls = gmail_get_latest_unread_message_body(service, label_id)
    if body:
        playsound('notice.wav')
        generate_and_play_audio_from_text(body)
        if urls:
            for url in urls:
                preview_url = find_preview_link(url)
                if preview_url:
                    download_pdf(preview_url, "downloaded_file.pdf")
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    if char_count >= 100:
                        playsound('notice_pdf.wav')
                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")
    else:
        print("未読メールはありません。")

というようにモジュールにして保存
ファイル名は

email_processor.py

とした

次に
Kao,pyで顔認識したらメールを読み上げるようにする

git clone https://github.com/Snowpooll/face_weather.git

で以前作成した顔認識したら天気を知らせるの中にあるものを

cp ../../face_weather/haarcascade_* .

でコピーして使う

import cv2
import time
from email_processor import process_email
from gmail_utils import gmail_init

# Haar Cascade分類器の読み込み
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

# Webカメラの設定
cap = cv2.VideoCapture(0)  # 0番目のカメラを使用する場合

# Gmailサービスの初期化
service = gmail_init()
label_id = "ラベルID"  # 例として特定のラベルIDを設定

# 最後の顔検出時刻
lastTime = None

# メインループ
while True:
    # カメラからのフレームの取得
    ret, frame = cap.read()
    
    # フレームのグレースケール化
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 顔の検出
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
    
    # 検出された顔に対する処理
    for (x, y, w, h) in faces:
        # 検出自の処理(検出から1分たったら再度イベント動かす)
        if lastTime is None or time.perf_counter() - lastTime > 60:
            # 検出時刻更新
            lastTime = time.perf_counter()
            # メール処理関数を呼び出し
            process_email(service, label_id)

# 後処理
cap.release()
cv2.destroyAllWindows()

で実行したら成功したが
間隔を一分にしたため
メールばかり読み上げになる

なので感覚を20分ぐらいに変更する

        # 検出自の処理(検出から20分たったら再度イベント動かす)
        if lastTime is None or time.perf_counter() - lastTime > 1200:

へ変更した

とりあえず動作確認は取れたので
次に設定ファイルを作成し
コードのメンテをしやすくする

とりあえず
docker サーバーIP
gmailのラベル
次の検出までの待ち時間
は設定ファイルにまとめておき
これを編集するだけでできるようにする

ダウンロード機能の修正とモジュール化

ダウンロード機能の修正とモジュール化

import fitz  # PyMuPDF
from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                    
                    # PDFファイルから文字数をカウント
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    print(f"PDF内の文字数: {char_count}")
                    
                    # 文字数が100文字以上の場合は別の音声ファイルを再生
                    if char_count >= 100:
                        playsound('notice_pdf.wav')

                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")

        # メール本文をテキストファイルに保存
        save_email_body_to_text(body)
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

としたらリンクがエラーになる

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

だと問題ない

とりあえず

import fitz  # PyMuPDF
from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf
import subprocess
import pygame
import time

def generate_and_play_audio_from_text(text):
    # JSONファイルを作成するためのcurlコマンド
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]

    # 音声ファイルを作成するためのcurlコマンド
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]

    # JSONファイルと音声ファイルを作成
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)

    # Pygameで音声ファイルを再生
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("audio_output.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')
        generate_and_play_audio_from_text(body)
        

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                    
                    # PDFファイルから文字数をカウント
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    print(f"PDF内の文字数: {char_count}")
                    
                    # 文字数が100文字以上の場合は別の音声ファイルを再生
                    if char_count >= 100:
                        playsound('notice_pdf.wav')

                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

とすることで
メールの読み上げと
PDFのダウンロードができた
また長文のPDFに関してはPDFを見るように促すようにした

あとは顔を認識したら起動するようにする

まずはこれをモジュールにする

import fitz
from playsound import playsound
import subprocess
import pygame
import time

# テキストから音声を生成して再生する関数
def generate_and_play_audio_from_text(text):
    command_json = [
        "curl", "-s", "-X", "POST",
        "192.168.1.69:50021/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", "192.168.1.69:50021/synthesis?speaker=1"
    ]
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)
    pygame.init()
    pygame.mixer.init()
    sound = pygame.mixer.Sound("audio_output.wav")
    sound.play()
    while pygame.mixer.get_busy():
        time.sleep(0.1)

# PDFファイルから文字数をカウントする関数
def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

# メールの処理を行う関数
def process_email(service, label_id):
    from gmail_utils import gmail_get_latest_unread_message_body
    from pdf_downloader import find_preview_link, download_pdf

    body, urls = gmail_get_latest_unread_message_body(service, label_id)
    if body:
        playsound('notice.wav')
        generate_and_play_audio_from_text(body)
        if urls:
            for url in urls:
                preview_url = find_preview_link(url)
                if preview_url:
                    download_pdf(preview_url, "downloaded_file.pdf")
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    if char_count >= 100:
                        playsound('notice_pdf.wav')
                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")
    else:
        print("未読メールはありません。")

として

email_processor.py

として保存

次に顔を検出したらこれを呼び出すようにする

Gmail読み上げとメールのお知らせを作る

Gmail読み上げとメールのお知らせを作る

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

でテキストファイルの作成とPDF取得ができているので
お知らせメッセージの
notice.wavを再生する

GPTによれば

pip install playsound

でインストールし
コードを変更する

from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")

        # メール本文をテキストファイルに保存
        save_email_body_to_text(body)
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

変更点は

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

で音声ファイルを読み込んでいること

Main.pyの中で

    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

を追加して音声を出している

次にPDFの文字数を調べて100文字以上なら
音声でPDFの内容を確認するように促す

あとはメール本文の読み上げ処理だけ

PyMuPDFを使い
文字数をカウント、文字数が100文字以上ならPDFを確認する音声を再生するようにコード変更

import fitz  # PyMuPDF
from playsound import playsound
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def count_pdf_characters(file_path):
    doc = fitz.open(file_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return len(text)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルID")
    
    # 未読メールがある場合は音声ファイルを再生
    if body:
        playsound('notice.wav')

        if urls:
            for url in urls:
                print(f"プレビューリンクを検索するURL: {url}")
                # プレビューリンクを取得
                preview_url = find_preview_link(url)
                if preview_url:
                    print(f"プレビューリンク: {preview_url}")
                    # プレビューリンクからPDFファイルのダウンロードを試みる
                    download_pdf(preview_url, file_path="downloaded_file.pdf")
                    
                    # PDFファイルから文字数をカウント
                    char_count = count_pdf_characters("downloaded_file.pdf")
                    print(f"PDF内の文字数: {char_count}")
                    
                    # 文字数が100文字以上の場合は別の音声ファイルを再生
                    if char_count >= 100:
                        playsound('notice_pdf.wav')

                else:
                    print("プレビューリンクが見つかりませんでした。")
        else:
            print("メールにURLが見つかりませんでした。")

        # メール本文をテキストファイルに保存
        save_email_body_to_text(body)
    else:
        print("未読メールはありません。")

if __name__ == "__main__":
    main()

あとはvoicevoxの処理のみ

とりあえずdockerを起動させる

docker run -d -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

でやったら普通に音声が再生できたので
単純に負荷でdockerが落ちてたみたい

文章を音声で読み上げしたが
どうやら最初の文章に日付と宛先の名前がくっついている
毎回main3.pyを実行するたびに他のメールでも同じなので
最初の文章を削除する

Githubへの公開とライセンス関連

Githubへの公開とライセンス関連を調べた

https://github.com/Snowpooll/face_weather
が今回作成したもの

顔を認識するとVOICEVOXで今日の天気を教えてくれる

コンセプトは操作しないこと
アレクサでもいいけど
声が出せないと使えないし

以下は作業ログ

リポジトリ名を
face_weather

publicで公開

Add a README file

readmeを追加

ライセンスはMITライセンスを選択

これでリポジトリを作成すれば
ライセンスの英文は自動で作成してくれる

Readmeを編集してから

git clone https://github.com/Snowpooll/face_weather.git


ローカル環境にリポジトリをクローン

この中にプログラムをコピーしていくが

__pycache__

があった

これは
ChatGPTによれば
__pycache__ は、
Pythonがソースコードをコンパイルした後のバイトコードを格納するディレクトリ

バージョン管理システム(例:Git)を使用している場合は、
通常、__pycache__ ディレクトリを
無視リストに追加することが一般的

なので

 vim .gitignore

でファイルを作成

__pycache__/

として保存

git add .gitignore

で追加

git commit -m "Add __pycache__ to .gitignore"

を実行したら

Author identity unknown

*** Please tell me who you are.

Run

  git config --global user.email "you@example.com"
  git config --global user.name "Your Name"

to set your account's default identity.
Omit --global to set the identity only in this repository.

fatal: unable to auto-detect email address (got 'snowpool@snowpool-Prime-Series.(none)')

となった

そういえば再インストールした時に設定を忘れていたので
git config --global user.name "ユーザ名"
git config --global user.email "メルアド"


再度

git commit -m "Add __pycache__ to .gitignore"

を実行

あと

query.json
test_audio.wav
weather.txt

も無視リストに加えるので

vim .gitignore

でファイルを開き
追記

git add .gitignore
git commit -m "Update .gitignore to include specific files"

で追加

次に

requirements.txt

の作成

これでライブラリ関連のインストールが簡単になる

requests
deepl
pygame
opencv-python
configparser

というようにライブラリを書いておけばOK

git push origin main

でエラー。

remote: Support for password authentication was removed on August 13, 2021.
remote: Please see https://docs.github.com/get-started/getting-started-with-git/about-remote-repositories#cloning-with-https-urls for information on currently recommended modes of authentication.
fatal: Authentication failed for 'https://github.com/Snowpooll/face_weather.git/'

調べたら
GitHubでパスワード認証が廃止されたため、
HTTPSを使ってリモートリポジトリにアクセスする際には
パーソナルアクセストークン(PAT)を使用するか、
SSHキー認証を使用する必要があるらしい

とりあえずPATを使うことにする

GitHubでPATを生成する:
* GitHubにログインし、右上のアカウントアイコンから「Settings」を選択します。
* 左側のサイドバーから「Developer settings」を選択し、「Personal access tokens」に移動します。
* 「Generate new token」をクリックし、必要な権限を選択してトークンを生成します。生成されたトークンは安全な場所にコピー

で作成しようとしたら
New personal access token (classic)
の設定でつまづくので検索

https://dev.classmethod.jp/articles/github-personal-access-tokens/
によれば

Noteにはトークンの使用用途
code maintenanceとした

Expirationにはトークンの有効期限
デフォルトは30だが短いので90にする

Select scopesではトークンが利用可能なGitHub権限を設定
git cloneやgit pullをしたい場合は、repoにチェック

これで
Generate token
をクリックすれば
トークンが表示される

これで再度実行したら

 ! [rejected]        main -> main (fetch first)
error: failed to push some refs to 'https://github.com/Snowpooll/face_weather.git'
hint: Updates were rejected because the remote contains work that you do
hint: not have locally. This is usually caused by another repository pushing
hint: to the same ref. You may want to first integrate the remote changes
hint: (e.g., 'git pull ...') before pushing again.
hint: See the 'Note about fast-forwards' in 'git push --help' for details.

となる

原因は
リモートリポジトリにあなたのローカルリポジトリにはない変更が含まれているために発生

思い当たることとして
READMEを
Githubで編集した後に

git pull origin main

していなかったこと

他の変更はなかったため

git push origin main --force

で強制プッシュ

とりあえずはこれで公開できた

ちなみに後で聞いたのだが
–force は、remoteのものを完全に上書きするので、使わないほうがベター
必要なcommitが消える可能性があるらしい

認証も、SSHキーを使うのがベター
とのこと

voicevox でお知らせメッセージの作成

voicevox でお知らせメッセージの作成

新着メールがあった時とかに急に読み上げするのではなく

内容は長文のためPDFをご確認ください

特別支援学校からのお知らせがあります

Voicevoxで作成した音声を流すようにするため
音声の作成

これは
curlで作成できるので

まず

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-latest

でdockerを起動

次に

vim notice.txt

で内容を
特別支援学校からのお知らせがあります

vim notice_pdf.txt


内容は長文のためPDFをご確認ください

として

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@notice.txt > query.json

でjsonを作成

このjsonを元に

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > notice.wav    

で音声を作成

同様に

curl -s -X POST "192.168.1.69:50021/audio_query?speaker=1" --get --data-urlencode text@notice_pdf.txt > query.json

でJSON作成

curl -s -H "Content-Type: application/json" -X POST -d @query.json "192.168.1.69:50021/synthesis?speaker=1" > notice_pdf.wav

で音声を作成

支援学校のPDFをダウンロードし
本文をテキストファイルに保存するプログラムは

main3.py

に記述してある

ダウンロード関連は

pdf_downloader.py

Gmailの認証系は

gmail_utils.py

にまとめてある

また認証関連は

token.json

これらと作成したwavファイルを
mail_voiceディレクトリに移動

これで処理を完成させる

とりあえずPDFとgmail本文の取得した文字列はできているので
あとはテキストファイルの読み上げをvoicevoxとするのと
PDFの本文の長さを取得し

docker のVoicevox で再生できるのは100文字程度だった
それ以上なら
音声ファイルを再生しPDFを閲覧するように促す
このためには文字数のカウントをする

まずはPDFの文字列の長さを取得する

JavaScriptで作成されたページからPDFダウンロード

JavaScriptで作成されたページからPDFダウンロード

pip install selenium

でインストール

from selenium import webdriver

# Safari WebDriverのインスタンスを作成
driver = webdriver.Safari()

# Googleのホームページにアクセス
driver.get("https://www.google.com")

# ウェブページのタイトルをコンソールに表示
print(driver.title)

# ブラウザを閉じる
driver.quit()

で実行できたので
safariでseleniumで行ってみて
成功したらubuntu でchrome などで行うようにしてみる

とりあえずまずはリンクを表示できるかテスト

まず、SafariのWebDriverを有効にし、SeleniumでSafariを使用する準備を整える必要があることを思い出してください。これには、Safariの「開発」メニューで「リモートオートメーションを許可」を有効にする手順が含まれます。

これは
Safari > 設定 > 詳細タブをクリック
Webデベロッパ用の機能を表示をチェックし

デベロッパタブをクリックし
リモートオートメーションを許可にチェックを入れることでできる

from selenium import webdriver
from selenium.webdriver.common.by import By

# WebDriverの初期化(Safari)
driver = webdriver.Safari()

# ページにアクセス
driver.get("https://message.cocoo.education/message/ファイル番号")

# ページが完全にロードされるまで待機
driver.implicitly_wait(10)  # 秒

# 'プレビュー'リンクを探す
try:
    preview_link = driver.find_element(By.LINK_TEXT, "プレビュー")
    print("プレビューリンク:", preview_link.get_attribute('href'))
except:
    print("プレビューリンクが見つかりませんでした。")

# ブラウザを閉じる
driver.quit()

これでブラウザが立ち上がり
ログにPDFのリンクが表示される

つまりselenium ならURLの取得が可能

なのでPDF取得まで行うようにする
しかし

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Safari()
driver.get("https://message.cocoo.education/message/ファイル番号")

try:
    # 要素が見えるまで最大10秒間待機します。
    preview_button = WebDriverWait(driver, 10).until(
        EC.visibility_of_element_located((By.XPATH, '//button[contains(text(),"プレビュー")]'))
    )
    preview_button.click()
except Exception as e:
    print("エラーが発生しました:", e)
finally:
    driver.quit()

というようにページの動作を待つように変更したが変わらない

このためやはり機能を分割し
まずgmail からURLリンクを取得
次にこのURLからseleniumでプレビューボタンおURLを取得
取得したボタンURLからPDFを取得
としてみる

とりあえずPDFのリンクを取得するため
バックグランドでseleniumを実行し
PDFのリンクをテキストとして入れるメソッドに書き換える

from selenium import webdriver
from selenium.webdriver.common.by import By

# WebDriverの初期化(Safari)
driver = webdriver.Safari()

# ページにアクセス
driver.get("https://message.cocoo.education/message/ファイル番号")

# ページが完全にロードされるまで待機
driver.implicitly_wait(10)  # 秒

# 'プレビュー'リンクを探す
try:
    preview_link = driver.find_element(By.LINK_TEXT, "プレビュー")
    print("プレビューリンク:", preview_link.get_attribute('href'))
except:
    print("プレビューリンクが見つかりませんでした。")

# ブラウザを閉じる
driver.quit()

しかしsafariだとヘッドレスモードがない
まぁ試作品なので
とりあえずこれをモジュールにするだけにしておく

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def find_preview_link(url, link_text="プレビュー"):
    # WebDriverの初期化(Safari)
    driver = webdriver.Safari()
    
    try:
        # ページにアクセス
        driver.get(url)
        
        # ページが完全にロードされるまで待機
        driver.implicitly_wait(10)  # 秒
        
        # 'プレビュー'リンクを探す
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        
        # リンクのURLを返す
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        # ブラウザを閉じる
        driver.quit()

# 使用例
url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223"
link_url = find_preview_link(url)
if link_url:
    print("プレビューリンク:", link_url)
else:
    print("プレビューリンクが見つかりませんでした。")

これでモジュールにできる

次にこのPDFのリンクを使いPDFをダウンロードする

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def find_preview_link(url, link_text="プレビュー"):
    driver = webdriver.Safari()
    try:
        driver.get(url)
        driver.implicitly_wait(10)  # 秒
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        driver.quit()

# 使用例
url = "https://message.cocoo.education/message/ファイル番号"
pdf_url = find_preview_link(url)
if pdf_url:
    print("プレビューリンク:", pdf_url)

    # ダウンロードしたいPDFファイルのURL
    response = requests.get(pdf_url)

    # レスポンスのステータスコードが200(成功)の場合、ファイルを保存
    if response.status_code == 200:
        with open("downloaded_file.pdf", "wb") as f:
            f.write(response.content)
        print("ファイルが正常にダウンロードされました。")
    else:
        print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code)
else:
    print("プレビューリンクが見つかりませんでした。")

これでOK

次にこのメソッドの最初の取得対象URLをgmailから取得する

以前

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 最新の未読メール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 最新のメッセージ(最新の未読メール)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br>" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()

# ラベル ID を指定して最新の未読メール本文を取得
latest_unread_message_body = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)

でURLを削除した本文を表示するものを作成した

これをURLと本文をそれぞれの変数に格納して返すようなメソッドに書き換える

import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを検出してリストに格納する関数
def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

# 最新の未読メール本文とURLを取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content", []

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
print(urls)

これで本文とURLの分離取得ができた

次にこのメソッドで得られたURLからPDFをダウンロードするようにしたい

その前にモジュールかしておく

vim gmail_utils.py

中身を

# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# メール本文のデコード関数
def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

# URLを検出してリストに格納する関数
def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

# 最新の未読メール本文とURLを取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

# メールの受信日時を取得する関数
def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数
def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

として

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body

# Gmail API サービスを初期化
service = gmail_init()

# ラベルIDを指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "INBOX")
print(latest_unread_message_body)
for url in urls:
    print(url)

のINBOXをラベルIDに変える

これで実行すると未読メールが表示される

次にこれを使いPDFのダウンロード

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def download_pdf_from_gmail_links():
    service = gmail_init()  # Gmail API サービスを初期化
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    _, urls = gmail_get_latest_unread_message_body(service, "INBOX")

    if not urls:
        print("メールからURLを抽出できませんでした。")
        return

    for url in urls:
        pdf_url = find_preview_link(url)
        if pdf_url:
            print("プレビューリンク:", pdf_url)
            try:
                response = requests.get(pdf_url)
                if response.status_code == 200:
                    filename = "downloaded_file.pdf"
                    with open(filename, "wb") as f:
                        f.write(response.content)
                    print(f"ファイルが正常にダウンロードされました: {filename}")
                else:
                    print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")
            except Exception as e:
                print(f"ダウンロード中にエラーが発生しました: {e}")
        else:
            print("プレビューリンクが見つかりませんでした。")

download_pdf_from_gmail_links()

コードを大幅に変更

# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service
# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

としたらバグった

とりあえず

gmail_get_text_and_url.py

でのURLと本文の取得

get_pdf.py

でPDFの取得はできた

あとはこの2つを組み合わせるだけ

vim gmail_api_utils.py

でgmailのテキストとURLを取り出す機能をモジュールにして他で呼び出せるようにする

ファイルの内容は

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
print(urls)

より上の部分のみ抽出して保存

main.py

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
for url in urls:
    print(url)

というように

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

でインポートして

service = gmail_init()

で初期化

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
for url in urls:
    print(url)

で呼び出せば同じ効果が出る

同様にPDFダウンロードもモジュールにする

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def find_preview_link(url, link_text="プレビュー"):
    driver = webdriver.Safari()  # またはChrome(), Firefox() など、使用するブラウザに合わせて変更
    try:
        driver.get(url)
        driver.implicitly_wait(10)  # 秒
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        driver.quit()

def download_pdf(url, file_path="downloaded_file.pdf"):
    if url:
        print("プレビューリンク:", url)
        response = requests.get(url)
        if response.status_code == 200:
            with open(file_path, "wb") as f:
                f.write(response.content)
            print("ファイルが正常にダウンロードされました。")
        else:
            print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code)
    else:
        print("プレビューリンクが見つかりませんでした。")

これを

from pdf_downloader import find_preview_link, download_pdf

# 使用例
url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223"
pdf_url = find_preview_link(url)
download_pdf(pdf_url)

として保存し実行すればOK

あとは組み合わせる

しかし
プレビューリンクが
https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイル番号
というAmazonS3へのリンク
のはずがgmailから抽出したリンクになっているのでエラー

原因は

            # ここでは、プレビューリンクを直接取得する機能がないため、URLがPDFファイルを指していると仮定しています
            download_pdf(url, file_path="downloaded_file.pdf")

の部分
なのでURLが違う

これを修正する
なおGPTで出力した時にはINBOXになっているため
指定のラベルに変更することが必要になる

main3.py

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

とすることで
メール本文はテキストファイルへ
PDFはダウンロードすることが完了した

指定したURLのプレビューボタンをクリックしPDFをダウンロードするpythonコードの作成(失敗編)

指定したURLのプレビューボタンをクリックしPDFをダウンロードするpythonコードの作成

amazonS3からファイルの取得はできたけど
動的サイトからのリンク取得はこの方法ではできなかった

以下ログ

指定したURLにアクセスし
プレビューと書かれたボタンをクリック、表示されるPDFファイルをダウンロードするpythonコード

chatGPTで作成していく

pip install requests beautifulsoup4

で必要ライブラリのインストール

import requests
from bs4 import BeautifulSoup

# ステップ2: 指定されたURLにGETリクエストを送信し、HTMLを取得
url = "https://任意のURL"
response = requests.get(url)
html = response.content

# ステップ3: BeautifulSoupを使用してHTMLを解析し、プレビューボタンのリンクを見つける
soup = BeautifulSoup(html, 'html.parser')
preview_button = soup.find('a', text='プレビュー')  # ここではaタグでテキストが'プレビュー'の要素を探していますが、実際の構造に応じて変更する必要があります。

# ステップ4: プレビューボタンのリンクを取得し、PDFファイルをダウンロード
if preview_button:
    pdf_url = preview_button['href']  # href属性からPDFのURLを取得
    pdf_response = requests.get(pdf_url)
    with open('document.pdf', 'wb') as f:
        f.write(pdf_response.content)
    print("PDFファイルがダウンロードされました。")
else:
    print("プレビューボタンが見つかりませんでした。")

で実行したが

/Users/snowpool/aw10s/mail_auto/get_pdf.py:11: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  preview_button = soup.find('a', text='プレビュー')  # ここではaタグでテキストが'プレビュー'の要素を探していますが、実際の構造に応じて変更する必要があります。
プレビューボタンが見つかりませんでした。

となる

ChatGPTでこの結果を調べると

警告メッセージに基づき、text引数の代わりにstring引数を使用する必要があります。また、プレビューボタンが見つからなかった問題に関しては、ページのHTML構造を確認せずに正確な解決策を提供するのは難しいですが、一般的なアドバイスをいくつか提供できます。
* 		string引数の使用: findメソッドでtextの代わりにstringを使用してみてください。
* 		正しい要素を特定する: プレビューボタンを特定するために、ページのHTMLソースを確認し、ボタンの正確なタグ名、クラス名、またはIDを特定します。この情報を元に、findまたはfind_allメソッドを適切に使用してください。
* 		動的コンテンツの考慮: もしプレビューボタンがJavaScriptによって動的に生成されている場合、requestsとBeautifulSoupだけでは取得できないかもしれません。この場合は、seleniumのようなブラウザ自動化ツールを使用して実際のブラウザセッションを介してページを読み込み、要素を探す必要があります。

となる

以下は、string引数を使用した修正されたコードの例

import requests
from bs4 import BeautifulSoup

# ステップ2: 指定されたURLにGETリクエストを送信し、HTMLを取得
url = "https://任意のURL"
response = requests.get(url)
html = response.content

# ステップ3: BeautifulSoupを使用してHTMLを解析し、プレビューボタンのリンクを見つける
soup = BeautifulSoup(html, 'html.parser')
preview_button = soup.find('a', string='プレビュー')  # `text`の代わりに`string`を使用

# ステップ4: プレビューボタンのリンクを取得し、PDFファイルをダウンロード
if preview_button:
    pdf_url = preview_button['href']  # href属性からPDFのURLを取得
    pdf_response = requests.get(pdf_url)
    with open('document.pdf', 'wb') as f:
        f.write(pdf_response.content)
    print("PDFファイルがダウンロードされました。")
else:
    print("プレビューボタンが見つかりませんでした。")

でもだめ

seleniumで取得してみる

pip install selenium


seleniumをインストール

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
import time

# Chromeのダウンロード設定
chrome_options = Options()
prefs = {
    "download.default_directory": "/path/to/download/directory",  # ダウンロードフォルダーのパスを設定
    "download.prompt_for_download": False,  # ダウンロード前に確認を求めない
    "download.directory_upgrade": True,
    "plugins.always_open_pdf_externally": True  # PDFファイルを自動的にダウンロード
}
chrome_options.add_experimental_option("prefs", prefs)

# Chrome WebDriverのパス
webdriver_path = '/path/to/chromedriver'  # ChromeDriverのパス

# WebDriverサービスを設定
service = Service(webdriver_path)

# WebDriverの初期化
driver = webdriver.Chrome(service=service, options=chrome_options)

# Webページにアクセス
driver.get("https://任意のURL")

# プレビューボタンを探してクリック(要素のXPathを使用)
# 実際のボタンのXPathに応じて調整してください
try:
    preview_button = driver.find_element(By.XPATH, '//button[contains(text(),"プレビュー")]')
    preview_button.click()
    time.sleep(5)  # ダウンロード完了を待つ
except Exception as e:
    print("エラーが発生しました:", e)

# ブラウザを閉じる
driver.quit()

でもだめ

ならば chrome developer tools で一度サイトの構成を調べる

<a href="https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイルの保存先" target="_blank">プレビュー</a>

となっていたが
これの構成で seleniumでダウンロードしようとしてもエラー

なので

import requests

# PDFファイルのURL
pdf_url = "https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイルの保存先"

# リクエストを送信してPDFファイルを取得
response = requests.get(pdf_url)

# レスポンスのステータスコードが200(成功)の場合、ファイルを保存
if response.status_code == 200:
    with open("downloaded_file.pdf", "wb") as file:
        file.write(response.content)
    print("PDFファイルが正常にダウンロードされました。")
else:
    print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")

として直接リンクのURLからファイルを取得

これは成功

なので、
GMAILからリンクURLを抜き出しサイトへアクセスする機能
次にそのページからリンクボタンのURLを抜き出す機能
最後にPDFをダウンロードする機能
と分けて作成する

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# 'プレビュー'リンクを探す
preview_link = soup.find('a', text='プレビュー')
if preview_link:
    href_value = preview_link['href']
    print("プレビューのリンク:", href_value)
else:
    print("プレビューリンクが見つかりませんでした。")

しかし

/Users/snowpool/aw10s/mail_auto/get_pdf_url.py:15: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  preview_link = soup.find('a', text='プレビュー')
プレビューリンクが見つかりませんでした。

となる

このため
BeautifulSoupの最新バージョンでは、findメソッド(および関連メソッド)でtext引数の代わりにstring引数を使用するよう推奨
とのことのため

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# 'プレビュー'リンクを探す('string'引数を使用)
preview_link = soup.find('a', string='プレビュー')
if preview_link:
    href_value = preview_link['href']
    print("プレビューのリンク:", href_value)
else:
    # 'プレビュー'を含むテキストを持つリンクを探す場合の代替手段
    preview_links = soup.find_all('a')
    for link in preview_links:
        if 'プレビュー' in link.text:
            print("プレビューのリンク:", link['href'])
            break
    else:
        print("プレビューリンクが見つかりませんでした。")

へ変更したが

プレビューリンクが見つかりませんでした。
となる

webページの構造が予想と異なるか、リンクが動的に生成されている可能性があり
のようなケースでは、ブラウザの自動操作が可能なSeleniumのようなツールを使用する必要がある

念の為Javascriptで書かれているかチェック

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# ページ内のすべてのaタグを探索
links = soup.find_all('a')
found = False
for link in links:
    if link.text and 'プレビュー' in link.text:
        print("プレビューのリンク:", link['href'])
        found = True
        break

if not found:
    print("プレビューリンクが見つかりませんでした。")

の結果

プレビューリンクが見つかりませんでした

となったため
Seleniumを使うことにする

seleniumを使うにはwebdriverが必要

Pdf の中身を取り出す

Gmail でPDFをダウンロードし、PDFを読み上げしたいので
まずPDFの中身を読み出す

まずはPDF単体の取り扱いから

pip install pdfminer.six requests

でインストール

vim text_from_pdf.py

from pdfminer.high_level import extract_text

def extract_text_from_pdf(pdf_file):
    return extract_text(pdf_file)

pdf_text = extract_text_from_pdf("your_file.pdf")
print(pdf_text)  # PDFの内容を表示

ファイル名を

tunagu.pdf

に変えてみる

実行結果は
読み上げできる量を超える可能性が高いため

念の為文字数をカウントする

vim pdf_text.txt

でこの実行結果の文章を保存する

次に

vim count_characters_in_text_file

でファイルの文字数をカウントするようにする

def count_characters_in_text_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()
        return len(content)

file_path = 'path/to/your/textfile.txt'
char_count = count_characters_in_text_file(file_path)
print(f"The file has {char_count} characters.")

のファイルを

pdf_text.txt

に変更して実行

The file has 995 characters.

となるため
Voicevox の扱える量を超える
このため対策が必要

あとファイルの取得に関しては
そのまま
メール内の
ファイルリンク
へアクセスしてもできたので
これを直接ダウンロードできるか試す