LocationHub/server/scripts/ttn-webhook-dummy.py

111 lines
3.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
""" Author: Hendrik Schutter, mail@hendrikschutter.com
"""
import requests
import os
import json
import argparse
import random
def send_post_request(uri, data, token, timeout=1):
    """Send *data* as a JSON POST request to *uri* using bearer authentication.

    Args:
        uri: Target URL of the request.
        data: Payload handed to ``requests.post`` through its ``json=``
            argument.
        token: Value placed after ``Bearer`` in the ``Authorization`` header.
        timeout: Seconds to wait for the server before giving up. Defaults
            to 1, the value that was previously hard-coded.

    Returns:
        The HTTP status code of the response, or ``None`` when the request
        failed with a ``requests`` exception (the exception is printed).
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(uri, json=data, timeout=timeout, headers=headers)
    except requests.exceptions.RequestException as e:
        # Best effort: report the failure and let the caller carry on with
        # the next file instead of aborting the whole run.
        print(e)
        return None
    print("Return code: " + str(response.status_code))
    return response.status_code
def _send_file(uri, token, filepath, label):
    """Load *filepath* as JSON and POST it to *uri*; return True if it was sent.

    *label* is the name shown in the progress and error messages. Read and
    parse failures are printed and reported as ``False`` instead of raised.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as file:
            data = json.load(file)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f"Error reading {label}: {e}")
        return False
    print(f"Sending {label} to {uri}")
    send_post_request(uri, data, token)
    return True


def _wait_for_enter(prompt):
    """Show *prompt* and wait for Enter; return False on Ctrl-C or end of input."""
    try:
        input(prompt)
    except (EOFError, KeyboardInterrupt):
        # Finish the prompt line so the shell prompt starts on a fresh line.
        print()
        return False
    return True


def main():
    """Parse the command line and send JSON test files as webhook requests.

    Positional arguments: ``uri``, ``token``, optional ``directory`` and
    ``mode``. In ``send_one`` mode ``directory`` may also name a single
    ``.json`` file, which is sent once. Otherwise every ``*.json`` file in
    the directory is considered, in sorted order:

    * ``send_all``    - send every file without pausing.
    * ``send_one``    - send the files one by one, waiting for Enter after
      each successful send.
    * ``send_random`` - keep sending randomly chosen files, waiting for Enter
      after each one, until interrupted or no readable file is left.
    """
    parser = argparse.ArgumentParser(
        description="Read JSON files and send them as POST requests to simulate a webhook request."
    )
    parser.add_argument(
        "uri",
        type=str,
        help="The URI to send POST requests to (e.g., http://127.0.0.1:8080/api)",
    )
    parser.add_argument(
        "token",
        type=str,
        help="Bearer authorization token",
    )
    parser.add_argument(
        "directory",
        type=str,
        nargs="?",
        default="./testdata/",
        help="Directory containing JSON files (default: './testdata/')",
    )
    # NOTE(review): without nargs="?" argparse treats this positional as
    # required, so the default below is never applied. It is left unchanged
    # because making it optional would alter how "uri token send_all" is
    # parsed (the third value would be taken as the directory).
    parser.add_argument(
        "mode",
        type=str,
        choices=["send_all", "send_one", "send_random"],
        default="send_one",
        help="Mode for sending files: 'send_all', 'send_one', or 'send_random' (default: 'send_one')",
    )
    args = parser.parse_args()

    # send_one additionally accepts a path to one JSON file.
    if (
        args.mode == "send_one"
        and os.path.isfile(args.directory)
        and args.directory.endswith(".json")
    ):
        _send_file(args.uri, args.token, args.directory, args.directory)
        return

    if not os.path.exists(args.directory):
        print(f"Directory {args.directory} does not exist.")
        return
    if not os.path.isdir(args.directory):
        # An existing non-directory would make os.listdir raise.
        print(f"{args.directory} is not a directory.")
        return

    # os.listdir returns entries in arbitrary order; sort for repeatable runs.
    json_files = sorted(f for f in os.listdir(args.directory) if f.endswith(".json"))
    if not json_files:
        print(f"No JSON files found in {args.directory}.")
        return

    if args.mode == "send_all":
        for filename in json_files:
            _send_file(
                args.uri, args.token, os.path.join(args.directory, filename), filename
            )
    elif args.mode == "send_one":
        for filename in json_files:
            sent = _send_file(
                args.uri, args.token, os.path.join(args.directory, filename), filename
            )
            # Unreadable files are skipped without pausing.
            if sent and not _wait_for_enter("Press Enter to send the next file..."):
                break
    elif args.mode == "send_random":
        candidates = list(json_files)
        while candidates:
            filename = random.choice(candidates)
            sent = _send_file(
                args.uri, args.token, os.path.join(args.directory, filename), filename
            )
            if not sent:
                # Drop unreadable files so the loop cannot spin on them forever.
                candidates.remove(filename)
                continue
            if not _wait_for_enter("Press Enter to send another random file..."):
                break
# Script entry point: start the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()