CFTCのCOT Legacyデータを1986年からバックフィルして、IMMポジション分析用DBを整える

CFTCのCOT Legacyデータを1986年からバックフィルして、IMMポジション分析用DBを整える

FXや金利・コモディティの分析では、価格データだけでなく、投機筋がどちらに傾いているかを確認するためのポジションデータも重要になります。

今回は、CFTCが公開しているCOT Legacyデータを取得し、SQLiteに保存している既存の仕組みに対して、1986年から現在までの過去データをバックフィルする処理を追加しました。

もともとのスクリプトは、最新日付を確認して差分更新する運用向けの作りにしていました。しかし、そのままでは過去データの取り込みには向かないため、通常更新用とバックフィル用のスクリプトを分けることにしました。

今回やりたいこと

今回の目的は次の通りです。

  • CFTCのCOT Legacy Futures Onlyデータを1986年から取得する
  • 既存DBに入っているデータは重複登録しない
  • 年ごとの行数・列数・列名をログで確認できるようにする
  • 通常運用の差分更新スクリプトとは別に、バックフィル専用スクリプトを用意する
  • JPY、つまり日本円先物の取得開始日をSQLiteで確認する

ここで注意したいのは、「COT全体を1986年から取得できること」と「JPYの市場データが1986年から存在すること」は別問題だという点です。

最初に試したこと:DEFAULT_START_YEARを1986に変更

まず、既存の取得スクリプトで開始年を変更しました。

DEFAULT_START_YEAR = 1986

さらに、年ごとの取得状況を確認できるように、CSVを読み込んだ直後にログを追加しました。

print(f"  rows={len(df)}, cols={len(df.columns)}")
print(df.columns.tolist()[:10])

これで、各年のデータ件数や列数、先頭の列名を確認できます。CFTCの古いデータは、年によってフォーマット差がある可能性があるため、こうしたログはかなり重要です。

しかし、過去バックフィルは発動しなかった

実行すると、次のような結果になりました。

start_year=2025, end_year=2026, latest_date=2026-03-17
== 2025 ==
  rows=17316, cols=129
  no new rows
== 2026 ==
  rows=3866, cols=129
  no new rows
latest date in db: 2026-03-17
done.

一見すると、1986年から取得されていないように見えます。

理由は、既存スクリプトが「差分更新モード」として作ってあるためです。

既存DBに最新日付が入っている場合、スクリプトは次のように開始年を決めていました。

start_year = max(DEFAULT_START_YEAR, parse_cot_date(latest_date).year - 1)

つまり、DBに 2026-03-17 のデータがある場合、DEFAULT_START_YEAR = 1986 にしても、実際には2025年から再取得する動きになります。

これは通常運用では正しい挙動です。直近の取りこぼし対策として前年から再取得する形になっているためです。

ただし、今回やりたいのは過去データのバックフィルです。そこで、通常更新用のスクリプトとは別に、バックフィル専用スクリプトを作ることにしました。

通常更新用とバックフィル用を分ける

既存の get_imm_rate.py は通常更新用として残し、バックフィル用にコピーします。

cp get_imm_rate.py get_imm_rate_backfill.py

通常更新用は、今後も直近データの差分更新に使います。

一方、get_imm_rate_backfill.py は、1986年から現在までを総当たりで確認し、既存キーに存在しないデータだけを追加する用途にします。

バックフィル用スクリプト

バックフィル用では、開始年を固定値として持たせます。

START_YEAR = 1986
END_YEAR = datetime.now().year

そして、年ごとにCFTCのzipファイルを取得し、既存DBにない行だけを追加します。

import io
import zipfile
import requests
import pandas as pd
import sqlite3
from pathlib import Path
import re
from datetime import datetime

BASE = "https://www.cftc.gov/files/dea/history"
DB_PATH = "cot_legacy.db"
TABLE = "cot_legacy_fut_only"

START_YEAR = 1986
END_YEAR = datetime.now().year


def norm_col(s: str) -> str:
    s = s.strip().lower()
    s = re.sub(r"[^a-z0-9]+", "_", s)
    s = re.sub(r"_+", "_", s).strip("_")
    return s


