Compare commits

2 Commits

Author SHA1 Message Date
048b4a28e7 always output trusted metrics 2026-03-31 21:47:52 +02:00
f6047a0b6e add prometheus export 2026-03-29 22:10:04 +02:00
6 changed files with 638 additions and 261 deletions

198
.gitignore vendored
View File

@ -1,196 +1,34 @@
# ---> Python
# Byte-compiled / optimized / DLL files
# Python
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
*.egg-info/
dist/
build/
.eggs/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
env/
# Spyder project settings
.spyderproject
.spyproject
# pytest / coverage
.pytest_cache/
.coverage
htmlcov/
# Rope project settings
.ropeproject
# MAC vendor lookup cache (auto-generated, contains local data)
mac_vendor_cache.json
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# ---> VisualStudioCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
# ---> Kate
# Swap Files #
.*.kate-swp
.swp.*
# ---> Linux
# Editor
.vscode/
.idea/
*.kate-swp
*~
# temporary files which can be created if a process still has a handle open of a deleted file
# Linux
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
# .nfs files are created when an open file is removed but is still being accessed
.nfs*

View File

@ -1,22 +1,82 @@
# mac_watcher
Scrape the MAC table from an HP ProCurve switch and alert if an untrusted MAC is found.
Polls a switch MAC address table via SNMP. Sends email alerts for unknown MACs. Exposes a Prometheus metrics endpoint reflecting the current switch state.
## Requirements
## Overview ##
Scrape all known MAC addresses via SNMP from an HP ProCurve switch.
If a reported MAC is not in a whitelist, an alert is sent via email.
- Debian Linux
- Python 3.11+
- `snmpwalk` (`apt install snmp`)
- Python dependencies: `pip install -r requirements.txt --break-system-packages`
## Install
## Install ##
```
apt install snmp
cd /opt
git clone https://git.mosad.xyz/localhorst/mac_watcher.git
cd /opt/mac_watcher
pip install -r requirements.txt --break-system-packages
```
- `pip install mac-vendor-lookup`
- `cd /opt/`
- `git clone https://git.mosad.xyz/localhorst/mac_watcher.git`
- `cd /opt/mac_watcher/`
- Set the constants in `config.py`
- `chmod +x /opt/mac_watcher/mac_watcher.py`
- `cp scripts/mac-watcher.service /etc/systemd/system/mac-watcher.service`
- `systemctl daemon-reload && systemctl enable --now mac-watcher.service`
Edit `config.py` and set all values.
```
chmod +x /opt/mac_watcher/mac_watcher.py
cp scripts/mac-watcher.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable --now mac-watcher.service
```
## Configuration
All settings are in `config.py`.
| Setting | Description |
|---|---|
| `switch_ip_addr` | IP of the managed switch |
| `switch_snmp_community` | SNMP v2c community string |
| `snmp_poll_interval` | Seconds between polls |
| `snmpwalk_bin` | Full path to `snmpwalk` binary |
| `trusted_mac_addresses` | Known/allowed MACs — case-insensitive, normalized at startup |
| `vendor_cache_file` | Path to persistent vendor cache JSON |
| `exporter_host` | Bind address for Prometheus exporter |
| `exporter_port` | Port for Prometheus exporter |
| `exporter_prefix` | Metric name prefix |
| `mail_*` | SMTP credentials and recipients |
## Prometheus Metrics
Endpoint: `http://<host>:<exporter_port>/metrics`
### Device presence
Single gauge reflecting the **last SNMP readout as-is**. Every MAC currently in the switch table is emitted with value `1`. MACs from previous readouts that are no longer present are dropped. Between startup and the first successful poll no device series are emitted.
```
mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1
mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1
```
The `trusted` label reflects whether the MAC is in `trusted_mac_addresses`.
### Exporter / cache statistics
```
mac_watcher_exporter_uptime_seconds
mac_watcher_exporter_requests_total
mac_watcher_snmp_polls_total
mac_watcher_vendor_cache_size
mac_watcher_vendor_cache_hits_total
mac_watcher_vendor_cache_misses_total
```
### prometheus.yml example
```yaml
scrape_configs:
- job_name: 'mac_watcher'
static_configs:
- targets: ['localhost:9200']
scrape_interval: 60s
```

View File

