#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com

Prometheus exporter for a VEGAPULS Air sensor attached to The Things Network.

The script subscribes to the TTN MQTT broker, extracts selected fields from
each uplink's ``decoded_payload`` and first ``rx_metadata`` entry, and serves
them in Prometheus text format under ``/metrics`` via a small HTTP server.
Connection settings and the metric prefix come from the local ``config``
module.
"""

from http.server import BaseHTTPRequestHandler, HTTPServer
import paho.mqtt.client as mqtt
from datetime import datetime, timedelta
import threading
import time
import json
import sys
import config

# Shared state between the MQTT thread (writer) and the HTTP handler (reader).
scrape_healthy = True
startTime = datetime.now()
lastMqttReception = datetime.now()
node_metrics = list()
mutex = threading.Lock()  # guards node_metrics and scrape_healthy
request_count = 0

# Without an uplink for this long the exporter reports itself unhealthy.
_MAX_UPLINK_AGE = timedelta(hours=7)

# (payload key, exported metric name, converter) for decoded_payload fields.
_PAYLOAD_FIELDS = (
    ("Distance", "distance", float),
    ("Inclination_degree", "inclination_degree", int),
    ("MvLinProcent", "linprocent", int),
    ("MvProcent", "procent", int),
    ("MvScaled", "scaled", float),
    ("MvScaledUnit", "scaled_unit", int),
    ("PacketIdentifier", "packet_identifier", int),
    ("RemainingPower", "remaining_power", int),
    ("Temperature", "temperature", int),
    ("Unit", "unit", int),
    ("UnitTemperature", "temperature_unit", int),
)

# Same layout, for fields read from the first rx_metadata entry.
_GATEWAY_FIELDS = (
    ("rssi", "rssi", int),
    ("channel_rssi", "channel_rssi", int),
    ("snr", "snr", float),
)

# NOTE(review): the markup of the landing page appears to have been stripped
# from the source this was restored from (empty literals, bare text). The
# tags below are a minimal reconstruction -- confirm against the original.
_INDEX_PAGE = (
    "<html>"
    "<head><title>VEGAPULS Air exporter</title></head>"
    "<body>"
    "<p>ttn-vegapulsair exporter based on data from LoRaWAN TTN node.</p>"
    '<p><a href="/metrics">Metrics</a></p>'
    "</body>"
    "</html>"
)


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler serving ``/metrics`` and a small landing page."""

    def get_metrics(self):
        """Write the current metrics in Prometheus text format."""
        # Snapshot the shared state under the lock, then release it before
        # touching the socket: a slow or vanished client must never be able
        # to block (or, on a write error, permanently lock out) the MQTT
        # thread.
        with mutex:
            healthy = scrape_healthy
            metrics = list(node_metrics)

        uptime = int((datetime.now() - startTime).total_seconds())
        lines = [
            # The misspelling "expoter" is kept deliberately: renaming the
            # metric would break existing dashboards and alerts.
            "expoter_duration_seconds_sum " + str(uptime),
            "exporter_request_count " + str(request_count),
            "exporter_scrape_healthy " + str(int(healthy)),
        ]
        lines.extend(metrics)
        body = "".join(
            config.exporter_prefix + line + "\n" for line in lines
        )

        self.send_response(200)
        # Prometheus text exposition format; "text/html" is not a valid
        # scrape content type.
        self.send_header(
            "Content-type", "text/plain; version=0.0.4; charset=utf-8"
        )
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def do_GET(self):
        """Serve metrics for paths starting with /metrics, else the index."""
        global request_count
        global scrape_healthy
        request_count = request_count + 1
        if self.path.startswith("/metrics"):
            # Mark the data stale when no uplink arrived for too long.
            if (datetime.now() - lastMqttReception) > _MAX_UPLINK_AGE:
                scrape_healthy = False
            self.get_metrics()
        else:
            self.send_response(200)
            self.send_header("Content-type", "text/html")
            self.end_headers()
            self.wfile.write(_INDEX_PAGE.encode("utf-8"))


