Skip to content

Commit

Permalink
Merge pull request #836 from davidhassell/dask-in-cfdm
Browse files Browse the repository at this point in the history
Get core Dask functionality from `cfdm`
  • Loading branch information
davidhassell authored Jan 15, 2025
2 parents 0f2c702 + 582a076 commit 9e29106
Show file tree
Hide file tree
Showing 49 changed files with 1,297 additions and 7,722 deletions.
8 changes: 6 additions & 2 deletions Changelog.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version NEXTVERSION
-------------------

**2024-??-??**
**2024-12-??**

* Allow ``'nearest_dtos'`` 2-d regridding to work with discrete
sampling geometry source grids
Expand All @@ -23,6 +23,8 @@ version NEXTVERSION
* New class `cf.NetCDF4Array`
* New class `cf.CFAH5netcdfArray`
* New class `cf.CFANetCDF4Array`
* Replace core `dask` functionality with that imported from `cfdm`
(https://github.com/NCAS-CMS/cf-python/issues/839)
* Fix bug that sometimes puts an incorrect ``radian-1`` or
``radian-2`` in the returned units of the differential operator
methods and functions
Expand All @@ -41,9 +43,11 @@ version NEXTVERSION
(https://github.com/NCAS-CMS/cf-python/issues/828)
* New dependency: ``h5netcdf>=1.3.0``
* New dependency: ``h5py>=3.10.0``
* New dependency: ``s3fs>=2024.2.0``
* New dependency: ``s3fs>=2024.6.0``
* Changed dependency: ``numpy>=1.15,<2.0``
* Changed dependency: ``1.11.2.0<=cfdm<1.11.3.0``
* Changed dependency: ``cfunits>=3.3.7``
* Changed dependency: ``dask>=2024.6.0,<=2024.7.1``

----

Expand Down
34 changes: 26 additions & 8 deletions cf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
raise ImportError(_error0 + str(error1))

try:
import numpy
import numpy as np
except ImportError as error1:
raise ImportError(_error0 + str(error1))

Expand Down Expand Up @@ -191,10 +191,11 @@

# Check the version of numpy
_minimum_vn = "1.22"
if Version(numpy.__version__) < Version(_minimum_vn):
raise RuntimeError(
f"Bad numpy version: cf requires numpy>={_minimum_vn}. "
f"Got {numpy.__version__} at {numpy.__file__}"
_maximum_vn = "2.0"
if not Version(_minimum_vn) <= Version(np.__version__) < Version(_maximum_vn):
raise ValueError(
"Bad numpy version: cf requires _minimum_vn}<=numpy<{_maximum_vn}. "
f"Got {np.__version__} at {np.__file__}"
)

# Check the version of cfunits
Expand All @@ -208,15 +209,30 @@
# Check the version of cfdm
_minimum_vn = "1.11.2.0"
_maximum_vn = "1.11.3.0"
_cfdm_version = Version(cfdm.__version__)
if not Version(_minimum_vn) <= _cfdm_version < Version(_maximum_vn):
if (
not Version(_minimum_vn)
<= Version(cfdm.__version__)
< Version(_maximum_vn)
):
raise RuntimeError(
f"Bad cfdm version: cf requires {_minimum_vn}<=cfdm<{_maximum_vn}. "
f"Got {_cfdm_version} at {cfdm.__file__}"
f"Got {cfdm.__version__} at {cfdm.__file__}"
)

# Check the version of dask

_minimum_vn = "2024.6.1"
_maximum_vn = "2024.7.1"
if (
not Version(_minimum_vn)
<= Version(dask.__version__)
<= Version(_maximum_vn)
):
raise ValueError(
"Bad dask version: cf requires {_minimum_vn}<=dask<={_maximum_vn}. "
f"Got {dask.__version__} at {dask.__file__}"
)

# Check the version of Python
_minimum_vn = "3.8.0"
if Version(platform.python_version()) < Version(_minimum_vn):
Expand All @@ -233,6 +249,8 @@
f"Got {scipy.__version__} at {scipy.__file__}"
)

del _minimum_vn, _maximum_vn

from .constructs import Constructs

from .mixin import Coordinate
Expand Down
2 changes: 1 addition & 1 deletion cf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
Find the total amount of physical memory (in bytes).
CHUNKSIZE: `int`
The chunk size (in bytes) for data storage and processing.
The Dask chunk size (in bytes). See `cf.chunksize`.
TEMPDIR: `str`
The location to store temporary files. By default it is the
Expand Down
2 changes: 1 addition & 1 deletion cf/data/array/fullarray.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import numpy as np
from cfdm.data.mixin import IndexMixin

from ...functions import indices_shape, parse_indices
from .abstract import Array
from .mixin import IndexMixin

_FULLARRAY_HANDLED_FUNCTIONS = {}

Expand Down
60 changes: 1 addition & 59 deletions cf/data/array/h5netcdfarray.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import cfdm

from ...mixin_container import Container
from .locks import netcdf_lock
from .mixin import ActiveStorageMixin, ArrayMixin, FileArrayMixin, IndexMixin
from .mixin import ActiveStorageMixin, ArrayMixin, FileArrayMixin


class H5netcdfArray(
ActiveStorageMixin,
IndexMixin,
FileArrayMixin,
ArrayMixin,
Container,
Expand All @@ -23,59 +21,3 @@ class H5netcdfArray(
.. versionadded:: NEXTVERSION
"""

def __dask_tokenize__(self):
"""Return a value fully representative of the object.
.. versionadded:: NEXTVERSION
"""
return super().__dask_tokenize__() + (self.get_mask(),)

@property
def _lock(self):
"""Set the lock for use in `dask.array.from_array`.
Returns a lock object because concurrent reads are not
currently supported by the HDF5 library. The lock object will
be the same for all `NetCDF4Array` and `H5netcdfArray`
instances, regardless of the dataset they access, which means
that access to all netCDF and HDF files coordinates around the
same lock.
.. versionadded:: NEXTVERSION
"""
return netcdf_lock

def _get_array(self, index=None):
"""Returns a subspace of the dataset variable.
.. versionadded:: NEXTVERSION
.. seealso:: `__array__`, `index`
:Parameters:
{{index: `tuple` or `None`, optional}}
:Returns:
`numpy.ndarray`
The subspace.
"""
if index is None:
index = self.index()

# We need to lock because the netCDF file is about to be accessed.
self._lock.acquire()

# It's cfdm.H5netcdfArray.__getitem__ that we want to
# call here, but we use 'Container' in super because
# that comes immediately before cfdm.H5netcdfArray in
# the method resolution order.
array = super(Container, self).__getitem__(index)

self._lock.release()
return array
4 changes: 0 additions & 4 deletions cf/data/array/locks.py

This file was deleted.

1 change: 0 additions & 1 deletion cf/data/array/mixin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@
from .cfamixin import CFAMixin
from .compressedarraymixin import CompressedArrayMixin
from .filearraymixin import FileArrayMixin
from .indexmixin import IndexMixin
3 changes: 1 addition & 2 deletions cf/data/array/mixin/cfamixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from itertools import accumulate, product

import numpy as np

from ...utils import chunk_locations, chunk_positions
from cfdm.data.utils import chunk_locations, chunk_positions


class CFAMixin:
Expand Down
3 changes: 1 addition & 2 deletions cf/data/array/mixin/compressedarraymixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ def to_dask_array(self, chunks="auto"):
from functools import partial

import dask.array as da
from cfdm.data.utils import normalize_chunks
from dask import config
from dask.array.core import getter
from dask.base import tokenize

from ...utils import normalize_chunks

name = (f"{self.__class__.__name__}-{tokenize(self)}",)

dtype = self.dtype
Expand Down
Loading

0 comments on commit 9e29106

Please sign in to comment.