UPSの矩形波(擬似正弦波)と正弦波

UPSを調べているときに
本機は矩形波出力です。
今時のデスクトップPCはPFCという機構が入っていて、
矩形波電源を接続すると動かなかったり壊れたりするらしい。
できれば正弦波出力のUPSを購入するのが無難です。

というレビューを見たので
矩形波(擬似正弦波)ではMac miniが壊れるかを調べる

矩形波(擬似正弦波)ではMac miniが壊れるのは
PC ES / CyberPower小型UPS など矩形波の機種のレビュー

🟦 Mac mini と電源(PFC)の相性
結論:
✅ **Mac mini(M1/M2/M4 などすべて)は
PFC(力率改善回路)内蔵なので、
矩形波 UPS は相性が悪いことがある。**
症状としては:
UPS切り替え時に電源が落ちる
バッテリー運転に入ると不安定
電源ユニットが過熱する
最悪の場合「電源ユニット劣化」に繋がる
※「壊れる可能性がある」というのはここ。

🟩 では正弦波 UPS は?
💯 **正弦波 UPS はPFC搭載PCに完全対応。
Mac mini+正弦波はベストマッチ。**
オムロン BW55T はカタログに明記されています:
常時商用給電方式
正弦波出力
PC(PFC電源)対応

🔍 実際のオムロン公式にもこう書かれています
PFC電源搭載PCには正弦波UPSを推奨
つまり Mac mini に使うなら 矩形波より正弦波が圧倒的に安全。

⚠️ BW55T を使う上で「注意すべきこと/限界」
出力は340 Wまで — GPUを使った大きなAI処理器や、高負荷複数機器を同時起動するなら容量ギリギリになる。
UPS自体が定期的なバッテリー交換対象 — 長期間運用なら経年でバッテリー劣化する。
静音性が完璧とは限らない — UPS特有のファン音やバッテリー交換時の警告音がある機種も存在。
停電持続時間は“数分〜数十分”レベル(UPSはあくまで「瞬断対策・短時間バックアップ用」)。長時間停電の避難用ではない。

また
mac mini 単体なら停電時にどれくらい持つ?
これを調べた

BW55T + Mac mini(本体だけ)なら、だいたい 20〜60分くらい 持つと思っておけばOK

これらを元に
BW55Tを購入

家のブレーカー対策でUPS導入

間にブレーカーを落とす=家全体の電力負荷が一時的に上限を超えている、
もしくは特定回路の過負荷または老朽化によるブレーカの過感度が原因の可能性

UPS(無停電電源装置)で即時対処

目的:瞬断や短時間停電への即応
💡 適用対象
自動売買用Mac mini/ルーター/ONU/Wi-Fiルータ/NAS
重要機器:冷蔵庫、サーバ機、VoiceVoxサーバなど

⚠️ 注意
UPSは「数分〜数十分の保持」目的。長時間停電では自動シャットダウン連携を必ず設定。
macOSでは apcupsd または NUT (Network UPS Tools) が利用可能。
→ これで「残り電力10%以下で安全終了」を自動化できます。

GPTによれば
🎯 あなたにおすすめな容量目安 &予算
あなたの用途(自動売買サーバ・ネットワーク機器)を想定すると以下が目安です:
常時稼働させる機器が「Mac mini/ルーター/NAS/スイッチ」などであれば、合計消費電力が 300W〜500W 程度を見積もると安全です。
→ 500VA〜1000VAクラス(目安:300W=約600VA)あたりから検討。
予算としては 3万円〜6万円 程度のモデルで十分カバーできる可能性が高い。
将来的に「さらに機器を増やす/長時間停電にも備えたい」なら、1000VA〜1500VAクラス(価格5万円〜10万円以上)にしておくと安心。
また、UPSだけではなく「自動シャットダウン機能付き」「正弦波出力」「通信/監視機能付き」があるとより信頼性高いです。

予定している構成は以下の通り

Mac mini M4 Pro 64GB(新品) ← 本番管理・AI・FX・OCR・音声基盤
 ├─ Python自動化スクリプト
 ├─ 音声システム(VoiceVox / pyttsx3)
 ├─ 地震・津波フィード監視
 ├─ 家電マニュアルRAG(ChromaDB)
 ├─ OCR処理(pdfminer / tesseract)
 └─ AIモデル(Ollama: Gemma / DeepSeek)

どれくらい停電に耐えれるかをGPTで質問

CyberPower CP375-JP UPSの場合だと
Mac mini 単体(アイドル 10〜13W) 12W 約 25〜35分
Mac mini(作業中 20〜30W) 25W 約 10〜15分

この商品レビューを調べると
レビューによれば夜中になる音がうるさいらしいが、電池切れも関係しているらしい

✅ **CyberPower CP375-JP は「電池切れ」ではなく、
“異常”を検知したときに夜中でも警告音が鳴る仕組みです。**
そしてその「異常」の代表が バッテリー寿命(劣化・電池切れ)」 です。
レビューで「夜中に急にうるさく鳴り出した」という話は、
ほぼ バッテリー警告ブザー が原因

🔔 CP375-JP が勝手に鳴る理由(レビューで多い順)
① バッテリー劣化(3〜5年で発生) → ピーピーピーと鳴り続ける
UPSは内部に鉛バッテリー(密閉型)を使っていて、
これが弱ってくると “要交換” の警告音が夜中でも鳴る。
点滅+連続ビープ
停電ではないのに鳴る
夜中に気づくケースが多い

⚠ CP375-JP はブザー無効化ができません。
(廉価モデルはほぼ全てブザーOFFができない)
高級UPS(APC Back-UPS Pro 900VA/1000VA など)なら
ブザーOFF機能
管理ソフトで消音
が可能。

家族からの苦情があるため、静音性は必須

UPSがうるさくなる主な原因:
内部ファンの回転 → 熱・負荷が高いときやバッテリー運転時に回る。

警告音(ビープ音) → バッテリー劣化/負荷異常/入力電圧異常などを知らせるため。

インバータやトランスの電子音や低周波ノイズ — 負荷が高いときに出ることがある。

つまり、「容量が大きい」「負荷が大きい」UPSほど静音化は難しい。
家庭・夜間運用のように「静かさ優先」なら、静音性に配慮されたモデルを選ぶ必要がある

GPTによるおすすめは

APC BR1000G Back-UPS Pro 1000VA
家庭向けPC/サーバ用に非常に人気。
多くのモデルでファンが静か or 必要時のみ動作。
バッテリー運転時にも比較的静音で、夜間の稼働にも向いている。

APC BACK-UPS Pro BN1350M2 1350VA
少し余裕を持たせた容量で、Mac mini+複数周辺機器の運用にも対応。
定格容量の余裕があると、インバータ負荷が軽いためファン回転が控えめになりやすい。

APC BR650MI Pro 無停電電源装置 650VA
中容量で、それほど大きな負荷でなければ比較的静か。
ノートPC/小型デスクトップ+ルータなど軽め構成なら実用的。

オムロン 無停電電源装置 400VA BW40T や オムロン 無停電電源装置 550VA BW55T
家庭のルータや小型機器専用、あるいは非常用用としてなら。
ただし高負荷なPC用途には容量不足の可能性あり。

CyberPower CP375-JP UPS
サイズ・価格ともに魅力があるが、先のレビューであったように「バッテリー劣化での警告音」「容量ギリギリだとファン稼働 → 騒音」というリスクあり。
静音性を最優先するなら、上位のUPSに替えることを強く推奨。

🛠 静音運用のための設置・運用のコツ
静音性を維持するには、次のような運用が大切です:
UPS を 人の近く(寝室・居間)に置かず、別室またはラック内・棚の下などに設置
容量に対して余裕のあるUPS を選ぶ(定格の 40〜60% 程度の負荷)
ファンや警告音をミュート可能なモデル を選ぶ/設定する

定期的な バッテリー交換 → 劣化で警告音が鳴るリスクを低減

UPS専用の 遮音ケース or 防振マット で静音化

🔋 各UPSの特徴
APC ES 550 UPS BE550M1-JP E
正弦波出力、小〜中容量ながら静音評価が高く、PFC電源のPCにも安心。
コンパクトで設置しやすく、アパートなど静かな環境でも比較的音が抑えられやすいモデル。

Omron BW55T 無停電電源装置 550VA/340W
本の住宅事情に合わせた家庭向けUPSで、一般的なPC + ルーター程度の構成なら十分。
容量大きすぎず、ファン音や警告音が出にくい構成で“静音性重視の入門用”に向く。

Omron BW40T 無停電電源装置 400VA/250W
非常にコンパクトで、ルーター・ONU や小型機器専用のサブUPSとして使いやすい。
静音性重視で、「PC本体は別UPS」「通信機器だけバックアップ」の構成に最適。

CyberPower SL550U-JP UPS
コスパ重視の小容量UPS。容量をそこまで必要としない機器(ルーター、HDD、ラズパイなど)向け
「とにかく静か&安価」に抑えたい場合の選択肢。

⚠️ 選ぶときの注意点
UPSは「容量(VA/W数)に余裕を持たせる」ことが静音化のコツ。容量ギリギリで使うとファン動作頻度が上がる。

健康なバッテリーかつファン状態のチェックを定期的に。劣化すると「夜中ブザー+ファン動作」で音が大きくなるモデルもある。

設置場所にも配慮を — 床に直置きせず、防振マット+少し離したスペースを確保することで静音性が向上

upsを使ったことがないため、どのようなものかを知りたい。 モバイルバッテリーみたいなかんじ?
とGPTで聞いてみた

🟦 【まず結論】
❌ UPS = モバイルバッテリーとは全く違う
⭕ UPS = 「停電しても電気が途切れない装置」
「モバイルバッテリーみたいに電気を貯めておくもの」というのは半分正解ですが、
UPSは パソコン・ルーター・ONUなどの重要機器を守るための“安全装置” です。

🟩 【UPSの役割:4つの重要機能】
① 停電した瞬間に0秒で電力を供給する
モバイルバッテリーは人がケーブルで繋ぎ直す必要がある
UPSは 電力が落ちた瞬間に自動で切り替わる(無瞬断)
➡ パソコン、サーバー、FX自動売買、データベースは絶対必要
➡ 「ブチッ」と電源が落ちる事故から守る

② 電圧が不安定なときに自動補正する(サージ・瞬断対策)
雷・ブレーカー落ち・瞬低などに強い。
➡ Mac mini やルーターなどの電子機器を長持ちさせる
➡ 特にアパートは電圧が揺れやすい

③ 停電時に数分〜数十分の余裕を作る
CyberPower CP375-JP の場合:
ONU + ルーター:40〜60分
Mac mini(軽負荷):10〜20分
➡ その間に
自動シャットダウンができる
通信が維持される

④ 安全にシャットダウンする時間を作る
データ保存
Mac mini を安全にシャットダウン
などの処理が可能。
➡ 「データ破損」や「SSD壊れる」を防ぐ生命線。

特に:
停電
ブレーカー落ち
電圧の揺らぎ(アパートは多い)
パソコンの突然落ち(データ壊れる)
を防ぐには UPSが事実上の必須。

🟦 【UPSは“二段構え”が最強】
① 通信系 (ONU + ルーター)
→ 小型UPS(CP375-JP / BW40T)
→ 40〜60分持つ
→ 停電でもインターネット復旧まで生きる
② Mac mini本体
→ 静音の中容量 UPS(APC BR1000G など)
→ 20〜40分
→ 静音で家族も安心
→ データ破損なし
➡ この構成が家庭サーバーでは標準構成。

✅ なぜ UPS で Mac mini の破損リスクを下げられるか
UPS は、停電や瞬停、電圧低下・サージなどの 電源トラブルを瞬時に補う装置。
電源が不安定になる「瞬断」「瞬低」でも、PC を安全に保護できます。

突然の電源断で Mac mini がシャットダウン → データ破損、SSD やファイルシステムの破損、最悪は起動できなくなる。
UPS があれば、シャットダウンまたは処理停止を安全に行える時間が稼げる。

電圧の上下(電圧サグ、ブレーカーの不安定、サージ電流など)による電子部品への負荷・劣化も防げる
これはモバイルバッテリーでは対応できない。

つまり、UPS は「停電対策」だけでなく、「
電源の品質維持と保護」を兼ねた「PC・サーバ保護装置」だと考えるのが正しい。

🔧 どのようにUPSを使うと安全か(おすすめ構成)
あなたのように 家庭でサーバ・自動化・FX自動売買など24/7稼働 するなら、次のような構成が現実的で強固:
メイン機(Mac mini) + モニター + 必要機器 → 中容量〜高容量UPS
通信機器(ONU、ルーター、ネットワークハブなど) → 小容量 UPS or 別系統で冗長化
UPS は壁コンセント直結。電源タップや延長コードは通さない — 電圧管理上は重要。
UPS と Mac mini を USB 接続(または対応ソフトで連携) → 停電時に自動で安全シャットダウンする設定を入れておくことで、人的対応なしでも安全確保。

✅ UPS は “常に充電されながら” 機器へ電力を供給する装置
(ずっとコンセントに挿しっぱなしで使うもの)
つまり 常時「待機+充電状態」 が正しい動作です。
モバイルバッテリーのように
「使うときだけ充電する」「普段は抜いておく」
という使い方は しません。

UPS は内部バッテリーを常に充電し、同時にその電力を機器へ流しています。
停電や瞬断が起きると、内部バッテリーが 0秒で 電力供給に切り替わります

🟦 UPS は基本的に「常時接続・常時充電」が前提
■ UPS の正常状態
バッテリー残量:常に 100% に近い
AC(商用電源)から電力を供給
バッテリーは軽く充電し続けるだけ
電気料金はほぼ変わらない(消費電力 3〜7W 程度)
■ 停電時
即、内部バッテリーへ切替(0秒・無瞬断)
Mac mini やルーターが落ちずに動き続ける
バッテリーの残量が尽きる前に安全シャットダウン可能

