Skip to content

Commit

Permalink
Merge pull request #16 from globalsign/bugfix/jameinel-max-txn-queue-…
Browse files Browse the repository at this point in the history
…length

Bound TXN queue lengths
  • Loading branch information
domodwyer authored Jul 5, 2017
2 parents a724dca + d47fb18 commit 73a9463
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili
* Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2))
* Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5))
* Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7))
* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11))
* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464))
* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16))
* Fixes cursor timeouts ([detials](https://jira.mongodb.org/browse/SERVER-24899))

---
Expand Down
10 changes: 10 additions & 0 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ NextDoc:
change.Upsert = false
chaos("")
if _, err := cquery.Apply(change, &info); err == nil {
if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength {
// abort with TXN Queue too long, but remove the entry we just added
innerErr := c.UpdateId(dkey.Id,
bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}})
if innerErr != nil {
f.debugf("error while backing out of queue-too-long: %v", innerErr)
}
return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)",
dkey.Id, dkey.C, len(info.Queue))
}
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
Expand Down
40 changes: 36 additions & 4 deletions txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,14 @@ const (
// A Runner applies operations as part of a transaction onto any number
// of collections within a database. See the Run method for details.
type Runner struct {
tc *mgo.Collection // txns
sc *mgo.Collection // stash
lc *mgo.Collection // log
tc *mgo.Collection // txns
sc *mgo.Collection // stash
lc *mgo.Collection // log
opts RunnerOptions // runtime options
}

const defaultMaxTxnQueueLength = 1000

// NewRunner returns a new transaction runner that uses tc to hold its
// transactions.
//
Expand All @@ -233,7 +236,36 @@ type Runner struct {
// will be used for implementing the transactional behavior of insert
// and remove operations.
func NewRunner(tc *mgo.Collection) *Runner {
return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil}
return &Runner{
tc: tc,
sc: tc.Database.C(tc.Name + ".stash"),
lc: nil,
opts: DefaultRunnerOptions(),
}
}

// RunnerOptions encapsulates ways you can tweak transaction Runner behavior.
type RunnerOptions struct {
// MaxTxnQueueLength is a way to limit bad behavior. Many operations on
// transaction queues are O(N^2), and transaction queues growing too large
// are usually indicative of a bug in behavior. This should be larger
// than the maximum number of concurrent operations to a single document.
// Normal operations are likely to only ever hit 10 or so, we use a default
// maximum length of 1000.
MaxTxnQueueLength int
}

// SetOptions allows people to change some of the internal behavior of a Runner.
func (r *Runner) SetOptions(opts RunnerOptions) {
r.opts = opts
}

// DefaultRunnerOptions defines default behavior for a Runner.
// Users can use the DefaultRunnerOptions to only override specific behavior.
func DefaultRunnerOptions() RunnerOptions {
return RunnerOptions{
MaxTxnQueueLength: defaultMaxTxnQueueLength,
}
}

var ErrAborted = fmt.Errorf("transaction aborted")
Expand Down
84 changes: 84 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,90 @@ func (s *S) TestTxnQueueStashStressTest(c *C) {
}
}

func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) {
txn.SetDebug(false)
txn.SetChaos(txn.Chaos{
KillChance: 1,
Breakpoint: "set-applying",
})
defer txn.SetChaos(txn.Chaos{})
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
for i := 0; i < expectedQueueLength; i++ {
err := s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
txn.SetDebug(true)
// Now that we've filled up the queue, we should see that there are 1000
// items in the queue, and the error applying a new one will change.
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
err = s.runner.Run(ops, "", nil)
c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`)
// The txn-queue should not have grown
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
}

func (s *S) TestTxnQueueDefaultMaxSize(c *C) {
s.runner.SetOptions(txn.DefaultRunnerOptions())
s.checkTxnQueueLength(c, 1000)
}

func (s *S) TestTxnQueueCustomMaxSize(c *C) {
opts := txn.DefaultRunnerOptions()
opts.MaxTxnQueueLength = 100
s.runner.SetOptions(opts)
s.checkTxnQueueLength(c, 100)
}

func (s *S) TestTxnQueueUnlimited(c *C) {
opts := txn.DefaultRunnerOptions()
// A value of 0 should mean 'unlimited'
opts.MaxTxnQueueLength = 0
s.runner.SetOptions(opts)
// it isn't possible to actually prove 'unlimited' but we can prove that
// we at least can insert more than the default number of transactions
// without getting a 'too many transactions' failure.
txn.SetDebug(false)
txn.SetChaos(txn.Chaos{
KillChance: 1,
// Use set-prepared because we are adding more transactions than
// other tests, and this speeds up setup time a bit
Breakpoint: "set-prepared",
})
defer txn.SetChaos(txn.Chaos{})
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
for i := 0; i < 1100; i++ {
err := s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
txn.SetDebug(true)
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100)
err = s.runner.Run(ops, "", nil)
c.Check(err, Equals, txn.ErrChaos)
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101)
}

func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
// This test ensures that PurgeMissing can handle very large
// txn-queue fields. Previous iterations of PurgeMissing would
Expand Down

0 comments on commit 73a9463

Please sign in to comment.