再度 notebook での確認
import sys, os
sys.path.append(os.path.abspath(".")) # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, LineString
import osmnx as ox
import networkx as nx
import folium
import os
GPKG = "outputs/shizuoka_hazard.gpkg"
# 1) 既存レイヤ読み込み(両方とも 4326)
coast = gpd.read_file(GPKG, layer="coastline")
tsunami = gpd.read_file(GPKG, layer="tsunami_inundation")
# ----- パラメータ -----
# 自宅座標(lat, lon)←必ず置き換え
home_wgs = (34.728, 137.978)
# OSM道路の取得半径[m](到達不能なら大きく:15000, 20000…)
GRAPH_RADIUS_M = 12000
# 避難所CSV(cp932想定)
SHELTER_CSV = "data/20250801_指定緊急避難場所一覧.CSV"
# ----------------------
# 2) 避難所CSV→点化(列名は自動検出)
df = pd.read_csv(SHELTER_CSV, encoding="cp932")
lat_candidates =
lon_candidates =
if not lat_candidates or not lon_candidates:
raise ValueError("CSVに緯度/経度列が見つかりません。列名を教えてください。")
lat_col, lon_col = lat_candidates[0], lon_candidates[0]
df[lat_col] = pd.to_numeric(df[lat_col], errors="coerce")
df[lon_col] = pd.to_numeric(df[lon_col], errors="coerce")
df = df.dropna(subset=[lat_col, lon_col])
shelters = gpd.GeoDataFrame(df.copy(),
geometry=gpd.points_from_xy(df[lon_col], df[lat_col], crs="EPSG:4326")
)
# 3) 安全な避難所 = 浸水域と非交差
# (unionはメモリ負荷になるので、空間結合で判定)
safe_join = gpd.sjoin(shelters, tsunami[["geometry"]], how="left", predicate="intersects")
shelters["safe"] = safe_join["index_right"].isna()
safe_shelters = shelters[shelters["safe"]].copy()
# --- 4) OSM道路取得(徒歩網) ---
G = ox.graph_from_point(home_wgs, dist=GRAPH_RADIUS_M, network_type="walk", simplify=True, retain_all=False)
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)
# u,v,key を“列”にする(重要)
edges = edges.reset_index()
# CRSを4326に
if edges.crs is None:
edges = edges.set_crs(4326)
elif edges.crs.to_epsg() != 4326:
edges = edges.to_crs(4326)
# --- 5) 浸水域と交差する道路を除去(sjoin→集計で一意化) ---
left = edges[["u","v","key","geometry"]].copy() # edges は reset_index() 済み想定
right = tsunami[["geometry"]]
tag = gpd.sjoin(left, right, how="left", predicate="intersects")
blocked = (
tag.assign(hit=tag["index_right"].notna())
.groupby(["u","v","key"], as_index=False)["hit"].max()
.rename(columns={"hit":"blocked"})
)
edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
edges2["blocked"] = edges2["blocked"].fillna(False)
edges_keep = edges2.loc[~edges2["blocked"]].copy()
# ★ ここが重要:graph_from_gdfs に渡す前に MultiIndex に戻す
# (key が float になっていたら int にしておくと安心)
if edges_keep["key"].isna().any():
edges_keep["key"] = edges_keep["key"].fillna(0)
edges_keep["key"] = edges_keep["key"].astype(int)
edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()
# --- 6) サブグラフ再構築(通行可のみ) ---
used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()
# nodes_keep の geometry 列が欠けていたら補完
if "geometry" not in nodes_keep or nodes_keep["geometry"].isna().any():
nodes_keep = nodes_keep.set_geometry(
gpd.points_from_xy(nodes_keep["x"], nodes_keep["y"]), crs=edges_keep.crs
)
# OSMnx のバージョン差に対応
try:
G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))
except AttributeError:
from osmnx import utils_graph as ug
G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=getattr(G, "graph", None))
# 7) 出発・目的地ノード(最寄り“安全”避難所)
home_pt = Point(home_wgs[1], home_wgs[0])
# 最寄り安全避難所(直線距離)…なければ全体から選ぶ
TARGET = safe_shelters if len(safe_shelters) else shelters
METRIC = "EPSG:32654"# 静岡周辺 UTM
hp = gpd.GeoSeries([home_pt], crs=4326).to_crs(METRIC).iloc[0]
tp = TARGET.to_crs(METRIC).copy()
tp["__dist_m__"] = tp.geometry.distance(hp)
dest_pt = TARGET.loc[tp["__dist_m__"].idxmin(), "geometry"]
# ▶ 互換呼び分け
try:
orig = ox.distance.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
dest = ox.distance.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)
except AttributeError:
# 新API名(環境によってはこちらが有効)
orig = ox.nearest_nodes(G2, X=home_pt.x, Y=home_pt.y)
dest = ox.nearest_nodes(G2, X=dest_pt.x, Y=dest_pt.y)
# 8) 最短経路(通行可能グラフ)
try:
route = nx.shortest_path(G2, orig, dest, weight="length")
except nx.NetworkXNoPath:
raise RuntimeError("到達経路なし。GRAPH_RADIUS_Mを増やすか、別の避難所で再試行を。")
# 9) GPKGへ追記
# 既にある場合は上書きしたいなら driverオプションを切り替える(ここは追記)
shelters.to_file(GPKG, layer="shelters_all", driver="GPKG")
safe_shelters.to_file(GPKG, layer="shelters_safe", driver="GPKG")
edges.to_file(GPKG, layer="roads_all", driver="GPKG")
edges_keep.to_file(GPKG, layer="roads_safe", driver="GPKG")
# 10) Folium地図
m = folium.Map(location=[home_wgs[0], home_wgs[1]], zoom_start=12, control_scale=True)
folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定(A40)").add_to(m)
folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)
# 経路を線で描画
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip="避難経路").add_to(m)
# 出発・目的地
folium.Marker([home_wgs[0], home_wgs[1]], popup="現在地", icon=folium.Icon(color="green")).add_to(m)
folium.Marker([dest_pt.y, dest_pt.x], popup="避難所", icon=folium.Icon(color="blue")).add_to(m)
# 避難所レイヤ(安全のみ強調)
folium.GeoJson(safe_shelters.__geo_interface__, name="安全な避難所",
marker=folium.CircleMarker(radius=4)).add_to(m)
folium.LayerControl().add_to(m)
# 保存(任意)
m.save("outputs/evac.html")
print("✓ 完了: GPKG追記 & outputs/evac.html を作成")
print("edges_keep index levels:", getattr(edges_keep.index, "nlevels", 1)) # → 3 が正解
print(edges_keep.index.names) # → ['u','v','key'] が正解
# # ルートの延長[m]は G2 の edge 属性 'length' を合計(OSMnxが付けてくれてます)
# meters = sum(ox.utils_graph.get_route_edge_attributes(G2, route, 'length'))
# walk_kmh = 4.8 # 歩行速度(必要に応じて変更)
# drive_kmh = 20.0 # 災害時の車想定の控えめ値(使うなら)
# eta_walk_min = meters / (walk_kmh * 1000 / 60)
# eta_drive_min = meters / (drive_kmh * 1000 / 60)
# print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")
# ▼ 距離[m]の取り方をバージョン互換で
try:
# 新しめのOSMnx(推奨): ルートをGDF化して length を合計
route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
meters = float(route_edges["length"].sum())
except Exception:
# フォールバック: NetworkXからエッジ属性 length を合計
meters = 0.0
for u, v in zip(route[:-1], route[1:]):
ed = G2.get_edge_data(u, v) # MultiDiGraph: {key: attr_dict}
if isinstance(ed, dict) and len(ed):
meters += min(d.get("length", 0.0) for d in ed.values())
# ETA(必要なら速度を調整)
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min = meters / (walk_kmh * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
print(f"距離: {meters:.0f} m, 徒歩ETA: {eta_walk_min:.1f} 分, 車ETA(仮): {eta_drive_min:.1f} 分")
# Folium のツールチップにも表示
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
route_latlon, weight=6, opacity=0.9,
tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分"
).add_to(m)
from shapely.geometry import LineString
import geopandas as gpd
route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route]
route_gdf = gpd.GeoDataFrame(
{"meters":[meters], "eta_min_walk":[eta_walk_min], "eta_min_drive":[eta_drive_min]},
geometry=[LineString(route_lonlat)], crs="EPSG:4326"
)
route_gdf.to_file("outputs/shizuoka_hazard.gpkg", layer="route_latest", driver="GPKG")
import geopandas as gpd
# ok = not route_gdf.intersects(tsunami.unary_union).iloc[0]
# print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")
# 旧: tsunami.unary_union ← 警告が出る
# 新:
flood_union = tsunami.geometry.union_all()
ok = not route_gdf.intersects(flood_union).iloc[0]
print("交差なし✅" if ok else "⚠️ 交差あり(パラメータ見直し)")
# === 検証→ETA→保存(検証に通ったルートだけ保存・表示) ===
from shapely.geometry import LineString
import geopandas as gpd
# 1) ルートのLineString(まだ属性は付けない)
route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route] # (lon,lat)
route_geom = LineString(route_lonlat)
route_gdf = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")
# 2) 堅牢チェック(交差ゼロ&境界近接なし)
# union_all が使えない環境向けにフォールバックも用意
try:
flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs="EPSG:4326")
except Exception:
flood_union = gpd.GeoDataFrame(geometry=[tsunami.unary_union], crs="EPSG:4326")
METRIC = "EPSG:32654" # 静岡周辺(必要なら地域に合わせて変更)
route_m = route_gdf.to_crs(METRIC)
flood_m = flood_union.to_crs(METRIC)
# 交差長[m]
inter = gpd.overlay(route_m, flood_m, how="intersection")
inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0
# 近接(境界 ±2m 以内をNGにする例)
# near = not gpd.overlay(route_m.buffer(2.0), flood_m, how="intersection").empty
buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty
if inside_len_m > 0 or near:
raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
"別避難所 or 半径拡大で再探索してください。")
# 3) 距離[m]→ETA(バージョン互換)
try:
route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
meters = float(route_edges["length"].sum())
except Exception:
meters = 0.0
for u, v in zip(route[:-1], route[1:]):
ed = G2.get_edge_data(u, v)
meters += min(d.get("length", 0.0) for d in ed.values())
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min = meters / (walk_kmh * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
eta_walk_safe = eta_walk_min * 1.3 # 安全係数(例:+30%)
# 4) GPKG に“合格ルート”だけ属性付きで保存
out_gpkg = "outputs/shizuoka_hazard.gpkg"
route_gdf = gpd.GeoDataFrame(
{
"meters":[meters],
"eta_min_walk":[eta_walk_min],
"eta_min_walk_safe":[eta_walk_safe],
"eta_min_drive":[eta_drive_min],
},
geometry=[route_geom], crs="EPSG:4326",
)
route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
print("✅ 交差なし・保存完了")
# 5) (任意)Foliumに表示するならここで
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
route_latlon, weight=6, opacity=0.9,
tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)
!ogrinfo -so outputs/shizuoka_hazard.gpkg route_latest
import geopandas as gpd
gpd.read_file("outputs/shizuoka_hazard.gpkg", layer="route_latest")[["meters","eta_min_walk","eta_min_walk_safe","eta_min_drive"]]
までは動く
どこからバグったかを調べたら
まとめ(差し込む位置の目安)
1. 目的地 dest_pt 決定直後に「キャッシュ照会ブロック」を挿入
2. use_cached_route が False なら従来の「計算→検証→保存」ブロックへ進む
3. 保存直後に route_cache へ追記
4. True なら保存はスキップし、地図表示へ直行(または再検証のみ実施)
これで「既知の組み合わせは即復帰/未知は新規計算」の運用ができます。
これはモジュールにして読み込んだ方が良い?
というあたりでバグ発生
つまりここから無理にモジュールしたのが原因で先にプロトタイプを作ればいい
ルート再利用性を高めたいので、ルート結果をキー付きでGPKGにキャッシュする機能を追加
# === 検証→ETA→保存(検証に通ったルートだけ保存・表示) ===
from shapely.geometry import LineString
import geopandas as gpd
# 1) ルートのLineString(まだ属性は付けない)
route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route] # (lon,lat)
route_geom = LineString(route_lonlat)
route_gdf = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")
# 2) 堅牢チェック(交差ゼロ&境界近接なし)
# union_all が使えない環境向けにフォールバックも用意
try:
flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs="EPSG:4326")
except Exception:
flood_union = gpd.GeoDataFrame(geometry=[tsunami.unary_union], crs="EPSG:4326")
METRIC = "EPSG:32654" # 静岡周辺(必要なら地域に合わせて変更)
route_m = route_gdf.to_crs(METRIC)
flood_m = flood_union.to_crs(METRIC)
# 交差長[m]
inter = gpd.overlay(route_m, flood_m, how="intersection")
inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0
# 近接(境界 ±2m 以内をNGにする例)
# near = not gpd.overlay(route_m.buffer(2.0), flood_m, how="intersection").empty
buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty
if inside_len_m > 0 or near:
raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
"別避難所 or 半径拡大で再探索してください。")
# 3) 距離[m]→ETA(バージョン互換)
try:
route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
meters = float(route_edges["length"].sum())
except Exception:
meters = 0.0
for u, v in zip(route[:-1], route[1:]):
ed = G2.get_edge_data(u, v)
meters += min(d.get("length", 0.0) for d in ed.values())
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min = meters / (walk_kmh * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
eta_walk_safe = eta_walk_min * 1.3 # 安全係数(例:+30%)
# 4) GPKG に“合格ルート”だけ属性付きで保存
out_gpkg = "outputs/shizuoka_hazard.gpkg"
route_gdf = gpd.GeoDataFrame(
{
"meters":[meters],
"eta_min_walk":[eta_walk_min],
"eta_min_walk_safe":[eta_walk_safe],
"eta_min_drive":[eta_drive_min],
},
geometry=[route_geom], crs="EPSG:4326",
)
route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
print("✅ 交差なし・保存完了")
# 5) (任意)Foliumに表示するならここで
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
folium.PolyLine(
route_latlon, weight=6, opacity=0.9,
tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)
のコードを
# === ルート検証 → ETA → 保存(キャッシュ対応) ===
from shapely.geometry import LineString, Point
import geopandas as gpd
import hashlib, json
from datetime import datetime
from fiona import listlayers
import osmnx as ox
import networkx as nx
GPKG = "outputs/shizuoka_hazard.gpkg"
METRIC = "EPSG:32654" # 静岡周辺
# 0) 津波 union はキャッシュを優先(なければ作る)
if "tsunami_union" in listlayers(GPKG):
flood_union = gpd.read_file(GPKG, layer="tsunami_union")
else:
flood_union = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
flood_union.to_file(GPKG, layer="tsunami_union", driver="GPKG")
# 0') ハザードの簡易メタ(A40が変わったらルートキャッシュ無効化に使う)
ti = gpd.read_file(GPKG, layer="tsunami_inundation")
hazard_meta = f'{len(ti)}|{",".join(map(str, ti.total_bounds))}'
# 1) キャッシュキー生成(端点・モード・半径・ハザードメタ)
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=12000, hazard_meta=None):
payload = {"home": home_wgs, "dest": dest_wgs, "mode": mode, "r": radius_m, "hz": hazard_meta}
return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
key = route_key(home_wgs, (dest_pt.y, dest_pt.x), mode="walk",
radius_m=GRAPH_RADIUS_M, hazard_meta=hazard_meta)
# 2) ルートキャッシュにヒットすれば使う
route_gdf = None
use_cached_route = False
if "route_cache" in listlayers(GPKG):
cache = gpd.read_file(GPKG, layer="route_cache")
hit = cache.loc[cache["rid"] == key]
if len(hit):
route_gdf = hit.set_crs(4326)
meters = float(route_gdf["meters"].iloc[0])
eta_walk_min = float(route_gdf["eta_min_walk"].iloc[0])
eta_walk_safe = float(route_gdf["eta_min_walk_safe"].iloc[0])
eta_drive_min = float(route_gdf["eta_min_drive"].iloc[0])
coords = list(route_gdf.geometry.iloc[0].coords)
route_latlon = [(lat, lon) for lon, lat in coords] # folium用 (lat,lon)
use_cached_route = True
print("↩︎ ルート: キャッシュヒット")
if not use_cached_route:
# 3) これまで通りの経路('route')から LineString を作成
route_lonlat = [(G2.nodes[n]["x"], G2.nodes[n]["y"]) for n in route] # (lon,lat)
route_geom = LineString(route_lonlat)
route_gdf = gpd.GeoDataFrame(geometry=[route_geom], crs="EPSG:4326")
# 4) 堅牢チェック(交差ゼロ&境界近接なし)
route_m = route_gdf.to_crs(METRIC)
flood_m = flood_union.to_crs(METRIC)
inter = gpd.overlay(route_m, flood_m, how="intersection")
inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0
buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(2.0), crs=METRIC)
near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty
if inside_len_m > 0 or near:
raise RuntimeError(f"ルート不適合: 交差長 {inside_len_m:.2f} m / 近接={near}. "
"別避難所 or 半径拡大で再探索してください。")
# 5) 距離[m]→ETA
try:
route_edges = ox.routing.route_to_gdf(G2, route, fill_edge_geometry=True)
meters = float(route_edges["length"].sum())
except Exception:
meters = 0.0
for u, v in zip(route[:-1], route[1:]):
ed = G2.get_edge_data(u, v)
meters += min(d.get("length", 0.0) for d in ed.values())
walk_kmh = 4.8
drive_kmh = 20.0
eta_walk_min = meters / (walk_kmh * 1000 / 60)
eta_drive_min = meters / (drive_kmh * 1000 / 60)
eta_walk_safe = eta_walk_min * 1.3
# 6) route_latest へ保存(検証済みのみ)
out_gpkg = GPKG
route_gdf = gpd.GeoDataFrame(
{
"meters":[meters],
"eta_min_walk":[eta_walk_min],
"eta_min_walk_safe":[eta_walk_safe],
"eta_min_drive":[eta_drive_min],
"created_at":[datetime.utcnow().isoformat(timespec="seconds")+"Z"],
},
geometry=[route_geom], crs="EPSG:4326",
)
route_gdf.to_file(out_gpkg, layer="route_latest", driver="GPKG")
print("✅ 交差なし・保存完了")
# 7) ルートキャッシュに追記
route_cache_rec = route_gdf.copy()
route_cache_rec["rid"] = key
try:
route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG", if_exists="append")
except TypeError:
# 古いGeoPandasのためのフォールバック
if "route_cache" in listlayers(out_gpkg):
old = gpd.read_file(out_gpkg, layer="route_cache")
gpd.GeoDataFrame(pd.concat([old, route_cache_rec], ignore_index=True), crs=4326)\
.to_file(out_gpkg, layer="route_cache", driver="GPKG")
else:
route_cache_rec.to_file(out_gpkg, layer="route_cache", driver="GPKG")
# 8) folium用座標
route_latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route]
# 9) Folium表示(キャッシュ・新規どちらでも)
folium.PolyLine(
route_latlon, weight=6, opacity=0.9,
tooltip=f"距離 {meters:.0f} m / 徒歩 {eta_walk_min:.1f} 分 (安全側 {eta_walk_safe:.1f} 分)"
).add_to(m)
に書き換える