Source code for vblf.lin

import struct
from dataclasses import dataclass
from typing import ClassVar

from typing_extensions import Self

from vblf.general import ObjectHeader, ObjectWithHeader


[docs] @dataclass class LinMessage(ObjectWithHeader[ObjectHeader]): _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBB8sBBBBHB5s") header: ObjectHeader channel: int id: int dlc: int data: bytes fsm_id: int fsm_state: int header_time: int full_time: int crc: int dir: int reserved: bytes
[docs] @classmethod def unpack(cls, buffer: bytes) -> Self: header = ObjectHeader.unpack_from(buffer) ( channel, lin_id, dlc, data_, fsm_id, fsm_state, header_time, full_time, crc, direction, reserved, ) = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE) return cls( header, channel, lin_id, dlc, data_, fsm_id, fsm_state, header_time, full_time, crc, direction, reserved, )
[docs] def pack(self) -> bytes: return self.header.pack() + self._FORMAT.pack( self.channel, self.id, self.dlc, self.data, self.fsm_id, self.fsm_state, self.header_time, self.full_time, self.crc, self.dir, self.reserved, )
[docs] @dataclass class LinBusEvent: _FORMAT: ClassVar[struct.Struct] = struct.Struct("QIH2s") SIZE: ClassVar[int] = _FORMAT.size sof: int event_baudrate: int channel: int reserved: bytes
[docs] @classmethod def unpack_from(cls, buffer: bytes, offset: int = 0) -> Self: ( sof, event_baudrate, channel, reserved, ) = cls._FORMAT.unpack_from(buffer, offset) return cls( sof, event_baudrate, channel, reserved, )
[docs] def pack_into(self, buffer: bytearray, offset: int) -> None: self._FORMAT.pack_into( buffer, offset, self.sof, self.event_baudrate, self.channel, self.reserved )
[docs] @dataclass class LinSynchFieldEvent: _FORMAT: ClassVar[struct.Struct] = struct.Struct("QQ") SIZE: ClassVar[int] = LinBusEvent.SIZE + _FORMAT.size lin_bus_event: LinBusEvent synch_break_length: int synch_del_length: int
[docs] @classmethod def unpack_from(cls, buffer: bytes, offset: int = 0) -> Self: lin_bus_event = LinBusEvent.unpack_from(buffer, offset) ( synch_break_length, synch_del_length, ) = cls._FORMAT.unpack_from(buffer, offset + LinBusEvent.SIZE) return cls( lin_bus_event, synch_break_length, synch_del_length, )
[docs] def pack_into(self, buffer: bytearray, offset: int) -> None: self.lin_bus_event.pack_into(buffer, offset) self._FORMAT.pack_into( buffer, offset + LinBusEvent.SIZE, self.synch_break_length, self.synch_del_length )
[docs] @dataclass class LinMessageDescriptor: _FORMAT: ClassVar[struct.Struct] = struct.Struct("HHBBBB") SIZE: ClassVar[int] = LinSynchFieldEvent.SIZE + _FORMAT.size lin_synch_field_event: LinSynchFieldEvent supplier_id: int message_id: int nad: int id: int dlc: int checksum_model: int
[docs] @classmethod def unpack_from(cls, buffer: bytes, offset: int = 0) -> Self: lin_synch_field_event = LinSynchFieldEvent.unpack_from(buffer, offset) ( supplier_id, message_id, nad, lin_id, dlc, checksum_model, ) = cls._FORMAT.unpack_from(buffer, offset + LinSynchFieldEvent.SIZE) return cls( lin_synch_field_event, supplier_id, message_id, nad, lin_id, dlc, checksum_model, )
[docs] def pack_into(self, buffer: bytearray, offset: int) -> None: self.lin_synch_field_event.pack_into(buffer, offset) self._FORMAT.pack_into( buffer, offset + LinSynchFieldEvent.SIZE, self.supplier_id, self.message_id, self.nad, self.id, self.dlc, self.checksum_model, )
[docs] @dataclass class LinDatabyteTimestampEvent: _FORMAT: ClassVar[struct.Struct] = struct.Struct("9Q") SIZE: ClassVar[int] = LinMessageDescriptor.SIZE + _FORMAT.size lin_msg_descr_event: LinMessageDescriptor databyte_timestamps: tuple[int]
[docs] @classmethod def unpack_from(cls, buffer: bytes, offset: int = 0) -> Self: lin_msg_descr_event = LinMessageDescriptor.unpack_from(buffer, offset) databyte_timestamps = cls._FORMAT.unpack_from(buffer, offset + LinMessageDescriptor.SIZE) return cls( lin_msg_descr_event, databyte_timestamps, )
[docs] def pack_into(self, buffer: bytearray, offset: int) -> None: self.lin_msg_descr_event.pack_into(buffer, offset) self._FORMAT.pack_into( buffer, offset + LinMessageDescriptor.SIZE, *self.databyte_timestamps )
[docs] @dataclass class LinMessage2(ObjectWithHeader[ObjectHeader]): _FORMAT_V1: ClassVar[struct.Struct] = struct.Struct("8sHBBBBBBB3s") _FORMAT_V2: ClassVar[struct.Struct] = struct.Struct("I") _FORMAT_V3: ClassVar[struct.Struct] = struct.Struct("dII") _V1_SIZE: ClassVar[int] = ObjectHeader.SIZE + LinDatabyteTimestampEvent.SIZE + _FORMAT_V1.size _V2_SIZE: ClassVar[int] = _V1_SIZE + _FORMAT_V2.size _V3_SIZE: ClassVar[int] = _V2_SIZE + _FORMAT_V3.size header: ObjectHeader # V1 lin_timestamp_event: LinDatabyteTimestampEvent data: bytes crc: int direction: int simulated: int is_etf: int etf_assoc_index: int etf_assoc_etf_id: int fsm_id: int fsm_state: int reserved: bytes # V2 resp_baudrate: int # V3 exact_header_baudrate: float early_stopbit_offset: int early_stopbit_offset_response: int
[docs] @classmethod def unpack(cls, buffer: bytes) -> Self: header = ObjectHeader.unpack_from(buffer) lin_timestamp_event = LinDatabyteTimestampEvent.unpack_from(buffer, ObjectHeader.SIZE) ( data_, crc, direction, simulated, is_etf, etf_assoc_index, etf_assoc_etf_id, fsm_id, fsm_state, reserved, ) = cls._FORMAT_V1.unpack_from(buffer, ObjectHeader.SIZE + LinDatabyteTimestampEvent.SIZE) if header.base.object_size >= cls._V2_SIZE: (resp_baudrate,) = cls._FORMAT_V2.unpack_from(buffer, cls._V1_SIZE) else: resp_baudrate = 0 if header.base.object_size >= cls._V3_SIZE: ( exact_header_baudrate, early_stopbit_offset, early_stopbit_offset_response, ) = cls._FORMAT_V3.unpack_from(buffer, cls._V2_SIZE) else: exact_header_baudrate = 0 early_stopbit_offset = 0 early_stopbit_offset_response = 0 return cls( header, lin_timestamp_event, data_, crc, direction, simulated, is_etf, etf_assoc_index, etf_assoc_etf_id, fsm_id, fsm_state, reserved, resp_baudrate, exact_header_baudrate, early_stopbit_offset, early_stopbit_offset_response, )
[docs] def pack(self) -> bytes: buffer = bytearray(self.header.base.object_size) self.header.pack_into(buffer, 0) self.lin_timestamp_event.pack_into(buffer, ObjectHeader.SIZE) self._FORMAT_V1.pack_into( buffer, ObjectHeader.SIZE + LinDatabyteTimestampEvent.SIZE, self.data, self.crc, self.direction, self.simulated, self.is_etf, self.etf_assoc_index, self.etf_assoc_etf_id, self.fsm_id, self.fsm_state, self.reserved, ) if self.header.base.object_size >= self._V2_SIZE: self._FORMAT_V2.pack_into(buffer, self._V1_SIZE, self.resp_baudrate) if self.header.base.object_size >= self._V3_SIZE: self._FORMAT_V3.pack_into( buffer, self._V2_SIZE, self.exact_header_baudrate, self.early_stopbit_offset, self.early_stopbit_offset_response, ) return bytes(buffer)