From 6d079fe631367e4f671ad6fc36af4433f363ae56 Mon Sep 17 00:00:00 2001 From: Luca Date: Sun, 5 Apr 2026 10:30:59 +0100 Subject: [PATCH 1/2] Add trainer --- .vscode/launch.json | 2 +- docs/api/options/divfm.md | 30 +++ docs/api/options/pricer.md | 1 + docs/bibliography.md | 8 + docs/glossary.md | 18 +- mkdocs.yml | 1 + quantflow/options/calibration.py | 12 +- quantflow/options/divfm/__init__.py | 4 + quantflow/options/divfm/network.py | 210 ++++++++++++++++++++ quantflow/options/divfm/pricer.py | 137 +++++++++++++ quantflow/options/divfm/trainer.py | 181 +++++++++++++++++ quantflow/options/divfm/weights.py | 154 +++++++++++++++ quantflow/options/pricer.py | 74 ++++--- quantflow_tests/test_divfm.py | 296 ++++++++++++++++++++++++++++ 14 files changed, 1094 insertions(+), 34 deletions(-) create mode 100644 docs/api/options/divfm.md create mode 100644 quantflow/options/divfm/__init__.py create mode 100644 quantflow/options/divfm/network.py create mode 100644 quantflow/options/divfm/pricer.py create mode 100644 quantflow/options/divfm/trainer.py create mode 100644 quantflow/options/divfm/weights.py create mode 100644 quantflow_tests/test_divfm.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 843c624..418ba6a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -14,7 +14,7 @@ "args": [ "-x", "-vvv", - "quantflow_tests/test_options_pricer.py", + "quantflow_tests/test_divfm.py", ] }, ] diff --git a/docs/api/options/divfm.md b/docs/api/options/divfm.md new file mode 100644 index 0000000..96a9b3b --- /dev/null +++ b/docs/api/options/divfm.md @@ -0,0 +1,30 @@ +# Deep IV Factor Model + +The DIVFM module implements the Deep Implied Volatility Factor Model from +Gauthier, Godin & Legros (2025). The IV surface on a given day is modelled as +a linear combination of $p$ fixed latent functions learned by a neural network: + +$$\sigma_t(M, \tau; \theta) = \mathbf{f}(M, \tau, X; \theta)\,\boldsymbol{\beta}_t = \sum_{i=1}^{p} \beta_{t,i}\,f_i(M, \tau, X; \theta)$$ + +where $M = \frac{1}{\sqrt{\tau}}\log\!\left(\frac{K}{F_{t,\tau}}\right)$ is the +time-scaled moneyness, $\mathbf{f}$ is a feedforward neural network with fixed +weights $\theta$ shared across all days, and $\boldsymbol{\beta}_t$ are daily +coefficients fitted in closed form via OLS. + +## Inference (no torch required) + +::: quantflow.options.divfm.DIVFMPricer + +::: quantflow.options.divfm.DIVFMWeights + +::: quantflow.options.divfm.weights.SubnetWeights + +::: quantflow.options.divfm.weights.LayerWeights + +## Training (requires `quantflow[ml]`) + +::: quantflow.options.divfm.network.DIVFMNetwork + +::: quantflow.options.divfm.trainer.DIVFMTrainer + +::: quantflow.options.divfm.trainer.DayData diff --git a/docs/api/options/pricer.md b/docs/api/options/pricer.md index b421b2f..796736d 100644 --- a/docs/api/options/pricer.md +++ b/docs/api/options/pricer.md @@ -3,6 +3,7 @@ The option pricer module provides classes for pricing options using different stochastic volatility models. +::: quantflow.options.pricer.OptionPricerBase ::: quantflow.options.pricer.OptionPricer diff --git a/docs/bibliography.md b/docs/bibliography.md index b569ccb..6c97f26 100644 --- a/docs/bibliography.md +++ b/docs/bibliography.md @@ -41,3 +41,11 @@ Peter Molnar Master's thesis, University of Economics in Prague, 2020 --- + +### Gauthier + +Gauthier Godin & Legros + +[Deep Implied Volatility Factor Models for Stock Options](https://drive.google.com/file/d/1Rjypn1IqnhpiZz08s0hxl5ISQDC8KeWk/view?usp=sharing) + +2025 diff --git a/docs/glossary.md b/docs/glossary.md index f676abe..31fc621 100644 --- a/docs/glossary.md +++ b/docs/glossary.md @@ -42,20 +42,26 @@ Moneyness, or log strike/forward ratio, is used in the context of option pricing where $K$ is the strike and $F$ is the Forward price. A positive value implies strikes above the forward, which means put options are in the money (ITM) and call options are out of the money (OTM). +## Moneyness Time Scaled + +The time to maturity scaled moneyness, is used in the context of option pricing in order to compare options with different maturities. It is defined as + +\begin{equation} + \frac{1}{\sqrt{T}}\ln{\frac{K}{F}} +\end{equation} + +where $K$ is the strike, $F$ is the Forward price, and $T$ is the time to maturity. It is used to compare options with different maturities by scaling the moneyness by the square root of time to maturity. This is because the price of the underlying asset is subject to random fluctuations, if these fluctuations follow a Brownian motion than the standard deviation of the price movement will increase with the square root of time. + ## Moneyness Vol Adjusted -The vol-adjusted moneyness is used in the context of option pricing in order to compare options with different maturities. It is defined as +The vol-adjusted moneyness is used in the context of option pricing in order to compare options with different maturities and different levels of volatility. It is defined as \begin{equation} \frac{1}{\sigma\sqrt{T}}\ln\frac{K}{F} \end{equation} -where $K$ is the strike and $F$ is the Forward price and $T$ is the time to maturity and $\sigma$ is the implied Black volatility. - -The key reason for dividing by the square root of time-to-maturity is related to how volatility and price movement behave over time. -The price of the underlying asset is subject to random fluctuations, if these fluctuations follow a Brownian motion than the -standard deviation of the price movement will increase with the square root of time. +where $K$ is the strike, $F$ is the Forward price, $T$ is the time to maturity and $\sigma$ is the implied Black volatility. ## Probability Density Function (PDF) diff --git a/mkdocs.yml b/mkdocs.yml index e918cec..9921f77 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -70,6 +70,7 @@ nav: - api/options/index.md - Black-Scholes: api/options/black.md - Calibration: api/options/calibration.md + - Deep IV Factor Model: api/options/divfm.md - Pricer: api/options/pricer.md - Volatility Surface: api/options/vol_surface.md - Stochastic Processes: diff --git a/quantflow/options/calibration.py b/quantflow/options/calibration.py index 1965fe7..1420278 100644 --- a/quantflow/options/calibration.py +++ b/quantflow/options/calibration.py @@ -16,7 +16,7 @@ from quantflow.sp.jump_diffusion import D from quantflow.utils import plot -from .pricer import OptionPricer +from .pricer import OptionPricerBase from .surface import OptionPrice, VolSurface M = TypeVar("M", bound=StochasticProcess1D) @@ -59,8 +59,12 @@ def price_range(self) -> Bounds: class VolModelCalibration(BaseModel, ABC, Generic[M], arbitrary_types_allowed=True): """Abstract class for calibration of a stochastic volatility model""" - pricer: OptionPricer[M] - """The [OptionPricer][quantflow.options.pricer.OptionPricer] for the model""" + pricer: OptionPricerBase = Field( + description=( + "The [OptionPricerBase][quantflow.options.pricer.OptionPricerBase]" + " for the model" + ) + ) vol_surface: VolSurface[Any] = Field(repr=False) """The [VolSurface][quantflow.options.surface.VolSurface] to calibrate the model with""" @@ -111,7 +115,7 @@ def set_params(self, params: np.ndarray) -> None: @property def model(self) -> M: """Get the model""" - return self.pricer.model + return self.pricer.model # type: ignore[attr-defined] @property def ref_date(self) -> datetime: diff --git a/quantflow/options/divfm/__init__.py b/quantflow/options/divfm/__init__.py new file mode 100644 index 0000000..1487277 --- /dev/null +++ b/quantflow/options/divfm/__init__.py @@ -0,0 +1,4 @@ +from .pricer import DIVFMPricer +from .weights import DIVFMWeights + +__all__ = ["DIVFMPricer", "DIVFMWeights"] diff --git a/quantflow/options/divfm/network.py b/quantflow/options/divfm/network.py new file mode 100644 index 0000000..0e485ad --- /dev/null +++ b/quantflow/options/divfm/network.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +import torch +import torch.nn as nn +from typing_extensions import Annotated, Doc + +from .weights import DIVFMWeights, LayerWeights, SubnetWeights + + +def _make_subnet( + input_size: int, + hidden_size: int, + num_hidden_layers: int, + output_size: int, +) -> nn.Sequential: + """Feedforward subnet: affine + sigmoid + batch norm per hidden layer, + then a linear output layer with fixed (non-trainable) batch normalization.""" + layers: list[nn.Module] = [] + in_size = input_size + for _ in range(num_hidden_layers): + layers.append(nn.Linear(in_size, hidden_size)) + layers.append(nn.Sigmoid()) + layers.append(nn.BatchNorm1d(hidden_size)) + in_size = hidden_size + layers.append(nn.Linear(in_size, output_size)) + # Fixed output normalization: zero mean, unit variance, no learnable scale/shift + layers.append(nn.BatchNorm1d(output_size, affine=False)) + return nn.Sequential(*layers) + + +def _extract_subnet(subnet: nn.Sequential) -> SubnetWeights: + """Extract weights from a torch subnet into a + [SubnetWeights][quantflow.options.divfm.weights.SubnetWeights] instance.""" + modules = list(subnet.children()) + layers = [] + i = 0 + while i < len(modules): + linear = modules[i] + assert isinstance(linear, nn.Linear) + W = linear.weight.detach().cpu().numpy().copy() + b = linear.bias.detach().cpu().numpy().copy() + i += 1 + apply_activation = i < len(modules) and isinstance(modules[i], nn.Sigmoid) + if apply_activation: + i += 1 + bn = modules[i] + assert isinstance(bn, nn.BatchNorm1d) + layers.append( + LayerWeights( + weight=W, + bias=b, + bn_mean=bn.running_mean.cpu().numpy().copy(), # type: ignore[union-attr] + bn_var=bn.running_var.cpu().numpy().copy(), # type: ignore[union-attr] + bn_gamma=( + bn.weight.detach().cpu().numpy().copy() if bn.affine else None + ), + bn_beta=(bn.bias.detach().cpu().numpy().copy() if bn.affine else None), + bn_eps=bn.eps, + apply_activation=apply_activation, + ) + ) + i += 1 + return SubnetWeights(layers=layers) + + +class DIVFMNetwork(nn.Module): + r"""Neural network implementing the latent factor functions + + $$ + f_p\left(m, \tau, X; \theta\right) + $$ + + Produces $P$ factor functions with the following structural constraints + (as in [gauthier](/bibliography#gauthier)): + + - $f_1 = 1$ constant, not learned + - $f_2(\tau, X)$ depends only on time-to-maturity and optional extra features X + - $f_3(m)$ depends only on time-scaled moneyness + - $f_4, ..., f_p (m, \tau, X)$ unrestricted + + These structural constraints improve interpretability by associating each + factor with a specific dimension of the implied volatility surface. + + The network uses sigmoid activations throughout to ensure the implied + volatility surface is twice continuously differentiable in the strike + dimension, which is required for a well-defined risk-neutral density. + """ + + def __init__( + self, + num_factors: Annotated[ + int, + Doc( + "Total number of factors p (including the constant $f_1$). " + "Must be greater or equal 3 to satisfy the structural constraints" + ), + ] = 5, + hidden_size: Annotated[ + int, + Doc("Number of neurons per hidden layer"), + ] = 32, + num_hidden_layers: Annotated[ + int, + Doc("Number of hidden layers L - 2 (default 3 gives L=5 total)"), + ] = 3, + extra_features: Annotated[ + int, + Doc( + "Number of additional observable features X beyond (M, tau)," + " e.g. time-to-earnings-announcement" + ), + ] = 0, + ) -> None: + super().__init__() + if num_factors < 3: + raise ValueError( + "num_factors must be at least 3 (constant + ttm + moneyness)" + ) + self.num_factors = num_factors + self.extra_features = extra_features + + # f_2: input is tau (+ optional extra features X) + self.subnet_ttm = _make_subnet( + input_size=1 + extra_features, + hidden_size=hidden_size, + num_hidden_layers=num_hidden_layers, + output_size=1, + ) + + # f_3: input is M only (moneyness, no extra features by design) + self.subnet_moneyness = _make_subnet( + input_size=1, + hidden_size=hidden_size, + num_hidden_layers=num_hidden_layers, + output_size=1, + ) + + # f_4 ... f_p: input is (M, tau) + optional extra features X + num_joint = num_factors - 3 + if num_joint > 0: + self.subnet_joint: nn.Module = _make_subnet( + input_size=2 + extra_features, + hidden_size=hidden_size, + num_hidden_layers=num_hidden_layers, + output_size=num_joint, + ) + else: + self.subnet_joint = nn.Identity() + + def to_weights(self) -> DIVFMWeights: + """Extract network weights into a + [DIVFMWeights][quantflow.options.divfm.weights.DIVFMWeights] instance + for torch-free inference.""" + return DIVFMWeights( + subnet_ttm=_extract_subnet(self.subnet_ttm), + subnet_moneyness=_extract_subnet(self.subnet_moneyness), + subnet_joint=( + _extract_subnet(self.subnet_joint) # type: ignore[arg-type] + if self.num_factors > 3 + else None + ), + num_factors=self.num_factors, + extra_features=self.extra_features, + ) + + def forward( + self, + moneyness_ttm: Annotated[ + torch.Tensor, + Doc("Shape (N,). Time-scaled moneyness M = log(K/F) / sqrt(tau)"), + ], + ttm: Annotated[ + torch.Tensor, + Doc("Shape (N,). Time-to-maturity tau in years"), + ], + extra: Annotated[ + torch.Tensor | None, + Doc("Shape (N, extra_features) or None. Additional observable features X"), + ] = None, + ) -> torch.Tensor: + """Compute factor values for a batch of options. + + Returns shape (N, num_factors) with factor values [f_1, f_2, ..., f_p]. + """ + N = moneyness_ttm.shape[0] + + # f_1 = 1 + f1 = torch.ones(N, 1, device=moneyness_ttm.device, dtype=moneyness_ttm.dtype) + + # f_2(tau [, X]) + ttm_input = ttm.unsqueeze(1) + if extra is not None: + ttm_input = torch.cat([ttm_input, extra], dim=1) + f2 = self.subnet_ttm(ttm_input) + + # f_3(M) + f3 = self.subnet_moneyness(moneyness_ttm.unsqueeze(1)) + + parts = [f1, f2, f3] + + # f_4 ... f_p (M, tau [, X]) + if self.num_factors > 3: + joint_input = torch.cat( + [moneyness_ttm.unsqueeze(1), ttm.unsqueeze(1)] + + ([extra] if extra is not None else []), + dim=1, + ) + parts.append(self.subnet_joint(joint_input)) # type: ignore[arg-type] + + return torch.cat(parts, dim=1) diff --git a/quantflow/options/divfm/pricer.py b/quantflow/options/divfm/pricer.py new file mode 100644 index 0000000..0f4d576 --- /dev/null +++ b/quantflow/options/divfm/pricer.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +from typing import Any + +import numpy as np +from pydantic import Field + +from quantflow.utils.types import FloatArray + +from ..bs import black_call +from ..pricer import MaturityPricer, OptionPricerBase +from .weights import DIVFMWeights + + +class DIVFMPricer(OptionPricerBase): + r"""Option pricer based on the Deep Implied Volatility Factor Model (DIVFM). + + The IV surface on a given day is modelled as a linear combination of p + fixed latent functions learned by a neural network: + + \begin{equation} + \sigma_t\left(m, \tau\right) = f\left(m, \tau; theta\right) \dot \beta_t + \end{equation} + + where M = log(K/F) / sqrt(tau) is the time-scaled moneyness, f is + implemented by [DIVFMWeights][quantflow.options.divfm.weights.DIVFMWeights], + and beta_t are daily coefficients computed in closed form via OLS. + + Call prices are derived from the IV surface via Black-Scholes. + + Usage + ----- + 1. Train a [DIVFMNetwork][quantflow.options.divfm.network.DIVFMNetwork] + and call its ``to_weights()`` method to obtain a + [DIVFMWeights][quantflow.options.divfm.weights.DIVFMWeights] instance. + 2. Construct this pricer with those weights. + 3. Call [calibrate][quantflow.options.divfm.pricer.DIVFMPricer.calibrate] + with the day's observed implied volatilities to fit beta_t. + 4. Use [maturity][quantflow.options.pricer.OptionPricerBase.maturity], + [price][quantflow.options.pricer.OptionPricerBase.price] etc. as normal. + """ + + weights: DIVFMWeights = Field( + description=( + "Extracted weights of the trained DIVFM network." + " No torch dependency required at inference time" + ) + ) + betas: FloatArray = Field( + default_factory=lambda: np.zeros(5), + description="Daily OLS factor loadings beta_t, shape (num_factors,)", + ) + extra: FloatArray | None = Field( + default=None, + description=( + "Current day's observable features X, shape (extra_features,)." + " Broadcast across all grid points in _compute_maturity." + " Set automatically by calibrate() when extra is provided" + ), + ) + max_moneyness_ttm: float = Field( + default=3.0, + description="Max time-scaled moneyness |M| used to build the pricing grid", + ) + n: int = Field( + default=100, description="Number of grid points along the moneyness axis" + ) + + model_config = {"arbitrary_types_allowed": True} + + def calibrate( + self, + moneyness_ttm: FloatArray, + ttm: FloatArray, + implied_vols: FloatArray, + extra: FloatArray | None = None, + ) -> None: + """Fit daily OLS coefficients from observed implied volatilities. + + Given a set of options observed on a single day, computes the + closed-form OLS estimate: + + beta_t = (F^T F)^{-1} F^T IV_t + + where F is the (N, p) matrix of factor values from the network. + + Parameters + ---------- + moneyness_ttm: + Shape (N,). Time-scaled moneyness M = log(K/F) / sqrt(tau). + ttm: + Shape (N,). Time-to-maturity tau in years. + implied_vols: + Shape (N,). Observed implied volatilities. + extra: + Shape (N, extra_features) or None. Additional features passed to + the network (e.g. time-to-earnings-announcement). + """ + extra_arr = np.asarray(extra, dtype=np.float32) if extra is not None else None + F = self.weights.forward( + np.asarray(moneyness_ttm, dtype=np.float32), + np.asarray(ttm, dtype=np.float32), + extra_arr, + ) + self.betas = np.linalg.lstsq(F, implied_vols, rcond=None)[0] + # Store the mean X across options as the day-level representative value + # used when pricing on a grid in _compute_maturity + self.extra = ( + extra_arr.mean(axis=0, keepdims=True) if extra_arr is not None else None + ) + self.reset() + + def _compute_maturity(self, ttm: float, **kwargs: Any) -> MaturityPricer: + """Compute a MaturityPricer for the given TTM using the fitted IV surface.""" + M_grid = np.linspace( + -self.max_moneyness_ttm, self.max_moneyness_ttm, self.n, dtype=np.float32 + ) + ttm_arr = np.full_like(M_grid, ttm) + # Broadcast day-level X across all grid points + extra_grid = ( + np.repeat(self.extra, self.n, axis=0) if self.extra is not None else None + ) + F = self.weights.forward(M_grid, ttm_arr, extra_grid) + + iv_grid = np.clip(F @ self.betas, 1e-6, None) + + # Convert from time-scaled moneyness M to log-moneyness m = log(K/F) + moneyness = M_grid * np.sqrt(ttm) + call = np.asarray(black_call(moneyness, iv_grid, ttm=ttm)) + + return MaturityPricer( + ttm=ttm, + std=float(np.mean(iv_grid) * np.sqrt(ttm)), + moneyness=moneyness, + call=call, + name="DIVFM", + ) diff --git a/quantflow/options/divfm/trainer.py b/quantflow/options/divfm/trainer.py new file mode 100644 index 0000000..5524916 --- /dev/null +++ b/quantflow/options/divfm/trainer.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import random +from dataclasses import dataclass +from typing import Sequence + +import numpy as np +import torch +from typing_extensions import Annotated, Doc + +from .network import DIVFMNetwork +from .weights import DIVFMWeights + + +@dataclass +class DayData: + """Option data for a single trading day. + + Used as the unit of input for + [DIVFMTrainer][quantflow.options.divfm.trainer.DIVFMTrainer]. + Each instance holds all options observed on one day. + """ + + moneyness_ttm: np.ndarray + """Shape (N,). Time-scaled moneyness M = log(K/F) / sqrt(tau).""" + ttm: np.ndarray + """Shape (N,). Time-to-maturity tau in years.""" + implied_vols: np.ndarray + """Shape (N,). Observed implied volatilities.""" + extra: np.ndarray | None = None + """Shape (N, extra_features) or None. Additional observable features X.""" + + +def _day_loss( + network: DIVFMNetwork, + day: DayData, + ridge: float, +) -> torch.Tensor: + """OLS residual loss for a single day (differentiable w.r.t. network weights). + + Computes the closed-form OLS estimate of beta_t via the normal equations + with a small ridge penalty for numerical stability, then returns the + squared residual norm ||IV - F @ beta||^2. + """ + M = torch.tensor(day.moneyness_ttm, dtype=torch.float32) + T = torch.tensor(day.ttm, dtype=torch.float32) + IV = torch.tensor(day.implied_vols, dtype=torch.float32) + extra = ( + torch.tensor(day.extra, dtype=torch.float32) if day.extra is not None else None + ) + + F = network(M, T, extra) # (N, p) + + # Normal equations with ridge: beta = (F^T F + ridge * I)^{-1} F^T IV + # torch.linalg.lstsq does not support autograd, so we use solve() instead. + p = F.shape[1] + FtF = F.T @ F + ridge * torch.eye(p, dtype=torch.float32) + beta = torch.linalg.solve(FtF, F.T @ IV) # (p,) + + residual = IV - F @ beta # (N,) + return (residual**2).sum() + + +class DIVFMTrainer: + """Training loop for [DIVFMNetwork][quantflow.options.divfm.network.DIVFMNetwork]. + + Implements the mini-batch procedure from Gauthier, Godin & Legros (2025): + at each gradient step a random subset of days is sampled from the training + set, OLS factor loadings are computed in closed form for each day, and the + network weights theta are updated to minimise the total IV residual. + + The OLS step is fully differentiable via the normal equations, so gradients + flow through beta_t back into the network parameters theta. + """ + + def __init__( + self, + network: Annotated[DIVFMNetwork, Doc("The network to train")], + lr: Annotated[float, Doc("Adam learning rate")] = 1e-3, + batch_days: Annotated[ + int, + Doc("Number of days sampled per gradient step (J=64 in the paper)"), + ] = 64, + weight_decay: Annotated[float, Doc("L2 regularisation for Adam")] = 0.0, + ridge: Annotated[ + float, + Doc( + "Ridge penalty added to F^T F before solving the normal equations," + " for numerical stability" + ), + ] = 1e-6, + ) -> None: + self.network = network + self.batch_days = batch_days + self.ridge = ridge + self.optimizer = torch.optim.Adam( + network.parameters(), lr=lr, weight_decay=weight_decay + ) + + def step( + self, + days: Annotated[ + Sequence[DayData], + Doc("Pool of training days to sample from"), + ], + ) -> float: + """Perform a single gradient update step. + + Samples ``batch_days`` distinct days, computes the OLS loss for each, + and updates the network weights. + + Returns the total loss for this step. + """ + self.network.train() + batch = random.sample(list(days), min(self.batch_days, len(days))) + + self.optimizer.zero_grad() + loss: torch.Tensor = sum( # type: ignore[assignment] + _day_loss(self.network, day, self.ridge) for day in batch + ) + loss.backward() # type: ignore[no-untyped-call] + self.optimizer.step() + return loss.detach().item() + + def evaluate( + self, + days: Annotated[Sequence[DayData], Doc("Days to evaluate on")], + ) -> float: + """Compute the average per-day loss without updating weights.""" + if not days: + return 0.0 + self.network.eval() + total = 0.0 + with torch.no_grad(): + for day in days: + total += float(_day_loss(self.network, day, self.ridge)) + return total / len(days) + + def fit( + self, + days: Annotated[Sequence[DayData], Doc("Training days")], + num_steps: Annotated[ + int, + Doc("Number of gradient update steps"), + ] = 1000, + val_days: Annotated[ + Sequence[DayData] | None, + Doc("Optional validation days for loss monitoring"), + ] = None, + log_every: Annotated[ + int, + Doc("Print a progress line every this many steps (0 to disable)"), + ] = 100, + ) -> list[float]: + """Train the network for ``num_steps`` gradient steps. + + At each step, ``batch_days`` distinct days are sampled from ``days``, + following the mini-batch procedure described in the paper. + + Returns the list of per-step training losses. + """ + losses: list[float] = [] + for step_idx in range(1, num_steps + 1): + loss = self.step(days) + losses.append(loss) + + if log_every and step_idx % log_every == 0: + msg = f"step {step_idx}/{num_steps} loss={loss:.6f}" + if val_days is not None: + val_loss = self.evaluate(val_days) + msg += f" val_loss={val_loss:.6f}" + print(msg) + + return losses + + def to_weights(self) -> DIVFMWeights: + """Extract the trained network into a + [DIVFMWeights][quantflow.options.divfm.weights.DIVFMWeights] instance + ready for torch-free inference.""" + self.network.eval() + return self.network.to_weights() diff --git a/quantflow/options/divfm/weights.py b/quantflow/options/divfm/weights.py new file mode 100644 index 0000000..973fb3c --- /dev/null +++ b/quantflow/options/divfm/weights.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +import numpy as np +from pydantic import BaseModel, Field + +from quantflow.utils.types import FloatArray + + +def _sigmoid(x: FloatArray) -> FloatArray: + return 1.0 / (1.0 + np.exp(-x)) + + +def _apply_subnet(x: FloatArray, layers: list[LayerWeights]) -> FloatArray: + """Numpy forward pass through a subnet (eval mode: uses running BN statistics).""" + for layer in layers: + x = x @ layer.weight.T + layer.bias + if layer.apply_activation: + x = _sigmoid(x) + # Batch norm eval mode: normalize with running statistics + x = (x - layer.bn_mean) / np.sqrt(layer.bn_var + layer.bn_eps) + if layer.bn_gamma is not None and layer.bn_beta is not None: + x = layer.bn_gamma * x + layer.bn_beta + return x + + +class LayerWeights(BaseModel, arbitrary_types_allowed=True): + """Weights for a single linear layer with batch normalization. + + Combines the linear transform, optional sigmoid activation, and + batch normalization into one unit matching the structure of each + block in [DIVFMNetwork][quantflow.options.divfm.network.DIVFMNetwork]. + """ + + weight: FloatArray = Field(description="Linear weight matrix, shape (out, in)") + bias: FloatArray = Field(description="Linear bias vector, shape (out,)") + bn_mean: FloatArray = Field(description="Batch norm running mean, shape (out,)") + bn_var: FloatArray = Field(description="Batch norm running variance, shape (out,)") + bn_gamma: FloatArray | None = Field( + default=None, + description=( + "Batch norm learnable scale (gamma), shape (out,)." + " None for fixed (affine=False) output normalization" + ), + ) + bn_beta: FloatArray | None = Field( + default=None, + description=( + "Batch norm learnable shift (beta), shape (out,)." + " None for fixed (affine=False) output normalization" + ), + ) + bn_eps: float = Field( + default=1e-5, description="Batch norm epsilon for numerical stability" + ) + apply_activation: bool = Field( + default=True, + description=( + "Whether to apply sigmoid activation before batch norm." + " True for hidden layers, False for the output layer" + ), + ) + + +class SubnetWeights(BaseModel): + """Extracted weights for one sub-network (hidden layers + output layer).""" + + layers: list[LayerWeights] = Field( + description="Ordered list of layer weights from input to output" + ) + + def forward(self, x: FloatArray) -> FloatArray: + """Run the numpy forward pass for this subnet.""" + return _apply_subnet(x, self.layers) + + +class DIVFMWeights(BaseModel): + """Extracted weights of a trained + [DIVFMNetwork][quantflow.options.divfm.network.DIVFMNetwork]. + + Implements the full network forward pass in pure numpy so that + [DIVFMPricer][quantflow.options.divfm.pricer.DIVFMPricer] has no + torch dependency at inference time. + + Obtain an instance from a trained network via + [DIVFMNetwork.to_weights][quantflow.options.divfm.network.DIVFMNetwork.to_weights]. + """ + + subnet_ttm: SubnetWeights = Field( + description="Weights for the time-to-maturity sub-network (f_2)" + ) + subnet_moneyness: SubnetWeights = Field( + description="Weights for the moneyness sub-network (f_3)" + ) + subnet_joint: SubnetWeights | None = Field( + default=None, + description=( + "Weights for the joint (M, tau) sub-network (f_4 ... f_p)." + " None when num_factors == 3" + ), + ) + num_factors: int = Field( + description="Total number of factors p (including the constant f_1)" + ) + extra_features: int = Field( + default=0, + description="Number of additional observable features X beyond (M, tau)", + ) + + def forward( + self, + moneyness_ttm: FloatArray, + ttm: FloatArray, + extra: FloatArray | None = None, + ) -> FloatArray: + """Compute factor values for a batch of options. + + Parameters + ---------- + moneyness_ttm: + Shape (N,). Time-scaled moneyness M = log(K/F) / sqrt(tau). + ttm: + Shape (N,). Time-to-maturity tau in years. + extra: + Shape (N, extra_features) or None. Additional observable features. + + Returns + ------- + FloatArray + Shape (N, num_factors). Factor values [f_1, f_2, ..., f_p]. + """ + N = len(moneyness_ttm) + + # f_1 = 1 + f1 = np.ones((N, 1), dtype=np.float32) + + # f_2(tau [, X]) + ttm_in: FloatArray = ttm[:, None] + if extra is not None: + ttm_in = np.concatenate([ttm_in, extra], axis=1) + f2 = self.subnet_ttm.forward(ttm_in) + + # f_3(M) + f3 = self.subnet_moneyness.forward(moneyness_ttm[:, None]) + + parts: list[FloatArray] = [f1, f2, f3] + + # f_4 ... f_p (M, tau [, X]) + if self.subnet_joint is not None: + joint_in: FloatArray = np.stack([moneyness_ttm, ttm], axis=1) + if extra is not None: + joint_in = np.concatenate([joint_in, extra], axis=1) + parts.append(self.subnet_joint.forward(joint_in)) + + return np.concatenate(parts, axis=1) diff --git a/quantflow/options/pricer.py b/quantflow/options/pricer.py index dff6966..6029892 100644 --- a/quantflow/options/pricer.py +++ b/quantflow/options/pricer.py @@ -1,5 +1,6 @@ from __future__ import annotations +from abc import abstractmethod from decimal import Decimal from typing import Any, Generic, Self, TypeVar, cast @@ -226,10 +227,15 @@ def plot(self, series: str = "implied_vol", **kwargs: Any) -> Any: return plot.plot_vol_cross(self.df, series=series, **kwargs) -class OptionPricer(BaseModel, Generic[M], arbitrary_types_allowed=True): - """Pricer for options""" +class OptionPricerBase(BaseModel, arbitrary_types_allowed=True): + """Abstract base class for option pricers. + + Provides caching of [MaturityPricer][quantflow.options.pricer.MaturityPricer] + results and generic pricing/plotting methods. Subclasses implement + ``_compute_maturity`` to define how call prices are computed for a given + time to maturity. + """ - model: M = Field(description="Stochastic process model for the underlying") ttm: dict[int, MaturityPricer] = Field( default_factory=dict, repr=False, @@ -239,35 +245,27 @@ class OptionPricer(BaseModel, Generic[M], arbitrary_types_allowed=True): "for different time to maturity" ), ) - n: int = Field( - default=128, - description="Number of discretization points for the marginal distribution", - ) - max_moneyness_ttm: float = Field( - default=1.5, description="Max time-adjusted moneyness to calculate prices" - ) + + @abstractmethod + def _compute_maturity(self, ttm: float, **kwargs: Any) -> MaturityPricer: + """Compute a [MaturityPricer][quantflow.options.pricer.MaturityPricer] + for the given time to maturity. + + Called by [maturity][quantflow.options.pricer.OptionPricerBase.maturity] + on a cache miss. Subclasses must implement this method. + """ def reset(self) -> None: - """Clear the [ttm][quantflow.options.pricer.OptionPricer.ttm] cache""" + """Clear the [ttm][quantflow.options.pricer.OptionPricerBase.ttm] cache""" self.ttm.clear() def maturity(self, ttm: float, **kwargs: Any) -> MaturityPricer: """Get a [MaturityPricer][quantflow.options.pricer.MaturityPricer] - from cache or create a new one and return it""" + from cache or compute a new one and return it""" ttm_int = int(TTM_FACTOR * ttm) if ttm_int not in self.ttm: ttmr = ttm_int / TTM_FACTOR - marginal = self.model.marginal(ttmr) - transform = marginal.call_option( - self.n, max_moneyness=self.max_moneyness_ttm * np.sqrt(ttmr), **kwargs - ) - self.ttm[ttm_int] = MaturityPricer( - ttm=ttmr, - std=float(np.max(marginal.std())), - moneyness=transform.x, - call=transform.y, - name=type(self.model).__name__, - ) + self.ttm[ttm_int] = self._compute_maturity(ttmr, **kwargs) return self.ttm[ttm_int] def price( @@ -333,3 +331,33 @@ def plot3d( z=implied, **properties, ) + + +class OptionPricer(OptionPricerBase, Generic[M]): + """Pricer for options based on a stochastic process model. + + Computes call prices via the inverse Fourier transform of the + characteristic function of the underlying stochastic process. + """ + + model: M = Field(description="Stochastic process model for the underlying") + n: int = Field( + default=128, + description="Number of discretization points for the marginal distribution", + ) + max_moneyness_ttm: float = Field( + default=1.5, description="Max time-adjusted moneyness to calculate prices" + ) + + def _compute_maturity(self, ttm: float, **kwargs: Any) -> MaturityPricer: + marginal = self.model.marginal(ttm) + transform = marginal.call_option( + self.n, max_moneyness=self.max_moneyness_ttm * np.sqrt(ttm), **kwargs + ) + return MaturityPricer( + ttm=ttm, + std=float(np.max(marginal.std())), + moneyness=transform.x, + call=transform.y, + name=type(self.model).__name__, + ) diff --git a/quantflow_tests/test_divfm.py b/quantflow_tests/test_divfm.py new file mode 100644 index 0000000..2582c9e --- /dev/null +++ b/quantflow_tests/test_divfm.py @@ -0,0 +1,296 @@ +from __future__ import annotations + +import numpy as np +import pytest + +from quantflow.options.divfm import DIVFMPricer, DIVFMWeights +from quantflow.options.divfm.weights import LayerWeights, SubnetWeights +from quantflow.options.pricer import OptionPricerBase + +try: + import torch + + from quantflow.options.divfm.network import DIVFMNetwork + from quantflow.options.divfm.trainer import DayData, DIVFMTrainer + + has_torch = True +except ImportError: + has_torch = False + + +NUM_FACTORS = 5 +HIDDEN_SIZE = 16 +NUM_HIDDEN = 2 +N = 40 # number of synthetic options + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_layer( + in_size: int, + out_size: int, + apply_activation: bool = True, +) -> LayerWeights: + """LayerWeights with zero linear weights (output = 0 before BN).""" + return LayerWeights( + weight=np.zeros((out_size, in_size), dtype=np.float32), + bias=np.zeros(out_size, dtype=np.float32), + bn_mean=np.zeros(out_size, dtype=np.float32), + bn_var=np.ones(out_size, dtype=np.float32), + bn_gamma=np.ones(out_size, dtype=np.float32), + bn_beta=np.zeros(out_size, dtype=np.float32), + apply_activation=apply_activation, + ) + + +def _make_subnet(input_size: int, output_size: int) -> SubnetWeights: + layers = [] + in_size = input_size + for _ in range(NUM_HIDDEN): + layers.append(_make_layer(in_size, HIDDEN_SIZE, apply_activation=True)) + in_size = HIDDEN_SIZE + layers.append(_make_layer(in_size, output_size, apply_activation=False)) + return SubnetWeights(layers=layers) + + +@pytest.fixture +def weights() -> DIVFMWeights: + return DIVFMWeights( + subnet_ttm=_make_subnet(1, 1), + subnet_moneyness=_make_subnet(1, 1), + subnet_joint=_make_subnet(2, NUM_FACTORS - 3), + num_factors=NUM_FACTORS, + ) + + +@pytest.fixture +def pricer(weights: DIVFMWeights) -> DIVFMPricer: + return DIVFMPricer(weights=weights) + + +def _synthetic_options( + n: int = N, + iv: float = 0.3, +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """Synthetic option data with constant implied vol.""" + rng = np.random.default_rng(42) + moneyness_ttm = rng.uniform(-2.0, 2.0, n).astype(np.float32) + ttm = rng.uniform(0.1, 2.0, n).astype(np.float32) + implied_vols = np.full(n, iv, dtype=np.float64) + return moneyness_ttm, ttm, implied_vols + + +# --------------------------------------------------------------------------- +# DIVFMWeights tests (no torch required) +# --------------------------------------------------------------------------- + + +def test_weights_is_pricer_base(pricer: DIVFMPricer) -> None: + assert isinstance(pricer, OptionPricerBase) + + +def test_forward_shape(weights: DIVFMWeights) -> None: + moneyness_ttm, ttm, _ = _synthetic_options() + F = weights.forward(moneyness_ttm, ttm) + assert F.shape == (N, NUM_FACTORS) + + +def test_first_factor_is_constant_one(weights: DIVFMWeights) -> None: + moneyness_ttm, ttm, _ = _synthetic_options() + F = weights.forward(moneyness_ttm, ttm) + np.testing.assert_array_equal(F[:, 0], np.ones(N)) + + +def test_calibrate_constant_iv(pricer: DIVFMPricer) -> None: + target_iv = 0.3 + moneyness_ttm, ttm, implied_vols = _synthetic_options(iv=target_iv) + pricer.calibrate(moneyness_ttm, ttm, implied_vols) + + # With zero-weight subnets, all non-constant factors collapse to 0, + # so the model reduces to sigma = beta_1 * 1 = mean(IV) + assert pricer.betas[0] == pytest.approx(target_iv, rel=1e-4) + np.testing.assert_array_almost_equal(pricer.betas[1:], 0.0) + + +def test_maturity_after_calibrate(weights: DIVFMWeights) -> None: + # Use a tighter moneyness range so Black-Scholes IV inversion is reliable + pricer = DIVFMPricer(weights=weights, max_moneyness_ttm=1.5, n=50) + moneyness_ttm, ttm, implied_vols = _synthetic_options(iv=0.3) + pricer.calibrate(moneyness_ttm, ttm, implied_vols) + + mp = pricer.maturity(0.5) + assert mp.ttm == pytest.approx(0.5, rel=1e-3) + assert len(mp.moneyness) == pricer.n + assert len(mp.call) == pricer.n + # call prices must be non-negative + assert np.all(mp.call >= 0.0) + # implied vols should be close to the calibrated constant in the central region + central = np.abs(mp.moneyness) < 0.5 + assert np.allclose(mp.implied_vols[central], 0.3, atol=1e-3) + + +def test_maturity_cache(pricer: DIVFMPricer) -> None: + moneyness_ttm, ttm, implied_vols = _synthetic_options() + pricer.calibrate(moneyness_ttm, ttm, implied_vols) + + mp1 = pricer.maturity(0.25) + mp2 = pricer.maturity(0.25) + assert mp1 is mp2 # same object from cache + + +def test_calibrate_with_extra(weights: DIVFMWeights) -> None: + """Extra features are stored and propagated to the pricing grid.""" + extra_features = 1 + subnet_ttm = _make_subnet(1 + extra_features, 1) + subnet_joint = _make_subnet(2 + extra_features, NUM_FACTORS - 3) + w = DIVFMWeights( + subnet_ttm=subnet_ttm, + subnet_moneyness=_make_subnet(1, 1), + subnet_joint=subnet_joint, + num_factors=NUM_FACTORS, + extra_features=extra_features, + ) + pricer = DIVFMPricer(weights=w, max_moneyness_ttm=1.5, n=20) + moneyness_ttm, ttm, implied_vols = _synthetic_options() + extra = np.zeros((N, extra_features), dtype=np.float32) + + pricer.calibrate(moneyness_ttm, ttm, implied_vols, extra=extra) + + assert pricer.extra is not None + assert pricer.extra.shape == (1, extra_features) + + # Maturity must compute without error when extra is set + mp = pricer.maturity(0.5) + assert len(mp.call) == pricer.n + + +def test_reset_clears_cache(pricer: DIVFMPricer) -> None: + moneyness_ttm, ttm, implied_vols = _synthetic_options() + pricer.calibrate(moneyness_ttm, ttm, implied_vols) + + pricer.maturity(0.25) + assert len(pricer.ttm) == 1 + pricer.reset() + assert len(pricer.ttm) == 0 + + +# --------------------------------------------------------------------------- +# DIVFMNetwork tests (requires torch) +# --------------------------------------------------------------------------- + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_network_default_construction() -> None: + net = DIVFMNetwork() + assert net.num_factors == 5 + assert net.extra_features == 0 + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_network_forward_shape() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + net.eval() + M = torch.zeros(N) + T = torch.ones(N) * 0.5 + out = net(M, T) + assert out.shape == (N, NUM_FACTORS) + assert (out[:, 0] == 1.0).all() # f_1 = 1 + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_to_weights_forward_matches_network() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + net.eval() + + rng = np.random.default_rng(0) + M_np = rng.uniform(-2, 2, N).astype(np.float32) + T_np = rng.uniform(0.1, 2.0, N).astype(np.float32) + + with torch.no_grad(): + torch_out = net(torch.tensor(M_np), torch.tensor(T_np)).numpy() + + w = net.to_weights() + numpy_out = w.forward(M_np, T_np) + + np.testing.assert_allclose(torch_out, numpy_out, atol=1e-5) + + +# --------------------------------------------------------------------------- +# DIVFMTrainer tests (requires torch) +# --------------------------------------------------------------------------- + + +def _make_days(num_days: int = 20, iv: float = 0.3) -> list[DayData]: + """Synthetic training set with a constant IV surface.""" + rng = np.random.default_rng(7) + days = [] + for _ in range(num_days): + n = rng.integers(20, 60) + days.append( + DayData( + moneyness_ttm=rng.uniform(-2.0, 2.0, n).astype(np.float32), + ttm=rng.uniform(0.1, 2.0, n).astype(np.float32), + implied_vols=np.full(n, iv, dtype=np.float64), + ) + ) + return days + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_construction() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net, lr=1e-3, batch_days=8) + assert trainer.batch_days == 8 + assert trainer.network is net + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_step_returns_loss() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net, batch_days=4) + days = _make_days(num_days=10) + loss = trainer.step(days) + assert isinstance(loss, float) + assert loss >= 0.0 + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_evaluate() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net) + days = _make_days(num_days=5) + val_loss = trainer.evaluate(days) + assert isinstance(val_loss, float) + assert val_loss >= 0.0 + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_fit_loss_decreases() -> None: + """Loss should decrease over training steps on a simple constant-IV surface.""" + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net, lr=1e-2, batch_days=8) + days = _make_days(num_days=30) + losses = trainer.fit(days, num_steps=50, log_every=0) + assert len(losses) == 50 + # Average loss over the last 10 steps should be lower than the first 10 + assert np.mean(losses[-10:]) < np.mean(losses[:10]) + + +@pytest.mark.skipif(not has_torch, reason="torch not installed") +def test_trainer_to_weights_produces_pricer() -> None: + net = DIVFMNetwork(num_factors=NUM_FACTORS, hidden_size=HIDDEN_SIZE) + trainer = DIVFMTrainer(net, batch_days=4) + days = _make_days(num_days=10) + trainer.fit(days, num_steps=5, log_every=0) + + weights = trainer.to_weights() + assert isinstance(weights, DIVFMWeights) + + pricer = DIVFMPricer(weights=weights, max_moneyness_ttm=1.5, n=20) + day = days[0] + pricer.calibrate(day.moneyness_ttm, day.ttm, day.implied_vols) + mp = pricer.maturity(0.5) + assert len(mp.call) == pricer.n From 5ad1c7a55495701df295bbb5184dc429fdbfa2b2 Mon Sep 17 00:00:00 2001 From: Luca Date: Sun, 5 Apr 2026 12:06:24 +0100 Subject: [PATCH 2/2] Adheston vol --- app/gaussian_sampling.py | 2 +- app/heston_vol_surface.py | 119 +++++++++++++++++ notebooks/applications/hurst.md | 202 ----------------------------- notebooks/applications/overview.md | 23 ---- notebooks/applications/sampling.md | 35 ----- quantflow/sp/base.py | 10 +- quantflow/utils/distributions.py | 7 +- 7 files changed, 131 insertions(+), 267 deletions(-) create mode 100644 app/heston_vol_surface.py delete mode 100644 notebooks/applications/hurst.md delete mode 100644 notebooks/applications/overview.md delete mode 100644 notebooks/applications/sampling.md diff --git a/app/gaussian_sampling.py b/app/gaussian_sampling.py index 8cbf68f..465ceea 100644 --- a/app/gaussian_sampling.py +++ b/app/gaussian_sampling.py @@ -1,4 +1,4 @@ -import marimo + import marimo __generated_with = "0.19.7" app = marimo.App(width="medium") diff --git a/app/heston_vol_surface.py b/app/heston_vol_surface.py new file mode 100644 index 0000000..cfcacc1 --- /dev/null +++ b/app/heston_vol_surface.py @@ -0,0 +1,119 @@ +import marimo + +__generated_with = "0.22.0" +app = marimo.App(width="medium") + + +@app.cell +def _(): + import marimo as mo + from app.utils import nav_menu + nav_menu() + return (mo,) + + +@app.cell(hide_code=True) +def _(mo): + mo.md(r""" + ## Jump Diffusion + """) + return + + +@app.cell +def _(mo): + from quantflow.sp.jump_diffusion import JumpDiffusion + from quantflow.utils.distributions import DoubleExponential + + jump_fraction = mo.ui.slider(start=0.1, stop=0.9, step=0.05, value=0.5, debounce=True, label="Jump Fraction") + jump_intensity = mo.ui.slider(start=10, stop=100, step=5, debounce=True, label="Jump Intensity") + jump_asymmetry = mo.ui.slider(start=-2, stop=2, step=0.1, value=0, debounce=True, label="Jump Asymmetry") + jump_controls = mo.vstack([ + jump_fraction, jump_intensity, jump_asymmetry + ]) + jump_controls + return ( + DoubleExponential, + JumpDiffusion, + jump_asymmetry, + jump_fraction, + jump_intensity, + ) + + +@app.cell +def _( + DoubleExponential, + JumpDiffusion, + OptionPricer, + jump_asymmetry, + jump_fraction, + jump_intensity, +): + jd = JumpDiffusion.create( + DoubleExponential, + jump_fraction=jump_fraction.value, + jump_intensity=jump_intensity.value, + jump_asymmetry=jump_asymmetry.value, + ) + OptionPricer(model=jd).maturity(0.01).plot() + return (jd,) + + +@app.cell +def _(jd): + jd.model_dump() + return + + +@app.cell +def _(): + return + + +@app.cell(hide_code=True) +def _(mo): + mo.md(r""" + ## Heston Volatility Surface + """) + return + + +@app.cell +def _(mo): + from quantflow.options.pricer import OptionPricer + from quantflow.sp.heston import HestonJ + + sigma = mo.ui.slider(start=0.1, stop=2, step=0.1, debounce=True, label="vol of vol") + sigma + return HestonJ, OptionPricer, sigma + + +@app.cell +def _(DoubleExponential, HestonJ, jump_asymmetry, jump_fraction, sigma): + hj = HestonJ.create( + DoubleExponential, + vol=0.5, + sigma=sigma.value, + kappa=1.0, + rho=0.0, + jump_fraction=jump_fraction.value, + jump_asymmetry=jump_asymmetry.value, + ) + return (hj,) + + +@app.cell +def _(OptionPricer, hj): + hjp = OptionPricer(model=hj) + hjp.maturity(0.5).plot() + return + + +@app.cell +def _(): + return + + +if __name__ == "__main__": + app.run() diff --git a/notebooks/applications/hurst.md b/notebooks/applications/hurst.md deleted file mode 100644 index 2d6b68e..0000000 --- a/notebooks/applications/hurst.md +++ /dev/null @@ -1,202 +0,0 @@ ---- -jupytext: - text_representation: - extension: .md - format_name: myst - format_version: 0.13 - jupytext_version: 1.16.6 -kernelspec: - display_name: .venv - language: python - name: python3 ---- - -# Hurst Exponent - -The [Hurst exponent](https://en.wikipedia.org/wiki/Hurst_exponent) is used as a measure of long-term memory of time series. It relates to the auto-correlations of the time series, and the rate at which these decrease as the lag between pairs of values increases. -It is a statistics which can be used to test if a time-series is mean reverting or it is trending. - -The idea idea behind the Hurst exponent is that if the time-series $x_t$ follows a Brownian motion (aka Weiner process), than variance between two time points will increase linearly with the time difference. that is to say - -\begin{align} - \text{Var}(x_{t_2} - x_{t_1}) &\propto t_2 - t_1 \\ - &\propto \Delta t^{2H}\\ - H &= 0.5 -\end{align} - -where $H$ is the Hurst exponent. - -Trending time-series have a Hurst exponent H > 0.5, while mean reverting time-series have H < 0.5. Understanding in which regime a time-series is can be useful for trading strategies. - -These are some references to understand the Hurst exponent and its applications: - -* [Hurst Exponent for Algorithmic Trading](https://robotwealth.com/demystifying-the-hurst-exponent-part-1/) -* [Basics of Statistical Mean Reversion Testing](https://www.quantstart.com/articles/Basics-of-Statistical-Mean-Reversion-Testing/) - -## Study with the Weiner Process - -We want to construct a mechanism to estimate the Hurst exponent via OHLC data because it is widely available from data providers and easily constructed as an online signal during trading. - -In order to evaluate results against known solutions, we consider the Weiner process as generator of timeseries. - -The Weiner process is a continuous-time stochastic process named in honor of Norbert Wiener. It is often also called Brownian motion due to its historical connection with the physical model of Brownian motion of particles in water, named after the botanist Robert Brown. - -We use the **WeinerProcess** from the stochastic process library and sample one path over a time horizon of 1 (day) with a time step every second. - -```{code-cell} ipython3 -from quantflow.sp.weiner import WeinerProcess -p = WeinerProcess(sigma=2.0) -paths = p.sample(n=1, time_horizon=1, time_steps=24*60*60) -paths.plot(title="A path of Weiner process with sigma=2.0") -``` - -In order to down-sample the timeseries, we need to convert it into a dataframe with dates as indices. - -```{code-cell} ipython3 -from quantflow.utils.dates import start_of_day -df = paths.as_datetime_df(start=start_of_day(), unit="d").reset_index() -df -``` - -### Realized Variance - -At this point we estimate the standard deviation using the **realized variance** along the path (we use the **scaled** flag so that the standard deviation is scaled by the square-root of time step, in this way it removes the dependency on the time step size). -The value should be close to the **sigma** of the WeinerProcess defined above. - -```{code-cell} ipython3 -float(paths.paths_std(scaled=True)[0]) -``` - -The evaluation of the hurst exponent is done by calculating the variance for several time windows and by fitting a line to the log-log plot of the variance vs the time window. - -```{code-cell} ipython3 -paths.hurst_exponent() -``` - -As expected, the Hurst exponent should be close to 0.5, since we have calculated the exponent from the paths of a Weiner process. - -+++ - -### Range-based Variance Estimators - -We now turn our attention to range-based variance estimators. These estimators depends on OHLC timeseries, which are widely available from data providers such as [FMP](https://site.financialmodelingprep.com/). -To analyze range-based variance estimators, we use he **quantflow.ta.OHLC** tool which allows to down-sample a timeserie to OHLC series and estimate variance with three different estimators - -* **Parkinson** (1980) -* **Garman & Klass** (1980) -* **Rogers & Satchell** (1991) - -See {cite:p}`molnar` for a detailed overview of the properties of range-based estimators. - -For this we build an OHLC estimator as template and use it to create OHLC estimators for different periods. - -```{code-cell} ipython3 -import pandas as pd -import polars as pl -import math -from quantflow.ta.ohlc import OHLC -template = OHLC(serie="0", period="10m", rogers_satchell_variance=True, parkinson_variance=True, garman_klass_variance=True) -seconds_in_day = 24*60*60 - -def rstd(pdf: pl.Series, range_seconds: float) -> float: - """Calculate the standard deviation from a range-based variance estimator""" - variance = pdf.mean() - # scale the variance by the number of seconds in the period - variance = seconds_in_day * variance / range_seconds - return math.sqrt(variance) - -results = [] -for period in ("10s", "20s", "30s", "1m", "2m", "3m", "5m", "10m", "30m"): - ohlc = template.model_copy(update=dict(period=period)) - rf = ohlc(df) - ts = pd.to_timedelta(period).to_pytimedelta().total_seconds() - data = dict(period=period) - for name in ("pk", "gk", "rs"): - estimator = rf[f"0_{name}"] - data[name] = rstd(estimator, ts) - results.append(data) -vdf = pd.DataFrame(results).set_index("period") -vdf -``` - -These numbers are different from the realized variance because they are based on the range of the prices, not on the actual prices. The realized variance is a more direct measure of the volatility of the process, while the range-based estimators are more robust to market microstructure noise. - -The Parkinson estimator is always higher than both the Garman-Klass and Rogers-Satchell estimators, the reason is due to the use of the high and low prices only, which are always further apart than the open and close prices. The GK and RS estimators are similar and are more accurate than the Parkinson estimator, especially for greater periods. - -```{code-cell} ipython3 -pd.options.plotting.backend = "plotly" -fig = vdf.plot(markers=True, title="Weiner Standard Deviation from Range-based estimators - correct value is 2.0") -fig.show() -``` - -To estimate the Hurst exponent with the range-based estimators, we calculate the variance of the log of the range for different time windows and fit a line to the log-log plot of the variance vs the time window. - -```{code-cell} ipython3 -from typing import Sequence -import numpy as np -from quantflow.ta.ohlc import OHLC -from collections import defaultdict -from quantflow.ta.base import DataFrame - -default_periods = ("10s", "20s", "30s", "1m", "2m", "3m", "5m", "10m", "30m") - -def ohlc_hurst_exponent( - df: DataFrame, - series: Sequence[str], - periods: Sequence[str] = default_periods, -) -> DataFrame: - results = {} - estimator_names = ("pk", "gk", "rs") - for serie in series: - template = OHLC( - serie=serie, - period="10m", - rogers_satchell_variance=True, - parkinson_variance=True, - garman_klass_variance=True - ) - time_range = [] - estimators = defaultdict(list) - for period in periods: - ohlc = template.model_copy(update=dict(period=period)) - rf = ohlc(df) - ts = pd.to_timedelta(period).to_pytimedelta().total_seconds() - time_range.append(ts) - for name in estimator_names: - estimators[name].append(rf[f"{serie}_{name}"].mean()) - results[serie] = [float(np.polyfit(np.log(time_range), np.log(estimators[name]), 1)[0])/2.0 for name in estimator_names] - return pd.DataFrame(results, index=estimator_names) -``` - -```{code-cell} ipython3 -ohlc_hurst_exponent(df, series=["0"]) -``` - -The Hurst exponent should be close to 0.5, since we have calculated the exponent from the paths of a Weiner process. But the Hurst exponent is not exactly 0.5 because the range-based estimators are not the same as the realized variance. Interestingly, the Parkinson estimator gives a Hurst exponent closer to 0.5 than the Garman-Klass and Rogers-Satchell estimators. - -## Mean Reverting Time Series - -We now turn our attention to mean reverting time series, where the Hurst exponent is less than 0.5. - -```{code-cell} ipython3 -from quantflow.sp.ou import Vasicek -import pandas as pd -pd.options.plotting.backend = "plotly" - -p = Vasicek(kappa=2) -paths = {f"kappa={k}": Vasicek(kappa=k).sample(n=1, time_horizon=1, time_steps=24*60*6) for k in (1.0, 10.0, 50.0, 100.0, 500.0)} -pdf = pd.DataFrame({k: p.path(0) for k, p in paths.items()}, index=paths["kappa=1.0"].dates(start=start_of_day())) -pdf.plot() -``` - -We can now estimate the Hurst exponent from the realized variance. As we can see the Hurst exponent decreases as we increase the mean reversion parameter. - -```{code-cell} ipython3 -pd.DataFrame({k: [p.hurst_exponent()] for k, p in paths.items()}) -``` - -And we can also estimate the Hurst exponent from the range-based estimators. As we can see the Hurst exponent decreases as we increase the mean reversion parameter along the same lines as the realized variance. - -```{code-cell} ipython3 -ohlc_hurst_exponent(pdf.reset_index(), list(paths), periods=("10m", "20m", "30m", "1h")) -``` diff --git a/notebooks/applications/overview.md b/notebooks/applications/overview.md deleted file mode 100644 index 10f0567..0000000 --- a/notebooks/applications/overview.md +++ /dev/null @@ -1,23 +0,0 @@ ---- -jupytext: - text_representation: - extension: .md - format_name: myst - format_version: 0.13 - jupytext_version: 1.16.6 -kernelspec: - display_name: Python 3 (ipykernel) - language: python - name: python3 ---- - -# Applications - -Real-world applications of the library - -```{tableofcontents} -``` - -```{code-cell} - -``` diff --git a/notebooks/applications/sampling.md b/notebooks/applications/sampling.md deleted file mode 100644 index 2192c50..0000000 --- a/notebooks/applications/sampling.md +++ /dev/null @@ -1,35 +0,0 @@ ---- -jupytext: - text_representation: - extension: .md - format_name: myst - format_version: 0.13 - jupytext_version: 1.16.6 -kernelspec: - display_name: Python 3 (ipykernel) - language: python - name: python3 ---- - -# Sampling Tools - -The library use the `Paths` class for managing monte carlo paths. - -```{code-cell} -from quantflow.utils.paths import Paths - -nv = Paths.normal_draws(paths=1000, time_horizon=1, time_steps=1000) -``` - -```{code-cell} -nv.var().mean() -``` - -```{code-cell} -nv = Paths.normal_draws(paths=1000, time_horizon=1, time_steps=1000, antithetic_variates=False) -nv.var().mean() -``` - -```{code-cell} - -``` diff --git a/quantflow/sp/base.py b/quantflow/sp/base.py index 148a538..6ad59fb 100755 --- a/quantflow/sp/base.py +++ b/quantflow/sp/base.py @@ -47,14 +47,17 @@ def characteristic( t: Annotated[FloatArrayLike, Doc("Time horizon")], u: Annotated[Vector, Doc("Characteristic function input parameter")], ) -> Vector: - r"""Characteristic function at time `t` for a given input parameter + r"""Characteristic function at time `t` for a given input parameter `u` The characteristic function represents the Fourier transform of the probability density function \begin{equation} - \phi = {\mathbb E} \left[e^{i u x_t}\right] + \phi = {\mathbb E} \left[e^{i u x_t}\right] = e^{-\psi(t, u)} \end{equation} + + where $\psi$ is the characteristic exponent, which can be more easily + computed for many processes. """ return np.exp(-self.characteristic_exponent(t, u)) @@ -187,8 +190,9 @@ def integrated_log_laplace( ) -> Vector: r"""The log-Laplace transform of the cumulative process: - .. math:: + \begin{equation} e^{\phi_{t, u}} = {\mathbb E} \left[e^{i u \int_0^t x_s ds}\right] + \end{equation} """ def domain_range(self) -> Bounds: diff --git a/quantflow/utils/distributions.py b/quantflow/utils/distributions.py index 23ca705..76b7859 100644 --- a/quantflow/utils/distributions.py +++ b/quantflow/utils/distributions.py @@ -209,9 +209,10 @@ def from_moments( def characteristic(self, u: Vector) -> Vector: r"""Characteristic function of the double exponential distribution - .. math:: - \phi(u) = \frac{e^{i u m}}{\left(1 + \frac{i u \kappa}{\lambda}\right) - \left(1 - \frac{i u}{\lambda \kappa}\right)} + \begin{equation} + \phi(u) = \frac{e^{i u m}}{\left(1 + \frac{i u \kappa}{\lambda}\right) + \left(1 - \frac{i u}{\lambda \kappa}\right)} + \end{equation} """ den = (1.0 + 1j * u * self.kappa / self.decay) * ( 1.0 - 1j * u / (self.kappa * self.decay)