diff --git a/app/__pycache__/app.cpython-312.pyc b/app/__pycache__/app.cpython-312.pyc new file mode 100644 index 0000000..1656844 Binary files /dev/null and b/app/__pycache__/app.cpython-312.pyc differ diff --git a/app/app.py b/app/app.py index a8e977c..83dc115 100644 --- a/app/app.py +++ b/app/app.py @@ -116,7 +116,7 @@ def load_ssh_key(): raise RuntimeError(f"Could not load SSH key from {SSH_KEY_PATH}") -def collect_one(entry, ssh_key): +def collect_one(entry, ssh_key, progress_cb=None): """SSH into a single server, run the gather script, return parsed data.""" try: ssh = paramiko.SSHClient() @@ -136,7 +136,21 @@ def collect_one(entry, ssh_key): stdin, stdout, stderr = ssh.exec_command('bash -s', timeout=60) stdin.write(script) stdin.channel.shutdown_write() - output = stdout.read().decode('utf-8', errors='replace') + + if progress_cb: + # Stream line-by-line and report section headers as progress + output_lines = [] + for raw_line in stdout: + line = raw_line.rstrip('\n') + output_lines.append(line) + stripped = line.strip() + if stripped.startswith('[') and stripped.endswith(']') and stripped != '[end]': + section = stripped[1:-1].split(':')[0] + progress_cb(section) + output = '\n'.join(output_lines) + else: + output = stdout.read().decode('utf-8', errors='replace') + ssh.close() data = parse_gather_output(output) @@ -479,9 +493,65 @@ def api_refresh_one_stream(server_id): yield f"data: [DONE]\n\n" return + # Helper to collect one server with streaming progress + def collect_with_progress(srv_entry, srv_id): + progress_msgs = [] + last_section = [None] + def on_progress(section): + if section != last_section[0]: + last_section[0] = section + progress_msgs.append(section) + result = collect_one(srv_entry, ssh_key, progress_cb=on_progress) + with app.app_context(): + srv = Server.query.get(srv_id) + _update_server_from_result(srv, srv_entry, result) + db.session.commit() + return result, progress_msgs + # Collect the host - yield f"data: Connecting to {hostname}...\n\n" - result = collect_one(entry, ssh_key) + import queue as _queue + progress_q = _queue.Queue() + host_done = threading.Event() + + def _collect_host(): + last_reported = [None] + def on_progress(section): + if section != last_reported[0]: + last_reported[0] = section + progress_q.put(('progress', f"{hostname}: {section}")) + try: + result = collect_one(entry, ssh_key, progress_cb=on_progress) + progress_q.put(('result', result)) + except Exception as e: + progress_q.put(('result', {'is_online': False, 'error': str(e)})) + host_done.set() + + t = threading.Thread(target=_collect_host) + t.start() + + # Stream progress while collecting + while not host_done.is_set(): + try: + kind, val = progress_q.get(timeout=0.3) + if kind == 'progress': + yield f"data: {val}\n\n" + elif kind == 'result': + break + except _queue.Empty: + continue + + t.join() + # Drain remaining messages + result = None + while not progress_q.empty(): + kind, val = progress_q.get_nowait() + if kind == 'progress': + yield f"data: {val}\n\n" + elif kind == 'result': + result = val + + if result is None: + result = {'is_online': False, 'error': 'unknown'} with app.app_context(): server = Server.query.get(server_id) @@ -500,11 +570,49 @@ def api_refresh_one_stream(server_id): else: yield f"data: {hostname} - offline: {result.get('error', 'unknown')}\n\n" - # Collect child VMs + # Collect child VMs with progress for child_entry in child_entries: child_host = child_entry['hostname'] - yield f"data: Connecting to {child_host}...\n\n" - child_result = collect_one(child_entry, ssh_key) + child_q = _queue.Queue() + child_done = threading.Event() + + def _collect_child(ce=child_entry, cq=child_q, cd=child_done): + last_reported = [None] + def on_progress(section): + if section != last_reported[0]: + last_reported[0] = section + cq.put(('progress', f"{ce['hostname']}: {section}")) + try: + r = collect_one(ce, ssh_key, progress_cb=on_progress) + cq.put(('result', r)) + except Exception as e: + cq.put(('result', {'is_online': False, 'error': str(e)})) + cd.set() + + ct = threading.Thread(target=_collect_child) + ct.start() + + while not child_done.is_set(): + try: + kind, val = child_q.get(timeout=0.3) + if kind == 'progress': + yield f"data: {val}\n\n" + elif kind == 'result': + break + except _queue.Empty: + continue + + ct.join() + child_result = None + while not child_q.empty(): + kind, val = child_q.get_nowait() + if kind == 'progress': + yield f"data: {val}\n\n" + elif kind == 'result': + child_result = val + + if child_result is None: + child_result = {'is_online': False, 'error': 'unknown'} with app.app_context(): child_server = Server.query.get(child_entry['id'])