207 lines
7.7 KiB
Python
207 lines
7.7 KiB
Python
# parser.py - Parse LLDP output from FS, HP/Aruba ProCurve, and Dell switches
|
|
import re
|
|
|
|
|
|
def normalize_mac(mac_str):
    """Normalize a MAC address to XX-XX-XX-XX-XX-XX uppercase.

    Colons, hyphens, dots and whitespace are removed before regrouping, so
    '649d99-aa5100', '64:9d:99:aa:51:00' and '649d.99aa.5100' all become
    '64-9D-99-AA-51-00'.

    Args:
        mac_str: MAC address text in any of the separator styles above.

    Returns:
        The hyphen-separated uppercase form, or mac_str with surrounding
        whitespace stripped when it does not contain exactly 12 hex digits.
    """
    clean = re.sub(r'[:\-\.\s]', '', mac_str).upper()
    # Require 12 *hex* digits, not merely 12 characters: otherwise a
    # non-MAC identifier such as 'switch01core' would be cut into pairs
    # and returned as if it were a MAC address.
    if not re.fullmatch(r'[0-9A-F]{12}', clean):
        return mac_str.strip()
    return '-'.join(clean[i:i + 2] for i in range(0, 12, 2))
|
|
|
|
|
|
def shorten_interface(iface):
    """Abbreviate long interface names.

    Examples: 'GigabitEthernet 1/9' -> 'Gi1/9',
    'TenGigabitEthernet 1/1' -> 'Te1/1', 'FastEthernet 0/1' -> 'Fa0/1'.
    Matching is case-insensitive and any whitespace between the name and
    the port number is dropped.

    Args:
        iface: interface name (or free text containing one).

    Returns:
        The text with every known long name replaced by its short form,
        stripped of surrounding whitespace.
    """
    # Order matters: the substitutions run one after another, so names that
    # contain a shorter name (TenGigabitEthernet / FortyGigabitEthernet
    # contain GigabitEthernet, and all *Ethernet names contain Ethernet)
    # must be rewritten first. Otherwise 'TenGigabitEthernet 1/1' would
    # come out as 'TenGi1/1'.
    replacements = (
        (r'TenGigabitEthernet\s*', 'Te'),
        (r'FortyGigabitEthernet\s*', 'Fo'),
        (r'TwentyFiveGigE\s*', 'Twe'),
        (r'HundredGigE\s*', 'Hu'),
        (r'GigabitEthernet\s*', 'Gi'),
        (r'FastEthernet\s*', 'Fa'),
        (r'Ethernet\s*', 'Eth'),
        (r'mgmt\s*', 'mgmt'),
    )
    for pattern, short in replacements:
        iface = re.sub(pattern, short, iface, flags=re.IGNORECASE)
    return iface.strip()
|
|
|
|
|
|
def parse_lldp_neighbors(raw_output, local_chassis_id, local_hostname, local_mgmt_ip):
    """
    Parse raw 'show lldp neighbors' output from an FS switch.

    The text is split into blank-line-separated blocks; every block that
    contains "Local Interface" or "Local Port" is parsed as one neighbor.

    Args:
        raw_output: CLI text to parse.
        local_chassis_id: not used by the parser; kept so existing callers
            keep working.
        local_hostname: not used by the parser; kept for the same reason.
        local_mgmt_ip: not used by the parser; kept for the same reason.

    Returns:
        neighbors: list of dicts with keys local_port, chassis_id, port_id,
        port_desc, system_name, system_desc, mgmt_ip, capabilities and
        remote_port. Blocks without a chassis ID or a system name are
        left out.
    """
    def extract(label, text, value=r'(.*)'):
        # Only spaces/tabs may surround the colon. With \s* an empty field
        # such as "System Name :" would swallow the newline and capture the
        # following line as its value.
        m = re.search(label + r'[ \t]*:[ \t]*' + value, text, re.IGNORECASE)
        return m.group(1).strip() if m else ''

    neighbors = []

    # Split output into per-neighbor blocks (blank line separated)
    for block in re.split(r'\n\s*\n', raw_output.strip()):
        if 'Local Interface' not in block and 'Local Port' not in block:
            continue

        neighbor = {}
        # Accept both headings that the block filter above lets through.
        neighbor['local_port'] = shorten_interface(
            extract(r'Local (?:Interface|Port)', block))
        neighbor['chassis_id'] = extract(r'Chassis ID', block)
        neighbor['port_id'] = extract(r'Port ID', block)
        neighbor['port_desc'] = shorten_interface(extract(r'Port Description', block))
        neighbor['system_name'] = extract(r'System Name', block)
        neighbor['system_desc'] = extract(r'System Description', block)

        # FS switches report Management Address as MAC (e.g. '64-9D-99-AA-50-B0 (Other)')
        # or as IP (e.g. '10.214.0.192'). Keep only a dotted-quad IPv4 address.
        raw_mgmt = extract(r'Management Address', block, r'([\d\.A-Fa-f\-:]+)')
        ipv4_match = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', raw_mgmt)
        neighbor['mgmt_ip'] = ipv4_match.group(1) if ipv4_match else ''
        neighbor['capabilities'] = extract(r'System Capabilities', block)

        # Keep only neighbors that announced both a chassis ID and a system name.
        if neighbor['chassis_id'] and neighbor['system_name']:
            # Use port_desc as remote port if available, fallback to port_id
            neighbor['remote_port'] = neighbor['port_desc'] or neighbor['port_id']
            neighbors.append(neighbor)

    return neighbors