@ -1,16 +1,17 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com
Date of creation: 2023/02/26
Date of last modification: 2023/02/26
"""
MAC Watcher - Configuration
"""
# switch
# --- SNMP / Switch ---
switch_ip_addr = "10.0.0.2"
switch_snmp_community = "public"
snmp_poll_interval = 30 # seconds between SNMP queries
snmpwalk_bin = "/usr/bin/snmpwalk" # full path to snmpwalk binary
# email
mail_server_domain = 'smtp.maildomain.dev'
# --- Email alerts ---
mail_server_domain = "smtp.maildomain.dev"
mail_server_port = 587
mail_server_password = "__PW_DB__"
mail_from_address = "sender@domain.com"
@ -18,11 +19,23 @@ mail_from_name = "MAC-Watcher"
mail_to_address = "dummy@example.com"
mail_to_name = "Joe Doe"
# trusted macs
# --- Trusted MAC addresses (whitelist) ---
# Case-insensitive. Normalized to uppercase at startup.
# MACs not in this list trigger an email alert.
# All MACs seen in the last SNMP readout are exposed via Prometheus regardless.
# Format: "AA:BB:CC:DD:EE:FF" # device description
trusted_mac_addresses = [
"00:EE:00:EE:40:EE", #Router
"00:EE:C2:EE:82:EE", #Smartphone
"00:EE:00:EE:40:EE", # Router
"00:EE:C2:EE:82:EE", # Smartphone
]
# --- MAC vendor lookup cache ---
vendor_cache_file = "mac_vendor_cache.json"
# --- Prometheus exporter ---
exporter_prefix = "mac_watcher_"
exporter_host = "10.0.0.12"
exporter_port = 9200
# --- Logging ---
log_level = "INFO" # DEBUG, INFO, WARNING, ERROR

View File

@ -1,73 +1,535 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com
Date of creation: 2023/02/26
Date of last modification: 2023/02/26
"""
MAC Watcher
Polls a switch via SNMP, alerts on unknown MACs, and exposes a Prometheus
metrics endpoint reflecting the last SNMP readout.
Author: Hendrik Schutter, mail@hendrikschutter.com
"""
from subprocess import PIPE, Popen
from mac_vendor_lookup import MacLookup
import json
import logging
import re
import smtplib
import sys
import threading
import time
import email.utils
from email.mime.text import MIMEText
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from subprocess import PIPE, Popen
from datetime import datetime
from mac_vendor_lookup import MacLookup
import config
def send_alert_mail(mac_addr):
server = smtplib.SMTP(config.mail_server_domain, config.mail_server_port)
server.starttls()
server.login(config.mail_from_address, config.mail_server_password)
# ---------------------------------------------------------------------------
# MAC normalization helpers
# ---------------------------------------------------------------------------
# Canonical MAC form: six colon-separated hex byte pairs.
MAC_RE = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")


def normalize_mac(mac: str) -> str:
    """Return *mac* trimmed and uppercased (canonical colon-separated form)."""
    return mac.strip().upper()


def normalize_mac_list(macs: list[str]) -> list[str]:
    """Normalize every MAC and drop duplicates, keeping first-seen order."""
    # dict.fromkeys preserves insertion order while deduplicating.
    return list(dict.fromkeys(normalize_mac(m) for m in macs))


def is_valid_mac(mac: str) -> bool:
    """True if *mac* matches AA:BB:CC:DD:EE:FF (case-insensitive)."""
    return MAC_RE.match(mac) is not None
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
def setup_logging():
    """Configure root logging to stdout using config.log_level (fallback: INFO)."""
    # Unknown level names in config fall back to INFO rather than raising.
    level = getattr(logging, config.log_level.upper(), logging.INFO)
    stdout_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=level,
        handlers=[stdout_handler],
    )
