Source code for hpfracc.ml.gpu_optimization

"""
GPU optimization utilities for fractional calculus computations.

This module provides GPU acceleration features including Automatic Mixed Precision (AMP),
chunked FFT operations, and performance profiling for fractional calculus operations.
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.amp import autocast, GradScaler
import time
import numpy as np
from typing import Dict, List, Optional, Any, Tuple, Union
from contextlib import contextmanager
import warnings
from dataclasses import dataclass
from abc import ABC, abstractmethod


[docs] @dataclass class PerformanceMetrics: """Container for performance metrics.""" operation: str device: str dtype: str input_shape: Tuple[int, ...] execution_time: float memory_used: float memory_peak: float throughput: float # operations per second timestamp: float
[docs] class GPUProfiler: """Simple profiler for GPU operations."""
[docs] def __init__(self, device: str = "cuda"): self.device = device self.metrics_history: List[PerformanceMetrics] = [] self.current_metrics: Dict[str, PerformanceMetrics] = {}
[docs] def start_timer(self, operation: str): """Start timing an operation.""" if torch.cuda.is_available() and self.device == "cuda": torch.cuda.synchronize() self.start_time = time.time() self.operation = operation
[docs] def end_timer(self, input_tensor: torch.Tensor, output_tensor: Optional[torch.Tensor] = None): """End timing and record metrics.""" if torch.cuda.is_available() and self.device == "cuda": torch.cuda.synchronize() end_time = time.time() execution_time = end_time - self.start_time # Get memory usage memory_used = 0.0 memory_peak = 0.0 if torch.cuda.is_available() and self.device == "cuda": memory_used = torch.cuda.memory_allocated() / 1024**3 # GB memory_peak = torch.cuda.max_memory_allocated() / 1024**3 # GB # Calculate throughput num_elements = input_tensor.numel() if output_tensor is not None: num_elements += output_tensor.numel() throughput = num_elements / execution_time if execution_time > 0 else 0 # Create metrics metrics = PerformanceMetrics( operation=self.operation, device=str(input_tensor.device), dtype=str(input_tensor.dtype), input_shape=tuple(input_tensor.shape), execution_time=execution_time, memory_used=memory_used, memory_peak=memory_peak, throughput=throughput, timestamp=time.time() ) # Store metrics self.current_metrics[self.operation] = metrics self.metrics_history.append(metrics) return metrics
[docs] def get_summary(self) -> Dict[str, Dict[str, float]]: """Get performance summary.""" summary = {} for operation, metrics in self.current_metrics.items(): summary[operation] = { 'execution_time': metrics.execution_time, 'memory_used': metrics.memory_used, 'memory_peak': metrics.memory_peak, 'throughput': metrics.throughput } return summary
[docs] def clear_history(self): """Clear metrics history.""" self.metrics_history.clear() self.current_metrics.clear()
[docs] class ChunkedFFT: """Chunked FFT operations for large sequences."""
[docs] def __init__(self, chunk_size: int = 1024, overlap: Union[int, float] = 0.1, window_type: str = "hann"): self.chunk_size = chunk_size self.overlap = overlap self.window_type = window_type
[docs] def fft_chunked(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor: """Perform chunked FFT on large sequences.""" if x.numel() == 0 or x.shape[dim] == 0: return x if x.shape[dim] <= self.chunk_size: return torch.fft.fft(x, dim=dim) # For large sequences, use chunked processing return self._process_chunks(x, dim, torch.fft.fft)
[docs] def ifft_chunked(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor: """Perform chunked IFFT on large sequences.""" if x.numel() == 0 or x.shape[dim] == 0: return x if x.shape[dim] <= self.chunk_size: return torch.fft.ifft(x, dim=dim) # For large sequences, use chunked processing return self._process_chunks(x, dim, torch.fft.ifft)
[docs] def forward(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor: """Alias for fft_chunked.""" return self.fft_chunked(x, dim)
[docs] def inverse(self, x: torch.Tensor, dim: int = -1) -> torch.Tensor: """Alias for ifft_chunked.""" return self.ifft_chunked(x, dim)
[docs] def _process_chunks(self, x: torch.Tensor, dim: int, fft_func) -> torch.Tensor: """Process tensor in chunks with overlap.""" original_shape = x.shape sequence_length = x.shape[dim] # Using simple chunking. For overlap-add reconstruction: # 1. Each chunk overlaps with previous/next by overlap_size # 2. Apply windowing function (e.g., Hann window) # 3. Reconstruct using weighted sum in overlap regions # Currently using simple concatenation for stability if sequence_length <= self.chunk_size: return fft_func(x, dim=dim) # Calculate number of chunks num_chunks = (sequence_length + self.chunk_size - 1) // self.chunk_size # Initialize output output_chunks = [] for i in range(num_chunks): start_idx = i * self.chunk_size end_idx = min(start_idx + self.chunk_size, sequence_length) # Extract chunk if dim == -1: chunk = x[..., start_idx:end_idx] else: # Handle other dimensions indices = [slice(None)] * x.dim() indices[dim] = slice(start_idx, end_idx) chunk = x[tuple(indices)] # Apply FFT chunk_fft = fft_func(chunk, dim=dim) output_chunks.append(chunk_fft) # Combine chunks if dim == -1: result = torch.cat(output_chunks, dim=dim) else: result = torch.cat(output_chunks, dim=dim) return result
[docs] class AMPFractionalEngine: """Automatic Mixed Precision wrapper for fractional engines."""
[docs] def __init__(self, base_engine, use_amp: bool = True, dtype: torch.dtype = torch.float16): self.base_engine = base_engine self.use_amp = use_amp self.dtype = dtype self.scaler = GradScaler('cuda') if (use_amp and torch.cuda.is_available()) else None
[docs] def forward(self, x: torch.Tensor, alpha: float, **kwargs) -> torch.Tensor: """Forward pass with AMP support.""" if self.use_amp and x.device.type == 'cuda': with autocast(device_type='cuda', dtype=self.dtype): return self.base_engine.forward(x, alpha, **kwargs) else: return self.base_engine.forward(x, alpha, **kwargs)
[docs] def backward(self, grad_output: torch.Tensor, **kwargs) -> torch.Tensor: """Backward pass with AMP support.""" if self.use_amp and self.scaler is not None: return self.scaler.scale(grad_output) else: return grad_output
[docs] def update_scaler(self): """Update the GradScaler after an optimization step.""" if self.scaler is not None: self.scaler.update()
[docs] def get_scaler_state(self) -> Dict[str, Any]: """Get the current state of the GradScaler.""" if self.scaler is not None: return { 'scale': self.scaler.get_scale(), 'growth_factor': self.scaler.get_growth_factor(), 'backoff_factor': self.scaler.get_backoff_factor(), 'growth_interval': self.scaler.get_growth_interval() } return {}
[docs] class GPUOptimizedSpectralEngine: """GPU-optimized spectral engine with AMP and chunked FFT."""
[docs] def __init__(self, engine_type: str = "fft", use_amp: bool = True, chunk_size: int = 1024, dtype: torch.dtype = torch.float16): self.engine_type = engine_type self.use_amp = use_amp self.chunk_size = chunk_size self.dtype = dtype # Initialize components self.chunked_fft = ChunkedFFT(chunk_size=chunk_size) self.profiler = GPUProfiler() # AMP scaler - only initialize if CUDA is available self.scaler = GradScaler('cuda') if (use_amp and torch.cuda.is_available()) else None
[docs] def forward(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """GPU-optimized forward pass.""" self.profiler.start_timer(f"{self.engine_type}_forward") try: if self.use_amp and x.device.type == 'cuda': with autocast(device_type='cuda', dtype=self.dtype): result = self._compute_spectral_transform(x, alpha) else: result = self._compute_spectral_transform(x, alpha) self.profiler.end_timer(x, result) return result except Exception as e: # Fallback to CPU or different precision warnings.warn(f"GPU optimization failed, falling back: {e}") return self._fallback_compute(x, alpha)
[docs] def _compute_spectral_transform(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Compute spectral transform with GPU optimizations.""" if self.engine_type == "fft": # Use chunked FFT for large sequences x_fft = self.chunked_fft.fft_chunked(x) # Apply fractional operator in frequency domain N = x_fft.shape[-1] omega = torch.fft.fftfreq(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(torch.abs(omega) + 1e-8, alpha) # Apply multiplier result_fft = x_fft * multiplier # Inverse FFT result = self.chunked_fft.ifft_chunked(result_fft) return result.real elif self.engine_type == "mellin": # GPU-optimized Mellin transform approximation # The Mellin transform M[f](s) = ∫₀^∞ t^(s-1) f(t) dt # For fractional derivatives, we use the relationship: # D^α f(x) = M^(-1)[s^α M[f](s)] # Convert to log-space for Mellin transform approximation # This is a simplified implementation using FFT on log-scaled data x_positive = torch.abs(x) + 1e-8 log_x = torch.log(x_positive) # Apply FFT in log space x_fft = self.chunked_fft.fft_chunked(log_x) # Apply fractional operator N = x_fft.shape[-1] s = torch.arange(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(s + 1.0, alpha) # Apply and inverse transform result_fft = x_fft * multiplier result = self.chunked_fft.ifft_chunked(result_fft) # Convert back from log space result = torch.exp(result.real) # Restore sign result = result * torch.sign(x) return result elif self.engine_type == "laplacian": # GPU-optimized fractional Laplacian x_fft = self.chunked_fft.fft_chunked(x) N = x_fft.shape[-1] xi = torch.fft.fftfreq(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(torch.abs(xi) + 1e-8, alpha) result_fft = x_fft * multiplier result = self.chunked_fft.ifft_chunked(result_fft) return result.real raise ValueError(f"Unknown engine type: {self.engine_type}")
[docs] def spectral_derivative(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Compute spectral fractional derivative.""" return self.forward(x, alpha)
[docs] def spectral_integral(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Compute spectral fractional integral.""" # Integral is negative derivative order return self.forward(x, -alpha)
[docs] def spectral_transform(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Compute spectral transform (returns complex result).""" # We need the complex output before taking real part if self.engine_type == "fft": x_fft = self.chunked_fft.fft_chunked(x) N = x_fft.shape[-1] omega = torch.fft.fftfreq(N, device=x.device, dtype=torch.float32) multiplier = torch.pow(torch.abs(omega) + 1e-8, alpha) result_fft = x_fft * multiplier result = self.chunked_fft.ifft_chunked(result_fft) return result else: # Fallback for other engines (simplify by returning forward result cast to complex) return self.forward(x, alpha).to(torch.complex64)
[docs] def _fallback_compute(self, x: torch.Tensor, alpha: float) -> torch.Tensor: """Fallback computation without GPU optimizations.""" # Simple fallback - just return a scaled version of input return x * (alpha + 1.0)
[docs] def get_performance_summary(self) -> Dict[str, Dict[str, float]]: """Get performance summary.""" return self.profiler.get_summary()
[docs] class GPUOptimizedStochasticSampler: """GPU-optimized stochastic memory sampler."""
[docs] def __init__(self, base_sampler, use_amp: bool = True, batch_size: int = 1024): self.base_sampler = base_sampler self.use_amp = use_amp self.batch_size = batch_size self.profiler = GPUProfiler()
[docs] def sample_indices(self, n: int, k: int) -> torch.Tensor: """GPU-optimized index sampling.""" self.profiler.start_timer("stochastic_sampling") try: if self.use_amp and torch.cuda.is_available(): with autocast(device_type='cuda', dtype=torch.float16): indices = self._gpu_sample_indices(n, k) else: indices = self.base_sampler.sample_indices(n, k) self.profiler.end_timer(torch.tensor([n, k])) return indices except Exception as e: warnings.warn(f"GPU sampling failed, falling back: {e}") return self.base_sampler.sample_indices(n, k)
[docs] def _gpu_sample_indices(self, n: int, k: int) -> torch.Tensor: """GPU-optimized index sampling implementation.""" # For now, use the base sampler but with GPU tensors indices = self.base_sampler.sample_indices(n, k) return indices.to('cuda') if torch.cuda.is_available() else indices
[docs] def sample(self, mu: torch.Tensor, sigma: torch.Tensor, num_samples: int) -> torch.Tensor: """GPU-optimized distribution sampling.""" self.profiler.start_timer("stochastic_sampling_dist") try: if self.use_amp and torch.cuda.is_available(): with autocast(device_type='cuda', dtype=torch.float16): samples = self._gpu_sample(mu, sigma, num_samples) else: if hasattr(self.base_sampler, 'sample'): samples = self.base_sampler.sample(mu, sigma, num_samples) else: # Fallback implementation if base_sampler doesn't have sample # Assuming normal distribution for mu/sigma eps = torch.randn(num_samples, len(mu), device=mu.device, dtype=mu.dtype) samples = mu + sigma * eps self.profiler.end_timer(mu, samples) return samples except Exception as e: warnings.warn(f"GPU sampling failed, falling back: {e}") if hasattr(self.base_sampler, 'sample'): return self.base_sampler.sample(mu, sigma, num_samples) # Simple fallback return mu + sigma * torch.randn(num_samples, len(mu), device=mu.device)
def _gpu_sample(self, mu, sigma, num_samples): # Ensure inputs are on GPU if torch.cuda.is_available(): mu = mu.to('cuda') sigma = sigma.to('cuda') eps = torch.randn(num_samples, len(mu), device=mu.device, dtype=mu.dtype) return mu + sigma * eps
[docs] def get_performance_summary(self) -> Dict[str, Dict[str, float]]: """Get performance summary.""" return self.profiler.get_summary()
[docs] @contextmanager def gpu_optimization_context(use_amp: bool = True, dtype: torch.dtype = torch.float16): """Context manager for GPU optimization.""" context = {'use_amp': use_amp, 'dtype': dtype} if use_amp and torch.cuda.is_available(): with autocast(device_type='cuda', dtype=dtype): yield context else: yield context
[docs] def benchmark_gpu_optimization(): """Benchmark GPU optimization performance.""" print("Benchmarking GPU optimization...") # Test parameters sequence_lengths = [1024, 2048, 4096, 8192] alpha_values = [0.3, 0.5, 0.7] results = {} for length in sequence_lengths: print(f"\nTesting sequence length: {length}") results[length] = {} # Create test data x = torch.randn( 32, length, device='cuda' if torch.cuda.is_available() else 'cpu') for alpha in alpha_values: print(f" Alpha: {alpha}") # Test different configurations configs = [ ("baseline", False, torch.float32), ("amp_fp16", True, torch.float16), ("amp_bf16", True, torch.bfloat16), ] for config_name, use_amp, dtype in configs: if dtype == torch.bfloat16 and not torch.cuda.is_bf16_supported(): continue # Create engine engine = GPUOptimizedSpectralEngine( engine_type="fft", use_amp=use_amp, dtype=dtype ) # Benchmark start_time = time.time() for _ in range(10): # Multiple runs for averaging result = engine.forward(x, alpha) end_time = time.time() avg_time = (end_time - start_time) / 10 if config_name not in results[length]: results[length][config_name] = {} results[length][config_name][alpha] = avg_time print(f" {config_name}: {avg_time:.4f}s") print(f" {config_name}: {avg_time:.4f}s") return { 'gpu_available': torch.cuda.is_available(), 'benchmarks': results, 'summary': {} }
[docs] def create_gpu_optimized_components(use_amp: bool = True, chunk_size: int = 1024, dtype: torch.dtype = torch.float16, enable_profiling: bool = False): """Factory function to create GPU-optimized components.""" # Create base components profiler = GPUProfiler() # Create spectral engine spectral_engine = GPUOptimizedSpectralEngine( engine_type="fft", use_amp=use_amp, chunk_size=chunk_size, dtype=dtype ) # Create AMP engine with a mock base engine for standalone use class MockBaseEngine: def forward(self, x, alpha, **kwargs): return x * alpha amp_engine = AMPFractionalEngine(MockBaseEngine(), use_amp=use_amp, dtype=dtype) # Create stochastic sampler with a simple base sampler class SimpleSampler: def sample_indices(self, n, k): return torch.randperm(n)[:k] def sample(self, mu, sigma, num_samples): eps = torch.randn(num_samples, len(mu), device=mu.device) return mu + sigma * eps stochastic_sampler = GPUOptimizedStochasticSampler(SimpleSampler(), use_amp=use_amp) components = { 'profiler': profiler, 'spectral_engine': spectral_engine, 'amp_engine': amp_engine, 'stochastic_sampler': stochastic_sampler } return components
# Example usage and testing
[docs] def test_gpu_optimization(test_size: int = 1024, num_iterations: int = 10, use_amp: bool = True): """Test GPU optimization functionality. Returns: Dictionary containing test results with test_passed, test_results, and performance_metrics. """ test_results = {} performance_metrics = {} test_passed = True # Test if CUDA is available if not torch.cuda.is_available(): device = 'cpu' else: device = 'cuda' try: # Create test data x = torch.randn(16, test_size, device=device) alpha = 0.5 # Test GPU-optimized spectral engine engine = GPUOptimizedSpectralEngine( engine_type="fft", use_amp=use_amp, chunk_size=512 ) # Test forward pass start_time = time.time() for _ in range(num_iterations): result = engine.forward(x, alpha) end_time = time.time() test_results['spectral_engine'] = True performance_metrics['spectral_engine'] = { 'execution_time': (end_time - start_time) / num_iterations, 'input_shape': tuple(x.shape), 'output_shape': tuple(result.shape) } # Test chunked FFT chunked_fft = ChunkedFFT(chunk_size=256) x_fft = chunked_fft.fft_chunked(x) x_reconstructed = chunked_fft.ifft_chunked(x_fft) reconstruction_error = torch.mean(torch.abs(x - x_reconstructed.real)).item() test_results['chunked_fft'] = reconstruction_error < 0.1 performance_metrics['chunked_fft'] = { 'reconstruction_error': reconstruction_error } except Exception as e: test_passed = False test_results['error'] = str(e) return { 'test_passed': test_passed and all(test_results.values()), 'test_results': test_results, 'performance_metrics': performance_metrics }
if __name__ == "__main__": test_gpu_optimization() if torch.cuda.is_available(): print("\nRunning GPU benchmark...") benchmark_results = benchmark_gpu_optimization() print("Benchmark completed!")