"""Transect module for pyorc."""
import numpy as np
import xarray as xr
from shapely import geometry
from xarray.core import utils
from pyorc import helpers
from .cross_section import CrossSection
from .orcbase import ORCBase
from .plot import _Transect_PlotMethods
[docs]@xr.register_dataset_accessor("transect")
class Transect(ORCBase):
"""Transect functionalities that can be applied on ``xarray.Dataset``."""
[docs] def __init__(self, xarray_obj):
"""Initialize a transect ``xarray.Dataset`` containing cross-sectional velocities in [time, points] dimensions.
Parameters
----------
xarray_obj: xr.Dataset
transect data fields (from ``pyorc.Velocimetry.get_transect``)
"""
super(Transect, self).__init__(xarray_obj)
@property
def cross_section(self):
"""Return cross-sectional coordinates as `CrossSection` object."""
if not hasattr(self._obj, "zcoords"):
return None
coords = [[_x, _y, _z] for _x, _y, _z in zip(self._obj.xcoords, self._obj.ycoords, self._obj.zcoords)]
return CrossSection(camera_config=self.camera_config, cross_section=coords)
@property
def wetted_surface_polygon(self) -> geometry.MultiPolygon:
"""Return wetted surface as `shapely.geometry.MultiPolygon` object."""
return self.cross_section.get_wetted_surface_sz(self.h_a)
@property
def wetted_perimeter_linestring(self) -> geometry.MultiLineString:
"""Return wetted perimeter as `shapely.geometry.MultiLineString` object."""
return self.cross_section.get_wetted_surface_sz(self.h_a, perimeter=True)
@property
def wetted_surface(self) -> float:
"""Return wetted surface as float."""
return self.wetted_surface_polygon.area
@property
def wetted_perimeter(self) -> float:
"""Return wetted perimeter as float."""
return self.wetted_perimeter_linestring.length
[docs] def vector_to_scalar(self, v_x="v_x", v_y="v_y"):
"""Set "v_eff" and "v_dir" variables as effective velocities over cross-section, and its angle.
Parameters
----------
v_x : str, optional
name of variable containing x-directional velocities. (default: "v_x")
v_y :
name of variable containing y-directional velocities. (default: "v_y")
Returns
-------
da : xr.DataArray
velocities perpendicular to cross section [time, points]
"""
# compute per velocity vector in the dataset, what its angle is
v_angle = np.arctan2(self._obj[v_x], self._obj[v_y])
# compute the scalar value of velocity
v_scalar = (self._obj[v_x] ** 2 + self._obj[v_y] ** 2) ** 0.5
# compute difference in angle between velocity and perpendicular of cross section
flow_dir = self._obj["v_dir"]
angle_diff = v_angle - flow_dir
# compute effective velocity in the flow direction (i.e. perpendicular to cross section)
v_eff = np.cos(angle_diff) * v_scalar
v_eff.attrs = {
"standard_name": "velocity",
"long_name": "velocity in perpendicular direction of cross section, measured by angle in radians, "
"measured from up-direction",
"units": "m s-1",
}
# set name
v_eff.name = "v_eff_nofill" # there still may be gaps in this series
self._obj["v_eff_nofill"] = v_eff
[docs] def get_bottom_surface_z_perspective(self, h, sample_size=1000, interval=None):
"""Return densified bottom and surface points, warped to image perspective."""
# get bottom coordinates
bottom_points = self.get_transect_perspective(within_image=True)
# get surface coordinates
surface_points = self.get_transect_perspective(h=h, within_image=True)
# densify points, to ensure zero water level crossings are captured
bottom_points = helpers.densify_points(bottom_points, sample_size=sample_size)
surface_points = helpers.densify_points(surface_points, sample_size=sample_size)
# also densify to the same amount with zcoords
z_points = helpers.densify_points(self._obj.zcoords, sample_size=sample_size)
if interval is not None:
# only sample every interval-th point
bottom_points = bottom_points[::interval]
surface_points = surface_points[::interval]
z_points = z_points[::interval]
# understand which points are below surface
z_surface = h - self.camera_config.gcps["h_ref"] + self.camera_config.gcps["z_0"]
mask = z_points < z_surface
# filter bottom and surface
bottom_points = np.array(bottom_points)[mask]
surface_points = np.array(surface_points)[mask]
return bottom_points, surface_points
[docs] def get_transect_perspective(self, h=None, within_image=True):
"""Get row, col locations of the transect coordinates.
Parameters
----------
h : float, optional
Water level (measured locally) used to calculate surface coordinates. If not provided, bottom coordinates.
are used.
within_image : bool, optional
If True (default), removes points outside the camera objective.
Returns
-------
numpy.ndarray
array of projected transect points based on the camera configuration and at given water level (if provided).
"""
x = self._obj.xcoords
y = self._obj.ycoords
if h is not None:
z_surface = h - self.camera_config.gcps["h_ref"] + self.camera_config.gcps["z_0"]
z = np.ones(len(x)) * z_surface
# retrieve coordinates at the surface
else:
z = self._obj.zcoords # retrieve the bottom coordinates
points = [[_x, _y, _z] for _x, _y, _z in zip(x, y, z)]
# ensure that y coordinates are in the right direction
points_proj = self.camera_config.project_points(points, within_image=within_image, swap_y_coords=True)
return points_proj
[docs] def get_depth_perspective(self, h, sample_size=1000, interval=25):
"""Get line (x, y) pairs that show the depth over several intervals in the wetted part of the cross section.
Parameters
----------
h : float
The water level with which the depth perspective needs to be calculated.
sample_size : int, optional
The number of samples to create by interpolating the cross section (default is 1000).
interval : int, optional
The interval between extended samples (default is 25).
Returns
-------
List of (x, y) tuple pairs.
Each tuple pair defines one perspective depth line.
"""
bottom_points, surface_points = self.get_bottom_surface_z_perspective(
h=h, sample_size=sample_size, interval=interval
)
# make line pairs
return list(zip(bottom_points, surface_points))
[docs] def get_v_surf(self, v_name="v_eff"):
"""Compute mean surface velocity in locations that are below water level.
Parameters
----------
v_name : str, optional
name of variable where surface velocities [m s-1] are stored (Default value = "v_eff")
Returns
-------
xr.DataArray
mean surface velocities for all provided quantiles or time steps
"""
## Mean velocity over entire profile
z_a = self.camera_config.h_to_z(self.h_a)
depth = z_a - self._obj.zcoords
depth[depth < 0] = 0.0
# ds.transect.camera_config.get_depth(ds.zcoords, ds.transect.h_a)
wet_scoords = self._obj.scoords[depth > 0].values
if len(wet_scoords) == 0:
# no wet points found. Velocity can only be missing
v_av = np.nan
if len(wet_scoords) > 1:
velocity_int = self._obj[v_name].fillna(0.0).integrate(coord="scoords") # m2/s
width = (wet_scoords[-1] + (wet_scoords[-1] - wet_scoords[-2]) * 0.5) - (
wet_scoords[0] - (wet_scoords[1] - wet_scoords[0]) * 0.5
)
v_av = velocity_int / width
else:
v_av = self._obj[v_name][:, depth > 0]
return v_av
[docs] def get_v_bulk(self, q_name="q"):
"""Compute the bulk velocity.
Parameters
----------
q_name : str, optional
name of variable where depth integrated velocities [m2 s-1] are stored (Default value = "q")
Returns
-------
xr.DataArray
bulk velocities for all provided quantiles or time steps
"""
discharge = self._obj[q_name].fillna(0.0).integrate(coord="scoords")
wet_surf = self.wetted_surface
v_bulk = discharge / wet_surf
return v_bulk
[docs] def get_river_flow(self, q_name="q", discharge_name="river_flow"):
"""Integrate time series of depth averaged velocities [m2 s-1] into cross-section integrated flow [m3 s-1].
Depth average velocities must first have been estimated using get_q. A variable "Q" will be added to Dataset
with only "quantiles" as dimension.
Parameters
----------
q_name : str, optional
name of variable where depth integrated velocities [m2 s-1] are stored (Default value = "q")
discharge_name : str, optional
name of variable where resulting width integrated river flow estimates must be stored
(Default value = "river_flow")
"""
if "q" not in self._obj:
raise ValueError(
f'Dataset must contain variable "{q_name}", which is the depth-integrated velocity [m2 s-1], '
"perpendicular to cross-section. Create this with ds.transect.get_q"
)
# integrate over the distance coordinates (s-coord)
discharge = self._obj[q_name].fillna(0.0).integrate(coord="scoords")
discharge.attrs = {
"standard_name": "river_discharge",
"long_name": "River Flow",
"units": "m3 s-1",
}
# set name
discharge.name = "Q"
self._obj[discharge_name] = discharge
[docs] def get_q(self, v_corr=0.9, fill_method="zeros"):
"""Integrated velocity over depth for quantiles of time series.
A correction `v_corr` between surface velocity and depth-average velocity is used.
Parameters
----------
v_corr : float, optional
correction factor (default: 0.9)
fill_method : str, optional
method to fill missing values. "zeros" fills NaNS with zeros, "interpolate" interpolates values
from nearest neighbour, "log_interp" interpolates values linearly with velocities scaled by the log of
depth over a roughness length, "log_fit" fits a 4-parameter logarithmic profile with depth and with
changing velocities towards banks on known velocities, and fills missing with the fitted relationship
(experimental) (Default value = "zeros").
Returns
-------
ds : xr.Dataset
Transect for selected quantiles in time, with "q_nofill" containing the integrated values, and "q" the
integrated values, filled with chosen fill method.
"""
# aggregate to a limited set of quantiles
assert fill_method in [
"zeros",
"log_fit",
"log_interp",
"interpolate",
], f'fill_method must be "zeros", "log_fit", "log_interp", or "interpolate", instead "{fill_method}" given'
ds = self._obj
x = ds["xcoords"].values
y = ds["ycoords"].values
z = ds["zcoords"].values
# add filled surface velocities with a logarithmic profile curve fit
depth = self.camera_config.get_depth(z, self.h_a)
# make velocities zero where depth is zero
ds["v_eff_nofill"][:, depth <= 0] = 0.0
if fill_method == "zeros":
ds["v_eff"] = ds["v_eff_nofill"].fillna(0.0)
elif fill_method == "log_fit":
dist_shore = self.camera_config.get_dist_shore(x, y, z, self.h_a)
ds["v_eff"] = helpers.velocity_log_fit(ds["v_eff_nofill"], depth, dist_shore, dim="quantile")
elif fill_method == "log_interp":
dist_wall = self.camera_config.get_dist_wall(x, y, z, self.h_a)
ds["v_eff"] = helpers.velocity_log_interp(ds["v_eff_nofill"], dist_wall, dim="quantile")
elif fill_method == "interpolate":
depth = ds.zcoords * 0 + self.camera_config.get_depth(ds.zcoords, self.h_a)
# interpolate gaps in between known values
ds["v_eff"] = ds["v_eff_nofill"].interpolate_na(dim="points")
# anywhere where depth == 0, remove values
ds["v_eff"] = ds["v_eff"].where(depth > 0)
# fill NAs with zeros
ds["v_eff"] = ds["v_eff"].fillna(0.0)
# compute q for both non-filled and filled velocities
ds["q_nofill"] = helpers.depth_integrate(depth, ds["v_eff_nofill"], v_corr=v_corr, name="q_nofill")
ds["q"] = helpers.depth_integrate(depth, ds["v_eff"], v_corr=v_corr, name="q")
return ds
plot = utils.UncachedAccessor(_Transect_PlotMethods)