WTI・Gold・USDJPY・S&P500・DXYをyfinanceで取得し、SQLiteへ保存する単体テスト

WTI・Gold・USDJPY・S&P500・DXYをyfinanceで取得し、SQLiteへ保存する単体テスト

為替・金・WTI・米金利などを集めて相関分析するためのデータ収集基盤を作り始めました。

今回は、いきなり全銘柄をまとめて取得するのではなく、まずは重要な銘柄を1つずつ単体テストします。

対象は以下です。

  • WTI原油:CL=F
  • Gold先物:GC=F
  • USDJPY:JPY=X
  • S&P500:^GSPC
  • DXY:DX-Y.NYB

それぞれ、yfinanceで1時間足を取得し、SQLiteの market.db に保存できるか確認します。

今回の目的

相関分析や市場環境の変化を確認するには、複数資産の時系列データが必要です。

ただし、yfinanceはティッカーによって取得できたり、空になったり、履歴が短かったりすることがあります。

そのため、まずは1銘柄ずつ単体スクリプトで確認します。

  • 取得できるか
  • 行数は十分か
  • 1時間足が取れるか
  • SQLiteへ保存できるか
  • 後で相関計算に使えそうか

最初に単体で確認しておくと、あとでまとめて取得するスクリプトに統合しやすくなります。

SQLiteの保存テーブル

今回は、すべての銘柄を共通の prices テーブルに保存します。

テーブル定義は各スクリプト内で作成します。

CREATE TABLE IF NOT EXISTS prices (
  ticker TEXT NOT NULL,
  ts     TEXT NOT NULL,
  open   REAL,
  high   REAL,
  low    REAL,
  close  REAL,
  volume REAL,
  PRIMARY KEY (ticker, ts)
);

tickerts を主キーにしているため、同じ銘柄・同じ時刻のデータは INSERT OR REPLACE で上書きできます。

WTIを取得する

まずはWTI原油を取得します。

yfinanceでは、WTI先物は CL=F で取得できます。

過去に別の取得方法が途中で使えなくなったことがあったため、今回はまずyfinanceで単体テストします。

vim scripts/fetch_wti_only.py

内容は以下です。

import datetime as dt
import sqlite3

import pandas as pd
import yfinance as yf

DB_PATH = "./market.db"
TICKER = "CL=F"
INTERVAL = "1h"
LOOKBACK_DAYS = 60

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS prices (
  ticker TEXT NOT NULL,
  ts     TEXT NOT NULL,
  open   REAL,
  high   REAL,
  low    REAL,
  close  REAL,
  volume REAL,
  PRIMARY KEY (ticker, ts)
);
"""

def fetch_yf(ticker: str, interval: str, lookback_days: int) -> pd.DataFrame:
    end = dt.datetime.utcnow()
    start = end - dt.timedelta(days=lookback_days)

    df = yf.download(
        tickers=ticker,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=False,
        progress=False,
        threads=False,
    )

    if df is None or df.empty:
        return df

    if hasattr(df.columns, "levels") and len(df.columns.levels) > 1:
        df.columns = df.columns.get_level_values(0)

    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)

    return df

def upsert_prices(conn: sqlite3.Connection, ticker: str, df: pd.DataFrame):
    rows = []

    for ts, r in df.iterrows():
        rows.append((
            ticker,
            ts.to_pydatetime().isoformat(),
            float(r.get("Open")) if pd.notna(r.get("Open")) else None,
            float(r.get("High")) if pd.notna(r.get("High")) else None,
            float(r.get("Low")) if pd.notna(r.get("Low")) else None,
            float(r.get("Close")) if pd.notna(r.get("Close")) else None,
            float(r.get("Volume")) if pd.notna(r.get("Volume")) else None,
        ))

    conn.executemany("""
        INSERT OR REPLACE INTO prices
        (ticker, ts, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, rows)

