53501d5a57
Strips embedded nocodb/postgres services from compose; switches to shared nocodb-network. Adds modem_num to NocoDB field list. Fixes session cookie secure flag. Removes previously-tracked XML template files (covered by .gitignore). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1500 lines
63 KiB
Python
1500 lines
63 KiB
Python
"""
RV50x Template Manager - Web GUI Backend

Run directly:
    uvicorn app:app --host 0.0.0.0 --port 8000

Run via Docker:
    docker-compose up -d

Environment variables (set in .env or docker-compose.yml):
    NOCODB_URL               NocoDB base URL e.g. http://192.168.16.130:8080
    NOCODB_TOKEN             NocoDB API token
    NOCODB_BASE_ID           Base ID from browser URL
    NOCODB_TABLE_ID          Table ID from browser URL
    NOCODB_VIEW_ID           View ID for the "All" group
    NOCODB_VIEW_ID_ELECTRIC  View ID for the "Electric" group (falls back to NOCODB_VIEW_ID)
    NOCODB_VIEW_ID_GW        View ID for the "Gas & Water" group (falls back to NOCODB_VIEW_ID)
    PAGE_TIMEOUT             ms to wait for page elements (default 90000)
    DOWNLOAD_TIMEOUT         ms to wait for template gen (default 120000)
    UPLOAD_TIMEOUT           ms to wait for upload done (default 120000)
    MAX_RETRIES              retry attempts per device (default 3)
    DB_PATH                  path to SQLite database file (default ./rv50x.db)
    MAX_JOBS_MEMORY          max completed jobs kept in RAM (default 200)
    EXCEL_CACHE_MAX          max Excel files cached in RAM (default 20)
    EXCEL_CACHE_TTL          seconds before cached Excel expires (default 3600)
    REACH_TIMEOUT            seconds for TCP reachability pre-check (default 3)
"""
|
|
|
|
import asyncio
|
|
import html as html_lib
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import threading
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import pandas as pd
|
|
import paramiko
|
|
from fastapi import FastAPI, BackgroundTasks, File, Form, UploadFile, Request, Response, Depends, HTTPException
|
|
from fastapi.responses import HTMLResponse, StreamingResponse, RedirectResponse
|
|
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
|
|
from pydantic import BaseModel
|
|
|
|
# ── NocoDB connection — read from environment, fall back to defaults ───────────
# SECURITY NOTE(review): the NOCODB_TOKEN fallback below is a credential
# committed to source control. It is kept so existing deployments keep working,
# but it should be rotated and supplied only through the environment.
NOCODB_URL = os.environ.get("NOCODB_URL", "http://192.168.16.130:8080")
NOCODB_TOKEN = os.environ.get("NOCODB_TOKEN", "eWU_ilelaCtNy1JzC7vf41DokkqFOovcLHM0zVml")
NOCODB_BASE_ID = os.environ.get("NOCODB_BASE_ID", "pdq96x915xt4a0m")
NOCODB_TABLE_ID = os.environ.get("NOCODB_TABLE_ID", "mkewnr53ahqvnt9")

# The "All" view doubles as the fallback for the per-department views.
_DEFAULT_VIEW_ID = os.environ.get("NOCODB_VIEW_ID", "vwl7qvxo1xclvawz")

# Group name (as shown in the UI) → NocoDB view ID.
NOCODB_VIEW_IDS = {
    "All": _DEFAULT_VIEW_ID,
    "Electric": os.environ.get("NOCODB_VIEW_ID_ELECTRIC", _DEFAULT_VIEW_ID),
    "Gas & Water": os.environ.get("NOCODB_VIEW_ID_GW", _DEFAULT_VIEW_ID),
}
|
|
|
|
# ── Local paths ────────────────────────────────────────────────────────────────
# In Docker these are bind-mounted from the host so data persists across rebuilds.
SCRIPT_DIR = Path(__file__).parent

# Each working directory can be overridden through an environment variable of
# the same name; otherwise it lives next to this script.
DOWNLOAD_DIR = Path(os.environ.get("DOWNLOAD_DIR", str(SCRIPT_DIR / "template_downloads")))
UPLOAD_DIR = Path(os.environ.get("UPLOAD_DIR", str(SCRIPT_DIR / "template_uploads")))
TEMPLATES_DIR = Path(os.environ.get("TEMPLATES_DIR", str(SCRIPT_DIR / "xml_templates")))
DB_PATH = Path(os.environ.get("DB_PATH", str(SCRIPT_DIR / "rv50x.db")))
PRESETS_FILE = SCRIPT_DIR / "at_presets.json"
REACH_TIMEOUT = int(os.environ.get("REACH_TIMEOUT", "3"))  # seconds for TCP pre-check

# Make sure the working directories exist before anything tries to use them.
for _work_dir in (DOWNLOAD_DIR, UPLOAD_DIR, TEMPLATES_DIR):
    _work_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _int_from_env(name: str, fallback: str) -> int:
    """Read environment variable *name* (or *fallback*) and convert it to int."""
    return int(os.environ.get(name, fallback))


# ── Playwright timeouts — tunable via environment ──────────────────────────────
PAGE_TIMEOUT = _int_from_env("PAGE_TIMEOUT", "90000")
DOWNLOAD_TIMEOUT = _int_from_env("DOWNLOAD_TIMEOUT", "120000")
UPLOAD_TIMEOUT = _int_from_env("UPLOAD_TIMEOUT", "120000")
MAX_RETRIES = _int_from_env("MAX_RETRIES", "3")

# ── In-memory job store ────────────────────────────────────────────────────────
jobs: dict[str, dict] = {}
abort_flags: set[str] = set()  # job_ids that have been asked to abort
MAX_JOBS_MEMORY = _int_from_env("MAX_JOBS_MEMORY", "200")

# ── AT Terminal job store (separate from Playwright jobs) ──────────────────────
at_sessions: dict[str, dict] = {}  # session_id → {logs, status, results}
SSH_PORT = _int_from_env("SSH_PORT", "3223")

# ── Excel upload cache ─────────────────────────────────────────────────────────
EXCEL_CACHE_MAX = _int_from_env("EXCEL_CACHE_MAX", "20")
EXCEL_CACHE_TTL_S = _int_from_env("EXCEL_CACHE_TTL", "3600")
# Values: {"data": bytes, "ts": datetime}
excel_cache: dict[str, dict] = {}
|
|
|
|
# ── Auth config ────────────────────────────────────────────────────────────────
# SECURITY NOTE(review): the password and signing-secret fallbacks are
# well-known placeholder values. Anyone who knows the default secret can forge
# a session cookie, so both should always be overridden in the environment.
APP_USERNAME = os.environ.get("APP_USERNAME", "admin")
APP_PASSWORD = os.environ.get("APP_PASSWORD", "changeme")
SESSION_SECRET = os.environ.get("SESSION_SECRET", "change-this-secret")
SESSION_HOURS = int(os.environ.get("SESSION_HOURS", "8"))  # cookie lifetime
SESSION_COOKIE = "rv50x_session"  # name of the browser cookie

# Signs and timestamps the session payload; verification enforces max age.
_signer = URLSafeTimedSerializer(SESSION_SECRET)
|
|
|
|
def _make_session_cookie() -> str:
    """Return a signed, timestamped token identifying the configured user."""
    payload = {"user": APP_USERNAME}
    return _signer.dumps(payload)
|
|
|
|
def _verify_session_cookie(token: str) -> bool:
    """Return True when *token* has a valid signature and has not expired.

    The token is rejected once it is older than SESSION_HOURS.
    """
    max_age_seconds = SESSION_HOURS * 3600
    try:
        _signer.loads(token, max_age=max_age_seconds)
    except (BadSignature, SignatureExpired):
        return False
    return True
