危機監視スコアを2層化し、信用危機と地政学ショックを分けて見る

危機監視スコアを2層化し、信用危機と地政学ショックを分けて見る

前回までに、FREDやStooqから取得した市場データを使い、危機監視用の特徴量を research.sqlite に保存するところまで進めました。

今回は、その特徴量を使って作成していた crisis_score を改良します。

これまでのスコアは、VIX、社債スプレッド、金融環境指数、イールドカーブなどを使って、主に信用不安や金融環境の悪化を検出する設計でした。

ただし、直近の市場の緊張状態を見る場合、信用不安だけでは説明しきれない局面があります。たとえば、原油価格や金価格が動くような地政学リスク、資源ショック、為替の急変などです。

そこで今回は、危機監視スコアを次の2層構造に変更しました。

  • core_score:信用・ボラティリティ・金融条件を見るスコア
  • geo_score:WTI、金、ドル円などから地政学・資源ショックを見るスコア
  • score:core_score と geo_score を合成した最終スコア

この形にしておくと、単に「危機っぽい」だけではなく、後から「信用危機なのか」「地政学ショックなのか」を分けて確認しやすくなります。

今回やること

今回の作業は、いきなり既存の crisis_score を上書きするのではなく、旧版を退避してから新版を作る流れにしました。

  1. 既存の crisis_scorecrisis_score_old として保存する
  2. build_crisis_score.py を2層化対応版に変更する
  3. 新しい crisis_score を作成する
  4. check_crisis_events.py で旧版と新版を比較する

旧版を残しておくことで、新しいスコアが過去の危機でどう変わったかを比較できます。

既存のcrisis_scoreを退避する

まず、現在の crisis_scorecrisis_score_old として保存します。

sqlite3 research.sqlite

SQLiteに入ったら、次のSQLを実行します。

DROP TABLE IF EXISTS crisis_score_old;

CREATE TABLE crisis_score_old AS
SELECT * FROM crisis_score;

保存できたか確認します。

SELECT COUNT(*) FROM crisis_score_old;

結果は次のようになりました。

10664

期間も確認します。

SELECT MIN(date), MAX(date) FROM crisis_score_old;

結果は次の通りです。

1997-01-01|2026-03-13

これで、旧版の危機スコアを退避できました。

スコアを2層構造にする

次に、build_crisis_score.py を2層化します。

今回の考え方は、次の通りです。

  • core_score は、信用危機や金融環境の悪化を検出する
  • geo_score は、WTI、金、ドル円を使って地政学・資源ショックを検出する
  • score は、core_score を主、geo_score を補助として合成する

今回の合成比率は、いったん次のようにしました。

  • core_score:80%
  • geo_score:20%

信用危機を検出する既存スコアの性格を残しつつ、WTIや金価格の変化も反映できるようにするためです。

build_crisis_score.py

以下が、今回作成した2層化対応版の build_crisis_score.py です。

import sqlite3
import pandas as pd
import numpy as np

RESEARCH_DB = "research.sqlite"

SCORE_START_DATE = "1997-01-01"
RESET_CRISIS_SCORE = True

# -----------------------------
# 1) コア危機スコア
# -----------------------------
CORE_FEATURES = {
    "vix_z20": 1.2,
    "hy_oas_z20": 1.4,
    "bbb_oas_z20": 1.0,
    "hy_minus_bbb_z20": 1.2,
    "nfci_z20": 1.0,
    "anfci_z20": 1.0,
    "yc_10y_2y_z20": 0.8,
    "yc_10y_3m_z20": 0.8,
}

CORE_DIRECTION = {
    "vix_z20": 1.0,
    "hy_oas_z20": 1.0,
    "bbb_oas_z20": 1.0,
    "hy_minus_bbb_z20": 1.0,
    "nfci_z20": 1.0,
    "anfci_z20": 1.0,
    "yc_10y_2y_z20": -1.0,
    "yc_10y_3m_z20": -1.0,
}

