OpenCVの顔の識別機能とカレンダー読み上げの組み合わせ

OpenCVの顔の識別機能とカレンダー読み上げの組み合わせ

まず顔の識別で自分の顔だった時に動作するように
カレンダー読み上げ機能をモジュールにする

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound
 
def main():
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            # 音声ファイルは再生しない
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加
 
        # 音声ファイルが存在する場合のみ notice.wav と各予定の音声を再生
        if audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
             
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)
 
if __name__ == '__main__':
    main()

をモジュールにして他のプログラムから呼び出せるようにしたい

1
touch calendar_audio_utils.py

でファイルを作成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound
 
def get_weekly_schedule_with_audio(play_audio=False):
    """
    今週の残りの予定を取得し、音声ファイルを生成する関数。
     
    :param play_audio: 予定を音声で再生するかどうか(デフォルトは再生しない)
    :return: 今週の予定をテキスト形式で返すリスト
    """
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    event_texts = []  # 予定のテキストリスト
 
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                event_texts.append(event_text)  # テキストをリストに追加
                print(event_text)
 
                # 音声ファイルを生成
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加
 
        # 音声を再生するオプションがTrueの場合にのみ、音声ファイルを再生
        if play_audio and audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
             
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)
 
    return event_texts

として保存

念のため動作するかチェック

1
vim testvoice.py

でファイル作成

1
2
3
4
5
6
7
8
from calendar_audio_utils import get_weekly_schedule_with_audio
 
# 音声再生なしで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=False)
print(schedule)
 
# 音声再生ありで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=True)

保存したら

1
python testvoice.py

で実行

これで動作するのが確認できたので
次に顔の識別

以前使ったものを再利用する

Pixcel8で撮影したスマホの写真で顔データを作る場合には
元画像の1/4にする必要があるため
変換のため
resize_save.py
を作成したのでこれを使う

コードは

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import cv2
import os
import argparse
 
def main():
    # コマンドライン引数を解析するパーサーを作成
    parser = argparse.ArgumentParser(description="Resize and save an image")
    parser.add_argument("image_path", help="Path to the image file")
    args = parser.parse_args()
 
    # 画像を読み込む
    image = cv2.imread(args.image_path)
    if image is None:
        print("画像が読み込めませんでした。")
        return
 
    # 画像の元の高さ、幅を取得
    height, width = image.shape[:2]
 
    # 新しい寸法を計算(元のサイズの1/4)
    new_width = width // 4
    new_height = height // 4
 
    # 画像をリサイズ
    resized_image = cv2.resize(image, (new_width, new_height))
 
    # 新しいファイル名を設定
    new_file_path = os.path.splitext(args.image_path)[0] + "_quarter.jpg"
 
    # リサイズした画像を保存
    cv2.imwrite(new_file_path, resized_image)
    print(f"リサイズされた画像が保存されました: {new_file_path}")
 
if __name__ == '__main__':
    main()

使用する時にはターミナルでコマンドで実行する

1
python resize_save.py PXL_20240612_091410912.jpg

というようにファイルを指定すれば
実行後1/4サイズにした画像が作成される

ファイルサイズを調べるスクリプトも作成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import cv2
import os
import argparse
 
def main():
    # コマンドライン引数を解析するパーサーを作成
    parser = argparse.ArgumentParser(description="Display image properties")
    parser.add_argument("image_path", help="Path to the image file")
    args = parser.parse_args()
 
    # 画像を読み込む
    image = cv2.imread(args.image_path)
    if image is None:
        print("画像が読み込めませんでした。")
        return
 
    # 画像の高さ、幅、チャンネル数を取得
    height, width, channels = image.shape
    print(f"画像の幅: {width} ピクセル")
    print(f"画像の高さ: {height} ピクセル")
    print(f"色チャネル数: {channels}")
 
    # ファイルサイズを取得
    file_size = os.path.getsize(args.image_path)
    print(f"ファイルサイズ: {file_size} バイト")
 
if __name__ == '__main__':
    main()

これを

1
python file_info.py PXL_20240612_091410912_resized_resized.jpg

というように実行すればサイズが表示される

1
2
3
4
画像の幅: 684 ピクセル
画像の高さ: 912 ピクセル
色チャネル数: 3
ファイルサイズ: 228769 バイト

この2つは自分以外の写真から登録画像を作るのに使うので

1
2
cp ../face_recog/file_info.py .
cp ../face_recog/resize_save.py .

でコピーしておく

次に
入力した写真から人の顔の部分を切り出して保存するプログラム

1
generate_aligned_faces.py

に写真のファイルを引数にして実行すれば個人ごとの顔写真ができる

これは
入力した写真から人の顔の部分を切り出して保存するプログラム

複数の人物が写っている場合は全員を切り出して face001.jpg , face002.jpg ・・・ と名前を付けて保存する
出力されたファイル名を 人の名前に変更しておくと後々便利です。 
face001.jpg → taro.jpg

1
python generate_aligned_faces.py image.jpg

とすれば
写真に写っている人の分だけファイルができる
そのファイル名を人の名前に変更する

つまり全て
face001.jpg
という感じで
Face00x.jpg
となっているので
写真ごとに名前を変える

これもコピーしておく

1
cp ../face_recog/generate_aligned_faces.py .

次に

1
generate_feature_dictionary.py


切り出した顔のjpgファイルを読み込んで、顔の特徴量に変換する

例えば 顔写真 taro.jpg を入力すると 顔の特徴量 taro.npy が出力される
このnumpyファイルに各個人の顔の特徴量が128次元ベクトルに変換されて入っている

1
2
python generate_feature_dictionary.py face001.jpg
python generate_feature_dictionary.py face002.jpg

つまり
写真の人の分だけ実行すればOK

これもコピーしておく

1
cp ../face_recog/generate_feature_dictionary.py .

次に顔の得微量が近い人を検出するにはモデルが必要なのでコピー

1
2
cp ../face_recog/face_recognizer_fast.onnx .
cp ../face_recog/face_detection_yunet_2023mar.onnx .

そして作成した自分の顔の得微量ファイルもコピーしておく

1
cp ../face_recog/satoru.* .

Webカメラから映った時に顔の識別をするので

1
cp ../face_recog/webcam_face_recognizer.py .

でコピー

念の為動作確認

1
python webcam_face_recognizer.py

で自分の顔を識別しているのがわかる

次にこの中で
顔認識した時に

1
2
from calendar_module import get_weekly_schedule_with_audio # 音声再生なしで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=False) print(schedule) # 音声再生ありで予定を取得 schedule = get_weekly_schedule_with_audio(play_audio=True)

を実行

また
毎回読み上げでは負荷がかかるため、次の呼び出しは12時間後になるように設定

顔を識別できたときに特定の関数を呼び出し、
呼び出しが12時間に1回のみになるように制限するためには、識別が成功した時間を記録し、
次に呼び出すタイミングを管理する

1. call_function_when_recognized: この関数が顔認識時に呼び出され、最後の呼び出し時間を記録します。
次に呼び出すまでの間隔が12時間経過していない場合は、新たに処理を実行しないようにしています。
2. THROTTLE_TIME: 12時間を秒単位(12 * 60 * 60)で設定しています。
3. last_called_time: この変数は最後に呼び出された時間を記録し、
次の呼び出しが12時間以内であれば、新たな処理をスキップします。

