津波避難関連のモジュール化(失敗)

津波避難関連のモジュール化

touch config.py

これで定数ひとまとめ

from dataclasses import dataclass

@dataclass(frozen=True)
class Cfg:
    GPKG = "outputs/shizuoka_hazard.gpkg"
    EPSG_WGS84 = 4326
    EPSG_METRIC = 32654           # 静岡周辺
    GRAPH_RADIUS_M = 12000
    WALK_KMH = 4.8
    DRIVE_KMH = 20.0
    SAFETY_FACTOR = 1.3
touch data_io.py

内容は

import geopandas as gpd
from fiona import listlayers
from .config import Cfg

def load_layers():
    coast   = gpd.read_file(Cfg.GPKG, layer="coastline")
    tsunami = gpd.read_file(Cfg.GPKG, layer="tsunami_inundation")
    return coast, tsunami

def get_tsunami_union(tsunami):
    if "tsunami_union" in listlayers(Cfg.GPKG):
        return gpd.read_file(Cfg.GPKG, layer="tsunami_union")
    fu = gpd.GeoDataFrame(geometry=[tsunami.geometry.union_all()], crs=tsunami.crs)
    fu.to_file(Cfg.GPKG, layer="tsunami_union", driver="GPKG")
    return fu

def write_route_latest(route_gdf):
    route_gdf.to_file(Cfg.GPKG, layer="route_latest", driver="GPKG")
touch hazard.py

これは(検証ロジック)

import geopandas as gpd
def validate_route(route_gdf, flood_union, epsg_metric, near_m=2.0):
    route_m = route_gdf.to_crs(epsg_metric)
    flood_m = flood_union.to_crs(epsg_metric)

    inter = gpd.overlay(route_m, flood_m, how="intersection")
    inside_len_m = float(inter.geometry.length.sum()) if len(inter) else 0.0

    buf_gdf = gpd.GeoDataFrame(geometry=route_m.buffer(near_m), crs=epsg_metric)
    near = not gpd.sjoin(buf_gdf, flood_m, how="inner", predicate="intersects").empty
    return inside_len_m, near
touch network.py

これはOSM取得&遮断

import osmnx as ox, geopandas as gpd, pandas as pd

def build_safe_graph(center_latlon, radius_m, tsunami):
    G = ox.graph_from_point(center_latlon, dist=radius_m, network_type="walk", simplify=True)
    nodes, edges = ox.graph_to_gdfs(G)
    edges = edges.reset_index()
    if edges.crs is None: edges = edges.set_crs(4326)
    elif edges.crs.to_epsg()!=4326: edges = edges.to_crs(4326)

    tag = gpd.sjoin(edges[["u","v","key","geometry"]], tsunami[["geometry"]],
                    how="left", predicate="intersects")
    blocked = (tag.assign(hit=tag["index_right"].notna())
                  .groupby(["u","v","key"], as_index=False)["hit"].max()
                  .rename(columns={"hit":"blocked"}))
    edges2 = edges.merge(blocked, on=["u","v","key"], how="left")
    edges2["blocked"] = edges2["blocked"].fillna(False)
    edges_keep = edges2.loc[~edges2["blocked"]].copy()
    edges_keep["key"] = edges_keep["key"].fillna(0).astype(int)
    edges_keep = edges_keep.set_index(["u","v","key"]).sort_index()
    used_nodes = pd.unique(edges_keep.reset_index()[["u","v"]].values.ravel())
    nodes_keep = nodes.loc[nodes.index.isin(used_nodes)].copy()
    try:
        G2 = ox.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=G.graph)
    except AttributeError:
        from osmnx import utils_graph as ug
        G2 = ug.graph_from_gdfs(nodes_keep, edges_keep, graph_attrs=G.graph)
    return G2, nodes_keep, edges_keep
touch routing.py

これは経路・ETA

import osmnx as ox, networkx as nx, geopandas as gpd
from shapely.geometry import LineString
from .config import Cfg

