import collections import struct from enum import Flag, IntEnum, IntFlag, auto from typing import NamedTuple, List from . import crc16 from .exceptions import ParseError import os import json import numpy as np import logging _logger = logging.getLogger("IXCOM") class msg_iterator: def __init__(self, msg, in_bytes): self.msg = msg self.remaining_bytes = in_bytes self.start_idx = 0 self.msg_size = self.msg.size() def __iter__(self): return self def __next__(self): try: byte_chunk = self.remaining_bytes[self.start_idx:self.start_idx+self.msg_size ] self.start_idx += self.msg_size self.msg.from_bytes(byte_chunk) return self.msg.data except: raise StopIteration() SYNC_BYTE = 0x7E GENERAL_PORT = 3000 BROADCAST_PORT = 4000 LAST_CHANNEL_NUMBER = 31 WAIT_TIME_FOR_RESPONSE = 10 class Response(IntEnum): '''Enumeration for response IDs''' OK = 0x0 InvalidParameter = 0x1 InvalidChecksum = 0x2 InvalidLog = 0x3 InvalidRate = 0x4 InvalidPort = 0x5 InvalidCommand = 0x6 InvalidID = 0x7 InvalidChannel = 0x8 OutOfRange = 0x9 LogExists = 0xA InvalidTrigger = 0xB InternalError = 0xC class DataSelectionMask(IntFlag): '''Enumeration for the mask bits in the datsel field of the INSSOL message''' IMURAW = auto() '''Inertial data as measured by the ISA''' IMUCORR = auto() '''Inertial data as measured by the ISA, corrected for sensor errors (scale factor, bias, misalignments, nontorthogonality, etc...) estimated by the EKF''' IMUCOMP = auto() '''Inertial data corrected for sensor errors (scale factor, bias, misalignments, nontorthogonality, etc...) estimated by the EKF and compensated for gravity/earth rate''' VELNED = auto() '''Velocity is in NED frame''' VELENU = auto() '''Velocity is in ENU frame''' VELECEF = auto() '''Velocity is in ECEF frame''' VELBDY = auto() '''Velocity is in body frame''' ALTITUDE_WGS84 = auto() '''Altitude is height above WGS84 ellipsoid''' ALTITUDE_MSL = auto() '''Altitude is height above geoid''' ALTITUDE_BARO = auto() '''Altitude is baro altitude''' WGS84POS= auto() '''Position is longitude, latitude, altitude''' ECEFPOS = auto() '''Position is ECEF X,Y,Z''' RESERVED_12 = auto() RESERVED_13 = auto() RESERVED_14 = auto() RPY_DIN70000 = auto() '''Attitude in DIN70000 convention (DIN9300 otherwise)''' DatatSelectionMask = DataSelectionMask class ParDatVelMode(IntEnum): '''Enumeration for the velocity output frame of the INSSOL message''' NED = 0 '''The velocity fields of the INSSOL message contain the velocity in NED frame.''' ENU = 1 '''The velocity fields of the INSSOL message contain the velocity in ENU frame.''' ECEF = 2 '''The velocity fields of the INSSOL message contain the velocity in ECEF frame.''' BODY = 3 '''The velocity fields of the INSSOL message contain the velocity in vehicle body frame.''' class MessageID(IntEnum): '''Enumeration for special message IDs''' PLUGIN = 0x64 COMMAND = 0xFD RESPONSE = 0xFE PARAMETER = 0xFF class ParamID(IntEnum): '''Enumeration for special parameter IDs''' PARPLUGIN = 2000 class EkfCommand(IntEnum): '''Enumeration for the EKF command subcommands''' ALIGN = 0 SAVEPOS = 1 SAVEHDG = 2 SAVEANTOFFSET = 3 FORCED_ZUPT = 4 ALIGN_COMPLETE = 5 COPYODOSCF = 6 ZUPTCALIBRATION = 8 class LogTrigger(IntEnum): '''Logs can be triggered with a certain divider from the inertial sensor samples, polled, or triggered by events.''' SYNC = 0 EVENT = 1 POLLED = 2 EXTERNAL_EVENT = 3 PPT_EVENT = 4 class LogCommand(IntEnum): '''This enum contains possible log command''' ADD = 0 '''Add the log''' STOP = 1 '''Stop the current log, but do not remove from loglist''' START = 2 '''Start a previously stopped log''' CLEAR = 3 '''Clear log from loglist''' CLEAR_ALL = 4 '''Clear all logs from loglist''' STOP_ALL = 5 '''Stop all logs''' START_ALL = 6 '''Start all logs''' class StartupPositionMode(IntEnum): '''Enumeration for the available startup position modes''' GNSSPOS = 0 '''Feed initial position from GNSS''' STOREDPOS = 1 '''Feed initial position from stored position''' FORCEDPOS = 2 '''Feed initial position from forced position''' CURRENTPOS = 3 '''Feed initial position from current position''' class StartupHeadingMode(IntEnum): '''Enumeration for the available startup heading modes''' DEFAULT = 0 '''Start with unknown heading''' STOREDHDG = 1 '''Feed initial heading from stored heading''' FORCEDHDG = 2 '''Feed initial heading from forced heading''' MAGHDG = 3 '''Feed initial heading from magnetic heading''' DUALANTHDG = 4 '''Feed initial heading from dual antenna heading''' class AlignmentMode(IntEnum): '''Enumeration for the available alignment modes''' STATIONARY = 0 '''Stationary alignment, INS has to be at standstill''' IN_MOTION = 1 '''In-motion alignment, INS can move arbitrarily, needs aiding (e.g. GNSS)''' class ExtAidingTimeMode(IntEnum): '''External aiding can be executed with two timestamp modes.''' GPS_SEC_OF_WEEK = 0 '''The time field contains the second of week''' LATENCY = 1 '''The time field contains the measurement latency''' class GlobalAlignStatus(IntEnum): '''The alignment status will transition from levelling to aligning to alignment complete. Only if the heading standard deviation falls below the threshold defined in PAREKF_HDGPOSTHR, the alignment status will be "heading good" ''' LEVELLING = 0 '''Roll and pitch are being estimated from accelerometer measurement''' ALIGNING = 1 '''Heading is being estimated from available aiding data''' ALIGN_COMPLETE = 2 '''Stationary alignment is complete, the system is allowed to be moved.''' HDG_GOOD = 3 '''Heading standard deviation is lower than the threshold defined in PAREKF_HDGPOSTHR''' class GlobalPositionStatus(IntEnum): '''The position status depends on the 3D position standard deviation''' BAD = 0 '''Position standard deviation is worse than the medium position standard deviation threshold defined in PAREKF_HDGPOSTHR''' MEDIUM = 1 '''Position standard deviation is better than the medium position standard deviation threshold defined in PAREKF_HDGPOSTHR, but worse than the high accuracy position standard deviation threshold''' HIGH = 2 '''Position standard deviation is better than the high accuracy position standard deviation threshold''' UNDEFINED = 3 '''Position has not yet been set''' class GlobalStatusBit(IntFlag): '''The global status is contained in the footer of every iXCOM message''' HWERROR = auto() '''The hardware has detected an error condition during build-in-test''' COMERROR = auto() '''There is a communication error between the navigation processor and the IMU''' NAVERROR = auto() '''The navigation solution is erroneous, e.g. due to sensor overranges''' CALERROR = auto() '''The calibration routines have encountered an error, e.g. due to temperature being out of range''' GYROOVR = auto() '''A gyro overrange has been encountered''' ACCOVR = auto() '''An accelerometer overrange has been encountered''' GNSSINVALID = auto() '''No valid GNSS solution available''' STANDBY = auto() '''The system is in standby-mode, i.e. the inertial sensor assembly is not powered up.''' DYNAMICALIGN = auto() '''The system is in dynamic alignment.''' TIMEINVALID = auto() '''The system time has not been set from GNSS''' NAVMODE = auto() '''The system is in navigation mode''' AHRSMODE = auto() '''The system is in fallback AHRS mode''' @property def alignment_status(self): return GlobalAlignStatus((self.value & 0x3000) >> 12) @property def position_status(self): return GlobalPositionStatus((self.value & 0xc000) >> 14) class SysstatBit(IntFlag): '''An enumeration for the status bits in the extended system status''' IMU_INVALID = auto() IMU_CRC_ERROR = auto() IMU_TIMEOUT = auto() IMU_SAMPLE_LOST = auto() ACC_X_OVERRANGE = auto() ACC_Y_OVERRANGE = auto() ACC_Z_OVERRANGE = auto() RESERVED = auto() OMG_X_OVERRANGE = auto() OMG_Y_OVERRANGE = auto() OMG_Z_OVERRANGE = auto() INVALID_CAL = auto() BIT_FAIL = auto() DEFAULT_CONF = auto() GNSS_INVALID = auto() ZUPT_CALIB = auto() GNSS_CRC_ERROR = auto() GNSS_TIMEOUT = auto() EKF_ERROR = auto() SAVEDPOS_ERROR = auto() SAVEDHDG_ERROR = auto() MAG_TIMEOUT = auto() MADC_TIMEOUT = auto() LICENSE_EXPIRED = auto() IN_MOTION = auto() ODO_PLAUSIBILITY = auto() ODO_HW_ERROR = auto() WAITING_INITVAL = auto() PPS_LOST = auto() LIMITED_ACCURACY = auto() REC_ENABLED = auto() FPGA_NOGO = auto() class EKF_STATUS_LOW(IntFlag): '''An enumeration for the status bits in the lower EKF status word''' POSLLH_UPDATE = auto() POSLLH_LAT_OUTLIER = auto() POSLLH_LON_OUTLIER = auto() POSLLH_ALT_OUTLIER = auto() VNED_UPDATE = auto() VNED_VN_OUTLIER = auto() VNED_VE_OUTLIER = auto() VNED_VD_OUTLIER = auto() HEIGHT_UPDATE = auto() HEIGHT_OUTLIER = auto() BAROHEIGHT_UPDATE = auto() BAROHEIGHT_OUTLIER = auto() VBDY_UPDATE = auto() VBDY_VX_OUTLIER = auto() VBDY_VY_OUTLIER = auto() VBDY_VZ_OUTLIER = auto() VODO_UPDATE = auto() VODO_OUTLIER = auto() VAIR_UPDATE = auto() VAIR_OUTLIER = auto() HDGT_UPDATE = auto() HDGT_OUTLIER = auto() HDGM_UPDATE = auto() HDGM_OUTLIER = auto() MAGFIELD_UPDATE = auto() MAGFIELD_HX_OUTLIER = auto() MAGFIELD_HY_OUTLIER = auto() MAGFIELD_HZ_OUTLIER = auto() PSEUDORANGE_UPDATE = auto() RANGERATE_UPDATE = auto() TIME_UPDATE = auto() ZUPT_UPDATE = auto() class EKF_STATUS_HIGH(IntFlag): '''An enumeration for the status bits in the higher EKF status word''' EKF_RTKLLH_UPDATE = auto() EKF_RTKLLH_LON_OUTLIER = auto() EKF_RTKLLH_LAT_OUTLIER = auto() EKF_RTKLLH_ALT_OUTLIER = auto() EKF_DUALANT_UPDATE = auto() EKF_DUALANT_OUTLIER = auto() EKF_COG_UPDATE = auto() EKF_COG_OUTLIER = auto() EKF_TDCP_UPDATE = auto() EKF_TDCP_DD_UPDATE = auto() EKF_ODO_ALONGTRACK_UPDATE = auto() EKF_ODO_ALONGTRACK_OUTLIER = auto() EKF_ODO_CONSTRAINT_UPDATE = auto() EKF_ODO_CONSTRAINT_OUTLIER = auto() EKF_GRAVITY_UPDATE = auto() EKF_GRAVITY_OUTLIER = auto() EKF_EXTPOS_UPDATE = auto() EKF_EXTPOS_OUTLIER = auto() EKF_EXTVEL_UPDATE = auto() EKF_EXTVEL_OUTLIER = auto() EKF_ZARU_UPDATE = auto() EKF_WAHBA_VEL_TOO_OLD = auto() EKF_WAHBA_POS_TOO_OLD = auto() EKF_WAHBA_INSUFFOBS = auto() EKF_WAHBA_UPDATE = auto() EKF_WAHBA_FILTER = auto() EKF_FILTER_MODE_0 = auto() EKF_FILTER_MODE_1 = auto() EKF_WAHBA_INTERNAL = auto() EKF_LEVELLING_COMPLETE = auto() EKF_STATIONARY_ALIGN_COMPLETE = auto() EKF_POSITION_VALID = auto() class XcomCommandParameter(IntEnum): '''Parameters to the XCOM command''' channel_close = 0 '''Close the current XCOM channel''' channel_open = 1 '''Open an XCOM channel''' reboot = 2 '''Reboot the device''' class ParameterAction(IntEnum): '''Allowed parameter actions''' CHANGING = 0 REQUESTING = 1 class EkfStatus: def __init__(self, value): self.value = value def __str__(self): return f'Low Status: {str(EKF_STATUS_LOW(self.value[0]))}, High status: {str(EKF_STATUS_LOW(self.value[0]))}' # Payload classes class PayloadItem(NamedTuple): name: str dimension: int datatype: str description: str = '' unit: str = '' metatype = None def c_type(self): d = { 'b': 'int8_t', 'B': 'uint8_t', 'h': 'int16_t', 'H': 'uint16_t', 'i': 'int32_t', 'I': 'uint32_t', 'l': 'int32_t', 'L': 'uint32_t', 'q': 'int64_t', 'Q': 'uint64_t', 'f': 'int8_t', 'd': 'int8_t', 's': 'char', 'z': 'char*' } result = f'{d.get(self.datatype, "struct")} {self.name}' if self.dimension > 1: result += f' [{self.dimension}]' return result def describe(self): result = self.c_type() if self.description: result += ': ' + self.description if self.unit: result += f' (Unit: {self.unit})' return result def get_struct_string(self): if isinstance(self.datatype, str): struct_string = '' if self.dimension != 1: struct_string += '%d' % self.dimension struct_string += self.datatype elif isinstance(self.datatype, Message): struct_string = self.datatype.get_struct_string()*self.dimension return struct_string def get_size(self): basic_sizes = {'b': 1, 'B': 1, 's': 1, 'h': 2, 'H': 2, 'i': 4, 'I': 4, 'l': 4, 'L': 4, 'f': 4, 'd': 8, 'q': 8, 'Q': 8, 'z': 1} # z is null-terminated var-length string, we return size for empty string if isinstance(self.datatype, str): return basic_sizes[self.datatype]*self.dimension elif isinstance(self.datatype, Message): return self.datatype.get_size()*self.dimension def get_null_item(self): if isinstance(self.datatype, str): if self.datatype == 's': return b'\0'*self.dimension if self.datatype in 'BbHhIiLlQq': dt_null = 0 elif self.datatype in 'fd': dt_null = 0.0 elif self.datatype == 'z': dt_null = b'' # for "z" the dimension is regular count, not width like for "s" elif isinstance(self.datatype, Message): dt_null = self.datatype.get_null_item() else: raise ValueError('Illegal datatype "{}"'.format(self.datatype)) if self.dimension == 1: return dt_null else: return [dt_null for _ in range(0, self.dimension)] def consume(self, input_values): output_values = [] if isinstance(self.datatype, str): if self.datatype == 's': num_to_pop = 1 else: num_to_pop = self.dimension for _ in range(0, num_to_pop): output_values.append(input_values.pop(0)) else: for _ in range(0, self.dimension): output_values.append(self.datatype.consume(input_values)) if len(output_values) == 1: return {self.name: output_values[0]} else: return {self.name: output_values} class Message: def __init__(self, item_list: List[PayloadItem], name = ''): self.item_list = item_list self.data = self.generate_data_dict() self.struct_inst = struct.Struct(self.generate_final_struct_string()) self.name = name def unpack_from(self, buffer, offset = 0): try: values = list(self.struct_inst.unpack_from(buffer, offset)) return self.consume(values) except struct.error as ee: raise ParseError(f'Could not convert {self.name}. Reason: {repr(ee)}') def __repr__(self): result = 'Message([...])' return result def describe(self): result = '' for payload_item in self.item_list: result += f'\t- {payload_item.describe()}\n' return result def get_struct_string(self): result = '' for item in self.item_list: result += item.get_struct_string() return result def generate_final_struct_string(self): return '<' + self.get_struct_string() def generate_data_dict(self): data = collections.OrderedDict() for item in self.item_list: data[item.name] = item.get_null_item() return data def get_null_item(self): d = dict() for item in self.item_list: d[item.name] = item.get_null_item() return d def get_size(self): return self.struct_inst.size def consume(self, values): data = dict() for item in self.item_list: data.update(item.consume(values)) return data class MessageItem: struct_inst: struct.Struct def __init__(self): pass def to_bytes(self): raise NotImplementedError() def from_bytes(self, inBytes): raise NotImplementedError() def size(self): try: return self.struct_inst.size except: raise NotImplementedError() class ProtocolHeader(MessageItem): structString = " 0: while struct_string[0] not in type_string: current_item += struct_string[0] struct_string = struct_string[1:] current_item += struct_string[0] struct_string = struct_string[1:] item_list.append(current_item) current_item = '' def construct_item_spec_from_msg_def(fieldname, item): cardinality = item.rstrip(type_string) if len(cardinality) == 0: cardinality = 1 else: cardinality = int(cardinality) dtype = item[-1] if dtype in 'bB': byte_size = '1' elif dtype in 'hH': byte_size = '2' elif dtype in 'iIflL': byte_size = '4' elif dtype in 'dQq': byte_size = '8' if dtype in 'bhilq': out_type = 'i' elif dtype in 'BHILQ': out_type = 'u' elif dtype in 'fd': out_type = 'f' elif dtype in 's': out_type = 'V' if out_type == 'V': out_type = 'V'+str(cardinality) item_spec = (fieldname, out_type) else: out_type = out_type + byte_size if cardinality == 1: item_spec = (fieldname, out_type) else: item_spec = (fieldname, out_type, cardinality) return item_spec def build_dtype_list(data, item_list): dtype_list = [] idx_offset = 0 for idx, fieldname in enumerate(data.keys()): item = item_list[idx+idx_offset] is_nested = False is_nested_list = False is_empty_list = False if (type(data[fieldname]) is type(list())): if data[fieldname]: if (type(data[fieldname][0]) is type(dict())): is_nested = True is_nested_list = True else: is_empty_list = True else: if (type(data[fieldname]) is type(dict())): is_nested = True if is_nested: if is_nested_list: sublist_length = len(data[fieldname][0]) dtype_sublist, nested_offset = build_dtype_list(data[fieldname][0], item_list[idx+idx_offset:]) nestet_list_length = len(data[fieldname]) idx_offset += sublist_length * nestet_list_length - 1 + nestet_list_length * nested_offset item_spec = (fieldname, dtype_sublist, nestet_list_length) else: sublist_length = len(data[fieldname]) dtype_sublist, nested_offset = build_dtype_list(data[fieldname], item_list[idx+idx_offset:]) idx_offset += sublist_length - 1 + nested_offset item_spec = (fieldname, dtype_sublist) elif is_empty_list: if item[0] != "0": idx_offset -= 1 item_spec = (fieldname, list()) else: item_spec = construct_item_spec_from_msg_def(fieldname, item) dtype_list.append(item_spec) return dtype_list, idx_offset return build_dtype_list(self.data, item_list)[0] def size(self): return self.header.size()+self.payload.size()+self.bottom.size() class DefaultCommandPayload(ProtocolPayload): message_id = MessageID.COMMAND command_id = 0 command_header = Message([ PayloadItem(name = 'cmdID', dimension = 1, datatype = 'H'), PayloadItem(name = 'specific', dimension = 1, datatype = 'H'), ]) command_payload = Message([]) message_description = None def __init__(self, varsize_arg=None): cls = type(self) if varsize_arg is not None: self.command_payload = Message(self.get_varsize_item_list(varsize_arg)) self.message_description = Message(cls.command_header.item_list + self.command_payload.item_list) self._init_from_varsize_arg(varsize_arg) else: if cls.message_description is None: cls.message_description = Message(cls.command_header.item_list + cls.command_payload.item_list) self._init_from_varsize_arg(None) class DefaultPluginMessagePayload(ProtocolPayload): message_id = MessageID.PLUGIN plugin_message_id = 0 plugin_message_header = Message([ PayloadItem(name = 'plugin_data_id', dimension = 1, datatype = 'H'), PayloadItem(name = 'reserved_pluginheader', dimension = 1, datatype = 'H'), ]) plugin_message_payload = Message([]) message_description = None def __init__(self, varsize_arg=None): cls = type(self) if varsize_arg is not None: self.plugin_message_payload = Message(self.get_varsize_item_list(varsize_arg)) self.message_description = Message(cls.plugin_message_header.item_list + self.plugin_message_payload.item_list) self._init_from_varsize_arg(varsize_arg) else: if cls.message_description is None: cls.message_description = Message(cls.plugin_message_header.item_list + cls.plugin_message_payload.item_list) self._init_from_varsize_arg(None) def payload_from_bytes(self, in_bytes): if self.get_varsize_arg_from_bytes: self._handle_varsize_arg(self.get_varsize_arg_from_bytes(in_bytes)) self.data.update(self.parameter_payload.unpack_from(in_bytes)) class DefaultParameterPayload(ProtocolPayload): message_id = MessageID.PARAMETER parameter_id = 0 parameter_header = Message([ PayloadItem(name = 'parameterID', dimension = 1, datatype = 'H'), PayloadItem(name = 'reserved_paramheader', dimension = 1, datatype = 'B'), PayloadItem(name = 'action', dimension = 1, datatype = 'B'), ]) parameter_payload = Message([]) message_description = None def __init__(self, varsize_arg=None): cls = type(self) if varsize_arg is not None: self.parameter_payload = Message(self.get_varsize_item_list(varsize_arg)) self.message_description = Message(cls.parameter_header.item_list + self.parameter_payload.item_list) self._init_from_varsize_arg(varsize_arg) else: if cls.message_description is None: cls.message_description = Message(cls.parameter_header.item_list + cls.parameter_payload.item_list) self._init_from_varsize_arg(None) def payload_from_bytes(self, in_bytes): if self.get_varsize_arg_from_bytes: self._handle_varsize_arg(self.get_varsize_arg_from_bytes(in_bytes)) self.data.update(self.parameter_payload.unpack_from(in_bytes)) class DefaultPluginParameterPayload(DefaultParameterPayload): parameter_id = ParamID.PARPLUGIN plugin_parameter_id = 0 plugin_parameter_header = Message([ PayloadItem(name = 'plugin_index', dimension = 1, datatype = 'B'), PayloadItem(name = 'reserved_plugin_paramheader', dimension = 1, datatype = 'B'), PayloadItem(name = 'pluginParID', dimension = 1, datatype = 'H'), ]) plugin_parameter_payload = Message([]) def __init__(self, varsize_arg=None): cls = type(self) if varsize_arg is not None: self.plugin_parameter_payload = Message(self.get_varsize_item_list(varsize_arg)) self.parameter_payload = Message(cls.plugin_parameter_header.item_list +self.plugin_parameter_payload.item_list ) self.message_description = Message(cls.parameter_header.item_list + self.parameter_payload.item_list) self._init_from_varsize_arg(varsize_arg) else: if cls.message_description is None: cls.parameter_payload = Message(cls.plugin_parameter_header.item_list + cls.plugin_parameter_payload.item_list) cls.message_description = Message(cls.parameter_header.item_list + cls.parameter_payload.item_list) self._init_from_varsize_arg(None) def payload_from_bytes(self, in_bytes): if self.get_varsize_arg_from_bytes: self._handle_varsize_arg(self.get_varsize_arg_from_bytes(in_bytes)) self.data.update(self.plugin_parameter_payload.unpack_from(in_bytes)) # protocol dictionaries and decorators PayloadNameDictionary = dict() MessagePayloadDictionary = dict() def message(message_id): def decorator(cls): cls.__doc__ = f'''Message (ID = {hex(message_id)}) with the following payload:\n\n{cls.message_description.describe()}''' cls.message_id = message_id MessagePayloadDictionary[message_id] = cls PayloadNameDictionary[cls.get_name()] = cls return cls return decorator PluginMessagePayloadDictionary = dict() def plugin_message(plugin_message_id): def decorator(cls): cls.__doc__ = f'''Plugin Message (ID = {plugin_message_id}) with the following payload:\n\n{cls.plugin_message_payload.describe()}''' cls.plugin_message_id = plugin_message_id PluginMessagePayloadDictionary[plugin_message_id] = cls PayloadNameDictionary[cls.get_name()] = cls return cls return decorator ParameterBottomDictionary = dict() # parameter_id -> NonStandardProtocolBottom class (default is ProtocolBottom) ParameterPayloadDictionary = dict() def parameter(parameter_id, ixcom_bottom = None): """ Make @parameter a decocation to define python classes for ixcom parameters, where parameter_id must be specified as the numeric identifier of the parameter and the optional ixcom_bottom can be specified if the parameter does not have a standard ixcom protocol bottom (e.g. ixcom_bottom = ProtocolBottomWithoutStatus). The effect of the decoration is that the function getParameterWithID() knows the parameter id and returns a ProtocolMessage() with payload that is initialized according the decorated parameter class. """ def decorator(cls): cls.__doc__ = f'''Parameter (ID = {parameter_id}) with the following payload:\n\n{cls.parameter_payload.describe()}''' cls.parameter_id = parameter_id if parameter_id == ParamID.PARPLUGIN: assert("use plugin_parameter decorator for plugin parameter") ParameterPayloadDictionary[parameter_id] = cls PayloadNameDictionary[cls.get_name()] = cls if ixcom_bottom is not None: assert issubclass(ixcom_bottom, MessageItem), "ixcom_buttom must be derived from MessageItem" ParameterBottomDictionary[parameter_id] = ixcom_bottom return cls return decorator PluginParameterPayloadDictionary = dict() def plugin_parameter(plugin_parameter_id): def decorator(cls): cls.__doc__ = f'''Plugin Parameter (ID = {plugin_parameter_id}) with the following payload:\n\n{cls.plugin_parameter_payload.describe()}''' cls.plugin_parameter_id = plugin_parameter_id PluginParameterPayloadDictionary[plugin_parameter_id] = cls PayloadNameDictionary[cls.get_name()] = cls return cls return decorator CommandPayloadDictionary = dict() def command(command_id): def decorator(cls): cls.__doc__ = f'''Command (ID = {command_id}) with the following payload:\n\n{cls.command_payload.describe()}''' cls.command_id = command_id CommandPayloadDictionary[command_id] = cls PayloadNameDictionary[cls.get_name()] = cls return cls return decorator # protocol getter functions def getMessageWithID(msgID, varsize_arg=None): """creates message with specified message ID Args: varsize_arg: argument for _get_varsize_item_list """ if msgID in MessagePayloadDictionary: message = ProtocolMessage() message.header.msgID = msgID message.payload = MessagePayloadDictionary[msgID](varsize_arg) return message else: return None def getPluginMessageWithID(plugin_message_id, varsize_arg=None): message = ProtocolMessage() message.header.msgID = MessageID.PLUGIN if plugin_message_id in PluginMessagePayloadDictionary: message.payload = PluginMessagePayloadDictionary[plugin_message_id](varsize_arg) message.payload.data['plugin_data_id'] = plugin_message_id return message else: return None def getCommandWithID(cmdID, varsize_arg=None): message = ProtocolMessage() message.header.msgID = MessageID.COMMAND if cmdID in CommandPayloadDictionary: message.payload = CommandPayloadDictionary[cmdID](varsize_arg) message.payload.data['cmdID'] = cmdID return message else: return None def getParameterWithID(parameterID, varsize_arg=None): """creates parameter with specified parameter ID Args: varsize_arg: argument for _get_varsize_item_list """ message = ProtocolMessage() message.header.msgID = MessageID.PARAMETER if parameterID == ParamID.PARPLUGIN: assert ("use getPluginParameterWithID for plugin parameter") if parameterID in ParameterPayloadDictionary: if parameterID in ParameterBottomDictionary: message.bottom = ParameterBottomDictionary[parameterID]() message.payload = ParameterPayloadDictionary[parameterID](varsize_arg) message.payload.data['parameterID'] = parameterID return message else: return None def getPluginParameterWithID(pluginParameterID, varsize_arg=None): message = ProtocolMessage() message.header.msgID = MessageID.PARAMETER if pluginParameterID in PluginParameterPayloadDictionary: message.payload = PluginParameterPayloadDictionary[pluginParameterID](varsize_arg) # assemble plugin parameter header message.payload.data['parameterID'] = ParamID.PARPLUGIN message.payload.data['reserved_paramheader'] = 0 message.payload.data['plugin_index'] = 0 message.payload.data['pluginParID'] = pluginParameterID return message else: return None def getMessageByName(msg_name, varsize_arg=None): if msg_name not in PayloadNameDictionary: return None cls = PayloadNameDictionary[msg_name] if cls.message_id == MessageID.PLUGIN: return getPluginMessageWithID(cls.plugin_message_id, varsize_arg) elif cls.message_id == MessageID.COMMAND: return getCommandWithID(cls.command_id, varsize_arg) elif cls.message_id == MessageID.RESPONSE: return getMessageWithID(MessageID.RESPONSE) elif cls.message_id == MessageID.PARAMETER: if cls.parameter_id == ParamID.PARPLUGIN: return getPluginParameterWithID(cls.plugin_parameter_id, varsize_arg) else: return getParameterWithID(cls.parameter_id, varsize_arg) else: return getMessageWithID(cls.message_id, varsize_arg) MessageStashDictionary = dict() ParameterStashDictionary = dict() PluginMessageStashDictionary = dict() PluginParameterStashDictionary = dict() CommandStashDictionary = dict() def handle_stashed_msg(md, in_bytes,extra_desc = None): if md["is_varsize"]: vsa = md["msg"].payload.get_varsize_arg_from_bytes(in_bytes[16:-4]) if vsa in md["varsize_msgs"]: return md["varsize_msgs"][vsa] else: md["varsize_msgs"][vsa] = ProtocolMessage() md["varsize_msgs"][vsa].header.msgID = md["msg"].header.msgID md["varsize_msgs"][vsa].payload = md["pl_cls"](vsa) md["varsize_msgs"][vsa].set_desc() md["varsize_msgs"][vsa].set_extra_desc(extra_desc) return md["varsize_msgs"][vsa] else: return md["msg"] def getStashedMessageWithID(msgID, in_bytes,extra_desc = None): if msgID in MessageStashDictionary: md = MessageStashDictionary[msgID] return handle_stashed_msg(md, in_bytes, extra_desc = extra_desc) elif msgID in MessagePayloadDictionary: message = ProtocolMessage() message.header.msgID = msgID cla = MessagePayloadDictionary[msgID] message.payload = cla() message.set_desc() message.set_extra_desc(extra_desc) MessageStashDictionary[msgID] = dict() MessageStashDictionary[msgID]["msg"] = message MessageStashDictionary[msgID]["pl_cls"] = cla MessageStashDictionary[msgID]["is_varsize"] = cla._is_varsize() MessageStashDictionary[msgID]["varsize_msgs"] = dict() return getStashedMessageWithID(msgID, in_bytes, extra_desc = extra_desc) else: return None def getStashedPluginMessageWithID(pluginMsgID, in_bytes,extra_desc = None): if pluginMsgID in PluginMessageStashDictionary: md = PluginMessageStashDictionary[pluginMsgID] return handle_stashed_msg(md, in_bytes, extra_desc = extra_desc) elif pluginMsgID in PluginMessagePayloadDictionary: message = ProtocolMessage() message.header.msgID = MessageID.PLUGIN cla = PluginMessagePayloadDictionary[pluginMsgID] message.payload = cla() message.set_desc() message.set_extra_desc(extra_desc) PluginMessageStashDictionary[pluginMsgID] = dict() PluginMessageStashDictionary[pluginMsgID]["msg"] = message PluginMessageStashDictionary[pluginMsgID]["pl_cls"] = cla PluginMessageStashDictionary[pluginMsgID]["is_varsize"] = cla._is_varsize() PluginMessageStashDictionary[pluginMsgID]["varsize_msgs"] = dict() return getStashedPluginMessageWithID(pluginMsgID, in_bytes, extra_desc = extra_desc) else: return None def getStashedParameterWithID(parameterID, in_bytes,extra_desc = None): if parameterID in ParameterStashDictionary: md = ParameterStashDictionary[parameterID] return handle_stashed_msg(md, in_bytes, extra_desc = extra_desc) elif parameterID in ParameterPayloadDictionary: message = ProtocolMessage() message.header.msgID = MessageID.PARAMETER cla = ParameterPayloadDictionary[parameterID] message.payload = cla() message.set_desc() message.set_extra_desc(extra_desc) ParameterStashDictionary[parameterID] = dict() ParameterStashDictionary[parameterID]["msg"] = message ParameterStashDictionary[parameterID]["pl_cls"] = cla ParameterStashDictionary[parameterID]["is_varsize"] = cla._is_varsize() ParameterStashDictionary[parameterID]["varsize_msgs"] = dict() return getStashedParameterWithID(parameterID, in_bytes, extra_desc = extra_desc) else: return None def getStashedPluginParameterWithID(pluginParameterID, in_bytes,extra_desc = None): if pluginParameterID in PluginParameterStashDictionary: md = PluginParameterStashDictionary[pluginParameterID] return handle_stashed_msg(md, in_bytes, extra_desc = extra_desc) elif pluginParameterID in PluginParameterPayloadDictionary: message = ProtocolMessage() message.header.msgID = MessageID.PARAMETER cla = PluginParameterPayloadDictionary[pluginParameterID] message.payload = cla() message.set_desc() message.set_extra_desc(extra_desc) PluginParameterStashDictionary[pluginParameterID] = dict() PluginParameterStashDictionary[pluginParameterID]["msg"] = message PluginParameterStashDictionary[pluginParameterID]["pl_cls"] = cla PluginParameterStashDictionary[pluginParameterID]["is_varsize"] = cla._is_varsize() PluginParameterStashDictionary[pluginParameterID]["varsize_msgs"] = dict() return getStashedPluginParameterWithID(pluginParameterID, in_bytes, extra_desc = extra_desc) else: return None def getStashedCommandWithID(commandID, in_bytes,extra_desc = None): if commandID in CommandStashDictionary: md = CommandStashDictionary[commandID] return handle_stashed_msg(md, in_bytes, extra_desc = extra_desc) elif commandID in CommandPayloadDictionary: message = ProtocolMessage() message.header.msgID = MessageID.COMMAND cla = CommandPayloadDictionary[commandID] message.payload = cla() message.set_desc() message.set_extra_desc(extra_desc) CommandStashDictionary[commandID] = dict() CommandStashDictionary[commandID]["msg"] = message CommandStashDictionary[commandID]["pl_cls"] = cla CommandStashDictionary[commandID]["is_varsize"] = cla._is_varsize() CommandStashDictionary[commandID]["varsize_msgs"] = dict() return getStashedCommandWithID(commandID, in_bytes, extra_desc = extra_desc) else: return None def get_message_id_from_name(msg_name): return PayloadNameDictionary[msg_name].message_id # json parsing def split_string(string): dimension = "" datatype = "" for char in string: if char.isdigit(): dimension += char else: datatype += char if dimension == '': dimension = '1' if datatype == 'L': datatype = 'Q' if datatype == 'l': datatype = 'q' return int(dimension), datatype def parse_payload_item(payload): messageList = [] for idx in range(payload.__len__()): if "structFields" in payload[idx]: nestedMessageList = parse_payload_item(payload[idx]['structFields']) dim, dtype = split_string(payload[idx]['format']) messageList.append(PayloadItem(name=payload[idx]['var'], dimension=dim, datatype=Message(nestedMessageList))) else: dim, dtype = split_string(payload[idx]['format']) messageList.append(PayloadItem(name=payload[idx]['var'], dimension=dim, datatype=dtype)) return messageList def parse_parameter_json_folder(path_json): if os.path.isdir(path_json): for f in os.listdir(path_json): try: with open(os.path.join(path_json, f), "r") as fh: pardata = json.load(fh) parid = int(pardata['id']) if ('parameter' != pardata['type']) or (parid in ParameterPayloadDictionary) or (parid == ParamID.PARPLUGIN): continue parcls = type(pardata['name'] + '_Payload', (DefaultParameterPayload,), {}) parcls.parameter_id = parid itemlist = parse_payload_item(pardata['payload']) parcls.parameter_payload = Message(itemlist) ParameterPayloadDictionary[parid] = parcls PayloadNameDictionary[parcls.get_name()] = parcls except BaseException as ee: _logger.error("error parsing parameterfile %r: %r", f, ee) def parse_messages_json_folder(path_json): if os.path.isdir(path_json): for f in os.listdir(path_json): try: with open(os.path.join(path_json, f), "r") as fh: msgdata = json.load(fh) msgid = int(msgdata['id'], 16) if ('message' != msgdata['type']) or (msgid in MessagePayloadDictionary) or (msgid == MessageID.PLUGIN): continue msgcls = type(msgdata['name'] + '_Payload', (ProtocolPayload,), {}) msgcls.message_id = msgid itemlist = parse_payload_item(msgdata['payload']) msgcls.message_description = Message(itemlist) MessagePayloadDictionary[msgid] = msgcls PayloadNameDictionary[msgcls.get_name()] = msgcls except BaseException as ee: _logger.error("error parsing messagefile %r: %r", f, ee)