Source code for hpfracc.algorithms.derivatives

"""
Unified fractional derivative engines (canonical implementation path).

`RiemannLiouville`, `Caputo`, and `GrunwaldLetnikov` here are the **single**
dispatching implementations used across the library. Higher-level wrappers in
`hpfracc.core.fractional_implementations` delegate to these classes; names such as
``OptimizedCaputo`` in `hpfracc.algorithms.optimized_methods` are backward-compatible
aliases of the same types. ``ParallelOptimized*`` in that module are the same engine
classes with a parallel-oriented name; ``compute_numerical`` is provided on
``UnifiedFractionalOperator`` so call sites match adapter-style APIs without duplicating
kernels.
"""

import os
import warnings
import numpy as np
from typing import Any, Union, Callable, Optional
from ..core.definitions import FractionalOrder
from .base import FractionalOperator
from .dispatch import BackendDispatcher
from .impls import numpy_backend, jax_backend, cuda_backend

_STRICT_NUMERICS = os.getenv("HPFRACC_STRICT_NUMERICS", "").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}


[docs] def _handle_backend_failure(method: str, backend: str, error: Exception) -> None: """Apply fail-fast policy or explicit warning for backend failures.""" message = ( f"{method} backend '{backend}' failed: {error}. " "Falling back to NumPy backend." ) warnings.warn(message, RuntimeWarning) if _STRICT_NUMERICS: raise RuntimeError( f"{method} backend '{backend}' failed with strict numerics enabled." ) from error
[docs] class UnifiedFractionalOperator(FractionalOperator): """ Base class for unified operators with backend dispatch. """
[docs] def __init__(self, order: Union[float, FractionalOrder], backend: str = "auto"): super().__init__(order) self.backend_request = backend
[docs] def compute_numerical( self, f_values: np.ndarray, x_values: np.ndarray, **kwargs: Any, ) -> np.ndarray: """ Array-in / array-out entry point; same backend path as :meth:`compute`. Matches the ``BaseFractionalDerivative.compute_numerical`` surface so parallel and engine aliases can be used interchangeably in tests and factory wiring without a second implementation. """ h = kwargs.pop("h", None) return self.compute(f_values, x_values, h=h)
[docs] def _dispatch(self, f_arr, h): """ Dispatches computation to the appropriate backend implementation. """ N = len(f_arr) selected_backend = BackendDispatcher.get_backend(self.backend_request, N) return selected_backend
[docs] class RiemannLiouville(UnifiedFractionalOperator): """ Riemann-Liouville fractional derivative. D^α f(t) = (d/dt)^n I^(n-α) f(t) """
[docs] def __init__(self, order: Union[float, FractionalOrder], backend: str = "auto"): super().__init__(order, backend) self.n = int(np.ceil(self.alpha.alpha))
[docs] def compute(self, f: Union[Callable, np.ndarray], t: Union[float, np.ndarray], h: float = None) -> np.ndarray: if h is not None and h <= 0: raise ValueError("Step size h must be positive") # Prepare inputs (common logic from base) # We assume base.FracOp.compute structure, but we override to control dispatch # Actually, base.compute does too much (it assumes JIT/NumPy split). # We should use helper methods from base but orchestrate here. t_array = self._get_t_array(f, t, h) f_array = self._prepare_f(f, t_array) if len(f_array) == 0: return np.array([]) step_size = self._get_step_size(t_array, h) # Classical limits: match numpy_backend and avoid fragile JAX RL for β→0, n=1. if self.alpha.alpha == 0: return f_array if self.alpha.alpha == 1.0: return np.asarray(np.gradient(f_array, step_size, edge_order=2)) backend = self._dispatch(f_array, step_size) if backend == "jax": try: out = np.asarray( jax_backend._riemann_liouville_jax( f_array, self.alpha.alpha, self.n, step_size ) ) if np.all(np.isfinite(out)): return out warnings.warn( "Riemann-Liouville JAX backend returned non-finite values; " "falling back to NumPy.", RuntimeWarning, stacklevel=2, ) return numpy_backend._riemann_liouville_numpy( f_array, self.alpha.alpha, self.n, step_size ) except Exception as e: _handle_backend_failure("Riemann-Liouville", "jax", e) elif backend == "cuda": try: out = np.asarray( cuda_backend._riemann_liouville_cuda( f_array, self.alpha.alpha, self.n, step_size ) ) if np.all(np.isfinite(out)): return out warnings.warn( "Riemann-Liouville CUDA backend returned non-finite values; " "falling back to NumPy.", RuntimeWarning, stacklevel=2, ) return numpy_backend._riemann_liouville_numpy( f_array, self.alpha.alpha, self.n, step_size ) except Exception as e: _handle_backend_failure("Riemann-Liouville", "cuda", e) # Default / Fallback to NumPy return numpy_backend._riemann_liouville_numpy( f_array, self.alpha.alpha, self.n, step_size )
[docs] class Caputo(UnifiedFractionalOperator): """ Caputo fractional derivative. D^α f(t) = I^(n-α) f^(n)(t) """
[docs] def compute(self, f: Union[Callable, np.ndarray], t: Union[float, np.ndarray], h: float = None) -> np.ndarray: if h is not None and h <= 0: raise ValueError("Step size h must be positive") t_array = self._get_t_array(f, t, h) f_array = self._prepare_f(f, t_array) if len(f_array) == 0: return np.array([]) step_size = self._get_step_size(t_array, h) backend = self._dispatch(f_array, step_size) if backend == "jax": try: out = np.asarray( jax_backend._caputo_jax( f_array, self.alpha.alpha, step_size ) ) if np.all(np.isfinite(out)): return out warnings.warn( "Caputo JAX backend returned non-finite values; " "falling back to NumPy.", RuntimeWarning, stacklevel=2, ) return numpy_backend._caputo_numpy( f_array, self.alpha.alpha, step_size ) except Exception as e: _handle_backend_failure("Caputo", "jax", e) elif backend == "cuda": # Caputo CUDA not implemented yet _handle_backend_failure( "Caputo", "cuda", NotImplementedError("Caputo CUDA backend is not implemented."), ) return numpy_backend._caputo_numpy( f_array, self.alpha.alpha, step_size )
[docs] class GrunwaldLetnikov(UnifiedFractionalOperator): """ Grünwald-Letnikov fractional derivative. """
[docs] def compute(self, f: Union[Callable, np.ndarray], t: Union[float, np.ndarray], h: float = None) -> np.ndarray: if h is not None and h <= 0: raise ValueError("Step size h must be positive") t_array = self._get_t_array(f, t, h) f_array = self._prepare_f(f, t_array) if len(f_array) == 0: return np.array([]) step_size = self._get_step_size(t_array, h) if self.alpha.alpha == 0.0: return np.asarray(f_array) if self.alpha.alpha == 1.0: return np.asarray(np.gradient(f_array, step_size, edge_order=2)) backend = self._dispatch(f_array, step_size) if backend == "jax": try: out = np.asarray( jax_backend._grunwald_letnikov_jax( f_array, self.alpha.alpha, step_size ) ) if np.all(np.isfinite(out)): return out warnings.warn( "Grünwald-Letnikov JAX backend returned non-finite values; " "falling back to NumPy.", RuntimeWarning, stacklevel=2, ) return numpy_backend._grunwald_letnikov_numpy( f_array, self.alpha.alpha, step_size ) except Exception as e: _handle_backend_failure("Grunwald-Letnikov", "jax", e) elif backend == "cuda": try: out = np.asarray( cuda_backend._grunwald_letnikov_cuda( f_array, self.alpha.alpha, step_size ) ) if np.all(np.isfinite(out)): return out warnings.warn( "Grünwald-Letnikov CUDA backend returned non-finite values; " "falling back to NumPy.", RuntimeWarning, stacklevel=2, ) return numpy_backend._grunwald_letnikov_numpy( f_array, self.alpha.alpha, step_size ) except Exception as e: _handle_backend_failure("Grunwald-Letnikov", "cuda", e) return numpy_backend._grunwald_letnikov_numpy( f_array, self.alpha.alpha, step_size )