Source code for pose_format.numpy.pose_body

from typing import BinaryIO, List, Union

import numpy as np
import numpy.ma as ma

from ..pose_body import POINTS_DIMS, PoseBody
from ..pose_header import PoseHeader
from ..utils.reader import BufferReader, ConstStructs

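# Debugging aid (numpy is already imported above as `np`): uncomment the next
# line to make NumPy raise on floating-point errors instead of only warning.
# np.seterr(all='raise')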


class NumPyPoseBody(PoseBody):
    """
    Represents pose information leveraging NumPy operations and structures.

    * Inherits from: `PoseBody`
    * Implements pose info using NumPy operations and structures.
    * Provides methods for operations: matrix multiplication, interpolation and data type conversions

    The `NumPyPoseBody` is an implementation of the `PoseBody` base class.
    This subclass uses NumPy masked arrays to handle pose data, which makes it suitable for
    applications where you need NumPy-based operations.
    The masked arrays allow for efficient handling of missing or invalid pose values.

    The class also comes with methods to transform, modify, and operate on pose data,
    including matrix multiplication, interpolation, and conversions to other data types
    like PyTorch tensors or TensorFlow tensors.

    Parameters
    ----------
    fps : float
        Frames per second, to represent the temporal aspect of pose data.
    data : Union[ma.MaskedArray, np.ndarray]
        Pose data either as a masked array or a regular numpy array.
    confidence : np.ndarray
        Confidence array of the pose keypoints.
    """

    tensor_reader = 'unpack_numpy'
    """Specifies the method name for unpacking a numpy array (Value: 'unpack_numpy')."""

    def __init__(self, fps: float, data: Union[ma.MaskedArray, np.ndarray], confidence: np.ndarray):
        """
        Initializes the NumPyPoseBody instance.

        Every keypoint whose confidence is exactly 0 is masked in `data` across
        all of its dimensions.

        Parameters
        ----------
        fps : float
            Frames per second.
        data : Union[ma.MaskedArray, np.ndarray]
            Pose data; the mask is stacked on axis 3, so a 4-D array is expected.
        confidence : np.ndarray
            Confidence per keypoint; compared against 0 to build the mask.
        """
        # NOTE: `ma.MaskedArray` subclasses `np.ndarray`, so this branch also runs for
        # input that is already masked. With NumPy's default `keep_mask=True`, the
        # confidence-derived mask is then combined with the mask the input already has.
        if isinstance(data, np.ndarray):
            mask = confidence == 0  # True (masked) where the keypoint has zero confidence
            stacked_mask = np.stack([mask] * data.shape[-1], axis=3)
            data = ma.masked_array(data, mask=stacked_mask)

        super().__init__(fps, data, confidence)

    @classmethod
    def read_v0_0(cls, header: PoseHeader, reader: BufferReader, **unused_kwargs):
        """
        Reads pose data from a given buffer reader using a specified data format version
        (see: ``docs/specs``).

        Only the first person of every frame (``pid == 0``) is kept; the remaining
        people are read from the buffer but discarded. A frame without any person is
        represented by all-zero data and all-zero confidence.

        Parameters
        ----------
        header : PoseHeader
            Pose header information.
        reader : BufferReader
            Binary buffer reader.
        **unused_kwargs
            Accepted and ignored.

        Returns
        -------
        NumPyPoseBody
            Instance of NumPyPoseBody with read pose data.
        """
        fps, _frames = reader.unpack(ConstStructs.double_ushort)

        # The last entry of every component format is the confidence channel
        _dims = max(len(c.format) for c in header.components) - 1
        _points = sum(len(c.points) for c in header.components)

        frames_d = []
        frames_c = []
        for _ in range(_frames):
            _people = reader.unpack(ConstStructs.ushort)
            people_d = []
            people_c = []
            for pid in range(_people):
                reader.advance(ConstStructs.short)  # Skip Person ID
                person_d = []
                person_c = []
                for component in header.components:
                    points = np.array(
                        reader.unpack_numpy(ConstStructs.float, (len(component.points), len(component.format))))
                    dimensions, confidence = np.split(points, [-1], axis=1)
                    boolean_confidence = np.where(confidence > 0, 0, 1)  # To create the mask
                    mask = np.column_stack(tuple([boolean_confidence] * (len(component.format) - 1)))

                    person_d.append(ma.masked_array(dimensions, mask=mask))
                    person_c.append(np.squeeze(confidence, axis=-1))

                # Every person has to be consumed from the buffer, but only the first is stored
                if pid == 0:
                    people_d.append(ma.concatenate(person_d))
                    people_c.append(np.concatenate(person_c))

            # In case no person, should all be zeros
            if not people_d:
                people_d.append(np.zeros((_points, _dims)))
                people_c.append(np.zeros(_points))

            frames_d.append(ma.stack(people_d))
            frames_c.append(np.stack(people_c))

        # Confidence is a plain ndarray (not a masked array), as the constructor documents
        return cls(fps, ma.stack(frames_d), np.stack(frames_c))

    def write(self, version: float, buffer: BinaryIO):
        """
        Writes pose data to a binary buffer using specified data format version.

        Writes a header packed with ``ConstStructs.triple_ushort`` (fps, frame count,
        people count), followed by the unmasked data and the confidence, both as
        float32 bytes.

        Parameters
        ----------
        version : float
            Version of the data format (not used by this implementation).
        buffer : BinaryIO
            The binary buffer to write to.
        """
        _frames, _people, _points, _dims = self.data.shape
        # Frame counts of 65535 or more are written as 0 in the header
        _frames = _frames if _frames < 65535 else 0  # TODO change from short to int

        buffer.write(ConstStructs.triple_ushort.pack(self.fps, _frames, _people))
        buffer.write(np.array(self.data.data, dtype=np.float32).tobytes())
        buffer.write(np.array(self.confidence, dtype=np.float32).tobytes())

    @property
    def mask(self):
        """
        Returns mask associated with data.
        """
        return self.data.mask

    def torch(self):
        """
        Converts current instance into a TorchPoseBody instance.

        The raw (unmasked) data values and the confidence are handed over as tensors.

        Returns
        -------
        TorchPoseBody
            The pose body data represented in PyTorch tensors.

        Raises
        ------
        ImportError
            If torch is not installed.
        """
        try:
            import torch
        except ImportError as err:
            raise ImportError("Please install torch. https://pytorch.org/") from err

        from ..torch.pose_body import TorchPoseBody

        torch_confidence = torch.from_numpy(self.confidence)
        torch_data = torch.from_numpy(self.data.data)

        return TorchPoseBody(self.fps, torch_data, torch_confidence)

    def tensorflow(self):
        """
        Converts current instance into a TensorflowPoseBody instance.

        The raw (unmasked) data values and the confidence are handed over as constants.

        Returns
        -------
        TensorflowPoseBody
            Pose body data represented in TensorFlow tensors.

        Raises
        ------
        ImportError
            If tensorflow is not installed.
        """
        try:
            import tensorflow
        except ImportError as err:
            raise ImportError("Please install tensorflow. https://www.tensorflow.org/install") from err

        from ..tensorflow.pose_body import TensorflowPoseBody

        tf_confidence = tensorflow.constant(self.confidence)
        tf_data = tensorflow.constant(self.data.data)

        return TensorflowPoseBody(self.fps, tf_data, tf_confidence)

    def zero_filled(self):
        """
        Fills missing values with zeros.

        The data is replaced in place: masked entries get the value 0 while the mask
        itself is kept.

        Returns
        -------
        NumPyPoseBody
            This same instance, with the changed pose body data.
        """
        self.data = ma.array(self.data.filled(0), mask=self.data.mask)
        return self

    def matmul(self, matrix: np.ndarray):
        """
        Performs matrix multiplication on pose data.

        Parameters
        ----------
        matrix : np.ndarray
            Matrix to multiply the pose data with.

        Returns
        -------
        NumPyPoseBody
            New, transformed pose body data (confidence is reused unchanged).
        """
        data = ma.dot(self.data, matrix)
        return NumPyPoseBody(self.fps, data, self.confidence)

    def flip(self, axis=0):
        """
        Flips pose data across a specified axis.

        The values of the chosen dimension are negated; all other dimensions are kept.

        Parameters
        ----------
        axis : int, optional
            Index, within the last axis of the data, of the dimension to negate.

        Returns
        -------
        NumPyPoseBody
            New, flipped pose body data.
        """
        vec = np.ones(self.data.shape[-1])
        vec[axis] = -1
        data = self.data * vec
        return NumPyPoseBody(self.fps, data, self.confidence)

    def points_perspective(self):
        """
        Transforms pose data to get a perspective based on points.

        Returns
        -------
        ma.MaskedArray
            Pose data transposed with the `POINTS_DIMS` axes order.
        """
        return ma.transpose(self.data, axes=POINTS_DIMS)

    def get_points(self, indexes: List[int]):
        """
        Get points (keypoints) based on given indexes.

        Parameters
        ----------
        indexes : List[int]
            List of indices representing the keypoints to get.

        Returns
        -------
        NumPyPoseBody
            New pose body data containing only the specified points.
        """
        # Bring the points axis to the front, select, then transpose back
        data = ma.transpose(self.data, axes=POINTS_DIMS)
        new_data = ma.transpose(data[indexes], axes=POINTS_DIMS)

        confidence_reshape = (2, 1, 0)
        confidence = np.transpose(self.confidence, axes=confidence_reshape)
        new_confidence = np.transpose(confidence[indexes], axes=confidence_reshape)

        return NumPyPoseBody(self.fps, new_data, new_confidence)

    def bbox(self, header: PoseHeader):
        """
        Computes the bounding boxes for each component based on the pose data.

        Every component of the header contributes two points: the per-dimension
        minimum and the per-dimension maximum of its keypoints. The confidence of a
        box point is 0 when it is masked and 1 otherwise.

        Parameters
        ----------
        header : PoseHeader
            Pose header information.

        Returns
        -------
        NumPyPoseBody
            Pose body data representing bounding boxes.
        """
        data = ma.transpose(self.data, axes=POINTS_DIMS)

        # Split data by components, `ma` doesn't support ".split"
        components = []
        idx = 0
        for component in header.components:
            end = idx + len(component.points)
            components.append(data[list(range(idx, end))])
            idx = end

        boxes = [ma.stack([ma.min(c, axis=0), ma.max(c, axis=0)]) for c in components]
        boxes_cat = ma.concatenate(boxes)
        if isinstance(boxes_cat.mask, np.bool_):  # Sometimes, it doesn't concatenate the mask...
            boxes_mask = ma.concatenate([b.mask for b in boxes])
            boxes_cat = ma.array(boxes_cat, mask=boxes_mask)

        new_data = ma.transpose(boxes_cat, axes=POINTS_DIMS)

        # The mask of the first dimension decides the confidence of a box point.
        # Indexing (rather than split + squeeze) works for any number of dimensions,
        # and `getmaskarray` always yields a full boolean array.
        confidence_mask = ma.getmaskarray(new_data)[..., 0]
        confidence = np.where(confidence_mask, 0, 1)

        return NumPyPoseBody(self.fps, new_data, confidence)

    def interpolate(self, new_fps: int = None, kind='cubic'):
        """
        Interpolates the pose data to match a new frame rate.

        Every keypoint of every person is interpolated independently, together with
        its confidence, over the frames in which it is not masked. Frames outside the
        range of known frames of a keypoint, and keypoints without any known frame,
        are filled with zeros (including a zero confidence).

        Parameters
        ----------
        new_fps : int, optional
            The desired frame rate for interpolation. Defaults to the current fps.
        kind : str, optional
            The type of interpolation. Options include: "linear", "quadratic", and "cubic".
            A simpler kind is used when a keypoint has too few known frames.

        Returns
        -------
        NumPyPoseBody
            Interpolated pose body data.

        Raises
        ------
        ImportError
            If scipy is not installed.
        ValueError
            If the pose has a single frame.
        """
        try:
            from scipy.interpolate import interp1d
        except ImportError as err:
            raise ImportError("Please install scipy with: pip install scipy") from err

        if new_fps is None:
            new_fps = self.fps

        _frames = self.data.shape[0]
        if _frames == 1:
            raise ValueError("Can't interpolate single frame")

        _new_frames = round(_frames * new_fps / self.fps)
        steps = np.linspace(0, 1, _frames)
        new_steps = np.linspace(0, 1, _new_frames)

        transposed = self.points_perspective()  # (points, people, frames, dims)

        masked_confidence = ma.array(self.confidence, mask=self.confidence == 0)
        confidence = ma.expand_dims(masked_confidence.transpose(), axis=3)  # (points, people, frames, 1)
        points = ma.concatenate([transposed, confidence], axis=3)

        new_people = []
        for people in points:
            new_frames = []
            for frames in people:
                mask = frames.transpose()[0].mask
                partial_steps = ma.array(steps, mask=mask).compressed()

                if partial_steps.shape[0] == 0:  # No data for this point
                    new_frames.append(np.zeros((_new_frames, frames.shape[1])))
                else:
                    partial_frames = frames.compressed().reshape(partial_steps.shape[0], frames.shape[1])

                    if len(partial_steps) == 1:
                        # A single known frame can't be interpolated: reuse it as-is
                        def f(_requested_steps, _known=partial_frames):
                            return _known
                    else:
                        this_kind = kind if len(partial_steps) > 3 \
                            else "quadratic" if len(partial_steps) > 2 and kind == "cubic" \
                            else "linear"  # Can't do something fancy for 2 points
                        f = interp1d(partial_steps, partial_frames, axis=0, kind=this_kind)

                    first_step = partial_steps[0]
                    last_step = partial_steps[-1]
                    if first_step == 0 and last_step == 1:
                        new_frames.append(f(new_steps))
                    else:
                        # Only interpolate inside the known range, zero-pad around it
                        first_step_where = np.argwhere(new_steps >= first_step)
                        first_step_index = first_step_where[0][0] if len(first_step_where) > 0 else 0
                        last_step_where = np.argwhere(new_steps > last_step)
                        last_step_index = last_step_where[0][0] if len(last_step_where) > 0 else len(new_steps)
                        if first_step_index == last_step_index:
                            new_frames.append(np.zeros((len(new_steps), frames.shape[1])))
                        else:
                            frame_data = f(new_steps[first_step_index:last_step_index])
                            new_frames.append(
                                np.concatenate([
                                    np.zeros((first_step_index, frames.shape[1])),
                                    np.array(frame_data),
                                    np.zeros((len(new_steps) - last_step_index, frames.shape[1]))
                                ]))

            new_people.append(np.stack(new_frames, axis=0))

        new_data = np.stack(new_people, axis=0).transpose([2, 1, 0, 3])

        # The confidence was appended as the last channel; separate it again
        dimensions, confidence = np.split(new_data, [-1], axis=3)
        confidence = np.squeeze(confidence, axis=3)

        return NumPyPoseBody(fps=new_fps, data=dimensions, confidence=confidence)

    def flatten(self):
        """
        Flattens data and confidence arrays.

        Reshapes the data and confidence arrays into a two-dimensional array in which
        every row holds the index of a keypoint (one column per leading data axis), its
        confidence, and its unmasked values. Rows whose confidence is zero are removed,
        and the first column (the index along the first axis) is divided by the fps.

        Returns
        -------
        numpy.ndarray
            Flattened and filtered version of the data array.
        """
        shape = self.data.shape
        data = self.data.data.reshape(-1, shape[-1])  # Not masked data
        confidence = self.confidence.flatten()
        indexes = list(np.ndindex(shape[:-1]))
        flat = np.c_[indexes, confidence, data]
        # Filter data from flat
        flat = flat[confidence != 0]
        # Scale the first axis by fps
        scalar = np.ones(len(shape) + shape[-1])
        scalar[0] = 1 / self.fps
        return flat * scalar