危機監視用特徴量にJPYのIMMポジションを追加し、2つのSQLite DBを統合して扱う

危機監視用特徴量にJPYのIMMポジションを追加し、2つのSQLite DBを統合して扱う

前回までに、FREDやStooqから取得した市場データを market.db に保存し、危機監視用の特徴量を research.sqlite に保存する流れを作りました。

具体的には、米国債利回り、VIX、社債スプレッド、SOFR、NFCI、ANFCI、SLOOS、USD/JPY、WTI、金価格などを読み込み、日次の特徴量として整形するところまで進めています。

今回はそこに、CFTCのCOTデータから作成した cot_legacy.db を追加で読み込み、JPYのIMMポジションを危機監視用特徴量に加えます。

目的は、単なる金利・ボラティリティ・信用スプレッドだけでなく、投機筋の円ポジションも見ながら、将来的に position_score のような需給系スコアへつなげることです。

今回やること

今回の目的は、既存の build_crisis_features.py を大きく作り替えることではありません。

最小差分で、次の3点だけを追加します。

  • market.db に加えて cot_legacy.db を読む
  • COT LegacyデータからJPYのIMMポジション比率を作る
  • 既存の日次特徴量に imm_jpy_rate 系の特徴量を追加して保存する

イメージとしては、既存の市場データ特徴量に、週次のIMMデータを日次へ前方補完して結合する形です。

前提となるDB構成

今回使うDBは3つです。

DB 役割
market.db FRED、Stooqなどから取得した市場データを保存するDB
cot_legacy.db CFTC COT Legacyデータを保存するDB
research.sqlite 加工後の危機監視用特徴量を保存するDB

既存の market.db には、次のような系列が入っています。

  • 米10年債利回り
  • 米2年債利回り
  • 米3か月債利回り
  • VIX
  • High Yield OAS
  • BBB OAS
  • SOFR
  • NFCI / ANFCI
  • SLOOS
  • USD/JPY
  • WTI
  • Gold

今回追加する cot_legacy.db からは、JPYのCOTデータを読み込みます。

追加するJPY IMM特徴量

今回追加する特徴量は、まず次の4本に絞ります。

  • imm_jpy_rate
  • imm_jpy_rate_chg_4w
  • imm_jpy_rate_z20
  • imm_jpy_rate_ma20_gap

imm_jpy_rate は、非商業部門のロングとショートの差をOpen Interestで割ったものです。

計算式としては次のようになります。

imm_jpy_net = noncomm_long - noncomm_short
imm_jpy_rate = imm_jpy_net / open_interest

これにより、単純な枚数ではなく、Open Interestに対するネットポジション比率として扱えます。

そのうえで、20営業日差分、20日ローリングz-score、20日移動平均からの乖離も作ります。

週次データを日次へ前方補完しているため、ここでいう4週変化は厳密な週次4回分ではなく、日次化後の20営業日相当として扱っています。

修正後の build_crisis_features.py

修正後のコードは次のとおりです。

import sqlite3
import pandas as pd
import numpy as np

MARKET_DB = "market.db"
COT_DB = "cot_legacy.db"
RESEARCH_DB = "research.sqlite"

# 危機監視用の分析期間

ANALYSIS_START_DATE = "1990-01-01"

# True にすると保存前に crisis_features を全削除

RESET_CRISIS_FEATURES = True

# 使用する系列

FEATURE_SERIES = {
"DGS10": "dgs10",
"DGS2": "dgs2",
"DGS3MO": "dgs3mo",
"VIXCLS": "vix",
"BAMLH0A0HYM2": "hy_oas",
"BAMLC0A4CBBB": "bbb_oas",
"SOFR": "sofr",
"NFCI": "nfci",
"ANFCI": "anfci",
"DRTSCILM": "sloos",
"DEXJPUS": "usdjpy",
"DCOILWTICO": "wti",
"XAUUSD_STOOQ": "gold",
}

IMM_JPY_MARKET_NAME = "JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE"

