Metadata cleanup #272

Closed
wants to merge 19 commits into from
2 changes: 1 addition & 1 deletion examples/dlt_tempo.py
@@ -26,7 +26,7 @@ def ts_bronze():
@dlt.expect_or_drop("User_check","User in ('a','c','i')")
def ts_ft():
    phone_accel_df = dlt.read("ts_bronze")
-    phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols = ["User"])
+    phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts")
    ts_ft_df = phone_accel_tsdf.fourier_transform(timestep=1, valueCol="x").df
    return ts_ft_df
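Judging by the `series_ids` rename in the python/README.md changes later in this PR, the partitioned form of the line above could presumably still be written as in the sketch below; this is inferred from this PR's own README diff, not a line in this file, which instead drops per-`User` partitioning altogether:

```python
# Hypothetical equivalent using the new series_ids parameter
# (name taken from the README changes elsewhere in this PR).
phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", series_ids=["User"])
```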

6 changes: 3 additions & 3 deletions examples/financial_services_quickstart.py
@@ -92,8 +92,8 @@
# DBTITLE 1,Define TSDF Time Series Data Structure
from tempo import *

-trades_tsdf = TSDF(trades_df, partition_cols = ['date', 'symbol'], ts_col = 'event_ts')
-quotes_tsdf = TSDF(quotes_df, partition_cols = ['date', 'symbol'], ts_col = 'event_ts')
+trades_tsdf = TSDF(trades_df, ts_col='event_ts')
+quotes_tsdf = TSDF(quotes_df, ts_col='event_ts')

# COMMAND ----------

@@ -178,7 +178,7 @@
from tempo import *
from pyspark.sql.functions import *

-minute_bars = TSDF(spark.table("time_test"), partition_cols=['ticker'], ts_col="ts").calc_bars(freq = '1 minute', func= 'ceil')
+minute_bars = TSDF(spark.table("time_test"), ts_col="ts").calc_bars(freq='1 minute', func='ceil')

display(minute_bars)

45 changes: 22 additions & 23 deletions python/README.md
@@ -51,7 +51,7 @@ phone_accel_df = spark.read.format("csv").option("header", "true").load("dbfs:/h

from tempo import *

-phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", partition_cols = ["User"])
+phone_accel_tsdf = TSDF(phone_accel_df, ts_col="event_ts", series_ids = ["User"])

display(phone_accel_tsdf)
```
@@ -65,7 +65,7 @@ Note: You can upsample any missing values by using an option in the resample int

```python
# ts_col = timestamp column on which to sort fact and source table
-# partition_cols - columns to use for partitioning the TSDF into more granular time series for windowing and sorting
+# series_ids - columns to use for partitioning the TSDF into more granular time series for windowing and sorting

resampled_sdf = phone_accel_tsdf.resample(freq='min', func='floor')
resampled_pdf = resampled_sdf.df.filter(col('event_ts').cast("date") == "2015-02-23").toPandas()
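# Aside (not part of this diff): the upsampling option referenced in the hunk
# header above is presumably resample's `fill` flag, per the tempo README;
# a hedged sketch — fill=True fills intervals that have no observations:
upsampled_sdf = phone_accel_tsdf.resample(freq='min', func='floor', fill=True)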
@@ -97,7 +97,7 @@ from pyspark.sql.functions import *

watch_accel_df = spark.read.format("csv").option("header", "true").load("dbfs:/home/tempo/Watch_accelerometer").withColumn("event_ts", (col("Arrival_Time").cast("double")/1000).cast("timestamp")).withColumn("x", col("x").cast("double")).withColumn("y", col("y").cast("double")).withColumn("z", col("z").cast("double")).withColumn("event_ts_dbl", col("event_ts").cast("double"))

-watch_accel_tsdf = TSDF(watch_accel_df, ts_col="event_ts", partition_cols = ["User"])
+watch_accel_tsdf = TSDF(watch_accel_df, ts_col="event_ts", series_ids = ["User"])

# Applying AS OF join to TSDF datasets
joined_df = watch_accel_tsdf.asofJoin(phone_accel_tsdf, right_prefix="phone_accel")
@@ -107,12 +107,12 @@ display(joined_df)

#### 3. Skew Join Optimized AS OF Join

-The purpose of the skew optimized as of join is to bucket each set of `partition_cols` to get the latest source record merged onto the fact table
+The purpose of the skew optimized as of join is to bucket each set of `series_ids` to get the latest source record merged onto the fact table

Parameters:

ts_col = timestamp column for sorting
-partition_cols = partition columns for defining granular time series for windowing and sorting
+series_ids = partition columns for defining granular time series for windowing and sorting
tsPartitionVal = value to break up each partition into time brackets
fraction = overlap fraction
right_prefix = prefix used for source columns when merged into fact table
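For reference (the example itself is not shown in this hunk), these parameters map onto `asofJoin` roughly as in the sketch below; the bracket value and fraction are illustrative, and the keyword names are taken from the parameter list above:

```python
# Skew-optimized AS OF join: tsPartitionVal buckets each series into
# time brackets, and fraction sets the overlap between adjacent brackets.
joined_df = watch_accel_tsdf.asofJoin(
    phone_accel_tsdf,
    right_prefix="phone_accel",  # prefix for source columns in the result
    tsPartitionVal=10,           # illustrative bracket size
    fraction=0.1,                # illustrative overlap fraction
)
```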
@@ -185,11 +185,10 @@ Valid columns data types for interpolation are: `["int", "bigint", "float", "dou
```python
# Create instance of the TSDF class
input_tsdf = TSDF(
-    input_df,
-    partition_cols=["partition_a", "partition_b"],
-    ts_col="event_ts",
-)
-
+    input_df,
+    series_ids=["partition_a", "partition_b"],
+    ts_col="event_ts",
+)

# What the following chain of operation does is:
# 1. Aggregate all valid numeric columns using mean into 30 second intervals
@@ -205,32 +204,32 @@ interpolated_tsdf = input_tsdf.resample(freq="30 seconds", func="mean").interpol
interpolated_tsdf = input_tsdf.interpolate(
    freq="30 seconds",
    func="mean",
-    target_cols= ["columnA","columnB"],
+    target_cols=["columnA", "columnB"],
    method="linear"

)

# Alternatively it's also possible to override default TSDF parameters,
-# e.g. partition_cols, ts_col
+# e.g. series_ids, ts_col
interpolated_tsdf = input_tsdf.interpolate(
-    partition_cols=["partition_c"],
+    series_ids=["partition_c"],
    ts_col="other_event_ts",
-    freq="30 seconds",
-    func="mean",
-    target_cols= ["columnA","columnB"],
-    method="linear"
+    freq = "30 seconds",
+    func = "mean",
+    target_cols = ["columnA", "columnB"],
+    method = "linear"
)

# The show_interpolated flag can be set to `True` to add boolean columns
# that show, for each row, whether a column's value was interpolated.
interpolated_tsdf = input_tsdf.interpolate(
-    partition_cols=["partition_c"],
+    series_ids=["partition_c"],
    ts_col="other_event_ts",
-    freq="30 seconds",
-    func="mean",
-    method="linear",
-    target_cols= ["columnA","columnB"],
-    show_interpolated=True,
+    freq = "30 seconds",
+    func = "mean",
+    method = "linear",
+    target_cols = ["columnA", "columnB"],
+    show_interpolated = True,
)

```