どの分岐でも同じスキーマで必ず上書き保存するように 共通の保存関数を作り、キャッシュヒット時も呼ぶようにする

import sys, os
sys.path.append(os.path.abspath("."))  # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString
import osmnx as ox
import networkx as nx
import folium
import os

GPKG = "outputs/shizuoka_hazard.gpkg"

# 1) 既存レイヤ読み込み(両方とも 4326)
coast   = gpd.read_file(GPKG, layer="coastline")
tsunami = gpd.read_file(GPKG, layer="tsunami_inundation")

# ----- パラメータ -----
# 自宅座標(lat, lon)←必ず置き換え
home_wgs = (34.728, 137.978)
# OSM道路の取得半径[m](到達不能なら大きく:15000, 20000…)
GRAPH_RADIUS_M = 12000
# 避難所CSV(cp932想定)
SHELTER_CSV = "data/20250801_指定緊急避難場所一覧.CSV"
# ----------------------

# 2) 避難所CSV→点化(列名は自動検出)
df = pd.read_csv(SHELTER_CSV, encoding="cp932")
lat_candidates = 
lon_candidates = 
if not lat_candidates or not lon_candidates:
    raise ValueError("CSVに緯度/経度列が見つかりません。列名を教えてください。")
lat_col, lon_col = lat_candidates[0], lon_candidates[0]
df[lat_col] = pd.to_numeric(df[lat_col], errors="coerce")
df[lon_col] = pd.to_numeric(df[lon_col], errors="coerce")
df = df.dropna(subset=[lat_col, lon_col])
shelters = gpd.GeoDataFrame(df.copy(),
    geometry=gpd.points_from_xy(df[lon_col], df[lat_col], crs="EPSG:4326")
)

# 3) 安全な避難所 = 浸水域と非交差
#   (unionはメモリ負荷になるので、空間結合で判定)
safe_join = gpd.sjoin(shelters, tsunami[["geometry"]], how="left", predicate="intersects")
shelters["safe"] = safe_join["index_right"].isna()
safe_shelters = shelters[shelters["safe"]].copy()

# --- 4) OSM道路取得(徒歩網) ---
G = ox.graph_from_point(home_wgs, dist=GRAPH_RADIUS_M, network_type="walk", simplify=True, retain_all=False)
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)

# u,v,key を“列”にする(重要)
edges = edges.reset_index()

# CRSを4326に
if edges.crs is None:
    edges = edges.set_crs(4326)
elif edges.crs.to_epsg() != 4326:
    edges = edges.to_crs(4326)
# --- 5) 浸水域と交差する道路を除去(sjoin→集計で一意化) ---
left = edges[["u","v","key","geometry"]].copy()   # edges は reset_index() 済み想定
right = tsunami[["geometry"]]

tag = gpd.sjoin(left, right, how="left", predicate="intersects")

blocked = (
    tag.assign(hit=tag["index_right"].notna())
       .groupby(["u","v","key"], as_index=False)["hit"].max()
       .rename(columns={"hit":"blocked"})
)

edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
edges2["blocked"] = edges2["blocked"].fillna(False)

edges_keep = edges2.loc[~edges2["blocked"]].copy()

# ★ ここが重要:graph_from_gdfs に渡す前に MultiIndex に戻す
#   (key が float になっていたら int にしておくと安心)
if edges_keep["key"].isna().any():
    edges_keep["key"] = edges_keep["key"].fillna(0)

edges_keep["key"] = edges_keep["key"].astype(int)
edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()

# --- 6) サブグラフ再構築(通行可のみ) ---
used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()

# nodes_keep の geometry 列が欠けていたら補完
if "geometry" not in nodes_keep or nodes_keep["geometry"].isna().any():
    nodes_keep = nodes_keep.set_geometry(
        gpd.points_from_xy(nodes_keep["x"], nodes_keep["y"]), crs=edges_keep.crs
    )

# OSMnx のバージョン差に対応
try:
    G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))
except AttributeError:
    from osmnx import utils_graph as ug
    G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))


# 7) 出発・目的地ノード(最寄り“安全”避難所)
home_pt = Point(home_wgs[1], home_wgs[0])
# 最寄り安全避難所(直線距離)…なければ全体から選ぶ
TARGET = safe_shelters if len(safe_shelters) else shelters
METRIC = "EPSG:32654"# 静岡周辺 UTM
hp = gpd.GeoSeries([home_pt], crs=4326).to_crs(METRIC).iloc[0]
tp = TARGET.to_crs(METRIC).copy()
tp["__dist_m__"] = tp.geometry.distance(hp)
dest_pt = TARGET.loc[tp["__dist_m__"].idxmin(), "geometry"]

# ▶ 互換呼び分け
try:
    orig = ox.distance.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.distance.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)
except AttributeError:
    # 新API名(環境によってはこちらが有効)
    orig = ox.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)


# 8) 最短経路(通行可能グラフ)
try:
    route = nx.shortest_path(G2, orig, dest, weight="length")
except nx.NetworkXNoPath:
    raise RuntimeError("到達経路なし。GRAPH_RADIUS_Mを増やすか、別の避難所で再試行を。")

# 9) GPKGへ追記
# #    既にある場合は上書きしたいなら driverオプションを切り替える(ここは追記)
# shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
# safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
# edges.to_file(GPKG, layer="roads_all", driver="GPKG")
# edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")

# 9) GPKGへ書き出し(毎回クリーンに上書き)
common_kwargs = dict(
    driver="GPKG",
    engine="pyogrio",
    layer_options={"OVERWRITE": "YES"},  # 既存レイヤーを置き換える
)

shelters.to_file(GPKG, layer="shelters_all",   **common_kwargs)
safe_shelters.to_file(GPKG, layer="shelters_safe", **common_kwargs)
edges.to_file(GPKG,    layer="roads_all",     **common_kwargs)
edges_keep.to_file(GPKG, layer="roads_safe",  **common_kwargs)


# 10) Folium地図
m = folium.Map(location=[home_wgs[0], home_wgs[1]], zoom_start=12, control_scale=True)
folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定(A40)").add_to(m)
folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)

# 経路を線で描画
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip="避難経路").add_to(m)

# 出発・目的地
folium.Marker([home_wgs[0], home_wgs[1]], popup="現在地", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([dest_pt.y, dest_pt.x], popup="避難所", icon=folium.Icon(color="blue")).add_to(m)

# 避難所レイヤ(安全のみ強調)
folium.GeoJson(safe_shelters.__geo_interface__, name="安全な避難所",
               marker=folium.CircleMarker(radius=4)).add_to(m)
folium.LayerControl().add_to(m)

# 保存(任意)
m.save("outputs/evac.html")
print("✓ 完了: GPKG追記 & outputs/evac.html を作成")

# # ルートの延長[m]は G2 の edge 属性 'length' を合計(OSMnxが付けてくれてます)
# meters = sum(ox.utils_graph.get_route_edge_attributes(G2, route, 'length'))
# walk_kmh = 4.8   # 歩行速度(必要に応じて変更)
# drive_kmh = 20.0 # 災害時の車想定の控えめ値(使うなら)

# eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
# eta_drive_min = meters / (drive_kmh * 1000 / 60)
# print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")

# ▼ 距離[m]の取り方をバージョン互換で
try:
    # 新しめのOSMnx(推奨): ルートをGDF化して length を合計
    route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
    meters = float(route_edges["length"].sum())
except Exception:
    # フォールバック: NetworkXからエッジ属性 length を合計
    meters = 0.0
    for u, v in zip(route[:-1], route[1:]):
        ed = G2.get_edge_data(u, v)  # MultiDiGraph: {key: attr_dict}
        if isinstance(ed, dict) and len(ed):
            meters += min(d.get("length", 0.0) for d in ed.values())

# ETA(必要なら速度を調整)
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")

# Folium のツールチップにも表示
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分"
).add_to(m)


from shapely.geometry import LineString
import geopandas as gpd

route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]
route_gdf = gpd.GeoDataFrame(
    {"meters":[meters], "eta_min_walk":[eta_walk_min], "eta_min_drive":[eta_drive_min]},
    geometry=[LineString(route_lonlat)], crs="EPSG:4326"
)
route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

import geopandas as gpd
# ok = not route_gdf.intersects(tsunami.unary_union).iloc[0]
# print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")

# 旧: tsunami.unary_union  ← 警告が出る
# 新:
flood_union = tsunami.geometry.union_all()
ok = not route_gdf.intersects(flood_union).iloc[0]
print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")

# 
# === ルート検証 → ETA → 保存(キャッシュ対応, 上書き&追記の方針を反映) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import pandas as pd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx

GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654"  # 静岡周辺

# 0) 津波 union はキャッシュを優先(なければ作る)→ 上書き耐性ありで作成可
if "tsunami_union" in listlayers(GPKG):
    flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    flood_union.to_file(
        GPKG, layer="tsunami_union",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}  # (3) 上書き耐性
    )

# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'

# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
    payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
                radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)

# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
    cache = gpd.read_file(GPKG, layer="route_cache")
    hit = cache.loc[cache["rid"] == key]
    if len(hit):
        route_gdf = hit.set_crs(4326)
        meters = float(route_gdf["meters"].iloc[0])
        eta_walk_min  = float(route_gdf["eta_min_walk"].iloc[0])
        eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
        eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
        coords = list(route_gdf.geometry.iloc[0].coords)
        route_latlon = [(lat, lon) for lon, lat in coords]  # folium用 (lat,lon)
        use_cached_route = True
        print("↩︎ ルート: キャッシュヒット")

if not use_cached_route:
    # 3) これまで通りの経路('route')から LineString を作成
    route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
    route_geom = LineString(route_lonlat)
    route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

    # 4) 堅牢チェック(交差ゼロ&境界近接なし)
    route_m = route_gdf.to_crs(METRIC)
    flood_m = flood_union.to_crs(METRIC)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty

    if inside_len_m > 0 or near:
        raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                           "別避難所 or 半径拡大で再探索してください。")

    # 5) 距離[m]→ETA
    try:
        route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u, v in zip(route[:-1], route[1:]):
            ed = G2.get_edge_data(u, v)
            meters += min(d.get("length", 0.0) for d in ed.values())

    walk_kmh = 4.8
    drive_kmh = 20.0
    eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
    eta_drive_min = meters / (drive_kmh * 1000 / 60)
    eta_walk_safe = eta_walk_min * 1.3

    # 6) route_latest へ保存(検証済みのみ)→ 上書き
    out_gpkg = GPKG
    route_gdf = gpd.GeoDataFrame(
        {
            "meters":[meters],
            "eta_min_walk":[eta_walk_min],
            "eta_min_walk_safe":[eta_walk_safe],
            "eta_min_drive":[eta_drive_min],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326",
    )
    route_gdf.to_file(
        out_gpkg, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}  # (1) 上書き
    )
    print("✅ 交差なし・保存完了")

    # 7) ルートキャッシュに追記(pyogrio 流儀)
    route_cache_rec = route_gdf.copy()
    route_cache_rec["rid"] = key
    try:
        route_cache_rec.to_file(
            out_gpkg, layer="route_cache",
            driver="GPKG", engine="pyogrio",
            append=True                     # (2) 追記
        )
    except Exception:
        # フォールバック:スキーマ差や古い環境でも確実に追記する
        if "route_cache" in listlayers(out_gpkg):
            old = gpd.read_file(out_gpkg, layer="route_cache")
            cols = sorted(set(old.columns) | set(route_cache_rec.columns))
            old = old.reindex(columns=cols)
            new = route_cache_rec.reindex(columns=cols)
            merged = gpd.GeoDataFrame(pd.concat([old, new], ignore_index=True), crs=4326)
            merged.to_file(
                out_gpkg, layer="route_cache",
                driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"}  # マージして上書き
            )
        else:
            route_cache_rec.to_file(
                out_gpkg, layer="route_cache",
                driver="GPKG", engine="pyogrio"
            )

    # 8) folium用座標
    route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]

# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

の後に

import geopandas as gpd
gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]

を実行すると

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
Cell In[31], line 2
      1 import geopandas as gpd
----> 2 gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/geodataframe.py:1896, in GeoDataFrame.__getitem__(self, key)
   1890 def __getitem__(self, key):
   1891     """
   1892     If the result is a column containing only 'geometry', return a
   1893     GeoSeries. If it's a DataFrame with any columns of GeometryDtype,
   1894     return a GeoDataFrame.
   1895     """
-> 1896     result = super().__getitem__(key)
   1897     # Custom logic to avoid waiting for pandas GH51895
   1898     # result is not geometry dtype for multi-indexes
   1899     if (
   1900         pd.api.types.is_scalar(key)
   1901         and key == ""
   (...)   1904         and not is_geometry_type(result)
   1905     ):

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pandas/core/frame.py:4108, in DataFrame.__getitem__(self, key)
   4106     if is_iterator(key):
   4107         key = list(key)
-> 4108     indexer = self.columns._get_indexer_strict(key, "columns")[1]
   4110 # take() does not accept boolean indexers
   4111 if getattr(indexer, "dtype", None) == bool:

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pandas/core/indexes/base.py:6200, in Index._get_indexer_strict(self, key, axis_name)
   6197 else:
   6198     keyarr, indexer, new_indexer = self._reindex_non_unique(keyarr)
-> 6200 self._raise_if_missing(keyarr, indexer, axis_name)
   6202 keyarr = self.take(indexer)
   6203 if isinstance(key, Index):
   6204     # GH 42790 - Preserve name from an Index

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pandas/core/indexes/base.py:6252, in Index._raise_if_missing(self, key, indexer, axis_name)
   6249     raise KeyError(f"None of [{key}] are in the [{axis_name}]")
   6251 not_found = list(ensure_index(key)[missing_mask.nonzero()[0]].unique())
-> 6252 raise KeyError(f"{not_found} not in index")

KeyError: "['eta_min_walk_safe'] not in index"

となった

GPTによれば
原因は2つのどちらか(または両方

途中の「一回目の書き出し」で eta_min_walk_safe を含まないスキーマが作られた

最初に meters / eta_min_walk / eta_min_drive だけを持つ route_gdf を
書き出す箇所があります(OVERWRITE なし)。
この時点で route_latest の列定義が確定 します。
その後に eta_min_walk_safe を追加して上書きしても、環境によっては列が残らない/読み出し側で見えないことがあります

route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

がその部分

次の理由として
キャッシュヒット分岐では route_latest を書き直していない
ルート: キャッシュヒット になった場合、後段の

# 6) route_latest へ保存(…)
route_gdf.to_file(... OVERWRITE="YES")

がが実行されません。結果、古いスキーマのまま

このため

from shapely.geometry import LineString
import geopandas as gpd

route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]
route_gdf = gpd.GeoDataFrame(
    {"meters":[meters], "eta_min_walk":[eta_walk_min], "eta_min_drive":[eta_drive_min]},
    geometry=[LineString(route_lonlat)], crs="EPSG:4326"
)
# route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

の中の

# ❌ これを削除:
route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

をコメントアウト

どの分岐でも同じスキーマで必ず上書き保存するように
共通の保存関数を作り、キャッシュヒット時も呼ぶようにする

from datetime import datetime

def save_route_latest(out_gpkg, route_geom, meters, eta_walk_min, eta_drive_min, eta_walk_safe):
    gdf = gpd.GeoDataFrame(
        {
            "meters":[float(meters)],
            "eta_min_walk":[float(eta_walk_min)],
            "eta_min_walk_safe":[float(eta_walk_safe)],
            "eta_min_drive":[float(eta_drive_min)],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326"
    )
    gdf.to_file(
        out_gpkg, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}
    )

無限ループの対処

一度 notebook を止めて再起動し
以下のコードを実行した
結果は以下の通り

import requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

def parse_entry_xml(url):
    r = requests.get(url, timeout=15)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波領域の抽出
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if kind_name:
            kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    return {
        "title": title, "report_dt": report_dt,
        "info_kind": info_kind, "info_type": info_type,
        "headline": headline, "kinds": list(set(kinds)),
        "areas": areas, "url": url
    }

def is_tsunami_trigger(rec):
    # “津波”を広めに判定(タイトル・見出し・InfoKind)
    text_pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in text_pool:
        return False
    # 強ワード
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in text_pool for k in kw):
        return True
    # あるいはKindsに入っていればOK
    return any(k in rec.get("kinds", []) for k in kw)

# ---- ドライラン:feed取得→最新5件だけ詳細をなめて出力 ----
r = requests.get(EQVOL_FEED, timeout=15)
print("feed status:", r.status_code, "bytes:", len(r.content))
d = feedparser.parse(r.text)
print("entries:", len(d.entries))
for e in d.entries[:5]:
    print("ENTRY:", e.get("published",""), e.get("title",""))
    try:
        rec = parse_entry_xml(e.link)
        print("  InfoKind:", rec["info_kind"], "| InfoType:", rec["info_type"])
        print("  Title   :", rec["title"])
        print("  Headline:", (rec["headline"] or "")[:120])
        print("  kinds   :", rec["kinds"], "| areas:", len(rec["areas"]))
        print("  -> TRIGGER?", is_tsunami_trigger(rec))
    except Exception as ex:
        print("  ERROR parsing:", ex)

を実行

feed status: 200 bytes: 27328
entries: 37
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 薩摩硫黄島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 諏訪之瀬島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 霧島山(新燃岳) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 浅間山 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 草津白根山(白根山(湯釜付近)) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False

もしこれが
entries=0 → フィード取得が怪しい(ネットワーク・一時ブロック)。requests.get(EQVOL_FEED).text[:2000]をprintして中身を確認
entries>0かつInfoKindやTitleに「津波」が無い → 正常(いま津波電文が無いだけ)
parse_entry_xmlでエラー → XML名前空間/パスが電文種別で違う可能性。print(rec[“url”])で該当XMLを保存して中身を見るのが早い

というように原因を理解できる

補足(実運用へ進めるとき)
静岡沿岸の津波予報区コードでareasをフィルタ(対象外は通知しない)
InfoType == “取消”で状態を解除に遷移
併せてextra.xmlも監視すると取りこぼしが減ります(仕様差し込みは後でOK)
ノートブックではなく常駐スクリプト/systemdで運用し、ログをローテーション

他にやったことは
1. 通信・ネットワークの確認
まず、最初の requests.get(EQVOL_FEED) で通信が成功しているかを確かめる

import requests
EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"

r = requests.get(EQVOL_FEED, timeout=15)
print("status:", r.status_code)
print(r.text[:1000])  # 最初の1000文字だけ表示

