津波避難関連のモジュール化
touch config.py
これで定数ひとまとめ
from dataclasses import dataclass
@dataclass(frozen=True)
class Cfg:
GPKG = "outputs/shizuoka_hazard.gpkg"
EPSG_WGS84 = 4326
EPSG_METRIC = 32654 # 静岡周辺
GRAPH_RADIUS_M = 12000
WALK_KMH = 4.8
DRIVE_KMH = 20.0
SAFETY_FACTOR = 1.3
touch data_io.py
内容は
import geopandas as gpd
from fiona import listlayers
from .config import Cfg
def load_layers():
coast = gpd.read_file(Cfg.GPKG, layer="coastline")
tsunami = gpd.read_file(Cfg.GPKG, layer="tsunami_inundation")
return coast, tsunami
def get_tsunami_union(tsunami):
if "tsunami_union" in listlayers(Cfg.GPKG):
return gpd.read_file(Cfg.GPKG, layer="tsunami_union")
fu = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
fu.to_file(Cfg.GPKG, layer="tsunami_union", driver="GPKG")
return fu
def write_route_latest(route_gdf):
route_gdf.to_file(Cfg.GPKG, layer="route_latest", driver="GPKG")
touch hazard.py
これは(検証ロジック)
import geopandas as gpd
def validate_route(route_gdf, flood_union, epsg_metric, near_m=2.0):
route_m = route_gdf.to_crs(epsg_metric)
flood_m = flood_union.to_crs(epsg_metric)
inter = gpd.overlay(route_m, flood_m, how="intersection")
inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0
buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(near_m), crs=epsg_metric)
near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty
return inside_len_m, near
touch network.py
これはOSM取得&遮断
import osmnx as ox, geopandas as gpd, pandas as pd
def build_safe_graph(center_latlon, radius_m, tsunami):
G = ox.graph_from_point(center_latlon, dist=radius_m, network_type="walk", simplify=True)
nodes, edges = ox.graph_to_gdfs(G)
edges = edges.reset_index()
if edges.crs is None: edges = edges.set_crs(4326)
elif edges.crs.to_epsg()!=4326: edges = edges.to_crs(4326)
tag = gpd.sjoin(edges[["u","v","key","geometry"]], tsunami[["geometry"]],
how="left", predicate="intersects")
blocked = (tag.assign(hit=tag["index_right"].notna())
.groupby(["u","v","key"], as_index=False)["hit"].max()
.rename(columns={"hit":"blocked"}))
edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
edges2["blocked"] = edges2["blocked"].fillna(False)
edges_keep = edges2.loc[~edges2["blocked"]].copy()
edges_keep["key"] = edges_keep["key"].fillna(0).astype(int)
edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()
used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()
try:
G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=G.graph)
except AttributeError:
from osmnx import utils_graph as ug
G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=G.graph)
return G2, nodes_keep, edges_keep
touch routing.py
これは経路・ETA
import osmnx as ox, networkx as nx, geopandas as gpd
from shapely.geometry import LineString
from .config import Cfg
def nearest_node(G, x, y):
try: return ox.distance.nearest_nodes(G, X=x, Y=y)
except AttributeError: return ox.nearest_nodes(G, X=x, Y=y)
def route_and_eta(G2, route_nodes):
try:
route_edges = ox.routing.route_to_gdf(G2, route_nodes, True)
meters = float(route_edges["length"].sum())
except Exception:
meters = 0.0
for u,v in zip(route_nodes[:-1], route_nodes[1:]):
ed = G2.get_edge_data(u,v); meters += min(d.get("length",0.0) for d in ed.values())
walk = meters/(Cfg.WALK_KMH*1000/60)
drive= meters/(Cfg.DRIVE_KMH*1000/60)
return meters, walk, drive
def make_route_gdf(G2, route_nodes, meters, walk, drive):
latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route_nodes]
lonlat = [(x,y) for y,x in latlon]
gdf = gpd.GeoDataFrame(
{"meters":[meters],"eta_min_walk":[walk],
"eta_min_walk_safe":[walk*Cfg.SAFETY_FACTOR],
"eta_min_drive":[drive]},
geometry=[LineString(lonlat)], crs=4326
)
return gdf, latlon
touch cache.py
これはキー生成&保存
import json, hashlib, geopandas as gpd
from fiona import listlayers
from .config import Cfg
def route_key(home_wgs, dest_wgs, mode="walk", radius_m=Cfg.GRAPH_RADIUS_M, hazard_meta=None):
payload = {"home":home_wgs, "dest":dest_wgs, "mode":mode, "r":radius_m, "hz":hazard_meta}
return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()
def load_cached_route(rid):
if "route_cache" not in listlayers(Cfg.GPKG): return None
gdf = gpd.read_file(Cfg.GPKG, layer="route_cache")
hit = gdf.loc[gdf["rid"]==rid]
return hit if len(hit) else None
def save_route_cache(route_gdf, rid):
route_gdf = route_gdf.copy(); route_gdf["rid"] = rid
try:
route_gdf.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG", if_exists="append")
except TypeError:
# 旧版対応
import pandas as pd
try:
old = gpd.read_file(Cfg.GPKG, layer="route_cache")
gpd.GeoDataFrame(pd.concat([old, route_gdf], ignore_index=True), crs=4326)\
.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG")
except Exception:
route_gdf.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG")
touch viz.py
これは表示するモジュール
import folium, geopandas as gpd
def add_layers_to_map(m, tsunami, edges_keep, route_latlon, tooltip):
folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定").add_to(m)
folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)
folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip=tooltip).add_to(m)
folium.LayerControl().add_to(m)
あとは
Notebook で
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache
coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)
home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)
# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(G2, x=home_wgs[1], y=home_wgs[0])
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
route_gdf = cached.set_crs(4326)
else:
route_nodes = nx.shortest_path(G2, orig, dest, weight="length")
meters, walk, drive = route_and_eta(G2, route_nodes)
route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
if inside_len_m>0 or near: raise RuntimeError("ルート不適合")
write_route_latest(route_gdf)
save_route_cache(route_gdf, rid)
/
を実行したが
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[17], line 1
----> 1 from evac.config import Cfg
2 from evac.data_io import load_layers, get_tsunami_union, write_route_latest
3 from evac.network import build_safe_graph
ModuleNotFoundError: No module named 'evac'
これは
evac パッケージがPythonの検索パスに無いだけ
mkdir -p evac touch evac/__init__.py
でファイルを作成
とりあえず中身は空でも良いらしい
あとは notebookで
import sys, os
sys.path.append(os.path.abspath(".")) # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る
を記述
しかしうまくいかないため
Main.pyにして実行してみる
mv *.py evac mv evac/main.py . python main.py
これで実行
今度は
Traceback (most recent call last):
File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 16, in <module>
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
^^^^^^^
NameError: name 'dest_pt' is not defined
となる
dest_pt(目的地の点ジオメトリ)を作っていないのが原因
修正案は2つ
A) 手早い版(直線距離で最寄りの安全避難所を選ぶ)
B) 推奨版(上位N候補から“ネットワーク距離最短”を選ぶ)
Bを実行する
import geopandas as gpd import networkx as nx from shapely.geometry import Point
を追記
もともとの
# 目的地は別途選定済みとする: dest_pt orig = nearest_node(...) dest = nearest_node(... dest_pt ...) rid = route_key(...)
は削除
代わりに pick_best_shelter_by_network で route_nodes と dest_pt を得ます。
pick_best_shelter_by_network をまだモジュール化していないなら、main.py に一時定義してから使ってOKです(前に渡した実装をそのまま貼り付け)。
もしくは evac/routing_extras.py を作って関数を置き、from evac.routing_extras import … で読み込む形にしてください。
キャッシュヒット時は route_gdf をそのまま使えるので、再計算・検証はスキップ(必要なら最新 tsunami_union で再検証だけ行うのも可)。
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache
coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)
home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)
# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(G2, x=home_wgs[1], y=home_wgs[0])
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
route_gdf = cached.set_crs(4326)
else:
route_nodes = nx.shortest_path(G2, orig, dest, weight="length")
meters, walk, drive = route_and_eta(G2, route_nodes)
route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
if inside_len_m>0 or near: raise RuntimeError("ルート不適合")
write_route_latest(route_gdf)
save_route_cache(route_gdf, rid)
のコードを
# --- ここまで同じ ---
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache
import geopandas as gpd
import networkx as nx
from shapely.geometry import Point
# pick_best_shelter_by_network をどこかに実装済みなら import
# 例: evac.routing_extras に置いた場合
# from evac.routing_extras import pick_best_shelter_by_network
# 未実装なら、ひとまず前に貼った関数を main.py に一時定義してもOK
coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)
home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)
# ====== ここを「目的地は別途選定済み」から 置き換え ======
# 避難所レイヤ(安全優先 → なければ全体)
try:
TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_safe")
if TARGET.empty:
raise IOError
except Exception:
TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_all")
if TARGET.empty:
raise RuntimeError("避難所レイヤが空です(shelters_safe / shelters_all)。作成を確認してください。")
# 出発ノード
home_pt = Point(home_wgs[1], home_wgs[0]) # (lon, lat)
orig = nearest_node(G2, x=home_pt.x, y=home_pt.y)
# 上位N候補からネットワーク距離最短の避難所を選定
# ※ pick_best_shelter_by_network(G2, orig, TARGET, N=5) が定義済み前提
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)
# 以降のキャッシュキー生成に dest_pt を使用
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
# ====== 置き換えここまで ======
if cached is not None:
route_gdf = cached.set_crs(4326)
else:
# 既に route_nodes があるので最短経路は再計算不要
meters, walk, drive = route_and_eta(G2, route_nodes)
route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
if inside_len_m > 0 or near:
raise RuntimeError("ルート不適合")
write_route_latest(route_gdf)
save_route_cache(route_gdf, rid)
に変更
しかし
Traceback (most recent call last):
File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 16, in <module>
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
^^^^^^^
NameError: name 'dest_pt' is not defined
snowpool@MacBookAir FamilySafetyMap % python main.py
Traceback (most recent call last):
File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 42, in <module>
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
NameError: name 'pick_best_shelter_by_network' is not defined
となる
エラー原因は2つ:
1. dest_pt を作る前に nearest_node(… dest_pt …) を呼んでいる(古い行が残っている)
2. pick_best_shelter_by_network をまだ定義/インポートしていない
このため
evac/routing_extras.py
を作成
# evac/routing_extras.py
import numpy as np
import networkx as nx
import osmnx as ox
import geopandas as gpd
def pick_best_shelter_by_network(G2, orig_node, shelters_gdf, N=5, epsg_metric=32654):
"""直線距離の上位N候補から、ネットワーク距離が最短の避難所を選ぶ。"""
# 出発点の点(lon,lat)
hp = gpd.GeoSeries([ (G2.nodes[orig_node]["x"], G2.nodes[orig_node]["y"]) ],
crs=4326).to_crs(epsg_metric).iloc[0]
cand = shelters_gdf.to_crs(epsg_metric).copy()
cand["__d__"] = cand.geometry.distance(hp)
cand = shelters_gdf.loc[cand.sort_values("__d__").head(N).index].copy()
best_route = None
best_len = np.inf
best_dest_pt = None
for _, row in cand.iterrows():
try:
try:
dnode = ox.distance.nearest_nodes(G2, X=row.geometry.x, Y=row.geometry.y)
except AttributeError:
dnode = ox.nearest_nodes(G2, X=row.geometry.x, Y=row.geometry.y)
r = nx.shortest_path(G2, orig_node, dnode, weight="length")
# 経路長[m]
try:
meters = float(ox.routing.route_to_gdf(G2, r, True)["length"].sum())
except Exception:
meters = 0.0
for u, v in zip(r[:-1], r[1:]):
ed = G2.get_edge_data(u, v)
meters += min(d.get("length", 0.0) for d in ed.values())
if meters < best_len:
best_len = meters
best_route = r
best_dest_pt = row.geometry
except nx.NetworkXNoPath:
continue
if best_route is None:
raise RuntimeError("到達可能な候補が見つかりません。半径またはNを増やしてください。")
return best_route, best_len, best_dest_pt
次に
main.py の該当箇所を置き換え
# ❌ 削除 dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y) rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
コードを変更
# main.py
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache
from evac.routing_extras import pick_best_shelter_by_network # ← 追加
import geopandas as gpd
import networkx as nx
from shapely.geometry import Point
coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)
home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)
# 避難所レイヤ(安全があれば優先)
try:
TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_safe")
if TARGET.empty:
raise IOError
except Exception:
TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_all")
if TARGET.empty:
raise RuntimeError("避難所レイヤが空です(shelters_safe / shelters_all)。")
# 出発ノード
home_pt = Point(home_wgs[1], home_wgs[0])
orig = nearest_node(G2, x=home_pt.x, y=home_pt.y)
# ✅ 上位Nからネットワーク距離最短を選定(ここで dest_pt が得られる)
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)
# 以降のキャッシュキー生成は dest_pt を使う
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
print("↩︎ ルート: キャッシュヒット")
route_gdf = cached.set_crs(4326)
else:
meters, walk, drive = route_and_eta(G2, route_nodes)
route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
if inside_len_m > 0 or near:
raise RuntimeError("ルート不適合: 浸水域と交差/近接あり")
write_route_latest(route_gdf)
save_route_cache(route_gdf, rid)