A lot of improvements
MatsMoll committed Apr 13, 2024
1 parent bd2a12a commit a03b3ef
Showing 39 changed files with 381 additions and 367 deletions.
3 changes: 2 additions & 1 deletion aligned/__init__.py
@@ -22,7 +22,7 @@
from aligned.compiler.model import model_contract, FeatureInputVersions
from aligned.data_source.stream_data_source import HttpStreamSource
from aligned.data_source.batch_data_source import CustomMethodDataSource
- from aligned.feature_store import FeatureStore
+ from aligned.feature_store import ContractStore, FeatureStore
from aligned.feature_view import feature_view, combined_feature_view, check_schema
from aligned.schemas.text_vectoriser import EmbeddingModel
from aligned.sources.kafka import KafkaConfig
@@ -36,6 +36,7 @@
from aligned.schemas.feature import FeatureLocation

__all__ = [
+ 'ContractStore',
'FeatureStore',
'feature_view',
# Batch Data sources
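After this hunk, `ContractStore` is importable from the package root alongside the existing `FeatureStore` export. A minimal sketch of the new import path (it assumes the `aligned` package from this commit or later is installed; the print is purely illustrative):

```python
# Both names now resolve from the package root; later hunks in this commit
# move the CLI from FeatureStore.from_definition to ContractStore.from_definition.
from aligned import ContractStore, FeatureStore

print(ContractStore, FeatureStore)
```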
133 changes: 2 additions & 131 deletions aligned/cli.py
@@ -3,7 +3,6 @@
import os
import sys
from contextlib import suppress
- from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from typing import Any
@@ -12,8 +11,6 @@
from pytz import utc # type: ignore

from aligned.compiler.repo_reader import RepoReader, RepoReference
- from aligned.schemas.codable import Codable
- from aligned.schemas.feature import Feature
from aligned.worker import StreamWorker
from collections.abc import Callable
from datetime import datetime
@@ -263,132 +260,6 @@ async def serve_worker_command(repo_path: str, worker_path: str, env_file: str)
await worker.start()


- @dataclass
- class CategoricalFeatureSummary(Codable):
-     missing_percentage: float
-     unique_values: int
-     values: list[str]
-     value_count: list[int]
-
-
- @dataclass
- class NumericFeatureSummary(Codable):
-     missing_percentage: float
-     mean: float | None
-     median: float | None
-     std: float | None
-     lowest: float | None
-     highests: float | None
-     histogram_count: list[int]
-     histogram_splits: list[float]
-
-
- @dataclass
- class ProfilingResult(Codable):
-     numeric_features: dict[str, NumericFeatureSummary]
-     categorical_features: dict[str, CategoricalFeatureSummary]
-
-
- # Should add some way of profiling models, not feature views.
- # Or maybe both
- @cli.command('profile')
- @coro
- @click.option(
-     '--repo-path',
-     default='.',
-     help='The path to the repo',
- )
- @click.option(
-     '--reference-file',
-     default='feature_store_location.py',
-     help='The file defining where to read the feature store from',
- )
- @click.option('--output', default='profiling-result.json')
- @click.option('--dataset-size', default=10000)
- @click.option(
-     '--env-file',
-     default='.env',
-     help='The path to env variables',
- )
- async def profile(repo_path: str, reference_file: str, env_file: str, output: str, dataset_size: int) -> None:
-     import numpy as np
-     from pandas import DataFrame
-
-     from aligned import FeatureStore
-
-     # Make sure modules can be read, and that the env is set
-     dir = Path.cwd() if repo_path == '.' else Path(repo_path).absolute()
-     sys.path.append(str(dir))
-     env_file_path = dir / env_file
-     load_envs(env_file_path)
-
-     online_store: FeatureStore = await FeatureStore.from_reference_at_path(repo_path, reference_file)
-     feature_store = online_store.offline_store()
-
-     results = ProfilingResult(numeric_features={}, categorical_features={})
-
-     for feature_view_name in sorted(feature_store.feature_views.keys()):
-         click.echo(f'Profiling: {feature_view_name}')
-         feature_view = feature_store.feature_view(feature_view_name)
-         data_set: DataFrame = feature_view.all(limit=dataset_size).to_pandas()
-
-         all_features: list[Feature] = list(feature_view.view.features) + list(
-             feature_view.view.derived_features
-         )
-         for feature in all_features:
-
-             data_slice = data_set[feature.name]
-
-             reference = f'{feature_view_name}:{feature.name}'
-
-             if (not feature.dtype.is_numeric) or feature.dtype.name == 'bool':
-                 unique_values = data_slice.unique()
-                 filter_unique_nan_values = [
-                     value
-                     for value in unique_values
-                     if not (
-                         str(value).lower() == 'nan' or str(value).lower() == 'nat' or str(value) == '<NA>'
-                     )
-                 ]
-
-                 results.categorical_features[reference] = CategoricalFeatureSummary(
-                     missing_percentage=(data_slice.isna() | data_slice.isnull()).sum() / data_slice.shape[0],
-                     unique_values=unique_values.shape[0],
-                     values=[str(value) for value in filter_unique_nan_values],
-                     value_count=data_slice.value_counts()[filter_unique_nan_values].tolist(),
-                 )
-             else:
-                 description = data_slice.describe()
-                 n_bins = np.min([50, len(data_slice.unique())])
-                 max_value = description['max']
-                 min_value = description['min']
-
-                 if np.isnan(max_value):
-                     continue
-
-                 width = (max_value - min_value) / n_bins
-
-                 if width <= 0:
-                     histogram = [description['count']]
-                     cuts = []
-                 else:
-                     cuts = np.arange(start=min_value, stop=max_value + width, step=width)
-                     histogram, _ = np.histogram(data_slice.loc[~data_slice.isna()].values, cuts)
-
-                 results.numeric_features[reference] = NumericFeatureSummary(
-                     missing_percentage=(data_slice.isna() | data_slice.isnull()).sum() / data_slice.shape[0],
-                     mean=description['mean'] if not np.isnan(description['mean']) else None,
-                     median=description['50%'] if not np.isnan(description['50%']) else None,
-                     std=description['std'] if not np.isnan(description['std']) else None,
-                     lowest=description['min'] if not np.isnan(description['min']) else None,
-                     highests=description['max'] if not np.isnan(description['max']) else None,
-                     histogram_count=list(histogram),
-                     histogram_splits=list(cuts),
-                 )
-
-     Path(output).write_bytes(results.to_json().encode('utf-8'))
-
-
@cli.command('create-indexes')
@coro
@click.option(
@@ -407,7 +278,7 @@
help='The path to env variables',
)
async def create_indexes(repo_path: str, reference_file: str, env_file: str) -> None:
- from aligned import FeatureStore, FileSource
+ from aligned import ContractStore, FileSource

setup_logger()

@@ -432,7 +303,7 @@ async def create_indexes(repo_path: str, reference_file: str, env_file: str) ->
click.echo(f'No repo file found at {dir}. Returning without creating indexes')
return

- feature_store = FeatureStore.from_definition(repo_def)
+ feature_store = ContractStore.from_definition(repo_def)

for feature_view_name in sorted(feature_store.feature_views.keys()):
view = feature_store.feature_views[feature_view_name]
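The bulk of these deletions remove the `profile` command shown above. Its numeric branch binned each feature into at most 50 equal-width buckets; below is a condensed, self-contained sketch of that binning logic (the function name and sample data are illustrative, not part of the repo):

```python
import numpy as np
import pandas as pd


def numeric_histogram(data_slice: pd.Series, max_bins: int = 50) -> tuple[list, list]:
    """Equal-width histogram, mirroring the removed `profile` command's numeric path."""
    description = data_slice.describe()
    if np.isnan(description['max']):
        # The original command skipped all-null columns with `continue`.
        raise ValueError('all-null column')

    n_bins = min(max_bins, data_slice.nunique())
    width = (description['max'] - description['min']) / n_bins

    if width <= 0:
        # Constant column: one bucket holding every non-null value, no split points.
        return [description['count']], []

    # Bin edges spanning min..max inclusive, then non-null value counts per bin.
    cuts = np.arange(start=description['min'], stop=description['max'] + width, step=width)
    histogram, _ = np.histogram(data_slice.dropna().values, cuts)
    return list(histogram), list(cuts)


counts, splits = numeric_histogram(pd.Series([1.0, 2.0, 2.0, 5.0, None]))
print(counts, splits)  # bucket counts for the four non-null values, plus the bin edges
```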
54 changes: 27 additions & 27 deletions aligned/compiler/aggregation_factory.py
@@ -6,7 +6,7 @@
from aligned.compiler.feature_factory import (
AggregationTransformationFactory,
FeatureFactory,
- FeatureReferance,
+ FeatureReference,
String,
TransformationFactory,
)
@@ -15,8 +15,8 @@


def aggregate_over(
- group_by: list[FeatureReferance],
- time_column: FeatureReferance | None,
+ group_by: list[FeatureReference],
+ time_column: FeatureReference | None,
time_window: timedelta | None,
every_interval: timedelta | None,
offset_interval: timedelta | None,
@@ -54,12 +54,12 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import ConcatStringAggregation

return ConcatStringAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
separator=self.separator or '',
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -82,11 +82,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import SumAggregation

return SumAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -109,11 +109,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import MeanAggregation

return MeanAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -136,11 +136,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import MinAggregation

return MinAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -163,11 +163,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import MaxAggregation

return MaxAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -190,11 +190,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import CountAggregation

return CountAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -217,11 +217,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import CountDistinctAggregation

return CountDistinctAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -244,11 +244,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import StdAggregation

return StdAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -271,11 +271,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import VarianceAggregation

return VarianceAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -298,11 +298,11 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import MedianAggregation

return MedianAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -326,12 +326,12 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import PercentileAggregation

return PercentileAggregation(
- key=self.feature.feature_referance().name,
+ key=self.feature.feature_reference().name,
percentile=self.percentile,
)

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(
group_by, time_column, self.time_window, self.every_interval, self.offset_interval, None
@@ -350,7 +350,7 @@ def using_features(self) -> list[FeatureFactory]:
return self._using_features

def aggregate_over(
- self, group_by: list[FeatureReferance], time_column: FeatureReferance | None
+ self, group_by: list[FeatureReference], time_column: FeatureReference | None
) -> AggregateOver:
return aggregate_over(group_by, time_column, None, None, None, None)

@@ -363,7 +363,7 @@ def compile(self) -> Transformation:
from aligned.schemas.transformation import PolarsFunctionTransformation, PolarsLambdaTransformation

if isinstance(self.method, pl.Expr):
- method = lambda df, alias: self.method  # type: ignore
+ method = lambda df, alias: self.method  # noqa: E731
code = ''
return PolarsLambdaTransformation(method=dill.dumps(method), code=code, dtype=self.dtype.dtype)
else:
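The final hunk swaps a blanket `# type: ignore` for the narrower `# noqa: E731` on the lambda that wraps an already-built polars expression. The wrapper gives the expression the same `(df, alias)` calling convention as function-based transformations, so it can be dill-serialized and stored in a `PolarsLambdaTransformation` the same way. A standalone sketch of the pattern (it assumes dill can pickle the closed-over expression, which the factory's own code relies on):

```python
import dill
import polars as pl

# A prebuilt expression, standing in for `self.method` in the factory.
expr = pl.col('price').sum()

# Wrap it so it matches the (df, alias) signature of function transformations.
method = lambda df, alias: expr  # noqa: E731

blob = dill.dumps(method)   # this byte string is what PolarsLambdaTransformation stores
restored = dill.loads(blob)

df = pl.DataFrame({'price': [1, 2, 3]})
print(df.select(restored(df, 'total')))  # the round-tripped expression still evaluates
```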
(Diffs for the remaining 36 changed files are not shown here.)
