Add SSE streaming for server refresh with live progress log modal
All checks were successful
Build-Publish / build (linux/amd64) (push) Successful in 4s
Build-Publish / build (linux/arm64) (push) Successful in 13s
Build-Publish / create-manifest (push) Successful in 2s
Build-Publish / publish-template (push) Successful in 15s

This commit is contained in:
j
2026-03-08 20:03:34 +13:00
parent be032e66d7
commit cb946b2259
3 changed files with 262 additions and 65 deletions

View File

@@ -10,7 +10,7 @@ import paramiko
BUILD_DATE = '__BUILD_DATE__'
from flask import Flask, render_template, jsonify
from flask import Flask, render_template, jsonify, Response, stream_with_context
from flask_sqlalchemy import SQLAlchemy
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
@@ -63,14 +63,16 @@ def parse_infrastructure_conf():
current_host = None
else:
# Detect indent level: double indent = child VM
stripped = line.lstrip('\t')
tab_count = len(line) - len(stripped)
if tab_count < 2:
# Also check spaces: 8+ spaces = double indent
space_indent = len(line) - len(line.lstrip(' '))
is_child = space_indent >= 8
else:
is_child = tab_count >= 2
# Normalize: count leading whitespace treating tab as 4 spaces
raw_indent = 0
for ch in line:
if ch == '\t':
raw_indent += 4
elif ch == ' ':
raw_indent += 1
else:
break
is_child = raw_indent >= 8
parts = line.strip().split(None, 1)
entry = parts[0] if parts else ''
@@ -239,30 +241,8 @@ def collect_all():
db.session.add(server)
server.group_name = entry['group']
server.url = entry.get('url', '')
server.parent_hostname = entry.get('parent_hostname', '')
server.is_online = result.get('is_online', False)
server.last_collected = datetime.now(timezone.utc)
server.details = result
# Extract primary IP: prefer the interface carrying the default route
default_iface = ''
routing = result.get('routing', {})
if isinstance(routing, dict):
default_iface = routing.get('interface', '')
primary_ip = ''
for iface in result.get('net', []):
ipv4 = iface.get('ipv4', '')
if not ipv4 or ipv4.startswith('127.'):
continue
iface_name = iface.get('name', '') or iface.get('_name', '')
if iface_name == default_iface:
primary_ip = ipv4
break
if not primary_ip:
primary_ip = ipv4
server.primary_ip = primary_ip
_update_server_from_result(server, entry, result)
db.session.commit()
logger.info("Collection complete, updated %d servers", len(results))
@@ -340,33 +320,13 @@ def api_servers():
return jsonify(result)
@app.route('/api/refresh', methods=['POST'])
def api_refresh():
trigger_collect()
return jsonify({'ok': True, 'message': 'Collection triggered'})
@app.route('/api/servers/<int:server_id>/refresh', methods=['POST'])
def api_refresh_one(server_id):
server = Server.query.get_or_404(server_id)
try:
ssh_key = load_ssh_key()
except Exception as e:
return jsonify({'ok': False, 'error': str(e)}), 500
entry = {
'group': server.group_name,
'username': server.username,
'hostname': server.hostname,
'url': server.url,
}
result = collect_one(entry, ssh_key)
def _update_server_from_result(server, entry, result):
"""Apply collection result to a server record."""
server.is_online = result.get('is_online', False)
server.last_collected = datetime.now(timezone.utc)
server.details = result
server.url = entry.get('url', server.url)
# Extract primary IP
default_iface = ''
routing = result.get('routing', {})
if isinstance(routing, dict):
@@ -384,8 +344,127 @@ def api_refresh_one(server_id):
primary_ip = ipv4
server.primary_ip = primary_ip
db.session.commit()
return jsonify({'ok': True})
@app.route('/api/refresh', methods=['POST'])
def api_refresh():
    """Fire off a background collection pass over every configured server.

    Returns immediately; the actual SSH collection runs asynchronously.
    """
    response_body = {'ok': True, 'message': 'Collection triggered'}
    trigger_collect()
    return jsonify(response_body)
