M1MacbookAir にfaster-whisper を入れる

M1MacbookAir にfaster-whisper を入れる

https://microai.jp/blog/6cf7f278-e06a-42e1-953b-67eedd9c1ff8

m1mac16Gでの実行方法が掲載されてる

Apple Silicon用に最適化されたML用のライブラリMLXで最速Whisperを試す
も古いけど参考になると思う

M1 Macで音声認識AI Whisper
にもある

やはりGPUは必要らしい

実行時間
M1 Mac (MEM: 16G) で実行した結果(cudaなし=int8)
モデルの読み込み: 4.79秒
文字起こし: 93.72秒
GPUを使用していないとやはり遅いらしい

MLX の Whisperを検討したが
https://qiita.com/syukan3/items/5cdf2735d81d438929a9
によれば16GBのメモリでは厳しいらしい

ということで環境も同じような

文字起こしのライブラリ faster-whisper が超簡単で超早くて超正確!
を参考に行う

mkdir whisper
cd whisper

で作業ディレクトリ移動

pip install faster-whisper

音声ファイル操作のサードパーティ製ライブラリ

pip install pydub
pip install pandas

openpyxl(xcelファイル(.xlsx形式)を読み書きするため

pip install openpyxl

も追加でインストールしておく

brew install ffmpeg

でffmpeg も入れておく

しかし

arch -arm64 brew install ffmpeg

としないと

ffmpeg 7.0.1 is already installed but outdated (so it will be upgraded). Error: Cannot install under Rosetta 2 in ARM default prefix (/opt/homebrew)! To rerun under ARM use: arch -arm64 brew install ... To install under x86_64, install Homebrew into /usr/local. 

となる

これは
ARMアーキテクチャ用のHomebrewを使う必要があるため

実行後

Removing: /Users/snowpool/Library/Caches/Homebrew/python-packaging_bottle_manifest--24.0... (1.8KB)
Pruned 0 symbolic links and 2 directories from /opt/homebrew
==> Caveats
==> rust
zsh completions have been installed to:
  /opt/homebrew/share/zsh/site-functions

となるが

このメッセージは、Homebrewがキャッシュをクリアし、
シンボリックリンクやディレクトリの整理を行ったことを示しています。
また、Rustのzsh用の補完機能が
/opt/homebrew/share/zsh/site-functions にインストールされたことを知らせています。

特にエラーは表示されていないため、
ffmpeg のインストールやアップグレードが正常に進行したと考えられます。
次のステップとして、ffmpeg が正しくインストールされているかを確認するために、
以下のコマンドを実行してバージョン情報を確認してみてください

ということで

ffmpeg -version

を実行

ffmpeg version 7.0.2 Copyright (c) 2000-2024 the FFmpeg developers
built with Apple clang version 15.0.0 (clang-1500.3.9.4)
configuration: --prefix=/opt/homebrew/Cellar/ffmpeg/7.0.2 --enable-shared --enable-pthreads --enable-version3 --cc=clang --host-cflags= --host-ldflags='-Wl,-ld_classic' --enable-ffplay --enable-gnutls --enable-gpl --enable-libaom --enable-libaribb24 --enable-libbluray --enable-libdav1d --enable-libharfbuzz --enable-libjxl --enable-libmp3lame --enable-libopus --enable-librav1e --enable-librist --enable-librubberband --enable-libsnappy --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtesseract --enable-libtheora --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lzma --enable-libfontconfig --enable-libfreetype --enable-frei0r --enable-libass --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenjpeg --enable-libspeex --enable-libsoxr --enable-libzmq --enable-libzimg --disable-libjack --disable-indev=jack --enable-videotoolbox --enable-audiotoolbox --enable-neon
libavutil      59.  8.100 / 59.  8.100
libavcodec     61.  3.100 / 61.  3.100
libavformat    61.  1.100 / 61.  1.100
libavdevice    61.  1.100 / 61.  1.100
libavfilter    10.  1.100 / 10.  1.100
libswscale      8.  1.100 /  8.  1.100
libswresample   5.  1.100 /  5.  1.100
libpostproc    58.  1.100 / 58.  1.100

となるので問題ない

pip install torch

これもインストール

from faster_whisper import WhisperModel

# large-v3モデルのロード
model = WhisperModel("large-v3", device="cuda", compute_type="float16")

# 音声ファイルをテキストに変換
segments, info = model.transcribe("path_to_your_audio_file.wav")

# 結果を表示
for segment in segments:
    print(f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}")

とすることが多いけど
M1mac なので cuda は使えない

なお
faster-whisper ライブラリを使用する際、
指定したモデル(例えば large-v3)は、初回使用時に自動的にダウンロードされます。
モデルはローカルに保存されるため、次回以降は再ダウンロードする必要はありません
とのこと

touch test.py

でファイル作成

from faster_whisper import WhisperModel

# large-v3モデルのロード(CPUを使用)
model = WhisperModel("large-v3", device="cpu", compute_type="int8")

# 音声ファイルをテキストに変換
segments, info = model.transcribe("path_to_your_audio_file.wav")

# 結果を表示
for segment in segments:
    print(f"[{segment.start:.2f}s -> {segment.end:.2f}s] {segment.text}")

これを実行

python test.py 

モデルダウンロード時間は10分くらい

Downloading config.json: 100%|█████████████| 2.39k/2.39k [00:00<00:00, 8.90MB/s]
Downloading (…)rocessor_config.json: 100%|█████| 340/340 [00:00<00:00, 1.15MB/s]
Downloading tokenizer.json: 100%|██████████| 2.48M/2.48M [00:01<00:00, 2.08MB/s]
Downloading vocabulary.json: 100%|██████████| 1.07M/1.07M [00:01<00:00, 726kB/s]
Downloading model.bin: 100%|███████████████| 3.09G/3.09G [05:35<00:00, 9.21MB/s]
Traceback (most recent call last):█████████| 3.09G/3.09G [05:35<00:00, 9.24MB/s]
  File "/Users/snowpool/aw10s/whisper/test.py", line 7, in <module>
    segments, info = model.transcribe("path_to_your_audio_file.wav")
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/faster_whisper/transcribe.py", line 319, in transcribe
    audio = decode_audio(audio, sampling_rate=sampling_rate)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/faster_whisper/audio.py", line 46, in decode_audio
    with av.open(input_file, mode="r", metadata_errors="ignore") as container:
  File "av/container/core.pyx", line 420, in av.container.core.open
  File "av/container/core.pyx", line 266, in av.container.core.Container.__cinit__
  File "av/container/core.pyx", line 286, in av.container.core.Container.err_check
  File "av/error.pyx", line 326, in av.error.err_check
av.error.FileNotFoundError: [Errno 2] No such file or directory: 'path_to_your_audio_file.wav'

となった

原因は単純でフィルを指定していないため

https://microai.jp/blog/6cf7f278-e06a-42e1-953b-67eedd9c1ff8
を参考に
https://www.youtube.com/shorts/tLxGgAVvLwU
のYouTube 動画をダウンロードしてみる

yt-dlp -x --audio-format mp3 --output "output.mp4" "https://www.youtube.com/shorts/tLxGgAVvLwU" 

を実行すると

[youtube] Extracting URL: https://www.youtube.com/shorts/tLxGgAVvLwU [youtube] tLxGgAVvLwU: Downloading webpage [youtube] tLxGgAVvLwU: Downloading ios player API JSON [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] HTTP Error 400: Bad Request. Retrying (1/3)... [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] HTTP Error 400: Bad Request. Retrying (2/3)... [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] HTTP Error 400: Bad Request. Retrying (3/3)... [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] Unable to download API page: HTTP Error 400: Bad Request (caused by <HTTPError 400: Bad Request>); please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , filling out the appropriate issue template. Confirm you are on the latest version using yt-dlp -U [youtube] tLxGgAVvLwU: Downloading player 53afa3ce WARNING: [youtube] tLxGgAVvLwU: nsig extraction failed: You may experience throttling for some formats n = nvo-1xpBtSvwgEdI4 ; player = https://www.youtube.com/s/player/53afa3ce/player_ias.vflset/en_US/base.js WARNING: [youtube] tLxGgAVvLwU: nsig extraction failed: You may experience throttling for some formats n = TE4MbILAq1Ti1ox53 ; player = https://www.youtube.com/s/player/53afa3ce/player_ias.vflset/en_US/base.js [youtube] tLxGgAVvLwU: Downloading m3u8 information [info] tLxGgAVvLwU: Downloading 1 format(s): 140 [download] Destination: output.mp4 [download] 100% of 786.76KiB in 00:00:00 at 3.80MiB/s [FixupM4a] Correcting container of "output.mp4" [ExtractAudio] Destination: output.mp4.mp3 

となって output.mp4.mp3 が作成される

これは、yt-dlp が元のMP4ファイルから音声を抽出してMP3形式に変換した結果

音声のみのMP4ファイルを作成するには

yt-dlp -f mp4 --output "output.mp4" "https://www.youtube.com/shorts/tLxGgAVvLwU"

とするらしいが

 [youtube] Extracting URL: https://www.youtube.com/shorts/tLxGgAVvLwU [youtube] tLxGgAVvLwU: Downloading webpage [youtube] tLxGgAVvLwU: Downloading ios player API JSON [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] HTTP Error 400: Bad Request. Retrying (1/3)... [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] HTTP Error 400: Bad Request. Retrying (2/3)... [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] HTTP Error 400: Bad Request. Retrying (3/3)... [youtube] tLxGgAVvLwU: Downloading android player API JSON WARNING: [youtube] YouTube said: ERROR - Precondition check failed. WARNING: [youtube] Unable to download API page: HTTP Error 400: Bad Request (caused by <HTTPError 400: Bad Request>); please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , filling out the appropriate issue template. Confirm you are on the latest version using yt-dlp -U [youtube] tLxGgAVvLwU: Downloading player 53afa3ce WARNING: [youtube] tLxGgAVvLwU: nsig extraction failed: You may experience throttling for some formats n = xIaDKrIXPH5wjB2nw ; player = https://www.youtube.com/s/player/53afa3ce/player_ias.vflset/en_US/base.js WARNING: [youtube] tLxGgAVvLwU: nsig extraction failed: You may experience throttling for some formats n = yF3boOCKXfHqyR5qo ; player = https://www.youtube.com/s/player/53afa3ce/player_ias.vflset/en_US/base.js [youtube] tLxGgAVvLwU: Downloading m3u8 information [info] tLxGgAVvLwU: Downloading 1 format(s): 18 ERROR: unable to download video data: HTTP Error 403: Forbidden 

となる

このエラーは、YouTubeがアクセスをブロックしていることが原因で発生しています。
特に、yt-dlp が特定のAPIやプレイヤーの情報にアクセスできなくなっている場合に起こります

なお
–force-generic-extractor オプションを使う
–referer オプションを使う
ではだめだった

とりあえずテストで音声の文字起こしができればいいので
これはとりあえず保留

次に

import torch
from faster_whisper import WhisperModel

target_file = "sample.mp4"
model_size = "large-v3"
compute_type_for_gpu = "float16" # or int8_float16
compute_type_for_cup = "int8"
beam_size = 5

# CUDA が利用可能か確認
if torch.cuda.is_available():
    model = WhisperModel(model_size, device="cuda", compute_type=compute_type_for_gpu)
else:
    # CPU で INT8 を使用
    model = WhisperModel(model_size, device="cpu", compute_type=compute_type_for_cup)


segments, info = model.transcribe(target_file, beam_size=beam_size)

print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

for segment in segments:
    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

を実行しようとしたが

M1macなのでソース変更の可能性をGPTで調べた

import torch
from faster_whisper import WhisperModel

target_file = "sample.mp4"
model_size = "large-v3"
compute_type_for_gpu = "float16" # or int8_float16
compute_type_for_cpu = "int8"
beam_size = 5

# MPSが利用可能か確認 (AppleシリコンのGPU)
if torch.backends.mps.is_available():
    model = WhisperModel(model_size, device="mps", compute_type=compute_type_for_gpu)
else:
    # CPUで INT8 を使用
    model = WhisperModel(model_size, device="cpu", compute_type=compute_type_for_cpu)

segments, info = model.transcribe(target_file, beam_size=beam_size)

print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

for segment in segments:
    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

というように

AppleシリコンのGPUを活用するために
mps(Metal Performance Shaders)デバイスを使うように変更することができます。

ということで
mpsデバイスを使うにあたり設定やインストールが必要なものは?

. PyTorchのインストール
mpsバックエンドはPyTorch 1.12.0以降でサポートされています。
まず、最新のPyTorchをインストールする必要があります

pip install torch torchvision torchaudio

次に使えるか確認したいので

vim check.py

import torch

print(torch.backends.mps.is_available())

として保存

これを

python check.py 

として

True

なのでOK

xcode-select --install

を念の為実行

xcode-select: note: Command line tools are already installed. Use "Software Update" in System Settings or the softwareupdate command line interface to install updates

既にインストール済みだった

mv output.mp4.mp3 output.mp3

でファイルをリネーム

次に

touch sample.py

import torch
from faster_whisper import WhisperModel

target_file = "output.mp3"
model_size = "large-v3"
compute_type_for_gpu = "float16" # or int8_float16
compute_type_for_cpu = "int8"
beam_size = 5

# MPSが利用可能か確認 (AppleシリコンのGPU)
if torch.backends.mps.is_available():
    model = WhisperModel(model_size, device="mps", compute_type=compute_type_for_gpu)
else:
    # CPUで INT8 を使用
    model = WhisperModel(model_size, device="cpu", compute_type=compute_type_for_cpu)

segments, info = model.transcribe(target_file, beam_size=beam_size)

print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

for segment in segments:
    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

として保存

これを実行したら

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/whisper/sample.py", line 12, in <module>
    model = WhisperModel(model_size, device="mps", compute_type=compute_type_for_gpu)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/faster_whisper/transcribe.py", line 145, in __init__
    self.model = ctranslate2.models.Whisper(
ValueError: unsupported device mps

となった

結局MPSはサポートされていないので
CPUで動かすことに

import torch
from faster_whisper import WhisperModel

target_file = "output.mp3"
model_size = "large-v3"
compute_type_for_gpu = "float16" # or int8_float16
compute_type_for_cup = "int8"
beam_size = 5

# CUDA が利用可能か確認
if torch.cuda.is_available():
    model = WhisperModel(model_size, device="cuda", compute_type=compute_type_for_gpu)
else:
    # CPU で INT8 を使用
    model = WhisperModel(model_size, device="cpu", compute_type=compute_type_for_cup)


segments, info = model.transcribe(target_file, beam_size=beam_size)

print("Detected language '%s' with probability %f" % (info.language, info.language_probability))

for segment in segments:
    print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))

