Gmailから抽出した本文をOllamaに渡しGoogle カレンダーに登録する

Gmailから抽出した本文をOllamaに渡しGoogle カレンダーに登録する

touch main6.py

でファイルを作成

Gmailから抽出したメッセージの本文を、
PDFのテキスト抽出と同様にOllamaに渡すようにコードを変更

Gmailからの本文取得:
gmail_get_latest_unread_message_bodyを使用して、Gmailの本文を取得

Ollamaに渡す:
取得した本文をOllamaに渡して解析

解析結果のフィルタリングとカレンダーへの追加:
PDFからのテキスト処理と同様に、抽出されたイベントをフィルタリングしてカレンダーに追加

以下は、Gmailから抽出した本文をOllamaに渡すようにした修正版コード

from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
from event_utils import filter_events

# Gmail APIの初期化
service = gmail_init()

# 最新の未読メッセージの本文を取得
label_id = ''  # 取得するラベルID
message_body = gmail_get_latest_unread_message_body(service, label_id)

if message_body != "No unread messages found.":
    print("メール本文:")
    print(message_body)
    
    # Ollamaでメール本文を解析
    raw_events = parse_text_with_ollama(message_body, model_name='elyza:jp8b')
    
    # 抽出されたイベントを表示
    print("抽出されたイベント:")
    for event in raw_events:
        print(event)

    # イベントのフィルタリングとフォーマット
    events = filter_events(raw_events)

    # 有効なイベントを表示
    print("有効なイベント:")
    for event in events:
        print(event)

    # 有効なイベントがある場合のみGoogleカレンダーに追加
    if events:
        add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
    else:
        print("有効なイベントがありません。")
else:
    print("未読メッセージが見つかりませんでした。")

実行すると

「Request had insufficient authentication scopes」

これは、Google Calendar APIへのリクエストに対して適切な認証スコープが不足している

現在使用しているスコープがカレンダーイベントの追加(writeアクセス)を許可していないため

スコープに
https://www.googleapis.com/auth/calendar.events
または
https://www.googleapis.com/auth/calendar
を追加し、Googleカレンダーへの書き込み権限を付与する

gmail_utils.pyを修正する

# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    return body  # 本文のみを返す

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date

def gmail_init():
    creds = None
    token_path = '../mail_auto/token.json'  # token.jsonのパスを指定
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

を書き換えてみる

実験結果のログを見ると
Gmail関連の動作をさせるとエラーになる
このため、tokenが重複しているとエラーを起こす可能性があるのかもしれない

Gmailから取得し
Calendar書き込みするとエラーになり
その後はカレンダー機能を使う他のものも動作しなくなる

しかし一度token.jsonを削除し、再度認証すれば
カレンダー機能は使えるようになっている

とりあえず認証関連はこれで動くので
再度tokenを発行すれば解決するはず

先に顔認証したら週間予定を読み上げるのを作成する

gmailから本文を抽出しカレンダー書き込み

gmailから本文を抽出しカレンダー書き込み

PDFからはできるようになったので
次はGmailから本文を抽出しテキストとして取り出すようにする

以前、gmailの未読の最新1件を取得し
本文とURLを抽出するものを作成したので
このソースを変更し
本文を抽出するようにする

なお、実行前にラベルIDを調べる必要がある
Schoolというラベルを作成し
このIDを調べる

cd ../mail_auto
python base,py

を実行し
結果の中から
Schoolのものを探す

コードは

from __future__ import print_function
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

import base64, email #デコード用
import dateutil.parser

#token.jsonを設定
tokenPath = "token.json"

#credentials.jsonを設定
credentialsPath = "credentials.json"


#メール本文のデコード
def decode(encoded):
   decoded = base64.urlsafe_b64decode(encoded).decode()
   return decoded


#メール本文の内容を配列で取得する関数
def gmail_get_messages_body(service, labelIdsValue):
   mailBody = []
   
   # メッセージの一覧を取得
   messages = service.users().messages()
   msg_list = messages.list(userId='me', labelIds=labelIdsValue).execute() 
   # msg_list = messages.list(userId='me', labelIds=labelIdsValue ,maxResults=456).execute() #最大値指定
   
   # 取得したメッセージの一覧を配列に格納
   for msg in msg_list['messages']:
       
       #メールの受信日時を取得
       date = gmail_get_messages_body_date(messages,msg)
       
       topid = msg['id']     
       msg = messages.get(userId='me', id=topid).execute()
       
       if(msg["payload"]["body"]["size"]!=0):
           mailBody.append(date+"<br>"+decode(msg["payload"]["body"]["data"])) 
       else:
           #メールによっては"parts"属性の中に本文がある場合もある
           mailBody.append(date+"<br>"+decode(msg["payload"]["parts"][0]["body"]["data"])) 

   return mailBody
   

#gmail_get_messages_body関数内で受信日時を取得する関数
def gmail_get_messages_body_date(messages,msg):
   msg_id = msg['id']
   m = messages.get(userId='me', id=msg_id, format='raw').execute()
   raw = base64.urlsafe_b64decode(m['raw'])
   
   # Emailを解析する
   eml = email.message_from_bytes(raw)
   
   date = dateutil.parser.parse(eml.get('Date')).strftime("%Y-%m-%d_%H-%M-%S")
   return date

#ラベルのIDやnameを表示する関数
def gmail_display_label(service):
   results = service.users().labels().list(userId='me').execute()
   labels = results.get('labels', [])

   if not labels:
       print('No labels found.')
   else:
       print('Labels:')
       for label in labels:
           print(label)


#初期化(from quickstart.py)
def gmail_init():
   creds = None
   # The file token.json stores the user's access and refresh tokens, and is
   # created automatically when the authorization flow completes for the first
   # time.
   if os.path.exists(tokenPath):
       creds = Credentials.from_authorized_user_file(tokenPath, SCOPES)
   # If there are no (valid) credentials available, let the user log in.
   if not creds or not creds.valid:
       if creds and creds.expired and creds.refresh_token:
           creds.refresh(Request())
       else:
           flow = InstalledAppFlow.from_client_secrets_file(
               credentialsPath, SCOPES)
           creds = flow.run_local_server(port=0)
       # Save the credentials for the next run
       with open(tokenPath, 'w') as token:
           token.write(creds.to_json())
           
   service = build('gmail', 'v1', credentials=creds)
   return service


#ここからやりたい処理を書く
service = gmail_init()

#quickstart.pyと同じ処理にしてみた
results = service.users().labels().list(userId='me').execute()
labels = results.get('labels', [])

if not labels:
   print('No labels found.')
else:
   print('Labels:')
   for label in labels:
       print(label['name'])
       
gmail_display_label(service)

grep を使い

python base,py| grep School
で探す

School
{'id': '', 'name': 'School', 'type': 'user'}

となったので
あとはこれを元にコードを書き換えていく

cp ../mail_voice/gmail_utils.py .

でコードをコピー

次にこのコードをURLを抽出せず本文のみ抽出するようにコード変更

# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    return body  # 本文のみを返す

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date

def gmail_init():
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

次に main4.pyは Google Drive からPDFを取得し
抽出する機能だったので
新たに main5.pyを作成し
Gmail未読のものを対象にするようにしてみる

touch main5.py

GPTで生成したコードからラベルIDを変更する

# main2.py
from gmail_utils import gmail_init, gmail_get_latest_unread_message_body
from ollama_module import parse_text_with_ollama

# Gmail APIの初期化
service = gmail_init()

# 最新の未読メッセージの本文を取得
label_id = ''  # 取得するラベルID
message_body = gmail_get_latest_unread_message_body(service, label_id)

if message_body != "No unread messages found.":
    print("メール本文:")
    print(message_body)
    
    # Ollamaでメール本文を解析
    events = parse_text_with_ollama(message_body, model_name='elyza:jp8b')
    
    # 抽出されたイベントを表示
    print("抽出されたイベント:")
    for event in events:
        print(event)
else:
    print("未読メッセージが見つかりませんでした。")

次にGmail でSchoolラベルの中から予定があるメールを試しに1つ未読にして実験する

結果

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/main5.py", line 6, in <module>
    service = gmail_init()
  File "/Users/snowpool/aw10s/week_calendar_voice/gmail_utils.py", line 52, in gmail_init
    creds.refresh(Request())
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

これはtoken.jsonがあってないため
なので以前gmail関連のプロジェクトを作成したので
そちらを使うようにコード変更

# gmail_utils.py
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
import base64
import dateutil.parser

# スコープの設定
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

def decode(encoded):
    decoded_bytes = base64.urlsafe_b64decode(encoded.encode('ASCII'))
    decoded_message = decoded_bytes.decode('utf-8')
    return decoded_message

def gmail_get_latest_unread_message_body(service, labelIdsValue):
    messages = service.users().messages()
    msg_list = messages.list(userId='me', labelIds=[labelIdsValue], q="is:unread", maxResults=1).execute()

    if 'messages' not in msg_list:
        return "No unread messages found."

    msg = msg_list['messages'][0]
    msg_id = msg['id']
    msg = messages.get(userId='me', id=msg_id, format='full').execute()

    body = ""
    if 'parts' in msg['payload']:
        for part in msg['payload']['parts']:
            if part['mimeType'] == 'text/plain' and part['body']['size'] > 0:
                body = decode(part['body']['data'])
                break
    else:
        body = decode(msg['payload']['body']['data'])

    return body  # 本文のみを返す

def gmail_get_messages_body_date(msg):
    headers = msg['payload']['headers']
    date_header = next(header['value'] for header in headers if header['name'].lower() == 'date')
    date = dateutil.parser.parse(date_header).strftime("%Y-%m-%d %H:%M:%S")
    return date

def gmail_init():
    creds = None
    token_path = '../mail_auto/token.json'  # token.jsonのパスを指定
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    service = build('gmail', 'v1', credentials=creds)
    return service

再度実行するとイベントの抽出ができた

次にこれをカレンダーに書き込むようにする

日付形式の問題解決とイベント抽出とフィルタリング

日付形式の問題解決とイベント抽出とフィルタリング

レスポンスに含まれている日付形式が「令和6年10月26日」などの和暦表記や、「2024-10-08」などのISO表記と混在している

このような場合、統一的に処理するためのフォーマット変換を行う必要がある

from datetime import datetime
import re

def convert_japanese_date(japanese_date):
    # 和暦の例: "令和6年10月26日"
    pattern = r"令和(\d+)年(\d+)月(\d+)日"
    match = re.match(pattern, japanese_date)
    if match:
        year = int(match.group(1)) + 2018  # 令和元年は2019年に相当
        month = int(match.group(2))
        day = int(match.group(3))
        return f"{year}-{month:02d}-{day:02d}"
    # ISO形式のチェック
    try:
        datetime.strptime(japanese_date, '%Y-%m-%d')
        return japanese_date
    except ValueError:
        return None  # 無効な日付形式

また
日付形式の変換後、無効な日付やイベント名が空のエントリをフィルタリングするように変更

eventsリストから有効なエントリのみを抽出

def filter_events(events):
    valid_events = []
    for event in events:
        date = convert_japanese_date(event['date'])
        if date and event['event']:
            valid_events.append({'date': date, 'event': event['event']})
        else:
            print(f"無効な日付形式: {event['date']}")
    return valid_events

日付の変換やイベントのフィルタリングを他のスクリプトでも使うため
モジュール化する

event_utils.pyというモジュールを作成し
これに記述する

touch event_utils.py

内容は

from datetime import datetime
import re

def convert_japanese_date(japanese_date):
    pattern = r"令和(\d+)年(\d+)月(\d+)日"
    match = re.match(pattern, japanese_date)
    if match:
        year = int(match.group(1)) + 2018  # 令和元年は2019年に相当
        month = int(match.group(2))
        day = int(match.group(3))
        return f"{year}-{month:02d}-{day:02d}"
    # ISO形式のチェック
    try:
        datetime.strptime(japanese_date, '%Y-%m-%d')
        return japanese_date
    except ValueError:
        return None  # 無効な日付形式

def filter_events(events):
    valid_events = []
    for event in events:
        date = convert_japanese_date(event['date'])
        if date and event['event']:
            valid_events.append({'date': date, 'event': event['event']})
        else:
            print(f"無効な日付形式: {event['date']}")
    return valid_events

これをmain4.pyで
event_utils.pyをインポートして使用する

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
from event_utils import convert_japanese_date, filter_events

# フォルダIDを指定して処理を開始
folder_id = "" #Folder ID
texts = extract_texts_from_folder(folder_id)

if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        raw_events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
        print("抽出されたイベント:")
        for event in raw_events:
            print(event)

        # イベントのフィルタリングとフォーマット
        events = filter_events(raw_events)

        print("有効なイベント:")
        for event in events:
            print(event)

        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

これで
eventsリストからNoneのイベントを削除するフィルタリング処理が追加
Noneを削除したイベントのみをカレンダーに追加

events = [event for event in events if event['event'] is not None]

の部分がその処理

また
eventsリストが空でないかを確認し、有効なイベントがある場合のみカレンダーに追加
もしイベントがなければ、カレンダーには何も追加されず、
「有効なイベントがありません。」というメッセージが表示される

このチェックによって、無駄な処理が減り、空のイベントがカレンダーに追加されるのを防ぐ

if events:
    add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
else:
    print("有効なイベントがありません。")

これで実行すると
運動会のイベントが失敗している

これは
OllamaからのレスポンスがJSON形式として期待されているのに対し、
不正なJSON形式
(複数のJSONオブジェクトがコンマで区切られている形式や、余分なテキストが含まれている形式)
が返ってきていることが考えられます。

これを解決するために、
Ollamaのレスポンスを受け取る際に、
例外処理や、JSON以外の余計な部分を取り除く前処理を追加

