Files
dropshell-templates/squashkiwi-streaming/config/overlay/overlay_service.py
Your Name 1b95675ef6
All checks were successful
Test and Publish Templates / test-and-publish (push) Successful in 22s
Update overlay service to use MediaMTX v3 API
2025-09-02 12:46:01 +12:00

327 lines
13 KiB
Python

#!/usr/bin/env python3
"""
SquashKiwi Score Overlay Service
Fetches scores from SquashKiwi API and manages recordings
"""
import asyncio
import aiohttp
import json
import subprocess
import os
import time
import signal
import sys
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ScoreOverlayService:
def __init__(self):
# Load configuration from environment
self.api_url = os.getenv('SQUASHKIWI_API', 'https://squash.kiwi/api')
self.club_code = os.getenv('CLUB_CODE', 'OTOG')
self.court_number = os.getenv('COURT_NUMBER', '1')
self.court_id = f"{self.club_code.lower()}{self.court_number}"
self.mediamtx_api = os.getenv('MEDIAMTX_API', 'http://localhost:9997')
self.recording_path = os.getenv('RECORDING_PATH', '/recordings')
# State management
self.current_score = {"player1": 0, "player2": 0, "games": "0-0", "serving": None}
self.recording_process = None
self.recording_filename = None
self.last_score_change = time.time()
self.match_start_time = None
self.idle_timeout = int(os.getenv('IDLE_TIMEOUT', '300'))
# Ensure recording directory exists
os.makedirs(self.recording_path, exist_ok=True)
# Setup signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
logger.info(f"Score overlay service initialized for court {self.court_id}")
def signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
logger.info(f"Received signal {signum}, shutting down...")
if self.recording_process:
self.stop_recording_sync()
sys.exit(0)
async def fetch_score(self) -> Optional[Dict[str, Any]]:
"""Fetch current score from SquashKiwi API"""
try:
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
# Use format: https://squash.kiwi/api/otog2/score
url = f"{self.api_url}/{self.court_id}/score"
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
# Check if match is active
if data.get('match_active', False):
return data
else:
logger.debug(f"No active match on court {self.court_id}")
return None
else:
logger.warning(f"API returned status {resp.status}")
return None
except Exception as e:
logger.error(f"Error fetching score: {e}")
return None
async def check_stream_health(self) -> bool:
"""Check if the stream is healthy"""
try:
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"{self.mediamtx_api}/v3/paths/list"
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
paths = data.get('items', [])
# Check if either court or court_h264 path exists (we're transcoding)
return any(p.get('name') in ['court', 'court_h264'] for p in paths)
return False
except Exception as e:
logger.error(f"Failed to check stream health: {e}")
return False
def format_score_text(self) -> str:
"""Format the score for overlay display"""
games = self.current_score.get('games', '0-0')
p1_score = self.current_score.get('player1', 0)
p2_score = self.current_score.get('player2', 0)
p1_name = self.current_score.get('player1_name', 'Player 1')[:15]
p2_name = self.current_score.get('player2_name', 'Player 2')[:15]
serving = self.current_score.get('serving')
serve1 = '' if serving == 1 else ''
serve2 = '' if serving == 2 else ''
return f"{games} | {p1_name}: {p1_score}{serve1} - {p2_name}: {p2_score}{serve2}"
async def start_recording(self):
"""Start recording the match"""
if self.recording_process and self.recording_process.poll() is None:
logger.debug("Recording already in progress")
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
court_folder = os.path.join(self.recording_path, self.court_id)
os.makedirs(court_folder, exist_ok=True)
self.recording_filename = os.path.join(court_folder, f"match_{timestamp}.mp4")
self.match_start_time = datetime.now()
logger.info(f"Starting recording: {self.recording_filename}")
score_text = self.format_score_text()
cmd = [
'ffmpeg',
'-y',
'-i', 'rtsp://localhost:8554/court',
'-vf', f"drawtext=fontfile=/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf:"
f"text='{score_text}':fontcolor=white:fontsize=30:"
f"box=1:boxcolor=black@0.5:boxborderw=5:x=10:y=10:"
f"reload=1:textfile=/tmp/score.txt",
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '23',
'-c:a', 'aac',
'-b:a', '128k',
'-movflags', '+faststart',
self.recording_filename
]
with open('/tmp/score.txt', 'w') as f:
f.write(score_text)
try:
self.recording_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.info(f"Recording process started with PID {self.recording_process.pid}")
except Exception as e:
logger.error(f"Failed to start recording: {e}")
self.recording_process = None
async def stop_recording(self):
"""Stop recording the match"""
if not self.recording_process or self.recording_process.poll() is not None:
logger.debug("No active recording to stop")
return
logger.info("Stopping recording due to inactivity")
try:
self.recording_process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(
asyncio.create_subprocess_exec('wait', str(self.recording_process.pid)),
timeout=5.0
)
except asyncio.TimeoutError:
self.recording_process.kill()
logger.warning("Had to force kill recording process")
if self.match_start_time:
duration = datetime.now() - self.match_start_time
logger.info(f"Recording duration: {duration}")
metadata_file = self.recording_filename.replace('.mp4', '.json')
metadata = {
'court_id': self.court_id,
'start_time': self.match_start_time.isoformat(),
'end_time': datetime.now().isoformat(),
'duration': str(duration),
'final_score': self.current_score
}
try:
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Metadata saved to {metadata_file}")
except Exception as e:
logger.error(f"Failed to save metadata: {e}")
except Exception as e:
logger.error(f"Error stopping recording: {e}")
finally:
self.recording_process = None
self.recording_filename = None
self.match_start_time = None
def stop_recording_sync(self):
"""Synchronous version of stop_recording for signal handler"""
if self.recording_process and self.recording_process.poll() is None:
logger.info("Stopping recording (sync)")
self.recording_process.terminate()
self.recording_process.wait(timeout=5)
async def update_overlay_text(self):
"""Update the overlay text file"""
score_text = self.format_score_text()
try:
with open('/tmp/score.txt', 'w') as f:
f.write(score_text)
except Exception as e:
logger.error(f"Failed to update overlay text: {e}")
async def cleanup_old_recordings(self):
"""Remove recordings older than retention period"""
retention_days = int(os.getenv('RECORDING_RETENTION_DAYS', '30'))
cutoff_date = datetime.now() - timedelta(days=retention_days)
try:
for root, dirs, files in os.walk(self.recording_path):
for file in files:
if file.endswith('.mp4'):
file_path = os.path.join(root, file)
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_time < cutoff_date:
os.remove(file_path)
logger.info(f"Deleted old recording: {file}")
metadata_file = file_path.replace('.mp4', '.json')
if os.path.exists(metadata_file):
os.remove(metadata_file)
except Exception as e:
logger.error(f"Error during cleanup: {e}")
async def main_loop(self):
"""Main service loop"""
logger.info("Starting main service loop")
# Initialize overlay text
court_name = os.getenv('COURT_NAME', f'Court {self.court_number}')
try:
# Create file with write permissions for all
with open('/tmp/score.txt', 'w') as f:
f.write(f"{court_name} - Waiting for match...")
os.chmod('/tmp/score.txt', 0o666)
except PermissionError:
# If file exists and we can't write, try to remove and recreate
try:
os.remove('/tmp/score.txt')
with open('/tmp/score.txt', 'w') as f:
f.write(f"{court_name} - Waiting for match...")
os.chmod('/tmp/score.txt', 0o666)
except:
logger.error("Cannot create score.txt file - check permissions")
await self.cleanup_old_recordings()
last_cleanup = datetime.now()
while True:
try:
stream_healthy = await self.check_stream_health()
if not stream_healthy:
logger.warning("Stream not healthy, waiting...")
await asyncio.sleep(10)
continue
score = await self.fetch_score()
if score:
if score != self.current_score:
self.current_score = score
self.last_score_change = time.time()
logger.info(f"Score updated: {self.format_score_text()}")
await self.update_overlay_text()
if not self.recording_process or self.recording_process.poll() is not None:
await self.start_recording()
idle_time = time.time() - self.last_score_change
if self.recording_process and idle_time > self.idle_timeout:
logger.info(f"Match idle for {idle_time:.0f} seconds")
await self.stop_recording()
else:
if self.recording_process:
logger.info("No active match detected")
await self.stop_recording()
# Update overlay to show no active match
court_name = os.getenv('COURT_NAME', f'Court {self.court_number}')
with open('/tmp/score.txt', 'w') as f:
f.write(f"{court_name} - No active match")
if datetime.now() - last_cleanup > timedelta(days=1):
await self.cleanup_old_recordings()
last_cleanup = datetime.now()
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error in main loop: {e}")
await asyncio.sleep(5)
async def run(self):
"""Run the service"""
logger.info("SquashKiwi Score Overlay Service starting...")
await self.main_loop()
if __name__ == "__main__":
service = ScoreOverlayService()
try:
asyncio.run(service.run())
except KeyboardInterrupt:
logger.info("Service stopped by user")
except Exception as e:
logger.error(f"Service crashed: {e}")
sys.exit(1)