として再度実行する

 python sample.py > output.txt

を実行すると

[0.00s -> 1.78s] 食べないダイエットでリバウンドを繰り返し
[1.78s -> 3.78s] 正しく食べることでやっと痩せられた私の
[3.78s -> 5.48s] 一日の食事を解説していきます
[5.48s -> 7.44s] 朝はバナナとオールブランで糖質
[7.44s -> 8.40s] くるみで脂質
[8.40s -> 9.52s] 無脂肪ヨーグルトと
[9.52s -> 12.00s] 低糖質低脂質のミルクで脂肪質とってます
[12.00s -> 13.94s] 詳しい作り方はYouTubeで紹介してます
[13.94s -> 16.02s] 昼はサラダ、胸からお米です
[16.02s -> 17.92s] 胸からは揚げ物なんだけど
[17.92s -> 19.72s] 脂質が低くておすすめです
[19.72s -> 21.94s] 運動量も身長も平均な方であれば
[21.94s -> 23.72s] 4歩から5歩ぐらいがおすすめです
[23.72s -> 24.68s] 夜はサラダと
[24.68s -> 27.28s] あやのさんがヘルシーダイエットクラブで教えてくれた
[27.28s -> 28.84s] バターチキンカレーです
[28.84s -> 31.40s] カレーもノーオイルで自分で手作りしたりとか
[31.40s -> 33.86s] こういう脂質が低いカレールーを使ったりとか
[33.86s -> 37.04s] 市販のレトルトカレーでも脂質が低いものがあるので
[37.04s -> 39.22s] ダイエット中でもカレーを楽しめます
[39.22s -> 41.08s] 市販のものの裏面を見るコツは
[41.08s -> 42.64s] YouTubeで詳しく解説してます
[42.64s -> 45.68s] 自分の食事の適正量や過不足がわからないよっていう方は
[45.68s -> 46.56s] 数日でいいので
[46.56s -> 48.16s] アスケンなどで可視化してみると
[48.16s -> 49.52s] 気づきがあるかもしれません

というように文字起こしができた

次はマイクから文字起こしできるようにする

OCRのモジュール化とCSVファイルの作成

OCRのモジュール化

画像ファイルを引数にして使うのでモジュール化する

from google.cloud import vision

client = vision.ImageAnnotatorClient()

with open("test.jpg", "rb") as fb:
    content = fb.read()

image = vision.Image(content=content)

response = client.document_text_detection(image=image)
texts = response.text_annotations
print(texts[0].description)

をモジュールにする

touch ocr_module.py

コードは

# ocr_module.py

from google.cloud import vision

class OCRProcessor:
    def __init__(self):
        # Vision APIクライアントの初期化
        self.client = vision.ImageAnnotatorClient()

    def extract_text_from_image(self, image_path):
        """
        画像からテキストを抽出するメソッド

        Args:
            image_path (str): 画像ファイルのパス

        Returns:
            str: 画像から抽出されたテキスト
        """
        with open(image_path, "rb") as fb:
            content = fb.read()

        image = vision.Image(content=content)
        response = self.client.document_text_detection(image=image)
        texts = response.text_annotations

        if texts:
            return texts[0].description
        else:
            return ""

使う時には

# main.py

from ocr_module import OCRProcessor

def main():
    # OCRProcessorのインスタンスを作成
    ocr_processor = OCRProcessor()

    # 画像ファイルパスを指定
    image_path = "test.jpg"

    # 画像からテキストを抽出
    extracted_text = ocr_processor.extract_text_from_image(image_path)

    # 抽出されたテキストを出力
    print("抽出されたテキスト:")
    print(extracted_text)

if __name__ == "__main__":
    main()

というようにする

さらに

import re
import csv
from datetime import datetime

# ファイルからテキストを読み込む
with open('ocr.txt', 'r', encoding='utf-8') as file:
    text = file.read()

# 情報を抽出
date_match = re.search(r'\d{4}年\d{1,2}月\d{1,2}日', text)
date = datetime.strptime(date_match.group(), '%Y年%m月%d日').strftime('%Y-%m-%d')

shop_name_match = re.search(r'とれたて食楽部', text)
shop_name = shop_name_match.group()

# 商品情報を正規表現で抽出
items = re.findall(r'内\d+ (.+?) ¥(\d+)', text)

# 現在のタイムスタンプを生成
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')

# 出力ファイル名にタイムスタンプを付ける
csv_filename = f'receipt_data_{timestamp}.csv'

# CSVファイルに出力
with open(csv_filename, mode='w', newline='', encoding='utf-8') as csvfile:
    csv_writer = csv.writer(csvfile)
    # ヘッダー行を書き込み
    csv_writer.writerow(["日付", "店名", "商品名", "数量", "金額"])
    for item in items:
        product_name, price = item
        # 商品名の「/」以降を削除
        product_name_clean = product_name.split('/')[0]
        # 出力: 日付, 店名, 商品名, 数量, 金額
        row = f"{date},{shop_name},{product_name_clean},1,{price}"
        csv_writer.writerow(row.split(','))

print(f"CSVファイル '{csv_filename}' に出力しました。")

のCSV変換もモジュール化する

 touch ocr_to_csv.py
# ocr_to_csv.py

import re
import csv
from datetime import datetime

