Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand support for upsampling to nominal resolutions #180

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions timely_beliefs/beliefs/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import timely_beliefs.utils as tb_utils
from timely_beliefs.beliefs import probabilistic_utils
from timely_beliefs.beliefs import utils as belief_utils
from timely_beliefs.beliefs.time_utils import DatetimeLike, TimedeltaLike
from timely_beliefs.beliefs.utils import is_pandas_structure, is_tb_structure, meta_repr
from timely_beliefs.db_base import Base
from timely_beliefs.sensors import utils as sensor_utils
Expand All @@ -48,8 +49,6 @@
from timely_beliefs.sources.classes import BeliefSource, DBBeliefSource

METADATA = ["sensor", "event_resolution"]
DatetimeLike = Union[datetime, str, pd.Timestamp]
TimedeltaLike = Union[timedelta, str, pd.Timedelta]
JoinTarget = Union[
Selectable,
type,
Expand Down
150 changes: 150 additions & 0 deletions timely_beliefs/beliefs/time_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Union

import pandas as pd
from isodate import parse_duration

# Type alias for a point in time given as a datetime, a string or a pandas Timestamp.
DatetimeLike = Union[datetime, str, pd.Timestamp]
# Type alias for a duration given as a timedelta, a string or a pandas Timedelta.
TimedeltaLike = Union[timedelta, str, pd.Timedelta]


def to_max_timedelta(duration: str | pd.DateOffset | timedelta) -> pd.Timedelta:
    """Determine the maximum pd.Timedelta for a given ISO duration string or Pandas DateOffset object.

    - Converts years to 366 days and months to 31 days.
    - Does not convert days to 25 hours.

    :param duration: ISO duration string, Pandas DateOffset or (absolute) timedelta.
                     A timedelta is converted to a pd.Timedelta as is.
    :return:         Absolute duration that is at least as long as the nominal duration.
    """
    if isinstance(duration, timedelta):
        return pd.Timedelta(duration)
    if isinstance(duration, pd.DateOffset):
        duration = offset_to_iso_duration(duration)

    offset_args = _iso_duration_to_offset_args(duration)

    # pd.Timedelta has no "years" or "months" keywords (not even zero-valued ones),
    # so these components are removed from the arguments and folded into days.
    years = offset_args.pop("years", 0)
    months = offset_args.pop("months", 0)
    offset_args["days"] = offset_args.get("days", 0) + 366 * years + 31 * months

    return pd.Timedelta(**offset_args)


def _iso_duration_to_offset_args(iso_duration: str) -> dict[str, int]:
    """Split an ISO duration string into its integer components.

    For example, "P1Y2MT3H" becomes {"years": 1, "months": 2, "hours": 3}.
    Only components that occur in the string are included in the result.

    :param iso_duration: ISO duration string in designator format, e.g. "P1DT12H".
    :return:             Dict mapping component names (years, months, weeks, days,
                         hours, minutes, seconds) to their integer values.
    :raises ValueError:  If the string is rejected by parse_duration, or if it contains
                         a fractional value, a minus sign or the alternative
                         date-time format, none of which can be expressed as
                         non-negative integer components.
    """
    # Validate ISO duration string before commencing our own parsing
    parse_duration(iso_duration)

    # "M" means months before the time designator "T", and minutes after it
    date_designators = {"Y": "years", "M": "months", "W": "weeks", "D": "days"}
    time_designators = {"H": "hours", "M": "minutes", "S": "seconds"}

    offset_args: dict[str, int] = {}
    designators = date_designators
    digits = ""
    for char in iso_duration:
        if char.isdigit():
            digits += char
            continue
        if char == "T":
            designators = time_designators
        elif char in designators:
            offset_args[designators[char]] = int(digits)
        elif char in ".,:-":
            # These pass validation, but skipping them would silently yield wrong values
            # (for example, "PT0.5S" would otherwise be read as 5 seconds).
            raise ValueError(
                f"Cannot convert ISO duration {iso_duration!r} to integer components: "
                f"unsupported character {char!r}."
            )
        # Any non-digit character ends the current number
        digits = ""

    return offset_args


def iso_duration_to_offset(iso_duration: str) -> pd.DateOffset:
    """
    Build a Pandas DateOffset from an ISO duration string.

    Components with a zero value are left out of the resulting offset.

    NOTE(review): if every component is zero (e.g. "PT0H"), the offset is constructed
    without any keyword arguments - confirm that pandas then yields the intended offset.

    :param iso_duration: ISO duration string to convert.
    :return: Pandas DateOffset object representing the duration.
    """
    components = _iso_duration_to_offset_args(iso_duration)

    # Keep only the components that actually contribute to the duration
    nonzero_components = {}
    for unit, amount in components.items():
        if amount:
            nonzero_components[unit] = amount

    return pd.DateOffset(**nonzero_components)


def _get_nominal_period_from_offset(
offset: pd.DateOffset, name: str, designator: str
) -> str:
try:
n_periods = getattr(offset, name)
if n_periods != 0:
return str(n_periods) + designator
except AttributeError:
pass
return ""


def _offset_contains_time(offset: pd.DateOffset) -> bool:
"""Also returns False if the offset contains only zero-valued time components."""
n_hours = False
n_minutes = False
n_seconds = False
try:
n_hours = offset.hours
except AttributeError:
pass
try:
n_minutes = offset.minutes
except AttributeError:
pass
try:
n_seconds = offset.seconds
except AttributeError:
pass
return bool(n_hours * n_minutes * n_seconds)


def offset_to_iso_duration(offset: pd.DateOffset) -> str:
    """
    Express a Pandas DateOffset as an ISO duration string.

    :param offset: Pandas DateOffset object to convert.
    :return: ISO duration string representing the duration of the offset,
             or "PT0H" if the offset yields no components at all.
    """
    date_part = "".join(
        _get_nominal_period_from_offset(offset, name, designator)
        for name, designator in (
            ("years", "Y"),
            ("months", "M"),
            ("weeks", "W"),
            ("days", "D"),
        )
    )
    iso_duration = "P" + date_part

    # The time designator is only added when there is a time part to follow it
    if _offset_contains_time(offset):
        time_part = "".join(
            _get_nominal_period_from_offset(offset, name, designator)
            for name, designator in (
                ("hours", "H"),
                ("minutes", "M"),
                ("seconds", "S"),
            )
        )
        iso_duration += "T" + time_part

    # A bare "P" is not a valid duration; represent the empty duration explicitly
    return "PT0H" if iso_duration == "P" else iso_duration
36 changes: 25 additions & 11 deletions timely_beliefs/beliefs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import math
import warnings
from datetime import datetime, timedelta
from typing import Union

import numpy as np
import pandas as pd
Expand All @@ -21,10 +20,13 @@
get_median_belief,
probabilistic_nan_mean,
)
from timely_beliefs.beliefs.time_utils import (
TimedeltaLike,
iso_duration_to_offset,
to_max_timedelta,
)
from timely_beliefs.sources import utils as source_utils

