Skip to content

Commit

Permalink
refactor: modify 'if err =' to 'if err :='
Browse files Browse the repository at this point in the history
  • Loading branch information
lianxmfor committed Feb 24, 2022
1 parent 12f7a60 commit ca86a88
Show file tree
Hide file tree
Showing 20 changed files with 40 additions and 40 deletions.
4 changes: 2 additions & 2 deletions internal/database/offline/sqlutil/export_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func prepareEntityTable(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.Exp
%s
);
`, qtTableName, strings.Join(columnDefs, ",\n"))
if err = dbOpt.ExecContext(ctx, schema); err != nil {
if err := dbOpt.ExecContext(ctx, schema); err != nil {
return "", err
}

Expand All @@ -173,7 +173,7 @@ func prepareEntityTable(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.Exp
if err != nil {
return "", err
}
if err = dbOpt.ExecContext(ctx, query, args...); err != nil {
if err := dbOpt.ExecContext(ctx, query, args...); err != nil {
return "", err
}

Expand Down
4 changes: 2 additions & 2 deletions internal/database/offline/sqlutil/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func joinOneGroup(ctx context.Context, dbOpt dbutil.DBOpt, opt joinOneGroupOpt)
if err != nil {
return nil, err
}
if err = dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil {
if err := dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func joinOneGroup(ctx context.Context, dbOpt dbutil.DBOpt, opt joinOneGroupOpt)
if err != nil {
return nil, err
}
if err = dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil {
if err := dbOpt.ExecContext(ctx, query, r.MinRevision, r.MaxRevision); err != nil {
return nil, err
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/database/offline/sqlutil/join_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ func prepareJoinedTable(
);
`
schema = fmt.Sprintf(schema, qtTableName, strings.Join(columnDefs, ",\n"))
if err = dbOpt.ExecContext(ctx, schema); err != nil {
if err := dbOpt.ExecContext(ctx, schema); err != nil {
return "", err
}

qt := dbutil.QuoteFn(dbOpt.Backend)
// Step 2: create index on table joined_
if supportIndex(dbOpt.Backend) {
index := fmt.Sprintf(`CREATE INDEX %s ON %s (unix_milli, entity_key)`, qt("idx_"+tableName), qtTableName)
if err = dbOpt.ExecContext(ctx, index); err != nil {
if err := dbOpt.ExecContext(ctx, index); err != nil {
return "", err
}
}
Expand Down Expand Up @@ -141,19 +141,19 @@ func prepareEntityRowsTable(ctx context.Context,
);
`, qtTableName, strings.Join(columnDefs, ",\n"))

if err = dbOpt.ExecContext(ctx, schema); err != nil {
if err := dbOpt.ExecContext(ctx, schema); err != nil {
return "", err
}

// Step 2: populate dataset to the table
if err = insertEntityRows(ctx, dbOpt, tableName, entityRows, valueNames); err != nil {
if err := insertEntityRows(ctx, dbOpt, tableName, entityRows, valueNames); err != nil {
return "", err
}

// Step 3: create index on table entity_rows
if supportIndex(dbOpt.Backend) {
index := fmt.Sprintf(`CREATE INDEX idx_%s ON %s (unix_milli, entity_key)`, tableName, tableName)
if err = dbOpt.ExecContext(ctx, index); err != nil {
if err := dbOpt.ExecContext(ctx, index); err != nil {
return "", err
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/database/offline/sqlutil/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func Snapshot(ctx context.Context, dbOpt dbutil.DBOpt, opt offline.SnapshotOpt)
if err != nil {
return err
}
if err = dbOpt.ExecContext(ctx, query); err != nil {
if err := dbOpt.ExecContext(ctx, query); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/database/online/dynamodb/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (db *DB) MultiGet(ctx context.Context, opt online.MultiGetOpt) (map[string]
entityName: entityKeyValue,
})
if len(keys) == BatchGetItemCapacity {
if err = batchGetItem(ctx, db, keys, tableName, entityName, opt.Features, res); err != nil {
if err := batchGetItem(ctx, db, keys, tableName, entityName, opt.Features, res); err != nil {
return nil, err
}
keys = make([]map[string]types.AttributeValue, 0, BatchGetItemCapacity)
Expand Down
2 changes: 1 addition & 1 deletion internal/database/online/dynamodb/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (db *DB) Import(ctx context.Context, opt online.ImportOpt) error {
},
})
if len(items) == BatchWriteItemCapacity {
if err = batchWrite(ctx, db, tableName, items); err != nil {
if err := batchWrite(ctx, db, tableName, items); err != nil {
return err
}
items = make([]types.WriteRequest, 0, BatchWriteItemCapacity)
Expand Down
6 changes: 3 additions & 3 deletions internal/database/online/sqlutil/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ func Import(ctx context.Context, db *sqlx.DB, opt online.ImportOpt, backend type
records = append(records, record.Record)

if len(records) == importBatchSize {
if err = dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil {
if err := dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil {
return err
}
records = make([]interface{}, 0, importBatchSize)
}
}
if err = dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil {
if err := dbutil.InsertRecordsToTableTx(tx, ctx, tableName, records, columns, backend); err != nil {
return err
}

if opt.Group.Category == types.CategoryStream {
streamTableName := dbutil.OnlineStreamTableName(opt.Group.ID)
if err = PurgeTx(ctx, tx, streamTableName, backend); err != nil {
if err := PurgeTx(ctx, tx, streamTableName, backend); err != nil {
return err
}
query := fmt.Sprintf(`ALTER TABLE %s RENAME TO %s;`, tableName, streamTableName)
Expand Down
2 changes: 1 addition & 1 deletion oomcli/cmd/edit_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func writeEntitiesToTempFile(ctx context.Context, oomStore *oomstore.OomStore, e
_ = tempFile.Close()
}()

if err = outputEntity(ctx, entities, outputParams{
if err := outputEntity(ctx, entities, outputParams{
writer: tempFile,
oomStore: oomStore,
outputOpt: YAML,
Expand Down
2 changes: 1 addition & 1 deletion oomcli/cmd/edit_feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func writeFeaturesToTempFile(features types.FeatureList) (string, error) {
_ = tempFile.Close()
}()

if err = outputFeature(features, outputParams{
if err := outputFeature(features, outputParams{
writer: tempFile,
outputOpt: YAML,
}); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion oomcli/cmd/edit_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func writeGroupsToTempFile(ctx context.Context, oomStore *oomstore.OomStore, gro
_ = tempFile.Close()
}()

if err = outputGroup(ctx, groups, outputParams{
if err := outputGroup(ctx, groups, outputParams{
writer: tempFile,
oomStore: oomStore,
outputOpt: YAML,
Expand Down
4 changes: 2 additions & 2 deletions pkg/oomstore/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (s *OomStore) Apply(ctx context.Context, opt apply.ApplyOpt) error {
}

onlineJobs := make([]func() error, 0)
if err = s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error {
if err := s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error {
// apply entity
for _, entity := range stage.NewEntities {
if err2 := s.applyEntity(c, tx, entity); err2 != nil {
Expand Down Expand Up @@ -59,7 +59,7 @@ func (s *OomStore) Apply(ctx context.Context, opt apply.ApplyOpt) error {
}

for _, job := range onlineJobs {
if err = job(); err != nil {
if err := job(); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/oomstore/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *OomStore) ChannelExport(ctx context.Context, opt types.ChannelExportOpt
return nil, errdefs.Errorf("group %s no feature values up to %d, use a later timestamp", group.Name, opt.UnixMilli)
} else {
if revision.SnapshotTable == "" {
if err = s.Snapshot(ctx, group.Name); err != nil {
if err := s.Snapshot(ctx, group.Name); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/oomstore/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *OomStore) CreateFeature(ctx context.Context, opt types.CreateFeatureOpt
if err != nil {
return 0, err
}
if err = s.online.CreateTable(ctx, online.CreateTableOpt{
if err := s.online.CreateTable(ctx, online.CreateTableOpt{
EntityName: group.Entity.Name,
TableName: dbutil.OnlineStreamTableName(group.ID),
Features: features,
Expand Down
4 changes: 2 additions & 2 deletions pkg/oomstore/import_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *OomStore) csvReaderImportBatch(ctx context.Context, opt *importOpt, dat
if opt.Revision != nil {
revision = *opt.Revision
}
if err = s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
if err := s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
RevisionID: newRevisionID,
NewRevision: &revision,
NewSnapshotTable: &snapshotTableName,
Expand All @@ -77,7 +77,7 @@ func (s *OomStore) tableLinkImportBatch(ctx context.Context, opt *importOpt, dat
if err != nil {
return 0, err
}
if err = validateTableSchema(tableSchema, opt.features); err != nil {
if err := validateTableSchema(tableSchema, opt.features); err != nil {
return 0, err
}
var revision int64
Expand Down
8 changes: 4 additions & 4 deletions pkg/oomstore/import_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ func (s *OomStore) tableLinkImportStream(ctx context.Context, opt *importOpt, da
return err
}
// validation
if err = validateTableSchema(tableSchema, opt.features); err != nil {
if err := validateTableSchema(tableSchema, opt.features); err != nil {
return err
}
if err = s.validateRevisions(ctx, opt.group.ID, tableSchema); err != nil {
if err := s.validateRevisions(ctx, opt.group.ID, tableSchema); err != nil {
return err
}

Expand Down Expand Up @@ -119,11 +119,11 @@ func (s *OomStore) pushStreamingRecords(ctx context.Context, records []types.Str
return err
}

if err = s.createRevisionAndCdcTable(ctx, group.ID, revision); err != nil {
if err := s.createRevisionAndCdcTable(ctx, group.ID, revision); err != nil {
return err
}
// push data to new offline stream cdc table
if err = s.offline.Push(ctx, pushOpt); err != nil {
if err := s.offline.Push(ctx, pushOpt); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/oomstore/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *OomStore) buildRevisionRanges(ctx context.Context, group *types.Group)
})
for _, revision := range revisions {
if revision.SnapshotTable == "" {
if err = s.Snapshot(ctx, group.Name); err != nil {
if err := s.Snapshot(ctx, group.Name); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/oomstore/push_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (p *PushProcessor) pushToOffline(ctx context.Context, s *OomStore, groupID
if err != nil {
return err
}
if err = s.pushStreamingRecords(ctx, b.records, entity.Name, group, features); err != nil {
if err := s.pushStreamingRecords(ctx, b.records, entity.Name, group, features); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/oomstore/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *OomStore) createSnapshotAndCdcTable(ctx context.Context, revisionID int
return err
}

if err = s.offline.CreateTable(ctx, offline.CreateTableOpt{
if err := s.offline.CreateTable(ctx, offline.CreateTableOpt{
TableName: snapshotTableName,
EntityName: revision.Group.Entity.Name,
Features: features,
Expand All @@ -117,7 +117,7 @@ func (s *OomStore) createSnapshotAndCdcTable(ctx context.Context, revisionID int
var cdcTable *string
if revision.Group.Category == types.CategoryStream {
tableName := dbutil.OfflineStreamCdcTableName(revision.GroupID, revision.Revision)
if err = s.offline.CreateTable(ctx, offline.CreateTableOpt{
if err := s.offline.CreateTable(ctx, offline.CreateTableOpt{
TableName: tableName,
EntityName: revision.Group.Entity.Name,
Features: features,
Expand Down
4 changes: 2 additions & 2 deletions pkg/oomstore/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func (s *OomStore) Snapshot(ctx context.Context, groupName string) error {
continue
}
tableName := dbutil.OfflineStreamSnapshotTableName(group.ID, revision.Revision)
if err = s.offline.Snapshot(ctx, offline.SnapshotOpt{
if err := s.offline.Snapshot(ctx, offline.SnapshotOpt{
Group: *group,
Features: features,
Revision: revisions[i].Revision,
PrevRevision: revisions[i-1].Revision,
}); err != nil {
return err
}
if err = s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
if err := s.metadata.UpdateRevision(ctx, metadata.UpdateRevisionOpt{
RevisionID: revision.ID,
NewSnapshotTable: &tableName,
}); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/oomstore/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *OomStore) syncBatch(ctx context.Context, opt types.SyncOpt, group *type
return err
}

if err = s.online.Import(ctx, online.ImportOpt{
if err := s.online.Import(ctx, online.ImportOpt{
Group: *group,
Features: features,
ExportStream: exportResult.Data,
Expand All @@ -64,19 +64,19 @@ func (s *OomStore) syncBatch(ctx context.Context, opt types.SyncOpt, group *type
return err
}

if err = s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error {
if err := s.metadata.WithTransaction(ctx, func(c context.Context, tx metadata.DBStore) error {
// Update the online revision id of the feature group upon sync success
if err2 := tx.UpdateGroup(c, metadata.UpdateGroupOpt{
if err := tx.UpdateGroup(c, metadata.UpdateGroupOpt{
GroupID: group.ID,
NewOnlineRevisionID: &revision.ID,
}); err2 != nil {
return err2
}); err != nil {
return err
}
if !revision.Anchored {
newRevision := time.Now().UnixMilli()
newChored := true
// Update revision timestamp using current timestamp
if err = tx.UpdateRevision(c, metadata.UpdateRevisionOpt{
if err := tx.UpdateRevision(c, metadata.UpdateRevisionOpt{
RevisionID: revision.ID,
NewRevision: &newRevision,
NewAnchored: &newChored,
Expand Down

0 comments on commit ca86a88

Please sign in to comment.