def nearest_node(G, x, y):
    try: return ox.distance.nearest_nodes(G, X=x, Y=y)
    except AttributeError: return ox.nearest_nodes(G, X=x, Y=y)

def route_and_eta(G2, route_nodes):
    try:
        route_edges = ox.routing.route_to_gdf(G2, route_nodes, True)
        meters = float(route_edges["length"].sum())
    except Exception:
        meters = 0.0
        for u,v in zip(route_nodes[:-1], route_nodes[1:]):
            ed = G2.get_edge_data(u,v); meters += min(d.get("length",0.0) for d in ed.values())
    walk = meters/(Cfg.WALK_KMH*1000/60)
    drive= meters/(Cfg.DRIVE_KMH*1000/60)
    return meters, walk, drive

def make_route_gdf(G2, route_nodes, meters, walk, drive):
    latlon = [(G2.nodes[n]["y"], G2.nodes[n]["x"]) for n in route_nodes]
    lonlat = [(x,y) for y,x in latlon]
    gdf = gpd.GeoDataFrame(
        {"meters":[meters],"eta_min_walk":[walk],
         "eta_min_walk_safe":[walk*Cfg.SAFETY_FACTOR],
         "eta_min_drive":[drive]},
        geometry=[LineString(lonlat)], crs=4326
    )
    return gdf, latlon
touch cache.py

これはキー生成&保存

import json, hashlib, geopandas as gpd
from fiona import listlayers
from .config import Cfg

def route_key(home_wgs, dest_wgs, mode="walk", radius_m=Cfg.GRAPH_RADIUS_M, hazard_meta=None):
    payload = {"home":home_wgs, "dest":dest_wgs, "mode":mode, "r":radius_m, "hz":hazard_meta}
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()

def load_cached_route(rid):
    if "route_cache" not in listlayers(Cfg.GPKG): return None
    gdf = gpd.read_file(Cfg.GPKG, layer="route_cache")
    hit = gdf.loc[gdf["rid"]==rid]
    return hit if len(hit) else None

def save_route_cache(route_gdf, rid):
    route_gdf = route_gdf.copy(); route_gdf["rid"] = rid
    try:
        route_gdf.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG", if_exists="append")
    except TypeError:
        # 旧版対応
        import pandas as pd
        try:
            old = gpd.read_file(Cfg.GPKG, layer="route_cache")
            gpd.GeoDataFrame(pd.concat([old, route_gdf], ignore_index=True), crs=4326)\
               .to_file(Cfg.GPKG, layer="route_cache", driver="GPKG")
        except Exception:
            route_gdf.to_file(Cfg.GPKG, layer="route_cache", driver="GPKG")
 touch viz.py

これは表示するモジュール

import folium, geopandas as gpd

def add_layers_to_map(m, tsunami, edges_keep, route_latlon, tooltip):
    folium.GeoJson(tsunami.__geo_interface__, name="津波浸水想定").add_to(m)
    folium.GeoJson(edges_keep.__geo_interface__, name="道路(浸水回避)").add_to(m)
    folium.PolyLine(route_latlon, weight=6, opacity=0.9, tooltip=tooltip).add_to(m)
    folium.LayerControl().add_to(m)

あとは
Notebook で

from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(G2, x=home_wgs[1], y=home_wgs[0])
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)

rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
    route_gdf = cached.set_crs(4326)
else:
    route_nodes = nx.shortest_path(G2, orig, dest, weight="length")
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m>0 or near: raise RuntimeError("ルート不適合")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)
/
を実行したが

---------------------------------------------------------------------------
ModuleNotFoundError                       Traceback (most recent call last)
Cell In[17], line 1
----> 1 from evac.config import Cfg
      2 from evac.data_io import load_layers, get_tsunami_union, write_route_latest
      3 from evac.network import build_safe_graph

ModuleNotFoundError: No module named 'evac'

これは
evac パッケージがPythonの検索パスに無いだけ

mkdir -p evac
touch evac/__init__.py

でファイルを作成

とりあえず中身は空でも良いらしい

あとは notebookで

