diff --git a/internal/database/offline/sqlutil/export_helper.go b/internal/database/offline/sqlutil/export_helper.go index bc2a4838f..a62e0e361 100644 --- a/internal/database/offline/sqlutil/export_helper.go +++ b/internal/database/offline/sqlutil/export_helper.go @@ -156,7 +156,7 @@ func prepareEntityTable(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.Exp %s ); `, qtTableName, strings.Join(columnDefs, ",\n")) - if err = dbOpt.ExecContext(ctx, schema); err != nil { + if err := dbOpt.ExecContext(ctx, schema); err != nil { return "", err } @@ -173,7 +173,7 @@ func prepareEntityTable(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.Exp if err != nil { return "", err } - if err = dbOpt.ExecContext(ctx, query, args...); err != nil { + if err := dbOpt.ExecContext(ctx, query, args...); err != nil { return "", err } diff --git a/internal/database/offline/sqlutil/join.go b/internal/database/offline/sqlutil/join.go index 4116434ba..b5c32b1fd 100644 --- a/internal/database/offline/sqlutil/join.go +++ b/internal/database/offline/sqlutil/join.go @@ -163,7 +163,7 @@ func joinOneGroup(ctx context.Context, dbOpt dbutil.DBOpt, opt joinOneGroupOpt) if err != nil { return nil, err } - if err = dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil { + if err := dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil { return nil, err } } @@ -192,7 +192,7 @@ func joinOneGroup(ctx context.Context, dbOpt dbutil.DBOpt, opt joinOneGroupOpt) if err != nil { return nil, err } - if err = dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil { + if err := dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil { return nil, err } } diff --git a/internal/database/offline/sqlutil/join_helper.go b/internal/database/offline/sqlutil/join_helper.go index f17382d67..0c8d83a2c 100644 --- a/internal/database/offline/sqlutil/join_helper.go +++ b/internal/database/offline/sqlutil/join_helper.go @@ -104,7 +104,7 @@ func prepareJoinedTable( ); ` schema = fmt.Sprintf(schema, qtTableName, strings.Join(columnDefs, ",\n")) - if err = dbOpt.ExecContext(ctx, schema); err != nil { + if err := dbOpt.ExecContext(ctx, schema); err != nil { return "", err } @@ -112,7 +112,7 @@ func prepareJoinedTable( // Step 2: create index on table joined_ if supportIndex(dbOpt.Backend) { index := fmt.Sprintf(`CREATE INDEX %s ON %s (unix_milli, entity_key)`, qt("idx_"+tableName), qtTableName) - if err = dbOpt.ExecContext(ctx, index); err != nil { + if err := dbOpt.ExecContext(ctx, index); err != nil { return "", err } } @@ -141,19 +141,19 @@ func prepareEntityRowsTable(ctx context.Context, ); `, qtTableName, strings.Join(columnDefs, ",\n")) - if err = dbOpt.ExecContext(ctx, schema); err != nil { + if err := dbOpt.ExecContext(ctx, schema); err != nil { return "", err } // Step 2: populate dataset to the table - if err = insertEntityRows(ctx, dbOpt, tableName, entityRows, valueNames); err != nil { + if err := insertEntityRows(ctx, dbOpt, tableName, entityRows, valueNames); err != nil { return "", err } // Step 3: create index on table entity_rows if supportIndex(dbOpt.Backend) { index := fmt.Sprintf(`CREATE INDEX idx_%s ON %s (unix_milli, entity_key)`, tableName, tableName) - if err = dbOpt.ExecContext(ctx, index); err != nil { + if err := dbOpt.ExecContext(ctx, index); err != nil { return "", err } } diff --git a/internal/database/offline/sqlutil/snapshot.go b/internal/database/offline/sqlutil/snapshot.go index c6d8e4e72..965ec725f 100644 --- a/internal/database/offline/sqlutil/snapshot.go +++ b/internal/database/offline/sqlutil/snapshot.go @@ -91,7 +91,7 @@ func Snapshot(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.SnapshotOpt) if err != nil { return err } - if err = dbOpt.ExecContext(ctx, query); err != nil { + if err := dbOpt.ExecContext(ctx, query); err != nil { return err } diff --git a/internal/database/online/dynamodb/get.go b/internal/database/online/dynamodb/get.go index cdb1c8f83..a9447ba97 100644 --- a/internal/database/online/dynamodb/get.go +++ b/internal/database/online/dynamodb/get.go @@ -75,7 +75,7 @@ func (db *DB) MultiGet(ctx context.Context, opt online.MultiGetOpt) (map[string] entityName: entityKeyValue, }) if len(keys) == BatchGetItemCapacity { - if err = batchGetItem(ctx, db, keys, tableName, entityName, opt.Features, res); err != nil { + if err := batchGetItem(ctx, db, keys, tableName, entityName, opt.Features, res); err != nil { return nil, err } keys = make([]map[string]types.AttributeValue, 0, BatchGetItemCapacity) diff --git a/internal/database/online/dynamodb/import.go b/internal/database/online/dynamodb/import.go index 0cc1d70b0..7adb78dac 100644 --- a/internal/database/online/dynamodb/import.go +++ b/internal/database/online/dynamodb/import.go @@ -60,7 +60,7 @@ func (db *DB) Import(ctx context.Context, opt online.ImportOpt) error { }, }) if len(items) == BatchWriteItemCapacity { - if err = batchWrite(ctx, db, tableName, items); err != nil { + if err := batchWrite(ctx, db, tableName, items); err != nil { return err } items = make([]types.WriteRequest, 0, BatchWriteItemCapacity) diff --git a/internal/database/online/sqlutil/import.go b/internal/database/online/sqlutil/import.go index c29245e39..da148ee06 100644 --- a/internal/database/online/sqlutil/import.go +++ b/internal/database/online/sqlutil/import.go @@ -52,19 +52,19 @@ func Import(ctx context.Context, db *sqlx.DB, opt online.ImportOpt, backend type records = append(records, record.Record) if len(records) == importBatchSize { - if err = dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil { + if err := dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil { return err } records = make([]interface{}, 0, importBatchSize) } } - if err = dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil { + if err := dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil { return err } if opt.Group.Category == types.CategoryStream { streamTableName := dbutil.OnlineStreamTableName(opt.Group.ID) - if err = PurgeTx(ctx, tx, streamTableName, backend); err != nil { + if err := PurgeTx(ctx, tx, streamTableName, backend); err != nil { return err } query := fmt.Sprintf(`ALTER TABLE %s RENAME TO %s;`, tableName, streamTableName) diff --git a/oomcli/cmd/edit_entity.go b/oomcli/cmd/edit_entity.go index 133628f24..d6a8ff44f 100644 --- a/oomcli/cmd/edit_entity.go +++ b/oomcli/cmd/edit_entity.go @@ -65,7 +65,7 @@ func writeEntitiesToTempFile(ctx context.Context, oomStore *oomstore.OomStore, e _ = tempFile.Close() }() - if err = outputEntity(ctx, entities, outputParams{ + if err := outputEntity(ctx, entities, outputParams{ writer: tempFile, oomStore: oomStore, outputOpt: YAML, diff --git a/oomcli/cmd/edit_feature.go b/oomcli/cmd/edit_feature.go index 93331f7e5..6dbfdf714 100644 --- a/oomcli/cmd/edit_feature.go +++ b/oomcli/cmd/edit_feature.go @@ -71,7 +71,7 @@ func writeFeaturesToTempFile(features types.FeatureList) (string, error) { _ = tempFile.Close() }() - if err = outputFeature(features, outputParams{ + if err := outputFeature(features, outputParams{ writer: tempFile, outputOpt: YAML, }); err != nil { diff --git a/oomcli/cmd/edit_group.go b/oomcli/cmd/edit_group.go index f08b1e9c7..fc15e7808 100644 --- a/oomcli/cmd/edit_group.go +++ b/oomcli/cmd/edit_group.go @@ -75,7 +75,7 @@ func writeGroupsToTempFile(ctx context.Context, oomStore *oomstore.OomStore, gro _ = tempFile.Close() }() - if err = outputGroup(ctx, groups, outputParams{ + if err := outputGroup(ctx, groups, outputParams{ writer: tempFile, oomStore: oomStore, outputOpt: YAML, diff --git a/pkg/oomstore/apply.go b/pkg/oomstore/apply.go index 47a0b9035..e587be6ec 100644 --- a/pkg/oomstore/apply.go +++ b/pkg/oomstore/apply.go @@ -24,7 +24,7 @@ func (s *OomStore) Apply(ctx context.Context, opt apply.ApplyOpt) error { } onlineJobs := make([]func() error, 0) - if err = s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error { + if err := s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error { // apply entity for _, entity := range stage.NewEntities { if err2 := s.applyEntity(c, tx, entity); err2 != nil { @@ -59,7 +59,7 @@ func (s *OomStore) Apply(ctx context.Context, opt apply.ApplyOpt) error { } for _, job := range onlineJobs { - if err = job(); err != nil { + if err := job(); err != nil { return err } } diff --git a/pkg/oomstore/export.go b/pkg/oomstore/export.go index a615d92b2..dadfe5b9a 100644 --- a/pkg/oomstore/export.go +++ b/pkg/oomstore/export.go @@ -62,7 +62,7 @@ func (s *OomStore) ChannelExport(ctx context.Context, opt types.ChannelExportOpt return nil, errdefs.Errorf("group %s no feature values up to %d, use a later timestamp", group.Name, opt.UnixMilli) } else { if revision.SnapshotTable == "" { - if err = s.Snapshot(ctx, group.Name); err != nil { + if err := s.Snapshot(ctx, group.Name); err != nil { return nil, err } } diff --git a/pkg/oomstore/feature.go b/pkg/oomstore/feature.go index 2685ab685..57d16c6a7 100644 --- a/pkg/oomstore/feature.go +++ b/pkg/oomstore/feature.go @@ -111,7 +111,7 @@ func (s *OomStore) CreateFeature(ctx context.Context, opt types.CreateFeatureOpt if err != nil { return 0, err } - if err = s.online.CreateTable(ctx, online.CreateTableOpt{ + if err := s.online.CreateTable(ctx, online.CreateTableOpt{ EntityName: group.Entity.Name, TableName: dbutil.OnlineStreamTableName(group.ID), Features: features, diff --git a/pkg/oomstore/import_batch.go b/pkg/oomstore/import_batch.go index c8e576e63..d4f59024b 100644 --- a/pkg/oomstore/import_batch.go +++ b/pkg/oomstore/import_batch.go @@ -55,7 +55,7 @@ func (s *OomStore) csvReaderImportBatch(ctx context.Context, opt *importOpt, dat if opt.Revision != nil { revision = *opt.Revision } - if err = s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{ + if err := s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{ RevisionID: newRevisionID, NewRevision: &revision, NewSnapshotTable: &snapshotTableName, @@ -77,7 +77,7 @@ func (s *OomStore) tableLinkImportBatch(ctx context.Context, opt *importOpt, dat if err != nil { return 0, err } - if err = validateTableSchema(tableSchema, opt.features); err != nil { + if err := validateTableSchema(tableSchema, opt.features); err != nil { return 0, err } var revision int64 diff --git a/pkg/oomstore/import_stream.go b/pkg/oomstore/import_stream.go index 29393b5df..9fa9c98a8 100644 --- a/pkg/oomstore/import_stream.go +++ b/pkg/oomstore/import_stream.go @@ -75,10 +75,10 @@ func (s *OomStore) tableLinkImportStream(ctx context.Context, opt *importOpt, da return err } // validation - if err = validateTableSchema(tableSchema, opt.features); err != nil { + if err := validateTableSchema(tableSchema, opt.features); err != nil { return err } - if err = s.validateRevisions(ctx, opt.group.ID, tableSchema); err != nil { + if err := s.validateRevisions(ctx, opt.group.ID, tableSchema); err != nil { return err } @@ -119,11 +119,11 @@ func (s *OomStore) pushStreamingRecords(ctx context.Context, records []types.Str return err } - if err = s.createRevisionAndCdcTable(ctx, group.ID, revision); err != nil { + if err := s.createRevisionAndCdcTable(ctx, group.ID, revision); err != nil { return err } // push data to new offline stream cdc table - if err = s.offline.Push(ctx, pushOpt); err != nil { + if err := s.offline.Push(ctx, pushOpt); err != nil { return err } } diff --git a/pkg/oomstore/join.go b/pkg/oomstore/join.go index 3b52354be..c01d34d39 100644 --- a/pkg/oomstore/join.go +++ b/pkg/oomstore/join.go @@ -114,7 +114,7 @@ func (s *OomStore) buildRevisionRanges(ctx context.Context, group *types.Group) }) for _, revision := range revisions { if revision.SnapshotTable == "" { - if err = s.Snapshot(ctx, group.Name); err != nil { + if err := s.Snapshot(ctx, group.Name); err != nil { return nil, err } } diff --git a/pkg/oomstore/push_processor.go b/pkg/oomstore/push_processor.go index 062e97bbd..35f26ff96 100644 --- a/pkg/oomstore/push_processor.go +++ b/pkg/oomstore/push_processor.go @@ -138,7 +138,7 @@ func (p *PushProcessor) pushToOffline(ctx context.Context, s *OomStore, groupID if err != nil { return err } - if err = s.pushStreamingRecords(ctx, b.records, entity.Name, group, features); err != nil { + if err := s.pushStreamingRecords(ctx, b.records, entity.Name, group, features); err != nil { return err } diff --git a/pkg/oomstore/revision.go b/pkg/oomstore/revision.go index 49a9e9954..cbbef7d1e 100644 --- a/pkg/oomstore/revision.go +++ b/pkg/oomstore/revision.go @@ -105,7 +105,7 @@ func (s *OomStore) createSnapshotAndCdcTable(ctx context.Context, revisionID int return err } - if err = s.offline.CreateTable(ctx, offline.CreateTableOpt{ + if err := s.offline.CreateTable(ctx, offline.CreateTableOpt{ TableName: snapshotTableName, EntityName: revision.Group.Entity.Name, Features: features, @@ -117,7 +117,7 @@ func (s *OomStore) createSnapshotAndCdcTable(ctx context.Context, revisionID int var cdcTable *string if revision.Group.Category == types.CategoryStream { tableName := dbutil.OfflineStreamCdcTableName(revision.GroupID, revision.Revision) - if err = s.offline.CreateTable(ctx, offline.CreateTableOpt{ + if err := s.offline.CreateTable(ctx, offline.CreateTableOpt{ TableName: tableName, EntityName: revision.Group.Entity.Name, Features: features, diff --git a/pkg/oomstore/snapshot.go b/pkg/oomstore/snapshot.go index 07215a70f..8c1e7f0a5 100644 --- a/pkg/oomstore/snapshot.go +++ b/pkg/oomstore/snapshot.go @@ -41,7 +41,7 @@ func (s *OomStore) Snapshot(ctx context.Context, groupName string) error { continue } tableName := dbutil.OfflineStreamSnapshotTableName(group.ID, revision.Revision) - if err = s.offline.Snapshot(ctx, offline.SnapshotOpt{ + if err := s.offline.Snapshot(ctx, offline.SnapshotOpt{ Group: *group, Features: features, Revision: revisions[i].Revision, @@ -49,7 +49,7 @@ func (s *OomStore) Snapshot(ctx context.Context, groupName string) error { }); err != nil { return err } - if err = s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{ + if err := s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{ RevisionID: revision.ID, NewSnapshotTable: &tableName, }); err != nil { diff --git a/pkg/oomstore/sync.go b/pkg/oomstore/sync.go index 973b5cd4c..f0b042854 100644 --- a/pkg/oomstore/sync.go +++ b/pkg/oomstore/sync.go @@ -55,7 +55,7 @@ func (s *OomStore) syncBatch(ctx context.Context, opt types.SyncOpt, group *type return err } - if err = s.online.Import(ctx, online.ImportOpt{ + if err := s.online.Import(ctx, online.ImportOpt{ Group: *group, Features: features, ExportStream: exportResult.Data, @@ -64,19 +64,19 @@ func (s *OomStore) syncBatch(ctx context.Context, opt types.SyncOpt, group *type return err } - if err = s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error { + if err := s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error { // Update the online revision id of the feature group upon sync success - if err2 := tx.UpdateGroup(c, metadata.UpdateGroupOpt{ + if err := tx.UpdateGroup(c, metadata.UpdateGroupOpt{ GroupID: group.ID, NewOnlineRevisionID: &revision.ID, - }); err2 != nil { - return err2 + }); err != nil { + return err } if !revision.Anchored { newRevision := time.Now().UnixMilli() newChored := true // Update revision timestamp using current timestamp - if err = tx.UpdateRevision(c, metadata.UpdateRevisionOpt{ + if err := tx.UpdateRevision(c, metadata.UpdateRevisionOpt{ RevisionID: revision.ID, NewRevision: &newRevision, NewAnchored: &newChored,