Source code for hpfracc.algorithms.advanced_methods

"""
Advanced Fractional Calculus Methods

This module implements advanced fractional calculus methods with optimizations:
- Weyl derivative via FFT Convolution with parallelization
- Marchaud derivative with Difference Quotient convolution and memory optimization
- Hadamard derivative
- Reiz/Reiz-Feller derivative via spectral method
- Adomian Decomposition method

All methods include parallel processing and memory optimizations.
"""

import numpy as np
from typing import Union, Optional, Tuple, Callable, Dict, List
from concurrent.futures import ThreadPoolExecutor

from ..core.definitions import FractionalOrder
from ..special import gamma
from .optimized_methods import ParallelConfig


[docs] class WeylDerivative: """ Weyl fractional derivative via FFT Convolution with parallelization. The Weyl derivative is defined as: D^α f(x) = (1/Γ(n-α)) (d/dx)^n ∫_x^∞ (τ-x)^(n-α-1) f(τ) dτ This implementation uses FFT convolution for efficiency and parallel processing. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], parallel_config: Optional[ParallelConfig] = None, optimized: bool = True, **kwargs, ): """Initialize Weyl derivative calculator.""" if isinstance(alpha, (int, float)): self.alpha = FractionalOrder(alpha) else: self.alpha = alpha self.n = int(np.ceil(self.alpha.alpha)) self.alpha_val = self.alpha.alpha # accept alias 'config' if provided if parallel_config is None and 'config' in kwargs: parallel_config = kwargs.get('config') self.parallel_config = parallel_config or ParallelConfig() if not hasattr(self.parallel_config, 'n_jobs'): self.parallel_config.n_jobs = 1 self.optimized = optimized # expose for tests self.fractional_order = self.alpha
[docs] def compute( self, f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], h: Optional[float] = None, use_parallel: bool = True, ) -> Union[float, np.ndarray]: """Compute Weyl derivative using optimized FFT convolution.""" if callable(f): if hasattr(x, "__len__"): x_array = x else: x_max = x if h is None: h = x_max / 1000 x_array = np.arange(0, x_max + h, h) f_array = np.array([f(xi) for xi in x_array]) else: f_array = f if hasattr(x, "__len__"): x_array = x else: x_array = np.arange(len(f)) * (h or 1.0) # Ensure arrays have the same length min_len = min(len(f_array), len(x_array)) f_array = f_array[:min_len] x_array = x_array[:min_len] if len(f_array) == 0: return np.array([]) if use_parallel and self.parallel_config.enabled: return self._compute_parallel(f_array, x_array, h or 1.0) else: return self._compute_serial(f_array, x_array, h or 1.0)
[docs] def _compute_serial( self, f: np.ndarray, x: np.ndarray, h: float) -> np.ndarray: """Serial computation using optimized FFT convolution.""" N = len(f) n = self.n alpha = self.alpha_val # Create Weyl kernel: (τ-x)^(n-α-1) / Γ(n-α) kernel = np.zeros(N) for i in range(N): if x[i] > 0: kernel[i] = (x[i] ** (n - alpha - 1)) / gamma(n - alpha) # Pad arrays for circular convolution f_padded = np.pad(f, (0, N), mode="constant") kernel_padded = np.pad(kernel, (0, N), mode="constant") # FFT convolution f_fft = np.fft.fft(f_padded) kernel_fft = np.fft.fft(kernel_padded) conv_fft = f_fft * kernel_fft conv = np.real(np.fft.ifft(conv_fft)) # Apply nth derivative using finite differences result = np.zeros(N) result[:n] = 0.0 for i in range(n, N): if n == 1: if i < N - 1: result[i] = (conv[i + 1] - conv[i - 1]) / (2 * h) else: result[i] = (conv[i] - conv[i - 1]) / h else: if i < N - 1: result[i] = (conv[i + 1] - 2 * conv[i] + conv[i - 1]) / (h**2) else: result[i] = (conv[i] - conv[i - 1]) / h return result * h
[docs] def _compute_parallel( self, f: np.ndarray, x: np.ndarray, h: float) -> np.ndarray: """Parallel computation using chunked processing.""" N = len(f) # Handle empty arrays if N == 0: return np.array([]) chunk_size = max(1, N // self.parallel_config.n_jobs) chunks = [ (f[i: i + chunk_size], x[i: i + chunk_size], h) for i in range(0, N, chunk_size) ] with ThreadPoolExecutor(max_workers=self.parallel_config.n_jobs) as executor: results = list(executor.map(self._process_chunk, chunks)) return np.concatenate(results)
[docs] def _process_chunk( self, chunk_data: Tuple[np.ndarray, np.ndarray, float] ) -> np.ndarray: """Process a chunk of data for parallel computation.""" f_chunk, x_chunk, h = chunk_data return self._compute_serial(f_chunk, x_chunk, h)
[docs] class MarchaudDerivative: """ Marchaud fractional derivative with Difference Quotient convolution and memory optimization. The Marchaud derivative is defined as: D^α f(x) = α/Γ(1-α) ∫_0^∞ [f(x) - f(x-τ)] / τ^(α+1) dτ This implementation uses difference quotient convolution with memory optimization. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], parallel_config: Optional[ParallelConfig] = None, **kwargs, ): """Initialize Marchaud derivative calculator.""" if isinstance(alpha, (int, float)): self.alpha = FractionalOrder(alpha) else: self.alpha = alpha self.alpha_val = self.alpha.alpha if parallel_config is None and 'config' in kwargs: parallel_config = kwargs.get('config') self.parallel_config = parallel_config or ParallelConfig() if not hasattr(self.parallel_config, 'n_jobs'): self.parallel_config.n_jobs = 1 self.fractional_order = self.alpha # Precompute constants self.coeff = self.alpha_val / gamma(1 - self.alpha_val)
[docs] def compute( self, f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], h: Optional[float] = None, use_parallel: bool = True, memory_optimized: bool = True, ) -> Union[float, np.ndarray]: """Compute Marchaud derivative with memory optimization.""" if callable(f): if hasattr(x, "__len__"): x_array = x else: x_max = x if h is None: h = x_max / 1000 x_array = np.arange(0, x_max + h, h) f_array = np.array([f(xi) for xi in x_array]) else: f_array = f if hasattr(x, "__len__"): x_array = x else: x_array = np.arange(len(f)) * (h or 1.0) # Ensure arrays have the same length min_len = min(len(f_array), len(x_array)) f_array = f_array[:min_len] x_array = x_array[:min_len] if memory_optimized: return self._compute_memory_optimized( f_array, x_array, h or 1.0, use_parallel ) else: return self._compute_standard( f_array, x_array, h or 1.0, use_parallel)
[docs] def _compute_memory_optimized( self, f: np.ndarray, x: np.ndarray, h: float, use_parallel: bool ) -> np.ndarray: """Memory-optimized computation using streaming approach.""" N = len(f) # Handle empty arrays if N == 0: return np.array([]) result = np.zeros(N) # Use smaller chunks to reduce memory usage chunk_size = max(1, min(1000, N // 4)) if use_parallel and self.parallel_config.enabled: chunks = [(f, x, h, i, min(i + chunk_size, N)) for i in range(0, N, chunk_size)] with ThreadPoolExecutor( max_workers=self.parallel_config.n_jobs ) as executor: chunk_results = list(executor.map( self._process_marchaud_chunk, chunks)) for i, chunk_result in enumerate(chunk_results): start_idx = i * chunk_size end_idx = min(start_idx + chunk_size, N) result[start_idx:end_idx] = chunk_result else: for i in range(0, N, chunk_size): end_idx = min(i + chunk_size, N) result[i:end_idx] = self._compute_marchaud_segment( f, x, h, i, end_idx) return result
[docs] def _process_marchaud_chunk( self, chunk_data: Tuple[np.ndarray, np.ndarray, float, int, int] ) -> np.ndarray: """Process a chunk for Marchaud derivative.""" f, x, h, start_idx, end_idx = chunk_data return self._compute_marchaud_segment(f, x, h, start_idx, end_idx)
[docs] def _compute_marchaud_segment( self, f: np.ndarray, x: np.ndarray, h: float, start_idx: int, end_idx: int) -> np.ndarray: """Compute Marchaud derivative for a segment.""" result = np.zeros(end_idx - start_idx) for i in range(start_idx, end_idx): if i == 0: result[i - start_idx] = 0.0 continue # Compute difference quotient integral integral = 0.0 max_tau = min(i, 1000) # Limit integration range for efficiency for j in range(1, max_tau + 1): tau = j * h if i - j >= 0: diff = f[i] - f[i - j] integral += diff / (tau ** (self.alpha_val + 1)) result[i - start_idx] = self.coeff * integral * h return result
[docs] def _compute_standard( self, f: np.ndarray, x: np.ndarray, h: float, use_parallel: bool ) -> np.ndarray: """Standard computation without memory optimization.""" N = len(f) result = np.zeros(N) for i in range(N): if i == 0: result[i] = 0.0 continue integral = 0.0 for j in range(1, i + 1): tau = j * h diff = f[i] - f[i - j] integral += diff / (tau ** (self.alpha_val + 1)) result[i] = self.coeff * integral * h return result
[docs] class HadamardDerivative: """ Hadamard fractional derivative. The Hadamard derivative is defined as: D^α f(x) = (1/Γ(n-α)) (x d/dx)^n ∫_1^x (log(x/t))^(n-α-1) f(t) dt/t This implementation uses logarithmic transformation and efficient quadrature. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder, None] = None, order: Union[float, FractionalOrder, None] = None, **kwargs, ): """Initialize Hadamard derivative calculator. Accepts both `alpha` and `order` for backward compatibility. """ # Prefer explicit alpha if provided; otherwise fall back to order provided = alpha if alpha is not None else order if provided is None: raise TypeError( "HadamardDerivative requires `alpha` or `order` parameter") if isinstance(provided, (int, float)): self.alpha = FractionalOrder(provided) else: self.alpha = provided self.n = int(np.ceil(self.alpha.alpha)) self.alpha_val = self.alpha.alpha self.fractional_order = self.alpha
[docs] def compute( self, f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], h: Optional[float] = None, ) -> Union[float, np.ndarray]: """Compute Hadamard derivative using logarithmic transformation.""" if callable(f): if hasattr(x, "__len__"): x_array = x else: x_max = x if h is None: h = x_max / 1000 # Start from 1 for Hadamard x_array = np.arange(1, x_max + h, h) f_array = np.array([f(xi) for xi in x_array]) else: f_array = f if hasattr(x, "__len__"): x_array = x else: x_array = np.arange(1, len(f) + 1) * (h or 1.0) # Ensure arrays have the same length min_len = min(len(f_array), len(x_array)) f_array = f_array[:min_len] x_array = x_array[:min_len] return self._compute_hadamard(f_array, x_array, h or 1.0)
[docs] def _compute_hadamard( self, f: np.ndarray, x: np.ndarray, h: float) -> np.ndarray: """Compute Hadamard derivative using logarithmic transformation.""" N = len(f) result = np.zeros(N) for i in range(N): if i < self.n: result[i] = 0.0 continue # Transform to logarithmic coordinates log_x = np.log(x[i]) # Compute integral part integral = 0.0 for j in range(i): if x[j] <= 0: continue # Skip zero or negative values log_t = np.log(x[j]) log_kernel = (log_x - log_t) ** (self.n - self.alpha_val - 1) integral += f[j] * log_kernel / x[j] # Apply differential operator (x d/dx)^n if self.n == 1: result[i] = x[i] * integral / gamma(self.n - self.alpha_val) else: # Higher derivatives using recursive application temp = integral / gamma(self.n - self.alpha_val) for k in range(self.n): temp = x[i] * np.gradient(temp, x[i]) result[i] = temp return result * h
[docs] class ReizFellerDerivative: """ Reiz-Feller fractional derivative via spectral method. The Reiz-Feller derivative is defined as: D^α f(x) = (1/2π) ∫_{-∞}^∞ |ξ|^α F[f](ξ) e^(iξx) dξ This implementation uses FFT for spectral computation. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], parallel_config: Optional[ParallelConfig] = None, **kwargs, ): """Initialize Reiz-Feller derivative calculator.""" if isinstance(alpha, (int, float)): self.alpha = FractionalOrder(alpha) else: self.alpha = alpha self.alpha_val = self.alpha.alpha if parallel_config is None and 'config' in kwargs: parallel_config = kwargs.get('config') self.parallel_config = parallel_config or ParallelConfig() if not hasattr(self.parallel_config, 'n_jobs'): self.parallel_config.n_jobs = 1 self.fractional_order = self.alpha
[docs] def compute( self, f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], h: Optional[float] = None, use_parallel: bool = True, ) -> Union[float, np.ndarray]: """Compute Reiz-Feller derivative using spectral method.""" if callable(f): if hasattr(x, "__len__"): x_array = x else: x_max = x if h is None: h = x_max / 1000 x_array = np.arange(-x_max, x_max + h, h) f_array = np.array([f(xi) for xi in x_array]) else: f_array = f if hasattr(x, "__len__"): x_array = x else: x_array = np.arange(len(f)) * (h or 1.0) # Ensure arrays have the same length min_len = min(len(f_array), len(x_array)) f_array = f_array[:min_len] x_array = x_array[:min_len] return self._compute_spectral(f_array, x_array, h or 1.0, use_parallel)
[docs] def _compute_spectral( self, f: np.ndarray, x: np.ndarray, h: float, use_parallel: bool ) -> np.ndarray: """Compute using spectral method with FFT.""" N = len(f) original_len = N # Ensure N is even for FFT if N % 2 == 1: N += 1 f = np.pad(f, (0, 1), mode="edge") x = np.pad(x, (0, 1), mode="edge") # Compute FFT f_fft = np.fft.fft(f) # Create frequency array freq = np.fft.fftfreq(N, h) # Apply spectral filter |ξ|^α spectral_filter = np.abs(freq) ** self.alpha_val spectral_filter[0] = 0 # Handle zero frequency # Apply filter in frequency domain filtered_fft = f_fft * spectral_filter # Inverse FFT result = np.real(np.fft.ifft(filtered_fft)) return result[:original_len] # Return original length
[docs] class AdomianDecomposition: """ Adomian Decomposition Method for solving fractional differential equations. This method decomposes the solution into a series and computes each term using the Adomian polynomials. """
[docs] def __init__( self, alpha: Union[float, FractionalOrder], parallel_config: Optional[ParallelConfig] = None, **kwargs, ): """Initialize Adomian Decomposition solver.""" if isinstance(alpha, (int, float)): self.alpha = FractionalOrder(alpha) else: self.alpha = alpha self.alpha_val = self.alpha.alpha if parallel_config is None and 'config' in kwargs: parallel_config = kwargs.get('config') self.parallel_config = parallel_config or ParallelConfig() if not hasattr(self.parallel_config, 'n_jobs'): self.parallel_config.n_jobs = 1 self.fractional_order = self.alpha
[docs] def solve( self, equation: Callable, initial_conditions: Dict, t_span: Tuple[float, float], n_steps: int = 1000, n_terms: int = 10, use_parallel: bool = True, ) -> Tuple[np.ndarray, np.ndarray]: """ Solve fractional differential equation using Adomian decomposition. Args: equation: Function representing the FDE initial_conditions: Dictionary of initial conditions t_span: Time span (t0, tf) n_steps: Number of time steps n_terms: Number of terms in the decomposition use_parallel: Whether to use parallel processing Returns: Tuple of (time_points, solution) """ t0, tf = t_span t = np.linspace(t0, tf, n_steps) h = (tf - t0) / (n_steps - 1) # Initialize solution series solution = np.zeros(n_steps) # Add initial condition if 0 in initial_conditions: solution += initial_conditions[0] # Compute decomposition terms if use_parallel and self.parallel_config.enabled: terms = self._compute_terms_parallel(equation, t, h, n_terms) else: terms = self._compute_terms_serial(equation, t, h, n_terms) # Sum all terms for term in terms: solution += term return t, solution
[docs] def _compute_terms_serial( self, equation: Callable, t: np.ndarray, h: float, n_terms: int ) -> List[np.ndarray]: """Compute decomposition terms serially.""" terms = [] for n in range(1, n_terms + 1): # Compute Adomian polynomial using the same time array adomian = self._compute_adomian_polynomial(equation, terms, n, t) # Compute integral term integral_term = self._compute_integral_term(adomian, t, h) terms.append(integral_term) return terms
[docs] def _compute_terms_parallel( self, equation: Callable, t: np.ndarray, h: float, n_terms: int ) -> List[np.ndarray]: """Compute decomposition terms in parallel.""" term_indices = list(range(1, n_terms + 1)) with ThreadPoolExecutor(max_workers=self.parallel_config.n_jobs) as executor: futures = [] for n in term_indices: future = executor.submit( self._compute_single_term, equation, t, h, n) futures.append(future) terms = [future.result() for future in futures] return terms
[docs] def _compute_adomian_polynomial(self, equation: Callable, previous_terms: List[np.ndarray], n: int, t: np.ndarray) -> np.ndarray: """Compute the nth Adomian polynomial.""" N = len(t) adomian = np.zeros(N) for i in range(N): # Simplified polynomial computation using the provided time array adomian[i] = equation(t[i], 0) * (t[i] ** n) / gamma(n + 1) return adomian
[docs] def _compute_integral_term( self, adomian: np.ndarray, t: np.ndarray, h: float ) -> np.ndarray: """Compute integral term using fractional integration.""" N = len(adomian) result = np.zeros(N) for i in range(N): integral = 0.0 for j in range(i + 1): # Avoid division by zero and negative powers if t[i] > t[j] and self.alpha_val > 0: kernel = ((t[i] - t[j]) ** (self.alpha_val - 1) ) / gamma(self.alpha_val) integral += adomian[j] * kernel result[i] = integral * h return result
[docs] def _compute_single_term( self, equation: Callable, t: np.ndarray, h: float, n: int ) -> np.ndarray: """Compute a single decomposition term.""" # This is a simplified implementation # In practice, you would need the previous terms to compute the Adomian # polynomial adomian = np.zeros_like(t) # For demonstration, use a simple approximation for i in range(len(t)): adomian[i] = equation(t[i], 0) * (t[i] ** n) / gamma(n + 1) return self._compute_integral_term(adomian, t, h)
# Convenience functions for easy access
[docs] def weyl_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: float, h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for Weyl derivative.""" calculator = WeylDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
[docs] def marchaud_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: float, h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for Marchaud derivative.""" calculator = MarchaudDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
[docs] def hadamard_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: float, h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for Hadamard derivative.""" calculator = HadamardDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
[docs] def reiz_feller_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: float, h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for Reiz-Feller derivative.""" calculator = ReizFellerDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
# ============================================================================= # OPTIMIZED ALIASES FOR BACKWARD COMPATIBILITY # =============================================================================
[docs] class OptimizedWeylDerivative(WeylDerivative): """Alias for backward compatibility with optimized Weyl derivative."""
[docs] def __init__(self, order: Union[float, FractionalOrder], **kwargs): """Initialize optimized Weyl derivative calculator.""" super().__init__(order, optimized=True, **kwargs)
[docs] class OptimizedMarchaudDerivative(MarchaudDerivative): """Alias for backward compatibility with optimized Marchaud derivative."""
[docs] def __init__(self, order: Union[float, FractionalOrder], **kwargs): """Initialize optimized Marchaud derivative calculator.""" super().__init__(order, **kwargs)
[docs] class OptimizedHadamardDerivative(HadamardDerivative): """Alias for backward compatibility with optimized Hadamard derivative."""
[docs] def __init__(self, order: Union[float, FractionalOrder], **kwargs): """Initialize optimized Hadamard derivative calculator.""" super().__init__(order, **kwargs)
[docs] class OptimizedReizFellerDerivative(ReizFellerDerivative): """Alias for backward compatibility with optimized Reiz-Feller derivative."""
[docs] def __init__(self, order: Union[float, FractionalOrder], **kwargs): """Initialize optimized Reiz-Feller derivative calculator.""" super().__init__(order, **kwargs)
[docs] class OptimizedAdomianDecomposition(AdomianDecomposition): """Alias for backward compatibility with optimized Adomian decomposition."""
[docs] def __init__(self, order: Union[float, FractionalOrder], **kwargs): """Initialize optimized Adomian decomposition calculator.""" super().__init__(order, **kwargs)
# ============================================================================= # CONVENIENCE FUNCTIONS FOR OPTIMIZED METHODS # =============================================================================
[docs] def optimized_weyl_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for optimized Weyl derivative.""" calculator = OptimizedWeylDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
[docs] def optimized_marchaud_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for optimized Marchaud derivative.""" calculator = OptimizedMarchaudDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
[docs] def optimized_hadamard_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for optimized Hadamard derivative.""" calculator = OptimizedHadamardDerivative(alpha, **kwargs) return calculator.compute(f, x, h)
[docs] def optimized_reiz_feller_derivative( f: Union[Callable, np.ndarray], x: Union[float, np.ndarray], alpha: Union[float, FractionalOrder], h: Optional[float] = None, **kwargs, ) -> Union[float, np.ndarray]: """Convenience function for optimized Reiz-Feller derivative.""" calculator = OptimizedReizFellerDerivative(alpha, **kwargs) return calculator.compute(f, x, h)