まずは v0(当日実用版) を“そのまま挿すだけ”で動く形で用意
Epicenter(震源の lat,lon)を渡すと、深海近似の伝播速度で到達時間を安全側に推定し、
route_latest に arrival_time_min / height_class / confidence を追記して上書き保存、
同時に sim_runs に実行ログを追記
既存の変数:GPKG, coast(海岸線), home_wgs(自宅), route_latest は前工程で作成済みを想定
追加依存:pyproj(地球楕円体距離計算用)
# === 津波「到達時間・高さ」簡易推定 (v0) ===
# 目的:震源→自宅最寄り海岸線までの直線距離 / 深海近似速度で到達時間を安全側に推定
# 出力:arrival_time_min, height_class, confidence を route_latest に追記(上書き保存)
# sim_runs テーブルに実行ログを追記
from datetime import datetime
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from shapely.ops import nearest_points
from pyproj import Geod
GEOD = Geod(ellps="WGS84")
def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
"""
自宅に最も近い海岸線上の点を返す(WGS84)
- coast_gdf: 4326 想定(LineString/MultiLineString)
- home_latlon: (lat,lon)
"""
if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
coast_wgs = coast_gdf.to_crs(4326)
else:
coast_wgs = coast_gdf
home_pt = Point(home_latlon[1], home_latlon[0]) # (lon,lat)
# 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
coast_wgs["__d__"] = coast_wgs.distance(home_pt)
geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
n1, n2 = nearest_points(geom, home_pt)
return n1 # 海岸線側の最近点(Point lon,lat)
def _geodesic_km(p1_latlon: tuple[float,float], p2_latlon: tuple[float,float]) -> float:
"""
WGS84 楕円体の測地距離[km]
"""
az12, az21, m = GEOD.inv(p1_latlon[1], p1_latlon[0], p2_latlon[1], p2_latlon[0])
return m / 1000.0
def estimate_arrival_minutes_v0(
epicenter_latlon: tuple[float,float],
coast_gdf: gpd.GeoDataFrame,
home_latlon: tuple[float,float],
h_rep_m: float = 3000.0, # 代表水深[m](深海)
safety_slow_factor: float = 1.2 # 速度を遅めに見積もる倍率(到達時間は長くなる)
) -> float:
"""
深海近似の伝播速度 c ≈ √(g*h) を用い、震源→自宅最寄り海岸までの直線距離から到達時間[分]を推定(安全側)。
"""
g = 9.81
c_ms = (g * h_rep_m) ** 0.5 # m/s
c_ms_safe = c_ms / safety_slow_factor # 安全側に遅くする
coast_pt = _nearest_coast_point_to_home(coast_gdf, home_latlon) # lon,lat
coast_latlon = (coast_pt.y, coast_pt.x)
D_km = _geodesic_km(epicenter_latlon, coast_latlon) # 震源→自宅最寄り海岸
t_min = (D_km * 1000.0) / c_ms_safe / 60.0
return float(t_min)
def update_route_latest_with_arrival(
GPKG: str,
arrival_time_min: float,
height_class: str = "unknown",
confidence: str = "low",
):
"""
route_latest に arrival_time_min / height_class / confidence を追記して上書き保存。
route_latest が未作成の場合は何もしない(例外を出さず return)。
"""
from fiona import listlayers
if "route_latest" not in listlayers(GPKG):
print("route_latest レイヤが未作成のため、到達時間の追記はスキップしました。")
return
rl = gpd.read_file(GPKG, layer="route_latest")
# 既存列がなければ追加
for col in ["arrival_time_min", "height_class", "confidence"]:
if col not in rl.columns:
rl[col] = None
rl.loc[:, "arrival_time_min"] = float(arrival_time_min)
rl.loc[:, "height_class"] = str(height_class)
rl.loc[:, "confidence"] = str(confidence)
rl.to_file(
GPKG, layer="route_latest",
driver="GPKG", engine="pyogrio",
layer_options={"OVERWRITE": "YES"}
)
print(f"✓ route_latest を更新: 到達 {arrival_time_min:.1f} 分 / 高さ {height_class} / 信頼度 {confidence}")
def append_sim_run(
GPKG: str,
epicenter_latlon: tuple[float,float],
home_latlon: tuple[float,float],
arrival_time_min: float,
h_rep_m: float,
safety_slow_factor: float,
height_class: str,
confidence: str,
source: str = "v0_deep_sea_approx"
):
"""
sim_runs に今回の推定メタを 1 行追記。
既存スキーマ差があればマージ→上書きで吸収。
"""
from fiona import listlayers
rec = gpd.GeoDataFrame({
"run_id": [datetime.utcnow().isoformat(timespec="seconds")+"Z"],
"epicenter_lat": [epicenter_latlon[0]],
"epicenter_lon": [epicenter_latlon[1]],
"home_lat": [home_latlon[0]],
"home_lon": [home_latlon[1]],
"arrival_time_min": [float(arrival_time_min)],
"h_rep_m": [float(h_rep_m)],
"safety_slow_factor": [float(safety_slow_factor)],
"height_class": [str(height_class)],
"confidence": [str(confidence)],
"source": ,
}, geometry=[Point(epicenter_latlon[1], epicenter_latlon[0])], crs="EPSG:4326")
try:
rec.to_file(
GPKG, layer="sim_runs",
driver="GPKG", engine="pyogrio",
append=True
)
except Exception:
# スキーマ差などのフォールバック(既存行を読み出してマージ→上書き)
if "sim_runs" in listlayers(GPKG):
old = gpd.read_file(GPKG, layer="sim_runs")
cols = sorted(set(old.columns) | set(rec.columns))
merged = gpd.GeoDataFrame(pd.concat([old.reindex(columns=cols),
rec.reindex(columns=cols)], ignore_index=True), crs=4326)
merged.to_file(
GPKG, layer="sim_runs",
driver="GPKG", engine="pyogrio",
layer_options={"OVERWRITE": "YES"}
)
else:
rec.to_file(GPKG, layer="sim_runs", driver="GPKG", engine="pyogrio")
print("✓ sim_runs に追記しました。")
次にテスト
# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5) # (lat, lon)
# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
epicenter_latlon=epicenter,
coast_gdf=coast, # 既に読み込み済みの海岸線(EPSG:4326推奨)
home_latlon=home_wgs,
h_rep_m=3000.0, # 代表水深:3,000m(必要に応じ調整)
safety_slow_factor=1.2 # 速度を 1/1.2 に落として安全側
)
# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"
# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
GPKG=GPKG,
arrival_time_min=arr_min,
height_class=height_class,
confidence=confidence
)
# sim_runs にログ追記
append_sim_run(
GPKG=GPKG,
epicenter_latlon=epicenter,
home_latlon=home_wgs,
arrival_time_min=arr_min,
h_rep_m=3000.0,
safety_slow_factor=1.2,
height_class=height_class,
confidence=confidence
)
を実行すると
✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low ✓ sim_runs に追記しました。 /var/folders/db/tkl5w9dd3kn3h53ctyl4s6180000gn/T/ipykernel_72100/3983269974.py:27: UserWarning: Geometry is in a geographic CRS. Results from 'distance' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation. coast_wgs["__d__"] = coast_wgs.distance(home_pt)
となる
原因は
警告は「経度緯度(EPSG:4326)のまま .distance() を使っている」ため
距離計算は投影座標系(m単位)で行うようにすれば解決するらしい
最近点の計算を UTM で実施 → WGS84 に戻して返すようにする
def _nearest_coast_point_to_home(coast_gdf: gpd.GeoDataFrame, home_latlon: tuple[float,float]) -> Point:
"""
自宅に最も近い海岸線上の点を返す(WGS84)
- coast_gdf: 4326 想定(LineString/MultiLineString)
- home_latlon: (lat,lon)
"""
if coast_gdf.crs is None or coast_gdf.crs.to_epsg() != 4326:
coast_wgs = coast_gdf.to_crs(4326)
else:
coast_wgs = coast_gdf
home_pt = Point(home_latlon[1], home_latlon[0]) # (lon,lat)
# 距離が最小のジオメトリを粗く選んでから、正確な最短点を取る
coast_wgs["__d__"] = coast_wgs.distance(home_pt)
geom = coast_wgs.sort_values("__d__").geometry.iloc[0]
n1, n2 = nearest_points(geom, home_pt)
return n1 # 海岸線側の最近点(Point lon,lat)
の部分を
def _nearest_coast_point_to_home(
coast_gdf: gpd.GeoDataFrame,
home_latlon: tuple[float, float],
metric_crs: str = "EPSG:32654" # 静岡周辺 UTM (m)
) -> Point:
"""
自宅に最も近い海岸線上の点(WGS84)を返す。
距離計算は投影座標系で実施して精度を担保。
"""
# 入力を WGS84 とみなして正規化
if coast_gdf.crs is None:
coast_wgs = coast_gdf.set_crs(4326)
elif coast_gdf.crs.to_epsg() != 4326:
coast_wgs = coast_gdf.to_crs(4326)
else:
coast_wgs = coast_gdf
home_pt_wgs = Point(home_latlon[1], home_latlon[0]) # (lon, lat)
# 距離計算は m 単位の投影座標で
coast_m = coast_wgs.to_crs(metric_crs).copy()
home_pt_m = gpd.GeoSeries([home_pt_wgs], crs=4326).to_crs(metric_crs).iloc[0]
# まず最も近いジオメトリを特定(sindex があれば速い)
try:
# GeoPandas ≥0.12 なら sindex.nearest が使える環境も
idx = next(coast_m.sindex.nearest(home_pt_m.bounds))[0]
except Exception:
# フォールバック:距離最小
coast_m["__d__"] = coast_m.geometry.distance(home_pt_m)
idx = coast_m["__d__"].idxmin()
geom_m = coast_m.loc[idx, "geometry"]
p_on_coast_m, _ = nearest_points(geom_m, home_pt_m) # 投影座標で最近点
# WGS84 に戻して返す
p_on_coast_wgs = gpd.GeoSeries([p_on_coast_m], crs=metric_crs).to_crs(4326).iloc[0]
return p_on_coast_wgs
へ変更
これで
# 震源(例):熊野灘沖など仮
epicenter = (33.0, 138.5) # (lat, lon)
# 到達時間(安全側)
arr_min = estimate_arrival_minutes_v0(
epicenter_latlon=epicenter,
coast_gdf=coast, # 既に読み込み済みの海岸線(EPSG:4326推奨)
home_latlon=home_wgs,
h_rep_m=3000.0, # 代表水深:3,000m(必要に応じ調整)
safety_slow_factor=1.2 # 速度を 1/1.2 に落として安全側
)
# 高さは入手できない前提で unknown(JMA速報を後で接続)
height_class = "unknown"
confidence = "low"
# route_latest に追記(上書き保存)
update_route_latest_with_arrival(
GPKG=GPKG,
arrival_time_min=arr_min,
height_class=height_class,
confidence=confidence
)
# sim_runs にログ追記
append_sim_run(
GPKG=GPKG,
epicenter_latlon=epicenter,
home_latlon=home_wgs,
arrival_time_min=arr_min,
h_rep_m=3000.0,
safety_slow_factor=1.2,
height_class=height_class,
confidence=confidence
)
を再度実行すると
✓ route_latest を更新: 到達 22.3 分 / 高さ unknown / 信頼度 low ✓ sim_runs に追記しました。
となる
ここまではできたので
推定到達時間に連動して等時間到達圏(Y分)を自動生成 → 安全エリアを重ねて表示
# === 推定到達時間に連動した等時間到達圏の自動生成 ===
import geopandas as gpd
# 1) route_latest から arrival_time_min を取得(安全側 70% & 上限20分)
rl = gpd.read_file(GPKG, layer="route_latest")
arr = float(rl["arrival_time_min"].iloc[0]) if "arrival_time_min" in rl else 15.0
Y = max(5, min(20, int(arr * 0.7))) # 5〜20分にクリップ
# 2) 出発ノード & 重み(既存関数)
orig = nearest_node_from_wgs(G2, lat=home_wgs[0], lon=home_wgs[1])
add_edge_time_weight(G2, speed_kmh=4.8)
# 3) 到達圏→非浸水差し引き
iso_gdf, reach_edges_4326 = isochrone_polygon(
G2, origin_node=orig, cutoff_min=Y,
proj_crs=METRIC, buffer_m=35.0, weight_attr="time_min"
)
safe_area_now = subtract_flood(iso_gdf, flood_union, proj_crs=METRIC)
# 4) 上書き保存(pyogrio + OVERWRITE)
iso_layer = f"isochrone_walk_{Y}min"
for gdf, layer in [(iso_gdf, iso_layer), (safe_area_now, "safe_area_now")]:
gdf.to_file(GPKG, layer=layer, driver="GPKG", engine="pyogrio",
layer_options={"OVERWRITE": "YES"})
# 5) Folium へ重ねて保存
folium.GeoJson(iso_gdf.__geo_interface__, name=iso_layer,
style_function=lambda x: {"fillOpacity":0.20, "color":"black", "weight":1}).add_to(m)
folium.GeoJson(safe_area_now.__geo_interface__, name="安全エリア(到達圏∩非浸水)",
style_function=lambda x: {"fillOpacity":0.30, "weight":0}).add_to(m)
m.save("outputs/evac.html")
print(f"✓ {iso_layer} / safe_area_now を更新し、outputs/evac.html を保存しました。")
✓ isochrone_walk_15min / safe_area_now を更新し、outputs/evac.html を保存しました。
となる
震源→到達時間v0→(Y分)等時間到達圏→非浸水エリア→地図」が自動でひとまとめになるので
次は
EEW/揺れ検知トリガーからこの一連を叩く CLI に繋げる
そのためにまずは
EEW/揺れ検知トリガーの作成をする