カレンダー読み上げのコード公開

カレンダー読み上げのコード公開

webcom_face_gcalendar
というリポジトリを作成する

Add a README file
にチェックを入れる

Description (optional)
はせっかくなので英語で解説を書いておく

OpenCVで顔の識別を行い
登録した人物の顔を認識したら
Google calendar API で今週の予定を取得し
Docker の voicevox で音声を生成し読み上げします

これを翻訳すると
Perform face identification with OpenCV
When the face of the registered person is recognized
Get this week’s schedule using Google calendar API
Generate and read audio with Docker’s voicevox

これをDescriptionに書いておく

Add .gitignore
はそのまま
.gitignore template: None
のままにしておく

ライセンスはMITにしておく

これでリポジトリを作成

次に

git clone git@github.com:Snowpooll/webcom_face_gcalendar.git

でリポジトリコピー

この時にSSHを使っているので
パスフレーズを入力する

cd webcom_face_gcalendar 

で移動し
ここへコードをコピーしていく

スマホからの写真を抽出できるようにするスクリプト

cp ../week_calendar_voice/resize_save.py .

ファイルサイズを調べるスクリプト

cp ../week_calendar_voice/file_info.py .

写真から顔のみ抽出するスクリプト

cp ../week_calendar_voice/generate_aligned_faces.py . 

切り出した顔のjpgファイルを読み顔の特徴量に変換するスクリプト

cp ../week_calendar_voice/generate_feature_dictionary.py . 

顔の得微量が近い人を検出するにはモデル

cp ../week_calendar_voice/face_recognizer_fast.onnx . 
cp ../week_calendar_voice/face_detection_yunet_2023mar.onnx . 

顔を期別した時にカレンダーを読み上げるメインスクリプト

cp ../week_calendar_voice/webcam_face_calendar.py . 

今週の残りの予定を取得し、音声ファイルを生成する

cp ../week_calendar_voice/calendar_audio_utils.py . 

Google Calendar API で予定を取得する

cp ../week_calendar_voice/calendar_utils.py . 

お知らせの音声

cp ../week_calendar_voice/notice.wav .

メンテの手間を省くため
Google calendar へ予定を追加するコードも別フォルダに入れておく

mkdir add_calendar
cd add_calendar 

こちらへGoogleカレンダー予定追加のためのスクリプトを保存する

未読GmailからGoogle calendar に追加するスクリプト

cp ../../week_calendar_voice/main6.py .
mv main6.py add_gmail_calendar.py 

GoogleDrive からGoogle calendar に追加するスクリプト

cp ../../week_calendar_voice/main4.py .
mv main4.py add_gdrive_calendar.py

和暦を西暦に変換するスクリプト

cp ../../week_calendar_voice/event_utils.py .

Google CalendarAPI で予定を追加するスクリプト

cp ../../week_calendar_voice/google_calendar_module.py .

Ollamaを使用してテキストから日時とイベントを抽出

cp ../../week_calendar_voice/ollama_module.py .

Gmailから本文を抽出する

cp ../../week_calendar_voice/gmail_utils.py .

Gmailのラベルを調べるスクリプト

cp ../../week_calendar_voice/label_gmail.py .

GoogleDrive の指定フォルダからPDFを取得し本文を抽出

cp ../../week_calendar_voice/drive_pdf_extractor.py .

GoogleDrive にフォルダを作成し、そこからPDFファイルを取得し内容を抽出

 cp ../../week_calendar_voice/g_drive/create_folder.py .

次に

touch requirements.txt

でインポートするライブラリ関連を書いておく

numpy
opencv-python
pytz
requests
google-auth
google-auth-oauthlib
google-auth-httplib2
google-api-python-client
playsound
pdfminer.six
python-dateutil
git add .
git commit -m "add code"

Gitignore を忘れていたので

vim .gitignore

内容を

token.json
credentials.json

を追加

再度 リポジトリ登録する

git add .
git commit -m "add gitignore”

あとは

git push -u origin main

で送信

OpenCVの顔の識別機能とカレンダー読み上げの組み合わせ

OpenCVの顔の識別機能とカレンダー読み上げの組み合わせ

まず顔の識別で自分の顔だった時に動作するように
カレンダー読み上げ機能をモジュールにする

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound

def main():
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            # 音声ファイルは再生しない
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加

        # 音声ファイルが存在する場合のみ notice.wav と各予定の音声を再生
        if audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
            
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)

if __name__ == '__main__':
    main()

をモジュールにして他のプログラムから呼び出せるようにしたい

touch calendar_audio_utils.py

でファイルを作成

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound

def get_weekly_schedule_with_audio(play_audio=False):
    """
    今週の残りの予定を取得し、音声ファイルを生成する関数。
    
    :param play_audio: 予定を音声で再生するかどうか(デフォルトは再生しない)
    :return: 今週の予定をテキスト形式で返すリスト
    """
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    event_texts = []  # 予定のテキストリスト

    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                event_texts.append(event_text)  # テキストをリストに追加
                print(event_text)

                # 音声ファイルを生成
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加

        # 音声を再生するオプションがTrueの場合にのみ、音声ファイルを再生
        if play_audio and audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
            
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)

    return event_texts

として保存

念のため動作するかチェック

vim testvoice.py  

でファイル作成

from calendar_audio_utils import get_weekly_schedule_with_audio

# 音声再生なしで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=False)
print(schedule)

# 音声再生ありで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=True)

保存したら

python testvoice.py

で実行

これで動作するのが確認できたので
次に顔の識別

以前使ったものを再利用する

Pixcel8で撮影したスマホの写真で顔データを作る場合には
元画像の1/4にする必要があるため
変換のため
resize_save.py
を作成したのでこれを使う

コードは

import cv2
import os
import argparse

def main():
    # コマンドライン引数を解析するパーサーを作成
    parser = argparse.ArgumentParser(description="Resize and save an image")
    parser.add_argument("image_path", help="Path to the image file")
    args = parser.parse_args()

    # 画像を読み込む
    image = cv2.imread(args.image_path)
    if image is None:
        print("画像が読み込めませんでした。")
        return

    # 画像の元の高さ、幅を取得
    height, width = image.shape[:2]

    # 新しい寸法を計算(元のサイズの1/4)
    new_width = width // 4
    new_height = height // 4

    # 画像をリサイズ
    resized_image = cv2.resize(image, (new_width, new_height))

    # 新しいファイル名を設定
    new_file_path = os.path.splitext(args.image_path)[0] + "_quarter.jpg"

    # リサイズした画像を保存
    cv2.imwrite(new_file_path, resized_image)
    print(f"リサイズされた画像が保存されました: {new_file_path}")

if __name__ == '__main__':
    main()

使用する時にはターミナルでコマンドで実行する

python resize_save.py PXL_20240612_091410912.jpg      

というようにファイルを指定すれば
実行後1/4サイズにした画像が作成される

ファイルサイズを調べるスクリプトも作成

import cv2
import os
import argparse

def main():
    # コマンドライン引数を解析するパーサーを作成
    parser = argparse.ArgumentParser(description="Display image properties")
    parser.add_argument("image_path", help="Path to the image file")
    args = parser.parse_args()

    # 画像を読み込む
    image = cv2.imread(args.image_path)
    if image is None:
        print("画像が読み込めませんでした。")
        return

    # 画像の高さ、幅、チャンネル数を取得
    height, width, channels = image.shape
    print(f"画像の幅: {width} ピクセル")
    print(f"画像の高さ: {height} ピクセル")
    print(f"色チャネル数: {channels}")

    # ファイルサイズを取得
    file_size = os.path.getsize(args.image_path)
    print(f"ファイルサイズ: {file_size} バイト")

if __name__ == '__main__':
    main()

これを

python file_info.py PXL_20240612_091410912_resized_resized.jpg 

というように実行すればサイズが表示される