# ---------------------------------------------------------------------------
# MAC vendor cache (persistent JSON file, not tracked by git)
# ---------------------------------------------------------------------------
class VendorCache:
    """Persistent JSON-file-backed cache mapping MAC address -> vendor string.

    Thread-safe: the in-memory mapping and the hit/miss counters are guarded
    by a single lock. The (slow) library lookup runs outside the lock so a
    cache miss does not block concurrent readers.
    """

    def __init__(self, cache_file: str):
        self._file = cache_file
        self._data: dict[str, str] = {}
        self._hits = 0
        self._misses = 0
        self._lock = threading.Lock()
        self._logger = logging.getLogger("VendorCache")
        self._load()

    def _load(self):
        """Populate the cache from disk; start empty on missing/corrupt file."""
        try:
            with open(self._file, "r") as fh:
                self._data = json.load(fh)
        except FileNotFoundError:
            self._data = {}
            self._logger.info(
                f"Cache file {self._file} not found, starting with empty cache"
            )
        except json.JSONDecodeError as e:
            self._data = {}
            self._logger.warning(f"Cache file corrupt, resetting: {e}")
        else:
            self._logger.info(
                f"Loaded {len(self._data)} vendor cache entries from {self._file}"
            )

    def _save(self):
        """Best-effort write to disk; failures are logged, never raised."""
        try:
            with open(self._file, "w") as fh:
                json.dump(self._data, fh, indent=2)
        except OSError as e:
            self._logger.error(f"Failed to write cache file: {e}")

    def lookup(self, mac: str) -> str:
        """Return vendor string for mac, querying mac_vendor_lookup if not cached."""
        key = normalize_mac(mac)
        with self._lock:
            cached = self._data.get(key)
            if cached is not None:
                self._hits += 1
                return cached
            self._misses += 1
        # Slow path: blocking library lookup happens without the lock held.
        try:
            vendor = MacLookup().lookup(mac)
        except Exception:
            vendor = "Unknown vendor"
        with self._lock:
            self._data[key] = vendor
            self._save()
        return vendor

    @property
    def size(self) -> int:
        with self._lock:
            return len(self._data)

    @property
    def hits(self) -> int:
        with self._lock:
            return self._hits

    @property
    def misses(self) -> int:
        with self._lock:
            return self._misses
# ---------------------------------------------------------------------------
# SNMP query
# ---------------------------------------------------------------------------
def query_mac_table() -> list[str]:
    """
    Query the switch MAC address table via SNMP (OID 1.3.6.1.2.1.17.4.3.1.1).

    Returns a deduplicated list of normalized MAC strings "AA:BB:CC:DD:EE:FF".
    Any subprocess/SNMP failure is logged and yields an empty list; this
    function never raises.
    """
    logger = logging.getLogger("SNMPQuery")
    mac_addresses: list[str] = []
    # Argument list (shell=False) so config values are never shell-interpreted.
    cmd = [
        config.snmpwalk_bin,
        "-v", "2c",
        "-O", "vqe",
        "-c", config.switch_snmp_community,
        config.switch_ip_addr,
        "1.3.6.1.2.1.17.4.3.1.1",
    ]
    try:
        with Popen(cmd, stdout=PIPE, stderr=PIPE) as process:
            stdout, stderr = process.communicate()
            if process.returncode != 0:
                logger.error(
                    f"snmpwalk failed (rc={process.returncode}): "
                    f"{stderr.decode().strip()}"
                )
                return []
            for line in stdout.decode("utf-8").splitlines():
                # snmpwalk -O vqe prints space-separated hex bytes (possibly
                # quoted); rebuild the colon-separated MAC form.
                mac = line.replace(" ", ":").replace('"', "").strip().rstrip(":")
                mac = normalize_mac(mac)
                if is_valid_mac(mac):
                    mac_addresses.append(mac)
    except FileNotFoundError:
        logger.error(
            f"snmpwalk binary not found at '{config.snmpwalk_bin}'. "
            f"Install snmp or adjust snmpwalk_bin in config.py."
        )
    except Exception as e:
        logger.error(f"Exception during SNMP query: {e}")
    result = normalize_mac_list(mac_addresses)
    logger.debug(f"SNMP returned {len(result)} unique MAC addresses")
    return result
# ---------------------------------------------------------------------------
# Email alert
# ---------------------------------------------------------------------------
def send_alert_mail(mac_addr: str, vendor: str):
    """Send an email alert for an unknown MAC.

    Builds a plain-text message from the MAC, its vendor string and the
    current timestamp, then delivers it via SMTP + STARTTLS using the
    credentials from config. Failures are logged, never raised, so a mail
    outage cannot kill the watcher loop.
    """
    logger = logging.getLogger("EmailAlert")
    timestamp = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
    body = (
        f"New unknown MAC address detected.\n\n"
        f"Date: {timestamp}\n"
        f"MAC: {mac_addr}\n"
        f"Vendor: {vendor}\n"
    )
    msg = MIMEText(body)
    msg["Subject"] = f"MAC-Watcher: unknown MAC {mac_addr} ({vendor})"
    msg["To"] = email.utils.formataddr((config.mail_to_name, config.mail_to_address))
    msg["From"] = email.utils.formataddr((config.mail_from_name, config.mail_from_address))
    try:
        # Timeout keeps a hung SMTP server from blocking the watcher thread.
        server = smtplib.SMTP(config.mail_server_domain, config.mail_server_port, timeout=10)
        server.starttls()
        server.login(config.mail_from_address, config.mail_server_password)
        server.sendmail(config.mail_from_address, config.mail_to_address, msg.as_string())
        server.quit()
        logger.info(f"Alert sent for {mac_addr}")
    except Exception as e:
        logger.error(f"Failed to send alert mail for {mac_addr}: {e}")