def find_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
    norm_map = {norm_col(c): c for c in df.columns}
    for cand in candidates:
        key = norm_col(cand)
        if key in norm_map:
            return norm_map[key]
    return None


def table_exists(conn: sqlite3.Connection, table: str) -> bool:
    cur = conn.cursor()
    cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table,),
    )
    return cur.fetchone() is not None


def ensure_columns(conn: sqlite3.Connection, table: str, df: pd.DataFrame):
    cur = conn.cursor()
    cur.execute(f'PRAGMA table_info("{table}")')
    existing_cols = {row[1] for row in cur.fetchall()}
    missing = [c for c in df.columns if c not in existing_cols]

    for c in missing:
        cur.execute(f'ALTER TABLE "{table}" ADD COLUMN "{c}" TEXT')

    if missing:
        conn.commit()


def ensure_table(conn: sqlite3.Connection, df: pd.DataFrame):
    if not table_exists(conn, TABLE):
        df.head(0).to_sql(TABLE, conn, if_exists="append", index=False)


def get_existing_keys(conn: sqlite3.Connection, key_cols: list[str]) -> set[tuple]:
    cur = conn.cursor()
    cols = ", ".join([f'"{c}"' for c in key_cols])
    cur.execute(f'SELECT DISTINCT {cols} FROM "{TABLE}"')
    return set(tuple("" if v is None else str(v) for v in row) for row in cur.fetchall())


def download_zip(year: int) -> bytes:
    url = f"{BASE}/deacot{year}.zip"
    r = requests.get(url, timeout=60)
    r.raise_for_status()
    return r.content


def read_first_txt_from_zip(zip_bytes: bytes) -> pd.DataFrame:
    z = zipfile.ZipFile(io.BytesIO(zip_bytes))
    candidates = [n for n in z.namelist() if n.lower().endswith((".txt", ".csv"))]
    if not candidates:
        raise RuntimeError(f"no txt/csv found in zip: {z.namelist()[:10]}")
    name = candidates[0]
    with z.open(name) as f:
        return pd.read_csv(f)


def main():
    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)

    possible_date_cols = [
        "As of Date in Form YYYY-MM-DD",
        "As_of_Date_In_Form_YYYY-MM-DD",
        "Report_Date_as_YYYY-MM-DD",
        "As of Date in Form YYMMDD",
        "As_of_Date_In_Form_YYMMDD",
    ]
    possible_mkt_cols = [
        "Market_and_Exchange_Names",
        "Market and Exchange Names",
    ]
    possible_contract_cols = [
        "CFTC_Contract_Market_Code",
        "CFTC Contract Market Code",
    ]
    possible_commodity_cols = [
        "CFTC_Commodity_Code",
        "CFTC Commodity Code",
    ]

    with sqlite3.connect(DB_PATH) as conn:
        existing = None

        print(f"backfill start_year={START_YEAR}, end_year={END_YEAR}")

        for y in range(START_YEAR, END_YEAR + 1):
            print(f"== {y} ==")
            try:
                zip_bytes = download_zip(y)
                df = read_first_txt_from_zip(zip_bytes)
            except Exception as e:
                print(f"  skip year={y}, error={e}")
                continue

            print(f"  rows={len(df)}, cols={len(df.columns)}")
            print(f"  first 10 cols={df.columns.tolist()[:10]}")

            ensure_table(conn, df)
            ensure_columns(conn, TABLE, df)

            date_col = find_col(df, possible_date_cols)
            if date_col is None:
                print(f"  skip year={y}, date col not found")
                continue

            mkt_col = find_col(df, possible_mkt_cols)
            contract_col = find_col(df, possible_contract_cols)
            commodity_col = find_col(df, possible_commodity_cols)

            key_cols = []
            if mkt_col:
                key_cols.append(mkt_col)
            key_cols.append(date_col)
            if contract_col:
                key_cols.append(contract_col)
            if commodity_col:
                key_cols.append(commodity_col)

            if existing is None:
                try:
                    existing = get_existing_keys(conn, key_cols)
                except sqlite3.OperationalError:
                    existing = set()

            key_tuples = []
            for _, row in df.iterrows():
                key_tuples.append(
                    tuple("" if pd.isna(row[c]) else str(row[c]) for c in key_cols)
                )

            mask = [k not in existing for k in key_tuples]
            new_df = df[mask].copy()

            if new_df.empty:
                print("  no new rows")
                continue

            new_df.to_sql(TABLE, conn, if_exists="append", index=False)

            for _, row in new_df.iterrows():
                existing.add(
                    tuple("" if pd.isna(row[c]) else str(row[c]) for c in key_cols)
                )

            print(f"  inserted: {len(new_df)} rows")

        print("backfill done.")