レスポンスを前処理する関数を追加し、JSON形式に整形
JSON解析エラーの例外処理を追加して、不正な形式のデータをスキップ

llamaからのレスポンスを受け取って前処理し、JSON形式に変換するには

import json
import re

def parse_ollama_response(response_text):
    # 正規表現でJSON配列形式のみを抽出
    json_pattern = re.compile(r'\[.*?\]', re.DOTALL)
    match = json_pattern.search(response_text)
    if not match:
        print("JSON形式のレスポンスが見つかりません。")
        return []

    json_data = match.group(0)  # 最初のJSON形式の部分を取得

    try:
        events = json.loads(json_data)
        return events
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", str(e))
        return []

parse_ollama_response関数を
parse_text_with_ollama関数の後処理として使用し、Ollamaからのレスポンスを整形
これにより、Ollamaからの不正なレスポンス形式を取り除く

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
import json
import re

# OllamaからのレスポンスをJSON形式に変換
def parse_ollama_response(response_text):
    # JSON配列形式のみを抽出する正規表現
    json_pattern = re.compile(r'\[.*?\]', re.DOTALL)
    match = json_pattern.search(response_text)
    if not match:
        print("JSON形式のレスポンスが見つかりません。")
        return []

    json_data = match.group(0)  # 最初のJSON形式の部分を取得

    try:
        events = json.loads(json_data)
        return events
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", str(e))
        return []

# Google DriveのSchoolフォルダID
folder_id = "" 

# フォルダ内のPDFファイルからテキストを抽出
texts = extract_texts_from_folder(folder_id)

# テキストが抽出できているか確認
if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        # Ollamaでテキストを解析(モデル名を指定)
        raw_response = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
        
        # Ollamaのレスポンスを前処理してJSONに変換
        events = parse_ollama_response(raw_response)

        print("抽出されたイベント:")
        for event in events:
            print(event)

        # Noneのイベントを除外
        events = [event for event in events if event['event']]

        # フィルタリング後のイベントを表示
        print("有効なイベント:")
        for event in events:
            print(event)

        # 有効なイベントがある場合のみGoogleカレンダーに追加
        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

としたが

TypeError: expected string or bytes-like object というエラーは、
response_textのデータ型が文字列やバイト列ではない場合に発生

これは、response_textが予期せず辞書(dict)型や他の非文字列データ型である場合に起こることがよくある

response_textの型が文字列であることを確認し、
もし辞書型など別の型が返ってきている場合には、JSONエンコードして文字列に変換する処理を追加

レスポンスが辞書型か確認し、文字列に変換
parse_text_with_ollamaから返ってくるレスポンスが文字列であるかをチェックし、
もし辞書型など別の型であれば文字列に変換する処理を追加

型チェックを行う
parse_ollama_response関数内で
response_textが文字列であることをチェックし、文字列でない場合には変換

以下修正

import json
import re

def parse_ollama_response(response_text):
    # レスポンスが辞書型なら文字列に変換
    if isinstance(response_text, dict):
        response_text = json.dumps(response_text)  # 辞書をJSON文字列に変換
    elif not isinstance(response_text, str):
        print("不明なデータ形式のレスポンスを受け取りました:", type(response_text))
        return []
    
    # JSON配列形式のみを抽出する正規表現
    json_pattern = re.compile(r'\[.*?\]', re.DOTALL)
    match = json_pattern.search(response_text)
    if not match:
        print("JSON形式のレスポンスが見つかりません。")
        return []

    json_data = match.group(0)  # 最初のJSON形式の部分を取得

    try:
        events = json.loads(json_data)
        return events
    except json.JSONDecodeError as e:
        print("Ollamaからのレスポンスの解析に失敗しました:", str(e))
        return []

という処理を追加

response_textが辞書型かを確認します。辞書型の場合はjson.dumps()を使って文字列に変換

次に、正規表現を使用してレスポンス内のJSON配列形式を抽出

json.loadsを使って抽出した部分をJSONとしてパースし、
もしエラーが発生した場合にはエラーメッセージを表示

逆にエラーになった

このため一度

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar
from event_utils import convert_japanese_date, filter_events

# フォルダIDを指定して処理を開始
folder_id = ""
texts = extract_texts_from_folder(folder_id)

if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        raw_events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')
        print("抽出されたイベント:")
        for event in raw_events:
            print(event)

        # イベントのフィルタリングとフォーマット
        events = filter_events(raw_events)

        print("有効なイベント:")
        for event in events:
            print(event)

        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

に戻したら
となって無事にイベントが追加された

次は
メールから読み込んで実行できるようにする

GoogleDriveの指定フォルダからファイルを取得し処理のモジュール化と処理結果を ollamaへ渡す

GoogleDriveの指定フォルダからファイルを取得し処理のモジュール化と処理結果を ollamaへ渡す

Google DriveからPDFを取得し、テキストを抽出する機能をモジュールに分ける

drive_pdf_extractor.py

を作成する

touch drive_pdf_extractor.py

でファイルを作成し

import os
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from googleapiclient.http import MediaIoBaseDownload
from pdfminer.high_level import extract_text
from io import BytesIO

# Google Drive APIの認証
def authenticate_drive():
    SCOPES = ['https://www.googleapis.com/auth/drive']
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("認証トークンが見つかりません。認証を実行してください。")
        return None
    return build('drive', 'v3', credentials=creds)

# フォルダ内のPDFファイルリストを取得
def list_pdf_files_in_folder(service, folder_id):
    """Google Driveフォルダ内のPDFファイルのリストを取得します"""
    query = f"'{folder_id}' in parents and mimeType='application/pdf'"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    files = results.get('files', [])
    return files

# Google DriveからPDFファイルをダウンロード
def download_pdf_from_drive(service, file_id):
    """Google DriveからPDFファイルをダウンロードし、バイナリデータを返します"""
    request = service.files().get_media(fileId=file_id)
    file_data = BytesIO()
    downloader = MediaIoBaseDownload(file_data, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"Download Progress: {int(status.progress() * 100)}%")
    file_data.seek(0)
    return file_data

# PDFからテキストを抽出
def extract_text_from_pdf(pdf_data):
    """PDFファイルデータからテキストを抽出します"""
    text = extract_text(pdf_data)
    return text

# 指定したフォルダ内のすべてのPDFファイルのテキストを抽出
def extract_texts_from_folder(folder_id):
    """フォルダ内のPDFファイルからテキストを抽出し、リストとして返します"""
    service = authenticate_drive()
    if not service:
        return []

    pdf_files = list_pdf_files_in_folder(service, folder_id)
    if not pdf_files:
        print("指定されたフォルダにPDFファイルが見つかりません。")
        return []

    texts = []
    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file['name']}")
        pdf_data = download_pdf_from_drive(service, pdf_file['id'])
        pdf_text = extract_text_from_pdf(pdf_data)
        if pdf_text:
            texts.append(pdf_text)
        else:
            print(f"{pdf_file['name']} からテキストを抽出できませんでした。")
    return texts

