661 lines
23 KiB
Python
Executable File
661 lines
23 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Video Transcoding Benchmark Tool
|
|
Measures how many simultaneous 1080p streams can be transcoded in real-time or better.
|
|
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import tempfile
|
|
import os
|
|
import threading
|
|
import argparse
|
|
import signal
|
|
import atexit
|
|
import math
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple, List
|
|
|
|
|
|
def restore_terminal():
|
|
"""Restore terminal settings."""
|
|
if sys.platform != 'win32':
|
|
os.system('stty sane')
|
|
|
|
|
|
# Register cleanup function
|
|
atexit.register(restore_terminal)
|
|
|
|
|
|
class HardwareAcceleration:
|
|
"""Detect and configure hardware acceleration."""
|
|
|
|
@staticmethod
|
|
def detect(debug=False) -> Tuple[str, str, str, bool]:
|
|
"""
|
|
Detect available hardware acceleration.
|
|
Returns: (name, encoder, hwaccel_args, hw_decode_supported)
|
|
"""
|
|
warnings = []
|
|
|
|
if debug:
|
|
print("\n=== Hardware Detection Debug ===")
|
|
|
|
# Check NVIDIA NVENC
|
|
if debug:
|
|
print("Testing NVIDIA NVENC...", end=' ')
|
|
result = HardwareAcceleration._test_hardware('h264_nvenc', '-hwaccel cuda -hwaccel_output_format cuda', debug)
|
|
if result[0]:
|
|
if debug:
|
|
print("✓")
|
|
return ('NVIDIA NVENC', 'h264_nvenc', '-hwaccel cuda -hwaccel_output_format cuda', True)
|
|
if debug:
|
|
print("✗" if result[1] else "not available")
|
|
if result[1]:
|
|
warnings.append(result[1])
|
|
|
|
# Check Intel QSV
|
|
if debug:
|
|
print("Testing Intel QSV...", end=' ')
|
|
result = HardwareAcceleration._test_hardware('h264_qsv', '-hwaccel qsv -hwaccel_output_format qsv', debug)
|
|
if result[0]:
|
|
if debug:
|
|
print("✓")
|
|
return ('Intel Quick Sync', 'h264_qsv', '-hwaccel qsv -hwaccel_output_format qsv', True)
|
|
if debug:
|
|
print("✗" if result[1] else "not available")
|
|
if result[1]:
|
|
warnings.append(result[1])
|
|
|
|
# Check AMD AMF (Windows/Linux)
|
|
if debug:
|
|
print("Testing AMD AMF...", end=' ')
|
|
result = HardwareAcceleration._test_hardware('h264_amf', '', debug)
|
|
if result[0]:
|
|
if debug:
|
|
print("✓")
|
|
return ('AMD AMF', 'h264_amf', '', True)
|
|
if debug:
|
|
print("✗" if result[1] else "not available")
|
|
if result[1]:
|
|
warnings.append(result[1])
|
|
|
|
# Check VideoToolbox (macOS/iOS - ARM)
|
|
if debug:
|
|
print("Testing VideoToolbox...", end=' ')
|
|
result = HardwareAcceleration._test_hardware('h264_videotoolbox', '-hwaccel videotoolbox', debug)
|
|
if result[0]:
|
|
if debug:
|
|
print("✓")
|
|
return ('VideoToolbox', 'h264_videotoolbox', '-hwaccel videotoolbox', True)
|
|
if debug:
|
|
print("✗" if result[1] else "not available")
|
|
if result[1]:
|
|
warnings.append(result[1])
|
|
|
|
# Check VA-API (Linux Intel/AMD)
|
|
if debug:
|
|
print("Testing VA-API...", end=' ')
|
|
result = HardwareAcceleration._test_vaapi(debug)
|
|
if result[0]:
|
|
if debug:
|
|
print("✓")
|
|
return (result[1], result[2], result[3], result[4]) # return (name, encoder, hwaccel_args, hw_decode)
|
|
if debug:
|
|
print("✗" if result[1] else "not available")
|
|
if result[1]:
|
|
warnings.append(result[1])
|
|
|
|
if debug:
|
|
print("=== End Debug ===\n")
|
|
|
|
# Print warnings if we fell back to software
|
|
if warnings:
|
|
print("\nNote: Hardware acceleration not available:")
|
|
for warning in warnings:
|
|
print(f" - {warning}")
|
|
print()
|
|
|
|
# Fallback to software
|
|
return ('Software (libx264)', 'libx264', '', False)
|
|
|
|
@staticmethod
|
|
def _test_hardware(encoder: str, hwaccel_args: str, debug=False) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Test if hardware encoder is actually available by attempting to use it.
|
|
Returns: (success, warning_message)
|
|
"""
|
|
try:
|
|
# Build a quick test encode command
|
|
cmd = ['ffmpeg', '-y', '-hide_banner', '-loglevel', 'error']
|
|
|
|
# Add hardware acceleration args if present
|
|
if hwaccel_args:
|
|
cmd.extend(hwaccel_args.split())
|
|
|
|
# Generate 1 frame of test video and try to encode it
|
|
cmd.extend([
|
|
'-f', 'lavfi',
|
|
'-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
|
|
'-frames:v', '1',
|
|
'-c:v', encoder,
|
|
'-f', 'null',
|
|
'-' if os.name != 'nt' else 'NUL'
|
|
])
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
if debug and result.returncode != 0:
|
|
print(f"\n Error: {result.stderr.decode() if result.stderr else 'Unknown error'}")
|
|
return (result.returncode == 0, None)
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e:
|
|
if debug:
|
|
print(f"\n Exception: {e}")
|
|
return (False, None)
|
|
|
|
@staticmethod
|
|
def _test_vaapi(debug=False) -> Tuple:
|
|
"""
|
|
Test VA-API with better error handling and permission checks.
|
|
Returns: (name, encoder, hwaccel_args) if success or (False, warning, '')
|
|
"""
|
|
# Check if VA-API encoder is available in FFmpeg
|
|
try:
|
|
result = subprocess.run(
|
|
['ffmpeg', '-hide_banner', '-encoders'],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if 'h264_vaapi' not in result.stdout:
|
|
if debug:
|
|
print("\n h264_vaapi encoder not found in FFmpeg")
|
|
return (False, None, '')
|
|
except Exception as e:
|
|
if debug:
|
|
print(f"\n Error checking encoders: {e}")
|
|
return (False, None, '')
|
|
|
|
# Check for /dev/dri devices
|
|
dri_devices = []
|
|
try:
|
|
dri_path = Path('/dev/dri')
|
|
if dri_path.exists():
|
|
dri_devices = sorted([str(d) for d in dri_path.glob('renderD*')])
|
|
except Exception as e:
|
|
if debug:
|
|
print(f"\n Error checking /dev/dri: {e}")
|
|
pass
|
|
|
|
if not dri_devices:
|
|
if debug:
|
|
print("\n No /dev/dri/renderD* devices found")
|
|
return (False, None, '')
|
|
|
|
# Check permissions on first device
|
|
device = dri_devices[0]
|
|
if debug:
|
|
print(f"\n Found device: {device}")
|
|
print(f" Can read: {os.access(device, os.R_OK)}")
|
|
print(f" Can write: {os.access(device, os.W_OK)}")
|
|
print(f" Process groups: {os.getgroups()}")
|
|
|
|
if not os.access(device, os.R_OK | os.W_OK):
|
|
return (False,
|
|
f"VA-API device {device} exists but not accessible. Run: sudo usermod -a -G render $USER (then logout/login)",
|
|
'', '', False)
|
|
|
|
# Test VA-API for both encoding and decoding
|
|
try:
|
|
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
|
|
test_video = f.name
|
|
|
|
if debug:
|
|
print(f" Creating test video for VA-API...")
|
|
|
|
# First create a small H.264 test video using software encoding
|
|
cmd1 = [
|
|
'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
|
|
'-f', 'lavfi',
|
|
'-i', 'testsrc2=size=1920x1080:duration=0.1:rate=1',
|
|
'-frames:v', '2',
|
|
'-c:v', 'libx264',
|
|
'-preset', 'ultrafast',
|
|
test_video
|
|
]
|
|
result1 = subprocess.run(cmd1, capture_output=True, timeout=5)
|
|
|
|
if debug:
|
|
print(f" Test video creation: {'success' if result1.returncode == 0 else 'failed'}")
|
|
if result1.returncode != 0:
|
|
print(f" Error: {result1.stderr.decode() if result1.stderr else 'Unknown'}")
|
|
|
|
if result1.returncode == 0:
|
|
# Test 1: HW decode + HW encode
|
|
if debug:
|
|
print(f" Testing VA-API HW decode + HW encode...")
|
|
cmd2 = [
|
|
'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
|
|
'-vaapi_device', device,
|
|
'-hwaccel', 'vaapi',
|
|
'-hwaccel_output_format', 'vaapi',
|
|
'-i', test_video,
|
|
'-frames:v', '1',
|
|
'-c:v', 'h264_vaapi',
|
|
'-f', 'null',
|
|
'-' if os.name != 'nt' else 'NUL'
|
|
]
|
|
result2 = subprocess.run(cmd2, capture_output=True, timeout=5)
|
|
|
|
if debug:
|
|
print(f" HW decode + HW encode: {'success' if result2.returncode == 0 else 'failed'}")
|
|
if result2.returncode != 0:
|
|
print(f" Error: {result2.stderr.decode() if result2.stderr else 'Unknown'}")
|
|
|
|
# Test 2: SW decode + HW encode (fallback if HW decode doesn't work)
|
|
hw_decode_works = result2.returncode == 0
|
|
if not hw_decode_works:
|
|
if debug:
|
|
print(f" Testing VA-API SW decode + HW encode...")
|
|
cmd3 = [
|
|
'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
|
|
'-vaapi_device', device,
|
|
'-i', test_video,
|
|
'-vf', 'format=nv12,hwupload',
|
|
'-frames:v', '1',
|
|
'-c:v', 'h264_vaapi',
|
|
'-f', 'null',
|
|
'-' if os.name != 'nt' else 'NUL'
|
|
]
|
|
result3 = subprocess.run(cmd3, capture_output=True, timeout=5)
|
|
|
|
if debug:
|
|
print(f" SW decode + HW encode: {'success' if result3.returncode == 0 else 'failed'}")
|
|
if result3.returncode != 0:
|
|
print(f" Error: {result3.stderr.decode() if result3.stderr else 'Unknown'}")
|
|
|
|
if result3.returncode == 0:
|
|
os.unlink(test_video)
|
|
return (True, 'VA-API (Intel/AMD) [SW decode + HW encode]', 'h264_vaapi',
|
|
f'-vaapi_device {device}', False)
|
|
|
|
os.unlink(test_video)
|
|
|
|
if hw_decode_works:
|
|
return (True, 'VA-API (Intel/AMD) [HW decode + HW encode]', 'h264_vaapi',
|
|
f'-vaapi_device {device} -hwaccel vaapi -hwaccel_output_format vaapi', True)
|
|
|
|
if os.path.exists(test_video):
|
|
os.unlink(test_video)
|
|
|
|
except Exception as e:
|
|
if debug:
|
|
print(f" Exception during VA-API test: {e}")
|
|
pass
|
|
|
|
return (False, None, '', '', False)
|
|
|
|
|
|
class TestVideo:
|
|
"""Generate or manage test video file."""
|
|
|
|
@staticmethod
|
|
def generate(path: str, duration: int = 30) -> bool:
|
|
"""Generate a 1080p test video."""
|
|
try:
|
|
print(f"Generating {duration}s 1080p test video...")
|
|
cmd = [
|
|
'ffmpeg', '-y',
|
|
'-f', 'lavfi',
|
|
'-i', 'testsrc2=size=1920x1080:rate=30:duration={}'.format(duration),
|
|
'-f', 'lavfi',
|
|
'-i', 'sine=frequency=1000:duration={}'.format(duration),
|
|
'-c:v', 'libx264',
|
|
'-preset', 'medium',
|
|
'-c:a', 'aac',
|
|
'-b:a', '128k',
|
|
path
|
|
]
|
|
result = subprocess.run(cmd, capture_output=True, timeout=120)
|
|
return result.returncode == 0
|
|
except Exception as e:
|
|
print(f"Error generating test video: {e}")
|
|
return False
|
|
|
|
|
|
class TranscodeJob:
|
|
"""Represents a single transcode job."""
|
|
|
|
def __init__(self, input_file: str, output_file: str, encoder: str, hwaccel_args: str, hw_decode: bool):
|
|
self.input_file = input_file
|
|
self.output_file = output_file
|
|
self.encoder = encoder
|
|
self.hwaccel_args = hwaccel_args
|
|
self.hw_decode = hw_decode
|
|
self.process = None
|
|
self.start_time = None
|
|
self.end_time = None
|
|
self.success = False
|
|
self.fps = 0.0
|
|
|
|
def run(self):
|
|
"""Execute the transcode job."""
|
|
try:
|
|
# Build FFmpeg command
|
|
cmd = ['ffmpeg', '-y', '-nostdin', '-stats']
|
|
|
|
# Add hardware acceleration args (before input)
|
|
if self.hwaccel_args:
|
|
cmd.extend(self.hwaccel_args.split())
|
|
|
|
cmd.extend(['-i', self.input_file])
|
|
|
|
# For VA-API without HW decode, we need to upload to hardware
|
|
if 'vaapi' in self.encoder and not self.hw_decode:
|
|
cmd.extend(['-vf', 'format=nv12,hwupload'])
|
|
|
|
# Build encoding parameters
|
|
encode_params = ['-c:v', self.encoder]
|
|
|
|
# For VA-API, use CQP mode instead of bitrate if needed
|
|
if 'vaapi' in self.encoder:
|
|
# Use constant quality mode (lower is better quality, 20-30 is good)
|
|
encode_params.extend(['-qp', '23'])
|
|
else:
|
|
# Use bitrate mode for other encoders
|
|
encode_params.extend(['-b:v', '4M'])
|
|
|
|
cmd.extend(encode_params)
|
|
cmd.extend([
|
|
'-c:a', 'aac',
|
|
'-b:a', '128k',
|
|
'-f', 'null' if os.name != 'nt' else 'null',
|
|
self.output_file if os.name != 'nt' else '-'
|
|
])
|
|
|
|
self.start_time = time.time()
|
|
self.process = subprocess.Popen(
|
|
cmd,
|
|
stdin=subprocess.DEVNULL,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
|
|
# Wait for completion
|
|
_, stderr = self.process.communicate()
|
|
self.end_time = time.time()
|
|
|
|
# Parse FPS from FFmpeg output
|
|
self.fps = self._parse_fps(stderr)
|
|
self.success = self.process.returncode == 0
|
|
|
|
# Store stderr for debugging
|
|
self.stderr = stderr
|
|
|
|
except Exception as e:
|
|
self.success = False
|
|
self.end_time = time.time()
|
|
self.stderr = str(e)
|
|
|
|
def _parse_fps(self, ffmpeg_output: str) -> float:
|
|
"""Parse average FPS from FFmpeg output."""
|
|
try:
|
|
# Look for the last line with fps= (the final summary line)
|
|
fps_value = None
|
|
for line in ffmpeg_output.split('\n'):
|
|
if 'fps=' in line and 'frame=' in line:
|
|
# Extract fps value
|
|
try:
|
|
fps_str = line.split('fps=')[1].split()[0]
|
|
fps_value = float(fps_str)
|
|
except:
|
|
pass
|
|
|
|
if fps_value and fps_value > 0:
|
|
return fps_value
|
|
|
|
# Fallback: calculate from time and speed
|
|
# Look for lines like "time=00:00:30.00" and "speed=2.5x"
|
|
time_seconds = None
|
|
speed_multiplier = None
|
|
|
|
for line in ffmpeg_output.split('\n'):
|
|
if 'time=' in line:
|
|
try:
|
|
time_str = line.split('time=')[1].split()[0]
|
|
# Parse time format HH:MM:SS.MS
|
|
parts = time_str.split(':')
|
|
if len(parts) == 3:
|
|
h, m, s = parts
|
|
time_seconds = int(h) * 3600 + int(m) * 60 + float(s)
|
|
except:
|
|
pass
|
|
|
|
if 'speed=' in line:
|
|
try:
|
|
speed_str = line.split('speed=')[1].split('x')[0]
|
|
speed_multiplier = float(speed_str)
|
|
except:
|
|
pass
|
|
|
|
# Calculate effective FPS: if encoding 30fps video at 2x speed, effective fps = 60
|
|
if time_seconds and speed_multiplier and self.end_time and self.start_time:
|
|
wall_time = self.end_time - self.start_time
|
|
if wall_time > 0:
|
|
# Assume input is 30fps
|
|
input_fps = 30.0
|
|
frames = time_seconds * input_fps
|
|
return frames / wall_time
|
|
|
|
except Exception as e:
|
|
pass
|
|
return 0.0
|
|
|
|
|
|
class Benchmark:
|
|
"""Main benchmark orchestrator."""
|
|
|
|
def __init__(self, test_video: str, encoder: str, hwaccel_args: str, accel_name: str, hw_decode: bool):
|
|
self.test_video = test_video
|
|
self.encoder = encoder
|
|
self.hwaccel_args = hwaccel_args
|
|
self.accel_name = accel_name
|
|
self.hw_decode = hw_decode
|
|
|
|
def run_parallel_transcodes(self, num_streams: int, timeout: int = 60) -> Tuple[bool, float]:
|
|
"""
|
|
Run multiple parallel transcode jobs.
|
|
Returns: (success, average_fps)
|
|
"""
|
|
jobs = []
|
|
threads = []
|
|
|
|
# Create jobs
|
|
for i in range(num_streams):
|
|
output_file = f'/dev/null' if os.name != 'nt' else 'NUL'
|
|
job = TranscodeJob(self.test_video, output_file, self.encoder, self.hwaccel_args, self.hw_decode)
|
|
jobs.append(job)
|
|
|
|
# Start all jobs in parallel
|
|
for job in jobs:
|
|
thread = threading.Thread(target=job.run)
|
|
thread.start()
|
|
threads.append(thread)
|
|
|
|
# Wait for all to complete (with timeout)
|
|
start = time.time()
|
|
for thread in threads:
|
|
remaining = timeout - (time.time() - start)
|
|
if remaining > 0:
|
|
thread.join(timeout=remaining)
|
|
else:
|
|
return False, 0.0
|
|
|
|
# Check if all succeeded and calculate average FPS
|
|
all_success = all(job.success for job in jobs)
|
|
if all_success:
|
|
avg_fps = sum(job.fps for job in jobs) / len(jobs) if jobs else 0.0
|
|
return True, avg_fps
|
|
|
|
# If failed, print first error for debugging
|
|
for job in jobs:
|
|
if not job.success and hasattr(job, 'stderr'):
|
|
# Skip FFmpeg header and show actual error (last 10 non-empty lines)
|
|
error_lines = [line for line in job.stderr.split('\n') if line.strip()]
|
|
if len(error_lines) > 10:
|
|
error_lines = error_lines[-10:]
|
|
print(f"\n Debug - Error output:")
|
|
for line in error_lines:
|
|
print(f" {line}")
|
|
break
|
|
|
|
return False, 0.0
|
|
|
|
def find_max_streams(self, min_fps: float = 30.0) -> int:
|
|
"""
|
|
Use binary search to find maximum number of simultaneous streams.
|
|
A stream is considered "real-time" if it achieves >= min_fps.
|
|
"""
|
|
print(f"\nBenchmarking with {self.accel_name}...")
|
|
print("Finding maximum simultaneous 1080p streams at real-time or better...\n")
|
|
|
|
# First verify that 1 stream works
|
|
print(f"Testing 1 simultaneous stream...", end=' ', flush=True)
|
|
success, avg_fps = self.run_parallel_transcodes(1)
|
|
|
|
if not success:
|
|
print(f"✗ (failed)")
|
|
return 0
|
|
|
|
if avg_fps < min_fps:
|
|
print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
|
|
return 0
|
|
|
|
print(f"✓ (avg {avg_fps:.1f} fps)")
|
|
max_streams = 1
|
|
|
|
# Estimate maximum possible streams based on single stream performance
|
|
# Upper bound = ceiling(1.2 * X / min_fps)
|
|
estimated_max = math.ceil(1.2 * avg_fps / min_fps)
|
|
|
|
# Cap the search space reasonably (at least 2, at most 128)
|
|
estimated_max = max(2, min(estimated_max, 128))
|
|
|
|
print(f"Estimated capacity: ~{int(avg_fps / min_fps)} streams (searching up to {estimated_max})")
|
|
|
|
# Binary search bounds
|
|
low, high = 2, estimated_max
|
|
|
|
while low <= high:
|
|
mid = (low + high) // 2
|
|
print(f"Testing {mid} simultaneous streams...", end=' ', flush=True)
|
|
|
|
success, avg_fps = self.run_parallel_transcodes(mid)
|
|
|
|
if success and avg_fps >= min_fps:
|
|
print(f"✓ (avg {avg_fps:.1f} fps)")
|
|
max_streams = mid
|
|
low = mid + 1
|
|
else:
|
|
if success:
|
|
print(f"✗ (avg {avg_fps:.1f} fps - below real-time)")
|
|
else:
|
|
print(f"✗ (failed)")
|
|
high = mid - 1
|
|
|
|
return max_streams
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description='Benchmark video transcoding performance',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter
|
|
)
|
|
parser.add_argument(
|
|
'--duration',
|
|
type=int,
|
|
default=10,
|
|
help='Test video duration in seconds (default: 10)'
|
|
)
|
|
parser.add_argument(
|
|
'--input',
|
|
type=str,
|
|
help='Use existing video file instead of generating test video'
|
|
)
|
|
parser.add_argument(
|
|
'--min-fps',
|
|
type=float,
|
|
default=30.0,
|
|
help='Minimum FPS to consider real-time (default: 30.0)'
|
|
)
|
|
parser.add_argument(
|
|
'--debug',
|
|
action='store_true',
|
|
help='Enable debug output for hardware detection'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Check for FFmpeg
|
|
try:
|
|
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
print("Error: FFmpeg not found. Please install FFmpeg and ensure it's in your PATH.")
|
|
return 1
|
|
|
|
print("=" * 60)
|
|
print("Video Transcoding Benchmark")
|
|
print("=" * 60)
|
|
|
|
# Detect hardware acceleration
|
|
accel_name, encoder, hwaccel_args, hw_decode = HardwareAcceleration.detect(debug=args.debug)
|
|
print(f"Detected acceleration: {accel_name}")
|
|
print(f"Encoder: {encoder}")
|
|
|
|
# Prepare test video
|
|
if args.input:
|
|
test_video = args.input
|
|
if not os.path.exists(test_video):
|
|
print(f"Error: Input file '{test_video}' not found.")
|
|
return 1
|
|
else:
|
|
with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as f:
|
|
test_video = f.name
|
|
|
|
if not TestVideo.generate(test_video, args.duration):
|
|
print("Error: Failed to generate test video.")
|
|
return 1
|
|
|
|
try:
|
|
# Run benchmark
|
|
benchmark = Benchmark(test_video, encoder, hwaccel_args, accel_name, hw_decode)
|
|
max_streams = benchmark.find_max_streams(args.min_fps)
|
|
|
|
# Display results
|
|
print("\n" + "=" * 60)
|
|
print("BENCHMARK RESULTS")
|
|
print("=" * 60)
|
|
print(f"Hardware Acceleration: {accel_name}")
|
|
print(f"Maximum Simultaneous 1080p Streams: {max_streams}")
|
|
print(f"(at {args.min_fps} FPS or better)")
|
|
print("=" * 60)
|
|
|
|
# Also output just the number for easy parsing
|
|
print(f"\nBenchmark Score: {max_streams}")
|
|
|
|
finally:
|
|
# Cleanup
|
|
if not args.input and os.path.exists(test_video):
|
|
os.unlink(test_video)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|