🟧 さらに役立つ補足
🔸 バッテリーは劣化する?
する。ただし
2〜4年が一般的な寿命
交換バッテリーが売っている(APC、Omron)
Mac miniなど高価PCを守る保険と割り切るべき部分
🔸 充電しっぱなしで危なくない?
危なくない。
UPSはずっと充電されていることが前提で内部制御されている。
🔸 夜中にブザー鳴るのは何故?
バッテリー劣化
過負荷
内部温度
→ なので静音性優先なら 容量に余裕のあるUPSを使う方が安全

コンセントからupsへ挿し、upsへmac miniを挿すかんじ? この場合、コードの長さはどこまで長くできる?
これに関しては

UPS は 壁コンセントに直挿し
Mac mini やルーターは UPS のコンセントに挿す
というのが基本&最も安全な接続

🔍 では、ケーブルの長さはどこまで長くできる?
結論:
✅ 推奨は 1.5m〜2m 程度まで
⚠️ 最大でも 3m まで(これ以上はUPSの性能低下リスクあり)

🟥 なぜ長すぎる延長コードを使うとダメなのか?
UPS は
電圧の変動
瞬断
サージ(雷)
過負荷
から機器を守るために
壁コンセントと UPS の間の電気品質が重要です。
延長コードを長くすると:
❌ 電圧降下しやすい
(UPS の充電効率が落ちる)

❌ ノイズを拾いやすい
(サージ保護が弱くなる)
❌ 過負荷検知が正確でなくなる可能性
(UPS が“正しく保護できない”)
そのためメーカー(APC/Omron/CyberPower)は
延長ケーブルの使用を非推奨 or 最小限としています。

🔵 現実的に安全に使える長さ
■ 壁コン → UPS 間
1.5m〜2m が安全圏
どうしても離したい場合でも 3m以内
電源タップは使わず「延長ケーブル」だけ(スイッチなしタイプ)
👉 UPS は「壁に直挿し」が基本
👉 アパートの配線は細いので延長しすぎると電圧低下しやすい

■ UPS → Mac mini 間
これは多少伸ばしても大丈夫。
2m〜5m でもOK(この距離では電圧降下しない)
高品質な 3ピン or 2ピン電源ケーブルを使うなら問題なし
👉 Mac mini 側は延ばしてOK
👉 UPS 側は短めにするのがポイント

🔋 あなたの用途でおすすめ構成
壁 → UPS:短く(1.5〜2m)
UPS → 機器:自由度あり(2〜5m OK)
その上で
Mac mini、ルーターは必ず UPS につなぐ
→ 停電や瞬断で落ちるリスクが激減

高い位置に置くため、mac mini のコンセントを延長するには?
✅ Mac mini の電源ケーブルは“通常の延長コード”ではなく「3ピン延長ケーブル(AC延長ケーブル)」で延長するのが最適
(UPS → Mac mini の間を延ばす。UPS → 壁のほうは延長NG)

🔌 Mac mini の電源はどんな規格?
Mac mini は
「メガネ型(IEC C7)」の AC ケーブルを使っています。

🔧 延長方法は2つ
① C7 → 通常コンセント(2ピン)の延長ケーブルを使う(最も簡単)
✔ 一番おすすめ
AC電源の延長ケーブル(2m〜5m)を買うだけ
Mac mini の純正ケーブルを延長コードに挿す
安定性・安全性が高い
UPS経由なので電源品質も問題なし
例:
・雷ガードなし
・スイッチなし
・太めの2ピン/3ピン延長ケーブル
こういった“普通の延長コード(スイッチなし)”が最適。

② C7 延長ケーブルを使う(よりスマート)
C7 → C8 の「電源ケーブル延長専用」も存在します。
メリット
ケーブル1本でシンプル
高い位置までスマートに引ける
検索ワード
「C7 延長ケーブル」「メガネ電源 延長」
長さ:1.5 / 2 / 3 / 5m で選べる。

🟥 やってはいけない延長方法(重要)
❌ UPS → 壁コンセント側の延長
UPS は壁に直挿しが原則なので延長禁止
(電圧低下・過電流・火災リスクが増す)
❌ スイッチ付きタップをUPSに挟む
誤作動・接点トラブルで UPS 保護が無効になる
❌ 極細ケーブル・100均品で延長
Mac mini のACは最大150W近く流れるので危険

UPS は重くて振動があるため
床・棚の下に置く方が静音で安全です。
Mac mini は
机上・棚上・壁掛けなど自由に配置可能。

🔍 ケーブル長の目安
UPS → Mac mini は 3m〜5m でも全く問題なし
ノイズの影響もほぼゼロ
電圧降下も無視していいレベル

🔧 おすすめの延長ケーブル(条件だけ)
スイッチなし
雷ガードなし(UPSが持っている)
極太タイプ(1.25㎟以上)
2m〜5m

次にコンセントについて

### [APC BE550M1-JP E (UPS)]()
#### 中容量 UPS
*¥15,480*

### [Omron BW55T 無停電電源装置 550VA / 340W]()
#### 中容量 UPS
*¥19,580*

### [Omron BW40T 無停電電源装置 400VA / 250W]()
#### 小容量 UPS
*¥16,515*
は壁に挿すらしいが、他のコンセントの邪魔にならないか?

✅ この3機種は「壁に直挿しするタイプ」だけど、設置によっては“他のコンセントの邪魔をしないようにできる”
⭕ もし干渉する場合の解決策も複数ある(安全に使える方法あり)

🟦 なぜ「UPSは壁に直挿し」なのか?
UPSメーカー(APC / オムロン)は
**延長コード経由の接続を“非推奨”**にしている理由:
延長コードの劣化で発熱・火災リスク
延長コードの許容量を超える危険
電圧降下でUPSの充電が不安定になる
落雷サージの保護機能が弱くなる
だから、
壁 → UPS は必ず直挿しが原則になります

🟥 問題:壁に直挿し=隣のコンセントを潰す?
Yes。
とくに
APC BE550M1-JP E(横幅が広い)
オムロン BW55T/BW40T(横方向に膨らむ)
これらは 1つ、場合によっては2つ分のコンセントを占領しがち。

🟩 対策:他のコンセントを潰さず安全にUPSを直挿しする方法
壁コンセント位置に90度曲げる「L字アダプタ」を使う(安全&定番)
ただし条件あり
👉 スイッチなし・安全基準適合・PSE認証のL字アダプタのみ
UPS を“横ずらし”にできるので、
隣のコンセントスペースを塞がない。
※L字アダプタは延長コードではないのでメーカー規定に抵触しません。

壁のコンセント自体が2口の場合 → 上下で挿し分ける
コンセントが縦に並んでいるタイプなら
邪魔になりにくい。

PS を「OAタップ側」に挿すのはNG(注意)
これはやっちゃダメ
理由:UPS保護機能の低下+火災リスク
メーカー全社が非推奨。

🟦 実際にどれくらい邪魔になるか(サイズ目安)
■ APC BE550M1-JP E
幅 99mm
高さ 305mm
➡ 片側のコンセントは潰れる可能性あり。
➡ 縦向きに挿せば干渉なしのケースが多い。

■ オムロン BW55T / BW40T
幅が実質的に広めで、厚みもある
➡ 片方のコンセントを塞ぎがち
➡ L字アダプタ併用が現実的で安全

✅ UPS → マルチタップ(電源タップ)は “可能”
⚠️ ただし条件付き(安全な使い方を守ればOK)
一方で、
❌ 壁 → マルチタップ → UPS(逆向き)は絶対にNG
(メーカー全社が禁止)

upsからマルチタップコンセントはできる?
✅ UPS → マルチタップ(電源タップ)は “可能”
⚠️ ただし条件付き(安全な使い方を守ればOK)
一方で、
❌ 壁 → マルチタップ → UPS(逆向き)は絶対にNG
(メーカー全社が禁止)

🟦 ■ なぜ UPS → 電源タップ はOKなのか?
UPS は以下のように設計されています:
出力側(UPS→機器)の電流・過負荷はUPS側で制御
出力コンセントを増やすこと自体は問題ない
ただし「UPSの定格W数以内」である必要がある
つまり
UPSにかかる合計負荷が上限W数を超えなければ安全。
あなたが検討している UPS の定格:
APC BE550M1-JP E → 330W
Omron BW55T → 340W
Omron BW40T → 250W
Mac mini は
ピークでも 150W未満なので余裕があります。

🟩 ■ UPS からマルチタップへ接続する場合の“条件”
以下の条件を守ると安全。
✔ 必ず「スイッチなし」のタップを使う
スイッチ付きだと
ラインノイズ
接触不良
UPS保護無効化
が起こる場合がある。

✔ 雷ガード(サージ保護)付きは非推奨
理由:
「UPS内のサージ保護」と干渉するため。
👉 “ただの電源タップ”が正解

✔ タップは“高品質・太いケーブル(1.25mm²〜)”が安全
100均・極細ケーブルはNG。

✔ UPS定格W数以内に負荷を収める
例:BW55T(340W)の場合
Mac mini:100W前後
ルーター+ONU:10〜20W
モニター:20〜40W
ラズパイ:5〜10W
→ ぜんぶ入れても 200W 以下で余裕あり。

✔ タップは「横置き」ではなく「床置き・固定」が安全
UPSの近くでケーブルが詰まると発熱リスク増。

🟧 ■ まとめ
✔ UPS→電源タップ=OK(条件付きで安全)
✔ 壁→タップ→UPS=NG(禁止)
✔ 合計W数だけ守れば複数機器を同時保護できる
✔ スイッチなし・雷ガードなしの高品質タップを使うこと

次に、オムロン 無停電電源装置(常時商用給電/正弦波出力) 550VA/340W BW55T ブラック のコンセントについて調べる

🔌 BW55T の出力コンセント仕様
BW55T の 出力コンセント数(バックアップ対応分) は 4つ。

出力形状は 日本の家庭用AC(100 V 単相・アース付き) に対応する形式。
つまりここに「Mac mini」「ルーター」「NAS/HDD」「その他機器」といった複数の機器を直接つなぐことができます。

⚠️ 注意すべき点
出力合計の消費電力が 340 W(定格)以内 である必要があります。
出力用コンセントは 4 口のみなので、そこに直接挿せる機器の数には上限がある
追加で “タップ(電源タップ)を挿す” 場合、そのタップがスイッチ付きであったり、雷ガード付きだったりすると安全性やUPSの効果が落ちる可能性があるので注意

次にスマホでコンセントの写真を撮り
UPSがコンセントを塞がないかGPTで質問

✅ この壁コンセント形状なら、BW55T も APC も “問題なく挿して、隣の口も塞がずに使える”
⭕ 下段にUPSを挿す配置ならほぼ確実に干渉しない

🔍 写真から分かるポイント
① コンセントは縦2口構造(通常の日本住宅)
→ UPS のプラグは“上下方向にしか広がらない”
→ 左右に広がらないため、隣の口への干渉は起きにくい
② プレート周りのスペースも標準的
→ UPSのプラグは太いが、上下どちらかに挿せば反対側は空く

✔ BW55T(プラグは縦長)
BW55T のプラグは「縦1方向に長い」だけで、横幅はほぼ普通のプラグと同じ。
🔧 実際のシミュレーション(写真をもとに)
あなたの写真の壁コンセントにUPSプラグを重ねると:
UPSのプラグ部分は 縦7〜8cmくらい
壁コンセントの2口間の間隔は 3〜4cm
だから UPS を 下に挿せば上側は完全にフリー
さらに
上段→スマホ充電器など小物を使用
下段→UPSを直挿し
UPS本体は床に置く
という配置が 一番きれいで熱もこもらず安全。

🎯 結論(設置可否)
✔ このコンセントなら BW55T / BW40T / APC BE550M1-JP すべて問題なく挿せる
✔ 下段に挿すだけで上段は自由に使える
✔ 他のコンセントを塞ぐ心配はない

とりあえず購入候補は決まったけど、正弦波、矩形波(擬似正弦波)もあるのでこちらを調べることにする

mac mini のm4 と m4 pro の違い、そしてmac studio m2max の違い

mac mini のm4 と m4 pro の違い、そしてmac studio m2max の違い

機種一覧
* Mac mini (M4) — Apple M4チップ搭載のベーシックモデル
* Mac mini (M4 Pro) — 上位チップ搭載のMac mini
* Mac Studio (M2 Max) — プロ用途向けハイエンドモデル

1. Mac mini (M4) の特徴
主な仕様
* Apple M4チップ:10コアCPU+10コアGPU(モデルによって若干異なる構成あり)という記述あり。 ラックソリューションズ+3EveryMac+3The Verge+3
* メモリ:16GBなどからスタート、構成により24GBなど選択肢あり。 TechRadar+1
* 接続・ポート:レビューでは「新しいサーマルシステムで静か」「一般用途に十分」などの評価。 The Verge
* 向いている用途:Web/オフィス用途、軽めのクリエイティブ作業、コストを抑えたい方。
注意点・限界
* グラフィック処理・動画編集で「本格的に使うには上位モデルの方が有利」というレビューも。 The Verge+1
* 将来的な拡張性(特にGPU・メモリ)を重視するなら、上位モデルを検討すべき。

2. Mac mini (M4 Pro) の特徴
主な仕様
* Apple M4 Proチップ:ベース構成で12コアCPU(例:8P+4E)、16コアGPUという記述あり。上位構成では14コアCPU+20コアGPUも。 AppleInsider+2Apple+2
* メモリ・ストレージ構成もより上:例として48GB/64GBユニファイドメモリ、1TB~8TB SSDなど。 Apple+1
* 接続インターフェースも強化:Thunderbolt 5対応、10Gb Ethernetオプションなど。 Cinco Días+1
* 向いている用途:動画編集、中~上位クリエイター作業、マルチディスプレイ環境、将来性を重視する使い方。実際、レビューでは「M4でも十分だけど、RAW現像・動画ではProモデルにしておいて安心」という声あり。 Reddit+1
注意点・限界
* コストが大きく上がる。仕様を上げれば価格も急増。
* 物理的な拡張性(例えばGPU交換)などはMac mini形式ゆえ限られている。

