Google Cloud Vision API の登録

Google Cloud Vision API の登録

https://cloud.google.com/vision/?hl=ja&_gl=1*1yrkeuh*_ga*NzQ0NTYyMjM5LjE2ODQxODU5MzA.*_ga_WH2QY8WWF5*MTcyMTE1MzQ3OS4yLjEuMTcyMTE1MzYyNS42LjAuMA..
でチュートリアル

料金については
https://cloud.google.com/vision/pricing?hl=ja#prices
を参考に
基本的に無料枠でできそう

プロジェクトを選択するのだが
Gmailと同じプロジェクトを選択する

Credentials.json
の中身を見ればOK

とりあえずデフォルトがこれなので
そのまま進める

APIとサービスを有効にする
をクリックし
cloud vision api
で検索

これでAPIを有効にする

多分認証のJSONファイルなので
権限などを増やしても変わらないはず

もしダメなら再度作成するだけ

とりあえずこれで進めてみる

Google colabでも実験できるのだが
Google drive へアップするときに
アップロード先のディレクトリがpublicに共有されていないことを確認すること
不安なら都度アップロードすればOK

とりあえずローカルで動かすのだが
Google Cloud SDK
が必要と chatgpt では言っている

本当か調べてみる

https://nikkie-ftnext.hatenablog.com/entry/ocr-with-google-vision-api-python-first-step
2024-01-03
GoogleのVision APIをPythonから呼び出して、画像内のテキストを検出する
に載っていた

サンプルコードを動かす前に準備が必要です。
「Google Cloud プロジェクトと認証のセットアップ」
* 大前提:Google Cloudのアカウント
* Google Cloudのプロジェクト1つ
* すでにあるものを選ぶ
* または、新規作成
* 余談:削除など管理したい場合は https://console.cloud.google.com/cloud-resource-manager から
* プロジェクトで課金が有効になっているか確認する
* 案内された別ドキュメントに従いました
* 直リンクは https://console.cloud.google.com/billing/linkedaccount?hl=ja&project=<選んだプロジェクトID> っぽいです
* 課金を有効にしますが、今回のサンプルコードの範囲内では無料で使える認識です
* 最初の 1,000 ユニット/月は無料

まではOK

次に
gcloudコマンドが使えるようにする設定が必要

https://cloud.google.com/docs/authentication/provide-credentials-adc?hl=ja#local-dev
の中の
gcloud CLI をインストールして初期化します。
をクリック

gcloud CLI をインストールする
https://cloud.google.com/sdk/docs/install?hl=ja
の中で
各種OSごとの
Google Cloud CLI 
のパッケージがダウンロードできるので
ここからダウンロード

私の場合 Macだけど
M1MacBookAir なので
macOS 64 ビット
(ARM64, Apple M1 silicon)
の方を選ぶ

google-cloud-cli-477.0.0-darwin-arm.tar.gz

をダウンロードしたら
展開して
ホームディレクトリに移動させる

Macの場合
圧縮ファイルをダブルクリックで展開できるので

 mv ~/Downloads/google-cloud-sdk .

でホームディレクトリ直下に移動

./google-cloud-sdk/install.sh

を実行したらエラーになるので

Reason: tried: '/opt/homebrew/opt/openssl@1.1/lib/libssl.1.1.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/openssl@1.1/lib/libssl.1.1.dylib' (no such file), '/opt/homebrew/opt/openssl@1.1/lib/libssl.1.1.dylib' (no such file), '/usr/local/lib/libssl.1.1.dylib' (no such file), '/usr/lib/libssl.1.1.dylib' (no such file, not in dyld cache)

で検索

Library not loaded: libssl.1.1.dylib (LoadError) 発生時の解決方法
によれば
OpenSSL@1.1のインストール
でOK

 brew install openssl@1.1

でインストールできるらしい

既にサポート終了しているはずと思い検索
「OpenSSL」にアップデート – 「同1.1.1」はサポート終了
https://www.security-next.com/149614

最初にchatgptでエラーの解決を調べた時にこの答えが出たけど
以前 YouTube live を再生するコードの時に
昔のライブラリ情報のインストールを提示され
ドツボにハマったことがあったので
念の為検索した

もうちょっと調べたら

PythonがOpenSSL周りのエラーで動かない

2024年05月24日
でこっちも同じ感じ

ただ

brew install openssl@1.1

を実行しても

brew install openssl@1.1

==> Auto-updating Homebrew...
Adjust how often this is run with HOMEBREW_AUTO_UPDATE_SECS or disable with
HOMEBREW_NO_AUTO_UPDATE. Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).
==> Auto-updated Homebrew!
Updated 2 taps (homebrew/core and homebrew/cask).
==> New Formulae
libgit2@1.7                              tdb
==> New Casks
avbeam                     jagex                      twingate
duplicateaudiofinder       sq-mixpad

You have 23 outdated formulae installed.

Error: Cannot install under Rosetta 2 in ARM default prefix (/opt/homebrew)!
To rerun under ARM use:
    arch -arm64 brew install ...
To install under x86_64, install Homebrew into /usr/local.

となるので

arch -arm64 brew install openssl@1.1

を実行

Warning: openssl@1.1 has been deprecated because it is not supported upstream!
==> Downloading https://ghcr.io/v2/homebrew/core/openssl/1.1/manifests/1.1.1w
######################################################################### 100.0%
==> Fetching openssl@1.1
==> Downloading https://ghcr.io/v2/homebrew/core/openssl/1.1/blobs/sha256:38619f
######################################################################### 100.0%
==> Pouring openssl@1.1--1.1.1w.arm64_sonoma.bottle.tar.gz
==> Caveats
A CA file has been bootstrapped using certificates from the system
keychain. To add additional certificates, place .pem files in
  /opt/homebrew/etc/openssl@1.1/certs

and run
  /opt/homebrew/opt/openssl@1.1/bin/c_rehash

openssl@1.1 is keg-only, which means it was not symlinked into /opt/homebrew,
because this is an alternate version of another formula.

If you need to have openssl@1.1 first in your PATH, run:
  echo 'export PATH="/opt/homebrew/opt/openssl@1.1/bin:$PATH"' >> ~/.zshrc

For compilers to find openssl@1.1 you may need to set:
  export LDFLAGS="-L/opt/homebrew/opt/openssl@1.1/lib"
  export CPPFLAGS="-I/opt/homebrew/opt/openssl@1.1/include"

For pkg-config to find openssl@1.1 you may need to set:
  export PKG_CONFIG_PATH="/opt/homebrew/opt/openssl@1.1/lib/pkgconfig"
==> Summary
🍺  /opt/homebrew/Cellar/openssl@1.1/1.1.1w: 8,102 files, 18MB
==> Running `brew cleanup openssl@1.1`...
Disable this behaviour by setting HOMEBREW_NO_INSTALL_CLEANUP.
Hide these hints with HOMEBREW_NO_ENV_HINTS (see `man brew`).

最後のメッセージが気になるので翻訳

openssl@1.1 は Keg 専用です。つまり、/opt/homebrew にシンボリックリンクされていません。
これは別の公式の代替バージョンであるためです。

PATH の最初に openssl@1.1 を含める必要がある場合は、次を実行します。
echo ‘export PATH=”/opt/homebrew/opt/openssl@1.1/bin:$PATH”‘ >> ~/.zshrc

コンパイラが openssl@1.1 を見つけるには、次の設定が必要になる場合があります。
エクスポート LDFLAGS=”-L/opt/homebrew/opt/openssl@1.1/lib”
エクスポート CPPFLAGS=”-I/opt/homebrew/opt/openssl@1.1/include”

pkg-config が openssl@1.1 を見つけるには、次の設定が必要になる場合があります。
import PKG_CONFIG_PATH=”/opt/homebrew/opt/openssl@1.1/lib/pkgconfig”
==> まとめ
🍺 /opt/homebrew/Cellar/openssl@1.1/1.1.1w: 8,102 ファイル、18MB
==> `brew cleanup openssl@1.1` を実行しています…
HOMEBREW_NO_INSTALL_CLEANUP を設定して、この動作を無効にします。
これらのヒントは HOMEBREW_NO_ENV_HINTS で非表示にします (「man brew」を参照)。

このため

export LDFLAGS="-L/opt/homebrew/opt/openssl@1.1/lib"
export CPPFLAGS="-I/opt/homebrew/opt/openssl@1.1/include"
export PKG_CONFIG_PATH="/opt/homebrew/opt/openssl@1.1/lib/pkgconfig"

の後に

 ./google-cloud-sdk/install.sh

を実行すると

Welcome to the Google Cloud CLI!

To help improve the quality of this product, we collect anonymized usage data
and anonymized stacktraces when crashes are encountered; additional information
is available at <https://cloud.google.com/sdk/usage-statistics>. This data is
handled in accordance with our privacy policy
<https://cloud.google.com/terms/cloud-privacy-notice>. You may choose to opt in this
collection now (by choosing 'Y' at the below prompt), or at any time in the
future by running the following command:

    gcloud config set disable_usage_reporting false

Do you want to help improve the Google Cloud CLI (y/N)?  

となる
メッセージを検索し

gcloud CLI をインストールする
を参考に
これは
N
でよいらしい

ちなみに日本語訳は
Google Cloud CLI へようこそ。

この製品の品質向上に役立てるため、当社は匿名化された使用状況データを収集します。
クラッシュが発生した場合は匿名化されたスタックトレース。追加情報
で入手できます。このデータは
弊社のプライバシーポリシーに従って取り扱われます
。これを選択することもできます
今すぐ (以下のプロンプトで「Y」を選択)、またはいつでも収集できます。
次のコマンドを実行して、将来のことを確認します。

gcloud config set disable_usage_reporting false

Google Cloud CLI の改善に協力したいと思いますか (y/N)?

どうやらデータ収集に同意するか
ということらしい

次に

Your current Google Cloud CLI version is: 477.0.0
The latest available version is: 484.0.0

┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                     Components                                                     │
├──────────────────┬──────────────────────────────────────────────────────┬──────────────────────────────┬───────────┤
│      Status      │                         Name                         │              ID              │    Size   │
├──────────────────┼──────────────────────────────────────────────────────┼──────────────────────────────┼───────────┤
│ Update Available │ BigQuery Command Line Tool                           │ bq                           │   1.7 MiB │
│ Update Available │ Cloud Storage Command Line Tool                      │ gsutil                       │  11.3 MiB │
│ Update Available │ Google Cloud CLI Core Libraries                      │ core                         │  19.1 MiB │
│ Not Installed    │ App Engine Go Extensions                             │ app-engine-go                │   4.5 MiB │
│ Not Installed    │ Appctl                                               │ appctl                       │  18.5 MiB │
│ Not Installed    │ Artifact Registry Go Module Package Helper           │ package-go-module            │   < 1 MiB │
│ Not Installed    │ Cloud Bigtable Command Line Tool                     │ cbt                          │  17.0 MiB │
│ Not Installed    │ Cloud Bigtable Emulator                              │ bigtable                     │   7.0 MiB │
│ Not Installed    │ Cloud Datastore Emulator                             │ cloud-datastore-emulator     │  36.2 MiB │
│ Not Installed    │ Cloud Firestore Emulator                             │ cloud-firestore-emulator     │  45.2 MiB │
│ Not Installed    │ Cloud Pub/Sub Emulator                               │ pubsub-emulator              │  63.7 MiB │
│ Not Installed    │ Cloud Run Proxy                                      │ cloud-run-proxy              │  11.3 MiB │
│ Not Installed    │ Cloud SQL Proxy v2                                   │ cloud-sql-proxy              │  13.2 MiB │
│ Not Installed    │ Google Container Registry's Docker credential helper │ docker-credential-gcr        │           │
│ Not Installed    │ Kustomize                                            │ kustomize                    │   7.4 MiB │
│ Not Installed    │ Log Streaming                                        │ log-streaming                │  11.9 MiB │
│ Not Installed    │ Minikube                                             │ minikube                     │  34.8 MiB │
│ Not Installed    │ Nomos CLI                                            │ nomos                        │  30.1 MiB │
│ Not Installed    │ On-Demand Scanning API extraction helper             │ local-extract                │  13.7 MiB │
│ Not Installed    │ Skaffold                                             │ skaffold                     │  22.8 MiB │
│ Not Installed    │ Terraform Tools                                      │ terraform-tools              │  63.6 MiB │
│ Not Installed    │ anthos-auth                                          │ anthos-auth                  │  20.9 MiB │
│ Not Installed    │ config-connector                                     │ config-connector             │  88.7 MiB │
│ Not Installed    │ enterprise-certificate-proxy                         │ enterprise-certificate-proxy │   8.3 MiB │
│ Not Installed    │ gcloud Alpha Commands                                │ alpha                        │   < 1 MiB │
│ Not Installed    │ gcloud Beta Commands                                 │ beta                         │   < 1 MiB │
│ Not Installed    │ gcloud app Java Extensions                           │ app-engine-java              │ 127.8 MiB │
│ Not Installed    │ gcloud app Python Extensions                         │ app-engine-python            │   5.0 MiB │
│ Not Installed    │ gcloud app Python Extensions (Extra Libraries)       │ app-engine-python-extras     │   < 1 MiB │
│ Not Installed    │ gke-gcloud-auth-plugin                               │ gke-gcloud-auth-plugin       │   4.0 MiB │
│ Not Installed    │ istioctl                                             │ istioctl                     │  24.8 MiB │
│ Not Installed    │ kpt                                                  │ kpt                          │  13.8 MiB │
│ Not Installed    │ kubectl                                              │ kubectl                      │   < 1 MiB │
│ Not Installed    │ kubectl-oidc                                         │ kubectl-oidc                 │  20.9 MiB │
│ Not Installed    │ pkg                                                  │ pkg                          │           │
│ Installed        │ Google Cloud CRC32C Hash Tool                        │ gcloud-crc32c                │   1.2 MiB │
└──────────────────┴──────────────────────────────────────────────────────┴──────────────────────────────┴───────────┘
To install or remove components at your current SDK version [477.0.0], run:
  $ gcloud components install COMPONENT_ID
  $ gcloud components remove COMPONENT_ID

