"""
Fractional Partial Differential Equation Solvers
This module provides comprehensive solvers for fractional PDEs including
finite difference methods, spectral methods, and adaptive mesh refinement.
Performance Note (v2.1.0):
- Intelligent backend selection for sparse matrix operations
- Optimal array operations based on problem size
"""
import numpy as np
from typing import Union, Optional, Tuple, Callable, Dict
from scipy import sparse
from scipy.sparse.linalg import spsolve, splu
from scipy.special import gamma
from ..core.definitions import FractionalOrder
# Initialize intelligent backend selector for PDE solvers
_intelligent_selector = None
_use_intelligent_backend = True
[docs]
def _get_intelligent_selector():
"""Get intelligent backend selector instance for PDE solvers."""
global _intelligent_selector, _use_intelligent_backend
if not _use_intelligent_backend:
return None
if _intelligent_selector is None:
try:
from ..ml.intelligent_backend_selector import IntelligentBackendSelector
_intelligent_selector = IntelligentBackendSelector(enable_learning=True)
except ImportError:
_use_intelligent_backend = False
_intelligent_selector = None
return _intelligent_selector
[docs]
def _select_array_backend(data_size: int, operation_type: str = "element_wise") -> str:
"""
Select optimal backend for array operations in PDE solving.
Args:
data_size: Number of elements to process
operation_type: Type of operation (e.g., "element_wise", "matrix_multiply")
Returns:
Backend name: "numpy", "numba", or "jax"
"""
selector = _get_intelligent_selector()
if selector is not None:
try:
from ..ml.intelligent_backend_selector import WorkloadCharacteristics
from ..ml.backends import BackendType
workload = WorkloadCharacteristics(
operation_type=operation_type,
data_size=data_size,
data_shape=(data_size,),
is_iterative=True
)
backend_type = selector.select_backend(workload)
# Map to appropriate backends
if backend_type == BackendType.NUMBA:
return "numba"
elif backend_type == BackendType.JAX:
return "jax"
else:
return "numpy"
except Exception:
pass # Fall through to default
# Default selection based on size
if data_size < 10000:
return "numpy" # Small problems: NumPy is fastest
else:
return "numba" # Large problems: Numba JIT compilation helps
[docs]
class FractionalPDESolver:
"""
Base class for fractional PDE solvers.
Provides common functionality for solving fractional partial
differential equations of various types.
"""
[docs]
def __init__(
self,
pde_type: str = "diffusion",
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
adaptive: bool = False,
*,
fractional_order: Optional[Union[float, FractionalOrder]] = None,
boundary_conditions: Optional[str] = None,
):
"""
Initialize fractional PDE solver.
Args:
pde_type: Type of PDE ("diffusion", "advection", "reaction_diffusion")
method: Numerical method ("finite_difference", "spectral", "finite_element")
spatial_order: Order of spatial discretization
temporal_order: Order of temporal discretization
adaptive: Use adaptive mesh refinement
"""
self.pde_type = pde_type.lower()
self.method = method.lower()
self.spatial_order = spatial_order
self.temporal_order = temporal_order
self.adaptive = adaptive
# Accept alias for tests; stored for compatibility only
self.fractional_order = fractional_order
# Store boundary condition mode for compatibility (e.g., "dirichlet", "periodic")
self.boundary_conditions = boundary_conditions or "dirichlet"
# Validate PDE type
valid_pde_types = ["diffusion", "advection",
"reaction_diffusion", "wave"]
if self.pde_type not in valid_pde_types:
raise ValueError(f"PDE type must be one of {valid_pde_types}")
# Validate method
valid_methods = ["finite_difference", "spectral", "finite_element"]
if self.method not in valid_methods:
raise ValueError(f"Method must be one of {valid_methods}")
[docs]
def _validate_orders(self, alpha: float, beta: float):
"""Validate fractional orders for PDE solver."""
if not 0 < alpha <= 2:
raise ValueError(f"Temporal order alpha must be in (0, 2], but got {alpha}")
if not 0 < beta <= 2:
raise ValueError(f"Spatial order beta must be in (0, 2], but got {beta}")
[docs]
def solve(
self,
t_span: Tuple[float, float],
x_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Dict,
alpha: float,
beta: float,
**kwargs,
) -> Dict[str, np.ndarray]:
"""Solve the fractional PDE."""
self._validate_orders(alpha, beta)
t0, tf = t_span
x_start, x_end = x_span
nt = kwargs.get("nt", 100)
nx = kwargs.get("nx", 100)
t_values = np.linspace(t0, tf, nt)
x_values = np.linspace(x_start, x_end, nx)
dt = t_values[1] - t_values[0]
dx = x_values[1] - x_values[0]
solution = np.zeros((nx, nt))
solution[:, 0] = initial_condition(x_values)
diffusion_coeff = kwargs.get("diffusion_coeff", 1.0)
A = self._build_spatial_matrix(nx, dx, beta, diffusion_coeff)
A_lu = splu(A)
# Precompute L1 weights
j = np.arange(1, nt)
l1_weights = (j + 1)**(1 - alpha) - j**(1 - alpha)
for n in range(nt - 1):
history = np.sum(l1_weights[:n] * np.diff(solution[:, :n+1], axis=1), axis=1)
rhs = solution[:, n] - (dt**alpha / gamma(2 - alpha)) * history
# Add source term if provided
if 'source_term' in kwargs:
source = kwargs['source_term'](x_values, t_values[n+1])
rhs += dt * source
solution[1:-1, n + 1] = A_lu.solve(rhs[1:-1])
# Apply boundary conditions
if 'dirichlet' in boundary_conditions:
solution[0, n + 1] = boundary_conditions['dirichlet'][0](t_values[n+1])
solution[-1, n + 1] = boundary_conditions['dirichlet'][1](t_values[n+1])
return {"t": t_values, "x": x_values, "u": solution}
[docs]
class FractionalDiffusionSolver(FractionalPDESolver):
"""
Solver for fractional diffusion equations.
Solves equations of the form:
∂^α u/∂t^α = D ∂^β u/∂x^β + f(x, t, u)
where α and β are fractional orders.
"""
[docs]
def __init__(
self,
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
derivative_type: str = "caputo",
):
"""
Initialize fractional diffusion solver.
Args:
method: Numerical method
spatial_order: Order of spatial discretization
temporal_order: Order of temporal discretization
derivative_type: Type of fractional derivative
"""
super().__init__("diffusion", method, spatial_order, temporal_order)
self.derivative_type = derivative_type.lower()
[docs]
def solve(
self,
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
source_term: Optional[Callable] = None,
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
source_term: Source term f(x, t, u)
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
self._validate_orders(alpha, beta)
x0, xf = x_span
t0, tf = t_span
# Grids
x_values = np.linspace(x0, xf, nx)
t_values = np.linspace(t0, tf, nt)
dx = x_values[1] - x_values[0]
dt = t_values[1] - t_values[0]
# Solution array in (nx, nt) to reuse base formulations
solution = np.zeros((nx, nt), dtype=float)
solution[:, 0] = np.array([initial_condition(xi) for xi in x_values], dtype=float)
alpha_val = float(alpha.alpha) if hasattr(alpha, 'alpha') else float(alpha)
if self.method == "spectral":
# Periodic spectral implicit step per time level
D = diffusion_coeff
k = 2.0 * np.pi * np.fft.fftfreq(nx, d=dx)
abs_k_beta = np.abs(k) ** (float(beta.alpha) if hasattr(beta, 'alpha') else float(beta))
for n in range(1, nt):
# L1 history accumulation for 0<alpha<1, fallback to CN at alpha=1
if 0.0 < alpha_val < 1.0:
j = np.arange(1, n + 1, dtype=float)
a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val)
hist = np.zeros(nx, dtype=float)
for m in range(1, n + 1):
hist += a[m - 1] * (solution[:, n - m + 1 - 1] if n - m >= 0 else 0.0)
scale = gamma(2.0 - alpha_val) * (dt ** alpha_val)
rhs = hist.copy()
elif alpha_val == 1.0:
# CN-like scaling
rhs = solution[:, n - 1]
scale = dt
else:
# Fallback simple history for 1<alpha<2
j = np.arange(1, n + 1, dtype=float)
c = j**(2.0 - alpha_val) - (j - 1.0)**(2.0 - alpha_val)
hist = np.zeros(nx, dtype=float)
for m in range(1, n + 1):
hist += c[m - 1] * solution[:, n - m]
scale = 1.0 / (gamma(3.0 - alpha_val) * (dt ** alpha_val))
rhs = hist.copy()
if source_term is not None:
s = np.array([source_term(xi, t_values[n], solution[n - 1, ix] if n > 0 else solution[0, ix]) for ix, xi in enumerate(x_values)], dtype=float)
rhs = rhs + scale * s
rhs_hat = np.fft.fft(rhs)
denom = 1.0 + scale * D * abs_k_beta
u_hat = rhs_hat / denom
u_new = np.fft.ifft(u_hat).real
solution[:, n] = u_new
return t_values, x_values, solution.T
# Finite difference implicit solve with L1/CN/L2-1σ variants
A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs')
A_lu = splu(A.tocsc())
if 0.0 < alpha_val < 1.0:
# Precompute L1 weights
j = np.arange(1, nt, dtype=float)
a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val)
scale = dt ** alpha_val / gamma(2.0 - alpha_val)
for n in range(0, nt - 1):
# Build RHS using history of increments
if n >= 1:
# increments of u in time for all past steps 1..n
diffs = np.diff(solution[:, : n + 1], axis=1) # shape (nx, n)
# weight for each past increment Δu^{k} is a_{n-k}
weights = a[:n][::-1] # length n
history = diffs @ weights # (nx,)
else:
history = np.zeros(nx, dtype=float)
rhs_full = solution[:, n] - scale * history
if source_term is not None:
# Source term may accept (x, t) or (x, t, u); support both
try:
s_n = np.array([source_term(xi, t_values[n + 1], solution[ix, n]) for ix, xi in enumerate(x_values)], dtype=float)
except TypeError:
s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values], dtype=float)
rhs_full += dt * s_n
# Dirichlet boundaries enforced after solve; interior system
rhs = rhs_full[1:-1]
sol_interior = A_lu.solve(rhs)
solution[1:-1, n + 1] = sol_interior
# Apply boundary conditions
solution[0, n + 1] = boundary_conditions[0](t_values[n + 1])
solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1])
elif alpha_val == 1.0:
A_lhs = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='lhs')
A_rhs = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='rhs')
A_lu = splu(A_lhs.tocsc())
for n in range(0, nt - 1):
rhs_full = (A_rhs @ solution[1:-1, n])
if source_term is not None:
s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values[1:-1]], dtype=float)
rhs_full += dt * s_n
sol_interior = A_lu.solve(rhs_full)
solution[1:-1, n + 1] = sol_interior
solution[0, n + 1] = boundary_conditions[0](t_values[n + 1])
solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1])
else:
# Basic L2-1σ-like accumulation (smoke path)
j = np.arange(1, nt, dtype=float)
c = j**(2.0 - alpha_val) - (j - 1.0)**(2.0 - alpha_val)
inv_scale = 1.0 / (gamma(3.0 - alpha_val) * (dt ** alpha_val))
for n in range(0, nt - 1):
hist = np.zeros(nx - 2, dtype=float)
for m in range(1, n + 1):
hist += c[m - 1] * solution[1:-1, n + 1 - m]
rhs = inv_scale * hist
if source_term is not None:
s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values[1:-1]], dtype=float)
rhs += dt * s_n
sol_interior = A_lu.solve(rhs)
solution[1:-1, n + 1] = sol_interior
solution[0, n + 1] = boundary_conditions[0](t_values[n + 1])
solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1])
return t_values, x_values, solution.T
[docs]
def _solve_spatial_step(
self,
solution: np.ndarray,
n: int,
x_values: np.ndarray,
t_values: np.ndarray,
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
source_term: Optional[Callable],
nx: int,
nt: int,
pde_params: dict,
) -> np.ndarray:
"""
Solve spatial problem at current time step.
Args:
solution: Solution array
n: Current time step
x_values: Spatial grid
t_values: Time grid
alpha: Temporal fractional order
beta: Spatial fractional order
source_term: Source term
nx: Number of spatial grid points
nt: Number of temporal grid points
pde_params: Dictionary of parameters (diffusion_coeff, dx, dt)
Returns:
Solution at interior points
"""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
dt = t_values[1] - t_values[0]
# Handle explicit scheme for alpha=2.0 (wave equation) separately
if alpha_val == 2.0:
diffusion_coeff = pde_params.get('diffusion_coeff', 1.0)
dx = pde_params.get('dx')
spatial_op = self._get_spatial_operator(nx, beta, dx)
if n == 1: # First step, assuming u_t(x,0)=0
spatial_deriv_term = diffusion_coeff * (spatial_op @ solution[1:-1, 0])
return solution[1:-1, 0] + (dt**2 / 2) * spatial_deriv_term
else: # Subsequent steps
spatial_deriv_term = diffusion_coeff * (spatial_op @ solution[1:-1, n-1])
return 2*solution[1:-1, n-1] - solution[1:-1, n-2] + dt**2 * spatial_deriv_term
if self.method == "finite_difference":
return self._finite_difference_step(
solution, n, x_values, t_values,
alpha, beta, source_term, nx, nt, pde_params
)
elif self.method == "spectral":
return self._spectral_step(
solution, n, x_values, t_values,
alpha, beta, source_term, nx, nt, pde_params
)
else:
raise ValueError(f"Unknown method: {self.method}")
[docs]
def _finite_difference_step(
self, solution: np.ndarray, n: int, x_values: np.ndarray, t_values: np.ndarray,
alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder],
source_term: Optional[Callable], nx: int, nt: int, pde_params: dict,
) -> np.ndarray:
"""
Finite difference step.
Args:
solution: Solution array
n: Current time step
x_values: Spatial grid
t_values: Time grid
alpha: Temporal fractional order
beta: Spatial fractional order
source_term: Source term
nx: Number of spatial grid points
nt: Number of temporal grid points
pde_params: Dictionary of parameters (diffusion_coeff, dx, dt)
Returns:
Solution at interior points
"""
dt = t_values[1] - t_values[0]
dx = x_values[1] - x_values[0]
diffusion_coeff = pde_params.get("diffusion_coeff", 1.0)
# Build the left-hand side matrix A
A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs')
# Build the right-hand side vector b from previous time steps
b = self._compute_temporal_derivative(solution, n, alpha, dt, **pde_params)
# Add source term if provided
if source_term:
source_at_n = np.array(
[source_term(xi, t_values[n]) for xi in x_values[1:-1]]
)
b += source_at_n
# Solve the linear system Au_n = b
u_interior = spsolve(A, b)
return u_interior
[docs]
def _spectral_step(
self,
solution, n, x_values, t_values, alpha, beta, source_term, nx, nt, pde_params
):
"""
Correct, efficient, diagonal spectral solve for periodic BCs.
"""
dx = x_values[1] - x_values[0]
dt = t_values[1] - t_values[0]
D = pde_params.get("diffusion_coeff", 1.0)
alpha_val = float(alpha.alpha) if hasattr(alpha, "alpha") else float(alpha)
beta_val = float(beta.alpha) if hasattr(beta, "alpha") else float(beta)
if not (0.0 < alpha_val < 1.0):
raise NotImplementedError(
f"Spectral PDE scheme is implemented for 0 < α < 1, got α={alpha_val}. "
"For α ≥ 1, consider decomposing into integer and fractional parts, or use "
"finite difference schemes with the L1/L2 methods. For α ≤ 0, the problem "
"is not well-defined for fractional PDEs."
)
u_prev = solution[:, n-1]
# L1 history RHS: sum_{j=1..n} a_j * u^{n-j}
j = np.arange(1, n+1, dtype=float)
a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val)
hist = np.zeros_like(u_prev)
for m in range(1, n+1):
hist += a[m-1] * solution[:, n-m]
scale = gamma(2.0 - alpha_val) * (dt ** alpha_val)
rhs = hist.copy()
if source_term is not None:
s = np.array([source_term(x, t_values[n], u_prev[ix]) for ix, x in enumerate(x_values)])
rhs += scale * s
k = 2.0 * np.pi * np.fft.fftfreq(nx, d=dx)
rhs_hat = np.fft.fft(rhs)
denom = 1.0 + scale * D * (np.abs(k) ** beta_val) # implicit-in-time diagonal
u_hat = rhs_hat / denom
u_new = np.fft.ifft(u_hat).real
return u_new[1:-1] # interior to match caller
[docs]
def _compute_temporal_derivative(
self,
solution: np.ndarray,
n: int,
alpha: Union[float, FractionalOrder],
dt: float,
**kwargs,
) -> np.ndarray:
"""Computes the RHS vector of the system, based on previous time steps."""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
# For implicit schemes, construct the historical part of the temporal derivative
sum_term = np.zeros_like(solution[1:-1, 0])
if 0 < alpha_val < 1:
# L1 scheme coefficients
def l1_coeffs(k):
if k < 0: return 0
return (k + 1)**(1 - alpha_val) - k**(1 - alpha_val)
for j in range(1, n):
sum_term += (l1_coeffs(n - j - 1) - l1_coeffs(n - j)) * solution[1:-1, j]
rhs_temporal = sum_term + l1_coeffs(n - 1) * solution[1:-1, 0]
elif 1 < alpha_val < 2:
from scipy.special import gamma
# L2-type scheme coefficients
def c_coeffs(k):
if k < 0: return 0
return (k + 1)**(2 - alpha_val) - k**(2 - alpha_val)
# Summation part of the history term
for j in range(1, n):
sum_term += (c_coeffs(n - j - 1) - c_coeffs(n - j)) * solution[1:-1, j]
# Add the u_0 term
history_term = sum_term + c_coeffs(n - 1) * solution[1:-1, 0]
# This is only part of the RHS, needs scaling and other terms.
# (u_n - (2u_{n-1} - u_{n-2}) - history) / scale = D*A*u_n
# (1/scale)u_n - D*A*u_n = (1/scale)*(2u_{n-1} - u_{n-2} + history)
scaling_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val))
if n > 1:
rhs_temporal = scaling_factor * (2*solution[1:-1, n-1] - solution[1:-1, n-2] + history_term)
else: # n=1
rhs_temporal = scaling_factor * (2*solution[1:-1, n-1] + history_term)
elif alpha_val == 1.0:
# Crank-Nicolson for RHS
# A_LHS u_n = A_RHS u_{n-1}
# We solve A_LHS u_n = b, so b = A_RHS u_{n-1}
nx = kwargs['nx']
beta = kwargs['beta']
diffusion_coeff = kwargs['diffusion_coeff']
dx = kwargs['dx']
A_RHS = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='rhs')
rhs_temporal = A_RHS @ solution[1:-1, n-1]
else: # Should not happen for alpha > 0
rhs_temporal = np.zeros_like(solution[1:-1, 0])
return rhs_temporal
[docs]
def _compute_spatial_derivative(
self, u: np.ndarray, beta: Union[float, FractionalOrder], dx: float
) -> np.ndarray:
"""
Compute spatial fractional derivative.
Args:
u: Solution at current time
beta: Spatial fractional order
dx: Spatial step size
Returns:
Spatial derivative
"""
if isinstance(beta, FractionalOrder):
beta_val = beta.alpha
else:
beta_val = beta
nx = len(u)
# Use finite difference approximation for spatial derivative
spatial_deriv = np.zeros(nx)
coeffs = self._grunwald_letnikov_coeffs(beta_val, nx)
for i in range(1, nx - 1):
for k in range(i + 1):
spatial_deriv[i] += coeffs[k] * u[i - k]
return spatial_deriv / (dx ** beta_val)
[docs]
def _grunwald_letnikov_coeffs(self, order: float, n_points: int) -> np.ndarray:
"""Compute Grünwald-Letnikov coefficients."""
coeffs = np.zeros(n_points, dtype=float)
coeffs[0] = 1.0
for k in range(1, n_points):
# c_k = (-1)^k * C(order, k) with recursive form
coeffs[k] = -coeffs[k - 1] * (order - (k - 1)) / k
return coeffs
[docs]
def _validate_orders(self, alpha: float, beta: float):
"""Validate fractional orders for PDE solver."""
if not 0 < alpha <= 2:
raise ValueError(f"Temporal order alpha must be in (0, 2], but got {alpha}")
if not 0 < beta <= 2:
raise ValueError(f"Spatial order beta must be in (0, 2], but got {beta}")
[docs]
def _build_spatial_matrix(
self,
nx: int,
beta: Union[float, FractionalOrder],
diffusion_coeff: float,
dx: float,
dt: float,
alpha: Union[float, FractionalOrder],
side: str = 'lhs', # 'lhs' or 'rhs'
) -> sparse.spmatrix:
"""
Build spatial discretization matrix.
Args:
nx: Number of spatial points
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
dx: Spatial step size
dt: Temporal step size
alpha: Temporal fractional order
side: 'lhs' for left-hand side of the equation, 'rhs' for right-hand side
Returns:
Sparse matrix for spatial discretization
"""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
A = self._get_spatial_operator(nx, beta, dx)
n_interior = nx - 2
from scipy.special import gamma
I = np.eye(n_interior)
if 0 < alpha_val < 1:
# Implicit Euler for fractional derivative: (I - D*gamma(2-a)dt^a * A) u_n = history
factor = diffusion_coeff * gamma(2 - alpha_val) * (dt ** alpha_val)
final_matrix = I - factor * A
elif 1 < alpha_val < 2:
i_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val))
spatial_factor = diffusion_coeff
final_matrix = i_factor * I - spatial_factor * A
elif alpha_val == 1.0:
# Crank-Nicolson: (I - k/2 A) u_n = (I + k/2 A) u_{n-1}
k = diffusion_coeff * dt
if side == 'lhs':
final_matrix = I - (k/2) * A
else: # rhs
final_matrix = I + (k/2) * A
else: # alpha = 2.0 handled explicitly, this is a fallback
final_matrix = np.eye(n_interior)
return sparse.csr_matrix(final_matrix)
[docs]
def _get_spatial_operator(
self, nx: int, beta: Union[float, FractionalOrder], dx: float
) -> np.ndarray:
"""Computes the matrix for the Grünwald-Letnikov spatial derivative."""
if isinstance(beta, FractionalOrder):
beta_val = beta.alpha
else:
beta_val = beta
n_interior = nx - 2
A = np.zeros((n_interior, n_interior))
coeffs = self._grunwald_letnikov_coeffs(beta_val, nx)
for i in range(n_interior):
for j in range(n_interior):
k = i - j
if 0 <= k < len(coeffs):
A[i, j] = coeffs[k]
return A / (dx ** beta_val)
[docs]
class FractionalAdvectionSolver(FractionalPDESolver):
"""
Solver for fractional advection-diffusion equations.
Solves equations of the form:
D_t^α u = v * D_x^β u
Currently, only integer orders (alpha=1, beta=1) are supported.
"""
[docs]
def __init__(
self,
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
derivative_type: str = "caputo",
):
super().__init__("advection", method, spatial_order, temporal_order)
self.derivative_type = derivative_type.lower()
[docs]
def solve(
self,
t_span: Tuple[float, float],
x_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Dict,
alpha: float,
beta: float,
**kwargs,
) -> Dict[str, np.ndarray]:
"""Solve the fractional advection equation."""
if not (alpha == 1.0 and beta == 1.0):
raise NotImplementedError(
"FractionalAdvectionSolver currently only supports integer orders (alpha=1, beta=1)."
)
return super().solve(
t_span, x_span, initial_condition, boundary_conditions, alpha, beta, **kwargs
)
[docs]
def _build_spatial_matrix(
self,
nx: int,
beta: Union[float, FractionalOrder],
diffusion_coeff: float,
dx: float,
dt: float,
alpha: Union[float, FractionalOrder],
side: str = 'lhs', # 'lhs' or 'rhs'
) -> sparse.spmatrix:
"""
Build spatial discretization matrix.
Args:
nx: Number of spatial points
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
dx: Spatial step size
dt: Temporal step size
alpha: Temporal fractional order
side: 'lhs' for left-hand side of the equation, 'rhs' for right-hand side
Returns:
Sparse matrix for spatial discretization
"""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
A = self._get_spatial_operator(nx, beta, dx)
n_interior = nx - 2
from scipy.special import gamma
I = np.eye(n_interior)
if 0 < alpha_val < 1:
# Implicit Euler for fractional derivative: (I - D*gamma(2-a)dt^a * A) u_n = history
factor = diffusion_coeff * gamma(2 - alpha_val) * (dt ** alpha_val)
final_matrix = I - factor * A
elif 1 < alpha_val < 2:
i_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val))
spatial_factor = diffusion_coeff
final_matrix = i_factor * I - spatial_factor * A
elif alpha_val == 1.0:
# Crank-Nicolson: (I - k/2 A) u_n = (I + k/2 A) u_{n-1}
k = diffusion_coeff * dt
if side == 'lhs':
final_matrix = I - (k/2) * A
else: # rhs
final_matrix = I + (k/2) * A
else: # alpha = 2.0 handled explicitly, this is a fallback
final_matrix = np.eye(n_interior)
return sparse.csr_matrix(final_matrix)
[docs]
def _get_spatial_operator(
self, nx: int, beta: Union[float, FractionalOrder], dx: float
) -> np.ndarray:
"""Computes the matrix for the Grünwald-Letnikov spatial derivative."""
if isinstance(beta, FractionalOrder):
beta_val = beta.alpha
else:
beta_val = beta
n_interior = nx - 2
A = np.zeros((n_interior, n_interior))
coeffs = self._grunwald_letnikov_coeffs(beta_val, nx)
for i in range(n_interior):
for j in range(n_interior):
k = i - j
if 0 <= k < len(coeffs):
A[i, j] = coeffs[k]
return A / (dx ** beta_val)
[docs]
class FractionalReactionDiffusionSolver(FractionalPDESolver):
"""
Solver for fractional reaction-diffusion equations.
Solves equations of the form:
∂^α u/∂t^α = D ∂^β u/∂x^β + R(u) + f(x, t, u)
where α and β are fractional orders and R(u) is the reaction term.
"""
[docs]
def __init__(
self,
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
derivative_type: str = "caputo",
):
"""
Initialize fractional reaction-diffusion solver.
Args:
method: Numerical method
spatial_order: Order of spatial discretization
temporal_order: Order of temporal discretization
derivative_type: Type of fractional derivative
"""
super().__init__("reaction_diffusion", method, spatial_order, temporal_order)
self.derivative_type = derivative_type.lower()
[docs]
def solve(
self,
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
reaction_term: Optional[Callable] = None,
source_term: Optional[Callable] = None,
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional reaction-diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
reaction_term: Reaction term R(u)
source_term: Source term f(x, t, u)
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
# Combine diffusion and reaction terms
def combined_source(x, t, u):
source = 0.0
if reaction_term is not None:
source += reaction_term(u)
if source_term is not None:
source += source_term(x, t, u)
return source
# Use diffusion solver with combined source term
diffusion_solver = FractionalDiffusionSolver(
self.method, self.spatial_order, self.temporal_order, self.derivative_type)
return diffusion_solver.solve(
x_span,
t_span,
initial_condition,
boundary_conditions,
alpha,
beta,
diffusion_coeff,
combined_source,
nx,
nt,
**kwargs,
)
# Convenience functions
[docs]
def solve_fractional_diffusion(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
source_term: Optional[Callable] = None,
method: str = "finite_difference",
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
source_term: Source term f(x, t, u)
method: Numerical method
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
solver = FractionalDiffusionSolver(method)
return solver.solve(
x_span,
t_span,
initial_condition,
boundary_conditions,
alpha,
beta,
diffusion_coeff,
source_term,
nx,
nt,
**kwargs,
)
[docs]
def solve_fractional_advection(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
velocity: float = 1.0,
source_term: Optional[Callable] = None,
method: str = "finite_difference",
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional advection equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
velocity: Advection velocity
source_term: Source term f(x, t, u)
method: Numerical method
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
solver = FractionalAdvectionSolver(method)
return solver.solve(
x_span,
t_span,
initial_condition,
boundary_conditions,
alpha,
beta,
velocity,
source_term,
nx,
nt,
**kwargs,
)
[docs]
def solve_fractional_reaction_diffusion(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
reaction_term: Optional[Callable] = None,
source_term: Optional[Callable] = None,
method: str = "finite_difference",
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional reaction-diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
reaction_term: Reaction term R(u)
source_term: Source term f(x, t, u)
method: Numerical method
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
solver = FractionalReactionDiffusionSolver(method)
return solver.solve(
x_span,
t_span,
initial_condition,
boundary_conditions,
alpha,
beta,
diffusion_coeff,
reaction_term,
source_term,
nx,
nt,
**kwargs,
)
[docs]
def solve_fractional_pde(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
equation_type: str = "diffusion",
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Generic solver for fractional PDEs.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
equation_type: Type of PDE ("diffusion", "advection", "reaction_diffusion")
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
if equation_type == "diffusion":
return solve_fractional_diffusion(
x_span, t_span, initial_condition, boundary_conditions,
alpha, beta, **kwargs
)
elif equation_type == "advection":
return solve_fractional_advection(
x_span, t_span, initial_condition, boundary_conditions,
alpha, beta, **kwargs
)
elif equation_type == "reaction_diffusion":
return solve_fractional_reaction_diffusion(
x_span, t_span, initial_condition, boundary_conditions,
alpha, beta, **kwargs
)
else:
raise ValueError(f"Unknown equation type: {equation_type}")