-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'develop' into intersect
- Loading branch information
Showing
15 changed files
with
608 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import pandas as pd | ||
import portfolyo as pf | ||
from portfolyo.core.shared import concat | ||
|
||
|
||
def get_idx( | ||
startdate: str, starttime: str, tz: str, freq: str, enddate: str | ||
) -> pd.DatetimeIndex: | ||
# Empty index. | ||
if startdate is None: | ||
return pd.DatetimeIndex([], freq=freq, tz=tz) | ||
# Normal index. | ||
ts_start = pd.Timestamp(f"{startdate} {starttime}", tz=tz) | ||
ts_end = pd.Timestamp(f"{enddate} {starttime}", tz=tz) | ||
return pd.date_range(ts_start, ts_end, freq=freq, inclusive="left") | ||
|
||
|
||
index = pd.date_range("2020", "2024", freq="QS", inclusive="left") | ||
# index2 = pd.date_range("2023", "2025", freq="QS", inclusive="left") | ||
# pfl = pf.dev.get_flatpfline(index) | ||
# pfl2 = pf.dev.get_flatpfline(index2) | ||
# print(pfl) | ||
# print(pfl2) | ||
|
||
# pfs = pf.dev.get_pfstate(index) | ||
|
||
# pfs2 = pf.dev.get_pfstate(index2) | ||
# pfl3 = concat.general(pfl, pfl2) | ||
# print(pfl3) | ||
|
||
# print(index) | ||
# print(index2) | ||
|
||
whole_pfl = pf.dev.get_nestedpfline(index) | ||
pfl_a = whole_pfl.slice[:"2021"] | ||
|
||
pfl_b = whole_pfl.slice["2021":"2022"] | ||
pfl_c = whole_pfl.slice["2022":] | ||
result = concat.concat_pflines(pfl_a, pfl_b, pfl_c) | ||
result2 = concat.concat_pflines(pfl_b, pfl_c, pfl_a) | ||
print(result) | ||
print(result2) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
# import pandas as pd | ||
# import portfolyo as pf | ||
from __future__ import annotations | ||
from typing import Iterable | ||
import pandas as pd | ||
from portfolyo import tools | ||
|
||
from ..pfstate import PfState | ||
from ..pfline.enums import Structure | ||
|
||
from ..pfline import PfLine, create | ||
from .. import pfstate | ||
|
||
|
||
def general(pfl_or_pfs: Iterable[PfLine | PfState]) -> None: | ||
""" | ||
Based on passed parameters calls either concat_pflines() or concat_pfstates(). | ||
Parameters | ||
---------- | ||
pfl_or_pfs: Iterable[PfLine | PfState] | ||
The input values. Can be either a list of Pflines or PfStates to concatenate. | ||
Returns | ||
------- | ||
None | ||
Notes | ||
----- | ||
Input portfolio lines must contain compatible information, i.e., same frequency, | ||
timezone, start-of-day, and kind. Their indices must be gapless and without overlap. | ||
For nested pflines, the number and names of their children must match; concatenation | ||
is done on a name-by-name basis. | ||
Concatenation returns the same result regardless of input order. | ||
""" | ||
if all(isinstance(item, PfLine) for item in pfl_or_pfs): | ||
return concat_pflines(pfl_or_pfs) | ||
elif all(isinstance(item, PfState) for item in pfl_or_pfs): | ||
return concat_pfstates(pfl_or_pfs) | ||
else: | ||
raise NotImplementedError( | ||
"Concatenation is implemented only for PfState or PfLine." | ||
) | ||
|
||
|
||
def concat_pflines(pfls: Iterable[PfLine]) -> PfLine: | ||
""" | ||
Concatenate porfolyo lines along their index. | ||
Parameters | ||
---------- | ||
pfls: Iterable[PfLine] | ||
The input values. | ||
Returns | ||
------- | ||
PfLine | ||
Concatenated version of PfLines. | ||
Notes | ||
----- | ||
Input portfolio lines must contain compatible information, i.e., same frequency, | ||
timezone, start-of-day, and kind. Their indices must be gapless and without overlap. | ||
For nested pflines, the number and names of their children must match; concatenation | ||
is done on a name-by-name basis. | ||
Concatenation returns the same result regardless of input order. | ||
""" | ||
if len(pfls) < 2: | ||
raise NotImplementedError( | ||
"Cannot perform operation with less than 2 portfolio lines." | ||
) | ||
if len({pfl.kind for pfl in pfls}) != 1: | ||
raise TypeError("Not possible to concatenate PfLines of different kinds.") | ||
if len({pfl.index.freq for pfl in pfls}) != 1: | ||
raise TypeError("Not possible to concatenate PfLines of different frequencies.") | ||
if len({pfl.index.tz for pfl in pfls}) != 1: | ||
raise TypeError("Not possible to concatenate PfLines of different time zones.") | ||
if len({tools.startofday.get(pfl.index, "str") for pfl in pfls}) != 1: | ||
raise TypeError( | ||
"Not possible to concatenate PfLines of different start_of_day." | ||
) | ||
# we can concatenate only pflines of the same type: nested of flat | ||
# with this test and check whether pfls are the same types and they have the same number of children | ||
if len({pfl.structure for pfl in pfls}) != 1: | ||
raise TypeError("Not possible to concatenate PfLines of different structures.") | ||
if pfls[0].structure is Structure.NESTED: | ||
child_names = pfls[0].children.keys() | ||
for pfl in pfls: | ||
diffs = set(child_names) ^ set(pfl.children.keys()) | ||
if len(diffs) != 0: | ||
raise TypeError( | ||
"Not possible to concatenate PfLines with different children names." | ||
) | ||
# If we reach here, all pfls have same kind, same number and names of children. | ||
|
||
# concat(a,b) and concat(b,a) should give the same result: | ||
sorted_pfls = sorted(pfls, key=lambda pfl: pfl.index[0]) | ||
if pfls[0].structure is Structure.FLAT: | ||
# create flat dataframe of parent | ||
dataframes_flat = [pfl.df for pfl in sorted_pfls] | ||
# concatenate dataframes into one | ||
concat_data = pd.concat(dataframes_flat, axis=0) | ||
try: | ||
# Call create.flatpfline() and catch any ValueError | ||
return create.flatpfline(concat_data) | ||
except ValueError as e: | ||
# Handle the error | ||
raise ValueError( | ||
"Error by creating PfLine. PfLine is either not gapless or has overlaps" | ||
) from e | ||
child_data = {} | ||
child_names = pfls[0].children.keys() | ||
for cname in child_names: | ||
# for every name in children need to concatenate elements | ||
child_values = [pfl.children[cname] for pfl in sorted_pfls] | ||
child_data[cname] = concat_pflines(child_values) | ||
|
||
# create pfline from dataframes: -> | ||
# call the constructor of pfl to check check gaplesnes and overplap | ||
return create.nestedpfline(child_data) | ||
|
||
|
||
def concat_pfstates(pfss: Iterable[PfState]) -> PfState: | ||
""" | ||
Concatenate porfolyo states along their index. | ||
Parameters | ||
---------- | ||
pfss: Iterable[PfState] | ||
The input values. | ||
Returns | ||
------- | ||
PfState | ||
Concatenated version of PfStates. | ||
""" | ||
if len(pfss) < 2: | ||
print("Concatenate needs at least two elements.") | ||
return | ||
offtakevolume = concat_pflines([pfs.offtakevolume for pfs in pfss]) | ||
sourced = concat_pflines([pfs.sourced for pfs in pfss]) | ||
unsourcedprice = concat_pflines([pfs.unsourcedprice for pfs in pfss]) | ||
return pfstate.PfState(offtakevolume, unsourcedprice, sourced) |
File renamed without changes.
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
"""Test different error cases for concatenation of PfStates and PfLines.""" | ||
|
||
import pandas as pd | ||
import pytest | ||
|
||
|
||
from portfolyo import dev | ||
from portfolyo.core.pfline.enums import Kind | ||
from portfolyo.core.pfstate.pfstate import PfState | ||
from portfolyo.core.shared import concat | ||
|
||
|
||
def test_general(): | ||
"""Test if concatenating PfLine with PfState raises error.""" | ||
index = pd.date_range("2020", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024", "2025", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfs = dev.get_pfstate(index2) | ||
with pytest.raises(NotImplementedError): | ||
_ = concat.general([pfl, pfs]) | ||
|
||
|
||
def test_diff_freq(): | ||
"""Test if concatenating of two flat PfLines with different freq raises error.""" | ||
index = pd.date_range("2020", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024", "2025", freq="AS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfl2 = dev.get_flatpfline(index2) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_diff_sod(): | ||
"""Test if concatenating of two flat PfLines with different sod raises error.""" | ||
index = pd.date_range("2020-01-01 00:00", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024-01-01 06:00", "2025", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfl2 = dev.get_flatpfline(index2) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_slice_not_sod(): | ||
"""Test if concatenating of two flat PfLines with different sod raises error.""" | ||
index = pd.date_range("2020-01-01 00:00", "2020-03-01", freq="H", inclusive="left") | ||
index2 = pd.date_range( | ||
"2020-02-01 06:00", "2020-04-01 06:00", freq="H", inclusive="left" | ||
) | ||
pfl_a = dev.get_flatpfline(index) | ||
pfl_b = dev.get_flatpfline(index2) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl_a, pfl_b]) | ||
|
||
|
||
def test_diff_tz(): | ||
"""Test if concatenating of two flat PfLines with different tz raises error.""" | ||
index = pd.date_range( | ||
"2020-01-01", "2024", freq="QS", tz="Europe/Berlin", inclusive="left" | ||
) | ||
index2 = pd.date_range("2024-01-01", "2025", freq="QS", tz=None, inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfl2 = dev.get_flatpfline(index2) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_diff_kind(): | ||
"""Test if concatenating of two flat PfLines with different kind raises error.""" | ||
index = pd.date_range("2020-01-01", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024-01-01", "2025", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index, kind=Kind.COMPLETE) | ||
pfl2 = dev.get_flatpfline(index2, kind=Kind.VOLUME) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_app_lenght(): | ||
"""Test if concatenatination raises error if we pass only one parameter.""" | ||
index = pd.date_range("2020-01-01", "2024", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
with pytest.raises(NotImplementedError): | ||
_ = concat.concat_pflines([pfl]) | ||
|
||
|
||
def test_concat_with_overlap(): | ||
"""Test if concatenatination raises error if there is overlap in indices of PfLines.""" | ||
index = pd.date_range("2020-01-01", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2020-01-01", "2023", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfl2 = dev.get_flatpfline(index2) | ||
with pytest.raises(ValueError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_concat_with_gaps(): | ||
"""Test if concatenatination raises error if there is a gap in indices of PfLines.""" | ||
index = pd.date_range("2020-01-01", "2023", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024-01-01", "2025", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfl2 = dev.get_flatpfline(index2) | ||
with pytest.raises(ValueError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_concat_children(): | ||
"""Test if concatenating of flat PfLine with nested PfLine raises error.""" | ||
index = pd.date_range("2020-01-01", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024-01-01", "2025", freq="QS", inclusive="left") | ||
pfl = dev.get_flatpfline(index) | ||
pfl2 = dev.get_nestedpfline(index2) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_concat_diff_children(): | ||
"""Test if concatenating of two nested PfLines with different children raises error.""" | ||
index = pd.date_range("2020-01-01", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024-01-01", "2025", freq="QS", inclusive="left") | ||
pfl = dev.get_nestedpfline(index) | ||
pfl2 = dev.get_nestedpfline(index2).drop_child(name="a") | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pflines([pfl, pfl2]) | ||
|
||
|
||
def test_concat_pfss(): | ||
"""Test if concatenating of Pfstate with "nested" PfState | ||
(meaning that offtakevolume, sourced and unsourcedprice are nested Pflines) raises error. | ||
""" | ||
index = pd.date_range("2020-01-01", "2024", freq="QS", inclusive="left") | ||
index2 = pd.date_range("2024-01-01", "2025", freq="QS", inclusive="left") | ||
pfs1 = dev.get_pfstate(index) | ||
offtakevolume = dev.get_nestedpfline(index2, kind=Kind.VOLUME) | ||
sourced = dev.get_nestedpfline(index2, kind=Kind.COMPLETE) | ||
unsourcedprice = dev.get_nestedpfline(index2, kind=Kind.PRICE) | ||
pfs2 = PfState(offtakevolume, unsourcedprice, sourced) | ||
with pytest.raises(TypeError): | ||
_ = concat.concat_pfstates([pfs1, pfs2]) |
Oops, something went wrong.