Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqlookupd: fix write lock starvation #1208

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions nsqlookupd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,9 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
defer s.ctx.nsqlookupd.DB.RUnlock()

data := make(map[string][]map[string]interface{})
for r, producers := range s.ctx.nsqlookupd.DB.registrationMap {
s.ctx.nsqlookupd.DB.registrationMap.Range(func(k, v interface{}) bool {
producers := v.(ProducerMap)
r := k.(Registration)
key := r.Category + ":" + r.Key + ":" + r.SubKey
for _, p := range producers {
m := map[string]interface{}{
Expand All @@ -324,7 +326,8 @@ func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httpro
}
data[key] = append(data[key], m)
}
}
return true
})

return data, nil
}
89 changes: 47 additions & 42 deletions nsqlookupd/registration_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]ProducerMap
registrationMap *sync.Map
}

type Registration struct {
Expand Down Expand Up @@ -54,119 +54,124 @@ func (p *Producer) IsTombstoned(lifetime time.Duration) bool {

func NewRegistrationDB() *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]ProducerMap),
registrationMap: &sync.Map{},
}
}

// add a registration key
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = make(map[string]*Producer)
}
r.registrationMap.LoadOrStore(k, make(ProducerMap))
}

// add a producer to a registration
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = make(map[string]*Producer)
}
producers := r.registrationMap[k]
val, _ := r.registrationMap.LoadOrStore(k, make(ProducerMap))
producers := val.(ProducerMap)
_, found := producers[p.peerInfo.id]
if found == false {
producers[p.peerInfo.id] = p
}

return !found
}

// remove a producer from a registration
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
value, ok := r.registrationMap.Load(k)
if !ok {
return false, 0
}
producers := value.(ProducerMap)
removed := false
if _, exists := producers[id]; exists {
removed = true
}

// Note: this leaves keys in the DB even if they have empty lists
delete(producers, id)

return removed, len(producers)
}

// remove a Registration and all its producers
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
r.registrationMap.Delete(k)
}

// needFilter reports whether key or subkey is the "*" wildcard.
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
	const wildcard = "*"
	if key == wildcard {
		return true
	}
	return subkey == wildcard
}

func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
if _, ok := r.registrationMap[k]; ok {
if _, ok := r.registrationMap.Load(k); ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
r.registrationMap.Range(func(k, _ interface{}) bool {
if k.(Registration).IsMatch(category, key, subkey) {
results = append(results, k.(Registration))
}
results = append(results, k)
}
return true
})

return results
}

func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
return ProducerMap2Slice(r.registrationMap[k])
val, _ := r.registrationMap.Load(k)

r.RLock()
defer r.RUnlock()
return ProducerMap2Slice(val.(ProducerMap))
}

r.RLock()
results := make(map[string]struct{})
var retProducers Producers
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
r.registrationMap.Range(func(k, v interface{}) bool {
if k.(Registration).IsMatch(category, key, subkey) {
producers := v.(ProducerMap)
for _, producer := range producers {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to r.RLock() around this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you take the r.RLock() around the whole loop, that puts this back close to where it started. If you can take the RLock() only after matching a Registration and before iterating over the ProducerMap, and then releasing it after finishing with that ProducerMap, it could avoid starving the write lock.

Copy link
Member Author

@andyxning andyxning Nov 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Actually, after finally finishing this PR, I thought about this. The PR now seems similar to the original one, but some write locks have been removed.

If you can take the RLock() only after matching a Registration and before iterating over the ProducerMap, and then releasing it after finishing with that ProducerMap, it could avoid starving the write lock.

I'm not quite sure about this, since read locks can be acquired simultaneously, and frequently acquiring and releasing the read lock may use more CPU.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah ... that's why I said this strategy is "sounding more iffy"

as I see it, it's either go back to the original, or take the read lock just around accessing each ProducerMap ("thrashing" the lock I guess), or turn all these ProducerMap into sync.Map themselves

_, found := results[producer.peerInfo.id]
if found == false {
results[producer.peerInfo.id] = struct{}{}
retProducers = append(retProducers, producer)
}
}
}
}
return true
})

r.RUnlock()

return retProducers
}

func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()

results := Registrations{}
for k, producers := range r.registrationMap {
r.registrationMap.Range(func(k, v interface{}) bool {
producers := v.(ProducerMap)
if _, exists := producers[id]; exists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you also need the r.RLock() here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

results = append(results, k)
results = append(results, k.(Registration))
}
}

return true
})

r.RUnlock()

return results
}

Expand Down