画像の幅: 684 ピクセル
画像の高さ: 912 ピクセル
色チャネル数: 3
ファイルサイズ: 228769 バイト

この2つは自分以外の写真から登録画像を作るのに使うので

cp ../face_recog/file_info.py .
cp ../face_recog/resize_save.py .

でコピーしておく

次に
入力した写真から人の顔の部分を切り出して保存するプログラム

generate_aligned_faces.py

に写真のファイルを引数にして実行すれば個人ごとの顔写真ができる

これは
入力した写真から人の顔の部分を切り出して保存するプログラム

複数の人物が写っている場合は全員を切り出して face001.jpg , face002.jpg ・・・ と名前を付けて保存する
出力されたファイル名を 人の名前に変更しておくと後々便利です。 
face001.jpg → taro.jpg

python generate_aligned_faces.py image.jpg

とすれば
写真に写っている人の分だけファイルができる
そのファイル名を人の名前に変更する

つまり全て
face001.jpg
という感じで
Face00x.jpg
となっているので
写真ごとに名前を変える

これもコピーしておく

cp ../face_recog/generate_aligned_faces.py .

次に

 generate_feature_dictionary.py


切り出した顔のjpgファイルを読み込んで、顔の特徴量に変換する

例えば 顔写真 taro.jpg を入力すると 顔の特徴量 taro.npy が出力される
このnumpyファイルに各個人の顔の特徴量が128次元ベクトルに変換されて入っている

python generate_feature_dictionary.py face001.jpg
python generate_feature_dictionary.py face002.jpg

つまり
写真の人の分だけ実行すればOK

これもコピーしておく

 cp ../face_recog/generate_feature_dictionary.py .

次に顔の得微量が近い人を検出するにはモデルが必要なのでコピー

cp ../face_recog/face_recognizer_fast.onnx .
cp ../face_recog/face_detection_yunet_2023mar.onnx .

そして作成した自分の顔の得微量ファイルもコピーしておく

cp ../face_recog/satoru.* .

Webカメラから映った時に顔の識別をするので

cp ../face_recog/webcam_face_recognizer.py .

でコピー

念の為動作確認

python webcam_face_recognizer.py

で自分の顔を識別しているのがわかる

次にこの中で
顔認識した時に

from calendar_module import get_weekly_schedule_with_audio # 音声再生なしで予定を取得 
schedule = get_weekly_schedule_with_audio(play_audio=False) print(schedule) # 音声再生ありで予定を取得 schedule = get_weekly_schedule_with_audio(play_audio=True) 

を実行

また
毎回読み上げでは負荷がかかるため、次の呼び出しは12時間後になるように設定

顔を識別できたときに特定の関数を呼び出し、
呼び出しが12時間に1回のみになるように制限するためには、識別が成功した時間を記録し、
次に呼び出すタイミングを管理する

1. call_function_when_recognized: この関数が顔認識時に呼び出され、最後の呼び出し時間を記録します。
次に呼び出すまでの間隔が12時間経過していない場合は、新たに処理を実行しないようにしています。
2. THROTTLE_TIME: 12時間を秒単位(12 * 60 * 60)で設定しています。
3. last_called_time: この変数は最後に呼び出された時間を記録し、
次の呼び出しが12時間以内であれば、新たな処理をスキップします。

.wavファイルを毎回生成していると容量を圧迫するため

from calendar_audio_utils import get_weekly_schedule_with_audio

# 音声再生なしで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=False)
print(schedule)

# 音声再生ありで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=True)

の処理の後に notice.wavファイル以外の .wavファイルをすべて削除する

touch webcam_face_calendar.py

でファイルを作成

import os
import glob
import numpy as np
import cv2
import time
from calendar_module import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート

COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128

# 12時間(秒単位)
THROTTLE_TIME = 12 * 60 * 60
last_called_time = 0  # 最後に呼び出した時間を初期化

def match(recognizer, feature1, dictionary):
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            return True, (user_id, score)
    return False, ("", 0.0)

def call_function_when_recognized(user_id):
    global last_called_time
    current_time = time.time()
    
    # 最後に呼び出してから12時間経過しているかを確認
    if current_time - last_called_time >= THROTTLE_TIME:
        print(f"認識されました: {user_id}")
        
        # 予定を音声再生なしで取得
        schedule = get_weekly_schedule_with_audio(play_audio=False)
        print("予定:", schedule)
        
        # 予定を音声再生ありで取得
        schedule = get_weekly_schedule_with_audio(play_audio=True)
        print("音声で再生される予定:", schedule)
        
        # notice.wavファイル以外の.wavファイルを削除
        cleanup_audio_files(exclude_file="notice.wav")
        
        # 最後に呼び出した時間を更新
        last_called_time = current_time
    else:
        print("まだ12時間経過していないため、次の呼び出しは行われません。")

def cleanup_audio_files(exclude_file):
    """指定された.wavファイル以外の.wavファイルを削除する関数"""
    directory = os.getcwd()  # 現在のディレクトリを取得
    wav_files = glob.glob(os.path.join(directory, "*.wav"))  # すべての.wavファイルを取得

    for wav_file in wav_files:
        if os.path.basename(wav_file) != exclude_file:
            try:
                os.remove(wav_file)  # 指定されたファイル以外を削除
                print(f"削除しました: {wav_file}")
            except OSError as e:
                print(f"ファイル削除エラー: {wav_file}, {e}")

def main():
    directory = os.path.dirname(__file__)
    capture = cv2.VideoCapture(0)  # Use the default camera

    if not capture.isOpened():
        print("Error: The webcam could not be opened.")
        return

    dictionary = []
    files = glob.glob(os.path.join(directory, "*.npy"))
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))

    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    while True:
        result, image = capture.read()
        if not result:
            print("Error: No image from webcam.")
            break

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Ensure image is in RGB

        height, width, _ = image.shape
        face_detector.setInputSize((width, height))

        result, faces = face_detector.detect(image)
        faces = faces if faces is not None else []

        for face in faces:
            aligned_face = face_recognizer.alignCrop(image, face)
            feature = face_recognizer.feature(aligned_face)

            result, user = match(face_recognizer, feature, dictionary)

            box = list(map(int, face[:4]))
            color = (0, 255, 0) if result else (0, 0, 255)
            thickness = 2
            cv2.rectangle(image, box, color, thickness, cv2.LINE_AA)

            id, score = user if result else ("unknown", 0.0)
            text = "{} ({:.2f})".format(id, score)
            position = (box[0], box[1] - 10)
            font = cv2.FONT_HERSHEY_SIMPLEX
            scale = 0.6
            cv2.putText(image, text, position, font, scale, color, thickness, cv2.LINE_AA)

            if result and id != "unknown":
                call_function_when_recognized(id)  # 顔が認識された時にカレンダーの予定取得を実行

        # 画像を表示する前にRGBからBGRに再変換
        cv2.imshow("face recognition", cv2.cvtColor(image, cv2.COLOR_RGB2BGR))

        key = cv2.waitKey(1)
        if key == ord('q'):
            break

    capture.release()
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main()

で実行

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/webcam_face_calendar.py", line 6, in <module>
    from calendar_module import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート
ImportError: cannot import name 'get_weekly_schedule_with_audio' from 'calendar_module' (/Users/snowpool/aw10s/week_calendar_voice/calendar_module.py)

となる

これはChatGPTで作成した時のモジュールのエラー
結構あることでライブラリのインポートを間違えたり削除下入りしている

from calendar_module import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート

に変更すれば解決

起動はしたけど、このままだとOpenCVで画面描画するので
これは不要なので非表示にする
これをしないとリモート環境などで動作しない

v2.imshow()やキーボードの操作に関する部分を削除し、
無限ループで顔認識を行うコードに修正

cv2.VideoCapture の映像確認が不要な場合は、その部分を省略しても動作する

ということで
画面表示とキー入力待機を削除

import os
import glob
import numpy as np
import cv2
import time
from calendar_audio_utils import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート

COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128

# 12時間(秒単位)
THROTTLE_TIME = 12 * 60 * 60
last_called_time = 0  # 最後に呼び出した時間を初期化

def match(recognizer, feature1, dictionary):
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            return True, (user_id, score)
    return False, ("", 0.0)

def call_function_when_recognized(user_id):
    global last_called_time
    current_time = time.time()
    
    # 最後に呼び出してから12時間経過しているかを確認
    if current_time - last_called_time >= THROTTLE_TIME:
        print(f"認識されました: {user_id}")
        
        # 予定を音声再生なしで取得
        schedule = get_weekly_schedule_with_audio(play_audio=False)
        print("予定:", schedule)
        
        # 予定を音声再生ありで取得
        schedule = get_weekly_schedule_with_audio(play_audio=True)
        print("音声で再生される予定:", schedule)
        
        # notice.wavファイル以外の.wavファイルを削除
        cleanup_audio_files(exclude_file="notice.wav")
        
        # 最後に呼び出した時間を更新
        last_called_time = current_time
    else:
        print("まだ12時間経過していないため、次の呼び出しは行われません。")

def cleanup_audio_files(exclude_file):
    """指定された.wavファイル以外の.wavファイルを削除する関数"""
    directory = os.getcwd()  # 現在のディレクトリを取得
    wav_files = glob.glob(os.path.join(directory, "*.wav"))  # すべての.wavファイルを取得

    for wav_file in wav_files:
        if os.path.basename(wav_file) != exclude_file:
            try:
                os.remove(wav_file)  # 指定されたファイル以外を削除
                print(f"削除しました: {wav_file}")
            except OSError as e:
                print(f"ファイル削除エラー: {wav_file}, {e}")

def main():
    directory = os.path.dirname(__file__)
    capture = cv2.VideoCapture(0)  # Use the default camera

    if not capture.isOpened():
        print("Error: The webcam could not be opened.")
        return

    dictionary = []
    files = glob.glob(os.path.join(directory, "*.npy"))
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))

    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")

    while True:
        result, image = capture.read()
        if not result:
            print("Error: No image from webcam.")
            break

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Ensure image is in RGB

        height, width, _ = image.shape
        face_detector.setInputSize((width, height))

        result, faces = face_detector.detect(image)
        faces = faces if faces is not None else []

        for face in faces:
            aligned_face = face_recognizer.alignCrop(image, face)
            feature = face_recognizer.feature(aligned_face)

            result, user = match(face_recognizer, feature, dictionary)

            if result and user[0] != "unknown":
                call_function_when_recognized(user[0])  # 顔が認識された時にカレンダーの予定取得を実行

        # 適当な待機時間を設けてリソースの使用を抑える
        time.sleep(1)

    capture.release()

if __name__ == '__main__':
    main()

というコードに変更

これでwebカメラの画面描画はなくなり
停止手段は ctrl + c で停止となる

実際に動かしたけど
M1macbookAir 16GB で
顔認識してからGoogle カレンダーを読み込み
Voicevox で音声ファイルを生成し、予定を読み上げるまで一分かかる

Docker ではなくインストールタイプにしたり
マシンスペックを上げれば短縮できるかもしれない

Gmailから抽出した本文をOllamaに渡しGoogle カレンダーに登録する

Gmailから抽出した本文をOllamaに渡しGoogle カレンダーに登録する

touch main6.py

でファイルを作成

Gmailから抽出したメッセージの本文を、
PDFのテキスト抽出と同様にOllamaに渡すようにコードを変更

Gmailからの本文取得:
gmail_get_latest_unread_message_bodyを使用して、Gmailの本文を取得

Ollamaに渡す:
取得した本文をOllamaに渡して解析

解析結果のフィルタリングとカレンダーへの追加:
PDFからのテキスト処理と同様に、抽出されたイベントをフィルタリングしてカレンダーに追加

以下は、Gmailから抽出した本文をOllamaに渡すようにした修正版コード

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
from event_utils import filter_events

# Gmail APIの初期化
service = gmail_init()

# 最新の未読メッセージの本文を取得
label_id = ''  # 取得するラベルID
message_body = gmail_get_latest_unread_message_body(service, label_id)

if message_body != "No unread messages found.":
    print("メール本文:")
    print(message_body)
    
    # Ollamaでメール本文を解析
    raw_events = parse_text_with_ollama(message_body, model_name='elyza:jp8b')
    
    # 抽出されたイベントを表示
    print("抽出されたイベント:")
    for event in raw_events:
        print(event)

    # イベントのフィルタリングとフォーマット
    events = filter_events(raw_events)

    # 有効なイベントを表示
    print("有効なイベント:")
    for event in events:
        print(event)

    # 有効なイベントがある場合のみGoogleカレンダーに追加
    if events:
        add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
    else:
        print("有効なイベントがありません。")
else:
    print("未読メッセージが見つかりませんでした。")

実行すると

「Request had insufficient authentication scopes」

これは、Google Calendar APIへのリクエストに対して適切な認証スコープが不足している

現在使用しているスコープがカレンダーイベントの追加(writeアクセス)を許可していないため

スコープに
https://www.googleapis.com/auth/calendar.events
または
https://www.googleapis.com/auth/calendar
を追加し、Googleカレンダーへの書き込み権限を付与する

gmail_utils.pyを修正する

# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    return body  # 本文のみを返す

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date

def gmail_init():
    creds = None
    token_path = '../mail_auto/token.json'  # token.jsonのパスを指定
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

を書き換えてみる

実験結果のログを見ると
Gmail関連の動作をさせるとエラーになる
このため、tokenが重複しているとエラーを起こす可能性があるのかもしれない

Gmailから取得し
Calendar書き込みするとエラーになり
その後はカレンダー機能を使う他のものも動作しなくなる

しかし一度token.jsonを削除し、再度認証すれば
カレンダー機能は使えるようになっている

とりあえず認証関連はこれで動くので
再度tokenを発行すれば解決するはず

先に顔認証したら週間予定を読み上げるのを作成する

gmailから本文を抽出しカレンダー書き込み

gmailから本文を抽出しカレンダー書き込み

PDFからはできるようになったので
次はGmailから本文を抽出しテキストとして取り出すようにする

以前、gmailの未読の最新1件を取得し
本文とURLを抽出するものを作成したので
このソースを変更し
本文を抽出するようにする

なお、実行前にラベルIDを調べる必要がある
Schoolというラベルを作成し
このIDを調べる

cd ../mail_auto
python base,py

を実行し
結果の中から
Schoolのものを探す

コードは

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

import base64, email #デコード用
import dateutil.parser

#token.jsonを設定
tokenPath = "token.json"

#credentials.jsonを設定
credentialsPath = "credentials.json"


#メール本文のデコード
def decode(encoded):
   decoded = base64.urlsafe_b64decode(encoded).decode()
   return decoded


#メール本文の内容を配列で取得する関数
def gmail_get_messages_body(service, labelIdsValue):
   mailBody = []
   
   # メッセージの一覧を取得
   messages = service.users().messages()
   msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute() 
   # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定
   
   # 取得したメッセージの一覧を配列に格納
   for msg in msg_list['messages']:
       
       #メールの受信日時を取得
       date = gmail_get_messages_body_date(messages,msg)
       
       topid = msg['id']     
       msg = messages.get(userId='me', id=topid).execute()
       
       if(msg["payload"]["body"]["size"]!=0):
           mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"])) 
       else:
           #メールによっては"parts"属性の中に本文がある場合もある
           mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"])) 

   return mailBody
   

#gmail_get_messages_body関数内で受信日時を取得する関数
def gmail_get_messages_body_date(messages,msg):
   msg_id = msg['id']
   m = messages.get(userId='me', id=msg_id, format='raw').execute()
   raw = base64.urlsafe_b64decode(m['raw'])
   
   # Emailを解析する
   eml = email.message_from_bytes(raw)
   
   date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
   return date

