From ca94d770520c8a0658f29b5e4a7386747c9f9ddd Mon Sep 17 00:00:00 2001 From: localhorst Date: Sun, 4 May 2025 12:06:15 +0200 Subject: [PATCH] rework timeout handling --- ttn-vegapulsair-exporter.py | 144 +++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 60 deletions(-) diff --git a/ttn-vegapulsair-exporter.py b/ttn-vegapulsair-exporter.py index 2d4ff07..b929669 100644 --- a/ttn-vegapulsair-exporter.py +++ b/ttn-vegapulsair-exporter.py @@ -19,6 +19,49 @@ node_metrics = list() mutex = threading.Lock() request_count = 0 +mqtt_client = None +mqtt_connected = False +mqtt_lock = threading.Lock() + +def monitor_timeout(): + global scrape_healthy + global lastMqttReception + global mqtt_connected + + while True: + time_since_last_reception = datetime.now() - lastMqttReception + if time_since_last_reception > timedelta(hours=config.ttn_timeout): + with mutex: + scrape_healthy = False + mqtt_connected = False + time.sleep(60) # Check timeout every minute + +def reconnect_mqtt(): + global mqtt_client + global mqtt_connected + + while True: + if not mqtt_connected: + with mqtt_lock: + try: + if mqtt_client is None: + print("MQTT client is None, creating a new client...") + mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + mqtt_client.on_connect = on_connect + mqtt_client.on_message = on_message + mqtt_client.on_disconnect = on_disconnect + mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) + mqtt_client.tls_set() + + print("Attempting to reconnect to MQTT broker...") + mqtt_client.connect( + config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 + ) + except Exception as e: + print(f"MQTT reconnect failed: {e}") + time.sleep(10) # Retry every 10 seconds + + class RequestHandler(BaseHTTPRequestHandler): def get_metrics(self): @@ -32,7 +75,7 @@ class RequestHandler(BaseHTTPRequestHandler): self.wfile.write( bytes( config.exporter_prefix - + "expoter_duration_seconds_sum " + + "exporter_duration_seconds_sum " + str(int((datetime.now() - startTime).total_seconds())) + "\n", "utf-8", @@ -58,20 +101,14 @@ class RequestHandler(BaseHTTPRequestHandler): ) for metric in node_metrics: - # print(metric) self.wfile.write(bytes(config.exporter_prefix + metric + "\n", "utf-8")) mutex.release() def do_GET(self): global request_count - global lastMqttReception - global scrape_healthy - request_count = request_count + 1 - # print("Request: " + self.path) + request_count += 1 if self.path.startswith("/metrics"): - # if (datetime.now() - lastMqttReception) > timedelta(hours=7): - # scrape_healthy = False self.get_metrics() else: self.send_response(200) @@ -93,128 +130,115 @@ class RequestHandler(BaseHTTPRequestHandler): self.wfile.write(bytes("", "utf-8")) def update_metrics(payload, metadata): - # print("Payload: "+ str(payload)) - # print("Metadata: "+ str(metadata)) - global node_metrics global mutex global scrape_healthy + global lastMqttReception + mutex.acquire() - # scrape_healthy = False node_metrics.clear() if "Distance" in payload: - print("set Distance: " + str(float(payload["Distance"]))) node_metrics.append("distance " + str(float(payload["Distance"]))) if "Inclination_degree" in payload: - print("set Inclination_degree: " + str(int(payload["Inclination_degree"]))) - node_metrics.append( - "inclination_degree " + str(int(payload["Inclination_degree"])) - ) + node_metrics.append("inclination_degree " + str(int(payload["Inclination_degree"]))) if "MvLinProcent" in payload: - print("set MvLinProcent: " + str(int(payload["MvLinProcent"]))) node_metrics.append("linprocent " + str(int(payload["MvLinProcent"]))) if "MvProcent" in payload: - print("set MvProcent: " + str(int(payload["MvProcent"]))) node_metrics.append("procent " + str(int(payload["MvProcent"]))) if "MvScaled" in payload: - print("set MvScaled: " + str(float(payload["MvScaled"]))) node_metrics.append("scaled " + str(float(payload["MvScaled"]))) if "MvScaledUnit" in payload: - print("set MvScaledUnit: " + str(int(payload["MvScaledUnit"]))) node_metrics.append("scaled_unit " + str(int(payload["MvScaledUnit"]))) if "PacketIdentifier" in payload: - print("set PacketIdentifier: " + str(int(payload["PacketIdentifier"]))) - node_metrics.append( - "packet_identifier " + str(int(payload["PacketIdentifier"])) - ) + node_metrics.append("packet_identifier " + str(int(payload["PacketIdentifier"]))) if "RemainingPower" in payload: - print("set RemainingPower: " + str(int(payload["RemainingPower"]))) node_metrics.append("remaining_power " + str(int(payload["RemainingPower"]))) if "Temperature" in payload: - print("set Temperature: " + str(int(payload["Temperature"]))) node_metrics.append("temperature " + str(int(payload["Temperature"]))) if "Unit" in payload: - print("set Unit: " + str(int(payload["Unit"]))) node_metrics.append("unit " + str(int(payload["Unit"]))) if "UnitTemperature" in payload: - print("set UnitTemperature: " + str(int(payload["UnitTemperature"]))) node_metrics.append("temperature_unit " + str(int(payload["UnitTemperature"]))) if "rssi" in metadata[0]: - print("set rssi: " + str(int(metadata[0]["rssi"]))) node_metrics.append("rssi " + str(int(metadata[0]["rssi"]))) if "channel_rssi" in metadata[0]: - print("set channel_rssi: " + str(int(metadata[0]["channel_rssi"]))) node_metrics.append("channel_rssi " + str(int(metadata[0]["channel_rssi"]))) if "snr" in metadata[0]: - print("set snr: " + str(float(metadata[0]["snr"]))) node_metrics.append("snr " + str(float(metadata[0]["snr"]))) scrape_healthy = True + lastMqttReception = datetime.now() mutex.release() def on_connect(client, userdata, flags, reason_code, properties): + global mqtt_connected if reason_code == 0: print("\nConnected to MQTT: reason_code = " + str(reason_code)) - if reason_code > 0: + mqtt_connected = True + elif reason_code > 0: print("\nNot connected to MQTT: reason_code = " + str(reason_code)) + mqtt_connected = False + +def on_disconnect(client, userdata, flags, reason_code): + global mqtt_connected + print(f"Disconnected from MQTT: reason_code = {reason_code}") + mqtt_connected = False def on_message(mqttc, obj, msg): - # print("\nMessage: " + msg.topic + " " + str(msg.qos)) - parsedJSON = json.loads(msg.payload) - # print(json.dumps(parsedJSON, indent=4)) + global scrape_healthy try: + parsedJSON = json.loads(msg.payload) uplink_message = parsedJSON["uplink_message"] update_metrics(uplink_message["decoded_payload"], uplink_message["rx_metadata"]) - lastMqttReception = datetime.now() - except: - mutex.acquire() - scrape_healthy = False - mutex.release() - print("Unable to parse uplink") - -def on_subscribe(client, userdata, mid, reason_codes, properties): - print("\nSubscribed to MQTT: " + str(mid)) + except Exception as e: + with mutex: + scrape_healthy = False + print(f"Unable to parse uplink: {e}") def poll_mqtt(mqttc): mqttc.loop_forever() -def on_disconnect(client, userdata, flags, reason_code, properties): - print("disconnect occoured") - client.reconnect() - def main(): + global mqtt_client + + # Start timeout monitoring thread + timeout_thread = threading.Thread(target=monitor_timeout, daemon=True) + timeout_thread.start() + + # Start MQTT reconnect thread + reconnect_thread = threading.Thread(target=reconnect_mqtt, daemon=True) + reconnect_thread.start() + while True: try: print("starting ...") - mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) - mqttc.on_connect = on_connect - mqttc.on_subscribe = on_subscribe - mqttc.on_message = on_message - mqttc.on_disconnect = on_disconnect - mqttc.username_pw_set(config.ttn_user, config.ttn_key) - mqttc.tls_set() - mqttc.connect( + mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + mqtt_client.on_connect = on_connect + mqtt_client.on_message = on_message + mqtt_client.on_disconnect = on_disconnect + mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) + mqtt_client.tls_set() + mqtt_client.connect( config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 ) - mqttc.subscribe("#", 0) # all device uplinks + mqtt_client.subscribe("#", 0) # all device uplinks - # run mqtt in thread forever - poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqttc,))) + poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,))) poll_mqtt_thread.start() webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler)