Skip to content

Commit

Permalink
fix approvals
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing committed Jan 10, 2025
1 parent b9a5e64 commit 2728dfd
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 106 deletions.
5 changes: 3 additions & 2 deletions storage/operation/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ func RetrieveResultApproval(r storage.Reader, approvalID flow.Identifier, approv
return RetrieveByKey(r, MakePrefix(codeResultApproval, approvalID), approval)
}

// IndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// UnsafeIndexResultApproval inserts a ResultApproval ID keyed by ExecutionResult ID
// and chunk index. If a value for this key exists, a storage.ErrAlreadyExists
// error is returned. This operation is only used by the ResultApprovals store,
// which is only used within a Verification node, where it is assumed that there
// is only one approval per chunk.
func IndexResultApproval(w storage.Writer, resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
// CAUTION: Use of this function must be synchronized by storage.ResultApprovals.
func UnsafeIndexResultApproval(w storage.Writer, resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
return UpsertByKey(w, MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)
}

Expand Down
97 changes: 47 additions & 50 deletions storage/store/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@ type ResultApprovals struct {
}

func NewResultApprovals(collector module.CacheMetrics, db storage.DB) *ResultApprovals {
store := func(key flow.Identifier, val *flow.ResultApproval) func(rw storage.ReaderBatchWriter) error {
return storage.OnlyWriter(operation.InsertResultApproval(val))
store := func(rw storage.ReaderBatchWriter, key flow.Identifier, val *flow.ResultApproval) error {
return operation.InsertResultApproval(rw.Writer(), val)
}

retrieve := func(approvalID flow.Identifier) func(r storage.Reader) (*flow.ResultApproval, error) {
retrieve := func(r storage.Reader, approvalID flow.Identifier) (*flow.ResultApproval, error) {
var approval flow.ResultApproval
return func(r storage.Reader) (*flow.ResultApproval, error) {
err := operation.RetrieveResultApproval(approvalID, &approval)(r)
return &approval, err
}
err := operation.RetrieveResultApproval(r, approvalID, &approval)
return &approval, err
}

res := &ResultApprovals{
Expand All @@ -44,64 +42,60 @@ func NewResultApprovals(collector module.CacheMetrics, db storage.DB) *ResultApp
return res
}

func (r *ResultApprovals) store(approval *flow.ResultApproval) func(storage.ReaderBatchWriter) error {
return r.cache.PutTx(approval.ID(), approval)
func (r *ResultApprovals) store(rw storage.ReaderBatchWriter, approval *flow.ResultApproval) error {
return r.cache.PutTx(rw, approval.ID(), approval)
}

func (r *ResultApprovals) byID(approvalID flow.Identifier) func(storage.Reader) (*flow.ResultApproval, error) {
return func(reader storage.Reader) (*flow.ResultApproval, error) {
val, err := r.cache.Get(approvalID)(reader)
if err != nil {
return nil, err
}
return val, nil
func (r *ResultApprovals) byID(reader storage.Reader, approvalID flow.Identifier) (*flow.ResultApproval, error) {
val, err := r.cache.Get(reader, approvalID)
if err != nil {
return nil, err
}
return val, nil
}

func (r *ResultApprovals) byChunk(resultID flow.Identifier, chunkIndex uint64) func(storage.Reader) (*flow.ResultApproval, error) {
return func(reader storage.Reader) (*flow.ResultApproval, error) {
var approvalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &approvalID)(reader)
if err != nil {
return nil, fmt.Errorf("could not lookup result approval ID: %w", err)
}
return r.byID(approvalID)(reader)
func (r *ResultApprovals) byChunk(reader storage.Reader, resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
var approvalID flow.Identifier
err := operation.LookupResultApproval(reader, resultID, chunkIndex, &approvalID)
if err != nil {
return nil, fmt.Errorf("could not lookup result approval ID: %w", err)
}
return r.byID(reader, approvalID)
}

// CAUTION: Caller must acquire `indexing` lock.
func (r *ResultApprovals) index(resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) func(storage.ReaderBatchWriter) error {
return func(rw storage.ReaderBatchWriter) error {
var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(resultID, chunkIndex, &storedApprovalID)(rw.GlobalReader())
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

// no approval found, index the approval

return operation.UnsafeIndexResultApproval(resultID, chunkIndex, approvalID)(rw.Writer())
func (r *ResultApprovals) index(rw storage.ReaderBatchWriter, resultID flow.Identifier, chunkIndex uint64, approvalID flow.Identifier) error {
var storedApprovalID flow.Identifier
err := operation.LookupResultApproval(rw.GlobalReader(), resultID, chunkIndex, &storedApprovalID)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return fmt.Errorf("could not lookup result approval ID: %w", err)
}

// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.
// no approval found, index the approval

if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
resultID, chunkIndex, approvalID, storedApprovalID, storage.ErrDataMismatch)
}
return operation.UnsafeIndexResultApproval(rw.Writer(), resultID, chunkIndex, approvalID)
}

// an approval is already indexed, double check if it is the same
// We don't allow indexing multiple approvals per chunk because the
// store is only used within Verification nodes, and it is impossible
// for a Verification node to compute different approvals for the same
// chunk.

return nil
if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
resultID, chunkIndex, approvalID, storedApprovalID, storage.ErrDataMismatch)
}

return nil
}

// Store stores a ResultApproval
func (r *ResultApprovals) Store(approval *flow.ResultApproval) error {
return r.db.WithReaderBatchWriter(r.store(approval))
return r.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return r.store(rw, approval)
})
}

// Index indexes a ResultApproval by chunk (ResultID + chunk index).
Expand All @@ -116,7 +110,10 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app
r.indexing.Lock()
defer r.indexing.Unlock()

err := r.db.WithReaderBatchWriter(r.index(resultID, chunkIndex, approvalID))
err := r.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
return r.index(rw, resultID, chunkIndex, approvalID)
})

