TTN-VEGAPULS-Air-exporter/ttn-vegapulsair-exporter.py
2025-06-01 12:46:16 +02:00

287 lines
8.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com
"""
from http.server import BaseHTTPRequestHandler, HTTPServer
import paho.mqtt.client as mqtt
from datetime import datetime, timedelta
import threading
import time
import json
import sys
import config
import logging
import ssl
# ---------------------------------------------------------------------------
# Shared module state. It is touched by the HTTP handler, the MQTT callbacks
# and the background threads started in main().
# ---------------------------------------------------------------------------

# Process start time; basis for the exporter_duration_seconds_sum metric.
startTime = datetime.now()
# Time of the most recent uplink that was processed successfully.
lastMqttReception = datetime.now()

# Cleared when an uplink cannot be parsed or the reception timeout expires.
scrape_healthy = True
# "<name> <value>" lines of the latest uplink, stored without the prefix.
node_metrics = []
# Number of HTTP GET requests handled so far.
request_count = 0
# Lock used around updates of the metric state above.
mutex = threading.Lock()

# Current MQTT client (None until one has been created) and its link state.
mqtt_client = None
mqtt_connected = False
# Lock taken by the reconnect thread while it (re)creates/connects the client.
mqtt_lock = threading.Lock()
def monitor_timeout():
    """Watchdog loop that flags the exporter as unhealthy on uplink silence.

    Once per minute the time since ``lastMqttReception`` is compared with
    ``config.ttn_timeout`` (interpreted as hours). When the limit is exceeded,
    ``scrape_healthy`` and ``mqtt_connected`` are both reset to ``False``.
    This function never returns; run it in a daemon thread.
    """
    global scrape_healthy
    global mqtt_connected

    while True:
        silence = datetime.now() - lastMqttReception
        limit = timedelta(hours=config.ttn_timeout)
        if silence > limit:
            with mutex:
                scrape_healthy = False
                # Clearing the flag makes the reconnect thread try again.
                mqtt_connected = False
        time.sleep(60)  # Check timeout every minute
def reconnect_mqtt():
    """Background loop that re-establishes the MQTT connection when it is down.

    Once per minute ``mqtt_connected`` is inspected. While it is ``False`` the
    shared ``mqtt_client`` is (created if necessary and) connected to the TTN
    broker again, and the wildcard subscription is re-issued because a fresh
    session does not carry over earlier subscriptions. Failures are printed
    and retried on the next round. This function never returns; run it in a
    daemon thread.
    """
    global mqtt_client

    while True:
        if not mqtt_connected:
            with mqtt_lock:
                try:
                    if mqtt_client is None:
                        print("MQTT client is None, creating a new client...")
                        # Reuse the shared factory so callbacks, credentials
                        # and TLS settings match the client built in main().
                        # NOTE(review): no network loop is started for a
                        # client created here - confirm whether one is needed.
                        mqtt_client = configure_mqtt_client()
                    print("Attempting to reconnect to MQTT broker...")
                    mqtt_client.connect(
                        config.ttn_region.lower() + ".cloud.thethings.network",
                        8883,
                        60,
                    )
                    # Same subscription as the initial connect in main().
                    mqtt_client.subscribe("#", 1)
                except Exception as e:
                    print(f"MQTT reconnect failed: {e}")
        time.sleep(60)  # Check the connection state once per minute
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler serving ``/metrics`` and a small HTML landing page."""

    # Static landing page returned for every path other than /metrics.
    _INDEX_PAGE = (
        "<html>"
        "<head><title>VEGAPULS Air exporter</title></head>"
        "<body>"
        "<h1>ttn-vegapulsair exporter based on data from LoRaWAN TTN node.</h1>"
        '<p><a href="/metrics">Metrics</a></p>'
        "</body>"
        "</html>"
    )

    def log_message(self, format, *args):
        """Suppress the default per-request access log."""
        pass

    def get_metrics(self):
        """Send the exporter's own metrics followed by the latest node metrics.

        Every line is prefixed with ``config.exporter_prefix``. The shared
        state is copied while holding ``mutex`` and the lock is released
        before anything is written to the socket, so a slow or broken client
        connection can neither block metric updates nor leave the lock held.
        """
        with mutex:
            healthy = scrape_healthy
            node_lines = list(node_metrics)

        uptime = int((datetime.now() - startTime).total_seconds())
        lines = [
            "exporter_duration_seconds_sum " + str(uptime),
            "exporter_request_count " + str(request_count),
            "exporter_scrape_healthy " + str(int(healthy)),
        ]
        lines.extend(node_lines)
        body = "".join(
            config.exporter_prefix + line + "\n" for line in lines
        ).encode("utf-8")

        self.send_response(200)
        # Content type of the Prometheus text exposition format.
        self.send_header(
            "Content-type", "text/plain; version=0.0.4; charset=utf-8"
        )
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Count the request and answer with the metrics or the landing page."""
        global request_count
        request_count += 1

        if self.path.startswith("/metrics"):
            self.get_metrics()
            return

        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(self._INDEX_PAGE.encode("utf-8"))
# (payload key, exported metric name, converter) in export order.
_PAYLOAD_METRICS = (
    ("Distance", "distance", float),
    ("Inclination_degree", "inclination_degree", int),
    ("MvLinProcent", "linprocent", int),
    ("MvProcent", "procent", int),
    ("MvScaled", "scaled", float),
    ("MvScaledUnit", "scaled_unit", int),
    ("PacketIdentifier", "packet_identifier", int),
    ("RemainingPower", "remaining_power", int),
    ("Temperature", "temperature", int),
    ("Unit", "unit", int),
    ("UnitTemperature", "temperature_unit", int),
)

# Same layout for the values taken from the first rx_metadata entry.
_GATEWAY_METRICS = (
    ("rssi", "rssi", int),
    ("channel_rssi", "channel_rssi", int),
    ("snr", "snr", float),
)


def _render_metrics(source, table):
    """Return ``"<name> <value>"`` lines for every *table* key found in *source*."""
    return [
        name + " " + str(convert(source[key]))
        for key, name, convert in table
        if key in source
    ]


def update_metrics(payload, metadata):
    """Replace the exported node metrics with the values of one uplink.

    Args:
        payload: Mapping with the decoded sensor payload; only known keys are
            exported, missing keys are skipped.
        metadata: Sequence of receive-metadata mappings; only the first entry
            is used. An empty sequence yields no rssi/snr metrics.

    Raises:
        ValueError, TypeError: If a present value cannot be converted to the
            expected number type. In that case the previously stored metrics
            are left untouched.

    All lines are built before the lock is taken and the lock is held through
    a ``with`` block, so a bad value can never leave ``mutex`` locked.
    """
    global scrape_healthy
    global lastMqttReception

    first_gateway = metadata[0] if metadata else {}
    fresh = _render_metrics(payload, _PAYLOAD_METRICS)
    fresh += _render_metrics(first_gateway, _GATEWAY_METRICS)

    with mutex:
        # Slice assignment keeps the shared list object and swaps its content.
        node_metrics[:] = fresh
        scrape_healthy = True
        lastMqttReception = datetime.now()
def on_connect(client, userdata, flags, reason_code, properties):
    """MQTT connect callback: record the link state and (re)subscribe.

    Args:
        client: The client instance that received the connect response.
        userdata: Unused.
        flags: Unused.
        reason_code: Result of the connection attempt; ``0`` means success.
        properties: Unused.

    On success the wildcard subscription is issued from here so that it is
    renewed on every (re)connect, not only on the very first one.
    """
    global mqtt_connected

    if reason_code == 0:
        print("\nConnected to MQTT: reason_code = " + str(reason_code))
        mqtt_connected = True
        client.subscribe("#", 1)
    else:
        print("\nNot connected to MQTT: reason_code = " + str(reason_code))
        mqtt_connected = False
def on_disconnect(client, userdata, flags, reason_code, tmp):
    """MQTT disconnect callback: log the reason and mark the link as down.

    Only ``reason_code`` is used; the remaining arguments are accepted to
    match the callback signature and are ignored.
    """
    global mqtt_connected

    print("Disconnected from MQTT: reason_code = {}".format(reason_code))
    mqtt_connected = False
def on_message(mqttc, obj, msg):
    """MQTT message callback: decode one uplink and refresh the metrics.

    ``msg.payload`` is parsed as JSON; its ``uplink_message`` entry supplies
    the ``decoded_payload`` and ``rx_metadata`` passed to ``update_metrics``.
    Any failure along the way marks the exporter as unhealthy and is printed
    instead of being raised. ``mqttc`` and ``obj`` are unused.
    """
    global scrape_healthy

    print("on_message")
    try:
        decoded = json.loads(msg.payload)
        print(decoded)
        uplink = decoded["uplink_message"]
        sensor_values = uplink["decoded_payload"]
        radio_info = uplink["rx_metadata"]
        update_metrics(sensor_values, radio_info)
    except Exception as e:
        with mutex:
            scrape_healthy = False
        print(f"Unable to parse uplink: {e}")
def poll_mqtt(mqtt_client):
    """Run the network loop of *mqtt_client* until that loop returns.

    Intended as a thread target: the call blocks for as long as the client's
    ``loop_forever()`` runs.
    """
    mqtt_client.loop_forever()
def configure_mqtt_client():
    """Create an MQTT client wired to this module's callbacks and credentials.

    Returns:
        A new, not yet connected client with ``on_connect``, ``on_message``
        and ``on_disconnect`` attached, the TTN credentials from ``config``
        set, and TLS enabled with certificate and hostname verification.
    """
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    # Set credentials
    client.username_pw_set(config.ttn_user, config.ttn_key)

    # Set up TLS/SSL. The default context verifies the certificate chain and
    # the hostname against the system CA store. TLS 1.2 is the minimum rather
    # than the only accepted version, which avoids the deprecated
    # ssl.PROTOCOL_TLSv1_2 constant and still allows newer protocol versions.
    tls_context = ssl.create_default_context()
    tls_context.minimum_version = ssl.TLSVersion.TLSv1_2
    client.tls_set_context(tls_context)
    client.tls_insecure_set(False)  # Enforce strict certificate validation
    return client
def _stop_mqtt(client, poll_thread):
    """Disconnect *client* and wait briefly for its polling thread to end."""
    try:
        client.disconnect()
    except Exception as e:
        logging.error(f"Error while disconnecting MQTT client: {e}")
    if poll_thread is not None:
        # Bounded wait so shutdown cannot hang on a stuck network loop.
        poll_thread.join(timeout=5)


def main():
    """Start the helper threads, the MQTT client and the HTTP server.

    The timeout monitor and the reconnect loop run as daemon threads. The
    main loop then creates the MQTT client, connects and subscribes, polls it
    in a background thread and serves HTTP requests until interrupted.

    * Ctrl-C stops the MQTT client, closes the HTTP server and exits with
      status ``-1``.
    * Any other error raised while serving is printed; after 60 seconds the
      MQTT client is torn down and the whole setup is started again.
    """
    global mqtt_client

    # Start timeout monitoring thread
    threading.Thread(target=monitor_timeout, daemon=True).start()

    # Start MQTT reconnect thread
    threading.Thread(target=reconnect_mqtt, daemon=True).start()

    while True:
        mqtt_client = configure_mqtt_client()
        # Stays None when the connection attempt below fails.
        poll_mqtt_thread = None
        try:
            # Connect to TTN broker
            broker_url = f"{config.ttn_region.lower()}.cloud.thethings.network"
            mqtt_client.connect(broker_url, 8883, 60)

            # Subscribe to all topics
            mqtt_client.subscribe("#", 1)
            logging.info("Subscribed to all topics.")

            # Daemon thread: it must never keep the process alive on exit.
            poll_mqtt_thread = threading.Thread(
                target=poll_mqtt, args=(mqtt_client,), daemon=True
            )
            poll_mqtt_thread.start()
        except Exception as e:
            logging.error(f"Error occurred: {e}")
            mqtt_client.loop_stop()

        # The context manager closes the listening socket on every way out,
        # so a retry can bind the same port again.
        with HTTPServer((config.hostName, config.serverPort), RequestHandler) as webServer:
            print("Server started http://%s:%s" % (config.hostName, config.serverPort))
            try:
                webServer.serve_forever()
            except KeyboardInterrupt:
                # Clean up first; nothing after sys.exit() would run.
                _stop_mqtt(mqtt_client, poll_mqtt_thread)
                print("Server stopped.")
                sys.exit(-1)
            except Exception as e:
                print(e)
                time.sleep(60)
                # Drop the old client so retries do not pile up loop threads.
                _stop_mqtt(mqtt_client, poll_mqtt_thread)
if __name__ == "__main__":
main()