TimedeltaLike = Union[timedelta, str, pd.Timedelta]


def select_most_recent_belief(
df: "classes.BeliefsDataFrame",
Expand Down Expand Up @@ -904,9 +906,14 @@ def convert_to_timezone(
if version.parse(pd.__version__) >= version.parse("1.4.0"):

def initialize_index(
start: datetime, end: datetime, resolution: timedelta, inclusive: str = "left"
start: datetime,
end: datetime,
resolution: timedelta | pd.DateOffset | str,
inclusive: str = "left",
) -> pd.DatetimeIndex:
"""Initialize DatetimeIndex for event starts."""
if isinstance(resolution, str):
resolution = iso_duration_to_offset(resolution)
return pd.date_range(
start=start,
end=end,
Expand All @@ -918,9 +925,14 @@ def initialize_index(
else:

def initialize_index(
start: datetime, end: datetime, resolution: timedelta, inclusive: str = "left"
start: datetime,
end: datetime,
resolution: timedelta | pd.DateOffset | str,
inclusive: str = "left",
) -> pd.DatetimeIndex:
"""Initialize DatetimeIndex for event starts."""
if isinstance(resolution, str):
resolution = iso_duration_to_offset(resolution)
return pd.date_range(
start=start, end=end, freq=resolution, closed=inclusive, name="event_start"
)
Expand Down Expand Up @@ -1101,20 +1113,22 @@ def convert_to_instantaneous(

def upsample_beliefs_data_frame(
df: "classes.BeliefsDataFrame" | pd.DataFrame,
event_resolution: timedelta,
event_resolution: TimedeltaLike,
keep_nan_values: bool = False,
boundary_policy: str = "first",
) -> "classes.BeliefsDataFrame":
"""Because simply doing df.resample().ffill() does not correctly resample the last event in the data frame.

todo: stop converting nominal to max absolute durations once BeliefsDataFrames can handle nominal event resolutions

:param df: In case of a regular pd.DataFrame, make sure to set df.event_resolution before passing it to this function.
:param event_resolution: Resolution to upsample to.
:param keep_nan_values: If True, place back resampled NaN values. Drops NaN values by default.
:param boundary_policy: When upsampling to instantaneous events,
take the 'max', 'min' or 'first' value at event boundaries.
"""
if df.empty:
df.event_resolution = event_resolution
df.event_resolution = to_max_timedelta(event_resolution)
return df
if event_resolution == timedelta(0):
return convert_to_instantaneous(
Expand All @@ -1124,9 +1138,9 @@ def upsample_beliefs_data_frame(
from_event_resolution = df.event_resolution
if from_event_resolution == timedelta(0):
raise NotImplementedError("Cannot upsample from zero event resolution.")
resample_ratio = pd.to_timedelta(to_offset(from_event_resolution)) / pd.Timedelta(
event_resolution
)
resample_ratio = pd.to_timedelta(
to_offset(from_event_resolution)
) / to_max_timedelta(event_resolution)
if keep_nan_values:
# Back up NaN values.
# We are flagging the positions of the NaN values in the original data with a unique number.
Expand Down Expand Up @@ -1190,5 +1204,5 @@ def upsample_beliefs_data_frame(
if keep_nan_values:
# place back original NaN values
df = df.replace(unique_event_value_not_in_df, np.NaN)
df.event_resolution = event_resolution
df.event_resolution = to_max_timedelta(event_resolution)
return df
Loading