として保存

 touch main3.py

from drive_pdf_extractor import extract_texts_from_folder
from ollama_module import parse_text_with_ollama
from google_calendar_module import add_events_to_calendar

# Google DriveのSchoolフォルダID
folder_id = ""

# フォルダ内のPDFファイルからテキストを抽出
texts = extract_texts_from_folder(folder_id)

# テキストが抽出できているか確認
if not texts:
    print("フォルダ内に解析するテキストがありません。")
else:
    for text_content in texts:
        # Ollamaでテキストを解析(モデル名を指定)
        events = parse_text_with_ollama(text_content, model_name='elyza:jp8b')

        # 抽出されたイベントを表示
        print("抽出されたイベント:")
        for event in events:
            print(event)

        # eventがNoneのものを削除
        events = [event for event in events if event['event'] is not None]

        # フィルタリング後のイベントを表示
        print("有効なイベント:")
        for event in events:
            print(event)

        # 有効なイベントがある場合のみGoogleカレンダーに追加
        if events:
            add_events_to_calendar(events, calendar_id='primary', token_file='token.json', credentials_file='credentials.json')
        else:
            print("有効なイベントがありません。")

として保存

しかし認証エラーになる

これは google drive API の時に作成した token.jsonが
Google calendar API の権限と異なるため
なので参照する token.json を変更

import os
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from googleapiclient.http import MediaIoBaseDownload
from pdfminer.high_level import extract_text
from io import BytesIO

# Google Drive APIの認証
def authenticate_drive():
    SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
    token_path = 'g_drive/token.json'  # token.jsonのパスを変更
    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    else:
        print("認証トークンが見つかりません。認証を実行してください。")
        return None
    return build('drive', 'v3', credentials=creds)

# 他の関数も引き続き同様
# フォルダ内のPDFファイルリストを取得
def list_pdf_files_in_folder(service, folder_id):
    query = f"'{folder_id}' in parents and mimeType='application/pdf'"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    files = results.get('files', [])
    return files

# Google DriveからPDFファイルをダウンロード
def download_pdf_from_drive(service, file_id):
    request = service.files().get_media(fileId=file_id)
    file_data = BytesIO()
    downloader = MediaIoBaseDownload(file_data, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"Download Progress: {int(status.progress() * 100)}%")
    file_data.seek(0)
    return file_data

# PDFからテキストを抽出
def extract_text_from_pdf(pdf_data):
    text = extract_text(pdf_data)
    return text

# 指定したフォルダ内のすべてのPDFファイルのテキストを抽出
def extract_texts_from_folder(folder_id):
    service = authenticate_drive()
    if not service:
        return []

    pdf_files = list_pdf_files_in_folder(service, folder_id)
    if not pdf_files:
        print("指定されたフォルダにPDFファイルが見つかりません。")
        return []

    texts = []
    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file['name']}")
        pdf_data = download_pdf_from_drive(service, pdf_file['id'])
        pdf_text = extract_text_from_pdf(pdf_data)
        if pdf_text:
            texts.append(pdf_text)
        else:
            print(f"{pdf_file['name']} からテキストを抽出できませんでした。")
    return texts

というように

drive_pdf_extractor.py

を修正

修正したのは

token_path = 'g_drive/token.json'  # token.jsonのパスを変更

というように参照先を変えた

これで
Main3.pyを実行したが

レスポンスに含まれている日付形式が「令和6年10月26日」などの和暦表記や、
「2024-10-08」などのISO表記と混在しているため

統一的に処理するためのフォーマット変換を行い
日付形式の変換後、無効な日付やイベント名が空のエントリをフィルタリングするように変更する必要がある

GoogleDriveの指定フォルダからファイルを取得し処理

GoogleDriveの指定フォルダからファイルを取得し処理

Google Drive API でフォルダのIDを取得できたので
次にここにpdfファイルを配置して
これを取得しテキストを抽出する

PyPDF2 と pdfminer.six はどちらも PDF ファイルを操作するための Python ライブラリ

PyPDF2 には PDF ファイルの結合、分割、ページの回転、ページの追加・削除など、PDF のページ操作機能が充実

pdfminer.six は PDF を読み取ってテキストデータを抽出することが主目的のため、ページ操作や編集の機能はない

PyPDF2 は比較的軽量で、簡単なテキスト抽出やページ操作には高速に動作しますが、PDFの内容解析が簡易的

pdfminer.six はテキスト抽出においては非常に詳細な処理を行うため、
特に長い PDF や複雑なレイアウトのファイルでは処理時間が長くなる

ということでpdfminer.sixを使いテキストを取り出す

そしてその処理結果を渡すというモジュールを作成する

Google Drive の
School フォルダの ID: ここからpdfファイルを取得し
pdfminer.sixを使ってPDFからテキストを抽出したい

Google Drive の School フォルダの ID:
ここからpdfファイルを取得し pdfminer.sixを使ってPDFからテキストを抽出したい
個人で使うので token.jsonを使ったコードにする
抽出したテキストを処理する関数に渡すようにするため 抽出したテキストをOllamaに送信は不要。

import os
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials
from googleapiclient.http import MediaIoBaseDownload
from pdfminer.high_level import extract_text
from io import BytesIO

# Google Drive APIの認証
def authenticate_drive():
    SCOPES = ['https://www.googleapis.com/auth/drive']
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("認証トークンが見つかりません。認証を実行してください。")
        return None
    return build('drive', 'v3', credentials=creds)

# フォルダ内のPDFファイルリストを取得
def list_pdf_files_in_folder(service, folder_id):
    """Google Driveフォルダ内のPDFファイルのリストを取得します"""
    query = f"'{folder_id}' in parents and mimeType='application/pdf'"
    results = service.files().list(q=query, fields="files(id, name)").execute()
    files = results.get('files', [])
    return files

# Google DriveからPDFファイルをダウンロード
def download_pdf_from_drive(service, file_id):
    """Google DriveからPDFファイルをダウンロードし、バイナリデータを返します"""
    request = service.files().get_media(fileId=file_id)
    file_data = BytesIO()
    downloader = MediaIoBaseDownload(file_data, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f"Download Progress: {int(status.progress() * 100)}%")
    file_data.seek(0)
    return file_data

# PDFからテキストを抽出
def extract_text_from_pdf(pdf_data):
    """PDFファイルデータからテキストを抽出します"""
    text = extract_text(pdf_data)
    return text

# テキストを処理する関数
def process_text(text):
    """抽出したテキストを処理します"""
    print("Extracted Text:\n", text)
    # テキストの処理ロジックをここに追加します

# フォルダ内のPDFを処理
def process_pdfs_in_folder(folder_id):
    service = authenticate_drive()
    if not service:
        return

    pdf_files = list_pdf_files_in_folder(service, folder_id)
    if not pdf_files:
        print("指定されたフォルダにPDFファイルが見つかりません。")
        return

    for pdf_file in pdf_files:
        print(f"Processing file: {pdf_file['name']}")
        pdf_data = download_pdf_from_drive(service, pdf_file['id'])
        pdf_text = extract_text_from_pdf(pdf_data)
        if pdf_text:
            process_text(pdf_text)
        else:
            print(f"{pdf_file['name']} からテキストを抽出できませんでした。")

