Files
rv50x-manager/app.py
T
D Stephenson f8125ed123 Simplify to standalone service on shared nocodb-network, add modem_num field
Strips embedded nocodb/postgres services from compose; switches to shared
nocodb-network. Adds modem_num to NocoDB field list. Fixes session cookie
secure flag. Removes previously-tracked XML template files (covered by .gitignore).
2026-06-23 11:42:47 -05:00

1500 lines
63 KiB
Python

"""
RV50x Template Manager - Web GUI Backend
Run directly:
uvicorn app:app --host 0.0.0.0 --port 8000
Run via Docker:
docker-compose up -d
Environment variables (set in .env or docker-compose.yml):
NOCODB_URL NocoDB base URL e.g. http://192.168.16.130:8080
NOCODB_TOKEN NocoDB API token
NOCODB_BASE_ID Base ID from browser URL
NOCODB_TABLE_ID Table ID from browser URL
NOCODB_VIEW_ID View ID for the "All" group
NOCODB_VIEW_ID_ELECTRIC View ID for the "Electric" group (falls back to NOCODB_VIEW_ID)
NOCODB_VIEW_ID_GW View ID for the "Gas & Water" group (falls back to NOCODB_VIEW_ID)
PAGE_TIMEOUT ms to wait for page elements (default 90000)
DOWNLOAD_TIMEOUT ms to wait for template gen (default 120000)
UPLOAD_TIMEOUT ms to wait for upload done (default 120000)
MAX_RETRIES retry attempts per device (default 3)
DB_PATH path to SQLite database file (default ./rv50x.db)
MAX_JOBS_MEMORY max completed jobs kept in RAM (default 200)
EXCEL_CACHE_MAX max Excel files cached in RAM (default 20)
EXCEL_CACHE_TTL seconds before cached Excel expires (default 3600)
REACH_TIMEOUT seconds for TCP reachability pre-check (default 3)
"""
import asyncio
import html as html_lib
import io
import json
import os
import re
import sqlite3
import threading
import urllib.parse
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import pandas as pd
import paramiko
from fastapi import FastAPI, BackgroundTasks, File, Form, UploadFile, Request, Response, Depends, HTTPException
from fastapi.responses import HTMLResponse, StreamingResponse, RedirectResponse
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from pydantic import BaseModel
# ── NocoDB connection — read from environment, fall back to defaults ───────────
NOCODB_URL = os.environ.get("NOCODB_URL", "http://192.168.16.130:8080")
NOCODB_TOKEN = os.environ.get("NOCODB_TOKEN", "eWU_ilelaCtNy1JzC7vf41DokkqFOovcLHM0zVml")
NOCODB_BASE_ID = os.environ.get("NOCODB_BASE_ID", "pdq96x915xt4a0m")
NOCODB_TABLE_ID = os.environ.get("NOCODB_TABLE_ID", "mkewnr53ahqvnt9")
NOCODB_VIEW_IDS = {
"All": os.environ.get("NOCODB_VIEW_ID", "vwl7qvxo1xclvawz"),
"Electric": os.environ.get("NOCODB_VIEW_ID_ELECTRIC", os.environ.get("NOCODB_VIEW_ID", "vwl7qvxo1xclvawz")),
"Gas & Water": os.environ.get("NOCODB_VIEW_ID_GW", os.environ.get("NOCODB_VIEW_ID", "vwl7qvxo1xclvawz")),
}
# ── Local paths ────────────────────────────────────────────────────────────────
# In Docker these are bind-mounted from the host so data persists across rebuilds.
SCRIPT_DIR = Path(__file__).parent
DOWNLOAD_DIR = Path(os.environ.get("DOWNLOAD_DIR", str(SCRIPT_DIR / "template_downloads")))
UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", str(SCRIPT_DIR / "template_uploads")))
TEMPLATES_DIR = Path(os.environ.get("TEMPLATES_DIR", str(SCRIPT_DIR / "xml_templates")))
DB_PATH = Path(os.environ.get("DB_PATH", str(SCRIPT_DIR / "rv50x.db")))
PRESETS_FILE = SCRIPT_DIR / "at_presets.json"
REACH_TIMEOUT = int(os.environ.get("REACH_TIMEOUT", "3")) # seconds for TCP pre-check
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
# ── Playwright timeouts — tunable via environment ──────────────────────────────
PAGE_TIMEOUT = int(os.environ.get("PAGE_TIMEOUT", "90000"))
DOWNLOAD_TIMEOUT = int(os.environ.get("DOWNLOAD_TIMEOUT", "120000"))
UPLOAD_TIMEOUT = int(os.environ.get("UPLOAD_TIMEOUT", "120000"))
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", "3"))
# ── In-memory job store ────────────────────────────────────────────────────────
jobs: dict[str, dict] = {}
abort_flags: set[str] = set() # job_ids that have been asked to abort
MAX_JOBS_MEMORY = int(os.environ.get("MAX_JOBS_MEMORY", "200"))
# ── AT Terminal job store (separate from Playwright jobs) ──────────────────────
at_sessions: dict[str, dict] = {} # session_id → {logs, status, results}
SSH_PORT = int(os.environ.get("SSH_PORT", "3223"))
# ── Excel upload cache ─────────────────────────────────────────────────────────
EXCEL_CACHE_MAX = int(os.environ.get("EXCEL_CACHE_MAX", "20"))
EXCEL_CACHE_TTL_S = int(os.environ.get("EXCEL_CACHE_TTL", "3600"))
# Values: {"data": bytes, "ts": datetime}
excel_cache: dict[str, dict] = {}
# ── Auth config ────────────────────────────────────────────────────────────────
APP_USERNAME = os.environ.get("APP_USERNAME", "admin")
APP_PASSWORD = os.environ.get("APP_PASSWORD", "changeme")
SESSION_SECRET = os.environ.get("SESSION_SECRET", "change-this-secret")
SESSION_HOURS = int(os.environ.get("SESSION_HOURS", "8"))
SESSION_COOKIE = "rv50x_session"
_signer = URLSafeTimedSerializer(SESSION_SECRET)
def _make_session_cookie() -> str:
return _signer.dumps({"user": APP_USERNAME})
def _verify_session_cookie(token: str) -> bool:
try:
_signer.loads(token, max_age=SESSION_HOURS * 3600)
return True
except (BadSignature, SignatureExpired):
return False
def require_auth(request: Request):
token = request.cookies.get(SESSION_COOKIE)
if not token or not _verify_session_cookie(token):
raise HTTPException(status_code=302, headers={"Location": "/login"})
# ══════════════════════════════════════════════════════════════════════════════
# SQLite persistence — jobs & AT sessions survive restarts
# ══════════════════════════════════════════════════════════════════════════════
def _init_db():
with sqlite3.connect(DB_PATH) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
job_id TEXT PRIMARY KEY,
type TEXT,
status TEXT,
device_count INTEGER,
started TEXT,
finished TEXT,
reboot INTEGER DEFAULT 0,
logs TEXT DEFAULT '[]',
results TEXT DEFAULT '[]'
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS at_sessions (
session_id TEXT PRIMARY KEY,
status TEXT,
started TEXT,
finished TEXT,
device_count INTEGER,
commands TEXT DEFAULT '[]',
logs TEXT DEFAULT '[]',
results TEXT DEFAULT '[]'
)
""")
conn.commit()
def _persist_job(job_id: str):
j = jobs.get(job_id)
if not j:
return
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"""INSERT OR REPLACE INTO jobs
(job_id, type, status, device_count, started, finished, reboot, logs, results)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
job_id, j["type"], j["status"], j["device_count"],
j.get("started"), j.get("finished"),
1 if j.get("reboot") else 0,
json.dumps(j["logs"]), json.dumps(j["results"]),
),
)
conn.commit()
def _persist_at_session(session_id: str):
s = at_sessions.get(session_id)
if not s:
return
with sqlite3.connect(DB_PATH) as conn:
conn.execute(
"""INSERT OR REPLACE INTO at_sessions
(session_id, status, started, finished, device_count, commands, logs, results)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id, s["status"], s.get("started"), s.get("finished"),
s["device_count"], json.dumps(s["commands"]),
json.dumps(s["logs"]), json.dumps(s["results"]),
),
)
conn.commit()
def _load_history_from_db():
"""Load the most-recent completed jobs/sessions from SQLite into memory."""
try:
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
for row in conn.execute(
"SELECT * FROM jobs ORDER BY started DESC LIMIT ?", (MAX_JOBS_MEMORY,)
):
if row["job_id"] not in jobs:
jobs[row["job_id"]] = {
"type": row["type"],
"status": row["status"],
"device_count": row["device_count"],
"started": row["started"],
"finished": row["finished"],
"reboot": bool(row["reboot"]),
"logs": json.loads(row["logs"]),
"results": json.loads(row["results"]),
}
for row in conn.execute(
"SELECT * FROM at_sessions ORDER BY started DESC LIMIT ?", (MAX_JOBS_MEMORY,)
):
if row["session_id"] not in at_sessions:
at_sessions[row["session_id"]] = {
"status": row["status"],
"started": row["started"],
"finished": row["finished"],
"device_count": row["device_count"],
"commands": json.loads(row["commands"]),
"logs": json.loads(row["logs"]),
"results": json.loads(row["results"]),
}
except Exception as e:
print(f"[DB] Failed to load history: {e}")
def _prune_jobs_memory():
"""Drop oldest completed jobs from memory once we exceed MAX_JOBS_MEMORY."""
done = sorted(
[(jid, j) for jid, j in jobs.items() if j["status"] == "done"],
key=lambda x: x[1].get("finished", ""),
)
excess = len(done) - MAX_JOBS_MEMORY
for jid, _ in done[:max(0, excess)]:
del jobs[jid]
app = FastAPI(title="RV50x Template Manager")
# Initialise DB and reload history on startup
_init_db()
_load_history_from_db()
# ══════════════════════════════════════════════════════════════════════════════
# Excel cache helpers — bounded size + TTL eviction
# ══════════════════════════════════════════════════════════════════════════════
def _excel_cache_evict():
"""Remove expired entries, then drop oldest until under EXCEL_CACHE_MAX."""
now = datetime.now()
expired = [
k for k, v in excel_cache.items()
if (now - v["ts"]).total_seconds() > EXCEL_CACHE_TTL_S
]
for k in expired:
del excel_cache[k]
if len(excel_cache) >= EXCEL_CACHE_MAX:
oldest = sorted(excel_cache.items(), key=lambda x: x[1]["ts"])
for k, _ in oldest[: len(excel_cache) - EXCEL_CACHE_MAX + 1]:
del excel_cache[k]
def _excel_cache_set(filename: str, data: bytes):
_excel_cache_evict()
excel_cache[filename] = {"data": data, "ts": datetime.now()}
def _excel_cache_get(filename: str) -> bytes | None:
entry = excel_cache.get(filename)
if not entry:
return None
if (datetime.now() - entry["ts"]).total_seconds() > EXCEL_CACHE_TTL_S:
del excel_cache[filename]
return None
return entry["data"]
# ══════════════════════════════════════════════════════════════════════════════
# NocoDB helpers
# ══════════════════════════════════════════════════════════════════════════════
def _fetch_all_rows(view_id: str) -> list:
headers = {"xc-token": NOCODB_TOKEN, "Content-Type": "application/json"}
all_rows = []
offset, limit = 0, 1000
while True:
params = urllib.parse.urlencode({"viewId": view_id, "limit": limit, "offset": offset})
url = f"{NOCODB_URL}/api/v1/db/data/noco/{NOCODB_BASE_ID}/{NOCODB_TABLE_ID}?{params}"
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode())
all_rows.extend(data.get("list", []))
if data.get("pageInfo", {}).get("isLastPage", True):
break
offset += limit
return all_rows
# All NocoDB fields we care about — everything becomes a template variable
NOCODB_FIELDS = [
"hostname", "ip_address", "username", "password", "https_port",
"use_https", "ssh_port", "location", "alias", "dept", "def_pass",
"snmp_auth_key", "snmp_priv_key", "fort_key", "vpn_subnets", "modem_num",
]
def load_devices(group: str = "All") -> list:
view_id = NOCODB_VIEW_IDS.get(group, NOCODB_VIEW_IDS["All"])
rows = _fetch_all_rows(view_id)
devices = []
for row in rows:
device_id = str(row.get("hostname", "")).strip()
ip = str(row.get("ip_address", "")).strip()
username = str(row.get("username", "")).strip()
password = str(row.get("password", "")).strip()
dept = str(row.get("dept", "")).strip()
location = str(row.get("location", "")).strip()
if not device_id or not ip:
continue
if group == "Electric" and dept.upper() != "ELEC":
continue
if group == "Gas & Water" and dept.upper() != "GW":
continue
devices.append({
"id": device_id, "ip": ip,
"username": username, "password": password,
"dept": dept, "location": location,
})
return devices
def load_devices_full(group: str = "All") -> list:
"""Load all NocoDB fields for each device (used by XML Builder)."""
view_id = NOCODB_VIEW_IDS.get(group, NOCODB_VIEW_IDS["All"])
rows = _fetch_all_rows(view_id)
devices = []
for row in rows:
device_id = str(row.get("hostname", "")).strip()
dept = str(row.get("dept", "")).strip()
if not device_id:
continue
if group == "Electric" and dept.upper() != "ELEC":
continue
if group == "Gas & Water" and dept.upper() != "GW":
continue
# Build a flat dict of all available fields
device = {}
for field in NOCODB_FIELDS:
val = row.get(field, "")
device[field] = str(val).strip() if val is not None else ""
devices.append(device)
return devices
# ══════════════════════════════════════════════════════════════════════════════
# XML Builder — core logic (replaces excel_to_xml.py)
# ══════════════════════════════════════════════════════════════════════════════
def _extract_template_variables(template: str) -> list[str]:
"""Return unique variable names found in {var} or {{ var }} placeholders."""
pattern = r'\{\{?\s*([^{}]+?)\s*\}?\}'
return list(set(re.findall(pattern, template)))
def _escape_xml(value) -> str:
"""Convert a value to an XML-safe string."""
if not isinstance(value, str):
# Convert float-integers like 443.0 → "443"
if isinstance(value, float) and value == int(value):
value = str(int(value))
else:
value = str(value)
# Escape backslashes then XML special chars
value = value.replace('\\', '\\\\')
value = html_lib.escape(value)
return value
def _render_template(template: str, data: dict, default: str = "NOTSET") -> str:
"""Replace all {var} and {{ var }} placeholders in template with data values."""
for var, raw_val in data.items():
val = _escape_xml(raw_val) if raw_val not in ("", None) else default
try:
template = re.sub(r'\{\{\s*' + re.escape(var) + r'\s*\}\}', val, template)
template = re.sub(r'\{' + re.escape(var) + r'\}', val, template)
except Exception:
pass
# Fill any remaining unreplaced variables with the default
template = re.sub(r'\{\{?\s*[^{}]+?\s*\}?\}', default, template)
return template
def build_xmls_from_nocodb(
group: str,
hostnames: list[str],
template: str,
default: str = "NOTSET",
) -> list[dict]:
"""
Generate XML files for the given hostnames using live NocoDB data.
Files are written to template_uploads/.
Returns a list of result dicts: {hostname, success, message}.
"""
all_devices = load_devices_full(group)
device_map = {d["hostname"]: d for d in all_devices}
results = []
for hostname in hostnames:
if hostname not in device_map:
results.append({"hostname": hostname, "success": False, "message": "Not found in NocoDB"})
continue
data = device_map[hostname]
content = _render_template(template, data, default)
out = UPLOAD_DIR / f"{hostname}.xml"
try:
out.write_text(content, encoding="utf-8")
results.append({"hostname": hostname, "success": True, "message": f"{out.name}"})
except Exception as e:
results.append({"hostname": hostname, "success": False, "message": str(e)})
return results
def build_xmls_from_excel(
excel_bytes: bytes,
template: str,
hostnames: list[str],
default: str = "NOTSET",
) -> list[dict]:
"""
Generate XML files for the given hostnames using data from an uploaded Excel file.
Files are written to template_uploads/.
Returns a list of result dicts: {hostname, success, message}.
"""
df = pd.read_excel(io.BytesIO(excel_bytes))
# Find hostname column
hostname_col = next(
(c for c in df.columns if "hostname" in c.lower()), None
)
if not hostname_col:
return [{"hostname": h, "success": False, "message": "No hostname column in Excel"} for h in hostnames]
results = []
for hostname in hostnames:
rows = df[df[hostname_col] == hostname]
if rows.empty:
results.append({"hostname": hostname, "success": False, "message": "Not found in Excel"})
continue
row = rows.iloc[0].to_dict()
data = {}
for col, val in row.items():
clean = col.strip().strip('{}')
if pd.notna(val):
if isinstance(val, float) and val == int(val):
data[clean] = str(int(val))
else:
data[clean] = str(val)
else:
data[clean] = default
content = _render_template(template, data, default)
out = UPLOAD_DIR / f"{hostname}.xml"
try:
out.write_text(content, encoding="utf-8")
results.append({"hostname": hostname, "success": True, "message": f"{out.name}"})
except Exception as e:
results.append({"hostname": hostname, "success": False, "message": str(e)})
return results
# ══════════════════════════════════════════════════════════════════════════════
# Report writer
# ══════════════════════════════════════════════════════════════════════════════
def _write_report(job_id: str, job: dict, output_dir: Path) -> Path:
"""Write a timestamped report file matching the CLI format."""
results = job.get("results", [])
succeeded = [r for r in results if r.get("success")]
failed = [r for r in results if not r.get("success")]
started = datetime.fromisoformat(job["started"]) if job.get("started") else datetime.now()
finished = datetime.fromisoformat(job["finished"]) if job.get("finished") else datetime.now()
elapsed = (finished - started).total_seconds()
job_type = job.get("type", "job").upper()
timestamp = finished.strftime("%Y-%m-%d %H:%M")
lines = []
lines.append("" * 60)
lines.append(f" {job_type} SUMMARY — {timestamp}")
lines.append(f" Job ID: {job_id}")
lines.append("" * 60)
lines.append(f" Total: {len(results)}")
lines.append(f" ✓ Success: {len(succeeded)}")
lines.append(f" ✗ Failed: {len(failed)}")
lines.append(f" Time: {elapsed:.0f}s")
if succeeded:
lines.append("")
lines.append(f" {'Downloaded' if job_type == 'DOWNLOAD' else 'Uploaded'}:")
for r in succeeded:
extra = f" ({r.get('size_kb', '')} KB)" if r.get("size_kb") else ""
lines.append(f"{r['id']}{extra}")
if failed:
lines.append("")
lines.append(" Failed:")
for r in failed:
lines.append(f"{r['id']}{r.get('message', '')}")
lines.append("" * 60)
lines.append("")
lines.append(" Full log:")
lines.append("")
for log_line in job.get("logs", []):
lines.append(f" {log_line}")
lines.append("" * 60)
report_name = f"report_{finished.strftime('%Y%m%d_%H%M%S')}.txt"
report_path = output_dir / report_name
report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return report_path
# ══════════════════════════════════════════════════════════════════════════════
# Playwright workers (download / upload to modems)
# ══════════════════════════════════════════════════════════════════════════════
def _is_reachable(ip: str, port: int = 443) -> bool:
"""Single TCP connect to confirm a device is accepting connections on port 443."""
import socket
try:
with socket.create_connection((ip, port), timeout=REACH_TIMEOUT):
return True
except OSError:
return False
async def _login_and_open_modal(page, device: dict, log) -> bool:
"""
Log in to a device web UI and open the template dialog.
Returns True when the modal is visible and ready, False if it never opened.
Raises on network / timeout errors so callers can handle them uniformly.
"""
ip = device["ip"]
username = device["username"]
password = device["password"]
await page.goto(f"https://{ip}", wait_until="load")
await page.wait_for_selector('input[name="username"]', state="visible", timeout=PAGE_TIMEOUT)
await page.wait_for_timeout(1000)
await page.click('input[name="username"]', click_count=3)
await page.fill('input[name="username"]', username)
await page.fill('input[name="password"]', password)
await page.wait_for_timeout(800)
login_btn = page.locator('input[name="Login"]')
await login_btn.scroll_into_view_if_needed()
await login_btn.click()
await page.wait_for_selector('#btn_tpl', state="visible", timeout=PAGE_TIMEOUT)
await page.wait_for_function("typeof showTemplateDialog === 'function'", timeout=PAGE_TIMEOUT)
await page.wait_for_timeout(500)
log("Logged in.")
for _ in range(5):
await page.evaluate("showTemplateDialog()")
await page.wait_for_timeout(1000)
visible = await page.evaluate("""
() => {
const el = document.getElementById('template_name');
if (!el) return false;
const form = el.closest('form') || el.closest('div[id]');
if (!form) return el.offsetParent !== null;
return form.offsetParent !== null;
}
""")
if visible:
return True
return False
async def _download_one(device: dict, job_id: str) -> dict:
from playwright.async_api import async_playwright, TimeoutError as PWTimeout
device_id = device["id"]
def log(msg):
jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}")
if not _is_reachable(device["ip"]):
log(f"✗ Unreachable (no TCP response on :443 within {REACH_TIMEOUT}s)")
return {"id": device_id, "success": False, "message": "Unreachable"}
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(ignore_https_errors=True, accept_downloads=True)
page = await context.new_page()
page.set_default_timeout(PAGE_TIMEOUT)
try:
if not await _login_and_open_modal(page, device, log):
return {"id": device_id, "success": False, "message": "Modal did not open"}
await page.fill('input[id="template_name"]', device_id)
pwd_cb = page.locator('input[id="template_pwds"]')
if not await pwd_cb.is_checked():
await pwd_cb.check()
info_cb = page.locator('input[id="template_info"]')
if await info_cb.count() > 0 and await info_cb.is_checked():
await info_cb.uncheck()
page.on("dialog", lambda d: asyncio.ensure_future(d.dismiss()))
out_path = DOWNLOAD_DIR / f"{device_id}.xml"
async with page.expect_download(timeout=DOWNLOAD_TIMEOUT) as dl:
await page.click('input.fbtn[name="download_template"]')
download = await dl.value
await download.save_as(out_path)
await page.wait_for_timeout(1000)
if out_path.exists() and out_path.stat().st_size > 0:
size_kb = out_path.stat().st_size // 1024
log(f"✓ Saved {size_kb} KB")
return {"id": device_id, "success": True, "message": f"{size_kb} KB", "size_kb": size_kb}
return {"id": device_id, "success": False, "message": "File empty after download"}
except PWTimeout:
log("✗ TIMEOUT")
return {"id": device_id, "success": False, "message": "TIMEOUT"}
except Exception as e:
log(f"✗ ERROR: {str(e)[:80]}")
return {"id": device_id, "success": False, "message": str(e)[:120]}
finally:
await context.close()
await browser.close()
async def _upload_one(device: dict, reboot: bool, job_id: str) -> dict:
from playwright.async_api import async_playwright, TimeoutError as PWTimeout
device_id = device["id"]
template_file = UPLOAD_DIR / f"{device_id}.xml"
def log(msg):
jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}")
if not template_file.exists():
log("✗ Template file not found")
return {"id": device_id, "success": False, "message": "Template file not found"}
if not _is_reachable(device["ip"]):
log(f"✗ Unreachable (no TCP response on :443 within {REACH_TIMEOUT}s)")
return {"id": device_id, "success": False, "message": "Unreachable"}
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context(ignore_https_errors=True, accept_downloads=True)
page = await context.new_page()
page.set_default_timeout(PAGE_TIMEOUT)
try:
if not await _login_and_open_modal(page, device, log):
return {"id": device_id, "success": False, "message": "Modal did not open"}
await page.locator('input[id="template_filename"]').set_input_files(str(template_file))
log(f"File set: {template_file.name}")
reboot_cb = page.locator('input[name="reboot_after_upload"]')
is_checked = await reboot_cb.is_checked()
if reboot and not is_checked:
await reboot_cb.check()
elif not reboot and is_checked:
await reboot_cb.uncheck()
page.on("dialog", lambda d: asyncio.ensure_future(d.dismiss()))
log("Uploading...")
if reboot:
# Click upload then race multiple reboot signals.
# Different firmware versions signal reboot differently:
# - URL navigates to /?reboot
# - Page shows a reboot/success message
# - Connection drops entirely (device gone)
await page.click('input[name="upload_template"]')
reboot_confirmed = False
deadline = UPLOAD_TIMEOUT / 1000 # seconds
async def _any_reboot_signal():
tasks = [
asyncio.ensure_future(page.wait_for_url("**/?reboot**", timeout=UPLOAD_TIMEOUT)),
asyncio.ensure_future(page.wait_for_url("**reboot**", timeout=UPLOAD_TIMEOUT)),
asyncio.ensure_future(page.wait_for_selector(
'text=Rebooting, text=reboot, text=Upload Complete, text=Apply Complete',
timeout=UPLOAD_TIMEOUT
)),
]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()
return bool(done)
try:
reboot_confirmed = await asyncio.wait_for(_any_reboot_signal(), timeout=deadline)
except (asyncio.TimeoutError, Exception):
# Connection drop or page crash after clicking upload
# is itself strong evidence the device is rebooting.
try:
await page.title()
except Exception:
reboot_confirmed = True # page is gone = rebooted
if reboot_confirmed:
log("✓ Rebooting — upload successful.")
return {"id": device_id, "success": True, "message": "OK"}
log("✗ Reboot signal not detected after upload")
return {"id": device_id, "success": False, "message": "Reboot not confirmed"}
else:
await page.click('input[name="upload_template"]')
await page.wait_for_selector(
'text=Upload Complete, text=Apply Complete, text=Template Applied',
timeout=UPLOAD_TIMEOUT
)
log("✓ Upload complete (no reboot).")
return {"id": device_id, "success": True, "message": "OK"}
except PWTimeout:
log("✗ TIMEOUT")
return {"id": device_id, "success": False, "message": "TIMEOUT"}
except Exception as e:
err = str(e).lower()
# Connection errors AFTER upload started = device rebooted
if reboot and any(k in err for k in ("connection", "net::", "closed", "crashed", "target", "disconnected")):
log("✓ Connection closed — device is rebooting.")
return {"id": device_id, "success": True, "message": "OK"}
log(f"✗ ERROR: {str(e)[:80]}")
return {"id": device_id, "success": False, "message": str(e)[:120]}
finally:
await context.close()
await browser.close()
async def _run_with_retry(fn, device, job_id, **kwargs):
device_id = device["id"]
ts = lambda: datetime.now().strftime('%H:%M:%S')
for attempt in range(1, MAX_RETRIES + 1):
# Check abort flag before each attempt
if job_id in abort_flags:
jobs[job_id]["logs"].append(f"[{ts()}] [{device_id}] ✗ Aborted.")
return {"id": device_id, "success": False, "message": "Aborted"}
jobs[job_id]["logs"].append(
f"[{ts()}] [{device_id}] Attempt {attempt}/{MAX_RETRIES}"
)
result = await fn(device, job_id=job_id, **kwargs)
if result["success"]:
return result
if attempt < MAX_RETRIES:
if job_id in abort_flags:
jobs[job_id]["logs"].append(f"[{ts()}] [{device_id}] ✗ Aborted.")
return {"id": device_id, "success": False, "message": "Aborted"}
jobs[job_id]["logs"].append(
f"[{ts()}] [{device_id}] Retrying in 15s..."
)
# Poll abort flag every second so abort is responsive
for _ in range(15):
if job_id in abort_flags:
break
await asyncio.sleep(1)
return result
# ══════════════════════════════════════════════════════════════════════════════
# Background job runners
# ══════════════════════════════════════════════════════════════════════════════
async def run_download_job(job_id: str, devices: list, concurrency: int):
jobs[job_id]["status"] = "running"
semaphore = asyncio.Semaphore(concurrency)
async def _guarded(device):
async with semaphore:
return await _run_with_retry(_download_one, device, job_id)
results = await asyncio.gather(*[_guarded(d) for d in devices])
jobs[job_id]["results"] = list(results)
jobs[job_id]["status"] = "done"
jobs[job_id]["finished"] = datetime.now().isoformat()
abort_flags.discard(job_id)
report_path = _write_report(job_id, jobs[job_id], DOWNLOAD_DIR)
jobs[job_id]["logs"].append(
f"[{datetime.now().strftime('%H:%M:%S')}] Report saved → {report_path.name}"
)
_persist_job(job_id)
_prune_jobs_memory()
async def run_upload_job(job_id: str, devices: list, concurrency: int, reboot: bool):
jobs[job_id]["status"] = "running"
semaphore = asyncio.Semaphore(concurrency)
async def _guarded(device):
async with semaphore:
return await _run_with_retry(_upload_one, device, job_id, reboot=reboot)
results = await asyncio.gather(*[_guarded(d) for d in devices])
jobs[job_id]["results"] = list(results)
jobs[job_id]["status"] = "done"
jobs[job_id]["finished"] = datetime.now().isoformat()
abort_flags.discard(job_id)
report_path = _write_report(job_id, jobs[job_id], UPLOAD_DIR)
jobs[job_id]["logs"].append(
f"[{datetime.now().strftime('%H:%M:%S')}] Report saved → {report_path.name}"
)
_persist_job(job_id)
_prune_jobs_memory()
# ══════════════════════════════════════════════════════════════════════════════
# API models
# ══════════════════════════════════════════════════════════════════════════════
class JobRequest(BaseModel):
group: str = "All"
device_ids: list[str] = []
concurrency: int = 1
reboot: bool = True
class BuildRequest(BaseModel):
source: str # "nocodb" or "excel"
group: str = "All"
hostnames: list[str]
template_name: str # filename in xml_templates/
default_value: str = "NOTSET"
class PresetRequest(BaseModel):
name: str
commands: list[str]
# ══════════════════════════════════════════════════════════════════════════════
# API routes — devices & jobs (unchanged)
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/api/devices")
def api_devices(request: Request, group: str = "All", _auth=Depends(require_auth)):
try:
devices = load_devices(group)
return {"devices": devices, "count": len(devices)}
except Exception as e:
return {"error": str(e), "devices": [], "count": 0}
@app.post("/api/jobs/download")
async def start_download(request: Request, req: JobRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)):
devices = load_devices(req.group)
if req.device_ids:
devices = [d for d in devices if d["id"] in req.device_ids]
if not devices:
return {"error": "No matching devices found"}
job_id = f"dl_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
jobs[job_id] = {
"type": "download", "status": "queued",
"logs": [], "results": [],
"started": datetime.now().isoformat(), "finished": None,
"device_count": len(devices),
}
background_tasks.add_task(run_download_job, job_id, devices, req.concurrency)
return {"job_id": job_id}
@app.post("/api/jobs/upload")
async def start_upload(request: Request, req: JobRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)):
devices = load_devices(req.group)
if req.device_ids:
devices = [d for d in devices if d["id"] in req.device_ids]
devices = [d for d in devices if (UPLOAD_DIR / f"{d['id']}.xml").exists()]
if not devices:
return {"error": "No devices with matching template files found in template_uploads/"}
job_id = f"ul_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
jobs[job_id] = {
"type": "upload", "status": "queued",
"logs": [], "results": [],
"started": datetime.now().isoformat(), "finished": None,
"device_count": len(devices),
"reboot": req.reboot,
}
background_tasks.add_task(run_upload_job, job_id, devices, req.concurrency, req.reboot)
return {"job_id": job_id}
@app.get("/api/jobs/{job_id}")
def get_job(request: Request, job_id: str, _auth=Depends(require_auth)):
if job_id not in jobs:
return {"error": "Job not found"}
return jobs[job_id]
@app.get("/api/jobs/{job_id}/stream")
async def stream_job(request: Request, job_id: str, from_line: int = 0, _auth=Depends(require_auth)):
"""Stream job logs via SSE. Pass ?from_line=N to skip already-seen lines."""
async def event_generator():
sent = max(0, from_line)
while True:
if job_id not in jobs:
yield "data: {\"error\": \"job not found\"}\n\n"
break
job = jobs[job_id]
logs = job["logs"]
if len(logs) > sent:
for line in logs[sent:]:
yield f"data: {json.dumps({'log': line, 'line_num': sent})}\n\n"
sent += 1
if job["status"] == "done":
yield f"data: {json.dumps({'done': True, 'results': job['results']})}\n\n"
break
await asyncio.sleep(0.5)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/jobs")
def list_jobs(request: Request, _auth=Depends(require_auth)):
summary = []
for jid, j in sorted(jobs.items(), reverse=True):
summary.append({
"job_id": jid, "type": j["type"], "status": j["status"],
"device_count": j["device_count"], "started": j["started"],
"finished": j.get("finished"),
"succeeded": sum(1 for r in j["results"] if r.get("success")),
"failed": sum(1 for r in j["results"] if not r.get("success")),
})
return {"jobs": summary}
@app.get("/api/files/downloads")
def list_downloads(request: Request, _auth=Depends(require_auth)):
files = sorted(DOWNLOAD_DIR.glob("*.xml"))
return {"files": [{"name": f.name, "size_kb": f.stat().st_size // 1024} for f in files]}
@app.get("/api/files/uploads")
def list_uploads(request: Request, _auth=Depends(require_auth)):
files = sorted(UPLOAD_DIR.glob("*.xml"))
return {"files": [{"name": f.name, "size_kb": f.stat().st_size // 1024} for f in files]}
# ══════════════════════════════════════════════════════════════════════════════
# API routes — XML Builder
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/api/xmlbuilder/nocodb-fields")
def xmlbuilder_nocodb_fields(request: Request, _auth=Depends(require_auth)):
"""Return available NocoDB field names (= available template variables)."""
return {"fields": NOCODB_FIELDS}
@app.get("/api/xmlbuilder/nocodb-devices")
def xmlbuilder_nocodb_devices(request: Request, group: str = "All", _auth=Depends(require_auth)):
"""Return hostnames for the given group from NocoDB."""
try:
devices = load_devices_full(group)
hostnames = [d["hostname"] for d in devices if d["hostname"]]
return {"hostnames": hostnames, "count": len(hostnames)}
except Exception as e:
return {"error": str(e), "hostnames": []}
@app.post("/api/xmlbuilder/upload-excel")
async def xmlbuilder_upload_excel(request: Request, file: UploadFile = File(...), _auth=Depends(require_auth)):
"""
Accept an uploaded Excel file, return its columns and hostnames.
Stores the file temporarily in memory (not on disk).
"""
try:
contents = await file.read()
df = pd.read_excel(io.BytesIO(contents))
columns = [c.strip().strip('{}') for c in df.columns]
hostname_col = next((c for c in df.columns if "hostname" in c.lower()), None)
hostnames = df[hostname_col].dropna().astype(str).tolist() if hostname_col else []
_excel_cache_set(file.filename, contents)
return {
"filename": file.filename,
"columns": columns,
"hostnames": hostnames,
"rows": len(df),
}
except Exception as e:
return {"error": str(e)}
@app.get("/api/xmlbuilder/templates")
def xmlbuilder_list_templates(request: Request, _auth=Depends(require_auth)):
"""List XML template files stored in xml_templates/."""
files = sorted(TEMPLATES_DIR.glob("*.xml"))
result = []
for f in files:
content = f.read_text(encoding="utf-8", errors="replace")
variables = _extract_template_variables(content)
result.append({
"name": f.name,
"size_kb": f.stat().st_size // 1024,
"variables": variables,
})
return {"templates": result}
@app.post("/api/xmlbuilder/upload-template")
async def xmlbuilder_upload_template(request: Request, file: UploadFile = File(...), _auth=Depends(require_auth)):
"""Save an uploaded XML template into xml_templates/."""
try:
contents = await file.read()
dest = TEMPLATES_DIR / file.filename
dest.write_bytes(contents)
text = contents.decode("utf-8", errors="replace")
variables = _extract_template_variables(text)
return {"name": file.filename, "variables": variables, "saved": True}
except Exception as e:
return {"error": str(e)}
@app.post("/api/xmlbuilder/generate/nocodb")
def xmlbuilder_generate_nocodb(request: Request, req: BuildRequest, _auth=Depends(require_auth)):
"""Generate XMLs from NocoDB data and save to template_uploads/."""
template_path = TEMPLATES_DIR / req.template_name
if not template_path.exists():
return {"error": f"Template '{req.template_name}' not found in xml_templates/"}
template = template_path.read_text(encoding="utf-8")
results = build_xmls_from_nocodb(req.group, req.hostnames, template, req.default_value)
ok = sum(1 for r in results if r["success"])
err = len(results) - ok
return {"results": results, "succeeded": ok, "failed": err}
@app.post("/api/xmlbuilder/generate/excel")
async def xmlbuilder_generate_excel(
request: Request,
hostnames: str = Form(...), # JSON array string
template_name: str = Form(...),
excel_filename: str = Form(...), # key into excel_cache
default_value: str = Form("NOTSET"),
excel_file: Optional[UploadFile] = File(None),
_auth=Depends(require_auth),
):
"""Generate XMLs from Excel data and save to template_uploads/."""
template_path = TEMPLATES_DIR / template_name
if not template_path.exists():
return {"error": f"Template '{template_name}' not found in xml_templates/"}
template = template_path.read_text(encoding="utf-8")
# Get Excel bytes — from fresh upload or cache
if excel_file:
excel_bytes = await excel_file.read()
_excel_cache_set(excel_filename, excel_bytes)
else:
excel_bytes = _excel_cache_get(excel_filename)
if excel_bytes is None:
return {"error": "Excel file not found or session expired. Please re-upload it."}
host_list = json.loads(hostnames)
results = build_xmls_from_excel(excel_bytes, template, host_list, default_value)
ok = sum(1 for r in results if r["success"])
err = len(results) - ok
return {"results": results, "succeeded": ok, "failed": err}
# ══════════════════════════════════════════════════════════════════════════════
# Abort endpoint
# ══════════════════════════════════════════════════════════════════════════════
@app.post("/api/jobs/{job_id}/abort")
def abort_job(request: Request, job_id: str, _auth=Depends(require_auth)):
if job_id not in jobs:
return {"error": "Job not found"}
if jobs[job_id]["status"] == "done":
return {"error": "Job already finished"}
abort_flags.add(job_id)
jobs[job_id]["logs"].append(
f"[{datetime.now().strftime('%H:%M:%S')}] ⚠ Abort requested — stopping after current device..."
)
return {"aborted": True, "job_id": job_id}
@app.post("/api/jobs/abort-all")
def abort_all_jobs(request: Request, _auth=Depends(require_auth)):
aborted = []
for job_id, job in jobs.items():
if job["status"] in ("running", "queued"):
abort_flags.add(job_id)
job["logs"].append(
f"[{datetime.now().strftime('%H:%M:%S')}] ⚠ Abort-all requested."
)
aborted.append(job_id)
return {"aborted": aborted, "count": len(aborted)}
# ══════════════════════════════════════════════════════════════════════════════
# AT Terminal — SSH command execution
# ══════════════════════════════════════════════════════════════════════════════
class ATRequest(BaseModel):
device_ids: list[str] = [] # empty = all devices in group
group: str = "All"
commands: list[str] # list of AT commands to send in order
ssh_username: str = ""
ssh_password: str = ""
concurrency: int = 3
def _ssh_send_at_commands(
device: dict,
commands: list[str],
ssh_username: str,
ssh_password: str,
session_id: str,
) -> dict:
"""
Open an SSH session to one device on port 3223, send AT commands
one at a time, capture output, return result dict.
"""
device_id = device["id"]
ip = device["ip"]
username = ssh_username or device.get("username", "user")
password = ssh_password or device.get("password", "")
ts = lambda: datetime.now().strftime('%H:%M:%S')
def log(msg, level="info"):
at_sessions[session_id]["logs"].append({
"ts": ts(), "device": device_id, "msg": msg, "level": level
})
log(f"Connecting SSH → {ip}:{SSH_PORT}")
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(
ip,
port=SSH_PORT,
username=username,
password=password,
timeout=15,
look_for_keys=False,
allow_agent=False,
disabled_algorithms={"pubkeys": [], "keys": []},
)
except Exception as e:
log(f"✗ Connection failed: {str(e)[:80]}", "error")
return {"id": device_id, "success": False, "message": str(e)[:120], "outputs": []}
outputs = []
all_ok = True
try:
# Use an interactive shell channel so the modem's AT prompt stays alive
chan = ssh.invoke_shell(term="vt100", width=200, height=50)
chan.settimeout(10)
import time, socket
# Drain any banner/login text the modem sends on connect
time.sleep(1.0)
buf = b""
try:
while True:
chunk = chan.recv(4096)
if not chunk:
break
buf += chunk
except socket.timeout:
pass
for cmd in commands:
log(f"{cmd}", "cmd")
chan.send(cmd + "\r")
time.sleep(0.8)
response_buf = b""
deadline = time.time() + 8 # max 8 s per command
try:
while time.time() < deadline:
if chan.recv_ready():
chunk = chan.recv(4096)
if not chunk:
break
response_buf += chunk
# Stop early if we see OK or ERROR
decoded_so_far = response_buf.decode("utf-8", errors="replace")
if "\nOK" in decoded_so_far or "\nERROR" in decoded_so_far:
time.sleep(0.1)
break
else:
time.sleep(0.1)
except socket.timeout:
pass
response = response_buf.decode("utf-8", errors="replace").strip()
# Strip the echoed command from the start if present
if response.startswith(cmd):
response = response[len(cmd):].strip()
if response:
for line in response.splitlines():
stripped = line.strip()
if stripped:
log(f" {stripped}", "data")
else:
log(" (no response)", "warn")
outputs.append({"command": cmd, "output": response})
# If ATZ was sent the modem will reboot — nothing more to do
if cmd.strip().upper() == "ATZ":
log(" Modem rebooting.", "warn")
break
except Exception as e:
log(f"✗ Session error: {str(e)[:80]}", "error")
all_ok = False
outputs.append({"command": "session", "output": str(e)})
finally:
try:
chan.close()
except Exception:
pass
ssh.close()
log(f"{'✓ Done' if all_ok else '✗ Finished with errors'}", "ok" if all_ok else "error")
return {
"id": device_id, "success": all_ok,
"message": "OK" if all_ok else "Errors — see log",
"outputs": outputs,
}
def _run_at_session(session_id: str, devices: list, commands: list, ssh_username: str, ssh_password: str, concurrency: int):
"""Background thread: send AT commands to all selected devices."""
import concurrent.futures
at_sessions[session_id]["status"] = "running"
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
futures = {
executor.submit(_ssh_send_at_commands, d, commands, ssh_username, ssh_password, session_id): d
for d in devices
}
for future in concurrent.futures.as_completed(futures):
try:
results.append(future.result())
except Exception as e:
d = futures[future]
results.append({"id": d["id"], "success": False, "message": str(e), "outputs": []})
at_sessions[session_id]["results"] = results
at_sessions[session_id]["status"] = "done"
at_sessions[session_id]["finished"] = datetime.now().isoformat()
_persist_at_session(session_id)
@app.post("/api/at/send")
async def at_send(request: Request, req: ATRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)):
"""Start an AT command session against one or more devices."""
if not req.commands or not any(c.strip() for c in req.commands):
return {"error": "No commands provided"}
commands = [c.strip() for c in req.commands if c.strip()]
devices = load_devices(req.group)
if req.device_ids:
devices = [d for d in devices if d["id"] in req.device_ids]
if not devices:
return {"error": "No matching devices found"}
session_id = f"at_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
at_sessions[session_id] = {
"status": "queued",
"started": datetime.now().isoformat(),
"finished": None,
"device_count": len(devices),
"commands": commands,
"logs": [],
"results": [],
}
# Run in a background thread (paramiko is blocking/sync)
t = threading.Thread(
target=_run_at_session,
args=(session_id, devices, commands, req.ssh_username, req.ssh_password, req.concurrency),
daemon=True,
)
t.start()
return {"session_id": session_id, "device_count": len(devices)}
# ── AT preset helpers ──────────────────────────────────────────────────────────
def _read_presets() -> dict:
if PRESETS_FILE.exists():
try:
return json.loads(PRESETS_FILE.read_text(encoding="utf-8"))
except Exception:
return {}
return {}
def _write_presets(presets: dict):
PRESETS_FILE.write_text(json.dumps(presets, indent=2, ensure_ascii=False), encoding="utf-8")
# Preset routes MUST be registered before the {session_id} wildcard routes below.
@app.get("/api/at/presets")
def at_list_presets(request: Request, _auth=Depends(require_auth)):
presets = _read_presets()
return {"presets": [{"name": k, "commands": v} for k, v in presets.items()]}
@app.post("/api/at/presets")
def at_save_preset(request: Request, req: PresetRequest, _auth=Depends(require_auth)):
name = req.name.strip()
commands = [c.strip() for c in req.commands if c.strip()]
if not name:
return {"error": "Preset name cannot be empty"}
if not commands:
return {"error": "No commands provided"}
presets = _read_presets()
presets[name] = commands
_write_presets(presets)
return {"saved": True, "name": name, "commands": commands}
@app.delete("/api/at/presets/{name}")
def at_delete_preset(request: Request, name: str, _auth=Depends(require_auth)):
presets = _read_presets()
if name not in presets:
return {"error": "Preset not found"}
del presets[name]
_write_presets(presets)
return {"deleted": True, "name": name}
@app.get("/api/at/{session_id}/stream")
async def at_stream(request: Request, session_id: str, from_line: int = 0, _auth=Depends(require_auth)):
"""SSE stream for AT session logs — same pattern as job streaming."""
if session_id not in at_sessions:
return StreamingResponse(
iter([f'data: {json.dumps({"error": "session not found"})}\n\n']),
media_type="text/event-stream"
)
async def event_generator():
sent = max(0, from_line)
while True:
if session_id not in at_sessions:
yield f'data: {json.dumps({"error": "session gone"})}\n\n'
break
sess = at_sessions[session_id]
logs = sess["logs"]
if len(logs) > sent:
for entry in logs[sent:]:
yield f"data: {json.dumps({'log': entry})}\n\n"
sent += 1
if sess["status"] == "done":
yield f"data: {json.dumps({'done': True, 'results': sess['results']})}\n\n"
break
await asyncio.sleep(0.4)
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/at/{session_id}")
def at_get_session(request: Request, session_id: str, _auth=Depends(require_auth)):
if session_id not in at_sessions:
return {"error": "Session not found"}
return at_sessions[session_id]
# ══════════════════════════════════════════════════════════════════════════════
# Auth routes — login / logout
# ══════════════════════════════════════════════════════════════════════════════
LOGIN_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>RV50x Manager — Login</title>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap" rel="stylesheet">
<style>
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
body {
background: #0a0c0f; color: #c8d8e8;
font-family: 'Inter', sans-serif;
min-height: 100vh; display: flex; align-items: center; justify-content: center;
}
body::before {
content: ''; position: fixed; inset: 0;
background: repeating-linear-gradient(0deg, transparent, transparent 2px, rgba(0,0,0,0.03) 2px, rgba(0,0,0,0.03) 4px);
pointer-events: none;
}
.card {
background: #10141a; border: 1px solid #1e2a38; border-radius: 8px;
padding: 40px; width: 360px; box-shadow: 0 0 40px rgba(0,191,255,0.06);
position: relative; z-index: 1;
}
.logo { display: flex; align-items: center; gap: 12px; margin-bottom: 32px; }
.logo-icon {
width: 32px; height: 32px; background: #00bfff; flex-shrink: 0;
clip-path: polygon(50% 0%, 100% 25%, 100% 75%, 50% 100%, 0% 75%, 0% 25%);
}
.logo-text { font-size: 18px; font-weight: 700; letter-spacing: 2px; color: #00bfff; text-transform: uppercase; }
.logo-sub { font-size: 10px; color: #3a5068; letter-spacing: 1px; margin-top: 2px; }
.field { margin-bottom: 16px; }
label { display: block; font-size: 11px; font-weight: 600; letter-spacing: 1.5px; color: #6a8aa8; text-transform: uppercase; margin-bottom: 6px; }
input[type=text], input[type=password] {
width: 100%; background: #161c25; border: 1px solid #1e2a38; color: #c8d8e8;
font-family: 'Inter', sans-serif; font-size: 14px; padding: 10px 14px;
border-radius: 4px; outline: none; transition: border-color 0.15s;
}
input[type=text]:focus, input[type=password]:focus { border-color: #00bfff; }
.btn-login {
width: 100%; background: #00bfff; border: none; color: #000;
font-family: 'Inter', sans-serif; font-size: 14px; font-weight: 700;
letter-spacing: 1px; padding: 11px; border-radius: 4px; cursor: pointer;
margin-top: 8px; transition: background 0.15s; text-transform: uppercase;
}
.btn-login:hover { background: #33ccff; }
.error {
background: rgba(255,23,68,0.1); border: 1px solid rgba(255,23,68,0.3);
color: #ff1744; font-size: 13px; padding: 10px 14px; border-radius: 4px; margin-bottom: 16px;
}
.footer { text-align: center; margin-top: 24px; font-size: 11px; color: #3a5068; letter-spacing: 0.5px; }
</style>
</head>
<body>
<div class="card">
<div class="logo">
<div class="logo-icon"></div>
<div>
<div class="logo-text">RV50x Manager</div>
<div class="logo-sub">SIERRA WIRELESS // TEMPLATE OPS</div>
</div>
</div>
{error_block}
<form method="POST" action="/auth/login">
<div class="field">
<label>Username</label>
<input type="text" name="username" autocomplete="username" autofocus required>
</div>
<div class="field">
<label>Password</label>
<input type="password" name="password" autocomplete="current-password" required>
</div>
<button class="btn-login" type="submit">Sign In</button>
</form>
<div class="footer">HTTPS secured · Session expires in {hours}h</div>
</div>
</body>
</html>"""
@app.get("/login", response_class=HTMLResponse)
def login_page(error: str = ""):
error_block = f'<div class="error">⚠ {error}</div>' if error else ""
return HTMLResponse(
LOGIN_HTML
.replace("{error_block}", error_block)
.replace("{hours}", str(SESSION_HOURS))
)
@app.post("/auth/login")
async def auth_login(username: str = Form(...), password: str = Form(...)):
if username == APP_USERNAME and password == APP_PASSWORD:
token = _make_session_cookie()
resp = RedirectResponse(url="/", status_code=303)
resp.set_cookie(
key=SESSION_COOKIE, value=token,
httponly=True, secure=False, samesite="lax",
max_age=SESSION_HOURS * 3600,
)
return resp
return RedirectResponse(url="/login?error=Invalid+username+or+password", status_code=303)
@app.get("/auth/logout")
def auth_logout():
resp = RedirectResponse(url="/login", status_code=303)
resp.delete_cookie(SESSION_COOKIE)
return resp
# ══════════════════════════════════════════════════════════════════════════════
# Serve frontend (auth protected)
# ══════════════════════════════════════════════════════════════════════════════
@app.get("/", response_class=HTMLResponse)
def index(request: Request, _auth=Depends(require_auth)):
html_path = SCRIPT_DIR / "index.html"
return HTMLResponse(html_path.read_text(encoding="utf-8"))