#!/usr/bin/env python3
"""
Video Transcoding Benchmark Tool

Measures how many simultaneous 1080p streams can be transcoded in real-time or better.
"""

import argparse
import os
import re
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import List, Optional, Tuple

# Target handed to FFmpeg's "null" muxer by the short probe encodes.
_NULL_TARGET = '-' if os.name != 'nt' else 'NUL'

# Common prefix of every quiet probe command.
_QUIET_FFMPEG = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']

# Matches the "fps=" field of an FFmpeg progress line ("fps= 88", "fps=0.0").
_FPS_PATTERN = re.compile(r'fps=\s*([0-9]+(?:\.[0-9]+)?)')


def _remove_quietly(path: Optional[str]) -> None:
    """Delete *path* if it exists; cleanup is best-effort and never raises."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        # Already gone or not removable: nothing useful the caller can do.
        pass


def _stderr_text(result: subprocess.CompletedProcess, default: str) -> str:
    """Return the captured stderr of *result* as text, or *default* if empty."""
    if not result.stderr:
        return default
    # errors='replace' so that odd bytes in FFmpeg output cannot raise.
    return result.stderr.decode(errors='replace')


class HardwareAcceleration:
    """Detect and configure hardware acceleration."""

    # (debug label, reported name, encoder, hwaccel args), in priority order.
    _PROBES = (
        ('NVIDIA NVENC', 'NVIDIA NVENC', 'h264_nvenc',
         '-hwaccel cuda -hwaccel_output_format cuda'),
        ('Intel QSV', 'Intel Quick Sync', 'h264_qsv',
         '-hwaccel qsv -hwaccel_output_format qsv'),
        ('AMD AMF', 'AMD AMF', 'h264_amf', ''),
        ('VideoToolbox', 'VideoToolbox', 'h264_videotoolbox',
         '-hwaccel videotoolbox'),
    )

    @staticmethod
    def detect(debug=False) -> Tuple[str, str, str, bool]:
        """
        Detect available hardware acceleration.

        Encoders are probed in a fixed priority order (NVENC, QSV, AMF,
        VideoToolbox, VA-API); the first one that can actually encode a
        frame wins. If none works, software libx264 is returned.

        Args:
            debug: Print per-probe progress and FFmpeg error output.

        Returns:
            (name, encoder, hwaccel_args, hw_decode_supported)
        """
        notes: List[str] = []

        if debug:
            print("\n=== Hardware Detection Debug ===")

        for label, name, encoder, hwaccel_args in HardwareAcceleration._PROBES:
            if debug:
                print(f"Testing {label}...", end=' ')
            ok, warning = HardwareAcceleration._test_hardware(
                encoder, hwaccel_args, debug)
            if ok:
                if debug:
                    print("✓")
                return (name, encoder, hwaccel_args, True)
            if debug:
                print("✗" if warning else "not available")
            if warning:
                notes.append(warning)

        # VA-API (Linux Intel/AMD) needs a device node and has its own probe.
        if debug:
            print("Testing VA-API...", end=' ')
        ok, name_or_warning, encoder, hwaccel_args, hw_decode = (
            HardwareAcceleration._test_vaapi(debug))
        if ok:
            if debug:
                print("✓")
            return (name_or_warning, encoder, hwaccel_args, hw_decode)
        if debug:
            print("✗" if name_or_warning else "not available")
        if name_or_warning:
            notes.append(name_or_warning)

        if debug:
            print("=== End Debug ===\n")

        # Explain why we fell back to software, when a probe told us why.
        if notes:
            print("\nNote: Hardware acceleration not available:")
            for note in notes:
                print(f" - {note}")
            print()

        return ('Software (libx264)', 'libx264', '', False)

    @staticmethod
    def _test_hardware(encoder: str, hwaccel_args: str,
                       debug=False) -> Tuple[bool, Optional[str]]:
        """
        Test if a hardware encoder is actually usable by encoding one frame.

        Args:
            encoder: FFmpeg encoder name, e.g. ``h264_nvenc``.
            hwaccel_args: Space-separated FFmpeg options placed before the
                input (may be empty).
            debug: Print FFmpeg's error output when the probe fails.

        Returns:
            (success, warning_message); the warning is currently always None.
        """
        cmd = list(_QUIET_FFMPEG)
        if hwaccel_args:
            cmd.extend(hwaccel_args.split())
        # Generate a single synthetic 1080p frame and try to encode it.
        cmd.extend([
            '-f', 'lavfi',
            '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
            '-frames:v', '1',
            '-c:v', encoder,
            '-f', 'null', _NULL_TARGET,
        ])

        try:
            result = subprocess.run(cmd, capture_output=True, timeout=10)
        except (subprocess.SubprocessError, OSError) as e:
            # Timeout, missing ffmpeg binary, permission problems, ...
            if debug:
                print(f"\n Exception: {e}")
            return (False, None)

        if debug and result.returncode != 0:
            print(f"\n Error: {_stderr_text(result, 'Unknown error')}")
        return (result.returncode == 0, None)

    @staticmethod
    def _test_vaapi(debug=False) -> Tuple[bool, Optional[str], str, str, bool]:
        """
        Test VA-API with better error handling and permission checks.

        First tries hardware decode + hardware encode; if that fails, falls
        back to software decode + hardware encode (frames uploaded with
        ``hwupload``).

        Args:
            debug: Print details about each step of the probe.

        Returns:
            Always a 5-tuple.
            On success: (True, name, encoder, hwaccel_args, hw_decode).
            On failure: (False, warning_or_None, '', '', False).
        """
        failure = (False, None, '', '', False)

        # Is the VA-API encoder compiled into this FFmpeg build at all?
        try:
            listing = subprocess.run(
                ['ffmpeg', '-hide_banner', '-encoders'],
                capture_output=True, text=True, errors='replace', timeout=5,
            )
        except (subprocess.SubprocessError, OSError, ValueError) as e:
            if debug:
                print(f"\n Error checking encoders: {e}")
            return failure
        if 'h264_vaapi' not in listing.stdout:
            if debug:
                print("\n h264_vaapi encoder not found in FFmpeg")
            return failure

        # Look for DRM render nodes.
        dri_devices: List[str] = []
        try:
            dri_path = Path('/dev/dri')
            if dri_path.exists():
                dri_devices = sorted(str(d) for d in dri_path.glob('renderD*'))
        except OSError as e:
            if debug:
                print(f"\n Error checking /dev/dri: {e}")
        if not dri_devices:
            if debug:
                print("\n No /dev/dri/renderD* devices found")
            return failure

        # Only the first render node is probed.
        device = dri_devices[0]
        if debug:
            print(f"\n Found device: {device}")
            print(f" Can read: {os.access(device, os.R_OK)}")
            print(f" Can write: {os.access(device, os.W_OK)}")
            print(f" Process groups: {os.getgroups()}")
        if not os.access(device, os.R_OK | os.W_OK):
            return (
                False,
                f"VA-API device {device} exists but not accessible. "
                f"Run: sudo usermod -a -G render $USER (then logout/login)",
                '', '', False,
            )

        test_video = None
        try:
            with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
                test_video = f.name

            # Create a tiny H.264 clip in software to feed the decoders.
            if debug:
                print(f" Creating test video for VA-API...")
            created = subprocess.run(
                _QUIET_FFMPEG + [
                    '-f', 'lavfi',
                    '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
                    '-frames:v', '2',
                    '-c:v', 'libx264', '-preset', 'ultrafast',
                    test_video,
                ],
                capture_output=True, timeout=5,
            )
            if debug:
                print(f" Test video creation: {'success' if created.returncode == 0 else 'failed'}")
                if created.returncode != 0:
                    print(f" Error: {_stderr_text(created, 'Unknown')}")
            if created.returncode != 0:
                return failure

            # Test 1: HW decode + HW encode.
            if debug:
                print(f" Testing VA-API HW decode + HW encode...")
            hw_both = subprocess.run(
                _QUIET_FFMPEG + [
                    '-vaapi_device', device,
                    '-hwaccel', 'vaapi',
                    '-hwaccel_output_format', 'vaapi',
                    '-i', test_video,
                    '-frames:v', '1',
                    '-c:v', 'h264_vaapi',
                    '-f', 'null', _NULL_TARGET,
                ],
                capture_output=True, timeout=5,
            )
            if debug:
                print(f" HW decode + HW encode: {'success' if hw_both.returncode == 0 else 'failed'}")
                if hw_both.returncode != 0:
                    print(f" Error: {_stderr_text(hw_both, 'Unknown')}")
            if hw_both.returncode == 0:
                return (
                    True,
                    'VA-API (Intel/AMD) [HW decode + HW encode]',
                    'h264_vaapi',
                    f'-vaapi_device {device} -hwaccel vaapi -hwaccel_output_format vaapi',
                    True,
                )

            # Test 2: SW decode + HW encode (fallback if HW decode fails).
            if debug:
                print(f" Testing VA-API SW decode + HW encode...")
            hw_encode_only = subprocess.run(
                _QUIET_FFMPEG + [
                    '-vaapi_device', device,
                    '-i', test_video,
                    '-vf', 'format=nv12,hwupload',
                    '-frames:v', '1',
                    '-c:v', 'h264_vaapi',
                    '-f', 'null', _NULL_TARGET,
                ],
                capture_output=True, timeout=5,
            )
            if debug:
                print(f" SW decode + HW encode: {'success' if hw_encode_only.returncode == 0 else 'failed'}")
                if hw_encode_only.returncode != 0:
                    print(f" Error: {_stderr_text(hw_encode_only, 'Unknown')}")
            if hw_encode_only.returncode == 0:
                return (
                    True,
                    'VA-API (Intel/AMD) [SW decode + HW encode]',
                    'h264_vaapi',
                    f'-vaapi_device {device}',
                    False,
                )
        except (subprocess.SubprocessError, OSError) as e:
            if debug:
                print(f" Exception during VA-API test: {e}")
        finally:
            # Runs on every exit path, so the clip never outlives the probe.
            _remove_quietly(test_video)

        return failure


class TestVideo:
    """Generate or manage test video file."""

    @staticmethod
    def generate(path: str, duration: int = 30) -> bool:
        """
        Generate a 1080p test video with a sine-wave audio track.

        Args:
            path: Output file path (overwritten if it exists).
            duration: Clip length in seconds.

        Returns:
            True if FFmpeg exited successfully, False otherwise.
        """
        print(f"Generating {duration}s 1080p test video...")
        cmd = [
            'ffmpeg', '-y',
            '-f', 'lavfi',
            '-i', f'testsrc2=size=1920x1080:rate=30:duration={duration}',
            '-f', 'lavfi',
            '-i', f'sine=frequency=1000:duration={duration}',
            '-c:v', 'libx264', '-preset', 'medium',
            '-c:a', 'aac', '-b:a', '128k',
            path,
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=120)
        except (subprocess.SubprocessError, OSError) as e:
            print(f"Error generating test video: {e}")
            return False
        return result.returncode == 0


class TranscodeJob:
    """Represents a single transcode job."""

    def __init__(self, input_file: str, output_file: str, encoder: str,
                 hwaccel_args: str, hw_decode: bool):
        self.input_file = input_file
        self.output_file = output_file
        self.encoder = encoder
        self.hwaccel_args = hwaccel_args
        self.hw_decode = hw_decode
        self.process: Optional[subprocess.Popen] = None
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None
        self.success = False
        self.fps = 0.0

    def run(self):
        """
        Execute the transcode job and record its outcome.

        Blocks until FFmpeg exits. Sets ``success``, ``fps``, ``start_time``
        and ``end_time``; never raises for FFmpeg launch failures.
        """
        cmd = ['ffmpeg', '-y']

        # Hardware acceleration options must precede the input they apply to.
        if self.hwaccel_args:
            cmd.extend(self.hwaccel_args.split())
        cmd.extend(['-i', self.input_file])

        # VA-API without HW decode: frames must be uploaded to the device.
        if 'vaapi' in self.encoder and not self.hw_decode:
            cmd.extend(['-vf', 'format=nv12,hwupload'])

        cmd.extend([
            '-c:v', self.encoder,
            '-b:v', '4M',
            '-c:a', 'aac',
            '-b:a', '128k',
            '-f', 'null',
            self.output_file if os.name != 'nt' else '-',
        ])

        try:
            self.start_time = time.time()
            self.process = subprocess.Popen(
                cmd,
                # Parallel ffmpeg instances must not compete for the terminal.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
                # Stream metadata may contain bytes that are not valid text.
                errors='replace',
            )
            _, stderr = self.process.communicate()
            self.end_time = time.time()

            # FFmpeg writes its progress statistics to stderr.
            self.fps = self._parse_fps(stderr)
            self.success = self.process.returncode == 0
        except (subprocess.SubprocessError, OSError, ValueError):
            self.success = False
            self.end_time = time.time()

    def terminate(self) -> None:
        """Kill the FFmpeg process if it is still running (no-op otherwise)."""
        proc = self.process
        if proc is not None and proc.poll() is None:
            try:
                proc.kill()
            except OSError:
                # Process exited between poll() and kill().
                pass

    def _parse_fps(self, ffmpeg_output: str) -> float:
        """
        Parse the final FPS value from FFmpeg progress output.

        The *last* ``fps=`` field is used; the first progress line typically
        reports ``fps=0.0`` before any frame has been timed.

        Returns:
            The last reported fps, or 0.0 if none is present.
        """
        if not ffmpeg_output:
            return 0.0
        values = _FPS_PATTERN.findall(ffmpeg_output)
        return float(values[-1]) if values else 0.0


class Benchmark:
    """Main benchmark orchestrator."""

    def __init__(self, test_video: str, encoder: str, hwaccel_args: str,
                 accel_name: str, hw_decode: bool):
        self.test_video = test_video
        self.encoder = encoder
        self.hwaccel_args = hwaccel_args
        self.accel_name = accel_name
        self.hw_decode = hw_decode

    def run_parallel_transcodes(self, num_streams: int,
                                timeout: int = 60) -> Tuple[bool, float]:
        """
        Run multiple parallel transcode jobs.

        Args:
            num_streams: Number of FFmpeg processes to run simultaneously.
            timeout: Overall wall-clock budget in seconds for all jobs.

        Returns:
            (success, average_fps). ``success`` is False if any job failed
            or the timeout expired; in that case average_fps is 0.0.
        """
        output_file = '/dev/null' if os.name != 'nt' else 'NUL'
        jobs = [
            TranscodeJob(self.test_video, output_file, self.encoder,
                         self.hwaccel_args, self.hw_decode)
            for _ in range(num_streams)
        ]

        threads = [threading.Thread(target=job.run) for job in jobs]
        for thread in threads:
            thread.start()

        # One shared deadline for the whole batch, on a monotonic clock.
        deadline = time.monotonic() + timeout
        timed_out = False
        for thread in threads:
            remaining = deadline - time.monotonic()
            if remaining > 0:
                thread.join(timeout=remaining)
            if thread.is_alive():
                timed_out = True
                break

        if timed_out:
            # Stop stragglers so they cannot load the machine during the
            # next measurement, then let their threads wind down.
            for job in jobs:
                job.terminate()
            for thread in threads:
                thread.join(timeout=5)
            return False, 0.0

        if all(job.success for job in jobs):
            avg_fps = sum(job.fps for job in jobs) / len(jobs) if jobs else 0.0
            return True, avg_fps
        return False, 0.0

    def find_max_streams(self, min_fps: float = 30.0,
                         upper_bound: int = 64) -> int:
        """
        Use binary search to find maximum number of simultaneous streams.

        A stream count passes if all jobs succeed and their average fps is
        >= min_fps. The search assumes throughput does not improve as the
        stream count grows.

        Args:
            min_fps: Minimum average fps to count as real-time.
            upper_bound: Largest stream count that will be tried.

        Returns:
            The highest passing stream count, or 0 if even one stream fails.
        """
        print(f"\nBenchmarking with {self.accel_name}...")
        print("Finding maximum simultaneous 1080p streams at real-time or better...\n")

        low, high = 1, upper_bound
        max_streams = 0

        while low <= high:
            mid = (low + high) // 2
            print(f"Testing {mid} simultaneous streams...", end=' ', flush=True)

            success, avg_fps = self.run_parallel_transcodes(mid)

            if success and avg_fps >= min_fps:
                print(f"✓ (avg {avg_fps:.1f} fps)")
                max_streams = mid
                low = mid + 1
            else:
                if success:
                    print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
                else:
                    print("✗ (failed)")
                high = mid - 1

        return max_streams


def main():
    """
    Command-line entry point.

    Returns:
        Process exit code: 0 on success, 1 if FFmpeg or the input is missing
        or the test video cannot be generated.
    """
    parser = argparse.ArgumentParser(
        description='Benchmark video transcoding performance',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--duration', type=int, default=30,
        help='Test video duration in seconds (default: 30)'
    )
    parser.add_argument(
        '--input', type=str,
        help='Use existing video file instead of generating test video'
    )
    parser.add_argument(
        '--min-fps', type=float, default=30.0,
        help='Minimum FPS to consider real-time (default: 30.0)'
    )
    parser.add_argument(
        '--debug', action='store_true',
        help='Enable debug output for hardware detection'
    )
    args = parser.parse_args()

    # Check for FFmpeg
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        print("Error: FFmpeg not found. Please install FFmpeg and ensure it's in your PATH.")
        return 1

    print("=" * 60)
    print("Video Transcoding Benchmark")
    print("=" * 60)

    # Detect hardware acceleration
    accel_name, encoder, hwaccel_args, hw_decode = HardwareAcceleration.detect(
        debug=args.debug)
    print(f"Detected acceleration: {accel_name}")
    print(f"Encoder: {encoder}")

    # Prepare test video
    generated = not args.input
    if args.input:
        test_video = args.input
        if not os.path.exists(test_video):
            print(f"Error: Input file '{test_video}' not found.")
            return 1
    else:
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
            test_video = f.name

    try:
        # Generation happens inside the try so a failure still cleans up.
        if generated and not TestVideo.generate(test_video, args.duration):
            print("Error: Failed to generate test video.")
            return 1

        # Run benchmark
        benchmark = Benchmark(test_video, encoder, hwaccel_args, accel_name,
                              hw_decode)
        max_streams = benchmark.find_max_streams(args.min_fps)

        # Display results
        print("\n" + "=" * 60)
        print("BENCHMARK RESULTS")
        print("=" * 60)
        print(f"Hardware Acceleration: {accel_name}")
        print(f"Maximum Simultaneous 1080p Streams: {max_streams}")
        print(f"(at {args.min_fps} FPS or better)")
        print("=" * 60)

        # Also output just the number for easy parsing
        print(f"\nBenchmark Score: {max_streams}")
    finally:
        # Never delete a user-supplied input file.
        if generated:
            _remove_quietly(test_video)

    return 0


if __name__ == '__main__':
    sys.exit(main())