Source code for pyorc.api.transect

"""Transect module for pyorc."""

import numpy as np
import xarray as xr
from xarray.core import utils

from pyorc import helpers

from .cross_section import CrossSection
from .orcbase import ORCBase
from .plot import _Transect_PlotMethods


[docs]@xr.register_dataset_accessor("transect") class Transect(ORCBase): """Transect functionalities that can be applied on ``xarray.Dataset``."""
[docs] def __init__(self, xarray_obj): """Initialize a transect ``xarray.Dataset`` containing cross-sectional velocities in [time, points] dimensions. Parameters ---------- xarray_obj: xr.Dataset transect data fields (from ``pyorc.Velocimetry.get_transect``) """ super(Transect, self).__init__(xarray_obj)
@property def cross_section(self): """Return cross-sectional coordinates as `CrossSection` object.""" if not hasattr(self._obj, "zcoords"): return None coords = [ [_x, _y, _z] for _x, _y, _z in zip(self._obj.xcoords, self._obj.ycoords, self._obj.zcoords, strict=False) ] return CrossSection(camera_config=self.camera_config, cross_section=coords)
[docs] def vector_to_scalar(self, v_x="v_x", v_y="v_y"): """Set "v_eff" and "v_dir" variables as effective velocities over cross-section, and its angle. Parameters ---------- v_x : str, optional name of variable containing x-directional velocities. (default: "v_x") v_y : name of variable containing y-directional velocities. (default: "v_y") Returns ------- da : xr.DataArray velocities perpendicular to cross section [time, points] """ # compute per velocity vector in the dataset, what its angle is v_angle = np.arctan2(self._obj[v_x], self._obj[v_y]) # compute the scalar value of velocity v_scalar = (self._obj[v_x] ** 2 + self._obj[v_y] ** 2) ** 0.5 # compute difference in angle between velocity and perpendicular of cross section flow_dir = self._obj["v_dir"] angle_diff = v_angle - flow_dir # compute effective velocity in the flow direction (i.e. perpendicular to cross section) v_eff = np.cos(angle_diff) * v_scalar v_eff.attrs = { "standard_name": "velocity", "long_name": "velocity in perpendicular direction of cross section, measured by angle in radians, " "measured from up-direction", "units": "m s-1", } # set name v_eff.name = "v_eff_nofill" # there still may be gaps in this series self._obj["v_eff_nofill"] = v_eff
[docs] def get_bottom_surface_z_perspective(self, h, sample_size=1000, interval=None): """Return densified bottom and surface points, warped to image perspective.""" # get bottom coordinates bottom_points = self.get_transect_perspective(within_image=True) # get surface coordinates surface_points = self.get_transect_perspective(h=h, within_image=True) # densify points, to ensure zero water level crossings are captured bottom_points = helpers.densify_points(bottom_points, sample_size=sample_size) surface_points = helpers.densify_points(surface_points, sample_size=sample_size) # also densify to the same amount with zcoords z_points = helpers.densify_points(self._obj.zcoords, sample_size=sample_size) if interval is not None: # only sample every interval-th point bottom_points = bottom_points[::interval] surface_points = surface_points[::interval] z_points = z_points[::interval] # understand which points are below surface z_surface = h - self.camera_config.gcps["h_ref"] + self.camera_config.gcps["z_0"] mask = z_points < z_surface # filter bottom and surface bottom_points = np.array(bottom_points)[mask] surface_points = np.array(surface_points)[mask] return bottom_points, surface_points
[docs] def get_transect_perspective(self, h=None, within_image=True): """Get row, col locations of the transect coordinates. Parameters ---------- h : float, optional Water level (measured locally) used to calculate surface coordinates. If not provided, bottom coordinates. are used. within_image : bool, optional If True (default), removes points outside the camera objective. Returns ------- numpy.ndarray array of projected transect points based on the camera configuration and at given water level (if provided). """ x = self._obj.xcoords y = self._obj.ycoords if h is not None: z_surface = h - self.camera_config.gcps["h_ref"] + self.camera_config.gcps["z_0"] z = np.ones(len(x)) * z_surface # retrieve coordinates at the surface else: z = self._obj.zcoords # retrieve the bottom coordinates points = [[_x, _y, _z] for _x, _y, _z in zip(x, y, z, strict=False)] # ensure that y coordinates are in the right direction points_proj = self.camera_config.project_points(points, within_image=within_image, swap_y_coords=True) return points_proj
[docs] def get_wetted_perspective(self, h, sample_size=1000): """Get wetted polygon in camera perspective. Parameters ---------- h : float The water level value to calculate the surface perspective. sample_size : int, optional The number of points to densify the transect with, by default 1000 Returns ------- ndarray A numpy array containing the points forming the wetted polygon perspective. """ bottom_points, surface_points = self.get_bottom_surface_z_perspective(h=h, sample_size=sample_size) # concatenate points reversing one set for preps of a polygon pol_points = np.concatenate([bottom_points, np.flipud(surface_points)], axis=0) # add the first point at the end to close the polygon pol_points = np.concatenate([pol_points, pol_points[0:1]], axis=0) return pol_points
[docs] def get_depth_perspective(self, h, sample_size=1000, interval=25): """Get line (x, y) pairs that show the depth over several intervals in the wetted part of the cross section. Parameters ---------- h : float The water level with which the depth perspective needs to be calculated. sample_size : int, optional The number of samples to create by interpolating the cross section (default is 1000). interval : int, optional The interval between extended samples (default is 25). Returns ------- List of (x, y) tuple pairs. Each tuple pair defines one perspective depth line. """ bottom_points, surface_points = self.get_bottom_surface_z_perspective( h=h, sample_size=sample_size, interval=interval ) # make line pairs return list(zip(bottom_points, surface_points, strict=False))
[docs] def get_xyz_perspective(self, trans_mat=None, xs=None, ys=None, mask_outside=True): """Get camera-perspective column, row coordinates from cross-section locations. Parameters ---------- trans_mat : np.ndarray, optional perspective transform matrix (Default value = None) xs : np.array, optional x-coordinates to transform, derived from self.x if not provided (Default value = None) ys : y-coordinates to transform, derived from self.y if not provided (Default value = None) mask_outside : values not fitting in the original camera frame are set to NaN (Default value = True) Returns ------- cols : list of ints columns of locations in original camera perspective rows : list of ints rows of locations in original camera perspective """ if xs is None: xs = self._obj.x.values if ys is None: ys = self._obj.y.values # compute bathymetry as measured in local height reference (such as staff gauge) # if self.camera_config.gcps["h_ref"] is None: # h_ref = 0.0 # else: # h_ref = self.camera_config.gcps["h_ref"] hs = self.camera_config.z_to_h(self._obj.zcoords).values # zs = (self._obj.zcoords - self.camera_config.gcps["z_0"] + h_ref).values if trans_mat is None: ms = [self.camera_config.get_M(h, reverse=True, to_bbox_grid=True) for h in hs] else: # use user defined M instead ms = [trans_mat for _ in hs] # compute row and column position of vectors in original reprojected background image col/row coordinates cols, rows = zip( *[ helpers.xy_to_perspective( x, y, self.camera_config.resolution, trans_mat, reverse_y=self.camera_config.shape[0] ) for x, y, trans_mat in zip(xs, ys, ms, strict=False) ], strict=False, ) # ensure y coordinates start at the top in the right orientation shape_y, shape_x = self.camera_shape rows = shape_y - np.array(rows) cols = np.array(cols) if mask_outside: # remove values that do not fit in the frames cols[np.any([cols < 0, cols > self.camera_shape[1]], axis=0)] = np.nan rows[np.any([rows < 0, rows > self.camera_shape[0]], axis=0)] = np.nan return cols, rows
[docs] def get_river_flow(self, q_name="q", discharge_name="river_flow"): """Integrate time series of depth averaged velocities [m2 s-1] into cross-section integrated flow [m3 s-1]. Depth average velocities must first have been estimated using get_q. A variable "Q" will be added to Dataset with only "quantiles" as dimension. Parameters ---------- q_name : str, optional name of variable where depth integrated velocities [m2 s-1] are stored (Default value = "q") discharge_name : str, optional name of variable where resulting width integrated river flow estimates must be stored (Default value = "river_flow") """ if "q" not in self._obj: raise ValueError( f'Dataset must contain variable "{q_name}", which is the depth-integrated velocity [m2 s-1], ' "perpendicular to cross-section. Create this with ds.transect.get_q" ) # integrate over the distance coordinates (s-coord) discharge = self._obj[q_name].fillna(0.0).integrate(coord="scoords") discharge.attrs = { "standard_name": "river_discharge", "long_name": "River Flow", "units": "m3 s-1", } # set name discharge.name = "Q" self._obj[discharge_name] = discharge
[docs] def get_q(self, v_corr=0.9, fill_method="zeros"): """Integrated velocity over depth for quantiles of time series. A correction `v_corr` between surface velocity and depth-average velocity is used. Parameters ---------- v_corr : float, optional correction factor (default: 0.9) fill_method : str, optional method to fill missing values. "zeros" fills NaNS with zeros, "interpolate" interpolates values from nearest neighbour, "log_interp" interpolates values linearly with velocities scaled by the log of depth over a roughness length, "log_fit" fits a 4-parameter logarithmic profile with depth and with changing velocities towards banks on known velocities, and fills missing with the fitted relationship (experimental) (Default value = "zeros"). Returns ------- ds : xr.Dataset Transect for selected quantiles in time, with "q_nofill" containing the integrated values, and "q" the integrated values, filled with chosen fill method. """ # aggregate to a limited set of quantiles assert fill_method in [ "zeros", "log_fit", "log_interp", "interpolate", ], f'fill_method must be "zeros", "log_fit", "log_interp", or "interpolate", instead "{fill_method}" given' ds = self._obj x = ds["xcoords"].values y = ds["ycoords"].values z = ds["zcoords"].values # add filled surface velocities with a logarithmic profile curve fit depth = self.camera_config.get_depth(z, self.h_a) # make velocities zero where depth is zero ds["v_eff_nofill"][:, depth <= 0] = 0.0 if fill_method == "zeros": ds["v_eff"] = ds["v_eff_nofill"].fillna(0.0) elif fill_method == "log_fit": dist_shore = self.camera_config.get_dist_shore(x, y, z, self.h_a) ds["v_eff"] = helpers.velocity_log_fit(ds["v_eff_nofill"], depth, dist_shore, dim="quantile") elif fill_method == "log_interp": dist_wall = self.camera_config.get_dist_wall(x, y, z, self.h_a) ds["v_eff"] = helpers.velocity_log_interp(ds["v_eff_nofill"], dist_wall, dim="quantile") elif fill_method == "interpolate": depth = ds.zcoords * 0 + self.camera_config.get_depth(ds.zcoords, self.h_a) # interpolate gaps in between known values ds["v_eff"] = ds["v_eff_nofill"].interpolate_na(dim="points") # anywhere where depth == 0, remove values ds["v_eff"] = ds["v_eff"].where(depth > 0) # fill NAs with zeros ds["v_eff"] = ds["v_eff"].fillna(0.0) # compute q for both non-filled and filled velocities ds["q_nofill"] = helpers.depth_integrate(depth, ds["v_eff_nofill"], v_corr=v_corr, name="q_nofill") ds["q"] = helpers.depth_integrate(depth, ds["v_eff"], v_corr=v_corr, name="q") return ds
plot = utils.UncachedAccessor(_Transect_PlotMethods)