536 lines
18 KiB
Python
536 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
MAC Watcher
|
|
Polls a switch via SNMP, alerts on unknown MACs, and exposes a Prometheus
|
|
metrics endpoint reflecting the last SNMP readout.
|
|
|
|
Author: Hendrik Schutter, mail@hendrikschutter.com
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import smtplib
|
|
import sys
|
|
import threading
|
|
import time
|
|
import email.utils
|
|
from email.mime.text import MIMEText
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from subprocess import PIPE, Popen
|
|
from datetime import datetime
|
|
|
|
from mac_vendor_lookup import MacLookup
|
|
|
|
import config
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MAC normalization helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
MAC_RE = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
|
|
|
|
|
|
def normalize_mac(mac: str) -> str:
|
|
"""Return MAC address in canonical uppercase colon-separated form."""
|
|
return mac.strip().upper()
|
|
|
|
|
|
def normalize_mac_list(macs: list[str]) -> list[str]:
|
|
"""Normalize a list of MAC addresses, remove duplicates, preserve order."""
|
|
seen: set[str] = set()
|
|
result: list[str] = []
|
|
for mac in macs:
|
|
n = normalize_mac(mac)
|
|
if n not in seen:
|
|
seen.add(n)
|
|
result.append(n)
|
|
return result
|
|
|
|
|
|
def is_valid_mac(mac: str) -> bool:
|
|
return bool(MAC_RE.match(mac))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging setup
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def setup_logging():
|
|
log_level = getattr(logging, config.log_level.upper(), logging.INFO)
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
handlers=[logging.StreamHandler(sys.stdout)],
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MAC vendor cache (persistent JSON file, not tracked by git)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class VendorCache:
|
|
"""Persistent JSON-backed cache for MAC vendor lookups."""
|
|
|
|
def __init__(self, cache_file: str):
|
|
self._file = cache_file
|
|
self._data: dict[str, str] = {}
|
|
self._hits: int = 0
|
|
self._misses: int = 0
|
|
self._lock = threading.Lock()
|
|
self._logger = logging.getLogger("VendorCache")
|
|
self._load()
|
|
|
|
def _load(self):
|
|
try:
|
|
with open(self._file, "r") as f:
|
|
self._data = json.load(f)
|
|
self._logger.info(
|
|
f"Loaded {len(self._data)} vendor cache entries from {self._file}"
|
|
)
|
|
except FileNotFoundError:
|
|
self._logger.info(
|
|
f"Cache file {self._file} not found, starting with empty cache"
|
|
)
|
|
self._data = {}
|
|
except json.JSONDecodeError as e:
|
|
self._logger.warning(f"Cache file corrupt, resetting: {e}")
|
|
self._data = {}
|
|
|
|
def _save(self):
|
|
try:
|
|
with open(self._file, "w") as f:
|
|
json.dump(self._data, f, indent=2)
|
|
except OSError as e:
|
|
self._logger.error(f"Failed to write cache file: {e}")
|
|
|
|
def lookup(self, mac: str) -> str:
|
|
"""Return vendor string for mac, querying mac_vendor_lookup if not cached."""
|
|
mac_upper = normalize_mac(mac)
|
|
with self._lock:
|
|
if mac_upper in self._data:
|
|
self._hits += 1
|
|
return self._data[mac_upper]
|
|
|
|
# Not in cache — query library (blocking, disk I/O)
|
|
self._misses += 1
|
|
try:
|
|
vendor = MacLookup().lookup(mac)
|
|
except Exception:
|
|
vendor = "Unknown vendor"
|
|
|
|
with self._lock:
|
|
self._data[mac_upper] = vendor
|
|
self._save()
|
|
|
|
return vendor
|
|
|
|
@property
|
|
def size(self) -> int:
|
|
with self._lock:
|
|
return len(self._data)
|
|
|
|
@property
|
|
def hits(self) -> int:
|
|
with self._lock:
|
|
return self._hits
|
|
|
|
@property
|
|
def misses(self) -> int:
|
|
with self._lock:
|
|
return self._misses
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SNMP query
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def query_mac_table() -> list[str]:
|
|
"""
|
|
Query the switch MAC address table via SNMP (OID 1.3.6.1.2.1.17.4.3.1.1).
|
|
Returns a deduplicated list of normalized MAC strings "AA:BB:CC:DD:EE:FF".
|
|
"""
|
|
logger = logging.getLogger("SNMPQuery")
|
|
mac_addresses: list[str] = []
|
|
|
|
cmd = [
|
|
config.snmpwalk_bin,
|
|
"-v", "2c",
|
|
"-O", "vqe",
|
|
"-c", config.switch_snmp_community,
|
|
config.switch_ip_addr,
|
|
"1.3.6.1.2.1.17.4.3.1.1",
|
|
]
|
|
|
|
try:
|
|
with Popen(cmd, stdout=PIPE, stderr=PIPE) as process:
|
|
stdout, stderr = process.communicate()
|
|
if process.returncode != 0:
|
|
logger.error(
|
|
f"snmpwalk failed (rc={process.returncode}): "
|
|
f"{stderr.decode().strip()}"
|
|
)
|
|
return []
|
|
|
|
for line in stdout.decode("utf-8").splitlines():
|
|
mac = line.replace(" ", ":").replace('"', "").strip().rstrip(":")
|
|
mac = normalize_mac(mac)
|
|
if is_valid_mac(mac):
|
|
mac_addresses.append(mac)
|
|
|
|
except FileNotFoundError:
|
|
logger.error(
|
|
f"snmpwalk binary not found at '{config.snmpwalk_bin}'. "
|
|
f"Install snmp or adjust snmpwalk_bin in config.py."
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Exception during SNMP query: {e}")
|
|
|
|
result = normalize_mac_list(mac_addresses)
|
|
logger.debug(f"SNMP returned {len(result)} unique MAC addresses")
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Email alert
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def send_alert_mail(mac_addr: str, vendor: str):
|
|
logger = logging.getLogger("EmailAlert")
|
|
timestamp = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
|
|
|
|
body = (
|
|
f"New unknown MAC address detected.\n\n"
|
|
f"Date: {timestamp}\n"
|
|
f"MAC: {mac_addr}\n"
|
|
f"Vendor: {vendor}\n"
|
|
)
|
|
|
|
msg = MIMEText(body)
|
|
msg["Subject"] = f"MAC-Watcher: unknown MAC {mac_addr} ({vendor})"
|
|
msg["To"] = email.utils.formataddr((config.mail_to_name, config.mail_to_address))
|
|
msg["From"] = email.utils.formataddr((config.mail_from_name, config.mail_from_address))
|
|
|
|
try:
|
|
server = smtplib.SMTP(config.mail_server_domain, config.mail_server_port, timeout=10)
|
|
server.starttls()
|
|
server.login(config.mail_from_address, config.mail_server_password)
|
|
server.sendmail(config.mail_from_address, config.mail_to_address, msg.as_string())
|
|
server.quit()
|
|
logger.info(f"Alert sent for {mac_addr}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to send alert mail for {mac_addr}: {e}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Prometheus metrics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MetricsServer:
|
|
"""
|
|
HTTP server exposing a /metrics endpoint for Prometheus.
|
|
|
|
Trusted MACs are always emitted (0 when absent from last readout, 1 when
|
|
present). Unknown MACs seen in the readout are appended with value 1.
|
|
A human-readable label comment is written above each device_present line.
|
|
|
|
Metric layout:
|
|
# <device label>
|
|
mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1
|
|
# Unknown
|
|
mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1
|
|
mac_watcher_vendor_cache_size 42
|
|
mac_watcher_vendor_cache_hits_total 100
|
|
mac_watcher_vendor_cache_misses_total 5
|
|
mac_watcher_snmp_polls_total 30
|
|
mac_watcher_exporter_uptime_seconds 900
|
|
mac_watcher_exporter_requests_total 15
|
|
"""
|
|
|
|
def __init__(self, vendor_cache: VendorCache,
|
|
trusted_ordered: list[tuple[str, str]]):
|
|
"""
|
|
Args:
|
|
vendor_cache: VendorCache instance.
|
|
trusted_ordered: Ordered list of (mac_uppercase, label) tuples
|
|
parsed from config.trusted_mac_addresses.
|
|
"""
|
|
self._vendor_cache = vendor_cache
|
|
# Ordered list preserves config order in metrics output
|
|
self._trusted_ordered: list[tuple[str, str]] = trusted_ordered
|
|
# Set for fast membership checks
|
|
self._trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
|
|
self._lock = threading.Lock()
|
|
self._logger = logging.getLogger("MetricsServer")
|
|
self.start_time = datetime.now()
|
|
self.request_count: int = 0
|
|
self.snmp_poll_count: int = 0
|
|
|
|
# Last SNMP snapshot, replaced atomically on each poll.
|
|
self._snapshot: set[str] = set()
|
|
|
|
def update(self, current_macs: list[str]):
|
|
"""Replace the current snapshot with the latest SNMP readout."""
|
|
with self._lock:
|
|
self.snmp_poll_count += 1
|
|
self._snapshot = set(current_macs)
|
|
|
|
def _fmt_block(self, name: str, value, help_text: str,
|
|
metric_type: str = "gauge",
|
|
labels: dict[str, str] | None = None) -> list[str]:
|
|
"""Return HELP + TYPE + value lines for one metric."""
|
|
full_name = f"{config.exporter_prefix}{name}"
|
|
lines = [
|
|
f"# HELP {full_name} {help_text}",
|
|
f"# TYPE {full_name} {metric_type}",
|
|
]
|
|
if labels:
|
|
label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
|
|
lines.append(f"{full_name}{{{label_str}}} {value}")
|
|
else:
|
|
lines.append(f"{full_name} {value}")
|
|
return lines
|
|
|
|
def _generate_metrics(self) -> str:
|
|
lines: list[str] = []
|
|
uptime = int((datetime.now() - self.start_time).total_seconds())
|
|
prefix = config.exporter_prefix
|
|
|
|
# --- Exporter meta ---
|
|
lines += self._fmt_block(
|
|
"exporter_uptime_seconds", uptime,
|
|
"Exporter uptime in seconds",
|
|
)
|
|
lines += self._fmt_block(
|
|
"exporter_requests_total", self.request_count,
|
|
"Total number of /metrics requests",
|
|
metric_type="counter",
|
|
)
|
|
lines += self._fmt_block(
|
|
"snmp_polls_total", self.snmp_poll_count,
|
|
"Total number of completed SNMP polls",
|
|
metric_type="counter",
|
|
)
|
|
|
|
# --- Vendor cache statistics ---
|
|
lines += self._fmt_block(
|
|
"vendor_cache_size", self._vendor_cache.size,
|
|
"Number of entries in the persistent vendor cache",
|
|
)
|
|
lines += self._fmt_block(
|
|
"vendor_cache_hits_total", self._vendor_cache.hits,
|
|
"Total vendor cache hits",
|
|
metric_type="counter",
|
|
)
|
|
lines += self._fmt_block(
|
|
"vendor_cache_misses_total", self._vendor_cache.misses,
|
|
"Total vendor cache misses (required library lookup)",
|
|
metric_type="counter",
|
|
)
|
|
|
|
# --- Device presence ---
|
|
# Trusted MACs are always present in the output (0 or 1).
|
|
# Unknown MACs seen in the current readout follow with value 1.
|
|
# A label comment above each line identifies the device by name.
|
|
metric_name = f"{prefix}device_present"
|
|
lines.append(
|
|
f"# HELP {metric_name} "
|
|
f"1 if the MAC address was present in the last SNMP readout, "
|
|
f"0 if absent. Trusted MACs are always emitted."
|
|
)
|
|
lines.append(f"# TYPE {metric_name} gauge")
|
|
|
|
with self._lock:
|
|
snapshot = set(self._snapshot)
|
|
|
|
# Trusted MACs — always emitted, value derived from snapshot
|
|
for mac, label in self._trusted_ordered:
|
|
value = 1 if mac in snapshot else 0
|
|
lines.append(f"# {label}")
|
|
lines.append(f'{metric_name}{{mac="{mac}",trusted="true"}} {value}')
|
|
|
|
# Unknown MACs — only those in snapshot but not in trusted list
|
|
for mac in sorted(snapshot - self._trusted_set):
|
|
lines.append("# Unknown")
|
|
lines.append(f'{metric_name}{{mac="{mac}",trusted="false"}} 1')
|
|
|
|
return "\n".join(lines) + "\n"
|
|
|
|
def create_handler(self):
|
|
server_instance = self
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
|
|
def log_message(self, format, *args):
|
|
pass # suppress default access log
|
|
|
|
def do_GET(self):
|
|
with server_instance._lock:
|
|
server_instance.request_count += 1
|
|
|
|
if self.path == "/metrics":
|
|
body = server_instance._generate_metrics().encode("utf-8")
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "text/plain; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
elif self.path in ("/", "/health"):
|
|
body = (
|
|
b"<html><head><title>MAC Watcher</title></head><body>"
|
|
b"<h1>MAC Watcher Prometheus Exporter</h1>"
|
|
b'<p><a href="/metrics">Metrics</a></p>'
|
|
b"</body></html>"
|
|
)
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "text/html; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
else:
|
|
self.send_response(404)
|
|
self.end_headers()
|
|
|
|
return RequestHandler
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main watcher loop
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def watch(metrics_server: MetricsServer, vendor_cache: VendorCache,
|
|
trusted: set[str]):
|
|
"""
|
|
Poll the switch in a loop. Push each readout to the MetricsServer.
|
|
Send one email alert per unknown MAC (de-duplicated in memory).
|
|
|
|
Args:
|
|
metrics_server: receives each SNMP snapshot via update().
|
|
vendor_cache: MAC-to-vendor resolution with persistent cache.
|
|
trusted: normalized uppercase set of trusted MAC addresses.
|
|
"""
|
|
logger = logging.getLogger("Watcher")
|
|
alerted: set[str] = set()
|
|
|
|
while True:
|
|
macs = query_mac_table()
|
|
metrics_server.update(macs)
|
|
|
|
for mac in macs:
|
|
if mac not in trusted and mac not in alerted:
|
|
vendor = vendor_cache.lookup(mac)
|
|
logger.warning(f"Unknown MAC detected: {mac} ({vendor})")
|
|
alerted.add(mac)
|
|
send_alert_mail(mac, vendor)
|
|
|
|
time.sleep(config.snmp_poll_interval)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Label parser
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _parse_trusted_labels() -> list[str]:
|
|
"""
|
|
Extract the inline comment labels from config.trusted_mac_addresses.
|
|
|
|
Reads config.py source at runtime so labels stay in sync with the list
|
|
without requiring a separate data structure. Falls back to the MAC
|
|
address string itself if no comment is found for an entry.
|
|
|
|
Returns a list of label strings aligned 1:1 with
|
|
config.trusted_mac_addresses.
|
|
"""
|
|
import inspect
|
|
import re as _re
|
|
|
|
src = inspect.getsource(config)
|
|
# Match each quoted MAC followed by an optional inline comment
|
|
pairs = _re.findall(
|
|
r'"([0-9A-Fa-f:]{17})"[^#\n]*#\s*(.+)',
|
|
src,
|
|
)
|
|
comment_map: dict[str, str] = {
|
|
mac.upper(): label.strip() for mac, label in pairs
|
|
}
|
|
return [
|
|
comment_map.get(normalize_mac(mac), normalize_mac(mac))
|
|
for mac in config.trusted_mac_addresses
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
setup_logging()
|
|
logger = logging.getLogger("Main")
|
|
|
|
# Build ordered list of (mac_uppercase, label) from config.
|
|
# The label is the inline comment text after the MAC address.
|
|
# Normalization happens here once; all downstream code uses uppercase.
|
|
trusted_ordered: list[tuple[str, str]] = []
|
|
seen: set[str] = set()
|
|
for mac, label in zip(
|
|
config.trusted_mac_addresses,
|
|
_parse_trusted_labels(),
|
|
):
|
|
mac_upper = normalize_mac(mac)
|
|
if mac_upper not in seen:
|
|
seen.add(mac_upper)
|
|
trusted_ordered.append((mac_upper, label))
|
|
|
|
trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
|
|
|
|
logger.info("=" * 50)
|
|
logger.info("MAC Watcher starting")
|
|
logger.info(f"Switch: {config.switch_ip_addr}")
|
|
logger.info(f"snmpwalk: {config.snmpwalk_bin}")
|
|
logger.info(f"Poll interval: {config.snmp_poll_interval}s")
|
|
logger.info(f"Trusted MACs: {len(trusted_ordered)}")
|
|
logger.info(f"Exporter: http://{config.exporter_host}:{config.exporter_port}/metrics")
|
|
logger.info("=" * 50)
|
|
|
|
# Update local vendor DB on startup
|
|
logger.info("Updating MAC vendor database...")
|
|
try:
|
|
MacLookup().update_vendors()
|
|
logger.info("Vendor database updated")
|
|
except Exception as e:
|
|
logger.warning(f"Vendor database update failed (offline?): {e}")
|
|
|
|
vendor_cache = VendorCache(config.vendor_cache_file)
|
|
metrics_server = MetricsServer(vendor_cache, trusted_ordered)
|
|
|
|
# Start watcher in background thread
|
|
watcher_thread = threading.Thread(
|
|
target=watch,
|
|
args=(metrics_server, vendor_cache, trusted_set),
|
|
daemon=True,
|
|
name="Watcher",
|
|
)
|
|
watcher_thread.start()
|
|
|
|
# Start HTTP metrics server (blocking)
|
|
handler = metrics_server.create_handler()
|
|
try:
|
|
http_server = HTTPServer((config.exporter_host, config.exporter_port), handler)
|
|
logger.info(
|
|
f"HTTP server listening on {config.exporter_host}:{config.exporter_port}"
|
|
)
|
|
http_server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
logger.info("Shutdown requested")
|
|
except Exception as e:
|
|
logger.error(f"Fatal error: {e}", exc_info=True)
|
|
finally:
|
|
logger.info("Shutdown complete")
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|