
code to create time-range DFs and (empty) lattices
tnixon committed Jun 2, 2024
1 parent baf9f80 commit 5b17a02
Showing 2 changed files with 168 additions and 9 deletions.
105 changes: 98 additions & 7 deletions python/tempo/tsdf.py
@@ -4,18 +4,24 @@
import logging
import operator
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable
from functools import cached_property
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union, Mapping
from typing import Collection, Dict, cast, overload

import pyspark.sql.functions as sfn
from datetime import datetime as dt, timedelta as td

from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pyspark.sql import GroupedData
from pyspark.sql import SparkSession
import pandas as pd
from pandas.core.frame import DataFrame as PandasDataFrame

from pyspark import RDD
import pyspark.sql.functions as sfn
from pyspark.sql import SparkSession, GroupedData
from pyspark.sql.column import Column
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import DataType, StructType
from pyspark.sql.types import AtomicType, DataType, StructType
from pyspark.sql.window import Window, WindowSpec

import tempo.interpol as t_interpolation
@@ -97,7 +103,9 @@ def time_str_to_double(df: DataFrame,

class TSDF(WindowBuilder):
"""
This object is the main wrapper over a Spark data frame which allows a user to parallelize time series computations on a Spark data frame by various dimensions. The two dimensions required are partition_cols (list of columns by which to summarize) and ts_col (timestamp column, which can be epoch or TimestampType).
This class represents a time series DataFrame (TSDF) - a DataFrame with a
time series index. It can represent multiple logical time series,
each identified by a unique set of series IDs.
"""

def __init__(
@@ -142,7 +150,8 @@ def __withStandardizedColOrder(self) -> TSDF:
* ts_index,
* observation columns
:return: a :class:`TSDF` with the columns reordered into "standard order" (as described above)
:return: a :class:`TSDF` with the columns reordered into
"standard order" (as described above)
"""
std_ordered_cols = (
list(self.series_ids)
@@ -155,6 +164,88 @@ def __withStandardizedColOrder(self) -> TSDF:
# default column name for constructed timeseries index struct columns
__DEFAULT_TS_IDX_COL = "ts_idx"

@classmethod
def buildEmptyLattice(
cls,
spark: SparkSession,
start_time: dt,
end_time: Optional[dt] = None,
step_size: Optional[td] = None,
num_intervals: Optional[int] = None,
ts_col: Optional[str] = None,
series_ids: Optional[Any] = None,
series_schema: Optional[Union[AtomicType, StructType, str]] = None,
observation_cols: Optional[Union[Mapping[str, str], Iterable[str]]] = None,
num_partitions: Optional[int] = None) -> TSDF:
"""
Construct an empty "lattice", i.e. a :class:`TSDF` with a time range
for each unique series and a set of observational columns (initialized to Nulls)
:param spark: the Spark session to use
:param start_time: the start time of the lattice
:param end_time: the end time of the lattice (optional)
:param step_size: the step size between each time interval (optional)
:param num_intervals: the number of intervals to create (optional)
:param ts_col: the name of the timestamp column (optional)
:param series_ids: the unique series identifiers (optional)
:param series_schema: the schema of the series identifiers (optional)
:param observation_cols: the observational columns to include (optional)
:param num_partitions: the number of partitions to create (optional)
:return: a :class:`TSDF` representing the empty lattice
"""

# set a default timestamp column if not provided
if ts_col is None:
ts_col = cls.__DEFAULT_TS_IDX_COL

# initialize the lattice as a time range
lattice_df = t_utils.time_range(spark,
start_time,
end_time,
step_size,
num_intervals,
ts_colname=ts_col)
select_exprs = [sfn.col(ts_col)]

# handle construction of the series_ids DataFrame
series_df = None
if series_ids:
if isinstance(series_ids, DataFrame):
series_df = series_ids
elif isinstance(series_ids, (RDD, PandasDataFrame)):
series_df = spark.createDataFrame(series_ids)
elif isinstance(series_ids, dict):
series_df = spark.createDataFrame(pd.DataFrame(series_ids))
else:
series_df = spark.createDataFrame(data=series_ids, schema=series_schema)
# add the series columns to the select expressions
select_exprs += [sfn.col(c) for c in series_df.columns]
# lattice is the cross join of the time range and the series identifiers
lattice_df = lattice_df.crossJoin(series_df)

# set up select expressions for the observation columns
if observation_cols:
# convert to a dict if not already, mapping all columns to "double" types
if not isinstance(observation_cols, dict):
observation_cols = {col: "double" for col in observation_cols}
select_exprs += [sfn.lit(None).cast(coltype).alias(colname)
for colname, coltype in observation_cols.items()]
lattice_df = lattice_df.select(*select_exprs)

# repartition the lattice in a more optimal way
if num_partitions is None:
num_partitions = lattice_df.rdd.getNumPartitions()
if series_df:
sort_cols = series_df.columns + [ts_col]
lattice_df = (lattice_df.repartition(num_partitions, *(series_df.columns))
.sortWithinPartitions(*sort_cols))
else:
lattice_df = lattice_df.repartitionByRange(num_partitions, ts_col)

# construct the appropriate TSDF
return TSDF(lattice_df, ts_col=ts_col, series_ids=series_df.columns if series_df is not None else None)

@classmethod
def fromSubsequenceCol(
cls,
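
For context, here is a minimal usage sketch of the new TSDF.buildEmptyLattice classmethod added in this file. It is not part of the commit: the SparkSession setup, the import path, the series values, and the column names are assumptions for illustration only.

from datetime import datetime, timedelta

from pyspark.sql import SparkSession

from tempo.tsdf import TSDF  # assumed import path; the package may also re-export TSDF at the top level

spark = SparkSession.builder.getOrCreate()

# Build an empty lattice: one row per 15-minute step per symbol, with the
# observation columns initialized to nulls of the requested types.
lattice = TSDF.buildEmptyLattice(
    spark,
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 2),
    step_size=timedelta(minutes=15),
    ts_col="event_ts",
    series_ids=[("AAPL",), ("MSFT",)],        # hypothetical series values
    series_schema="symbol string",            # schema for the series identifier column
    observation_cols={"price": "double", "volume": "long"},
)

# .df exposes the underlying Spark DataFrame of the TSDF
lattice.df.show(5, truncate=False)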
72 changes: 70 additions & 2 deletions python/tempo/utils.py
@@ -1,16 +1,20 @@
from __future__ import annotations

import logging
import os
import warnings
from typing import Optional, Union, overload

import os
import math
from datetime import datetime as dt, timedelta as td

import pyspark.sql.functions as sfn
from pyspark.sql import SparkSession, DataFrame

from IPython import get_ipython
from IPython.core.display import HTML
from IPython.display import display as ipydisplay
from pandas.core.frame import DataFrame as pandasDataFrame
from pyspark.sql.dataframe import DataFrame

import tempo.resample as t_resample
import tempo.tsdf as t_tsdf
@@ -26,6 +30,70 @@
"""


def time_range(spark: SparkSession,
start_time: dt,
end_time: Optional[dt] = None,
step_size: Optional[td] = None,
num_intervals: Optional[int] = None,
ts_colname: str = "ts",
include_interval_ends: bool = False) -> DataFrame:
"""
Generate a DataFrame of a range of timestamps with a regular interval,
similar to pandas.date_range, but for Spark DataFrames.
The DataFrame will have a single column named `ts_colname` (default is "ts")
that contains timestamps starting at `start_time` and ending at `end_time`
(if provided), with a step size of `step_size` (if provided) or
`num_intervals` (if provided). At least 2 of the 3 arguments `end_time`,
`step_size`, and `num_intervals` must be provided. The third
argument can be computed based on the other two, if needed. Optionally, the end of
each time interval can be included as a separate column in the DataFrame.
:param spark: SparkSession object
:param start_time: start time of the range
:param end_time: end time of the range (optional)
:param step_size: time step size (optional)
:param num_intervals: number of intervals (optional)
:param ts_colname: name of the timestamp column, default is "ts"
:param include_interval_ends: whether to include the end of each time
interval as a separate column in the DataFrame
:return: DataFrame with a time range of timestamps
"""

# compute step_size if not provided
if not step_size:
# must have both end_time and num_intervals defined
assert end_time and num_intervals, \
"must provide at least 2 of: end_time, step_size, num_intervals"
diff_time = end_time - start_time
step_size = diff_time / num_intervals

# compute the number of intervals if not provided
if not num_intervals:
# must have both end_time and step_size defined
assert end_time and step_size, \
"must provide at least 2 of: end_time, step_size, num_intervals"
diff_time = end_time - start_time
num_intervals = math.ceil(diff_time / step_size)

# define expressions for the time range
start_time_expr = sfn.to_timestamp(sfn.lit(str(start_time)))
step_fractional_seconds = step_size.seconds + (step_size.microseconds / 1E6)
interval_expr = sfn.make_dt_interval(days=sfn.lit(step_size.days),
secs=sfn.lit(step_fractional_seconds))

# create the DataFrame
range_df = spark.range(0, num_intervals) \
.withColumn(ts_colname,
start_time_expr + sfn.col("id") * interval_expr)
if include_interval_ends:
interval_end_colname = ts_colname + "_interval_end"
range_df = range_df.withColumn(
interval_end_colname,
start_time_expr + (sfn.col("id") + sfn.lit(1)) * interval_expr)
return range_df.drop("id")


class ResampleWarning(Warning):
"""
This class is a warning that is raised when the interpolate or resample with fill methods are called.
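
For context, here is a minimal usage sketch of the time_range helper added earlier in this file. It is not part of the commit: the import path, timestamps, and column name are illustrative assumptions.

from datetime import datetime, timedelta

from pyspark.sql import SparkSession

from tempo.utils import time_range  # assumed import path

spark = SparkSession.builder.getOrCreate()

# 24 hourly timestamps starting at midnight; num_intervals is derived inside
# time_range from end_time and step_size (ceiling of range / step).
hourly = time_range(
    spark,
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 2),
    step_size=timedelta(hours=1),
    ts_colname="event_ts",
    include_interval_ends=True,  # also emit an "event_ts_interval_end" column
)
hourly.show(3, truncate=False)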