def main():
    print(f"[WTI] fetching {TICKER} interval={INTERVAL} lookback_days={LOOKBACK_DAYS}")

    df = fetch_yf(TICKER, INTERVAL, LOOKBACK_DAYS)

    if df is None or df.empty:
        print("[WTI] empty result")
        print("対策: LOOKBACK_DAYSを30/14に下げる、INTERVALを1dにする")
        return

    print("[WTI] fetched rows:", len(df))
    print(df.tail(5))

    conn = sqlite3.connect(DB_PATH)

    try:
        conn.executescript(SCHEMA_SQL)
        upsert_prices(conn, TICKER, df)
        conn.commit()
    finally:
        conn.close()

    print(f"[WTI] saved to {DB_PATH} (table=prices)")

if __name__ == "__main__":
    main()

実行します。

python scripts/fetch_wti_only.py

結果は以下です。

[WTI] fetching CL=F interval=1h lookback_days=60
[WTI] fetched rows: 840
Price                Adj Close      Close  ...       Open  Volume
Datetime                                   ...                   
2026-02-10 17:00:00  64.070000  64.070000  ...  63.970001   14980
2026-02-10 18:00:00  63.840000  63.840000  ...  64.070000   25952
2026-02-10 19:00:00  64.139999  64.139999  ...  63.840000   41996
2026-02-10 20:00:00  64.300003  64.300003  ...  64.139999   12651
2026-02-10 21:00:00  64.180000  64.180000  ...  64.309998    2080

[5 rows x 6 columns]
[WTI] saved to ./market.db (table=prices)

WTIは1時間足で840行取得でき、SQLiteへ保存できました。

Goldを取得する

次にGoldを取得します。

yfinanceでは XAUUSD=X が使える場合もありますが、環境によって空になることがあります。

そのため、まずはGold先物の GC=F を使います。

vim scripts/fetch_gold_only.py
import datetime as dt
import sqlite3

import pandas as pd
import yfinance as yf

DB_PATH = "./market.db"
TICKER = "GC=F"
INTERVAL = "1h"
LOOKBACK_DAYS = 60

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS prices (
  ticker TEXT NOT NULL,
  ts     TEXT NOT NULL,
  open   REAL,
  high   REAL,
  low    REAL,
  close  REAL,
  volume REAL,
  PRIMARY KEY (ticker, ts)
);
"""

def fetch_yf(ticker: str, interval: str, lookback_days: int) -> pd.DataFrame:
    end = dt.datetime.utcnow()
    start = end - dt.timedelta(days=lookback_days)

    df = yf.download(
        tickers=ticker,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=False,
        progress=False,
        threads=False,
    )

    if df is None or df.empty:
        return df

    if hasattr(df.columns, "levels") and len(df.columns.levels) > 1:
        df.columns = df.columns.get_level_values(0)

    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)

    return df

def upsert_prices(conn: sqlite3.Connection, ticker: str, df: pd.DataFrame):
    rows = []

    for ts, r in df.iterrows():
        rows.append((
            ticker,
            ts.to_pydatetime().isoformat(),
            float(r.get("Open")) if pd.notna(r.get("Open")) else None,
            float(r.get("High")) if pd.notna(r.get("High")) else None,
            float(r.get("Low")) if pd.notna(r.get("Low")) else None,
            float(r.get("Close")) if pd.notna(r.get("Close")) else None,
            float(r.get("Volume")) if pd.notna(r.get("Volume")) else None,
        ))

    conn.executemany("""
        INSERT OR REPLACE INTO prices
        (ticker, ts, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, rows)

def main():
    print(f"[GOLD] fetching {TICKER} interval={INTERVAL} lookback_days={LOOKBACK_DAYS}")

    df = fetch_yf(TICKER, INTERVAL, LOOKBACK_DAYS)

    if df is None or df.empty:
        print("[GOLD] empty result")
        print("対策: LOOKBACK_DAYSを30/14に下げる or INTERVALを1dにする")
        print("代替: XAUUSD=X が取れるか試す / それもダメなら別ソースへ")
        return

    print("[GOLD] fetched rows:", len(df))
    print(df.tail(5))

    conn = sqlite3.connect(DB_PATH)

    try:
        conn.executescript(SCHEMA_SQL)
        upsert_prices(conn, TICKER, df)
        conn.commit()
    finally:
        conn.close()

    print(f"[GOLD] saved to {DB_PATH} (table=prices)")

