rework timeout handling
This commit is contained in:
		| @ -19,6 +19,49 @@ node_metrics = list() | ||||
| mutex = threading.Lock() | ||||
| request_count = 0 | ||||
|  | ||||
| mqtt_client = None | ||||
| mqtt_connected = False | ||||
| mqtt_lock = threading.Lock() | ||||
|  | ||||
| def monitor_timeout(): | ||||
|     global scrape_healthy | ||||
|     global lastMqttReception | ||||
|     global mqtt_connected | ||||
|  | ||||
|     while True: | ||||
|         time_since_last_reception = datetime.now() - lastMqttReception | ||||
|         if time_since_last_reception > timedelta(hours=config.ttn_timeout): | ||||
|             with mutex: | ||||
|                 scrape_healthy = False | ||||
|                 mqtt_connected = False | ||||
|         time.sleep(60)  # Check timeout every minute | ||||
|  | ||||
| def reconnect_mqtt(): | ||||
|     global mqtt_client | ||||
|     global mqtt_connected | ||||
|  | ||||
|     while True: | ||||
|         if not mqtt_connected: | ||||
|             with mqtt_lock: | ||||
|                 try: | ||||
|                     if mqtt_client is None: | ||||
|                         print("MQTT client is None, creating a new client...") | ||||
|                         mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) | ||||
|                         mqtt_client.on_connect = on_connect | ||||
|                         mqtt_client.on_message = on_message | ||||
|                         mqtt_client.on_disconnect = on_disconnect | ||||
|                         mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) | ||||
|                         mqtt_client.tls_set() | ||||
|  | ||||
|                     print("Attempting to reconnect to MQTT broker...") | ||||
|                     mqtt_client.connect( | ||||
|                         config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 | ||||
|                     ) | ||||
|                 except Exception as e: | ||||
|                     print(f"MQTT reconnect failed: {e}") | ||||
|         time.sleep(10)  # Retry every 10 seconds | ||||
|  | ||||
|  | ||||
| class RequestHandler(BaseHTTPRequestHandler): | ||||
|  | ||||
|     def get_metrics(self): | ||||
| @ -32,7 +75,7 @@ class RequestHandler(BaseHTTPRequestHandler): | ||||
|         self.wfile.write( | ||||
|             bytes( | ||||
|                 config.exporter_prefix | ||||
|                 + "expoter_duration_seconds_sum " | ||||
|                 + "exporter_duration_seconds_sum " | ||||
|                 + str(int((datetime.now() - startTime).total_seconds())) | ||||
|                 + "\n", | ||||
|                 "utf-8", | ||||
| @ -58,20 +101,14 @@ class RequestHandler(BaseHTTPRequestHandler): | ||||
|         ) | ||||
|  | ||||
|         for metric in node_metrics: | ||||
|             # print(metric) | ||||
|             self.wfile.write(bytes(config.exporter_prefix + metric + "\n", "utf-8")) | ||||
|  | ||||
|         mutex.release() | ||||
|  | ||||
|     def do_GET(self): | ||||
|         global request_count | ||||
|         global lastMqttReception | ||||
|         global scrape_healthy | ||||
|         request_count = request_count + 1 | ||||
|         # print("Request: " + self.path) | ||||
|         request_count += 1 | ||||
|         if self.path.startswith("/metrics"): | ||||
|             # if (datetime.now() - lastMqttReception) > timedelta(hours=7): | ||||
|             #   scrape_healthy = False | ||||
|             self.get_metrics() | ||||
|         else: | ||||
|             self.send_response(200) | ||||
| @ -93,128 +130,115 @@ class RequestHandler(BaseHTTPRequestHandler): | ||||
|             self.wfile.write(bytes("</html>", "utf-8")) | ||||
|  | ||||
| def update_metrics(payload, metadata): | ||||
|     # print("Payload: "+ str(payload)) | ||||
|     # print("Metadata: "+ str(metadata)) | ||||
|  | ||||
|     global node_metrics | ||||
|     global mutex | ||||
|     global scrape_healthy | ||||
|     global lastMqttReception | ||||
|  | ||||
|     mutex.acquire() | ||||
|     # scrape_healthy = False | ||||
|     node_metrics.clear() | ||||
|  | ||||
|     if "Distance" in payload: | ||||
|         print("set Distance: " + str(float(payload["Distance"]))) | ||||
|         node_metrics.append("distance " + str(float(payload["Distance"]))) | ||||
|  | ||||
|     if "Inclination_degree" in payload: | ||||
|         print("set Inclination_degree: " + str(int(payload["Inclination_degree"]))) | ||||
|         node_metrics.append( | ||||
|             "inclination_degree " + str(int(payload["Inclination_degree"])) | ||||
|         ) | ||||
|         node_metrics.append("inclination_degree " + str(int(payload["Inclination_degree"]))) | ||||
|  | ||||
|     if "MvLinProcent" in payload: | ||||
|         print("set MvLinProcent: " + str(int(payload["MvLinProcent"]))) | ||||
|         node_metrics.append("linprocent " + str(int(payload["MvLinProcent"]))) | ||||
|  | ||||
|     if "MvProcent" in payload: | ||||
|         print("set MvProcent: " + str(int(payload["MvProcent"]))) | ||||
|         node_metrics.append("procent " + str(int(payload["MvProcent"]))) | ||||
|  | ||||
|     if "MvScaled" in payload: | ||||
|         print("set MvScaled: " + str(float(payload["MvScaled"]))) | ||||
|         node_metrics.append("scaled " + str(float(payload["MvScaled"]))) | ||||
|  | ||||
|     if "MvScaledUnit" in payload: | ||||
|         print("set MvScaledUnit: " + str(int(payload["MvScaledUnit"]))) | ||||
|         node_metrics.append("scaled_unit " + str(int(payload["MvScaledUnit"]))) | ||||
|  | ||||
|     if "PacketIdentifier" in payload: | ||||
|         print("set PacketIdentifier: " + str(int(payload["PacketIdentifier"]))) | ||||
|         node_metrics.append( | ||||
|             "packet_identifier " + str(int(payload["PacketIdentifier"])) | ||||
|         ) | ||||
|         node_metrics.append("packet_identifier " + str(int(payload["PacketIdentifier"]))) | ||||
|  | ||||
|     if "RemainingPower" in payload: | ||||
|         print("set RemainingPower: " + str(int(payload["RemainingPower"]))) | ||||
|         node_metrics.append("remaining_power " + str(int(payload["RemainingPower"]))) | ||||
|  | ||||
|     if "Temperature" in payload: | ||||
|         print("set Temperature: " + str(int(payload["Temperature"]))) | ||||
|         node_metrics.append("temperature " + str(int(payload["Temperature"]))) | ||||
|  | ||||
|     if "Unit" in payload: | ||||
|         print("set Unit: " + str(int(payload["Unit"]))) | ||||
|         node_metrics.append("unit " + str(int(payload["Unit"]))) | ||||
|  | ||||
|     if "UnitTemperature" in payload: | ||||
|         print("set UnitTemperature: " + str(int(payload["UnitTemperature"]))) | ||||
|         node_metrics.append("temperature_unit " + str(int(payload["UnitTemperature"]))) | ||||
|  | ||||
|     if "rssi" in metadata[0]: | ||||
|         print("set rssi: " + str(int(metadata[0]["rssi"]))) | ||||
|         node_metrics.append("rssi " + str(int(metadata[0]["rssi"]))) | ||||
|  | ||||
|     if "channel_rssi" in metadata[0]: | ||||
|         print("set channel_rssi: " + str(int(metadata[0]["channel_rssi"]))) | ||||
|         node_metrics.append("channel_rssi " + str(int(metadata[0]["channel_rssi"]))) | ||||
|  | ||||
|     if "snr" in metadata[0]: | ||||
|         print("set snr: " + str(float(metadata[0]["snr"]))) | ||||
|         node_metrics.append("snr " + str(float(metadata[0]["snr"]))) | ||||
|  | ||||
|     scrape_healthy = True | ||||
|     lastMqttReception = datetime.now() | ||||
|     mutex.release() | ||||
|  | ||||
| def on_connect(client, userdata, flags, reason_code, properties): | ||||
|     global mqtt_connected | ||||
|     if reason_code == 0: | ||||
|         print("\nConnected to MQTT: reason_code = " + str(reason_code)) | ||||
|     if reason_code > 0: | ||||
|         mqtt_connected = True | ||||
|     elif reason_code > 0: | ||||
|         print("\nNot connected to MQTT: reason_code = " + str(reason_code)) | ||||
|         mqtt_connected = False | ||||
|  | ||||
| def on_disconnect(client, userdata, flags, reason_code): | ||||
|     global mqtt_connected | ||||
|     print(f"Disconnected from MQTT: reason_code = {reason_code}") | ||||
|     mqtt_connected = False | ||||
|  | ||||
| def on_message(mqttc, obj, msg): | ||||
|     # print("\nMessage: " + msg.topic + " " + str(msg.qos)) | ||||
|     parsedJSON = json.loads(msg.payload) | ||||
|     # print(json.dumps(parsedJSON, indent=4)) | ||||
|     global scrape_healthy | ||||
|  | ||||
|     try: | ||||
|         parsedJSON = json.loads(msg.payload) | ||||
|         uplink_message = parsedJSON["uplink_message"] | ||||
|         update_metrics(uplink_message["decoded_payload"], uplink_message["rx_metadata"]) | ||||
|         lastMqttReception = datetime.now() | ||||
|     except: | ||||
|         mutex.acquire() | ||||
|         scrape_healthy = False | ||||
|         mutex.release() | ||||
|         print("Unable to parse uplink") | ||||
|  | ||||
| def on_subscribe(client, userdata, mid, reason_codes, properties): | ||||
|     print("\nSubscribed to MQTT: " + str(mid)) | ||||
|     except Exception as e: | ||||
|         with mutex: | ||||
|             scrape_healthy = False | ||||
|         print(f"Unable to parse uplink: {e}") | ||||
|  | ||||
| def poll_mqtt(mqttc): | ||||
|     mqttc.loop_forever() | ||||
|  | ||||
| def on_disconnect(client, userdata, flags, reason_code, properties): | ||||
|     print("disconnect occoured") | ||||
|     client.reconnect() | ||||
|  | ||||
| def main(): | ||||
|     global mqtt_client | ||||
|  | ||||
|     # Start timeout monitoring thread | ||||
|     timeout_thread = threading.Thread(target=monitor_timeout, daemon=True) | ||||
|     timeout_thread.start() | ||||
|  | ||||
|     # Start MQTT reconnect thread | ||||
|     reconnect_thread = threading.Thread(target=reconnect_mqtt, daemon=True) | ||||
|     reconnect_thread.start() | ||||
|  | ||||
|     while True: | ||||
|         try: | ||||
|             print("starting ...") | ||||
|             mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) | ||||
|             mqttc.on_connect = on_connect | ||||
|             mqttc.on_subscribe = on_subscribe | ||||
|             mqttc.on_message = on_message | ||||
|             mqttc.on_disconnect = on_disconnect | ||||
|             mqttc.username_pw_set(config.ttn_user, config.ttn_key) | ||||
|             mqttc.tls_set() | ||||
|             mqttc.connect( | ||||
|             mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) | ||||
|             mqtt_client.on_connect = on_connect | ||||
|             mqtt_client.on_message = on_message | ||||
|             mqtt_client.on_disconnect = on_disconnect | ||||
|             mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) | ||||
|             mqtt_client.tls_set() | ||||
|             mqtt_client.connect( | ||||
|                 config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 | ||||
|             ) | ||||
|             mqttc.subscribe("#", 0)  # all device uplinks | ||||
|             mqtt_client.subscribe("#", 0)  # all device uplinks | ||||
|  | ||||
|             # run mqtt in thread forever | ||||
|             poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqttc,))) | ||||
|             poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,))) | ||||
|             poll_mqtt_thread.start() | ||||
|  | ||||
|             webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler) | ||||
|  | ||||
		Reference in New Issue
	
	Block a user