# Source code for pyorc.api.video

import copy
import cv2
import dask
import dask.array as da
import json

import matplotlib.pyplot as plt
import numpy as np
import os
import warnings
import xarray as xr

from typing import List, Optional, Union

from .. import cv, const
from .cameraconfig import load_camera_config, get_camera_config, CameraConfig


class Video:  # (cv2.VideoCapture)
    def __repr__(self):
        template = """
Filename: {:s}
FPS: {:f}
start frame: {:d}
end frame: {:d}
Camera configuration: {:s}
""".format
        return template(
            self.fn,
            self.fps,
            self.start_frame,
            self.end_frame,
            self.camera_config.__repr__() if hasattr(self, "camera_config") else "none"
        )
    def __init__(
        self,
        fn: str,
        camera_config: Optional[Union[str, CameraConfig]] = None,
        h_a: Optional[float] = None,
        start_frame: Optional[int] = None,
        end_frame: Optional[int] = None,
        freq: Optional[int] = 1,
        stabilize: Optional[List[List]] = None,
    ):
        """
        Video class, inheriting parts from cv2.VideoCapture. It holds a camera configuration, and a start and end
        frame to read from the video. Several methods read frames into memory or into an xr.DataArray with
        attributes. These can then be processed with other pyorc API functionalities.

        Parameters
        ----------
        fn : str
            Locally stored video file
        camera_config : pyorc.CameraConfig, optional
            contains all information about the camera, lens parameters, lens position, ground control points with
            GPS coordinates, and all referencing information (see CameraConfig), needed to reproject frames on a
            horizontal geographically referenced plane.
        h_a : float, optional
            actual water level [m], measured in the local vertical reference during the video (e.g. a staff gauge
            in view of the camera)
        start_frame : int, optional
            first frame to use in analysis (default: 0)
        end_frame : int, optional
            last frame to use in analysis (if not set, the last frame available in the video will be used)
        freq : int, optional
            select one in every `freq` frames (default: 1, i.e. use every frame)
        stabilize : list of lists, optional
            set of coordinates that together encapsulate the polygon defining the mask separating land from water.
            The mask is used to select the (land) region searched for rigid points during stabilization. If not
            set, no stabilization will be performed.
        """
        assert (isinstance(start_frame, (int, type(None)))), 'start_frame must be of type "int"'
        assert (isinstance(end_frame, (int, type(None)))), 'end_frame must be of type "int"'
        # assert (isinstance(stabilize, (list, type(None)))), f'stabilize must contain a list of points, but is {stabilize}'
        self.feats_stats = None
        self.feats_errs = None
        self.ms = None
        self.mask = None
        self.stabilize = stabilize
        if camera_config is not None:
            self.camera_config = camera_config
            # check if h_a is supplied, if so, then also z_0 and h_ref must be available
            if h_a is not None:
                assert (isinstance(self.camera_config.gcps["z_0"], float)), \
                    "h_a was supplied, but the camera config's gcps do not contain z_0, which is needed for " \
                    "dynamic reprojection. You must supply z_0 and h_ref in the camera config's gcps upon making " \
                    "a camera configuration."
                assert (isinstance(self.camera_config.gcps["h_ref"], float)), \
                    "h_a was supplied, but the camera config's gcps do not contain h_ref, which is needed for " \
                    "dynamic reprojection. You must supply z_0 and h_ref in the camera config's gcps upon making " \
                    "a camera configuration."
        # explicitly open file for reading
        cap = cv2.VideoCapture(fn)
        cap.set(cv2.CAP_PROP_ORIENTATION_AUTO, 180.0)
        self.height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        self.width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        if self.stabilize is not None:
            # set a gridded mask based on the roi points
            self.set_mask_from_exterior(self.stabilize)
        # set end and start frame
        self.frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if start_frame is not None:
            if (start_frame > self.frame_count and self.frame_count > 0):
                raise ValueError("Start frame is larger than total amount of frames")
        else:
            start_frame = 0
        if end_frame is not None:
            if end_frame < start_frame:
                raise ValueError(
                    f"Start frame {start_frame} is larger than end frame {end_frame}"
                )
            # end frame cannot be larger than total amount of available frames
            end_frame = np.minimum(end_frame, self.frame_count)
        else:
            end_frame = self.frame_count
        # extract times and frame numbers as far as available
        time, frame_number = cv.get_time_frames(cap, start_frame, end_frame)
        # check if end_frame changed
        if frame_number[-1] != end_frame:
            warnings.warn(f"End frame {end_frame} cannot be read from file. End frame is adapted to {frame_number[-1]}")
            end_frame = frame_number[-1]
        self.end_frame = end_frame
        self.freq = freq
        self.time = time
        self.frame_number = frame_number
        self.start_frame = start_frame
        if self.stabilize is not None:
            # select the right recipe dependent on the movie being fixed or moving
            # recipe = const.CLASSIFY_CAM[self.stabilize] if self.stabilize in const.CLASSIFY_CAM else []
            self.get_ms(cap)
        self.fps = cap.get(cv2.CAP_PROP_FPS)
        self.rotation = cap.get(cv2.CAP_PROP_ORIENTATION_META)
        # set other properties
        self.h_a = h_a
        # make the filename part of the video object
        self.fn = fn
        self._stills = {}  # here all stills are stored lazily
        # nothing to be done at this stage, release file for now.
        cap.release()
        del cap
    @property
    def mask(self):
        """
        Returns
        -------
        np.ndarray
            Mask of region of interest
        """
        return self._mask

    @mask.setter
    def mask(self, mask):
        self._mask = mask

    @property
    def camera_config(self):
        """
        :return: CameraConfig object
        """
        return self._camera_config

    @camera_config.setter
    def camera_config(self, camera_config_input):
        """
        Set camera config as a serializable object from either a filename, json string or a dict

        :param camera_config_input: CameraConfig object, dict, filename string, or json string containing
            camera configuration.
        """
        try:
            if isinstance(camera_config_input, str):
                if os.path.isfile(camera_config_input):
                    # assume string is a file
                    self._camera_config = load_camera_config(camera_config_input)
                else:
                    # try to read CameraConfig from a json string
                    self._camera_config = get_camera_config(camera_config_input)
            elif isinstance(camera_config_input, CameraConfig):
                # set CameraConfig as is
                self._camera_config = camera_config_input
            elif isinstance(camera_config_input, dict):
                # create CameraConfig from dict
                self._camera_config = CameraConfig(**camera_config_input)
        except Exception:
            raise IOError(
                "Could not recognise input as a CameraConfig file, string, dictionary or CameraConfig object."
            )

    @property
    def end_frame(self):
        """
        :return: int, last frame considered in analysis
        """
        return self._end_frame

    @end_frame.setter
    def end_frame(self, end_frame=None):
        # sometimes the last frames cannot be read by OpenCV, hence we always skip the last frame
        if end_frame is None:
            self._end_frame = self.frame_count - 1
        else:
            self._end_frame = min(self.frame_count - 1, end_frame)

    @property
    def freq(self):
        """
        Returns
        -------
        int
            frequency (select one in every nth frames)
        """
        return self._freq

    @freq.setter
    def freq(self, freq=1):
        self._freq = freq

    @property
    def stabilize(self):
        if self._stabilize is not None:
            return self._stabilize
        elif hasattr(self, "camera_config"):
            if hasattr(self.camera_config, "stabilize"):
                return self.camera_config.stabilize

    @stabilize.setter
    def stabilize(self, coords: Optional[List[List]] = None):
        self._stabilize = coords

    @property
    def h_a(self):
        """
        :return: float, actual water level [m] during video
        """
        return self._h_a

    @h_a.setter
    def h_a(self, h_a: float):
        if h_a is not None:
            assert (isinstance(h_a, float)), f"The actual water level must be a float, you supplied a {type(h_a)}"
            if h_a < 0:
                warnings.warn(
                    "Water level is negative. This can be correct, but may be unlikely, especially if you use a "
                    "staff gauge."
                )
        self._h_a = h_a

    @property
    def start_frame(self):
        """
        :return: int, first frame considered in analysis
        """
        return self._start_frame

    @start_frame.setter
    def start_frame(self, start_frame: Optional[int] = None):
        if start_frame is None:
            self._start_frame = 0
        else:
            self._start_frame = start_frame

    @property
    def fps(self):
        """
        :return: float, frames per second
        """
        return self._fps

    @fps.setter
    def fps(self, fps: float):
        if (np.isinf(fps)) or (fps <= 0):
            raise ValueError(f"FPS in video is {fps} which is not a valid value. Repair the video file before use")
        self._fps = fps

    @property
    def corners(self):
        """
        :return: list of 4 lists (int) with [column, row] locations of area of interest in video objective
        """
        return self._corners

    @corners.setter
    def corners(self, corners: List[List]):
        self._corners = corners

    @property
    def rotation(self):
        return self._rotation

    @rotation.setter
    def rotation(self, rotation_code: int):
        """
        Works around a likely bug in OpenCV (4.6.0) where upright videos are rotated in the wrong direction.
        Tested for both 90 and 270 degrees rotated videos on several smartphones (iPhone and Android).
        """
        if rotation_code in [90, 270]:
            self._rotation = cv2.ROTATE_180
        else:
            self._rotation = None
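    # Illustrative sketch of the property validation above (hypothetical values on
    # a `video` instance): `h_a` must be a float and warns when negative; `fps`
    # rejects non-positive or infinite values read from a broken file.
    #
    #     video.h_a = 0.93   # ok
    #     video.h_a = 1      # raises AssertionError: water level must be a float
    #     video.fps = 0.0    # raises ValueError: repair the video file before use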
    def get_frame(
        self,
        n: int,
        method: Optional[str] = "grayscale",
        lens_corr: Optional[bool] = False
    ) -> np.ndarray:
        """
        Retrieve one frame. The frame will be corrected for lens distortion if lens parameters are given.

        Parameters
        ----------
        n : int
            frame number to retrieve
        method : str, optional
            can be "rgb", "grayscale", or "hsv", default: "grayscale"
        lens_corr : bool, optional
            if set to True, lens parameters will be used to undistort the image

        Returns
        -------
        frame : np.ndarray
            2d array (grayscale) or 3d array (rgb/hsv) with frame
        """
        assert (n >= 0), "frame number cannot be negative"
        assert (n - self.start_frame <= self.end_frame - self.start_frame), \
            "frame number is larger than the difference between the start and end frame"
        assert (method in ["grayscale", "rgb", "hsv"]), \
            f'method must be "grayscale", "rgb" or "hsv", method is "{method}"'
        cap = cv2.VideoCapture(self.fn)
        cap.set(cv2.CAP_PROP_POS_FRAMES, n + self.start_frame)
        try:
            ret, img = cap.read()
            if self.rotation is not None:
                img = cv2.rotate(img, self.rotation)
        except Exception:
            raise IOError(f"Cannot read frame {n} from {self.fn}")
        if ret:
            if self.ms is not None:
                # stabilize the frame with the transformation matrix belonging to this frame
                img = cv.transform(img, self.ms[n])
            # apply lens distortion correction
            if hasattr(self, "camera_config"):
                img = cv.undistort_img(img, self.camera_config.camera_matrix, self.camera_config.dist_coeffs)
            if method == "grayscale":
                # apply gray scaling, contrast- and gamma correction
                # img = _corr_color(img, alpha=None, beta=None, gamma=0.4)
                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # mean(axis=2)
            elif method == "rgb":
                # turn bgr to rgb for plotting purposes
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            elif method == "hsv":
                img = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        self.frame_count = n + 1
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        cap.release()
        return img
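    # Illustrative usage sketch: read a single frame as RGB and display it with
    # matplotlib (plt is imported at the top of this module). The `video` instance
    # is assumed to be constructed as shown earlier.
    #
    #     frame = video.get_frame(0, method="rgb")
    #     plt.imshow(frame)
    #     plt.show()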
    def get_frames(self, **kwargs) -> xr.DataArray:
        """
        Get an xr.DataArray, containing a dask array of frames, from `start_frame` until `end_frame`, which is
        read lazily. The xr.DataArray will contain all coordinate variables and attributes needed for further
        processing steps.

        Parameters
        ----------
        **kwargs : dict, optional
            keyword arguments to pass to `get_frame`. Currently only `grayscale` is supported.

        Returns
        -------
        frames : xr.DataArray
            containing all requested frames
        """
        assert (hasattr(self, "_camera_config")), \
            "No camera configuration is set, add it to the video using the .camera_config property"
        # camera_config may be altered for the frames object, so copy below
        camera_config = copy.deepcopy(self.camera_config)
        get_frame = dask.delayed(self.get_frame, pure=True)  # lazy version of get_frame
        # get all listed frames
        frames = [get_frame(n=n, **kwargs) for n, f_number in enumerate(self.frame_number)]
        sample = frames[0].compute()
        data_array = [
            da.from_delayed(frame, dtype=sample.dtype, shape=sample.shape) for frame in frames
        ]
        # undistort source control points
        if hasattr(camera_config, "gcps"):
            camera_config.gcps["src"] = cv.undistort_points(
                camera_config.gcps["src"],
                camera_config.camera_matrix,
                camera_config.dist_coeffs,
            )
        # measure time in seconds to comply with CF conventions
        time = np.array(self.time) * 0.001  # np.arange(len(data_array))*1/self.fps
        # y needs to be flipped up down to match the order of rows followed by coordinate systems (bottom to top)
        y = np.flipud(np.arange(data_array[0].shape[0]))
        x = np.arange(data_array[0].shape[1])
        # perspective column and row coordinate grids
        xp, yp = np.meshgrid(x, y)
        coords = {
            "time": time,
            "y": y,
            "x": x
        }
        if len(sample.shape) == 3:
            coords["rgb"] = np.array([0, 1, 2])
        # make DataArray dimensions and attributes
        dims = tuple(coords.keys())
        attrs = {
            "camera_shape": str([len(y), len(x)]),
            "camera_config": camera_config.to_json(),
            "h_a": json.dumps(self.h_a)
        }
        frames = xr.DataArray(
            da.stack(data_array, axis=0),
            dims=dims,
            coords=coords,
            attrs=attrs
        )[::self.freq]
        del coords["time"]
        if len(sample.shape) == 3:
            del coords["rgb"]
        # add coordinate grids (i.e. without time)
        frames = frames.frames._add_xy_coords([xp, yp], coords, const.PERSPECTIVE_ATTRS)
        frames.name = "frames"
        return frames
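    # Illustrative usage sketch: frames are stacked lazily with dask, so nothing is
    # read from disk until values are requested. Assumes `video` has a camera
    # configuration set.
    #
    #     frames = video.get_frames(method="grayscale")
    #     print(frames.shape)          # (time, y, x), still lazy
    #     first = frames[0].compute()  # only now is the first frame actually read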
    def set_mask_from_exterior(self, exterior):
        """
        Prepare a mask grid with 255 outside of the stabilization polygon and 0 inside

        Parameters
        ----------
        exterior : list of lists
            coordinates defining the polygon for masking

        Returns
        -------
        self.mask : np.ndarray
            mask for stabilization region
        """
        mask_coords = np.array([exterior], dtype=np.int32)
        mask = np.zeros((self.height, self.width), np.uint8)
        mask = cv2.fillPoly(mask, [mask_coords], 255)
        # invert the mask: 0 inside the polygon (water), 255 outside (land)
        mask[mask == 0] = 1
        mask[mask == 255] = 0
        mask[mask == 1] = 255
        self.mask = mask

    def get_ms(self, cap: cv2.VideoCapture, split: Optional[int] = 2):
        # derive per-frame transformation matrices for stabilization from rigid points found on the masked (land) region
        self.ms = cv._get_ms_gftt(
            cap,
            start_frame=self.start_frame,
            end_frame=self.end_frame,
            split=split,
            mask=self.mask,
        )
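# Illustrative sketch of the mask inversion used in set_mask_from_exterior: after
# cv2.fillPoly paints the polygon interior 255, the three assignments swap 0 and
# 255 via a temporary value of 1, leaving water (inside) at 0 and land (outside)
# at 255. Toy example on a 1x4 array:
#
#     m = np.array([0, 255, 255, 0], dtype=np.uint8)
#     m[m == 0] = 1     # -> [1, 255, 255, 1]
#     m[m == 255] = 0   # -> [1, 0, 0, 1]
#     m[m == 1] = 255   # -> [255, 0, 0, 255]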