""" RV50x Template Manager - Web GUI Backend Run directly: uvicorn app:app --host 0.0.0.0 --port 8000 Run via Docker: docker-compose up -d Environment variables (set in .env or docker-compose.yml): NOCODB_URL NocoDB base URL e.g. http://192.168.16.130:8080 NOCODB_TOKEN NocoDB API token NOCODB_BASE_ID Base ID from browser URL NOCODB_TABLE_ID Table ID from browser URL NOCODB_VIEW_ID View ID from browser URL (the "All" view) PAGE_TIMEOUT ms to wait for page elements (default 90000) DOWNLOAD_TIMEOUT ms to wait for template gen (default 120000) UPLOAD_TIMEOUT ms to wait for upload done (default 120000) MAX_RETRIES retry attempts per device (default 3) """ import asyncio import html as html_lib import io import json import os import re import threading import urllib.parse import urllib.request from datetime import datetime, timedelta from pathlib import Path from typing import Optional import pandas as pd import paramiko from fastapi import FastAPI, BackgroundTasks, File, Form, UploadFile, Request, Response, Depends, HTTPException from fastapi.responses import HTMLResponse, StreamingResponse, RedirectResponse from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired from pydantic import BaseModel # ── NocoDB connection — read from environment, fall back to defaults ─────────── NOCODB_URL = os.environ.get("NOCODB_URL", "http://192.168.16.130:8080") NOCODB_TOKEN = os.environ.get("NOCODB_TOKEN", "eWU_ilelaCtNy1JzC7vf41DokkqFOovcLHM0zVml") NOCODB_BASE_ID = os.environ.get("NOCODB_BASE_ID", "pdq96x915xt4a0m") NOCODB_TABLE_ID = os.environ.get("NOCODB_TABLE_ID", "mkewnr53ahqvnt9") _view_id = os.environ.get("NOCODB_VIEW_ID", "vwl7qvxo1xclvawz") NOCODB_VIEW_IDS = { "All": _view_id, "Electric": _view_id, "Gas & Water": _view_id, } # ── Local paths ──────────────────────────────────────────────────────────────── # In Docker these are bind-mounted from the host so data persists across rebuilds. SCRIPT_DIR = Path(__file__).parent DOWNLOAD_DIR = Path(os.environ.get("DOWNLOAD_DIR", str(SCRIPT_DIR / "template_downloads"))) UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", str(SCRIPT_DIR / "template_uploads"))) TEMPLATES_DIR = Path(os.environ.get("TEMPLATES_DIR", str(SCRIPT_DIR / "xml_templates"))) DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) UPLOAD_DIR.mkdir(parents=True, exist_ok=True) TEMPLATES_DIR.mkdir(parents=True, exist_ok=True) # ── Playwright timeouts — tunable via environment ────────────────────────────── PAGE_TIMEOUT = int(os.environ.get("PAGE_TIMEOUT", "90000")) DOWNLOAD_TIMEOUT = int(os.environ.get("DOWNLOAD_TIMEOUT", "120000")) UPLOAD_TIMEOUT = int(os.environ.get("UPLOAD_TIMEOUT", "120000")) MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3")) # ── In-memory job store ──────────────────────────────────────────────────────── jobs: dict[str, dict] = {} abort_flags: set[str] = set() # job_ids that have been asked to abort # ── AT Terminal job store (separate from Playwright jobs) ────────────────────── at_sessions: dict[str, dict] = {} # session_id → {logs, status, results} SSH_PORT = int(os.environ.get("SSH_PORT", "3223")) # ── Auth config ──────────────────────────────────────────────────────────────── APP_USERNAME = os.environ.get("APP_USERNAME", "admin") APP_PASSWORD = os.environ.get("APP_PASSWORD", "changeme") SESSION_SECRET = os.environ.get("SESSION_SECRET", "change-this-secret") SESSION_HOURS = int(os.environ.get("SESSION_HOURS", "8")) SESSION_COOKIE = "rv50x_session" _signer = URLSafeTimedSerializer(SESSION_SECRET) def _make_session_cookie() -> str: return _signer.dumps({"user": APP_USERNAME}) def _verify_session_cookie(token: str) -> bool: try: _signer.loads(token, max_age=SESSION_HOURS * 3600) return True except (BadSignature, SignatureExpired): return False def require_auth(request: Request): token = request.cookies.get(SESSION_COOKIE) if not token or not _verify_session_cookie(token): raise HTTPException(status_code=302, headers={"Location": "/login"}) app = FastAPI(title="RV50x Template Manager") # ══════════════════════════════════════════════════════════════════════════════ # NocoDB helpers # ══════════════════════════════════════════════════════════════════════════════ def _fetch_all_rows(view_id: str) -> list: headers = {"xc-token": NOCODB_TOKEN, "Content-Type": "application/json"} all_rows = [] offset, limit = 0, 1000 while True: params = urllib.parse.urlencode({"viewId": view_id, "limit": limit, "offset": offset}) url = f"{NOCODB_URL}/api/v1/db/data/noco/{NOCODB_BASE_ID}/{NOCODB_TABLE_ID}?{params}" req = urllib.request.Request(url, headers=headers) with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read().decode()) all_rows.extend(data.get("list", [])) if data.get("pageInfo", {}).get("isLastPage", True): break offset += limit return all_rows # All NocoDB fields we care about — everything becomes a template variable NOCODB_FIELDS = [ "hostname", "ip_address", "username", "password", "https_port", "use_https", "ssh_port", "location", "alias", "dept", "def_pass", "snmp_auth_key", "snmp_priv_key", "fort_key", "vpn_subnets", ] def load_devices(group: str = "All") -> list: view_id = NOCODB_VIEW_IDS.get(group, NOCODB_VIEW_IDS["All"]) rows = _fetch_all_rows(view_id) devices = [] for row in rows: device_id = str(row.get("hostname", "")).strip() ip = str(row.get("ip_address", "")).strip() username = str(row.get("username", "")).strip() password = str(row.get("password", "")).strip() dept = str(row.get("dept", "")).strip() location = str(row.get("location", "")).strip() if not device_id or not ip: continue if group == "Electric" and dept.upper() != "ELEC": continue if group == "Gas & Water" and dept.upper() != "GW": continue devices.append({ "id": device_id, "ip": ip, "username": username, "password": password, "dept": dept, "location": location, }) return devices def load_devices_full(group: str = "All") -> list: """Load all NocoDB fields for each device (used by XML Builder).""" view_id = NOCODB_VIEW_IDS.get(group, NOCODB_VIEW_IDS["All"]) rows = _fetch_all_rows(view_id) devices = [] for row in rows: device_id = str(row.get("hostname", "")).strip() dept = str(row.get("dept", "")).strip() if not device_id: continue if group == "Electric" and dept.upper() != "ELEC": continue if group == "Gas & Water" and dept.upper() != "GW": continue # Build a flat dict of all available fields device = {} for field in NOCODB_FIELDS: val = row.get(field, "") device[field] = str(val).strip() if val is not None else "" devices.append(device) return devices # ══════════════════════════════════════════════════════════════════════════════ # XML Builder — core logic (replaces excel_to_xml.py) # ══════════════════════════════════════════════════════════════════════════════ def _extract_template_variables(template: str) -> list[str]: """Return unique variable names found in {var} or {{ var }} placeholders.""" pattern = r'\{\{?\s*([^{}]+?)\s*\}?\}' return list(set(re.findall(pattern, template))) def _escape_xml(value) -> str: """Convert a value to an XML-safe string.""" if not isinstance(value, str): # Convert float-integers like 443.0 → "443" if isinstance(value, float) and value == int(value): value = str(int(value)) else: value = str(value) # Escape backslashes then XML special chars value = value.replace('\\', '\\\\') value = html_lib.escape(value) return value def _render_template(template: str, data: dict, default: str = "NOTSET") -> str: """Replace all {var} and {{ var }} placeholders in template with data values.""" for var, raw_val in data.items(): val = _escape_xml(raw_val) if raw_val not in ("", None) else default try: template = re.sub(r'\{\{\s*' + re.escape(var) + r'\s*\}\}', val, template) template = re.sub(r'\{' + re.escape(var) + r'\}', val, template) except Exception: pass # Fill any remaining unreplaced variables with the default template = re.sub(r'\{\{?\s*[^{}]+?\s*\}?\}', default, template) return template def build_xmls_from_nocodb( group: str, hostnames: list[str], template: str, default: str = "NOTSET", ) -> list[dict]: """ Generate XML files for the given hostnames using live NocoDB data. Files are written to template_uploads/. Returns a list of result dicts: {hostname, success, message}. """ all_devices = load_devices_full(group) device_map = {d["hostname"]: d for d in all_devices} results = [] for hostname in hostnames: if hostname not in device_map: results.append({"hostname": hostname, "success": False, "message": "Not found in NocoDB"}) continue data = device_map[hostname] content = _render_template(template, data, default) out = UPLOAD_DIR / f"{hostname}.xml" try: out.write_text(content, encoding="utf-8") results.append({"hostname": hostname, "success": True, "message": f"→ {out.name}"}) except Exception as e: results.append({"hostname": hostname, "success": False, "message": str(e)}) return results def build_xmls_from_excel( excel_bytes: bytes, template: str, hostnames: list[str], default: str = "NOTSET", ) -> list[dict]: """ Generate XML files for the given hostnames using data from an uploaded Excel file. Files are written to template_uploads/. Returns a list of result dicts: {hostname, success, message}. """ df = pd.read_excel(io.BytesIO(excel_bytes)) # Find hostname column hostname_col = next( (c for c in df.columns if "hostname" in c.lower()), None ) if not hostname_col: return [{"hostname": h, "success": False, "message": "No hostname column in Excel"} for h in hostnames] results = [] for hostname in hostnames: rows = df[df[hostname_col] == hostname] if rows.empty: results.append({"hostname": hostname, "success": False, "message": "Not found in Excel"}) continue row = rows.iloc[0].to_dict() data = {} for col, val in row.items(): clean = col.strip().strip('{}') if pd.notna(val): if isinstance(val, float) and val == int(val): data[clean] = str(int(val)) else: data[clean] = str(val) else: data[clean] = default content = _render_template(template, data, default) out = UPLOAD_DIR / f"{hostname}.xml" try: out.write_text(content, encoding="utf-8") results.append({"hostname": hostname, "success": True, "message": f"→ {out.name}"}) except Exception as e: results.append({"hostname": hostname, "success": False, "message": str(e)}) return results # ══════════════════════════════════════════════════════════════════════════════ # Report writer # ══════════════════════════════════════════════════════════════════════════════ def _write_report(job_id: str, job: dict, output_dir: Path) -> Path: """Write a timestamped report file matching the CLI format.""" results = job.get("results", []) succeeded = [r for r in results if r.get("success")] failed = [r for r in results if not r.get("success")] started = datetime.fromisoformat(job["started"]) if job.get("started") else datetime.now() finished = datetime.fromisoformat(job["finished"]) if job.get("finished") else datetime.now() elapsed = (finished - started).total_seconds() job_type = job.get("type", "job").upper() timestamp = finished.strftime("%Y-%m-%d %H:%M") lines = [] lines.append("═" * 60) lines.append(f" {job_type} SUMMARY — {timestamp}") lines.append(f" Job ID: {job_id}") lines.append("═" * 60) lines.append(f" Total: {len(results)}") lines.append(f" ✓ Success: {len(succeeded)}") lines.append(f" ✗ Failed: {len(failed)}") lines.append(f" Time: {elapsed:.0f}s") if succeeded: lines.append("") lines.append(f" {'Downloaded' if job_type == 'DOWNLOAD' else 'Uploaded'}:") for r in succeeded: extra = f" ({r.get('size_kb', '')} KB)" if r.get("size_kb") else "" lines.append(f" ✓ {r['id']}{extra}") if failed: lines.append("") lines.append(" Failed:") for r in failed: lines.append(f" ✗ {r['id']} — {r.get('message', '')}") lines.append("═" * 60) lines.append("") lines.append(" Full log:") lines.append("") for log_line in job.get("logs", []): lines.append(f" {log_line}") lines.append("═" * 60) report_name = f"report_{finished.strftime('%Y%m%d_%H%M%S')}.txt" report_path = output_dir / report_name report_path.write_text("\n".join(lines) + "\n", encoding="utf-8") return report_path # ══════════════════════════════════════════════════════════════════════════════ # Playwright workers (download / upload to modems) # ══════════════════════════════════════════════════════════════════════════════ async def _download_one(device: dict, job_id: str) -> dict: from playwright.async_api import async_playwright, TimeoutError as PWTimeout device_id = device["id"] ip = device["ip"] username = device["username"] password = device["password"] def log(msg): jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}") async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context(ignore_https_errors=True, accept_downloads=True) page = await context.new_page() page.set_default_timeout(PAGE_TIMEOUT) try: await page.goto(f"https://{ip}", wait_until="load") await page.wait_for_selector('input[name="username"]', state="visible", timeout=PAGE_TIMEOUT) await page.wait_for_timeout(1000) await page.click('input[name="username"]', click_count=3) await page.fill('input[name="username"]', username) await page.fill('input[name="password"]', password) await page.wait_for_timeout(800) login_btn = page.locator('input[name="Login"]') await login_btn.scroll_into_view_if_needed() await login_btn.click() await page.wait_for_selector('#btn_tpl', state="visible", timeout=PAGE_TIMEOUT) log("Logged in.") await page.wait_for_function("typeof showTemplateDialog === 'function'", timeout=PAGE_TIMEOUT) await page.wait_for_timeout(500) for _ in range(5): await page.evaluate("showTemplateDialog()") await page.wait_for_timeout(1000) visible = await page.evaluate(""" () => { const el = document.getElementById('template_name'); if (!el) return false; const form = el.closest('form') || el.closest('div[id]'); if (!form) return el.offsetParent !== null; return form.offsetParent !== null; } """) if visible: break else: return {"id": device_id, "success": False, "message": "Modal did not open"} await page.fill('input[id="template_name"]', device_id) pwd_cb = page.locator('input[id="template_pwds"]') if not await pwd_cb.is_checked(): await pwd_cb.check() info_cb = page.locator('input[id="template_info"]') if await info_cb.count() > 0 and await info_cb.is_checked(): await info_cb.uncheck() page.on("dialog", lambda d: asyncio.ensure_future(d.dismiss())) out_path = DOWNLOAD_DIR / f"{device_id}.xml" async with page.expect_download(timeout=DOWNLOAD_TIMEOUT) as dl: await page.click('input.fbtn[name="download_template"]') download = await dl.value await download.save_as(out_path) await page.wait_for_timeout(1000) if out_path.exists() and out_path.stat().st_size > 0: size_kb = out_path.stat().st_size // 1024 log(f"✓ Saved {size_kb} KB") return {"id": device_id, "success": True, "message": f"{size_kb} KB", "size_kb": size_kb} return {"id": device_id, "success": False, "message": "File empty after download"} except PWTimeout: log("✗ TIMEOUT") return {"id": device_id, "success": False, "message": "TIMEOUT"} except Exception as e: log(f"✗ ERROR: {str(e)[:80]}") return {"id": device_id, "success": False, "message": str(e)[:120]} finally: await context.close() await browser.close() async def _upload_one(device: dict, reboot: bool, job_id: str) -> dict: from playwright.async_api import async_playwright, TimeoutError as PWTimeout device_id = device["id"] ip = device["ip"] username = device["username"] password = device["password"] template_file = UPLOAD_DIR / f"{device_id}.xml" def log(msg): jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}") if not template_file.exists(): log("✗ Template file not found") return {"id": device_id, "success": False, "message": "Template file not found"} async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context(ignore_https_errors=True, accept_downloads=True) page = await context.new_page() page.set_default_timeout(PAGE_TIMEOUT) try: await page.goto(f"https://{ip}", wait_until="load") await page.wait_for_selector('input[name="username"]', state="visible", timeout=PAGE_TIMEOUT) await page.wait_for_timeout(1000) await page.click('input[name="username"]', click_count=3) await page.fill('input[name="username"]', username) await page.fill('input[name="password"]', password) await page.wait_for_timeout(800) login_btn = page.locator('input[name="Login"]') await login_btn.scroll_into_view_if_needed() await login_btn.click() await page.wait_for_selector('#btn_tpl', state="visible", timeout=PAGE_TIMEOUT) await page.wait_for_function("typeof showTemplateDialog === 'function'", timeout=PAGE_TIMEOUT) await page.wait_for_timeout(500) log("Logged in.") for _ in range(5): await page.evaluate("showTemplateDialog()") await page.wait_for_timeout(1000) visible = await page.evaluate(""" () => { const el = document.getElementById('template_name'); if (!el) return false; const form = el.closest('form') || el.closest('div[id]'); if (!form) return el.offsetParent !== null; return form.offsetParent !== null; } """) if visible: break else: return {"id": device_id, "success": False, "message": "Modal did not open"} await page.locator('input[id="template_filename"]').set_input_files(str(template_file)) log(f"File set: {template_file.name}") reboot_cb = page.locator('input[name="reboot_after_upload"]') is_checked = await reboot_cb.is_checked() if reboot and not is_checked: await reboot_cb.check() elif not reboot and is_checked: await reboot_cb.uncheck() page.on("dialog", lambda d: asyncio.ensure_future(d.dismiss())) log("Uploading...") if reboot: # Click upload then race multiple reboot signals. # Different firmware versions signal reboot differently: # - URL navigates to /?reboot # - Page shows a reboot/success message # - Connection drops entirely (device gone) await page.click('input[name="upload_template"]') reboot_confirmed = False deadline = UPLOAD_TIMEOUT / 1000 # seconds async def _any_reboot_signal(): tasks = [ asyncio.ensure_future(page.wait_for_url("**/?reboot**", timeout=UPLOAD_TIMEOUT)), asyncio.ensure_future(page.wait_for_url("**reboot**", timeout=UPLOAD_TIMEOUT)), asyncio.ensure_future(page.wait_for_selector( 'text=Rebooting, text=reboot, text=Upload Complete, text=Apply Complete', timeout=UPLOAD_TIMEOUT )), ] done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) for t in pending: t.cancel() return bool(done) try: reboot_confirmed = await asyncio.wait_for(_any_reboot_signal(), timeout=deadline) except (asyncio.TimeoutError, Exception): # Connection drop or page crash after clicking upload # is itself strong evidence the device is rebooting. try: await page.title() except Exception: reboot_confirmed = True # page is gone = rebooted if reboot_confirmed: log("✓ Rebooting — upload successful.") return {"id": device_id, "success": True, "message": "OK"} log("✗ Reboot signal not detected after upload") return {"id": device_id, "success": False, "message": "Reboot not confirmed"} else: await page.click('input[name="upload_template"]') await page.wait_for_selector( 'text=Upload Complete, text=Apply Complete, text=Template Applied', timeout=UPLOAD_TIMEOUT ) log("✓ Upload complete (no reboot).") return {"id": device_id, "success": True, "message": "OK"} except PWTimeout: log("✗ TIMEOUT") return {"id": device_id, "success": False, "message": "TIMEOUT"} except Exception as e: err = str(e).lower() # Connection errors AFTER upload started = device rebooted if reboot and any(k in err for k in ("connection", "net::", "closed", "crashed", "target", "disconnected")): log("✓ Connection closed — device is rebooting.") return {"id": device_id, "success": True, "message": "OK"} log(f"✗ ERROR: {str(e)[:80]}") return {"id": device_id, "success": False, "message": str(e)[:120]} finally: await context.close() await browser.close() async def _run_with_retry(fn, device, job_id, **kwargs): device_id = device["id"] ts = lambda: datetime.now().strftime('%H:%M:%S') for attempt in range(1, MAX_RETRIES + 1): # Check abort flag before each attempt if job_id in abort_flags: jobs[job_id]["logs"].append(f"[{ts()}] [{device_id}] ✗ Aborted.") return {"id": device_id, "success": False, "message": "Aborted"} jobs[job_id]["logs"].append( f"[{ts()}] [{device_id}] Attempt {attempt}/{MAX_RETRIES}" ) result = await fn(device, job_id=job_id, **kwargs) if result["success"]: return result if attempt < MAX_RETRIES: if job_id in abort_flags: jobs[job_id]["logs"].append(f"[{ts()}] [{device_id}] ✗ Aborted.") return {"id": device_id, "success": False, "message": "Aborted"} jobs[job_id]["logs"].append( f"[{ts()}] [{device_id}] Retrying in 15s..." ) # Poll abort flag every second so abort is responsive for _ in range(15): if job_id in abort_flags: break await asyncio.sleep(1) return result # ══════════════════════════════════════════════════════════════════════════════ # Background job runners # ══════════════════════════════════════════════════════════════════════════════ async def run_download_job(job_id: str, devices: list, concurrency: int): jobs[job_id]["status"] = "running" semaphore = asyncio.Semaphore(concurrency) async def _guarded(device): async with semaphore: return await _run_with_retry(_download_one, device, job_id) results = await asyncio.gather(*[_guarded(d) for d in devices]) jobs[job_id]["results"] = list(results) jobs[job_id]["status"] = "done" jobs[job_id]["finished"] = datetime.now().isoformat() abort_flags.discard(job_id) report_path = _write_report(job_id, jobs[job_id], DOWNLOAD_DIR) jobs[job_id]["logs"].append( f"[{datetime.now().strftime('%H:%M:%S')}] Report saved → {report_path.name}" ) async def run_upload_job(job_id: str, devices: list, concurrency: int, reboot: bool): jobs[job_id]["status"] = "running" semaphore = asyncio.Semaphore(concurrency) async def _guarded(device): async with semaphore: return await _run_with_retry(_upload_one, device, job_id, reboot=reboot) results = await asyncio.gather(*[_guarded(d) for d in devices]) jobs[job_id]["results"] = list(results) jobs[job_id]["status"] = "done" jobs[job_id]["finished"] = datetime.now().isoformat() abort_flags.discard(job_id) report_path = _write_report(job_id, jobs[job_id], UPLOAD_DIR) jobs[job_id]["logs"].append( f"[{datetime.now().strftime('%H:%M:%S')}] Report saved → {report_path.name}" ) # ══════════════════════════════════════════════════════════════════════════════ # API models # ══════════════════════════════════════════════════════════════════════════════ class JobRequest(BaseModel): group: str = "All" device_ids: list[str] = [] concurrency: int = 1 reboot: bool = True class BuildRequest(BaseModel): source: str # "nocodb" or "excel" group: str = "All" hostnames: list[str] template_name: str # filename in xml_templates/ default_value: str = "NOTSET" # ══════════════════════════════════════════════════════════════════════════════ # API routes — devices & jobs (unchanged) # ══════════════════════════════════════════════════════════════════════════════ @app.get("/api/devices") def api_devices(request: Request, group: str = "All", _auth=Depends(require_auth)): try: devices = load_devices(group) return {"devices": devices, "count": len(devices)} except Exception as e: return {"error": str(e), "devices": [], "count": 0} @app.post("/api/jobs/download") async def start_download(request: Request, req: JobRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)): devices = load_devices(req.group) if req.device_ids: devices = [d for d in devices if d["id"] in req.device_ids] if not devices: return {"error": "No matching devices found"} job_id = f"dl_{datetime.now().strftime('%Y%m%d_%H%M%S')}" jobs[job_id] = { "type": "download", "status": "queued", "logs": [], "results": [], "started": datetime.now().isoformat(), "finished": None, "device_count": len(devices), } background_tasks.add_task(run_download_job, job_id, devices, req.concurrency) return {"job_id": job_id} @app.post("/api/jobs/upload") async def start_upload(request: Request, req: JobRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)): devices = load_devices(req.group) if req.device_ids: devices = [d for d in devices if d["id"] in req.device_ids] devices = [d for d in devices if (UPLOAD_DIR / f"{d['id']}.xml").exists()] if not devices: return {"error": "No devices with matching template files found in template_uploads/"} job_id = f"ul_{datetime.now().strftime('%Y%m%d_%H%M%S')}" jobs[job_id] = { "type": "upload", "status": "queued", "logs": [], "results": [], "started": datetime.now().isoformat(), "finished": None, "device_count": len(devices), "reboot": req.reboot, } background_tasks.add_task(run_upload_job, job_id, devices, req.concurrency, req.reboot) return {"job_id": job_id} @app.get("/api/jobs/{job_id}") def get_job(request: Request, job_id: str, _auth=Depends(require_auth)): if job_id not in jobs: return {"error": "Job not found"} return jobs[job_id] @app.get("/api/jobs/{job_id}/stream") async def stream_job(request: Request, job_id: str, from_line: int = 0, _auth=Depends(require_auth)): """Stream job logs via SSE. Pass ?from_line=N to skip already-seen lines.""" async def event_generator(): sent = max(0, from_line) while True: if job_id not in jobs: yield "data: {\"error\": \"job not found\"}\n\n" break job = jobs[job_id] logs = job["logs"] if len(logs) > sent: for line in logs[sent:]: yield f"data: {json.dumps({'log': line, 'line_num': sent})}\n\n" sent += 1 if job["status"] == "done": yield f"data: {json.dumps({'done': True, 'results': job['results']})}\n\n" break await asyncio.sleep(0.5) return StreamingResponse(event_generator(), media_type="text/event-stream") @app.get("/api/jobs") def list_jobs(request: Request, _auth=Depends(require_auth)): summary = [] for jid, j in sorted(jobs.items(), reverse=True): summary.append({ "job_id": jid, "type": j["type"], "status": j["status"], "device_count": j["device_count"], "started": j["started"], "finished": j.get("finished"), "succeeded": sum(1 for r in j["results"] if r.get("success")), "failed": sum(1 for r in j["results"] if not r.get("success")), }) return {"jobs": summary} @app.get("/api/files/downloads") def list_downloads(request: Request, _auth=Depends(require_auth)): files = sorted(DOWNLOAD_DIR.glob("*.xml")) return {"files": [{"name": f.name, "size_kb": f.stat().st_size // 1024} for f in files]} @app.get("/api/files/uploads") def list_uploads(request: Request, _auth=Depends(require_auth)): files = sorted(UPLOAD_DIR.glob("*.xml")) return {"files": [{"name": f.name, "size_kb": f.stat().st_size // 1024} for f in files]} # ══════════════════════════════════════════════════════════════════════════════ # API routes — XML Builder # ══════════════════════════════════════════════════════════════════════════════ @app.get("/api/xmlbuilder/nocodb-fields") def xmlbuilder_nocodb_fields(request: Request, _auth=Depends(require_auth)): """Return available NocoDB field names (= available template variables).""" return {"fields": NOCODB_FIELDS} @app.get("/api/xmlbuilder/nocodb-devices") def xmlbuilder_nocodb_devices(request: Request, group: str = "All", _auth=Depends(require_auth)): """Return hostnames for the given group from NocoDB.""" try: devices = load_devices_full(group) hostnames = [d["hostname"] for d in devices if d["hostname"]] return {"hostnames": hostnames, "count": len(hostnames)} except Exception as e: return {"error": str(e), "hostnames": []} @app.post("/api/xmlbuilder/upload-excel") async def xmlbuilder_upload_excel(request: Request, file: UploadFile = File(...), _auth=Depends(require_auth)): """ Accept an uploaded Excel file, return its columns and hostnames. Stores the file temporarily in memory (not on disk). """ try: contents = await file.read() df = pd.read_excel(io.BytesIO(contents)) columns = [c.strip().strip('{}') for c in df.columns] hostname_col = next((c for c in df.columns if "hostname" in c.lower()), None) hostnames = df[hostname_col].dropna().astype(str).tolist() if hostname_col else [] # Store bytes in a simple cache keyed by filename excel_cache[file.filename] = contents return { "filename": file.filename, "columns": columns, "hostnames": hostnames, "rows": len(df), } except Exception as e: return {"error": str(e)} # Simple in-memory cache for uploaded Excel files (cleared on restart) excel_cache: dict[str, bytes] = {} @app.get("/api/xmlbuilder/templates") def xmlbuilder_list_templates(request: Request, _auth=Depends(require_auth)): """List XML template files stored in xml_templates/.""" files = sorted(TEMPLATES_DIR.glob("*.xml")) result = [] for f in files: content = f.read_text(encoding="utf-8", errors="replace") variables = _extract_template_variables(content) result.append({ "name": f.name, "size_kb": f.stat().st_size // 1024, "variables": variables, }) return {"templates": result} @app.post("/api/xmlbuilder/upload-template") async def xmlbuilder_upload_template(request: Request, file: UploadFile = File(...), _auth=Depends(require_auth)): """Save an uploaded XML template into xml_templates/.""" try: contents = await file.read() dest = TEMPLATES_DIR / file.filename dest.write_bytes(contents) text = contents.decode("utf-8", errors="replace") variables = _extract_template_variables(text) return {"name": file.filename, "variables": variables, "saved": True} except Exception as e: return {"error": str(e)} @app.post("/api/xmlbuilder/generate/nocodb") def xmlbuilder_generate_nocodb(request: Request, req: BuildRequest, _auth=Depends(require_auth)): """Generate XMLs from NocoDB data and save to template_uploads/.""" template_path = TEMPLATES_DIR / req.template_name if not template_path.exists(): return {"error": f"Template '{req.template_name}' not found in xml_templates/"} template = template_path.read_text(encoding="utf-8") results = build_xmls_from_nocodb(req.group, req.hostnames, template, req.default_value) ok = sum(1 for r in results if r["success"]) err = len(results) - ok return {"results": results, "succeeded": ok, "failed": err} @app.post("/api/xmlbuilder/generate/excel") async def xmlbuilder_generate_excel( request: Request, hostnames: str = Form(...), # JSON array string template_name: str = Form(...), excel_filename: str = Form(...), # key into excel_cache default_value: str = Form("NOTSET"), excel_file: Optional[UploadFile] = File(None), _auth=Depends(require_auth), ): """Generate XMLs from Excel data and save to template_uploads/.""" template_path = TEMPLATES_DIR / template_name if not template_path.exists(): return {"error": f"Template '{template_name}' not found in xml_templates/"} template = template_path.read_text(encoding="utf-8") # Get Excel bytes — from fresh upload or cache if excel_file: excel_bytes = await excel_file.read() excel_cache[excel_filename] = excel_bytes elif excel_filename in excel_cache: excel_bytes = excel_cache[excel_filename] else: return {"error": "Excel file not found. Please re-upload it."} host_list = json.loads(hostnames) results = build_xmls_from_excel(excel_bytes, template, host_list, default_value) ok = sum(1 for r in results if r["success"]) err = len(results) - ok return {"results": results, "succeeded": ok, "failed": err} # ══════════════════════════════════════════════════════════════════════════════ # Abort endpoint # ══════════════════════════════════════════════════════════════════════════════ @app.post("/api/jobs/{job_id}/abort") def abort_job(request: Request, job_id: str, _auth=Depends(require_auth)): if job_id not in jobs: return {"error": "Job not found"} if jobs[job_id]["status"] == "done": return {"error": "Job already finished"} abort_flags.add(job_id) jobs[job_id]["logs"].append( f"[{datetime.now().strftime('%H:%M:%S')}] ⚠ Abort requested — stopping after current device..." ) return {"aborted": True, "job_id": job_id} @app.post("/api/jobs/abort-all") def abort_all_jobs(request: Request, _auth=Depends(require_auth)): aborted = [] for job_id, job in jobs.items(): if job["status"] in ("running", "queued"): abort_flags.add(job_id) job["logs"].append( f"[{datetime.now().strftime('%H:%M:%S')}] ⚠ Abort-all requested." ) aborted.append(job_id) return {"aborted": aborted, "count": len(aborted)} # ══════════════════════════════════════════════════════════════════════════════ # AT Terminal — SSH command execution # ══════════════════════════════════════════════════════════════════════════════ class ATRequest(BaseModel): device_ids: list[str] = [] # empty = all devices in group group: str = "All" commands: list[str] # list of AT commands to send in order ssh_username: str = "" ssh_password: str = "" concurrency: int = 3 def _ssh_send_at_commands( device: dict, commands: list[str], ssh_username: str, ssh_password: str, session_id: str, ) -> dict: """ Open an SSH session to one device on port 3223, send AT commands one at a time, capture output, return result dict. """ device_id = device["id"] ip = device["ip"] username = ssh_username or device.get("username", "user") password = ssh_password or device.get("password", "") ts = lambda: datetime.now().strftime('%H:%M:%S') def log(msg, level="info"): at_sessions[session_id]["logs"].append({ "ts": ts(), "device": device_id, "msg": msg, "level": level }) log(f"Connecting SSH → {ip}:{SSH_PORT}") ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect( ip, port=SSH_PORT, username=username, password=password, timeout=15, look_for_keys=False, allow_agent=False, disabled_algorithms={"pubkeys": [], "keys": []}, ) except Exception as e: log(f"✗ Connection failed: {str(e)[:80]}", "error") return {"id": device_id, "success": False, "message": str(e)[:120], "outputs": []} outputs = [] all_ok = True try: # Use an interactive shell channel so the modem's AT prompt stays alive chan = ssh.invoke_shell(term="vt100", width=200, height=50) chan.settimeout(10) import time, socket # Drain any banner/login text the modem sends on connect time.sleep(1.0) buf = b"" try: while True: chunk = chan.recv(4096) if not chunk: break buf += chunk except socket.timeout: pass for cmd in commands: log(f"→ {cmd}", "cmd") chan.send(cmd + "\r") time.sleep(0.8) response_buf = b"" deadline = time.time() + 8 # max 8 s per command try: while time.time() < deadline: if chan.recv_ready(): chunk = chan.recv(4096) if not chunk: break response_buf += chunk # Stop early if we see OK or ERROR decoded_so_far = response_buf.decode("utf-8", errors="replace") if "\nOK" in decoded_so_far or "\nERROR" in decoded_so_far: time.sleep(0.1) break else: time.sleep(0.1) except socket.timeout: pass response = response_buf.decode("utf-8", errors="replace").strip() # Strip the echoed command from the start if present if response.startswith(cmd): response = response[len(cmd):].strip() if response: for line in response.splitlines(): stripped = line.strip() if stripped: log(f" {stripped}", "data") else: log(" (no response)", "warn") outputs.append({"command": cmd, "output": response}) # If ATZ was sent the modem will reboot — nothing more to do if cmd.strip().upper() == "ATZ": log(" Modem rebooting.", "warn") break except Exception as e: log(f"✗ Session error: {str(e)[:80]}", "error") all_ok = False outputs.append({"command": "session", "output": str(e)}) finally: try: chan.close() except Exception: pass ssh.close() log(f"{'✓ Done' if all_ok else '✗ Finished with errors'}", "ok" if all_ok else "error") return { "id": device_id, "success": all_ok, "message": "OK" if all_ok else "Errors — see log", "outputs": outputs, } def _run_at_session(session_id: str, devices: list, commands: list, ssh_username: str, ssh_password: str, concurrency: int): """Background thread: send AT commands to all selected devices.""" import concurrent.futures at_sessions[session_id]["status"] = "running" results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: futures = { executor.submit(_ssh_send_at_commands, d, commands, ssh_username, ssh_password, session_id): d for d in devices } for future in concurrent.futures.as_completed(futures): try: results.append(future.result()) except Exception as e: d = futures[future] results.append({"id": d["id"], "success": False, "message": str(e), "outputs": []}) at_sessions[session_id]["results"] = results at_sessions[session_id]["status"] = "done" at_sessions[session_id]["finished"] = datetime.now().isoformat() @app.post("/api/at/send") async def at_send(request: Request, req: ATRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)): """Start an AT command session against one or more devices.""" if not req.commands or not any(c.strip() for c in req.commands): return {"error": "No commands provided"} commands = [c.strip() for c in req.commands if c.strip()] devices = load_devices(req.group) if req.device_ids: devices = [d for d in devices if d["id"] in req.device_ids] if not devices: return {"error": "No matching devices found"} session_id = f"at_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}" at_sessions[session_id] = { "status": "queued", "started": datetime.now().isoformat(), "finished": None, "device_count": len(devices), "commands": commands, "logs": [], "results": [], } # Run in a background thread (paramiko is blocking/sync) t = threading.Thread( target=_run_at_session, args=(session_id, devices, commands, req.ssh_username, req.ssh_password, req.concurrency), daemon=True, ) t.start() return {"session_id": session_id, "device_count": len(devices)} @app.get("/api/at/{session_id}/stream") async def at_stream(request: Request, session_id: str, from_line: int = 0, _auth=Depends(require_auth)): """SSE stream for AT session logs — same pattern as job streaming.""" if session_id not in at_sessions: return StreamingResponse( iter([f'data: {json.dumps({"error": "session not found"})}\n\n']), media_type="text/event-stream" ) async def event_generator(): sent = max(0, from_line) while True: if session_id not in at_sessions: yield f'data: {json.dumps({"error": "session gone"})}\n\n' break sess = at_sessions[session_id] logs = sess["logs"] if len(logs) > sent: for entry in logs[sent:]: yield f"data: {json.dumps({'log': entry})}\n\n" sent += 1 if sess["status"] == "done": yield f"data: {json.dumps({'done': True, 'results': sess['results']})}\n\n" break await asyncio.sleep(0.4) return StreamingResponse(event_generator(), media_type="text/event-stream") @app.get("/api/at/{session_id}") def at_get_session(request: Request, session_id: str, _auth=Depends(require_auth)): if session_id not in at_sessions: return {"error": "Session not found"} return at_sessions[session_id] # ══════════════════════════════════════════════════════════════════════════════ # Auth routes — login / logout # ══════════════════════════════════════════════════════════════════════════════ LOGIN_HTML = """