Source code for vblf.writer

import datetime
import os
import time
import zlib
from contextlib import AbstractContextManager
from typing import Any, BinaryIO, Final, Union

from vblf.constants import Compression, ObjFlags
from vblf.general import FileStatistics, HeaderWithBase, LogContainer, ObjectWithHeader, SystemTime

BYTE_ALIGNMENT: Final = 8


class BlfWriter(AbstractContextManager["BlfWriter"]):
    """Binary Log Format (BLF) file writer.

    Writes Vector BLF log files with optional compression. Objects passed to
    :meth:`write` are collected in an in-memory buffer; the buffer is written
    to the file as ``LogContainer`` objects holding at most ``buffer_size``
    bytes of (uncompressed) data each.

    A file-like object passed as ``file`` must be opened for binary writing
    and support ``write()``, ``tell()`` and ``seek()``, because the header at
    offset 0 is rewritten when the writer is closed. The writer closes the
    underlying file in :meth:`close`, including file-like objects supplied by
    the caller.

    :param file: Path to BLF file or file-like object
    :param compression_level: Compression level (0-9), defaults to no compression
    :param buffer_size: Size of internal buffer in bytes before flushing, defaults to 128 KiB
    :raises TypeError: If file parameter is of unsupported type
    :raises ValueError: If buffer_size is not positive
    """

    def __init__(
        self,
        file: Union[str, bytes, os.PathLike[Any], BinaryIO],
        compression_level: Compression = Compression.NONE,
        buffer_size: int = 128 * 1024,
    ) -> None:
        """Initialize BLF writer.

        See class documentation for details.
        """
        # A non-positive buffer size would make _flush_container() unable to
        # drain the buffer, so close() would never terminate. Reject it
        # before any file is opened.
        if buffer_size <= 0:
            err_msg = f"buffer_size must be positive, got {buffer_size}"
            raise ValueError(err_msg)

        self._buffer_size = buffer_size
        self._buffer = bytearray()
        self._measurement_start_time = time.time()
        self._time_of_last_object = self._measurement_start_time

        self._file: BinaryIO
        opened_here = False
        if isinstance(file, (str, bytes, os.PathLike)):
            self._file = open(file, "wb")  # noqa: SIM115
            opened_here = True
        elif hasattr(file, "write"):
            self._file = file
        else:
            err_msg = f"Unsupported type {type(file)}"
            raise TypeError(err_msg)

        # Write a preliminary file statistics header; it is rewritten with
        # the final values in close().
        try:
            self._file_statistics = FileStatistics.new()
            self._file_statistics.compression_level = compression_level
            self._file.write(self._file_statistics.pack())
        except BaseException:
            # Do not leak a handle this constructor opened itself; a
            # caller-supplied file object stays under the caller's control.
            if opened_here:
                self._file.close()
            raise

    def write(self, obj: ObjectWithHeader[HeaderWithBase]) -> None:
        """Write an object to the BLF file.

        The object is first buffered and only written to disk when the buffer
        is full or when the file is closed.

        :param obj: Object to write
        :raises ValueError: If object size doesn't match its header
        """
        # Pad the buffer so the object starts at a BYTE_ALIGNMENT boundary
        # relative to the start of the buffer.
        if rest := len(self._buffer) % BYTE_ALIGNMENT:
            self._buffer.extend(b"\x00" * (BYTE_ALIGNMENT - rest))

        obj_data = obj.pack()
        if len(obj_data) != obj.header.base.object_size:
            err_msg = f"Object size mismatch: {len(obj_data)} != {obj.header.base.object_size}"
            raise ValueError(err_msg)

        self._buffer.extend(obj_data)
        self._file_statistics.object_count += 1
        # Only the packed object bytes are counted here, not the padding.
        self._file_statistics.uncompressed_file_size += len(obj_data)
        self._time_of_last_object = time.time()

        # At most one container is flushed per call; anything beyond
        # buffer_size stays buffered for a later flush or for close().
        if len(self._buffer) >= self._buffer_size:
            self._flush_container()

    def _flush_container(self) -> None:
        """Flush up to ``buffer_size`` bytes of the internal buffer to disk.

        Creates a LogContainer with the buffered data and writes it to the
        file. The data is zlib-compressed if a compression level above
        ``Compression.NONE`` is configured. Any bytes beyond ``buffer_size``
        remain in the buffer. Does nothing if the buffer is empty.
        """
        if not self._buffer:
            return

        # Pad the file so the container starts at a BYTE_ALIGNMENT boundary.
        if rest := self._file.tell() % BYTE_ALIGNMENT:
            self._file.write(b"\x00" * (BYTE_ALIGNMENT - rest))

        # Split off the chunk for this container and keep the remainder.
        chunk = self._buffer[: self._buffer_size]
        self._buffer = self._buffer[self._buffer_size :]

        if self._file_statistics.compression_level > Compression.NONE:
            payload = zlib.compress(chunk, level=self._file_statistics.compression_level)
        else:
            payload = bytes(chunk)

        # The container time stamp is the time elapsed since the writer was
        # created, in nanoseconds (matching the TIME_ONE_NANS flag).
        elapsed_ns = round((time.time() - self._measurement_start_time) * 1e9)
        log_container = LogContainer.new(
            data=payload,
            time_stamp=elapsed_ns,
            flags=ObjFlags.TIME_ONE_NANS,
        )
        self._file.write(log_container.pack())
        self._file_statistics.file_size = self._file.tell()

    def _update_file_statistics(self) -> None:
        """Rewrite the file statistics header at the beginning of the file.

        Sets ``last_object_time`` from the time of the most recent
        :meth:`write` call (as UTC) and writes the packed statistics at
        offset 0. Object count and sizes are maintained by :meth:`write` and
        :meth:`_flush_container`. The file position is left just after the
        header.
        """
        last_object_dt = datetime.datetime.fromtimestamp(
            self._time_of_last_object, datetime.timezone.utc
        )
        self._file_statistics.last_object_time = SystemTime.from_datetime(last_object_dt)
        self._file.seek(0)
        self._file.write(self._file_statistics.pack())

    def close(self) -> None:
        """Close the BLF file.

        Flushes any remaining buffered data and updates file statistics
        before closing. The underlying file is closed even if flushing or
        updating the header raises; the exception is then propagated.
        Calling this on an already closed writer is a no-op.
        """
        if self._file.closed:
            return
        try:
            # One flush writes at most buffer_size bytes, so loop until the
            # buffer is drained.
            while self._buffer:
                self._flush_container()
            self._update_file_statistics()
        finally:
            self._file.close()

    def __enter__(self) -> "BlfWriter":
        """Enter context manager.

        :returns: BlfWriter instance
        """
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Exit context manager and close file.

        :param exc_type: Exception type if an exception occurred
        :param exc_value: Exception instance if an exception occurred
        :param traceback: Traceback if an exception occurred
        """
        self.close()