#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Author: Hendrik Schutter, mail@hendrikschutter.com """ import requests import os import json import argparse import random def send_post_request(uri, data, token): headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } try: response = requests.post(uri, json=data, timeout=1, headers=headers) print("Return code: " + str(response.status_code)) except requests.exceptions.RequestException as e: print(e) def main(): parser = argparse.ArgumentParser( description="Read JSON files and send them as POST requests to simulate a webhook request." ) parser.add_argument( "uri", type=str, help="The URI to send POST requests to (e.g., http://127.0.0.1:8080/api)", ) parser.add_argument( "token", type=str, help="Bearer authorization token)", ) parser.add_argument( "directory", type=str, nargs="?", default="./testdata/", help="Directory containing JSON files (default: './testdata/')", ) parser.add_argument( "mode", type=str, choices=["send_all", "send_one", "send_random"], default="send_one", help="Mode for sending files: 'send_all', 'send_one', or 'send_random' (default: 'send_one')", ) args = parser.parse_args() if args.mode == "send_one": if os.path.isfile(args.directory) and args.directory.endswith(".json"): with open(args.directory, "r", encoding="utf-8") as file: try: data = json.load(file) print(f"Sending {args.directory} to {args.uri}") send_post_request(args.uri, data, args.token) except json.JSONDecodeError as e: print(f"Error reading {args.directory}: {e}") return if not os.path.exists(args.directory): print(f"Directory {args.directory} does not exist.") return json_files = [f for f in os.listdir(args.directory) if f.endswith(".json")] if not json_files: print(f"No JSON files found in {args.directory}.") return if args.mode == "send_all": for filename in json_files: filepath = os.path.join(args.directory, filename) with open(filepath, "r", encoding="utf-8") as file: try: data = json.load(file) print(f"Sending {filename} to {args.uri}") send_post_request(args.uri, data, args.token) except json.JSONDecodeError as e: print(f"Error reading {filename}: {e}") elif args.mode == "send_one": for filename in json_files: filepath = os.path.join(args.directory, filename) with open(filepath, "r", encoding="utf-8") as file: try: data = json.load(file) print(f"Sending {filename} to {args.uri}") send_post_request(args.uri, data, args.token) input("Press Enter to send the next file...") except json.JSONDecodeError as e: print(f"Error reading {filename}: {e}") elif args.mode == "send_random": while json_files: filename = random.choice(json_files) filepath = os.path.join(args.directory, filename) with open(filepath, "r", encoding="utf-8") as file: try: data = json.load(file) print(f"Sending {filename} to {args.uri}") send_post_request(args.uri, data, args.token) input("Press Enter to send another random file...") except json.JSONDecodeError as e: print(f"Error reading {filename}: {e}") if __name__ == "__main__": main()