class OCRToCSV:
    def __init__(self, input_file):
        self.input_file = input_file
        self.text = self._read_text_from_file()
        self.date = self._extract_date()
        self.shop_name = self._extract_shop_name()
        self.items = self._extract_items()

    def _read_text_from_file(self):
        with open(self.input_file, 'r', encoding='utf-8') as file:
            return file.read()

    def _extract_date(self):
        date_match = re.search(r'\d{4}年\d{1,2}月\d{1,2}日', self.text)
        if date_match:
            return datetime.strptime(date_match.group(), '%Y年%m月%d日').strftime('%Y-%m-%d')
        return None

    def _extract_shop_name(self):
        shop_name_match = re.search(r'とれたて食楽部', self.text)
        return shop_name_match.group() if shop_name_match else None

    def _extract_items(self):
        return re.findall(r'内\d+ (.+?) ¥(\d+)', self.text)

    def generate_csv(self, output_dir='.', timestamp=None):
        if timestamp is None:
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        
        csv_filename = f'{output_dir}/receipt_data_{timestamp}.csv'
        
        with open(csv_filename, mode='w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(["日付", "店名", "商品名", "数量", "金額"])
            for item in self.items:
                product_name, price = item
                product_name_clean = product_name.split('/')[0]
                row = f"{self.date},{self.shop_name},{product_name_clean},1,{price}"
                csv_writer.writerow(row.split(','))

        print(f"CSVファイル '{csv_filename}' に出力しました。")
        return csv_filename

この2つを使い
OCRした結果を受け取りCSVファイルにするようにする

touch main_ocr.py
# main.py

from ocr_module import OCRProcessor
from ocr_to_csv import OCRToCSV

def main():
    # OCRProcessorのインスタンスを作成
    ocr_processor = OCRProcessor()

    # 画像ファイルパスを指定
    image_path = 'test.jpg'

    # 画像からテキストを抽出
    extracted_text = ocr_processor.extract_text_from_image(image_path)

    # OCRToCSVのインスタンスを作成し、抽出されたテキストを処理
    ocr_to_csv = OCRToCSV(extracted_text)
    ocr_to_csv.generate_csv()

if __name__ == "__main__":
    main()

しかし、これで実行すると

I0000 00:00:1723753428.164426 15335390 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/receit_ocr/main_ocr.py", line 21, in <module>
    main()
  File "/Users/snowpool/aw10s/receit_ocr/main_ocr.py", line 17, in main
    ocr_to_csv = OCRToCSV(extracted_text)
  File "/Users/snowpool/aw10s/receit_ocr/ocr_to_csv.py", line 10, in __init__
    self.text = self._read_text_from_file()
  File "/Users/snowpool/aw10s/receit_ocr/ocr_to_csv.py", line 16, in _read_text_from_file
    with open(self.input_file, 'r', encoding='utf-8') as file:
FileNotFoundError: [Errno 2] No such file or directory: 'それで食楽部\n登録番号 15080401017738\nとれたて食楽部\n2024年8月10日 (土) 08:59 #000011\n000801精算機 1 000801精算機1\n3901\nお会計券 #000003 R9309 09:08\n000008 西澤\n内8 きゅうり/鈴木仁 ¥150\nP2023300101503\n内8 きゅうり/小林宗作 ¥130\nP2055600101303\n内8 リーフレタス/(有)成神工 ¥216\nP2086402402169\n小計\n¥496\n(内税8%対象額\n¥496)\n買上点数\n3点\n合計\n¥496\n(税率8%対象額\n¥496)\n(内消費税等 8%\n¥36)\n課税事業者\n(税率 8%対象額\n¥216)\n(内消費税等 8%\n¥16)\n免税事業者\n(税率 8%対象額\n¥280)\nクレジット\n¥496\n(内消費税等\n¥36)\n8、内容は軽減税率対象商品です。\n約專業者商品'

となる

原因はモジュールがファイルを渡す前提になっているため
渡されたテキストを処理するようにする

OCRToCSVクラスの__init__メソッドで、self.textとして直接テキストを受け取るように変更しました。
_read_text_from_fileメソッドは削除しました。代わりに、コンストラクタで直接テキストを受け取ります。

import re
import csv
from datetime import datetime

class OCRToCSV:
    def __init__(self, text):
        self.text = text
        self.date = self._extract_date()
        self.shop_name = self._extract_shop_name()
        self.items = self._extract_items()

    def _extract_date(self):
        date_match = re.search(r'\d{4}年\d{1,2}月\d{1,2}日', self.text)
        if date_match:
            return datetime.strptime(date_match.group(), '%Y年%m月%d日').strftime('%Y-%m-%d')
        return None

    def _extract_shop_name(self):
        shop_name_match = re.search(r'とれたて食楽部', self.text)
        return shop_name_match.group() if shop_name_match else None

    def _extract_items(self):
        return re.findall(r'内\d+ (.+?) ¥(\d+)', self.text)

    def generate_csv(self, output_dir='.', timestamp=None):
        if timestamp is None:
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        
        csv_filename = f'{output_dir}/receipt_data_{timestamp}.csv'
        
        with open(csv_filename, mode='w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(["日付", "店名", "商品名", "数量", "金額"])
            for item in self.items:
                product_name, price = item
                product_name_clean = product_name.split('/')[0]
                row = f"{self.date},{self.shop_name},{product_name_clean},1,{price}"
                csv_writer.writerow(row.split(','))

        print(f"CSVファイル '{csv_filename}' に出力しました。")
        return csv_filename

これで再度実行すると問題なくCSVファイルが作成される

レシートをCSVファイルに出力

レシートをCSVファイルに出力する

レシート画像を
test.jpg
にリネーム

from google.cloud import vision

client = vision.ImageAnnotatorClient()

with open("test.jpg", "rb") as fb:
    content = fb.read()

image = vision.Image(content=content)

response = client.document_text_detection(image=image)
texts = response.text_annotations
print(texts[0].description)

python vision_api_test.py

として実行

結果

それで食楽部
登録番号 15080401017738
とれたて食楽部

2024年8月10日 (土) 08:59 #000011
000801精算機 1 000801精算機1
3901
お会計券 #000003 R9309 09:08
000008 西澤
内8 きゅうり/鈴木仁 ¥150
P2023300101503
内8 きゅうり/小林宗作 ¥130
P2055600101303
内8 リーフレタス/(有)成神工 ¥216
P2086402402169
小計
¥496
(内税8%対象額
¥496)
買上点数
3点
合計
¥496
(税率8%対象額
¥496)
(内消費税等 8%
¥36)
課税事業者
(税率 8%対象額
¥216)
(内消費税等 8%
¥16)
免税事業者
(税率 8%対象額
¥280)
クレジット
¥496
(内消費税等
¥36)
8、内容は軽減税率対象商品です。
約專業者商品

となる

これをCSVに出力できるようにする
この時に
日付、購入した店名、商品名、数量、商品ごとの金額を抽出し
他の情報を削除する

import re
import csv
from datetime import datetime

# 入力されたテキスト
text = """
それで食楽部
登録番号 15080401017738
とれたて食楽部

2024年8月10日 (土) 08:59 #000011
000801精算機 1 000801精算機1
3901
お会計券 #000003 R9309 09:08
000008 西澤
内8 きゅうり/鈴木仁 ¥150
P2023300101503
内8 きゅうり/小林宗作 ¥130
P2055600101303
内8 リーフレタス/(有)成神工 ¥216
P2086402402169
小計
¥496
(内税8%対象額
¥496)
買上点数
3点
合計
¥496
(税率8%対象額
¥496)
(内消費税等 8%
¥36)
課税事業者
(税率 8%対象額
¥216)
(内消費税等 8%
¥16)
免税事業者
(税率 8%対象額
¥280)
クレジット
¥496
(内消費税等
¥36)
8、内容は軽減税率対象商品です。
約專業者商品
"""

# 情報を抽出
date_match = re.search(r'\d{4}年\d{1,2}月\d{1,2}日', text)
date = datetime.strptime(date_match.group(), '%Y年%m月%d日').strftime('%Y-%m-%d')

shop_name_match = re.search(r'とれたて食楽部', text)
shop_name = shop_name_match.group()

# 商品情報を正規表現で抽出
items = re.findall(r'内\d+ (.+?) ¥(\d+)', text)

# CSV形式で出力
output = []
for item in items:
    product_name, price = item
    row = f"{date},{shop_name},{product_name},1,{price}"
    output.append(row)

# CSVファイルに出力
csv_filename = 'receipt_data.csv'
with open(csv_filename, mode='w', newline='', encoding='utf-8') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(["日付", "店名", "商品名", "数量", "金額"])
    for row in output:
        csv_writer.writerow(row.split(','))

print(f"CSVファイル '{csv_filename}' に出力しました。")

以下解説

1. 日付の抽出: 正規表現を使用して、日付情報を抽出しています。その後、datetimeを使ってフォーマットをYYYY-MM-DDに変換しています。
2. 店名の抽出: テキスト内の特定の店名を正規表現で探して抽出しています。
3. 商品の抽出: 商品名と価格を正規表現で抽出しています。数量はレシートに明示されていないため、ここでは全て1と仮定しています。
4. CSV形式の出力: csvモジュールを使用して、指定された形式でCSVファイルに出力しています。CSVファイルには、ヘッダー行が含まれています。
このスクリプトを実行すると、指定された形式のCSVファイルが生成されます。生成されたCSVファイルは、receipt_data.csvとして保存されます。このファイルには、各商品の情報がカンマ区切りで含まれています。

しかしこれだと
商品名の後に /生産者もしくは出品者が残ってしまう

実行結果のファイルは

日付,店名,商品名,数量,金額
2024-08-10,とれたて食楽部,きゅうり/鈴木仁,1,150
2024-08-10,とれたて食楽部,きゅうり/小林宗作,1,130
2024-08-10,とれたて食楽部,リーフレタス/(有)成神工,1,216

となっている

このため
商品名の / 以降を削除
またファイルの上書きを防ぐためタイムスタンプをファイル名に追加

import re
import csv
from datetime import datetime

# ファイルからテキストを読み込む
with open('ocr.txt', 'r', encoding='utf-8') as file:
    text = file.read()

# 情報を抽出
date_match = re.search(r'\d{4}年\d{1,2}月\d{1,2}日', text)
date = datetime.strptime(date_match.group(), '%Y年%m月%d日').strftime('%Y-%m-%d')

shop_name_match = re.search(r'とれたて食楽部', text)
shop_name = shop_name_match.group()

# 商品情報を正規表現で抽出
items = re.findall(r'内\d+ (.+?) ¥(\d+)', text)

# 現在のタイムスタンプを生成
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')

# 出力ファイル名にタイムスタンプを付ける
csv_filename = f'receipt_data_{timestamp}.csv'

# CSVファイルに出力
with open(csv_filename, mode='w', newline='', encoding='utf-8') as csvfile:
    csv_writer = csv.writer(csvfile)
    # ヘッダー行を書き込み
    csv_writer.writerow(["日付", "店名", "商品名", "数量", "金額"])
    for item in items:
        product_name, price = item
        # 商品名の「/」以降を削除
        product_name_clean = product_name.split('/')[0]
        # 出力: 日付, 店名, 商品名, 数量, 金額
        row = f"{date},{shop_name},{product_name_clean},1,{price}"
        csv_writer.writerow(row.split(','))

print(f"CSVファイル '{csv_filename}' に出力しました。")

とコード変更

これで

python ocr_to_csv.py

とすれば

receipt_data_20240814054441.csv

が作成され

日付,店名,商品名,数量,金額
2024-08-10,とれたて食楽部,きゅうり,1,150
2024-08-10,とれたて食楽部,きゅうり,1,130
2024-08-10,とれたて食楽部,リーフレタス,1,216

というように商品名のみになる

チラシ解析のソースの公開

チラシ解析のソースの公開

store_adversting_lists
としてリポジトリを作成し公開する

リポジトリは
https://github.com/Snowpooll/store_adversting_lists

descriptionは

OCR the content of the flyer
If your purchase matches
Send product name and flyer image with line notify

チラシの内容をOCRし
購入すとにマッチした場合に
line notify で商品名とチラシ画像を送信する

Public にして
Add a README file
にチェック

License はMITにして
リポジトリを作成

git clone git@github.com:Snowpooll/store_adversting_lists.git

でリポジトリコピー

あとはファイルをコピーしていく

cp ../store_adversting_list/gmail_url_extractor.py .

gmailで件名を指定し未読の最新1件取得しURL抽出

cp ../store_adversting_list/image_downloader.py .

画像ファイルダウンロード

cp ../store_adversting_list/line_notify.py .

Line notify送信関連

cp ../store_adversting_list/settings.json . 

キーワードリスト

cp ../store_adversting_list/utils.py . 

あとでyolov8を使う時に使うモジュール

cp ../store_adversting_list/config.json . 

Line notify と xpathの設定ファイル

cp ../store_adversting_list/image_ocr_notifier.py . 

Cloud vision api のOCRと メイン処理

vim .gitignore

で除外するファイルを設定

token.json
config.json
credentials.json
/images/

もしコミットしているのなら

git rm -r --cached images

で追跡解除可能

次に
requirements.txt
の作成

google-auth
google-auth-oauthlib
google-auth-httplib2
google-api-python-client
pillow
selenium
google-cloud-vision
requests

として保存

あとは公開するので

git add .
git commit -m "add code"

git push -u origin main

で公開

とりあえずREADMEは後で書く

機能の統合

機能の統合

メールからURLを抽出する

gmail_url_extractor.py
でURLを取得し
image_downloader.py
で画像を取得

ocr_list.py
で vision api でOCR

この中で
line_notify.py
を使うことで
OCRした結果をワードリストに一致したものと画像をLINEで送信

とりあえずここまで作成したらgithubで公開

残る機能は
Yolov8のモデルを作成し画像認識させること
キーワードリストを効率的に作成するため
レシートをOCRしたものから
店名
価格
商品名
日付
を取り出しCSVファイルに保存すること

CSVファイルを元にDBを作成し
在庫管理と連携するようにして無駄な買い物を減らすこと

とりあえずまずは
ocr_list.py

gmail_url_extractor.py
でURLを取得し
image_downloader.py
で画像を取得

この画像に対してOCRすればOK

ただし
そのままソースを変えるとテストできなくなるので
別のファイルを作成することにする

なお今日は
おいしい牛乳
が割引らしいのでリストに加える

{
  "keywords": [  
    "麻婆豆腐",
    "キッチンタオル",
    "ほんだし",
    "ぶなしめじ",
    "レタス",
    "キャベツ",
    "おいしい牛乳"

  ]
}

そしてOCRするファイルを作成する

touch image_ocr_notifier.py

中身は

# example_usage.py

from gmail_url_extractor import get_first_unread_email_url
from image_downloader import download_and_merge_images
from google.cloud import vision
import io
import json

def load_settings(file_path='settings.json'):
    with open(file_path, 'r', encoding='utf_8') as settings_json:
        return json.load(settings_json)

def detect_text(image_path):
    """OCRで画像からテキストを抽出"""
    client = vision.ImageAnnotatorClient()
    with io.open(image_path, 'rb') as image_file:
        content = image_file.read()

    image = vision.Image(content=content)
    response = client.document_text_detection(image=image)
    full_text_annotation = response.full_text_annotation

    if response.error.message:
        raise Exception(
            '{}\nFor more info on error messages, check: '
            'https://cloud.google.com/apis/design/errors'.format(
                response.error.message))

    return full_text_annotation.text

def search_words(text, keywords):
    """抽出したテキストからキーワードを検索"""
    hitwords = []
    for keyword in keywords:
        if keyword in text:
            hitwords.append(keyword)
    return hitwords

def main():
    # 設定を読み込む
    settings = load_settings()
    
    # GmailからURLを取得
    url = get_first_unread_email_url('【Shufoo!】お気に入り店舗新着チラシお知らせメール')  # '特売情報'はメールの件名に含まれるキーワード
    
    if url:
        print(f"Processing URL: {url}")
        # 画像をダウンロードしてOCRを実行
        output_path = download_and_merge_images('config.json', url)
        
        if output_path:
            extracted_text = detect_text(output_path)
            hitwords = search_words(extracted_text, settings["keywords"])
            
            if hitwords:
                message = "特売リスト: " + ", ".join(hitwords)
                send_line_notify(message, output_path)
            else:
                print("マッチしたキーワードはありませんでした。")
    else:
        print("未読メールが見つかりませんでした。")

if __name__ == '__main__':
    main()

実行したけど

Processing URL: https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc

となるだけ

おそらくxpathが変更になっているので確認

/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]
/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]

同じだが動作していない

image_downloader.pyの動作確認

ログの追加: image_downloader.pyの中にログを追加して、どこで処理が失敗しているかを特定
print(f'Checking image source: {src}')  # ログ追加

となるように

def get_images_from_container(driver, base_xpath):
    """指定されたXPathから画像URLを取得する"""
    image_urls = []
    try:
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            print(f'Checking image source: {src}')  # ログ追加
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

というように
変更

また
CR処理のデバッグ: detect_text関数において、Cloud Vision APIのレスポンスが正常であるか確認

image_ocr_notifier.py
の中に

# OCRの結果をデバッグ表示 
full_text_annotation = response.full_text_annotation print("Extracted Text:", full_text_annotation.text)

を追加するので

def detect_text(image_path):
    """OCRで画像からテキストを抽出"""
    client = vision.ImageAnnotatorClient()
    with io.open(image_path, 'rb') as image_file:
        content = image_file.read()

    image = vision.Image(content=content)
    response = client.document_text_detection(image=image)
    if response.error.message:
        raise Exception(
            '{}\nFor more info on error messages, check: '
            'https://cloud.google.com/apis/design/errors'.format(
                response.error.message))
    
    # OCRの結果をデバッグ表示
    full_text_annotation = response.full_text_annotation
    print("Extracted Text:", full_text_annotation.text)

    return full_text_annotation.text

しかし動作しない

/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]/div[2]/div[1]

ではなく

/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]/div[2]