✅ status: 200 なら通信成功。
❌ もし 403 や Timeout が出た場合 → 通信がブロックされている(VPN・プロキシ・ファイアウォールなどの影響)。

結果は

status: 200
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" lang="ja">
  <title>高頻度(地震火山)</title>
  <subtitle>JMAXML publishing feed</subtitle>
  <updated>2025-11-06T05:01:14+09:00</updated>
  <id>https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml#short_1762372874</id>
  <link rel="related" href="https://www.jma.go.jp/"/>
  <link rel="self" href="https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"/>
  <link rel="hub" href="http://alert-hub.appspot.com/"/>
  <rights type="html"><![CDATA[
<a href="https://www.jma.go.jp/jma/kishou/info/coment.html">利用規約</a>,
<a href="https://www.jma.go.jp/jma/en/copyright.html">Terms of Use</a>
]]></rights>
  <entry>
    <title>降灰予報(定時)</title>
    <id>https://www.data.jma.go.jp/developer/xml/data/20251105200049_0_VFVO53_010000.xml</id>
    <updated>2025-11-05T20:00:00Z</updated>
    <author>
      <name>気象庁</name>
    </author>
    <link type="application/xml" href="https://ww

カーネル・出力の健全性チェック
これで何も出ないなら、セルが実行されていない/カーネルが固まっている/出力パネルが壊れている可能性が濃厚

import sys, time, os, pathlib, subprocess

print("STEP A1: print works?", flush=True)
sys.stdout.write("STEP A2: stdout.write works?\n"); sys.stdout.flush()
sys.stderr.write("STEP A3: stderr works?\n"); sys.stderr.flush()

for i in range(3):
    print(f"tick {i}", flush=True)
    time.sleep(0.5)

# ファイル出力で実行確認(UI出力が壊れていても痕跡が残る)
pathlib.Path("notebook_alive.txt").write_text("alive\n")
print("STEP A4: wrote notebook_alive.txt", flush=True)

# サブプロセスの標準出力も確認
ret = subprocess.run(["echo", "STEP A5: subprocess echo"], capture_output=True, text=True)
print(ret.stdout, end="")
print("STEP A DONE", flush=True)

結果は

STEP A1: print works?
STEP A2: stdout.write works?
STEP A3: stderr works?
tick 0
tick 1
tick 2
STEP A4: wrote notebook_alive.txt
STEP A5: subprocess echo
STEP A DONE

しかし
再度

# 津波情報の取得
import time
import requests
import feedparser
import xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

last_modified = None
seen_ids = set()

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波関連の抽出(領域と種別)
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })

    return {
        "title": title,
        "report_dt": report_dt,
        "info_kind": info_kind,
        "info_type": info_type,
        "headline": headline,
        "kinds": list(set(kinds)),
        "areas": areas,
        "url": url,
    }

def is_tsunami_trigger(rec):
    # 種別に“津波警報・注意報・予報/津波情報”などが含まれるものを対象
    if "津波" not in (rec["info_kind"] or rec["title"] or rec["headline"]):
        return False
    # 警報/注意報/予報の明示があれば強トリガー
    keywords = ["大津波警報", "津波警報", "津波注意報", "津波予報", "津波情報"]
    if any(k in (rec["headline"] or "") for k in keywords):
        return True
    # 後方互換(kindsに入っていればOK)
    return any(k in keywords for k in rec["kinds"])

def loop():
    global last_modified
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)
                # 新しい順に処理
                for e in d.entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id)
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec):
                        # === ここであなたの通知/保存パイプラインへ ===
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"], rec["kinds"][:3], "areas=", len(rec["areas"]))
            if lm:
                last_modified = lm
        except Exception as ex:
            print("error:", ex)
        time.sleep(60)

if __name__ == "__main__":
    loop()

を実行すると終わらない

ノートブックで「終わらない」のは、**無限ループ中に“トリガー時しかprintしない”**から

= 津波電文が無い通常時は1分ごとに静かにsleep→再取得、を永遠に繰り返す

すぐ直すポイント(Notebook向け)
1. デバッグ出力を入れて「今なにしてるか」を毎ループ表示(flushも)
2. ループ回数に上限(MAX_LOOPS)を付けてセルが必ず終わる
3. 取得件数・新規処理件数・直近タイトルなどを表示
4. 例外時にURLも表示して切り分けしやすく

その前に処理を停止する方法を調べる

すぐ止める(今まさに動いているセルを止める)
* JupyterLab:Esc → I を素早く2回(= Interrupt)。またはツールバーの■(Stop)をクリック
* Classic Notebook:メニュー Kernel → Interrupt
* Google Colab:メニュー ランタイム → 実行を中断(ショートカット:Ctrl/Cmd + M, I)
* それでも止まらない/固まる時:Kernel → Restart(再起動)
これで KeyboardInterrupt が発生し、セルの実行が中断されます。

ということで
🔳アイコンをクリックして処理を停止

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[4], line 97
     94         time.sleep(60)
     96 if __name__ == "__main__":
---> 97     loop()

Cell In[4], line 94, in loop()
     92 except Exception as ex:
     93     print("error:", ex)
---> 94 time.sleep(60)

KeyboardInterrupt: 

と表示された

停止確認できたので
デバッグ出力を入れて「今なにしてるか」を毎ループ表示(flushも)
ループ回数に上限(MAX_LOOPS)を付けてセルが必ず終わる
取得件数・新規処理件数・直近タイトルなどを表示
例外時にURLも表示して切り分けしやすく

を入れる

Notebook向けに
* 3周で必ず終了(MAX_LOOPS=3)
* 毎ループの状況を表示(entries / new / last_modified と直近タイトル)
* 変更なしのときは 304 Not Modified と出力
になるので、「動いているのに何も出ない」を防げる

小さな実用メモ
* 早く挙動を見たいときは SLEEP_SEC = 15 などにするとテンポ良く確認できます。
* 何度かセルを繰り返し実行するなら、毎回まっさらで試すために先頭で seen_ids = set() を明示的に初期化しておくと安心です。
* 津波電文が出たときだけ [TSUNAMI TRIGGER] … が出ます(通常時は出ません)。
次の一手(お好みで)
* 対象地域フィルタ:静岡沿岸などの予報区コードだけ通知したい場合は、parse_entry_xml() で得た rec[“areas”] を使って絞れます(例:if not any(a[“code”] in SHIZUOKA_CODES for a in rec[“areas”]): return False)。コードは後で差し込みましょう(リストは用意します)。
* 見栄えの良い停止:loop() を try: … except KeyboardInterrupt: print(“Stopped by user”) で包むとトレース無しで止まります。
* 常駐運用:Notebook検証が終わったら MAX_LOOPS を外し、systemd などで常駐+ログローテーションにすると本番向きです。

ということで

# Notebookデモ用:必ず終わる・状況が見える版
import time, requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

last_modified = None
seen_ids = set()
MAX_LOOPS = 3          # ★ Notebookでは必ず終わらせる
SLEEP_SEC  = 60        # デモなら 15 にしてもOK
DEBUG = True

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if kind_name:
            kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    return {
        "title": title, "report_dt": report_dt, "info_kind": info_kind, "info_type": info_type,
        "headline": headline, "kinds": list(set(kinds)), "areas": areas, "url": url
    }

def is_tsunami_trigger(rec):
    pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in pool:
        return False
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in pool for k in kw): 
        return True
    return any(k in rec.get("kinds", []) for k in kw)

def loop():
    global last_modified
    loops = 0
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)
                new_cnt = 0
                last_titles = []
                for e in d.entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id)
                    new_cnt += 1
                    last_titles.append(e.title)
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec):
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"], rec["kinds"][:3], "areas=", len(rec["areas"]), flush=True)
                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] entries={len(d.entries)} new={new_cnt} last_modified={lm}", flush=True)
                    if last_titles:
                        print("  recent:", " / ".join(last_titles[:3])[:160], flush=True)
                    else:
                        print("  recent: (no new entries this cycle)", flush=True)
            else:
                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] 304 Not Modified(変更なし)", flush=True)
            if lm:
                last_modified = lm
        except Exception as ex:
            print("error:", ex, flush=True)
        loops += 1
        if loops >= MAX_LOOPS:
            print("done (MAX_LOOPS reached)", flush=True)
            break
        time.sleep(SLEEP_SEC)

loop()

に変更して実行する

ログは「フィード取得は成功・“いま津波電文はない”・初回なので new=33(初回は未既知IDがないため全部新規扱い)」という正常動作

最近流れているのが降灰予報(火山)なので、[TSUNAMI TRIGGER] は出ない

無駄なXML取得を減らして津波だけを見る・静岡沿岸だけ通知・既読IDを永続化の3点を足す

# 津波専用・Notebook検証向け(津波以外はスキップ、静岡コードで絞り、既読IDを永続化)
import time, json, os
import requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

# --- 静岡沿岸の津波予報区コード(代表例):必要に応じて追加 ---
SHIZUOKA_CODES = {
    "504",  # 遠州灘
    "505",  # 駿河湾
    # 必要なら追加(JMAコード表に準拠)
}

last_modified = None
SEEN_FILE = "seen_ids.json"
seen_ids = set(json.load(open(SEEN_FILE)) if os.path.exists(SEEN_FILE) else [])

MAX_LOOPS = 3
SLEEP_SEC = 60
DEBUG = True

def save_seen():
    try:
        with open(SEEN_FILE, "w") as f:
            json.dump(list(seen_ids), f)
    except Exception:
        pass

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    head = root.find(".//jmx_ib:Head", NAMESP)

    rec = {
        "title":     head.findtext("jmx_ib:Title", default="", namespaces=NAMESP),
        "report_dt": head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP),
        "info_kind": head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP),
        "info_type": head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP),
        "headline":  head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP),
        "kinds": [],
        "areas": [],
        "url": url,
    }

    # 津波領域(情報タイプ=津波予報領域表現)
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        k = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if k: rec["kinds"].append(k)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            rec["areas"].append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    rec["kinds"] = list(set(rec["kinds"]))
    return rec

def is_tsunami_trigger(rec):
    pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in pool:
        return False
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in pool for k in kw): 
        return True
    return any(k in rec.get("kinds", []) for k in kw)

def is_target_area(rec):
    if not rec["areas"]:
        # 領域表現が無い電文は地域未特定なので一旦通す or 無視するかは方針次第
        return True
    return any(a.get("code") in SHIZUOKA_CODES for a in rec["areas"])

def loop():
    global last_modified
    loops = 0
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)

                # ★ フィード段階で「津波」を含むエントリだけに早期フィルタ
                tsunami_entries = [e for e in d.entries 
                                   if "津波" in (e.title or "") or "tsunami" in (e.title or "").lower()]

                new_cnt = 0
                for e in tsunami_entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id); new_cnt += 1
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec) and is_target_area(rec):
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"],
                              rec["kinds"][:3], "areas=", len(rec["areas"]), flush=True)

                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] entries={len(d.entries)} tsunami_only={len(tsunami_entries)} new={new_cnt} last_modified={lm}", flush=True)

                save_seen()
            else:
                if DEBUG:
                    print(f"[{time.strftime('%H:%M:%S')}] 304 Not Modified(変更なし)", flush=True)

            if lm:
                last_modified = lm

        except Exception as ex:
            print("error:", ex, flush=True)

        loops += 1
        if loops >= MAX_LOOPS:
            print("done (MAX_LOOPS reached)", flush=True)
            break
        time.sleep(SLEEP_SEC)

loop()

で実行

[06:25:13] entries=33 tsunami_only=0 new=0 last_modified=Fri, 07 Nov 2025 20:01:38 GMT
[06:26:13] 304 Not Modified(変更なし)
[06:27:14] 304 Not Modified(変更なし)
done (MAX_LOOPS reached)

となる

tsunami_only=0 は「今回のフィードのエントリータイトルに“津波”が一件も無い」という意味で、現状は降灰や地震系だけ=津波電文が出ていない状態です。
ただし——タイトルに“津波”が無くても本体XML側が津波情報というケースがあるため、実運用では「タイトルで早期フィルタ」は外して、直近N件だけ中身を開いて判定するのが堅い

安全版(タイトルで絞らず、直近N件だけ本文チェック/extra.xml併走)
直近 MAX_FETCH=10 件だけ本文XMLを開いて InfoKind/Headline から津波判定
eqvol.xml と extra.xml を同じロジックで監視
既読IDはフィードごとに保存

# 津波検知(タイトルで絞らず本文チェック)+ extra.xml 併走 / Notebook用
import time, json, os, requests, feedparser, xml.etree.ElementTree as ET

FEEDS = {
    "eqvol": "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml",
    "extra": "https://www.data.jma.go.jp/developer/xml/feed/extra.xml",
}

NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

# 静岡の津波予報区コード(例)
SHIZUOKA_CODES = {"504", "505"}  # 遠州灘/駿河湾。必要なら追加

MAX_FETCH = 10         # 各サイクルで本文を開く最大件数
MAX_LOOPS = 3          # Notebook用:必ず終わる
SLEEP_SEC = 60
DEBUG = True

state = {name: {"last_modified": None, "seen": set()} for name in FEEDS}

def _seen_path(feed_name): return f"seen_ids_{feed_name}.json"

# 既読IDのロード
for name in FEEDS:
    p = _seen_path(name)
    if os.path.exists(p):
        try:
            state[name]["seen"] = set(json.load(open(p)))
        except Exception:
            pass

def save_seen(feed_name):
    try:
        json.dump(list(state[feed_name]["seen"]), open(_seen_path(feed_name), "w"))
    except Exception:
        pass

def fetch_feed(feed_name, url):
    headers = {}
    lm = state[feed_name]["last_modified"]
    if lm: headers["If-Modified-Since"] = lm
    r = requests.get(url, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)
    head = root.find(".//jmx_ib:Head", NAMESP)

    rec = {
        "title": head.findtext("jmx_ib:Title", default="", namespaces=NAMESP),
        "report_dt": head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP),
        "info_kind": head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP),
        "info_type": head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP),
        "headline": head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP),
        "kinds": [], "areas": [], "url": url,
    }
    # 津波予報領域
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        k = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if k: rec["kinds"].append(k)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            rec["areas"].append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    rec["kinds"] = list(set(rec["kinds"]))
    return rec

def is_tsunami_trigger(rec):
    pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in pool:  # 見出し系に「津波」が登場しないものは通常スキップ
        return False
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in pool for k in kw): return True
    return any(k in rec.get("kinds", []) for k in kw)

def is_target_area(rec):
    # 領域未記載なら方針次第。ここでは一旦通す。
    if not rec["areas"]: return True
    return any(a.get("code") in SHIZUOKA_CODES for a in rec["areas"])

def watch_cycle():
    total_new = 0
    for name, url in FEEDS.items():
        try:
            feed_text, lm = fetch_feed(name, url)
            if not feed_text:
                if DEBUG: print(f"[{name}] 304 Not Modified")
                continue

            d = feedparser.parse(feed_text)
            # まだ見てない最新順から MAX_FETCH 件だけ本文チェック
            new_entries = [e for e in d.entries if e.id not in state[name]["seen"]][:MAX_FETCH]
            total_new += len(new_entries)

            for e in new_entries:
                state[name]["seen"].add(e.id)
                rec = parse_entry_xml(e.link)
                if is_tsunami_trigger(rec) and is_target_area(rec):
                    print("[TSUNAMI TRIGGER]", name, rec["report_dt"], rec["headline"],
                          rec["kinds"][:3], "areas=", len(rec["areas"]), flush=True)

            if DEBUG:
                print(f"[{name}] entries={len(d.entries)} checked={len(new_entries)} last_modified={lm}")

            save_seen(name)
            if lm: state[name]["last_modified"] = lm

        except Exception as ex:
            print(f"[{name}] error:", ex, flush=True)
    return total_new

# ===== Notebook用メインループ =====
loops = 0
while loops < MAX_LOOPS:
    _ = watch_cycle()
    loops += 1
    if loops >= MAX_LOOPS:
        print("done (MAX_LOOPS reached)")
        break
    time.sleep(SLEEP_SEC)

で実行

タイトルに“津波”が無いせいで tsunami_only=0 になる問題を回避(本文で判定)
フィード2本(eqvol/extra)を同ロジックで監視
既読の永続化により、Notebookを再実行しても初回から「全部new」にならない
直近 MAX_FETCH のみ本文取得で無駄アクセス抑制

結果は

[eqvol] entries=33 checked=10 last_modified=Fri, 07 Nov 2025 20:01:38 GMT
[extra] entries=106 checked=10 last_modified=Fri, 07 Nov 2025 21:24:15 GMT
[eqvol] 304 Not Modified
[extra] 304 Not Modified

ログの意味
* [eqvol] entries=33 checked=10:フィードに33件、今回は未読の先頭10件だけ本文を精査
* [extra] entries=106 checked=10:同上
* 304 Not Modified:前回から更新なし=正常
いまは津波電文が無いのでトリガーなし=想定どおり

とりあえず動作はしそうなので
次は通知の方法を考える

津波警報などの情報を検知できるようにしたい

EEW/揺れ検知トリガーの作成をしようと思ったが
これでGPTで行うと実際に揺れを検知するような仕組みを提案される

そうではなく
前回の津波警報の時にはロシアのカムチャッカ半島沖の地震で
津波警報が出ていたので、そういった情報を検知できるようにしたい

また提案の中で
*JMA防災情報XMLの実パーサ(地震/津波の要素抽出)*
とあったが、これがどのようなものかを知りたい

回答は
「端末の揺れを検知する」のではなく、気象庁の公式配信(JMA防災情報XML)から“津波関連の電文”を監視してトリガーを発火させたい

JMA防災情報XMLの“実パーサ”とは

気象庁は“PULL型”のAtomフィードを公開しています。代表的には:
* 地震・火山用(高頻度):eqvol.xml(毎分更新)
ここから「津波警報・注意報・予報」「津波情報」「震源・震度」などの電文へのリンクを取得します。 data.e-gov.go.jp
* 随時情報(高頻度):extra.xml(毎分更新)
広義の警報・注意報系の新着が流れます。用途に応じて併用。 data.e-gov.go.jp
配信の概要・取得方法は公式「防災情報XML(PULL型)」にまとまっています(Pushは終了済み)。 xml.kishou.go.jp+1
例:カムチャッカ半島沖で地震発生 → 日本沿岸に影響が見込まれる場合、**「津波警報・注意報・予報」「津波情報」**の電文が発表され、上記フィードに載ります。発表・解除・更新も電文として流れます。

