
Commit 6b2643a
fix: Update ScyllaDB required dependencies for compression (#169)
* fix: Update ScyllaDB required dependencies

---------

Co-authored-by: Bhargav Dodla <[email protected]>
EXPEbdodla and Bhargav Dodla authored Feb 11, 2025
1 parent afde88c commit 6b2643a
Showing 32 changed files with 836 additions and 790 deletions.
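
Note: the ScyllaDB dependency changes themselves (the subject of this commit) sit in files that are not expanded in this view; the hunks below are mostly formatting churn that came along with the dependency bump. For context, a minimal sketch of why compression support is an install-time dependency for the Python `cassandra-driver`/`scylla-driver` (illustrative only, assuming the standard `Cluster(compression=...)` API; not code from this commit):

```python
# Illustrative sketch, not part of this diff: the driver negotiates transport
# compression only if the codec package (`lz4` or `python-snappy`) is
# installed -- the kind of required dependency this commit updates.
from cassandra.cluster import Cluster  # scylla-driver exposes the same API

cluster = Cluster(
    contact_points=["127.0.0.1"],
    compression="lz4",  # or "snappy"; fails at connect time if the codec is missing
)
session = cluster.connect()
print(session.execute("SELECT release_version FROM system.local").one())
```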
sdk/python/feast/driver_test_data.py (2 changes: 1 addition & 1 deletion)

@@ -2,10 +2,10 @@
 import itertools
 from datetime import timedelta, timezone
 from enum import Enum
+from zoneinfo import ZoneInfo

 import numpy as np
 import pandas as pd
-from zoneinfo import ZoneInfo

 from feast.infra.offline_stores.offline_utils import (
     DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
sdk/python/feast/errors.py (2 changes: 1 addition & 1 deletion)

@@ -32,7 +32,7 @@ def __str__(self) -> str:

     def __repr__(self) -> str:
         if hasattr(self, "__overridden_message__"):
-            return f"{type(self).__name__}('{getattr(self,'__overridden_message__')}')"
+            return f"{type(self).__name__}('{getattr(self, '__overridden_message__')}')"
         return super().__repr__()

     def to_error_detail(self) -> str:
sdk/python/feast/feature_store.py (9 changes: 4 additions & 5 deletions)

@@ -838,8 +838,7 @@ def apply(
         views_to_update = [
             ob
             for ob in objects
-            if
-            (
+            if (
                 # BFVs are not handled separately from FVs right now.
                 (isinstance(ob, FeatureView) or isinstance(ob, BatchFeatureView))
                 and not isinstance(ob, StreamFeatureView)

@@ -1854,9 +1853,9 @@ def write_logged_features(
         if not isinstance(source, FeatureService):
             raise ValueError("Only feature service is currently supported as a source")

-        assert (
-            source.logging_config is not None
-        ), "Feature service must be configured with logging config in order to use this functionality"
+        assert source.logging_config is not None, (
+            "Feature service must be configured with logging config in order to use this functionality"
+        )

         assert isinstance(logs, (pa.Table, Path))

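Most hunks in this commit repeat one mechanical change, presumably from a formatter version bump pulled in by the dependency update: the asserted condition now stays on one line and the message is parenthesized instead. A standalone before/after sketch (hypothetical `command` payload, not code from this diff):

```python
command = {"start_date": "2025-02-11"}  # hypothetical payload for the sketch

# Old style: the condition itself is wrapped, burying the invariant.
assert (
    "start_date" in command
), "start_date is a mandatory parameter"

# New style: the condition reads in one line; only the message wraps.
assert "start_date" in command, (
    "start_date is a mandatory parameter"
)
```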
@@ -277,7 +277,7 @@ def _print_pod_logs(self, job_id, feature_view, offset=0):
                 label_selector=f"job-name={job_id}",
             ).items
         for i, pod in enumerate(pods_list):
-            logger.info(f"Logging output for {feature_view.name} pod {offset+i}")
+            logger.info(f"Logging output for {feature_view.name} pod {offset + i}")
             try:
                 logger.info(
                     self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
sdk/python/feast/infra/materialization/snowflake_engine.py (25 changes: 13 additions & 12 deletions)

@@ -202,9 +202,9 @@ def __init__(
         online_store: OnlineStore,
         **kwargs,
     ):
-        assert (
-            repo_config.offline_store.type == "snowflake.offline"
-        ), "To use SnowflakeMaterializationEngine, you must use Snowflake as an offline store."
+        assert repo_config.offline_store.type == "snowflake.offline", (
+            "To use SnowflakeMaterializationEngine, you must use Snowflake as an offline store."
+        )

         super().__init__(
             repo_config=repo_config,

@@ -237,10 +237,11 @@ def _materialize_one(
         project: str,
         tqdm_builder: Callable[[int], tqdm],
     ):
-        assert (
-            isinstance(feature_view, BatchFeatureView)
-            or isinstance(feature_view, FeatureView)
-        ), "Snowflake can only materialize FeatureView & BatchFeatureView feature view types."
+        assert isinstance(feature_view, BatchFeatureView) or isinstance(
+            feature_view, FeatureView
+        ), (
+            "Snowflake can only materialize FeatureView & BatchFeatureView feature view types."
+        )

         entities = []
         for entity_name in feature_view.entities:

@@ -410,7 +411,7 @@ def generate_snowflake_materialization_query(
             {serial_func.upper()}({entity_names}, {entity_data}, {entity_types}) AS "entity_key",
             {features_str},
             "{feature_view.batch_source.timestamp_field}"
-            {fv_created_str if fv_created_str else ''}
+            {fv_created_str if fv_created_str else ""}
         FROM (
             {fv_latest_mapped_values_sql}
         )

@@ -450,7 +451,7 @@ def materialize_to_snowflake_online_store(
                 "feature_name",
                 "feature_value" AS "value",
                 "{feature_view.batch_source.timestamp_field}" AS "event_ts"
-                {fv_created_str + ' AS "created_ts"' if fv_created_str else ''}
+                {fv_created_str + ' AS "created_ts"' if fv_created_str else ""}
             FROM (
                 {materialization_sql}
             )

@@ -462,16 +463,16 @@
                 online_table."feature_name" = latest_values."feature_name",
                 online_table."value" = latest_values."value",
                 online_table."event_ts" = latest_values."event_ts"
-                {',online_table."created_ts" = latest_values."created_ts"' if fv_created_str else ''}
+                {',online_table."created_ts" = latest_values."created_ts"' if fv_created_str else ""}
             WHEN NOT MATCHED THEN
-                INSERT ("entity_feature_key", "entity_key", "feature_name", "value", "event_ts" {', "created_ts"' if fv_created_str else ''})
+                INSERT ("entity_feature_key", "entity_key", "feature_name", "value", "event_ts" {', "created_ts"' if fv_created_str else ""})
                 VALUES (
                     latest_values."entity_feature_key",
                     latest_values."entity_key",
                     latest_values."feature_name",
                     latest_values."value",
                     latest_values."event_ts"
-                    {',latest_values."created_ts"' if fv_created_str else ''}
+                    {',latest_values."created_ts"' if fv_created_str else ""}
                 )
             """

@@ -110,8 +110,8 @@ def pull_latest_from_table_or_query(
             SELECT {field_string},
                 ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
             FROM {from_expression}
-            WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date.strftime('%Y-%m-%d %H:%M:%S')}' AND TIMESTAMP '{end_date.strftime('%Y-%m-%d %H:%M:%S')}'
-            {"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''}
+            WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date.strftime("%Y-%m-%d %H:%M:%S")}' AND TIMESTAMP '{end_date.strftime("%Y-%m-%d %H:%M:%S")}'
+            {"AND " + date_partition_column + " >= '" + start_date.strftime("%Y-%m-%d") + "' AND " + date_partition_column + " <= '" + end_date.strftime("%Y-%m-%d") + "' " if date_partition_column != "" and date_partition_column is not None else ""}
         )
         WHERE _feast_row = 1
         """

@@ -151,7 +151,7 @@ def pull_all_from_table_or_query(
             SELECT {field_string}
             FROM {from_expression}
             WHERE {timestamp_field} BETWEEN TIMESTAMP '{start_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}' AND TIMESTAMP '{end_date.astimezone(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]}'
-            {"AND "+date_partition_column+" >= '"+start_date.strftime('%Y-%m-%d')+"' AND "+date_partition_column+" <= '"+end_date.strftime('%Y-%m-%d')+"' " if date_partition_column != "" and date_partition_column is not None else ''}
+            {"AND " + date_partition_column + " >= '" + start_date.strftime("%Y-%m-%d") + "' AND " + date_partition_column + " <= '" + end_date.strftime("%Y-%m-%d") + "' " if date_partition_column != "" and date_partition_column is not None else ""}
         """

         return AthenaRetrievalJob(
@@ -156,7 +156,7 @@ def query_generator() -> Iterator[str]:
         # Hack for query_context.entity_selections to support uppercase in columns
         for context in query_context_dict:
             context["entity_selections"] = [
-                f""""{entity_selection.replace(' AS ', '" AS "')}\""""
+                f""""{entity_selection.replace(" AS ", '" AS "')}\""""
                 for entity_selection in context["entity_selections"]
             ]

@@ -370,7 +370,7 @@ def build_point_in_time_query(
         final_output_feature_names.extend(
             [
                 (
-                    f'{fv["name"]}__{fv["field_mapping"].get(feature, feature)}'
+                    f"{fv['name']}__{fv['field_mapping'].get(feature, feature)}"
                     if full_feature_names
                     else fv["field_mapping"].get(feature, feature)
                 )
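
The quote churn in these hunks is the updated formatter normalizing quotes inside f-string replacement fields, which it can now reformat (on Python 3.12+, PEP 701 even allows reusing the enclosing quote character). A runnable sketch of the equivalence, using hypothetical values:

```python
# Illustrative: only the quoting changes; the two f-strings are equivalent.
fv = {"name": "driver_stats", "field_mapping": {"trips": "trip_count"}}
feature = "trips"

# Before: the single-quoted f-string forced double quotes in the expression.
old_style = f'{fv["name"]}__{fv["field_mapping"].get(feature, feature)}'

# After: double quotes outside, with the inner quotes flipped to single.
new_style = f"{fv['name']}__{fv['field_mapping'].get(feature, feature)}"

assert old_style == new_style == "driver_stats__trip_count"
```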
@@ -65,8 +65,8 @@ class JWTAuthModel(FeastConfigBaseModel):


 class CertificateAuthModel(FeastConfigBaseModel):
-    cert: FilePath = Field(default=None, alias="cert-file")
-    key: FilePath = Field(default=None, alias="key-file")
+    cert: Optional[FilePath] = Field(default=None, alias="cert-file")
+    key: Optional[FilePath] = Field(default=None, alias="key-file")


 CLASSES_BY_AUTH_TYPE = {
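
Unlike the surrounding formatting churn, this hunk is a typing fix: with a bare `FilePath` annotation, pydantic rejects an explicit `None` even though the default is `None`. A minimal sketch of the difference, assuming pydantic v2 and plain `BaseModel` standing in for Feast's `FeastConfigBaseModel`:

```python
from typing import Optional

from pydantic import BaseModel, Field, FilePath, ValidationError


class Strict(BaseModel):
    cert: FilePath = Field(default=None, alias="cert-file")


class Relaxed(BaseModel):
    cert: Optional[FilePath] = Field(default=None, alias="cert-file")


# An explicit None fails against the bare FilePath annotation...
try:
    Strict(**{"cert-file": None})
except ValidationError as exc:
    print("rejected:", exc.errors()[0]["type"])

# ...but validates once the annotation is Optional[FilePath].
print(Relaxed(**{"cert-file": None}))  # cert=None
```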
sdk/python/feast/infra/offline_stores/dask.py (4 changes: 1 addition & 3 deletions)

@@ -186,9 +186,7 @@ def evaluate_historical_retrieval():
             ):
                 # Make sure all event timestamp fields are tz-aware. We default tz-naive fields to UTC
                 entity_df_with_features[entity_df_event_timestamp_col] = (
-                    entity_df_with_features[
-                        entity_df_event_timestamp_col
-                    ].apply(
+                    entity_df_with_features[entity_df_event_timestamp_col].apply(
                         lambda x: x
                         if x.tzinfo is not None
                         else x.replace(tzinfo=timezone.utc)
sdk/python/feast/infra/passthrough_provider.py (12 changes: 6 additions & 6 deletions)

@@ -399,9 +399,9 @@ def write_feature_service_logs(
         config: RepoConfig,
         registry: BaseRegistry,
     ):
-        assert (
-            feature_service.logging_config is not None
-        ), "Logging should be configured for the feature service before calling this function"
+        assert feature_service.logging_config is not None, (
+            "Logging should be configured for the feature service before calling this function"
+        )

         self.offline_store.write_logged_features(
             config=config,

@@ -419,9 +419,9 @@ def retrieve_feature_service_logs(
         config: RepoConfig,
         registry: BaseRegistry,
     ) -> RetrievalJob:
-        assert (
-            feature_service.logging_config is not None
-        ), "Logging should be configured for the feature service before calling this function"
+        assert feature_service.logging_config is not None, (
+            "Logging should be configured for the feature service before calling this function"
+        )

         logging_source = FeatureServiceLoggingSource(feature_service, config.project)
         schema = logging_source.get_schema(registry)
sdk/python/feast/infra/utils/aws_utils.py (2 changes: 1 addition & 1 deletion)

@@ -1059,7 +1059,7 @@ def upload_arrow_table_to_athena(
         f"CREATE EXTERNAL TABLE {database}.{table_name} {'IF NOT EXISTS' if not fail_if_exists else ''}"
         f"({column_query_list}) "
         f"STORED AS PARQUET "
-        f"LOCATION '{s3_path[:s3_path.rfind('/')]}' "
+        f"LOCATION '{s3_path[: s3_path.rfind('/')]}' "
         f"TBLPROPERTIES('parquet.compress' = 'SNAPPY') "
     )

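The one-character change above applies PEP 8's slice rule: when a slice bound is a non-trivial expression (here a method call), the colon gets symmetric spacing. A tiny illustration:

```python
# Illustrative: both spellings are equivalent; the formatter prefers the
# spaced form because the upper bound is a function call.
s3_path = "s3://bucket/prefix/file.parquet"
assert s3_path[:s3_path.rfind("/")] == s3_path[: s3_path.rfind("/")]
print(s3_path[: s3_path.rfind("/")])  # s3://bucket/prefix
```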
sdk/python/feast/offline_server.py (54 changes: 27 additions & 27 deletions)

@@ -236,15 +236,15 @@ def do_get(self, context: fl.ServerCallContext, ticket: fl.Ticket):
         return fl.RecordBatchStream(table)

     def _validate_offline_write_batch_parameters(self, command: dict):
-        assert (
-            "feature_view_names" in command
-        ), "feature_view_names is a mandatory parameter"
+        assert "feature_view_names" in command, (
+            "feature_view_names is a mandatory parameter"
+        )
         assert "name_aliases" in command, "name_aliases is a mandatory parameter"

         feature_view_names = command["feature_view_names"]
-        assert (
-            len(feature_view_names) == 1
-        ), "feature_view_names list should only have one item"
+        assert len(feature_view_names) == 1, (
+            "feature_view_names list should only have one item"
+        )

         name_aliases = command["name_aliases"]
         assert len(name_aliases) == 1, "name_aliases list should only have one item"

@@ -286,9 +286,9 @@ def write_logged_features(self, command: dict, key: str):
             command["feature_service_name"]
         )

-        assert (
-            feature_service.logging_config is not None
-        ), "feature service must have logging_config set"
+        assert feature_service.logging_config is not None, (
+            "feature service must have logging_config set"
+        )

         assert_permissions(
             resource=feature_service,

@@ -305,15 +305,15 @@ def write_logged_features(self, command: dict, key: str):
         )

     def _validate_pull_all_from_table_or_query_parameters(self, command: dict):
-        assert (
-            "data_source_name" in command
-        ), "data_source_name is a mandatory parameter"
-        assert (
-            "join_key_columns" in command
-        ), "join_key_columns is a mandatory parameter"
-        assert (
-            "feature_name_columns" in command
-        ), "feature_name_columns is a mandatory parameter"
+        assert "data_source_name" in command, (
+            "data_source_name is a mandatory parameter"
+        )
+        assert "join_key_columns" in command, (
+            "join_key_columns is a mandatory parameter"
+        )
+        assert "feature_name_columns" in command, (
+            "feature_name_columns is a mandatory parameter"
+        )
         assert "timestamp_field" in command, "timestamp_field is a mandatory parameter"
         assert "start_date" in command, "start_date is a mandatory parameter"
         assert "end_date" in command, "end_date is a mandatory parameter"

@@ -334,15 +334,15 @@ def pull_all_from_table_or_query(self, command: dict):
         )

     def _validate_pull_latest_from_table_or_query_parameters(self, command: dict):
-        assert (
-            "data_source_name" in command
-        ), "data_source_name is a mandatory parameter"
-        assert (
-            "join_key_columns" in command
-        ), "join_key_columns is a mandatory parameter"
-        assert (
-            "feature_name_columns" in command
-        ), "feature_name_columns is a mandatory parameter"
+        assert "data_source_name" in command, (
+            "data_source_name is a mandatory parameter"
+        )
+        assert "join_key_columns" in command, (
+            "join_key_columns is a mandatory parameter"
+        )
+        assert "feature_name_columns" in command, (
+            "feature_name_columns is a mandatory parameter"
+        )
         assert "timestamp_field" in command, "timestamp_field is a mandatory parameter"
         assert "start_date" in command, "start_date is a mandatory parameter"
         assert "end_date" in command, "end_date is a mandatory parameter"
sdk/python/feast/repo_config.py (8 changes: 4 additions & 4 deletions)

@@ -381,14 +381,14 @@ def _validate_auth_config(cls, values: Any) -> Any:
             )
         elif values["auth"]["type"] not in ALLOWED_AUTH_TYPES:
             raise ValueError(
-                f'auth configuration has invalid authentication type={values["auth"]["type"]}. Possible '
-                f'values={ALLOWED_AUTH_TYPES}'
+                f"auth configuration has invalid authentication type={values['auth']['type']}. Possible "
+                f"values={ALLOWED_AUTH_TYPES}"
             )
         elif isinstance(values["auth"], AuthConfig):
             if values["auth"].type not in ALLOWED_AUTH_TYPES:
                 raise ValueError(
-                    f'auth configuration has invalid authentication type={values["auth"].type}. Possible '
-                    f'values={ALLOWED_AUTH_TYPES}'
+                    f"auth configuration has invalid authentication type={values['auth'].type}. Possible "
+                    f"values={ALLOWED_AUTH_TYPES}"
                 )
         return values

sdk/python/feast/templates/cassandra/bootstrap.py (2 changes: 1 addition & 1 deletion)

@@ -57,7 +57,7 @@ def collect_cassandra_store_settings():
         # it's regular Cassandra
         c_secure_bundle_path = None
         hosts_string = click.prompt(
-            ("Enter the seed hosts of your cluster " "(comma-separated IP addresses)"),
+            ("Enter the seed hosts of your cluster (comma-separated IP addresses)"),
            default="127.0.0.1",
         )
         c_hosts = [
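The hunk above collapses an implicit string concatenation left behind by an earlier line wrap. Adjacent string literals are merged at compile time, so the joined literal is identical; a quick check:

```python
# Illustrative: adjacent literals concatenate at compile time.
split = "Enter the seed hosts of your cluster " "(comma-separated IP addresses)"
joined = "Enter the seed hosts of your cluster (comma-separated IP addresses)"
assert split == joined
```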
sdk/python/feast/type_map.py (15 changes: 7 additions & 8 deletions)

@@ -211,8 +211,7 @@ def python_type_to_feast_value_type(
         return ValueType[common_item_value_type.name + "_LIST"]

     raise ValueError(
-        f"Value with native type {type_name} "
-        f"cannot be converted into Feast value type"
+        f"Value with native type {type_name} cannot be converted into Feast value type"
     )


@@ -451,13 +450,13 @@ def _python_value_to_proto_value(
         # Numpy convert 0 to int. However, in the feature view definition, the type of column may be a float.
         # So, if value is 0, type validation must pass if scalar_types are either int or float.
         allowed_types = {np.int64, int, np.float64, float}
-        assert (
-            type(sample) in allowed_types
-        ), f"Type `{type(sample)}` not in {allowed_types}"
+        assert type(sample) in allowed_types, (
+            f"Type `{type(sample)}` not in {allowed_types}"
+        )
     else:
-        assert (
-            type(sample) in valid_scalar_types
-        ), f"Type `{type(sample)}` not in {valid_scalar_types}"
+        assert type(sample) in valid_scalar_types, (
+            f"Type `{type(sample)}` not in {valid_scalar_types}"
+        )
     if feast_value_type == ValueType.BOOL:
         # ProtoValue does not support conversion of np.bool_ so we need to convert it to support np.bool_.
         return [
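
The relaxed check in the second hunk exists because a zero that has passed through numpy arrives as an integer even when the feature view declares the column as float. A standalone sketch of the invariant being asserted:

```python
import numpy as np

# Illustrative: 0 surfaces as np.int64 after a numpy round-trip, so validating
# it against a float-typed column must accept the integer types as well.
sample = np.int64(0)
allowed_types = {np.int64, int, np.float64, float}
assert type(sample) in allowed_types, f"Type `{type(sample)}` not in {allowed_types}"
print("0 validates against a float column as", type(sample).__name__)
```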
(Diff truncated: the remaining changed files were not loaded.)

0 comments on commit 6b2643a
