Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OCC] change incarnation metric type #433

Open
wants to merge 36 commits into
base: occ-main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1923a47
Add occ todos / comments (#317)
udpatil Sep 13, 2023
5ad76e0
Multiversion Item Implementation and Tests (#318)
udpatil Sep 26, 2023
5e5b4ce
[occ] Add incarnation field (#321)
udpatil Sep 29, 2023
cda8c31
[occ] Implement basic multiversion store (#322)
udpatil Oct 6, 2023
703b28a
[occ] Add concurrency worker configuration (#324)
stevenlanders Oct 9, 2023
1e60246
[occ] Occ multiversion store (#326)
udpatil Oct 10, 2023
a3aec8a
[occ] Add batch tx delivery interface (#327)
stevenlanders Oct 10, 2023
0520ced
[occ] MVKV store implementation and tests (#323)
udpatil Oct 10, 2023
0864c32
[occ] Add validation function for transaction state to multiversionst…
udpatil Oct 13, 2023
de92cbc
[occ] Add basic worker task and scheduler shell (#328)
stevenlanders Oct 17, 2023
6406dd9
[occ] Implement iterator for mvkv (#329)
udpatil Oct 17, 2023
f6b0b9f
fix dependency (#334)
udpatil Oct 17, 2023
f3a6cf4
[occ] Iterateset tracking and validation implementation (#337)
udpatil Oct 19, 2023
60b2113
[occ] Add scheduler logic for validation (#336)
stevenlanders Oct 19, 2023
1178e0b
[occ] Fix situation where no stores causes a panic (#338)
stevenlanders Oct 20, 2023
6ec1620
Add occ flag check to context (#340)
stevenlanders Oct 23, 2023
9530aeb
[occ] Add struct field and helpers for estimate prefills (#341)
udpatil Oct 24, 2023
3bdbc96
Fix map access panic (#343)
stevenlanders Oct 30, 2023
954c5a9
Gen estimates writeset (#344)
udpatil Nov 3, 2023
be45bb5
[OCC] Add trace spans to scheduler (#347)
stevenlanders Nov 6, 2023
061ef70
[occ] Fix parent store readset validation (#348)
udpatil Nov 10, 2023
f9541fb
[occ] OCC scheduler and validation fixes (#359)
udpatil Nov 22, 2023
30b7fab
[occ] Add optimizations for multiversion and mvkv (#361)
udpatil Nov 27, 2023
4d12564
[OCC] Add scheduler goroutine pool and optimizations (#362)
stevenlanders Nov 29, 2023
a780a58
update concurrency workers (#380)
udpatil Dec 12, 2023
cb02f7e
[OCC] Fix hang where abort channel blocks iterator (#379)
stevenlanders Dec 13, 2023
fa4b76d
Occ iterator fix (#389)
udpatil Jan 2, 2024
6bf4d72
fix interface
udpatil Jan 2, 2024
6878b96
fix deleteIterateSet and leave some comments (#395)
udpatil Jan 12, 2024
d08b8f0
Occ enabled cfg (#398)
udpatil Jan 18, 2024
75d1151
Remove block gas meter in occ (#407)
udpatil Jan 25, 2024
48dfff5
Relax locking contention (#427)
udpatil Feb 9, 2024
f9805fe
Rebase occ main (#428)
udpatil Feb 9, 2024
6b43421
fix assignment to not cause panic (#429)
udpatil Feb 9, 2024
5a05027
[OCC] add metrics for scheduler (#431)
stevenlanders Feb 13, 2024
00cc70d
change incarnation type
stevenlanders Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[occ] Implement iterator for mvkv (#329)
## Describe your changes and provide context
This implements Iterator and ReverseIterator for mvkv for the KVStore
interface. The memiterator will be composed of versionindexedstore and
multiversionstore, and will yield values in a cascading fashion firstly
from the writeset, and then second from the multiversion store.

This still needs optimization to persisted sorted keys instead of
reconstructing sorted keys each time.

## Testing performed to validate your change
Unit test to verify basic functionality
udpatil committed Jan 31, 2024
commit 6406dd910796040b93042bd99475d6fba11dfb94
74 changes: 74 additions & 0 deletions store/multiversion/memiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package multiversion

import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/store/types"
scheduler "github.com/cosmos/cosmos-sdk/types/occ"
)

// Iterates over iterKVCache items.
// if key is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
types.Iterator

mvStore MultiVersionStore
writeset map[string][]byte
index int
abortChannel chan scheduler.Abort
}

func (store *VersionIndexedStore) newMemIterator(
start, end []byte,
items *dbm.MemDB,
ascending bool,
) *memIterator {
var iter types.Iterator
var err error

if ascending {
iter, err = items.Iterator(start, end)
} else {
iter, err = items.ReverseIterator(start, end)
}

if err != nil {
if iter != nil {
iter.Close()
}
panic(err)
}

return &memIterator{
Iterator: iter,
mvStore: store.multiVersionStore,
index: store.transactionIndex,
abortChannel: store.abortChannel,
writeset: store.GetWriteset(),
}
}

// try to get value from the writeset, otherwise try to get from multiversion store, otherwise try to get from parent iterator
func (mi *memIterator) Value() []byte {
key := mi.Iterator.Key()

// try fetch from writeset - return if exists
if val, ok := mi.writeset[string(key)]; ok {
return val
}

// get the value from the multiversion store
val := mi.mvStore.GetLatestBeforeIndex(mi.index, key)

// if we have an estiamte, write to abort channel
if val.IsEstimate() {
mi.abortChannel <- scheduler.NewEstimateAbort(val.Index())
}

// if we have a deleted value, return nil
if val.IsDeleted() {
return nil
}
return val.Value()
}
256 changes: 256 additions & 0 deletions store/multiversion/mergeiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package multiversion

import (
"bytes"
"errors"

"github.com/cosmos/cosmos-sdk/store/types"
)

// mvsMergeIterator merges a parent Iterator and a cache Iterator.
// The cache iterator may return nil keys to signal that an item
// had been deleted (but not deleted in the parent).
// If the cache iterator has the same key as the parent, the
// cache shadows (overrides) the parent.
type mvsMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
}

var _ types.Iterator = (*mvsMergeIterator)(nil)

func NewMVSMergeIterator(
parent, cache types.Iterator,
ascending bool,
) *mvsMergeIterator {
iter := &mvsMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
}

return iter
}

// Domain implements Iterator.
// It returns the union of the iter.Parent doman, and the iter.Cache domain.
// If the domains are disjoint, this includes the domain in between them as well.
func (iter *mvsMergeIterator) Domain() (start, end []byte) {
startP, endP := iter.parent.Domain()
startC, endC := iter.cache.Domain()

if iter.compare(startP, startC) < 0 {
start = startP
} else {
start = startC
}

if iter.compare(endP, endC) < 0 {
end = endC
} else {
end = endP
}

return start, end
}

// Valid implements Iterator.
func (iter *mvsMergeIterator) Valid() bool {
return iter.skipUntilExistsOrInvalid()
}

// Next implements Iterator
func (iter *mvsMergeIterator) Next() {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the next cache item.
if !iter.parent.Valid() {
iter.cache.Next()
return
}

// If cache is invalid, get the next parent item.
if !iter.cache.Valid() {
iter.parent.Next()
return
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
}
}

// Key implements Iterator
func (iter *mvsMergeIterator) Key() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache key.
if !iter.parent.Valid() {
return iter.cache.Key()
}

// If cache is invalid, get the parent key.
if !iter.cache.Valid() {
return iter.parent.Key()
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return keyP
case 0: // parent == cache
return keyP
case 1: // parent > cache
return keyC
default:
panic("invalid compare result")
}
}

// Value implements Iterator
func (iter *mvsMergeIterator) Value() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()

// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
value := iter.cache.Value()
return value
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
return value
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
return value
default:
panic("invalid comparison result")
}
}

// Close implements Iterator
func (iter *mvsMergeIterator) Close() error {
if err := iter.parent.Close(); err != nil {
// still want to close cache iterator regardless
iter.cache.Close()
return err
}

return iter.cache.Close()
}

// Error returns an error if the mvsMergeIterator is invalid defined by the
// Valid method.
func (iter *mvsMergeIterator) Error() error {
if !iter.Valid() {
return errors.New("invalid mvsMergeIterator")
}

return nil
}

// If not valid, panics.
// NOTE: May have side-effect of iterating over cache.
func (iter *mvsMergeIterator) assertValid() {
if err := iter.Error(); err != nil {
panic(err)
}
}

// Like bytes.Compare but opposite if not ascending.
func (iter *mvsMergeIterator) compare(a, b []byte) int {
if iter.ascending {
return bytes.Compare(a, b)
}

return bytes.Compare(a, b) * -1
}

// Skip all delete-items from the cache w/ `key < until`. After this function,
// current cache item is a non-delete-item, or `until <= key`.
// If the current cache item is not a delete item, does nothing.
// If `until` is nil, there is no limit, and cache may end up invalid.
// CONTRACT: cache is valid.
func (iter *mvsMergeIterator) skipCacheDeletes(until []byte) {
for iter.cache.Valid() &&
iter.cache.Value() == nil &&
(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
iter.cache.Next()
}
}

// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
// Returns whether the iterator is valid.
func (iter *mvsMergeIterator) skipUntilExistsOrInvalid() bool {
for {
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
}
// Parent is valid.
if !iter.cache.Valid() {
return true
}
// Parent is valid, cache is valid.

// Compare parent and cache.
keyP := iter.parent.Key()
keyC := iter.cache.Key()

switch iter.compare(keyP, keyC) {
case -1: // parent < cache.
return true

case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.parent.Next()
iter.cache.Next()

continue
}
// Cache is not a delete.

return true // cache exists.
case 1: // cache < parent
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.skipCacheDeletes(keyP)
continue
}
// Cache is not a delete.

return true // cache exists.
}
}
}
Loading