「実パーサ」とは何か?
ここでの“実パーサ”は、JMA XMLの仕様に沿って、関心の要素(見出し、発表種別、対象地域、予想津波高、発表時刻、取消/解除、更新理由など)を確実に抽出する処理を指します。
JMA XMLは複数の名前空間・電文種別で構成され、代表的な抽出ポイントは以下です:

* ヘッダ系(共通)
Head/Title, Head/ReportDateTime, Head/TargetDateTime, Head/Headline/Text, Head/InfoKind, Head/InfoType(発表/訂正/取消 など) xml.kishou.go.jp
* 津波系
Body/Tsunami(jmx_tsunami:名前空間)やInformation[@type=”津波予報領域表現”]配下の
Item/Kind/Name(大津波警報/津波警報/津波注意報/津波予報 等)と
Areas/Area/Name, Code(津波予報区とコード)を列挙。更新・解除時もここに反映。 xml.kishou.go.jp+1
* 地震系(参考)
震源・震度情報やEEW(公開範囲に制約あり)等のjmx_seis:要素。津波判断の参考に使うことがあります(ただし“揺れ検知”はしない)。 data.e-gov.go.jp
JMAは詳細な解説・例示を公開しています(2012/2013年版などが基本仕様をカバー)。パース時の要素階層・コード表はここを参照するのが安全です。 xml.kishou.go.jp

運用のコツ(トリガー条件)
* 監視間隔:毎分。If-Modified-Since等で負荷と無駄取得を抑制。 Qiita
* 一次トリガー:新規・更新電文のItem/Kind/Nameに
「大津波警報 / 津波警報 / 津波注意報 / 津波予報」が含まれる、かつ Areas/Area/Codeが対象地域に一致。 xml.kishou.go.jp
* 抑止・更新:Head/InfoTypeが「取消」「訂正」「追加」などの場合は状態遷移として扱う。
* 表示:JMAマップで最新状況を補助表示(UIリンク)。 気象庁

仕上げ:あなたのシステムへの組み込みポイント
1. 地域フィルタ:静岡沿岸など対象の津波予報区コードだけに絞る。
2. 状態管理:InfoTypeが取消のときは通知解除、訂正/追加は更新扱い。 xml.kishou.go.jp
3. 可視化:ダッシュボードにJMA公式の最新表示もリンクで添える(家族向けに分かりやすい)。 気象庁
4. 安全側設計:不明時は強めの文言で注意喚起(あなたの既存“安全側”方針に合わせる)。
5. ログ/再現性:電文URL・ReportDateTime・InfoKindVersion等を保存(事後検証が楽)。 気象庁

必要なら、対象予報区のコード表や「解除検知」「詳細(予想高、到達予想時刻が文面に含まれる場合の抽出)」まで組み込んだ“完成版パーサ”に拡張します。
この方向なら“揺れ検知ハードウェア”は不要で、カムチャッカ沖など国外震源でも日本向け津波電文を確実にトリガーできます。

生成されたコードを実行

import time
import requests
import feedparser
import xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

last_modified = None
seen_ids = set()

def fetch_feed():
    headers = {}
    if last_modified:
        headers["If-Modified-Since"] = last_modified
    r = requests.get(EQVOL_FEED, headers=headers, timeout=10)
    if r.status_code == 304:
        return None, None
    r.raise_for_status()
    return r.text, r.headers.get("Last-Modified")

def parse_entry_xml(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波関連の抽出(領域と種別)
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })

    return {
        "title": title,
        "report_dt": report_dt,
        "info_kind": info_kind,
        "info_type": info_type,
        "headline": headline,
        "kinds": list(set(kinds)),
        "areas": areas,
        "url": url,
    }

def is_tsunami_trigger(rec):
    # 種別に“津波警報・注意報・予報/津波情報”などが含まれるものを対象
    if "津波" not in (rec["info_kind"] or rec["title"] or rec["headline"]):
        return False
    # 警報/注意報/予報の明示があれば強トリガー
    keywords = ["大津波警報", "津波警報", "津波注意報", "津波予報", "津波情報"]
    if any(k in (rec["headline"] or "") for k in keywords):
        return True
    # 後方互換(kindsに入っていればOK)
    return any(k in keywords for k in rec["kinds"])

def loop():
    global last_modified
    while True:
        try:
            feed_text, lm = fetch_feed()
            if feed_text:
                d = feedparser.parse(feed_text)
                # 新しい順に処理
                for e in d.entries:
                    if e.id in seen_ids:
                        continue
                    seen_ids.add(e.id)
                    rec = parse_entry_xml(e.link)
                    if is_tsunami_trigger(rec):
                        # === ここであなたの通知/保存パイプラインへ ===
                        print("[TSUNAMI TRIGGER]", rec["report_dt"], rec["headline"], rec["kinds"][:3], "areas=", len(rec["areas"]))
            if lm:
                last_modified = lm
        except Exception as ex:
            print("error:", ex)
        time.sleep(60)

if __name__ == "__main__":
    loop()

しかしこれだと動作しない
実際には処理が長すぎるのが原因だったようだ

以下の内容を notebook を再起動して実行したら

原因はだいたい次のどれかです:
1. いま津波系の電文が出ていない → フィルタに引っかからず1行もprintされない
2. eqvol.xmlは取れているが、電文のInfoKindやHeadlineに「津波」が無くて弾かれている
3. 取得自体が失敗(304やネットワーク)しているが、分かるログを出していない
4. notebookで無限ループを回していて、動いてはいるが“何もヒットしないから何も出ない”
まずは「取れている?パースできてる?何が弾かれてる?」を確認する**ドライラン(1回だけ走る)**を入れると状況が見えます。下のテスト版を一度そのまま実行してください(無限ループなし・デバッグprint多め)。

なので

import requests, feedparser, xml.etree.ElementTree as ET

EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"
NAMESP = {
    "jmx": "http://xml.kishou.go.jp/jmaxml1/",
    "jmx_ib": "http://xml.kishou.go.jp/jmaxml1/informationBasis1/",
    "jmx_eb": "http://xml.kishou.go.jp/jmaxml1/elementBasis1/",
    "jmx_seis": "http://xml.kishou.go.jp/jmaxml1/body/seismology1/",
    "jmx_tsunami": "http://xml.kishou.go.jp/jmaxml1/body/tsunami1/",
}

def parse_entry_xml(url):
    r = requests.get(url, timeout=15)
    r.raise_for_status()
    root = ET.fromstring(r.content)

    head = root.find(".//jmx_ib:Head", NAMESP)
    title = head.findtext("jmx_ib:Title", default="", namespaces=NAMESP)
    report_dt = head.findtext("jmx_ib:ReportDateTime", default="", namespaces=NAMESP)
    info_kind = head.findtext("jmx_ib:InfoKind", default="", namespaces=NAMESP)
    info_type = head.findtext("jmx_ib:InfoType", default="", namespaces=NAMESP)
    headline = head.findtext("jmx_ib:Headline/jmx_eb:Text", default="", namespaces=NAMESP)

    # 津波領域の抽出
    areas, kinds = [], []
    for item in root.findall(".//jmx_eb:Body//jmx_eb:Information[@type='津波予報領域表現']/jmx_eb:Item", NAMESP):
        kind_name = item.findtext("jmx_eb:Kind/jmx_eb:Name", default="", namespaces=NAMESP)
        if kind_name:
            kinds.append(kind_name)
        for a in item.findall("jmx_eb:Areas/jmx_eb:Area", NAMESP):
            areas.append({
                "name": a.findtext("jmx_eb:Name", default="", namespaces=NAMESP),
                "code": a.findtext("jmx_eb:Code", default="", namespaces=NAMESP),
            })
    return {
        "title": title, "report_dt": report_dt,
        "info_kind": info_kind, "info_type": info_type,
        "headline": headline, "kinds": list(set(kinds)),
        "areas": areas, "url": url
    }

def is_tsunami_trigger(rec):
    # “津波”を広めに判定(タイトル・見出し・InfoKind)
    text_pool = " ".join([rec.get("title",""), rec.get("headline",""), rec.get("info_kind","")])
    if "津波" not in text_pool:
        return False
    # 強ワード
    kw = ["大津波警報","津波警報","津波注意報","津波予報","津波情報"]
    if any(k in text_pool for k in kw):
        return True
    # あるいはKindsに入っていればOK
    return any(k in rec.get("kinds", []) for k in kw)

# ---- ドライラン:feed取得→最新5件だけ詳細をなめて出力 ----
r = requests.get(EQVOL_FEED, timeout=15)
print("feed status:", r.status_code, "bytes:", len(r.content))
d = feedparser.parse(r.text)
print("entries:", len(d.entries))
for e in d.entries[:5]:
    print("ENTRY:", e.get("published",""), e.get("title",""))
    try:
        rec = parse_entry_xml(e.link)
        print("  InfoKind:", rec["info_kind"], "| InfoType:", rec["info_type"])
        print("  Title   :", rec["title"])
        print("  Headline:", (rec["headline"] or "")[:120])
        print("  kinds   :", rec["kinds"], "| areas:", len(rec["areas"]))
        print("  -> TRIGGER?", is_tsunami_trigger(rec))
    except Exception as ex:
        print("  ERROR parsing:", ex)

を実行

feed status: 200 bytes: 27328
entries: 37
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 薩摩硫黄島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 諏訪之瀬島 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 霧島山(新燃岳) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 浅間山 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False
ENTRY:  降灰予報(定時)
  InfoKind: 降灰予報 | InfoType: 発表
  Title   : 火山名 草津白根山(白根山(湯釜付近)) 降灰予報(定時)
  Headline: 
  kinds   : [] | areas: 0
  -> TRIGGER? False

もしこれが

entries=0 → フィード取得が怪しい(ネットワーク・一時ブロック)。requests.get(EQVOL_FEED).text[:2000]をprintして中身を確認

entries>0かつInfoKindやTitleに「津波」が無い → 正常(いま津波電文が無いだけ)

parse_entry_xmlでエラー → XML名前空間/パスが電文種別で違う可能性。print(rec[“url”])で該当XMLを保存して中身を見るのが早い

というように原因を理解できる

補足(実運用へ進めるとき)
静岡沿岸の津波予報区コードでareasをフィルタ(対象外は通知しない)
InfoType == “取消”で状態を解除に遷移
併せてextra.xmlも監視すると取りこぼしが減ります(仕様差し込みは後でOK)
ノートブックではなく常駐スクリプト/systemdで運用し、ログをローテーション

他にやったことは
1. 通信・ネットワークの確認
まず、最初の requests.get(EQVOL_FEED) で通信が成功しているかを確かめる

import requests
EQVOL_FEED = "https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"

r = requests.get(EQVOL_FEED, timeout=15)
print("status:", r.status_code)
print(r.text[:1000])  # 最初の1000文字だけ表示

✅ status: 200 なら通信成功。
❌ もし 403 や Timeout が出た場合 → 通信がブロックされている(VPN・プロキシ・ファイアウォールなどの影響)。

結果は

status: 200
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom" lang="ja">
  <title>高頻度(地震火山)</title>
  <subtitle>JMAXML publishing feed</subtitle>
  <updated>2025-11-06T05:01:14+09:00</updated>
  <id>https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml#short_1762372874</id>
  <link rel="related" href="https://www.jma.go.jp/"/>
  <link rel="self" href="https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml"/>
  <link rel="hub" href="http://alert-hub.appspot.com/"/>
  <rights type="html"><![CDATA[
<a href="https://www.jma.go.jp/jma/kishou/info/coment.html">利用規約</a>,
<a href="https://www.jma.go.jp/jma/en/copyright.html">Terms of Use</a>
]]></rights>
  <entry>
    <title>降灰予報(定時)</title>
    <id>https://www.data.jma.go.jp/developer/xml/data/20251105200049_0_VFVO53_010000.xml</id>
    <updated>2025-11-05T20:00:00Z</updated>
    <author>
      <name>気象庁</name>
    </author>
    <link type="application/xml" href="https://ww

2. フィード構造が読めているか確認
feedparser がXMLを正しく認識していないと、entries が空になる

import feedparser

feed = feedparser.parse("https://www.data.jma.go.jp/developer/xml/feed/eqvol.xml")
print("entries:", len(feed.entries))
for e in feed.entries[:5]:
    print(e.title, e.link)

の結果は

カーネル・出力の健全性チェック
これで何も出ないなら、セルが実行されていない/カーネルが固まっている/出力パネルが壊れている可能性が濃厚

import sys, time, os, pathlib, subprocess

print("STEP A1: print works?", flush=True)
sys.stdout.write("STEP A2: stdout.write works?\n"); sys.stdout.flush()
sys.stderr.write("STEP A3: stderr works?\n"); sys.stderr.flush()

for i in range(3):
    print(f"tick {i}", flush=True)
    time.sleep(0.5)

# ファイル出力で実行確認(UI出力が壊れていても痕跡が残る)
pathlib.Path("notebook_alive.txt").write_text("alive\n")
print("STEP A4: wrote notebook_alive.txt", flush=True)

# サブプロセスの標準出力も確認
ret = subprocess.run(["echo", "STEP A5: subprocess echo"], capture_output=True, text=True)
print(ret.stdout, end="")
print("STEP A DONE", flush=True)

結果は

STEP A1: print works?
STEP A2: stdout.write works?
STEP A3: stderr works?
tick 0
tick 1
tick 2
STEP A4: wrote notebook_alive.txt
STEP A5: subprocess echo
STEP A DONE

処理そのものが終わらないのが原因なので原因を探す

まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意

まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意

Epicenter(震源の lat,lon)を渡すと、深海近似の伝播速度で到達時間を安全側に推定し、
route_latest に arrival_time_min / height_class / confidence を追記して上書き保存、
同時に sim_runs に実行ログを追記

既存の変数:GPKG, coast(海岸線), home_wgs(自宅), route_latest は前工程で作成済みを想定
追加依存:pyproj(地球楕円体距離計算用)

# === 津波「到達時間・高さ」簡易推定 (v0) ===
# 目的:震源→自宅最寄り海岸線までの直線距離 / 深海近似速度で到達時間を安全側に推定
# 出力:arrival_time_min, height_class, confidence を route_latest に追記(上書き保存)
#       sim_runs テーブルに実行ログを追記
from datetime import datetime
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from shapely.ops import nearest_points
from pyproj import Geod

GEOD = Geod(ellps="WGS84")

def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
    """
    自宅に最も近い海岸線上の点を返す(WGS84)
    - coast_gdf: 4326 想定(LineString/MultiLineString)
    - home_latlon: (lat,lon)
    """
    if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt = Point(home_latlon[1], home_latlon[0])  # (lon,lat)
    # 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
    coast_wgs["__d__"] = coast_wgs.distance(home_pt)
    geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
    n1, n2 = nearest_points(geom, home_pt)
    return n1  # 海岸線側の最近点(Point lon,lat)

def _geodesic_km(p1_latlon: tuple[float,float], p2_latlon: tuple[float,float]) -> float:
    """
    WGS84 楕円体の測地距離[km]
    """
    az12, az21, m = GEOD.inv(p1_latlon[1], p1_latlon[0], p2_latlon[1], p2_latlon[0])
    return m / 1000.0

def estimate_arrival_minutes_v0(
    epicenter_latlon: tuple[float,float],
    coast_gdf: gpd.GeoDataFrame,
    home_latlon: tuple[float,float],
    h_rep_m: float = 3000.0,       # 代表水深[m](深海)
    safety_slow_factor: float = 1.2  # 速度を遅めに見積もる倍率(到達時間は長くなる)
) -> float:
    """
    深海近似の伝播速度 c ≈ √(g*h) を用い、震源→自宅最寄り海岸までの直線距離から到達時間[分]を推定(安全側)。
    """
    g = 9.81
    c_ms = (g * h_rep_m) ** 0.5              # m/s
    c_ms_safe = c_ms / safety_slow_factor    # 安全側に遅くする
    coast_pt = _nearest_coast_point_to_home(coast_gdf, home_latlon)  # lon,lat
    coast_latlon = (coast_pt.y, coast_pt.x)

    D_km = _geodesic_km(epicenter_latlon, coast_latlon)  # 震源→自宅最寄り海岸
    t_min = (D_km * 1000.0) / c_ms_safe / 60.0
    return float(t_min)

def update_route_latest_with_arrival(
    GPKG: str,
    arrival_time_min: float,
    height_class: str = "unknown",
    confidence: str = "low",
):
    """
    route_latest に arrival_time_min / height_class / confidence を追記して上書き保存。
    route_latest が未作成の場合は何もしない(例外を出さず return)。
    """
    from fiona import listlayers
    if "route_latest" not in listlayers(GPKG):
        print("route_latest レイヤが未作成のため、到達時間の追記はスキップしました。")
        return
    rl = gpd.read_file(GPKG, layer="route_latest")
    # 既存列がなければ追加
    for col in ["arrival_time_min", "height_class", "confidence"]:
        if col not in rl.columns:
            rl[col] = None
    rl.loc[:, "arrival_time_min"] = float(arrival_time_min)
    rl.loc[:, "height_class"] = str(height_class)
    rl.loc[:, "confidence"] = str(confidence)
    rl.to_file(
        GPKG, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}
    )
    print(f"✓ route_latest を更新: 到達 {arrival_time_min:.1f} 分 / 高さ {height_class} / 信頼度 {confidence}")

def append_sim_run(
    GPKG: str,
    epicenter_latlon: tuple[float,float],
    home_latlon: tuple[float,float],
    arrival_time_min: float,
    h_rep_m: float,
    safety_slow_factor: float,
    height_class: str,
    confidence: str,
    source: str = "v0_deep_sea_approx"
):
    """
    sim_runs に今回の推定メタを 1 行追記。
    既存スキーマ差があればマージ→上書きで吸収。
    """
    from fiona import listlayers
    rec = gpd.GeoDataFrame({
        "run_id": [datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        "epicenter_lat": [epicenter_latlon[0]],
        "epicenter_lon": [epicenter_latlon[1]],
        "home_lat": [home_latlon[0]],
        "home_lon": [home_latlon[1]],
        "arrival_time_min": [float(arrival_time_min)],
        "h_rep_m": [float(h_rep_m)],
        "safety_slow_factor": [float(safety_slow_factor)],
        "height_class": [str(height_class)],
        "confidence": [str(confidence)],
        "source": ,
    }, geometry=[Point(epicenter_latlon[1], epicenter_latlon[0])], crs="EPSG:4326")

    try:
        rec.to_file(
            GPKG, layer="sim_runs",
            driver="GPKG", engine="pyogrio",
            append=True
        )
    except Exception:
        # スキーマ差などのフォールバック(既存行を読み出してマージ→上書き)
        if "sim_runs" in listlayers(GPKG):
            old = gpd.read_file(GPKG, layer="sim_runs")
            cols = sorted(set(old.columns) | set(rec.columns))
            merged = gpd.GeoDataFrame(pd.concat([old.reindex(columns=cols),
                                                 rec.reindex(columns=cols)], ignore_index=True), crs=4326)
            merged.to_file(
                GPKG, layer="sim_runs",
                driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"}
            )
        else:
            rec.to_file(GPKG, layer="sim_runs", driver="GPKG", engine="pyogrio")
    print("✓ sim_runs に追記しました。")

次にテスト

# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5)  # (lat, lon)

# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
    epicenter_latlon=epicenter,
    coast_gdf=coast,            # 既に読み込み済みの海岸線(EPSG:4326推奨)
    home_latlon=home_wgs,
    h_rep_m=3000.0,             # 代表水深:3,000m(必要に応じ調整)
    safety_slow_factor=1.2      # 速度を 1/1.2 に落として安全側
)

# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"

# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
    GPKG=GPKG,
    arrival_time_min=arr_min,
    height_class=height_class,
    confidence=confidence
)

# sim_runs にログ追記
append_sim_run(
    GPKG=GPKG,
    epicenter_latlon=epicenter,
    home_latlon=home_wgs,
    arrival_time_min=arr_min,
    h_rep_m=3000.0,
    safety_slow_factor=1.2,
    height_class=height_class,
    confidence=confidence
)

を実行すると



✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low
✓ sim_runs に追記しました。
/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/3983269974.py:27: UserWarning: Geometry is in a geographic CRS. Results from 'distance' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.

  coast_wgs["__d__"] = coast_wgs.distance(home_pt)

となる

原因は
警告は「経度緯度(EPSG:4326)のまま .distance() を使っている」ため
距離計算は投影座標系(m単位)で行うようにすれば解決するらしい

最近点の計算を UTM で実施 → WGS84 に戻して返すようにする

def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
    """
    自宅に最も近い海岸線上の点を返す(WGS84)
    - coast_gdf: 4326 想定(LineString/MultiLineString)
    - home_latlon: (lat,lon)
    """
    if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt = Point(home_latlon[1], home_latlon[0])  # (lon,lat)
    # 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
    coast_wgs["__d__"] = coast_wgs.distance(home_pt)
    geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
    n1, n2 = nearest_points(geom, home_pt)
    return n1  # 海岸線側の最近点(Point lon,lat)

の部分を

def _nearest_coast_point_to_home(
    coast_gdf: gpd.GeoDataFrame,
    home_latlon: tuple[float, float],
    metric_crs: str = "EPSG:32654"  # 静岡周辺 UTM (m)
) -> Point:
    """
    自宅に最も近い海岸線上の点(WGS84)を返す。
    距離計算は投影座標系で実施して精度を担保。
    """
    # 入力を WGS84 とみなして正規化
    if coast_gdf.crs is None:
        coast_wgs = coast_gdf.set_crs(4326)
    elif coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt_wgs = Point(home_latlon[1], home_latlon[0])  # (lon, lat)

    # 距離計算は m 単位の投影座標で
    coast_m = coast_wgs.to_crs(metric_crs).copy()
    home_pt_m = gpd.GeoSeries([home_pt_wgs], crs=4326).to_crs(metric_crs).iloc[0]

    # まず最も近いジオメトリを特定(sindex があれば速い)
    try:
        # GeoPandas ≥0.12 なら sindex.nearest が使える環境も
        idx = next(coast_m.sindex.nearest(home_pt_m.bounds))[0]
    except Exception:
        # フォールバック:距離最小
        coast_m["__d__"] = coast_m.geometry.distance(home_pt_m)
        idx = coast_m["__d__"].idxmin()

    geom_m = coast_m.loc[idx, "geometry"]
    p_on_coast_m, _ = nearest_points(geom_m, home_pt_m)  # 投影座標で最近点

    # WGS84 に戻して返す
    p_on_coast_wgs = gpd.GeoSeries([p_on_coast_m], crs=metric_crs).to_crs(4326).iloc[0]
    return p_on_coast_wgs

へ変更

これで

# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5)  # (lat, lon)

# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
    epicenter_latlon=epicenter,
    coast_gdf=coast,            # 既に読み込み済みの海岸線(EPSG:4326推奨)
    home_latlon=home_wgs,
    h_rep_m=3000.0,             # 代表水深:3,000m(必要に応じ調整)
    safety_slow_factor=1.2      # 速度を 1/1.2 に落として安全側
)

# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"

# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
    GPKG=GPKG,
    arrival_time_min=arr_min,
    height_class=height_class,
    confidence=confidence
)

# sim_runs にログ追記
append_sim_run(
    GPKG=GPKG,
    epicenter_latlon=epicenter,
    home_latlon=home_wgs,
    arrival_time_min=arr_min,
    h_rep_m=3000.0,
    safety_slow_factor=1.2,
    height_class=height_class,
    confidence=confidence
)

を再度実行すると

✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low
✓ sim_runs に追記しました。

となる

ここまではできたので
推定到達時間に連動して等時間到達圏(Y分)を自動生成 → 安全エリアを重ねて表示

# === 推定到達時間に連動した等時間到達圏の自動生成 ===
import geopandas as gpd

# 1) route_latest から arrival_time_min を取得(安全側 70% & 上限20分)
rl = gpd.read_file(GPKG, layer="route_latest")
arr = float(rl["arrival_time_min"].iloc[0]) if "arrival_time_min" in rl else 15.0
Y = max(5, min(20, int(arr * 0.7)))   # 5〜20分にクリップ

# 2) 出発ノード & 重み(既存関数)
orig = nearest_node_from_wgs(G2, lat=home_wgs[0], lon=home_wgs[1])
add_edge_time_weight(G2, speed_kmh=4.8)

# 3) 到達圏→非浸水差し引き
iso_gdf, reach_edges_4326 = isochrone_polygon(
    G2, origin_node=orig, cutoff_min=Y,
    proj_crs=METRIC, buffer_m=35.0, weight_attr="time_min"
)
safe_area_now = subtract_flood(iso_gdf, flood_union, proj_crs=METRIC)

# 4) 上書き保存(pyogrio + OVERWRITE)
iso_layer = f"isochrone_walk_{Y}min"
for gdf, layer in [(iso_gdf, iso_layer), (safe_area_now, "safe_area_now")]:
    gdf.to_file(GPKG, layer=layer, driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"})

# 5) Folium へ重ねて保存
folium.GeoJson(iso_gdf.__geo_interface__, name=iso_layer,
               style_function=lambda x: {"fillOpacity":0.20, "color":"black", "weight":1}).add_to(m)
folium.GeoJson(safe_area_now.__geo_interface__, name="安全エリア(到達圏∩非浸水)",
               style_function=lambda x: {"fillOpacity":0.30, "weight":0}).add_to(m)
m.save("outputs/evac.html")
print(f"✓ {iso_layer} / safe_area_now を更新し、outputs/evac.html を保存しました。")

✓ isochrone_walk_15min / safe_area_now を更新し、outputs/evac.html を保存しました。

となる

震源→到達時間v0→(Y分)等時間到達圏→非浸水エリア→地図」が自動でひとまとめになるので
次は
EEW/揺れ検知トリガーからこの一連を叩く CLI に繋げる
そのためにまずは
EEW/揺れ検知トリガーの作成をする

時間到達圏(isochrone)生成関数から作成

時間到達圏(isochrone)生成関数から作成

G2(浸水回避済みグラフ)と自宅座標から 徒歩Y分の到達圏ポリゴン を作る

到達圏は「到達可能エッジを太らせて面化」する堅牢方式
(alpha-shape不要、Shapely/GeoPandas 標準で完結)

前提:G2(到達可能道路グラフ)、home_wgs、GPKG、METRIC=”EPSG:32654″ が既に定義済み

# === Isochrone 生成ユーティリティ ===
import math
import geopandas as gpd
import networkx as nx
import osmnx as ox
from shapely.ops import unary_union

def add_edge_time_weight(G, speed_kmh: float = 4.8, attr_name: str = "time_min"):
    """
    エッジに 'time_min'(分)を付与(長さ[m] / 速度[km/h]).
    既にある場合は上書き。
    """
    fac = speed_kmh * 1000 / 60.0  # m / min
    for u, v, k, data in G.edges(keys=True, data=True):
        length_m = float(data.get("length", 0.0))
        data[attr_name] = length_m / fac
    return G

def nearest_node_from_wgs(G, lat: float, lon: float):
    """WGS84の(lat,lon)から最近傍ノードIDを返す(osmnxのAPI差異に両対応)。"""
    try:
        return ox.distance.nearest_nodes(G, X=lon, Y=lat)
    except AttributeError:
        return ox.nearest_nodes(G, X=lon, Y=lat)

def isochrone_polygon(
    G,
    origin_node: int,
    cutoff_min: float,
    proj_crs: str = "EPSG:32654",
    buffer_m: float = 35.0,
    weight_attr: str = "time_min",
):
    """
    到達可能サブグラフのエッジを buffer で面化 → マルチポリゴン化。
    返り値: (isochrone_gdf[EPSG:4326], reachable_edges_gdf[EPSG:4326])

    - cutoff_min: 到達時間しきい値(分)
    - buffer_m: エッジに与えるバッファ[m](道路幅+余裕)
    """
    # dijkstra で到達ノード
    dist = nx.single_source_dijkstra_path_length(G, origin_node, cutoff=cutoff_min, weight=weight_attr)
    reachable_nodes = set(dist.keys())
    if not reachable_nodes:
        raise RuntimeError("到達ノードが空です。cutoff_min が小さすぎる可能性。")

    # 到達エッジ抽出
    nodes_gdf, edges_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True)
    idx = edges_gdf.reset_index()[["u", "v", "key"]].apply(tuple, axis=1)
    mask = [(u in reachable_nodes and v in reachable_nodes) for (u, v, k) in idx]
    reach_edges = edges_gdf.loc[mask].copy()
    if reach_edges.empty:
        raise RuntimeError("到達エッジが空です。グラフが疎か cutoff_min が小さい可能性。")

    # 面化(project→buffer→union→修正→逆投影)
    reach_proj = reach_edges.to_crs(proj_crs)
    merged = unary_union(reach_proj.buffer(buffer_m).geometry.values)
    # geometry 修正(buffer(0) に相当)
    poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union

    # 出力(4326)
    iso_gdf = gpd.GeoDataFrame({"mode": ["walk"], "minutes": [float(cutoff_min)]},
                               geometry=[poly], crs=proj_crs).to_crs(4326)
    reach_edges_4326 = reach_proj.to_crs(4326)
    return iso_gdf, reach_edges_4326

def subtract_flood(iso_gdf: gpd.GeoDataFrame, flood_union_gdf: gpd.GeoDataFrame, proj_crs: str = "EPSG:32654"):
    """
    isochrone(面)から津波浸水(union)を差し引き → 「到達可能 × 非浸水」エリア。
    返り値: safe_area_now_gdf[EPSG:4326]
    """
    iso_m = iso_gdf.to_crs(proj_crs)
    fl_m  = flood_union_gdf.to_crs(proj_crs)
    safe_geom = iso_m.geometry.iloc[0].difference(fl_m.unary_union)
    safe = gpd.GeoDataFrame({"minutes":[iso_gdf["minutes"].iloc[0]]}, geometry=[safe_geom], crs=proj_crs).to_crs(4326)
    return safe

実行すると

/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/1981613481.py:59: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union
/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/1981613481.py:74: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  safe_geom = iso_m.geometry.iloc[0].difference(fl_m.unary_union)
<folium.features.GeoJson at 0x178279f10>

警告は Shapely/GeoPandas 2.x 系の非推奨によるもの
unary_union は廃止予定 → union_all() に置き換えればOK

変更すると

/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/832098648.py:59: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union
<folium.features.GeoJson at 0x17dfdf350>
    poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union

も修正する

poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).union_all()
merged = unary_union(reach_proj.buffer(buffer_m).geometry.values)

も修正

buf = reach_proj.buffer(buffer_m)
merged = gpd.GeoSeries(buf, crs=proj_crs).union_all()

に書き換える

これでエラーが消える

重みtime_minはエッジ長さ/歩行速度。最短経路と同様にDijkstraで「時間≦Y分」のノード集合を取り、両端が到達可能なエッジのみ抽出→buffer→unionで面化しています。
buffer_m は道路幅+余裕のパラメータ。市街地なら 30–50m で見た目・連続性が良好です。
非浸水差引は isochrone – flood_union。あなたの既存 flood_union(津波浸水 union)をそのまま使えます。
保存は pyogrio + OVERWRITE に統一(毎回クリーン)。

次は

# ② 津波「到達時間・高さ」簡易推定モジュール

**目的**: 「あと何分で来る?」に即答。初期は簡易/安全側、後で精緻化。
**ステップ**

* v0(当日実用版)

* **深海近似の伝播速度** (c ≈ √(g·h)) を **一定深さ代表値**で近似(安全側に遅めに見積もる)
* 震源→沿岸/自宅最寄り海岸線までの距離から到達時間を算出
* 高さは情報源(JMA速報の区分)を採用、未入手なら「不明→避難優先」のメッセージ
* v1(精緻化)

* 沿岸ごとの**水深ラスタ**(GEBCO/J-EGG等)から距離積分で可変速度伝播
* 複数反射・地形収束は扱わず「防災判断に使える保守的な上限/下限」を出す
* 出力

* `arrival_time_min`, `height_class`, `confidence` を `route_latest`/`sim_runs` に書込

使用する用途のコンセプト

使用する用途のコンセプト また震源地から津波が到達するまでの時間と津波の高さを計算し GIS情報を使い避難するエリアを算出しルート案内できるようにしたい 家族の避難経路・地域防災マップをGraphDBで整理 避難所や重要連絡先も紐付け 使い方例: 「地震が来たら最寄りの避難所はどこ?」 「非常食の賞味期限が近いものは?」 など この用途で使うならどれを推奨する?

✅ あなたの用途の要件
* 精度:震源→沿岸までの到達時間・波高推定の入力に使うので、誤差が大きすぎると危険。
* 範囲:主に日本周辺で十分。全球は必須ではない。
* 更新頻度:将来的に OSM/防災機関の最新データと連携したい。
* GraphDB連携:避難所・道路・避難経路・浸水域をノード/リレーションで管理。
* 利用イメージ:
* 「地震が来たら最寄りの避難所はどこ?」 → 避難所ノード+道路グラフ
* 「非常食の賞味期限が近いものは?」 → 在庫管理ノード
* 「津波到達までに何分ある?」 → 震源ノード→海岸ノードのコスト距離

✅ データセットごとの特徴
1. 国土数値情報「海岸線データ」
* 日本限定。行政データとして信頼性が高い。
* GML/Shapefile 提供。
* 避難計画(住民向け防災マップ)に最も適する。
➡ 推奨(基準データにする)
2. OSM Coastlines / Land polygons
* 更新頻度が高い(週次〜月次)。
* 最新の人工改変(埋立地・護岸)も比較的反映。
* 避難経路や実際の生活圏との整合性が良い。
➡ 補助データ(現況把握用)
3. Natural Earth
* 全球対応、軽量。
* 精度は粗い(1:10mでも数百 m〜km の誤差)。
* 可視化や概念実証には便利。
➡ プロトタイプ用のみ
4. GSHHG
* 全球対応、高解像度あり。
* ファイルが大きく、取り回しに手間。
* 学術研究向き。
➡ 専門シミュレーション以外では過剰

✅ 推奨構成(防災マップ+GraphDB向け)
1. 海岸線:
* 基本 → 国土数値情報(海岸線)
* 最新の現況補正 → OSM Coastlines
2. 標高/地形:
* 陸域 DEM → 国土地理院 DEM / ALOS / SRTM
* 海域 DEM → GEBCO
3. 避難所・道路:
* 避難所 → 自治体オープンデータ(CSV/GeoJSON)
* 道路 → OSMnx で抽出
4. GraphDBに格納:
* ノード:震源・海岸・浸水域・避難所・家庭・在庫
* リレーション:到達時間・避難経路・所要時間・備蓄品管理

✅ まとめ
ご提示の用途(家族避難+地域防災+GraphDB管理)なら:
* **国土数値情報(海岸線)**をメイン
* OSM Coastlinesで現況更新
* DEM(陸域+海域)+道路+避難所を組み合わせ
* GraphDBでノード/リレーションを整理して「質問に答えられる防災知識ベース」を構築

これらを考慮して次に作る機能は?

いいね、その前提なら“すぐ効く順”でこれを作るのが最短です。各項目は**完成定義(DoD)**と**最小コード断片**まで落とし込みました。

# 1) 震源→到達時間v0(安全側近似)

**目的**: 「あと何分?」に即答(まずは簡易モデル)。
**DoD**: `route_latest` に `arrival_time_min`, `height_class`, `confidence` を追記して保存。
**やること**

* 震源点(緯度経度)と自宅最寄り海岸線点の距離`D`を計算
* 代表水深`h_rep`で c≈√(g·h_rep)(保守的に**遅め**で見積り)
* `arrival_time_min = D / c / 60`(分)
* 波高は「不明なら ‘unknown’」、速報値が入手できたら `height_class` に格納

def arrival_time_min_km(d_km, h_rep=3000):  # m
    import math
    g=9.81; c=math.sqrt(g*h_rep) # m/s
    return (d_km*1000)/c/60

