Source code for pylorenzmie.utilities.vmedian

'''Efficient approximation to a running median filter.'''

import numpy as np


[docs] class VMedian: '''Running median of a video stream. Computes an approximate running median using a hierarchical tree of 3-element median buffers. Each level reduces the update rate by a factor of 3, so a tree of depth ``order`` accumulates ``3**(order + 1)`` frames before producing its first output. Parameters ---------- order : int Depth of the median tree. Default: 0 (single 3-frame buffer). shape : tuple of int, optional Shape ``(height, width)`` of the input images. ''' def __init__(self, order: int = 0, shape: tuple | None = None) -> None: self.child = None self.shape = shape self.order = order self.index = 0 self._initialized = False self._cycled = False
[docs] def filter(self, data: np.ndarray) -> np.ndarray: '''Add *data* and return the current median estimate.''' self.add(data) return self.get()
[docs] def get(self, reshape: bool = True) -> np.ndarray: '''Return the current median image. Parameters ---------- reshape : bool If True (default), return array with the original image shape. If False, return the flattened internal buffer. Returns ------- median : ndarray ''' return self._data.reshape(self.shape) if reshape else self._data
[docs] def add(self, data: np.ndarray) -> None: '''Include a new image in the median calculation. Parameters ---------- data : ndarray New image frame. ''' if data.shape != self.shape: self._data = data.astype(np.uint8).ravel() self.shape = data.shape if self.order == 0: self.buffer[self.index, :] = data.astype(np.uint8).ravel() self.index += 1 else: child = self.child child.add(data) if child.initialized: self.buffer[self.index, :] = child.get(reshape=False) if child.cycled: self.index += 1 if self.index == 3: self.index = 0 self._data = np.median(self.buffer, axis=0).astype(np.uint8) self._initialized = True self._cycled = True else: self._cycled = False
[docs] def reset(self) -> None: '''Reset the filter state.''' self._initialized = False self._cycled = False if self.order > 0: self.child.reset()
@property def initialized(self) -> bool: '''True once enough frames have been accumulated.''' return self._initialized @property def cycled(self) -> bool: '''True if the buffer completed a cycle on the last :meth:`add`.''' return self._cycled @property def shape(self) -> tuple | None: return self._shape @shape.setter def shape(self, shape: tuple | None) -> None: self._shape = shape if shape is None: return if self.child is not None: self.child.shape = shape npts = np.prod(shape) self.buffer = np.zeros((3, npts), dtype=np.uint8) self.index = 0 self._initialized = False @property def order(self) -> int: return self._order @order.setter def order(self, order: int) -> None: self._order = int(np.clip(order, 0, 10)) if self._order > 0: self.child = VMedian(order=self._order - 1, shape=self.shape) self._initialized = False
# Lower-case alias for VMedian, kept for backward compatibility.
vmedian = VMedian