To update your SDK installation to the latest version [484.0.0], run:
  $ gcloud components update


Modify profile to update your $PATH and enable shell command completion?

Do you want to continue (Y/n)?  

これを訳そうとしたけど
Google 翻訳は3900文字以上だとできなかった

To install or remove components at your current SDK version [477.0.0], run:
  $ gcloud components install COMPONENT_ID
  $ gcloud components remove COMPONENT_ID

To update your SDK installation to the latest version [484.0.0], run:
  $ gcloud components update


Modify profile to update your $PATH and enable shell command completion?

Do you want to continue (Y/n)?  

だけにして翻訳

現在の SDK バージョン [477.0.0] でコンポーネントをインストールまたは削除するには、次を実行します。
$ gcloud コンポーネントのインストール COMPONENT_ID
$ gcloud コンポーネントは COMPONENT_ID を削除します

SDK インストールを最新バージョン [484.0.0] に更新するには、次を実行します。
$ gcloud コンポーネントの更新

プロファイルを変更して $PATH を更新し、シェル コマンド補完を有効にしますか?

続行しますか (Y/n)?

これは
Y
でよいらしい

The Google Cloud SDK installer will now prompt you to update an rc file to bring
 the Google Cloud CLIs into your environment.

Enter a path to an rc file to update, or leave blank to use 
[/Users/snowpool/.zshrc]:  

の翻訳は

Google Cloud SDK インストーラは、rc ファイルを更新して、
Google Cloud CLI を環境に導入します。

更新する rc ファイルへのパスを入力するか、空白のままにして使用します
[/ユーザー/snowpool/.zshrc]:

これは
Enterキーにした

Backing up [/Users/snowpool/.zshrc] to [/Users/snowpool/.zshrc.backup].
[/Users/snowpool/.zshrc] has been updated.

==> Start a new shell for the changes to take effect.


Google Cloud CLI works best with Python 3.11 and certain modules.

Download and run Python 3.11 installer? (Y/n)?  

の翻訳は

[/Users/snowpool/.zshrc] を [/Users/snowpool/.zshrc.backup] にバックアップしています。
[/Users/snowpool/.zshrc]が更新されました。

==> 変更を有効にするために新しいシェルを開始します。

Google Cloud CLI は、Python 3.11 および特定のモジュールで最適に動作します。

Python 3.11 インストーラーをダウンロードして実行しますか? (はい/いいえ)?
なので
Y
にする

Running Python 3.11 installer, you may be prompted for sudo password...
Password:

となるので
Macのパスワードを入れる

あとは処理が自動で行われ

installer: Package name is Python
installer: Installing at base path /

installer: The install was successful.
Setting up virtual environment
Creating virtualenv...
Installing modules...
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 89.7/89.7 kB 2.4 MB/s eta 0:00:00
  Installing build dependencies ... done
  Getting requirements to build wheel ... done
  Preparing metadata (pyproject.toml) ... done
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 59.0/59.0 kB 5.0 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 10.4/10.4 MB 7.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 163.0/163.0 kB 3.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.9/2.9 MB 7.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 182.4/182.4 kB 8.7 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 117.6/117.6 kB 6.5 MB/s eta 0:00:00
  Building wheel for crcmod (pyproject.toml) ... done


Updates are available for some Google Cloud CLI components.  To install them,
please run:
  $ gcloud components update

Virtual env enabled.

For more information on how to get started, please visit:
  https://cloud.google.com/sdk/docs/quickstarts

となる

source ~/.zshrc

を実行すれば

gcloud -v

が実行できるようになる

結果は

Google Cloud SDK 477.0.0
bq 2.1.4
core 2024.05.17
gcloud-crc32c 1.0.0
gsutil 5.29
Updates are available for some Google Cloud CLI components.  To install them,
please run:
  $ gcloud components update

次はgcloudの初期化をする

LINE通知の時の画像サイズ変更

LINE通知の時の画像サイズ変更

3M以上の画像サイズになると

File: runs/detect/predict3/image0.jpg
400
{"status":400,"message":"Image size must be less than 3MB."}

となるため、画像ファイルの大きさを変更するプログラムが必要

なお画像については

results = model(image, save=True, conf=0.2, iou=0.5)

というように
save=True をつければ検出結果が出力されるので
その画像を使う

画像サイズが3MB以下になるように画像を1/4にリサイズして送信するようにコード変更

import requests
import os
from utils import load_config, get_latest_directory, get_image_files

# 設定ファイルを読み込む
config = load_config('config.json')

# 設定ファイルからトークンとディレクトリパスを取得
token = config['token']
base_path = config['image_file_path']

# 最新のpredictディレクトリを取得
latest_dir = get_latest_directory(base_path)
image_files = get_image_files(latest_dir)

url = 'https://notify-api.line.me/api/notify'
message = 'ファイルパス自動取得テスト'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}

# 最新のpredictディレクトリ内の全ての画像ファイルに対してLINE Notify APIにリクエストを送信
for image_file_path in image_files:
    files = {'imageFile': open(image_file_path, 'rb')}
    
    # LINE Notify APIにリクエストを送信
    res = requests.post(url, headers=headers, params=params, files=files)

    # レスポンスを出力
    print(f"File: {image_file_path}")
    print(res.status_code)
    print(res.text)

import requests
import os
from PIL import Image
from io import BytesIO
from utils import load_config, get_latest_directory, get_image_files

# 設定ファイルを読み込む
config = load_config('config.json')

# 設定ファイルからトークンとディレクトリパスを取得
token = config['token']
base_path = config['image_file_path']

# 最新のpredictディレクトリを取得
latest_dir = get_latest_directory(base_path)
image_files = get_image_files(latest_dir)

url = 'https://notify-api.line.me/api/notify'
message = 'ファイルパス自動取得テスト'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}

# 最新のpredictディレクトリ内の全ての画像ファイルに対してLINE Notify APIにリクエストを送信
for image_file_path in image_files:
    with open(image_file_path, 'rb') as img_file:
        img_data = img_file.read()
        
        # 画像ファイルのサイズをチェック
        if len(img_data) > 3 * 1024 * 1024:  # 3MB
            # 画像をリサイズ
            image = Image.open(BytesIO(img_data))
            new_size = (image.width // 2, image.height // 2)
            image = image.resize(new_size, Image.ANTIALIAS)
            
            # リサイズした画像をバイトデータに変換
            output = BytesIO()
            image.save(output, format=image.format)
            img_data = output.getvalue()
        
        # ファイルデータをバイトデータとして用意
        files = {'imageFile': BytesIO(img_data)}
        files['imageFile'].name = os.path.basename(image_file_path)
        
        # LINE Notify APIにリクエストを送信
        res = requests.post(url, headers=headers, params=params, files=files)

        # レスポンスを出力
        print(f"File: {image_file_path}")
        print(res.status_code)
        print(res.text)

としたが

/Users/snowpool/aw10s/inventory/line_order.py:34: DeprecationWarning: ANTIALIAS is deprecated and will be removed in Pillow 10 (2023-07-01). Use LANCZOS or Resampling.LANCZOS instead.
  image = image.resize(new_size, Image.ANTIALIAS)
Traceback (most recent call last):
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/PIL/Image.py", line 2408, in save
    format = EXTENSION[ext]
KeyError: ''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/inventory/line_order.py", line 38, in <module>
    image.save(output, format=image.format)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/PIL/Image.py", line 2411, in save
    raise ValueError(msg) from e
ValueError: unknown file extension: 

となる

これは
ANTIALIASの非推奨:ANTIALIASがPillow 10で非推奨となり、代わりにLANCZOSを使用する必要がある

画像の形式が不明:image.saveメソッドに指定された形式が正しく解釈されていないため、KeyErrorが発生しています。これは、image.formatが正しく設定されていないか、空であるため
の2つが原因

import requests
import os
from PIL import Image
from io import BytesIO
from utils import load_config, get_latest_directory, get_image_files

# 設定ファイルを読み込む
config = load_config('config.json')

# 設定ファイルからトークンとディレクトリパスを取得
token = config['token']
base_path = config['image_file_path']

# 最新のpredictディレクトリを取得
latest_dir = get_latest_directory(base_path)
image_files = get_image_files(latest_dir)

url = 'https://notify-api.line.me/api/notify'
message = 'ファイルパス自動取得テスト'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}

# 最新のpredictディレクトリ内の全ての画像ファイルに対してLINE Notify APIにリクエストを送信
for image_file_path in image_files:
    with open(image_file_path, 'rb') as img_file:
        img_data = img_file.read()
        
        # 画像ファイルのサイズをチェック
        if len(img_data) > 3 * 1024 * 1024:  # 3MB
            # 画像をリサイズ
            image = Image.open(BytesIO(img_data))
            new_size = (image.width // 2, image.height // 2)
            image = image.resize(new_size, Image.LANCZOS)
            
            # リサイズした画像をバイトデータに変換
            output = BytesIO()
            image_format = image.format if image.format else 'JPEG'  # デフォルトでJPEG形式を設定
            image.save(output, format=image_format)
            img_data = output.getvalue()
        
        # ファイルデータをバイトデータとして用意
        files = {'imageFile': BytesIO(img_data)}
        files['imageFile'].name = os.path.basename(image_file_path)
        
        # LINE Notify APIにリクエストを送信
        res = requests.post(url, headers=headers, params=params, files=files)

        # レスポンスを出力
        print(f"File: {image_file_path}")
        print(res.status_code)
        print(res.text)

とすることで

File: runs/detect/predict3/image0.jpg
200
{"status":200,"message":"ok"}

となり、元の画像サイズが大きくても問題なく送信できるようになった

LINE Notify を通知で使う

LINEで通知できるようにする

在庫管理をできるようにしたら
足りないものを知らせる機能が必要

LINEで買い物リストとして昼ぐらいに送信すれば
帰りに購入して帰ることができる

過去記事を参考に
LINE Messasging API でメッセージ送信

を参考にリンクをしたら

LINE Business ID
になってしまうので
再度調べることにする

LINE: LINE Notifyを用いてWindowsのcurlコマンドからメッセージを投稿する
を参考に再度設定をしていく

スマホのLINEアプリで
トーク > トークルームの作成 > グループ

友達は誰も選択せずに次へ

次に友達をグループに追加があるけど
デフォルトで友達をグループに自動で追加がチェックされているので
チェックを外す

グループ名は買い物リスト
とした

次にLINE Notifyにログインする
https://notify-bot.line.me/ja/

ログインの時にQRコードからログインできるので
スマホのLINEアプリでQRコードを読み取る
もしくはスマホのカメラアプリでQRコードを読み取ると
LINEアプリでQRコードを読み取るように出るので
そのまま実行していくと認証画面になり
PCの画面に表示された数字をスマホで打ち込めば
ログインできる

ログインできたらトークンを発行する

マイページ > アクセストークンの発行

トークンを発行する
をクリックし
トークン名を入力し
通知を送信するトークルームを選択

今回は両方とも
買い物リストを選択

これでトークンが発行される

次にスマホで
買い物リストの
トークルームを開き
設定 > 招待 で
LINE notifyを選択し招待する

ここへはcurlコマンドでメッセージを送信できる

https://notify-bot.line.me/doc/ja/
のサンプルは

 curl -X POST -H 'Authorization: Bearer <access_token>' -F 'message=foobar' \
https://notify-api.line.me/api/notify
{"status":200,"message":"ok"}

 curl -v -X POST -H 'Authorization: Bearer invalidtoken' -F 'message=foobar' \
https://notify-api.line.me/api/notify
{"status":401,"message":"Invalid access token"}

https://qiita.com/frozencatpisces/items/679d66ab1d617b7a40cb#1-投稿先トークルームの作成
では

curl -X POST -H "Authorization: Bearer 発行したトークン" -F "message=foobar" https://notify-api.line.me/api/notify

これをリストにする場合は複数行必要なので
LINE Notify で curl で改行する
https://blog.framinal.life/entry/2023/06/14/151933
を参考に

URLエンコーディングされた改行文字(%0A) に変換して送ることでできそう

message="こんにちは\n元気ですか?\n\n"

# Convert newline characters to URL-encoded form
message_encoded=$(echo -e $message | awk '{printf "%s%%0A", $0}')

curl -X POST -H "Authorization: Bearer XXXXXX" --data-binary "message=$message_encoded" https://notify-api.line.me/api/notify

ちょっと古い情報で3年前のだと

LINE NotifyからのLINE通知を改行する方法【Python/LINE Notify(API)】
だと
Lineで通知する文章を改行したい場合、「\n」を入れると改行できるらしい

send_contents = f'\n今日は\n{strftime}({day_of_the_week[weekday]})です。'

また

send_contents = '\n今日は\n' + str(strftime) + '(' + str(day_of_the_week[weekday]) + ')です。'

というようにしてもOK

“Line Notify”を利用してPythonでLineに通知を送る
では
requesta
を使っている

pip install requests

テキストだけなら

import requests

url = 'https://notify-api.line.me/api/notify'

token = '発行したトークン'
message = '通知したいメッセージ'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}

