#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TTN VEGAPULS Air Prometheus Exporter
Exports metrics from VEGAPULS Air sensors connected via The Things Network
Author: Hendrik Schutter, mail@hendrikschutter.com
"""
import sys
import json
import time
import threading
import logging
import ssl
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, Optional, Any
import paho.mqtt.client as mqtt
import config
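# All runtime settings come from a local ``config`` module (imported above).
# The attribute names below are taken from how they are used in this file;
# the values are placeholders, not the author's defaults - a minimal sketch:
#
#     # config.py
#     integration_method = "mqtt"        # only "mqtt" is currently supported
#     ttn_user = "my-app@ttn"            # TTN application ID (MQTT username)
#     ttn_key = "NNSXS.XXXXXXXX"         # TTN API key (MQTT password)
#     ttn_region = "EU1"                 # lowercased into <region>.cloud.thethings.network
#     mqtt_keepalive = 60                # seconds
#     mqtt_reconnect_delay = 5           # initial reconnect backoff, seconds
#     mqtt_reconnect_max_delay = 300     # backoff cap, seconds
#     sensor_timeout_hours = 19          # mark a device offline after this long
#     max_cache_age_hours = 72           # optional: drop cache entries older than this
#     exporter_prefix = "vegapuls_"      # prepended to every metric name
#     hostName = "0.0.0.0"
#     serverPort = 8000
#     log_level = "INFO"
#
# log_level accepts standard logging level names; log_format is optional and
# falls back to a standard asctime/name/level format.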
class SensorDataCache:
"""Thread-safe cache for sensor uplink data with timeout tracking"""
def __init__(self, timeout_hours: int = 19):
self._data: Dict[str, Dict[str, Any]] = {}
self._lock = threading.RLock()
self.timeout_hours = timeout_hours
def update(
self, device_id: str, payload: Dict, metadata: list, timestamp: datetime
):
"""
Update cached data for a device
Args:
device_id: Unique device identifier
payload: Decoded payload from TTN
metadata: RX metadata from TTN
timestamp: Timestamp of the uplink
"""
with self._lock:
self._data[device_id] = {
"payload": payload,
"metadata": metadata,
"timestamp": timestamp,
"is_online": True,
}
logging.info(f"Updated cache for device {device_id}")
def get_all_devices(self) -> Dict[str, Dict[str, Any]]:
"""
Get all cached device data
Returns:
Dictionary of device data
"""
with self._lock:
return dict(self._data)
def check_timeouts(self):
"""Check all devices for timeout and mark offline ones"""
with self._lock:
now = datetime.now()
timeout_threshold = timedelta(hours=self.timeout_hours)
for device_id, data in self._data.items():
time_since_update = now - data["timestamp"]
was_online = data["is_online"]
data["is_online"] = time_since_update < timeout_threshold
if was_online and not data["is_online"]:
logging.warning(
f"Device {device_id} marked as OFFLINE "
f"(no uplink for {time_since_update.total_seconds()/3600:.1f} hours)"
)
elif not was_online and data["is_online"]:
logging.info(f"Device {device_id} is back ONLINE")
def cleanup_old_entries(self, max_age_hours: int = 72):
"""Remove entries older than max_age_hours"""
with self._lock:
now = datetime.now()
max_age = timedelta(hours=max_age_hours)
devices_to_remove = [
device_id
for device_id, data in self._data.items()
if now - data["timestamp"] > max_age
]
for device_id in devices_to_remove:
del self._data[device_id]
logging.info(f"Removed stale cache entry for device {device_id}")
class TTNMQTTClient:
"""Manages MQTT connection to TTN with automatic reconnection"""
def __init__(self, cache: SensorDataCache, config_module):
self.cache = cache
self.config = config_module
self.client: Optional[mqtt.Client] = None
self.connected = False
self._lock = threading.Lock()
self._should_run = True
# Setup logging
self.logger = logging.getLogger("TTNMQTTClient")
def _on_connect(self, client, userdata, flags, reason_code, properties):
"""Callback when connected to MQTT broker"""
if reason_code == 0:
self.logger.info("Successfully connected to TTN MQTT broker")
self.connected = True
# Subscribe to uplink messages
topic = f"v3/{self.config.ttn_user}/devices/+/up"
client.subscribe(topic, qos=1)
self.logger.info(f"Subscribed to topic: {topic}")
else:
self.logger.error(
f"Failed to connect to MQTT broker. Reason code: {reason_code}"
)
self.connected = False
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
"""Callback when disconnected from MQTT broker"""
self.logger.warning(
f"Disconnected from MQTT broker. Reason code: {reason_code}"
)
self.connected = False
def _on_message(self, client, userdata, msg):
"""Callback when a message is received"""
self.logger.debug(f"Uplink message received! {msg.topic}")
try:
# Parse the JSON payload
message_data = json.loads(msg.payload.decode("utf-8"))
# Extract device information
device_id = message_data.get("end_device_ids", {}).get(
"device_id", "unknown"
)
# Check if this is an uplink message with decoded payload
if "uplink_message" not in message_data:
self.logger.debug(f"Ignoring non-uplink message from {device_id}")
return
uplink = message_data["uplink_message"]
if "decoded_payload" not in uplink:
self.logger.warning(f"No decoded payload for device {device_id}")
return
# Update cache with new data
self.cache.update(
device_id=device_id,
payload=uplink["decoded_payload"],
metadata=uplink.get("rx_metadata", []),
timestamp=datetime.now(),
)
self.logger.debug(f"Processed uplink from device: {device_id}")
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse MQTT message: {e}")
except Exception as e:
self.logger.error(f"Error processing MQTT message: {e}", exc_info=True)
def _create_client(self):
"""Create and configure MQTT client"""
client = mqtt.Client(
client_id=f"vegapuls-exporter-{int(time.time())}",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
# Set callbacks
client.on_connect = self._on_connect
client.on_disconnect = self._on_disconnect
client.on_message = self._on_message
# Set credentials
client.username_pw_set(self.config.ttn_user, self.config.ttn_key)
# Configure TLS
client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)
client.tls_insecure_set(False)
return client
def connect(self):
"""Connect to TTN MQTT broker"""
with self._lock:
try:
if self.client is None:
self.client = self._create_client()
broker_url = f"{self.config.ttn_region.lower()}.cloud.thethings.network"
self.logger.info(f"Connecting to MQTT broker: {broker_url}")
self.client.connect(
broker_url, port=8883, keepalive=self.config.mqtt_keepalive
)
# Start the network loop in a separate thread
self.client.loop_start()
return True
except Exception as e:
self.logger.error(f"Failed to connect to MQTT broker: {e}")
return False
def disconnect(self):
"""Disconnect from MQTT broker"""
with self._lock:
if self.client:
self.client.loop_stop()
self.client.disconnect()
self.connected = False
self.logger.info("Disconnected from MQTT broker")
def run_with_reconnect(self):
"""Main loop with automatic reconnection"""
reconnect_delay = self.config.mqtt_reconnect_delay
while self._should_run:
if not self.connected:
self.logger.info("Attempting to connect to MQTT broker...")
if self.connect():
# Reset reconnect delay on successful connection
reconnect_delay = self.config.mqtt_reconnect_delay
else:
# Exponential backoff for reconnection
self.logger.warning(
f"Reconnection failed. Retrying in {reconnect_delay}s..."
)
time.sleep(reconnect_delay)
reconnect_delay = min(
reconnect_delay * 2, self.config.mqtt_reconnect_max_delay
)
continue
# Wait a bit before checking connection again
time.sleep(10)
def stop(self):
"""Stop the MQTT client"""
self._should_run = False
self.disconnect()
class MetricsServer:
"""HTTP server for Prometheus metrics endpoint"""
def __init__(self, cache: SensorDataCache, config_module):
self.cache = cache
self.config = config_module
self.start_time = datetime.now()
self.request_count = 0
self._lock = threading.Lock()
def _format_metric(
        self, name: str, value: Any, labels: Optional[Dict[str, str]] = None
) -> str:
"""
Format a Prometheus metric
Args:
name: Metric name
value: Metric value
labels: Optional labels dictionary
Returns:
Formatted metric string
"""
metric_name = f"{self.config.exporter_prefix}{name}"
if labels:
label_str = ",".join([f'{k}="{v}"' for k, v in labels.items()])
return f"{metric_name}{{{label_str}}} {value}"
else:
return f"{metric_name} {value}"
def _generate_metrics(self) -> str:
"""Generate all Prometheus metrics"""
metrics = []
# Exporter meta metrics
uptime = int((datetime.now() - self.start_time).total_seconds())
metrics.append(self._format_metric("exporter_uptime_seconds", uptime))
metrics.append(
self._format_metric("exporter_requests_total", self.request_count)
)
# Get all device data
devices = self.cache.get_all_devices()
# Overall health metric
online_devices = sum(1 for d in devices.values() if d["is_online"])
total_devices = len(devices)
metrics.append(self._format_metric("devices_total", total_devices))
metrics.append(self._format_metric("devices_online", online_devices))
# Per-device metrics
for device_id, data in devices.items():
labels = {"device_id": device_id}
# Device online status (1 = online, 0 = offline/timeout)
metrics.append(
self._format_metric("device_online", int(data["is_online"]), labels)
)
# Time since last uplink in seconds
time_since_uplink = (datetime.now() - data["timestamp"]).total_seconds()
metrics.append(
self._format_metric(
"last_uplink_seconds_ago", int(time_since_uplink), labels
)
)
payload = data["payload"]
metadata = data["metadata"]
# Sensor measurements
if "Distance" in payload:
metrics.append(
self._format_metric(
"distance_mm", float(payload["Distance"]), labels
)
)
if "Temperature" in payload:
metrics.append(
self._format_metric(
"temperature_celsius", int(payload["Temperature"]), labels
)
)
if "Inclination_degree" in payload:
metrics.append(
self._format_metric(
"inclination_degrees",
int(payload["Inclination_degree"]),
labels,
)
)
if "MvLinProcent" in payload:
metrics.append(
self._format_metric(
"linear_percent", int(payload["MvLinProcent"]), labels
)
)
if "MvProcent" in payload:
metrics.append(
self._format_metric("percent", int(payload["MvProcent"]), labels)
)
if "MvScaled" in payload:
metrics.append(
self._format_metric(
"scaled_value", float(payload["MvScaled"]), labels
)
)
if "MvScaledUnit" in payload:
metrics.append(
self._format_metric(
"scaled_unit", int(payload["MvScaledUnit"]), labels
)
)
if "PacketIdentifier" in payload:
metrics.append(
self._format_metric(
"packet_identifier", int(payload["PacketIdentifier"]), labels
)
)
if "RemainingPower" in payload:
metrics.append(
self._format_metric(
"battery_percent", int(payload["RemainingPower"]), labels
)
)
if "Unit" in payload:
metrics.append(
self._format_metric("unit", int(payload["Unit"]), labels)
)
if "UnitTemperature" in payload:
metrics.append(
self._format_metric(
"temperature_unit", int(payload["UnitTemperature"]), labels
)
)
# LoRaWAN metadata
if metadata and len(metadata) > 0:
first_gateway = metadata[0]
if "rssi" in first_gateway:
metrics.append(
self._format_metric(
"rssi_dbm", int(first_gateway["rssi"]), labels
)
)
if "channel_rssi" in first_gateway:
metrics.append(
self._format_metric(
"channel_rssi_dbm",
int(first_gateway["channel_rssi"]),
labels,
)
)
if "snr" in first_gateway:
metrics.append(
self._format_metric(
"snr_db", float(first_gateway["snr"]), labels
)
)
return "\n".join(metrics) + "\n"
def create_handler(self):
"""Create HTTP request handler"""
server_instance = self
class RequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""Suppress default logging"""
pass
def do_GET(self):
with server_instance._lock:
server_instance.request_count += 1
if self.path == "/metrics":
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.end_headers()
metrics = server_instance._generate_metrics()
self.wfile.write(metrics.encode("utf-8"))
elif self.path == "/" or self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
html = """
<html>
<head><title>VEGAPULS Air Exporter</title></head>
<body>
<h1>TTN VEGAPULS Air Prometheus Exporter</h1>
<p>Exporter for VEGAPULS Air sensors connected via The Things Network</p>
<p><a href="/metrics">Metrics</a></p>
</body>
</html>
"""
self.wfile.write(html.encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
return RequestHandler
class TimeoutMonitor:
"""Background thread to monitor device timeouts"""
def __init__(self, cache: SensorDataCache, config_module):
self.cache = cache
self.config = config_module
self._should_run = True
self.logger = logging.getLogger("TimeoutMonitor")
def run(self):
"""Main monitoring loop"""
while self._should_run:
try:
self.cache.check_timeouts()
# Also cleanup old entries periodically
if hasattr(self.config, "cache_cleanup_interval"):
self.cache.cleanup_old_entries(self.config.max_cache_age_hours)
except Exception as e:
self.logger.error(f"Error in timeout monitoring: {e}", exc_info=True)
# Check every minute
time.sleep(60)
def stop(self):
"""Stop the monitor"""
self._should_run = False
def setup_logging(config_module):
"""Configure logging"""
log_level = getattr(logging, config_module.log_level.upper(), logging.INFO)
log_format = getattr(
config_module,
"log_format",
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logging.basicConfig(
level=log_level, format=log_format, handlers=[logging.StreamHandler(sys.stdout)]
)
def main():
"""Main application entry point"""
# Setup logging
setup_logging(config)
logger = logging.getLogger("Main")
logger.info("=" * 60)
logger.info("TTN VEGAPULS Air Prometheus Exporter")
logger.info("=" * 60)
logger.info(f"Integration Method: {config.integration_method}")
logger.info(f"Sensor Timeout: {config.sensor_timeout_hours} hours")
logger.info(f"HTTP Server: {config.hostName}:{config.serverPort}")
logger.info("=" * 60)
# Create sensor data cache
cache = SensorDataCache(timeout_hours=config.sensor_timeout_hours)
# Start timeout monitor
timeout_monitor = TimeoutMonitor(cache, config)
monitor_thread = threading.Thread(
target=timeout_monitor.run, daemon=True, name="TimeoutMonitor"
)
monitor_thread.start()
logger.info("Started timeout monitor")
# Start MQTT client if configured
mqtt_client = None
mqtt_thread = None
if config.integration_method.lower() == "mqtt":
mqtt_client = TTNMQTTClient(cache, config)
mqtt_thread = threading.Thread(
target=mqtt_client.run_with_reconnect, daemon=True, name="MQTTClient"
)
mqtt_thread.start()
logger.info("Started MQTT client")
else:
logger.warning(f"Unsupported integration method: {config.integration_method}")
logger.warning("Only 'mqtt' is currently supported")
# Start HTTP server
metrics_server = MetricsServer(cache, config)
handler = metrics_server.create_handler()
try:
http_server = HTTPServer((config.hostName, config.serverPort), handler)
logger.info(
f"HTTP server started at http://{config.hostName}:{config.serverPort}"
)
logger.info("Press Ctrl+C to stop")
http_server.serve_forever()
except KeyboardInterrupt:
logger.info("\nShutdown requested by user")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
finally:
# Cleanup
logger.info("Shutting down...")
if mqtt_client:
mqtt_client.stop()
timeout_monitor.stop()
logger.info("Shutdown complete")
sys.exit(0)
if __name__ == "__main__":
main()
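# A typical invocation (a sketch, assuming config.py sits next to this script):
#
#     $ python3 ttn-vegapulsair-exporter.py
#
# Then point a Prometheus scrape job at http://<hostName>:<serverPort>/metrics;
# the / and /health paths serve a small status page.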