diff --git a/atlite/__init__.py b/atlite/__init__.py index 20099d30..d7ea98f0 100644 --- a/atlite/__init__.py +++ b/atlite/__init__.py @@ -31,6 +31,7 @@ # e.g. "0.17.1" or "0.17.1.dev4+ga3890dc0" (if installed from git) __version__ = version("atlite") +__version__ = 0.15.0 # e.g. "0.17.0" # TODO, in the network structure it should use the dev version match = re.match(r"(\d+\.\d+(\.\d+)?)", __version__) assert match, f"Could not determine release_version of pypsa: {__version__}" diff --git a/atlite/aggregate.py b/atlite/aggregate.py index e3d3b3a4..8a6bcb9a 100644 --- a/atlite/aggregate.py +++ b/atlite/aggregate.py @@ -27,3 +27,102 @@ def aggregate_matrix(da, matrix, index): else: da = da.stack(spatial=("y", "x")).transpose("spatial", "time") return xr.DataArray(matrix * da, [index, da.coords["time"]]) + + +def aggregate_gridcells(da, matrix, index, freq=None, agg="sum"): + """ + Resample and aggregate the data array `da` based on the specified frequency + and aggregation method using a matrix for spatial aggregation. + + Parameters: + ----------- + da : xarray.DataArray + The data array to process, typically containing time-series data + with spatial dimensions ("y", "x"). + matrix : sparse matrix or equivalent + Sparse matrix used to map spatial grid cells to aggregated regions + (e.g., buses or zones). + index : pandas.Index or xarray.Index + Index corresponding to the aggregated regions (e.g., bus IDs). + freq : str, optional + Resampling frequency string (e.g., 'D' for daily, 'M' for monthly). + If None, no resampling is applied. + agg : str, optional + Aggregation method to apply. Options are "mean" or "sum" + (default is "sum"). + + Returns: + -------- + da : xarray.DataArray + The aggregated data array after spatial and temporal processing. + layout : xarray.DataArray + A DataArray representing the layout or weight of each grid cell in the + aggregation process, aligned with the output dimensions. + """ + # Ensure the `index` has a name; if not, assign a default name. + if index.name is None: + index = index.rename("dim_0") + + # Check if the input data uses Dask for lazy evaluation. + if isinstance(da.data, dask.array.core.Array): + # Stack the spatial dimensions ("y", "x") into a single "spatial" dimension for efficient processing. + da = da.stack(spatial=("y", "x")) + da = da.chunk({"spatial": -1}) # Optimize chunking along the "spatial" dimension. + + # Convert the sparse matrix into a dense DataArray for compatibility with xarray operations. + layout = xr.DataArray( + matrix.toarray(), + dims=(index.name, "spatial"), + coords={index.name: index, "spatial": da["spatial"]}, + ) + + # Transpose the layout so that the "spatial" dimension comes first for alignment with `da`. + layout = layout.transpose("spatial", index.name) + + # Expand `da` to include the `index.name` dimension for matrix multiplication. + da = da.expand_dims({index.name: layout[index.name]}) + + # Perform element-wise multiplication of `da` and `layout` for spatial aggregation. + da = da * layout + else: + # For non-Dask arrays, follow a similar process without lazy evaluation. + da = da.stack(spatial=("y", "x")).transpose("spatial", "time") + + layout = xr.DataArray( + matrix.toarray(), + dims=(index.name, "spatial"), + coords={index.name: index, "spatial": da["spatial"]}, + ) + layout = layout.transpose("spatial", index.name) + + # Element-wise multiplication for spatial aggregation. + da = xr.DataArray(da * layout) + + # Unstack the "spatial" dimension back into the original "y" and "x" dimensions. + da = da.unstack("spatial") + layout = layout.unstack("spatial") + + # Resample the data if a frequency is provided. + if freq is not None: + da = da.resample(time=freq) # Resample along the "time" dimension. + + # Apply the specified aggregation method during resampling. + if agg == "mean": + da = da.mean("time", keep_attrs=True) + elif agg == "sum": + da = da.sum("time", keep_attrs=True) + else: + # Raise an error if an invalid aggregation method is specified. + raise ValueError( + f"Invalid aggregation method '{agg}' for frequency '{freq}'. " + "Use 'mean' or 'sum' instead." + ) + else: + # If no frequency is provided, apply aggregation directly over time. + if agg == "mean": + da = da.mean("time", keep_attrs=True) + elif agg == "sum": + da = da.sum("time", keep_attrs=True) + # If `agg` is None or unsupported, leave the data unmodified. + + return da, layout \ No newline at end of file diff --git a/atlite/convert.py b/atlite/convert.py index ce3dd75c..11eb4b13 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -27,7 +27,7 @@ from atlite import csp as cspm from atlite import hydro as hydrom from atlite import wind as windm -from atlite.aggregate import aggregate_matrix +from atlite.aggregate import aggregate_matrix, aggregate_gridcells from atlite.gis import spdiag from atlite.pv.irradiation import TiltedIrradiation from atlite.pv.orientation import SurfaceOrientation, get_orientation @@ -58,10 +58,10 @@ def convert_and_aggregate( shapes_crs=4326, per_unit=False, return_capacity=False, - capacity_factor=False, - capacity_factor_timeseries=False, + per_gridcell=False, show_progress=False, dask_kwargs={}, + agg_kwargs={'freq': None, 'agg': 'sum'}, **convert_kwds, ): """ @@ -74,136 +74,189 @@ def convert_and_aggregate( Parameters ---------- - matrix : N x S - xr.DataArray or sp.sparse.csr_matrix or None - If given, it is used to aggregate the grid cells to buses. - N is the number of buses, S the number of spatial coordinates, in the - order of `cutout.grid`. - index : pd.Index - Index of Buses. - layout : X x Y - xr.DataArray - The capacity to be build in each of the `grid_cells`. - shapes : list or pd.Series of shapely.geometry.Polygon - If given, matrix is constructed as indicatormatrix of the polygons, its - index determines the bus index on the time-series. - shapes_crs : pyproj.CRS or compatible - If different to the map crs of the cutout, the shapes are - transformed to match cutout.crs (defaults to EPSG:4326). - per_unit : boolean - Returns the time-series in per-unit units, instead of in MW (defaults - to False). - return_capacity : boolean - Additionally returns the installed capacity at each bus corresponding - to `layout` (defaults to False). - capacity_factor : boolean - If True, the static capacity factor of the chosen resource for each - grid cell is computed. - capacity_factor_timeseries : boolean - If True, the capacity factor time series of the chosen resource for - each grid cell is computed. - show_progress : boolean, default False - Whether to show a progress bar. + cutout : atlite.Cutout + Input cutout data containing weather-based grid-cell data. + convert_func : function + Function to convert raw weather data into renewable energy + generation (e.g., wind or solar power). + matrix : N x S xr.DataArray or sp.sparse.csr_matrix or None, optional + Aggregation matrix for mapping grid cells (S) to buses (N). + index : pd.Index, optional + Index corresponding to buses. + layout : X x Y xr.DataArray, optional + Installed capacity at each grid cell. + shapes : list or pd.Series of shapely.geometry.Polygon, optional + Polygons defining bus regions for aggregation. + shapes_crs : pyproj.CRS or compatible, default 4326 + Coordinate reference system of the shapes. + per_unit : bool, default False + Return results in per-unit (capacity factor) rather than MW. + return_capacity : bool, default False + If True, also return installed capacity at each bus. + per_gridcell : bool, default False + If True, compute generation per grid cell as well as bus region. + The capacity factor timeseries is returend when using per_unit = True + and agg_kwargs = {'freq': None, 'agg': None} + The static capacity factor is returned when using per_unit = True and + agg_kwargs = {'freq': None, 'agg': 'mean'} + show_progress : bool, default False + Show a progress bar during computation. dask_kwargs : dict, default {} - Dict with keyword arguments passed to `dask.compute`. - - Other Parameters - ---------------- - convert_func : Function - Callback like convert_wind, convert_pv - + Keyword arguments passed to `dask.compute`. + agg_kwargs : dict, default {'freq': None, 'agg': 'sum'} + Options for aggregation: resampling frequency (`freq`) and + aggregation method (`agg`). + **convert_kwds : dict + Additional arguments passed to `convert_func`. Returns ------- - resource : xr.DataArray - Time-series of renewable generation aggregated to buses, if - `matrix` or equivalents are provided else the total sum of - generated energy. - units : xr.DataArray (optional) - The installed units per bus in MW corresponding to `layout` - (only if `return_capacity` is True). - + results : xr.DataArray + Renewable generation time series aggregated to buses or per grid cell. + capacity : xr.DataArray (optional) + Installed capacity in MW at each bus (if `return_capacity` is True). """ + + # Log the type of conversion being performed (e.g., "wind" or "pv"). func_name = convert_func.__name__.replace("convert_", "") logger.info(f"Convert and aggregate '{func_name}'.") + + # Generate renewable generation data (e.g., wind/solar) using the provided function. da = convert_func(cutout.data, **convert_kwds) + # Check if no aggregation parameters (layout, shapes, matrix) are provided. no_args = all(v is None for v in [layout, shapes, matrix]) - if no_args: - if per_unit or return_capacity: - raise ValueError( - "One of `matrix`, `shapes` and `layout` must be " - "given for `per_unit` or `return_capacity`" - ) - if capacity_factor or capacity_factor_timeseries: - if capacity_factor_timeseries: - res = da.rename("capacity factor") - else: - res = da.mean("time").rename("capacity factor") - res.attrs["units"] = "p.u." - return maybe_progressbar(res, show_progress, **dask_kwargs) - else: - res = da.sum("time", keep_attrs=True) - return maybe_progressbar(res, show_progress, **dask_kwargs) + logger.info("No capacity information provided. " + "Assuming one MW installed capacity per grid cell.") + + # Create a default layout of 1 MW installed capacity for each grid cell. + layout = xr.DataArray( + np.ones((cutout.data.sizes['y'], cutout.data.sizes['x'])), + dims=('y', 'x'), + coords={ + 'y': cutout.data.coords['y'], + 'x': cutout.data.coords['x'] + } + ) + + # Prepare the aggregation matrix and index, ensuring compatibility with layout and shapes. + matrix, index = prepare_matrix_and_index(cutout, matrix, index, shapes, layout, shapes_crs) + + # Compute the installed capacity for each bus. + caps = matrix.sum(-1) # Sum contributions of grid cells to each bus. + capacity = xr.DataArray(np.asarray(caps).flatten(), [index]) + capacity.attrs["units"] = "MW" + + # Handle per-gridcell computations. + if per_gridcell: + # Aggregate data to grid cells and bus regions based on the matrix and index. + results, layout = aggregate_gridcells(da, matrix=matrix, index=index, **agg_kwargs) + results.attrs["units"] = "MW" + + # If per-unit is requested, normalize by installed capacity. + if per_unit: + results = (results / layout.where(layout != 0)).fillna(0.0) + results.attrs["units"] = "p.u." + + else: + # Aggregate data to buses using the aggregation matrix. + results = aggregate_matrix(da, matrix=matrix, index=index) + results.attrs["units"] = "MW" - if matrix is not None: - if shapes is not None: - raise ValueError( - "Passing matrix and shapes is ambiguous. Pass " "only one of them." - ) + # If per-unit is requested, normalize by bus capacity. + if per_unit: + results = (results / capacity.where(capacity != 0)).fillna(0.0) + results.attrs["units"] = "p.u." - if isinstance(matrix, xr.DataArray): - coords = matrix.indexes.get(matrix.dims[1]).to_frame(index=False) - if not np.array_equal(coords[["x", "y"]], cutout.grid[["x", "y"]]): - raise ValueError( - "Matrix spatial coordinates not aligned with cutout spatial " - "coordinates." - ) + # Include installed capacity in the return value if requested. + if return_capacity: + results = (maybe_progressbar(results, show_progress, **dask_kwargs), capacity) + else: + results = maybe_progressbar(results, show_progress, **dask_kwargs) - if index is None: - index = matrix + # Return results (and optionally capacity). + return results - if not matrix.ndim == 2: - raise ValueError("Matrix not 2-dimensional.") - matrix = csr_matrix(matrix) +def prepare_matrix_and_index(cutout, matrix=None, index=None, shapes=None, layout=None, shapes_crs=4326): + """ + Helper function to prepare and validate the aggregation matrix based on provided `matrix`, `shapes`, or `layout`. + + Parameters: + ----------- + cutout : xarray.DataArray + The source data array containing spatial grid information. + matrix : xr.DataArray, csr_matrix, or None + Aggregation matrix aligning grid cells to spatial points (like buses). + shapes : list, pd.Series, or gpd.GeoDataFrame + List of spatial shapes (polygons) for aggregation. + layout : xr.DataArray or None + Layout of installed capacity across grid cells. + shapes_crs : int, default 4326 + CRS for shapes if provided. + + Returns: + -------- + csr_matrix + Prepared sparse matrix for aggregation. + """ + # Ensure only one of `matrix` or `shapes` is provided + if matrix is not None and shapes is not None: + raise ValueError("Passing both `matrix` and `shapes` is ambiguous. Pass only one of them.") + + + # Prepare matrix if it's given as an xr.DataArray and validate alignment with cutout + if isinstance(matrix, xr.DataArray): + coords = matrix.indexes.get(matrix.dims[1]).to_frame(index=False) + if not np.array_equal(coords[["x", "y"]], cutout.grid[["x", "y"]]): + raise ValueError("Matrix spatial coordinates not aligned with cutout grid coordinates.") + + if index is None: + index = matrix + if shapes is not None: geoseries_like = (pd.Series, gpd.GeoDataFrame, gpd.GeoSeries) if isinstance(shapes, geoseries_like) and index is None: index = shapes.index - - matrix = cutout.indicatormatrix(shapes, shapes_crs) - + + # Create matrix from shapes + if matrix is None: + matrix = cutout.indicatormatrix(shapes, shapes_crs) + + + # Apply layout as weights if layout is provided if layout is not None: assert isinstance(layout, xr.DataArray) layout = layout.reindex_like(cutout.data).stack(spatial=["y", "x"]) + # Create matrix from shapes if matrix is None: - matrix = csr_matrix(layout.expand_dims("new")) + matrix = layout.expand_dims("new") else: - matrix = csr_matrix(matrix) * spdiag(layout) - - # From here on, matrix is defined and ensured to be a csr matrix. + # Check that layout is 1-dimensional and matches matrix shape + if layout.ndim != 1: + raise ValueError("`layout` must be 1-dimensional after stacking to apply as a diagonal.") + if layout.size != matrix.shape[1]: + raise ValueError("`layout` size must match the number of columns in `matrix` for diagonal application.") + + matrix = matrix * spdiag(layout) + + + # Check that matrix is 2-dimensional + if matrix.ndim != 2: + raise ValueError("Matrix must be 2-dimensional.") + + # Convert matrix to csr_matrix + # From here on matrix is defined + matrix = csr_matrix(matrix) + + # From here on index is defined if index is None: index = pd.RangeIndex(matrix.shape[0]) - - results = aggregate_matrix(da, matrix=matrix, index=index) - - if per_unit or return_capacity: - caps = matrix.sum(-1) - capacity = xr.DataArray(np.asarray(caps).flatten(), [index]) - capacity.attrs["units"] = "MW" - - if per_unit: - results = (results / capacity.where(capacity != 0)).fillna(0.0) - results.attrs["units"] = "p.u." - else: - results.attrs["units"] = "MW" - - if return_capacity: - return maybe_progressbar(results, show_progress, **dask_kwargs), capacity - else: - return maybe_progressbar(results, show_progress, **dask_kwargs) + + return matrix, index def maybe_progressbar(ds, show_progress, **kwargs): diff --git a/test/test_preparation_and_conversion.py b/test/test_preparation_and_conversion.py index 01127548..a7884fd7 100644 --- a/test/test_preparation_and_conversion.py +++ b/test/test_preparation_and_conversion.py @@ -66,7 +66,12 @@ def pv_test(cutout, time=TIME, skip_optimal_sum_test=False): Compare optimal orientation with flat orientation. """ orientation = {"slope": 0.0, "azimuth": 0.0} - cap_factor = cutout.pv(atlite.resource.solarpanels.CdTe, orientation) + cap_factor = cutout.pv(atlite.resource.solarpanels.CdTe, + orientation, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'sum'}) + cap_factor = cap_factor.loc[0] assert cap_factor.notnull().all() assert cap_factor.sum() > 0 @@ -97,7 +102,12 @@ def pv_test(cutout, time=TIME, skip_optimal_sum_test=False): assert all(cap_per_region.round(3) == capacity.round(3)) # Now compare with optimal orienation - cap_factor_opt = cutout.pv(atlite.resource.solarpanels.CdTe, "latitude_optimal") + cap_factor_opt = cutout.pv(atlite.resource.solarpanels.CdTe, + "latitude_optimal", + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'sum'}) + cap_factor_opt = cap_factor_opt.loc[0] if not skip_optimal_sum_test: assert cap_factor_opt.sum() > cap_factor.sum() @@ -145,14 +155,21 @@ def pv_tracking_test(cutout): cap_factor = cutout.pv( atlite.resource.solarpanels.CSi, orientation, - capacity_factor=True, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} ) + cap_factor = cap_factor.loc[0] + cap_factor_tracking_0axis = cutout.pv( atlite.resource.solarpanels.CSi, orientation, tracking=None, - capacity_factor=True, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} ) + cap_factor_tracking_0axis = cap_factor_tracking_0axis.loc[0] assert cap_factor_tracking_0axis.notnull().all() assert cap_factor_tracking_0axis.sum() > 0 @@ -163,29 +180,41 @@ def pv_tracking_test(cutout): atlite.resource.solarpanels.CSi, orientation, tracking="horizontal", - capacity_factor=True, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} ) + cap_factor_tracking_1axis_h = cap_factor_tracking_1axis_h.loc[0] cap_factor_tracking_1axis_th = cutout.pv( atlite.resource.solarpanels.CSi, orientation, tracking="tilted_horizontal", - capacity_factor=True, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} ) + cap_factor_tracking_1axis_th = cap_factor_tracking_1axis_th.loc[0] cap_factor_tracking_1axis_v = cutout.pv( atlite.resource.solarpanels.CSi, orientation, tracking="vertical", - capacity_factor=True, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} ) + cap_factor_tracking_1axis_v = cap_factor_tracking_1axis_v.loc[0] cap_factor_tracking_2axis = cutout.pv( atlite.resource.solarpanels.CSi, orientation, tracking="dual", - capacity_factor=True, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} ) + cap_factor_tracking_2axis = cap_factor_tracking_2axis.loc[0] assert cap_factor_tracking_1axis_v.notnull().all() assert cap_factor_tracking_1axis_v.sum() > 0 @@ -212,7 +241,11 @@ def csp_test(cutout): and technologies. """ ## Test technology = "solar tower" - st = cutout.csp(atlite.cspinstallations.SAM_solar_tower, capacity_factor=True) + st = cutout.csp(atlite.cspinstallations.SAM_solar_tower, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} + ) assert st.notnull().all() assert (st >= 0).all() @@ -224,7 +257,11 @@ def csp_test(cutout): assert (st <= ll).all() ## Test technology = "parabolic trough" - pt = cutout.csp(atlite.cspinstallations.SAM_parabolic_trough, capacity_factor=True) + pt = cutout.csp(atlite.cspinstallations.SAM_parabolic_trough, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'mean'} + ) assert pt.notnull().all() assert (pt >= 0).all() @@ -281,7 +318,11 @@ def wind_test(cutout): lower production than a layout proportionally to the capacity layout squared. """ - cap_factor = cutout.wind(atlite.windturbines.Enercon_E101_3000kW) + cap_factor = cutout.wind(atlite.windturbines.Enercon_E101_3000kW, + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'sum'}) + cap_factor = cap_factor.loc[0] assert cap_factor.notnull().all() assert cap_factor.sum() > 0 @@ -325,7 +366,11 @@ def runoff_test(cutout): lower sites (mostly at sea level) should have a smaller capacity factor total and production. """ - cap_factor = cutout.runoff() + cap_factor = cutout.runoff(per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'sum'}) + cap_factor = cap_factor.loc[0] + assert cap_factor.notnull().all() assert cap_factor.sum() > 0 @@ -372,11 +417,17 @@ def coefficient_of_performance_test(cutout): """ Test the coefficient_of_performance function. """ - cap_factor = cutout.coefficient_of_performance(source="air") + cap_factor = cutout.coefficient_of_performance(source="air", + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'sum'}) assert cap_factor.notnull().all() assert cap_factor.sum() > 0 - cap_factor = cutout.coefficient_of_performance(source="soil") + cap_factor = cutout.coefficient_of_performance(source="soil", + per_gridcell=True, + per_unit=True, + agg_kwargs={'freq': None, 'agg': 'sum'}) assert cap_factor.notnull().all() assert cap_factor.sum() > 0