Source code for vblf.can

import struct
from dataclasses import dataclass
from typing import ClassVar

from typing_extensions import Self

from vblf.constants import CanFdFlags, ObjFlags, ObjType

from .general import ObjectHeader, ObjectWithHeader


@dataclass
class CanMessage(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object created with type ``ObjType.CAN_MESSAGE``.

    The payload that follows the header is described by ``_FORMAT`` and holds,
    in order: channel, flags, dlc, frame id and an 8-byte data field.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBBI8s")
    header: ObjectHeader
    channel: int
    flags: int
    dlc: int
    frame_id: int
    data: bytes

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.flags,
            self.dlc,
            self.frame_id,
            self.data,
        )
        return head + body

    @classmethod
    def new(
        cls,
        object_flags: ObjFlags,
        object_time_stamp: int,
        channel: int,
        flags: int,
        dlc: int,
        frame_id: int,
        data: bytes,
    ) -> Self:
        """Create an instance together with a freshly built header.

        The header comes from ``ObjectHeader.new``, which receives
        ``ObjectHeader.SIZE + _FORMAT.size``, ``ObjType.CAN_MESSAGE``,
        ``object_flags``, a literal ``0`` and ``object_time_stamp``.
        """
        total_size = ObjectHeader.SIZE + cls._FORMAT.size
        header = ObjectHeader.new(
            total_size,
            ObjType.CAN_MESSAGE,
            object_flags,
            0,
            object_time_stamp,
        )
        return cls(
            header=header,
            channel=channel,
            flags=flags,
            dlc=dlc,
            frame_id=frame_id,
            data=data,
        )
@dataclass
class CanMessage2(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object created with type ``ObjType.CAN_MESSAGE2``.

    The payload described by ``_FORMAT`` holds, in order: channel, flags, dlc,
    frame id, an 8-byte data field, frame length, bit count and two reserved
    values.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBBI8sIBBH")
    header: ObjectHeader
    channel: int
    flags: int
    dlc: int
    frame_id: int
    data: bytes
    frame_length: int
    bit_count: int
    reserved1: int
    reserved2: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.flags,
            self.dlc,
            self.frame_id,
            self.data,
            self.frame_length,
            self.bit_count,
            self.reserved1,
            self.reserved2,
        )
        return head + body

    @classmethod
    def new(
        cls,
        object_flags: ObjFlags,
        object_time_stamp: int,
        channel: int,
        flags: int,
        dlc: int,
        frame_id: int,
        data: bytes,
        frame_length: int,
        bit_count: int,
    ) -> Self:
        """Create an instance together with a freshly built header.

        The header comes from ``ObjectHeader.new``, which receives
        ``ObjectHeader.SIZE + _FORMAT.size``, ``ObjType.CAN_MESSAGE2``,
        ``object_flags``, a literal ``0`` and ``object_time_stamp``.
        Both reserved fields are set to ``0``.
        """
        total_size = ObjectHeader.SIZE + cls._FORMAT.size
        header = ObjectHeader.new(
            total_size,
            ObjType.CAN_MESSAGE2,
            object_flags,
            0,
            object_time_stamp,
        )
        return cls(
            header=header,
            channel=channel,
            flags=flags,
            dlc=dlc,
            frame_id=frame_id,
            data=data,
            frame_length=frame_length,
            bit_count=bit_count,
            reserved1=0,
            reserved2=0,
        )