requests.post(url, headers=headers, params=params)

テキストと画像付きなら

import requests

url = 'https://notify-api.line.me/api/notify'

token = '発行したトークン'
message = '通知したいテキスト'
image_file_path = 'イメージファイルパス'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}
files = {'imageFile': open(image_file_path, 'rb')}

res = requests.post(url, headers=headers, params=params, files=files)

テキストの改行なら

message = 'ここで改行\n改行後のテキスト'

PythonでLINE Notifyを使ってみよう
によれば
・メッセージを改行したい場合は「\n」を挿入
・メッセージは最大1000文字まで

import requests
def notify(message):

        url = 'https://notify-api.line.me/api/notify'
        token = '発行されたトークン'
        headers = {'Authorization': 'Bearer ' + token}

        message = message
        params = {'message': message}

        requests.post(url, headers=headers, params=params)

if __name__ == '__main__':
    notify('テスト')

がサンプル

こちらもrequestを使用

[Python]LINEで天気を自動通知させてみた[初心者]
によれば
Webから天気情報を取得してLINEで自動通知をしてみました。定期実行までやります
では
Cronで定期実行

実行するコードは

import datetime
import urllib.request as req
import requests
from bs4 import BeautifulSoup
import re

#LINE notifyの設定を行う
url = "https://notify-api.line.me/api/notify"
access_token = '○○○○○'
headers = {'Authorization': 'Bearer ' + access_token}

#天気サイトから欲しい情報を取得する
url2 = "https://tenki.jp/forecast/3/17/4610/14100/"   #欲しい情報があるURLを指定
res = requests.get(url2)                              #上記URL情報を取得する
soup = BeautifulSoup(res.content, 'html.parser')      #取得した情報をhtmlで解析する

# 以下各種情報を取得
ddd = soup.find(class_="left-style")                  

telop = soup.find("p", class_="weather-telop").string

highlists = soup.find("dd",class_="high-temp temp")

lowlists = soup.find("dd",class_="low-temp temp")

ttt = soup.find(class_="rain-probability")

row=[]
for t in ttt:
    row.append(t)

# message変数に通知したい文を代入する 改行したい場合は "\n" とダブルクォテーションで囲う
message="\n" + ddd.text + "\n" + telop + "\n" + "最高 " + highlists.text + "\n" + "最低 " + lowlists.text + "\n"+ "---------" + "\n" +row[1].text +"\n" + "~6  : " + row[3].text + "\n" + "~12 : " + row[5].text +"\n" + "~18 : " + row[7].text +"\n" + "~24 : " + row[9].text +"\n" +"今日も元気に٩( 'ω' )و "

payload = {'message': message}
r = requests.post(url, headers=headers, params=payload,)

というように requestを使っている

あと
1人のユーザーにつき、1時間に通知できる回数は1000回まで
という縛りがあるが
買い物リストは1日1回程度だと思うし
カメラ画像からとして考えても3箇所程度だと思うので問題なし

とりあえずchatgptで調べた結果
Curl でも requestでも問題はなさそう

コードメンテを考えるとrequestsの方が良さそう

ということで

vim line_order.py

でファイルを作成し

vim config.json

で設定ファイルを作成

{
  "token": "発行したトークン",
  "image_file_path": "イメージファイルパス"
}
import requests
import json

# 設定ファイルを読み込む関数
def load_config(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

# 設定ファイルを読み込む
config = load_config('config.json')

# 設定ファイルからトークンとファイルパスを取得
token = config['token']
image_file_path = config['image_file_path']

url = 'https://notify-api.line.me/api/notify'
message = '通知したいテキスト'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}
files = {'imageFile': open(image_file_path, 'rb')}

# LINE Notify APIにリクエストを送信
res = requests.post(url, headers=headers, params=params, files=files)

# レスポンスを出力
print(res.status_code)
print(res.text)


line_order.py
の内容を書き換えても

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/inventory/line_order.py", line 21, in <module>
    files = {'imageFile': open(image_file_path, 'rb')}
IsADirectoryError: [Errno 21] Is a directory: 'image/'

となる

とりあえず画像を指定する

{
  "token": "発行したトークン",
  "image_file_path": "runs/detect/predict7/Baskulin1.jpg"
}

とすれば成功

改良点としては
メッセージの文章を
検出結果のラベルを変換した文字列にすること
検出結果は
runs/detect/
の中にどんどん新しい番号が付けられて増えていくため
動的にパスを取得するスクリプトにすること

osモジュールを使用して、指定されたディレクトリ内のサブディレクトリをリストアップし、その中で最新の番号を持つディレクトリを特定できる

1. os.listdir(base_path)を使用して、指定されたディレクトリ内の全てのファイルとディレクトリのリストを取得します。
2. リスト内の要素がディレクトリであるかどうかを確認するためにos.path.isdir()を使用します。
3. predictプレフィックスを削除して数値に変換し、max()関数を使用して最大の数値を持つディレクトリを特定します。
4. os.path.join(base_path, latest_dir)を使用して、フルパスを生成します。

import os

def get_latest_directory(base_path):
    # 指定されたディレクトリ内の全てのサブディレクトリを取得
    subdirs = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))]
    
    # サブディレクトリ名を数値に変換し、ソートして最新のディレクトリを特定
    latest_dir = max(subdirs, key=lambda x: int(x.replace('predict', '')))
    
    return os.path.join(base_path, latest_dir)

# 使用例
base_path = 'runs/detect'
latest_dir = get_latest_directory(base_path)
print(f"Latest directory: {latest_dir}")

実行すると

File "/Users/snowpool/aw10s/inventory/utils.py", line 14, in <module> latest_dir = get_latest_directory(base_path) File "/Users/snowpool/aw10s/inventory/utils.py", line 8, in get_latest_directory latest_dir = max(subdirs, key=lambda x: int(x.replace('predict', ''))) File "/Users/snowpool/aw10s/inventory/utils.py", line 8, in <lambda> latest_dir = max(subdirs, key=lambda x: int(x.replace('predict', ''))) ValueError: invalid literal for int() with base 10: '' 

となる

原因は

ValueError: invalid literal for int() with base 10: ''

というエラーは
int()関数が空文字列を処理しようとしたときに発生

これは、predictという文字列をreplaceで削除した結果が
空文字列になる場合に起こる

例えば、predictという名前のディレクトリがある場合など

この問題を解決するために
ディレクトリ名がpredictのプレフィックスを持っているかどうかをチェックし
それ以外のディレクトリ名を無視するようにする

また、predictの後の文字列が数字であることを確認するために
追加のチェックを行う

対処として
1. predictで始まり、その後に数字が続くディレクトリのみを対象とするようにフィルタリングしています。
2. 有効なディレクトリが存在しない場合に適切なエラーメッセージを出力します。

import os

def get_latest_directory(base_path):
    # 指定されたディレクトリ内の全てのサブディレクトリを取得
    subdirs = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))]

    # サブディレクトリ名が 'predict' で始まり、その後に数字が続くものをフィルタリング
    predict_dirs = [d for d in subdirs if d.startswith('predict') and d[7:].isdigit()]

    if not predict_dirs:
        raise ValueError("No valid 'predict' directories found")

    # サブディレクトリ名を数値に変換し、ソートして最新のディレクトリを特定
    latest_dir = max(predict_dirs, key=lambda x: int(x[7:]))

    return os.path.join(base_path, latest_dir)

# 使用例
base_path = 'runs/detect'
latest_dir = get_latest_directory(base_path)
print(f"Latest directory: {latest_dir}")

これで実行すると

Latest directory: runs/detect/predict7

となった

次は画像ファイルパスの取得

import os

def get_latest_directory(base_path):
    # 指定されたディレクトリ内の全てのサブディレクトリを取得
    subdirs = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))]

    # サブディレクトリ名が 'predict' で始まり、その後に数字が続くものをフィルタリング
    predict_dirs = [d for d in subdirs if d.startswith('predict') and d[7:].isdigit()]

    if not predict_dirs:
        raise ValueError("No valid 'predict' directories found")

    # サブディレクトリ名を数値に変換し、ソートして最新のディレクトリを特定
    latest_dir = max(predict_dirs, key=lambda x: int(x[7:]))

    return os.path.join(base_path, latest_dir)

def get_image_files(directory):
    # 指定されたディレクトリ内の全ての画像ファイルのパスを取得
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')
    image_files = [os.path.join(directory, file) for file in os.listdir(directory) if file.lower().endswith(image_extensions)]
    return image_files

# 使用例
base_path = 'runs/detect'
latest_dir = get_latest_directory(base_path)
image_files = get_image_files(latest_dir)

print(f"Latest directory: {latest_dir}")
print("Image files:")
for image_file in image_files:
    print(image_file)

とすることで画像ファイルのパスが取得できた

Latest directory: runs/detect/predict7
Image files:
runs/detect/predict7/Baskulin1.jpg

次にこれをモジュールにして
LINEの画像パスにして送信テストする

import os
import json

def load_config(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

def get_latest_directory(base_path):
    # 指定されたディレクトリ内の全てのサブディレクトリを取得
    subdirs = [d for d in os.listdir(base_path) if os.path.isdir(os.path.join(base_path, d))]

    # サブディレクトリ名が 'predict' で始まり、その後に数字が続くものをフィルタリング
    predict_dirs = [d for d in subdirs if d.startswith('predict') and d[7:].isdigit()]

    if not predict_dirs:
        raise ValueError("No valid 'predict' directories found")

    # サブディレクトリ名を数値に変換し、ソートして最新のディレクトリを特定
    latest_dir = max(predict_dirs, key=lambda x: int(x[7:]))

    return os.path.join(base_path, latest_dir)

def get_image_files(directory):
    # 指定されたディレクトリ内の全ての画像ファイルのパスを取得
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')
    image_files = [os.path.join(directory, file) for file in os.listdir(directory) if file.lower().endswith(image_extensions)]
    return image_files

として

config.jsonの中の

  "image_file_path": "runs/detect/predict7/Baskulin1.jpg"

という指定を

  "image_file_path": "runs/detect"

に変更

line_order.pyの中身を

import requests
import os
from utils import load_config, get_latest_directory, get_image_files

# 設定ファイルを読み込む
config = load_config('config.json')

# 設定ファイルからトークンとディレクトリパスを取得
token = config['token']
base_path = config['image_file_path']

# 最新のpredictディレクトリを取得
latest_dir = get_latest_directory(base_path)
image_files = get_image_files(latest_dir)

url = 'https://notify-api.line.me/api/notify'
message = '通知したいテキスト'

headers = {'Authorization': f"Bearer {token}"}
params = {'message': message}

# 最新のpredictディレクトリ内の全ての画像ファイルに対してLINE Notify APIにリクエストを送信
for image_file_path in image_files:
    files = {'imageFile': open(image_file_path, 'rb')}
    
    # LINE Notify APIにリクエストを送信
    res = requests.post(url, headers=headers, params=params, files=files)

    # レスポンスを出力
    print(f"File: {image_file_path}")
    print(res.status_code)
    print(res.text)

として保存

これで

python line_order.py 

を実行すれば画像付きでLINEで送信してくれる

試しに再度新しいyolov8での推論をして
できたディレクトリを対象にするか実験する

しかし

python count_inventory_terminal.py data_bak/Baskulin1.jpg

0: 640x512 1 baskulin, 93.9ms
Speed: 2.7ms preprocess, 93.9ms inference, 2.9ms postprocess per image at shape (1, 3, 640, 512)
バスクリン: 1個

の後に

python line_order.py

を実行しても

File: runs/detect/predict7/Baskulin1.jpg
200
{"status":200,"message":"ok"}

となる

ls runs/detect
の結果も
predict		predict3	predict5	predict7
predict2	predict4	predict6

となる

どうやらyoloのコマンドで実行した時だけ
runs/predict以下に保存されるらしい

つまりカウントした後に画像を保存するプログラムを追加しないとだめ
あと、カウントした時に残り1以下になった時に
ラベルを書き出すプログラムが必要

FaceRecognizerSFによる顔の認識の実践

FaceRecognizerSFによる顔の認識の実践

スマホの写真のサイズを1/4にすることで

generate_aligned_faces.py

による顔の切り出しが成功した

取り出した顔画像は
Face00x.jpg
となっているので、個人ごとの名前ファイルに改名する

次に
顔画像から特徴を抽出、特徴辞書として保存する

python generate_feature_dictionary.py snowpool.jpg

これで
snowpool.npy
が作成される

同様に家族分も実行する

python resize_save.py PXL_20240218_063620749.jpg

で画像ファイルを1/4にして

python generate_aligned_faces.py PXL_20240218_063620749_quarter.jpg 

で写真から顔を抽出

mv face001.jpg child.jpg

python generate_feature_dictionary.py child.jpg

これでそれぞれのnpyファイルができる

次に識別
モデルが変更になっているので

     weights = os.path.join(directory, "yunet.onnx")

    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")

というように
指定するモデルを
変更する

編集するファイルは

face_recognizer.py

これで

python face_recognizer.py PXL_20240218_063620749.jpg 

を実行したら

OpenCV: Couldn't read video stream from file "/Users/snowpool/aw10s/face_recog/image.jpg"

となる

https://sites.google.com/iot-com.net/home/ホーム/実験室/jetson-nano/jetson-nanoのopen-cvで顔認証
によれば

#            return True, (user_id, cos_score)   ←オリジナルのtypo
            return True, (user_id, score)

とあるので

            # return True, (user_id, cos_score)
            return True, (user_id, score)

というように修正

そして
そのままだとファイル名が指定されているので
コマンドラインからファイル名を指定して実行できるように
ソースを変更する

Mainの部分を

# main関数の引数を追加 def main(image_path): # captureの初期化を変更 capture = cv2.VideoCapture(image_path)  # コマンドラインから指定された画像ファイル

として

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Face Recognition")
    parser.add_argument('image', help="Path to the image file")
    args = parser.parse_args()
    main(args.image)

と書き換える

これで再度実行してみる

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/face_recog/face_recognizer.py", line 108, in <module>
    main(args.image)
  File "/Users/snowpool/aw10s/face_recog/face_recognizer.py", line 37, in main
    files = glob.glob(os.path.join(directory, "*.npy"))
NameError: name 'directory' is not defined

となった

原因は

    directory = os.path.dirname(__file__)

を削除したことでディレクトリの指定ができなくなっていた

import os
import sys
import glob
import numpy as np
import cv2
import argparse

COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128

# 特徴を辞書と比較してマッチしたユーザーとスコアを返す関数
def match(recognizer, feature1, dictionary):
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            # return True, (user_id, cos_score)
            return True, (user_id, score)

    return False, ("", 0.0)

# def main():
#     # キャプチャを開く
#     directory = os.path.dirname(__file__)
#     capture = cv2.VideoCapture(os.path.join(directory, "image.jpg")) # 画像ファイル
# main関数の引数を追加
def main(image_path):
    # captureの初期化を変更
    directory = os.path.dirname(__file__)
    
    capture = cv2.VideoCapture(image_path)  # コマンドラインから指定された画像ファイル

    #capture = cv2.VideoCapture(0) # カメラ
    if not capture.isOpened():
        exit()

    # 特徴を読み込む
    dictionary = []
    files = glob.glob(os.path.join(directory, "*.npy"))
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))

    # モデルを読み込む
    # weights = os.path.join(directory, "yunet.onnx")
    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    while True:
        # フレームをキャプチャして画像を読み込む
        result, image = capture.read()
        if result is False:
            cv2.waitKey(0)
            break

        # 画像が3チャンネル以外の場合は3チャンネルに変換する
        channels = 1 if len(image.shape) == 2 else image.shape[2]
        if channels == 1:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        if channels == 4:
            image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)

        # 入力サイズを指定する
        height, width, _ = image.shape
        face_detector.setInputSize((width, height))

        # 顔を検出する
        result, faces = face_detector.detect(image)
        faces = faces if faces is not None else []

        for face in faces:
            # 顔を切り抜き特徴を抽出する
            aligned_face = face_recognizer.alignCrop(image, face)
            feature = face_recognizer.feature(aligned_face)

            # 辞書とマッチングする
            result, user = match(face_recognizer, feature, dictionary)

            # 顔のバウンディングボックスを描画する
            box = list(map(int, face[:4]))
            color = (0, 255, 0) if result else (0, 0, 255)
            thickness = 2
            cv2.rectangle(image, box, color, thickness, cv2.LINE_AA)

            # 認識の結果を描画する
            id, score = user if result else ("unknown", 0.0)
            text = "{0} ({1:.2f})".format(id, score)
            position = (box[0], box[1] - 10)
            font = cv2.FONT_HERSHEY_SIMPLEX
            scale = 0.6
            cv2.putText(image, text, position, font, scale, color, thickness, cv2.LINE_AA)

        # 画像を表示する
        cv2.imshow("face recognition", image)
        key = cv2.waitKey(1)
        if key == ord('q'):
            break
    
    cv2.destroyAllWindows()

