#!/usr/bin/env python3 """ Video Transcoding Benchmark Tool Measures how many simultaneous 1080p streams can be transcoded in real-time or better. """ import subprocess import sys import time import tempfile import os import threading import argparse from pathlib import Path from typing import Optional, Tuple, List class HardwareAcceleration: """Detect and configure hardware acceleration.""" @staticmethod def detect() -> Tuple[str, str, str]: """ Detect available hardware acceleration. Returns: (name, encoder, hwaccel_args) """ # Check NVIDIA NVENC if HardwareAcceleration._check_encoder('h264_nvenc'): return ('NVIDIA NVENC', 'h264_nvenc', '-hwaccel cuda -hwaccel_output_format cuda') # Check Intel QSV if HardwareAcceleration._check_encoder('h264_qsv'): return ('Intel Quick Sync', 'h264_qsv', '-hwaccel qsv -hwaccel_output_format qsv') # Check AMD AMF (Windows/Linux) if HardwareAcceleration._check_encoder('h264_amf'): return ('AMD AMF', 'h264_amf', '') # Check VideoToolbox (macOS/iOS - ARM) if HardwareAcceleration._check_encoder('h264_videotoolbox'): return ('VideoToolbox', 'h264_videotoolbox', '-hwaccel videotoolbox') # Check VA-API (Linux Intel/AMD) if HardwareAcceleration._check_encoder('h264_vaapi'): return ('VA-API', 'h264_vaapi', '-hwaccel vaapi -hwaccel_output_format vaapi -hwaccel_device /dev/dri/renderD128') # Fallback to software return ('Software (libx264)', 'libx264', '') @staticmethod def _check_encoder(encoder: str) -> bool: """Check if FFmpeg supports a specific encoder.""" try: result = subprocess.run( ['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True, timeout=5 ) return encoder in result.stdout except (subprocess.TimeoutExpired, FileNotFoundError): return False class TestVideo: """Generate or manage test video file.""" @staticmethod def generate(path: str, duration: int = 30) -> bool: """Generate a 1080p test video.""" try: print(f"Generating {duration}s 1080p test video...") cmd = [ 'ffmpeg', '-y', '-f', 'lavfi', '-i', 'testsrc2=size=1920x1080:rate=30:duration={}'.format(duration), '-f', 'lavfi', '-i', 'sine=frequency=1000:duration={}'.format(duration), '-c:v', 'libx264', '-preset', 'medium', '-c:a', 'aac', '-b:a', '128k', path ] result = subprocess.run(cmd, capture_output=True, timeout=120) return result.returncode == 0 except Exception as e: print(f"Error generating test video: {e}") return False class TranscodeJob: """Represents a single transcode job.""" def __init__(self, input_file: str, output_file: str, encoder: str, hwaccel_args: str): self.input_file = input_file self.output_file = output_file self.encoder = encoder self.hwaccel_args = hwaccel_args self.process = None self.start_time = None self.end_time = None self.success = False self.fps = 0.0 def run(self): """Execute the transcode job.""" try: # Build FFmpeg command cmd = ['ffmpeg', '-y'] # Add hardware acceleration args if self.hwaccel_args: cmd.extend(self.hwaccel_args.split()) cmd.extend([ '-i', self.input_file, '-c:v', self.encoder, '-b:v', '4M', '-c:a', 'aac', '-b:a', '128k', '-f', 'null' if os.name != 'nt' else 'null', self.output_file if os.name != 'nt' else '-' ]) self.start_time = time.time() self.process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for completion _, stderr = self.process.communicate() self.end_time = time.time() # Parse FPS from FFmpeg output self.fps = self._parse_fps(stderr) self.success = self.process.returncode == 0 except Exception as e: self.success = False self.end_time = time.time() def _parse_fps(self, ffmpeg_output: str) -> float: """Parse average FPS from FFmpeg output.""" try: # Look for the final fps value in output for line in ffmpeg_output.split('\n'): if 'fps=' in line: fps_str = line.split('fps=')[1].split()[0] return float(fps_str) except: pass return 0.0 class Benchmark: """Main benchmark orchestrator.""" def __init__(self, test_video: str, encoder: str, hwaccel_args: str, accel_name: str): self.test_video = test_video self.encoder = encoder self.hwaccel_args = hwaccel_args self.accel_name = accel_name def run_parallel_transcodes(self, num_streams: int, timeout: int = 60) -> Tuple[bool, float]: """ Run multiple parallel transcode jobs. Returns: (success, average_fps) """ jobs = [] threads = [] # Create jobs for i in range(num_streams): output_file = f'/dev/null' if os.name != 'nt' else 'NUL' job = TranscodeJob(self.test_video, output_file, self.encoder, self.hwaccel_args) jobs.append(job) # Start all jobs in parallel for job in jobs: thread = threading.Thread(target=job.run) thread.start() threads.append(thread) # Wait for all to complete (with timeout) start = time.time() for thread in threads: remaining = timeout - (time.time() - start) if remaining > 0: thread.join(timeout=remaining) else: return False, 0.0 # Check if all succeeded and calculate average FPS all_success = all(job.success for job in jobs) if all_success: avg_fps = sum(job.fps for job in jobs) / len(jobs) if jobs else 0.0 return True, avg_fps return False, 0.0 def find_max_streams(self, min_fps: float = 30.0) -> int: """ Use binary search to find maximum number of simultaneous streams. A stream is considered "real-time" if it achieves >= min_fps. """ print(f"\nBenchmarking with {self.accel_name}...") print("Finding maximum simultaneous 1080p streams at real-time or better...\n") # Binary search bounds low, high = 1, 64 max_streams = 0 while low <= high: mid = (low + high) // 2 print(f"Testing {mid} simultaneous streams...", end=' ', flush=True) success, avg_fps = self.run_parallel_transcodes(mid) if success and avg_fps >= min_fps: print(f"✓ (avg {avg_fps:.1f} fps)") max_streams = mid low = mid + 1 else: if success: print(f"✗ (avg {avg_fps:.1f} fps - below real-time)") else: print(f"✗ (failed)") high = mid - 1 return max_streams def main(): parser = argparse.ArgumentParser( description='Benchmark video transcoding performance', formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( '--duration', type=int, default=30, help='Test video duration in seconds (default: 30)' ) parser.add_argument( '--input', type=str, help='Use existing video file instead of generating test video' ) parser.add_argument( '--min-fps', type=float, default=30.0, help='Minimum FPS to consider real-time (default: 30.0)' ) args = parser.parse_args() # Check for FFmpeg try: subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True) except (subprocess.CalledProcessError, FileNotFoundError): print("Error: FFmpeg not found. Please install FFmpeg and ensure it's in your PATH.") return 1 print("=" * 60) print("Video Transcoding Benchmark") print("=" * 60) # Detect hardware acceleration accel_name, encoder, hwaccel_args = HardwareAcceleration.detect() print(f"Detected acceleration: {accel_name}") print(f"Encoder: {encoder}") # Prepare test video if args.input: test_video = args.input if not os.path.exists(test_video): print(f"Error: Input file '{test_video}' not found.") return 1 else: with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f: test_video = f.name if not TestVideo.generate(test_video, args.duration): print("Error: Failed to generate test video.") return 1 try: # Run benchmark benchmark = Benchmark(test_video, encoder, hwaccel_args, accel_name) max_streams = benchmark.find_max_streams(args.min_fps) # Display results print("\n" + "=" * 60) print("BENCHMARK RESULTS") print("=" * 60) print(f"Hardware Acceleration: {accel_name}") print(f"Maximum Simultaneous 1080p Streams: {max_streams}") print(f"(at {args.min_fps} FPS or better)") print("=" * 60) # Also output just the number for easy parsing print(f"\nBenchmark Score: {max_streams}") finally: # Cleanup if not args.input and os.path.exists(test_video): os.unlink(test_video) return 0 if __name__ == '__main__': sys.exit(main())