Skip to content

Commit

Permalink
Updated deps
Browse files Browse the repository at this point in the history
  • Loading branch information
Mats E. Mollestad committed Nov 13, 2023
1 parent 363acc9 commit d673dc4
Show file tree
Hide file tree
Showing 24 changed files with 864 additions and 638 deletions.
40 changes: 22 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Aligned

n
Aligned helps improve ML system visibility, while also reducing technical and data debt, as described in [Sculley et al. [2015]](https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf).

## Docs
Expand Down Expand Up @@ -51,8 +52,8 @@ This makes the features lightweight, data-source independent, and flexible.
@feature_view(
name="passenger",
description="Some features from the titanic dataset",
batch_source=FileSource.csv_at("titanic.csv"),
stream_source=HttpStreamSource(topic_name="titanic")
source=FileSource.csv_at("titanic.csv"),
materialized_source=FileSource.parquet_at("titanic.parquet"),
)
class TitanicPassenger:

Expand All @@ -61,16 +62,15 @@ class TitanicPassenger:
age = (
Float()
.description("A float as some have decimals")
.is_required()
.lower_bound(0)
.upper_bound(110)
)

name = String()
sex = String().accepted_values(["male", "female"])
survived = Bool().description("If the passenger survived")
sibsp = Int32().lower_bound(0, is_inclusive=True).description("Number of siblings on titanic")
cabin = String()
sibsp = Int32().lower_bound(0).description("Number of siblings on titanic")
cabin = String().is_optional()

# Creates two one hot encoded values
is_male, is_female = sex.one_hot_encode(['male', 'female'])
Expand All @@ -81,19 +81,22 @@ class TitanicPassenger:
Aligned makes handling data sources easy, as you do not have to think about how it is done.
Only define where the data is, and we handle the dirty work.

Furthermore, you can also add materialized sources, which can be used as intermediate sources.

```python
my_db = PostgreSQLConfig(env_var="DATABASE_URL")
redis = RedisConfig(env_var="REDIS_URL")

@feature_view(
name="passenger",
description="Some features from the titanic dataset",
batch_source=my_db.table(
source=my_db.table(
"passenger",
mapping_keys={
"Passenger_Id": "passenger_id"
}
),
materialized_source=my_db.with_schema("inter").table("passenger"),
stream_source=redis.stream(topic="titanic")
)
class TitanicPassenger:
Expand All @@ -114,24 +117,25 @@ my_db = PostgreSQLConfig.localhost()
aws_bucket = AwsS3Config(...)

@feature_view(
name="some_features",
name="passengers",
description="...",
batch_source=my_db.table("local_features")
source=my_db.table("passengers")
)
class SomeFeatures:
class TitanicPassenger:

passenger_id = Int32().as_entity()

# Some features
...

@feature_view(
name="aws",
description="...",
batch_source=aws_bucket.file_at("path/to/file.parquet")
)
class AwsFeatures:
# Change data source
passenger_view = TitanicPassenger.query()

psql_passengers = await passenger_view.all().to_pandas()
aws_passengers = await passenger_view.using_source(
aws_bucket.parquet_at("passengers.parquet")
).to_pandas()

# Some features
...
```

## Describe Models
Expand Down Expand Up @@ -169,7 +173,7 @@ Therefore, Aligned provides an easy way to check how fresh a source is.
@feature_view(
name="departures",
description="Features related to the departure of a taxi ride",
batch_source=taxi_db.table("departures"),
source=taxi_db.table("departures"),
)
class TaxiDepartures:

Expand Down
28 changes: 20 additions & 8 deletions aligned/compiler/feature_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
LowerBoundInclusive,
MaxLength,
MinLength,
Optional,
Unique,
Regex,
StartsWith,
Expand Down Expand Up @@ -479,9 +480,10 @@ def transform_polars(
return dtype # type: ignore [return-value]

def is_required(self: T) -> T:
from aligned.schemas.constraints import Required
return self

self._add_constraint(Required()) # type: ignore[attr-defined]
def is_optional(self: T) -> T:
    """Mark this feature as optional by attaching an ``Optional`` constraint.

    Returns:
        The same feature instance, so constraint calls can be chained fluently.
    """
    self._add_constraint(Optional()) # type: ignore[attr-defined]
    return self

def _add_constraint(self, constraint: ConstraintFactory | Constraint) -> None:
Expand Down Expand Up @@ -589,7 +591,7 @@ def upper_bound(self: T, value: float) -> T:


class ArithmeticFeature(ComparableFeature):
def __sub__(self, other: FeatureFactory) -> Float:
def __sub__(self, other: FeatureFactory | Any) -> Float:
from aligned.compiler.transformation_factory import DifferanceBetweenFactory, TimeDifferanceFactory

feature = Float()
Expand All @@ -599,7 +601,14 @@ def __sub__(self, other: FeatureFactory) -> Float:
feature.transformation = DifferanceBetweenFactory(self, other)
return feature

def __add__(self, other: FeatureFactory) -> Float:
def __radd__(self, other: FeatureFactory | Any) -> Float:
    """Support reflected addition, i.e. ``other + feature``.

    Builds a new ``Float`` feature whose transformation is an
    ``AdditionBetweenFactory`` combining this feature with ``other``.
    The local import avoids a circular dependency with the
    transformation-factory module.
    """
    from aligned.compiler.transformation_factory import AdditionBetweenFactory

    feature = Float()
    feature.transformation = AdditionBetweenFactory(self, other)
    return feature

def __add__(self, other: FeatureFactory | Any) -> Float:
from aligned.compiler.transformation_factory import AdditionBetweenFactory

feature = Float()
Expand All @@ -616,11 +625,14 @@ def __truediv__(self, other: FeatureFactory | Any) -> Float:
feature.transformation = RatioFactory(self, LiteralValue.from_value(other))
return feature

def __floordiv__(self, other: FeatureFactory) -> Float:
def __floordiv__(self, other: FeatureFactory | Any) -> Float:
from aligned.compiler.transformation_factory import RatioFactory

feature = Float()
feature.transformation = RatioFactory(self, other)
if isinstance(other, FeatureFactory):
feature.transformation = RatioFactory(self, other)
else:
feature.transformation = RatioFactory(self, LiteralValue.from_value(other))
return feature

def __abs__(self) -> Float:
Expand Down Expand Up @@ -667,8 +679,8 @@ def log1p(self) -> Float:
def clip(self: T, lower_bound: float, upper_bound: float) -> T:
from aligned.compiler.transformation_factory import ClipFactory

feature = Float()
feature.transformation = ClipFactory(self, lower_bound, upper_bound)
feature = self.copy_type() # type: ignore
feature.transformation = ClipFactory(self, lower_bound, upper_bound) # type: ignore
return feature


Expand Down
19 changes: 14 additions & 5 deletions aligned/compiler/transformation_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ class RatioFactory(TransformationFactory):

@property
def using_features(self) -> list[FeatureFactory]:
return [self.numerator, self.denumerator]
if isinstance(self.denumerator, FeatureFactory):
return [self.numerator, self.denumerator]
else:
return [self.numerator]

def compile(self) -> Transformation:
from aligned.schemas.transformation import DivideDenumeratorValue, Ratio
Expand Down Expand Up @@ -259,16 +262,22 @@ def compile(self) -> Transformation:
class DifferanceBetweenFactory(TransformationFactory):

first_feature: FeatureFactory
second_feature: FeatureFactory
second_feature: FeatureFactory | LiteralValue

@property
def using_features(self) -> list[FeatureFactory]:
return [self.first_feature, self.second_feature]
if isinstance(self.second_feature, FeatureFactory):
return [self.first_feature, self.second_feature]
else:
return [self.first_feature]

def compile(self) -> Transformation:
from aligned.schemas.transformation import Subtraction
from aligned.schemas.transformation import Subtraction, SubtractionValue

return Subtraction(self.first_feature.name, self.second_feature.name)
if isinstance(self.second_feature, FeatureFactory):
return Subtraction(self.first_feature.name, self.second_feature.name)
else:
return SubtractionValue(self.first_feature.name, self.second_feature)


@dataclass
Expand Down
52 changes: 49 additions & 3 deletions aligned/data_source/batch_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from aligned.schemas.codable import Codable
from aligned.schemas.derivied_feature import DerivedFeature
from aligned.schemas.feature import EventTimestamp, Feature
from aligned.schemas.feature import EventTimestamp, Feature, FeatureLocation

if TYPE_CHECKING:
from aligned.compiler.feature_factory import FeatureFactory
Expand All @@ -29,6 +29,7 @@ def __init__(self) -> None:
from aligned.sources.psql import PostgreSQLDataSource
from aligned.sources.redshift import RedshiftSQLDataSource
from aligned.sources.s3 import AwsS3CsvDataSource, AwsS3ParquetDataSource
from aligned.schemas.feature_view import FeatureViewReferenceSource

source_types = [
PostgreSQLDataSource,
Expand All @@ -40,6 +41,7 @@ def __init__(self) -> None:
RedshiftSQLDataSource,
JoinDataSource,
FilteredDataSource,
FeatureViewReferenceSource,
]

self.supported_data_sources = {source.type_name: source for source in source_types}
Expand Down Expand Up @@ -261,6 +263,40 @@ async def freshness(self, event_timestamp: EventTimestamp) -> datetime | None:

raise NotImplementedError(f'Freshness is not implemented for {type(self)}.')

def filter(self, condition: DerivedFeature | Feature) -> BatchDataSource:
    """Wrap this source in a ``FilteredDataSource`` that applies ``condition``.

    The returned source yields only the rows for which ``condition`` holds;
    this source itself is left unchanged.
    """
    return FilteredDataSource(self, condition)

def join(self, view: Any, on: str | FeatureFactory, how: str = 'inner') -> BatchDataSource:
    """Join this source with a feature view's source on a shared key.

    Args:
        view: A feature-view class decorated so that it carries a
            ``__view_wrapper__`` attribute (a ``FeatureViewWrapper``).
        on: The join key — either a column name or a ``FeatureFactory``,
            in which case its ``.name`` is used. The same key is used on
            both sides of the join.
        how: Join method passed through to ``JoinDataSource``
            (defaults to ``'inner'``).

    Returns:
        A ``JoinDataSource`` combining this source (left) with the view's
        materialized source — falling back to its plain source — (right).

    Raises:
        ValueError: If ``view`` has no ``__view_wrapper__`` attribute, or
            that attribute is not a ``FeatureViewWrapper``.
    """
    # Local imports avoid circular dependencies between the compiler,
    # data-source, and feature-view modules.
    from aligned.compiler.feature_factory import FeatureFactory
    from aligned.data_source.batch_data_source import JoinDataSource
    from aligned.feature_view.feature_view import FeatureViewWrapper

    if not hasattr(view, '__view_wrapper__'):
        raise ValueError(f'Unable to join {view}')

    wrapper = getattr(view, '__view_wrapper__')
    if not isinstance(wrapper, FeatureViewWrapper):
        raise ValueError()

    # Normalise the join key to a plain column name.
    if isinstance(on, FeatureFactory):
        on = on.name

    compiled_view = wrapper.compile()

    request = compiled_view.request_all

    # NOTE(review): JoinDataSource appears to also declare a `left_request`
    # field that is not supplied here — confirm whether it is optional or
    # this call site needs updating.
    return JoinDataSource(
        source=self,
        right_source=compiled_view.materialized_source or compiled_view.source,
        right_request=request.needed_requests[0],
        left_on=on,
        right_on=on,
        method=how,
    )

def depends_on(self) -> set[FeatureLocation]:
    """Return the feature-view locations this source depends on.

    The base implementation has no dependencies and returns an empty set;
    subclasses that reference other views are expected to override this.
    """
    return set()


@dataclass
class FilteredDataSource(BatchSourceModification, BatchDataSource):
Expand All @@ -281,6 +317,7 @@ def wrap_job(self, job: RetrivalJob) -> RetrivalJob:
class JoinDataSource(BatchSourceModification, BatchDataSource):

source: BatchDataSource
left_request: RetrivalRequest
right_source: BatchDataSource
right_request: RetrivalRequest
left_on: str
Expand All @@ -294,13 +331,22 @@ def job_group_key(self) -> str:

def wrap_job(self, job: RetrivalJob) -> RetrivalJob:

right_job = self.right_source.all_data(self.right_request, limit=None)
return job.join(right_job, self.method, (self.left_on, self.right_on))
right_job = self.right_source.all_data(self.right_request, limit=None).derive_features(
[self.right_request]
)

return job.derive_features([self.left_request]).join(
right_job, self.method, (self.left_on, self.right_on)
)


class ColumnFeatureMappable:
mapping_keys: dict[str, str]

def with_renames(self: T, mapping_keys: dict[str, str]) -> T:
    """Set the column-rename mapping on this source.

    Mutates this instance in place (``mapping_keys`` maps source column
    names to feature names, as used by ``columns_for``) and returns it so
    the call can be chained fluently.
    """
    self.mapping_keys = mapping_keys # type: ignore
    return self

def columns_for(self, features: list[Feature]) -> list[str]:
return [self.mapping_keys.get(feature.name, feature.name) for feature in features]

Expand Down
8 changes: 5 additions & 3 deletions aligned/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ async def insert_into(
new_df = (await values.to_polars()).select(columns)
try:
existing_df = await source.to_polars()
write_df = pl.concat([new_df, existing_df.select(columns)])
write_df = pl.concat([new_df, existing_df.select(columns)], how='vertical_relaxed')
except UnableToFindFileException:
write_df = new_df
await source.write_polars(write_df)
Expand Down Expand Up @@ -1220,9 +1220,11 @@ def all(self, limit: int | None = None) -> RetrivalJob:
.derive_features(request.needed_requests)
)
if self.feature_filter:
return SelectColumnsJob(include_features=self.feature_filter, job=job)
selected_columns = self.feature_filter
else:
return job
selected_columns = set(request.needed_requests[0].all_returned_columns)

return job.select_columns(selected_columns)

def between_dates(self, start_date: datetime, end_date: datetime) -> RetrivalJob:
if not isinstance(self.source, RangeFeatureSource):
Expand Down
Loading

0 comments on commit d673dc4

Please sign in to comment.