
chore: minor usability improvements
MatsMoll committed May 21, 2023
1 parent e452a21 commit 941f39b
Showing 10 changed files with 134 additions and 45 deletions.
README.md (7 changes: 4 additions & 3 deletions)
@@ -1,8 +1,9 @@
 # Aligned
 
-Aligned makes it easier to reduce visibility debt, and data depencency debts, as described in [Sculley et al. [2015]](https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf).
+Aligned helps improve ML system visibility, while also reducing technical and data debt, as described in [Sculley et al. [2015]](https://papers.nips.cc/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf).
 
-Want to look at examples of how to use `aligned`? View the [`MatsMoll/aligned-example` repo](https://github.com/MatsMoll/aligned-example).
+Want to look at examples of how to use `aligned`?
+View the [`MatsMoll/aligned-example` repo](https://github.com/MatsMoll/aligned-example).
 
 This is done by providing a new, innovative way of describing feature transformations and data flow in ML systems, while also collecting dependency metadata that would otherwise be too inconvenient and error-prone to write out by hand.

@@ -27,7 +28,7 @@ As a result, loading model features is as easy as:
 
 ```python
 entities = {"passenger_id": [1, 2, 3, 4]}
-await store.model("titanic").features_for(entities).as_pandas()
+await store.model("titanic").features_for(entities).to_pandas()
 ```
 
 Aligned is still in active development, so changes are likely.
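Note: a minimal usage sketch of the renamed call, assuming the `FeatureStore.from_dir` loader and a repo with compiled definitions in the working directory; the model name follows the README example above.

```python
import asyncio

from aligned import FeatureStore


async def main() -> None:
    store = await FeatureStore.from_dir(".")  # assumed loader; adjust to how the store is built
    entities = {"passenger_id": [1, 2, 3, 4]}
    # `to_pandas()` is the name after this commit; it replaces `as_pandas()`.
    df = await store.model("titanic").features_for(entities).to_pandas()
    print(df.head())


asyncio.run(main())
```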
aligned/cli.py (61 changes: 33 additions & 28 deletions)
@@ -45,36 +45,41 @@ def load_envs(path: Path) -> None:
 
 
 def setup_logger():
+    from importlib.util import find_spec
     from logging.config import dictConfig
 
     handler = 'console'
-    log_format = '%(levelname)s:\t\b%(asctime)s %(name)s:%(lineno)d [%(correlation_id)s] %(message)s'
-    dictConfig(
-        {
-            'version': 1,
-            'disable_existing_loggers': False,
-            'filters': {
-                'correlation_id': {
-                    '()': 'asgi_correlation_id.CorrelationIdFilter',
-                    'uuid_length': 16,
-                },
-            },
-            'formatters': {
-                'console': {'class': 'logging.Formatter', 'datefmt': '%H:%M:%S', 'format': log_format}
-            },
-            'handlers': {
-                'console': {
-                    'class': 'logging.StreamHandler',
-                    'filters': ['correlation_id'],
-                    'formatter': 'console',
-                }
-            },
-            'loggers': {
-                # project
-                '': {'handlers': [handler], 'level': 'INFO', 'propagate': True},
-            },
-        }
-    )
+    log_format = '%(levelname)s:\t\b%(asctime)s %(name)s:%(lineno)d %(message)s'
+    configs = {
+        'version': 1,
+        'disable_existing_loggers': False,
+        'filters': {},
+        'formatters': {
+            'console': {'class': 'logging.Formatter', 'datefmt': '%H:%M:%S', 'format': log_format}
+        },
+        'handlers': {
+            'console': {
+                'class': 'logging.StreamHandler',
+                'filters': [],
+                'formatter': 'console',
+            }
+        },
+        'loggers': {
+            # project
+            '': {'handlers': [handler], 'level': 'INFO', 'propagate': True},
+        },
+    }
+
+    if find_spec('asgi_correlation_id'):
+        log_format = '%(levelname)s:\t\b%(asctime)s %(name)s:%(lineno)d [%(correlation_id)s] %(message)s'
+        configs['filters']['correlation_id'] = {
+            '()': 'asgi_correlation_id.CorrelationIdFilter',
+            'uuid_length': 16,
+        }
+        configs['handlers']['console']['filters'].append('correlation_id')
+        configs['formatters']['console']['format'] = log_format
+
+    dictConfig(configs)
 
 
 @click.group()
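As a sanity check, a minimal sketch of the reworked setup (assuming `setup_logger` stays importable from `aligned.cli`): the config now applies cleanly whether or not `asgi_correlation_id` is installed.

```python
import logging

from aligned.cli import setup_logger

setup_logger()
# With asgi_correlation_id installed, records also carry a [correlation_id] field.
logging.getLogger(__name__).info('logging configured')
```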
@@ -434,7 +439,7 @@ async def profile(repo_path: str, reference_file: str, env_file: str, output: st
     Path(output).write_bytes(results.to_json().encode('utf-8'))
 
 
-@cli.command('create_indexes')
+@cli.command('create-indexes')
 @coro
 @click.option(
     '--repo-path',
@@ -485,7 +490,7 @@ async def create_indexes(repo_path: str, reference_file: str, env_file: str) ->
             continue
 
         for index in view.indexes:
-            click.echo(f'Creating indexes for: {feature_view_name}, named {index.name}')
+            click.echo(f'Creating indexes for: {feature_view_name}')
             await index.storage.create_index(index)


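A hedged sketch of calling the renamed command, using click's test runner so it is self-contained; the option values are illustrative and the remaining options fall back to their declared defaults.

```python
from click.testing import CliRunner

from aligned.cli import cli  # the click group the commands above are registered on

runner = CliRunner()
# Hyphenated after this commit: `create-indexes`, not `create_indexes`.
result = runner.invoke(cli, ['create-indexes', '--repo-path', '.'])
print(result.exit_code, result.output)
```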
aligned/compiler/feature_factory.py (5 changes: 4 additions & 1 deletion)
@@ -838,7 +838,10 @@ def append(self, other: FeatureFactory | str) -> String:
         from aligned.compiler.transformation_factory import AppendStrings
 
         feature = String()
-        feature.transformation = AppendStrings(self, other)
+        if isinstance(other, FeatureFactory):
+            feature.transformation = AppendStrings(self, other)
+        else:
+            feature.transformation = AppendStrings(self, LiteralValue.from_value(other))
         return feature


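What the new branch enables, as a short sketch; importing `String` from the package root is an assumption here.

```python
from aligned import String

first_name = String()
last_name = String()

# FeatureFactory path: append another feature, as before.
full_name = first_name.append(last_name)

# Literal path: a plain `str` is now wrapped with `LiteralValue.from_value(...)`
# instead of being handed to `AppendStrings` unwrapped.
tagged_name = first_name.append(' (passenger)')
```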
aligned/compiler/model.py (2 changes: 2 additions & 0 deletions)
@@ -140,6 +140,7 @@ def compile(cls) -> ModelSchema:
 
             regression_targets[var_name] = target_feature
             inference_view.regression_targets.add(target_feature)
+            inference_view.features.add(target_feature.feature)
         elif isinstance(feature, EventTimestamp):
             inference_view.event_timestamp = feature.event_timestamp()
@@ -187,6 +188,7 @@ def compile(cls) -> ModelSchema:
                 depth=1,
             )
             inference_view.derived_features.add(arg_max_feature)
+
         if not probability_features:
             inference_view.features.update(
                 {target.feature for target in inference_view.classification_targets}
aligned/compiler/repo_reader.py (16 changes: 10 additions & 6 deletions)
@@ -49,17 +49,21 @@ def super_classes_in(obj: Any) -> set[str]:
     return super_class_names
 
 
-def python_files(repo_path: Path, ignore_path: Path | None = None) -> list[Path]:
+def find_files(repo_path: Path, ignore_path: Path | None = None, file_extension: str = 'py') -> list[Path]:
     files = {
         path.resolve()
-        for path in repo_path.resolve().glob('**/*.py')
-        if path.is_file() and '__init__.py' != path.name
+        for path in repo_path.resolve().glob(f'**/*.{file_extension}')
+        if path.is_file()
+        and '__init__.py' != path.name
+        and not any(part.startswith('.') for part in path.parts)
     }
     if ignore_path:
         ignore_files = {
             path.resolve()
-            for path in ignore_path.glob('**/*.py')
-            if path.is_file() and '__init__.py' != path.name
+            for path in ignore_path.glob(f'**/*.{file_extension}')
+            if path.is_file()
+            and '__init__.py' != path.name
+            and not any(part.startswith('.') for part in path.parts)
         }
         files -= ignore_files
     return sorted(files)
@@ -88,7 +92,7 @@ async def definition_from_path(repo_path: Path) -> RepoDefinition:
 
     feature_view_names: dict[str, str] = {}
 
-    for py_file in python_files(repo_path):
+    for py_file in find_files(repo_path):
         imports = imports_for(py_file)
 
         module_path = path_to_py_module(py_file, repo_path)
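A small usage sketch of the generalized helper, grounded in the signature above: the default still targets Python files, while `file_extension` opens it up to other file types.

```python
from pathlib import Path

from aligned.compiler.repo_reader import find_files

root = Path('.')
py_files = find_files(root)                       # default: **/*.py, skipping __init__.py and dot-directories
md_files = find_files(root, file_extension='md')  # the new parameter, e.g. markdown
print(f'{len(py_files)} python files, {len(md_files)} markdown files')
```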
aligned/feature_store.py (7 changes: 5 additions & 2 deletions)
@@ -157,7 +157,10 @@ def from_definition(repo: RepoDefinition, feature_source: FeatureSource | None =
             combined_feature_views=combined_feature_views,
             models={model.name: model for model in repo.models},
             feature_source=BatchFeatureSource(
-                {FeatureLocation.feature_view(fv.name).identifier: fv for fv in repo.feature_views}
+                {
+                    FeatureLocation.feature_view(fv.name).identifier: fv.batch_data_source
+                    for fv in repo.feature_views
+                }
             ),
         )
 
@@ -404,7 +407,7 @@ def with_source(self, source: FeatureSource | FeatureSourceFactory | None = None
         else:
             feature_source = source or BatchFeatureSource(
                 {
-                    FeatureLocation.feature_view(view.name).identifier: view
+                    FeatureLocation.feature_view(view.name).identifier: view.batch_data_source
                    for view in set(self.feature_views.values())
                 }
             )
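The intent of both hunks, sketched as a helper; the import paths are assumptions, and `views` stands in for the compiled feature views.

```python
from aligned.feature_source import BatchFeatureSource  # assumed import path
from aligned.schemas.feature import FeatureLocation    # assumed import path


def batch_source_for(views) -> BatchFeatureSource:
    # Map each view's location identifier to its batch *data source*; passing the
    # compiled view itself is the mismatch these two hunks correct.
    return BatchFeatureSource(
        {FeatureLocation.feature_view(view.name).identifier: view.batch_data_source for view in views}
    )
```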
aligned/redis/stream.py (8 changes: 7 additions & 1 deletion)
@@ -1,6 +1,12 @@
 from dataclasses import dataclass
 
-from redis.asyncio import Redis  # type: ignore
+try:
+    from redis.asyncio import Redis  # type: ignore
+except ModuleNotFoundError:
+
+    class Redis:
+        async def xread(self, streams: dict[str, str], count: int, block: int):
+            pass
 
 
 @dataclass
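The same optional-dependency pattern in isolation, as a runnable sketch; the `RedisStream` consumer below is hypothetical, only the import fallback mirrors this diff. A stub with the same method surface keeps the module importable, and its annotations resolvable, when `redis` is absent.

```python
from dataclasses import dataclass

try:
    from redis.asyncio import Redis  # the real client, when installed
except ModuleNotFoundError:

    class Redis:  # no-op stand-in so the annotation below still resolves
        async def xread(self, streams: dict[str, str], count: int, block: int):
            pass


@dataclass
class RedisStream:  # hypothetical consumer, for illustration only
    client: Redis

    async def read_batch(self):
        # Wait up to 1 second for up to 10 new entries on the stream.
        return await self.client.xread({'my-stream': '$'}, count=10, block=1000)
```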
pyproject.toml (6 changes: 3 additions & 3 deletions)
@@ -5,8 +5,8 @@ description = "A scalable feature store that makes it easy to align offline and
 authors = ["Mats E. Mollestad <[email protected]>"]
 license = "Apache-2.0"
 readme = "README.md"
-homepage = "https://github.com/otovo/aladdin"
-repository = "https://github.com/otovo/aladdin"
+homepage = "https://github.com/otovo/aligned"
+repository = "https://github.com/otovo/aligned"
 keywords = [
     'python',
     'typed',
@@ -53,7 +53,7 @@ pyarrow = "^8.0.0"
 Jinja2 = "^3.1.2"
 nest-asyncio = "^1.5.5"
 asgi-correlation-id = { version = "^3.0.0", optional = true }
-pandera = {version = "^0.13.3", optional = true}
+pandera = { version = "^0.13.3", optional = true}
 httpx = "^0.23.0"
 polars = { version = "^0.15.6", extras = ["all"] }
 connectorx = { version = "^0.3.1", optional = true }
scripts/generate_snippets.py (65 changes: 65 additions & 0 deletions)
@@ -0,0 +1,65 @@
+from dataclasses import asdict, dataclass
+from pathlib import Path
+
+import polars as pl
+
+from aligned.compiler.repo_reader import find_files
+
+
+def generate_snippets():
+    root_path = Path().resolve()
+    markdown_files = find_files(root_path, file_extension='md')
+
+    source_code_folder = root_path / 'aligned'
+    source_code_files = find_files(source_code_folder)
+
+    all_snippets: list[Snippet] = []
+    for file in markdown_files:
+        all_snippets.extend(generate_snippet_from_markdown_file(file, root_path))
+
+    for file in source_code_files:
+        all_snippets.extend(generate_snippet_from_python_file(file, root_path))
+
+    df = pl.DataFrame([asdict(snippet) for snippet in all_snippets]).with_row_count(name='id')
+    df.write_csv('snippets.csv', sep=';')
+
+
+@dataclass
+class Snippet:
+    source_file: str  # posix path relative to the repo root
+    version_tag: str
+    snippet: str
+
+
+def generate_snippet_from_markdown_file(file: Path, root_path: Path) -> list[Snippet]:
+    file_content = file.read_text()
+    sections = file_content.split('\n#')
+    return [
+        Snippet(source_file=file.relative_to(root_path).as_posix(), version_tag='beta', snippet=section)
+        for section in sections
+    ]
+
+
+def generate_snippet_from_python_file(file: Path, root_path: Path) -> list[Snippet]:
+    file_content = file.read_text()
+
+    dataclass_suffix = '@dataclass\n'
+    classes = file_content.split('class ')
+    if len(classes) == 1:
+        return []
+    # The first chunk precedes the first class definition,
+    # so it can be dropped.
+    classes = classes[1:]
+
+    return [
+        Snippet(
+            source_file=file.relative_to(root_path).as_posix(),
+            version_tag='beta',
+            snippet=f'class {snippet.removesuffix(dataclass_suffix).strip()}',
+        )
+        for snippet in classes
+    ]
+
+
+if __name__ == '__main__':
+    generate_snippets()
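To sanity-check the script's output, a hedged read-back sketch; it assumes the script was run from the repo root, and `sep` matches the polars version pinned in pyproject.toml.

```python
import polars as pl

df = pl.read_csv('snippets.csv', sep=';')
# One row per snippet: the generated `id` row count plus the Snippet fields.
print(df.select(['id', 'source_file', 'version_tag']).head())
```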