def load_series_wide(db_path: str, series_map: dict[str, str]) -> pd.DataFrame:
series_ids = list(series_map.keys())
placeholders = ",".join(["?"] * len(series_ids))

```
sql = f"""
SELECT series_id, date, value
FROM series_observations
WHERE series_id IN ({placeholders})
ORDER BY date
"""

with sqlite3.connect(db_path) as conn:
    df = pd.read_sql_query(sql, conn, params=series_ids)

if df.empty:
    raise RuntimeError("series_observations から対象系列を取得できませんでした")

df["date"] = pd.to_datetime(df["date"])
df["value"] = pd.to_numeric(df["value"], errors="coerce")

wide = df.pivot(index="date", columns="series_id", values="value").sort_index()
rename_map = {k: v for k, v in series_map.items() if k in wide.columns}
wide = wide.rename(columns=rename_map)
wide.columns.name = None

return wide
```

def load_imm_jpy_weekly(cot_db_path: str) -> pd.DataFrame:
sql = """
SELECT
"As of Date in Form YYYY-MM-DD" AS date,
"Open Interest (All)" AS open_interest,
"Noncommercial Positions-Long (All)" AS noncomm_long,
"Noncommercial Positions-Short (All)" AS noncomm_short
FROM cot_legacy_fut_only
WHERE "Market and Exchange Names" = ?
AND "As of Date in Form YYYY-MM-DD" IS NOT NULL
ORDER BY "As of Date in Form YYYY-MM-DD"
"""

```
with sqlite3.connect(cot_db_path) as conn:
    df = pd.read_sql_query(sql, conn, params=[IMM_JPY_MARKET_NAME])

if df.empty:
    raise RuntimeError("cot_legacy.db から JPY の IMM データを取得できませんでした")

df["date"] = pd.to_datetime(df["date"])

for col in ["open_interest", "noncomm_long", "noncomm_short"]:
    df[col] = pd.to_numeric(df[col], errors="coerce")

df["imm_jpy_net"] = df["noncomm_long"] - df["noncomm_short"]
df["imm_jpy_rate"] = df["imm_jpy_net"] / df["open_interest"]

out = (
    df[["date", "imm_jpy_rate"]]
    .dropna(subset=["date", "imm_jpy_rate"])
    .drop_duplicates(subset=["date"])
    .set_index("date")
    .sort_index()
)

return out
```

def zscore_rolling(s: pd.Series, window: int = 20) -> pd.Series:
mean_ = s.rolling(window).mean()
std_ = s.rolling(window).std()
return (s - mean_) / std_

def build_features(wide: pd.DataFrame, imm_jpy_weekly: pd.DataFrame) -> pd.DataFrame:
df = wide.copy()

