Skip to content

Commit

Permalink
Fixed bug when loading a source with optional columns
Browse files Browse the repository at this point in the history
  • Loading branch information
MatsMoll committed Mar 11, 2024
1 parent 413655e commit da9757c
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 143 deletions.
74 changes: 52 additions & 22 deletions aligned/local/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from aligned.schemas.date_formatter import DateFormatter
from aligned.schemas.feature import Feature, FeatureType
from aligned.sources.local import DataFileReference
from aligned.schemas.constraints import Optional


class LiteralRetrivalJob(RetrivalJob):
Expand Down Expand Up @@ -166,26 +167,6 @@ def retrival_requests(self) -> list[RetrivalRequest]:
def describe(self) -> str:
    """Return a human-readable description of what this job reads.

    Fix: the message previously said "form file" — corrected to "from file".
    """
    return f'Reading everything from file {self.source}.'

def file_transformations(self, df: pd.DataFrame) -> pd.DataFrame:
    """Rename source columns to their feature names and apply the row limit.

    When the source can map feature names to column names, the columns are
    renamed from the source's identifiers back to the requested feature
    names before the (optional) limit is applied.
    """
    from aligned.data_source.batch_data_source import ColumnFeatureMappable

    wanted = list(self.request.all_required_feature_names.union(self.request.entity_names))

    # Source-side column names; identical to the feature names unless the
    # source provides an explicit mapping.
    source_columns = (
        self.source.feature_identifier_for(wanted)
        if isinstance(self.source, ColumnFeatureMappable)
        else wanted
    )

    renamed = df.rename(columns=dict(zip(source_columns, wanted)))

    if self.limit and renamed.shape[0] > self.limit:
        return renamed.iloc[: self.limit]
    return renamed

async def file_transform_polars(self, df: pl.LazyFrame) -> pl.LazyFrame:
from aligned.data_source.batch_data_source import ColumnFeatureMappable

Expand All @@ -198,13 +179,31 @@ async def file_transform_polars(self, df: pl.LazyFrame) -> pl.LazyFrame:
all_names = list(self.request.all_required_feature_names.union(entity_names))

request_features = all_names

feature_column_map = {}
if isinstance(self.source, ColumnFeatureMappable):
request_features = self.source.feature_identifier_for(all_names)
feature_column_map = dict(zip(all_names, request_features))

renames = {
org_name: wanted_name
for org_name, wanted_name in zip(request_features, all_names)
if org_name != wanted_name
}

optional_constraint = Optional()
optional_features = [
feature
for feature in self.request.features
if (
feature.constraints
and optional_constraint in feature.constraints
and feature_column_map.get(feature.name, feature.name) not in df.columns
)
]
if optional_features:
df = df.with_columns([pl.lit(None).alias(feature.name) for feature in optional_features])

df = df.rename(mapping=renames)
df = decode_timestamps(df, self.request, self.date_formatter)

Expand Down Expand Up @@ -279,8 +278,23 @@ def file_transform_polars(self, df: pl.LazyFrame) -> pl.LazyFrame:
raise ValueError(f'Source {self.source} have no event timestamp to filter on')

request_features = all_names
feature_column_map = {}
if isinstance(self.source, ColumnFeatureMappable):
request_features = self.source.feature_identifier_for(all_names)
feature_column_map = dict(zip(all_names, request_features))

optional_constraint = Optional()
optional_features = [
feature
for feature in self.request.features
if (
feature.constraints
and optional_constraint in feature.constraints
and feature_column_map.get(feature.name, feature.name) not in df.columns
)
]
if optional_features:
df = df.with_columns([pl.lit(None).alias(feature.name) for feature in optional_features])

df = df.rename(mapping=dict(zip(request_features, all_names)))
event_timestamp_column = self.request.event_timestamp.name
Expand All @@ -289,8 +303,7 @@ def file_transform_polars(self, df: pl.LazyFrame) -> pl.LazyFrame:
return df.filter(pl.col(event_timestamp_column).is_between(self.start_date, self.end_date))

