Skip to content

Commit

Permalink
This refactors GetOpts so it's no longer used in the storage layer. (#…
Browse files Browse the repository at this point in the history
…1007)

* Add GetOpts to storage API

And move it to storage for consistency. Fix up issues this creates.

* Fixups after merge of multiple tree type code

* Move NewGetOpts to storage where GetOpts is defined

* Fix up doc comments

* Fixup after merge of QueueLeaves API change

* Regenerate storage mocks.

* Fix log_rpc_server test for api change

* Update var declarations

* Lots of moving stuff around again

* Fixup log rpc server tests

* Fix up rpc server tests for log and map

* Fixup memory storage

* Fixup sequencer and dumplib

* Regen storage mocks, fixup admin server, interceptor and tree_gc

* Fixup mysql storage

* Fix up other tools and vmap toy

* Fix incomplete sequencer refactor

* Fix mysql quota test

* Fixup trees test

* Ensure trees in mysql test have HashStrategy set

* Mysql test tweaks

* Fix the readwrite transaction test

Tree state enforcement is not a responsiblility of this code any more.
The caller must get valid trees from AdminService.

* Fix lint issues

And clean up some stuff that's unused now.

* Make the same fix to the map test as the log one

* Fix bad merge

* Fix up memory storage after merge

* Remove unused var

* Move GetOpts back to trees package

It is no longer used by storage.

* Fixup some GetOpts references that were missed

* Fix another merge related issue.

* Latest round of fixes after review.

* Rename Accessor, AccessType -> Operation, OpType

* Rename param to match type.

* Fixup cloudspanner for API changes.

* Switch AddSequencedLeaves to from int64 to *trillian.Tree

* Fix test
  • Loading branch information