|
|
|
|
|
|
def parse_aruba_procurve_local(raw_output):
    """Parse 'show lldp info local-device' from an HP/Aruba switch.

    Every line is checked against each field pattern; when a field shows
    up more than once the last occurrence wins.

    Returns:
        dict with keys chassis_id (MAC-normalized), system_name,
        system_desc and mgmt_ip. Fields that were not found are ''.
    """
    info = {'chassis_id': '', 'system_name': '', 'system_desc': '', 'mgmt_ip': ''}

    # Fields whose value is stored as-is (after stripping).
    plain_fields = (
        ('system_name', r'System Name\s*:\s*(.+)'),
        ('system_desc', r'System Description\s*:\s*(.+)'),
    )

    for row in raw_output.splitlines():
        chassis_hit = re.search(r'Chassis Id\s*:\s*(.+)', row, re.IGNORECASE)
        if chassis_hit is not None:
            info['chassis_id'] = normalize_mac(chassis_hit.group(1).strip())

        for key, pattern in plain_fields:
            hit = re.search(pattern, row, re.IGNORECASE)
            if hit is not None:
                info[key] = hit.group(1).strip()

        # Accept an "Address" value only when the digit run contains a dot.
        addr_hit = re.search(r'Address\s*:\s*([\d\.]+)', row, re.IGNORECASE)
        if addr_hit is not None and '.' in addr_hit.group(1):
            info['mgmt_ip'] = addr_hit.group(1).strip()

    return info
|
|
|
|
|
|
def parse_aruba_procurve_neighbors(raw_output):
    """
    Parse 'show lldp info remote-device' tabular output from HP/Aruba switch.

    Columns: LocalPort | ChassisId PortId PortDescr SysName

    Rows are read only after the dashed separator line ("---- + ----").
    The chassis ID is taken as a unit first, so a MAC printed as six
    space-separated octets ("00 1b 3f 5e 8a 00") is not split across the
    following columns. The remaining columns are whitespace-separated;
    rows with fewer than three of them after the chassis ID are skipped.

    Returns:
        list of dicts with keys local_port, chassis_id (MAC-normalized),
        port_id, port_desc, system_name, system_desc, mgmt_ip,
        capabilities and remote_port. port_desc, system_desc, mgmt_ip and
        capabilities are always '' and remote_port equals port_id.
    """
    # Either six hex octets joined by single separators (space, colon or
    # hyphen) ending on a token boundary, or one plain non-space token.
    chassis_re = re.compile(
        r'\s*((?:[0-9A-Fa-f]{2}[ :\-]){5}[0-9A-Fa-f]{2}(?!\S)|\S+)'
    )
    neighbors = []
    in_data = False

    for line in raw_output.splitlines():
        if re.match(r'\s*-+\s*\+', line):
            in_data = True
            continue
        if not in_data or '|' not in line:
            continue

        left, right = line.split('|', 1)
        chassis_m = chassis_re.match(right)
        if not chassis_m:
            continue
        # PortId, PortDescr, SysName must all be present.
        rest = right[chassis_m.end():].split()
        if len(rest) < 3:
            continue

        port_id = rest[0]
        neighbors.append({
            'local_port': left.strip(),
            'chassis_id': normalize_mac(chassis_m.group(1)),
            'port_id': port_id,
            'port_desc': '',
            'system_name': rest[2],
            'system_desc': '',
            'mgmt_ip': '',
            'capabilities': '',
            'remote_port': port_id,
        })

    return neighbors