3. Mac Studio (M2 Max) の特徴
主な仕様
* Apple M2 Maxチップ:12コアCPU(8P+4E)、30コアGPU (または一部仕様で38コアGPU) など。メモリ帯域幅400 GB/s。 Rentex+2Techable+2
* メモリ・ストレージ容量・拡張性が非常に高く、プロユースに耐える仕様。外部モニタ・アクセサリ接続も豊富。 B&H+1
* 向いている用途:本格的な映像編集(8K含む)、3Dレンダリング、複数アプリ同時使用、プロ用途・将来を見据えた構成。実際、ユーザー投稿では「Premiere Pro/After Effectsを複数起動しても余裕あり」とのコメントあり。 Reddit+1
注意点・限界
* 価格がかなり高め。一般用途・軽作業であればオーバースペックになる可能性あり。
* Mac Studioもアップグレード可能な部品は限られており、「購入時のスペック選び」が重要。

おすすめシナリオ
日常使い/Office/Web仕事が中心 → M4で十分。価格を抑えたいならこちら。
写真現像・動画編集を時々行う/将来少し余裕を持たせたい → M4 Proが安心。少し先まで使える。
8K動画編集/3Dモデリング/複数アプリ常駐・プロ用途 → Mac Studio (M2 Max)で安心・快適。

自動売買システム(FX・株・仮想通貨・アルゴリズム取引など)を運用するうえで、
①安定稼働、②低遅延、③将来的な拡張性、④冷却・静音・耐久性、⑤コストパフォーマンス、という観点があります。これらを踏まえて、以下に Mac mini (M4)、Mac mini (M4 Pro)、Mac Studio (M2 Max) のそれぞれのメリット・留意点を整理します

✅ モデル別:適性・メリット・留意点
1. Mac mini (M4)
スペック:10コアCPU+10コアGPUの「M4」チップ搭載。標準メモリ16GB/24GBなど。

自動売買用途でのメリット:
コストを抑えられる。最小構成でも十分な処理能力あり。
コンパクト・低消費電力で、24時間稼働にも向く。静音性も評価されています。
The Verge
軽めの戦略(例えば1 〜 数チャネル監視、API呼び出し+少量データ処理)には十分な性能。
留意点:
高頻度売買、複数インジケータ・機械学習・大規模バックテスト・大量データ処理などを頻繁に行うなら「余裕」が少ない可能性あり。
メモリ・ストレージ・I/Oの将来性で上位モデルとの差が出る可能性あり。
私見として:コスト重視・戦略が比較的シンプル/軽量なら十分な選択肢。

2. Mac mini (M4 Pro)

スペック例:12コアCPU+16〜20コアGPU、「M4 Pro」チップ。メモリ最大48〜64GB、ストレージ1TB 〜。
自動売買用途でのメリット:
複数戦略並列実行、リアルタイム監視/分析、機械学習モデルの推論・最適化など、より“重め”な処理にも耐え得る。
将来的な拡張性も高く、「数年使えるマシンを選びたい」なら安心。
留意点:
コストが上がる。
“過剰”になる可能性もあり、自動売買だけならM4でも十分という意見もあります。
Reddit
私見として:複数通貨ペアを同時・高頻度で売買、機械学習・深層学習を導入済み/将来導入予定ならこのモデルが安心。

3. Mac Studio (M2 Max)

スペック:12コアCPU+30コアGPU、400 GB/s帯域、M2 Maxチップ搭載。
自動売買用途でのメリット:
非常に高性能。例えば膨大な時系列データを用いた機械学習、ディープラーニング、複数モデル同時展開、大量バックテストなど超ヘビーな処理も可能。
将来の用途拡張(例えばリアルタイムAI予測+注文+可視化)を考えているなら、キャパシティに余裕あり。
留意点:
価格がかなり高め。自動売買だけに特化するなら“オーバースペック”になる可能性あり。
消費電力・発熱・設置環境(冷却・騒音)なども少し気をつけたほうが良い。
私見として:自動売買プラットフォームを自作し、機械学習・ディープラーニング・大量データ作業・さらに他用途(動画編集など)も同時に行いたいなら選択肢になる。

🎯 “自動売買システムにおいて重要な仕様”と各モデルの着眼点
安定稼働性(24時間運用):静音・冷却・信頼性が重要。どのモデルもApple製チップで冷却仕様も優れていますが、高負荷状態を長時間続けるならThermal余裕のあるモデル(M4 Pro/M2 Max)を選ぶ意味あり。
メモリ容量・帯域:リアルタイム分析・複数戦略並列・機械学習を使うなら “メモリ量+帯域幅” が鍵です。 M4(帯域120 GB/s程度)とM4 Pro(帯域273 GB/s)で差があります。

I/O/ネットワーク:自動売買ではネットワーク遅延・APIレスポンス・ストレージ速度も影響。M4 Proには10Gb Ethernetオプションもあります。
Apple
将来性・拡張性:将来的により高度なアルゴリズム・複数戦略・機械学習を入れたいなら、少し余裕を持ったモデルにしておくのが賢明。
コスト対効果:戦略が軽ければ過剰スペックになり無駄コストになります。自分の想定運用・将来像から必要なスペックを割り出すことが大切。

✔️ 結論:あなたにおすすめするモデル
ご記憶の通り、ユーザー様は「自動売買システム」を構築されており、また他にも多用途(経済指標データ取得・SQLite/Neo4j格納・AIモデル構築など)も考えておられます。
これらを勘案すると:
主に「自動売買+監視+SQLite/Neo4j格納」ぐらいなら → Mac mini (M4) でも十分に動くと思います。コスト重視ならまずこの構成でも良いでしょう。
「自動売買+複数戦略+機械学習/モデル推論を含む」かつ「将来数年使い倒す」なら → Mac mini (M4 Pro) がバランス良くおすすめです。
もし「複数市場・高頻度・機械学習+ディープラーニング+バックテスト大量データ処理+他にも用途(動画/3Dなど)も併用」などの重め用途を見据えているなら → Mac Studio (M2 Max) が“投資する価値あり”となるでしょう。

✅ なぜ64 GBで “十分” と言えるか
自動売買システム(多くの場合)では、リアルタイム為替データの監視+売買ロジックの実行が主な処理です。実際、比較的軽めの自動売買では「4〜8 GBでも動く」という報告があります。

「最低限16 GB以上を推奨」というレビューもあります(例えば、マルチモニタ・複数チャート・幾つものインジケーターを走らせる場合)。
つまり、64 GBは “まず困らない” 領域であり、将来的に別用途(分析用DB・バックテスト・機械学習モデル)にも使うなら余裕がある構成です。

⚠️ ただし、用途によっては 64 GB 以上も検討すべき
以下のようなケースでは、64 GBで十分かどうか慎重に考えたほうが良いです。
何十/何百の通貨ペアを同時に監視・売買 → 処理量・データ量が増え、メモリ消費も増加します。
高頻度売買(HFT)や超高速アルゴリズム → 遅延・レスポンス重視でメモリ以外(ネットワーク・I/O・CPU)もボトルネックになりますが、メモリも余裕見ておいたほうが安心。

機械学習/ディープラーニングを併用 → たとえば過去データ大量に読み込んでモデルを推論・学習させるなら、メモリ+ストレージ帯域・GPUなども要求されるので “64 GBだけ” では将来限界が出るかもしれません。
バックテスト/大規模シミュレーション → 過去データを高速に処理しながら複数パラメータを検証、となるとメモリ、CPU、ストレージ読み書き速度すべてが重要です。

✅ Mac mini(M1〜M4)は本体にスピーカー内蔵している
Appleの公式仕様にも:
「内蔵スピーカー」
「3.5mmヘッドフォンジャック」
「HDMI 2.1(オーディオ出力対応)」
とあるので
音声を鳴らすだけなら
 Mac mini本体の内蔵スピーカーで音が出る
音質をこだわらない用途(システム音・読み上げなど)は問題なし

LINEのメッセージ読み上げ
家庭内の通知アラート
地震・津波速報の読み上げ
学校のお知らせOCRの読み上げ
レシートOCRの在庫通知
ルート案内・天気案内
など 全て問題なく本体スピーカーで対応可能

Mac miniの内蔵スピーカーの特徴:
クリアで聞こえやすい
音量は広めの部屋だと少し弱い
低音はほぼ出ない
1m〜3mの距離なら音声読み上げは問題なし

🎧 外付けスピーカーが必要になるケース
以下の場合だけ追加を検討すればOK:
リビングの広い空間で聞きたい
家族全員が離れた場所で通知を聞きたい
音楽も楽しみたい
AIキャラの音声を「いい声」で聞きたい
車並みの大音量が必要

Macの容量の確保

Macの容量の確保

不要なものを削除しないとアップデートもできないので
まずは

https://wp.developapp.net/?p=7068
でインストールした
stable diffusion web UI 
を削除

rm -rf ~/stable-diffusion-webui     

で削除

sudo rm -rf ~/stable-diffusion


stable-diffusion
も削除

rm -rf ~/.cache/huggingface


モデル(ckpt)の HuggingFace キャッシュも削除

これで容量の確保ができた

どの分岐でも同じスキーマで必ず上書き保存するように 共通の保存関数を作り、キャッシュヒット時も呼ぶようにする

import sys, os
sys.path.append(os.path.abspath("."))  # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString
import osmnx as ox
import networkx as nx
import folium
import os

GPKG = "outputs/shizuoka_hazard.gpkg"

# 1) 既存レイヤ読み込み(両方とも 4326)
coast   = gpd.read_file(GPKG, layer="coastline")
tsunami = gpd.read_file(GPKG, layer="tsunami_inundation")

# ----- パラメータ -----
# 自宅座標(lat, lon)←必ず置き換え
home_wgs = (34.728, 137.978)
# OSM道路の取得半径[m](到達不能なら大きく:15000, 20000…)
GRAPH_RADIUS_M = 12000
# 避難所CSV(cp932想定)
SHELTER_CSV = "data/20250801_指定緊急避難場所一覧.CSV"
# ----------------------

# 2) 避難所CSV→点化(列名は自動検出)
df = pd.read_csv(SHELTER_CSV, encoding="cp932")
lat_candidates = 
lon_candidates = 
if not lat_candidates or not lon_candidates:
    raise ValueError("CSVに緯度/経度列が見つかりません。列名を教えてください。")
lat_col, lon_col = lat_candidates[0], lon_candidates[0]
df[lat_col] = pd.to_numeric(df[lat_col], errors="coerce")
df[lon_col] = pd.to_numeric(df[lon_col], errors="coerce")
df = df.dropna(subset=[lat_col, lon_col])
shelters = gpd.GeoDataFrame(df.copy(),
    geometry=gpd.points_from_xy(df[lon_col], df[lat_col], crs="EPSG:4326")
)

# 3) 安全な避難所 = 浸水域と非交差
#   (unionはメモリ負荷になるので、空間結合で判定)
safe_join = gpd.sjoin(shelters, tsunami[["geometry"]], how="left", predicate="intersects")
shelters["safe"] = safe_join["index_right"].isna()
safe_shelters = shelters[shelters["safe"]].copy()

# --- 4) OSM道路取得(徒歩網) ---
G = ox.graph_from_point(home_wgs, dist=GRAPH_RADIUS_M, network_type="walk", simplify=True, retain_all=False)
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)

# u,v,key を“列”にする(重要)
edges = edges.reset_index()

# CRSを4326に
if edges.crs is None:
    edges = edges.set_crs(4326)
elif edges.crs.to_epsg() != 4326:
    edges = edges.to_crs(4326)
# --- 5) 浸水域と交差する道路を除去(sjoin→集計で一意化) ---
left = edges[["u","v","key","geometry"]].copy()   # edges は reset_index() 済み想定
right = tsunami[["geometry"]]

tag = gpd.sjoin(left, right, how="left", predicate="intersects")

blocked = (
    tag.assign(hit=tag["index_right"].notna())
       .groupby(["u","v","key"], as_index=False)["hit"].max()
       .rename(columns={"hit":"blocked"})
)

edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
edges2["blocked"] = edges2["blocked"].fillna(False)

edges_keep = edges2.loc[~edges2["blocked"]].copy()

# ★ ここが重要:graph_from_gdfs に渡す前に MultiIndex に戻す
#   (key が float になっていたら int にしておくと安心)
if edges_keep["key"].isna().any():
    edges_keep["key"] = edges_keep["key"].fillna(0)

edges_keep["key"] = edges_keep["key"].astype(int)
edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()

# --- 6) サブグラフ再構築(通行可のみ) ---
used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()

# nodes_keep の geometry 列が欠けていたら補完
if "geometry" not in nodes_keep or nodes_keep["geometry"].isna().any():
    nodes_keep = nodes_keep.set_geometry(
        gpd.points_from_xy(nodes_keep["x"], nodes_keep["y"]), crs=edges_keep.crs
    )

# OSMnx のバージョン差に対応
try:
    G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))
except AttributeError:
    from osmnx import utils_graph as ug
    G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))


# 7) 出発・目的地ノード(最寄り“安全”避難所)
home_pt = Point(home_wgs[1], home_wgs[0])
# 最寄り安全避難所(直線距離)…なければ全体から選ぶ
TARGET = safe_shelters if len(safe_shelters) else shelters
METRIC = "EPSG:32654"# 静岡周辺 UTM
hp = gpd.GeoSeries([home_pt], crs=4326).to_crs(METRIC).iloc[0]
tp = TARGET.to_crs(METRIC).copy()
tp["__dist_m__"] = tp.geometry.distance(hp)
dest_pt = TARGET.loc[tp["__dist_m__"].idxmin(), "geometry"]

# ▶ 互換呼び分け
try:
    orig = ox.distance.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.distance.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)
except AttributeError:
    # 新API名(環境によってはこちらが有効)
    orig = ox.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)


# 8) 最短経路(通行可能グラフ)
try:
    route = nx.shortest_path(G2, orig, dest, weight="length")
except nx.NetworkXNoPath:
    raise RuntimeError("到達経路なし。GRAPH_RADIUS_Mを増やすか、別の避難所で再試行を。")

# 9) GPKGへ追記
# #    既にある場合は上書きしたいなら driverオプションを切り替える(ここは追記)
# shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
# safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
# edges.to_file(GPKG, layer="roads_all", driver="GPKG")
# edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")