import sys, os
sys.path.append(os.path.abspath("."))  # プロジェクト直下をパスに追加
# 以後 from evac.config import Cfg が通る

を記述

しかしうまくいかないため
Main.pyにして実行してみる

mv *.py evac 
mv evac/main.py .
python main.py   

これで実行

今度は

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 16, in <module>
    dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
                              ^^^^^^^
NameError: name 'dest_pt' is not defined

となる

dest_pt(目的地の点ジオメトリ)を作っていないのが原因

修正案は2つ
A) 手早い版(直線距離で最寄りの安全避難所を選ぶ)
B) 推奨版(上位N候補から“ネットワーク距離最短”を選ぶ)

Bを実行する

import geopandas as gpd
import networkx as nx
from shapely.geometry import Point

を追記

もともとの

# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(...)
dest = nearest_node(... dest_pt ...)
rid = route_key(...)

は削除

代わりに pick_best_shelter_by_network で route_nodes と dest_pt を得ます。
pick_best_shelter_by_network をまだモジュール化していないなら、main.py に一時定義してから使ってOKです(前に渡した実装をそのまま貼り付け)。
もしくは evac/routing_extras.py を作って関数を置き、from evac.routing_extras import … で読み込む形にしてください。
キャッシュヒット時は route_gdf をそのまま使えるので、再計算・検証はスキップ(必要なら最新 tsunami_union で再検証だけ行うのも可)。

from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# 目的地は別途選定済みとする: dest_pt
orig = nearest_node(G2, x=home_wgs[1], y=home_wgs[0])
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)

rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
if cached is not None:
    route_gdf = cached.set_crs(4326)
else:
    route_nodes = nx.shortest_path(G2, orig, dest, weight="length")
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m>0 or near: raise RuntimeError("ルート不適合")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)

のコードを

# --- ここまで同じ ---
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache

import geopandas as gpd
import networkx as nx
from shapely.geometry import Point

# pick_best_shelter_by_network をどこかに実装済みなら import
# 例: evac.routing_extras に置いた場合
# from evac.routing_extras import pick_best_shelter_by_network
# 未実装なら、ひとまず前に貼った関数を main.py に一時定義してもOK

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# ====== ここを「目的地は別途選定済み」から 置き換え ======
# 避難所レイヤ(安全優先 → なければ全体)
try:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_safe")
    if TARGET.empty:
        raise IOError
except Exception:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_all")

if TARGET.empty:
    raise RuntimeError("避難所レイヤが空です(shelters_safe / shelters_all)。作成を確認してください。")

# 出発ノード
home_pt = Point(home_wgs[1], home_wgs[0])  # (lon, lat)
orig = nearest_node(G2, x=home_pt.x, y=home_pt.y)

# 上位N候補からネットワーク距離最短の避難所を選定
# ※ pick_best_shelter_by_network(G2, orig, TARGET, N=5) が定義済み前提
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)

# 以降のキャッシュキー生成に dest_pt を使用
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)
# ====== 置き換えここまで ======

if cached is not None:
    route_gdf = cached.set_crs(4326)
else:
    # 既に route_nodes があるので最短経路は再計算不要
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m > 0 or near:
        raise RuntimeError("ルート不適合")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)

に変更

しかし

Traceback (most recent call last):
  File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 16, in <module>
    dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
                              ^^^^^^^
NameError: name 'dest_pt' is not defined
snowpool@MacBookAir FamilySafetyMap % python main.py 
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/FamilySafetyMap/main.py", line 42, in <module>
    route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)
                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
NameError: name 'pick_best_shelter_by_network' is not defined

となる

エラー原因は2つ:
1. dest_pt を作る前に nearest_node(… dest_pt …) を呼んでいる(古い行が残っている)
2. pick_best_shelter_by_network をまだ定義/インポートしていない

このため

evac/routing_extras.py

を作成

# evac/routing_extras.py
import numpy as np
import networkx as nx
import osmnx as ox
import geopandas as gpd