|
|
|
|
|
|
def parse_neighbor_description(system_desc):
    """
    Pull (model, firmware) out of an LLDP system description string.

    Two families are recognized:
      * Dell OS10 - text containing 'Dell' or 'OS10'; model comes from
        "System Type: ...", firmware from "OS Version: ...".
      * HP / Aruba - text starting with 'HP ' or 'Aruba '; model is the
        first three tokens, firmware is the word after "revision".

    Returns ('', '') for empty or unrecognized input.
    """
    unrecognized = ('', '')
    if not system_desc:
        return unrecognized

    # Dell OS10: "...System Type: S4112F-ON...OS Version: 10.5.4.7..."
    if any(tag in system_desc for tag in ('Dell', 'OS10')):
        type_hit = re.search(r'System Type\s*:\s*([\w\-]+)', system_desc)
        version_hit = re.search(r'OS Version\s*:\s*([\d\s\.]+)', system_desc)

        if type_hit:
            model = f"Dell {type_hit.group(1)}"
        else:
            model = 'Dell'

        if version_hit:
            # Squeeze out whitespace, then drop leading/trailing dots.
            firmware = re.sub(r'\s+', '', version_hit.group(1)).strip('.')
        else:
            firmware = ''
        return model, firmware

    # HP ProCurve / Aruba ArubaOS-Switch:
    #   "Aruba JL258A 2930F-8G-PoE+-2SFP+ Switch, revision WC.16.10.0012, ROM ..."
    #   "HP J9576A 3800-24G-PoE+-2SFP+ Switch, revision K.16.02.0019, ROM ..."
    if re.match(r'(?:Aruba|HP)\s', system_desc, re.IGNORECASE) is None:
        return unrecognized

    name_hit = re.match(r'((?:Aruba|HP)\s+\S+\s+[\w\-\+]+)', system_desc, re.IGNORECASE)
    revision_hit = re.search(r'revision\s+(\S+?)(?:[,\s]|$)', system_desc)

    if name_hit:
        model = name_hit.group(1)
    else:
        # No three-token model found: use everything before the first comma.
        model = system_desc.split(',')[0]
    firmware = revision_hit.group(1) if revision_hit else ''
    return model, firmware
|
|
|
|
|
|
def parse_hostname_from_prompt(prompt_line):
    """Return the hostname at the start of a CLI prompt such as 'ls-vhls-sw01#'.

    The hostname is the leading run of letters, digits, '_' and '-' that
    is immediately followed by '>' or '#'. Returns None when the stripped
    line does not start that way.
    """
    prompt = prompt_line.strip()
    found = re.match(r'^([A-Za-z0-9_\-]+)[>#]', prompt)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
|
|
def parse_mgmt_ip_from_interfaces(raw_output):
    """
    Find the management IP in 'show ip interface brief' output.

    Only lines that start with Vlan, Management or Mgmt (any case, e.g.
    'Vlan100' or 'Vlan 100') are considered. The first dotted-quad on such
    a line is used unless it starts with '0.0.0.0' or '127.'.

    Returns the address of the first usable Vlan line; if there is none,
    the address of the first usable Management/Mgmt line; otherwise None.
    """
    dotted_quad = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
    mgmt_candidate = None

    for raw_line in raw_output.splitlines():
        text = raw_line.strip()
        if not text:
            continue

        # Is this a Vlan or Management interface line?
        kind = re.match(r'(Vlan|Management|Mgmt)\s*\S*', text, re.IGNORECASE)
        if kind is None:
            continue

        # First dotted-quad anywhere on the line
        addr = dotted_quad.search(raw_line)
        if addr is None:
            continue

        ip = addr.group(1)
        if ip.startswith(('0.0.0.0', '127.')):
            continue

        # A Vlan address wins immediately; a management address is only
        # remembered in case no Vlan address turns up.
        if kind.group(1).lower() == 'vlan':
            return ip
        if not mgmt_candidate:
            mgmt_candidate = ip

    return mgmt_candidate
|