Skip to content

Commit

Permalink
Misc improvement for layered dfsu
Browse files Browse the repository at this point in the history
  • Loading branch information
ecomodeller committed Mar 16, 2024
1 parent c31fc27 commit dce0125
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 106 deletions.
78 changes: 27 additions & 51 deletions mikeio/dfsu/_layered.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Collection
from functools import wraps
import warnings
from typing import Any, Collection, Sequence, Tuple

import numpy as np
from mikecore.DfsuFile import DfsuFile, DfsuFileType
Expand Down Expand Up @@ -92,18 +90,18 @@ def n_z_layers(self):
def read(
self,
*,
items=None,
time=None,
items: str | int | Sequence[str | int] | None = None,
time: int | str | slice | None = None,
elements: Collection[int] | None = None,
area=None,
x=None,
y=None,
z=None,
layers=None,
keepdims=False,
dtype=np.float32,
error_bad_data=True,
fill_bad_data_value=np.nan,
area: Tuple[float, float, float, float] | None = None,
x: float | None = None,
y: float | None = None,
z: float | None = None,
layers: int | str | Sequence[int] | None = None,
keepdims: bool = False,
dtype: Any = np.float32,
error_bad_data: bool = True,
fill_bad_data_value: float = np.nan,
) -> Dataset:
"""
Read data from a dfsu file
Expand All @@ -121,9 +119,12 @@ def read(
Read only data inside (horizontal) area given as a
bounding box (tuple with left, lower, right, upper)
or as list of coordinates for a polygon, by default None
x, y, z: float, optional
Read only data for elements containing the (x,y,z) points(s),
by default None
x: float, optional
Read only data for elements containing the (x,y,z) points(s)
y: float, optional
Read only data for elements containing the (x,y,z) points(s)
z: float, optional
Read only data for elements containing the (x,y,z) points(s)
layers: int, str, list[int], optional
Read only data for specific layers, by default None
elements: list[int], optional
Expand All @@ -142,13 +143,7 @@ def read(
if dtype not in [np.float32, np.float64]:
raise ValueError("Invalid data type. Choose np.float32 or np.float64")

# Open the dfs file for reading
# self._read_dfsu_header(self._filename)
dfs = DfsuFile.Open(self._filename)
# time may have changes since we read the header
# (if engine is continuously writing to this file)
# TODO: add more checks that this is actually still the same file
# (could have been replaced in the meantime)

single_time_selected, time_steps = _valid_timesteps(dfs, time)

Expand All @@ -166,19 +161,16 @@ def read(
elements = list(elements)
n_elems = len(elements)
geometry = self.geometry.elements_to_geometry(elements)
if self.is_layered: # and items[0].name == "Z coordinate":
node_ids, _ = self.geometry._get_nodes_and_table_for_elements(elements)
n_nodes = len(node_ids)
node_ids, _ = self.geometry._get_nodes_and_table_for_elements(elements)
n_nodes = len(node_ids)

item_numbers = _valid_item_numbers(
dfs.ItemInfo, items, ignore_first=self.is_layered
)
items = _get_item_info(dfs.ItemInfo, item_numbers, ignore_first=self.is_layered)
if self.is_layered:
# we need the zn item too
item_numbers = [it + 1 for it in item_numbers]
if hasattr(geometry, "is_layered") and geometry.is_layered:
item_numbers.insert(0, 0)
item_numbers = [it + 1 for it in item_numbers]
if layered_data := hasattr(geometry, "is_layered") and geometry.is_layered:
item_numbers.insert(0, 0)
n_items = len(item_numbers)

deletevalue = self.deletevalue
Expand All @@ -188,9 +180,7 @@ def read(
n_steps = len(time_steps)
item0_is_node_based = False
for item in range(n_items):
# Initialize an empty data block
if hasattr(geometry, "is_layered") and geometry.is_layered and item == 0:
# and items[item].name == "Z coordinate":
if layered_data and item == 0:
item0_is_node_based = True
data: np.ndarray = np.ndarray(shape=(n_steps, n_nodes), dtype=dtype)
else:
Expand Down Expand Up @@ -244,7 +234,7 @@ def read(
dims = tuple([d for d in dims if d != "element"])
data_list = [np.squeeze(d, axis=-1) for d in data_list]

if hasattr(geometry, "is_layered") and geometry.is_layered:
if layered_data:
return Dataset(
data_list[1:], # skip zn item
time,
Expand All @@ -260,27 +250,18 @@ def read(
)

def _parse_geometry_sel(self, area, layers, x, y, z):
elements = None

if (
(x is not None)
or (y is not None)
or (area is not None)
or (layers is not None)
):
elements = self.geometry.find_index(x=x, y=y, z=z, area=area, layers=layers)

if (
(x is not None)
or (y is not None)
or (layers is not None)
or (area is not None)
):
# selection was attempted
if (elements is None) or len(elements) == 0:
raise ValueError("No elements in selection!")
return elements

return elements
return None


class Dfsu2DV(DfsuLayered):
Expand Down Expand Up @@ -335,11 +316,6 @@ def plot_vertical_profile(


class Dfsu3D(DfsuLayered):
@wraps(GeometryFM3D.to_2d_geometry)
def to_2d_geometry(self):
warnings.warn("Deprecated. Use geometry2d instead", FutureWarning)
return self.geometry.geometry2d

@property
def geometry2d(self):
"""The 2d geometry for a 3d object"""
Expand Down
4 changes: 0 additions & 4 deletions mikeio/spatial/FM_geometry.py

This file was deleted.

107 changes: 60 additions & 47 deletions mikeio/spatial/_FM_geometry_layered.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations
from functools import cached_property
from typing import Collection, Sequence, List
from typing import Any, Collection, Iterable, Literal, Sequence, List, Tuple

import numpy as np
from mikecore.DfsuFile import DfsuFileType # type: ignore
Expand All @@ -13,22 +13,24 @@

from ._utils import _relative_cumulative_distance

Layer = Literal["all", "bottom", "top"]


class _GeometryFMLayered(_GeometryFM):
def __init__(
self,
*,
node_coordinates,
element_table,
codes=None,
node_coordinates: np.ndarray,
element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray],
codes: np.ndarray | None = None,
projection: str = "LONG/LAT",
dfsu_type=DfsuFileType.Dfsu3DSigma,
element_ids=None,
node_ids=None,
dfsu_type: DfsuFileType = DfsuFileType.Dfsu3DSigma,
element_ids: np.ndarray | None = None,
node_ids: np.ndarray | None = None,
n_layers: int = 1, # at least 1 layer
n_sigma=None,
validate=True,
reindex=False,
n_sigma: int | None = None,
validate: bool = True,
reindex: bool = False,
) -> None:

super().__init__(
Expand Down Expand Up @@ -66,15 +68,18 @@ def __repr__(self) -> str:
def geometry2d(self):
return self.to_2d_geometry()

def isel(self, idx: Collection[int], keepdims=False, **kwargs):
def isel(
self, idx: Collection[int], keepdims: bool = False, **kwargs: Any
) -> GeometryFM3D | GeometryPoint3D | GeometryFM2D | GeometryFMVerticalColumn:

return self.elements_to_geometry(
elements=idx, node_layers=None, keepdims=keepdims
)
return self.elements_to_geometry(elements=idx, keepdims=keepdims)

def elements_to_geometry(
self, elements: int | Collection[int], node_layers="all", keepdims=False
):
self,
elements: int | Collection[int],
node_layers: Layer = "all",
keepdims: bool = False,
) -> GeometryFM3D | GeometryPoint3D | GeometryFM2D | GeometryFMVerticalColumn:
sel_elements: List[int]

if isinstance(elements, (int, np.integer)):
Expand All @@ -98,7 +103,8 @@ def elements_to_geometry(
n_layers = len(unique_layer_ids)

if n_layers > 1:
elem_bot = self.get_layer_elements("bottom")
bottom: Layer = "bottom"
elem_bot = self.get_layer_elements(layers=bottom)
if np.all(np.in1d(sorted_elements, elem_bot)):
n_layers = 1

Expand All @@ -108,7 +114,7 @@ def elements_to_geometry(
) and n_layers == 1:
new_type = DfsuFileType.Dfsu2D

if n_layers == 1 and node_layers in ("all", None):
if n_layers == 1 and node_layers == "all":
node_layers = "bottom"

# extract information for selected elements
Expand Down Expand Up @@ -157,7 +163,7 @@ def elements_to_geometry(
)
else:
klass = self.__class__
return klass(
return klass( # type: ignore
node_coordinates=node_coords,
codes=codes,
node_ids=node_ids,
Expand Down Expand Up @@ -209,7 +215,11 @@ def _calc_element_coordinates(self):

return ec

def _get_nodes_and_table_for_elements(self, elements, node_layers="all"):
def _get_nodes_and_table_for_elements(
self,
elements: Collection[int] | np.ndarray,
node_layers: Layer = "all",
) -> Tuple[Any, Any]:
"""list of nodes and element table for a list of elements
Parameters
Expand All @@ -228,7 +238,7 @@ def _get_nodes_and_table_for_elements(self, elements, node_layers="all"):
element table with a list of nodes for each element
"""
elem_tbl = np.empty(len(elements), dtype=np.dtype("O"))
if (node_layers is None) or (node_layers == "all") or self.is_2d:
if (node_layers == "all") or self.is_2d:
for j, eid in enumerate(elements):
elem_tbl[j] = np.asarray(self.element_table[eid])

Expand All @@ -246,10 +256,10 @@ def _get_nodes_and_table_for_elements(self, elements, node_layers="all"):
elem_nodes = elem_nodes[halfn:]
elem_tbl[j] = elem_nodes

nodes = np.unique(np.hstack(elem_tbl))
nodes = np.unique(np.hstack(elem_tbl)) # type: ignore
return nodes, elem_tbl

def to_2d_geometry(self):
def to_2d_geometry(self) -> GeometryFM2D:
"""extract 2d geometry from 3d geometry
Returns
Expand Down Expand Up @@ -445,7 +455,7 @@ def bottom_elements(self):
"""List of 3d element ids of bottom layer"""
return self.top_elements - self.n_layers_per_column + 1

