From c171403fdd01dd3b08a38ecf2875a2b725a65d31 Mon Sep 17 00:00:00 2001 From: "mykyta.oleksiienko" Date: Wed, 11 Dec 2024 12:56:26 +0200 Subject: [PATCH] Add support for cassandra 4.0 table options In the PR implemented backward compatibility with previous versions, and added new types support. To make metadata table support easier for future Cassandra versions, hardcode scan from Cassandra were replaced with new "parseSystemSchemaViews" method which is much easier to expand, even if some fields were added in the middle of the table it wouldn`t be an issue anymore. patch by Mykyta Oleksiienko; reviewed by Joao Reis CASSGO-13 --- CHANGELOG.md | 2 + cassandra_test.go | 25 ++++- metadata.go | 238 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 208 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67c88a141..d5af49219 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Change Batch API to be consistent with Query() (CASSGO-7) +- Added Cassandra 4.0 table options support(CASSGO-13) + ### Fixed - Retry policy now takes into account query idempotency (CASSGO-27) diff --git a/cassandra_test.go b/cassandra_test.go index ec6969190..c1cf25fb9 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2314,7 +2314,7 @@ func TestViewMetadata(t *testing.T) { func TestMaterializedViewMetadata(t *testing.T) { if flagCassVersion.Before(3, 0, 0) { - return + t.Skip("The Cassandra version is too old") } session := createSession(t) defer session.Close() @@ -2333,14 +2333,19 @@ func TestMaterializedViewMetadata(t *testing.T) { expectedChunkLengthInKB := "16" expectedDCLocalReadRepairChance := float64(0) expectedSpeculativeRetry := "99p" + expectedAdditionalWritePolicy := "99p" + expectedReadRepair := "BLOCKING" if flagCassVersion.Before(4, 0, 0) { expectedChunkLengthInKB = "64" expectedDCLocalReadRepairChance = 0.1 expectedSpeculativeRetry = "99PERCENTILE" + expectedReadRepair = "" + expectedAdditionalWritePolicy = "" } expectedView1 := MaterializedViewMetadata{ Keyspace: "gocql_test", Name: "view_view", + AdditionalWritePolicy: expectedAdditionalWritePolicy, baseTableName: "view_table", BloomFilterFpChance: 0.01, Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"}, @@ -2352,12 +2357,17 @@ func TestMaterializedViewMetadata(t *testing.T) { DefaultTimeToLive: 0, Extensions: map[string]string{}, GcGraceSeconds: 864000, - IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0, - SpeculativeRetry: expectedSpeculativeRetry, + IncludeAllColumns: false, MaxIndexInterval: 2048, + MemtableFlushPeriodInMs: 0, + MinIndexInterval: 128, + ReadRepair: expectedReadRepair, + ReadRepairChance: 0, + SpeculativeRetry: expectedSpeculativeRetry, } expectedView2 := MaterializedViewMetadata{ Keyspace: "gocql_test", Name: "view_view2", + AdditionalWritePolicy: expectedAdditionalWritePolicy, baseTableName: "view_table2", BloomFilterFpChance: 0.01, Caching: map[string]string{"keys": "ALL", "rows_per_partition": "NONE"}, @@ -2369,8 +2379,13 @@ func TestMaterializedViewMetadata(t *testing.T) { DefaultTimeToLive: 0, Extensions: map[string]string{}, GcGraceSeconds: 864000, - IncludeAllColumns: false, MaxIndexInterval: 2048, MemtableFlushPeriodInMs: 0, MinIndexInterval: 128, ReadRepairChance: 0, - SpeculativeRetry: expectedSpeculativeRetry, + IncludeAllColumns: false, + MaxIndexInterval: 2048, + MemtableFlushPeriodInMs: 0, + MinIndexInterval: 128, + ReadRepair: expectedReadRepair, + ReadRepairChance: 0, + SpeculativeRetry: expectedSpeculativeRetry, } expectedView1.BaseTableId = materializedViews[0].BaseTableId diff --git a/metadata.go b/metadata.go index 6eb798f8a..1d165003f 100644 --- a/metadata.go +++ b/metadata.go @@ -122,6 +122,7 @@ type ViewMetadata struct { type MaterializedViewMetadata struct { Keyspace string Name string + AdditionalWritePolicy string BaseTableId UUID BaseTable *TableMetadata BloomFilterFpChance float64 @@ -139,7 +140,8 @@ type MaterializedViewMetadata struct { MaxIndexInterval int MemtableFlushPeriodInMs int MinIndexInterval int - ReadRepairChance float64 + ReadRepair string // Only present in Cassandra 4.0+ + ReadRepairChance float64 // Note: Cassandra 4.0 removed ReadRepairChance and added ReadRepair instead SpeculativeRetry string baseTableName string @@ -999,69 +1001,201 @@ func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, er return views, nil } +func bytesMapToStringsMap(byteData map[string][]byte) map[string]string { + extensions := make(map[string]string, len(byteData)) + for key, rowByte := range byteData { + extensions[key] = string(rowByte) + } + + return extensions +} + +func materializedViewMetadataFromMap(currentObject map[string]interface{}, materializedView *MaterializedViewMetadata) error { + const errorMessage = "gocql.materializedViewMetadataFromMap failed to read column %s" + var ok bool + for key, value := range currentObject { + switch key { + case "keyspace_name": + materializedView.Keyspace, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "view_name": + materializedView.Name, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "additional_write_policy": + materializedView.AdditionalWritePolicy, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "base_table_id": + materializedView.BaseTableId, ok = value.(UUID) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "base_table_name": + materializedView.baseTableName, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "bloom_filter_fp_chance": + materializedView.BloomFilterFpChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "caching": + materializedView.Caching, ok = value.(map[string]string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "comment": + materializedView.Comment, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "compaction": + materializedView.Compaction, ok = value.(map[string]string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "compression": + materializedView.Compression, ok = value.(map[string]string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "crc_check_chance": + materializedView.CrcCheckChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "dclocal_read_repair_chance": + materializedView.DcLocalReadRepairChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "default_time_to_live": + materializedView.DefaultTimeToLive, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "extensions": + byteData, ok := value.(map[string][]byte) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + materializedView.Extensions = bytesMapToStringsMap(byteData) + + case "gc_grace_seconds": + materializedView.GcGraceSeconds, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "id": + materializedView.Id, ok = value.(UUID) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "include_all_columns": + materializedView.IncludeAllColumns, ok = value.(bool) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "max_index_interval": + materializedView.MaxIndexInterval, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "memtable_flush_period_in_ms": + materializedView.MemtableFlushPeriodInMs, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "min_index_interval": + materializedView.MinIndexInterval, ok = value.(int) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "read_repair": + materializedView.ReadRepair, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "read_repair_chance": + materializedView.ReadRepairChance, ok = value.(float64) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + case "speculative_retry": + materializedView.SpeculativeRetry, ok = value.(string) + if !ok { + return fmt.Errorf(errorMessage, key) + } + + } + } + return nil +} + +func parseSystemSchemaViews(iter *Iter) ([]MaterializedViewMetadata, error) { + var materializedViews []MaterializedViewMetadata + s, err := iter.SliceMap() + if err != nil { + return nil, err + } + + for _, row := range s { + var materializedView MaterializedViewMetadata + err = materializedViewMetadataFromMap(row, &materializedView) + if err != nil { + return nil, err + } + + materializedViews = append(materializedViews, materializedView) + } + + return materializedViews, nil +} + func getMaterializedViewsMetadata(session *Session, keyspaceName string) ([]MaterializedViewMetadata, error) { if !session.useSystemSchema { return nil, nil } var tableName = "system_schema.views" stmt := fmt.Sprintf(` - SELECT - view_name, - base_table_id, - base_table_name, - bloom_filter_fp_chance, - caching, - comment, - compaction, - compression, - crc_check_chance, - dclocal_read_repair_chance, - default_time_to_live, - extensions, - gc_grace_seconds, - id, - include_all_columns, - max_index_interval, - memtable_flush_period_in_ms, - min_index_interval, - read_repair_chance, - speculative_retry + SELECT * FROM %s WHERE keyspace_name = ?`, tableName) var materializedViews []MaterializedViewMetadata - rows := session.control.query(stmt, keyspaceName).Scanner() - for rows.Next() { - materializedView := MaterializedViewMetadata{Keyspace: keyspaceName} - err := rows.Scan(&materializedView.Name, - &materializedView.BaseTableId, - &materializedView.baseTableName, - &materializedView.BloomFilterFpChance, - &materializedView.Caching, - &materializedView.Comment, - &materializedView.Compaction, - &materializedView.Compression, - &materializedView.CrcCheckChance, - &materializedView.DcLocalReadRepairChance, - &materializedView.DefaultTimeToLive, - &materializedView.Extensions, - &materializedView.GcGraceSeconds, - &materializedView.Id, - &materializedView.IncludeAllColumns, - &materializedView.MaxIndexInterval, - &materializedView.MemtableFlushPeriodInMs, - &materializedView.MinIndexInterval, - &materializedView.ReadRepairChance, - &materializedView.SpeculativeRetry, - ) - if err != nil { - return nil, err - } - materializedViews = append(materializedViews, materializedView) - } + iter := session.control.query(stmt, keyspaceName) - if err := rows.Err(); err != nil { + materializedViews, err := parseSystemSchemaViews(iter) + if err != nil { return nil, err }