@dataclass
class CanFdMessage(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object created with type ``ObjType.CAN_FD_MESSAGE``.

    The payload described by ``_FORMAT`` contains a 64-byte data field
    surrounded by fixed-size integer fields; ``canfd_flags`` is exposed as a
    ``CanFdFlags`` value.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBBIIBBBBI64sI")
    header: ObjectHeader
    channel: int
    flags: int
    dlc: int
    frame_id: int
    frame_length: int
    arb_bit_count: int
    canfd_flags: CanFdFlags
    valid_data_bytes: int
    reserved1: int
    reserved2: int
    data: bytes
    reserved3: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        The raw flags integer is converted to ``CanFdFlags``.
        """
        header = ObjectHeader.unpack_from(buffer)
        (
            channel,
            flags,
            dlc,
            frame_id,
            frame_length,
            arb_bit_count,
            raw_canfd_flags,
            valid_data_bytes,
            reserved1,
            reserved2,
            payload,
            reserved3,
        ) = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(
            header=header,
            channel=channel,
            flags=flags,
            dlc=dlc,
            frame_id=frame_id,
            frame_length=frame_length,
            arb_bit_count=arb_bit_count,
            canfd_flags=CanFdFlags(raw_canfd_flags),
            valid_data_bytes=valid_data_bytes,
            reserved1=reserved1,
            reserved2=reserved2,
            data=payload,
            reserved3=reserved3,
        )

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.flags,
            self.dlc,
            self.frame_id,
            self.frame_length,
            self.arb_bit_count,
            self.canfd_flags,
            self.valid_data_bytes,
            self.reserved1,
            self.reserved2,
            self.data,
            self.reserved3,
        )
        return head + body

    @classmethod
    def new(
        cls,
        object_flags: ObjFlags,
        object_time_stamp: int,
        channel: int,
        flags: int,
        dlc: int,
        frame_id: int,
        frame_length: int,
        arb_bit_count: int,
        canfd_flags: CanFdFlags,
        data: bytes,
    ) -> Self:
        """Create an instance together with a freshly built header.

        The header comes from ``ObjectHeader.new``, which receives
        ``ObjectHeader.SIZE + _FORMAT.size``, ``ObjType.CAN_FD_MESSAGE``,
        ``object_flags``, a literal ``0`` and ``object_time_stamp``.
        ``valid_data_bytes`` is taken from ``len(data)`` and all reserved
        fields are set to ``0``.
        """
        total_size = ObjectHeader.SIZE + cls._FORMAT.size
        header = ObjectHeader.new(
            total_size,
            ObjType.CAN_FD_MESSAGE,
            object_flags,
            0,
            object_time_stamp,
        )
        return cls(
            header=header,
            channel=channel,
            flags=flags,
            dlc=dlc,
            frame_id=frame_id,
            frame_length=frame_length,
            arb_bit_count=arb_bit_count,
            canfd_flags=canfd_flags,
            valid_data_bytes=len(data),
            reserved1=0,
            reserved2=0,
            data=data,
            reserved3=0,
        )
@dataclass
class CanFdMessage64(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object with a variable-length data section.

    Layout handled by this class: header, fixed-size fields (``_FORMAT``),
    data bytes, and optionally two extra integers (``_FORMAT_EXT``) located at
    ``ext_data_offset``. An ``ext_data_offset`` of ``0`` is treated as
    "no extra section".
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("BBBBIIIIIIIHBBI")
    _FORMAT_EXT: ClassVar[struct.Struct] = struct.Struct("II")
    header: ObjectHeader
    channel: int
    dlc: int
    valid_data_bytes: int
    tx_count: int
    frame_id: int
    frame_length: int
    flags: CanFdFlags
    btr_cfg_arb: int
    btr_cfg_data: int
    time_offset_brs_ns: int
    time_offset_crc_del_ns: int
    bit_count: int
    dir: int
    ext_data_offset: int
    crc: int
    data: bytes
    btr_ext_arb: int
    btr_ext_data: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The data section runs from the end of the fixed-size fields up to
        ``ext_data_offset`` or, when that offset is ``0``, up to the header's
        ``base.object_size``. The two ``btr_ext_*`` values default to ``0`` and
        are only read when the offset is non-zero and the object is large
        enough to contain them.
        """
        header = ObjectHeader.unpack_from(buffer)
        total_size = header.base.object_size

        # Fixed-size part directly behind the header.
        (
            channel,
            dlc,
            valid_data_bytes,
            tx_count,
            frame_id,
            frame_length,
            raw_flags,
            btr_cfg_arb,
            btr_cfg_data,
            time_offset_brs_ns,
            time_offset_crc_del_ns,
            bit_count,
            direction,
            ext_data_offset,
            crc,
        ) = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)

        # Data section: everything between the fixed part and the ext section
        # (or the end of the object when there is no ext section).
        data_start = ObjectHeader.SIZE + cls._FORMAT.size
        data_end = ext_data_offset if ext_data_offset else total_size
        payload = buffer[data_start:data_end]

        # Optional ext section.
        ext_present = (
            ext_data_offset != 0 and total_size >= ext_data_offset + cls._FORMAT_EXT.size
        )
        if ext_present:
            btr_ext_arb, btr_ext_data = cls._FORMAT_EXT.unpack_from(buffer, ext_data_offset)
        else:
            btr_ext_arb, btr_ext_data = 0, 0

        return cls(
            header=header,
            channel=channel,
            dlc=dlc,
            valid_data_bytes=valid_data_bytes,
            tx_count=tx_count,
            frame_id=frame_id,
            frame_length=frame_length,
            flags=CanFdFlags(raw_flags),
            btr_cfg_arb=btr_cfg_arb,
            btr_cfg_data=btr_cfg_data,
            time_offset_brs_ns=time_offset_brs_ns,
            time_offset_crc_del_ns=time_offset_crc_del_ns,
            bit_count=bit_count,
            dir=direction,
            ext_data_offset=ext_data_offset,
            crc=crc,
            data=payload,
            btr_ext_arb=btr_ext_arb,
            btr_ext_data=btr_ext_data,
        )

    def pack(self) -> bytes:
        """Serialize into a buffer of ``header.base.object_size`` bytes.

        Header, fixed-size fields and data are written in place; the
        ``btr_ext_*`` pair is written at ``ext_data_offset`` only when that
        offset is non-zero and the pair fits inside the object size.
        """
        total_size = self.header.base.object_size
        out = bytearray(total_size)

        self.header.pack_into(out, 0)

        fixed_fields = (
            self.channel,
            self.dlc,
            self.valid_data_bytes,
            self.tx_count,
            self.frame_id,
            self.frame_length,
            self.flags,
            self.btr_cfg_arb,
            self.btr_cfg_data,
            self.time_offset_brs_ns,
            self.time_offset_crc_del_ns,
            self.bit_count,
            self.dir,
            self.ext_data_offset,
            self.crc,
        )
        self._FORMAT.pack_into(out, ObjectHeader.SIZE, *fixed_fields)

        # Data goes right behind the fixed-size fields.
        data_start = ObjectHeader.SIZE + self._FORMAT.size
        out[data_start : data_start + len(self.data)] = self.data

        ext_offset = self.ext_data_offset
        if ext_offset != 0 and total_size >= ext_offset + self._FORMAT_EXT.size:
            self._FORMAT_EXT.pack_into(out, ext_offset, self.btr_ext_arb, self.btr_ext_data)

        return bytes(out)