def _append_fields(source, fields):
    """Append a metric line for every known field present in *source*."""
    for key, metric_name, convert in fields:
        if key in source:
            value = convert(source[key])
            print("set " + key + ": " + str(value))
            node_metrics.append(metric_name + " " + str(value))


def update_metrics(payload, metadata):
    """Replace the exported metrics with the values of one uplink.

    Args:
        payload: mapping of decoded sensor fields (``decoded_payload``).
        metadata: sequence of gateway metadata mappings (``rx_metadata``);
            only the first entry is used.

    Raises:
        Whatever the converters or lookups raise on malformed input (for
        example ``ValueError``, ``TypeError``, ``IndexError``). In that case
        ``scrape_healthy`` stays False and the lock is released.
    """
    global scrape_healthy
    # "with" guarantees the lock is released even when a conversion fails;
    # the caller's error path takes the same non-reentrant lock and would
    # otherwise deadlock.
    with mutex:
        scrape_healthy = False
        node_metrics.clear()
        _append_fields(payload, _PAYLOAD_FIELDS)
        _append_fields(metadata[0], _GATEWAY_FIELDS)
        scrape_healthy = True


def on_connect(mqttc, obj, flags, rc):
    """MQTT connect callback: log the result and (re)subscribe."""
    print("\nConnected to MQTT: rc = " + str(rc))
    if rc == 0:
        # Subscribing here renews the subscription after every reconnect.
        mqttc.subscribe("#", 0)  # all device uplinks


def on_message(mqttc, obj, msg):
    """MQTT message callback: parse an uplink and update the metrics."""
    global lastMqttReception
    global scrape_healthy
    try:
        parsedJSON = json.loads(msg.payload)
        uplink_message = parsedJSON["uplink_message"]
        update_metrics(
            uplink_message["decoded_payload"],
            uplink_message["rx_metadata"],
        )
        lastMqttReception = datetime.now()
    except Exception as exc:
        # Callback boundary: an escaping exception would terminate the MQTT
        # polling thread, so report the problem and carry on.
        with mutex:
            scrape_healthy = False
        print("Unable to parse uplink: " + repr(exc))


def on_subscribe(mqttc, obj, mid, granted_qos):
    """MQTT subscribe callback: log the acknowledged subscription."""
    print("\nSubscribed to MQTT: " + str(mid) + " " + str(granted_qos))


def poll_mqtt(mqttc):
    """Run the MQTT network loop until the client disconnects.

    ``loop_forever`` reconnects after a connection loss, whereas repeatedly
    calling ``loop`` on a dropped connection returns immediately and spins.
    """
    mqttc.loop_forever(timeout=10)  # seconds timeout


def main():
    """Connect to TTN via MQTT and serve the metrics over HTTP."""
    print("starting ...")

    mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
    mqttc.on_connect = on_connect
    mqttc.on_subscribe = on_subscribe
    mqttc.on_message = on_message
    mqttc.username_pw_set(config.ttn_user, config.ttn_key)
    mqttc.tls_set()
    mqttc.connect(
        config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60
    )

    # Run MQTT in a background thread; daemon so it can never keep the
    # process alive after the web server has stopped.
    poll_mqtt_thread = threading.Thread(
        target=poll_mqtt, args=(mqttc,), daemon=True
    )
    poll_mqtt_thread.start()

    webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler)
    print("Server started http://%s:%s" % (config.hostName, config.serverPort))

    exit_code = 0
    try:
        webServer.serve_forever()
    except KeyboardInterrupt:
        exit_code = -1  # same exit status as before on Ctrl-C
    finally:
        webServer.server_close()
        print("Server stopped.")
        mqttc.disconnect()  # lets loop_forever() return
        poll_mqtt_thread.join(timeout=5)

    if exit_code != 0:
        sys.exit(exit_code)


if __name__ == "__main__":
    main()