IMMポジション取得スクリプトを差分更新対応にして、再実行で壊れない形に直す

IMMポジション取得スクリプトを差分更新対応にして、再実行で壊れない形に直す

前回までに、CFTCのCOTレポートからIMMポジションを取得し、SQLiteへ保存する仕組みを作っていました。

ただし、しばらく取得していなかったため、最新データを取り直そうとしたところ、既存レコードとの重複によってスクリプトが停止しました。

今回は、get_imm_rate.py を「毎回フル取得してappendする形」から、「DBの最新日付を見て差分更新する形」に修正します。

目的は次の3つです。

  • 最新のIMMポジションを継続取得できるようにする
  • 同じスクリプトを再実行しても重複エラーで止まらないようにする
  • 金融危機分析やディーリング用の特徴量生成に使えるrawデータを維持する

発生したエラー

久しぶりに get_imm_rate.py を実行したところ、次のようなエラーが出ました。

python get_imm_rate.py

== 2015 ==
sqlite3.IntegrityError: UNIQUE constraint failed:
cot_legacy_fut_only.Market and Exchange Names,
cot_legacy_fut_only.As of Date in Form YYYY-MM-DD,
cot_legacy_fut_only.CFTC Contract Market Code,
cot_legacy_fut_only.CFTC Commodity Code

長いTracebackは省略していますが、原因は単純です。

SQLiteの cot_legacy_fut_only テーブルには、次の組み合わせで一意制約を設定しています。

  • Market and Exchange Names
  • As of Date in Form YYYY-MM-DD
  • CFTC Contract Market Code
  • CFTC Commodity Code

つまり、同じ市場・同じ日付・同じ契約コード・同じ商品コードのデータを、もう一度 append しようとして失敗していました。

これは「最新取得に失敗した」というよりも、「すでに入っている過去年のデータを再投入しようとして止まった」という状態です。

なぜ修正が必要か

ディーリングや金融危機分析で使うなら、IMMポジションは一度取って終わりではありません。

毎週更新されるデータを継続的に取り込み、過去データと直近データを同じDB上で扱えるようにする必要があります。

そのため、単純に次のような形で毎回appendする設計は危険です。

new_df.to_sql(TABLE, conn, if_exists="append", index=False)

この形だと、同じ年を再取得しただけで重複エラーになります。

そこで、今回の方針は次のようにしました。

  1. DBから最新取得日を読む
  2. 最新取得日の年をもとに、再取得する開始年を決める
  3. 安全のため前年から再取得する
  4. 取得したデータから既存キーを除外する
  5. 新規分だけSQLiteへappendする
  6. DB側のUNIQUE制約は最終防衛線として維持する

修正前の問題点

今回のコードには、大きく2つの問題がありました。

1. to_sql が2回実行される可能性があった

メモ段階のコードでは、次のように to_sql が2回走る構造になっていました。

new_df.to_sql(TABLE, conn, if_exists="append", index=False)
ensure_columns(conn, TABLE, new_df)
new_df.to_sql(TABLE, conn, if_exists="append", index=False)

これでは、新規データであっても同じ内容を2回挿入しようとしてしまいます。

そのため、ensure_columns() を先に実行し、to_sql() は1回だけ実行する形に直しました。

2. 日付列の判定で YYMMDD を優先していた

最初の修正版では、日付列として As of Date in Form YYMMDD を拾ってしまい、次のような実行結果になりました。

python get_imm_rate.py

start_year=2030, end_year=2026, latest_date=251230
latest date in db: 251230
done.

一見すると正常終了に見えますが、実際には何も取得できていません。

latest_date=251230 を日付として解釈した結果、2030年 扱いになり、start_year=2030 になっていました。

その結果、end_year=2026 より開始年が未来になり、取得ループが実行されない状態でした。

DBには As of Date in Form YYYY-MM-DD も入っていたので、こちらを優先して使うように修正します。

修正方針

