Files
lldp-mapper/app.py
T

245 lines
7.9 KiB
Python

# app.py - Flask backend
import logging
import threading
import os
from flask import Flask, jsonify, send_file, render_template, request
from apscheduler.schedulers.background import BackgroundScheduler
import db
from db import save_node_positions, get_node_positions, clear_node_positions
import scanner
from config import EXPORTS_DIR
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__, template_folder='templates', static_folder='static')
# --- Scheduler setup ---
apscheduler = BackgroundScheduler()
apscheduler.start()
_scheduler_job_id = "autoscan"
def _reschedule(interval_minutes):
if apscheduler.get_job(_scheduler_job_id):
apscheduler.remove_job(_scheduler_job_id)
apscheduler.add_job(
lambda: _trigger_scan_background(dept=None),
'interval',
minutes=int(interval_minutes),
id=_scheduler_job_id,
replace_existing=True
)
logger.info(f"Auto-scan scheduled every {interval_minutes} minutes")
def _trigger_scan_background(dept: str = None):
"""Run scan in a background thread. dept=None → all switches."""
if not scanner.scan_state["running"]:
t = threading.Thread(target=scanner.run_scan, kwargs={"dept": dept}, daemon=True)
t.start()
# Restore scheduler state on startup
db.init_db()
if db.get_setting("autoscan_enabled") == "true":
interval = db.get_setting("autoscan_interval") or "60"
_reschedule(interval)
# --- API Routes ---
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/scan", methods=["POST"])
def api_scan():
"""Scan all active switches (no dept filter)."""
if scanner.scan_state["running"]:
return jsonify({"error": "Scan already running"}), 409
_trigger_scan_background(dept=None)
return jsonify({"status": "started", "dept": None})
@app.route("/api/scan/clear", methods=["POST"])
def api_scan_clear():
"""Wipe all switch & link data then run a full scan."""
if scanner.scan_state["running"]:
return jsonify({"error": "Scan already running"}), 409
conn = db.get_conn()
conn.execute("DELETE FROM switches")
conn.execute("DELETE FROM links")
conn.commit()
conn.close()
_trigger_scan_background(dept=None)
return jsonify({"status": "started", "cleared": True})
@app.route("/api/scan/elec", methods=["POST"])
def api_scan_elec():
"""Scan only ELEC department switches."""
if scanner.scan_state["running"]:
return jsonify({"error": "Scan already running"}), 409
_trigger_scan_background(dept="ELEC")
return jsonify({"status": "started", "dept": "ELEC"})
@app.route("/api/scan/gw", methods=["POST"])
def api_scan_gw():
"""Scan only GW department switches."""
if scanner.scan_state["running"]:
return jsonify({"error": "Scan already running"}), 409
_trigger_scan_background(dept="GW")
return jsonify({"status": "started", "dept": "GW"})
@app.route("/api/status")
def api_status():
state = dict(scanner.scan_state)
last = db.get_last_scan()
state["last_scan_log"] = dict(last) if last else None
state["autoscan_enabled"] = db.get_setting("autoscan_enabled") == "true"
state["autoscan_interval"] = int(db.get_setting("autoscan_interval") or 60)
return jsonify(state)
@app.route("/api/switches")
def api_switches():
return jsonify(db.get_all_switches())
@app.route("/api/links")
def api_links():
return jsonify(db.get_all_links())
@app.route("/api/topology")
def api_topology():
switches = {s["chassis_id"]: s for s in db.get_all_switches()}
links = db.get_all_links()
nodes = []
for chassis_id, sw in switches.items():
nodes.append({
"data": {
"id": chassis_id,
"label": sw.get("hostname") or chassis_id,
"hostname": sw.get("hostname", ""),
"mgmt_ip": sw.get("mgmt_ip", ""),
"description": sw.get("description", ""),
"firmware": sw.get("firmware", ""),
"vendor": sw.get("vendor", ""),
"chassis_id": chassis_id,
"last_seen": sw.get("last_seen", ""),
}
})
edges = []
for link in links:
edge_id = f"{link['chassis_a']}:{link['port_a']}__{link['chassis_b']}:{link['port_b']}"
hostname_a = switches.get(link['chassis_a'], {}).get('hostname', link['chassis_a'])
hostname_b = switches.get(link['chassis_b'], {}).get('hostname', link['chassis_b'])
edges.append({
"data": {
"id": edge_id,
"source": link["chassis_a"],
"target": link["chassis_b"],
"port_a": link["port_a"],
"port_b": link["port_b"],
"hostname_a": hostname_a,
"hostname_b": hostname_b,
}
})
return jsonify({"nodes": nodes, "edges": edges})
@app.route("/api/settings", methods=["POST"])
def api_settings():
data = request.json
enabled = data.get("autoscan_enabled", False)
interval = int(data.get("autoscan_interval", 60))
db.set_setting("autoscan_enabled", "true" if enabled else "false")
db.set_setting("autoscan_interval", str(interval))
if enabled:
_reschedule(interval)
else:
if apscheduler.get_job(_scheduler_job_id):
apscheduler.remove_job(_scheduler_job_id)
logger.info("Auto-scan disabled")
return jsonify({"autoscan_enabled": enabled, "autoscan_interval": interval})
@app.route("/api/layout", methods=["POST"])
def api_save_layout():
data = request.json
positions = data.get("positions", [])
if not positions:
return jsonify({"error": "No positions provided"}), 400
# Normalize 'id' -> 'chassis_id' so JS can send either key
normalized = [{"chassis_id": p.get("chassis_id") or p.get("id"), "x": p["x"], "y": p["y"]} for p in positions]
save_node_positions(normalized)
return jsonify({"saved": len(positions)})
@app.route("/api/layout", methods=["DELETE"])
def api_clear_layout():
clear_node_positions()
return jsonify({"status": "cleared"})
@app.route("/api/layout", methods=["GET"])
def api_get_layout():
positions = get_node_positions()
return jsonify({"has_custom_layout": len(positions) > 0, "count": len(positions)})
@app.route("/api/layout/positions", methods=["GET"])
def api_get_layout_positions():
"""Return saved positions as list of {id, x, y} for Cytoscape."""
positions = get_node_positions()
return jsonify([{"id": cid, "x": xy[0], "y": xy[1]} for cid, xy in positions.items()])
@app.route("/api/switches/clear-stale", methods=["POST"])
def api_clear_stale_switches():
"""Remove switches not seen in the last scan."""
last = db.get_last_scan()
if not last:
return jsonify({"error": "No scan has been run yet"}), 400
since = last["started_at"]
deleted = db.clear_stale_switches(since)
return jsonify({"deleted": deleted})
@app.route("/api/export/csv")
def api_export_csv():
path = os.path.join(EXPORTS_DIR, "topology.csv")
if not os.path.exists(path):
return jsonify({"error": "No export found, run a scan first"}), 404
return send_file(path, as_attachment=True, download_name="topology.csv")
@app.route("/api/export/mermaid")
def api_export_mermaid():
path = os.path.join(EXPORTS_DIR, "topology.md")
if not os.path.exists(path):
return jsonify({"error": "No export found, run a scan first"}), 404
return send_file(path, as_attachment=True, download_name="topology.md")
@app.route("/api/export/png")
def api_export_png():
path = os.path.join(EXPORTS_DIR, "topology.png")
if not os.path.exists(path):
return jsonify({"error": "No PNG found, run a scan first"}), 404
return send_file(path, as_attachment=True, download_name="topology.png")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False)