# 9) GPKGへ書き出し(毎回クリーンに上書き)
common_kwargs = dict(
    driver="GPKG",
    engine="pyogrio",
    layer_options={"OVERWRITE": "YES"},  # 既存レイヤーを置き換える
)

shelters.to_file(GPKG, layer="shelters_all",   **common_kwargs)
safe_shelters.to_file(GPKG, layer="shelters_safe", **common_kwargs)
edges.to_file(GPKG,    layer="roads_all",     **common_kwargs)
edges_keep.to_file(GPKG, layer="roads_safe",  **common_kwargs)


# 10) Folium地図
m = folium.Map(location=[home_wgs[0], home_wgs[1]], zoom_start=12, control_scale=True)
folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定(A40)").add_to(m)
folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)

# 経路を線で描画
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip="避難経路").add_to(m)

# 出発・目的地
folium.Marker([home_wgs[0], home_wgs[1]], popup="現在地", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([dest_pt.y, dest_pt.x], popup="避難所", icon=folium.Icon(color="blue")).add_to(m)

# 避難所レイヤ(安全のみ強調)
folium.GeoJson(safe_shelters.__geo_interface__, name="安全な避難所",
               marker=folium.CircleMarker(radius=4)).add_to(m)
folium.LayerControl().add_to(m)

# 保存(任意)
m.save("outputs/evac.html")
print("✓ 完了: GPKG追記 & outputs/evac.html を作成")

# # ルートの延長[m]は G2 の edge 属性 'length' を合計(OSMnxが付けてくれてます)
# meters = sum(ox.utils_graph.get_route_edge_attributes(G2, route, 'length'))
# walk_kmh = 4.8   # 歩行速度(必要に応じて変更)
# drive_kmh = 20.0 # 災害時の車想定の控えめ値(使うなら)

# eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
# eta_drive_min = meters / (drive_kmh * 1000 / 60)
# print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")

# ▼ 距離[m]の取り方をバージョン互換で
try:
    # 新しめのOSMnx(推奨): ルートをGDF化して length を合計
    route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
    meters = float(route_edges["length"].sum())
except Exception:
    # フォールバック: NetworkXからエッジ属性 length を合計
    meters = 0.0
    for u, v in zip(route[:-1], route[1:]):
        ed = G2.get_edge_data(u, v)  # MultiDiGraph: {key: attr_dict}
        if isinstance(ed, dict) and len(ed):
            meters += min(d.get("length", 0.0) for d in ed.values())

# ETA(必要なら速度を調整)
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")

# Folium のツールチップにも表示
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分"
).add_to(m)


from shapely.geometry import LineString
import geopandas as gpd

route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]
route_gdf = gpd.GeoDataFrame(
    {"meters":[meters], "eta_min_walk":[eta_walk_min], "eta_min_drive":[eta_drive_min]},
    geometry=[LineString(route_lonlat)], crs="EPSG:4326"
)
route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

import geopandas as gpd
# ok = not route_gdf.intersects(tsunami.unary_union).iloc[0]
# print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")

# 旧: tsunami.unary_union  ← 警告が出る
# 新:
flood_union = tsunami.geometry.union_all()
ok = not route_gdf.intersects(flood_union).iloc[0]
print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")

# 
# === ルート検証 → ETA → 保存(キャッシュ対応, 上書き&追記の方針を反映) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import pandas as pd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx

GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654"  # 静岡周辺

# 0) 津波 union はキャッシュを優先(なければ作る)→ 上書き耐性ありで作成可
if "tsunami_union" in listlayers(GPKG):
    flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    flood_union.to_file(
        GPKG, layer="tsunami_union",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}  # (3) 上書き耐性
    )

# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'

# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
    payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
                radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)

# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
    cache = gpd.read_file(GPKG, layer="route_cache")
    hit = cache.loc[cache["rid"] == key]
    if len(hit):
        route_gdf = hit.set_crs(4326)
        meters = float(route_gdf["meters"].iloc[0])
        eta_walk_min  = float(route_gdf["eta_min_walk"].iloc[0])
        eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
        eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
        coords = list(route_gdf.geometry.iloc[0].coords)
        route_latlon = [(lat, lon) for lon, lat in coords]  # folium用 (lat,lon)
        use_cached_route = True
        print("↩︎ ルート: キャッシュヒット")

if not use_cached_route:
    # 3) これまで通りの経路('route')から LineString を作成
    route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
    route_geom = LineString(route_lonlat)
    route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

    # 4) 堅牢チェック(交差ゼロ&境界近接なし)
    route_m = route_gdf.to_crs(METRIC)
    flood_m = flood_union.to_crs(METRIC)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty

    if inside_len_m > 0 or near:
        raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                           "別避難所 or 半径拡大で再探索してください。")

    # 5) 距離[m]→ETA
    try:
        route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u, v in zip(route[:-1], route[1:]):
            ed = G2.get_edge_data(u, v)
            meters += min(d.get("length", 0.0) for d in ed.values())

    walk_kmh = 4.8
    drive_kmh = 20.0
    eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
    eta_drive_min = meters / (drive_kmh * 1000 / 60)
    eta_walk_safe = eta_walk_min * 1.3

    # 6) route_latest へ保存(検証済みのみ)→ 上書き
    out_gpkg = GPKG
    route_gdf = gpd.GeoDataFrame(
        {
            "meters":[meters],
            "eta_min_walk":[eta_walk_min],
            "eta_min_walk_safe":[eta_walk_safe],
            "eta_min_drive":[eta_drive_min],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326",
    )
    route_gdf.to_file(
        out_gpkg, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}  # (1) 上書き
    )
    print("✅ 交差なし・保存完了")

    # 7) ルートキャッシュに追記(pyogrio 流儀)
    route_cache_rec = route_gdf.copy()
    route_cache_rec["rid"] = key
    try:
        route_cache_rec.to_file(
            out_gpkg, layer="route_cache",
            driver="GPKG", engine="pyogrio",
            append=True                     # (2) 追記
        )
    except Exception:
        # フォールバック:スキーマ差や古い環境でも確実に追記する
        if "route_cache" in listlayers(out_gpkg):
            old = gpd.read_file(out_gpkg, layer="route_cache")
            cols = sorted(set(old.columns) | set(route_cache_rec.columns))
            old = old.reindex(columns=cols)
            new = route_cache_rec.reindex(columns=cols)
            merged = gpd.GeoDataFrame(pd.concat([old, new], ignore_index=True), crs=4326)
            merged.to_file(
                out_gpkg, layer="route_cache",
                driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"}  # マージして上書き
            )
        else:
            route_cache_rec.to_file(
                out_gpkg, layer="route_cache",
                driver="GPKG", engine="pyogrio"
            )

    # 8) folium用座標
    route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]

# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

の後に

import geopandas as gpd
gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]

を実行すると

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[31], line 2
      1 import geopandas as gpd
----> 2 gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/geodataframe.py:1896, in GeoDataFrame.__getitem__(self, key)
   1890 def __getitem__(self, key):
   1891     """
   1892     If the result is a column containing only 'geometry', return a
   1893     GeoSeries. If it's a DataFrame with any columns of GeometryDtype,
   1894     return a GeoDataFrame.
   1895     """
-> 1896     result = super().__getitem__(key)
   1897     # Custom logic to avoid waiting for pandas GH51895
   1898     # result is not geometry dtype for multi-indexes
   1899     if (
   1900         pd.api.types.is_scalar(key)
   1901         and key == ""
   (...)   1904         and not is_geometry_type(result)
   1905     ):

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pandas/core/frame.py:4108, in DataFrame.__getitem__(self, key)
   4106     if is_iterator(key):
   4107         key = list(key)
-> 4108     indexer = self.columns._get_indexer_strict(key, "columns")[1]
   4110 # take() does not accept boolean indexers
   4111 if getattr(indexer, "dtype", None) == bool:

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pandas/core/indexes/base.py:6200, in Index._get_indexer_strict(self, key, axis_name)
   6197 else:
   6198     keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr)
-> 6200 self._raise_if_missing(keyarr, indexer, axis_name)
   6202 keyarr = self.take(indexer)
   6203 if isinstance(key, Index):
   6204     # GH 42790 - Preserve name from an Index

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pandas/core/indexes/base.py:6252, in Index._raise_if_missing(self, key, indexer, axis_name)
   6249     raise KeyError(f"None of [{key}] are in the [{axis_name}]")
   6251 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique())
-> 6252 raise KeyError(f"{not_found} not in index")

KeyError: "['eta_min_walk_safe'] not in index"

となった

GPTによれば
原因は2つのどちらか(または両方

途中の「一回目の書き出し」で eta_min_walk_safe を含まないスキーマが作られた

最初に meters / eta_min_walk / eta_min_drive だけを持つ route_gdf を
書き出す箇所があります(OVERWRITE なし)。
この時点で route_latest の列定義が確定 します。
その後に eta_min_walk_safe を追加して上書きしても、環境によっては列が残らない/読み出し側で見えないことがあります

route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

がその部分

次の理由として
キャッシュヒット分岐では route_latest を書き直していない
ルート: キャッシュヒット になった場合、後段の

# 6) route_latest へ保存(…)
route_gdf.to_file(... OVERWRITE="YES")

がが実行されません。結果、古いスキーマのまま

このため

from shapely.geometry import LineString
import geopandas as gpd

route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]
route_gdf = gpd.GeoDataFrame(
    {"meters":[meters], "eta_min_walk":[eta_walk_min], "eta_min_drive":[eta_drive_min]},
    geometry=[LineString(route_lonlat)], crs="EPSG:4326"
)
# route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

の中の

# ❌ これを削除:
route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

をコメントアウト

どの分岐でも同じスキーマで必ず上書き保存するように
共通の保存関数を作り、キャッシュヒット時も呼ぶようにする

from datetime import datetime

def save_route_latest(out_gpkg, route_geom, meters, eta_walk_min, eta_drive_min, eta_walk_safe):
    gdf = gpd.GeoDataFrame(
        {
            "meters":[float(meters)],
            "eta_min_walk":[float(eta_walk_min)],
            "eta_min_walk_safe":[float(eta_walk_safe)],
            "eta_min_drive":[float(eta_drive_min)],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326"
    )
    gdf.to_file(
        out_gpkg, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}
    )

無限ループの対処

一度 notebook を止めて再起動し
以下のコードを実行した
結果は以下の通り

import requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

def parse_entry_xml(url):
    r = requests.get(url, timeout=15)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波領域の抽出
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if kind_name:
            kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    return {
        "title": title, "report_dt": report_dt,
        "info_kind": info_kind, "info_type": info_type,
        "headline": headline, "kinds": list(set(kinds)),
        "areas": areas, "url": url
    }

def is_tsunami_trigger(rec):
    # “津波”を広めに判定(タイトル・見出し・InfoKind)
    text_pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in text_pool:
        return False
    # 強ワード
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in text_pool for k in kw):
        return True
    # あるいはKindsに入っていればOK
    return any(k in rec.get("kinds", []) for k in kw)

# ---- ドライラン:feed取得→最新5件だけ詳細をなめて出力 ----
r = requests.get(EQVOL_FEED, timeout=15)
print("feed status:", r.status_code, "bytes:", len(r.content))
d = feedparser.parse(r.text)
print("entries:", len(d.entries))
for e in d.entries[:5]:
    print("ENTRY:", e.get("published",""), e.get("title",""))
    try:
        rec = parse_entry_xml(e.link)
        print("  InfoKind:", rec["info_kind"], "| InfoType:", rec["info_type"])
        print("  Title   :", rec["title"])
        print("  Headline:", (rec["headline"] or "")[:120])
        print("  kinds   :", rec["kinds"], "| areas:", len(rec["areas"]))
        print("  -> TRIGGER?", is_tsunami_trigger(rec))
    except Exception as ex:
        print("  ERROR parsing:", ex)

を実行

feed status: 200 bytes: 27328
entries: 37
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 薩摩硫黄島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 諏訪之瀬島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 霧島山(新燃岳) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 浅間山 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 草津白根山(白根山(湯釜付近)) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False

もしこれが
entries=0 → フィード取得が怪しい(ネットワーク・一時ブロック)。requests.get(EQVOL_FEED).text[:2000]をprintして中身を確認
entries>0かつInfoKindやTitleに「津波」が無い → 正常(いま津波電文が無いだけ)
parse_entry_xmlでエラー → XML名前空間/パスが電文種別で違う可能性。print(rec[“url”])で該当XMLを保存して中身を見るのが早い

というように原因を理解できる

補足(実運用へ進めるとき)
静岡沿岸の津波予報区コードでareasをフィルタ(対象外は通知しない)
InfoType == “取消”で状態を解除に遷移
併せてextra.xmlも監視すると取りこぼしが減ります(仕様差し込みは後でOK)
ノートブックではなく常駐スクリプト/systemdで運用し、ログをローテーション

他にやったことは
1. 通信・ネットワークの確認
まず、最初の requests.get(EQVOL_FEED) で通信が成功しているかを確かめる

import requests
EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"

r = requests.get(EQVOL_FEED, timeout=15)
print("status:", r.status_code)
print(r.text[:1000])  # 最初の1000文字だけ表示

✅ status: 200 なら通信成功。
❌ もし 403 や Timeout が出た場合 → 通信がブロックされている(VPN・プロキシ・ファイアウォールなどの影響)。

結果は

status: 200
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" lang="ja">
  <title>高頻度(地震火山)</title>
  <subtitle>JMAXML publishing feed</subtitle>
  <updated>2025-11-06T05:01:14+09:00</updated>
  <id>https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml#short_1762372874</id>
  <link rel="related" href="https://www.jma.go.jp/"/>
  <link rel="self" href="https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"/>
  <link rel="hub" href="http://alert-hub.appspot.com/"/>
  <rights type="html"><![CDATA[
<a href="https://www.jma.go.jp/jma/kishou/info/coment.html">利用規約</a>,
<a href="https://www.jma.go.jp/jma/en/copyright.html">Terms of Use</a>
]]></rights>
  <entry>
    <title>降灰予報(定時)</title>
    <id>https://www.data.jma.go.jp/developer/xml/data/20251105200049_0_VFVO53_010000.xml</id>
    <updated>2025-11-05T20:00:00Z</updated>
    <author>
      <name>気象庁</name>
    </author>
    <link type="application/xml" href="https://ww

