always output trusted metrics

This commit is contained in:
2026-03-31 21:47:52 +02:00
parent f6047a0b6e
commit 048b4a28e7

View File

@ -231,13 +231,15 @@ class MetricsServer:
"""
HTTP server exposing a /metrics endpoint for Prometheus.
The device presence metric is a direct snapshot of the last SNMP readout.
Every MAC seen in that readout gets value 1; MACs from previous readouts
that are no longer present are dropped from the output entirely.
The trusted/unknown status is exposed as an additional label.
Trusted MACs are always emitted (0 when absent from last readout, 1 when
present). Unknown MACs seen in the readout are appended with value 1.
A human-readable label comment is written above each device_present line.
Metric layout:
# <device label>
mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1
# Unknown
mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1
mac_watcher_vendor_cache_size 42
mac_watcher_vendor_cache_hits_total 100
mac_watcher_vendor_cache_misses_total 5
@ -246,24 +248,33 @@ class MetricsServer:
mac_watcher_exporter_requests_total 15
"""
def __init__(self, vendor_cache: VendorCache, trusted: set[str]):
def __init__(self, vendor_cache: VendorCache,
trusted_ordered: list[tuple[str, str]]):
"""
Args:
vendor_cache: VendorCache instance.
trusted_ordered: Ordered list of (mac_uppercase, label) tuples
parsed from config.trusted_mac_addresses.
"""
self._vendor_cache = vendor_cache
self._trusted = trusted
# Ordered list preserves config order in metrics output
self._trusted_ordered: list[tuple[str, str]] = trusted_ordered
# Set for fast membership checks
self._trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
self._lock = threading.Lock()
self._logger = logging.getLogger("MetricsServer")
self.start_time = datetime.now()
self.request_count: int = 0
self.snmp_poll_count: int = 0
# Last SNMP snapshot: mac -> 1 (always 1 while in snapshot)
# Replaced atomically on each poll.
self._snapshot: list[str] = []
# Last SNMP snapshot, replaced atomically on each poll.
self._snapshot: set[str] = set()
def update(self, current_macs: list[str]):
"""Replace the current snapshot with the latest SNMP readout."""
with self._lock:
self.snmp_poll_count += 1
self._snapshot = list(current_macs)
self._snapshot = set(current_macs)
def _fmt_block(self, name: str, value, help_text: str,
metric_type: str = "gauge",
@ -318,22 +329,31 @@ class MetricsServer:
metric_type="counter",
)
# --- Device presence snapshot ---
# HELP and TYPE appear once; one series per MAC in the last readout.
# --- Device presence ---
# Trusted MACs are always present in the output (0 or 1).
# Unknown MACs seen in the current readout follow with value 1.
# A label comment above each line identifies the device by name.
metric_name = f"{prefix}device_present"
lines.append(
f"# HELP {metric_name} "
f"1 if the MAC address was present in the last SNMP readout"
f"1 if the MAC address was present in the last SNMP readout, "
f"0 if absent. Trusted MACs are always emitted."
)
lines.append(f"# TYPE {metric_name} gauge")
with self._lock:
snapshot = list(self._snapshot)
snapshot = set(self._snapshot)
for mac in snapshot:
trusted_label = "true" if mac in self._trusted else "false"
label_str = f'mac="{mac}",trusted="{trusted_label}"'
lines.append(f"{metric_name}{{{label_str}}} 1")
# Trusted MACs — always emitted, value derived from snapshot
for mac, label in self._trusted_ordered:
value = 1 if mac in snapshot else 0
lines.append(f"# {label}")
lines.append(f'{metric_name}{{mac="{mac}",trusted="true"}} {value}')
# Unknown MACs — only those in snapshot but not in trusted list
for mac in sorted(snapshot - self._trusted_set):
lines.append("# Unknown")
lines.append(f'{metric_name}{{mac="{mac}",trusted="false"}} 1')
return "\n".join(lines) + "\n"
@ -408,6 +428,39 @@ def watch(metrics_server: MetricsServer, vendor_cache: VendorCache,
time.sleep(config.snmp_poll_interval)
# ---------------------------------------------------------------------------
# Label parser
# ---------------------------------------------------------------------------
def _parse_trusted_labels() -> list[str]:
"""
Extract the inline comment labels from config.trusted_mac_addresses.
Reads config.py source at runtime so labels stay in sync with the list
without requiring a separate data structure. Falls back to the MAC
address string itself if no comment is found for an entry.
Returns a list of label strings aligned 1:1 with
config.trusted_mac_addresses.
"""
import inspect
import re as _re
src = inspect.getsource(config)
# Match each quoted MAC followed by an optional inline comment
pairs = _re.findall(
r'"([0-9A-Fa-f:]{17})"[^#\n]*#\s*(.+)',
src,
)
comment_map: dict[str, str] = {
mac.upper(): label.strip() for mac, label in pairs
}
return [
comment_map.get(normalize_mac(mac), normalize_mac(mac))
for mac in config.trusted_mac_addresses
]
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
@ -416,15 +469,28 @@ def main():
setup_logging()
logger = logging.getLogger("Main")
# Normalize trusted MAC list once at startup.
trusted_macs: set[str] = set(normalize_mac_list(config.trusted_mac_addresses))
# Build ordered list of (mac_uppercase, label) from config.
# The label is the inline comment text after the MAC address.
# Normalization happens here once; all downstream code uses uppercase.
trusted_ordered: list[tuple[str, str]] = []
seen: set[str] = set()
for mac, label in zip(
config.trusted_mac_addresses,
_parse_trusted_labels(),
):
mac_upper = normalize_mac(mac)
if mac_upper not in seen:
seen.add(mac_upper)
trusted_ordered.append((mac_upper, label))
trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
logger.info("=" * 50)
logger.info("MAC Watcher starting")
logger.info(f"Switch: {config.switch_ip_addr}")
logger.info(f"snmpwalk: {config.snmpwalk_bin}")
logger.info(f"Poll interval: {config.snmp_poll_interval}s")
logger.info(f"Trusted MACs: {len(trusted_macs)}")
logger.info(f"Trusted MACs: {len(trusted_ordered)}")
logger.info(f"Exporter: http://{config.exporter_host}:{config.exporter_port}/metrics")
logger.info("=" * 50)
@ -437,12 +503,12 @@ def main():
logger.warning(f"Vendor database update failed (offline?): {e}")
vendor_cache = VendorCache(config.vendor_cache_file)
metrics_server = MetricsServer(vendor_cache, trusted_macs)
metrics_server = MetricsServer(vendor_cache, trusted_ordered)
# Start watcher in background thread
watcher_thread = threading.Thread(
target=watch,
args=(metrics_server, vendor_cache, trusted_macs),
args=(metrics_server, vendor_cache, trusted_set),
daemon=True,
name="Watcher",
)