add prometheus export

This commit is contained in:
2026-03-29 22:10:04 +02:00
parent 62445b426a
commit f6047a0b6e
6 changed files with 572 additions and 261 deletions

198
.gitignore vendored
View File

@ -1,196 +1,34 @@
# ---> Python # Python
# Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class
# C extensions
*.so *.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg *.egg
MANIFEST *.egg-info/
dist/
build/
.eggs/
# PyInstaller # Virtual environments
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env .env
.venv .venv
env/
venv/ venv/
ENV/ env/
env.bak/
venv.bak/
# Spyder project settings # pytest / coverage
.spyderproject .pytest_cache/
.spyproject .coverage
htmlcov/
# Rope project settings # MAC vendor lookup cache (auto-generated, contains local data)
.ropeproject mac_vendor_cache.json
# mkdocs documentation # Editor
/site .vscode/
.idea/
# mypy *.kate-swp
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# ---> VisualStudioCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
# Local History for Visual Studio Code
.history/
# Built Visual Studio Code Extensions
*.vsix
# ---> Kate
# Swap Files #
.*.kate-swp
.swp.*
# ---> Linux
*~ *~
# temporary files which can be created if a process still has a handle open of a deleted file # Linux
.fuse_hidden* .fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-* .Trash-*
# .nfs files are created when an open file is removed but is still being accessed
.nfs* .nfs*

View File

@ -1,22 +1,82 @@
# mac_watcher # mac_watcher
Scrape the MAC table from HP ProCurve switch and altert if a untrusted MAC is found. Polls a switch MAC address table via SNMP. Sends email alerts for unknown MACs. Exposes a Prometheus metrics endpoint reflecting the current switch state.
## Requirements
## Overview ## - Debian Linux
Scrape all known MAC addresses via snmp from a HP ProCurve switch. - Python 3.11+
If a reported MAC is not i a white list, a alert is sent via email. - `snmpwalk` (`apt install snmp`)
- Python dependencies: `pip install -r requirements.txt --break-system-packages`
## Install
## Install ## ```
apt install snmp
cd /opt
git clone https://git.mosad.xyz/localhorst/mac_watcher.git
cd /opt/mac_watcher
pip install -r requirements.txt --break-system-packages
```
- `pip install mac-vendor-lookup` Edit `config.py` and set all values.
- `cd /opt/`
- `git clone https://git.mosad.xyz/localhorst/mac_watcher.git`
- `cd /opt/mac_watcher/`
- Set the constants in `config.py`
- `chmod +x /opt/mac_watcher/mac_watcher.py`
- `cp scripts/mac-watcher.service /etc/systemd/system/mac-watcher.service`
- `systemctl daemon-reload && systemctl enable --now mac-watcher.service`
```
chmod +x /opt/mac_watcher/mac_watcher.py
cp scripts/mac-watcher.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable --now mac-watcher.service
```
## Configuration
All settings are in `config.py`.
| Setting | Description |
|---|---|
| `switch_ip_addr` | IP of the managed switch |
| `switch_snmp_community` | SNMP v2c community string |
| `snmp_poll_interval` | Seconds between polls |
| `snmpwalk_bin` | Full path to `snmpwalk` binary |
| `trusted_mac_addresses` | Known/allowed MACs — case-insensitive, normalized at startup |
| `vendor_cache_file` | Path to persistent vendor cache JSON |
| `exporter_host` | Bind address for Prometheus exporter |
| `exporter_port` | Port for Prometheus exporter |
| `exporter_prefix` | Metric name prefix |
| `mail_*` | SMTP credentials and recipients |
## Prometheus Metrics
Endpoint: `http://<host>:<exporter_port>/metrics`
### Device presence
Single gauge reflecting the **last SNMP readout as-is**. Every MAC currently in the switch table is emitted with value `1`. MACs from previous readouts that are no longer present are dropped. Between startup and the first successful poll no device series are emitted.
```
mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF",trusted="true"} 1
mac_watcher_device_present{mac="11:22:33:44:55:66",trusted="false"} 1
```
The `trusted` label reflects whether the MAC is in `trusted_mac_addresses`.
### Exporter / cache statistics
```
mac_watcher_exporter_uptime_seconds
mac_watcher_exporter_requests_total
mac_watcher_snmp_polls_total
mac_watcher_vendor_cache_size
mac_watcher_vendor_cache_hits_total
mac_watcher_vendor_cache_misses_total
```
### prometheus.yml example
```yaml
scrape_configs:
- job_name: 'mac_watcher'
static_configs:
- targets: ['localhost:9200']
scrape_interval: 60s
```