@app.route('/api/refresh/stream')
def api_refresh_stream():
    """SSE endpoint: collect all servers concurrently with progress updates.

    Streams one ``data:`` event per server as its collection finishes,
    persists all results to the database, emits a summary line, and
    terminates the stream with a ``[DONE]`` sentinel event.
    """
    def sse(msg):
        # SSE frames are newline-delimited. Dynamic text (exception
        # messages from SSH libraries are often multi-line) must not
        # contain raw newlines or the client sees broken/truncated events.
        flat = str(msg).replace('\r', ' ').replace('\n', ' ')
        return "data: {}\n\n".format(flat)

    def generate():
        entries = parse_infrastructure_conf()
        if not entries:
            yield sse("No servers configured")
            yield sse("[DONE]")
            return
        try:
            ssh_key = load_ssh_key()
        except Exception as e:
            yield sse(f"SSH key error: {e}")
            yield sse("[DONE]")
            return
        yield sse(f"Collecting from {len(entries)} servers...")
        results = {}
        with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_SSH) as pool:
            futures = {pool.submit(collect_one, e, ssh_key): e for e in entries}
            for future in as_completed(futures):
                entry = futures[future]
                key = f"{entry['username']}@{entry['hostname']}"
                try:
                    # as_completed only yields finished futures, so
                    # result() cannot block here — no timeout needed.
                    result = future.result()
                    results[key] = (entry, result)
                    status = 'online' if result.get('is_online') else 'offline'
                    yield sse(f"{entry['hostname']} - {status}")
                except Exception as e:
                    results[key] = (entry, {'is_online': False, 'error': str(e)})
                    yield sse(f"{entry['hostname']} - error: {e}")
        # Persist results; prune rows for hosts removed from the config.
        with app.app_context():
            config_keys = {(e['username'], e['hostname']) for e in entries}
            for server in Server.query.all():
                if (server.username, server.hostname) not in config_keys:
                    db.session.delete(server)
            for key, (entry, result) in results.items():
                server = Server.query.filter_by(
                    username=entry['username'], hostname=entry['hostname'],
                ).first()
                if not server:
                    server = Server(
                        group_name=entry['group'],
                        username=entry['username'],
                        hostname=entry['hostname'],
                    )
                    db.session.add(server)
                server.group_name = entry['group']
                server.parent_hostname = entry.get('parent_hostname', '')
                _update_server_from_result(server, entry, result)
            db.session.commit()
        yield sse(f"Collection complete - {len(results)} servers updated")
        yield sse("[DONE]")

    return Response(stream_with_context(generate()),
                    mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@app.route('/api/servers/<int:server_id>/refresh/stream')
def api_refresh_one_stream(server_id):
    """SSE endpoint: collect a single server with progress updates.

    Looks up the server, runs one SSH collection pass, persists the
    result, streams human-readable progress lines, and ends the stream
    with a ``[DONE]`` sentinel event.
    """
    def sse(msg):
        # Keep SSE framing intact: dynamic text (multi-line exception
        # messages, remote output) must not carry raw newlines.
        flat = str(msg).replace('\r', ' ').replace('\n', ' ')
        return "data: {}\n\n".format(flat)

    def generate():
        with app.app_context():
            server = Server.query.get(server_id)
            if not server:
                yield sse("Server not found")
                yield sse("[DONE]")
                return
            # Copy what we need out of the session-bound row before the
            # context closes; the SSH collection below runs detached.
            hostname = server.hostname
            entry = {
                'group': server.group_name,
                'username': server.username,
                'hostname': server.hostname,
                'url': server.url,
            }
        try:
            ssh_key = load_ssh_key()
        except Exception as e:
            yield sse(f"SSH key error: {e}")
            yield sse("[DONE]")
            return
        yield sse(f"Connecting to {hostname}...")
        result = collect_one(entry, ssh_key)
        with app.app_context():
            server = Server.query.get(server_id)
            # The row may have been deleted while collection ran (e.g. a
            # concurrent full refresh pruning removed hosts). Previously
            # this crashed with AttributeError mid-stream.
            if server is None:
                yield sse(f"{hostname} - server removed during refresh")
                yield sse("[DONE]")
                return
            _update_server_from_result(server, entry, result)
            db.session.commit()
        if result.get('is_online'):
            sys_info = result.get('system', {})
            ct_count = len(result.get('container', []))
            msg = f"{hostname} - online"
            if sys_info.get('os_pretty'):
                msg += f" ({sys_info['os_pretty']})"
            if ct_count:
                msg += f", {ct_count} containers"
            yield sse(msg)
        else:
            yield sse(f"{hostname} - offline: {result.get('error', 'unknown')}")
        yield sse("[DONE]")

    return Response(stream_with_context(generate()),
                    mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'})
@app.route('/api/servers/<int:server_id>/notes', methods=['PUT'])