Source code for hpfracc.algorithms.integral_methods

"""
Fractional Integral Methods

**Legacy / alternate API:** FFT- and method-switching integral classes used by
existing tests and examples. For the **canonical** integral hierarchy tied to
``FractionalOrder`` and ``core`` (quad-based ``__call__`` / ``compute``), use
``hpfracc.core.integrals`` (``RiemannLiouvilleIntegral``, ``CaputoIntegral``, …).
The **direct** RL path in this module calls that canonical implementation for
``α ≥ 1e-4``; for smaller ``α`` a legacy discrete sum avoids quad blow-up on short grids.
:class:`WeylIntegral` here follows the same rule: **core** quad Weyl for normal ``α``,
legacy discrete window for ``α < 1e-4``.

**``method`` and ``fft_threshold`` (RL / Caputo):** With ``method="auto"``, arrays
shorter than ``fft_threshold`` (default ``1000`` on :class:`RiemannLiouvilleIntegral`)
use the direct path; longer grids use FFT. For ``α ≥ 1e-4``, direct mode calls
``hpfracc.core.integrals`` (quad on each prefix of ``t``). If FFT accumulation looks
unstable for your grid length or ``α``, pass ``method="direct"`` or use
``core.integrals`` directly.

**``method="adaptive"``:** FFT and direct paths are **different discretizations**;
cross-check uses a **scale-aware** relative L2 residual. ``UserWarning`` signals
moderate disagreement (direct is returned); ``RuntimeWarning`` is reserved for
non-finite FFT output or **severe** disagreement—see module constants
``_ADAPTIVE_AGREE_RTOL`` and ``_ADAPTIVE_SEVERE_RTOL``.

This module provides high-performance implementations of fractional integrals
including Riemann-Liouville and Caputo integrals with optimized algorithms.
"""

import numpy as np
from typing import Union, Optional, Callable
import warnings

from scipy.signal import fftconvolve

from ..core.definitions import FractionalOrder
from ..special import gamma

# Adaptive RL: FFT vs direct are different discretizations; these are heuristics only.
_ADAPTIVE_AGREE_RTOL = 5e-2  # accept FFT if L2-relative discrepancy vs direct is below this
_ADAPTIVE_SEVERE_RTOL = 5e-1  # at or above: treat as numerical alarm (RuntimeWarning)


[docs] def _adaptive_fft_direct_relative_l2(a: np.ndarray, b: np.ndarray) -> float: """``||a-b||_2 / max(||b||_2, eps)`` for scale-aware comparison.""" a = np.asarray(a, dtype=np.float64) b = np.asarray(b, dtype=np.float64) scale = max(float(np.linalg.norm(b)), np.finfo(np.float64).eps * max(b.size, 1)) return float(np.linalg.norm(a - b) / scale)
[docs] class RiemannLiouvilleIntegral: """ Riemann-Liouville fractional integral of order α. The Riemann-Liouville fractional integral of order α > 0 is defined as: I^α f(t) = (1/Γ(α)) ∫₀ᵗ (t-τ)^(α-1) f(τ) dτ where Γ(α) is the gamma function. Features: - Optimized FFT-based computation for large arrays - Direct method for small arrays with high accuracy - Memory-efficient algorithms - Support for both callable and array inputs - Error estimation and convergence analysis **Thresholds:** ``method="auto"`` chooses FFT when ``len(t) >= fft_threshold`` (default ``1000``); otherwise the direct path is used. For ``α ≥ 1e-4``, direct computation is delegated to :class:`hpfracc.core.integrals.RiemannLiouvilleIntegral`. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], method: str = "auto", optimize_memory: bool = True, use_jax: bool = False ): """ Initialize Riemann-Liouville integral calculator. Args: alpha: Fractional order (must be > 0) method: Computation method ("auto", "fft", "direct", "adaptive") optimize_memory: Use memory optimization techniques use_jax: Use JAX acceleration if available """ if isinstance(alpha, FractionalOrder): alpha = alpha.alpha if alpha < 0: raise ValueError("Fractional order must be non-negative") self.alpha = alpha self.method = method.lower() self.optimize_memory = optimize_memory self.use_jax = use_jax # Validate method valid_methods = ["auto", "fft", "direct", "adaptive"] if self.method not in valid_methods: raise ValueError(f"Method must be one of {valid_methods}") # Precompute gamma value self.gamma_alpha = gamma(alpha) # Set method thresholds self.fft_threshold = 1000 # Use FFT for arrays larger than this
[docs] def compute( self, f: Union[Callable, np.ndarray], t: np.ndarray, h: Optional[float] = None, method: Optional[str] = None ) -> np.ndarray: """ Compute the Riemann-Liouville fractional integral. Args: f: Function to integrate (callable) or function values (array) t: Time points where integral is evaluated h: Step size (if None, computed from t) method: Override the default method Returns: Array of integral values at each time point """ if method is None: method = self.method # Handle α = 0: identity integral if self.alpha == 0: # Accept length >= 1 if callable(f): return np.array([f(ti) for ti in t]) return np.asarray(f) # Handle empty and short inputs gracefully if len(t) == 0: return np.array([]) # Determine step size if h is None: if len(t) >= 2: h = t[1] - t[0] else: # For a single point, any positive step works; result will be zero h = 1.0 # Convert callable to array if needed if callable(f): f_array = np.array([f(ti) for ti in t]) else: f_array = np.asarray(f) if f_array.shape != t.shape: raise ValueError("Function values must match time array shape") # Choose computation method if method == "auto": method = self._select_optimal_method(len(t)) # Compute integral using selected method if method == "fft": return self._compute_fft(f_array, t, h) elif method == "direct": return self._compute_direct(f_array, t, h) elif method == "adaptive": return self._compute_adaptive(f_array, t, h) else: raise ValueError(f"Unknown method: {method}")
[docs] def _select_optimal_method(self, n_points: int) -> str: """Select optimal computation method based on array size.""" if n_points >= self.fft_threshold: return "fft" else: return "direct"
[docs] def _compute_fft( self, f: np.ndarray, t: np.ndarray, h: float) -> np.ndarray: """ RL fractional integral via **linear** convolution on the uniform grid. Matches the legacy discrete sum in :meth:`_compute_direct_discrete_legacy` (kernel weight ``((i-j)h)^{\\alpha-1} / \\Gamma(\\alpha)``, then ``\\times h``). Uses ``scipy.signal.fftconvolve`` so the operation is **causal** (first ``n`` samples of the full convolution), not periodic length-``n`` FFT convolution (which was incorrect and disagreed strongly with quadrature). """ n = len(t) if n == 0: return np.array([], dtype=np.float64) kernel = np.zeros(n, dtype=np.float64) for i in range(1, n): kernel[i] = (i * h) ** (self.alpha - 1) kernel /= self.gamma_alpha f64 = np.asarray(f, dtype=np.float64) conv = fftconvolve(f64, kernel, mode="full") out = np.asarray(conv[:n], dtype=np.float64) * h # For nonnegative samples, RL fractional integral is nonnegative; clip # only sub-roundoff negatives from floating-point accumulation. if np.min(f64) >= 0.0: out = np.maximum(out, 0.0) return out
[docs] def _compute_direct_discrete_legacy( self, f: np.ndarray, t: np.ndarray, h: float) -> np.ndarray: """Original O(N²) uniform-grid RL sum (used only for tiny α where quad is ill-conditioned).""" n = len(t) result = np.zeros(n) for i in range(n): integral = 0.0 for j in range(i + 1): if j == i: weight = 0 else: weight = ((i - j) * h) ** (self.alpha - 1) integral += weight * f[j] result[i] = (integral * h) / self.gamma_alpha return result
[docs] def _compute_direct( self, f: np.ndarray, t: np.ndarray, h: float) -> np.ndarray: """ RL integral on the sample grid via the canonical quad-based implementation in ``hpfracc.core.integrals`` (same definition as the rest of the library), except for extremely small α where the kernel is near-singular on a short grid. """ if self.alpha < 1e-4: return self._compute_direct_discrete_legacy(f, t, h) from ..core.definitions import FractionalOrder from ..core.integrals import RiemannLiouvilleIntegral as CoreRLI core = CoreRLI(FractionalOrder(float(self.alpha))) return np.asarray(core(f, t), dtype=float)
[docs] def _compute_adaptive( self, f: np.ndarray, t: np.ndarray, h: float) -> np.ndarray: """ Compare FFT to the direct reference on short grids (``len(t) < fft_threshold``). FFT and quad/discrete direct are **not** required to match to tight ``allclose`` tolerances; a scale-aware L2 relative residual is used. ``RuntimeWarning`` is used only for non-finite FFT output or **severe** disagreement; moderate mismatch emits ``UserWarning`` and still returns the direct result. """ result_fft = self._compute_fft(f, t, h) if len(t) >= self.fft_threshold: return result_fft result_direct = self._compute_direct(f, t, h) fft_finite = np.all(np.isfinite(result_fft)) direct_finite = np.all(np.isfinite(result_direct)) if not fft_finite: warnings.warn( "FFT RL fractional integral returned non-finite values; " "using direct reference.", RuntimeWarning, stacklevel=2, ) return result_direct if not direct_finite: warnings.warn( "Direct RL fractional integral returned non-finite values; " "check quadrature, α, or grid. Returning direct array as-is.", RuntimeWarning, stacklevel=2, ) return result_direct rel = _adaptive_fft_direct_relative_l2(result_fft, result_direct) if rel <= _ADAPTIVE_AGREE_RTOL: return result_fft if rel >= _ADAPTIVE_SEVERE_RTOL: warnings.warn( f"FFT vs direct RL integral disagree severely " f"(relative L2 discrepancy {rel:.3g} >= {_ADAPTIVE_SEVERE_RTOL}); " "using direct reference.", RuntimeWarning, stacklevel=2, ) else: warnings.warn( f"FFT vs direct RL integral differ (relative L2 discrepancy {rel:.3g}; " f"agree if <={_ADAPTIVE_AGREE_RTOL}, severe if >={_ADAPTIVE_SEVERE_RTOL}). " "Using direct reference. For publication-grade values prefer " "method='direct' or hpfracc.core.integrals.", UserWarning, stacklevel=2, ) return result_direct
[docs] class CaputoIntegral: """ Caputo fractional integral of order α. For α > 0, the Caputo fractional integral equals the Riemann-Liouville integral: I^α f(t) = (1/Γ(α)) ∫₀ᵗ (t-τ)^(α-1) f(τ) dτ This class provides a consistent interface while reusing the RL implementation. ``method`` and ``fft_threshold`` behaviour match :class:`RiemannLiouvilleIntegral`. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], method: str = "auto", optimize_memory: bool = True, use_jax: bool = False ): """ Initialize Caputo integral calculator. Args: alpha: Fractional order (must be > 0) method: Computation method ("auto", "fft", "direct", "adaptive") optimize_memory: Use memory optimization techniques use_jax: Use JAX acceleration if available """ if isinstance(alpha, FractionalOrder): alpha = alpha.alpha if alpha < 0: raise ValueError("Fractional order must be non-negative") # Store alpha and optionally create RL delegate self.alpha = alpha self.method = method self.optimize_memory = optimize_memory self.use_jax = use_jax self.rl_integral = None if alpha == 0 else RiemannLiouvilleIntegral( alpha, method, optimize_memory, use_jax )
[docs] def compute( self, f: Union[Callable, np.ndarray], t: np.ndarray, h: Optional[float] = None, method: Optional[str] = None ) -> np.ndarray: """ Compute the Caputo fractional integral. Args: f: Function to integrate (callable) or function values (array) t: Time points where integral is evaluated h: Step size (if None, computed from t) method: Override the default method Returns: Array of integral values at each time point """ # α = 0: identity if self.alpha == 0: if callable(f): return np.array([f(ti) for ti in t]) return np.asarray(f) # Delegate to RL implementation return self.rl_integral.compute(f, t, h, method)
[docs] class WeylIntegral: """ Weyl fractional integral of order α (grid API). For ``α ≥ 1e-4``, delegates to the canonical quad-based :class:`hpfracc.core.integrals.WeylIntegral`. For smaller ``α`` (near-identity kernel), keeps a legacy discrete RL-style sum on the sample grid for stability. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], method: str = "auto", ): if isinstance(alpha, FractionalOrder): alpha = alpha.alpha if alpha < 0: raise ValueError("Fractional order must be non-negative") self.alpha = alpha self.method = method
[docs] def _compute_weyl_discrete_legacy( self, f: Union[Callable, np.ndarray], t: np.ndarray, h: float, ) -> np.ndarray: """Original windowed RL-style sum on ``t`` (used only for tiny α).""" if callable(f): f_array = np.array([f(ti) for ti in t]) else: f_array = np.asarray(f) n = len(t) result = np.zeros(n) from ..special import gamma as _gamma gamma_alpha = _gamma(self.alpha) for i in range(n): integ = 0.0 for j in range(i + 1): if j == i: weight = 0.0 else: weight = ((i - j) * h) ** (self.alpha - 1) integ += weight * f_array[j] result[i] = (integ * h) / gamma_alpha return result
[docs] def compute( self, f: Union[Callable, np.ndarray], t: np.ndarray, h: Optional[float] = None, ) -> np.ndarray: if self.alpha == 0: if callable(f): return np.array([f(ti) for ti in t]) return np.asarray(f) if len(t) == 0: return np.array([]) if len(t) < 2: if callable(f): return np.array([f(ti) for ti in t]) return np.asarray(f) if h is None and len(t) >= 2: h = float(t[1] - t[0]) elif h is None: h = 1.0 if self.alpha < 1e-4: return self._compute_weyl_discrete_legacy(f, t, h) from ..core.definitions import FractionalOrder from ..core.integrals import WeylIntegral as CoreWeyl core = CoreWeyl(FractionalOrder(float(self.alpha))) return np.asarray(core(f, t), dtype=float)
# Convenience functions for easy access
[docs] def riemann_liouville_integral( f: Union[Callable, np.ndarray], t: np.ndarray, alpha: Union[float, FractionalOrder], h: Optional[float] = None, method: str = "auto" ) -> np.ndarray: """ Compute Riemann-Liouville fractional integral. Args: f: Function to integrate or function values t: Time points alpha: Fractional order h: Step size method: Computation method Returns: Integral values """ calculator = RiemannLiouvilleIntegral(alpha, method) return calculator.compute(f, t, h)
[docs] def caputo_integral( f: Union[Callable, np.ndarray], t: np.ndarray, alpha: Union[float, FractionalOrder], h: Optional[float] = None, method: str = "auto" ) -> np.ndarray: """ Compute Caputo fractional integral. Args: f: Function to integrate or function values t: Time points alpha: Fractional order h: Step size method: Computation method Returns: Integral values """ calculator = CaputoIntegral(alpha, method) return calculator.compute(f, t, h)
# Optimized versions for high-performance applications
[docs] def optimized_riemann_liouville_integral( f: Union[Callable, np.ndarray], t: np.ndarray, alpha: Union[float, FractionalOrder], h: Optional[float] = None ) -> np.ndarray: """ Optimized Riemann-Liouville integral with automatic method selection. Args: f: Function to integrate or function values t: Time points alpha: Fractional order h: Step size Returns: Integral values """ calculator = RiemannLiouvilleIntegral( alpha, method="auto", optimize_memory=True) return calculator.compute(f, t, h)
[docs] def optimized_caputo_integral( f: Union[Callable, np.ndarray], t: np.ndarray, alpha: Union[float, FractionalOrder], h: Optional[float] = None ) -> np.ndarray: """ Optimized Caputo integral with automatic method selection. Args: f: Function to integrate or function values t: Time points alpha: Fractional order h: Step size Returns: Integral values """ calculator = CaputoIntegral(alpha, method="auto", optimize_memory=True) return calculator.compute(f, t, h)