Skip to content

Commit

Permalink
Refactor MerkleDB.commitChanges (#2921)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Apr 8, 2024
1 parent b9033b0 commit 0ba0ace
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 36 deletions.
84 changes: 51 additions & 33 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,10 @@ func (db *merkleDB) commitBatch(ops []database.BatchOp) error {
return view.commitToDB(context.Background())
}

// commitChanges commits the changes in [trieToCommit] to [db].
// commitView commits the changes in [trieToCommit] to [db].
// Assumes [trieToCommit]'s node IDs have been calculated.
// Assumes [db.commitLock] is held.
func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error {
func (db *merkleDB) commitView(ctx context.Context, trieToCommit *view) error {
db.lock.Lock()
defer db.lock.Unlock()

Expand All @@ -949,7 +949,7 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error
}

changes := trieToCommit.changes
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitChanges", oteltrace.WithAttributes(
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitView", oteltrace.WithAttributes(
attribute.Int("nodesChanged", len(changes.nodes)),
attribute.Int("valuesChanged", len(changes.values)),
))
Expand All @@ -965,8 +965,46 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error
return nil
}

currentValueNodeBatch := db.valueNodeDB.NewBatch()
_, nodesSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.writeNodes")
valueNodeBatch := db.valueNodeDB.NewBatch()
if err := db.applyChanges(ctx, valueNodeBatch, changes); err != nil {
return err
}

if err := db.commitValueChanges(ctx, valueNodeBatch); err != nil {
return err
}

db.history.record(changes)

// Update root in database.
db.root = changes.rootChange.after
db.rootID = changes.rootID
return nil
}

// moveChildViewsToDB removes any child views from the trieToCommit and moves
// them to the db.
//
// assumes [db.lock] is held
func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
trieToCommit.validityTrackingLock.Lock()
defer trieToCommit.validityTrackingLock.Unlock()

for _, childView := range trieToCommit.childViews {
childView.updateParent(db)
db.childViews = append(db.childViews, childView)
}
trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize)
}

// applyChanges takes the [changes] and applies them to [db.intermediateNodeDB]
// and [valueNodeBatch].
//
// assumes [db.lock] is held
func (db *merkleDB) applyChanges(ctx context.Context, valueNodeBatch *valueNodeBatch, changes *changeSummary) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.applyChanges")
defer span.End()

for key, nodeChange := range changes.nodes {
shouldAddIntermediate := nodeChange.after != nil && !nodeChange.after.hasValue()
shouldDeleteIntermediate := !shouldAddIntermediate && nodeChange.before != nil && !nodeChange.before.hasValue()
Expand All @@ -976,50 +1014,30 @@ func (db *merkleDB) commitChanges(ctx context.Context, trieToCommit *view) error

if shouldAddIntermediate {
if err := db.intermediateNodeDB.Put(key, nodeChange.after); err != nil {
nodesSpan.End()
return err
}
} else if shouldDeleteIntermediate {
if err := db.intermediateNodeDB.Delete(key); err != nil {
nodesSpan.End()
return err
}
}

if shouldAddValue {
currentValueNodeBatch.Put(key, nodeChange.after)
valueNodeBatch.Put(key, nodeChange.after)
} else if shouldDeleteValue {
currentValueNodeBatch.Delete(key)
valueNodeBatch.Delete(key)
}
}
nodesSpan.End()

_, commitSpan := db.infoTracer.Start(ctx, "MerkleDB.commitChanges.valueNodeDBCommit")
err := currentValueNodeBatch.Write()
commitSpan.End()
if err != nil {
return err
}

db.history.record(changes)

// Update root in database.
db.root = changes.rootChange.after
db.rootID = changes.rootID
return nil
}

// moveChildViewsToDB removes any child views from the trieToCommit and moves them to the db
// assumes [db.lock] is held
func (db *merkleDB) moveChildViewsToDB(trieToCommit *view) {
trieToCommit.validityTrackingLock.Lock()
defer trieToCommit.validityTrackingLock.Unlock()
// commitValueChanges is a thin wrapper around [valueNodeBatch.Write()] to
// provide tracing.
func (db *merkleDB) commitValueChanges(ctx context.Context, valueNodeBatch *valueNodeBatch) error {
_, span := db.infoTracer.Start(ctx, "MerkleDB.commitValueChanges")
defer span.End()

for _, childView := range trieToCommit.childViews {
childView.updateParent(db)
db.childViews = append(db.childViews, childView)
}
trieToCommit.childViews = make([]*view, 0, defaultPreallocationSize)
return valueNodeBatch.Write()
}

// CommitToDB is a no-op for db since it is already in sync with itself.
Expand Down
6 changes: 3 additions & 3 deletions x/merkledb/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,13 @@ func (v *view) commitToDB(ctx context.Context) error {
))
defer span.End()

// Call this here instead of in [v.db.commitChanges]
// because doing so there would be a deadlock.
// Call this here instead of in [v.db.commitView] because doing so there
// would be a deadlock.
if err := v.applyValueChanges(ctx); err != nil {
return err
}

if err := v.db.commitChanges(ctx, v); err != nil {
if err := v.db.commitView(ctx, v); err != nil {
return err
}

Expand Down

0 comments on commit 0ba0ace

Please sign in to comment.