Source code for hpfracc.ml.gpu_optimization

"""
GPU optimization utilities for fractional calculus computations.

This module provides GPU acceleration features including Automatic Mixed Precision (AMP),
chunked FFT operations, and performance profiling for fractional calculus operations.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.amp import autocast, GradScaler
import time
import numpy as np
from typing import Dict, List, Optional, Any, Tuple, Union
from contextlib import contextmanager
import warnings
from dataclasses import dataclass
from abc import ABC, abstractmethod


[docs] @dataclass class PerformanceMetrics: """Container for performance metrics.""" operation: str device: str dtype: str input_shape: Tuple[int, ...] execution_time: float memory_used: float memory_peak: float throughput: float # operations per second timestamp: float
[docs] class GPUProfiler: """Simple profiler for GPU operations."""
[docs] def __init__(self, device: str = "cuda"): self.device = device self.metrics_history: List[PerformanceMetrics] = [] self.current_metrics: Dict[str, PerformanceMetrics] = {}
[docs] def start_timer(self, operation: str): """Start timing an operation.""" if torch.cuda.is_available() and self.device == "cuda": torch.cuda.synchronize() self.start_time = time.time() self.operation = operation
[docs] def end_timer(self, input_tensor: torch.Tensor, output_tensor: Optional[torch.Tensor] = None): """End timing and record metrics.""" if torch.cuda.is_available() and self.device == "cuda": torch.cuda.synchronize() end_time = time.time() execution_time = end_time - self.start_time # Get memory usage memory_used = 0.0 memory_peak = 0.0 if torch.cuda.is_available() and self.device == "cuda": memory_used = torch.cuda.memory_allocated() / 1024**3 # GB memory_peak = torch.cuda.max_memory_allocated() / 1024**3 # GB # Calculate throughput num_elements = input_tensor.numel() if output_tensor is not None: num_elements += output_tensor.numel() throughput = num_elements / execution_time if execution_time > 0 else 0 # Create metrics metrics = PerformanceMetrics( operation=self.operation, device=str(input_tensor.device), dtype=str(input_tensor.dtype), input_shape=tuple(input_tensor.shape), execution_time=execution_time, memory_used=memory_used, memory_peak=memory_peak, throughput=throughput, timestamp=time.time() ) # Store metrics self.current_metrics[self.operation] = metrics self.metrics_history.append(metrics) return metrics
[docs] def get_summary(self) -> Dict[str, Dict[str, float]]: """Get performance summary.""" summary = {} for operation, metrics in self.current_metrics.items(): summary[operation] = { 'execution_time': metrics.execution_time, 'memory_used': metrics.memory_used, 'memory_peak': metrics.memory_peak, 'throughput': metrics.throughput } return summary
[docs] def clear_history(self): """Clear metrics history.""" self.metrics_history.clear() self.current_metrics.clear()
[docs] class ChunkedFFT: """Chunked FFT operations for large sequences."""
[docs] def __init__(self, chunk_size: int = 1024, overlap: int = 128): self.chunk_size = chunk_size self.overlap = overlap
[docs] def fft_chunked(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor: """Perform chunked FFT on large sequences.""" if x.shape[dim] <= self.chunk_size: return torch.fft.fft(x, dim=dim) # For large sequences, use chunked processing return self._process_chunks(x, dim, torch.fft.fft)
[docs] def ifft_chunked(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor: """Perform chunked IFFT on large sequences.""" if x.shape[dim] <= self.chunk_size: return torch.fft.ifft(x, dim=dim) # For large sequences, use chunked processing return self._process_chunks(x, dim, torch.fft.ifft)
[docs] def _process_chunks(self, x: torch.Tensor, dim: int, fft_func) -> torch.Tensor: """Process tensor in chunks with overlap.""" original_shape = x.shape sequence_length = x.shape[dim] # Using simple chunking. For overlap-add reconstruction: # 1. Each chunk overlaps with previous/next by overlap_size # 2. Apply windowing function (e.g., Hann window) # 3. Reconstruct using weighted sum in overlap regions # Currently using simple concatenation for stability if sequence_length <= self.chunk_size: return fft_func(x, dim=dim) # Calculate number of chunks num_chunks = (sequence_length + self.chunk_size - 1) // self.chunk_size # Initialize output output_chunks = [] for i in range(num_chunks): start_idx = i * self.chunk_size end_idx = min(start_idx + self.chunk_size, sequence_length) # Extract chunk if dim == -1: chunk = x[..., start_idx:end_idx] else: # Handle other dimensions indices = [slice(None)] * x.dim() indices[dim] = slice(start_idx, end_idx) chunk = x[tuple(indices)] # Apply FFT chunk_fft = fft_func(chunk, dim=dim) output_chunks.append(chunk_fft) # Combine chunks if dim == -1: result = torch.cat(output_chunks, dim=dim) else: result = torch.cat(output_chunks, dim=dim) return result
[docs] class AMPFractionalEngine: """Automatic Mixed Precision wrapper for fractional engines."""
[docs] def __init__(self, base_engine, use_amp: bool = True, dtype: torch.dtype = torch.float16): self.base_engine = base_engine self.use_amp = use_amp self.dtype = dtype self.scaler = GradScaler('cuda') if use_amp else None
[docs] def forward(self, x: torch.Tensor, alpha: float, **kwargs) -> torch.Tensor: """Forward pass with AMP support.""" if self.use_amp and x.device.type == 'cuda': with autocast(device_type='cuda', dtype=self.dtype): return self.base_engine.forward(x, alpha, **kwargs) else: return self.base_engine.forward(x, alpha, **kwargs)
[docs] def backward(self, grad_output: torch.Tensor, **kwargs) -> torch.Tensor: """Backward pass with AMP support.""" if self.use_amp and self.scaler is not None: return self.scaler.scale(grad_output) else: return grad_output
[docs] class GPUOptimizedSpectralEngine: """GPU-optimized spectral engine with AMP and chunked FFT."""
[docs] def __init__(self, engine_type: str = "fft", use_amp: bool = True, chunk_size: int = 1024, dtype: torch.dtype = torch.float16): self.engine_type = engine_type self.use_amp = use_amp self.chunk_size = chunk_size self.dtype = dtype # Initialize components self.chunked_fft = ChunkedFFT(chunk_size=chunk_size) self.profiler = GPUProfiler() # AMP scaler self.scaler = GradScaler('cuda') if use_amp else None
[docs] def forward(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """GPU-optimized forward pass.""" self.profiler.start_timer(f"{self.engine_type}_forward") try: if self.use_amp and x.device.type == 'cuda': with autocast(device_type='cuda', dtype=self.dtype): result = self._compute_spectral_transform(x, alpha) else: result = self._compute_spectral_transform(x, alpha) self.profiler.end_timer(x, result) return result except Exception as e: # Fallback to CPU or different precision warnings.warn(f"GPU optimization failed, falling back: {e}") return self._fallback_compute(x, alpha)
[docs] def _compute_spectral_transform(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Compute spectral transform with GPU optimizations.""" if self.engine_type == "fft": # Use chunked FFT for large sequences x_fft = self.chunked_fft.fft_chunked(x) # Apply fractional operator in frequency domain N = x_fft.shape[-1] omega = torch.fft.fftfreq(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(torch.abs(omega) + 1e-8, alpha) # Apply multiplier result_fft = x_fft * multiplier # Inverse FFT result = self.chunked_fft.ifft_chunked(result_fft) return result.real elif self.engine_type == "mellin": # GPU-optimized Mellin transform approximation # The Mellin transform M[f](s) = ∫₀^∞ t^(s-1) f(t) dt # For fractional derivatives, we use the relationship: # D^α f(x) = M^(-1)[s^α M[f](s)] # Convert to log-space for Mellin transform approximation # This is a simplified implementation using FFT on log-scaled data x_positive = torch.abs(x) + 1e-8 log_x = torch.log(x_positive) # Apply FFT in log space x_fft = self.chunked_fft.fft_chunked(log_x) # Apply fractional operator N = x_fft.shape[-1] s = torch.arange(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(s + 1.0, alpha) # Apply and inverse transform result_fft = x_fft * multiplier result = self.chunked_fft.ifft_chunked(result_fft) # Convert back from log space result = torch.exp(result.real) # Restore sign result = result * torch.sign(x) return result elif self.engine_type == "laplacian": # GPU-optimized fractional Laplacian x_fft = self.chunked_fft.fft_chunked(x) N = x_fft.shape[-1] xi = torch.fft.fftfreq(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(torch.abs(xi) + 1e-8, alpha) result_fft = x_fft * multiplier result = self.chunked_fft.ifft_chunked(result_fft) return result.real else: raise ValueError(f"Unknown engine type: {self.engine_type}")
[docs] def _fallback_compute(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Fallback computation without GPU optimizations.""" # Simple fallback - just return a scaled version of input return x * (alpha + 1.0)
[docs] def get_performance_summary(self) -> Dict[str, Dict[str, float]]: """Get performance summary.""" return self.profiler.get_summary()
[docs] class GPUOptimizedStochasticSampler: """GPU-optimized stochastic memory sampler."""
[docs] def __init__(self, base_sampler, use_amp: bool = True, batch_size: int = 1024): self.base_sampler = base_sampler self.use_amp = use_amp self.batch_size = batch_size self.profiler = GPUProfiler()
[docs] def sample_indices(self, n: int, k: int) -> torch.Tensor: """GPU-optimized index sampling.""" self.profiler.start_timer("stochastic_sampling") try: if self.use_amp and torch.cuda.is_available(): with autocast(device_type='cuda', dtype=torch.float16): indices = self._gpu_sample_indices(n, k) else: indices = self.base_sampler.sample_indices(n, k) self.profiler.end_timer(torch.tensor([n, k])) return indices except Exception as e: warnings.warn(f"GPU sampling failed, falling back: {e}") return self.base_sampler.sample_indices(n, k)
[docs] def _gpu_sample_indices(self, n: int, k: int) -> torch.Tensor: """GPU-optimized index sampling implementation.""" # For now, use the base sampler but with GPU tensors indices = self.base_sampler.sample_indices(n, k) return indices.to('cuda') if torch.cuda.is_available() else indices
[docs] def get_performance_summary(self) -> Dict[str, Dict[str, float]]: """Get performance summary.""" return self.profiler.get_summary()
[docs] @contextmanager def gpu_optimization_context(use_amp: bool = True, dtype: torch.dtype = torch.float16): """Context manager for GPU optimization.""" if use_amp and torch.cuda.is_available(): with autocast(device_type='cuda', dtype=dtype): yield else: yield
[docs] def benchmark_gpu_optimization(): """Benchmark GPU optimization performance.""" print("Benchmarking GPU optimization...") # Test parameters sequence_lengths = [1024, 2048, 4096, 8192] alpha_values = [0.3, 0.5, 0.7] results = {} for length in sequence_lengths: print(f"\nTesting sequence length: {length}") results[length] = {} # Create test data x = torch.randn( 32, length, device='cuda' if torch.cuda.is_available() else 'cpu') for alpha in alpha_values: print(f" Alpha: {alpha}") # Test different configurations configs = [ ("baseline", False, torch.float32), ("amp_fp16", True, torch.float16), ("amp_bf16", True, torch.bfloat16), ] for config_name, use_amp, dtype in configs: if dtype == torch.bfloat16 and not torch.cuda.is_bf16_supported(): continue # Create engine engine = GPUOptimizedSpectralEngine( engine_type="fft", use_amp=use_amp, dtype=dtype ) # Benchmark start_time = time.time() for _ in range(10): # Multiple runs for averaging result = engine.forward(x, alpha) end_time = time.time() avg_time = (end_time - start_time) / 10 if config_name not in results[length]: results[length][config_name] = {} results[length][config_name][alpha] = avg_time print(f" {config_name}: {avg_time:.4f}s") return results
[docs] def create_gpu_optimized_components(use_amp: bool = True, chunk_size: int = 1024, dtype: torch.dtype = torch.float16): """Factory function to create GPU-optimized components.""" components = {} # Create GPU-optimized spectral engines for engine_type in ["fft", "mellin", "laplacian"]: components[f"{engine_type}_engine"] = GPUOptimizedSpectralEngine( engine_type=engine_type, use_amp=use_amp, chunk_size=chunk_size, dtype=dtype ) return components
# Example usage and testing
[docs] def test_gpu_optimization(): """Test GPU optimization functionality.""" print("Testing GPU optimization...") # Test if CUDA is available if not torch.cuda.is_available(): print("CUDA not available, testing CPU fallback...") device = 'cpu' else: print("CUDA available, testing GPU optimization...") device = 'cuda' # Create test data x = torch.randn(16, 1024, device=device) alpha = 0.5 # Test GPU-optimized spectral engine engine = GPUOptimizedSpectralEngine( engine_type="fft", use_amp=True, chunk_size=512 ) # Test forward pass result = engine.forward(x, alpha) print(f"Input shape: {x.shape}, Output shape: {result.shape}") # Get performance summary summary = engine.get_performance_summary() print("Performance summary:") for op, metrics in summary.items(): print(f" {op}: {metrics['execution_time']:.4f}s, " f"throughput: {metrics['throughput']:.2e} ops/s") # Test chunked FFT chunked_fft = ChunkedFFT(chunk_size=256) x_fft = chunked_fft.fft_chunked(x) x_reconstructed = chunked_fft.ifft_chunked(x_fft) print( f"Chunked FFT reconstruction error: {torch.mean(torch.abs(x - x_reconstructed.real)):.6f}") print("GPU optimization test completed!")
if __name__ == "__main__": test_gpu_optimization() if torch.cuda.is_available(): print("\nRunning GPU benchmark...") benchmark_results = benchmark_gpu_optimization() print("Benchmark completed!")