From 2666c1f78ecd6874f9f0d755434a7b5c79c72812 Mon Sep 17 00:00:00 2001 From: Alina Voilova <91426818+Pizza2Pizza@users.noreply.github.com> Date: Tue, 30 Apr 2024 15:26:25 +0200 Subject: [PATCH] Plot children (#87) * changed to poetry * fixed pint problem (groups in unitdef.txt) and pint-pandas dependency problem by fixing pandas to 2.0 * ci pipeline * ci pipeline * ci with poetry * solve dependency problems between pandas and pint-pandas * workaround pint-pandas bug * typo * seaborn pinned version to 0.8 * fixed to_excel function, updated version of pint * fixed arithmatic on pflines without overlap * small changes small changes * more consistent functions to get random pfline * deleted comment * change the python version for push request to 3.11 * test * Removed unused dependency * updated toml * added exception if no clipboard available * updated toml * fixed pfline_excelclipboad.py * changed lock file for pre-commit * install poetry libraries in pre-commit step * use black on all files in pre-commit * updated flake8 version in pre-commit.yaml * exclude .venv folder from flake8 * changed setup.cfg to ignore flake8 error messages * initial commit * first try at plotting children * bar plot with children * created plot_children(),fixed bug with darken * area plots for children stacked on top of each other. for daily it plots only parent * changed the logic of all plot_timeseries_as functions, now based on frequency * created a function to test plotting pfline * created slice attr, wrote tests for it * more flexible intersec function with ignore freq, tz, start_of_day + tests * small changes to intersect * intersect_flex function is finished, more testing with frames needed * finished intersect_flex, more testing with frames needed * added children bool to plot pfstate * added hash function for colors for children * deleted unnecessary test * changed hash function, and width of hline * deleted unnecessary test * plot with children * changed plot_pfstate to work with new logic * tests for plot function * added function to set limits to plot pfstate * fixed strict variable for test cases * Deleted unnecessary files * changed yaml file to exclude python 3.10 with mac 14 --------- Co-authored-by: rwijtvliet Co-authored-by: Alina Voilova --- .github/workflows/ci-on-pullreq.yaml | 5 +- portfolyo/__init__.py | 1 + portfolyo/core/pfline/classes.py | 4 +- portfolyo/core/pfline/concat.py | 0 portfolyo/core/shared/plot.py | 260 ++++++++------ portfolyo/dev/develop.py | 4 +- portfolyo/visualize/__init__.py | 2 - portfolyo/visualize/colors.py | 29 +- portfolyo/visualize/plot.py | 333 +++++++++--------- .../test_pfline_arithmatic_kind_and_error.py | 24 +- tests/visualize/test_plot.py | 79 +++++ 11 files changed, 457 insertions(+), 284 deletions(-) create mode 100644 portfolyo/core/pfline/concat.py create mode 100644 tests/visualize/test_plot.py diff --git a/.github/workflows/ci-on-pullreq.yaml b/.github/workflows/ci-on-pullreq.yaml index 3096bda..ac41777 100644 --- a/.github/workflows/ci-on-pullreq.yaml +++ b/.github/workflows/ci-on-pullreq.yaml @@ -10,7 +10,10 @@ jobs: matrix: os: ["ubuntu-latest", "macos-latest", "windows-latest"] python-version: ["3.10", "3.11", "3.12"] - + exclude: + # Exclude Python 3.10 on macOS latest + - os: "macos-latest" + python-version: "3.10" steps: - name: Checkout source uses: actions/checkout@v2 diff --git a/portfolyo/__init__.py b/portfolyo/__init__.py index 1e48760..25efc59 100644 --- a/portfolyo/__init__.py +++ b/portfolyo/__init__.py @@ -23,6 +23,7 @@ # from .core.shared.concat import general as concat + VOLUME = Kind.VOLUME PRICE = Kind.PRICE REVENUE = Kind.REVENUE diff --git a/portfolyo/core/pfline/classes.py b/portfolyo/core/pfline/classes.py index 740bea2..1408218 100644 --- a/portfolyo/core/pfline/classes.py +++ b/portfolyo/core/pfline/classes.py @@ -8,16 +8,16 @@ import pandas as pd from ... import tools -from ..shared import ExcelClipboardOutput, PfLinePlot, PfLineText from ..ndframelike import NDFrameLike +from ..shared import ExcelClipboardOutput, PfLinePlot, PfLineText from . import ( + children, create, dataframeexport, decorators, flat_methods, nested_methods, prices, - children, ) from .arithmatic import PfLineArithmatic from .enums import Kind, Structure diff --git a/portfolyo/core/pfline/concat.py b/portfolyo/core/pfline/concat.py new file mode 100644 index 0000000..e69de29 diff --git a/portfolyo/core/shared/plot.py b/portfolyo/core/shared/plot.py index a328d25..caa414e 100644 --- a/portfolyo/core/shared/plot.py +++ b/portfolyo/core/shared/plot.py @@ -4,21 +4,23 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import hashlib +from typing import TYPE_CHECKING, Dict, Tuple import matplotlib - +import pandas as pd from matplotlib import pyplot as plt from ... import tools from ... import visualize as vis +from ..pfline import classes +from ..pfline.enums import Kind if TYPE_CHECKING: # needed to avoid circular imports from ..pfline import PfLine from ..pfstate import PfState -DEFAULTHOW = {"r": "bar", "q": "bar", "p": "hline", "w": "area", "f": "area"} DEFAULTFMT = { "w": "{:,.1f}", "q": "{:,.0f}", @@ -28,94 +30,151 @@ } -def defaultkwargs(col: str, is_cat: bool): - """Styling and type of graph, depending on column ``col`` and whether or not the x-axis - is a category axis (``is_cat``).""" - kwargs = {} - kwargs["alpha"] = 0.8 - # Add defaults for each column. - kwargs["color"] = getattr(vis.Colors.Wqpr, col, "grey") - kwargs["labelfmt"] = DEFAULTFMT.get(col, "{:.2f}") - kwargs["how"] = DEFAULTHOW.get(col, "hline") - kwargs["cat"] = is_cat - # Override specific cases. - if not is_cat and col == "p": - kwargs["how"] = "step" - if is_cat and col == "f": - kwargs["how"] = "bar" +def defaultkwargs(name: str, col: str): + # Get plot default kwargs. + if name is None: # no children + kwargs = { + "color": getattr(vis.Colors.Wqpr, col, "grey"), + "alpha": 0.7, + "labelfmt": DEFAULTFMT.get(col, "{:.2f}"), + } + elif name == "": # parent with children + kwargs = { + "color": "grey", + "alpha": 0.7, + "labelfmt": DEFAULTFMT.get(col, "{:.2f}"), + } + else: # child with name + hashed_value = hashlib.sha256(name.encode()).hexdigest() + hashed_int = int(hashed_value, 16) + index = hashed_int % len(vis.Colors.General) + kwargs = { + "color": list(vis.Colors.General)[index].value, + "alpha": 0.9, + "labelfmt": "", # no labels on children + "label": name, + "linewidth": 0.9, + } + return kwargs +def plotfn_and_kwargs( + col: str, freq: str, name: str +) -> Tuple[vis.PlotTimeseriesToAxFunction, Dict]: + """Get correct function to plot as well as default kwargs. ``col``: one of 'qwprf', + ``freq``: frequency; ``name``: name of the child. If name is emptystring, it is the + parent of a plot which also has children. If it is None, there are no children.""" + # Get plot function. + if tools.freq.shortest(freq, "MS") == "MS": # categorical + if name == "" or name is None: # parent + fn = vis.plot_timeseries_as_bar + else: # child + fn = vis.plot_timeseries_as_hline + else: # timeaxis + if col in ["w", "q"]: + if name == "" or name is None: # parent + fn = vis.plot_timeseries_as_area + else: # child + fn = vis.plot_timeseries_as_step + else: # col in ['p', 'r'] + if name == "" or name is None: # parent + fn = vis.plot_timeseries_as_step + else: # child + fn = vis.plot_timeseries_as_step + + kwargs = defaultkwargs(name, col) + + return fn, kwargs + + class PfLinePlot: def plot_to_ax( - self: PfLine, ax: plt.Axes, col: str, how: str, labelfmt: str, **kwargs + self: PfLine, ax: plt.Axes, children: bool = False, kind: Kind = None, **kwargs ) -> None: - """Plot a timeseries of the PfLine to a specific axes. + """Plot a specific dimension (i.e., kind) of the PfLine to a specific axis. Parameters ---------- ax : plt.Axes The axes object to which to plot the timeseries. - col : str - The column to plot. One of {'w', 'q', 'p', 'r'}. - how : str - How to plot the data. One of {'jagged', 'bar', 'area', 'step', 'hline'}. - labelfmt : str - Labels are added to each datapoint in the specified format. ('' to add no labels) - Any additional kwargs are passed to the pd.Series.plot function. + children : bool, optional (default: False) + If True, plot also the direct children of the PfLine. + kind : Kind, optional (default: None) + What dimension of the data to plot. Ignored unless PfLine.kind is COMPLETE. + **kwargs + Any additional kwargs are passed to the pd.Series.plot function when drawing + the parent. + + Returns + ------- + None """ - if col not in self.kind.available: + # Ensure ``kind`` is volume, price, or revenue. + if self.kind is not Kind.COMPLETE: + kind = self.kind + elif kind not in [Kind.VOLUME, Kind.PRICE, Kind.REVENUE]: raise ValueError( - f"For this PfLine, parameter ``col`` must be one of {', '.join(self.kind.available)}; got {col}." + "To plot a complete portfolio line, the dimension to be plotted must be specified. " + f"Parameter ``kind`` must be one of {{Kind.VOLUME, Kind.PRICE, Kind.REVENUE}}; got {kind}." ) - vis.plot_timeseries(ax, getattr(self, col), how, labelfmt, **kwargs) - def plot(self: PfLine, cols: str = None) -> plt.Figure: - """Plot one or more timeseries of the PfLine. + # Create function to select correct series of the pfline. + def col_and_series(pfl: PfLine) -> Tuple[str, pd.Series]: + if kind is Kind.PRICE: + return "p", pfl.p + elif kind is Kind.REVENUE: + return "r", pfl.r + elif tools.freq.longest(pfl.index.freq, "D") == "D": # timeaxis + return "w", pfl.w # kind is Kind.VOLUME + else: + return "q", pfl.q # kind is Kind.VOLUME + + # Plot top-level data first. + col, s = col_and_series(self) + # s = s.pint.m + fn, d_kwargs = plotfn_and_kwargs(col, self.index.freq, "" if children else None) + fn(ax, s, **(d_kwargs | kwargs)) + + # Plot children if wanted and available. + if not children or not isinstance(self, classes.NestedPfLine): + return + for name, child in self.items(): + col, s = col_and_series(child) + # s = s.pint.m + fn, d_kwargs = plotfn_and_kwargs(col, self.index.freq, name) + fn(ax, s, **d_kwargs) + ax.legend() + + def plot(self, children: bool = False) -> plt.Figure: + """Plot the PfLine. Parameters ---------- - cols : str, optional - The columns to plot. Default: plot volume (in [MW] for daily values and - shorter, [MWh] for monthly values and longer) and price `p` [Eur/MWh] - (if available). + children : bool, optional (default: False) + If True, plot also the direct children of the PfLine. Returns ------- plt.Figure The figure object to which the series was plotted. """ - # Plot on category axis if freq monthly or longer, else on time axis. - is_category = tools.freq.shortest(self.index.freq, "MS") == "MS" - - # If columns are specified, plot these. Else: take defaults, based on what's available - if cols is None: - cols = "" - if "q" in self.kind.available: - cols += "q" if is_category else "w" - if "p" in self.kind.available: - cols += "p" - else: - cols = [col for col in cols if col in self.kind.available] - if not cols: - raise ValueError("No columns to plot.") - - # Create the plots. - size = (10, len(cols) * 3) - fig, axes = plt.subplots( - len(cols), 1, sharex=True, sharey=False, squeeze=False, figsize=size - ) - for col, ax in zip(cols, axes.flatten()): - kwargs = defaultkwargs(col, is_category) - s = getattr(self, col) - vis.plot_timeseries(ax, s, **kwargs) + if self.kind is not Kind.COMPLETE: + # one axes + fig, ax = plt.subplots(1, 1, squeeze=True, figsize=(10, 3)) + self.plot_to_ax(ax, children) + + else: + fig, axes = plt.subplots(3, 1, sharex=True, squeeze=True, figsize=(10, 9)) + for ax, kind in zip(axes, [Kind.VOLUME, Kind.PRICE, Kind.REVENUE]): + self.plot_to_ax(ax, children, kind) return fig class PfStatePlot: - def plot(self: PfState) -> plt.Figure: + def plot(self: PfState, children: bool = False) -> plt.Figure: """Plot the portfolio state. Parameters @@ -127,56 +186,61 @@ def plot(self: PfState) -> plt.Figure: plt.Figure The figure object to which the series was plotted. """ - gridspec = {"width_ratios": [1, 1], "height_ratios": [4, 1]} - fig, axes = plt.subplots( - 2, 2, sharex=True, gridspec_kw=gridspec, figsize=(10, 6) + gridspec = {"width_ratios": [1, 1, 1], "height_ratios": [4, 1]} + fig, (volumeaxes, priceaxes) = plt.subplots( + 2, 3, sharex=True, sharey="row", gridspec_kw=gridspec, figsize=(10, 6) ) - axes = axes.flatten() - axes[0].sharey(axes[1]) - - # If freq is MS or longer: use categorical axes. Plot volumes in MWh. - # If freq is D or shorter: use time axes. Plot volumes in MW. - is_category = tools.freq.shortest(self.index.freq, "MS") == "MS" - # Volumes. - if is_category: - so, ss = -1 * self.offtakevolume.q, self.sourced.q - kwargs = defaultkwargs("q", is_category) - else: - so, ss = -1 * self.offtakevolume.w, self.sourced.w - kwargs = defaultkwargs("w", is_category) - vis.plot_timeseries(axes[0], so, **kwargs) - vis.plot_timeseries(axes[1], ss, **kwargs) + so, ss, usv = ( + -1 * self.offtakevolume, + self.sourced, + self.unsourced, + ) + so.plot_to_ax(volumeaxes[0], children=children, kind=so.kind, labelfmt="") + ss.plot_to_ax(volumeaxes[1], children=children, kind=Kind.VOLUME, labelfmt="") + # Unsourced volume. + usv.plot_to_ax(volumeaxes[2], kind=Kind.VOLUME, labelfmt="") # Procurement Price. - vis.plot_timeseries(axes[2], self.pnl_cost.p, **defaultkwargs("p", is_category)) - - # Sourced fraction. - vis.plot_timeseries( - axes[3], self.sourcedfraction, **defaultkwargs("f", is_category) + self.pnl_cost.plot_to_ax(priceaxes[0], kind=Kind.PRICE, labelfmt="") + self.sourced.plot_to_ax( + priceaxes[1], children=children, kind=Kind.PRICE, labelfmt="" ) - - # Empty. - + # Unsourced price + self.unsourced.plot_to_ax(priceaxes[2], kind=Kind.PRICE, labelfmt="") # Set titles. - axes[0].set_title("Offtake volume") - axes[1].set_title("Sourced volume") - axes[2].set_title("Procurement price") - axes[3].set_title("Sourced fraction") + volumeaxes[0].set_title("Offtake volume") + volumeaxes[1].set_title("Sourced volume") + volumeaxes[2].set_title("Unsourced volume") + priceaxes[0].set_title("Procurement price") + priceaxes[1].set_title("Sourced price") + priceaxes[2].set_title("Unsourced price") + + limits_vol = [ax.get_ylim() for ax in volumeaxes] + limits_pr = [ax.get_ylim() for ax in priceaxes] + PfStatePlot.set_max_min_limits(volumeaxes, limits_vol) + PfStatePlot.set_max_min_limits(priceaxes, limits_pr) # Format tick labels. formatter = matplotlib.ticker.FuncFormatter( lambda x, p: "{:,.0f}".format(x).replace(",", " ") ) - axes[0].yaxis.set_major_formatter(formatter) - axes[1].yaxis.set_major_formatter(formatter) - axes[3].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(1.0)) + volumeaxes[0].yaxis.set_major_formatter(formatter) + priceaxes[0].yaxis.set_major_formatter(formatter) + # axes[3].yaxis.set_major_formatter(matplotlib.ticker.PercentFormatter(1.0)) # Set ticks. - axes[0].xaxis.set_tick_params(labeltop=False, labelbottom=True) - axes[1].xaxis.set_tick_params(labeltop=False, labelbottom=True) - axes[2].xaxis.set_tick_params(labeltop=False, labelbottom=False) - axes[3].xaxis.set_tick_params(labeltop=False, labelbottom=False) + for ax in volumeaxes: + ax.xaxis.set_tick_params(labeltop=False, labelbottom=True) + for ax in priceaxes: + ax.xaxis.set_tick_params(labeltop=False, labelbottom=False) fig.tight_layout() return fig + + def set_max_min_limits(axes: plt.Axes, limit: int): + mins_vol, maxs_vol = zip(*limit) + + themin, themax = min(mins_vol), max(maxs_vol) + for ax in axes: + ax.set_ylim(themin * 1.1, themax * 1.1) diff --git a/portfolyo/dev/develop.py b/portfolyo/dev/develop.py index fb59837..edc7658 100644 --- a/portfolyo/dev/develop.py +++ b/portfolyo/dev/develop.py @@ -210,7 +210,7 @@ def get_pfline( childcount : int, optional (default: 2) Number of children on each level. (Ignored if `nlevels` == 1) positive : bool, optional (default: False) - If True, return only positive values. If False, make 1/3 of pflines negative. + If True, return only positive values. If False, make 1/2 of pflines negative. _ancestornames : Tuple[str], optional (default: ()) Text to start the childrens' names with (concatenated with '-') _seed : int, optional (default: no seed value) @@ -234,7 +234,7 @@ def get_pfline( Kind.COMPLETE: "qr", }[kind] df = get_dataframe(i, columns, _seed=_seed) - if not positive and np.random.rand() < 0.33: + if not positive and np.random.randint(1, 4) == 1: df = -1 * df # HACK: `-df` leads to error in pint. Maybe fixed in future return create.flatpfline(df) # Create nested PfLine. diff --git a/portfolyo/visualize/__init__.py b/portfolyo/visualize/__init__.py index 0cb9e32..c678a33 100644 --- a/portfolyo/visualize/__init__.py +++ b/portfolyo/visualize/__init__.py @@ -1,9 +1,7 @@ from .colors import Color, Colors from .plot import ( - plot_timeseries, plot_timeseries_as_area, plot_timeseries_as_bar, plot_timeseries_as_hline, - plot_timeseries_as_jagged, plot_timeseries_as_step, ) diff --git a/portfolyo/visualize/colors.py b/portfolyo/visualize/colors.py index 28be08e..20c27c4 100644 --- a/portfolyo/visualize/colors.py +++ b/portfolyo/visualize/colors.py @@ -1,6 +1,7 @@ """Creating colors for use in plotting.""" import colorsys +from enum import Enum from collections import namedtuple import matplotlib as mpl @@ -18,7 +19,7 @@ def lighten(self, value): def darken(self, value): """Darken the color by fraction ``value`` (between 0 and 1).""" - return self.lighten(self, -value) + return self.lighten(-value) light = property(lambda self: self.lighten(0.3)) xlight = property(lambda self: self.lighten(0.6)) @@ -27,20 +28,22 @@ def darken(self, value): class Colors: - class General: - PURPLE = Color(0.549, 0.110, 0.706) - GREEN = Color(0.188, 0.463, 0.165) - BLUE = Color(0.125, 0.247, 0.600) - ORANGE = Color(0.961, 0.533, 0.114) - RED = Color(0.820, 0.098, 0.114) - YELLOW = Color(0.945, 0.855, 0.090) - LBLUE = Color(0.067, 0.580, 0.812) - LGREEN = Color(0.325, 0.773, 0.082) - BLACK = Color(0, 0, 0) - WHITE = Color(1, 1, 1) + class General(Enum): + DARK_BURGUNDY = Color(0.2667, 0.0000, 0.0745) + BROWN_SUGAR = Color(0.3765, 0.2902, 0.1804) + DUSTY_GRAY = Color(0.5529, 0.5176, 0.3765) + MOONSTONE = Color(0.7647, 0.7569, 0.6118) + CORAL = Color(0.9608, 0.3412, 0.4980) + ORANGE = Color(0.9608, 0.5098, 0.1140) + RED = Color(0.8196, 0.0980, 0.1137) + YELLOW = Color(0.9451, 0.8549, 0.0902) + LBLUE = Color(0.0667, 0.5804, 0.8118) + LGREEN = Color(0.3255, 0.7725, 0.0824) + BLACK = Color(0.0000, 0.0000, 0.0000) + WHITE = Color(1.0000, 1.0000, 1.0000) class Wqpr: # Standard colors when plotting a portfolio w = Color(*mpl.colors.to_rgb("#0E524F")).lighten(0.15) q = Color(*mpl.colors.to_rgb("#0E524F")) r = Color(*mpl.colors.to_rgb("#8B7557")) - p = Color(*mpl.colors.to_rgb("#E53454")) + p = Color(*mpl.colors.to_rgb("#cd3759")) diff --git a/portfolyo/visualize/plot.py b/portfolyo/visualize/plot.py index bfdba2f..4d79233 100644 --- a/portfolyo/visualize/plot.py +++ b/portfolyo/visualize/plot.py @@ -2,12 +2,15 @@ Visualize portfolio lines, etc. """ +from typing import Callable + import matplotlib as mpl -import numpy as np import pandas as pd from matplotlib import pyplot as plt -from .. import tools +from portfolyo.tools.unit import to_name +from portfolyo.visualize.colors import Colors +from ..tools import freq as tools_freq from .categories import Categories, Category # noqa mpl.style.use("seaborn-v0_8") @@ -52,32 +55,24 @@ # pick the correct graph type. # -MAX_XLABELS = 20 +MAX_XLABELS = 15 + + +class ContinuousValuesNotSupported(Exception): + pass + +class CategoricalValuesNotSupported(Exception): + pass -def use_categories(ax: plt.Axes, s: pd.Series, cat: bool = None) -> bool: - """Determine if plot should be made with category axis (True) or datetime axis (False).""" - # We use categorical data if... - if (ax.lines or ax.collections or ax.containers) and ax.xaxis.have_units(): - return True # ...ax already has category axis; or - elif cat is None and tools.freq.shortest(s.index.freq, "MS") == "MS": - return True # ...it's the default for the given frequency; or - elif cat is True: - return True # ...user wants it. - return False +PlotTimeseriesToAxFunction = Callable[[plt.Axes, pd.Series, ...], None] docstringliteral_plotparameters = """ Other parameters ---------------- labelfmt : str, optional (default: '') Labels are added to each datapoint in the specified format. ('' to add no labels) -cat : bool, optional - If False, plots x-axis as timeline with timestamps spaced according to their - duration. If True, plots x-axis categorically, with timestamps spaced equally. - Disregarded if ``ax`` already has values (then: use whatever is already set). - Default: use True if ``s`` has a monthly frequency or longer, False if the frequency - is shorter than monthly. **kwargs : any formatting are passed to the Axes plot method being used.""" @@ -89,172 +84,132 @@ def decorator(fn): return decorator -@append_to_doc(docstringliteral_plotparameters) -def plot_timeseries_as_jagged( - ax: plt.Axes, s: pd.Series, labelfmt: str = "", cat: bool = None, **kwargs -) -> None: - """Plot timeseries ``s`` to axis ``ax``, as jagged line and/or as markers. Use kwargs - ``linestyle`` and ``marker`` to specify line style and/or marker style. (Default: line only). - """ - s = prepare_ax_and_s(ax, s) # ensure unit compatibility (if possible) - - if use_categories(ax, s, cat): - categories = Categories(s) - ax.plot(categories.x(), categories.y(), **kwargs) - ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) - set_data_labels(ax, categories.x(), categories.y(), labelfmt, False) - - else: - ax.plot(s.index, s.values, **kwargs) - set_data_labels(ax, s.index, s.values, labelfmt, False) - - @append_to_doc(docstringliteral_plotparameters) def plot_timeseries_as_bar( - ax: plt.Axes, s: pd.Series, labelfmt: str = "", cat: bool = None, **kwargs + ax: plt.Axes, + s: pd.Series, + labelfmt: str = "", + width: float = 0.8, + **kwargs, ) -> None: - """Plot timeseries ``s`` to axis ``ax``, as bars. Ideally, only used for plots with - categorical (i.e, non-time) x-axis.""" + """Plot timeseries ``s`` to axis ``ax``, as bars. + On plots with categorical (i.e, non-continuous) time axis.""" + if not is_categorical(s): + raise ContinuousValuesNotSupported( + "This plot is not compatible with continous values" + ) + check_ax_s_compatible(ax, s) s = prepare_ax_and_s(ax, s) # ensure unit compatibility (if possible) - if use_categories(ax, s, cat): - categories = Categories(s) - ax.bar(categories.x(), categories.y(), 0.8, **kwargs) - ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) - set_data_labels(ax, categories.x(), categories.y(), labelfmt, True) - - else: - # Bad combination: bar graph on time-axis. But allow anyway. - - # This is slow if there are many elements. - # x = s.index + 0.5 * (s.index.right - s.index) - # width = pd.Timedelta(hours=s.index.duration.median().to("h").magnitude * 0.8) - # ax.bar(x.values, s.values, width, **kwargs) - - # This is faster. - delta = s.index.right - s.index - x = np.array(list(zip(s.index + 0.1 * delta, s.index + 0.9 * delta))).flatten() - magnitudes = np.array([[v, 0] for v in s.values.quantity.magnitude]).flatten() - values = tools.unit.PA_(magnitudes, s.values.quantity.units) - ax.fill_between(x, 0, values, step="post", **kwargs) - - set_data_labels(ax, s.index + 0.5 * delta, s.values, labelfmt, True) + categories = Categories(s) + ax.bar(categories.x(), categories.y(), width=width, **kwargs) + ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) + set_data_labels( + ax, categories.x(MAX_XLABELS), categories.y(MAX_XLABELS), labelfmt, True + ) + ax.autoscale() @append_to_doc(docstringliteral_plotparameters) def plot_timeseries_as_area( - ax: plt.Axes, s: pd.Series, labelfmt: str = "", cat: bool = None, **kwargs + ax: plt.Axes, + s: pd.Series, + labelfmt: str = "", + **kwargs, ) -> None: - """Plot timeseries ``s`` to axis ``ax``, as stepped area between 0 and value. Ideally, - only used for plots with time (i.e., non-categorical) axis.""" + """Plot timeseries ``s`` to axis ``ax``, as stepped area between 0 and value. + On plots with continuous (i.e., non-categorical) time axis.""" + if is_categorical(s): + raise CategoricalValuesNotSupported( + "This plot is not compatible with categorical values" + ) + check_ax_s_compatible(ax, s) s = prepare_ax_and_s(ax, s) # ensure unit compatibility (if possible) splot = s.copy() # modified with additional (repeated) datapoint splot[splot.index.right[-1]] = splot.values[-1] - if use_categories(ax, s, cat): - # Bad combination: area graph on categorical axis. But allow anyway. - - categories = Categories(s) - ctgr_extra = Categories(splot) - # Center around x-tick: - ax.fill_between(ctgr_extra.x() - 0.5, 0, ctgr_extra.y(), step="post", **kwargs) - ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) - set_data_labels(ax, categories.x(), categories.y(), labelfmt, True) + bottom = [0.0 for i in range(0, splot.size)] + # make bottom into pintarray + bottom = bottom * splot.values[0].units - else: - ax.fill_between(splot.index, 0, splot.values, step="post", **kwargs) - delta = s.index.right - s.index - set_data_labels(ax, s.index + 0.5 * delta, s.values, labelfmt, True) + ax.fill_between( + splot.index, + bottom, + [sum(x) for x in zip(bottom, splot.values)], + step="post", + **kwargs, + ) + delta = s.index.right - s.index + set_data_labels(ax, s.index + 0.5 * delta, s.values, labelfmt, True) + ax.autoscale() @append_to_doc(docstringliteral_plotparameters) def plot_timeseries_as_step( - ax: plt.Axes, s: pd.Series, labelfmt: str = "", cat: bool = None, **kwargs + ax: plt.Axes, s: pd.Series, labelfmt: str = "", **kwargs ) -> None: """Plot timeseries ``s`` to axis ``ax``, as stepped line (horizontal and vertical lines). - Ideally, only used for plots with time (i.e., non-categorical) axis.""" + On plots with continuous (i.e., non-categorical) time axis.""" + if is_categorical(s): + raise CategoricalValuesNotSupported( + "This plot is not compatible with categorical values" + ) + check_ax_s_compatible(ax, s) s = prepare_ax_and_s(ax, s) # ensure unit compatibility (if possible) splot = s.copy() # modified with additional (repeated) datapoint splot[splot.index.right[-1]] = splot.values[-1] - if use_categories(ax, s, cat): - # Bad combination: step graph on categorical axis. But allow anyway. - - categories = Categories(s) - ctgr_extra = Categories(splot) - # Center around x-tick: - ax.step(ctgr_extra.x() - 0.5, ctgr_extra.y(), where="post", **kwargs) - ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) - set_data_labels(ax, categories.x(), categories.y(), labelfmt, True) - - else: - ax.step(splot.index, splot.values, where="post", **kwargs) - delta = s.index.right - s.index - set_data_labels(ax, s.index + 0.5 * delta, s.values, labelfmt, True) + ax.step(splot.index, splot.values, where="mid", **kwargs) + delta = s.index.right - s.index + set_data_labels(ax, s.index + 0.5 * delta, s.values, labelfmt, True) + ax.autoscale() @append_to_doc(docstringliteral_plotparameters) def plot_timeseries_as_hline( - ax: plt.Axes, s: pd.Series, labelfmt: str = "", cat: bool = None, **kwargs + ax: plt.Axes, s: pd.Series, labelfmt: str = "", **kwargs ) -> None: - """Plot timeseries ``s`` to axis ``ax``, as horizontal lines. Ideally, only used for - plots with time (i.e., non-categorical) axis.""" + """Plot timeseries ``s`` to axis ``ax``, as horizontal lines. + On plots with categorical (i.e., non-continuous) time axis.""" + if not is_categorical(s): + raise ContinuousValuesNotSupported( + "This plot is not compatible with continous time axis" + ) + check_ax_s_compatible(ax, s) s = prepare_ax_and_s(ax, s) # ensure unit compatibility (if possible) + categories = Categories(s) + # Center around x-tick: + ax.hlines(categories.y(), categories.x() - 0.4, categories.x() + 0.4, **kwargs) + ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) + set_data_labels(ax, categories.x(), categories.y(), labelfmt, True) + ax.autoscale() + # Adjust the margins around the plot + ax.margins(x=0.2, y=0.2) - if use_categories(ax, s, cat): - # Bad combination: hline graph on categorical axis. But allow anyway. - - categories = Categories(s) - # Center around x-tick: - ax.hlines(categories.y(), categories.x() - 0.5, categories.x() + 0.5, **kwargs) - ax.set_xticks(categories.x(MAX_XLABELS), categories.labels(MAX_XLABELS)) - set_data_labels(ax, categories.x(), categories.y(), labelfmt, True) - else: - delta = s.index.right - s.index - ax.hlines(s.values, s.index, s.index.right, **kwargs) - set_data_labels(ax, s.index + 0.5 * delta, s.values, labelfmt, False) +def set_portfolyo_attr(ax, name, val): + """ + Sets attribute ax._portfolyo which is a dictionary: ._portfolyo = {'unit': ..., 'freq': ..., ...} + If dictionary doesn't exist yet, creates an empty dictionary + """ + pattr = getattr(ax, "_portfolyo", {}) + pattr[name] = val + setattr(ax, "_portfolyo", pattr) -def plot_timeseries( - ax: plt.Axes, - s: pd.Series, - how: str = "jagged", - labelfmt: str = None, - cat: bool = None, - **kwargs, -) -> None: - """Plot timeseries to given axis. - - Parameters - ---------- - ax : plt.Axes - Axes to plot to. - s : pd.Series - Timeseries to plot - how : str, optional (default: 'jagged') - How to plot the data; one of {'jagged', 'bar', 'area', 'step', 'hline'}. - labelfmt : str, optional (default: '') - Labels are added to each datapoint in the specified format. ('' to add no labels) - cat : bool, optional (default: True if frequency is monthly or larger) - Plot as categorical x-axis. +def get_portfolyo_attr(ax, name, default_val=None): """ - if how == "jagged": - plot_timeseries_as_jagged(ax, s, labelfmt, cat, **kwargs) - elif how == "bar": - plot_timeseries_as_bar(ax, s, labelfmt, cat, **kwargs) - elif how == "area": - plot_timeseries_as_area(ax, s, labelfmt, cat, **kwargs) - elif how == "step": - plot_timeseries_as_step(ax, s, labelfmt, cat, **kwargs) - elif how == "hline": - plot_timeseries_as_hline(ax, s, labelfmt, cat, **kwargs) - else: - raise ValueError( - f"Parameter ``how`` must be one of 'jagged', 'bar', 'area', 'step', 'hline'; got {how}." - ) + Gets values from dictionary ax._portfolyo, if it doesn't exist returns None. + """ + pattr = getattr(ax, "_portfolyo", {}) + return pattr.get(name, default_val) + + +def is_categorical(s: pd.Series) -> bool: + """The function checks whether frequency of panda Series falls into continous or categorical group""" + return tools_freq.longer_or_shorter(s.index.freq, "D") == 1 def prepare_ax_and_s(ax: plt.Axes, s: pd.Series, unit=None) -> pd.Series: @@ -275,7 +230,7 @@ def prepare_ax_and_s(ax: plt.Axes, s: pd.Series, unit=None) -> pd.Series: if s.dtype == float or s.dtype == int: s = s.astype("pint[1]") # Find preexisting unit. - axunit = getattr(ax, "_unit", None) + axunit = get_portfolyo_attr(ax, "unit", None) # Axes already has unit. Convert series to that unit; ignore any supplied custom unit. if axunit is not None: @@ -283,7 +238,7 @@ def prepare_ax_and_s(ax: plt.Axes, s: pd.Series, unit=None) -> pd.Series: raise ValueError( f"Cannot plot series with units {s.pint.units} on axes with units {axunit}." ) - return s.astype(axunit) + return s.astype(f"pint[{axunit}]") # Axes does not have unit. if unit is not None: @@ -293,16 +248,48 @@ def prepare_ax_and_s(ax: plt.Axes, s: pd.Series, unit=None) -> pd.Series: f"Cannot convert series with units {s.pint.units} to units {unit}." ) s = s.astype(unit) - setattr(ax, "_unit", unit) + set_portfolyo_attr(ax, "unit", unit) else: # No custom unit provided. Convert series to base units. s = s.pint.to_base_units() - setattr(ax, "_unit", s.pint.units) - - ax.set_ylabel(f"{ax._unit:~P}") + set_portfolyo_attr(ax, "unit", s.pint.units) + # Get unit attribute + unit = get_portfolyo_attr(ax, "unit") + name_unit = to_name(unit) + # Define color mapping based on 'Wqpr' class attributes + unit_colors = { + "w": Colors.Wqpr.w, + "q": Colors.Wqpr.q, + "r": Colors.Wqpr.r, + "p": Colors.Wqpr.p, + } + # Set default color if name_unit not found + default_color = "white" + # Get background color based on name_unit + background_color = unit_colors.get(name_unit, default_color) + ax.set_ylabel(f"{unit:~P}", backgroundcolor=background_color.lighten(0.3)) return s +def check_ax_s_compatible(ax: plt.Axes, s: pd.Series): + """Ensure axes ``ax`` has frequency associated with it and checks compatibility with series + ``s``. + If axes `ax`` has frequency, compare to pd.Series frequency. If they are equal, return s, if not equal, return error. + If axes `ax`` doesn't have frequency, assign frequency of s to it. + """ + # Find preexisting frequency. + axfreq = get_portfolyo_attr(ax, "freq", None) + series_freq = s.index.freq + # Axes does not have frequency. + if axfreq is None: + set_portfolyo_attr(ax, "freq", series_freq) + else: + if get_portfolyo_attr(ax, "freq") != series_freq: + raise AttributeError( + "The frequency of PFLine is not compatible with current axes" + ) + + def set_data_labels( ax: plt.Axes, xx, yy, labelfmt, outside: bool = False, maxcount: int = 24 ): @@ -320,12 +307,38 @@ def set_data_labels( for x, y in zip(xx, yy): lbl = labelfmt.format(y.magnitude).replace(",", " ") xytext = (0, -10) if outside and y.magnitude < 0 else (0, 10) - ax.annotate(lbl, (x, y), textcoords="offset points", xytext=xytext, ha="center") + ax.annotate( + lbl, + (x, y), + textcoords="offset points", + xytext=xytext, + ha="center", + va="top" if y.magnitude < 0 else "bottom", + rotation=90, + ) + + # # Add labels only to every third data point. + # for i in range(0, len(xx), 3): # Iterate every third index + # x = xx[i] + # y = yy[i] + # lbl = labelfmt.format(y.magnitude).replace(",", " ") + # xytext = (0, -10) if outside and y.magnitude < 0 else (0, 10) + # ax.annotate( + # lbl, + # (x, y), + # textcoords="offset points", + # xytext=xytext, + # ha="center", + # rotation=90, + # ) # Increase axis range to give label space to stay inside box. - ylim = list(ax.get_ylim()) - if not np.isclose(ylim[0], 0) and ylim[0] < 0: - ylim[0] *= 1.1 - if not np.isclose(ylim[1], 0) and ylim[1] > 0: - ylim[1] *= 1.1 - ax.set_ylim(*ylim) + # ylim = list(ax.get_ylim()) + miny, maxy = ax.get_ylim() + delta = 0.5 + miny2, maxy2 = (1 + delta) * miny - delta * maxy, (1 + delta) * maxy - delta * miny + # if not np.isclose(ylim[0], 0) and ylim[0] < 0: + # ylim[0] *= 1.1 + # if not np.isclose(ylim[1], 0) and ylim[1] > 0: + # ylim[1] *= 1.1 + ax.set_ylim(miny2, maxy2) diff --git a/tests/core/pfline/test_pfline_arithmatic_kind_and_error.py b/tests/core/pfline/test_pfline_arithmatic_kind_and_error.py index d716236..11f67a5 100644 --- a/tests/core/pfline/test_pfline_arithmatic_kind_and_error.py +++ b/tests/core/pfline/test_pfline_arithmatic_kind_and_error.py @@ -1,4 +1,5 @@ """Test if arithmatic returns correct type/kind of object and/or correctly raises error.""" + from dataclasses import dataclass from enum import Enum from functools import lru_cache @@ -17,7 +18,14 @@ # TODO: use/change STRICT setting -arithmatic.STRICT = True +@pytest.fixture +def strict_arithmetic(): + arithmatic.STRICT = True + yield + arithmatic.STRICT = False + + +# arithmatic.STRICT = True class Kind2(Enum): # Kind of value for other operand @@ -319,7 +327,9 @@ def from_config(cls, config: CaseConfig, er: ER) -> Iterable[Case]: ), ids=id_fn, ) -def test_pfl_arithmatic_kind_addraddsubrsub(testcase: Case, operation: str): +def test_pfl_arithmatic_kind_addraddsubrsub( + testcase: Case, operation: str, strict_arithmetic +): """Test if arithmatic expectedly raises Error or returns expected type/kind.""" do_kind_test(testcase, operation) @@ -383,7 +393,7 @@ def test_pfl_arithmatic_kind_addraddsubrsub(testcase: Case, operation: str): ), ids=id_fn, ) -def test_pfl_arithmatic_kind_mulrmul(testcase: Case, operation: str): +def test_pfl_arithmatic_kind_mulrmul(testcase: Case, operation: str, strict_arithmetic): """Test if arithmatic expectedly raises Error or returns expected type/kind.""" do_kind_test(testcase, operation) @@ -456,7 +466,7 @@ def test_pfl_arithmatic_kind_mulrmul(testcase: Case, operation: str): ), ids=id_fn, ) -def test_pfl_arithmatic_kind_div(testcase: Case, operation: str): +def test_pfl_arithmatic_kind_div(testcase: Case, operation: str, strict_arithmetic): """Test if arithmatic expectedly raises Error or returns expected type/kind.""" do_kind_test(testcase, operation) @@ -505,7 +515,7 @@ def test_pfl_arithmatic_kind_div(testcase: Case, operation: str): ), ids=id_fn, ) -def test_pfl_arithmatic_kind_rdiv(testcase: Case, operation: str): +def test_pfl_arithmatic_kind_rdiv(testcase: Case, operation: str, strict_arithmetic): """Test if arithmatic expectedly raises Error or returns expected type/kind.""" do_kind_test(testcase, operation) @@ -581,7 +591,9 @@ def test_pfl_arithmatic_kind_rdiv(testcase: Case, operation: str): ), ids=id_fn, ) -def test_pfl_arithmatic_kind_unionrunion(testcase: Case, operation: str): +def test_pfl_arithmatic_kind_unionrunion( + testcase: Case, operation: str, strict_arithmetic +): """Test if arithmatic expectedly raises Error or returns expected type/kind.""" do_kind_test(testcase, operation) diff --git a/tests/visualize/test_plot.py b/tests/visualize/test_plot.py new file mode 100644 index 0000000..1b15d8a --- /dev/null +++ b/tests/visualize/test_plot.py @@ -0,0 +1,79 @@ +"""Test if portfolio line can be plotted.""" + +import pytest +import pandas as pd +import portfolyo as pf +from portfolyo.core.pfline.enums import Kind +from portfolyo.core.pfstate.pfstate import PfState +import matplotlib.pyplot as plt + + +@pytest.mark.parametrize("levels", [1, 2, 3]) +@pytest.mark.parametrize("childcount", [1, 2, 3]) +@pytest.mark.parametrize("children", ["True", "False"]) +@pytest.mark.parametrize("kind", [Kind.VOLUME, Kind.PRICE, Kind.REVENUE, Kind.COMPLETE]) +@pytest.mark.parametrize("freq", ["MS", "D"]) +def test_pfline_plot( + levels: int, childcount: int, children: str, kind: Kind, freq: str +): + """Test if data can be plotted with plot() function.""" + index = pd.date_range("2020-01-01", "2021-01-01", freq=freq, tz=None) + pfl = pf.dev.get_pfline(index, nlevels=levels, childcount=childcount, kind=kind) + pfl.plot(children=children) + + +@pytest.mark.parametrize("childcount", [1, 2, 3]) +@pytest.mark.parametrize("children", ["True", "False"]) +@pytest.mark.parametrize("freq", ["MS", "D"]) +def test_pfstate_plot( + childcount: int, + children: str, + freq: str, +): + """Test if pfstate can be plotted with plot() function.""" + index = pd.date_range( + "2022-06-01", "2024-02-01", freq=freq, tz="Europe/Berlin", inclusive="left" + ) + offtakevolume = pf.dev.get_nestedpfline( + index, kind=Kind.VOLUME, childcount=childcount + ) + sourced = pf.dev.get_nestedpfline(index, kind=Kind.COMPLETE, childcount=childcount) + unsourcedprice = pf.dev.get_nestedpfline( + index, kind=Kind.PRICE, childcount=childcount + ) + pfs = PfState(-1 * offtakevolume, unsourcedprice, sourced) + pfs.plot(children=children) + + +@pytest.mark.parametrize("children", ["True", "False"]) +def test_flatpfline_plot(children: str): + """Test if plotting flatpfline with children attribute gives an error.""" + pfl = pf.dev.get_flatpfline() + pfl.plot(children=children) + + +@pytest.mark.parametrize("freq", ["MS", "D"]) +@pytest.mark.parametrize("children", ["True", "False"]) +@pytest.mark.parametrize("levels", [1, 2, 3]) +@pytest.mark.parametrize("childcount", [1, 2, 3]) +def test_plot_to_ax(levels: int, childcount: int, children: str, freq: str): + """Test if frunction plot_to_ax works with every kind of pfline.""" + index = pd.date_range("2020-01-01", "2021-01-01", freq=freq, tz=None) + pfl_compl = pf.dev.get_pfline( + index, nlevels=levels, childcount=childcount, kind=Kind.COMPLETE + ) + pfl_vol = pf.dev.get_pfline( + index, nlevels=levels, childcount=childcount, kind=Kind.VOLUME + ) + pfl_price = pf.dev.get_pfline( + index, nlevels=levels, childcount=childcount, kind=Kind.PRICE + ) + pfl_rev = pf.dev.get_pfline( + index, nlevels=levels, childcount=childcount, kind=Kind.REVENUE + ) + fig, axs = plt.subplots(2, 2) + with pytest.raises(ValueError): + _ = pfl_compl.plot_to_ax(axs[0][0], children=children, kind=Kind.COMPLETE) + pfl_vol.plot_to_ax(axs[0][1], children=children, kind=Kind.VOLUME) + pfl_price.plot_to_ax(axs[1][0], children=children, kind=Kind.PRICE) + pfl_rev.plot_to_ax(axs[1][1], children=children, kind=Kind.REVENUE)