危機監視用特徴量にJPYのIMMポジションを追加し、2つのSQLite DBを統合して扱う
前回までに、FREDやStooqから取得した市場データを market.db に保存し、危機監視用の特徴量を research.sqlite に保存する流れを作りました。
具体的には、米国債利回り、VIX、社債スプレッド、SOFR、NFCI、ANFCI、SLOOS、USD/JPY、WTI、金価格などを読み込み、日次の特徴量として整形するところまで進めています。
今回はそこに、CFTCのCOTデータから作成した cot_legacy.db を追加で読み込み、JPYのIMMポジションを危機監視用特徴量に加えます。
目的は、単なる金利・ボラティリティ・信用スプレッドだけでなく、投機筋の円ポジションも見ながら、将来的に position_score のような需給系スコアへつなげることです。
今回やること
今回の目的は、既存の build_crisis_features.py を大きく作り替えることではありません。
最小差分で、次の3点だけを追加します。
market.dbに加えてcot_legacy.dbを読む- COT LegacyデータからJPYのIMMポジション比率を作る
- 既存の日次特徴量に
imm_jpy_rate系の特徴量を追加して保存する
イメージとしては、既存の市場データ特徴量に、週次のIMMデータを日次へ前方補完して結合する形です。
前提となるDB構成
今回使うDBは3つです。
| DB | 役割 |
|---|---|
market.db |
FRED、Stooqなどから取得した市場データを保存するDB |
cot_legacy.db |
CFTC COT Legacyデータを保存するDB |
research.sqlite |
加工後の危機監視用特徴量を保存するDB |
既存の market.db には、次のような系列が入っています。
- 米10年債利回り
- 米2年債利回り
- 米3か月債利回り
- VIX
- High Yield OAS
- BBB OAS
- SOFR
- NFCI / ANFCI
- SLOOS
- USD/JPY
- WTI
- Gold
今回追加する cot_legacy.db からは、JPYのCOTデータを読み込みます。
追加するJPY IMM特徴量
今回追加する特徴量は、まず次の4本に絞ります。
imm_jpy_rateimm_jpy_rate_chg_4wimm_jpy_rate_z20imm_jpy_rate_ma20_gap
imm_jpy_rate は、非商業部門のロングとショートの差をOpen Interestで割ったものです。
計算式としては次のようになります。
imm_jpy_net = noncomm_long - noncomm_short imm_jpy_rate = imm_jpy_net / open_interest
これにより、単純な枚数ではなく、Open Interestに対するネットポジション比率として扱えます。
そのうえで、20営業日差分、20日ローリングz-score、20日移動平均からの乖離も作ります。
週次データを日次へ前方補完しているため、ここでいう4週変化は厳密な週次4回分ではなく、日次化後の20営業日相当として扱っています。
修正後の build_crisis_features.py
修正後のコードは次のとおりです。
import sqlite3
import pandas as pd
import numpy as np
MARKET_DB = "market.db"
COT_DB = "cot_legacy.db"
RESEARCH_DB = "research.sqlite"
# 危機監視用の分析期間
ANALYSIS_START_DATE = "1990-01-01"
# True にすると保存前に crisis_features を全削除
RESET_CRISIS_FEATURES = True
# 使用する系列
FEATURE_SERIES = {
"DGS10": "dgs10",
"DGS2": "dgs2",
"DGS3MO": "dgs3mo",
"VIXCLS": "vix",
"BAMLH0A0HYM2": "hy_oas",
"BAMLC0A4CBBB": "bbb_oas",
"SOFR": "sofr",
"NFCI": "nfci",
"ANFCI": "anfci",
"DRTSCILM": "sloos",
"DEXJPUS": "usdjpy",
"DCOILWTICO": "wti",
"XAUUSD_STOOQ": "gold",
}
IMM_JPY_MARKET_NAME = "JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE"
def load_series_wide(db_path: str, series_map: dict[str, str]) -> pd.DataFrame:
series_ids = list(series_map.keys())
placeholders = ",".join(["?"] * len(series_ids))
```
sql = f"""
SELECT series_id, date, value
FROM series_observations
WHERE series_id IN ({placeholders})
ORDER BY date
"""
with sqlite3.connect(db_path) as conn:
df = pd.read_sql_query(sql, conn, params=series_ids)
if df.empty:
raise RuntimeError("series_observations から対象系列を取得できませんでした")
df["date"] = pd.to_datetime(df["date"])
df["value"] = pd.to_numeric(df["value"], errors="coerce")
wide = df.pivot(index="date", columns="series_id", values="value").sort_index()
rename_map = {k: v for k, v in series_map.items() if k in wide.columns}
wide = wide.rename(columns=rename_map)
wide.columns.name = None
return wide
```
def load_imm_jpy_weekly(cot_db_path: str) -> pd.DataFrame:
sql = """
SELECT
"As of Date in Form YYYY-MM-DD" AS date,
"Open Interest (All)" AS open_interest,
"Noncommercial Positions-Long (All)" AS noncomm_long,
"Noncommercial Positions-Short (All)" AS noncomm_short
FROM cot_legacy_fut_only
WHERE "Market and Exchange Names" = ?
AND "As of Date in Form YYYY-MM-DD" IS NOT NULL
ORDER BY "As of Date in Form YYYY-MM-DD"
"""
```
with sqlite3.connect(cot_db_path) as conn:
df = pd.read_sql_query(sql, conn, params=[IMM_JPY_MARKET_NAME])
if df.empty:
raise RuntimeError("cot_legacy.db から JPY の IMM データを取得できませんでした")
df["date"] = pd.to_datetime(df["date"])
for col in ["open_interest", "noncomm_long", "noncomm_short"]:
df[col] = pd.to_numeric(df[col], errors="coerce")
df["imm_jpy_net"] = df["noncomm_long"] - df["noncomm_short"]
df["imm_jpy_rate"] = df["imm_jpy_net"] / df["open_interest"]
out = (
df[["date", "imm_jpy_rate"]]
.dropna(subset=["date", "imm_jpy_rate"])
.drop_duplicates(subset=["date"])
.set_index("date")
.sort_index()
)
return out
```
def zscore_rolling(s: pd.Series, window: int = 20) -> pd.Series:
mean_ = s.rolling(window).mean()
std_ = s.rolling(window).std()
return (s - mean_) / std_
def build_features(wide: pd.DataFrame, imm_jpy_weekly: pd.DataFrame) -> pd.DataFrame:
df = wide.copy()
```
# 危機監視用の分析期間だけ使う
df = df[df.index >= ANALYSIS_START_DATE]
if df.empty:
raise RuntimeError(f"{ANALYSIS_START_DATE} 以降のデータがありません")
# 日次軸へそろえる
full_index = pd.date_range(df.index.min(), df.index.max(), freq="D")
df = df.reindex(full_index)
# 低頻度系列も含めて前方補完
df = df.ffill()
# IMM週次を日次へ合わせる
imm_jpy_daily = imm_jpy_weekly.reindex(df.index).ffill()
df = df.join(imm_jpy_daily, how="left")
# 基本特徴量
df["yc_10y_2y"] = df["dgs10"] - df["dgs2"]
df["yc_10y_3m"] = df["dgs10"] - df["dgs3mo"]
df["hy_minus_bbb"] = df["hy_oas"] - df["bbb_oas"]
# 5日変化
change_cols = [
"vix",
"hy_oas",
"bbb_oas",
"hy_minus_bbb",
"yc_10y_2y",
"yc_10y_3m",
"sofr",
"nfci",
"anfci",
"sloos",
"usdjpy",
"wti",
"gold",
]
for col in change_cols:
if col in df.columns:
df[f"{col}_chg_5d"] = df[col] - df[col].shift(5)
# 20日 z-score
z_cols = [
"vix",
"hy_oas",
"bbb_oas",
"hy_minus_bbb",
"yc_10y_2y",
"yc_10y_3m",
"sofr",
"nfci",
"anfci",
"usdjpy",
"wti",
"gold",
]
for col in z_cols:
if col in df.columns:
df[f"{col}_z20"] = zscore_rolling(df[col], window=20)
# 20日移動平均乖離
ma_cols = ["vix", "hy_oas", "bbb_oas", "usdjpy", "wti", "gold"]
for col in ma_cols:
if col in df.columns:
ma20 = df[col].rolling(20).mean()
df[f"{col}_ma20_gap"] = df[col] - ma20
# IMM JPY特徴量
if "imm_jpy_rate" in df.columns:
df["imm_jpy_rate_chg_4w"] = df["imm_jpy_rate"] - df["imm_jpy_rate"].shift(20)
df["imm_jpy_rate_z20"] = zscore_rolling(df["imm_jpy_rate"], window=20)
imm_jpy_ma20 = df["imm_jpy_rate"].rolling(20).mean()
df["imm_jpy_rate_ma20_gap"] = df["imm_jpy_rate"] - imm_jpy_ma20
preferred_order = [
"dgs10",
"dgs2",
"dgs3mo",
"yc_10y_2y",
"yc_10y_3m",
"hy_oas",
"bbb_oas",
"hy_minus_bbb",
"vix",
"sofr",
"nfci",
"anfci",
"sloos",
"usdjpy",
"wti",
"gold",
"imm_jpy_rate",
"imm_jpy_rate_chg_4w",
"imm_jpy_rate_z20",
"imm_jpy_rate_ma20_gap",
]
others = [c for c in df.columns if c not in preferred_order]
df = df[[c for c in preferred_order if c in df.columns] + others]
df.index.name = "date"
return df
```
def init_research_db(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS crisis_features (
date TEXT NOT NULL,
feature_name TEXT NOT NULL,
value REAL,
PRIMARY KEY (date, feature_name)
);
""")
conn.commit()
def reset_crisis_features_table(db_path: str) -> None:
with sqlite3.connect(db_path) as conn:
conn.execute("DELETE FROM crisis_features;")
conn.commit()
def save_features_long(db_path: str, feature_df: pd.DataFrame) -> int:
out = feature_df.reset_index().melt(
id_vars="date",
var_name="feature_name",
value_name="value",
)
```
out["date"] = pd.to_datetime(out["date"]).dt.strftime("%Y-%m-%d")
out = out.replace({np.nan: None})
rows = list(out.itertuples(index=False, name=None))
with sqlite3.connect(db_path) as conn:
cur = conn.executemany("""
INSERT OR REPLACE INTO crisis_features(date, feature_name, value)
VALUES (?, ?, ?)
""", rows)
conn.commit()
return cur.rowcount
```
def main():
print("Loading series from market.db ...")
wide = load_series_wide(MARKET_DB, FEATURE_SERIES)
```
print(f"Loaded columns: {list(wide.columns)}")
print(f"Raw shape: {wide.shape}")
print(f"Raw date range: {wide.index.min().date()} -> {wide.index.max().date()}")
print("Loading IMM JPY from cot_legacy.db ...")
imm_jpy_weekly = load_imm_jpy_weekly(COT_DB)
print(f"IMM JPY weekly shape: {imm_jpy_weekly.shape}")
print(f"IMM JPY date range: {imm_jpy_weekly.index.min().date()} -> {imm_jpy_weekly.index.max().date()}")
print("Building crisis features ...")
feature_df = build_features(wide, imm_jpy_weekly)
print(f"Analysis start date: {ANALYSIS_START_DATE}")
print(f"Feature shape: {feature_df.shape}")
print(f"Feature date range: {feature_df.index.min().date()} -> {feature_df.index.max().date()}")
print("Latest rows:")
print(feature_df.tail(3).to_string())
imm_cols = [
"imm_jpy_rate",
"imm_jpy_rate_chg_4w",
"imm_jpy_rate_z20",
"imm_jpy_rate_ma20_gap",
]
print("Latest IMM rows:")
print(feature_df[imm_cols].tail(5).to_string())
print("Preparing research.sqlite ...")
init_research_db(RESEARCH_DB)
if RESET_CRISIS_FEATURES:
print("Resetting existing crisis_features ...")
reset_crisis_features_table(RESEARCH_DB)
print("Saving to research.sqlite ...")
n = save_features_long(RESEARCH_DB, feature_df)
print(f"Upserted {n} rows into crisis_features")
```
if **name** == "**main**":
main()
実行結果
修正後のスクリプトを実行します。
python build_crisis_features.py
実行すると、market.db から市場データを読み込み、続いて cot_legacy.db からJPYのIMMデータを読み込みます。
Loading series from market.db ... Loaded columns: ['anfci', 'bbb_oas', 'hy_oas', 'wti', 'usdjpy', 'dgs10', 'dgs2', 'dgs3mo', 'sloos', 'nfci', 'sofr', 'vix', 'gold'] Raw shape: (15484, 13) Raw date range: 1793-03-01 -> 2026-03-13 Loading IMM JPY from cot_legacy.db ... IMM JPY weekly shape: (585, 1) IMM JPY date range: 2015-01-06 -> 2026-03-17 Building crisis features ... Analysis start date: 1990-01-01 Feature shape: (13221, 51) Feature date range: 1990-01-01 -> 2026-03-13 Saving to research.sqlite ... Upserted 674271 rows into crisis_features
今回の実行では、特徴量の列数が 51 になりました。
前回の特徴量に対して、JPYのIMM関連の4列が追加されたためです。
imm_jpy_rateimm_jpy_rate_chg_4wimm_jpy_rate_z20imm_jpy_rate_ma20_gap
保存件数は 674271 行になりました。
これは、日次の行数と特徴量数を掛け合わせたlong形式の保存件数です。
IMM特徴量が保存されたか確認する
次に、research.sqlite 側で、IMM関連の特徴量が保存されているか確認します。
sqlite3 research.sqlite
まずは、imm_jpy で始まる特徴量の行数を確認します。
SELECT feature_name, COUNT(*) AS total_rows FROM crisis_features WHERE feature_name LIKE 'imm_jpy%' GROUP BY feature_name ORDER BY feature_name;
結果は次のようになりました。
imm_jpy_rate|13221 imm_jpy_rate_chg_4w|13221 imm_jpy_rate_ma20_gap|13221 imm_jpy_rate_z20|13221
4本とも 13221 行で揃っています。
これは、日次indexに対して、IMM関連の特徴量が正しく保存されたことを意味します。
NULLを含む行数と有効なデータ開始日
ただし、ここで注意点があります。
COUNT(*) で見ると、1990年からすべての行が存在しているように見えます。
しかし、JPYのIMM元データは2015年からしかありません。
そのため、1990年から2014年までは、行は存在していても値は NULL です。
まず、単純に MIN(date) を見ると次のようになります。
SELECT feature_name, MIN(date) AS start_date, MAX(date) AS end_date, COUNT(value) AS non_null_rows FROM crisis_features WHERE feature_name LIKE 'imm_jpy%' GROUP BY feature_name ORDER BY feature_name;
結果は次のとおりです。
imm_jpy_rate|1990-01-01|2026-03-13|4085 imm_jpy_rate_chg_4w|1990-01-01|2026-03-13|4065 imm_jpy_rate_ma20_gap|1990-01-01|2026-03-13|4066 imm_jpy_rate_z20|1990-01-01|2026-03-13|4066
この結果だけを見ると、imm_jpy_rate が1990年からあるように見えます。
しかし、これは feature_name の行が1990年から存在しているという意味であり、値が入っているという意味ではありません。
本当に確認したいのは、value IS NOT NULL の開始日です。
SELECT feature_name, MIN(date) AS first_non_null_date, MAX(date) AS last_non_null_date, COUNT(*) AS non_null_rows FROM crisis_features WHERE feature_name LIKE 'imm_jpy%' AND value IS NOT NULL GROUP BY feature_name ORDER BY feature_name;
結果は次のようになりました。
imm_jpy_rate|2015-01-06|2026-03-13|4085 imm_jpy_rate_chg_4w|2015-01-26|2026-03-13|4065 imm_jpy_rate_ma20_gap|2015-01-25|2026-03-13|4066 imm_jpy_rate_z20|2015-01-25|2026-03-13|4066
これで、JPYのIMM特徴量は2015年以降に有効な値が入っていることが確認できました。
今回の結果の判断
今回の実装は、処理としては成功です。
確認できたことは次のとおりです。
market.dbとcot_legacy.dbの2DB読み込みができた- COT LegacyからJPYのIMMデータを抽出できた
- 週次IMMデータを日次indexに前方補完して結合できた
imm_jpy_rate系4本を特徴量として生成できたresearch.sqliteのcrisis_featuresに保存できた
つまり、次の build_crisis_score.py で position_score を作る準備はできました。
注意点:2015年以前の危機比較には使えない
ただし、今回の結果で1つ大きな注意点があります。
現在の imm_jpy_rate は、元データの都合で2015年以降しか有効値がありません。
そのため、次のような過去の危機イベントを比較する場合、JPYのIMM特徴量は使えません。
- LTCM危機
- ITバブル崩壊
- 9.11
- リーマンショック前半
- 欧州債務危機の初期
一方で、2015年以降のイベントであれば使えます。
- チャイナショック
- コロナショック
- 英国トラスショック
- SVBショック
- 2024年以降の円キャリー巻き戻し局面
つまり、まずは2015年以降のイベントで検証するのが自然です。
次にやること
次の課題は2つあります。
1. build_crisis_score.py に position_score を追加する
今回追加した imm_jpy_rate 系の特徴量は、危機そのものを見るというより、投機筋のポジション偏りを見るためのデータです。
そのため、既存の金融ストレス系スコアとは分けて、まずは position_score のような形で扱うのがよさそうです。
たとえば、以下のような特徴量を候補にします。
imm_jpy_rateimm_jpy_rate_chg_4wimm_jpy_rate_z20imm_jpy_rate_ma20_gap
最初はJPYだけでスコアを作り、その後、EUR、GBP、CHF、CADなどにも広げていく方針がよさそうです。
2. 2015年以前のCOTデータを取得できるか確認する
もう1つの課題は、2015年以前のJPY COTデータです。
今回の cot_legacy.db では2015年以降しか入っていませんでした。
しかし、危機比較では1997年以降、少なくとも2000年代前半から使えるデータが欲しくなります。
そのため、次はCFTCの過去アーカイブから、2015年以前のLegacy Futures Onlyデータを取得できるか確認します。
これができれば、リーマンショックや欧州債務危機でもJPYのIMMポジションを使った検証ができるようになります。
まとめ
今回は、既存の build_crisis_features.py を2DB読み込み対応にし、JPYのIMMポジションを危機監視用特徴量へ追加しました。
変更点は大きくありません。
COT_DB = "cot_legacy.db"を追加load_imm_jpy_weekly()を追加- 週次IMMデータを日次indexへ前方補完
imm_jpy_rate系4本を生成research.sqliteのcrisis_featuresに保存
実行結果として、特徴量は 51 列になり、research.sqlite には 674271 行が保存されました。
SQL確認でも、imm_jpy_rate 系の4本が保存されていることを確認できました。
一方で、現在のJPY IMMデータは2015年以降しか有効値がないため、リーマンショック以前の危機比較には使えません。
次は、build_crisis_score.py に position_score を追加しつつ、2015年以前のCOTデータ取得も検討していきます。

コメント