if __name__ == "__main__":
    main()

実行します。

python scripts/fetch_gold_only.py

結果は以下です。

[GOLD] fetching GC=F interval=1h lookback_days=60
[GOLD] fetched rows: 842
Price                  Adj Close        Close  ...         Open  Volume
Datetime                                       ...                     
2026-02-10 17:00:00  5049.700195  5049.700195  ...  5046.399902    4121
2026-02-10 18:00:00  5036.000000  5036.000000  ...  5049.100098    6989
2026-02-10 19:00:00  5050.799805  5050.799805  ...  5035.299805    2822
2026-02-10 20:00:00  5052.899902  5052.899902  ...  5050.500000    3995
2026-02-10 21:00:00  5050.600098  5050.600098  ...  5052.000000     703

[5 rows x 6 columns]
[GOLD] saved to ./market.db (table=prices)

Goldも1時間足で842行取得でき、SQLiteへ保存できました。

これで、リスク回避資産として使いやすいGoldと、インフレ・景気シグナルとして使いやすいWTIが揃いました。

USDJPYを取得する

次に、通貨側の軸としてUSDJPYを取得します。

yfinanceではUSDJPYは JPY=X で取得できます。

vim scripts/fetch_usdjpy_only.py
import datetime as dt
import sqlite3

import pandas as pd
import yfinance as yf

DB_PATH = "./market.db"
TICKER = "JPY=X"
INTERVAL = "1h"
LOOKBACK_DAYS = 60

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS prices (
  ticker TEXT NOT NULL,
  ts     TEXT NOT NULL,
  open   REAL,
  high   REAL,
  low    REAL,
  close  REAL,
  volume REAL,
  PRIMARY KEY (ticker, ts)
);
"""

def fetch_yf(ticker: str, interval: str, lookback_days: int) -> pd.DataFrame:
    end = dt.datetime.utcnow()
    start = end - dt.timedelta(days=lookback_days)

    df = yf.download(
        tickers=ticker,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=False,
        progress=False,
        threads=False,
    )

    if df is None or df.empty:
        return df

    if hasattr(df.columns, "levels") and len(df.columns.levels) > 1:
        df.columns = df.columns.get_level_values(0)

    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)

    return df

def upsert_prices(conn: sqlite3.Connection, ticker: str, df: pd.DataFrame):
    rows = []

    for ts, r in df.iterrows():
        rows.append((
            ticker,
            ts.to_pydatetime().isoformat(),
            float(r.get("Open")) if pd.notna(r.get("Open")) else None,
            float(r.get("High")) if pd.notna(r.get("High")) else None,
            float(r.get("Low")) if pd.notna(r.get("Low")) else None,
            float(r.get("Close")) if pd.notna(r.get("Close")) else None,
            float(r.get("Volume")) if pd.notna(r.get("Volume")) else None,
        ))

    conn.executemany("""
        INSERT OR REPLACE INTO prices
        (ticker, ts, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, rows)

def main():
    print(f"[USDJPY] fetching {TICKER} interval={INTERVAL} lookback_days={LOOKBACK_DAYS}")

    df = fetch_yf(TICKER, INTERVAL, LOOKBACK_DAYS)

    if df is None or df.empty:
        print("[USDJPY] empty result")
        print("対策: LOOKBACK_DAYSを30/14に下げる or INTERVALを1dにする")
        return

    print("[USDJPY] fetched rows:", len(df))
    print(df.tail(5))

    conn = sqlite3.connect(DB_PATH)

    try:
        conn.executescript(SCHEMA_SQL)
        upsert_prices(conn, TICKER, df)
        conn.commit()
    finally:
        conn.close()

    print(f"[USDJPY] saved to {DB_PATH} (table=prices)")

