Stream real-time section progress during single-host refresh
This commit is contained in:
122
app/app.py
122
app/app.py
@@ -116,7 +116,7 @@ def load_ssh_key():
|
||||
raise RuntimeError(f"Could not load SSH key from {SSH_KEY_PATH}")
|
||||
|
||||
|
||||
def collect_one(entry, ssh_key):
|
||||
def collect_one(entry, ssh_key, progress_cb=None):
|
||||
"""SSH into a single server, run the gather script, return parsed data."""
|
||||
try:
|
||||
ssh = paramiko.SSHClient()
|
||||
@@ -136,7 +136,21 @@ def collect_one(entry, ssh_key):
|
||||
stdin, stdout, stderr = ssh.exec_command('bash -s', timeout=60)
|
||||
stdin.write(script)
|
||||
stdin.channel.shutdown_write()
|
||||
output = stdout.read().decode('utf-8', errors='replace')
|
||||
|
||||
if progress_cb:
|
||||
# Stream line-by-line and report section headers as progress
|
||||
output_lines = []
|
||||
for raw_line in stdout:
|
||||
line = raw_line.rstrip('\n')
|
||||
output_lines.append(line)
|
||||
stripped = line.strip()
|
||||
if stripped.startswith('[') and stripped.endswith(']') and stripped != '[end]':
|
||||
section = stripped[1:-1].split(':')[0]
|
||||
progress_cb(section)
|
||||
output = '\n'.join(output_lines)
|
||||
else:
|
||||
output = stdout.read().decode('utf-8', errors='replace')
|
||||
|
||||
ssh.close()
|
||||
|
||||
data = parse_gather_output(output)
|
||||
@@ -479,9 +493,65 @@ def api_refresh_one_stream(server_id):
|
||||
yield f"data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
# Helper to collect one server with streaming progress
|
||||
def collect_with_progress(srv_entry, srv_id):
|
||||
progress_msgs = []
|
||||
last_section = [None]
|
||||
def on_progress(section):
|
||||
if section != last_section[0]:
|
||||
last_section[0] = section
|
||||
progress_msgs.append(section)
|
||||
result = collect_one(srv_entry, ssh_key, progress_cb=on_progress)
|
||||
with app.app_context():
|
||||
srv = Server.query.get(srv_id)
|
||||
_update_server_from_result(srv, srv_entry, result)
|
||||
db.session.commit()
|
||||
return result, progress_msgs
|
||||
|
||||
# Collect the host
|
||||
yield f"data: Connecting to {hostname}...\n\n"
|
||||
result = collect_one(entry, ssh_key)
|
||||
import queue as _queue
|
||||
progress_q = _queue.Queue()
|
||||
host_done = threading.Event()
|
||||
|
||||
def _collect_host():
|
||||
last_reported = [None]
|
||||
def on_progress(section):
|
||||
if section != last_reported[0]:
|
||||
last_reported[0] = section
|
||||
progress_q.put(('progress', f"{hostname}: {section}"))
|
||||
try:
|
||||
result = collect_one(entry, ssh_key, progress_cb=on_progress)
|
||||
progress_q.put(('result', result))
|
||||
except Exception as e:
|
||||
progress_q.put(('result', {'is_online': False, 'error': str(e)}))
|
||||
host_done.set()
|
||||
|
||||
t = threading.Thread(target=_collect_host)
|
||||
t.start()
|
||||
|
||||
# Stream progress while collecting
|
||||
while not host_done.is_set():
|
||||
try:
|
||||
kind, val = progress_q.get(timeout=0.3)
|
||||
if kind == 'progress':
|
||||
yield f"data: {val}\n\n"
|
||||
elif kind == 'result':
|
||||
break
|
||||
except _queue.Empty:
|
||||
continue
|
||||
|
||||
t.join()
|
||||
# Drain remaining messages
|
||||
result = None
|
||||
while not progress_q.empty():
|
||||
kind, val = progress_q.get_nowait()
|
||||
if kind == 'progress':
|
||||
yield f"data: {val}\n\n"
|
||||
elif kind == 'result':
|
||||
result = val
|
||||
|
||||
if result is None:
|
||||
result = {'is_online': False, 'error': 'unknown'}
|
||||
|
||||
with app.app_context():
|
||||
server = Server.query.get(server_id)
|
||||
@@ -500,11 +570,49 @@ def api_refresh_one_stream(server_id):
|
||||
else:
|
||||
yield f"data: {hostname} - offline: {result.get('error', 'unknown')}\n\n"
|
||||
|
||||
# Collect child VMs
|
||||
# Collect child VMs with progress
|
||||
for child_entry in child_entries:
|
||||
child_host = child_entry['hostname']
|
||||
yield f"data: Connecting to {child_host}...\n\n"
|
||||
child_result = collect_one(child_entry, ssh_key)
|
||||
child_q = _queue.Queue()
|
||||
child_done = threading.Event()
|
||||
|
||||
def _collect_child(ce=child_entry, cq=child_q, cd=child_done):
|
||||
last_reported = [None]
|
||||
def on_progress(section):
|
||||
if section != last_reported[0]:
|
||||
last_reported[0] = section
|
||||
cq.put(('progress', f"{ce['hostname']}: {section}"))
|
||||
try:
|
||||
r = collect_one(ce, ssh_key, progress_cb=on_progress)
|
||||
cq.put(('result', r))
|
||||
except Exception as e:
|
||||
cq.put(('result', {'is_online': False, 'error': str(e)}))
|
||||
cd.set()
|
||||
|
||||
ct = threading.Thread(target=_collect_child)
|
||||
ct.start()
|
||||
|
||||
while not child_done.is_set():
|
||||
try:
|
||||
kind, val = child_q.get(timeout=0.3)
|
||||
if kind == 'progress':
|
||||
yield f"data: {val}\n\n"
|
||||
elif kind == 'result':
|
||||
break
|
||||
except _queue.Empty:
|
||||
continue
|
||||
|
||||
ct.join()
|
||||
child_result = None
|
||||
while not child_q.empty():
|
||||
kind, val = child_q.get_nowait()
|
||||
if kind == 'progress':
|
||||
yield f"data: {val}\n\n"
|
||||
elif kind == 'result':
|
||||
child_result = val
|
||||
|
||||
if child_result is None:
|
||||
child_result = {'is_online': False, 'error': 'unknown'}
|
||||
|
||||
with app.app_context():
|
||||
child_server = Server.query.get(child_entry['id'])
|
||||
|
||||
Reference in New Issue
Block a user