カーネル・出力の健全性チェック
これで何も出ないなら、セルが実行されていない/カーネルが固まっている/出力パネルが壊れている可能性が濃厚

import sys, time, os, pathlib, subprocess

print("STEP A1: print works?", flush=True)
sys.stdout.write("STEP A2: stdout.write works?\n"); sys.stdout.flush()
sys.stderr.write("STEP A3: stderr works?\n"); sys.stderr.flush()

for i in range(3):
    print(f"tick {i}", flush=True)
    time.sleep(0.5)

# ファイル出力で実行確認(UI出力が壊れていても痕跡が残る)
pathlib.Path("notebook_alive.txt").write_text("alive\n")
print("STEP A4: wrote notebook_alive.txt", flush=True)

# サブプロセスの標準出力も確認
ret = subprocess.run(["echo", "STEP A5: subprocess echo"], capture_output=True, text=True)
print(ret.stdout, end="")
print("STEP A DONE", flush=True)

結果は

STEP A1: print works?
STEP A2: stdout.write works?
STEP A3: stderr works?
tick 0
tick 1
tick 2
STEP A4: wrote notebook_alive.txt
STEP A5: subprocess echo
STEP A DONE

しかし
再度

# 津波情報の取得
import time
import requests
import feedparser
import xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

last_modified = None
seen_ids = set()

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波関連の抽出(領域と種別)
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })

    return {
        "title": title,
        "report_dt": report_dt,
        "info_kind": info_kind,
        "info_type": info_type,
        "headline": headline,
        "kinds": list(set(kinds)),
        "areas": areas,
        "url": url,
    }

def is_tsunami_trigger(rec):
    # 種別に“津波警報・注意報・予報/津波情報”などが含まれるものを対象
    if "津波" not in (rec["info_kind"] or rec["title"] or rec["headline"]):
        return False
    # 警報/注意報/予報の明示があれば強トリガー
    keywords = ["大津波警報", "津波警報", "津波注意報", "津波予報", "津波情報"]
    if any(k in (rec["headline"] or "") for k in keywords):
        return True
    # 後方互換(kindsに入っていればOK)
    return any(k in keywords for k in rec["kinds"])

def loop():
    global last_modified
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)
                # 新しい順に処理
                for e in d.entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id)
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec):
                        # === ここであなたの通知/保存パイプラインへ ===
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"], rec["kinds"][:3], "areas=", len(rec["areas"]))
            if lm:
                last_modified = lm
        except Exception as ex:
            print("error:", ex)
        time.sleep(60)

if __name__ == "__main__":
    loop()

を実行すると終わらない

ノートブックで「終わらない」のは、**無限ループ中に“トリガー時しかprintしない”**から

= 津波電文が無い通常時は1分ごとに静かにsleep→再取得、を永遠に繰り返す

すぐ直すポイント(Notebook向け)
1. デバッグ出力を入れて「今なにしてるか」を毎ループ表示(flushも)
2. ループ回数に上限(MAX_LOOPS)を付けてセルが必ず終わる
3. 取得件数・新規処理件数・直近タイトルなどを表示
4. 例外時にURLも表示して切り分けしやすく

その前に処理を停止する方法を調べる

すぐ止める(今まさに動いているセルを止める)
* JupyterLab:Esc → I を素早く2回(= Interrupt)。またはツールバーの■(Stop)をクリック
* Classic Notebook:メニュー Kernel → Interrupt
* Google Colab:メニュー ランタイム → 実行を中断(ショートカット:Ctrl/Cmd + M, I)
* それでも止まらない/固まる時:Kernel → Restart(再起動)
これで KeyboardInterrupt が発生し、セルの実行が中断されます。

ということで
🔳アイコンをクリックして処理を停止

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[4], line 97
     94         time.sleep(60)
     96 if __name__ == "__main__":
---> 97     loop()

Cell In[4], line 94, in loop()
     92 except Exception as ex:
     93     print("error:", ex)
---> 94 time.sleep(60)

KeyboardInterrupt: 

と表示された

停止確認できたので
デバッグ出力を入れて「今なにしてるか」を毎ループ表示(flushも)
ループ回数に上限(MAX_LOOPS)を付けてセルが必ず終わる
取得件数・新規処理件数・直近タイトルなどを表示
例外時にURLも表示して切り分けしやすく

を入れる

Notebook向けに
* 3周で必ず終了(MAX_LOOPS=3)
* 毎ループの状況を表示(entries / new / last_modified と直近タイトル)
* 変更なしのときは 304 Not Modified と出力
になるので、「動いているのに何も出ない」を防げる

小さな実用メモ
* 早く挙動を見たいときは SLEEP_SEC = 15 などにするとテンポ良く確認できます。
* 何度かセルを繰り返し実行するなら、毎回まっさらで試すために先頭で seen_ids = set() を明示的に初期化しておくと安心です。
* 津波電文が出たときだけ [TSUNAMI TRIGGER] … が出ます(通常時は出ません)。
次の一手(お好みで)
* 対象地域フィルタ:静岡沿岸などの予報区コードだけ通知したい場合は、parse_entry_xml() で得た rec[“areas”] を使って絞れます(例:if not any(a[“code”] in SHIZUOKA_CODES for a in rec[“areas”]): return False)。コードは後で差し込みましょう(リストは用意します)。
* 見栄えの良い停止:loop() を try: … except KeyboardInterrupt: print(“Stopped by user”) で包むとトレース無しで止まります。
* 常駐運用:Notebook検証が終わったら MAX_LOOPS を外し、systemd などで常駐+ログローテーションにすると本番向きです。

ということで

# Notebookデモ用:必ず終わる・状況が見える版
import time, requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

last_modified = None
seen_ids = set()
MAX_LOOPS = 3          # ★ Notebookでは必ず終わらせる
SLEEP_SEC  = 60        # デモなら 15 にしてもOK
DEBUG = True

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if kind_name:
            kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    return {
        "title": title, "report_dt": report_dt, "info_kind": info_kind, "info_type": info_type,
        "headline": headline, "kinds": list(set(kinds)), "areas": areas, "url": url
    }

def is_tsunami_trigger(rec):
    pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in pool:
        return False
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in pool for k in kw): 
        return True
    return any(k in rec.get("kinds", []) for k in kw)

def loop():
    global last_modified
    loops = 0
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)
                new_cnt = 0
                last_titles = []
                for e in d.entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id)
                    new_cnt += 1
                    last_titles.append(e.title)
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec):
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"], rec["kinds"][:3], "areas=", len(rec["areas"]), flush=True)
                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] entries={len(d.entries)} new={new_cnt} last_modified={lm}", flush=True)
                    if last_titles:
                        print("  recent:", " / ".join(last_titles[:3])[:160], flush=True)
                    else:
                        print("  recent: (no new entries this cycle)", flush=True)
            else:
                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] 304 Not Modified(変更なし)", flush=True)
            if lm:
                last_modified = lm
        except Exception as ex:
            print("error:", ex, flush=True)
        loops += 1
        if loops >= MAX_LOOPS:
            print("done (MAX_LOOPS reached)", flush=True)
            break
        time.sleep(SLEEP_SEC)

loop()

に変更して実行する

ログは「フィード取得は成功・“いま津波電文はない”・初回なので new=33(初回は未既知IDがないため全部新規扱い)」という正常動作

最近流れているのが降灰予報(火山)なので、[TSUNAMI TRIGGER] は出ない

無駄なXML取得を減らして津波だけを見る・静岡沿岸だけ通知・既読IDを永続化の3点を足す

# 津波専用・Notebook検証向け(津波以外はスキップ、静岡コードで絞り、既読IDを永続化)
import time, json, os
import requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

# --- 静岡沿岸の津波予報区コード(代表例):必要に応じて追加 ---
SHIZUOKA_CODES = {
    "504",  # 遠州灘
    "505",  # 駿河湾
    # 必要なら追加(JMAコード表に準拠)
}

last_modified = None
SEEN_FILE = "seen_ids.json"
seen_ids = set(json.load(open(SEEN_FILE)) if os.path.exists(SEEN_FILE) else [])

MAX_LOOPS = 3
SLEEP_SEC = 60
DEBUG = True

def save_seen():
    try:
        with open(SEEN_FILE, "w") as f:
            json.dump(list(seen_ids), f)
    except Exception:
        pass

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    head = root.find(".//jmx_ib:Head", NAMESP)

    rec = {
        "title":     head.findtext("jmx_ib:Title", default="", namespaces=NAMESP),
        "report_dt": head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP),
        "info_kind": head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP),
        "info_type": head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP),
        "headline":  head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP),
        "kinds": [],
        "areas": [],
        "url": url,
    }

    # 津波領域(情報タイプ=津波予報領域表現)
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        k = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if k: rec["kinds"].append(k)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            rec["areas"].append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    rec["kinds"] = list(set(rec["kinds"]))
    return rec

def is_tsunami_trigger(rec):
    pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in pool:
        return False
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in pool for k in kw): 
        return True
    return any(k in rec.get("kinds", []) for k in kw)

def is_target_area(rec):
    if not rec["areas"]:
        # 領域表現が無い電文は地域未特定なので一旦通す or 無視するかは方針次第
        return True
    return any(a.get("code") in SHIZUOKA_CODES for a in rec["areas"])

def loop():
    global last_modified
    loops = 0
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)

                # ★ フィード段階で「津波」を含むエントリだけに早期フィルタ
                tsunami_entries = [e for e in d.entries 
                                   if "津波" in (e.title or "") or "tsunami" in (e.title or "").lower()]

                new_cnt = 0
                for e in tsunami_entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id); new_cnt += 1
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec) and is_target_area(rec):
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"],
                              rec["kinds"][:3], "areas=", len(rec["areas"]), flush=True)

                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] entries={len(d.entries)} tsunami_only={len(tsunami_entries)} new={new_cnt} last_modified={lm}", flush=True)

                save_seen()
            else:
                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] 304 Not Modified(変更なし)", flush=True)

            if lm:
                last_modified = lm

        except Exception as ex:
            print("error:", ex, flush=True)

        loops += 1
        if loops >= MAX_LOOPS:
            print("done (MAX_LOOPS reached)", flush=True)
            break
        time.sleep(SLEEP_SEC)

loop()

で実行

[06:25:13] entries=33 tsunami_only=0 new=0 last_modified=Fri, 07 Nov 2025 20:01:38 GMT
[06:26:13] 304 Not Modified(変更なし)
[06:27:14] 304 Not Modified(変更なし)
done (MAX_LOOPS reached)

となる

tsunami_only=0 は「今回のフィードのエントリータイトルに“津波”が一件も無い」という意味で、現状は降灰や地震系だけ=津波電文が出ていない状態です。
ただし——タイトルに“津波”が無くても本体XML側が津波情報というケースがあるため、実運用では「タイトルで早期フィルタ」は外して、直近N件だけ中身を開いて判定するのが堅い

安全版(タイトルで絞らず、直近N件だけ本文チェック/extra.xml併走)
直近 MAX_FETCH=10 件だけ本文XMLを開いて InfoKind/Headline から津波判定
eqvol.xml と extra.xml を同じロジックで監視
既読IDはフィードごとに保存

# 津波検知(タイトルで絞らず本文チェック)+ extra.xml 併走 / Notebook用
import time, json, os, requests, feedparser, xml.etree.ElementTree as ET

FEEDS = {
    "eqvol": "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml",
    "extra": "https://www.data.jma.go.jp/developer/xml/feed/extra.xml",
}

NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

# 静岡の津波予報区コード(例)
SHIZUOKA_CODES = {"504", "505"}  # 遠州灘/駿河湾。必要なら追加

MAX_FETCH = 10         # 各サイクルで本文を開く最大件数
MAX_LOOPS = 3          # Notebook用:必ず終わる
SLEEP_SEC = 60
DEBUG = True

state = {name: {"last_modified": None, "seen": set()} for name in FEEDS}

def _seen_path(feed_name): return f"seen_ids_{feed_name}.json"

# 既読IDのロード
for name in FEEDS:
    p = _seen_path(name)
    if os.path.exists(p):
        try:
            state[name]["seen"] = set(json.load(open(p)))
        except Exception:
            pass

def save_seen(feed_name):
    try:
        json.dump(list(state[feed_name]["seen"]), open(_seen_path(feed_name), "w"))
    except Exception:
        pass

def fetch_feed(feed_name, url):
    headers = {}
    lm = state[feed_name]["last_modified"]
    if lm: headers["If-Modified-Since"] = lm
    r = requests.get(url, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    head = root.find(".//jmx_ib:Head", NAMESP)

    rec = {
        "title": head.findtext("jmx_ib:Title", default="", namespaces=NAMESP),
        "report_dt": head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP),
        "info_kind": head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP),
        "info_type": head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP),
        "headline": head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP),
        "kinds": [], "areas": [], "url": url,
    }
    # 津波予報領域
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        k = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if k: rec["kinds"].append(k)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            rec["areas"].append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    rec["kinds"] = list(set(rec["kinds"]))
    return rec

def is_tsunami_trigger(rec):
    pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in pool:  # 見出し系に「津波」が登場しないものは通常スキップ
        return False
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in pool for k in kw): return True
    return any(k in rec.get("kinds", []) for k in kw)

def is_target_area(rec):
    # 領域未記載なら方針次第。ここでは一旦通す。
    if not rec["areas"]: return True
    return any(a.get("code") in SHIZUOKA_CODES for a in rec["areas"])

