"""
Fractional Stochastic Differential Equation Solvers
This module provides comprehensive solvers for fractional SDEs including
various numerical methods, adaptive step size control, and error estimation.
Performance Note:
- Uses FFT-based convolution for O(N log N) history summation instead of O(N²)
- Intelligent backend selection for optimal performance
- Multi-backend support (PyTorch, JAX, NumPy/Numba)
"""
import numpy as np
import time
from typing import Union, Optional, Tuple, Callable, List, Dict, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
from scipy import fft
from ..core.definitions import FractionalOrder, DefinitionType
from .noise_models import NoiseModel
# Use adapter system for gamma function
[docs]
def _get_gamma_function():
"""Get gamma function through adapter system."""
try:
from ..special.gamma_beta import gamma_function as gamma
return gamma
except Exception:
from scipy.special import gamma
return gamma
gamma = _get_gamma_function()
# Initialize intelligent backend selector
_intelligent_selector = None
_use_intelligent_backend = True
[docs]
def _get_intelligent_selector():
"""Get intelligent backend selector instance."""
global _intelligent_selector, _use_intelligent_backend
if not _use_intelligent_backend:
return None
if _intelligent_selector is None:
try:
from ..ml.intelligent_backend_selector import IntelligentBackendSelector
_intelligent_selector = IntelligentBackendSelector(enable_learning=True)
except ImportError:
_use_intelligent_backend = False
_intelligent_selector = None
return _intelligent_selector
[docs]
def _fft_convolution(coeffs: np.ndarray, values: np.ndarray, axis: int = 0) -> np.ndarray:
"""
Fast convolution using FFT for O(N log N) performance.
Args:
coeffs: Coefficient array (1D)
values: Value array (can be 1D or 2D)
axis: Axis along which to perform convolution
Returns:
Convolution result (same shape as values)
"""
N = coeffs.shape[0]
# Pad to next power of 2 for FFT efficiency
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
if values.ndim == 1:
coeffs_padded = np.zeros(size)
coeffs_padded[:N] = coeffs
values_padded = np.zeros(size)
values_padded[:N] = values
coeffs_fft = fft.fft(coeffs_padded)
values_fft = fft.fft(values_padded)
conv_result = fft.ifft(coeffs_fft * values_fft).real[:N]
return conv_result
else:
num_cols = values.shape[1]
result = np.zeros_like(values)
for col in range(num_cols):
coeffs_padded = np.zeros(size)
coeffs_padded[:N] = coeffs
values_padded = np.zeros(size)
values_padded[:N] = values[:, col]
coeffs_fft = fft.fft(coeffs_padded)
values_fft = fft.fft(values_padded)
conv_result = fft.ifft(coeffs_fft * values_fft).real[:N]
result[:, col] = conv_result
return result
[docs]
@dataclass
class SDESolution:
"""Solution object for SDE solvers."""
t: np.ndarray
y: np.ndarray
fractional_order: Union[float, FractionalOrder]
method: str
drift_func: Callable
diffusion_func: Callable
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
[docs]
class FractionalSDESolver(ABC):
"""
Base class for fractional SDE solvers.
A fractional SDE takes the form:
D^α X(t) = f(t, X(t)) dt + g(t, X(t)) dW(t)
where:
- α is the fractional order
- f is the drift function
- g is the diffusion function
- W(t) is a Wiener process
"""
[docs]
def __init__(
self,
fractional_order: Union[float, FractionalOrder],
definition: str = "caputo",
adaptive: bool = False,
rtol: float = 1e-5,
atol: float = 1e-8,
):
"""
Initialize fractional SDE solver.
Args:
fractional_order: Fractional order (0 < α < 2)
definition: Type of fractional derivative ("caputo" or "riemann_liouville")
adaptive: Use adaptive step size
rtol: Relative tolerance for adaptive stepping
atol: Absolute tolerance for adaptive stepping
"""
if isinstance(fractional_order, float):
self.fractional_order = FractionalOrder(fractional_order)
else:
self.fractional_order = fractional_order
self.definition = DefinitionType(definition.lower())
self.adaptive = adaptive
self.rtol = rtol
self.atol = atol
# Validate fractional order
if self.fractional_order.alpha <= 0 or self.fractional_order.alpha >= 2:
raise ValueError("Fractional order must be in (0, 2)")
[docs]
@abstractmethod
def solve(
self,
drift: Callable[[float, np.ndarray], np.ndarray],
diffusion: Callable[[float, np.ndarray], np.ndarray],
x0: np.ndarray,
t_span: Tuple[float, float],
noise_model: Optional[NoiseModel] = None,
**kwargs
) -> SDESolution:
"""
Solve fractional SDE.
Args:
drift: Drift function f(t, x) -> R^d
diffusion: Diffusion function g(t, x) -> R^(d x m) where m is dimension of noise
x0: Initial condition
t_span: Time interval (t0, tf)
**kwargs: Additional solver parameters
Returns:
SDESolution object containing trajectory
"""
pass
[docs]
class FastHistoryConvolution:
"""
Helper class for efficient history convolution in fractional SDE solvers.
Features:
- Pre-allocated arrays to avoid list overhead (O(N) -> O(1) allocation)
- Dynamic switching between direct dot product (small N) and FFT (large N)
- JAX/SciPy/NumPy backend support via _fft_convolution
"""
[docs]
def __init__(self, alpha: float, num_steps: int, dim: int):
self.alpha = alpha
self.num_steps = num_steps
self.dim = dim
# Pre-allocate history buffer
self.history = np.zeros((num_steps + 1, dim))
self.current_step = 0
# Precompute weights for fractional integral
# b_k = (k+1)^alpha - k^alpha
k_vals = np.arange(num_steps + 1)
self.weights = (k_vals + 1)**alpha - k_vals**alpha
# Threshold for switching to FFT
# FFT overhead typically pays off around N=64-128
self.fft_threshold = 64
[docs]
def update(self, value: np.ndarray):
"""Add new value to history."""
if self.current_step <= self.num_steps:
self.history[self.current_step] = value
self.current_step += 1
[docs]
def convolve(self) -> np.ndarray:
"""Compute convolution of weights with history."""
if self.current_step == 0:
return np.zeros(self.dim)
# Get views of valid data
hist_view = self.history[:self.current_step]
weights_view = self.weights[:self.current_step]
if self.current_step < self.fft_threshold:
# Small N: Use direct dot product
# Corresponds to sum_{j=0}^{n-1} w_{n-1-j} * history[j]
# w_flipped[0] is w_{n-1}, matches history[0]
w_flipped = weights_view[::-1]
return np.dot(w_flipped, hist_view)
else:
# Large N: Use FFT
# We want the LAST element of the convolution
# conv[n-1]
# Use _fft_convolution which handles backend selection
if self.dim == 1 or hist_view.ndim == 1:
# 1D case
# Reverse weights for convolution definition: conv(f, g)
# _fft_convolution computes full convolution
# We need to be careful with layout.
# Direct sum: sum(w[k] * h[n-1-k])
# This is (w * h)[n-1] if w is NOT reversed?
# Convolution: (f * g)[n] = sum f[k] g[n-k]
# Our sum is: sum_{j=0}^{n-1} w_{n-1-j} * h_j
# Let k = j. f=h, g=w.
# sum h[k] * w[n-1-k]
# This is EXACTLY discrete convolution (h * w) at index n-1.
# So we pass h and w to _fft_convolution.
# Note: _fft_convolution does ifft(fft(A)*fft(B)).
conv_res = _fft_convolution(weights_view, hist_view)
# We want the value at index n-1 (which is the last valid index of valid history)
# _fft_convolution returns array of same shape as values (hist_view)
# It computes circular convolution padded?
# _fft_convolution implementation:
# real(ifft(fft(coeffs, size) * fft(values, size)))[:N]
# This is standard linear convolution truncated to N.
# So the last element is indeed what we want.
if hist_view.ndim == 1:
return conv_res[-1]
else:
return conv_res[-1, :]
else:
# 2D case (history dim > 1)
# Convolve along axis 0
conv_res = _fft_convolution(weights_view, hist_view, axis=0)
return conv_res[-1, :]
[docs]
def _milstein_fd_eps(x: np.ndarray, base: float = 1e-6) -> float:
"""Scale-aware finite-difference step for ∂g/∂x."""
scale = float(np.max(np.abs(x))) if x.size else 0.0
return base * (1.0 + scale)
[docs]
def _milstein_correction_diagonal(
diffusion: Callable[[float, np.ndarray], np.ndarray],
t: float,
x: np.ndarray,
g_vec: np.ndarray,
dW: np.ndarray,
dt: float,
) -> np.ndarray:
"""
Diagonal / independent-noise Milstein correction:
0.5 * sum_i g_i(t,x) * (∂g_i/∂x_i) * (dW_i^2 - dt)
with ∂g_i/∂x_i approximated by central differences on x_i.
"""
dim = x.shape[0]
eps = _milstein_fd_eps(x)
out = np.zeros(dim, dtype=float)
for i in range(dim):
e = np.zeros(dim, dtype=float)
e[i] = 1.0
gp = np.asarray(diffusion(t, x + eps * e), dtype=float).ravel()
gm = np.asarray(diffusion(t, x - eps * e), dtype=float).ravel()
if gp.shape[0] != dim or gm.shape[0] != dim:
raise ValueError(
f"diffusion(t,x) must return vector of shape ({dim},), got {gp.shape}"
)
dg_i_dxi = (gp[i] - gm[i]) / (2.0 * eps)
out[i] = 0.5 * g_vec[i] * dg_i_dxi * (dW[i] ** 2 - dt)
return out
[docs]
def _milstein_correction_single_wiener(
diffusion: Callable[[float, np.ndarray], np.ndarray],
t: float,
x: np.ndarray,
g_vec: np.ndarray,
dW: np.ndarray,
dt: float,
) -> np.ndarray:
"""
Milstein correction for a single driving Wiener process (m=1, dW scalar):
0.5 * (dW^2 - dt) * sum_j g_j * (∂g_k/∂x_j) for each component k.
Uses central differences in the state directions x_j.
"""
dim = x.shape[0]
dw = float(dW.reshape(-1)[0])
factor = 0.5 * (dw * dw - dt)
eps = _milstein_fd_eps(x)
out = np.zeros(dim, dtype=float)
for k in range(dim):
acc = 0.0
for j in range(dim):
ej = np.zeros(dim, dtype=float)
ej[j] = 1.0
gp = np.asarray(diffusion(t, x + eps * ej), dtype=float).ravel()
gm = np.asarray(diffusion(t, x - eps * ej), dtype=float).ravel()
if gp.shape[0] != dim or gm.shape[0] != dim:
raise ValueError(
f"diffusion(t,x) must return vector of shape ({dim},), got {gp.shape}"
)
dg_k_dxj = (gp[k] - gm[k]) / (2.0 * eps)
acc += float(g_vec[j]) * dg_k_dxj
out[k] = factor * acc
return out
[docs]
class FractionalEulerMaruyama(FractionalSDESolver):
"""
Fractional Euler-Maruyama method for solving fractional SDEs.
This is a first-order strong convergence method.
"""
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.method_name = "fractional_euler_maruyama"
[docs]
def solve(
self,
drift: Callable,
diffusion: Callable,
x0: np.ndarray,
t_span: Tuple[float, float],
num_steps: int = 100,
seed: Optional[int] = None,
noise_model: Optional[NoiseModel] = None,
**kwargs
) -> SDESolution:
"""
Solve fractional SDE using Euler-Maruyama method.
Args:
drift: Drift function f(t, x)
diffusion: Diffusion function g(t, x)
x0: Initial condition
t_span: Time interval (t0, tf)
num_steps: Number of time steps
seed: Random seed for Wiener process
noise_model: Optional noise model for non-Brownian noise
Returns:
SDESolution object
"""
t0, tf = t_span
dt = (tf - t0) / num_steps
alpha = self.fractional_order.alpha
# Time grid
t = np.linspace(t0, tf, num_steps + 1)
# Initialize solution
if x0.ndim == 0:
x0 = x0[np.newaxis]
dim = x0.shape[0]
y = np.zeros((num_steps + 1, dim))
y[0] = x0
rng = np.random.default_rng(seed)
# History convolution helpers
drift_conv = FastHistoryConvolution(alpha, num_steps, dim)
diffusion_conv = FastHistoryConvolution(alpha, num_steps, dim)
# Gamma factor
gamma_factor = 1.0 / gamma(alpha + 1)
# Time stepping
for i in range(num_steps):
t_curr = t[i]
x_curr = y[i]
# Compute drift and diffusion at current step
drift_val = drift(t_curr, x_curr)
diffusion_val = diffusion(t_curr, x_curr)
# Determine noise dimension and generate dW
# dW shape depends on diffusion dimension
if np.isscalar(diffusion_val):
noise_dim = dim
diffusion_val = np.full(dim, diffusion_val)
elif diffusion_val.ndim == 0:
noise_dim = dim
diffusion_val = np.full(dim, float(diffusion_val))
elif diffusion_val.ndim == 1:
noise_dim = dim
if len(diffusion_val) != dim:
raise ValueError(f"Diffusion vector shape {diffusion_val.shape} does not match state dim {dim}")
elif diffusion_val.ndim == 2:
d_out, m_in = diffusion_val.shape
if d_out != dim:
raise ValueError(f"Diffusion matrix rows {d_out} does not match state dim {dim}")
noise_dim = m_in
else:
raise ValueError(f"Invalid diffusion shape: {diffusion_val.shape}")
# Generate noise increment using model or fallback to Gaussian
if noise_model is not None:
# On first step, prepare noise model if needed
if i == 0:
noise_model.prepare(num_steps, dt, size=(noise_dim,))
dW = noise_model.generate_increment(t_curr, dt, size=(noise_dim,))
else:
dW = rng.normal(0, np.sqrt(dt), size=(noise_dim,))
# Compute noise term (diffusion * dW)
# diffusion_val shape: (dim,), (dim, m_in)
# dW shape: (noise_dim,)
if diffusion_val.ndim == 1:
# diffusion (dim,) * dW (dim,) -> elementwise or dot?
# Usually diagonal noise: diffusion[i] * dW[i]
noise_term = diffusion_val * dW
elif diffusion_val.ndim == 2:
# diffusion (dim, m) @ dW (m,)
noise_term = diffusion_val @ dW
else:
# Scalar/0-dim handled by conversion above to (dim,)
noise_term = diffusion_val * dW
# Update history
drift_conv.update(drift_val)
diffusion_conv.update(noise_term)
# Compute memory terms
drift_integral = drift_conv.convolve()
diffusion_integral = diffusion_conv.convolve()
# Update X
y[i + 1] = x0 + gamma_factor * dt**alpha * (drift_integral + diffusion_integral)
# Create solution object
solution = SDESolution(
t=t,
y=y,
fractional_order=self.fractional_order,
method=self.method_name,
drift_func=drift,
diffusion_func=diffusion,
metadata={
'num_steps': num_steps,
'dt': dt,
'seed': seed
}
)
return solution
[docs]
class FractionalMilstein(FractionalSDESolver):
"""
Fractional SDE integrator with the same history-convolution path as Euler–Maruyama,
plus a **Milstein correction** where supported.
The correction uses the standard form ``0.5 * (dW^2 - dt)`` multiplied by terms
involving spatial derivatives of the diffusion, approximated by central finite
differences.
**Supported**
- **Diagonal noise**: ``g(t,x)`` returns a vector of length ``d``; one Brownian
increment per component (same layout as Euler–Maruyama).
- **Single Wiener process**: ``g(t,x)`` returns shape ``(d, 1)``; scalar ``dW``.
**Not implemented**
- General matrix diffusion with ``m > 1`` independent Wiener processes
(Levy area / non-commutative terms not included). Use diagonal or scalar noise,
or extend the solver.
"""
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.method_name = "fractional_milstein"
[docs]
def solve(
self,
drift: Callable,
diffusion: Callable,
x0: np.ndarray,
t_span: Tuple[float, float],
num_steps: int = 100,
seed: Optional[int] = None,
noise_model: Optional[NoiseModel] = None,
**kwargs
) -> SDESolution:
"""
Integrate with fractional history terms and Milstein correction (supported cases).
"""
t0, tf = t_span
dt = (tf - t0) / num_steps
alpha = self.fractional_order.alpha
# Time grid
t = np.linspace(t0, tf, num_steps + 1)
# Initialize solution
if x0.ndim == 0:
x0 = x0[np.newaxis]
dim = x0.shape[0]
y = np.zeros((num_steps + 1, dim))
y[0] = x0
rng = np.random.default_rng(seed)
# History convolution helpers
drift_conv = FastHistoryConvolution(alpha, num_steps, dim)
diffusion_conv = FastHistoryConvolution(alpha, num_steps, dim)
# Gamma factor
gamma_factor = 1.0 / gamma(alpha + 1)
# Time stepping
for i in range(num_steps):
t_curr = t[i]
x_curr = y[i]
# Compute drift and diffusion terms
drift_val = drift(t_curr, x_curr)
diffusion_val = diffusion(t_curr, x_curr)
# Determine noise dimension and generate dW
if np.isscalar(diffusion_val):
noise_dim = dim
diffusion_val = np.full(dim, diffusion_val)
elif diffusion_val.ndim == 0:
noise_dim = dim
diffusion_val = np.full(dim, float(diffusion_val))
elif diffusion_val.ndim == 1:
noise_dim = dim
if len(diffusion_val) != dim:
raise ValueError(f"Diffusion vector shape {diffusion_val.shape} does not match state dim {dim}")
elif diffusion_val.ndim == 2:
d_out, m_in = diffusion_val.shape
if d_out != dim:
raise ValueError(f"Diffusion matrix rows {d_out} does not match state dim {dim}")
noise_dim = m_in
if m_in > 1:
raise NotImplementedError(
"FractionalMilstein does not implement multi-dimensional Brownian motion "
f"(diffusion shape {diffusion_val.shape}, m={m_in}). "
"Use diagonal noise g(t,x) as a vector of length d, or a (d,1) matrix "
"for a single Wiener process."
)
else:
raise ValueError(f"Invalid diffusion shape: {diffusion_val.shape}")
# Generate noise increment using model or fallback to Gaussian
if noise_model is not None:
# On first step, prepare noise model if needed
if i == 0:
noise_model.prepare(num_steps, dt, size=(noise_dim,))
dW = noise_model.generate_increment(t_curr, dt, size=(noise_dim,))
else:
dW = rng.normal(0, np.sqrt(dt), size=(noise_dim,))
# Compute noise term
if diffusion_val.ndim == 1:
noise_term = diffusion_val * dW
elif diffusion_val.ndim == 2:
noise_term = diffusion_val @ dW
else:
noise_term = diffusion_val * dW
# Milstein correction (diagonal or single-Wiener only)
g_vec = np.asarray(diffusion_val, dtype=float).reshape(-1) if diffusion_val.ndim == 2 else np.asarray(diffusion_val, dtype=float)
if diffusion_val.ndim == 1 or (diffusion_val.ndim == 2 and noise_dim == dim):
correction_term = _milstein_correction_diagonal(
diffusion, t_curr, x_curr, g_vec, dW, dt
)
else:
# noise_dim == 1, diffusion (dim, 1)
correction_term = _milstein_correction_single_wiener(
diffusion, t_curr, x_curr, g_vec, dW, dt
)
# Update history
drift_conv.update(drift_val)
diffusion_conv.update(noise_term)
drift_integral = drift_conv.convolve()
diffusion_integral = diffusion_conv.convolve()
y[i + 1] = (
x0
+ gamma_factor * dt**alpha * (drift_integral + diffusion_integral)
+ correction_term
)
# Create solution object
solution = SDESolution(
t=t,
y=y,
fractional_order=self.fractional_order,
method=self.method_name,
drift_func=drift,
diffusion_func=diffusion,
metadata={
'num_steps': num_steps,
'dt': dt,
'seed': seed
}
)
return solution
[docs]
def solve_fractional_sde(
drift: Callable,
diffusion: Callable,
x0: np.ndarray,
t_span: Tuple[float, float],
fractional_order: Union[float, FractionalOrder] = 0.5,
method: str = "euler_maruyama",
num_steps: int = 100,
seed: Optional[int] = None,
noise_model: Optional[NoiseModel] = None,
**kwargs
) -> SDESolution:
"""
Solve a fractional SDE.
Args:
drift: Drift function f(t, x) -> R^d
diffusion: Diffusion function g(t, x) -> R^(d x m)
x0: Initial condition
t_span: Time interval (t0, tf)
fractional_order: Fractional order (0 < α < 2)
method: Solver method (``euler_maruyama`` or ``milstein``). ``milstein`` adds a
finite-difference Milstein correction for diagonal noise or a single Wiener
driver; see :class:`FractionalMilstein`.
num_steps: Number of time steps
seed: Random seed for Wiener process
**kwargs: Additional solver parameters
Returns:
SDESolution object
Example:
>>> def drift(t, x):
... return -0.5 * x
>>> def diffusion(t, x):
... return 0.2 * np.eye(1)
>>> x0 = np.array([1.0])
>>> sol = solve_fractional_sde(drift, diffusion, x0, (0, 1), 0.5, num_steps=100)
>>> print(sol.y[-1])
"""
# Select solver based on method
if method == "euler_maruyama":
solver = FractionalEulerMaruyama(fractional_order, **kwargs)
elif method == "milstein":
solver = FractionalMilstein(fractional_order, **kwargs)
else:
raise ValueError(f"Unknown method: {method}. Available: 'euler_maruyama', 'milstein'")
# Solve SDE
# Solve SDE
solution = solver.solve(drift, diffusion, x0, t_span, num_steps=num_steps, seed=seed, noise_model=noise_model)
return solution
[docs]
def solve_fractional_sde_system(
drift: Callable,
diffusion: Callable,
x0: np.ndarray,
t_span: Tuple[float, float],
fractional_order: Union[float, FractionalOrder, List[float]],
method: str = "euler_maruyama",
noise_type: str = "additive",
num_steps: int = 100,
seed: Optional[int] = None,
noise_model: Optional[NoiseModel] = None,
**kwargs
) -> SDESolution:
"""
Solve a system of coupled fractional SDEs.
Args:
drift: Drift function f(t, x) -> R^d
diffusion: Diffusion function g(t, x) -> R^(d x m)
x0: Initial condition
t_span: Time interval (t0, tf)
fractional_order: Fractional order(s) for system
method: Solver method
noise_type: Type of noise ("additive" or "multiplicative")
num_steps: Number of time steps
seed: Random seed
noise_model: Optional noise model
**kwargs: Additional parameters
Returns:
SDESolution object
"""
# Handle multiple fractional orders
if isinstance(fractional_order, list):
# Use average order for now (could be extended to per-equation orders)
alpha = np.mean(fractional_order)
else:
alpha = fractional_order
# Use the standard solver
return solve_fractional_sde(
drift, diffusion, x0, t_span, alpha, method, num_steps, seed, noise_model=noise_model, **kwargs
)