add dummy receiver for testing without hardware
This commit is contained in:
		
							
								
								
									
										53
									
								
								dummy_receiver.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								dummy_receiver.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,53 @@ | ||||
| import sysv_ipc #pip install sysv-ipc | ||||
| import pycstruct #pip install pycstruct | ||||
|  | ||||
|  | ||||
| str_buffer_size = 64 | ||||
| msg_queue_key = 0x1B11193C0 | ||||
|  | ||||
| try: | ||||
|     mq = sysv_ipc.MessageQueue(msg_queue_key, sysv_ipc.IPC_CREAT) | ||||
|  | ||||
|     while True: | ||||
|         message, mtype = mq.receive() | ||||
|         print("") | ||||
|         #print("*** New message received ***") | ||||
|        # print(f"Raw message: {message}") | ||||
|  | ||||
| #uint8_t u8DriveIndex; | ||||
| #uint32_t u32DriveHours; | ||||
| #uint32_t u32DriveCycles; | ||||
| #uint32_t u32DriveError; | ||||
| #uint64_t u64DriveShredTimestamp; | ||||
| #uint64_t u64DriveShredDuration; | ||||
| #uint64_t u64DriveCapacity; | ||||
| #char caDriveState[STR_BUFFER_SIZE]; | ||||
| #char caDriveModelFamiliy[STR_BUFFER_SIZE]; | ||||
| #char caDriveModelName[STR_BUFFER_SIZE]; | ||||
| #char caDriveSerialnumber[STR_BUFFER_SIZE]; | ||||
|  | ||||
|  | ||||
|         driveData = pycstruct.StructDef() | ||||
|         driveData.add('utf-8', 'driveIndex', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveHours', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveCycles', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveErrors', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveShredTimestamp', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveShredDuration', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveCapacity', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveState', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveModelFamiliy', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveModelModel', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveSerialnumber', length=str_buffer_size) | ||||
|         driveData.add('utf-8', 'driveReHddVersion', length=str_buffer_size) | ||||
|          | ||||
|         # Dictionary representation | ||||
|         result = driveData.deserialize(message) | ||||
|         print('Dictionary object:') | ||||
|         print(str(result)) | ||||
|  | ||||
|  | ||||
| except sysv_ipc.ExistentialError: | ||||
|     print("ERROR: message queue creation failed") | ||||
|  | ||||
|  | ||||
		Reference in New Issue
	
	Block a user