Source code for hpfracc.solvers.ode_solvers

"""
Fractional Ordinary Differential Equation Solvers

This module provides comprehensive solvers for fractional ODEs including
various numerical methods, adaptive step size control, and error estimation.

Performance Note:
- Uses FFT-based convolution for O(N log N) history summation instead of O(N²)
- Intelligent backend selection for optimal performance (v2.1.0)
"""

import numpy as np
import time
from typing import Union, Optional, Tuple, Callable
from scipy import fft

from ..core.definitions import FractionalOrder

# Use adapter system for gamma function instead of direct imports


[docs] def _get_gamma_function(): """Get gamma function through adapter system.""" try: from ..special.gamma_beta import gamma_function as gamma return gamma except Exception: # Fallback to scipy from scipy.special import gamma return gamma
gamma = _get_gamma_function() # Initialize intelligent backend selector for ODE solvers _intelligent_selector = None _use_intelligent_backend = True
[docs] def _get_intelligent_selector(): """Get intelligent backend selector instance.""" global _intelligent_selector, _use_intelligent_backend if not _use_intelligent_backend: return None if _intelligent_selector is None: try: from ..ml.intelligent_backend_selector import IntelligentBackendSelector _intelligent_selector = IntelligentBackendSelector(enable_learning=True) except ImportError: _use_intelligent_backend = False _intelligent_selector = None return _intelligent_selector
[docs] def _select_fft_backend(data_size: int) -> str: """ Select optimal FFT backend based on data size. Args: data_size: Number of elements to process Returns: Backend name: "numpy", "jax", or "scipy" """ selector = _get_intelligent_selector() if selector is not None: try: from ..ml.intelligent_backend_selector import WorkloadCharacteristics from ..ml.backends import BackendType workload = WorkloadCharacteristics( operation_type="fft", data_size=data_size, data_shape=(data_size,), is_iterative=True ) backend_type = selector.select_backend(workload) # Map to FFT-specific backends if backend_type == BackendType.JAX: return "jax" elif backend_type == BackendType.TORCH: return "numpy" # PyTorch FFT is not ideal for this use case else: return "numpy" except Exception: pass # Fall through to default # Default selection based on size if data_size < 1000: return "numpy" # Small data: NumPy is faster else: return "scipy" # Large data: SciPy is optimized
[docs] def _fft_convolution(coeffs: np.ndarray, values: np.ndarray, axis: int = 0) -> np.ndarray: """ Fast convolution using FFT for O(N log N) performance with intelligent backend selection. This replaces the O(N²) direct summation loop for history terms in fractional ODEs. Now uses intelligent backend selection to choose optimal FFT implementation. Args: coeffs: Coefficient array (1D) values: Value array (can be 1D or 2D with axis specifying which dimension to convolve) axis: Axis along which to perform convolution (default 0) Returns: Convolution result (same shape as values) Mathematical basis: conv(C, Y) = ifft(fft(C) * fft(Y)) Performance: - Direct summation: O(N²) - FFT-based: O(N log N) - Backend selection: < 0.001 ms overhead """ N = coeffs.shape[0] # Select optimal FFT backend based on data size backend = _select_fft_backend(N * (values.shape[1] if values.ndim == 2 else 1)) # Try JAX FFT for large data (if available and selected) if backend == "jax" and N > 1000: try: import jax.numpy as jnp from jax.numpy import fft as jax_fft if values.ndim == 1: size = int(2 ** np.ceil(np.log2(2 * N - 1))) coeffs_jax = jnp.array(coeffs) values_jax = jnp.array(values) coeffs_fft = jax_fft.fft(coeffs_jax, n=size) values_fft = jax_fft.fft(values_jax, n=size) conv_result = jax_fft.ifft(coeffs_fft * values_fft).real[:N] return np.array(conv_result) elif values.ndim == 2 and axis == 0: size = int(2 ** np.ceil(np.log2(2 * N - 1))) num_cols = values.shape[1] coeffs_padded = jnp.zeros((size, num_cols)) coeffs_padded = coeffs_padded.at[:N, :].set(coeffs[:, np.newaxis]) values_padded = jnp.zeros((size, num_cols)) values_padded = values_padded.at[:N, :].set(values) coeffs_fft = jax_fft.fft(coeffs_padded, axis=0) values_fft = jax_fft.fft(values_padded, axis=0) conv_result = jax_fft.ifft(coeffs_fft * values_fft, axis=0).real[:N, :] return np.array(conv_result) except (ImportError, Exception): # Fall back to SciPy/NumPy pass # Default: Use SciPy FFT (works for all cases) if values.ndim == 1: # 1D case: simple convolution M = values.shape[0] if M != N: raise ValueError(f"Coefficient and value array lengths must match: {N} vs {M}") # Zero-pad to next power of 2 for optimal FFT performance size = int(2 ** np.ceil(np.log2(2 * N - 1))) # Perform FFT-based convolution coeffs_fft = fft.fft(coeffs, n=size) values_fft = fft.fft(values, n=size) conv_result = fft.ifft(coeffs_fft * values_fft).real[:N] return conv_result elif values.ndim == 2: # 2D case: convolve along specified axis if axis == 0: M = values.shape[0] if M != N: raise ValueError(f"Coefficient and value array lengths must match: {N} vs {M}") # Vectorized FFT for all columns at once (more efficient than loop) size = int(2 ** np.ceil(np.log2(2 * N - 1))) num_cols = values.shape[1] # Expand coeffs to match all columns: shape (size,) -> (size, num_cols) coeffs_padded = np.zeros((size, num_cols)) coeffs_padded[:N, :] = coeffs[:, np.newaxis] # Pad values: shape (N, num_cols) -> (size, num_cols) values_padded = np.zeros((size, num_cols)) values_padded[:N, :] = values # FFT along axis 0 for all columns simultaneously coeffs_fft = fft.fft(coeffs_padded, axis=0) values_fft = fft.fft(values_padded, axis=0) # Element-wise multiplication and inverse FFT conv_result = fft.ifft(coeffs_fft * values_fft, axis=0).real[:N, :] return conv_result else: raise NotImplementedError( "FFT convolution is currently only implemented for axis=0 (time axis). " "For multi-dimensional problems, transpose your data so time is the first axis, " "or use direct convolution methods instead of FFT-based approach." ) else: raise ValueError(f"FFT convolution only supports 1D and 2D arrays, got {values.ndim}D")
[docs] def _fast_history_sum(coeffs: np.ndarray, f_hist: np.ndarray, reverse: bool = True, verbose: bool = False) -> np.ndarray: """ Compute weighted sum of history using FFT-based convolution. This is the key optimization for fractional ODE solvers, replacing: sum_{j=0}^{n} coeffs[n-j] * f_hist[j] with an O(N log N) FFT-based approach instead of O(N²) direct summation. Args: coeffs: Coefficient array of length N f_hist: History array of shape (N,) or (N, m) where m is state dimension reverse: If True, apply coefficients in reverse order (typical for convolution) Returns: Weighted sum result (scalar or array of length m) """ start_time = time.perf_counter() if reverse: coeffs = coeffs[::-1] if f_hist.ndim == 1: # For 1D, use direct convolution and return the last element conv_full = _fft_convolution(coeffs, f_hist) result = conv_full[-1] else: # For 2D (N, m), convolve and return the last row conv_full = _fft_convolution(coeffs, f_hist, axis=0) result = conv_full[-1, :] end_time = time.perf_counter() if verbose and coeffs.shape[0] > 64: # Only print for FFT cases print(f"[TIMING] _fast_history_sum (N={coeffs.shape[0]}): {end_time - start_time:.6f}s") return result
[docs] class FixedStepODESolver: """ Base class for fixed-step fractional ODE solvers. Provides common functionality for solving fractional ordinary differential equations of the form: D^α y(t) = f(t, y(t)) where D^α is a fractional derivative operator. """
[docs] def __init__( self, derivative_type: str = "caputo", method: str = "predictor_corrector", adaptive: bool = True, tol: float = 1e-6, max_iter: int = 1000, *, fractional_order: Optional[Union[float, FractionalOrder]] = None, rtol: Optional[float] = None, atol: Optional[float] = None, # compatibility/extended params order: int = 1, min_h: Optional[float] = None, max_h: Optional[float] = None, min_step: Optional[float] = None, max_step: Optional[float] = None, ): """ Initialize fractional ODE solver. Args: derivative_type: Type of fractional derivative ("caputo", "riemann_liouville", "grunwald_letnikov") method: Numerical method ("predictor_corrector", "adams_bashforth", "runge_kutta") adaptive: Use adaptive step size control tol: Tolerance for convergence max_iter: Maximum number of iterations """ self.derivative_type = derivative_type.lower() self.method = method.lower() self.adaptive = adaptive self.tol = tol self.max_iter = max_iter # Accept optional fractional_order for compatibility; stored as attribute only self.fractional_order = fractional_order # Order compatibility (for predictor-corrector family) self.order = int(order) # Accept rtol/atol but map to tol if provided (basic solver uses single tol) if rtol is not None: self.tol = min(self.tol, float(rtol)) if atol is not None: self.tol = min(self.tol, float(atol)) # Step-size preferences (used by adaptive solvers) self.min_h = float(min_h) if min_h is not None else (float(min_step) if min_step is not None else None) self.max_h = float(max_h) if max_h is not None else (float(max_step) if max_step is not None else None) # Validate derivative type valid_derivatives = [ "caputo", "riemann_liouville", "grunwald_letnikov"] if self.derivative_type not in valid_derivatives: raise ValueError( f"Derivative type must be one of {valid_derivatives}") # Validate method valid_methods = [ "predictor_corrector", "adams_bashforth", "runge_kutta", "euler", ] if self.method not in valid_methods: raise ValueError(f"Method must be one of {valid_methods}")
[docs] def _validate_alpha(self, alpha: Union[float, FractionalOrder]): """Validate the fractional order.""" if isinstance(alpha, FractionalOrder): alpha_val = alpha.alpha else: alpha_val = alpha if not 0 < alpha_val <= 2: raise ValueError(f"Fractional order alpha must be in (0, 2], but got {alpha_val}") if self.method == "predictor_corrector" and not (0.0 < alpha_val <= 1.0): raise ValueError(f"The predictor-corrector method currently only supports orders in (0, 1], but got {alpha_val}")
[docs] def solve( self, f: Callable, t_span: Tuple[float, float], y0: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: Optional[float] = None, **kwargs, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve fractional ODE. Args: f: Right-hand side function f(t, y) t_span: Time interval (t0, tf) y0: Initial condition(s) alpha: Fractional order h: Step size (None for adaptive) **kwargs: Additional solver parameters Returns: Tuple of (t_values, y_values) """ # Validate alpha range self._validate_alpha(alpha) t0, tf = t_span if h is None: h = (tf - t0) / 100 # Default step size if self.method == "predictor_corrector": return self._solve_predictor_corrector( f, t0, tf, y0, alpha, h, **kwargs) elif self.method == "adams_bashforth": return self._solve_adams_bashforth( f, t0, tf, y0, alpha, h, **kwargs) elif self.method == "runge_kutta": return self._solve_runge_kutta(f, t0, tf, y0, alpha, h, **kwargs) elif self.method == "euler": return self._solve_euler(f, t0, tf, y0, alpha, h, **kwargs) else: raise ValueError(f"Unknown method: {self.method}")
[docs] def _solve_predictor_corrector( self, f, t0, tf, y0, alpha, h, **kwargs ): """ Solve using Adams-Bashforth-Moulton predictor-corrector for Caputo fractional ODEs. Based on the Volterra integral formulation: y(t) = y_0 + (1/Γ(α)) ∫_0^t (t-τ)^(α-1) f(τ, y(τ)) dτ Reference: Diethelm, K. (2010). The Analysis of Fractional Differential Equations. """ # Only Caputo is supported in this scheme if self.derivative_type != "caputo": raise NotImplementedError( "Predictor-corrector method is currently implemented for Caputo derivative only. " "For Riemann-Liouville or other derivative types, use the fixed-step Euler method " "or convert your problem to use Caputo formulation." ) alpha_val = float(alpha.alpha) if hasattr(alpha, "alpha") else float(alpha) # grid N = int(np.ceil((tf - t0) / h)) + 1 t_values = np.linspace(t0, tf, N, dtype=float) y0_arr = np.atleast_1d(np.array(y0, dtype=float)) m = y0_arr.size Y = np.zeros((N, m), dtype=float) Y[0] = y0_arr # history of f F = np.zeros((N, m), dtype=float) F[0] = f(t_values[0], Y[0]) # ABM weights: b for predictor, c for corrector b, c = self._abm_weights(alpha_val, N) inv_g1 = 1.0 / gamma(alpha_val + 1.0) inv_g2 = 1.0 / gamma(alpha_val + 2.0) # FFT threshold: use FFT when history length exceeds this value fft_threshold = kwargs.get('fft_threshold', 64) # Number of corrector iterations for implicit refinement max_corrector_iter = kwargs.get('max_corrector_iter', 3) verbose = kwargs.get('verbose', False) for n in range(0, N - 1): # Compute history sum for predictor: S_p = sum_{j=0..n} b_{n-j} f(t_j, y_j) if n + 1 < fft_threshold: Sp = (b[:n+1][::-1, None] * F[:n+1]).sum(axis=0) else: Sp = _fast_history_sum(b[:n+1], F[:n+1], reverse=True, verbose=verbose) # Predictor step: y^{p}_{n+1} = y_0 + h^α/Γ(α+1) * S_p y_pred = y0_arr + (h**alpha_val) * inv_g1 * Sp # Compute history sum for corrector: S_c = sum_{j=0..n} c_{n-j} f(t_j, y_j) # Note: This uses same history F[:n+1] but different weights (c vs b) if n + 1 < fft_threshold: Sc = (c[:n+1][::-1, None] * F[:n+1]).sum(axis=0) else: Sc = _fast_history_sum(c[:n+1], F[:n+1], reverse=True, verbose=verbose) # Corrector step with iterative refinement # Formula: y_{n+1} = y_0 + h^α/Γ(α+2) * (f(t_{n+1}, y_{n+1}) + S_c) # This is implicit in y_{n+1}, so we iterate to convergence start_time_corr = time.perf_counter() y_corr = y_pred.copy() # Initial guess from predictor final_iter_count = 0 for iter_count in range(max_corrector_iter): final_iter_count = iter_count + 1 y_old = y_corr.copy() # Evaluate f at current corrector estimate f_corr = f(t_values[n+1], y_corr) # Update corrector estimate y_corr = y0_arr + (h**alpha_val) * inv_g2 * (f_corr + Sc) # Check convergence if np.allclose(y_corr, y_old, rtol=self.tol, atol=self.tol): break end_time_corr = time.perf_counter() if verbose: print(f"[TIMING] Corrector loop (n={n}, iters={final_iter_count}): {end_time_corr - start_time_corr:.6f}s") # Store converged solution Y[n+1] = y_corr F[n+1] = f(t_values[n+1], y_corr) return t_values, Y
[docs] def _solve_adams_bashforth( self, f: Callable, t0: float, tf: float, y0: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: float, **kwargs, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve using Adams-Bashforth method. Args: f: Right-hand side function t0: Initial time tf: Final time y0: Initial condition alpha: Fractional order h: Step size **kwargs: Additional parameters Returns: Tuple of (t_values, y_values) """ # Convert to arrays if needed if np.isscalar(y0): y0 = np.array([y0]) # Time grid t_values = np.arange(t0, tf + h, h) N = len(t_values) # Solution array y_values = np.zeros((N, len(y0))) y_values[0] = y0 # Compute fractional derivative coefficients coeffs = self._compute_fractional_coefficients(alpha, N) # Main iteration loop for n in range(1, N): # Adams-Bashforth step y_values[n] = self._adams_bashforth_step( f, t_values, y_values, n, alpha, coeffs, h ) return t_values, y_values
[docs] def _solve_runge_kutta( self, f: Callable, t0: float, tf: float, y0: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: float, **kwargs, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve using fractional Runge-Kutta method. Args: f: Right-hand side function t0: Initial time tf: Final time y0: Initial condition alpha: Fractional order h: Step size **kwargs: Additional parameters Returns: Tuple of (t_values, y_values) """ # Convert to arrays if needed if np.isscalar(y0): y0 = np.array([y0]) # Time grid t_values = np.arange(t0, tf + h, h) N = len(t_values) # Solution array y_values = np.zeros((N, len(y0))) y_values[0] = y0 # Main iteration loop for n in range(1, N): # Fractional Runge-Kutta step y_values[n] = self._runge_kutta_step( f, t_values, y_values, n, alpha, h) return t_values, y_values
[docs] def _solve_euler( self, f: Callable, t0: float, tf: float, y0: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: float, **kwargs, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve using fractional Euler method. Args: f: Right-hand side function t0: Initial time tf: Final time y0: Initial condition alpha: Fractional order h: Step size **kwargs: Additional parameters Returns: Tuple of (t_values, y_values) """ # Convert to arrays if needed if np.isscalar(y0): y0 = np.array([y0]) # Time grid t_values = np.arange(t0, tf + h, h) N = len(t_values) # Solution array y_values = np.zeros((N, len(y0))) y_values[0] = y0 # Main iteration loop for n in range(1, N): # Fractional Euler step y_values[n] = self._euler_step( f, t_values, y_values, n, alpha, h) return t_values, y_values
[docs] def _euler_step( self, f: Callable, t_values: np.ndarray, y_values: np.ndarray, n: int, alpha: Union[float, FractionalOrder], h: float, ) -> np.ndarray: """Minimal fractional Euler update for 0<α≤1. y_n = y_{n-1} + h^α / Γ(α+1) * f(t_{n-1}, y_{n-1}) """ if isinstance(alpha, FractionalOrder): alpha_val = float(alpha.alpha) else: alpha_val = float(alpha) inv_gamma = 1.0 / gamma(alpha_val + 1.0) return y_values[n - 1] + (h ** alpha_val) * inv_gamma * f(t_values[n - 1], y_values[n - 1])
[docs] def _adams_bashforth_step( self, f: Callable, t_values: np.ndarray, y_values: np.ndarray, n: int, alpha: Union[float, FractionalOrder], coeffs: np.ndarray, h: float, ) -> np.ndarray: """Compute one fractional Adams-Bashforth-like explicit step. Uses available history with precomputed coefficients. For the first step, it falls back to Euler behavior. """ if isinstance(alpha, FractionalOrder): alpha_val = float(alpha.alpha) else: alpha_val = float(alpha) inv_gamma = 1.0 / gamma(alpha_val + 1.0) if n <= 1: return y_values[n - 1] + (h ** alpha_val) * inv_gamma * f(t_values[n - 1], y_values[n - 1]) # Weighted history of previous RHS evaluations rhs_hist = np.array( [f(t_values[k], y_values[k]) for k in range(n)], dtype=float, ) weights = coeffs[:n][::-1] history_term = np.sum(weights[:, None] * rhs_hist, axis=0) return y_values[n - 1] + (h ** alpha_val) * inv_gamma * history_term
[docs] def _runge_kutta_step( self, f: Callable, t_values: np.ndarray, y_values: np.ndarray, n: int, alpha: Union[float, FractionalOrder], h: float, ) -> np.ndarray: """Compute one fractional RK2-style step. Uses midpoint evaluation with fractional scaling factor. """ if isinstance(alpha, FractionalOrder): alpha_val = float(alpha.alpha) else: alpha_val = float(alpha) y_prev = y_values[n - 1] t_prev = t_values[n - 1] inv_gamma = 1.0 / gamma(alpha_val + 1.0) k1 = f(t_prev, y_prev) y_mid = y_prev + 0.5 * (h ** alpha_val) * inv_gamma * k1 t_mid = t_prev + 0.5 * h k2 = f(t_mid, y_mid) return y_prev + (h ** alpha_val) * inv_gamma * k2
[docs] def _compute_fractional_coefficients( self, alpha: Union[float, FractionalOrder], N: int ) -> np.ndarray: """ Compute fractional derivative coefficients. Args: alpha: Fractional order N: Number of time steps Returns: Array of coefficients """ if isinstance(alpha, FractionalOrder): alpha_val = alpha.alpha else: alpha_val = alpha coeffs = np.zeros(N) coeffs[0] = 1.0 for j in range(1, N): if self.derivative_type == "caputo": coeffs[j] = (j + 1) ** alpha_val - j**alpha_val elif self.derivative_type == "grunwald_letnikov": coeffs[j] = coeffs[j - 1] * (1 - (alpha_val + 1) / j) else: # Riemann-Liouville coeffs[j] = ( (-1) ** j * gamma(alpha_val + 1) / (gamma(j + 1) * gamma(alpha_val - j + 1)) ) return coeffs
def _abm_weights(self, alpha: float, N: int): # Predictor weights b_k = (k+1)^α - k^α k = np.arange(N, dtype=float) b = (k + 1.0)**alpha - k**alpha # Corrector weights c_k = Δ^2 (k)^{α+1} c = np.empty(N, dtype=float) c[0] = 1.0 if N > 1: kk = np.arange(1, N, dtype=float) c[1:] = (kk + 1.0)**(alpha + 1.0) - 2.0*kk**(alpha + 1.0) + (kk - 1.0)**(alpha + 1.0) return b, c
[docs] def solve_fractional_ode( f: Callable, t_span: Tuple[float, float], y0: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], derivative_type: str = "caputo", method: str = "predictor_corrector", adaptive: bool = False, h: Optional[float] = None, **kwargs, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve fractional ODE. """ if adaptive: raise NotImplementedError("The adaptive solver is currently disabled due to a critical implementation flaw. Please use the fixed-step solver (adaptive=False).") solver = FixedStepODESolver(derivative_type, method, adaptive=False) return solver.solve(f, t_span, y0, alpha, h, **kwargs)
# Backwards-compatibility alias for tests and external users AdaptiveFractionalODESolver = None
[docs] def solve_fractional_system( f: Callable, t_span: Tuple[float, float], y0: np.ndarray, alpha: Union[float, np.ndarray], derivative_type: str = "caputo", method: str = "predictor_corrector", **kwargs, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve system of fractional ODEs. Args: f: Right-hand side function f(t, y) t_span: Time interval (t0, tf) y0: Initial conditions alpha: Fractional order (scalar only). Per-component orders are not implemented. derivative_type: Type of fractional derivative method: Numerical method **kwargs: Additional solver parameters Returns: Tuple of (t_values, y_values) """ y0_arr = np.asarray(y0, dtype=float) if y0_arr.ndim != 1: raise ValueError( f"solve_fractional_system expects y0 to be a 1D array of shape (d,), got shape {y0_arr.shape}" ) if isinstance(alpha, np.ndarray): if alpha.size != 1: raise NotImplementedError( "Per-component fractional orders (alpha array with length != 1) are not implemented. " "Pass a scalar alpha for all components, or implement a multi-order system solver." ) alpha = float(alpha.reshape(-1)[0]) elif isinstance(alpha, (list, tuple)): if len(alpha) != 1: raise NotImplementedError( "Per-component fractional orders (alpha sequence with length != 1) are not implemented. " "Pass a scalar alpha for all components, or implement a multi-order system solver." ) alpha = alpha[0] # Single scalar order: delegate to scalar-path solver (same equation for each component). return solve_fractional_ode( f, t_span, y0_arr, alpha, derivative_type, method, **kwargs, )