Skip to content

Commit

Permalink
Merge pull request #23 from MatsMoll/matsei/feature_wrapper_conveniance
Browse files Browse the repository at this point in the history
feat: added some convenience methods for feature_wrappers
  • Loading branch information
MatsMoll authored Dec 9, 2023
2 parents 07bfb9b + fca10e2 commit 9cae6db
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 31 deletions.
28 changes: 26 additions & 2 deletions aligned/compiler/feature_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,10 +642,10 @@ def __floordiv__(self, other: FeatureFactory | Any) -> Float:
feature.transformation = RatioFactory(self, LiteralValue.from_value(other))
return feature

def __abs__(self) -> Float:
def __abs__(self) -> Int64:
from aligned.compiler.transformation_factory import AbsoluteFactory

feature = Float()
feature = Int64()
feature.transformation = AbsoluteFactory(self)
return feature

Expand Down Expand Up @@ -850,6 +850,30 @@ def aggregate(self) -> ArithmeticAggregation:
return ArithmeticAggregation(self)


class Int8(ArithmeticFeature, CouldBeEntityFeature, CouldBeModelVersion):
    """An 8-bit signed integer feature.

    Supports arithmetic transformations, and may be used as an entity key
    or as a model version column.
    """

    @property
    def dtype(self) -> FeatureType:
        return FeatureType.int8()

    def copy_type(self) -> Int8:
        return Int8()

    def aggregate(self) -> ArithmeticAggregation:
        return ArithmeticAggregation(self)


class Int16(ArithmeticFeature, CouldBeEntityFeature, CouldBeModelVersion):
    """A 16-bit signed integer feature.

    Supports arithmetic transformations, and may be used as an entity key
    or as a model version column.
    """

    @property
    def dtype(self) -> FeatureType:
        return FeatureType.int16()

    def copy_type(self) -> Int16:
        return Int16()

    def aggregate(self) -> ArithmeticAggregation:
        return ArithmeticAggregation(self)


class Int32(ArithmeticFeature, CouldBeEntityFeature, CouldBeModelVersion):
def copy_type(self) -> Int32:
return Int32()
Expand Down
4 changes: 3 additions & 1 deletion aligned/data_source/batch_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ def multi_source_features_for(

source, _ = requests[0]
if isinstance(source, BatchSourceModification):
return source.wrap_job(type(source.source).multi_source_features_for(facts, requests))
return source.wrap_job(
type(source.source).multi_source_features_for(facts, requests) # type: ignore
)
elif isinstance(source, DataFileReference):
from aligned.local.job import FileFactualJob

Expand Down
35 changes: 35 additions & 0 deletions aligned/feature_view/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import copy
import logging
import polars as pl
import pandas as pd

from abc import ABC, abstractproperty
from dataclasses import dataclass, field
Expand All @@ -24,6 +26,7 @@
resolve_keys,
)
from aligned.data_source.stream_data_source import StreamDataSource
from aligned.retrival_job import ConvertableToRetrivalJob, RetrivalJob
from aligned.schemas.derivied_feature import (
AggregatedFeature,
)
Expand All @@ -34,10 +37,13 @@
if TYPE_CHECKING:
from aligned.feature_store import FeatureViewStore
from datetime import datetime
from aligned.validation.interface import Validator

# Enables code completion in the select method
T = TypeVar('T')

ConvertableData = TypeVar('ConvertableData', dict, pl.DataFrame, pd.DataFrame)


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -330,6 +336,35 @@ class MyView:
compiled = self.compile()
return await FeatureView.freshness_in_source(compiled, compiled.source)

def from_data(self, data: ConvertableToRetrivalJob) -> RetrivalJob:
    """Wrap in-memory data as a retrival job using this view's full feature request."""
    return RetrivalJob.from_convertable(data, self.compile().request_all)

