まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意

まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意

Epicenter(震源の lat,lon)を渡すと、深海近似の伝播速度で到達時間を安全側に推定し、
route_latest に arrival_time_min / height_class / confidence を追記して上書き保存、
同時に sim_runs に実行ログを追記

既存の変数:GPKG, coast(海岸線), home_wgs(自宅), route_latest は前工程で作成済みを想定
追加依存:pyproj(地球楕円体距離計算用)

# === 津波「到達時間・高さ」簡易推定 (v0) ===
# 目的:震源→自宅最寄り海岸線までの直線距離 / 深海近似速度で到達時間を安全側に推定
# 出力:arrival_time_min, height_class, confidence を route_latest に追記(上書き保存)
#       sim_runs テーブルに実行ログを追記
from datetime import datetime
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from shapely.ops import nearest_points
from pyproj import Geod

GEOD = Geod(ellps="WGS84")

def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
    """
    自宅に最も近い海岸線上の点を返す(WGS84)
    - coast_gdf: 4326 想定(LineString/MultiLineString)
    - home_latlon: (lat,lon)
    """
    if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt = Point(home_latlon[1], home_latlon[0])  # (lon,lat)
    # 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
    coast_wgs["__d__"] = coast_wgs.distance(home_pt)
    geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
    n1, n2 = nearest_points(geom, home_pt)
    return n1  # 海岸線側の最近点(Point lon,lat)

def _geodesic_km(p1_latlon: tuple[float,float], p2_latlon: tuple[float,float]) -> float:
    """
    WGS84 楕円体の測地距離[km]
    """
    az12, az21, m = GEOD.inv(p1_latlon[1], p1_latlon[0], p2_latlon[1], p2_latlon[0])
    return m / 1000.0

def estimate_arrival_minutes_v0(
    epicenter_latlon: tuple[float,float],
    coast_gdf: gpd.GeoDataFrame,
    home_latlon: tuple[float,float],
    h_rep_m: float = 3000.0,       # 代表水深[m](深海)
    safety_slow_factor: float = 1.2  # 速度を遅めに見積もる倍率(到達時間は長くなる)
) -> float:
    """
    深海近似の伝播速度 c ≈ √(g*h) を用い、震源→自宅最寄り海岸までの直線距離から到達時間[分]を推定(安全側)。
    """
    g = 9.81
    c_ms = (g * h_rep_m) ** 0.5              # m/s
    c_ms_safe = c_ms / safety_slow_factor    # 安全側に遅くする
    coast_pt = _nearest_coast_point_to_home(coast_gdf, home_latlon)  # lon,lat
    coast_latlon = (coast_pt.y, coast_pt.x)

    D_km = _geodesic_km(epicenter_latlon, coast_latlon)  # 震源→自宅最寄り海岸
    t_min = (D_km * 1000.0) / c_ms_safe / 60.0
    return float(t_min)

def update_route_latest_with_arrival(
    GPKG: str,
    arrival_time_min: float,
    height_class: str = "unknown",
    confidence: str = "low",
):
    """
    route_latest に arrival_time_min / height_class / confidence を追記して上書き保存。
    route_latest が未作成の場合は何もしない(例外を出さず return)。
    """
    from fiona import listlayers
    if "route_latest" not in listlayers(GPKG):
        print("route_latest レイヤが未作成のため、到達時間の追記はスキップしました。")
        return
    rl = gpd.read_file(GPKG, layer="route_latest")
    # 既存列がなければ追加
    for col in ["arrival_time_min", "height_class", "confidence"]:
        if col not in rl.columns:
            rl[col] = None
    rl.loc[:, "arrival_time_min"] = float(arrival_time_min)
    rl.loc[:, "height_class"] = str(height_class)
    rl.loc[:, "confidence"] = str(confidence)
    rl.to_file(
        GPKG, layer="route_latest",
        driver="GPKG", engine="pyogrio",
        layer_options={"OVERWRITE": "YES"}
    )
    print(f"✓ route_latest を更新: 到達 {arrival_time_min:.1f} 分 / 高さ {height_class} / 信頼度 {confidence}")

