Source code for pose_format.utils.reader

import struct
from dataclasses import dataclass
from typing import Tuple

import numpy as np


[docs]@dataclass class ConstStructs: """ Class hold collection of predefined struct formats to reuse """ float: struct.Struct = struct.Struct("<f") """ Struct format for floating-point number""" short: struct.Struct = struct.Struct("<h") """ Struct format for signed short integer,'<h' """ ushort: struct.Struct = struct.Struct("<H") """ Struct format for unsigned short integer""" double_ushort: struct.Struct = struct.Struct("<HH") """ Struct format for two unsigned short integers""" triple_ushort: struct.Struct = struct.Struct("<HHH") """ Struct format for three unsigned short integers"""
[docs]class BufferReader: """ Class is used to read binary data from buffer Parameters ---------- buffer: bytes buffer from which to read data read_offset: int current read offset in buffer """ def __init__(self, buffer: bytes): self.buffer = buffer self.read_offset = 0
[docs] def bytes_left(self): """ gives number of bytes left to read from buffer Returns ------- int The number of bytes left to read. """ return len(self.buffer) - self.read_offset
[docs] def unpack_f(self, s_format: str): """ unpacks data from buffer using given struct format Parameters ---------- s_format : str The struct format to use for unpacking data. Returns ------- Unpacked data as specified by the struct format. """ if not hasattr(ConstStructs, s_format): le_format: str = "<" + s_format setattr(ConstStructs, s_format, struct.Struct(le_format)) return self.unpack(getattr(ConstStructs, s_format))
[docs] def unpack_numpy(self, s: struct.Struct, shape: Tuple): """ unpacks data from buffer into a numpy array using struct format and shape Parameters ---------- s : struct.Struct The struct format to use. shape : Tuple[int, ...] The shape of the NumPy array. Returns ------- np.ndarray The unpacked NumPy array. """ arr = np.ndarray(shape, s.format, self.buffer, self.read_offset).copy() self.advance(s, int(np.prod(shape))) return arr
[docs] def unpack_torch(self, s: struct.Struct, shape: Tuple): """ unpacks data from buffer into a torch tensor using struct format and shape Parameters ---------- s : struct.Struct The struct format to use. shape : Tuple[int, ...] The shape of the PyTorch tensor. Returns ------- torch.Tensor The unpacked PyTorch tensor. """ import torch arr = self.unpack_numpy(s, shape) return torch.from_numpy(arr)
[docs] def unpack_tensorflow(self, s: struct.Struct, shape: Tuple): """ Unpacks into a tensorflow tensor using struct format and shape Parameters ---------- s : struct.Struct The struct format to use. shape : Tuple[int, ...] The shape of the TensorFlow tensor. Returns ------- tensorflow.Tensor The unpacked TensorFlow tensor. """ import tensorflow as tf arr = self.unpack_numpy(s, shape) return tf.constant(arr)
[docs] def unpack(self, s: struct.Struct): """ Unpacks data from the buffer using a given struct format. Parameters ---------- s : struct.Struct The struct format to use for unpacking data. Returns ------- Unpacked data as specified by the struct format. """ unpack: tuple = s.unpack_from(self.buffer, self.read_offset) self.advance(s) if len(unpack) == 1: return unpack[0] return unpack
[docs] def advance(self, s: struct.Struct, times=1): """ Updates read_offset by number of times and size of given struct -> advances read offset in buffer Parameters ---------- s : struct.Struct The struct format that determines the data size. times : int, optional The number of times to advance the read offset. Default is 1. """ self.read_offset += s.size * times
[docs] def unpack_str(self) -> str: """ Unpacks a string from the buffer. Returns ------- str The unpacked string, encoded in UTF-8. """ length: int = self.unpack(ConstStructs.ushort) bytes_: bytes = self.unpack_f("%ds" % length) return bytes_.decode("utf-8")
if __name__ == "__main__": from tqdm import tqdm buffer = struct.pack("<H5s", 5, bytes("hello", 'utf8')) reader = BufferReader(buffer) for _ in tqdm(range(10000)): reader.read_offset = 0 reader.unpack_str()