async def to_pandas(self) -> pd.DataFrame:
    """Load the source as a pandas DataFrame.

    Routes through ``to_lazy_polars`` so that the polars transformation
    pipeline (column renames, optional-column handling) is applied
    identically for both polars and pandas callers. The pasted text
    contained both the old pandas-path body and this one; only this
    implementation is reachable and kept.
    """
    return (await self.to_lazy_polars()).collect().to_pandas()

async def to_lazy_polars(self) -> pl.LazyFrame:
file = await self.source.to_lazy_polars()
Expand Down Expand Up @@ -392,6 +405,23 @@ async def file_transformations(self, df: pl.LazyFrame) -> pl.LazyFrame:
entity_names = request.entity_names
all_names = request.all_required_feature_names.union(entity_names)

request_features = list(all_names)
feature_column_map = {}
if isinstance(self.source, ColumnFeatureMappable):
request_features = self.source.feature_identifier_for(list(all_names))
feature_column_map = dict(zip(all_names, request_features))

optional_constraint = Optional()
optional_features = [
feature
for feature in request.features
if feature.constraints is not None
and optional_constraint in feature.constraints
and feature_column_map.get(feature.name, feature.name) not in df.columns
]
if optional_features:
df = df.with_columns([pl.lit(None).alias(feature.name) for feature in optional_features])

for derived_feature in request.derived_features:
if derived_feature.name in df.columns:
all_names.add(derived_feature.name)
Expand Down
31 changes: 30 additions & 1 deletion aligned/sources/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import polars as pl
from pathlib import Path

from aligned import FeatureStore, FileSource
from aligned import FeatureStore, FileSource, feature_view, Int32
from aligned.feature_view.feature_view import FeatureView
from aligned.schemas.date_formatter import DateFormatter
from conftest import DataTest
Expand Down Expand Up @@ -147,3 +147,32 @@ async def test_read_csv(point_in_time_data_test: DataTest) -> None:
stored = await store.feature_view(compiled.name).all().to_polars()
df = stored.select(source.data.columns)
assert df.equals(source.data)