if __name__ == "__main__":
    main()

実行結果は以下です。

[USDJPY] fetching JPY=X interval=1h lookback_days=60
[USDJPY] fetched rows: 941
Price                 Adj Close       Close  ...        Open  Volume
Datetime                                     ...                    
2026-02-10 17:00:00  154.380005  154.380005  ...  154.285004       0
2026-02-10 18:00:00  154.494003  154.494003  ...  154.373993       0
2026-02-10 19:00:00  154.309998  154.309998  ...  154.501999       0
2026-02-10 20:00:00  154.341995  154.341995  ...  154.311005       0
2026-02-10 21:00:00  154.356995  154.356995  ...  154.339005       0

[5 rows x 6 columns]
[USDJPY] saved to ./market.db (table=prices)

USDJPYも1時間足で941行取得できました。

FX系ティッカーでは Volume が0になることがありますが、相関計算では主に終値を使うため、今回は問題なしとします。

S&P500を取得する

次に、リスク資産の代表としてS&P500を取得します。

yfinanceでは ^GSPC を使います。

vim scripts/fetch_sp500_only.py
import datetime as dt
import sqlite3

import pandas as pd
import yfinance as yf

DB_PATH = "./market.db"
TICKER = "^GSPC"
INTERVAL = "1h"
LOOKBACK_DAYS = 60

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS prices (
  ticker TEXT NOT NULL,
  ts     TEXT NOT NULL,
  open   REAL,
  high   REAL,
  low    REAL,
  close  REAL,
  volume REAL,
  PRIMARY KEY (ticker, ts)
);
"""

def fetch_yf(ticker: str, interval: str, lookback_days: int) -> pd.DataFrame:
    end = dt.datetime.utcnow()
    start = end - dt.timedelta(days=lookback_days)

    df = yf.download(
        tickers=ticker,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=False,
        progress=False,
        threads=False,
    )

    if df is None or df.empty:
        return df

    if hasattr(df.columns, "levels") and len(df.columns.levels) > 1:
        df.columns = df.columns.get_level_values(0)

    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)

    return df

def upsert_prices(conn: sqlite3.Connection, ticker: str, df: pd.DataFrame):
    rows = []

    for ts, r in df.iterrows():
        rows.append((
            ticker,
            ts.to_pydatetime().isoformat(),
            float(r.get("Open")) if pd.notna(r.get("Open")) else None,
            float(r.get("High")) if pd.notna(r.get("High")) else None,
            float(r.get("Low")) if pd.notna(r.get("Low")) else None,
            float(r.get("Close")) if pd.notna(r.get("Close")) else None,
            float(r.get("Volume")) if pd.notna(r.get("Volume")) else None,
        ))

    conn.executemany("""
        INSERT OR REPLACE INTO prices
        (ticker, ts, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, rows)

def main():
    print(f"[SP500] fetching {TICKER} interval={INTERVAL} lookback_days={LOOKBACK_DAYS}")

    df = fetch_yf(TICKER, INTERVAL, LOOKBACK_DAYS)

    if df is None or df.empty:
        print("[SP500] empty result")
        print("対策: LOOKBACK_DAYSを30/14に下げる or INTERVALを1dにする")
        return

    print("[SP500] fetched rows:", len(df))
    print(df.tail(5))

    conn = sqlite3.connect(DB_PATH)

    try:
        conn.executescript(SCHEMA_SQL)
        upsert_prices(conn, TICKER, df)
        conn.commit()
    finally:
        conn.close()

    print(f"[SP500] saved to {DB_PATH} (table=prices)")

if __name__ == "__main__":
    main()

実行結果は以下です。

