c8de2620c8
Serialised logins now sleep `login_delay` seconds between each SSH auth to prevent AD/LDAP lockout. Both max sessions (1-10) and login delay (0-15s) are configurable via UI sliders in the header and passed as JSON to all scan endpoints. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
258 lines
8.6 KiB
Python
258 lines
8.6 KiB
Python
# app.py - Flask backend
|
|
import logging
|
|
import threading
|
|
import os
|
|
from flask import Flask, jsonify, send_file, render_template, request
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
|
|
import db
|
|
from db import save_node_positions, get_node_positions, clear_node_positions
|
|
import scanner
|
|
from config import EXPORTS_DIR
|
|
|
|
# Root logging: timestamp, level and message at INFO and above.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

app = Flask(__name__, static_folder='static', template_folder='templates')

# --- Scheduler setup ---
# The scheduler is created and started as soon as this module is imported;
# the auto-scan job itself is only added later by _reschedule().
apscheduler = BackgroundScheduler()
apscheduler.start()

# Fixed job id so the auto-scan job can be looked up, replaced or removed.
_scheduler_job_id = "autoscan"
|
|
|
|
|
|
def _reschedule(interval_minutes):
    """Install the auto-scan interval job, replacing any existing one.

    interval_minutes may be an int or a numeric string; it is converted
    with int() before being handed to the scheduler.
    """
    existing_job = apscheduler.get_job(_scheduler_job_id)
    if existing_job:
        apscheduler.remove_job(_scheduler_job_id)

    job_options = {
        "minutes": int(interval_minutes),
        "id": _scheduler_job_id,
        "replace_existing": True,
    }
    # Scheduled runs always cover every switch (no department filter).
    apscheduler.add_job(
        lambda: _trigger_scan_background(dept=None),
        'interval',
        **job_options,
    )
    logger.info(f"Auto-scan scheduled every {interval_minutes} minutes")
|
|
|
|
|
|
def _trigger_scan_background(dept: str = None, workers: int = 5, login_delay: int = 3):
    """Start scanner.run_scan on a daemon thread unless a scan is running.

    dept=None means no department filter (all switches). The workers and
    login_delay values are forwarded unchanged to scanner.run_scan.
    """
    if scanner.scan_state["running"]:
        # A scan is already in progress; do nothing.
        return

    scan_kwargs = {
        "dept": dept,
        "workers": workers,
        "login_delay": login_delay,
    }
    scan_thread = threading.Thread(
        target=scanner.run_scan,
        kwargs=scan_kwargs,
        daemon=True,
    )
    scan_thread.start()
|
|
|
|
|
|
# Restore scheduler state on startup: initialise the database, then
# re-create the auto-scan job if it was enabled in the stored settings.
db.init_db()
if db.get_setting("autoscan_enabled") == "true":
    # Fall back to 60 minutes when no interval has been stored.
    interval = db.get_setting("autoscan_interval") or "60"
    _reschedule(interval)
|
|
|
|
|
|
# --- API Routes ---
|
|
|
|
@app.route("/")
def index():
    """Serve the single-page UI rendered from templates/index.html."""
    page = render_template("index.html")
    return page
|
|
|
|
|
|
def _clamped_int(value, default, low, high):
    """Return *value* as an int clamped to [low, high].

    Falls back to *default* (also clamped) when *value* cannot be
    converted, e.g. a non-numeric string or JSON null.
    """
    try:
        number = int(value)
    except (TypeError, ValueError, OverflowError):
        number = default
    return max(low, min(high, number))


def _scan_params():
    """Read the scan tuning parameters from the request's JSON body.

    Returns:
        (workers, login_delay): workers clamped to 1..10 (default 5) and
        login_delay clamped to 0..30 seconds (default 3).

    A missing, unparsable or non-object body, or a value that is not
    numeric, yields the defaults instead of raising.
    """
    body = request.get_json(silent=True)
    if not isinstance(body, dict):
        # Covers None (no/invalid JSON) as well as lists, strings, numbers.
        body = {}
    workers = _clamped_int(body.get("workers", 5), 5, 1, 10)
    # NOTE(review): the commit message mentions a 0-15s UI slider while this
    # cap is 30 - confirm which upper bound is intended.
    login_delay = _clamped_int(body.get("login_delay", 3), 3, 0, 30)
    return workers, login_delay
|
|
|
|
|
|
@app.route("/api/scan", methods=["POST"])
def api_scan():
    """Start a scan of all switches (no department filter).

    Responds 409 when a scan is already running.
    """
    already_running = scanner.scan_state["running"]
    if already_running:
        return jsonify({"error": "Scan already running"}), 409

    session_count, delay_seconds = _scan_params()
    _trigger_scan_background(
        dept=None,
        workers=session_count,
        login_delay=delay_seconds,
    )
    return jsonify({"status": "started", "dept": None})