.wavファイルを毎回生成していると容量を圧迫するため

1
2
3
4
5
6
7
8
from calendar_audio_utils import get_weekly_schedule_with_audio
 
# 音声再生なしで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=False)
print(schedule)
 
# 音声再生ありで予定を取得
schedule = get_weekly_schedule_with_audio(play_audio=True)

の処理の後に notice.wavファイル以外の .wavファイルをすべて削除する

1
touch webcam_face_calendar.py

でファイルを作成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import os
import glob
import numpy as np
import cv2
import time
from calendar_module import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート
 
COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128
 
# 12時間(秒単位)
THROTTLE_TIME = 12 * 60 * 60
last_called_time = 0  # 最後に呼び出した時間を初期化
 
def match(recognizer, feature1, dictionary):
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            return True, (user_id, score)
    return False, ("", 0.0)
 
def call_function_when_recognized(user_id):
    global last_called_time
    current_time = time.time()
     
    # 最後に呼び出してから12時間経過しているかを確認
    if current_time - last_called_time >= THROTTLE_TIME:
        print(f"認識されました: {user_id}")
         
        # 予定を音声再生なしで取得
        schedule = get_weekly_schedule_with_audio(play_audio=False)
        print("予定:", schedule)
         
        # 予定を音声再生ありで取得
        schedule = get_weekly_schedule_with_audio(play_audio=True)
        print("音声で再生される予定:", schedule)
         
        # notice.wavファイル以外の.wavファイルを削除
        cleanup_audio_files(exclude_file="notice.wav")
         
        # 最後に呼び出した時間を更新
        last_called_time = current_time
    else:
        print("まだ12時間経過していないため、次の呼び出しは行われません。")
 
def cleanup_audio_files(exclude_file):
    """指定された.wavファイル以外の.wavファイルを削除する関数"""
    directory = os.getcwd()  # 現在のディレクトリを取得
    wav_files = glob.glob(os.path.join(directory, "*.wav"))  # すべての.wavファイルを取得
 
    for wav_file in wav_files:
        if os.path.basename(wav_file) != exclude_file:
            try:
                os.remove(wav_file)  # 指定されたファイル以外を削除
                print(f"削除しました: {wav_file}")
            except OSError as e:
                print(f"ファイル削除エラー: {wav_file}, {e}")
 
