From 12a4da09021411496ebd61cfed973b5e6646e0dd Mon Sep 17 00:00:00 2001 From: localhorst Date: Thu, 24 Nov 2022 20:57:17 +0100 Subject: [PATCH] add dummy receiver for testing without hardware --- dummy_receiver.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 dummy_receiver.py diff --git a/dummy_receiver.py b/dummy_receiver.py new file mode 100644 index 0000000..bfcd21d --- /dev/null +++ b/dummy_receiver.py @@ -0,0 +1,53 @@ +import sysv_ipc #pip install sysv-ipc +import pycstruct #pip install pycstruct + + +str_buffer_size = 64 +msg_queue_key = 0x1B11193C0 + +try: + mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT) + + while True: + message, mtype = mq.receive() + print("") + #print("*** New message received ***") + # print(f"Raw message: {message}") + +#uint8_t u8DriveIndex; +#uint32_t u32DriveHours; +#uint32_t u32DriveCycles; +#uint32_t u32DriveError; +#uint64_t u64DriveShredTimestamp; +#uint64_t u64DriveShredDuration; +#uint64_t u64DriveCapacity; +#char caDriveState[STR_BUFFER_SIZE]; +#char caDriveModelFamiliy[STR_BUFFER_SIZE]; +#char caDriveModelName[STR_BUFFER_SIZE]; +#char caDriveSerialnumber[STR_BUFFER_SIZE]; + + + driveData = pycstruct.StructDef() + driveData.add('utf-8', 'driveIndex', length=str_buffer_size) + driveData.add('utf-8', 'driveHours', length=str_buffer_size) + driveData.add('utf-8', 'driveCycles', length=str_buffer_size) + driveData.add('utf-8', 'driveErrors', length=str_buffer_size) + driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size) + driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size) + driveData.add('utf-8', 'driveCapacity', length=str_buffer_size) + driveData.add('utf-8', 'driveState', length=str_buffer_size) + driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size) + driveData.add('utf-8', 'driveModelModel', length=str_buffer_size) + driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size) + driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size) + + # Dictionary representation + result = driveData.deserialize(message) + print('Dictionary object:') + print(str(result)) + + +except sysv_ipc.ExistentialError: + print("ERROR: message queue creation failed") + +