@pytest.mark.asyncio
async def test_read_optional_csv() -> None:
    """Optional features missing from a CSV source come back as nulls."""

    source = FileSource.csv_at('test_data/optional_test.csv')

    # Write a file that intentionally lacks the optional column 'b'.
    written = pl.DataFrame({'a': [1, 2, 3], 'c': [1, 2, 3]})
    await source.write_polars(written.lazy())

    @feature_view(name='test', source=source)
    class Test:
        a = Int32().as_entity()
        b = Int32().is_optional()
        c = Int32()

        filled = b.fill_na(0)

    # 'b' should load as all-null, and 'filled' as its zero-filled variant.
    expected = written.with_columns(pl.lit(None).alias('b'), pl.lit(0).alias('filled'))

    all_rows = await Test.query().all().to_polars()
    assert all_rows.equals(expected.select(all_rows.columns))

    fact_rows = await Test.query().features_for({'a': [2]}).to_polars()
    assert expected.filter(pl.col('a') == 2).equals(fact_rows.select(expected.columns))
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aligned"
version = "0.0.77"
version = "0.0.78"
description = "A data management and lineage tool for ML applications."
authors = ["Mats E. Mollestad <[email protected]>"]
license = "Apache-2.0"
Expand Down
14 changes: 7 additions & 7 deletions test_data/credit_history.csv
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
due_sum,student_loan_due,credit_card_due,event_timestamp,bankruptcies,dob_ssn
30747,22328,8419,1587924064746575,0,19530219_5179
5459,2515,2944,1587924064746575,0,19520816_8737
33833,33000,833,1587924064746575,0,19860413_2537
54891,48955,5936,1588010464746575,0,19530219_5179
11076,9501,1575,1588010464746575,0,19520816_8737
41773,35510,6263,1588010464746575,0,19860413_2537
credit_card_due,dob_ssn,due_sum,bankruptcies,student_loan_due,event_timestamp
8419,19530219_5179,30747,0,22328,1587924064746575
2944,19520816_8737,5459,0,2515,1587924064746575
833,19860413_2537,33833,0,33000,1587924064746575
5936,19530219_5179,54891,0,48955,1588010464746575
1575,19520816_8737,11076,0,9501,1588010464746575
6263,19860413_2537,41773,0,35510,1588010464746575
Binary file modified test_data/credit_history_mater.parquet
Binary file not shown.
2 changes: 1 addition & 1 deletion test_data/feature-store.json

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions test_data/loan.csv
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
loan_id,loan_amount,event_timestamp,loan_status,personal_income
10000,35000,1587924064746575,True,59000
10001,1000,1587924064746575,False,9600
10002,5500,1587924064746575,True,9600
10000,35000,1588010464746575,True,65500
10001,35000,1588010464746575,True,54400
10002,2500,1588010464746575,True,9900
loan_amount,loan_id,event_timestamp,personal_income,loan_status
35000,10000,1587924064746575,59000,True
1000,10001,1587924064746575,9600,False
5500,10002,1587924064746575,9600,True
35000,10000,1588010464746575,65500,True
35000,10001,1588010464746575,54400,True
2500,10002,1588010464746575,9900,True
4 changes: 4 additions & 0 deletions test_data/optional_test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
a,c
1,1
2,2
3,3
Binary file modified test_data/test_model.parquet
Binary file not shown.
2 changes: 1 addition & 1 deletion test_data/titanic-sets.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"raw_data": [], "train_test": [], "train_test_validation": [{"id": "titanic_test", "name": null, "request_result": {"entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}]}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}]}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "optional"}, {"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 20.0}]}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}, {"name": "in_domain", "values": ["male", "female"]}]}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "is_mr", 
"dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}], "event_timestamp": null}, "train_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-train.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}, "formatter": {"date_format": "yyyy-MM-ddTHH:mm:ssZ", "time_unit": null, "time_zone": null, "name": "string_form"}}, "test_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-test.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}, "formatter": {"date_format": "yyyy-MM-ddTHH:mm:ssZ", "time_unit": null, "time_zone": null, "name": "string_form"}}, "validation_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-validate.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}, "formatter": {"date_format": "yyyy-MM-ddTHH:mm:ssZ", "time_unit": null, "time_zone": null, "name": "string_form"}}, "train_size_fraction": 0.6, "test_size_fraction": 0.20000000000000007, "validate_size_fraction": 0.19999999999999996, "target": ["survived"], "description": null, "tags": null}], "active_learning": []}
{"raw_data": [], "train_test": [], "train_test_validation": [{"id": "titanic_test", "name": null, "request_result": {"entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}]}, {"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", 
"value": "male"}}, "depth": 1}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}]}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "optional"}, {"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}, {"name": "optional"}]}], "event_timestamp": null}, "train_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-train.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}, "formatter": {"date_format": "yyyy-MM-ddTHH:mm:ssZ", "time_unit": null, "time_zone": null, "name": "string_form"}}, "test_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-test.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}, "formatter": {"date_format": "yyyy-MM-ddTHH:mm:ssZ", "time_unit": null, "time_zone": null, "name": "string_form"}}, "validation_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-validate.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}, "formatter": {"date_format": "yyyy-MM-ddTHH:mm:ssZ", "time_unit": null, "time_zone": null, "name": "string_form"}}, "train_size_fraction": 0.6, "test_size_fraction": 0.20000000000000007, "validate_size_fraction": 0.19999999999999996, "target": ["survived"], "description": null, "tags": null}], "active_learning": []}
42 changes: 21 additions & 21 deletions test_data/titanic-test.csv
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
age,cabin,name,sibsp,survived,sex,has_siblings,is_male,is_mr,is_female,passenger_id
22.0,,"Sirayanian, Mr. Orsen",0,False,male,False,True,True,False,61
38.0,B28,"Icard, Miss. Amelie",0,True,female,False,False,False,True,62
45.0,C83,"Harris, Mr. Henry Birkhardt",1,False,male,True,True,True,False,63
4.0,,"Skoog, Master. Harald",3,False,male,True,True,False,False,64
,,"Stewart, Mr. Albert A",0,False,male,False,True,True,False,65
,,"Moubarek, Master. Gerios",1,True,male,True,True,False,False,66
29.0,F33,"Nye, Mrs. (Elizabeth Ramell)",0,True,female,False,False,True,True,67
19.0,,"Crease, Mr. Ernest James",0,False,male,False,True,True,False,68
17.0,,"Andersson, Miss. Erna Alexandra",4,True,female,True,False,False,True,69
26.0,,"Kink, Mr. Vincenz",2,False,male,True,True,True,False,70
32.0,,"Jenkin, Mr. Stephen Curnow",0,False,male,False,True,True,False,71
16.0,,"Goodwin, Miss. Lillian Amy",5,False,female,True,False,False,True,72
21.0,,"Hood, Mr. Ambrose Jr",0,False,male,False,True,True,False,73
26.0,,"Chronopoulos, Mr. Apostolos",1,False,male,True,True,True,False,74
32.0,,"Bing, Mr. Lee",0,True,male,False,True,True,False,75
25.0,F G73,"Moen, Mr. Sigurd Hansen",0,False,male,False,True,True,False,76
,,"Staneff, Mr. Ivan",0,False,male,False,True,True,False,77
,,"Moutal, Mr. Rahamin Haim",0,False,male,False,True,True,False,78
0.83,,"Caldwell, Master. Alden Gates",0,True,male,False,True,False,False,79
30.0,,"Dowdell, Miss. Elizabeth",0,True,female,False,False,False,True,80
is_mr,cabin,age,is_female,has_siblings,is_male,name,survived,sibsp,sex,passenger_id
True,,22.0,False,False,True,"Sirayanian, Mr. Orsen",False,0,male,61
False,B28,38.0,True,False,False,"Icard, Miss. Amelie",True,0,female,62
True,C83,45.0,False,True,True,"Harris, Mr. Henry Birkhardt",False,1,male,63
False,,4.0,False,True,True,"Skoog, Master. Harald",False,3,male,64
True,,,False,False,True,"Stewart, Mr. Albert A",False,0,male,65
False,,,False,True,True,"Moubarek, Master. Gerios",True,1,male,66
True,F33,29.0,True,False,False,"Nye, Mrs. (Elizabeth Ramell)",True,0,female,67
True,,19.0,False,False,True,"Crease, Mr. Ernest James",False,0,male,68
False,,17.0,True,True,False,"Andersson, Miss. Erna Alexandra",True,4,female,69
True,,26.0,False,True,True,"Kink, Mr. Vincenz",False,2,male,70
True,,32.0,False,False,True,"Jenkin, Mr. Stephen Curnow",False,0,male,71
False,,16.0,True,True,False,"Goodwin, Miss. Lillian Amy",False,5,female,72
True,,21.0,False,False,True,"Hood, Mr. Ambrose Jr",False,0,male,73
True,,26.0,False,True,True,"Chronopoulos, Mr. Apostolos",False,1,male,74
True,,32.0,False,False,True,"Bing, Mr. Lee",True,0,male,75
True,F G73,25.0,False,False,True,"Moen, Mr. Sigurd Hansen",False,0,male,76
True,,,False,False,True,"Staneff, Mr. Ivan",False,0,male,77
True,,,False,False,True,"Moutal, Mr. Rahamin Haim",False,0,male,78
False,,0.83,False,False,True,"Caldwell, Master. Alden Gates",True,0,male,79
False,,30.0,True,False,False,"Dowdell, Miss. Elizabeth",True,0,female,80
Loading

0 comments on commit da9757c

Please sign in to comment.