"""
Batch Execution API — run indicators on multiple series in a single call.
This module provides a 2-D batch API that accepts a 2-D numpy array
(n_samples × n_series) and applies an indicator to every column, returning
a 2-D output array of the same shape.
For the most common indicators — SMA, EMA, RSI — the 2-D path is handled
entirely in Rust (a single GIL release for all columns). ``batch_apply``
also dispatches these indicators to Rust when possible; other indicators
use the generic Python fallback path.
Functions
---------
batch_sma — SMA on every column of a 2-D array (Rust fast path for 2-D)
batch_ema — EMA on every column of a 2-D array (Rust fast path for 2-D)
batch_rsi — RSI on every column of a 2-D array (Rust fast path for 2-D)
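batch_apply — Generic batch wrapper with Rust fast-path for SMA/EMA/RSI
compute_many — Several indicators over the same input arrays, grouped into as few Rust calls as possible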
Usage
-----
>>> import numpy as np
>>> from ferro_ta.data.batch import batch_sma
>>> data = np.random.rand(100, 5) # 100 bars, 5 symbols
>>> result = batch_sma(data, timeperiod=14)
>>> result.shape
(100, 5)
"""
from __future__ import annotations
from collections.abc import Callable, Sequence
import numpy as np
from numpy.typing import ArrayLike
from ferro_ta._ferro_ta import (
batch_adx as _rust_batch_adx,
)
from ferro_ta._ferro_ta import (
batch_atr as _rust_batch_atr,
)
from ferro_ta._ferro_ta import (
batch_ema as _rust_batch_ema,
)
from ferro_ta._ferro_ta import (
batch_rsi as _rust_batch_rsi,
)
from ferro_ta._ferro_ta import (
batch_sma as _rust_batch_sma,
)
from ferro_ta._ferro_ta import (
batch_stoch as _rust_batch_stoch,
)
from ferro_ta._ferro_ta import (
run_close_indicators as _rust_run_close_indicators,
)
from ferro_ta._ferro_ta import (
run_hlc_indicators as _rust_run_hlc_indicators,
)
from ferro_ta.core.registry import run as _registry_run
from ferro_ta.indicators.momentum import RSI
from ferro_ta.indicators.overlap import EMA, SMA
# Public names exported by a star-import of this module.
# NOTE(review): batch_atr, batch_stoch and batch_adx are defined in this
# module but are not listed here — confirm whether that is intentional.
__all__ = [
"batch_sma",
"batch_ema",
"batch_rsi",
"batch_apply",
"compute_many",
]
# Indicator name -> default ``timeperiod`` for the close-only group that
# ``compute_many`` sends to the Rust ``run_close_indicators`` call.
# The default applies when a spec does not pass ``timeperiod`` itself.
_CLOSE_FASTPATH_DEFAULTS: dict[str, int] = {
"SMA": 30,
"EMA": 30,
"RSI": 14,
"STDDEV": 5,
"VAR": 5,
"LINEARREG": 14,
"LINEARREG_SLOPE": 14,
"LINEARREG_INTERCEPT": 14,
"LINEARREG_ANGLE": 14,
"TSF": 14,
}
# Indicator name -> default ``timeperiod`` for the high/low/close group that
# ``compute_many`` sends to the Rust ``run_hlc_indicators`` call.
_HLC_FASTPATH_DEFAULTS: dict[str, int] = {
"ATR": 14,
"NATR": 14,
"ADX": 14,
"ADXR": 14,
"CCI": 14,
"WILLR": 14,
}
# Indicator name -> default ``timeperiod`` for the callables that
# ``batch_apply`` may redirect to a Rust batch kernel
# (looked up by ``_resolve_batch_fastpath``).
_BATCH_FASTPATH_DEFAULTS: dict[str, int] = {
"SMA": 30,
"EMA": 30,
"RSI": 14,
}
def _resolve_batch_fastpath(
    fn: Callable[..., np.ndarray],
    kwargs: dict[str, object],
) -> tuple[str, int] | None:
    """Decide whether ``batch_apply`` can hand *fn* to a Rust batch kernel.

    The callable is identified purely by its upper-cased ``__name__``; it
    qualifies when that name is a key of ``_BATCH_FASTPATH_DEFAULTS`` and
    *kwargs* contains nothing but an optional integer ``timeperiod``.

    Parameters
    ----------
    fn : callable
        Indicator function passed to ``batch_apply``.
    kwargs : dict
        Keyword arguments the caller wants forwarded to *fn*.

    Returns
    -------
    tuple[str, int] or None
        ``(indicator_name, timeperiod)`` when the fast path applies,
        otherwise ``None``.
    """
    # NOTE(review): matching is by name only, so any callable that happens to
    # be called "sma"/"ema"/"rsi" (in any letter case) is treated as the
    # library indicator — confirm this is acceptable for user-supplied functions.
    indicator = getattr(fn, "__name__", "").upper()
    # The membership / extra-kwargs / integer checks are exactly the ones the
    # shared helper performs for the other fast-path tables.
    period = _extract_timeperiod(indicator, kwargs, _BATCH_FASTPATH_DEFAULTS)
    return None if period is None else (indicator, int(period))
def _normalize_indicator_spec(
    spec: str | tuple[str, dict[str, object]] | tuple[str, dict[str, object], object],
) -> tuple[str, dict[str, object], object | None]:
    """Expand an indicator spec into a uniform ``(name, kwargs, out_key)`` triple.

    Parameters
    ----------
    spec : str or tuple
        A bare indicator name, a ``(name, kwargs)`` pair, or a
        ``(name, kwargs, out_key)`` triple.

    Returns
    -------
    tuple
        ``(name, kwargs, out_key)``. ``kwargs`` is a fresh empty dict for a
        bare name and ``out_key`` is ``None`` unless the spec supplies one.
        A kwargs object given by the caller is passed through as-is (not
        copied).

    Raises
    ------
    ValueError
        If a non-string spec does not have exactly two or three elements.
    """
    if isinstance(spec, str):
        return spec, {}, None

    size = len(spec)
    if size == 2:
        name, kwargs = spec
        return name, kwargs, None
    if size == 3:
        name, kwargs, out_key = spec
        return name, kwargs, out_key

    # Any other length used to surface as an opaque tuple-unpacking error;
    # keep the ValueError type but say what was expected and what was received.
    raise ValueError(
        "Invalid indicator spec: expected a name, a (name, kwargs) pair or a "
        f"(name, kwargs, out_key) triple; got {spec!r}"
    )
def _extract_timeperiod(
    name: str, kwargs: dict[str, object], defaults: dict[str, int]
) -> int | None:
    """Return the ``timeperiod`` to use for *name*, or ``None`` if it does not qualify.

    An indicator qualifies when *name* is a key of *defaults* and *kwargs*
    holds no key other than ``"timeperiod"``. The period is taken from
    *kwargs* when present, else from *defaults*, and must be an ``int``.

    Parameters
    ----------
    name : str
        Indicator name to look up in *defaults*.
    kwargs : dict
        Keyword arguments supplied for the indicator.
    defaults : dict[str, int]
        Table of supported indicator names and their default period.

    Returns
    -------
    int or None
        The resolved period, or ``None`` when any of the conditions fails.
    """
    supported = name in defaults
    only_timeperiod = all(key == "timeperiod" for key in kwargs)
    if not (supported and only_timeperiod):
        return None

    period = kwargs["timeperiod"] if "timeperiod" in kwargs else defaults[name]
    return period if isinstance(period, int) else None
[docs]
def _run_registry_fallback(
    name: str,
    kwargs: dict[str, object],
    close_arr: np.ndarray,
    high_arr: np.ndarray | None,
    low_arr: np.ndarray | None,
    volume_arr: np.ndarray | None,
) -> object:
    """Run *name* through the registry, probing the positional signatures in turn.

    Signatures are tried from fewest to most inputs: ``(close)``, then
    ``(high, low, close)``, then ``(high, low, close, volume)``. The multi-input
    forms are attempted only when the arrays they need were actually supplied.
    The first call that succeeds wins.

    Raises
    ------
    ValueError
        If every attempted signature fails. The error is chained (``from``) to
        one of the underlying failures so the real cause stays visible.
    """
    signatures: list[tuple[np.ndarray, ...]] = [(close_arr,)]
    if high_arr is not None and low_arr is not None:
        signatures.append((high_arr, low_arr, close_arr))
        # Only offer the volume form when high/low exist; forwarding ``None``
        # placeholders to an indicator can never produce a meaningful result.
        if volume_arr is not None:
            signatures.append((high_arr, low_arr, close_arr, volume_arr))

    failures: list[Exception] = []
    for args in signatures:
        try:
            return _registry_run(name, *args, **kwargs)
        except Exception as exc:
            # Deliberately broad: a failure here may only mean "wrong
            # signature", so keep probing — but remember the error.
            failures.append(exc)

    # Heuristic: a TypeError usually means a signature mismatch, so prefer the
    # first non-TypeError failure as the reported cause; else use the last one.
    cause = next(
        (exc for exc in failures if not isinstance(exc, TypeError)), failures[-1]
    )
    raise ValueError(
        f"Cannot call indicator '{name}': insufficient data columns or incompatible parameters."
    ) from cause


def compute_many(
    indicators: Sequence[
        str | tuple[str, dict[str, object]] | tuple[str, dict[str, object], object]
    ],
    *,
    close: ArrayLike,
    high: ArrayLike | None = None,
    low: ArrayLike | None = None,
    volume: ArrayLike | None = None,
    parallel: bool = True,
) -> list[object]:
    """Compute multiple indicators over the same arrays with grouped Rust calls.

    Supported single-output indicators are grouped into one Rust boundary
    crossing per input-shape family (``close`` only or ``high/low/close``).
    Every other spec falls back to the regular registry path.

    A spec takes a grouped fast path when it has no ``out_key``, its name is in
    the matching defaults table, and its kwargs contain at most an integer
    ``timeperiod``. The high/low/close family additionally requires both
    *high* and *low* to be given.

    Parameters
    ----------
    indicators : sequence of str or tuple
        Indicator specs: a name, ``(name, kwargs)`` or
        ``(name, kwargs, out_key)``.
    close : array-like
        Close series; converted to a contiguous float64 array.
    high, low, volume : array-like, optional
        Additional series, converted the same way when provided.
    parallel : bool, default True
        Forwarded to the grouped Rust calls. It is not passed to the registry
        fallback.

    Returns
    -------
    list
        One result per spec, in input order. Fast-path results are float64
        arrays; fallback results are whatever the registry call returns.

    Raises
    ------
    ValueError
        If a fallback indicator cannot be called with any supported
        combination of the supplied arrays. The underlying failure is
        attached as ``__cause__``.
    """
    close_arr = np.ascontiguousarray(close, dtype=np.float64)
    high_arr = None if high is None else np.ascontiguousarray(high, dtype=np.float64)
    low_arr = None if low is None else np.ascontiguousarray(low, dtype=np.float64)
    volume_arr = (
        None if volume is None else np.ascontiguousarray(volume, dtype=np.float64)
    )
    has_high_low = high_arr is not None and low_arr is not None

    normalized = [_normalize_indicator_spec(spec) for spec in indicators]
    results: list[object | None] = [None] * len(normalized)

    # (result index, indicator name, timeperiod) for each grouped Rust call.
    close_group: list[tuple[int, str, int]] = []
    hlc_group: list[tuple[int, str, int]] = []
    for idx, (name, kwargs, out_key) in enumerate(normalized):
        if out_key is not None:
            continue  # specs with an out_key always use the registry path
        period = _extract_timeperiod(name, kwargs, _CLOSE_FASTPATH_DEFAULTS)
        if period is not None:
            close_group.append((idx, name, period))
            continue
        if has_high_low:
            period = _extract_timeperiod(name, kwargs, _HLC_FASTPATH_DEFAULTS)
            if period is not None:
                hlc_group.append((idx, name, period))

    if close_group:
        grouped = _rust_run_close_indicators(
            close_arr,
            [name for _, name, _ in close_group],
            [period for _, _, period in close_group],
            parallel,
        )
        for (idx, _, _), value in zip(close_group, grouped):
            results[idx] = np.asarray(value, dtype=np.float64)

    if hlc_group:
        grouped = _rust_run_hlc_indicators(
            high_arr,
            low_arr,
            close_arr,
            [name for _, name, _ in hlc_group],
            [period for _, _, period in hlc_group],
            parallel,
        )
        for (idx, _, _), value in zip(hlc_group, grouped):
            results[idx] = np.asarray(value, dtype=np.float64)

    # Anything still unset was not served by a grouped call.
    for idx, (name, kwargs, _) in enumerate(normalized):
        if results[idx] is None:
            results[idx] = _run_registry_fallback(
                name, kwargs, close_arr, high_arr, low_arr, volume_arr
            )

    return list(results)
[docs]
def batch_apply(
    data: ArrayLike,
    fn: Callable[..., np.ndarray],
    **kwargs,
) -> np.ndarray:
    """Run the single-series indicator *fn* over each column of *data*.

    When *fn* is recognised as SMA, EMA or RSI and the only keyword argument
    (if any) is an integer ``timeperiod``, 2-D input is handed to the
    matching Rust batch kernel (always with its parallel flag set to
    ``True``). Any other callable or argument set is evaluated with a Python
    loop, one column at a time.

    Parameters
    ----------
    data : array-like, shape (n_samples,) or (n_samples, n_series)
        Input values, converted to float64. A 1-D input is passed straight
        to *fn* and its result returned unchanged.
    fn : callable
        Indicator taking a 1-D array as first positional argument and
        returning a 1-D array of the same length (e.g. ``SMA``, ``EMA``,
        ``RSI``).
    **kwargs
        Keyword arguments forwarded to *fn* (e.g. ``timeperiod=14``).

    Returns
    -------
    numpy.ndarray
        Array with the same shape as *data*; on the Python path each column
        holds what *fn* returns for that column.

    Raises
    ------
    ValueError
        If *data* is neither 1-D nor 2-D.

    Examples
    --------
    >>> import numpy as np
    >>> from ferro_ta import SMA
    >>> from ferro_ta.data.batch import batch_apply
    >>> data = np.random.rand(50, 3)
    >>> out = batch_apply(data, SMA, timeperiod=5)
    >>> out.shape
    (50, 3)
    """
    values = np.asarray(data, dtype=np.float64)
    ndim = values.ndim
    if ndim == 1:
        return fn(values, **kwargs)
    if ndim != 2:
        raise ValueError(f"batch_apply expects 1-D or 2-D input; got {ndim}-D")

    resolved = _resolve_batch_fastpath(fn, kwargs)
    if resolved is not None:
        indicator, timeperiod = resolved
        if indicator == "SMA":
            kernel = _rust_batch_sma
        elif indicator == "EMA":
            kernel = _rust_batch_ema
        else:
            kernel = _rust_batch_rsi
        return np.asarray(kernel(np.ascontiguousarray(values), timeperiod, True))

    # Generic path: call fn on each column view and write into a float64 buffer.
    out = np.empty(values.shape, dtype=np.float64)
    for col, series in enumerate(values.T):
        out[:, col] = fn(series, **kwargs)
    return out
[docs]
def batch_sma(
    data: ArrayLike,
    timeperiod: int = 30,
    parallel: bool = True,
) -> np.ndarray:
    """Compute a Simple Moving Average for each column of *data*.

    2-D input goes to the Rust batch kernel, which loops over the columns on
    the Rust side (single GIL release) and, with *parallel* enabled, spreads
    them over CPU cores via Rayon. 1-D input is delegated to the
    single-series ``SMA``; *parallel* is not used in that case.

    Parameters
    ----------
    data : array-like, shape (n_samples,) or (n_samples, n_series)
        Input values, converted to a contiguous float64 array.
    timeperiod : int, default 30
        Window length.
    parallel : bool, default True
        Enable multi-threaded column processing. Set to ``False`` for small
        inputs where thread overhead dominates.

    Returns
    -------
    numpy.ndarray
        Same shape as *data*.

    Raises
    ------
    ValueError
        If *data* has more than two dimensions.

    Examples
    --------
    >>> import numpy as np
    >>> from ferro_ta.data.batch import batch_sma
    >>> data = np.arange(1.0, 101.0).reshape(100, 1).repeat(3, axis=1)
    >>> out = batch_sma(data, timeperiod=10)
    >>> out.shape
    (100, 3)
    """
    values = np.ascontiguousarray(data, dtype=np.float64)
    ndim = values.ndim
    if ndim == 2:
        return np.asarray(_rust_batch_sma(values, timeperiod, parallel))
    if ndim == 1:
        return SMA(values, timeperiod=timeperiod)
    raise ValueError(f"batch_sma expects 1-D or 2-D input; got {ndim}-D")
[docs]
def batch_ema(
    data: ArrayLike,
    timeperiod: int = 30,
    parallel: bool = True,
) -> np.ndarray:
    """Compute an Exponential Moving Average for each column of *data*.

    2-D input goes to the Rust batch kernel, which loops over the columns on
    the Rust side (single GIL release) and, with *parallel* enabled, spreads
    them over CPU cores via Rayon. 1-D input is delegated to the
    single-series ``EMA``; *parallel* is not used in that case.

    Parameters
    ----------
    data : array-like, shape (n_samples,) or (n_samples, n_series)
        Input values, converted to a contiguous float64 array.
    timeperiod : int, default 30
        Smoothing period.
    parallel : bool, default True
        Enable multi-threaded column processing.

    Returns
    -------
    numpy.ndarray
        Same shape as *data*.

    Raises
    ------
    ValueError
        If *data* has more than two dimensions.
    """
    values = np.ascontiguousarray(data, dtype=np.float64)
    ndim = values.ndim
    if ndim == 2:
        return np.asarray(_rust_batch_ema(values, timeperiod, parallel))
    if ndim == 1:
        return EMA(values, timeperiod=timeperiod)
    raise ValueError(f"batch_ema expects 1-D or 2-D input; got {ndim}-D")
[docs]
def batch_rsi(
    data: ArrayLike,
    timeperiod: int = 14,
    parallel: bool = True,
) -> np.ndarray:
    """Compute the Relative Strength Index for each column of *data*.

    2-D input goes to the Rust batch kernel, which loops over the columns on
    the Rust side (single GIL release) and, with *parallel* enabled, spreads
    them over CPU cores via Rayon. 1-D input is delegated to the
    single-series ``RSI``; *parallel* is not used in that case.

    Parameters
    ----------
    data : array-like, shape (n_samples,) or (n_samples, n_series)
        Input values, converted to a contiguous float64 array.
    timeperiod : int, default 14
        Look-back period.
    parallel : bool, default True
        Enable multi-threaded column processing.

    Returns
    -------
    numpy.ndarray
        Same shape as *data*. Values in [0, 100].

    Raises
    ------
    ValueError
        If *data* has more than two dimensions.
    """
    values = np.ascontiguousarray(data, dtype=np.float64)
    ndim = values.ndim
    if ndim == 2:
        return np.asarray(_rust_batch_rsi(values, timeperiod, parallel))
    if ndim == 1:
        return RSI(values, timeperiod=timeperiod)
    raise ValueError(f"batch_rsi expects 1-D or 2-D input; got {ndim}-D")
def batch_atr(
    high: ArrayLike,
    low: ArrayLike,
    close: ArrayLike,
    timeperiod: int = 14,
    parallel: bool = True,
) -> np.ndarray:
    """Run the Rust ``batch_atr`` kernel on high/low/close inputs.

    Each input is converted to a contiguous float64 array and passed to the
    kernel together with *timeperiod* and *parallel*. No shape validation is
    done here.

    Parameters
    ----------
    high, low, close : array-like
        Price arrays. Presumably 2-D ``(n_samples, n_series)`` like the other
        batch functions in this module — TODO confirm against the Rust
        binding.
    timeperiod : int, default 14
        Period forwarded to the kernel.
    parallel : bool, default True
        Parallel flag forwarded to the kernel.

    Returns
    -------
    numpy.ndarray
        The kernel output wrapped with ``np.asarray``.
    """
    high_arr, low_arr, close_arr = (
        np.ascontiguousarray(series, dtype=np.float64)
        for series in (high, low, close)
    )
    atr = _rust_batch_atr(high_arr, low_arr, close_arr, timeperiod, parallel)
    return np.asarray(atr)
def batch_stoch(
    high: ArrayLike,
    low: ArrayLike,
    close: ArrayLike,
    fastk_period: int = 5,
    slowk_period: int = 3,
    slowd_period: int = 3,
    parallel: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
    """Run the Rust ``batch_stoch`` kernel on high/low/close inputs.

    Each input is converted to a contiguous float64 array and passed to the
    kernel together with the three period arguments and *parallel*. No shape
    validation is done here.

    Parameters
    ----------
    high, low, close : array-like
        Price arrays. Presumably 2-D ``(n_samples, n_series)`` like the other
        batch functions in this module — TODO confirm against the Rust
        binding.
    fastk_period : int, default 5
    slowk_period : int, default 3
    slowd_period : int, default 3
        Periods forwarded to the kernel in this order.
    parallel : bool, default True
        Parallel flag forwarded to the kernel.

    Returns
    -------
    tuple[numpy.ndarray, numpy.ndarray]
        The two kernel outputs, in the order returned, each wrapped with
        ``np.asarray`` (presumably the %K and %D lines — confirm).
    """
    high_arr, low_arr, close_arr = (
        np.ascontiguousarray(series, dtype=np.float64)
        for series in (high, low, close)
    )
    first, second = _rust_batch_stoch(
        high_arr,
        low_arr,
        close_arr,
        fastk_period,
        slowk_period,
        slowd_period,
        parallel,
    )
    return np.asarray(first), np.asarray(second)
def batch_adx(
    high: ArrayLike,
    low: ArrayLike,
    close: ArrayLike,
    timeperiod: int = 14,
    parallel: bool = True,
) -> np.ndarray:
    """Run the Rust ``batch_adx`` kernel on high/low/close inputs.

    Each input is converted to a contiguous float64 array and passed to the
    kernel together with *timeperiod* and *parallel*. No shape validation is
    done here.

    Parameters
    ----------
    high, low, close : array-like
        Price arrays. Presumably 2-D ``(n_samples, n_series)`` like the other
        batch functions in this module — TODO confirm against the Rust
        binding.
    timeperiod : int, default 14
        Period forwarded to the kernel.
    parallel : bool, default True
        Parallel flag forwarded to the kernel.

    Returns
    -------
    numpy.ndarray
        The kernel output wrapped with ``np.asarray``.
    """
    high_arr, low_arr, close_arr = (
        np.ascontiguousarray(series, dtype=np.float64)
        for series in (high, low, close)
    )
    adx = _rust_batch_adx(high_arr, low_arr, close_arr, timeperiod, parallel)
    return np.asarray(adx)