Source code for hpfracc.solvers.sde_solvers

"""
Fractional Stochastic Differential Equation Solvers

This module provides comprehensive solvers for fractional SDEs including
various numerical methods, adaptive step size control, and error estimation.

Performance Note:
- Uses FFT-based convolution for O(N log N) history summation instead of O(N²)
- Intelligent backend selection for optimal performance
- Multi-backend support (PyTorch, JAX, NumPy/Numba)
"""

import numpy as np
import time
from typing import Union, Optional, Tuple, Callable, List, Dict, Any
from dataclasses import dataclass
from abc import ABC, abstractmethod
from scipy import fft

from ..core.definitions import FractionalOrder, DefinitionType

# Use adapter system for gamma function
[docs] def _get_gamma_function(): """Get gamma function through adapter system.""" try: from ..special.gamma_beta import gamma_function as gamma return gamma except Exception: from scipy.special import gamma return gamma
gamma = _get_gamma_function() # Initialize intelligent backend selector _intelligent_selector = None _use_intelligent_backend = True
[docs] def _get_intelligent_selector(): """Get intelligent backend selector instance.""" global _intelligent_selector, _use_intelligent_backend if not _use_intelligent_backend: return None if _intelligent_selector is None: try: from ..ml.intelligent_backend_selector import IntelligentBackendSelector _intelligent_selector = IntelligentBackendSelector(enable_learning=True) except ImportError: _use_intelligent_backend = False _intelligent_selector = None return _intelligent_selector
[docs] def _fft_convolution(coeffs: np.ndarray, values: np.ndarray, axis: int = 0) -> np.ndarray: """ Fast convolution using FFT for O(N log N) performance. Args: coeffs: Coefficient array (1D) values: Value array (can be 1D or 2D) axis: Axis along which to perform convolution Returns: Convolution result (same shape as values) """ N = coeffs.shape[0] # Pad to next power of 2 for FFT efficiency size = int(2 ** np.ceil(np.log2(2 * N - 1))) if values.ndim == 1: coeffs_padded = np.zeros(size) coeffs_padded[:N] = coeffs values_padded = np.zeros(size) values_padded[:N] = values coeffs_fft = fft.fft(coeffs_padded) values_fft = fft.fft(values_padded) conv_result = fft.ifft(coeffs_fft * values_fft).real[:N] return conv_result else: num_cols = values.shape[1] result = np.zeros_like(values) for col in range(num_cols): coeffs_padded = np.zeros(size) coeffs_padded[:N] = coeffs values_padded = np.zeros(size) values_padded[:N] = values[:, col] coeffs_fft = fft.fft(coeffs_padded) values_fft = fft.fft(values_padded) conv_result = fft.ifft(coeffs_fft * values_fft).real[:N] result[:, col] = conv_result return result
[docs] @dataclass class SDESolution: """Solution object for SDE solvers.""" t: np.ndarray y: np.ndarray fractional_order: Union[float, FractionalOrder] method: str drift_func: Callable diffusion_func: Callable metadata: Dict[str, Any] = None def __post_init__(self): if self.metadata is None: self.metadata = {}
[docs] class FractionalSDESolver(ABC): """ Base class for fractional SDE solvers. A fractional SDE takes the form: D^α X(t) = f(t, X(t)) dt + g(t, X(t)) dW(t) where: - α is the fractional order - f is the drift function - g is the diffusion function - W(t) is a Wiener process """
[docs] def __init__( self, fractional_order: Union[float, FractionalOrder], definition: str = "caputo", adaptive: bool = False, rtol: float = 1e-5, atol: float = 1e-8, ): """ Initialize fractional SDE solver. Args: fractional_order: Fractional order (0 < α < 2) definition: Type of fractional derivative ("caputo" or "riemann_liouville") adaptive: Use adaptive step size rtol: Relative tolerance for adaptive stepping atol: Absolute tolerance for adaptive stepping """ if isinstance(fractional_order, float): self.fractional_order = FractionalOrder(fractional_order) else: self.fractional_order = fractional_order self.definition = DefinitionType(definition.lower()) self.adaptive = adaptive self.rtol = rtol self.atol = atol # Validate fractional order if self.fractional_order.alpha <= 0 or self.fractional_order.alpha >= 2: raise ValueError("Fractional order must be in (0, 2)")
[docs] @abstractmethod def solve( self, drift: Callable[[float, np.ndarray], np.ndarray], diffusion: Callable[[float, np.ndarray], np.ndarray], x0: np.ndarray, t_span: Tuple[float, float], **kwargs ) -> SDESolution: """ Solve fractional SDE. Args: drift: Drift function f(t, x) -> R^d diffusion: Diffusion function g(t, x) -> R^(d x m) where m is dimension of noise x0: Initial condition t_span: Time interval (t0, tf) **kwargs: Additional solver parameters Returns: SDESolution object containing trajectory """ pass
[docs] class FastHistoryConvolution: """ Helper class for efficient history convolution in fractional SDE solvers. Currently uses optimized dot product (O(N^2)), but structured to support FFT-based block convolution (O(N log N)) in future updates. """
[docs] def __init__(self, alpha: float, num_steps: int, dim: int): self.alpha = alpha self.num_steps = num_steps self.dim = dim self.history = [] # Precompute weights for fractional integral # b_k = (k+1)^alpha - k^alpha k_vals = np.arange(num_steps + 1) self.weights = (k_vals + 1)**alpha - k_vals**alpha
[docs] def update(self, value: np.ndarray): """Add new value to history.""" self.history.append(value)
[docs] def convolve(self) -> np.ndarray: """Compute convolution of weights with history.""" hist_len = len(self.history) if hist_len == 0: return np.zeros(self.dim) current_weights = self.weights[:hist_len] w_flipped = current_weights[::-1] # Convert to array for dot product # TODO: Optimize this conversion or use a pre-allocated array hist_arr = np.array(self.history) # Compute dot product along time axis # w_flipped: (hist_len,) # hist_arr: (hist_len, dim) return np.dot(w_flipped, hist_arr)
[docs] class FractionalEulerMaruyama(FractionalSDESolver): """ Fractional Euler-Maruyama method for solving fractional SDEs. This is a first-order strong convergence method. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.method_name = "fractional_euler_maruyama"
[docs] def solve( self, drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], num_steps: int = 100, seed: Optional[int] = None, **kwargs ) -> SDESolution: """ Solve fractional SDE using Euler-Maruyama method. Args: drift: Drift function f(t, x) diffusion: Diffusion function g(t, x) x0: Initial condition t_span: Time interval (t0, tf) num_steps: Number of time steps seed: Random seed for Wiener process Returns: SDESolution object """ t0, tf = t_span dt = (tf - t0) / num_steps alpha = self.fractional_order.alpha # Time grid t = np.linspace(t0, tf, num_steps + 1) # Initialize solution if x0.ndim == 0: x0 = x0[np.newaxis] dim = x0.shape[0] y = np.zeros((num_steps + 1, dim)) y[0] = x0 # Set random seed if seed is not None: np.random.seed(seed) # History convolution helpers drift_conv = FastHistoryConvolution(alpha, num_steps, dim) diffusion_conv = FastHistoryConvolution(alpha, num_steps, dim) # Gamma factor gamma_factor = 1.0 / gamma(alpha + 1) # Time stepping for i in range(num_steps): t_curr = t[i] x_curr = y[i] # Compute drift and diffusion at current step drift_val = drift(t_curr, x_curr) diffusion_val = diffusion(t_curr, x_curr) # Generate Wiener increment # dW shape depends on diffusion dimension if np.isscalar(diffusion_val): noise_dim = dim diffusion_val = np.full(dim, diffusion_val) dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val * dW elif diffusion_val.ndim == 0: noise_dim = dim diffusion_val = np.full(dim, float(diffusion_val)) dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val * dW elif diffusion_val.ndim == 1: # Vector diffusion (diagonal) noise_dim = dim if len(diffusion_val) != dim: raise ValueError(f"Diffusion vector shape {diffusion_val.shape} does not match state dim {dim}") dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val * dW elif diffusion_val.ndim == 2: # Matrix diffusion (d, m) d_out, m_in = diffusion_val.shape if d_out != dim: raise ValueError(f"Diffusion matrix rows {d_out} does not match state dim {dim}") noise_dim = m_in dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val @ dW else: raise ValueError(f"Invalid diffusion shape: {diffusion_val.shape}") # Update history drift_conv.update(drift_val) diffusion_conv.update(noise_term) # Compute memory terms drift_integral = drift_conv.convolve() diffusion_integral = diffusion_conv.convolve() # Update X y[i + 1] = x0 + gamma_factor * dt**alpha * (drift_integral + diffusion_integral) # Create solution object solution = SDESolution( t=t, y=y, fractional_order=self.fractional_order, method=self.method_name, drift_func=drift, diffusion_func=diffusion, metadata={ 'num_steps': num_steps, 'dt': dt, 'seed': seed } ) return solution
[docs] class FractionalMilstein(FractionalSDESolver): """ Fractional Milstein method for solving fractional SDEs. This is a second-order strong convergence method with improved accuracy. """
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.method_name = "fractional_milstein"
[docs] def solve( self, drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], num_steps: int = 100, seed: Optional[int] = None, **kwargs ) -> SDESolution: """ Solve fractional SDE using Milstein method. Args: drift: Drift function f(t, x) diffusion: Diffusion function g(t, x) x0: Initial condition t_span: Time interval (t0, tf) num_steps: Number of time steps seed: Random seed for Wiener process Returns: SDESolution object """ t0, tf = t_span dt = (tf - t0) / num_steps alpha = self.fractional_order.alpha # Time grid t = np.linspace(t0, tf, num_steps + 1) # Initialize solution if x0.ndim == 0: x0 = x0[np.newaxis] dim = x0.shape[0] y = np.zeros((num_steps + 1, dim)) y[0] = x0 # Set random seed if seed is not None: np.random.seed(seed) # History convolution helpers drift_conv = FastHistoryConvolution(alpha, num_steps, dim) diffusion_conv = FastHistoryConvolution(alpha, num_steps, dim) # Gamma factor gamma_factor = 1.0 / gamma(alpha + 1) # Time stepping for i in range(num_steps): t_curr = t[i] x_curr = y[i] # Compute drift and diffusion terms drift_val = drift(t_curr, x_curr) diffusion_val = diffusion(t_curr, x_curr) # Generate Wiener increment # dW shape depends on diffusion dimension if np.isscalar(diffusion_val): noise_dim = dim diffusion_val = np.full(dim, diffusion_val) dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val * dW elif diffusion_val.ndim == 0: noise_dim = dim diffusion_val = np.full(dim, float(diffusion_val)) dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val * dW elif diffusion_val.ndim == 1: # Vector diffusion (diagonal) noise_dim = dim if len(diffusion_val) != dim: raise ValueError(f"Diffusion vector shape {diffusion_val.shape} does not match state dim {dim}") dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val * dW elif diffusion_val.ndim == 2: # Matrix diffusion (d, m) d_out, m_in = diffusion_val.shape if d_out != dim: raise ValueError(f"Diffusion matrix rows {d_out} does not match state dim {dim}") noise_dim = m_in dW = np.random.normal(0, np.sqrt(dt), size=(noise_dim,)) noise_term = diffusion_val @ dW else: raise ValueError(f"Invalid diffusion shape: {diffusion_val.shape}") # Update history drift_conv.update(drift_val) diffusion_conv.update(noise_term) # Simplified Milstein correction term correction_term = np.zeros_like(x_curr) # Compute memory terms drift_integral = drift_conv.convolve() diffusion_integral = diffusion_conv.convolve() # Update using Milstein scheme y[i + 1] = x0 + gamma_factor * dt**alpha * (drift_integral + diffusion_integral) + correction_term # Create solution object solution = SDESolution( t=t, y=y, fractional_order=self.fractional_order, method=self.method_name, drift_func=drift, diffusion_func=diffusion, metadata={ 'num_steps': num_steps, 'dt': dt, 'seed': seed } ) return solution
[docs] def solve_fractional_sde( drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], fractional_order: Union[float, FractionalOrder] = 0.5, method: str = "euler_maruyama", num_steps: int = 100, seed: Optional[int] = None, **kwargs ) -> SDESolution: """ Solve a fractional SDE. Args: drift: Drift function f(t, x) -> R^d diffusion: Diffusion function g(t, x) -> R^(d x m) x0: Initial condition t_span: Time interval (t0, tf) fractional_order: Fractional order (0 < α < 2) method: Solver method ("euler_maruyama", "milstein", "predictor_corrector") num_steps: Number of time steps seed: Random seed for Wiener process **kwargs: Additional solver parameters Returns: SDESolution object Example: >>> def drift(t, x): ... return -0.5 * x >>> def diffusion(t, x): ... return 0.2 * np.eye(1) >>> x0 = np.array([1.0]) >>> sol = solve_fractional_sde(drift, diffusion, x0, (0, 1), 0.5, num_steps=100) >>> print(sol.y[-1]) """ # Select solver based on method if method == "euler_maruyama": solver = FractionalEulerMaruyama(fractional_order, **kwargs) elif method == "milstein": solver = FractionalMilstein(fractional_order, **kwargs) else: raise ValueError(f"Unknown method: {method}. Available: 'euler_maruyama', 'milstein'") # Solve SDE solution = solver.solve(drift, diffusion, x0, t_span, num_steps=num_steps, seed=seed) return solution
[docs] def solve_fractional_sde_system( drift: Callable, diffusion: Callable, x0: np.ndarray, t_span: Tuple[float, float], fractional_order: Union[float, FractionalOrder, List[float]], method: str = "euler_maruyama", noise_type: str = "additive", num_steps: int = 100, seed: Optional[int] = None, **kwargs ) -> SDESolution: """ Solve a system of coupled fractional SDEs. Args: drift: Drift function f(t, x) -> R^d diffusion: Diffusion function g(t, x) -> R^(d x m) x0: Initial condition t_span: Time interval (t0, tf) fractional_order: Fractional order(s) for system method: Solver method noise_type: Type of noise ("additive" or "multiplicative") num_steps: Number of time steps seed: Random seed **kwargs: Additional parameters Returns: SDESolution object """ # Handle multiple fractional orders if isinstance(fractional_order, list): # Use average order for now (could be extended to per-equation orders) alpha = np.mean(fractional_order) else: alpha = fractional_order # Use the standard solver return solve_fractional_sde( drift, diffusion, x0, t_span, alpha, method, num_steps, seed, **kwargs )