#ラベルのIDやnameを表示する関数
def gmail_display_label(service):
   results = service.users().labels().list(userId='me').execute()
   labels = results.get('labels', [])

   if not labels:
       print('No labels found.')
   else:
       print('Labels:')
       for label in labels:
           print(label)


#初期化(from quickstart.py)
def gmail_init():
   creds = None
   # The file token.json stores the user's access and refresh tokens, and is
   # created automatically when the authorization flow completes for the first
   # time.
   if os.path.exists(tokenPath):
       creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
   # If there are no (valid) credentials available, let the user log in.
   if not creds or not creds.valid:
       if creds and creds.expired and creds.refresh_token:
           creds.refresh(Request())
       else:
           flow = InstalledAppFlow.from_client_secrets_file(
               credentialsPath, SCOPES)
           creds = flow.run_local_server(port=0)
       # Save the credentials for the next run
       with open(tokenPath, 'w') as token:
           token.write(creds.to_json())
           
   service = build('gmail', 'v1', credentials=creds)
   return service


#ここからやりたい処理を書く
service = gmail_init()

#quickstart.pyと同じ処理にしてみた
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])

if not labels:
   print('No labels found.')
else:
   print('Labels:')
   for label in labels:
       print(label['name'])
       
gmail_display_label(service)

grep を使い

python base,py| grep School
で探す

School
{'id': '', 'name': 'School', 'type': 'user'}

となったので
あとはこれを元にコードを書き換えていく

cp ../mail_voice/gmail_utils.py .

でコードをコピー

次にこのコードをURLを抽出せず本文のみ抽出するようにコード変更

# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    return body  # 本文のみを返す

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

次に main4.pyは Google Drive からPDFを取得し
抽出する機能だったので
新たに main5.pyを作成し
Gmail未読のものを対象にするようにしてみる

touch main5.py

GPTで生成したコードからラベルIDを変更する

# main2.py
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from ollama_module import parse_text_with_ollama

# Gmail APIの初期化
service = gmail_init()

# 最新の未読メッセージの本文を取得
label_id = ''  # 取得するラベルID
message_body = gmail_get_latest_unread_message_body(service, label_id)

if message_body != "No unread messages found.":
    print("メール本文:")
    print(message_body)
    
    # Ollamaでメール本文を解析
    events = parse_text_with_ollama(message_body, model_name='elyza:jp8b')
    
    # 抽出されたイベントを表示
    print("抽出されたイベント:")
    for event in events:
        print(event)
else:
    print("未読メッセージが見つかりませんでした。")

次にGmail でSchoolラベルの中から予定があるメールを試しに1つ未読にして実験する

結果

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/main5.py", line 6, in <module>
    service = gmail_init()
  File "/Users/snowpool/aw10s/week_calendar_voice/gmail_utils.py", line 52, in gmail_init
    creds.refresh(Request())
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

これはtoken.jsonがあってないため
なので以前gmail関連のプロジェクトを作成したので
そちらを使うようにコード変更

# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    return body  # 本文のみを返す

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date

def gmail_init():
    creds = None
    token_path = '../mail_auto/token.json'  # token.jsonのパスを指定
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

再度実行するとイベントの抽出ができた

次にこれをカレンダーに書き込むようにする

日付形式の問題解決とイベント抽出とフィルタリング

日付形式の問題解決とイベント抽出とフィルタリング

レスポンスに含まれている日付形式が「令和6年10月26日」などの和暦表記や、「2024-10-08」などのISO表記と混在している

このような場合、統一的に処理するためのフォーマット変換を行う必要がある

from datetime import datetime
import re

def convert_japanese_date(japanese_date):
    # 和暦の例: "令和6年10月26日"
    pattern = r"令和(\d+)年(\d+)月(\d+)日"
    match = re.match(pattern, japanese_date)
    if match:
        year = int(match.group(1)) + 2018  # 令和元年は2019年に相当
        month = int(match.group(2))
        day = int(match.group(3))
        return f"{year}-{month:02d}-{day:02d}"
    # ISO形式のチェック
    try:
        datetime.strptime(japanese_date, '%Y-%m-%d')
        return japanese_date
    except ValueError:
        return None  # 無効な日付形式

また
日付形式の変換後、無効な日付やイベント名が空のエントリをフィルタリングするように変更

eventsリストから有効なエントリのみを抽出

def filter_events(events):
    valid_events = []
    for event in events:
        date = convert_japanese_date(event['date'])
        if date and event['event']:
            valid_events.append({'date': date, 'event': event['event']})
        else:
            print(f"無効な日付形式: {event['date']}")
    return valid_events

日付の変換やイベントのフィルタリングを他のスクリプトでも使うため
モジュール化する

event_utils.pyというモジュールを作成し
これに記述する

touch event_utils.py

内容は

from datetime import datetime
import re

def convert_japanese_date(japanese_date):
    pattern = r"令和(\d+)年(\d+)月(\d+)日"
    match = re.match(pattern, japanese_date)
    if match:
        year = int(match.group(1)) + 2018  # 令和元年は2019年に相当
        month = int(match.group(2))
        day = int(match.group(3))
        return f"{year}-{month:02d}-{day:02d}"
    # ISO形式のチェック
    try:
        datetime.strptime(japanese_date, '%Y-%m-%d')
        return japanese_date
    except ValueError:
        return None  # 無効な日付形式

def filter_events(events):
    valid_events = []
    for event in events:
        date = convert_japanese_date(event['date'])
        if date and event['event']:
            valid_events.append({'date': date, 'event': event['event']})
        else:
            print(f"無効な日付形式: {event['date']}")
    return valid_events

これをmain4.pyで
event_utils.pyをインポートして使用する

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
from event_utils import convert_japanese_date, filter_events

# フォルダIDを指定して処理を開始
folder_id = "" #Folder ID
texts = extract_texts_from_folder(folder_id)

if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        raw_events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
        print("抽出されたイベント:")
        for event in raw_events:
            print(event)

        # イベントのフィルタリングとフォーマット
        events = filter_events(raw_events)

        print("有効なイベント:")
        for event in events:
            print(event)

        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

これで
eventsリストからNoneのイベントを削除するフィルタリング処理が追加
Noneを削除したイベントのみをカレンダーに追加

events = [event for event in events if event['event'] is not None]

の部分がその処理

また
eventsリストが空でないかを確認し、有効なイベントがある場合のみカレンダーに追加
もしイベントがなければ、カレンダーには何も追加されず、
「有効なイベントがありません。」というメッセージが表示される

このチェックによって、無駄な処理が減り、空のイベントがカレンダーに追加されるのを防ぐ

if events:
    add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
else:
    print("有効なイベントがありません。")

これで実行すると
運動会のイベントが失敗している

これは
OllamaからのレスポンスがJSON形式として期待されているのに対し、
不正なJSON形式
(複数のJSONオブジェクトがコンマで区切られている形式や、余分なテキストが含まれている形式)
が返ってきていることが考えられます。

これを解決するために、
Ollamaのレスポンスを受け取る際に、
例外処理や、JSON以外の余計な部分を取り除く前処理を追加

レスポンスを前処理する関数を追加し、JSON形式に整形
JSON解析エラーの例外処理を追加して、不正な形式のデータをスキップ

llamaからのレスポンスを受け取って前処理し、JSON形式に変換するには

import json
import re

def parse_ollama_response(response_text):
    # 正規表現でJSON配列形式のみを抽出
    json_pattern = re.compile(r'\[.*?\]', re.DOTALL)
    match = json_pattern.search(response_text)
    if not match:
        print("JSON形式のレスポンスが見つかりません。")
        return []

    json_data = match.group(0)  # 最初のJSON形式の部分を取得

    try:
        events = json.loads(json_data)
        return events
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", str(e))
        return []

