Source code for pyorc.api.transect

import numpy as np
import xarray as xr

from xarray.core import utils

from pyorc import helpers
from .plot import _Transect_PlotMethods
from .orcbase import ORCBase


[docs]@xr.register_dataset_accessor("transect") class Transect(ORCBase): """Transect functionalities that can be applied on ``xarray.Dataset``"""
[docs] def __init__(self, xarray_obj): """ Initialize a transect ``xarray.Dataset`` containing cross sectional velocities in [time, points] dimensions Parameters ---------- xarray_obj: xr.Dataset transect data fields (from ``pyorc.Velocimetry.get_transect``) """ super(Transect, self).__init__(xarray_obj)
[docs] def vector_to_scalar(self, v_x="v_x", v_y="v_y"): """Set "v_eff" and "v_dir" variables as effective velocities over cross-section, and its angle Parameters ---------- v_x : str, optional name of variable containing x-directional velocities. (default: "v_x") v_y : name of variable containing y-directional velocities. (default: "v_y") Returns ------- da : xr.DataArray velocities perpendicular to cross section [time, points] """ xs = self._obj["x"].values ys = self._obj["y"].values # find points that are located on the area of interest idx = np.isfinite(xs) xs = xs[idx] ys = ys[idx] # compute per velocity vector in the dataset, what its angle is v_angle = np.arctan2(self._obj[v_x], self._obj[v_y]) # compute the scalar value of velocity v_scalar = (self._obj[v_x] ** 2 + self._obj[v_y] ** 2) ** 0.5 # compute difference in angle between velocity and perpendicular of cross section flow_dir = self._obj["v_dir"] angle_diff = v_angle - flow_dir # compute effective velocity in the flow direction (i.e. perpendicular to cross section) v_eff = np.cos(angle_diff) * v_scalar v_eff.attrs = { "standard_name": "velocity", "long_name": "velocity in perpendicular direction of cross section, measured by angle in radians, " "measured from up-direction", "units": "m s-1", } # set name v_eff.name = "v_eff_nofill" # there still may be gaps in this series self._obj["v_eff_nofill"] = v_eff
[docs] def get_xyz_perspective(self, M=None, xs=None, ys=None, mask_outside=True): """Get camera-perspective column, row coordinates from cross-section locations. Parameters ---------- M : np.ndarray, optional perspective transform matrix (Default value = None) xs : np.array, optional x-coordinates to transform, derived from self.x if not provided (Default value = None) ys : y-coordinates to transform, derived from self.y if not provided (Default value = None) mask_outside : values not fitting in the original camera frame are set to NaN (Default value = True) Returns ------- cols : list of ints columns of locations in original camera perspective rows : list of ints rows of locations in original camera perspective """ if xs is None: xs = self._obj.x.values if ys is None: ys = self._obj.y.values # compute bathymetry as measured in local height reference (such as staff gauge) if self.camera_config.gcps["h_ref"] is None: h_ref = 0. else: h_ref = self.camera_config.gcps["h_ref"] hs = self.camera_config.z_to_h(self._obj.zcoords).values # zs = (self._obj.zcoords - self.camera_config.gcps["z_0"] + h_ref).values if M is None: Ms = [self.camera_config.get_M(h, reverse=True, to_bbox_grid=True) for h in hs] else: # use user defined M instead Ms = [M for _ in hs] # compute row and column position of vectors in original reprojected background image col/row coordinates cols, rows = zip(*[ helpers.xy_to_perspective( x, y, self.camera_config.resolution, M, reverse_y=self.camera_config.shape[0] ) for x, y, M in zip(xs, ys, Ms) ]) # ensure y coordinates start at the top in the right orientation shape_y, shape_x = self.camera_shape rows = shape_y - np.array(rows) cols = np.array(cols) if mask_outside: # remove values that do not fit in the frames cols[np.any([cols < 0, cols > self.camera_shape[1]], axis=0)] = np.nan rows[np.any([rows < 0, rows > self.camera_shape[0]], axis=0)] = np.nan return cols, rows
[docs] def get_river_flow(self, q_name="q", Q_name="river_flow"): """Integrate time series of depth averaged velocities [m2 s-1] into cross-section integrated flow [m3 s-1] estimating one or several quantiles over the time dimension. Depth average velocities must first have been estimated using get_q. A variable "Q" will be added to Dataset, with only "quantiles" as dimension. Parameters ---------- q_name : str, optional name of variable where depth integrated velocities [m2 s-1] are stored (Default value = "q") Q_name : str, optional name of variable where resulting width integrated river flow estimates must be stored (Default value = "river_flow") """ if "q" not in self._obj: raise ValueError(f'Dataset must contain variable "{q_name}", which is the depth-integrated velocity [m2 s-1], perpendicular to cross-section. Create this with ds.transect.get_q') # integrate over the distance coordinates (s-coord) Q = self._obj[q_name].fillna(0.0).integrate(coord="scoords") Q.attrs = { "standard_name": "river_discharge", "long_name": "River Flow", "units": "m3 s-1", } # set name Q.name = "Q" self._obj[Q_name] = Q
[docs] def get_q(self, v_corr=0.9, fill_method="zeros"): """Depth integrated velocity for quantiles of time series using a correction v_corr between surface velocity and depth-average velocity. Parameters ---------- v_corr : float, optional correction factor (default: 0.9) fill_method : str, optional method to fill missing values. "zeros" fills NaNS with zeros, "interpolate" interpolates values from nearest neighbour, "log_interp" interpolates values linearly with velocities scaled by the log of depth over a roughness length, "log_fit" fits a 4-parameter logarithmic profile with depth and with changing velocities towards banks on known velocities, and fills missing with the fitted relationship (experimental) (Default value = "zeros"). Returns ------- ds : xr.Dataset Transect for selected quantiles in time, with "q_nofill" containing the integrated values, and "q" the integrated values, filled with chosen fill method. """ # aggregate to a limited set of quantiles assert(fill_method in ["zeros", "log_fit", "log_interp", "interpolate"]),\ f'fill_method must be "zeros", "log_fit", "log_interp", or "interpolate", instead "{fill_method}" given' ds = self._obj x = ds["xcoords"].values y = ds["ycoords"].values z = ds["zcoords"].values # add filled surface velocities with a logarithmic profile curve fit depth = self.camera_config.get_depth(z, self.h_a) # make velocities zero where depth is zero ds["v_eff_nofill"][:, depth<=0] = 0. if fill_method == "zeros": ds["v_eff"] = ds["v_eff_nofill"].fillna(0.) elif fill_method == "log_fit": dist_shore = self.camera_config.get_dist_shore(x, y, z, self.h_a) ds["v_eff"] = helpers.velocity_log_fit(ds["v_eff_nofill"], depth, dist_shore, dim="quantile") elif fill_method == "log_interp": dist_wall = self.camera_config.get_dist_wall(x, y, z, self.h_a) ds["v_eff"] = helpers.velocity_log_interp(ds["v_eff_nofill"], dist_wall, dim="quantile") elif fill_method == "interpolate": depth = ds.zcoords*0 + self.camera_config.get_depth(ds.zcoords, self.h_a) # interpolate gaps in between known values ds["v_eff"] = ds["v_eff_nofill"].interpolate_na(dim="points") # anywhere where depth == 0, remove values ds["v_eff"] = ds["v_eff"].where(depth > 0) # fill NAs with zeros ds["v_eff"] = ds["v_eff"].fillna(0.) # compute q for both non-filled and filled velocities ds["q_nofill"] = helpers.depth_integrate(depth, ds["v_eff_nofill"], v_corr=v_corr, name="q_nofill") ds["q"] = helpers.depth_integrate(depth, ds["v_eff"], v_corr=v_corr, name="q") return ds
plot = utils.UncachedAccessor(_Transect_PlotMethods)