yfinanceで通貨ペアの日足OHLCVを取得し、SQLiteのresearch.sqliteへ保存する

yfinanceで通貨ペアの日足OHLCVを取得し、SQLiteに保存する

IMMポジションやCOTデータを取得できるようになったので、次は為替レート側のデータを取得します。

今回は、yfinanceを使って主要通貨ペアの日足OHLCVを取得し、SQLiteの research.sqlite に保存します。

まずは日足データを最短で動かすことを優先し、データ品質や欠損が気になったら、あとでDukascopyなど別データソースに差し替えられる構成にします。

今回の方針

今回の方針は以下です。

  • 通貨ペアの日足OHLCVを取得する
  • 取得元はまずyfinanceを使う
  • 保存先は research.sqlite にする
  • trade用DBには後でsignalsだけ流す
  • ペア管理テーブルとOHLCVテーブルを分ける
  • 2回目以降は前回の最終日から差分取得する

まずは分析用データベースに為替の日足データを溜めていきます。

最初に取得する通貨ペア

まずは、クロス円、対ドル、人民元関連を入れます。

  • USDJPY
  • EURJPY
  • GBPJPY
  • AUDJPY
  • EURUSD
  • GBPUSD
  • AUDUSD
  • USDCNH
  • CNHJPY

yfinanceでは、為替ペアはおおむね XXXYYY=X 形式で指定できます。

例えば、USDJPYは以下です。

USDJPY=X

SQLiteスキーマを作成する

まず、SQLiteデータベースを作成します。

sqlite3 research.sqlite

以下のSQLを実行します。

CREATE TABLE IF NOT EXISTS pairs (
  pair_id INTEGER PRIMARY KEY AUTOINCREMENT,
  base   TEXT NOT NULL,
  quote  TEXT NOT NULL,
  source TEXT NOT NULL,
  symbol TEXT NOT NULL,
  UNIQUE(base, quote, source)
);

CREATE TABLE IF NOT EXISTS fx_ohlcv_d (
  pair_id INTEGER NOT NULL,
  date    TEXT NOT NULL,
  open    REAL,
  high    REAL,
  low     REAL,
  close   REAL,
  volume  REAL,
  PRIMARY KEY(pair_id, date),
  FOREIGN KEY(pair_id) REFERENCES pairs(pair_id)
);

CREATE INDEX IF NOT EXISTS idx_fx_ohlcv_d_date ON fx_ohlcv_d(date);

作成できたら、SQLiteを終了します。

.quit

テーブル設計

今回は、通貨ペアの管理と日足データを分けています。

テーブル 役割
pairs 通貨ペアと取得元シンボルの管理
fx_ohlcv_d 日足OHLCVデータの保存

fx_ohlcv_d は、pair_iddate を主キーにしています。

これにより、同じ通貨ペア・同じ日付のデータは重複しない構造になります。

Pythonスクリプトを作成する

次に、yfinanceで日足を取得してSQLiteに保存するPythonスクリプトを作成します。

vim fetch_fx_daily.py

最初に作成したスクリプトは以下です。

import sqlite3
from datetime import datetime
import pandas as pd
import yfinance as yf

DB_PATH = "research.sqlite"

PAIRS = [
    ("USD", "JPY", "yfinance", "USDJPY=X"),
    ("EUR", "JPY", "yfinance", "EURJPY=X"),
    ("GBP", "JPY", "yfinance", "GBPJPY=X"),
    ("AUD", "JPY", "yfinance", "AUDJPY=X"),
    ("EUR", "USD", "yfinance", "EURUSD=X"),
    ("GBP", "USD", "yfinance", "GBPUSD=X"),
    ("AUD", "USD", "yfinance", "AUDUSD=X"),
    ("USD", "CNH", "yfinance", "USDCNH=X"),
    ("CNH", "JPY", "yfinance", "CNHJPY=X"),
]

