Files
transcode_bench/transcode_bench.py
2025-10-12 20:24:09 +13:00

609 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Video Transcoding Benchmark Tool
Measures how many simultaneous 1080p streams can be transcoded in real-time or better.
"""
import subprocess
import sys
import time
import tempfile
import os
import threading
import argparse
import signal
import atexit
from pathlib import Path
from typing import Optional, Tuple, List
def restore_terminal():
    """
    Restore sane terminal settings (best effort, POSIX only).

    Runs ``stty sane`` so the terminal is usable again after the child
    processes exit. Nothing is done on Windows, or when standard input is
    not a terminal (pipes, cron, CI), because ``stty`` acts on stdin and
    would only print an error there.
    """
    if sys.platform == 'win32':
        return
    try:
        if not sys.stdin.isatty():
            return
    except (AttributeError, ValueError):
        # stdin is missing (None) or already closed; nothing to restore.
        return
    try:
        subprocess.run(['stty', 'sane'], stderr=subprocess.DEVNULL, check=False)
    except OSError:
        # Deliberate best effort: a missing ``stty`` must not break interpreter exit.
        pass


# Register cleanup function
atexit.register(restore_terminal)
class HardwareAcceleration:
    """Detect and configure hardware acceleration."""

    # Output target used together with FFmpeg's ``-f null`` on this platform.
    _NULL_SINK = '-' if os.name != 'nt' else 'NUL'

    # Encoders probed with a plain one-frame test encode, in priority order:
    # (debug label, reported name, FFmpeg encoder, hwaccel input arguments).
    # NOTE(review): AMD AMF is reported with hw_decode=True although it passes
    # no -hwaccel arguments; kept as in the original - confirm this is intended.
    _SIMPLE_CANDIDATES = (
        ('NVIDIA NVENC', 'NVIDIA NVENC', 'h264_nvenc',
         '-hwaccel cuda -hwaccel_output_format cuda'),
        ('Intel QSV', 'Intel Quick Sync', 'h264_qsv',
         '-hwaccel qsv -hwaccel_output_format qsv'),
        ('AMD AMF', 'AMD AMF', 'h264_amf', ''),
        ('VideoToolbox', 'VideoToolbox', 'h264_videotoolbox',
         '-hwaccel videotoolbox'),
    )

    @staticmethod
    def detect(debug=False) -> Tuple[str, str, str, bool]:
        """
        Detect available hardware acceleration.

        Probes NVENC, QSV, AMF and VideoToolbox (in that order) with a test
        encode, then VA-API, and falls back to software libx264 when none of
        them works. Warnings collected from failed probes are printed only
        when the software fallback is chosen.

        Args:
            debug: Print per-probe progress and FFmpeg error output.

        Returns: (name, encoder, hwaccel_args, hw_decode_supported)
        """
        warnings = []
        if debug:
            print("\n=== Hardware Detection Debug ===")

        for label, name, encoder, hwaccel_args in HardwareAcceleration._SIMPLE_CANDIDATES:
            if debug:
                print(f"Testing {label}...", end=' ')
            ok, warning = HardwareAcceleration._test_hardware(encoder, hwaccel_args, debug)
            if debug:
                print("available" if ok else "not available")
            if ok:
                return (name, encoder, hwaccel_args, True)
            if warning:
                warnings.append(warning)

        # Check VA-API (Linux Intel/AMD); it needs a device node and may only
        # support encoding, so it has its own probe.
        if debug:
            print("Testing VA-API...", end=' ')
        ok, info, encoder, hwaccel_args, hw_decode = HardwareAcceleration._test_vaapi(debug)
        if debug:
            print("available" if ok else "not available")
        if ok:
            # On success ``info`` is the display name.
            return (info, encoder, hwaccel_args, hw_decode)
        if info:
            # On failure ``info`` is an optional warning message.
            warnings.append(info)

        if debug:
            print("=== End Debug ===\n")

        # Print warnings if we fell back to software
        if warnings:
            print("\nNote: Hardware acceleration not available:")
            for warning in warnings:
                print(f" - {warning}")
            print()

        # Fallback to software
        return ('Software (libx264)', 'libx264', '', False)

    @staticmethod
    def _test_hardware(encoder: str, hwaccel_args: str, debug=False) -> Tuple[bool, Optional[str]]:
        """
        Test if hardware encoder is actually available by attempting to use it.

        Encodes a single generated 1080p frame with ``encoder`` and discards
        the output. A missing FFmpeg binary or a timeout counts as "not
        available" rather than as an error.

        Returns: (success, warning_message); the warning is currently always None.
        """
        # Build a quick test encode command
        cmd = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
        # Hardware acceleration args are input options, so they go before -i
        if hwaccel_args:
            cmd.extend(hwaccel_args.split())
        # Generate 1 frame of test video and try to encode it
        cmd.extend([
            '-f', 'lavfi',
            '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
            '-frames:v', '1',
            '-c:v', encoder,
            '-f', 'null',
            HardwareAcceleration._NULL_SINK,
        ])
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=10)
        except (subprocess.SubprocessError, OSError) as e:
            if debug:
                print(f"\n Exception: {e}")
            return (False, None)
        if debug and result.returncode != 0:
            detail = result.stderr.decode(errors='replace') if result.stderr else 'Unknown error'
            print(f"\n Error: {detail}")
        return (result.returncode == 0, None)

    @staticmethod
    def _report_step(label: str, result, debug: bool) -> None:
        """Print the outcome of one VA-API probe step when debugging."""
        if not debug:
            return
        print(f" {label}: {'success' if result.returncode == 0 else 'failed'}")
        if result.returncode != 0:
            detail = result.stderr.decode(errors='replace') if result.stderr else 'Unknown'
            print(f" Error: {detail}")

    @staticmethod
    def _test_vaapi(debug=False) -> Tuple[bool, Optional[str], str, str, bool]:
        """
        Test VA-API with better error handling and permission checks.

        Always returns a 5-tuple:
            success: (True, name, encoder, hwaccel_args, hw_decode_supported)
            failure: (False, warning_or_None, '', '', False)
        """
        failure = (False, None, '', '', False)

        # Check if VA-API encoder is available in FFmpeg
        try:
            result = subprocess.run(
                ['ffmpeg', '-hide_banner', '-encoders'],
                capture_output=True,
                text=True,
                errors='replace',
                timeout=5
            )
        except (subprocess.SubprocessError, OSError) as e:
            if debug:
                print(f"\n Error checking encoders: {e}")
            return failure
        if 'h264_vaapi' not in result.stdout:
            if debug:
                print("\n h264_vaapi encoder not found in FFmpeg")
            return failure

        # Check for /dev/dri devices
        dri_devices = []
        try:
            dri_path = Path('/dev/dri')
            if dri_path.exists():
                dri_devices = sorted(str(d) for d in dri_path.glob('renderD*'))
        except OSError as e:
            if debug:
                print(f"\n Error checking /dev/dri: {e}")
        if not dri_devices:
            if debug:
                print("\n No /dev/dri/renderD* devices found")
            return failure

        # Check permissions on first device
        device = dri_devices[0]
        if debug:
            print(f"\n Found device: {device}")
            print(f" Can read: {os.access(device, os.R_OK)}")
            print(f" Can write: {os.access(device, os.W_OK)}")
            print(f" Process groups: {os.getgroups()}")
        if not os.access(device, os.R_OK | os.W_OK):
            return (False,
                    f"VA-API device {device} exists but not accessible. Run: sudo usermod -a -G render $USER (then logout/login)",
                    '', '', False)

        # Test VA-API for both encoding and decoding
        common = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
        test_video = None
        try:
            with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
                test_video = f.name
            if debug:
                print(" Creating test video for VA-API...")
            # First create a small H.264 test video using software encoding
            created = subprocess.run(
                common + [
                    '-f', 'lavfi',
                    '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
                    '-frames:v', '2',
                    '-c:v', 'libx264',
                    '-preset', 'ultrafast',
                    test_video,
                ],
                capture_output=True, timeout=5)
            HardwareAcceleration._report_step('Test video creation', created, debug)
            if created.returncode != 0:
                return failure

            # Test 1: HW decode + HW encode
            if debug:
                print(" Testing VA-API HW decode + HW encode...")
            hw_path = subprocess.run(
                common + [
                    '-vaapi_device', device,
                    '-hwaccel', 'vaapi',
                    '-hwaccel_output_format', 'vaapi',
                    '-i', test_video,
                    '-frames:v', '1',
                    '-c:v', 'h264_vaapi',
                    '-f', 'null',
                    HardwareAcceleration._NULL_SINK,
                ],
                capture_output=True, timeout=5)
            HardwareAcceleration._report_step('HW decode + HW encode', hw_path, debug)
            if hw_path.returncode == 0:
                return (True, 'VA-API (Intel/AMD) [HW decode + HW encode]', 'h264_vaapi',
                        f'-vaapi_device {device} -hwaccel vaapi -hwaccel_output_format vaapi', True)

            # Test 2: SW decode + HW encode (fallback if HW decode doesn't work)
            if debug:
                print(" Testing VA-API SW decode + HW encode...")
            sw_path = subprocess.run(
                common + [
                    '-vaapi_device', device,
                    '-i', test_video,
                    '-vf', 'format=nv12,hwupload',
                    '-frames:v', '1',
                    '-c:v', 'h264_vaapi',
                    '-f', 'null',
                    HardwareAcceleration._NULL_SINK,
                ],
                capture_output=True, timeout=5)
            HardwareAcceleration._report_step('SW decode + HW encode', sw_path, debug)
            if sw_path.returncode == 0:
                return (True, 'VA-API (Intel/AMD) [SW decode + HW encode]', 'h264_vaapi',
                        f'-vaapi_device {device}', False)
        except (subprocess.SubprocessError, OSError) as e:
            if debug:
                print(f" Exception during VA-API test: {e}")
        finally:
            # Remove the temporary clip on every path, including timeouts.
            if test_video is not None:
                try:
                    os.unlink(test_video)
                except OSError:
                    pass
        return failure
class TestVideo:
    """Helper that produces the synthetic clip used as benchmark input."""

    @staticmethod
    def generate(path: str, duration: int = 30) -> bool:
        """
        Render a 1080p test clip with FFmpeg and write it to ``path``.

        The clip combines the ``testsrc2`` pattern at 30 fps with a 1 kHz
        sine tone, encoded as libx264 video and 128k AAC audio.

        Args:
            path: Output file path (overwritten if it exists).
            duration: Clip length in seconds.

        Returns:
            True when FFmpeg exits with status 0; False on a non-zero exit
            or when any exception (missing binary, timeout, ...) occurs.
        """
        try:
            print(f"Generating {duration}s 1080p test video...")
            video_source = f'testsrc2=size=1920x1080:rate=30:duration={duration}'
            audio_source = f'sine=frequency=1000:duration={duration}'

            ffmpeg_cmd = ['ffmpeg', '-y']
            ffmpeg_cmd += ['-f', 'lavfi', '-i', video_source]
            ffmpeg_cmd += ['-f', 'lavfi', '-i', audio_source]
            ffmpeg_cmd += ['-c:v', 'libx264', '-preset', 'medium']
            ffmpeg_cmd += ['-c:a', 'aac', '-b:a', '128k']
            ffmpeg_cmd.append(path)

            completed = subprocess.run(ffmpeg_cmd, capture_output=True, timeout=120)
        except Exception as err:
            print(f"Error generating test video: {err}")
            return False
        return completed.returncode == 0
class TranscodeJob:
    """Represents a single transcode job."""

    def __init__(self, input_file: str, output_file: str, encoder: str, hwaccel_args: str, hw_decode: bool):
        """
        Args:
            input_file: Path of the video to transcode.
            output_file: Output target handed to FFmpeg on non-Windows systems.
            encoder: FFmpeg video encoder name (e.g. ``libx264``).
            hwaccel_args: Space-separated FFmpeg input options, or ''.
            hw_decode: Whether decoding happens on the hardware device; only
                consulted for VA-API encoders.
        """
        self.input_file = input_file
        self.output_file = output_file
        self.encoder = encoder
        self.hwaccel_args = hwaccel_args
        self.hw_decode = hw_decode
        self.process = None      # Popen handle, set once run() has launched FFmpeg
        self.start_time = None   # wall-clock time just before FFmpeg was launched
        self.end_time = None     # wall-clock time when FFmpeg finished or failed
        self.success = False
        self.fps = 0.0
        # ``self.stderr`` is intentionally not pre-set: run() assigns it when
        # the job finishes or fails, and callers use hasattr() to tell apart
        # jobs that never completed.

    def _build_command(self) -> List[str]:
        """Assemble the FFmpeg command line for this job."""
        cmd = ['ffmpeg', '-y']
        # Hardware acceleration args are input options (before -i)
        if self.hwaccel_args:
            cmd.extend(self.hwaccel_args.split())
        cmd.extend(['-i', self.input_file])

        uses_vaapi = 'vaapi' in self.encoder
        # For VA-API without HW decode, frames must be uploaded to the device
        if uses_vaapi and not self.hw_decode:
            cmd.extend(['-vf', 'format=nv12,hwupload'])

        cmd.extend(['-c:v', self.encoder])
        if uses_vaapi:
            # Constant-QP mode for VA-API (lower is better quality)
            cmd.extend(['-qp', '23'])
        else:
            # Bitrate mode for every other encoder
            cmd.extend(['-b:v', '4M'])

        cmd.extend([
            '-c:a', 'aac',
            '-b:a', '128k',
            '-f', 'null',
            self.output_file if os.name != 'nt' else '-',
        ])
        return cmd

    def run(self):
        """
        Execute the transcode job and record its outcome.

        Blocks until FFmpeg exits. Afterwards ``success``, ``fps``,
        ``end_time`` and ``stderr`` describe the result; exceptions are not
        propagated but recorded as a failed job with the message in ``stderr``.
        """
        try:
            cmd = self._build_command()
            self.start_time = time.time()
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Input metadata echoed by FFmpeg is not guaranteed to be
                # valid UTF-8; never fail a job on a decoding error.
                errors='replace'
            )
            # Wait for completion
            _, stderr = self.process.communicate()
            self.end_time = time.time()
            # Parse FPS from FFmpeg output
            self.fps = self._parse_fps(stderr)
            self.success = self.process.returncode == 0
            # Store stderr for debugging
            self.stderr = stderr
        except Exception as e:
            self.success = False
            self.end_time = time.time()
            self.stderr = str(e)

    def _parse_fps(self, ffmpeg_output: str) -> float:
        """
        Parse the final FPS value from FFmpeg's progress output.

        FFmpeg emits many ``fps=`` progress updates; the first one is
        usually ``fps=0.0``, so the last parseable value is returned.

        Returns:
            The last ``fps=`` value as a float, or 0.0 when none is found.
        """
        if not ffmpeg_output:
            return 0.0
        # Element 0 is the text before the first "fps=" and carries no value.
        for chunk in reversed(ffmpeg_output.split('fps=')[1:]):
            tokens = chunk.split()
            if not tokens:
                continue
            try:
                return float(tokens[0])
            except ValueError:
                # e.g. "fps=N/A" - fall back to an earlier update
                continue
        return 0.0
class Benchmark:
    """Main benchmark orchestrator."""

    def __init__(self, test_video: str, encoder: str, hwaccel_args: str, accel_name: str, hw_decode: bool):
        """
        Args:
            test_video: Path of the input clip every stream transcodes.
            encoder: FFmpeg video encoder name.
            hwaccel_args: Space-separated FFmpeg input options, or ''.
            accel_name: Human-readable acceleration name used in output.
            hw_decode: Whether hardware decoding is used.
        """
        self.test_video = test_video
        self.encoder = encoder
        self.hwaccel_args = hwaccel_args
        self.accel_name = accel_name
        self.hw_decode = hw_decode

    def run_parallel_transcodes(self, num_streams: int, timeout: int = 60) -> Tuple[bool, float]:
        """
        Run multiple parallel transcode jobs.

        Each job runs in its own thread. If the jobs do not all finish
        within ``timeout`` seconds, the FFmpeg processes still running are
        killed so they cannot distort later measurements, and the run counts
        as failed.

        Returns: (success, average_fps); (False, 0.0) on failure or timeout.
        """
        output_file = '/dev/null' if os.name != 'nt' else 'NUL'
        jobs = [
            TranscodeJob(self.test_video, output_file, self.encoder, self.hwaccel_args, self.hw_decode)
            for _ in range(num_streams)
        ]

        # Start all jobs in parallel
        threads = [threading.Thread(target=job.run) for job in jobs]
        for thread in threads:
            thread.start()

        # Wait for all to complete, sharing one overall deadline
        deadline = time.monotonic() + timeout
        for thread in threads:
            thread.join(timeout=max(0.0, deadline - time.monotonic()))

        if any(thread.is_alive() for thread in threads):
            # Timed out: stop the leftover encoders, then reap their threads.
            for job in jobs:
                proc = job.process
                if proc is not None and proc.poll() is None:
                    try:
                        proc.kill()
                    except OSError:
                        pass  # process exited between poll() and kill()
            for thread in threads:
                thread.join(timeout=5)
            return False, 0.0

        # Check if all succeeded and calculate average FPS
        if all(job.success for job in jobs):
            avg_fps = sum(job.fps for job in jobs) / len(jobs) if jobs else 0.0
            return True, avg_fps

        # If failed, print first error for debugging
        for job in jobs:
            stderr = getattr(job, 'stderr', None)
            if job.success or stderr is None:
                continue
            # Skip FFmpeg header and show actual error (last 10 non-empty lines)
            error_lines = [line for line in stderr.split('\n') if line.strip()][-10:]
            print("\n Debug - Error output:")
            for line in error_lines:
                print(f" {line}")
            break
        return False, 0.0

    def find_max_streams(self, min_fps: float = 30.0, upper_bound: int = 64) -> int:
        """
        Use binary search to find maximum number of simultaneous streams.

        A stream count passes when every job succeeds and the average FPS
        across the jobs is >= min_fps.

        Args:
            min_fps: Average FPS required to count as real-time.
            upper_bound: Largest stream count the search will try; the
                result never exceeds it.

        Returns:
            The highest passing stream count, or 0 when even a single
            stream fails or is slower than ``min_fps``.
        """
        print(f"\nBenchmarking with {self.accel_name}...")
        print("Finding maximum simultaneous 1080p streams at real-time or better...\n")

        # First verify that 1 stream works
        print("Testing 1 simultaneous stream...", end=' ', flush=True)
        success, avg_fps = self.run_parallel_transcodes(1)
        if not success:
            print("✗ (failed)")
            return 0
        if avg_fps < min_fps:
            print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
            return 0
        print(f"✓ (avg {avg_fps:.1f} fps)")
        max_streams = 1

        # Binary search bounds
        low, high = 2, upper_bound
        while low <= high:
            mid = (low + high) // 2
            print(f"Testing {mid} simultaneous streams...", end=' ', flush=True)
            success, avg_fps = self.run_parallel_transcodes(mid)
            if success and avg_fps >= min_fps:
                print(f"✓ (avg {avg_fps:.1f} fps)")
                max_streams = mid
                low = mid + 1
            else:
                if success:
                    print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
                else:
                    print("✗ (failed)")
                high = mid - 1
        return max_streams
def main():
    """
    Command-line entry point.

    Parses arguments, checks that FFmpeg is runnable, detects the best
    available encoder, prepares the input clip (user-supplied or generated
    into a temporary file) and prints the benchmark result.

    Returns:
        Process exit status: 1 when FFmpeg is missing, the ``--input`` file
        does not exist or the test clip cannot be generated; otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description='Benchmark video transcoding performance',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--duration',
        type=int,
        default=30,
        help='Test video duration in seconds (default: 30)'
    )
    parser.add_argument(
        '--input',
        type=str,
        help='Use existing video file instead of generating test video'
    )
    parser.add_argument(
        '--min-fps',
        type=float,
        default=30.0,
        help='Minimum FPS to consider real-time (default: 30.0)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable debug output for hardware detection'
    )
    args = parser.parse_args()

    # Check for FFmpeg (OSError also covers a present but non-executable binary)
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        print("Error: FFmpeg not found. Please install FFmpeg and ensure it's in your PATH.")
        return 1

    print("=" * 60)
    print("Video Transcoding Benchmark")
    print("=" * 60)

    # Detect hardware acceleration
    accel_name, encoder, hwaccel_args, hw_decode = HardwareAcceleration.detect(debug=args.debug)
    print(f"Detected acceleration: {accel_name}")
    print(f"Encoder: {encoder}")

    # Prepare test video
    generated = not args.input
    if generated:
        # Reserve a temp path; FFmpeg overwrites the empty file (-y).
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
            test_video = f.name
    else:
        test_video = args.input
        if not os.path.exists(test_video):
            print(f"Error: Input file '{test_video}' not found.")
            return 1

    try:
        # Generation happens inside the try block so the temporary file is
        # removed even when FFmpeg fails to produce the clip.
        if generated and not TestVideo.generate(test_video, args.duration):
            print("Error: Failed to generate test video.")
            return 1

        # Run benchmark
        benchmark = Benchmark(test_video, encoder, hwaccel_args, accel_name, hw_decode)
        max_streams = benchmark.find_max_streams(args.min_fps)

        # Display results
        print("\n" + "=" * 60)
        print("BENCHMARK RESULTS")
        print("=" * 60)
        print(f"Hardware Acceleration: {accel_name}")
        print(f"Maximum Simultaneous 1080p Streams: {max_streams}")
        print(f"(at {args.min_fps} FPS or better)")
        print("=" * 60)
        # Also output just the number for easy parsing
        print(f"\nBenchmark Score: {max_streams}")
    finally:
        # Cleanup: only delete files this run created, never the user's input
        if generated and os.path.exists(test_video):
            os.unlink(test_video)
    return 0


if __name__ == '__main__':
    sys.exit(main())