# -----------------------------
# 2) 地政学・資源ショックスコア
# -----------------------------
GEO_FEATURES = {
    "wti_z20": 1.0,
    "gold_z20": 0.9,
    "usdjpy_z20": 0.7,
    "wti_chg_5d": 0.8,
    "gold_chg_5d": 0.6,
    "usdjpy_chg_5d": 0.5,
}

GEO_DIRECTION = {
    "wti_z20": 1.0,
    "gold_z20": 1.0,
    "usdjpy_z20": -1.0,
    "wti_chg_5d": 1.0,
    "gold_chg_5d": 1.0,
    "usdjpy_chg_5d": -1.0,
}

FINAL_CORE_WEIGHT = 0.8
FINAL_GEO_WEIGHT = 0.2


def load_features_wide(db_path: str, feature_names: list[str]) -> pd.DataFrame:
    placeholders = ",".join(["?"] * len(feature_names))

    sql = f"""
    SELECT date, feature_name, value
    FROM crisis_features
    WHERE feature_name IN ({placeholders})
    ORDER BY date
    """

    with sqlite3.connect(db_path) as conn:
        df = pd.read_sql_query(sql, conn, params=feature_names)

    if df.empty:
        raise RuntimeError("crisis_features から対象特徴量を取得できませんでした")

    df["date"] = pd.to_datetime(df["date"])
    df["value"] = pd.to_numeric(df["value"], errors="coerce")

    wide = df.pivot(index="date", columns="feature_name", values="value").sort_index()
    wide.columns.name = None

    return wide


def normalize_to_unit_interval(
    s: pd.Series,
    clip_min: float = -3.0,
    clip_max: float = 3.0
) -> pd.Series:
    x = s.clip(lower=clip_min, upper=clip_max)
    return (x - clip_min) / (clip_max - clip_min)


def compute_weighted_block_score(
    df: pd.DataFrame,
    features: dict[str, float],
    direction: dict[str, float],
    score_name: str
) -> pd.DataFrame:
    work = df.copy()

    adj_cols = []

    for col in features:
        if col not in work.columns:
            work[col] = np.nan

        adj_col = f"{col}_adj"
        work[adj_col] = work[col] * direction[col]
        adj_cols.append(adj_col)

    valid_mask = work[adj_cols].notna().all(axis=1)

    norm_df = pd.DataFrame(index=work.index)

    for col in adj_cols:
        norm_df[col] = normalize_to_unit_interval(work[col])

    weights = np.array(
        [features[col.replace("_adj", "")] for col in adj_cols],
        dtype=float
    )

    weight_sum = weights.sum()

    score_raw = norm_df.mul(weights, axis=1).sum(axis=1) / weight_sum
    score_100 = score_raw * 100.0

    out = pd.DataFrame(index=work.index)
    out[score_name] = np.where(valid_mask, score_100, np.nan)
    out[f"{score_name}_valid_feature_count"] = work[adj_cols].notna().sum(axis=1)
    out[f"{score_name}_required_feature_count"] = len(adj_cols)

    return out


