new mqtt connection
This commit is contained in:
		| @ -11,6 +11,9 @@ import time | |||||||
| import json | import json | ||||||
| import sys | import sys | ||||||
| import config | import config | ||||||
|  | import logging | ||||||
|  | import ssl | ||||||
|  |  | ||||||
|  |  | ||||||
| scrape_healthy = True | scrape_healthy = True | ||||||
| startTime = datetime.now() | startTime = datetime.now() | ||||||
| @ -214,8 +217,27 @@ def on_message(mqttc, obj, msg): | |||||||
|             scrape_healthy = False |             scrape_healthy = False | ||||||
|         print(f"Unable to parse uplink: {e}") |         print(f"Unable to parse uplink: {e}") | ||||||
|  |  | ||||||
| def poll_mqtt(mqttc): | def poll_mqtt(mqtt_client): | ||||||
|     mqttc.loop_forever() |     # Start the network loop | ||||||
|  |     mqtt_client.loop_forever() | ||||||
|  |  | ||||||
|  | def configure_mqtt_client(): | ||||||
|  |     client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) | ||||||
|  |     client.on_connect = on_connect | ||||||
|  |     client.on_message = on_message | ||||||
|  |     client.on_disconnect = on_disconnect | ||||||
|  |  | ||||||
|  |     # Set credentials | ||||||
|  |     client.username_pw_set(config.ttn_user, config.ttn_key) | ||||||
|  |  | ||||||
|  |     # Set up TLS/SSL | ||||||
|  |     client.tls_set( | ||||||
|  |         cert_reqs=ssl.CERT_REQUIRED, | ||||||
|  |         tls_version=ssl.PROTOCOL_TLSv1_2,  # Enforce TLS 1.2 | ||||||
|  |     ) | ||||||
|  |     client.tls_insecure_set(False)  # Enforce strict certificate validation | ||||||
|  |  | ||||||
|  |     return client | ||||||
|  |  | ||||||
| def main(): | def main(): | ||||||
|     global mqtt_client |     global mqtt_client | ||||||
| @ -229,21 +251,21 @@ def main(): | |||||||
|     reconnect_thread.start() |     reconnect_thread.start() | ||||||
|  |  | ||||||
|     while True: |     while True: | ||||||
|  |         mqtt_client = configure_mqtt_client() | ||||||
|         try: |         try: | ||||||
|             print("starting ...") |             # Connect to TTN broker | ||||||
|             mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) |             broker_url = f"{config.ttn_region.lower()}.cloud.thethings.network" | ||||||
|             mqtt_client.on_connect = on_connect |             mqtt_client.connect(broker_url, 8883, 60) | ||||||
|             mqtt_client.on_message = on_message |              | ||||||
|             mqtt_client.on_disconnect = on_disconnect |             # Subscribe to all topics | ||||||
|             mqtt_client.username_pw_set(config.ttn_user, config.ttn_key) |             mqtt_client.subscribe("#", 1) | ||||||
|             mqtt_client.tls_set() |             logging.info(f"Subscribed to all topics.") | ||||||
|             mqtt_client.connect( |  | ||||||
|                 config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60 |  | ||||||
|             ) |  | ||||||
|             mqtt_client.subscribe("#", 0)  # all device uplinks |  | ||||||
|  |  | ||||||
|             poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,))) |             poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqtt_client,))) | ||||||
|             poll_mqtt_thread.start() |             poll_mqtt_thread.start() | ||||||
|  |         except Exception as e: | ||||||
|  |             logging.error(f"Error occurred: {e}") | ||||||
|  |             mqtt_client.loop_stop() | ||||||
|  |  | ||||||
|         webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler) |         webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler) | ||||||
|         print("Server started http://%s:%s" % (config.hostName, config.serverPort)) |         print("Server started http://%s:%s" % (config.hostName, config.serverPort)) | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user