Skip to content

Commit

Permalink
fix: Table name format is backward compatible (#176)
Browse files Browse the repository at this point in the history
* fix: Table name hashing is backward compatible

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Feb 17, 2025
1 parent 63a10fa commit 25bf4ea
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 75 deletions.
105 changes: 86 additions & 19 deletions go/internal/feast/onlinestore/cassandraonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"math"
"math/big"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -38,6 +39,10 @@ type CassandraOnlineStore struct {
// The number of keys to include in a single CQL query for retrieval from the database
keyBatchSize int

// The version of the table name format
tableNameFormatVersion int

// Caches table names instead of generating the table name every time
tableNameCache sync.Map
}

Expand All @@ -54,9 +59,35 @@ type CassandraConfig struct {
}

const (
// Table name length limit of 48.
// NOTE(review): presumably the maximum table name length accepted by
// ScyllaDB/Cassandra — confirm against the database documentation.
SCYLLADB_MAX_TABLE_NAME_LENGTH = 48
// Longest "<project>_<feature view>" name, in bytes, that the version-2
// table name format keeps verbatim; longer names are shortened.
V2_TABLE_NAME_FORMAT_MAX_LENGTH = 48
// Base62 digit alphabet: the byte at index i is the symbol for digit
// value i (0-9, then a-z, then A-Z).
BASE62_CHAR_SET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
)

// toBase62 returns the Base62 representation of num.
//
// Digits are drawn from "0-9", then "a-z", then "A-Z" (digit values 0-61),
// which is exactly the alphabet big.Int.Text uses for base 62, so the
// conversion is delegated to the standard library. Zero is rendered as "0";
// a negative value is rendered with a leading "-".
//
// num is only read and is left unchanged for the caller. The earlier
// hand-rolled loop divided num in place and left it at zero.
func toBase62(num *big.Int) string {
	return num.Text(62)
}

// base62Encode reads data as the bytes of a big-endian unsigned integer and
// returns that integer's Base62 text as produced by toBase62.
func base62Encode(data []byte) string {
	return toBase62(new(big.Int).SetBytes(data))
}

func parseStringField(config map[string]any, fieldName string, defaultValue string) (string, error) {
rawValue, ok := config[fieldName]
if !ok {
Expand Down Expand Up @@ -232,38 +263,68 @@ func NewCassandraOnlineStore(project string, config *registry.RepoConfig, online
}
store.keyBatchSize = cassandraConfig.keyBatchSize

// parse tableNameFormatVersion
tableNameFormatVersion, ok := onlineStoreConfig["table_name_format_version"]
if !ok {
tableNameFormatVersion = 1
log.Warn().Msg("table_name_format_version not specified: Using 1 instead")
}
store.tableNameFormatVersion = tableNameFormatVersion.(int)

return &store, nil
}

func (c *CassandraOnlineStore) getFqTableName(keySpace string, project string, featureViewName string) string {
// getFqTableNameV2 builds the version-2 table name for a feature view.
//
// Despite the "Fq" in its name, the result is the bare table name: it is not
// quoted and carries no keyspace prefix. The keyspace parameter is accepted
// but not used.
//
// When "<project>_<featureViewName>" is at most
// V2_TABLE_NAME_FORMAT_MAX_LENGTH bytes long it is returned unchanged.
// Otherwise the name is shortened to
//
//	<project prefix>_<project hash>_<feature view prefix>_<feature view hash>
//
// where each prefix is the first (up to) 5 bytes of the respective name and
// each hash is the Base62-encoded MD5 of the remaining bytes, cut to at most
// 17 (project) and 18 (feature view) characters: 5 + 17 + 5 + 18 plus three
// underscores is 48.
func getFqTableNameV2(keyspace string, project string, featureViewName string) string {
	dbTableName := fmt.Sprintf("%s_%s", project, featureViewName)
	if len(dbTableName) <= V2_TABLE_NAME_FORMAT_MAX_LENGTH {
		return dbTableName
	}

	const (
		prjPrefixMaxLen = 5
		fvPrefixMaxLen  = 5
		prjHashMaxLen   = 17
		fvHashMaxLen    = 18
	)

	// Keep a short readable prefix of each name; only the rest is hashed.
	truncatedProject := project[:min(len(project), prjPrefixMaxLen)]
	truncatedFv := featureViewName[:min(len(featureViewName), fvPrefixMaxLen)]

	projectHashBytes := md5.Sum([]byte(project[len(truncatedProject):]))
	fvHashBytes := md5.Sum([]byte(featureViewName[len(truncatedFv):]))

	projectHash := base62Encode(projectHashBytes[:])
	fvHash := base62Encode(fvHashBytes[:])

	// base62Encode goes through an integer, so a digest with enough leading
	// zero bits encodes to fewer characters than the cut length. Clamp the
	// cut so that such a digest shortens the name instead of panicking with
	// a slice-bounds error.
	projectHash = projectHash[:min(len(projectHash), prjHashMaxLen)]
	fvHash = fvHash[:min(len(fvHash), fvHashMaxLen)]

	return fmt.Sprintf("%s_%s_%s_%s", truncatedProject, projectHash, truncatedFv, fvHash)
}

func (c *CassandraOnlineStore) getFqTableName(keySpace string, project string, featureViewName string, tableNameVersion int) (string, error) {
var dbTableName string

tableName := fmt.Sprintf("%s_%s", project, featureViewName)

if cacheValue, found := c.tableNameCache.Load(tableName); found {
return fmt.Sprintf(`"%s"."%s"`, keySpace, cacheValue.(string))
return fmt.Sprintf(`"%s"."%s"`, keySpace, cacheValue.(string)), nil
}

if len(tableName) <= SCYLLADB_MAX_TABLE_NAME_LENGTH {
if tableNameVersion == 1 {
dbTableName = tableName
} else if tableNameVersion == 2 {
dbTableName = getFqTableNameV2(keySpace, project, featureViewName)
} else {
projectHashBytes := md5.Sum([]byte(project))
projectHash := hex.EncodeToString(projectHashBytes[:])[:7]

dbTableHashBytes := md5.Sum([]byte(tableName))
dbTableHash := hex.EncodeToString(dbTableHashBytes[:])[:8]

// Shorten FeatureView name if it exceeds 30 characters
if len(featureViewName) > 30 {
featureViewName = featureViewName[:30]
}

dbTableName = fmt.Sprintf("p%s_%s_%s", projectHash, featureViewName, dbTableHash)
return "", fmt.Errorf("unknown table name format version: %d", tableNameVersion)
}

c.tableNameCache.Store(tableName, dbTableName)

return fmt.Sprintf(`"%s"."%s"`, keySpace, dbTableName)
return fmt.Sprintf(`"%s"."%s"`, keySpace, dbTableName), nil
}

func (c *CassandraOnlineStore) getSingleKeyCQLStatement(tableName string, featureNames []string) string {
Expand Down Expand Up @@ -341,7 +402,10 @@ func (c *CassandraOnlineStore) UnbatchedKeysOnlineRead(ctx context.Context, enti
featureViewName := featureViewNames[0]

// Prepare the query
tableName := c.getFqTableName(c.clusterConfigs.Keyspace, c.project, featureViewName)
tableName, err := c.getFqTableName(c.clusterConfigs.Keyspace, c.project, featureViewName, c.tableNameFormatVersion)
if err != nil {
return nil, err
}
cqlStatement := c.getSingleKeyCQLStatement(tableName, featureNames)

var waitGroup sync.WaitGroup
Expand Down Expand Up @@ -486,7 +550,10 @@ func (c *CassandraOnlineStore) BatchedKeysOnlineRead(ctx context.Context, entity
featureViewName := featureViewNames[0]

// Prepare the query
tableName := c.getFqTableName(c.clusterConfigs.Keyspace, c.project, featureViewName)
tableName, err := c.getFqTableName(c.clusterConfigs.Keyspace, c.project, featureViewName, c.tableNameFormatVersion)
if err != nil {
return nil, err
}

// Key batching
nKeys := len(serializedEntityKeys)
Expand Down
73 changes: 51 additions & 22 deletions go/internal/feast/onlinestore/cassandraonlinestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"reflect"
"strings"
"testing"

"github.com/gocql/gocql"
Expand Down Expand Up @@ -59,7 +58,9 @@ func TestGetFqTableName(t *testing.T) {
},
}

fqTableName := store.getFqTableName(store.clusterConfigs.Keyspace, store.project, "dummy_fv")
tableNameVersion := 1

fqTableName, _ := store.getFqTableName(store.clusterConfigs.Keyspace, store.project, "dummy_fv", tableNameVersion)
assert.Equal(t, `"scylladb"."dummy_project_dummy_fv"`, fqTableName)
}

Expand Down Expand Up @@ -91,63 +92,69 @@ func TestOnlineRead_RejectsDifferentFeatureViewsInSameRead(t *testing.T) {
assert.Error(t, err)
}

func TestGetFqTableNameWithinLimit(t *testing.T) {
func TestGetFqTableName_Version1(t *testing.T) {
store := CassandraOnlineStore{
project: "test_project",
}

keySpace := "test_keyspace"
featureViewName := "test_feature_view"
tableNameVersion := 1

expectedTableName := `"test_keyspace"."test_project_test_feature_view"`
actualTableName := store.getFqTableName(keySpace, store.project, featureViewName)
actualTableName, err := store.getFqTableName(keySpace, store.project, featureViewName, tableNameVersion)

assert.NoError(t, err)
assert.Equal(t, expectedTableName, actualTableName)
}

func TestGetFqTableNameExceedsLimit(t *testing.T) {
func TestGetFqTableName_Version2_WithinLimit(t *testing.T) {
store := CassandraOnlineStore{
project: "test_project",
}

keySpace := "test_keyspace"
featureViewName := "test_feature_view_with_a_very_long_name_exceeding_limit"
featureViewName := "test_feature_view"
tableNameVersion := 2

expectedTableName := `"test_keyspace"."p6e72a69_test_feature_view_with_a_very__4d479508"`
actualTableName := store.getFqTableName(keySpace, store.project, featureViewName)
expectedTableName := `"test_keyspace"."test_project_test_feature_view"`
actualTableName, err := store.getFqTableName(keySpace, store.project, featureViewName, tableNameVersion)

assert.NoError(t, err)
assert.Equal(t, expectedTableName, actualTableName)
}

func TestGetFqTableNameEdgeCase(t *testing.T) {
func TestGetFqTableName_Version2_ExceedsLimit(t *testing.T) {
store := CassandraOnlineStore{
project: "test_project",
}

keySpace := "test_keyspace"
featureViewName := strings.Repeat("a", 30)
featureViewName := "test_feature_view_with_a_very_long_name_exceeding_limit"
tableNameVersion := 2

expectedTableName := `"test_keyspace"."test_project_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"`
actualTableName := store.getFqTableName(keySpace, store.project, featureViewName)
expectedTableName := `"test_keyspace"."test__29UZUpJQRijDZsYzl_test__5Ur8Mv5QutEG23Cp2C"`
actualTableName, err := store.getFqTableName(keySpace, store.project, featureViewName, tableNameVersion)

assert.NoError(t, err)
assert.Equal(t, expectedTableName, actualTableName)
}

func TestGetFqTableNameEdgeCaseExceedsLimit(t *testing.T) {
func TestGetFqTableName_InvalidVersion(t *testing.T) {
store := CassandraOnlineStore{
project: "test_project_edge",
project: "test_project",
}

keySpace := "test_keyspace"
featureViewName := strings.Repeat("a", 31)

expectedTableName := `"test_keyspace"."pfd98066_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa_625ed0fd"`
actualTableName := store.getFqTableName(keySpace, store.project, featureViewName)
featureViewName := "test_feature_view"
tableNameVersion := 3

assert.Equal(t, expectedTableName, actualTableName)
_, err := store.getFqTableName(keySpace, store.project, featureViewName, tableNameVersion)
assert.Error(t, err)
assert.Equal(t, "unknown table name format version: 3", err.Error())
}

func TestGetFqTableNameWithCache(t *testing.T) {
func TestGetFqTableName_WithCache(t *testing.T) {
store := CassandraOnlineStore{
project: "test_project",
}
Expand All @@ -160,19 +167,41 @@ func TestGetFqTableNameWithCache(t *testing.T) {
expectedTableName := `"test_keyspace"."cached_table_name"`
store.tableNameCache.Store(tableName, "cached_table_name")

actualTableName := store.getFqTableName(keySpace, store.project, featureViewName)
actualTableName, err := store.getFqTableName(keySpace, store.project, featureViewName, 1)

assert.NoError(t, err)
assert.Equal(t, expectedTableName, actualTableName)
}

// TestGetFqTableName_EmptyCache starts from an empty cache, resolves a
// version-1 name, and then confirms the derived table name was stored in the
// cache under "<project>_<feature view>".
func TestGetFqTableName_EmptyCache(t *testing.T) {
	const (
		ks      = "test_keyspace"
		fvName  = "test_feature_view"
		version = 1
	)
	s := CassandraOnlineStore{project: "test_project"}

	got, err := s.getFqTableName(ks, s.project, fvName, version)
	assert.NoError(t, err)
	assert.Equal(t, `"test_keyspace"."test_project_test_feature_view"`, got)

	// The lookup above must have populated the cache.
	cacheKey := fmt.Sprintf("%s_%s", s.project, fvName)
	cached, ok := s.tableNameCache.Load(cacheKey)
	assert.True(t, ok)
	assert.Equal(t, "test_project_test_feature_view", cached)
}
// BenchmarkGetFqTableName measures getFqTableName with format version 2,
// cycling through a short, a long and a very long feature view name.
//
// Each name is derived once and then served from the store's cache, so for
// large b.N the timing is dominated by the cache-hit path.
func BenchmarkGetFqTableName(b *testing.B) {
	store := CassandraOnlineStore{
		project: "test_project",
	}
	keySpace := "test_keyspace"
	project := store.project
	tableNameVersion := 2
	featureViewNames := []string{"small_feature_view", "large_feature_view_large_feature_view_large_feature_view", "large_feature_view_large_feature_view_large_feature_view_large_feature_view_large_feature_view_large_feature_view_large_feature_view_large_feature_view_large_feature_view"}
	for i := 0; i < b.N; i++ {
		if _, err := store.getFqTableName(keySpace, project, featureViewNames[i%len(featureViewNames)], tableNameVersion); err != nil {
			b.Fatal(err)
		}
	}
}
Loading

0 comments on commit 25bf4ea

Please sign in to comment.