今回の修正では、次の3点を中心に直しました。

  • START_YEAR = 2015 固定をやめる
  • 重複判定キーをDBのUNIQUE制約に合わせる
  • YYYY-MM-DD 形式の日付列を優先する

重複判定には、次の4列を使います。

key_cols = [
"Market and Exchange Names",
"As of Date in Form YYYY-MM-DD",
"CFTC Contract Market Code",
"CFTC Commodity Code",
]

これにより、同じ市場・同じ日付・同じ契約コード・同じ商品コードのデータは、既存データとして除外できます。

修正版 get_imm_rate.py

以下が、今回修正した get_imm_rate.py です。

import io
import zipfile
import requests
import pandas as pd
import sqlite3
from pathlib import Path
import re
from datetime import datetime

BASE = "[https://www.cftc.gov/files/dea/history](https://www.cftc.gov/files/dea/history)"
DB_PATH = "cot_legacy.db"
TABLE = "cot_legacy_fut_only"

DEFAULT_START_YEAR = 2015
END_YEAR = datetime.now().year

def norm_col(s: str) -> str:
s = s.strip().lower()
s = re.sub(r"[^a-z0-9]+", "*", s)
s = re.sub(r"*+", "*", s).strip("*")
return s

def find_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
norm_map = {norm_col(c): c for c in df.columns}
for cand in candidates:
key = norm_col(cand)
if key in norm_map:
return norm_map[key]
return None

def parse_cot_date(s: str):
s = str(s).strip()

```
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", s):
    return pd.to_datetime(s, format="%Y-%m-%d")

if re.fullmatch(r"\d{6}", s):
    return pd.to_datetime(s, format="%y%m%d")

return pd.to_datetime(s)
```

def table_exists(conn: sqlite3.Connection, table: str) -> bool:
cur = conn.cursor()
cur.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table,),
)
return cur.fetchone() is not None

def ensure_columns(conn: sqlite3.Connection, table: str, df: pd.DataFrame):
cur = conn.cursor()
cur.execute(f'PRAGMA table_info("{table}")')
existing_cols = {row[1] for row in cur.fetchall()}
missing = [c for c in df.columns if c not in existing_cols]

```
for c in missing:
    cur.execute(f'ALTER TABLE "{table}" ADD COLUMN "{c}" TEXT')

if missing:
    conn.commit()
```

def ensure_table(conn: sqlite3.Connection, df: pd.DataFrame):
if not table_exists(conn, TABLE):
df.head(0).to_sql(TABLE, conn, if_exists="append", index=False)

def get_latest_date(conn: sqlite3.Connection, date_col: str) -> str | None:
if not table_exists(conn, TABLE):
return None

```
cur = conn.cursor()

try:
    cur.execute(f'SELECT MAX("{date_col}") FROM "{TABLE}"')
    row = cur.fetchone()

    if row and row[0]:
        return str(row[0])

except sqlite3.OperationalError:
    return None

return None
```

def get_existing_keys(conn: sqlite3.Connection, key_cols: list[str]) -> set[tuple]:
cur = conn.cursor()
cols = ", ".join([f'"{c}"' for c in key_cols])
cur.execute(f'SELECT DISTINCT {cols} FROM "{TABLE}"')

```
return set(
    tuple("" if v is None else str(v) for v in row)
    for row in cur.fetchall()
)
```

def download_zip(year: int) -> bytes:
url = f"{BASE}/deacot{year}.zip"
r = requests.get(url, timeout=60)
r.raise_for_status()
return r.content

def read_first_txt_from_zip(zip_bytes: bytes) -> pd.DataFrame:
z = zipfile.ZipFile(io.BytesIO(zip_bytes))
candidates = [
n for n in z.namelist()
if n.lower().endswith((".txt", ".csv"))
]

```
if not candidates:
    raise RuntimeError(f"no txt/csv found in zip: {z.namelist()[:10]}")

name = candidates[0]

with z.open(name) as f:
    return pd.read_csv(f)
```

def main():
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)

