linebotへ送信

linebotへ送信

実行環境
M1 MacbookAir 16GB

Ollamaで修正されたテキストをLINE botに送信するため
既存のLINE Notifyの処理と合わせて、修正された文章をLINE botに送信する処理を実装

LINE botにメッセージを送信するには、LINE Messaging APIを利用する

LINE botにメッセージを送信するためのモジュールを作る

touch line_bot_sender.py

中身は

# line_bot_sender.py
import requests
import json

class LineBotSender:
    def __init__(self, config_file_path):
        self.config = self._load_config(config_file_path)
        self.channel_access_token = self.config["line_bot_channel_access_token"]
        self.user_id = self.config["line_bot_user_id"]

    def _load_config(self, config_file_path):
        with open(config_file_path, 'r') as file:
            return json.load(file)

    def send_message(self, message):
        url = "https://api.line.me/v2/bot/message/push"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.channel_access_token}"
        }
        payload = {
            "to": self.user_id,
            "messages": [
                {
                    "type": "text",
                    "text": message
                }
            ]
        }
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != 200:
            raise Exception(f"Error sending message to LINE bot: {response.status_code}, {response.text}")

として保存

config.json にLINE botの情報を追加
{
    "line_notify_token": "YOUR_LINE_NOTIFY_TOKEN",
    "ollama_model": "elyza:jp8b",
    "line_bot_channel_access_token": "YOUR_LINE_BOT_CHANNEL_ACCESS_TOKEN",
    "line_bot_user_id": "TARGET_USER_ID"
}

linebot useridがわからなかったため検索

開発者が自分自身のユーザーIDを取得する

を参考に

LINE Developersコンソール

基本情報のプロバイダーIDを入れてみる