parse_ollama_response関数を
parse_text_with_ollama関数の後処理として使用し、Ollamaからのレスポンスを整形
これにより、Ollamaからの不正なレスポンス形式を取り除く

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
import json
import re

# OllamaからのレスポンスをJSON形式に変換
def parse_ollama_response(response_text):
    # JSON配列形式のみを抽出する正規表現
    json_pattern = re.compile(r'\[.*?\]', re.DOTALL)
    match = json_pattern.search(response_text)
    if not match:
        print("JSON形式のレスポンスが見つかりません。")
        return []

    json_data = match.group(0)  # 最初のJSON形式の部分を取得

    try:
        events = json.loads(json_data)
        return events
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", str(e))
        return []

# Google DriveのSchoolフォルダID
folder_id = "" 

# フォルダ内のPDFファイルからテキストを抽出
texts = extract_texts_from_folder(folder_id)

# テキストが抽出できているか確認
if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        # Ollamaでテキストを解析(モデル名を指定)
        raw_response = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
        
        # Ollamaのレスポンスを前処理してJSONに変換
        events = parse_ollama_response(raw_response)

        print("抽出されたイベント:")
        for event in events:
            print(event)

        # Noneのイベントを除外
        events = [event for event in events if event['event']]

        # フィルタリング後のイベントを表示
        print("有効なイベント:")
        for event in events:
            print(event)

        # 有効なイベントがある場合のみGoogleカレンダーに追加
        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

としたが

TypeError: expected string or bytes-like object というエラーは、
response_textのデータ型が文字列やバイト列ではない場合に発生

これは、response_textが予期せず辞書(dict)型や他の非文字列データ型である場合に起こることがよくある

response_textの型が文字列であることを確認し、
もし辞書型など別の型が返ってきている場合には、JSONエンコードして文字列に変換する処理を追加

レスポンスが辞書型か確認し、文字列に変換
parse_text_with_ollamaから返ってくるレスポンスが文字列であるかをチェックし、
もし辞書型など別の型であれば文字列に変換する処理を追加

型チェックを行う
parse_ollama_response関数内で
response_textが文字列であることをチェックし、文字列でない場合には変換

以下修正

import json
import re

def parse_ollama_response(response_text):
    # レスポンスが辞書型なら文字列に変換
    if isinstance(response_text, dict):
        response_text = json.dumps(response_text)  # 辞書をJSON文字列に変換
    elif not isinstance(response_text, str):
        print("不明なデータ形式のレスポンスを受け取りました:", type(response_text))
        return []
    
    # JSON配列形式のみを抽出する正規表現
    json_pattern = re.compile(r'\[.*?\]', re.DOTALL)
    match = json_pattern.search(response_text)
    if not match:
        print("JSON形式のレスポンスが見つかりません。")
        return []

    json_data = match.group(0)  # 最初のJSON形式の部分を取得

    try:
        events = json.loads(json_data)
        return events
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", str(e))
        return []

という処理を追加

response_textが辞書型かを確認します。辞書型の場合はjson.dumps()を使って文字列に変換

次に、正規表現を使用してレスポンス内のJSON配列形式を抽出

json.loadsを使って抽出した部分をJSONとしてパースし、
もしエラーが発生した場合にはエラーメッセージを表示

逆にエラーになった

このため一度

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
from event_utils import convert_japanese_date, filter_events

# フォルダIDを指定して処理を開始
folder_id = ""
texts = extract_texts_from_folder(folder_id)

if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        raw_events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
        print("抽出されたイベント:")
        for event in raw_events:
            print(event)

        # イベントのフィルタリングとフォーマット
        events = filter_events(raw_events)

        print("有効なイベント:")
        for event in events:
            print(event)

        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

に戻したら
となって無事にイベントが追加された

次は
メールから読み込んで実行できるようにする

GoogleDriveの指定フォルダからファイルを取得し処理のモジュール化と処理結果を ollamaへ渡す

GoogleDriveの指定フォルダからファイルを取得し処理のモジュール化と処理結果を ollamaへ渡す

Google DriveからPDFを取得し、テキストを抽出する機能をモジュールに分ける

drive_pdf_extractor.py

を作成する

touch drive_pdf_extractor.py

でファイルを作成し

import os
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from googleapiclient.http import MediaIoBaseDownload
from pdfminer.high_level import extract_text
from io import BytesIO

# Google Drive APIの認証
def authenticate_drive():
    SCOPES = ['https://www.googleapis.com/auth/drive']
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("認証トークンが見つかりません。認証を実行してください。")
        return None
    return build('drive', 'v3', credentials=creds)

# フォルダ内のPDFファイルリストを取得
def list_pdf_files_in_folder(service, folder_id):
    """Google Driveフォルダ内のPDFファイルのリストを取得します"""
    query = f"'{folder_id}' in parents and mimeType='application/pdf'"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    files = results.get('files', [])
    return files

# Google DriveからPDFファイルをダウンロード
def download_pdf_from_drive(service, file_id):
    """Google DriveからPDFファイルをダウンロードし、バイナリデータを返します"""
    request = service.files().get_media(fileId=file_id)
    file_data = BytesIO()
    downloader = MediaIoBaseDownload(file_data, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"Download Progress: {int(status.progress() * 100)}%")
    file_data.seek(0)
    return file_data

# PDFからテキストを抽出
def extract_text_from_pdf(pdf_data):
    """PDFファイルデータからテキストを抽出します"""
    text = extract_text(pdf_data)
    return text

# 指定したフォルダ内のすべてのPDFファイルのテキストを抽出
def extract_texts_from_folder(folder_id):
    """フォルダ内のPDFファイルからテキストを抽出し、リストとして返します"""
    service = authenticate_drive()
    if not service:
        return []

    pdf_files = list_pdf_files_in_folder(service, folder_id)
    if not pdf_files:
        print("指定されたフォルダにPDFファイルが見つかりません。")
        return []

    texts = []
    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file['name']}")
        pdf_data = download_pdf_from_drive(service, pdf_file['id'])
        pdf_text = extract_text_from_pdf(pdf_data)
        if pdf_text:
            texts.append(pdf_text)
        else:
            print(f"{pdf_file['name']} からテキストを抽出できませんでした。")
    return texts

として保存

 touch main3.py

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar

# Google DriveのSchoolフォルダID
folder_id = ""

# フォルダ内のPDFファイルからテキストを抽出
texts = extract_texts_from_folder(folder_id)

# テキストが抽出できているか確認
if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        # Ollamaでテキストを解析(モデル名を指定)
        events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')

        # 抽出されたイベントを表示
        print("抽出されたイベント:")
        for event in events:
            print(event)

        # eventがNoneのものを削除
        events = [event for event in events if event['event'] is not None]

        # フィルタリング後のイベントを表示
        print("有効なイベント:")
        for event in events:
            print(event)

        # 有効なイベントがある場合のみGoogleカレンダーに追加
        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

として保存

しかし認証エラーになる

これは google drive API の時に作成した token.jsonが
Google calendar API の権限と異なるため
なので参照する token.json を変更

import os
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from googleapiclient.http import MediaIoBaseDownload
from pdfminer.high_level import extract_text
from io import BytesIO

# Google Drive APIの認証
def authenticate_drive():
    SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
    token_path = 'g_drive/token.json'  # token.jsonのパスを変更
    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    else:
        print("認証トークンが見つかりません。認証を実行してください。")
        return None
    return build('drive', 'v3', credentials=creds)

# 他の関数も引き続き同様
# フォルダ内のPDFファイルリストを取得
def list_pdf_files_in_folder(service, folder_id):
    query = f"'{folder_id}' in parents and mimeType='application/pdf'"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    files = results.get('files', [])
    return files

# Google DriveからPDFファイルをダウンロード
def download_pdf_from_drive(service, file_id):
    request = service.files().get_media(fileId=file_id)
    file_data = BytesIO()
    downloader = MediaIoBaseDownload(file_data, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"Download Progress: {int(status.progress() * 100)}%")
    file_data.seek(0)
    return file_data