def compute_score(wide: pd.DataFrame) -> pd.DataFrame:
    df = wide.copy()
    df = df[df.index >= SCORE_START_DATE]

    if df.empty:
        raise RuntimeError(f"{SCORE_START_DATE} 以降のデータがありません")

    core_df = compute_weighted_block_score(
        df,
        CORE_FEATURES,
        CORE_DIRECTION,
        "core_score"
    )

    geo_df = compute_weighted_block_score(
        df,
        GEO_FEATURES,
        GEO_DIRECTION,
        "geo_score"
    )

    out = df.copy()
    out = out.join(core_df)
    out = out.join(geo_df)

    both_valid = out["core_score"].notna() & out["geo_score"].notna()

    out["score"] = np.where(
        both_valid,
        out["core_score"] * FINAL_CORE_WEIGHT + out["geo_score"] * FINAL_GEO_WEIGHT,
        np.nan
    )

    out["regime"] = np.select(
        [
            out["score"] >= 85,
            out["score"] >= 65,
            out["score"] >= 45,
        ],
        [
            "risk_off",
            "warning",
            "caution",
        ],
        default="normal"
    )

    out.loc[out["score"].isna(), "regime"] = None

    out["valid_feature_count"] = np.where(
        both_valid,
        out["core_score_valid_feature_count"] + out["geo_score_valid_feature_count"],
        np.nan
    )

    out["required_feature_count"] = (
        out["core_score_required_feature_count"]
        + out["geo_score_required_feature_count"]
    )

    keep_cols = (
        list(CORE_FEATURES.keys())
        + list(GEO_FEATURES.keys())
        + [
            "core_score",
            "geo_score",
            "score",
            "regime",
            "core_score_valid_feature_count",
            "core_score_required_feature_count",
            "geo_score_valid_feature_count",
            "geo_score_required_feature_count",
            "valid_feature_count",
            "required_feature_count",
        ]
    )

    out = out[keep_cols].copy()
    out.index.name = "date"

    return out


def init_research_db(db_path: str) -> None:
    with sqlite3.connect(db_path) as conn:
        conn.execute("""
        CREATE TABLE IF NOT EXISTS crisis_score (
            date TEXT PRIMARY KEY,
            core_score REAL,
            geo_score REAL,
            score REAL,
            regime TEXT,
            core_score_valid_feature_count INTEGER,
            core_score_required_feature_count INTEGER,
            geo_score_valid_feature_count INTEGER,
            geo_score_required_feature_count INTEGER,
            valid_feature_count INTEGER,
            required_feature_count INTEGER
        );
        """)
        conn.commit()


def reset_crisis_score_table(db_path: str) -> None:
    with sqlite3.connect(db_path) as conn:
        conn.execute("DROP TABLE IF EXISTS crisis_score;")
        conn.commit()