# if __name__ == '__main__':
#     main()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Face Recognition")
    parser.add_argument('image', help="Path to the image file")
    args = parser.parse_args()
    main(args.image)


再度

python face_recognizer.py PXL_20240218_063620749.jpg 


実行したら顔の認識ができた

OpenCVのSFaceで顔認証の準備

OpenCVのSFaceで顔認証

https://sites.google.com/iot-com.net/home/ホーム/実験室/jetson-nano/jetson-nanoのopen-cvで顔認証
を参考に
OpenCVにDNNを使用した顔認識SFaceが実装され、誰の顔かを認識できる様になったのとの記事を見つけて試してみました。
記事 : OpenCVの新しい顔認識を試してみる https://qiita.com/UnaNancyOwen/items/8c65a976b0da2a558f06
Github : OpenCV ObjDetect Module Face Recognition (SFace) Sample https://gist.github.com/UnaNancyOwen/49df508ad8b6d9520024354df0e3e740

顔認識は OpenCV 4.5.4 以上からの導入になる

基本的にはGithub のPython をコピペすればそのまま動作する

generate_aligned_faces.py
入力した写真から人の顔の部分を切り出して保存するプログラム
複数の人物が写っている場合は全員を切り出して
face001.jpg , face002.jpg ・・・ と名前を付けて保存する
出力されたファイル名を 人の名前に変更しておくと後々便利です。 
face001.jpg → taro.jpg
というようにリネームする

generate_feature_dictionary.py
切り出した顔のjpgファイルを読み込んで、顔の特徴量に変換するプログラム
顔写真 taro.jpg を入力すると 顔の特徴量 taro.npy が出力される
このnumpyファイルに各個人の顔の特徴量が128次元ベクトルに変換されて入る

face_recognizer.py
入力された写真に上記で作成した顔の特徴量が近い人が写っているかを判別するプログラム
特徴量の npyファイルは同じフォルダに入っているものが全て自動で読み込まれる
表示される名前は特徴量ファイル名となるので人物名をファイル名にした方がわかりやすい
類似した顔が無い場合には Unknown と表示

これらを元に実践する

generate_aligned_faces.py
で写真のファイルを引数にして実行

python generate_aligned_faces.py image.jpg

とすれば
写真に写っている人の分だけファイルができる
そのファイル名を人の名前に変更する

つまり全て
face001.jpg
という感じで
Face00x.jpg
となっているので写真ごとに名前を変える

次に
generate_feature_dictionary.py

切り出した顔のjpgファイルを読み込んで、顔の特徴量に変換するプログラムです。
例えば 顔写真 taro.jpg を入力すると 顔の特徴量 taro.npy が出力されます。
このnumpyファイルに各個人の顔の特徴量が128次元ベクトルに変換されて入ってます。

python generate_feature_dictionary.py face001.jpg
python generate_feature_dictionary.py face002.jpg

写真の人の分だけ実行すればOK
人物名なら

python generate_feature_dictionary.py tarojpg
python generate_feature_dictionary.py jiro.jpg
python generate_feature_dictionary.py hoge.jpg
python generate_feature_dictionary.py hogehoge.jpg

これで
顔の特徴量 taro.npy
というようなnpyファイルが作成される
実際には画像ファイル名.npyファイルになる

実行するにあたり
写真を用意する必要がある
横も認識したいのなら、横の写真も必要になる

とりあえず写真を探すこと
まずは自分の写真を撮影し
GooglePhotoからダウンロード

 cp ~/Downloads/PXL_20240612_091410912.jpg .

でコピー

作業ディレクトリは

/Users/snowpool/aw10s/face_recog

で行う

https://gist.github.com/UnaNancyOwen/49df508ad8b6d9520024354df0e3e740#file-face_recognizer-pyのコードをそのまま使う

