#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MAC Watcher

Polls a switch via SNMP, alerts on unknown MACs, and exposes a Prometheus
metrics endpoint reflecting the last SNMP readout.

Author: Hendrik Schutter, mail@hendrikschutter.com
"""

import json
import logging
import re
import smtplib
import sys
import threading
import time
import email.utils
from email.mime.text import MIMEText
from http.server import BaseHTTPRequestHandler, HTTPServer
from subprocess import PIPE, Popen
from datetime import datetime

from mac_vendor_lookup import MacLookup

import config

# ---------------------------------------------------------------------------
# MAC normalization helpers
# ---------------------------------------------------------------------------

# Canonical form: six colon-separated two-digit hex octets (case-insensitive).
MAC_RE = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")


def normalize_mac(mac: str) -> str:
    """Return MAC address in canonical uppercase colon-separated form."""
    return mac.strip().upper()


def normalize_mac_list(macs: list[str]) -> list[str]:
    """Normalize a list of MAC addresses, remove duplicates, preserve order."""
    seen: set[str] = set()
    result: list[str] = []
    for mac in macs:
        n = normalize_mac(mac)
        if n not in seen:
            seen.add(n)
            result.append(n)
    return result


def is_valid_mac(mac: str) -> bool:
    """True if *mac* matches the canonical AA:BB:CC:DD:EE:FF form."""
    return bool(MAC_RE.match(mac))


# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------

def setup_logging():
    """Configure root logging from config.log_level (falls back to INFO)."""
    log_level = getattr(logging, config.log_level.upper(), logging.INFO)
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )


# ---------------------------------------------------------------------------
# MAC vendor cache (persistent JSON file, not tracked by git)
# ---------------------------------------------------------------------------

class VendorCache:
    """Persistent JSON-backed cache for MAC vendor lookups.

    All access to the in-memory dict and hit/miss counters is guarded by a
    lock so the watcher thread and the HTTP handler threads can share one
    instance safely.
    """

    def __init__(self, cache_file: str):
        self._file = cache_file
        self._data: dict[str, str] = {}
        self._hits: int = 0
        self._misses: int = 0
        self._lock = threading.Lock()
        self._logger = logging.getLogger("VendorCache")
        self._load()

    def _load(self):
        """Load the cache file; start empty if missing or corrupt."""
        try:
            with open(self._file, "r") as f:
                self._data = json.load(f)
            self._logger.info(
                f"Loaded {len(self._data)} vendor cache entries from {self._file}"
            )
        except FileNotFoundError:
            self._logger.info(
                f"Cache file {self._file} not found, starting with empty cache"
            )
            self._data = {}
        except json.JSONDecodeError as e:
            self._logger.warning(f"Cache file corrupt, resetting: {e}")
            self._data = {}

    def _save(self):
        """Persist the cache to disk; log (but do not raise) on I/O errors."""
        try:
            with open(self._file, "w") as f:
                json.dump(self._data, f, indent=2)
        except OSError as e:
            self._logger.error(f"Failed to write cache file: {e}")

    def lookup(self, mac: str) -> str:
        """Return vendor string for mac, querying mac_vendor_lookup if not cached."""
        mac_upper = normalize_mac(mac)
        with self._lock:
            if mac_upper in self._data:
                self._hits += 1
                return self._data[mac_upper]
            self._misses += 1
        # Not in cache — query library outside the lock (blocking, disk I/O)
        try:
            vendor = MacLookup().lookup(mac)
        except Exception:
            vendor = "Unknown vendor"
        with self._lock:
            self._data[mac_upper] = vendor
            self._save()
        return vendor

    @property
    def size(self) -> int:
        with self._lock:
            return len(self._data)

    @property
    def hits(self) -> int:
        with self._lock:
            return self._hits

    @property
    def misses(self) -> int:
        with self._lock:
            return self._misses


# ---------------------------------------------------------------------------
# SNMP query
# ---------------------------------------------------------------------------

def query_mac_table() -> list[str]:
    """
    Query the switch MAC address table via SNMP (OID 1.3.6.1.2.1.17.4.3.1.1).

    Returns a deduplicated list of normalized MAC strings "AA:BB:CC:DD:EE:FF".
    Returns an empty list on any SNMP failure (logged, never raised).
    """
    logger = logging.getLogger("SNMPQuery")
    mac_addresses: list[str] = []
    cmd = [
        config.snmpwalk_bin,
        "-v", "2c",
        "-O", "vqe",
        "-c", config.switch_snmp_community,
        config.switch_ip_addr,
        "1.3.6.1.2.1.17.4.3.1.1",
    ]
    try:
        with Popen(cmd, stdout=PIPE, stderr=PIPE) as process:
            stdout, stderr = process.communicate()
            if process.returncode != 0:
                logger.error(
                    f"snmpwalk failed (rc={process.returncode}): "
                    f"{stderr.decode().strip()}"
                )
                return []
            for line in stdout.decode("utf-8").splitlines():
                # snmpwalk emits hex octets separated by spaces; turn the
                # separators into colons and drop quoting/trailing colon.
                mac = line.replace(" ", ":").replace('"', "").strip().rstrip(":")
                mac = normalize_mac(mac)
                if is_valid_mac(mac):
                    mac_addresses.append(mac)
    except FileNotFoundError:
        logger.error(
            f"snmpwalk binary not found at '{config.snmpwalk_bin}'. "
            f"Install snmp or adjust snmpwalk_bin in config.py."
        )
    except Exception as e:
        logger.error(f"Exception during SNMP query: {e}")

    result = normalize_mac_list(mac_addresses)
    logger.debug(f"SNMP returned {len(result)} unique MAC addresses")
    return result


# ---------------------------------------------------------------------------
# Email alert
# ---------------------------------------------------------------------------

def send_alert_mail(mac_addr: str, vendor: str):
    """Send one alert mail for an unknown MAC. Failures are logged, not raised."""
    logger = logging.getLogger("EmailAlert")
    timestamp = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
    body = (
        f"New unknown MAC address detected.\n\n"
        f"Date: {timestamp}\n"
        f"MAC: {mac_addr}\n"
        f"Vendor: {vendor}\n"
    )
    msg = MIMEText(body)
    msg["Subject"] = f"MAC-Watcher: unknown MAC {mac_addr} ({vendor})"
    msg["To"] = email.utils.formataddr((config.mail_to_name, config.mail_to_address))
    msg["From"] = email.utils.formataddr((config.mail_from_name, config.mail_from_address))
    try:
        # Context manager guarantees the connection is closed even if
        # starttls/login/sendmail raises (the old code leaked the socket
        # because server.quit() was only reached on full success).
        with smtplib.SMTP(
            config.mail_server_domain, config.mail_server_port, timeout=10
        ) as server:
            server.starttls()
            server.login(config.mail_from_address, config.mail_server_password)
            server.sendmail(
                config.mail_from_address, config.mail_to_address, msg.as_string()
            )
        logger.info(f"Alert sent for {mac_addr}")
    except Exception as e:
        logger.error(f"Failed to send alert mail for {mac_addr}: {e}")


# ---------------------------------------------------------------------------
# Prometheus metrics
# ---------------------------------------------------------------------------

class MetricsServer:
    """
    HTTP server exposing a /metrics endpoint for Prometheus.

    Trusted MACs are always emitted (0 when absent from last readout, 1 when
    present). Unknown MACs seen in the readout are appended with value 1.
    A human-readable label comment is written above each device_present line.

    Metric layout:
        # <label>
        mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1
        # Unknown
        mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1
        mac_watcher_vendor_cache_size 42
        mac_watcher_vendor_cache_hits_total 100
        mac_watcher_vendor_cache_misses_total 5
        mac_watcher_snmp_polls_total 30
        mac_watcher_exporter_uptime_seconds 900
        mac_watcher_exporter_requests_total 15
    """

    def __init__(self, vendor_cache: VendorCache,
                 trusted_ordered: list[tuple[str, str]]):
        """
        Args:
            vendor_cache: VendorCache instance.
            trusted_ordered: Ordered list of (mac_uppercase, label) tuples
                parsed from config.trusted_mac_addresses.
        """
        self._vendor_cache = vendor_cache
        # Ordered list preserves config order in metrics output
        self._trusted_ordered: list[tuple[str, str]] = trusted_ordered
        # Set for fast membership checks
        self._trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
        self._lock = threading.Lock()
        self._logger = logging.getLogger("MetricsServer")
        self.start_time = datetime.now()
        self.request_count: int = 0
        self.snmp_poll_count: int = 0
        # Last SNMP snapshot, replaced atomically on each poll.
        self._snapshot: set[str] = set()

    def update(self, current_macs: list[str]):
        """Replace the current snapshot with the latest SNMP readout."""
        with self._lock:
            self.snmp_poll_count += 1
            self._snapshot = set(current_macs)

    def _fmt_block(self, name: str, value, help_text: str,
                   metric_type: str = "gauge",
                   labels: dict[str, str] | None = None) -> list[str]:
        """Return HELP + TYPE + value lines for one metric."""
        full_name = f"{config.exporter_prefix}{name}"
        lines = [
            f"# HELP {full_name} {help_text}",
            f"# TYPE {full_name} {metric_type}",
        ]
        if labels:
            label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
            lines.append(f"{full_name}{{{label_str}}} {value}")
        else:
            lines.append(f"{full_name} {value}")
        return lines

    def _generate_metrics(self) -> str:
        """Render the full Prometheus exposition-format payload."""
        lines: list[str] = []
        uptime = int((datetime.now() - self.start_time).total_seconds())
        prefix = config.exporter_prefix

        # --- Exporter meta ---
        lines += self._fmt_block(
            "exporter_uptime_seconds",
            uptime,
            "Exporter uptime in seconds",
        )
        lines += self._fmt_block(
            "exporter_requests_total",
            self.request_count,
            "Total number of /metrics requests",
            metric_type="counter",
        )
        lines += self._fmt_block(
            "snmp_polls_total",
            self.snmp_poll_count,
            "Total number of completed SNMP polls",
            metric_type="counter",
        )

        # --- Vendor cache statistics ---
        lines += self._fmt_block(
            "vendor_cache_size",
            self._vendor_cache.size,
            "Number of entries in the persistent vendor cache",
        )
        lines += self._fmt_block(
            "vendor_cache_hits_total",
            self._vendor_cache.hits,
            "Total vendor cache hits",
            metric_type="counter",
        )
        lines += self._fmt_block(
            "vendor_cache_misses_total",
            self._vendor_cache.misses,
            "Total vendor cache misses (required library lookup)",
            metric_type="counter",
        )

        # --- Device presence ---
        # Trusted MACs are always present in the output (0 or 1).
        # Unknown MACs seen in the current readout follow with value 1.
        # A label comment above each line identifies the device by name.
        metric_name = f"{prefix}device_present"
        lines.append(
            f"# HELP {metric_name} "
            f"1 if the MAC address was present in the last SNMP readout, "
            f"0 if absent. Trusted MACs are always emitted."
        )
        lines.append(f"# TYPE {metric_name} gauge")

        with self._lock:
            snapshot = set(self._snapshot)

        # Trusted MACs — always emitted, value derived from snapshot
        for mac, label in self._trusted_ordered:
            value = 1 if mac in snapshot else 0
            lines.append(f"# {label}")
            lines.append(f'{metric_name}{{mac="{mac}",trusted="true"}} {value}')

        # Unknown MACs — only those in snapshot but not in trusted list
        for mac in sorted(snapshot - self._trusted_set):
            lines.append("# Unknown")
            lines.append(f'{metric_name}{{mac="{mac}",trusted="false"}} 1')

        return "\n".join(lines) + "\n"

    def create_handler(self):
        """Build a BaseHTTPRequestHandler subclass bound to this instance."""
        server_instance = self

        class RequestHandler(BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                pass  # suppress default access log

            def do_GET(self):
                with server_instance._lock:
                    server_instance.request_count += 1
                if self.path == "/metrics":
                    body = server_instance._generate_metrics().encode("utf-8")
                    self.send_response(200)
                    self.send_header("Content-Type", "text/plain; charset=utf-8")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                elif self.path in ("/", "/health"):
                    # NOTE(review): this literal was corrupted in the source;
                    # reconstructed from the surviving fragments (title, h1,
                    # /metrics link) — confirm against the original file.
                    body = (
                        b"<html><head><title>MAC Watcher</title></head>"
                        b"<body><h1>MAC Watcher Prometheus Exporter</h1>"
                        b'<p><a href="/metrics">Metrics</a></p>'
                        b"</body></html>"
                    )
                    self.send_response(200)
                    self.send_header("Content-Type", "text/html; charset=utf-8")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                else:
                    self.send_response(404)
                    self.end_headers()

        return RequestHandler


# ---------------------------------------------------------------------------
# Main watcher loop
# ---------------------------------------------------------------------------

def watch(metrics_server: MetricsServer, vendor_cache: VendorCache,
          trusted: set[str]):
    """
    Poll the switch in a loop. Push each readout to the MetricsServer.
    Send one email alert per unknown MAC (de-duplicated in memory).

    Args:
        metrics_server: receives each SNMP snapshot via update().
        vendor_cache: MAC-to-vendor resolution with persistent cache.
        trusted: normalized uppercase set of trusted MAC addresses.
    """
    logger = logging.getLogger("Watcher")
    alerted: set[str] = set()
    while True:
        macs = query_mac_table()
        metrics_server.update(macs)
        for mac in macs:
            if mac not in trusted and mac not in alerted:
                vendor = vendor_cache.lookup(mac)
                logger.warning(f"Unknown MAC detected: {mac} ({vendor})")
                alerted.add(mac)
                send_alert_mail(mac, vendor)
        time.sleep(config.snmp_poll_interval)


# ---------------------------------------------------------------------------
# Label parser
# ---------------------------------------------------------------------------

def _parse_trusted_labels() -> list[str]:
    """
    Extract the inline comment labels from config.trusted_mac_addresses.

    Reads config.py source at runtime so labels stay in sync with the list
    without requiring a separate data structure. Falls back to the MAC
    address string itself if no comment is found for an entry.

    Returns a list of label strings aligned 1:1 with
    config.trusted_mac_addresses.
    """
    import inspect
    import re as _re

    src = inspect.getsource(config)
    # Match each quoted MAC followed by an optional inline comment
    pairs = _re.findall(
        r'"([0-9A-Fa-f:]{17})"[^#\n]*#\s*(.+)',
        src,
    )
    comment_map: dict[str, str] = {
        mac.upper(): label.strip() for mac, label in pairs
    }
    return [
        comment_map.get(normalize_mac(mac), normalize_mac(mac))
        for mac in config.trusted_mac_addresses
    ]


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main():
    setup_logging()
    logger = logging.getLogger("Main")

    # Build ordered list of (mac_uppercase, label) from config.
    # The label is the inline comment text after the MAC address.
    # Normalization happens here once; all downstream code uses uppercase.
    trusted_ordered: list[tuple[str, str]] = []
    seen: set[str] = set()
    for mac, label in zip(
        config.trusted_mac_addresses,
        _parse_trusted_labels(),
    ):
        mac_upper = normalize_mac(mac)
        if mac_upper not in seen:
            seen.add(mac_upper)
            trusted_ordered.append((mac_upper, label))
    trusted_set: set[str] = {mac for mac, _ in trusted_ordered}

    logger.info("=" * 50)
    logger.info("MAC Watcher starting")
    logger.info(f"Switch: {config.switch_ip_addr}")
    logger.info(f"snmpwalk: {config.snmpwalk_bin}")
    logger.info(f"Poll interval: {config.snmp_poll_interval}s")
    logger.info(f"Trusted MACs: {len(trusted_ordered)}")
    logger.info(f"Exporter: http://{config.exporter_host}:{config.exporter_port}/metrics")
    logger.info("=" * 50)

    # Update local vendor DB on startup
    logger.info("Updating MAC vendor database...")
    try:
        MacLookup().update_vendors()
        logger.info("Vendor database updated")
    except Exception as e:
        logger.warning(f"Vendor database update failed (offline?): {e}")

    vendor_cache = VendorCache(config.vendor_cache_file)
    metrics_server = MetricsServer(vendor_cache, trusted_ordered)

    # Start watcher in background thread
    watcher_thread = threading.Thread(
        target=watch,
        args=(metrics_server, vendor_cache, trusted_set),
        daemon=True,
        name="Watcher",
    )
    watcher_thread.start()

    # Start HTTP metrics server (blocking)
    handler = metrics_server.create_handler()
    try:
        http_server = HTTPServer((config.exporter_host, config.exporter_port), handler)
        logger.info(
            f"HTTP server listening on {config.exporter_host}:{config.exporter_port}"
        )
        http_server.serve_forever()
    except KeyboardInterrupt:
        logger.info("Shutdown requested")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
    finally:
        logger.info("Shutdown complete")
        sys.exit(0)


if __name__ == "__main__":
    main()