def save_score(db_path: str, score_df: pd.DataFrame) -> int:
    out = score_df.reset_index()[[
        "date",
        "core_score",
        "geo_score",
        "score",
        "regime",
        "core_score_valid_feature_count",
        "core_score_required_feature_count",
        "geo_score_valid_feature_count",
        "geo_score_required_feature_count",
        "valid_feature_count",
        "required_feature_count",
    ]].copy()

    out["date"] = pd.to_datetime(out["date"]).dt.strftime("%Y-%m-%d")
    out = out.replace({np.nan: None})

    rows = list(out.itertuples(index=False, name=None))

    with sqlite3.connect(db_path) as conn:
        cur = conn.executemany("""
            INSERT OR REPLACE INTO crisis_score (
                date,
                core_score,
                geo_score,
                score,
                regime,
                core_score_valid_feature_count,
                core_score_required_feature_count,
                geo_score_valid_feature_count,
                geo_score_required_feature_count,
                valid_feature_count,
                required_feature_count
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)

        conn.commit()
        return cur.rowcount


def main():
    feature_names = list(CORE_FEATURES.keys()) + list(GEO_FEATURES.keys())

    print("Loading crisis features from research.sqlite ...")
    wide = load_features_wide(RESEARCH_DB, feature_names)

    print(f"Loaded columns: {list(wide.columns)}")
    print(f"Raw shape: {wide.shape}")
    print(f"Raw date range: {wide.index.min().date()} -> {wide.index.max().date()}")

    print("Computing layered crisis score ...")
    score_df = compute_score(wide)

    print(f"Score start date: {SCORE_START_DATE}")
    print(f"Score shape: {score_df.shape}")
    print(f"Score date range: {score_df.index.min().date()} -> {score_df.index.max().date()}")

    print("Latest rows:")
    print(score_df.tail(10).to_string())

    print("Preparing research.sqlite ...")

    if RESET_CRISIS_SCORE:
        print("Dropping existing crisis_score ...")
        reset_crisis_score_table(RESEARCH_DB)

    init_research_db(RESEARCH_DB)

    print("Saving to research.sqlite ...")
    n = save_score(RESEARCH_DB, score_df)

    print(f"Upserted {n} rows into crisis_score")


if __name__ == "__main__":
    main()

実行する

スクリプトを実行します。

python build_crisis_score.py

実行結果は次のようになりました。

Loading crisis features from research.sqlite ...
Loaded columns: ['anfci_z20', 'bbb_oas_z20', 'gold_chg_5d', 'gold_z20', 'hy_minus_bbb_z20', 'hy_oas_z20', 'nfci_z20', 'usdjpy_chg_5d', 'usdjpy_z20', 'vix_z20', 'wti_chg_5d', 'wti_z20', 'yc_10y_2y_z20', 'yc_10y_3m_z20']
Raw shape: (13221, 14)
Raw date range: 1990-01-01 -> 2026-03-13
Computing layered crisis score ...
Score start date: 1997-01-01
Score shape: (10664, 24)
Score date range: 1997-01-01 -> 2026-03-13
Preparing research.sqlite ...
Dropping existing crisis_score ...
Saving to research.sqlite ...
Upserted 10664 rows into crisis_score

1997年1月1日から2026年3月13日までのスコアを作成できました。

ハマったところ:旧テーブルの列定義が残っていた

最初に実行したとき、次のエラーが出ました。

sqlite3.OperationalError: table crisis_score has no column named core_score

原因は、既存の crisis_score テーブルが旧版の列定義のままだったことです。

新版のスクリプトでは、core_scoregeo_score を保存しようとします。しかし、旧版の crisis_score にはそれらの列が存在しません。

また、元のコードではテーブル作成部分が次のようになっていました。

CREATE TABLE IF NOT EXISTS crisis_score ...

IF NOT EXISTS は、すでにテーブルがある場合は何もしません。そのため、列定義は旧版のまま残ってしまいます。

今回のようにテーブル構造を変更する場合は、単に DELETE するのではなく、DROP TABLE してから作り直す方が安全です。

そのため、リセット処理は次のように変更しました。

def reset_crisis_score_table(db_path: str) -> None:
    with sqlite3.connect(db_path) as conn:
        conn.execute("DROP TABLE IF EXISTS crisis_score;")
        conn.commit()

そして、main() 側では、初期化前に既存テーブルを削除するようにしました。

print("Preparing research.sqlite ...")

if RESET_CRISIS_SCORE:
    print("Dropping existing crisis_score ...")
    reset_crisis_score_table(RESEARCH_DB)

init_research_db(RESEARCH_DB)

これで、今後テーブル構造を変更しても、古い列定義が残る問題を避けられます。

テーブル構造を確認する

実行後に、crisis_score のスキーマを確認します。

sqlite3 research.sqlite
.schema crisis_score

結果は次の通りです。

CREATE TABLE crisis_score (
    date TEXT PRIMARY KEY,
    core_score REAL,
    geo_score REAL,
    score REAL,
    regime TEXT,
    core_score_valid_feature_count INTEGER,
    core_score_required_feature_count INTEGER,
    geo_score_valid_feature_count INTEGER,
    geo_score_required_feature_count INTEGER,
    valid_feature_count INTEGER,
    required_feature_count INTEGER
);

core_scoregeo_score が追加されているので、2層化されたスコアを保存できています。

旧版と新版の件数を比較する

次に、旧版と新版の件数、期間、NULL件数を確認します。

SELECT
  MIN(date) AS min_date,
  MAX(date) AS max_date,
  COUNT(*) AS total_rows,
  COUNT(score) AS non_null_score_rows
FROM crisis_score;

新版の結果です。

1997-01-01|2026-03-13|10664|10641

旧版も確認します。

SELECT
  MIN(date) AS min_date,
  MAX(date) AS max_date,
  COUNT(*) AS total_rows,
  COUNT(score) AS non_null_score_rows
FROM crisis_score_old;

旧版の結果です。

1997-01-01|2026-03-13|10664|10641

旧版と新版で、期間と件数は一致しています。

これにより、スコア計算対象の期間がずれたり、保存件数が大きく減ったりしていないことを確認できました。

regimeの分布を確認する

次に、normalcautionwarningrisk_off の分布を確認します。

SELECT regime, COUNT(*)
FROM crisis_score
GROUP BY regime
ORDER BY regime;

新版の分布は次の通りです。

|23
caution|5276
normal|4155
risk_off|9
warning|1201

旧版も確認します。

SELECT regime, COUNT(*)
FROM crisis_score_old
GROUP BY regime
ORDER BY regime;

旧版の分布です。

|23
caution|5819
normal|2159
risk_off|172
warning|2491

ここで重要なのは、新版では risk_offwarning がかなり減っていることです。

つまり、geo_scoreを加えたことでスコアの構造は分かりやすくなった一方、最終スコアとしては危機判定が弱くなっている可能性があります。

直近の旧版・新版を比較する

直近の日付について、旧版スコア、新版スコア、core_score、geo_scoreを比較します。

SELECT
  n.date,
  o.score AS old_score,
  o.regime AS old_regime,
  n.core_score,
  n.geo_score,
  n.score AS new_score,
  n.regime AS new_regime
FROM crisis_score n
LEFT JOIN crisis_score_old o
  ON n.date = o.date
ORDER BY n.date DESC
LIMIT 30;

一部を抜粋すると、次のようになりました。

2026-03-13|66.160164|warning|66.160164|48.354333|62.598998|caution
2026-03-12|68.063244|warning|68.063244|50.739479|64.598491|caution
2026-03-11|59.548978|caution|59.548978|69.445862|61.528354|caution
2026-03-10|58.952546|caution|58.952546|70.641114|61.290259|caution
2026-03-09|70.417150|warning|70.417150|54.998877|67.333496|warning
2026-03-08|67.745484|warning|67.745484|70.642920|68.324971|warning
2026-03-07|70.396756|warning|70.396756|58.554676|68.028340|warning
2026-03-06|73.181938|warning|73.181938|57.725218|70.090594|warning

この比較で分かるのは、core_score が旧版スコアに近く、geo_score を混ぜた new_score はやや低めに出る日がある、ということです。

直近のように地政学・資源要因も見たい場合、geo_score を別で見られるようになった点は大きな改善です。

一方で、最終的な score に混ぜる比率をどうするかは、まだ調整が必要です。

差分が大きい日を確認する

旧版と新版の差が大きい日を確認します。

SELECT
  n.date,
  o.score AS old_score,
  n.score AS new_score,
  (n.score - o.score) AS score_diff,
  o.regime AS old_regime,
  n.regime AS new_regime,
  n.core_score,
  n.geo_score
FROM crisis_score n
JOIN crisis_score_old o
  ON n.date = o.date
WHERE n.score IS NOT NULL
  AND o.score IS NOT NULL
ORDER BY ABS(n.score - o.score) DESC
LIMIT 30;

結果を見ると、geo_scoreが高いために新版スコアが上がる日もあれば、逆にcore_scoreが高いのにgeo_scoreが低いため、最終スコアが下がる日もありました。

これは、2層化によってスコアの中身が見えるようになった一方で、単純な加重平均では信用危機の強いシグナルを薄めてしまう可能性がある、ということです。

過去危機イベントで旧版と新版を比較する

次に、過去の主要危機イベントで旧版と新版のスコアがどう動いたかを確認するスクリプトを作成します。

今回作成する check_crisis_events.py では、次の内容を確認します。

  • イベント当日の旧版スコア
  • イベント当日の新版スコア
  • 旧版と新版の差分
  • 旧版と新版のregime差分
  • イベント前90日以内に warning / risk_off が出ていたか
  • イベント前後30日のスコア推移

check_crisis_events.py

import sqlite3
import pandas as pd

RESEARCH_DB = "research.sqlite"

LOOKBACK_DAYS = 90
WINDOW_BEFORE = 30
WINDOW_AFTER = 30
OUTPUT_CSV = "crisis_event_compare.csv"

CRISIS_EVENTS = [
    ("LTCM危機", "1998-09-23"),
    ("ITバブル崩壊", "2000-04-14"),
    ("9.11後市場混乱", "2001-09-17"),
    ("パリバ・ショック", "2007-08-09"),
    ("リーマン破綻", "2008-09-15"),
    ("ギリシャ・欧州債務危機", "2010-05-06"),
    ("米国格下げショック", "2011-08-05"),
    ("チャイナショック", "2015-08-24"),
    ("人民元ショック余波", "2016-01-20"),
    ("VIXショック", "2018-02-05"),
    ("2018年末リスクオフ", "2018-12-24"),
    ("コロナショック初動", "2020-02-24"),
    ("コロナショック本格化", "2020-03-12"),
    ("英国トラス・ショック", "2022-09-26"),
    ("SVB破綻", "2023-03-10"),
]


def load_score_table(
    table_name: str,
    score_col_name: str,
    regime_col_name: str
) -> pd.DataFrame:
    sql = f"""
    SELECT date, score, regime
    FROM {table_name}
    ORDER BY date
    """

    with sqlite3.connect(RESEARCH_DB) as conn:
        df = pd.read_sql_query(sql, conn)

    if df.empty:
        raise RuntimeError(f"{table_name} テーブルが空です")

    df["date"] = pd.to_datetime(df["date"])
    df["score"] = pd.to_numeric(df["score"], errors="coerce")

    df = df.rename(columns={
        "score": score_col_name,
        "regime": regime_col_name,
    })

    return df


def find_first_regime(
    df: pd.DataFrame,
    event_date: pd.Timestamp,
    lookback_days: int,
    regime_col: str,
    regimes: list[str]
):
    start_date = event_date - pd.Timedelta(days=lookback_days)

    sub = df[
        (df["date"] >= start_date)
        & (df["date"] <= event_date)
    ].copy()

    sub = sub[sub[regime_col].isin(regimes)].sort_values("date")

    if sub.empty:
        return None

    return sub.iloc[0]["date"]


def analyze_event(
    df: pd.DataFrame,
    event_name: str,
    event_date_str: str
) -> dict:
    event_date = pd.Timestamp(event_date_str)

    window_start = event_date - pd.Timedelta(days=WINDOW_BEFORE)
    window_end = event_date + pd.Timedelta(days=WINDOW_AFTER)

    window_df = df[
        (df["date"] >= window_start)
        & (df["date"] <= window_end)
    ].copy()

    event_day_row = df[df["date"] == event_date]

    old_score = event_day_row.iloc[0]["old_score"] if not event_day_row.empty else None
    old_regime = event_day_row.iloc[0]["old_regime"] if not event_day_row.empty else None
    new_score = event_day_row.iloc[0]["new_score"] if not event_day_row.empty else None
    new_regime = event_day_row.iloc[0]["new_regime"] if not event_day_row.empty else None

    first_warning_old = find_first_regime(
        df,
        event_date,
        LOOKBACK_DAYS,
        "old_regime",
        ["warning", "risk_off"]
    )

    first_warning_new = find_first_regime(
        df,
        event_date,
        LOOKBACK_DAYS,
        "new_regime",
        ["warning", "risk_off"]
    )

    first_risk_off_old = find_first_regime(
        df,
        event_date,
        LOOKBACK_DAYS,
        "old_regime",
        ["risk_off"]
    )

    first_risk_off_new = find_first_regime(
        df,
        event_date,
        LOOKBACK_DAYS,
        "new_regime",
        ["risk_off"]
    )

    return {
        "event_name": event_name,
        "event_date": event_date.strftime("%Y-%m-%d"),
        "old_score": None if pd.isna(old_score) else float(old_score),
        "old_regime": old_regime,
        "new_score": None if pd.isna(new_score) else float(new_score),
        "new_regime": new_regime,
        "score_diff_new_minus_old": None
        if (pd.isna(old_score) or pd.isna(new_score))
        else float(new_score - old_score),
        "first_warning_old": None
        if first_warning_old is None
        else first_warning_old.strftime("%Y-%m-%d"),
        "days_before_warning_old": None
        if first_warning_old is None
        else (event_date - first_warning_old).days,
        "first_warning_new": None
        if first_warning_new is None
        else first_warning_new.strftime("%Y-%m-%d"),
        "days_before_warning_new": None
        if first_warning_new is None
        else (event_date - first_warning_new).days,
        "first_risk_off_old": None
        if first_risk_off_old is None
        else first_risk_off_old.strftime("%Y-%m-%d"),
        "days_before_risk_off_old": None
        if first_risk_off_old is None
        else (event_date - first_risk_off_old).days,
        "first_risk_off_new": None
        if first_risk_off_new is None
        else first_risk_off_new.strftime("%Y-%m-%d"),
        "days_before_risk_off_new": None
        if first_risk_off_new is None
        else (event_date - first_risk_off_new).days,
        "window_old_max_score": None
        if window_df["old_score"].dropna().empty
        else float(window_df["old_score"].max()),
        "window_new_max_score": None
        if window_df["new_score"].dropna().empty
        else float(window_df["new_score"].max()),
    }


def print_event_window(
    df: pd.DataFrame,
    event_name: str,
    event_date_str: str
):
    event_date = pd.Timestamp(event_date_str)

    window_start = event_date - pd.Timedelta(days=WINDOW_BEFORE)
    window_end = event_date + pd.Timedelta(days=WINDOW_AFTER)

    window_df = df[
        (df["date"] >= window_start)
        & (df["date"] <= window_end)
    ].copy()

    window_df["date"] = window_df["date"].dt.strftime("%Y-%m-%d")

    print("\n" + "=" * 100)
    print(f"{event_name} ({event_date_str})")
    print("=" * 100)

    print(
        window_df[[
            "date",
            "old_score",
            "old_regime",
            "new_score",
            "new_regime",
        ]].to_string(index=False)
    )


def main():
    old_df = load_score_table(
        "crisis_score_old",
        "old_score",
        "old_regime"
    )

    new_df = load_score_table(
        "crisis_score",
        "new_score",
        "new_regime"
    )

    df = pd.merge(old_df, new_df, on="date", how="outer").sort_values("date")

    summary_rows = []

    for event_name, event_date in CRISIS_EVENTS:
        summary_rows.append(analyze_event(df, event_name, event_date))

    summary_df = pd.DataFrame(summary_rows)

    print("\n===== compare summary =====")
    print(summary_df.to_string(index=False))

    summary_df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8-sig")
    print(f"\nSaved summary CSV: {OUTPUT_CSV}")

    for event_name, event_date in CRISIS_EVENTS:
        print_event_window(df, event_name, event_date)


if __name__ == "__main__":
    main()

比較スクリプトを実行する

次のコマンドで実行します。

python check_crisis_events.py

実行すると、主要危機イベントごとの旧版・新版比較が表示され、さらに crisis_event_compare.csv に保存されます。

これにより、イベントごとに次のような比較ができます。

  • 旧版では risk_off だったが、新版では warning に下がった
  • 旧版では warning だったが、新版では caution に下がった
  • geo_scoreが高いことで、新版スコアが旧版より上がった
  • 信用危機ではcore_scoreが高く、地政学ショックではgeo_scoreが高くなる

SVB破綻時の比較例

たとえば、SVB破綻前後では次のような動きになりました。

SVB破綻 (2023-03-10)

date        old_score   old_regime   new_score   new_regime
2023-03-08  55.741222   caution      49.305081   caution
2023-03-09  68.424409   warning      60.529238   caution
2023-03-10  86.176509   risk_off     80.549474   warning
2023-03-11  81.277774   risk_off     76.451197   warning
2023-03-12  77.318386   warning      74.826107   warning
2023-03-13  79.335763   warning      77.087318   warning

旧版では、2023年3月10日に risk_off まで上がっています。

一方、新版では warning にとどまっています。

これは、geo_scoreを合成したことで、core_scoreの強い危機シグナルがやや薄まったためです。

今回の結果から分かったこと

今回の2層化によって、スコアの中身はかなり見やすくなりました。

特に、次のような切り分けができるようになったのは大きな改善です。

  • 信用スプレッドやVIXが主導する危機なのか
  • WTIや金価格が主導する地政学・資源ショックなのか
  • ドル円の動きがリスクオフ方向に寄与しているのか

一方で、最終スコアの合成方法には課題も見つかりました。

旧版と新版を比較すると、多くの主要危機で新版のスコアが下がっています。

特に、次のような変化が見られました。

  • risk_offwarning に下がる
  • warningcaution に下がる
  • 過去の信用危機で、警戒レベルがやや弱く出る

これは、core_score に強い危機シグナルが出ているにもかかわらず、geo_score が低いことで最終スコアが薄まっているためです。

現時点での評価

現時点では、旧版と新版は次のような性格になっています。

旧版

  • 信用危機には強い
  • VIXや社債スプレッドの悪化を素直に反映しやすい
  • やや過敏に risk_off が出る可能性がある

新版

  • 信用危機と地政学ショックを分けて見られる
  • core_score と geo_score の内訳が分かる
  • ただし、単純な加重平均では信用危機のシグナルが薄まる場合がある

つまり、2層化そのものは成功ですが、最終スコアの作り方はまだ調整が必要です。

次にやること

次は、build_crisis_score.py をさらに調整します。

今回の結果を見ると、単純に core_score 80%、geo_score 20% で合成するだけでは、信用危機の検出力が弱くなりすぎる場面があります。

そのため、次は次のような「core優先・geo補助」型にする予定です。

  • core_score が高いときは、geo_score が低くても危機判定を弱めすぎない
  • geo_score が高いときは、地政学・資源ショックとして別に警戒できるようにする
  • final score とは別に、ショックの種類を判定する列を追加する

たとえば、次のような列を追加すると、後で分析しやすくなります。

  • shock_type:credit / geopolitical / mixed / normal
  • core_regime:core_scoreだけで見た警戒レベル
  • geo_regime:geo_scoreだけで見た警戒レベル

これにより、単にスコアを見るだけでなく、「どの種類のリスクが高まっているのか」まで確認できるようになります。

まとめ

今回は、危機監視スコアを2層化し、旧版と新版を比較できるようにしました。

  • 既存の crisis_scorecrisis_score_old として退避した
  • core_scoregeo_score を持つ新版 crisis_score を作成した
  • テーブル構造変更時の no column named core_score エラーを修正した
  • 旧版と新版のregime分布を比較した
  • 過去危機イベントで、旧版と新版の反応を比較するスクリプトを作成した

結果として、2層化によってスコアの中身は見やすくなりました。

ただし、今の単純な加重平均では、信用危機の強いシグナルが薄まる場面があります。

そのため、次回は core_score を主軸にしつつ、geo_score を補助的に使う形へ調整します。

最終的には、単なる危機スコアではなく、「信用危機なのか」「地政学ショックなのか」「両方が重なっているのか」を判定できる市場監視システムにしていきます。

コメント

タイトルとURLをコピーしました