# 保存時に route_latest の属性へ追記
route_gdf["arrival_time_min"] = [arrival_time_min_km(D_km)]
route_gdf["height_class"] = ["unknown"]
route_gdf["confidence"] = ["low"]  # v0は保守的
```

# 2) 「安全エリア」ポリゴン生成(反転×等時間到達圏)

**目的**: 地図で「今向かうべき範囲」を1枚で示す。
**DoD**: `safe_area_now` と `isochrone_walk_min` を GPKG に**上書き**保存し、Foliumに重ねる。
**やること**

* 浸水想定を反転(`safe = study_area – tsunami_inundation`)
* 歩行速度4.8km/hで `Y = min(arrival_time*0.7, 20)` 分の等時間到達圏(graph isochrone)
* `safe_area_now = safe ∩ isochrone`

(等時間到達圏は既存グラフ`G2`から到達ノード抽出→凸包/マルチポリゴン化でOK)

# 3) 自動実行パイプライン(CLI化)

**目的**: トリガーが来たら無人で回る。
**DoD**: `python evac_pipeline.py –home 34.728,137.978 –quake 33.0,138.5 –radius 12000` で
`roads_safe`,`shelters_safe`,`route_latest`,`safe_area_now` が更新され、LINE/音声に通知。

**雛形**

# evac_pipeline.py
import argparse
def main():
    ap=argparse.ArgumentParser()
    ap.add_argument("--home", required=True)  # "lat,lon"
    ap.add_argument("--quake", required=True) # "lat,lon"
    ap.add_argument("--radius", type=int, default=12000)
    ap.add_argument("--notify", default="line,voice")
    args=ap.parse_args()
    # 1) データ更新→ 2) ルート再計算→ 3) 到達時間→ 4) 安全エリア→ 5) 保存→ 6) 通知
    # (既存コードを関数化して順番に呼ぶ)
if __name__=="__main__":
    main()

# 4) GraphDB(Neo4j) スキーマ初期化

**目的**: 「質問に答えられる」グラフへ。
**DoD**: `Schema v1` が作成され、`Route(latest=true)` が常に1本だけ存在。

**Cypher(最小)**

CREATE CONSTRAINT shelter_id IF NOT EXISTS FOR (s:Shelter) REQUIRE s.sid IS UNIQUE;
CREATE CONSTRAINT route_id   IF NOT EXISTS FOR (r:Route)   REQUIRE r.rid IS UNIQUE;

UNWIND $shelters AS s
MERGE (sh:Shelter {sid:s.sid})
  SET sh.name=s.name, sh.lat=s.lat, sh.lon=s.lon, sh.safe=s.safe;

MERGE (h:Home {hid:'home-1'}) SET h.lat=$home.lat, h.lon=$home.lon;

WITH $route AS r
MERGE (rt:Route {rid:r.rid})
  SET rt.meters=r.meters, rt.eta_walk=r.eta_walk, rt.eta_walk_safe=r.eta_walk_safe,
      rt.arrival_time_min=r.arrival_time_min, rt.height_class=r.height_class,
      rt.created_at=r.created_at, rt.latest=true;

MATCH (h:Home {hid:'home-1'}), (best:Shelter {sid:$best_sid})
MERGE (h)-[:SAFE_PATH_TO {mode:'walk'}]->(best);

MATCH (x:Route {latest:true}) WHERE x.rid <> $route.rid
SET x.latest=false;

# 5) 緊急ダッシュボード(Folium MVP)

**目的**: 家族が1枚で把握。
**DoD**: `outputs/evac.html` に以下が出る。

* `route_latest`(距離/徒歩ETA/安全係数/推定到達までX分)
* `safe_area_now`(半透明塗り)
* 「Googleマップで開く」直リンク

**追加断片**

folium.GeoJson(safe_area_now.__geo_interface__, name="安全エリア", style_function=lambda x: {
    "fillOpacity":0.25, "weight":0
}).add_to(m)

gmap_url=f"https://www.google.com/maps/dir/?api=1&destination={dest_pt.y},{dest_pt.x}&travelmode=walking"
folium.Marker([dest_pt.y, dest_pt.x],
    popup=f"<a href='{gmap_url}' target='_blank'>Googleマップで開く</a>",
    icon=folium.Icon(color='blue')).add_to(m)

# 6) 期限ウォッチャ(非常食)

**目的**: 「切れてた」を無くす。
**DoD**: SQLite(またはNeo4j)から**14日以内期限**を抽出し、朝9時にLINE/音声通知。

**SQL 例**

SELECT name, qty, expiry_date
FROM supplies
WHERE DATE(expiry_date) <= DATE('now','+14 day')
ORDER BY expiry_date ASC;

# 7) イベントトリガー(擬似EEW→本番)

**目的**: 自動化の完成。
**DoD**: ダミーの震源JSONを受けて③のCLIを起動→将来EEW/津波情報に差し替え。

**擬似Webhook**

curl -X POST http://localhost:8080/quake \
  -H "Content-Type: application/json" \
  -d '{"lat":33.0,"lon":138.5,"mag":7.6,"depth_km":20}'

## 実装順(2スプリント想定)

1. **③ CLI化** → 既存処理の関数化・引数化
2. **1) 到達時間v0** → `route_latest` 属性追加
3. **2) 安全エリア** → 反転×等時間到達圏を保存&Folium表示
4. **5) ダッシュボード** → GMapリンク・要点表示
5. **4) GraphDB投入** → Route/Shelter/関係だけ先行
6. **6) 期限ウォッチャ** → 朝9時通知
7. **7) 擬似EEWトリガー** → 自動化通し

必要なら、上記の**等時間到達圏(isochrone)生成関数**と**Neo4j投入用の具体的なPythonコード**をすぐ出します。どこから書くか一言くれたら、その部分を一気に仕上げます。

ということでまずは
上記の等時間到達圏(isochrone)生成関数から作成

次に作る機能について

次に作る機能について

これで

使用する用途のコンセプト また震源地から津波が到達するまでの時間と津波の高さを計算し GIS情報を使い避難するエリアを算出しルート案内できるようにしたい 家族の避難経路・地域防災マップをGraphDBで整理 避難所や重要連絡先も紐付け 使い方例: 「地震が来たら最寄りの避難所はどこ?」 「非常食の賞味期限が近いものは?」 など この用途で使う
また
防災・非常時情報管理
やること:
非常持ち出し品・家族の避難経路・地域防災マップをGraphDBで整理
避難所や重要連絡先も紐付け
使い方例:
「地震が来たら最寄りの避難所はどこ?」
「非常食の賞味期限が近いものは?」
メリット:
緊急時に必要な情報を瞬時に取得
をやりたい

またYoutubeライブカメラなどで現地の情報をgeminiなどでマルチモーダルを使うことで解説できるはず。
また震源地から津波が到達するまでの時間と津波の高さを計算し
QGIS情報を使い避難するエリアを算出しルート案内できるようにしたい

地震速報などが出たときや揺れを検知したタイミングで自動動作させたい

この場合、次に作る機能は?

最高です。そのコンセプトなら、まずは「自動で動く骨格」と「意思決定に直結する出力」を先に固めましょう。次に作る機能を、優先度つきの“最短実用ルート”で並べます。すぐ作れる粒度で書きました。

# ① 地震トリガー → 自動実行(最優先)

**目的**: 揺れ/速報を検知したら、対話なしで「最新の安全経路」と「要点ダイジェスト」を家族に配信。
**やること**

* トリガー入力(どれか1つでOK、複数あれば冗長化)

* EEW(緊急地震速報)/ 津波情報のフィード監視(RSS/JSON/PUSH)
* ローカル加速度センサ(Raspberry Pi + MEMS)でしきい値超え検知
* **自動パイプライン起動**:

1. 震源・規模を取得 → 2) 津波到達目安・高さの推定 → 3) 浸水想定×道路を再マスク → 4) `route_latest` を再計算 → 5) 配信
* **LINE/音声配信**(あなたの既存基盤を活用)

* VoiceVox で読み上げ(「徒歩X分、安全側Y分。最寄りは◯◯小学校」)
* LINE Bot へ地図URL(`outputs/evac.html`)+テキスト要約

> 実装ヒント:`quake_watcher.py`(常駐)→ イベント時に `evac_pipeline.py –mode=auto –home ` を起動

# ② 津波「到達時間・高さ」簡易推定モジュール

**目的**: 「あと何分で来る?」に即答。初期は簡易/安全側、後で精緻化。
**ステップ**

* v0(当日実用版)

* **深海近似の伝播速度** (c ≈ √(g·h)) を **一定深さ代表値**で近似(安全側に遅めに見積もる)
* 震源→沿岸/自宅最寄り海岸線までの距離から到達時間を算出
* 高さは情報源(JMA速報の区分)を採用、未入手なら「不明→避難優先」のメッセージ
* v1(精緻化)

* 沿岸ごとの**水深ラスタ**(GEBCO/J-EGG等)から距離積分で可変速度伝播
* 複数反射・地形収束は扱わず「防災判断に使える保守的な上限/下限」を出す
* 出力

* `arrival_time_min`, `height_class`, `confidence` を `route_latest`/`sim_runs` に書込

# ③ 「安全エリア」ポリゴン生成(QGIS/GeoPandas)

**目的**: 地図上で「今向かうべきエリア」を一目で示す。
**やること**

* 浸水想定(`tsunami_inundation`)の**反転**+標高/堤防レイヤ(任意)で**安全領域**を生成
* **到達時間で内側制限**(例:徒歩Y分の等時間到達圏 isochrone)× 安全領域 = 「到達可能×安全」領域
* GPKGレイヤ例:`safe_area_now`(上書き)、`isochrone_walk_Ymin`(上書き)

# ④ ルート再計算の自動化(既存コードの自動再実行)

**目的**: クリックなしで最新ルートが常に用意される。
**やること**

* 既に作成済みの `roads_safe` / `shelters_safe` / `route_latest` 更新処理を

* **パラメトリックCLI化**:`evac_pipeline.py –home 34.728,137.978 –radius 12000`
* **一時ファイル→置換**でアトミックに上書き(停電等でも壊れない)
* **キャッシュキー**(あなたの `rid` 実装)を、**震源メタ**・**津波メタ**も内包して無効化制御

# ⑤ 家族向け「緊急ダッシュボード」(最短 MVP)

**目的**: 家族が迷わず見る1枚。
**やること**

* `outputs/evac.html` に以下を追記(Folium簡易UIでOK)

* 現在の `route_latest`(距離/ETA/安全係数)
* 「推定到達まで X 分 / 高さ区分:◯◯(安全側)」
* 「最寄り安全避難所:◯◯」「次点:◯◯」
* **赤ボタン:『ナビ開始(Googleマップ)』**(目的地座標埋め込みURL)
* LINEにも同じ情報をテンプレで送信(短文+URL)

# ⑥ GraphDB スキーマ拡張(避難・備蓄)

**目的**: 「何を、どこに、誰が、いつまで」をグラフで統合。
**ノード**

* `Person`(家族)/ `Shelter` / `Place(POI)` / `Contact`(電話・LINE)
* `HazardZone` / `Route`(`latest`/`run_id`)/ `Supply`(非常食・水・電池)
**リレーション**
* `LIVES_AT`, `NEAR`, `SAFE_PATH_TO`, `INSIDE`, `HAS_CONTACT`, `HAS_SUPPLY`, `BEST_OPTION`
**最低限のプロパティ**
* `Route`: meters, eta_min_walk, eta_min_walk_safe, created_at, arrival_time_min, height_class
* `Supply`: name, qty, expiry_date, storage_place
**使い方例**
* 「最寄りの安全避難所」→ `MATCH (home)-[:NEAR]->(s: Shelter) WHERE s.safe=true RETURN s LIMIT 1`
* 「賞味期限が近い」→ `MATCH (sup:Supply) WHERE sup.expiry_date < date()+duration('P14D') RETURN sup` --- # ⑦ 非常食・備蓄の賞味期限ウォッチャ **目的**: 「切らしてた/期限切れ」をなくす。 **やること** * `supplies.db`(SQLite) or GraphDB に `Supply` 登録 * 毎朝 9時に期限14日以内を抽出 → LINE/音声通知 * 地震トリガー時は**即時**に「持ち出し優先リスト」を配信(例:水/ヘッドライト/救急) --- # ⑧ ライブカメラ要約(マルチモーダル) **目的**: 「今、現地はどう見える?」を数十秒で要約。 **やること** * YouTube Live の**固定リスト**(地域局・河川・港湾)を管理 * イベント時:最新サムネイル or 数秒キャプチャを**LLMに要約**(水位・人流・通行可否など) * 要約に「**不確実性/注意喚起**」を必ず付け、安全側誘導 --- # ⑨ 訓練モード & 失敗時フォールバック **目的**: いざ本番でも動く。 **やること** * `--drill` フラグで**擬似震源**を流し、全自動ルート→配信まで通す * 外部データ失敗時は**ローカルのみ**で保守的判定(津波到達は短め、浸水は広め) --- # ⑩ すぐに書き足すスクリプト断片(抜粋) **自動起動フック(例・擬似)** [shell] # systemd 例 [Unit] Description=Quake watcher After=network-online.target [Service] ExecStart=/usr/bin/python3 /opt/evac/quake_watcher.py Restart=always [Install] WantedBy=multi-user.target ``` **パイプライン入口** ```bash python evac_pipeline.py \ --home 34.728,137.978 \ --radius 12000 \ --mode auto \ --notify line,voice ``` [/shell] ## 今日からの実装順(最短2–3スプリント想定) 1. **①トリガー→自動実行**(最低限:ローカル加速度しきい値 or ダミーEEW) 2. **④ルート再計算のCLI化**(既存コードを関数化→引数で動く) 3. **⑤ダッシュボードMVP**(Foliumに要点3行+ナビボタン) 4. **②到達時間v0**(一定速度・安全側近似)+`route_latest`に追記 5. **⑦賞味期限ウォッチャ**(朝9時通知) 6. **⑥GraphDBスキーマ**(`Route`と`Supply`だけ先に) 7. **⑧ライブ要約**(固定ライブ2–3本から) --- 必要なら、この骨格に合わせて**`evac_pipeline.py` の雛形**(CLI引数・モジュール分割)と **Neo4jの初期Cypher** もすぐ書きます。どこから着手するかだけ言ってくれたら、実装用のコードをそのまま渡します。 がGPTの答え

コンセプトに基づく改良

コンセプトに基づく改良

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString
import osmnx as ox
import networkx as nx
import folium
import os

GPKG = "outputs/shizuoka_hazard.gpkg"

# 1) 既存レイヤ読み込み(両方とも 4326)
coast   = gpd.read_file(GPKG, layer="coastline")
tsunami = gpd.read_file(GPKG, layer="tsunami_inundation")

# ----- パラメータ -----
# 自宅座標(lat, lon)←必ず置き換え
home_wgs = (34.728, 137.978)
# OSM道路の取得半径[m](到達不能なら大きく:15000, 20000…)
GRAPH_RADIUS_M = 12000
# 避難所CSV(cp932想定)
SHELTER_CSV = "data/20250801_指定緊急避難場所一覧.CSV"
# ----------------------

# 2) 避難所CSV→点化(列名は自動検出)
df = pd.read_csv(SHELTER_CSV, encoding="cp932")
lat_candidates = 
lon_candidates = 
if not lat_candidates or not lon_candidates:
    raise ValueError("CSVに緯度/経度列が見つかりません。列名を教えてください。")
lat_col, lon_col = lat_candidates[0], lon_candidates[0]
df[lat_col] = pd.to_numeric(df[lat_col], errors="coerce")
df[lon_col] = pd.to_numeric(df[lon_col], errors="coerce")
df = df.dropna(subset=[lat_col, lon_col])
shelters = gpd.GeoDataFrame(df.copy(),
    geometry=gpd.points_from_xy(df[lon_col], df[lat_col], crs="EPSG:4326")
)

# 3) 安全な避難所 = 浸水域と非交差
#   (unionはメモリ負荷になるので、空間結合で判定)
safe_join = gpd.sjoin(shelters, tsunami[["geometry"]], how="left", predicate="intersects")
shelters["safe"] = safe_join["index_right"].isna()
safe_shelters = shelters[shelters["safe"]].copy()

# --- 4) OSM道路取得(徒歩網) ---
G = ox.graph_from_point(home_wgs, dist=GRAPH_RADIUS_M, network_type="walk", simplify=True, retain_all=False)
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)

# u,v,key を“列”にする(重要)
edges = edges.reset_index()

# CRSを4326に
if edges.crs is None:
    edges = edges.set_crs(4326)
elif edges.crs.to_epsg() != 4326:
    edges = edges.to_crs(4326)
# --- 5) 浸水域と交差する道路を除去(sjoin→集計で一意化) ---
left = edges[["u","v","key","geometry"]].copy()   # edges は reset_index() 済み想定
right = tsunami[["geometry"]]

tag = gpd.sjoin(left, right, how="left", predicate="intersects")

blocked = (
    tag.assign(hit=tag["index_right"].notna())
       .groupby(["u","v","key"], as_index=False)["hit"].max()
       .rename(columns={"hit":"blocked"})
)

edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
edges2["blocked"] = edges2["blocked"].fillna(False)

edges_keep = edges2.loc[~edges2["blocked"]].copy()

# ★ ここが重要:graph_from_gdfs に渡す前に MultiIndex に戻す
#   (key が float になっていたら int にしておくと安心)
if edges_keep["key"].isna().any():
    edges_keep["key"] = edges_keep["key"].fillna(0)

edges_keep["key"] = edges_keep["key"].astype(int)
edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()

# --- 6) サブグラフ再構築(通行可のみ) ---
used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()

# nodes_keep の geometry 列が欠けていたら補完
if "geometry" not in nodes_keep or nodes_keep["geometry"].isna().any():
    nodes_keep = nodes_keep.set_geometry(
        gpd.points_from_xy(nodes_keep["x"], nodes_keep["y"]), crs=edges_keep.crs
    )

# OSMnx のバージョン差に対応
try:
    G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))
except AttributeError:
    from osmnx import utils_graph as ug
    G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))


# 7) 出発・目的地ノード(最寄り“安全”避難所)
home_pt = Point(home_wgs[1], home_wgs[0])
# 最寄り安全避難所(直線距離)…なければ全体から選ぶ
TARGET = safe_shelters if len(safe_shelters) else shelters
METRIC = "EPSG:32654"# 静岡周辺 UTM
hp = gpd.GeoSeries([home_pt], crs=4326).to_crs(METRIC).iloc[0]
tp = TARGET.to_crs(METRIC).copy()
tp["__dist_m__"] = tp.geometry.distance(hp)
dest_pt = TARGET.loc[tp["__dist_m__"].idxmin(), "geometry"]

# ▶ 互換呼び分け
try:
    orig = ox.distance.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.distance.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)
except AttributeError:
    # 新API名(環境によってはこちらが有効)
    orig = ox.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)


# 8) 最短経路(通行可能グラフ)
try:
    route = nx.shortest_path(G2, orig, dest, weight="length")
except nx.NetworkXNoPath:
    raise RuntimeError("到達経路なし。GRAPH_RADIUS_Mを増やすか、別の避難所で再試行を。")

# 9) GPKGへ追記
#    既にある場合は上書きしたいなら driverオプションを切り替える(ここは追記)
shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
edges.to_file(GPKG, layer="roads_all", driver="GPKG")
edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")

# 10) Folium地図
m = folium.Map(location=[home_wgs[0], home_wgs[1]], zoom_start=12, control_scale=True)
folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定(A40)").add_to(m)
folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)

# 経路を線で描画
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip="避難経路").add_to(m)