<div id="cv_1" class="ChirashiView" style="position: absolute; left: 0px; top: -30px; z-index: 1; opacity: 1; cursor: url(&quot;https://www.shufoo.net/site/chirashi_viewer_js/js/../images/openhand_8_8.cur&quot;), default; transition: opacity 200ms ease-in-out;"><div class="ChirashiView_tempDiv" style="position: absolute; overflow: hidden; width: 750px; height: 603px; left: 0px; top: 0px; z-index: 100;"></div><div class="ChirashiContainer" style="position: absolute; left: 0px; top: 0px; width: 750px; height: 603px; z-index: 0; opacity: 1;"><div class="inDiv" style="position: absolute; left: 0px; top: 0px; z-index: 1;"><div id="-2_-2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -1004px; top: -977.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 512px;"></div><div id="-1_-2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -492px; top: -977.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 512px;"></div><div id="0_-2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 20px; top: -977.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 512px;"></div><div id="1_-2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 532px; top: -977.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 198px; height: 512px;"></div><div id="2_-2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 1044px; top: -977.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; height: 512px;"></div><div id="-2_-1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -1004px; top: -465.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 512px;"></div><div id="-1_-1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -492px; top: -465.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 512px;"></div><div id="0_-1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 20px; top: -465.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 512px;"></div><div id="1_-1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 532px; top: -465.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 198px; height: 512px;"></div><div id="2_-1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 1044px; top: -465.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; height: 512px;"></div><div id="-2_0" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -1004px; top: 46.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 510px;"></div><div id="-1_0" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -492px; top: 46.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 510px;"></div><div id="0_0" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 20px; top: 46.5px; width: 512px; height: 512px;"><img draggable="false" src="https://ipqcache2.shufoo.net/c/2024/08/08/25295137072090/index/img/0_100_0.jpg" style="border: 0px; padding: 0px; margin: 0px; width: 512px; height: 510px;"></div><div id="1_0" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 532px; top: 46.5px; width: 512px; height: 512px;"><img draggable="false" src="https://ipqcache2.shufoo.net/c/2024/08/08/25295137072090/index/img/0_100_1.jpg" style="border: 0px; padding: 0px; margin: 0px; width: 198px; height: 510px;"></div><div id="2_0" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 1044px; top: 46.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; height: 510px;"></div><div id="-2_1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -1004px; top: 558.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px;"></div><div id="-1_1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -492px; top: 558.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px;"></div><div id="0_1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 20px; top: 558.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px;"></div><div id="1_1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 532px; top: 558.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 198px;"></div><div id="2_1" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 1044px; top: 558.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px;"></div><div id="-2_2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -1004px; top: 1070.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px;"></div><div id="-1_2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: -492px; top: 1070.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px;"></div><div id="0_2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 20px; top: 1070.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 512px;"></div><div id="1_2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 532px; top: 1070.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px; width: 198px;"></div><div id="2_2" style="position: absolute; opacity: 1; transition: opacity 200ms ease-out; left: 1044px; top: 1070.5px; width: 512px; height: 512px;"><img draggable="false" src="https://www.shufoo.net/site/chirashi_viewer_js/images/transparent.png" style="border: 0px; padding: 0px; margin: 0px;"></div></div><div class="linkDiv" style="position: absolute; left: 0px; top: 0px; z-index: 2;"></div></div></div>

のソース部分が正解らしい

config.jsonの中身を

  "base_xpath_images": "/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]/div[2]"

とxpathを変更することで
とりあえず画像の取得はできたが

 File "/Users/snowpool/aw10s/store_adversting_list/image_ocr_notifier.py", line 67, in <module> main() File "/Users/snowpool/aw10s/store_adversting_list/image_ocr_notifier.py", line 60, in main send_line_notify(message, output_path) NameError: name 'send_line_notify' is not defined

となる

    # # OCRの結果をデバッグ表示
    # full_text_annotation = response.full_text_annotation
    # print("Extracted Text:", full_text_annotation.text)

というようにコメントアウト

from line_notify import send_line_notify

が抜けていたので
これを追記

なぜかGPTでソースを生成すると
モジュールのインポートが削除されることがよくある

無事に処理ができたがログに

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR I0000 00:00:1723322571.066745 14856910 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache Sent images/combined_image_20240811_054251.jpg: 200 {"status":200,"message":"ok"} 

と出る

GPTによれば

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR

このメッセージは、abseil (Absl) ライブラリが初期化される前にログメッセージが標準エラー出力(STDERR)に書き込まれることを示しています。これは、Google CloudのPythonクライアントライブラリやgRPCなどのバックエンドでAbseilライブラリが使用されている場合によく見られます。この警告は通常、以下の点で心配する必要はありません:

初期化前ログの出力: Abseilが初期化される前に生成されるログメッセージが一時的にSTDERRに出力されるという技術的な情報を示しています。
影響なし: このメッセージは一般的にはアプリケーションの動作に影響を与えないため、無視しても問題ありません

I0000 00:00:1723322571.066745 14856910 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache

このメッセージは、gRPCライブラリの内部で実験的な機能が有効になっていることを示す情報ログです。これも通常、開発者が特に指定しない限り、デフォルトで有効になっている設定です。これらの実験的な機能は、以下のような効果を持ちます:
* 実験的機能の有効化: gRPCの内部機能が特定の実験的な設定で動作していることを示します。
* 通常の動作: 通常、gRPCの標準的な利用に影響はなく、情報提供のためのログです。

これらのログメッセージは、技術的な情報や警告を提供するためのものです。特にabsl::InitializeLog()やgRPCに関する警告は、ログの設定やライブラリの内部動作に関するもので、アプリケーションの正常動作には通常影響を与えません

とのこと

Gmailで指定の未読メールの取得

Gmailで指定の未読メールの取得

ラベルがshopで
お気に入り店舗新着チラシお知らせメール
の件名
の未読メールを取得する

本文のURLをクリックし
杏林堂なら
/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]
もしくは
/html/body/div[1]/div[3]/div[1]/div/div[2]
で画像をダウンロードし
複数画像の時には1つに統合する

とりあえずここまでできるようにする

まずは件名で未読のものを取得するようにした

クエリに is:unread を追加しました。この変更により、未読のメールのみが検索対象になります。

import os.path
import base64
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# 認証情報ファイルのパス
CREDENTIALS_FILE = 'path/to/credentials.json'
TOKEN_FILE = 'token.json'

# Gmail APIのスコープ
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def main():
    # トークンファイルが存在する場合は読み込む
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
    
    # 認証が有効でない場合は新しく認証を行う
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        # トークンを保存する
        with open(TOKEN_FILE, 'w') as token:
            token.write(creds.to_json())

    # Gmail APIクライアントを構築
    service = build('gmail', 'v1', credentials=creds)

    # 未読メールを検索
    query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"'
    results = service.users().messages().list(userId='me', q=query).execute()
    messages = results.get('messages', [])

    if not messages:
        print('No unread messages found.')
    else:
        print(f'Found {len(messages)} unread messages:')
        for msg in messages:
            msg_id = msg['id']
            msg = service.users().messages().get(userId='me', id=msg_id).execute()
            msg_snippet = msg['snippet']
            print(f'Message snippet: {msg_snippet}')

if __name__ == '__main__':
    main()

変更点は

query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"'

の部分

さらに最新の1件のみ取得するようにコード変更

変更点は
maxResults パラメータの追加:
service.users().messages().list メソッドに maxResults=1 を追加
これにより、検索結果として最新の1件のみが返される

これで実行すると

Found 1 unread message:
Message snippet: こちらのメールは「Shufoo!」でお気に入り登録した店舗の新着チラシ掲載開始をお知らせするメールです。 以下、1件の新着チラシが掲載開始されました。 ・ピアゴ袋井店https://www.shufoo.net/pntweb/shopDetail/15782/?cid=nmail_pc ※Shufoo!PCサイトまたは、シュフーチラシアプリ(スマートフォン・タブレット端末用) からログインしてお店の
s

となる

次に本文からURLのみ抽出する
ただし複数存在するため最初のURLのみ抽出する

本文からURLを抽出するには、メールの本文を取得し、正規表現を使ってURLを抽出
複数のURLが含まれている場合は、最初のURLのみを抽出

import os.path
import base64
import re
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# 認証情報ファイルのパス
CREDENTIALS_FILE = 'path/to/credentials.json'
TOKEN_FILE = 'token.json'

# Gmail APIのスコープ
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def extract_first_url(text):
    """テキストから最初のURLを抽出します。"""
    url_pattern = r'https?://[^\s]+'
    urls = re.findall(url_pattern, text)
    return urls[0] if urls else None

def main():
    # トークンファイルが存在する場合は読み込む
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
    
    # 認証が有効でない場合は新しく認証を行う
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        # トークンを保存する
        with open(TOKEN_FILE, 'w') as token:
            token.write(creds.to_json())

    # Gmail APIクライアントを構築
    service = build('gmail', 'v1', credentials=creds)

    # 最新の未読メール1件を検索
    query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"'
    results = service.users().messages().list(userId='me', q=query, maxResults=1).execute()
    messages = results.get('messages', [])

    if not messages:
        print('No unread messages found.')
    else:
        print(f'Found {len(messages)} unread message:')
        msg_id = messages[0]['id']
        msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
        msg_payload = msg['payload']
        msg_parts = msg_payload.get('parts', [])
        
        body_data = ""
        for part in msg_parts:
            if part['mimeType'] == 'text/plain':  # or 'text/html' depending on the format you need
                body_data = part['body']['data']
                break
        
        # Base64デコード
        body_text = base64.urlsafe_b64decode(body_data).decode('utf-8')

        # URLを抽出
        first_url = extract_first_url(body_text)
        if first_url:
            print(f'First URL found: {first_url}')
        else:
            print('No URL found in the message.')

if __name__ == '__main__':
    main()

ここで問題発生

Found 1 unread message:
No URL found in the message.

となる

ChatGPTで調べると

変更点
* extract_first_url関数: 正規表現を使ってテキストから最初のURLを抽出する関数を追加しました。
* メール本文の取得: メールのpayloadから本文のデータ部分を取得し、Base64でデコードしています。
* 本文の解析: デコードした本文から最初のURLを抽出しています。
注意点
* MIMEタイプの確認: メールの本文がtext/plainではなくtext/htmlの場合は、if part[‘mimeType’] == ‘text/plain’: を if part[‘mimeType’] == ‘text/html’: に変更する必要があります。
* 複数部品のメール: 一部のメールは複数のpartsに分かれている場合があります。このコードは最初のtext/plainパートからURLを抽出しますが、複雑なメール構造の場合は調整が必要です。

今回の場合

メール本文からURLを抽出するために、正確なデータ部分をデコードすることが必要です。
Gmail APIで取得するメールの本文は複数のパートに分かれていることがあり、
正しいパートからデコードする必要があります。
また、本文の形式がtext/plainかtext/htmlかを確認し、適切にデコードします

get_message_body 関数:
* メールの本文をすべてのパートから取得し、text/plainまたはtext/htmlの内容をデコードします。
* part[‘body’].get(‘data’) を使ってBase64エンコードされた本文データを取得し、デコードして連結します。
本文の抽出:
* get_message_body関数で取得した本文全体からURLを抽出します。
* 正規表現を使用して、最初に見つかったURLを返します。

メール本文のパートを正しく取得してURLを抽出するようにします。メールの本文パートを順番に確認して、デコード

import os.path
import base64
import re
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# 認証情報ファイルのパス
CREDENTIALS_FILE = 'path/to/credentials.json'
TOKEN_FILE = 'token.json'

# Gmail APIのスコープ
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def extract_first_url(text):
    """テキストから最初のURLを抽出します。"""
    url_pattern = r'https?://[^\s]+'
    urls = re.findall(url_pattern, text)
    return urls[0] if urls else None

def get_message_body(parts):
    """メールの本文を取得します。"""
    body = ""
    for part in parts:
        if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html':
            body_data = part['body'].get('data')
            if body_data:
                body += base64.urlsafe_b64decode(body_data).decode('utf-8')
    return body

def main():
    # トークンファイルが存在する場合は読み込む
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
    
    # 認証が有効でない場合は新しく認証を行う
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        # トークンを保存する
        with open(TOKEN_FILE, 'w') as token:
            token.write(creds.to_json())

    # Gmail APIクライアントを構築
    service = build('gmail', 'v1', credentials=creds)

    # 最新の未読メール1件を検索
    query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"'
    results = service.users().messages().list(userId='me', q=query, maxResults=1).execute()
    messages = results.get('messages', [])

    if not messages:
        print('No unread messages found.')
    else:
        print(f'Found {len(messages)} unread message:')
        msg_id = messages[0]['id']
        msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
        msg_payload = msg['payload']
        
        # メッセージ本文を取得
        msg_body = get_message_body(msg_payload.get('parts', []))

        # URLを抽出
        first_url = extract_first_url(msg_body)
        if first_url:
            print(f'First URL found: {first_url}')
        else:
            print('No URL found in the message.')

if __name__ == '__main__':
    main()

しかし結果は

Found 1 unread message:
No URL found in the message.

再度ChatGPTで調べる

メールの本文からURLが正しく抽出されない場合、
メールの構造が予想とは異なる可能性があります。

Gmail APIで取得したメールのペイロードを詳しく確認し、
メールの本文がどのように構造化されているかを理解する必要があります。