def append_sim_run(
    GPKG: str,
    epicenter_latlon: tuple[float,float],
    home_latlon: tuple[float,float],
    arrival_time_min: float,
    h_rep_m: float,
    safety_slow_factor: float,
    height_class: str,
    confidence: str,
    source: str = "v0_deep_sea_approx"
):
    """
    sim_runs に今回の推定メタを 1 行追記。
    既存スキーマ差があればマージ→上書きで吸収。
    """
    from fiona import listlayers
    rec = gpd.GeoDataFrame({
        "run_id": [datetime.utcnow().isoformat(timespec="seconds")+"Z"],
        "epicenter_lat": [epicenter_latlon[0]],
        "epicenter_lon": [epicenter_latlon[1]],
        "home_lat": [home_latlon[0]],
        "home_lon": [home_latlon[1]],
        "arrival_time_min": [float(arrival_time_min)],
        "h_rep_m": [float(h_rep_m)],
        "safety_slow_factor": [float(safety_slow_factor)],
        "height_class": [str(height_class)],
        "confidence": [str(confidence)],
        "source": ,
    }, geometry=[Point(epicenter_latlon[1], epicenter_latlon[0])], crs="EPSG:4326")

    try:
        rec.to_file(
            GPKG, layer="sim_runs",
            driver="GPKG", engine="pyogrio",
            append=True
        )
    except Exception:
        # スキーマ差などのフォールバック(既存行を読み出してマージ→上書き)
        if "sim_runs" in listlayers(GPKG):
            old = gpd.read_file(GPKG, layer="sim_runs")
            cols = sorted(set(old.columns) | set(rec.columns))
            merged = gpd.GeoDataFrame(pd.concat([old.reindex(columns=cols),
                                                 rec.reindex(columns=cols)], ignore_index=True), crs=4326)
            merged.to_file(
                GPKG, layer="sim_runs",
                driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"}
            )
        else:
            rec.to_file(GPKG, layer="sim_runs", driver="GPKG", engine="pyogrio")
    print("✓ sim_runs に追記しました。")

次にテスト

# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5)  # (lat, lon)

# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
    epicenter_latlon=epicenter,
    coast_gdf=coast,            # 既に読み込み済みの海岸線(EPSG:4326推奨)
    home_latlon=home_wgs,
    h_rep_m=3000.0,             # 代表水深:3,000m(必要に応じ調整)
    safety_slow_factor=1.2      # 速度を 1/1.2 に落として安全側
)

# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"

# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
    GPKG=GPKG,
    arrival_time_min=arr_min,
    height_class=height_class,
    confidence=confidence
)

# sim_runs にログ追記
append_sim_run(
    GPKG=GPKG,
    epicenter_latlon=epicenter,
    home_latlon=home_wgs,
    arrival_time_min=arr_min,
    h_rep_m=3000.0,
    safety_slow_factor=1.2,
    height_class=height_class,
    confidence=confidence
)

を実行すると



✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low
✓ sim_runs に追記しました。
/var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/3983269974.py:27: UserWarning: Geometry is in a geographic CRS. Results from 'distance' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.

  coast_wgs["__d__"] = coast_wgs.distance(home_pt)

となる

原因は
警告は「経度緯度(EPSG:4326)のまま .distance() を使っている」ため
距離計算は投影座標系(m単位)で行うようにすれば解決するらしい

最近点の計算を UTM で実施 → WGS84 に戻して返すようにする

def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
    """
    自宅に最も近い海岸線上の点を返す(WGS84)
    - coast_gdf: 4326 想定(LineString/MultiLineString)
    - home_latlon: (lat,lon)
    """
    if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt = Point(home_latlon[1], home_latlon[0])  # (lon,lat)
    # 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
    coast_wgs["__d__"] = coast_wgs.distance(home_pt)
    geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
    n1, n2 = nearest_points(geom, home_pt)
    return n1  # 海岸線側の最近点(Point lon,lat)

の部分を

