IMMポジション取得スクリプトを差分更新対応にして、再実行で壊れない形に直す
前回までに、CFTCのCOTレポートからIMMポジションを取得し、SQLiteへ保存する仕組みを作っていました。
ただし、しばらく取得していなかったため、最新データを取り直そうとしたところ、既存レコードとの重複によってスクリプトが停止しました。
今回は、get_imm_rate.py を「毎回フル取得してappendする形」から、「DBの最新日付を見て差分更新する形」に修正します。
目的は次の3つです。
- 最新のIMMポジションを継続取得できるようにする
- 同じスクリプトを再実行しても重複エラーで止まらないようにする
- 金融危機分析やディーリング用の特徴量生成に使えるrawデータを維持する
発生したエラー
久しぶりに get_imm_rate.py を実行したところ、次のようなエラーが出ました。
python get_imm_rate.py == 2015 == sqlite3.IntegrityError: UNIQUE constraint failed: cot_legacy_fut_only.Market and Exchange Names, cot_legacy_fut_only.As of Date in Form YYYY-MM-DD, cot_legacy_fut_only.CFTC Contract Market Code, cot_legacy_fut_only.CFTC Commodity Code
長いTracebackは省略していますが、原因は単純です。
SQLiteの cot_legacy_fut_only テーブルには、次の組み合わせで一意制約を設定しています。
Market and Exchange NamesAs of Date in Form YYYY-MM-DDCFTC Contract Market CodeCFTC Commodity Code
つまり、同じ市場・同じ日付・同じ契約コード・同じ商品コードのデータを、もう一度 append しようとして失敗していました。
これは「最新取得に失敗した」というよりも、「すでに入っている過去年のデータを再投入しようとして止まった」という状態です。
なぜ修正が必要か
ディーリングや金融危機分析で使うなら、IMMポジションは一度取って終わりではありません。
毎週更新されるデータを継続的に取り込み、過去データと直近データを同じDB上で扱えるようにする必要があります。
そのため、単純に次のような形で毎回appendする設計は危険です。
new_df.to_sql(TABLE, conn, if_exists="append", index=False)
この形だと、同じ年を再取得しただけで重複エラーになります。
そこで、今回の方針は次のようにしました。
- DBから最新取得日を読む
- 最新取得日の年をもとに、再取得する開始年を決める
- 安全のため前年から再取得する
- 取得したデータから既存キーを除外する
- 新規分だけSQLiteへappendする
- DB側のUNIQUE制約は最終防衛線として維持する
修正前の問題点
今回のコードには、大きく2つの問題がありました。
1. to_sql が2回実行される可能性があった
メモ段階のコードでは、次のように to_sql が2回走る構造になっていました。
new_df.to_sql(TABLE, conn, if_exists="append", index=False) ensure_columns(conn, TABLE, new_df) new_df.to_sql(TABLE, conn, if_exists="append", index=False)
これでは、新規データであっても同じ内容を2回挿入しようとしてしまいます。
そのため、ensure_columns() を先に実行し、to_sql() は1回だけ実行する形に直しました。
2. 日付列の判定で YYMMDD を優先していた
最初の修正版では、日付列として As of Date in Form YYMMDD を拾ってしまい、次のような実行結果になりました。
python get_imm_rate.py start_year=2030, end_year=2026, latest_date=251230 latest date in db: 251230 done.
一見すると正常終了に見えますが、実際には何も取得できていません。
latest_date=251230 を日付として解釈した結果、2030年 扱いになり、start_year=2030 になっていました。
その結果、end_year=2026 より開始年が未来になり、取得ループが実行されない状態でした。
DBには As of Date in Form YYYY-MM-DD も入っていたので、こちらを優先して使うように修正します。
修正方針
今回の修正では、次の3点を中心に直しました。
START_YEAR = 2015固定をやめる- 重複判定キーをDBのUNIQUE制約に合わせる
YYYY-MM-DD形式の日付列を優先する
重複判定には、次の4列を使います。
key_cols = [ "Market and Exchange Names", "As of Date in Form YYYY-MM-DD", "CFTC Contract Market Code", "CFTC Commodity Code", ]
これにより、同じ市場・同じ日付・同じ契約コード・同じ商品コードのデータは、既存データとして除外できます。
修正版 get_imm_rate.py
以下が、今回修正した get_imm_rate.py です。
import io
import zipfile
import requests
import pandas as pd
import sqlite3
from pathlib import Path
import re
from datetime import datetime
BASE = "[https://www.cftc.gov/files/dea/history](https://www.cftc.gov/files/dea/history)"
DB_PATH = "cot_legacy.db"
TABLE = "cot_legacy_fut_only"
DEFAULT_START_YEAR = 2015
END_YEAR = datetime.now().year
def norm_col(s: str) -> str:
s = s.strip().lower()
s = re.sub(r"[^a-z0-9]+", "*", s)
s = re.sub(r"*+", "*", s).strip("*")
return s
def find_col(df: pd.DataFrame, candidates: list[str]) -> str | None:
norm_map = {norm_col(c): c for c in df.columns}
for cand in candidates:
key = norm_col(cand)
if key in norm_map:
return norm_map[key]
return None
def parse_cot_date(s: str):
s = str(s).strip()
```
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", s):
return pd.to_datetime(s, format="%Y-%m-%d")
if re.fullmatch(r"\d{6}", s):
return pd.to_datetime(s, format="%y%m%d")
return pd.to_datetime(s)
```
def table_exists(conn: sqlite3.Connection, table: str) -> bool:
cur = conn.cursor()
cur.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table,),
)
return cur.fetchone() is not None
def ensure_columns(conn: sqlite3.Connection, table: str, df: pd.DataFrame):
cur = conn.cursor()
cur.execute(f'PRAGMA table_info("{table}")')
existing_cols = {row[1] for row in cur.fetchall()}
missing = [c for c in df.columns if c not in existing_cols]
```
for c in missing:
cur.execute(f'ALTER TABLE "{table}" ADD COLUMN "{c}" TEXT')
if missing:
conn.commit()
```
def ensure_table(conn: sqlite3.Connection, df: pd.DataFrame):
if not table_exists(conn, TABLE):
df.head(0).to_sql(TABLE, conn, if_exists="append", index=False)
def get_latest_date(conn: sqlite3.Connection, date_col: str) -> str | None:
if not table_exists(conn, TABLE):
return None
```
cur = conn.cursor()
try:
cur.execute(f'SELECT MAX("{date_col}") FROM "{TABLE}"')
row = cur.fetchone()
if row and row[0]:
return str(row[0])
except sqlite3.OperationalError:
return None
return None
```
def get_existing_keys(conn: sqlite3.Connection, key_cols: list[str]) -> set[tuple]:
cur = conn.cursor()
cols = ", ".join([f'"{c}"' for c in key_cols])
cur.execute(f'SELECT DISTINCT {cols} FROM "{TABLE}"')
```
return set(
tuple("" if v is None else str(v) for v in row)
for row in cur.fetchall()
)
```
def download_zip(year: int) -> bytes:
url = f"{BASE}/deacot{year}.zip"
r = requests.get(url, timeout=60)
r.raise_for_status()
return r.content
def read_first_txt_from_zip(zip_bytes: bytes) -> pd.DataFrame:
z = zipfile.ZipFile(io.BytesIO(zip_bytes))
candidates = [
n for n in z.namelist()
if n.lower().endswith((".txt", ".csv"))
]
```
if not candidates:
raise RuntimeError(f"no txt/csv found in zip: {z.namelist()[:10]}")
name = candidates[0]
with z.open(name) as f:
return pd.read_csv(f)
```
def main():
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
```
# YYYY-MM-DD を優先する
possible_date_cols = [
"As of Date in Form YYYY-MM-DD",
"As_of_Date_In_Form_YYYY-MM-DD",
"Report_Date_as_YYYY-MM-DD",
"As of Date in Form YYMMDD",
"As_of_Date_In_Form_YYMMDD",
]
possible_mkt_cols = [
"Market_and_Exchange_Names",
"Market and Exchange Names",
]
possible_contract_cols = [
"CFTC_Contract_Market_Code",
"CFTC Contract Market Code",
]
possible_commodity_cols = [
"CFTC_Commodity_Code",
"CFTC Commodity Code",
]
with sqlite3.connect(DB_PATH) as conn:
existing = None
sample_year = DEFAULT_START_YEAR
sample_df = read_first_txt_from_zip(download_zip(sample_year))
ensure_table(conn, sample_df)
ensure_columns(conn, TABLE, sample_df)
date_col = find_col(sample_df, possible_date_cols)
if date_col is None:
raise ValueError(
f"date col not found. columns={list(sample_df.columns)[:50]}"
)
mkt_col = find_col(sample_df, possible_mkt_cols)
contract_col = find_col(sample_df, possible_contract_cols)
commodity_col = find_col(sample_df, possible_commodity_cols)
key_cols = []
if mkt_col:
key_cols.append(mkt_col)
key_cols.append(date_col)
if contract_col:
key_cols.append(contract_col)
if commodity_col:
key_cols.append(commodity_col)
latest_date = get_latest_date(conn, date_col)
if latest_date:
# 直近の取りこぼし対策として、前年から再取得する
start_year = max(
DEFAULT_START_YEAR,
parse_cot_date(latest_date).year - 1,
)
else:
start_year = DEFAULT_START_YEAR
print(
f"start_year={start_year}, "
f"end_year={END_YEAR}, "
f"latest_date={latest_date}"
)
for y in range(start_year, END_YEAR + 1):
print(f"== {y} ==")
zip_bytes = download_zip(y)
df = read_first_txt_from_zip(zip_bytes)
ensure_columns(conn, TABLE, df)
date_col_y = find_col(df, possible_date_cols)
if date_col_y is None:
raise ValueError(f"date col not found in year={y}")
mkt_col_y = find_col(df, possible_mkt_cols)
contract_col_y = find_col(df, possible_contract_cols)
commodity_col_y = find_col(df, possible_commodity_cols)
key_cols_y = []
if mkt_col_y:
key_cols_y.append(mkt_col_y)
key_cols_y.append(date_col_y)
if contract_col_y:
key_cols_y.append(contract_col_y)
if commodity_col_y:
key_cols_y.append(commodity_col_y)
if existing is None:
try:
existing = get_existing_keys(conn, key_cols_y)
except sqlite3.OperationalError:
existing = set()
key_tuples = []
for _, row in df.iterrows():
key_tuples.append(
tuple(
"" if pd.isna(row[c]) else str(row[c])
for c in key_cols_y
)
)
mask = [k not in existing for k in key_tuples]
new_df = df[mask].copy()
if new_df.empty:
print(" no new rows")
continue
new_df.to_sql(TABLE, conn, if_exists="append", index=False)
for _, row in new_df.iterrows():
existing.add(
tuple(
"" if pd.isna(row[c]) else str(row[c])
for c in key_cols_y
)
)
print(f" inserted: {len(new_df)} rows")
try:
cur = conn.cursor()
cur.execute(f'SELECT MAX("{date_col}") FROM "{TABLE}"')
max_date = cur.fetchone()[0]
print(f"latest date in db: {max_date}")
except Exception:
pass
print("done.")
```
if **name** == "**main**":
main()
実行結果
修正後に再実行すると、次の結果になりました。
python get_imm_rate.py start_year=2024, end_year=2026, latest_date=2025-12-30 == 2024 == no new rows == 2025 == no new rows == 2026 == inserted: 3866 rows latest date in db: 2026-03-17 done.
この結果から、次のことが確認できます。
start_year=2024になっている- DBの最新日付を見て、前年から再取得できている
- 2024年と2025年は既存データとしてスキップされている
- 2026年の未取得分だけ
3866 rows追加されている - DB内の最新日付が
2026-03-17まで更新されている
つまり、今回の目的だった「最新情報を継続取得できる」「再実行で落ちない」「raw履歴を壊さず蓄積できる」は満たせました。
今回の設計で良くなった点
DB最終日ベースで更新できる
以前のように 2015年から毎回すべて取得 する形ではなく、DBに入っている最新日付から開始年を決めるようにしました。
ただし、CFTC側の更新遅延や年またぎの取りこぼしを考えると、最新年だけ再取得するよりも、前年から再取得する方が安全です。
start_year = max( DEFAULT_START_YEAR, parse_cot_date(latest_date).year - 1, )
重複は挿入前に除外するため、前年から取り直してもDBは壊れません。
重複しても落ちにくい
取得したDataFrameをそのままappendせず、既存キーと比較して新規分だけ残すようにしました。
処理の流れは次の通りです。
- 既存DBから重複判定キーを取得する
- 新しく取得したDataFrame側でも同じキーを作る
- 既存キーに含まれる行を除外する
- 残った新規行だけappendする
さらにDB側のUNIQUE制約も維持しているため、スクリプト側の判定に漏れがあっても、DBが最終的に重複を防いでくれます。
raw保存と分析処理を分離できる
今回の get_imm_rate.py は、あくまでCOTのrawデータを保存する役割にしています。
分析用の特徴量は、別の build_crisis_features.py 側で作る方針です。
get_imm_rate.py:COT rawデータをcot_legacy.dbに保存するbuild_crisis_features.py:rawデータからIMM特徴量を作る
この分離にしておくと、ディーリング用の直近更新と、金融危機分析用の長期履歴を両立しやすくなります。
次に追加したいIMM特徴量
次は build_crisis_features.py を2DB読み込み対応にして、JPYのIMM特徴量を追加します。
最初に追加する特徴量は、次の4つで十分です。
imm_jpy_rateimm_jpy_rate_chg_4wimm_jpy_rate_z20imm_jpy_rate_ma20_gap
計算の基本は次の形です。
nc_net = noncommercial_long - noncommercial_short imm_jpy_rate = nc_net / open_interest
まず使う市場名は、次のものにします。
JAPANESE YEN - CHICAGO MERCANTILE EXCHANGE
JPYのIMMポジションを危機監視特徴量に入れられれば、為替・金利・ボラティリティ・信用スプレッドとあわせて、リスクオフ局面の分析に使いやすくなります。
作業の区切り
今回の修正で、IMMポジション取得スクリプトは、ひとまず運用に耐えられる形になりました。
ここまでの変更は、次のようにコミットしておきます。
git add get_imm_rate.py git commit -m "get imm position"
まとめ
今回は、CFTCのCOTレポートからIMMポジションを取得する get_imm_rate.py を、差分更新対応に修正しました。
最初のエラーは、同じレコードを再取得してappendしようとしたことによるUNIQUE制約違反でした。
そこで、DBの最新日付を見て取得開始年を決め、既存キーを除外してから新規分だけ保存する形に変更しました。
また、日付列の候補順で YYMMDD を優先してしまう問題もありましたが、YYYY-MM-DD を優先するように修正し、日付パース関数も追加しました。
最終的には、2026年分の未取得データだけが追加され、DBの最新日付も 2026-03-17 まで更新できました。
次は、今回保存したIMM rawデータを使って、build_crisis_features.py にJPYのIMM特徴量を追加していきます。

コメント