"""
Fractional Partial Differential Equation Solvers
This module provides comprehensive solvers for fractional PDEs including
finite difference methods, spectral methods, and adaptive mesh refinement.
Performance Note (v2.1.0):
- Intelligent backend selection for sparse matrix operations
- Optimal array operations based on problem size
- Grünwald–Letnikov interior spatial operator is assembled directly in CSR (no dense fill)
"""
import numpy as np
from typing import Union, Optional, Tuple, Callable, Dict
from scipy import sparse, fft
from scipy.sparse.linalg import spsolve, splu
from scipy.special import gamma
from ..core.definitions import FractionalOrder
# Initialize intelligent backend selector for PDE solvers
_intelligent_selector = None
_use_intelligent_backend = True
[docs]
def _get_intelligent_selector():
"""Get intelligent backend selector instance for PDE solvers."""
global _intelligent_selector, _use_intelligent_backend
if not _use_intelligent_backend:
return None
if _intelligent_selector is None:
try:
from ..ml.intelligent_backend_selector import IntelligentBackendSelector
_intelligent_selector = IntelligentBackendSelector(enable_learning=True)
except ImportError:
_use_intelligent_backend = False
_intelligent_selector = None
return _intelligent_selector
[docs]
def _select_array_backend(data_size: int, operation_type: str = "element_wise") -> str:
"""
Select optimal backend for array operations in PDE solving.
Args:
data_size: Number of elements to process
operation_type: Type of operation (e.g., "element_wise", "matrix_multiply")
Returns:
Backend name: "numpy", "numba", or "jax"
"""
selector = _get_intelligent_selector()
if selector is not None:
try:
from ..ml.intelligent_backend_selector import WorkloadCharacteristics
from ..ml.backends import BackendType
workload = WorkloadCharacteristics(
operation_type=operation_type,
data_size=data_size,
data_shape=(data_size,),
is_iterative=True
)
backend_type = selector.select_backend(workload)
# Map to appropriate backends
if backend_type == BackendType.NUMBA:
return "numba"
elif backend_type == BackendType.JAX:
return "jax"
else:
return "numpy"
except Exception:
pass # Fall through to default
# Default selection based on size
if data_size < 10000:
return "numpy" # Small problems: NumPy is fastest
else:
return "numba" # Large problems: Numba JIT compilation helps
[docs]
def _select_fft_backend(data_size: int) -> str:
"""Select optimal FFT backend."""
selector = _get_intelligent_selector()
if selector is not None:
try:
from ..ml.intelligent_backend_selector import WorkloadCharacteristics
from ..ml.backends import BackendType
workload = WorkloadCharacteristics(
operation_type="fft", data_size=data_size, data_shape=(data_size,), is_iterative=True
)
backend_type = selector.select_backend(workload)
if backend_type == BackendType.JAX: return "jax"
except Exception: pass
return "scipy" if data_size > 1000 else "numpy"
[docs]
def _fft_convolution(coeffs: np.ndarray, values: np.ndarray, axis: int = 0) -> np.ndarray:
"""Fast convolution with backend selection."""
N = coeffs.shape[0]
backend = _select_fft_backend(N * (values.shape[1] if values.ndim == 2 else 1))
if backend == "jax":
try:
import jax.numpy as jnp
from jax.numpy import fft as jax_fft
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
if values.ndim == 1:
coeffs_fft = jax_fft.fft(jnp.array(coeffs), n=size)
values_fft = jax_fft.fft(jnp.array(values), n=size)
return np.array(jax_fft.ifft(coeffs_fft * values_fft).real[:N])
elif values.ndim == 2:
# Handle axis=0 (time) or axis=1 (space)?
# Usually we convolve along time.
# If axis=1, transpose logic needed or supported axis arg.
# JAX FFT supports axis.
coeffs_fft = jax_fft.fft(jnp.array(coeffs), n=size, axis=0 if coeffs.ndim>1 else -1)
# Wait, if coeffs is 1D, fft is 1D.
# We need to broadcast coeffs to values shape effectively?
# Convolution of (N,) and (N, M) along axis 0:
# FFT(coeffs) (N,) * FFT(values) (N, M). Broadcasting works?
# (size,) * (size, M) -> (size, M). Yes.
vals_shape = list(values.shape); vals_shape[axis] = size
values_in = jnp.array(values)
coeffs_in = jnp.array(coeffs)
# Expand coeffs for broadcasting if needed
if axis == 0:
coeffs_fft = jax_fft.fft(coeffs_in, n=size)
coeffs_fft = coeffs_fft[:, None]
values_fft = jax_fft.fft(values_in, n=size, axis=0)
else:
coeffs_fft = jax_fft.fft(coeffs_in, n=size)
coeffs_fft = coeffs_fft[None, :]
values_fft = jax_fft.fft(values_in, n=size, axis=1)
return np.array(jax_fft.ifft(coeffs_fft * values_fft, axis=axis).real) # Slice handled by caller?
# The original implementation returned the sliced output [:N].
except ImportError: pass
# NumPy/SciPy fallback
size = int(2 ** np.ceil(np.log2(2 * N - 1)))
fft_mod = fft if backend == "scipy" else np.fft
if values.ndim == 1:
c_f = fft_mod.fft(coeffs, n=size)
v_f = fft_mod.fft(values, n=size)
return fft_mod.ifft(c_f * v_f).real[:N]
else: # 2D
if axis == 0:
c_f = fft_mod.fft(coeffs, n=size)[:, None]
v_f = fft_mod.fft(values, n=size, axis=0)
return fft_mod.ifft(c_f * v_f, axis=0).real[:N, :]
else:
c_f = fft_mod.fft(coeffs, n=size)[None, :]
v_f = fft_mod.fft(values, n=size, axis=1)
return fft_mod.ifft(c_f * v_f, axis=1).real[:, :N]
[docs]
def _grunwald_letnikov_coeffs_pde(order: float, n_points: int) -> np.ndarray:
"""Grünwald–Letnikov coefficients c_k for spatial discretization (recursive form)."""
coeffs = np.zeros(n_points, dtype=float)
coeffs[0] = 1.0
for k in range(1, n_points):
coeffs[k] = -coeffs[k - 1] * (order - (k - 1)) / k
return coeffs
[docs]
def _grunwald_letnikov_interior_sparse(
nx: int, beta: Union[float, FractionalOrder], dx: float
) -> sparse.csr_matrix:
"""
Sparse interior Grünwald–Letnikov spatial operator (Toeplitz lower triangular).
Interior unknowns correspond to physical indices 1 .. nx-2; entry (i, j)
uses offset k = i - j >= 0 with GL coefficient c_k, scaled by dx**(-beta).
"""
if isinstance(beta, FractionalOrder):
beta_val = float(beta.alpha)
else:
beta_val = float(beta)
n_interior = nx - 2
if n_interior <= 0:
return sparse.csr_matrix((0, 0), dtype=float)
coeffs = _grunwald_letnikov_coeffs_pde(beta_val, nx)
rows: list[int] = []
cols: list[int] = []
data: list[float] = []
scale = dx ** (-beta_val)
for i in range(n_interior):
for k in range(i + 1):
rows.append(i)
cols.append(i - k)
data.append(scale * float(coeffs[k]))
return sparse.csr_matrix((data, (rows, cols)), shape=(n_interior, n_interior))
[docs]
class FractionalPDESolver:
"""
Base class for fractional PDE solvers.
Provides common functionality for solving fractional partial
differential equations of various types.
"""
[docs]
def __init__(
self,
pde_type: str = "diffusion",
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
adaptive: bool = False,
*,
fractional_order: Optional[Union[float, FractionalOrder]] = None,
boundary_conditions: Optional[str] = None,
):
"""
Initialize fractional PDE solver.
Args:
pde_type: Type of PDE ("diffusion", "advection", "reaction_diffusion")
method: Numerical method ("finite_difference", "spectral", "finite_element")
spatial_order: Order of spatial discretization
temporal_order: Order of temporal discretization
adaptive: Use adaptive mesh refinement
"""
self.pde_type = pde_type.lower()
self.method = method.lower()
self.spatial_order = spatial_order
self.temporal_order = temporal_order
self.adaptive = adaptive
# Accept alias for tests; stored for compatibility only
self.fractional_order = fractional_order
# Store boundary condition mode for compatibility (e.g., "dirichlet", "periodic")
self.boundary_conditions = boundary_conditions or "dirichlet"
# Validate PDE type
valid_pde_types = ["diffusion", "advection",
"reaction_diffusion", "wave"]
if self.pde_type not in valid_pde_types:
raise ValueError(f"PDE type must be one of {valid_pde_types}")
# Validate method
valid_methods = ["finite_difference", "spectral", "finite_element"]
if self.method not in valid_methods:
raise ValueError(f"Method must be one of {valid_methods}")
[docs]
def _validate_orders(self, alpha: float, beta: float):
"""Validate fractional orders for PDE solver."""
if not 0 < alpha <= 2:
raise ValueError(f"Temporal order alpha must be in (0, 2], but got {alpha}")
if not 0 < beta <= 2:
raise ValueError(f"Spatial order beta must be in (0, 2], but got {beta}")
[docs]
def _grunwald_letnikov_coeffs(self, order: float, n_points: int) -> np.ndarray:
"""Compute Grünwald–Letnikov coefficients (shared by FD spatial operators)."""
return _grunwald_letnikov_coeffs_pde(order, n_points)
[docs]
def _get_spatial_operator(
self, nx: int, beta: Union[float, FractionalOrder], dx: float
) -> sparse.csr_matrix:
"""Interior Grünwald–Letnikov operator as CSR (no dense fill)."""
return _grunwald_letnikov_interior_sparse(nx, beta, dx)
[docs]
def _build_spatial_matrix(
self,
nx: int,
beta: Union[float, FractionalOrder],
diffusion_coeff: float,
dx: float,
dt: float,
alpha: Union[float, FractionalOrder],
side: str = "lhs",
) -> sparse.spmatrix:
"""
Build spatial discretization matrix for interior unknowns (sparse assembly).
Combines identity and scaled Grünwald–Letnikov operator without forming a
dense intermediate.
"""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
A = self._get_spatial_operator(nx, beta, dx)
n_interior = nx - 2
I = sparse.identity(n_interior, format="csr", dtype=float)
if 0 < alpha_val < 1:
factor = diffusion_coeff * gamma(2 - alpha_val) * (dt ** alpha_val)
final_matrix = I - factor * A
elif 1 < alpha_val < 2:
i_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val))
spatial_factor = diffusion_coeff
final_matrix = i_factor * I - spatial_factor * A
elif alpha_val == 1.0:
k = diffusion_coeff * dt
if side == "lhs":
final_matrix = I - (k / 2) * A
else:
final_matrix = I + (k / 2) * A
else:
final_matrix = I
return final_matrix.tocsr()
[docs]
def solve(
self,
t_span: Tuple[float, float],
x_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Dict,
alpha: float,
beta: float,
**kwargs,
) -> Dict[str, np.ndarray]:
"""Solve the fractional PDE."""
self._validate_orders(alpha, beta)
t0, tf = t_span
x_start, x_end = x_span
nt = kwargs.get("nt", 100)
nx = kwargs.get("nx", 100)
t_values = np.linspace(t0, tf, nt)
x_values = np.linspace(x_start, x_end, nx)
dt = t_values[1] - t_values[0]
dx = x_values[1] - x_values[0]
solution = np.zeros((nx, nt))
solution[:, 0] = initial_condition(x_values)
diffusion_coeff = kwargs.get("diffusion_coeff", 1.0)
A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs')
A_lu = splu(A.tocsc())
# Precompute L1 weights
j = np.arange(1, nt)
l1_weights = (j + 1)**(1 - alpha) - j**(1 - alpha)
for n in range(nt - 1):
history = np.sum(l1_weights[:n] * np.diff(solution[:, :n+1], axis=1), axis=1)
rhs = solution[:, n] - (dt**alpha / gamma(2 - alpha)) * history
# Add source term if provided and callable
source_term = kwargs.get("source_term")
if callable(source_term):
source = source_term(x_values, t_values[n + 1])
rhs += dt * source
solution[1:-1, n + 1] = A_lu.solve(rhs[1:-1])
# Apply boundary conditions
if 'dirichlet' in boundary_conditions:
solution[0, n + 1] = boundary_conditions['dirichlet'][0](t_values[n+1])
solution[-1, n + 1] = boundary_conditions['dirichlet'][1](t_values[n+1])
return {"t": t_values, "x": x_values, "u": solution}
[docs]
class FractionalDiffusionSolver(FractionalPDESolver):
"""
Solver for fractional diffusion equations.
Solves equations of the form:
∂^α u/∂t^α = D ∂^β u/∂x^β + f(x, t, u)
where α and β are fractional orders.
"""
[docs]
def __init__(
self,
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
derivative_type: str = "caputo",
):
"""
Initialize fractional diffusion solver.
Args:
method: Numerical method
spatial_order: Order of spatial discretization
temporal_order: Order of temporal discretization
derivative_type: Type of fractional derivative
"""
super().__init__("diffusion", method, spatial_order, temporal_order)
self.derivative_type = derivative_type.lower()
[docs]
def solve(
self,
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
source_term: Optional[Callable] = None,
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
source_term: Source term f(x, t, u)
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
self._validate_orders(alpha, beta)
x0, xf = x_span
t0, tf = t_span
# Grids
x_values = np.linspace(x0, xf, nx)
t_values = np.linspace(t0, tf, nt)
dx = x_values[1] - x_values[0]
dt = t_values[1] - t_values[0]
# Solution array in (nx, nt) to reuse base formulations
solution = np.zeros((nx, nt), dtype=float)
solution[:, 0] = np.array([initial_condition(xi) for xi in x_values], dtype=float)
alpha_val = float(alpha.alpha) if hasattr(alpha, 'alpha') else float(alpha)
if self.method == "spectral":
# Periodic spectral implicit step per time level
D = diffusion_coeff
k = 2.0 * np.pi * np.fft.fftfreq(nx, d=dx)
abs_k_beta = np.abs(k) ** (float(beta.alpha) if hasattr(beta, 'alpha') else float(beta))
for n in range(1, nt):
# L1 history accumulation for 0<alpha<1, fallback to CN at alpha=1
# L1 history accumulation for 0<alpha<1
# We need sum_{m=1}^n a[m-1] * solution[:, n-m]
# Let k = m-1, sum_{k=0}^{n-1} a[k] * solution[:, n-1-k]
# This is convolution (a * solution) at index n-1
if 0.0 < alpha_val < 1.0:
j = np.arange(1, n + 1, dtype=float)
a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val)
# Vectorized history calculation
# solution[:, :n] is (nx, n)
# a is (n,)
# We want dot(solution[:, :n], a[::-1]) -> (nx,)
if n < 64:
hist = solution[:, :n] @ a[::-1]
else:
# Use FFT convolution along axis 1 (time) of solution
# solution is (nx, nt), we slice (nx, n)
# We need convolution along axis 1.
# _fft_convolution expects values, coeffs.
# We pass values=solution[:, :n], coeffs=a, axis=1
# Result shape (nx, n). We take slice [:, n-1] (last element)
full_conv = _fft_convolution(a, solution[:, :n], axis=1)
hist = full_conv[:, -1]
scale = gamma(2.0 - alpha_val) * (dt ** alpha_val)
rhs = hist.copy()
elif alpha_val == 1.0:
rhs = solution[:, n - 1]
scale = dt
else:
# Fallback simple history for 1<alpha<2
j = np.arange(1, n + 1, dtype=float)
c = j**(2.0 - alpha_val) - (j - 1.0)**(2.0 - alpha_val)
if n < 64:
hist = solution[:, :n] @ c[::-1]
else:
full_conv = _fft_convolution(c, solution[:, :n], axis=1)
hist = full_conv[:, -1]
scale = 1.0 / (gamma(3.0 - alpha_val) * (dt ** alpha_val))
rhs = hist.copy()
if source_term is not None:
s = np.array(
[
source_term(
xi,
t_values[n],
solution[ix, n - 1] if n > 0 else solution[ix, 0],
)
for ix, xi in enumerate(x_values)
],
dtype=float,
)
rhs = rhs + scale * s
rhs_hat = np.fft.fft(rhs)
denom = 1.0 + scale * D * abs_k_beta
u_hat = rhs_hat / denom
u_new = np.fft.ifft(u_hat).real
solution[:, n] = u_new
return t_values, x_values, solution.T
# Finite difference implicit solve with L1/CN/L2-1σ variants
A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs')
A_lu = splu(A.tocsc())
if 0.0 < alpha_val < 1.0:
# Precompute L1 weights
j = np.arange(1, nt, dtype=float)
a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val)
scale = dt ** alpha_val / gamma(2.0 - alpha_val)
for n in range(0, nt - 1):
# Build RHS using history of increments
if n >= 1:
# increments of u in time for all past steps 1..n
diffs = np.diff(solution[:, : n + 1], axis=1) # shape (nx, n)
# weight for each past increment Δu^{k} is a_{n-k}
weights = a[:n][::-1] # length n
history = diffs @ weights # (nx,)
else:
history = np.zeros(nx, dtype=float)
rhs_full = solution[:, n] - scale * history
if source_term is not None:
# Source term may accept (x, t) or (x, t, u); support both
try:
s_n = np.array([source_term(xi, t_values[n + 1], solution[ix, n]) for ix, xi in enumerate(x_values)], dtype=float)
except TypeError:
s_n = np.array([source_term(xi, t_values[n + 1]) for xi in x_values], dtype=float)
rhs_full += dt * s_n
# Dirichlet boundaries enforced after solve; interior system
rhs = rhs_full[1:-1]
sol_interior = A_lu.solve(rhs)
solution[1:-1, n + 1] = sol_interior
# Apply boundary conditions
solution[0, n + 1] = boundary_conditions[0](t_values[n + 1])
solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1])
elif alpha_val == 1.0:
A_lhs = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='lhs')
A_rhs = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='rhs')
A_lu = splu(A_lhs.tocsc())
for n in range(0, nt - 1):
rhs_full = (A_rhs @ solution[1:-1, n])
if source_term is not None:
try:
s_n = np.array(
[
source_term(xi, t_values[n + 1], solution[ix + 1, n])
for ix, xi in enumerate(x_values[1:-1])
],
dtype=float,
)
except TypeError:
s_n = np.array(
[source_term(xi, t_values[n + 1]) for xi in x_values[1:-1]],
dtype=float,
)
rhs_full += dt * s_n
sol_interior = A_lu.solve(rhs_full)
solution[1:-1, n + 1] = sol_interior
solution[0, n + 1] = boundary_conditions[0](t_values[n + 1])
solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1])
else:
# Basic L2-1σ-like accumulation (smoke path)
j = np.arange(1, nt, dtype=float)
c = j**(2.0 - alpha_val) - (j - 1.0)**(2.0 - alpha_val)
inv_scale = 1.0 / (gamma(3.0 - alpha_val) * (dt ** alpha_val))
for n in range(0, nt - 1):
hist = np.zeros(nx - 2, dtype=float)
for m in range(1, n + 1):
hist += c[m - 1] * solution[1:-1, n + 1 - m]
rhs = inv_scale * hist
if source_term is not None:
try:
s_n = np.array(
[
source_term(xi, t_values[n + 1], solution[ix + 1, n])
for ix, xi in enumerate(x_values[1:-1])
],
dtype=float,
)
except TypeError:
s_n = np.array(
[source_term(xi, t_values[n + 1]) for xi in x_values[1:-1]],
dtype=float,
)
rhs += dt * s_n
sol_interior = A_lu.solve(rhs)
solution[1:-1, n + 1] = sol_interior
solution[0, n + 1] = boundary_conditions[0](t_values[n + 1])
solution[-1, n + 1] = boundary_conditions[1](t_values[n + 1])
return t_values, x_values, solution.T
[docs]
def _solve_spatial_step(
self,
solution: np.ndarray,
n: int,
x_values: np.ndarray,
t_values: np.ndarray,
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
source_term: Optional[Callable],
nx: int,
nt: int,
pde_params: dict,
) -> np.ndarray:
"""
Solve spatial problem at current time step.
Args:
solution: Solution array
n: Current time step
x_values: Spatial grid
t_values: Time grid
alpha: Temporal fractional order
beta: Spatial fractional order
source_term: Source term
nx: Number of spatial grid points
nt: Number of temporal grid points
pde_params: Dictionary of parameters (diffusion_coeff, dx, dt)
Returns:
Solution at interior points
"""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
dt = t_values[1] - t_values[0]
# Handle explicit scheme for alpha=2.0 (wave equation) separately
if alpha_val == 2.0:
diffusion_coeff = pde_params.get('diffusion_coeff', 1.0)
dx = pde_params.get('dx')
spatial_op = self._get_spatial_operator(nx, beta, dx)
if n == 1: # First step, assuming u_t(x,0)=0
spatial_deriv_term = diffusion_coeff * (spatial_op @ solution[1:-1, 0])
return solution[1:-1, 0] + (dt**2 / 2) * spatial_deriv_term
else: # Subsequent steps
spatial_deriv_term = diffusion_coeff * (spatial_op @ solution[1:-1, n-1])
return 2*solution[1:-1, n-1] - solution[1:-1, n-2] + dt**2 * spatial_deriv_term
if self.method == "finite_difference":
return self._finite_difference_step(
solution, n, x_values, t_values,
alpha, beta, source_term, nx, nt, pde_params
)
elif self.method == "spectral":
return self._spectral_step(
solution, n, x_values, t_values,
alpha, beta, source_term, nx, nt, pde_params
)
else:
raise ValueError(f"Unknown method: {self.method}")
[docs]
def _finite_difference_step(
self, solution: np.ndarray, n: int, x_values: np.ndarray, t_values: np.ndarray,
alpha: Union[float, FractionalOrder], beta: Union[float, FractionalOrder],
source_term: Optional[Callable], nx: int, nt: int, pde_params: dict,
) -> np.ndarray:
"""
Finite difference step.
Args:
solution: Solution array
n: Current time step
x_values: Spatial grid
t_values: Time grid
alpha: Temporal fractional order
beta: Spatial fractional order
source_term: Source term
nx: Number of spatial grid points
nt: Number of temporal grid points
pde_params: Dictionary of parameters (diffusion_coeff, dx, dt)
Returns:
Solution at interior points
"""
dt = t_values[1] - t_values[0]
dx = x_values[1] - x_values[0]
diffusion_coeff = pde_params.get("diffusion_coeff", 1.0)
# Build the left-hand side matrix A
A = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha, side='lhs')
# Build the right-hand side vector b from previous time steps
b = self._compute_temporal_derivative(solution, n, alpha, dt, **pde_params)
# Add source term if provided
if source_term:
source_at_n = np.array(
[source_term(xi, t_values[n]) for xi in x_values[1:-1]]
)
b += source_at_n
# Solve the linear system Au_n = b
u_interior = spsolve(A, b)
return u_interior
[docs]
def _spectral_step(
self,
solution, n, x_values, t_values, alpha, beta, source_term, nx, nt, pde_params
):
"""
Correct, efficient, diagonal spectral solve for periodic BCs.
"""
dx = x_values[1] - x_values[0]
dt = t_values[1] - t_values[0]
D = pde_params.get("diffusion_coeff", 1.0)
alpha_val = float(alpha.alpha) if hasattr(alpha, "alpha") else float(alpha)
beta_val = float(beta.alpha) if hasattr(beta, "alpha") else float(beta)
if not (0.0 < alpha_val < 1.0):
raise NotImplementedError(
f"Spectral PDE scheme is implemented for 0 < α < 1, got α={alpha_val}. "
"For α ≥ 1, consider decomposing into integer and fractional parts, or use "
"finite difference schemes with the L1/L2 methods. For α ≤ 0, the problem "
"is not well-defined for fractional PDEs."
)
u_prev = solution[:, n-1]
# L1 history RHS: sum_{j=1..n} a_j * u^{n-j}
j = np.arange(1, n+1, dtype=float)
a = j**(1.0 - alpha_val) - (j - 1.0)**(1.0 - alpha_val)
hist = np.zeros_like(u_prev)
for m in range(1, n+1):
hist += a[m-1] * solution[:, n-m]
scale = gamma(2.0 - alpha_val) * (dt ** alpha_val)
rhs = hist.copy()
if source_term is not None:
s = np.array([source_term(x, t_values[n], u_prev[ix]) for ix, x in enumerate(x_values)])
rhs += scale * s
k = 2.0 * np.pi * np.fft.fftfreq(nx, d=dx)
rhs_hat = np.fft.fft(rhs)
denom = 1.0 + scale * D * (np.abs(k) ** beta_val) # implicit-in-time diagonal
u_hat = rhs_hat / denom
u_new = np.fft.ifft(u_hat).real
return u_new[1:-1] # interior to match caller
[docs]
def _compute_temporal_derivative(
self,
solution: np.ndarray,
n: int,
alpha: Union[float, FractionalOrder],
dt: float,
**kwargs,
) -> np.ndarray:
"""Computes the RHS vector of the system, based on previous time steps."""
if isinstance(alpha, FractionalOrder):
alpha_val = alpha.alpha
else:
alpha_val = alpha
# For implicit schemes, construct the historical part of the temporal derivative
sum_term = np.zeros_like(solution[1:-1, 0])
if 0 < alpha_val < 1:
# L1 scheme coefficients
def l1_coeffs(k):
if k < 0: return 0
return (k + 1)**(1 - alpha_val) - k**(1 - alpha_val)
for j in range(1, n):
sum_term += (l1_coeffs(n - j - 1) - l1_coeffs(n - j)) * solution[1:-1, j]
rhs_temporal = sum_term + l1_coeffs(n - 1) * solution[1:-1, 0]
elif 1 < alpha_val < 2:
from scipy.special import gamma
# L2-type scheme coefficients
def c_coeffs(k):
if k < 0: return 0
return (k + 1)**(2 - alpha_val) - k**(2 - alpha_val)
# Summation part of the history term
for j in range(1, n):
sum_term += (c_coeffs(n - j - 1) - c_coeffs(n - j)) * solution[1:-1, j]
# Add the u_0 term
history_term = sum_term + c_coeffs(n - 1) * solution[1:-1, 0]
# This is only part of the RHS, needs scaling and other terms.
# (u_n - (2u_{n-1} - u_{n-2}) - history) / scale = D*A*u_n
# (1/scale)u_n - D*A*u_n = (1/scale)*(2u_{n-1} - u_{n-2} + history)
scaling_factor = 1 / (gamma(3 - alpha_val) * (dt ** alpha_val))
if n > 1:
rhs_temporal = scaling_factor * (2*solution[1:-1, n-1] - solution[1:-1, n-2] + history_term)
else: # n=1
rhs_temporal = scaling_factor * (2*solution[1:-1, n-1] + history_term)
elif alpha_val == 1.0:
# Crank-Nicolson for RHS
# A_LHS u_n = A_RHS u_{n-1}
# We solve A_LHS u_n = b, so b = A_RHS u_{n-1}
nx = kwargs['nx']
beta = kwargs['beta']
diffusion_coeff = kwargs['diffusion_coeff']
dx = kwargs['dx']
A_RHS = self._build_spatial_matrix(nx, beta, diffusion_coeff, dx, dt, alpha_val, side='rhs')
rhs_temporal = A_RHS @ solution[1:-1, n-1]
else: # Should not happen for alpha > 0
rhs_temporal = np.zeros_like(solution[1:-1, 0])
return rhs_temporal
[docs]
def _compute_spatial_derivative(
self, u: np.ndarray, beta: Union[float, FractionalOrder], dx: float
) -> np.ndarray:
"""
Compute spatial fractional derivative.
Args:
u: Solution at current time
beta: Spatial fractional order
dx: Spatial step size
Returns:
Spatial derivative
"""
if isinstance(beta, FractionalOrder):
beta_val = beta.alpha
else:
beta_val = beta
nx = len(u)
# Use finite difference approximation for spatial derivative
spatial_deriv = np.zeros(nx)
coeffs = self._grunwald_letnikov_coeffs(beta_val, nx)
for i in range(1, nx - 1):
for k in range(i + 1):
spatial_deriv[i] += coeffs[k] * u[i - k]
return spatial_deriv / (dx ** beta_val)
[docs]
class FractionalAdvectionSolver(FractionalPDESolver):
"""
Solver for fractional advection-diffusion equations.
Solves equations of the form:
D_t^α u = v * D_x^β u
Currently, only integer orders (alpha=1, beta=1) are supported.
"""
[docs]
def __init__(
self,
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
derivative_type: str = "caputo",
):
super().__init__("advection", method, spatial_order, temporal_order)
self.derivative_type = derivative_type.lower()
[docs]
def solve(
self,
t_span: Tuple[float, float],
x_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Dict,
alpha: float,
beta: float,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Solve the fractional advection equation."""
if not (alpha == 1.0 and beta == 1.0):
raise NotImplementedError(
"FractionalAdvectionSolver currently only supports integer orders (alpha=1, beta=1)."
)
result = super().solve(
t_span, x_span, initial_condition, boundary_conditions, alpha, beta, **kwargs
)
# Convert dictionary return to tuple format expected by tests
if isinstance(result, dict):
return result["t"], result["x"], result["u"].T
else:
return result
[docs]
class FractionalReactionDiffusionSolver(FractionalPDESolver):
"""
Solver for fractional reaction-diffusion equations.
Solves equations of the form:
∂^α u/∂t^α = D ∂^β u/∂x^β + R(u) + f(x, t, u)
where α and β are fractional orders and R(u) is the reaction term.
"""
[docs]
def __init__(
self,
method: str = "finite_difference",
spatial_order: int = 2,
temporal_order: int = 1,
derivative_type: str = "caputo",
):
"""
Initialize fractional reaction-diffusion solver.
Args:
method: Numerical method
spatial_order: Order of spatial discretization
temporal_order: Order of temporal discretization
derivative_type: Type of fractional derivative
"""
super().__init__("reaction_diffusion", method, spatial_order, temporal_order)
self.derivative_type = derivative_type.lower()
[docs]
def solve(
self,
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
reaction_term: Optional[Callable] = None,
source_term: Optional[Callable] = None,
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional reaction-diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
reaction_term: Reaction term R(u)
source_term: Source term f(x, t, u)
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
# Combine diffusion and reaction terms
def combined_source(x, t, u):
source = 0.0
if reaction_term is not None:
source += reaction_term(u)
if source_term is not None:
source += source_term(x, t, u)
return source
# Use diffusion solver with combined source term
diffusion_solver = FractionalDiffusionSolver(
self.method, self.spatial_order, self.temporal_order, self.derivative_type)
return diffusion_solver.solve(
x_span,
t_span,
initial_condition,
boundary_conditions,
alpha,
beta,
diffusion_coeff,
combined_source,
nx,
nt,
**kwargs,
)
# Convenience functions
[docs]
def solve_fractional_diffusion(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
source_term: Optional[Callable] = None,
method: str = "finite_difference",
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
source_term: Source term f(x, t, u)
method: Numerical method
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
solver = FractionalDiffusionSolver(method)
return solver.solve(
x_span=x_span,
t_span=t_span,
initial_condition=initial_condition,
boundary_conditions=boundary_conditions,
alpha=alpha,
beta=beta,
diffusion_coeff=diffusion_coeff,
source_term=source_term,
nx=nx,
nt=nt,
**kwargs,
)
[docs]
def solve_fractional_advection(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
velocity: float = 1.0,
source_term: Optional[Callable] = None,
method: str = "finite_difference",
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional advection equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
velocity: Advection velocity
source_term: Source term f(x, t, u)
method: Numerical method
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
solver = FractionalAdvectionSolver(method)
kwargs = dict(kwargs)
kwargs.setdefault("velocity", velocity)
kwargs.setdefault("source_term", source_term)
return solver.solve(
t_span,
x_span,
initial_condition,
boundary_conditions=boundary_conditions,
alpha=alpha,
beta=beta,
nx=nx,
nt=nt,
**kwargs,
)
[docs]
def solve_fractional_reaction_diffusion(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
diffusion_coeff: float = 1.0,
reaction_term: Optional[Callable] = None,
source_term: Optional[Callable] = None,
method: str = "finite_difference",
nx: int = 100,
nt: int = 100,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Solve fractional reaction-diffusion equation.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
diffusion_coeff: Diffusion coefficient
reaction_term: Reaction term R(u)
source_term: Source term f(x, t, u)
method: Numerical method
nx: Number of spatial grid points
nt: Number of temporal grid points
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
solver = FractionalReactionDiffusionSolver(method)
return solver.solve(
x_span,
t_span,
initial_condition,
boundary_conditions,
alpha,
beta,
diffusion_coeff,
reaction_term,
source_term,
nx,
nt,
**kwargs,
)
[docs]
def solve_fractional_pde(
x_span: Tuple[float, float],
t_span: Tuple[float, float],
initial_condition: Callable,
boundary_conditions: Tuple[Callable, Callable],
alpha: Union[float, FractionalOrder],
beta: Union[float, FractionalOrder],
equation_type: str = "diffusion",
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Generic solver for fractional PDEs.
Args:
x_span: Spatial interval (x0, xf)
t_span: Time interval (t0, tf)
initial_condition: Initial condition u(x, 0)
boundary_conditions: Boundary conditions (left_bc, right_bc)
alpha: Temporal fractional order
beta: Spatial fractional order
equation_type: Type of PDE ("diffusion", "advection", "reaction_diffusion")
**kwargs: Additional solver parameters
Returns:
Tuple of (t_values, x_values, solution)
"""
if equation_type == "diffusion":
return solve_fractional_diffusion(
x_span, t_span, initial_condition, boundary_conditions,
alpha, beta, **kwargs
)
elif equation_type == "advection":
return solve_fractional_advection(
x_span, t_span, initial_condition, boundary_conditions,
alpha, beta, **kwargs
)
elif equation_type == "reaction_diffusion":
return solve_fractional_reaction_diffusion(
x_span, t_span, initial_condition, boundary_conditions,
alpha, beta, **kwargs
)
else:
raise ValueError(f"Unknown equation type: {equation_type}")