Skip to content

Commit

Permalink
[occ] OCC scheduler and validation fixes (#359)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
This makes optimizations to the scheduler and validation

## Testing performed to validate your change

---------

Co-authored-by: Steven Landers <[email protected]>
  • Loading branch information
udpatil and stevenlanders authored Nov 22, 2023
1 parent eac8657 commit 6260732
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 102 deletions.
33 changes: 20 additions & 13 deletions store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type (

// the same CommitKVStoreCache may be accessed concurrently by multiple
// goroutines due to transaction parallelization
mtx sync.Mutex
mtx sync.RWMutex
}

// CommitKVStoreCacheManager maintains a mapping from a StoreKey to a
Expand Down Expand Up @@ -102,27 +102,34 @@ func (ckv *CommitKVStoreCache) CacheWrap(storeKey types.StoreKey) types.CacheWra
return cachekv.NewStore(ckv, storeKey, ckv.cacheKVSize)
}

// getFromCache queries the write-through cache for a value by key.
func (ckv *CommitKVStoreCache) getFromCache(key []byte) ([]byte, bool) {
ckv.mtx.RLock()
defer ckv.mtx.RUnlock()
return ckv.cache.Get(string(key))
}

// getAndWriteToCache queries the underlying CommitKVStore and writes the result
func (ckv *CommitKVStoreCache) getAndWriteToCache(key []byte) []byte {
ckv.mtx.Lock()
defer ckv.mtx.Unlock()
value := ckv.CommitKVStore.Get(key)
ckv.cache.Add(string(key), value)
return value
}

// Get retrieves a value by key. It will first look in the write-through cache.
// If the value doesn't exist in the write-through cache, the query is delegated
// to the underlying CommitKVStore.
func (ckv *CommitKVStoreCache) Get(key []byte) []byte {
ckv.mtx.Lock()
defer ckv.mtx.Unlock()

types.AssertValidKey(key)

keyStr := string(key)
value, ok := ckv.cache.Get(keyStr)
if ok {
// cache hit
if value, ok := ckv.getFromCache(key); ok {
return value
}

// cache miss; write to cache
value = ckv.CommitKVStore.Get(key)
ckv.cache.Add(keyStr, value)

return value
// if not found in the cache, query the underlying CommitKVStore and init cache value
return ckv.getAndWriteToCache(key)
}

// Set inserts a key/value pair into both the write-through cache and the
Expand Down
33 changes: 23 additions & 10 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (b mapCacheBackend) Range(f func(string, *types.CValue) bool) {

// Store wraps an in-memory cache around an underlying types.KVStore.
type Store struct {
mtx sync.Mutex
mtx sync.RWMutex
cache *types.BoundedCache
deleted *sync.Map
unsortedCache map[string]struct{}
Expand Down Expand Up @@ -104,20 +104,33 @@ func (store *Store) GetStoreType() types.StoreType {
return store.parent.GetStoreType()
}

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
// getFromCache queries the write-through cache for a value by key.
func (store *Store) getFromCache(key []byte) ([]byte, bool) {
store.mtx.RLock()
defer store.mtx.RUnlock()
if cv, ok := store.cache.Get(conv.UnsafeBytesToStr(key)); ok {
return cv.Value(), true
}
return nil, false
}

// getAndWriteToCache queries the underlying CommitKVStore and writes the result
func (store *Store) getAndWriteToCache(key []byte) []byte {
store.mtx.Lock()
defer store.mtx.Unlock()
value := store.parent.Get(key)
store.setCacheValue(key, value, false, false)
return value
}

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
types.AssertValidKey(key)

cacheValue, ok := store.cache.Get(conv.UnsafeBytesToStr(key))
value, ok := store.getFromCache(key)
if !ok {
// TODO: (occ) This is an example of when we fall through when we dont have a cache hit. Similarly, for mvkv, we'll try to serve reads from a local cache thats transient to the TX, and if its NOT present, then we read through AND mark the access (along with the value that was read) for validation
value = store.parent.Get(key)
store.setCacheValue(key, value, false, false)
} else {
value = cacheValue.Value()
value = store.getAndWriteToCache(key)
}
// TODO: (occ) This is an example of how we currently track accesses
store.eventManager.EmitResourceAccessReadEvent("get", store.storeKey, key, value)
Expand All @@ -140,8 +153,8 @@ func (store *Store) Set(key []byte, value []byte) {
// Has implements types.KVStore.
func (store *Store) Has(key []byte) bool {
value := store.Get(key)
store.mtx.Lock()
defer store.mtx.Unlock()
store.mtx.RLock()
defer store.mtx.RUnlock()
store.eventManager.EmitResourceAccessReadEvent("has", store.storeKey, key, value)
return value != nil
}
Expand Down
13 changes: 7 additions & 6 deletions store/multiversion/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,11 @@ func (s *Store) validateIterator(index int, tracker iterationTracker) bool {
}

func (s *Store) checkIteratorAtIndex(index int) bool {
s.mtx.RLock()
defer s.mtx.RUnlock()

valid := true
s.mtx.RLock()
iterateset := s.txIterateSets[index]
s.mtx.RUnlock()

for _, iterationTracker := range iterateset {
iteratorValid := s.validateIterator(index, iterationTracker)
valid = valid && iteratorValid
Expand All @@ -333,11 +333,12 @@ func (s *Store) checkIteratorAtIndex(index int) bool {
}

func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
s.mtx.RLock()
defer s.mtx.RUnlock()

conflictSet := make(map[int]struct{})

s.mtx.RLock()
readset := s.txReadSets[index]
s.mtx.RUnlock()

valid := true

// iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store
Expand Down
Loading

0 comments on commit 6260732

Please sign in to comment.