@dataclass
class CanDriverStatistic(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object holding per-channel frame counters.

    The payload described by ``_FORMAT`` holds, in order: channel, bus load,
    six frame counters and one reserved value.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HHIIIIIII")
    header: ObjectHeader
    channel: int
    bus_load: int
    standard_data_frames: int
    extended_data_frames: int
    standard_remote_frames: int
    extended_remote_frames: int
    error_frames: int
    overload_frames: int
    reserved: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.bus_load,
            self.standard_data_frames,
            self.extended_data_frames,
            self.standard_remote_frames,
            self.extended_remote_frames,
            self.error_frames,
            self.overload_frames,
            self.reserved,
        )
        return head + body
@dataclass
class CanDriverError(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object holding driver error counters and an error code.

    The payload described by ``_FORMAT`` holds, in order: channel, tx errors,
    rx errors and error code.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBBI")
    header: ObjectHeader
    channel: int
    tx_errors: int
    rx_errors: int
    error_code: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.tx_errors,
            self.rx_errors,
            self.error_code,
        )
        return head + body
@dataclass
class CanDriverErrorExt(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object extending the driver error fields.

    In addition to channel, error counters and error code, the payload
    described by ``_FORMAT`` carries flags, state, two reserved scalars and a
    block of four reserved integers exposed as the list ``reserved3``.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBBIIBBH4I")
    header: ObjectHeader
    channel: int
    tx_errors: int
    rx_errors: int
    error_code: int
    flags: int
    state: int
    reserved1: int
    reserved2: int
    reserved3: list[int]

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        The trailing four integers are gathered into the ``reserved3`` list.
        """
        header = ObjectHeader.unpack_from(buffer)
        (
            channel,
            tx_errors,
            rx_errors,
            error_code,
            flags,
            state,
            reserved1,
            reserved2,
            *reserved3,  # the "4I" tail of _FORMAT, collected as a list
        ) = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(
            header=header,
            channel=channel,
            tx_errors=tx_errors,
            rx_errors=rx_errors,
            error_code=error_code,
            flags=flags,
            state=state,
            reserved1=reserved1,
            reserved2=reserved2,
            reserved3=reserved3,
        )

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields.

        Only the first four entries of ``reserved3`` are written.
        """
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.tx_errors,
            self.rx_errors,
            self.error_code,
            self.flags,
            self.state,
            self.reserved1,
            self.reserved2,
            self.reserved3[0],
            self.reserved3[1],
            self.reserved3[2],
            self.reserved3[3],
        )
        return head + body