```
# 危機監視用の分析期間だけ使う
df = df[df.index >= ANALYSIS_START_DATE]

if df.empty:
    raise RuntimeError(f"{ANALYSIS_START_DATE} 以降のデータがありません")

# 日次軸へそろえる
full_index = pd.date_range(df.index.min(), df.index.max(), freq="D")
df = df.reindex(full_index)

# 低頻度系列も含めて前方補完
df = df.ffill()

# IMM週次を日次へ合わせる
imm_jpy_daily = imm_jpy_weekly.reindex(df.index).ffill()
df = df.join(imm_jpy_daily, how="left")

# 基本特徴量
df["yc_10y_2y"] = df["dgs10"] - df["dgs2"]
df["yc_10y_3m"] = df["dgs10"] - df["dgs3mo"]
df["hy_minus_bbb"] = df["hy_oas"] - df["bbb_oas"]

# 5日変化
change_cols = [
    "vix",
    "hy_oas",
    "bbb_oas",
    "hy_minus_bbb",
    "yc_10y_2y",
    "yc_10y_3m",
    "sofr",
    "nfci",
    "anfci",
    "sloos",
    "usdjpy",
    "wti",
    "gold",
]

for col in change_cols:
    if col in df.columns:
        df[f"{col}_chg_5d"] = df[col] - df[col].shift(5)

# 20日 z-score
z_cols = [
    "vix",
    "hy_oas",
    "bbb_oas",
    "hy_minus_bbb",
    "yc_10y_2y",
    "yc_10y_3m",
    "sofr",
    "nfci",
    "anfci",
    "usdjpy",
    "wti",
    "gold",
]

for col in z_cols:
    if col in df.columns:
        df[f"{col}_z20"] = zscore_rolling(df[col], window=20)

# 20日移動平均乖離
ma_cols = ["vix", "hy_oas", "bbb_oas", "usdjpy", "wti", "gold"]

for col in ma_cols:
    if col in df.columns:
        ma20 = df[col].rolling(20).mean()
        df[f"{col}_ma20_gap"] = df[col] - ma20

# IMM JPY特徴量
if "imm_jpy_rate" in df.columns:
    df["imm_jpy_rate_chg_4w"] = df["imm_jpy_rate"] - df["imm_jpy_rate"].shift(20)
    df["imm_jpy_rate_z20"] = zscore_rolling(df["imm_jpy_rate"], window=20)

    imm_jpy_ma20 = df["imm_jpy_rate"].rolling(20).mean()
    df["imm_jpy_rate_ma20_gap"] = df["imm_jpy_rate"] - imm_jpy_ma20

preferred_order = [
    "dgs10",
    "dgs2",
    "dgs3mo",
    "yc_10y_2y",
    "yc_10y_3m",
    "hy_oas",
    "bbb_oas",
    "hy_minus_bbb",
    "vix",
    "sofr",
    "nfci",
    "anfci",
    "sloos",
    "usdjpy",
    "wti",
    "gold",
    "imm_jpy_rate",
    "imm_jpy_rate_chg_4w",
    "imm_jpy_rate_z20",
    "imm_jpy_rate_ma20_gap",
]

others = [c for c in df.columns if c not in preferred_order]
df = df[[c for c in preferred_order if c in df.columns] + others]

df.index.name = "date"

return df
```

