#!/usr/bin/env python3
"""
SquashKiwi Score Overlay Service
Fetches scores from SquashKiwi API and manages recordings
"""

import asyncio
import json
import logging
import os
import signal
import subprocess
import sys
import time
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import aiohttp

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Text file that ffmpeg's drawtext filter re-reads (reload=1) to render the score.
SCORE_TEXT_FILE = '/tmp/score.txt'

# Seconds to wait for ffmpeg to exit after it has been signalled before killing it.
# NOTE(review): '-movflags +faststart' makes ffmpeg rewrite the output on exit;
# 5 s may be too short for long recordings -- confirm against real match lengths.
RECORDING_STOP_TIMEOUT = 5.0


class ScoreOverlayService:
    """Poll the score API for one court and record the court stream with a score overlay.

    The service keeps at most one ffmpeg child process. It starts the process
    while the score is changing and stops it when the score has been unchanged
    for ``idle_timeout`` seconds or when the API reports no score.
    """

    def __init__(self):
        # Load configuration from environment
        self.api_url = os.getenv('SQUASHKIWI_API', 'https://squash.kiwi/api')
        self.court_id = os.getenv('COURT_ID', 'court1')
        self.mediamtx_api = os.getenv('MEDIAMTX_API', 'http://localhost:9997')
        self.recording_path = os.getenv('RECORDING_PATH', '/recordings')

        # State management
        self.current_score = {"player1": 0, "player2": 0, "games": "0-0", "serving": None}
        self.recording_process = None
        self.recording_filename = None
        # Monotonic clock: idle detection must not be affected by wall-clock steps.
        self.last_score_change = time.monotonic()
        self.match_start_time = None
        self.idle_timeout = int(os.getenv('IDLE_TIMEOUT', '300'))

        # Ensure recording directory exists
        os.makedirs(self.recording_path, exist_ok=True)

        # Setup signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        logger.info(f"Score overlay service initialized for court {self.court_id}")

    def signal_handler(self, signum, frame):
        """Handle shutdown signals gracefully"""
        logger.info(f"Received signal {signum}, shutting down...")
        if self.recording_process:
            self.stop_recording_sync()
        sys.exit(0)

    async def fetch_score(self) -> Optional[Dict[str, Any]]:
        """Fetch current score from SquashKiwi API.

        Returns:
            The decoded JSON object on HTTP 200, or ``None`` on 404, on any
            other status, on a non-object JSON body, or on any request error.
        """
        url = f"{self.api_url}/court/{self.court_id}/score"
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        # The rest of the service calls .get() on the score,
                        # so anything but a JSON object is unusable.
                        if not isinstance(data, dict):
                            logger.warning(
                                f"API returned unexpected payload type {type(data).__name__}"
                            )
                            return None
                        return data
                    if resp.status == 404:
                        logger.debug(f"No active match on court {self.court_id}")
                    else:
                        logger.warning(f"API returned status {resp.status}")
                    return None
        except Exception as e:
            # Best-effort boundary: any request/parse failure means "no score".
            logger.error(f"Error fetching score: {e}")
            return None

    async def check_stream_health(self) -> bool:
        """Check if the stream is healthy.

        Returns:
            ``True`` when the MediaMTX path list contains a path named
            ``court_main``; ``False`` otherwise or on any request error.
        """
        url = f"{self.mediamtx_api}/v2/paths/list"
        try:
            timeout = aiohttp.ClientTimeout(total=2)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as resp:
                    if resp.status != 200:
                        return False
                    data = await resp.json()
                    paths = data.get('items', [])
                    return any(p.get('name') == 'court_main' for p in paths)
        except Exception as e:
            logger.error(f"Failed to check stream health: {e}")
            return False

    def format_score_text(self) -> str:
        """Format the score for overlay display.

        Missing or null player names fall back to ``Player 1`` / ``Player 2``;
        names are truncated to 15 characters.
        """
        score = self.current_score
        games = score.get('games', '0-0')
        p1_score = score.get('player1', 0)
        p2_score = score.get('player2', 0)
        # `or` also covers an explicit null name, which would break slicing.
        p1_name = str(score.get('player1_name') or 'Player 1')[:15]
        p2_name = str(score.get('player2_name') or 'Player 2')[:15]

        serving = score.get('serving')
        serve1 = ' •' if serving == 1 else ''
        serve2 = ' •' if serving == 2 else ''

        return f"{games} | {p1_name}: {p1_score}{serve1} - {p2_name}: {p2_score}{serve2}"

    def _is_recording(self) -> bool:
        """Return True while an ffmpeg child exists and has not exited."""
        return self.recording_process is not None and self.recording_process.poll() is None

    def _reset_recording_state(self) -> None:
        """Forget the current recording process, filename and start time."""
        self.recording_process = None
        self.recording_filename = None
        self.match_start_time = None

    def _write_score_file(self, text: str) -> None:
        """Atomically replace the overlay text file with ``text``.

        ffmpeg re-reads the file while recording, so it is written to a
        sibling temp file and renamed into place to avoid partial reads.

        Raises:
            OSError: if the temp file cannot be written or renamed.
        """
        tmp_path = f"{SCORE_TEXT_FILE}.tmp"
        # Explicit UTF-8: the serve marker is non-ASCII and the locale default may not encode it.
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(text)
        os.replace(tmp_path, SCORE_TEXT_FILE)

    async def start_recording(self) -> None:
        """Start recording the match.

        Does nothing if a recording is already running. On failure to write
        the overlay file or spawn ffmpeg, logs the error and clears the
        recording state.
        """
        if self._is_recording():
            logger.debug("Recording already in progress")
            return

        started_at = datetime.now()
        timestamp = started_at.strftime("%Y%m%d_%H%M%S")
        court_folder = os.path.join(self.recording_path, self.court_id)
        os.makedirs(court_folder, exist_ok=True)

        self.recording_filename = os.path.join(court_folder, f"match_{timestamp}.mp4")
        self.match_start_time = started_at

        logger.info(f"Starting recording: {self.recording_filename}")

        # Only `textfile` is passed: drawtext errors out when both `text` and
        # `textfile` are given, and keeping API-supplied names out of the
        # filter string avoids filter-graph escaping problems.
        # `expansion=none` renders the file content literally so a '%' or '\'
        # in a player name cannot break rendering.
        drawtext = (
            "drawtext=fontfile=/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf:"
            f"textfile={SCORE_TEXT_FILE}:reload=1:expansion=none:"
            "fontcolor=white:fontsize=30:"
            "box=1:boxcolor=black@0.5:boxborderw=5:x=10:y=10"
        )
        cmd = [
            'ffmpeg', '-y',
            '-i', 'rtsp://localhost:8554/court_main',
            '-vf', drawtext,
            '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
            '-c:a', 'aac', '-b:a', '128k',
            '-movflags', '+faststart',
            self.recording_filename
        ]

        try:
            # The overlay file must exist before ffmpeg initialises the filter.
            self._write_score_file(self.format_score_text())
            # Output is discarded rather than piped: nothing reads the pipes,
            # and a full pipe buffer would block ffmpeg indefinitely.
            self.recording_process = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            logger.info(f"Recording process started with PID {self.recording_process.pid}")
        except OSError as e:
            logger.error(f"Failed to start recording: {e}")
            self._reset_recording_state()

    async def _wait_for_exit(self, process: subprocess.Popen, timeout: float) -> bool:
        """Poll ``process`` without blocking the event loop.

        Returns:
            ``True`` if the process exited within ``timeout`` seconds,
            ``False`` if it is still running at the deadline.
        """
        deadline = time.monotonic() + timeout
        while process.poll() is None:
            if time.monotonic() >= deadline:
                return False
            await asyncio.sleep(0.1)
        return True

    def _write_metadata(self) -> None:
        """Write a JSON sidecar (court, times, duration, final score) next to the recording."""
        if not self.match_start_time or not self.recording_filename:
            return

        end_time = datetime.now()
        duration = end_time - self.match_start_time
        logger.info(f"Recording duration: {duration}")

        # splitext swaps only the extension; str.replace would also rewrite
        # any '.mp4' occurring earlier in the path.
        metadata_file = os.path.splitext(self.recording_filename)[0] + '.json'
        metadata = {
            'court_id': self.court_id,
            'start_time': self.match_start_time.isoformat(),
            'end_time': end_time.isoformat(),
            'duration': str(duration),
            'final_score': self.current_score
        }

        try:
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)
            logger.info(f"Metadata saved to {metadata_file}")
        except (OSError, TypeError, ValueError) as e:
            logger.error(f"Failed to save metadata: {e}")

    async def stop_recording(self) -> None:
        """Stop recording the match.

        Sends SIGINT to ffmpeg, waits up to ``RECORDING_STOP_TIMEOUT`` seconds
        for it to exit, kills it if it does not, then writes the metadata
        sidecar. The recording state is always cleared afterwards.
        """
        if not self._is_recording():
            logger.debug("No active recording to stop")
            return

        logger.info("Stopping recording due to inactivity")
        process = self.recording_process

        try:
            process.send_signal(signal.SIGINT)

            if not await self._wait_for_exit(process, RECORDING_STOP_TIMEOUT):
                process.kill()
                process.wait()  # reap the killed child; returns promptly
                logger.warning("Had to force kill recording process")

            self._write_metadata()
        except OSError as e:
            logger.error(f"Error stopping recording: {e}")
        finally:
            self._reset_recording_state()

    def stop_recording_sync(self) -> None:
        """Synchronous version of stop_recording for signal handler.

        Terminates ffmpeg and waits up to ``RECORDING_STOP_TIMEOUT`` seconds;
        kills it if it has not exited by then. Does not write metadata.
        """
        if not self._is_recording():
            return

        logger.info("Stopping recording (sync)")
        process = self.recording_process
        process.terminate()
        try:
            process.wait(timeout=RECORDING_STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            # Do not let the timeout escape the signal handler with ffmpeg still alive.
            process.kill()
            process.wait()
            logger.warning("Had to force kill recording process")

    async def update_overlay_text(self) -> None:
        """Update the overlay text file with the currently formatted score."""
        score_text = self.format_score_text()
        try:
            self._write_score_file(score_text)
        except OSError as e:
            logger.error(f"Failed to update overlay text: {e}")

    async def cleanup_old_recordings(self) -> None:
        """Remove recordings older than retention period.

        Walks ``recording_path`` and deletes every ``.mp4`` whose mtime is
        older than ``RECORDING_RETENTION_DAYS`` (default 30) together with its
        ``.json`` sidecar. A failure on one file is logged and the sweep
        continues.
        """
        retention_days = int(os.getenv('RECORDING_RETENTION_DAYS', '30'))
        cutoff_date = datetime.now() - timedelta(days=retention_days)

        for root, _dirs, files in os.walk(self.recording_path):
            for name in files:
                if not name.endswith('.mp4'):
                    continue
                file_path = os.path.join(root, name)
                try:
                    file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                    if file_time >= cutoff_date:
                        continue

                    os.remove(file_path)
                    logger.info(f"Deleted old recording: {name}")

                    metadata_file = os.path.splitext(file_path)[0] + '.json'
                    if os.path.exists(metadata_file):
                        os.remove(metadata_file)
                except OSError as e:
                    logger.error(f"Error during cleanup: {e}")

    async def main_loop(self) -> None:
        """Main service loop.

        Once per second: checks stream health, fetches the score, refreshes
        the overlay on change, and starts or stops the recording. Cleanup of
        old recordings runs at startup and then once a day.
        """
        logger.info("Starting main service loop")

        await self.cleanup_old_recordings()
        last_cleanup = datetime.now()

        while True:
            try:
                if not await self.check_stream_health():
                    logger.warning("Stream not healthy, waiting...")
                    await asyncio.sleep(10)
                    continue

                score = await self.fetch_score()

                if score:
                    if score != self.current_score:
                        self.current_score = score
                        self.last_score_change = time.monotonic()
                        logger.info(f"Score updated: {self.format_score_text()}")
                        await self.update_overlay_text()

                    idle_time = time.monotonic() - self.last_score_change
                    if idle_time > self.idle_timeout:
                        if self._is_recording():
                            logger.info(f"Match idle for {idle_time:.0f} seconds")
                            await self.stop_recording()
                    elif not self._is_recording():
                        # Start only while the match is active; otherwise an
                        # idle stop would be undone on the very next tick.
                        await self.start_recording()
                elif self._is_recording():
                    # fetch_score returns None for 404, other statuses and
                    # request errors alike, so any of those ends the recording.
                    logger.info("No active match detected")
                    await self.stop_recording()

                if datetime.now() - last_cleanup > timedelta(days=1):
                    await self.cleanup_old_recordings()
                    last_cleanup = datetime.now()

                await asyncio.sleep(1)

            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass on Python 3.7).
                raise
            except Exception as e:
                logger.error(f"Error in main loop: {e}")
                await asyncio.sleep(5)

    async def run(self) -> None:
        """Run the service"""
        logger.info("SquashKiwi Score Overlay Service starting...")
        await self.main_loop()


if __name__ == "__main__":
    service = ScoreOverlayService()
    try:
        asyncio.run(service.run())
    except KeyboardInterrupt:
        logger.info("Service stopped by user")
    except Exception as e:
        logger.error(f"Service crashed: {e}")
        sys.exit(1)