|
|
|
@ -2,28 +2,16 @@
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
""" Author: Hendrik Schutter, mail@hendrikschutter.com
|
|
|
|
|
Date of creation: 2022/03/01
|
|
|
|
|
Date of last modification: 2023/03/01
|
|
|
|
|
Date of last modification: 2023/03/02
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
|
import time
|
|
|
|
|
import threading
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from urllib.parse import urlsplit, parse_qs
|
|
|
|
|
from random import randrange
|
|
|
|
|
import os, sys, logging, time
|
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
|
import json
|
|
|
|
|
import csv
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
hostName = "127.0.0.1"
|
|
|
|
|
serverPort = 9101
|
|
|
|
|
exporter_prefix = "msv_clubhouse_"
|
|
|
|
|
|
|
|
|
|
User = "APP UUID @ttn"
|
|
|
|
|
Password = "API Key"
|
|
|
|
|
theRegion = "EU1" # The region you are using
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
import json
|
|
|
|
|
import config
|
|
|
|
|
|
|
|
|
|
scrape_healthy = True
|
|
|
|
|
startTime = datetime.now()
|
|
|
|
@ -31,45 +19,23 @@ node_metrics = list()
|
|
|
|
|
mutex = threading.Lock()
|
|
|
|
|
request_count = 0
|
|
|
|
|
|
|
|
|
|
# MQTT event functions
|
|
|
|
|
def on_connect(mqttc, obj, flags, rc):
|
|
|
|
|
print("\nConnect: rc = " + str(rc))
|
|
|
|
|
|
|
|
|
|
def on_message(mqttc, obj, msg):
|
|
|
|
|
print("\nMessage: " + msg.topic + " " + str(msg.qos)) # + " " + str(msg.payload))
|
|
|
|
|
parsedJSON = json.loads(msg.payload)
|
|
|
|
|
print(json.dumps(parsedJSON, indent=4)) # Uncomment this to fill your terminal screen with JSON
|
|
|
|
|
uplink_message = parsedJSON["uplink_message"];
|
|
|
|
|
frm_payload = uplink_message["frm_payload"];
|
|
|
|
|
print("MSG: " + frm_payload)
|
|
|
|
|
print("MSG: " + str(type(frm_payload)))
|
|
|
|
|
|
|
|
|
|
def on_subscribe(mqttc, obj, mid, granted_qos):
|
|
|
|
|
print("\nSubscribe: " + str(mid) + " " + str(granted_qos))
|
|
|
|
|
|
|
|
|
|
def on_log(mqttc, obj, level, string):
|
|
|
|
|
print("\nLog: "+ string)
|
|
|
|
|
logging_level = mqtt.LOGGING_LEVEL[level]
|
|
|
|
|
logging.log(logging_level, string)
|
|
|
|
|
|
|
|
|
|
class RequestHandler(BaseHTTPRequestHandler):
|
|
|
|
|
|
|
|
|
|
def get_metrics(self):
|
|
|
|
|
global request_count
|
|
|
|
|
global node_metrics
|
|
|
|
|
global exporter_prefix
|
|
|
|
|
global mutex
|
|
|
|
|
mutex.acquire()
|
|
|
|
|
self.send_response(200)
|
|
|
|
|
self.send_header("Content-type", "text/html")
|
|
|
|
|
self.end_headers()
|
|
|
|
|
self.wfile.write(bytes(exporter_prefix + "expoter_duration_seconds_sum " + str(int((datetime.now() - startTime).total_seconds())) + "\n", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes(exporter_prefix + "exporter_request_count " + str(request_count) + "\n", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes(exporter_prefix + "exporter_scrape_healthy " + str(int(scrape_healthy)) + "\n", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes(config.exporter_prefix + "expoter_duration_seconds_sum " + str(int((datetime.now() - startTime).total_seconds())) + "\n", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes(config.exporter_prefix + "exporter_request_count " + str(request_count) + "\n", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes(config.exporter_prefix + "exporter_scrape_healthy " + str(int(scrape_healthy)) + "\n", "utf-8"))
|
|
|
|
|
|
|
|
|
|
for metric in node_metrics:
|
|
|
|
|
#print(metric)
|
|
|
|
|
self.wfile.write(bytes(exporter_prefix + metric + "\n", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes(config.exporter_prefix + metric + "\n", "utf-8"))
|
|
|
|
|
|
|
|
|
|
mutex.release()
|
|
|
|
|
|
|
|
|
@ -91,70 +57,65 @@ class RequestHandler(BaseHTTPRequestHandler):
|
|
|
|
|
self.wfile.write(bytes("</body>", "utf-8"))
|
|
|
|
|
self.wfile.write(bytes("</html>", "utf-8"))
|
|
|
|
|
|
|
|
|
|
def update_metrics(payload, metadata):
|
|
|
|
|
print("Payload: "+ str(payload))
|
|
|
|
|
print("Metadata: "+ str(metadata))
|
|
|
|
|
|
|
|
|
|
def update_metrics():
|
|
|
|
|
while True:
|
|
|
|
|
print("set data from ttn")
|
|
|
|
|
global node_metrics
|
|
|
|
|
global mutex
|
|
|
|
|
global scrape_healthy
|
|
|
|
|
mutex.acquire()
|
|
|
|
|
scrape_healthy = True
|
|
|
|
|
node_metrics.clear()
|
|
|
|
|
print("set data from ttn")
|
|
|
|
|
global node_metrics
|
|
|
|
|
global mutex
|
|
|
|
|
global scrape_healthy
|
|
|
|
|
mutex.acquire()
|
|
|
|
|
scrape_healthy = True
|
|
|
|
|
node_metrics.clear()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
node_metrics.append("target_temperature " + str(int(42)))
|
|
|
|
|
node_metrics.append("valve " + str(int(42)))
|
|
|
|
|
node_metrics.append("low_battery " + str(int(42)))
|
|
|
|
|
node_metrics.append("window_open " + str(int(42)))
|
|
|
|
|
except Exception as ex:
|
|
|
|
|
print("unable to poll data from TTN! error: " + str(ex))
|
|
|
|
|
scrape_healthy = False
|
|
|
|
|
pass
|
|
|
|
|
mutex.release()
|
|
|
|
|
time.sleep(300)
|
|
|
|
|
except Exception as ex:
|
|
|
|
|
print("unable to poll data from TTN! error: " + str(ex))
|
|
|
|
|
scrape_healthy = False
|
|
|
|
|
pass
|
|
|
|
|
mutex.release()
|
|
|
|
|
time.sleep(300)
|
|
|
|
|
|
|
|
|
|
def on_connect(mqttc, obj, flags, rc):
|
|
|
|
|
print("\nConnected to MQTT: rc = " + str(rc))
|
|
|
|
|
|
|
|
|
|
def on_message(mqttc, obj, msg):
|
|
|
|
|
#print("\nMessage: " + msg.topic + " " + str(msg.qos))
|
|
|
|
|
parsedJSON = json.loads(msg.payload)
|
|
|
|
|
#print(json.dumps(parsedJSON, indent=4))
|
|
|
|
|
uplink_message = parsedJSON["uplink_message"];
|
|
|
|
|
update_metrics(uplink_message["decoded_payload"], uplink_message["rx_metadata"])
|
|
|
|
|
|
|
|
|
|
def on_subscribe(mqttc, obj, mid, granted_qos):
|
|
|
|
|
print("\nSubscribed to MQTT: " + str(mid) + " " + str(granted_qos))
|
|
|
|
|
|
|
|
|
|
def poll_mqtt(mqttc):
|
|
|
|
|
while True:
|
|
|
|
|
mqttc.loop(10) # seconds timeout / blocking time
|
|
|
|
|
print(".", end="", flush=True) # feedback to the user that something is actually happening
|
|
|
|
|
|
|
|
|
|
mqttc.loop(10) # seconds timeout
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
print("start")
|
|
|
|
|
print("starting ...")
|
|
|
|
|
|
|
|
|
|
print("Init mqtt client")
|
|
|
|
|
mqttc = mqtt.Client()
|
|
|
|
|
|
|
|
|
|
print("Assign callbacks")
|
|
|
|
|
mqttc.on_connect = on_connect
|
|
|
|
|
mqttc.on_subscribe = on_subscribe
|
|
|
|
|
mqttc.on_message = on_message
|
|
|
|
|
|
|
|
|
|
print("Connect")
|
|
|
|
|
# Setup authentication from settings above
|
|
|
|
|
mqttc.username_pw_set(User, Password)
|
|
|
|
|
|
|
|
|
|
# IMPORTANT - this enables the encryption of messages
|
|
|
|
|
mqttc.tls_set() # default certification authority of the system
|
|
|
|
|
|
|
|
|
|
mqttc.connect(theRegion.lower() + ".cloud.thethings.network", 8883, 60)
|
|
|
|
|
|
|
|
|
|
print("Subscribe")
|
|
|
|
|
mqttc.username_pw_set(config.ttn_user, config.ttn_key)
|
|
|
|
|
mqttc.tls_set()
|
|
|
|
|
mqttc.connect(config.ttn_region.lower() + ".cloud.thethings.network", 8883, 60)
|
|
|
|
|
mqttc.subscribe("#", 0) # all device uplinks
|
|
|
|
|
|
|
|
|
|
print("And run forever")
|
|
|
|
|
# run mqtt in thread forever
|
|
|
|
|
poll_mqtt_thread = threading.Thread(target=poll_mqtt, args=((mqttc,)))
|
|
|
|
|
poll_mqtt_thread.start()
|
|
|
|
|
|
|
|
|
|
webServer = HTTPServer((hostName, serverPort), RequestHandler)
|
|
|
|
|
|
|
|
|
|
print("Server started http://%s:%s" % (hostName, serverPort))
|
|
|
|
|
|
|
|
|
|
update_metrics_thread = threading.Thread(target=update_metrics, args=())
|
|
|
|
|
update_metrics_thread.start()
|
|
|
|
|
|
|
|
|
|
webServer = HTTPServer((config.hostName, config.serverPort), RequestHandler)
|
|
|
|
|
print("Server started http://%s:%s" % (config.hostName, config.serverPort))
|
|
|
|
|
try:
|
|
|
|
|
webServer.serve_forever()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
@ -162,7 +123,6 @@ def main():
|
|
|
|
|
|
|
|
|
|
webServer.server_close()
|
|
|
|
|
print("Server stopped.")
|
|
|
|
|
update_metrics_thread.join()
|
|
|
|
|
poll_mqtt_thread.join()
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|