Source code for hpfracc.solvers.pde_solvers

"""
Fractional Partial Differential Equation Solvers

This module provides comprehensive solvers for fractional PDEs including
finite difference methods, spectral methods, and adaptive mesh refinement.

Performance Note (v2.1.0):
- Intelligent backend selection for sparse matrix operations
- Optimal array operations based on problem size
"""

import numpy as np
from typing import Union, Optional, Tuple, Callable, Dict
from scipy import sparse
from scipy.sparse.linalg import spsolve, splu
from scipy.special import gamma

from ..core.definitions import FractionalOrder


# Initialize intelligent backend selector for PDE solvers
_intelligent_selector = None
_use_intelligent_backend = True

[docs] def _get_intelligent_selector(): """Get intelligent backend selector instance for PDE solvers.""" global _intelligent_selector, _use_intelligent_backend if not _use_intelligent_backend: return None if _intelligent_selector is None: try: from ..ml.intelligent_backend_selector import IntelligentBackendSelector _intelligent_selector = IntelligentBackendSelector(enable_learning=True) except ImportError: _use_intelligent_backend = False _intelligent_selector = None return _intelligent_selector
[docs] def _select_array_backend(data_size: int, operation_type: str = "element_wise") -> str: """ Select optimal backend for array operations in PDE solving. Args: data_size: Number of elements to process operation_type: Type of operation (e.g., "element_wise", "matrix_multiply") Returns: Backend name: "numpy", "numba", or "jax" """ selector = _get_intelligent_selector() if selector is not None: try: from ..ml.intelligent_backend_selector import WorkloadCharacteristics from ..ml.backends import BackendType workload = WorkloadCharacteristics( operation_type=operation_type, data_size=data_size, data_shape=(data_size,), is_iterative=True ) backend_type = selector.select_backend(workload) # Map to appropriate backends if backend_type == BackendType.NUMBA: return "numba" elif backend_type == BackendType.JAX: return "jax" else: return "numpy" except Exception: pass # Fall through to default # Default selection based on size if data_size < 10000: return "numpy" # Small problems: NumPy is fastest else: return "numba" # Large problems: Numba JIT compilation helps
[docs] class FractionalPDESolver: """ Base class for fractional PDE solvers. Provides common functionality for solving fractional partial differential equations of various types. """
[docs] def __init__( self, pde_type: str = "diffusion", method: str = "finite_difference", spatial_order: int = 2, temporal_order: int = 1, adaptive: bool = False, *, fractional_order: Optional[Union[float, FractionalOrder]] = None, boundary_conditions: Optional[str] = None, ): """ Initialize fractional PDE solver. Args: pde_type: Type of PDE ("diffusion", "advection", "reaction_diffusion") method: Numerical method ("finite_difference", "spectral", "finite_element") spatial_order: Order of spatial discretization temporal_order: Order of temporal discretization adaptive: Use adaptive mesh refinement """ self.pde_type = pde_type.lower() self.method = method.lower() self.spatial_order = spatial_order self.temporal_order = temporal_order self.adaptive = adaptive # Accept alias for tests; stored for compatibility only self.fractional_order = fractional_order # Store boundary condition mode for compatibility (e.g., "dirichlet", "periodic") self.boundary_conditions = boundary_conditions or "dirichlet" # Validate PDE type valid_pde_types = ["diffusion", "advection", "reaction_diffusion", "wave"] if self.pde_type not in valid_pde_types: raise ValueError(f"PDE type must be one of {valid_pde_types}") # Validate method valid_methods = ["finite_difference", "spectral", "finite_element"] if self.method not in valid_methods: raise ValueError(f"Method must be one of {valid_methods}")
[docs] def _validate_orders(self, alpha: float, beta: float): """Validate fractional orders for PDE solver.""" if not 0 < alpha <= 2: raise ValueError(f"Temporal order alpha must be in (0, 2], but got {alpha}") if not 0 < beta <= 2: raise ValueError(f"Spatial order beta must be in (0, 2], but got {beta}")
[docs] def solve( self, t_span: Tuple[float, float], x_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Dict, alpha: float, beta: float, **kwargs, ) -> Dict[str, np.ndarray]: """Solve the fractional PDE.""" self._validate_orders(alpha, beta) t0, tf = t_span x_start, x_end = x_span nt = kwargs.get("nt", 100) nx = kwargs.get("nx", 100) t_values = np.linspace(t0, tf, nt) x_values = np.linspace(x_start, x_end, nx) dt = t_values[1] - t_values[0] dx = x_values[1] - x_values[0] solution = np.zeros((nx, nt)) solution[:, 0] = initial_condition(x_values) diffusion_coeff = kwargs.get("diffusion_coeff", 1.0) A = self._build_spatial_matrix(nx, dx, beta, diffusion_coeff) A_lu = splu(A) # Precompute L1 weights j = np.arange(1, nt) l1_weights = (j + 1)**(1 - alpha) - j**(1 - alpha) for n in range(nt - 1): history = np.sum(l1_weights[:n] * np.diff(solution[:, :n+1], axis=1), axis=1) rhs = solution[:, n] - (dt**alpha / gamma(2 - alpha)) * history # Add source term if provided if 'source_term' in kwargs: source = kwargs['source_term'](x_values, t_values[n+1]) rhs += dt * source solution[1:-1, n + 1] = A_lu.solve(rhs[1:-1]) # Apply boundary conditions if 'dirichlet' in boundary_conditions: solution[0, n + 1] = boundary_conditions['dirichlet'][0](t_values[n+1]) solution[-1, n + 1] = boundary_conditions['dirichlet'][1](t_values[n+1]) return {"t": t_values, "x": x_values, "u": solution}
[docs] class FractionalDiffusionSolver(FractionalPDESolver): """ Solver for fractional diffusion equations. Solves equations of the form: ∂^α u/∂t^α = D ∂^β u/∂x^β + f(x, t, u) where α and β are fractional orders. """
[docs] def __init__( self, method: str = "finite_difference", spatial_order: int = 2, temporal_order: int = 1, derivative_type: str = "caputo", ): """ Initialize fractional diffusion solver. Args: method: Numerical method spatial_order: Order of spatial discretization temporal_order: Order of temporal discretization derivative_type: Type of fractional derivative """ super().__init__("diffusion", method, spatial_order, temporal_order) self.derivative_type = derivative_type.lower()
[docs] def solve( self, x_span: Tuple[float, float], t_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Tuple[Callable, Callable], alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], diffusion_coeff: float = 1.0, source_term: Optional[Callable] = None, nx: int = 100, nt: int = 100, **kwargs, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Solve fractional diffusion equation. Args: x_span: Spatial interval (x0, xf) t_span: Time interval (t0, tf) initial_condition: Initial condition u(x, 0) boundary_conditions: Boundary conditions (left_bc, right_bc) alpha: Temporal fractional order beta: Spatial fractional order diffusion_coeff: Diffusion coefficient source_term: Source term f(x, t, u) nx: Number of spatial grid points nt: Number of temporal grid points **kwargs: Additional solver parameters Returns: Tuple of (t_values, x_values, solution) """ self._validate_orders(alpha, beta) x0, xf = x_span t0, tf = t_span # Grids x_values = np.linspace(x0, xf, nx) t_values = np.linspace(t0, tf, nt) dx = x_values[1] - x_values[0] dt = t_values[1] - t_values[0] # Solution array in (nx, nt) to reuse base formulations solution = np.zeros((nx, nt), dtype=float) solution[:, 0] = np.array([initial_condition(xi) for xi in x_values], dtype=float) alpha_val = float(alpha.alpha) if hasattr(alpha, 'alpha') else float(alpha) if self.method == "spectral": # Periodic spectral implicit step per time level D = diffusion_coeff k = 2.0 * np.pi * np.fft.fftfreq(nx, d=dx) abs_k_beta = np.abs(k) ** (float(beta.alpha) if hasattr(beta, 'alpha') else float(beta)) for n in range(1, nt): # L1 history accumulation for 0<alpha<1, fallback to CN at alpha=1 if 0.0 < alpha_val < 1.0: j = np.arange(1, n + 1, dtype=float) a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val) hist = np.zeros(nx, dtype=float) for m in range(1, n + 1): hist += a[m - 1] * (solution[:, n - m + 1 - 1] if n - m >= 0 else 0.0) scale = gamma(2.0 - alpha_val) * (dt ** alpha_val) rhs = hist.copy() elif alpha_val == 1.0: # CN-like scaling rhs = solution[:, n - 1] scale = dt else: # Fallback simple history for 1<alpha<2 j = np.arange(1, n + 1, dtype=float) c = j**(2.0 - alpha_val) - (j - 1.0)**(2.0 - alpha_val) hist = np.zeros(nx, dtype=float) for m in range(1, n + 1): hist += c[m - 1] * solution[:, n - m] scale = 1.0 / (gamma(3.0 - alpha_val) * (dt ** alpha_val)) rhs = hist.copy() if source_term is not None: s = np.array([source_term(xi, t_values[n], solution[n - 1, ix] if n > 0 else solution[0, ix]) for ix, xi in enumerate(x_values)], dtype=float) rhs = rhs + scale * s rhs_hat = np.fft.fft(rhs) denom = 1.0 + scale * D * abs_k_beta u_hat = rhs_hat / denom u_new = np.fft.ifft(u_hat).real solution[:, n] = u_new return t_values, x_values, solution.T # Finite difference implicit solve with L1/CN/L2-1σ variants A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs') A_lu = splu(A.tocsc()) if 0.0 < alpha_val < 1.0: # Precompute L1 weights j = np.arange(1, nt, dtype=float) a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val) scale = dt ** alpha_val / gamma(2.0 - alpha_val) for n in range(0, nt - 1): # Build RHS using history of increments if n >= 1: # increments of u in time for all past steps 1..n diffs = np.diff(solution[:, : n + 1], axis=1) # shape (nx, n) # weight for each past increment Δu^{k} is a_{n-k} weights = a[:n][::-1] # length n history = diffs @ weights # (nx,) else: history = np.zeros(nx, dtype=float) rhs_full = solution[:, n] - scale * history if source_term is not None: # Source term may accept (x, t) or (x, t, u); support both try: s_n = np.array([source_term(xi, t_values[n + 1], solution[ix, n]) for ix, xi in enumerate(x_values)], dtype=float) except TypeError: s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values], dtype=float) rhs_full += dt * s_n # Dirichlet boundaries enforced after solve; interior system rhs = rhs_full[1:-1] sol_interior = A_lu.solve(rhs) solution[1:-1, n + 1] = sol_interior # Apply boundary conditions solution[0, n + 1] = boundary_conditions[0](t_values[n + 1]) solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1]) elif alpha_val == 1.0: A_lhs = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='lhs') A_rhs = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='rhs') A_lu = splu(A_lhs.tocsc()) for n in range(0, nt - 1): rhs_full = (A_rhs @ solution[1:-1, n]) if source_term is not None: s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values[1:-1]], dtype=float) rhs_full += dt * s_n sol_interior = A_lu.solve(rhs_full) solution[1:-1, n + 1] = sol_interior solution[0, n + 1] = boundary_conditions[0](t_values[n + 1]) solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1]) else: # Basic L2-1σ-like accumulation (smoke path) j = np.arange(1, nt, dtype=float) c = j**(2.0 - alpha_val) - (j - 1.0)**(2.0 - alpha_val) inv_scale = 1.0 / (gamma(3.0 - alpha_val) * (dt ** alpha_val)) for n in range(0, nt - 1): hist = np.zeros(nx - 2, dtype=float) for m in range(1, n + 1): hist += c[m - 1] * solution[1:-1, n + 1 - m] rhs = inv_scale * hist if source_term is not None: s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values[1:-1]], dtype=float) rhs += dt * s_n sol_interior = A_lu.solve(rhs) solution[1:-1, n + 1] = sol_interior solution[0, n + 1] = boundary_conditions[0](t_values[n + 1]) solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1]) return t_values, x_values, solution.T
[docs] def _solve_spatial_step( self, solution: np.ndarray, n: int, x_values: np.ndarray, t_values: np.ndarray, alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], source_term: Optional[Callable], nx: int, nt: int, pde_params: dict, ) -> np.ndarray: """ Solve spatial problem at current time step. Args: solution: Solution array n: Current time step x_values: Spatial grid t_values: Time grid alpha: Temporal fractional order beta: Spatial fractional order source_term: Source term nx: Number of spatial grid points nt: Number of temporal grid points pde_params: Dictionary of parameters (diffusion_coeff, dx, dt) Returns: Solution at interior points """ if isinstance(alpha, FractionalOrder): alpha_val = alpha.alpha else: alpha_val = alpha dt = t_values[1] - t_values[0] # Handle explicit scheme for alpha=2.0 (wave equation) separately if alpha_val == 2.0: diffusion_coeff = pde_params.get('diffusion_coeff', 1.0) dx = pde_params.get('dx') spatial_op = self._get_spatial_operator(nx, beta, dx) if n == 1: # First step, assuming u_t(x,0)=0 spatial_deriv_term = diffusion_coeff * (spatial_op @ solution[1:-1, 0]) return solution[1:-1, 0] + (dt**2 / 2) * spatial_deriv_term else: # Subsequent steps spatial_deriv_term = diffusion_coeff * (spatial_op @ solution[1:-1, n-1]) return 2*solution[1:-1, n-1] - solution[1:-1, n-2] + dt**2 * spatial_deriv_term if self.method == "finite_difference": return self._finite_difference_step( solution, n, x_values, t_values, alpha, beta, source_term, nx, nt, pde_params ) elif self.method == "spectral": return self._spectral_step( solution, n, x_values, t_values, alpha, beta, source_term, nx, nt, pde_params ) else: raise ValueError(f"Unknown method: {self.method}")
[docs] def _finite_difference_step( self, solution: np.ndarray, n: int, x_values: np.ndarray, t_values: np.ndarray, alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], source_term: Optional[Callable], nx: int, nt: int, pde_params: dict, ) -> np.ndarray: """ Finite difference step. Args: solution: Solution array n: Current time step x_values: Spatial grid t_values: Time grid alpha: Temporal fractional order beta: Spatial fractional order source_term: Source term nx: Number of spatial grid points nt: Number of temporal grid points pde_params: Dictionary of parameters (diffusion_coeff, dx, dt) Returns: Solution at interior points """ dt = t_values[1] - t_values[0] dx = x_values[1] - x_values[0] diffusion_coeff = pde_params.get("diffusion_coeff", 1.0) # Build the left-hand side matrix A A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs') # Build the right-hand side vector b from previous time steps b = self._compute_temporal_derivative(solution, n, alpha, dt, **pde_params) # Add source term if provided if source_term: source_at_n = np.array( [source_term(xi, t_values[n]) for xi in x_values[1:-1]] ) b += source_at_n # Solve the linear system Au_n = b u_interior = spsolve(A, b) return u_interior
[docs] def _spectral_step( self, solution, n, x_values, t_values, alpha, beta, source_term, nx, nt, pde_params ): """ Correct, efficient, diagonal spectral solve for periodic BCs. """ dx = x_values[1] - x_values[0] dt = t_values[1] - t_values[0] D = pde_params.get("diffusion_coeff", 1.0) alpha_val = float(alpha.alpha) if hasattr(alpha, "alpha") else float(alpha) beta_val = float(beta.alpha) if hasattr(beta, "alpha") else float(beta) if not (0.0 < alpha_val < 1.0): raise NotImplementedError( f"Spectral PDE scheme is implemented for 0 < α < 1, got α={alpha_val}. " "For α ≥ 1, consider decomposing into integer and fractional parts, or use " "finite difference schemes with the L1/L2 methods. For α ≤ 0, the problem " "is not well-defined for fractional PDEs." ) u_prev = solution[:, n-1] # L1 history RHS: sum_{j=1..n} a_j * u^{n-j} j = np.arange(1, n+1, dtype=float) a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val) hist = np.zeros_like(u_prev) for m in range(1, n+1): hist += a[m-1] * solution[:, n-m] scale = gamma(2.0 - alpha_val) * (dt ** alpha_val) rhs = hist.copy() if source_term is not None: s = np.array([source_term(x, t_values[n], u_prev[ix]) for ix, x in enumerate(x_values)]) rhs += scale * s k = 2.0 * np.pi * np.fft.fftfreq(nx, d=dx) rhs_hat = np.fft.fft(rhs) denom = 1.0 + scale * D * (np.abs(k) ** beta_val) # implicit-in-time diagonal u_hat = rhs_hat / denom u_new = np.fft.ifft(u_hat).real return u_new[1:-1] # interior to match caller
[docs] def _compute_temporal_derivative( self, solution: np.ndarray, n: int, alpha: Union[float, FractionalOrder], dt: float, **kwargs, ) -> np.ndarray: """Computes the RHS vector of the system, based on previous time steps.""" if isinstance(alpha, FractionalOrder): alpha_val = alpha.alpha else: alpha_val = alpha # For implicit schemes, construct the historical part of the temporal derivative sum_term = np.zeros_like(solution[1:-1, 0]) if 0 < alpha_val < 1: # L1 scheme coefficients def l1_coeffs(k): if k < 0: return 0 return (k + 1)**(1 - alpha_val) - k**(1 - alpha_val) for j in range(1, n): sum_term += (l1_coeffs(n - j - 1) - l1_coeffs(n - j)) * solution[1:-1, j] rhs_temporal = sum_term + l1_coeffs(n - 1) * solution[1:-1, 0] elif 1 < alpha_val < 2: from scipy.special import gamma # L2-type scheme coefficients def c_coeffs(k): if k < 0: return 0 return (k + 1)**(2 - alpha_val) - k**(2 - alpha_val) # Summation part of the history term for j in range(1, n): sum_term += (c_coeffs(n - j - 1) - c_coeffs(n - j)) * solution[1:-1, j] # Add the u_0 term history_term = sum_term + c_coeffs(n - 1) * solution[1:-1, 0] # This is only part of the RHS, needs scaling and other terms. # (u_n - (2u_{n-1} - u_{n-2}) - history) / scale = D*A*u_n # (1/scale)u_n - D*A*u_n = (1/scale)*(2u_{n-1} - u_{n-2} + history) scaling_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val)) if n > 1: rhs_temporal = scaling_factor * (2*solution[1:-1, n-1] - solution[1:-1, n-2] + history_term) else: # n=1 rhs_temporal = scaling_factor * (2*solution[1:-1, n-1] + history_term) elif alpha_val == 1.0: # Crank-Nicolson for RHS # A_LHS u_n = A_RHS u_{n-1} # We solve A_LHS u_n = b, so b = A_RHS u_{n-1} nx = kwargs['nx'] beta = kwargs['beta'] diffusion_coeff = kwargs['diffusion_coeff'] dx = kwargs['dx'] A_RHS = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='rhs') rhs_temporal = A_RHS @ solution[1:-1, n-1] else: # Should not happen for alpha > 0 rhs_temporal = np.zeros_like(solution[1:-1, 0]) return rhs_temporal
[docs] def _compute_spatial_derivative( self, u: np.ndarray, beta: Union[float, FractionalOrder], dx: float ) -> np.ndarray: """ Compute spatial fractional derivative. Args: u: Solution at current time beta: Spatial fractional order dx: Spatial step size Returns: Spatial derivative """ if isinstance(beta, FractionalOrder): beta_val = beta.alpha else: beta_val = beta nx = len(u) # Use finite difference approximation for spatial derivative spatial_deriv = np.zeros(nx) coeffs = self._grunwald_letnikov_coeffs(beta_val, nx) for i in range(1, nx - 1): for k in range(i + 1): spatial_deriv[i] += coeffs[k] * u[i - k] return spatial_deriv / (dx ** beta_val)
[docs] def _grunwald_letnikov_coeffs(self, order: float, n_points: int) -> np.ndarray: """Compute Grünwald-Letnikov coefficients.""" coeffs = np.zeros(n_points, dtype=float) coeffs[0] = 1.0 for k in range(1, n_points): # c_k = (-1)^k * C(order, k) with recursive form coeffs[k] = -coeffs[k - 1] * (order - (k - 1)) / k return coeffs
[docs] def _validate_orders(self, alpha: float, beta: float): """Validate fractional orders for PDE solver.""" if not 0 < alpha <= 2: raise ValueError(f"Temporal order alpha must be in (0, 2], but got {alpha}") if not 0 < beta <= 2: raise ValueError(f"Spatial order beta must be in (0, 2], but got {beta}")
[docs] def _build_spatial_matrix( self, nx: int, beta: Union[float, FractionalOrder], diffusion_coeff: float, dx: float, dt: float, alpha: Union[float, FractionalOrder], side: str = 'lhs', # 'lhs' or 'rhs' ) -> sparse.spmatrix: """ Build spatial discretization matrix. Args: nx: Number of spatial points beta: Spatial fractional order diffusion_coeff: Diffusion coefficient dx: Spatial step size dt: Temporal step size alpha: Temporal fractional order side: 'lhs' for left-hand side of the equation, 'rhs' for right-hand side Returns: Sparse matrix for spatial discretization """ if isinstance(alpha, FractionalOrder): alpha_val = alpha.alpha else: alpha_val = alpha A = self._get_spatial_operator(nx, beta, dx) n_interior = nx - 2 from scipy.special import gamma I = np.eye(n_interior) if 0 < alpha_val < 1: # Implicit Euler for fractional derivative: (I - D*gamma(2-a)dt^a * A) u_n = history factor = diffusion_coeff * gamma(2 - alpha_val) * (dt ** alpha_val) final_matrix = I - factor * A elif 1 < alpha_val < 2: i_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val)) spatial_factor = diffusion_coeff final_matrix = i_factor * I - spatial_factor * A elif alpha_val == 1.0: # Crank-Nicolson: (I - k/2 A) u_n = (I + k/2 A) u_{n-1} k = diffusion_coeff * dt if side == 'lhs': final_matrix = I - (k/2) * A else: # rhs final_matrix = I + (k/2) * A else: # alpha = 2.0 handled explicitly, this is a fallback final_matrix = np.eye(n_interior) return sparse.csr_matrix(final_matrix)
[docs] def _get_spatial_operator( self, nx: int, beta: Union[float, FractionalOrder], dx: float ) -> np.ndarray: """Computes the matrix for the Grünwald-Letnikov spatial derivative.""" if isinstance(beta, FractionalOrder): beta_val = beta.alpha else: beta_val = beta n_interior = nx - 2 A = np.zeros((n_interior, n_interior)) coeffs = self._grunwald_letnikov_coeffs(beta_val, nx) for i in range(n_interior): for j in range(n_interior): k = i - j if 0 <= k < len(coeffs): A[i, j] = coeffs[k] return A / (dx ** beta_val)
[docs] class FractionalAdvectionSolver(FractionalPDESolver): """ Solver for fractional advection-diffusion equations. Solves equations of the form: D_t^α u = v * D_x^β u Currently, only integer orders (alpha=1, beta=1) are supported. """
[docs] def __init__( self, method: str = "finite_difference", spatial_order: int = 2, temporal_order: int = 1, derivative_type: str = "caputo", ): super().__init__("advection", method, spatial_order, temporal_order) self.derivative_type = derivative_type.lower()
[docs] def solve( self, t_span: Tuple[float, float], x_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Dict, alpha: float, beta: float, **kwargs, ) -> Dict[str, np.ndarray]: """Solve the fractional advection equation.""" if not (alpha == 1.0 and beta == 1.0): raise NotImplementedError( "FractionalAdvectionSolver currently only supports integer orders (alpha=1, beta=1)." ) return super().solve( t_span, x_span, initial_condition, boundary_conditions, alpha, beta, **kwargs )
[docs] def _build_spatial_matrix( self, nx: int, beta: Union[float, FractionalOrder], diffusion_coeff: float, dx: float, dt: float, alpha: Union[float, FractionalOrder], side: str = 'lhs', # 'lhs' or 'rhs' ) -> sparse.spmatrix: """ Build spatial discretization matrix. Args: nx: Number of spatial points beta: Spatial fractional order diffusion_coeff: Diffusion coefficient dx: Spatial step size dt: Temporal step size alpha: Temporal fractional order side: 'lhs' for left-hand side of the equation, 'rhs' for right-hand side Returns: Sparse matrix for spatial discretization """ if isinstance(alpha, FractionalOrder): alpha_val = alpha.alpha else: alpha_val = alpha A = self._get_spatial_operator(nx, beta, dx) n_interior = nx - 2 from scipy.special import gamma I = np.eye(n_interior) if 0 < alpha_val < 1: # Implicit Euler for fractional derivative: (I - D*gamma(2-a)dt^a * A) u_n = history factor = diffusion_coeff * gamma(2 - alpha_val) * (dt ** alpha_val) final_matrix = I - factor * A elif 1 < alpha_val < 2: i_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val)) spatial_factor = diffusion_coeff final_matrix = i_factor * I - spatial_factor * A elif alpha_val == 1.0: # Crank-Nicolson: (I - k/2 A) u_n = (I + k/2 A) u_{n-1} k = diffusion_coeff * dt if side == 'lhs': final_matrix = I - (k/2) * A else: # rhs final_matrix = I + (k/2) * A else: # alpha = 2.0 handled explicitly, this is a fallback final_matrix = np.eye(n_interior) return sparse.csr_matrix(final_matrix)
[docs] def _get_spatial_operator( self, nx: int, beta: Union[float, FractionalOrder], dx: float ) -> np.ndarray: """Computes the matrix for the Grünwald-Letnikov spatial derivative.""" if isinstance(beta, FractionalOrder): beta_val = beta.alpha else: beta_val = beta n_interior = nx - 2 A = np.zeros((n_interior, n_interior)) coeffs = self._grunwald_letnikov_coeffs(beta_val, nx) for i in range(n_interior): for j in range(n_interior): k = i - j if 0 <= k < len(coeffs): A[i, j] = coeffs[k] return A / (dx ** beta_val)
[docs] class FractionalReactionDiffusionSolver(FractionalPDESolver): """ Solver for fractional reaction-diffusion equations. Solves equations of the form: ∂^α u/∂t^α = D ∂^β u/∂x^β + R(u) + f(x, t, u) where α and β are fractional orders and R(u) is the reaction term. """
[docs] def __init__( self, method: str = "finite_difference", spatial_order: int = 2, temporal_order: int = 1, derivative_type: str = "caputo", ): """ Initialize fractional reaction-diffusion solver. Args: method: Numerical method spatial_order: Order of spatial discretization temporal_order: Order of temporal discretization derivative_type: Type of fractional derivative """ super().__init__("reaction_diffusion", method, spatial_order, temporal_order) self.derivative_type = derivative_type.lower()
[docs] def solve( self, x_span: Tuple[float, float], t_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Tuple[Callable, Callable], alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], diffusion_coeff: float = 1.0, reaction_term: Optional[Callable] = None, source_term: Optional[Callable] = None, nx: int = 100, nt: int = 100, **kwargs, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Solve fractional reaction-diffusion equation. Args: x_span: Spatial interval (x0, xf) t_span: Time interval (t0, tf) initial_condition: Initial condition u(x, 0) boundary_conditions: Boundary conditions (left_bc, right_bc) alpha: Temporal fractional order beta: Spatial fractional order diffusion_coeff: Diffusion coefficient reaction_term: Reaction term R(u) source_term: Source term f(x, t, u) nx: Number of spatial grid points nt: Number of temporal grid points **kwargs: Additional solver parameters Returns: Tuple of (t_values, x_values, solution) """ # Combine diffusion and reaction terms def combined_source(x, t, u): source = 0.0 if reaction_term is not None: source += reaction_term(u) if source_term is not None: source += source_term(x, t, u) return source # Use diffusion solver with combined source term diffusion_solver = FractionalDiffusionSolver( self.method, self.spatial_order, self.temporal_order, self.derivative_type) return diffusion_solver.solve( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, diffusion_coeff, combined_source, nx, nt, **kwargs, )
# Convenience functions
[docs] def solve_fractional_diffusion( x_span: Tuple[float, float], t_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Tuple[Callable, Callable], alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], diffusion_coeff: float = 1.0, source_term: Optional[Callable] = None, method: str = "finite_difference", nx: int = 100, nt: int = 100, **kwargs, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Solve fractional diffusion equation. Args: x_span: Spatial interval (x0, xf) t_span: Time interval (t0, tf) initial_condition: Initial condition u(x, 0) boundary_conditions: Boundary conditions (left_bc, right_bc) alpha: Temporal fractional order beta: Spatial fractional order diffusion_coeff: Diffusion coefficient source_term: Source term f(x, t, u) method: Numerical method nx: Number of spatial grid points nt: Number of temporal grid points **kwargs: Additional solver parameters Returns: Tuple of (t_values, x_values, solution) """ solver = FractionalDiffusionSolver(method) return solver.solve( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, diffusion_coeff, source_term, nx, nt, **kwargs, )
[docs] def solve_fractional_advection( x_span: Tuple[float, float], t_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Tuple[Callable, Callable], alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], velocity: float = 1.0, source_term: Optional[Callable] = None, method: str = "finite_difference", nx: int = 100, nt: int = 100, **kwargs, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Solve fractional advection equation. Args: x_span: Spatial interval (x0, xf) t_span: Time interval (t0, tf) initial_condition: Initial condition u(x, 0) boundary_conditions: Boundary conditions (left_bc, right_bc) alpha: Temporal fractional order beta: Spatial fractional order velocity: Advection velocity source_term: Source term f(x, t, u) method: Numerical method nx: Number of spatial grid points nt: Number of temporal grid points **kwargs: Additional solver parameters Returns: Tuple of (t_values, x_values, solution) """ solver = FractionalAdvectionSolver(method) return solver.solve( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, velocity, source_term, nx, nt, **kwargs, )
[docs] def solve_fractional_reaction_diffusion( x_span: Tuple[float, float], t_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Tuple[Callable, Callable], alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], diffusion_coeff: float = 1.0, reaction_term: Optional[Callable] = None, source_term: Optional[Callable] = None, method: str = "finite_difference", nx: int = 100, nt: int = 100, **kwargs, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Solve fractional reaction-diffusion equation. Args: x_span: Spatial interval (x0, xf) t_span: Time interval (t0, tf) initial_condition: Initial condition u(x, 0) boundary_conditions: Boundary conditions (left_bc, right_bc) alpha: Temporal fractional order beta: Spatial fractional order diffusion_coeff: Diffusion coefficient reaction_term: Reaction term R(u) source_term: Source term f(x, t, u) method: Numerical method nx: Number of spatial grid points nt: Number of temporal grid points **kwargs: Additional solver parameters Returns: Tuple of (t_values, x_values, solution) """ solver = FractionalReactionDiffusionSolver(method) return solver.solve( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, diffusion_coeff, reaction_term, source_term, nx, nt, **kwargs, )
[docs] def solve_fractional_pde( x_span: Tuple[float, float], t_span: Tuple[float, float], initial_condition: Callable, boundary_conditions: Tuple[Callable, Callable], alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder], equation_type: str = "diffusion", **kwargs, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Generic solver for fractional PDEs. Args: x_span: Spatial interval (x0, xf) t_span: Time interval (t0, tf) initial_condition: Initial condition u(x, 0) boundary_conditions: Boundary conditions (left_bc, right_bc) alpha: Temporal fractional order beta: Spatial fractional order equation_type: Type of PDE ("diffusion", "advection", "reaction_diffusion") **kwargs: Additional solver parameters Returns: Tuple of (t_values, x_values, solution) """ if equation_type == "diffusion": return solve_fractional_diffusion( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, **kwargs ) elif equation_type == "advection": return solve_fractional_advection( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, **kwargs ) elif equation_type == "reaction_diffusion": return solve_fractional_reaction_diffusion( x_span, t_span, initial_condition, boundary_conditions, alpha, beta, **kwargs ) else: raise ValueError(f"Unknown equation type: {equation_type}")