Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set an upper limit of how large we will let txn-queues grow. #463

Open
wants to merge 5 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,21 @@ NextDoc:
change.Upsert = false
chaos("")
if _, err := cquery.Apply(change, &info); err == nil {
if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength {
// txn-queue is too long, abort this transaction. abortOrReload will pull the tokens from
// all of the docs that we've touched so far.
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
revnos := assembledRevnos(t.Ops, revno)
pull := map[bson.ObjectId]*transaction{t.Id: t}
err := f.abortOrReload(t, revnos, pull)
if err == nil {
// If we managed to abort the transaction, report on the bad data
return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)",
dkey.Id, dkey.C, len(info.Queue))
}
return nil, err
}
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
Expand Down Expand Up @@ -610,8 +625,8 @@ func (f *flusher) assert(t *transaction, revnos []int64, pull map[bson.ObjectId]

func (f *flusher) abortOrReload(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) (err error) {
f.debugf("Aborting or reloading %s (was %q)", t, t.State)
if t.State == tprepared {
qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
if t.State == tprepared || t.State == tpreparing {
qdoc := bson.D{{"_id", t.Id}, {"s", t.State}}
udoc := bson.D{{"$set", bson.D{{"s", taborting}}}}
chaos("set-aborting")
if err = f.tc.Update(qdoc, udoc); err == nil {
Expand Down
Empty file removed txn/output.txt
Empty file.
40 changes: 36 additions & 4 deletions txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,14 @@ const (
// A Runner applies operations as part of a transaction onto any number
// of collections within a database. See the Run method for details.
type Runner struct {
tc *mgo.Collection // txns
sc *mgo.Collection // stash
lc *mgo.Collection // log
tc *mgo.Collection // txns
sc *mgo.Collection // stash
lc *mgo.Collection // log
opts RunnerOptions // runtime options
}

const defaultMaxTxnQueueLength = 1000

// NewRunner returns a new transaction runner that uses tc to hold its
// transactions.
//
Expand All @@ -232,7 +235,36 @@ type Runner struct {
// will be used for implementing the transactional behavior of insert
// and remove operations.
func NewRunner(tc *mgo.Collection) *Runner {
return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil}
return &Runner{
tc: tc,
sc: tc.Database.C(tc.Name + ".stash"),
lc: nil,
opts: DefaultRunnerOptions(),
}
}

// RunnerOptions encapsulates ways you can tweak transaction Runner behavior.
type RunnerOptions struct {
// MaxTxnQueueLength is a way to limit bad behavior. Many operations on
// transaction queues are O(N^2), and transaction queues growing too large
// are usually indicative of a bug in behavior. This should be larger
// than the maximum number of concurrent operations to a single document.
// Normal operations are likely to only ever hit 10 or so, we use a default
// maximum length of 1000.
MaxTxnQueueLength int
}

// SetOptions allows people to change some of the internal behavior of a Runner.
func (r *Runner) SetOptions(opts RunnerOptions) {
r.opts = opts
}

// DefaultRunnerOptions defines default behavior for a Runner.
// Users can use the DefaultRunnerOptions to only override specific behavior.
func DefaultRunnerOptions() RunnerOptions {
return RunnerOptions{
MaxTxnQueueLength: defaultMaxTxnQueueLength,
}
}

var ErrAborted = fmt.Errorf("transaction aborted")
Expand Down
156 changes: 156 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,162 @@ func (s *S) TestTxnQueueStashStressTest(c *C) {
}
}

func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) {
txn.SetDebug(false)
txn.SetChaos(txn.Chaos{
KillChance: 1,
Breakpoint: "set-applying",
})
defer txn.SetChaos(txn.Chaos{})
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
for i := 0; i < expectedQueueLength; i++ {
err := s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
txn.SetDebug(true)
// Now that we've filled up the queue, we should see that there are 1000
// items in the queue, and the error applying a new one will change.
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
err = s.runner.Run(ops, "", nil)
c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`)
// The txn-queue should not have grown
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength)
}

func (s *S) TestTxnQueueDefaultMaxSize(c *C) {
s.runner.SetOptions(txn.DefaultRunnerOptions())
s.checkTxnQueueLength(c, 1000)
}

func (s *S) TestTxnQueueCustomMaxSize(c *C) {
opts := txn.DefaultRunnerOptions()
opts.MaxTxnQueueLength = 100
s.runner.SetOptions(opts)
s.checkTxnQueueLength(c, 100)
}

func (s *S) TestTxnQueueMultipleDocs(c *C) {
expectedLength := 100
maxDocs := 110
opts := txn.DefaultRunnerOptions()
opts.MaxTxnQueueLength = expectedLength
s.runner.SetOptions(opts)
txn.SetDebug(false)
createOps := []txn.Op{{
C: "accounts",
Id: 0,
Insert: M{"balance": 1000},
}}
for i := 1; i < maxDocs; i++ {
createOps = append(createOps, txn.Op{
C: "accounts",
Id: i,
Insert: M{"balance": 0},
})
}
err := s.runner.Run(createOps, "", nil)
c.Assert(err, IsNil)
// Force a bad transaction into the queue
badTxnId := "deadbeef1234567812345678_12345678"
err = s.accounts.UpdateId(0, M{"$set": M{"txn-queue": []string{badTxnId}}})
c.Assert(err, IsNil)
for i := 1; i < expectedLength; i++ {
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": -1}},
}, {
C: "accounts",
Id: i,
Update: M{"$inc": M{"balance": 1}},
}}
err = s.runner.Run(ops, "", nil)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, `cannot find transaction ObjectIdHex."deadbeef1234567812345678".`)
}
// Now that we've filled up the txn-queue of the first document, any
// further changes should be aborted
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedLength)
txn.SetDebug(true)
for i := 100; i < maxDocs; i++ {
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": -1}},
}, {
C: "accounts",
Id: i,
Update: M{"$inc": M{"balance": 1}},
}}
err = s.runner.Run(ops, "", nil)
c.Assert(err, NotNil)
c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`)
}
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedLength)
err = s.accounts.UpdateId(0, M{"$pullAll": M{"txn-queue": []string{badTxnId}}})
c.Assert(err, IsNil)
c.Log("Updated removing the invalid transaction")
// Now we should be able to cleanup
err = s.runner.ResumeAll()
c.Assert(err, IsNil)
c.Log("resumed all")
}

func (s *S) TestTxnQueueUnlimited(c *C) {
opts := txn.DefaultRunnerOptions()
// A value of 0 should mean 'unlimited'
opts.MaxTxnQueueLength = 0
s.runner.SetOptions(opts)
// it isn't possible to actually prove 'unlimited' but we can prove that
// we at least can insert more than the default number of transactions
// without getting a 'too many transactions' failure.
txn.SetDebug(false)
txn.SetChaos(txn.Chaos{
KillChance: 1,
// Use set-prepared because we are adding more transactions than
// other tests, and this speeds up setup time a bit
Breakpoint: "set-prepared",
})
defer txn.SetChaos(txn.Chaos{})
err := s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
for i := 0; i < 1100; i++ {
err := s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
txn.SetDebug(true)
var doc bson.M
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100)
err = s.runner.Run(ops, "", nil)
c.Check(err, Equals, txn.ErrChaos)
err = s.accounts.FindId(0).One(&doc)
c.Assert(err, IsNil)
c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101)
}

func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
// This test ensures that PurgeMissing can handle very large
// txn-queue fields. Previous iterations of PurgeMissing would
Expand Down