New MQTT adaption

This commit is contained in:
2026-01-04 11:07:38 +01:00
parent b28da4739f
commit 16cc9c1cb4
9 changed files with 1135 additions and 243 deletions

View File

@ -1,286 +1,595 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com
"""
TTN VEGAPULS Air Prometheus Exporter
Exports metrics from VEGAPULS Air sensors connected via The Things Network
Author: Hendrik Schutter, mail@hendrikschutter.com
"""
from http.server import BaseHTTPRequestHandler, HTTPServer
import paho.mqtt.client as mqtt
from datetime import datetime, timedelta
import threading
import time
import json
import sys
import config
import json
import time
import threading
import logging
import ssl
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, Optional, Any
import paho.mqtt.client as mqtt
import config
scrape_healthy = True
startTime = datetime.now()
lastMqttReception = datetime.now()
node_metrics = list()
mutex = threading.Lock()
request_count = 0
class SensorDataCache:
"""Thread-safe cache for sensor uplink data with timeout tracking"""
mqtt_client = None
mqtt_connected = False
mqtt_lock = threading.Lock()
def __init__(self, timeout_hours: int = 19):
self._data: Dict[str, Dict[str, Any]] = {}
self._lock = threading.RLock()
self.timeout_hours = timeout_hours
def monitor_timeout():
global scrape_healthy
global lastMqttReception
global mqtt_connected
def update(
self, device_id: str, payload: Dict, metadata: list, timestamp: datetime
):
"""
Update cached data for a device
while True:
time_since_last_reception = datetime.now() - lastMqttReception
if time_since_last_reception > timedelta(hours=config.ttn_timeout):
with mutex:
scrape_healthy = False
mqtt_connected = False
time.sleep(60) # Check timeout every minute
Args:
device_id: Unique device identifier
payload: Decoded payload from TTN
metadata: RX metadata from TTN
timestamp: Timestamp of the uplink
"""
with self._lock:
self._data[device_id] = {
"payload": payload,
"metadata": metadata,
"timestamp": timestamp,
"is_online": True,
}
logging.info(f"Updated cache for device {device_id}")
def reconnect_mqtt():
global mqtt_client
global mqtt_connected
def get_all_devices(self) -> Dict[str, Dict[str, Any]]:
"""
Get all cached device data
while True:
if not mqtt_connected:
with mqtt_lock:
try:
if mqtt_client is None:
print("MQTT client is None, creating a new client...")
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.on_disconnect = on_disconnect
mqtt_client.username_pw_set(config.ttn_user, config.ttn_key)
mqtt_client.tls_set()
Returns:
Dictionary of device data
"""
with self._lock:
return dict(self._data)
print("Attempting to reconnect to MQTT broker...")
mqtt_client.connect(
config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60
def check_timeouts(self):
"""Check all devices for timeout and mark offline ones"""
with self._lock:
now = datetime.now()
timeout_threshold = timedelta(hours=self.timeout_hours)
for device_id, data in self._data.items():
time_since_update = now - data["timestamp"]
was_online = data["is_online"]
data["is_online"] = time_since_update < timeout_threshold
if was_online and not data["is_online"]:
logging.warning(
f"Device {device_id} marked as OFFLINE "
f"(no uplink for {time_since_update.total_seconds()/3600:.1f} hours)"
)
except Exception as e:
print(f"MQTT reconnect failed: {e}")
time.sleep(60) # Retry every 10 seconds
elif not was_online and data["is_online"]:
logging.info(f"Device {device_id} is back ONLINE")
def cleanup_old_entries(self, max_age_hours: int = 72):
"""Remove entries older than max_age_hours"""
with self._lock:
now = datetime.now()
max_age = timedelta(hours=max_age_hours)
devices_to_remove = [
device_id
for device_id, data in self._data.items()
if now - data["timestamp"] > max_age
]
for device_id in devices_to_remove:
del self._data[device_id]
logging.info(f"Removed stale cache entry for device {device_id}")
class RequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass
class TTNMQTTClient:
"""Manages MQTT connection to TTN with automatic reconnection"""
def get_metrics(self):
global request_count
global node_metrics
global mutex
mutex.acquire()
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(
bytes(
config.exporter_prefix
+ "exporter_duration_seconds_sum "
+ str(int((datetime.now() - startTime).total_seconds()))
+ "\n",
"utf-8",
)
)
self.wfile.write(
bytes(
config.exporter_prefix
+ "exporter_request_count "
+ str(request_count)
+ "\n",
"utf-8",
)
)
self.wfile.write(
bytes(
config.exporter_prefix
+ "exporter_scrape_healthy "
+ str(int(scrape_healthy))
+ "\n",
"utf-8",
)
)
def __init__(self, cache: SensorDataCache, config_module):
self.cache = cache
self.config = config_module
self.client: Optional[mqtt.Client] = None
self.connected = False
self._lock = threading.Lock()
self._should_run = True
for metric in node_metrics:
self.wfile.write(bytes(config.exporter_prefix + metric + "\n", "utf-8"))
# Setup logging
self.logger = logging.getLogger("TTNMQTTClient")
mutex.release()
def _on_connect(self, client, userdata, flags, reason_code, properties):
"""Callback when connected to MQTT broker"""
if reason_code == 0:
self.logger.info("Successfully connected to TTN MQTT broker")
self.connected = True
def do_GET(self):
global request_count
request_count += 1
if self.path.startswith("/metrics"):
self.get_metrics()
# Subscribe to uplink messages
topic = f"v3/{self.config.ttn_user}/devices/+/up"
client.subscribe(topic, qos=1)
self.logger.info(f"Subscribed to topic: {topic}")
else:
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(bytes("<html>", "utf-8"))
self.wfile.write(
bytes("<head><title>VEGAPULS Air exporter</title></head>", "utf-8")
self.logger.error(
f"Failed to connect to MQTT broker. Reason code: {reason_code}"
)
self.wfile.write(bytes("<body>", "utf-8"))
self.wfile.write(
bytes(
"<h1>ttn-vegapulsair exporter based on data from LoRaWAN TTN node.</h1>",
"utf-8",
self.connected = False
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
"""Callback when disconnected from MQTT broker"""
self.logger.warning(
f"Disconnected from MQTT broker. Reason code: {reason_code}"
)
self.connected = False
def _on_message(self, client, userdata, msg):
"""Callback when a message is received"""
self.logger.debug(f"Uplink message received! {msg.topic}")
try:
# Parse the JSON payload
message_data = json.loads(msg.payload.decode("utf-8"))
# Extract device information
device_id = message_data.get("end_device_ids", {}).get(
"device_id", "unknown"
)
# Check if this is an uplink message with decoded payload
if "uplink_message" not in message_data:
self.logger.debug(f"Ignoring non-uplink message from {device_id}")
return
uplink = message_data["uplink_message"]
if "decoded_payload" not in uplink:
self.logger.warning(f"No decoded payload for device {device_id}")
return
# Update cache with new data
self.cache.update(
device_id=device_id,
payload=uplink["decoded_payload"],
metadata=uplink.get("rx_metadata", []),
timestamp=datetime.now(),
)
self.logger.debug(f"Processed uplink from device: {device_id}")
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse MQTT message: {e}")
except Exception as e:
self.logger.error(f"Error processing MQTT message: {e}", exc_info=True)
def _create_client(self):
"""Create and configure MQTT client"""
client = mqtt.Client(
client_id=f"vegapuls-exporter-{int(time.time())}",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
# Set callbacks
client.on_connect = self._on_connect
client.on_disconnect = self._on_disconnect
client.on_message = self._on_message
# Set credentials
client.username_pw_set(self.config.ttn_user, self.config.ttn_key)
# Configure TLS
client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)
client.tls_insecure_set(False)
return client
def connect(self):
"""Connect to TTN MQTT broker"""
with self._lock:
try:
if self.client is None:
self.client = self._create_client()
broker_url = f"{self.config.ttn_region.lower()}.cloud.thethings.network"
self.logger.info(f"Connecting to MQTT broker: {broker_url}")
self.client.connect(
broker_url, port=8883, keepalive=self.config.mqtt_keepalive
)
# Start the network loop in a separate thread
self.client.loop_start()
return True
except Exception as e:
self.logger.error(f"Failed to connect to MQTT broker: {e}")
return False
def disconnect(self):
"""Disconnect from MQTT broker"""
with self._lock:
if self.client:
self.client.loop_stop()
self.client.disconnect()
self.connected = False
self.logger.info("Disconnected from MQTT broker")
def run_with_reconnect(self):
"""Main loop with automatic reconnection"""
reconnect_delay = self.config.mqtt_reconnect_delay
while self._should_run:
if not self.connected:
self.logger.info("Attempting to connect to MQTT broker...")
if self.connect():
# Reset reconnect delay on successful connection
reconnect_delay = self.config.mqtt_reconnect_delay
else:
# Exponential backoff for reconnection
self.logger.warning(
f"Reconnection failed. Retrying in {reconnect_delay}s..."
)
time.sleep(reconnect_delay)
reconnect_delay = min(
reconnect_delay * 2, self.config.mqtt_reconnect_max_delay
)
continue
# Wait a bit before checking connection again
time.sleep(10)
def stop(self):
"""Stop the MQTT client"""
self._should_run = False
self.disconnect()
class MetricsServer:
"""HTTP server for Prometheus metrics endpoint"""
def __init__(self, cache: SensorDataCache, config_module):
self.cache = cache
self.config = config_module
self.start_time = datetime.now()
self.request_count = 0
self._lock = threading.Lock()
def _format_metric(
self, name: str, value: Any, labels: Dict[str, str] = None
) -> str:
"""
Format a Prometheus metric
Args:
name: Metric name
value: Metric value
labels: Optional labels dictionary
Returns:
Formatted metric string
"""
metric_name = f"{self.config.exporter_prefix}{name}"
if labels:
label_str = ",".join([f'{k}="{v}"' for k, v in labels.items()])
return f"{metric_name}{{{label_str}}} {value}"
else:
return f"{metric_name} {value}"
def _generate_metrics(self) -> str:
"""Generate all Prometheus metrics"""
metrics = []
# Exporter meta metrics
uptime = int((datetime.now() - self.start_time).total_seconds())
metrics.append(self._format_metric("exporter_uptime_seconds", uptime))
metrics.append(
self._format_metric("exporter_requests_total", self.request_count)
)
# Get all device data
devices = self.cache.get_all_devices()
# Overall health metric
online_devices = sum(1 for d in devices.values() if d["is_online"])
total_devices = len(devices)
metrics.append(self._format_metric("devices_total", total_devices))
metrics.append(self._format_metric("devices_online", online_devices))
# Per-device metrics
for device_id, data in devices.items():
labels = {"device_id": device_id}
# Device online status (1 = online, 0 = offline/timeout)
metrics.append(
self._format_metric("device_online", int(data["is_online"]), labels)
)
# Time since last uplink in seconds
time_since_uplink = (datetime.now() - data["timestamp"]).total_seconds()
metrics.append(
self._format_metric(
"last_uplink_seconds_ago", int(time_since_uplink), labels
)
)
self.wfile.write(bytes('<p><a href="/metrics">Metrics</a></p>', "utf-8"))
self.wfile.write(bytes("</body>", "utf-8"))
self.wfile.write(bytes("</html>", "utf-8"))
def update_metrics(payload, metadata):
global node_metrics
global mutex
global scrape_healthy
global lastMqttReception
payload = data["payload"]
metadata = data["metadata"]
mutex.acquire()
node_metrics.clear()
# Sensor measurements
if "Distance" in payload:
metrics.append(
self._format_metric(
"distance_mm", float(payload["Distance"]), labels
)
)
if "Distance" in payload:
node_metrics.append("distance " + str(float(payload["Distance"])))
if "Temperature" in payload:
metrics.append(
self._format_metric(
"temperature_celsius", int(payload["Temperature"]), labels
)
)
if "Inclination_degree" in payload:
node_metrics.append("inclination_degree " + str(int(payload["Inclination_degree"])))
if "Inclination_degree" in payload:
metrics.append(
self._format_metric(
"inclination_degrees",
int(payload["Inclination_degree"]),
labels,
)
)
if "MvLinProcent" in payload:
node_metrics.append("linprocent " + str(int(payload["MvLinProcent"])))
if "MvLinProcent" in payload:
metrics.append(
self._format_metric(
"linear_percent", int(payload["MvLinProcent"]), labels
)
)
if "MvProcent" in payload:
node_metrics.append("procent " + str(int(payload["MvProcent"])))
if "MvProcent" in payload:
metrics.append(
self._format_metric("percent", int(payload["MvProcent"]), labels)
)
if "MvScaled" in payload:
node_metrics.append("scaled " + str(float(payload["MvScaled"])))
if "MvScaled" in payload:
metrics.append(
self._format_metric(
"scaled_value", float(payload["MvScaled"]), labels
)
)
if "MvScaledUnit" in payload:
node_metrics.append("scaled_unit " + str(int(payload["MvScaledUnit"])))
if "MvScaledUnit" in payload:
metrics.append(
self._format_metric(
"scaled_unit", int(payload["MvScaledUnit"]), labels
)
)
if "PacketIdentifier" in payload:
node_metrics.append("packet_identifier " + str(int(payload["PacketIdentifier"])))
if "PacketIdentifier" in payload:
metrics.append(
self._format_metric(
"packet_identifier", int(payload["PacketIdentifier"]), labels
)
)
if "RemainingPower" in payload:
node_metrics.append("remaining_power " + str(int(payload["RemainingPower"])))
if "RemainingPower" in payload:
metrics.append(
self._format_metric(
"battery_percent", int(payload["RemainingPower"]), labels
)
)
if "Temperature" in payload:
node_metrics.append("temperature " + str(int(payload["Temperature"])))
if "Unit" in payload:
metrics.append(
self._format_metric("unit", int(payload["Unit"]), labels)
)
if "Unit" in payload:
node_metrics.append("unit " + str(int(payload["Unit"])))
if "UnitTemperature" in payload:
metrics.append(
self._format_metric(
"temperature_unit", int(payload["UnitTemperature"]), labels
)
)
if "UnitTemperature" in payload:
node_metrics.append("temperature_unit " + str(int(payload["UnitTemperature"])))
# LoRaWAN metadata
if metadata and len(metadata) > 0:
first_gateway = metadata[0]
if "rssi" in metadata[0]:
node_metrics.append("rssi " + str(int(metadata[0]["rssi"])))
if "rssi" in first_gateway:
metrics.append(
self._format_metric(
"rssi_dbm", int(first_gateway["rssi"]), labels
)
)
if "channel_rssi" in metadata[0]:
node_metrics.append("channel_rssi " + str(int(metadata[0]["channel_rssi"])))
if "channel_rssi" in first_gateway:
metrics.append(
self._format_metric(
"channel_rssi_dbm",
int(first_gateway["channel_rssi"]),
labels,
)
)
if "snr" in metadata[0]:
node_metrics.append("snr " + str(float(metadata[0]["snr"])))
if "snr" in first_gateway:
metrics.append(
self._format_metric(
"snr_db", float(first_gateway["snr"]), labels
)
)
scrape_healthy = True
lastMqttReception = datetime.now()
mutex.release()
return "\n".join(metrics) + "\n"
def on_connect(client, userdata, flags, reason_code, properties):
global mqtt_connected
if reason_code == 0:
print("\nConnected to MQTT: reason_code = " + str(reason_code))
mqtt_connected = True
elif reason_code > 0:
print("\nNot connected to MQTT: reason_code = " + str(reason_code))
mqtt_connected = False
def create_handler(self):
"""Create HTTP request handler"""
server_instance = self
def on_disconnect(client, userdata, flags, reason_code, tmp):
global mqtt_connected
print(f"Disconnected from MQTT: reason_code = {reason_code}")
mqtt_connected = False
class RequestHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
"""Suppress default logging"""
pass
def on_message(mqttc, obj, msg):
print("on_message")
global scrape_healthy
def do_GET(self):
with server_instance._lock:
server_instance.request_count += 1
try:
parsedJSON = json.loads(msg.payload)
print(parsedJSON)
uplink_message = parsedJSON["uplink_message"]
update_metrics(uplink_message["decoded_payload"], uplink_message["rx_metadata"])
except Exception as e:
with mutex:
scrape_healthy = False
print(f"Unable to parse uplink: {e}")
if self.path == "/metrics":
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.end_headers()
def poll_mqtt(mqtt_client):
# Start the network loop
mqtt_client.loop_forever()
metrics = server_instance._generate_metrics()
self.wfile.write(metrics.encode("utf-8"))
def configure_mqtt_client():
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
elif self.path == "/" or self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.end_headers()
# Set credentials
client.username_pw_set(config.ttn_user, config.ttn_key)
html = """
<html>
<head><title>VEGAPULS Air Exporter</title></head>
<body>
<h1>TTN VEGAPULS Air Prometheus Exporter</h1>
<p>Exporter for VEGAPULS Air sensors connected via The Things Network</p>
<p><a href="/metrics">Metrics</a></p>
</body>
</html>
"""
self.wfile.write(html.encode("utf-8"))
# Set up TLS/SSL
client.tls_set(
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLSv1_2, # Enforce TLS 1.2
else:
self.send_response(404)
self.end_headers()
return RequestHandler
class TimeoutMonitor:
"""Background thread to monitor device timeouts"""
def __init__(self, cache: SensorDataCache, config_module):
self.cache = cache
self.config = config_module
self._should_run = True
self.logger = logging.getLogger("TimeoutMonitor")
def run(self):
"""Main monitoring loop"""
while self._should_run:
try:
self.cache.check_timeouts()
# Also cleanup old entries periodically
if hasattr(self.config, "cache_cleanup_interval"):
self.cache.cleanup_old_entries(self.config.max_cache_age_hours)
except Exception as e:
self.logger.error(f"Error in timeout monitoring: {e}", exc_info=True)
# Check every minute
time.sleep(60)
def stop(self):
"""Stop the monitor"""
self._should_run = False
def setup_logging(config_module):
"""Configure logging"""
log_level = getattr(logging, config_module.log_level.upper(), logging.INFO)
log_format = getattr(
config_module,
"log_format",
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logging.basicConfig(
level=log_level, format=log_format, handlers=[logging.StreamHandler(sys.stdout)]
)
client.tls_insecure_set(False) # Enforce strict certificate validation
return client
def main():
global mqtt_client
"""Main application entry point"""
# Setup logging
setup_logging(config)
logger = logging.getLogger("Main")
# Start timeout monitoring thread
timeout_thread = threading.Thread(target=monitor_timeout, daemon=True)
timeout_thread.start()
logger.info("=" * 60)
logger.info("TTN VEGAPULS Air Prometheus Exporter")
logger.info("=" * 60)
logger.info(f"Integration Method: {config.integration_method}")
logger.info(f"Sensor Timeout: {config.sensor_timeout_hours} hours")
logger.info(f"HTTP Server: {config.hostName}:{config.serverPort}")
logger.info("=" * 60)
# Start MQTT reconnect thread
reconnect_thread = threading.Thread(target=reconnect_mqtt, daemon=True)
reconnect_thread.start()
# Create sensor data cache
cache = SensorDataCache(timeout_hours=config.sensor_timeout_hours)
while True:
mqtt_client = configure_mqtt_client()
try:
# Connect to TTN broker
broker_url = f"{config.ttn_region.lower()}.cloud.thethings.network"
mqtt_client.connect(broker_url, 8883, 60)
# Subscribe to all topics
mqtt_client.subscribe("#", 1)
logging.info(f"Subscribed to all topics.")
# Start timeout monitor
timeout_monitor = TimeoutMonitor(cache, config)
monitor_thread = threading.Thread(
target=timeout_monitor.run, daemon=True, name="TimeoutMonitor"
)
monitor_thread.start()
logger.info("Started timeout monitor")
poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,)))
poll_mqtt_thread.start()
except Exception as e:
logging.error(f"Error occurred: {e}")
mqtt_client.loop_stop()
# Start MQTT client if configured
mqtt_client = None
mqtt_thread = None
if config.integration_method.lower() == "mqtt":
mqtt_client = TTNMQTTClient(cache, config)
mqtt_thread = threading.Thread(
target=mqtt_client.run_with_reconnect, daemon=True, name="MQTTClient"
)
mqtt_thread.start()
logger.info("Started MQTT client")
else:
logger.warning(f"Unsupported integration method: {config.integration_method}")
logger.warning("Only 'mqtt' is currently supported")
webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler)
print("Server started http://%s:%s" % (config.hostName, config.serverPort))
# Start HTTP server
metrics_server = MetricsServer(cache, config)
handler = metrics_server.create_handler()
try:
webServer.serve_forever()
except KeyboardInterrupt:
sys.exit(-1)
try:
http_server = HTTPServer((config.hostName, config.serverPort), handler)
logger.info(
f"HTTP server started at http://{config.hostName}:{config.serverPort}"
)
logger.info("Press Ctrl+C to stop")
http_server.serve_forever()
except KeyboardInterrupt:
logger.info("\nShutdown requested by user")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
finally:
# Cleanup
logger.info("Shutting down...")
if mqtt_client:
mqtt_client.stop()
timeout_monitor.stop()
logger.info("Shutdown complete")
sys.exit(0)
webServer.server_close()
print("Server stopped.")
poll_mqtt_thread.join()
except Exception as e:
print(e)
time.sleep(60)
if __name__ == "__main__":
main()