"""
Core Machine Learning Components for Fractional Calculus
This module provides the foundational ML classes that integrate fractional calculus
with neural networks, attention mechanisms, loss functions, and AutoML capabilities.
"""
import warnings
import numpy as np
from typing import Dict, List, Tuple, Optional, Any, Sequence, Union
from dataclasses import dataclass
from abc import abstractmethod
import json
from pathlib import Path
from hpfracc.core.definitions import FractionalOrder
from hpfracc.algorithms.derivatives import (
RiemannLiouville,
Caputo,
GrunwaldLetnikov,
)
from hpfracc.ml.backends import get_backend_manager, BackendType
from hpfracc.ml.fractional_derivative_native import (
_t_grid_and_h,
fractional_feature_map_native,
)
from hpfracc.ml.tensor_ops import get_tensor_ops
[docs]
@dataclass
class MLConfig:
"""Configuration for ML components"""
device: str = "cpu"
dtype: str = "float32"
fractional_order: float = 0.5
use_gpu: bool = False
batch_size: int = 32
learning_rate: float = 0.001
max_epochs: int = 100
validation_split: float = 0.2
early_stopping_patience: int = 10
model_save_path: str = "models/"
log_interval: int = 10
backend: BackendType = BackendType.AUTO
[docs]
class FractionalNeuralNetwork:
"""
Neural network with fractional calculus integration.
**Tensor layout.** The first linear layer expects the same shape you pass to
``forward`` (typically ``(batch, input_size)``). The fractional pre-processing
step applies a **discrete** fractional operator along ``fractional_axis``
(default ``-1``, the feature / time-sample axis). Use ``fractional_axis=0`` only
if your **rows** are samples along an index (unusual for ``(batch, features)``).
**Grid.** Native GL / Caputo L1 use a **uniform** step ``h``. Provide
``fractional_step`` (scalar ``h``), or ``fractional_t_grid`` (length matching the
sampled axis; must be strictly increasing and evenly spaced), or omit both for
the default grid ``t = linspace(0, 1, n)`` with ``h = 1/(n-1)``. Non-uniform
grids are rejected in the native path; use ``differentiable_fractional=False``
for the legacy NumPy operator (still uniform ``t`` implied the same way).
Supports PyTorch, JAX, and NUMBA backends.
"""
[docs]
def __init__(
self,
input_size: int,
hidden_sizes: List[int],
output_size: int,
fractional_order: float = 0.5,
activation: str = "relu",
dropout: float = 0.1,
config: Optional[MLConfig] = None,
backend: Optional[BackendType] = None,
differentiable_fractional: bool = True,
fractional_axis: int = -1,
fractional_step: Optional[float] = None,
fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None,
):
self.config = config or MLConfig()
self.fractional_order = FractionalOrder(fractional_order)
self.input_size = input_size
self.hidden_sizes = hidden_sizes
self.output_size = output_size
self.activation_name = activation
self.dropout_rate = dropout
# Set backend
# Resolve backend; treat AUTO as active backend
resolved_backend = backend or self.config.backend or get_backend_manager().active_backend
if resolved_backend == BackendType.AUTO:
resolved_backend = get_backend_manager().active_backend
self.backend = resolved_backend
self.tensor_ops = get_tensor_ops(self.backend)
self.differentiable_fractional = bool(differentiable_fractional)
self.fractional_axis = int(fractional_axis)
self.fractional_step = fractional_step
self.fractional_t_grid = fractional_t_grid
if (
fractional_step is not None
and fractional_t_grid is not None
):
raise ValueError(
"Pass at most one of fractional_step and fractional_t_grid."
)
# Initialize fractional derivative calculators
self.rl_calculator = RiemannLiouville(fractional_order)
self.caputo_calculator = Caputo(fractional_order)
self.gl_calculator = GrunwaldLetnikov(fractional_order)
# Build network layers
self.layers = []
self._build_network()
# Initialize weights
self._initialize_weights()
# Training state
self.training = True
[docs]
def train(self, mode: bool = True):
"""Set training mode"""
self.training = mode
[docs]
def eval(self):
"""Set evaluation mode"""
self.train(False)
[docs]
def parameters(self) -> List[Any]:
"""Return list of learnable parameters for compatibility with optimizers/tests"""
params: List[Any] = []
params.extend(self.weights)
params.extend(self.biases)
return params
[docs]
def _build_network(self):
"""Build the network architecture using the current backend"""
# Input layer
self.layers.append({
'type': 'linear',
'in_features': self.input_size,
'out_features': self.hidden_sizes[0]
})
# Hidden layers
for i in range(len(self.hidden_sizes) - 1):
self.layers.append({
'type': 'linear',
'in_features': self.hidden_sizes[i],
'out_features': self.hidden_sizes[i + 1]
})
# Output layer
self.layers.append({
'type': 'linear',
'in_features': self.hidden_sizes[-1],
'out_features': self.output_size
})
# Initialize weights and biases for each layer
self.weights = []
self.biases = []
for layer in self.layers:
if layer['type'] == 'linear':
# Initialize weights with proper random data
if self.backend == BackendType.TORCH:
import torch
weight = torch.randn(
layer['in_features'],
layer['out_features'],
dtype=torch.float32,
requires_grad=True)
bias = torch.zeros(
layer['out_features'],
dtype=torch.float32,
requires_grad=True)
elif self.backend == BackendType.JAX:
import jax.random as random
import jax.numpy as jnp
key = random.PRNGKey(0)
weight = random.normal(
key, (layer['in_features'], layer['out_features']))
bias = jnp.zeros(layer['out_features'])
else: # NUMBA
import numpy as np
weight = np.random.randn(
layer['in_features'], layer['out_features'])
bias = np.zeros(layer['out_features'])
self.weights.append(weight)
self.biases.append(bias)
[docs]
def _initialize_weights(self):
"""Initialize network weights using Xavier initialization"""
for i, (weight, bias) in enumerate(zip(self.weights, self.biases)):
if self.backend == BackendType.TORCH:
import torch.nn.init as init
init.xavier_uniform_(weight)
if bias is not None:
init.zeros_(bias)
else:
# Standard Xavier initialization for JAX/Numba
fan_in = weight.shape[0]
fan_out = weight.shape[1]
# Use tensor_ops to avoid manual math imports if desired,
# but standard numpy/math is fine for init
limit = np.sqrt(6 / (fan_in + fan_out))
if self.backend == BackendType.JAX:
# JAX weights are immutable, but here 'weight' is likely a JAX array
# being held in a list. In JAX, we usually carry a key and init
# during build, but for this refactor we rely on the placeholder structure.
# We can't mutate 'weight' in-place for JAX.
# But self.weights is a list, so we can replace.
# Note: Earlier code used simple scaling. Let's stick to simple scaling
# to avoid complex RNG logic here without passing keys around.
# Re-implementing the original scaling logic for consistency:
import math
scale = math.sqrt(2.0 / (fan_in + fan_out))
self.weights[i] = weight * scale
self.biases[i] = bias * 0.0
else:
# Numba/Numpy -> Mutable
import math
scale = math.sqrt(2.0 / (fan_in + fan_out))
np.copyto(self.weights[i], self.weights[i] * scale)
np.copyto(self.biases[i], self.biases[i] * 0.0)
[docs]
def fractional_forward(
self,
x: Any,
method: str = "RL",
*,
axis: Optional[int] = None,
fractional_step: Optional[float] = None,
fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None,
) -> Any:
"""
Apply a discrete fractional operator along ``axis`` (default: ``fractional_axis``).
Optional per-call overrides: ``axis``, ``fractional_step``, ``fractional_t_grid``
(same semantics as constructor; per-call grid overrides instance grid).
"""
alpha = float(self.fractional_order.alpha)
m = method.upper()
ax = self.fractional_axis if axis is None else int(axis)
h_arg = (
fractional_step
if fractional_step is not None
else self.fractional_step
)
grid_arg = (
fractional_t_grid
if fractional_t_grid is not None
else self.fractional_t_grid
)
if h_arg is not None and grid_arg is not None:
raise ValueError(
"Pass at most one of fractional_step and fractional_t_grid "
"(per-call or on the model)."
)
if self.differentiable_fractional:
try:
return fractional_feature_map_native(
x,
backend=self.backend,
alpha=alpha,
method=m,
axis=ax,
h=h_arg,
t_grid=grid_arg,
)
except NotImplementedError as e:
warnings.warn(
f"Native fractional path unavailable ({e!r}); using legacy NumPy "
"operator with straight-through / detach on Torch or JAX.",
RuntimeWarning,
stacklevel=2,
)
if m == "RL":
calculator = self.rl_calculator
elif m == "CAPUTO":
calculator = self.caputo_calculator
elif m == "GL":
calculator = self.gl_calculator
else:
raise ValueError(f"Unknown method: {method}")
if self.backend == BackendType.TORCH:
x_np = x.detach().cpu().numpy().astype(np.float32)
elif self.backend == BackendType.JAX:
x_np = np.asarray(x, dtype=np.float32)
else:
x_np = np.array(x, dtype=np.float32)
nd = x_np.ndim
if ax < 0:
ax += nd
if not 0 <= ax < nd:
raise ValueError(f"fractional axis {ax} out of bounds for ndim={nd}")
n = int(x_np.shape[ax])
t_np, dt = _t_grid_and_h(n, h=h_arg, t_grid=grid_arg)
t_np = t_np.astype(np.float32, copy=False)
x_line = np.moveaxis(x_np, ax, -1)
def _one_row(row: np.ndarray) -> np.ndarray:
return calculator.compute(row, t_np, dt)
result = np.apply_along_axis(_one_row, -1, x_line)
result = np.moveaxis(result, -1, ax)
fr_np = result.astype(np.float32)
if self.backend == BackendType.TORCH:
import torch
fr = torch.as_tensor(fr_np, device=x.device, dtype=torch.float32)
return fr.detach() + (x - x.detach())
if self.backend == BackendType.JAX:
import jax
import jax.numpy as jnp
fr = jnp.asarray(fr_np, dtype=jnp.result_type(x))
return jax.lax.stop_gradient(fr) + (
x - jax.lax.stop_gradient(x)
)
return self.tensor_ops.create_tensor(fr_np)
[docs]
def forward(
self,
x: Any,
use_fractional: bool = True,
method: str = "RL",
params: Optional[Dict[str, List[Any]]] = None,
fractional_axis: Optional[int] = None,
fractional_step: Optional[float] = None,
fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None,
) -> Any:
"""
Forward pass through the network
Args:
x: Input tensor
use_fractional: Whether to apply fractional derivatives
method: Fractional derivative method if use_fractional is True
params: Optional dictionary of parameters {'weights': [...], 'biases': [...]}
for functional execution (JAX support).
fractional_axis: Optional override for the sampled axis (see class docstring).
fractional_step: Optional uniform step ``h`` for the fractional grid.
fractional_t_grid: Optional uniform 1D grid (length = size along sampled axis).
Returns:
Network output
"""
if use_fractional:
x = self.fractional_forward(
x,
method,
axis=fractional_axis,
fractional_step=fractional_step,
fractional_t_grid=fractional_t_grid,
)
# Use provided params or self.params
weights = params['weights'] if params else self.weights
biases = params['biases'] if params else self.biases
# Pass through network layers
for i, (weight, bias) in enumerate(
zip(weights[:-1], biases[:-1])):
# Linear transformation
x = self.tensor_ops.matmul(x, weight) + bias
# Apply activation
x = self._apply_activation(x)
# Apply dropout
x = self.tensor_ops.dropout(x, p=self.dropout_rate, training=self.training)
# Output layer (no activation)
x = self.tensor_ops.matmul(x, weights[-1]) + biases[-1]
return x
[docs]
def _apply_activation(self, x: Any) -> Any:
"""Apply activation function based on backend"""
if self.activation_name == "relu":
return self.tensor_ops.relu(x)
elif self.activation_name == "sigmoid":
return self.tensor_ops.sigmoid(x)
elif self.activation_name == "tanh":
return self.tensor_ops.tanh(x)
else:
return x
[docs]
def save_model(self, path: str):
"""Save model to file"""
Path(path).parent.mkdir(parents=True, exist_ok=True)
# Save weights and biases
model_data = {
'weights': [
self.tensor_ops.create_tensor(w) for w in self.weights], 'biases': [
self.tensor_ops.create_tensor(b) for b in self.biases]}
if self.backend == BackendType.TORCH:
import torch
torch.save(model_data, path)
else:
import pickle
with open(path, 'wb') as f:
pickle.dump(model_data, f)
# Save configuration
config_path = path.replace('.pth', '_config.json')
config_data = {
'input_size': self.input_size,
'hidden_sizes': self.hidden_sizes,
'output_size': self.output_size,
'fractional_order': float(self.fractional_order),
'activation': self.activation_name,
'backend': self.backend.value
}
with open(config_path, 'w') as f:
json.dump(config_data, f, indent=2)
[docs]
@classmethod
def load_model(cls, path: str, config_path: Optional[str] = None):
"""Load model from file"""
if config_path is None:
config_path = path.replace('.pth', '_config.json')
with open(config_path, 'r') as f:
config_data = json.load(f)
# Determine backend from config
backend = BackendType(config_data.get('backend', 'torch'))
model = cls(
input_size=config_data['input_size'],
hidden_sizes=config_data['hidden_sizes'],
output_size=config_data['output_size'],
fractional_order=config_data['fractional_order'],
backend=backend
)
# Load weights and biases
if backend == BackendType.TORCH:
import torch
model_data = torch.load(path)
else:
import pickle
with open(path, 'rb') as f:
model_data = pickle.load(f)
model.weights = model_data['weights']
model.biases = model_data['biases']
return model
def __call__(
self,
x: Any,
use_fractional: bool = True,
method: str = "RL") -> Any:
"""Make the network callable"""
return self.forward(x, use_fractional, method)
[docs]
class FractionalAttention:
"""
Multi-head attention with an optional **discrete** fractional map on the
attended context.
After standard attention, the context tensor has shape
``(batch, n_heads, seq_len, d_k)``. The fractional step runs along the
**sequence** dimension (axis ``2``), using the same native GrรผnwaldโLetnikov /
L1-Caputo machinery as ``FractionalNeuralNetwork`` when
``differentiable_fractional`` is True. Optional ``fractional_step`` and
``fractional_t_grid`` define a **uniform** grid along that axis (see
``fractional_feature_map_native``).
The spectral FFT stack in ``hpfracc.ml.spectral_autograd`` is separate; this
class does not use it.
Supports PyTorch, JAX, and NUMBA backends.
"""
[docs]
def __init__(
self,
d_model: int,
n_heads: int = 8,
fractional_order: float = 0.5,
dropout: float = 0.1,
backend: Optional[BackendType] = None,
differentiable_fractional: bool = True,
fractional_step: Optional[float] = None,
fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None,
):
self.d_model = d_model
self.n_heads = n_heads
# Ensure d_k is valid
if d_model % n_heads != 0:
# Adjust d_model to be divisible by n_heads
self.d_model = ((d_model // n_heads) + 1) * n_heads
print(
f"Warning: d_model adjusted from {d_model} to {self.d_model} to be divisible by {n_heads}")
self.d_k = self.d_model // n_heads
self.fractional_order = FractionalOrder(fractional_order)
self.dropout_rate = dropout
self.differentiable_fractional = bool(differentiable_fractional)
self.fractional_step = fractional_step
self.fractional_t_grid = fractional_t_grid
if fractional_step is not None and fractional_t_grid is not None:
raise ValueError(
"Pass at most one of fractional_step and fractional_t_grid."
)
# Set backend
self.backend = backend or get_backend_manager().active_backend
self.tensor_ops = get_tensor_ops(self.backend)
self.training = True
# Initialize attention weights
self._initialize_weights()
# Fractional derivative calculators
self.rl_calculator = RiemannLiouville(fractional_order)
self.caputo_calculator = Caputo(fractional_order)
self.gl_calculator = GrunwaldLetnikov(fractional_order)
[docs]
def train(self, mode: bool = True):
self.training = bool(mode)
return self
[docs]
def eval(self):
return self.train(False)
[docs]
def _initialize_weights(self):
"""Initialize attention weights"""
if self.backend == BackendType.TORCH:
import torch
self.w_q = torch.randn(
self.d_model, self.d_model, dtype=torch.float32,
requires_grad=True)
self.w_k = torch.randn(
self.d_model, self.d_model, dtype=torch.float32,
requires_grad=True)
self.w_v = torch.randn(
self.d_model, self.d_model, dtype=torch.float32,
requires_grad=True)
self.w_o = torch.randn(
self.d_model, self.d_model, dtype=torch.float32,
requires_grad=True)
# Xavier initialization
import torch.nn.init as init
init.xavier_uniform_(self.w_q)
init.xavier_uniform_(self.w_k)
init.xavier_uniform_(self.w_v)
init.xavier_uniform_(self.w_o)
elif self.backend == BackendType.JAX:
import jax.random as random
key = random.PRNGKey(0)
self.w_q = random.normal(key, (self.d_model, self.d_model))
self.w_k = random.normal(key, (self.d_model, self.d_model))
self.w_v = random.normal(key, (self.d_model, self.d_model))
self.w_o = random.normal(key, (self.d_model, self.d_model))
else: # NUMBA
import numpy as np
self.w_q = np.random.randn(self.d_model, self.d_model)
self.w_k = np.random.randn(self.d_model, self.d_model)
self.w_v = np.random.randn(self.d_model, self.d_model)
self.w_o = np.random.randn(self.d_model, self.d_model)
[docs]
def fractional_attention(
self,
q: Any,
k: Any,
v: Any,
method: str = "RL") -> Any:
"""
Compute attention with fractional derivatives
Args:
q, k, v: Query, key, value tensors of shape (batch_size, n_heads, seq_len, d_k)
method: Fractional derivative method
Returns:
Attention output with fractional calculus applied
"""
# Compute attention scores
# Ensure tensors are in (batch, heads, seq, d_k)
# Some tests provide input as (seq, batch, d_model); our forward reshapes accordingly.
if self.backend == BackendType.TORCH:
import torch
k_t = k.transpose(2, 3).contiguous()
else:
k_t = self.tensor_ops.transpose(k, (0, 1, 3, 2))
# Compute scale factor sqrt(d_k) as scalar to avoid broadcast issues
if self.backend == BackendType.TORCH:
import torch
d_k_sqrt_scalar = float(self.d_k) ** 0.5
scores = torch.matmul(q, k_t) / d_k_sqrt_scalar
else:
d_k_tensor = self.tensor_ops.create_tensor(self.d_k)
d_k_sqrt = self.tensor_ops.sqrt(d_k_tensor)
scores = self.tensor_ops.matmul(q, k_t) / d_k_sqrt
attention_weights = self.tensor_ops.softmax(scores, dim=-1)
attention_weights = self.tensor_ops.dropout(
attention_weights, p=self.dropout_rate, training=self.training)
# Apply attention to values
context = self.tensor_ops.matmul(attention_weights, v)
m = method.upper()
if m == "RL":
calculator = self.rl_calculator
elif m == "CAPUTO":
calculator = self.caputo_calculator
elif m == "GL":
calculator = self.gl_calculator
else:
raise ValueError(f"Unknown method: {method}")
alpha = float(self.fractional_order.alpha)
seq_axis = 2
if self.differentiable_fractional:
try:
return fractional_feature_map_native(
context,
backend=self.backend,
alpha=alpha,
method=m,
axis=seq_axis,
h=self.fractional_step,
t_grid=self.fractional_t_grid,
)
except NotImplementedError as e:
warnings.warn(
f"Native fractional path on attention output unavailable ({e!r}); "
"using legacy NumPy path with straight-through / detach.",
RuntimeWarning,
stacklevel=2,
)
context_np = (
context.detach().cpu().numpy()
if self.backend == BackendType.TORCH
else np.asarray(context)
)
n = int(context_np.shape[seq_axis])
t_np, dt = _t_grid_and_h(
n, h=self.fractional_step, t_grid=self.fractional_t_grid
)
t_np = t_np.astype(np.float32, copy=False)
ctx_line = np.moveaxis(context_np, seq_axis, -1)
def _one_slice(row: np.ndarray) -> np.ndarray:
return calculator.compute(row, t_np, dt)
result = np.apply_along_axis(_one_slice, -1, ctx_line)
result = np.moveaxis(result, -1, seq_axis).astype(np.float32)
if self.backend == BackendType.TORCH:
import torch
fr = torch.as_tensor(
result, device=context.device, dtype=context.dtype
)
return fr.detach() + (context - context.detach())
if self.backend == BackendType.JAX:
import jax
import jax.numpy as jnp
fr = jnp.asarray(result, dtype=jnp.result_type(context))
return jax.lax.stop_gradient(fr) + (
context - jax.lax.stop_gradient(context)
)
return self.tensor_ops.create_tensor(result)
[docs]
def forward(self, x: Any, method: str = "RL") -> Any:
"""
Forward pass through fractional attention
Args:
x: Input tensor of shape (batch_size, seq_len, d_model)
method: Fractional derivative method
Returns:
Output tensor with attention and fractional calculus applied
"""
# Accept both (batch, seq, d_model) and (seq, batch, d_model)
original_layout_seq_batch = False
if hasattr(x, "shape") and len(x.shape) == 3:
b0, b1, b2 = x.shape
# Common case in tests: (seq, batch, d_model) with batch < seq
if b2 == self.d_model and b1 < b0:
original_layout_seq_batch = True
if self.backend == BackendType.TORCH:
x = x.permute(1, 0, 2).contiguous()
else:
x = self.tensor_ops.transpose(x, (1, 0, 2))
batch_size, seq_len, _ = x.shape
# Linear transformations
if self.backend == BackendType.TORCH:
import torch
# Ensure contiguous and perform batched matmul via flattening
b, t, d = x.shape
x2 = x.contiguous().view(b * t, d)
q2 = torch.matmul(x2, self.w_q)
k2 = torch.matmul(x2, self.w_k)
v2 = torch.matmul(x2, self.w_v)
q = q2.view(b, t, d)
k = k2.view(b, t, d)
v = v2.view(b, t, d)
else:
q = self.tensor_ops.matmul(x, self.w_q)
k = self.tensor_ops.matmul(x, self.w_k)
v = self.tensor_ops.matmul(x, self.w_v)
# Reshape for multi-head attention
q = self.tensor_ops.reshape(
q, (batch_size, seq_len, self.n_heads, self.d_k))
k = self.tensor_ops.reshape(
k, (batch_size, seq_len, self.n_heads, self.d_k))
v = self.tensor_ops.reshape(
v, (batch_size, seq_len, self.n_heads, self.d_k))
# Transpose for attention computation (batch_size, n_heads, seq_len,
# d_k)
q = self.tensor_ops.transpose(q, dims=(0, 2, 1, 3))
k = self.tensor_ops.transpose(k, dims=(0, 2, 1, 3))
v = self.tensor_ops.transpose(v, dims=(0, 2, 1, 3))
# Apply fractional attention
context = self.fractional_attention(q, k, v, method)
# Reshape and apply output projection
context = self.tensor_ops.transpose(context, dims=(0, 2, 1, 3))
context = self.tensor_ops.reshape(
context, (batch_size, seq_len, self.d_model))
output = self.tensor_ops.matmul(context, self.w_o)
# Residual connection and layer normalization (simplified)
# Ensure consistent dtype for residual connection
if self.backend == BackendType.TORCH:
if x.dtype != output.dtype:
output = output.to(x.dtype)
output = x + output
# Convert back to original layout if needed
if original_layout_seq_batch:
if self.backend == BackendType.TORCH:
output = output.permute(1, 0, 2).contiguous()
else:
output = self.tensor_ops.transpose(output, dims=(1, 0, 2))
return output
def __call__(self, x: Any, method: str = "RL") -> Any:
"""Make the attention mechanism callable"""
return self.forward(x, method)
[docs]
class FractionalLossFunction:
"""
Base class for loss functions with fractional calculus integration
This class provides a framework for creating loss functions that
incorporate fractional derivatives to capture complex relationships.
Supports multiple backends: PyTorch, JAX, and NUMBA.
"""
[docs]
def __init__(self, fractional_order: float = 0.5,
backend: Optional[BackendType] = None):
self.fractional_order = FractionalOrder(fractional_order)
self.backend = backend or get_backend_manager().active_backend
self.tensor_ops = get_tensor_ops(self.backend)
self.rl_calculator = RiemannLiouville(fractional_order)
[docs]
@abstractmethod
def compute_loss(self, predictions: Any, targets: Any) -> Any:
"""Compute the base loss"""
[docs]
def fractional_loss(self, predictions: Any, targets: Any) -> Any:
"""
Compute loss with fractional derivative applied to predictions
Args:
predictions: Model predictions
targets: Ground truth targets
Returns:
Fractional loss value
"""
# Apply fractional derivative to predictions
if self.backend == BackendType.TORCH:
pred_np = predictions.detach().cpu().numpy()
else:
pred_np = np.array(predictions)
if pred_np.ndim == 2:
# For 2D tensors (batch_size, features)
# Apply along axis 1 (features)
t = np.linspace(0, 1, pred_np.shape[1])
dt = t[1] - t[0] if len(t) > 1 else 1.0
result = np.apply_along_axis(self.rl_calculator.compute, 1, pred_np, t, dt)
else:
# For 1D tensors
t = np.linspace(0, 1, pred_np.shape[0])
dt = t[1] - t[0] if len(t) > 1 else 1.0
result = self.rl_calculator.compute(pred_np, t, dt)
fractional_pred = self.tensor_ops.create_tensor(
result, requires_grad=True)
# Compute loss with fractional predictions
return self.compute_loss(fractional_pred, targets)
[docs]
def forward(self, predictions: Any, targets: Any,
use_fractional: bool = True) -> Any:
"""
Forward pass for loss computation
Args:
predictions: Model predictions
targets: Ground truth targets
use_fractional: Whether to apply fractional derivatives
Returns:
Loss value
"""
if use_fractional:
return self.fractional_loss(predictions, targets)
else:
return self.compute_loss(predictions, targets)
[docs]
class FractionalMSELoss(FractionalLossFunction):
"""Mean Squared Error loss with fractional calculus integration"""
[docs]
def compute_loss(self, predictions: Any, targets: Any) -> Any:
return self.tensor_ops.mean((predictions - targets) ** 2)
[docs]
class FractionalCrossEntropyLoss(FractionalLossFunction):
"""Cross Entropy loss with fractional calculus integration"""
[docs]
def compute_loss(self, predictions: Any, targets: Any) -> Any:
# Simplified cross-entropy for multi-backend compatibility
# In practice, you'd want more sophisticated implementations
return self.tensor_ops.mean(-targets * self.tensor_ops.log(
self.tensor_ops.softmax(predictions, dim=-1)))
[docs]
class FractionalAutoML:
"""
Automated Machine Learning for fractional calculus parameters
This class provides automated optimization of fractional orders and
other hyperparameters for optimal performance on specific tasks.
"""
[docs]
def __init__(self, config: Optional[MLConfig] = None):
self.config = config or MLConfig()
self.best_params = {}
self.optimization_history = []
[docs]
def optimize_fractional_order(
self,
model_class: type,
train_data: Tuple[Any, Any],
val_data: Tuple[Any, Any],
param_ranges: Dict[str, List[float]],
n_trials: int = 50,
metric: str = "accuracy"
) -> Dict[str, Any]:
"""
Optimize fractional order and other hyperparameters
Args:
model_class: Class of model to optimize
train_data: Training data (X, y)
val_data: Validation data (X, y)
param_ranges: Dictionary of parameter ranges to search
n_trials: Number of optimization trials
metric: Metric to optimize
Returns:
Dictionary with best parameters and optimization results
"""
import optuna
def objective(trial):
# Sample parameters
params = {}
for param_name, param_range in param_ranges.items():
if isinstance(param_range[0], int):
params[param_name] = trial.suggest_int(
param_name, param_range[0], param_range[1])
elif isinstance(param_range[0], float):
params[param_name] = trial.suggest_float(
param_name, param_range[0], param_range[1])
else:
params[param_name] = trial.suggest_categorical(
param_name, param_range)
# Create model
try:
model = model_class(**params)
# Setup optimizer (basic SGD for demo/speed in AutoML)
# In a real scenario, this should be configurable or use the model's preferred optimizer
if model.backend == BackendType.TORCH:
import torch.optim as optim
import torch.nn as nn
import torch
optimizer = optim.Adam(model.parameters(), lr=0.01)
criterion = nn.MSELoss() if metric != "accuracy" else nn.CrossEntropyLoss()
X_train, y_train = train_data
X_val, y_val = val_data
# Ensure tensors
if not isinstance(X_train, torch.Tensor):
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32 if metric != "accuracy" else torch.long)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32 if metric != "accuracy" else torch.long)
# Short training loop for trial
model.train()
epochs = 5 # Reduced epochs for speed in AutoML loop
for _ in range(epochs):
optimizer.zero_grad()
output = model(X_train)
loss = criterion(output, y_train)
loss.backward()
optimizer.step()
# Validation
model.eval()
with torch.no_grad():
val_out = model(X_val)
if metric == "accuracy":
preds = val_out.argmax(dim=1)
score = (preds == y_val).float().mean().item()
else:
score = criterion(val_out, y_val).item()
return score
elif model.backend == BackendType.JAX:
import jax
import jax.numpy as jnp
import optax
# Setup data
X_train, y_train = train_data
X_val, y_val = val_data
X_train = jnp.asarray(X_train)
y_train = jnp.asarray(y_train)
X_val = jnp.asarray(X_val)
y_val = jnp.asarray(y_val)
# Define optimizer
optimizer = optax.adam(learning_rate=0.01)
# Initialize params as a PyTree
params = {
'weights': model.weights, # These are already JAX arrays
'biases': model.biases
}
opt_state = optimizer.init(params)
# Define loss function
def loss_fn(p, x, y):
preds = model.forward(x, params=p)
if metric == "accuracy":
# Cross Entropy
# Simplified: -mean(sum(one_hot(y) * log(softmax(preds))))
logits = jax.nn.log_softmax(preds)
n_classes = preds.shape[-1]
one_hot = jax.nn.one_hot(y, n_classes)
return -jnp.mean(jnp.sum(one_hot * logits, axis=-1))
else:
# MSE
return jnp.mean((preds - y) ** 2)
# JIT compile update step
@jax.jit
def step(p, opt_st, x, y):
loss, grads = jax.value_and_grad(loss_fn)(p, x, y)
updates, new_opt_st = optimizer.update(grads, opt_st)
new_params = optax.apply_updates(p, updates)
return new_params, new_opt_st, loss
# Training loop
epochs = 5
for _ in range(epochs):
params, opt_state, loss = step(params, opt_state, X_train, y_train)
# Validation
val_preds = model.forward(X_val, params=params)
if metric == "accuracy":
preds_cls = jnp.argmax(val_preds, axis=1)
score = jnp.mean(preds_cls == y_val).item()
else:
score = jnp.mean((val_preds - y_val) ** 2).item()
return float(score)
except Exception as e:
# Prune failed trials
print(f"Trial failed: {e}")
raise optuna.exceptions.TrialPruned()
# Create study and optimize
study = optuna.create_study(
direction="maximize" if metric == "accuracy" else "minimize")
study.optimize(objective, n_trials=n_trials)
# Store results
self.best_params = study.best_params
self.optimization_history = study.trials
return {
'best_params': self.best_params,
'best_value': study.best_value,
'optimization_history': self.optimization_history
}
[docs]
def get_best_model(self, model_class: type, **kwargs) -> Any:
"""Get model instance with best parameters"""
if not self.best_params:
raise ValueError("No optimization has been run yet")
# Merge best params with additional kwargs
params = {**self.best_params, **kwargs}
return model_class(**params)