# フォルダIDを指定して処理を開始
folder_id = ""  # SchoolフォルダのID
process_pdfs_in_folder(folder_id)

これを

python get_pdf_read.py

で実行するとかなりの精度で取り出すことができた

次はこれをモジュールにしてollamaに処理結果を渡せるようにする

なおコード解説は
コードの説明
1. authenticate_drive関数: token.jsonを使用してGoogle Drive APIに認証し、Driveサービスを取得
2. list_pdf_files_in_folder関数: 指定したフォルダID内のPDFファイルをリスト化
3. download_pdf_from_drive関数: 各PDFファイルをダウンロードし、バイナリデータを返す
4. extract_text_from_pdf関数: pdfminer.sixを使ってPDFデータからテキストを抽出
5. process_text関数: 抽出したテキストを処理します。ここに任意の処理ロジックを追加
6. process_pdfs_in_folder関数: フォルダ内の全PDFファイルを処理し、各PDFのテキストを抽出後にprocess_text関数に渡す
これで、抽出されたテキストを直接処理する

なおGPTで生成したコードでは

SCOPES = ['https://www.googleapis.com/auth/drive.readonly']

というスコープだが
これだと認証エラーになる

    SCOPES = ['https://www.googleapis.com/auth/drive']

で回避可能

これは google カレンダーの時も同様で

SCOPES = ['https://www.googleapis.com/auth/calendar']

なら認証エラーにならない

Google Drive APIで新規フォルダ作成

Google Drive API を使い、my drive のなかに Schoolフォルダを作成したい
さらにSchoolフォルダのなかにReadフォルダを作成したい

ここへPDFファイルを置いて
このなかにファイルがあるならOCRしてカレンダーに予定を書き込み
処理終了したら ReadフォルダにPDFファイルを移動する

この時に同じフォルダがあると同じフォルダが複数できるらしいので
チェック機能を追加する

なので
Google Drive API を使用して My Drive 内に「School」フォルダを作成(または既存のものを使用)し、その中に「Read」フォルダを作成する

pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib

で必要なライブラリを入れる

Google Cloud Console でプロジェクトを作成し、Google Drive API を有効化も忘れずに

from __future__ import print_function
import os.path
from google.oauth2 import credentials
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

# 認証と API クライアントのセットアップ
SCOPES = ['https://www.googleapis.com/auth/drive']

def main():
    """Google Drive API に接続し、フォルダを作成または取得します。"""
    creds = None
    # token.json はユーザーのアクセストークンとリフレッシュトークンを保存します。初回実行時に自動的に作成されます。
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    # 資格情報がないか、無効または期限切れの場合は再ログインします。
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # ユーザーにブラウザで認証してもらいます。
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        # 認証情報を保存します。
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    # Drive API クライアントを作成
    service = build('drive', 'v3', credentials=creds)

    # 1. "School" フォルダを取得または作成
    school_folder_id = get_or_create_folder(service, 'School', parent_id=None)

    # 2. "Read" フォルダを "School" フォルダ内に作成
    read_folder_id = get_or_create_folder(service, 'Read', parent_id=school_folder_id)

    print('School フォルダの ID: %s' % school_folder_id)
    print('Read フォルダの ID: %s' % read_folder_id)

def get_or_create_folder(service, folder_name, parent_id=None):
    """
    フォルダを取得または作成します。

    Parameters:
        service: Drive API サービス インスタンス。
        folder_name (str): フォルダの名前。
        parent_id (str): 親フォルダの ID(省略可能)。

    Returns:
        str: フォルダの ID。
    """
    # フォルダを検索
    query = "name='{}' and mimeType='application/vnd.google-apps.folder' and trashed=false".format(folder_name)
    if parent_id:
        query += " and '{}' in parents".format(parent_id)

    response = service.files().list(
        q=query,
        spaces='drive',
        fields='files(id, name)',
    ).execute()
    files = response.get('files', [])

    if files:
        # 既存のフォルダが見つかった場合
        folder_id = files[0]['id']
        print('既存の "{}" フォルダの ID を使用します: {}'.format(folder_name, folder_id))
    else:
        # フォルダが存在しない場合、新規作成
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder',
        }
        if parent_id:
            file_metadata['parents'] = [parent_id]

        file = service.files().create(body=file_metadata, fields='id').execute()
        folder_id = file.get('id')
        print('新規に "{}" フォルダを作成しました。ID: {}'.format(folder_name, folder_id))

    return folder_id

if __name__ == '__main__':
    main()

を実行したが

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/quickstart.py", line 59, in <module>
    main()
  File "/Users/snowpool/aw10s/week_calendar_voice/quickstart.py", line 26, in main
    creds.refresh(Request())
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

認証関連でエラーになるので
一度検証

mkdir g_drive
cd g_drive 
cp ../credentials.json .
cp ../create_folder.py .

python create_folder.py 

で実行すると
認証画面になるので進めていく

これで Google Drive APIが使えて
新規フォルダ作成までできた

次はPDFファイルを読み込んでテキストを抽出する

それができたら
抽出した内容をollamaに渡してカレンダーに入れる

一度テキストファイルに書き出すかGPTで調べたが
内容の多さを基準にする場合、以下の点を目安に考えると良いでしょう。

### 1. **APIの入力制限**
– Ollama APIや多くのテキスト処理APIは、入力テキストの長さに制限があります。
一般的に**数千文字以内**であれば直接渡しても問題ないことが多いです。
– 確認しているAPIの制限が**4,000〜8,000文字**程度であれば、
1〜2ページ程度の短めのPDFであれば直接渡せます。
内容がこの文字数を超える場合は、分割するかファイルで管理した方が良いでしょう。

### 2. **テキストの内容**
– **数百文字〜数千文字程度**であれば、直接渡してもスムーズに処理できます。
一般的には、A4サイズのPDF1〜2ページ程度です。
– 内容が**5ページ以上**ある場合や、**10,000文字以上の大量のテキスト**になる場合は、
テキストファイルに保存し分割しながら処理した方が管理が簡単です。

### 3. **利用シナリオ**
– **リアルタイム性が重要**な場合:数千文字までで、
可能であれば直接APIに渡す方がシンプルです。
– **大規模なドキュメントや長文の内容を確認しながら処理したい**場合:
ファイルに書き出して保存し、分割して処理する方が確認がしやすくなります。

### 具体的な基準
– **3,000文字以下**:直接Ollamaに渡す
– **3,000〜10,000文字程度**:可能であれば直接渡すが、ファイルに書き出して保存も検討
– **10,000文字以上**:テキストファイルに書き出し、必要に応じて分割して処理

例えば、10,000文字はおおよそA4サイズのPDFファイルで5ページ前後に相当します。
この文字数を基準に、リアルタイム性や内容の確認のしやすさに応じて直接渡すか、
ファイルに保存してから渡すかを判断すると良いでしょう。