|
|
|
|
|
|
@app.route("/api/scan/clear", methods=["POST"])
def api_scan_clear():
    """Wipe all switch & link data then run a full scan.

    Responds 409 when a scan is already running; otherwise deletes every
    row from the switches and links tables, starts a scan with no
    department filter and returns {"status": "started", "cleared": True}.
    """
    if scanner.scan_state["running"]:
        return jsonify({"error": "Scan already running"}), 409

    # Parse the request before touching the database so that a malformed
    # body cannot leave the tables wiped with no scan started.
    workers, login_delay = _scan_params()

    conn = db.get_conn()
    try:
        conn.execute("DELETE FROM switches")
        conn.execute("DELETE FROM links")
        conn.commit()
    finally:
        # Release the connection even if one of the statements fails.
        conn.close()

    _trigger_scan_background(dept=None, workers=workers, login_delay=login_delay)
    return jsonify({"status": "started", "cleared": True})
|
|
|
|
|
|
@app.route("/api/scan/elec", methods=["POST"])
def api_scan_elec():
    """Start a scan restricted to the ELEC department.

    Responds 409 when a scan is already running.
    """
    already_running = scanner.scan_state["running"]
    if already_running:
        return jsonify({"error": "Scan already running"}), 409

    session_count, delay_seconds = _scan_params()
    _trigger_scan_background(
        dept="ELEC",
        workers=session_count,
        login_delay=delay_seconds,
    )
    return jsonify({"status": "started", "dept": "ELEC"})
|
|
|
|
|
|
@app.route("/api/scan/gw", methods=["POST"])
def api_scan_gw():
    """Start a scan restricted to the GW department.

    Responds 409 when a scan is already running.
    """
    already_running = scanner.scan_state["running"]
    if already_running:
        return jsonify({"error": "Scan already running"}), 409

    session_count, delay_seconds = _scan_params()
    _trigger_scan_background(
        dept="GW",
        workers=session_count,
        login_delay=delay_seconds,
    )
    return jsonify({"status": "started", "dept": "GW"})
|
|
|
|
|
|
@app.route("/api/status")
def api_status():
    """Return the scanner state plus last-scan log and auto-scan settings."""
    # Copy so the extra keys below never leak into scanner.scan_state.
    payload = dict(scanner.scan_state)

    last_scan = db.get_last_scan()
    payload.update({
        "last_scan_log": dict(last_scan) if last_scan else None,
        "autoscan_enabled": db.get_setting("autoscan_enabled") == "true",
        # 60 minutes is used when no interval has been stored.
        "autoscan_interval": int(db.get_setting("autoscan_interval") or 60),
    })
    return jsonify(payload)
|
|
|
|
|
|
@app.route("/api/switches")
def api_switches():
    """Return every stored switch as JSON."""
    all_switches = db.get_all_switches()
    return jsonify(all_switches)
|
|
|
|
|
|
@app.route("/api/links")
def api_links():
    """Return every stored link as JSON."""
    all_links = db.get_all_links()
    return jsonify(all_links)
|
|
|
|
|
|
@app.route("/api/topology")
def api_topology():
    """Return the topology graph as {"nodes": [...], "edges": [...]}.

    Each node and edge is wrapped in a {"data": {...}} object. Nodes are
    keyed by chassis ID; edges reference their endpoints by chassis ID and
    carry the port names and a display name for each end.
    """
    switches = {s["chassis_id"]: s for s in db.get_all_switches()}
    links = db.get_all_links()

    def display_name(chassis_id):
        # Fall back to the chassis ID when the switch is unknown OR its
        # hostname is empty/None, mirroring the node label fallback below.
        # (dict.get's default alone only covers a missing key.)
        return switches.get(chassis_id, {}).get("hostname") or chassis_id

    nodes = [
        {
            "data": {
                "id": chassis_id,
                "label": sw.get("hostname") or chassis_id,
                "hostname": sw.get("hostname", ""),
                "mgmt_ip": sw.get("mgmt_ip", ""),
                "description": sw.get("description", ""),
                "firmware": sw.get("firmware", ""),
                "vendor": sw.get("vendor", ""),
                "chassis_id": chassis_id,
                "last_seen": sw.get("last_seen", ""),
            }
        }
        for chassis_id, sw in switches.items()
    ]

    edges = []
    for link in links:
        # The edge id combines both endpoints and ports.
        edge_id = f"{link['chassis_a']}:{link['port_a']}__{link['chassis_b']}:{link['port_b']}"
        edges.append({
            "data": {
                "id": edge_id,
                "source": link["chassis_a"],
                "target": link["chassis_b"],
                "port_a": link["port_a"],
                "port_b": link["port_b"],
                "hostname_a": display_name(link["chassis_a"]),
                "hostname_b": display_name(link["chassis_b"]),
            }
        })

    return jsonify({"nodes": nodes, "edges": edges})
