時間到達圏(isochrone)生成関数から作成

時間到達圏(isochrone)生成関数から作成

G2(浸水回避済みグラフ)と自宅座標から 徒歩Y分の到達圏ポリゴン を作る

到達圏は「到達可能エッジを太らせて面化」する堅牢方式
(alpha-shape不要、Shapely/GeoPandas 標準で完結)

前提:G2(到達可能道路グラフ)、home_wgs、GPKG、METRIC=”EPSG:32654″ が既に定義済み

# === Isochrone 生成ユーティリティ ===
import math
import geopandas as gpd
import networkx as nx
import osmnx as ox
from shapely.ops import unary_union

def add_edge_time_weight(G, speed_kmh: float = 4.8, attr_name: str = "time_min"):
    """
    エッジに 'time_min'(分)を付与(長さ[m] / 速度[km/h]).
    既にある場合は上書き。
    """
    fac = speed_kmh * 1000 / 60.0  # m / min
    for u, v, k, data in G.edges(keys=True, data=True):
        length_m = float(data.get("length", 0.0))
        data[attr_name] = length_m / fac
    return G

def nearest_node_from_wgs(G, lat: float, lon: float):
    """WGS84の(lat,lon)から最近傍ノードIDを返す(osmnxのAPI差異に両対応)。"""
    try:
        return ox.distance.nearest_nodes(G, X=lon, Y=lat)
    except AttributeError:
        return ox.nearest_nodes(G, X=lon, Y=lat)

def isochrone_polygon(
    G,
    origin_node: int,
    cutoff_min: float,
    proj_crs: str = "EPSG:32654",
    buffer_m: float = 35.0,
    weight_attr: str = "time_min",
):
    """
    到達可能サブグラフのエッジを buffer で面化 → マルチポリゴン化。
    返り値: (isochrone_gdf[EPSG:4326], reachable_edges_gdf[EPSG:4326])

    - cutoff_min: 到達時間しきい値(分)
    - buffer_m: エッジに与えるバッファ[m](道路幅+余裕)
    """
    # dijkstra で到達ノード
    dist = nx.single_source_dijkstra_path_length(G, origin_node, cutoff=cutoff_min, weight=weight_attr)
    reachable_nodes = set(dist.keys())
    if not reachable_nodes:
        raise RuntimeError("到達ノードが空です。cutoff_min が小さすぎる可能性。")

    # 到達エッジ抽出
    nodes_gdf, edges_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True)
    idx = edges_gdf.reset_index()[["u", "v", "key"]].apply(tuple, axis=1)
    mask = [(u in reachable_nodes and v in reachable_nodes) for (u, v, k) in idx]
    reach_edges = edges_gdf.loc[mask].copy()
    if reach_edges.empty:
        raise RuntimeError("到達エッジが空です。グラフが疎か cutoff_min が小さい可能性。")

    # 面化(project→buffer→union→修正→逆投影)
    reach_proj = reach_edges.to_crs(proj_crs)
    merged = unary_union(reach_proj.buffer(buffer_m).geometry.values)
    # geometry 修正(buffer(0) に相当)
    poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union

    # 出力(4326)
    iso_gdf = gpd.GeoDataFrame({"mode": ["walk"], "minutes": [float(cutoff_min)]},
                               geometry=[poly], crs=proj_crs).to_crs(4326)
    reach_edges_4326 = reach_proj.to_crs(4326)
    return iso_gdf, reach_edges_4326

def subtract_flood(iso_gdf: gpd.GeoDataFrame, flood_union_gdf: gpd.GeoDataFrame, proj_crs: str = "EPSG:32654"):
    """
    isochrone(面)から津波浸水(union)を差し引き → 「到達可能 × 非浸水」エリア。
    返り値: safe_area_now_gdf[EPSG:4326]
    """
    iso_m = iso_gdf.to_crs(proj_crs)
    fl_m  = flood_union_gdf.to_crs(proj_crs)
    safe_geom = iso_m.geometry.iloc[0].difference(fl_m.unary_union)
    safe = gpd.GeoDataFrame({"minutes":[iso_gdf["minutes"].iloc[0]]}, geometry=[safe_geom], crs=proj_crs).to_crs(4326)
    return safe

実行すると

/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/1981613481.py:59: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union
/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/1981613481.py:74: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  safe_geom = iso_m.geometry.iloc[0].difference(fl_m.unary_union)
<folium.features.GeoJson at 0x178279f10>

警告は Shapely/GeoPandas 2.x 系の非推奨によるもの
unary_union は廃止予定 → union_all() に置き換えればOK

変更すると

/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/832098648.py:59: DeprecationWarning: The 'unary_union' attribute is deprecated, use the 'union_all()' method instead.
  poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union
<folium.features.GeoJson at 0x17dfdf350>
    poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).unary_union

も修正する

poly = gpd.GeoSeries([merged], crs=proj_crs).buffer(0).union_all()
merged = unary_union(reach_proj.buffer(buffer_m).geometry.values)

も修正

buf = reach_proj.buffer(buffer_m)
merged = gpd.GeoSeries(buf, crs=proj_crs).union_all()

に書き換える

これでエラーが消える

重みtime_minはエッジ長さ/歩行速度。最短経路と同様にDijkstraで「時間≦Y分」のノード集合を取り、両端が到達可能なエッジのみ抽出→buffer→unionで面化しています。
buffer_m は道路幅+余裕のパラメータ。市街地なら 30–50m で見た目・連続性が良好です。
非浸水差引は isochrone – flood_union。あなたの既存 flood_union(津波浸水 union)をそのまま使えます。
保存は pyogrio + OVERWRITE に統一(毎回クリーン)。

次は

# ② 津波「到達時間・高さ」簡易推定モジュール

**目的**: 「あと何分で来る?」に即答。初期は簡易/安全側、後で精緻化。
**ステップ**

* v0(当日実用版)

* **深海近似の伝播速度** (c ≈ √(g·h)) を **一定深さ代表値**で近似(安全側に遅めに見積もる)
* 震源→沿岸/自宅最寄り海岸線までの距離から到達時間を算出
* 高さは情報源(JMA速報の区分)を採用、未入手なら「不明→避難優先」のメッセージ
* v1(精緻化)

* 沿岸ごとの**水深ラスタ**(GEBCO/J-EGG等)から距離積分で可変速度伝播
* 複数反射・地形収束は扱わず「防災判断に使える保守的な上限/下限」を出す
* 出力

* `arrival_time_min`, `height_class`, `confidence` を `route_latest`/`sim_runs` に書込

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です