SCHEMA_SQL = """
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;

CREATE TABLE IF NOT EXISTS pairs (
  pair_id INTEGER PRIMARY KEY AUTOINCREMENT,
  base   TEXT NOT NULL,
  quote  TEXT NOT NULL,
  source TEXT NOT NULL,
  symbol TEXT NOT NULL,
  UNIQUE(base, quote, source)
);

CREATE TABLE IF NOT EXISTS fx_ohlcv_d (
  pair_id INTEGER NOT NULL,
  date    TEXT NOT NULL,
  open    REAL,
  high    REAL,
  low     REAL,
  close   REAL,
  volume  REAL,
  PRIMARY KEY(pair_id, date),
  FOREIGN KEY(pair_id) REFERENCES pairs(pair_id)
);

CREATE INDEX IF NOT EXISTS idx_fx_ohlcv_d_date ON fx_ohlcv_d(date);
"""

def ensure_schema(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA_SQL)

def upsert_pair(conn: sqlite3.Connection, base: str, quote: str, source: str, symbol: str) -> int:
    conn.execute(
        "INSERT OR IGNORE INTO pairs(base, quote, source, symbol) VALUES(?,?,?,?)",
        (base, quote, source, symbol),
    )

    conn.execute(
        "UPDATE pairs SET symbol=? WHERE base=? AND quote=? AND source=?",
        (symbol, base, quote, source),
    )

    row = conn.execute(
        "SELECT pair_id FROM pairs WHERE base=? AND quote=? AND source=?",
        (base, quote, source),
    ).fetchone()

    return int(row[0])

def last_date(conn: sqlite3.Connection, pair_id: int) -> str | None:
    row = conn.execute(
        "SELECT MAX(date) FROM fx_ohlcv_d WHERE pair_id=?",
        (pair_id,),
    ).fetchone()

    return row[0] if row and row[0] else None

def fetch_daily(symbol: str, start: str | None) -> pd.DataFrame:
    df = yf.download(
        symbol,
        start=start,
        interval="1d",
        auto_adjust=False,
        progress=False,
    )

    if df is None or df.empty:
        return pd.DataFrame()

    df = df.rename(columns={
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Volume": "volume",
    })

    df = df[["open", "high", "low", "close", "volume"]].copy()
    df.index = pd.to_datetime(df.index).tz_localize(None)
    df["date"] = df.index.strftime("%Y-%m-%d")

    return df[["date", "open", "high", "low", "close", "volume"]]

def insert_ohlcv(conn: sqlite3.Connection, pair_id: int, df: pd.DataFrame) -> int:
    if df.empty:
        return 0

    rows = [
        (
            pair_id,
            r.date,
            float(r.open),
            float(r.high),
            float(r.low),
            float(r.close),
            None if pd.isna(r.volume) else float(r.volume),
        )
        for r in df.itertuples(index=False)
    ]

    conn.executemany(
        """INSERT OR REPLACE INTO fx_ohlcv_d(pair_id,date,open,high,low,close,volume)
           VALUES(?,?,?,?,?,?,?)""",
        rows,
    )

    return len(rows)

def main():
    conn = sqlite3.connect(DB_PATH)

    try:
        ensure_schema(conn)

        total = 0

        for base, quote, source, symbol in PAIRS:
            pair_id = upsert_pair(conn, base, quote, source, symbol)
            ld = last_date(conn, pair_id)

            start = None

            if ld:
                start = ld

            df = fetch_daily(symbol, start=start)
            n = insert_ohlcv(conn, pair_id, df)
            conn.commit()

            print(f"{base}{quote} ({symbol}) pair_id={pair_id} start={start} rows={n}")
            total += n

        print(f"Done. inserted/updated rows = {total}")

    finally:
        conn.close()

if __name__ == "__main__":
    main()

最初のエラー:r.date が存在しない

実行すると、以下のエラーが出ました。