|
|
|
|
def require_auth(request: Request):
    """FastAPI dependency that lets a request through only with a valid session.

    Raises:
        HTTPException: status 302 with a ``Location: /login`` header when the
            session cookie is missing or fails verification.
    """
    session_token = request.cookies.get(SESSION_COOKIE)
    if session_token and _verify_session_cookie(session_token):
        return
    raise HTTPException(status_code=302, headers={"Location": "/login"})
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# SQLite persistence — jobs & AT sessions survive restarts
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _init_db():
    """Create the ``jobs`` and ``at_sessions`` tables if they do not exist yet."""
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` only manages the transaction (commit / rollback); it
        # does not close the connection, hence the explicit close() below.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    job_id TEXT PRIMARY KEY,
                    type TEXT,
                    status TEXT,
                    device_count INTEGER,
                    started TEXT,
                    finished TEXT,
                    reboot INTEGER DEFAULT 0,
                    logs TEXT DEFAULT '[]',
                    results TEXT DEFAULT '[]'
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS at_sessions (
                    session_id TEXT PRIMARY KEY,
                    status TEXT,
                    started TEXT,
                    finished TEXT,
                    device_count INTEGER,
                    commands TEXT DEFAULT '[]',
                    logs TEXT DEFAULT '[]',
                    results TEXT DEFAULT '[]'
                )
            """)
    finally:
        conn.close()
|
|
|
|
|
|
def _persist_job(job_id: str):
    """Write the in-memory job *job_id* to SQLite, replacing any earlier row.

    Does nothing when the job is not (or no longer) held in memory. Logs and
    results are stored as JSON text.
    """
    j = jobs.get(job_id)
    if not j:
        return
    row = (
        job_id, j["type"], j["status"], j["device_count"],
        j.get("started"), j.get("finished"),
        1 if j.get("reboot") else 0,
        json.dumps(j["logs"]), json.dumps(j["results"]),
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits or rolls back but never closes the connection,
        # so it is closed explicitly to avoid leaking one handle per job.
        with conn:
            conn.execute(
                """INSERT OR REPLACE INTO jobs
                   (job_id, type, status, device_count, started, finished, reboot, logs, results)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                row,
            )
    finally:
        conn.close()
|
|
|
|
|
|
def _persist_at_session(session_id: str):
    """Write the in-memory AT session *session_id* to SQLite, replacing any earlier row.

    Does nothing when the session is not held in memory. Commands, logs and
    results are stored as JSON text.
    """
    s = at_sessions.get(session_id)
    if not s:
        return
    row = (
        session_id, s["status"], s.get("started"), s.get("finished"),
        s["device_count"], json.dumps(s["commands"]),
        json.dumps(s["logs"]), json.dumps(s["results"]),
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits or rolls back but never closes the connection,
        # so it is closed explicitly to avoid leaking one handle per session.
        with conn:
            conn.execute(
                """INSERT OR REPLACE INTO at_sessions
                   (session_id, status, started, finished, device_count, commands, logs, results)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                row,
            )
    finally:
        conn.close()
|
|
|
|
|
|
def _load_history_from_db():
    """Load the most-recent jobs/sessions from SQLite into memory.

    At most MAX_JOBS_MEMORY rows are read from each table, newest first.
    Entries already present in memory are left untouched. Any failure is
    reported on stdout and otherwise ignored so startup can continue.
    """
    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            for row in conn.execute(
                "SELECT * FROM jobs ORDER BY started DESC LIMIT ?", (MAX_JOBS_MEMORY,)
            ):
                if row["job_id"] not in jobs:
                    jobs[row["job_id"]] = {
                        "type": row["type"],
                        "status": row["status"],
                        "device_count": row["device_count"],
                        "started": row["started"],
                        "finished": row["finished"],
                        "reboot": bool(row["reboot"]),
                        "logs": json.loads(row["logs"]),
                        "results": json.loads(row["results"]),
                    }
            for row in conn.execute(
                "SELECT * FROM at_sessions ORDER BY started DESC LIMIT ?", (MAX_JOBS_MEMORY,)
            ):
                if row["session_id"] not in at_sessions:
                    at_sessions[row["session_id"]] = {
                        "status": row["status"],
                        "started": row["started"],
                        "finished": row["finished"],
                        "device_count": row["device_count"],
                        "commands": json.loads(row["commands"]),
                        "logs": json.loads(row["logs"]),
                        "results": json.loads(row["results"]),
                    }
        finally:
            # The sqlite3 context manager would not close the connection, so
            # release it explicitly whether or not loading succeeded.
            conn.close()
    except Exception as e:
        print(f"[DB] Failed to load history: {e}")
|
|
|
|
|
|
def _prune_jobs_memory():
    """Drop oldest completed jobs from memory once we exceed MAX_JOBS_MEMORY.

    Only jobs whose status is "done" are counted and eligible for removal;
    they are ordered by their "finished" timestamp, oldest first.
    """
    finished_jobs = [(jid, job) for jid, job in jobs.items() if job["status"] == "done"]
    finished_jobs.sort(key=lambda pair: pair[1].get("finished", ""))
    surplus = max(0, len(finished_jobs) - MAX_JOBS_MEMORY)
    for jid, _job in finished_jobs[:surplus]:
        del jobs[jid]
|
|
|
|
|
|
app = FastAPI(title="RV50x Template Manager")

# At import time: make sure the SQLite schema exists, then warm the in-memory
# job / AT-session stores from whatever was persisted by earlier runs.
_init_db()
_load_history_from_db()
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Excel cache helpers — bounded size + TTL eviction
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _excel_cache_evict():
    """Remove expired entries, then drop oldest until under EXCEL_CACHE_MAX.

    After this call the cache holds at most EXCEL_CACHE_MAX - 1 entries
    whenever it was full, leaving room for one new entry.
    """
    current_time = datetime.now()
    stale_names = [
        name
        for name, entry in excel_cache.items()
        if (current_time - entry["ts"]).total_seconds() > EXCEL_CACHE_TTL_S
    ]
    for name in stale_names:
        del excel_cache[name]

    if len(excel_cache) < EXCEL_CACHE_MAX:
        return

    # Still full: evict the oldest entries, freeing exactly one slot.
    by_age = sorted(excel_cache.items(), key=lambda item: item[1]["ts"])
    drop_count = len(excel_cache) - EXCEL_CACHE_MAX + 1
    for name, _entry in by_age[:drop_count]:
        del excel_cache[name]
|
|
|
|
|
|
def _excel_cache_set(filename: str, data: bytes):
    """Store *data* under *filename*, evicting old entries first to make room."""
    _excel_cache_evict()
    entry = {"data": data, "ts": datetime.now()}
    excel_cache[filename] = entry
|
|
|
|
|
|
def _excel_cache_get(filename: str) -> bytes | None:
    """Return the cached bytes for *filename*, or None if absent or expired.

    An expired entry is removed from the cache as a side effect.
    """
    cached = excel_cache.get(filename)
    if not cached:
        return None
    age_seconds = (datetime.now() - cached["ts"]).total_seconds()
    if age_seconds > EXCEL_CACHE_TTL_S:
        del excel_cache[filename]
        return None
    return cached["data"]
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# NocoDB helpers
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _fetch_all_rows(view_id: str) -> list:
    """Fetch every row of the configured NocoDB table as seen through *view_id*.

    Pages through the REST endpoint until the server reports the last page.

    Returns:
        A list of row dicts exactly as returned in the response's "list" field.

    Raises:
        Whatever ``urllib.request.urlopen`` / ``json.loads`` raise on network,
        HTTP or decoding errors (30 s timeout per request).
    """
    headers = {"xc-token": NOCODB_TOKEN, "Content-Type": "application/json"}
    all_rows = []
    offset, limit = 0, 1000
    while True:
        params = urllib.parse.urlencode({"viewId": view_id, "limit": limit, "offset": offset})
        url = f"{NOCODB_URL}/api/v1/db/data/noco/{NOCODB_BASE_ID}/{NOCODB_TABLE_ID}?{params}"
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read().decode())
        page = data.get("list", [])
        all_rows.extend(page)
        if data.get("pageInfo", {}).get("isLastPage", True):
            break
        if not page:
            # An empty page that is not flagged as last would loop forever.
            break
        # Advance by what was actually returned: if the server caps the page
        # size below the requested limit, stepping by `limit` would skip rows.
        offset += len(page)
    return all_rows
|
|
|
|
|
|
# All NocoDB fields we care about — everything becomes a template variable
NOCODB_FIELDS = [
    # identity / connection
    "hostname",
    "ip_address",
    "username",
    "password",
    "https_port",
    "use_https",
    "ssh_port",
    # descriptive
    "location",
    "alias",
    "dept",
    "def_pass",
    # keys / network
    "snmp_auth_key",
    "snmp_priv_key",
    "fort_key",
    "vpn_subnets",
    "modem_num",
]
|
|
|
|
|
|
def load_devices(group: str = "All") -> list:
    """Load the devices of *group* from NocoDB in the compact job format.

    Unknown group names fall back to the "All" view. Rows without a hostname
    or IP address are skipped, and the "Electric" / "Gas & Water" groups are
    additionally filtered on the ``dept`` column (ELEC / GW).

    Returns:
        A list of dicts with keys id, ip, username, password, dept, location.
    """
    def _text(row: dict, key: str) -> str:
        # JSON null arrives as None; str(None) would yield the literal "None"
        # and turn an empty hostname/ip into a bogus device.
        value = row.get(key, "")
        return "" if value is None else str(value).strip()

    view_id = NOCODB_VIEW_IDS.get(group, NOCODB_VIEW_IDS["All"])
    rows = _fetch_all_rows(view_id)
    devices = []
    for row in rows:
        device_id = _text(row, "hostname")
        ip = _text(row, "ip_address")
        dept = _text(row, "dept")
        if not device_id or not ip:
            continue
        if group == "Electric" and dept.upper() != "ELEC":
            continue
        if group == "Gas & Water" and dept.upper() != "GW":
            continue
        devices.append({
            "id": device_id, "ip": ip,
            "username": _text(row, "username"), "password": _text(row, "password"),
            "dept": dept, "location": _text(row, "location"),
        })
    return devices
|
|
|
|
|
|
def load_devices_full(group: str = "All") -> list:
    """Load all NocoDB fields for each device (used by XML Builder).

    Unknown group names fall back to the "All" view. Rows without a hostname
    are skipped, and the "Electric" / "Gas & Water" groups are additionally
    filtered on the ``dept`` column (ELEC / GW).

    Returns:
        A list of flat dicts with one string entry per name in NOCODB_FIELDS;
        missing or null fields become "".
    """
    def _text(value) -> str:
        # JSON null arrives as None; str(None) would yield the literal "None".
        return "" if value is None else str(value).strip()

    view_id = NOCODB_VIEW_IDS.get(group, NOCODB_VIEW_IDS["All"])
    rows = _fetch_all_rows(view_id)
    devices = []
    for row in rows:
        device_id = _text(row.get("hostname", ""))
        dept = _text(row.get("dept", ""))
        if not device_id:
            continue
        if group == "Electric" and dept.upper() != "ELEC":
            continue
        if group == "Gas & Water" and dept.upper() != "GW":
            continue
        # Build a flat dict of all available fields
        devices.append({field: _text(row.get(field, "")) for field in NOCODB_FIELDS})
    return devices
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# XML Builder — core logic (replaces excel_to_xml.py)
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _extract_template_variables(template: str) -> list[str]:
|
|
"""Return unique variable names found in {var} or {{ var }} placeholders."""
|
|
pattern = r'\{\{?\s*([^{}]+?)\s*\}?\}'
|
|
return list(set(re.findall(pattern, template)))
|
|
|
|
|
|
def _escape_xml(value) -> str:
|
|
"""Convert a value to an XML-safe string."""
|
|
if not isinstance(value, str):
|
|
# Convert float-integers like 443.0 → "443"
|
|
if isinstance(value, float) and value == int(value):
|
|
value = str(int(value))
|
|
else:
|
|
value = str(value)
|
|
# Escape backslashes then XML special chars
|
|
value = value.replace('\\', '\\\\')
|
|
value = html_lib.escape(value)
|
|
return value
|
|
|
|
|
|
def _render_template(template: str, data: dict, default: str = "NOTSET") -> str:
    """Replace all {var} and {{ var }} placeholders in template with data values.

    Substitution happens in a single pass over the template, so text that was
    inserted as a value is never scanned again: a value containing braces
    (e.g. a password like ``p{x}w``) is kept intact instead of being rewritten
    by a later variable or by the default fill.

    Args:
        template: Text containing ``{name}`` and/or ``{{ name }}`` placeholders.
        data: Mapping of variable name → raw value. Values are passed through
            ``_escape_xml``; empty-string and None values count as unset.
        default: Literal text inserted for unset or unknown variables and for
            malformed placeholders (e.g. ``{ name }`` or ``{{name}``).

    Returns:
        The rendered template.
    """
    # Escape each value once up front; None marks "use the default".
    escaped = {
        var: (_escape_xml(raw_val) if raw_val not in ("", None) else None)
        for var, raw_val in data.items()
    }

    # First alternative: well-formed {{ name }} (whitespace allowed inside).
    # Second alternative: anything else brace-delimited, including {name}.
    placeholder = re.compile(r'\{\{\s*([^{}]+?)\s*\}\}|\{\{?\s*([^{}]+?)\s*\}?\}')

    def _substitute(match):
        if match.group(1) is not None:
            name = match.group(1)
        else:
            # Single-brace form only counts when the text between the braces
            # is exactly the variable name.
            name = match.group(0)[1:-1]
        val = escaped.get(name)
        if val is None:
            return default
        # _escape_xml doubles backslashes for use as a regex replacement
        # template; expand() applies that template processing so the output
        # carries the backslashes exactly as they were in the raw value.
        return match.expand(val)

    return placeholder.sub(_substitute, template)
|
|
|
|
|
|
def build_xmls_from_nocodb(
    group: str,
    hostnames: list[str],
    template: str,
    default: str = "NOTSET",
) -> list[dict]:
    """
    Render one XML file per requested hostname from live NocoDB data.

    Each file is written as ``<hostname>.xml`` into UPLOAD_DIR.
    Returns one result dict per hostname: {hostname, success, message}.
    Hostnames missing from the NocoDB group are reported as failures.
    """
    by_hostname = {}
    for record in load_devices_full(group):
        by_hostname[record["hostname"]] = record

    outcomes = []
    for hostname in hostnames:
        try:
            record = by_hostname[hostname]
        except KeyError:
            outcomes.append({"hostname": hostname, "success": False, "message": "Not found in NocoDB"})
            continue

        rendered = _render_template(template, record, default)
        target = UPLOAD_DIR / f"{hostname}.xml"
        try:
            target.write_text(rendered, encoding="utf-8")
        except Exception as exc:
            outcomes.append({"hostname": hostname, "success": False, "message": str(exc)})
        else:
            outcomes.append({"hostname": hostname, "success": True, "message": f"→ {target.name}"})

    return outcomes
|
|
|
|
|
|
def build_xmls_from_excel(
    excel_bytes: bytes,
    template: str,
    hostnames: list[str],
    default: str = "NOTSET",
) -> list[dict]:
    """
    Generate XML files for the given hostnames using data from an uploaded Excel file.

    The first column whose header contains "hostname" (case-insensitive)
    identifies the device; every column of the matching row becomes a template
    variable (header stripped of whitespace and surrounding braces).
    Files are written to UPLOAD_DIR as ``<hostname>.xml``.

    Returns a list of result dicts: {hostname, success, message}.
    """
    df = pd.read_excel(io.BytesIO(excel_bytes))

    # Find hostname column. Headers may be non-strings (e.g. numbers), so
    # convert before calling str methods.
    hostname_col = next(
        (c for c in df.columns if "hostname" in str(c).lower()), None
    )
    if hostname_col is None:
        return [{"hostname": h, "success": False, "message": "No hostname column in Excel"} for h in hostnames]

    # Compare as stripped text so stray whitespace or numeric cells still match.
    host_cells = df[hostname_col].map(lambda v: str(v).strip() if pd.notna(v) else "")

    results = []
    for hostname in hostnames:
        # The hostname becomes a file name and the spreadsheet is user-supplied:
        # refuse anything that could escape the upload directory.
        if not hostname or "/" in hostname or "\\" in hostname:
            results.append({"hostname": hostname, "success": False, "message": "Invalid hostname"})
            continue

        rows = df[host_cells == hostname.strip()]
        if rows.empty:
            results.append({"hostname": hostname, "success": False, "message": "Not found in Excel"})
            continue

        data = {}
        for col, val in rows.iloc[0].to_dict().items():
            clean = str(col).strip().strip('{}')
            if pd.isna(val):
                data[clean] = default
            elif isinstance(val, float) and val.is_integer():
                # 443.0 → "443"; is_integer() is safe for inf, unlike int().
                data[clean] = str(int(val))
            else:
                data[clean] = str(val)

        content = _render_template(template, data, default)
        out = UPLOAD_DIR / f"{hostname}.xml"
        try:
            out.write_text(content, encoding="utf-8")
            results.append({"hostname": hostname, "success": True, "message": f"→ {out.name}"})
        except Exception as e:
            results.append({"hostname": hostname, "success": False, "message": str(e)})

    return results
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Report writer
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _write_report(job_id: str, job: dict, output_dir: Path) -> Path:
|
|
"""Write a timestamped report file matching the CLI format."""
|
|
results = job.get("results", [])
|
|
succeeded = [r for r in results if r.get("success")]
|
|
failed = [r for r in results if not r.get("success")]
|
|
started = datetime.fromisoformat(job["started"]) if job.get("started") else datetime.now()
|
|
finished = datetime.fromisoformat(job["finished"]) if job.get("finished") else datetime.now()
|
|
elapsed = (finished - started).total_seconds()
|
|
job_type = job.get("type", "job").upper()
|
|
timestamp = finished.strftime("%Y-%m-%d %H:%M")
|
|
|
|
lines = []
|
|
lines.append("═" * 60)
|
|
lines.append(f" {job_type} SUMMARY — {timestamp}")
|
|
lines.append(f" Job ID: {job_id}")
|
|
lines.append("═" * 60)
|
|
lines.append(f" Total: {len(results)}")
|
|
lines.append(f" ✓ Success: {len(succeeded)}")
|
|
lines.append(f" ✗ Failed: {len(failed)}")
|
|
lines.append(f" Time: {elapsed:.0f}s")
|
|
if succeeded:
|
|
lines.append("")
|
|
lines.append(f" {'Downloaded' if job_type == 'DOWNLOAD' else 'Uploaded'}:")
|
|
for r in succeeded:
|
|
extra = f" ({r.get('size_kb', '')} KB)" if r.get("size_kb") else ""
|
|
lines.append(f" ✓ {r['id']}{extra}")
|
|
if failed:
|
|
lines.append("")
|
|
lines.append(" Failed:")
|
|
for r in failed:
|
|
lines.append(f" ✗ {r['id']} — {r.get('message', '')}")
|
|
lines.append("═" * 60)
|
|
lines.append("")
|
|
lines.append(" Full log:")
|
|
lines.append("")
|
|
for log_line in job.get("logs", []):
|
|
lines.append(f" {log_line}")
|
|
lines.append("═" * 60)
|
|
|
|
report_name = f"report_{finished.strftime('%Y%m%d_%H%M%S')}.txt"
|
|
report_path = output_dir / report_name
|
|
report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
return report_path
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Playwright workers (download / upload to modems)
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
def _is_reachable(ip: str, port: int = 443) -> bool:
    """Return True if a TCP connection to *ip*:*port* succeeds within REACH_TIMEOUT seconds.

    The connection is closed again immediately; any OSError counts as
    unreachable.
    """
    import socket

    try:
        probe = socket.create_connection((ip, port), timeout=REACH_TIMEOUT)
        probe.close()
    except OSError:
        return False
    return True
|
|
|
|
|
|
async def _login_and_open_modal(page, device: dict, log) -> bool:
    """
    Sign in to the device's web UI over HTTPS and bring up the template dialog.

    Returns True once the dialog is detected as visible, or False if it is
    still hidden after five attempts. Navigation and wait errors are not
    caught here; they propagate to the caller.
    """
    user_field = 'input[name="username"]'
    # Treat the dialog as open when the element that encloses #template_name
    # (its form, or the nearest div with an id) is laid out on the page.
    modal_visible_js = """
        () => {
            const el = document.getElementById('template_name');
            if (!el) return false;
            const form = el.closest('form') || el.closest('div[id]');
            if (!form) return el.offsetParent !== null;
            return form.offsetParent !== null;
        }
    """

    # ── Log in ──
    await page.goto(f"https://{device['ip']}", wait_until="load")
    await page.wait_for_selector(user_field, state="visible", timeout=PAGE_TIMEOUT)
    await page.wait_for_timeout(1000)
    await page.click(user_field, click_count=3)  # triple-click selects any prefilled text
    await page.fill(user_field, device["username"])
    await page.fill('input[name="password"]', device["password"])
    await page.wait_for_timeout(800)
    submit = page.locator('input[name="Login"]')
    await submit.scroll_into_view_if_needed()
    await submit.click()

    # ── Wait until the template button and its handler are available ──
    await page.wait_for_selector('#btn_tpl', state="visible", timeout=PAGE_TIMEOUT)
    await page.wait_for_function("typeof showTemplateDialog === 'function'", timeout=PAGE_TIMEOUT)
    await page.wait_for_timeout(500)
    log("Logged in.")

    # ── Open the dialog, retrying because the first call may not take effect ──
    attempts_left = 5
    while attempts_left > 0:
        attempts_left -= 1
        await page.evaluate("showTemplateDialog()")
        await page.wait_for_timeout(1000)
        if await page.evaluate(modal_visible_js):
            return True

    return False
|
|
|
|
|
|
async def _download_one(device: dict, job_id: str) -> dict:
    """Download the configuration template of one device via its web UI.

    The file is saved as ``<device id>.xml`` in DOWNLOAD_DIR. Progress is
    appended to the job's log list.

    Returns:
        A result dict {id, success, message} (plus size_kb on success).
    """
    from playwright.async_api import async_playwright, TimeoutError as PWTimeout

    device_id = device["id"]

    def log(msg):
        jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}")

    # The TCP probe blocks for up to REACH_TIMEOUT seconds; run it in a worker
    # thread so other devices and HTTP requests keep being served meanwhile.
    if not await asyncio.to_thread(_is_reachable, device["ip"]):
        log(f"✗ Unreachable (no TCP response on :443 within {REACH_TIMEOUT}s)")
        return {"id": device_id, "success": False, "message": "Unreachable"}

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(ignore_https_errors=True, accept_downloads=True)
        page = await context.new_page()
        page.set_default_timeout(PAGE_TIMEOUT)
        try:
            if not await _login_and_open_modal(page, device, log):
                return {"id": device_id, "success": False, "message": "Modal did not open"}

            # Name the template after the device, include passwords, and
            # leave the "info" option unchecked when it exists.
            await page.fill('input[id="template_name"]', device_id)
            pwd_cb = page.locator('input[id="template_pwds"]')
            if not await pwd_cb.is_checked():
                await pwd_cb.check()
            info_cb = page.locator('input[id="template_info"]')
            if await info_cb.count() > 0 and await info_cb.is_checked():
                await info_cb.uncheck()

            # Dismiss any JS dialog so it cannot block the download.
            page.on("dialog", lambda d: asyncio.ensure_future(d.dismiss()))
            out_path = DOWNLOAD_DIR / f"{device_id}.xml"
            async with page.expect_download(timeout=DOWNLOAD_TIMEOUT) as dl:
                await page.click('input.fbtn[name="download_template"]')
            download = await dl.value
            await download.save_as(out_path)
            await page.wait_for_timeout(1000)

            if out_path.exists() and out_path.stat().st_size > 0:
                size_kb = out_path.stat().st_size // 1024
                log(f"✓ Saved {size_kb} KB")
                return {"id": device_id, "success": True, "message": f"{size_kb} KB", "size_kb": size_kb}
            return {"id": device_id, "success": False, "message": "File empty after download"}

        except PWTimeout:
            log("✗ TIMEOUT")
            return {"id": device_id, "success": False, "message": "TIMEOUT"}
        except Exception as e:
            log(f"✗ ERROR: {str(e)[:80]}")
            return {"id": device_id, "success": False, "message": str(e)[:120]}
        finally:
            await context.close()
            await browser.close()
|
|
|
|
|
|
async def _upload_one(device: dict, reboot: bool, job_id: str) -> dict:
    """Upload ``<device id>.xml`` from UPLOAD_DIR to one device via its web UI.

    Args:
        device: Device dict with at least id, ip, username and password.
        reboot: Whether the "reboot after upload" option should be ticked.
            When True, success means a reboot signal was observed.
        job_id: Job whose log list receives progress messages.

    Returns:
        A result dict {id, success, message}.
    """
    from playwright.async_api import async_playwright, TimeoutError as PWTimeout

    device_id = device["id"]
    template_file = UPLOAD_DIR / f"{device_id}.xml"

    def log(msg):
        jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}")

    if not template_file.exists():
        log("✗ Template file not found")
        return {"id": device_id, "success": False, "message": "Template file not found"}

    # The TCP probe blocks for up to REACH_TIMEOUT seconds; run it in a worker
    # thread so other devices and HTTP requests keep being served meanwhile.
    if not await asyncio.to_thread(_is_reachable, device["ip"]):
        log(f"✗ Unreachable (no TCP response on :443 within {REACH_TIMEOUT}s)")
        return {"id": device_id, "success": False, "message": "Unreachable"}

    # A comma-separated list only works for CSS selectors; "text=A, text=B" is
    # read as ONE literal text and never matches. :text() is the CSS-compatible
    # form, so these lists match any one of the messages.
    # NOTE(review): the bare word "reboot" was dropped from the reboot list —
    # now that the selector matches, it would presumably also hit the label of
    # the reboot checkbox already on screen; confirm against the device UI.
    reboot_selector = ':text("Rebooting"), :text("Upload Complete"), :text("Apply Complete")'
    done_selector = ':text("Upload Complete"), :text("Apply Complete"), :text("Template Applied")'

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(ignore_https_errors=True, accept_downloads=True)
        page = await context.new_page()
        page.set_default_timeout(PAGE_TIMEOUT)
        # Set right before the upload click; connection errors only count as
        # "device is rebooting" after this point.
        upload_started = False

        async def _any_reboot_signal() -> bool:
            """Race the reboot indicators; True as soon as one is observed.

            Different firmware versions signal reboot differently:
              - URL navigates to /?reboot
              - Page shows a reboot/success message
            Returns False when every waiter timed out. A waiter failing with
            anything other than a timeout re-raises that error so the caller
            can check whether the page is gone.
            """
            waiters = [
                asyncio.ensure_future(page.wait_for_url("**/?reboot**", timeout=UPLOAD_TIMEOUT)),
                asyncio.ensure_future(page.wait_for_url("**reboot**", timeout=UPLOAD_TIMEOUT)),
                asyncio.ensure_future(page.wait_for_selector(reboot_selector, timeout=UPLOAD_TIMEOUT)),
            ]
            try:
                pending = set(waiters)
                while pending:
                    finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                    for task in finished:
                        if task.cancelled():
                            continue
                        error = task.exception()
                        if error is None:
                            return True
                        # A timed-out waiter is not a signal; keep waiting on
                        # the others instead of reporting success.
                        if not isinstance(error, PWTimeout):
                            raise error
                return False
            finally:
                for task in waiters:
                    task.cancel()
                # Collect results so no "exception was never retrieved" warnings remain.
                await asyncio.gather(*waiters, return_exceptions=True)

        try:
            if not await _login_and_open_modal(page, device, log):
                return {"id": device_id, "success": False, "message": "Modal did not open"}

            await page.locator('input[id="template_filename"]').set_input_files(str(template_file))
            log(f"File set: {template_file.name}")

            # Bring the reboot checkbox in line with the requested option.
            reboot_cb = page.locator('input[name="reboot_after_upload"]')
            is_checked = await reboot_cb.is_checked()
            if reboot and not is_checked:
                await reboot_cb.check()
            elif not reboot and is_checked:
                await reboot_cb.uncheck()

            page.on("dialog", lambda d: asyncio.ensure_future(d.dismiss()))
            log("Uploading...")

            upload_started = True
            await page.click('input[name="upload_template"]')

            if not reboot:
                await page.wait_for_selector(done_selector, timeout=UPLOAD_TIMEOUT)
                log("✓ Upload complete (no reboot).")
                return {"id": device_id, "success": True, "message": "OK"}

            reboot_confirmed = False
            # Small grace period so the Playwright timeouts fire first and the
            # outer wait_for is only a safety net.
            deadline = UPLOAD_TIMEOUT / 1000 + 5  # seconds
            try:
                reboot_confirmed = await asyncio.wait_for(_any_reboot_signal(), timeout=deadline)
            except Exception:
                reboot_confirmed = False

            if not reboot_confirmed:
                # Connection drop or page crash after clicking upload
                # is itself strong evidence the device is rebooting.
                try:
                    await page.title()
                except Exception:
                    reboot_confirmed = True  # page is gone = rebooted

            if reboot_confirmed:
                log("✓ Rebooting — upload successful.")
                return {"id": device_id, "success": True, "message": "OK"}

            log("✗ Reboot signal not detected after upload")
            return {"id": device_id, "success": False, "message": "Reboot not confirmed"}

        except PWTimeout:
            log("✗ TIMEOUT")
            return {"id": device_id, "success": False, "message": "TIMEOUT"}
        except Exception as e:
            err = str(e).lower()
            # Connection errors AFTER upload started = device rebooted. The
            # same errors during login must not be reported as success.
            if reboot and upload_started and any(
                k in err for k in ("connection", "net::", "closed", "crashed", "target", "disconnected")
            ):
                log("✓ Connection closed — device is rebooting.")
                return {"id": device_id, "success": True, "message": "OK"}
            log(f"✗ ERROR: {str(e)[:80]}")
            return {"id": device_id, "success": False, "message": str(e)[:120]}
        finally:
            await context.close()
            await browser.close()
|
|
|
|
|
|
async def _run_with_retry(fn, device, job_id, **kwargs):
    """Run the per-device worker *fn* until it succeeds or attempts run out.

    Args:
        fn: Coroutine function called as ``fn(device, job_id=job_id, **kwargs)``
            and returning a result dict with a "success" key.
        device: Device dict; its "id" is used in log lines and results.
        job_id: Job whose log list receives progress lines and whose presence
            in ``abort_flags`` stops further attempts.

    Returns:
        The first successful result, otherwise the last failed one, or an
        "Aborted" failure result if the job was aborted.
    """
    device_id = device["id"]
    # Always make at least one attempt, even if MAX_RETRIES is set to 0 or less
    # (otherwise the loop would not run and there would be no result to return).
    attempts = max(1, MAX_RETRIES)

    def note(msg):
        jobs[job_id]["logs"].append(f"[{datetime.now().strftime('%H:%M:%S')}] [{device_id}] {msg}")

    def aborted():
        note("✗ Aborted.")
        return {"id": device_id, "success": False, "message": "Aborted"}

    result = {"id": device_id, "success": False, "message": "Not attempted"}
    for attempt in range(1, attempts + 1):
        # Check abort flag before each attempt
        if job_id in abort_flags:
            return aborted()

        note(f"Attempt {attempt}/{attempts}")
        try:
            result = await fn(device, job_id=job_id, **kwargs)
        except Exception as e:
            # A crash in the worker (e.g. the browser failing to launch) is a
            # failed attempt, not a reason to take down the whole job.
            note(f"✗ ERROR: {str(e)[:80]}")
            result = {"id": device_id, "success": False, "message": str(e)[:120]}

        if result["success"]:
            return result

        if attempt < attempts:
            if job_id in abort_flags:
                return aborted()
            note("Retrying in 15s...")
            # Poll abort flag every second so abort is responsive
            for _ in range(15):
                if job_id in abort_flags:
                    break
                await asyncio.sleep(1)

    return result
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Background job runners
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
async def run_download_job(job_id: str, devices: list, concurrency: int):
    """Background task: download templates from *devices* for job *job_id*.

    At most *concurrency* devices are processed at once. When all devices are
    finished the job is marked done, a report is written to DOWNLOAD_DIR, and
    the job is persisted and old jobs are pruned from memory.
    """
    jobs[job_id]["status"] = "running"
    # Semaphore(0) would block every worker forever and negative values raise,
    # so clamp to at least one slot.
    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def _guarded(device):
        async with semaphore:
            try:
                return await _run_with_retry(_download_one, device, job_id)
            except Exception as e:
                # One crashing device must not leave the job stuck "running".
                return {"id": device["id"], "success": False, "message": str(e)[:120]}

    results = await asyncio.gather(*[_guarded(d) for d in devices])
    jobs[job_id]["results"] = list(results)
    jobs[job_id]["status"] = "done"
    jobs[job_id]["finished"] = datetime.now().isoformat()
    abort_flags.discard(job_id)

    # The report is a convenience; failing to write it must not prevent the
    # job from being persisted.
    try:
        report_path = _write_report(job_id, jobs[job_id], DOWNLOAD_DIR)
        jobs[job_id]["logs"].append(
            f"[{datetime.now().strftime('%H:%M:%S')}] Report saved → {report_path.name}"
        )
    except Exception as e:
        jobs[job_id]["logs"].append(
            f"[{datetime.now().strftime('%H:%M:%S')}] Report could not be written: {e}"
        )
    _persist_job(job_id)
    _prune_jobs_memory()
|
|
|
|
|
|
async def run_upload_job(job_id: str, devices: list, concurrency: int, reboot: bool):
    """Background task: upload templates to *devices* for job *job_id*.

    At most *concurrency* devices are processed at once; *reboot* is passed
    through to each upload. When all devices are finished the job is marked
    done, a report is written to UPLOAD_DIR, and the job is persisted and old
    jobs are pruned from memory.
    """
    jobs[job_id]["status"] = "running"
    # Semaphore(0) would block every worker forever and negative values raise,
    # so clamp to at least one slot.
    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def _guarded(device):
        async with semaphore:
            try:
                return await _run_with_retry(_upload_one, device, job_id, reboot=reboot)
            except Exception as e:
                # One crashing device must not leave the job stuck "running".
                return {"id": device["id"], "success": False, "message": str(e)[:120]}

    results = await asyncio.gather(*[_guarded(d) for d in devices])
    jobs[job_id]["results"] = list(results)
    jobs[job_id]["status"] = "done"
    jobs[job_id]["finished"] = datetime.now().isoformat()
    abort_flags.discard(job_id)

    # The report is a convenience; failing to write it must not prevent the
    # job from being persisted.
    try:
        report_path = _write_report(job_id, jobs[job_id], UPLOAD_DIR)
        jobs[job_id]["logs"].append(
            f"[{datetime.now().strftime('%H:%M:%S')}] Report saved → {report_path.name}"
        )
    except Exception as e:
        jobs[job_id]["logs"].append(
            f"[{datetime.now().strftime('%H:%M:%S')}] Report could not be written: {e}"
        )
    _persist_job(job_id)
    _prune_jobs_memory()
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# API models
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class JobRequest(BaseModel):
    """Request body for starting a download or upload job."""

    # NocoDB group to load devices from ("All", "Electric", "Gas & Water").
    group: str = "All"
    # Restrict the job to these device ids; an empty list means every device.
    device_ids: list[str] = []
    # Number of devices processed in parallel.
    concurrency: int = 1
    # Upload jobs only: tick "reboot after upload" on the device.
    reboot: bool = True
|
|
|
|
|
|
class BuildRequest(BaseModel):
    """Request body for the XML builder."""

    # Where device data comes from.
    source: str  # "nocodb" or "excel"
    # NocoDB group to read.
    group: str = "All"
    # Devices to generate XML files for.
    hostnames: list[str]
    # Which template to render.
    template_name: str  # filename in xml_templates/
    # Text inserted for variables that have no value.
    default_value: str = "NOTSET"
|
|
|
|
|
|
class PresetRequest(BaseModel):
    """Request body carrying a named list of commands (a preset)."""

    # Name of the preset.
    name: str
    # Command strings belonging to the preset.
    commands: list[str]
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# API routes — devices & jobs (unchanged)
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
@app.get("/api/devices")
def api_devices(request: Request, group: str = "All", _auth=Depends(require_auth)):
    """Return the devices in ``group`` and their count, or an error payload if loading fails."""
    try:
        found = load_devices(group)
        payload = {"devices": found, "count": len(found)}
        return payload
    except Exception as exc:
        # Failures are reported in-band with an empty device list.
        return {"error": str(exc), "devices": [], "count": 0}
|
|
|
|
|
|
@app.post("/api/jobs/download")
async def start_download(request: Request, req: JobRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)):
    """
    Queue a background download job for the requested devices.

    Loads the devices of ``req.group``, narrows them to ``req.device_ids`` when
    that list is non-empty, registers a job record in ``jobs`` and schedules
    ``run_download_job``.

    Returns:
        ``{"job_id": ...}`` on success, or ``{"error": ...}`` when no device matches.
    """
    devices = load_devices(req.group)
    if req.device_ids:
        wanted = set(req.device_ids)  # O(1) membership instead of scanning the list per device
        devices = [d for d in devices if d["id"] in wanted]
    if not devices:
        return {"error": "No matching devices found"}

    # The timestamp only has one-second resolution; add a numeric suffix so a
    # second job started in the same second cannot overwrite the first record.
    base_id = f"dl_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    job_id = base_id
    attempt = 1
    while job_id in jobs:
        attempt += 1
        job_id = f"{base_id}_{attempt}"

    jobs[job_id] = {
        "type": "download", "status": "queued",
        "logs": [], "results": [],
        "started": datetime.now().isoformat(), "finished": None,
        "device_count": len(devices),
    }
    background_tasks.add_task(run_download_job, job_id, devices, req.concurrency)
    return {"job_id": job_id}
|
|
|
|
|
|
@app.post("/api/jobs/upload")
async def start_upload(request: Request, req: JobRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)):
    """
    Queue a background upload job for devices that have a generated template.

    Loads the devices of ``req.group``, narrows them to ``req.device_ids`` when
    that list is non-empty, keeps only devices with ``<id>.xml`` present in
    UPLOAD_DIR, registers a job record in ``jobs`` and schedules ``run_upload_job``.

    Returns:
        ``{"job_id": ...}`` on success, or ``{"error": ...}`` when no device qualifies.
    """
    devices = load_devices(req.group)
    if req.device_ids:
        wanted = set(req.device_ids)  # O(1) membership instead of scanning the list per device
        devices = [d for d in devices if d["id"] in wanted]
    devices = [d for d in devices if (UPLOAD_DIR / f"{d['id']}.xml").exists()]
    if not devices:
        return {"error": "No devices with matching template files found in template_uploads/"}

    # The timestamp only has one-second resolution; add a numeric suffix so a
    # second job started in the same second cannot overwrite the first record.
    base_id = f"ul_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    job_id = base_id
    attempt = 1
    while job_id in jobs:
        attempt += 1
        job_id = f"{base_id}_{attempt}"

    jobs[job_id] = {
        "type": "upload", "status": "queued",
        "logs": [], "results": [],
        "started": datetime.now().isoformat(), "finished": None,
        "device_count": len(devices),
        "reboot": req.reboot,
    }
    background_tasks.add_task(run_upload_job, job_id, devices, req.concurrency, req.reboot)
    return {"job_id": job_id}
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}")
def get_job(request: Request, job_id: str, _auth=Depends(require_auth)):
    """Return the in-memory record for ``job_id``, or an error payload if it is unknown."""
    try:
        return jobs[job_id]
    except KeyError:
        return {"error": "Job not found"}
|
|
|
|
|
|
@app.get("/api/jobs/{job_id}/stream")
async def stream_job(request: Request, job_id: str, from_line: int = 0, _auth=Depends(require_auth)):
    """
    Stream a job's log lines as server-sent events.

    ``?from_line=N`` skips the first N log lines (negative values count as 0).
    The stream ends with a ``done`` event carrying the results once the job
    status is "done", or with an error event if the job is not in ``jobs``.
    """
    async def event_generator():
        cursor = from_line if from_line > 0 else 0
        while True:
            if job_id not in jobs:
                yield "data: {\"error\": \"job not found\"}\n\n"
                return
            job = jobs[job_id]
            # Snapshot the unseen tail so lines appended during a yield are
            # picked up on the next pass rather than mid-iteration.
            pending = job["logs"][cursor:]
            for offset, line in enumerate(pending):
                yield f"data: {json.dumps({'log': line, 'line_num': cursor + offset})}\n\n"
            cursor += len(pending)
            if job["status"] == "done":
                yield f"data: {json.dumps({'done': True, 'results': job['results']})}\n\n"
                return
            await asyncio.sleep(0.5)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
|
|
|
|
|
|
@app.get("/api/jobs")
def list_jobs(request: Request, _auth=Depends(require_auth)):
    """Return a summary of every in-memory job, ordered by job ID descending."""
    def _summarise(jid, job):
        # One flag per result: True when the device result reports success.
        outcomes = [bool(r.get("success")) for r in job["results"]]
        succeeded = sum(outcomes)
        return {
            "job_id": jid, "type": job["type"], "status": job["status"],
            "device_count": job["device_count"], "started": job["started"],
            "finished": job.get("finished"),
            "succeeded": succeeded,
            "failed": len(outcomes) - succeeded,
        }

    ordered = sorted(jobs.items(), reverse=True)
    return {"jobs": [_summarise(jid, job) for jid, job in ordered]}
|
|
|
|
|
|
@app.get("/api/files/downloads")
def list_downloads(request: Request, _auth=Depends(require_auth)):
    """List the ``*.xml`` files in DOWNLOAD_DIR with their size in whole KiB."""
    listing = []
    for xml_file in sorted(DOWNLOAD_DIR.glob("*.xml")):
        listing.append({"name": xml_file.name, "size_kb": xml_file.stat().st_size // 1024})
    return {"files": listing}
|
|
|
|
|
|
@app.get("/api/files/uploads")
def list_uploads(request: Request, _auth=Depends(require_auth)):
    """List the ``*.xml`` files in UPLOAD_DIR with their size in whole KiB."""
    listing = []
    for xml_file in sorted(UPLOAD_DIR.glob("*.xml")):
        listing.append({"name": xml_file.name, "size_kb": xml_file.stat().st_size // 1024})
    return {"files": listing}
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# API routes — XML Builder
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
@app.get("/api/xmlbuilder/nocodb-fields")
def xmlbuilder_nocodb_fields(request: Request, _auth=Depends(require_auth)):
    """Expose the NocoDB field names, which double as the available template variables."""
    payload = {"fields": NOCODB_FIELDS}
    return payload
|
|
|
|
|
|
@app.get("/api/xmlbuilder/nocodb-devices")
def xmlbuilder_nocodb_devices(request: Request, group: str = "All", _auth=Depends(require_auth)):
    """Return the non-empty hostnames of the NocoDB devices in ``group``."""
    try:
        hostnames = []
        for record in load_devices_full(group):
            name = record["hostname"]
            if name:
                hostnames.append(name)
        return {"hostnames": hostnames, "count": len(hostnames)}
    except Exception as exc:
        # Failures are reported in-band with an empty hostname list.
        return {"error": str(exc), "hostnames": []}
|
|
|
|
|
|
@app.post("/api/xmlbuilder/upload-excel")
async def xmlbuilder_upload_excel(request: Request, file: UploadFile = File(...), _auth=Depends(require_auth)):
    """
    Accept an uploaded Excel file and return its columns and hostnames.

    The raw bytes are kept via ``_excel_cache_set`` under the uploaded filename;
    this handler itself writes nothing to disk. Column names are returned with
    surrounding whitespace and ``{}`` stripped. Hostnames come from the first
    column whose name contains "hostname" (case-insensitive), if any.

    Returns:
        ``{"filename", "columns", "hostnames", "rows"}`` on success, or
        ``{"error": ...}`` if the file cannot be read or parsed.
    """
    try:
        contents = await file.read()
        df = pd.read_excel(io.BytesIO(contents))
        # Header cells can be numbers or dates, so coerce every label to str
        # before applying string methods; otherwise one such header fails the
        # whole upload with AttributeError.
        columns = [str(c).strip().strip('{}') for c in df.columns]
        hostname_col = next((c for c in df.columns if "hostname" in str(c).lower()), None)
        if hostname_col is not None:
            hostnames = df[hostname_col].dropna().astype(str).tolist()
        else:
            hostnames = []
        _excel_cache_set(file.filename, contents)
        return {
            "filename": file.filename,
            "columns": columns,
            "hostnames": hostnames,
            "rows": len(df),
        }
    except Exception as e:
        # Parse/IO failures are reported in-band so the UI can show the message.
        return {"error": str(e)}
|
|
|
|
|
|
@app.get("/api/xmlbuilder/templates")
def xmlbuilder_list_templates(request: Request, _auth=Depends(require_auth)):
    """List the XML templates in xml_templates/ with size and extracted variables."""
    def _describe(template_file):
        # Undecodable bytes are replaced rather than failing the whole listing.
        text = template_file.read_text(encoding="utf-8", errors="replace")
        found_vars = _extract_template_variables(text)
        return {
            "name": template_file.name,
            "size_kb": template_file.stat().st_size // 1024,
            "variables": found_vars,
        }

    return {"templates": [_describe(f) for f in sorted(TEMPLATES_DIR.glob("*.xml"))]}
|
|
|
|
|
|
@app.post("/api/xmlbuilder/upload-template")
async def xmlbuilder_upload_template(request: Request, file: UploadFile = File(...), _auth=Depends(require_auth)):
    """
    Save an uploaded XML template into xml_templates/.

    Only the final path component of the client-supplied filename is used, so
    the file is always written directly inside TEMPLATES_DIR.

    Returns:
        ``{"name", "variables", "saved": True}`` on success, or
        ``{"error": ...}`` if the filename is unusable or the write fails.
    """
    try:
        # The filename is client-controlled: drop any directory part (including
        # Windows-style separators) so "../" or absolute paths cannot escape
        # TEMPLATES_DIR.
        safe_name = Path((file.filename or "").replace("\\", "/")).name
        if not safe_name or safe_name == "..":
            return {"error": "Invalid template filename"}
        contents = await file.read()
        dest = TEMPLATES_DIR / safe_name
        dest.write_bytes(contents)
        text = contents.decode("utf-8", errors="replace")
        variables = _extract_template_variables(text)
        return {"name": safe_name, "variables": variables, "saved": True}
    except Exception as e:
        # IO failures are reported in-band so the UI can show the message.
        return {"error": str(e)}
|
|
|
|
|
|
@app.post("/api/xmlbuilder/generate/nocodb")
def xmlbuilder_generate_nocodb(request: Request, req: BuildRequest, _auth=Depends(require_auth)):
    """
    Generate XMLs from NocoDB data and save to template_uploads/.

    ``req.template_name`` must be a plain filename inside xml_templates/;
    anything containing a directory component is treated as not found.

    Returns:
        ``{"results", "succeeded", "failed"}``, or ``{"error": ...}`` when the
        template cannot be found.
    """
    # Reject names with path components so a request cannot read files outside
    # TEMPLATES_DIR (e.g. "../../.env").
    safe_name = Path(req.template_name.replace("\\", "/")).name
    template_path = TEMPLATES_DIR / safe_name
    name_is_plain = bool(safe_name) and safe_name != ".." and safe_name == req.template_name
    if not name_is_plain or not template_path.is_file():
        return {"error": f"Template '{req.template_name}' not found in xml_templates/"}
    template = template_path.read_text(encoding="utf-8")
    results = build_xmls_from_nocodb(req.group, req.hostnames, template, req.default_value)
    ok = sum(1 for r in results if r["success"])
    err = len(results) - ok
    return {"results": results, "succeeded": ok, "failed": err}
|
|
|
|
|
|
@app.post("/api/xmlbuilder/generate/excel")
async def xmlbuilder_generate_excel(
    request: Request,
    hostnames: str = Form(...),           # JSON array string
    template_name: str = Form(...),
    excel_filename: str = Form(...),      # key into excel_cache
    default_value: str = Form("NOTSET"),
    excel_file: Optional[UploadFile] = File(None),
    _auth=Depends(require_auth),
):
    """
    Generate XMLs from Excel data and save to template_uploads/.

    The workbook bytes come from ``excel_file`` when one is sent with the
    request (and are then cached under ``excel_filename``); otherwise they are
    looked up in the Excel cache by ``excel_filename``. ``template_name`` must
    be a plain filename inside xml_templates/.

    Returns:
        ``{"results", "succeeded", "failed"}``, or ``{"error": ...}`` when the
        template, the workbook or the hostname list is missing or invalid.
    """
    # Reject names with path components so a request cannot read files outside
    # TEMPLATES_DIR.
    safe_name = Path(template_name.replace("\\", "/")).name
    template_path = TEMPLATES_DIR / safe_name
    name_is_plain = bool(safe_name) and safe_name != ".." and safe_name == template_name
    if not name_is_plain or not template_path.is_file():
        return {"error": f"Template '{template_name}' not found in xml_templates/"}
    template = template_path.read_text(encoding="utf-8")

    # Get Excel bytes — from fresh upload or cache
    if excel_file:
        excel_bytes = await excel_file.read()
        _excel_cache_set(excel_filename, excel_bytes)
    else:
        excel_bytes = _excel_cache_get(excel_filename)
        if excel_bytes is None:
            return {"error": "Excel file not found or session expired. Please re-upload it."}

    # ``hostnames`` arrives as a form string; report malformed input in-band
    # instead of letting the decode error surface as a 500.
    try:
        host_list = json.loads(hostnames)
    except ValueError:
        return {"error": "hostnames must be a JSON array"}
    if not isinstance(host_list, list):
        return {"error": "hostnames must be a JSON array"}

    results = build_xmls_from_excel(excel_bytes, template, host_list, default_value)
    ok = sum(1 for r in results if r["success"])
    err = len(results) - ok
    return {"results": results, "succeeded": ok, "failed": err}
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Abort endpoint
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
@app.post("/api/jobs/{job_id}/abort")
def abort_job(request: Request, job_id: str, _auth=Depends(require_auth)):
    """Flag ``job_id`` for abort and note the request in the job's log."""
    if job_id not in jobs:
        return {"error": "Job not found"}
    target = jobs[job_id]
    if target["status"] == "done":
        return {"error": "Job already finished"}

    abort_flags.add(job_id)
    stamp = datetime.now().strftime('%H:%M:%S')
    target["logs"].append(
        f"[{stamp}] ⚠ Abort requested — stopping after current device..."
    )
    return {"aborted": True, "job_id": job_id}
|
|
|
|
|
|
@app.post("/api/jobs/abort-all")
def abort_all_jobs(request: Request, _auth=Depends(require_auth)):
    """Flag every running or queued job for abort and return the affected job IDs."""
    stopped = []
    for jid, record in jobs.items():
        if record["status"] not in ("running", "queued"):
            continue
        abort_flags.add(jid)
        stamp = datetime.now().strftime('%H:%M:%S')
        record["logs"].append(f"[{stamp}] ⚠ Abort-all requested.")
        stopped.append(jid)
    return {"aborted": stopped, "count": len(stopped)}
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# AT Terminal — SSH command execution
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class ATRequest(BaseModel):
    """Request body for starting an AT command session."""

    # Empty = all devices in group
    device_ids: list[str] = []
    # Device group handed to load_devices().
    group: str = "All"
    # List of AT commands to send in order
    commands: list[str]
    # Optional SSH credentials; when blank the per-device values are used.
    ssh_username: str = ""
    ssh_password: str = ""
    # Number of worker threads used for the session.
    concurrency: int = 3
|
|
|
|
|
|
def _ssh_send_at_commands(
    device: dict,
    commands: list[str],
    ssh_username: str,
    ssh_password: str,
    session_id: str,
) -> dict:
    """
    Open an SSH session to one device on SSH_PORT, send AT commands one at a
    time, capture each response and return a result dict.

    Progress is appended to ``at_sessions[session_id]["logs"]`` as dicts with
    ``ts``, ``device``, ``msg`` and ``level`` keys.

    Args:
        device: Device dict; ``id`` and ``ip`` are required, ``username`` and
            ``password`` are used when the explicit credentials are blank.
        commands: AT commands to send, in order. Sending stops after ``ATZ``.
        ssh_username: Overrides the device's username when non-empty.
        ssh_password: Overrides the device's password when non-empty.
        session_id: Key of the AT session whose log receives the output.

    Returns:
        ``{"id", "success", "message", "outputs"}`` where ``outputs`` is a list
        of ``{"command", "output"}`` dicts. ``success`` is False when the
        connection fails or an exception interrupts the session.
    """
    import socket
    import time

    device_id = device["id"]
    ip = device["ip"]
    username = ssh_username or device.get("username", "user")
    password = ssh_password or device.get("password", "")

    def log(msg, level="info"):
        at_sessions[session_id]["logs"].append({
            "ts": datetime.now().strftime('%H:%M:%S'), "device": device_id, "msg": msg, "level": level
        })

    log(f"Connecting SSH → {ip}:{SSH_PORT}")
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        ssh.connect(
            ip,
            port=SSH_PORT,
            username=username,
            password=password,
            timeout=15,
            look_for_keys=False,
            allow_agent=False,
            disabled_algorithms={"pubkeys": [], "keys": []},
        )
    except Exception as e:
        log(f"✗ Connection failed: {str(e)[:80]}", "error")
        ssh.close()  # release the client even though the connection never came up
        return {"id": device_id, "success": False, "message": str(e)[:120], "outputs": []}

    outputs = []
    all_ok = True
    # Bound before the try so the cleanup below never touches an unset name
    # when invoke_shell() itself fails.
    chan = None

    try:
        # Use an interactive shell channel so the modem's AT prompt stays alive
        chan = ssh.invoke_shell(term="vt100", width=200, height=50)
        chan.settimeout(10)

        # Drain any banner/login text the modem sends on connect.
        # NOTE(review): this loop only ends on a closed channel or when recv()
        # hits the 10 s channel timeout, so it can add up to 10 s per device.
        time.sleep(1.0)
        try:
            while chan.recv(4096):
                pass
        except socket.timeout:
            pass

        for cmd in commands:
            log(f"→ {cmd}", "cmd")
            chan.send(cmd + "\r")
            time.sleep(0.8)

            response_buf = b""
            deadline = time.time() + 8   # max 8 s per command
            try:
                while time.time() < deadline:
                    if not chan.recv_ready():
                        time.sleep(0.1)
                        continue
                    chunk = chan.recv(4096)
                    if not chunk:
                        break
                    response_buf += chunk
                    # Stop early if we see OK or ERROR
                    decoded_so_far = response_buf.decode("utf-8", errors="replace")
                    if "\nOK" in decoded_so_far or "\nERROR" in decoded_so_far:
                        time.sleep(0.1)
                        break
            except socket.timeout:
                pass

            response = response_buf.decode("utf-8", errors="replace").strip()
            # Strip the echoed command from the start if present
            if response.startswith(cmd):
                response = response[len(cmd):].strip()

            if response:
                for line in response.splitlines():
                    stripped = line.strip()
                    if stripped:
                        log(f" {stripped}", "data")
            else:
                log(" (no response)", "warn")

            # NOTE(review): an "ERROR" reply from the modem does not clear
            # all_ok; only an exception does.
            outputs.append({"command": cmd, "output": response})

            # If ATZ was sent the modem will reboot — nothing more to do
            if cmd.strip().upper() == "ATZ":
                log(" Modem rebooting.", "warn")
                break

    except Exception as e:
        log(f"✗ Session error: {str(e)[:80]}", "error")
        all_ok = False
        outputs.append({"command": "session", "output": str(e)})
    finally:
        if chan is not None:
            try:
                chan.close()
            except Exception:
                # Best-effort close: the channel may already be gone after ATZ.
                pass
        ssh.close()

    log(f"{'✓ Done' if all_ok else '✗ Finished with errors'}", "ok" if all_ok else "error")
    return {
        "id": device_id, "success": all_ok,
        "message": "OK" if all_ok else "Errors — see log",
        "outputs": outputs,
    }
|
|
|
|
|
|
def _run_at_session(session_id: str, devices: list, commands: list, ssh_username: str, ssh_password: str, concurrency: int):
    """
    Background thread: send AT commands to all selected devices.

    Runs ``_ssh_send_at_commands`` for each device on a thread pool, collects
    the per-device results in completion order, then marks the session "done",
    stamps ``finished`` and persists it.

    Args:
        session_id: Key of the session record in ``at_sessions``.
        devices: Device dicts to contact.
        commands: AT commands sent to every device.
        ssh_username: Credential override forwarded to each worker.
        ssh_password: Credential override forwarded to each worker.
        concurrency: Worker thread count; values below 1 are treated as 1.
    """
    import concurrent.futures

    at_sessions[session_id]["status"] = "running"
    results = []

    # ThreadPoolExecutor raises ValueError for max_workers <= 0, which would
    # kill this thread and leave the session unfinished forever.
    workers = max(1, concurrency)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(_ssh_send_at_commands, d, commands, ssh_username, ssh_password, session_id): d
            for d in devices
        }
        for future in concurrent.futures.as_completed(futures):
            try:
                results.append(future.result())
            except Exception as e:
                # A worker crash becomes a failed result for that device.
                d = futures[future]
                results.append({"id": d["id"], "success": False, "message": str(e), "outputs": []})

    at_sessions[session_id]["results"] = results
    at_sessions[session_id]["status"] = "done"
    at_sessions[session_id]["finished"] = datetime.now().isoformat()
    _persist_at_session(session_id)
|
|
|
|
|
|
@app.post("/api/at/send")
async def at_send(request: Request, req: ATRequest, background_tasks: BackgroundTasks, _auth=Depends(require_auth)):
    """
    Start an AT command session against one or more devices.

    Blank commands are dropped. The session record is registered in
    ``at_sessions`` and the work runs on a daemon thread.

    Returns:
        ``{"session_id", "device_count"}`` on success, or ``{"error": ...}``
        when no usable command or no matching device is left.
    """
    # Trim each command and drop the ones that are empty after trimming.
    commands = [stripped for stripped in (c.strip() for c in req.commands) if stripped]
    if not commands:
        return {"error": "No commands provided"}

    devices = load_devices(req.group)
    if req.device_ids:
        devices = [d for d in devices if d["id"] in req.device_ids]
    if not devices:
        return {"error": "No matching devices found"}

    session_id = f"at_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
    at_sessions[session_id] = {
        "status": "queued",
        "started": datetime.now().isoformat(),
        "finished": None,
        "device_count": len(devices),
        "commands": commands,
        "logs": [],
        "results": [],
    }

    # Run in a background thread (paramiko is blocking/sync)
    worker = threading.Thread(
        target=_run_at_session,
        args=(session_id, devices, commands, req.ssh_username, req.ssh_password, req.concurrency),
        daemon=True,
    )
    worker.start()

    return {"session_id": session_id, "device_count": len(devices)}
|
|
|
|
|
|
# ── AT preset helpers ──────────────────────────────────────────────────────────
|
|
|
|
def _read_presets() -> dict:
    """
    Load the saved AT presets from PRESETS_FILE.

    Returns:
        The stored mapping of preset name to command list, or an empty dict
        when the file is missing, unreadable, not valid JSON, or does not
        contain a JSON object.
    """
    if not PRESETS_FILE.exists():
        return {}
    try:
        loaded = json.loads(PRESETS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both malformed JSON and undecodable bytes.
        return {}
    # Callers use .items() and item assignment, so anything but an object
    # (e.g. a hand-edited list) is treated as "no presets".
    return loaded if isinstance(loaded, dict) else {}
|
|
|
|
|
|
def _write_presets(presets: dict):
    """Serialise ``presets`` as indented, non-ASCII-preserving JSON and overwrite PRESETS_FILE."""
    payload = json.dumps(presets, indent=2, ensure_ascii=False)
    PRESETS_FILE.write_text(payload, encoding="utf-8")
|
|
|
|
|
|
# Preset routes MUST be registered before the {session_id} wildcard routes below.
|
|
@app.get("/api/at/presets")
def at_list_presets(request: Request, _auth=Depends(require_auth)):
    """Return every saved preset as a ``{"name", "commands"}`` entry."""
    entries = []
    for preset_name, preset_commands in _read_presets().items():
        entries.append({"name": preset_name, "commands": preset_commands})
    return {"presets": entries}
|
|
|
|
|
|
@app.post("/api/at/presets")
def at_save_preset(request: Request, req: PresetRequest, _auth=Depends(require_auth)):
    """Create or overwrite a preset after trimming its name and dropping blank commands."""
    name = req.name.strip()
    commands = []
    for raw in req.commands:
        trimmed = raw.strip()
        if trimmed:
            commands.append(trimmed)

    if not name:
        return {"error": "Preset name cannot be empty"}
    if not commands:
        return {"error": "No commands provided"}

    stored = _read_presets()
    stored[name] = commands
    _write_presets(stored)
    return {"saved": True, "name": name, "commands": commands}
|
|
|
|
|
|
@app.delete("/api/at/presets/{name}")
def at_delete_preset(request: Request, name: str, _auth=Depends(require_auth)):
    """Remove the preset called ``name``, or return an error payload if it does not exist."""
    stored = _read_presets()
    try:
        del stored[name]
    except KeyError:
        return {"error": "Preset not found"}
    _write_presets(stored)
    return {"deleted": True, "name": name}
|
|
|
|
|
|
@app.get("/api/at/{session_id}/stream")
async def at_stream(request: Request, session_id: str, from_line: int = 0, _auth=Depends(require_auth)):
    """
    Stream an AT session's log entries as server-sent events.

    ``?from_line=N`` skips the first N entries (negative values count as 0).
    An unknown session yields a single error event. Otherwise the stream ends
    with a ``done`` event carrying the results once the session status is "done".
    """
    if session_id not in at_sessions:
        not_found = json.dumps({"error": "session not found"})
        return StreamingResponse(
            iter([f"data: {not_found}\n\n"]),
            media_type="text/event-stream"
        )

    async def event_generator():
        cursor = from_line if from_line > 0 else 0
        while True:
            if session_id not in at_sessions:
                yield f'data: {json.dumps({"error": "session gone"})}\n\n'
                return
            sess = at_sessions[session_id]
            # Snapshot the unseen tail; entries added during a yield are sent
            # on the next pass.
            pending = sess["logs"][cursor:]
            for entry in pending:
                yield f"data: {json.dumps({'log': entry})}\n\n"
            cursor += len(pending)
            if sess["status"] == "done":
                yield f"data: {json.dumps({'done': True, 'results': sess['results']})}\n\n"
                return
            await asyncio.sleep(0.4)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
|
|
|
|
|
|
@app.get("/api/at/{session_id}")
def at_get_session(request: Request, session_id: str, _auth=Depends(require_auth)):
    """Return the in-memory record for ``session_id``, or an error payload if it is unknown."""
    try:
        return at_sessions[session_id]
    except KeyError:
        return {"error": "Session not found"}
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Auth routes — login / logout
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
# Login page markup. Two literal placeholders are substituted with str.replace()
# by the login route: {error_block} (optional error banner) and {hours}
# (session lifetime shown in the footer).
LOGIN_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>RV50x Manager — Login</title>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap" rel="stylesheet">
<style>
  *, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
  body {
    background: #0a0c0f; color: #c8d8e8;
    font-family: 'Inter', sans-serif;
    min-height: 100vh; display: flex; align-items: center; justify-content: center;
  }
  body::before {
    content: ''; position: fixed; inset: 0;
    background: repeating-linear-gradient(0deg, transparent, transparent 2px, rgba(0,0,0,0.03) 2px, rgba(0,0,0,0.03) 4px);
    pointer-events: none;
  }
  .card {
    background: #10141a; border: 1px solid #1e2a38; border-radius: 8px;
    padding: 40px; width: 360px; box-shadow: 0 0 40px rgba(0,191,255,0.06);
    position: relative; z-index: 1;
  }
  .logo { display: flex; align-items: center; gap: 12px; margin-bottom: 32px; }
  .logo-icon {
    width: 32px; height: 32px; background: #00bfff; flex-shrink: 0;
    clip-path: polygon(50% 0%, 100% 25%, 100% 75%, 50% 100%, 0% 75%, 0% 25%);
  }
  .logo-text { font-size: 18px; font-weight: 700; letter-spacing: 2px; color: #00bfff; text-transform: uppercase; }
  .logo-sub { font-size: 10px; color: #3a5068; letter-spacing: 1px; margin-top: 2px; }
  .field { margin-bottom: 16px; }
  label { display: block; font-size: 11px; font-weight: 600; letter-spacing: 1.5px; color: #6a8aa8; text-transform: uppercase; margin-bottom: 6px; }
  input[type=text], input[type=password] {
    width: 100%; background: #161c25; border: 1px solid #1e2a38; color: #c8d8e8;
    font-family: 'Inter', sans-serif; font-size: 14px; padding: 10px 14px;
    border-radius: 4px; outline: none; transition: border-color 0.15s;
  }
  input[type=text]:focus, input[type=password]:focus { border-color: #00bfff; }
  .btn-login {
    width: 100%; background: #00bfff; border: none; color: #000;
    font-family: 'Inter', sans-serif; font-size: 14px; font-weight: 700;
    letter-spacing: 1px; padding: 11px; border-radius: 4px; cursor: pointer;
    margin-top: 8px; transition: background 0.15s; text-transform: uppercase;
  }
  .btn-login:hover { background: #33ccff; }
  .error {
    background: rgba(255,23,68,0.1); border: 1px solid rgba(255,23,68,0.3);
    color: #ff1744; font-size: 13px; padding: 10px 14px; border-radius: 4px; margin-bottom: 16px;
  }
  .footer { text-align: center; margin-top: 24px; font-size: 11px; color: #3a5068; letter-spacing: 0.5px; }
</style>
</head>
<body>
<div class="card">
  <div class="logo">
    <div class="logo-icon"></div>
    <div>
      <div class="logo-text">RV50x Manager</div>
      <div class="logo-sub">SIERRA WIRELESS // TEMPLATE OPS</div>
    </div>
  </div>
  {error_block}
  <form method="POST" action="/auth/login">
    <div class="field">
      <label>Username</label>
      <input type="text" name="username" autocomplete="username" autofocus required>
    </div>
    <div class="field">
      <label>Password</label>
      <input type="password" name="password" autocomplete="current-password" required>
    </div>
    <button class="btn-login" type="submit">Sign In</button>
  </form>
  <div class="footer">HTTPS secured · Session expires in {hours}h</div>
</div>
</body>
</html>"""
|
|
|
|
|
|
@app.get("/login", response_class=HTMLResponse)
def login_page(error: str = ""):
    """
    Render the login page, optionally with an error banner.

    Args:
        error: Message shown above the form; taken from the ``?error=`` query
            string, so it is HTML-escaped before being inserted.
    """
    # ``error`` is attacker-controllable via the URL; escape it so it cannot
    # inject markup or script into the page.
    error_block = f'<div class="error">⚠ {html_lib.escape(error)}</div>' if error else ""
    # Fill {hours} first so text inside the error banner is never re-substituted.
    page = LOGIN_HTML.replace("{hours}", str(SESSION_HOURS))
    return HTMLResponse(page.replace("{error_block}", error_block))
|
|
|
|
|
|
@app.post("/auth/login")
async def auth_login(username: str = Form(...), password: str = Form(...)):
    """
    Check the submitted credentials and start a session.

    Returns:
        A 303 redirect to "/" with the session cookie set when both values
        match APP_USERNAME / APP_PASSWORD, otherwise a 303 redirect back to
        the login page with an error message.
    """
    import hmac

    def _matches(supplied: str, expected) -> bool:
        # Constant-time comparison so response timing does not reveal how much
        # of a credential was correct. Non-string expectations never match,
        # as with the plain equality check this replaces.
        if not isinstance(expected, str):
            return False
        return hmac.compare_digest(
            supplied.encode("utf-8", errors="surrogatepass"),
            expected.encode("utf-8", errors="surrogatepass"),
        )

    # Evaluate both checks unconditionally to avoid short-circuit timing.
    user_ok = _matches(username, APP_USERNAME)
    pass_ok = _matches(password, APP_PASSWORD)
    if user_ok and pass_ok:
        token = _make_session_cookie()
        resp = RedirectResponse(url="/", status_code=303)
        # NOTE(review): secure=False lets the cookie travel over plain HTTP,
        # while the login page footer says "HTTPS secured" — confirm intended.
        resp.set_cookie(
            key=SESSION_COOKIE, value=token,
            httponly=True, secure=False, samesite="lax",
            max_age=SESSION_HOURS * 3600,
        )
        return resp
    return RedirectResponse(url="/login?error=Invalid+username+or+password", status_code=303)
|
|
|
|
|
|
@app.get("/auth/logout")
def auth_logout():
    """Clear the session cookie and send the browser back to the login page."""
    redirect = RedirectResponse(url="/login", status_code=303)
    redirect.delete_cookie(SESSION_COOKIE)
    return redirect
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# Serve frontend (auth protected)
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def index(request: Request, _auth=Depends(require_auth)):
    """Serve index.html from SCRIPT_DIR to authenticated users."""
    page_source = (SCRIPT_DIR / "index.html").read_text(encoding="utf-8")
    return HTMLResponse(page_source)
|