@dataclass
class CanErrorFrame(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object with channel, length and one reserved value."""

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HHI")
    header: ObjectHeader
    channel: int
    length: int
    reserved: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(self.channel, self.length, self.reserved)
        return head + body
@dataclass
class CanErrorFrameExt(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object extending the error frame fields.

    The payload described by ``_FORMAT`` holds, in order: channel, length,
    flags, ecc, position, dlc, a reserved byte, frame length in ns, frame id,
    extended flags, a second reserved value and an 8-byte data field.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HHIBBBBIIHH8s")
    header: ObjectHeader
    channel: int
    length: int
    flags: int
    ecc: int
    position: int
    dlc: int
    reserved1: int
    frame_length_in_ns: int
    frame_id: int
    flags_ext: int
    reserved2: int
    data: bytes

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(
            self.channel,
            self.length,
            self.flags,
            self.ecc,
            self.position,
            self.dlc,
            self.reserved1,
            self.frame_length_in_ns,
            self.frame_id,
            self.flags_ext,
            self.reserved2,
            self.data,
        )
        return head + body
@dataclass
class CanFdErrorFrame64(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object with a variable-length data section.

    Layout handled by this class: header, fixed-size fields (``_FORMAT``),
    ``valid_data_bytes`` bytes of data, and optionally two extra integers
    (``_FORMAT_EXT``) located at ``ext_data_offset``. An ``ext_data_offset`` of
    ``0`` is treated as "no extra section", the same convention
    ``CanFdMessage64`` uses.
    """

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("BBBBHHHBBIIIIIIIHH")
    _FORMAT_EXT: ClassVar[struct.Struct] = struct.Struct("II")
    header: ObjectHeader
    channel: int
    dlc: int
    valid_data_bytes: int
    ecc: int
    flags: int
    error_code_ext: int
    ext_flags: int
    ext_data_offset: int
    reserved1: int
    frame_id: int
    frame_length: int
    btr_cfg_arb: int
    btr_cfg_data: int
    time_offset_brs_ns: int
    time_offset_crc_del_ns: int
    crc: int
    error_position: int
    reserved2: int
    data: bytes
    btr_ext_arb: int
    btr_ext_data: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The data section is the ``valid_data_bytes`` bytes that follow the
        fixed-size fields. The two ``btr_ext_*`` values default to ``0`` and
        are only read when ``ext_data_offset`` is non-zero and the header's
        ``base.object_size`` is large enough to contain them.
        """
        header = ObjectHeader.unpack_from(buffer)

        # get fixed size values
        (
            channel,
            dlc,
            valid_data_bytes,
            ecc,
            flags,
            error_code_ext,
            ext_flags,
            ext_data_offset,
            reserved1,
            frame_id,
            frame_length,
            btr_cfg_arb,
            btr_cfg_data,
            time_offset_brs_ns,
            time_offset_crc_del_ns,
            crc,
            error_position,
            reserved2,
        ) = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)

        # get data
        data_offset = ObjectHeader.SIZE + cls._FORMAT.size
        data = buffer[data_offset : data_offset + valid_data_bytes]

        # get ext frame data
        # An offset of 0 would point at the object header itself; without the
        # explicit check the header's first bytes would be misread as ext data.
        btr_ext_arb, btr_ext_data = 0, 0
        if (
            ext_data_offset != 0
            and header.base.object_size >= ext_data_offset + cls._FORMAT_EXT.size
        ):
            btr_ext_arb, btr_ext_data = cls._FORMAT_EXT.unpack_from(buffer, ext_data_offset)

        return cls(
            header,
            channel,
            dlc,
            valid_data_bytes,
            ecc,
            flags,
            error_code_ext,
            ext_flags,
            ext_data_offset,
            reserved1,
            frame_id,
            frame_length,
            btr_cfg_arb,
            btr_cfg_data,
            time_offset_brs_ns,
            time_offset_crc_del_ns,
            crc,
            error_position,
            reserved2,
            data,
            btr_ext_arb,
            btr_ext_data,
        )

    def pack(self) -> bytes:
        """Serialize into a buffer of ``header.base.object_size`` bytes.

        Header, fixed-size fields and data are written in place; the
        ``btr_ext_*`` pair is written at ``ext_data_offset`` only when that
        offset is non-zero and the pair fits inside the object size.
        """
        buffer = bytearray(self.header.base.object_size)

        # pack header
        self.header.pack_into(buffer, 0)

        # pack fixed size values
        self._FORMAT.pack_into(
            buffer,
            ObjectHeader.SIZE,
            self.channel,
            self.dlc,
            self.valid_data_bytes,
            self.ecc,
            self.flags,
            self.error_code_ext,
            self.ext_flags,
            self.ext_data_offset,
            self.reserved1,
            self.frame_id,
            self.frame_length,
            self.btr_cfg_arb,
            self.btr_cfg_data,
            self.time_offset_brs_ns,
            self.time_offset_crc_del_ns,
            self.crc,
            self.error_position,
            self.reserved2,
        )

        # pack data
        # The slice length must equal len(self.data): assigning into a slice
        # of a different length resizes the bytearray and shifts what follows.
        data_offset = ObjectHeader.SIZE + self._FORMAT.size
        buffer[data_offset : data_offset + len(self.data)] = self.data

        # pack ext frame data
        # Skip when the offset is 0, otherwise the freshly written header
        # would be overwritten by the ext values.
        if (
            self.ext_data_offset != 0
            and self.header.base.object_size >= self.ext_data_offset + self._FORMAT_EXT.size
        ):
            self._FORMAT_EXT.pack_into(
                buffer, self.ext_data_offset, self.btr_ext_arb, self.btr_ext_data
            )

        return bytes(buffer)
@dataclass
class CanDriverHwSync(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object with channel, flags and two reserved values."""

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HBBI")
    header: ObjectHeader
    channel: int
    flags: int
    reserved1: int
    reserved2: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(self.channel, self.flags, self.reserved1, self.reserved2)
        return head + body
@dataclass
class CanOverloadFrame(ObjectWithHeader[ObjectHeader]):
    """Header-prefixed object with a channel and two reserved values."""

    _FORMAT: ClassVar[struct.Struct] = struct.Struct("HHI")
    header: ObjectHeader
    channel: int
    reserved1: int
    reserved2: int

    @classmethod
    def unpack(cls, buffer: bytes) -> Self:
        """Build an instance from ``buffer``.

        The header is obtained via ``ObjectHeader.unpack_from(buffer)``; the
        payload fields are read with ``_FORMAT`` at offset ``ObjectHeader.SIZE``.
        """
        header = ObjectHeader.unpack_from(buffer)
        # _FORMAT yields the payload values in dataclass field order.
        payload = cls._FORMAT.unpack_from(buffer, ObjectHeader.SIZE)
        return cls(header, *payload)

    def pack(self) -> bytes:
        """Return the packed header followed by the packed payload fields."""
        head = self.header.pack()
        body = self._FORMAT.pack(self.channel, self.reserved1, self.reserved2)
        return head + body