From 895df34bc0c9319a8295bc985457cbe43806c312 Mon Sep 17 00:00:00 2001 From: "Mats E. Mollestad" Date: Mon, 25 Dec 2023 22:09:59 +0100 Subject: [PATCH] Cleanup and some new dataset features --- aligned/active_learning/write_policy.py | 4 +- aligned/compiler/feature_factory.py | 18 +- aligned/compiler/model.py | 328 ++++++++---------- aligned/compiler/transformation_factory.py | 22 +- aligned/data_source/batch_data_source.py | 10 +- aligned/entity_data_source.py | 11 - aligned/feature_store.py | 19 +- aligned/feature_view/feature_view.py | 12 +- aligned/request/retrival_request.py | 26 +- aligned/retrival_job.py | 298 +++++++++++++++- aligned/schemas/folder.py | 204 ++++++++++- aligned/schemas/model.py | 4 +- aligned/schemas/transformation.py | 32 ++ aligned/sources/local.py | 2 +- aligned/sources/redshift.py | 6 +- aligned/tests/test_models_as_feature.py | 38 +- aligned/tests/test_train_test_validate_set.py | 66 +++- aligned/tests/test_transformations.py | 25 +- conftest.py | 60 ++-- pyproject.toml | 2 +- test_data/credit_history_mater.parquet | Bin 1018 -> 1016 bytes test_data/feature-store.json | 2 +- test_data/test_model.parquet | Bin 624 -> 624 bytes test_data/titanic-sets.json | 1 + test_data/titanic-test.csv | 21 ++ test_data/titanic-train.csv | 61 ++++ test_data/titanic-validate.csv | 21 ++ 27 files changed, 970 insertions(+), 323 deletions(-) delete mode 100644 aligned/entity_data_source.py create mode 100644 test_data/titanic-sets.json create mode 100644 test_data/titanic-test.csv create mode 100644 test_data/titanic-train.csv create mode 100644 test_data/titanic-validate.csv diff --git a/aligned/active_learning/write_policy.py b/aligned/active_learning/write_policy.py index a4cd05f4..b0ba136c 100644 --- a/aligned/active_learning/write_policy.py +++ b/aligned/active_learning/write_policy.py @@ -36,7 +36,7 @@ class ActiveLearningSampleSizePolicy(ActiveLearningWritePolicy): async def write(self, data: pl.LazyFrame, model: Model): - if not model.dataset_folder: + if not model.dataset_store: logger.info( 'Found no dataset folder. Therefore, no data will be written to an active learning dataset.' 
) @@ -55,7 +55,7 @@ async def write(self, data: pl.LazyFrame, model: Model): dataset_subfolder = Path(self.dataset_folder_name) / str(self.write_timestamp) logger.info(f'Writing active learning data to {dataset_subfolder}') - dataset = model.dataset_folder.file_at(dataset_subfolder / self.dataset_file_name) + dataset = model.dataset_store.file_at(dataset_subfolder / self.dataset_file_name) await dataset.write(self.current_frame.write_csv().encode('utf-8')) self.unsaved_size = 0 diff --git a/aligned/compiler/feature_factory.py b/aligned/compiler/feature_factory.py index b2656ae2..d86ff710 100644 --- a/aligned/compiler/feature_factory.py +++ b/aligned/compiler/feature_factory.py @@ -34,7 +34,6 @@ from aligned.schemas.vector_storage import VectorStorage if TYPE_CHECKING: - from aligned.compiler.transformation_factory import FillNaStrategy from aligned.sources.s3 import AwsS3Config @@ -426,19 +425,14 @@ def add_values(feature: FeatureFactory) -> None: def copy_type(self: T) -> T: raise NotImplementedError() - def fill_na(self: T, value: FillNaStrategy | Any) -> T: - - from aligned.compiler.transformation_factory import ( - ConstantFillNaStrategy, - FillMissingFactory, - FillNaStrategy, - ) + def fill_na(self: T, value: FeatureFactory | Any) -> T: + from aligned.compiler.transformation_factory import FillMissingFactory instance: FeatureFactory = self.copy_type() # type: ignore [attr-defined] - if isinstance(value, FillNaStrategy): - instance.transformation = FillMissingFactory(self, value) - else: - instance.transformation = FillMissingFactory(self, ConstantFillNaStrategy(value)) + if not isinstance(value, FeatureFactory): + value = LiteralValue.from_value(value) + + instance.transformation = FillMissingFactory(self, value) # type: ignore [attr-defined] return instance # type: ignore [return-value] def transformed_using_features_pandas( diff --git a/aligned/compiler/model.py b/aligned/compiler/model.py index 89d8c105..864d34c8 100644 --- a/aligned/compiler/model.py +++ b/aligned/compiler/model.py @@ -2,15 +2,11 @@ import copy import logging -from abc import ABC, abstractproperty from dataclasses import dataclass, field -from datetime import datetime, timedelta -from typing import Any, Callable, Type, TypeVar, Generic +from typing import Any, Callable, Type, TypeVar, Generic, TYPE_CHECKING from uuid import uuid4 -import polars as pl - from aligned.compiler.feature_factory import ( ClassificationLabel, Entity, @@ -24,46 +20,24 @@ ) from aligned.data_source.batch_data_source import BatchDataSource from aligned.data_source.stream_data_source import StreamDataSource -from aligned.entity_data_source import EntityDataSource from aligned.feature_view.feature_view import FeatureView from aligned.schemas.derivied_feature import DerivedFeature from aligned.schemas.feature import Feature, FeatureLocation, FeatureReferance, FeatureType -from aligned.schemas.folder import Folder +from aligned.schemas.folder import DatasetStore, JsonDatasetStore from aligned.schemas.literal_value import LiteralValue from aligned.schemas.model import Model as ModelSchema from aligned.schemas.model import PredictionsView from aligned.schemas.target import ClassificationTarget as ClassificationTargetSchema from aligned.schemas.target import RegressionTarget as RegressionTargetSchema +if TYPE_CHECKING: + from aligned.sources.local import StorageFileReference + logger = logging.getLogger(__name__) T = TypeVar('T') -class SqlEntityDataSource(EntityDataSource): - - url: str - timestamp_column: str - - def __init__(self, sql: 
Callable[[str], str], url: str, timestamp_column: str) -> None: - self.sql = sql - self.url = url - self.timestamp_column = timestamp_column - - async def all_in_range(self, start_date: datetime, end_date: datetime) -> pl.DataFrame: - import os - - start = start_date.strftime('%Y-%m-%d %H:%M:%S') - end = end_date.strftime('%Y-%m-%d %H:%M:%S') - - query = self.sql(f'{self.timestamp_column} BETWEEN \'{start}\' AND \'{end}\'') - return pl.read_sql(query, os.environ[self.url]) - - async def last(self, days: int, hours: int, seconds: int) -> pl.DataFrame: - now = datetime.utcnow() - return await self.all_in_range(now - timedelta(days=days, hours=hours, seconds=seconds), now) - - @dataclass class ModelMetadata: name: str @@ -76,7 +50,8 @@ class ModelMetadata: prediction_source: BatchDataSource | None = field(default=None) prediction_stream: StreamDataSource | None = field(default=None) application_source: BatchDataSource | None = field(default=None) - dataset_folder: Folder | None = field(default=None) + + dataset_store: DatasetStore | None = field(default=None) @dataclass @@ -104,7 +79,7 @@ def __call__(self) -> T: return contract def compile(self) -> ModelSchema: - return ModelContract.compile_with_metadata(self.contract(), self.metadata) + return compile_with_metadata(self.contract(), self.metadata) def filter( self, name: str, where: Callable[[T], Bool], application_source: BatchDataSource | None = None @@ -137,6 +112,14 @@ def filter( return ModelContractWrapper(metadata=meta, contract=self.contract) +def resolve_dataset_store(dataset_store: DatasetStore | StorageFileReference) -> DatasetStore: + + if isinstance(dataset_store, DatasetStore): + return dataset_store + + return JsonDatasetStore(dataset_store) + + def model_contract( name: str, features: list[FeatureReferencable], @@ -146,7 +129,7 @@ def model_contract( prediction_source: BatchDataSource | None = None, prediction_stream: StreamDataSource | None = None, application_source: BatchDataSource | None = None, - dataset_folder: Folder | None = None, + dataset_folder: DatasetStore | StorageFileReference | None = None, ) -> Callable[[Type[T]], ModelContractWrapper[T]]: def decorator(cls: Type[T]) -> ModelContractWrapper[T]: metadata = ModelMetadata( @@ -158,172 +141,135 @@ def decorator(cls: Type[T]) -> ModelContractWrapper[T]: prediction_source=prediction_source, prediction_stream=prediction_stream, application_source=application_source, - dataset_folder=dataset_folder, + dataset_store=resolve_dataset_store(dataset_folder) if dataset_folder else None, ) return ModelContractWrapper(metadata, cls) return decorator -class ModelContract(ABC): - @staticmethod - def metadata_with( - name: str, - features: list[FeatureReferencable], - description: str | None = None, - contacts: list[str] | None = None, - tags: dict[str, str] | None = None, - predictions_source: BatchDataSource | None = None, - predictions_stream: StreamDataSource | None = None, - application_source: BatchDataSource | None = None, - dataset_folder: Folder | None = None, - ) -> ModelMetadata: - return ModelMetadata( - name, - features, - contacts, - tags, - description, - predictions_source, - predictions_stream, - application_source=application_source, - dataset_folder=dataset_folder, - ) - - @abstractproperty - def metadata(self) -> ModelMetadata: - raise NotImplementedError() - - @classmethod - def compile(cls) -> ModelSchema: - instance = cls() - return ModelContract.compile_with_metadata(instance, instance.metadata) - - @staticmethod - def compile_with_metadata(model: Any, 
metadata: ModelMetadata) -> ModelSchema:
-        """
-        Compiles the ModelContract in to ModelSchema structure that can further be encoded.
-
-        ```python
-        class MyModel(ModelContract):
-            ...
-
-            metadata = ModelContract.metadata_with(...)
-
-        model_schema = MyModel().compile_instance()
-
-        ```
-
-        Returns: The compiled Model schema
-        """
-        var_names = [name for name in model.__dir__() if not name.startswith('_')]
-
-        inference_view: PredictionsView = PredictionsView(
-            entities=set(),
-            features=set(),
-            derived_features=set(),
-            model_version_column=None,
-            source=metadata.prediction_source,
-            application_source=metadata.application_source,
-            stream_source=metadata.prediction_stream,
-            classification_targets=set(),
-            regression_targets=set(),
-        )
-        probability_features: dict[str, set[TargetProbability]] = {}
-
-        classification_targets: dict[str, ClassificationTargetSchema] = {}
-        regression_targets: dict[str, RegressionTargetSchema] = {}
-
-        for var_name in var_names:
-            feature = getattr(model, var_name)
-
-            if isinstance(feature, FeatureFactory):
-                feature._location = FeatureLocation.model(metadata.name)
-
-            if isinstance(feature, ModelVersion):
-                inference_view.model_version_column = feature.feature()
-            if isinstance(feature, FeatureView):
-                compiled = feature.compile()
-                inference_view.entities.update(compiled.entities)
-            elif isinstance(feature, ModelContract):
-                compiled = feature.compile()
-                inference_view.entities.update(compiled.predictions_view.entities)
-            elif isinstance(feature, ClassificationLabel):
-                assert feature._name
-                feature._location = FeatureLocation.model(metadata.name)
-                target_feature = feature.compile()
-
-                classification_targets[var_name] = target_feature
-                inference_view.classification_targets.add(target_feature)
-            elif isinstance(feature, RegressionLabel):
-                assert feature._name
-                feature._location = FeatureLocation.model(metadata.name)
-                target_feature = feature.compile()
-
-                regression_targets[var_name] = target_feature
-                inference_view.regression_targets.add(target_feature)
-                inference_view.features.add(target_feature.feature)
-            elif isinstance(feature, EventTimestamp):
-                inference_view.event_timestamp = feature.event_timestamp()
-
-            elif isinstance(feature, TargetProbability):
-                feature_name = feature.target._name
-                assert feature._name
-                assert (
-                    feature.target._name in classification_targets
-                ), 'Target must be a classification target.'
-
-                target = classification_targets[feature.target._name]
-                target.class_probabilities.add(feature.compile())
-
-                inference_view.features.add(
-                    Feature(
-                        var_name,
-                        FeatureType.float(),
-                        f"The probability of target named {feature_name} being '{feature.of_value}'.",
-                    )
+def compile_with_metadata(model: Any, metadata: ModelMetadata) -> ModelSchema:
+    """
+    Compiles a model contract into the ModelSchema structure that can further be encoded.
+
+    ```python
+    @model_contract(name='my_model', features=[...])
+    class MyModel:
+        ...
+
+    model_schema = MyModel.compile()
+
+    ```
+
+    Returns: The compiled Model schema
+    """
+    var_names = [name for name in model.__dir__() if not name.startswith('_')]
+
+    inference_view: PredictionsView = PredictionsView(
+        entities=set(),
+        features=set(),
+        derived_features=set(),
+        model_version_column=None,
+        source=metadata.prediction_source,
+        application_source=metadata.application_source,
+        stream_source=metadata.prediction_stream,
+        classification_targets=set(),
+        regression_targets=set(),
+    )
+    probability_features: dict[str, set[TargetProbability]] = {}
+
+    classification_targets: dict[str, ClassificationTargetSchema] = {}
+    regression_targets: dict[str, RegressionTargetSchema] = {}
+
+    for var_name in var_names:
+        feature = getattr(model, var_name)
+
+        if isinstance(feature, FeatureFactory):
+            feature._location = FeatureLocation.model(metadata.name)
+
+        if isinstance(feature, ModelVersion):
+            inference_view.model_version_column = feature.feature()
+        if isinstance(feature, FeatureView):
+            compiled = feature.compile()
+            inference_view.entities.update(compiled.entities)
+
+        elif isinstance(feature, ModelContractWrapper):
+            compiled = feature.compile()
+            inference_view.entities.update(compiled.predictions_view.entities)
+
+        elif isinstance(feature, ClassificationLabel):
+            assert feature._name
+            feature._location = FeatureLocation.model(metadata.name)
+            target_feature = feature.compile()
+
+            classification_targets[var_name] = target_feature
+            inference_view.classification_targets.add(target_feature)
+        elif isinstance(feature, RegressionLabel):
+            assert feature._name
+            feature._location = FeatureLocation.model(metadata.name)
+            target_feature = feature.compile()
+
+            regression_targets[var_name] = target_feature
+            inference_view.regression_targets.add(target_feature)
+            inference_view.features.add(target_feature.feature)
+        elif isinstance(feature, EventTimestamp):
+            inference_view.event_timestamp = feature.event_timestamp()
+
+        elif isinstance(feature, TargetProbability):
+            feature_name = feature.target._name
+            assert feature._name
+            assert feature.target._name in classification_targets, 'Target must be a classification target.'
+ + target = classification_targets[feature.target._name] + target.class_probabilities.add(feature.compile()) + + inference_view.features.add( + Feature( + var_name, + FeatureType.float(), + f"The probability of target named {feature_name} being '{feature.of_value}'.", ) - probability_features[feature_name] = probability_features.get(feature_name, set()).union( - {feature} - ) - elif isinstance(feature, Entity): - inference_view.entities.add(feature.feature()) - elif isinstance(feature, FeatureFactory): - inference_view.features.add(feature.feature()) - - # Needs to run after the feature views have compiled - features: set[FeatureReferance] = {feature.feature_referance() for feature in metadata.features} - - for target, probabilities in probability_features.items(): - from aligned.schemas.transformation import MapArgMax - - transformation = MapArgMax( - {probs._name: LiteralValue.from_value(probs.of_value) for probs in probabilities} ) - - arg_max_feature = DerivedFeature( - name=target, - dtype=transformation.dtype, - transformation=transformation, - depending_on={ - FeatureReferance(feat, FeatureLocation.model(metadata.name), dtype=FeatureType.float()) - for feat in transformation.column_mappings.keys() - }, - depth=1, + probability_features[feature_name] = probability_features.get(feature_name, set()).union( + {feature} ) - inference_view.derived_features.add(arg_max_feature) + elif isinstance(feature, Entity): + inference_view.entities.add(feature.feature()) + elif isinstance(feature, FeatureFactory): + inference_view.features.add(feature.feature()) - if not probability_features and inference_view.classification_targets: - inference_view.features.update( - {target.feature for target in inference_view.classification_targets} - ) + # Needs to run after the feature views have compiled + features: set[FeatureReferance] = {feature.feature_referance() for feature in metadata.features} + + for target, probabilities in probability_features.items(): + from aligned.schemas.transformation import MapArgMax + + transformation = MapArgMax( + {probs._name: LiteralValue.from_value(probs.of_value) for probs in probabilities} + ) - return ModelSchema( - name=metadata.name, - features=features, - predictions_view=inference_view, - contacts=metadata.contacts, - tags=metadata.tags, - description=metadata.description, - dataset_folder=metadata.dataset_folder, + arg_max_feature = DerivedFeature( + name=target, + dtype=transformation.dtype, + transformation=transformation, + depending_on={ + FeatureReferance(feat, FeatureLocation.model(metadata.name), dtype=FeatureType.float()) + for feat in transformation.column_mappings.keys() + }, + depth=1, ) + inference_view.derived_features.add(arg_max_feature) + + if not probability_features and inference_view.classification_targets: + inference_view.features.update({target.feature for target in inference_view.classification_targets}) + + return ModelSchema( + name=metadata.name, + features=features, + predictions_view=inference_view, + contacts=metadata.contacts, + tags=metadata.tags, + description=metadata.description, + dataset_store=metadata.dataset_store, + ) diff --git a/aligned/compiler/transformation_factory.py b/aligned/compiler/transformation_factory.py index baa62894..ae1cc78f 100644 --- a/aligned/compiler/transformation_factory.py +++ b/aligned/compiler/transformation_factory.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass, field -from datetime import timedelta +from datetime import timedelta # noqa: TC003 from typing import Any, 
Callable import pandas as pd @@ -8,7 +8,7 @@ from aligned import AwsS3Config from aligned.compiler.feature_factory import FeatureFactory, Transformation, TransformationFactory -from aligned.schemas.transformation import LiteralValue, TextVectoriserModel +from aligned.schemas.transformation import FillNaValuesColumns, LiteralValue, TextVectoriserModel logger = logging.getLogger(__name__) @@ -469,20 +469,24 @@ def compile(self) -> Any: class FillMissingFactory(TransformationFactory): feature: FeatureFactory - strategy: FillNaStrategy + value: LiteralValue | FeatureFactory @property def using_features(self) -> list[FeatureFactory]: - return [self.feature] + if isinstance(self.value, LiteralValue): + return [self.feature] + else: + return [self.feature, self.value] def compile(self) -> Transformation: from aligned.schemas.transformation import FillNaValues - fill_value = self.strategy.compile() - - return FillNaValues( - key=self.feature.name, value=LiteralValue.from_value(fill_value), dtype=self.feature.dtype - ) + if isinstance(self.value, LiteralValue): + return FillNaValues(key=self.feature.name, value=self.value, dtype=self.feature.dtype) + else: + return FillNaValuesColumns( + key=self.feature.name, fill_key=self.value.name, dtype=self.feature.dtype + ) @dataclass diff --git a/aligned/data_source/batch_data_source.py b/aligned/data_source/batch_data_source.py index e47fe4b4..28bdb170 100644 --- a/aligned/data_source/batch_data_source.py +++ b/aligned/data_source/batch_data_source.py @@ -9,7 +9,7 @@ from aligned.schemas.codable import Codable from aligned.schemas.derivied_feature import DerivedFeature from aligned.schemas.feature import EventTimestamp, Feature, FeatureLocation -from aligned.request.retrival_request import RetrivalRequest +from aligned.request.retrival_request import RequestResult, RetrivalRequest from aligned.compiler.feature_factory import FeatureFactory if TYPE_CHECKING: @@ -137,6 +137,12 @@ def _deserialize(cls, value: dict) -> BatchDataSource: data_class = BatchDataSourceFactory.shared().supported_data_sources[name_type] return data_class.from_dict(value) + def all(self, result: RequestResult, limit: int | None = None) -> RetrivalJob: + return self.all_data( + result.as_retrival_request('read_all', location=FeatureLocation.feature_view('read_all')), + limit=limit, + ) + def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: if isinstance(self, BatchSourceModification): return self.wrap_job(self.source.all_data(request, limit)) @@ -653,7 +659,7 @@ class ColumnFeatureMappable: def with_renames(self: T, mapping_keys: dict[str, str]) -> T: self.mapping_keys = mapping_keys # type: ignore - return selfFileSource.parquet_at('source_data/transactions.parquet') + return self def columns_for(self, features: list[Feature]) -> list[str]: return [self.mapping_keys.get(feature.name, feature.name) for feature in features] diff --git a/aligned/entity_data_source.py b/aligned/entity_data_source.py deleted file mode 100644 index a34e4f15..00000000 --- a/aligned/entity_data_source.py +++ /dev/null @@ -1,11 +0,0 @@ -from datetime import datetime - -from polars import DataFrame - - -class EntityDataSource: - async def all_in_range(self, start_date: datetime, end_date: datetime) -> DataFrame: - pass - - async def last(self, days: int, hours: int, seconds: int) -> DataFrame: - pass diff --git a/aligned/feature_store.py b/aligned/feature_store.py index d43b1f83..36f6cf0c 100644 --- a/aligned/feature_store.py +++ b/aligned/feature_store.py @@ -9,7 +9,7 @@ from 
prometheus_client import Histogram -from aligned.compiler.model import ModelContract +from aligned.compiler.model import ModelContractWrapper from aligned.data_file import DataFileReference, upsert_on_column from aligned.data_source.batch_data_source import BatchDataSource from aligned.enricher import Enricher @@ -22,7 +22,7 @@ WritableFeatureSource, ) from aligned.feature_view.combined_view import CombinedFeatureView, CompiledCombinedFeatureView -from aligned.feature_view.feature_view import FeatureView +from aligned.feature_view.feature_view import FeatureView, FeatureViewWrapper from aligned.request.retrival_request import FeatureRequest, RetrivalRequest from aligned.retrival_job import ( SelectColumnsJob, @@ -30,6 +30,7 @@ StreamAggregationJob, SupervisedJob, ConvertableToRetrivalJob, + SupervisedTrainJob, ) from aligned.schemas.feature import FeatureLocation, Feature from aligned.schemas.feature_view import CompiledFeatureView @@ -472,8 +473,11 @@ class MyFeatureView: view.materialized_source or view.source ) - def add_feature_view(self, feature_view: FeatureView) -> None: - self.add_compiled_view(feature_view.compile_instance()) + def add_feature_view(self, feature_view: FeatureView | FeatureViewWrapper) -> None: + if isinstance(feature_view, FeatureViewWrapper): + self.add_compiled_view(feature_view.compile()) + else: + self.add_compiled_view(feature_view.compile_instance()) def add_combined_feature_view(self, feature_view: CombinedFeatureView) -> None: compiled_view = type(feature_view).compile() @@ -482,14 +486,14 @@ def add_combined_feature_view(self, feature_view: CombinedFeatureView) -> None: def add_combined_view(self, compiled_view: CompiledCombinedFeatureView) -> None: self.combined_feature_views[compiled_view.name] = compiled_view - def add_model(self, model: ModelContract) -> None: + def add_model(self, model: ModelContractWrapper) -> None: """ Compiles and adds the model to the store Args: model (Model): The model to add """ - compiled_model = type(model).compile() + compiled_model = model.compile() self.models[compiled_model.name] = compiled_model def add_compiled_model(self, model: ModelSchema) -> None: @@ -1049,6 +1053,9 @@ class TaxiEta: """ await self.store.insert_into(FeatureLocation.model(self.model.name), predictions) + async def store_train_test_dataset(self, job: SupervisedTrainJob) -> SupervisedTrainJob: + pass + @dataclass class SupervisedModelFeatureStore: diff --git a/aligned/feature_view/feature_view.py b/aligned/feature_view/feature_view.py index 837aa5aa..6cabffe8 100644 --- a/aligned/feature_view/feature_view.py +++ b/aligned/feature_view/feature_view.py @@ -489,6 +489,16 @@ def compile_with_metadata(feature_view: Any, metadata: FeatureViewMetadata) -> C ] ) + if isinstance(feature, Entity): + view.entities.add(compiled_feature) + + if feature._dtype.transformation: + feature = feature._dtype + feature._name = var_name + feature._location = FeatureLocation.feature_view(metadata.name) + else: + continue + if feature.transformation: # Adding features that is not stored in the view # e.g: @@ -543,8 +553,6 @@ def sort_key(x: tuple[int, FeatureFactory]) -> int: else: view.derived_features.add(feature.compile()) # Should decide on which payload to send - elif isinstance(feature, Entity): - view.entities.add(compiled_feature) elif isinstance(feature, EventTimestamp): if view.event_timestamp is not None: raise Exception( diff --git a/aligned/request/retrival_request.py b/aligned/request/retrival_request.py index 9626654c..35b63cad 100644 --- 
a/aligned/request/retrival_request.py +++ b/aligned/request/retrival_request.py @@ -79,11 +79,12 @@ def filter_features(self, feature_names: set[str]) -> 'RetrivalRequest': def all_returned_columns(self) -> list[str]: result = self.entity_names - if self.event_timestamp: - if all([agg.aggregate_over.window is not None for agg in self.aggregated_features]): - result = result.union({self.event_timestamp.name}) - elif len(self.aggregated_features) == 0: - result = result.union({self.event_timestamp.name}) + + if self.event_timestamp and ( + all(agg.aggregate_over.window is not None for agg in self.aggregated_features) + or len(self.aggregated_features) == 0 + ): + result = result.union({self.event_timestamp.name}) if self.aggregated_features: agg_names = [feat.name for feat in self.aggregated_features] @@ -393,6 +394,21 @@ def from_result_list(requests: list['RequestResult']) -> 'RequestResult': else: return requests[0] + def as_retrival_request(self, name: str, location: FeatureLocation) -> RetrivalRequest: + return RetrivalRequest( + name=name, + location=location, + entities=self.entities, + features=self.features, + derived_features=set(), + aggregated_features=set(), + event_timestamp_request=EventTimestampRequest( + event_timestamp=EventTimestamp(name=self.event_timestamp), entity_column=None + ) + if self.event_timestamp + else None, + ) + @dataclass class FeatureRequest(Codable): diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py index 3abe9fca..9d0f603c 100644 --- a/aligned/retrival_job.py +++ b/aligned/retrival_job.py @@ -8,7 +8,7 @@ from contextlib import suppress from dataclasses import dataclass, field from datetime import datetime -from typing import TYPE_CHECKING, Union, TypeVar +from typing import TYPE_CHECKING, Callable, Union, TypeVar, Coroutine, Any import pandas as pd import polars as pl @@ -30,10 +30,11 @@ if TYPE_CHECKING: from typing import AsyncIterator + from aligned.schemas.folder import DatasetMetadata, DatasetStore from aligned.schemas.derivied_feature import AggregatedFeature, AggregateOver from aligned.schemas.model import EventTrigger - from aligned.sources.local import DataFileReference + from aligned.sources.local import DataFileReference, StorageFileReference from aligned.feature_source import WritableFeatureSource @@ -61,6 +62,23 @@ def split( return index +def subset_polars( + data: pl.DataFrame, start_ratio: float, end_ratio: float, event_timestamp_column: str | None = None +) -> pl.DataFrame: + + if event_timestamp_column: + data = data.sort(event_timestamp_column) + + group_size = data.height + start_index = round(group_size * start_ratio) + end_index = round(group_size * end_ratio) + + if end_index >= group_size: + return data[start_index:] + else: + return data[start_index:end_index] + + def split_polars( data: pl.DataFrame, start_ratio: float, end_ratio: float, event_timestamp_column: str | None = None ) -> pd.Series: @@ -90,6 +108,156 @@ def split_polars( return data[start_index:end_index][row_name].to_pandas() +@dataclass +class TrainTestJob: + + train_job: RetrivalJob + test_job: RetrivalJob + + target_columns: set[str] + + @property + def train(self) -> SupervisedJob: + return SupervisedJob(self.train_job, self.target_columns) + + @property + def test(self) -> SupervisedJob: + return SupervisedJob(self.test_job, self.target_columns) + + def store_dataset( + self, + dataset_store: DatasetStore, + metadata: DatasetMetadata, + train_source: DataFileReference, + test_source: DataFileReference, + train_size: float | None = None, + 
test_size: float | None = None, + ) -> TrainTestJob: + from aligned.schemas.folder import TrainDatasetMetadata + from aligned.data_source.batch_data_source import BatchDataSource + + request_result = self.train_job.request_result + + if isinstance(self.train_job, SubsetJob): + train_size = self.train_job.fraction + + if isinstance(self.test_job, SubsetJob): + test_size = self.test_job.fraction + + if not isinstance(test_source, BatchDataSource): + raise ValueError('test_source should be a BatchDataSource') + + if not isinstance(train_source, BatchDataSource): + raise ValueError('train_source should be a BatchDataSource') + + test_metadata = TrainDatasetMetadata( + id=metadata.id, + name=metadata.name, + request_result=request_result, + description=metadata.description, + train_size_fraction=train_size, + test_size_fraction=test_size, + train_dataset=train_source, + test_dataset=test_source, + target=list(self.target_columns), + ) + + async def update_metadata() -> None: + await dataset_store.store_train_test(test_metadata) + + return TrainTestJob( + train_job=self.train_job.on_load(update_metadata).cached_at(train_source), + test_job=self.test_job.on_load(update_metadata).cached_at(test_source), + target_columns=self.target_columns, + ) + + +@dataclass +class TrainTestValidateJob: + + train_job: RetrivalJob + test_job: RetrivalJob + validate_job: RetrivalJob + + target_columns: set[str] + + @property + def train(self) -> SupervisedJob: + return SupervisedJob(self.train_job, self.target_columns) + + @property + def test(self) -> SupervisedJob: + return SupervisedJob(self.test_job, self.target_columns) + + @property + def validate(self) -> SupervisedJob: + return SupervisedJob(self.validate_job, self.target_columns) + + def store_dataset( + self, + dataset_store: DatasetStore | StorageFileReference, + metadata: DatasetMetadata, + train_source: DataFileReference, + test_source: DataFileReference, + validate_source: DataFileReference, + train_size: float | None = None, + test_size: float | None = None, + validation_size: float | None = None, + ) -> TrainTestValidateJob: + from aligned.schemas.folder import TrainDatasetMetadata, JsonDatasetStore + from aligned.data_source.batch_data_source import BatchDataSource + from aligned.sources.local import StorageFileReference + + if isinstance(dataset_store, StorageFileReference): + data_store = JsonDatasetStore(dataset_store) + else: + data_store = dataset_store + + request_result = self.train_job.request_result + + if isinstance(self.train_job, SubsetJob): + train_size = self.train_job.fraction + + if isinstance(self.test_job, SubsetJob): + test_size = self.test_job.fraction + + if isinstance(self.validate_job, SubsetJob): + validation_size = self.validate_job.fraction + + if not isinstance(test_source, BatchDataSource): + raise ValueError('test_source should be a BatchDataSource') + + if not isinstance(train_source, BatchDataSource): + raise ValueError('train_source should be a BatchDataSource') + + if not isinstance(validate_source, BatchDataSource): + raise ValueError('validation_source should be a BatchDataSource') + + test_metadata = TrainDatasetMetadata( + id=metadata.id, + name=metadata.name, + request_result=request_result, + description=metadata.description, + train_size_fraction=train_size, + test_size_fraction=test_size, + validate_size_fraction=validation_size, + train_dataset=train_source, + test_dataset=test_source, + validation_dataset=validate_source, + target=list(self.target_columns), + ) + + async def update_metadata() -> None: + 
await data_store.store_train_test_validate(test_metadata) + + return TrainTestValidateJob( + train_job=self.train_job.on_load(update_metadata).cached_at(train_source), + test_job=self.test_job.on_load(update_metadata).cached_at(test_source), + validate_job=self.validate_job.on_load(update_metadata).cached_at(validate_source), + target_columns=self.target_columns, + ) + + @dataclass class SupervisedJob: @@ -127,6 +295,34 @@ def request_result(self) -> RequestResult: def train_set(self, train_size: float) -> SupervisedTrainJob: return SupervisedTrainJob(self, train_size) + def train_test(self, train_size: float) -> TrainTestJob: + + cached_job = InMemoryCacheJob(self.job) + + event_timestamp = self.job.request_result.event_timestamp + + return TrainTestJob( + train_job=SubsetJob(cached_job, 0, train_size, event_timestamp), + test_job=SubsetJob(cached_job, train_size, 1, event_timestamp), + target_columns=self.target_columns, + ) + + def train_test_validate(self, train_size: float, validate_size: float) -> TrainTestValidateJob: + + cached_job = InMemoryCacheJob(self.job) + + event_timestamp = self.job.request_result.event_timestamp + + test_ratio_start = train_size + validate_ratio_start = test_ratio_start + validate_size + + return TrainTestValidateJob( + train_job=SubsetJob(cached_job, 0, train_size, event_timestamp), + test_job=SubsetJob(cached_job, train_size, validate_ratio_start, event_timestamp), + validate_job=SubsetJob(cached_job, validate_ratio_start, 1, event_timestamp), + target_columns=self.target_columns, + ) + def with_subfeatures(self) -> SupervisedJob: return SupervisedJob(self.job.with_subfeatures(), self.target_columns) @@ -324,6 +520,9 @@ def join( return JoinJobs(method=method, left_job=self, right_job=job, left_on=left_on, right_on=right_on) + def on_load(self, on_load: Callable[[], Coroutine[Any, Any, None]]) -> RetrivalJob: + return OnLoadJob(self, on_load) + def filter(self, condition: str | Feature | DerivedFeature) -> RetrivalJob: return FilteredJob(self, condition) @@ -354,6 +553,34 @@ def test_size(self, test_size: float, target_column: str) -> SupervisedTrainJob: def train_set(self, train_size: float, target_column: str) -> SupervisedTrainJob: return SupervisedJob(self, {target_column}).train_set(train_size=train_size) + def train_test(self, train_size: float, target_column: str) -> TrainTestJob: + cached = InMemoryCacheJob(self) + + event_timestamp = self.request_result.event_timestamp + + return TrainTestJob( + train_job=SubsetJob(cached, 0, train_size, event_timestamp), + test_job=SubsetJob(cached, train_size, 1, event_timestamp), + target_columns={target_column}, + ) + + def train_test_validate( + self, train_size: float, validate_size: float, target_column: str + ) -> TrainTestValidateJob: + + cached = InMemoryCacheJob(self) + + event_timestamp = self.request_result.event_timestamp + + validate_ratio_start = train_size + validate_size + + return TrainTestValidateJob( + train_job=SubsetJob(cached, 0, train_size, event_timestamp), + test_job=SubsetJob(cached, train_size, validate_ratio_start, event_timestamp), + validate_job=SubsetJob(cached, validate_ratio_start, 1, event_timestamp), + target_columns={target_column}, + ) + def drop_invalid(self, validator: Validator) -> RetrivalJob: return DropInvalidJob(self, validator) @@ -497,6 +724,71 @@ def copy_with(self: JobType, job: RetrivalJob) -> JobType: return self +@dataclass +class SubsetJob(RetrivalJob, ModificationJob): + + job: RetrivalJob + start_ratio: float + end_ratio: float + sort_column: str | None = 
None
+
+    @property
+    def fraction(self) -> float:
+        return self.end_ratio - self.start_ratio
+
+    async def to_polars(self) -> pl.LazyFrame:
+        data = (await self.job.to_polars()).collect()
+        return subset_polars(data, self.start_ratio, self.end_ratio, self.sort_column).lazy()
+
+    async def to_pandas(self) -> pd.DataFrame:
+        data = await self.job.to_pandas()
+        selection = split(data, self.start_ratio, self.end_ratio, self.sort_column)
+        return data.iloc[selection]
+
+
+@dataclass
+class OnLoadJob(RetrivalJob, ModificationJob):  # type: ignore
+
+    job: RetrivalJob  # type: ignore
+    on_load: Callable[[], Coroutine[Any, Any, None]]  # type: ignore
+
+    async def to_pandas(self) -> pd.DataFrame:
+        data = await self.job.to_pandas()
+        await self.on_load()
+        return data
+
+    async def to_polars(self) -> pl.LazyFrame:
+        data = (await self.job.to_polars()).collect()
+        await self.on_load()
+        return data.lazy()
+
+    def describe(self) -> str:
+        return f'OnLoadJob {self.on_load} -> {self.job.describe()}'
+
+
+@dataclass
+class InMemoryCacheJob(RetrivalJob, ModificationJob):
+
+    job: RetrivalJob
+    cached_data: pl.DataFrame | None = None
+
+    async def to_polars(self) -> pl.LazyFrame:
+        if self.cached_data is not None:
+            return self.cached_data.lazy()
+
+        data = (await self.job.to_polars()).collect()
+        self.cached_data = data
+        return data.lazy()
+
+    async def to_pandas(self) -> pd.DataFrame:
+        if self.cached_data is not None:
+            return self.cached_data.to_pandas()
+
+        data = await self.job.to_pandas()
+        self.cached_data = pl.from_pandas(data)
+        return data
+
+
 @dataclass
 class AggregateJob(RetrivalJob, ModificationJob):

@@ -979,7 +1271,7 @@ def remove_derived_features(self) -> RetrivalJob:
 class UniqueRowsJob(RetrivalJob, ModificationJob):

     job: RetrivalJob
-    unique_on: list[str]
+    unique_on: list[str]  # type: ignore
     sort_key: str | None = field(default=None)

     async def to_pandas(self) -> pd.DataFrame:
diff --git a/aligned/schemas/folder.py b/aligned/schemas/folder.py
index b5d696c6..c64240bf 100644
--- a/aligned/schemas/folder.py
+++ b/aligned/schemas/folder.py
@@ -1,9 +1,12 @@
 from __future__ import annotations

-from pathlib import Path
 from typing import TYPE_CHECKING

+from dataclasses import dataclass, field
+
 from mashumaro.types import SerializableType

+from aligned.data_source.batch_data_source import BatchDataSource
+from aligned.request.retrival_request import RequestResult
 from aligned.schemas.codable import Codable


@@ -11,44 +14,222 @@
     from aligned.sources.local import StorageFileReference


-class FolderFactory:
+class DatasetStorageFactory:

-    supported_folders: dict[str, type[Folder]]
+    supported_stores: dict[str, type[DatasetStore]]

-    _shared: FolderFactory | None = None
+    _shared: DatasetStorageFactory | None = None

     def __init__(self) -> None:
-        self.supported_folders = {folder_type.name: folder_type for folder_type in Folder.__subclasses__()}
+        self.supported_stores = {}
+        dataset_types = [
+            JsonDatasetStore,
+        ]
+        for dataset_type in dataset_types:
+            self.supported_stores[dataset_type.name] = dataset_type

     @classmethod
-    def shared(cls) -> FolderFactory:
+    def shared(cls) -> DatasetStorageFactory:
         if cls._shared:
             return cls._shared
-        cls._shared = FolderFactory()
+        cls._shared = DatasetStorageFactory()
         return cls._shared


-class Folder(Codable, SerializableType):
+@dataclass
+class DatasetMetadata(Codable):

-    name: str
+    id: str
+    name: str | None = field(default=None)
+    description: str | None = field(default=None)
+    tags: list[str] | None = field(default=None)
+
+
+@dataclass
+class TrainDatasetMetadata(Codable):
+
+    id: str
+    name: 
str | None + + request_result: RequestResult + + train_dataset: BatchDataSource + test_dataset: BatchDataSource + + validation_dataset: BatchDataSource | None = field(default=None) + + train_size_fraction: float | None = field(default=None) + test_size_fraction: float | None = field(default=None) + validate_size_fraction: float | None = field(default=None) + + target: list[str] | None = field(default=None) + + description: str | None = field(default=None) + tags: list[str] | None = field(default=None) + + +@dataclass +class GroupedDatasetList(Codable): + + raw_data: list[DatasetMetadata] + + train_test: list[TrainDatasetMetadata] + train_test_validation: list[TrainDatasetMetadata] + + active_learning: list[DatasetMetadata] + + @property + def all(self) -> list[DatasetMetadata]: + return self.raw_data + self.train_test + self.train_test_validation + self.active_learning - def file_at(self, path: Path) -> StorageFileReference: - raise NotImplementedError() + +class DatasetStore(Codable, SerializableType): + + name: str def _serialize(self) -> dict: - assert self.name in FolderFactory.shared().supported_folders, f'Unknown type_name: {self.name}' + assert self.name in DatasetStorageFactory.shared().supported_stores, f'Unknown type_name: {self.name}' return self.to_dict() @classmethod - def _deserialize(cls, value: dict) -> Folder: + def _deserialize(cls, value: dict) -> DatasetStore: name_type = value['name'] - if name_type not in FolderFactory.shared().supported_folders: + if name_type not in DatasetStorageFactory.shared().supported_stores: + supported = DatasetStorageFactory.shared().supported_stores.keys() raise ValueError( f"Unknown batch data source id: '{name_type}'.\nRemember to add the" ' data source to the FolderFactory.supported_folders if' ' it is a custom type.' 
- f' Have access to the following types: {FolderFactory.shared().supported_folders.keys()}' + f' Have access to the following types: {supported}' ) del value['name'] - data_class = FolderFactory.shared().supported_folders[name_type] + data_class = DatasetStorageFactory.shared().supported_stores[name_type] return data_class.from_dict(value) + + async def list_datasets(self) -> GroupedDatasetList: + raise NotImplementedError(type(self)) + + async def store_raw_data(self, metadata: DatasetMetadata) -> None: + raise NotImplementedError(type(self)) + + async def store_train_test(self, metadata: TrainDatasetMetadata) -> None: + raise NotImplementedError(type(self)) + + async def store_train_test_validate(self, metadata: TrainDatasetMetadata) -> None: + raise NotImplementedError(type(self)) + + async def store_active_learning(self, metadata: DatasetMetadata) -> None: + raise NotImplementedError(type(self)) + + async def metadata_for(self, dataset_id: str) -> DatasetMetadata | None: + raise NotImplementedError(type(self)) + + async def delete_metadata_for(self, dataset_id: str) -> DatasetMetadata | None: + raise NotImplementedError(type(self)) + + +@dataclass +class JsonDatasetStore(DatasetStore): + + source: StorageFileReference + name = 'json' + + async def list_datasets(self) -> GroupedDatasetList: + try: + data = await self.source.read() + return GroupedDatasetList.from_json(data) + except FileNotFoundError: + return GroupedDatasetList( + raw_data=[], + train_test=[], + train_test_validation=[], + active_learning=[], + ) + + def index_of( + self, metadata_id: str, array: list[DatasetMetadata] | list[TrainDatasetMetadata] + ) -> int | None: + + for i, dataset in enumerate(array): + if dataset.id == metadata_id: + return i + return None + + async def store_train_test(self, metadata: TrainDatasetMetadata) -> None: + datasets = await self.list_datasets() + + index = self.index_of(metadata.id, datasets.train_test) + if index is None: + datasets.train_test.append(metadata) + else: + datasets.train_test[index] = metadata + + data = datasets.to_json() + if isinstance(data, str): + data = data.encode('utf-8') + await self.source.write(data) + + async def store_train_test_validate(self, metadata: TrainDatasetMetadata) -> None: + datasets = await self.list_datasets() + + index = self.index_of(metadata.id, datasets.train_test_validation) + if index is None: + datasets.train_test_validation.append(metadata) + else: + datasets.train_test_validation[index] = metadata + + data = datasets.to_json() + if isinstance(data, str): + data = data.encode('utf-8') + + await self.source.write(data) + + async def store_raw_data(self, metadata: DatasetMetadata) -> None: + datasets = await self.list_datasets() + + index = self.index_of(metadata.id, datasets.raw_data) + if index is not None: + datasets.raw_data[index] = metadata + else: + datasets.raw_data.append(metadata) + + data = datasets.to_json() + if isinstance(data, str): + data = data.encode('utf-8') + await self.source.write(data) + + async def store_active_learning(self, metadata: DatasetMetadata) -> None: + datasets = await self.list_datasets() + + index = self.index_of(metadata.id, datasets.active_learning) + if index is None: + datasets.active_learning.append(metadata) + else: + datasets.active_learning[index] = metadata + + data = datasets.to_json() + if isinstance(data, str): + data = data.encode('utf-8') + await self.source.write(data) + + async def metadata_for(self, dataset_id: str) -> DatasetMetadata | None: + datasets = await self.list_datasets() + for 
dataset in datasets.all:
+            if dataset.id == dataset_id:
+                return dataset
+        return None
+
+    async def delete_metadata_for(self, dataset_id: str) -> DatasetMetadata | None:
+        datasets = await self.list_datasets()
+        groups = [datasets.raw_data, datasets.train_test]
+        groups += [datasets.train_test_validation, datasets.active_learning]
+        for group in groups:
+            index = self.index_of(dataset_id, group)
+            if index is not None:
+                metadata = group.pop(index)
+                data = datasets.to_json()
+                if isinstance(data, str):
+                    data = data.encode('utf-8')
+                await self.source.write(data)
+                return metadata
+        return None
diff --git a/aligned/schemas/model.py b/aligned/schemas/model.py
index ac10ca7a..6d7f63c4 100644
--- a/aligned/schemas/model.py
+++ b/aligned/schemas/model.py
@@ -10,7 +10,7 @@
 from aligned.schemas.target import ClassificationTarget, RegressionTarget
 from aligned.schemas.derivied_feature import DerivedFeature
 from aligned.data_source.batch_data_source import BatchDataSource
-from aligned.schemas.folder import Folder
+from aligned.schemas.folder import DatasetStore

 logger = logging.getLogger(__name__)

@@ -116,7 +116,7 @@ class Model(Codable):
     description: str | None = field(default=None)
     contacts: list[str] | None = field(default=None)
     tags: dict[str, str] | None = field(default=None)
-    dataset_folder: Folder | None = field(default=None)
+    dataset_store: DatasetStore | None = field(default=None)

     def __hash__(self) -> int:
         return self.name.__hash__()
diff --git a/aligned/schemas/transformation.py b/aligned/schemas/transformation.py
index b7a99d51..44ed33fb 100644
--- a/aligned/schemas/transformation.py
+++ b/aligned/schemas/transformation.py
@@ -202,6 +202,7 @@ def __init__(self) -> None:
             Inverse,
             Ordinal,
             FillNaValues,
+            FillNaValuesColumns,
             Absolute,
             Round,
             Ceil,
@@ -1452,6 +1453,37 @@ def test_definition() -> TransformationTestDefinition:
         )


+@dataclass
+class FillNaValuesColumns(Transformation):
+
+    key: str
+    fill_key: str
+    dtype: FeatureType
+
+    name: str = 'fill_missing_key'
+
+    async def transform_pandas(self, df: pd.DataFrame) -> pd.Series:
+        return df[self.key].fillna(df[self.fill_key])
+
+    async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr:
+        if self.dtype == FeatureType.float():
+            return pl.col(self.key).fill_nan(pl.col(self.fill_key)).fill_null(pl.col(self.fill_key))
+
+        else:
+            return pl.col(self.key).fill_null(pl.col(self.fill_key))
+
+    def should_skip(self, output_column: str, columns: list[str]) -> bool:
+        return False
+
+    @staticmethod
+    def test_definition() -> TransformationTestDefinition:
+        return TransformationTestDefinition(
+            FillNaValuesColumns('x', 'y', dtype=FeatureType.int32()),
+            input={'x': [1, 1, None, None, 3, 3, None, 4, 5, None], 'y': [1, 2, 1, 2, 7, 2, 4, 1, 1, 9]},
+            output=[1, 1, 1, 2, 3, 3, 4, 4, 5, 9],
+        )
+
+
 @dataclass
 class FillNaValues(Transformation):
diff --git a/aligned/sources/local.py b/aligned/sources/local.py
index 3c600447..43996ad0 100644
--- a/aligned/sources/local.py
+++ b/aligned/sources/local.py
@@ -439,7 +439,7 @@ async def as_repo_definition(self) -> RepoDefinition:


 @dataclass
-class FileDirectory:
+class FileDirectory(Codable):

     dir_path: Path
diff --git a/aligned/sources/redshift.py b/aligned/sources/redshift.py
index 3894c4a5..e1f06e1b 100644
--- a/aligned/sources/redshift.py
+++ b/aligned/sources/redshift.py
@@ -2,10 +2,9 @@

 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
-from typing import Callable, Any, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING

 from aligned import RedisConfig
-from aligned.compiler.model import EntityDataSource, SqlEntityDataSource
 from aligned.data_source.batch_data_source import BatchDataSource, ColumnFeatureMappable
 from aligned.enricher import Enricher
 from aligned.request.retrival_request import 
RetrivalRequest @@ -81,9 +80,6 @@ def data_enricher( def with_schema(self, name: str) -> RedshiftSQLConfig: return RedshiftSQLConfig(env_var=self.env_var, schema=name) - def entity_source(self, timestamp_column: str, sql: Callable[[str], str]) -> EntityDataSource: - return SqlEntityDataSource(sql, self.url, timestamp_column) - def fetch(self, query: str) -> RetrivalJob: from aligned.redshift.jobs import PostgreSqlJob diff --git a/aligned/tests/test_models_as_feature.py b/aligned/tests/test_models_as_feature.py index 1c5e83de..f6763a54 100644 --- a/aligned/tests/test_models_as_feature.py +++ b/aligned/tests/test_models_as_feature.py @@ -1,21 +1,19 @@ from aligned import Bool, FeatureStore, FileSource, Int32, String -from aligned.feature_view.feature_view import FeatureView -from aligned.compiler.model import ModelContract +from aligned.feature_view.feature_view import feature_view +from aligned.compiler.model import model_contract from aligned.schemas.feature import FeatureLocation -class View(FeatureView): - - metadata = FeatureView.metadata_with('view', 'test', FileSource.csv_at('')) +@feature_view('view', FileSource.csv_at(''), 'test') +class View: view_id = Int32().as_entity() feature_a = String() -class OtherView(FeatureView): - - metadata = FeatureView.metadata_with('other', 'test', FileSource.csv_at('')) +@feature_view('other', FileSource.csv_at(''), 'test') +class OtherView: other_id = Int32().as_entity() @@ -23,25 +21,27 @@ class OtherView(FeatureView): is_true = Bool() -class First(ModelContract): +view = View() +other = OtherView() - view = View() - other = OtherView() - metadata = ModelContract.metadata_with('test_model', features=[view.feature_a, other.feature_b]) +@model_contract('test_model', features=[view.feature_a, other.feature_b]) +class First: target = other.is_true.as_classification_label() -class Second(ModelContract): +first = First() - first = First() - metadata = ModelContract.metadata_with('second_model', features=[first.target]) +@model_contract('second_model', features=[first.target]) +class Second: + other_id = Int32().as_entity() + view_id = Int32().as_entity() def test_model_referenced_as_feature() -> None: - model = Second.compile() + model = Second.compile() # type: ignore feature = list(model.features)[0] @@ -52,9 +52,9 @@ def test_model_referenced_as_feature() -> None: def test_model_request() -> None: store = FeatureStore.experimental() - store.add_feature_view(View()) - store.add_feature_view(OtherView()) - store.add_model(First()) + store.add_feature_view(View) # type: ignore + store.add_feature_view(OtherView) # type: ignore + store.add_model(First) assert len(store.feature_views) == 2 diff --git a/aligned/tests/test_train_test_validate_set.py b/aligned/tests/test_train_test_validate_set.py index c938ac48..02113a0b 100644 --- a/aligned/tests/test_train_test_validate_set.py +++ b/aligned/tests/test_train_test_validate_set.py @@ -1,8 +1,10 @@ import pytest +from pathlib import Path from aligned.feature_store import FeatureStore from aligned.retrival_job import split -from aligned.sources.local import CsvFileSource +from aligned.schemas.folder import DatasetMetadata +from aligned.sources.local import CsvFileSource, FileSource @pytest.mark.asyncio @@ -50,3 +52,65 @@ async def test_train_test_validate_set(titanic_feature_store: FeatureStore) -> N assert 'passenger_id' not in dataset.train_input.columns assert 'survived' not in dataset.train_input.columns + + +@pytest.mark.asyncio +async def test_train_test_validate_set_new(titanic_feature_store: 
FeatureStore) -> None: + from aligned.schemas.folder import JsonDatasetStore + + unlink_paths = [ + 'test_data/titanic-sets.json', + 'test_data/titanic-train.csv', + 'test_data/titanic-test.csv', + 'test_data/titanic-validate.csv', + ] + + for path_str in unlink_paths: + path = Path(path_str) + if path.exists(): + path.unlink() + + dataset_size = 100 + train_fraction = 0.6 + validation_fraction = 0.2 + + train_size = int(round(dataset_size * train_fraction)) + test_size = int(round(dataset_size * (1 - train_fraction - validation_fraction))) + validate_size = int(round(dataset_size * validation_fraction)) + + dataset_store = FileSource.json_at('test_data/titanic-sets.json') + dataset = ( + titanic_feature_store.feature_view('titanic') + .all(limit=dataset_size) + .train_test_validate(train_fraction, validation_fraction, target_column='survived') + .store_dataset( + dataset_store, + metadata=DatasetMetadata( + id='titanic_test', + ), + train_source=FileSource.csv_at('test_data/titanic-train.csv'), + test_source=FileSource.csv_at('test_data/titanic-test.csv'), + validate_source=FileSource.csv_at('test_data/titanic-validate.csv'), + ) + ) + + train = await dataset.train.to_pandas() + test = await dataset.test.to_pandas() + validate = await dataset.validate.to_pandas() + + datasets = await JsonDatasetStore(dataset_store).list_datasets() + + assert len(datasets.train_test_validation) == 1 + train_dataset = datasets.train_test_validation[0] + + assert train.data.shape[0] == train_size + assert test.data.shape[0] == test_size + assert validate.data.shape[0] == validate_size + + assert train_dataset.train_size_fraction == train_fraction + + assert 'passenger_id' in train.data.columns + assert 'survived' in train.data.columns + + assert 'passenger_id' not in train.input.columns + assert 'survived' not in train.input.columns diff --git a/aligned/tests/test_transformations.py b/aligned/tests/test_transformations.py index cfc82b65..ea009bab 100644 --- a/aligned/tests/test_transformations.py +++ b/aligned/tests/test_transformations.py @@ -4,7 +4,7 @@ from aligned.feature_store import FeatureStore from aligned.feature_view.feature_view import feature_view from aligned.schemas.transformation import SupportedTransformations -from aligned.sources.local import FileSource +from aligned.sources.local import FileSource, CsvFileSource @pytest.mark.asyncio @@ -48,7 +48,7 @@ class TestAgg: credit_card_due_sum = credit_card_due.aggregate().over(days=1).sum() student_loan_due_mean = student_loan_due.aggregate().over(days=1).mean() - df = await TestAgg.query().all().to_pandas() + df = await TestAgg.query().all().to_pandas() # type: ignore assert df.shape[0] == 6 @@ -66,7 +66,7 @@ class TestAgg: credit_card_due_sum = credit_card_due.aggregate().sum() student_loan_due_mean = student_loan_due.aggregate().mean() - df = await TestAgg.query().all().to_pandas() + df = await TestAgg.query().all().to_pandas() # type: ignore assert df.shape[0] == 3 @@ -90,10 +90,25 @@ class TestAgg: credit_card_due_sum = credit_card_due.aggregate().sum() student_loan_due_mean = student_loan_due.aggregate().mean() - org_values_job = TestAgg.query().using_source(TestAgg.metadata.source).all() + org_values_job = TestAgg.query().using_source(TestAgg.metadata.source).all() # type: ignore await org_values_job.write_to_source(materialized_source) values = await org_values_job.to_polars() - df = await TestAgg.query().all().to_polars() + df = await TestAgg.query().all().to_polars() # type: ignore assert 
df.sort('dob_ssn').collect().frame_equal(values.sort('dob_ssn').select(df.columns).collect()) + + +@pytest.mark.asyncio +async def test_transform_entity(titanic_source: CsvFileSource) -> None: + @feature_view(name='titanic', source=titanic_source) + class Titanic: + + passenger_id = Int32() + cabin = String().fill_na(passenger_id).as_entity() + + sex = String() + + data = await Titanic.query().all().to_pandas() # type: ignore + + assert data['cabin'].isnull().sum() == 0 diff --git a/conftest.py b/conftest.py index b7592176..f4b20802 100644 --- a/conftest.py +++ b/conftest.py @@ -18,7 +18,7 @@ TextVectoriserModel, ) from aligned.feature_view.feature_view import FeatureView, FeatureViewMetadata -from aligned.compiler.model import ModelContract +from aligned.compiler.model import model_contract, ModelContractWrapper from aligned.feature_store import FeatureStore from aligned.feature_view.combined_view import CombinedFeatureView, CombinedFeatureViewMetadata from aligned.retrival_job import DerivedFeatureJob, RetrivalJob, RetrivalRequest @@ -431,26 +431,26 @@ class TitanicPassenger(FeatureView): @pytest.fixture -def titanic_model(titanic_feature_view: FeatureView) -> ModelContract: - class Titanic(ModelContract): - - features = titanic_feature_view - - metadata = ModelContract.metadata_with( - 'titanic', - description='A model predicting if a passenger will survive', - features=[ - features.age, # type: ignore - features.sibsp, # type: ignore - features.has_siblings, # type: ignore - features.is_male, # type: ignore - features.is_mr, # type: ignore - ], - ) +def titanic_model(titanic_feature_view: FeatureView) -> ModelContractWrapper: + + features = titanic_feature_view + + @model_contract( + name='titanic', + description='A model predicting if a passenger will survive', + features=[ + features.age, # type: ignore + features.sibsp, # type: ignore + features.has_siblings, # type: ignore + features.is_male, # type: ignore + features.is_mr, # type: ignore + ], + ) + class Titanic: will_survive = features.survived.as_classification_label() # type: ignore - return Titanic() + return Titanic @pytest.fixture @@ -488,7 +488,9 @@ class TitanicPassenger(FeatureView): @pytest_asyncio.fixture async def titanic_feature_store( - titanic_feature_view: FeatureView, titanic_feature_view_parquet: FeatureView, titanic_model: ModelContract + titanic_feature_view: FeatureView, + titanic_feature_view_parquet: FeatureView, + titanic_model: ModelContractWrapper, ) -> FeatureStore: feature_store = FeatureStore.experimental() feature_store.add_feature_view(titanic_feature_view) @@ -621,28 +623,28 @@ class TitanicPassenger(FeatureView): @pytest.fixture -def titanic_model_scd(titanic_feature_view_scd: FeatureView) -> ModelContract: - class Titanic(ModelContract): +def titanic_model_scd(titanic_feature_view_scd: FeatureView) -> ModelContractWrapper: - features = titanic_feature_view_scd + features = titanic_feature_view_scd - metadata = ModelContract.metadata_with( - 'titanic', - description='A model predicting if a passenger will survive', - features=[features.age, features.sibsp, features.has_siblings, features.is_male], # type: ignore - ) + @model_contract( + 'titanic', + description='A model predicting if a passenger will survive', + features=[features.age, features.sibsp, features.has_siblings, features.is_male], # type: ignore + ) + class Titanic: will_survive = features.survived.as_classification_label() # type: ignore probability = will_survive.probability_of(True) - return Titanic() + return Titanic 
@pytest_asyncio.fixture async def titanic_feature_store_scd( titanic_feature_view_scd: FeatureView, titanic_feature_view_parquet: FeatureView, - titanic_model_scd: ModelContract, + titanic_model_scd: ModelContractWrapper, ) -> FeatureStore: feature_store = FeatureStore.experimental() feature_store.add_feature_view(titanic_feature_view_scd) diff --git a/pyproject.toml b/pyproject.toml index b2392b00..da1b7290 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aligned" -version = "0.0.54" +version = "0.0.55" description = "A data managment and lineage tool for ML applications." authors = ["Mats E. Mollestad "] license = "Apache-2.0" diff --git a/test_data/credit_history_mater.parquet b/test_data/credit_history_mater.parquet index 75dc1727c43e7ac14a5adb769153d80c4777f74c..8604240f02f4657214eaaf8a42092361f0bbd16f 100644 GIT binary patch delta 242 zcmeyx{)1gRz%j^Blugt`)FcK-F@Q0PD1#sa12ek>0|O%?$Bx|&jO;oMoM$HL=r|;( zae&lhi6&qwH+JAKPCoA88Hsg1Tml?K-M-!(H=%9`ybO}bH)_LjLCBt zRd_&75Zl5cD#D<4X!03GDL#;Sj0|EYSVScl)E-R!!l)?(G)(LPi>Mfb*fl0GHZ=~h LbDQ;0Aa;gD mRDwb63A4<^59&fd9_YU1JhsQ{xc3v{{$wG$R1NDKRSm diff --git a/test_data/feature-store.json b/test_data/feature-store.json index 987c3bbc..19806304 100644 --- a/test_data/feature-store.json +++ b/test_data/feature-store.json @@ -1 +1 @@ -{"metadata": {"created_at": "2023-12-16T11:34:22.035088", "name": "feature_store_location.py", "github_url": null}, "feature_views": [{"name": "titanic_parquet", "tags": {}, "source": {"mapping_keys": {}, "type_name": "parquet", "path": "test_data/titanic.parquet", "config": {"engine": "auto", "compression": "snappy", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 100.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}], "derived_features": [{"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, 
"constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": null, "stream_data_source": null, "application_source": null, "materialized_source": null, "event_triggers": null, "contacts": null, "indexes": []}, {"name": "titanic", "tags": {}, "source": {"mapping_keys": {"PassengerId": "passenger_id", "Age": "age", "Sex": "sex", "Survived": "survived", "SibSp": "sibsp", "UpdatedAt": "updated_at"}, "type_name": "csv", "path": "test_data/titanic_scd_data.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "updated_at", "dtype": {"name": "datetime"}, "description": null, "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 20.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 100.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}], "derived_features": [{"name": "square_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul", "dtype": {"name": "float"}, "front": "sibsp", "behind": "sibsp"}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "is_mr", 
"dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "double_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul_val", "dtype": {"name": "float"}, "key": "sibsp", "value": {"name": "int", "value": 2}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "word_vectoriser", "dtype": {"name": "embedding"}, "key": "name", "model": {"name": "gensim", "model_name": "glove-wiki-gigaword-50", "config": {"to_lowercase": false, "deaccent": false, "encoding": "utf8", "errors": "strict"}, "loaded_model": null}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": {"name": "updated_at", "ttl": null, "description": null, "tags": null, "dtype": {"name": "datetime"}}, "stream_data_source": {"mapping_keys": {}, "name": "redis", "topic_name": "titanic_stream", "config": {"env_var": "REDIS_URL"}, "record_coder": {"coder_type": "json", "key": "json"}}, "application_source": null, "materialized_source": null, "event_triggers": null, "contacts": null, "indexes": [{"location": {"name": "titanic", "location": "feature_view"}, "vector": {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null}, "vector_dim": 50, "metadata": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "upper_bound_inc", "value": 100.0}, {"name": "lower_bound_inc", "value": 0.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}], "storage": {"type_name": "redis", "config": {"env_var": "REDIS_URL"}, "name": "name_embedding_index", "initial_cap": 10000, "distance_metric": "COSINE", "index_alogrithm": "FLAT", "embedding_type": "FLOAT32"}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}]}]}], "combined_feature_views": [], "models": [{"name": "titanic", "features": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}, {"name": "is_male", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, {"name": "age", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "float"}}, {"name": "has_siblings", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}], "predictions_view": 
{"entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "probability", "dtype": {"name": "float"}, "description": "The probability of target named will_survive being 'True'.", "tags": null, "constraints": null}], "derived_features": [{"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "probability", "location": {"name": "titanic", "location": "model"}, "dtype": {"name": "float"}}], "transformation": {"name": "map_arg_max", "dtype": {"name": "bool"}, "column_mappings": {"probability": {"name": "bool", "value": true}}}, "depth": 1}], "model_version_column": null, "event_timestamp": null, "source": null, "application_source": null, "stream_source": null, "regression_targets": [], "classification_targets": [{"estimating": {"name": "survived", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, "feature": {"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null}, "on_ground_truth_event": null, "event_trigger": null, "class_probabilities": [{"outcome": {"name": "bool", "value": true}, "feature": {"name": "probability", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null}}], "confidence": null}]}, "description": "A model predicting if a passenger will survive", "contacts": null, "tags": null, "dataset_folder": null}], "enrichers": []} +{"metadata": {"created_at": "2023-12-25T10:06:34.473870", "name": "feature_store_location.py", "github_url": null}, "feature_views": [{"name": "titanic_parquet", "tags": {}, "source": {"mapping_keys": {}, "type_name": "parquet", "path": "test_data/titanic.parquet", "config": {"engine": "auto", "compression": "snappy", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 20.0}]}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}], "derived_features": [{"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": 
{"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic_parquet", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": null, "stream_data_source": null, "application_source": null, "materialized_source": null, "event_triggers": null, "contacts": null, "indexes": []}, {"name": "titanic", "tags": {}, "source": {"mapping_keys": {"PassengerId": "passenger_id", "Age": "age", "Sex": "sex", "Survived": "survived", "SibSp": "sibsp", "UpdatedAt": "updated_at"}, "type_name": "csv", "path": "test_data/titanic_scd_data.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 20.0}]}, {"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}, {"name": "updated_at", "dtype": {"name": "datetime"}, "description": null, "tags": null, "constraints": null}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": null}], "derived_features": [{"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "word_vectoriser", "dtype": {"name": "embedding"}, "key": "name", "model": {"name": "gensim", "model_name": "glove-wiki-gigaword-50", "config": {"to_lowercase": false, "deaccent": false, "encoding": "utf8", "errors": "strict"}, "loaded_model": null}}, "depth": 1}, {"name": "double_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul_val", "dtype": {"name": "float"}, "key": "sibsp", "value": {"name": "int", "value": 2}}, 
"depth": 1}, {"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "square_sibsp", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "mul", "dtype": {"name": "float"}, "front": "sibsp", "behind": "sibsp"}, "depth": 1}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}], "description": "Some features from the titanic dataset", "aggregated_features": [], "event_timestamp": {"name": "updated_at", "ttl": null, "description": null, "tags": null, "dtype": {"name": "datetime"}}, "stream_data_source": {"mapping_keys": {}, "name": "redis", "topic_name": "titanic_stream", "config": {"env_var": "REDIS_URL"}, "record_coder": {"coder_type": "json", "key": "json"}}, "application_source": null, "materialized_source": null, "event_triggers": null, "contacts": null, "indexes": [{"location": {"name": "titanic", "location": "feature_view"}, "vector": {"name": "name_embedding", "dtype": {"name": "embedding"}, "description": null, "tags": null, "constraints": null}, "vector_dim": 50, "metadata": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "in_domain", "values": ["male", "female"]}]}], "storage": {"type_name": "redis", "config": {"env_var": "REDIS_URL"}, "name": "name_embedding_index", "initial_cap": 10000, "distance_metric": "COSINE", "index_alogrithm": "FLAT", "embedding_type": "FLOAT32"}, "entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}]}]}], "combined_feature_views": [], "models": [{"name": "titanic", "features": [{"name": "is_male", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, {"name": "age", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "float"}}, {"name": "has_siblings", "location": 
{"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, {"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "predictions_view": {"entities": [], "features": [{"name": "probability", "dtype": {"name": "float"}, "description": "The probability of target named will_survive being 'True'.", "tags": null, "constraints": null}], "derived_features": [{"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "probability", "location": {"name": "titanic", "location": "model"}, "dtype": {"name": "float"}}], "transformation": {"name": "map_arg_max", "dtype": {"name": "bool"}, "column_mappings": {"probability": {"name": "bool", "value": true}}}, "depth": 1}], "model_version_column": null, "event_timestamp": null, "source": null, "application_source": null, "stream_source": null, "regression_targets": [], "classification_targets": [{"estimating": {"name": "survived", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "bool"}}, "feature": {"name": "will_survive", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null}, "on_ground_truth_event": null, "event_trigger": null, "class_probabilities": [{"outcome": {"name": "bool", "value": true}, "feature": {"name": "probability", "dtype": {"name": "float"}, "description": null, "tags": null, "constraints": null}}], "confidence": null}]}, "description": "A model predicting if a passenger will survive", "contacts": null, "tags": null, "dataset_store": null}], "enrichers": []} diff --git a/test_data/test_model.parquet b/test_data/test_model.parquet index 42d1080d54192053a781346d175c656f69f42add..bf0079638d7a39c629ece46707a19e7d35afce00 100644 GIT binary patch delta 53 scmeys@_}W7oUjN3CnEz3C%Xd!9|Ht%O*AwUW`+whL4_GNR()dx0K~2a#Q*>R delta 53 tcmeys@_}W7oG=RmCnEzFC%XfK2m=K0O*Axv2s46&nW4f=8>_xC0szDR2E_mX diff --git a/test_data/titanic-sets.json b/test_data/titanic-sets.json new file mode 100644 index 00000000..6bc36d9a --- /dev/null +++ b/test_data/titanic-sets.json @@ -0,0 +1 @@ +{"raw_data": [], "train_test": [], "train_test_validation": [{"id": "titanic_test", "name": null, "request_result": {"entities": [{"name": "passenger_id", "dtype": {"name": "int32"}, "description": null, "tags": null, "constraints": null}], "features": [{"name": "age", "dtype": {"name": "float"}, "description": "A float as some have decimals", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "upper_bound_inc", "value": 100.0}]}, {"name": "is_mr", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "name", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "contains", "dtype": {"name": "bool"}, "key": "name", "value": "Mr."}, "depth": 1}, {"name": "has_siblings", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sibsp", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "int32"}}], "transformation": {"name": "not-equals", "dtype": {"name": "bool"}, "key": "sibsp", "value": {"name": "int", "value": 0}}, "depth": 1}, {"name": "is_female", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], 
"transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "female"}}, "depth": 1}, {"name": "sex", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}, {"name": "in_domain", "values": ["male", "female"]}]}, {"name": "cabin", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}]}, {"name": "sibsp", "dtype": {"name": "int32"}, "description": "Number of siblings on titanic", "tags": null, "constraints": [{"name": "lower_bound_inc", "value": 0.0}, {"name": "optional"}, {"name": "upper_bound_inc", "value": 20.0}]}, {"name": "is_male", "dtype": {"name": "bool"}, "description": null, "tags": null, "constraints": null, "depending_on": [{"name": "sex", "location": {"name": "titanic", "location": "feature_view"}, "dtype": {"name": "string"}}], "transformation": {"name": "equals", "dtype": {"name": "bool"}, "key": "sex", "value": {"name": "string", "value": "male"}}, "depth": 1}, {"name": "name", "dtype": {"name": "string"}, "description": null, "tags": null, "constraints": [{"name": "optional"}]}, {"name": "survived", "dtype": {"name": "bool"}, "description": "If the passenger survived", "tags": null, "constraints": null}], "event_timestamp": null}, "train_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-train.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "test_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-test.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "validation_dataset": {"mapping_keys": {}, "type_name": "csv", "path": "test_data/titanic-validate.csv", "csv_config": {"seperator": ",", "compression": "infer", "should_write_index": false}}, "train_size_fraction": 0.6, "test_size_fraction": 0.20000000000000007, "validate_size_fraction": 0.19999999999999996, "target": ["survived"], "description": null, "tags": null}], "active_learning": []} diff --git a/test_data/titanic-test.csv b/test_data/titanic-test.csv new file mode 100644 index 00000000..39f098cf --- /dev/null +++ b/test_data/titanic-test.csv @@ -0,0 +1,21 @@ +age,passenger_id,is_mr,has_siblings,is_female,sex,cabin,sibsp,is_male,name,survived +22.0,61,True,False,False,male,,0,True,"Sirayanian, Mr. Orsen",False +38.0,62,False,False,True,female,B28,0,False,"Icard, Miss. Amelie",True +45.0,63,True,True,False,male,C83,1,True,"Harris, Mr. Henry Birkhardt",False +4.0,64,False,True,False,male,,3,True,"Skoog, Master. Harald",False +,65,True,False,False,male,,0,True,"Stewart, Mr. Albert A",False +,66,False,True,False,male,,1,True,"Moubarek, Master. Gerios",True +29.0,67,True,False,True,female,F33,0,False,"Nye, Mrs. (Elizabeth Ramell)",True +19.0,68,True,False,False,male,,0,True,"Crease, Mr. Ernest James",False +17.0,69,False,True,True,female,,4,False,"Andersson, Miss. Erna Alexandra",True +26.0,70,True,True,False,male,,2,True,"Kink, Mr. Vincenz",False +32.0,71,True,False,False,male,,0,True,"Jenkin, Mr. Stephen Curnow",False +16.0,72,False,True,True,female,,5,False,"Goodwin, Miss. Lillian Amy",False +21.0,73,True,False,False,male,,0,True,"Hood, Mr. Ambrose Jr",False +26.0,74,True,True,False,male,,1,True,"Chronopoulos, Mr. Apostolos",False +32.0,75,True,False,False,male,,0,True,"Bing, Mr. Lee",True +25.0,76,True,False,False,male,F G73,0,True,"Moen, Mr. Sigurd Hansen",False +,77,True,False,False,male,,0,True,"Staneff, Mr. 
Ivan",False +,78,True,False,False,male,,0,True,"Moutal, Mr. Rahamin Haim",False +0.83,79,False,False,False,male,,0,True,"Caldwell, Master. Alden Gates",True +30.0,80,False,False,True,female,,0,False,"Dowdell, Miss. Elizabeth",True diff --git a/test_data/titanic-train.csv b/test_data/titanic-train.csv new file mode 100644 index 00000000..4399215b --- /dev/null +++ b/test_data/titanic-train.csv @@ -0,0 +1,61 @@ +age,passenger_id,is_mr,has_siblings,is_female,sex,cabin,sibsp,is_male,name,survived +22.0,1,True,True,False,male,,1,True,"Braund, Mr. Owen Harris",False +38.0,2,True,True,True,female,C85,1,False,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",True +26.0,3,False,False,True,female,,0,False,"Heikkinen, Miss. Laina",True +35.0,4,True,True,True,female,C123,1,False,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",True +35.0,5,True,False,False,male,,0,True,"Allen, Mr. William Henry",False +,6,True,False,False,male,,0,True,"Moran, Mr. James",False +54.0,7,True,False,False,other,E46,0,False,"McCarthy, Mr. Timothy J",False +2.0,8,False,True,False,male,,3,True,"Palsson, Master. Gosta Leonard",False +27.0,9,True,False,True,female,,0,False,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",True +14.0,10,True,True,True,female,,1,False,"Nasser, Mrs. Nicholas (Adele Achem)",True +4.0,11,False,True,True,female,G6,1,False,"Sandstrom, Miss. Marguerite Rut",True +58.0,12,False,False,True,female,C103,0,False,"Bonnell, Miss. Elizabeth",True +20.0,13,True,False,False,male,,0,True,"Saundercock, Mr. William Henry",False +39.0,14,True,True,False,male,,1,True,"Andersson, Mr. Anders Johan",False +14.0,15,False,False,True,female,,0,False,"Vestrom, Miss. Hulda Amanda Adolfina",False +55.0,16,True,False,True,female,,0,False,"Hewlett, Mrs. (Mary D Kingcome) ",True +2.0,17,False,True,False,male,,4,True,"Rice, Master. Eugene",False +,18,True,False,False,male,,0,True,"Williams, Mr. Charles Eugene",True +31.0,19,True,True,True,female,,1,False,"Vander Planke, Mrs. Julius (Emelia Maria Vandemoortele)",False +,20,True,False,True,female,,0,False,"Masselmani, Mrs. Fatima",True +35.0,21,True,False,False,male,,0,True,"Fynney, Mr. Joseph J",False +34.0,22,True,False,False,male,D56,0,True,"Beesley, Mr. Lawrence",True +15.0,23,False,False,True,female,,0,False,"McGowan, Miss. Anna ""Annie""",True +28.0,24,True,False,False,male,A6,0,True,"Sloper, Mr. William Thompson",True +8.0,25,False,True,True,female,,3,False,"Palsson, Miss. Torborg Danira",False +38.0,26,True,True,True,female,,1,False,"Asplund, Mrs. Carl Oscar (Selma Augusta Emilia Johansson)",True +,27,True,False,False,male,,0,True,"Emir, Mr. Farred Chehab",False +19.0,28,True,True,False,male,C23 C25 C27,3,True,"Fortune, Mr. Charles Alexander",False +,29,False,False,True,female,,0,False,"O'Dwyer, Miss. Ellen ""Nellie""",True +,30,True,False,False,male,,0,True,"Todoroff, Mr. Lalio",False +40.0,31,False,False,False,male,,0,True,"Uruchurtu, Don. Manuel E",False +,32,True,True,True,female,B78,1,False,"Spencer, Mrs. William Augustus (Marie Eugenie)",True +,33,False,False,True,female,,0,False,"Glynn, Miss. Mary Agatha",True +66.0,34,True,False,False,male,,0,True,"Wheadon, Mr. Edward H",False +28.0,35,True,True,False,male,,1,True,"Meyer, Mr. Edgar Joseph",False +42.0,36,True,True,False,male,,1,True,"Holverson, Mr. Alexander Oskar",False +,37,True,False,False,male,,0,True,"Mamee, Mr. Hanna",True +21.0,38,True,False,False,male,,0,True,"Cann, Mr. Ernest Charles",False +18.0,39,False,True,True,female,,2,False,"Vander Planke, Miss. 
Augusta Maria",False +14.0,40,False,True,True,female,,1,False,"Nicola-Yarred, Miss. Jamila",True +40.0,41,True,True,True,female,,1,False,"Ahlin, Mrs. Johan (Johanna Persdotter Larsson)",False +27.0,42,True,True,True,female,,1,False,"Turpin, Mrs. William John Robert (Dorothy Ann Wonnacott)",False +,43,True,False,False,male,,0,True,"Kraeff, Mr. Theodor",False +3.0,44,False,True,True,female,,1,False,"Laroche, Miss. Simonne Marie Anne Andree",True +19.0,45,False,False,True,female,,0,False,"Devaney, Miss. Margaret Delia",True +,46,True,False,False,male,,0,True,"Rogers, Mr. William John",False +,47,True,True,False,male,,1,True,"Lennon, Mr. Denis",False +,48,False,False,True,female,,0,False,"O'Driscoll, Miss. Bridget",True +,49,True,True,False,male,,2,True,"Samaan, Mr. Youssef",False +18.0,50,True,True,True,female,,1,False,"Arnold-Franchi, Mrs. Josef (Josefine Franchi)",False +7.0,51,False,True,False,male,,4,True,"Panula, Master. Juha Niilo",False +21.0,52,True,False,False,male,,0,True,"Nosworthy, Mr. Richard Cater",False +49.0,53,True,True,True,female,D33,1,False,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",True +29.0,54,True,True,True,female,,1,False,"Faunthorpe, Mrs. Lizzie (Elizabeth Anne Wilkinson)",True +65.0,55,True,False,False,male,B30,0,True,"Ostby, Mr. Engelhart Cornelius",False +,56,True,False,False,male,C52,0,True,"Woolner, Mr. Hugh",True +21.0,57,False,False,True,female,,0,False,"Rugg, Miss. Emily",True +28.5,58,True,False,False,male,,0,True,"Novel, Mr. Mansouer",False +5.0,59,False,True,True,female,,1,False,"West, Miss. Constance Mirium",True +11.0,60,False,True,False,male,,5,True,"Goodwin, Master. William Frederick",False diff --git a/test_data/titanic-validate.csv b/test_data/titanic-validate.csv new file mode 100644 index 00000000..7c892f7a --- /dev/null +++ b/test_data/titanic-validate.csv @@ -0,0 +1,21 @@ +age,passenger_id,is_mr,has_siblings,is_female,sex,cabin,sibsp,is_male,name,survived +22.0,81,True,False,False,male,,0,True,"Waelens, Mr. Achille",False +29.0,82,True,False,False,male,,0,True,"Sheerlinck, Mr. Jan Baptist",True +,83,False,False,True,female,,0,False,"McDermott, Miss. Brigdet Delia",True +28.0,84,True,False,False,male,,0,True,"Carrau, Mr. Francisco M",False +17.0,85,False,False,True,female,,0,False,"Ilett, Miss. Bertha",True +33.0,86,True,True,True,female,,3,False,"Backstrom, Mrs. Karl Alfred (Maria Mathilda Gustafsson)",True +16.0,87,True,True,False,male,,1,True,"Ford, Mr. William Neal",False +,88,True,False,False,male,,0,True,"Slocovski, Mr. Selman Francis",False +23.0,89,False,True,True,female,C23 C25 C27,3,False,"Fortune, Miss. Mabel Helen",True +24.0,90,True,False,False,male,,0,True,"Celotti, Mr. Francesco",False +29.0,91,True,False,False,male,,0,True,"Christmann, Mr. Emil",False +20.0,92,True,False,False,male,,0,True,"Andreasson, Mr. Paul Edvin",False +46.0,93,True,True,False,male,E31,1,True,"Chaffee, Mr. Herbert Fuller",False +26.0,94,True,True,False,male,,1,True,"Dean, Mr. Bertram Frank",False +59.0,95,True,False,False,male,,0,True,"Coxon, Mr. Daniel",False +,96,True,False,False,male,,0,True,"Shorney, Mr. Charles Joseph",False +71.0,97,True,False,False,male,A5,0,True,"Goldschmidt, Mr. George B",False +23.0,98,True,False,False,male,D10 D12,0,True,"Greenfield, Mr. William Bertram",True +34.0,99,True,False,True,female,,0,False,"Doling, Mrs. John T (Ada Julia Bone)",True +34.0,100,True,True,False,male,,1,True,"Kantor, Mr. Sinai",False