diff --git a/mikeio/dfsu/_layered.py b/mikeio/dfsu/_layered.py index 4038b4a40..2c6e14199 100644 --- a/mikeio/dfsu/_layered.py +++ b/mikeio/dfsu/_layered.py @@ -1,8 +1,6 @@ from __future__ import annotations from pathlib import Path -from typing import Any, Collection -from functools import wraps -import warnings +from typing import Any, Collection, Sequence, Tuple import numpy as np from mikecore.DfsuFile import DfsuFile, DfsuFileType @@ -92,18 +90,18 @@ def n_z_layers(self): def read( self, *, - items=None, - time=None, + items: str | int | Sequence[str | int] | None = None, + time: int | str | slice | None = None, elements: Collection[int] | None = None, - area=None, - x=None, - y=None, - z=None, - layers=None, - keepdims=False, - dtype=np.float32, - error_bad_data=True, - fill_bad_data_value=np.nan, + area: Tuple[float, float, float, float] | None = None, + x: float | None = None, + y: float | None = None, + z: float | None = None, + layers: int | str | Sequence[int] | None = None, + keepdims: bool = False, + dtype: Any = np.float32, + error_bad_data: bool = True, + fill_bad_data_value: float = np.nan, ) -> Dataset: """ Read data from a dfsu file @@ -121,9 +119,12 @@ def read( Read only data inside (horizontal) area given as a bounding box (tuple with left, lower, right, upper) or as list of coordinates for a polygon, by default None - x, y, z: float, optional - Read only data for elements containing the (x,y,z) points(s), - by default None + x: float, optional + Read only data for elements containing the (x,y,z) points(s) + y: float, optional + Read only data for elements containing the (x,y,z) points(s) + z: float, optional + Read only data for elements containing the (x,y,z) points(s) layers: int, str, list[int], optional Read only data for specific layers, by default None elements: list[int], optional @@ -142,13 +143,7 @@ def read( if dtype not in [np.float32, np.float64]: raise ValueError("Invalid data type. Choose np.float32 or np.float64") - # Open the dfs file for reading - # self._read_dfsu_header(self._filename) dfs = DfsuFile.Open(self._filename) - # time may have changes since we read the header - # (if engine is continuously writing to this file) - # TODO: add more checks that this is actually still the same file - # (could have been replaced in the meantime) single_time_selected, time_steps = _valid_timesteps(dfs, time) @@ -166,19 +161,16 @@ def read( elements = list(elements) n_elems = len(elements) geometry = self.geometry.elements_to_geometry(elements) - if self.is_layered: # and items[0].name == "Z coordinate": - node_ids, _ = self.geometry._get_nodes_and_table_for_elements(elements) - n_nodes = len(node_ids) + node_ids, _ = self.geometry._get_nodes_and_table_for_elements(elements) + n_nodes = len(node_ids) item_numbers = _valid_item_numbers( dfs.ItemInfo, items, ignore_first=self.is_layered ) items = _get_item_info(dfs.ItemInfo, item_numbers, ignore_first=self.is_layered) - if self.is_layered: - # we need the zn item too - item_numbers = [it + 1 for it in item_numbers] - if hasattr(geometry, "is_layered") and geometry.is_layered: - item_numbers.insert(0, 0) + item_numbers = [it + 1 for it in item_numbers] + if layered_data := hasattr(geometry, "is_layered") and geometry.is_layered: + item_numbers.insert(0, 0) n_items = len(item_numbers) deletevalue = self.deletevalue @@ -188,9 +180,7 @@ def read( n_steps = len(time_steps) item0_is_node_based = False for item in range(n_items): - # Initialize an empty data block - if hasattr(geometry, "is_layered") and geometry.is_layered and item == 0: - # and items[item].name == "Z coordinate": + if layered_data and item == 0: item0_is_node_based = True data: np.ndarray = np.ndarray(shape=(n_steps, n_nodes), dtype=dtype) else: @@ -244,7 +234,7 @@ def read( dims = tuple([d for d in dims if d != "element"]) data_list = [np.squeeze(d, axis=-1) for d in data_list] - if hasattr(geometry, "is_layered") and geometry.is_layered: + if layered_data: return Dataset( data_list[1:], # skip zn item time, @@ -260,8 +250,6 @@ def read( ) def _parse_geometry_sel(self, area, layers, x, y, z): - elements = None - if ( (x is not None) or (y is not None) @@ -269,18 +257,11 @@ def _parse_geometry_sel(self, area, layers, x, y, z): or (layers is not None) ): elements = self.geometry.find_index(x=x, y=y, z=z, area=area, layers=layers) - - if ( - (x is not None) - or (y is not None) - or (layers is not None) - or (area is not None) - ): - # selection was attempted if (elements is None) or len(elements) == 0: raise ValueError("No elements in selection!") + return elements - return elements + return None class Dfsu2DV(DfsuLayered): @@ -335,11 +316,6 @@ def plot_vertical_profile( class Dfsu3D(DfsuLayered): - @wraps(GeometryFM3D.to_2d_geometry) - def to_2d_geometry(self): - warnings.warn("Deprecated. Use geometry2d instead", FutureWarning) - return self.geometry.geometry2d - @property def geometry2d(self): """The 2d geometry for a 3d object""" diff --git a/mikeio/spatial/FM_geometry.py b/mikeio/spatial/FM_geometry.py deleted file mode 100644 index b7e5a8f19..000000000 --- a/mikeio/spatial/FM_geometry.py +++ /dev/null @@ -1,4 +0,0 @@ -from ._FM_geometry import GeometryFM2D - - -GeometryFM = GeometryFM2D # for backward compatibility diff --git a/mikeio/spatial/_FM_geometry.py b/mikeio/spatial/_FM_geometry.py index b8145f561..47f89ceda 100644 --- a/mikeio/spatial/_FM_geometry.py +++ b/mikeio/spatial/_FM_geometry.py @@ -261,6 +261,38 @@ def __init__( if reindex: self._reindex() + def _calc_element_coordinates(self, maxnodes: int = 4) -> np.ndarray: + element_table = self.element_table + + n_elements = len(element_table) + ec = np.empty([n_elements, 3]) + + # pre-allocate for speed + idx = np.zeros(maxnodes, dtype=int) + xcoords = np.zeros([maxnodes, n_elements]) + ycoords = np.zeros([maxnodes, n_elements]) + zcoords = np.zeros([maxnodes, n_elements]) + nnodes_per_elem = np.zeros(n_elements) + + for j in range(n_elements): + nodes = element_table[j] + nnodes = len(nodes) + nnodes_per_elem[j] = nnodes + for i in range(nnodes): + idx[i] = nodes[i] # - 1 + + x, y, z = self.node_coordinates[idx[:nnodes]].T + + xcoords[:nnodes, j] = x + ycoords[:nnodes, j] = y + zcoords[:nnodes, j] = z + + ec[:, 0] = np.sum(xcoords, axis=0) / nnodes_per_elem + ec[:, 1] = np.sum(ycoords, axis=0) / nnodes_per_elem + ec[:, 2] = np.sum(zcoords, axis=0) / nnodes_per_elem + + return ec + def _check_elements( self, element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray], @@ -463,39 +495,6 @@ def _tree2d(self) -> cKDTree: xy = self.element_coordinates[:, :2] return cKDTree(xy) - def _calc_element_coordinates(self) -> np.ndarray: - element_table = self.element_table - - n_elements = len(element_table) - ec = np.empty([n_elements, 3]) - - # pre-allocate for speed - maxnodes = 4 - idx = np.zeros(maxnodes, dtype=int) - xcoords = np.zeros([maxnodes, n_elements]) - ycoords = np.zeros([maxnodes, n_elements]) - zcoords = np.zeros([maxnodes, n_elements]) - nnodes_per_elem = np.zeros(n_elements) - - for j in range(n_elements): - nodes = element_table[j] - nnodes = len(nodes) - nnodes_per_elem[j] = nnodes - for i in range(nnodes): - idx[i] = nodes[i] # - 1 - - x, y, z = self.node_coordinates[idx[:nnodes]].T - - xcoords[:nnodes, j] = x - ycoords[:nnodes, j] = y - zcoords[:nnodes, j] = z - - ec[:, 0] = np.sum(xcoords, axis=0) / nnodes_per_elem - ec[:, 1] = np.sum(ycoords, axis=0) / nnodes_per_elem - ec[:, 2] = np.sum(zcoords, axis=0) / nnodes_per_elem - - return ec - def find_nearest_elements( self, x: float | np.ndarray, diff --git a/mikeio/spatial/_FM_geometry_layered.py b/mikeio/spatial/_FM_geometry_layered.py index 193c41ef5..36f9b7513 100644 --- a/mikeio/spatial/_FM_geometry_layered.py +++ b/mikeio/spatial/_FM_geometry_layered.py @@ -1,9 +1,9 @@ from __future__ import annotations from functools import cached_property -from typing import Collection, Sequence, List +from typing import Any, Collection, Iterable, Literal, Sequence, List, Tuple import numpy as np -from mikecore.DfsuFile import DfsuFileType # type: ignore +from mikecore.DfsuFile import DfsuFileType from ._FM_geometry import GeometryFM2D, _GeometryFM, _GeometryFMPlotter @@ -13,22 +13,24 @@ from ._utils import _relative_cumulative_distance +Layer = Literal["all", "bottom", "top"] + class _GeometryFMLayered(_GeometryFM): def __init__( self, *, - node_coordinates, - element_table, - codes=None, + node_coordinates: np.ndarray, + element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray], + codes: np.ndarray | None = None, projection: str = "LONG/LAT", - dfsu_type=DfsuFileType.Dfsu3DSigma, - element_ids=None, - node_ids=None, + dfsu_type: DfsuFileType = DfsuFileType.Dfsu3DSigma, + element_ids: np.ndarray | None = None, + node_ids: np.ndarray | None = None, n_layers: int = 1, # at least 1 layer - n_sigma=None, - validate=True, - reindex=False, + n_sigma: int | None = None, + validate: bool = True, + reindex: bool = False, ) -> None: super().__init__( @@ -44,13 +46,13 @@ def __init__( ) self._n_layers = n_layers - self._n_sigma: int = n_sigma if n_sigma is not None else n_layers + self._n_sigma = n_sigma if n_sigma is not None else n_layers # Lazy properties - self._bot_elems = None - self._e2_e3_table = None - self._2d_ids = None - self._layer_ids = None + self._bot_elems: np.ndarray | None = None + self._e2_e3_table: np.ndarray | None = None + self._2d_ids: np.ndarray | None = None + self._layer_ids: np.ndarray | None = None def __repr__(self) -> str: return ( @@ -63,18 +65,21 @@ def __repr__(self) -> str: ) @cached_property - def geometry2d(self): + def geometry2d(self) -> GeometryFM2D: return self.to_2d_geometry() - def isel(self, idx: Collection[int], keepdims=False, **kwargs): + def isel( + self, idx: Collection[int], keepdims: bool = False, **kwargs: Any + ) -> GeometryFM3D | GeometryPoint3D | GeometryFM2D | GeometryFMVerticalColumn: - return self.elements_to_geometry( - elements=idx, node_layers=None, keepdims=keepdims - ) + return self.elements_to_geometry(elements=idx, keepdims=keepdims) def elements_to_geometry( - self, elements: int | Collection[int], node_layers="all", keepdims=False - ): + self, + elements: int | Collection[int], + node_layers: Layer = "all", + keepdims: bool = False, + ) -> GeometryFM3D | GeometryPoint3D | GeometryFM2D | GeometryFMVerticalColumn: sel_elements: List[int] if isinstance(elements, (int, np.integer)): @@ -98,7 +103,8 @@ def elements_to_geometry( n_layers = len(unique_layer_ids) if n_layers > 1: - elem_bot = self.get_layer_elements("bottom") + bottom: Layer = "bottom" + elem_bot = self.get_layer_elements(layers=bottom) if np.all(np.in1d(sorted_elements, elem_bot)): n_layers = 1 @@ -108,7 +114,7 @@ def elements_to_geometry( ) and n_layers == 1: new_type = DfsuFileType.Dfsu2D - if n_layers == 1 and node_layers in ("all", None): + if n_layers == 1 and node_layers == "all": node_layers = "bottom" # extract information for selected elements @@ -157,7 +163,7 @@ def elements_to_geometry( ) else: klass = self.__class__ - return klass( + return klass( # type: ignore node_coordinates=node_coords, codes=codes, node_ids=node_ids, @@ -171,45 +177,15 @@ def elements_to_geometry( ) @cached_property - def element_coordinates(self): + def element_coordinates(self) -> np.ndarray: """Center coordinates of each element""" - return self._calc_element_coordinates() - - def _calc_element_coordinates(self): - - node_coordinates = self.node_coordinates - - element_table = self.element_table - - n_elements = len(element_table) - ec = np.empty([n_elements, 3]) - - # pre-allocate for speed - maxnodes = 4 if self.is_2d else 8 - idx = np.zeros(maxnodes, dtype=int) - xcoords = np.zeros([maxnodes, n_elements]) - ycoords = np.zeros([maxnodes, n_elements]) - zcoords = np.zeros([maxnodes, n_elements]) - nnodes_per_elem = np.zeros(n_elements) + return self._calc_element_coordinates(maxnodes=8) - for j in range(n_elements): - nodes = element_table[j] - nnodes = len(nodes) - nnodes_per_elem[j] = nnodes - for i in range(nnodes): - idx[i] = nodes[i] # - 1 - - xcoords[:nnodes, j] = node_coordinates[idx[:nnodes], 0] - ycoords[:nnodes, j] = node_coordinates[idx[:nnodes], 1] - zcoords[:nnodes, j] = node_coordinates[idx[:nnodes], 2] - - ec[:, 0] = np.sum(xcoords, axis=0) / nnodes_per_elem - ec[:, 1] = np.sum(ycoords, axis=0) / nnodes_per_elem - ec[:, 2] = np.sum(zcoords, axis=0) / nnodes_per_elem - - return ec - - def _get_nodes_and_table_for_elements(self, elements, node_layers="all"): + def _get_nodes_and_table_for_elements( + self, + elements: Collection[int] | np.ndarray, + node_layers: Layer = "all", + ) -> Tuple[Any, Any]: """list of nodes and element table for a list of elements Parameters @@ -228,14 +204,12 @@ def _get_nodes_and_table_for_elements(self, elements, node_layers="all"): element table with a list of nodes for each element """ elem_tbl = np.empty(len(elements), dtype=np.dtype("O")) - if (node_layers is None) or (node_layers == "all") or self.is_2d: + if (node_layers == "all") or self.is_2d: for j, eid in enumerate(elements): elem_tbl[j] = np.asarray(self.element_table[eid]) else: # 3D => 2D - if (node_layers != "bottom") and (node_layers != "top"): - raise ValueError("node_layers must be either all, bottom or top") for j, eid in enumerate(elements): elem_nodes = np.asarray(self.element_table[eid]) nn = len(elem_nodes) @@ -246,10 +220,10 @@ def _get_nodes_and_table_for_elements(self, elements, node_layers="all"): elem_nodes = elem_nodes[halfn:] elem_tbl[j] = elem_nodes - nodes = np.unique(np.hstack(elem_tbl)) + nodes = np.unique(np.hstack(elem_tbl)) # type: ignore return nodes, elem_tbl - def to_2d_geometry(self): + def to_2d_geometry(self) -> GeometryFM2D: """extract 2d geometry from 3d geometry Returns @@ -365,7 +339,7 @@ def _elements_in_area(self, area): return np.array([], dtype=int) @staticmethod - def _find_top_layer_elements(elementTable): + def _find_top_layer_elements(elementTable: np.ndarray) -> np.ndarray: """ Find element indices (zero based) of the elements being the upper-most element in its column. @@ -397,13 +371,6 @@ def _find_top_layer_elements(elementTable): topLayerElments.append(i) continue - if len(elmt1) % 2 != 0: - raise Exception( - "In a layered mesh, each element must have an even number of elements (element index " - + i - + ")" - ) - # Number of nodes in a 2D element elmt2DSize = len(elmt1) // 2 @@ -429,7 +396,7 @@ def _find_top_layer_elements(elementTable): return np.array(topLayerElments, dtype=np.int32) @cached_property - def n_layers_per_column(self): + def n_layers_per_column(self) -> np.ndarray: """List of number of layers for each column""" top_elems = self.top_elements @@ -441,11 +408,11 @@ def n_layers_per_column(self): return n_layers_column @cached_property - def bottom_elements(self): + def bottom_elements(self) -> np.ndarray: """List of 3d element ids of bottom layer""" return self.top_elements - self.n_layers_per_column + 1 - def get_layer_elements(self, layers, layer=None): + def get_layer_elements(self, layers: int | Layer | Iterable[int]) -> np.ndarray: """3d element ids for one (or more) specific layer(s) Parameters @@ -460,9 +427,9 @@ def get_layer_elements(self, layers, layer=None): element ids """ if isinstance(layers, str): - if layers in ("surface", "top"): + if layers == "top": return self.top_elements - elif layers in ("bottom"): + elif layers == "bottom": return self.bottom_elements else: raise ValueError( @@ -470,17 +437,20 @@ def get_layer_elements(self, layers, layer=None): ) if not np.isscalar(layers): + assert isinstance(layers, Iterable) elem_ids = [] for layer in layers: + assert isinstance(layer, int) elem_ids.append(self.get_layer_elements(layer)) elem_ids = np.concatenate(elem_ids, axis=0) return np.sort(elem_ids) n_lay = self.n_layers + assert isinstance(layers, int) if layers < (-n_lay) or layers >= n_lay: raise Exception( - f"Layer {layers} not allowed; must be between -{n_lay} and {n_lay-1}" + f"Layer {layers!r} not allowed; must be between -{n_lay} and {n_lay-1}" ) if layers < 0: @@ -489,7 +459,7 @@ def get_layer_elements(self, layers, layer=None): return self._element_ids[self.layer_ids == layers] @property - def e2_e3_table(self): + def e2_e3_table(self) -> np.ndarray: """The 2d-to-3d element connectivity table for a 3d object""" # e2_e3, 2d_ids and layer_ids are all set at the same time @@ -502,7 +472,7 @@ def e2_e3_table(self): return self._e2_e3_table @property - def elem2d_ids(self): + def elem2d_ids(self) -> np.ndarray: """The associated 2d element id for each 3d element""" if self._2d_ids is None: res = self._get_2d_to_3d_association() @@ -511,7 +481,7 @@ def elem2d_ids(self): self._layer_ids = res[2] return self._2d_ids - def _get_2d_to_3d_association(self): + def _get_2d_to_3d_association(self) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: e2_to_e3 = ( [] ) # for each 2d element: the corresponding 3d element ids from bot to top @@ -648,17 +618,17 @@ class GeometryFM3D(_GeometryFMLayered): def __init__( self, *, - node_coordinates, - element_table, - codes=None, + node_coordinates: np.ndarray, + element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray], + codes: np.ndarray | None = None, projection: str = "LONG/LAT", - dfsu_type=DfsuFileType.Dfsu3DSigma, - element_ids=None, - node_ids=None, + dfsu_type: DfsuFileType = DfsuFileType.Dfsu3DSigma, + element_ids: np.ndarray | None = None, + node_ids: np.ndarray | None = None, n_layers: int = 1, # at least 1 layer - n_sigma=None, - validate=True, - reindex=False, + n_sigma: int | None = None, + validate: bool = True, + reindex: bool = False, ) -> None: super().__init__( node_coordinates=node_coordinates, @@ -679,7 +649,7 @@ def __init__( def boundary_polylines(self): return self.geometry2d.boundary_polylines - def contains(self, points) -> Sequence[bool]: + def contains(self, points) -> np.ndarray: return self.geometry2d.contains(points) def to_mesh(self, outfilename): @@ -728,17 +698,17 @@ def find_index(self, x=None, y=None, z=None, coords=None, area=None, layers=None class GeometryFMVerticalProfile(_GeometryFMLayered): def __init__( self, - node_coordinates, - element_table, - codes=None, + node_coordinates: np.ndarray, + element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray], + codes: np.ndarray | None = None, projection: str = "LONG/LAT", - dfsu_type=None, - element_ids=None, - node_ids=None, - n_layers: int = 1, - n_sigma=None, - validate=True, - reindex=False, + dfsu_type: DfsuFileType = DfsuFileType.Dfsu3DSigma, + element_ids: np.ndarray | None = None, + node_ids: np.ndarray | None = None, + n_layers: int = 1, # at least 1 layer + n_sigma: int | None = None, + validate: bool = True, + reindex: bool = False, ) -> None: super().__init__( node_coordinates=node_coordinates, @@ -761,7 +731,7 @@ def relative_element_distance(self): nc0 = self.node_coordinates[0, :2] return _relative_cumulative_distance(ec, nc0, is_geo=self.is_geo) - def get_nearest_relative_distance(self, coords) -> float: + def get_nearest_relative_distance(self, coords: Tuple[float, float]) -> float: """For a point near a transect, find the nearest relative distance for showing position on transect plot. @@ -828,17 +798,17 @@ class GeometryFMVerticalColumn(GeometryFM3D): # TODO: add plotter - def calc_ze(self, zn=None): + def calc_ze(self, zn: np.ndarray | None = None) -> np.ndarray: if zn is None: zn = self.node_coordinates[:, 2] return self._calc_z_using_idx(zn, self._idx_e) - def calc_zf(self, zn=None): + def calc_zf(self, zn: np.ndarray | None = None) -> np.ndarray: if zn is None: zn = self.node_coordinates[:, 2] return self._calc_z_using_idx(zn, self._idx_f) - def _calc_zee(self, zn=None): + def _calc_zee(self, zn: np.ndarray | None = None) -> np.ndarray: ze = self.calc_ze(zn) zf = self.calc_zf(zn) if ze.ndim == 1: @@ -849,7 +819,9 @@ def _calc_zee(self, zn=None): (zf[:, 0].reshape((-1, 1)), ze, zf[:, -1].reshape((-1, 1))) ) - def _interp_values(self, zn, data, z): + def _interp_values( + self, zn: np.ndarray, data: np.ndarray, z: np.ndarray + ) -> np.ndarray: """Interpolate to other z values, allow linear extrapolation""" from scipy.interpolate import interp1d # type: ignore @@ -865,7 +837,7 @@ def _interp_values(self, zn, data, z): return dati @property - def _idx_f(self): + def _idx_f(self) -> np.ndarray: nnodes_half = int(len(self.element_table[0]) / 2) n_vfaces = self.n_elements + 1 idx_f = np.zeros((n_vfaces, nnodes_half), dtype=int) @@ -875,7 +847,7 @@ def _idx_f(self): return idx_f @property - def _idx_e(self): + def _idx_e(self) -> np.ndarray: nnodes_per_elem = len(self.element_table[0]) idx_e = np.zeros((self.n_elements, nnodes_per_elem), dtype=int) @@ -886,7 +858,7 @@ def _idx_e(self): return idx_e - def _calc_z_using_idx(self, zn, idx): + def _calc_z_using_idx(self, zn: np.ndarray, idx: np.ndarray) -> np.ndarray: if zn.ndim == 1: zf = zn[idx].mean(axis=1) elif zn.ndim == 2: diff --git a/tests/test_dfsu.py b/tests/test_dfsu.py index 8647a2b05..5b597993f 100644 --- a/tests/test_dfsu.py +++ b/tests/test_dfsu.py @@ -667,10 +667,6 @@ def test_geometry_2d(): dfs = mikeio.open(filename) - with pytest.warns(FutureWarning, match="geometry2d"): - geom = dfs.to_2d_geometry() - assert geom.is_2d - g2 = dfs.geometry2d assert g2.is_2d