def get_layer_elements(self, layers, layer=None):
def get_layer_elements(self, layers: int | Layer | Iterable[int]) -> np.ndarray:
"""3d element ids for one (or more) specific layer(s)
Parameters
Expand All @@ -460,27 +470,30 @@ def get_layer_elements(self, layers, layer=None):
element ids
"""
if isinstance(layers, str):
if layers in ("surface", "top"):
if layers == "top":
return self.top_elements
elif layers in ("bottom"):
elif layers == "bottom":
return self.bottom_elements
else:
raise ValueError(
f"layers '{layers}' not recognized ('top', 'bottom' or integer)"
)

if not np.isscalar(layers):
assert isinstance(layers, Iterable)
elem_ids = []
for layer in layers:
assert isinstance(layer, int)
elem_ids.append(self.get_layer_elements(layer))
elem_ids = np.concatenate(elem_ids, axis=0)
return np.sort(elem_ids)

n_lay = self.n_layers

assert isinstance(layers, int)
if layers < (-n_lay) or layers >= n_lay:
raise Exception(
f"Layer {layers} not allowed; must be between -{n_lay} and {n_lay-1}"
f"Layer {layers!r} not allowed; must be between -{n_lay} and {n_lay-1}"
)

if layers < 0:
Expand Down Expand Up @@ -648,17 +661,17 @@ class GeometryFM3D(_GeometryFMLayered):
def __init__(
self,
*,
node_coordinates,
element_table,
codes=None,
node_coordinates: np.ndarray,
element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray],
codes: np.ndarray | None = None,
projection: str = "LONG/LAT",
dfsu_type=DfsuFileType.Dfsu3DSigma,
element_ids=None,
node_ids=None,
dfsu_type: DfsuFileType = DfsuFileType.Dfsu3DSigma,
element_ids: np.ndarray | None = None,
node_ids: np.ndarray | None = None,
n_layers: int = 1, # at least 1 layer
n_sigma=None,
validate=True,
reindex=False,
n_sigma: int | None = None,
validate: bool = True,
reindex: bool = False,
) -> None:
super().__init__(
node_coordinates=node_coordinates,
Expand Down Expand Up @@ -728,17 +741,17 @@ def find_index(self, x=None, y=None, z=None, coords=None, area=None, layers=None
class GeometryFMVerticalProfile(_GeometryFMLayered):
def __init__(
self,
node_coordinates,
element_table,
codes=None,
node_coordinates: np.ndarray,
element_table: np.ndarray | List[Sequence[int]] | List[np.ndarray],
codes: np.ndarray | None = None,
projection: str = "LONG/LAT",
dfsu_type=None,
element_ids=None,
node_ids=None,
n_layers: int = 1,
n_sigma=None,
validate=True,
reindex=False,
dfsu_type: DfsuFileType = DfsuFileType.Dfsu3DSigma,
element_ids: np.ndarray | None = None,
node_ids: np.ndarray | None = None,
n_layers: int = 1, # at least 1 layer
n_sigma: int | None = None,
validate: bool = True,
reindex: bool = False,
) -> None:
super().__init__(
node_coordinates=node_coordinates,
Expand All @@ -761,7 +774,7 @@ def relative_element_distance(self):
nc0 = self.node_coordinates[0, :2]
return _relative_cumulative_distance(ec, nc0, is_geo=self.is_geo)

def get_nearest_relative_distance(self, coords) -> float:
def get_nearest_relative_distance(self, coords: Tuple[float, float]) -> float:
"""For a point near a transect, find the nearest relative distance
for showing position on transect plot.
Expand Down
Loading

0 comments on commit dce0125

Please sign in to comment.