#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TTN VEGAPULS Air Prometheus Exporter
Exports metrics from VEGAPULS Air sensors connected via The Things Network

Author: Hendrik Schutter, mail@hendrikschutter.com
"""
|
|
|
|
import sys
|
|
import json
|
|
import time
|
|
import threading
|
|
import logging
|
|
import ssl
|
|
from datetime import datetime, timedelta
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
from typing import Dict, Optional, Any
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
import config
|
|
|
|
|
|
class SensorDataCache:
    """Thread-safe cache for sensor uplink data with timeout tracking.

    One entry is kept per device id. Each entry is a dict with the keys
    ``payload``, ``metadata``, ``timestamp`` and ``is_online``. All access to
    the internal mapping happens while holding a re-entrant lock.
    """

    def __init__(self, timeout_hours: int = 19):
        """
        Create an empty cache

        Args:
            timeout_hours: Hours without an uplink after which a device is
                flagged offline by check_timeouts()
        """
        self._data: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.RLock()
        self.timeout_hours = timeout_hours

    def update(
        self, device_id: str, payload: Dict, metadata: list, timestamp: datetime
    ):
        """
        Update cached data for a device

        The entry for the device is replaced as a whole and the device is
        marked online.

        Args:
            device_id: Unique device identifier
            payload: Decoded payload from TTN
            metadata: RX metadata from TTN
            timestamp: Timestamp of the uplink; it is later subtracted from
                datetime.now(), so it has to be comparable with that value
        """
        with self._lock:
            self._data[device_id] = {
                "payload": payload,
                "metadata": metadata,
                "timestamp": timestamp,
                "is_online": True,
            }
            logging.info(f"Updated cache for device {device_id}")

    def get_all_devices(self) -> Dict[str, Dict[str, Any]]:
        """
        Get all cached device data

        Returns:
            Dictionary mapping device id to a copy of that device's entry.
            Modifying the returned dictionaries does not affect the cache.
            The payload and metadata objects inside an entry are still the
            cached objects (the copy is one level deep).
        """
        with self._lock:
            # Copy every entry: handing out the live per-device dicts would
            # let readers change cache state without holding the lock.
            return {
                device_id: dict(entry) for device_id, entry in self._data.items()
            }

    def check_timeouts(self):
        """Check all devices for timeout and mark offline ones"""
        with self._lock:
            now = datetime.now()
            timeout_threshold = timedelta(hours=self.timeout_hours)

            for device_id, data in self._data.items():
                time_since_update = now - data["timestamp"]
                was_online = data["is_online"]
                data["is_online"] = time_since_update < timeout_threshold

                # Log only on state transitions to avoid repeating the
                # message on every check.
                if was_online and not data["is_online"]:
                    logging.warning(
                        f"Device {device_id} marked as OFFLINE "
                        f"(no uplink for {time_since_update.total_seconds()/3600:.1f} hours)"
                    )
                elif not was_online and data["is_online"]:
                    logging.info(f"Device {device_id} is back ONLINE")

    def cleanup_old_entries(self, max_age_hours: int = 72):
        """
        Remove entries older than max_age_hours

        Args:
            max_age_hours: Maximum age of the last uplink, in hours, before
                the device entry is dropped from the cache
        """
        with self._lock:
            now = datetime.now()
            max_age = timedelta(hours=max_age_hours)

            # Collect first, delete afterwards: a dict must not change size
            # while it is being iterated.
            devices_to_remove = [
                device_id
                for device_id, data in self._data.items()
                if now - data["timestamp"] > max_age
            ]

            for device_id in devices_to_remove:
                del self._data[device_id]
                logging.info(f"Removed stale cache entry for device {device_id}")
|
|
|
|
|
|
class TTNMQTTClient:
    """Manages MQTT connection to TTN with automatic reconnection.

    Uplink messages received from the broker are parsed and written into the
    shared SensorDataCache. The supervision loop in run_with_reconnect() is
    meant to run in its own thread and is ended with stop().
    """

    # Seconds between connection-state checks while the client is connected
    _POLL_INTERVAL_SECONDS = 10

    def __init__(self, cache: "SensorDataCache", config_module):
        """
        Args:
            cache: Cache that receives decoded uplinks
            config_module: Object providing ttn_user, ttn_key, ttn_region,
                mqtt_keepalive, mqtt_reconnect_delay and
                mqtt_reconnect_max_delay
        """
        self.cache = cache
        self.config = config_module
        self.client: Optional[mqtt.Client] = None
        self.connected = False
        self._lock = threading.Lock()
        self._should_run = True
        # Set by stop(); lets the supervision loop wake up immediately
        # instead of finishing a (possibly long) back-off sleep.
        self._stop_event = threading.Event()

        # Setup logging
        self.logger = logging.getLogger("TTNMQTTClient")

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        """Callback when connected to MQTT broker"""
        if reason_code == 0:
            self.logger.info("Successfully connected to TTN MQTT broker")
            self.connected = True

            # Subscribe to uplink messages
            topic = f"v3/{self.config.ttn_user}/devices/+/up"
            client.subscribe(topic, qos=1)
            self.logger.info(f"Subscribed to topic: {topic}")
        else:
            self.logger.error(
                f"Failed to connect to MQTT broker. Reason code: {reason_code}"
            )
            self.connected = False

    def _on_disconnect(self, client, userdata, flags, reason_code, properties):
        """Callback when disconnected from MQTT broker"""
        self.logger.warning(
            f"Disconnected from MQTT broker. Reason code: {reason_code}"
        )
        self.connected = False

    def _on_message(self, client, userdata, msg):
        """
        Callback when a message is received

        Messages without an ``uplink_message`` or without a
        ``decoded_payload`` are ignored. Any error is logged and never
        propagated to the caller.
        """
        self.logger.debug(f"Uplink message received! {msg.topic}")
        try:
            # Parse the JSON payload
            message_data = json.loads(msg.payload.decode("utf-8"))

            # Extract device information
            device_id = message_data.get("end_device_ids", {}).get(
                "device_id", "unknown"
            )

            # Check if this is an uplink message with decoded payload
            if "uplink_message" not in message_data:
                self.logger.debug(f"Ignoring non-uplink message from {device_id}")
                return

            uplink = message_data["uplink_message"]

            if "decoded_payload" not in uplink:
                self.logger.warning(f"No decoded payload for device {device_id}")
                return

            # Update cache with new data; the time of reception (not a
            # timestamp from the message) is stored.
            self.cache.update(
                device_id=device_id,
                payload=uplink["decoded_payload"],
                metadata=uplink.get("rx_metadata", []),
                timestamp=datetime.now(),
            )

            self.logger.debug(f"Processed uplink from device: {device_id}")

        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse MQTT message: {e}")
        except Exception as e:
            # Top-level boundary of the callback: log with traceback and
            # keep the network loop alive.
            self.logger.error(f"Error processing MQTT message: {e}", exc_info=True)

    def _create_client(self):
        """Create and configure MQTT client"""
        client = mqtt.Client(
            client_id=f"vegapuls-exporter-{int(time.time())}",
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        )

        # Set callbacks
        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message

        # Set credentials
        client.username_pw_set(self.config.ttn_user, self.config.ttn_key)

        # Configure TLS
        client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)
        client.tls_insecure_set(False)

        return client

    def connect(self):
        """
        Connect to TTN MQTT broker

        Returns:
            True if the connection attempt was started, False if it raised.
            ``self.connected`` only becomes True later, in _on_connect().
        """
        with self._lock:
            try:
                if self.client is None:
                    self.client = self._create_client()

                broker_url = f"{self.config.ttn_region.lower()}.cloud.thethings.network"
                self.logger.info(f"Connecting to MQTT broker: {broker_url}")

                # NOTE(review): when a client already exists this re-issues
                # connect()/loop_start() on it while its network thread may
                # still be running - confirm this is safe with the paho
                # version in use.
                self.client.connect(
                    broker_url, port=8883, keepalive=self.config.mqtt_keepalive
                )

                # Start the network loop in a separate thread
                self.client.loop_start()

                return True

            except Exception as e:
                self.logger.error(f"Failed to connect to MQTT broker: {e}")
                return False

    def disconnect(self):
        """Disconnect from MQTT broker"""
        with self._lock:
            if self.client:
                self.client.loop_stop()
                self.client.disconnect()
            self.connected = False
            self.logger.info("Disconnected from MQTT broker")

    def run_with_reconnect(self):
        """
        Main loop with automatic reconnection

        Runs until stop() is called. While disconnected, connection attempts
        are retried with exponential back-off between
        ``mqtt_reconnect_delay`` and ``mqtt_reconnect_max_delay`` seconds.
        """
        reconnect_delay = self.config.mqtt_reconnect_delay

        while self._should_run:
            if not self.connected:
                self.logger.info("Attempting to connect to MQTT broker...")

                if self.connect():
                    # Reset reconnect delay on successful connection
                    reconnect_delay = self.config.mqtt_reconnect_delay
                else:
                    # Exponential backoff for reconnection
                    self.logger.warning(
                        f"Reconnection failed. Retrying in {reconnect_delay}s..."
                    )
                    # Event.wait instead of time.sleep so stop() can
                    # interrupt the back-off.
                    self._stop_event.wait(reconnect_delay)
                    reconnect_delay = min(
                        reconnect_delay * 2, self.config.mqtt_reconnect_max_delay
                    )
                    continue

            # Wait a bit before checking connection again
            self._stop_event.wait(self._POLL_INTERVAL_SECONDS)

    def stop(self):
        """Stop the MQTT client"""
        self._should_run = False
        self._stop_event.set()
        self.disconnect()
|
|
|
|
|
|
class MetricsServer:
    """HTTP server for Prometheus metrics endpoint.

    Renders the content of the SensorDataCache in the Prometheus text
    exposition format and provides the request handler class that serves it.
    """

    # (payload key, metric name, converter) for the decoded sensor payload;
    # the order defines the order of the emitted metrics.
    _PAYLOAD_METRICS = (
        ("Distance", "distance_mm", float),
        ("Temperature", "temperature_celsius", int),
        ("Inclination_degree", "inclination_degrees", int),
        ("MvLinProcent", "linear_percent", int),
        ("MvProcent", "percent", int),
        ("MvScaled", "scaled_value", float),
        ("MvScaledUnit", "scaled_unit", int),
        ("PacketIdentifier", "packet_identifier", int),
        ("RemainingPower", "battery_percent", int),
        ("Unit", "unit", int),
        ("UnitTemperature", "temperature_unit", int),
    )

    # (metadata key, metric name, converter) for the first RX metadata entry
    _GATEWAY_METRICS = (
        ("rssi", "rssi_dbm", int),
        ("channel_rssi", "channel_rssi_dbm", int),
        ("snr", "snr_db", float),
    )

    # Landing page served on "/" and "/health"
    _INDEX_HTML = """
