Source code for pose_format.pose_body

from random import sample
from typing import BinaryIO, List, Tuple

import numpy as np

from pose_format.pose_header import PoseHeader
from pose_format.utils.reader import BufferReader, ConstStructs

POINTS_DIMS = (2, 1, 0, 3)


[docs]class PoseBody: """ Class for body data of a pose. Parameters ---------- fps : float Frames per second. data: Data in the format (Frames, People, Points, Dims) e.g., (93, 1, 137, 2). confidence: Confidence data in the format (Frames, People, Points) e.g., (93, 1, 137). """ tensor_reader = 'ABSTRACT-DO-NOT-USE' def __init__(self, fps: float, data, confidence): """Initialize a PoseBody instance.""" self.fps = fps self.data = data # Shape (Frames, People, Points, Dims) - eg (93, 1, 137, 2) self.confidence = confidence # Shape (Frames, People, Points) - eg (93, 1, 137)
[docs] @classmethod def read(cls, header: PoseHeader, reader: BufferReader, **kwargs) -> "PoseBody": """ Reads pose data a buffer (BufferReader) based on the header's version. Parameters ---------- header : PoseHeader Header containing the version of its pose data. reader : BufferReader Buffer from which to read the pose data. **kwargs : dict Additional parameters for reading specific versions. Returns ------- PoseBody PoseBody object initialized with the read data. Raises ------ NotImplementedError If header's version is not supported / unknown. """ if header.version == 0: return cls.read_v0_0(header, reader, **kwargs) elif round(header.version, 3) == 0.1: return cls.read_v0_1(header, reader, **kwargs) raise NotImplementedError("Unknown version - %f" % header.version)
[docs] @classmethod def read_v0_0(cls, header: PoseHeader, reader: BufferReader, **unused_kwargs): """ reads version 0.0 pose data. Parameters ---------- header : PoseHeader Header containing the version of the pose data. reader : BufferReader Buffer from which to read the pose data. unused_kwargs : dict Unused additional parameters for this version. Raises ------ NotImplementedError method for this version is not implemented. """ raise NotImplementedError("'read_v0_0' not implemented on '%s'" % cls.__class__)
[docs] @classmethod def read_v0_1_frames(cls, frames: int, shape: List[int], reader: BufferReader, start_frame: int = None, end_frame: int = None): """ Reads frame data for version 0.1 from a buffer. Parameters ---------- frames : int Number of frames in the pose data. shape : List[int] Shape of the pose data. reader : BufferReader Buffer from which to read the pose data. start_frame : int, optional Index of the first frame to read. Default is None. end_frame : int, optional Index of the last frame to read. Default is None. Returns ------- ndarray Array containing the pose data for the specified frames. Raises ------ ValueError If start_frame is greater than number of frames. """ tensor_reader = reader.__getattribute__(cls.tensor_reader) s = ConstStructs.float _frames = frames if start_frame is not None and start_frame > 0: if start_frame >= frames: raise ValueError("Start frame is greater than the number of frames") # Advance to the start frame reader.advance(s, int(np.prod((start_frame, *shape)))) _frames -= start_frame remove_frames = None if end_frame is not None: end_frame = min(end_frame, frames) # Do not allow overflow remove_frames = frames - end_frame _frames -= remove_frames tensor = tensor_reader(ConstStructs.float, shape=(_frames, *shape)) if remove_frames is not None: reader.advance(s, int(np.prod((remove_frames, *shape)))) return tensor
[docs] @classmethod def read_v0_1(cls, header: PoseHeader, reader: BufferReader, start_frame: int = None, end_frame: int = None, **unused_kwargs) -> "PoseBody": """ Reads pose data for version 0.1 from a buffer. Parameters ---------- header : PoseHeader Header containing the version of the pose data. reader : BufferReader Buffer from which to read the pose data. start_frame : int, optional Index of the first frame to read. Default is None. end_frame : int, optional Index of the last frame to read. Default is None. **unused_kwargs : dict Unused additional parameters for this version. Returns ------- PoseBody PoseBody object initialized with the read data for version 0.1. """ fps, _frames = reader.unpack(ConstStructs.double_ushort) _people = reader.unpack(ConstStructs.ushort) _points = sum([len(c.points) for c in header.components]) _dims = max([len(c.format) for c in header.components]) - 1 # _frames is defined as short, which sometimes is not enough! TODO change to int _frames = int(reader.bytes_left() / (_people * _points * (_dims + 1) * 4)) data = cls.read_v0_1_frames(_frames, (_people, _points, _dims), reader, start_frame, end_frame) confidence = cls.read_v0_1_frames(_frames, (_people, _points), reader, start_frame, end_frame) return cls(fps, data, confidence)
[docs] def write(self, version: float, buffer: BinaryIO): """ Writes data to a file based on version of spec: in docs/spec. Parameters ---------- version : float Version of the pose data to write. buffer : BinaryIO Buffer to write the pose data to. """ raise NotImplementedError("'write' not implemented on '%s'" % self.__class__)
def __getitem__(self, index): """ Gets a version of the PoseBody data and confidence based on the provided index. Parameters ---------- index : int or slice Index or slice to get data. Returns ------- PoseBody PoseBody object with the sliced data and confidence. """ # Get the sliced data and confidence sliced_data = self.data[index] sliced_confidence = self.confidence[index] # Create a new PoseBody object with the sliced data and confidence return type(self)(self.fps, sliced_data, sliced_confidence)
[docs] def numpy(self): """ Convert the current PoseBody representation to NumpyPoseBody. Returns ------- NumpyPoseBody The converted PoseBody object. Raises ------ NotImplementedError If numpy is not implemented. """ raise NotImplementedError("'numpy' not implemented on '%s'" % self.__class__)
[docs] def torch(self): """ Converts current PoseBody to TorchPoseBody. Returns ------- TorchPoseBody The converted PoseBody object. Raises ------ NotImplementedError If toch is not implemented. """ raise NotImplementedError("'torch' not implemented on '%s'" % self.__class__)
[docs] def tensorflow(self): """ Converts current PoseBody representation to TensorflowPoseBody. Returns ------- TensorflowPoseBody Converted PoseBody object. Raises ------ NotImplementedError If tensorflow is not implemented. """ raise NotImplementedError("'tensorflow' not implemented on '%s'" % self.__class__)
[docs] def flatten(self): """ Converts data from the (Frames, People, Points, Dims) masked representation to an array of points. Every item in the result array contains the following dimensions: 0. Time in milliseconds 1. Person ID 2. Point ID 3. X dimension 4. Y dimension 5. Z dimension (if exists) 6. Pose estimation confidence Returns ------- np.ndarray Array of points with detailed dimensions. Raises ------ NotImplementedError If the method is not implemented for the specific class. """ raise NotImplementedError("'flatten' not implemented on '%s'" % self.__class__)
[docs] def slice_step(self, by: int) -> "PoseBody": """ Slices data by skipping rows. This affects the fps (frames per seconds). Parameters ---------- by : int Take one row every "by" rows. Returns ------- PoseBody PoseBody instance with sliced data. """ new_data = self.data[::by] new_confidence = self.confidence[::by] new_fps = self.fps / by return self.__class__(fps=new_fps, data=new_data, confidence=new_confidence)
[docs] def augment2d(self, rotation_std=0.2, shear_std=0.2, scale_std=0.2): """ Augment 2D data with given standard deviations. Parameters ---------- rotation_std : float, optional Rotation in radians. Default is 0.2. shear_std : float, optional Shear X in percent. Default is 0.2. scale_std : float, optional Scale X in percent. Default is 0.2. Returns ------- PoseBody Augmented PoseBody instance. Note ---- - The method modifies the PoseBody based on shear, rotation, and scaling. - **shear_std** based on https://en.wikipedia.org/wiki/Shear_matrix - **rotation_std** based on https://en.wikipedia.org/wiki/Rotation_matrix - **scale_std** based on https://en.wikipedia.org/wiki/Scaling_(geometry) """ matrix = np.eye(2) # Based on https://en.wikipedia.org/wiki/Shear_matrix if shear_std > 0: shear_matrix = np.eye(2) shear_matrix[0][1] = np.random.normal(loc=0, scale=shear_std, size=1)[0] matrix = np.dot(matrix, shear_matrix) # Based on https://en.wikipedia.org/wiki/Rotation_matrix if rotation_std > 0: rotation_angle = np.random.normal(loc=0, scale=rotation_std, size=1)[0] rotation_cos = np.cos(rotation_angle) rotation_sin = np.sin(rotation_angle) rotation_matrix = np.array([[rotation_cos, -rotation_sin], [rotation_sin, rotation_cos]]) matrix = np.dot(matrix, rotation_matrix) # Based on https://en.wikipedia.org/wiki/Scaling_(geometry) if scale_std > 0: scale_matrix = np.eye(2) scale_matrix[1][1] += np.random.normal(loc=0, scale=scale_std, size=1)[0] matrix = np.dot(matrix, scale_matrix) # Cast to matrix the correct size dim_matrix = np.eye(self.data.shape[-1]) dim_matrix[0:2, 0:2] = matrix return self.matmul(dim_matrix.astype(dtype=np.float32))
[docs] def zero_filled(self) -> __qualname__: """ Creates a new PoseBody instance with data replaced by zeros. Returns ------- PoseBody PoseBody instance with zero-filled data. Raises ------ NotImplementedError If the zero_filled is not implemented on class . """ raise NotImplementedError("'zero_filled' not implemented on '%s'" % self.__class__)
[docs] def matmul(self, matrix: np.ndarray) -> __qualname__: """ Multiplies PoseBody data with a numpy.ndarray matrix. Parameters ---------- matrix : np.ndarray The matrix to multiply the PoseBody data with. Returns ------- PoseBody PoseBody instance with data multiplied by a numpy array. Raises ------ NotImplementedError If the matmul is not implemented in class. """ raise NotImplementedError("'matmul' not implemented on '%s'" % self.__class__)
[docs] def get_points(self, indexes: List[int]) -> __qualname__: """ Get points from PoseBody. Parameters ---------- indexes : List[int] List of point indices to get from PoseBody. Returns ------- PoseBody PoseBody instance containing only choosen points. Raises ------ NotImplementedError If the `get_points` is not implemented in class. """ raise NotImplementedError("'get_points' not implemented on '%s'" % self.__class__)
[docs] def bbox(self, header: PoseHeader) -> __qualname__: """ For computing bounding box of PoseBody. Parameters ---------- header : PoseHeader Header containing the version of the pose data. Returns ------- PoseBody PoseBody instance with bounding box. Raises ------ NotImplementedError If the `bbox` is not implemented in class. """ raise NotImplementedError("'bbox' not implemented on '%s'" % self.__class__)
[docs] def points_perspective(self): """ Give points in PoseBody as a perspective view. Returns ------- PoseBody PoseBody instance with points adjusted for perspective. Raises ------ NotImplementedError If the method is not implemented for the specific class. """ raise NotImplementedError("'points_perspective' not implemented on '%s'" % self.__class__)
[docs] def select_frames(self, frame_indexes: List[int]) -> "PoseBody": """ Selects specific frames from PoseBody object. Parameters ---------- frame_indexes : List[int] List of frame indexes to select. Returns ------- PoseBody PoseBody object containing only the selected frames. Raises ------ IndexError If any of the specified frame indices are out of the valid range for the current PoseBody data. """ data = self.data[frame_indexes] confidence = self.confidence[frame_indexes] return self.__class__(fps=self.fps, data=data, confidence=confidence)
[docs] def frame_dropout_given_percent(self, dropout_percent: float) -> Tuple["PoseBody", List[int]]: """ Drop of frames based on given dropout percentage. Parameters ---------- dropout_percent : float Percentage of frames to drop. Between 0 and 1 (e.g., 0.2 means drop 20% of the frames). Returns ------- Tuple[PoseBody, List[int]] - New PoseBody object with the gotten frames. - List of frame indexes. Note ---- Actual number of dropped frames might be slightly different due to rounding! """ data_len = len(self.data) dropout_number = min(int(data_len * dropout_percent), int(data_len * 0.99)) dropout_indexes = set(sample(range(0, data_len), dropout_number)) select_indexes = [i for i in range(0, data_len) if i not in dropout_indexes] return self.select_frames(select_indexes), select_indexes
[docs] def frame_dropout_uniform(self, dropout_min: float = 0.2, dropout_max: float = 1.0) -> Tuple["PoseBody", List[int]]: """ Randomly drops frames depending on a uniform distribution - given minimum and maximum percentages. Parameters ---------- dropout_min : float, optional Minimum percentage of frames to drop. Default is 0.2. dropout_max : float, optional Maximum percentage of frames to drop. Default is 1.0. Returns ------- Tuple[PoseBody, List[int]] - New PoseBody object with dropped frames. - List of frame indexes that were retained. """ dropout_percent = np.random.uniform(low=dropout_min, high=dropout_max, size=1)[0] return self.frame_dropout_given_percent(dropout_percent)
[docs] def frame_dropout_normal(self, dropout_mean: float = 0.5, dropout_std: float = 0.1) -> Tuple["PoseBody", List[int]]: """ drop frames depending on normal distribution with given mean and standard deviation. Parameters ---------- dropout_mean : float, optional Mean percentage of frames to drop. Default is 0.5. dropout_std : float, optional Standard deviation of percentage of frames to drop. Default is 0.1. Returns ------- Tuple[PoseBody, List[int]] - New PoseBody object with dropped frames. - List of retrieved frame indexes. """ dropout_percent = np.abs(np.random.normal(loc=dropout_mean, scale=dropout_std, size=1))[0] return self.frame_dropout_given_percent(dropout_percent)