[SP500] fetching ^GSPC interval=1h lookback_days=60
[SP500] fetched rows: 260
Price                  Adj Close        Close  ...         Open     Volume
Datetime                                       ...                        
2026-02-10 16:30:00  6973.379883  6973.379883  ...  6968.979980  346539000
2026-02-10 17:30:00  6955.490234  6955.490234  ...  6973.359863  293796000
2026-02-10 18:30:00  6959.459961  6959.459961  ...  6955.529785  291717000
2026-02-10 19:30:00  6953.799805  6953.799805  ...  6959.509766  355681000
2026-02-10 20:30:00  6940.009766  6940.009766  ...  6953.850098  489036000

[5 rows x 6 columns]
[SP500] saved to ./market.db (table=prices)

S&P500も取得できました。

行数が260と少なめですが、これは正常です。

株式指数は米国市場の取引時間だけ1時間足が出るため、FXや先物のように24時間分の行数にはなりません。

DXYを取得する

最後に、ドル指数DXYを取得します。

DXYはyfinanceで取れない場合もあるため、今回はまず DX-Y.NYB を試します。

vim scripts/fetch_dxy_yf_only.py
import datetime as dt
import sqlite3

import pandas as pd
import yfinance as yf

DB_PATH = "./market.db"
TICKER = "DX-Y.NYB"
INTERVAL = "1h"
LOOKBACK_DAYS = 60

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS prices (
  ticker TEXT NOT NULL,
  ts     TEXT NOT NULL,
  open   REAL,
  high   REAL,
  low    REAL,
  close  REAL,
  volume REAL,
  PRIMARY KEY (ticker, ts)
);
"""

def fetch_yf(ticker: str, interval: str, lookback_days: int) -> pd.DataFrame:
    end = dt.datetime.utcnow()
    start = end - dt.timedelta(days=lookback_days)

    df = yf.download(
        tickers=ticker,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=False,
        progress=False,
        threads=False,
    )

    if df is None or df.empty:
        return df

    if hasattr(df.columns, "levels") and len(df.columns.levels) > 1:
        df.columns = df.columns.get_level_values(0)

    if df.index.tz is not None:
        df.index = df.index.tz_convert(None)

    return df

def upsert_prices(conn: sqlite3.Connection, ticker: str, df: pd.DataFrame):
    rows = []

    for ts, r in df.iterrows():
        rows.append((
            ticker,
            ts.to_pydatetime().isoformat(),
            float(r.get("Open")) if pd.notna(r.get("Open")) else None,
            float(r.get("High")) if pd.notna(r.get("High")) else None,
            float(r.get("Low")) if pd.notna(r.get("Low")) else None,
            float(r.get("Close")) if pd.notna(r.get("Close")) else None,
            float(r.get("Volume")) if pd.notna(r.get("Volume")) else None,
        ))

    conn.executemany("""
        INSERT OR REPLACE INTO prices
        (ticker, ts, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """, rows)

def main():
    print(f"[DXY-YF] fetching {TICKER} interval={INTERVAL} lookback_days={LOOKBACK_DAYS}")

    df = fetch_yf(TICKER, INTERVAL, LOOKBACK_DAYS)

    if df is None or df.empty:
        print("[DXY-YF] empty result")
        print("対策: INTERVALを1dにする、またはFRED版に切替")
        return

    print("[DXY-YF] fetched rows:", len(df))
    print(df.tail(5))

    conn = sqlite3.connect(DB_PATH)

    try:
        conn.executescript(SCHEMA_SQL)
        upsert_prices(conn, TICKER, df)
        conn.commit()
    finally:
        conn.close()

    print(f"[DXY-YF] saved to {DB_PATH} (table=prices)")

if __name__ == "__main__":
    main()

実行結果は以下です。

[DXY-YF] fetching DX-Y.NYB interval=1h lookback_days=60
[DXY-YF] fetched rows: 868
Price                Adj Close      Close  ...       Open  Volume
Datetime                                   ...                   
2026-02-10 18:00:00  96.886002  96.886002  ...  96.841003       0
2026-02-10 19:00:00  96.799004  96.799004  ...  96.886002       0
2026-02-10 20:00:00  96.858002  96.858002  ...  96.797997       0
2026-02-10 21:00:00  96.862000  96.862000  ...  96.858002       0
2026-02-10 22:00:00  96.847000  96.847000  ...  96.861000       0

[5 rows x 6 columns]
[DXY-YF] saved to ./market.db (table=prices)

DXYもyfinanceから取得できました。

これで、ドルの軸も相関分析に含められるようになりました。

今回取得できた銘柄

今回、以下の5銘柄を取得し、SQLiteに保存できました。

銘柄 ティッカー 行数 用途
WTI原油 CL=F 840 インフレ・景気シグナル
Gold先物 GC=F 842 リスク回避資産
USDJPY JPY=X 941 為替の中心軸
S&P500 ^GSPC 260 リスク資産の代表
DXY DX-Y.NYB 868 ドル指数

これで、相関分析の核になるデータが揃いました。

行数が銘柄ごとに違う理由

今回の取得結果を見ると、銘柄ごとに行数がかなり違います。

これはデータの欠損というより、取引時間の違いによるものが大きいです。

  • FX:ほぼ24時間に近い
  • 先物:取引時間が長い
  • 株価指数:米国市場の取引時間が中心

そのため、1時間足のまま相関を取ると、銘柄ごとに時刻が揃わず、欠損が多くなりがちです。

次は日次終値に変換して相関を取る

株式は取引時間だけ、FXや先物はほぼ24時間で動いています。

そのため、1時間足のまま相関を計算すると、時刻が揃わずに欠損が多くなります。

最初の相関分析では、全銘柄を日次終値へリサンプルしてから比較する方がよさそうです。

次に行うことは以下です。

  • prices テーブルから各銘柄の終値を取得する
  • 1時間足を日次終値に変換する
  • 日付を揃えて横持ちデータにする
  • 30日・90日のrolling相関を計算する
  • 相関が急に崩れた箇所を検知する

Neo4jへ相関エッジを入れるのは、まずSQLite上でrolling相関の計算が安定してからにします。

今回の到達点

今回の作業で、以下まで確認できました。

  • WTI原油をyfinanceで取得できた
  • Gold先物をyfinanceで取得できた
  • USDJPYをyfinanceで取得できた
  • S&P500をyfinanceで取得できた
  • DXYをyfinanceで取得できた
  • すべて market.dbprices テーブルへ保存できた
  • 1時間足のまま相関を取ると時刻のズレが問題になりそうだと分かった

ハマりどころ

XAUUSDよりGC先物の方が安定しやすい

金価格を取る場合、XAUUSD=X は環境によって空になることがあります。

今回は、まずGold先物の GC=F を使いました。

相関分析の初期検証では、GC先物で十分進められそうです。

S&P500の行数は少なくて正常

S&P500は株式市場の取引時間だけ1時間足が出るため、FXや先物より行数が少なくなります。

今回も60日分で260行でした。

これは異常ではなく、取引時間の違いによるものです。

FXやDXYのVolumeは0でも問題なし

USDJPYやDXYでは Volume が0になっていました。

相関分析では基本的に終値を使うため、今回の用途では問題なしとします。

次にやること

次は、SQLiteに保存した価格データを使って、rolling相関を計算します。

  • 日次終値への変換
  • 銘柄ごとの終値を横持ちにする
  • 30日rolling相関
  • 90日rolling相関
  • 相関崩れの検知
  • 結果をSQLiteへ保存

相関計算が安定したら、次の段階でNeo4jへ :CORR エッジとして保存し、資産間の関係をグラフとして扱えるようにします。

まとめ

今回は、WTI、Gold、USDJPY、S&P500、DXYをyfinanceで単体取得し、SQLiteへ保存しました。

すべて取得と保存に成功し、相関分析の核になるデータが揃いました。

一方で、1時間足のままだと、FX、先物、株価指数で取引時間が違うため、相関計算時に時刻が揃いにくいことも分かりました。

そのため、次は全銘柄を日次終値にリサンプルし、30日・90日のrolling相関を計算して、市場間の連動や相関崩れを確認していきます。

コメント

タイトルとURLをコピーしました