287 lines
8.9 KiB
Python
287 lines
8.9 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
""" Author: Hendrik Schutter, mail@hendrikschutter.com
|
|
"""
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
import paho.mqtt.client as mqtt
|
|
from datetime import datetime, timedelta
|
|
import threading
|
|
import time
|
|
import json
|
|
import sys
|
|
import config
|
|
import logging
|
|
import ssl
|
|
|
|
|
|
# --- Shared exporter state -------------------------------------------------
# False once no uplink arrived within the timeout or an uplink failed to parse;
# set back to True by update_metrics().
scrape_healthy = True
# Process start time; used for the exporter_duration_seconds_sum metric.
startTime = datetime.now()
# Time of the last successfully processed uplink (initialised to start-up).
lastMqttReception = datetime.now()
# Metric lines ("<name> <value>") built from the most recent uplink.
node_metrics = []
# Held by update_metrics() and the /metrics handler while touching the state above.
mutex = threading.Lock()
# Number of HTTP GET requests handled so far.
request_count = 0

# --- MQTT connection state -------------------------------------------------
# Active paho client; assigned in main() and, if still None, in reconnect_mqtt().
mqtt_client = None
# Maintained by the on_connect/on_disconnect callbacks and monitor_timeout().
mqtt_connected = False
# Serialises reconnect attempts in reconnect_mqtt().
mqtt_lock = threading.Lock()
|
|
|
|
def monitor_timeout():
    """Watchdog loop: flag the exporter as stale when uplinks stop arriving.

    Once per minute the time since ``lastMqttReception`` is compared against
    ``config.ttn_timeout`` (interpreted as hours). When the limit is exceeded,
    ``scrape_healthy`` and ``mqtt_connected`` are both cleared while holding
    ``mutex``. The function never returns; it is meant to run in its own thread.
    """
    global scrape_healthy, lastMqttReception, mqtt_connected

    while True:
        silence = datetime.now() - lastMqttReception
        allowed_silence = timedelta(hours=config.ttn_timeout)
        if silence > allowed_silence:
            with mutex:
                scrape_healthy = False
                mqtt_connected = False
        # Re-evaluate the timeout once per minute.
        time.sleep(60)
|
|
|
|
def reconnect_mqtt():
    """Background loop that re-establishes the MQTT connection when it is down.

    Once per minute, if ``mqtt_connected`` is False, a connection attempt to
    the TTN broker for ``config.ttn_region`` is made while holding
    ``mqtt_lock``. If no client exists yet, one is created through
    ``configure_mqtt_client()`` so that it gets exactly the same callbacks,
    credentials and TLS settings as the client built in ``main()``.

    Connection errors are printed and retried on the next cycle. The function
    never returns; it is meant to run in a daemon thread.
    """
    global mqtt_client

    while True:
        if not mqtt_connected:
            with mqtt_lock:
                try:
                    if mqtt_client is None:
                        print("MQTT client is None, creating a new client...")
                        # Reuse the shared factory instead of duplicating the
                        # client setup with diverging TLS options.
                        mqtt_client = configure_mqtt_client()

                    print("Attempting to reconnect to MQTT broker...")
                    mqtt_client.connect(
                        config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60
                    )
                except Exception as e:
                    # Best effort: report and try again on the next cycle.
                    print(f"MQTT reconnect failed: {e}")
        time.sleep(60)  # Check the connection state once per minute