def pick_best_shelter_by_network(G2, orig_node, shelters_gdf, N=5, epsg_metric=32654):
    """直線距離の上位N候補から、ネットワーク距離が最短の避難所を選ぶ。"""
    # 出発点の点(lon,lat)
    hp = gpd.GeoSeries([ (G2.nodes[orig_node]["x"], G2.nodes[orig_node]["y"]) ],
                       crs=4326).to_crs(epsg_metric).iloc[0]
    cand = shelters_gdf.to_crs(epsg_metric).copy()
    cand["__d__"] = cand.geometry.distance(hp)
    cand = shelters_gdf.loc[cand.sort_values("__d__").head(N).index].copy()

    best_route = None
    best_len = np.inf
    best_dest_pt = None

    for _, row in cand.iterrows():
        try:
            try:
                dnode = ox.distance.nearest_nodes(G2, X=row.geometry.x, Y=row.geometry.y)
            except AttributeError:
                dnode = ox.nearest_nodes(G2, X=row.geometry.x, Y=row.geometry.y)
            r = nx.shortest_path(G2, orig_node, dnode, weight="length")
            # 経路長[m]
            try:
                meters = float(ox.routing.route_to_gdf(G2, r, True)["length"].sum())
            except Exception:
                meters = 0.0
                for u, v in zip(r[:-1], r[1:]):
                    ed = G2.get_edge_data(u, v)
                    meters += min(d.get("length", 0.0) for d in ed.values())
            if meters < best_len:
                best_len = meters
                best_route = r
                best_dest_pt = row.geometry
        except nx.NetworkXNoPath:
            continue

    if best_route is None:
        raise RuntimeError("到達可能な候補が見つかりません。半径またはNを増やしてください。")
    return best_route, best_len, best_dest_pt

次に
main.py の該当箇所を置き換え

# ❌ 削除
dest = nearest_node(G2, x=dest_pt.x, y=dest_pt.y)
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))

コードを変更

# main.py
from evac.config import Cfg
from evac.data_io import load_layers, get_tsunami_union, write_route_latest
from evac.network import build_safe_graph
from evac.routing import nearest_node, route_and_eta, make_route_gdf
from evac.hazard import validate_route
from evac.cache import route_key, load_cached_route, save_route_cache
from evac.routing_extras import pick_best_shelter_by_network   # ← 追加

import geopandas as gpd
import networkx as nx
from shapely.geometry import Point

coast, tsunami = load_layers()
flood_union = get_tsunami_union(tsunami)

home_wgs = (34.728, 137.978)
G2, nodes_keep, edges_keep = build_safe_graph(home_wgs, Cfg.GRAPH_RADIUS_M, tsunami)

# 避難所レイヤ(安全があれば優先)
try:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_safe")
    if TARGET.empty:
        raise IOError
except Exception:
    TARGET = gpd.read_file(Cfg.GPKG, layer="shelters_all")
if TARGET.empty:
    raise RuntimeError("避難所レイヤが空です(shelters_safe / shelters_all)。")

# 出発ノード
home_pt = Point(home_wgs[1], home_wgs[0])
orig = nearest_node(G2, x=home_pt.x, y=home_pt.y)

# ✅ 上位Nからネットワーク距離最短を選定(ここで dest_pt が得られる)
route_nodes, meters_tmp, dest_pt = pick_best_shelter_by_network(G2, orig, TARGET, N=5)

# 以降のキャッシュキー生成は dest_pt を使う
rid = route_key(home_wgs, (dest_pt.y, dest_pt.x))
cached = load_cached_route(rid)

if cached is not None:
    print("↩︎ ルート: キャッシュヒット")
    route_gdf = cached.set_crs(4326)
else:
    meters, walk, drive = route_and_eta(G2, route_nodes)
    route_gdf, route_latlon = make_route_gdf(G2, route_nodes, meters, walk, drive)
    inside_len_m, near = validate_route(route_gdf, flood_union, Cfg.EPSG_METRIC)
    if inside_len_m > 0 or near:
        raise RuntimeError("ルート不適合: 浸水域と交差/近接あり")
    write_route_latest(route_gdf)
    save_route_cache(route_gdf, rid)

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です