# 出発・目的地
folium.Marker([home_wgs[0], home_wgs[1]], popup="現在地", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([dest_pt.y, dest_pt.x], popup="避難所", icon=folium.Icon(color="blue")).add_to(m)

# 避難所レイヤ(安全のみ強調)
folium.GeoJson(safe_shelters.__geo_interface__, name="安全な避難所",
               marker=folium.CircleMarker(radius=4)).add_to(m)
folium.LayerControl().add_to(m)

# 保存(任意)
m.save("outputs/evac.html")
print("✓ 完了: GPKG追記 & outputs/evac.html を作成")

の結果

---------------------------------------------------------------------------
CPLE_AppDefinedError                      Traceback (most recent call last)
File pyogrio/_io.pyx:2344, in pyogrio._io.create_ogr_dataset_layer()

File pyogrio/_err.pyx:218, in pyogrio._err.check_pointer()

CPLE_AppDefinedError: Layer roads_all already exists, CreateLayer failed. Use the layer creation option OVERWRITE=YES to replace it.

During handling of the above exception, another exception occurred:

DataLayerError                            Traceback (most recent call last)
Cell In[2], line 129
    127 shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
    128 safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
--> 129 edges.to_file(GPKG, layer="roads_all", driver="GPKG")
    130 edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")
    132 # 10) Folium地図

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/geodataframe.py:1637, in GeoDataFrame.to_file(self, filename, driver, schema, index, **kwargs)


   1543 """Write the ``GeoDataFrame`` to a file.
   1544 
   1545 By default, an ESRI shapefile is written, but any OGR data source
   (...)   1633 
   1634 """
   1635 from geopandas.io.file import _to_file
-> 1637 _to_file(self, filename, driver, schema, index, **kwargs)

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/io/file.py:731, in _to_file(df, filename, driver, schema, index, mode, crs, engine, metadata, **kwargs)
    728     raise ValueError(f"'mode' should be one of 'w' or 'a', got '{mode}' instead")
    730 if engine == "pyogrio":
--> 731     _to_file_pyogrio(df, filename, driver, schema, crs, mode, metadata, **kwargs)
    732 elif engine == "fiona":
    733     _to_file_fiona(df, filename, driver, schema, crs, mode, metadata, **kwargs)

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/io/file.py:793, in _to_file_pyogrio(df, filename, driver, schema, crs, mode, metadata, **kwargs)
    790 if not df.columns.is_unique:
    791     raise ValueError("GeoDataFrame cannot contain duplicated column names.")
--> 793 pyogrio.write_dataframe(df, filename, driver=driver, metadata=metadata, **kwargs)

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pyogrio/geopandas.py:710, in write_dataframe(df, path, layer, driver, encoding, geometry_type, promote_to_multi, nan_as_null, append, use_arrow, dataset_metadata, layer_metadata, metadata, dataset_options, layer_options, **kwargs)
    707         field_data.append(values)
    708         field_mask.append(None)
--> 710 write(
    711     path,
    712     layer=layer,
    713     driver=driver,
    714     geometry=geometry,
    715     field_data=field_data,
    716     field_mask=field_mask,
    717     fields=fields,
    718     crs=crs,
    719     geometry_type=geometry_type,
    720     encoding=encoding,
    721     promote_to_multi=promote_to_multi,
    722     nan_as_null=nan_as_null,
    723     append=append,
    724     dataset_metadata=dataset_metadata,
    725     layer_metadata=layer_metadata,
    726     metadata=metadata,
    727     dataset_options=dataset_options,
    728     layer_options=layer_options,
    729     gdal_tz_offsets=gdal_tz_offsets,
    730     **kwargs,
    731 )

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pyogrio/raw.py:723, in write(path, geometry, field_data, fields, field_mask, layer, driver, geometry_type, crs, encoding, promote_to_multi, nan_as_null, append, dataset_metadata, layer_metadata, metadata, dataset_options, layer_options, gdal_tz_offsets, **kwargs)
    718 # preprocess kwargs and split in dataset and layer creation options
    719 dataset_kwargs, layer_kwargs = _preprocess_options_kwargs(
    720     driver, dataset_options, layer_options, kwargs
    721 )
--> 723 ogr_write(
    724     path,
    725     layer=layer,
    726     driver=driver,
    727     geometry=geometry,
    728     geometry_type=geometry_type,
    729     field_data=field_data,
    730     field_mask=field_mask,
    731     fields=fields,
    732     crs=crs,
    733     encoding=encoding,
    734     promote_to_multi=promote_to_multi,
    735     nan_as_null=nan_as_null,
    736     append=append,
    737     dataset_metadata=dataset_metadata,
    738     layer_metadata=layer_metadata,
    739     dataset_kwargs=dataset_kwargs,
    740     layer_kwargs=layer_kwargs,
    741     gdal_tz_offsets=gdal_tz_offsets,
    742 )

File pyogrio/_io.pyx:2463, in pyogrio._io.ogr_write()

File pyogrio/_io.pyx:2360, in pyogrio._io.create_ogr_dataset_layer()

DataLayerError: Layer roads_all already exists, CreateLayer failed. Use the layer creation option OVERWRITE=YES to replace it.

となった

原因は
エラーの原因は、そのレイヤー名(roads_all)が GPKG 内にすでに存在しているのに、上書きオプションを付けずに to_file() しているためです。対処は「上書き」するか「追記」するかの2択です。再実行に強いのは上書きです。

いくつか方法を提示されたが、コンセプトに基づき改造しないと後で全部修正になる

使用する用途のコンセプト また震源地から津波が到達するまでの時間と津波の高さを計算し GIS情報を使い避難するエリアを算出しルート案内できるようにしたい 家族の避難経路・地域防災マップをGraphDBで整理 避難所や重要連絡先も紐付け 使い方例: 「地震が来たら最寄りの避難所はどこ?」 「非常食の賞味期限が近いものは?」 など この用途で使うなら 今回の場合は (上書き) 既存レイヤーを削除してから書く 同じレイヤーにレコードを追加 どれを推奨するか?

結論から:
* 今回の用途(津波到達時間・高さを計算 → 安全エリア算出 → その時点で最良の避難経路を提示)では、
“計算で毎回作り直す派生レイヤー”は原則《上書き》を推奨です。
* 履歴を残したいテーブル(実行ログ・過去の経路・賞味期限チェック結果など)は《追記》を併用します。
* 《既存レイヤー削除→書き込み》は、スキーマを頻繁に変えるときやエンジン差の不具合回避用のセーフティとして使う位置づけ。

なぜ《上書き》が最適か
あなたのワークフローは「同じ入力(ベースの津波想定・OSM・避難所)→同じ処理→決定論的な派生成果物(roads_safe, shelters_safe, route_latest など)」という再現可能・冪等パイプラインです。
このタイプは毎回クリーンに作り直せることが最重要なので、レイヤー名固定+上書きが最もシンプルで壊れにくいです。
* 冪等:再実行で結果が混ざらない
* 運用:下流(Folium表示/GraphDB同期)が常に同じレイヤー名を参照
* 事故防止:前回分の残骸や列増減の不一致で壊れない

まとめ(今回どれを勧める?)
* メインは《上書き》一択(派生レイヤーは常に最新をクリーンに再生成)
* 履歴が必要なものだけ《追記》を別レイヤーで併用
* スキーマ変更直後や不具合回避には《削除→書き込み》をスポット的に
この方針にすると、
「地震が来たら最寄りの避難所は?」→ shelters_safe+route_latest を即参照、
「非常食の賞味期限が近いものは?」→ food_expiry_alerts(追記ログ)や在庫DB を参照、
という形で運用がシンプル&堅牢になります。

# 9) GPKGへ追記
# #    既にある場合は上書きしたいなら driverオプションを切り替える(ここは追記)
# shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
# safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
# edges.to_file(GPKG, layer="roads_all", driver="GPKG")
# edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")

# 9) GPKGへ書き出し(毎回クリーンに上書き)
common_kwargs = dict(
    driver="GPKG",
    engine="pyogrio",
    layer_options={"OVERWRITE": "YES"},  # 既存レイヤーを置き換える
)

shelters.to_file(GPKG, layer="shelters_all",   **common_kwargs)
safe_shelters.to_file(GPKG, layer="shelters_safe", **common_kwargs)
edges.to_file(GPKG,    layer="roads_all",     **common_kwargs)
edges_keep.to_file(GPKG, layer="roads_safe",  **common_kwargs)

というようにコードを変更

これでエラーは解決

しかし

# # === 検証→ETA→保存(検証に通ったルートだけ保存・表示) ===
# from shapely.geometry import LineString
# import geopandas as gpd

# # 1) ルートのLineString(まだ属性は付けない)
# route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
# route_geom = LineString(route_lonlat)
# route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

# # 2) 堅牢チェック(交差ゼロ&境界近接なし)
# #   union_all が使えない環境向けにフォールバックも用意
# try:
#     flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs="EPSG:4326")
# except Exception:
#     flood_union = gpd.GeoDataFrame(geometry=[tsunami.unary_union], crs="EPSG:4326")

# METRIC = "EPSG:32654"  # 静岡周辺(必要なら地域に合わせて変更)
# route_m = route_gdf.to_crs(METRIC)
# flood_m = flood_union.to_crs(METRIC)

# # 交差長[m]
# inter = gpd.overlay(route_m, flood_m, how="intersection")
# inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

# # 近接(境界 ±2m 以内をNGにする例)
# # near = not gpd.overlay(route_m.buffer(2.0), flood_m, how="intersection").empty
# buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
# near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty


# if inside_len_m > 0 or near:
#     raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
#                        "別避難所 or 半径拡大で再探索してください。")

# # 3) 距離[m]→ETA(バージョン互換)
# try:
#     route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
#     meters = float(route_edges["length"].sum())
# except Exception:
#     meters = 0.0
#     for u, v in zip(route[:-1], route[1:]):
#         ed = G2.get_edge_data(u, v)
#         meters += min(d.get("length", 0.0) for d in ed.values())

# walk_kmh = 4.8
# drive_kmh = 20.0
# eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
# eta_drive_min = meters / (drive_kmh * 1000 / 60)
# eta_walk_safe = eta_walk_min * 1.3  # 安全係数(例:+30%)

# # 4) GPKG に“合格ルート”だけ属性付きで保存
# out_gpkg = "outputs/shizuoka_hazard.gpkg"
# route_gdf = gpd.GeoDataFrame(
#     {
#         "meters":[meters],
#         "eta_min_walk":[eta_walk_min],
#         "eta_min_walk_safe":[eta_walk_safe],
#         "eta_min_drive":[eta_drive_min],
#     },
#     geometry=[route_geom], crs="EPSG:4326",
# )
# route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
# print("✅ 交差なし・保存完了")

# # 5) (任意)Foliumに表示するならここで
# route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
# folium.PolyLine(
#     route_latlon, weight=6, opacity=0.9,
#     tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
# ).add_to(m)

# === ルート検証 → ETA → 保存(キャッシュ対応) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx

GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654"  # 静岡周辺

# 0) 津波 union はキャッシュを優先(なければ作る)
if "tsunami_union" in listlayers(GPKG):
    flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    flood_union.to_file(GPKG, layer="tsunami_union", driver="GPKG")

# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'

# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
    payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
                radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)

# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
    cache = gpd.read_file(GPKG, layer="route_cache")
    hit = cache.loc[cache["rid"] == key]
    if len(hit):
        route_gdf = hit.set_crs(4326)
        meters = float(route_gdf["meters"].iloc[0])
        eta_walk_min  = float(route_gdf["eta_min_walk"].iloc[0])
        eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
        eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
        coords = list(route_gdf.geometry.iloc[0].coords)
        route_latlon = [(lat, lon) for lon, lat in coords]  # folium用 (lat,lon)
        use_cached_route = True
        print("↩︎ ルート: キャッシュヒット")

if not use_cached_route:
    # 3) これまで通りの経路('route')から LineString を作成
    route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
    route_geom = LineString(route_lonlat)
    route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

    # 4) 堅牢チェック(交差ゼロ&境界近接なし)
    route_m = route_gdf.to_crs(METRIC)
    flood_m = flood_union.to_crs(METRIC)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty

    if inside_len_m > 0 or near:
        raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                           "別避難所 or 半径拡大で再探索してください。")

    # 5) 距離[m]→ETA
    try:
        route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u, v in zip(route[:-1], route[1:]):
            ed = G2.get_edge_data(u, v)
            meters += min(d.get("length", 0.0) for d in ed.values())

    walk_kmh = 4.8
    drive_kmh = 20.0
    eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
    eta_drive_min = meters / (drive_kmh * 1000 / 60)
    eta_walk_safe = eta_walk_min * 1.3

    # 6) route_latest へ保存(検証済みのみ)
    out_gpkg = GPKG
    route_gdf = gpd.GeoDataFrame(
        {
            "meters":[meters],
            "eta_min_walk":[eta_walk_min],
            "eta_min_walk_safe":[eta_walk_safe],
            "eta_min_drive":[eta_drive_min],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326",
    )
    route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
    print("✅ 交差なし・保存完了")

    # 7) ルートキャッシュに追記
    route_cache_rec = route_gdf.copy()
    route_cache_rec["rid"] = key
    try:
        route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG", if_exists="append")
    except TypeError:
        # 古いGeoPandasのためのフォールバック
        if "route_cache" in listlayers(out_gpkg):
            old = gpd.read_file(out_gpkg, layer="route_cache")
            gpd.GeoDataFrame(pd.concat([old, route_cache_rec], ignore_index=True), crs=4326)\
               .to_file(out_gpkg, layer="route_cache", driver="GPKG")
        else:
            route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG")

    # 8) folium用座標
    route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]

# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[8], line 175
    173 route_cache_rec["rid"] = key
    174 try:
--> 175     route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG", if_exists="append")
    176 except TypeError:
    177     # 古いGeoPandasのためのフォールバック
    178     if "route_cache" in listlayers(out_gpkg):

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/geodataframe.py:1637, in GeoDataFrame.to_file(self, filename, driver, schema, index, **kwargs)
   1543 """Write the ``GeoDataFrame`` to a file.
   1544 
   1545 By default, an ESRI shapefile is written, but any OGR data source
   (...)   1633 
   1634 """
   1635 from geopandas.io.file import _to_file
-> 1637 _to_file(self, filename, driver, schema, index, **kwargs)

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/io/file.py:731, in _to_file(df, filename, driver, schema, index, mode, crs, engine, metadata, **kwargs)
    728     raise ValueError(f"'mode' should be one of 'w' or 'a', got '{mode}' instead")
    730 if engine == "pyogrio":
--> 731     _to_file_pyogrio(df, filename, driver, schema, crs, mode, metadata, **kwargs)
    732 elif engine == "fiona":
    733     _to_file_fiona(df, filename, driver, schema, crs, mode, metadata, **kwargs)

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/geopandas/io/file.py:793, in _to_file_pyogrio(df, filename, driver, schema, crs, mode, metadata, **kwargs)
    790 if not df.columns.is_unique:
    791     raise ValueError("GeoDataFrame cannot contain duplicated column names.")
--> 793 pyogrio.write_dataframe(df, filename, driver=driver, metadata=metadata, **kwargs)

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pyogrio/geopandas.py:710, in write_dataframe(df, path, layer, driver, encoding, geometry_type, promote_to_multi, nan_as_null, append, use_arrow, dataset_metadata, layer_metadata, metadata, dataset_options, layer_options, **kwargs)
    707         field_data.append(values)
    708         field_mask.append(None)
--> 710 write(
    711     path,
    712     layer=layer,
    713     driver=driver,
    714     geometry=geometry,
    715     field_data=field_data,
    716     field_mask=field_mask,
    717     fields=fields,
    718     crs=crs,
    719     geometry_type=geometry_type,
    720     encoding=encoding,
    721     promote_to_multi=promote_to_multi,
    722     nan_as_null=nan_as_null,
    723     append=append,
    724     dataset_metadata=dataset_metadata,
    725     layer_metadata=layer_metadata,
    726     metadata=metadata,
    727     dataset_options=dataset_options,
    728     layer_options=layer_options,
    729     gdal_tz_offsets=gdal_tz_offsets,
    730     **kwargs,
    731 )

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pyogrio/raw.py:719, in write(path, geometry, field_data, fields, field_mask, layer, driver, geometry_type, crs, encoding, promote_to_multi, nan_as_null, append, dataset_metadata, layer_metadata, metadata, dataset_options, layer_options, gdal_tz_offsets, **kwargs)
    711     warnings.warn(
    712         "'crs' was not provided.  The output dataset will not have "
    713         "projection information defined and may not be usable in other "
    714         "systems.",
    715         stacklevel=2,
    716     )
    718 # preprocess kwargs and split in dataset and layer creation options
--> 719 dataset_kwargs, layer_kwargs = _preprocess_options_kwargs(
    720     driver, dataset_options, layer_options, kwargs
    721 )
    723 ogr_write(
    724     path,
    725     layer=layer,
   (...)    741     gdal_tz_offsets=gdal_tz_offsets,
    742 )

File ~/.pyenv/versions/3.11.0/lib/python3.11/site-packages/pyogrio/raw.py:514, in _preprocess_options_kwargs(driver, dataset_options, layer_options, kwargs)
    512             layer_kwargs[k] = v
    513         else:
--> 514             raise ValueError(f"unrecognized option '{k}' for driver '{driver}'")
    516 return dataset_kwargs, layer_kwargs

ValueError: unrecognized option 'IF_EXISTS' for driver 'GPKG'

原因は
if_exists=”append” は pyogrio では非対応 なのでエラーになっています。
to_file() は engine=”pyogrio” + append=True を使ってください。ついでに route_latest など「毎回作り直す」レイヤーは OVERWRITE で上書きするのが安全です

要点
* pyogrio では if_exists=”append” は使えません → append=True を使う。
* 毎回再計算するレイヤーは layer_options={“OVERWRITE”:”YES”} で確実に上書き。
* 追記でスキーマ差が出たら、読み出し→列合わせ→連結→上書き で堅牢化。

route_latest は毎回クリーンに上書き、
route_cache はappendで追記(pyogrio準拠)、
tsunami_union も必要に応じて上書き耐性で作成、
の3点が組み込んでコードを書き換える

# # === 検証→ETA→保存(検証に通ったルートだけ保存・表示) ===
# from shapely.geometry import LineString
# import geopandas as gpd

# # 1) ルートのLineString(まだ属性は付けない)
# route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
# route_geom = LineString(route_lonlat)
# route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

