"""
ferro_ta.analysis.options — Rust-backed derivatives analytics for options.
This module preserves the legacy IV-series helpers and expands them with
pricing, Greeks, implied-volatility inversion, smile analytics, and strike
selection helpers suitable for research and simulation workflows.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import TypeAlias
import numpy as np
from numpy.typing import ArrayLike, NDArray
from ferro_ta._ferro_ta import (
black76_price as _rust_black76_price,
)
from ferro_ta._ferro_ta import (
black76_price_batch as _rust_black76_price_batch,
)
from ferro_ta._ferro_ta import (
bsm_price as _rust_bsm_price,
)
from ferro_ta._ferro_ta import (
bsm_price_batch as _rust_bsm_price_batch,
)
from ferro_ta._ferro_ta import (
expected_move as _rust_expected_move,
)
from ferro_ta._ferro_ta import (
extended_greeks as _rust_extended_greeks,
)
from ferro_ta._ferro_ta import (
extended_greeks_batch as _rust_extended_greeks_batch,
)
from ferro_ta._ferro_ta import (
implied_volatility as _rust_implied_volatility,
)
from ferro_ta._ferro_ta import (
implied_volatility_batch as _rust_implied_volatility_batch,
)
from ferro_ta._ferro_ta import (
iv_percentile as _rust_iv_percentile,
)
from ferro_ta._ferro_ta import (
iv_rank as _rust_iv_rank,
)
from ferro_ta._ferro_ta import (
iv_zscore as _rust_iv_zscore,
)
from ferro_ta._ferro_ta import (
moneyness_labels as _rust_moneyness_labels,
)
from ferro_ta._ferro_ta import (
option_greeks as _rust_option_greeks,
)
from ferro_ta._ferro_ta import (
option_greeks_batch as _rust_option_greeks_batch,
)
from ferro_ta._ferro_ta import (
put_call_parity_deviation as _rust_put_call_parity_deviation,
)
from ferro_ta._ferro_ta import (
select_strike_delta as _rust_select_strike_delta,
)
from ferro_ta._ferro_ta import (
select_strike_offset as _rust_select_strike_offset,
)
from ferro_ta._ferro_ta import (
smile_metrics as _rust_smile_metrics,
)
from ferro_ta._ferro_ta import (
term_structure_slope as _rust_term_structure_slope,
)
from ferro_ta._utils import _to_f64
from ferro_ta.core.exceptions import (
FerroTAInputError,
FerroTAValueError,
_normalize_rust_error,
)
ScalarOrArray: TypeAlias = float | NDArray[np.float64]
__all__ = [
"OptionGreeks",
"ExtendedGreeks",
"SmileMetrics",
"VolCone",
"black_scholes_price",
"black_76_price",
"option_price",
"greeks",
"extended_greeks",
"implied_volatility",
"smile_metrics",
"term_structure_slope",
"label_moneyness",
"select_strike",
"iv_rank",
"iv_percentile",
"iv_zscore",
"put_call_parity_deviation",
"expected_move",
"digital_option_price",
"digital_option_greeks",
"american_option_price",
"early_exercise_premium",
"close_to_close_vol",
"parkinson_vol",
"garman_klass_vol",
"rogers_satchell_vol",
"yang_zhang_vol",
"vol_cone",
]
[docs]
@dataclass(frozen=True)
class ExtendedGreeks:
"""Container for second-order and cross Greeks."""
vanna: ScalarOrArray
volga: ScalarOrArray
charm: ScalarOrArray
speed: ScalarOrArray
color: ScalarOrArray
[docs]
def to_dict(self) -> dict[str, ScalarOrArray]:
return {
"vanna": self.vanna,
"volga": self.volga,
"charm": self.charm,
"speed": self.speed,
"color": self.color,
}
[docs]
@dataclass(frozen=True)
class VolCone:
"""Historical realized vol distribution across window lengths."""
windows: NDArray[np.float64]
min: NDArray[np.float64]
p25: NDArray[np.float64]
median: NDArray[np.float64]
p75: NDArray[np.float64]
max: NDArray[np.float64]
[docs]
def to_dict(self) -> dict[str, NDArray[np.float64]]:
return {
"windows": self.windows,
"min": self.min,
"p25": self.p25,
"median": self.median,
"p75": self.p75,
"max": self.max,
}
[docs]
@dataclass(frozen=True)
class OptionGreeks:
"""Container for first-order Greeks."""
delta: ScalarOrArray
gamma: ScalarOrArray
vega: ScalarOrArray
theta: ScalarOrArray
rho: ScalarOrArray
[docs]
def to_dict(self) -> dict[str, ScalarOrArray]:
return {
"delta": self.delta,
"gamma": self.gamma,
"vega": self.vega,
"theta": self.theta,
"rho": self.rho,
}
[docs]
@dataclass(frozen=True)
class SmileMetrics:
"""Summary metrics for a single smile slice."""
atm_iv: float
risk_reversal_25d: float
butterfly_25d: float
skew_slope: float
convexity: float
[docs]
def to_dict(self) -> dict[str, float]:
return {
"atm_iv": self.atm_iv,
"risk_reversal_25d": self.risk_reversal_25d,
"butterfly_25d": self.butterfly_25d,
"skew_slope": self.skew_slope,
"convexity": self.convexity,
}
def _validate_option_type(option_type: str) -> str:
value = option_type.lower()
if value not in {"call", "put"}:
raise FerroTAValueError("option_type must be 'call' or 'put'.")
return value
def _validate_model(model: str) -> str:
value = model.lower()
aliases = {
"bsm": "bsm",
"black_scholes": "bsm",
"black-scholes": "bsm",
"blackscholes": "bsm",
"black76": "black76",
"black_76": "black76",
"black-76": "black76",
}
if value not in aliases:
raise FerroTAValueError(
"model must be one of 'bsm', 'black_scholes', or 'black76'."
)
return aliases[value]
def _coerce_1d(data: ArrayLike | float, *, name: str) -> tuple[np.ndarray, bool]:
arr = np.asarray(data, dtype=np.float64)
if arr.ndim > 1:
raise FerroTAInputError(f"{name} must be a scalar or 1-D array.")
return np.ascontiguousarray(arr.reshape(-1)), arr.ndim == 0
def _broadcast_inputs(
**kwargs: ArrayLike | float,
) -> tuple[dict[str, np.ndarray], bool]:
arrays: dict[str, np.ndarray] = {}
scalar_flags: list[bool] = []
for name, value in kwargs.items():
arr, is_scalar = _coerce_1d(value, name=name)
arrays[name] = arr
scalar_flags.append(is_scalar)
try:
broadcast = np.broadcast_arrays(*arrays.values())
except ValueError as err:
raise FerroTAInputError(
f"Inputs could not be broadcast together: {', '.join(arrays.keys())}"
) from err
out = {
name: np.ascontiguousarray(arr, dtype=np.float64).reshape(-1)
for name, arr in zip(arrays.keys(), broadcast)
}
return out, all(scalar_flags)
def _result_or_scalar(result: np.ndarray, scalar_mode: bool) -> ScalarOrArray:
return float(result[0]) if scalar_mode else result
[docs]
def iv_rank(iv_series: ArrayLike, window: int = 252) -> NDArray[np.float64]:
"""Compute rolling IV rank in Rust while preserving the legacy API."""
try:
arr = _to_f64(iv_series)
except ValueError as err:
raise FerroTAInputError(str(err)) from err
if len(arr) == 0:
raise FerroTAInputError("iv_series must not be empty.")
if window < 1:
raise FerroTAValueError(f"window must be >= 1, got {window}.")
try:
return np.asarray(_rust_iv_rank(arr, int(window)), dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def iv_percentile(iv_series: ArrayLike, window: int = 252) -> NDArray[np.float64]:
"""Compute rolling IV percentile in Rust while preserving the legacy API."""
try:
arr = _to_f64(iv_series)
except ValueError as err:
raise FerroTAInputError(str(err)) from err
if len(arr) == 0:
raise FerroTAInputError("iv_series must not be empty.")
if window < 1:
raise FerroTAValueError(f"window must be >= 1, got {window}.")
try:
return np.asarray(_rust_iv_percentile(arr, int(window)), dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def iv_zscore(iv_series: ArrayLike, window: int = 252) -> NDArray[np.float64]:
"""Compute rolling IV z-score in Rust while preserving the legacy API."""
try:
arr = _to_f64(iv_series)
except ValueError as err:
raise FerroTAInputError(str(err)) from err
if len(arr) == 0:
raise FerroTAInputError("iv_series must not be empty.")
if window < 1:
raise FerroTAValueError(f"window must be >= 1, got {window}.")
try:
return np.asarray(_rust_iv_zscore(arr, int(window)), dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def black_scholes_price(
spot: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
dividend_yield: ArrayLike | float = 0.0,
) -> ScalarOrArray:
"""Price options under Black-Scholes-Merton."""
option_type = _validate_option_type(option_type)
arrays, scalar_mode = _broadcast_inputs(
spot=spot,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
dividend_yield=dividend_yield,
)
try:
if scalar_mode:
return float(
_rust_bsm_price(
float(arrays["spot"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
float(arrays["dividend_yield"][0]),
)
)
out = _rust_bsm_price_batch(
arrays["spot"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
arrays["dividend_yield"],
option_type,
)
return np.asarray(out, dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def black_76_price(
forward: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
) -> ScalarOrArray:
"""Price options under Black-76."""
option_type = _validate_option_type(option_type)
arrays, scalar_mode = _broadcast_inputs(
forward=forward,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
)
try:
if scalar_mode:
return float(
_rust_black76_price(
float(arrays["forward"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
)
)
out = _rust_black76_price_batch(
arrays["forward"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
)
return np.asarray(out, dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def option_price(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
model: str = "bsm",
carry: ArrayLike | float = 0.0,
) -> ScalarOrArray:
"""Model-dispatched option price helper."""
model = _validate_model(model)
if model == "black76":
return black_76_price(
underlying,
strike,
rate,
time_to_expiry,
volatility,
option_type=option_type,
)
return black_scholes_price(
underlying,
strike,
rate,
time_to_expiry,
volatility,
option_type=option_type,
dividend_yield=carry,
)
[docs]
def greeks(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
model: str = "bsm",
carry: ArrayLike | float = 0.0,
) -> OptionGreeks:
"""Return delta, gamma, vega, theta, and rho."""
option_type = _validate_option_type(option_type)
model = _validate_model(model)
arrays, scalar_mode = _broadcast_inputs(
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
carry=carry,
)
try:
if scalar_mode:
delta, gamma, vega, theta, rho = _rust_option_greeks(
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
model,
float(arrays["carry"][0]),
)
return OptionGreeks(delta, gamma, vega, theta, rho)
delta, gamma, vega, theta, rho = _rust_option_greeks_batch(
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
model,
arrays["carry"],
)
return OptionGreeks(
np.asarray(delta, dtype=np.float64),
np.asarray(gamma, dtype=np.float64),
np.asarray(vega, dtype=np.float64),
np.asarray(theta, dtype=np.float64),
np.asarray(rho, dtype=np.float64),
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def implied_volatility(
price: ArrayLike | float,
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
*,
option_type: str = "call",
model: str = "bsm",
carry: ArrayLike | float = 0.0,
initial_guess: ArrayLike | float = 0.2,
tolerance: float = 1e-8,
max_iterations: int = 100,
) -> ScalarOrArray:
"""Invert option prices to implied volatility."""
option_type = _validate_option_type(option_type)
model = _validate_model(model)
arrays, scalar_mode = _broadcast_inputs(
price=price,
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
carry=carry,
initial_guess=initial_guess,
)
try:
if scalar_mode:
return float(
_rust_implied_volatility(
float(arrays["price"][0]),
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
option_type,
model,
float(arrays["carry"][0]),
float(arrays["initial_guess"][0]),
float(tolerance),
int(max_iterations),
)
)
out = _rust_implied_volatility_batch(
arrays["price"],
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
option_type,
model,
arrays["carry"],
arrays["initial_guess"],
float(tolerance),
int(max_iterations),
)
return np.asarray(out, dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def smile_metrics(
strikes: ArrayLike,
vols: ArrayLike,
reference_price: float,
time_to_expiry: float,
*,
model: str = "bsm",
rate: float = 0.0,
carry: float = 0.0,
) -> SmileMetrics:
"""Compute ATM IV, 25-delta RR/BF, skew slope, and convexity."""
model = _validate_model(model)
strikes_arr = _to_f64(strikes)
vols_arr = _to_f64(vols)
order = np.argsort(strikes_arr)
strikes_arr = strikes_arr[order]
vols_arr = vols_arr[order]
try:
atm_iv, rr25, bf25, slope, convexity = _rust_smile_metrics(
strikes_arr,
vols_arr,
float(reference_price),
float(time_to_expiry),
model,
float(rate),
float(carry),
)
except ValueError as err:
_normalize_rust_error(err)
return SmileMetrics(atm_iv, rr25, bf25, slope, convexity)
[docs]
def term_structure_slope(tenors: ArrayLike, atm_ivs: ArrayLike) -> float:
"""Slope of ATM IV against tenor."""
try:
return float(_rust_term_structure_slope(_to_f64(tenors), _to_f64(atm_ivs)))
except ValueError as err:
_normalize_rust_error(err)
[docs]
def label_moneyness(
strikes: ArrayLike,
reference_price: float,
*,
option_type: str = "call",
) -> NDArray[np.object_]:
"""Label strikes as ``ITM``, ``ATM``, or ``OTM``."""
option_type = _validate_option_type(option_type)
try:
codes = np.asarray(
_rust_moneyness_labels(
_to_f64(strikes), float(reference_price), option_type
),
dtype=np.int8,
)
except ValueError as err:
_normalize_rust_error(err)
mapping = np.array(["OTM", "ATM", "ITM"], dtype=object)
return mapping[codes + 1]
def _parse_selector_steps(selector: str) -> int:
suffix = selector[3:]
if suffix == "":
return 1
try:
return int(suffix)
except ValueError as err:
raise FerroTAValueError(
f"Could not parse strike selector '{selector}'. Expected forms like ATM, ITM1, OTM2."
) from err
[docs]
def select_strike(
strikes: ArrayLike,
reference_price: float,
*,
option_type: str = "call",
selector: str = "ATM",
delta_target: float | None = None,
volatilities: ArrayLike | None = None,
time_to_expiry: float | None = None,
model: str = "bsm",
rate: float = 0.0,
carry: float = 0.0,
) -> float | None:
"""Select a strike by ATM/ITM/OTM offset or delta target."""
option_type = _validate_option_type(option_type)
model = _validate_model(model)
strikes_arr = _to_f64(strikes)
if len(strikes_arr) == 0:
raise FerroTAInputError("strikes must not be empty.")
selector_norm = selector.strip().upper()
if delta_target is None and selector_norm.startswith("DELTA"):
try:
delta_target = float(selector_norm.replace("DELTA", ""))
except ValueError as err:
raise FerroTAValueError(
f"Could not parse delta selector '{selector}'. Example: selector='DELTA0.25'."
) from err
if delta_target is not None:
if volatilities is None or time_to_expiry is None:
raise FerroTAValueError(
"Delta-based strike selection requires volatilities and time_to_expiry."
)
vols_arr = _to_f64(volatilities)
if len(vols_arr) != len(strikes_arr):
raise FerroTAInputError(
"strikes and volatilities must have the same length."
)
order = np.argsort(strikes_arr)
strikes_arr = strikes_arr[order]
vols_arr = vols_arr[order]
try:
strike = _rust_select_strike_delta(
strikes_arr,
vols_arr,
float(reference_price),
float(time_to_expiry),
float(delta_target),
option_type,
model,
float(rate),
float(carry),
)
except ValueError as err:
_normalize_rust_error(err)
return None if strike is None else float(strike)
order = np.argsort(strikes_arr)
sorted_strikes = strikes_arr[order]
if selector_norm == "ATM":
offset = 0
elif selector_norm.startswith("ITM"):
steps = _parse_selector_steps(selector_norm)
offset = -steps if option_type == "call" else steps
elif selector_norm.startswith("OTM"):
steps = _parse_selector_steps(selector_norm)
offset = steps if option_type == "call" else -steps
else:
raise FerroTAValueError(
f"Unsupported selector '{selector}'. Use ATM, ITM<n>, OTM<n>, or DELTA<x>."
)
try:
strike = _rust_select_strike_offset(
sorted_strikes, float(reference_price), int(offset)
)
except ValueError as err:
_normalize_rust_error(err)
return None if strike is None else float(strike)
[docs]
def extended_greeks(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
model: str = "bsm",
carry: ArrayLike | float = 0.0,
) -> ExtendedGreeks:
"""Return vanna, volga, charm, speed, and color (second-order / cross Greeks).
All Greeks are computed via closed-form BSM formulas. Black-76 is not
yet supported and returns NaN for all five values.
Parameters
----------
underlying:
Current underlying (spot) price.
strike:
Option strike price.
rate:
Risk-free rate (annualised, decimal — e.g. ``0.05`` for 5 %).
time_to_expiry:
Time to expiry in years.
volatility:
Implied volatility (annualised, decimal).
option_type:
``"call"`` (default) or ``"put"``.
model:
``"bsm"`` (default). ``"black76"`` returns NaN for all fields.
carry:
Continuous carry / dividend yield (annualised, decimal). Default 0.
Returns
-------
ExtendedGreeks
Named tuple with fields:
- **vanna** — ∂Δ/∂σ: sensitivity of delta to a change in vol.
- **volga** — ∂²V/∂σ² (vomma): sensitivity of vega to a change in vol.
- **charm** — ∂Δ/∂t: daily rate of change in delta (theta of delta).
- **speed** — ∂Γ/∂S: rate of change in gamma with respect to spot.
- **color** — ∂Γ/∂t: daily rate of change in gamma.
Notes
-----
Inputs may be scalars or broadcastable arrays. When arrays are supplied
each field of the returned :class:`ExtendedGreeks` is an ``NDArray``.
Closed-form expressions (BSM, zero-carry)::
vanna = -e^{-qT} · φ(d₁) · d₂ / σ
volga = S · e^{-qT} · φ(d₁) · √T · d₁ · d₂ / σ
charm = -e^{-qT} · φ(d₁) · [2(r-q)T - d₂·σ·√T] / (2T·σ·√T)
speed = -Γ/S · (d₁/(σ√T) + 1)
color = -Γ · [r-q + d₁·σ/(2√T) + (2(r-q)T - d₂·σ√T)·d₁/(2T·σ√T)]
"""
option_type = _validate_option_type(option_type)
model = _validate_model(model)
arrays, scalar_mode = _broadcast_inputs(
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
carry=carry,
)
try:
if scalar_mode:
vanna, volga, charm, speed, color = _rust_extended_greeks(
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
model,
float(arrays["carry"][0]),
)
return ExtendedGreeks(vanna, volga, charm, speed, color)
vanna, volga, charm, speed, color = _rust_extended_greeks_batch(
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
model,
arrays["carry"],
)
return ExtendedGreeks(
np.asarray(vanna, dtype=np.float64),
np.asarray(volga, dtype=np.float64),
np.asarray(charm, dtype=np.float64),
np.asarray(speed, dtype=np.float64),
np.asarray(color, dtype=np.float64),
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def put_call_parity_deviation(
call_price: float,
put_price: float,
spot: float,
strike: float,
rate: float,
time_to_expiry: float,
*,
carry: float = 0.0,
) -> float:
"""Put-call parity deviation: ``C − P − (S·e^{−q·T} − K·e^{−r·T})``.
At no-arbitrage the deviation is exactly 0. A non-zero result indicates
mispricing, a data error, or a stale quote.
Parameters
----------
call_price:
Market or model price of the call option.
put_price:
Market or model price of the put option.
spot:
Current underlying price.
strike:
Common strike price of the call and put.
rate:
Risk-free rate (annualised, decimal).
time_to_expiry:
Time to expiry in years.
carry:
Continuous dividend yield / carry rate (annualised, decimal).
Returns
-------
float
Signed deviation. Positive → call is overpriced relative to put;
negative → put is overpriced relative to call.
Examples
--------
>>> from ferro_ta.analysis.options import option_price, put_call_parity_deviation
>>> call = option_price(100, 100, 0.05, 1.0, 0.2, option_type="call")
>>> put = option_price(100, 100, 0.05, 1.0, 0.2, option_type="put")
>>> put_call_parity_deviation(call, put, 100, 100, 0.05, 1.0) # ≈ 0.0
"""
try:
return float(
_rust_put_call_parity_deviation(
float(call_price),
float(put_price),
float(spot),
float(strike),
float(rate),
float(time_to_expiry),
float(carry),
)
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def expected_move(
spot: float,
iv: float,
days_to_expiry: float,
trading_days_per_year: float = 252.0,
) -> tuple[float, float]:
"""Expected ±1σ move over *days_to_expiry* calendar days.
Uses the log-normal approximation::
upper_move = spot × e^{+σ√(days/trading_days)} − spot
lower_move = spot × e^{−σ√(days/trading_days)} − spot
Parameters
----------
spot:
Current underlying price.
iv:
Implied volatility (annualised, decimal — e.g. ``0.20`` for 20 %).
days_to_expiry:
Number of calendar days until expiry.
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
tuple[float, float]
``(lower_move, upper_move)`` — signed absolute price changes from
``spot``. ``lower_move < 0``, ``upper_move > 0``.
Notes
-----
Because of log-normal skew, ``|upper_move| > |lower_move|``.
Examples
--------
>>> from ferro_ta.analysis.options import expected_move
>>> lower, upper = expected_move(100.0, 0.20, 30)
>>> round(upper, 2)
7.14
"""
try:
lower, upper = _rust_expected_move(
float(spot), float(iv), float(days_to_expiry), float(trading_days_per_year)
)
return float(lower), float(upper)
except ValueError as err:
_normalize_rust_error(err)
# ---------------------------------------------------------------------------
# Digital options — populated once the Rust bridge is built
# ---------------------------------------------------------------------------
[docs]
def digital_option_price(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
digital_type: str = "cash_or_nothing",
carry: ArrayLike | float = 0.0,
) -> ScalarOrArray:
"""Price a digital (binary) option under BSM.
Parameters
----------
underlying:
Current underlying (spot) price.
strike:
Option strike price.
rate:
Risk-free rate (annualised, decimal).
time_to_expiry:
Time to expiry in years.
volatility:
Implied volatility (annualised, decimal).
option_type:
``"call"`` (default) or ``"put"``.
digital_type:
``"cash_or_nothing"`` (default) — pays 1 unit of cash if ITM at
expiry; or ``"asset_or_nothing"`` — pays the underlying asset price.
carry:
Continuous carry / dividend yield (annualised, decimal). Default 0.
Returns
-------
float or NDArray[float64]
Option price. Returns a scalar when all inputs are scalars, or an
array when any input is an array.
Notes
-----
Closed-form BSM formulas::
Cash-or-nothing call: e^{−rT} · N(d₂)
Cash-or-nothing put: e^{−rT} · N(−d₂)
Asset-or-nothing call: S · e^{−qT} · N(d₁)
Asset-or-nothing put: S · e^{−qT} · N(−d₁)
Put-call parity for cash-or-nothing: call + put = e^{−rT}.
Put-call parity for asset-or-nothing: call + put = S · e^{−qT}.
Invalid inputs (non-positive spot/strike, negative time or vol) return NaN.
"""
from ferro_ta._ferro_ta import digital_price as _rust_digital_price
from ferro_ta._ferro_ta import digital_price_batch as _rust_digital_price_batch
option_type = _validate_option_type(option_type)
digital_type = digital_type.lower().replace("-", "_")
if digital_type not in {"cash_or_nothing", "asset_or_nothing"}:
raise FerroTAValueError(
"digital_type must be 'cash_or_nothing' or 'asset_or_nothing'."
)
arrays, scalar_mode = _broadcast_inputs(
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
carry=carry,
)
try:
if scalar_mode:
return float(
_rust_digital_price(
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
digital_type,
float(arrays["carry"][0]),
)
)
out = _rust_digital_price_batch(
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
digital_type,
arrays["carry"],
)
return np.asarray(out, dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def digital_option_greeks(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
digital_type: str = "cash_or_nothing",
carry: ArrayLike | float = 0.0,
) -> OptionGreeks:
"""Delta, gamma, and vega for a digital option via numerical bumping.
Uses central finite differences (spot bump ε = spot × 10⁻³ for delta/gamma;
vol bump ε = 10⁻³ for vega). Theta and rho are set to NaN.
Parameters
----------
underlying, strike, rate, time_to_expiry, volatility, option_type, carry:
Same as :func:`digital_option_price`.
digital_type:
``"cash_or_nothing"`` (default) or ``"asset_or_nothing"``.
Returns
-------
OptionGreeks
Named tuple; only ``delta``, ``gamma``, ``vega`` are finite.
``theta`` and ``rho`` are NaN.
"""
from ferro_ta._ferro_ta import digital_greeks as _rust_digital_greeks
from ferro_ta._ferro_ta import digital_greeks_batch as _rust_digital_greeks_batch
option_type = _validate_option_type(option_type)
digital_type = digital_type.lower().replace("-", "_")
if digital_type not in {"cash_or_nothing", "asset_or_nothing"}:
raise FerroTAValueError(
"digital_type must be 'cash_or_nothing' or 'asset_or_nothing'."
)
arrays, scalar_mode = _broadcast_inputs(
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
carry=carry,
)
try:
if scalar_mode:
delta, gamma, vega = _rust_digital_greeks(
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
digital_type,
float(arrays["carry"][0]),
)
return OptionGreeks(delta, gamma, vega, float("nan"), float("nan"))
delta, gamma, vega = _rust_digital_greeks_batch(
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
digital_type,
arrays["carry"],
)
nan_arr = np.full_like(delta, float("nan"))
return OptionGreeks(
np.asarray(delta, dtype=np.float64),
np.asarray(gamma, dtype=np.float64),
np.asarray(vega, dtype=np.float64),
nan_arr,
nan_arr,
)
except ValueError as err:
_normalize_rust_error(err)
# ---------------------------------------------------------------------------
# American options — populated once the Rust bridge is built
# ---------------------------------------------------------------------------
[docs]
def american_option_price(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
carry: ArrayLike | float = 0.0,
) -> ScalarOrArray:
"""American option price using the Barone-Adesi-Whaley (1987) approximation.
Accurate to within a few basis points for standard equity/index parameters.
O(1) per evaluation — suitable for batch pricing or calibration.
Parameters
----------
underlying:
Current underlying (spot) price.
strike:
Option strike price.
rate:
Risk-free rate (annualised, decimal).
time_to_expiry:
Time to expiry in years.
volatility:
Implied volatility (annualised, decimal).
option_type:
``"call"`` (default) or ``"put"``.
carry:
Continuous carry / dividend yield (annualised, decimal). Default 0.
For calls with ``carry = 0`` (no dividends) early exercise is never
optimal and the result equals the European BSM price.
Returns
-------
float or NDArray[float64]
American option price ≥ European BSM price.
Notes
-----
The BAW approximation uses a quadratic equation to find the critical
exercise boundary S* via Newton-Raphson iteration, then adds the early
exercise premium on top of the European price.
Reference: Barone-Adesi, G. & Whaley, R.E. (1987). "Efficient Analytic
Approximation of American Option Values." *Journal of Finance*, 42(2),
301–320.
See Also
--------
early_exercise_premium : Difference between American and European prices.
"""
from ferro_ta._ferro_ta import american_price as _rust_american_price
from ferro_ta._ferro_ta import american_price_batch as _rust_american_price_batch
option_type = _validate_option_type(option_type)
arrays, scalar_mode = _broadcast_inputs(
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
carry=carry,
)
try:
if scalar_mode:
return float(
_rust_american_price(
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
float(arrays["carry"][0]),
)
)
out = _rust_american_price_batch(
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
arrays["carry"],
)
return np.asarray(out, dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def early_exercise_premium(
underlying: ArrayLike | float,
strike: ArrayLike | float,
rate: ArrayLike | float,
time_to_expiry: ArrayLike | float,
volatility: ArrayLike | float,
*,
option_type: str = "call",
carry: ArrayLike | float = 0.0,
) -> ScalarOrArray:
"""Early exercise premium: American price − European BSM price.
Represents the additional value an American option holder gains from the
right to exercise before expiry. Always ≥ 0.
Parameters
----------
underlying, strike, rate, time_to_expiry, volatility, option_type, carry:
Same as :func:`american_option_price`.
Returns
-------
float or NDArray[float64]
Premium ≥ 0. Typically 0 for calls with no dividends.
Notes
-----
For equity calls with zero carry (no dividends), early exercise is never
optimal so the premium is ≈ 0. For puts (or calls on dividend-paying
underlyings), the premium increases with in-the-moneyness, rate, and
time to expiry.
"""
from ferro_ta._ferro_ta import (
early_exercise_premium as _rust_early_exercise_premium,
)
from ferro_ta._ferro_ta import (
early_exercise_premium_batch as _rust_early_exercise_premium_batch,
)
option_type = _validate_option_type(option_type)
arrays, scalar_mode = _broadcast_inputs(
underlying=underlying,
strike=strike,
rate=rate,
time_to_expiry=time_to_expiry,
volatility=volatility,
carry=carry,
)
try:
if scalar_mode:
return float(
_rust_early_exercise_premium(
float(arrays["underlying"][0]),
float(arrays["strike"][0]),
float(arrays["rate"][0]),
float(arrays["time_to_expiry"][0]),
float(arrays["volatility"][0]),
option_type,
float(arrays["carry"][0]),
)
)
out = _rust_early_exercise_premium_batch(
arrays["underlying"],
arrays["strike"],
arrays["rate"],
arrays["time_to_expiry"],
arrays["volatility"],
option_type,
arrays["carry"],
)
return np.asarray(out, dtype=np.float64)
except ValueError as err:
_normalize_rust_error(err)
# ---------------------------------------------------------------------------
# Historical volatility estimators — populated once the Rust bridge is built
# ---------------------------------------------------------------------------
[docs]
def close_to_close_vol(
close: ArrayLike,
window: int = 20,
trading_days_per_year: float = 252.0,
) -> NDArray[np.float64]:
"""Rolling close-to-close realized volatility (annualised).
Baseline estimator — uses only closing prices. Less efficient than OHLC
estimators but requires only daily close data.
Parameters
----------
close:
Array of closing prices (length ≥ window + 1).
window:
Rolling look-back period in bars (default 20).
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
NDArray[float64]
Same length as *close*. First ``window`` values are NaN.
Notes
-----
Formula::
σ = √( Σᵢ ln²(Cᵢ/Cᵢ₋₁) / window × trading_days_per_year )
No Bessel correction is applied (population variance, not sample variance).
"""
from ferro_ta._ferro_ta import close_to_close_vol as _rust_ctc
try:
arr = _to_f64(close)
return np.asarray(
_rust_ctc(arr, int(window), float(trading_days_per_year)), dtype=np.float64
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def parkinson_vol(
high: ArrayLike,
low: ArrayLike,
window: int = 20,
trading_days_per_year: float = 252.0,
) -> NDArray[np.float64]:
"""Rolling Parkinson high-low realized volatility estimator (annualised).
~5× more efficient than close-to-close for diffusion processes.
Does **not** account for drift or overnight gaps.
Parameters
----------
high, low:
Arrays of daily high and low prices (same length, ≥ window).
window:
Rolling look-back period in bars (default 20).
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
NDArray[float64]
Same length as *high*. First ``window - 1`` values are NaN.
Notes
-----
Formula per window::
σ² = (1 / (4·ln2·window)) · Σ ln²(Hᵢ/Lᵢ) × trading_days_per_year
Reference: Parkinson, M. (1980). "The Extreme Value Method for
Estimating the Variance of the Rate of Return." *Journal of Business*, 53(1).
"""
from ferro_ta._ferro_ta import parkinson_vol as _rust_parkinson
try:
return np.asarray(
_rust_parkinson(
_to_f64(high), _to_f64(low), int(window), float(trading_days_per_year)
),
dtype=np.float64,
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def garman_klass_vol(
open: ArrayLike,
high: ArrayLike,
low: ArrayLike,
close: ArrayLike,
window: int = 20,
trading_days_per_year: float = 252.0,
) -> NDArray[np.float64]:
"""Rolling Garman-Klass OHLC realized volatility estimator (annualised).
Extends Parkinson by incorporating the open-close return. ~7.4× more
efficient than close-to-close. Does **not** handle overnight gaps.
Parameters
----------
open, high, low, close:
Arrays of daily OHLC prices (same length, ≥ window).
window:
Rolling look-back period in bars (default 20).
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
NDArray[float64]
Same length as *close*. First ``window - 1`` values are NaN.
Notes
-----
Per-bar contribution::
GK = 0.5·ln²(H/L) − (2·ln2 − 1)·ln²(C/O)
Reference: Garman, M.B. & Klass, M.J. (1980). "On the Estimation of
Security Price Volatilities from Historical Data." *Journal of Business*, 53(1).
"""
from ferro_ta._ferro_ta import garman_klass_vol as _rust_gk
try:
return np.asarray(
_rust_gk(
_to_f64(open),
_to_f64(high),
_to_f64(low),
_to_f64(close),
int(window),
float(trading_days_per_year),
),
dtype=np.float64,
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def rogers_satchell_vol(
open: ArrayLike,
high: ArrayLike,
low: ArrayLike,
close: ArrayLike,
window: int = 20,
trading_days_per_year: float = 252.0,
) -> NDArray[np.float64]:
"""Rolling Rogers-Satchell OHLC realized volatility estimator (annualised).
Drift-invariant: unbiased for assets with non-zero expected return.
Does **not** handle overnight gaps.
Parameters
----------
open, high, low, close:
Arrays of daily OHLC prices (same length, ≥ window).
window:
Rolling look-back period in bars (default 20).
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
NDArray[float64]
Same length as *close*. First ``window - 1`` values are NaN.
Notes
-----
Per-bar contribution (u = ln(H/O), d = ln(L/O), c = ln(C/O))::
RS = u·(u − c) + d·(d − c)
Reference: Rogers, L.C.G. & Satchell, S.E. (1991). "Estimating Variance
from High, Low and Closing Prices." *Annals of Applied Probability*, 1(4).
"""
from ferro_ta._ferro_ta import rogers_satchell_vol as _rust_rs
try:
return np.asarray(
_rust_rs(
_to_f64(open),
_to_f64(high),
_to_f64(low),
_to_f64(close),
int(window),
float(trading_days_per_year),
),
dtype=np.float64,
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def yang_zhang_vol(
open: ArrayLike,
high: ArrayLike,
low: ArrayLike,
close: ArrayLike,
window: int = 20,
trading_days_per_year: float = 252.0,
) -> NDArray[np.float64]:
"""Rolling Yang-Zhang OHLC realized volatility estimator (annualised).
The most efficient standard estimator (~14× vs close-to-close). Handles
overnight gaps by combining overnight, intraday open-close, and
Rogers-Satchell variance components with an optimal weight *k*.
Parameters
----------
open, high, low, close:
Arrays of daily OHLC prices (same length, ≥ window + 1).
window:
Rolling look-back period in bars (default 20).
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
NDArray[float64]
Same length as *close*. First ``window`` values are NaN.
Notes
-----
Mixed estimator::
σ²_YZ = σ²_overnight + k·σ²_open_close + (1−k)·σ²_RS
where k = 0.34 / (1.34 + (window+1)/(window-1)).
Reference: Yang, D. & Zhang, Q. (2000). "Drift-Independent Volatility
Estimation Based on High, Low, Open, and Close Prices."
*Journal of Business*, 73(3).
"""
from ferro_ta._ferro_ta import yang_zhang_vol as _rust_yz
try:
return np.asarray(
_rust_yz(
_to_f64(open),
_to_f64(high),
_to_f64(low),
_to_f64(close),
int(window),
float(trading_days_per_year),
),
dtype=np.float64,
)
except ValueError as err:
_normalize_rust_error(err)
[docs]
def vol_cone(
close: ArrayLike,
*,
windows: tuple[int, ...] = (21, 42, 63, 126, 252),
trading_days_per_year: float = 252.0,
) -> VolCone:
"""Historical realised vol distribution across window lengths (volatility cone).
For each window, computes the full history of rolling close-to-close
realised vol, then returns the min / p25 / median / p75 / max distribution.
Contextualises current implied vol: "Is 30 % IV cheap or expensive?"
Parameters
----------
close:
Array of closing prices (length ≥ max(windows) + 1).
windows:
Tuple of rolling window sizes in bars. Default ``(21, 42, 63, 126, 252)``
(approx. 1 month, 2 months, 3 months, 6 months, 1 year).
trading_days_per_year:
Annualisation factor (default 252).
Returns
-------
VolCone
Dataclass with arrays ``windows``, ``min``, ``p25``, ``median``,
``p75``, ``max`` — one value per element of *windows*.
Notes
-----
Uses close-to-close vol internally. Overlay the current IV on the cone
to see whether it is historically cheap or expensive for each tenor.
Examples
--------
>>> import numpy as np
>>> from ferro_ta.analysis.options import vol_cone
>>> rng = np.random.default_rng(0)
>>> close = 100 * np.cumprod(np.exp(rng.normal(0, 0.01, 500)))
>>> cone = vol_cone(close, windows=(21, 63, 252))
>>> cone.median # annualised median realised vol per window
"""
from ferro_ta._ferro_ta import vol_cone as _rust_vol_cone
try:
arr = _to_f64(close)
slices = _rust_vol_cone(arr, list(windows), float(trading_days_per_year))
windows_arr = np.array([s[0] for s in slices], dtype=np.float64)
return VolCone(
windows=windows_arr,
min=np.array([s[1] for s in slices], dtype=np.float64),
p25=np.array([s[2] for s in slices], dtype=np.float64),
median=np.array([s[3] for s in slices], dtype=np.float64),
p75=np.array([s[4] for s in slices], dtype=np.float64),
max=np.array([s[5] for s in slices], dtype=np.float64),
)
except ValueError as err:
_normalize_rust_error(err)