def main():
    directory = os.path.dirname(__file__)
    capture = cv2.VideoCapture(0)  # Use the default camera
 
    if not capture.isOpened():
        print("Error: The webcam could not be opened.")
        return
 
    dictionary = []
    files = glob.glob(os.path.join(directory, "*.npy"))
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))
 
    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")
 
    while True:
        result, image = capture.read()
        if not result:
            print("Error: No image from webcam.")
            break
 
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Ensure image is in RGB
 
        height, width, _ = image.shape
        face_detector.setInputSize((width, height))
 
        result, faces = face_detector.detect(image)
        faces = faces if faces is not None else []
 
        for face in faces:
            aligned_face = face_recognizer.alignCrop(image, face)
            feature = face_recognizer.feature(aligned_face)
 
            result, user = match(face_recognizer, feature, dictionary)
 
            box = list(map(int, face[:4]))
            color = (0, 255, 0) if result else (0, 0, 255)
            thickness = 2
            cv2.rectangle(image, box, color, thickness, cv2.LINE_AA)
 
            id, score = user if result else ("unknown", 0.0)
            text = "{} ({:.2f})".format(id, score)
            position = (box[0], box[1] - 10)
            font = cv2.FONT_HERSHEY_SIMPLEX
            scale = 0.6
            cv2.putText(image, text, position, font, scale, color, thickness, cv2.LINE_AA)
 
            if result and id != "unknown":
                call_function_when_recognized(id# 顔が認識された時にカレンダーの予定取得を実行
 
        # 画像を表示する前にRGBからBGRに再変換
        cv2.imshow("face recognition", cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
 
        key = cv2.waitKey(1)
        if key == ord('q'):
            break
 
    capture.release()
    cv2.destroyAllWindows()
 
if __name__ == '__main__':
    main()

で実行

1
2
3
4
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/webcam_face_calendar.py", line 6, in <module>
    from calendar_module import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート
ImportError: cannot import name 'get_weekly_schedule_with_audio' from 'calendar_module' (/Users/snowpool/aw10s/week_calendar_voice/calendar_module.py)

となる

これはChatGPTで作成した時のモジュールのエラー
結構あることでライブラリのインポートを間違えたり削除下入りしている

1
from calendar_module import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート

に変更すれば解決

起動はしたけど、このままだとOpenCVで画面描画するので
これは不要なので非表示にする
これをしないとリモート環境などで動作しない

v2.imshow()やキーボードの操作に関する部分を削除し、
無限ループで顔認識を行うコードに修正

cv2.VideoCapture の映像確認が不要な場合は、その部分を省略しても動作する

ということで
画面表示とキー入力待機を削除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import os
import glob
import numpy as np
import cv2
import time
from calendar_audio_utils import get_weekly_schedule_with_audio  # カレンダーから予定を取得するためのインポート
 
COSINE_THRESHOLD = 0.363
NORML2_THRESHOLD = 1.128
 
# 12時間(秒単位)
THROTTLE_TIME = 12 * 60 * 60
last_called_time = 0  # 最後に呼び出した時間を初期化
 
def match(recognizer, feature1, dictionary):
    for element in dictionary:
        user_id, feature2 = element
        score = recognizer.match(feature1, feature2, cv2.FaceRecognizerSF_FR_COSINE)
        if score > COSINE_THRESHOLD:
            return True, (user_id, score)
    return False, ("", 0.0)
 
def call_function_when_recognized(user_id):
    global last_called_time
    current_time = time.time()
     
    # 最後に呼び出してから12時間経過しているかを確認
    if current_time - last_called_time >= THROTTLE_TIME:
        print(f"認識されました: {user_id}")
         
        # 予定を音声再生なしで取得
        schedule = get_weekly_schedule_with_audio(play_audio=False)
        print("予定:", schedule)
         
        # 予定を音声再生ありで取得
        schedule = get_weekly_schedule_with_audio(play_audio=True)
        print("音声で再生される予定:", schedule)
         
        # notice.wavファイル以外の.wavファイルを削除
        cleanup_audio_files(exclude_file="notice.wav")
         
        # 最後に呼び出した時間を更新
        last_called_time = current_time
    else:
        print("まだ12時間経過していないため、次の呼び出しは行われません。")
 
def cleanup_audio_files(exclude_file):
    """指定された.wavファイル以外の.wavファイルを削除する関数"""
    directory = os.getcwd()  # 現在のディレクトリを取得
    wav_files = glob.glob(os.path.join(directory, "*.wav"))  # すべての.wavファイルを取得
 
    for wav_file in wav_files:
        if os.path.basename(wav_file) != exclude_file:
            try:
                os.remove(wav_file)  # 指定されたファイル以外を削除
                print(f"削除しました: {wav_file}")
            except OSError as e:
                print(f"ファイル削除エラー: {wav_file}, {e}")
 
def main():
    directory = os.path.dirname(__file__)
    capture = cv2.VideoCapture(0)  # Use the default camera
 
    if not capture.isOpened():
        print("Error: The webcam could not be opened.")
        return
 
    dictionary = []
    files = glob.glob(os.path.join(directory, "*.npy"))
    for file in files:
        feature = np.load(file)
        user_id = os.path.splitext(os.path.basename(file))[0]
        dictionary.append((user_id, feature))
 
    weights = os.path.join(directory, "face_detection_yunet_2023mar.onnx")
    face_detector = cv2.FaceDetectorYN_create(weights, "", (0, 0))
    weights = os.path.join(directory, "face_recognizer_fast.onnx")
    face_recognizer = cv2.FaceRecognizerSF_create(weights, "")
 
    while True:
        result, image = capture.read()
        if not result:
            print("Error: No image from webcam.")
            break
 
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Ensure image is in RGB
 
        height, width, _ = image.shape
        face_detector.setInputSize((width, height))
 
        result, faces = face_detector.detect(image)
        faces = faces if faces is not None else []
 
        for face in faces:
            aligned_face = face_recognizer.alignCrop(image, face)
            feature = face_recognizer.feature(aligned_face)
 
            result, user = match(face_recognizer, feature, dictionary)
 
            if result and user[0] != "unknown":
                call_function_when_recognized(user[0])  # 顔が認識された時にカレンダーの予定取得を実行
 
        # 適当な待機時間を設けてリソースの使用を抑える
        time.sleep(1)
 
    capture.release()
 
if __name__ == '__main__':
    main()

というコードに変更

これでwebカメラの画面描画はなくなり
停止手段は ctrl + c で停止となる

実際に動かしたけど
M1macbookAir 16GB で
顔認識してからGoogle カレンダーを読み込み
Voicevox で音声ファイルを生成し、予定を読み上げるまで一分かかる

Docker ではなくインストールタイプにしたり
マシンスペックを上げれば短縮できるかもしれない

Googleカレンダーの読み上げ

Googleカレンダーの読み上げ

一週間の予定の取得はできたので
次はvoicevox で読み上げをする

今回も docker で起動して実験する

その前に

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
 
def main():
    """今週の残りのGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return
 
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)
 
    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
 
    # 現在の日時を取得
    now = datetime.datetime.now(tz)
 
    # 今週の終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)
 
    # time_minを現在の日時に設定
    time_min = now.isoformat()
    # time_maxは今週の終了日時
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    if not events:
        print('今週の残りの予定はありません。')
    else:
        print('今週の残りの予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")
 
if __name__ == '__main__':
    main()

をモジュール化する

カレンダーの予定を取得する関数を別ファイルとして整理し、他のスクリプトからインポートできるようにする

1
touch calendar_utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
 
def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None
 
def get_upcoming_events(creds, days=7):
    """指定された期間内のGoogleカレンダーの予定を取得します。"""
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)
 
    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
 
    # 開始日と終了日を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
 
    # 開始日時と終了日時を設定
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    return events

これで
calendar_utils.pyに、カレンダー認証を行うauthenticate関数と、
指定された期間の予定を取得するget_upcoming_events関数を作成

また

1
2
3
4
5
6
7
8
def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None


Noneを返した場合(トークンがない場合)や、予定がない場合の処理をする

次に
メインスクリプトからモジュールをインポートして使用

1
touch main_script.py

内容は

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from calendar_utils import authenticate, get_upcoming_events
 
def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                print(f"{start} - {summary}")
 
if __name__ == '__main__':
    main()

これで

1
2
3
python main_script.py
2024-10-11T00:06:41.572834+09:00 から 2024-10-14T00:06:41.572834+09:00 までの予定を取得します。
今週の残りの予定はありません。

というように同じ結果が出ればOK

次に voievox
これは以前作成したプロジェクトの中を参考に行う

まず

1
ssh -i .ssh/ubuntu22 snowpool@192.168.1.69

でログイン

1
docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-lates

で起動

次にコード変更
calendar_utils.pyに音声合成の関数を追加し、カレンダー予定をVoiceVox経由で音声ファイルとして保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import datetime
import pytz
import requests
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
VOICEVOX_API_URL = "http://192.168.1.69:50021"  # VoiceVoxのAPIサーバーURL
 
def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None
 
def get_upcoming_events(creds, days=7):
    """指定された期間内のGoogleカレンダーの予定を取得します。"""
    service = build('calendar', 'v3', credentials=creds)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    return events
 
def synthesize_speech(text, speaker=1):
    """VoiceVox APIを使って音声合成を行い、音声ファイルを生成します。"""
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
        else:
            print("音声の生成に失敗しました")
    else:
        print("クエリの作成に失敗しました")

次に

カレンダーの予定を取得し、各予定を音声に変換する処理をメインスクリプトに追加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from calendar_utils import authenticate, get_upcoming_events, synthesize_speech
 
def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            synthesize_speech("今週の残りの予定はありません。")
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                event_text = f"{start} - {summary}"
                print(event_text)
                synthesize_speech(event_text)
 
if __name__ == '__main__':
    main()

実行すると音声ファイルが作成されるが
2024-10-11 – 診断書の取得
の場合はそのまm数字を読み上げるため
年月日に変換が必要
できれば曜日もほしい

Google Calendar APIでは、曜日自体を直接返す項目はありませんが、
予定の開始日時がISO形式の文字列として返されるので、
これをPythonで処理して曜日を取得することができます。
datetimeオブジェクトを使用すれば、APIから取得した日時を簡単に曜日に変換できます
とのこと

日時を変換する関数を追加し、
YYYY-MM-DD形式の日時を「YYYY年M月D日(曜日)」
のように整形してVoiceVoxで読み上げるようにする

1
calendar_utils.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os
import datetime
import pytz
import requests
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
VOICEVOX_API_URL = "http://192.168.1.69:50021"  # VoiceVoxのAPIサーバーURL
 
def authenticate():
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None
 
def get_upcoming_events(creds, days=7):
    service = build('calendar', 'v3', credentials=creds)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()
 
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    return events
 
def format_date_with_weekday(date_str):
    """ 日付文字列を「YYYY年M月D日(曜日)」形式に変換します """
    date_obj = datetime.datetime.fromisoformat(date_str)
    # 曜日を日本語で取得
    weekday = date_obj.strftime("%A")
    weekday_dict = {
        "Monday": "月",
        "Tuesday": "火",
        "Wednesday": "水",
        "Thursday": "木",
        "Friday": "金",
        "Saturday": "土",
        "Sunday": "日"
    }
    weekday_jp = weekday_dict.get(weekday, weekday)  # 日本語の曜日に変換
    formatted_date = date_obj.strftime(f"%Y年%m月%d日({weekday_jp})")
    return formatted_date
 
def synthesize_speech(text, speaker=1):
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
        else:
            print("音声の生成に失敗しました")
    else:
        print("クエリの作成に失敗しました")

として

1
main_script.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
 
def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            synthesize_speech("今週の残りの予定はありません。")
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                synthesize_speech(event_text)
 
if __name__ == '__main__':
    main()

として保存

これで実行すると

1
2024年10月11日(金) - 診断書の取得

というように目的通りの音声が作成された

次は作成した音声を再生できるようにする
この時に今週の予定をお知らせします
という音声ファイルを再生するようにする

これは以前作成したものを使う

1
touch create_voice.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import subprocess
import sys
 
def generate_and_play_audio_from_text(file_path, server_ip):
    # テキストファイルからテキストを読み込む
    with open(file_path, 'r') as file:
        text = file.read()
 
    # JSONファイルを作成するためのcurlコマンド
    command_json = [
        "curl", "-s", "-X", "POST",
        f"http://{server_ip}/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]
 
    # 音声ファイルを作成するためのcurlコマンド
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", f"http://{server_ip}/synthesis?speaker=1"
    ]
 
    # JSONファイルと音声ファイルを作成
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)
 
    # 音声ファイルを再生
    subprocess.run(["afplay", "audio_output.wav"])
 
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python script.py <file_path> <server_ip>")
        sys.exit(1)
     
    file_path = sys.argv[1]
    server_ip = sys.argv[2]
    generate_and_play_audio_from_text(file_path, server_ip)

として保存

1
touch voice.txt

で中身を

1
今週の予定をお知らせします

として保存

1
python create_voice.py voice.txt 192.168.1.69:50021

とすれば

1
audio_output.wav

が作成される

これをnotice.wavにファイル名を変更する

そしてこれを再生するようにする

生成された音声ファイルを再生するには、
Pythonのsubprocessモジュールやplaysoundライブラリなどを利用する方法がある

1
pip install playsound

でインストール

一週間分の予定を表示した後、保存した音声ファイルを順番に再生する
音声ファイルのリストを作成: 各イベントの音声ファイルを生成した後、
そのファイル名をaudio_filesリストに追加

予定の表示後にファイルを再生:
audio_filesリストの各ファイルを順番に再生しています。
playsound関数を使って音声ファイルを再生し、全ての音声が順番に再生

これで、カレンダーの予定を表示した後に、順次生成された音声ファイルを再生

残りの予定がない場合に音声が再生されないように、
audio_filesリストにファイルが追加されているか確認する条件を追加します。
audio_filesが空でない場合のみ再生処理を行うように、コードを修正

予定がないときは音声ファイルを再生せず、予定がある場合のみリスト内の音声ファイルを再生

予定があり音声ファイルを再生する前に notice.wav を再生

このためには

1
calendar_utils.py

を修正する

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 音声合成の関数を修正して、生成されたファイル名を返すようにします
def synthesize_speech(text, speaker=1):
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
            return filename  # 生成されたファイル名を返す
        else:
            print("音声の生成に失敗しました")
            return None
    else:
        print("クエリの作成に失敗しました")
        return None

そして

1
main_script.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound
 
def main():
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            # 音声ファイルは再生しない
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加
 
        # 音声ファイルが存在する場合のみ notice.wav と各予定の音声を再生
        if audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
             
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)
 
if __name__ == '__main__':
    main()

として保存

これで実行すると

1
2
今週の予定をお知らせします
2024年10月11日(金) - 診断書の取得

というように読み上げてくれる

Google Calendarの週間予定の取得

週間予定の取得

実行環境
M1 MacbookAir 16GB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
 
def main():
    """今週の残りのGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return
 
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)
 
    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
 
    # 現在の日時を取得
    now = datetime.datetime.now(tz)
 
    # 今週の終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)
 
    # time_minを現在の日時に設定
    time_min = now.isoformat()
    # time_maxは今週の終了日時
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    if not events:
        print('今週の残りの予定はありません。')
    else:
        print('今週の残りの予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")
 
if __name__ == '__main__':
    main()

を実行したが

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2024-10-09T01:28:10.960984+09:00 から 2024-10-14T01:28:10.960984+09:00 までの予定を取得します。
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/get_week.py", line 60, in <module>
    main()
  File "/Users/snowpool/aw10s/week_calendar_voice/get_week.py", line 47, in main
    ).execute()
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/googleapiclient/http.py", line 923, in execute
    resp, content = _retry_request(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/googleapiclient/http.py", line 191, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/auth/credentials.py", line 135, in before_request
    self.refresh(request)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

Chatgpt では

となっているが

というようにスコープを変更したら解決した

これはカレンダー追加がこのスコープで動作するのに
取得の場合はコードが違うので試したら動いた

テキストファイルを読み込み ollamaでGoogleカレンダーに送れる形式にする

テキストファイルを読み込み ollamaでGoogleカレンダーに送れる形式にする

PDFからテキストと抽出するので

1
pip install pdf2image pytesseract

Tesseract OCRのインストール

1
brew install tesseract

日本語言語データを追加

1
brew install tesseract-lang

とりあえずここまではOK

以前PDFの内容の取り出しはしたことがあるので
肝心の文章から予定を ollamaで取り出しを行う

テキストから日時とイベント情報を抽出します。今回のテキストでは、日付が「10月7日~10月15日」や「10月25日(金)」のように記載されています。これらを正規表現とdateparserライブラリで解析する

1
pip install dateparser

PDFの前に
テキストファイルの読み込み
テキストをOllamaで解析し、日時と予定を抽出
GoogleカレンダーAPIを使って予定を追加
を行うようにする

これはメールでお知らせすることがあるため

1
2
3
4
5
6
7
8
def read_text_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text
 
# 使用例
text_file_path = 'school_notice.txt'  # テキストファイルのパス
text_content = read_text_file(text_file_path)

でテキストファイルを読み込む

Ollamaを使用してテキストを解析し、日時とイベント情報を抽出
PythonからOllamaを呼び出す
Ollamaがローカルで動作している前提で、requestsライブラリを使用してHTTPリクエストを送信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import requests
import json
 
def parse_text_with_ollama(text):
    # OllamaのAPIエンドポイント
    ollama_url = 'http://localhost:11434/generate'
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日時とそれに対応する予定を抽出してください。結果はJSON形式で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
"""
 
    payload = {
        'model': 'your-ollama-model-name'# 使用するOllamaモデルの名前
        'prompt': prompt
    }
 
    response = requests.post(ollama_url, json=payload)
    response_text = response.text
 
    # Ollamaの出力からJSON部分を抽出
    try:
        start_index = response_text.index('[')
        end_index = response_text.rindex(']') + 1
        json_str = response_text[start_index:end_index]
        events = json.loads(json_str)
    except (ValueError, json.JSONDecodeError) as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events
 
# 使用例
events = parse_text_with_ollama(text_content)
print("抽出されたイベント:", events)
 
 
抽出されたイベントをGoogleカレンダーに追加
import os
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
def add_events_to_calendar(events):
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("token.json が見つかりません。認証を実行してください。")
        return
 
    service = build('calendar', 'v3', credentials=creds)
 
    for event in events:
        # 日付の形式を確認し、必要に応じて変換
        try:
            event_date = datetime.strptime(event['date'], '%Y-%m-%d')
        except ValueError:
            print(f"無効な日付形式: {event['date']}")
            continue
 
        event_body = {
            'summary': event['event'],
            'start': {
                'date': event_date.strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
            'end': {
                'date': (event_date + timedelta(days=1)).strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
        }
 
        # イベントをカレンダーに追加
        created_event = service.events().insert(calendarId='primary', body=event_body).execute()
        print(f"イベントが作成されました: {created_event.get('htmlLink')}")
 
# 使用例
add_events_to_calendar(events)

これらを

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
import requests
import json
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
def read_text_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text
 
def parse_text_with_ollama(text):
    # OllamaのAPIエンドポイント
    ollama_url = 'http://localhost:11434/generate'
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日時とそれに対応する予定を抽出してください。結果はJSON形式で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
"""
 
    payload = {
        'model': 'your-ollama-model-name'# 使用するOllamaモデルの名前
        'prompt': prompt
    }
 
    response = requests.post(ollama_url, json=payload)
    response_text = response.text
 
    # Ollamaの出力からJSON部分を抽出
    try:
        start_index = response_text.index('[')
        end_index = response_text.rindex(']') + 1
        json_str = response_text[start_index:end_index]
        events = json.loads(json_str)
    except (ValueError, json.JSONDecodeError) as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events
 
def add_events_to_calendar(events):
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("token.json が見つかりません。認証を実行してください。")
        return
 
    service = build('calendar', 'v3', credentials=creds)
 
    for event in events:
        # 日付の形式を確認し、必要に応じて変換
        try:
            event_date = datetime.strptime(event['date'], '%Y-%m-%d')
        except ValueError:
            print(f"無効な日付形式: {event['date']}")
            continue
 
        event_body = {
            'summary': event['event'],
            'start': {
                'date': event_date.strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
            'end': {
                'date': (event_date + timedelta(days=1)).strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
        }
 
        # イベントをカレンダーに追加
        created_event = service.events().insert(calendarId='primary', body=event_body).execute()
        print(f"イベントが作成されました: {created_event.get('htmlLink')}")
 
# メインの実行部分
text_file_path = 'school_notice.txt'  # テキストファイルのパス
text_content = read_text_file(text_file_path)
events = parse_text_with_ollama(text_content)
print("抽出されたイベント:", events)
add_events_to_calendar(events)

というように1つにすることもできるが
検証と後で他でも使えるように
モジュールにして他でも使えるようにしたい

1
calendar_module.py

1
2
3
read_text_file(file_path)
parse_text_with_ollama(text, model_name)
add_events_to_calendar(events, calendar_id='primary', token_file='token.json')

を入れる

1
touch calendar_module.py

でファイルを作成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
import requests
import json
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
def read_text_file(file_path):
    """テキストファイルを読み込み、その内容を文字列として返します。"""
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text
 
def parse_text_with_ollama(text, model_name='your-ollama-model-name'):
    """
    Ollamaを使用してテキストから日時とイベントを抽出します。
 
    Args:
        text (str): 解析するテキスト。
        model_name (str): 使用するOllamaモデルの名前。
 
    Returns:
        list: 抽出されたイベントのリスト。
    """
    # OllamaのAPIエンドポイント
    ollama_url = 'http://localhost:11434/generate'
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日時とそれに対応する予定を抽出してください。結果はJSON形式で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
"""
 
    payload = {
        'model': model_name,
        'prompt': prompt
    }
 
    response = requests.post(ollama_url, json=payload)
    response_text = response.text
 
    # Ollamaの出力からJSON部分を抽出
    try:
        start_index = response_text.index('[')
        end_index = response_text.rindex(']') + 1
        json_str = response_text[start_index:end_index]
        events = json.loads(json_str)
    except (ValueError, json.JSONDecodeError) as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events
 
def add_events_to_calendar(events, calendar_id='primary', token_file='token.json'):
    """
    抽出されたイベントをGoogleカレンダーに追加します。
 
    Args:
        events (list): イベントのリスト。
        calendar_id (str): イベントを追加するカレンダーのID。
        token_file (str): 認証トークンファイルのパス。
    """
    creds = None
    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, SCOPES)
    else:
        print(f"{token_file} が見つかりません。認証を実行してください。")
        return
 
    service = build('calendar', 'v3', credentials=creds)
 
    for event in events:
        # 日付の形式を確認し、必要に応じて変換
        try:
            event_date = datetime.strptime(event['date'], '%Y-%m-%d')
        except ValueError:
            print(f"無効な日付形式: {event['date']}")
            continue
 
        event_body = {
            'summary': event['event'],
            'start': {
                'date': event_date.strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
            'end': {
                'date': (event_date + timedelta(days=1)).strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
        }
 
        # イベントをカレンダーに追加
        created_event = service.events().insert(calendarId=calendar_id, body=event_body).execute()
        print(f"イベントが作成されました: {created_event.get('htmlLink')}")

として保存

モジュールを作成したら、別のスクリプトからインポートして使用

1
touch main.py

でファイルを作成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from calendar_module import read_text_file, parse_text_with_ollama, add_events_to_calendar
 
# テキストファイルのパス
text_file_path = 'school_notice.txt'  # 処理するテキストファイルのパス
 
# テキストの読み込み
text_content = read_text_file(text_file_path)
 
# Ollamaでテキストを解析(モデル名を指定)
events = parse_text_with_ollama(text_content, model_name='your-ollama-model-name')
 
# 抽出されたイベントを表示
print("抽出されたイベント:", events)
 
# Googleカレンダーにイベントを追加
add_events_to_calendar(events, calendar_id='primary', token_file='token.json')

とする

ただし今回はテストなので
とりあえずカレンダーにイベントを追加する部分はコメントアウトし
まずは Ollamaの結果を見る

またモデルには
parse_text_with_ollama関数内のモデル名をelyzaに変更
model_name引数のデフォルト値をelyza:jp8bに設定

ということで

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import requests
import json
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
def read_text_file(file_path):
    """テキストファイルを読み込み、その内容を文字列として返します。"""
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text
 
def parse_text_with_ollama(text, model_name='elyza:jp8b'):
    """
    Ollamaを使用してテキストから日時とイベントを抽出します。
 
    Args:
        text (str): 解析するテキスト。
        model_name (str): 使用するOllamaモデルの名前(デフォルトは 'elyza:jp8b')。
 
    Returns:
        list: 抽出されたイベントのリスト。
    """
    # OllamaのAPIエンドポイント
    ollama_url = 'http://localhost:11434/generate'
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日時とそれに対応する予定を抽出してください。結果はJSON形式で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
"""
 
    payload = {
        'model': model_name,
        'prompt': prompt
    }
 
    response = requests.post(ollama_url, json=payload)
    response_text = response.text
 
    # Ollamaの出力からJSON部分を抽出
    try:
        # レスポンスをJSONとして直接パース
        events = json.loads(response_text.strip())
    except json.JSONDecodeError:
        try:
            # JSON部分のみを抽出
            start_index = response_text.index('[')
            end_index = response_text.rindex(']') + 1
            json_str = response_text[start_index:end_index]
            events = json.loads(json_str)
        except (ValueError, json.JSONDecodeError) as e:
            print("Ollamaからのレスポンスの解析に失敗しました:", e)
            events = []
 
    return events
 
def add_events_to_calendar(events, calendar_id='primary', token_file='token.json'):
    """
    抽出されたイベントをGoogleカレンダーに追加します。
 
    Args:
        events (list): イベントのリスト。
        calendar_id (str): イベントを追加するカレンダーのID。
        token_file (str): 認証トークンファイルのパス。
    """
    creds = None
    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, SCOPES)
    else:
        print(f"{token_file} が見つかりません。認証を実行してください。")
        return
 
    service = build('calendar', 'v3', credentials=creds)
 
    for event in events:
        # 日付の形式を確認し、必要に応じて変換
        try:
            event_date = datetime.strptime(event['date'], '%Y-%m-%d')
        except ValueError:
            print(f"無効な日付形式: {event['date']}")
            continue
 
        event_body = {
            'summary': event['event'],
            'start': {
                'date': event_date.strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
            'end': {
                'date': (event_date + timedelta(days=1)).strftime('%Y-%m-%d'),
                'timeZone': 'Asia/Tokyo',
            },
        }
 
        # イベントをカレンダーに追加
        created_event = service.events().insert(calendarId=calendar_id, body=event_body).execute()
        print(f"イベントが作成されました: {created_event.get('htmlLink')}")

というように

1
calendar_module.py

を変更

またmain.pyを

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from calendar_module import read_text_file, parse_text_with_ollama, add_events_to_calendar
 
# テキストファイルのパス
text_file_path = 'school_notice.txt'  # 処理するテキストファイルのパス
 
# テキストの読み込み
text_content = read_text_file(text_file_path)
 
# Ollamaでテキストを解析(モデル名を指定)
events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
 
# 抽出されたイベントを表示
print("抽出されたイベント:", events)
 
# Googleカレンダーにイベントを追加
#add_events_to_calendar(events, calendar_id='primary', token_file='token.json')

としておく

1
touch school_notice.txt

でテキストを
学校からのお知らせ内容をテキストファイルにして実験

しかし

1
2
Ollamaからのレスポンスの解析に失敗しました: substring not found
抽出されたイベント: []

となってしまう

機能を分割して問題を探す

以前のコードを見たらリクエストURLが

1
response = requests.post("http://localhost:11434/api/generate",

なので

に修正

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import requests
import json
 
def parse_text_with_ollama(text, model_name='elyza/jp8b'):
    """
    Ollamaを使用してテキストから日時とイベントを抽出します。
 
    Args:
        text (str): 解析するテキスト。
        model_name (str): 使用するOllamaモデルの名前(デフォルトは 'elyza/jp8b')。
 
    Returns:
        list: 抽出されたイベントのリスト。
    """
    # OllamaのAPIエンドポイント(修正)
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日時とそれに対応する予定を抽出してください。結果はJSON形式で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
"""
 
    payload = {
        'model': model_name,
        'prompt': prompt
    }
 
    try:
        response = requests.post(ollama_url, json=payload)
        response.raise_for_status()
        # レスポンスをJSONとしてパース
        response_json = response.json()
        # テキスト部分を取得
        response_text = response_json.get('response', '')
    except requests.exceptions.RequestException as e:
        print("Ollamaへのリクエストに失敗しました:", e)
        return []
    except json.JSONDecodeError as e:
        print("OllamaからのレスポンスがJSON形式ではありません:", e)
        return []
 
    # Ollamaの出力からJSON部分を抽出
    try:
        start_index = response_text.index('[')
        end_index = response_text.rindex(']') + 1
        json_str = response_text[start_index:end_index]
        events = json.loads(json_str)
    except (ValueError, json.JSONDecodeError) as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events

とした

しかし

1
2
Ollamaからのレスポンスの解析に失敗しました: Expecting value: line 1 column 2 (char 1)
抽出されたイベント:

となる

ChatGGPTで情報を調べると以下の方になる

Ollamaはストリーミング形式でレスポンスを返しています。
つまり、モデルの生成結果が複数のJSONオブジェクトとして順次送られてきています

{“model”:”elyza:jp8b”,”created_at”:”…”,”response”:”…”,”done”:false}

この”response”フィールドに、モデルが生成したテキストの一部が含まれています。全体のレスポンスを組み立てるには、これらの”response”フィールドを順番に連結する必要があります

requestsライブラリを使用して、ストリーミングレスポンスを処理します。response.iter_lines()を使用して、各行を逐次処理

各行をJSONとしてパースし、”response”フィールドの値を取り出して連結

連結したテキストがJSON文字列(リスト)であることを前提に、json.loads()でパース

となるようにコード変更

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import requests
import json
 
def parse_text_with_ollama(text, model_name='elyza:jp8b'):
    """
    Ollamaを使用してテキストから日時とイベントを抽出します。
 
    Args:
        text (str): 解析するテキスト。
        model_name (str): 使用するOllamaモデルの名前。
 
    Returns:
        list: 抽出されたイベントのリスト。
    """
    # OllamaのAPIエンドポイント
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日付とそれに対応する予定を抽出してください。結果は純粋なJSON形式で、日本語で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
 
重要事項:
- 出力は純粋なJSON形式で、追加のテキストや説明は含めないでください。
- 日付は"YYYY-MM-DD"の形式で出力してください。
- イベント名は元の文章から適切に抽出してください。
"""
 
    payload = {
        'model': model_name,
        'prompt': prompt
    }
 
    try:
        # ストリーミングレスポンスを取得
        response = requests.post(ollama_url, json=payload, stream=True)
        response.raise_for_status()
 
        # レスポンスのストリームを処理
        response_text = ''
        for line in response.iter_lines():
            if line:
                line_str = line.decode('utf-8')
                # 各行をJSONとしてパース
                line_json = json.loads(line_str)
                # "response"フィールドを連結
                response_text += line_json.get('response', '')
 
        print("Ollamaのレスポンス:")
        print(response_text)
    except requests.exceptions.RequestException as e:
        print("Ollamaへのリクエストに失敗しました:", e)
        return []
    except json.JSONDecodeError as e:
        print("レスポンスの解析に失敗しました:", e)
        return []
 
    # 連結したテキストをJSONとして解析
    try:
        events = json.loads(response_text)
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events

とする

実行結果は

1
2
3
4
{'date': '2024-10-07', 'event': 'ペットボトルの準備'}
{'date': '2024-10-15', 'event': None}
{'date': '2024-10-25', 'event': '準備物の確認'}
{'date': '2024-10-26', 'event': None}

となる

一応はエラーは消えたが他のものを試してみる

台風10号の影響により、9月2日(月)の給食が中止となりました。 弁当の準備をお願いします。 8月30日(金)現時点では、日課の変更はありません。 台風や大雨の状況によっては、変更する場合があります。 4月にコドモンで知らせした、「R6年度 自然災害発生時、警報発表・避難情報発表時等に伴う学校の対処」とおり対応します。 今一度御確認ください。
という文章で実験すると

1
2
{'date': '8月30日', 'event': '弁当の準備'}
{'date': '9月2日', 'event': '給食中止'}

とほぼ目的に近いものになる

1
2
3
4
5
6
7
PTA会員 様  Caros membros do PTA
 
 日頃よりPTA活動に御理解・御協力いただいき、ありがとうございます。
 令和7年度のPTA本部役員候補選考の時期になりました。
 アンケート形式にて本部役員の立候補を募ります。
 添付の文書を御一読いただき、以下のリンクからアンケートに回答をお願いします。
 回答期限は9月30日(月)とします。

だと

1
2
3
4
5
6
7
8
Ollamaのレスポンス:
[
    {"date": "2024-09-25", "event": "PTA本部役員候補選出についてのアンケート"},
    {"date": "2024-09-30", "event": "アンケート回答期限"}
]
抽出されたイベント:
{'date': '2024-09-25', 'event': 'PTA本部役員候補選出についてのアンケート'}
{'date': '2024-09-30', 'event': 'アンケート回答期限'}

テキストファイルの内容によっては

1
抽出されたイベント: {'date': '2024-10-07', 'event': 'ペットボトルの準備'} {'date': '2024-10-15', 'event': None} {'date': '2024-10-25', 'event': '準備物の確認'} {'date': '2024-10-26', 'event': None}

となるため
プロンプトを改善して、モデルにeventがnullにならないように強調

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
prompt = f"""
以下の文章から、日付とそれに対応する予定を抽出してください。結果は純粋なJSON形式で、日本語で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
 
重要事項:
- 出力は純粋なJSON形式で、追加のテキストや説明は含めないでください。
- 日付は"YYYY-MM-DD"の形式で出力してください。
- **イベント名が存在しない場合、そのエントリを出力しないでください。**
- イベント名は元の文章から適切に抽出してください。
"""
 
 
 
リスト内包表記を使用:eventsリストからevent['event']がNoneでないイベントだけを新しいリストにします。
is not Noneを使用:event['event']がNoneでないことを確認
 
# Ollamaでテキストを解析(モデル名を指定)
events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
 
# 抽出されたイベントを表示
print("抽出されたイベント:")
for event in events:
    print(event)
 
# eventがNoneのものを削除
events = [event for event in events if event['event'] is not None]
 
# フィルタリング後のイベントを表示
print("有効なイベント:")
for event in events:
    print(event)

これらを変更するので

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import requests
import json
 
def parse_text_with_ollama(text, model_name='elyza:jp8b'):
    """
    Ollamaを使用してテキストから日時とイベントを抽出します。
 
    Args:
        text (str): 解析するテキスト。
        model_name (str): 使用するOllamaモデルの名前。
 
    Returns:
        list: 抽出されたイベントのリスト。
    """
    # OllamaのAPIエンドポイント
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日付とそれに対応する予定を抽出してください。結果は純粋なJSON形式で、日本語で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
 
重要事項:
- 出力は純粋なJSON形式で、追加のテキストや説明は含めないでください。
- 日付は"YYYY-MM-DD"の形式で出力してください。
- イベント名は元の文章から適切に抽出してください。
"""
 
    payload = {
        'model': model_name,
        'prompt': prompt
    }
 
    try:
        # ストリーミングレスポンスを取得
        response = requests.post(ollama_url, json=payload, stream=True)
        response.raise_for_status()
 
        # レスポンスのストリームを処理
        response_text = ''
        for line in response.iter_lines():
            if line:
                line_str = line.decode('utf-8')
                # 各行をJSONとしてパース
                line_json = json.loads(line_str)
                # "response"フィールドを連結
                response_text += line_json.get('response', '')
 
        print("Ollamaのレスポンス:")
        print(response_text)
    except requests.exceptions.RequestException as e:
        print("Ollamaへのリクエストに失敗しました:", e)
        return []
    except json.JSONDecodeError as e:
        print("レスポンスの解析に失敗しました:", e)
        return []
 
    # 連結したテキストをJSONとして解析
    try:
        events = json.loads(response_text)
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import requests
import json
 
def parse_text_with_ollama(text, model_name='elyza:jp8b'):
    """
    Ollamaを使用してテキストから日時とイベントを抽出します。
 
    Args:
        text (str): 解析するテキスト。
        model_name (str): 使用するOllamaモデルの名前。
 
    Returns:
        list: 抽出されたイベントのリスト。
    """
    # OllamaのAPIエンドポイント
 
    # Ollamaに送信するプロンプトを作成
    prompt = f"""
以下の文章から、日付とそれに対応する予定を抽出してください。結果は純粋なJSON形式で、日本語で、"date""event"のキーを持つオブジェクトのリストとして返してください。
 
文章:
{text}
 
出力例:
[
    {{"date": "2024-10-07", "event": "ペットボトルの準備"}},
    {{"date": "2024-10-25", "event": "準備物の確認"}}
]
 
重要事項:
- 出力は純粋なJSON形式で、追加のテキストや説明は含めないでください。
- 日付は"YYYY-MM-DD"の形式で出力してください。
- **イベント名が存在しない場合、そのエントリを出力しないでください。**
- イベント名は元の文章から適切に抽出してください。
"""
 
    payload = {
        'model': model_name,
        'prompt': prompt
    }
 
    try:
        # ストリーミングレスポンスを取得
        response = requests.post(ollama_url, json=payload, stream=True)
        response.raise_for_status()
 
        # レスポンスのストリームを処理
        response_text = ''
        for line in response.iter_lines():
            if line:
                line_str = line.decode('utf-8')
                # 各行をJSONとしてパース
                line_json = json.loads(line_str)
                # "response"フィールドを連結
                response_text += line_json.get('response', '')
 
        print("Ollamaのレスポンス:")
        print(response_text)
    except requests.exceptions.RequestException as e:
        print("Ollamaへのリクエストに失敗しました:", e)
        return []
    except json.JSONDecodeError as e:
        print("レスポンスの解析に失敗しました:", e)
        return []
 
    # 連結したテキストをJSONとして解析
    try:
        events = json.loads(response_text)
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", e)
        events = []
 
    return events

にして

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
 
# テキストファイルのパス
text_file_path = 'school_notice.txt'  # 処理するテキストファイルのパス
 
# テキストの読み込み
with open(text_file_path, 'r', encoding='utf-8') as file:
    text_content = file.read()
 
# Ollamaでテキストを解析(モデル名を指定)
events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
 
# 抽出されたイベントを表示
print("抽出されたイベント:")
for event in events:
    print(event)
 
# Googleカレンダーにイベントを追加
# add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
 
# テキストファイルのパス
text_file_path = 'school_notice.txt'  # 処理するテキストファイルのパス
 
# テキストの読み込み
with open(text_file_path, 'r', encoding='utf-8') as file:
    text_content = file.read()
 
# Ollamaでテキストを解析(モデル名を指定)
events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
 
# 抽出されたイベントを表示
print("抽出されたイベント:")
for event in events:
    print(event)
 
# eventがNoneのものを削除
events = [event for event in events if event['event'] is not None]
 
# フィルタリング後のイベントを表示
print("有効なイベント:")
for event in events:
    print(event)
 
# 有効なイベントがある場合のみGoogleカレンダーに追加
if events:
    add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
else:
    print("有効なイベントがありません。")

とした

Googleカレンダーの今週の予定を取得する

Googleカレンダーの今週の予定を取得する

実行環境
M1 MacbookAir 16GB

1
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib pytz

で必要なライブラリをインストール

1
touch get_week.py

でファイルを作成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
 
def main():
    """今週のGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return
 
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)
 
    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
 
    # 現在の日時を取得
    now = datetime.datetime.now(tz)
 
    # 今週の開始日(月曜日)と終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)
 
    # RFC3339形式に変換
    time_min = start_of_week.isoformat()
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    if not events:
        print('今週の予定はありません。')
    else:
        print('今週の予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")
 
if __name__ == '__main__':
    main()

実行結果は

1
2
3
2024-09-30T23:52:51.254684+09:00 から 2024-10-07T23:52:51.254684+09:00 までの予定を取得します。
今週の予定:
2024-10-05T09:00:00+09:00 - APIを使って追加したイベント

となる

一週間の予定は月曜日からの予定となっている

次は実行した日より前の予定は出力しないようにコード変更する

現在のコードでは、time_min(開始時刻)が「今週の始まり(週の月曜日)」

これを「現在の日時」に変更することで、実行した日より前の予定を除外

time_minをnow(現在の日時)に変更

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
 
def main():
    """今週のGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return
 
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)
 
    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
 
    # 現在の日時を取得
    now = datetime.datetime.now(tz)
 
    # 今週の開始日(月曜日)と終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)
 
    # RFC3339形式に変換
    time_min = start_of_week.isoformat()
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    if not events:
        print('今週の予定はありません。')
    else:
        print('今週の予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")
 
if __name__ == '__main__':
    main()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ
 
def main():
    """今週の残りのGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return
 
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)
 
    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
 
    # 現在の日時を取得
    now = datetime.datetime.now(tz)
 
    # 今週の終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)
 
    # time_minを現在の日時に設定
    time_min = now.isoformat()
    # time_maxは今週の終了日時
    time_max = end_of_week.isoformat()
 
    print(f"{time_min} から {time_max} までの予定を取得します。")
 
    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])
 
    if not events:
        print('今週の残りの予定はありません。')
    else:
        print('今週の残りの予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")
 
if __name__ == '__main__':
    main()

とすることで解決

次はテキストの内容から日時と予定を取り出しGoogleカレンダーへAPIで予定を書き込みできるようにする
また
日時と予定の取り出しはOllamaを使うことで汎用性を持たせることにする

Google カレンダーに予定をpythonで追加する

Google カレンダーに予定をpythonで追加する

実行環境
M1 MacbookAir 16GB

APIなどの登録が必要になるので
[初心者向け] GoogleカレンダーにPythonから予定を追加・編集してみた

を参考に行う

https://developers.google.com/calendar/api/v3/reference/calendarList?hl=ja
がリファレンス

なお情報の取得も後で必要になるので

Googleカレンダー情報を取得する
も参考にする

まずは google カレンダーのAPIを有効にする

流れとしては
* 認証情報の発行
* 認証情報を使ってPythonのプログラムを実行

Google Cloudのコンソールにログイン

プロジェクトはすでに作成しているものを選択
APIとサービスをクリック

APIとサービスを有効にするをクリック

calendar
で検索する
なお日本語でカレンダーとしても出ないので注意

Google Calendar API をクリック
有効にするをクリック

これでOK
認証関連のJSONファイルは以前Gmailで作成してるので
今回もいずれGmailを使うので省略

1
mkdir week_calendar_voice

で作業ディレクトリを作成

ここに

1
cp ../mail_auto/*.json .

で以前作成したプロジェクトから
credentials.json
token.json
をコピーする

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os.path
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
 
# カレンダーAPIのスコープ(Google Calendarの読み書き権限)
 
# 既存の token.json を使用
creds = None
if os.path.exists('token.json'):
    creds = Credentials.from_authorized_user_file('token.json', SCOPES)
     
# トークンが無効またはスコープが一致しない場合は再認証
if not creds or not creds.valid:
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(
            'credentials.json', SCOPES)
        creds = flow.run_local_server(port=0)
         
    # トークンを保存
    with open('token.json', 'w') as token:
        token.write(creds.to_json())
 
# Google Calendar APIサービスを構築
service = build('calendar', 'v3', credentials=creds)
 
# イベントの詳細を設定
event = {
  'summary': 'APIを使って追加したイベント',
  'location': 'オンライン',
  'description': 'Google Calendar APIで追加されたイベントです。',
  'start': {
    'dateTime': '2024-10-05T09:00:00',
    'timeZone': 'Asia/Tokyo',
  },
  'end': {
    'dateTime': '2024-10-05T10:00:00',
    'timeZone': 'Asia/Tokyo',
  },
  'attendees': [
    {'email': 'example@example.com'},
  ],
  'reminders': {
    'useDefault': False,
    'overrides': [
      {'method': 'email', 'minutes': 24 * 60},
      {'method': 'popup', 'minutes': 10},
    ],
  },
}
 
# カレンダーにイベントを挿入
event = service.events().insert(calendarId='primary', body=event).execute()
print(f"イベントが作成されました: {event.get('htmlLink')}")

がchatgpt で生成されたコードだが

追加する予定は

1
# イベントの詳細を設定 event = { 'summary': 'APIを使って追加したイベント', 'location': 'オンライン', 'description': 'Google Calendar APIで追加されたイベントです。', 'start': { 'dateTime': '2024-10-05T09:00:00', 'timeZone': 'Asia/Tokyo', }, 'end': { 'dateTime': '2024-10-05T10:00:00', 'timeZone': 'Asia/Tokyo', }, 'attendees': [ {'email': 'example@example.com'}, ], 'reminders': { 'useDefault': False, 'overrides': [ {'method': 'email', 'minutes': 24 * 60}, {'method': 'popup', 'minutes': 10}, ], }, }

このうち
summaryがカレンダーに表示する予定
locationが場所で住所や会場名を指定することも可能
descriptionが詳細な説明
‘start’: イベントの開始時刻
‘end’: イベントの終了時刻
‘attendees’: 出席者のリスト 多分これは使わない
‘reminders’: リマインダーの設定 これも使わない

とりあえずテストなので
attendersと reminders を削除して予定を追加してみる

1
touch add_calendar.py

でファイルを作成

これで実行したら

1
2
3
4
5
6
7
8
9
10
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/add_calendar.py", line 18, in <module>
    creds.refresh(Request())
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

これはgmailの許可はあるけど
Calendar のAPIのスコープがないのが原因

なので
一度 token.jsonを削除して再度実行したけどだめだった

これは認証関連の問題で
再度

1
rm token.json

で削除してから

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import datetime
import os.path
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
 
# スコープの設定
 
# トークンファイルのチェック
creds = None
if os.path.exists('token.json'):
    creds = Credentials.from_authorized_user_file('token.json', SCOPES)
     
# トークンがない場合、新しい認証を実行
if not creds or not creds.valid:
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(
            'credentials.json', SCOPES)
        creds = flow.run_local_server(port=0)
         
    # トークンを保存
    with open('token.json', 'w') as token:
        token.write(creds.to_json())

1
touch test2.py

で作成して

1
python test2.py

を実行することで認証画面になるので
そのまま進めていけば認証が完了する

再度

1
python add_calendar.py

を実行すれば
無事に予定が追加される

とりあえず予定の追加はできたので
次に予定の取得をする
まずは今週の予定から