# # 2) 堅牢チェック(交差ゼロ&境界近接なし)
# #   union_all が使えない環境向けにフォールバックも用意
# try:
#     flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs="EPSG:4326")
# except Exception:
#     flood_union = gpd.GeoDataFrame(geometry=[tsunami.unary_union], crs="EPSG:4326")

# METRIC = "EPSG:32654"  # 静岡周辺(必要なら地域に合わせて変更)
# route_m = route_gdf.to_crs(METRIC)
# flood_m = flood_union.to_crs(METRIC)

# # 交差長[m]
# inter = gpd.overlay(route_m, flood_m, how="intersection")
# inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

# # 近接(境界 ±2m 以内をNGにする例)
# # near = not gpd.overlay(route_m.buffer(2.0), flood_m, how="intersection").empty
# buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
# near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty


# if inside_len_m > 0 or near:
#     raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
#                        "別避難所 or 半径拡大で再探索してください。")

# # 3) 距離[m]→ETA(バージョン互換)
# try:
#     route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
#     meters = float(route_edges["length"].sum())
# except Exception:
#     meters = 0.0
#     for u, v in zip(route[:-1], route[1:]):
#         ed = G2.get_edge_data(u, v)
#         meters += min(d.get("length", 0.0) for d in ed.values())

# walk_kmh = 4.8
# drive_kmh = 20.0
# eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
# eta_drive_min = meters / (drive_kmh * 1000 / 60)
# eta_walk_safe = eta_walk_min * 1.3  # 安全係数(例:+30%)

# # 4) GPKG に“合格ルート”だけ属性付きで保存
# out_gpkg = "outputs/shizuoka_hazard.gpkg"
# route_gdf = gpd.GeoDataFrame(
#     {
#         "meters":[meters],
#         "eta_min_walk":[eta_walk_min],
#         "eta_min_walk_safe":[eta_walk_safe],
#         "eta_min_drive":[eta_drive_min],
#     },
#     geometry=[route_geom], crs="EPSG:4326",
# )
# route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
# print("✅ 交差なし・保存完了")

# # 5) (任意)Foliumに表示するならここで
# route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
# folium.PolyLine(
#     route_latlon, weight=6, opacity=0.9,
#     tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
# ).add_to(m)

# === ルート検証 → ETA → 保存(キャッシュ対応) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx

GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654"  # 静岡周辺

# 0) 津波 union はキャッシュを優先(なければ作る)
if "tsunami_union" in listlayers(GPKG):
    flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    flood_union.to_file(GPKG, layer="tsunami_union", driver="GPKG")

# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'

# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
    payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
                radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)

# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
    cache = gpd.read_file(GPKG, layer="route_cache")
    hit = cache.loc[cache["rid"] == key]
    if len(hit):
        route_gdf = hit.set_crs(4326)
        meters = float(route_gdf["meters"].iloc[0])
        eta_walk_min  = float(route_gdf["eta_min_walk"].iloc[0])
        eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
        eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
        coords = list(route_gdf.geometry.iloc[0].coords)
        route_latlon = [(lat, lon) for lon, lat in coords]  # folium用 (lat,lon)
        use_cached_route = True
        print("↩︎ ルート: キャッシュヒット")

if not use_cached_route:
    # 3) これまで通りの経路('route')から LineString を作成
    route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
    route_geom = LineString(route_lonlat)
    route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

    # 4) 堅牢チェック(交差ゼロ&境界近接なし)
    route_m = route_gdf.to_crs(METRIC)
    flood_m = flood_union.to_crs(METRIC)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty

    if inside_len_m > 0 or near:
        raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                           "別避難所 or 半径拡大で再探索してください。")

    # 5) 距離[m]→ETA
    try:
        route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u, v in zip(route[:-1], route[1:]):
            ed = G2.get_edge_data(u, v)
            meters += min(d.get("length", 0.0) for d in ed.values())

    walk_kmh = 4.8
    drive_kmh = 20.0
    eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
    eta_drive_min = meters / (drive_kmh * 1000 / 60)
    eta_walk_safe = eta_walk_min * 1.3

    # 6) route_latest へ保存(検証済みのみ)
    out_gpkg = GPKG
    route_gdf = gpd.GeoDataFrame(
        {
            "meters":[meters],
            "eta_min_walk":[eta_walk_min],
            "eta_min_walk_safe":[eta_walk_safe],
            "eta_min_drive":[eta_drive_min],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326",
    )
    route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
    print("✅ 交差なし・保存完了")

    # 7) ルートキャッシュに追記
    route_cache_rec = route_gdf.copy()
    route_cache_rec["rid"] = key
    try:
        route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG", if_exists="append")
    except TypeError:
        # 古いGeoPandasのためのフォールバック
        if "route_cache" in listlayers(out_gpkg):
            old = gpd.read_file(out_gpkg, layer="route_cache")
            gpd.GeoDataFrame(pd.concat([old, route_cache_rec], ignore_index=True), crs=4326)\
               .to_file(out_gpkg, layer="route_cache", driver="GPKG")
        else:
            route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG")

    # 8) folium用座標
    route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]

# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

を
# === ルート検証 → ETA → 保存(キャッシュ対応, 上書き&追記の方針を反映) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import pandas as pd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx

GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654"  # 静岡周辺

# 0) 津波 union はキャッシュを優先(なければ作る)→ 上書き耐性ありで作成可
if "tsunami_union" in listlayers(GPKG):
    flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    flood_union.to_file(
        GPKG, layer="tsunami_union",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}  # (3) 上書き耐性
    )

# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'

# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
    payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
                radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)

# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
    cache = gpd.read_file(GPKG, layer="route_cache")
    hit = cache.loc[cache["rid"] == key]
    if len(hit):
        route_gdf = hit.set_crs(4326)
        meters = float(route_gdf["meters"].iloc[0])
        eta_walk_min  = float(route_gdf["eta_min_walk"].iloc[0])
        eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
        eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
        coords = list(route_gdf.geometry.iloc[0].coords)
        route_latlon = [(lat, lon) for lon, lat in coords]  # folium用 (lat,lon)
        use_cached_route = True
        print("↩︎ ルート: キャッシュヒット")

if not use_cached_route:
    # 3) これまで通りの経路('route')から LineString を作成
    route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
    route_geom = LineString(route_lonlat)
    route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

    # 4) 堅牢チェック(交差ゼロ&境界近接なし)
    route_m = route_gdf.to_crs(METRIC)
    flood_m = flood_union.to_crs(METRIC)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty

    if inside_len_m > 0 or near:
        raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                           "別避難所 or 半径拡大で再探索してください。")

    # 5) 距離[m]→ETA
    try:
        route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u, v in zip(route[:-1], route[1:]):
            ed = G2.get_edge_data(u, v)
            meters += min(d.get("length", 0.0) for d in ed.values())

    walk_kmh = 4.8
    drive_kmh = 20.0
    eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
    eta_drive_min = meters / (drive_kmh * 1000 / 60)
    eta_walk_safe = eta_walk_min * 1.3

    # 6) route_latest へ保存(検証済みのみ)→ 上書き
    out_gpkg = GPKG
    route_gdf = gpd.GeoDataFrame(
        {
            "meters":[meters],
            "eta_min_walk":[eta_walk_min],
            "eta_min_walk_safe":[eta_walk_safe],
            "eta_min_drive":[eta_drive_min],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326",
    )
    route_gdf.to_file(
        out_gpkg, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}  # (1) 上書き
    )
    print("✅ 交差なし・保存完了")

    # 7) ルートキャッシュに追記(pyogrio 流儀)
    route_cache_rec = route_gdf.copy()
    route_cache_rec["rid"] = key
    try:
        route_cache_rec.to_file(
            out_gpkg, layer="route_cache",
            driver="GPKG", engine="pyogrio",
            append=True                     # (2) 追記
        )
    except Exception:
        # フォールバック:スキーマ差や古い環境でも確実に追記する
        if "route_cache" in listlayers(out_gpkg):
            old = gpd.read_file(out_gpkg, layer="route_cache")
            cols = sorted(set(old.columns) | set(route_cache_rec.columns))
            old = old.reindex(columns=cols)
            new = route_cache_rec.reindex(columns=cols)
            merged = gpd.GeoDataFrame(pd.concat([old, new], ignore_index=True), crs=4326)
            merged.to_file(
                out_gpkg, layer="route_cache",
                driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"}  # マージして上書き
            )
        else:
            route_cache_rec.to_file(
                out_gpkg, layer="route_cache",
                driver="GPKG", engine="pyogrio"
            )

    # 8) folium用座標
    route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]

# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

へ変更

これで動くようになった

!ogrinfo -so outputs/shizuoka_hazard.gpkg route_latest

で動作も問題なし

import geopandas as gpd
gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]

も問題なし

再度 notebook での確認

再度 notebook での確認

import sys, os
sys.path.append(os.path.abspath("."))  # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString
import osmnx as ox
import networkx as nx
import folium
import os

GPKG = "outputs/shizuoka_hazard.gpkg"

# 1) 既存レイヤ読み込み(両方とも 4326)
coast   = gpd.read_file(GPKG, layer="coastline")
tsunami = gpd.read_file(GPKG, layer="tsunami_inundation")

# ----- パラメータ -----
# 自宅座標(lat, lon)←必ず置き換え
home_wgs = (34.728, 137.978)
# OSM道路の取得半径[m](到達不能なら大きく:15000, 20000…)
GRAPH_RADIUS_M = 12000
# 避難所CSV(cp932想定)
SHELTER_CSV = "data/20250801_指定緊急避難場所一覧.CSV"
# ----------------------

# 2) 避難所CSV→点化(列名は自動検出)
df = pd.read_csv(SHELTER_CSV, encoding="cp932")
lat_candidates = 
lon_candidates = 
if not lat_candidates or not lon_candidates:
    raise ValueError("CSVに緯度/経度列が見つかりません。列名を教えてください。")
lat_col, lon_col = lat_candidates[0], lon_candidates[0]
df[lat_col] = pd.to_numeric(df[lat_col], errors="coerce")
df[lon_col] = pd.to_numeric(df[lon_col], errors="coerce")
df = df.dropna(subset=[lat_col, lon_col])
shelters = gpd.GeoDataFrame(df.copy(),
    geometry=gpd.points_from_xy(df[lon_col], df[lat_col], crs="EPSG:4326")
)

# 3) 安全な避難所 = 浸水域と非交差
#   (unionはメモリ負荷になるので、空間結合で判定)
safe_join = gpd.sjoin(shelters, tsunami[["geometry"]], how="left", predicate="intersects")
shelters["safe"] = safe_join["index_right"].isna()
safe_shelters = shelters[shelters["safe"]].copy()

# --- 4) OSM道路取得(徒歩網) ---
G = ox.graph_from_point(home_wgs, dist=GRAPH_RADIUS_M, network_type="walk", simplify=True, retain_all=False)
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)

# u,v,key を“列”にする(重要)
edges = edges.reset_index()

# CRSを4326に
if edges.crs is None:
    edges = edges.set_crs(4326)
elif edges.crs.to_epsg() != 4326:
    edges = edges.to_crs(4326)
# --- 5) 浸水域と交差する道路を除去(sjoin→集計で一意化) ---
left = edges[["u","v","key","geometry"]].copy()   # edges は reset_index() 済み想定
right = tsunami[["geometry"]]

tag = gpd.sjoin(left, right, how="left", predicate="intersects")

blocked = (
    tag.assign(hit=tag["index_right"].notna())
       .groupby(["u","v","key"], as_index=False)["hit"].max()
       .rename(columns={"hit":"blocked"})
)

edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
edges2["blocked"] = edges2["blocked"].fillna(False)

edges_keep = edges2.loc[~edges2["blocked"]].copy()

# ★ ここが重要:graph_from_gdfs に渡す前に MultiIndex に戻す
#   (key が float になっていたら int にしておくと安心)
if edges_keep["key"].isna().any():
    edges_keep["key"] = edges_keep["key"].fillna(0)

edges_keep["key"] = edges_keep["key"].astype(int)
edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()

# --- 6) サブグラフ再構築(通行可のみ) ---
used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()

# nodes_keep の geometry 列が欠けていたら補完
if "geometry" not in nodes_keep or nodes_keep["geometry"].isna().any():
    nodes_keep = nodes_keep.set_geometry(
        gpd.points_from_xy(nodes_keep["x"], nodes_keep["y"]), crs=edges_keep.crs
    )

# OSMnx のバージョン差に対応
try:
    G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))
except AttributeError:
    from osmnx import utils_graph as ug
    G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))


# 7) 出発・目的地ノード(最寄り“安全”避難所)
home_pt = Point(home_wgs[1], home_wgs[0])
# 最寄り安全避難所(直線距離)…なければ全体から選ぶ
TARGET = safe_shelters if len(safe_shelters) else shelters
METRIC = "EPSG:32654"# 静岡周辺 UTM
hp = gpd.GeoSeries([home_pt], crs=4326).to_crs(METRIC).iloc[0]
tp = TARGET.to_crs(METRIC).copy()
tp["__dist_m__"] = tp.geometry.distance(hp)
dest_pt = TARGET.loc[tp["__dist_m__"].idxmin(), "geometry"]

# ▶ 互換呼び分け
try:
    orig = ox.distance.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.distance.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)
except AttributeError:
    # 新API名(環境によってはこちらが有効)
    orig = ox.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
    dest = ox.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)


# 8) 最短経路(通行可能グラフ)
try:
    route = nx.shortest_path(G2, orig, dest, weight="length")
except nx.NetworkXNoPath:
    raise RuntimeError("到達経路なし。GRAPH_RADIUS_Mを増やすか、別の避難所で再試行を。")

# 9) GPKGへ追記
#    既にある場合は上書きしたいなら driverオプションを切り替える(ここは追記)
shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
edges.to_file(GPKG, layer="roads_all", driver="GPKG")
edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")

# 10) Folium地図
m = folium.Map(location=[home_wgs[0], home_wgs[1]], zoom_start=12, control_scale=True)
folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定(A40)").add_to(m)
folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)

# 経路を線で描画
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip="避難経路").add_to(m)

# 出発・目的地
folium.Marker([home_wgs[0], home_wgs[1]], popup="現在地", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([dest_pt.y, dest_pt.x], popup="避難所", icon=folium.Icon(color="blue")).add_to(m)

# 避難所レイヤ(安全のみ強調)
folium.GeoJson(safe_shelters.__geo_interface__, name="安全な避難所",
               marker=folium.CircleMarker(radius=4)).add_to(m)
folium.LayerControl().add_to(m)

# 保存(任意)
m.save("outputs/evac.html")
print("✓ 完了: GPKG追記 & outputs/evac.html を作成")

print("edges_keep index levels:", getattr(edges_keep.index, "nlevels", 1))  # → 3 が正解
print(edges_keep.index.names)  # → ['u','v','key'] が正解

# # ルートの延長[m]は G2 の edge 属性 'length' を合計(OSMnxが付けてくれてます)
# meters = sum(ox.utils_graph.get_route_edge_attributes(G2, route, 'length'))
# walk_kmh = 4.8   # 歩行速度(必要に応じて変更)
# drive_kmh = 20.0 # 災害時の車想定の控えめ値(使うなら)

# eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
# eta_drive_min = meters / (drive_kmh * 1000 / 60)
# print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")

# ▼ 距離[m]の取り方をバージョン互換で
try:
    # 新しめのOSMnx(推奨): ルートをGDF化して length を合計
    route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
    meters = float(route_edges["length"].sum())
except Exception:
    # フォールバック: NetworkXからエッジ属性 length を合計
    meters = 0.0
    for u, v in zip(route[:-1], route[1:]):
        ed = G2.get_edge_data(u, v)  # MultiDiGraph: {key: attr_dict}
        if isinstance(ed, dict) and len(ed):
            meters += min(d.get("length", 0.0) for d in ed.values())

# ETA(必要なら速度を調整)
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")

# Folium のツールチップにも表示
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分"
).add_to(m)


from shapely.geometry import LineString
import geopandas as gpd

route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]
route_gdf = gpd.GeoDataFrame(
    {"meters":[meters], "eta_min_walk":[eta_walk_min], "eta_min_drive":[eta_drive_min]},
    geometry=[LineString(route_lonlat)], crs="EPSG:4326"
)
route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")

import geopandas as gpd
# ok = not route_gdf.intersects(tsunami.unary_union).iloc[0]
# print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")

# 旧: tsunami.unary_union  ← 警告が出る
# 新:
flood_union = tsunami.geometry.union_all()
ok = not route_gdf.intersects(flood_union).iloc[0]
print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")

# === 検証→ETA→保存(検証に通ったルートだけ保存・表示) ===
from shapely.geometry import LineString
import geopandas as gpd

# 1) ルートのLineString(まだ属性は付けない)
route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
route_geom = LineString(route_lonlat)
route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

# 2) 堅牢チェック(交差ゼロ&境界近接なし)
#   union_all が使えない環境向けにフォールバックも用意
try:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs="EPSG:4326")
except Exception:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.unary_union], crs="EPSG:4326")

METRIC = "EPSG:32654"  # 静岡周辺(必要なら地域に合わせて変更)
route_m = route_gdf.to_crs(METRIC)
flood_m = flood_union.to_crs(METRIC)

# 交差長[m]
inter = gpd.overlay(route_m, flood_m, how="intersection")
inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

# 近接(境界 ±2m 以内をNGにする例)
# near = not gpd.overlay(route_m.buffer(2.0), flood_m, how="intersection").empty
buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty


if inside_len_m > 0 or near:
    raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                       "別避難所 or 半径拡大で再探索してください。")

# 3) 距離[m]→ETA(バージョン互換)
try:
    route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
    meters = float(route_edges["length"].sum())
except Exception:
    meters = 0.0
    for u, v in zip(route[:-1], route[1:]):
        ed = G2.get_edge_data(u, v)
        meters += min(d.get("length", 0.0) for d in ed.values())

walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
eta_walk_safe = eta_walk_min * 1.3  # 安全係数(例:+30%)

# 4) GPKG に“合格ルート”だけ属性付きで保存
out_gpkg = "outputs/shizuoka_hazard.gpkg"
route_gdf = gpd.GeoDataFrame(
    {
        "meters":[meters],
        "eta_min_walk":[eta_walk_min],
        "eta_min_walk_safe":[eta_walk_safe],
        "eta_min_drive":[eta_drive_min],
    },
    geometry=[route_geom], crs="EPSG:4326",
)
route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
print("✅ 交差なし・保存完了")

# 5) (任意)Foliumに表示するならここで
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

!ogrinfo -so outputs/shizuoka_hazard.gpkg route_latest

import geopandas as gpd
gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]

