Source code for hpfracc.solvers.sde_solvers

"""
Fractional Stochastic Differential Equation Solvers

This module provides comprehensive solvers for fractional SDEs including
various numerical methods, adaptive step size control, and error estimation.

Performance Note:
- Uses FFT-based convolution for O(N log N) history summation instead of O(N²)
- Intelligent backend selection for optimal performance
- Multi-backend support (PyTorch, JAX, NumPy/Numba)
"""

import numpy as np
import time
from typing import Union, Optional, Tuple, Callable, List, Dict, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
from scipy import fft

from ..core.definitions import FractionalOrder, DefinitionType
from .noise_models import NoiseModel

# Use adapter system for gamma function
[docs] def _get_gamma_function(): """Get gamma function through adapter system.""" try: from ..special.gamma_beta import gamma_function as gamma return gamma except Exception: from scipy.special import gamma return gamma
gamma = _get_gamma_function() # Initialize intelligent backend selector _intelligent_selector = None _use_intelligent_backend = True
[docs] def _get_intelligent_selector(): """Get intelligent backend selector instance.""" global _intelligent_selector, _use_intelligent_backend if not _use_intelligent_backend: return None if _intelligent_selector is None: try: from ..ml.intelligent_backend_selector import IntelligentBackendSelector _intelligent_selector = IntelligentBackendSelector(enable_learning=True) except ImportError: _use_intelligent_backend = False _intelligent_selector = None return _intelligent_selector
[docs] def _fft_convolution(coeffs: np.ndarray, values: np.ndarray, axis: int = 0) -> np.ndarray: """ Fast convolution using FFT for O(N log N) performance. Args: coeffs: Coefficient array (1D) values: Value array (can be 1D or 2D) axis: Axis along which to perform convolution Returns: Convolution result (same shape as values) """ N = coeffs.shape[0] # Pad to next power of 2 for FFT efficiency size = int(2 ** np.ceil(np.log2(2 * N - 1))) if values.ndim == 1: coeffs_padded = np.zeros(size) coeffs_padded[:N] = coeffs values_padded = np.zeros(size) values_padded[:N] = values coeffs_fft = fft.fft(coeffs_padded) values_fft = fft.fft(values_padded) conv_result = fft.ifft(coeffs_fft * values_fft).real[:N] return conv_result else: num_cols = values.shape[1] result = np.zeros_like(values) for col in range(num_cols): coeffs_padded = np.zeros(size) coeffs_padded[:N] = coeffs values_padded = np.zeros(size) values_padded[:N] = values[:, col] coeffs_fft = fft.fft(coeffs_padded) values_fft = fft.fft(values_padded) conv_result = fft.ifft(coeffs_fft * values_fft).real[:N] result[:, col] = conv_result return result
[docs] @dataclass class SDESolution: """Solution object for SDE solvers.""" t: np.ndarray y: np.ndarray fractional_order: Union[float, FractionalOrder] method: str drift_func: Callable diffusion_func: Callable metadata: Dict[str, Any] = None def __post_init__(self): if self.metadata is None: self.metadata = {}
[docs] class FractionalSDESolver(ABC): """ Base class for fractional SDE solvers. A fractional SDE takes the form: D^α X(t) = f(t, X(t)) dt + g(t, X(t)) dW(t) where: - α is the fractional order - f is the drift function - g is the diffusion function - W(t) is a Wiener process """
[docs] def __init__( self, fractional_order: Union[float, FractionalOrder], definition: str = "caputo", adaptive: bool = False, rtol: float = 1e-5, atol: float = 1e-8, ): """ Initialize fractional SDE solver. Args: fractional_order: Fractional order (0 < α < 2) definition: Type of fractional derivative ("caputo" or "riemann_liouville") adaptive: Use adaptive step size rtol: Relative tolerance for adaptive stepping atol: Absolute tolerance for adaptive stepping """ if isinstance(fractional_order, float): self.fractional_order = FractionalOrder(fractional_order) else: self.fractional_order = fractional_order self.definition = DefinitionType(definition.lower()) self.adaptive = adaptive self.rtol = rtol self.atol = atol # Validate fractional order if self.fractional_order.alpha <= 0 or self.fractional_order.alpha >= 2: raise ValueError("Fractional order must be in (0, 2)")
[docs] @abstractmethod def solve( self, drift: Callable[[float, np.ndarray], np.ndarray], diffusion: Callable[[float, np.ndarray], np.ndarray], x0: np.ndarray, t_span: Tuple[float, float], noise_model: Optional[NoiseModel] = None, **kwargs ) -> SDESolution: """ Solve fractional SDE. Args: drift: Drift function f(t, x) -> R^d diffusion: Diffusion function g(t, x) -> R^(d x m) where m is dimension of noise x0: Initial condition t_span: Time interval (t0, tf) **kwargs: Additional solver parameters Returns: SDESolution object containing trajectory """ pass
[docs] class FastHistoryConvolution: """ Helper class for efficient history convolution in fractional SDE solvers. Features: - Pre-allocated arrays to avoid list overhead (O(N) -> O(1) allocation) - Dynamic switching between direct dot product (small N) and FFT (large N) - JAX/SciPy/NumPy backend support via _fft_convolution """
[docs] def __init__(self, alpha: float, num_steps: int, dim: int): self.alpha = alpha self.num_steps = num_steps self.dim = dim # Pre-allocate history buffer self.history = np.zeros((num_steps + 1, dim)) self.current_step = 0 # Precompute weights for fractional integral # b_k = (k+1)^alpha - k^alpha k_vals = np.arange(num_steps + 1) self.weights = (k_vals + 1)**alpha - k_vals**alpha # Threshold for switching to FFT # FFT overhead typically pays off around N=64-128 self.fft_threshold = 64
[docs] def update(self, value: np.ndarray): """Add new value to history.""" if self.current_step <= self.num_steps: self.history[self.current_step] = value self.current_step += 1
[docs] def convolve(self) -> np.ndarray: """Compute convolution of weights with history.""" if self.current_step == 0: return np.zeros(self.dim) # Get views of valid data hist_view = self.history[:self.current_step] weights_view = self.weights[:self.current_step] if self.current_step < self.fft_threshold: # Small N: Use direct dot product # Corresponds to sum_{j=0}^{n-1} w_{n-1-j} * history[j] # w_flipped[0] is w_{n-1}, matches history[0] w_flipped = weights_view[::-1] return np.dot(w_flipped, hist_view) else: # Large N: Use FFT # We want the LAST element of the convolution # conv[n-1] # Use _fft_convolution which handles backend selection if self.dim == 1 or hist_view.ndim == 1: # 1D case # Reverse weights for convolution definition: conv(f, g) # _fft_convolution computes full convolution # We need to be careful with layout. # Direct sum: sum(w[k] * h[n-1-k]) # This is (w * h)[n-1] if w is NOT reversed? # Convolution: (f * g)[n] = sum f[k] g[n-k] # Our sum is: sum_{j=0}^{n-1} w_{n-1-j} * h_j # Let k = j. f=h, g=w. # sum h[k] * w[n-1-k] # This is EXACTLY discrete convolution (h * w) at index n-1. # So we pass h and w to _fft_convolution. # Note: _fft_convolution does ifft(fft(A)*fft(B)). conv_res = _fft_convolution(weights_view, hist_view) # We want the value at index n-1 (which is the last valid index of valid history) # _fft_convolution returns array of same shape as values (hist_view) # It computes circular convolution padded? # _fft_convolution implementation: # real(ifft(fft(coeffs, size) * fft(values, size)))[:N] # This is standard linear convolution truncated to N. # So the last element is indeed what we want. if hist_view.ndim == 1: return conv_res[-1] else: return conv_res[-1, :] else: # 2D case (history dim > 1) # Convolve along axis 0 conv_res = _fft_convolution(weights_view, hist_view, axis=0) return conv_res[-1, :]
[docs] def _milstein_fd_eps(x: np.ndarray, base: float = 1e-6) -> float: """Scale-aware finite-difference step for ∂g/∂x.""" scale = float(np.max(np.abs(x))) if x.size else 0.0 return base * (1.0 + scale)
[docs] def _milstein_correction_diagonal( diffusion: Callable[[float, np.ndarray], np.ndarray], t: float, x: np.ndarray, g_vec: np.ndarray, dW: np.ndarray, dt: float, ) -> np.ndarray: """ Diagonal / independent-noise Milstein correction: 0.5 * sum_i g_i(t,x) * (∂g_i/∂x_i) * (dW_i^2 - dt) with ∂g_i/∂x_i approximated by central differences on x_i. """ dim = x.shape[0] eps = _milstein_fd_eps(x) out = np.zeros(dim, dtype=float) for i in range(dim): e = np.zeros(dim, dtype=float) e[i] = 1.0 gp = np.asarray(diffusion(t, x + eps * e), dtype=float).ravel() gm = np.asarray(diffusion(t, x - eps * e), dtype=float).ravel() if gp.shape[0] != dim or gm.shape[0] != dim: raise ValueError( f"diffusion(t,x) must return vector of shape ({dim},), got {gp.shape}" ) dg_i_dxi = (gp[i] - gm[i]) / (2.0 * eps) out[i] = 0.5 * g_vec[i] * dg_i_dxi * (dW[i] ** 2 - dt) return out
[docs] def _milstein_correction_single_wiener( diffusion: Callable[[float, np.ndarray], np.ndarray], t: float, x: np.ndarray, g_vec: np.ndarray, dW: np.ndarray, dt: float, ) -> np.ndarray: """ Milstein correction for a single driving Wiener process (m=1, dW scalar): 0.5 * (dW^2 - dt) * sum_j g_j * (∂g_k/∂x_j) for each component k. Uses central differences in the state directions x_j. """ dim = x.shape[0] dw = float(dW.reshape(-1)[0]) factor = 0.5 * (dw * dw - dt) eps = _milstein_fd_eps(x) out = np.zeros(dim, dtype=float) for k in range(dim): acc = 0.0 for j in range(dim): ej = np.zeros(dim, dtype=float) ej[j] = 1.0 gp = np.asarray(diffusion(t, x + eps * ej), dtype=float).ravel() gm = np.asarray(diffusion(t, x - eps * ej), dtype=float).ravel() if gp.shape[0] != dim or gm.shape[0] != dim: raise ValueError( f"diffusion(t,x) must return vector of shape ({dim},), got {gp.shape}" ) dg_k_dxj = (gp[k] - gm[k]) / (2.0 * eps) acc += float(g_vec[j]) * dg_k_dxj out[k] = factor * acc return out
[docs] class FractionalEulerMaruyama(FractionalSDESolver): """ Fractional Euler-Maruyama method for solving fractional SDEs. This is a first-order strong convergence method. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.method_name = "fractional_euler_maruyama"
[docs] def solve( self, drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], num_steps: int = 100, seed: Optional[int] = None, noise_model: Optional[NoiseModel] = None, **kwargs ) -> SDESolution: """ Solve fractional SDE using Euler-Maruyama method. Args: drift: Drift function f(t, x) diffusion: Diffusion function g(t, x) x0: Initial condition t_span: Time interval (t0, tf) num_steps: Number of time steps seed: Random seed for Wiener process noise_model: Optional noise model for non-Brownian noise Returns: SDESolution object """ t0, tf = t_span dt = (tf - t0) / num_steps alpha = self.fractional_order.alpha # Time grid t = np.linspace(t0, tf, num_steps + 1) # Initialize solution if x0.ndim == 0: x0 = x0[np.newaxis] dim = x0.shape[0] y = np.zeros((num_steps + 1, dim)) y[0] = x0 rng = np.random.default_rng(seed) # History convolution helpers drift_conv = FastHistoryConvolution(alpha, num_steps, dim) diffusion_conv = FastHistoryConvolution(alpha, num_steps, dim) # Gamma factor gamma_factor = 1.0 / gamma(alpha + 1) # Time stepping for i in range(num_steps): t_curr = t[i] x_curr = y[i] # Compute drift and diffusion at current step drift_val = drift(t_curr, x_curr) diffusion_val = diffusion(t_curr, x_curr) # Determine noise dimension and generate dW # dW shape depends on diffusion dimension if np.isscalar(diffusion_val): noise_dim = dim diffusion_val = np.full(dim, diffusion_val) elif diffusion_val.ndim == 0: noise_dim = dim diffusion_val = np.full(dim, float(diffusion_val)) elif diffusion_val.ndim == 1: noise_dim = dim if len(diffusion_val) != dim: raise ValueError(f"Diffusion vector shape {diffusion_val.shape} does not match state dim {dim}") elif diffusion_val.ndim == 2: d_out, m_in = diffusion_val.shape if d_out != dim: raise ValueError(f"Diffusion matrix rows {d_out} does not match state dim {dim}") noise_dim = m_in else: raise ValueError(f"Invalid diffusion shape: {diffusion_val.shape}") # Generate noise increment using model or fallback to Gaussian if noise_model is not None: # On first step, prepare noise model if needed if i == 0: noise_model.prepare(num_steps, dt, size=(noise_dim,)) dW = noise_model.generate_increment(t_curr, dt, size=(noise_dim,)) else: dW = rng.normal(0, np.sqrt(dt), size=(noise_dim,)) # Compute noise term (diffusion * dW) # diffusion_val shape: (dim,), (dim, m_in) # dW shape: (noise_dim,) if diffusion_val.ndim == 1: # diffusion (dim,) * dW (dim,) -> elementwise or dot? # Usually diagonal noise: diffusion[i] * dW[i] noise_term = diffusion_val * dW elif diffusion_val.ndim == 2: # diffusion (dim, m) @ dW (m,) noise_term = diffusion_val @ dW else: # Scalar/0-dim handled by conversion above to (dim,) noise_term = diffusion_val * dW # Update history drift_conv.update(drift_val) diffusion_conv.update(noise_term) # Compute memory terms drift_integral = drift_conv.convolve() diffusion_integral = diffusion_conv.convolve() # Update X y[i + 1] = x0 + gamma_factor * dt**alpha * (drift_integral + diffusion_integral) # Create solution object solution = SDESolution( t=t, y=y, fractional_order=self.fractional_order, method=self.method_name, drift_func=drift, diffusion_func=diffusion, metadata={ 'num_steps': num_steps, 'dt': dt, 'seed': seed } ) return solution
[docs] class FractionalMilstein(FractionalSDESolver): """ Fractional SDE integrator with the same history-convolution path as Euler–Maruyama, plus a **Milstein correction** where supported. The correction uses the standard form ``0.5 * (dW^2 - dt)`` multiplied by terms involving spatial derivatives of the diffusion, approximated by central finite differences. **Supported** - **Diagonal noise**: ``g(t,x)`` returns a vector of length ``d``; one Brownian increment per component (same layout as Euler–Maruyama). - **Single Wiener process**: ``g(t,x)`` returns shape ``(d, 1)``; scalar ``dW``. **Not implemented** - General matrix diffusion with ``m > 1`` independent Wiener processes (Levy area / non-commutative terms not included). Use diagonal or scalar noise, or extend the solver. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.method_name = "fractional_milstein"
[docs] def solve( self, drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], num_steps: int = 100, seed: Optional[int] = None, noise_model: Optional[NoiseModel] = None, **kwargs ) -> SDESolution: """ Integrate with fractional history terms and Milstein correction (supported cases). """ t0, tf = t_span dt = (tf - t0) / num_steps alpha = self.fractional_order.alpha # Time grid t = np.linspace(t0, tf, num_steps + 1) # Initialize solution if x0.ndim == 0: x0 = x0[np.newaxis] dim = x0.shape[0] y = np.zeros((num_steps + 1, dim)) y[0] = x0 rng = np.random.default_rng(seed) # History convolution helpers drift_conv = FastHistoryConvolution(alpha, num_steps, dim) diffusion_conv = FastHistoryConvolution(alpha, num_steps, dim) # Gamma factor gamma_factor = 1.0 / gamma(alpha + 1) # Time stepping for i in range(num_steps): t_curr = t[i] x_curr = y[i] # Compute drift and diffusion terms drift_val = drift(t_curr, x_curr) diffusion_val = diffusion(t_curr, x_curr) # Determine noise dimension and generate dW if np.isscalar(diffusion_val): noise_dim = dim diffusion_val = np.full(dim, diffusion_val) elif diffusion_val.ndim == 0: noise_dim = dim diffusion_val = np.full(dim, float(diffusion_val)) elif diffusion_val.ndim == 1: noise_dim = dim if len(diffusion_val) != dim: raise ValueError(f"Diffusion vector shape {diffusion_val.shape} does not match state dim {dim}") elif diffusion_val.ndim == 2: d_out, m_in = diffusion_val.shape if d_out != dim: raise ValueError(f"Diffusion matrix rows {d_out} does not match state dim {dim}") noise_dim = m_in if m_in > 1: raise NotImplementedError( "FractionalMilstein does not implement multi-dimensional Brownian motion " f"(diffusion shape {diffusion_val.shape}, m={m_in}). " "Use diagonal noise g(t,x) as a vector of length d, or a (d,1) matrix " "for a single Wiener process." ) else: raise ValueError(f"Invalid diffusion shape: {diffusion_val.shape}") # Generate noise increment using model or fallback to Gaussian if noise_model is not None: # On first step, prepare noise model if needed if i == 0: noise_model.prepare(num_steps, dt, size=(noise_dim,)) dW = noise_model.generate_increment(t_curr, dt, size=(noise_dim,)) else: dW = rng.normal(0, np.sqrt(dt), size=(noise_dim,)) # Compute noise term if diffusion_val.ndim == 1: noise_term = diffusion_val * dW elif diffusion_val.ndim == 2: noise_term = diffusion_val @ dW else: noise_term = diffusion_val * dW # Milstein correction (diagonal or single-Wiener only) g_vec = np.asarray(diffusion_val, dtype=float).reshape(-1) if diffusion_val.ndim == 2 else np.asarray(diffusion_val, dtype=float) if diffusion_val.ndim == 1 or (diffusion_val.ndim == 2 and noise_dim == dim): correction_term = _milstein_correction_diagonal( diffusion, t_curr, x_curr, g_vec, dW, dt ) else: # noise_dim == 1, diffusion (dim, 1) correction_term = _milstein_correction_single_wiener( diffusion, t_curr, x_curr, g_vec, dW, dt ) # Update history drift_conv.update(drift_val) diffusion_conv.update(noise_term) drift_integral = drift_conv.convolve() diffusion_integral = diffusion_conv.convolve() y[i + 1] = ( x0 + gamma_factor * dt**alpha * (drift_integral + diffusion_integral) + correction_term ) # Create solution object solution = SDESolution( t=t, y=y, fractional_order=self.fractional_order, method=self.method_name, drift_func=drift, diffusion_func=diffusion, metadata={ 'num_steps': num_steps, 'dt': dt, 'seed': seed } ) return solution
[docs] def solve_fractional_sde( drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], fractional_order: Union[float, FractionalOrder] = 0.5, method: str = "euler_maruyama", num_steps: int = 100, seed: Optional[int] = None, noise_model: Optional[NoiseModel] = None, **kwargs ) -> SDESolution: """ Solve a fractional SDE. Args: drift: Drift function f(t, x) -> R^d diffusion: Diffusion function g(t, x) -> R^(d x m) x0: Initial condition t_span: Time interval (t0, tf) fractional_order: Fractional order (0 < α < 2) method: Solver method (``euler_maruyama`` or ``milstein``). ``milstein`` adds a finite-difference Milstein correction for diagonal noise or a single Wiener driver; see :class:`FractionalMilstein`. num_steps: Number of time steps seed: Random seed for Wiener process **kwargs: Additional solver parameters Returns: SDESolution object Example: >>> def drift(t, x): ... return -0.5 * x >>> def diffusion(t, x): ... return 0.2 * np.eye(1) >>> x0 = np.array([1.0]) >>> sol = solve_fractional_sde(drift, diffusion, x0, (0, 1), 0.5, num_steps=100) >>> print(sol.y[-1]) """ # Select solver based on method if method == "euler_maruyama": solver = FractionalEulerMaruyama(fractional_order, **kwargs) elif method == "milstein": solver = FractionalMilstein(fractional_order, **kwargs) else: raise ValueError(f"Unknown method: {method}. Available: 'euler_maruyama', 'milstein'") # Solve SDE # Solve SDE solution = solver.solve(drift, diffusion, x0, t_span, num_steps=num_steps, seed=seed, noise_model=noise_model) return solution
[docs] def solve_fractional_sde_system( drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], fractional_order: Union[float, FractionalOrder, List[float]], method: str = "euler_maruyama", noise_type: str = "additive", num_steps: int = 100, seed: Optional[int] = None, noise_model: Optional[NoiseModel] = None, **kwargs ) -> SDESolution: """ Solve a system of coupled fractional SDEs. Args: drift: Drift function f(t, x) -> R^d diffusion: Diffusion function g(t, x) -> R^(d x m) x0: Initial condition t_span: Time interval (t0, tf) fractional_order: Fractional order(s) for system method: Solver method noise_type: Type of noise ("additive" or "multiplicative") num_steps: Number of time steps seed: Random seed noise_model: Optional noise model **kwargs: Additional parameters Returns: SDESolution object """ # Handle multiple fractional orders if isinstance(fractional_order, list): # Use average order for now (could be extended to per-equation orders) alpha = np.mean(fractional_order) else: alpha = fractional_order # Use the standard solver return solve_fractional_sde( drift, diffusion, x0, t_span, alpha, method, num_steps, seed, noise_model=noise_model, **kwargs )