diff --git a/aligned/active_learning/job.py b/aligned/active_learning/job.py
deleted file mode 100644
index 48c7e158..00000000
--- a/aligned/active_learning/job.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from __future__ import annotations
-
-import logging
-from dataclasses import dataclass
-
-import polars as pl
-
-from aligned.lazy_imports import pandas as pd
-from aligned.active_learning.selection import ActiveLearningMetric, ActiveLearningSelection
-from aligned.active_learning.write_policy import ActiveLearningWritePolicy
-from aligned.retrival_job import RetrivalJob
-from aligned.schemas.model import Model
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class ActiveLearningJob(RetrivalJob):
-
-    job: RetrivalJob
-    model: Model
-    metric: ActiveLearningMetric
-    selection: ActiveLearningSelection
-    write_policy: ActiveLearningWritePolicy
-
-    async def to_lazy_polars(self) -> pl.LazyFrame:
-        if not self.model.predictions_view.classification_targets:
-            logger.info('Found no target. Therefore, no data will be written to an active learning dataset.')
-            return await self.job.to_lazy_polars()
-
-        data = await self.job.to_lazy_polars()
-        active_learning_set = self.selection.select(self.model, data, self.metric)
-        await self.write_policy.write(active_learning_set, self.model)
-        return data
-
-    async def to_pandas(self) -> pd.DataFrame:
-        raise NotImplementedError()
diff --git a/aligned/active_learning/selection.py b/aligned/active_learning/selection.py
deleted file mode 100644
index 09ec09f2..00000000
--- a/aligned/active_learning/selection.py
+++ /dev/null
@@ -1,58 +0,0 @@
-from __future__ import annotations
-
-from dataclasses import dataclass
-from typing import Callable
-
-import polars as pl
-
-from aligned.schemas.model import Model
-
-
-@dataclass
-class ActiveLearningMetric:
-    def metric(self, model: Model) -> pl.Expr:
-        raise NotImplementedError()
-
-    @staticmethod
-    def max_confidence() -> ActiveLearningMetric:
-        def metric_selection(model: Model) -> pl.Expr:
-            view = model.predictions_view
-
-            if view.classification_targets and len(view.classification_targets) > 0:
-                confidence = [prob.confidence.name for prob in view.classification_targets if prob.confidence]
-                return pl.concat_list(confidence).arr.max()
-
-            if view.regression_targets and len(view.regression_targets) > 0:
-                confidence = [prob.confidence.name for prob in view.regression_targets if prob.confidence]
-                return pl.concat_list(confidence).arr.max()
-
-            return pl.lit(1)
-
-        return ActivLearningPolarsExprMetric(metric_selection)
-
-
-@dataclass
-class ActivLearningPolarsExprMetric(ActiveLearningMetric):
-
-    factory: Callable[[Model], pl.Expr]
-
-    def metric(self, model: Model) -> pl.Expr:
-        return self.factory(model)
-
-
-class ActiveLearningSelection:
-    def select(self, model: Model, data: pl.LazyFrame, metric: ActiveLearningMetric) -> pl.LazyFrame:
-        raise NotImplementedError()
-
-    @staticmethod
-    def under_threshold(threshold: float) -> ActiveLearningSelection:
-        return ActivLearningPolarsExprSelection(lambda model, metric: metric.metric(model) < threshold)
-
-
-@dataclass
-class ActivLearningPolarsExprSelection(ActiveLearningSelection):
-
-    factory: Callable[[Model, ActiveLearningMetric], pl.Expr]
-
-    def select(self, model: Model, data: pl.LazyFrame, metric: ActiveLearningMetric) -> pl.LazyFrame:
-        return data.filter(self.factory(model, metric))
diff --git a/aligned/active_learning/write_policy.py b/aligned/active_learning/write_policy.py
deleted file mode 100644
index b0ba136c..00000000
--- a/aligned/active_learning/write_policy.py
+++ /dev/null
@@ -1,64 +0,0 @@
-from __future__ import annotations
-
-import logging
-from dataclasses import dataclass, field
-from datetime import datetime
-from pathlib import Path
-
-import polars as pl
-
-from aligned.schemas.model import Model
-
-logger = logging.getLogger(__name__)
-
-
-class ActiveLearningWritePolicy:
-    async def write(self, data: pl.LazyFrame, model: Model):
-        raise NotImplementedError()
-
-    @staticmethod
-    def sample_size(write_size: int, ideal_size: int) -> ActiveLearningWritePolicy:
-        return ActiveLearningSampleSizePolicy(write_size, ideal_size)
-
-
-@dataclass
-class ActiveLearningSampleSizePolicy(ActiveLearningWritePolicy):
-
-    write_size: int
-    ideal_size: int
-
-    dataset_folder_name: str = field(default='active_learning')
-    dataset_file_name: str = field(default='data.csv')
-
-    unsaved_size: float = field(default=0)
-    write_timestamp: float = field(default_factory=lambda: datetime.utcnow().timestamp())
-    current_frame: pl.DataFrame = field(default_factory=lambda: pl.DataFrame())
-
-    async def write(self, data: pl.LazyFrame, model: Model):
-
-        if not model.dataset_store:
-            logger.info(
-                'Found no dataset folder. Therefore, no data will be written to an active learning dataset.'
-            )
-            return
-
-        collected_data = data.collect()
-
-        if self.current_frame.shape[0] == 0:
-            self.current_frame = collected_data
-        else:
-            self.current_frame = self.current_frame.extend(collected_data)
-
-        self.unsaved_size += collected_data.shape[0]
-
-        if self.unsaved_size >= self.write_size or self.current_frame.shape[0] >= self.ideal_size:
-            dataset_subfolder = Path(self.dataset_folder_name) / str(self.write_timestamp)
-            logger.info(f'Writing active learning data to {dataset_subfolder}')
-
-            dataset = model.dataset_store.file_at(dataset_subfolder / self.dataset_file_name)
-            await dataset.write(self.current_frame.write_csv().encode('utf-8'))
-            self.unsaved_size = 0
-
-            if self.current_frame.shape[0] >= self.ideal_size:
-                self.write_timestamp = datetime.utcnow().timestamp()
-                self.current_frame = pl.DataFrame()
diff --git a/aligned/compiler/feature_factory.py b/aligned/compiler/feature_factory.py
index e0db18a3..7e5b6bdd 100644
--- a/aligned/compiler/feature_factory.py
+++ b/aligned/compiler/feature_factory.py
@@ -655,7 +655,7 @@ def as_model_version(self) -> ModelVersion:
 
 
 class CouldBeEntityFeature:
