Update core scanner, parser, SSH client, and UI

This commit is contained in:
2026-05-05 20:07:24 +00:00
parent 40d4679a59
commit d32ca80a22
6 changed files with 313 additions and 79 deletions
+16
View File
@@ -64,6 +64,20 @@ def api_scan():
return jsonify({"status": "started", "dept": None}) return jsonify({"status": "started", "dept": None})
@app.route("/api/scan/clear", methods=["POST"])
def api_scan_clear():
"""Wipe all switch & link data then run a full scan."""
if scanner.scan_state["running"]:
return jsonify({"error": "Scan already running"}), 409
conn = db.get_conn()
conn.execute("DELETE FROM switches")
conn.execute("DELETE FROM links")
conn.commit()
conn.close()
_trigger_scan_background(dept=None)
return jsonify({"status": "started", "cleared": True})
@app.route("/api/scan/elec", methods=["POST"]) @app.route("/api/scan/elec", methods=["POST"])
def api_scan_elec(): def api_scan_elec():
"""Scan only ELEC department switches.""" """Scan only ELEC department switches."""
@@ -116,6 +130,8 @@ def api_topology():
"hostname": sw.get("hostname", ""), "hostname": sw.get("hostname", ""),
"mgmt_ip": sw.get("mgmt_ip", ""), "mgmt_ip": sw.get("mgmt_ip", ""),
"description": sw.get("description", ""), "description": sw.get("description", ""),
"firmware": sw.get("firmware", ""),
"vendor": sw.get("vendor", ""),
"chassis_id": chassis_id, "chassis_id": chassis_id,
"last_seen": sw.get("last_seen", ""), "last_seen": sw.get("last_seen", ""),
} }
+15 -5
View File
@@ -25,9 +25,17 @@ def init_db():
hostname TEXT, hostname TEXT,
mgmt_ip TEXT, mgmt_ip TEXT,
description TEXT, description TEXT,
firmware TEXT DEFAULT '',
vendor TEXT DEFAULT '',
last_seen TEXT last_seen TEXT
) )
""") """)
# Migrate existing DBs that predate firmware/vendor columns
for col, default in [('firmware', ''), ('vendor', '')]:
try:
c.execute(f"ALTER TABLE switches ADD COLUMN {col} TEXT DEFAULT '{default}'")
except Exception:
pass # Column already exists
c.execute(""" c.execute("""
CREATE TABLE IF NOT EXISTS links ( CREATE TABLE IF NOT EXISTS links (
@@ -70,18 +78,20 @@ def init_db():
conn.close() conn.close()
def upsert_switch(chassis_id, hostname, mgmt_ip, description): def upsert_switch(chassis_id, hostname, mgmt_ip, description, firmware='', vendor=''):
conn = get_conn() conn = get_conn()
conn.execute(""" conn.execute("""
INSERT INTO switches (chassis_id, hostname, mgmt_ip, description, last_seen) INSERT INTO switches (chassis_id, hostname, mgmt_ip, description, firmware, vendor, last_seen)
VALUES (?, ?, ?, ?, datetime('now')) VALUES (?, ?, ?, ?, ?, ?, datetime('now'))
ON CONFLICT(chassis_id) DO UPDATE SET ON CONFLICT(chassis_id) DO UPDATE SET
hostname = excluded.hostname, hostname = excluded.hostname,
mgmt_ip = CASE WHEN excluded.mgmt_ip != '' AND excluded.mgmt_ip NOT LIKE '%.%.%.%' = 0 mgmt_ip = CASE WHEN excluded.mgmt_ip != '' AND excluded.mgmt_ip LIKE '%.%.%.%'
THEN excluded.mgmt_ip ELSE mgmt_ip END, THEN excluded.mgmt_ip ELSE mgmt_ip END,
description = CASE WHEN excluded.description != '' THEN excluded.description ELSE description END, description = CASE WHEN excluded.description != '' THEN excluded.description ELSE description END,
firmware = CASE WHEN excluded.firmware != '' THEN excluded.firmware ELSE firmware END,
vendor = CASE WHEN excluded.vendor != '' THEN excluded.vendor ELSE vendor END,
last_seen = excluded.last_seen last_seen = excluded.last_seen
""", (chassis_id, hostname, mgmt_ip, description)) """, (chassis_id, hostname, mgmt_ip, description, firmware, vendor))
conn.commit() conn.commit()
conn.close() conn.close()
+25 -1
View File
@@ -169,6 +169,9 @@ var Xr=function(e){if(!(this instanceof Xr))return new Xr(e);this.id="Thenable/1
} }
.btn-primary:hover { background: #3a7de8; } .btn-primary:hover { background: #3a7de8; }
.btn-primary:disabled { background: var(--surface2); color: var(--text-dim); cursor: not-allowed; } .btn-primary:disabled { background: var(--surface2); color: var(--text-dim); cursor: not-allowed; }
.btn-danger { background: #c0392b; color: white; }
.btn-danger:hover { background: #e74c3c; }
.btn-danger:disabled { background: var(--surface2); color: var(--text-dim); cursor: not-allowed; }
.btn-sm { .btn-sm {
padding: 5px 10px; padding: 5px 10px;
@@ -435,6 +438,12 @@ var Xr=function(e){if(!(this instanceof Xr))return new Xr(e);this.id="Thenable/1
<svg width="14" height="14" fill="none" stroke="currentColor" stroke-width="2.5" viewBox="0 0 24 24"><path d="M12 2C6 2 2 7 2 12s4 10 10 10 10-4.5 10-10S18 2 12 2zm0 4c1.1 0 2 .9 2 2s-.9 2-2 2-2-.9-2-2 .9-2 2-2zm4 12H8v-1c0-2.7 5.3-4 8-4v5z"/></svg> <svg width="14" height="14" fill="none" stroke="currentColor" stroke-width="2.5" viewBox="0 0 24 24"><path d="M12 2C6 2 2 7 2 12s4 10 10 10 10-4.5 10-10S18 2 12 2zm0 4c1.1 0 2 .9 2 2s-.9 2-2 2-2-.9-2-2 .9-2 2-2zm4 12H8v-1c0-2.7 5.3-4 8-4v5z"/></svg>
Scan GW Scan GW
</button> </button>
<button class="btn btn-danger" id="clearScanBtn" onclick="clearAndRescan()">
<svg width="14" height="14" fill="none" stroke="currentColor" stroke-width="2.5" viewBox="0 0 24 24">
<polyline points="3 6 5 6 21 6"/><path d="M19 6l-1 14H6L5 6"/><path d="M10 11v6m4-6v6"/><path d="M9 6V4h6v2"/>
</svg>
Clear &amp; Rescan
</button>
</div> </div>
</header> </header>
@@ -724,9 +733,13 @@ function showNodeDetail(data) {
<span class="detail-val">${data.chassis_id || '—'}</span> <span class="detail-val">${data.chassis_id || '—'}</span>
</div> </div>
<div class="detail-row"> <div class="detail-row">
<span class="detail-key">Description</span> <span class="detail-key">Model</span>
<span class="detail-val">${data.description || '—'}</span> <span class="detail-val">${data.description || '—'}</span>
</div> </div>
<div class="detail-row">
<span class="detail-key">Firmware</span>
<span class="detail-val">${data.firmware || '—'}</span>
</div>
<div class="detail-row"> <div class="detail-row">
<span class="detail-key">Last Seen</span> <span class="detail-key">Last Seen</span>
<span class="detail-val">${data.last_seen || '—'}</span> <span class="detail-val">${data.last_seen || '—'}</span>
@@ -793,6 +806,15 @@ async function clearLayout() {
await fetch('/api/layout', { method: 'DELETE' }); await fetch('/api/layout', { method: 'DELETE' });
alert('Layout cleared.'); alert('Layout cleared.');
} }
async function clearAndRescan() {
if (!confirm('This will delete all switch and link data, then run a full scan. Continue?')) return;
const btn = document.getElementById('clearScanBtn');
btn.disabled = true;
const res = await fetch('/api/scan/clear', { method: 'POST' });
if (res.status === 409) { alert('Scan already running'); btn.disabled = false; return; }
startPolling();
}
async function triggerScan(dept) { async function triggerScan(dept) {
const url = dept === 'elec' ? '/api/scan/elec' const url = dept === 'elec' ? '/api/scan/elec'
: dept === 'gw' ? '/api/scan/gw' : dept === 'gw' ? '/api/scan/gw'
@@ -823,6 +845,7 @@ async function pollStatus() {
scanBtn.disabled = true; scanBtn.disabled = true;
document.getElementById('scanElecBtn').disabled = true; document.getElementById('scanElecBtn').disabled = true;
document.getElementById('scanGwBtn').disabled = true; document.getElementById('scanGwBtn').disabled = true;
document.getElementById('clearScanBtn').disabled = true;
progressWrap.classList.add('visible'); progressWrap.classList.add('visible');
statusBar.classList.add('visible'); statusBar.classList.add('visible');
@@ -848,6 +871,7 @@ async function pollStatus() {
scanBtn.disabled = false; scanBtn.disabled = false;
document.getElementById('scanElecBtn').disabled = false; document.getElementById('scanElecBtn').disabled = false;
document.getElementById('scanGwBtn').disabled = false; document.getElementById('scanGwBtn').disabled = false;
document.getElementById('clearScanBtn').disabled = false;
progressBar.style.width = '100%'; progressBar.style.width = '100%';
setTimeout(() => { setTimeout(() => {
progressWrap.classList.remove('visible'); progressWrap.classList.remove('visible');
+129 -11
View File
@@ -1,7 +1,15 @@
# parser.py - Parse 'show lldp neighbors' output from FS switches # parser.py - Parse LLDP output from FS, HP/Aruba ProCurve, and Dell switches
import re import re
def normalize_mac(mac_str):
"""Normalize any MAC format to XX-XX-XX-XX-XX-XX uppercase (e.g. 649d99-aa5100 → 64-9D-99-AA-51-00)."""
clean = re.sub(r'[:\-\.\s]', '', mac_str).upper()
if len(clean) != 12:
return mac_str.strip()
return '-'.join(clean[i:i+2] for i in range(0, 12, 2))
def shorten_interface(iface): def shorten_interface(iface):
"""GigabitEthernet 1/9 -> Gi1/9, TenGigabitEthernet 1/1 -> Te1/1 etc.""" """GigabitEthernet 1/9 -> Gi1/9, TenGigabitEthernet 1/1 -> Te1/1 etc."""
replacements = [ replacements = [
@@ -67,6 +75,101 @@ def parse_lldp_neighbors(raw_output, local_chassis_id, local_hostname, local_mgm
return neighbors return neighbors
def parse_aruba_procurve_local(raw_output):
"""Parse 'show lldp info local-device' from HP/Aruba switch into a dict."""
chassis_id = ''
system_name = ''
system_desc = ''
mgmt_ip = ''
for line in raw_output.splitlines():
m = re.search(r'Chassis Id\s*:\s*(.+)', line, re.IGNORECASE)
if m:
chassis_id = normalize_mac(m.group(1).strip())
m = re.search(r'System Name\s*:\s*(.+)', line, re.IGNORECASE)
if m:
system_name = m.group(1).strip()
m = re.search(r'System Description\s*:\s*(.+)', line, re.IGNORECASE)
if m:
system_desc = m.group(1).strip()
m = re.search(r'Address\s*:\s*([\d\.]+)', line, re.IGNORECASE)
if m and '.' in m.group(1):
mgmt_ip = m.group(1).strip()
return {'chassis_id': chassis_id, 'system_name': system_name,
'system_desc': system_desc, 'mgmt_ip': mgmt_ip}
def parse_aruba_procurve_neighbors(raw_output):
"""
Parse 'show lldp info remote-device' tabular output from HP/Aruba switch.
Columns: LocalPort | ChassisId PortId PortDescr SysName
"""
neighbors = []
in_data = False
for line in raw_output.splitlines():
if re.match(r'\s*-+\s*\+', line):
in_data = True
continue
if not in_data or '|' not in line or not line.strip():
continue
left, right = line.split('|', 1)
local_port = left.strip()
parts = right.split()
if len(parts) < 4:
continue
chassis_id = normalize_mac(parts[0])
port_id = parts[1]
sys_name = parts[3]
neighbors.append({
'local_port': local_port,
'chassis_id': chassis_id,
'port_id': port_id,
'port_desc': '',
'system_name': sys_name,
'system_desc': '',
'mgmt_ip': '',
'capabilities': '',
'remote_port': port_id,
})
return neighbors
def parse_neighbor_description(system_desc):
"""
Extract (model, firmware) from an LLDP system description string.
Handles Dell OS10, HP ProCurve, and Aruba ArubaOS-Switch descriptions.
Returns ('', '') if unrecognized.
"""
if not system_desc:
return '', ''
# Dell OS10: "...System Type: S4112F-ON...OS Version: 10.5.4.7..."
if 'Dell' in system_desc or 'OS10' in system_desc:
model_m = re.search(r'System Type\s*:\s*([\w\-]+)', system_desc)
fw_m = re.search(r'OS Version\s*:\s*([\d\s\.]+)', system_desc)
model = f"Dell {model_m.group(1)}" if model_m else 'Dell'
firmware = re.sub(r'\s+', '', fw_m.group(1)).strip('.') if fw_m else ''
return model, firmware
# HP ProCurve / Aruba ArubaOS-Switch:
# "Aruba JL258A 2930F-8G-PoE+-2SFP+ Switch, revision WC.16.10.0012, ROM ..."
# "HP J9576A 3800-24G-PoE+-2SFP+ Switch, revision K.16.02.0019, ROM ..."
if re.match(r'(?:Aruba|HP)\s', system_desc, re.IGNORECASE):
model_m = re.match(r'((?:Aruba|HP)\s+\S+\s+[\w\-\+]+)', system_desc, re.IGNORECASE)
fw_m = re.search(r'revision\s+(\S+?)(?:[,\s]|$)', system_desc)
model = model_m.group(1) if model_m else system_desc.split(',')[0]
firmware = fw_m.group(1) if fw_m else ''
return model, firmware
return '', ''
def parse_hostname_from_prompt(prompt_line): def parse_hostname_from_prompt(prompt_line):
"""Extract hostname from CLI prompt like 'ls-vhls-sw01#'""" """Extract hostname from CLI prompt like 'ls-vhls-sw01#'"""
m = re.match(r'^([A-Za-z0-9_\-]+)[>#]', prompt_line.strip()) m = re.match(r'^([A-Za-z0-9_\-]+)[>#]', prompt_line.strip())
@@ -75,14 +178,29 @@ def parse_hostname_from_prompt(prompt_line):
def parse_mgmt_ip_from_interfaces(raw_output): def parse_mgmt_ip_from_interfaces(raw_output):
""" """
Parse 'show ip interface brief' to find management VLAN IP. Parse 'show ip interface brief' to find management IP.
Looks for Vlan interfaces with an IP assigned. Handles 'Vlan100' and 'Vlan 100' (space-separated) formats.
Returns first Vlan IP found (typically the management VLAN). Prefers Vlan interfaces; falls back to Management interfaces.
""" """
lines = raw_output.splitlines() IPV4 = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
for line in lines: fallback = None
# Match lines like: Vlan100 192.168.1.10 YES ... for line in raw_output.splitlines():
m = re.match(r'\s*(Vlan\S+)\s+([\d\.]+)\s+', line, re.IGNORECASE) line_stripped = line.strip()
if m and not m.group(2).startswith('0.0.0.0'): if not line_stripped:
return m.group(2) continue
return None # Match lines belonging to a Vlan or Management interface
iface_m = re.match(r'(Vlan|Management|Mgmt)\s*\S*', line_stripped, re.IGNORECASE)
if not iface_m:
continue
# Extract first valid IPv4 address anywhere on the line
ip_m = IPV4.search(line)
if not ip_m:
continue
ip = ip_m.group(1)
if ip.startswith('0.0.0.0') or ip.startswith('127.'):
continue
if re.match(r'Vlan', iface_m.group(1), re.IGNORECASE):
return ip
if not fallback:
fallback = ip
return fallback
+8 -1
View File
@@ -6,6 +6,7 @@ from db import (
upsert_switch, upsert_link, clear_links, upsert_switch, upsert_link, clear_links,
log_scan_start, log_scan_finish, merge_duplicate_switches log_scan_start, log_scan_finish, merge_duplicate_switches
) )
from parser import parse_neighbor_description
from exports import run_all_exports from exports import run_all_exports
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -75,15 +76,21 @@ def run_scan(dept: str = None):
hostname=result["hostname"], hostname=result["hostname"],
mgmt_ip=result["mgmt_ip"], mgmt_ip=result["mgmt_ip"],
description=result.get("description", ""), description=result.get("description", ""),
firmware=result.get("firmware", ""),
vendor=result.get("vendor", ""),
) )
for neighbor in result["neighbors"]: for neighbor in result["neighbors"]:
if neighbor.get("chassis_id") and neighbor.get("system_name"): if neighbor.get("chassis_id") and neighbor.get("system_name"):
nbr_model, nbr_firmware = parse_neighbor_description(
neighbor.get("system_desc", "")
)
upsert_switch( upsert_switch(
chassis_id=neighbor["chassis_id"], chassis_id=neighbor["chassis_id"],
hostname=neighbor["system_name"], hostname=neighbor["system_name"],
mgmt_ip=neighbor.get("mgmt_ip", ""), mgmt_ip=neighbor.get("mgmt_ip", ""),
description=neighbor.get("system_desc", ""), description=nbr_model, # empty for FS neighbors; direct scan fills it in
firmware=nbr_firmware,
) )
upsert_link( upsert_link(
chassis_a=result["chassis_id"], chassis_a=result["chassis_id"],
+120 -61
View File
@@ -1,41 +1,33 @@
# ssh_client.py - Netmiko SSH connections to FS switches # ssh_client.py - SSH connections to FS, HP/Aruba ProCurve, and Dell switches
import re
import logging import logging
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
from netmiko import ConnectHandler, NetmikoTimeoutException, NetmikoAuthenticationException from netmiko import ConnectHandler, NetmikoTimeoutException, NetmikoAuthenticationException
from config import SSH_USERNAME, SSH_PASSWORD, SSH_PORT, SSH_TIMEOUT, DEVICE_TYPE from config import SSH_USERNAME, SSH_PASSWORD, SSH_PORT, SSH_TIMEOUT, DEVICE_TYPE
from parser import parse_lldp_neighbors, parse_mgmt_ip_from_interfaces from parser import (parse_lldp_neighbors, parse_mgmt_ip_from_interfaces,
parse_aruba_procurve_local, parse_aruba_procurve_neighbors,
normalize_mac)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def connect_and_query(ip): def connect_and_query(ip):
"""
SSH into a switch, run LLDP + interface commands.
Returns dict with switch info and neighbors, or error info.
"""
device = { device = {
"device_type": DEVICE_TYPE, "device_type": DEVICE_TYPE,
"host": ip, "host": ip,
"username": SSH_USERNAME, "username": SSH_USERNAME,
"password": SSH_PASSWORD, "password": SSH_PASSWORD,
"port": SSH_PORT, "port": SSH_PORT,
"timeout": SSH_TIMEOUT, "timeout": SSH_TIMEOUT,
"global_delay_factor": 2, "global_delay_factor": 2,
"fast_cli": False, "fast_cli": False,
"conn_timeout": SSH_TIMEOUT, "conn_timeout": SSH_TIMEOUT,
# "ssh_config_file": None,
# "disabled_algorithms": {},
# "transport": "paramiko",
} }
# FS switches use legacy ssh-rsa keys — must be explicitly allowed
import paramiko
_orig_connect = ConnectHandler.__init__
try: try:
logger.info(f"Connecting to {ip}...") logger.info(f"Connecting to {ip}...")
# Patch paramiko transport to allow legacy key algorithms # Allow legacy ssh-rsa keys used by FS switches
import paramiko.transport as _pt import paramiko.transport as _pt
_orig_preferred_keys = _pt.Transport._preferred_keys _orig_preferred_keys = _pt.Transport._preferred_keys
_pt.Transport._preferred_keys = ( _pt.Transport._preferred_keys = (
@@ -45,40 +37,30 @@ def connect_and_query(ip):
) )
with ConnectHandler(**device) as conn: with ConnectHandler(**device) as conn:
# Restore after connect
_pt.Transport._preferred_keys = _orig_preferred_keys _pt.Transport._preferred_keys = _orig_preferred_keys
# Get hostname from prompt
hostname = conn.find_prompt().replace('#', '').replace('>', '').strip() hostname = conn.find_prompt().replace('#', '').replace('>', '').strip()
version_output = conn.send_command("show version", read_timeout=30)
vendor = _detect_vendor(version_output)
# Get LLDP neighbors if vendor == 'aruba_procurve':
lldp_output = conn.send_command("show lldp neighbors", read_timeout=30) chassis_id, mgmt_ip, model, firmware, neighbors = _query_aruba(conn, version_output, ip)
else:
chassis_id, mgmt_ip, model, firmware, neighbors = _query_fs(conn, version_output, ip)
# Get management IP from interface brief logger.info(f" {hostname} ({ip}) [{vendor}]: {len(neighbors)} neighbors")
intf_output = conn.send_command("show ip interface brief", read_timeout=30)
mgmt_ip = parse_mgmt_ip_from_interfaces(intf_output) or ip
# Get chassis ID from LLDP local info return {
local_info_output = conn.send_command("show lldp local-information", read_timeout=30) "success": True,
chassis_id = _extract_local_chassis(local_info_output) or ip "ip": ip,
"hostname": hostname,
# Parse neighbors "chassis_id": chassis_id,
neighbors = parse_lldp_neighbors(lldp_output, chassis_id, hostname, mgmt_ip) "mgmt_ip": mgmt_ip,
"description": model,
# Get system description "firmware": firmware,
sys_desc = _extract_system_desc(local_info_output) "vendor": vendor,
"neighbors": neighbors,
logger.info(f" {hostname} ({ip}): {len(neighbors)} neighbors found") }
return {
"success": True,
"ip": ip,
"hostname": hostname,
"chassis_id": chassis_id,
"mgmt_ip": mgmt_ip,
"description": sys_desc,
"neighbors": neighbors,
}
except NetmikoAuthenticationException: except NetmikoAuthenticationException:
logger.error(f"Auth failed for {ip}") logger.error(f"Auth failed for {ip}")
@@ -91,27 +73,104 @@ def connect_and_query(ip):
return {"success": False, "ip": ip, "error": str(e)} return {"success": False, "ip": ip, "error": str(e)}
def _query_fs(conn, version_output, ip):
"""Run FS/IES switch-specific commands and return (chassis_id, mgmt_ip, model, firmware, neighbors)."""
intf_output = conn.send_command("show ip interface brief", read_timeout=30)
local_info_output = conn.send_command("show lldp local-information", read_timeout=30)
lldp_output = conn.send_command("show lldp neighbors", read_timeout=30)
mgmt_ip = parse_mgmt_ip_from_interfaces(intf_output) or ip
chassis_id = (_extract_local_chassis(local_info_output)
or _extract_mac_from_version(version_output)
or ip)
chassis_id = normalize_mac(chassis_id) if '-' in chassis_id or ':' in chassis_id else chassis_id
model = _fs_model(version_output) or _extract_system_desc(local_info_output)
firmware = _fs_firmware(version_output)
neighbors = parse_lldp_neighbors(lldp_output, chassis_id, '', mgmt_ip)
return chassis_id, mgmt_ip, model, firmware, neighbors
def _query_aruba(conn, version_output, ip):
"""Run HP/Aruba ProCurve-specific commands and return (chassis_id, mgmt_ip, model, firmware, neighbors)."""
local_output = conn.send_command("show lldp info local-device", read_timeout=30)
remote_output = conn.send_command("show lldp info remote-device", read_timeout=30)
local_info = parse_aruba_procurve_local(local_output)
chassis_id = local_info['chassis_id'] or _extract_mac_from_version(version_output) or ip
mgmt_ip = local_info['mgmt_ip'] or ip
model = _aruba_model(local_info.get('system_desc', ''))
firmware = _aruba_firmware(version_output)
neighbors = parse_aruba_procurve_neighbors(remote_output)
return chassis_id, mgmt_ip, model, firmware, neighbors
# ── Vendor detection ──────────────────────────────────────────────────────────
def _detect_vendor(version_output):
if 'Dell EMC Networking OS10' in version_output or 'Dell OS10' in version_output:
return 'dell_os10'
if '/ws/swbuildm/' in version_output:
return 'aruba_procurve'
return 'fs'
# ── FS/IES extractors ─────────────────────────────────────────────────────────
def _extract_local_chassis(output): def _extract_local_chassis(output):
import re m = re.search(r'Chassis ID\s*:\s*(.+)', output)
m = re.search(r'Chassis ID\s*:\s*([0-9A-Fa-f\-:]+)', output)
return m.group(1).strip() if m else None return m.group(1).strip() if m else None
def _extract_mac_from_version(output):
m = re.search(r'MAC Address\s*:\s*([0-9A-Fa-f]{2}(?:[:\-][0-9A-Fa-f]{2}){5})', output, re.IGNORECASE)
return m.group(1).strip() if m else None
def _fs_model(output):
m = re.search(r'(?:Model Name|Product)\s*:\s*([\w\-]+)', output, re.IGNORECASE)
return f"FS {m.group(1)}" if m else ''
def _fs_firmware(output):
fw_m = re.search(r'Software Version\s*:\s*(\S+)', output, re.IGNORECASE)
date_m = re.search(r'Build Date\s*:\s*(\d{2})-(\d{2})-(\d{4})T', output, re.IGNORECASE)
if not fw_m:
return ''
version = fw_m.group(1)
if date_m:
day, month, year = date_m.group(1), date_m.group(2), date_m.group(3)
return f"{version} ({year}-{month}-{day})"
return version
def _extract_system_desc(output): def _extract_system_desc(output):
import re m = re.search(r'System Description\s*:\s*(.*?)(?=\n\S|\Z)', output, re.DOTALL | re.IGNORECASE)
m = re.search(r'System Description\s*:\s*(.+)', output) return ' '.join(m.group(1).split()) if m else ''
# ── HP/Aruba extractors ───────────────────────────────────────────────────────
def _aruba_model(system_desc):
"""Extract model from LLDP system description, e.g. 'Aruba JL258A 2930F-8G-PoE+-2SFP+ Switch'."""
m = re.match(r'((?:Aruba|HP)\s+\S+\s+[\w\-\+]+)', system_desc, re.IGNORECASE)
return m.group(1) if m else system_desc.split(',')[0] if system_desc else ''
def _aruba_firmware(version_output):
"""Extract firmware version from image stamp, e.g. 'WC.16.10.0012'."""
m = re.search(r'^\s+([A-Z]+\.\d+\.\d+\.\d+)\s*$', version_output, re.MULTILINE)
return m.group(1).strip() if m else '' return m.group(1).strip() if m else ''
# ── Scan orchestration ────────────────────────────────────────────────────────
def scan_all_switches(ip_list, progress_callback=None, max_workers=10): def scan_all_switches(ip_list, progress_callback=None, max_workers=10):
"""
Scan all switches in parallel.
progress_callback(done, total, current_ip, result) called after each switch.
Returns list of results.
"""
results = [] results = []
total = len(ip_list) total = len(ip_list)
done = 0 done = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor: with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_ip = {executor.submit(connect_and_query, ip): ip for ip in ip_list} future_to_ip = {executor.submit(connect_and_query, ip): ip for ip in ip_list}
@@ -122,7 +181,7 @@ def scan_all_switches(ip_list, progress_callback=None, max_workers=10):
result = future.result() result = future.result()
except Exception as e: except Exception as e:
result = {"success": False, "ip": ip, "error": str(e)} result = {"success": False, "ip": ip, "error": str(e)}
import time; time.sleep(2) # 2s delay between switches to avoid RADIUS lockout import time; time.sleep(2) # avoid RADIUS lockout
results.append(result) results.append(result)
done += 1 done += 1