Files
dropshell-templates/squashkiwi-streaming/config/overlay/overlay_service.py
Your Name 1a6306a39d
All checks were successful
Test and Publish Templates / test-and-publish (push) Successful in 21s
Update overlay service to use match_active boolean instead of 404 status
2025-09-02 09:56:31 +12:00

309 lines
12 KiB
Python

#!/usr/bin/env python3
"""
SquashKiwi Score Overlay Service
Fetches scores from SquashKiwi API and manages recordings
"""
import asyncio
import aiohttp
import json
import subprocess
import os
import time
import signal
import sys
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ScoreOverlayService:
def __init__(self):
# Load configuration from environment
self.api_url = os.getenv('SQUASHKIWI_API', 'https://squash.kiwi/api')
self.court_id = os.getenv('COURT_ID', 'court1')
self.mediamtx_api = os.getenv('MEDIAMTX_API', 'http://localhost:9997')
self.recording_path = os.getenv('RECORDING_PATH', '/recordings')
# State management
self.current_score = {"player1": 0, "player2": 0, "games": "0-0", "serving": None}
self.recording_process = None
self.recording_filename = None
self.last_score_change = time.time()
self.match_start_time = None
self.idle_timeout = int(os.getenv('IDLE_TIMEOUT', '300'))
# Ensure recording directory exists
os.makedirs(self.recording_path, exist_ok=True)
# Setup signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
logger.info(f"Score overlay service initialized for court {self.court_id}")
def signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
logger.info(f"Received signal {signum}, shutting down...")
if self.recording_process:
self.stop_recording_sync()
sys.exit(0)
async def fetch_score(self) -> Optional[Dict[str, Any]]:
"""Fetch current score from SquashKiwi API"""
try:
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"{self.api_url}/court/{self.court_id}/score"
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
# Check if match is active
if data.get('match_active', False):
return data
else:
logger.debug(f"No active match on court {self.court_id}")
return None
else:
logger.warning(f"API returned status {resp.status}")
return None
except Exception as e:
logger.error(f"Error fetching score: {e}")
return None
async def check_stream_health(self) -> bool:
"""Check if the stream is healthy"""
try:
timeout = aiohttp.ClientTimeout(total=2)
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"{self.mediamtx_api}/v2/paths/list"
async with session.get(url) as resp:
if resp.status == 200:
data = await resp.json()
paths = data.get('items', [])
return any(p.get('name') == 'court_main' for p in paths)
return False
except Exception as e:
logger.error(f"Failed to check stream health: {e}")
return False
def format_score_text(self) -> str:
"""Format the score for overlay display"""
games = self.current_score.get('games', '0-0')
p1_score = self.current_score.get('player1', 0)
p2_score = self.current_score.get('player2', 0)
p1_name = self.current_score.get('player1_name', 'Player 1')[:15]
p2_name = self.current_score.get('player2_name', 'Player 2')[:15]
serving = self.current_score.get('serving')
serve1 = '' if serving == 1 else ''
serve2 = '' if serving == 2 else ''
return f"{games} | {p1_name}: {p1_score}{serve1} - {p2_name}: {p2_score}{serve2}"
async def start_recording(self):
"""Start recording the match"""
if self.recording_process and self.recording_process.poll() is None:
logger.debug("Recording already in progress")
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
court_folder = os.path.join(self.recording_path, self.court_id)
os.makedirs(court_folder, exist_ok=True)
self.recording_filename = os.path.join(court_folder, f"match_{timestamp}.mp4")
self.match_start_time = datetime.now()
logger.info(f"Starting recording: {self.recording_filename}")
score_text = self.format_score_text()
cmd = [
'ffmpeg',
'-y',
'-i', 'rtsp://localhost:8554/court_main',
'-vf', f"drawtext=fontfile=/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf:"
f"text='{score_text}':fontcolor=white:fontsize=30:"
f"box=1:boxcolor=black@0.5:boxborderw=5:x=10:y=10:"
f"reload=1:textfile=/tmp/score.txt",
'-c:v', 'libx264',
'-preset', 'fast',
'-crf', '23',
'-c:a', 'aac',
'-b:a', '128k',
'-movflags', '+faststart',
self.recording_filename
]
with open('/tmp/score.txt', 'w') as f:
f.write(score_text)
try:
self.recording_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logger.info(f"Recording process started with PID {self.recording_process.pid}")
except Exception as e:
logger.error(f"Failed to start recording: {e}")
self.recording_process = None
async def stop_recording(self):
"""Stop recording the match"""
if not self.recording_process or self.recording_process.poll() is not None:
logger.debug("No active recording to stop")
return
logger.info("Stopping recording due to inactivity")
try:
self.recording_process.send_signal(signal.SIGINT)
try:
await asyncio.wait_for(
asyncio.create_subprocess_exec('wait', str(self.recording_process.pid)),
timeout=5.0
)
except asyncio.TimeoutError:
self.recording_process.kill()
logger.warning("Had to force kill recording process")
if self.match_start_time:
duration = datetime.now() - self.match_start_time
logger.info(f"Recording duration: {duration}")
metadata_file = self.recording_filename.replace('.mp4', '.json')
metadata = {
'court_id': self.court_id,
'start_time': self.match_start_time.isoformat(),
'end_time': datetime.now().isoformat(),
'duration': str(duration),
'final_score': self.current_score
}
try:
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
logger.info(f"Metadata saved to {metadata_file}")
except Exception as e:
logger.error(f"Failed to save metadata: {e}")
except Exception as e:
logger.error(f"Error stopping recording: {e}")
finally:
self.recording_process = None
self.recording_filename = None
self.match_start_time = None
def stop_recording_sync(self):
"""Synchronous version of stop_recording for signal handler"""
if self.recording_process and self.recording_process.poll() is None:
logger.info("Stopping recording (sync)")
self.recording_process.terminate()
self.recording_process.wait(timeout=5)
async def update_overlay_text(self):
"""Update the overlay text file"""
score_text = self.format_score_text()
try:
with open('/tmp/score.txt', 'w') as f:
f.write(score_text)
except Exception as e:
logger.error(f"Failed to update overlay text: {e}")
async def cleanup_old_recordings(self):
"""Remove recordings older than retention period"""
retention_days = int(os.getenv('RECORDING_RETENTION_DAYS', '30'))
cutoff_date = datetime.now() - timedelta(days=retention_days)
try:
for root, dirs, files in os.walk(self.recording_path):
for file in files:
if file.endswith('.mp4'):
file_path = os.path.join(root, file)
file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_time < cutoff_date:
os.remove(file_path)
logger.info(f"Deleted old recording: {file}")
metadata_file = file_path.replace('.mp4', '.json')
if os.path.exists(metadata_file):
os.remove(metadata_file)
except Exception as e:
logger.error(f"Error during cleanup: {e}")
async def main_loop(self):
"""Main service loop"""
logger.info("Starting main service loop")
# Initialize overlay text
with open('/tmp/score.txt', 'w') as f:
f.write(f"Court {self.court_id} - Waiting for match...")
await self.cleanup_old_recordings()
last_cleanup = datetime.now()
while True:
try:
stream_healthy = await self.check_stream_health()
if not stream_healthy:
logger.warning("Stream not healthy, waiting...")
await asyncio.sleep(10)
continue
score = await self.fetch_score()
if score:
if score != self.current_score:
self.current_score = score
self.last_score_change = time.time()
logger.info(f"Score updated: {self.format_score_text()}")
await self.update_overlay_text()
if not self.recording_process or self.recording_process.poll() is not None:
await self.start_recording()
idle_time = time.time() - self.last_score_change
if self.recording_process and idle_time > self.idle_timeout:
logger.info(f"Match idle for {idle_time:.0f} seconds")
await self.stop_recording()
else:
if self.recording_process:
logger.info("No active match detected")
await self.stop_recording()
# Update overlay to show no active match
with open('/tmp/score.txt', 'w') as f:
f.write(f"Court {self.court_id} - No active match")
if datetime.now() - last_cleanup > timedelta(days=1):
await self.cleanup_old_recordings()
last_cleanup = datetime.now()
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error in main loop: {e}")
await asyncio.sleep(5)
async def run(self):
"""Run the service"""
logger.info("SquashKiwi Score Overlay Service starting...")
await self.main_loop()
if __name__ == "__main__":
service = ScoreOverlayService()
try:
asyncio.run(service.run())
except KeyboardInterrupt:
logger.info("Service stopped by user")
except Exception as e:
logger.error(f"Service crashed: {e}")
sys.exit(1)