```
# YYYY-MM-DD を優先する
possible_date_cols = [
    "As of Date in Form YYYY-MM-DD",
    "As_of_Date_In_Form_YYYY-MM-DD",
    "Report_Date_as_YYYY-MM-DD",
    "As of Date in Form YYMMDD",
    "As_of_Date_In_Form_YYMMDD",
]

possible_mkt_cols = [
    "Market_and_Exchange_Names",
    "Market and Exchange Names",
]

possible_contract_cols = [
    "CFTC_Contract_Market_Code",
    "CFTC Contract Market Code",
]

possible_commodity_cols = [
    "CFTC_Commodity_Code",
    "CFTC Commodity Code",
]

with sqlite3.connect(DB_PATH) as conn:
    existing = None

    sample_year = DEFAULT_START_YEAR
    sample_df = read_first_txt_from_zip(download_zip(sample_year))

    ensure_table(conn, sample_df)
    ensure_columns(conn, TABLE, sample_df)

    date_col = find_col(sample_df, possible_date_cols)

    if date_col is None:
        raise ValueError(
            f"date col not found. columns={list(sample_df.columns)[:50]}"
        )

    mkt_col = find_col(sample_df, possible_mkt_cols)
    contract_col = find_col(sample_df, possible_contract_cols)
    commodity_col = find_col(sample_df, possible_commodity_cols)

    key_cols = []

    if mkt_col:
        key_cols.append(mkt_col)

    key_cols.append(date_col)

    if contract_col:
        key_cols.append(contract_col)

    if commodity_col:
        key_cols.append(commodity_col)

    latest_date = get_latest_date(conn, date_col)

    if latest_date:
        # 直近の取りこぼし対策として、前年から再取得する
        start_year = max(
            DEFAULT_START_YEAR,
            parse_cot_date(latest_date).year - 1,
        )
    else:
        start_year = DEFAULT_START_YEAR

    print(
        f"start_year={start_year}, "
        f"end_year={END_YEAR}, "
        f"latest_date={latest_date}"
    )

    for y in range(start_year, END_YEAR + 1):
        print(f"== {y} ==")

        zip_bytes = download_zip(y)
        df = read_first_txt_from_zip(zip_bytes)

        ensure_columns(conn, TABLE, df)

        date_col_y = find_col(df, possible_date_cols)

        if date_col_y is None:
            raise ValueError(f"date col not found in year={y}")

        mkt_col_y = find_col(df, possible_mkt_cols)
        contract_col_y = find_col(df, possible_contract_cols)
        commodity_col_y = find_col(df, possible_commodity_cols)

        key_cols_y = []

        if mkt_col_y:
            key_cols_y.append(mkt_col_y)

        key_cols_y.append(date_col_y)

        if contract_col_y:
            key_cols_y.append(contract_col_y)

        if commodity_col_y:
            key_cols_y.append(commodity_col_y)

        if existing is None:
            try:
                existing = get_existing_keys(conn, key_cols_y)
            except sqlite3.OperationalError:
                existing = set()

        key_tuples = []

        for _, row in df.iterrows():
            key_tuples.append(
                tuple(
                    "" if pd.isna(row[c]) else str(row[c])
                    for c in key_cols_y
                )
            )

        mask = [k not in existing for k in key_tuples]
        new_df = df[mask].copy()

        if new_df.empty:
            print("  no new rows")
            continue

        new_df.to_sql(TABLE, conn, if_exists="append", index=False)

        for _, row in new_df.iterrows():
            existing.add(
                tuple(
                    "" if pd.isna(row[c]) else str(row[c])
                    for c in key_cols_y
                )
            )

        print(f"  inserted: {len(new_df)} rows")

    try:
        cur = conn.cursor()
        cur.execute(f'SELECT MAX("{date_col}") FROM "{TABLE}"')
        max_date = cur.fetchone()[0]
        print(f"latest date in db: {max_date}")

    except Exception:
        pass

print("done.")
```

if **name** == "**main**":
main()

実行結果

修正後に再実行すると、次の結果になりました。

