#!/usr/bin/env python3 """ Video Transcoding Benchmark Tool Measures how many simultaneous 1080p streams can be transcoded in real-time or better. """ import subprocess import sys import time import tempfile import os import threading import argparse import signal import atexit from pathlib import Path from typing import Optional, Tuple, List def restore_terminal(): """Restore terminal settings.""" if sys.platform != 'win32': os.system('stty sane') # Register cleanup function atexit.register(restore_terminal) class HardwareAcceleration: """Detect and configure hardware acceleration.""" @staticmethod def detect(debug=False) -> Tuple[str, str, str, bool]: """ Detect available hardware acceleration. Returns: (name, encoder, hwaccel_args, hw_decode_supported) """ warnings = [] if debug: print("\n=== Hardware Detection Debug ===") # Check NVIDIA NVENC if debug: print("Testing NVIDIA NVENC...", end=' ') result = HardwareAcceleration._test_hardware('h264_nvenc', '-hwaccel cuda -hwaccel_output_format cuda', debug) if result[0]: if debug: print("✓") return ('NVIDIA NVENC', 'h264_nvenc', '-hwaccel cuda -hwaccel_output_format cuda', True) if debug: print("✗" if result[1] else "not available") if result[1]: warnings.append(result[1]) # Check Intel QSV if debug: print("Testing Intel QSV...", end=' ') result = HardwareAcceleration._test_hardware('h264_qsv', '-hwaccel qsv -hwaccel_output_format qsv', debug) if result[0]: if debug: print("✓") return ('Intel Quick Sync', 'h264_qsv', '-hwaccel qsv -hwaccel_output_format qsv', True) if debug: print("✗" if result[1] else "not available") if result[1]: warnings.append(result[1]) # Check AMD AMF (Windows/Linux) if debug: print("Testing AMD AMF...", end=' ') result = HardwareAcceleration._test_hardware('h264_amf', '', debug) if result[0]: if debug: print("✓") return ('AMD AMF', 'h264_amf', '', True) if debug: print("✗" if result[1] else "not available") if result[1]: warnings.append(result[1]) # Check VideoToolbox (macOS/iOS - ARM) if debug: print("Testing VideoToolbox...", end=' ') result = HardwareAcceleration._test_hardware('h264_videotoolbox', '-hwaccel videotoolbox', debug) if result[0]: if debug: print("✓") return ('VideoToolbox', 'h264_videotoolbox', '-hwaccel videotoolbox', True) if debug: print("✗" if result[1] else "not available") if result[1]: warnings.append(result[1]) # Check VA-API (Linux Intel/AMD) if debug: print("Testing VA-API...", end=' ') result = HardwareAcceleration._test_vaapi(debug) if result[0]: if debug: print("✓") return (result[1], result[2], result[3], result[4]) # return (name, encoder, hwaccel_args, hw_decode) if debug: print("✗" if result[1] else "not available") if result[1]: warnings.append(result[1]) if debug: print("=== End Debug ===\n") # Print warnings if we fell back to software if warnings: print("\nNote: Hardware acceleration not available:") for warning in warnings: print(f" - {warning}") print() # Fallback to software return ('Software (libx264)', 'libx264', '', False) @staticmethod def _test_hardware(encoder: str, hwaccel_args: str, debug=False) -> Tuple[bool, Optional[str]]: """ Test if hardware encoder is actually available by attempting to use it. Returns: (success, warning_message) """ try: # Build a quick test encode command cmd = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error'] # Add hardware acceleration args if present if hwaccel_args: cmd.extend(hwaccel_args.split()) # Generate 1 frame of test video and try to encode it cmd.extend([ '-f', 'lavfi', '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1', '-frames:v', '1', '-c:v', encoder, '-f', 'null', '-' if os.name != 'nt' else 'NUL' ]) result = subprocess.run( cmd, capture_output=True, timeout=10 ) if debug and result.returncode != 0: print(f"\n Error: {result.stderr.decode() if result.stderr else 'Unknown error'}") return (result.returncode == 0, None) except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e: if debug: print(f"\n Exception: {e}") return (False, None) @staticmethod def _test_vaapi(debug=False) -> Tuple: """ Test VA-API with better error handling and permission checks. Returns: (name, encoder, hwaccel_args) if success or (False, warning, '') """ # Check if VA-API encoder is available in FFmpeg try: result = subprocess.run( ['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True, timeout=5 ) if 'h264_vaapi' not in result.stdout: if debug: print("\n h264_vaapi encoder not found in FFmpeg") return (False, None, '') except Exception as e: if debug: print(f"\n Error checking encoders: {e}") return (False, None, '') # Check for /dev/dri devices dri_devices = [] try: dri_path = Path('/dev/dri') if dri_path.exists(): dri_devices = sorted([str(d) for d in dri_path.glob('renderD*')]) except Exception as e: if debug: print(f"\n Error checking /dev/dri: {e}") pass if not dri_devices: if debug: print("\n No /dev/dri/renderD* devices found") return (False, None, '') # Check permissions on first device device = dri_devices[0] if debug: print(f"\n Found device: {device}") print(f" Can read: {os.access(device, os.R_OK)}") print(f" Can write: {os.access(device, os.W_OK)}") print(f" Process groups: {os.getgroups()}") if not os.access(device, os.R_OK | os.W_OK): return (False, f"VA-API device {device} exists but not accessible. Run: sudo usermod -a -G render $USER (then logout/login)", '', '', False) # Test VA-API for both encoding and decoding try: with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f: test_video = f.name if debug: print(f" Creating test video for VA-API...") # First create a small H.264 test video using software encoding cmd1 = [ 'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error', '-f', 'lavfi', '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1', '-frames:v', '2', '-c:v', 'libx264', '-preset', 'ultrafast', test_video ] result1 = subprocess.run(cmd1, capture_output=True, timeout=5) if debug: print(f" Test video creation: {'success' if result1.returncode == 0 else 'failed'}") if result1.returncode != 0: print(f" Error: {result1.stderr.decode() if result1.stderr else 'Unknown'}") if result1.returncode == 0: # Test 1: HW decode + HW encode if debug: print(f" Testing VA-API HW decode + HW encode...") cmd2 = [ 'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error', '-vaapi_device', device, '-hwaccel', 'vaapi', '-hwaccel_output_format', 'vaapi', '-i', test_video, '-frames:v', '1', '-c:v', 'h264_vaapi', '-f', 'null', '-' if os.name != 'nt' else 'NUL' ] result2 = subprocess.run(cmd2, capture_output=True, timeout=5) if debug: print(f" HW decode + HW encode: {'success' if result2.returncode == 0 else 'failed'}") if result2.returncode != 0: print(f" Error: {result2.stderr.decode() if result2.stderr else 'Unknown'}") # Test 2: SW decode + HW encode (fallback if HW decode doesn't work) hw_decode_works = result2.returncode == 0 if not hw_decode_works: if debug: print(f" Testing VA-API SW decode + HW encode...") cmd3 = [ 'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error', '-vaapi_device', device, '-i', test_video, '-vf', 'format=nv12,hwupload', '-frames:v', '1', '-c:v', 'h264_vaapi', '-f', 'null', '-' if os.name != 'nt' else 'NUL' ] result3 = subprocess.run(cmd3, capture_output=True, timeout=5) if debug: print(f" SW decode + HW encode: {'success' if result3.returncode == 0 else 'failed'}") if result3.returncode != 0: print(f" Error: {result3.stderr.decode() if result3.stderr else 'Unknown'}") if result3.returncode == 0: os.unlink(test_video) return (True, 'VA-API (Intel/AMD) [SW decode + HW encode]', 'h264_vaapi', f'-vaapi_device {device}', False) os.unlink(test_video) if hw_decode_works: return (True, 'VA-API (Intel/AMD) [HW decode + HW encode]', 'h264_vaapi', f'-vaapi_device {device} -hwaccel vaapi -hwaccel_output_format vaapi', True) if os.path.exists(test_video): os.unlink(test_video) except Exception as e: if debug: print(f" Exception during VA-API test: {e}") pass return (False, None, '', '', False) class TestVideo: """Generate or manage test video file.""" @staticmethod def generate(path: str, duration: int = 30) -> bool: """Generate a 1080p test video.""" try: print(f"Generating {duration}s 1080p test video...") cmd = [ 'ffmpeg', '-y', '-f', 'lavfi', '-i', 'testsrc2=size=1920x1080:rate=30:duration={}'.format(duration), '-f', 'lavfi', '-i', 'sine=frequency=1000:duration={}'.format(duration), '-c:v', 'libx264', '-preset', 'medium', '-c:a', 'aac', '-b:a', '128k', path ] result = subprocess.run(cmd, capture_output=True, timeout=120) return result.returncode == 0 except Exception as e: print(f"Error generating test video: {e}") return False class TranscodeJob: """Represents a single transcode job.""" def __init__(self, input_file: str, output_file: str, encoder: str, hwaccel_args: str, hw_decode: bool): self.input_file = input_file self.output_file = output_file self.encoder = encoder self.hwaccel_args = hwaccel_args self.hw_decode = hw_decode self.process = None self.start_time = None self.end_time = None self.success = False self.fps = 0.0 def run(self): """Execute the transcode job.""" try: # Build FFmpeg command cmd = ['ffmpeg', '-y', '-nostdin', '-stats'] # Add hardware acceleration args (before input) if self.hwaccel_args: cmd.extend(self.hwaccel_args.split()) cmd.extend(['-i', self.input_file]) # For VA-API without HW decode, we need to upload to hardware if 'vaapi' in self.encoder and not self.hw_decode: cmd.extend(['-vf', 'format=nv12,hwupload']) # Build encoding parameters encode_params = ['-c:v', self.encoder] # For VA-API, use CQP mode instead of bitrate if needed if 'vaapi' in self.encoder: # Use constant quality mode (lower is better quality, 20-30 is good) encode_params.extend(['-qp', '23']) else: # Use bitrate mode for other encoders encode_params.extend(['-b:v', '4M']) cmd.extend(encode_params) cmd.extend([ '-c:a', 'aac', '-b:a', '128k', '-f', 'null' if os.name != 'nt' else 'null', self.output_file if os.name != 'nt' else '-' ]) self.start_time = time.time() self.process = subprocess.Popen( cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) # Wait for completion _, stderr = self.process.communicate() self.end_time = time.time() # Parse FPS from FFmpeg output self.fps = self._parse_fps(stderr) self.success = self.process.returncode == 0 # Store stderr for debugging self.stderr = stderr except Exception as e: self.success = False self.end_time = time.time() self.stderr = str(e) def _parse_fps(self, ffmpeg_output: str) -> float: """Parse average FPS from FFmpeg output.""" try: # Look for the last line with fps= (the final summary line) fps_value = None for line in ffmpeg_output.split('\n'): if 'fps=' in line and 'frame=' in line: # Extract fps value try: fps_str = line.split('fps=')[1].split()[0] fps_value = float(fps_str) except: pass if fps_value and fps_value > 0: return fps_value # Fallback: calculate from time and speed # Look for lines like "time=00:00:30.00" and "speed=2.5x" time_seconds = None speed_multiplier = None for line in ffmpeg_output.split('\n'): if 'time=' in line: try: time_str = line.split('time=')[1].split()[0] # Parse time format HH:MM:SS.MS parts = time_str.split(':') if len(parts) == 3: h, m, s = parts time_seconds = int(h) * 3600 + int(m) * 60 + float(s) except: pass if 'speed=' in line: try: speed_str = line.split('speed=')[1].split('x')[0] speed_multiplier = float(speed_str) except: pass # Calculate effective FPS: if encoding 30fps video at 2x speed, effective fps = 60 if time_seconds and speed_multiplier and self.end_time and self.start_time: wall_time = self.end_time - self.start_time if wall_time > 0: # Assume input is 30fps input_fps = 30.0 frames = time_seconds * input_fps return frames / wall_time except Exception as e: pass return 0.0 class Benchmark: """Main benchmark orchestrator.""" def __init__(self, test_video: str, encoder: str, hwaccel_args: str, accel_name: str, hw_decode: bool): self.test_video = test_video self.encoder = encoder self.hwaccel_args = hwaccel_args self.accel_name = accel_name self.hw_decode = hw_decode def run_parallel_transcodes(self, num_streams: int, timeout: int = 60) -> Tuple[bool, float]: """ Run multiple parallel transcode jobs. Returns: (success, average_fps) """ jobs = [] threads = [] # Create jobs for i in range(num_streams): output_file = f'/dev/null' if os.name != 'nt' else 'NUL' job = TranscodeJob(self.test_video, output_file, self.encoder, self.hwaccel_args, self.hw_decode) jobs.append(job) # Start all jobs in parallel for job in jobs: thread = threading.Thread(target=job.run) thread.start() threads.append(thread) # Wait for all to complete (with timeout) start = time.time() for thread in threads: remaining = timeout - (time.time() - start) if remaining > 0: thread.join(timeout=remaining) else: return False, 0.0 # Check if all succeeded and calculate average FPS all_success = all(job.success for job in jobs) if all_success: avg_fps = sum(job.fps for job in jobs) / len(jobs) if jobs else 0.0 return True, avg_fps # If failed, print first error for debugging for job in jobs: if not job.success and hasattr(job, 'stderr'): # Skip FFmpeg header and show actual error (last 10 non-empty lines) error_lines = [line for line in job.stderr.split('\n') if line.strip()] if len(error_lines) > 10: error_lines = error_lines[-10:] print(f"\n Debug - Error output:") for line in error_lines: print(f" {line}") break return False, 0.0 def find_max_streams(self, min_fps: float = 30.0) -> int: """ Use binary search to find maximum number of simultaneous streams. A stream is considered "real-time" if it achieves >= min_fps. """ print(f"\nBenchmarking with {self.accel_name}...") print("Finding maximum simultaneous 1080p streams at real-time or better...\n") # First verify that 1 stream works print(f"Testing 1 simultaneous stream...", end=' ', flush=True) success, avg_fps = self.run_parallel_transcodes(1) if not success: print(f"✗ (failed)") return 0 if avg_fps < min_fps: print(f"✗ (avg {avg_fps:.1f} fps - below real-time)") return 0 print(f"✓ (avg {avg_fps:.1f} fps)") max_streams = 1 # Estimate maximum possible streams based on single stream performance # If 1 stream achieves X fps, we can theoretically handle X/min_fps streams # Use 80% of theoretical max to account for overhead theoretical_max = int((avg_fps / min_fps) * 0.8) # Cap the search space reasonably estimated_max = max(2, min(theoretical_max, 128)) print(f"Estimated capacity: ~{theoretical_max} streams (searching up to {estimated_max})") # Binary search bounds low, high = 2, estimated_max while low <= high: mid = (low + high) // 2 print(f"Testing {mid} simultaneous streams...", end=' ', flush=True) success, avg_fps = self.run_parallel_transcodes(mid) if success and avg_fps >= min_fps: print(f"✓ (avg {avg_fps:.1f} fps)") max_streams = mid low = mid + 1 else: if success: print(f"✗ (avg {avg_fps:.1f} fps - below real-time)") else: print(f"✗ (failed)") high = mid - 1 return max_streams def main(): parser = argparse.ArgumentParser( description='Benchmark video transcoding performance', formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( '--duration', type=int, default=30, help='Test video duration in seconds (default: 30)' ) parser.add_argument( '--input', type=str, help='Use existing video file instead of generating test video' ) parser.add_argument( '--min-fps', type=float, default=30.0, help='Minimum FPS to consider real-time (default: 30.0)' ) parser.add_argument( '--debug', action='store_true', help='Enable debug output for hardware detection' ) args = parser.parse_args() # Check for FFmpeg try: subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True) except (subprocess.CalledProcessError, FileNotFoundError): print("Error: FFmpeg not found. Please install FFmpeg and ensure it's in your PATH.") return 1 print("=" * 60) print("Video Transcoding Benchmark") print("=" * 60) # Detect hardware acceleration accel_name, encoder, hwaccel_args, hw_decode = HardwareAcceleration.detect(debug=args.debug) print(f"Detected acceleration: {accel_name}") print(f"Encoder: {encoder}") # Prepare test video if args.input: test_video = args.input if not os.path.exists(test_video): print(f"Error: Input file '{test_video}' not found.") return 1 else: with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f: test_video = f.name if not TestVideo.generate(test_video, args.duration): print("Error: Failed to generate test video.") return 1 try: # Run benchmark benchmark = Benchmark(test_video, encoder, hwaccel_args, accel_name, hw_decode) max_streams = benchmark.find_max_streams(args.min_fps) # Display results print("\n" + "=" * 60) print("BENCHMARK RESULTS") print("=" * 60) print(f"Hardware Acceleration: {accel_name}") print(f"Maximum Simultaneous 1080p Streams: {max_streams}") print(f"(at {args.min_fps} FPS or better)") print("=" * 60) # Also output just the number for easy parsing print(f"\nBenchmark Score: {max_streams}") finally: # Cleanup if not args.input and os.path.exists(test_video): os.unlink(test_video) return 0 if __name__ == '__main__': sys.exit(main())