diff --git a/mac_watcher.py b/mac_watcher.py index 5a9389c..8fb78d4 100644 --- a/mac_watcher.py +++ b/mac_watcher.py @@ -231,39 +231,50 @@ class MetricsServer: """ HTTP server exposing a /metrics endpoint for Prometheus. - The device presence metric is a direct snapshot of the last SNMP readout. - Every MAC seen in that readout gets value 1; MACs from previous readouts - that are no longer present are dropped from the output entirely. - The trusted/unknown status is exposed as an additional label. + Trusted MACs are always emitted (0 when absent from last readout, 1 when + present). Unknown MACs seen in the readout are appended with value 1. + A human-readable label comment is written above each device_present line. Metric layout: - mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF", trusted="true"} 1 - mac_watcher_vendor_cache_size 42 - mac_watcher_vendor_cache_hits_total 100 - mac_watcher_vendor_cache_misses_total 5 - mac_watcher_snmp_polls_total 30 - mac_watcher_exporter_uptime_seconds 900 - mac_watcher_exporter_requests_total 15 + # + mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1 + # Unknown + mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1 + mac_watcher_vendor_cache_size 42 + mac_watcher_vendor_cache_hits_total 100 + mac_watcher_vendor_cache_misses_total 5 + mac_watcher_snmp_polls_total 30 + mac_watcher_exporter_uptime_seconds 900 + mac_watcher_exporter_requests_total 15 """ - def __init__(self, vendor_cache: VendorCache, trusted: set[str]): + def __init__(self, vendor_cache: VendorCache, + trusted_ordered: list[tuple[str, str]]): + """ + Args: + vendor_cache: VendorCache instance. + trusted_ordered: Ordered list of (mac_uppercase, label) tuples + parsed from config.trusted_mac_addresses. + """ self._vendor_cache = vendor_cache - self._trusted = trusted + # Ordered list preserves config order in metrics output + self._trusted_ordered: list[tuple[str, str]] = trusted_ordered + # Set for fast membership checks + self._trusted_set: set[str] = {mac for mac, _ in trusted_ordered} self._lock = threading.Lock() self._logger = logging.getLogger("MetricsServer") self.start_time = datetime.now() self.request_count: int = 0 self.snmp_poll_count: int = 0 - # Last SNMP snapshot: mac -> 1 (always 1 while in snapshot) - # Replaced atomically on each poll. - self._snapshot: list[str] = [] + # Last SNMP snapshot, replaced atomically on each poll. + self._snapshot: set[str] = set() def update(self, current_macs: list[str]): """Replace the current snapshot with the latest SNMP readout.""" with self._lock: self.snmp_poll_count += 1 - self._snapshot = list(current_macs) + self._snapshot = set(current_macs) def _fmt_block(self, name: str, value, help_text: str, metric_type: str = "gauge", @@ -318,22 +329,31 @@ class MetricsServer: metric_type="counter", ) - # --- Device presence snapshot --- - # HELP and TYPE appear once; one series per MAC in the last readout. + # --- Device presence --- + # Trusted MACs are always present in the output (0 or 1). + # Unknown MACs seen in the current readout follow with value 1. + # A label comment above each line identifies the device by name. metric_name = f"{prefix}device_present" lines.append( f"# HELP {metric_name} " - f"1 if the MAC address was present in the last SNMP readout" + f"1 if the MAC address was present in the last SNMP readout, " + f"0 if absent. Trusted MACs are always emitted." ) lines.append(f"# TYPE {metric_name} gauge") with self._lock: - snapshot = list(self._snapshot) + snapshot = set(self._snapshot) - for mac in snapshot: - trusted_label = "true" if mac in self._trusted else "false" - label_str = f'mac="{mac}",trusted="{trusted_label}"' - lines.append(f"{metric_name}{{{label_str}}} 1") + # Trusted MACs — always emitted, value derived from snapshot + for mac, label in self._trusted_ordered: + value = 1 if mac in snapshot else 0 + lines.append(f"# {label}") + lines.append(f'{metric_name}{{mac="{mac}",trusted="true"}} {value}') + + # Unknown MACs — only those in snapshot but not in trusted list + for mac in sorted(snapshot - self._trusted_set): + lines.append("# Unknown") + lines.append(f'{metric_name}{{mac="{mac}",trusted="false"}} 1') return "\n".join(lines) + "\n" @@ -408,6 +428,39 @@ def watch(metrics_server: MetricsServer, vendor_cache: VendorCache, time.sleep(config.snmp_poll_interval) +# --------------------------------------------------------------------------- +# Label parser +# --------------------------------------------------------------------------- + +def _parse_trusted_labels() -> list[str]: + """ + Extract the inline comment labels from config.trusted_mac_addresses. + + Reads config.py source at runtime so labels stay in sync with the list + without requiring a separate data structure. Falls back to the MAC + address string itself if no comment is found for an entry. + + Returns a list of label strings aligned 1:1 with + config.trusted_mac_addresses. + """ + import inspect + import re as _re + + src = inspect.getsource(config) + # Match each quoted MAC followed by an optional inline comment + pairs = _re.findall( + r'"([0-9A-Fa-f:]{17})"[^#\n]*#\s*(.+)', + src, + ) + comment_map: dict[str, str] = { + mac.upper(): label.strip() for mac, label in pairs + } + return [ + comment_map.get(normalize_mac(mac), normalize_mac(mac)) + for mac in config.trusted_mac_addresses + ] + + # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- @@ -416,15 +469,28 @@ def main(): setup_logging() logger = logging.getLogger("Main") - # Normalize trusted MAC list once at startup. - trusted_macs: set[str] = set(normalize_mac_list(config.trusted_mac_addresses)) + # Build ordered list of (mac_uppercase, label) from config. + # The label is the inline comment text after the MAC address. + # Normalization happens here once; all downstream code uses uppercase. + trusted_ordered: list[tuple[str, str]] = [] + seen: set[str] = set() + for mac, label in zip( + config.trusted_mac_addresses, + _parse_trusted_labels(), + ): + mac_upper = normalize_mac(mac) + if mac_upper not in seen: + seen.add(mac_upper) + trusted_ordered.append((mac_upper, label)) + + trusted_set: set[str] = {mac for mac, _ in trusted_ordered} logger.info("=" * 50) logger.info("MAC Watcher starting") logger.info(f"Switch: {config.switch_ip_addr}") logger.info(f"snmpwalk: {config.snmpwalk_bin}") logger.info(f"Poll interval: {config.snmp_poll_interval}s") - logger.info(f"Trusted MACs: {len(trusted_macs)}") + logger.info(f"Trusted MACs: {len(trusted_ordered)}") logger.info(f"Exporter: http://{config.exporter_host}:{config.exporter_port}/metrics") logger.info("=" * 50) @@ -437,12 +503,12 @@ def main(): logger.warning(f"Vendor database update failed (offline?): {e}") vendor_cache = VendorCache(config.vendor_cache_file) - metrics_server = MetricsServer(vendor_cache, trusted_macs) + metrics_server = MetricsServer(vendor_cache, trusted_ordered) # Start watcher in background thread watcher_thread = threading.Thread( target=watch, - args=(metrics_server, vendor_cache, trusted_macs), + args=(metrics_server, vendor_cache, trusted_set), daemon=True, name="Watcher", )