危機監視スコアを2層化し、信用危機と地政学ショックを分けて見る
前回までに、FREDやStooqから取得した市場データを使い、危機監視用の特徴量を research.sqlite に保存するところまで進めました。
今回は、その特徴量を使って作成していた crisis_score を改良します。
これまでのスコアは、VIX、社債スプレッド、金融環境指数、イールドカーブなどを使って、主に信用不安や金融環境の悪化を検出する設計でした。
ただし、直近の市場の緊張状態を見る場合、信用不安だけでは説明しきれない局面があります。たとえば、原油価格や金価格が動くような地政学リスク、資源ショック、為替の急変などです。
そこで今回は、危機監視スコアを次の2層構造に変更しました。
core_score:信用・ボラティリティ・金融条件を見るスコアgeo_score:WTI、金、ドル円などから地政学・資源ショックを見るスコアscore:core_score と geo_score を合成した最終スコア
この形にしておくと、単に「危機っぽい」だけではなく、後から「信用危機なのか」「地政学ショックなのか」を分けて確認しやすくなります。
今回やること
今回の作業は、いきなり既存の crisis_score を上書きするのではなく、旧版を退避してから新版を作る流れにしました。
- 既存の
crisis_scoreをcrisis_score_oldとして保存する build_crisis_score.pyを2層化対応版に変更する- 新しい
crisis_scoreを作成する check_crisis_events.pyで旧版と新版を比較する
旧版を残しておくことで、新しいスコアが過去の危機でどう変わったかを比較できます。
既存のcrisis_scoreを退避する
まず、現在の crisis_score を crisis_score_old として保存します。
sqlite3 research.sqlite
SQLiteに入ったら、次のSQLを実行します。
DROP TABLE IF EXISTS crisis_score_old; CREATE TABLE crisis_score_old AS SELECT * FROM crisis_score;
保存できたか確認します。
SELECT COUNT(*) FROM crisis_score_old;
結果は次のようになりました。
10664
期間も確認します。
SELECT MIN(date), MAX(date) FROM crisis_score_old;
結果は次の通りです。
1997-01-01|2026-03-13
これで、旧版の危機スコアを退避できました。
スコアを2層構造にする
次に、build_crisis_score.py を2層化します。
今回の考え方は、次の通りです。
core_scoreは、信用危機や金融環境の悪化を検出するgeo_scoreは、WTI、金、ドル円を使って地政学・資源ショックを検出するscoreは、core_scoreを主、geo_scoreを補助として合成する
今回の合成比率は、いったん次のようにしました。
core_score:80%geo_score:20%
信用危機を検出する既存スコアの性格を残しつつ、WTIや金価格の変化も反映できるようにするためです。
build_crisis_score.py
以下が、今回作成した2層化対応版の build_crisis_score.py です。
import sqlite3
import pandas as pd
import numpy as np
RESEARCH_DB = "research.sqlite"
SCORE_START_DATE = "1997-01-01"
RESET_CRISIS_SCORE = True
# -----------------------------
# 1) コア危機スコア
# -----------------------------
CORE_FEATURES = {
"vix_z20": 1.2,
"hy_oas_z20": 1.4,
"bbb_oas_z20": 1.0,
"hy_minus_bbb_z20": 1.2,
"nfci_z20": 1.0,
"anfci_z20": 1.0,
"yc_10y_2y_z20": 0.8,
"yc_10y_3m_z20": 0.8,
}
CORE_DIRECTION = {
"vix_z20": 1.0,
"hy_oas_z20": 1.0,
"bbb_oas_z20": 1.0,
"hy_minus_bbb_z20": 1.0,
"nfci_z20": 1.0,
"anfci_z20": 1.0,
"yc_10y_2y_z20": -1.0,
"yc_10y_3m_z20": -1.0,
}
# -----------------------------
# 2) 地政学・資源ショックスコア
# -----------------------------
GEO_FEATURES = {
"wti_z20": 1.0,
"gold_z20": 0.9,
"usdjpy_z20": 0.7,
"wti_chg_5d": 0.8,
"gold_chg_5d": 0.6,
"usdjpy_chg_5d": 0.5,
}
GEO_DIRECTION = {
"wti_z20": 1.0,
"gold_z20": 1.0,
"usdjpy_z20": -1.0,
"wti_chg_5d": 1.0,
"gold_chg_5d": 1.0,
"usdjpy_chg_5d": -1.0,
}
FINAL_CORE_WEIGHT = 0.8
FINAL_GEO_WEIGHT = 0.2
def load_features_wide(db_path: str, feature_names: list[str]) -> pd.DataFrame:
placeholders = ",".join(["?"] * len(feature_names))
sql = f"""
SELECT date, feature_name, value
FROM crisis_features
WHERE feature_name IN ({placeholders})
ORDER BY date
"""
with sqlite3.connect(db_path) as conn:
df = pd.read_sql_query(sql, conn, params=feature_names)
if df.empty:
raise RuntimeError("crisis_features から対象特徴量を取得できませんでした")
df["date"] = pd.to_datetime(df["date"])
df["value"] = pd.to_numeric(df["value"], errors="coerce")
wide = df.pivot(index="date", columns="feature_name", values="value").sort_index()
wide.columns.name = None
return wide
def normalize_to_unit_interval(
s: pd.Series,
clip_min: float = -3.0,
clip_max: float = 3.0
) -> pd.Series:
x = s.clip(lower=clip_min, upper=clip_max)
return (x - clip_min) / (clip_max - clip_min)
def compute_weighted_block_score(
df: pd.DataFrame,
features: dict[str, float],
direction: dict[str, float],
score_name: str
) -> pd.DataFrame:
work = df.copy()
adj_cols = []
for col in features:
if col not in work.columns:
work[col] = np.nan
adj_col = f"{col}_adj"
work[adj_col] = work[col] * direction[col]
adj_cols.append(adj_col)
valid_mask = work[adj_cols].notna().all(axis=1)
norm_df = pd.DataFrame(index=work.index)
for col in adj_cols:
norm_df[col] = normalize_to_unit_interval(work[col])
weights = np.array(
[features[col.replace("_adj", "")] for col in adj_cols],
dtype=float
)
weight_sum = weights.sum()
score_raw = norm_df.mul(weights, axis=1).sum(axis=1) / weight_sum
score_100 = score_raw * 100.0
out = pd.DataFrame(index=work.index)
out[score_name] = np.where(valid_mask, score_100, np.nan)
out[f"{score_name}_valid_feature_count"] = work[adj_cols].notna().sum(axis=1)
out[f"{score_name}_required_feature_count"] = len(adj_cols)
return out
def compute_score(wide: pd.DataFrame) -> pd.DataFrame:
df = wide.copy()
df = df[df.index >= SCORE_START_DATE]
if df.empty:
raise RuntimeError(f"{SCORE_START_DATE} 以降のデータがありません")
core_df = compute_weighted_block_score(
df,
CORE_FEATURES,
CORE_DIRECTION,
"core_score"
)
geo_df = compute_weighted_block_score(
df,
GEO_FEATURES,
GEO_DIRECTION,
"geo_score"
)
out = df.copy()
out = out.join(core_df)
out = out.join(geo_df)
both_valid = out["core_score"].notna() & out["geo_score"].notna()
out["score"] = np.where(
both_valid,
out["core_score"] * FINAL_CORE_WEIGHT + out["geo_score"] * FINAL_GEO_WEIGHT,
np.nan
)
out["regime"] = np.select(
[
out["score"] >= 85,
out["score"] >= 65,
out["score"] >= 45,
],
[
"risk_off",
"warning",
"caution",
],
default="normal"
)
out.loc[out["score"].isna(), "regime"] = None
out["valid_feature_count"] = np.where(
both_valid,
out["core_score_valid_feature_count"] + out["geo_score_valid_feature_count"],
np.nan
)
out["required_feature_count"] = (
out["core_score_required_feature_count"]
+ out["geo_score_required_feature_count"]
)
keep_cols = (
list(CORE_FEATURES.keys())
+ list(GEO_FEATURES.keys())
+ [
"core_score",
"geo_score",
"score",
"regime",
"core_score_valid_feature_count",
"core_score_required_feature_count",
"geo_score_valid_feature_count",
"geo_score_required_feature_count",
"valid_feature_count",
"required_feature_count",
]
)
out = out[keep_cols].copy()
out.index.name = "date"
return out
def init_research_db(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS crisis_score (
date TEXT PRIMARY KEY,
core_score REAL,
geo_score REAL,
score REAL,
regime TEXT,
core_score_valid_feature_count INTEGER,
core_score_required_feature_count INTEGER,
geo_score_valid_feature_count INTEGER,
geo_score_required_feature_count INTEGER,
valid_feature_count INTEGER,
required_feature_count INTEGER
);
""")
conn.commit()
def reset_crisis_score_table(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("DROP TABLE IF EXISTS crisis_score;")
conn.commit()
def save_score(db_path: str, score_df: pd.DataFrame) -> int:
out = score_df.reset_index()[[
"date",
"core_score",
"geo_score",
"score",
"regime",
"core_score_valid_feature_count",
"core_score_required_feature_count",
"geo_score_valid_feature_count",
"geo_score_required_feature_count",
"valid_feature_count",
"required_feature_count",
]].copy()
out["date"] = pd.to_datetime(out["date"]).dt.strftime("%Y-%m-%d")
out = out.replace({np.nan: None})
rows = list(out.itertuples(index=False, name=None))
with sqlite3.connect(db_path) as conn:
cur = conn.executemany("""
INSERT OR REPLACE INTO crisis_score (
date,
core_score,
geo_score,
score,
regime,
core_score_valid_feature_count,
core_score_required_feature_count,
geo_score_valid_feature_count,
geo_score_required_feature_count,
valid_feature_count,
required_feature_count
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", rows)
conn.commit()
return cur.rowcount
def main():
feature_names = list(CORE_FEATURES.keys()) + list(GEO_FEATURES.keys())
print("Loading crisis features from research.sqlite ...")
wide = load_features_wide(RESEARCH_DB, feature_names)
print(f"Loaded columns: {list(wide.columns)}")
print(f"Raw shape: {wide.shape}")
print(f"Raw date range: {wide.index.min().date()} -> {wide.index.max().date()}")
print("Computing layered crisis score ...")
score_df = compute_score(wide)
print(f"Score start date: {SCORE_START_DATE}")
print(f"Score shape: {score_df.shape}")
print(f"Score date range: {score_df.index.min().date()} -> {score_df.index.max().date()}")
print("Latest rows:")
print(score_df.tail(10).to_string())
print("Preparing research.sqlite ...")
if RESET_CRISIS_SCORE:
print("Dropping existing crisis_score ...")
reset_crisis_score_table(RESEARCH_DB)
init_research_db(RESEARCH_DB)
print("Saving to research.sqlite ...")
n = save_score(RESEARCH_DB, score_df)
print(f"Upserted {n} rows into crisis_score")
if __name__ == "__main__":
main()
実行する
スクリプトを実行します。
python build_crisis_score.py
実行結果は次のようになりました。
Loading crisis features from research.sqlite ... Loaded columns: ['anfci_z20', 'bbb_oas_z20', 'gold_chg_5d', 'gold_z20', 'hy_minus_bbb_z20', 'hy_oas_z20', 'nfci_z20', 'usdjpy_chg_5d', 'usdjpy_z20', 'vix_z20', 'wti_chg_5d', 'wti_z20', 'yc_10y_2y_z20', 'yc_10y_3m_z20'] Raw shape: (13221, 14) Raw date range: 1990-01-01 -> 2026-03-13 Computing layered crisis score ... Score start date: 1997-01-01 Score shape: (10664, 24) Score date range: 1997-01-01 -> 2026-03-13 Preparing research.sqlite ... Dropping existing crisis_score ... Saving to research.sqlite ... Upserted 10664 rows into crisis_score
1997年1月1日から2026年3月13日までのスコアを作成できました。
ハマったところ:旧テーブルの列定義が残っていた
最初に実行したとき、次のエラーが出ました。
sqlite3.OperationalError: table crisis_score has no column named core_score
原因は、既存の crisis_score テーブルが旧版の列定義のままだったことです。
新版のスクリプトでは、core_score や geo_score を保存しようとします。しかし、旧版の crisis_score にはそれらの列が存在しません。
また、元のコードではテーブル作成部分が次のようになっていました。
CREATE TABLE IF NOT EXISTS crisis_score ...
IF NOT EXISTS は、すでにテーブルがある場合は何もしません。そのため、列定義は旧版のまま残ってしまいます。
今回のようにテーブル構造を変更する場合は、単に DELETE するのではなく、DROP TABLE してから作り直す方が安全です。
そのため、リセット処理は次のように変更しました。
def reset_crisis_score_table(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("DROP TABLE IF EXISTS crisis_score;")
conn.commit()
そして、main() 側では、初期化前に既存テーブルを削除するようにしました。
print("Preparing research.sqlite ...")
if RESET_CRISIS_SCORE:
print("Dropping existing crisis_score ...")
reset_crisis_score_table(RESEARCH_DB)
init_research_db(RESEARCH_DB)
これで、今後テーブル構造を変更しても、古い列定義が残る問題を避けられます。
テーブル構造を確認する
実行後に、crisis_score のスキーマを確認します。
sqlite3 research.sqlite
.schema crisis_score
結果は次の通りです。
CREATE TABLE crisis_score (
date TEXT PRIMARY KEY,
core_score REAL,
geo_score REAL,
score REAL,
regime TEXT,
core_score_valid_feature_count INTEGER,
core_score_required_feature_count INTEGER,
geo_score_valid_feature_count INTEGER,
geo_score_required_feature_count INTEGER,
valid_feature_count INTEGER,
required_feature_count INTEGER
);
core_score と geo_score が追加されているので、2層化されたスコアを保存できています。
旧版と新版の件数を比較する
次に、旧版と新版の件数、期間、NULL件数を確認します。
SELECT MIN(date) AS min_date, MAX(date) AS max_date, COUNT(*) AS total_rows, COUNT(score) AS non_null_score_rows FROM crisis_score;
新版の結果です。
1997-01-01|2026-03-13|10664|10641
旧版も確認します。
SELECT MIN(date) AS min_date, MAX(date) AS max_date, COUNT(*) AS total_rows, COUNT(score) AS non_null_score_rows FROM crisis_score_old;
旧版の結果です。
1997-01-01|2026-03-13|10664|10641
旧版と新版で、期間と件数は一致しています。
これにより、スコア計算対象の期間がずれたり、保存件数が大きく減ったりしていないことを確認できました。
regimeの分布を確認する
次に、normal、caution、warning、risk_off の分布を確認します。
SELECT regime, COUNT(*) FROM crisis_score GROUP BY regime ORDER BY regime;
新版の分布は次の通りです。
|23 caution|5276 normal|4155 risk_off|9 warning|1201
旧版も確認します。
SELECT regime, COUNT(*) FROM crisis_score_old GROUP BY regime ORDER BY regime;
旧版の分布です。
|23 caution|5819 normal|2159 risk_off|172 warning|2491
ここで重要なのは、新版では risk_off と warning がかなり減っていることです。
つまり、geo_scoreを加えたことでスコアの構造は分かりやすくなった一方、最終スコアとしては危機判定が弱くなっている可能性があります。
直近の旧版・新版を比較する
直近の日付について、旧版スコア、新版スコア、core_score、geo_scoreを比較します。
SELECT n.date, o.score AS old_score, o.regime AS old_regime, n.core_score, n.geo_score, n.score AS new_score, n.regime AS new_regime FROM crisis_score n LEFT JOIN crisis_score_old o ON n.date = o.date ORDER BY n.date DESC LIMIT 30;
一部を抜粋すると、次のようになりました。
2026-03-13|66.160164|warning|66.160164|48.354333|62.598998|caution 2026-03-12|68.063244|warning|68.063244|50.739479|64.598491|caution 2026-03-11|59.548978|caution|59.548978|69.445862|61.528354|caution 2026-03-10|58.952546|caution|58.952546|70.641114|61.290259|caution 2026-03-09|70.417150|warning|70.417150|54.998877|67.333496|warning 2026-03-08|67.745484|warning|67.745484|70.642920|68.324971|warning 2026-03-07|70.396756|warning|70.396756|58.554676|68.028340|warning 2026-03-06|73.181938|warning|73.181938|57.725218|70.090594|warning
この比較で分かるのは、core_score が旧版スコアに近く、geo_score を混ぜた new_score はやや低めに出る日がある、ということです。
直近のように地政学・資源要因も見たい場合、geo_score を別で見られるようになった点は大きな改善です。
一方で、最終的な score に混ぜる比率をどうするかは、まだ調整が必要です。
差分が大きい日を確認する
旧版と新版の差が大きい日を確認します。
SELECT n.date, o.score AS old_score, n.score AS new_score, (n.score - o.score) AS score_diff, o.regime AS old_regime, n.regime AS new_regime, n.core_score, n.geo_score FROM crisis_score n JOIN crisis_score_old o ON n.date = o.date WHERE n.score IS NOT NULL AND o.score IS NOT NULL ORDER BY ABS(n.score - o.score) DESC LIMIT 30;
結果を見ると、geo_scoreが高いために新版スコアが上がる日もあれば、逆にcore_scoreが高いのにgeo_scoreが低いため、最終スコアが下がる日もありました。
これは、2層化によってスコアの中身が見えるようになった一方で、単純な加重平均では信用危機の強いシグナルを薄めてしまう可能性がある、ということです。
過去危機イベントで旧版と新版を比較する
次に、過去の主要危機イベントで旧版と新版のスコアがどう動いたかを確認するスクリプトを作成します。
今回作成する check_crisis_events.py では、次の内容を確認します。
- イベント当日の旧版スコア
- イベント当日の新版スコア
- 旧版と新版の差分
- 旧版と新版のregime差分
- イベント前90日以内に warning / risk_off が出ていたか
- イベント前後30日のスコア推移
check_crisis_events.py
import sqlite3
import pandas as pd
RESEARCH_DB = "research.sqlite"
LOOKBACK_DAYS = 90
WINDOW_BEFORE = 30
WINDOW_AFTER = 30
OUTPUT_CSV = "crisis_event_compare.csv"
CRISIS_EVENTS = [
("LTCM危機", "1998-09-23"),
("ITバブル崩壊", "2000-04-14"),
("9.11後市場混乱", "2001-09-17"),
("パリバ・ショック", "2007-08-09"),
("リーマン破綻", "2008-09-15"),
("ギリシャ・欧州債務危機", "2010-05-06"),
("米国格下げショック", "2011-08-05"),
("チャイナショック", "2015-08-24"),
("人民元ショック余波", "2016-01-20"),
("VIXショック", "2018-02-05"),
("2018年末リスクオフ", "2018-12-24"),
("コロナショック初動", "2020-02-24"),
("コロナショック本格化", "2020-03-12"),
("英国トラス・ショック", "2022-09-26"),
("SVB破綻", "2023-03-10"),
]
def load_score_table(
table_name: str,
score_col_name: str,
regime_col_name: str
) -> pd.DataFrame:
sql = f"""
SELECT date, score, regime
FROM {table_name}
ORDER BY date
"""
with sqlite3.connect(RESEARCH_DB) as conn:
df = pd.read_sql_query(sql, conn)
if df.empty:
raise RuntimeError(f"{table_name} テーブルが空です")
df["date"] = pd.to_datetime(df["date"])
df["score"] = pd.to_numeric(df["score"], errors="coerce")
df = df.rename(columns={
"score": score_col_name,
"regime": regime_col_name,
})
return df
def find_first_regime(
df: pd.DataFrame,
event_date: pd.Timestamp,
lookback_days: int,
regime_col: str,
regimes: list[str]
):
start_date = event_date - pd.Timedelta(days=lookback_days)
sub = df[
(df["date"] >= start_date)
& (df["date"] <= event_date)
].copy()
sub = sub[sub[regime_col].isin(regimes)].sort_values("date")
if sub.empty:
return None
return sub.iloc[0]["date"]
def analyze_event(
df: pd.DataFrame,
event_name: str,
event_date_str: str
) -> dict:
event_date = pd.Timestamp(event_date_str)
window_start = event_date - pd.Timedelta(days=WINDOW_BEFORE)
window_end = event_date + pd.Timedelta(days=WINDOW_AFTER)
window_df = df[
(df["date"] >= window_start)
& (df["date"] <= window_end)
].copy()
event_day_row = df[df["date"] == event_date]
old_score = event_day_row.iloc[0]["old_score"] if not event_day_row.empty else None
old_regime = event_day_row.iloc[0]["old_regime"] if not event_day_row.empty else None
new_score = event_day_row.iloc[0]["new_score"] if not event_day_row.empty else None
new_regime = event_day_row.iloc[0]["new_regime"] if not event_day_row.empty else None
first_warning_old = find_first_regime(
df,
event_date,
LOOKBACK_DAYS,
"old_regime",
["warning", "risk_off"]
)
first_warning_new = find_first_regime(
df,
event_date,
LOOKBACK_DAYS,
"new_regime",
["warning", "risk_off"]
)
first_risk_off_old = find_first_regime(
df,
event_date,
LOOKBACK_DAYS,
"old_regime",
["risk_off"]
)
first_risk_off_new = find_first_regime(
df,
event_date,
LOOKBACK_DAYS,
"new_regime",
["risk_off"]
)
return {
"event_name": event_name,
"event_date": event_date.strftime("%Y-%m-%d"),
"old_score": None if pd.isna(old_score) else float(old_score),
"old_regime": old_regime,
"new_score": None if pd.isna(new_score) else float(new_score),
"new_regime": new_regime,
"score_diff_new_minus_old": None
if (pd.isna(old_score) or pd.isna(new_score))
else float(new_score - old_score),
"first_warning_old": None
if first_warning_old is None
else first_warning_old.strftime("%Y-%m-%d"),
"days_before_warning_old": None
if first_warning_old is None
else (event_date - first_warning_old).days,
"first_warning_new": None
if first_warning_new is None
else first_warning_new.strftime("%Y-%m-%d"),
"days_before_warning_new": None
if first_warning_new is None
else (event_date - first_warning_new).days,
"first_risk_off_old": None
if first_risk_off_old is None
else first_risk_off_old.strftime("%Y-%m-%d"),
"days_before_risk_off_old": None
if first_risk_off_old is None
else (event_date - first_risk_off_old).days,
"first_risk_off_new": None
if first_risk_off_new is None
else first_risk_off_new.strftime("%Y-%m-%d"),
"days_before_risk_off_new": None
if first_risk_off_new is None
else (event_date - first_risk_off_new).days,
"window_old_max_score": None
if window_df["old_score"].dropna().empty
else float(window_df["old_score"].max()),
"window_new_max_score": None
if window_df["new_score"].dropna().empty
else float(window_df["new_score"].max()),
}
def print_event_window(
df: pd.DataFrame,
event_name: str,
event_date_str: str
):
event_date = pd.Timestamp(event_date_str)
window_start = event_date - pd.Timedelta(days=WINDOW_BEFORE)
window_end = event_date + pd.Timedelta(days=WINDOW_AFTER)
window_df = df[
(df["date"] >= window_start)
& (df["date"] <= window_end)
].copy()
window_df["date"] = window_df["date"].dt.strftime("%Y-%m-%d")
print("\n" + "=" * 100)
print(f"{event_name} ({event_date_str})")
print("=" * 100)
print(
window_df[[
"date",
"old_score",
"old_regime",
"new_score",
"new_regime",
]].to_string(index=False)
)
def main():
old_df = load_score_table(
"crisis_score_old",
"old_score",
"old_regime"
)
new_df = load_score_table(
"crisis_score",
"new_score",
"new_regime"
)
df = pd.merge(old_df, new_df, on="date", how="outer").sort_values("date")
summary_rows = []
for event_name, event_date in CRISIS_EVENTS:
summary_rows.append(analyze_event(df, event_name, event_date))
summary_df = pd.DataFrame(summary_rows)
print("\n===== compare summary =====")
print(summary_df.to_string(index=False))
summary_df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
print(f"\nSaved summary CSV: {OUTPUT_CSV}")
for event_name, event_date in CRISIS_EVENTS:
print_event_window(df, event_name, event_date)
if __name__ == "__main__":
main()
比較スクリプトを実行する
次のコマンドで実行します。
python check_crisis_events.py
実行すると、主要危機イベントごとの旧版・新版比較が表示され、さらに crisis_event_compare.csv に保存されます。
これにより、イベントごとに次のような比較ができます。
- 旧版では
risk_offだったが、新版ではwarningに下がった - 旧版では
warningだったが、新版ではcautionに下がった - geo_scoreが高いことで、新版スコアが旧版より上がった
- 信用危機ではcore_scoreが高く、地政学ショックではgeo_scoreが高くなる
SVB破綻時の比較例
たとえば、SVB破綻前後では次のような動きになりました。
SVB破綻 (2023-03-10) date old_score old_regime new_score new_regime 2023-03-08 55.741222 caution 49.305081 caution 2023-03-09 68.424409 warning 60.529238 caution 2023-03-10 86.176509 risk_off 80.549474 warning 2023-03-11 81.277774 risk_off 76.451197 warning 2023-03-12 77.318386 warning 74.826107 warning 2023-03-13 79.335763 warning 77.087318 warning
旧版では、2023年3月10日に risk_off まで上がっています。
一方、新版では warning にとどまっています。
これは、geo_scoreを合成したことで、core_scoreの強い危機シグナルがやや薄まったためです。
今回の結果から分かったこと
今回の2層化によって、スコアの中身はかなり見やすくなりました。
特に、次のような切り分けができるようになったのは大きな改善です。
- 信用スプレッドやVIXが主導する危機なのか
- WTIや金価格が主導する地政学・資源ショックなのか
- ドル円の動きがリスクオフ方向に寄与しているのか
一方で、最終スコアの合成方法には課題も見つかりました。
旧版と新版を比較すると、多くの主要危機で新版のスコアが下がっています。
特に、次のような変化が見られました。
risk_offがwarningに下がるwarningがcautionに下がる- 過去の信用危機で、警戒レベルがやや弱く出る
これは、core_score に強い危機シグナルが出ているにもかかわらず、geo_score が低いことで最終スコアが薄まっているためです。
現時点での評価
現時点では、旧版と新版は次のような性格になっています。
旧版
- 信用危機には強い
- VIXや社債スプレッドの悪化を素直に反映しやすい
- やや過敏に
risk_offが出る可能性がある
新版
- 信用危機と地政学ショックを分けて見られる
- core_score と geo_score の内訳が分かる
- ただし、単純な加重平均では信用危機のシグナルが薄まる場合がある
つまり、2層化そのものは成功ですが、最終スコアの作り方はまだ調整が必要です。
次にやること
次は、build_crisis_score.py をさらに調整します。
今回の結果を見ると、単純に core_score 80%、geo_score 20% で合成するだけでは、信用危機の検出力が弱くなりすぎる場面があります。
そのため、次は次のような「core優先・geo補助」型にする予定です。
- core_score が高いときは、geo_score が低くても危機判定を弱めすぎない
- geo_score が高いときは、地政学・資源ショックとして別に警戒できるようにする
- final score とは別に、ショックの種類を判定する列を追加する
たとえば、次のような列を追加すると、後で分析しやすくなります。
shock_type:credit / geopolitical / mixed / normalcore_regime:core_scoreだけで見た警戒レベルgeo_regime:geo_scoreだけで見た警戒レベル
これにより、単にスコアを見るだけでなく、「どの種類のリスクが高まっているのか」まで確認できるようになります。
まとめ
今回は、危機監視スコアを2層化し、旧版と新版を比較できるようにしました。
- 既存の
crisis_scoreをcrisis_score_oldとして退避した core_scoreとgeo_scoreを持つ新版crisis_scoreを作成した- テーブル構造変更時の
no column named core_scoreエラーを修正した - 旧版と新版のregime分布を比較した
- 過去危機イベントで、旧版と新版の反応を比較するスクリプトを作成した
結果として、2層化によってスコアの中身は見やすくなりました。
ただし、今の単純な加重平均では、信用危機の強いシグナルが薄まる場面があります。
そのため、次回は core_score を主軸にしつつ、geo_score を補助的に使う形へ調整します。
最終的には、単なる危機スコアではなく、「信用危機なのか」「地政学ショックなのか」「両方が重なっているのか」を判定できる市場監視システムにしていきます。

コメント