diff --git a/aligned/__init__.py b/aligned/__init__.py index 25be87cc..45c6ee66 100644 --- a/aligned/__init__.py +++ b/aligned/__init__.py @@ -18,7 +18,7 @@ feature_view, combined_feature_view, ) -from aligned.schemas.text_vectoriser import TextVectoriserModel +from aligned.schemas.text_vectoriser import EmbeddingModel from aligned.sources.kafka import KafkaConfig from aligned.sources.local import FileSource from aligned.sources.psql import PostgreSQLConfig @@ -53,7 +53,7 @@ 'EventTimestamp', 'Timestamp', 'Json', - 'TextVectoriserModel', + 'EmbeddingModel', 'feature_view', 'combined_feature_view', 'model_contract', diff --git a/aligned/compiler/feature_factory.py b/aligned/compiler/feature_factory.py index b2df756b..082c366d 100644 --- a/aligned/compiler/feature_factory.py +++ b/aligned/compiler/feature_factory.py @@ -30,7 +30,7 @@ from aligned.schemas.target import ClassificationTarget as ClassificationTargetSchemas from aligned.schemas.target import ClassTargetProbability from aligned.schemas.target import RegressionTarget as RegressionTargetSchemas -from aligned.schemas.transformation import TextVectoriserModel, Transformation +from aligned.schemas.transformation import EmbeddingModel, Transformation from aligned.schemas.vector_storage import VectorStorage if TYPE_CHECKING: @@ -969,7 +969,7 @@ def contains(self, value: str) -> Bool: feature.transformation = ContainsFactory(value, self) return feature - def sentence_vector(self, model: TextVectoriserModel) -> Embedding: + def sentence_vector(self, model: EmbeddingModel) -> Embedding: from aligned.compiler.transformation_factory import WordVectoriserFactory feature = Embedding() @@ -977,7 +977,7 @@ def sentence_vector(self, model: TextVectoriserModel) -> Embedding: feature.embedding_size = model.embedding_size return feature - def embedding(self, model: TextVectoriserModel) -> Embedding: + def embedding(self, model: EmbeddingModel) -> Embedding: return self.sentence_vector(model) def append(self, other: FeatureFactory | str) -> String: diff --git a/aligned/compiler/transformation_factory.py b/aligned/compiler/transformation_factory.py index 9378954a..681c0289 100644 --- a/aligned/compiler/transformation_factory.py +++ b/aligned/compiler/transformation_factory.py @@ -8,7 +8,7 @@ from aligned import AwsS3Config from aligned.compiler.feature_factory import FeatureFactory, Transformation, TransformationFactory -from aligned.schemas.transformation import FillNaValuesColumns, LiteralValue, TextVectoriserModel +from aligned.schemas.transformation import FillNaValuesColumns, LiteralValue, EmbeddingModel logger = logging.getLogger(__name__) @@ -670,7 +670,7 @@ def copy(self) -> 'MeanTransfomrationFactory': class WordVectoriserFactory(TransformationFactory): feature: FeatureFactory - model: TextVectoriserModel + model: EmbeddingModel @property def using_features(self) -> list[FeatureFactory]: diff --git a/aligned/data_source/batch_data_source.py b/aligned/data_source/batch_data_source.py index 28bdb170..0d55ba19 100644 --- a/aligned/data_source/batch_data_source.py +++ b/aligned/data_source/batch_data_source.py @@ -324,6 +324,7 @@ def all_between_dates( return ( self.source.all_between_dates(request, start_date, end_date) .filter(self.condition) + .aggregate(request) .derive_features([request]) ) @@ -334,7 +335,12 @@ def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: else: request.derived_features.add(self.condition) - return self.source.all_data(request, limit).filter(self.condition).derive_features([request]) + return ( + self.source.all_data(request, limit) + .filter(self.condition) + .aggregate(request) + .derive_features([request]) + ) def depends_on(self) -> set[FeatureLocation]: return self.source.depends_on() @@ -498,6 +504,7 @@ def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: left_on=self.left_on, right_on=self.right_on, ) + .aggregate(request) .derive_features([request]) ) @@ -519,6 +526,7 @@ def all_between_dates( left_on=self.left_on, right_on=self.right_on, ) + .aggregate(request) .derive_features([request]) ) @@ -599,6 +607,7 @@ def all_data(self, request: RetrivalRequest, limit: int | None) -> RetrivalJob: self.source.all_data(self.left_request, limit=limit) .derive_features([self.left_request]) .join(right_job, method=self.method, left_on=self.left_on, right_on=self.right_on) + .aggregate(request) .derive_features([request]) ) @@ -620,6 +629,7 @@ def all_between_dates( left_on=self.left_on, right_on=self.right_on, ) + .aggregate(request) .derive_features([request]) ) diff --git a/aligned/feature_store.py b/aligned/feature_store.py index f96003bd..bb40359b 100644 --- a/aligned/feature_store.py +++ b/aligned/feature_store.py @@ -31,9 +31,8 @@ StreamAggregationJob, SupervisedJob, ConvertableToRetrivalJob, - SupervisedTrainJob, ) -from aligned.schemas.feature import FeatureLocation, Feature +from aligned.schemas.feature import FeatureLocation, Feature, FeatureReferance from aligned.schemas.feature_view import CompiledFeatureView from aligned.schemas.model import EventTrigger from aligned.schemas.model import Model as ModelSchema @@ -602,7 +601,11 @@ def model_features_for(self, view_name: str) -> set[str]: all_model_features: set[str] = set() for model in self.models.values(): all_model_features.update( - {feature.name for feature in model.features if feature.location.name == view_name} + { + feature.name + for feature in model.features.default_features + if feature.location.name == view_name + } ) return all_model_features @@ -880,7 +883,8 @@ def cached_at(self, location: DataFileReference) -> RetrivalJob: """ from aligned.local.job import FileFullJob - features = {f'{feature.location.identifier}:{feature.name}' for feature in self.model.features} + references = self.model.feature_references(self.selected_version) + features = {f'{feature.location.identifier}:{feature.name}' for feature in references} request = self.store.requests_for(RawStringFeatureRequest(features)) return FileFullJob(location, RetrivalRequest.unsafe_combine(request.needed_requests)).select_columns( @@ -1065,15 +1069,14 @@ class TaxiEta: """ await self.store.insert_into(FeatureLocation.model(self.model.name), predictions) - async def store_train_test_dataset(self, job: SupervisedTrainJob) -> SupervisedTrainJob: - pass - @dataclass class SupervisedModelFeatureStore: model: ModelSchema store: FeatureStore + labels_estimates_refs: set[FeatureReferance] + selected_version: str | None = None def features_for( @@ -1114,7 +1117,7 @@ def features_for( features = {f'{feature.location.identifier}:{feature.name}' for feature in feature_refs} pred_view = self.model.predictions_view - target_feature_refs = pred_view.labels_estimates_refs() + target_feature_refs = self.labels_estimates_refs target_features = {feature.identifier for feature in target_feature_refs} targets = set() @@ -1201,8 +1204,9 @@ def predictions_for( request = pred_view.request(self.model.name) target_features = pred_view.labels_estimates_refs() - labels = pred_view.labels() target_features = {feature.identifier for feature in target_features} + + labels = pred_view.labels() pred_features = {f'model:{self.model.name}:{feature.name}' for feature in labels} request = self.store.requests_for( RawStringFeatureRequest(pred_features), event_timestamp_column=event_timestamp_column @@ -1411,7 +1415,9 @@ def process_input(self, values: ConvertableToRetrivalJob) -> RetrivalJob: job = RetrivalJob.from_convertable(values, request) - return job.fill_missing_columns().ensure_types([request]).derive_features([request]) + return ( + job.fill_missing_columns().ensure_types([request]).aggregate(request).derive_features([request]) + ) async def batch_write(self, values: ConvertableToRetrivalJob | RetrivalJob) -> None: """Takes a set of features, computes the derived features, and store them in the source diff --git a/aligned/feature_view/feature_view.py b/aligned/feature_view/feature_view.py index 698886a7..63d27094 100644 --- a/aligned/feature_view/feature_view.py +++ b/aligned/feature_view/feature_view.py @@ -289,7 +289,10 @@ class SomeView(FeatureView): store.add_compiled_view(self.compile()) return store.feature_view(self.metadata.name) - async def process(self, data: dict[str, list[Any]]) -> list[dict]: + def process_input(self, data: ConvertableToRetrivalJob) -> RetrivalJob: + return self.query().process_input(data) + + async def process(self, data: ConvertableToRetrivalJob) -> list[dict]: df = await self.query().process_input(data).to_polars() return df.collect().to_dicts() @@ -673,7 +676,7 @@ class MyView: return f""" from aligned import feature_view, {all_types} -{imports or ""} +{imports or ''} @feature_view( name="{view_name}", diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py index 9dc876a9..fca7425f 100644 --- a/aligned/retrival_job.py +++ b/aligned/retrival_job.py @@ -263,9 +263,14 @@ class SupervisedJob: job: RetrivalJob target_columns: set[str] + should_filter_out_null_targets: bool = True async def to_pandas(self) -> SupervisedDataSet[pd.DataFrame]: data = await self.job.to_pandas() + + if self.should_filter_out_null_targets: + data = data.dropna(subset=list(self.target_columns)) + features = { feature.name for feature in self.job.request_result.features @@ -278,6 +283,9 @@ async def to_pandas(self) -> SupervisedDataSet[pd.DataFrame]: async def to_polars(self) -> SupervisedDataSet[pl.LazyFrame]: data = await self.job.to_polars() + if self.should_filter_out_null_targets: + data = data.drop_nulls([column for column in self.target_columns]) + features = [ feature.name for feature in self.job.request_result.features @@ -288,6 +296,10 @@ async def to_polars(self) -> SupervisedDataSet[pl.LazyFrame]: data, set(entities), set(features), self.target_columns, self.job.request_result.event_timestamp ) + def should_filter_null_targets(self, should_filter: bool) -> SupervisedJob: + self.should_filter_out_null_targets = should_filter + return self + @property def request_result(self) -> RequestResult: return self.job.request_result @@ -608,6 +620,8 @@ def select_columns(self, include_features: set[str]) -> RetrivalJob: return SelectColumnsJob(include_features, self) def aggregate(self, request: RetrivalRequest) -> RetrivalJob: + if not request.aggregated_features: + return self return AggregateJob(self, request) def with_request(self, requests: list[RetrivalRequest]) -> RetrivalJob: @@ -650,6 +664,9 @@ def ignore_event_timestamp(self) -> RetrivalJob: return self.copy_with(self.job.ignore_event_timestamp()) raise NotImplementedError('Not implemented ignore_event_timestamp') + def polars_method(self, polars_method: Callable[[pl.LazyFrame], pl.LazyFrame]) -> RetrivalJob: + return CustomPolarsJob(self, polars_method) + @staticmethod def from_dict(data: dict[str, list], request: list[RetrivalRequest] | RetrivalRequest) -> RetrivalJob: if isinstance(request, RetrivalRequest): @@ -724,6 +741,21 @@ def copy_with(self: JobType, job: RetrivalJob) -> JobType: return self +@dataclass +class CustomPolarsJob(RetrivalJob, ModificationJob): + + job: RetrivalJob + polars_method: Callable[[pl.LazyFrame], pl.LazyFrame] + + async def to_polars(self) -> pl.LazyFrame: + df = await self.job.to_polars() + return self.polars_method(df) + + async def to_pandas(self) -> pd.DataFrame: + df = await self.job.to_polars() + return df.collect().to_pandas() + + @dataclass class SubsetJob(RetrivalJob, ModificationJob): @@ -1213,6 +1245,13 @@ def retrival_requests(self) -> list[RetrivalRequest]: async def compute_derived_features_polars(self, df: pl.LazyFrame) -> pl.LazyFrame: for request in self.requests: + + missing_features = request.features_to_include - set(df.columns) + + if len(missing_features) == 0: + logger.debug('Skipping to compute derived features as they are already computed') + continue + for feature_round in request.derived_features_order(): round_expressions: list[pl.Expr] = [] @@ -1687,7 +1726,7 @@ async def to_pandas(self) -> pd.DataFrame: df[feature.name] = pd.to_datetime(df[feature.name], infer_datetime_format=True, utc=True) elif feature.dtype == FeatureType.datetime() or feature.dtype == FeatureType.string(): continue - elif feature.dtype == FeatureType.array(): + elif (feature.dtype == FeatureType.array()) or (feature.dtype == FeatureType.embedding()): import json if df[feature.name].dtype == 'object': @@ -1738,7 +1777,7 @@ async def to_polars(self) -> pl.LazyFrame: .cast(pl.Datetime(time_zone='UTC')) .alias(feature.name) ) - elif feature.dtype == FeatureType.array(): + elif (feature.dtype == FeatureType.array()) or (feature.dtype == FeatureType.embedding()): dtype = df.select(feature.name).dtypes[0] if dtype == pl.Utf8: df = df.with_columns(pl.col(feature.name).str.json_extract(pl.List(pl.Utf8))) diff --git a/aligned/schemas/feature_view.py b/aligned/schemas/feature_view.py index 876bea60..0924d7ee 100644 --- a/aligned/schemas/feature_view.py +++ b/aligned/schemas/feature_view.py @@ -43,9 +43,9 @@ class CompiledFeatureView(Codable): def __pre_serialize__(self) -> CompiledFeatureView: assert isinstance(self.name, str) - assert isinstance(self.description, str) assert isinstance(self.tags, dict) assert isinstance(self.source, BatchDataSource) + for entity in self.entities: assert isinstance(entity, Feature) for feature in self.features: @@ -54,6 +54,9 @@ def __pre_serialize__(self) -> CompiledFeatureView: assert isinstance(derived_feature, DerivedFeature) for aggregated_feature in self.aggregated_features: assert isinstance(aggregated_feature, AggregatedFeature) + + if self.description is not None: + assert isinstance(self.description, str) if self.event_timestamp is not None: assert isinstance(self.event_timestamp, EventTimestamp) if self.stream_data_source is not None: @@ -164,12 +167,6 @@ def dependent_features_for( derived_features.update(intermediate) aggregated_features.update(aggregated) - all_features = features.union(derived_features).union( - {feature.derived_feature for feature in aggregated_features} - ) - - exclude_names = {feature.name for feature in all_features} - feature_names - return FeatureRequest( FeatureLocation.feature_view(self.name), feature_names, @@ -182,7 +179,7 @@ def dependent_features_for( derived_features=derived_features, aggregated_features=aggregated_features, event_timestamp=self.event_timestamp, - features_to_include=exclude_names, + features_to_include=feature_names, ) ], ) diff --git a/aligned/schemas/text_vectoriser.py b/aligned/schemas/text_vectoriser.py index d0f7f11b..8afe91e1 100644 --- a/aligned/schemas/text_vectoriser.py +++ b/aligned/schemas/text_vectoriser.py @@ -16,11 +16,11 @@ logger = logging.getLogger(__name__) -class SupportedTextModels: +class SupportedEmbeddingModels: - types: dict[str, type[TextVectoriserModel]] + types: dict[str, type[EmbeddingModel]] - _shared: SupportedTextModels | None = None + _shared: SupportedEmbeddingModels | None = None def __init__(self) -> None: self.types = {} @@ -28,18 +28,18 @@ def __init__(self) -> None: for tran_type in [GensimModel, OpenAiEmbeddingModel, HuggingFaceTransformer]: self.add(tran_type) - def add(self, transformation: type[TextVectoriserModel]) -> None: + def add(self, transformation: type[EmbeddingModel]) -> None: self.types[transformation.name] = transformation @classmethod - def shared(cls) -> SupportedTextModels: + def shared(cls) -> SupportedEmbeddingModels: if cls._shared: return cls._shared - cls._shared = SupportedTextModels() + cls._shared = SupportedEmbeddingModels() return cls._shared -class TextVectoriserModel(Codable, SerializableType): +class EmbeddingModel(Codable, SerializableType): name: str @property @@ -50,10 +50,10 @@ def _serialize(self) -> dict: return self.to_dict() @classmethod - def _deserialize(cls, value: dict) -> TextVectoriserModel: + def _deserialize(cls, value: dict) -> EmbeddingModel: name_type = value['name'] del value['name'] - data_class = SupportedTextModels.shared().types[name_type] + data_class = SupportedEmbeddingModels.shared().types[name_type] with suppress(AttributeError): if data_class.dtype: del value['dtype'] @@ -64,10 +64,10 @@ async def load_model(self): pass async def vectorise_pandas(self, texts: pd.Series) -> pd.Series: - pass + raise NotImplementedError(type(self)) async def vectorise_polars(self, texts: pl.LazyFrame, text_key: str, output_key: str) -> pl.LazyFrame: - pass + raise NotImplementedError(type(self)) @staticmethod def gensim(model_name: str, config: GensimConfig | None = None) -> GensimModel: @@ -93,7 +93,7 @@ class GensimConfig(Codable): @dataclass -class GensimModel(TextVectoriserModel): +class GensimModel(EmbeddingModel): model_name: str config: GensimConfig = field(default_factory=GensimConfig) @@ -187,7 +187,7 @@ class OpenAiResponse(BaseModel): @dataclass -class OpenAiEmbeddingModel(TextVectoriserModel): +class OpenAiEmbeddingModel(EmbeddingModel): api_token_env_key: str = field(default='OPENAI_API_KEY') model: str = field(default='text-embedding-ada-002') @@ -247,18 +247,26 @@ async def vectorise_pandas(self, texts: pd.Series) -> pd.Series: async def vectorise_polars(self, texts: pl.LazyFrame, text_key: str, output_key: str) -> pl.LazyFrame: data = await self.embeddings(texts.select(text_key).collect().to_series().to_list()) - return texts.with_column( + return texts.with_columns( pl.Series(values=[embedding.embedding for embedding in data.data], name=output_key) ) @dataclass -class HuggingFaceTransformer(TextVectoriserModel): +class HuggingFaceTransformer(EmbeddingModel): model: str name: str = 'huggingface' loaded_model: Any = field(default=None) + @property + def embedding_size(self) -> int | None: + from sentence_transformers import SentenceTransformer + + model = SentenceTransformer(self.model) + + return len(model.encode(['test'])[0]) + async def load_model(self): from sentence_transformers import SentenceTransformer @@ -267,12 +275,11 @@ async def load_model(self): async def vectorise_polars(self, texts: pl.LazyFrame, text_key: str, output_key: str) -> pl.LazyFrame: if self.loaded_model is None: await self.load_model() - return texts.with_column( + return texts.with_columns( pl.Series( self.loaded_model.encode(texts.select(pl.col(text_key)).collect().to_series().to_list()) ).alias(output_key) ) - pass async def vectorise_pandas(self, texts: pd.Series) -> pd.Series: if self.loaded_model is None: diff --git a/aligned/schemas/transformation.py b/aligned/schemas/transformation.py index 8af7f23f..d9039e64 100644 --- a/aligned/schemas/transformation.py +++ b/aligned/schemas/transformation.py @@ -14,7 +14,7 @@ from aligned.schemas.codable import Codable from aligned.schemas.feature import FeatureType from aligned.schemas.literal_value import LiteralValue -from aligned.schemas.text_vectoriser import TextVectoriserModel +from aligned.schemas.text_vectoriser import EmbeddingModel if TYPE_CHECKING: from aligned.sources.s3 import AwsS3Config @@ -1361,9 +1361,10 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return temp_df async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: - pandas_column = df.select(self.key).collect().to_pandas() + collected = df.collect() + pandas_column = collected.select(self.key).to_pandas() transformed = await self.transform_pandas(pandas_column) - return df.with_columns(pl.Series(transformed).alias(alias)) + return collected.with_columns(pl.Series(transformed).alias(alias)).lazy() # @staticmethod # def test_definition() -> TransformationTestDefinition: @@ -1721,7 +1722,7 @@ def test_definition() -> TransformationTestDefinition: @dataclass class WordVectoriser(Transformation): key: str - model: TextVectoriserModel + model: EmbeddingModel name = 'word_vectoriser' dtype = FeatureType.embedding() @@ -1820,7 +1821,7 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return df[self.key] + self.string async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: - return pl.concat_str([pl.col(self.key).fill_null(''), pl.lit(self.string)], sep='').alias(alias) + return pl.concat_str([pl.col(self.key).fill_null(''), pl.lit(self.string)], separator='').alias(alias) @dataclass @@ -1839,7 +1840,8 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: return df.with_columns( pl.concat_str( - [pl.col(self.first_key).fill_null(''), pl.col(self.second_key).fill_null('')], sep=self.sep + [pl.col(self.first_key).fill_null(''), pl.col(self.second_key).fill_null('')], + separator=self.sep, ).alias(alias) ) @@ -1857,7 +1859,7 @@ async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: return self.string + df[self.key] async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: - return pl.concat_str([pl.lit(self.string), pl.col(self.key).fill_null('')], sep='').alias(alias) + return pl.concat_str([pl.lit(self.string), pl.col(self.key).fill_null('')], separator='').alias(alias) @dataclass @@ -2139,11 +2141,13 @@ class StructField(Transformation): dtype = FeatureType.string() async def transform_pandas(self, df: pd.DataFrame) -> pd.Series: - return ( - (await self.transform_polars(pl.from_pandas(df).lazy(), 'feature')) - .collect() - .to_pandas()['feature'] - ) + data = pl.from_pandas(df).lazy() + tran = await self.transform_polars(data, 'feature') + + if isinstance(tran, pl.LazyFrame): + return tran.collect().to_pandas()['feature'] + + return data.select(tran).collect().to_pandas()['feature'] async def transform_polars(self, df: pl.LazyFrame, alias: str) -> pl.LazyFrame | pl.Expr: if df.schema[self.key].is_(pl.Utf8): diff --git a/aligned/split_strategy.py b/aligned/split_strategy.py index 59c3a8ab..a0c1852e 100644 --- a/aligned/split_strategy.py +++ b/aligned/split_strategy.py @@ -3,9 +3,8 @@ import polars as pl from pandas import DataFrame, Index, concat -from pandas.core.generic import NDFrame -DatasetType = TypeVar('DatasetType', bound=NDFrame) +DatasetType = TypeVar('DatasetType') class TrainTestSet(Generic[DatasetType]): diff --git a/conftest.py b/conftest.py index cbd7b032..41267326 100644 --- a/conftest.py +++ b/conftest.py @@ -15,7 +15,7 @@ Int64, RedisConfig, String, - TextVectoriserModel, + EmbeddingModel, ) from aligned.feature_view.feature_view import FeatureView, FeatureViewMetadata from aligned.compiler.model import model_contract, ModelContractWrapper @@ -604,7 +604,7 @@ class TitanicPassenger(FeatureView): survived = Bool().description('If the passenger survived') name = String() - name_embedding = name.embedding(TextVectoriserModel.gensim('glove-wiki-gigaword-50')).indexed( + name_embedding = name.embedding(EmbeddingModel.gensim('glove-wiki-gigaword-50')).indexed( embedding_size=50, storage=redis.index(name='name_embedding_index'), metadata=[age, sex] )