Files
transcode_bench/transcode_bench.py
2025-10-12 20:40:06 +13:00

739 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Video Transcoding Benchmark Tool
Measures how many simultaneous 1080p streams can be transcoded in real-time or better.
"""
import subprocess
import sys
import time
import tempfile
import os
import threading
import argparse
import signal
import atexit
import math
from pathlib import Path
from typing import Optional, Tuple, List
def restore_terminal():
    """Reset the terminal with ``stty sane`` (best effort, POSIX only).

    Nothing is done on Windows, and nothing is done when standard input is
    not an interactive terminal (pipe, cron job, CI runner): in that case
    ``stty`` has no terminal to act on and would only print an error.

    NOTE(review): presumably this exists to undo terminal-mode changes left
    behind by child ffmpeg processes -- confirm with the original author.
    """
    if sys.platform == 'win32':
        return

    stdin = sys.stdin
    try:
        interactive = stdin is not None and stdin.isatty()
    except (AttributeError, ValueError):
        # stdin was replaced by a non-file object, or is already closed.
        interactive = False

    if interactive:
        os.system('stty sane')
# Register restore_terminal() with atexit so it is called at normal
# interpreter shutdown.
atexit.register(restore_terminal)
class HardwareAcceleration:
    """Detect and configure hardware acceleration.

    Every probe works by running ``ffmpeg`` as a subprocess; nothing here
    talks to a GPU driver directly.
    """

    # Encoders probed, in priority order, before VA-API is tried:
    # (debug label, display name, H.264 encoder, FFmpeg input args, hw_decode).
    # AMD AMF is configured without any -hwaccel option, so decoding is not
    # hardware accelerated and its hw_decode flag is False.
    _CANDIDATES = (
        ('NVIDIA NVENC', 'NVIDIA NVENC', 'h264_nvenc',
         '-hwaccel cuda -hwaccel_output_format cuda', True),
        ('Intel QSV', 'Intel Quick Sync', 'h264_qsv',
         '-hwaccel qsv -hwaccel_output_format qsv', True),
        ('AMD AMF', 'AMD AMF', 'h264_amf', '', False),
        ('VideoToolbox', 'VideoToolbox', 'h264_videotoolbox',
         '-hwaccel videotoolbox', True),
    )

    # Result of _test_vaapi() when VA-API is unusable and there is nothing
    # worth telling the user about.
    _VAAPI_UNAVAILABLE = (False, None, '', '', False)

    @staticmethod
    def detect(debug=False) -> Tuple[str, str, str, bool]:
        """
        Detect available hardware acceleration.

        Candidates are probed in order (NVENC, QSV, AMF, VideoToolbox, then
        VA-API) and the first one whose test encode succeeds is returned.
        When none works, collected warnings are printed and the software
        encoder is returned.

        Args:
            debug: print per-probe progress and FFmpeg error output.

        Returns: (name, encoder, hwaccel_args, hw_decode_supported)
        """
        warnings = []
        if debug:
            print("\n=== Hardware Detection Debug ===")

        for label, name, encoder, hwaccel_args, hw_decode in HardwareAcceleration._CANDIDATES:
            if debug:
                print(f"Testing {label}...", end=' ')
            ok, warning = HardwareAcceleration._test_hardware(encoder, hwaccel_args, debug)
            if ok:
                if debug:
                    print("")
                return (name, encoder, hwaccel_args, hw_decode)
            if debug:
                print("" if warning else "not available")
            if warning:
                warnings.append(warning)

        # VA-API (Linux Intel/AMD) needs a device node and a decode/encode
        # capability check, so it has its own probe.
        if debug:
            print("Testing VA-API...", end=' ')
        ok, name_or_warning, encoder, hwaccel_args, hw_decode = \
            HardwareAcceleration._test_vaapi(debug)
        if ok:
            if debug:
                print("")
            return (name_or_warning, encoder, hwaccel_args, hw_decode)
        if debug:
            print("" if name_or_warning else "not available")
        if name_or_warning:
            warnings.append(name_or_warning)

        if debug:
            print("=== End Debug ===\n")

        # Print warnings if we fell back to software
        if warnings:
            print("\nNote: Hardware acceleration not available:")
            for warning in warnings:
                print(f" - {warning}")
            print()

        # Fallback to software
        return ('Software (libx264)', 'libx264', '', False)

    @staticmethod
    def _test_hardware(encoder: str, hwaccel_args: str, debug=False) -> Tuple[bool, Optional[str]]:
        """
        Test if a hardware encoder is actually usable by encoding one
        synthetic 1080p frame with it.

        Args:
            encoder: FFmpeg video encoder name (e.g. ``h264_nvenc``).
            hwaccel_args: space-separated FFmpeg options placed before the
                input; may be empty.
            debug: print FFmpeg's error output or the raised exception.

        Returns: (success, warning_message). The warning is always None
        here; the slot exists so the result has the same leading shape as
        _test_vaapi().
        """
        cmd = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
        cmd.extend(hwaccel_args.split())  # '' contributes nothing
        cmd.extend([
            '-f', 'lavfi',
            '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
            '-frames:v', '1',
            '-c:v', encoder,
            '-f', 'null',
            '-' if os.name != 'nt' else 'NUL'
        ])
        try:
            result = subprocess.run(cmd, capture_output=True, timeout=10)
        except (subprocess.SubprocessError, OSError) as e:
            # Timeout, ffmpeg missing or not executable: treat as unavailable.
            if debug:
                print(f"\n Exception: {e}")
            return (False, None)

        if debug and result.returncode != 0:
            detail = result.stderr.decode(errors='replace') if result.stderr else 'Unknown error'
            print(f"\n Error: {detail}")
        return (result.returncode == 0, None)

    @staticmethod
    def _probe(cmd: list, label: str, debug: bool) -> bool:
        """Run one VA-API probe command; report the outcome when debugging."""
        result = subprocess.run(cmd, capture_output=True, timeout=5)
        ok = result.returncode == 0
        if debug:
            print(f" {label}: {'success' if ok else 'failed'}")
            if not ok:
                detail = result.stderr.decode(errors='replace') if result.stderr else 'Unknown'
                print(f" Error: {detail}")
        return ok

    @staticmethod
    def _test_vaapi(debug=False) -> Tuple[bool, Optional[str], str, str, bool]:
        """
        Test VA-API with error handling and device permission checks.

        Steps: confirm FFmpeg lists ``h264_vaapi``; find the first
        ``/dev/dri/renderD*`` node; check it is readable and writable;
        encode a tiny clip, first with hardware decode + encode, then with
        software decode + hardware encode if the first attempt fails.

        Returns: always a 5-tuple.
            success: (True, name, encoder, hwaccel_args, hw_decode)
            failure: (False, warning_or_None, '', '', False)
        """
        failure = HardwareAcceleration._VAAPI_UNAVAILABLE

        # Check if VA-API encoder is available in FFmpeg
        try:
            listing = subprocess.run(
                ['ffmpeg', '-hide_banner', '-encoders'],
                capture_output=True,
                text=True,
                timeout=5
            )
        except (subprocess.SubprocessError, OSError, ValueError) as e:
            if debug:
                print(f"\n Error checking encoders: {e}")
            return failure
        if 'h264_vaapi' not in listing.stdout:
            if debug:
                print("\n h264_vaapi encoder not found in FFmpeg")
            return failure

        # Check for /dev/dri devices
        dri_devices = []
        try:
            dri_path = Path('/dev/dri')
            if dri_path.exists():
                dri_devices = sorted(str(d) for d in dri_path.glob('renderD*'))
        except OSError as e:
            if debug:
                print(f"\n Error checking /dev/dri: {e}")
        if not dri_devices:
            if debug:
                print("\n No /dev/dri/renderD* devices found")
            return failure

        # Check permissions on first device
        device = dri_devices[0]
        if debug:
            print(f"\n Found device: {device}")
            print(f" Can read: {os.access(device, os.R_OK)}")
            print(f" Can write: {os.access(device, os.W_OK)}")
            print(f" Process groups: {os.getgroups()}")
        if not os.access(device, os.R_OK | os.W_OK):
            return (False,
                    f"VA-API device {device} exists but not accessible. Run: sudo usermod -a -G render $USER (then logout/login)",
                    '', '', False)

        # Test VA-API for both encoding and decoding
        base = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
        null_sink = ['-f', 'null', '-' if os.name != 'nt' else 'NUL']
        test_video = None
        try:
            with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
                test_video = f.name

            if debug:
                print(f" Creating test video for VA-API...")
            # First create a small H.264 test video using software encoding
            created = HardwareAcceleration._probe(base + [
                '-f', 'lavfi',
                '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
                '-frames:v', '2',
                '-c:v', 'libx264',
                '-preset', 'ultrafast',
                test_video
            ], 'Test video creation', debug)
            if not created:
                return failure

            # Test 1: HW decode + HW encode
            if debug:
                print(f" Testing VA-API HW decode + HW encode...")
            if HardwareAcceleration._probe(base + [
                '-vaapi_device', device,
                '-hwaccel', 'vaapi',
                '-hwaccel_output_format', 'vaapi',
                '-i', test_video,
                '-frames:v', '1',
                '-c:v', 'h264_vaapi',
            ] + null_sink, 'HW decode + HW encode', debug):
                return (True, 'VA-API (Intel/AMD) [HW decode + HW encode]', 'h264_vaapi',
                        f'-vaapi_device {device} -hwaccel vaapi -hwaccel_output_format vaapi', True)

            # Test 2: SW decode + HW encode (fallback if HW decode doesn't work)
            if debug:
                print(f" Testing VA-API SW decode + HW encode...")
            if HardwareAcceleration._probe(base + [
                '-vaapi_device', device,
                '-i', test_video,
                '-vf', 'format=nv12,hwupload',
                '-frames:v', '1',
                '-c:v', 'h264_vaapi',
            ] + null_sink, 'SW decode + HW encode', debug):
                return (True, 'VA-API (Intel/AMD) [SW decode + HW encode]', 'h264_vaapi',
                        f'-vaapi_device {device}', False)
        except (subprocess.SubprocessError, OSError) as e:
            if debug:
                print(f" Exception during VA-API test: {e}")
        finally:
            # Runs on every exit path, including timeouts, so the temporary
            # clip is never left behind.
            if test_video is not None:
                try:
                    os.unlink(test_video)
                except OSError:
                    pass
        return failure
class TestVideo:
    """Create the synthetic clip used as benchmark input."""

    @staticmethod
    def generate(path: str, duration: int = 30) -> bool:
        """Write a 1080p/30fps H.264 + AAC test clip to ``path`` using FFmpeg.

        The video comes from the ``testsrc2`` lavfi source and the audio from
        a 1 kHz ``sine`` source, both ``duration`` seconds long.

        Returns True when FFmpeg exits with status 0. Any exception (FFmpeg
        missing, the 120 s timeout expiring, ...) is reported on stdout and
        turned into False.
        """
        try:
            print(f"Generating {duration}s 1080p test video...")
            video_input = ['-f', 'lavfi',
                           '-i', f'testsrc2=size=1920x1080:rate=30:duration={duration}']
            audio_input = ['-f', 'lavfi',
                           '-i', f'sine=frequency=1000:duration={duration}']
            codec_options = ['-c:v', 'libx264', '-preset', 'medium',
                             '-c:a', 'aac', '-b:a', '128k']
            command = ['ffmpeg', '-y', *video_input, *audio_input, *codec_options, path]
            completed = subprocess.run(command, capture_output=True, timeout=120)
        except Exception as e:
            print(f"Error generating test video: {e}")
            return False
        return completed.returncode == 0
class TranscodeJob:
    """Represents a single transcode job.

    A job decodes ``input_file``, scales it to 1280x720 and encodes it with
    ``hevc_encoder`` plus AAC audio, discarding the output through FFmpeg's
    ``null`` muxer. After run() returns, ``success``, ``fps``, ``stderr``,
    ``start_time`` and ``end_time`` describe the outcome.
    """

    # Frame rate assumed for the input when FPS has to be estimated from the
    # progress timestamp instead of FFmpeg's own "fps=" figure.
    ASSUMED_INPUT_FPS = 30.0

    def __init__(self, input_file: str, output_file: str, encoder: str, hwaccel_args: str, hw_decode: bool, hevc_encoder: str):
        self.input_file = input_file
        self.output_file = output_file
        self.encoder = encoder
        self.hwaccel_args = hwaccel_args
        self.hw_decode = hw_decode
        self.hevc_encoder = hevc_encoder  # May be hardware or software
        self.process = None
        self.start_time = None
        self.end_time = None
        self.success = False
        self.fps = 0.0
        # Always present so callers can read it without hasattr(); filled
        # with FFmpeg's stderr (or the exception text) by run().
        self.stderr = ''

    def _frames_on_gpu(self) -> bool:
        """Return True when decoded frames reach the filter graph as GPU surfaces.

        Without ``-hwaccel_output_format`` FFmpeg hands frames over in system
        memory even when a hardware decoder is used, and ``hwdownload`` then
        fails because it requires hardware frames as input.
        """
        return bool(self.hw_decode) and '-hwaccel_output_format' in self.hwaccel_args.split()

    def _build_command(self) -> List[str]:
        """Assemble the FFmpeg argument list for this job."""
        cmd = ['ffmpeg', '-y', '-nostdin', '-stats']

        # Hardware acceleration args must precede the input they apply to.
        cmd.extend(self.hwaccel_args.split())
        cmd.extend(['-i', self.input_file])

        # Filter chain: (download from GPU) -> CPU scale -> (upload to GPU)
        vf_filters = []
        if self._frames_on_gpu():
            vf_filters.extend(['hwdownload', 'format=nv12'])
        # Scale on CPU (works everywhere)
        vf_filters.extend(['scale=1280x720', 'format=nv12'])
        # The VA-API encoder only accepts hardware surfaces.
        if 'vaapi' in self.hevc_encoder:
            vf_filters.append('hwupload')
        cmd.extend(['-vf', ','.join(vf_filters)])

        # Encoding parameters - HEVC/H.265 output
        cmd.extend(['-c:v', self.hevc_encoder])
        if 'vaapi' in self.hevc_encoder:
            # VA-API: use CQP mode with Main profile (4:2:0)
            cmd.extend(['-profile:v', 'main', '-qp', '23'])
        elif self.hevc_encoder == 'libx265':
            # Software HEVC: use CRF mode
            cmd.extend(['-preset', 'medium', '-crf', '23'])
        else:
            # Other hardware encoders: use bitrate
            cmd.extend(['-b:v', '4M'])

        cmd.extend([
            '-c:a', 'aac',
            '-b:a', '128k',
            '-f', 'null',
            self.output_file if os.name != 'nt' else '-'
        ])
        return cmd

    def run(self):
        """Execute the transcode job and record its outcome on the instance.

        Blocks until FFmpeg exits. Never raises: this is used as a thread
        target, so any failure is stored in ``success``/``stderr`` instead.
        """
        try:
            cmd = self._build_command()
            self.start_time = time.time()
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            # Wait for completion
            _, stderr = self.process.communicate()
            self.end_time = time.time()
            # Store stderr for debugging
            self.stderr = stderr
            # Parse FPS from FFmpeg output
            self.fps = self._parse_fps(stderr)
            self.success = self.process.returncode == 0
        except Exception as e:  # thread boundary: record the failure, never propagate
            self.success = False
            self.end_time = time.time()
            self.stderr = str(e)

    def _parse_fps(self, ffmpeg_output: str) -> float:
        """Parse average FPS from FFmpeg's progress output.

        The last progress line carrying both ``frame=`` and ``fps=`` wins.
        If no positive value is found, FPS is estimated as
        (last ``time=`` timestamp * ASSUMED_INPUT_FPS) / wall-clock duration,
        which needs ``start_time`` and ``end_time`` to be set.

        Returns 0.0 when neither method yields a value.
        """
        if not ffmpeg_output:
            return 0.0
        lines = ffmpeg_output.splitlines()

        fps_value = None
        for line in lines:
            if 'fps=' not in line or 'frame=' not in line:
                continue
            try:
                fps_value = float(line.split('fps=')[1].split()[0])
            except (IndexError, ValueError):
                continue
        if fps_value and fps_value > 0:
            return fps_value

        # Fallback: derive FPS from the media timestamp reached and wall time.
        media_seconds = None
        for line in lines:
            if 'time=' not in line:
                continue
            try:
                parts = line.split('time=')[1].split()[0].split(':')
                if len(parts) != 3:  # e.g. "time=N/A"
                    continue
                hours, minutes, seconds = parts
                media_seconds = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
            except (IndexError, ValueError):
                continue

        if media_seconds and self.start_time is not None and self.end_time is not None:
            wall_time = self.end_time - self.start_time
            if wall_time > 0:
                return media_seconds * self.ASSUMED_INPUT_FPS / wall_time
        return 0.0
class Benchmark:
    """Main benchmark orchestrator.

    Chooses an HEVC encoder on construction, then measures how many parallel
    1080p -> 720p transcodes sustain a target frame rate.
    """

    # Hard ceiling on the number of parallel streams the search will try.
    MAX_STREAMS_CAP = 128

    def __init__(self, test_video: str, encoder: str, hwaccel_args: str, accel_name: str, hw_decode: bool):
        self.test_video = test_video
        self.encoder = encoder
        self.hwaccel_args = hwaccel_args
        self.accel_name = accel_name
        self.hw_decode = hw_decode
        # Determine HEVC encoder (hardware or software fallback)
        self.hevc_encoder = self._detect_hevc_encoder()

    def _detect_hevc_encoder(self) -> str:
        """Detect if hardware HEVC encoding is available, otherwise use software.

        The HEVC encoder name is derived from the H.264 one (``h264_x`` ->
        ``hevc_x``) and verified by encoding one raw frame with it.

        Returns the hardware encoder name, or ``'libx265'`` when the probe
        fails or the H.264 encoder is already the software one.
        """
        hw_hevc_encoder = self.encoder.replace('h264', 'hevc').replace('264', '265')
        fallback_message = " Hardware HEVC not available, falling back to software (libx265)"

        if hw_hevc_encoder == 'libx265':
            # Software H.264 maps straight to software HEVC; probing it would
            # only end up announcing libx265 as a "hardware" encoder.
            print(fallback_message)
            return 'libx265'

        yuv_file = None
        try:
            with tempfile.NamedTemporaryFile(suffix='.yuv', delete=False) as f:
                yuv_file = f.name

            # Generate 1 raw NV12 frame to feed the encoder
            cmd1 = [
                'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
                '-f', 'lavfi', '-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
                '-frames:v', '1', '-pix_fmt', 'nv12', yuv_file
            ]
            subprocess.run(cmd1, capture_output=True, timeout=5)

            # Try to encode with hardware HEVC
            uses_vaapi = 'vaapi' in hw_hevc_encoder
            cmd2 = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
            cmd2.extend(self.hwaccel_args.split())
            cmd2.extend([
                '-f', 'rawvideo', '-pix_fmt', 'nv12', '-s:v', '1920x1080', '-i', yuv_file,
                '-vf', 'scale=1280x720,format=nv12,hwupload' if uses_vaapi else 'scale=1280x720',
                '-frames:v', '1', '-c:v', hw_hevc_encoder,
            ])
            # VA-API HEVC additionally needs an explicit profile
            if uses_vaapi:
                cmd2.extend(['-profile:v', 'main', '-qp', '23'])
            else:
                cmd2.extend(['-qp', '23'])
            cmd2.extend(['-f', 'null', '-'])
            result = subprocess.run(cmd2, capture_output=True, timeout=5)

            if result.returncode == 0:
                print(f" Using hardware HEVC encoder: {hw_hevc_encoder}")
                return hw_hevc_encoder
            print(f" Hardware HEVC test failed: {result.stderr.decode(errors='replace')[:200]}")
        except (subprocess.SubprocessError, OSError) as e:
            # Timeout or ffmpeg not runnable: report it, then fall back.
            print(f" Hardware HEVC test failed: {e}")
        finally:
            # Remove the raw frame on every exit path.
            if yuv_file is not None:
                try:
                    os.unlink(yuv_file)
                except OSError:
                    pass

        # Fall back to software
        print(fallback_message)
        return 'libx265'

    @staticmethod
    def _abort_jobs(jobs, threads) -> None:
        """Kill FFmpeg processes that are still running and reap their threads.

        Without this, timed-out transcodes keep running in the background,
        competing with the next measurement and keeping the interpreter
        alive at exit (the worker threads are not daemon threads).
        """
        for job in jobs:
            process = job.process
            if process is not None and process.poll() is None:
                try:
                    process.kill()
                except OSError:
                    pass  # exited between poll() and kill()
        for thread in threads:
            # communicate() returns shortly after the kill; bounded wait.
            thread.join(timeout=5)

    def run_parallel_transcodes(self, num_streams: int, timeout: int = 60) -> Tuple[bool, float]:
        """
        Run multiple parallel transcode jobs.

        Args:
            num_streams: number of FFmpeg processes to run at once.
            timeout: overall wall-clock budget in seconds for the batch.

        Returns: (success, average_fps). success is False when any job fails
        or the batch exceeds the timeout; in the timeout case the remaining
        FFmpeg processes are killed before returning.
        """
        output_file = '/dev/null' if os.name != 'nt' else 'NUL'
        jobs = [
            TranscodeJob(self.test_video, output_file, self.encoder,
                         self.hwaccel_args, self.hw_decode, self.hevc_encoder)
            for _ in range(num_streams)
        ]

        # Start all jobs in parallel
        threads = [threading.Thread(target=job.run) for job in jobs]
        for thread in threads:
            thread.start()

        # Wait for all to complete, sharing one deadline across the joins
        deadline = time.monotonic() + timeout
        for thread in threads:
            thread.join(timeout=max(0.0, deadline - time.monotonic()))
        if any(thread.is_alive() for thread in threads):
            self._abort_jobs(jobs, threads)
            return False, 0.0

        # Check if all succeeded and calculate average FPS
        if all(job.success for job in jobs):
            avg_fps = sum(job.fps for job in jobs) / len(jobs) if jobs else 0.0
            return True, avg_fps

        # If failed, print first error for debugging
        for job in jobs:
            stderr = getattr(job, 'stderr', None)
            if job.success or stderr is None:
                continue
            # Skip FFmpeg header and show actual error (last 10 non-empty lines)
            error_lines = [line for line in stderr.split('\n') if line.strip()][-10:]
            print(f"\n Debug - Error output:")
            for line in error_lines:
                print(f" {line}")
            break
        return False, 0.0

    def find_max_streams(self, min_fps: float = 30.0) -> int:
        """
        Use binary search to find maximum number of simultaneous streams.

        A stream count passes when the batch succeeds and its average FPS is
        >= min_fps. The initial upper bound is estimated from single-stream
        throughput; if every count up to that bound passes, the bound is
        doubled (up to MAX_STREAMS_CAP) and the search continues, so the
        result is not limited by the estimate.

        Raises:
            ValueError: if min_fps is not greater than zero.

        Returns the largest passing stream count, or 0 if even one stream
        fails or is below min_fps.
        """
        if not min_fps > 0:
            raise ValueError("min_fps must be greater than 0")

        print(f"\nBenchmarking with {self.accel_name}...")
        print("Finding maximum simultaneous 1080p H.264 → 720p H.265 transcode streams at real-time or better...\n")

        # First verify that 1 stream works
        print(f"Testing 1 simultaneous stream...", end=' ', flush=True)
        success, avg_fps = self.run_parallel_transcodes(1)
        if not success:
            print(f"✗ (failed)")
            return 0
        if avg_fps < min_fps:
            print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
            return 0
        print(f"✓ (avg {avg_fps:.1f} fps)")
        max_streams = 1

        # Upper bound = ceiling(1.2 * single-stream fps / min_fps),
        # clamped to [2, MAX_STREAMS_CAP]
        cap = self.MAX_STREAMS_CAP
        upper = max(2, min(math.ceil(1.2 * avg_fps / min_fps), cap))
        print(f"Estimated capacity: ~{int(avg_fps / min_fps)} streams (searching up to {upper})")

        low = 2
        while True:
            high = upper
            while low <= high:
                mid = (low + high) // 2
                print(f"Testing {mid} simultaneous streams...", end=' ', flush=True)
                success, avg_fps = self.run_parallel_transcodes(mid)
                if success and avg_fps >= min_fps:
                    print(f"✓ (avg {avg_fps:.1f} fps)")
                    max_streams = mid
                    low = mid + 1
                else:
                    if success:
                        print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
                    else:
                        print(f"✗ (failed)")
                    high = mid - 1

            if max_streams < upper or upper >= cap:
                break
            # The bound itself passed, so the true maximum may lie above it.
            low = upper + 1
            upper = min(upper * 2, cap)
            print(f"Upper bound reached; extending search up to {upper} streams")
        return max_streams
def main():
    """Command-line entry point for the benchmark.

    Parses and validates arguments, checks that FFmpeg is runnable, detects
    hardware acceleration, prepares the input clip (generated unless
    ``--input`` is given), runs the stream-count search and prints the
    results.

    Returns the process exit status: 0 on success, 1 when FFmpeg is missing,
    the input file does not exist, or the test clip cannot be generated.
    Invalid argument values exit through argparse with status 2.
    """
    parser = argparse.ArgumentParser(
        description='Benchmark video transcoding performance',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--duration',
        type=int,
        default=10,
        help='Test video duration in seconds (default: 10)'
    )
    parser.add_argument(
        '--input',
        type=str,
        help='Use existing video file instead of generating test video'
    )
    parser.add_argument(
        '--min-fps',
        type=float,
        default=30.0,
        help='Minimum FPS to consider real-time (default: 30.0)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable debug output for hardware detection'
    )
    args = parser.parse_args()

    # Reject values that would otherwise fail much later: a non-positive
    # duration produces no video, and min_fps is used as a divisor.
    if args.duration <= 0:
        parser.error('--duration must be a positive number of seconds')
    if not args.min_fps > 0:
        parser.error('--min-fps must be greater than 0')

    # Check for FFmpeg
    try:
        subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    except (subprocess.CalledProcessError, OSError):
        print("Error: FFmpeg not found. Please install FFmpeg and ensure it's in your PATH.")
        return 1

    print("=" * 60)
    print("Video Transcoding Benchmark")
    print("=" * 60)

    # Detect hardware acceleration
    accel_name, encoder, hwaccel_args, hw_decode = HardwareAcceleration.detect(debug=args.debug)
    print(f"Detected acceleration: {accel_name}")
    print(f"Encoder: {encoder}")

    # Prepare test video
    generated = not args.input
    if generated:
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
            test_video = f.name
    else:
        test_video = args.input
        if not os.path.exists(test_video):
            print(f"Error: Input file '{test_video}' not found.")
            return 1

    try:
        # Generation happens inside the try block so the temporary file is
        # removed even when FFmpeg fails to produce the clip.
        if generated and not TestVideo.generate(test_video, args.duration):
            print("Error: Failed to generate test video.")
            return 1

        # Run benchmark
        benchmark = Benchmark(test_video, encoder, hwaccel_args, accel_name, hw_decode)
        max_streams = benchmark.find_max_streams(args.min_fps)

        # Display results
        print("\n" + "=" * 60)
        print("BENCHMARK RESULTS")
        print("=" * 60)
        print(f"Hardware Acceleration: {accel_name}")
        print(f"HEVC Encoder: {benchmark.hevc_encoder}")
        print(f"Transcode Task: 1080p H.264 → 720p H.265")
        print(f"Maximum Simultaneous Streams: {max_streams}")
        print(f"(at {args.min_fps} FPS or better)")
        print("=" * 60)
        # Also output just the number for easy parsing
        print(f"\nBenchmark Score: {max_streams}")
    finally:
        # Cleanup: only ever delete a file this run created
        if generated and os.path.exists(test_video):
            os.unlink(test_video)
    return 0
# Script entry point: run the benchmark and use main()'s return value as the
# process exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)