メールの内容は、partsとして分割されているか、
bodyに直接含まれている場合があります。

どちらの場合でも、メールの本文をデコードしてURLを抽出することが必要です。
以下は、メールペイロードを詳細に確認してURLを抽出するための修正コード

import os.path
import base64
import re
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# 認証情報ファイルのパス
CREDENTIALS_FILE = 'path/to/credentials.json'
TOKEN_FILE = 'token.json'

# Gmail APIのスコープ
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def extract_first_url(text):
    """テキストから最初のURLを抽出します。"""
    url_pattern = r'https?://[^\s]+'
    urls = re.findall(url_pattern, text)
    return urls[0] if urls else None

def get_message_body(payload):
    """メールの本文を取得します。"""
    body = ""
    
    if 'data' in payload['body']:
        # 直接bodyに含まれる場合
        body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8')
    elif 'parts' in payload:
        # partsに分割されている場合
        for part in payload['parts']:
            if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html':
                body_data = part['body'].get('data')
                if body_data:
                    body += base64.urlsafe_b64decode(body_data).decode('utf-8')
    
    return body

def main():
    # トークンファイルが存在する場合は読み込む
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
    
    # 認証が有効でない場合は新しく認証を行う
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        # トークンを保存する
        with open(TOKEN_FILE, 'w') as token:
            token.write(creds.to_json())

    # Gmail APIクライアントを構築
    service = build('gmail', 'v1', credentials=creds)

    # 最新の未読メール1件を検索
    query = 'is:unread subject:"【Shufoo!】お気に入り店舗新着チラシお知らせメール"'
    results = service.users().messages().list(userId='me', q=query, maxResults=1).execute()
    messages = results.get('messages', [])

    if not messages:
        print('No unread messages found.')
    else:
        print(f'Found {len(messages)} unread message:')
        msg_id = messages[0]['id']
        msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
        msg_payload = msg['payload']
        
        # メッセージ本文を取得
        msg_body = get_message_body(msg_payload)

        # URLを抽出
        first_url = extract_first_url(msg_body)
        if first_url:
            print(f'First URL found: {first_url}')
        else:
            print('No URL found in the message.')

if __name__ == '__main__':
    main()

修正点
1. get_message_body関数の改良:
* payload[‘body’]に直接データが含まれる場合も考慮しました。この場合、partsはありません。
* partsがある場合でも、本文がtext/plainかtext/htmlのいずれかである部分を探します。
2. URL抽出の再確認:
* extract_first_url関数で、取得した本文全体から最初のURLを抽出します。

これで

Found 1 unread message:
First URL found: https://www.shufoo.net/pntweb/shopDetail/15782/?cid=nmail_pc

というようにURLを抽出できた

これを
Seleniumで開き画像をダウンロードするモジュールの引数に当てるようにする

Xpathはcoopだけ違うようだが
実際には同じだった
ダウンロードファイルはタイムスタンプをつければかぶることはないし
そのままcloud vision api で処理してLINEで送信すれば問題ない

このためクリック処理は不要でそのまま画像をダウンロードするようにする

import os
import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    
    # 画像を取得してダウンロードする
    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]'

    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            # カレントディレクトリにimagesフォルダを作成
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合は作成
            output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

これで無事に画像のダウンロードができた

ただし頻繁にxpathが変わるようなので
設定ファイルを作成し
そこで設定したxpathを

    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]'

で指定しているxpath になるようにコード変更

設定ファイルは
config.json
だが
既にLINE APIの設定で使っているので
これに

"base_xpath_images": "/html/body/div[1]/div[3]/div[1]/div/div[2]/div[3]"

の項目を追加する

import os
import time
import json
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def load_config(config_file):
    """設定ファイルからコンフィグを読み込む"""
    with open(config_file, 'r') as file:
        config = json.load(file)
    return config

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    # 設定ファイルを読み込む
    config = load_config('config.json')
    
    url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    
    # 設定ファイルからXPathを取得して画像を取得
    base_xpath_images = config['base_xpath_images']
    
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            # カレントディレクトリにimagesフォルダを作成
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合は作成
            output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

またタイムスタンプをファイル名に追加すれば上書き防止になるので

タイムスタンプのフォーマット: datetime.now().strftime(‘%Y%m%d_%H%M%S’)で、
現在の日付と時刻をフォーマットし、年/月/日 時:分:秒の順にします。

これにより、ファイル名が一意になります。
ファイル名への追加: output_pathの生成時にタイムスタンプを含めることで、
同日に何度実行してもファイルが上書きされないようにしました。

フォルダの作成: 出力先のディレクトリが存在しない場合、自動的に作成されます。

これらを組み込み

import os
import time
import json
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def load_config(config_file):
    """設定ファイルからコンフィグを読み込む"""
    with open(config_file, 'r') as file:
        config = json.load(file)
    return config

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    # 設定ファイルを読み込む
    config = load_config('config.json')
    
    url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    
    # 設定ファイルからXPathを取得して画像を取得
    base_xpath_images = config['base_xpath_images']
    
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付と時刻を取得してフォーマット
            current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            # カレントディレクトリにimagesフォルダを作成
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合は作成
            output_path = os.path.join(output_dir, f'combined_image_{current_timestamp}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

これで実行すると

Found image: https://ipqcache2.shufoo.net/c/2024/08/09/96510937073938/index/img/0_100_0.jpg
Found image: https://ipqcache2.shufoo.net/c/2024/08/09/96510937073938/index/img/0_100_1.jpg
Downloaded image_0.jpg
Downloaded image_1.jpg
Saved combined image as images/combined_image_20240809_233141.jpg

となって無事にダウンロードが実行される

次にこれをモジュールにする

# image_downloader.py

import os
import json
import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def load_config(config_file):
    """設定ファイルからコンフィグを読み込む"""
    with open(config_file, 'r') as file:
        config = json.load(file)
    return config

def open_link_in_safari(url):
    """指定されたURLをSafariで開く"""
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def get_images_from_container(driver, base_xpath):
    """指定されたXPathから画像URLを取得する"""
    image_urls = []
    try:
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    """画像URLから画像をダウンロードする"""
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    """複数の画像を結合して保存する"""
    if not images:
        print("No images to merge.")
        return

    widths, heights = zip(*(img.size for img in images))
    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def download_and_merge_images(config_file, url):
    """画像をダウンロードして結合するメイン関数"""
    config = load_config(config_file)
    driver = open_link_in_safari(url)

    base_xpath_images = config['base_xpath_images']
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f'combined_image_{current_timestamp}.jpg')
            merge_images(images, output_path)

として
image_downloader.py
を作成

ただし、これだと最新のファイルを探すなどの処理が必要になるため
作成したファイル名を返り値として渡すようにすれば
そのファイルに対して
Cloud vision api を実行できるはず

# image_downloader.py

import os
import json
import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def load_config(config_file):
    """設定ファイルからコンフィグを読み込む"""
    with open(config_file, 'r') as file:
        config = json.load(file)
    return config

def open_link_in_safari(url):
    """指定されたURLをSafariで開く"""
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def get_images_from_container(driver, base_xpath):
    """指定されたXPathから画像URLを取得する"""
    image_urls = []
    try:
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    """画像URLから画像をダウンロードする"""
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    """複数の画像を結合して保存する"""
    if not images:
        print("No images to merge.")
        return

    widths, heights = zip(*(img.size for img in images))
    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def download_and_merge_images(config_file, url):
    """画像をダウンロードして結合するメイン関数"""
    config = load_config(config_file)
    driver = open_link_in_safari(url)

    base_xpath_images = config['base_xpath_images']
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            current_timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f'combined_image_{current_timestamp}.jpg')
            merge_images(images, output_path)
            return output_path  # 生成されたファイル名を返す
    return None  # 画像がなかった場合はNoneを返す

次にgmailの未読メールからURLを取得する部分もモジュール化する

touch gmail_url_extractor.py

でファイルを作成

import os.path
import base64
import re
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

# 認証情報ファイルのパス
CREDENTIALS_FILE = 'path/to/credentials.json'
TOKEN_FILE = 'token.json'

# Gmail APIのスコープ
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def authenticate_gmail():
    """Gmail APIに認証し、サービスを構築します。"""
    creds = None
    if os.path.exists(TOKEN_FILE):
        creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
    
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(TOKEN_FILE, 'w') as token:
            token.write(creds.to_json())

    service = build('gmail', 'v1', credentials=creds)
    return service

def extract_first_url(text):
    """テキストから最初のURLを抽出します。"""
    url_pattern = r'https?://[^\s]+'
    urls = re.findall(url_pattern, text)
    return urls[0] if urls else None

def get_message_body(payload):
    """メールの本文を取得します。"""
    body = ""
    
    if 'data' in payload['body']:
        body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8')
    elif 'parts' in payload:
        for part in payload['parts']:
            if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html':
                body_data = part['body'].get('data')
                if body_data:
                    body += base64.urlsafe_b64decode(body_data).decode('utf-8')
    
    return body

def get_first_unread_email_url(subject_query):
    """指定された件名を持つ未読メールから最初のURLを取得します。"""
    service = authenticate_gmail()

    query = f'is:unread subject:"{subject_query}"'
    results = service.users().messages().list(userId='me', q=query, maxResults=1).execute()
    messages = results.get('messages', [])

    if not messages:
        print('No unread messages found.')
        return None
    
    msg_id = messages[0]['id']
    msg = service.users().messages().get(userId='me', id=msg_id, format='full').execute()
    msg_payload = msg['payload']
    
    msg_body = get_message_body(msg_payload)
    first_url = extract_first_url(msg_body)
    
    return first_url

として保存

使う時には

from gmail_url_extractor import get_first_unread_email_url

def main():
    subject_query = "【Shufoo!】お気に入り店舗新着チラシお知らせメール"
    url = get_first_unread_email_url(subject_query)
    
    if url:
        print(f'First URL found: {url}')
    else:
        print('No URL found in the message.')

if __name__ == '__main__':
    main()

というように使う

ファイルのダウンロードと生成モジュールは

# example_usage.py

from image_downloader import download_and_merge_images

def main():
    config_file = 'config.json'
    url = 'https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc'
    output_path = download_and_merge_images(config_file, url)
    
    if output_path:
        print(f"Generated file: {output_path}")
        # Cloud Vision APIを実行するコードをここに追加
        # example: run_cloud_vision_api(output_path)
    else:
        print("No image file was created.")

if __name__ == '__main__':
    main()

として使う

とりあえずほとんど準備できたので
あとは
ocr_list.py

gmail_url_extractor.py
でURLを取得し
image_downloader.py
で画像を取得

この画像に対してOCRすればOKとなる

チラシ詳細の取得

チラシ詳細の取得

Shufooで直リンクすれば
Gmail処理は不要

https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc
杏林堂
キーワード
日替

https://www.shufoo.net/pntweb/shopDetail/197728/?cid=nmail_pc
ユーコープ
キーワード
ユーコープのお買い得!

https://www.shufoo.net/pntweb/shopDetail/15782/?cid=nmail_pc
ぴあご

というように
それぞれみたいチラシのURLとキーワードがペアになっている
ならばこれをJSONとかにして当てはめることができるはず

もしかしたらxpathも同じかもしれないので

cp clik_allget_image.py piago.py

でコピーして
ぴあごとコープで試す

どうやらxpathも同じ
ただし coopのチラシがOCRの精度が良くない

とりあえずぴあごのチラシのリンクはできたけど
画像のダウンロードができていない

とりあえず杏林堂は毎日テストできるので
先に杏林堂のチラシでLINE送信を試す

def wait_for_page_load(driver, timeout=30):
    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.XPATH, '//img'))  # ページに画像が表示されるまで待機
    )

を追加してみる

import os
import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def click_date_element(driver, base_xpath):
    try:
        # コンテナ内の日付要素を探してクリック
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        links = container.find_elements(By.XPATH, ".//a[contains(@title, '日替')]")

        for link in links:
            if '日替' in link.get_attribute('title'):
                link.click()
                print(f'Clicked on link with title: {link.get_attribute("title")}')
                time.sleep(3)  # クリックした後に3秒間待機
                return

        print('No link found with title containing: 日替')
    except Exception as e:
        print(f'Error clicking on element: {e}')

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    url = 'https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    # 特定のリンクをクリックする
    base_xpath_click = '/html/body/div[1]/div[3]/div[1]/div/div[4]/div/div/div/div/div/div/ul'
    click_date_element(driver, base_xpath_click)
    
    # 画像を取得してダウンロードする
    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]'
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            # カレントディレクトリにimagesフォルダを作成
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合は作成
            output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

import os
import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    wait_for_page_load(driver)  # ページの読み込みを待機
    return driver

def wait_for_page_load(driver, timeout=30):
    """
    ページに画像が表示されるまで待機する関数。
    """
    try:
        WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, '//img'))
        )
        print("Page loaded successfully.")
    except Exception as e:
        print(f"Error waiting for page to load: {e}")

