From f9d84591dfc2173d1734161ffe823e3bd098fb25 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 4 Jul 2017 12:54:20 +0400 Subject: [PATCH 1/3] Set an upper limit of how large we will let txn-queues grow. When we have broken transaction data in the database (such as from mongo getting OOM killed), it can cause cascade failure, where that document ends up getting too many transactions queued up against it. This can also happen if you have nothing but assert-only transactions against a single document. If we have lots of transactions, it becomes harder and harder to add new entries and clearing out a large queue is O(N^2) which means capping it is worthwhile. (It also makes the document grow until it hits max-doc-size.) The upper bound is still quite large, so it should not be triggered if everything is operating normally. --- txn/flusher.go | 12 ++++++++++++ txn/txn_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/txn/flusher.go b/txn/flusher.go index f640a4380..473ea600c 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -212,6 +212,8 @@ var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {" var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false") +const maxTxnQueueLength = 1000 + // prepare injects t's id onto txn-queue for all affected documents // and collects the current txn-queue and txn-revno values during // the process. If the prepared txn-queue indicates that there are @@ -244,6 +246,16 @@ NextDoc: change.Upsert = false chaos("") if _, err := cquery.Apply(change, &info); err == nil { + if len(info.Queue) > maxTxnQueueLength { + // abort with TXN Queue too long, but remove the entry we just added + innerErr := c.UpdateId(dkey.Id, + bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}}) + if innerErr != nil { + f.debugf("error while backing out of queue-too-long: %v", innerErr) + } + return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)", + dkey.Id, dkey.C, len(info.Queue)) + } if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno diff --git a/txn/txn_test.go b/txn/txn_test.go index 12923ca12..40e897a8b 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -621,6 +621,39 @@ func (s *S) TestTxnQueueStashStressTest(c *C) { } } +func (s *S) TestTxnQueueMaxSize(c *C) { + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + Breakpoint: "set-applying", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < 1000; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + // Now that we've filled up the queue, we should see that there are 1000 + // items in the queue, and the error applying a new one will change. + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) + err = s.runner.Run(ops, "", nil) + c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(1001\)`) + // The txn-queue should not have grown + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) +} + func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { // This test ensures that PurgeMissing can handle very large // txn-queue fields. Previous iterations of PurgeMissing would From f89b2fc02022bf49ce17983d682715ba0f6a7fb5 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Wed, 5 Jul 2017 12:37:59 +0400 Subject: [PATCH 2/3] Add Runner.SetOptions to control maximum queue length. Still defaults to 1000 without any other configuration, but allows callers to know that they can be stricter/less strict. --- txn/flusher.go | 4 +--- txn/txn.go | 40 ++++++++++++++++++++++++++++---- txn/txn_test.go | 61 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index 473ea600c..5643ea8d2 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -212,8 +212,6 @@ var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {" var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false") -const maxTxnQueueLength = 1000 - // prepare injects t's id onto txn-queue for all affected documents // and collects the current txn-queue and txn-revno values during // the process. If the prepared txn-queue indicates that there are @@ -246,7 +244,7 @@ NextDoc: change.Upsert = false chaos("") if _, err := cquery.Apply(change, &info); err == nil { - if len(info.Queue) > maxTxnQueueLength { + if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength { // abort with TXN Queue too long, but remove the entry we just added innerErr := c.UpdateId(dkey.Id, bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}}) diff --git a/txn/txn.go b/txn/txn.go index 204b3cf1d..8ff42c4d2 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -216,11 +216,14 @@ const ( // A Runner applies operations as part of a transaction onto any number // of collections within a database. See the Run method for details. type Runner struct { - tc *mgo.Collection // txns - sc *mgo.Collection // stash - lc *mgo.Collection // log + tc *mgo.Collection // txns + sc *mgo.Collection // stash + lc *mgo.Collection // log + opts RunnerOptions // runtime options } +const defaultMaxTxnQueueLength = 1000 + // NewRunner returns a new transaction runner that uses tc to hold its // transactions. // @@ -232,7 +235,36 @@ type Runner struct { // will be used for implementing the transactional behavior of insert // and remove operations. func NewRunner(tc *mgo.Collection) *Runner { - return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil} + return &Runner{ + tc: tc, + sc: tc.Database.C(tc.Name + ".stash"), + lc: nil, + opts: DefaultRunnerOptions(), + } +} + +// RunnerOptions encapsulates ways you can tweak transaction Runner behavior. +type RunnerOptions struct { + // MaxTxnQueueLength is a way to limit bad behavior. Many operations on + // transaction queues are O(N^2), and transaction queues growing too large + // are usually indicative of a bug in behavior. This should be larger + // than the maximum number of concurrent operations to a single document. + // Normal operations are likely to only ever hit 10 or so, we use a default + // maximum length of 1000. + MaxTxnQueueLength int +} + +// SetOptions allows people to change some of the internal behavior of a Runner. +func (r *Runner) SetOptions(opts RunnerOptions) { + r.opts = opts +} + +// DefaultRunnerOptions defines default behavior for a Runner. +// Users can use the DefaultRunnerOptions to only override specific behavior. +func DefaultRunnerOptions() RunnerOptions { + return RunnerOptions{ + MaxTxnQueueLength: defaultMaxTxnQueueLength, + } } var ErrAborted = fmt.Errorf("transaction aborted") diff --git a/txn/txn_test.go b/txn/txn_test.go index 40e897a8b..1dca1720c 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -621,7 +621,7 @@ func (s *S) TestTxnQueueStashStressTest(c *C) { } } -func (s *S) TestTxnQueueMaxSize(c *C) { +func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) { txn.SetDebug(false) txn.SetChaos(txn.Chaos{ KillChance: 1, @@ -635,7 +635,7 @@ func (s *S) TestTxnQueueMaxSize(c *C) { Id: 0, Update: M{"$inc": M{"balance": 100}}, }} - for i := 0; i < 1000; i++ { + for i := 0; i < expectedQueueLength; i++ { err := s.runner.Run(ops, "", nil) c.Assert(err, Equals, txn.ErrChaos) } @@ -645,13 +645,64 @@ func (s *S) TestTxnQueueMaxSize(c *C) { var doc bson.M err = s.accounts.FindId(0).One(&doc) c.Assert(err, IsNil) - c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) err = s.runner.Run(ops, "", nil) - c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(1001\)`) + c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`) // The txn-queue should not have grown err = s.accounts.FindId(0).One(&doc) c.Assert(err, IsNil) - c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) +} + +func (s *S) TestTxnQueueDefaultMaxSize(c *C) { + s.runner.SetOptions(txn.DefaultRunnerOptions()) + s.checkTxnQueueLength(c, 1000) +} + +func (s *S) TestTxnQueueCustomMaxSize(c *C) { + opts := txn.DefaultRunnerOptions() + opts.MaxTxnQueueLength = 100 + s.runner.SetOptions(opts) + s.checkTxnQueueLength(c, 100) +} + +func (s *S) TestTxnQueueUnlimited(c *C) { + opts := txn.DefaultRunnerOptions() + // A value of 0 should mean 'unlimited' + opts.MaxTxnQueueLength = 0 + s.runner.SetOptions(opts) + // it isn't possible to actually prove 'unlimited' but we can prove that + // we at least can insert more than the default number of transactions + // without getting a 'too many transactions' failure. + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + // Use set-prepared because we are adding more transactions than + // other tests, and this speeds up setup time a bit + Breakpoint: "set-prepared", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < 1100; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100) + err = s.runner.Run(ops, "", nil) + c.Check(err, Equals, txn.ErrChaos) + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101) } func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { From 71bfa1c6c4478d7bec1c7a5045f4f7f2166a7a6a Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 5 Jul 2017 11:45:06 +0100 Subject: [PATCH 3/3] Add link to improvement by @jameinel --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b8ba057b9..e3e2247cb 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2)) * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) -* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11)) +* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) ---