if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
}
Expand All @@ -125,12 +122,12 @@ func (r *ResultApprovals) Index(resultID flow.Identifier, chunkIndex uint64, app

// ByID retrieves a ResultApproval by its ID
func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error) {
return r.byID(approvalID)(r.db.Reader())
return r.byID(r.db.Reader(), approvalID)
}

// ByChunk retrieves a ResultApproval by result ID and chunk index. The
// ResultApprovals store is only used within a verification node, where it is
// assumed that there is never more than one approval per chunk.
func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error) {
return r.byChunk(resultID, chunkIndex)(r.db.Reader())
return r.byChunk(r.db.Reader(), resultID, chunkIndex)
}
95 changes: 41 additions & 54 deletions storage/store/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {
}
}

type storeFunc[K comparable, V any] func(key K, val V) func(storage.ReaderBatchWriter) error
type storeFunc[K comparable, V any] func(rw storage.ReaderBatchWriter, key K, val V) error

// nolint:unused
func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
Expand All @@ -27,20 +27,16 @@ func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
}

// nolint:unused
func noStore[K comparable, V any](_ K, _ V) func(storage.ReaderBatchWriter) error {
return func(tx storage.ReaderBatchWriter) error {
return fmt.Errorf("no store function for cache put available")
}
func noStore[K comparable, V any](_ storage.ReaderBatchWriter, _ K, _ V) error {
return fmt.Errorf("no store function for cache put available")
}

// nolint: unused
func noopStore[K comparable, V any](_ K, _ V) func(storage.ReaderBatchWriter) error {
return func(tx storage.ReaderBatchWriter) error {
return nil
}
func noopStore[K comparable, V any](_ storage.ReaderBatchWriter, _ K, _ V) error {
return nil
}

type retrieveFunc[K comparable, V any] func(key K) func(storage.Reader) (V, error)
type retrieveFunc[K comparable, V any] func(r storage.Reader, key K) (V, error)

// nolint:unused
func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) {
Expand All @@ -50,11 +46,9 @@ func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[
}

// nolint:unused
func noRetrieve[K comparable, V any](_ K) func(storage.Reader) (V, error) {
return func(tx storage.Reader) (V, error) {
var nullV V
return nullV, fmt.Errorf("no retrieve function for cache get available")
}
func noRetrieve[K comparable, V any](_ storage.Reader, _ K) (V, error) {
var nullV V
return nullV, fmt.Errorf("no retrieve function for cache get available")
}

type Cache[K comparable, V any] struct {
Expand Down Expand Up @@ -93,36 +87,33 @@ func (c *Cache[K, V]) IsCached(key K) bool {
// Get will try to retrieve the resource from cache first, and then from the
// injected. During normal operations, the following error returns are expected:
// - `storage.ErrNotFound` if key is unknown.
func (c *Cache[K, V]) Get(key K) func(storage.Reader) (V, error) {
return func(r storage.Reader) (V, error) {

// check if we have it in the cache
resource, cached := c.cache.Get(key)
if cached {
c.metrics.CacheHit(c.resource)
return resource, nil
}
func (c *Cache[K, V]) Get(r storage.Reader, key K) (V, error) {
// check if we have it in the cache
resource, cached := c.cache.Get(key)
if cached {
c.metrics.CacheHit(c.resource)
return resource, nil
}

// get it from the database
resource, err := c.retrieve(key)(r)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
c.metrics.CacheNotFound(c.resource)
}
var nullV V
return nullV, fmt.Errorf("could not retrieve resource: %w", err)
// get it from the database
resource, err := c.retrieve(r, key)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
c.metrics.CacheNotFound(c.resource)
}
var nullV V
return nullV, fmt.Errorf("could not retrieve resource: %w", err)
}

c.metrics.CacheMiss(c.resource)

// cache the resource and eject least recently used one if we reached limit
evicted := c.cache.Add(key, resource)
if !evicted {
c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
}
c.metrics.CacheMiss(c.resource)

return resource, nil
// cache the resource and eject least recently used one if we reached limit
evicted := c.cache.Add(key, resource)
if !evicted {
c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
}

return resource, nil
}

func (c *Cache[K, V]) Remove(key K) {
Expand All @@ -139,19 +130,15 @@ func (c *Cache[K, V]) Insert(key K, resource V) {
}

// PutTx will return tx which adds a resource to the cache with the given ID.
func (c *Cache[K, V]) PutTx(key K, resource V) func(storage.ReaderBatchWriter) error {
storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution)

return func(rw storage.ReaderBatchWriter) error {
storage.OnCommitSucceed(rw, func() {
c.Insert(key, resource)
})

err := storeOps(rw) // execute operations to store resource
if err != nil {
return fmt.Errorf("could not store resource: %w", err)
}

return nil
func (c *Cache[K, V]) PutTx(rw storage.ReaderBatchWriter, key K, resource V) error {
storage.OnCommitSucceed(rw, func() {
c.Insert(key, resource)
})

err := c.store(rw, key, resource)
if err != nil {
return fmt.Errorf("could not store resource: %w", err)
}

return nil
}

0 comments on commit 2728dfd

Please sign in to comment.