python fetch_fx_daily.py
Traceback (most recent call last):
  File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 136, in <module>
    main()
  File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 125, in main
    n = insert_ohlcv(conn, pair_id, df)
  File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 96, in insert_ohlcv
    rows = [
  File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 97, in <listcomp>
    (pair_id, r.date, float(r.open), float(r.high), float(r.low), float(r.close),
AttributeError: 'Pandas' object has no attribute 'date'

pandasの itertuples() で返ってくる行オブジェクトに、環境によっては r.date のような属性アクセスができないことが原因でした。

一度forループ方式に変更したが解決しなかった

最初は、リスト内包表記をやめて、forループでappendする形に変更しました。

def insert_ohlcv(conn: sqlite3.Connection, pair_id: int, df: pd.DataFrame) -> int:
    if df.empty:
        return 0

    rows = []

    for r in df.itertuples(index=False):
        rows.append((
            pair_id,
            r.date,
            float(r.open),
            float(r.high),
            float(r.low),
            float(r.close),
            None if pd.isna(r.volume) else float(r.volume),
        ))

    conn.executemany(
        """INSERT OR REPLACE INTO fx_ohlcv_d
           (pair_id, date, open, high, low, close, volume)
           VALUES (?,?,?,?,?,?,?)""",
        rows,
    )

    return len(rows)

しかし、この方法でも同じように r.date が見つからないエラーになりました。

AttributeError: 'Pandas' object has no attribute 'date'

つまり、属性アクセスを使わない方法に変える必要がありました。

最終修正:列順を固定してto_numpyで取り出す

最終的には、itertuples() の属性アクセスを使わず、列順を固定して to_numpy() で取り出す形にしました。

def insert_ohlcv(conn: sqlite3.Connection, pair_id: int, df: pd.DataFrame) -> int:
    if df.empty:
        return 0

    df2 = df[["date", "open", "high", "low", "close", "volume"]].copy()

    df2 = df2.where(pd.notnull(df2), None)

    rows = [(pair_id, *row) for row in df2.to_numpy().tolist()]

    conn.executemany(
        """INSERT OR REPLACE INTO fx_ohlcv_d
           (pair_id, date, open, high, low, close, volume)
           VALUES (?,?,?,?,?,?,?)""",
        rows,
    )

    return len(rows)

この形なら、列名を属性として参照しないため、pandas側の挙動に左右されにくくなります。

fetch_dailyも堅牢化する

yfinanceの戻り値は、場合によって列がMultiIndexになることがあります。

そのため、fetch_daily() も少し堅牢にしておきます。

def fetch_daily(symbol: str, start: str | None) -> pd.DataFrame:
    df = yf.download(
        symbol,
        start=start,
        interval="1d",
        auto_adjust=False,
        progress=False,
    )

    if df is None or df.empty:
        return pd.DataFrame()

    if isinstance(df.columns, pd.MultiIndex):
        df.columns = [c[0] for c in df.columns]

    need = ["Open", "High", "Low", "Close", "Volume"]
    missing = [c for c in need if c not in df.columns]

    if missing:
        return pd.DataFrame()

    df = df.rename(columns={
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Volume": "volume",
    }).copy()

    df.index = pd.to_datetime(df.index).tz_localize(None)
    df["date"] = df.index.strftime("%Y-%m-%d")

    return df[["date", "open", "high", "low", "close", "volume"]]

これで、MultiIndexになった場合や、想定した列が存在しない場合にも対応しやすくなります。

修正後の実行結果

修正後、再度実行します。

python fetch_fx_daily.py

結果は以下です。

USDJPY (USDJPY=X) pair_id=1 start=None rows=24
EURJPY (EURJPY=X) pair_id=2 start=None rows=24
GBPJPY (GBPJPY=X) pair_id=3 start=None rows=24
AUDJPY (AUDJPY=X) pair_id=4 start=None rows=24
EURUSD (EURUSD=X) pair_id=5 start=None rows=24
GBPUSD (GBPUSD=X) pair_id=6 start=None rows=24
AUDUSD (AUDUSD=X) pair_id=7 start=None rows=25
USDCNH (USDCNH=X) pair_id=8 start=None rows=1
CNHJPY (CNHJPY=X) pair_id=9 start=None rows=1
Done. inserted/updated rows = 171

これで、通貨ペアごとの日足データをSQLiteへ保存できました。

SQLiteで保存件数を確認する

ペアごとの保存件数を確認します。

sqlite3 research.sqlite "SELECT p.base||p.quote, COUNT(*) FROM fx_ohlcv_d d JOIN pairs p ON p.pair_id=d.pair_id GROUP BY p.pair_id;"

結果は以下です。

USDJPY|24
EURJPY|24
GBPJPY|24
AUDJPY|24
EURUSD|24
GBPUSD|24
AUDUSD|25
USDCNH|1
CNHJPY|1

クロス円や対ドルペアは24〜25行、CNH関連は1行だけ取得できました。

人民元関連はyfinance側の履歴や取得仕様によって短くなる可能性があるため、後で別ソースも検討します。

差分取得の仕組み

今回のスクリプトは、1回目と2回目以降で動きが変わります。

該当する処理は以下です。

ld = last_date(conn, pair_id)

start = None

if ld:
    start = ld

ld は、SQLiteに保存済みの最終日です。

まだデータがない場合、ldNone になります。

その場合、start=None のままyfinanceへ渡すため、yfinanceが返せる範囲のデータを取得します。

2回目以降は、保存済みの最終日を start として指定します。

同じ日付が再取得されても、SQLite側は INSERT OR REPLACE なので安全です。

取得のイメージ

動きとしては、以下のようになります。

  • 1回目:取得できる範囲をまとめて取得
  • 2回目以降:前回の最終日以降だけ取得
  • 同じ日付が含まれても INSERT OR REPLACE で上書き

例えば、1回目に2003年から2026年まで取得できた場合、翌日の2回目実行では直近の1日分だけ追加するような動きになります。

yfinanceで取れる履歴の目安

yfinanceで取得できる履歴は、通貨ペアやデータソース側の都合で変わります。

目安としては以下です。

ペア 開始目安
USDJPY 2003年ごろから
EURUSD 2003年ごろから
AUDUSD 2003年ごろから
EURJPY 2003年ごろから
USDCNH 2010年ごろから。ただし短い可能性あり
CNHJPY 2010年ごろから。ただし短い可能性あり

今回の取得では、CNH関連がかなり短かったため、必要なら別ソースを検討します。

今回の結果

今回の作業で、以下まで完了しました。

  • research.sqlite に通貨ペア管理テーブルを作成した
  • fx_ohlcv_d に日足OHLCVを保存する設計にした
  • yfinanceで主要通貨ペアの日足を取得した
  • pandasの itertuples() による属性アクセスエラーを修正した
  • yfinanceのMultiIndex列にも対応した
  • SQLiteへ INSERT OR REPLACE で保存できた
  • ペアごとの保存件数を確認できた

ハマりどころ

itertuplesの属性アクセスでエラーになる

df.itertuples(index=False) の戻り値に対して r.date のように属性アクセスしようとすると、環境によって失敗することがあります。

今回も以下のエラーが出ました。

AttributeError: 'Pandas' object has no attribute 'date'

最終的には、列順を固定し、to_numpy().tolist() で取り出す方法に変更しました。

yfinanceの列がMultiIndexになる場合がある

yfinanceの戻り値は、取得条件によって列がMultiIndexになる場合があります。

そのため、以下のようにフラット化しておくと安全です。

if isinstance(df.columns, pd.MultiIndex):
    df.columns = [c[0] for c in df.columns]

CNH関連は履歴が短い可能性がある

今回の取得では、USDCNHCNHJPY は1行しか入りませんでした。

人民元関連の分析を行う場合は、yfinance以外のデータソースも検討した方がよさそうです。

次にやること

次は、この為替データとCOTのJPY先物データを結合します。

具体的には、以下を行います。

  • USDJPYの日足を週次へ変換する
  • COTの cot_jpy と日付で結合する
  • USDJPYとIMMネットポジションを同じグラフで表示する
  • COT基準日で見るか、公表日ベースにずらすか検討する
  • IMMネットポジションの変化量やZスコアを特徴量化する

これにより、為替レートと投機筋ポジションの流れを同じ時間軸で確認できるようになります。

まとめ

yfinanceを使って、主要通貨ペアの日足OHLCVを取得し、SQLiteの research.sqlite に保存しました。

最初はpandasの行アクセスでエラーが出ましたが、列順を固定して to_numpy() で取り出す形に変更することで解決しました。

これで、USDJPYなどの為替レートと、CFTC/COTから取得したIMMポジションを結合する準備ができました。

次は、USDJPY価格とJPY先物の投機筋ネットポジションを結合し、為替レートとIMMポジションの関係を可視化していきます。

コメント

タイトルとURLをコピーしました