# PDFからテキストを抽出
def extract_text_from_pdf(pdf_data):
    text = extract_text(pdf_data)
    return text

# 指定したフォルダ内のすべてのPDFファイルのテキストを抽出
def extract_texts_from_folder(folder_id):
    service = authenticate_drive()
    if not service:
        return []

    pdf_files = list_pdf_files_in_folder(service, folder_id)
    if not pdf_files:
        print("指定されたフォルダにPDFファイルが見つかりません。")
        return []

    texts = []
    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file['name']}")
        pdf_data = download_pdf_from_drive(service, pdf_file['id'])
        pdf_text = extract_text_from_pdf(pdf_data)
        if pdf_text:
            texts.append(pdf_text)
        else:
            print(f"{pdf_file['name']} からテキストを抽出できませんでした。")
    return texts

というように

drive_pdf_extractor.py

を修正

修正したのは

token_path = 'g_drive/token.json'  # token.jsonのパスを変更

というように参照先を変えた

これで
Main3.pyを実行したが

レスポンスに含まれている日付形式が「令和6年10月26日」などの和暦表記や、
「2024-10-08」などのISO表記と混在しているため

統一的に処理するためのフォーマット変換を行い
日付形式の変換後、無効な日付やイベント名が空のエントリをフィルタリングするように変更する必要がある

GoogleDriveの指定フォルダからファイルを取得し処理

GoogleDriveの指定フォルダからファイルを取得し処理

Google Drive API でフォルダのIDを取得できたので
次にここにpdfファイルを配置して
これを取得しテキストを抽出する

PyPDF2 と pdfminer.six はどちらも PDF ファイルを操作するための Python ライブラリ

PyPDF2 には PDF ファイルの結合、分割、ページの回転、ページの追加・削除など、PDF のページ操作機能が充実

pdfminer.six は PDF を読み取ってテキストデータを抽出することが主目的のため、ページ操作や編集の機能はない

PyPDF2 は比較的軽量で、簡単なテキスト抽出やページ操作には高速に動作しますが、PDFの内容解析が簡易的

pdfminer.six はテキスト抽出においては非常に詳細な処理を行うため、
特に長い PDF や複雑なレイアウトのファイルでは処理時間が長くなる

ということでpdfminer.sixを使いテキストを取り出す

そしてその処理結果を渡すというモジュールを作成する

Google Drive の
School フォルダの ID: ここからpdfファイルを取得し
pdfminer.sixを使ってPDFからテキストを抽出したい

Google Drive の School フォルダの ID:
ここからpdfファイルを取得し pdfminer.sixを使ってPDFからテキストを抽出したい
個人で使うので token.jsonを使ったコードにする
抽出したテキストを処理する関数に渡すようにするため 抽出したテキストをOllamaに送信は不要。

import os
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from googleapiclient.http import MediaIoBaseDownload
from pdfminer.high_level import extract_text
from io import BytesIO

# Google Drive APIの認証
def authenticate_drive():
    SCOPES = ['https://www.googleapis.com/auth/drive']
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("認証トークンが見つかりません。認証を実行してください。")
        return None
    return build('drive', 'v3', credentials=creds)

# フォルダ内のPDFファイルリストを取得
def list_pdf_files_in_folder(service, folder_id):
    """Google Driveフォルダ内のPDFファイルのリストを取得します"""
    query = f"'{folder_id}' in parents and mimeType='application/pdf'"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    files = results.get('files', [])
    return files

# Google DriveからPDFファイルをダウンロード
def download_pdf_from_drive(service, file_id):
    """Google DriveからPDFファイルをダウンロードし、バイナリデータを返します"""
    request = service.files().get_media(fileId=file_id)
    file_data = BytesIO()
    downloader = MediaIoBaseDownload(file_data, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"Download Progress: {int(status.progress() * 100)}%")
    file_data.seek(0)
    return file_data

# PDFからテキストを抽出
def extract_text_from_pdf(pdf_data):
    """PDFファイルデータからテキストを抽出します"""
    text = extract_text(pdf_data)
    return text

# テキストを処理する関数
def process_text(text):
    """抽出したテキストを処理します"""
    print("Extracted Text:\n", text)
    # テキストの処理ロジックをここに追加します

# フォルダ内のPDFを処理
def process_pdfs_in_folder(folder_id):
    service = authenticate_drive()
    if not service:
        return

    pdf_files = list_pdf_files_in_folder(service, folder_id)
    if not pdf_files:
        print("指定されたフォルダにPDFファイルが見つかりません。")
        return

    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file['name']}")
        pdf_data = download_pdf_from_drive(service, pdf_file['id'])
        pdf_text = extract_text_from_pdf(pdf_data)
        if pdf_text:
            process_text(pdf_text)
        else:
            print(f"{pdf_file['name']} からテキストを抽出できませんでした。")

# フォルダIDを指定して処理を開始
folder_id = ""  # SchoolフォルダのID
process_pdfs_in_folder(folder_id)

これを

python get_pdf_read.py

で実行するとかなりの精度で取り出すことができた

次はこれをモジュールにしてollamaに処理結果を渡せるようにする

なおコード解説は
コードの説明
1. authenticate_drive関数: token.jsonを使用してGoogle Drive APIに認証し、Driveサービスを取得
2. list_pdf_files_in_folder関数: 指定したフォルダID内のPDFファイルをリスト化
3. download_pdf_from_drive関数: 各PDFファイルをダウンロードし、バイナリデータを返す
4. extract_text_from_pdf関数: pdfminer.sixを使ってPDFデータからテキストを抽出
5. process_text関数: 抽出したテキストを処理します。ここに任意の処理ロジックを追加
6. process_pdfs_in_folder関数: フォルダ内の全PDFファイルを処理し、各PDFのテキストを抽出後にprocess_text関数に渡す
これで、抽出されたテキストを直接処理する

なおGPTで生成したコードでは

SCOPES = ['https://www.googleapis.com/auth/drive.readonly']

というスコープだが
これだと認証エラーになる

    SCOPES = ['https://www.googleapis.com/auth/drive']

で回避可能

これは google カレンダーの時も同様で

SCOPES = ['https://www.googleapis.com/auth/calendar']

なら認証エラーにならない

Google Drive APIで新規フォルダ作成

Google Drive API を使い、my drive のなかに Schoolフォルダを作成したい
さらにSchoolフォルダのなかにReadフォルダを作成したい

ここへPDFファイルを置いて
このなかにファイルがあるならOCRしてカレンダーに予定を書き込み
処理終了したら ReadフォルダにPDFファイルを移動する

この時に同じフォルダがあると同じフォルダが複数できるらしいので
チェック機能を追加する

なので
Google Drive API を使用して My Drive 内に「School」フォルダを作成(または既存のものを使用)し、その中に「Read」フォルダを作成する

pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

で必要なライブラリを入れる

Google Cloud Console でプロジェクトを作成し、Google Drive API を有効化も忘れずに

from __future__ import print_function
import os.path
from google.oauth2 import credentials
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

# 認証と API クライアントのセットアップ
SCOPES = ['https://www.googleapis.com/auth/drive']

def main():
    """Google Drive API に接続し、フォルダを作成または取得します。"""
    creds = None
    # token.json はユーザーのアクセストークンとリフレッシュトークンを保存します。初回実行時に自動的に作成されます。
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    # 資格情報がないか、無効または期限切れの場合は再ログインします。
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # ユーザーにブラウザで認証してもらいます。
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # 認証情報を保存します。
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    # Drive API クライアントを作成
    service = build('drive', 'v3', credentials=creds)

    # 1. "School" フォルダを取得または作成
    school_folder_id = get_or_create_folder(service, 'School', parent_id=None)

    # 2. "Read" フォルダを "School" フォルダ内に作成
    read_folder_id = get_or_create_folder(service, 'Read', parent_id=school_folder_id)

    print('School フォルダの ID: %s' % school_folder_id)
    print('Read フォルダの ID: %s' % read_folder_id)

