Files
lldp-mapper/db.py
T
dstephenson 40d4679a59 Initial commit — LLDP network mapper
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 20:56:13 +00:00

313 lines
9.9 KiB
Python

# db.py - SQLite database operations
import sqlite3
import re
import os
import logging
from config import DB_PATH
logger = logging.getLogger(__name__)
def get_conn():
    """Open a new SQLite connection to the database file at ``DB_PATH``.

    The connection is created with ``check_same_thread=False`` and a
    10-second lock-wait timeout, and returns rows as ``sqlite3.Row`` so
    columns can be read by name. The caller must close the connection.
    """
    connection = sqlite3.connect(DB_PATH, timeout=10, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    return connection
def init_db():
    """Create the database directory, schema and default settings if missing.

    Creates the ``switches``, ``links``, ``settings``, ``scan_log`` and
    ``node_positions`` tables when they do not exist yet, and seeds the
    ``autoscan_enabled`` ('false') and ``autoscan_interval`` ('60') settings
    without overwriting values that are already stored. Safe to call on
    every start-up.
    """
    # os.path.dirname() returns '' for a bare filename and os.makedirs('')
    # raises FileNotFoundError, so only create a directory when there is one.
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = get_conn()
    try:
        # "with conn" commits on success and rolls back on error; it does not
        # close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS switches (
                    chassis_id TEXT PRIMARY KEY,
                    hostname TEXT,
                    mgmt_ip TEXT,
                    description TEXT,
                    last_seen TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS links (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    chassis_a TEXT,
                    port_a TEXT,
                    chassis_b TEXT,
                    port_b TEXT,
                    UNIQUE(chassis_a, port_a, chassis_b, port_b)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS settings (
                    key TEXT PRIMARY KEY,
                    value TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS scan_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    started_at TEXT,
                    finished_at TEXT,
                    status TEXT,
                    switches_ok INTEGER DEFAULT 0,
                    switches_fail INTEGER DEFAULT 0
                )
            """)
            conn.execute(
                "CREATE TABLE IF NOT EXISTS node_positions "
                "(chassis_id TEXT PRIMARY KEY, x REAL, y REAL)"
            )
            # OR IGNORE keeps any value that has already been configured.
            conn.execute(
                "INSERT OR IGNORE INTO settings VALUES ('autoscan_enabled', 'false')"
            )
            conn.execute(
                "INSERT OR IGNORE INTO settings VALUES ('autoscan_interval', '60')"
            )
    finally:
        conn.close()
def upsert_switch(chassis_id, hostname, mgmt_ip, description):
    """Insert a switch, or refresh the existing row with the same chassis ID.

    On conflict the row is updated as follows:

    * ``hostname`` and ``last_seen`` are always overwritten.
    * ``mgmt_ip`` is overwritten only when the new value looks like a
      dotted address (matches ``%.%.%.%``); otherwise the stored value is
      kept. Empty and NULL values never match, so they never overwrite.
    * ``description`` is overwritten only when the new value is non-empty.

    Args:
        chassis_id: Primary key of the switch.
        hostname: Host name reported for the switch.
        mgmt_ip: Management address; ignored on update unless dotted.
        description: Free-text description; ignored on update when empty.
    """
    conn = get_conn()
    try:
        # Commit on success, roll back on error; always close afterwards so a
        # failed write cannot leave the connection (and its lock) dangling.
        with conn:
            conn.execute("""
                INSERT INTO switches (chassis_id, hostname, mgmt_ip, description, last_seen)
                VALUES (?, ?, ?, ?, datetime('now'))
                ON CONFLICT(chassis_id) DO UPDATE SET
                    hostname = excluded.hostname,
                    mgmt_ip = CASE WHEN excluded.mgmt_ip LIKE '%.%.%.%'
                                   THEN excluded.mgmt_ip ELSE mgmt_ip END,
                    description = CASE WHEN excluded.description != ''
                                       THEN excluded.description ELSE description END,
                    last_seen = excluded.last_seen
            """, (chassis_id, hostname, mgmt_ip, description))
    finally:
        conn.close()
def upsert_link(chassis_a, port_a, chassis_b, port_b):
    """Store a link between two switch ports unless it is already recorded.

    The endpoints are first put into a canonical orientation (smaller
    chassis ID first) so that A->B and B->A describe the same row. The table
    is then searched in both orientations before inserting, which also
    catches rows that were stored un-normalized.
    """
    # Canonical orientation: swap both endpoints together when B sorts first.
    if chassis_b < chassis_a:
        chassis_a, port_a, chassis_b, port_b = chassis_b, port_b, chassis_a, port_a

    forward = (chassis_a, port_a, chassis_b, port_b)
    backward = (chassis_b, port_b, chassis_a, port_a)

    db = get_conn()
    match = db.execute("""
        SELECT id FROM links WHERE
        (chassis_a=? AND port_a=? AND chassis_b=? AND port_b=?) OR
        (chassis_a=? AND port_a=? AND chassis_b=? AND port_b=?)
    """, forward + backward).fetchone()
    if match is None:
        db.execute("""
            INSERT OR IGNORE INTO links (chassis_a, port_a, chassis_b, port_b)
            VALUES (?, ?, ?, ?)
        """, forward)
    db.commit()
    db.close()
def clear_stale_switches(since_dt):
    """Remove switches not seen since the given datetime string.

    Rows whose ``last_seen`` is NULL are removed as well.

    Args:
        since_dt: Cut-off timestamp. It is compared as text against
            ``last_seen``, so it should use the same format as the stored
            values (NOTE(review): these appear to be written with SQLite's
            ``datetime('now')`` -- confirm the caller passes that format).

    Returns:
        The number of switch rows deleted.
    """
    conn = get_conn()
    try:
        # Commit on success, roll back on error; the connection is closed in
        # either case.
        with conn:
            cursor = conn.execute(
                "DELETE FROM switches WHERE last_seen < ? OR last_seen IS NULL",
                (since_dt,),
            )
            # rowcount of a DELETE is the number of rows it removed, so no
            # follow-up "SELECT changes()" query is needed.
            return cursor.rowcount
    finally:
        conn.close()
def clear_links():
    """Delete every row from the ``links`` table."""
    db = get_conn()
    db.execute("DELETE FROM links")
    db.commit()
    db.close()
def _is_mac(chassis):
return bool(re.match(r'^[0-9A-Fa-f]{2}[-:][0-9A-Fa-f]{2}', chassis))
def _is_real_ip(val):
return bool(re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', val or ''))
def merge_duplicate_switches():
    """Merge switch records that describe the same device under two IDs.

    After a scan, some switches may have two records:
    - One with MAC chassis_id (seen as neighbor by another switch)
    - One with IP chassis_id (directly scanned but chassis extraction failed)

    Records are matched by identical, non-empty hostname. For each such group
    that contains at least one MAC-style ID and at least one non-MAC ID, the
    first MAC-style ID (in chassis_id order) is kept as canonical, its
    ``mgmt_ip`` is set to the first valid IPv4 address found in the group,
    links are repointed to it and the non-MAC records are deleted. Finally,
    duplicate links and self-links are removed.

    All changes are committed together; on error they are rolled back.
    """
    conn = get_conn()
    merged = 0
    try:
        with conn:
            # Rows without a hostname are unrelated devices, not duplicates of
            # each other, so they must not be grouped together.
            hostnames = [
                row['hostname']
                for row in conn.execute("""
                    SELECT hostname FROM switches
                    WHERE hostname IS NOT NULL AND hostname != ''
                    GROUP BY hostname
                    HAVING COUNT(*) > 1
                """).fetchall()
            ]

            for hostname in hostnames:
                # Fetch the members row by row instead of splitting a
                # GROUP_CONCAT string: that string is NULL when every mgmt_ip
                # is NULL, and its order is unspecified.
                records = conn.execute(
                    "SELECT chassis_id, mgmt_ip FROM switches "
                    "WHERE hostname = ? AND chassis_id IS NOT NULL "
                    "ORDER BY chassis_id",
                    (hostname,),
                ).fetchall()

                mac_ids = [r['chassis_id'] for r in records if _is_mac(r['chassis_id'])]
                ip_ids = [r['chassis_id'] for r in records if not _is_mac(r['chassis_id'])]
                if not mac_ids or not ip_ids:
                    continue

                canonical = mac_ids[0]
                best_ip = next(
                    (r['mgmt_ip'] for r in records if _is_real_ip(r['mgmt_ip'])),
                    '',
                )

                # Update canonical with best mgmt_ip
                if best_ip:
                    conn.execute(
                        "UPDATE switches SET mgmt_ip=? WHERE chassis_id=?",
                        (best_ip, canonical),
                    )

                # Repoint links
                for old_id in ip_ids:
                    # OR IGNORE: if the repointed link already exists under
                    # the canonical ID, the UNIQUE constraint would abort the
                    # whole merge; skip that row instead ...
                    conn.execute(
                        "UPDATE OR IGNORE links SET chassis_a=? WHERE chassis_a=?",
                        (canonical, old_id),
                    )
                    conn.execute(
                        "UPDATE OR IGNORE links SET chassis_b=? WHERE chassis_b=?",
                        (canonical, old_id),
                    )
                    # ... and drop it here, since whatever still references
                    # the old ID is a duplicate of an existing canonical link.
                    conn.execute(
                        "DELETE FROM links WHERE chassis_a=? OR chassis_b=?",
                        (old_id, old_id),
                    )
                    conn.execute("DELETE FROM switches WHERE chassis_id=?", (old_id,))

                merged += 1
                logger.info(
                    "Merged %s: %s -> %s (mgmt_ip=%s)",
                    hostname, ip_ids, canonical, best_ip,
                )

            # Remove duplicate links - normalize chassis+port order before
            # grouping so A->B and B->A count as the same link.
            conn.execute("""
                DELETE FROM links WHERE id NOT IN (
                    SELECT MIN(id) FROM links
                    GROUP BY
                        CASE WHEN chassis_a < chassis_b THEN chassis_a ELSE chassis_b END,
                        CASE WHEN chassis_a < chassis_b THEN chassis_b ELSE chassis_a END,
                        CASE WHEN chassis_a < chassis_b THEN port_a ELSE port_b END,
                        CASE WHEN chassis_a < chassis_b THEN port_b ELSE port_a END
                )
            """)
            # Remove self-links created by repointing both ends to one ID.
            conn.execute("DELETE FROM links WHERE chassis_a = chassis_b")
    finally:
        conn.close()
    logger.info("merge_duplicate_switches: merged %d hostname groups", merged)
def get_all_switches():
    """Return every row of ``switches`` as a dict, ordered by hostname."""
    db = get_conn()
    cursor = db.execute("SELECT * FROM switches ORDER BY hostname")
    switches = [dict(record) for record in cursor.fetchall()]
    db.close()
    return switches
def get_all_links():
    """Return every link as a dict, enriched with both endpoints' switch data.

    Besides the ``links`` columns, each dict carries ``hostname_a``/``ip_a``
    and ``hostname_b``/``ip_b``. Because the switches are LEFT JOINed, these
    are ``None`` when no switch row exists for that chassis ID.
    """
    query = """
        SELECT l.*,
        sa.hostname as hostname_a, sa.mgmt_ip as ip_a,
        sb.hostname as hostname_b, sb.mgmt_ip as ip_b
        FROM links l
        LEFT JOIN switches sa ON l.chassis_a = sa.chassis_id
        LEFT JOIN switches sb ON l.chassis_b = sb.chassis_id
    """
    db = get_conn()
    links = [dict(record) for record in db.execute(query).fetchall()]
    db.close()
    return links
def get_setting(key):
    """Return the stored value for *key*, or ``None`` if the key is absent."""
    db = get_conn()
    record = db.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
    db.close()
    if record is None:
        return None
    return record["value"]
def set_setting(key, value):
    """Store *value* under *key*, replacing any existing entry.

    The value is persisted as ``str(value)``.
    """
    text_value = str(value)
    db = get_conn()
    db.execute(
        "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
        (key, text_value),
    )
    db.commit()
    db.close()
def log_scan_start():
    """Insert a ``scan_log`` row with status 'running' and return its id.

    ``started_at`` is set to the database's current time.
    """
    db = get_conn()
    new_row_id = db.execute(
        "INSERT INTO scan_log (started_at, status) VALUES (datetime('now'), 'running')"
    ).lastrowid
    db.commit()
    db.close()
    return new_row_id
def log_scan_finish(scan_id, ok_count, fail_count):
    """Mark the scan log entry *scan_id* as finished.

    Sets ``finished_at`` to the database's current time, ``status`` to
    'done', and stores the success and failure counters.
    """
    params = (ok_count, fail_count, scan_id)
    db = get_conn()
    db.execute("""
        UPDATE scan_log SET finished_at=datetime('now'), status='done',
        switches_ok=?, switches_fail=? WHERE id=?
    """, params)
    db.commit()
    db.close()
def get_last_scan():
    """Return the ``scan_log`` row with the highest id as a dict.

    Returns ``None`` when the table is empty.
    """
    db = get_conn()
    latest = db.execute("SELECT * FROM scan_log ORDER BY id DESC LIMIT 1").fetchone()
    db.close()
    if latest is None:
        return None
    return dict(latest)
def deduplicate_links():
    """Remove duplicate bidirectional links. Always run after a scan.

    Links are grouped by their endpoints in normalized order (so A->B and
    B->A fall into the same group) and only the row with the lowest id in
    each group is kept. Links whose two ends are the same chassis are
    removed as well. The link count before and after is logged.
    """
    count_query = "SELECT COUNT(*) FROM links"
    db = get_conn()
    before = db.execute(count_query).fetchone()[0]

    # Keep the oldest row of every orientation-independent endpoint pair.
    db.execute("""
        DELETE FROM links WHERE id NOT IN (
            SELECT MIN(id) FROM links
            GROUP BY
                CASE WHEN chassis_a < chassis_b THEN chassis_a ELSE chassis_b END,
                CASE WHEN chassis_a < chassis_b THEN chassis_b ELSE chassis_a END,
                CASE WHEN chassis_a < chassis_b THEN port_a ELSE port_b END,
                CASE WHEN chassis_a < chassis_b THEN port_b ELSE port_a END
        )
    """)
    # Drop self-links.
    db.execute("DELETE FROM links WHERE chassis_a = chassis_b")
    db.commit()

    after = db.execute(count_query).fetchone()[0]
    db.close()
    logger.info(f"deduplicate_links: {before} -> {after} links")
def save_node_positions(positions):
    """Replace all stored node positions with the given ones.

    Args:
        positions: Iterable of mappings with the keys ``chassis_id``, ``x``
            and ``y`` (as sent by Cytoscape). ``x`` and ``y`` are converted
            with ``float()``. If a chassis ID occurs more than once, the last
            occurrence wins.

    Raises:
        KeyError, TypeError, ValueError: If an entry is malformed. This is
            detected before the database is modified.
    """
    # Validate and convert everything first so bad input cannot fail halfway
    # through, after the existing positions have already been deleted.
    rows = [(p['chassis_id'], float(p['x']), float(p['y'])) for p in positions]

    conn = get_conn()
    try:
        # Delete and re-insert in one transaction: committed together on
        # success, rolled back together on error.
        with conn:
            conn.execute("DELETE FROM node_positions")
            # OR REPLACE tolerates duplicate chassis IDs in the input instead
            # of failing on the primary key.
            conn.executemany(
                "INSERT OR REPLACE INTO node_positions (chassis_id, x, y) VALUES (?, ?, ?)",
                rows,
            )
    finally:
        conn.close()
    logger.info("Saved %d node positions", len(rows))
def get_node_positions():
    """Returns dict of chassis_id -> (x, y), empty if none saved."""
    db = get_conn()
    cursor = db.execute("SELECT chassis_id, x, y FROM node_positions")
    layout = {}
    for record in cursor.fetchall():
        layout[record["chassis_id"]] = (record["x"], record["y"])
    db.close()
    return layout
def clear_node_positions():
    """Delete every saved node position and log that it was done."""
    db = get_conn()
    db.execute("DELETE FROM node_positions")
    db.commit()
    db.close()
    # Same logger object as logging.getLogger(__name__) at module level.
    logger.info("Node positions cleared")