|
|
|
|
|
|
@app.route("/api/settings", methods=["POST"])
def api_settings():
    """Persist the auto-scan settings and apply them to the scheduler.

    Expects a JSON object with optional keys:
        autoscan_enabled:  truthy to enable auto-scan (default False)
        autoscan_interval: interval in whole minutes, >= 1 (default 60)

    Responds 400 when the body is not a JSON object or the interval is not
    a positive integer; otherwise echoes the stored values.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    enabled = data.get("autoscan_enabled", False)
    try:
        interval = int(data.get("autoscan_interval", 60))
    except (TypeError, ValueError, OverflowError):
        return jsonify({"error": "autoscan_interval must be an integer"}), 400
    if interval < 1:
        # A zero or negative interval is not a meaningful schedule; reject it
        # before it is persisted or handed to the scheduler.
        return jsonify({"error": "autoscan_interval must be at least 1"}), 400

    db.set_setting("autoscan_enabled", "true" if enabled else "false")
    db.set_setting("autoscan_interval", str(interval))

    if enabled:
        _reschedule(interval)
    else:
        if apscheduler.get_job(_scheduler_job_id):
            apscheduler.remove_job(_scheduler_job_id)
        logger.info("Auto-scan disabled")

    return jsonify({"autoscan_enabled": enabled, "autoscan_interval": interval})
|
|
|
|
|
|
@app.route("/api/layout", methods=["POST"])
def api_save_layout():
    """Save node positions sent as {"positions": [{id|chassis_id, x, y}, ...]}.

    Responds 400 when no positions are supplied or when an entry is
    malformed (not an object, or missing its identifier or x/y);
    otherwise returns {"saved": <number of positions>}.
    """
    data = request.get_json(silent=True)
    positions = data.get("positions", []) if isinstance(data, dict) else None
    if not positions:
        return jsonify({"error": "No positions provided"}), 400
    if not isinstance(positions, list):
        return jsonify({"error": "positions must be a list"}), 400

    normalized = []
    for entry in positions:
        if not isinstance(entry, dict):
            return jsonify({"error": "Each position must be an object"}), 400
        # Normalize 'id' -> 'chassis_id' so JS can send either key
        chassis_id = entry.get("chassis_id") or entry.get("id")
        if not chassis_id or "x" not in entry or "y" not in entry:
            return jsonify({"error": "Each position needs an id, x and y"}), 400
        normalized.append({"chassis_id": chassis_id, "x": entry["x"], "y": entry["y"]})

    save_node_positions(normalized)
    return jsonify({"saved": len(normalized)})
|
|
|
|
|
|
@app.route("/api/layout", methods=["DELETE"])
def api_clear_layout():
    """Discard all saved node positions."""
    clear_node_positions()
    response_body = {"status": "cleared"}
    return jsonify(response_body)
|
|
|
|
|
|
@app.route("/api/layout", methods=["GET"])
def api_get_layout():
    """Report whether any node positions are saved, and how many."""
    saved_count = len(get_node_positions())
    summary = {
        "has_custom_layout": saved_count > 0,
        "count": saved_count,
    }
    return jsonify(summary)
|
|
|
|
@app.route("/api/layout/positions", methods=["GET"])
def api_get_layout_positions():
    """Return the saved positions as a list of {id, x, y} for Cytoscape."""
    saved = get_node_positions()
    payload = []
    for chassis_id, coords in saved.items():
        # coords is indexed as (x, y).
        payload.append({"id": chassis_id, "x": coords[0], "y": coords[1]})
    return jsonify(payload)
|
|
|
|
|
|
@app.route("/api/switches/clear-stale", methods=["POST"])
def api_clear_stale_switches():
    """Remove switches not seen in the last scan.

    Uses the last scan's started_at value as the cutoff passed to
    db.clear_stale_switches. Responds 400 when no scan has been recorded.
    """
    last_scan = db.get_last_scan()
    if last_scan:
        removed = db.clear_stale_switches(last_scan["started_at"])
        return jsonify({"deleted": removed})
    return jsonify({"error": "No scan has been run yet"}), 400
|
|
|
|
@app.route("/api/export/csv")
def api_export_csv():
    """Download topology.csv from the exports directory, or 404 if absent."""
    csv_path = os.path.join(EXPORTS_DIR, "topology.csv")
    if os.path.exists(csv_path):
        return send_file(csv_path, as_attachment=True, download_name="topology.csv")
    return jsonify({"error": "No export found, run a scan first"}), 404
|
|
|
|
|
|
@app.route("/api/export/mermaid")
def api_export_mermaid():
    """Download topology.md from the exports directory, or 404 if absent."""
    mermaid_path = os.path.join(EXPORTS_DIR, "topology.md")
    if os.path.exists(mermaid_path):
        return send_file(mermaid_path, as_attachment=True, download_name="topology.md")
    return jsonify({"error": "No export found, run a scan first"}), 404
|
|
|
|
|
|
@app.route("/api/export/png")
def api_export_png():
    """Download topology.png from the exports directory, or 404 if absent."""
    png_path = os.path.join(EXPORTS_DIR, "topology.png")
    if os.path.exists(png_path):
        return send_file(png_path, as_attachment=True, download_name="topology.png")
    return jsonify({"error": "No PNG found, run a scan first"}), 404
|
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: listen on all interfaces, port 5000, debugger off.
    app.run(
        host="0.0.0.0",
        port=5000,
        debug=False,
    )
|