python get_imm_rate.py

start_year=2024, end_year=2026, latest_date=2025-12-30
== 2024 ==
no new rows
== 2025 ==
no new rows
== 2026 ==
inserted: 3866 rows
latest date in db: 2026-03-17
done.

この結果から、次のことが確認できます。

  • start_year=2024 になっている
  • DBの最新日付を見て、前年から再取得できている
  • 2024年と2025年は既存データとしてスキップされている
  • 2026年の未取得分だけ 3866 rows 追加されている
  • DB内の最新日付が 2026-03-17 まで更新されている

つまり、今回の目的だった「最新情報を継続取得できる」「再実行で落ちない」「raw履歴を壊さず蓄積できる」は満たせました。

今回の設計で良くなった点

DB最終日ベースで更新できる

以前のように 2015年から毎回すべて取得 する形ではなく、DBに入っている最新日付から開始年を決めるようにしました。

ただし、CFTC側の更新遅延や年またぎの取りこぼしを考えると、最新年だけ再取得するよりも、前年から再取得する方が安全です。

start_year = max(
DEFAULT_START_YEAR,
parse_cot_date(latest_date).year - 1,
)

重複は挿入前に除外するため、前年から取り直してもDBは壊れません。

重複しても落ちにくい

取得したDataFrameをそのままappendせず、既存キーと比較して新規分だけ残すようにしました。

処理の流れは次の通りです。

  1. 既存DBから重複判定キーを取得する
  2. 新しく取得したDataFrame側でも同じキーを作る
  3. 既存キーに含まれる行を除外する
  4. 残った新規行だけappendする

さらにDB側のUNIQUE制約も維持しているため、スクリプト側の判定に漏れがあっても、DBが最終的に重複を防いでくれます。

raw保存と分析処理を分離できる

今回の get_imm_rate.py は、あくまでCOTのrawデータを保存する役割にしています。

分析用の特徴量は、別の build_crisis_features.py 側で作る方針です。

  • get_imm_rate.py:COT rawデータを cot_legacy.db に保存する
  • build_crisis_features.py:rawデータからIMM特徴量を作る

この分離にしておくと、ディーリング用の直近更新と、金融危機分析用の長期履歴を両立しやすくなります。

次に追加したいIMM特徴量

次は build_crisis_features.py を2DB読み込み対応にして、JPYのIMM特徴量を追加します。

最初に追加する特徴量は、次の4つで十分です。

  • imm_jpy_rate
  • imm_jpy_rate_chg_4w
  • imm_jpy_rate_z20
  • imm_jpy_rate_ma20_gap

計算の基本は次の形です。

nc_net = noncommercial_long - noncommercial_short
imm_jpy_rate = nc_net / open_interest

まず使う市場名は、次のものにします。

JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE

JPYのIMMポジションを危機監視特徴量に入れられれば、為替・金利・ボラティリティ・信用スプレッドとあわせて、リスクオフ局面の分析に使いやすくなります。

作業の区切り

今回の修正で、IMMポジション取得スクリプトは、ひとまず運用に耐えられる形になりました。

ここまでの変更は、次のようにコミットしておきます。

git add get_imm_rate.py
git commit -m "get imm position"

まとめ

今回は、CFTCのCOTレポートからIMMポジションを取得する get_imm_rate.py を、差分更新対応に修正しました。

最初のエラーは、同じレコードを再取得してappendしようとしたことによるUNIQUE制約違反でした。

そこで、DBの最新日付を見て取得開始年を決め、既存キーを除外してから新規分だけ保存する形に変更しました。

また、日付列の候補順で YYMMDD を優先してしまう問題もありましたが、YYYY-MM-DD を優先するように修正し、日付パース関数も追加しました。

最終的には、2026年分の未取得データだけが追加され、DBの最新日付も 2026-03-17 まで更新できました。

次は、今回保存したIMM rawデータを使って、build_crisis_features.py にJPYのIMM特徴量を追加していきます。

コメント

タイトルとURLをコピーしました