yfinanceで通貨ペアの日足OHLCVを取得し、SQLiteに保存する
IMMポジションやCOTデータを取得できるようになったので、次は為替レート側のデータを取得します。
今回は、yfinanceを使って主要通貨ペアの日足OHLCVを取得し、SQLiteの research.sqlite に保存します。
まずは日足データを最短で動かすことを優先し、データ品質や欠損が気になったら、あとでDukascopyなど別データソースに差し替えられる構成にします。
今回の方針
今回の方針は以下です。
- 通貨ペアの日足OHLCVを取得する
- 取得元はまずyfinanceを使う
- 保存先は
research.sqliteにする - trade用DBには後でsignalsだけ流す
- ペア管理テーブルとOHLCVテーブルを分ける
- 2回目以降は前回の最終日から差分取得する
まずは分析用データベースに為替の日足データを溜めていきます。
最初に取得する通貨ペア
まずは、クロス円、対ドル、人民元関連を入れます。
- USDJPY
- EURJPY
- GBPJPY
- AUDJPY
- EURUSD
- GBPUSD
- AUDUSD
- USDCNH
- CNHJPY
yfinanceでは、為替ペアはおおむね XXXYYY=X 形式で指定できます。
例えば、USDJPYは以下です。
USDJPY=X
SQLiteスキーマを作成する
まず、SQLiteデータベースを作成します。
sqlite3 research.sqlite
以下のSQLを実行します。
CREATE TABLE IF NOT EXISTS pairs ( pair_id INTEGER PRIMARY KEY AUTOINCREMENT, base TEXT NOT NULL, quote TEXT NOT NULL, source TEXT NOT NULL, symbol TEXT NOT NULL, UNIQUE(base, quote, source) ); CREATE TABLE IF NOT EXISTS fx_ohlcv_d ( pair_id INTEGER NOT NULL, date TEXT NOT NULL, open REAL, high REAL, low REAL, close REAL, volume REAL, PRIMARY KEY(pair_id, date), FOREIGN KEY(pair_id) REFERENCES pairs(pair_id) ); CREATE INDEX IF NOT EXISTS idx_fx_ohlcv_d_date ON fx_ohlcv_d(date);
作成できたら、SQLiteを終了します。
.quit
テーブル設計
今回は、通貨ペアの管理と日足データを分けています。
| テーブル | 役割 |
|---|---|
pairs |
通貨ペアと取得元シンボルの管理 |
fx_ohlcv_d |
日足OHLCVデータの保存 |
fx_ohlcv_d は、pair_id と date を主キーにしています。
これにより、同じ通貨ペア・同じ日付のデータは重複しない構造になります。
Pythonスクリプトを作成する
次に、yfinanceで日足を取得してSQLiteに保存するPythonスクリプトを作成します。
vim fetch_fx_daily.py
最初に作成したスクリプトは以下です。
import sqlite3
from datetime import datetime
import pandas as pd
import yfinance as yf
DB_PATH = "research.sqlite"
PAIRS = [
("USD", "JPY", "yfinance", "USDJPY=X"),
("EUR", "JPY", "yfinance", "EURJPY=X"),
("GBP", "JPY", "yfinance", "GBPJPY=X"),
("AUD", "JPY", "yfinance", "AUDJPY=X"),
("EUR", "USD", "yfinance", "EURUSD=X"),
("GBP", "USD", "yfinance", "GBPUSD=X"),
("AUD", "USD", "yfinance", "AUDUSD=X"),
("USD", "CNH", "yfinance", "USDCNH=X"),
("CNH", "JPY", "yfinance", "CNHJPY=X"),
]
SCHEMA_SQL = """
PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS pairs (
pair_id INTEGER PRIMARY KEY AUTOINCREMENT,
base TEXT NOT NULL,
quote TEXT NOT NULL,
source TEXT NOT NULL,
symbol TEXT NOT NULL,
UNIQUE(base, quote, source)
);
CREATE TABLE IF NOT EXISTS fx_ohlcv_d (
pair_id INTEGER NOT NULL,
date TEXT NOT NULL,
open REAL,
high REAL,
low REAL,
close REAL,
volume REAL,
PRIMARY KEY(pair_id, date),
FOREIGN KEY(pair_id) REFERENCES pairs(pair_id)
);
CREATE INDEX IF NOT EXISTS idx_fx_ohlcv_d_date ON fx_ohlcv_d(date);
"""
def ensure_schema(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA_SQL)
def upsert_pair(conn: sqlite3.Connection, base: str, quote: str, source: str, symbol: str) -> int:
conn.execute(
"INSERT OR IGNORE INTO pairs(base, quote, source, symbol) VALUES(?,?,?,?)",
(base, quote, source, symbol),
)
conn.execute(
"UPDATE pairs SET symbol=? WHERE base=? AND quote=? AND source=?",
(symbol, base, quote, source),
)
row = conn.execute(
"SELECT pair_id FROM pairs WHERE base=? AND quote=? AND source=?",
(base, quote, source),
).fetchone()
return int(row[0])
def last_date(conn: sqlite3.Connection, pair_id: int) -> str | None:
row = conn.execute(
"SELECT MAX(date) FROM fx_ohlcv_d WHERE pair_id=?",
(pair_id,),
).fetchone()
return row[0] if row and row[0] else None
def fetch_daily(symbol: str, start: str | None) -> pd.DataFrame:
df = yf.download(
symbol,
start=start,
interval="1d",
auto_adjust=False,
progress=False,
)
if df is None or df.empty:
return pd.DataFrame()
df = df.rename(columns={
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
"Volume": "volume",
})
df = df[["open", "high", "low", "close", "volume"]].copy()
df.index = pd.to_datetime(df.index).tz_localize(None)
df["date"] = df.index.strftime("%Y-%m-%d")
return df[["date", "open", "high", "low", "close", "volume"]]
def insert_ohlcv(conn: sqlite3.Connection, pair_id: int, df: pd.DataFrame) -> int:
if df.empty:
return 0
rows = [
(
pair_id,
r.date,
float(r.open),
float(r.high),
float(r.low),
float(r.close),
None if pd.isna(r.volume) else float(r.volume),
)
for r in df.itertuples(index=False)
]
conn.executemany(
"""INSERT OR REPLACE INTO fx_ohlcv_d(pair_id,date,open,high,low,close,volume)
VALUES(?,?,?,?,?,?,?)""",
rows,
)
return len(rows)
def main():
conn = sqlite3.connect(DB_PATH)
try:
ensure_schema(conn)
total = 0
for base, quote, source, symbol in PAIRS:
pair_id = upsert_pair(conn, base, quote, source, symbol)
ld = last_date(conn, pair_id)
start = None
if ld:
start = ld
df = fetch_daily(symbol, start=start)
n = insert_ohlcv(conn, pair_id, df)
conn.commit()
print(f"{base}{quote} ({symbol}) pair_id={pair_id} start={start} rows={n}")
total += n
print(f"Done. inserted/updated rows = {total}")
finally:
conn.close()
if __name__ == "__main__":
main()
最初のエラー:r.date が存在しない
実行すると、以下のエラーが出ました。
python fetch_fx_daily.py
Traceback (most recent call last):
File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 136, in <module>
main()
File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 125, in main
n = insert_ohlcv(conn, pair_id, df)
File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 96, in insert_ohlcv
rows = [
File "/Users/snowpool/aw10s/fx_tools/fetch_fx_daily.py", line 97, in <listcomp>
(pair_id, r.date, float(r.open), float(r.high), float(r.low), float(r.close),
AttributeError: 'Pandas' object has no attribute 'date'
pandasの itertuples() で返ってくる行オブジェクトに、環境によっては r.date のような属性アクセスができないことが原因でした。
一度forループ方式に変更したが解決しなかった
最初は、リスト内包表記をやめて、forループでappendする形に変更しました。
def insert_ohlcv(conn: sqlite3.Connection, pair_id: int, df: pd.DataFrame) -> int:
if df.empty:
return 0
rows = []
for r in df.itertuples(index=False):
rows.append((
pair_id,
r.date,
float(r.open),
float(r.high),
float(r.low),
float(r.close),
None if pd.isna(r.volume) else float(r.volume),
))
conn.executemany(
"""INSERT OR REPLACE INTO fx_ohlcv_d
(pair_id, date, open, high, low, close, volume)
VALUES (?,?,?,?,?,?,?)""",
rows,
)
return len(rows)
しかし、この方法でも同じように r.date が見つからないエラーになりました。
AttributeError: 'Pandas' object has no attribute 'date'
つまり、属性アクセスを使わない方法に変える必要がありました。
最終修正:列順を固定してto_numpyで取り出す
最終的には、itertuples() の属性アクセスを使わず、列順を固定して to_numpy() で取り出す形にしました。
def insert_ohlcv(conn: sqlite3.Connection, pair_id: int, df: pd.DataFrame) -> int:
if df.empty:
return 0
df2 = df[["date", "open", "high", "low", "close", "volume"]].copy()
df2 = df2.where(pd.notnull(df2), None)
rows = [(pair_id, *row) for row in df2.to_numpy().tolist()]
conn.executemany(
"""INSERT OR REPLACE INTO fx_ohlcv_d
(pair_id, date, open, high, low, close, volume)
VALUES (?,?,?,?,?,?,?)""",
rows,
)
return len(rows)
この形なら、列名を属性として参照しないため、pandas側の挙動に左右されにくくなります。
fetch_dailyも堅牢化する
yfinanceの戻り値は、場合によって列がMultiIndexになることがあります。
そのため、fetch_daily() も少し堅牢にしておきます。
def fetch_daily(symbol: str, start: str | None) -> pd.DataFrame:
df = yf.download(
symbol,
start=start,
interval="1d",
auto_adjust=False,
progress=False,
)
if df is None or df.empty:
return pd.DataFrame()
if isinstance(df.columns, pd.MultiIndex):
df.columns = [c[0] for c in df.columns]
need = ["Open", "High", "Low", "Close", "Volume"]
missing = [c for c in need if c not in df.columns]
if missing:
return pd.DataFrame()
df = df.rename(columns={
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
"Volume": "volume",
}).copy()
df.index = pd.to_datetime(df.index).tz_localize(None)
df["date"] = df.index.strftime("%Y-%m-%d")
return df[["date", "open", "high", "low", "close", "volume"]]
これで、MultiIndexになった場合や、想定した列が存在しない場合にも対応しやすくなります。
修正後の実行結果
修正後、再度実行します。
python fetch_fx_daily.py
結果は以下です。
USDJPY (USDJPY=X) pair_id=1 start=None rows=24 EURJPY (EURJPY=X) pair_id=2 start=None rows=24 GBPJPY (GBPJPY=X) pair_id=3 start=None rows=24 AUDJPY (AUDJPY=X) pair_id=4 start=None rows=24 EURUSD (EURUSD=X) pair_id=5 start=None rows=24 GBPUSD (GBPUSD=X) pair_id=6 start=None rows=24 AUDUSD (AUDUSD=X) pair_id=7 start=None rows=25 USDCNH (USDCNH=X) pair_id=8 start=None rows=1 CNHJPY (CNHJPY=X) pair_id=9 start=None rows=1 Done. inserted/updated rows = 171
これで、通貨ペアごとの日足データをSQLiteへ保存できました。
SQLiteで保存件数を確認する
ペアごとの保存件数を確認します。
sqlite3 research.sqlite "SELECT p.base||p.quote, COUNT(*) FROM fx_ohlcv_d d JOIN pairs p ON p.pair_id=d.pair_id GROUP BY p.pair_id;"
結果は以下です。
USDJPY|24 EURJPY|24 GBPJPY|24 AUDJPY|24 EURUSD|24 GBPUSD|24 AUDUSD|25 USDCNH|1 CNHJPY|1
クロス円や対ドルペアは24〜25行、CNH関連は1行だけ取得できました。
人民元関連はyfinance側の履歴や取得仕様によって短くなる可能性があるため、後で別ソースも検討します。
差分取得の仕組み
今回のスクリプトは、1回目と2回目以降で動きが変わります。
該当する処理は以下です。
ld = last_date(conn, pair_id)
start = None
if ld:
start = ld
ld は、SQLiteに保存済みの最終日です。
まだデータがない場合、ld は None になります。
その場合、start=None のままyfinanceへ渡すため、yfinanceが返せる範囲のデータを取得します。
2回目以降は、保存済みの最終日を start として指定します。
同じ日付が再取得されても、SQLite側は INSERT OR REPLACE なので安全です。
取得のイメージ
動きとしては、以下のようになります。
- 1回目:取得できる範囲をまとめて取得
- 2回目以降:前回の最終日以降だけ取得
- 同じ日付が含まれても
INSERT OR REPLACEで上書き
例えば、1回目に2003年から2026年まで取得できた場合、翌日の2回目実行では直近の1日分だけ追加するような動きになります。
yfinanceで取れる履歴の目安
yfinanceで取得できる履歴は、通貨ペアやデータソース側の都合で変わります。
目安としては以下です。
| ペア | 開始目安 |
|---|---|
| USDJPY | 2003年ごろから |
| EURUSD | 2003年ごろから |
| AUDUSD | 2003年ごろから |
| EURJPY | 2003年ごろから |
| USDCNH | 2010年ごろから。ただし短い可能性あり |
| CNHJPY | 2010年ごろから。ただし短い可能性あり |
今回の取得では、CNH関連がかなり短かったため、必要なら別ソースを検討します。
今回の結果
今回の作業で、以下まで完了しました。
research.sqliteに通貨ペア管理テーブルを作成したfx_ohlcv_dに日足OHLCVを保存する設計にした- yfinanceで主要通貨ペアの日足を取得した
- pandasの
itertuples()による属性アクセスエラーを修正した - yfinanceのMultiIndex列にも対応した
- SQLiteへ
INSERT OR REPLACEで保存できた - ペアごとの保存件数を確認できた
ハマりどころ
itertuplesの属性アクセスでエラーになる
df.itertuples(index=False) の戻り値に対して r.date のように属性アクセスしようとすると、環境によって失敗することがあります。
今回も以下のエラーが出ました。
AttributeError: 'Pandas' object has no attribute 'date'
最終的には、列順を固定し、to_numpy().tolist() で取り出す方法に変更しました。
yfinanceの列がMultiIndexになる場合がある
yfinanceの戻り値は、取得条件によって列がMultiIndexになる場合があります。
そのため、以下のようにフラット化しておくと安全です。
if isinstance(df.columns, pd.MultiIndex):
df.columns = [c[0] for c in df.columns]
CNH関連は履歴が短い可能性がある
今回の取得では、USDCNH と CNHJPY は1行しか入りませんでした。
人民元関連の分析を行う場合は、yfinance以外のデータソースも検討した方がよさそうです。
次にやること
次は、この為替データとCOTのJPY先物データを結合します。
具体的には、以下を行います。
- USDJPYの日足を週次へ変換する
- COTの
cot_jpyと日付で結合する - USDJPYとIMMネットポジションを同じグラフで表示する
- COT基準日で見るか、公表日ベースにずらすか検討する
- IMMネットポジションの変化量やZスコアを特徴量化する
これにより、為替レートと投機筋ポジションの流れを同じ時間軸で確認できるようになります。
まとめ
yfinanceを使って、主要通貨ペアの日足OHLCVを取得し、SQLiteの research.sqlite に保存しました。
最初はpandasの行アクセスでエラーが出ましたが、列順を固定して to_numpy() で取り出す形に変更することで解決しました。
これで、USDJPYなどの為替レートと、CFTC/COTから取得したIMMポジションを結合する準備ができました。
次は、USDJPY価格とJPY先物の投機筋ネットポジションを結合し、為替レートとIMMポジションの関係を可視化していきます。

コメント