Files
dropshell-templates/squashthumbnailmaker/generate_thumbnail.py
Your Name 2222a77070
All checks were successful
Test and Publish Templates / test-and-publish (push) Successful in 48s
docs: Add 24 files
2025-09-22 23:57:38 +12:00

529 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Squash Thumbnail Maker - Generates thumbnails for MP4 videos
Uses smart detection to find best frame with players visible
Independent standalone version based on squashkiwi approach
"""
import os
import sys
import json
import logging
import subprocess
import asyncio
from pathlib import Path
from typing import Optional, Tuple
from datetime import datetime
# Set process to low priority if configured
try:
nice_level = int(os.environ.get('NICE_LEVEL', '19'))
os.nice(nice_level)
except:
pass
logging.basicConfig(
level=logging.INFO,
format='%(levelname)s - %(message)s'
)
logger = logging.getLogger('thumbnailmaker')
class ThumbnailGenerator:
"""Generates thumbnails for videos using sophisticated AI detection"""
def __init__(self):
self.has_yolo = self.check_yolo_available()
self.enable_smart = os.environ.get('ENABLE_SMART_DETECTION', 'true').lower() == 'true'
self.fallback_time = int(os.environ.get('FALLBACK_TIME', '10'))
self.thumbnail_width = int(os.environ.get('THUMBNAIL_WIDTH', '640'))
self.thumbnail_quality = int(os.environ.get('THUMBNAIL_QUALITY', '95'))
self.max_sample_time = int(os.environ.get('MAX_SAMPLE_TIME', '180'))
self.sample_interval = int(os.environ.get('SAMPLE_INTERVAL', '1'))
def check_yolo_available(self) -> bool:
"""Check if YOLOv8 is available for smart detection"""
try:
from ultralytics import YOLO
return True
except ImportError:
logger.debug("YOLOv8 not available, will use simple thumbnail generation")
return False
async def process_video_file(self, video_file: Path) -> bool:
"""Process a single video file"""
if not video_file.exists():
logger.error(f"Video file not found: {video_file}")
return False
if video_file.suffix.lower() not in ['.mp4', '.mov', '.avi', '.mkv']:
logger.warning(f"Skipping non-video file: {video_file}")
return False
return await self.generate_thumbnail(video_file)
async def process_directory(self, directory: Path) -> int:
"""Process all video files in a directory"""
if not directory.exists() or not directory.is_dir():
logger.error(f"Directory not found: {directory}")
return 0
video_extensions = ['.mp4', '.mov', '.avi', '.mkv']
video_files = []
for ext in video_extensions:
video_files.extend(directory.glob(f'*{ext}'))
video_files.extend(directory.glob(f'*{ext.upper()}'))
if not video_files:
logger.warning(f"No video files found in {directory}")
return 0
logger.info(f"Found {len(video_files)} video files to process")
success_count = 0
for video_file in video_files:
if await self.process_video_file(video_file):
success_count += 1
return success_count
async def generate_thumbnail(self, video_file: Path) -> bool:
"""Main method to generate thumbnail - tries smart detection first, falls back to simple"""
thumbnail_file = video_file.with_suffix('.jpg')
# Skip if thumbnail already exists
if thumbnail_file.exists():
logger.info(f"Thumbnail already exists: {thumbnail_file.name}")
return True
try:
# Try smart thumbnail generation first if enabled
if self.enable_smart and self.has_yolo:
try:
success = await self.generate_smart_thumbnail(video_file)
if success:
return True
except ImportError as e:
logger.debug(f"YOLOv8 not available, using simple thumbnail: {e}")
except Exception as e:
logger.warning(f"Smart thumbnail failed, using fallback: {e}")
# Fallback to simple thumbnail at configured time
success = await self.generate_simple_thumbnail(video_file, seek_time=self.fallback_time)
if success:
return True
# Last resort - try to get frame at 1 second
success = await self.generate_simple_thumbnail(video_file, seek_time=1)
if not success:
logger.error(f"All thumbnail generation methods failed for {video_file.name}")
return False
except Exception as e:
logger.error(f"Critical error generating thumbnail: {e}")
return False
async def generate_simple_thumbnail(self, video_file: Path, seek_time: int = 10) -> bool:
"""Generate a simple thumbnail at specified time"""
try:
thumbnail_file = video_file.with_suffix('.jpg')
logger.debug(f"Attempting simple thumbnail generation for {video_file.name} at {seek_time}s")
if not video_file.exists():
logger.error(f"Video file does not exist: {video_file}")
return False
video_size = video_file.stat().st_size
logger.debug(f"Video file size: {video_size / (1024*1024):.1f} MB")
cmd = [
'ffmpeg',
'-i', str(video_file),
'-ss', str(seek_time),
'-vframes', '1',
'-vf', f'scale={self.thumbnail_width}:-1',
'-q:v', str(100 - self.thumbnail_quality + 1), # Convert quality to ffmpeg scale
'-y',
str(thumbnail_file)
]
logger.debug(f"Running FFmpeg command: {' '.join(cmd[:6])}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
if thumbnail_file.exists():
thumb_size = thumbnail_file.stat().st_size
logger.info(f"Generated simple thumbnail: {thumbnail_file.name} at {seek_time}s ({thumb_size / 1024:.1f} KB)")
return True
else:
logger.error(f"FFmpeg succeeded but thumbnail file not created: {thumbnail_file}")
return False
else:
logger.error(f"FFmpeg failed with code {process.returncode} for {video_file.name}")
if stderr:
stderr_text = stderr.decode('utf-8', errors='ignore')[-500:]
logger.error(f"FFmpeg stderr: {stderr_text}")
return False
except Exception as e:
logger.error(f"Exception generating simple thumbnail for {video_file.name}: {e}")
return False
async def generate_smart_thumbnail(self, video_file: Path) -> bool:
"""Generate thumbnail using YOLOv8 pose model to find best frame with both players"""
try:
logger.info(f"Starting smart thumbnail generation for {video_file.name}")
# Import here to avoid loading if not needed
from ultralytics import YOLO
import cv2
import numpy as np
thumbnail_file = video_file.with_suffix('.jpg')
# Load YOLO pose model for better player detection
logger.debug("Loading YOLOv8 pose model...")
model = YOLO('yolov8n-pose.pt')
logger.debug("YOLOv8 model loaded successfully")
# Sample frames from video
sample_times = list(range(1, min(self.max_sample_time + 1, 181), self.sample_interval))
logger.debug(f"Will sample {len(sample_times)} frames from video")
# Get video dimensions first
probe_cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height',
'-of', 'csv=s=x:p=0',
str(video_file)
]
probe_process = await asyncio.create_subprocess_exec(
*probe_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL
)
dimensions, _ = await probe_process.communicate()
if not dimensions:
logger.error("Failed to get video dimensions")
return False
width, height = map(int, dimensions.decode().strip().split('x'))
logger.debug(f"Video dimensions: {width}x{height}")
best_frame = None
best_score = 0
best_time = self.fallback_time
frames_processed = 0
# First pass: sample frames
for seek_time in sample_times:
logger.debug(f"Extracting frame at {seek_time}s...")
cmd = [
'ffmpeg',
'-ss', str(seek_time),
'-i', str(video_file),
'-vframes', '1',
'-f', 'image2pipe',
'-pix_fmt', 'rgb24',
'-vcodec', 'rawvideo',
'-'
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL
)
stdout, _ = await process.communicate()
if process.returncode != 0 or not stdout:
logger.debug(f"Failed to extract frame at {seek_time}s")
continue
# Convert raw video to numpy array
frame = np.frombuffer(stdout, dtype=np.uint8)
frame = frame.reshape((height, width, 3))
frames_processed += 1
# Run YOLO detection
logger.debug(f"Running YOLO detection on frame at {seek_time}s...")
results = model(frame, verbose=False, conf=0.3)
# Calculate score for this frame
score = self.calculate_frame_score(results, width, height)
logger.debug(f"Frame at {seek_time}s scored: {score}")
if score > best_score:
best_score = score
best_frame = frame
best_time = seek_time
logger.info(f"New best frame found at {seek_time}s with score {score}")
# If we find an excellent frame, stop searching
if score >= 180:
logger.info(f"Excellent frame found at {seek_time}s! Stopping search.")
break
# Second pass: fine-grained search around the best frame
if best_frame is not None and best_score > 50:
logger.info(f"Found initial best frame at {best_time}s with score {best_score}")
logger.info(f"Performing fine-tuning search ±0.9s around best frame...")
fine_sample_times = []
for offset in range(-9, 10):
if offset == 0:
continue
fine_time = best_time + (offset * 0.1)
if fine_time > 0:
fine_sample_times.append(fine_time)
logger.info(f"Fine-tuning: checking {len(fine_sample_times)} frames around {best_time}s")
for seek_time in fine_sample_times:
logger.debug(f"Fine-tuning: checking frame at {seek_time:.1f}s...")
cmd = [
'ffmpeg',
'-ss', str(seek_time),
'-i', str(video_file),
'-vframes', '1',
'-f', 'image2pipe',
'-pix_fmt', 'rgb24',
'-vcodec', 'rawvideo',
'-'
]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL
)
stdout, _ = await process.communicate()
if process.returncode != 0 or not stdout:
logger.debug(f"Failed to extract frame at {seek_time:.1f}s")
continue
frame = np.frombuffer(stdout, dtype=np.uint8)
frame = frame.reshape((height, width, 3))
frames_processed += 1
results = model(frame, verbose=False, conf=0.3)
score = self.calculate_frame_score(results, width, height)
logger.debug(f"Fine-tuning: frame at {seek_time:.1f}s scored: {score}")
if score > best_score:
logger.info(f"Fine-tuning: found better frame at {seek_time:.1f}s with score {score}")
best_score = score
best_frame = frame
best_time = seek_time
if score >= 180:
logger.info(f"Fine-tuning: excellent frame found at {seek_time:.1f}s!")
break
logger.info(f"Fine-tuning complete. Final best frame at {best_time}s with score {best_score}")
# Save the best frame if we found one
logger.info(f"Processed {frames_processed} frames total")
if best_frame is not None and best_score > 0:
logger.info(f"Saving final best frame (score {best_score}) from {best_time}s as thumbnail")
# Convert RGB to BGR for OpenCV
best_frame_bgr = cv2.cvtColor(best_frame, cv2.COLOR_RGB2BGR)
# Resize to configured width
height, width = best_frame_bgr.shape[:2]
new_width = self.thumbnail_width
new_height = int(height * (new_width / width))
resized = cv2.resize(best_frame_bgr, (new_width, new_height), interpolation=cv2.INTER_AREA)
logger.debug(f"Resized thumbnail to {new_width}x{new_height}")
# Save as JPEG with configured quality
cv2.imwrite(str(thumbnail_file), resized, [cv2.IMWRITE_JPEG_QUALITY, self.thumbnail_quality])
if thumbnail_file.exists():
thumb_size = thumbnail_file.stat().st_size
logger.info(f"Smart thumbnail saved: {thumbnail_file.name} ({thumb_size / 1024:.1f} KB) from {best_time}s with score {best_score}")
return True
else:
logger.error(f"Failed to save smart thumbnail to {thumbnail_file}")
return False
else:
logger.warning(f"No suitable frame found for smart thumbnail (best score was {best_score})")
return False
except ImportError as e:
logger.debug(f"YOLOv8 not installed: {e}")
return False
except Exception as e:
logger.error(f"Smart thumbnail generation error: {e}", exc_info=True)
return False
def calculate_frame_score(self, results, frame_width, frame_height) -> int:
"""Calculate score for a frame based on player detection and pose"""
import numpy as np
score = 0
if not results or len(results) == 0:
logger.debug("No detection results")
return 0
result = results[0]
# Get person detections
if result.boxes is None:
logger.debug("No boxes detected")
return 0
persons = []
for box in result.boxes:
if box.cls == 0: # Class 0 is person in COCO
persons.append(box)
logger.debug(f"Detected {len(persons)} person(s) in frame")
# Score based on number of people
if len(persons) == 2:
score = 30 # Base score for having both players
logger.debug("Both players detected: +30 points")
elif len(persons) == 1:
score = 5 # Low score for single player
logger.debug("Single player detected: +5 points")
else:
logger.debug(f"No valid player count ({len(persons)} detected)")
return 0
# Check poses if available for face visibility
if hasattr(result, 'keypoints') and result.keypoints is not None:
keypoints = result.keypoints.xy.cpu().numpy() if hasattr(result.keypoints.xy, 'cpu') else result.keypoints.xy
faces_visible = 0
for person_kpts in keypoints[:2]: # Check first 2 people
if len(person_kpts) > 0:
# Check if face keypoints are visible (indices 0-4 are face in COCO pose)
face_points = person_kpts[:5]
face_visible = np.sum(face_points[:, 0] > 0) >= 3 # At least 3 face points visible
if face_visible:
faces_visible += 1
# Bonus points for faces visible
if faces_visible == 2:
score += 100 # Both faces clearly visible
logger.debug("Both faces visible: +100 points")
elif faces_visible == 1:
score += 20 # One face visible
logger.debug("One face visible: +20 points")
# Additional scoring based on player size and positioning
if len(persons) == 2:
center_players = 0
total_player_area = 0
min_player_area = float('inf')
for box in persons[:2]:
x1, y1, x2, y2 = box.xyxy[0].tolist()
center_x = (x1 + x2) / 2
# Calculate player bounding box area as percentage of frame
box_width = x2 - x1
box_height = y2 - y1
box_area = (box_width * box_height) / (frame_width * frame_height)
total_player_area += box_area
min_player_area = min(min_player_area, box_area)
# Check if player is in central 60% of frame
if 0.2 * frame_width < center_x < 0.8 * frame_width:
center_players += 1
# Score based on player size
avg_player_area = total_player_area / 2
if avg_player_area > 0.08:
score += 40
logger.debug(f"Players are large ({avg_player_area*100:.1f}% avg): +40 points")
elif avg_player_area > 0.05:
score += 25
logger.debug(f"Players are good size ({avg_player_area*100:.1f}% avg): +25 points")
elif avg_player_area > 0.03:
score += 10
logger.debug(f"Players are small ({avg_player_area*100:.1f}% avg): +10 points")
# Both players should be similar size
if min_player_area > 0:
size_ratio = min_player_area / (total_player_area - min_player_area)
if size_ratio > 0.5:
score += 10
logger.debug(f"Players are similar size (ratio {size_ratio:.2f}): +10 points")
# Bonus for both players being centered
if center_players == 2:
score += 15
logger.debug("Both players centered: +15 points")
elif center_players == 1:
score += 5
logger.debug("One player centered: +5 points")
logger.debug(f"Final frame score: {score}")
return score
async def main():
"""Main entry point"""
if len(sys.argv) != 2:
logger.error("Usage: generate_thumbnail.py <video_file_or_directory>")
sys.exit(1)
input_path = Path(sys.argv[1])
# Report nice level
try:
nice_level = os.nice(0) # Get current nice level
logger.info(f"Running at nice level: {nice_level} (low priority)")
except:
pass
generator = ThumbnailGenerator()
try:
if input_path.is_file():
# Process single file
if await generator.process_video_file(input_path):
logger.info(f"Thumbnail generation successful for {input_path.name}")
sys.exit(0)
else:
logger.error(f"Thumbnail generation failed for {input_path.name}")
sys.exit(1)
elif input_path.is_dir():
# Process directory
success_count = await generator.process_directory(input_path)
total_count = len(list(input_path.glob('*.mp4')) + list(input_path.glob('*.MP4')))
logger.info(f"Processed {success_count}/{total_count} videos successfully")
sys.exit(0 if success_count > 0 else 1)
else:
logger.error(f"Invalid input: {input_path} is neither a file nor a directory")
sys.exit(1)
except Exception as e:
logger.error(f"Fatal error: {e}")
sys.exit(1)
if __name__ == '__main__':
asyncio.run(main())