From 0c2d4eef5f774ad3793e53dcc34bcfe7a0b70988 Mon Sep 17 00:00:00 2001
From: Dimitris Stafylarakis
Date: Sun, 5 Jan 2025 10:26:21 +0100
Subject: [PATCH] remove unused online features service

Signed-off-by: Dimitris Stafylarakis
---
 MANIFEST.in                                 |   1 -
 go/README.md                                |   4 +-
 go/embedded/online_features.go              | 424 ------------------
 pyproject.toml                              |   2 +-
 sdk/python/feast/embedded_go/__init__.py    |   0
 .../embedded_go/online_features_service.py  | 345 --------------
 sdk/python/feast/embedded_go/type_map.py    |  88 ----
 sdk/python/feast/feature_logging.py         |  57 ++-
 8 files changed, 58 insertions(+), 863 deletions(-)
 delete mode 100644 go/embedded/online_features.go
 delete mode 100644 sdk/python/feast/embedded_go/__init__.py
 delete mode 100644 sdk/python/feast/embedded_go/online_features_service.py
 delete mode 100644 sdk/python/feast/embedded_go/type_map.py

diff --git a/MANIFEST.in b/MANIFEST.in
index c9828633d9..96f7c38c8a 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -6,4 +6,3 @@
 prune infra
 prune examples
 graft sdk/python/feast/ui/build
-graft sdk/python/feast/embedded_go/lib
diff --git a/go/README.md b/go/README.md
index d18b75815f..89d5e56075 100644
--- a/go/README.md
+++ b/go/README.md
@@ -1,12 +1,10 @@
 [Update 10/31/2024] This Go feature server code is updated from the Expedia Group's forked Feast branch (https://github.com/EXPEbdodla/feast) on 10/22/2024. Thanks the engineers of the Expedia Groups who contributed and improved the Go feature server.
 