if __name__ == "__main__":
    main()

実行結果

バックフィル用スクリプトを実行します。

python get_imm_rate_backfill.py

実行すると、1986年から順番に処理されました。

backfill start_year=1986, end_year=2026
== 1986 ==
  rows=1255, cols=129
  inserted: 1255 rows
== 1987 ==
  rows=1259, cols=129
  inserted: 1259 rows
== 1988 ==
  rows=1232, cols=129
  inserted: 1232 rows
== 1989 ==
  rows=1316, cols=129
  inserted: 1316 rows
== 1990 ==
  rows=1332, cols=129
  inserted: 1332 rows
== 1991 ==
  rows=1351, cols=129
  inserted: 1351 rows
== 1992 ==
  rows=1908, cols=129
  inserted: 1908 rows

1990年代から2005年までは、過去データが新規追加されました。

一方、2006年以降はすでにDBに入っていたため、基本的に no new rows になりました。

== 2006 ==
  rows=4926, cols=129
  no new rows
== 2007 ==
  rows=5394, cols=129
  no new rows
...
== 2026 ==
  rows=3866, cols=129
  no new rows
backfill done.

この結果から、既存データを壊さず、足りなかった古い年だけ追加できたことが分かります。

DtypeWarningについて

一部の年では、次のような警告が出ました。

DtypeWarning: Columns (...) have mixed types. Specify dtype option on import or set low_memory=False.

これは、同じ列の中に数値と文字列が混在している可能性がある、というpandasの警告です。

今回の処理では警告で止まっておらず、最後までバックフィルは完走しています。そのため、今回の段階では致命的な問題ではありません。

ただし、後で数値計算や特徴量生成に使う場合は、必要な列だけ明示的に数値変換する処理を別途入れた方が安全です。

SQLiteでJPYの取得期間を確認する

バックフィル後、SQLiteで日本円先物の取得期間を確認しました。

sqlite3 cot_legacy.db

まず、JPYの最古日と最新日を確認します。

SELECT
  MIN("As of Date in Form YYYY-MM-DD"),
  MAX("As of Date in Form YYYY-MM-DD")
FROM cot_legacy_fut_only
WHERE TRIM("Market and Exchange Names") = 'JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE';

結果は次の通りでした。

2000-08-29|2026-03-17

ここが重要です。

COT全体としては1986年からバックフィルできています。しかし、JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE という市場名で見ると、実データの開始日は 2000-08-29 でした。

つまり、「1986年から全体取得できた」と「JPYが1986年から存在する」は同じ意味ではありません。

JPYの先頭データを確認する

実際にJPYの先頭データも確認しました。

SELECT
  "As of Date in Form YYYY-MM-DD" AS date,
  TRIM("Market and Exchange Names") AS market_name,
  "Open Interest (All)" AS oi,
  "Noncommercial Positions-Long (All)" AS nc_long,
  "Noncommercial Positions-Short (All)" AS nc_short
FROM cot_legacy_fut_only
WHERE TRIM("Market and Exchange Names") = 'JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE'
ORDER BY "As of Date in Form YYYY-MM-DD"
LIMIT 20;

結果の一部です。

2000-08-29|JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE|77923|6943|19918
2000-09-05|JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE|71820|5476|12578
2000-09-12|JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE|92390|14416|8082
2000-09-19|JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE|38431|3734|7566
2000-09-26|JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE|43973|5306|9532

このように、対象市場名では2000年8月末からデータが入っています。

JPYの件数も確認する