Download Zip
でダウンロードし展開
中身を

 cp ~/Downloads/49df508ad8b6d9520024354df0e3e740-54e7dbd2f15b6137dc2b6d4ef6ce3143528c3978/* .

でコピー

ソースだけでなくモデルのダウンロードが必要

https://github.com/ShiqiYu/libfacedetection.train/blob/master/tasks/task1/onnx/yunet.onnx
をクリックしたら

404 - page not found
The 
master
 branch of 
libfacedetection.train
 does not contain the path 

https://drive.google.com/file/d/1ClK9WiB492c5OZFKveF3XiHCejoOxINW/view
については
プレビューできません

となる

とりあえずモデルのダウンロードを調べることにする

https://www.eranger.co.jp/blog/news/face-detection-recognition-by-opencv
の記事を参考に

face_detection_yunet_2023mar.onnx
で検索

https://github.com/opencv/opencv_zoo/blob/main/models/face_detection_yunet/face_detection_yunet_2023mar.onnx
にデータがあったので

https://github.com/opencv/opencv_zoo/tree/main
のreadmeを見てから

git clone https://github.com/opencv/opencv_zoo.git

でリポジトリのclone

cp opencv_zoo/models/face_detection_yunet/face_detection_yunet_2023mar.onnx .

で作業ディレクトリにコピー

python generate_aligned_faces.py PXL_20240612_091410912.jpg 

を実行したが

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/face_recog/generate_aligned_faces.py", line 60, in <module>
    main()
  File "/Users/snowpool/aw10s/face_recog/generate_aligned_faces.py", line 33, in main
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
cv2.error: OpenCV(4.10.0) /Users/xperience/GHA-Actions-OpenCV/_work/opencv-python/opencv-python/opencv/modules/dnn/src/onnx/onnx_importer.cpp:277: error: (-5:Bad argument) Can't read ONNX file: /Users/snowpool/aw10s/face_recog/yunet.onnx in function 'ONNXImporter'

となる

python

でPythonインタープリターを使用し

import cv2
print(cv2.__version__)

でバージョンを確認すると
4.10.0
となった

pip show opencv-python

Name: opencv-python
Version: 4.8.0.74
Summary: Wrapper package for OpenCV python bindings.
Home-page: https://github.com/opencv/opencv-python
Author: 
Author-email: 
License: Apache 2.0
Location: /Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages
Requires: numpy, numpy, numpy, numpy, numpy
Required-by: ultralytics

だとバージョンが違う

Pythonインタープリターで
現在のインポートされているOpenCVの場所を確認

import cv2
print(cv2.__file__)

の結果は

/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/cv2/__init__.py

システムパスの確認

import sys
print(sys.path)

の結果は

['', '/Users/snowpool/.pyenv/versions/3.10.6/lib/python310.zip', '/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10', '/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/lib-dynload', '/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages']

その前によくREADMEを読んで再度実行

rm face_detection_yunet_2023mar.onnx

で一度削除し

cd opencv_zoo

git lfs install

これで
Git LFSが有効化され、大きなファイルを扱う準備が整う

git lfs pull
Git LFSを用いて管理されているファイル(大容量のファイルなど)をダウンロードします。git clone はリポジトリのメタデータと小さなファイルのみをダウンロードするため、git lfs pull を使用してLFSを介して管理されている大きなファイルを取得する必要があります
とのこと

cp opencv_zoo/models/face_detection_yunet/face_detection_yunet_2023mar.onnx .

でファイルをコピー

次に
* face_recognizer_fast.onnx
これもダウンロードできなかったので
githubで検索

https://github.com/MYJLAB-2022-HackThon/FaceServer/blob/27ce7099eb3ec46bb07d988b9681e9cc2a6b291c/app/face_recognizer_fast.onnx
にあったので

git clone https://github.com/MYJLAB-2022-HackThon/FaceServer.git

でリポジトリをclone

cp FaceServer/app/face_recognizer_fast.onnx .

でファイルをコピー

これで再度

 python generate_aligned_faces.py PXL_20240612_091410912.jpg

を実行したら

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/face_recog/generate_aligned_faces.py", line 60, in <module>
    main()
  File "/Users/snowpool/aw10s/face_recog/generate_aligned_faces.py", line 33, in main
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
cv2.error: OpenCV(4.10.0) /Users/xperience/GHA-Actions-OpenCV/_work/opencv-python/opencv-python/opencv/modules/dnn/src/onnx/onnx_importer.cpp:277: error: (-5:Bad argument) Can't read ONNX file: /Users/snowpool/aw10s/face_recog/yunet.onnx in function 'ONNXImporter'

となる

多分

    # モデルを読み込む
    weights = os.path.join(directory, "yunet.onnx")
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")

のonnxファイルの指定を変えればいけるはず

generate_aligned_faces.py
の中の

    # weights = os.path.join(directory, "yunet.onnx")

    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")

にして実行

とりあえず

import os
import argparse
import numpy as np
import cv2

def main():
    # 引数をパースする
    parser = argparse.ArgumentParser("generate aligned face images from an image")
    parser.add_argument("image", help="input image file path (./image.jpg)")
    args = parser.parse_args()

    # 引数から画像ファイルのパスを取得
    path = args.image
    directory = os.path.dirname(args.image)
    if not directory:
        directory = os.path.dirname(__file__)
        path = os.path.join(directory, args.image)

    # 画像を開く
    image = cv2.imread(path)
    if image is None:
        exit()

    # 画像が3チャンネル以外の場合は3チャンネルに変換する
    channels = 1 if len(image.shape) == 2 else image.shape[2]
    if channels == 1:
        image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    if channels == 4:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)

    # モデルを読み込む
    # weights = os.path.join(directory, "yunet.onnx")
    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")
    
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    # 入力サイズを指定する
    height, width, _ = image.shape
    face_detector.setInputSize((width, height))

    # 顔を検出する
    _, faces = face_detector.detect(image)

    # 検出された顔を切り抜く
    aligned_faces = []
    if faces is not None:
        for face in faces:
            aligned_face = face_recognizer.alignCrop(image, face)
            aligned_faces.append(aligned_face)

    # 画像を表示、保存する
    for i, aligned_face in enumerate(aligned_faces):
        cv2.imshow("aligned_face {:03}".format(i + 1), aligned_face)
        cv2.imwrite(os.path.join(directory, "face{:03}.jpg".format(i + 1)), aligned_face)

    cv2.waitKey(0)
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

として

サンプル画像をダウンロード後

python generate_aligned_faces.py 136777535-36d6bce1-91bf-446c-9377-645cc60b9c65.jpg

とすると
face001.jpg
face002.jpg
が作成されますが

スマホで撮影した画像で

python generate_aligned_faces.py PXL_20240612_091410912.jpg

とすると処理が終わりません

ファイルサイズなども関連しているかもしれないため一度ファイルサイズなども調べてみます

ファイル情報を調べたいので

 vim file_info.py

import cv2
import os
import argparse

def main():
    # コマンドライン引数を解析するパーサーを作成
    parser = argparse.ArgumentParser(description="Display image properties")
    parser.add_argument("image_path", help="Path to the image file")
    args = parser.parse_args()

    # 画像を読み込む
    image = cv2.imread(args.image_path)
    if image is None:
        print("画像が読み込めませんでした。")
        return

    # 画像の高さ、幅、チャンネル数を取得
    height, width, channels = image.shape
    print(f"画像の幅: {width} ピクセル")
    print(f"画像の高さ: {height} ピクセル")
    print(f"色チャネル数: {channels}")

    # ファイルサイズを取得
    file_size = os.path.getsize(args.image_path)
    print(f"ファイルサイズ: {file_size} バイト")

if __name__ == '__main__':
    main()

で
python file_info.py PXL_20240612_091410912.jpg

結果は

画像の幅: 2736 ピクセル
画像の高さ: 3648 ピクセル
色チャネル数: 3
ファイルサイズ: 2319152 バイト
python file_info.py 136777535-36d6bce1-91bf-446c-9377-645cc60b9c65.jpg

だと

画像の幅: 450 ピクセル
画像の高さ: 312 ピクセル
色チャネル数: 3
ファイルサイズ: 26914 バイト

リサイズして半分にしてから実行したいので

vim resize_save.py

中身は

import cv2
import os
import argparse

def main():
    # コマンドライン引数を解析するパーサーを作成
    parser = argparse.ArgumentParser(description="Resize and save an image")
    parser.add_argument("image_path", help="Path to the image file")
    args = parser.parse_args()

    # 画像を読み込む
    image = cv2.imread(args.image_path)
    if image is None:
        print("画像が読み込めませんでした。")
        return

    # 画像の元の高さ、幅を取得
    height, width = image.shape[:2]

    # 新しい寸法を計算(元のサイズの半分)
    new_width = width // 2
    new_height = height // 2

    # 画像をリサイズ
    resized_image = cv2.resize(image, (new_width, new_height))

    # 新しいファイル名を設定
    new_file_path = os.path.splitext(args.image_path)[0] + "_resized.jpg"

    # リサイズした画像を保存
    cv2.imwrite(new_file_path, resized_image)
    print(f"リサイズされた画像が保存されました: {new_file_path}")

if __name__ == '__main__':
    main()

これを実行し

python resize_save.py PXL_20240612_091410912.jpg    

でファイルサイズを半分にしたが結果は同じのため

python file_info.py PXL_20240612_091410912_resized.jpg

でさらに半分にすることでようやく

Face001.jpg

が作成できた

これにより
Pixcel8で撮影したスマホの写真で顔データを作る場合には
元画像の1/4にする必要があることが判明

ちなみにファイルの比較は

python file_info.py PXL_20240612_091410912_resized_resized.jpg 
画像の幅: 684 ピクセル
画像の高さ: 912 ピクセル
色チャネル数: 3
ファイルサイズ: 228769 バイト
python file_info.py PXL_20240612_091410912.jpg 
画像の幅: 2736 ピクセル
画像の高さ: 3648 ピクセル
色チャネル数: 3
ファイルサイズ: 2319152 バイト

現在の天気が雨でない場合に次の1時間の天気予報をチェックする

現在の天気が雨でない場合に次の1時間の天気予報をチェックするように変更

# weather_check.py
import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    # OpenWeather One Call API の URL
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# APIキーを設定
api_key = 'APIキー'


# location_utils2から緯度と経度と天気をを取得
latitude, longitude, _ , weather_description = location_utils2.get_current_location_and_address()

# 天気データを取得
weather_data = get_weather_data(api_key, latitude, longitude)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

のコードだと
雨が降っていても一時間後に雨なのかを調べてしまう

なので現在地の天気が雨でないのなら実行するようにする

なおこの判定は

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "APIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    #天気の取得
    # weather_description = jsondata['weather'][0]['description']
    weather_description = jsondata['weather'][0]['main']

    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address, weather_description

で郵便番号をもとに
現在地の緯度経度と天気を取得している

この時に
Mainだと英語で取得になるが
Descriptionだと日本語で詳細の天気になる

ただ詳細な天気だと小雨とか判定が面倒なので
とにかく雨であれば
main==rainの判定にできるので
Mainを判定基準にした

変更後のコードは

import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

api_key = 'APIキー'

latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

# 現在の天気が雨でない場合にのみ次の1時間の雨の予報をチェック
if weather_description != 'Rain':
    weather_data = get_weather_data(api_key, latitude, longitude)
    if check_for_rain(weather_data):
        print("Alert: Rain is expected within the next hour!")
    else:
        print("No rain expected in the next hour.")
else:
    print("It is currently raining.")

次に音声を流れるように変更

今回も音声再生はpygameを使う

pygameライブラリを使って音声ファイルを再生しています。まず、pygame.mixerを初期化し、音声ファイルを読み込んで再生します。pygame.mixer.music.play()関数で音声を再生し、while pygame.mixer.music.get_busy()ループを使って音声が再生されている間は待機します。このスクリプトは、雨が予報されていない場合にのみ、次の1時間の雨の予報をチェックし、予報されていた場合に指定された音声ファイルを再生します

コードは

import location_utils2
import requests
import pygame  # Import pygame for audio playback

def get_weather_data(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

api_key = 'APIキー'

latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

# 現在の天気が雨でない場合にのみ次の1時間の雨の予報をチェック
if weather_description != 'Rain':
    weather_data = get_weather_data(api_key, latitude, longitude)
    if check_for_rain(weather_data):
        print("Alert: Rain is expected within the next hour!")
        pygame.mixer.init()  # Initialize the mixer module
        pygame.mixer.music.load('voice.wav')  # Load your sound file
        pygame.mixer.music.play()  # Play the sound
        while pygame.mixer.music.get_busy():  # Wait for music to finish playing
            pygame.time.Clock().tick(10)
    else:
        print("No rain expected in the next hour.")
else:
    print("It is currently raining.")

でOK

音声の部分は

python create_voice.py voice.txt 192.168.1.69:50021

で作成した音声を使用

次は顔認識したら実行するようにする

以前の
Weatheer や mail_voice などで使ったkao.pyとxmlファイルを使えばできるはず

ということでモジュール化する

weather_alert.py

として

# weather_alert.py

import location_utils2
import requests
import pygame

def get_weather_data(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

def check_and_alert(api_key):
    latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

    if weather_description != 'Rain':
        weather_data = get_weather_data(api_key, latitude, longitude)
        if check_for_rain(weather_data):
            print("Alert: Rain is expected within the next hour!")
            pygame.mixer.init()
            pygame.mixer.music.load('voice.wav')
            pygame.mixer.music.play()
            while pygame.mixer.music.get_busy():
                pygame.time.Clock().tick(10)
        else:
            print("No rain expected in the next hour.")
    else:
        print("It is currently raining.")

として保存

他で呼び出す時には

    api_key = 'APIキー'
    check_and_alert(api_key)

というように加えればいい

なので
以前作成した kao.pyの中での処理を変更すればOK

その前に
config.iniを作成し
待機時間とAPIキーを記述しておく

とりあえず mail_voiceのものを使うので

cp ../mail_voice/*.xml .
cp ../mail_voice/config.ini .

でコピー

config.iniを編集する

[Settings]
detection_interval = 1800 #30分

[API_KEYS]
OPENWEATHER_API_KEY = APIキー

としておく

次に kao.pyの編集

 cp ../mail_voice/kao.py .

の後に編集する

import weather_alert

でインポート

# 設定を変数に格納
api_key = config['API_KEYS']['OPENWEATHER_API_KEY']
detection_interval = int(config['Settings']['detection_interval'])

そして天気の取得処理を追加

        if lastTime is None or time.perf_counter() - lastTime > detection_interval:
            # 検出時刻更新
            lastTime = time.perf_counter()
            weather_alert.check_and_alert(api_key)

とする

これで実行したら

  File "/Users/snowpool/aw10s/rain_alert/kao.py", line 12, in <module>
    detection_interval = int(config['Settings']['detection_interval'])
ValueError: invalid literal for int() with base 10: '1800 #30分'

となった

原因はコメント
これが原因でonfig[‘Settings’][‘detection_interval’] の値が 1800 #30分 という文字列であり、これを整数に変換しようとしてエラーが発生していることです。int() 関数は整数に変換できない文字列を受け取るとエラーを発生させます。
問題の原因は、config[‘Settings’][‘detection_interval’] の値が正しい整数値ではなく、その後にコメントが付いているためです。Pythonでは、int() 関数は文字列中の数字のみを整数として解釈します。
とのこと

interval_string = config['Settings']['detection_interval']
interval_without_comment = interval_string.split('#')[0].strip()
detection_interval = int(interval_without_comment)

でコメントを消すこともできるらしいが
とりあえず余計なコメントは削除した

これで実行したら

Hello from the pygame community. https://www.pygame.org/contribute.html
Traceback (most recent call last):
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/base.py", line 368, in _call_geocoder
    result = self.adapter.get_json(url, timeout=timeout, headers=req_headers)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/adapters.py", line 472, in get_json
    resp = self._request(url, timeout=timeout, headers=headers)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/adapters.py", line 500, in _request
    raise AdapterHTTPError(
geopy.adapters.AdapterHTTPError: Non-successful status code 403

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/rain_alert/kao.py", line 41, in <module>
    weather_alert.check_and_alert(api_key)
  File "/Users/snowpool/aw10s/rain_alert/weather_alert.py", line 21, in check_and_alert
    latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()
  File "/Users/snowpool/aw10s/rain_alert/location_utils2.py", line 28, in get_current_location_and_address
    location = geolocator.reverse((latitude, longitude), language='ja')
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/nominatim.py", line 372, in reverse
    return self._call_geocoder(url, callback, timeout=timeout)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/base.py", line 388, in _call_geocoder
    res = self._adapter_error_handler(error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/base.py", line 411, in _adapter_error_handler
    raise exc_cls(str(error)) from error
geopy.exc.GeocoderInsufficientPrivileges: Non-successful status code 403

となる

geopy ライブラリがジオコーディングサービスへのアクセスに失敗
が原因とのこと

住所の表示は不要で
緯度経度があれば問題ないため
住所表示機能を削除したら無事に動いた

とりあえず今回もコードを公開するのと同時に
Voicevoxのスクリプトも一緒にリポジトリに入れて公開する

緯度経度、天気を取得

緯度経度、天気を取得するように

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "APIキー"
    zip_place = "437-郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    #天気の取得
    weather_description = jsondata['weather'][0]['description']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address, weather_description

へコード変更

location_utils2.py

を変更したので

それに伴い

# weather_check.py
import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    # OpenWeather One Call API の URL
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# APIキーを設定
api_key = 'APIキー'


# location_utils2から緯度と経度を取得
latitude, longitude, _ , weather_description = location_utils2.get_current_location_and_address()

# 天気データを取得
weather_data = get_weather_data(api_key, latitude, longitude)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

としたが
雨が降りそうだが降らないと出る

No rain expected in the next hour.
import location_utils2

# 緯度、経度、住所(無視)、天気を取得
latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

# 緯度、経度、天気を表示
print(f"Latitude: {latitude}, Longitude: {longitude}")
print("Weather:", weather_description)

で一度
緯度経度と天気を出したら
Latitude: 緯度, Longitude: 経度
Weather: 小雨

となっている

import requests
import json
from pprint import pprint
# url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&appid={API_key}"
url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"

# xxxxx
url = url.format(zip_place = "437-郵便番号,JP", API_key = "APIキー")

jsondata = requests.get(url).json()

pprint(jsondata)
# 緯度経度のデータを抽出
latitude = jsondata['coord']['lat']
longitude = jsondata['coord']['lon']

# 緯度経度を表示
print("緯度:", latitude)
print("経度:", longitude)

で詳細のJSONを出しても
日本語になっている

{'base': 'stations',
 'clouds': {'all': 99},
 'cod': 200,
 'coord': {'lat': 緯度, 'lon': 経度},
 'dt': 1714511699,
 'id': 0,
 'main': {'feels_like': 18.27,
          'grnd_level': 1001,
          'humidity': 80,
          'pressure': 1002,
          'sea_level': 1002,
          'temp': 18.3,
          'temp_max': 18.3,
          'temp_min': 18.3},
 'name': 'Kawai',
 'rain': {'1h': 0.32},
 'sys': {'country': 'JP',
         'id': 2092872,
         'sunrise': 1714507077,
         'sunset': 1714555965,
         'type': 2},
 'timezone': 32400,
 'visibility': 10000,
 'weather': [{'description': '小雨', 'icon': '10d', 'id': 500, 'main': 'Rain'}],
 'wind': {'deg': 284, 'gust': 4.82, 'speed': 2.68}}

ただし mainではrain

次に別の日に

weather.py      

を実行
天気は曇り

{'base': 'stations',
 'clouds': {'all': 100},
 'cod': 200,
 'coord': {'lat': 緯度, 'lon': 経度},
 'dt': 1714595140,
 'id': 0,
 'main': {'feels_like': 13.04,
          'grnd_level': 1012,
          'humidity': 90,
          'pressure': 1013,
          'sea_level': 1013,
          'temp': 13.3,
          'temp_max': 13.3,
          'temp_min': 13},
 'name': 'Kawai',
 'sys': {'country': 'JP',
         'id': 2092872,
         'sunrise': 1714593416,
         'sunset': 1714642414,
         'type': 2},
 'timezone': 32400,
 'visibility': 10000,
 'weather': [{'description': '厚い雲',
              'icon': '04d',
              'id': 804,
              'main': 'Clouds'}],
 'wind': {'deg': 87, 'gust': 6.7, 'speed': 3.31}}

つまりweather の main の部分を取得して表示すればOK
descriptionにしてしまうと
小雨とか分類が多くなってしまう

なので

    #天気の取得
    # weather_description = jsondata['weather'][0]['description']
    weather_description = jsondata['weather'][0]['main']

というようにすれば
現在地の天気は

Weather: Clouds

となる

あとは一時間後の天気を取得する部分だけど
これはGPTのコードではできているけど
ほんとにあっているか検証する

OpenWeather API を使い現在地の今日の天気を取得

OpenWeather API を使い現在地の今日の天気を取得。
一時間以内に雨が降るなら音声で知らせるものを作る

以前
https://github.com/Snowpooll/face_weather
で作成した顔を認識したら今日の天気を知らせる
の応用で
顔を認識した時に1時間以内に雨が降るなら
音声で知らせる
を作成する

GPTで
OpenWeather API を使い現在地の今日の天気を取得。
一時間以内に雨が降るならアラートを表示するpythonコード
としたとき

import requests
import json

# OpenWeather API キーと緯度経度を設定
api_key = 'YOUR_API_KEY'
latitude = 'YOUR_LATITUDE'
longitude = 'YOUR_LONGITUDE'

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

となった

しかし
現在地をどうせなら自動で取得したいので
GPTで
現在地の緯度経度を取得するpythonコード
とすると

ipinfo.ioとNominatim ジオコーダ
2つの取得方法が提示される

なお移動しながらの位置情報取得はGPSをお勧めされた

とりあえず精度などからだと

家庭のように固定された場所の位置情報を取得する場合は、Nominatim ジオコーダを使用するのが一般的に適しています。その理由は以下の通りです:
1. 正確性:Nominatim は OpenStreetMap のデータに基づいており、具体的な住所や場所名から精密な緯度と経度を提供できます。これは家庭や建物の正確な位置を特定するのに最適です。
2. 詳細レベル:Nominatim は詳細な逆ジオコーディングを提供し、具体的な住所情報を含めることができます。これにより、特定の住所の地理的な位置を正確に把握できます。
3. 汎用性:地図上での場所検索や、特定の地点の座標を取得するために、Nominatim は広範なアプリケーションで利用可能です。地図関連の多くの機能を実装する場合にも対応しやすいです。
一方で、ipinfo.io は IP アドレスに基づいて地理的な位置を特定するサービスであり、特定の住所に対してはそれほど詳細な情報を提供することはできません。IP ベースの位置特定は、あくまでおおまかな地域情報を提供するため、家庭のように固定された具体的な位置を必要とする場合には適していません。
したがって、家庭のような固定の場所の正確な位置情報を求める場合は、Nominatim ジオコーダがより適切な選択肢と言えるでしょう。

とのこと

とりあえず

pip install geopy

でライブラリを入れる

しかし

from geopy.geocoders import Nominatim
import geopy

def get_location():
    # ジオロケーターを初期化
    geolocator = Nominatim(user_agent="geoapiExercises")
    
    try:
        # IPアドレスから位置情報を取得
        location = geolocator.geocode("現在地の住所", timeout=10)
        
        # 緯度と経度を取得
        latitude = location.latitude
        longitude = location.longitude
        
        return latitude, longitude
    
    except geopy.exc.GeocoderTimedOut:
        return "Geocoder service timed out"
    
    except AttributeError:
        return "Location not found"

# 現在地の緯度経度を取得
current_location = get_location()
print("Latitude:", current_location[0])
print("Longitude:", current_location[1])

を実行すると

Latitude: L
Longitude: o

となる

ご提示のコードで Latitude: L と Longitude: o という結果が出力されるのは、おそらく何らかのエラーが発生しているため、エラーメッセージの文字列を返している可能性が高いです。get_location 関数では、例外が発生した場合に文字列を返しています。しかし、それが数字のタプルではないため、current_location[0] と current_location[1] で文字列の最初の文字が表示されてしまうのです。
問題を解決するために、エラー発生時のハンドリングを少し変更し、エラーメッセージを確認できるように修正すると良いでしょう。以下に修正したコードを示します。

from geopy.geocoders import Nominatim
import geopy

def get_location():
    # ジオロケーターを初期化
    geolocator = Nominatim(user_agent="geoapiExercises")
    
    try:
        # 住所から位置情報を取得
        location = geolocator.geocode("現在地の住所", timeout=10)
        
        # 緯度と経度を取得
        if location:
            latitude = location.latitude
            longitude = location.longitude
            return (latitude, longitude)
        else:
            return "Location not found", None
    
    except geopy.exc.GeocoderTimedOut:
        return "Geocoder service timed out", None
    
    except Exception as e:
        return str(e), None

# 現在地の緯度経度を取得
current_location = get_location()

if isinstance(current_location, tuple) and None not in current_location:
    print("Latitude:", current_location[0])
    print("Longitude:", current_location[1])
else:
    print(current_location[0])  # エラーメッセージを出力

の結果は

Location not found

住所の入力方法に問題ありかと思って
https://note.com/asahi_ictrad/n/nc948388e7c72#66ef102d-ebb9-421c-9e94-ea6e86c1a063
を参考に

from geopy.geocoders import Nominatim
import geopy

def get_location():
    # ジオロケーターを初期化
    geolocator = Nominatim(user_agent="geoapiExercises")
    
    try:
        # 住所から位置情報を取得
        location = geolocator.geocode("日本、〒郵便番号 住所", timeout=10)
        
        # 緯度と経度を取得
        if location:
            latitude = location.latitude
            longitude = location.longitude
            return (latitude, longitude)
        else:
            return "Location not found", None
    
    except geopy.exc.GeocoderTimedOut:
        return "Geocoder service timed out", None
    
    except Exception as e:
        return str(e), None

# 現在地の緯度経度を取得
current_location = get_location()

if isinstance(current_location, tuple) and None not in current_location:
    print("Latitude:", current_location[0])
    print("Longitude:", current_location[1])
else:
    print(current_location[0])  # エラーメッセージを出力

としたが変わらない

このため

現在地の緯度と経度を取得するには、Pythonで geopy ライブラリを使用するのが一般的です。geopy を使って、デバイスの IP アドレスを基にして位置情報を推測する方法
を試す

pip install geopy requests

でライブラリインポート

from geopy.geocoders import Nominatim
import requests

def get_public_ip():
    # 公開 API を使用して公衆 IP アドレスを取得
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    # IP アドレスから位置情報を推定
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

# 公衆 IP アドレスを取得
public_ip = get_public_ip()

# IP アドレスから位置情報を取得
latitude, longitude = get_location_by_ip(public_ip)
print(f"Latitude: {latitude}, Longitude: {longitude}")

これなら成功

なのでこれをモジュールにして呼び出すようにする

location_utils.py

として

# location_utils.py

from geopy.geocoders import Nominatim
import requests

def get_public_ip():
    """公開 API を使用して公衆 IP アドレスを取得する関数"""
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    """IP アドレスから位置情報(緯度と経度)を推定する関数"""
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

で保存

次に

import requests
import json

# OpenWeather API キーと緯度経度を設定
api_key = 'YOUR_API_KEY'
latitude = 'YOUR_LATITUDE'
longitude = 'YOUR_LONGITUDE'

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

の中で

location_utils.py

を呼び出して緯度経度を自動取得できるようにしたい

location_utils.py モジュールから緯度と経度を取得する機能を weather_check.py に組み込むために、以下のようにコードを修正します。まず、location_utils.py が緯度と経度を直接返すようにし、その後、この情報を weather_check.py で利用して OpenWeather API に問い合わせます。

location_utils.py を以下のように更新します。このコードは緯度と経度を直接取得し、戻り値として返します

# location_utils.py

import requests

def get_public_ip():
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

def get_current_location():
    ip_address = get_public_ip()
    return get_location_by_ip(ip_address)

次に

weather_check.py

location_utils から緯度と経度を取得し、その情報を使って天気情報を取得

# weather_check.py

import requests
import location_utils

# OpenWeather API キー
api_key = 'APIキー'

# 緯度と経度を location_utils から取得
latitude, longitude = location_utils.get_current_location()
if latitude is None or longitude is None:
    print("Failed to get location.")
    exit()

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

これで実行すると

No rain expected in the next hour.

となる

しかしこれだけだと本当に合っているかわからないので
IPアドレスから住所を表示するのと
一時間毎の天気を表示するものを作る

住所を確認したが違ってた

表示された緯度経度が本来の緯度経度とずれている

他のものも試してみた

pip install geocoder

の後に

import geocoder

def get_current_location():
    # 自分の IP アドレスに基づいて現在地の位置情報を取得
    g = geocoder.ip('me')
    if g.ok:
        return g.latlng
    else:
        return None

# 現在地の緯度と経度を取得して表示
location = get_current_location()
if location:
    latitude, longitude = location
    print(f"Latitude: {latitude}, Longitude: {longitude}")
else:
    print("Unable to determine current location")

国土地理院なら

import requests
import urllib

makeUrl = "https://msearch.gsi.go.jp/address-search/AddressSearch?q="
s_quote = urllib.parse.quote('千葉県南房総市富浦町青木123-1')
response = requests.get(makeUrl + s_quote)
print(response.json()[0]["geometry"]["coordinates"])

[137.91481, 34.743805]

取り合えず住所から入力して緯度経度取得にするなどを考えていたが
よくよく考えればどうやって緯度経度を出したか調べたら

import requests
import json
from pprint import pprint
url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&appid={API_key}"
# xxxxx
url = url.format(zip_place = "郵便番号,JP", API_key = "取得したAPIキー")

jsondata = requests.get(url).json()
pprint(jsondata)

print("天気:",jsondata["weather"][0]["main"])
print("天気詳細:",jsondata["weather"][0]["description"])

print("都市名:",jsondata["name"])
print("気温:",jsondata["main"]["temp"])
print("体感気温:",jsondata["main"]["feels_like"])
print("最低気温:",jsondata["main"]["temp_min"])
print("最高気温:",jsondata["main"]["temp_max"])
print("気圧:",jsondata["main"]["pressure"])
print("湿度:",jsondata["main"]["humidity"])

print("風速:",jsondata["wind"]["speed"])
print("風の方角:",jsondata["wind"]["deg"])

で緯度経度を取得していた

つまり

import requests
import json
from pprint import pprint
# url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&appid={API_key}"
url = "https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"

# xxxxx
url = url.format(zip_place = "郵便番号,JP", API_key = "取得したAPIキー")

jsondata = requests.get(url).json()

pprint(jsondata)

の結果で

{'base': 'stations',
 'clouds': {'all': 100},
 'cod': 200,
 'coord': {'lat': 緯度, 'lon': 経度},
 'dt': 1714340555,
 'id': 0,
 'main': {'feels_like': 17.59,
          'grnd_level': 1014,
          'humidity': 77,
          'pressure': 1015,
          'sea_level': 1015,
          'temp': 17.75,
          'temp_max': 17.75,
          'temp_min': 17.75},
 'name': 'Kawai',
 'rain': {'1h': 0.1},
 'sys': {'country': 'JP',
         'id': 2092872,
         'sunrise': 1714334403,
         'sunset': 1714383067,
         'type': 2},
 'timezone': 32400,
 'visibility': 10000,
 'weather': [{'description': '厚い雲',
              'icon': '04d',
              'id': 804,
              'main': 'Clouds'}],
 'wind': {'deg': 255, 'gust': 1.55, 'speed': 0.83}}

の実行結果の中から

 'coord': {'lat': 緯度, 'lon': 経度},


緯度経度のみ抽出して表示したい

これは

# 緯度経度のデータを抽出
latitude = jsondata['coord']['lat']
longitude = jsondata['coord']['lon']

# 緯度経度を表示
print("緯度:", latitude)
print("経度:", longitude)

とすればOK

次にモジュールにする
location_util2.pyの中身の

# location_utils.py

import requests
from geopy.geocoders import Nominatim

def get_public_ip():
    response = requests.get('https://api.ipify.org')
    return response.text

def get_location_by_ip(ip_address):
    url = f'https://ipinfo.io/{ip_address}/json'
    response = requests.get(url)
    data = response.json()
    loc = data.get('loc', None)
    if loc:
        latitude, longitude = loc.split(',')
        return latitude, longitude
    return None, None

def get_address_from_coordinates(latitude, longitude):
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), exactly_one=True)
    return location.address if location else "Address not found."

def get_current_location_and_address():
    ip_address = get_public_ip()
    location = get_location_by_ip(ip_address)
    if location:
        latitude, longitude = location
        address = get_address_from_coordinates(latitude, longitude)
        return latitude, longitude, address
    return None, None, "Location not found."

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "取得したAPIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address

として保存

Address.pyの中身を

import location_utils2
latitude, longitude, address = location_utils2.get_current_location_and_address()
if latitude and longitude:
    print(f"Latitude: {latitude}, Longitude: {longitude}")
if address:
    print("Address:", address)
から
import location_utils2

latitude, longitude, address = location_utils2.get_current_location_and_address()
if latitude and longitude:
    print(f"Latitude: {latitude}, Longitude: {longitude}")
if address:
    print("Address:", address)

って同じじゃん

実行し
https://www.geocoding.jp
で調べたら
緯度 経度
が住所の緯度経度

つまり
Nominatim
による住所への変換がダメってことがわかった

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "取得したAPIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address

の内容を

import requests

def get_current_location_and_address():
    # APIキーとZIPコードを設定
    API_key = "YOUR_API_KEY"
    zip_place = "YOUR_ZIP_CODE,JP"
    
    # OpenWeatherMap APIを使用して緯度経度を取得
    weather_url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    response = requests.get(weather_url)
    jsondata = response.json()
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    # 国土地理院APIを使用して住所を取得
    geo_url = f"https://mreversegeocoder.gsi.go.jp/reverse-geocoder/LonLatToAddress?lat={latitude}&lon={longitude}"
    geo_response = requests.get(geo_url)
    address_data = geo_response.json()
    
    # 住所情報を取得(存在する場合)
    address = address_data['results']['lv01Nm'] if 'results' in address_data and 'lv01Nm' in address_data['results'] else None
    
    return latitude, longitude, address

とした結果
とりあえず郵便番号まで取得できて大体の住所がでるなら
Nominatim
で事足りる

重要なのはそこじゃなくて緯度経度が合ってるかどうか
なので
住所検索の機能はオミットする予定だったが

import location_utils2

latitude, longitude, _ = location_utils2.get_current_location_and_address()  # 住所情報は無視
print(f"Latitude: {latitude}, Longitude: {longitude}")

というように別のファイルで呼び出すときに
指定しなければ緯度経度のみ抽出できる

次に

# weather_check.py

import requests
import location_utils

# OpenWeather API キー
api_key = 'APIキー'

# 緯度と経度を location_utils から取得
latitude, longitude = location_utils.get_current_location()
if latitude is None or longitude is None:
    print("Failed to get location.")
    exit()

# OpenWeather One Call API の URL
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"

def get_weather_data(url):
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# API から天気データを取得
weather_data = get_weather_data(url)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

のコードを
これを使って緯度経度取得したものへ変更

# weather_check.py
import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    # OpenWeather One Call API の URL
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# APIキーを設定
api_key = 'APIキー'


# location_utils2から緯度と経度を取得
latitude, longitude, _ = location_utils2.get_current_location_and_address()

# 天気データを取得
weather_data = get_weather_data(api_key, latitude, longitude)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

これを実行すれば雨が降らないので

No rain expected in the next hour.

となる

次に6時間毎の天気を表示する

weather_forecast_6hour.py

import requests

def get_weather_forecast(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={latitude}&lon={longitude}&units=metric&lang=ja&appid={api_key}"
    response = requests.get(url)
    return response.json()

# api_key = 'YOUR_API_KEY'  # APIキーを設定
api_key = 'APIキー'

latitude = 35.681236  # 東京駅の緯度
longitude = 139.767125  # 東京駅の経度

weather_data = get_weather_forecast(api_key, latitude, longitude)

# 現在の天気、1時間毎の天気(次の6時間)、1日毎の天気(次の7日間)を表示
print("Current Weather:", weather_data['current'])
print("Hourly Forecast for next 6 hours:")
for hour in weather_data['hourly'][:6]:
    print(hour)
print("Daily Forecast for next 7 days:")
for day in weather_data['daily'][:7]:
    print(day)

としたが

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/rain_alert/weather_forecast_6hour.py", line 17, in <module>
    print("Current Weather:", weather_data['current'])
KeyError: 'current'

となる

「現在・1時間毎・1日毎」の天気予報を取得
https://dev.classmethod.jp/articles/use-open-weather-map/
のコードが古いようだ

次に
https://3pysci.com/openweathermap-5/#index_id5
を参考にまずは全データの取得

だがだめ

公式を確認する
https://openweathermap.org/api/one-call-3#current

https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}
となっていて
	optional	By using this parameter you can exclude some parts of the weather data from the API response. It should be a comma-delimited list (without spaces).
Available values:
	•	current
	•	minutely
	•	hourly
	•	daily
	•	alerts

やるべきことは
現在の天気を取得し
雨以外の天気なら
一時間以内に雨が降るかを調べるものを作成する

雨が降りそうな時のみ音声を流す
これを顔認識したら実行するものを作成することで
洗濯物を取り込めるようにする

出かけたり買い物の時間を考えれば三時間がベストなので

print("天気:",jsondata["current"]["weather"][0]["main"])

で実験したらNG

3時間にしても

    hourly_forecast = weather_data['hourly'][:24]  # 24時間分のデータを取得

の部分が間違っている

つまり時間毎の指定ができないとだめ

https://zenn.dev/daifukuninja/articles/5e696cd0a75ba8#たたいてみた
を参考に
現在地の天気は

curl 'https://api.openweathermap.org/data/2.5/weather?zip=169-0072,JP&appid={your-api-key}&lang=jp'

で取得できるので

curl 'https://api.openweathermap.org/data/2.5/forecast?lon=緯度&lat=経度&appid={your-api-key}&lang=jp'

とりあえず現在の郵便番号から
緯度経度、天気を取得するように

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = "APIキー"
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    #天気の取得
    weather_description = jsondata['weather'][0]['description']
    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address, weather_description

へコード変更

JavaScriptで作成されたページからPDFダウンロード

JavaScriptで作成されたページからPDFダウンロード

pip install selenium

でインストール

from selenium import webdriver

# Safari WebDriverのインスタンスを作成
driver = webdriver.Safari()

# Googleのホームページにアクセス
driver.get("https://www.google.com")

# ウェブページのタイトルをコンソールに表示
print(driver.title)

# ブラウザを閉じる
driver.quit()

で実行できたので
safariでseleniumで行ってみて
成功したらubuntu でchrome などで行うようにしてみる

とりあえずまずはリンクを表示できるかテスト

まず、SafariのWebDriverを有効にし、SeleniumでSafariを使用する準備を整える必要があることを思い出してください。これには、Safariの「開発」メニューで「リモートオートメーションを許可」を有効にする手順が含まれます。

これは
Safari > 設定 > 詳細タブをクリック
Webデベロッパ用の機能を表示をチェックし

デベロッパタブをクリックし
リモートオートメーションを許可にチェックを入れることでできる

from selenium import webdriver
from selenium.webdriver.common.by import By

# WebDriverの初期化(Safari)
driver = webdriver.Safari()

# ページにアクセス
driver.get("https://message.cocoo.education/message/ファイル番号")

# ページが完全にロードされるまで待機
driver.implicitly_wait(10)  # 秒

# 'プレビュー'リンクを探す
try:
    preview_link = driver.find_element(By.LINK_TEXT, "プレビュー")
    print("プレビューリンク:", preview_link.get_attribute('href'))
except:
    print("プレビューリンクが見つかりませんでした。")

# ブラウザを閉じる
driver.quit()

これでブラウザが立ち上がり
ログにPDFのリンクが表示される

つまりselenium ならURLの取得が可能

なのでPDF取得まで行うようにする
しかし

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Safari()
driver.get("https://message.cocoo.education/message/ファイル番号")

try:
    # 要素が見えるまで最大10秒間待機します。
    preview_button = WebDriverWait(driver, 10).until(
        EC.visibility_of_element_located((By.XPATH, '//button[contains(text(),"プレビュー")]'))
    )
    preview_button.click()
except Exception as e:
    print("エラーが発生しました:", e)
finally:
    driver.quit()

というようにページの動作を待つように変更したが変わらない

このためやはり機能を分割し
まずgmail からURLリンクを取得
次にこのURLからseleniumでプレビューボタンおURLを取得
取得したボタンURLからPDFを取得
としてみる

とりあえずPDFのリンクを取得するため
バックグランドでseleniumを実行し
PDFのリンクをテキストとして入れるメソッドに書き換える

from selenium import webdriver
from selenium.webdriver.common.by import By

# WebDriverの初期化(Safari)
driver = webdriver.Safari()

# ページにアクセス
driver.get("https://message.cocoo.education/message/ファイル番号")

# ページが完全にロードされるまで待機
driver.implicitly_wait(10)  # 秒

# 'プレビュー'リンクを探す
try:
    preview_link = driver.find_element(By.LINK_TEXT, "プレビュー")
    print("プレビューリンク:", preview_link.get_attribute('href'))
except:
    print("プレビューリンクが見つかりませんでした。")

# ブラウザを閉じる
driver.quit()

しかしsafariだとヘッドレスモードがない
まぁ試作品なので
とりあえずこれをモジュールにするだけにしておく

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def find_preview_link(url, link_text="プレビュー"):
    # WebDriverの初期化(Safari)
    driver = webdriver.Safari()
    
    try:
        # ページにアクセス
        driver.get(url)
        
        # ページが完全にロードされるまで待機
        driver.implicitly_wait(10)  # 秒
        
        # 'プレビュー'リンクを探す
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        
        # リンクのURLを返す
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        # ブラウザを閉じる
        driver.quit()

# 使用例
url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223"
link_url = find_preview_link(url)
if link_url:
    print("プレビューリンク:", link_url)
else:
    print("プレビューリンクが見つかりませんでした。")

これでモジュールにできる

次にこのPDFのリンクを使いPDFをダウンロードする

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def find_preview_link(url, link_text="プレビュー"):
    driver = webdriver.Safari()
    try:
        driver.get(url)
        driver.implicitly_wait(10)  # 秒
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        driver.quit()

# 使用例
url = "https://message.cocoo.education/message/ファイル番号"
pdf_url = find_preview_link(url)
if pdf_url:
    print("プレビューリンク:", pdf_url)

    # ダウンロードしたいPDFファイルのURL
    response = requests.get(pdf_url)

    # レスポンスのステータスコードが200(成功)の場合、ファイルを保存
    if response.status_code == 200:
        with open("downloaded_file.pdf", "wb") as f:
            f.write(response.content)
        print("ファイルが正常にダウンロードされました。")
    else:
        print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code)
else:
    print("プレビューリンクが見つかりませんでした。")

これでOK

次にこのメソッドの最初の取得対象URLをgmailから取得する

以前

from __future__ import print_function
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを削除する関数
def remove_urls(text):
    # URLにマッチする正規表現パターン
    url_pattern = r'https?://\S+|www\.\S+'
    return re.sub(url_pattern, '', text)

# 最新の未読メール本文を取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    # 最新のメッセージ(最新の未読メール)を選択
    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    # メールの本文を取得
    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        # メール本文が parts 属性にある場合
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content"

    # URLを削除
    body_no_urls = remove_urls(body)
    return date + "<br>" + body_no_urls

# メールの受信日時を取得する関数(変更なし)
def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数(変更なし)
def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

# メイン処理
service = gmail_init()

# ラベル ID を指定して最新の未読メール本文を取得
latest_unread_message_body = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)

でURLを削除した本文を表示するものを作成した

これをURLと本文をそれぞれの変数に格納して返すようなメソッドに書き換える

import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# トークンとクレデンシャルのパス
tokenPath = "token.json"
credentialsPath = "credentials.json"

# メール本文のデコード関数
def decode(encoded):
    decoded = base64.urlsafe_b64decode(encoded).decode()
    return decoded

# URLを検出してリストに格納する関数
def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

# 最新の未読メール本文とURLを取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=labelIdsValue, q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    date = gmail_get_messages_body_date(messages, msg)
    topid = msg['id']
    msg = messages.get(userId='me', id=topid).execute()

    body = ""
    if msg["payload"]["body"]["size"] != 0:
        body = decode(msg["payload"]["body"]["data"])
    elif 'parts' in msg["payload"]:
        for part in msg["payload"]["parts"]:
            if part["body"]["size"] != 0:
                body = decode(part["body"]["data"])
                break

    if not body:
        return date + "<br> No body content", []

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(messages, msg):
    msg_id = msg['id']
    m = messages.get(userId='me', id=msg_id, format='raw').execute()
    raw = base64.urlsafe_b64decode(m['raw'])
    eml = email.message_from_bytes(raw)
    date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists(tokenPath):
        creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentialsPath, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(tokenPath, 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
print(urls)

これで本文とURLの分離取得ができた

次にこのメソッドで得られたURLからPDFをダウンロードするようにしたい

その前にモジュールかしておく

vim gmail_utils.py

中身を

# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# メール本文のデコード関数
def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

# URLを検出してリストに格納する関数
def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

# 最新の未読メール本文とURLを取得する関数
def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

# メールの受信日時を取得する関数
def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

# Gmail API 初期化関数
def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

として

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body

# Gmail API サービスを初期化
service = gmail_init()

# ラベルIDを指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "INBOX")
print(latest_unread_message_body)
for url in urls:
    print(url)

のINBOXをラベルIDに変える

これで実行すると未読メールが表示される

次にこれを使いPDFのダウンロード

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def download_pdf_from_gmail_links():
    service = gmail_init()  # Gmail API サービスを初期化
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    _, urls = gmail_get_latest_unread_message_body(service, "INBOX")

    if not urls:
        print("メールからURLを抽出できませんでした。")
        return

    for url in urls:
        pdf_url = find_preview_link(url)
        if pdf_url:
            print("プレビューリンク:", pdf_url)
            try:
                response = requests.get(pdf_url)
                if response.status_code == 200:
                    filename = "downloaded_file.pdf"
                    with open(filename, "wb") as f:
                        f.write(response.content)
                    print(f"ファイルが正常にダウンロードされました: {filename}")
                else:
                    print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")
            except Exception as e:
                print(f"ダウンロード中にエラーが発生しました: {e}")
        else:
            print("プレビューリンクが見つかりませんでした。")

download_pdf_from_gmail_links()

コードを大幅に変更

# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service
# gmail_utils.py
import os.path
import re
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import email
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def extract_urls(text):
    url_pattern = r'https?://\S+|www\.\S+'
    urls = re.findall(url_pattern, text)
    text_no_urls = re.sub(url_pattern, '', text)
    return urls, text_no_urls

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found.", []

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    date = gmail_get_messages_body_date(msg)

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    urls, body_no_urls = extract_urls(body)
    return date + "<br>" + body_no_urls, urls

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d_%H-%M-%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

としたらバグった

とりあえず

gmail_get_text_and_url.py

でのURLと本文の取得

get_pdf.py

でPDFの取得はできた

あとはこの2つを組み合わせるだけ

vim gmail_api_utils.py

でgmailのテキストとURLを取り出す機能をモジュールにして他で呼び出せるようにする

ファイルの内容は

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
print(urls)

より上の部分のみ抽出して保存

main.py

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
for url in urls:
    print(url)

というように

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

でインポートして

service = gmail_init()

で初期化

from gmail_api_utils import gmail_init, gmail_get_latest_unread_message_body

service = gmail_init()

# ラベル ID を指定して最新の未読メール本文とURLを取得
latest_unread_message_body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
print(latest_unread_message_body)
for url in urls:
    print(url)

で呼び出せば同じ効果が出る

同様にPDFダウンロードもモジュールにする

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests

def find_preview_link(url, link_text="プレビュー"):
    driver = webdriver.Safari()  # またはChrome(), Firefox() など、使用するブラウザに合わせて変更
    try:
        driver.get(url)
        driver.implicitly_wait(10)  # 秒
        preview_link = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.LINK_TEXT, link_text))
        )
        return preview_link.get_attribute('href')
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        return None
    finally:
        driver.quit()

def download_pdf(url, file_path="downloaded_file.pdf"):
    if url:
        print("プレビューリンク:", url)
        response = requests.get(url)
        if response.status_code == 200:
            with open(file_path, "wb") as f:
                f.write(response.content)
            print("ファイルが正常にダウンロードされました。")
        else:
            print("ファイルのダウンロードに失敗しました。ステータスコード:", response.status_code)
    else:
        print("プレビューリンクが見つかりませんでした。")

これを

from pdf_downloader import find_preview_link, download_pdf

# 使用例
url = "https://message.cocoo.education/message/1ff451cda4cd705697e71734637ec2b5b5d4152d447203420f52da4a35cd8223"
pdf_url = find_preview_link(url)
download_pdf(pdf_url)

として保存し実行すればOK

あとは組み合わせる

しかし
プレビューリンクが
https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイル番号
というAmazonS3へのリンク
のはずがgmailから抽出したリンクになっているのでエラー

原因は

            # ここでは、プレビューリンクを直接取得する機能がないため、URLがPDFファイルを指していると仮定しています
            download_pdf(url, file_path="downloaded_file.pdf")

の部分
なのでURLが違う

これを修正する
なおGPTで出力した時にはINBOXになっているため
指定のラベルに変更することが必要になる

main3.py

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from pdf_downloader import find_preview_link, download_pdf

def save_email_body_to_text(body, filename="email_body.txt"):
    with open(filename, "w", encoding="utf-8") as file:
        file.write(body)

def main():
    # Gmail API サービスを初期化
    service = gmail_init()
    
    # ラベル ID を指定して最新の未読メール本文とURLを取得
    body, urls = gmail_get_latest_unread_message_body(service, "ラベルのID")
    if urls:
        for url in urls:
            print(f"プレビューリンクを検索するURL: {url}")
            # プレビューリンクを取得
            preview_url = find_preview_link(url)
            if preview_url:
                print(f"プレビューリンク: {preview_url}")
                # プレビューリンクからPDFファイルのダウンロードを試みる
                download_pdf(preview_url, file_path="downloaded_file.pdf")
            else:
                print("プレビューリンクが見つかりませんでした。")
    else:
        print("メールにURLが見つかりませんでした。")

    # メール本文をテキストファイルに保存
    save_email_body_to_text(body)

if __name__ == "__main__":
    main()

とすることで
メール本文はテキストファイルへ
PDFはダウンロードすることが完了した

指定したURLのプレビューボタンをクリックしPDFをダウンロードするpythonコードの作成(失敗編)

指定したURLのプレビューボタンをクリックしPDFをダウンロードするpythonコードの作成

amazonS3からファイルの取得はできたけど
動的サイトからのリンク取得はこの方法ではできなかった

以下ログ

指定したURLにアクセスし
プレビューと書かれたボタンをクリック、表示されるPDFファイルをダウンロードするpythonコード

chatGPTで作成していく

pip install requests beautifulsoup4

で必要ライブラリのインストール

import requests
from bs4 import BeautifulSoup

# ステップ2: 指定されたURLにGETリクエストを送信し、HTMLを取得
url = "https://任意のURL"
response = requests.get(url)
html = response.content

# ステップ3: BeautifulSoupを使用してHTMLを解析し、プレビューボタンのリンクを見つける
soup = BeautifulSoup(html, 'html.parser')
preview_button = soup.find('a', text='プレビュー')  # ここではaタグでテキストが'プレビュー'の要素を探していますが、実際の構造に応じて変更する必要があります。

# ステップ4: プレビューボタンのリンクを取得し、PDFファイルをダウンロード
if preview_button:
    pdf_url = preview_button['href']  # href属性からPDFのURLを取得
    pdf_response = requests.get(pdf_url)
    with open('document.pdf', 'wb') as f:
        f.write(pdf_response.content)
    print("PDFファイルがダウンロードされました。")
else:
    print("プレビューボタンが見つかりませんでした。")

で実行したが

/Users/snowpool/aw10s/mail_auto/get_pdf.py:11: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  preview_button = soup.find('a', text='プレビュー')  # ここではaタグでテキストが'プレビュー'の要素を探していますが、実際の構造に応じて変更する必要があります。
プレビューボタンが見つかりませんでした。

となる

ChatGPTでこの結果を調べると

警告メッセージに基づき、text引数の代わりにstring引数を使用する必要があります。また、プレビューボタンが見つからなかった問題に関しては、ページのHTML構造を確認せずに正確な解決策を提供するのは難しいですが、一般的なアドバイスをいくつか提供できます。
* 		string引数の使用: findメソッドでtextの代わりにstringを使用してみてください。
* 		正しい要素を特定する: プレビューボタンを特定するために、ページのHTMLソースを確認し、ボタンの正確なタグ名、クラス名、またはIDを特定します。この情報を元に、findまたはfind_allメソッドを適切に使用してください。
* 		動的コンテンツの考慮: もしプレビューボタンがJavaScriptによって動的に生成されている場合、requestsとBeautifulSoupだけでは取得できないかもしれません。この場合は、seleniumのようなブラウザ自動化ツールを使用して実際のブラウザセッションを介してページを読み込み、要素を探す必要があります。

となる

以下は、string引数を使用した修正されたコードの例

import requests
from bs4 import BeautifulSoup

# ステップ2: 指定されたURLにGETリクエストを送信し、HTMLを取得
url = "https://任意のURL"
response = requests.get(url)
html = response.content

# ステップ3: BeautifulSoupを使用してHTMLを解析し、プレビューボタンのリンクを見つける
soup = BeautifulSoup(html, 'html.parser')
preview_button = soup.find('a', string='プレビュー')  # `text`の代わりに`string`を使用

# ステップ4: プレビューボタンのリンクを取得し、PDFファイルをダウンロード
if preview_button:
    pdf_url = preview_button['href']  # href属性からPDFのURLを取得
    pdf_response = requests.get(pdf_url)
    with open('document.pdf', 'wb') as f:
        f.write(pdf_response.content)
    print("PDFファイルがダウンロードされました。")
else:
    print("プレビューボタンが見つかりませんでした。")

でもだめ

seleniumで取得してみる

pip install selenium


seleniumをインストール

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
import time

# Chromeのダウンロード設定
chrome_options = Options()
prefs = {
    "download.default_directory": "/path/to/download/directory",  # ダウンロードフォルダーのパスを設定
    "download.prompt_for_download": False,  # ダウンロード前に確認を求めない
    "download.directory_upgrade": True,
    "plugins.always_open_pdf_externally": True  # PDFファイルを自動的にダウンロード
}
chrome_options.add_experimental_option("prefs", prefs)

# Chrome WebDriverのパス
webdriver_path = '/path/to/chromedriver'  # ChromeDriverのパス

# WebDriverサービスを設定
service = Service(webdriver_path)

# WebDriverの初期化
driver = webdriver.Chrome(service=service, options=chrome_options)

# Webページにアクセス
driver.get("https://任意のURL")

# プレビューボタンを探してクリック(要素のXPathを使用)
# 実際のボタンのXPathに応じて調整してください
try:
    preview_button = driver.find_element(By.XPATH, '//button[contains(text(),"プレビュー")]')
    preview_button.click()
    time.sleep(5)  # ダウンロード完了を待つ
except Exception as e:
    print("エラーが発生しました:", e)

# ブラウザを閉じる
driver.quit()

でもだめ

ならば chrome developer tools で一度サイトの構成を調べる

<a href="https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイルの保存先" target="_blank">プレビュー</a>

となっていたが
これの構成で seleniumでダウンロードしようとしてもエラー

なので

import requests

# PDFファイルのURL
pdf_url = "https://s3.ap-northeast-1.amazonaws.com/storage.cocoo.education/ファイルの保存先"

# リクエストを送信してPDFファイルを取得
response = requests.get(pdf_url)

# レスポンスのステータスコードが200(成功)の場合、ファイルを保存
if response.status_code == 200:
    with open("downloaded_file.pdf", "wb") as file:
        file.write(response.content)
    print("PDFファイルが正常にダウンロードされました。")
else:
    print(f"ファイルのダウンロードに失敗しました。ステータスコード: {response.status_code}")

として直接リンクのURLからファイルを取得

これは成功

なので、
GMAILからリンクURLを抜き出しサイトへアクセスする機能
次にそのページからリンクボタンのURLを抜き出す機能
最後にPDFをダウンロードする機能
と分けて作成する

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# 'プレビュー'リンクを探す
preview_link = soup.find('a', text='プレビュー')
if preview_link:
    href_value = preview_link['href']
    print("プレビューのリンク:", href_value)
else:
    print("プレビューリンクが見つかりませんでした。")

しかし

/Users/snowpool/aw10s/mail_auto/get_pdf_url.py:15: DeprecationWarning: The 'text' argument to find()-type methods is deprecated. Use 'string' instead.
  preview_link = soup.find('a', text='プレビュー')
プレビューリンクが見つかりませんでした。

となる

このため
BeautifulSoupの最新バージョンでは、findメソッド(および関連メソッド)でtext引数の代わりにstring引数を使用するよう推奨
とのことのため

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# 'プレビュー'リンクを探す('string'引数を使用)
preview_link = soup.find('a', string='プレビュー')
if preview_link:
    href_value = preview_link['href']
    print("プレビューのリンク:", href_value)
else:
    # 'プレビュー'を含むテキストを持つリンクを探す場合の代替手段
    preview_links = soup.find_all('a')
    for link in preview_links:
        if 'プレビュー' in link.text:
            print("プレビューのリンク:", link['href'])
            break
    else:
        print("プレビューリンクが見つかりませんでした。")

へ変更したが

プレビューリンクが見つかりませんでした。
となる

webページの構造が予想と異なるか、リンクが動的に生成されている可能性があり
のようなケースでは、ブラウザの自動操作が可能なSeleniumのようなツールを使用する必要がある

念の為Javascriptで書かれているかチェック

import requests
from bs4 import BeautifulSoup

# 初期ページのURL
initial_url = "https://任意のURL"

# 初期ページからHTMLを取得
response = requests.get(initial_url)
html = response.content

# HTMLを解析
soup = BeautifulSoup(html, 'html.parser')

# ページ内のすべてのaタグを探索
links = soup.find_all('a')
found = False
for link in links:
    if link.text and 'プレビュー' in link.text:
        print("プレビューのリンク:", link['href'])
        found = True
        break

if not found:
    print("プレビューリンクが見つかりませんでした。")

の結果

プレビューリンクが見つかりませんでした

となったため
Seleniumを使うことにする

seleniumを使うにはwebdriverが必要