View File

@ -1,16 +1,17 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com """
Date of creation: 2023/02/26 MAC Watcher - Configuration
Date of last modification: 2023/02/26
""" """
# switch # --- SNMP / Switch ---
switch_ip_addr = "10.0.0.2" switch_ip_addr = "10.0.0.2"
switch_snmp_community = "public" switch_snmp_community = "public"
snmp_poll_interval = 30 # seconds between SNMP queries
snmpwalk_bin = "/usr/bin/snmpwalk" # full path to snmpwalk binary
# email # --- Email alerts ---
mail_server_domain = 'smtp.maildomain.dev' mail_server_domain = "smtp.maildomain.dev"
mail_server_port = 587 mail_server_port = 587
mail_server_password = "__PW_DB__" mail_server_password = "__PW_DB__"
mail_from_address = "sender@domain.com" mail_from_address = "sender@domain.com"
@ -18,11 +19,23 @@ mail_from_name = "MAC-Watcher"
mail_to_address = "dummy@example.com" mail_to_address = "dummy@example.com"
mail_to_name = "Joe Doe" mail_to_name = "Joe Doe"
# trusted macs # --- Trusted MAC addresses (whitelist) ---
# Case-insensitive. Normalized to uppercase at startup.
# MACs not in this list trigger an email alert.
# All MACs seen in the last SNMP readout are exposed via Prometheus regardless.
# Format: "AA:BB:CC:DD:EE:FF" # device description
trusted_mac_addresses = [ trusted_mac_addresses = [
"00:EE:00:EE:40:EE", #Router "00:EE:00:EE:40:EE", # Router
"00:EE:C2:EE:82:EE", #Smartphone "00:EE:C2:EE:82:EE", # Smartphone
] ]
# --- MAC vendor lookup cache ---
vendor_cache_file = "mac_vendor_cache.json"
# --- Prometheus exporter ---
exporter_prefix = "mac_watcher_"
exporter_host = "10.0.0.12"
exporter_port = 9200
# --- Logging ---
log_level = "INFO" # DEBUG, INFO, WARNING, ERROR

View File

@ -1,73 +1,469 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com """
Date of creation: 2023/02/26 MAC Watcher
Date of last modification: 2023/02/26 Polls a switch via SNMP, alerts on unknown MACs, and exposes a Prometheus
metrics endpoint reflecting the last SNMP readout.
Author: Hendrik Schutter, mail@hendrikschutter.com
""" """
from subprocess import PIPE, Popen import json
from mac_vendor_lookup import MacLookup import logging
import re
import smtplib import smtplib
import sys
import threading
import time
import email.utils import email.utils
from email.mime.text import MIMEText from email.mime.text import MIMEText
import time from http.server import BaseHTTPRequestHandler, HTTPServer
from subprocess import PIPE, Popen
from datetime import datetime
from mac_vendor_lookup import MacLookup
import config import config
def send_alert_mail(mac_addr): # ---------------------------------------------------------------------------
server = smtplib.SMTP(config.mail_server_domain, config.mail_server_port) # MAC normalization helpers
# ---------------------------------------------------------------------------
MAC_RE = re.compile(r"^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$")
def normalize_mac(mac: str) -> str:
"""Return MAC address in canonical uppercase colon-separated form."""
return mac.strip().upper()
def normalize_mac_list(macs: list[str]) -> list[str]:
"""Normalize a list of MAC addresses, remove duplicates, preserve order."""
seen: set[str] = set()
result: list[str] = []
for mac in macs:
n = normalize_mac(mac)
if n not in seen:
seen.add(n)
result.append(n)
return result
def is_valid_mac(mac: str) -> bool:
return bool(MAC_RE.match(mac))
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
def setup_logging():
log_level = getattr(logging, config.log_level.upper(), logging.INFO)
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# ---------------------------------------------------------------------------
# MAC vendor cache (persistent JSON file, not tracked by git)
# ---------------------------------------------------------------------------
class VendorCache:
"""Persistent JSON-backed cache for MAC vendor lookups."""
def __init__(self, cache_file: str):
self._file = cache_file
self._data: dict[str, str] = {}
self._hits: int = 0
self._misses: int = 0
self._lock = threading.Lock()
self._logger = logging.getLogger("VendorCache")
self._load()
def _load(self):
try:
with open(self._file, "r") as f:
self._data = json.load(f)
self._logger.info(
f"Loaded {len(self._data)} vendor cache entries from {self._file}"
)
except FileNotFoundError:
self._logger.info(
f"Cache file {self._file} not found, starting with empty cache"
)
self._data = {}
except json.JSONDecodeError as e:
self._logger.warning(f"Cache file corrupt, resetting: {e}")
self._data = {}
def _save(self):
try:
with open(self._file, "w") as f:
json.dump(self._data, f, indent=2)
except OSError as e:
self._logger.error(f"Failed to write cache file: {e}")
def lookup(self, mac: str) -> str:
"""Return vendor string for mac, querying mac_vendor_lookup if not cached."""
mac_upper = normalize_mac(mac)
with self._lock:
if mac_upper in self._data:
self._hits += 1
return self._data[mac_upper]
# Not in cache — query library (blocking, disk I/O)
self._misses += 1
try:
vendor = MacLookup().lookup(mac)
except Exception:
vendor = "Unknown vendor"
with self._lock:
self._data[mac_upper] = vendor
self._save()
return vendor
@property
def size(self) -> int:
with self._lock:
return len(self._data)
@property
def hits(self) -> int:
with self._lock:
return self._hits
@property
def misses(self) -> int:
with self._lock:
return self._misses
# ---------------------------------------------------------------------------
# SNMP query
# ---------------------------------------------------------------------------
def query_mac_table() -> list[str]:
"""
Query the switch MAC address table via SNMP (OID 1.3.6.1.2.1.17.4.3.1.1).
Returns a deduplicated list of normalized MAC strings "AA:BB:CC:DD:EE:FF".
"""
logger = logging.getLogger("SNMPQuery")
mac_addresses: list[str] = []
cmd = [
config.snmpwalk_bin,
"-v", "2c",
"-O", "vqe",
"-c", config.switch_snmp_community,
config.switch_ip_addr,
"1.3.6.1.2.1.17.4.3.1.1",
]
try:
with Popen(cmd, stdout=PIPE, stderr=PIPE) as process:
stdout, stderr = process.communicate()
if process.returncode != 0:
logger.error(
f"snmpwalk failed (rc={process.returncode}): "
f"{stderr.decode().strip()}"
)
return []
for line in stdout.decode("utf-8").splitlines():
mac = line.replace(" ", ":").replace('"', "").strip().rstrip(":")
mac = normalize_mac(mac)
if is_valid_mac(mac):
mac_addresses.append(mac)
except FileNotFoundError:
logger.error(
f"snmpwalk binary not found at '{config.snmpwalk_bin}'. "
f"Install snmp or adjust snmpwalk_bin in config.py."
)
except Exception as e:
logger.error(f"Exception during SNMP query: {e}")
result = normalize_mac_list(mac_addresses)
logger.debug(f"SNMP returned {len(result)} unique MAC addresses")
return result
# ---------------------------------------------------------------------------
# Email alert
# ---------------------------------------------------------------------------
def send_alert_mail(mac_addr: str, vendor: str):
logger = logging.getLogger("EmailAlert")
timestamp = datetime.now().strftime("%d.%m.%Y %H:%M:%S")
body = (
f"New unknown MAC address detected.\n\n"
f"Date: {timestamp}\n"
f"MAC: {mac_addr}\n"
f"Vendor: {vendor}\n"
)
msg = MIMEText(body)
msg["Subject"] = f"MAC-Watcher: unknown MAC {mac_addr} ({vendor})"
msg["To"] = email.utils.formataddr((config.mail_to_name, config.mail_to_address))
msg["From"] = email.utils.formataddr((config.mail_from_name, config.mail_from_address))
try:
server = smtplib.SMTP(config.mail_server_domain, config.mail_server_port, timeout=10)
server.starttls() server.starttls()
server.login(config.mail_from_address, config.mail_server_password) server.login(config.mail_from_address, config.mail_server_password)
server.sendmail(config.mail_from_address, config.mail_to_address, msg.as_string())
try:
mac_vendor = MacLookup().lookup(mac_addr)
except:
mac_vendor = " Vendor not found"
timeLong = time.strftime("%d.%m.%Y %H:%M:%S")
body = "Hallo Admin,\n\nneue MAC-Adresse gefunden!\n\nDatum: "+ timeLong + "\nMAC: " + str(mac_addr) +"\nVendor: " + mac_vendor + "\n\nVersion: 1.0 - 26.02.2023"
msg = MIMEText(body)
msg['Subject'] = 'New MAC found: ' + str(mac_addr) + " - " + mac_vendor
msg['To'] = email.utils.formataddr((config.mail_to_name, config.mail_to_address ))
msg['From'] = email.utils.formataddr((config.mail_from_name, config.mail_from_address))
server.sendmail(config.mail_from_address, config.mail_to_address , msg.as_string())
server.quit() server.quit()
logger.info(f"Alert sent for {mac_addr}")
except Exception as e:
logger.error(f"Failed to send alert mail for {mac_addr}: {e}")
def query_mac_from_switch():
mac_addresses = list()
command = "snmpwalk -v 2c -O vqe -c " + config.switch_snmp_community + " " + config.switch_ip_addr + " 1.3.6.1.2.1.17.4.3.1.1"
with Popen(command, stdout=PIPE, stderr=None, shell=True) as process:
output = process.communicate()[0].decode("utf-8")
for mac in output.split("\n"):
mac = mac.replace(" ", ":")
mac = mac.replace('"', "")
mac = mac[0:-1]
if(len(mac) == 17):
mac_addresses.append(mac)
return mac_addresses
def watch(): # ---------------------------------------------------------------------------
# Prometheus metrics
# ---------------------------------------------------------------------------
alerted_mac_addresses = list() class MetricsServer:
"""
HTTP server exposing a /metrics endpoint for Prometheus.
The device presence metric is a direct snapshot of the last SNMP readout.
Every MAC seen in that readout gets value 1; MACs from previous readouts
that are no longer present are dropped from the output entirely.
The trusted/unknown status is exposed as an additional label.
Metric layout:
mac_watcher_device_present{mac="AA:BB:CC:DD:EE:FF", trusted="true"} 1
mac_watcher_vendor_cache_size 42
mac_watcher_vendor_cache_hits_total 100
mac_watcher_vendor_cache_misses_total 5
mac_watcher_snmp_polls_total 30
mac_watcher_exporter_uptime_seconds 900
mac_watcher_exporter_requests_total 15
"""
def __init__(self, vendor_cache: VendorCache, trusted: set[str]):
self._vendor_cache = vendor_cache
self._trusted = trusted
self._lock = threading.Lock()
self._logger = logging.getLogger("MetricsServer")
self.start_time = datetime.now()
self.request_count: int = 0
self.snmp_poll_count: int = 0
# Last SNMP snapshot: mac -> 1 (always 1 while in snapshot)
# Replaced atomically on each poll.
self._snapshot: list[str] = []
def update(self, current_macs: list[str]):
"""Replace the current snapshot with the latest SNMP readout."""
with self._lock:
self.snmp_poll_count += 1
self._snapshot = list(current_macs)
def _fmt_block(self, name: str, value, help_text: str,
metric_type: str = "gauge",
labels: dict[str, str] | None = None) -> list[str]:
"""Return HELP + TYPE + value lines for one metric."""
full_name = f"{config.exporter_prefix}{name}"
lines = [
f"# HELP {full_name} {help_text}",
f"# TYPE {full_name} {metric_type}",
]
if labels:
label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
lines.append(f"{full_name}{{{label_str}}} {value}")
else:
lines.append(f"{full_name} {value}")
return lines
def _generate_metrics(self) -> str:
lines: list[str] = []
uptime = int((datetime.now() - self.start_time).total_seconds())
prefix = config.exporter_prefix
# --- Exporter meta ---
lines += self._fmt_block(
"exporter_uptime_seconds", uptime,
"Exporter uptime in seconds",
)
lines += self._fmt_block(
"exporter_requests_total", self.request_count,
"Total number of /metrics requests",
metric_type="counter",
)
lines += self._fmt_block(
"snmp_polls_total", self.snmp_poll_count,
"Total number of completed SNMP polls",
metric_type="counter",
)
# --- Vendor cache statistics ---
lines += self._fmt_block(
"vendor_cache_size", self._vendor_cache.size,
"Number of entries in the persistent vendor cache",
)
lines += self._fmt_block(
"vendor_cache_hits_total", self._vendor_cache.hits,
"Total vendor cache hits",
metric_type="counter",
)
lines += self._fmt_block(
"vendor_cache_misses_total", self._vendor_cache.misses,
"Total vendor cache misses (required library lookup)",
metric_type="counter",
)
# --- Device presence snapshot ---
# HELP and TYPE appear once; one series per MAC in the last readout.
metric_name = f"{prefix}device_present"
lines.append(
f"# HELP {metric_name} "
f"1 if the MAC address was present in the last SNMP readout"
)
lines.append(f"# TYPE {metric_name} gauge")
with self._lock:
snapshot = list(self._snapshot)
for mac in snapshot:
trusted_label = "true" if mac in self._trusted else "false"
label_str = f'mac="{mac}",trusted="{trusted_label}"'
lines.append(f"{metric_name}{{{label_str}}} 1")
return "\n".join(lines) + "\n"
def create_handler(self):
server_instance = self
class RequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass # suppress default access log
def do_GET(self):
with server_instance._lock:
server_instance.request_count += 1
if self.path == "/metrics":
body = server_instance._generate_metrics().encode("utf-8")
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
elif self.path in ("/", "/health"):
body = (
b"<html><head><title>MAC Watcher</title></head><body>"
b"<h1>MAC Watcher Prometheus Exporter</h1>"
b'<p><a href="/metrics">Metrics</a></p>'
b"</body></html>"
)
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
else:
self.send_response(404)
self.end_headers()
return RequestHandler
# ---------------------------------------------------------------------------
# Main watcher loop
# ---------------------------------------------------------------------------
def watch(metrics_server: MetricsServer, vendor_cache: VendorCache,
trusted: set[str]):
"""
Poll the switch in a loop. Push each readout to the MetricsServer.
Send one email alert per unknown MAC (de-duplicated in memory).
Args:
metrics_server: receives each SNMP snapshot via update().
vendor_cache: MAC-to-vendor resolution with persistent cache.
trusted: normalized uppercase set of trusted MAC addresses.
"""
logger = logging.getLogger("Watcher")
alerted: set[str] = set()
while True:
macs = query_mac_table()
metrics_server.update(macs)
for mac in macs:
if mac not in trusted and mac not in alerted:
vendor = vendor_cache.lookup(mac)
logger.warning(f"Unknown MAC detected: {mac} ({vendor})")
alerted.add(mac)
send_alert_mail(mac, vendor)
time.sleep(config.snmp_poll_interval)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main():
setup_logging()
logger = logging.getLogger("Main")
# Normalize trusted MAC list once at startup.
trusted_macs: set[str] = set(normalize_mac_list(config.trusted_mac_addresses))
logger.info("=" * 50)
logger.info("MAC Watcher starting")
logger.info(f"Switch: {config.switch_ip_addr}")
logger.info(f"snmpwalk: {config.snmpwalk_bin}")
logger.info(f"Poll interval: {config.snmp_poll_interval}s")
logger.info(f"Trusted MACs: {len(trusted_macs)}")
logger.info(f"Exporter: http://{config.exporter_host}:{config.exporter_port}/metrics")
logger.info("=" * 50)
# Update local vendor DB on startup
logger.info("Updating MAC vendor database...")
try:
MacLookup().update_vendors()
logger.info("Vendor database updated")
except Exception as e:
logger.warning(f"Vendor database update failed (offline?): {e}")
vendor_cache = VendorCache(config.vendor_cache_file)
metrics_server = MetricsServer(vendor_cache, trusted_macs)
# Start watcher in background thread
watcher_thread = threading.Thread(
target=watch,
args=(metrics_server, vendor_cache, trusted_macs),
daemon=True,
name="Watcher",
)
watcher_thread.start()
# Start HTTP metrics server (blocking)
handler = metrics_server.create_handler()
try:
http_server = HTTPServer((config.exporter_host, config.exporter_port), handler)
logger.info(
f"HTTP server listening on {config.exporter_host}:{config.exporter_port}"
)
http_server.serve_forever()
except KeyboardInterrupt:
logger.info("Shutdown requested")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
finally:
logger.info("Shutdown complete")
sys.exit(0)
while(True):
mac_addresses = query_mac_from_switch()
for mac_address in mac_addresses:
if mac_address not in config.trusted_mac_addresses:
if mac_address not in alerted_mac_addresses:
alerted_mac_addresses.append(mac_address)
send_alert_mail(mac_address)
time.sleep(10)
if __name__ == "__main__": if __name__ == "__main__":
main()
print("updating MAC vendors ...")
MacLookup().update_vendors()
print("update done\n")
try:
watch()
except:
pass

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
mac-vendor-lookup>=0.1.12

View File

@ -1,15 +1,18 @@
[Unit] [Unit]
Description=MAC-Watcher Description=MAC Watcher - SNMP MAC table monitor with Prometheus exporter
After=syslog.target
After=network.target After=network.target
Wants=network.target
[Service] [Service]
RestartSec=2s Type=simple
Type=oneshot
User=root User=root
Group=root Group=root
WorkingDirectory=/opt/mac_watcher/ WorkingDirectory=/opt/mac_watcher
ExecStart=/usr/bin/python3 /opt/mac_watcher/mac_watcher.py ExecStart=/usr/bin/python3 /opt/mac_watcher/mac_watcher.py
Restart=on-failure
RestartSec=10s
StandardOutput=journal
StandardError=journal
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target