"""
Unified fractional derivative engines (canonical implementation path).
`RiemannLiouville`, `Caputo`, and `GrunwaldLetnikov` here are the **single**
dispatching implementations used across the library. Higher-level wrappers in
`hpfracc.core.fractional_implementations` delegate to these classes; names such as
``OptimizedCaputo`` in `hpfracc.algorithms.optimized_methods` are backward-compatible
aliases of the same types. ``ParallelOptimized*`` in that module are the same engine
classes with a parallel-oriented name; ``compute_numerical`` is provided on
``UnifiedFractionalOperator`` so call sites match adapter-style APIs without duplicating
kernels.
"""
import os
import warnings
import numpy as np
from typing import Any, Union, Callable, Optional
from ..core.definitions import FractionalOrder
from .base import FractionalOperator
from .dispatch import BackendDispatcher
from .impls import numpy_backend, jax_backend, cuda_backend
_STRICT_NUMERICS = os.getenv("HPFRACC_STRICT_NUMERICS", "").strip().lower() in {
"1",
"true",
"yes",
"on",
}
[docs]
def _handle_backend_failure(method: str, backend: str, error: Exception) -> None:
"""Apply fail-fast policy or explicit warning for backend failures."""
message = (
f"{method} backend '{backend}' failed: {error}. "
"Falling back to NumPy backend."
)
warnings.warn(message, RuntimeWarning)
if _STRICT_NUMERICS:
raise RuntimeError(
f"{method} backend '{backend}' failed with strict numerics enabled."
) from error
[docs]
class UnifiedFractionalOperator(FractionalOperator):
"""
Base class for unified operators with backend dispatch.
"""
[docs]
def __init__(self, order: Union[float, FractionalOrder], backend: str = "auto"):
super().__init__(order)
self.backend_request = backend
[docs]
def compute_numerical(
self,
f_values: np.ndarray,
x_values: np.ndarray,
**kwargs: Any,
) -> np.ndarray:
"""
Array-in / array-out entry point; same backend path as :meth:`compute`.
Matches the ``BaseFractionalDerivative.compute_numerical`` surface so
parallel and engine aliases can be used interchangeably in tests and
factory wiring without a second implementation.
"""
h = kwargs.pop("h", None)
return self.compute(f_values, x_values, h=h)
[docs]
def _dispatch(self, f_arr, h):
"""
Dispatches computation to the appropriate backend implementation.
"""
N = len(f_arr)
selected_backend = BackendDispatcher.get_backend(self.backend_request, N)
return selected_backend
[docs]
class RiemannLiouville(UnifiedFractionalOperator):
"""
Riemann-Liouville fractional derivative.
D^α f(t) = (d/dt)^n I^(n-α) f(t)
"""
[docs]
def __init__(self, order: Union[float, FractionalOrder], backend: str = "auto"):
super().__init__(order, backend)
self.n = int(np.ceil(self.alpha.alpha))
[docs]
def compute(self, f: Union[Callable, np.ndarray], t: Union[float, np.ndarray], h: float = None) -> np.ndarray:
if h is not None and h <= 0:
raise ValueError("Step size h must be positive")
# Prepare inputs (common logic from base)
# We assume base.FracOp.compute structure, but we override to control dispatch
# Actually, base.compute does too much (it assumes JIT/NumPy split).
# We should use helper methods from base but orchestrate here.
t_array = self._get_t_array(f, t, h)
f_array = self._prepare_f(f, t_array)
if len(f_array) == 0:
return np.array([])
step_size = self._get_step_size(t_array, h)
# Classical limits: match numpy_backend and avoid fragile JAX RL for β→0, n=1.
if self.alpha.alpha == 0:
return f_array
if self.alpha.alpha == 1.0:
return np.asarray(np.gradient(f_array, step_size, edge_order=2))
backend = self._dispatch(f_array, step_size)
if backend == "jax":
try:
out = np.asarray(
jax_backend._riemann_liouville_jax(
f_array, self.alpha.alpha, self.n, step_size
)
)
if np.all(np.isfinite(out)):
return out
warnings.warn(
"Riemann-Liouville JAX backend returned non-finite values; "
"falling back to NumPy.",
RuntimeWarning,
stacklevel=2,
)
return numpy_backend._riemann_liouville_numpy(
f_array, self.alpha.alpha, self.n, step_size
)
except Exception as e:
_handle_backend_failure("Riemann-Liouville", "jax", e)
elif backend == "cuda":
try:
out = np.asarray(
cuda_backend._riemann_liouville_cuda(
f_array, self.alpha.alpha, self.n, step_size
)
)
if np.all(np.isfinite(out)):
return out
warnings.warn(
"Riemann-Liouville CUDA backend returned non-finite values; "
"falling back to NumPy.",
RuntimeWarning,
stacklevel=2,
)
return numpy_backend._riemann_liouville_numpy(
f_array, self.alpha.alpha, self.n, step_size
)
except Exception as e:
_handle_backend_failure("Riemann-Liouville", "cuda", e)
# Default / Fallback to NumPy
return numpy_backend._riemann_liouville_numpy(
f_array, self.alpha.alpha, self.n, step_size
)
[docs]
class Caputo(UnifiedFractionalOperator):
"""
Caputo fractional derivative.
D^α f(t) = I^(n-α) f^(n)(t)
"""
[docs]
def compute(self, f: Union[Callable, np.ndarray], t: Union[float, np.ndarray], h: float = None) -> np.ndarray:
if h is not None and h <= 0:
raise ValueError("Step size h must be positive")
t_array = self._get_t_array(f, t, h)
f_array = self._prepare_f(f, t_array)
if len(f_array) == 0: return np.array([])
step_size = self._get_step_size(t_array, h)
backend = self._dispatch(f_array, step_size)
if backend == "jax":
try:
out = np.asarray(
jax_backend._caputo_jax(
f_array, self.alpha.alpha, step_size
)
)
if np.all(np.isfinite(out)):
return out
warnings.warn(
"Caputo JAX backend returned non-finite values; "
"falling back to NumPy.",
RuntimeWarning,
stacklevel=2,
)
return numpy_backend._caputo_numpy(
f_array, self.alpha.alpha, step_size
)
except Exception as e:
_handle_backend_failure("Caputo", "jax", e)
elif backend == "cuda":
# Caputo CUDA not implemented yet
_handle_backend_failure(
"Caputo",
"cuda",
NotImplementedError("Caputo CUDA backend is not implemented."),
)
return numpy_backend._caputo_numpy(
f_array, self.alpha.alpha, step_size
)
[docs]
class GrunwaldLetnikov(UnifiedFractionalOperator):
"""
Grünwald-Letnikov fractional derivative.
"""
[docs]
def compute(self, f: Union[Callable, np.ndarray], t: Union[float, np.ndarray], h: float = None) -> np.ndarray:
if h is not None and h <= 0:
raise ValueError("Step size h must be positive")
t_array = self._get_t_array(f, t, h)
f_array = self._prepare_f(f, t_array)
if len(f_array) == 0:
return np.array([])
step_size = self._get_step_size(t_array, h)
if self.alpha.alpha == 0.0:
return np.asarray(f_array)
if self.alpha.alpha == 1.0:
return np.asarray(np.gradient(f_array, step_size, edge_order=2))
backend = self._dispatch(f_array, step_size)
if backend == "jax":
try:
out = np.asarray(
jax_backend._grunwald_letnikov_jax(
f_array, self.alpha.alpha, step_size
)
)
if np.all(np.isfinite(out)):
return out
warnings.warn(
"Grünwald-Letnikov JAX backend returned non-finite values; "
"falling back to NumPy.",
RuntimeWarning,
stacklevel=2,
)
return numpy_backend._grunwald_letnikov_numpy(
f_array, self.alpha.alpha, step_size
)
except Exception as e:
_handle_backend_failure("Grunwald-Letnikov", "jax", e)
elif backend == "cuda":
try:
out = np.asarray(
cuda_backend._grunwald_letnikov_cuda(
f_array, self.alpha.alpha, step_size
)
)
if np.all(np.isfinite(out)):
return out
warnings.warn(
"Grünwald-Letnikov CUDA backend returned non-finite values; "
"falling back to NumPy.",
RuntimeWarning,
stacklevel=2,
)
return numpy_backend._grunwald_letnikov_numpy(
f_array, self.alpha.alpha, step_size
)
except Exception as e:
_handle_backend_failure("Grunwald-Letnikov", "cuda", e)
return numpy_backend._grunwald_letnikov_numpy(
f_array, self.alpha.alpha, step_size
)