Source code for haive.core.utils.debugkit.profiling.performance

"""
Performance Profiling Utilities

Provides comprehensive performance analysis including line-by-line profiling,
memory analysis, CPU profiling, and execution optimization insights.
"""

import functools
import subprocess
import time
from collections.abc import Callable
from contextlib import contextmanager, suppress
from pathlib import Path

# Try to import profiling tools
try:
    import line_profiler

    HAS_LINE_PROFILER = True
except ImportError:
    HAS_LINE_PROFILER = False

try:
    import memory_profiler

    HAS_MEMORY_PROFILER = True
except ImportError:
    HAS_MEMORY_PROFILER = False

try:
    import pyinstrument

    HAS_PYINSTRUMENT = True
except ImportError:
    HAS_PYINSTRUMENT = False

try:
    HAS_SCALENE = True
except ImportError:
    HAS_SCALENE = False

try:
    HAS_PY_SPY = True
except ImportError:
    HAS_PY_SPY = False

try:
    import psutil

    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False

try:
    from rich.console import Console
    from rich.progress import track
    from rich.table import Table

    HAS_RICH = True
except ImportError:
    HAS_RICH = False


[docs] class TimingProfiler: """Simple timing profiler for function execution.""" def __init__(self): self.timings: dict[str, list[float]] = {} self.console = Console() if HAS_RICH else None
[docs] def time_function(self, func: Callable) -> Callable: """Decorator to time function execution.""" @functools.wraps(func) def wrapper(*args, **kwargs): start_time = time.perf_counter() try: result = func(*args, **kwargs) return result finally: end_time = time.perf_counter() duration = end_time - start_time func_name = f"{func.__module__}.{func.__name__}" if func_name not in self.timings: self.timings[func_name] = [] self.timings[func_name].append(duration) if HAS_RICH and self.console: self.console.print(f"⏱️ {func_name}: {duration:.6f}s", style="blue") else: pass return wrapper
[docs] def get_stats(self) -> dict[str, dict[str, float]]: """Get timing statistics for all functions.""" stats = {} for func_name, times in self.timings.items(): if times: stats[func_name] = { "count": len(times), "total": sum(times), "mean": sum(times) / len(times), "min": min(times), "max": max(times), } if HAS_RICH and self.console: table = Table(title="Timing Statistics") table.add_column("Function", style="cyan") table.add_column("Count", style="green") table.add_column("Total (s)", style="yellow") table.add_column("Mean (s)", style="magenta") table.add_column("Min (s)", style="blue") table.add_column("Max (s)", style="red") for func_name, stat in stats.items(): table.add_row( func_name, str(stat["count"]), f"{stat['total']:.6f}", f"{stat['mean']:.6f}", f"{stat['min']:.6f}", f"{stat['max']:.6f}", ) self.console.print(table) return stats
[docs] def clear(self) -> None: """Clear timing data.""" self.timings.clear()
[docs] class MemoryProfiler: """Memory usage profiler.""" def __init__(self): self.baseline_memory = self._get_memory_usage() self.console = Console() if HAS_RICH else None def _get_memory_usage(self) -> float: """Get current memory usage in MB.""" if HAS_PSUTIL: process = psutil.Process() return process.memory_info().rss / 1024 / 1024 else: # Fallback using memory_profiler if available if HAS_MEMORY_PROFILER: return memory_profiler.memory_usage()[0] return 0.0
[docs] def profile_memory(self, func: Callable) -> Callable: """Decorator to profile memory usage.""" @functools.wraps(func) def wrapper(*args, **kwargs): # Get memory before memory_before = self._get_memory_usage() try: result = func(*args, **kwargs) return result finally: # Get memory after memory_after = self._get_memory_usage() memory_delta = memory_after - memory_before func_name = f"{func.__module__}.{func.__name__}" if HAS_RICH and self.console: color = "red" if memory_delta > 0 else "green" self.console.print( f"💾 {func_name}: {memory_delta:+.2f} MB (total: {memory_after:.2f} MB)", style=color, ) else: pass return wrapper
[docs] def memory_line_by_line(self, func: Callable) -> Callable: """Profile memory usage line by line using memory_profiler.""" if not HAS_MEMORY_PROFILER: return self.profile_memory(func) # Add @profile decorator for memory_profiler return memory_profiler.profile(func)
[docs] def get_current_usage(self) -> dict[str, float]: """Get current memory usage statistics.""" current = self._get_memory_usage() delta_from_baseline = current - self.baseline_memory stats = { "current_mb": current, "baseline_mb": self.baseline_memory, "delta_mb": delta_from_baseline, } if HAS_RICH and self.console: table = Table(title="Memory Usage") table.add_column("Metric", style="cyan") table.add_column("Value (MB)", style="green") table.add_row("Current", f"{current:.2f}") table.add_row("Baseline", f"{self.baseline_memory:.2f}") table.add_row("Delta", f"{delta_from_baseline:+.2f}") self.console.print(table) else: pass return stats
[docs] class LineProfiler: """Line-by-line performance profiler.""" def __init__(self): self.profiler = line_profiler.LineProfiler() if HAS_LINE_PROFILER else None self.console = Console() if HAS_RICH else None
[docs] def profile_lines(self, func: Callable) -> Callable: """Profile function line by line.""" if not HAS_LINE_PROFILER: return func self.profiler.add_function(func) @functools.wraps(func) def wrapper(*args, **kwargs): self.profiler.enable_by_count() try: result = func(*args, **kwargs) return result finally: self.profiler.disable_by_count() return wrapper
[docs] def show_stats(self, filename: str | None = None) -> None: """Show line profiling statistics.""" if not self.profiler: return if filename: with open(filename, "w") as f: self.profiler.print_stats(stream=f) else: self.profiler.print_stats()
[docs] class CPUProfiler: """CPU profiling utilities.""" def __init__(self): self.console = Console() if HAS_RICH else None
[docs] def profile_cpu(self, func: Callable, duration: int = 10) -> Callable: """Profile CPU usage with pyinstrument.""" if not HAS_PYINSTRUMENT: return func @functools.wraps(func) def wrapper(*args, **kwargs): profiler = pyinstrument.Profiler() profiler.start() try: result = func(*args, **kwargs) return result finally: profiler.stop() if HAS_RICH and self.console: self.console.print("🔥 CPU Profiling Results:", style="bold red") return wrapper
[docs] def profile_with_scalene( self, script_path: str, output_dir: str = "scalene_profiles" ) -> None: """Profile a script with Scalene.""" if not HAS_SCALENE: return Path(output_dir).mkdir(exist_ok=True) output_file = Path(output_dir) / f"profile_{int(time.time())}.html" cmd = ["scalene", "--html", "--outfile", str(output_file), script_path] with suppress(subprocess.CalledProcessError): subprocess.run(cmd, check=True)
[docs] class ProfilingUtilities: """Comprehensive profiling utilities.""" def __init__(self): self.timing_profiler = TimingProfiler() self.memory_profiler = MemoryProfiler() self.line_profiler = LineProfiler() self.cpu_profiler = CPUProfiler() self.console = Console() if HAS_RICH else None
[docs] def time(self, func: Callable | None = None) -> Callable: """Time function execution.""" if func: return self.timing_profiler.time_function(func) else: return self.timing_profiler.time_function
[docs] def memory( self, func: Callable | None = None, line_by_line: bool = False ) -> Callable: """Profile memory usage.""" if line_by_line: profiler_func = self.memory_profiler.memory_line_by_line else: profiler_func = self.memory_profiler.profile_memory if func: return profiler_func(func) else: return profiler_func
[docs] def line(self, func: Callable | None = None) -> Callable: """Profile line-by-line execution.""" if func: return self.line_profiler.profile_lines(func) else: return self.line_profiler.profile_lines
[docs] def cpu(self, func: Callable | None = None) -> Callable: """Profile CPU usage.""" if func: return self.cpu_profiler.profile_cpu(func) else: return self.cpu_profiler.profile_cpu
[docs] def comprehensive(self, func: Callable) -> Callable: """Apply comprehensive profiling (time + memory + CPU).""" @functools.wraps(func) def wrapper(*args, **kwargs): if HAS_RICH and self.console: self.console.print( f"🔬 Starting comprehensive profiling of {func.__name__}", style="bold blue", ) else: pass # Apply multiple profilers profiled_func = self.timing_profiler.time_function( self.memory_profiler.profile_memory(self.cpu_profiler.profile_cpu(func)) ) return profiled_func(*args, **kwargs) return wrapper
[docs] @contextmanager def profile_context( self, name: str = "profile", include_memory: bool = True, include_cpu: bool = True, ): """Context manager for profiling a block of code.""" start_time = time.perf_counter() start_memory = ( self.memory_profiler._get_memory_usage() if include_memory else None ) cpu_profiler = None if include_cpu and HAS_PYINSTRUMENT: cpu_profiler = pyinstrument.Profiler() cpu_profiler.start() if HAS_RICH and self.console: self.console.print(f"🏁 Starting profile context: {name}", style="blue") else: pass try: yield self finally: end_time = time.perf_counter() duration = end_time - start_time if cpu_profiler: cpu_profiler.stop() # Report results if HAS_RICH and self.console: table = Table(title=f"Profile Results: {name}") table.add_column("Metric", style="cyan") table.add_column("Value", style="green") table.add_row("Duration", f"{duration:.6f}s") if start_memory: end_memory = self.memory_profiler._get_memory_usage() memory_delta = end_memory - start_memory table.add_row("Memory Delta", f"{memory_delta:+.2f} MB") table.add_row("Final Memory", f"{end_memory:.2f} MB") self.console.print(table) if cpu_profiler: self.console.print("🔥 CPU Profile:", style="bold red") else: if start_memory: end_memory = self.memory_profiler._get_memory_usage() memory_delta = end_memory - start_memory if cpu_profiler: pass
[docs] def benchmark( self, func: Callable, iterations: int = 1000, warmup: int = 100 ) -> dict[str, float]: """Benchmark a function with multiple iterations.""" if HAS_RICH and self.console: self.console.print( f"🏃 Benchmarking {func.__name__} ({iterations} iterations)", style="bold green", ) else: pass # Warmup for _ in range(warmup): func() # Actual benchmark times = [] if HAS_RICH: iterations_range = track(range(iterations), description="Benchmarking...") else: iterations_range = range(iterations) for _ in iterations_range: start_time = time.perf_counter() func() end_time = time.perf_counter() times.append(end_time - start_time) # Calculate statistics total_time = sum(times) mean_time = total_time / len(times) min_time = min(times) max_time = max(times) # Calculate percentiles sorted_times = sorted(times) p50 = sorted_times[len(sorted_times) // 2] p95 = sorted_times[int(len(sorted_times) * 0.95)] p99 = sorted_times[int(len(sorted_times) * 0.99)] stats = { "iterations": iterations, "total_time": total_time, "mean_time": mean_time, "min_time": min_time, "max_time": max_time, "p50_time": p50, "p95_time": p95, "p99_time": p99, "ops_per_second": iterations / total_time, } if HAS_RICH and self.console: table = Table(title=f"Benchmark Results: {func.__name__}") table.add_column("Metric", style="cyan") table.add_column("Value", style="green") table.add_row("Iterations", str(iterations)) table.add_row("Total Time", f"{total_time:.6f}s") table.add_row("Mean Time", f"{mean_time:.6f}s") table.add_row("Min Time", f"{min_time:.6f}s") table.add_row("Max Time", f"{max_time:.6f}s") table.add_row("P50 Time", f"{p50:.6f}s") table.add_row("P95 Time", f"{p95:.6f}s") table.add_row("P99 Time", f"{p99:.6f}s") table.add_row("Ops/Second", f"{stats['ops_per_second']:.2f}") self.console.print(table) else: for _key, _value in stats.items(): pass return stats
[docs] def compare( self, funcs: list[Callable], iterations: int = 1000 ) -> dict[str, dict[str, float]]: """Compare performance of multiple functions.""" results = {} for func in funcs: results[func.__name__] = self.benchmark(func, iterations) # Show comparison if HAS_RICH and self.console: table = Table(title="Performance Comparison") table.add_column("Function", style="cyan") table.add_column("Mean Time", style="green") table.add_column("Ops/Second", style="yellow") table.add_column("Relative Speed", style="magenta") # Find fastest function fastest_ops = max(result["ops_per_second"] for result in results.values()) for func_name, result in results.items(): relative_speed = result["ops_per_second"] / fastest_ops table.add_row( func_name, f"{result['mean_time']:.6f}s", f"{result['ops_per_second']:.2f}", f"{relative_speed:.2f}x", ) self.console.print(table) return results
[docs] def stats(self) -> None: """Show all profiling statistics.""" if HAS_RICH and self.console: self.console.print("📊 Profiling Statistics", style="bold blue") else: pass self.timing_profiler.get_stats() self.memory_profiler.get_current_usage() if HAS_LINE_PROFILER: self.line_profiler.show_stats()
[docs] def clear(self) -> None: """Clear all profiling data.""" self.timing_profiler.clear()
[docs] def status(self) -> dict[str, bool]: """Get status of available profiling tools.""" status = { "line_profiler": HAS_LINE_PROFILER, "memory_profiler": HAS_MEMORY_PROFILER, "pyinstrument": HAS_PYINSTRUMENT, "scalene": HAS_SCALENE, "py_spy": HAS_PY_SPY, "psutil": HAS_PSUTIL, "rich": HAS_RICH, } if HAS_RICH and self.console: table = Table(title="Profiling Tools Status") table.add_column("Tool", style="cyan") table.add_column("Available", style="green") for tool, available in status.items(): table.add_row(tool, "✅" if available else "❌") self.console.print(table) else: for tool, available in status.items(): pass return status
# Create global profile instance profile = ProfilingUtilities()