def watch_cycle():
    total_new = 0
    for name, url in FEEDS.items():
        try:
            feed_text, lm = fetch_feed(name, url)
            if not feed_text:
                if DEBUG: print(f"[{name}] 304 Not Modified")
                continue

            d = feedparser.parse(feed_text)
            # まだ見てない最新順から MAX_FETCH 件だけ本文チェック
            new_entries = [e for e in d.entries if e.id not in state[name]["seen"]][:MAX_FETCH]
            total_new += len(new_entries)

            for e in new_entries:
                state[name]["seen"].add(e.id)
                rec = parse_entry_xml(e.link)
                if is_tsunami_trigger(rec) and is_target_area(rec):
                    print("[TSUNAMI TRIGGER]", name, rec["report_dt"], rec["headline"],
                          rec["kinds"][:3], "areas=", len(rec["areas"]), flush=True)

            if DEBUG:
                print(f"[{name}] entries={len(d.entries)} checked={len(new_entries)} last_modified={lm}")

            save_seen(name)
            if lm: state[name]["last_modified"] = lm

        except Exception as ex:
            print(f"[{name}] error:", ex, flush=True)
    return total_new

# ===== Notebook用メインループ =====
loops = 0
while loops < MAX_LOOPS:
    _ = watch_cycle()
    loops += 1
    if loops >= MAX_LOOPS:
        print("done (MAX_LOOPS reached)")
        break
    time.sleep(SLEEP_SEC)

で実行

タイトルに“津波”が無いせいで tsunami_only=0 になる問題を回避(本文で判定)
フィード2本(eqvol/extra)を同ロジックで監視
既読の永続化により、Notebookを再実行しても初回から「全部new」にならない
直近 MAX_FETCH のみ本文取得で無駄アクセス抑制

結果は

[eqvol] entries=33 checked=10 last_modified=Fri, 07 Nov 2025 20:01:38 GMT
[extra] entries=106 checked=10 last_modified=Fri, 07 Nov 2025 21:24:15 GMT
[eqvol] 304 Not Modified
[extra] 304 Not Modified

ログの意味
* [eqvol] entries=33 checked=10:フィードに33件、今回は未読の先頭10件だけ本文を精査
* [extra] entries=106 checked=10:同上
* 304 Not Modified:前回から更新なし=正常
いまは津波電文が無いのでトリガーなし=想定どおり

とりあえず動作はしそうなので
次は通知の方法を考える

津波警報などの情報を検知できるようにしたい

EEW/揺れ検知トリガーの作成をしようと思ったが
これでGPTで行うと実際に揺れを検知するような仕組みを提案される

そうではなく
前回の津波警報の時にはロシアのカムチャッカ半島沖の地震で
津波警報が出ていたので、そういった情報を検知できるようにしたい

また提案の中で
*JMA防災情報XMLの実パーサ(地震/津波の要素抽出)*
とあったが、これがどのようなものかを知りたい

回答は
「端末の揺れを検知する」のではなく、気象庁の公式配信(JMA防災情報XML)から“津波関連の電文”を監視してトリガーを発火させたい

JMA防災情報XMLの“実パーサ”とは

気象庁は“PULL型”のAtomフィードを公開しています。代表的には:
* 地震・火山用(高頻度):eqvol.xml(毎分更新)
ここから「津波警報・注意報・予報」「津波情報」「震源・震度」などの電文へのリンクを取得します。 data.e-gov.go.jp
* 随時情報(高頻度):extra.xml(毎分更新)
広義の警報・注意報系の新着が流れます。用途に応じて併用。 data.e-gov.go.jp
配信の概要・取得方法は公式「防災情報XML(PULL型)」にまとまっています(Pushは終了済み)。 xml.kishou.go.jp+1
例:カムチャッカ半島沖で地震発生 → 日本沿岸に影響が見込まれる場合、**「津波警報・注意報・予報」「津波情報」**の電文が発表され、上記フィードに載ります。発表・解除・更新も電文として流れます。

「実パーサ」とは何か?
ここでの“実パーサ”は、JMA XMLの仕様に沿って、関心の要素(見出し、発表種別、対象地域、予想津波高、発表時刻、取消/解除、更新理由など)を確実に抽出する処理を指します。
JMA XMLは複数の名前空間・電文種別で構成され、代表的な抽出ポイントは以下です:

* ヘッダ系(共通)
Head/Title, Head/ReportDateTime, Head/TargetDateTime, Head/Headline/Text, Head/InfoKind, Head/InfoType(発表/訂正/取消 など) xml.kishou.go.jp
* 津波系
Body/Tsunami(jmx_tsunami:名前空間)やInformation[@type=”津波予報領域表現”]配下の
Item/Kind/Name(大津波警報/津波警報/津波注意報/津波予報 等)と
Areas/Area/Name, Code(津波予報区とコード)を列挙。更新・解除時もここに反映。 xml.kishou.go.jp+1
* 地震系(参考)
震源・震度情報やEEW(公開範囲に制約あり)等のjmx_seis:要素。津波判断の参考に使うことがあります(ただし“揺れ検知”はしない)。 data.e-gov.go.jp
JMAは詳細な解説・例示を公開しています(2012/2013年版などが基本仕様をカバー)。パース時の要素階層・コード表はここを参照するのが安全です。 xml.kishou.go.jp

運用のコツ(トリガー条件)
* 監視間隔:毎分。If-Modified-Since等で負荷と無駄取得を抑制。 Qiita
* 一次トリガー:新規・更新電文のItem/Kind/Nameに
「大津波警報 / 津波警報 / 津波注意報 / 津波予報」が含まれる、かつ Areas/Area/Codeが対象地域に一致。 xml.kishou.go.jp
* 抑止・更新:Head/InfoTypeが「取消」「訂正」「追加」などの場合は状態遷移として扱う。
* 表示:JMAマップで最新状況を補助表示(UIリンク)。 気象庁

仕上げ:あなたのシステムへの組み込みポイント
1. 地域フィルタ:静岡沿岸など対象の津波予報区コードだけに絞る。
2. 状態管理:InfoTypeが取消のときは通知解除、訂正/追加は更新扱い。 xml.kishou.go.jp
3. 可視化:ダッシュボードにJMA公式の最新表示もリンクで添える(家族向けに分かりやすい)。 気象庁
4. 安全側設計:不明時は強めの文言で注意喚起(あなたの既存“安全側”方針に合わせる)。
5. ログ/再現性:電文URL・ReportDateTime・InfoKindVersion等を保存(事後検証が楽)。 気象庁

必要なら、対象予報区のコード表や「解除検知」「詳細(予想高、到達予想時刻が文面に含まれる場合の抽出)」まで組み込んだ“完成版パーサ”に拡張します。
この方向なら“揺れ検知ハードウェア”は不要で、カムチャッカ沖など国外震源でも日本向け津波電文を確実にトリガーできます。

生成されたコードを実行

import time
import requests
import feedparser
import xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

last_modified = None
seen_ids = set()

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波関連の抽出(領域と種別)
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })

    return {
        "title": title,
        "report_dt": report_dt,
        "info_kind": info_kind,
        "info_type": info_type,
        "headline": headline,
        "kinds": list(set(kinds)),
        "areas": areas,
        "url": url,
    }

def is_tsunami_trigger(rec):
    # 種別に“津波警報・注意報・予報/津波情報”などが含まれるものを対象
    if "津波" not in (rec["info_kind"] or rec["title"] or rec["headline"]):
        return False
    # 警報/注意報/予報の明示があれば強トリガー
    keywords = ["大津波警報", "津波警報", "津波注意報", "津波予報", "津波情報"]
    if any(k in (rec["headline"] or "") for k in keywords):
        return True
    # 後方互換(kindsに入っていればOK)
    return any(k in keywords for k in rec["kinds"])

def loop():
    global last_modified
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)
                # 新しい順に処理
                for e in d.entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id)
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec):
                        # === ここであなたの通知/保存パイプラインへ ===
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"], rec["kinds"][:3], "areas=", len(rec["areas"]))
            if lm:
                last_modified = lm
        except Exception as ex:
            print("error:", ex)
        time.sleep(60)

if __name__ == "__main__":
    loop()

しかしこれだと動作しない
実際には処理が長すぎるのが原因だったようだ

以下の内容を notebook を再起動して実行したら

原因はだいたい次のどれかです:
1. いま津波系の電文が出ていない → フィルタに引っかからず1行もprintされない
2. eqvol.xmlは取れているが、電文のInfoKindやHeadlineに「津波」が無くて弾かれている
3. 取得自体が失敗(304やネットワーク)しているが、分かるログを出していない
4. notebookで無限ループを回していて、動いてはいるが“何もヒットしないから何も出ない”
まずは「取れている?パースできてる?何が弾かれてる?」を確認する**ドライラン(1回だけ走る)**を入れると状況が見えます。下のテスト版を一度そのまま実行してください(無限ループなし・デバッグprint多め)。

なので

import requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

def parse_entry_xml(url):
    r = requests.get(url, timeout=15)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波領域の抽出
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if kind_name:
            kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    return {
        "title": title, "report_dt": report_dt,
        "info_kind": info_kind, "info_type": info_type,
        "headline": headline, "kinds": list(set(kinds)),
        "areas": areas, "url": url
    }

def is_tsunami_trigger(rec):
    # “津波”を広めに判定(タイトル・見出し・InfoKind)
    text_pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in text_pool:
        return False
    # 強ワード
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in text_pool for k in kw):
        return True
    # あるいはKindsに入っていればOK
    return any(k in rec.get("kinds", []) for k in kw)

# ---- ドライラン:feed取得→最新5件だけ詳細をなめて出力 ----
r = requests.get(EQVOL_FEED, timeout=15)
print("feed status:", r.status_code, "bytes:", len(r.content))
d = feedparser.parse(r.text)
print("entries:", len(d.entries))
for e in d.entries[:5]:
    print("ENTRY:", e.get("published",""), e.get("title",""))
    try:
        rec = parse_entry_xml(e.link)
        print("  InfoKind:", rec["info_kind"], "| InfoType:", rec["info_type"])
        print("  Title   :", rec["title"])
        print("  Headline:", (rec["headline"] or "")[:120])
        print("  kinds   :", rec["kinds"], "| areas:", len(rec["areas"]))
        print("  -> TRIGGER?", is_tsunami_trigger(rec))
    except Exception as ex:
        print("  ERROR parsing:", ex)

を実行

feed status: 200 bytes: 27328
entries: 37
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 薩摩硫黄島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 諏訪之瀬島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 霧島山(新燃岳) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 浅間山 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 草津白根山(白根山(湯釜付近)) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False

もしこれが

entries=0 → フィード取得が怪しい(ネットワーク・一時ブロック)。requests.get(EQVOL_FEED).text[:2000]をprintして中身を確認

entries>0かつInfoKindやTitleに「津波」が無い → 正常(いま津波電文が無いだけ)

parse_entry_xmlでエラー → XML名前空間/パスが電文種別で違う可能性。print(rec[“url”])で該当XMLを保存して中身を見るのが早い

というように原因を理解できる

補足(実運用へ進めるとき)
静岡沿岸の津波予報区コードでareasをフィルタ(対象外は通知しない)
InfoType == “取消”で状態を解除に遷移
併せてextra.xmlも監視すると取りこぼしが減ります(仕様差し込みは後でOK)
ノートブックではなく常駐スクリプト/systemdで運用し、ログをローテーション

他にやったことは
1. 通信・ネットワークの確認
まず、最初の requests.get(EQVOL_FEED) で通信が成功しているかを確かめる

import requests
EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"

r = requests.get(EQVOL_FEED, timeout=15)
print("status:", r.status_code)
print(r.text[:1000])  # 最初の1000文字だけ表示

✅ status: 200 なら通信成功。
❌ もし 403 や Timeout が出た場合 → 通信がブロックされている(VPN・プロキシ・ファイアウォールなどの影響)。

結果は

status: 200
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" lang="ja">
  <title>高頻度(地震火山)</title>
  <subtitle>JMAXML publishing feed</subtitle>
  <updated>2025-11-06T05:01:14+09:00</updated>
  <id>https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml#short_1762372874</id>
  <link rel="related" href="https://www.jma.go.jp/"/>
  <link rel="self" href="https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"/>
  <link rel="hub" href="http://alert-hub.appspot.com/"/>
  <rights type="html"><![CDATA[
<a href="https://www.jma.go.jp/jma/kishou/info/coment.html">利用規約</a>,
<a href="https://www.jma.go.jp/jma/en/copyright.html">Terms of Use</a>
]]></rights>
  <entry>
    <title>降灰予報(定時)</title>
    <id>https://www.data.jma.go.jp/developer/xml/data/20251105200049_0_VFVO53_010000.xml</id>
    <updated>2025-11-05T20:00:00Z</updated>
    <author>
      <name>気象庁</name>
    </author>
    <link type="application/xml" href="https://ww

2. フィード構造が読めているか確認
feedparser がXMLを正しく認識していないと、entries が空になる

import feedparser

feed = feedparser.parse("https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml")
print("entries:", len(feed.entries))
for e in feed.entries[:5]:
    print(e.title, e.link)

の結果は

カーネル・出力の健全性チェック
これで何も出ないなら、セルが実行されていない/カーネルが固まっている/出力パネルが壊れている可能性が濃厚

import sys, time, os, pathlib, subprocess

print("STEP A1: print works?", flush=True)
sys.stdout.write("STEP A2: stdout.write works?\n"); sys.stdout.flush()
sys.stderr.write("STEP A3: stderr works?\n"); sys.stderr.flush()

for i in range(3):
    print(f"tick {i}", flush=True)
    time.sleep(0.5)

# ファイル出力で実行確認(UI出力が壊れていても痕跡が残る)
pathlib.Path("notebook_alive.txt").write_text("alive\n")
print("STEP A4: wrote notebook_alive.txt", flush=True)

# サブプロセスの標準出力も確認
ret = subprocess.run(["echo", "STEP A5: subprocess echo"], capture_output=True, text=True)
print(ret.stdout, end="")
print("STEP A DONE", flush=True)

結果は

STEP A1: print works?
STEP A2: stdout.write works?
STEP A3: stderr works?
tick 0
tick 1
tick 2
STEP A4: wrote notebook_alive.txt
STEP A5: subprocess echo
STEP A DONE

処理そのものが終わらないのが原因なので原因を探す

まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意

まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意

Epicenter(震源の lat,lon)を渡すと、深海近似の伝播速度で到達時間を安全側に推定し、
route_latest に arrival_time_min / height_class / confidence を追記して上書き保存、
同時に sim_runs に実行ログを追記