Martin2112 authored Mar 1, 2018
1 parent 8837d1d commit 8b6ef37
Show file tree
Hide file tree
Showing 31 changed files with 669 additions and 510 deletions.
50 changes: 26 additions & 24 deletions log/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,27 +284,27 @@ func (s logSequencingTask) update(ctx context.Context, leaves []*trillian.LogLea

// IntegrateBatch wraps up all the operations needed to take a batch of queued
// leaves and integrate them into the tree.
func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, guardWindow, maxRootDurationInterval time.Duration) (int, error) {
func (s Sequencer) IntegrateBatch(ctx context.Context, tree *trillian.Tree, limit int, guardWindow, maxRootDurationInterval time.Duration) (int, error) {
start := s.timeSource.Now()
label := strconv.FormatInt(logID, 10)
label := strconv.FormatInt(tree.TreeId, 10)

numLeaves := 0
var newLogRoot *trillian.SignedLogRoot
err := s.logStorage.ReadWriteTransaction(ctx, logID, func(ctx context.Context, tx storage.LogTreeTX) error {
err := s.logStorage.ReadWriteTransaction(ctx, tree, func(ctx context.Context, tx storage.LogTreeTX) error {
stageStart := s.timeSource.Now()
defer seqBatches.Inc(label)
defer func() { seqLatency.Observe(util.SecondsSince(s.timeSource, start), label) }()

// Get the latest known root from storage
currentRoot, err := tx.LatestSignedLogRoot(ctx)
if err != nil {
glog.Warningf("%v: Sequencer failed to get latest root: %v", logID, err)
glog.Warningf("%v: Sequencer failed to get latest root: %v", tree.TreeId, err)
return err
}
seqGetRootLatency.Observe(util.SecondsSince(s.timeSource, stageStart), label)

if currentRoot.RootHash == nil {
glog.Warningf("%v: Fresh log - no previous TreeHeads exist.", logID)
glog.Warningf("%v: Fresh log - no previous TreeHeads exist.", tree.TreeId)
return storage.ErrTreeNeedsInit
}

Expand All @@ -316,7 +316,7 @@ func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, g
}
sequencedLeaves, err := st.fetch(ctx, limit, start.Add(-guardWindow))
if err != nil {
glog.Warningf("%v: Sequencer failed to load sequenced batch: %v", logID, err)
glog.Warningf("%v: Sequencer failed to load sequenced batch: %v", tree.TreeId, err)
return err
}
numLeaves = len(sequencedLeaves)
Expand All @@ -328,10 +328,10 @@ func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, g
interval := time.Duration(nowNanos - currentRoot.TimestampNanos)
if maxRootDurationInterval == 0 || interval < maxRootDurationInterval {
// We have nothing to integrate into the tree.
glog.V(1).Infof("%v: No leaves sequenced in this signing operation", logID)
glog.V(1).Infof("%v: No leaves sequenced in this signing operation", tree.TreeId)
return nil
}
glog.Infof("%v: Force new root generation as %v since last root", logID, interval)
glog.Infof("%v: Force new root generation as %v since last root", tree.TreeId, interval)
}

stageStart = s.timeSource.Now()
Expand All @@ -348,7 +348,7 @@ func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, g
// commit.
newVersion := tx.WriteRevision()
if got, want := newVersion, currentRoot.TreeRevision+int64(1); got != want {
return fmt.Errorf("%v: got writeRevision of %v, but expected %v", logID, got, want)
return fmt.Errorf("%v: got writeRevision of %v, but expected %v", tree.TreeId, got, want)
}

// Collate node updates.
Expand All @@ -370,14 +370,14 @@ func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, g
targetNodes, err := s.buildNodesFromNodeMap(nodeMap, newVersion)
if err != nil {
// Probably an internal error with map building, unexpected.
glog.Warningf("%v: Failed to build target nodes in sequencer: %v", logID, err)
glog.Warningf("%v: Failed to build target nodes in sequencer: %v", tree.TreeId, err)
return err
}

// Now insert or update the nodes affected by the above, at the new tree
// version.
if err := tx.SetMerkleNodes(ctx, targetNodes); err != nil {
glog.Warningf("%v: Sequencer failed to set Merkle nodes: %v", logID, err)
glog.Warningf("%v: Sequencer failed to set Merkle nodes: %v", tree.TreeId, err)
return err
}
seqSetNodesLatency.Observe(util.SecondsSince(s.timeSource, stageStart), label)
Expand All @@ -394,13 +394,13 @@ func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, g
}
sig, err := s.signer.SignLogRoot(newLogRoot)
if err != nil {
glog.Warningf("%v: signer failed to sign root: %v", logID, err)
glog.Warningf("%v: signer failed to sign root: %v", tree.TreeId, err)
return err
}
newLogRoot.Signature = sig

if err := tx.StoreSignedLogRoot(ctx, *newLogRoot); err != nil {
glog.Warningf("%v: failed to write updated tree root: %v", logID, err)
glog.Warningf("%v: failed to write updated tree root: %v", tree.TreeId, err)
return err
}
seqStoreRootLatency.Observe(util.SecondsSince(s.timeSource, stageStart), label)
Expand All @@ -421,33 +421,35 @@ func (s Sequencer) IntegrateBatch(ctx context.Context, logID int64, limit int, g
if numLeaves > 0 {
tokens := int(float64(numLeaves) * quotaIncreaseFactor())
specs := []quota.Spec{
{Group: quota.Tree, Kind: quota.Read, TreeID: logID},
{Group: quota.Tree, Kind: quota.Write, TreeID: logID},
{Group: quota.Tree, Kind: quota.Read, TreeID: tree.TreeId},
{Group: quota.Tree, Kind: quota.Write, TreeID: tree.TreeId},
{Group: quota.Global, Kind: quota.Read},
{Group: quota.Global, Kind: quota.Write},
}
glog.V(2).Infof("%v: Replenishing %v tokens (numLeaves = %v)", logID, tokens, numLeaves)
glog.V(2).Infof("%v: Replenishing %v tokens (numLeaves = %v)", tree.TreeId, tokens, numLeaves)
err := s.qm.PutTokens(ctx, tokens, specs)
if err != nil {
glog.Warningf("%v: Failed to replenish %v tokens: %v", logID, tokens, err)
glog.Warningf("%v: Failed to replenish %v tokens: %v", tree.TreeId, tokens, err)
}
quota.Metrics.IncReplenished(tokens, specs, err == nil)
}

seqCounter.Add(float64(numLeaves), label)
if newLogRoot != nil {
glog.Infof("%v: sequenced %v leaves, size %v, tree-revision %v", logID, numLeaves, newLogRoot.TreeSize, newLogRoot.TreeRevision)
glog.Infof("%v: sequenced %v leaves, size %v, tree-revision %v", tree.TreeId, numLeaves, newLogRoot.TreeSize, newLogRoot.TreeRevision)
} else {
glog.Errorf("newLogRoot = nil")
}
return numLeaves, nil
}

// SignRoot wraps up all the operations for creating a new log signed root.
func (s Sequencer) SignRoot(ctx context.Context, logID int64) error {
return s.logStorage.ReadWriteTransaction(ctx, logID, func(ctx context.Context, tx storage.LogTreeTX) error {
func (s Sequencer) SignRoot(ctx context.Context, tree *trillian.Tree) error {
return s.logStorage.ReadWriteTransaction(ctx, tree, func(ctx context.Context, tx storage.LogTreeTX) error {
// Get the latest known root from storage
currentRoot, err := tx.LatestSignedLogRoot(ctx)
if err != nil {
glog.Warningf("%v: signer failed to get latest root: %v", logID, err)
glog.Warningf("%v: signer failed to get latest root: %v", tree.TreeId, err)
return err
}

Expand All @@ -466,17 +468,17 @@ func (s Sequencer) SignRoot(ctx context.Context, logID int64) error {
}
sig, err := s.signer.SignLogRoot(newLogRoot)
if err != nil {
glog.Warningf("%v: signer failed to sign root: %v", logID, err)
glog.Warningf("%v: signer failed to sign root: %v", tree.TreeId, err)
return err
}
newLogRoot.Signature = sig

// Store the new root and we're done
if err := tx.StoreSignedLogRoot(ctx, *newLogRoot); err != nil {
glog.Warningf("%v: signer failed to write updated root: %v", logID, err)
glog.Warningf("%v: signer failed to write updated root: %v", tree.TreeId, err)
return err
}
glog.V(2).Infof("%v: new signed root, size %v, tree-revision %v", logID, newLogRoot.TreeSize, newLogRoot.TreeRevision)
glog.V(2).Infof("%v: new signed root, size %v, tree-revision %v", tree.TreeId, newLogRoot.TreeSize, newLogRoot.TreeRevision)

return nil
})
Expand Down
9 changes: 6 additions & 3 deletions log/sequencer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,9 @@ func TestIntegrateBatch(t *testing.T) {
qm.EXPECT().PutTokens(gomock.Any(), test.wantCount, specs).Return(nil)
}
c, ctx := createTestContext(ctrl, test.params)
tree := &trillian.Tree{TreeId: test.params.logID, TreeType: trillian.TreeType_LOG}

got, err := c.sequencer.IntegrateBatch(ctx, test.params.logID, 1, test.guardWindow, test.maxRootDuration)
got, err := c.sequencer.IntegrateBatch(ctx, tree, 1, test.guardWindow, test.maxRootDuration)
if err != nil {
if test.errStr == "" {
t.Errorf("IntegrateBatch(%+v)=%v,%v; want _,nil", test.params, got, err)
Expand Down Expand Up @@ -638,7 +639,8 @@ func TestIntegrateBatch_PutTokens(t *testing.T) {
}

sequencer := NewSequencer(hasher, ts, logStorage, signer, nil /* mf */, qm)
leaves, err := sequencer.IntegrateBatch(ctx, treeID, limit, guardWindow, maxRootDuration)
tree := &trillian.Tree{TreeId: treeID, TreeType: trillian.TreeType_LOG}
leaves, err := sequencer.IntegrateBatch(ctx, tree, limit, guardWindow, maxRootDuration)
if err != nil {
t.Errorf("%v: IntegrateBatch() returned err = %v", test.desc, err)
return
Expand Down Expand Up @@ -764,7 +766,8 @@ func TestSignRoot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
c, ctx := createTestContext(ctrl, test.params)
err := c.sequencer.SignRoot(ctx, test.params.logID)
tree := &trillian.Tree{TreeId: test.params.logID, TreeType: trillian.TreeType_LOG}
err := c.sequencer.SignRoot(ctx, tree)
if test.errStr != "" {
if err == nil {
t.Errorf("SignRoot(%+v)=nil; want error with %q", test.params, test.errStr)
Expand Down
4 changes: 2 additions & 2 deletions quota/mysqlqm/mysql_quota_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func createTree(ctx context.Context, db *sql.DB) (*trillian.Tree, error) {

{
ls := mysql.NewLogStorage(db, nil)
err := ls.ReadWriteTransaction(ctx, tree.TreeId, func(ctx context.Context, tx storage.LogTreeTX) error {
err := ls.ReadWriteTransaction(ctx, tree, func(ctx context.Context, tx storage.LogTreeTX) error {
return tx.StoreSignedLogRoot(ctx, trillian.SignedLogRoot{LogId: tree.TreeId, RootHash: []byte{0}, Signature: &sigpb.DigitallySigned{}})
})
if err != nil {
Expand Down Expand Up @@ -338,7 +338,7 @@ func queueLeaves(ctx context.Context, db *sql.DB, tree *trillian.Tree, firstID,
}

ls := mysql.NewLogStorage(db, nil)
return ls.ReadWriteTransaction(ctx, tree.TreeId, func(ctx context.Context, tx storage.LogTreeTX) error {
return ls.ReadWriteTransaction(ctx, tree, func(ctx context.Context, tx storage.LogTreeTX) error {
_, err := tx.QueueLeaves(ctx, leaves, time.Now())
return err
})
Expand Down
2 changes: 1 addition & 1 deletion server/interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (tp *trillianProcessor) Before(ctx context.Context, req interface{}) (conte

if info.getTree {
tree, err := trees.GetTree(
ctx, tp.parent.admin, info.treeID, trees.NewGetOpts(info.readonly, info.treeTypes...))
ctx, tp.parent.admin, info.treeID, trees.NewGetOpts(trees.Admin, info.readonly, info.treeTypes...))
if err != nil {
incRequestDeniedCounter(badTreeReason, info.treeID, quotaUser)
return ctx, err
Expand Down
64 changes: 46 additions & 18 deletions server/log_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (
const proofMaxBitLen = 64

var (
optsLogRead = trees.NewGetOpts(true, trillian.TreeType_LOG, trillian.TreeType_PREORDERED_LOG)
optsLogWrite = trees.NewGetOpts(false, trillian.TreeType_LOG)
optsPreorderedLogWrite = trees.NewGetOpts(false, trillian.TreeType_PREORDERED_LOG)
optsLogRead = trees.NewGetOpts(trees.Query, true, trillian.TreeType_LOG, trillian.TreeType_PREORDERED_LOG)
optsLogWrite = trees.NewGetOpts(trees.Queue, false, trillian.TreeType_LOG)
optsPreorderedLogWrite = trees.NewGetOpts(trees.Queue, false, trillian.TreeType_PREORDERED_LOG)
)

// TrillianLogRPCServer implements the RPC API defined in the proto
Expand Down Expand Up @@ -124,7 +124,7 @@ func (t *TrillianLogRPCServer) QueueLeaves(ctx context.Context, req *trillian.Qu
return nil, err
}

ret, err := t.registry.LogStorage.QueueLeaves(ctx, logID, req.Leaves, t.timeSource.Now())
ret, err := t.registry.LogStorage.QueueLeaves(ctx, tree, req.Leaves, t.timeSource.Now())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,7 +181,7 @@ func (t *TrillianLogRPCServer) AddSequencedLeaves(ctx context.Context, req *tril
}

ctx = trees.NewContext(ctx, tree)
leaves, err := t.registry.LogStorage.AddSequencedLeaves(ctx, tree.TreeId, req.Leaves)
leaves, err := t.registry.LogStorage.AddSequencedLeaves(ctx, tree, req.Leaves)
if err != nil {
return nil, err
}
Expand All @@ -208,7 +208,7 @@ func (t *TrillianLogRPCServer) GetInclusionProof(ctx context.Context, req *trill

// Next we need to make sure the requested tree size corresponds to an STH, so that we
// have a usable tree revision
tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func (t *TrillianLogRPCServer) GetInclusionProofByHash(ctx context.Context, req

// Next we need to make sure the requested tree size corresponds to an STH, so that we
// have a usable tree revision
tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,7 +302,7 @@ func (t *TrillianLogRPCServer) GetConsistencyProof(ctx context.Context, req *tri
}
ctx = trees.NewContext(ctx, tree)

tx, err := t.prepareReadOnlyStorageTx(ctx, logID)
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -336,7 +336,11 @@ func (t *TrillianLogRPCServer) GetConsistencyProof(ctx context.Context, req *tri
// GetLatestSignedLogRoot obtains the latest published tree root for the Merkle Tree that
// underlies the log.
func (t *TrillianLogRPCServer) GetLatestSignedLogRoot(ctx context.Context, req *trillian.GetLatestSignedLogRootRequest) (*trillian.GetLatestSignedLogRootResponse, error) {
tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tree, ctx, err := t.getTreeAndContext(ctx, req.LogId, optsLogRead)
if err != nil {
return nil, err
}
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand All @@ -357,7 +361,11 @@ func (t *TrillianLogRPCServer) GetLatestSignedLogRoot(ctx context.Context, req *
// GetSequencedLeafCount returns the number of leaves that have been integrated into the Merkle
// Tree. This can be zero for a log containing no entries.
func (t *TrillianLogRPCServer) GetSequencedLeafCount(ctx context.Context, req *trillian.GetSequencedLeafCountRequest) (*trillian.GetSequencedLeafCountResponse, error) {
tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tree, ctx, err := t.getTreeAndContext(ctx, req.LogId, optsLogRead)
if err != nil {
return nil, err
}
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand All @@ -384,7 +392,11 @@ func (t *TrillianLogRPCServer) GetLeavesByIndex(ctx context.Context, req *trilli
return nil, err
}

tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tree, ctx, err := t.getTreeAndContext(ctx, req.LogId, optsLogRead)
if err != nil {
return nil, err
}
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand All @@ -410,7 +422,11 @@ func (t *TrillianLogRPCServer) GetLeavesByRange(ctx context.Context, req *trilli
return nil, err
}

tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tree, ctx, err := t.getTreeAndContext(ctx, req.LogId, optsLogRead)
if err != nil {
return nil, err
}
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand All @@ -436,7 +452,11 @@ func (t *TrillianLogRPCServer) GetLeavesByHash(ctx context.Context, req *trillia
return nil, err
}

tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tree, ctx, err := t.getTreeAndContext(ctx, req.LogId, optsLogRead)
if err != nil {
return nil, err
}
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -472,7 +492,7 @@ func (t *TrillianLogRPCServer) GetEntryAndProof(ctx context.Context, req *trilli

// Next we need to make sure the requested tree size corresponds to an STH, so that we
// have a usable tree revision
tx, err := t.prepareReadOnlyStorageTx(ctx, req.LogId)
tx, err := t.prepareReadOnlyStorageTx(ctx, tree)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -509,8 +529,9 @@ func (t *TrillianLogRPCServer) GetEntryAndProof(ctx context.Context, req *trilli
}, nil
}

func (t *TrillianLogRPCServer) prepareReadOnlyStorageTx(ctx context.Context, treeID int64) (storage.ReadOnlyLogTreeTX, error) {
tx, err := t.registry.LogStorage.SnapshotForTree(ctx, treeID)
// TODO(Martin2112): Clean this up.
func (t *TrillianLogRPCServer) prepareReadOnlyStorageTx(ctx context.Context, tree *trillian.Tree) (storage.ReadOnlyLogTreeTX, error) {
tx, err := t.registry.LogStorage.SnapshotForTree(ctx, tree)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -554,6 +575,14 @@ func (t *TrillianLogRPCServer) getTreeAndHasher(
return tree, hasher, nil
}

func (t *TrillianLogRPCServer) getTreeAndContext(ctx context.Context, treeID int64, opts trees.GetOpts) (*trillian.Tree, context.Context, error) {
tree, err := trees.GetTree(ctx, t.registry.AdminStorage, treeID, opts)
if err != nil {
return nil, nil, err
}
return tree, trees.NewContext(ctx, tree), nil
}

// InitLog initialises a freshly created Log by creating the first STH with
// size 0.
//
Expand All @@ -567,7 +596,7 @@ func (t *TrillianLogRPCServer) InitLog(ctx context.Context, req *trillian.InitLo
}

var newRoot *trillian.SignedLogRoot
err = t.registry.LogStorage.ReadWriteTransaction(ctx, logID, func(ctx context.Context, tx storage.LogTreeTX) error {
err = t.registry.LogStorage.ReadWriteTransaction(ctx, tree, func(ctx context.Context, tx storage.LogTreeTX) error {
newRoot = nil

latestRoot, err := tx.LatestSignedLogRoot(ctx)
Expand Down Expand Up @@ -612,5 +641,4 @@ func (t *TrillianLogRPCServer) InitLog(ctx context.Context, req *trillian.InitLo
return &trillian.InitLogResponse{
Created: newRoot,
}, nil

}
Loading

0 comments on commit 8b6ef37

Please sign in to comment.