Source code for hpfracc.ml.core

"""
Core Machine Learning Components for Fractional Calculus

This module provides the foundational ML classes that integrate fractional calculus
with neural networks, attention mechanisms, loss functions, and AutoML capabilities.
"""

import warnings
import numpy as np
from typing import Dict, List, Tuple, Optional, Any, Sequence, Union
from dataclasses import dataclass
from abc import abstractmethod
import json
from pathlib import Path

from hpfracc.core.definitions import FractionalOrder
from hpfracc.algorithms.derivatives import (
    RiemannLiouville,
    Caputo,
    GrunwaldLetnikov,
)
from hpfracc.ml.backends import get_backend_manager, BackendType
from hpfracc.ml.fractional_derivative_native import (
    _t_grid_and_h,
    fractional_feature_map_native,
)
from hpfracc.ml.tensor_ops import get_tensor_ops


[docs] @dataclass class MLConfig: """Configuration for ML components""" device: str = "cpu" dtype: str = "float32" fractional_order: float = 0.5 use_gpu: bool = False batch_size: int = 32 learning_rate: float = 0.001 max_epochs: int = 100 validation_split: float = 0.2 early_stopping_patience: int = 10 model_save_path: str = "models/" log_interval: int = 10 backend: BackendType = BackendType.AUTO
[docs] class FractionalNeuralNetwork: """ Neural network with fractional calculus integration. **Tensor layout.** The first linear layer expects the same shape you pass to ``forward`` (typically ``(batch, input_size)``). The fractional pre-processing step applies a **discrete** fractional operator along ``fractional_axis`` (default ``-1``, the feature / time-sample axis). Use ``fractional_axis=0`` only if your **rows** are samples along an index (unusual for ``(batch, features)``). **Grid.** Native GL / Caputo L1 use a **uniform** step ``h``. Provide ``fractional_step`` (scalar ``h``), or ``fractional_t_grid`` (length matching the sampled axis; must be strictly increasing and evenly spaced), or omit both for the default grid ``t = linspace(0, 1, n)`` with ``h = 1/(n-1)``. Non-uniform grids are rejected in the native path; use ``differentiable_fractional=False`` for the legacy NumPy operator (still uniform ``t`` implied the same way). Supports PyTorch, JAX, and NUMBA backends. """
[docs] def __init__( self, input_size: int, hidden_sizes: List[int], output_size: int, fractional_order: float = 0.5, activation: str = "relu", dropout: float = 0.1, config: Optional[MLConfig] = None, backend: Optional[BackendType] = None, differentiable_fractional: bool = True, fractional_axis: int = -1, fractional_step: Optional[float] = None, fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None, ): self.config = config or MLConfig() self.fractional_order = FractionalOrder(fractional_order) self.input_size = input_size self.hidden_sizes = hidden_sizes self.output_size = output_size self.activation_name = activation self.dropout_rate = dropout # Set backend # Resolve backend; treat AUTO as active backend resolved_backend = backend or self.config.backend or get_backend_manager().active_backend if resolved_backend == BackendType.AUTO: resolved_backend = get_backend_manager().active_backend self.backend = resolved_backend self.tensor_ops = get_tensor_ops(self.backend) self.differentiable_fractional = bool(differentiable_fractional) self.fractional_axis = int(fractional_axis) self.fractional_step = fractional_step self.fractional_t_grid = fractional_t_grid if ( fractional_step is not None and fractional_t_grid is not None ): raise ValueError( "Pass at most one of fractional_step and fractional_t_grid." ) # Initialize fractional derivative calculators self.rl_calculator = RiemannLiouville(fractional_order) self.caputo_calculator = Caputo(fractional_order) self.gl_calculator = GrunwaldLetnikov(fractional_order) # Build network layers self.layers = [] self._build_network() # Initialize weights self._initialize_weights() # Training state self.training = True
[docs] def train(self, mode: bool = True): """Set training mode""" self.training = mode
[docs] def eval(self): """Set evaluation mode""" self.train(False)
[docs] def parameters(self) -> List[Any]: """Return list of learnable parameters for compatibility with optimizers/tests""" params: List[Any] = [] params.extend(self.weights) params.extend(self.biases) return params
[docs] def _build_network(self): """Build the network architecture using the current backend""" # Input layer self.layers.append({ 'type': 'linear', 'in_features': self.input_size, 'out_features': self.hidden_sizes[0] }) # Hidden layers for i in range(len(self.hidden_sizes) - 1): self.layers.append({ 'type': 'linear', 'in_features': self.hidden_sizes[i], 'out_features': self.hidden_sizes[i + 1] }) # Output layer self.layers.append({ 'type': 'linear', 'in_features': self.hidden_sizes[-1], 'out_features': self.output_size }) # Initialize weights and biases for each layer self.weights = [] self.biases = [] for layer in self.layers: if layer['type'] == 'linear': # Initialize weights with proper random data if self.backend == BackendType.TORCH: import torch weight = torch.randn( layer['in_features'], layer['out_features'], dtype=torch.float32, requires_grad=True) bias = torch.zeros( layer['out_features'], dtype=torch.float32, requires_grad=True) elif self.backend == BackendType.JAX: import jax.random as random import jax.numpy as jnp key = random.PRNGKey(0) weight = random.normal( key, (layer['in_features'], layer['out_features'])) bias = jnp.zeros(layer['out_features']) else: # NUMBA import numpy as np weight = np.random.randn( layer['in_features'], layer['out_features']) bias = np.zeros(layer['out_features']) self.weights.append(weight) self.biases.append(bias)
[docs] def _initialize_weights(self): """Initialize network weights using Xavier initialization""" for i, (weight, bias) in enumerate(zip(self.weights, self.biases)): if self.backend == BackendType.TORCH: import torch.nn.init as init init.xavier_uniform_(weight) if bias is not None: init.zeros_(bias) else: # Standard Xavier initialization for JAX/Numba fan_in = weight.shape[0] fan_out = weight.shape[1] # Use tensor_ops to avoid manual math imports if desired, # but standard numpy/math is fine for init limit = np.sqrt(6 / (fan_in + fan_out)) if self.backend == BackendType.JAX: # JAX weights are immutable, but here 'weight' is likely a JAX array # being held in a list. In JAX, we usually carry a key and init # during build, but for this refactor we rely on the placeholder structure. # We can't mutate 'weight' in-place for JAX. # But self.weights is a list, so we can replace. # Note: Earlier code used simple scaling. Let's stick to simple scaling # to avoid complex RNG logic here without passing keys around. # Re-implementing the original scaling logic for consistency: import math scale = math.sqrt(2.0 / (fan_in + fan_out)) self.weights[i] = weight * scale self.biases[i] = bias * 0.0 else: # Numba/Numpy -> Mutable import math scale = math.sqrt(2.0 / (fan_in + fan_out)) np.copyto(self.weights[i], self.weights[i] * scale) np.copyto(self.biases[i], self.biases[i] * 0.0)
[docs] def fractional_forward( self, x: Any, method: str = "RL", *, axis: Optional[int] = None, fractional_step: Optional[float] = None, fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None, ) -> Any: """ Apply a discrete fractional operator along ``axis`` (default: ``fractional_axis``). Optional per-call overrides: ``axis``, ``fractional_step``, ``fractional_t_grid`` (same semantics as constructor; per-call grid overrides instance grid). """ alpha = float(self.fractional_order.alpha) m = method.upper() ax = self.fractional_axis if axis is None else int(axis) h_arg = ( fractional_step if fractional_step is not None else self.fractional_step ) grid_arg = ( fractional_t_grid if fractional_t_grid is not None else self.fractional_t_grid ) if h_arg is not None and grid_arg is not None: raise ValueError( "Pass at most one of fractional_step and fractional_t_grid " "(per-call or on the model)." ) if self.differentiable_fractional: try: return fractional_feature_map_native( x, backend=self.backend, alpha=alpha, method=m, axis=ax, h=h_arg, t_grid=grid_arg, ) except NotImplementedError as e: warnings.warn( f"Native fractional path unavailable ({e!r}); using legacy NumPy " "operator with straight-through / detach on Torch or JAX.", RuntimeWarning, stacklevel=2, ) if m == "RL": calculator = self.rl_calculator elif m == "CAPUTO": calculator = self.caputo_calculator elif m == "GL": calculator = self.gl_calculator else: raise ValueError(f"Unknown method: {method}") if self.backend == BackendType.TORCH: x_np = x.detach().cpu().numpy().astype(np.float32) elif self.backend == BackendType.JAX: x_np = np.asarray(x, dtype=np.float32) else: x_np = np.array(x, dtype=np.float32) nd = x_np.ndim if ax < 0: ax += nd if not 0 <= ax < nd: raise ValueError(f"fractional axis {ax} out of bounds for ndim={nd}") n = int(x_np.shape[ax]) t_np, dt = _t_grid_and_h(n, h=h_arg, t_grid=grid_arg) t_np = t_np.astype(np.float32, copy=False) x_line = np.moveaxis(x_np, ax, -1) def _one_row(row: np.ndarray) -> np.ndarray: return calculator.compute(row, t_np, dt) result = np.apply_along_axis(_one_row, -1, x_line) result = np.moveaxis(result, -1, ax) fr_np = result.astype(np.float32) if self.backend == BackendType.TORCH: import torch fr = torch.as_tensor(fr_np, device=x.device, dtype=torch.float32) return fr.detach() + (x - x.detach()) if self.backend == BackendType.JAX: import jax import jax.numpy as jnp fr = jnp.asarray(fr_np, dtype=jnp.result_type(x)) return jax.lax.stop_gradient(fr) + ( x - jax.lax.stop_gradient(x) ) return self.tensor_ops.create_tensor(fr_np)
[docs] def forward( self, x: Any, use_fractional: bool = True, method: str = "RL", params: Optional[Dict[str, List[Any]]] = None, fractional_axis: Optional[int] = None, fractional_step: Optional[float] = None, fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None, ) -> Any: """ Forward pass through the network Args: x: Input tensor use_fractional: Whether to apply fractional derivatives method: Fractional derivative method if use_fractional is True params: Optional dictionary of parameters {'weights': [...], 'biases': [...]} for functional execution (JAX support). fractional_axis: Optional override for the sampled axis (see class docstring). fractional_step: Optional uniform step ``h`` for the fractional grid. fractional_t_grid: Optional uniform 1D grid (length = size along sampled axis). Returns: Network output """ if use_fractional: x = self.fractional_forward( x, method, axis=fractional_axis, fractional_step=fractional_step, fractional_t_grid=fractional_t_grid, ) # Use provided params or self.params weights = params['weights'] if params else self.weights biases = params['biases'] if params else self.biases # Pass through network layers for i, (weight, bias) in enumerate( zip(weights[:-1], biases[:-1])): # Linear transformation x = self.tensor_ops.matmul(x, weight) + bias # Apply activation x = self._apply_activation(x) # Apply dropout x = self.tensor_ops.dropout(x, p=self.dropout_rate, training=self.training) # Output layer (no activation) x = self.tensor_ops.matmul(x, weights[-1]) + biases[-1] return x
[docs] def _apply_activation(self, x: Any) -> Any: """Apply activation function based on backend""" if self.activation_name == "relu": return self.tensor_ops.relu(x) elif self.activation_name == "sigmoid": return self.tensor_ops.sigmoid(x) elif self.activation_name == "tanh": return self.tensor_ops.tanh(x) else: return x
[docs] def save_model(self, path: str): """Save model to file""" Path(path).parent.mkdir(parents=True, exist_ok=True) # Save weights and biases model_data = { 'weights': [ self.tensor_ops.create_tensor(w) for w in self.weights], 'biases': [ self.tensor_ops.create_tensor(b) for b in self.biases]} if self.backend == BackendType.TORCH: import torch torch.save(model_data, path) else: import pickle with open(path, 'wb') as f: pickle.dump(model_data, f) # Save configuration config_path = path.replace('.pth', '_config.json') config_data = { 'input_size': self.input_size, 'hidden_sizes': self.hidden_sizes, 'output_size': self.output_size, 'fractional_order': float(self.fractional_order), 'activation': self.activation_name, 'backend': self.backend.value } with open(config_path, 'w') as f: json.dump(config_data, f, indent=2)
[docs] @classmethod def load_model(cls, path: str, config_path: Optional[str] = None): """Load model from file""" if config_path is None: config_path = path.replace('.pth', '_config.json') with open(config_path, 'r') as f: config_data = json.load(f) # Determine backend from config backend = BackendType(config_data.get('backend', 'torch')) model = cls( input_size=config_data['input_size'], hidden_sizes=config_data['hidden_sizes'], output_size=config_data['output_size'], fractional_order=config_data['fractional_order'], backend=backend ) # Load weights and biases if backend == BackendType.TORCH: import torch model_data = torch.load(path) else: import pickle with open(path, 'rb') as f: model_data = pickle.load(f) model.weights = model_data['weights'] model.biases = model_data['biases'] return model
def __call__( self, x: Any, use_fractional: bool = True, method: str = "RL") -> Any: """Make the network callable""" return self.forward(x, use_fractional, method)
[docs] class FractionalAttention: """ Multi-head attention with an optional **discrete** fractional map on the attended context. After standard attention, the context tensor has shape ``(batch, n_heads, seq_len, d_k)``. The fractional step runs along the **sequence** dimension (axis ``2``), using the same native Grรผnwaldโ€“Letnikov / L1-Caputo machinery as ``FractionalNeuralNetwork`` when ``differentiable_fractional`` is True. Optional ``fractional_step`` and ``fractional_t_grid`` define a **uniform** grid along that axis (see ``fractional_feature_map_native``). The spectral FFT stack in ``hpfracc.ml.spectral_autograd`` is separate; this class does not use it. Supports PyTorch, JAX, and NUMBA backends. """
[docs] def __init__( self, d_model: int, n_heads: int = 8, fractional_order: float = 0.5, dropout: float = 0.1, backend: Optional[BackendType] = None, differentiable_fractional: bool = True, fractional_step: Optional[float] = None, fractional_t_grid: Optional[Union[np.ndarray, Sequence[float]]] = None, ): self.d_model = d_model self.n_heads = n_heads # Ensure d_k is valid if d_model % n_heads != 0: # Adjust d_model to be divisible by n_heads self.d_model = ((d_model // n_heads) + 1) * n_heads print( f"Warning: d_model adjusted from {d_model} to {self.d_model} to be divisible by {n_heads}") self.d_k = self.d_model // n_heads self.fractional_order = FractionalOrder(fractional_order) self.dropout_rate = dropout self.differentiable_fractional = bool(differentiable_fractional) self.fractional_step = fractional_step self.fractional_t_grid = fractional_t_grid if fractional_step is not None and fractional_t_grid is not None: raise ValueError( "Pass at most one of fractional_step and fractional_t_grid." ) # Set backend self.backend = backend or get_backend_manager().active_backend self.tensor_ops = get_tensor_ops(self.backend) self.training = True # Initialize attention weights self._initialize_weights() # Fractional derivative calculators self.rl_calculator = RiemannLiouville(fractional_order) self.caputo_calculator = Caputo(fractional_order) self.gl_calculator = GrunwaldLetnikov(fractional_order)
[docs] def train(self, mode: bool = True): self.training = bool(mode) return self
[docs] def eval(self): return self.train(False)
[docs] def _initialize_weights(self): """Initialize attention weights""" if self.backend == BackendType.TORCH: import torch self.w_q = torch.randn( self.d_model, self.d_model, dtype=torch.float32, requires_grad=True) self.w_k = torch.randn( self.d_model, self.d_model, dtype=torch.float32, requires_grad=True) self.w_v = torch.randn( self.d_model, self.d_model, dtype=torch.float32, requires_grad=True) self.w_o = torch.randn( self.d_model, self.d_model, dtype=torch.float32, requires_grad=True) # Xavier initialization import torch.nn.init as init init.xavier_uniform_(self.w_q) init.xavier_uniform_(self.w_k) init.xavier_uniform_(self.w_v) init.xavier_uniform_(self.w_o) elif self.backend == BackendType.JAX: import jax.random as random key = random.PRNGKey(0) self.w_q = random.normal(key, (self.d_model, self.d_model)) self.w_k = random.normal(key, (self.d_model, self.d_model)) self.w_v = random.normal(key, (self.d_model, self.d_model)) self.w_o = random.normal(key, (self.d_model, self.d_model)) else: # NUMBA import numpy as np self.w_q = np.random.randn(self.d_model, self.d_model) self.w_k = np.random.randn(self.d_model, self.d_model) self.w_v = np.random.randn(self.d_model, self.d_model) self.w_o = np.random.randn(self.d_model, self.d_model)
[docs] def fractional_attention( self, q: Any, k: Any, v: Any, method: str = "RL") -> Any: """ Compute attention with fractional derivatives Args: q, k, v: Query, key, value tensors of shape (batch_size, n_heads, seq_len, d_k) method: Fractional derivative method Returns: Attention output with fractional calculus applied """ # Compute attention scores # Ensure tensors are in (batch, heads, seq, d_k) # Some tests provide input as (seq, batch, d_model); our forward reshapes accordingly. if self.backend == BackendType.TORCH: import torch k_t = k.transpose(2, 3).contiguous() else: k_t = self.tensor_ops.transpose(k, (0, 1, 3, 2)) # Compute scale factor sqrt(d_k) as scalar to avoid broadcast issues if self.backend == BackendType.TORCH: import torch d_k_sqrt_scalar = float(self.d_k) ** 0.5 scores = torch.matmul(q, k_t) / d_k_sqrt_scalar else: d_k_tensor = self.tensor_ops.create_tensor(self.d_k) d_k_sqrt = self.tensor_ops.sqrt(d_k_tensor) scores = self.tensor_ops.matmul(q, k_t) / d_k_sqrt attention_weights = self.tensor_ops.softmax(scores, dim=-1) attention_weights = self.tensor_ops.dropout( attention_weights, p=self.dropout_rate, training=self.training) # Apply attention to values context = self.tensor_ops.matmul(attention_weights, v) m = method.upper() if m == "RL": calculator = self.rl_calculator elif m == "CAPUTO": calculator = self.caputo_calculator elif m == "GL": calculator = self.gl_calculator else: raise ValueError(f"Unknown method: {method}") alpha = float(self.fractional_order.alpha) seq_axis = 2 if self.differentiable_fractional: try: return fractional_feature_map_native( context, backend=self.backend, alpha=alpha, method=m, axis=seq_axis, h=self.fractional_step, t_grid=self.fractional_t_grid, ) except NotImplementedError as e: warnings.warn( f"Native fractional path on attention output unavailable ({e!r}); " "using legacy NumPy path with straight-through / detach.", RuntimeWarning, stacklevel=2, ) context_np = ( context.detach().cpu().numpy() if self.backend == BackendType.TORCH else np.asarray(context) ) n = int(context_np.shape[seq_axis]) t_np, dt = _t_grid_and_h( n, h=self.fractional_step, t_grid=self.fractional_t_grid ) t_np = t_np.astype(np.float32, copy=False) ctx_line = np.moveaxis(context_np, seq_axis, -1) def _one_slice(row: np.ndarray) -> np.ndarray: return calculator.compute(row, t_np, dt) result = np.apply_along_axis(_one_slice, -1, ctx_line) result = np.moveaxis(result, -1, seq_axis).astype(np.float32) if self.backend == BackendType.TORCH: import torch fr = torch.as_tensor( result, device=context.device, dtype=context.dtype ) return fr.detach() + (context - context.detach()) if self.backend == BackendType.JAX: import jax import jax.numpy as jnp fr = jnp.asarray(result, dtype=jnp.result_type(context)) return jax.lax.stop_gradient(fr) + ( context - jax.lax.stop_gradient(context) ) return self.tensor_ops.create_tensor(result)
[docs] def forward(self, x: Any, method: str = "RL") -> Any: """ Forward pass through fractional attention Args: x: Input tensor of shape (batch_size, seq_len, d_model) method: Fractional derivative method Returns: Output tensor with attention and fractional calculus applied """ # Accept both (batch, seq, d_model) and (seq, batch, d_model) original_layout_seq_batch = False if hasattr(x, "shape") and len(x.shape) == 3: b0, b1, b2 = x.shape # Common case in tests: (seq, batch, d_model) with batch < seq if b2 == self.d_model and b1 < b0: original_layout_seq_batch = True if self.backend == BackendType.TORCH: x = x.permute(1, 0, 2).contiguous() else: x = self.tensor_ops.transpose(x, (1, 0, 2)) batch_size, seq_len, _ = x.shape # Linear transformations if self.backend == BackendType.TORCH: import torch # Ensure contiguous and perform batched matmul via flattening b, t, d = x.shape x2 = x.contiguous().view(b * t, d) q2 = torch.matmul(x2, self.w_q) k2 = torch.matmul(x2, self.w_k) v2 = torch.matmul(x2, self.w_v) q = q2.view(b, t, d) k = k2.view(b, t, d) v = v2.view(b, t, d) else: q = self.tensor_ops.matmul(x, self.w_q) k = self.tensor_ops.matmul(x, self.w_k) v = self.tensor_ops.matmul(x, self.w_v) # Reshape for multi-head attention q = self.tensor_ops.reshape( q, (batch_size, seq_len, self.n_heads, self.d_k)) k = self.tensor_ops.reshape( k, (batch_size, seq_len, self.n_heads, self.d_k)) v = self.tensor_ops.reshape( v, (batch_size, seq_len, self.n_heads, self.d_k)) # Transpose for attention computation (batch_size, n_heads, seq_len, # d_k) q = self.tensor_ops.transpose(q, dims=(0, 2, 1, 3)) k = self.tensor_ops.transpose(k, dims=(0, 2, 1, 3)) v = self.tensor_ops.transpose(v, dims=(0, 2, 1, 3)) # Apply fractional attention context = self.fractional_attention(q, k, v, method) # Reshape and apply output projection context = self.tensor_ops.transpose(context, dims=(0, 2, 1, 3)) context = self.tensor_ops.reshape( context, (batch_size, seq_len, self.d_model)) output = self.tensor_ops.matmul(context, self.w_o) # Residual connection and layer normalization (simplified) # Ensure consistent dtype for residual connection if self.backend == BackendType.TORCH: if x.dtype != output.dtype: output = output.to(x.dtype) output = x + output # Convert back to original layout if needed if original_layout_seq_batch: if self.backend == BackendType.TORCH: output = output.permute(1, 0, 2).contiguous() else: output = self.tensor_ops.transpose(output, dims=(1, 0, 2)) return output
def __call__(self, x: Any, method: str = "RL") -> Any: """Make the attention mechanism callable""" return self.forward(x, method)
[docs] class FractionalLossFunction: """ Base class for loss functions with fractional calculus integration This class provides a framework for creating loss functions that incorporate fractional derivatives to capture complex relationships. Supports multiple backends: PyTorch, JAX, and NUMBA. """
[docs] def __init__(self, fractional_order: float = 0.5, backend: Optional[BackendType] = None): self.fractional_order = FractionalOrder(fractional_order) self.backend = backend or get_backend_manager().active_backend self.tensor_ops = get_tensor_ops(self.backend) self.rl_calculator = RiemannLiouville(fractional_order)
[docs] @abstractmethod def compute_loss(self, predictions: Any, targets: Any) -> Any: """Compute the base loss"""
[docs] def fractional_loss(self, predictions: Any, targets: Any) -> Any: """ Compute loss with fractional derivative applied to predictions Args: predictions: Model predictions targets: Ground truth targets Returns: Fractional loss value """ # Apply fractional derivative to predictions if self.backend == BackendType.TORCH: pred_np = predictions.detach().cpu().numpy() else: pred_np = np.array(predictions) if pred_np.ndim == 2: # For 2D tensors (batch_size, features) # Apply along axis 1 (features) t = np.linspace(0, 1, pred_np.shape[1]) dt = t[1] - t[0] if len(t) > 1 else 1.0 result = np.apply_along_axis(self.rl_calculator.compute, 1, pred_np, t, dt) else: # For 1D tensors t = np.linspace(0, 1, pred_np.shape[0]) dt = t[1] - t[0] if len(t) > 1 else 1.0 result = self.rl_calculator.compute(pred_np, t, dt) fractional_pred = self.tensor_ops.create_tensor( result, requires_grad=True) # Compute loss with fractional predictions return self.compute_loss(fractional_pred, targets)
[docs] def forward(self, predictions: Any, targets: Any, use_fractional: bool = True) -> Any: """ Forward pass for loss computation Args: predictions: Model predictions targets: Ground truth targets use_fractional: Whether to apply fractional derivatives Returns: Loss value """ if use_fractional: return self.fractional_loss(predictions, targets) else: return self.compute_loss(predictions, targets)
[docs] class FractionalMSELoss(FractionalLossFunction): """Mean Squared Error loss with fractional calculus integration"""
[docs] def compute_loss(self, predictions: Any, targets: Any) -> Any: return self.tensor_ops.mean((predictions - targets) ** 2)
[docs] class FractionalCrossEntropyLoss(FractionalLossFunction): """Cross Entropy loss with fractional calculus integration"""
[docs] def compute_loss(self, predictions: Any, targets: Any) -> Any: # Simplified cross-entropy for multi-backend compatibility # In practice, you'd want more sophisticated implementations return self.tensor_ops.mean(-targets * self.tensor_ops.log( self.tensor_ops.softmax(predictions, dim=-1)))
[docs] class FractionalAutoML: """ Automated Machine Learning for fractional calculus parameters This class provides automated optimization of fractional orders and other hyperparameters for optimal performance on specific tasks. """
[docs] def __init__(self, config: Optional[MLConfig] = None): self.config = config or MLConfig() self.best_params = {} self.optimization_history = []
[docs] def optimize_fractional_order( self, model_class: type, train_data: Tuple[Any, Any], val_data: Tuple[Any, Any], param_ranges: Dict[str, List[float]], n_trials: int = 50, metric: str = "accuracy" ) -> Dict[str, Any]: """ Optimize fractional order and other hyperparameters Args: model_class: Class of model to optimize train_data: Training data (X, y) val_data: Validation data (X, y) param_ranges: Dictionary of parameter ranges to search n_trials: Number of optimization trials metric: Metric to optimize Returns: Dictionary with best parameters and optimization results """ import optuna def objective(trial): # Sample parameters params = {} for param_name, param_range in param_ranges.items(): if isinstance(param_range[0], int): params[param_name] = trial.suggest_int( param_name, param_range[0], param_range[1]) elif isinstance(param_range[0], float): params[param_name] = trial.suggest_float( param_name, param_range[0], param_range[1]) else: params[param_name] = trial.suggest_categorical( param_name, param_range) # Create model try: model = model_class(**params) # Setup optimizer (basic SGD for demo/speed in AutoML) # In a real scenario, this should be configurable or use the model's preferred optimizer if model.backend == BackendType.TORCH: import torch.optim as optim import torch.nn as nn import torch optimizer = optim.Adam(model.parameters(), lr=0.01) criterion = nn.MSELoss() if metric != "accuracy" else nn.CrossEntropyLoss() X_train, y_train = train_data X_val, y_val = val_data # Ensure tensors if not isinstance(X_train, torch.Tensor): X_train = torch.tensor(X_train, dtype=torch.float32) y_train = torch.tensor(y_train, dtype=torch.float32 if metric != "accuracy" else torch.long) X_val = torch.tensor(X_val, dtype=torch.float32) y_val = torch.tensor(y_val, dtype=torch.float32 if metric != "accuracy" else torch.long) # Short training loop for trial model.train() epochs = 5 # Reduced epochs for speed in AutoML loop for _ in range(epochs): optimizer.zero_grad() output = model(X_train) loss = criterion(output, y_train) loss.backward() optimizer.step() # Validation model.eval() with torch.no_grad(): val_out = model(X_val) if metric == "accuracy": preds = val_out.argmax(dim=1) score = (preds == y_val).float().mean().item() else: score = criterion(val_out, y_val).item() return score elif model.backend == BackendType.JAX: import jax import jax.numpy as jnp import optax # Setup data X_train, y_train = train_data X_val, y_val = val_data X_train = jnp.asarray(X_train) y_train = jnp.asarray(y_train) X_val = jnp.asarray(X_val) y_val = jnp.asarray(y_val) # Define optimizer optimizer = optax.adam(learning_rate=0.01) # Initialize params as a PyTree params = { 'weights': model.weights, # These are already JAX arrays 'biases': model.biases } opt_state = optimizer.init(params) # Define loss function def loss_fn(p, x, y): preds = model.forward(x, params=p) if metric == "accuracy": # Cross Entropy # Simplified: -mean(sum(one_hot(y) * log(softmax(preds)))) logits = jax.nn.log_softmax(preds) n_classes = preds.shape[-1] one_hot = jax.nn.one_hot(y, n_classes) return -jnp.mean(jnp.sum(one_hot * logits, axis=-1)) else: # MSE return jnp.mean((preds - y) ** 2) # JIT compile update step @jax.jit def step(p, opt_st, x, y): loss, grads = jax.value_and_grad(loss_fn)(p, x, y) updates, new_opt_st = optimizer.update(grads, opt_st) new_params = optax.apply_updates(p, updates) return new_params, new_opt_st, loss # Training loop epochs = 5 for _ in range(epochs): params, opt_state, loss = step(params, opt_state, X_train, y_train) # Validation val_preds = model.forward(X_val, params=params) if metric == "accuracy": preds_cls = jnp.argmax(val_preds, axis=1) score = jnp.mean(preds_cls == y_val).item() else: score = jnp.mean((val_preds - y_val) ** 2).item() return float(score) except Exception as e: # Prune failed trials print(f"Trial failed: {e}") raise optuna.exceptions.TrialPruned() # Create study and optimize study = optuna.create_study( direction="maximize" if metric == "accuracy" else "minimize") study.optimize(objective, n_trials=n_trials) # Store results self.best_params = study.best_params self.optimization_history = study.trials return { 'best_params': self.best_params, 'best_value': study.best_value, 'optimization_history': self.optimization_history }
[docs] def get_best_model(self, model_class: type, **kwargs) -> Any: """Get model instance with best parameters""" if not self.best_params: raise ValueError("No optimization has been run yet") # Merge best params with additional kwargs params = {**self.best_params, **kwargs} return model_class(**params)