def click_date_element(driver, base_xpath):
    try:
        # コンテナ内の日付要素を探してクリック
        container = WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        links = container.find_elements(By.XPATH, ".//a[contains(@title, '日替')]")

        for link in links:
            if '日替' in link.get_attribute('title'):
                link.click()
                print(f'Clicked on link with title: {link.get_attribute("title")}')
                wait_for_page_load(driver)  # クリック後のページ読み込みを待機
                return

        print('No link found with title containing: 日替')
    except Exception as e:
        print(f'Error clicking on element: {e}')

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    url = 'https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    # 特定のリンクをクリックする
    base_xpath_click = '/html/body/div[1]/div[3]/div[1]/div/div[4]/div/div/div/div/div/div/ul'
    click_date_element(driver, base_xpath_click)
    
    # 画像を取得してダウンロードする
    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]'
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            # カレントディレクトリにimagesフォルダを作成
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合は作成
            output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

としてみる

チラシの商品名をLINE送信

チラシの商品名をLINE送信

ocr_list.py
でキーワードと一致した商品名の取得ができるので
これを
line_notify.pyにセットして送信するようにする

import json
from google.cloud import vision
import io

# 設定ファイルの読み込み
settings_json = open('settings.json', 'r', encoding='utf_8')
settings = json.load(settings_json)

# OCRで画像からテキストを抽出
def detect_text(image_paths):
    client = vision.ImageAnnotatorClient()

    all_text = ''

    for image_path in image_paths:
        with io.open(image_path, 'rb') as image_file:
            content = image_file.read()

        image = vision.Image(content=content)

        # document_text_detectionを使用して文書全体のテキストを取得
        response = client.document_text_detection(image=image)
        full_text_annotation = response.full_text_annotation

        # テキストの抽出
        all_text += full_text_annotation.text

        if response.error.message:
            raise Exception(
                '{}\nFor more info on error messages, check: '
                'https://cloud.google.com/apis/design/errors'.format(
                    response.error.message))

    return all_text

# キーワード検索
def search_words(all_text):
    hitwords = []
    for keyword in settings["keywords"]:
        if keyword in all_text:
            hitwords.append(keyword)

    return hitwords

# 例として実行
if __name__ == "__main__":
    image_paths = ["images/combined_image_20240805.jpg"]
    extracted_text = detect_text(image_paths)
    hitwords = search_words(extracted_text)
    
    # ヒットしたキーワードのみを表示
    if hitwords:
        print("マッチしたキーワード:", ", ".join(hitwords))
    else:
        print("マッチしたキーワードはありませんでした。")

の中で

import requests
import os
from PIL import Image
from io import BytesIO
from utils import load_config, get_latest_directory, get_image_files

