#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Author: Hendrik Schutter, mail@hendrikschutter.com """ from http.server import BaseHTTPRequestHandler, HTTPServer import paho.mqtt.client as mqtt from datetime import datetime, timedelta import threading import time import json import sys import config import logging import ssl scrape_healthy = True startTime = datetime.now() lastMqttReception = datetime.now() node_metrics = list() mutex = threading.Lock() request_count = 0 mqtt_client = None mqtt_connected = False mqtt_lock = threading.Lock() def monitor_timeout(): global scrape_healthy global lastMqttReception global mqtt_connected while True: time_since_last_reception = datetime.now() - lastMqttReception if time_since_last_reception > timedelta(hours=config.ttn_timeout): with mutex: scrape_healthy = False mqtt_connected = False time.sleep(60) # Check timeout every minute def reconnect_mqtt(): global mqtt_client global mqtt_connected while True: if not mqtt_connected: with mqtt_lock: try: if mqtt_client is None: print("MQTT client is None, creating a new client...") mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message mqtt_client.on_disconnect = on_disconnect mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) mqtt_client.tls_set() print("Attempting to reconnect to MQTT broker...") mqtt_client.connect( config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 ) except Exception as e: print(f"MQTT reconnect failed: {e}") time.sleep(60) # Retry every 10 seconds class RequestHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): pass def get_metrics(self): global request_count global node_metrics global mutex mutex.acquire() self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write( bytes( config.exporter_prefix + "exporter_duration_seconds_sum " + str(int((datetime.now() - startTime).total_seconds())) + "\n", "utf-8", ) ) self.wfile.write( bytes( config.exporter_prefix + "exporter_request_count " + str(request_count) + "\n", "utf-8", ) ) self.wfile.write( bytes( config.exporter_prefix + "exporter_scrape_healthy " + str(int(scrape_healthy)) + "\n", "utf-8", ) ) for metric in node_metrics: self.wfile.write(bytes(config.exporter_prefix + metric + "\n", "utf-8")) mutex.release() def do_GET(self): global request_count request_count += 1 if self.path.startswith("/metrics"): self.get_metrics() else: self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() self.wfile.write(bytes("", "utf-8")) self.wfile.write( bytes("VEGAPULS Air exporter", "utf-8") ) self.wfile.write(bytes("", "utf-8")) self.wfile.write( bytes( "

ttn-vegapulsair exporter based on data from LoRaWAN TTN node.

", "utf-8", ) ) self.wfile.write(bytes('

Metrics

', "utf-8")) self.wfile.write(bytes("", "utf-8")) self.wfile.write(bytes("", "utf-8")) def update_metrics(payload, metadata): global node_metrics global mutex global scrape_healthy global lastMqttReception mutex.acquire() node_metrics.clear() if "Distance" in payload: node_metrics.append("distance " + str(float(payload["Distance"]))) if "Inclination_degree" in payload: node_metrics.append("inclination_degree " + str(int(payload["Inclination_degree"]))) if "MvLinProcent" in payload: node_metrics.append("linprocent " + str(int(payload["MvLinProcent"]))) if "MvProcent" in payload: node_metrics.append("procent " + str(int(payload["MvProcent"]))) if "MvScaled" in payload: node_metrics.append("scaled " + str(float(payload["MvScaled"]))) if "MvScaledUnit" in payload: node_metrics.append("scaled_unit " + str(int(payload["MvScaledUnit"]))) if "PacketIdentifier" in payload: node_metrics.append("packet_identifier " + str(int(payload["PacketIdentifier"]))) if "RemainingPower" in payload: node_metrics.append("remaining_power " + str(int(payload["RemainingPower"]))) if "Temperature" in payload: node_metrics.append("temperature " + str(int(payload["Temperature"]))) if "Unit" in payload: node_metrics.append("unit " + str(int(payload["Unit"]))) if "UnitTemperature" in payload: node_metrics.append("temperature_unit " + str(int(payload["UnitTemperature"]))) if "rssi" in metadata[0]: node_metrics.append("rssi " + str(int(metadata[0]["rssi"]))) if "channel_rssi" in metadata[0]: node_metrics.append("channel_rssi " + str(int(metadata[0]["channel_rssi"]))) if "snr" in metadata[0]: node_metrics.append("snr " + str(float(metadata[0]["snr"]))) scrape_healthy = True lastMqttReception = datetime.now() mutex.release() def on_connect(client, userdata, flags, reason_code, properties): global mqtt_connected if reason_code == 0: print("\nConnected to MQTT: reason_code = " + str(reason_code)) mqtt_connected = True elif reason_code > 0: print("\nNot connected to MQTT: reason_code = " + str(reason_code)) mqtt_connected = False def on_disconnect(client, userdata, flags, reason_code, tmp): global mqtt_connected print(f"Disconnected from MQTT: reason_code = {reason_code}") mqtt_connected = False def on_message(mqttc, obj, msg): print("on_message") global scrape_healthy try: parsedJSON = json.loads(msg.payload) print(parsedJSON) uplink_message = parsedJSON["uplink_message"] update_metrics(uplink_message["decoded_payload"], uplink_message["rx_metadata"]) except Exception as e: with mutex: scrape_healthy = False print(f"Unable to parse uplink: {e}") def poll_mqtt(mqtt_client): # Start the network loop mqtt_client.loop_forever() def configure_mqtt_client(): client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect # Set credentials client.username_pw_set(config.ttn_user, config.ttn_key) # Set up TLS/SSL client.tls_set( cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, # Enforce TLS 1.2 ) client.tls_insecure_set(False) # Enforce strict certificate validation return client def main(): global mqtt_client # Start timeout monitoring thread timeout_thread = threading.Thread(target=monitor_timeout, daemon=True) timeout_thread.start() # Start MQTT reconnect thread reconnect_thread = threading.Thread(target=reconnect_mqtt, daemon=True) reconnect_thread.start() while True: mqtt_client = configure_mqtt_client() try: # Connect to TTN broker broker_url = f"{config.ttn_region.lower()}.cloud.thethings.network" mqtt_client.connect(broker_url, 8883, 60) # Subscribe to all topics mqtt_client.subscribe("#", 1) logging.info(f"Subscribed to all topics.") poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,))) poll_mqtt_thread.start() except Exception as e: logging.error(f"Error occurred: {e}") mqtt_client.loop_stop() webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler) print("Server started http://%s:%s" % (config.hostName, config.serverPort)) try: webServer.serve_forever() except KeyboardInterrupt: sys.exit(-1) webServer.server_close() print("Server stopped.") poll_mqtt_thread.join() except Exception as e: print(e) time.sleep(60) if __name__ == "__main__": main()