-    def as_entity(self: T) -> T:
+    def as_entity(self: T) -> T:  # type: ignore
         return self.with_tag(StaticFeatureTags.is_entity)
 
 
diff --git a/aligned/compiler/model.py b/aligned/compiler/model.py
index eb1fceca..c8e1ef91 100644
--- a/aligned/compiler/model.py
+++ b/aligned/compiler/model.py
@@ -285,11 +285,12 @@ def join_asof(
 
 
 def resolve_dataset_store(dataset_store: DatasetStore | StorageFileReference) -> DatasetStore:
-    from aligned.schemas.folder import DatasetStore, JsonDatasetStore
+    from aligned.schemas.folder import DatasetStore, JsonDatasetStore, StorageFileSource
 
     if isinstance(dataset_store, DatasetStore):
         return dataset_store
 
+    assert isinstance(dataset_store, StorageFileSource)
     return JsonDatasetStore(dataset_store)
 
 
diff --git a/aligned/feature_store.py b/aligned/feature_store.py
index c4a91693..23a5f191 100644
--- a/aligned/feature_store.py
+++ b/aligned/feature_store.py
@@ -1931,7 +1931,7 @@ async def freshness(self) -> datetime | None:
 
         location = FeatureLocation.feature_view(view.name)
 
-        return (await self.source.freshness_for({location: view.event_timestamp}))[location]
+        return (await self.source.freshness_for({location: view.event_timestamp.as_feature()}))[location]
 
 
 class VectorIndexStore:
diff --git a/aligned/retrival_job.py b/aligned/retrival_job.py
index f573d8e9..10f13170 100644
--- a/aligned/retrival_job.py
+++ b/aligned/retrival_job.py
@@ -946,9 +946,9 @@ def add_additional_features(
         elif isinstance(data, list):
            df = pl.DataFrame(data, schema_overrides=schema).lazy()
         elif isinstance(data, pl.DataFrame):
-            df = data.cast(schema).lazy()
+            df = data.cast(schema).lazy()  # type: ignore
         elif isinstance(data, pl.LazyFrame):
-            df = data.cast(schema)
+            df = data.cast(schema)  # type: ignore
         elif isinstance(data, pd.DataFrame):
            df = pl.from_pandas(data, schema_overrides=schema).lazy()
         else:
@@ -1062,14 +1062,14 @@ def polars_filter_expressions_from(features: list[Feature]) -> list[tuple[pl.Exp
         elif isinstance(constraint, MinLength):
             exprs.append(
                 (
-                    pl.col(feature.name).str.lengths() > constraint.value,
+                    pl.col(feature.name).str.len_chars() > constraint.value,
                     f"MinLength {feature.name} {constraint.value}",
                 )
             )
         elif isinstance(constraint, MaxLength):
             exprs.append(
                 (
-                    pl.col(feature.name).str.lengths() < constraint.value,
+                    pl.col(feature.name).str.len_chars() < constraint.value,
                     f"MaxLength {feature.name} {constraint.value}",
                 )
             )
@@ -2048,7 +2048,7 @@ async def to_lazy_polars(self) -> pl.LazyFrame:
 
             window_data = await self.data_windows(window, data.select(required_features_name), now)
 
-            agg_data = window_data.lazy().groupby(window.group_by_names).agg(agg_expr).collect()
+            agg_data = window_data.lazy().group_by(window.group_by_names).agg(agg_expr).collect()
             data = data.join(agg_data, on=window.group_by_names, how='left')
 
         return data.lazy()
@@ -2324,10 +2324,14 @@ async def to_lazy_polars(self) -> pl.LazyFrame:
 
             if feature.dtype == FeatureType.boolean():
                 df = df.with_columns(pl.col(feature.name).cast(pl.Int8).cast(pl.Boolean))
-            elif (feature.dtype.is_array) or (feature.dtype.is_embedding):
+            elif feature.dtype.is_array:
                 dtype = df.select(feature.name).dtypes[0]
                 if dtype == pl.Utf8:
-                    df = df.with_columns(pl.col(feature.name).str.json_extract(pl.List(pl.Utf8)))
+                    df = df.with_columns(pl.col(feature.name).str.json_decode(pl.List(pl.Utf8)))
+            elif feature.dtype.is_embedding:
+                dtype = df.select(feature.name).dtypes[0]
+                if dtype == pl.Utf8:
+                    df = df.with_columns(pl.col(feature.name).str.json_decode(pl.List(pl.Float64)))
             elif (feature.dtype == FeatureType.json()) or feature.dtype.is_datetime:
                 logger.debug(f'Converting {feature.name} to {feature.dtype.name}')
                 pass
diff --git a/aligned/schemas/transformation.py b/aligned/schemas/transformation.py
index 23b5a8cd..aee128f9 100644
--- a/aligned/schemas/transformation.py
+++ b/aligned/schemas/transformation.py
@@ -1306,33 +1306,33 @@ async def transform_polars(
             case 'day':
                 expr = col.day()
             case 'days':
-                expr = col.days()
+                expr = col.ordinal_day()
             case 'epoch':
                 expr = col.epoch()
             case 'hour':
                 expr = col.hour()
             case 'hours':
-                expr = col.hours()
+                expr = col.total_hours()
             case 'iso_year':
                 expr = col.iso_year()
             case 'microsecond':
                 expr = col.microsecond()
             case 'microseconds':
-                expr = col.microseconds()
+                expr = col.total_microseconds()
             case 'millisecond':
                 expr = col.millisecond()
             case 'milliseconds':
-                expr = col.milliseconds()
+                expr = col.total_milliseconds()
             case 'minute':
                 expr = col.minute()
             case 'minutes':
-                expr = col.minutes()
+                expr = col.total_minutes()
             case 'month':
                 expr = col.month()
             case 'nanosecond':
                 expr = col.nanosecond()
             case 'nanoseconds':
-                expr = col.nanoseconds()
+                expr = col.total_nanoseconds()
             case 'ordinal_day':
                 expr = col.ordinal_day()
             case 'quarter':
@@ -1340,7 +1340,7 @@ async def transform_polars(
             case 'second':
                 expr = col.second()
             case 'seconds':
-                expr = col.seconds()
+                expr = col.total_seconds()
             case 'week':
                 expr = col.week()
             case 'weekday':
@@ -1540,7 +1540,7 @@ async def transform_polars(
     ) -> pl.LazyFrame | pl.Expr:
         collected = df.collect()
         pandas_column = collected.select(self.key).to_pandas()
-        transformed = await self.transform_pandas(pandas_column)
+        transformed = await self.transform_pandas(pandas_column, store)
         return collected.with_columns(pl.Series(transformed).alias(alias)).lazy()
 
     # @staticmethod
@@ -1980,7 +1980,7 @@ def grayscale(images) -> pl.Series:
                 [np.mean(image, axis=2) if len(image.shape) == 3 else image for image in images.to_list()]
             )
 
-        return pl.col(self.image_key).map(grayscale).alias(alias)
+        return pl.col(self.image_key).map_batches(grayscale).alias(alias)
 
 
 @dataclass
@@ -2087,7 +2087,7 @@ class ConcatStringAggregation(Transformation, PsqlTransformation, RedshiftTransf
     dtype = FeatureType.string()
 
     async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series:
-        pdf = await self.transform_polars(pl.from_pandas(df).lazy(), self.name)
+        pdf = await self.transform_polars(pl.from_pandas(df).lazy(), self.name, store)
 
         assert isinstance(pdf, pl.LazyFrame)
         return pdf.collect().to_pandas()[self.name]  # type: ignore
@@ -2365,7 +2365,7 @@ async def transform_polars(
 
         return df.with_columns(
             pl.col(self.key)
-            .apply(lambda x: s3.signed_download_url(x, max_age=self.max_age_seconds))
+            .map_elements(lambda x: s3.signed_download_url(x, max_age=self.max_age_seconds))
             .alias(alias)
         )
 
@@ -2381,7 +2381,7 @@ class StructField(Transformation):
     async def transform_pandas(self, df: pd.DataFrame, store: ContractStore) -> pd.Series:
         data = pl.from_pandas(df).lazy()
-        tran = await self.transform_polars(data, 'feature')
+        tran = await self.transform_polars(data, 'feature', store)
 
         if isinstance(tran, pl.LazyFrame):
             return tran.collect().to_pandas()['feature']  # type: ignore
 
@@ -2392,7 +2392,7 @@ async def transform_polars(
         self, df: pl.LazyFrame, alias: str, store: ContractStore
     ) -> pl.LazyFrame | pl.Expr:
         if df.schema[self.key].is_(pl.Utf8):
-            return await JsonPath(self.key, f'$.{self.field}').transform_polars(df, alias)
+            return await JsonPath(self.key, f'$.{self.field}').transform_polars(df, alias, store)
         else:
             return pl.col(self.key).struct.field(self.field).alias(alias)
 
diff --git a/aligned/sources/azure_blob_storage.py b/aligned/sources/azure_blob_storage.py
index c17d442a..f535bb41 100644
--- a/aligned/sources/azure_blob_storage.py
+++ b/aligned/sources/azure_blob_storage.py
@@ -6,6 +6,7 @@
 from datetime import datetime
 from io import BytesIO
 from pathlib import Path
+from typing import TYPE_CHECKING
 
 import polars as pl
 from aligned.data_source.batch_data_source import CodableBatchDataSource, ColumnFeatureMappable
@@ -31,13 +32,8 @@
 from httpx import HTTPStatusError
 from aligned.lazy_imports import pandas as pd
 
-try:
-    from azure.storage.blob import BlobServiceClient  # type: ignore
-except ModuleNotFoundError:
-
-    class BlobServiceClient:
-        pass
-
+if TYPE_CHECKING:
+    from azure.storage.blob import BlobServiceClient
 
 logger = logging.getLogger(__name__)
 
diff --git a/aligned/worker.py b/aligned/worker.py
index 122c14ad..337e5b3c 100644
--- a/aligned/worker.py
+++ b/aligned/worker.py
@@ -2,19 +2,16 @@
 
 import asyncio
 import logging
-import timeit
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING
 
 from prometheus_client import Counter, Histogram
 
-from aligned.active_learning.selection import ActiveLearningMetric, ActiveLearningSelection
-from aligned.active_learning.write_policy import ActiveLearningWritePolicy
 from aligned.data_source.batch_data_source import ColumnFeatureMappable
 from aligned.data_source.stream_data_source import StreamDataSource
 from aligned.feature_source import WritableFeatureSource
-from aligned.feature_store import ContractStore, FeatureViewStore, ModelFeatureStore
+from aligned.feature_store import ContractStore, FeatureSourceable, FeatureViewStore
 from aligned.retrival_job import RetrivalJob, StreamAggregationJob
 from aligned.sources.local import AsRepoDefinition
 from aligned.streams.interface import ReadableStream
 
@@ -34,14 +31,6 @@
 
 # Very experimental, so can contain a lot of bugs
 
-@dataclass
-class ActiveLearningConfig:
-    metric: ActiveLearningMetric
-    selection: ActiveLearningSelection
-    write_policy: ActiveLearningWritePolicy
-    model_names: list[str] | None = None
-
-
 @dataclass
 class StreamWorker:
 
@@ -49,7 +38,6 @@ class StreamWorker:
     sink_source: WritableFeatureSource
     views_to_process: set[str] | None = field(default=None)
     should_prune_unused_features: bool = field(default=False)
-    active_learning_configs: list[ActiveLearningConfig] = field(default_factory=list)
     metric_logging_port: int | None = field(default=None)
     read_timestamps: dict[str, str] = field(default_factory=dict)
 
@@ -97,23 +85,6 @@ def from_object(repo: Path, file: Path, obj: str) -> StreamWorker:
         except ModuleNotFoundError:
             raise StreamWorkerNotFound(module_path)
 
-    def generate_active_learning_dataset(
-        self,
-        metric: ActiveLearningMetric | None = None,
-        selection: ActiveLearningSelection | None = None,
-        write_policy: ActiveLearningWritePolicy | None = None,
-        model_names: list[str] | None = None,
-    ) -> StreamWorker:
-        self.active_learning_configs.append(
-            ActiveLearningConfig(
-                metric=metric or ActiveLearningMetric.max_confidence(),
-                selection=selection or ActiveLearningSelection.under_threshold(0.5),
-                write_policy=write_policy or ActiveLearningWritePolicy.sample_size(10, 1000),
-                model_names=model_names,
-            )
-        )
-        return self
-
     def read_from_timestamps(self, timestamps: dict[str, str]) -> StreamWorker:
         self.read_timestamps = timestamps
         return self
@@ -179,6 +150,8 @@ def feature_views_by_topic(self, store: ContractStore) -> dict[str, list[Feature
     async def start(self) -> None:
         from prometheus_client import start_http_server
 
+        assert isinstance(self.sink_source, FeatureSourceable)
+
         store = await self.repo_definition.feature_store()
         store = store.with_source(self.sink_source)
 
@@ -194,24 +167,6 @@ async def start(self) -> None:
             )
             processes.append(process(stream_consumer, topic_name, process_views))
 
-        for active_learning_config in self.active_learning_configs:
-
-            if not active_learning_config.model_names:
-                continue
-
-            for model_name in set(active_learning_config.model_names):
-                model = store.models[model_name]
-                source = model.predictions_view.stream_source
-
-                if not source:
-                    logger.debug(f'Skipping to setup active learning set for {model_name}')
-                else:
-                    processes.append(
-                        process_predictions(
-                            source.consumer(), store.model(model_name), active_learning_config
-                        )
-                    )
-
         if self.metric_logging_port:
             start_http_server(self.metric_logging_port)
 
@@ -221,40 +176,6 @@ async def start(self) -> None:
         await asyncio.gather(*processes)
 
 
-async def process_predictions(
-    stream_source: ReadableStream,
-    model: ModelFeatureStore,
-    active_learning_config: ActiveLearningConfig | None,
-) -> None:
-    from aligned.active_learning.job import ActiveLearningJob
-
-    if not active_learning_config:
-        logger.debug('No active learning config found, will not listen to predictions')
-        return
-
-    while True:
-        records = await stream_source.read()
-
-        if not records:
-            continue
-        start_time = timeit.default_timer()
-
-        request = model.model.request_all_predictions.needed_requests[0]
-        job = RetrivalJob.from_dict(records, request).ensure_types([request])  # type: ignore
-        job = ActiveLearningJob(
-            job,
-            model.model,
-            active_learning_config.metric,
-            active_learning_config.selection,
-            active_learning_config.write_policy,
-        )
-        _ = await job.to_lazy_polars()
-
-        logger.debug(
-            f'Processing {len(records)} predictions in {timeit.default_timer() - start_time} seconds'
-        )
-
-
 def stream_job(values: list[dict], feature_view: FeatureViewStore) -> RetrivalJob:
     from aligned import FileSource
 
diff --git a/pyproject.toml b/pyproject.toml
index 0637ba27..b635c011 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "aligned"
-version = "0.0.112"
+version = "0.0.113"
 description = "A data managment and lineage tool for ML applications."
 authors = ["Mats E. Mollestad "]
 license = "Apache-2.0"
diff --git a/scripts/generate_snippets.py b/scripts/generate_snippets.py
deleted file mode 100644
index a2891dac..00000000
--- a/scripts/generate_snippets.py
+++ /dev/null
@@ -1,65 +0,0 @@
-from dataclasses import asdict, dataclass
-from pathlib import Path
-
-import polars as pl
-
-from aligned.compiler.repo_reader import find_files
-
-
-def generate_snippets():
-    root_path = Path().resolve()
-    markdown_files = find_files(root_path, file_extension='md')
-
-    source_code_folder = root_path / 'aligned'
-    source_code_files = find_files(source_code_folder)
-
-    all_snippets: list[Snippet] = []
-    for file in markdown_files:
-        all_snippets.extend(generate_snippet_from_markdown_file(file, root_path))
-
-    for file in source_code_files:
-        all_snippets.extend(generate_snippet_from_python_file(file, root_path))
-
-    df = pl.DataFrame([asdict(snippet) for snippet in all_snippets]).with_row_count(name='id')
-    df.write_csv('snippets.csv', sep=';')
-
-
-@dataclass
-class Snippet:
-    source_file: Path
-    version_tag: str
-    snippet: str
-
-
-def generate_snippet_from_markdown_file(file: Path, root_path: Path) -> list[Snippet]:
-    file_content = file.read_text()
-    sections = file_content.split('\n#')
-    return [
-        Snippet(source_file=file.relative_to(root_path).as_posix(), version_tag='beta', snippet=section)
-        for section in sections
-    ]
-
-
-def generate_snippet_from_python_file(file: Path, root_path: Path) -> list[Snippet]:
-    file_content = file.read_text()
-
-    dataclass_suffix = '@dataclass\n'
-    classes = file_content.split('class ')
-    if len(classes) == 1:
-        return []
-    # The first index will not contain any classes.
-    # Therefore, we can remove it
-    classes = classes[1:]
-
-    return [
-        Snippet(
-            source_file=file.relative_to(root_path).as_posix(),
-            version_tag='beta',
-            snippet=f'class {snippet.removesuffix(dataclass_suffix).strip()}',
-        )
-        for snippet in classes
-    ]
-
-
-if __name__ == '__main__':
-    generate_snippets()