ということなので
PDFの処理結果を直接渡すことにする

WTIの価格を得る

WTIの価格を得る

pip install yfinance

でインストール後

import yfinance as yf

# リアルタイムのWTI価格を取得
wti = yf.Ticker("CL=F")
price = wti.history(period="1d")['Close'].iloc[-1]  # 最新の終値
print(f"Current WTI Price: ${price}")

とすると

[*********************100%***********************]  1 of 1 completed
                 Open       High        Low      Close  Adj Close  Volume
Date                                                                     
2023-01-03  80.570000  81.500000  76.599998  76.930000  76.930000  338520
2023-01-04  77.250000  77.419998  72.730003  72.839996  72.839996  352434
2023-01-05  73.250000  74.919998  72.459999  73.669998  73.669998  300731
2023-01-06  73.970001  75.470001  73.239998  73.769997  73.769997  258128
2023-01-09  73.470001  76.739998  73.470001  74.629997  74.629997  329290

と去年のものが出る

wti.history(period=”1d”) が1日分のデータではなく、
日次の複数日分のデータを取得しています。
これは、yfinanceが日次の過去データを返しているためです。

リアルタイムの価格を取得したい場合は、
次のように interval=”1m” を指定して、
最新の1分ごとのデータを取得することができます。
また、tail(1)で最新の行だけを取得するようにすると良い
とのことなのでコード変更

import yfinance as yf

# リアルタイムのWTI価格を取得(1分間隔の最新データ)
wti = yf.Ticker("CL=F")
data = wti.history(period="1d", interval="1m").tail(1)  # 最新の1分データ
price = data['Close'].iloc[-1]  # 最新の終値
print(f"Current WTI Price: ${price}")

これなら

Current WTI Price: $75.55999755859375

となる

1ヶ月間のデータを取得し折線グラフで表示
yfinance を使用して過去1ヶ月間の日次データを取得し、
matplotlib を使用して折れ線グラフを描画

import yfinance as yf
import matplotlib.pyplot as plt

# WTIデータを取得(1ヶ月間の日次データ)
wti = yf.Ticker("CL=F")
data = wti.history(period="1mo", interval="1d")

# 折れ線グラフの描画
plt.figure(figsize=(12, 6))
plt.plot(data.index, data['Close'], marker='o', linestyle='-', color='b')
plt.title('WTI Price Trend Over Last Month')
plt.xlabel('Date')
plt.ylabel('WTI Price ($)')
plt.xticks(rotation=45)
plt.grid(True)
plt.tight_layout()
plt.show()

これで表示される

次にスクロール機能をつける

pip install plotly

の後

import yfinance as yf
import plotly.graph_objs as go

# WTIデータを取得(3年間の月次データ)
wti = yf.Ticker("CL=F")
data_3yr = wti.history(period="3y", interval="1mo")

# Plotlyグラフの作成
fig = go.Figure()

# 折れ線グラフを追加
fig.add_trace(go.Scatter(x=data_3yr.index, y=data_3yr['Close'], mode='lines+markers', name='WTI Price'))

# グラフのレイアウト設定
fig.update_layout(
    title='WTI Price Trend Over Last 3 Years',
    xaxis_title='Date',
    yaxis_title='WTI Price ($)',
    xaxis_rangeslider_visible=True,  # スクロールバーを追加
)

# グラフを表示
fig.show()

これだと出ない

1年にしてみる

import yfinance as yf
import plotly.graph_objs as go

# WTIデータを取得(1年間の月次データ)
wti = yf.Ticker("CL=F")
data_1yr = wti.history(period="1y", interval="1mo")

# Plotlyグラフの作成
fig = go.Figure()

# 折れ線グラフを追加
fig.add_trace(go.Scatter(x=data_1yr.index, y=data_1yr['Close'], mode='lines+markers', name='WTI Price'))

# グラフのレイアウト設定
fig.update_layout(
    title='WTI Price Trend Over Last 1 Year',
    xaxis_title='Date',
    yaxis_title='WTI Price ($)',
    xaxis_rangeslider_visible=True,  # スクロールバーを追加
)

# グラフを表示
fig.show()

これならOK

Googleカレンダーの読み上げ

Googleカレンダーの読み上げ

一週間の予定の取得はできたので
次はvoicevox で読み上げをする

今回も docker で起動して実験する

その前に

import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar']

