Files
pricing/src/data/ingestion/ingest_yahoo_options.py

280 lines
8.3 KiB
Python
Raw Normal View History

from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text
from config.settings import DB_CONFIG, PIPELINE_CONFIG
def build_db_url() -> str:
return (
f"postgresql+psycopg2://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
)
def to_python_number(value):
"""Convert pandas/numpy values to plain Python values or None."""
if pd.isna(value):
return None
return value
def compute_mid(bid, ask):
bid = to_python_number(bid)
ask = to_python_number(ask)
if bid is None or ask is None:
return None
try:
return float((bid + ask) / 2.0)
except Exception:
return None
def infer_option_style(symbol: str) -> str:
"""
Very rough default convention:
- US equities / ETFs from Yahoo are usually American style
"""
# TODO: If later you ingest index options like SPX, adapt this logic.
return "american"
def get_or_create_underlying(conn, symbol: str) -> int:
query_insert = text("""
INSERT INTO underlyings (symbol, exchange, currency)
VALUES (:symbol, :exchange, :currency)
ON CONFLICT (symbol) DO NOTHING
""")
query_select = text("""
SELECT id FROM underlyings WHERE symbol = :symbol
""")
# TODO: improve exchange/currency detection if you want richer metadata
conn.execute(query_insert, {
"symbol": symbol,
"exchange": None,
"currency": "USD",
})
result = conn.execute(query_select, {"symbol": symbol}).fetchone()
return result[0]
def get_or_create_contract(
conn,
underlying_id: int,
option_type: str,
strike: float,
expiration_date,
style: str,
contract_symbol: str,
) -> int:
query_insert = text("""
INSERT INTO option_contracts (
underlying_id, option_type, strike, expiration_date, style, contract_symbol
)
VALUES (
:underlying_id, :option_type, :strike, :expiration_date, :style, :contract_symbol
)
ON CONFLICT (underlying_id, option_type, strike, expiration_date)
DO NOTHING
""")
query_select = text("""
SELECT id
FROM option_contracts
WHERE underlying_id = :underlying_id
AND option_type = :option_type
AND strike = :strike
AND expiration_date = :expiration_date
""")
conn.execute(query_insert, {
"underlying_id": underlying_id,
"option_type": option_type,
"strike": strike,
"expiration_date": expiration_date,
"style": style,
"contract_symbol": contract_symbol,
})
result = conn.execute(query_select, {
"underlying_id": underlying_id,
"option_type": option_type,
"strike": strike,
"expiration_date": expiration_date,
}).fetchone()
return result[0]
def insert_underlying_price(conn, underlying_id: int, price_timestamp: datetime, price: float):
query = text("""
INSERT INTO underlying_prices (underlying_id, price_timestamp, price)
VALUES (:underlying_id, :price_timestamp, :price)
ON CONFLICT (underlying_id, price_timestamp) DO NOTHING
""")
conn.execute(query, {
"underlying_id": underlying_id,
"price_timestamp": price_timestamp,
"price": price,
})
def insert_option_quote(
conn,
contract_id: int,
quote_timestamp: datetime,
bid,
ask,
mid,
last_price,
implied_vol,
volume,
open_interest,
):
query = text("""
INSERT INTO option_quotes (
contract_id, quote_timestamp, bid, ask, mid,
last_price, implied_vol, volume, open_interest
)
VALUES (
:contract_id, :quote_timestamp, :bid, :ask, :mid,
:last_price, :implied_vol, :volume, :open_interest
)
ON CONFLICT (contract_id, quote_timestamp) DO NOTHING
""")
conn.execute(query, {
"contract_id": contract_id,
"quote_timestamp": quote_timestamp,
"bid": bid,
"ask": ask,
"mid": mid,
"last_price": last_price,
"implied_vol": implied_vol,
"volume": volume,
"open_interest": open_interest,
})
def process_option_dataframe(conn, df: pd.DataFrame, underlying_id: int, option_type: str, symbol: str, expiration_date, quote_timestamp: datetime):
style = infer_option_style(symbol)
for _, row in df.iterrows():
strike = to_python_number(row.get("strike"))
contract_symbol = to_python_number(row.get("contractSymbol"))
bid = to_python_number(row.get("bid"))
ask = to_python_number(row.get("ask"))
last_price = to_python_number(row.get("lastPrice"))
implied_vol = to_python_number(row.get("impliedVolatility"))
volume = to_python_number(row.get("volume"))
open_interest = to_python_number(row.get("openInterest"))
if strike is None:
continue
contract_id = get_or_create_contract(
conn=conn,
underlying_id=underlying_id,
option_type=option_type,
strike=float(strike),
expiration_date=expiration_date,
style=style,
contract_symbol=contract_symbol,
)
mid = compute_mid(bid, ask)
insert_option_quote(
conn=conn,
contract_id=contract_id,
quote_timestamp=quote_timestamp,
bid=bid,
ask=ask,
mid=mid,
last_price=last_price,
implied_vol=implied_vol,
volume=int(volume) if volume is not None else None,
open_interest=int(open_interest) if open_interest is not None else None,
)
def ingest_symbol(symbol: str, engine):
print(f"Starting ingestion for {symbol}...")
ticker = yf.Ticker(symbol)
expirations = ticker.options
if not expirations:
print(f"No options found for {symbol}")
return
quote_timestamp = datetime.now(timezone.utc)
# Try to get spot price
info = {}
try:
info = ticker.fast_info
except Exception:
pass
spot_price = None
if info:
spot_price = info.get("lastPrice") or info.get("last_price")
with engine.begin() as conn:
underlying_id = get_or_create_underlying(conn, symbol)
if spot_price is not None:
insert_underlying_price(
conn=conn,
underlying_id=underlying_id,
price_timestamp=quote_timestamp,
price=float(spot_price),
)
for expiry in expirations:
print(f" Fetching expiry {expiry} ...")
chain = ticker.option_chain(expiry)
expiration_date = pd.to_datetime(expiry).date()
process_option_dataframe(
conn=conn,
df=chain.calls,
underlying_id=underlying_id,
option_type="call",
symbol=symbol,
expiration_date=expiration_date,
quote_timestamp=quote_timestamp,
)
process_option_dataframe(
conn=conn,
df=chain.puts,
underlying_id=underlying_id,
option_type="put",
symbol=symbol,
expiration_date=expiration_date,
quote_timestamp=quote_timestamp,
)
print(f"Finished ingestion for {symbol}.")
def main():
db_url = build_db_url()
engine = create_engine(db_url, future=True)
for symbol in PIPELINE_CONFIG["symbols"]:
ingest_symbol(symbol, engine)
if __name__ == "__main__":
main()