def _nearest_coast_point_to_home(
    coast_gdf: gpd.GeoDataFrame,
    home_latlon: tuple[float, float],
    metric_crs: str = "EPSG:32654"  # 静岡周辺 UTM (m)
) -> Point:
    """
    自宅に最も近い海岸線上の点(WGS84)を返す。
    距離計算は投影座標系で実施して精度を担保。
    """
    # 入力を WGS84 とみなして正規化
    if coast_gdf.crs is None:
        coast_wgs = coast_gdf.set_crs(4326)
    elif coast_gdf.crs.to_epsg() != 4326:
        coast_wgs = coast_gdf.to_crs(4326)
    else:
        coast_wgs = coast_gdf

    home_pt_wgs = Point(home_latlon[1], home_latlon[0])  # (lon, lat)

    # 距離計算は m 単位の投影座標で
    coast_m = coast_wgs.to_crs(metric_crs).copy()
    home_pt_m = gpd.GeoSeries([home_pt_wgs], crs=4326).to_crs(metric_crs).iloc[0]

    # まず最も近いジオメトリを特定(sindex があれば速い)
    try:
        # GeoPandas ≥0.12 なら sindex.nearest が使える環境も
        idx = next(coast_m.sindex.nearest(home_pt_m.bounds))[0]
    except Exception:
        # フォールバック:距離最小
        coast_m["__d__"] = coast_m.geometry.distance(home_pt_m)
        idx = coast_m["__d__"].idxmin()

    geom_m = coast_m.loc[idx, "geometry"]
    p_on_coast_m, _ = nearest_points(geom_m, home_pt_m)  # 投影座標で最近点

    # WGS84 に戻して返す
    p_on_coast_wgs = gpd.GeoSeries([p_on_coast_m], crs=metric_crs).to_crs(4326).iloc[0]
    return p_on_coast_wgs

へ変更

これで

# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5)  # (lat, lon)

# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
    epicenter_latlon=epicenter,
    coast_gdf=coast,            # 既に読み込み済みの海岸線(EPSG:4326推奨)
    home_latlon=home_wgs,
    h_rep_m=3000.0,             # 代表水深:3,000m(必要に応じ調整)
    safety_slow_factor=1.2      # 速度を 1/1.2 に落として安全側
)

# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"

# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
    GPKG=GPKG,
    arrival_time_min=arr_min,
    height_class=height_class,
    confidence=confidence
)

# sim_runs にログ追記
append_sim_run(
    GPKG=GPKG,
    epicenter_latlon=epicenter,
    home_latlon=home_wgs,
    arrival_time_min=arr_min,
    h_rep_m=3000.0,
    safety_slow_factor=1.2,
    height_class=height_class,
    confidence=confidence
)

を再度実行すると

✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low
✓ sim_runs に追記しました。

となる

ここまではできたので
推定到達時間に連動して等時間到達圏(Y分)を自動生成 → 安全エリアを重ねて表示

# === 推定到達時間に連動した等時間到達圏の自動生成 ===
import geopandas as gpd

# 1) route_latest から arrival_time_min を取得(安全側 70% & 上限20分)
rl = gpd.read_file(GPKG, layer="route_latest")
arr = float(rl["arrival_time_min"].iloc[0]) if "arrival_time_min" in rl else 15.0
Y = max(5, min(20, int(arr * 0.7)))   # 5〜20分にクリップ

# 2) 出発ノード & 重み(既存関数)
orig = nearest_node_from_wgs(G2, lat=home_wgs[0], lon=home_wgs[1])
add_edge_time_weight(G2, speed_kmh=4.8)

# 3) 到達圏→非浸水差し引き
iso_gdf, reach_edges_4326 = isochrone_polygon(
    G2, origin_node=orig, cutoff_min=Y,
    proj_crs=METRIC, buffer_m=35.0, weight_attr="time_min"
)
safe_area_now = subtract_flood(iso_gdf, flood_union, proj_crs=METRIC)

# 4) 上書き保存(pyogrio + OVERWRITE)
iso_layer = f"isochrone_walk_{Y}min"
for gdf, layer in [(iso_gdf, iso_layer), (safe_area_now, "safe_area_now")]:
    gdf.to_file(GPKG, layer=layer, driver="GPKG", engine="pyogrio",
                layer_options={"OVERWRITE": "YES"})

# 5) Folium へ重ねて保存
folium.GeoJson(iso_gdf.__geo_interface__, name=iso_layer,
               style_function=lambda x: {"fillOpacity":0.20, "color":"black", "weight":1}).add_to(m)
folium.GeoJson(safe_area_now.__geo_interface__, name="安全エリア(到達圏∩非浸水)",
               style_function=lambda x: {"fillOpacity":0.30, "weight":0}).add_to(m)
m.save("outputs/evac.html")
print(f"✓ {iso_layer} / safe_area_now を更新し、outputs/evac.html を保存しました。")

✓ isochrone_walk_15min / safe_area_now を更新し、outputs/evac.html を保存しました。

となる

震源→到達時間v0→(Y分)等時間到達圏→非浸水エリア→地図」が自動でひとまとめになるので
次は
EEW/揺れ検知トリガーからこの一連を叩く CLI に繋げる
そのためにまずは
EEW/揺れ検知トリガーの作成をする

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です