"""
Fractional Ordinary Differential Equation Solvers
This module provides comprehensive solvers for fractional ODEs including
various numerical methods, adaptive step size control, and error estimation.
Performance Note:
- Uses FFT-based convolution for O(N log N) history summation instead of O(N²)
- Intelligent backend selection for optimal performance (v2.1.0)
"""
import numpy as np
import time
from typing import Union, Optional, Tuple, Callable
from scipy import fft
from ..core.definitions import FractionalOrder
# Use adapter system for gamma function instead of direct imports
[docs]
def _get_gamma_function():
"""Get gamma function through adapter system."""
try:
from ..special.gamma_beta import gamma_function as gamma
return gamma
except Exception:
# Fallback to scipy
from scipy.special import gamma
return gamma
gamma = _get_gamma_function()
# Initialize intelligent backend selector for ODE solvers
_intelligent_selector = None
_use_intelligent_backend = True
[docs]
def _get_intelligent_selector():
"""Get intelligent backend selector instance."""
global _intelligent_selector, _use_intelligent_backend
if not _use_intelligent_backend:
return None
if _intelligent_selector is None:
try:
from ..ml.intelligent_backend_selector import IntelligentBackendSelector
_intelligent_selector = IntelligentBackendSelector(enable_learning=True)
except ImportError:
_use_intelligent_backend = False
_intelligent_selector = None
return _intelligent_selector
[docs]
def _select_fft_backend(data_size: int) -> str:
"""
Select optimal FFT backend based on data size.
Args:
data_size: Number of elements to process
Returns:
Backend name: "numpy", "jax", or "scipy"
"""
selector = _get_intelligent_selector()
if selector is not None:
try:
from ..ml.intelligent_backend_selector import WorkloadCharacteristics
from ..ml.backends import BackendType
workload = WorkloadCharacteristics(
operation_type="fft",
data_size=data_size,
data_shape=(data_size,),
is_iterative=True
)
backend_type = selector.select_backend(workload)
# Map to FFT-specific backends
if backend_type == BackendType.JAX:
return "jax"
elif backend_type == BackendType.TORCH:
return "numpy" # PyTorch FFT is not ideal for this use case
else:
return "numpy"
except Exception:
pass # Fall through to default
# Default selection based on size
if data_size < 1000:
return "numpy" # Small data: NumPy is faster
else:
return "scipy" # Large data: SciPy is optimized
[docs]
def _fft_convolution(coeffs: np.ndarray, values: np.ndarray, axis: int = 0) -> np.ndarray:
"""
Fast convolution using FFT for O(N log N) performance with intelligent backend selection.
This replaces the O(N²) direct summation loop for history terms in fractional ODEs.
Now uses intelligent backend selection to choose optimal FFT implementation.
Args:
coeffs: Coefficient array (1D)
values: Value array (can be 1D or 2D with axis specifying which dimension to convolve)
axis: Axis along which to perform convolution (default 0)
Returns:
Convolution result (same shape as values)
Mathematical basis:
conv(C, Y) = ifft(fft(C) * fft(Y))
Performance:
- Direct summation: O(N²)
- FFT-based: O(N log N)
- Backend selection: < 0.001 ms overhead
"""
N = coeffs.shape[0]
# Select optimal FFT backend based on data size
backend = _select_fft_backend(N * (values.shape[1] if values.ndim == 2 else 1))
# Try JAX FFT for large data (if available and selected)
if backend == "jax" and N > 1000:
try:
import jax.numpy as jnp
from jax.numpy import fft as jax_fft
if values.ndim == 1:
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
coeffs_jax = jnp.array(coeffs)
values_jax = jnp.array(values)
coeffs_fft = jax_fft.fft(coeffs_jax, n=size)
values_fft = jax_fft.fft(values_jax, n=size)
conv_result = jax_fft.ifft(coeffs_fft * values_fft).real[:N]
return np.array(conv_result)
elif values.ndim == 2 and axis == 0:
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
num_cols = values.shape[1]
coeffs_padded = jnp.zeros((size, num_cols))
coeffs_padded = coeffs_padded.at[:N, :].set(coeffs[:, np.newaxis])
values_padded = jnp.zeros((size, num_cols))
values_padded = values_padded.at[:N, :].set(values)
coeffs_fft = jax_fft.fft(coeffs_padded, axis=0)
values_fft = jax_fft.fft(values_padded, axis=0)
conv_result = jax_fft.ifft(coeffs_fft * values_fft, axis=0).real[:N, :]
return np.array(conv_result)
except (ImportError, Exception):
# Fall back to SciPy/NumPy
pass
# Default: Use SciPy FFT (works for all cases)
if values.ndim == 1:
# 1D case: simple convolution
M = values.shape[0]
if M != N:
raise ValueError(f"Coefficient and value array lengths must match: {N} vs {M}")
# Zero-pad to next power of 2 for optimal FFT performance
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
# Perform FFT-based convolution
coeffs_fft = fft.fft(coeffs, n=size)
values_fft = fft.fft(values, n=size)
conv_result = fft.ifft(coeffs_fft * values_fft).real[:N]
return conv_result
elif values.ndim == 2:
# 2D case: convolve along specified axis
if axis == 0:
M = values.shape[0]
if M != N:
raise ValueError(f"Coefficient and value array lengths must match: {N} vs {M}")
# Vectorized FFT for all columns at once (more efficient than loop)
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
num_cols = values.shape[1]
# Expand coeffs to match all columns: shape (size,) -> (size, num_cols)
coeffs_padded = np.zeros((size, num_cols))
coeffs_padded[:N, :] = coeffs[:, np.newaxis]
# Pad values: shape (N, num_cols) -> (size, num_cols)
values_padded = np.zeros((size, num_cols))
values_padded[:N, :] = values
# FFT along axis 0 for all columns simultaneously
coeffs_fft = fft.fft(coeffs_padded, axis=0)
values_fft = fft.fft(values_padded, axis=0)
# Element-wise multiplication and inverse FFT
conv_result = fft.ifft(coeffs_fft * values_fft, axis=0).real[:N, :]
return conv_result
else:
raise NotImplementedError(
"FFT convolution is currently only implemented for axis=0 (time axis). "
"For multi-dimensional problems, transpose your data so time is the first axis, "
"or use direct convolution methods instead of FFT-based approach."
)
else:
raise ValueError(f"FFT convolution only supports 1D and 2D arrays, got {values.ndim}D")
[docs]
def _fast_history_sum(coeffs: np.ndarray, f_hist: np.ndarray, reverse: bool = True, verbose: bool = False) -> np.ndarray:
"""
Compute weighted sum of history using FFT-based convolution.
This is the key optimization for fractional ODE solvers, replacing:
sum_{j=0}^{n} coeffs[n-j] * f_hist[j]
with an O(N log N) FFT-based approach instead of O(N²) direct summation.
Args:
coeffs: Coefficient array of length N
f_hist: History array of shape (N,) or (N, m) where m is state dimension
reverse: If True, apply coefficients in reverse order (typical for convolution)
Returns:
Weighted sum result (scalar or array of length m)
"""
start_time = time.perf_counter()
if reverse:
coeffs = coeffs[::-1]
if f_hist.ndim == 1:
# For 1D, use direct convolution and return the last element
conv_full = _fft_convolution(coeffs, f_hist)
result = conv_full[-1]
else:
# For 2D (N, m), convolve and return the last row
conv_full = _fft_convolution(coeffs, f_hist, axis=0)
result = conv_full[-1, :]
end_time = time.perf_counter()
if verbose and coeffs.shape[0] > 64: # Only print for FFT cases
print(f"[TIMING] _fast_history_sum (N={coeffs.shape[0]}): {end_time - start_time:.6f}s")
return result
[docs]
class FixedStepODESolver:
"""
Base class for fixed-step fractional ODE solvers.
Provides common functionality for solving fractional ordinary
differential equations of the form:
D^α y(t) = f(t, y(t))
where D^α is a fractional derivative operator.
"""
[docs]
def __init__(
self,
derivative_type: str = "caputo",
method: str = "predictor_corrector",
adaptive: bool = True,
tol: float = 1e-6,
max_iter: int = 1000,
*,
fractional_order: Optional[Union[float, FractionalOrder]] = None,
rtol: Optional[float] = None,
atol: Optional[float] = None,
# compatibility/extended params
order: int = 1,
min_h: Optional[float] = None,
max_h: Optional[float] = None,
min_step: Optional[float] = None,
max_step: Optional[float] = None,
):
"""
Initialize fractional ODE solver.
Args:
derivative_type: Type of fractional derivative ("caputo", "riemann_liouville", "grunwald_letnikov")
method: Numerical method ("predictor_corrector", "adams_bashforth", "runge_kutta")
adaptive: Use adaptive step size control
tol: Tolerance for convergence
max_iter: Maximum number of iterations
"""
self.derivative_type = derivative_type.lower()
self.method = method.lower()
self.adaptive = adaptive
self.tol = tol
self.max_iter = max_iter
# Accept optional fractional_order for compatibility; stored as attribute only
self.fractional_order = fractional_order
# Order compatibility (for predictor-corrector family)
self.order = int(order)
# Accept rtol/atol but map to tol if provided (basic solver uses single tol)
if rtol is not None:
self.tol = min(self.tol, float(rtol))
if atol is not None:
self.tol = min(self.tol, float(atol))
# Step-size preferences (used by adaptive solvers)
self.min_h = float(min_h) if min_h is not None else (float(min_step) if min_step is not None else None)
self.max_h = float(max_h) if max_h is not None else (float(max_step) if max_step is not None else None)
# Validate derivative type
valid_derivatives = [
"caputo", "riemann_liouville", "grunwald_letnikov"]
if self.derivative_type not in valid_derivatives:
raise ValueError(
f"Derivative type must be one of {valid_derivatives}")
# Validate method
valid_methods = [
"predictor_corrector",
"adams_bashforth",
"runge_kutta",
"euler",
]
if self.method not in valid_methods:
raise ValueError(f"Method must be one of {valid_methods}")
[docs]
def _validate_alpha(self, alpha: Union[float, FractionalOrder]):
"""Validate the fractional order."""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
if not 0 < alpha_val <= 2:
raise ValueError(f"Fractional order alpha must be in (0, 2], but got {alpha_val}")
if self.method == "predictor_corrector" and not (0.0 < alpha_val <= 1.0):
raise ValueError(f"The predictor-corrector method currently only supports orders in (0, 1], but got {alpha_val}")
[docs]
def solve(
self,
f: Callable,
t_span: Tuple[float, float],
y0: Union[float, np.ndarray],
alpha: Union[float, FractionalOrder],
h: Optional[float] = None,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Solve fractional ODE.
Args:
f: Right-hand side function f(t, y)
t_span: Time interval (t0, tf)
y0: Initial condition(s)
alpha: Fractional order
h: Step size (None for adaptive)
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, y_values)
"""
# Validate alpha range
self._validate_alpha(alpha)
t0, tf = t_span
if h is None:
h = (tf - t0) / 100 # Default step size
if self.method == "predictor_corrector":
return self._solve_predictor_corrector(
f, t0, tf, y0, alpha, h, **kwargs)
elif self.method == "adams_bashforth":
return self._solve_adams_bashforth(
f, t0, tf, y0, alpha, h, **kwargs)
elif self.method == "runge_kutta":
return self._solve_runge_kutta(f, t0, tf, y0, alpha, h, **kwargs)
elif self.method == "euler":
return self._solve_euler(f, t0, tf, y0, alpha, h, **kwargs)
else:
raise ValueError(f"Unknown method: {self.method}")
[docs]
def _solve_predictor_corrector(
self, f, t0, tf, y0, alpha, h, **kwargs
):
"""
Solve using Adams-Bashforth-Moulton predictor-corrector for Caputo fractional ODEs.
Based on the Volterra integral formulation:
y(t) = y_0 + (1/Γ(α)) ∫_0^t (t-τ)^(α-1) f(τ, y(τ)) dτ
Reference: Diethelm, K. (2010). The Analysis of Fractional Differential Equations.
"""
# Only Caputo is supported in this scheme
if self.derivative_type != "caputo":
raise NotImplementedError(
"Predictor-corrector method is currently implemented for Caputo derivative only. "
"For Riemann-Liouville or other derivative types, use the fixed-step Euler method "
"or convert your problem to use Caputo formulation."
)
alpha_val = float(alpha.alpha) if hasattr(alpha, "alpha") else float(alpha)
# grid
N = int(np.ceil((tf - t0) / h)) + 1
t_values = np.linspace(t0, tf, N, dtype=float)
y0_arr = np.atleast_1d(np.array(y0, dtype=float))
m = y0_arr.size
Y = np.zeros((N, m), dtype=float)
Y[0] = y0_arr
# history of f
F = np.zeros((N, m), dtype=float)
F[0] = f(t_values[0], Y[0])
# ABM weights: b for predictor, c for corrector
b, c = self._abm_weights(alpha_val, N)
inv_g1 = 1.0 / gamma(alpha_val + 1.0)
inv_g2 = 1.0 / gamma(alpha_val + 2.0)
# FFT threshold: use FFT when history length exceeds this value
fft_threshold = kwargs.get('fft_threshold', 64)
# Number of corrector iterations for implicit refinement
max_corrector_iter = kwargs.get('max_corrector_iter', 3)
verbose = kwargs.get('verbose', False)
for n in range(0, N - 1):
# Compute history sum for predictor: S_p = sum_{j=0..n} b_{n-j} f(t_j, y_j)
if n + 1 < fft_threshold:
Sp = (b[:n+1][::-1, None] * F[:n+1]).sum(axis=0)
else:
Sp = _fast_history_sum(b[:n+1], F[:n+1], reverse=True, verbose=verbose)
# Predictor step: y^{p}_{n+1} = y_0 + h^α/Γ(α+1) * S_p
y_pred = y0_arr + (h**alpha_val) * inv_g1 * Sp
# Compute history sum for corrector: S_c = sum_{j=0..n} c_{n-j} f(t_j, y_j)
# Note: This uses same history F[:n+1] but different weights (c vs b)
if n + 1 < fft_threshold:
Sc = (c[:n+1][::-1, None] * F[:n+1]).sum(axis=0)
else:
Sc = _fast_history_sum(c[:n+1], F[:n+1], reverse=True, verbose=verbose)
# Corrector step with iterative refinement
# Formula: y_{n+1} = y_0 + h^α/Γ(α+2) * (f(t_{n+1}, y_{n+1}) + S_c)
# This is implicit in y_{n+1}, so we iterate to convergence
start_time_corr = time.perf_counter()
y_corr = y_pred.copy() # Initial guess from predictor
final_iter_count = 0
for iter_count in range(max_corrector_iter):
final_iter_count = iter_count + 1
y_old = y_corr.copy()
# Evaluate f at current corrector estimate
f_corr = f(t_values[n+1], y_corr)
# Update corrector estimate
y_corr = y0_arr + (h**alpha_val) * inv_g2 * (f_corr + Sc)
# Check convergence
if np.allclose(y_corr, y_old, rtol=self.tol, atol=self.tol):
break
end_time_corr = time.perf_counter()
if verbose:
print(f"[TIMING] Corrector loop (n={n}, iters={final_iter_count}): {end_time_corr - start_time_corr:.6f}s")
# Store converged solution
Y[n+1] = y_corr
F[n+1] = f(t_values[n+1], y_corr)
return t_values, Y
[docs]
def _solve_adams_bashforth(
self,
f: Callable,
t0: float,
tf: float,
y0: Union[float, np.ndarray],
alpha: Union[float, FractionalOrder],
h: float,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Solve using Adams-Bashforth method.
Args:
f: Right-hand side function
t0: Initial time
tf: Final time
y0: Initial condition
alpha: Fractional order
h: Step size
**kwargs: Additional parameters
Returns:
Tuple of (t_values, y_values)
"""
# Convert to arrays if needed
if np.isscalar(y0):
y0 = np.array([y0])
# Time grid
t_values = np.arange(t0, tf + h, h)
N = len(t_values)
# Solution array
y_values = np.zeros((N, len(y0)))
y_values[0] = y0
# Compute fractional derivative coefficients
coeffs = self._compute_fractional_coefficients(alpha, N)
# Main iteration loop
for n in range(1, N):
# Adams-Bashforth step
y_values[n] = self._adams_bashforth_step(
f, t_values, y_values, n, alpha, coeffs, h
)
return t_values, y_values
[docs]
def _solve_runge_kutta(
self,
f: Callable,
t0: float,
tf: float,
y0: Union[float, np.ndarray],
alpha: Union[float, FractionalOrder],
h: float,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Solve using fractional Runge-Kutta method.
Args:
f: Right-hand side function
t0: Initial time
tf: Final time
y0: Initial condition
alpha: Fractional order
h: Step size
**kwargs: Additional parameters
Returns:
Tuple of (t_values, y_values)
"""
# Convert to arrays if needed
if np.isscalar(y0):
y0 = np.array([y0])
# Time grid
t_values = np.arange(t0, tf + h, h)
N = len(t_values)
# Solution array
y_values = np.zeros((N, len(y0)))
y_values[0] = y0
# Main iteration loop
for n in range(1, N):
# Fractional Runge-Kutta step
y_values[n] = self._runge_kutta_step(
f, t_values, y_values, n, alpha, h)
return t_values, y_values
[docs]
def _solve_euler(
self,
f: Callable,
t0: float,
tf: float,
y0: Union[float, np.ndarray],
alpha: Union[float, FractionalOrder],
h: float,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Solve using fractional Euler method.
Args:
f: Right-hand side function
t0: Initial time
tf: Final time
y0: Initial condition
alpha: Fractional order
h: Step size
**kwargs: Additional parameters
Returns:
Tuple of (t_values, y_values)
"""
# Convert to arrays if needed
if np.isscalar(y0):
y0 = np.array([y0])
# Time grid
t_values = np.arange(t0, tf + h, h)
N = len(t_values)
# Solution array
y_values = np.zeros((N, len(y0)))
y_values[0] = y0
# Main iteration loop
for n in range(1, N):
# Fractional Euler step
y_values[n] = self._euler_step(
f, t_values, y_values, n, alpha, h)
return t_values, y_values
[docs]
def _euler_step(
self,
f: Callable,
t_values: np.ndarray,
y_values: np.ndarray,
n: int,
alpha: Union[float, FractionalOrder],
h: float,
) -> np.ndarray:
"""Minimal fractional Euler update for 0<α≤1.
y_n = y_{n-1} + h^α / Γ(α+1) * f(t_{n-1}, y_{n-1})
"""
if isinstance(alpha, FractionalOrder):
alpha_val = float(alpha.alpha)
else:
alpha_val = float(alpha)
inv_gamma = 1.0 / gamma(alpha_val + 1.0)
return y_values[n - 1] + (h ** alpha_val) * inv_gamma * f(t_values[n - 1], y_values[n - 1])
[docs]
def _compute_fractional_coefficients(
self, alpha: Union[float, FractionalOrder], N: int
) -> np.ndarray:
"""
Compute fractional derivative coefficients.
Args:
alpha: Fractional order
N: Number of time steps
Returns:
Array of coefficients
"""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
coeffs = np.zeros(N)
coeffs[0] = 1.0
for j in range(1, N):
if self.derivative_type == "caputo":
coeffs[j] = (j + 1) ** alpha_val - j**alpha_val
elif self.derivative_type == "grunwald_letnikov":
coeffs[j] = coeffs[j - 1] * (1 - (alpha_val + 1) / j)
else: # Riemann-Liouville
coeffs[j] = (
(-1) ** j
* gamma(alpha_val + 1)
/ (gamma(j + 1) * gamma(alpha_val - j + 1))
)
return coeffs
def _abm_weights(self, alpha: float, N: int):
# Predictor weights b_k = (k+1)^α - k^α
k = np.arange(N, dtype=float)
b = (k + 1.0)**alpha - k**alpha
# Corrector weights c_k = Δ^2 (k)^{α+1}
c = np.empty(N, dtype=float)
c[0] = 1.0
if N > 1:
kk = np.arange(1, N, dtype=float)
c[1:] = (kk + 1.0)**(alpha + 1.0) - 2.0*kk**(alpha + 1.0) + (kk - 1.0)**(alpha + 1.0)
return b, c
[docs]
def solve_fractional_ode(
f: Callable,
t_span: Tuple[float, float],
y0: Union[float, np.ndarray],
alpha: Union[float, FractionalOrder],
derivative_type: str = "caputo",
method: str = "predictor_corrector",
adaptive: bool = False,
h: Optional[float] = None,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Solve fractional ODE.
"""
if adaptive:
raise NotImplementedError("The adaptive solver is currently disabled due to a critical implementation flaw. Please use the fixed-step solver (adaptive=False).")
solver = FixedStepODESolver(derivative_type, method, adaptive=False)
return solver.solve(f, t_span, y0, alpha, h, **kwargs)
# Backwards-compatibility alias for tests and external users
AdaptiveFractionalODESolver = None
[docs]
def solve_fractional_system(
f: Callable,
t_span: Tuple[float, float],
y0: np.ndarray,
alpha: Union[float, np.ndarray],
derivative_type: str = "caputo",
method: str = "predictor_corrector",
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Solve system of fractional ODEs.
Args:
f: Right-hand side function f(t, y)
t_span: Time interval (t0, tf)
y0: Initial conditions
alpha: Fractional orders (scalar or array)
derivative_type: Type of fractional derivative
method: Numerical method
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, y_values)
"""
# For now, use the same solver for systems
# In practice, you might want specialized system solvers
return solve_fractional_ode(
f,
t_span,
y0,
alpha,
derivative_type,
method,
**kwargs)