def init_research_db(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS crisis_features (
date TEXT NOT NULL,
feature_name TEXT NOT NULL,
value REAL,
PRIMARY KEY (date, feature_name)
);
""")
conn.commit()

def reset_crisis_features_table(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM crisis_features;")
conn.commit()

def save_features_long(db_path: str, feature_df: pd.DataFrame) -> int:
out = feature_df.reset_index().melt(
id_vars="date",
var_name="feature_name",
value_name="value",
)

```
out["date"] = pd.to_datetime(out["date"]).dt.strftime("%Y-%m-%d")
out = out.replace({np.nan: None})

rows = list(out.itertuples(index=False, name=None))

with sqlite3.connect(db_path) as conn:
    cur = conn.executemany("""
        INSERT OR REPLACE INTO crisis_features(date, feature_name, value)
        VALUES (?, ?, ?)
    """, rows)
    conn.commit()
    return cur.rowcount
```

def main():
print("Loading series from market.db ...")
wide = load_series_wide(MARKET_DB, FEATURE_SERIES)

```
print(f"Loaded columns: {list(wide.columns)}")
print(f"Raw shape: {wide.shape}")
print(f"Raw date range: {wide.index.min().date()} -> {wide.index.max().date()}")

print("Loading IMM JPY from cot_legacy.db ...")
imm_jpy_weekly = load_imm_jpy_weekly(COT_DB)

print(f"IMM JPY weekly shape: {imm_jpy_weekly.shape}")
print(f"IMM JPY date range: {imm_jpy_weekly.index.min().date()} -> {imm_jpy_weekly.index.max().date()}")

print("Building crisis features ...")
feature_df = build_features(wide, imm_jpy_weekly)

print(f"Analysis start date: {ANALYSIS_START_DATE}")
print(f"Feature shape: {feature_df.shape}")
print(f"Feature date range: {feature_df.index.min().date()} -> {feature_df.index.max().date()}")

print("Latest rows:")
print(feature_df.tail(3).to_string())

imm_cols = [
    "imm_jpy_rate",
    "imm_jpy_rate_chg_4w",
    "imm_jpy_rate_z20",
    "imm_jpy_rate_ma20_gap",
]

print("Latest IMM rows:")
print(feature_df[imm_cols].tail(5).to_string())

print("Preparing research.sqlite ...")
init_research_db(RESEARCH_DB)

if RESET_CRISIS_FEATURES:
    print("Resetting existing crisis_features ...")
    reset_crisis_features_table(RESEARCH_DB)

print("Saving to research.sqlite ...")
n = save_features_long(RESEARCH_DB, feature_df)

print(f"Upserted {n} rows into crisis_features")
```

if **name** == "**main**":
main()

実行結果

修正後のスクリプトを実行します。

python build_crisis_features.py

実行すると、market.db から市場データを読み込み、続いて cot_legacy.db からJPYのIMMデータを読み込みます。

Loading series from market.db ...
Loaded columns: ['anfci', 'bbb_oas', 'hy_oas', 'wti', 'usdjpy', 'dgs10', 'dgs2', 'dgs3mo', 'sloos', 'nfci', 'sofr', 'vix', 'gold']
Raw shape: (15484, 13)
Raw date range: 1793-03-01 -> 2026-03-13

Loading IMM JPY from cot_legacy.db ...
IMM JPY weekly shape: (585, 1)
IMM JPY date range: 2015-01-06 -> 2026-03-17

Building crisis features ...
Analysis start date: 1990-01-01
Feature shape: (13221, 51)
Feature date range: 1990-01-01 -> 2026-03-13

Saving to research.sqlite ...
Upserted 674271 rows into crisis_features

今回の実行では、特徴量の列数が 51 になりました。

前回の特徴量に対して、JPYのIMM関連の4列が追加されたためです。

  • imm_jpy_rate
  • imm_jpy_rate_chg_4w
  • imm_jpy_rate_z20
  • imm_jpy_rate_ma20_gap

保存件数は 674271 行になりました。

これは、日次の行数と特徴量数を掛け合わせたlong形式の保存件数です。

IMM特徴量が保存されたか確認する

次に、research.sqlite 側で、IMM関連の特徴量が保存されているか確認します。

sqlite3 research.sqlite

まずは、imm_jpy で始まる特徴量の行数を確認します。

SELECT
feature_name,
COUNT(*) AS total_rows
FROM crisis_features
WHERE feature_name LIKE 'imm_jpy%'
GROUP BY feature_name
ORDER BY feature_name;

結果は次のようになりました。

imm_jpy_rate|13221
imm_jpy_rate_chg_4w|13221
imm_jpy_rate_ma20_gap|13221
imm_jpy_rate_z20|13221

4本とも 13221 行で揃っています。

これは、日次indexに対して、IMM関連の特徴量が正しく保存されたことを意味します。

NULLを含む行数と有効なデータ開始日

ただし、ここで注意点があります。

COUNT(*) で見ると、1990年からすべての行が存在しているように見えます。

しかし、JPYのIMM元データは2015年からしかありません。

そのため、1990年から2014年までは、行は存在していても値は NULL です。

まず、単純に MIN(date) を見ると次のようになります。

SELECT
feature_name,
MIN(date) AS start_date,
MAX(date) AS end_date,
COUNT(value) AS non_null_rows
FROM crisis_features
WHERE feature_name LIKE 'imm_jpy%'
GROUP BY feature_name
ORDER BY feature_name;

結果は次のとおりです。

imm_jpy_rate|1990-01-01|2026-03-13|4085
imm_jpy_rate_chg_4w|1990-01-01|2026-03-13|4065
imm_jpy_rate_ma20_gap|1990-01-01|2026-03-13|4066
imm_jpy_rate_z20|1990-01-01|2026-03-13|4066

この結果だけを見ると、imm_jpy_rate が1990年からあるように見えます。

しかし、これは feature_name の行が1990年から存在しているという意味であり、値が入っているという意味ではありません。

本当に確認したいのは、value IS NOT NULL の開始日です。

SELECT
feature_name,
MIN(date) AS first_non_null_date,
MAX(date) AS last_non_null_date,
COUNT(*) AS non_null_rows
FROM crisis_features
WHERE feature_name LIKE 'imm_jpy%'
AND value IS NOT NULL
GROUP BY feature_name
ORDER BY feature_name;

結果は次のようになりました。

imm_jpy_rate|2015-01-06|2026-03-13|4085
imm_jpy_rate_chg_4w|2015-01-26|2026-03-13|4065
imm_jpy_rate_ma20_gap|2015-01-25|2026-03-13|4066
imm_jpy_rate_z20|2015-01-25|2026-03-13|4066

これで、JPYのIMM特徴量は2015年以降に有効な値が入っていることが確認できました。

今回の結果の判断

今回の実装は、処理としては成功です。

確認できたことは次のとおりです。

  • market.dbcot_legacy.db の2DB読み込みができた
  • COT LegacyからJPYのIMMデータを抽出できた
  • 週次IMMデータを日次indexに前方補完して結合できた
  • imm_jpy_rate 系4本を特徴量として生成できた
  • research.sqlitecrisis_features に保存できた

つまり、次の build_crisis_score.pyposition_score を作る準備はできました。

注意点:2015年以前の危機比較には使えない

ただし、今回の結果で1つ大きな注意点があります。

現在の imm_jpy_rate は、元データの都合で2015年以降しか有効値がありません。

そのため、次のような過去の危機イベントを比較する場合、JPYのIMM特徴量は使えません。

  • LTCM危機
  • ITバブル崩壊
  • 9.11
  • リーマンショック前半
  • 欧州債務危機の初期

一方で、2015年以降のイベントであれば使えます。

  • チャイナショック
  • コロナショック
  • 英国トラスショック
  • SVBショック
  • 2024年以降の円キャリー巻き戻し局面

つまり、まずは2015年以降のイベントで検証するのが自然です。

次にやること

次の課題は2つあります。

1. build_crisis_score.py に position_score を追加する

今回追加した imm_jpy_rate 系の特徴量は、危機そのものを見るというより、投機筋のポジション偏りを見るためのデータです。

そのため、既存の金融ストレス系スコアとは分けて、まずは position_score のような形で扱うのがよさそうです。

たとえば、以下のような特徴量を候補にします。

  • imm_jpy_rate
  • imm_jpy_rate_chg_4w
  • imm_jpy_rate_z20
  • imm_jpy_rate_ma20_gap

最初はJPYだけでスコアを作り、その後、EUR、GBP、CHF、CADなどにも広げていく方針がよさそうです。

2. 2015年以前のCOTデータを取得できるか確認する

もう1つの課題は、2015年以前のJPY COTデータです。

今回の cot_legacy.db では2015年以降しか入っていませんでした。

しかし、危機比較では1997年以降、少なくとも2000年代前半から使えるデータが欲しくなります。

そのため、次はCFTCの過去アーカイブから、2015年以前のLegacy Futures Onlyデータを取得できるか確認します。

これができれば、リーマンショックや欧州債務危機でもJPYのIMMポジションを使った検証ができるようになります。

まとめ

今回は、既存の build_crisis_features.py を2DB読み込み対応にし、JPYのIMMポジションを危機監視用特徴量へ追加しました。

変更点は大きくありません。

  • COT_DB = "cot_legacy.db" を追加
  • load_imm_jpy_weekly() を追加
  • 週次IMMデータを日次indexへ前方補完
  • imm_jpy_rate 系4本を生成
  • research.sqlitecrisis_features に保存

実行結果として、特徴量は 51 列になり、research.sqlite には 674271 行が保存されました。

SQL確認でも、imm_jpy_rate 系の4本が保存されていることを確認できました。

一方で、現在のJPY IMMデータは2015年以降しか有効値がないため、リーマンショック以前の危機比較には使えません。

次は、build_crisis_score.pyposition_score を追加しつつ、2015年以前のCOTデータ取得も検討していきます。


コメント

タイトルとURLをコピーしました