def query_mac_from_switch():
    """Legacy SNMP query: return the raw list of MAC strings from the switch.

    Returns MACs as 17-character colon-separated strings exactly as parsed
    from snmpwalk output (no normalization).
    """
    mac_addresses = list()
    # Pass argv as a list (shell=False): config values can never be
    # interpreted by a shell, unlike the previous string + shell=True form.
    command = [
        "snmpwalk", "-v", "2c", "-O", "vqe",
        "-c", config.switch_snmp_community,
        config.switch_ip_addr,
        "1.3.6.1.2.1.17.4.3.1.1",
    ]
    with Popen(command, stdout=PIPE, stderr=None) as process:
        output = process.communicate()[0].decode("utf-8")
        for mac in output.split("\n"):
            mac = mac.replace(" ", ":")
            mac = mac.replace('"', "")
            mac = mac[0:-1]  # drop the trailing colon left by the conversion
            if(len(mac) == 17):
                mac_addresses.append(mac)
    return mac_addresses
def watch():
# ---------------------------------------------------------------------------
# Prometheus metrics
# ---------------------------------------------------------------------------
alerted_mac_addresses = list()
class MetricsServer:
    """
    HTTP server exposing a /metrics endpoint for Prometheus.
    Trusted MACs are always emitted (0 when absent from last readout, 1 when
    present). Unknown MACs seen in the readout are appended with value 1.
    A human-readable label comment is written above each device_present line.
    Metric layout:
    # <device label>
    mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1
    # Unknown
    mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1
    mac_watcher_vendor_cache_size 42
    mac_watcher_vendor_cache_hits_total 100
    mac_watcher_vendor_cache_misses_total 5
    mac_watcher_snmp_polls_total 30
    mac_watcher_exporter_uptime_seconds 900
    mac_watcher_exporter_requests_total 15
    """
    def __init__(self, vendor_cache: VendorCache,
                 trusted_ordered: list[tuple[str, str]]):
        """
        Args:
            vendor_cache: VendorCache instance.
            trusted_ordered: Ordered list of (mac_uppercase, label) tuples
                parsed from config.trusted_mac_addresses.
        """
        self._vendor_cache = vendor_cache
        # Ordered list preserves config order in metrics output
        self._trusted_ordered: list[tuple[str, str]] = trusted_ordered
        # Set for fast membership checks
        self._trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
        self._lock = threading.Lock()
        self._logger = logging.getLogger("MetricsServer")
        self.start_time = datetime.now()
        # Counter of /metrics (and other) HTTP requests served.
        self.request_count: int = 0
        # Counter of completed SNMP polls pushed via update().
        self.snmp_poll_count: int = 0
        # Last SNMP snapshot, replaced atomically on each poll.
        self._snapshot: set[str] = set()
    def update(self, current_macs: list[str]):
        """Replace the current snapshot with the latest SNMP readout."""
        with self._lock:
            self.snmp_poll_count += 1
            self._snapshot = set(current_macs)
    def _fmt_block(self, name: str, value, help_text: str,
                   metric_type: str = "gauge",
                   labels: dict[str, str] | None = None) -> list[str]:
        """Return HELP + TYPE + value lines for one metric."""
        # The configured prefix (e.g. "mac_watcher_") namespaces all metrics.
        full_name = f"{config.exporter_prefix}{name}"
        lines = [
            f"# HELP {full_name} {help_text}",
            f"# TYPE {full_name} {metric_type}",
        ]
        if labels:
            label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
            lines.append(f"{full_name}{{{label_str}}} {value}")
        else:
            lines.append(f"{full_name} {value}")
        return lines
    def _generate_metrics(self) -> str:
        """Render the full Prometheus text exposition body (newline-terminated)."""
        lines: list[str] = []
        uptime = int((datetime.now() - self.start_time).total_seconds())
        prefix = config.exporter_prefix
        # --- Exporter meta ---
        lines += self._fmt_block(
            "exporter_uptime_seconds", uptime,
            "Exporter uptime in seconds",
        )
        lines += self._fmt_block(
            "exporter_requests_total", self.request_count,
            "Total number of /metrics requests",
            metric_type="counter",
        )
        lines += self._fmt_block(
            "snmp_polls_total", self.snmp_poll_count,
            "Total number of completed SNMP polls",
            metric_type="counter",
        )
        # --- Vendor cache statistics ---
        lines += self._fmt_block(
            "vendor_cache_size", self._vendor_cache.size,
            "Number of entries in the persistent vendor cache",
        )
        lines += self._fmt_block(
            "vendor_cache_hits_total", self._vendor_cache.hits,
            "Total vendor cache hits",
            metric_type="counter",
        )
        lines += self._fmt_block(
            "vendor_cache_misses_total", self._vendor_cache.misses,
            "Total vendor cache misses (required library lookup)",
            metric_type="counter",
        )
        # --- Device presence ---
        # Trusted MACs are always present in the output (0 or 1).
        # Unknown MACs seen in the current readout follow with value 1.
        # A label comment above each line identifies the device by name.
        metric_name = f"{prefix}device_present"
        lines.append(
            f"# HELP {metric_name} "
            f"1 if the MAC address was present in the last SNMP readout, "
            f"0 if absent. Trusted MACs are always emitted."
        )
        lines.append(f"# TYPE {metric_name} gauge")
        # Copy under the lock so iteration below happens lock-free.
        with self._lock:
            snapshot = set(self._snapshot)
        # Trusted MACs — always emitted, value derived from snapshot
        for mac, label in self._trusted_ordered:
            value = 1 if mac in snapshot else 0
            lines.append(f"# {label}")
            lines.append(f'{metric_name}{{mac="{mac}",trusted="true"}} {value}')
        # Unknown MACs — only those in snapshot but not in trusted list
        for mac in sorted(snapshot - self._trusted_set):
            lines.append("# Unknown")
            lines.append(f'{metric_name}{{mac="{mac}",trusted="false"}} 1')
        return "\n".join(lines) + "\n"
    def create_handler(self):
        """Build a BaseHTTPRequestHandler subclass bound to this instance."""
        server_instance = self
        class RequestHandler(BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                pass  # suppress default access log
            def do_GET(self):
                # Count every GET; lock released before _generate_metrics
                # re-acquires it (Lock is not reentrant).
                with server_instance._lock:
                    server_instance.request_count += 1
                if self.path == "/metrics":
                    body = server_instance._generate_metrics().encode("utf-8")
                    self.send_response(200)
                    self.send_header("Content-Type", "text/plain; charset=utf-8")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                elif self.path in ("/", "/health"):
                    body = (
                        b"<html><head><title>MAC Watcher</title></head><body>"
                        b"<h1>MAC Watcher Prometheus Exporter</h1>"
                        b'<p><a href="/metrics">Metrics</a></p>'
                        b"</body></html>"
                    )
                    self.send_response(200)
                    self.send_header("Content-Type", "text/html; charset=utf-8")
                    self.send_header("Content-Length", str(len(body)))
                    self.end_headers()
                    self.wfile.write(body)
                else:
                    self.send_response(404)
                    self.end_headers()
        return RequestHandler
# ---------------------------------------------------------------------------
# Main watcher loop
# ---------------------------------------------------------------------------
def watch(metrics_server: MetricsServer, vendor_cache: VendorCache,
          trusted: set[str]):
    """
    Poll the switch forever, pushing each readout to the metrics server.
    Each unknown MAC triggers exactly one email alert; the in-memory set
    below de-duplicates alerts for the lifetime of the process.

    Args:
        metrics_server: receives each SNMP snapshot via update().
        vendor_cache: MAC-to-vendor resolution with persistent cache.
        trusted: normalized uppercase set of trusted MAC addresses.
    """
    logger = logging.getLogger("Watcher")
    already_alerted: set[str] = set()
    while True:
        readout = query_mac_table()
        metrics_server.update(readout)
        # Alert on MACs that are neither trusted nor previously alerted.
        for mac in readout:
            if mac in trusted or mac in already_alerted:
                continue
            vendor = vendor_cache.lookup(mac)
            logger.warning(f"Unknown MAC detected: {mac} ({vendor})")
            already_alerted.add(mac)
            send_alert_mail(mac, vendor)
        time.sleep(config.snmp_poll_interval)
# ---------------------------------------------------------------------------
# Label parser
# ---------------------------------------------------------------------------
def _parse_trusted_labels() -> list[str]:
    """
    Extract the inline comment labels from config.trusted_mac_addresses.

    Reads config.py source at runtime so labels stay in sync with the list
    without requiring a separate data structure. Entries without an inline
    comment fall back to the MAC address string itself.

    Returns a list of label strings aligned 1:1 with
    config.trusted_mac_addresses.
    """
    import inspect
    import re as _re

    source_text = inspect.getsource(config)
    # Each quoted 17-char MAC optionally followed by "# label" on its line.
    matches = _re.findall(
        r'"([0-9A-Fa-f:]{17})"[^#\n]*#\s*(.+)',
        source_text,
    )
    labels_by_mac: dict[str, str] = {}
    for mac, label in matches:
        labels_by_mac[mac.upper()] = label.strip()
    return [
        labels_by_mac.get(normalize_mac(entry), normalize_mac(entry))
        for entry in config.trusted_mac_addresses
    ]
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
    """Entry point: parse trusted MACs, start the watcher thread and the
    blocking HTTP metrics server. Exits non-zero on fatal server errors so
    systemd's Restart=on-failure can restart the service."""
    setup_logging()
    logger = logging.getLogger("Main")
    # Build ordered list of (mac_uppercase, label) from config.
    # The label is the inline comment text after the MAC address.
    # Normalization happens here once; all downstream code uses uppercase.
    trusted_ordered: list[tuple[str, str]] = []
    seen: set[str] = set()
    for mac, label in zip(
        config.trusted_mac_addresses,
        _parse_trusted_labels(),
    ):
        mac_upper = normalize_mac(mac)
        if mac_upper not in seen:
            seen.add(mac_upper)
            trusted_ordered.append((mac_upper, label))
    trusted_set: set[str] = {mac for mac, _ in trusted_ordered}
    logger.info("=" * 50)
    logger.info("MAC Watcher starting")
    logger.info(f"Switch: {config.switch_ip_addr}")
    logger.info(f"snmpwalk: {config.snmpwalk_bin}")
    logger.info(f"Poll interval: {config.snmp_poll_interval}s")
    logger.info(f"Trusted MACs: {len(trusted_ordered)}")
    logger.info(f"Exporter: http://{config.exporter_host}:{config.exporter_port}/metrics")
    logger.info("=" * 50)
    # Update local vendor DB on startup (best-effort; cache works offline)
    logger.info("Updating MAC vendor database...")
    try:
        MacLookup().update_vendors()
        logger.info("Vendor database updated")
    except Exception as e:
        logger.warning(f"Vendor database update failed (offline?): {e}")
    vendor_cache = VendorCache(config.vendor_cache_file)
    metrics_server = MetricsServer(vendor_cache, trusted_ordered)
    # Start watcher in background thread (daemon: dies with main thread)
    watcher_thread = threading.Thread(
        target=watch,
        args=(metrics_server, vendor_cache, trusted_set),
        daemon=True,
        name="Watcher",
    )
    watcher_thread.start()
    # Start HTTP metrics server (blocking)
    handler = metrics_server.create_handler()
    exit_code = 0
    try:
        http_server = HTTPServer((config.exporter_host, config.exporter_port), handler)
        logger.info(
            f"HTTP server listening on {config.exporter_host}:{config.exporter_port}"
        )
        http_server.serve_forever()
    except KeyboardInterrupt:
        logger.info("Shutdown requested")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        # Previously exited 0 here, which made systemd's Restart=on-failure
        # treat a crash as a clean stop and never restart the service.
        exit_code = 1
    finally:
        logger.info("Shutdown complete")
        sys.exit(exit_code)
while(True):
mac_addresses = query_mac_from_switch()
for mac_address in mac_addresses:
if mac_address not in config.trusted_mac_addresses:
if mac_address not in alerted_mac_addresses:
alerted_mac_addresses.append(mac_address)
send_alert_mail(mac_address)
time.sleep(10)
if __name__ == "__main__":
    # Single entry point; main() handles setup, error logging and exit codes.
    main()

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
mac-vendor-lookup>=0.1.12

View File

@ -1,15 +1,18 @@
[Unit]
Description=MAC-Watcher
After=syslog.target
Description=MAC Watcher - SNMP MAC table monitor with Prometheus exporter
After=network.target
Wants=network.target
[Service]
RestartSec=2s
Type=oneshot
Type=simple
User=root
Group=root
WorkingDirectory=/opt/mac_watcher/
WorkingDirectory=/opt/mac_watcher
ExecStart=/usr/bin/python3 /opt/mac_watcher/mac_watcher.py
Restart=on-failure
RestartSec=10s
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
WantedBy=multi-user.target