次にOllamaで修正されたテキストをLINE botに送信するために、メインスクリプトにLineBotSenderを組み込む

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート
from ollama_text_correction import OllamaTextCorrector  # Ollamaによる修正モジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(config.jsonからトークンを読み込む)
    line_notify = LineNotify("config.json")
    
    # Ollamaのテキスト修正モジュールを初期化
    text_corrector = OllamaTextCorrector("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が10秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 10:
            print("10秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        
        # Ollamaでテキストを構成
        corrected_text = text_corrector.correct_text(text)
        
        if corrected_text:  # Noneが返された場合はスキップ
            recognized_texts.append(corrected_text)
            print(corrected_text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    if recognized_texts:
        message = "\n".join(recognized_texts)
        print("\n入力された音声テキスト一覧:")
        print(message)

        # LINE Notifyでメッセージを送信
        line_notify.send(f"入力された音声テキスト一覧:\n{message}")
    else:
        print("入力メッセージはありませんでした")

if __name__ == "__main__":
    main()

を書き換えるとプロンプト対処があとで困るので
これをコピーし main2.pyとしてコードを記述

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート
from ollama_text_correction import OllamaTextCorrector  # Ollamaによる修正モジュールをインポート
from line_bot_sender import LineBotSender  # LINE bot送信用のモジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(config.jsonからトークンを読み込む)
    line_notify = LineNotify("config.json")
    
    # Ollamaのテキスト修正モジュールを初期化
    text_corrector = OllamaTextCorrector("config.json")
    
    # LINE bot送信用のモジュールを初期化
    line_bot_sender = LineBotSender("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が10秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 10:
            print("10秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        
        # Ollamaでテキストを構成
        corrected_text = text_corrector.correct_text(text)
        
        if corrected_text:  # Noneが返された場合はスキップ
            recognized_texts.append(corrected_text)
            print(corrected_text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    if recognized_texts:
        message = "\n".join(recognized_texts)
        print("\n入力された音声テキスト一覧:")
        print(message)

        # LINE Notifyでメッセージを送信
        line_notify.send(f"入力された音声テキスト一覧:\n{message}")
        
        # LINE botで修正されたテキストを送信
        line_bot_sender.send_message(f"修正された音声テキスト:\n{message}")
    else:
        print("入力メッセージはありませんでした")

if __name__ == "__main__":
    main()

これで実行したら

入力された音声テキスト一覧:
本日の天候
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/linebot/main2.py", line 63, in <module>
    main()
  File "/Users/snowpool/aw10s/linebot/main2.py", line 58, in main
    line_bot_sender.send_message(f"修正された音声テキスト:\n{message}")
  File "/Users/snowpool/aw10s/linebot/line_bot_sender.py", line 32, in send_message
    raise Exception(f"Error sending message to LINE bot: {response.status_code}, {response.text}")
Exception: Error sending message to LINE bot: 400, {"message":"The property, 'to', in the request body is invalid (line: -, column: -)"}

となった

config.jsonに
  "channel_secret": "YOUR_CHANNEL_SECRET",

を追加する

次に
line_bot_sender.pyを修正する
line_bot_sender.py ファイルで、
send_message メソッド内のリクエストボディに ‘to’ フィールドを追加し、
config.jsonから読み込んだ user_id を設定

    def send_message(self, message):
        url = "https://api.line.me/v2/bot/message/push"
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.channel_access_token}"
        }
        payload = {
            "to": self.user_id,
            "messages": [
                {
                    "type": "text",
                    "text": message
                }
            ]
        }
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code != 200:
            raise Exception(f"Error sending message to LINE bot: {response.status_code}, {response.text}")

def send_message(self, message):
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {self.channel_access_token}'
    }
    data = {
        "to": self.user_id,  # 追加
        "messages": [
            {
                "type": "text",
                "text": message
            }
        ]
    }
    response = requests.post(self.api_endpoint, headers=headers, json=data)
    if response.status_code != 200:
        raise Exception(f"Error sending message to LINE bot: {response.status_code}, {response.text}")

に変更

line_bot_sender.py のクラス内で user_id を読み込むように変更

# line_bot_sender.py
import requests
import json

class LineBotSender:
    def __init__(self, config_path):
        with open(config_path, 'r') as file:
            config = json.load(file)
        self.channel_access_token = config.get('channel_access_token')
        self.channel_secret = config.get('channel_secret')
        self.user_id = config.get('user_id')  # 追加
        self.api_endpoint = 'https://api.line.me/v2/bot/message/push'  # push APIを使用


    def _load_config(self, config_file_path):
        with open(config_file_path, 'r') as file:
            return json.load(file)
    def send_message(self, message):
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.channel_access_token}'
        }
        data = {
            "to": self.user_id,  # 追加
            "messages": [
                {
                    "type": "text",
                    "text": message
                }
            ]
        }
        response = requests.post(self.api_endpoint, headers=headers, json=data)
        if response.status_code != 200:
            raise Exception(f"Error sending message to LINE bot: {response.status_code}, {response.text}")

というようにした

これだけだとまだwebhook設定ができていないのでエラーになる

LINEBotの設定

LINEBotの設定

udemyの講習
ChatGPTを用いたLINEBot開発入門-基本的な機能からPDFを用いたQ&Aまで、クラウド上での開発方法を徹底解説
これによれば
LINEから cloud functionsのURLにリクエスト送信

これはトリガーとしてcloud functions で関数が実行される

今回はLINEからのhttpリクエストをトリガーとして実行される

LINEではユーザがメッセージを送ると
指定したURLにリクエストを送信するwebhookという機能がある

このwebhookを使うことで
Lineから CFにリクエスト送信できる

CFを実行すると
返答文章が生成されるので
それをLINEサーバーに返すことで
ユーザのトークルームに返答が表示される

この場合CFで行っているのは2つ
LINEからのリクエストを受け取る
文章を生成してLINEに返答文を送信

CF側では
リクエストがLINEから来てるのか認証する
あと
返答文を返す時に
LINEのトークルームにメッセージを送る権限があるかを認証する

CFとLINE連携には認証が必要

これは
Channel secret
LINEからのメッセージの証明
Channel access token
LINEに権限を持っている証明
が必要

これらの認証関連は
MessaginAPIを使う

https://developers.line.biz/ja/
にログイン

プロバイダを作成するので
任意の名前を入力

会社・事業者の所在国・地域
を日本にして

チャネル名を設定

チャネル説明

音声入力でLINEを行い、返信も音声で行う

大業種

個人でOK

小業種

個人(ITコンピュータ)
とした

次に
MessagingAPI 設定をクリック

webhookは後でfirebaseを設定してURLを取得する

チャンネルアクセストークンを発行する

チャネル基本設定で
チャネルシークレットの取得ができる

Ollamaで音声入力テキストの修正をする

実行環境
M1 MacbookAir 16GB

Ollamaで音声入力テキストの修正をする

Ollamaを使って日本語として自然な文章に構成する部分を追加

 touch ollama_text_correction.py

でファイルを作成

# ollama_text_correction.py
import requests
import json

class OllamaTextCorrector:
    def __init__(self, config_file_path):
        self.config = self._load_config(config_file_path)
        self.model = self.config.get("ollama_model", "elyza:jp8b")  # 使用するモデルを指定
    
    def _load_config(self, config_file_path):
        with open(config_file_path, 'r') as file:
            config = json.load(file)
        return config
    
    def correct_text(self, text):
        url = "http://localhost:11434/api/generate"  # OllamaのAPIエンドポイント
        headers = {
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "prompt": f"以下の文を正しい日本語に構成してください:\n{text}"
        }
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
            corrected_text = response.json()["text"]
            return corrected_text.strip()
        else:
            raise Exception(f"Error from Ollama API: {response.status_code}, {response.text}")

config.json にOllamaのモデル設定を追加
{
  "token": "LINE notify の token",
  "ollama_model": "elyza:jp8b"
}

音声入力後にOllamaを使ってテキストを修正するように、メインスクリプトを更新

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(config.jsonからトークンを読み込む)
    line_notify = LineNotify("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が2秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("2秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        recognized_texts.append(text)  # テキストをリストに追加
        print(text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    if recognized_texts:
        message = "\n".join(recognized_texts)
        print("\n入力された音声テキスト一覧:")
        print(message)

        # LINE Notifyでメッセージを送信
        line_notify.send(f"入力された音声テキスト一覧:\n{message}")
    else:
        print("入力メッセージはありませんでした")

if __name__ == "__main__":
    main()

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート
from ollama_text_correction import OllamaTextCorrector  # Ollamaによる修正モジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(config.jsonからトークンを読み込む)
    line_notify = LineNotify("config.json")
    
    # Ollamaのテキスト修正モジュールを初期化
    text_corrector = OllamaTextCorrector("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が2秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("2秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        
        # Ollamaでテキストを構成
        corrected_text = text_corrector.correct_text(text)
        
        recognized_texts.append(corrected_text)  # 構成されたテキストをリストに追加
        print(corrected_text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    if recognized_texts:
        message = "\n".join(recognized_texts)
        print("\n入力された音声テキスト一覧:")
        print(message)

        # LINE Notifyでメッセージを送信
        line_notify.send(f"入力された音声テキスト一覧:\n{message}")
    else:
        print("入力メッセージはありませんでした")

if __name__ == "__main__":
    main()

に変更

しかし

[2024-09-14 00:48:22.490] [ctranslate2] [thread 305229] [warning] The compute type inferred from the saved model is float16, but the target device or backend do not support efficient float16 computation. The model weights have been automatically converted to use the float32 compute type instead.
stand by ready OK
recording...
finished
Traceback (most recent call last):
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/requests/models.py", line 963, in json
    return complexjson.loads(self.content.decode(encoding), **kwargs)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 2 column 1 (char 94)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/linebot/main.py", line 55, in <module>
    main()
  File "/Users/snowpool/aw10s/linebot/main.py", line 38, in main
    corrected_text = text_corrector.correct_text(text)
  File "/Users/snowpool/aw10s/linebot/ollama_text_correction.py", line 26, in correct_text
    corrected_text = response.json()["text"]
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/requests/models.py", line 971, in json
    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)
requests.exceptions.JSONDecodeError: Extra data: line 2 column 1 (char 94)

となる

ollama_text_correction.py の
correct_text 関数にデバッグ用の出力を追加し、レスポンスの内容をテキストで表示

# ollama_text_correction.py
import requests
import json

class OllamaTextCorrector:
    def __init__(self, config_file_path):
        self.config = self._load_config(config_file_path)
        self.model = self.config.get("ollama_model", "elyza:jp8b")  # 使用するモデルを指定
    
    def _load_config(self, config_file_path):
        with open(config_file_path, 'r') as file:
            config = json.load(file)
        return config
    
    def correct_text(self, text):
        url = "http://localhost:11434/api/generate"  # OllamaのAPIエンドポイント
        headers = {
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "prompt": f"以下の文を正しい日本語に構成してください:\n{text}"
        }
        response = requests.post(url, headers=headers, json=payload)
        
        # レスポンスをテキストで表示して確認
        print(f"API Response: {response.text}")

        # レスポンスがJSONとして正しいか確認
        try:
            corrected_text = response.json()["text"]
            return corrected_text.strip()
        except json.JSONDecodeError as e:
            print(f"JSONDecodeError: {e}")
            return None

実行結果は

[2024-09-14 00:50:32.751] [ctranslate2] [thread 306834] [warning] The compute type inferred from the saved model is float16, but the target device or backend do not support efficient float16 computation. The model weights have been automatically converted to use the float32 compute type instead.
stand by ready OK
recording...
finished
API Response: {"model":"elyza:jp8b","created_at":"2024-09-13T15:50:43.828714Z","response":"独","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:43.915193Z","response":"り","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:43.999869Z","response":"言","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.084866Z","response":"のような","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.170081Z","response":"短","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.254747Z","response":"い","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.341826Z","response":"文章","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.428313Z","response":"ですが","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.513551Z","response":"、","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.599198Z","response":"問題","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.6867Z","response":"ない","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.775178Z","response":"です","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.864271Z","response":"。","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:44.951287Z","response":"正","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.037784Z","response":"しい","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.123048Z","response":"日本","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.21019Z","response":"語","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.297796Z","response":"に","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.384251Z","response":"構","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.471506Z","response":"成","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.56044Z","response":"した","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.64597Z","response":"文","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.732028Z","response":"は","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.821103Z","response":"次","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.908953Z","response":"の","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:45.997249Z","response":"通り","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.088031Z","response":"です","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.183558Z","response":"。\n\n","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.277991Z","response":"「","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.370159Z","response":"天","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.46265Z","response":"気","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.563932Z","response":"」","done":false}
{"model":"elyza:jp8b","created_at":"2024-09-13T15:50:46.664274Z","response":"","done":true,"done_reason":"stop","context":[128006,882,128007,271,88852,16144,17161,30512,37656,102800,102433,102158,20230,106391,13153,39926,72315,512,36827,95221,128009,128006,78191,128007,271,106063,31431,78244,120950,106649,16995,83125,119627,5486,109606,100604,38641,1811,37656,102800,102433,102158,20230,106391,13153,56051,17161,15682,33671,16144,121640,38641,3490,13177,36827,95221,10646],"total_duration":6901465958,"load_duration":40178541,"prompt_eval_count":26,"prompt_eval_duration":4023220000,"eval_count":33,"eval_duration":2836184000}

JSONDecodeError: Extra data: line 2 column 1 (char 94)
None
stand by ready OK
recording...
finished
10秒間音声が入力されなかったため、ループを終了します。
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/linebot/main.py", line 55, in <module>
    main()
  File "/Users/snowpool/aw10s/linebot/main.py", line 45, in main
    message = "\n".join(recognized_texts)
TypeError: sequence item 0: expected str instance, NoneType found

となる

このエラーは、2つの問題が原因です。
1. Ollama APIのレスポンスが複数行にわたっていることが原因:
response.json()が一つのJSONオブジェクトを期待しているのに対して、
複数行のJSONレスポンスが返されています。

これは、Ollamaが複数の部分に分けてレスポンスを返しているためです。
2. NoneTypeがrecognized_textsに含まれていることが原因:
correct_text関数でNoneが返され、
recognized_textsリストに追加されているため、TypeErrorが発生しています。

1. Ollamaのレスポンスを段階的に収集する:
複数のJSONオブジェクトが連続して返されている場合、手動でそれを収集し、
一つの完全なテキストに結合する処理を追加します。

2. Noneの扱いを改善する: Noneがリストに追加されないように修正します。

なので

ollama_text_correction.py

を修正

Ollamaのレスポンスを部分的に受け取り、テキストを結合するようにする

# ollama_text_correction.py
import requests
import json

class OllamaTextCorrector:
    def __init__(self, config_file_path):
        self.config = self._load_config(config_file_path)
        self.model = self.config.get("ollama_model", "elyza:jp8b")  # 使用するモデルを指定
    
    def _load_config(self, config_file_path):
        with open(config_file_path, 'r') as file:
            config = json.load(file)
        return config
    
    def correct_text(self, text):
        url = "http://localhost:11434/api/generate"  # OllamaのAPIエンドポイント
        headers = {
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model,
            "prompt": f"以下の文を正しい日本語に構成してください:\n{text}"
        }
        response = requests.post(url, headers=headers, json=payload)

        # レスポンスをテキストで確認して、複数の部分を結合
        full_response = ""
        for line in response.text.splitlines():
            try:
                json_line = json.loads(line)  # 各行をJSONとして処理
                if "response" in json_line:
                    full_response += json_line["response"]  # テキスト部分を結合
            except json.JSONDecodeError as e:
                print(f"JSONDecodeError: {e}")

        return full_response.strip() if full_response else None

そしてmain.py
Noneがリストに追加されないように修正

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート
from ollama_text_correction import OllamaTextCorrector  # Ollamaによる修正モジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(config.jsonからトークンを読み込む)
    line_notify = LineNotify("config.json")
    
    # Ollamaのテキスト修正モジュールを初期化
    text_corrector = OllamaTextCorrector("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が10秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 10:
            print("10秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        
        # Ollamaでテキストを構成
        corrected_text = text_corrector.correct_text(text)
        
        if corrected_text:  # Noneが返された場合はスキップ
            recognized_texts.append(corrected_text)
            print(corrected_text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    if recognized_texts:
        message = "\n".join(recognized_texts)
        print("\n入力された音声テキスト一覧:")
        print(message)

        # LINE Notifyでメッセージを送信
        line_notify.send(f"入力された音声テキスト一覧:\n{message}")
    else:
        print("入力メッセージはありませんでした")

if __name__ == "__main__":
    main()

これで実行し
音声を
まともに動きますか
と入力すると

入力された音声テキスト一覧:
「まともに動きますか」は、少々不自然な表現です。
より自然で適切な表現は、「正常に動きますか」「問題なく動きますか」などになります。

「まとも」は通常、「正当・真正」という意味合いで用いられ、
物事の本来あるべき姿や道理に反しないことを示します。
例えば、「彼はまともな理由で解雇されたわけではなかった」のように使います。

一方、「動く」は「正常に機能する・問題なく作動する」という意味合いで用いられます。
ですから、文中で「まとも」を用いる必要性が低く、
より適切な表現を選ぶと自然な日本語になります。

というようになる

意図したのは
「正常に動きますか」「問題なく動きますか
というように変換してほしい

なのでプロンプトを変更する

Whisper の結果をLINEで送る

Whisper の結果をLINEで送る

実行環境
M1 MacbookAir 16GB

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time

def main():

    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()
    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()
                # 処理が10秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("10秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("5秒以上の無音状態が続いたため、ループを終了します。")
            break  # 無音5秒以上でループを抜ける
        text = fasterWhispermodel.audio2text(audio_data)
        print(text)

if __name__ == "__main__":
    main()

としたら停止するが実際には10秒以上かかったので
これを五秒程度にしたらうまく動く

次は line notify でこの文章を送信してみる

しかし、その前にこのtext部分をリストなどで格納してから送信する必要がある

まずここまでのソースを移動しておく

cp -rp module ~/aw10s/linebot
cp -rp configs/ ~/aw10s/linebot 
cp main.py ~/aw10s/linebot
で
cd ~/aw10s/linebot
python main.py

を実行したらエラー

Config.iniがないと出る
これは

cp -rp configs ~/aw10s/linebot

とすることで解決

これでここまで行った音声の自動終了はできたので
入力をリストなどにまとめるようにする

これは以前音声ファイルを出力した時の応用でできるはず

ループを抜けた時に入力した音声を改行付きで一覧表示したい

入力した音声をループを抜けたときに改行付きで一覧表示するには、音声認識されたテキストをリストに格納し、ループを抜けた後にそのリスト内のテキストを改行付きで表示する

recognized_textsリストの作成
* recognized_textsというリストを作成し、各ループで音声認識されたテキストをそのリストに追加
recognized_texts.append(text)
* 各ループで変換されたテキストをrecognized_textsリストに追加
ループ終了後の一覧表示
* ループを抜けた後に、print(“\n”.join(recognized_texts))を使って、リスト内のテキストを改行付きで表示

をmain.pyに追加する

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time

def main():

    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()
    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()
                # 処理が10秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("10秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("5秒以上の無音状態が続いたため、ループを終了します。")
            break  # 無音5秒以上でループを抜ける
        text = fasterWhispermodel.audio2text(audio_data)
        print(text)

if __name__ == "__main__":
    main()
を

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()
        
        # 処理が2秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("2秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        recognized_texts.append(text)  # テキストをリストに追加
        print(text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    print("\n入力された音声テキスト一覧:")
    print("\n".join(recognized_texts))

if __name__ == "__main__":
    main()

に変更し実行

[2024-09-06 06:03:52.900] [ctranslate2] [thread 17609052] [warning] The compute type inferred from the saved model is float16, but the target device or backend do not support efficient float16 computation. The model weights have been automatically converted to use the float32 compute type instead.
stand by ready OK
recording...
finished
今日のハンバーグは何にしようか
stand by ready OK
recording...
finished
和風ソースか
stand by ready OK
recording...
finished
オニオンソース
stand by ready OK
recording...
finished
デミグラスソース
stand by ready OK
recording...
finished
出しようかな
stand by ready OK
recording...
finished
今日の買い物リスト
stand by ready OK
recording...
finished
2秒間音声が入力されなかったため、ループを終了します。

入力された音声テキスト一覧:
今日のハンバーグは何にしようか
和風ソースか
オニオンソース
デミグラスソース
出しようかな
今日の買い物リスト

というように一覧表示ができた

次はこれを line notify で送信してみる

これは以前作成した在庫管理送信の機能から使う
チラシの方だとOCR処理が入っているのでより複雑だった

今回の場合、画像は不要で
Whisperで文字起こしした音声を送信するのが目的

 cp ../store_adversting_list/config.json .


LINEのAPIキーが格納されているファイルをコピー

今回はconfig.jsonの中はtoken 以外は削除しておく

{
  "token": "取得したアクセストークン"
}

次に

touch ilne_notify.py

でline notify の処理のモジュールを作成

# line_notify.py
import json
import requests

class LineNotify:
    def __init__(self, token_file_path):
        self.token = self._load_token(token_file_path)

    # JSONファイルからアクセストークンを読み込む
    def _load_token(self, token_file_path):
        with open(token_file_path, 'r') as file:
            data = json.load(file)
        return data["token"]

    # LINE Notifyでメッセージを送信
    def send(self, message):
        url = "https://notify-api.line.me/api/notify"
        headers = {
            "Authorization": f"Bearer {self.token}"
        }
        data = {
            "message": message
        }
        response = requests.post(url, headers=headers, data=data)
        if response.status_code != 200:
            raise Exception(f"Error sending message: {response.status_code}, {response.text}")
        return response

として保存

Main.pyを

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()
        
        # 処理が2秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("2秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        recognized_texts.append(text)  # テキストをリストに追加
        print(text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    print("\n入力された音声テキスト一覧:")
    print("\n".join(recognized_texts))

if __name__ == "__main__":
    main()

から

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(アクセストークンをJSONファイルから読み込む)
    line_notify = LineNotify("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が2秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("2秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        recognized_texts.append(text)  # テキストをリストに追加
        print(text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    print("\n入力された音声テキスト一覧:")
    message = "\n".join(recognized_texts)
    print(message)

    # LINE Notifyでメッセージを送信
    line_notify.send(f"入力された音声テキスト一覧:\n{message}")

if __name__ == "__main__":
    main()

へ変更

これで実行すると音声で入力したものがline notify で送信される

しかし、これだと音声入力がなくても送信されるため、
もし入力リストが空ならログ表示だけにする

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time
from line_notify import LineNotify  # 作成したLineNotifyモジュールをインポート

def main():
    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()

    # 入力された音声テキストを格納するリスト
    recognized_texts = []

    # LINE Notifyのモジュールを初期化(config.jsonからトークンを読み込む)
    line_notify = LineNotify("config.json")

    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()

        # 処理が2秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("2秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("無音状態が続いたため、ループを終了します。")
            break  # 無音でループを抜ける
        
        # 音声をテキストに変換
        text = fasterWhispermodel.audio2text(audio_data)
        recognized_texts.append(text)  # テキストをリストに追加
        print(text)

    # ループ終了後に、入力した音声テキストを改行付きで一覧表示
    if recognized_texts:
        message = "\n".join(recognized_texts)
        print("\n入力された音声テキスト一覧:")
        print(message)

        # LINE Notifyでメッセージを送信
        line_notify.send(f"入力された音声テキスト一覧:\n{message}")
    else:
        print("入力メッセージはありませんでした")

if __name__ == "__main__":
    main()

とすることで入力がない場合には
Line notifyで送信しなくなる

停止処理を5秒たったら録音を終了させる

停止処理を5秒たったら終了させる

import numpy as np
import sounddevice as sd
import os
import configparser
import errno

class Recorderconfig:
    def __init__(self, config_ini_path='./configs/config.ini'):
        # iniファイルの読み込み
        self.config_ini = configparser.ConfigParser()
        
        # 指定したiniファイルが存在しない場合、エラー発生
        if not os.path.exists(config_ini_path):
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_ini_path)
        
        self.config_ini.read(config_ini_path, encoding='utf-8')
        Recorder_items = self.config_ini.items('Recorder')
        self.Recorder_config_dict = dict(Recorder_items)

class Recorder:
    def __init__(self, config_ini_path='./configs/config.ini'):
        Recorder_config = Recorderconfig(config_ini_path=config_ini_path)
        config_dict = Recorder_config.Recorder_config_dict
        
        self.fs = int(config_dict["fs"])
        self.silence_threshold = float(config_dict["silence_threshold"])
        self.min_duration = float(config_dict["min_duration"])
        self.amplitude_threshold = float(config_dict["amplitude_threshold"])
        self.start_threshold = float(config_dict["start_threshold"])

    def speech2audio(self):
        record_Flag = False
        non_recorded_data = []
        recorded_audio = []
        silent_time = 0
        input_time = 0
        start_threshold = 0.3
        all_time = 0
        
        with sd.InputStream(samplerate=self.fs, channels=1) as stream:
            while True:
                data, overflowed = stream.read(int(self.fs * self.min_duration))
                all_time += 1
                if all_time == 10:
                    print("stand by ready OK")
                elif all_time >= 10:
                    if np.max(np.abs(data)) > self.amplitude_threshold and not record_Flag:
                        input_time += self.min_duration
                        if input_time >= start_threshold:
                            record_Flag = True
                            print("recording...")
                            recorded_audio = non_recorded_data[int(-1*start_threshold*10)-2:]

                    else:
                        input_time = 0

                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    if record_Flag:
                        recorded_audio.append(data)

                    else:
                        non_recorded_data.append(data)

                    # 無音が10秒以上続いたらループを抜ける
                    if np.all(np.abs(data) < self.amplitude_threshold):
                        silent_time += self.min_duration
                        if silent_time >= 10:  # 無音が10秒続いたらNoneを返す
                            print("finished")
                            record_Flag = False
                            return None
                    else:
                        silent_time = 0

        # 録音データが存在しない場合はNoneを返す
        if len(recorded_audio) == 0:
            return None

        audio_data = np.concatenate(recorded_audio, axis=0)
        return audio_data

として

from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder

def main():
    recorder = Recorder()  # Recorderのインスタンス作成
    fasterWhispermodel = FasterWhisperModel()  # FasterWhisperModelのインスタンス作成

    while True:
        audio_data = recorder.speech2audio()  # 音声の取得
        if audio_data is None:
            print("5秒以上の無音状態が続いたため、ループを終了します。")
            break  # 無音5秒以上でループを抜ける

        text = fasterWhispermodel.audio2text(audio_data)  # 音声をテキストに変換
        print(text)  # テキストを表示

if __name__ == "__main__":
    main()

としたが

[2024-09-05 06:19:29.705] [ctranslate2] [thread 17518403] [warning] The compute type inferred from the saved model is float16, but the target device or backend do not support efficient float16 computation. The model weights have been automatically converted to use the float32 compute type instead.
stand by ready OK
recording...
finished
5秒以上の無音状態が続いたため、ループを終了します。

となるが今度は入力ができなくなった

単純に

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder
import time

def main():

    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()
    while True:
        start_time = time.time()  # 処理開始時刻を記録
        audio_data = recorder.speech2audio()
                # 処理が10秒間行われなかった場合はループを抜ける
        if time.time() - start_time >= 5:
            print("10秒間音声が入力されなかったため、ループを終了します。")
            break

        if audio_data is None:
            print("5秒以上の無音状態が続いたため、ループを終了します。")
            break  # 無音5秒以上でループを抜ける
        text = fasterWhispermodel.audio2text(audio_data)
        print(text)

if __name__ == "__main__":
    main()

としたら停止するが実際には10秒以上かかっている

これを五秒程度にしたらうまく動く

次は line notify でこの文章を送信してみる

Faster-whisperマイクのコード解説

Faster-whisperマイクのコード解説

コードを読むのにChatGPTを使用
これによりコードの解説が得られることで理解しやすくなる

git clone https://github.com/personabb/colab_AI_sample.git

でリポジトリクローン

cd colab_AI_sample 
cd colab_fasterwhisper_sample

そして
main.pyの中の

    while True:
        audio_data = recorder.speech2audio()
        text = fasterWhispermodel.audio2text(audio_data)
        print(text)

これで
recorder.speech2audio()メソッドで音声を録音して、録音データをaudio_dataに格納し、fasterWhispermodel.audio2text()メソッドにて、音声ファイルを文字起こしして、textとして表示

つまり
printではなく
文字を送信するメソッドを作成すれば処理は完成するはず

module_recoder.py
発話の録音用モジュール

import numpy as np
import sounddevice as sd

import os
import configparser
# ファイルの存在チェック用モジュール
import errno

class Recorderconfig:
    def __init__(self, config_ini_path = './configs/config.ini'):
        # iniファイルの読み込み
        self.config_ini = configparser.ConfigParser()
        
        # 指定したiniファイルが存在しない場合、エラー発生
        if not os.path.exists(config_ini_path):
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_ini_path)
        
        self.config_ini.read(config_ini_path, encoding='utf-8')
        Recorder_items = self.config_ini.items('Recorder')
        self.Recorder_config_dict = dict(Recorder_items)

class Recorder:
    def __init__(self, config_ini_path = './configs/config.ini'):
            
            Recorder_config = Recorderconfig(config_ini_path = config_ini_path)
            config_dict = Recorder_config.Recorder_config_dict
            
            self.fs = int(config_dict["fs"])
            self.silence_threshold = float(config_dict["silence_threshold"])
            self.min_duration = float(config_dict["min_duration"])
            self.amplitude_threshold = float(config_dict["amplitude_threshold"])
            self.start_threshold = float(config_dict["start_threshold"])

    def speech2audio(self):
        record_Flag = False

        non_recorded_data = []
        recorded_audio = []
        silent_time = 0
        input_time = 0
        start_threshold = 0.3
        all_time = 0
        
        with sd.InputStream(samplerate=self.fs, channels=1) as stream:
            while True:
                data, overflowed = stream.read(int(self.fs * self.min_duration))
                all_time += 1
                if all_time == 10:
                    print("stand by ready OK")
                elif all_time >=10:
                    if np.max(np.abs(data) > self.amplitude_threshold) and not record_Flag:
                        input_time += self.min_duration
                        if input_time >= start_threshold:
                            record_Flag = True
                            print("recording...")
                            recorded_audio=non_recorded_data[int(-1*start_threshold*10)-2:]  

                    else:
                        input_time = 0

                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    if record_Flag:
                        recorded_audio.append(data)

                    else:
                        non_recorded_data.append(data)

                    if np.all(np.abs(data) < self.amplitude_threshold):
                        silent_time += self.min_duration
                        if (silent_time >= self.silence_threshold) and record_Flag:
                            print("finished")
                            record_Flag = False
                            break
                    else:
                        silent_time = 0

        audio_data = np.concatenate(recorded_audio, axis=0)

        return audio_data

が全文

class Recorderconfig:
    def __init__(self, config_ini_path = './configs/config.ini'):
        # iniファイルの読み込み
        self.config_ini = configparser.ConfigParser()
        
        # 指定したiniファイルが存在しない場合、エラー発生
        if not os.path.exists(config_ini_path):
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_ini_path)
        
        self.config_ini.read(config_ini_path, encoding='utf-8')
        Recorder_items = self.config_ini.items('Recorder')
        self.Recorder_config_dict = dict(Recorder_items)

この中で

config_ini_path = './configs/config.ini'

で指定されている設定ファイルをRecorder_config_dictとして読み込む

辞書型で読み込んでいるため、設定ファイルの中身をpythonの辞書として読み込むことが可能

self.Recorder_config_dict = dict(Recorder_items)

読み込んでいるのは

[Recorder]

fs=16000
silence_threshold=0.5
min_duration=0.1
amplitude_threshold=0.05
start_threshold = 0.3

の部分
それが

Recorder_items = self.config_ini.items('Recorder')

次に
Recorderクラスのinitメソッド

class Recorder:
    def __init__(self, config_ini_path = './configs/config.ini'):
            
            Recorder_config = Recorderconfig(config_ini_path = config_ini_path)
            config_dict = Recorder_config.Recorder_config_dict
            
            self.fs = int(config_dict["fs"])
            self.silence_threshold = float(config_dict["silence_threshold"])
            self.min_duration = float(config_dict["min_duration"])
            self.amplitude_threshold = float(config_dict["amplitude_threshold"])
            self.start_threshold = float(config_dict["start_threshold"])

まず、設定ファイルの内容をconfig_dictに格納
これは辞書型のため、config_dict[“device”]のような形で設定ファイルの内容を文字列として取得
すべての文字を文字列として取得するため、int型やbool型にしたい場合は、適宜型変更をする必要があることに注意

self.fs = int(config_dict["fs"]):

ではサンプリングレート fsの設定

fs=16000

がその対象
この時文字列として読み込んでいるので
Int でキャストしている

つまり

            self.silence_threshold = float(config_dict["silence_threshold"])
            self.min_duration = float(config_dict["min_duration"])
            self.amplitude_threshold = float(config_dict["amplitude_threshold"])
            self.start_threshold = float(config_dict["start_threshold"])

の部分で

config_dict[]

で設定値を読み込んで
それぞれfloat int などにキャストして値を設定している

続いて、設定ファイルから各種値をインスタンス変数に格納する

Recorderクラスのspeech2audioメソッド

class Recorder:
   ・・・
    def speech2audio(self):
        record_Flag = False

        non_recorded_data = []
        recorded_audio = []
        silent_time = 0
        input_time = 0
        start_threshold = 0.3
        all_time = 0
        
        with sd.InputStream(samplerate=self.fs, channels=1) as stream:
            while True:
                data, overflowed = stream.read(int(self.fs * self.min_duration))
                all_time += 1
                if all_time == 10:
                    print("stand by ready OK")
                elif all_time >=10:
                    if np.max(np.abs(data) > self.amplitude_threshold) and not record_Flag:
                        input_time += self.min_duration
                        if input_time >= start_threshold:
                            record_Flag = True
                            print("recording...")
                            recorded_audio=non_recorded_data[int(-1*start_threshold*10)-2:]  

                    else:
                        input_time = 0

                    if overflowed:
                        print("Overflow occurred. Some samples might have been lost.")
                    if record_Flag:
                        recorded_audio.append(data)

                    else:
                        non_recorded_data.append(data)

                    if np.all(np.abs(data) < self.amplitude_threshold):
                        silent_time += self.min_duration
                        if (silent_time >= self.silence_threshold) and record_Flag:
                            print("finished")
                            record_Flag = False
                            break
                    else:
                        silent_time = 0

        audio_data = np.concatenate(recorded_audio, axis=0)

        return audio_data

上記の部分では,ユーザの発話をマイクを通して取得

「stand by ready OK」と表示されてから,マイクに入力された音声の大きさが閾値(self.amplitude_threshold)以上だった場合に,録音が開始
録音が開始されたら、「recording…」と表示

その後、マイクに入力された音声の大きさが閾値以下になって0.5秒(self.silence_threshold)経過したら録音が停止する

録音が停止したら、「finished」と表示

初期設定:
* record_Flag : 録音中かどうかを示すフラグ。初期値は False(録音していない)。
* non_recorded_data : 録音を開始する前に収集したデータを保持するリスト。
* recorded_audio : 録音中の音声データを保持するリスト。
* silent_time : 無音が続いた時間をカウントする変数。
* input_time : 入力が続いた時間をカウントする変数。
* start_threshold : 録音を開始するためのしきい値。
* all_time : 全体の時間をカウントする変数

with sd.InputStream(samplerate=self.fs, channels=1) as stream:: 

サンプリングレート self.fs とチャンネル数 1(モノラル)で音声入力ストリームを開始

            while True:

でループで音声を取得する

データの読み取り: data, overflowed = stream.read(int(self.fs * self.min_duration))
* self.min_duration に基づいて、一定時間分の音声データを読み取ります。
* overflowed はバッファがオーバーフローしたかどうかを示します。
スタンバイメッセージの表示:
* 最初の10回のループで “stand by ready OK” を表示します(初期化期間を意味するようです)。

                data, overflowed = stream.read(int(self.fs * self.min_duration))
                all_time += 1
                if all_time == 10:
                    print("stand by ready OK")

の部分

if np.max(np.abs(data) > self.amplitude_threshold) and not record_Flag:
    input_time += self.min_duration
    if input_time >= start_threshold:
        record_Flag = True
        print("recording...")
        recorded_audio = non_recorded_data[int(-1 * start_threshold * 10) - 2:]
else:
    input_time = 0

では

np.max(np.abs(data) > self.amplitude_threshold):

* これは、現在の音声データの振幅が self.amplitude_threshold(振幅のしきい値)を超えているかどうかをチェックします。np.abs(data) は音声データの絶対値(振幅)を計算し、np.max() はその最大値を取得します。
not record_Flag:
* record_Flag が False の場合、つまりまだ録音が開始されていない場合にこの条件が真になります。

次に入力時間の追加

input_time += self.min_duration

上記の条件が真の場合、input_time に self.min_duration を加算します。これにより、音声がしきい値を超えた時間がカウントされます。

if input_time >= start_threshold:

input_time が start_threshold を超えた場合、録音を開始する条件が満たされたとみなされます。start_threshold は録音開始のためのしきい値です。

録音の開始

record_Flag = True
print("recording...")
recorded_audio = non_recorded_data[int(-1 * start_threshold * 10) - 2:]

record_Flag = True に設定することで、録音が開始されます。

recorded_audio = non_recorded_data[int(-1 * start_threshold * 10) - 2:]:

* 録音の開始時に、直前の音声データを recorded_audio に追加します。これは、しきい値を超える前の短い期間の音声も記録するためです。int(-1 * start_threshold * 10) – 2 は、start_threshold 秒間のデータを遡って取得するインデックスを計算しています。-2 の部分は余裕を持たせるために追加されている可能性があります

module_whisper.py
音声の文字起こし用のモジュール

from faster_whisper import WhisperModel
import numpy as np
import torch

import os
import configparser
# ファイルの存在チェック用モジュール
import errno

class FasterWhisperconfig:
    def __init__(self, config_ini_path = './configs/config.ini'):
        # iniファイルの読み込み
        self.config_ini = configparser.ConfigParser()
        
        # 指定したiniファイルが存在しない場合、エラー発生
        if not os.path.exists(config_ini_path):
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_ini_path)
        
        self.config_ini.read(config_ini_path, encoding='utf-8')
        FasterWhisper_items = self.config_ini.items('FasterWhisper')
        self.FasterWhisper_config_dict = dict(FasterWhisper_items)

class FasterWhisperModel:
    def __init__(self,device = None, config_ini_path = './configs/config.ini'):
        FasterWhisper_config = FasterWhisperconfig(config_ini_path = config_ini_path)
        config_dict = FasterWhisper_config.FasterWhisper_config_dict

        if device is not None:
            self.DEVICE = device
        else:
            device = config_dict["device"]

            self.DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
            if device != "auto":
                self.DEVICE = device
            
        self.BEAM_SIZE = int(config_dict["gpu_beam_size"]) if self.DEVICE == "cuda" else int(config_dict["cpu_beam_size"])
        self.language = config_dict["language"]
        self.COMPUTE_TYPE = config_dict["gpu_compute_type"] if self.DEVICE == "cuda" else config_dict["cpu_compute_type"]
        self.MODEL_TYPE = config_dict["gpu_model_type"] if self.DEVICE == "cuda" else config_dict["cpu_model_type"]
        self.kotoba_chunk_length = int(config_dict["chunk_length"])
        self.kotoba_condition_on_previous_text = config_dict["condition_on_previous_text"]
        if self.kotoba_condition_on_previous_text == "True":
            self.kotoba_condition_on_previous_text = True
        else:
            self.kotoba_condition_on_previous_text = False

        if config_dict["use_kotoba"] == "True":
            self.use_kotoba = True
        else:
            self.use_kotoba = False

        if not self.use_kotoba:
            self.model = WhisperModel(self.MODEL_TYPE, device=self.DEVICE, compute_type=self.COMPUTE_TYPE)
        else:
            self.MODEL_TYPE = config_dict["kotoba_model_type"]
            #self.model = WhisperModel(self.MODEL_TYPE, device=self.DEVICE, compute_type=self.cotoba_compute_type)
            self.model = WhisperModel(self.MODEL_TYPE)


    def audio2text(self, data):
        result = ""
        data = data.flatten().astype(np.float32)
        if not self.use_kotoba:
            segments, _ = self.model.transcribe(data, beam_size=self.BEAM_SIZE,language=self.language)
        else:
            segments, _ = self.model.transcribe(data, beam_size=self.BEAM_SIZE,language=self.language, chunk_length=self.kotoba_chunk_length, condition_on_previous_text=self.kotoba_condition_on_previous_text)
        
        for segment in segments:
            result += segment.text
        
        return result
            
    def audioFile2text(self, file_path):
        result = ""
        if not self.use_kotoba:
            segments, _ = self.model.transcribe(file_path, beam_size=self.BEAM_SIZE,language=self.language)
        else:
            segments, _ = self.model.transcribe(file_path, beam_size=self.BEAM_SIZE,language=self.language, chunk_length=self.kotoba_chunk_length, condition_on_previous_text=self.kotoba_condition_on_previous_text)
        
        for segment in segments:
            result += segment.text

        return result
FasterWhisperconfigクラス

class FasterWhisperconfig:
    def __init__(self, config_ini_path = './configs/config.ini'):
        # iniファイルの読み込み
        self.config_ini = configparser.ConfigParser()
        
        # 指定したiniファイルが存在しない場合、エラー発生
        if not os.path.exists(config_ini_path):
            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_ini_path)
        
        self.config_ini.read(config_ini_path, encoding='utf-8')
        FasterWhisper_items = self.config_ini.items('FasterWhisper')
        self.FasterWhisper_config_dict = dict(FasterWhisper_items)

ここではconfig_ini_path = ‘./configs/config.ini’で指定されている設定ファイルをFasterWhisper_config_dictとして読み込んでいます。
辞書型で読み込んでいるため、設定ファイルの中身をpythonの辞書として読み込むことが可能

これにより

[FasterWhisper]
device = auto
language = ja

gpu_model_type = large-v3
gpu_beam_size = 1
gpu_compute_type = float16

cpu_model_type = small
cpu_beam_size = 1
cpu_compute_type = int8

use_kotoba = True
kotoba_model_type = kotoba-tech/kotoba-whisper-v1.0-faster
chunk_length = 15
condition_on_previous_text = False

の部分を読み込んで辞書にしている

FasterWhisperModelクラスのinitメソッド

class FasterWhisperModel:
    def __init__(self,device = None, config_ini_path = './configs/config.ini'):
        FasterWhisper_config = FasterWhisperconfig(config_ini_path = config_ini_path)
        config_dict = FasterWhisper_config.FasterWhisper_config_dict

        if device is not None:
            self.DEVICE = device
        else:
            device = config_dict["device"]

            self.DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
            if device != "auto":
                self.DEVICE = device
            
        self.BEAM_SIZE = int(config_dict["gpu_beam_size"]) if self.DEVICE == "cuda" else int(config_dict["cpu_beam_size"])
        self.language = config_dict["language"]
        self.COMPUTE_TYPE = config_dict["gpu_compute_type"] if self.DEVICE == "cuda" else config_dict["cpu_compute_type"]
        self.MODEL_TYPE = config_dict["gpu_model_type"] if self.DEVICE == "cuda" else config_dict["cpu_model_type"]
        self.kotoba_chunk_length = int(config_dict["chunk_length"])
        self.kotoba_condition_on_previous_text = config_dict["condition_on_previous_text"]
        if self.kotoba_condition_on_previous_text == "True":
            self.kotoba_condition_on_previous_text = True
        else:
            self.kotoba_condition_on_previous_text = False

        if config_dict["use_kotoba"] == "True":
            self.use_kotoba = True
        else:
            self.use_kotoba = False

        if not self.use_kotoba:
            self.model = WhisperModel(self.MODEL_TYPE, device=self.DEVICE, compute_type=self.COMPUTE_TYPE)
        else:
            self.MODEL_TYPE = config_dict["kotoba_model_type"]
            #self.model = WhisperModel(self.MODEL_TYPE, device=self.DEVICE, compute_type=self.cotoba_compute_type)
            self.model = WhisperModel(self.MODEL_TYPE)

これは

        FasterWhisper_config = FasterWhisperconfig(config_ini_path = config_ini_path)
        config_dict = FasterWhisper_config.FasterWhisper_config_dict


まず、設定ファイルの内容をconfig_dictに格納しています。これは辞書型のため、config_dict[“device”]のような形で設定ファイルの内容を文字列として取得することができます。
あくまで、すべての文字を文字列として取得するため、int型やbool型にしたい場合は、適宜型変更をする必要があることに注意

続いて下記の順番で処理を行います。
* モデルを動作させるdeviceを指定する
* 設定ファイルの各種設定を取得する
* モデルを定義する。
* 設定ファイルに合わせて、適切なモデルを定義する

FasterWhisperModelクラスのaudioFile2textメソッド

class FasterWhisperModel:
    ・・・
    def audio2text(self, data):
        result = ""
        data = data.flatten().astype(np.float32)
        if not self.use_kotoba:
            segments, _ = self.model.transcribe(data, beam_size=self.BEAM_SIZE,language=self.language)
        else:
            segments, _ = self.model.transcribe(data, beam_size=self.BEAM_SIZE,language=self.language, chunk_length=self.kotoba_chunk_length, condition_on_previous_text=self.kotoba_condition_on_previous_text)
        
        for segment in segments:
            result += segment.text
        
        return result

faster-whisperモデルのtranscribeメソッドを呼び出して、音声認識をしています。
faster-whisperモデルのtranscribeメソッドは、ファイル名を引数にした場合は、そのファイルを読み込んで処理を行い。numpyデータやbinaryデータを引数にした場合は、そのデータをそのまま利用してくれます

設定ファイルで指定したモデルに合わせて、適切な引数を使っています。
faster-whisperは30秒以上の音声に関しては、音声を分割して処理をするため、分割されて生成されたテキストをresult変数に格納して、return

data = data.flatten().astype(np.float32)

音声データをfaster-whisperモデルに入力するために、データを整形

roboflow Workspaceでアノテーション

roboflow Workspaceでアノテーション

https://axross-recipe.com/recipes/1469
を参考に行う

https://blog.roboflow.com/train-yolov8-obb-model/
のチュートリアルも参考にする

https://blog.roboflow.com
へアクセスし
GoogleIDなどで、Sign InすればOK

その場合
Continue with Google をクリック

ユーザ名を入力すると
プランが2つ出る
Free の無料か
Starter traialの月額249$
のどっちかになるので

とりあえずはFree にする

これでCreate workspace をクリック

次に
Invite teammates.
Add collaborators to help with labeling, upload data, train models, and more.
チームメイトを招待します。
ラベル付け、データのアップロード、モデルのトレーニングなどを支援するコラボレーターを追加します。

とあるけど
とりあえずSkipでOK

これでプロジェクトの作成画面になるが
デフォルトだとライセンスが
CC BY 4.0
になっている

Project Name は
anotation_test
とした

Annotation Group
注釈グループ
が識別のため必要らしい

とりあえずバスクリンなので
Baskulin

Project Typeには
Object Detection
を選択

これで
Create Project をクリック

次に画像ファイル
もしくは画像フォルダを選択する

ここでバスクリンを撮影したフォルダを指定するが
せっかくなので
画像の水増しをする

まず
Pixcel 8 で撮影したバスクリンの画像を
Google Photo からダウンロード

4枚の写真になっているので
これを水増しする

なお複数の写真をダウンロードすると
圧縮ファイルになっているのでこれを解凍する

vim generate_images.py

で内容を

import sys
import os
from PIL import Image

# コマンドライン引数から画像ファイル名を取得
if len(sys.argv) != 2:
    print("Usage: python generate_images.py imagefile.png")
    sys.exit(1)

image_file = sys.argv[1]

# 画像を読み込む
try:
    image = Image.open(image_file)
except IOError:
    print(f"Could not open the image file {image_file}")
    sys.exit(1)

# ファイル名と拡張子を分離し、ディレクトリ名を決定
file_name, file_extension = os.path.splitext(os.path.basename(image_file))
directory_name = file_name

# ディレクトリが存在しない場合は作成
if not os.path.exists(directory_name):
    os.makedirs(directory_name)

# 画像の変形と保存を行う関数
def save_images(image, prefix, transform, count=100):
    for i in range(count):
        filename = f'{prefix}{i+1:03}{file_extension}'
        filepath = os.path.join(directory_name, filename)
        transformed_image = image.transpose(transform)
        transformed_image.save(filepath)

# 各変換を適用して画像を保存
save_images(image, f'{file_name}_', Image.FLIP_TOP_BOTTOM, 100)
save_images(image, f'{file_name}_rot90_', Image.ROTATE_90, 100)
save_images(image, f'{file_name}_rot270_', Image.ROTATE_270, 100)

という画像を回転させてコピーするスクリプトを作成

次に写真をコピーしておく

cp ~/Downloads/Photos-001\ \(1\)/PXL_20240504_1732* .

ファイル名を変えた方が楽なので
ターミナルで

i=1
for file in PXL_20240504_173232354.jpg PXL_20240504_173242547.jpg PXL_20240504_173237123.jpg PXL_20240504_173253414.jpg; do
  mv "$file" "basclin$i.jpg"
  i=$((i + 1))
done

を実行

すると

basclin1.jpg
basclin2.jpg
basclin3.jpg
basclin4.jpg
というようにファイル名が変わる

よくみたらスペルミスなので

i=1
for file in basclin*.jpg; do
  mv "$file" "Baskulin$i.jpg"
  i=$((i + 1))
done

で修正

これでファイル名が
Baskulin1.jpg
Baskulin2.jpg
Baskulin3.jpg
Baskulin4.jpg
となったので

python generate_images.py Baskulin1.jpg
python generate_images.py Baskulin2.jpg
python generate_images.py Baskulin3.jpg
python generate_images.py Baskulin4.jpg

でファイルを量産

これで各ファイルごとのフォルダができたので
これを1つのフォルダにまとめる

ls -d */ | grep Bas


Basと書かれたディレクトリのみ表示できるので

move_files.sh

というスクリプトを作成

#!/bin/bash

# 移動先のディレクトリを作成(存在しない場合)
mkdir -p baskulin

# Baskulinで始まる全ディレクトリのファイルを baskulin ディレクトリに移動
for dir in Baskulin*/; do
    # ディレクトリ内のファイルを baskulin に移動
    mv "$dir"* baskulin/
done

echo "All files have been moved to the 'baskulin' directory."

として保存

chmod +x move_files.sh

で実行権限付与

./move_files.sh

で実行すれば全てのファイルが
1つのフォルダに移動される

そしてこのフォルダをアップロード

これでsave and continueをクリック

なお100枚以上に増やしたけど
同じ画像の場合は全て1つのものとしてみなすため
意味がなかった

回転させたりした場合は
異なる画像としてカウントされる

これでデータがアップできたので
次はアノテーション

kotoba-whisper-v1.0

kotoba-whisper-v1.0

large3 より速いし日本語特化らしい
https://zenn.dev/asap/articles/ba8fcb1880165e

これと
https://zenn.dev/asap/articles/2c0d421e68ef16

生成AIをローカルで簡単に 【Part5.5 faster-whisper+マイク録音編】
と組み合わせることにする

https://github.com/personabb/colab_AI_sample/tree/main/colab_fasterwhisper_sample
のソースコード
を参考に作り上げる

とりあえず設定ファイルの解説

https://zenn.dev/asap/articles/2c0d421e68ef16
の解説を見ながら行う

git clone https://github.com/personabb/colab_AI_sample.git

でリポジトリクローン

cd colab_AI_sample 
cd colab_fasterwhisper_sample


Faster-whisperのところへ移動

python main.py

でとりあえず稼働テスト

config.json: 100%|█████████████████████████| 2.39k/2.39k [00:00<00:00, 13.2MB/s]
preprocessor_config.json: 100%|████████████████| 340/340 [00:00<00:00, 1.59MB/s]
tokenizer.json: 100%|███████████████████████| 2.48M/2.48M [00:02<00:00, 962kB/s]
vocabulary.json: 100%|██████████████████████| 1.07M/1.07M [00:02<00:00, 419kB/s]
model.bin: 100%|███████████████████████████| 1.51G/1.51G [02:37<00:00, 9.58MB/s]
[2024-09-03 05:52:04.173] [ctranslate2] [thread 17310990] [warning] The compute type inferred from the saved model is float16, but the target device or backend do not support efficient float16 computation. The model weights have been automatically converted to use the float32 compute type instead.
stand by ready OK
recording...
finished
ごめん
stand by ready OK
recording...
finished
ごめん
stand by ready OK
recording...
finished
あれ?
stand by ready OK
recording...
finished
今日の天気
stand by ready OK
recording...
finished
明日の天気。
stand by ready OK
recording...
finished
今日はご飯を食べたいハンバーグも欲しい
stand by ready OK
recording...
finished
終わり
stand by ready OK
recording...
finished
一応はこれで使い物にはなるかな
stand by ready OK
recording...
finished
文字を少しまで何秒
stand by ready OK
recording...
finished
あらら
stand by ready OK
recording...
finished
あわわわ
stand by ready OK
recording...

となってほぼ聞き取りができている
ただしアレクサの音声は聞き取りがうまくできなかった
時間は数秒のタイムラグで実行され文字起こしされる

Ctrl+Cをおしてプログラムを停止するまで、永遠に録音と文字起こしが繰り返される

そこまでこだわらなくても
以前、顔の個人認識のときみたいに
ソースをもらって使えばいい

とりあえず修正にあたりソースの理解は必要なので

設定ファイルのソースからみる

[Recorder]

fs=16000
silence_threshold=0.5
min_duration=0.1
amplitude_threshold=0.05
start_threshold = 0.3



[FasterWhisper]
device = auto
language = ja

gpu_model_type = large-v3
gpu_beam_size = 1
gpu_compute_type = float16

cpu_model_type = small
cpu_beam_size = 1
cpu_compute_type = int8

use_kotoba = True
kotoba_model_type = kotoba-tech/kotoba-whisper-v1.0-faster
chunk_length = 15
condition_on_previous_text = False

以下解説
[Recorder]部分は録音モジュールに関係する設定

fs=16000
サンプリングレート16,000Hzで録音
これは
Whisperが16,000Hzの音声を前提としているため

silence_threshold=0.5
無音になってから0.5秒後に録音を停止するパラメータ

min_duration=0.1
streamで録音する際の、chunkの大きさ
変更の必要はない

amplitude_threshold=0.05
無音判定の閾値です。音声の音量が0.05よりも小さい音は無音と判定
これが0だと、自然なノイズも全て音と判定するため、永遠に録音が終わりません

start_threshold = 0.3
閾値(amplitude_threshold)以上の音量が、0.3秒以上継続して入ってきたら、発話開始と判断して録音を開始します。
突発的なノイズによる閾値越えを防ぐための処理

[FasterWhisper]部分は文字起こしモジュールの設定

device = auto
マシンのGPUやCPUのどちらを利用するかの設定
autoの場合は、マシンがGPUを利用できるならGPUを利用する。
その他ではcudaやcpuを指定できる

gpu_model_type = large-v3
gpu_beam_size = 1
gpu_compute_type = float16

cpu_model_type = small
cpu_beam_size = 1
cpu_compute_type = int8


モデルの設定をしている
GPUを利用する場合とCPUを利用する場合でモデルを変更している
GPU利用の場合はlarge-v3モデルという高性能モデルを利用している
CPU利用の場合はsmallモデルという軽量モデルを利用している
加えてCPUの場合は、int8という小さなデータ型を利用して、
計算量を減らしている

use_kotoba = True
kotoba_model_type = kotoba-tech/kotoba-whisper-v1.0-faster
chunk_length = 15
condition_on_previous_text = False

これは
日本語特化モデルであるkotoba-whisper-v1.0を利用する設定

kotoba_model_type = kotoba-tech/kotoba-whisper-v1.0-faster


利用するfaster-whisperモデルを指定

次に

import sounddevice as sd
from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder

def main():

    recorder = Recorder()
    fasterWhispermodel = FasterWhisperModel()
    while True:
        audio_data = recorder.speech2audio()
        text = fasterWhispermodel.audio2text(audio_data)
        print(text)

if __name__ == "__main__":
    main()

のソース

これがmain.py

from module.module_whisper import FasterWhisperModel
from module.module_recorder import Recorder


moduleフォルダ内の録音用のモジュールと文字起こし用のモジュールを呼び出し

recorder = Recorder()
fasterWhispermodel = FasterWhisperModel()


呼び出したモジュールのクラスのインスタンスを作成

    while True:
        audio_data = recorder.speech2audio()
        text = fasterWhispermodel.audio2text(audio_data)
        print(text)

これで
recorder.speech2audio()メソッドで音声を録音して、録音データをaudio_dataに格納し、fasterWhispermodel.audio2text()メソッドにて、音声ファイルを文字起こしして、textとして表示

つまり
printではなく
文字を送信するメソッドを作成すれば処理は完成するはず

Faster WhisperとPyAudioを使用して、マイクから音声をリアルタイムで取得し、音声をテキストに変換する

Faster WhisperとPyAudioを使用して、マイクから音声をリアルタイムで取得し、音声をテキストに変換する

pip install faster-whisper pyaudio

でライブラリインストール

録音ファイルは
mic_rec.py
とする

import pyaudio
import numpy as np
import faster_whisper

# Faster Whisperのモデルをロードします(モデルパスは適宜変更してください)
model = faster_whisper.WhisperModel("large-v2", device="cpu")  # または "cuda" でGPUを使用

# 音声設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 1024

# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()

# マイクから音声を取得するストリームを開きます
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

print("Listening...")

# 音声データをバッファとして取得してリアルタイムに処理します
try:
    while True:
        # 音声データを取得
        data = stream.read(CHUNK)
        # NumPy配列に変換
        audio_data = np.frombuffer(data, dtype=np.int16)
        # Faster Whisperに音声データを渡してテキストを取得
        segments, _ = model.transcribe(audio_data)
        # 取得したセグメントを出力
        for segment in segments:
            print(f"Text: {segment.text}")
except KeyboardInterrupt:
    # 終了処理
    print("Terminating...")
finally:
    stream.stop_stream()
    stream.close()
    audio.terminate()


生成されたコードだが
モデルは
large-v3
を使い

CPUを使う

なお以前
/aw10s/whisper/test.py
で試した時には

model = WhisperModel("large-v3", device="cpu", compute_type="int8")

としたので
この設定を使う

import pyaudio
import numpy as np
from faster_whisper import WhisperModel


# Faster Whisperのモデルをロードします(モデルパスは適宜変更してください)
# model = faster_whisper.WhisperModel("large-v2", device="cpu")  # または "cuda" でGPUを使用
model = WhisperModel("large-v3", device="cpu", compute_type="int8")
# 音声設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 1024

# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()

# マイクから音声を取得するストリームを開きます
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

print("Listening...")

# 音声データをバッファとして取得してリアルタイムに処理します
try:
    while True:
        # 音声データを取得
        data = stream.read(CHUNK)
        # NumPy配列に変換
        audio_data = np.frombuffer(data, dtype=np.int16)
        # Faster Whisperに音声データを渡してテキストを取得
        segments, _ = model.transcribe(audio_data)
        # 取得したセグメントを出力
        for segment in segments:
            print(f"Text: {segment.text}")
except KeyboardInterrupt:
    # 終了処理
    print("Terminating...")
finally:
    stream.stop_stream()
    stream.close()
    audio.terminate()

で実行

しかし

Listening...
Text:  Takk for att du så på!
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/linebot/mic_rec.py", line 27, in <module>
    data = stream.read(CHUNK)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/pyaudio/__init__.py", line 570, in read
    return pa.read_stream(self._stream, num_frames,
OSError: [Errno -9981] Input overflowed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/linebot/mic_rec.py", line 39, in <module>
    stream.stop_stream()
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/pyaudio/__init__.py", line 500, in stop_stream
    pa.stop_stream(self._stream)
OSError: Stream not open

となる

OSError: [Errno -9981] Input overflowed は、
PyAudio がバッファから音声データを適切に読み取れなかったことを示しています。
この問題は、バッファサイズが小さいか、処理が遅いために発生することがあります。
とのこと

バッファサイズの調整:
CHUNK のサイズを大きくすることで、入力オーバーフローを防ぐことができます。
例えば、1024 を 2048 や 4096 に変更してみてください
ということで

CHUNK = 2048  # もしくは 4096

で試す

また
エラーハンドリングの追加: オーバーフローエラーが発生した場合に備え、
例外処理を追加してストリームを正しく閉じるようにします。

try:
    while True:
        data = stream.read(CHUNK, exception_on_overflow=False)
        audio_data = np.frombuffer(data, dtype=np.int16)
        segments, _ = model.transcribe(audio_data)
        for segment in segments:
            print(f"Text: {segment.text}")
except OSError as e:
    print(f"Error: {e}")
except KeyboardInterrupt:
    print("Terminating...")
finally:
    stream.stop_stream()
    stream.close()
    audio.terminate()

これらを追加して

import pyaudio
import numpy as np
from faster_whisper import WhisperModel

# Faster Whisperのモデルをロードします
model = WhisperModel("large-v3", device="cpu", compute_type="int8")

# 音声設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 2048  # バッファサイズを大きく変更

# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()

# マイクから音声を取得するストリームを開きます
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

print("Listening...")

# 音声データをバッファとして取得してリアルタイムに処理します
try:
    while True:
        # 音声データを取得、オーバーフロー時に例外を発生させない
        data = stream.read(CHUNK, exception_on_overflow=False)
        # NumPy配列に変換
        audio_data = np.frombuffer(data, dtype=np.int16)
        # Faster Whisperに音声データを渡してテキストを取得
        segments, _ = model.transcribe(audio_data)
        # 取得したセグメントを出力
        for segment in segments:
            print(f"Text: {segment.text}")
except OSError as e:
    print(f"Error: {e}")
except KeyboardInterrupt:
    print("Terminating...")
finally:
    stream.stop_stream()
    stream.close()
    audio.terminate()

としたが
実行すると

Listening... Text: Takk for att du så med. Text: Teksting av Nicolai Winther Text: Teksting av Nicolai Winther Text: Продолжение следует... Text: Teksting av Nicolai Winther ^CTerminating... 

となる

Faster Whisperモデルが認識する言語が様々であるため、
期待される言語での認識がうまくいっていない可能性があります。
これを改善するために、
特定の言語を指定してモデルがその言語に焦点を当てて認識するように設定することが有効です。

Faster Whisperモデルで特定の言語を指定するためには、
transcribe メソッドに language パラメータを追加して使用します。例えば、
日本語を指定したい場合は、language=”ja” とします。

ということで

import pyaudio
import numpy as np
from faster_whisper import WhisperModel

# Faster Whisperのモデルをロードします
model = WhisperModel("large-v3", device="cpu", compute_type="int8")

# 音声設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 2048  # バッファサイズを大きく変更

# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()

# マイクから音声を取得するストリームを開きます
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

print("Listening...")

# 音声データをバッファとして取得してリアルタイムに処理します
try:
    while True:
        # 音声データを取得、オーバーフロー時に例外を発生させない
        data = stream.read(CHUNK, exception_on_overflow=False)
        # NumPy配列に変換
        audio_data = np.frombuffer(data, dtype=np.int16)
        # Faster Whisperに音声データを渡してテキストを取得
        segments, _ = model.transcribe(audio_data, language="ja")  # 言語を日本語に指定
        # 取得したセグメントを出力
        for segment in segments:
            print(f"Text: {segment.text}")
except OSError as e:
    print(f"Error: {e}")
except KeyboardInterrupt:
    print("Terminating...")
finally:
    stream.stop_stream()
    stream.close()
    audio.terminate()

実行すると

Text: ご視聴ありがとうございました
Text: ご視聴ありがとうございました
Text: ご視聴ありがとうございました

となる

多分設定が足りていない

kotoba-whisper-v1.0

large3 より速いし日本語特化らしい
https://zenn.dev/asap/articles/ba8fcb1880165e

これと
https://zenn.dev/asap/articles/2c0d421e68ef16

生成AIをローカルで簡単に 【Part5.5 faster-whisper+マイク録音編】
と組み合わせることにする

現在の天気が雨でない場合に次の1時間の天気予報をチェックするようにする

現在の天気が雨でない場合に次の1時間の天気予報をチェックするようにする

# weather_check.py
import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    # OpenWeather One Call API の URL
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]  # 次の1時間の予報だけをチェック
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

# APIキーを設定
api_key = ''


# location_utils2から緯度と経度と天気をを取得
latitude, longitude, _ , weather_description = location_utils2.get_current_location_and_address()

# 天気データを取得
weather_data = get_weather_data(api_key, latitude, longitude)

# 雨が予報されているかどうかをチェック
if check_for_rain(weather_data):
    print("Alert: Rain is expected within the next hour!")
else:
    print("No rain expected in the next hour.")

のコードだと
雨が降っていても一時間後に雨なのかを調べてしまう

なので現在地の天気が雨でないのなら実行するようにする

なおこの判定は

# location_utils2.py
import requests
from geopy.geocoders import Nominatim

def get_current_location_and_address():
    # APIキーとZIPコードは予め設定しておく必要があります。
    API_key = ""
    zip_place = "郵便番号,JP"
    
    # OpenWeatherMap API URL
    url = f"https://api.openweathermap.org/data/2.5/weather?zip={zip_place}&units=metric&lang=ja&appid={API_key}"
    
    # データの取得
    response = requests.get(url)
    jsondata = response.json()
    
    # 緯度と経度の取得
    latitude = jsondata['coord']['lat']
    longitude = jsondata['coord']['lon']
    
    #天気の取得
    # weather_description = jsondata['weather'][0]['description']
    weather_description = jsondata['weather'][0]['main']

    
    # 住所の取得(オプショナル)
    geolocator = Nominatim(user_agent="geoapiExercises")
    location = geolocator.reverse((latitude, longitude), language='ja')
    address = location.address if location else None
    
    return latitude, longitude, address, weather_description

で郵便番号をもとに
現在地の緯度経度と天気を取得している

この時に
Mainだと英語で取得になるが
Descriptionだと日本語で詳細の天気になる

ただ詳細な天気だと小雨とか判定が面倒なので
とにかく雨であれば
main==rainの判定にできるので
Mainを判定基準にした

変更後のコードは

import location_utils2
import requests

def get_weather_data(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

api_key = ''

latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

# 現在の天気が雨でない場合にのみ次の1時間の雨の予報をチェック
if weather_description != 'Rain':
    weather_data = get_weather_data(api_key, latitude, longitude)
    if check_for_rain(weather_data):
        print("Alert: Rain is expected within the next hour!")
    else:
        print("No rain expected in the next hour.")
else:
    print("It is currently raining.")

次に音声を流れるように変更

今回も音声再生はpygameを使う

pygameライブラリを使って音声ファイルを再生しています。
まず、pygame.mixerを初期化し、音声ファイルを読み込んで再生します。
pygame.mixer.music.play()関数で音声を再生し、
while pygame.mixer.music.get_busy()ループを使って
音声が再生されている間は待機します。
このスクリプトは、雨が予報されていない場合にのみ、次の1時間の雨の予報をチェックし、
予報されていた場合に指定された音声ファイルを再生します

コードは

import location_utils2
import requests
import pygame  # Import pygame for audio playback

def get_weather_data(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

api_key = ''

latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

# 現在の天気が雨でない場合にのみ次の1時間の雨の予報をチェック
if weather_description != 'Rain':
    weather_data = get_weather_data(api_key, latitude, longitude)
    if check_for_rain(weather_data):
        print("Alert: Rain is expected within the next hour!")
        pygame.mixer.init()  # Initialize the mixer module
        pygame.mixer.music.load('voice.wav')  # Load your sound file
        pygame.mixer.music.play()  # Play the sound
        while pygame.mixer.music.get_busy():  # Wait for music to finish playing
            pygame.time.Clock().tick(10)
    else:
        print("No rain expected in the next hour.")
else:
    print("It is currently raining.")

でOK

音声の部分は

python create_voice.py voice.txt 192.168.1.69:50021

で作成した音声を使用

次は顔認識したら実行するようにする

以前の
Weatheer や mail_voice などで使ったkao.pyとxmlファイルを使えばできるはず

ということでモジュール化する

weather_alert.py

として

# weather_alert.py

import location_utils2
import requests
import pygame

def get_weather_data(api_key, latitude, longitude):
    url = f"https://api.openweathermap.org/data/3.0/onecall?lat={latitude}&lon={longitude}&exclude=minutely,daily&appid={api_key}&units=metric"
    response = requests.get(url)
    return response.json()

def check_for_rain(weather_data):
    hourly_forecast = weather_data.get('hourly', [])[:1]
    for hour in hourly_forecast:
        for weather in hour['weather']:
            if weather['main'] == 'Rain':
                return True
    return False

def check_and_alert(api_key):
    latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()

    if weather_description != 'Rain':
        weather_data = get_weather_data(api_key, latitude, longitude)
        if check_for_rain(weather_data):
            print("Alert: Rain is expected within the next hour!")
            pygame.mixer.init()
            pygame.mixer.music.load('voice.wav')
            pygame.mixer.music.play()
            while pygame.mixer.music.get_busy():
                pygame.time.Clock().tick(10)
        else:
            print("No rain expected in the next hour.")
    else:
        print("It is currently raining.")

として保存

他で呼び出す時には

    api_key = ''
    check_and_alert(api_key)

というように加えればいい

なので
以前作成した kao.pyの中での処理を変更すればOK

その前に
config.iniを作成し
待機時間とAPIキーを記述しておく

とりあえず mail_voiceのものを使うので

cp ../mail_voice/*.xml .
cp ../mail_voice/config.ini .

でコピー

config.iniを編集する

[Settings]
detection_interval = 1800 #30分

[API_KEYS]
OPENWEATHER_API_KEY = 

としておく

次に kao.pyの編集

 cp ../mail_voice/kao.py .

の後に編集する

import weather_alert

でインポート

# 設定を変数に格納
api_key = config['API_KEYS']['OPENWEATHER_API_KEY']
detection_interval = int(config['Settings']['detection_interval'])

そして天気の取得処理を追加

        if lastTime is None or time.perf_counter() - lastTime > detection_interval:
            # 検出時刻更新
            lastTime = time.perf_counter()
            weather_alert.check_and_alert(api_key)

とする

これで実行したら

  File "/Users/snowpool/aw10s/rain_alert/kao.py", line 12, in <module>
    detection_interval = int(config['Settings']['detection_interval'])
ValueError: invalid literal for int() with base 10: '1800 #30分'

となった

原因はコメント
これが原因で
config[‘Settings’][‘detection_interval’] の値が 1800 #30分 という文字列であり、
これを整数に変換しようとしてエラーが発生していることです。

int() 関数は整数に変換できない文字列を受け取るとエラーを発生させます。
問題の原因は、config[‘Settings’][‘detection_interval’] の値が正しい整数値ではなく、
その後にコメントが付いているためです。
Pythonでは、int() 関数は文字列中の数字のみを整数として解釈します。
とのこと

interval_string = config['Settings']['detection_interval']
interval_without_comment = interval_string.split('#')[0].strip()
detection_interval = int(interval_without_comment)

でコメントを消すこともできるらしいが
とりあえず余計なコメントは削除した

これで実行したら

Hello from the pygame community. https://www.pygame.org/contribute.html
Traceback (most recent call last):
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/base.py", line 368, in _call_geocoder
    result = self.adapter.get_json(url, timeout=timeout, headers=req_headers)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/adapters.py", line 472, in get_json
    resp = self._request(url, timeout=timeout, headers=headers)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/adapters.py", line 500, in _request
    raise AdapterHTTPError(
geopy.adapters.AdapterHTTPError: Non-successful status code 403

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/rain_alert/kao.py", line 41, in <module>
    weather_alert.check_and_alert(api_key)
  File "/Users/snowpool/aw10s/rain_alert/weather_alert.py", line 21, in check_and_alert
    latitude, longitude, _, weather_description = location_utils2.get_current_location_and_address()
  File "/Users/snowpool/aw10s/rain_alert/location_utils2.py", line 28, in get_current_location_and_address
    location = geolocator.reverse((latitude, longitude), language='ja')
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/nominatim.py", line 372, in reverse
    return self._call_geocoder(url, callback, timeout=timeout)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/base.py", line 388, in _call_geocoder
    res = self._adapter_error_handler(error)
  File "/Users/snowpool/.pyenv/versions/3.10.6/lib/python3.10/site-packages/geopy/geocoders/base.py", line 411, in _adapter_error_handler
    raise exc_cls(str(error)) from error
geopy.exc.GeocoderInsufficientPrivileges: Non-successful status code 403

となる

geopy ライブラリがジオコーディングサービスへのアクセスに失敗
が原因とのこと

住所の表示は不要で
緯度経度があれば問題ないため
住所表示機能を削除したら無事に動いた

とりあえず今回もコードを公開するのと同時に
Voicevoxのスクリプトも一緒にリポジトリに入れて公開する