|
|
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler serving the metrics endpoint and a small landing page."""

    def log_message(self, format, *args):
        """Silence the per-request log output of ``BaseHTTPRequestHandler``."""
        pass

    def get_metrics(self):
        """Answer a scrape with the exporter's own metrics plus the node metrics.

        The shared state is snapshotted while holding ``mutex``; the lock is
        released before anything is written to the socket. A client that
        disconnects mid-response therefore cannot leave the lock held and
        block the MQTT thread.
        """
        with mutex:
            uptime_seconds = int((datetime.now() - startTime).total_seconds())
            lines = [
                "exporter_duration_seconds_sum " + str(uptime_seconds),
                "exporter_request_count " + str(request_count),
                "exporter_scrape_healthy " + str(int(scrape_healthy)),
            ]
            lines.extend(node_metrics)

        body = "".join(config.exporter_prefix + line + "\n" for line in lines)

        self.send_response(200)
        # Prometheus text exposition format; "text/html" is not a content type
        # Prometheus recognises for scrape targets.
        self.send_header("Content-type", "text/plain; version=0.0.4; charset=utf-8")
        self.end_headers()
        self.wfile.write(body.encode("utf-8"))

    def do_GET(self):
        """Count the request and serve ``/metrics`` or the HTML landing page."""
        global request_count
        request_count += 1

        if self.path.startswith("/metrics"):
            self.get_metrics()
            return

        page = (
            "<html>"
            "<head><title>VEGAPULS Air exporter</title></head>"
            "<body>"
            "<h1>ttn-vegapulsair exporter based on data from LoRaWAN TTN node.</h1>"
            '<p><a href="/metrics">Metrics</a></p>'
            "</body>"
            "</html>"
        )
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(page.encode("utf-8"))
|
|
|
|
def update_metrics(payload, metadata):
    """Replace the exported node metrics with the values of one uplink.

    Args:
        payload: Mapping with the decoded sensor values; only the known keys
            listed below are exported, missing keys are skipped.
        metadata: Sequence of gateway reception records; only the first
            entry is used. An empty sequence yields no radio metrics.

    Raises:
        ValueError, TypeError: If a present value cannot be converted to the
            numeric type of its metric. In that case the previously published
            metrics and the health state are left untouched.

    The new metric list is built completely *before* ``mutex`` is taken and
    is then swapped in under ``with mutex``. A conversion error therefore can
    neither leave the lock held (which would deadlock the error handler in
    ``on_message`` and every scrape) nor leave a half-filled metric list.
    """
    global scrape_healthy
    global lastMqttReception

    # (source key, exported metric name, numeric conversion)
    payload_fields = (
        ("Distance", "distance", float),
        ("Inclination_degree", "inclination_degree", int),
        ("MvLinProcent", "linprocent", int),
        ("MvProcent", "procent", int),
        ("MvScaled", "scaled", float),
        ("MvScaledUnit", "scaled_unit", int),
        ("PacketIdentifier", "packet_identifier", int),
        ("RemainingPower", "remaining_power", int),
        ("Temperature", "temperature", int),
        ("Unit", "unit", int),
        ("UnitTemperature", "temperature_unit", int),
    )
    gateway_fields = (
        ("rssi", "rssi", int),
        ("channel_rssi", "channel_rssi", int),
        ("snr", "snr", float),
    )

    # Only the first gateway record is evaluated; tolerate an empty list.
    gateway = metadata[0] if metadata else {}

    fresh_metrics = []
    for source, fields in ((payload, payload_fields), (gateway, gateway_fields)):
        for key, metric_name, convert in fields:
            if key in source:
                fresh_metrics.append(metric_name + " " + str(convert(source[key])))

    with mutex:
        # In-place replacement keeps the list object shared with the readers.
        node_metrics[:] = fresh_metrics
        scrape_healthy = True
        lastMqttReception = datetime.now()
|
|
|
|
def on_connect(client, userdata, flags, reason_code, properties):
    """paho-mqtt connect callback: track the connection state and subscribe.

    Args:
        client: The client instance that received the broker's answer.
        userdata: Unused.
        flags: Unused.
        reason_code: Result of the connection attempt; 0 means success.
        properties: Unused.

    On success the wildcard subscription is (re-)established here, so that a
    connection that was lost and re-established keeps receiving uplinks
    instead of staying connected without any subscription.
    """
    global mqtt_connected
    if reason_code == 0:
        print("\nConnected to MQTT: reason_code = " + str(reason_code))
        mqtt_connected = True
        # Subscribing in the connect callback renews the subscription after
        # every reconnect.
        client.subscribe("#", 1)
    elif reason_code > 0:
        print("\nNot connected to MQTT: reason_code = " + str(reason_code))
        mqtt_connected = False
|
|
|
|
def on_disconnect(client, userdata, flags, reason_code, tmp):
    """paho-mqtt disconnect callback: report the reason and mark the link down.

    Only ``reason_code`` is used; the remaining arguments are accepted because
    the callback signature requires them.
    """
    global mqtt_connected
    mqtt_connected = False
    print(f"Disconnected from MQTT: reason_code = {reason_code}")
|
|
|
|
def on_message(mqttc, obj, msg):
    """paho-mqtt message callback: decode an uplink and publish its values.

    ``msg.payload`` is parsed as JSON; the ``decoded_payload`` and
    ``rx_metadata`` members of its ``uplink_message`` object are handed to
    ``update_metrics``. Any failure along the way marks the scrape as
    unhealthy (under ``mutex``) and is printed instead of being raised.
    """
    global scrape_healthy
    print("on_message")

    try:
        message = json.loads(msg.payload)
        print(message)
        uplink = message["uplink_message"]
        decoded = uplink["decoded_payload"]
        rx_metadata = uplink["rx_metadata"]
        update_metrics(decoded, rx_metadata)
    except Exception as err:
        with mutex:
            scrape_healthy = False
        print(f"Unable to parse uplink: {err}")
|
|
|
|
def poll_mqtt(mqtt_client):
    """Thread target: run the blocking network loop of ``mqtt_client``."""
    mqtt_client.loop_forever()
|
|
|
|
def configure_mqtt_client():
    """Build a paho-mqtt client that is ready to connect to the broker.

    The client uses callback API version 2, authenticates with
    ``config.ttn_user`` / ``config.ttn_key``, requires a verified server
    certificate over TLS 1.2 and has the module's connect, message and
    disconnect callbacks attached.

    Returns:
        The configured, not yet connected client.
    """
    new_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    # Credentials
    new_client.username_pw_set(config.ttn_user, config.ttn_key)

    # Transport security: TLS 1.2 with mandatory certificate validation
    new_client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2)
    new_client.tls_insecure_set(False)

    # Callbacks
    new_client.on_connect = on_connect
    new_client.on_message = on_message
    new_client.on_disconnect = on_disconnect

    return new_client
|
|
|
|
def _stop_services(web_server, client, poll_thread):
    """Release the HTTP socket and stop the MQTT network loop of one run."""
    web_server.server_close()
    try:
        # disconnect() makes the blocking loop_forever() call return.
        client.disconnect()
    except Exception as e:
        logging.error(f"Error while disconnecting MQTT client: {e}")
    if poll_thread is not None:
        # Bounded wait: the thread is a daemon and must not delay shutdown.
        poll_thread.join(timeout=5)


def main():
    """Start the helper threads, the MQTT client and the HTTP server.

    The timeout monitor and the reconnect loop run as daemon threads. Each
    iteration of the outer loop creates a fresh MQTT client, connects and
    subscribes it, runs its network loop in a daemon thread and then serves
    HTTP requests until the server stops:

    * ``KeyboardInterrupt``: the HTTP socket is closed, the MQTT client is
      disconnected and the process exits with status -1. The cleanup runs
      *before* ``sys.exit`` and the poll thread is a daemon, so Ctrl-C really
      terminates the process.
    * any other exception: it is printed, the same cleanup is performed (so
      the port can be bound again) and everything is restarted after 60 s.
    """
    global mqtt_client

    # Start timeout monitoring thread
    timeout_thread = threading.Thread(target=monitor_timeout, daemon=True)
    timeout_thread.start()

    # Start MQTT reconnect thread
    reconnect_thread = threading.Thread(target=reconnect_mqtt, daemon=True)
    reconnect_thread.start()

    while True:
        mqtt_client = configure_mqtt_client()
        # Stays None when the connection attempt below fails.
        poll_mqtt_thread = None
        try:
            # Connect to TTN broker
            broker_url = f"{config.ttn_region.lower()}.cloud.thethings.network"
            mqtt_client.connect(broker_url, 8883, 60)

            # Subscribe to all topics
            mqtt_client.subscribe("#", 1)
            logging.info("Subscribed to all topics.")

            poll_mqtt_thread = threading.Thread(
                target=poll_mqtt, args=(mqtt_client,), daemon=True
            )
            poll_mqtt_thread.start()
        except Exception as e:
            logging.error(f"Error occurred: {e}")
            mqtt_client.loop_stop()

        webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler)
        print("Server started http://%s:%s" % (config.hostName, config.serverPort))

        try:
            webServer.serve_forever()
        except KeyboardInterrupt:
            _stop_services(webServer, mqtt_client, poll_mqtt_thread)
            print("Server stopped.")
            sys.exit(-1)
        except Exception as e:
            print(e)
            _stop_services(webServer, mqtt_client, poll_mqtt_thread)
            time.sleep(60)
|
|
|
|
# Start the exporter only when this file is executed as a script.
if __name__ == "__main__":
    main()
|