Source code for pyorc.api.frames

"""Frames methods for pyorc."""

import copy
from typing import Literal, Optional

import cv2
import matplotlib.pyplot as plt
import numpy as np
import xarray as xr
from ffpiv import window
from matplotlib.animation import FuncAnimation
from tqdm import tqdm

from pyorc import const, cv, helpers, project
from pyorc.velocimetry import ffpiv, openpiv

from .orcbase import ORCBase
from .plot import _frames_plot

__all__ = ["Frames"]


@xr.register_dataarray_accessor("frames")
class Frames(ORCBase):
    """Frames functionalities that can be applied on xr.DataArray."""

    def __init__(self, xarray_obj):
        """Initialize a frames `xr.DataArray`.

        Parameters
        ----------
        xarray_obj : xr.DataArray
            frames data fields (from pyorc.Video.get_frames)

        """
        super(Frames, self).__init__(xarray_obj)

    def get_piv_coords(
        self, window_size: tuple[int, int], search_area_size: tuple[int, int], overlap: tuple[int, int]
    ) -> tuple[dict, dict]:
        """Get Particle Image Velocimetry (PIV) coordinates and mesh grid projections.

        This function calculates the PIV coordinates and the corresponding mesh grid projections based on the
        provided window size, search area size, and overlap. The results include both projected coordinates
        (xp, yp) and the respective longitude and latitude values (if available).

        Parameters
        ----------
        window_size : (int, int)
            The size of the window for the PIV analysis.
        search_area_size : (int, int)
            The size of the search area for the PIV analysis.
        overlap : (int, int)
            The overlap between consecutive windows in pixels.

        Returns
        -------
        coords : dict
            A dictionary containing the PIV local non-geographical projection coordinates:
            - 'y': The y-axis coordinates.
            - 'x': The x-axis coordinates.
        mesh_coords : dict
            A dictionary containing the mesh grid projections and coordinates:
            - 'xp': The projected x-coordinates.
            - 'yp': The projected y-coordinates.
            - 'xs': The x-coordinates in the image space.
            - 'ys': The y-coordinates in the image space.
            - 'lon': The longitude values (if CRS is provided).
            - 'lat': The latitude values (if CRS is provided).

        """
        dim_size = self._obj[0].shape
        # get the cols and rows coordinates of the expected results
        cols_vector, rows_vector = window.get_rect_coordinates(
            dim_size=dim_size,
            window_size=window_size,
            search_area_size=search_area_size,
            overlap=overlap,
        )
        cols, rows = np.meshgrid(cols_vector, rows_vector)
        # retrieve the x and y-axis belonging to the results
        x, y = helpers.get_axes(cols_vector, rows_vector, self._obj.x.values, self._obj.y.values)
        # convert to projected and lat/lon coordinates
        xs, ys = helpers.get_xs_ys(
            cols,
            rows,
            self.camera_config.transform,
        )
        if hasattr(self.camera_config, "crs"):
            lons, lats = helpers.get_lons_lats(xs, ys, self.camera_config.crs)
        else:
            lons = None
            lats = None
        # calculate projected coordinates
        z = self.camera_config.h_to_z(self.h_a)
        zs = np.ones(xs.shape) * z
        xp, yp = self.camera_config.project_grid(xs, ys, zs, swap_y_coords=True)
        # package the coordinates
        coords = {"y": y, "x": x}
        mesh_coords = {"xp": xp, "yp": yp, "xs": xs, "ys": ys, "lon": lons, "lat": lats}
        return coords, mesh_coords
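
    # A minimal sketch of how this helper is typically called (from ``get_piv``); the
    # window sizes below are illustrative and ``da_proj`` is assumed to hold projected
    # frames:
    #
    #   coords, mesh_coords = da_proj.frames.get_piv_coords(
    #       window_size=(64, 64), search_area_size=(64, 64), overlap=(32, 32)
    #   )
    #   coords["y"], coords["x"]              # 1D axes of the PIV result grid
    #   mesh_coords["xs"], mesh_coords["ys"]  # 2D projected coordinates per window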

    def get_piv(
        self,
        window_size: Optional[tuple[int, int]] = None,
        overlap: Optional[tuple[int, int]] = None,
        engine: str = "openpiv",
        **kwargs,
    ) -> xr.Dataset:
        """Perform PIV computation on projected frames.

        Only a lazy computation graph is set up; computing a result (e.g. with ``.load()``) triggers the actual
        computation. The dataset returned contains velocity information for each interrogation window, including
        "v_x" for the x-direction velocity component, "v_y" for the y-direction velocity component, "corr" for the
        maximum correlation [-] found in the interrogation window, and "s2n" [-] for the signal-to-noise ratio.

        Parameters
        ----------
        window_size : (int, int), optional
            size of interrogation windows in pixels (y, x)
        overlap : (int, int), optional
            amount of overlap between interrogation windows in pixels (y, x)
        engine : str, optional
            select the compute engine, can be "openpiv" (default), "numba", or "numpy". "numba" gives the fastest
            performance but is still experimental. It can boost performance by almost an order of magnitude
            compared to "openpiv" or "numpy". Both "numba" and "numpy" use the FF-PIV library as back-end.
        **kwargs : dict
            keyword arguments to pass to the PIV engine. For "numba" and "numpy" the argument `chunks` can be
            provided with an integer defining in how many batches of work the total velocimetry problem should be
            subdivided.

        Returns
        -------
        xr.Dataset
            PIV results in a lazy dask.array form in DataArrays "v_x", "v_y", "corr" and "s2n".

        See Also
        --------
        OpenPIV project: https://github.com/OpenPIV/openpiv-python
        FF-PIV project: https://github.com/localdevices/ffpiv

        """
        camera_config = copy.deepcopy(self.camera_config)
        dt = self._obj["time"].diff(dim="time")
        # use window_size from camera_config unless provided in the method call
        if window_size is not None:
            camera_config.window_size = window_size
        window_size = (
            2 * (camera_config.window_size,)
            if isinstance(camera_config.window_size, int)
            else camera_config.window_size
        )
        # ensure window size is an even number
        window_size = window.round_to_even(window_size)
        search_area_size = window_size
        # set an overlap if not provided
        if overlap is None:
            overlap = 2 * (int(round(camera_config.window_size) / 2),)
        # get all required coordinates for the PIV result
        coords, mesh_coords = self.get_piv_coords(window_size, search_area_size, overlap)
        # provide kwargs for OpenPIV analysis
        if engine == "openpiv":
            import warnings

            warnings.warn(
                '"openpiv" is currently the default engine, but it will be replaced by "numba" in a future release.',
                DeprecationWarning,
                stacklevel=2,
            )
            kwargs = {
                **kwargs,
                "search_area_size": search_area_size[0],
                "window_size": window_size[0],
                "overlap": overlap[0],
                "res_x": camera_config.resolution,
                "res_y": camera_config.resolution,
            }
            ds = openpiv.get_openpiv(self._obj, coords["y"], coords["x"], dt, **kwargs)
        elif engine in ["numba", "numpy"]:
            kwargs = {
                **kwargs,
                "search_area_size": search_area_size,
                "window_size": window_size,
                "overlap": overlap,
                "res_x": camera_config.resolution,
                "res_y": camera_config.resolution,
            }
            ds = ffpiv.get_ffpiv(self._obj, coords["y"], coords["x"], dt, engine=engine, **kwargs)
        else:
            raise ValueError(f"Selected PIV engine {engine} does not exist.")
        # add all 2D coordinates
        ds = ds.velocimetry.add_xy_coords(mesh_coords, coords, {**const.PERSPECTIVE_ATTRS, **const.GEOGRAPHICAL_ATTRS})
        # ensure all metadata is transferred
        ds.attrs = self._obj.attrs
        # in case window_size was changed, overrule the camera_config attribute
        ds.attrs.update(camera_config=camera_config.to_json())
        # set encoding
        ds.velocimetry.set_encoding()
        return ds
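
    # A minimal end-to-end sketch (file name and water level are hypothetical;
    # ``cam_config`` is assumed to be a valid pyorc.CameraConfig with gcp information):
    #
    #   import pyorc
    #   video = pyorc.Video("movie.mp4", camera_config=cam_config, h_a=0.0)
    #   da = video.get_frames().frames.normalize()
    #   da_proj = da.frames.project()
    #   piv = da_proj.frames.get_piv(engine="numba", window_size=(64, 64))
    #   piv = piv.load()  # only now is the PIV computation actually executed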

    def project(
        self, method: Literal["cv", "numpy"] = "cv", resolution: Optional[float] = None, reducer: Optional[str] = "mean"
    ):
        """Project frames into a projected frames object, with information from the camera_config attr.

        This requires that the CameraConfig contains full gcp information. If a CRS is provided, "lat" and "lon"
        variables will also be added to the output, containing geographical latitude and longitude coordinates.

        Parameters
        ----------
        method : str, optional
            can be `numpy` or `cv`. Currently `cv` is still the default but this may change in a future release.
            With `cv` (opencv), resampling is performed by first undistorting images, and then resampling to the
            desired grid. With heavily distorted images and part of the area of interest outside of the field of
            view, the orthoprojection of the corners may end up in the wrong space. With `numpy`, each individual
            orthoprojected grid cell is mapped to the image pixel space. For oversampled areas, this is also done
            vice versa. Undersampled areas result in nearest-neighbour interpolation, whilst oversampled areas
            result in a reduced value (which can be defined by the user). We recommend switching to `numpy` if you
            experience strange results with `cv`.
        resolution : float, optional
            resolution to project to. If not provided, this will be taken from the camera config in the metadata
            (Default value = None)
        reducer : str, optional
            Currently only used when `method="numpy"`. Default "mean", resulting in averaging of oversampled grid
            cells.

        Returns
        -------
        frames : xr.DataArray
            projected frames and x and y in local coordinate system (origin: top-left), lat and lon if a crs was
            provided.

        """
        cc = copy.deepcopy(self.camera_config)
        if resolution is not None:
            cc.resolution = resolution
        # convert bounding box coords into row/column space
        shape = cc.shape
        # prepare y and x axis of target grid
        y = np.flipud(np.linspace(cc.resolution / 2, cc.resolution * (shape[0] - 0.5), shape[0]))
        x = np.linspace(cc.resolution / 2, cc.resolution * (shape[1] - 0.5), shape[1])
        cols, rows = np.meshgrid(np.arange(len(x)), np.arange(len(y)))
        xs, ys = helpers.get_xs_ys(
            cols,
            rows,
            cc.transform,
        )
        if hasattr(cc, "crs"):
            lons, lats = helpers.get_lons_lats(xs, ys, cc.crs)
        else:
            lons = None
            lats = None
        coords = {
            "y": y,
            "x": x,
        }
        # PROJECTION PREPARATIONS
        # =======================
        z = cc.get_z_a(self.h_a)
        if not hasattr(project, f"project_{method}"):
            raise ValueError(f"Selected projection method {method} does not exist.")
        proj_method = getattr(project, f"project_{method}")
        da_proj = proj_method(self._obj, cc, x, y, z, reducer)
        # ensure no missing values persist
        da_proj = da_proj.fillna(0.0)
        # assign coordinates
        da_proj = da_proj.frames.add_xy_coords(
            {"xs": xs, "ys": ys, "lon": lons, "lat": lats}, coords, const.GEOGRAPHICAL_ATTRS
        )
        if "rgb" in da_proj.dims and len(da_proj.dims) == 4:
            # ensure that "rgb" is the last dimension
            da_proj = da_proj.transpose("time", "y", "x", "rgb")
        # in case resolution was changed, overrule the camera_config attribute
        da_proj.attrs.update(camera_config=cc.to_json())
        return da_proj
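
    # Example (assumes ``da`` holds frames from pyorc.Video.get_frames; the 0.02 m
    # resolution is illustrative):
    #
    #   da_proj = da.frames.project(method="numpy", resolution=0.02)
    #   da_proj[0].plot()  # inspect the first orthoprojected (grayscale) frame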

    def landmask(self, dilate_iter=10, samples=15):
        """Attempt to mask out land from water.

        This is done by assuming that the temporal standard deviation over mean of land is much higher than that
        of water. An automatic threshold using Otsu thresholding is used to separate land from water, and a
        dilation operation is used to make the land mask slightly larger than the exactly defined pixels.

        Parameters
        ----------
        dilate_iter : int, optional
            number of dilation iterations to use, to dilate land mask (Default value = 10)
        samples : int, optional
            amount of samples to retrieve from frames for estimating standard deviation and mean. Set to a lower
            number to speed up calculation (Default value = 15)

        Returns
        -------
        da : xr.DataArray
            filtered frames

        """
        time_interval = round(len(self._obj) / samples)
        assert time_interval != 0, f"Amount of frames is too small to provide {samples} samples"
        # ensure attributes are kept
        xr.set_options(keep_attrs=True)
        # compute standard deviation over mean, assuming this value is low over water, and high over land
        std_norm = (self._obj[::time_interval].std(axis=0) / self._obj[::time_interval].mean(axis=0)).load()
        # retrieve a simple 3x3 equal weight kernel
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
        # dilate the std_norm with the provided number of dilation iterations
        dilate_std_norm = cv2.dilate(std_norm.values, kernel, iterations=dilate_iter)
        # rescale result to typical uint8 0-255 range
        img = cv2.normalize(
            dilate_std_norm, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F
        ).astype(np.uint8)
        # threshold with Otsu thresholding
        ret, thres = cv2.threshold(img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        # water is where thres is not 255 (i.e. below the Otsu threshold)
        mask = thres != 255
        # apply the mask on all frames (broadcast over the time dimension)
        return self._obj * mask
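
    # Example (assumes ``da`` holds grayscale frames; a lower ``samples`` speeds up
    # the std/mean estimate at the cost of a noisier mask):
    #
    #   da_water = da.frames.landmask(dilate_iter=10, samples=15)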

    def normalize(self, samples=15):
        """Remove the temporal mean of sampled frames.

        This is typically used to remove non-moving background from foreground, and helps to increase contrast
        when river bottoms are visible, or when the field of view contains partly illuminated and partly shaded
        parts.

        Parameters
        ----------
        samples : int, optional
            amount of samples to retrieve from frames for estimating standard deviation and mean. Set to a lower
            number to speed up calculation, default: 15 (which is normally sufficient and fast enough).

        Returns
        -------
        xr.DataArray
            normalized frames

        """
        time_interval = round(len(self._obj) / samples)
        assert time_interval != 0, f"Amount of frames is too small to provide {samples} samples"
        # ensure attributes are kept
        xr.set_options(keep_attrs=True)
        mean = self._obj[::time_interval].mean(axis=0).load().astype("float32")
        frames_reduce = self._obj.astype("float32") - mean
        frames_min = frames_reduce.min(axis=-1).min(axis=-1)
        frames_max = frames_reduce.max(axis=-1).max(axis=-1)
        frames_norm = ((frames_reduce - frames_min) / (frames_max - frames_min) * 255).astype("uint8")
        return frames_norm
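
    # Example, typically applied before projection to enhance the contrast of tracers:
    #
    #   da_norm = da.frames.normalize(samples=15)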

    def edge_detect(self, wdw_1=1, wdw_2=2):
        """Highlight edges of frame intensities, using a band convolution filter.

        The filter uses two slightly differently convolved images and computes their difference to detect edges.

        Parameters
        ----------
        wdw_1 : int, optional
            window size to use for the first gaussian blur filter (Default value = 1)
        wdw_2 : int, optional
            window size to use for the second gaussian blur filter (Default value = 2)

        Returns
        -------
        xr.DataArray
            filtered frames (i.e. difference between first and second gaussian convolution)

        """
        stride_1 = wdw_1 * 2 + 1
        stride_2 = wdw_2 * 2 + 1
        return xr.apply_ufunc(
            cv._convert_edge,
            self._obj,
            stride_1,
            stride_2,
            input_core_dims=[["y", "x"], [], []],
            output_core_dims=[["y", "x"]],
            vectorize=True,
            dask="parallelized",
            keep_attrs=True,
        )
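
    # Example (window sizes are illustrative; larger windows emphasize larger features):
    #
    #   da_edge = da.frames.edge_detect(wdw_1=2, wdw_2=4)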

    def minmax(self, min=-np.inf, max=np.inf):
        """Minimum / maximum intensity filter.

        All pixel intensities are clipped to a minimum and maximum value.

        Parameters
        ----------
        min : float, optional
            minimum value to bound intensities to. If not provided, no minimum bound is used.
        max : float, optional
            maximum value to bound intensities to. If not provided, no maximum bound is used.

        Returns
        -------
        xr.DataArray
            treated frames

        """
        return np.maximum(np.minimum(self._obj, max), min)
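
    # Example: suppress dark noise by clipping intensities below 5 (the threshold
    # is illustrative):
    #
    #   da_clip = da.frames.minmax(min=5)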

    def reduce_rolling(self, samples=25):
        """Remove a rolling mean from the frames.

        This is a very slow method, so in most cases it is recommended to use Frames.normalize instead.

        Parameters
        ----------
        samples : int, optional
            number of samples per rolling mean (Default value = 25)

        Returns
        -------
        da : xr.DataArray
            filtered frames

        """
        assert len(self._obj) >= samples, f"Amount of frames is smaller than requested rolling of {samples} samples"
        roll_mean = self._obj.rolling(time=samples).mean()
        # ensure attributes are kept
        xr.set_options(keep_attrs=True)
        frames_reduce = self._obj - roll_mean
        frames_thres = np.maximum(frames_reduce, 0)
        # normalize to the uint8 range
        frames_norm = (frames_thres * 255 / frames_thres.max(axis=-1).max(axis=-1)).astype("uint8")
        frames_norm = frames_norm.where(roll_mean != 0, 0)
        return frames_norm
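
    # Example (25-sample rolling mean; Frames.normalize is usually the faster choice):
    #
    #   da_roll = da.frames.reduce_rolling(samples=25)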

    def time_diff(self, thres=2, abs=False):
        """Apply a difference over time.

        The method subtracts frame 1 from frame 2, frame 2 from frame 3, etcetera. It is very efficient at
        highlighting moving objects when the video is very stable. If the video is very unstable, this filter may
        lead to very bad results.

        Parameters
        ----------
        thres : float, optional
            absolute intensity threshold; differences with an absolute value lower than this threshold are set to
            zero (default: 2).
        abs : boolean, optional
            if set to True (default: False), apply the absolute value on the result

        Returns
        -------
        da : xr.DataArray
            filtered frames

        """
        frames_diff = self._obj.astype(np.float32).diff(dim="time")
        frames_diff = frames_diff.where(np.abs(frames_diff) > thres)
        frames_diff.attrs = self._obj.attrs
        frames_diff = frames_diff.fillna(0.0)
        if abs:
            return np.abs(frames_diff)
        return frames_diff
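
    # Example, combined with a prior smoothing step as suggested in ``Frames.smooth``
    # below (the threshold is illustrative):
    #
    #   da_diff = da.frames.smooth(wdw=1).frames.time_diff(thres=2, abs=True)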

    def smooth(self, wdw=1):
        """Smooth each frame with a Gaussian kernel.

        Parameters
        ----------
        wdw : int, optional
            window height or width applied. If set to 1 (default), the total window is 3x3 (i.e. 2 * 1 + 1). When
            set to 2, the total window is 5x5 (i.e. 2 * 2 + 1). Very effective to apply before
            ``Frames.time_diff``. The value for ``wdw`` should be chosen such that the moving features of interest
            are not removed from the view. This can be based on a visual interpretation of a result.

        Returns
        -------
        da : xr.DataArray
            filtered frames

        """
        stride = wdw * 2 + 1
        f = xr.apply_ufunc(
            cv._smooth,
            self._obj,
            stride,
            input_core_dims=[["y", "x"], []],
            output_core_dims=[["y", "x"]],
            output_dtypes=[np.float32],
            vectorize=True,
            dask="parallelized",
            keep_attrs=True,
        )
        return f
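
    # Example: a 5x5 Gaussian window (2 * 2 + 1), e.g. before ``Frames.time_diff``:
    #
    #   da_smooth = da.frames.smooth(wdw=2)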

    def to_ani(
        self,
        fn,
        figure_kwargs=const.FIGURE_ARGS,
        video_kwargs=const.VIDEO_ARGS,
        anim_kwargs=const.ANIM_ARGS,
        progress_bar=True,
        **kwargs,
    ):
        """Store an animation of the frames in the object.

        Parameters
        ----------
        fn : str
            path to file to which animation is stored
        figure_kwargs : dict, optional
            keyword args passed to ``matplotlib.pyplot.figure`` (Default value = const.FIGURE_ARGS)
        video_kwargs : dict, optional
            keyword arguments passed to the ``save`` method of the animation, containing parameters such as the
            frame rate, dpi and codec settings to use. (Default value = const.VIDEO_ARGS)
        anim_kwargs : dict, optional
            keyword arguments passed to ``matplotlib.animation.FuncAnimation`` to control the animation.
            (Default value = const.ANIM_ARGS)
        progress_bar : bool, optional
            print a progress bar while storing video (default: True)
        **kwargs : dict
            keyword arguments to pass to ``matplotlib.pyplot.imshow``

        """

        def init():
            # set imshow data to values in the first frame
            im.set_data(self._obj[0])
            return ax

        def animate(i):
            # set imshow data to values in the current frame
            im.set_data(self._obj[i])
            return ax

        # set up a standard 16/9 borderless window with black background
        f = plt.figure(**figure_kwargs)
        f.set_size_inches(16, 9, True)
        f.patch.set_facecolor("k")
        f.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=None, hspace=None)
        ax = plt.subplot(111)
        im = ax.imshow(self._obj[0], **kwargs)
        if progress_bar:
            frames = tqdm(range(len(self._obj)), position=0, leave=True)
        else:
            frames = range(len(self._obj))
        anim = FuncAnimation(f, animate, init_func=init, frames=frames, **anim_kwargs)
        anim.save(fn, **video_kwargs)
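
    # Example (output path is hypothetical; extra kwargs are passed to plt.imshow):
    #
    #   da.frames.to_ani("animation.mp4", cmap="gray")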

    def to_video(self, fn, video_format=None, fps=None):
        """Write frames to a video file without any layout.

        Frames from the input object are written into a video file. The format and frame rate can be customized as
        per user preference or derived automatically from the input object.

        Parameters
        ----------
        fn : str
            Path to the output video file.
        video_format : cv2.VideoWriter_fourcc, optional
            The desired video file format codec. If not provided, defaults to `cv2.VideoWriter_fourcc(*"mp4v")`.
        fps : float, optional
            Frames per second for the output video. If not specified, it is estimated from the time differences in
            the input frames.

        """
        if video_format is None:
            # set to a default
            video_format = cv2.VideoWriter_fourcc(*"mp4v")
        if fps is None:
            # estimate it from the time differences
            fps = 1 / (self._obj["time"].diff(dim="time").values.mean())
        h = self._obj.shape[1]
        w = self._obj.shape[2]
        out = cv2.VideoWriter(fn, video_format, fps, (w, h))
        pbar = tqdm(self._obj, position=0, leave=True)
        pbar.set_description("Writing frames")
        for n, f in enumerate(pbar):
            if len(f.shape) == 3:
                img = cv2.cvtColor(np.uint8(f.values), cv2.COLOR_RGB2BGR)
            else:
                img = f.values
                if n == 0:
                    # determine a scale between 0 and 255, using the first frame only
                    img_min = img.min(axis=0).min(axis=0)
                    img_max = img.max(axis=0).max(axis=0)
                img = np.uint8(255 * ((img - img_min) / (img_max - img_min)))
                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
            out.write(img)
        out.release()
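
    # Example (output path is hypothetical; fps is derived from the frame timestamps):
    #
    #   da.frames.to_video("frames.mp4")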

    plot = _frames_plot