件数と期間も確認しました。

SELECT
  COUNT(*) AS rows,
  MIN("As of Date in Form YYYY-MM-DD") AS start_date,
  MAX("As of Date in Form YYYY-MM-DD") AS end_date
FROM cot_legacy_fut_only
WHERE TRIM("Market and Exchange Names") = 'JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE';

結果は次の通りです。

1334|2000-08-29|2026-03-17

JPYについては、1334件の週次データが取得できていることが分かりました。

YENを含む市場名を確認する

念のため、YEN を含む市場名も確認しました。

SELECT DISTINCT TRIM("Market and Exchange Names") AS market_name
FROM cot_legacy_fut_only
WHERE "Market and Exchange Names" LIKE '%YEN%'
ORDER BY 1;

結果として、次のように複数の市場名が存在していました。

3-MO. EUROYEN - INTERNATIONAL MONETARY MARKET
3-MO. EUROYEN TIBOR - CHICAGO MERCANTILE EXCHANGE
3-MO. EUROYEN TIBOR - INTERNATIONAL MONETARY MARKET
3-MO. EUROYEN-LIBOR - INTERNATIONAL MONETARY MARKET
AUSSIE-YEN XRATE - NEW YORK COTTON EXCHANGE
EURO FX/JAPANESE YEN XRATE - CHICAGO MERCANTILE EXCHANGE
JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE
JAPANESE YEN - INTERNATIONAL MONETARY MARKET
JAPANESE YEN - PHILADELPHIA BOARD OF TRADE
U.S. DOLLARS-JAPANESE YEN - NEW YORK FUTURES EXCHANGE

この結果を見ると、単純に YEN で検索すると、日本円そのもの以外にも、ユーロ円、クロスレート、金利系の商品などが含まれることが分かります。

そのため、FXのJPYポジション分析に使う場合は、どの市場名を採用するかを明確に決めておく必要があります。

今回分かったこと

今回のバックフィルで分かったことを整理します。

  • 既存の get_imm_rate.py は差分更新用として正しく動いていた
  • DEFAULT_START_YEAR を1986に変えるだけでは、過去バックフィルにはならなかった
  • バックフィル用に get_imm_rate_backfill.py を分けることで、1986年から総当たり取得できた
  • 1986年から2005年までは新規データが追加された
  • 2006年以降は既存データがあり、基本的に no new rows になった
  • COT全体のバックフィルと、特定市場の開始日は別問題
  • JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE の開始日は 2000-08-29 だった

今後やること

今回でCOT Legacyデータのバックフィルはできました。

次にやることは、この生データをそのまま使うのではなく、分析しやすい形に整えることです。

  • JPY、EUR、GBP、AUD、CAD、CHFなど主要通貨の市場名を整理する
  • Noncommercial Long / Short からネットポジションを計算する
  • 建玉に対する比率を計算する
  • z-scoreや前年差分などの特徴量を作る
  • USD/JPYやDXY、金利、VIXなどと結合して、リスクオン・リスクオフ分析に使う

特にJPYについては、リスク回避局面で円が買われることも多いため、COTポジションと価格・金利・ボラティリティを組み合わせると、短期売買や危機監視の材料として使いやすくなりそうです。

まとめ

今回は、CFTCのCOT Legacyデータを1986年からバックフィルし、SQLiteに保存する処理を作りました。

最初は DEFAULT_START_YEAR を1986に変更すればよいと思いましたが、既存スクリプトは差分更新用のロジックになっていたため、過去データの取得には入りませんでした。

そこで、通常更新用とバックフィル用を分け、get_imm_rate_backfill.py として1986年から現在までを総当たりで確認する形にしました。

結果として、1986年から2005年までの未取得データを追加でき、2006年以降の既存データは重複登録せずに済みました。

ただし、COT全体を1986年から取得できても、JPYの市場データが1986年から存在するわけではありません。今回確認した限り、JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE の開始日は 2000-08-29 でした。

次は、このCOTデータから通貨別のネットポジションやz-scoreを作り、FX分析用の特徴量として使える形にしていきます。

コメント

タイトルとURLをコピーしました