From 61e543284bec21489a5c91e56ff50c5b4d070964 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 9 Sep 2023 11:40:33 +0200 Subject: [PATCH 1/2] feature: calculate allocated memory (approx), OTEL support Signed-off-by: Valery Piashchynski --- go.sum | 4 - memorykv/kv.go | 294 +++++++++++++++++++++++++++++++++++++++---------- plugin.go | 2 +- 3 files changed, 234 insertions(+), 66 deletions(-) diff --git a/go.sum b/go.sum index 90d17d8..9d0ef92 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,6 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/roadrunner-server/api/v4 v4.7.0 h1:vZ8gYWjEpJOa8slUqPJJKENS4vrFKhvI9+nQKDtSqHU= -github.com/roadrunner-server/api/v4 v4.7.0/go.mod h1:Ut9T2j3E22cnRJtipbU8N3WVhyV040iiDfddzojlKwY= github.com/roadrunner-server/api/v4 v4.7.1 h1:BYU2n92mcVyanV4w3fBSSztfP8ILUh5IyndRI1UD2/8= github.com/roadrunner-server/api/v4 v4.7.1/go.mod h1:Ut9T2j3E22cnRJtipbU8N3WVhyV040iiDfddzojlKwY= github.com/roadrunner-server/endure/v2 v2.4.2 h1:aFnPc321l5HDzE2mN5wwfksJ40lgXwfU3RSqdS1LyUQ= @@ -39,8 +37,6 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= -golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/memorykv/kv.go b/memorykv/kv.go index 39d9796..89361ee 100644 --- a/memorykv/kv.go +++ b/memorykv/kv.go @@ -1,22 +1,42 @@ package memorykv import ( + "context" "strings" "sync" + "sync/atomic" "time" "github.com/roadrunner-server/api/v4/plugins/v1/kv" "github.com/roadrunner-server/errors" + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/zap" ) +const ( + tracerName = "inmemory" +) + +type callback func(sCh <-chan struct{}) + +type cb struct { + updateCh chan int // new ttl + stopCh chan struct{} +} + type Driver struct { - clearMu sync.RWMutex - heap sync.Map - // stop is used to stop keys GC and close boltdb connection - stop chan struct{} - log *zap.Logger - cfg *Config + mu sync.RWMutex + + heap sync.Map // map[string]kv.Item + // callbacks contains all callbacks channels for the keys + callbacks sync.Map // map[string]*cb + broadcastStopCh atomic.Pointer[chan struct{}] + + mapSize int64 + tracer *sdktrace.TracerProvider + log *zap.Logger + cfg *Config } type Configurer interface { @@ -26,14 +46,24 @@ type Configurer interface { Has(name string) bool } -func NewInMemoryDriver(key string, log *zap.Logger, cfgPlugin Configurer) (*Driver, error) { +func NewInMemoryDriver(key string, log *zap.Logger, cfgPlugin Configurer, tracer *sdktrace.TracerProvider) (*Driver, error) { const op = errors.Op("new_in_memory_driver") + if tracer == nil { + tracer = sdktrace.NewTracerProvider() + } + d := &Driver{ - stop: make(chan struct{}), - log: log, + callbacks: sync.Map{}, + heap: sync.Map{}, + mapSize: 0, + log: log, + tracer: tracer, } + ch := make(chan struct{}) + d.broadcastStopCh.Store(&ch) + err := cfgPlugin.UnmarshalKey(key, &d.cfg) if err != nil { return nil, errors.E(op, err) @@ -45,16 +75,20 @@ func NewInMemoryDriver(key string, log *zap.Logger, cfgPlugin Configurer) (*Driv d.cfg.InitDefaults() - go d.gc() - return d, nil } func (d *Driver) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in_memory_plugin_has") + + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:has") + defer span.End() + if keys == nil { + span.RecordError(errors.Str("no keys provided")) return nil, errors.E(op, errors.NoKeys) } + m := make(map[string]bool) for i := range keys { keyTrimmed := strings.TrimSpace(keys[i]) @@ -67,28 +101,43 @@ func (d *Driver) Has(keys ...string) (map[string]bool, error) { } } + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return m, nil } func (d *Driver) Get(key string) ([]byte, error) { const op = errors.Op("in_memory_plugin_get") + + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:get") + defer span.End() + // to get cases like " " keyTrimmed := strings.TrimSpace(key) if keyTrimmed == "" { + span.RecordError(errors.Str("empty key")) return nil, errors.E(op, errors.EmptyKey) } if data, exist := d.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) return data.(kv.Item).Value(), nil } + + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return nil, nil } func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("in_memory_plugin_mget") + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:mget") + defer span.End() + if keys == nil { + span.RecordError(errors.Str("no keys provided")) return nil, errors.E(op, errors.NoKeys) } @@ -96,6 +145,7 @@ func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { for i := range keys { keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { + span.RecordError(errors.Str("empty key")) return nil, errors.E(op, errors.EmptyKey) } } @@ -108,12 +158,18 @@ func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { } } + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return m, nil } func (d *Driver) Set(items ...kv.Item) error { const op = errors.Op("in_memory_plugin_set") + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:set") + defer span.End() + if items == nil { + span.RecordError(errors.Str("no items provided")) return errors.E(op, errors.NoKeys) } @@ -124,14 +180,41 @@ func (d *Driver) Set(items ...kv.Item) error { // TTL is set if items[i].Timeout() != "" { // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].Timeout()) + tt, err := time.Parse(time.RFC3339, items[i].Timeout()) if err != nil { + span.RecordError(err) return err } + + tm := int(tt.UTC().Sub(time.Now().UTC()).Seconds()) + // we already in the future :) + if tm < 0 { + d.updateAllocatedSize(int64(len(items[i].Key()) + len(items[i].Value()) + len(items[i].Timeout()))) + // set item + d.heap.Store(items[i].Key(), items[i]) + continue + } + + // create callback to delete the key from the heap + clbk, stopCh, updateCh := d.ttlcallback(items[i].Key(), tm) + go func() { + clbk(*d.broadcastStopCh.Load()) + }() + + // store the callback since we have TTL + d.callbacks.Store(items[i].Key(), &cb{ + updateCh: updateCh, + stopCh: stopCh, + }) } + d.updateAllocatedSize(int64(len(items[i].Key()) + len(items[i].Value()) + len(items[i].Timeout()))) + // set item d.heap.Store(items[i].Key(), items[i]) } + + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return nil } @@ -139,38 +222,62 @@ func (d *Driver) Set(items ...kv.Item) error { // If key already has the expiration time, it will be overwritten func (d *Driver) MExpire(items ...kv.Item) error { const op = errors.Op("in_memory_plugin_mexpire") + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:mexpire") + defer span.End() + for i := range items { if items[i] == nil { continue } + if items[i].Timeout() == "" || strings.TrimSpace(items[i].Key()) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) + span.RecordError(errors.Str("timeout for MExpire is empty or key is empty")) + return errors.E(op, errors.Str("timeout for MExpire is empty or key is empty")) } - // if key exist, overwrite it value - if pItem, ok := d.heap.LoadAndDelete(items[i].Key()); ok { - // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].Timeout()) - if err != nil { - return errors.E(op, err) - } - // guess that t is in the future - // in memory is just FOR TESTING PURPOSES - // LOGIC ISN'T IDEAL - d.heap.Store(items[i].Key(), &Item{ - key: items[i].Key(), - value: pItem.(kv.Item).Value(), - timeout: items[i].Timeout(), + // check if the time is correct + tm, err := time.Parse(time.RFC3339, items[i].Timeout()) + if err != nil { + span.RecordError(err) + return errors.E(op, err) + } + + ttm := int(tm.UTC().Sub(time.Now().UTC()).Seconds()) + if ttm < 0 { + // we're in the future, delete the item + ttm = 0 + } + + if clb, ok := d.callbacks.Load(items[i].Key()); ok { + // send new ttl to the callback + clb.(*cb).updateCh <- ttm + } else { + // we should set the callback + // create callback to delete the key from the heap + clbk, stopCh, updateCh := d.ttlcallback(items[i].Key(), ttm) + go func() { + clbk(*d.broadcastStopCh.Load()) + }() + + d.callbacks.Store(items[i].Key(), &cb{ + updateCh: updateCh, + stopCh: stopCh, }) } } + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return nil } func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("in_memory_plugin_ttl") + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:ttl") + defer span.End() + if keys == nil { + span.RecordError(errors.Str("no keys provided")) return nil, errors.E(op, errors.NoKeys) } @@ -178,6 +285,7 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) { for i := range keys { keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { + span.RecordError(errors.Str("empty key")) return nil, errors.E(op, errors.EmptyKey) } } @@ -189,12 +297,19 @@ func (d *Driver) TTL(keys ...string) (map[string]string, error) { m[keys[i]] = item.(kv.Item).Timeout() } } + + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return m, nil } func (d *Driver) Delete(keys ...string) error { const op = errors.Op("in_memory_plugin_delete") + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:delete") + + defer span.End() if keys == nil { + span.RecordError(errors.Str("no keys provided")) return errors.E(op, errors.NoKeys) } @@ -202,61 +317,118 @@ func (d *Driver) Delete(keys ...string) error { for i := range keys { keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { + span.RecordError(errors.Str("empty key")) return errors.E(op, errors.EmptyKey) } } for i := range keys { d.heap.Delete(keys[i]) + clbk, ok := d.callbacks.LoadAndDelete(keys[i]) + if ok { + // send signal to stop the timer and delete the item + clbk.(*cb).stopCh <- struct{}{} + } } + + span.SetAttributes(attribute.Int64("allocated memory:bytes", d.loadAllocatedSize())) + return nil } func (d *Driver) Clear() error { - d.clearMu.Lock() - d.heap = sync.Map{} - d.clearMu.Unlock() + _, span := d.tracer.Tracer(tracerName).Start(context.Background(), "inmemory:clear") + defer span.End() + // stop all callbacks + close(*d.broadcastStopCh.Load()) + + newCh := make(chan struct{}) + d.broadcastStopCh.Swap(&newCh) + + d.heap.Range(func(key any, value any) bool { + d.heap.Delete(key) + return true + }) + + // zero the allocated size + atomic.StoreInt64(&d.mapSize, 0) return nil } +func (d *Driver) updateAllocatedSize(newsize int64) { + if newsize > 0 { + atomic.AddInt64(&d.mapSize, newsize) + return + } + + curr := atomic.LoadInt64(&d.mapSize) + if curr >= newsize { + atomic.AddInt64(&d.mapSize, newsize) + } else { + atomic.StoreInt64(&d.mapSize, 0) + } +} + +func (d *Driver) loadAllocatedSize() int64 { + return atomic.LoadInt64(&d.mapSize) +} + func (d *Driver) Stop() { - d.stop <- struct{}{} + close(*d.broadcastStopCh.Load()) } // ================================== PRIVATE ====================================== -func (d *Driver) gc() { - ticker := time.NewTicker(time.Duration(d.cfg.Interval) * time.Second) - defer ticker.Stop() - for { +func (d *Driver) ttlcallback(id string, ttl int) (callback, chan struct{}, chan int) { + stopCbCh := make(chan struct{}, 1) + updateTTLCh := make(chan int, 1) + + // at this point, when adding lock, we should not have the callback + return func(sCh <-chan struct{}) { + // ttl + cbttl := ttl + ta := time.NewTicker(time.Second * time.Duration(cbttl)) + loop: select { - case <-d.stop: - return - case now := <-ticker.C: - // mutes needed to clear the map - d.clearMu.RLock() - - // check every second - d.heap.Range(func(key, value interface{}) bool { - v := value.(kv.Item) - if v.Timeout() == "" { - return true - } - - t, err := time.Parse(time.RFC3339, v.Timeout()) - if err != nil { - return false - } - - if now.After(t) { - d.log.Debug("key was deleted", zap.Any("key", key)) - d.heap.Delete(key) - } - return true - }) + case <-ta.C: + d.log.Debug("ttl expired", + zap.String("id", id), + zap.Int("ttl seconds", cbttl), + ) + ta.Stop() + // broadcast stop channel + case <-sCh: + d.log.Debug("ttl removed, broadcast call", + zap.String("id", id), + zap.Int("ttl seconds", cbttl), + ) + ta.Stop() + // item stop channel + case <-stopCbCh: + d.log.Debug("ttl removed, callback call", + zap.String("id", id), + zap.Int("ttl seconds", cbttl), + ) + ta.Stop() + case newTTL := <-updateTTLCh: + d.log.Debug("updating ttl", + zap.String("id", id), + zap.Int("prev_ttl", cbttl), + zap.Int("new_ttl", newTTL)) + // update callback TTL (for logs) + cbttl = newTTL + ta.Reset(time.Second * time.Duration(newTTL)) + // in case of TTL we don't need to remove the item, only update TTL + goto loop + } - d.clearMu.RUnlock() + val, ok := d.heap.LoadAndDelete(id) + if ok { + // subtract the size of the item + d.updateAllocatedSize(-int64(len(id) + len(val.(kv.Item).Value()) + len(val.(kv.Item).Timeout()))) } - } + + d.callbacks.Delete(id) + }, stopCbCh, updateTTLCh } diff --git a/plugin.go b/plugin.go index 07f98bc..d977158 100644 --- a/plugin.go +++ b/plugin.go @@ -54,7 +54,7 @@ func (p *Plugin) Collects() []*dep.In { // Drivers implementation func (p *Plugin) KvFromConfig(key string) (kv.Storage, error) { - return memorykv.NewInMemoryDriver(key, p.log, p.cfg) + return memorykv.NewInMemoryDriver(key, p.log, p.cfg, p.tracer) } // DriverFromConfig constructs memory driver from the .rr.yaml configuration From 8ee04af33b448068972ece50f1f445df5242231c Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sat, 9 Sep 2023 11:46:02 +0200 Subject: [PATCH 2/2] chore: linters Signed-off-by: Valery Piashchynski --- memorykv/kv.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/memorykv/kv.go b/memorykv/kv.go index 89361ee..302183a 100644 --- a/memorykv/kv.go +++ b/memorykv/kv.go @@ -26,8 +26,6 @@ type cb struct { } type Driver struct { - mu sync.RWMutex - heap sync.Map // map[string]kv.Item // callbacks contains all callbacks channels for the keys callbacks sync.Map // map[string]*cb