def resize_image_if_needed(image_data, max_size=3 * 1024 * 1024):
    if len(image_data) > max_size:
        image = Image.open(BytesIO(image_data))
        new_size = (image.width // 2, image.height // 2)
        image = image.resize(new_size, Image.LANCZOS)

        output = BytesIO()
        image_format = image.format if image.format else 'JPEG'
        image.save(output, format=image_format)
        return output.getvalue()
    return image_data

def send_line_notify(message, config_path='config.json'):
    # 設定ファイルを読み込む
    config = load_config(config_path)

    # 設定ファイルからトークンとディレクトリパスを取得
    token = config['token']
    base_path = config['image_file_path']

    # 最新のpredictディレクトリを取得
    latest_dir = get_latest_directory(base_path)
    image_files = get_image_files(latest_dir)

    url = 'https://notify-api.line.me/api/notify'

    headers = {'Authorization': f"Bearer {token}"}
    params = {'message': message}

    # 最新のpredictディレクトリ内の全ての画像ファイルに対してLINE Notify APIにリクエストを送信
    for image_file_path in image_files:
        with open(image_file_path, 'rb') as img_file:
            img_data = img_file.read()
            img_data = resize_image_if_needed(img_data)

            # ファイルデータをバイトデータとして用意
            files = {'imageFile': BytesIO(img_data)}
            files['imageFile'].name = os.path.basename(image_file_path)

            # LINE Notify APIにリクエストを送信
            res = requests.post(url, headers=headers, params=params, files=files)

            # レスポンスを出力
            print(f"File: {image_file_path}")
            print(res.status_code)
            print(res.text)

を呼び出し
hit words を line notifyで送信したい

その前に

import requests
import os
from PIL import Image
from io import BytesIO
from utils import load_config, get_latest_directory, get_image_files

def resize_image_if_needed(image_data, max_size=3 * 1024 * 1024):
    if len(image_data) > max_size:
        image = Image.open(BytesIO(image_data))
        new_size = (image.width // 2, image.height // 2)
        image = image.resize(new_size, Image.LANCZOS)

        output = BytesIO()
        image_format = image.format if image.format else 'JPEG'
        image.save(output, format=image_format)
        return output.getvalue()
    return image_data

def send_line_notify(message, config_path='config.json'):
    # 設定ファイルを読み込む
    config = load_config(config_path)

    # 設定ファイルからトークンとディレクトリパスを取得
    token = config['token']
    base_path = config['image_file_path']

    # 最新のpredictディレクトリを取得
    latest_dir = get_latest_directory(base_path)
    image_files = get_image_files(latest_dir)

    url = 'https://notify-api.line.me/api/notify'

    headers = {'Authorization': f"Bearer {token}"}
    params = {'message': message}

    # 最新のpredictディレクトリ内の全ての画像ファイルに対してLINE Notify APIにリクエストを送信
    for image_file_path in image_files:
        with open(image_file_path, 'rb') as img_file:
            img_data = img_file.read()
            img_data = resize_image_if_needed(img_data)

            # ファイルデータをバイトデータとして用意
            files = {'imageFile': BytesIO(img_data)}
            files['imageFile'].name = os.path.basename(image_file_path)

            # LINE Notify APIにリクエストを送信
            res = requests.post(url, headers=headers, params=params, files=files)

            # レスポンスを出力
            print(f"File: {image_file_path}")
            print(res.status_code)
            print(res.text)

で今回はyolov8は使ってないので
まずはテキストのみにする

import requests
import os
from utils import load_config

def send_line_notify(message, config_path='config.json'):
    # 設定ファイルを読み込む
    config = load_config(config_path)

    # 設定ファイルからトークンを取得
    token = config['token']

    url = 'https://notify-api.line.me/api/notify'

    headers = {'Authorization': f"Bearer {token}"}
    params = {'message': message}

    # LINE Notify APIにリクエストを送信
    res = requests.post(url, headers=headers, params=params)

    # レスポンスを出力
    print(res.status_code)
    print(res.text)

# 例として実行
if __name__ == "__main__":
    message = "マッチしたキーワード: サンプルキーワード"
    send_line_notify(message)

とりあえず main 部分は削除して
ocr_list.pyの中で呼び出したい

import json
from google.cloud import vision
import io
from line_notify import send_line_notify

# 設定ファイルの読み込み
def load_settings(file_path='settings.json'):
    with open(file_path, 'r', encoding='utf_8') as settings_json:
        return json.load(settings_json)

# OCRで画像からテキストを抽出
def detect_text(image_paths):
    client = vision.ImageAnnotatorClient()

    all_text = ''

    for image_path in image_paths:
        with io.open(image_path, 'rb') as image_file:
            content = image_file.read()

        image = vision.Image(content=content)

        # document_text_detectionを使用して文書全体のテキストを取得
        response = client.document_text_detection(image=image)
        full_text_annotation = response.full_text_annotation

        # テキストの抽出
        all_text += full_text_annotation.text

        if response.error.message:
            raise Exception(
                '{}\nFor more info on error messages, check: '
                'https://cloud.google.com/apis/design/errors'.format(
                    response.error.message))

    return all_text

# キーワード検索
def search_words(all_text, keywords):
    hitwords = []
    for keyword in keywords:
        if keyword in all_text:
            hitwords.append(keyword)

    return hitwords

# 例として実行
if __name__ == "__main__":
    settings = load_settings()
    image_paths = ["images/combined_image_20240805.jpg"]
    extracted_text = detect_text(image_paths)
    hitwords = search_words(extracted_text, settings["keywords"])
    
    # ヒットしたキーワードをLINE Notifyで送信
    if hitwords:
        message = "マッチしたキーワード: " + ", ".join(hitwords)
        send_line_notify(message)
    else:
        print("マッチしたキーワードはありませんでした。")

これを実行したらLINEで送信されたのでOK

このままだとメッセージが分かりにくいので
マッチしたキーワードから
特売リスト
にメッセージを変更

そしてOCRした画像ファイルも一緒にLINE送信するように
line_notify.pyのソースを変更

import requests
import os
from utils import load_config

def send_line_notify(message, config_path='config.json'):
    # 設定ファイルを読み込む
    config = load_config(config_path)

    # 設定ファイルからトークンを取得
    token = config['token']

    url = 'https://notify-api.line.me/api/notify'

    headers = {'Authorization': f"Bearer {token}"}
    params = {'message': message}

    # LINE Notify APIにリクエストを送信
    res = requests.post(url, headers=headers, params=params)

    # レスポンスを出力
    print(res.status_code)
    print(res.text)

import requests
from utils import load_config
from io import BytesIO
from PIL import Image

def resize_image_if_needed(image_data, max_size=3 * 1024 * 1024):
    """
    画像が指定されたサイズを超える場合は、画像のサイズを縮小する。

    Args:
        image_data (bytes): 画像データ。
        max_size (int): 最大ファイルサイズ(バイト)。

    Returns:
        bytes: サイズ変更後の画像データ。
    """
    if len(image_data) > max_size:
        image = Image.open(BytesIO(image_data))
        new_size = (image.width // 2, image.height // 2)
        image = image.resize(new_size, Image.LANCZOS)

        output = BytesIO()
        image_format = image.format if image.format else 'JPEG'
        image.save(output, format=image_format)
        return output.getvalue()
    return image_data

def send_line_notify(message, image_path=None, config_path='config.json'):
    """
    LINE Notifyを使用してメッセージとオプションで画像を送信する関数。

    Args:
        message (str): 送信するメッセージ。
        image_path (str): 送信する画像のパス。
        config_path (str): 設定ファイルのパス。

    Returns:
        None
    """
    # 設定ファイルを読み込む
    config = load_config(config_path)

    # 設定ファイルからトークンを取得
    token = config['token']

    url = 'https://notify-api.line.me/api/notify'

    headers = {'Authorization': f"Bearer {token}"}
    params = {'message': message}

    # 画像がある場合は読み込み
    files = None
    if image_path:
        with open(image_path, 'rb') as img_file:
            img_data = img_file.read()
            img_data = resize_image_if_needed(img_data)
            files = {'imageFile': BytesIO(img_data)}
            files['imageFile'].name = os.path.basename(image_path)

    # LINE Notify APIにリクエストを送信
    res = requests.post(url, headers=headers, params=params, files=files)

    # レスポンスを出力
    print(res.status_code)
    print(res.text)

しかし

WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
I0000 00:00:1723040085.190147 14567125 config.cc:230] gRPC experiments enabled: call_status_override_on_cancellation, event_engine_dns, event_engine_listener, http2_stats_fix, monitoring_experiment, pick_first_new, trace_record_callops, work_serializer_clears_time_cache
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/store_adversting_list/ocr_list.py", line 57, in <module>
    send_line_notify(message,image_path)
  File "/Users/snowpool/aw10s/store_adversting_list/line_notify.py", line 54, in send_line_notify
    with open(image_path, 'rb') as img_file:
TypeError: expected str, bytes or os.PathLike object, not list

となる

send_line_notify関数は単一の画像パスを期待しているため、リストから1つずつ画像パスを取り出して送信する必要があります。

ソースを

import requests
from utils import load_config
from io import BytesIO
from PIL import Image
import os

def resize_image_if_needed(image_data, max_size=3 * 1024 * 1024):
    """
    画像が指定されたサイズを超える場合は、画像のサイズを縮小する。

    Args:
        image_data (bytes): 画像データ。
        max_size (int): 最大ファイルサイズ(バイト)。

    Returns:
        bytes: サイズ変更後の画像データ。
    """
    if len(image_data) > max_size:
        image = Image.open(BytesIO(image_data))
        new_size = (image.width // 2, image.height // 2)
        image = image.resize(new_size, Image.LANCZOS)

        output = BytesIO()
        image_format = image.format if image.format else 'JPEG'
        image.save(output, format=image_format)
        return output.getvalue()
    return image_data

def send_line_notify(message, image_paths=None, config_path='config.json'):
    """
    LINE Notifyを使用してメッセージとオプションで画像を送信する関数。
    複数の画像パスをリストとして受け取ることができます。

    Args:
        message (str): 送信するメッセージ。
        image_paths (list): 送信する画像のパスのリスト。
        config_path (str): 設定ファイルのパス。

    Returns:
        None
    """
    # 設定ファイルを読み込む
    config = load_config(config_path)

    # 設定ファイルからトークンを取得
    token = config['token']

    url = 'https://notify-api.line.me/api/notify'

    headers = {'Authorization': f"Bearer {token}"}
    params = {'message': message}

    # 画像がリストとして渡されている場合に対応
    if image_paths is not None:
        if not isinstance(image_paths, list):
            image_paths = [image_paths]

        for image_path in image_paths:
            if image_path is not None:
                with open(image_path, 'rb') as img_file:
                    img_data = img_file.read()
                    img_data = resize_image_if_needed(img_data)
                    files = {'imageFile': BytesIO(img_data)}
                    files['imageFile'].name = os.path.basename(image_path)

                    # LINE Notify APIにリクエストを送信
                    res = requests.post(url, headers=headers, params=params, files=files)

                    # レスポンスを出力
                    print(f"Sent {image_path}: {res.status_code} {res.text}")
    else:
        # 画像がない場合はメッセージのみ送信
        res = requests.post(url, headers=headers, params=params)
        print(f"Sent message: {res.status_code} {res.text}")

とすることで複数の画像に対応

リストに対応: image_paths引数がリスト型で渡されても対応できるようにしました。リストでない場合は、単一の画像パスとしてリストに変換して扱います。
画像のループ処理: 画像が渡された場合はループでそれぞれの画像をLINE Notifyに送信します。
画像なしの場合: 画像が渡されなかった場合は、メッセージのみを送信します。

Jsonファイルを元に購入リストがあるか判定

Jsonファイルを元に購入リストがあるか判定

 vim ocr_list.py   

でファイルを作成

import json
settings_json = open('settings.json', 'r', encoding='utf_8')
settings = json.load(settings_json)

# 公式サイトからpdfリンク一覧取得
def get_urls():
  import requests
  from bs4 import BeautifulSoup

  params = { settings['url_params_name']: settings['url_params_value'] }
  load_url = settings['url']
  html = requests.get(load_url, params=params)
  soup = BeautifulSoup(html.text, 'html.parser')

  flyer_list = soup.find_all('table')
  url_list = []
  for flyer in flyer_list:
    # 日付
    date = flyer.find('div', {'class': 'sale'}).find('a').get_text(strip=True).replace(' ', '').replace('(', '(').replace(')', ')')
    
    # PDF(表)
    omote_url = flyer.find('a', {'title': 'PDFオモテ'})['href']
    omote = {}
    omote['date'] = date
    omote['url'] = settings['url_stem'] + omote_url.replace('../', '')
    url_list.append(omote)

    # PDF(裏)
    if flyer.find('a', {'title': 'PDFウラ'}):
      ura_url = flyer.find('a', {'title': 'PDFウラ'})['href'] 
      ura = {}
      ura['date'] = date
      ura['url'] = settings['url_stem'] + ura_url.replace('../', '')
      url_list.append(ura)

  return url_list

# 未解析のチラシURLを取得
def get_new_urls(url_list):
  # urls.txt読込
  old_urls = []
  with open('urls.txt', 'r') as f:
    old_urls = f.read().splitlines()

  new_urls = []
  urls_text = []
  count = 0
  for url_info in url_list:
    urls_text.append(url_info['url'] + '\n')

    if url_info['url'] not in old_urls:
      # 新規
      url_info['number'] = count
      new_urls.append(url_info)
      count += 1
  
  # urls.txt書込
  f = open('urls.txt', 'w')
  f.writelines(urls_text)
  f.close()

  return new_urls

# 未解析のpdfをDL
def dl_pdfs(new_url_list):
  import urllib.request
  import time

  pdf_list = []
  for url_info in new_url_list:
    # 表
    file_name = f'pdf/{url_info["number"]}.pdf'
    urllib.request.urlretrieve(url_info['url'], file_name)
    url_info['pdf_path'] = file_name

    time.sleep(2)

    pdf_list.append(url_info)

  return pdf_list

# PDFをJPGに変換
def pdf_to_jpeg(path):
  import os
  from pathlib import Path
  from pdf2image import convert_from_path

  # poppler/binを環境変数PATHに追加する
  poppler_dir = Path(__file__).parent.absolute() / 'lib/poppler/bin'
  os.environ['PATH'] += os.pathsep + str(poppler_dir)

  image_paths = []

  pdf_path = Path(path)
  # PDF -> Image に変換(150dpi)
  pages = convert_from_path(str(pdf_path), 150)

  # 画像ファイルを1ページずつ保存
  image_dir = Path('./jpg')
  for i, page in enumerate(pages):
    file_name = pdf_path.stem + '_{:02d}'.format(i + 1) + '.jpeg'
    image_path = image_dir / file_name
    # JPEGで保存
    page.save(str(image_path), 'JPEG')
    image_paths.append(image_path)

  return image_paths

# 複数チラシをJPGに変換
def pdfs_to_jpeg(pdf_list):
  jpg_list = []
  for pdf_info in pdf_list:
    jpg_info = pdf_info
    # 表
    omote_image_paths = pdf_to_jpeg(pdf_info['pdf_path'])
    jpg_info['image_paths'] = omote_image_paths

    jpg_list.append(jpg_info)

  return jpg_list

# OCR
def detect_text(image_paths):
  from google.cloud import vision
  import io
  client = vision.ImageAnnotatorClient()

  all_text = ''

  for image_path in image_paths:
    with io.open(image_path, 'rb') as image_file:
      content = image_file.read()

    image = vision.Image(content=content)
    
    # pylint: disable=no-member
    response = client.text_detection(image=image)
    texts = response.text_annotations

    for text in texts:
      all_text += str(text.description)

    if response.error.message:
      raise Exception(
        '{}\nFor more info on error messages, check: '
        'https://cloud.google.com/apis/design/errors'.format(
          response.error.message))

  return all_text

# キーワード検索
def search_words(all_text):
  hitwords = []
  for keyword in settings["keywords"]:
    if keyword in all_text:
      hitwords.append(keyword)

  return hitwords

# キーワードに引っかかったチラシ取得
def get_target_flyers(jpg_list):
  result = []
  for jpg_info in jpg_list:
    all_text = detect_text(jpg_info['image_paths'])
    hitwords = search_words(all_text)

    if len(hitwords) != 0:
      hit = jpg_info
      hit['hitwords'] = hitwords
      result.append(hit)

  return result

# Slack通知
def slack_notice(results):
  import slackweb
  slack = slackweb.Slack(url=settings['slack_webhook_url'])
  for result in results:
    text = f'{result["date"]} チラシ掲載商品:{",".join(result["hitwords"])}\n<{result["url"]}|チラシを見る>'
    slack.notify(text=text)

### FlyerOCR ###
import shutil
import os
os.makedirs('pdf/', exist_ok=True)
os.makedirs('jpg/', exist_ok=True)

url_list = get_urls()
new_url_list = get_new_urls(url_list)
pdf_list = dl_pdfs(new_url_list)
jpg_list = pdfs_to_jpeg(pdf_list)
results = get_target_flyers(jpg_list)
slack_notice(results)

shutil.rmtree('pdf/')
shutil.rmtree('jpg/')

のコードを書き換える

text_detection

画像内のテキスト要素を検出するのに適しており、一般的なOCRタスクに使用される

これを
document_text_detection
を使い
文書のスキャンや複雑なレイアウトを持つ画像に対して適しており、より詳細なテキスト情報を取得できるようにする

# OCR
def detect_text(image_paths):
    from google.cloud import vision
    import io
    client = vision.ImageAnnotatorClient()

    all_text = ''

    for image_path in image_paths:
        with io.open(image_path, 'rb') as image_file:
            content = image_file.read()

        image = vision.Image(content=content)

        # document_text_detectionを使用
        response = client.document_text_detection(image=image)
        # FullTextAnnotationを使用して文書全体のテキストを取得
        full_text_annotation = response.full_text_annotation

        # テキストの抽出
        all_text += full_text_annotation.text

        if response.error.message:
            raise Exception(
                '{}\nFor more info on error messages, check: '
                'https://cloud.google.com/apis/design/errors'.format(
                    response.error.message))

    return all_text

とりあえず実行できるか試すので

# 例として実行
if __name__ == "__main__":
    image_paths = ["images/combined_image_20240802.jpg"]
    extracted_text = detect_text(image_paths)
    print(extracted_text)

この結果はそのままだとターミナル表示なので
テキストファイルに保存する

全文の中で検出成功しているのは
麻婆豆腐
キッチンタオル

全文は

8/
2
金曜日
本日限定!
とろける
バラエティパック
1000
T
スライスカード
サイズ
創品
創業祭特に
$10
ポイント
焼そば
うどん
$10
ポイント
129 168 159
プレーンヨーグル
ビヒダス
400
イチビキ
EE97
128
上級
ヒラス
-10
139円
創業祭特価
CRUNKY
Ghana
ON 161
198
213
30
128 98 9869
136
105円
645
8/2はおやつの日
BLACKENT
ADREN
おやつフェア カメ
強力小麦を
POTAH
198
64
191
78
B
カメリヤス
強力小麦粉」
298
321
ロリエ
きれいガード
ソフィ
$20
198
PARA 2139
税込74円]
かに
麻婆豆腐
168
8x4 パウダースプレー
88
181
ジョイコン
2
100-498
100 100
260 root
1,180
OPIE
ボディフィット
ガード
8
50
270
378
171
CARE
UF.O.
3
UFO
パワフル吸収
キッチンタオル
くらしりえね
ミネピア
「キッチンタオル
$149
10% 163
18 U
3759
8
300
$20
248
10272
$100
8398
ロール
ダブルメロン
38 萬
14952371
698
798
580
530
2.836
58
458
10 2.066
オフェルミン
2080
=.098
767
7=1080]
10 305P
TONA 415

となっているが
画像と重なっている文字は読み取りが苦手みたい
単純に解像度の問題かもしれないが

とりあえず
麻婆豆腐
キッチンタオル
はできたので
これをjsonファイルに書き込んでリストと一致するか実験する

settings.jsonの中身を

{
  "keywords": [  
    "麻婆豆腐",
    "キッチンタオル",
    "keyword3"
  ]
}

にする

次にキーワードと一致したもののみ変数に格納する
これをlineで送るようにする

とりあえずコードを変更

import json
settings_json = open('settings.json', 'r', encoding='utf_8')
settings = json.load(settings_json)

# OCR
def detect_text(image_paths):
    from google.cloud import vision
    import io
    client = vision.ImageAnnotatorClient()

    all_text = ''

    for image_path in image_paths:
        with io.open(image_path, 'rb') as image_file:
            content = image_file.read()

        image = vision.Image(content=content)

        # document_text_detectionを使用
        response = client.document_text_detection(image=image)
        # FullTextAnnotationを使用して文書全体のテキストを取得
        full_text_annotation = response.full_text_annotation

        # テキストの抽出
        all_text += full_text_annotation.text

        if response.error.message:
            raise Exception(
                '{}\nFor more info on error messages, check: '
                'https://cloud.google.com/apis/design/errors'.format(
                    response.error.message))

    return all_text

# キーワード検索
def search_words(all_text):
  hitwords = []
  for keyword in settings["keywords"]:
    if keyword in all_text:
      hitwords.append(keyword)

  return hitwords

# キーワードに引っかかったチラシ取得
def get_target_flyers(jpg_list):
  result = []
  for jpg_info in jpg_list:
    all_text = detect_text(jpg_info['image_paths'])
    hitwords = search_words(all_text)

    if len(hitwords) != 0:
      hit = jpg_info
      hit['hitwords'] = hitwords
      result.append(hit)

  return result


# 例として実行
if __name__ == "__main__":
    image_paths = ["images/combined_image_20240802.jpg"]
    extracted_text = detect_text(image_paths)
    print(extracted_text)

import json
from google.cloud import vision
import io

# 設定ファイルの読み込み
settings_json = open('settings.json', 'r', encoding='utf_8')
settings = json.load(settings_json)

# OCRで画像からテキストを抽出
def detect_text(image_paths):
    client = vision.ImageAnnotatorClient()

    all_text = ''

    for image_path in image_paths:
        with io.open(image_path, 'rb') as image_file:
            content = image_file.read()

        image = vision.Image(content=content)

        # document_text_detectionを使用して文書全体のテキストを取得
        response = client.document_text_detection(image=image)
        full_text_annotation = response.full_text_annotation

        # テキストの抽出
        all_text += full_text_annotation.text

        if response.error.message:
            raise Exception(
                '{}\nFor more info on error messages, check: '
                'https://cloud.google.com/apis/design/errors'.format(
                    response.error.message))

    return all_text

# キーワード検索
def search_words(all_text):
    hitwords = []
    for keyword in settings["keywords"]:
        if keyword in all_text:
            hitwords.append(keyword)

    return hitwords

# 例として実行
if __name__ == "__main__":
    image_paths = ["images/combined_image_20240802.jpg"]
    extracted_text = detect_text(image_paths)
    hitwords = search_words(extracted_text)
    
    # ヒットしたキーワードのみを表示
    if hitwords:
        print("マッチしたキーワード:", ", ".join(hitwords))
    else:
        print("マッチしたキーワードはありませんでした。")

に変えてみる

これで実行すると
マッチしたキーワード: 麻婆豆腐, キッチンタオル
となる

あとはキーワードにマッチした画像も一緒にLINEで送信したいので
ファイルパスを取得するようにする

そもそもの流れを復習すると
Gmailで最新のチラシのリンクを開く

日替のチラシがあるなら画像をダウンロードし統合して1つのファイルにする
clik_allget_image.py

OCRしてリストに一致しているものを取り出す
LINEで送信
line_notify.py

となっている

ただshufoo限定で店舗ごとにユニークアドレスとなっているのなら
Gmailから開く処理は不要となる

Cloud vision APIで文字列として商品情報が取得できるか試す

Cloud vision APIで文字列として商品情報が取得できるか試す

vim purchase_list.py

中身を

from google.cloud import vision
from PIL import Image
import io
import os

def resize_image_if_needed(image_path, max_size_mb=40):
    """Resize the image to half its original dimensions if it exceeds max_size_mb."""
    with open(image_path, "rb") as fb:
        image = Image.open(fb)
        image_io = io.BytesIO()
        image.save(image_io, format=image.format)
        image_size_mb = image_io.tell() / (1024 * 1024)
        
        if image_size_mb > max_size_mb:
            new_size = (image.width // 2, image.height // 2)
            resized_image = image.resize(new_size, Image.ANTIALIAS)
            
            resized_io = io.BytesIO()
            resized_image.save(resized_io, format=image.format)
            return resized_io.getvalue()
        
        return image_io.getvalue()

client = vision.ImageAnnotatorClient()

# image_path = "combined_image.jpg"
image_path = "image_0.jpg"

resized_image = resize_image_if_needed(image_path)

image = vision.Image(content=resized_image)

response = client.document_text_detection(image=image)
texts = response.text_annotations

if texts:
    print(texts[0].description)
else:
    print("No text detected.")

で実行したが

    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Request payload size exceeds the limit: 41943040 bytes.

となるため容量を40M以下になるようにする

from google.cloud import vision
from PIL import Image
import io

def compress_image(image_path, max_size_mb=40):
    """Compress an image to be under the specified size in megabytes."""
    with open(image_path, "rb") as fb:
        image = Image.open(fb)
        image_format = image.format
        image_io = io.BytesIO()
        
        # Try different quality settings to get under the size limit
        for quality in range(95, 10, -5):
            image_io.seek(0)
            image.save(image_io, format=image_format, quality=quality)
            size_mb = image_io.tell() / (1024 * 1024)
            if size_mb <= max_size_mb:
                break
        
        return image_io.getvalue()

client = vision.ImageAnnotatorClient()

image_path = "preprocessed_image.jpg"
compressed_image = compress_image(image_path)

image = vision.Image(content=compressed_image)

response = client.document_text_detection(image=image)
texts = response.text_annotations

if texts:
    print(texts[0].description)
else:
    print("No text detected.")

よくみたらファイル名が違っていた

杏林堂のチラシは
combined_image.jpg
だったので

from google.cloud import vision
from PIL import Image
import io
import os

def resize_image_if_needed(image_path, max_size_mb=40):
    """Resize the image to half its original dimensions if it exceeds max_size_mb."""
    with open(image_path, "rb") as fb:
        image = Image.open(fb)
        image_io = io.BytesIO()
        image.save(image_io, format=image.format)
        image_size_mb = image_io.tell() / (1024 * 1024)
        
        if image_size_mb > max_size_mb:
            new_size = (image.width // 2, image.height // 2)
            resized_image = image.resize(new_size, Image.ANTIALIAS)
            
            resized_io = io.BytesIO()
            resized_image.save(resized_io, format=image.format)
            return resized_io.getvalue()
        
        return image_io.getvalue()

client = vision.ImageAnnotatorClient()

image_path = "combined_image.jpg"
resized_image = resize_image_if_needed(image_path)

image = vision.Image(content=resized_image)

response = client.document_text_detection(image=image)
texts = response.text_annotations

if texts:
    print(texts[0].description)
else:
    print("No text detected.")

python purchase_list.py > purchase_list.txt 

で結果をテキストファイルに保存した

次に
https://github.com/yakipudding/flyer-ocr
のソース

スーパーのチラシをOCRしてSlackに通知したら便利だった
を参考に
OCRをかけて自分が狙っている商品が出たら通知が来るようにする

使う内容は

# OCR
def detect_text(image_paths):
  from google.cloud import vision
  import io
  client = vision.ImageAnnotatorClient()

  all_text = ''

  for image_path in image_paths:
    with io.open(image_path, 'rb') as image_file:
      content = image_file.read()

    image = vision.Image(content=content)

    response = client.text_detection(image=image)
    texts = response.text_annotations

    for text in texts:
      all_text += str(text.description)

    if response.error.message:
      raise Exception(
        '{}\nFor more info on error messages, check: '
        'https://cloud.google.com/apis/design/errors'.format(
          response.error.message))

  return all_text


OCR結果を変数に格納

そしてこの結果から指定したキーワードが存在するかチェック

# キーワード検索
def search_words(all_text):
  hitwords = []
  # ★任意のキーワード(商品名)を設定
  keywords = ['ヨーグルト', '若鶏もも肉']
  for keyword in keywords:
    if keyword in all_text:
      hitwords.append(keyword)

  return hitwords

これでリストに商品名を入れている

Slackに通知を送る
の部分をLine notifyで送る処理に変えればOK

def slack_notice(results):
  import slackweb
  slack = slackweb.Slack(url='★WebhookのURL')
  for result in results:
    text = f'{result["date"]} チラシ掲載商品:{",".join(result["hitwords"])}\n<{result["url"]}|チラシを見る>'
    slack.notify(text=text)

とりあえずコードを書いていく
商品リストを
settings.json
に記述する

これはチラシの文字列とマッチさせるため
とりあえずは杏林堂のチラシをOCRかけて一致するようなキーワードにする

まずチラシ統合画像がかぶるので
日付をファイル名につけるように変更

import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def click_date_element(driver, base_xpath):
    try:
        # コンテナ内の日付要素を探してクリック
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        links = container.find_elements(By.XPATH, ".//a[contains(@title, '日替')]")

        for link in links:
            if '日替' in link.get_attribute('title'):
                link.click()
                print(f'Clicked on link with title: {link.get_attribute("title")}')
                time.sleep(3)  # クリックした後に3秒間待機
                return

        print('No link found with title containing: 日替')
    except Exception as e:
        print(f'Error clicking on element: {e}')

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    url = 'https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    # 特定のリンクをクリックする
    base_xpath_click = '/html/body/div[1]/div[3]/div[1]/div/div[4]/div/div/div/div/div/div/ul'
    click_date_element(driver, base_xpath_click)
    
    # 画像を取得してダウンロードする
    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]'
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            output_path = f'/mnt/data/combined_image_{current_date}.jpg'
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

しかし

Clicked on link with title: 8/2 日替
Found image: https://ipqcache2.shufoo.net/c/2024/07/30/15607036918828/index/img/0_100_0.jpg
Found image: https://ipqcache2.shufoo.net/c/2024/07/30/15607036918828/index/img/0_100_1.jpg
Found image: https://ipqcache2.shufoo.net/c/2024/07/30/15607036918828/index/img/0_100_2.jpg
Found image: https://ipqcache2.shufoo.net/c/2024/07/30/15607036918828/index/img/0_100_3.jpg
Downloaded image_0.jpg
Downloaded image_1.jpg
Downloaded image_2.jpg
Downloaded image_3.jpg
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/store_adversting_list/clik_allget_image.py", line 107, in <module>
    main()
  File "/Users/snowpool/aw10s/store_adversting_list/clik_allget_image.py", line 104, in main
    merge_images(images, output_path)
  File "/Users/snowpool/aw10s/store_adversting_list/clik_allget_image.py", line 83, in merge_images
    combined_image.save(output_path)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/PIL/Image.py", line 2428, in save
    fp = builtins.open(filename, "w+b")
FileNotFoundError: [Errno 2] No such file or directory: '/mnt/data/combined_image_20240802.jpg'

となるので

import os

def main():
    url = 'https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    # 特定のリンクをクリックする
    base_xpath_click = '/html/body/div[1]/div[3]/div[1]/div/div[4]/div/div/div/div/div/div/ul'
    click_date_element(driver, base_xpath_click)
    
    # 画像を取得してダウンロードする
    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]'
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            # Update the path to a valid directory on your machine
            output_dir = os.path.expanduser('~/images')
            os.makedirs(output_dir, exist_ok=True)  # Ensure the directory exists
            output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

へコード変更

さっきの原因は
ローカルシステム上に/mnt/data/というディレクトリが存在しないため
解決策:
1. ディレクトリを作成:ローカルマシンにディレクトリを作成します(例: imagesというディレクトリをユーザーディレクトリ内に作成)。
2. output_pathを更新:スクリプトの中で画像を保存するパスを、この新しいディレクトリに設定します。

これでエラーはなくなるが
ホームディレクトリに画像が保存される
これだと分かりにくいので
カレントディレクトリにimagesフォルダを作成し
1つのファイルにまとめた統合画像ファイルを保存する

import os
import time
import requests
from PIL import Image
from io import BytesIO
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.safari.service import Service as SafariService
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from datetime import datetime

def open_link_in_safari(url):
    # Safariドライバーを使用してブラウザを起動
    service = SafariService()
    driver = webdriver.Safari(service=service)
    driver.get(url)
    time.sleep(3)  # リンクを開いた後に3秒間待機
    return driver

def click_date_element(driver, base_xpath):
    try:
        # コンテナ内の日付要素を探してクリック
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        links = container.find_elements(By.XPATH, ".//a[contains(@title, '日替')]")

        for link in links:
            if '日替' in link.get_attribute('title'):
                link.click()
                print(f'Clicked on link with title: {link.get_attribute("title")}')
                time.sleep(3)  # クリックした後に3秒間待機
                return

        print('No link found with title containing: 日替')
    except Exception as e:
        print(f'Error clicking on element: {e}')

def get_images_from_container(driver, base_xpath):
    image_urls = []
    try:
        # コンテナ内の画像要素を探す
        container = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, base_xpath))
        )
        images = container.find_elements(By.TAG_NAME, 'img')
        
        for img in images:
            src = img.get_attribute('src')
            # 特定の条件に基づいて画像をフィルタリング
            if 'index/img' in src:
                image_urls.append(src)
                print(f'Found image: {src}')
    except Exception as e:
        print(f'Error finding images: {e}')
    return image_urls