def main():
    """今週の残りのGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return

    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)

    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')

    # 現在の日時を取得
    now = datetime.datetime.now(tz)

    # 今週の終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)

    # time_minを現在の日時に設定
    time_min = now.isoformat()
    # time_maxは今週の終了日時
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    if not events:
        print('今週の残りの予定はありません。')
    else:
        print('今週の残りの予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")

if __name__ == '__main__':
    main()

をモジュール化する

カレンダーの予定を取得する関数を別ファイルとして整理し、他のスクリプトからインポートできるようにする

touch calendar_utils.py

import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar']

def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None

def get_upcoming_events(creds, days=7):
    """指定された期間内のGoogleカレンダーの予定を取得します。"""
    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)

    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)

    # 開始日と終了日を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)

    # 開始日時と終了日時を設定
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    return events

これで
calendar_utils.pyに、カレンダー認証を行うauthenticate関数と、
指定された期間の予定を取得するget_upcoming_events関数を作成

また

def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None


Noneを返した場合(トークンがない場合)や、予定がない場合の処理をする

次に
メインスクリプトからモジュールをインポートして使用

touch main_script.py

内容は

from calendar_utils import authenticate, get_upcoming_events

def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                print(f"{start} - {summary}")

if __name__ == '__main__':
    main()

これで

python main_script.py 
2024-10-11T00:06:41.572834+09:00 から 2024-10-14T00:06:41.572834+09:00 までの予定を取得します。
今週の残りの予定はありません。

というように同じ結果が出ればOK

次に voievox
これは以前作成したプロジェクトの中を参考に行う

まず

ssh -i .ssh/ubuntu22 snowpool@192.168.1.69

でログイン

docker run -d  -p '192.168.1.69:50021:50021' voicevox/voicevox_engine:cpu-ubuntu20.04-lates

で起動

次にコード変更
calendar_utils.pyに音声合成の関数を追加し、カレンダー予定をVoiceVox経由で音声ファイルとして保存

import os
import datetime
import pytz
import requests
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar']
VOICEVOX_API_URL = "http://192.168.1.69:50021"  # VoiceVoxのAPIサーバーURL

def authenticate():
    """Google Calendar APIの認証を行います。"""
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None

def get_upcoming_events(creds, days=7):
    """指定された期間内のGoogleカレンダーの予定を取得します。"""
    service = build('calendar', 'v3', credentials=creds)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    return events

def synthesize_speech(text, speaker=1):
    """VoiceVox APIを使って音声合成を行い、音声ファイルを生成します。"""
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
        else:
            print("音声の生成に失敗しました")
    else:
        print("クエリの作成に失敗しました")

次に

カレンダーの予定を取得し、各予定を音声に変換する処理をメインスクリプトに追加

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech

def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            synthesize_speech("今週の残りの予定はありません。")
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                event_text = f"{start} - {summary}"
                print(event_text)
                synthesize_speech(event_text)

if __name__ == '__main__':
    main()

実行すると音声ファイルが作成されるが
2024-10-11 – 診断書の取得
の場合はそのまm数字を読み上げるため
年月日に変換が必要
できれば曜日もほしい

Google Calendar APIでは、曜日自体を直接返す項目はありませんが、
予定の開始日時がISO形式の文字列として返されるので、
これをPythonで処理して曜日を取得することができます。
datetimeオブジェクトを使用すれば、APIから取得した日時を簡単に曜日に変換できます
とのこと

日時を変換する関数を追加し、
YYYY-MM-DD形式の日時を「YYYY年M月D日(曜日)」
のように整形してVoiceVoxで読み上げるようにする

calendar_utils.py

import os
import datetime
import pytz
import requests
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

SCOPES = ['https://www.googleapis.com/auth/calendar']
VOICEVOX_API_URL = "http://192.168.1.69:50021"  # VoiceVoxのAPIサーバーURL

def authenticate():
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        return creds
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return None

def get_upcoming_events(creds, days=7):
    service = build('calendar', 'v3', credentials=creds)
    tz = pytz.timezone('Asia/Tokyo')
    now = datetime.datetime.now(tz)
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=days)
    time_min = now.isoformat()
    time_max = end_of_week.isoformat()

    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    return events

def format_date_with_weekday(date_str):
    """ 日付文字列を「YYYY年M月D日(曜日)」形式に変換します """
    date_obj = datetime.datetime.fromisoformat(date_str)
    # 曜日を日本語で取得
    weekday = date_obj.strftime("%A")
    weekday_dict = {
        "Monday": "月",
        "Tuesday": "火",
        "Wednesday": "水",
        "Thursday": "木",
        "Friday": "金",
        "Saturday": "土",
        "Sunday": "日"
    }
    weekday_jp = weekday_dict.get(weekday, weekday)  # 日本語の曜日に変換
    formatted_date = date_obj.strftime(f"%Y年%m月%d日({weekday_jp})")
    return formatted_date

def synthesize_speech(text, speaker=1):
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
        else:
            print("音声の生成に失敗しました")
    else:
        print("クエリの作成に失敗しました")

として

main_script.py

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday

def main():
    creds = authenticate()
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            synthesize_speech("今週の残りの予定はありません。")
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                synthesize_speech(event_text)

if __name__ == '__main__':
    main()

として保存

これで実行すると

2024年10月11日(金) - 診断書の取得

というように目的通りの音声が作成された

次は作成した音声を再生できるようにする
この時に今週の予定をお知らせします
という音声ファイルを再生するようにする

これは以前作成したものを使う

touch create_voice.py

import subprocess
import sys

def generate_and_play_audio_from_text(file_path, server_ip):
    # テキストファイルからテキストを読み込む
    with open(file_path, 'r') as file:
        text = file.read()

    # JSONファイルを作成するためのcurlコマンド
    command_json = [
        "curl", "-s", "-X", "POST",
        f"http://{server_ip}/audio_query?speaker=1",
        "--get", "--data-urlencode", f"text={text}"
    ]

    # 音声ファイルを作成するためのcurlコマンド
    command_audio = [
        "curl", "-s", "-H", "Content-Type: application/json", "-X", "POST",
        "-d", "@query.json", f"http://{server_ip}/synthesis?speaker=1"
    ]

    # JSONファイルと音声ファイルを作成
    with open('query.json', 'w') as file:
        subprocess.run(command_json, stdout=file)
    with open('audio_output.wav', 'wb') as file:
        subprocess.run(command_audio, stdout=file)

    # 音声ファイルを再生
    subprocess.run(["afplay", "audio_output.wav"])

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python script.py <file_path> <server_ip>")
        sys.exit(1)
    
    file_path = sys.argv[1]
    server_ip = sys.argv[2]
    generate_and_play_audio_from_text(file_path, server_ip)

として保存

touch voice.txt 

で中身を

今週の予定をお知らせします

として保存

python create_voice.py voice.txt 192.168.1.69:50021

とすれば

audio_output.wav

が作成される

これをnotice.wavにファイル名を変更する

そしてこれを再生するようにする

生成された音声ファイルを再生するには、
Pythonのsubprocessモジュールやplaysoundライブラリなどを利用する方法がある

pip install playsound

でインストール

一週間分の予定を表示した後、保存した音声ファイルを順番に再生する
音声ファイルのリストを作成: 各イベントの音声ファイルを生成した後、
そのファイル名をaudio_filesリストに追加

予定の表示後にファイルを再生:
audio_filesリストの各ファイルを順番に再生しています。
playsound関数を使って音声ファイルを再生し、全ての音声が順番に再生

これで、カレンダーの予定を表示した後に、順次生成された音声ファイルを再生

残りの予定がない場合に音声が再生されないように、
audio_filesリストにファイルが追加されているか確認する条件を追加します。
audio_filesが空でない場合のみ再生処理を行うように、コードを修正

予定がないときは音声ファイルを再生せず、予定がある場合のみリスト内の音声ファイルを再生

予定があり音声ファイルを再生する前に notice.wav を再生

このためには

calendar_utils.py

を修正する

# 音声合成の関数を修正して、生成されたファイル名を返すようにします
def synthesize_speech(text, speaker=1):
    params = {'text': text, 'speaker': speaker}
    response = requests.post(f"{VOICEVOX_API_URL}/audio_query", params=params)
    if response.status_code == 200:
        query_data = response.json()
        synthesis_response = requests.post(f"{VOICEVOX_API_URL}/synthesis", params={'speaker': speaker}, json=query_data)
        if synthesis_response.status_code == 200:
            filename = f"event_voice_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.wav"
            with open(filename, "wb") as f:
                f.write(synthesis_response.content)
            print(f"音声ファイルを生成しました: {filename}")
            return filename  # 生成されたファイル名を返す
        else:
            print("音声の生成に失敗しました")
            return None
    else:
        print("クエリの作成に失敗しました")
        return None

そして

main_script.py

from calendar_utils import authenticate, get_upcoming_events, synthesize_speech, format_date_with_weekday
from playsound import playsound

def main():
    creds = authenticate()
    audio_files = []  # 音声ファイルのリスト
    if creds:
        events = get_upcoming_events(creds)
        if not events:
            print('今週の残りの予定はありません。')
            # 音声ファイルは再生しない
        else:
            print('今週の残りの予定:')
            for event in events:
                start = event['start'].get('dateTime', event['start'].get('date'))
                summary = event.get('summary', '(タイトルなし)')
                formatted_date = format_date_with_weekday(start)
                event_text = f"{formatted_date} - {summary}"
                print(event_text)
                filename = synthesize_speech(event_text)
                if filename:
                    audio_files.append(filename)  # 生成されたファイル名をリストに追加

        # 音声ファイルが存在する場合のみ notice.wav と各予定の音声を再生
        if audio_files:
            # notice.wavを最初に再生
            print("再生中: notice.wav")
            playsound("notice.wav")
            
            # 各予定の音声ファイルを再生
            for audio_file in audio_files:
                print(f"再生中: {audio_file}")
                playsound(audio_file)

if __name__ == '__main__':
    main()

として保存

これで実行すると

今週の予定をお知らせします
2024年10月11日(金) - 診断書の取得

というように読み上げてくれる

TripoSR を Colab で使うメモ

TripoSR を Colab で使うメモ

実行環境 Google Colab Pro+

!git clone -b dev https://github.com/camenduru/TripoSR-hf
%cd /content/TripoSR-hf

でダウンロード

libnvinfer.so.10
libcudnn_adv.so.9
を colabへインストール

!apt-get update
!apt-get install -y libnvinfer8 libnvinfer-plugin8 libnvinfer-dev

!apt-get install -y libcudnn9-cuda-12 libcudnn9-dev-cuda-12 libcudnn9-static-cuda-12

テストのため

!apt-get install libcudnn9-samples

!cd /usr/src/cudnn_samples_v9/mnistCUDNN
!make clean && make
!./mnistCUDNN

!apt-get update
!apt-get install -y libfreeimage-dev

!cd /usr/src/cudnn_samples_v9/mnistCUDNN && make clean && make

!cd /usr/src/cudnn_samples_v9/mnistCUDNN && ./mnistCUDNN

これで
Test passed!
がでればOK

!pip install onnxruntime
!pip install onnxruntime-gpu

でライブラリインストール

import onnxruntime as ort

providers = ort.get_available_providers()
print("Available providers:", providers)

を実行し

Available providers: ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']

となればOK

# Make sure that your environment meets the requirements above
!pip install git+https://github.com/tatsy/torchmcubes.git

でインストール

!pip install tensorrt

でライブラリインストール

!pip install -r requirements.txt

で必要なパッケージインストール

!pip install transformers -U
!pip install huggingface_hub -U

!transformers-cli cache clear

!pip install gradio==5.7.0 gradio-client==1.5.0 diffusers==0.15.0 sentence-transformers==2.2.2
!export LD_LIBRARY_PATH=/path/to/TensorRT/lib:$LD_LIBRARY_PATH

でパスを通しておく

!python app.py

で起動

To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
/content/TripoSR-hf/tsr/system.py:69: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.
  ckpt = torch.load(weight_path, map_location="cpu")
2024-12-07 21:34:10.493703261 [E:onnxruntime:Default, provider_bridge_ort.cc:1848 TryGetProviderInfo_TensorRT] /onnxruntime_src/onnxruntime/core/session/provider_bridge_ort.cc:1539 onnxruntime::Provider& onnxruntime::ProviderLibrary::Get() [ONNXRuntimeError] : 1 : FAIL : Failed to load library libonnxruntime_providers_tensorrt.so with error: libnvinfer_plugin.so.10: cannot open shared object file: No such file or directory

*************** EP Error ***************
EP Error /onnxruntime_src/onnxruntime/python/onnxruntime_pybind_state.cc:507 void onnxruntime::python::RegisterTensorRTPluginsAsCustomOps(PySessionOptions&, const onnxruntime::ProviderOptions&) Please install TensorRT libraries as mentioned in the GPU requirements page, make sure they're in the PATH or LD_LIBRARY_PATH, and that your GPU is supported.
 when using ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']
Falling back to ['CUDAExecutionProvider', 'CPUExecutionProvider'] and retrying.
****************************************
* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: 

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
Could not load symbol cuFuncGetName. Error: /usr/lib64-nvidia/libcuda.so.1: undefined symbol: cuFuncGetName

とエラーが出るけサンプルは動いた

操作は

【超便利】TripoSRで画像から3Dを生成する方法とは?
を参考に
なお写真をアップロードすれば撮影した画像から3Dモデルの作成が可能

Google Calendarの週間予定の取得

週間予定の取得

実行環境
M1 MacbookAir 16GB

import os
import datetime
import pytz
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

# カレンダーAPIのスコープ
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']

def main():
    """今週の残りのGoogleカレンダーの予定を取得して表示します。"""
    creds = None
    # 既存のトークンファイルを使用
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)
    else:
        print("トークンファイルが見つかりません。認証を実行してください。")
        return

    # Google Calendar APIサービスを構築
    service = build('calendar', 'v3', credentials=creds)

    # タイムゾーンの設定(日本時間)
    tz = pytz.timezone('Asia/Tokyo')

    # 現在の日時を取得
    now = datetime.datetime.now(tz)

    # 今週の終了日(日曜日)を計算
    start_of_week = now - datetime.timedelta(days=now.weekday())
    end_of_week = start_of_week + datetime.timedelta(days=7)

    # time_minを現在の日時に設定
    time_min = now.isoformat()
    # time_maxは今週の終了日時
    time_max = end_of_week.isoformat()

    print(f"{time_min} から {time_max} までの予定を取得します。")

    # イベントを取得
    events_result = service.events().list(
        calendarId='primary',
        timeMin=time_min,
        timeMax=time_max,
        singleEvents=True,
        orderBy='startTime'
    ).execute()
    events = events_result.get('items', [])

    if not events:
        print('今週の残りの予定はありません。')
    else:
        print('今週の残りの予定:')
        for event in events:
            start = event['start'].get('dateTime', event['start'].get('date'))
            summary = event.get('summary', '(タイトルなし)')
            print(f"{start} - {summary}")

if __name__ == '__main__':
    main()

を実行したが

2024-10-09T01:28:10.960984+09:00 から 2024-10-14T01:28:10.960984+09:00 までの予定を取得します。
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/week_calendar_voice/get_week.py", line 60, in <module>
    main()
  File "/Users/snowpool/aw10s/week_calendar_voice/get_week.py", line 47, in main
    ).execute()
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/googleapiclient/http.py", line 923, in execute
    resp, content = _retry_request(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/googleapiclient/http.py", line 191, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/auth/credentials.py", line 135, in before_request
    self.refresh(request)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/credentials.py", line 335, in refresh
    ) = reauth.refresh_grant(
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/reauth.py", line 351, in refresh_grant
    _client._handle_error_response(response_data, retryable_error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/google/oauth2/_client.py", line 73, in _handle_error_response
    raise exceptions.RefreshError(
google.auth.exceptions.RefreshError: ('invalid_scope: Bad Request', {'error': 'invalid_scope', 'error_description': 'Bad Request'})

となる

Chatgpt では

SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']

となっているが

SCOPES = ['https://www.googleapis.com/auth/calendar']

というようにスコープを変更したら解決した

これはカレンダー追加がこのスコープで動作するのに
取得の場合はコードが違うので試したら動いた