までは動く

どこからバグったかを調べたら
まとめ(差し込む位置の目安)
1. 目的地 dest_pt 決定直後に「キャッシュ照会ブロック」を挿入
2. use_cached_route が False なら従来の「計算→検証→保存」ブロックへ進む
3. 保存直後に route_cache へ追記
4. True なら保存はスキップし、地図表示へ直行(または再検証のみ実施)
これで「既知の組み合わせは即復帰/未知は新規計算」の運用ができます。

これはモジュールにして読み込んだ方が良い?
というあたりでバグ発生

つまりここから無理にモジュールしたのが原因で先にプロトタイプを作ればいい

ルート再利用性を高めたいので、ルート結果をキー付きでGPKGにキャッシュする機能を追加

# === 検証→ETA→保存(検証に通ったルートだけ保存・表示) ===
from shapely.geometry import LineString
import geopandas as gpd

# 1) ルートのLineString(まだ属性は付けない)
route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
route_geom = LineString(route_lonlat)
route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

# 2) 堅牢チェック(交差ゼロ&境界近接なし)
#   union_all が使えない環境向けにフォールバックも用意
try:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs="EPSG:4326")
except Exception:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.unary_union], crs="EPSG:4326")

METRIC = "EPSG:32654"  # 静岡周辺(必要なら地域に合わせて変更)
route_m = route_gdf.to_crs(METRIC)
flood_m = flood_union.to_crs(METRIC)

# 交差長[m]
inter = gpd.overlay(route_m, flood_m, how="intersection")
inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

# 近接(境界 ±2m 以内をNGにする例)
# near = not gpd.overlay(route_m.buffer(2.0), flood_m, how="intersection").empty
buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty


if inside_len_m > 0 or near:
    raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                       "別避難所 or 半径拡大で再探索してください。")

# 3) 距離[m]→ETA(バージョン互換)
try:
    route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
    meters = float(route_edges["length"].sum())
except Exception:
    meters = 0.0
    for u, v in zip(route[:-1], route[1:]):
        ed = G2.get_edge_data(u, v)
        meters += min(d.get("length", 0.0) for d in ed.values())

walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
eta_walk_safe = eta_walk_min * 1.3  # 安全係数(例:+30%)

# 4) GPKG に“合格ルート”だけ属性付きで保存
out_gpkg = "outputs/shizuoka_hazard.gpkg"
route_gdf = gpd.GeoDataFrame(
    {
        "meters":[meters],
        "eta_min_walk":[eta_walk_min],
        "eta_min_walk_safe":[eta_walk_safe],
        "eta_min_drive":[eta_drive_min],
    },
    geometry=[route_geom], crs="EPSG:4326",
)
route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
print("✅ 交差なし・保存完了")

# 5) (任意)Foliumに表示するならここで
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

のコードを
# === ルート検証 → ETA → 保存(キャッシュ対応) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx

GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654"  # 静岡周辺

# 0) 津波 union はキャッシュを優先(なければ作る)
if "tsunami_union" in listlayers(GPKG):
    flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
    flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    flood_union.to_file(GPKG, layer="tsunami_union", driver="GPKG")

# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'

# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
    payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
                radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)

# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
    cache = gpd.read_file(GPKG, layer="route_cache")
    hit = cache.loc[cache["rid"] == key]
    if len(hit):
        route_gdf = hit.set_crs(4326)
        meters = float(route_gdf["meters"].iloc[0])
        eta_walk_min  = float(route_gdf["eta_min_walk"].iloc[0])
        eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
        eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
        coords = list(route_gdf.geometry.iloc[0].coords)
        route_latlon = [(lat, lon) for lon, lat in coords]  # folium用 (lat,lon)
        use_cached_route = True
        print("↩︎ ルート: キャッシュヒット")

if not use_cached_route:
    # 3) これまで通りの経路('route')から LineString を作成
    route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]  # (lon,lat)
    route_geom = LineString(route_lonlat)
    route_gdf  = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")

    # 4) 堅牢チェック(交差ゼロ&境界近接なし)
    route_m = route_gdf.to_crs(METRIC)
    flood_m = flood_union.to_crs(METRIC)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty

    if inside_len_m > 0 or near:
        raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
                           "別避難所 or 半径拡大で再探索してください。")

    # 5) 距離[m]→ETA
    try:
        route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u, v in zip(route[:-1], route[1:]):
            ed = G2.get_edge_data(u, v)
            meters += min(d.get("length", 0.0) for d in ed.values())

    walk_kmh = 4.8
    drive_kmh = 20.0
    eta_walk_min  = meters / (walk_kmh  * 1000 / 60)
    eta_drive_min = meters / (drive_kmh * 1000 / 60)
    eta_walk_safe = eta_walk_min * 1.3

    # 6) route_latest へ保存(検証済みのみ)
    out_gpkg = GPKG
    route_gdf = gpd.GeoDataFrame(
        {
            "meters":[meters],
            "eta_min_walk":[eta_walk_min],
            "eta_min_walk_safe":[eta_walk_safe],
            "eta_min_drive":[eta_drive_min],
            "created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        },
        geometry=[route_geom], crs="EPSG:4326",
    )
    route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
    print("✅ 交差なし・保存完了")

    # 7) ルートキャッシュに追記
    route_cache_rec = route_gdf.copy()
    route_cache_rec["rid"] = key
    try:
        route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG", if_exists="append")
    except TypeError:
        # 古いGeoPandasのためのフォールバック
        if "route_cache" in listlayers(out_gpkg):
            old = gpd.read_file(out_gpkg, layer="route_cache")
            gpd.GeoDataFrame(pd.concat([old, route_cache_rec], ignore_index=True), crs=4326)\
               .to_file(out_gpkg, layer="route_cache", driver="GPKG")
        else:
            route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG")

    # 8) folium用座標
    route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]

# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
    route_latlon, weight=6, opacity=0.9,
    tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)

に書き換える

津波避難関連のモジュール化(失敗)

津波避難関連のモジュール化

touch config.py

これで定数ひとまとめ

from dataclasses import dataclass

@dataclass(frozen=True)
class Cfg:
    GPKG = "outputs/shizuoka_hazard.gpkg"
    EPSG_WGS84 = 4326
    EPSG_METRIC = 32654           # 静岡周辺
    GRAPH_RADIUS_M = 12000
    WALK_KMH = 4.8
    DRIVE_KMH = 20.0
    SAFETY_FACTOR = 1.3
touch data_io.py

内容は

import geopandas as gpd
from fiona import listlayers
from .config import Cfg

def load_layers():
    coast   = gpd.read_file(Cfg.GPKG, layer="coastline")
    tsunami = gpd.read_file(Cfg.GPKG, layer="tsunami_inundation")
    return coast, tsunami

def get_tsunami_union(tsunami):
    if "tsunami_union" in listlayers(Cfg.GPKG):
        return gpd.read_file(Cfg.GPKG, layer="tsunami_union")
    fu = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    fu.to_file(Cfg.GPKG, layer="tsunami_union", driver="GPKG")
    return fu

def write_route_latest(route_gdf):
    route_gdf.to_file(Cfg.GPKG, layer="route_latest", driver="GPKG")
touch hazard.py

これは(検証ロジック)

import geopandas as gpd
def validate_route(route_gdf, flood_union, epsg_metric, near_m=2.0):
    route_m = route_gdf.to_crs(epsg_metric)
    flood_m = flood_union.to_crs(epsg_metric)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(near_m), crs=epsg_metric)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty
    return inside_len_m, near
touch network.py

これはOSM取得&遮断

import osmnx as ox, geopandas as gpd, pandas as pd

def build_safe_graph(center_latlon, radius_m, tsunami):
    G = ox.graph_from_point(center_latlon, dist=radius_m, network_type="walk", simplify=True)
    nodes, edges = ox.graph_to_gdfs(G)
    edges = edges.reset_index()
    if edges.crs is None: edges = edges.set_crs(4326)
    elif edges.crs.to_epsg()!=4326: edges = edges.to_crs(4326)

    tag = gpd.sjoin(edges[["u","v","key","geometry"]], tsunami[["geometry"]],
                    how="left", predicate="intersects")
    blocked = (tag.assign(hit=tag["index_right"].notna())
                  .groupby(["u","v","key"], as_index=False)["hit"].max()
                  .rename(columns={"hit":"blocked"}))
    edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
    edges2["blocked"] = edges2["blocked"].fillna(False)
    edges_keep = edges2.loc[~edges2["blocked"]].copy()
    edges_keep["key"] = edges_keep["key"].fillna(0).astype(int)
    edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()
    used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
    nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()
    try:
        G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=G.graph)
    except AttributeError:
        from osmnx import utils_graph as ug
        G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=G.graph)
    return G2, nodes_keep, edges_keep
touch routing.py

これは経路・ETA

import osmnx as ox, networkx as nx, geopandas as gpd
from shapely.geometry import LineString
from .config import Cfg

def nearest_node(G, x, y):
    try: return ox.distance.nearest_nodes(G, X=x, Y=y)
    except AttributeError: return ox.nearest_nodes(G, X=x, Y=y)

def route_and_eta(G2, route_nodes):
    try:
        route_edges = ox.routing.route_to_gdf(G2, route_nodes, True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u,v in zip(route_nodes[:-1], route_nodes[1:]):
            ed = G2.get_edge_data(u,v); meters += min(d.get("length",0.0) for d in ed.values())
    walk = meters/(Cfg.WALK_KMH*1000/60)
    drive= meters/(Cfg.DRIVE_KMH*1000/60)
    return meters, walk, drive

def make_route_gdf(G2, route_nodes, meters, walk, drive):
    latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route_nodes]
    lonlat = [(x,y) for y,x in latlon]
    gdf = gpd.GeoDataFrame(
        {"meters":[meters],"eta_min_walk":[walk],
         "eta_min_walk_safe":[walk*Cfg.SAFETY_FACTOR],
         "eta_min_drive":[drive]},
        geometry=[LineString(lonlat)], crs=4326
    )
    return gdf, latlon
touch cache.py

これはキー生成&保存

import json, hashlib, geopandas as gpd
from fiona import listlayers
from .config import Cfg

def route_key(home_wgs, dest_wgs, mode="walk", radius_m=Cfg.GRAPH_RADIUS_M, hazard_meta=None):
    payload = {"home":home_wgs, "dest":dest_wgs, "mode":mode, "r":radius_m, "hz":hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

def load_cached_route(rid):
    if "route_cache" not in listlayers(Cfg.GPKG): return None
    gdf = gpd.read_file(Cfg.GPKG, layer="route_cache")
    hit = gdf.loc[gdf["rid"]==rid]
    return hit if len(hit) else None

def save_route_cache(route_gdf, rid):
    route_gdf = route_gdf.copy(); route_gdf["rid"] = rid
    try:
        route_gdf.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG", if_exists="append")
    except TypeError:
        # 旧版対応
        import pandas as pd
        try:
            old = gpd.read_file(Cfg.GPKG, layer="route_cache")
            gpd.GeoDataFrame(pd.concat([old, route_gdf], ignore_index=True), crs=4326)\
               .to_file(Cfg.GPKG, layer="route_cache", driver="GPKG")
        except Exception:
            route_gdf.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG")
 touch viz.py

これは表示するモジュール

import folium, geopandas as gpd

def add_layers_to_map(m, tsunami, edges_keep, route_latlon, tooltip):
    folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定").add_to(m)
    folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)
    folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip=tooltip).add_to(m)
    folium.LayerControl().add_to(m)

あとは
Notebook で

from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(G2, x=home_wgs[1], y=home_wgs[0])
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)

rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
    route_gdf = cached.set_crs(4326)
else:
    route_nodes = nx.shortest_path(G2, orig, dest, weight="length")
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m>0 or near: raise RuntimeError("ルート不適合")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)
/
を実行したが

---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[17], line 1
----> 1 from evac.config import Cfg
      2 from evac.data_io import load_layers, get_tsunami_union, write_route_latest
      3 from evac.network import build_safe_graph

ModuleNotFoundError: No module named 'evac'

これは
evac パッケージがPythonの検索パスに無いだけ

mkdir -p evac
touch evac/__init__.py

でファイルを作成

とりあえず中身は空でも良いらしい

あとは notebookで

import sys, os
sys.path.append(os.path.abspath("."))  # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る

を記述

しかしうまくいかないため
Main.pyにして実行してみる

mv *.py evac 
mv evac/main.py .
python main.py   

これで実行

今度は

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 16, in <module>
    dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
                              ^^^^^^^
NameError: name 'dest_pt' is not defined

となる

dest_pt(目的地の点ジオメトリ)を作っていないのが原因

修正案は2つ
A) 手早い版(直線距離で最寄りの安全避難所を選ぶ)
B) 推奨版(上位N候補から“ネットワーク距離最短”を選ぶ)

Bを実行する

import geopandas as gpd
import networkx as nx
from shapely.geometry import Point

を追記

もともとの

# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(...)
dest = nearest_node(... dest_pt ...)
rid = route_key(...)

は削除

代わりに pick_best_shelter_by_network で route_nodes と dest_pt を得ます。
pick_best_shelter_by_network をまだモジュール化していないなら、main.py に一時定義してから使ってOKです(前に渡した実装をそのまま貼り付け)。
もしくは evac/routing_extras.py を作って関数を置き、from evac.routing_extras import … で読み込む形にしてください。
キャッシュヒット時は route_gdf をそのまま使えるので、再計算・検証はスキップ(必要なら最新 tsunami_union で再検証だけ行うのも可)。

from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(G2, x=home_wgs[1], y=home_wgs[0])
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)

rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
    route_gdf = cached.set_crs(4326)
else:
    route_nodes = nx.shortest_path(G2, orig, dest, weight="length")
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m>0 or near: raise RuntimeError("ルート不適合")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)

のコードを

# --- ここまで同じ ---
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache

import geopandas as gpd
import networkx as nx
from shapely.geometry import Point

# pick_best_shelter_by_network をどこかに実装済みなら import
# 例: evac.routing_extras に置いた場合
# from evac.routing_extras import pick_best_shelter_by_network
# 未実装なら、ひとまず前に貼った関数を main.py に一時定義してもOK

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# ====== ここを「目的地は別途選定済み」から 置き換え ======
# 避難所レイヤ(安全優先 → なければ全体)
try:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_safe")
    if TARGET.empty:
        raise IOError
except Exception:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_all")

if TARGET.empty:
    raise RuntimeError("避難所レイヤが空です(shelters_safe / shelters_all)。作成を確認してください。")

# 出発ノード
home_pt = Point(home_wgs[1], home_wgs[0])  # (lon, lat)
orig = nearest_node(G2, x=home_pt.x, y=home_pt.y)

# 上位N候補からネットワーク距離最短の避難所を選定
# ※ pick_best_shelter_by_network(G2, orig, TARGET, N=5) が定義済み前提
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)

# 以降のキャッシュキー生成に dest_pt を使用
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
# ====== 置き換えここまで ======

if cached is not None:
    route_gdf = cached.set_crs(4326)
else:
    # 既に route_nodes があるので最短経路は再計算不要
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m > 0 or near:
        raise RuntimeError("ルート不適合")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)

に変更

しかし

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 16, in <module>
    dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
                              ^^^^^^^
NameError: name 'dest_pt' is not defined
snowpool@MacBookAir FamilySafetyMap % python main.py 
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 42, in <module>
    route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
NameError: name 'pick_best_shelter_by_network' is not defined

となる

エラー原因は2つ:
1. dest_pt を作る前に nearest_node(… dest_pt …) を呼んでいる(古い行が残っている)
2. pick_best_shelter_by_network をまだ定義/インポートしていない

このため

evac/routing_extras.py

を作成

# evac/routing_extras.py
import numpy as np
import networkx as nx
import osmnx as ox
import geopandas as gpd

def pick_best_shelter_by_network(G2, orig_node, shelters_gdf, N=5, epsg_metric=32654):
    """直線距離の上位N候補から、ネットワーク距離が最短の避難所を選ぶ。"""
    # 出発点の点(lon,lat)
    hp = gpd.GeoSeries([ (G2.nodes[orig_node]["x"], G2.nodes[orig_node]["y"]) ],
                       crs=4326).to_crs(epsg_metric).iloc[0]
    cand = shelters_gdf.to_crs(epsg_metric).copy()
    cand["__d__"] = cand.geometry.distance(hp)
    cand = shelters_gdf.loc[cand.sort_values("__d__").head(N).index].copy()

    best_route = None
    best_len = np.inf
    best_dest_pt = None

    for _, row in cand.iterrows():
        try:
            try:
                dnode = ox.distance.nearest_nodes(G2, X=row.geometry.x, Y=row.geometry.y)
            except AttributeError:
                dnode = ox.nearest_nodes(G2, X=row.geometry.x, Y=row.geometry.y)
            r = nx.shortest_path(G2, orig_node, dnode, weight="length")
            # 経路長[m]
            try:
                meters = float(ox.routing.route_to_gdf(G2, r, True)["length"].sum())
            except Exception:
                meters = 0.0
                for u, v in zip(r[:-1], r[1:]):
                    ed = G2.get_edge_data(u, v)
                    meters += min(d.get("length", 0.0) for d in ed.values())
            if meters < best_len:
                best_len = meters
                best_route = r
                best_dest_pt = row.geometry
        except nx.NetworkXNoPath:
            continue

    if best_route is None:
        raise RuntimeError("到達可能な候補が見つかりません。半径またはNを増やしてください。")
    return best_route, best_len, best_dest_pt

次に
main.py の該当箇所を置き換え

# ❌ 削除
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))

コードを変更

# main.py
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache
from evac.routing_extras import pick_best_shelter_by_network   # ← 追加

import geopandas as gpd
import networkx as nx
from shapely.geometry import Point

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# 避難所レイヤ(安全があれば優先)
try:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_safe")
    if TARGET.empty:
        raise IOError
except Exception:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_all")
if TARGET.empty:
    raise RuntimeError("避難所レイヤが空です(shelters_safe / shelters_all)。")

# 出発ノード
home_pt = Point(home_wgs[1], home_wgs[0])
orig = nearest_node(G2, x=home_pt.x, y=home_pt.y)

# ✅ 上位Nからネットワーク距離最短を選定(ここで dest_pt が得られる)
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)

# 以降のキャッシュキー生成は dest_pt を使う
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)

if cached is not None:
    print("↩︎ ルート: キャッシュヒット")
    route_gdf = cached.set_crs(4326)
else:
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m > 0 or near:
        raise RuntimeError("ルート不適合: 浸水域と交差/近接あり")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)