-This directory contains the Go logic that's executed by the `EmbeddedOnlineFeatureServer` from Python.
-
 ## Build and Run
 
 To build and run the Go Feature Server locally, create a feature_store.yaml file with necessary configurations and run below commands:
 
 ```bash
 go build -o feast ./go/main.go
 ./feast --type=http --port=8080
-```
\ No newline at end of file
+```
diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go
deleted file mode 100644
index 3cbd47ae5b..0000000000
--- a/go/embedded/online_features.go
+++ /dev/null
@@ -1,424 +0,0 @@
-package embedded
-
-import (
-	"context"
-	"fmt"
-	"log"
-	"net"
-	"os"
-	"os/signal"
-	//"strings"
-	"syscall"
-	"time"
-
-	//"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
-
-	"github.com/apache/arrow/go/v17/arrow"
-	"github.com/apache/arrow/go/v17/arrow/array"
-	"github.com/apache/arrow/go/v17/arrow/cdata"
-	"github.com/apache/arrow/go/v17/arrow/memory"
-	"google.golang.org/grpc"
-
-	"github.com/feast-dev/feast/go/internal/feast"
-	"github.com/feast-dev/feast/go/internal/feast/model"
-	"github.com/feast-dev/feast/go/internal/feast/onlineserving"
-	"github.com/feast-dev/feast/go/internal/feast/registry"
-	"github.com/feast-dev/feast/go/internal/feast/server"
-	"github.com/feast-dev/feast/go/internal/feast/server/logging"
-	"github.com/feast-dev/feast/go/internal/feast/transformation"
-	"github.com/feast-dev/feast/go/protos/feast/serving"
-	prototypes "github.com/feast-dev/feast/go/protos/feast/types"
-	"github.com/feast-dev/feast/go/types"
-	jsonlog "github.com/rs/zerolog/log"
-	"google.golang.org/grpc/health"
-	"google.golang.org/grpc/health/grpc_health_v1"
-	//grpctrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/google.golang.org/grpc"
-)
-
-type OnlineFeatureService struct {
-	fs         *feast.FeatureStore
-	grpcStopCh chan os.Signal
-	httpStopCh chan os.Signal
-
-	statusColumnBuildersToRelease []*array.Int32Builder
-	tsColumnBuildersToRelease     []*array.Int64Builder
-	arraysToRelease               []arrow.Array
-	resultsToRelease              []arrow.Record
-
-	err error
-}
-
-type OnlineFeatureServiceConfig struct {
-	RepoPath   string
-	RepoConfig string
-}
-
-type DataTable struct {
-	DataPtr   uintptr
-	SchemaPtr uintptr
-}
-
-// LoggingOptions is a public (embedded) copy of logging.LoggingOptions struct.
-// See logging.LoggingOptions for properties description
-type LoggingOptions struct {
-	ChannelCapacity int
-	EmitTimeout     time.Duration
-	WriteInterval   time.Duration
-	FlushInterval   time.Duration
-}
-
-func NewOnlineFeatureService(conf *OnlineFeatureServiceConfig, transformationCallback transformation.TransformationCallback) *OnlineFeatureService {
-	repoConfig, err := registry.NewRepoConfigFromJSON(conf.RepoPath, conf.RepoConfig)
-	if err != nil {
-		jsonlog.Error().Stack().Err(err).Msg("Failed to convert to RepoConfig")
-		return &OnlineFeatureService{
-			err: err,
-		}
-	}
-
-	fs, err := feast.NewFeatureStore(repoConfig, transformationCallback)
-	if err != nil {
-		jsonlog.Error().Stack().Err(err).Msg("Failed to create NewFeatureStore")
-		return &OnlineFeatureService{
-			err: err,
-		}
-	}
-
-	// Notify these channels when receiving interrupt or termination signals from OS
-	httpStopCh := make(chan os.Signal, 1)
-	grpcStopCh := make(chan os.Signal, 1)
-	signal.Notify(httpStopCh, syscall.SIGINT, syscall.SIGTERM)
-	signal.Notify(grpcStopCh, syscall.SIGINT, syscall.SIGTERM)
-
-	return &OnlineFeatureService{fs: fs, httpStopCh: httpStopCh, grpcStopCh: grpcStopCh}
-}
-
-func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[string]int32, error) {
-	viewNames := make(map[string]interface{})
-	for _, featureRef := range featureRefs {
-		viewName, _, err := onlineserving.ParseFeatureReference(featureRef)
-		if err != nil {
-			return nil, err
-		}
-		viewNames[viewName] = nil
-	}
-
-	joinKeyTypes := make(map[string]int32)
-
-	for viewName := range viewNames {
-		view, err := s.fs.GetFeatureView(viewName, true)
-		if err != nil {
-			// skip on demand feature views
-			continue
-		}
-		for _, entityColumn := range view.EntityColumns {
-			joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number())
-		}
-	}
-
-	return joinKeyTypes, nil
-}
-
-func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceName string) (map[string]int32, error) {
-	featureService, err := s.fs.GetFeatureService(featureServiceName)
-	if err != nil {
-		return nil, err
-	}
-
-	joinKeyTypes := make(map[string]int32)
-
-	for _, projection := range featureService.Projections {
-		view, err := s.fs.GetFeatureView(projection.Name, true)
-		if err != nil {
-			// skip on demand feature views
-			continue
-		}
-		for _, entityColumn := range view.EntityColumns {
-			joinKeyTypes[entityColumn.Name] = int32(entityColumn.Dtype.Number())
-		}
-	}
-
-	return joinKeyTypes, nil
-}
-
-func (s *OnlineFeatureService) CheckForInstantiationError() error {
-	return s.err
-}
-
-func (s *OnlineFeatureService) GetOnlineFeatures(
-	featureRefs []string,
-	featureServiceName string,
-	entities DataTable,
-	requestData DataTable,
-	fullFeatureNames bool,
-	output DataTable) error {
-
-	entitiesRecord, err := readArrowRecord(entities)
-	if err != nil {
-		return err
-	}
-	defer entitiesRecord.Release()
-
-	numRows := entitiesRecord.Column(0).Len()
-
-	entitiesProto, err := recordToProto(entitiesRecord)
-	if err != nil {
-		return err
-	}
-
-	requestDataRecords, err := readArrowRecord(requestData)
-	if err != nil {
-		return err
-	}
-	defer requestDataRecords.Release()
-
-	requestDataProto, err := recordToProto(requestDataRecords)
-	if err != nil {
-		return err
-	}
-
-	var featureService *model.FeatureService
-	if featureServiceName != "" {
-		featureService, err = s.fs.GetFeatureService(featureServiceName)
-	}
-
-	resp, err := s.fs.GetOnlineFeatures(
-		context.Background(),
-		featureRefs,
-		featureService,
-		entitiesProto,
-		requestDataProto,
-		fullFeatureNames)
-
-	if err != nil {
-		return err
-	}
-
-	// Release all objects that are no longer required.
-	for _, statusColumnBuilderToRelease := range s.statusColumnBuildersToRelease {
-		statusColumnBuilderToRelease.Release()
-	}
-	for _, tsColumnBuilderToRelease := range s.tsColumnBuildersToRelease {
-		tsColumnBuilderToRelease.Release()
-	}
-	for _, arrayToRelease := range s.arraysToRelease {
-		arrayToRelease.Release()
-	}
-	for _, resultsToRelease := range s.resultsToRelease {
-		resultsToRelease.Release()
-	}
-	s.statusColumnBuildersToRelease = nil
-	s.tsColumnBuildersToRelease = nil
-	s.arraysToRelease = nil
-	s.resultsToRelease = nil
-
-	outputFields := make([]arrow.Field, 0)
-	outputColumns := make([]arrow.Array, 0)
-	pool := memory.NewGoAllocator()
-	for _, featureVector := range resp {
-		outputFields = append(outputFields,
-			arrow.Field{
-				Name: featureVector.Name,
-				Type: featureVector.Values.DataType()})
-		outputFields = append(outputFields,
-			arrow.Field{
-				Name: fmt.Sprintf("%s__status", featureVector.Name),
-				Type: arrow.PrimitiveTypes.Int32})
-		outputFields = append(outputFields,
-			arrow.Field{
-				Name: fmt.Sprintf("%s__timestamp", featureVector.Name),
-				Type: arrow.PrimitiveTypes.Int64})
-
-		outputColumns = append(outputColumns, featureVector.Values)
-
-		statusColumnBuilder := array.NewInt32Builder(pool)
-		for _, status := range featureVector.Statuses {
-			statusColumnBuilder.Append(int32(status))
-		}
-		statusColumn := statusColumnBuilder.NewArray()
-		outputColumns = append(outputColumns, statusColumn)
-
-		tsColumnBuilder := array.NewInt64Builder(pool)
-		for _, ts := range featureVector.Timestamps {
-			tsColumnBuilder.Append(ts.GetSeconds())
-		}
-		tsColumn := tsColumnBuilder.NewArray()
-		outputColumns = append(outputColumns, tsColumn)
-
-		// Mark builders and arrays for release.
-		s.statusColumnBuildersToRelease = append(s.statusColumnBuildersToRelease, statusColumnBuilder)
-		s.tsColumnBuildersToRelease = append(s.tsColumnBuildersToRelease, tsColumnBuilder)
-		s.arraysToRelease = append(s.arraysToRelease, statusColumn)
-		s.arraysToRelease = append(s.arraysToRelease, tsColumn)
-		s.arraysToRelease = append(s.arraysToRelease, featureVector.Values)
-	}
-
-	result := array.NewRecord(arrow.NewSchema(outputFields, nil), outputColumns, int64(numRows))
-	s.resultsToRelease = append(s.resultsToRelease, result)
-
-	cdata.ExportArrowRecordBatch(result, cdata.ArrayFromPtr(output.DataPtr), cdata.SchemaFromPtr(output.SchemaPtr))
-
-	return nil
-}
-
-// StartGprcServer starts gRPC server with disabled feature logging and blocks the thread
-func (s *OnlineFeatureService) StartGprcServer(host string, port int) error {
-	return s.StartGrpcServerWithLogging(host, port, nil, LoggingOptions{})
-}
-
-// StartGprcServerWithLoggingDefaultOpts starts gRPC server with enabled feature logging but default configuration for logging
-// Caller of this function must provide Python callback to flush buffered logs
-func (s *OnlineFeatureService) StartGprcServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
-	defaultOpts := LoggingOptions{
-		ChannelCapacity: logging.DefaultOptions.ChannelCapacity,
-		EmitTimeout:     logging.DefaultOptions.EmitTimeout,
-		WriteInterval:   logging.DefaultOptions.WriteInterval,
-		FlushInterval:   logging.DefaultOptions.FlushInterval,
-	}
-	return s.StartGrpcServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
-}
-
-func (s *OnlineFeatureService) constructLoggingService(writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) (*logging.LoggingService, error) {
-	var loggingService *logging.LoggingService = nil
-	if writeLoggedFeaturesCallback != nil {
-		sink, err := logging.NewOfflineStoreSink(writeLoggedFeaturesCallback)
-		if err != nil {
-			return nil, err
-		}
-
-		loggingService, err = logging.NewLoggingService(s.fs, sink, logging.LoggingOptions{
-			ChannelCapacity: loggingOpts.ChannelCapacity,
-			EmitTimeout:     loggingOpts.EmitTimeout,
-			WriteInterval:   loggingOpts.WriteInterval,
-			FlushInterval:   loggingOpts.FlushInterval,
-		})
-		if err != nil {
-			return nil, err
-		}
-	}
-	return loggingService, nil
-}
-
-// StartGrpcServerWithLogging starts gRPC server with enabled feature logging
-// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
-func (s *OnlineFeatureService) StartGrpcServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
-	//if strings.ToLower(os.Getenv("ENABLE_DATADOG_TRACING")) == "true" {
-	//	tracer.Start(tracer.WithRuntimeMetrics())
-	//	defer tracer.Stop()
-	//}
-
-	loggingService, err := s.constructLoggingService(writeLoggedFeaturesCallback, loggingOpts)
-	if err != nil {
-		return err
-	}
-	ser := server.NewGrpcServingServiceServer(s.fs, loggingService)
-	log.Printf("Starting a gRPC server on host %s port %d\n", host, port)
-	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", host, port))
-	if err != nil {
-		return err
-	}
-
-	//grpcServer := grpc.NewServer(grpc.UnaryInterceptor(grpctrace.UnaryServerInterceptor()))
-	grpcServer := grpc.NewServer()
-
-	serving.RegisterServingServiceServer(grpcServer, ser)
-	healthService := health.NewServer()
-	grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
-
-	go func() {
-		// As soon as these signals are received from OS, try to gracefully stop the gRPC server
-		<-s.grpcStopCh
-		log.Println("Stopping the gRPC server...")
-		grpcServer.GracefulStop()
-		if loggingService != nil {
-			loggingService.Stop()
-		}
-		log.Println("gRPC server terminated")
-	}()
-
-	err = grpcServer.Serve(lis)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// StartHttpServer starts HTTP server with disabled feature logging and blocks the thread
-func (s *OnlineFeatureService) StartHttpServer(host string, port int) error {
-	return s.StartHttpServerWithLogging(host, port, nil, LoggingOptions{})
-}
-
-// StartHttpServerWithLoggingDefaultOpts starts HTTP server with enabled feature logging but default configuration for logging
-// Caller of this function must provide Python callback to flush buffered logs
-func (s *OnlineFeatureService) StartHttpServerWithLoggingDefaultOpts(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback) error {
-	defaultOpts := LoggingOptions{
-		ChannelCapacity: logging.DefaultOptions.ChannelCapacity,
-		EmitTimeout:     logging.DefaultOptions.EmitTimeout,
-		WriteInterval:   logging.DefaultOptions.WriteInterval,
-		FlushInterval:   logging.DefaultOptions.FlushInterval,
-	}
-	return s.StartHttpServerWithLogging(host, port, writeLoggedFeaturesCallback, defaultOpts)
-}
-
-// StartHttpServerWithLogging starts HTTP server with enabled feature logging
-// Caller of this function must provide Python callback to flush buffered logs as well as logging configuration (loggingOpts)
-func (s *OnlineFeatureService) StartHttpServerWithLogging(host string, port int, writeLoggedFeaturesCallback logging.OfflineStoreWriteCallback, loggingOpts LoggingOptions) error {
-	loggingService, err := s.constructLoggingService(writeLoggedFeaturesCallback, loggingOpts)
-	if err != nil {
-		return err
-	}
-	ser := server.NewHttpServer(s.fs, loggingService)
-	log.Printf("Starting a HTTP server on host %s port %d\n", host, port)
-
-	go func() {
-		// As soon as these signals are received from OS, try to gracefully stop the gRPC server
-		<-s.httpStopCh
-		log.Println("Stopping the HTTP server...")
-		err := ser.Stop()
-		if err != nil {
-			log.Printf("Error when stopping the HTTP server: %v\n", err)
-		}
-		if loggingService != nil {
-			loggingService.Stop()
-		}
-		log.Println("HTTP server terminated")
-	}()
-
-	return ser.Serve(host, port)
-}
-
-func (s *OnlineFeatureService) StopHttpServer() {
-	s.httpStopCh <- syscall.SIGINT
-}
-
-func (s *OnlineFeatureService) StopGrpcServer() {
-	s.grpcStopCh <- syscall.SIGINT
-}
-
-/*
-Read Record Batch from memory managed by Python caller.
-Python part uses C ABI interface to export this record into C Data Interface,
-and then it provides pointers (dataPtr & schemaPtr) to the Go part.
-Here we import this data from given pointers and wrap the underlying values
-into Go Arrow Interface (array.Record).
-
-See export code here https://github.com/feast-dev/feast/blob/master/sdk/python/feast/embedded_go/online_features_service.py
-*/
-func readArrowRecord(data DataTable) (arrow.Record, error) {
-	return cdata.ImportCRecordBatch(
-		cdata.ArrayFromPtr(data.DataPtr),
-		cdata.SchemaFromPtr(data.SchemaPtr))
-}
-
-func recordToProto(rec arrow.Record) (map[string]*prototypes.RepeatedValue, error) {
-	r := make(map[string]*prototypes.RepeatedValue)
-	schema := rec.Schema()
-	for idx, column := range rec.Columns() {
-		field := schema.Field(idx)
-		values, err := types.ArrowValuesToProtoValues(column)
-		if err != nil {
-			return nil, err
-		}
-		r[field.Name] = &prototypes.RepeatedValue{Val: values}
-	}
-	return r, nil
-}
diff --git a/pyproject.toml b/pyproject.toml
index 2a051231e2..d15ac4515e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,4 +31,4 @@ exclude = [
     "pb2.py",
     ".pyi",
     "protos",
-    "sdk/python/feast/embedded_go/lib"]
+]
diff --git a/sdk/python/feast/embedded_go/__init__.py b/sdk/python/feast/embedded_go/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py
deleted file mode 100644
index 8dd7b5ba0a..0000000000
--- a/sdk/python/feast/embedded_go/online_features_service.py
+++ /dev/null
@@ -1,345 +0,0 @@
-import logging
-from functools import partial
-from pathlib import Path
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
-
-import pyarrow as pa
-from google.protobuf.timestamp_pb2 import Timestamp
-from pyarrow.cffi import ffi
-
-from feast.errors import (
-    FeatureNameCollisionError,
-    RequestDataNotFoundInEntityRowsException,
-)
-from feast.feature_service import FeatureService
-from feast.infra.feature_servers.base_config import FeatureLoggingConfig
-from feast.online_response import OnlineResponse
-from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
-from feast.protos.feast.types import Value_pb2
-from feast.repo_config import RepoConfig
-from feast.types import from_value_type
-from feast.value_type import ValueType
-
-from .lib.embedded import (
-    DataTable,
-    LoggingOptions,
-    NewOnlineFeatureService,
-    OnlineFeatureServiceConfig,
-)
-from .lib.go import Slice_string
-from .type_map import FEAST_TYPE_TO_ARROW_TYPE, arrow_array_to_array_of_proto
-
-if TYPE_CHECKING:
-    from feast.feature_store import FeatureStore
-
-NANO_SECOND = 1
-MICRO_SECOND = 1000 * NANO_SECOND
-MILLI_SECOND = 1000 * MICRO_SECOND
-SECOND = 1000 * MILLI_SECOND
-
-logger = logging.getLogger(__name__)
-
-
-class EmbeddedOnlineFeatureServer:
-    def __init__(
-        self, repo_path: str, repo_config: RepoConfig, feature_store: "FeatureStore"
-    ):
-        # keep callback in self to prevent it from GC
-        self._transformation_callback = partial(transformation_callback, feature_store)
-        self._logging_callback = partial(logging_callback, feature_store)
-
-        self._config = OnlineFeatureServiceConfig(
-            RepoPath=repo_path, RepoConfig=repo_config.json()
-        )
-
-        self._service = NewOnlineFeatureService(
-            self._config,
-            self._transformation_callback,
-        )
-
-        # This should raise an exception if there were any errors in NewOnlineFeatureService.
-        self._service.CheckForInstantiationError()
-
-    def get_online_features(
-        self,
-        features_refs: List[str],
-        feature_service: Optional[FeatureService],
-        entities: Dict[str, Union[List[Any], Value_pb2.RepeatedValue]],
-        request_data: Dict[str, Union[List[Any], Value_pb2.RepeatedValue]],
-        full_feature_names: bool = False,
-    ):
-        if feature_service:
-            join_keys_types = self._service.GetEntityTypesMapByFeatureService(
-                feature_service.name
-            )
-        else:
-            join_keys_types = self._service.GetEntityTypesMap(
-                Slice_string(features_refs)
-            )
-
-        join_keys_types = {
-            join_key: ValueType(enum_value) for join_key, enum_value in join_keys_types
-        }
-
-        # Here we create C structures that will be shared between Python and Go.
-        # We will pass entities as arrow Record Batch to Go part (in_c_array & in_c_schema)
-        # and receive features as Record Batch from Go (out_c_array & out_c_schema)
-        # This objects needs to be initialized here in order to correctly
-        # free them later using Python GC.
-        (
-            entities_c_schema,
-            entities_ptr_schema,
-            entities_c_array,
-            entities_ptr_array,
-        ) = allocate_schema_and_array()
-        (
-            req_data_c_schema,
-            req_data_ptr_schema,
-            req_data_c_array,
-            req_data_ptr_array,
-        ) = allocate_schema_and_array()
-
-        (
-            features_c_schema,
-            features_ptr_schema,
-            features_c_array,
-            features_ptr_array,
-        ) = allocate_schema_and_array()
-
-        batch, schema = map_to_record_batch(entities, join_keys_types)
-        schema._export_to_c(entities_ptr_schema)
-        batch._export_to_c(entities_ptr_array)
-
-        batch, schema = map_to_record_batch(request_data)
-        schema._export_to_c(req_data_ptr_schema)
-        batch._export_to_c(req_data_ptr_array)
-
-        try:
-            self._service.GetOnlineFeatures(
-                featureRefs=Slice_string(features_refs),
-                featureServiceName=feature_service and feature_service.name or "",
-                entities=DataTable(
-                    SchemaPtr=entities_ptr_schema, DataPtr=entities_ptr_array
-                ),
-                requestData=DataTable(
-                    SchemaPtr=req_data_ptr_schema, DataPtr=req_data_ptr_array
-                ),
-                fullFeatureNames=full_feature_names,
-                output=DataTable(
-                    SchemaPtr=features_ptr_schema, DataPtr=features_ptr_array
-                ),
-            )
-        except RuntimeError as exc:
-            (msg,) = exc.args
-            if msg.startswith("featureNameCollisionError"):
-                feature_refs = msg[len("featureNameCollisionError: ") : msg.find(";")]
-                feature_refs = feature_refs.split(",")
-                raise FeatureNameCollisionError(
-                    feature_refs_collisions=feature_refs,
-                    full_feature_names=full_feature_names,
-                )
-
-            if msg.startswith("requestDataNotFoundInEntityRowsException"):
-                feature_refs = msg[len("requestDataNotFoundInEntityRowsException: ") :]
-                feature_refs = feature_refs.split(",")
-                raise RequestDataNotFoundInEntityRowsException(feature_refs)
-
-            raise
-
-        record_batch = pa.RecordBatch._import_from_c(
-            features_ptr_array, features_ptr_schema
-        )
-        resp = record_batch_to_online_response(record_batch)
-        del record_batch
-        return OnlineResponse(resp)
-
-    def start_grpc_server(
-        self,
-        host: str,
-        port: int,
-        enable_logging: bool = True,
-        logging_options: Optional[FeatureLoggingConfig] = None,
-    ):
-        if enable_logging:
-            if logging_options:
-                self._service.StartGprcServerWithLogging(
-                    host,
-                    port,
-                    self._logging_callback,
-                    LoggingOptions(
-                        FlushInterval=logging_options.flush_interval_secs * SECOND,
-                        WriteInterval=logging_options.write_to_disk_interval_secs
-                        * SECOND,
-                        EmitTimeout=logging_options.emit_timeout_micro_secs
-                        * MICRO_SECOND,
-                        ChannelCapacity=logging_options.queue_capacity,
-                    ),
-                )
-            else:
-                self._service.StartGprcServerWithLoggingDefaultOpts(
-                    host, port, self._logging_callback
-                )
-        else:
-            self._service.StartGprcServer(host, port)
-
-    def start_http_server(
-        self,
-        host: str,
-        port: int,
-        enable_logging: bool = True,
-        logging_options: Optional[FeatureLoggingConfig] = None,
-    ):
-        if enable_logging:
-            if logging_options:
-                self._service.StartHttpServerWithLogging(
-                    host,
-                    port,
-                    self._logging_callback,
-                    LoggingOptions(
-                        FlushInterval=logging_options.flush_interval_secs * SECOND,
-                        WriteInterval=logging_options.write_to_disk_interval_secs
-                        * SECOND,
-                        EmitTimeout=logging_options.emit_timeout_micro_secs
-                        * MICRO_SECOND,
-                        ChannelCapacity=logging_options.queue_capacity,
-                    ),
-                )
-            else:
-                self._service.StartHttpServerWithLoggingDefaultOpts(
-                    host, port, self._logging_callback
-                )
-        else:
-            self._service.StartHttpServer(host, port)
-
-    def stop_grpc_server(self):
-        self._service.StopGrpcServer()
-
-    def stop_http_server(self):
-        self._service.StopHttpServer()
-
-
-def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array:
-    if isinstance(value, Value_pb2.RepeatedValue):
-        _proto_to_arrow(value)
-
-    if type_hint:
-        feast_type = from_value_type(type_hint)
-        if feast_type in FEAST_TYPE_TO_ARROW_TYPE:
-            return pa.array(value, FEAST_TYPE_TO_ARROW_TYPE[feast_type])
-
-    return pa.array(value)
-
-
-def _proto_to_arrow(value: Value_pb2.RepeatedValue) -> pa.Array:
-    """
-    ToDo: support entity rows already packed in protos
-    """
-    raise NotImplementedError
-
-
-def transformation_callback(
-    fs: "FeatureStore",
-    on_demand_feature_view_name: str,
-    input_arr_ptr: int,
-    input_schema_ptr: int,
-    output_arr_ptr: int,
-    output_schema_ptr: int,
-    full_feature_names: bool,
-) -> int:
-    try:
-        odfv = fs.get_on_demand_feature_view(on_demand_feature_view_name)
-
-        input_record = pa.RecordBatch._import_from_c(input_arr_ptr, input_schema_ptr)
-
-        # For some reason, the callback is called with `full_feature_names` as a 1 if True or 0 if false. This handles
-        # the typeguard requirement.
-        full_feature_names = bool(full_feature_names)
-
-        if odfv.mode != "pandas":
-            raise Exception(
-                f"OnDemandFeatureView mode '{odfv.mode} not supported by EmbeddedOnlineFeatureServer."
-            )
-
-        output = odfv.get_transformed_features_df(  # type: ignore
-            input_record.to_pandas(), full_feature_names=full_feature_names
-        )
-        output_record = pa.RecordBatch.from_pandas(output)
-
-        output_record.schema._export_to_c(output_schema_ptr)
-        output_record._export_to_c(output_arr_ptr)
-
-        return output_record.num_rows
-    except Exception as e:
-        logger.exception(f"transformation callback failed with exception: {e}", e)
-        return 0
-
-
-def logging_callback(
-    fs: "FeatureStore",
-    feature_service_name: str,
-    dataset_dir: str,
-) -> bytes:
-    feature_service = fs.get_feature_service(feature_service_name, allow_cache=True)
-    try:
-        fs.write_logged_features(logs=Path(dataset_dir), source=feature_service)
-    except Exception as exc:
-        return repr(exc).encode()
-
-    return "".encode()  # no error
-
-
-def allocate_schema_and_array():
-    c_schema = ffi.new("struct ArrowSchema*")
-    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
-
-    c_array = ffi.new("struct ArrowArray*")
-    ptr_array = int(ffi.cast("uintptr_t", c_array))
-    return c_schema, ptr_schema, c_array, ptr_array
-
-
-def map_to_record_batch(
-    map: Dict[str, Union[List[Any], Value_pb2.RepeatedValue]],
-    type_hint: Optional[Dict[str, ValueType]] = None,
-) -> Tuple[pa.RecordBatch, pa.Schema]:
-    fields = []
-    columns = []
-    type_hint = type_hint or {}
-
-    for name, values in map.items():
-        arr = _to_arrow(values, type_hint.get(name))
-        fields.append((name, arr.type))
-        columns.append(arr)
-
-    schema = pa.schema(fields)
-    batch = pa.RecordBatch.from_arrays(columns, schema=schema)
-    return batch, schema
-
-
-def record_batch_to_online_response(record_batch):
-    resp = GetOnlineFeaturesResponse()
-
-    for idx, field in enumerate(record_batch.schema):
-        if field.name.endswith("__timestamp") or field.name.endswith("__status"):
-            continue
-
-        feature_vector = GetOnlineFeaturesResponse.FeatureVector(
-            statuses=record_batch.columns[idx + 1].to_pylist(),
-            event_timestamps=[
-                Timestamp(seconds=seconds)
-                for seconds in record_batch.columns[idx + 2].to_pylist()
-            ],
-        )
-
-        if field.type == pa.null():
-            feature_vector.values.extend(
-                [Value_pb2.Value()] * len(record_batch.columns[idx])
-            )
-        else:
-            feature_vector.values.extend(
-                arrow_array_to_array_of_proto(field.type, record_batch.columns[idx])
-            )
-
-        resp.results.append(feature_vector)
-        resp.metadata.feature_names.val.append(field.name)
-
-    return resp
diff --git a/sdk/python/feast/embedded_go/type_map.py b/sdk/python/feast/embedded_go/type_map.py
deleted file mode 100644
index 8f467c57ca..0000000000
--- a/sdk/python/feast/embedded_go/type_map.py
+++ /dev/null
@@ -1,88 +0,0 @@
-from datetime import timezone
-from typing import List
-
-import pyarrow as pa
-
-from feast.protos.feast.types import Value_pb2
-from feast.types import Array, PrimitiveFeastType
-
-PA_TIMESTAMP_TYPE = pa.timestamp("s", tz=timezone.utc)
-
-ARROW_TYPE_TO_PROTO_FIELD = {
-    pa.int32(): "int32_val",
-    pa.int64(): "int64_val",
-    pa.float32(): "float_val",
-    pa.float64(): "double_val",
-    pa.bool_(): "bool_val",
-    pa.string(): "string_val",
-    pa.binary(): "bytes_val",
-    PA_TIMESTAMP_TYPE: "unix_timestamp_val",
-}
-
-ARROW_LIST_TYPE_TO_PROTO_FIELD = {
-    pa.int32(): "int32_list_val",
-    pa.int64(): "int64_list_val",
-    pa.float32(): "float_list_val",
-    pa.float64(): "double_list_val",
-    pa.bool_(): "bool_list_val",
-    pa.string(): "string_list_val",
-    pa.binary(): "bytes_list_val",
-    PA_TIMESTAMP_TYPE: "unix_timestamp_list_val",
-}
-
-ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = {
-    pa.int32(): Value_pb2.Int32List,
-    pa.int64(): Value_pb2.Int64List,
-    pa.float32(): Value_pb2.FloatList,
-    pa.float64(): Value_pb2.DoubleList,
-    pa.bool_(): Value_pb2.BoolList,
-    pa.string(): Value_pb2.StringList,
-    pa.binary(): Value_pb2.BytesList,
-    PA_TIMESTAMP_TYPE: Value_pb2.Int64List,
-}
-
-FEAST_TYPE_TO_ARROW_TYPE = {
-    PrimitiveFeastType.INT32: pa.int32(),
-    PrimitiveFeastType.INT64: pa.int64(),
-    PrimitiveFeastType.FLOAT32: pa.float32(),
-    PrimitiveFeastType.FLOAT64: pa.float64(),
-    PrimitiveFeastType.STRING: pa.string(),
-    PrimitiveFeastType.BYTES: pa.binary(),
-    PrimitiveFeastType.BOOL: pa.bool_(),
-    PrimitiveFeastType.UNIX_TIMESTAMP: pa.timestamp("s"),
-    Array(PrimitiveFeastType.INT32): pa.list_(pa.int32()),
-    Array(PrimitiveFeastType.INT64): pa.list_(pa.int64()),
-    Array(PrimitiveFeastType.FLOAT32): pa.list_(pa.float32()),
-    Array(PrimitiveFeastType.FLOAT64): pa.list_(pa.float64()),
-    Array(PrimitiveFeastType.STRING): pa.list_(pa.string()),
-    Array(PrimitiveFeastType.BYTES): pa.list_(pa.binary()),
-    Array(PrimitiveFeastType.BOOL): pa.list_(pa.bool_()),
-    Array(PrimitiveFeastType.UNIX_TIMESTAMP): pa.list_(pa.timestamp("s")),
-}
-
-
-def arrow_array_to_array_of_proto(
-    arrow_type: pa.DataType, arrow_array: pa.Array
-) -> List[Value_pb2.Value]:
-    values = []
-    if isinstance(arrow_type, pa.ListType):
-        proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[arrow_type.value_type]
-        proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[arrow_type.value_type]
-
-        if arrow_type.value_type == PA_TIMESTAMP_TYPE:
-            arrow_array = arrow_array.cast(pa.list_(pa.int64()))
-
-        for v in arrow_array.tolist():
-            values.append(
-                Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)})
-            )
-    else:
-        proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[arrow_type]
-
-        if arrow_type == PA_TIMESTAMP_TYPE:
-            arrow_array = arrow_array.cast(pa.int64())
-
-        for v in arrow_array.tolist():
-            values.append(Value_pb2.Value(**{proto_field_name: v}))
-
-    return values
diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py
index 9bd5d8a91c..15e6ff5325 100644
--- a/sdk/python/feast/feature_logging.py
+++ b/sdk/python/feast/feature_logging.py
@@ -5,7 +5,6 @@
 import pyarrow as pa
 
 from feast.data_source import DataSource
-from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE
 from feast.errors import (
     FeastObjectNotFoundException,
     FeatureViewNotFoundException,
@@ -15,6 +14,8 @@
 from feast.protos.feast.core.FeatureService_pb2 import (
     LoggingConfig as LoggingConfigProto,
 )
+from feast.protos.feast.types import Value_pb2
+from feast.types import Array, PrimitiveFeastType
 
 if TYPE_CHECKING:
     from feast.feature_service import FeatureService
@@ -25,6 +26,60 @@
 LOG_TIMESTAMP_FIELD = "__log_timestamp"
 LOG_DATE_FIELD = "__log_date"
 
+PA_TIMESTAMP_TYPE = pa.timestamp("s", tz=timezone.utc)
+
+ARROW_TYPE_TO_PROTO_FIELD = {
+    pa.int32(): "int32_val",
+    pa.int64(): "int64_val",
+    pa.float32(): "float_val",
+    pa.float64(): "double_val",
+    pa.bool_(): "bool_val",
+    pa.string(): "string_val",
+    pa.binary(): "bytes_val",
+    PA_TIMESTAMP_TYPE: "unix_timestamp_val",
+}
+
+ARROW_LIST_TYPE_TO_PROTO_FIELD = {
+    pa.int32(): "int32_list_val",
+    pa.int64(): "int64_list_val",
+    pa.float32(): "float_list_val",
+    pa.float64(): "double_list_val",
+    pa.bool_(): "bool_list_val",
+    pa.string(): "string_list_val",
+    pa.binary(): "bytes_list_val",
+    PA_TIMESTAMP_TYPE: "unix_timestamp_list_val",
+}
+
+ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = {
+    pa.int32(): Value_pb2.Int32List,
+    pa.int64(): Value_pb2.Int64List,
+    pa.float32(): Value_pb2.FloatList,
+    pa.float64(): Value_pb2.DoubleList,
+    pa.bool_(): Value_pb2.BoolList,
+    pa.string(): Value_pb2.StringList,
+    pa.binary(): Value_pb2.BytesList,
+    PA_TIMESTAMP_TYPE: Value_pb2.Int64List,
+}
+
+FEAST_TYPE_TO_ARROW_TYPE = {
+    PrimitiveFeastType.INT32: pa.int32(),
+    PrimitiveFeastType.INT64: pa.int64(),
+    PrimitiveFeastType.FLOAT32: pa.float32(),
+    PrimitiveFeastType.FLOAT64: pa.float64(),
+    PrimitiveFeastType.STRING: pa.string(),
+    PrimitiveFeastType.BYTES: pa.binary(),
+    PrimitiveFeastType.BOOL: pa.bool_(),
+    PrimitiveFeastType.UNIX_TIMESTAMP: pa.timestamp("s"),
+    Array(PrimitiveFeastType.INT32): pa.list_(pa.int32()),
+    Array(PrimitiveFeastType.INT64): pa.list_(pa.int64()),
+    Array(PrimitiveFeastType.FLOAT32): pa.list_(pa.float32()),
+    Array(PrimitiveFeastType.FLOAT64): pa.list_(pa.float64()),
+    Array(PrimitiveFeastType.STRING): pa.list_(pa.string()),
+    Array(PrimitiveFeastType.BYTES): pa.list_(pa.binary()),
+    Array(PrimitiveFeastType.BOOL): pa.list_(pa.bool_()),
+    Array(PrimitiveFeastType.UNIX_TIMESTAMP): pa.list_(pa.timestamp("s")),
+}
+
 
 class LoggingSource:
     """