def download_images(image_urls):
    images = []
    for i, url in enumerate(image_urls):
        response = requests.get(url)
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
            images.append(image)
            print(f'Downloaded image_{i}.jpg')
        else:
            print(f'Failed to download {url}')
    return images

def merge_images(images, output_path):
    widths, heights = zip(*(img.size for img in images))

    total_height = sum(heights)
    max_width = max(widths)

    combined_image = Image.new('RGB', (max_width, total_height))

    y_offset = 0
    for img in images:
        combined_image.paste(img, (0, y_offset))
        y_offset += img.height

    combined_image.save(output_path)
    print(f'Saved combined image as {output_path}')

def main():
    url = 'https://www.shufoo.net/pntweb/shopDetail/860323/?cid=nmail_pc'
    driver = open_link_in_safari(url)
    # 特定のリンクをクリックする
    base_xpath_click = '/html/body/div[1]/div[3]/div[1]/div/div[4]/div/div/div/div/div/div/ul'
    click_date_element(driver, base_xpath_click)
    
    # 画像を取得してダウンロードする
    base_xpath_images = '/html/body/div[1]/div[3]/div[1]/div/div[2]/div[2]'
    image_urls = get_images_from_container(driver, base_xpath_images)
    driver.quit()
    
    if image_urls:
        images = download_images(image_urls)
        if images:
            # 現在の日付を取得してフォーマット
            current_date = datetime.now().strftime('%Y%m%d')
            # カレントディレクトリにimagesフォルダを作成
            output_dir = 'images'
            os.makedirs(output_dir, exist_ok=True)  # ディレクトリが存在しない場合は作成
            output_path = os.path.join(output_dir, f'combined_image_{current_date}.jpg')
            merge_images(images, output_path)

if __name__ == '__main__':
    main()

これでimagesの中に被らずに保存できる

次に文字列との一致だが
杏林堂のチラシで欲しいものがなかった

とりあえず適当に検出できた文字列をキーワードにして
一致したら
チラシの画像ファイルと文字列をlineで送るようにする