def get_or_create_folder(service, folder_name, parent_id=None):
    """
    フォルダを取得または作成します。

    Parameters:
        service: Drive API サービス インスタンス。
        folder_name (str): フォルダの名前。
        parent_id (str): 親フォルダの ID(省略可能)。

    Returns:
        str: フォルダの ID。
    """
    # フォルダを検索
    query = "name='{}' and mimeType='application/vnd.google-apps.folder' and trashed=false".format(folder_name)
    if parent_id:
        query += " and '{}' in parents".format(parent_id)

    response = service.files().list(
        q=query,
        spaces='drive',
        fields='files(id, name)',
    ).execute()
    files = response.get('files', [])

    if files:
        # 既存のフォルダが見つかった場合
        folder_id = files[0]['id']
        print('既存の "{}" フォルダの ID を使用します: {}'.format(folder_name, folder_id))
    else:
        # フォルダが存在しない場合、新規作成
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder',
        }
        if parent_id:
            file_metadata['parents'] = [parent_id]

        file = service.files().create(body=file_metadata, fields='id').execute()
        folder_id = file.get('id')
        print('新規に "{}" フォルダを作成しました。ID: {}'.format(folder_name, folder_id))

    return folder_id

if __name__ == '__main__':
    main()

を実行したが

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/quickstart.py", line 59, in <module>
    main()
  File "/Users/snowpool/aw10s/week_calendar_voice/quickstart.py", line 26, in main
    creds.refresh(Request())
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

認証関連でエラーになるので
一度検証

mkdir g_drive
cd g_drive 
cp ../credentials.json .
cp ../create_folder.py .

python create_folder.py 

で実行すると
認証画面になるので進めていく

これで Google Drive APIが使えて
新規フォルダ作成までできた

次はPDFファイルを読み込んでテキストを抽出する

それができたら
抽出した内容をollamaに渡してカレンダーに入れる

一度テキストファイルに書き出すかGPTで調べたが
内容の多さを基準にする場合、以下の点を目安に考えると良いでしょう。

### 1. **APIの入力制限**
– Ollama APIや多くのテキスト処理APIは、入力テキストの長さに制限があります。
一般的に**数千文字以内**であれば直接渡しても問題ないことが多いです。
– 確認しているAPIの制限が**4,000〜8,000文字**程度であれば、
1〜2ページ程度の短めのPDFであれば直接渡せます。
内容がこの文字数を超える場合は、分割するかファイルで管理した方が良いでしょう。

### 2. **テキストの内容**
– **数百文字〜数千文字程度**であれば、直接渡してもスムーズに処理できます。
一般的には、A4サイズのPDF1〜2ページ程度です。
– 内容が**5ページ以上**ある場合や、**10,000文字以上の大量のテキスト**になる場合は、
テキストファイルに保存し分割しながら処理した方が管理が簡単です。

### 3. **利用シナリオ**
– **リアルタイム性が重要**な場合:数千文字までで、
可能であれば直接APIに渡す方がシンプルです。
– **大規模なドキュメントや長文の内容を確認しながら処理したい**場合:
ファイルに書き出して保存し、分割して処理する方が確認がしやすくなります。

### 具体的な基準
– **3,000文字以下**:直接Ollamaに渡す
– **3,000〜10,000文字程度**:可能であれば直接渡すが、ファイルに書き出して保存も検討
– **10,000文字以上**:テキストファイルに書き出し、必要に応じて分割して処理

例えば、10,000文字はおおよそA4サイズのPDFファイルで5ページ前後に相当します。
この文字数を基準に、リアルタイム性や内容の確認のしやすさに応じて直接渡すか、
ファイルに保存してから渡すかを判断すると良いでしょう。

ということなので
PDFの処理結果を直接渡すことにする

WTIの価格を得る

WTIの価格を得る

pip install yfinance

でインストール後

import yfinance as yf

# リアルタイムのWTI価格を取得
wti = yf.Ticker("CL=F")
price = wti.history(period="1d")['Close'].iloc[-1]  # 最新の終値
print(f"Current WTI Price: ${price}")

とすると

[*********************100%***********************]  1 of 1 completed
                 Open       High        Low      Close  Adj Close  Volume
Date                                                                     
2023-01-03  80.570000  81.500000  76.599998  76.930000  76.930000  338520
2023-01-04  77.250000  77.419998  72.730003  72.839996  72.839996  352434
2023-01-05  73.250000  74.919998  72.459999  73.669998  73.669998  300731
2023-01-06  73.970001  75.470001  73.239998  73.769997  73.769997  258128
2023-01-09  73.470001  76.739998  73.470001  74.629997  74.629997  329290

と去年のものが出る

wti.history(period=”1d”) が1日分のデータではなく、
日次の複数日分のデータを取得しています。
これは、yfinanceが日次の過去データを返しているためです。

リアルタイムの価格を取得したい場合は、
次のように interval=”1m” を指定して、
最新の1分ごとのデータを取得することができます。
また、tail(1)で最新の行だけを取得するようにすると良い
とのことなのでコード変更

import yfinance as yf

# リアルタイムのWTI価格を取得(1分間隔の最新データ)
wti = yf.Ticker("CL=F")
data = wti.history(period="1d", interval="1m").tail(1)  # 最新の1分データ
price = data['Close'].iloc[-1]  # 最新の終値
print(f"Current WTI Price: ${price}")

これなら

Current WTI Price: $75.55999755859375

となる

1ヶ月間のデータを取得し折線グラフで表示
yfinance を使用して過去1ヶ月間の日次データを取得し、
matplotlib を使用して折れ線グラフを描画

import yfinance as yf
import matplotlib.pyplot as plt

# WTIデータを取得(1ヶ月間の日次データ)
wti = yf.Ticker("CL=F")
data = wti.history(period="1mo", interval="1d")

# 折れ線グラフの描画
plt.figure(figsize=(12, 6))
plt.plot(data.index, data['Close'], marker='o', linestyle='-', color='b')
plt.title('WTI Price Trend Over Last Month')
plt.xlabel('Date')
plt.ylabel('WTI Price ($)')
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

これで表示される

次にスクロール機能をつける

pip install plotly

の後

import yfinance as yf
import plotly.graph_objs as go

# WTIデータを取得(3年間の月次データ)
wti = yf.Ticker("CL=F")
data_3yr = wti.history(period="3y", interval="1mo")

# Plotlyグラフの作成
fig = go.Figure()

# 折れ線グラフを追加
fig.add_trace(go.Scatter(x=data_3yr.index, y=data_3yr['Close'], mode='lines+markers', name='WTI Price'))

# グラフのレイアウト設定
fig.update_layout(
    title='WTI Price Trend Over Last 3 Years',
    xaxis_title='Date',
    yaxis_title='WTI Price ($)',
    xaxis_rangeslider_visible=True,  # スクロールバーを追加
)

# グラフを表示
fig.show()

これだと出ない

1年にしてみる

import yfinance as yf
import plotly.graph_objs as go

# WTIデータを取得(1年間の月次データ)
wti = yf.Ticker("CL=F")
data_1yr = wti.history(period="1y", interval="1mo")

# Plotlyグラフの作成
fig = go.Figure()

# 折れ線グラフを追加
fig.add_trace(go.Scatter(x=data_1yr.index, y=data_1yr['Close'], mode='lines+markers', name='WTI Price'))

# グラフのレイアウト設定
fig.update_layout(
    title='WTI Price Trend Over Last 1 Year',
    xaxis_title='Date',
    yaxis_title='WTI Price ($)',
    xaxis_rangeslider_visible=True,  # スクロールバーを追加
)

# グラフを表示
fig.show()

これならOK

Googleカレンダーの読み上げ

Googleカレンダーの読み上げ

一週間の予定の取得はできたので
次はvoicevox で読み上げをする