既存の変数:GPKG, coast(海岸線), home_wgs(自宅), route_latest は前工程で作成済みを想定
追加依存:pyproj(地球楕円体距離計算用)

# === 津波「到達時間・高さ」簡易推定 (v0) ===
# 目的:震源→自宅最寄り海岸線までの直線距離 / 深海近似速度で到達時間を安全側に推定
# 出力:arrival_time_min, height_class, confidence を route_latest に追記(上書き保存)
#       sim_runs テーブルに実行ログを追記
from datetime import datetime
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from shapely.ops import nearest_points
from pyproj import Geod

GEOD = Geod(ellps="WGS84")

def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
    """
    自宅に最も近い海岸線上の点を返す(WGS84)
    - coast_gdf: 4326 想定(LineString/MultiLineString)
    - home_latlon: (lat,lon)
    """
    if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt = Point(home_latlon[1], home_latlon[0])  # (lon,lat)
    # 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
    coast_wgs["__d__"] = coast_wgs.distance(home_pt)
    geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
    n1, n2 = nearest_points(geom, home_pt)
    return n1  # 海岸線側の最近点(Point lon,lat)

def _geodesic_km(p1_latlon: tuple[float,float], p2_latlon: tuple[float,float]) -> float:
    """
    WGS84 楕円体の測地距離[km]
    """
    az12, az21, m = GEOD.inv(p1_latlon[1], p1_latlon[0], p2_latlon[1], p2_latlon[0])
    return m / 1000.0

def estimate_arrival_minutes_v0(
    epicenter_latlon: tuple[float,float],
    coast_gdf: gpd.GeoDataFrame,
    home_latlon: tuple[float,float],
    h_rep_m: float = 3000.0,       # 代表水深[m](深海)
    safety_slow_factor: float = 1.2  # 速度を遅めに見積もる倍率(到達時間は長くなる)
) -> float:
    """
    深海近似の伝播速度 c ≈ √(g*h) を用い、震源→自宅最寄り海岸までの直線距離から到達時間[分]を推定(安全側)。
    """
    g = 9.81
    c_ms = (g * h_rep_m) ** 0.5              # m/s
    c_ms_safe = c_ms / safety_slow_factor    # 安全側に遅くする
    coast_pt = _nearest_coast_point_to_home(coast_gdf, home_latlon)  # lon,lat
    coast_latlon = (coast_pt.y, coast_pt.x)

    D_km = _geodesic_km(epicenter_latlon, coast_latlon)  # 震源→自宅最寄り海岸
    t_min = (D_km * 1000.0) / c_ms_safe / 60.0
    return float(t_min)

def update_route_latest_with_arrival(
    GPKG: str,
    arrival_time_min: float,
    height_class: str = "unknown",
    confidence: str = "low",
):
    """
    route_latest に arrival_time_min / height_class / confidence を追記して上書き保存。
    route_latest が未作成の場合は何もしない(例外を出さず return)。
    """
    from fiona import listlayers
    if "route_latest" not in listlayers(GPKG):
        print("route_latest レイヤが未作成のため、到達時間の追記はスキップしました。")
        return
    rl = gpd.read_file(GPKG, layer="route_latest")
    # 既存列がなければ追加
    for col in ["arrival_time_min", "height_class", "confidence"]:
        if col not in rl.columns:
            rl[col] = None
    rl.loc[:, "arrival_time_min"] = float(arrival_time_min)
    rl.loc[:, "height_class"] = str(height_class)
    rl.loc[:, "confidence"] = str(confidence)
    rl.to_file(
        GPKG, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}
    )
    print(f"✓ route_latest を更新: 到達 {arrival_time_min:.1f} 分 / 高さ {height_class} / 信頼度 {confidence}")

def append_sim_run(
    GPKG: str,
    epicenter_latlon: tuple[float,float],
    home_latlon: tuple[float,float],
    arrival_time_min: float,
    h_rep_m: float,
    safety_slow_factor: float,
    height_class: str,
    confidence: str,
    source: str = "v0_deep_sea_approx"
):
    """
    sim_runs に今回の推定メタを 1 行追記。
    既存スキーマ差があればマージ→上書きで吸収。
    """
    from fiona import listlayers
    rec = gpd.GeoDataFrame({
        "run_id": [datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        "epicenter_lat": [epicenter_latlon[0]],
        "epicenter_lon": [epicenter_latlon[1]],
        "home_lat": [home_latlon[0]],
        "home_lon": [home_latlon[1]],
        "arrival_time_min": [float(arrival_time_min)],
        "h_rep_m": [float(h_rep_m)],
        "safety_slow_factor": [float(safety_slow_factor)],
        "height_class": [str(height_class)],
        "confidence": [str(confidence)],
        "source": ,
    }, geometry=[Point(epicenter_latlon[1], epicenter_latlon[0])], crs="EPSG:4326")

    try:
        rec.to_file(
            GPKG, layer="sim_runs",
            driver="GPKG", engine="pyogrio",
            append=True
        )
    except Exception:
        # スキーマ差などのフォールバック(既存行を読み出してマージ→上書き)
        if "sim_runs" in listlayers(GPKG):
            old = gpd.read_file(GPKG, layer="sim_runs")
            cols = sorted(set(old.columns) | set(rec.columns))
            merged = gpd.GeoDataFrame(pd.concat([old.reindex(columns=cols),
                                                 rec.reindex(columns=cols)], ignore_index=True), crs=4326)
            merged.to_file(
                GPKG, layer="sim_runs",
                driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"}
            )
        else:
            rec.to_file(GPKG, layer="sim_runs", driver="GPKG", engine="pyogrio")
    print("✓ sim_runs に追記しました。")

次にテスト

# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5)  # (lat, lon)

# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
    epicenter_latlon=epicenter,
    coast_gdf=coast,            # 既に読み込み済みの海岸線(EPSG:4326推奨)
    home_latlon=home_wgs,
    h_rep_m=3000.0,             # 代表水深:3,000m(必要に応じ調整)
    safety_slow_factor=1.2      # 速度を 1/1.2 に落として安全側
)

# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"

# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
    GPKG=GPKG,
    arrival_time_min=arr_min,
    height_class=height_class,
    confidence=confidence
)

# sim_runs にログ追記
append_sim_run(
    GPKG=GPKG,
    epicenter_latlon=epicenter,
    home_latlon=home_wgs,
    arrival_time_min=arr_min,
    h_rep_m=3000.0,
    safety_slow_factor=1.2,
    height_class=height_class,
    confidence=confidence
)

を実行すると



✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low
✓ sim_runs に追記しました。
/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/3983269974.py:27: UserWarning: Geometry is in a geographic CRS. Results from 'distance' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.

  coast_wgs["__d__"] = coast_wgs.distance(home_pt)

となる

原因は
警告は「経度緯度(EPSG:4326)のまま .distance() を使っている」ため
距離計算は投影座標系(m単位)で行うようにすれば解決するらしい

最近点の計算を UTM で実施 → WGS84 に戻して返すようにする

def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
    """
    自宅に最も近い海岸線上の点を返す(WGS84)
    - coast_gdf: 4326 想定(LineString/MultiLineString)
    - home_latlon: (lat,lon)
    """
    if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt = Point(home_latlon[1], home_latlon[0])  # (lon,lat)
    # 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
    coast_wgs["__d__"] = coast_wgs.distance(home_pt)
    geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
    n1, n2 = nearest_points(geom, home_pt)
    return n1  # 海岸線側の最近点(Point lon,lat)

の部分を

def _nearest_coast_point_to_home(
    coast_gdf: gpd.GeoDataFrame,
    home_latlon: tuple[float, float],
    metric_crs: str = "EPSG:32654"  # 静岡周辺 UTM (m)
) -> Point:
    """
    自宅に最も近い海岸線上の点(WGS84)を返す。
    距離計算は投影座標系で実施して精度を担保。
    """
    # 入力を WGS84 とみなして正規化
    if coast_gdf.crs is None:
        coast_wgs = coast_gdf.set_crs(4326)
    elif coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt_wgs = Point(home_latlon[1], home_latlon[0])  # (lon, lat)

    # 距離計算は m 単位の投影座標で
    coast_m = coast_wgs.to_crs(metric_crs).copy()
    home_pt_m = gpd.GeoSeries([home_pt_wgs], crs=4326).to_crs(metric_crs).iloc[0]

    # まず最も近いジオメトリを特定(sindex があれば速い)
    try:
        # GeoPandas ≥0.12 なら sindex.nearest が使える環境も
        idx = next(coast_m.sindex.nearest(home_pt_m.bounds))[0]
    except Exception:
        # フォールバック:距離最小
        coast_m["__d__"] = coast_m.geometry.distance(home_pt_m)
        idx = coast_m["__d__"].idxmin()

    geom_m = coast_m.loc[idx, "geometry"]
    p_on_coast_m, _ = nearest_points(geom_m, home_pt_m)  # 投影座標で最近点

    # WGS84 に戻して返す
    p_on_coast_wgs = gpd.GeoSeries([p_on_coast_m], crs=metric_crs).to_crs(4326).iloc[0]
    return p_on_coast_wgs

へ変更

これで

# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5)  # (lat, lon)

# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
    epicenter_latlon=epicenter,
    coast_gdf=coast,            # 既に読み込み済みの海岸線(EPSG:4326推奨)
    home_latlon=home_wgs,
    h_rep_m=3000.0,             # 代表水深:3,000m(必要に応じ調整)
    safety_slow_factor=1.2      # 速度を 1/1.2 に落として安全側
)

# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"

# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
    GPKG=GPKG,
    arrival_time_min=arr_min,
    height_class=height_class,
    confidence=confidence
)

# sim_runs にログ追記
append_sim_run(
    GPKG=GPKG,
    epicenter_latlon=epicenter,
    home_latlon=home_wgs,
    arrival_time_min=arr_min,
    h_rep_m=3000.0,
    safety_slow_factor=1.2,
    height_class=height_class,
    confidence=confidence
)

を再度実行すると

✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low
✓ sim_runs に追記しました。

となる

ここまではできたので
推定到達時間に連動して等時間到達圏(Y分)を自動生成 → 安全エリアを重ねて表示

# === 推定到達時間に連動した等時間到達圏の自動生成 ===
import geopandas as gpd

# 1) route_latest から arrival_time_min を取得(安全側 70% & 上限20分)
rl = gpd.read_file(GPKG, layer="route_latest")
arr = float(rl["arrival_time_min"].iloc[0]) if "arrival_time_min" in rl else 15.0
Y = max(5, min(20, int(arr * 0.7)))   # 5〜20分にクリップ

# 2) 出発ノード & 重み(既存関数)
orig = nearest_node_from_wgs(G2, lat=home_wgs[0], lon=home_wgs[1])
add_edge_time_weight(G2, speed_kmh=4.8)

# 3) 到達圏→非浸水差し引き
iso_gdf, reach_edges_4326 = isochrone_polygon(
    G2, origin_node=orig, cutoff_min=Y,
    proj_crs=METRIC, buffer_m=35.0, weight_attr="time_min"
)
safe_area_now = subtract_flood(iso_gdf, flood_union, proj_crs=METRIC)

# 4) 上書き保存(pyogrio + OVERWRITE)
iso_layer = f"isochrone_walk_{Y}min"
for gdf, layer in [(iso_gdf, iso_layer), (safe_area_now, "safe_area_now")]:
    gdf.to_file(GPKG, layer=layer, driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"})

# 5) Folium へ重ねて保存
folium.GeoJson(iso_gdf.__geo_interface__, name=iso_layer,
               style_function=lambda x: {"fillOpacity":0.20, "color":"black", "weight":1}).add_to(m)
folium.GeoJson(safe_area_now.__geo_interface__, name="安全エリア(到達圏∩非浸水)",
               style_function=lambda x: {"fillOpacity":0.30, "weight":0}).add_to(m)
m.save("outputs/evac.html")
print(f"✓ {iso_layer} / safe_area_now を更新し、outputs/evac.html を保存しました。")

✓ isochrone_walk_15min / safe_area_now を更新し、outputs/evac.html を保存しました。

となる

震源→到達時間v0→(Y分)等時間到達圏→非浸水エリア→地図」が自動でひとまとめになるので
次は
EEW/揺れ検知トリガーからこの一連を叩く CLI に繋げる
そのためにまずは
EEW/揺れ検知トリガーの作成をする

時間到達圏(isochrone)生成関数から作成

時間到達圏(isochrone)生成関数から作成

G2(浸水回避済みグラフ)と自宅座標から 徒歩Y分の到達圏ポリゴン を作る

到達圏は「到達可能エッジを太らせて面化」する堅牢方式
(alpha-shape不要、Shapely/GeoPandas 標準で完結)

前提:G2(到達可能道路グラフ)、home_wgs、GPKG、METRIC=”EPSG:32654″ が既に定義済み

# === Isochrone 生成ユーティリティ ===
import math
import geopandas as gpd
import networkx as nx
import osmnx as ox
from shapely.ops import unary_union

def add_edge_time_weight(G, speed_kmh: float = 4.8, attr_name: str = "time_min"):
    """
    エッジに 'time_min'(分)を付与(長さ[m] / 速度[km/h]).
    既にある場合は上書き。
    """
    fac = speed_kmh * 1000 / 60.0  # m / min
    for u, v, k, data in G.edges(keys=True, data=True):
        length_m = float(data.get("length", 0.0))
        data[attr_name] = length_m / fac
    return G

def nearest_node_from_wgs(G, lat: float, lon: float):
    """WGS84の(lat,lon)から最近傍ノードIDを返す(osmnxのAPI差異に両対応)。"""
    try:
        return ox.distance.nearest_nodes(G, X=lon, Y=lat)
    except AttributeError:
        return ox.nearest_nodes(G, X=lon, Y=lat)

def isochrone_polygon(
    G,
    origin_node: int,
    cutoff_min: float,
    proj_crs: str = "EPSG:32654",
    buffer_m: float = 35.0,
    weight_attr: str = "time_min",
):
    """
    到達可能サブグラフのエッジを buffer で面化 → マルチポリゴン化。
    返り値: (isochrone_gdf[EPSG:4326], reachable_edges_gdf[EPSG:4326])

    - cutoff_min: 到達時間しきい値(分)
    - buffer_m: エッジに与えるバッファ[m](道路幅+余裕)
    """
    # dijkstra で到達ノード
    dist = nx.single_source_dijkstra_path_length(G, origin_node, cutoff=cutoff_min, weight=weight_attr)
    reachable_nodes = set(dist.keys())
    if not reachable_nodes:
        raise RuntimeError("到達ノードが空です。cutoff_min が小さすぎる可能性。")

    # 到達エッジ抽出
    nodes_gdf, edges_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True)
    idx = edges_gdf.reset_index()[["u", "v", "key"]].apply(tuple, axis=1)
    mask = [(u in reachable_nodes and v in reachable_nodes) for (u, v, k) in idx]
    reach_edges = edges_gdf.loc[mask].copy()
    if reach_edges.empty:
        raise RuntimeError("到達エッジが空です。グラフが疎か cutoff_min が小さい可能性。")

    # 面化(project→buffer→union→修正→逆投影)
    reach_proj = reach_edges.to_crs(proj_crs)
    merged = unary_union(reach_proj.buffer(buffer_m).geometry.values)
    # geometry 修正(buffer(0) に相当)
    poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union

    # 出力(4326)
    iso_gdf = gpd.GeoDataFrame({"mode": ["walk"], "minutes": [float(cutoff_min)]},
                               geometry=[poly], crs=proj_crs).to_crs(4326)
    reach_edges_4326 = reach_proj.to_crs(4326)
    return iso_gdf, reach_edges_4326