<html>
<head><title>VEGAPULS Air Exporter</title></head>
<body>
<h1>TTN VEGAPULS Air Prometheus Exporter</h1>
<p>Exporter for VEGAPULS Air sensors connected via The Things Network</p>
<p><a href="/metrics">Metrics</a></p>
</body>
</html>
"""

    def __init__(self, cache: "SensorDataCache", config_module):
        """
        Args:
            cache: Cache the metrics are read from
            config_module: Object providing ``exporter_prefix``
        """
        self.cache = cache
        self.config = config_module
        self.start_time = datetime.now()
        self.request_count = 0
        self._lock = threading.Lock()
        self.logger = logging.getLogger("MetricsServer")

    @staticmethod
    def _escape_label_value(value: Any) -> str:
        """Escape backslash, double quote and newline in a label value."""
        return (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    def _format_metric(
        self, name: str, value: Any, labels: Optional[Dict[str, str]] = None
    ) -> str:
        """
        Format a Prometheus metric

        Args:
            name: Metric name (the configured exporter prefix is prepended)
            value: Metric value
            labels: Optional labels dictionary; values are escaped

        Returns:
            Formatted metric string
        """
        metric_name = f"{self.config.exporter_prefix}{name}"

        if not labels:
            return f"{metric_name} {value}"

        label_str = ",".join(
            f'{key}="{self._escape_label_value(val)}"' for key, val in labels.items()
        )
        return f"{metric_name}{{{label_str}}} {value}"

    def _mapped_metrics(self, source: Any, table, labels: Dict[str, str]) -> list:
        """Format every field of ``source`` listed in ``table``.

        Fields that are missing are skipped. Fields whose value cannot be
        converted are skipped as well, so that one malformed uplink does not
        make the whole scrape fail.
        """
        if not isinstance(source, dict):
            return []

        lines = []
        for key, metric_name, convert in table:
            if key not in source:
                continue
            try:
                value = convert(source[key])
            except (TypeError, ValueError, OverflowError):
                # Debug level: this runs on every scrape and would otherwise
                # repeat the same message until the next uplink arrives.
                self.logger.debug(
                    f"Skipping {key}={source[key]!r}: not convertible for {metric_name}"
                )
                continue
            lines.append(self._format_metric(metric_name, value, labels))
        return lines

    def _generate_metrics(self) -> str:
        """
        Generate all Prometheus metrics

        Returns:
            All metric lines joined by newlines, with a trailing newline
        """
        now = datetime.now()
        metrics = []

        # Exporter meta metrics
        uptime = int((now - self.start_time).total_seconds())
        metrics.append(self._format_metric("exporter_uptime_seconds", uptime))
        metrics.append(
            self._format_metric("exporter_requests_total", self.request_count)
        )

        # Get all device data
        devices = self.cache.get_all_devices()

        # Overall health metric
        online_devices = sum(1 for d in devices.values() if d["is_online"])
        metrics.append(self._format_metric("devices_total", len(devices)))
        metrics.append(self._format_metric("devices_online", online_devices))

        # Per-device metrics
        for device_id, data in devices.items():
            labels = {"device_id": device_id}

            # Device online status (1 = online, 0 = offline/timeout)
            metrics.append(
                self._format_metric("device_online", int(data["is_online"]), labels)
            )

            # Time since last uplink in seconds
            time_since_uplink = (now - data["timestamp"]).total_seconds()
            metrics.append(
                self._format_metric(
                    "last_uplink_seconds_ago", int(time_since_uplink), labels
                )
            )

            # Sensor measurements
            metrics.extend(
                self._mapped_metrics(data["payload"], self._PAYLOAD_METRICS, labels)
            )

            # LoRaWAN metadata: only the first entry is exported
            metadata = data["metadata"]
            if metadata:
                metrics.extend(
                    self._mapped_metrics(metadata[0], self._GATEWAY_METRICS, labels)
                )

        return "\n".join(metrics) + "\n"

    def create_handler(self):
        """
        Create HTTP request handler

        Returns:
            A BaseHTTPRequestHandler subclass bound to this server instance
        """
        server_instance = self

        class RequestHandler(BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                """Suppress default logging"""
                pass

            def _send_body(self, content_type, body):
                """Send a 200 response with the given encoded body."""
                self.send_response(200)
                self.send_header("Content-Type", content_type)
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def do_GET(self):
                with server_instance._lock:
                    server_instance.request_count += 1

                # Ignore a query string when routing ("/metrics?x=y")
                path = self.path.split("?", 1)[0]

                if path == "/metrics":
                    # Render before any header is sent so that a failure
                    # yields a clean 500 instead of a truncated 200.
                    try:
                        body = server_instance._generate_metrics().encode("utf-8")
                    except Exception:
                        server_instance.logger.exception("Failed to generate metrics")
                        self.send_error(500, "Failed to generate metrics")
                        return
                    self._send_body("text/plain; charset=utf-8", body)

                elif path == "/" or path == "/health":
                    self._send_body(
                        "text/html; charset=utf-8",
                        server_instance._INDEX_HTML.encode("utf-8"),
                    )

                else:
                    self.send_response(404)
                    self.end_headers()

        return RequestHandler
|
|
|
|
|
|
class TimeoutMonitor:
    """Background thread to monitor device timeouts.

    run() is meant to be used as a thread target; stop() ends the loop.
    """

    # Seconds between two monitoring passes
    _CHECK_INTERVAL_SECONDS = 60

    def __init__(self, cache: "SensorDataCache", config_module):
        """
        Args:
            cache: Cache whose entries are checked and cleaned up
            config_module: Object optionally providing
                ``cache_cleanup_interval`` and ``max_cache_age_hours``
        """
        self.cache = cache
        self.config = config_module
        self._should_run = True
        # Set by stop() so a waiting run() loop returns immediately
        self._stop_event = threading.Event()
        self.logger = logging.getLogger("TimeoutMonitor")

    def _run_once(self):
        """Perform one monitoring pass; errors are logged, never raised."""
        try:
            self.cache.check_timeouts()

            # Also cleanup old entries periodically. The presence of
            # cache_cleanup_interval switches cleanup on; its value is not
            # used here (NOTE(review): units are not visible in this file,
            # so cleanup runs on every pass).
            if hasattr(self.config, "cache_cleanup_interval"):
                max_age_hours = getattr(self.config, "max_cache_age_hours", None)
                if max_age_hours is None:
                    # Setting not configured: use the cache's own default
                    # instead of failing with AttributeError on every pass.
                    self.cache.cleanup_old_entries()
                else:
                    self.cache.cleanup_old_entries(max_age_hours)

        except Exception as e:
            self.logger.error(f"Error in timeout monitoring: {e}", exc_info=True)

    def run(self):
        """Main monitoring loop"""
        while self._should_run:
            self._run_once()

            # Check every minute; Event.wait (rather than time.sleep) lets
            # stop() end the wait early.
            self._stop_event.wait(self._CHECK_INTERVAL_SECONDS)

    def stop(self):
        """Stop the monitor"""
        self._should_run = False
        self._stop_event.set()
|
|
|
|
|
|
def setup_logging(config_module):
    """
    Configure logging

    Sets up the root logger with a single handler writing to stdout.

    Args:
        config_module: Object optionally providing ``log_level`` (a level
            name such as "INFO"; case-insensitive) and ``log_format``.
            Missing or unknown level names fall back to INFO.
    """
    level_name = str(getattr(config_module, "log_level", "INFO")).upper()
    log_level = getattr(logging, level_name, logging.INFO)
    # Not every upper-case attribute of the logging module is a level
    # (e.g. BASIC_FORMAT is a string), so accept integers only.
    if not isinstance(log_level, int):
        log_level = logging.INFO

    log_format = getattr(
        config_module,
        "log_format",
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    logging.basicConfig(
        level=log_level, format=log_format, handlers=[logging.StreamHandler(sys.stdout)]
    )
|
|
|
|
|
|
def main():
    """
    Main application entry point

    Starts the timeout monitor and (if configured) the MQTT client in daemon
    threads, then serves HTTP requests until interrupted. The process exits
    with status 0 after a normal shutdown and 1 after a fatal error.
    """
    # Setup logging
    setup_logging(config)
    logger = logging.getLogger("Main")

    logger.info("=" * 60)
    logger.info("TTN VEGAPULS Air Prometheus Exporter")
    logger.info("=" * 60)
    logger.info(f"Integration Method: {config.integration_method}")
    logger.info(f"Sensor Timeout: {config.sensor_timeout_hours} hours")
    logger.info(f"HTTP Server: {config.hostName}:{config.serverPort}")
    logger.info("=" * 60)

    # Create sensor data cache
    cache = SensorDataCache(timeout_hours=config.sensor_timeout_hours)

    # Start timeout monitor
    timeout_monitor = TimeoutMonitor(cache, config)
    monitor_thread = threading.Thread(
        target=timeout_monitor.run, daemon=True, name="TimeoutMonitor"
    )
    monitor_thread.start()
    logger.info("Started timeout monitor")

    # Start MQTT client if configured
    mqtt_client = None
    if config.integration_method.lower() == "mqtt":
        mqtt_client = TTNMQTTClient(cache, config)
        mqtt_thread = threading.Thread(
            target=mqtt_client.run_with_reconnect, daemon=True, name="MQTTClient"
        )
        mqtt_thread.start()
        logger.info("Started MQTT client")
    else:
        logger.warning(f"Unsupported integration method: {config.integration_method}")
        logger.warning("Only 'mqtt' is currently supported")

    # Start HTTP server
    metrics_server = MetricsServer(cache, config)
    handler = metrics_server.create_handler()

    exit_code = 0
    http_server = None
    try:
        http_server = HTTPServer((config.hostName, config.serverPort), handler)
        logger.info(
            f"HTTP server started at http://{config.hostName}:{config.serverPort}"
        )
        logger.info("Press Ctrl+C to stop")

        http_server.serve_forever()

    except KeyboardInterrupt:
        logger.info("\nShutdown requested by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        # Report the failure to the caller (e.g. a service manager)
        exit_code = 1
    finally:
        # Cleanup
        logger.info("Shutting down...")

        if http_server is not None:
            # Release the listening socket
            http_server.server_close()

        if mqtt_client:
            mqtt_client.stop()

        timeout_monitor.stop()

        logger.info("Shutdown complete")

    # Exit outside the finally block: exiting inside it would replace any
    # exception that is still propagating.
    sys.exit(exit_code)
|
|
|
|
|
|
# Start the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()
|