今回も docker で起動して実験する

その前に

import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar']

def main():
    """今週の残りのGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return

    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)

    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')

    # 現在の日時を取得
    now = datetime.datetime.now(tz)

    # 今週の終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)

    # time_minを現在の日時に設定
    time_min = now.isoformat()
    # time_maxは今週の終了日時
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    if not events:
        print('今週の残りの予定はありません。')
    else:
        print('今週の残りの予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")

if __name__ == '__main__':
    main()

をモジュール化する

カレンダーの予定を取得する関数を別ファイルとして整理し、他のスクリプトからインポートできるようにする

touch calendar_utils.py

import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar']

def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None

def get_upcoming_events(creds, days=7):
    """指定された期間内のGoogleカレンダーの予定を取得します。"""
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)

    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)

    # 開始日と終了日を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)

    # 開始日時と終了日時を設定
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    return events

これで
calendar_utils.pyに、カレンダー認証を行うauthenticate関数と、
指定された期間の予定を取得するget_upcoming_events関数を作成

また

def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None


Noneを返した場合(トークンがない場合)や、予定がない場合の処理をする

次に
メインスクリプトからモジュールをインポートして使用

touch main_script.py

内容は

from calendar_utils import authenticate, get_upcoming_events

def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                print(f"{start} - {summary}")

if __name__ == '__main__':
    main()

これで

python main_script.py 
2024-10-11T00:06:41.572834+09:00 から 2024-10-14T00:06:41.572834+09:00 までの予定を取得します。
今週の残りの予定はありません。

というように同じ結果が出ればOK

次に voievox
これは以前作成したプロジェクトの中を参考に行う

まず

ssh -i .ssh/ubuntu22 snowpool@192.168.1.69

でログイン

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-lates

で起動

次にコード変更
calendar_utils.pyに音声合成の関数を追加し、カレンダー予定をVoiceVox経由で音声ファイルとして保存

import os
import datetime
import pytz
import requests
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar']
VOICEVOX_API_URL = "http://192.168.1.69:50021"  # VoiceVoxのAPIサーバーURL

def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None

def get_upcoming_events(creds, days=7):
    """指定された期間内のGoogleカレンダーの予定を取得します。"""
    service = build('calendar', 'v3', credentials=creds)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    return events

def synthesize_speech(text, speaker=1):
    """VoiceVox APIを使って音声合成を行い、音声ファイルを生成します。"""
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
        else:
            print("音声の生成に失敗しました")
    else:
        print("クエリの作成に失敗しました")

次に

カレンダーの予定を取得し、各予定を音声に変換する処理をメインスクリプトに追加

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech

def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            synthesize_speech("今週の残りの予定はありません。")
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                event_text = f"{start} - {summary}"
                print(event_text)
                synthesize_speech(event_text)

if __name__ == '__main__':
    main()

実行すると音声ファイルが作成されるが
2024-10-11 – 診断書の取得
の場合はそのまm数字を読み上げるため
年月日に変換が必要
できれば曜日もほしい

Google Calendar APIでは、曜日自体を直接返す項目はありませんが、
予定の開始日時がISO形式の文字列として返されるので、
これをPythonで処理して曜日を取得することができます。
datetimeオブジェクトを使用すれば、APIから取得した日時を簡単に曜日に変換できます
とのこと

日時を変換する関数を追加し、
YYYY-MM-DD形式の日時を「YYYY年M月D日(曜日)」
のように整形してVoiceVoxで読み上げるようにする

calendar_utils.py

import os
import datetime
import pytz
import requests
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

SCOPES = ['https://www.googleapis.com/auth/calendar']
VOICEVOX_API_URL = "http://192.168.1.69:50021"  # VoiceVoxのAPIサーバーURL

def authenticate():
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None

def get_upcoming_events(creds, days=7):
    service = build('calendar', 'v3', credentials=creds)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()

    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    return events

def format_date_with_weekday(date_str):
    """ 日付文字列を「YYYY年M月D日(曜日)」形式に変換します """
    date_obj = datetime.datetime.fromisoformat(date_str)
    # 曜日を日本語で取得
    weekday = date_obj.strftime("%A")
    weekday_dict = {
        "Monday": "月",
        "Tuesday": "火",
        "Wednesday": "水",
        "Thursday": "木",
        "Friday": "金",
        "Saturday": "土",
        "Sunday": "日"
    }
    weekday_jp = weekday_dict.get(weekday, weekday)  # 日本語の曜日に変換
    formatted_date = date_obj.strftime(f"%Y年%m月%d日({weekday_jp})")
    return formatted_date

def synthesize_speech(text, speaker=1):
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
        else:
            print("音声の生成に失敗しました")
    else:
        print("クエリの作成に失敗しました")

として

main_script.py

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday

def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            synthesize_speech("今週の残りの予定はありません。")
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                synthesize_speech(event_text)

if __name__ == '__main__':
    main()

として保存

これで実行すると

2024年10月11日(金) - 診断書の取得

というように目的通りの音声が作成された

次は作成した音声を再生できるようにする
この時に今週の予定をお知らせします
という音声ファイルを再生するようにする

これは以前作成したものを使う

touch create_voice.py

import subprocess
import sys

def generate_and_play_audio_from_text(file_path, server_ip):
    # テキストファイルからテキストを読み込む
    with open(file_path, 'r') as file:
        text = file.read()

    # JSONファイルを作成するためのcurlコマンド
    command_json = [
        "curl", "-s", "-X", "POST",
        f"http://{server_ip}/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]

    # 音声ファイルを作成するためのcurlコマンド
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", f"http://{server_ip}/synthesis?speaker=1"
    ]

    # JSONファイルと音声ファイルを作成
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)

    # 音声ファイルを再生
    subprocess.run(["afplay", "audio_output.wav"])

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python script.py <file_path> <server_ip>")
        sys.exit(1)
    
    file_path = sys.argv[1]
    server_ip = sys.argv[2]
    generate_and_play_audio_from_text(file_path, server_ip)

として保存

touch voice.txt 

で中身を

今週の予定をお知らせします

として保存

python create_voice.py voice.txt 192.168.1.69:50021

とすれば

audio_output.wav

が作成される

これをnotice.wavにファイル名を変更する

そしてこれを再生するようにする

生成された音声ファイルを再生するには、
Pythonのsubprocessモジュールやplaysoundライブラリなどを利用する方法がある

pip install playsound

でインストール

一週間分の予定を表示した後、保存した音声ファイルを順番に再生する
音声ファイルのリストを作成: 各イベントの音声ファイルを生成した後、
そのファイル名をaudio_filesリストに追加

予定の表示後にファイルを再生:
audio_filesリストの各ファイルを順番に再生しています。
playsound関数を使って音声ファイルを再生し、全ての音声が順番に再生

これで、カレンダーの予定を表示した後に、順次生成された音声ファイルを再生

残りの予定がない場合に音声が再生されないように、
audio_filesリストにファイルが追加されているか確認する条件を追加します。
audio_filesが空でない場合のみ再生処理を行うように、コードを修正

予定がないときは音声ファイルを再生せず、予定がある場合のみリスト内の音声ファイルを再生

予定があり音声ファイルを再生する前に notice.wav を再生

このためには

calendar_utils.py

を修正する

# 音声合成の関数を修正して、生成されたファイル名を返すようにします
def synthesize_speech(text, speaker=1):
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
            return filename  # 生成されたファイル名を返す
        else:
            print("音声の生成に失敗しました")
            return None
    else:
        print("クエリの作成に失敗しました")
        return None

そして

main_script.py

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound

def main():
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            # 音声ファイルは再生しない
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加

        # 音声ファイルが存在する場合のみ notice.wav と各予定の音声を再生
        if audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
            
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)

if __name__ == '__main__':
    main()

として保存

これで実行すると

今週の予定をお知らせします
2024年10月11日(金) - 診断書の取得

というように読み上げてくれる