def subtract_flood(iso_gdf: gpd.GeoDataFrame, flood_union_gdf: gpd.GeoDataFrame, proj_crs: str = "EPSG:32654"):
    """
    isochrone(面)から津波浸水(union)を差し引き → 「到達可能 × 非浸水」エリア。
    返り値: safe_area_now_gdf[EPSG:4326]
    """
    iso_m = iso_gdf.to_crs(proj_crs)
    fl_m  = flood_union_gdf.to_crs(proj_crs)
    safe_geom = iso_m.geometry.iloc[0].difference(fl_m.unary_union)
    safe = gpd.GeoDataFrame({"minutes":[iso_gdf["minutes"].iloc[0]]}, geometry=[safe_geom], crs=proj_crs).to_crs(4326)
    return safe

実行すると

/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/1981613481.py:59: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union
/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/1981613481.py:74: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  safe_geom = iso_m.geometry.iloc[0].difference(fl_m.unary_union)
<folium.features.GeoJson at 0x178279f10>

警告は Shapely/GeoPandas 2.x 系の非推奨によるもの
unary_union は廃止予定 → union_all() に置き換えればOK

変更すると

/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/832098648.py:59: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union
<folium.features.GeoJson at 0x17dfdf350>
    poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union

も修正する

poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).union_all()
merged = unary_union(reach_proj.buffer(buffer_m).geometry.values)

も修正

buf = reach_proj.buffer(buffer_m)
merged = gpd.GeoSeries(buf, crs=proj_crs).union_all()

に書き換える

これでエラーが消える

重みtime_minはエッジ長さ/歩行速度。最短経路と同様にDijkstraで「時間≦Y分」のノード集合を取り、両端が到達可能なエッジのみ抽出→buffer→unionで面化しています。
buffer_m は道路幅+余裕のパラメータ。市街地なら 30–50m で見た目・連続性が良好です。
非浸水差引は isochrone – flood_union。あなたの既存 flood_union(津波浸水 union)をそのまま使えます。
保存は pyogrio + OVERWRITE に統一(毎回クリーン)。

次は

# ② 津波「到達時間・高さ」簡易推定モジュール

**目的**: 「あと何分で来る?」に即答。初期は簡易/安全側、後で精緻化。
**ステップ**

* v0(当日実用版)

* **深海近似の伝播速度** (c ≈ √(g·h)) を **一定深さ代表値**で近似(安全側に遅めに見積もる)
* 震源→沿岸/自宅最寄り海岸線までの距離から到達時間を算出
* 高さは情報源(JMA速報の区分)を採用、未入手なら「不明→避難優先」のメッセージ
* v1(精緻化)

* 沿岸ごとの**水深ラスタ**(GEBCO/J-EGG等)から距離積分で可変速度伝播
* 複数反射・地形収束は扱わず「防災判断に使える保守的な上限/下限」を出す
* 出力

* `arrival_time_min`, `height_class`, `confidence` を `route_latest`/`sim_runs` に書込

使用する用途のコンセプト

使用する用途のコンセプト また震源地から津波が到達するまでの時間と津波の高さを計算し GIS情報を使い避難するエリアを算出しルート案内できるようにしたい 家族の避難経路・地域防災マップをGraphDBで整理 避難所や重要連絡先も紐付け 使い方例: 「地震が来たら最寄りの避難所はどこ?」 「非常食の賞味期限が近いものは?」 など この用途で使うならどれを推奨する?

✅ あなたの用途の要件
* 精度:震源→沿岸までの到達時間・波高推定の入力に使うので、誤差が大きすぎると危険。
* 範囲:主に日本周辺で十分。全球は必須ではない。
* 更新頻度:将来的に OSM/防災機関の最新データと連携したい。
* GraphDB連携:避難所・道路・避難経路・浸水域をノード/リレーションで管理。
* 利用イメージ:
* 「地震が来たら最寄りの避難所はどこ?」 → 避難所ノード+道路グラフ
* 「非常食の賞味期限が近いものは?」 → 在庫管理ノード
* 「津波到達までに何分ある?」 → 震源ノード→海岸ノードのコスト距離

✅ データセットごとの特徴
1. 国土数値情報「海岸線データ」
* 日本限定。行政データとして信頼性が高い。
* GML/Shapefile 提供。
* 避難計画(住民向け防災マップ)に最も適する。
➡ 推奨(基準データにする)
2. OSM Coastlines / Land polygons
* 更新頻度が高い(週次〜月次)。
* 最新の人工改変(埋立地・護岸)も比較的反映。
* 避難経路や実際の生活圏との整合性が良い。
➡ 補助データ(現況把握用)
3. Natural Earth
* 全球対応、軽量。
* 精度は粗い(1:10mでも数百 m〜km の誤差)。
* 可視化や概念実証には便利。
➡ プロトタイプ用のみ
4. GSHHG
* 全球対応、高解像度あり。
* ファイルが大きく、取り回しに手間。
* 学術研究向き。
➡ 専門シミュレーション以外では過剰

✅ 推奨構成(防災マップ+GraphDB向け)
1. 海岸線:
* 基本 → 国土数値情報(海岸線)
* 最新の現況補正 → OSM Coastlines
2. 標高/地形:
* 陸域 DEM → 国土地理院 DEM / ALOS / SRTM
* 海域 DEM → GEBCO
3. 避難所・道路:
* 避難所 → 自治体オープンデータ(CSV/GeoJSON)
* 道路 → OSMnx で抽出
4. GraphDBに格納:
* ノード:震源・海岸・浸水域・避難所・家庭・在庫
* リレーション:到達時間・避難経路・所要時間・備蓄品管理

✅ まとめ
ご提示の用途(家族避難+地域防災+GraphDB管理)なら:
* **国土数値情報(海岸線)**をメイン
* OSM Coastlinesで現況更新
* DEM(陸域+海域)+道路+避難所を組み合わせ
* GraphDBでノード/リレーションを整理して「質問に答えられる防災知識ベース」を構築

これらを考慮して次に作る機能は?

いいね、その前提なら“すぐ効く順”でこれを作るのが最短です。各項目は**完成定義(DoD)**と**最小コード断片**まで落とし込みました。

# 1) 震源→到達時間v0(安全側近似)

**目的**: 「あと何分?」に即答(まずは簡易モデル)。
**DoD**: `route_latest` に `arrival_time_min`, `height_class`, `confidence` を追記して保存。
**やること**

* 震源点(緯度経度)と自宅最寄り海岸線点の距離`D`を計算
* 代表水深`h_rep`で c≈√(g·h_rep)(保守的に**遅め**で見積り)
* `arrival_time_min = D / c / 60`(分)
* 波高は「不明なら ‘unknown’」、速報値が入手できたら `height_class` に格納

def arrival_time_min_km(d_km, h_rep=3000):  # m
    import math
    g=9.81; c=math.sqrt(g*h_rep) # m/s
    return (d_km*1000)/c/60

# 保存時に route_latest の属性へ追記
route_gdf["arrival_time_min"] = [arrival_time_min_km(D_km)]
route_gdf["height_class"] = ["unknown"]
route_gdf["confidence"] = ["low"]  # v0は保守的
```

# 2) 「安全エリア」ポリゴン生成(反転×等時間到達圏)

**目的**: 地図で「今向かうべき範囲」を1枚で示す。
**DoD**: `safe_area_now` と `isochrone_walk_min` を GPKG に**上書き**保存し、Foliumに重ねる。
**やること**

* 浸水想定を反転(`safe = study_area – tsunami_inundation`)
* 歩行速度4.8km/hで `Y = min(arrival_time*0.7, 20)` 分の等時間到達圏(graph isochrone)
* `safe_area_now = safe ∩ isochrone`

(等時間到達圏は既存グラフ`G2`から到達ノード抽出→凸包/マルチポリゴン化でOK)

# 3) 自動実行パイプライン(CLI化)

**目的**: トリガーが来たら無人で回る。
**DoD**: `python evac_pipeline.py –home 34.728,137.978 –quake 33.0,138.5 –radius 12000` で
`roads_safe`,`shelters_safe`,`route_latest`,`safe_area_now` が更新され、LINE/音声に通知。

**雛形**

# evac_pipeline.py
import argparse
def main():
    ap=argparse.ArgumentParser()
    ap.add_argument("--home", required=True)  # "lat,lon"
    ap.add_argument("--quake", required=True) # "lat,lon"
    ap.add_argument("--radius", type=int, default=12000)
    ap.add_argument("--notify", default="line,voice")
    args=ap.parse_args()
    # 1) データ更新→ 2) ルート再計算→ 3) 到達時間→ 4) 安全エリア→ 5) 保存→ 6) 通知
    # (既存コードを関数化して順番に呼ぶ)
if __name__=="__main__":
    main()

# 4) GraphDB(Neo4j) スキーマ初期化

**目的**: 「質問に答えられる」グラフへ。
**DoD**: `Schema v1` が作成され、`Route(latest=true)` が常に1本だけ存在。

**Cypher(最小)**

CREATE CONSTRAINT shelter_id IF NOT EXISTS FOR (s:Shelter) REQUIRE s.sid IS UNIQUE;
CREATE CONSTRAINT route_id   IF NOT EXISTS FOR (r:Route)   REQUIRE r.rid IS UNIQUE;

UNWIND $shelters AS s
MERGE (sh:Shelter {sid:s.sid})
  SET sh.name=s.name, sh.lat=s.lat, sh.lon=s.lon, sh.safe=s.safe;

MERGE (h:Home {hid:'home-1'}) SET h.lat=$home.lat, h.lon=$home.lon;

WITH $route AS r
MERGE (rt:Route {rid:r.rid})
  SET rt.meters=r.meters, rt.eta_walk=r.eta_walk, rt.eta_walk_safe=r.eta_walk_safe,
      rt.arrival_time_min=r.arrival_time_min, rt.height_class=r.height_class,
      rt.created_at=r.created_at, rt.latest=true;

MATCH (h:Home {hid:'home-1'}), (best:Shelter {sid:$best_sid})
MERGE (h)-[:SAFE_PATH_TO {mode:'walk'}]->(best);

MATCH (x:Route {latest:true}) WHERE x.rid <> $route.rid
SET x.latest=false;

# 5) 緊急ダッシュボード(Folium MVP)

**目的**: 家族が1枚で把握。
**DoD**: `outputs/evac.html` に以下が出る。

* `route_latest`(距離/徒歩ETA/安全係数/推定到達までX分)
* `safe_area_now`(半透明塗り)
* 「Googleマップで開く」直リンク

**追加断片**

folium.GeoJson(safe_area_now.__geo_interface__, name="安全エリア", style_function=lambda x: {
    "fillOpacity":0.25, "weight":0
}).add_to(m)

gmap_url=f"https://www.google.com/maps/dir/?api=1&destination={dest_pt.y},{dest_pt.x}&travelmode=walking"
folium.Marker([dest_pt.y, dest_pt.x],
    popup=f"<a href='{gmap_url}' target='_blank'>Googleマップで開く</a>",
    icon=folium.Icon(color='blue')).add_to(m)

# 6) 期限ウォッチャ(非常食)

**目的**: 「切れてた」を無くす。
**DoD**: SQLite(またはNeo4j)から**14日以内期限**を抽出し、朝9時にLINE/音声通知。

**SQL 例**

SELECT name, qty, expiry_date
FROM supplies
WHERE DATE(expiry_date) <= DATE('now','+14 day')
ORDER BY expiry_date ASC;

# 7) イベントトリガー(擬似EEW→本番)

**目的**: 自動化の完成。
**DoD**: ダミーの震源JSONを受けて③のCLIを起動→将来EEW/津波情報に差し替え。

**擬似Webhook**

curl -X POST http://localhost:8080/quake \
  -H "Content-Type: application/json" \
  -d '{"lat":33.0,"lon":138.5,"mag":7.6,"depth_km":20}'

## 実装順(2スプリント想定)

1. **③ CLI化** → 既存処理の関数化・引数化
2. **1) 到達時間v0** → `route_latest` 属性追加
3. **2) 安全エリア** → 反転×等時間到達圏を保存&Folium表示
4. **5) ダッシュボード** → GMapリンク・要点表示
5. **4) GraphDB投入** → Route/Shelter/関係だけ先行
6. **6) 期限ウォッチャ** → 朝9時通知
7. **7) 擬似EEWトリガー** → 自動化通し

必要なら、上記の**等時間到達圏(isochrone)生成関数**と**Neo4j投入用の具体的なPythonコード**をすぐ出します。どこから書くか一言くれたら、その部分を一気に仕上げます。

ということでまずは
上記の等時間到達圏(isochrone)生成関数から作成