def drop_invalid(self, data: ConvertableData, validator: Validator | None = None) -> ConvertableData:
    """Return `data` with rows that fail validation removed.

    Accepts a dict, a pandas DataFrame or a polars DataFrame and returns the
    same container type it was given. When no validator is supplied, the
    Pandera-based validator is used.

    Raises:
        ValueError: if `data` is none of the supported container types.
    """
    from aligned.retrival_job import DropInvalidJob

    if not validator:
        from aligned.validation.pandera import PanderaValidator

        validator = PanderaValidator()

    all_requests = self.compile().request_all.needed_requests
    features = list(DropInvalidJob.features_to_validate(all_requests))

    # A dict is validated through pandas and converted back afterwards.
    frame = pd.DataFrame(data) if isinstance(data, dict) else data

    if isinstance(frame, pl.DataFrame):
        return validator.validate_polars(features, frame.lazy()).collect()

    if isinstance(frame, pd.DataFrame):
        validated = validator.validate_pandas(features, frame)
        if isinstance(data, dict):
            return validated.to_dict(orient='list')
        return validated  # type: ignore

    raise ValueError(f'Invalid data type: {type(data)}')


class FeatureView(ABC):
"""
Expand Down
63 changes: 63 additions & 0 deletions aligned/feature_view/tests/test_joined_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import pytest
from aligned import feature_view, Int32, FileSource
import polars as pl


# Left side of the join fixtures: one entity key plus one Int32 feature.
@feature_view(name='left', source=FileSource.csv_at('some_file.csv'))
class LeftData:

    # Entity key the tests join on.
    some_id = Int32().as_entity()

    feature = Int32()


# Right side of the join fixtures: same entity key, a different Int32 feature.
@feature_view(name='right', source=FileSource.csv_at('some_file.csv'))
class RightData:

    # Entity key the tests join on.
    some_id = Int32().as_entity()

    other_feature = Int32()


@pytest.mark.asyncio
async def test_join_different_types_polars() -> None:
    """Join two views whose key columns arrive with different integer widths.

    Left data uses Int8 keys and right data Int16 keys; the join should
    succeed and the output keys should match the views' declared Int32 type.
    """

    left_data = LeftData.from_data(  # type: ignore
        pl.DataFrame(
            {'some_id': [1, 2, 3], 'feature': [2, 3, 4]}, schema={'some_id': pl.Int8, 'feature': pl.Int32}
        )
    )

    right_data = RightData.from_data(  # type: ignore
        pl.DataFrame(
            {'some_id': [1, 3, 2], 'other_feature': [3, 4, 5]},
            schema={'some_id': pl.Int16, 'other_feature': pl.Int32},
        )
    )

    # All columns are expected as Int32 — the dtype declared on the views.
    expected_df = pl.DataFrame(
        data={'some_id': [1, 2, 3], 'feature': [2, 3, 4], 'other_feature': [3, 5, 4]},
        schema={
            'some_id': pl.Int32,
            'feature': pl.Int32,
            'other_feature': pl.Int32,
        },
    )

    new_data = left_data.join(right_data, 'inner', left_on='some_id', right_on='some_id')
    result = await new_data.to_polars()

    # Sort to make row order deterministic before comparing frames.
    joined = result.collect().sort('some_id', descending=False)
    assert joined.frame_equal(expected_df.select(joined.columns))


@pytest.mark.asyncio
async def test_unique_entities() -> None:
    """`unique_entities` should deduplicate rows that share the same entity key.

    The original test only constructed the lazy job and never executed or
    asserted anything, so it could not fail; run the job and check the result.
    """

    left_data = LeftData.from_data(  # type: ignore
        pl.DataFrame(
            {'some_id': [1, 3, 3], 'feature': [2, 3, 4]}, schema={'some_id': pl.Int8, 'feature': pl.Int32}
        )
    )

    result = await left_data.unique_entities().to_polars()
    df = result.collect()

    # Entity 3 appears twice in the input; deduplication must keep one row per key.
    assert df.height == 2
    assert set(df['some_id'].to_list()) == {1, 3}
76 changes: 65 additions & 11 deletions aligned/retrival_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,10 @@ def derive_features(self, requests: list[RetrivalRequest] | None = None) -> Retr
def combined_features(self, requests: list[RetrivalRequest] | None = None) -> RetrivalJob:
return CombineFactualJob([self], requests or self.retrival_requests)

def ensure_types(self, requests: list[RetrivalRequest] | None = None) -> RetrivalJob:
    """Wrap this job so output columns are cast to the dtypes in `requests`.

    Defaults to this job's own retrival requests when none are supplied.
    """
    return EnsureTypesJob(job=self, requests=requests or self.retrival_requests)

def select_columns(self, include_features: set[str]) -> RetrivalJob:
Expand All @@ -392,6 +395,14 @@ def update_vector_index(self, indexes: list[VectorIndex]) -> RetrivalJob:
def validate_entites(self) -> RetrivalJob:
    # NOTE(review): "entites" is a typo for "entities", but the name is public
    # API — renaming would break callers, so it is kept as-is.
    return ValidateEntitiesJob(self)

def unique_on(self, unique_on: list[str], sort_key: str | None = None) -> RetrivalJob:
    """Deduplicate rows on the `unique_on` columns.

    When `sort_key` is given, rows are sorted descending on it first, so the
    row with the greatest `sort_key` value is the one kept.
    """
    return UniqueRowsJob(job=self, unique_on=unique_on, sort_key=sort_key)

def unique_entities(self) -> RetrivalJob:
    """Keep one row per entity, preferring the newest row by event timestamp."""
    result = self.request_result
    return self.unique_on(unique_on=result.entity_columns, sort_key=result.event_timestamp)

def fill_missing_columns(self) -> RetrivalJob:
    # Wraps the job in a FillMissingColumnsJob; presumably fills in columns
    # absent from the source — exact fill semantics defined on that job.
    return FillMissingColumnsJob(self)

Expand Down Expand Up @@ -585,6 +596,29 @@ async def to_polars(self) -> pl.LazyFrame:
left = await self.left_job.to_polars()
right = await self.right_job.to_polars()

return_request = self.left_job.request_result

# Need to ensure that the data types are the same. Otherwise will the join fail
for left_col, right_col in zip(self.left_on, self.right_on):
polars_type = [
feature
for feature in return_request.features.union(return_request.entities)
if feature.name == left_col
]
if not polars_type:
raise ValueError(f'Unable to find {left_col} in left request {return_request}.')

polars_type = polars_type[0].dtype.polars_type

left_column_dtypes = dict(zip(left.columns, left.dtypes))
right_column_dtypes = dict(zip(right.columns, right.dtypes))

if not left_column_dtypes[left_col].is_(polars_type):
left = left.with_columns(pl.col(left_col).cast(polars_type))

if not right_column_dtypes[right_col].is_(polars_type):
right = right.with_columns(pl.col(right_col).cast(polars_type))

return left.join(right, left_on=self.left_on, right_on=self.right_on, how=self.method)

def log_each_job(self) -> RetrivalJob:
Expand Down Expand Up @@ -816,20 +850,21 @@ def request_result(self) -> RequestResult:
def retrival_requests(self) -> list[RetrivalRequest]:
return self.job.retrival_requests

@staticmethod
def features_to_validate(retrival_requests: list[RetrivalRequest]) -> set[Feature]:
    """Collect the features and entities to validate.

    Requests containing aggregated features are skipped, since aggregates
    are derived downstream and are not validated here.
    """
    plain_requests = [req for req in retrival_requests if not req.aggregated_features]
    combined = RequestResult.from_request_list(plain_requests)
    return combined.features.union(combined.entities)

async def to_pandas(self) -> pd.DataFrame:
return await self.validator.validate_pandas(
list(self.features_to_validate), await self.job.to_pandas()
return self.validator.validate_pandas(
list(DropInvalidJob.features_to_validate(self.retrival_requests)), await self.job.to_pandas()
)

async def to_polars(self) -> pl.LazyFrame:
return await self.validator.validate_polars(
list(self.features_to_validate), await self.job.to_polars()
return self.validator.validate_polars(
list(DropInvalidJob.features_to_validate(self.retrival_requests)), await self.job.to_polars()
)

def with_subfeatures(self) -> RetrivalJob:
Expand Down Expand Up @@ -918,6 +953,25 @@ def remove_derived_features(self) -> RetrivalJob:
return self.job.remove_derived_features()


@dataclass
class UniqueRowsJob(RetrivalJob, ModificationJob):
    """Deduplicate the wrapped job's rows on a set of columns.

    When `sort_key` is set, rows are sorted descending on it before
    deduplication, so `keep='first'` retains the row with the greatest
    sort-key value (e.g. the newest event).

    NOTE(review): polars' LazyFrame.unique with keep='first' is not
    guaranteed order-stable without maintain_order — confirm the preceding
    sort is actually respected by the installed polars version.
    """

    # Upstream job producing the rows to deduplicate.
    job: RetrivalJob
    # Columns that together identify a unique row.
    unique_on: list[str]
    # Optional column to rank duplicates by (greatest value wins).
    sort_key: str | None = field(default=None)

    async def to_pandas(self) -> pd.DataFrame:
        # Delegate to the polars path and convert at the end.
        return (await self.to_polars()).collect().to_pandas()

    async def to_polars(self) -> pl.LazyFrame:
        data = await self.job.to_polars()

        if self.sort_key:
            data = data.sort(self.sort_key, descending=True)

        return data.unique(self.unique_on, keep='first').lazy()


@dataclass
class ValidateEntitiesJob(RetrivalJob, ModificationJob):

Expand All @@ -932,7 +986,7 @@ async def to_pandas(self) -> pd.DataFrame:

return data

async def to_polars(self) -> pl.DataFrame:
async def to_polars(self) -> pl.LazyFrame:
data = await self.job.to_polars()

for request in self.retrival_requests:
Expand Down
24 changes: 23 additions & 1 deletion aligned/schemas/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ class FeatureType(Codable):

@property
def is_numeric(self) -> bool:
    """Whether values of this type can be represented as a number (bool included)."""
    return self.name in (
        'bool',
        'int8',
        'int16',
        'int32',
        'int64',
        'float',
        'double',
    )

@property
def python_type(self) -> type:
Expand All @@ -107,6 +115,8 @@ def python_type(self) -> type:

return {
'string': str,
'int8': int,
'int16': int,
'int32': int,
'int64': int,
'float': float,
Expand All @@ -127,6 +137,8 @@ def pandas_type(self) -> str | type:

return {
'string': str,
'int8': 'Int8',
'int16': 'Int16',
'int32': 'Int32',
'int64': 'Int64',
'float': np.float64,
Expand All @@ -149,6 +161,8 @@ def polars_type(self) -> type:
def feature_factory(self) -> ff.FeatureFactory:
return {
'string': ff.String(),
'int8': ff.Int8(),
'int16': ff.Int16(),
'int32': ff.Int32(),
'int64': ff.Int64(),
'float': ff.Float(),
Expand Down Expand Up @@ -186,6 +200,14 @@ def from_polars(polars_type: pl.DataType) -> FeatureType:
def string() -> FeatureType:
return FeatureType(name='string')

@staticmethod
def int8() -> FeatureType:
    """Return the 8-bit signed integer feature type."""
    return FeatureType(name='int8')

@staticmethod
def int16() -> FeatureType:
    """Return the 16-bit signed integer feature type."""
    return FeatureType(name='int16')

@staticmethod
def int32() -> FeatureType:
    """Return the 32-bit signed integer feature type."""
    return FeatureType(name='int32')
Expand Down
Loading

0 comments on commit 9cae6db

Please sign in to comment.