Skip to content

Commit

Permalink
Avoid global logger in events (#6258)
Browse files Browse the repository at this point in the history
## Motivation

Follow up to this discussion: #6252 (comment)

Removal of the global logger from (most of) the event package.
  • Loading branch information
fasmat committed Aug 16, 2024
1 parent 5bedb4e commit d099983
Show file tree
Hide file tree
Showing 36 changed files with 265 additions and 325 deletions.
8 changes: 7 additions & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,13 @@ func (h *HandlerV1) processATX(
return nil, fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err)
}

events.ReportNewActivation(atx)
if err := events.ReportNewActivation(atx); err != nil {
h.logger.Error("failed to emit activation",
log.ZShortStringer("atx_id", atx.ID()),
zap.Uint32("epoch", atx.PublishEpoch.Uint32()),
zap.Error(err),
)
}
h.logger.Debug("new atx",
log.ZContext(ctx),
zap.Inline(atx),
Expand Down
8 changes: 7 additions & 1 deletion activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,13 @@ func (h *HandlerV2) processATX(
return fmt.Errorf("cannot store atx %s: %w", atx.ShortString(), err)
}

events.ReportNewActivation(atx)
if err := events.ReportNewActivation(atx); err != nil {
h.logger.Error("failed to emit activation",
log.ZShortStringer("atx_id", atx.ID()),
zap.Uint32("epoch", atx.PublishEpoch.Uint32()),
zap.Error(err),
)
}
h.logger.Debug("new atx", log.ZContext(ctx), zap.Inline(atx))
return err
}
Expand Down
32 changes: 26 additions & 6 deletions api/grpcserver/globalstate_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *GlobalStateService) Account(ctx context.Context, in *pb.AccountRequest)
}

ctxzap.Debug(ctx, "GRPC GlobalStateService.Account",
addr.Field().Zap(),
zap.Stringer("address", addr),
zap.Uint64("balance", acct.StateCurrent.Balance.Value),
zap.Uint64("counter", acct.StateCurrent.Counter),
zap.Uint64("balance projected", acct.StateProjected.Balance.Value),
Expand Down Expand Up @@ -240,12 +240,20 @@ func (s *GlobalStateService) AccountDataStream(
rewardsBufFull <-chan struct{}
)
if filterAccount {
if accountSubscription := events.SubscribeAccount(); accountSubscription != nil {
accountSubscription, err := events.SubscribeAccount()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to account events: %v", err)
}
if accountSubscription != nil {
accountCh, accountBufFull = consumeEvents[events.Account](stream.Context(), accountSubscription)
}
}
if filterReward {
if rewardsSubscription := events.SubscribeRewards(); rewardsSubscription != nil {
rewardsSubscription, err := events.SubscribeRewards()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to rewards events: %v", err)
}
if rewardsSubscription != nil {
rewardsCh, rewardsBufFull = consumeEvents[types.Reward](stream.Context(), rewardsSubscription)
}
}
Expand Down Expand Up @@ -369,20 +377,32 @@ func (s *GlobalStateService) GlobalStateStream(
layersBufFull <-chan struct{}
)
if filterAccount {
if accountSubscription := events.SubscribeAccount(); accountSubscription != nil {
accountSubscription, err := events.SubscribeAccount()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to account events: %v", err)
}
if accountSubscription != nil {
accountCh, accountBufFull = consumeEvents[events.Account](stream.Context(), accountSubscription)
}
}
if filterReward {
if rewardsSubscription := events.SubscribeRewards(); rewardsSubscription != nil {
rewardsSubscription, err := events.SubscribeRewards()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to rewards events: %v", err)
}
if rewardsSubscription != nil {
rewardsCh, rewardsBufFull = consumeEvents[types.Reward](stream.Context(), rewardsSubscription)
}
}

if filterState {
// Whenever new state is applied to the mesh, a new layer is reported.
// There is no separate reporting specifically for new state.
if layersSubscription := events.SubscribeLayers(); layersSubscription != nil {
layersSubscription, err := events.SubscribeLayers()
if err != nil {
return status.Errorf(codes.Internal, "error subscribing to layer updates: %v", err)
}
if layersSubscription != nil {
layersCh, layersBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription)
}
}
Expand Down
38 changes: 19 additions & 19 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,7 +1445,7 @@ func TestTransactionService(t *testing.T) {
// Give the server-side time to subscribe to events
time.Sleep(time.Millisecond * 50)

events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))
res, err := stream.Recv()
require.NoError(t, err)
require.Nil(t, res.Transaction)
Expand All @@ -1470,7 +1470,7 @@ func TestTransactionService(t *testing.T) {
// Give the server-side time to subscribe to events
time.Sleep(time.Millisecond * 50)

events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))

// Verify
res, err := stream.Recv()
Expand Down Expand Up @@ -1563,7 +1563,7 @@ func TestTransactionService(t *testing.T) {

// TODO send header after stream has subscribed

events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))

for _, stream := range streams {
res, err := stream.Recv()
Expand Down Expand Up @@ -1593,7 +1593,7 @@ func TestTransactionService(t *testing.T) {
time.Sleep(time.Millisecond * 50)

for range subscriptionChanBufSize * 2 {
events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))
}

for range subscriptionChanBufSize {
Expand Down Expand Up @@ -1691,15 +1691,15 @@ func TestAccountMeshDataStream_comprehensive(t *testing.T) {
time.Sleep(time.Millisecond * 50)

// publish a tx
events.ReportNewTx(0, globalTx)
require.NoError(t, events.ReportNewTx(0, globalTx))
res, err := stream.Recv()
require.NoError(t, err, "got error from stream")
checkAccountMeshDataItemTx(t, res.Datum.Datum)

// test streaming a tx and an atx that are filtered out
// these should not be received
events.ReportNewTx(0, globalTx2)
events.ReportNewActivation(globalAtx2)
require.NoError(t, events.ReportNewTx(0, globalTx2))
require.NoError(t, events.ReportNewActivation(globalAtx2))

_, err = stream.Recv()
require.Error(t, err)
Expand Down Expand Up @@ -1739,29 +1739,29 @@ func TestAccountDataStream_comprehensive(t *testing.T) {
// Give the server-side time to subscribe to events
time.Sleep(time.Millisecond * 50)

events.ReportRewardReceived(types.Reward{
require.NoError(t, events.ReportRewardReceived(types.Reward{
Layer: layerFirst,
TotalReward: rewardAmount,
LayerReward: rewardAmount * 2,
Coinbase: addr1,
SmesherID: rewardSmesherID,
})
}))

res, err := stream.Recv()
require.NoError(t, err)
checkAccountDataItemReward(t, res.Datum.Datum)

// publish an account data update
events.ReportAccountUpdate(addr1)
require.NoError(t, events.ReportAccountUpdate(addr1))

res, err = stream.Recv()
require.NoError(t, err)
checkAccountDataItemAccount(t, res.Datum.Datum)

// test streaming a reward and account update that should be filtered out
// these should not be received
events.ReportAccountUpdate(addr2)
events.ReportRewardReceived(types.Reward{Coinbase: addr2})
require.NoError(t, events.ReportAccountUpdate(addr2))
require.NoError(t, events.ReportRewardReceived(types.Reward{Coinbase: addr2}))

_, err = stream.Recv()
require.Error(t, err)
Expand Down Expand Up @@ -1796,19 +1796,19 @@ func TestGlobalStateStream_comprehensive(t *testing.T) {
time.Sleep(time.Millisecond * 50)

// publish a reward
events.ReportRewardReceived(types.Reward{
require.NoError(t, events.ReportRewardReceived(types.Reward{
Layer: layerFirst,
TotalReward: rewardAmount,
LayerReward: rewardAmount * 2,
Coinbase: addr1,
SmesherID: rewardSmesherID,
})
}))
res, err := stream.Recv()
require.NoError(t, err, "got error from stream")
checkGlobalStateDataReward(t, res.Datum.Datum)

// publish an account data update
events.ReportAccountUpdate(addr1)
require.NoError(t, events.ReportAccountUpdate(addr1))
res, err = stream.Recv()
require.NoError(t, err, "got error from stream")
checkGlobalStateDataAccountWrapper(t, res.Datum.Datum)
Expand All @@ -1817,10 +1817,10 @@ func TestGlobalStateStream_comprehensive(t *testing.T) {
layer, err := meshAPIMock.GetLayer(layerFirst)
require.NoError(t, err)

events.ReportLayerUpdate(events.LayerUpdate{
require.NoError(t, events.ReportLayerUpdate(events.LayerUpdate{
LayerID: layer.Index(),
Status: events.LayerStatusTypeApplied,
})
}))
res, err = stream.Recv()
require.NoError(t, err, "got error from stream")
checkGlobalStateDataGlobalState(t, res.Datum.Datum)
Expand Down Expand Up @@ -1868,10 +1868,10 @@ func TestLayerStream_comprehensive(t *testing.T) {
require.NoError(t, err)

// Act
events.ReportLayerUpdate(events.LayerUpdate{
require.NoError(t, events.ReportLayerUpdate(events.LayerUpdate{
LayerID: layer.Index(),
Status: events.LayerStatusTypeConfirmed,
})
}))

// Verify
res, err := stream.Recv()
Expand Down
34 changes: 27 additions & 7 deletions api/grpcserver/mesh_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (s *MeshService) readLayer(
// internal or an input error? For now, all missing layers produce
// internal errors.
if err != nil {
ctxzap.Error(ctx, "could not read layer from database", layerID.Field().Zap(), zap.Error(err))
ctxzap.Error(ctx, "could not read layer from database", zap.Uint32("lid", layerID.Uint32()), zap.Error(err))
return pbLayer, status.Errorf(codes.Internal, "error reading layer data: %v", err)
} else if block == nil {
return pbLayer, nil
Expand All @@ -305,7 +305,9 @@ func (s *MeshService) readLayer(
// E.g., if this node has not synced/received them yet.
if len(missing) != 0 {
ctxzap.Error(ctx, "could not find transactions from layer",
zap.String("missing", fmt.Sprint(missing)), layerID.Field().Zap())
zap.String("missing", fmt.Sprint(missing)),
zap.Uint32("lid", layerID.Uint32()),
)
return pbLayer, status.Errorf(codes.Internal, "error retrieving tx data")
}

Expand All @@ -325,14 +327,20 @@ func (s *MeshService) readLayer(
// This is expected. We can only retrieve state root for a layer that was applied to state,
// which only happens after it's approved/confirmed.
ctxzap.Debug(ctx, "no state root for layer",
layerID.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err))
zap.Uint32("lid", layerID.Uint32()),
zap.Stringer("status", layerStatus),
zap.Error(err),
)
}
hash, err := s.mesh.MeshHash(layerID)
if err != nil {
// This is expected. We can only retrieve state root for a layer that was applied to state,
// which only happens after it's approved/confirmed.
ctxzap.Debug(ctx, "no mesh hash at layer",
layerID.Field().Zap(), zap.Stringer("status", layerStatus), zap.Error(err))
zap.Uint32("lid", layerID.Uint32()),
zap.Stringer("status", layerStatus),
zap.Error(err),
)
}
pbLayer.Blocks = []*pb.Block{pbBlock}
pbLayer.Hash = hash.Bytes()
Expand Down Expand Up @@ -424,12 +432,20 @@ func (s *MeshService) AccountMeshDataStream(
)

if filterTx {
if txsSubscription := events.SubscribeTxs(); txsSubscription != nil {
txsSubscription, err := events.SubscribeTxs()
if err != nil {
return status.Errorf(codes.Internal, "subscribing to txs failed: %v", err)
}
if txsSubscription != nil {
txCh, txBufFull = consumeEvents[events.Transaction](stream.Context(), txsSubscription)
}
}
if filterActivations {
if activationsSubscription := events.SubscribeActivations(); activationsSubscription != nil {
activationsSubscription, err := events.SubscribeActivations()
if err != nil {
return status.Errorf(codes.Internal, "subscribing to activations failed: %v", err)
}
if activationsSubscription != nil {
activationsCh, activationsBufFull = consumeEvents[events.ActivationTx](
stream.Context(),
activationsSubscription,
Expand Down Expand Up @@ -497,7 +513,11 @@ func (s *MeshService) LayerStream(_ *pb.LayerStreamRequest, stream pb.MeshServic
layersBufFull <-chan struct{}
)

if layersSubscription := events.SubscribeLayers(); layersSubscription != nil {
layersSubscription, err := events.SubscribeLayers()
if err != nil {
return status.Errorf(codes.Internal, "subscribing to layers failed: %v", err)
}
if layersSubscription != nil {
layerCh, layersBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription)
}

Expand Down
12 changes: 10 additions & 2 deletions api/grpcserver/node_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (s *NodeService) StatusStream(_ *pb.StatusStreamRequest, stream pb.NodeServ
statusBufFull <-chan struct{}
)

if statusSubscription := events.SubscribeStatus(); statusSubscription != nil {
statusSubscription, err := events.SubscribeStatus()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to status events: %v", err)
}
if statusSubscription != nil {
statusCh, statusBufFull = consumeEvents[events.Status](stream.Context(), statusSubscription)
}

Expand Down Expand Up @@ -180,7 +184,11 @@ func (s *NodeService) ErrorStream(_ *pb.ErrorStreamRequest, stream pb.NodeServic
errorsBufFull <-chan struct{}
)

if errorsSubscription := events.SubscribeErrors(); errorsSubscription != nil {
errorsSubscription, err := events.SubscribeErrors()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to error events: %v", err)
}
if errorsSubscription != nil {
errorsCh, errorsBufFull = consumeEvents[events.NodeError](stream.Context(), errorsSubscription)
}
if err := stream.SendHeader(metadata.MD{}); err != nil {
Expand Down
18 changes: 13 additions & 5 deletions api/grpcserver/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,19 @@ func (s *TransactionService) TransactionsStateStream(
txBufFull, layerBufFull <-chan struct{}
)

if txsSubscription := events.SubscribeTxs(); txsSubscription != nil {
txsSubscription, err := events.SubscribeTxs()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to tx events: %v", err)
}
if txsSubscription != nil {
txCh, txBufFull = consumeEvents[events.Transaction](stream.Context(), txsSubscription)
}

if layersSubscription := events.SubscribeLayers(); layersSubscription != nil {
layersSubscription, err := events.SubscribeLayers()
if err != nil {
return status.Errorf(codes.Internal, "failed to subscribe to layer events: %v", err)
}
if layersSubscription != nil {
layerCh, layerBufFull = consumeEvents[events.LayerUpdate](stream.Context(), layersSubscription)
}

Expand Down Expand Up @@ -265,7 +273,7 @@ func (s *TransactionService) TransactionsStateStream(
ctxzap.Error(
stream.Context(),
"error reading layer data for updated layer",
layer.LayerID.Field().Zap(),
zap.Uint32("lid", layer.LayerID.Uint32()),
zap.Error(err),
)
return status.Error(codes.Internal, "error reading layer data")
Expand Down Expand Up @@ -313,8 +321,8 @@ func (s *TransactionService) TransactionsStateStream(
ctxzap.Error(
stream.Context(),
"could not find transaction from layer",
txid.Field().Zap(),
layer.Field().Zap(),
zap.Stringer("tx_id", txid),
zap.Inline(layer),
zap.Error(err),
)
return status.Error(codes.Internal, "error retrieving tx data")
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/transaction_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestTransactionService_StreamResults(t *testing.T) {

var expect []*types.TransactionWithResult
for _, rst := range streamed {
events.ReportResult(*rst)
require.NoError(t, events.ReportResult(*rst))
if tc.matcher.match(rst) {
expect = append(expect, rst)
}
Expand Down
Loading

0 comments on commit d099983

Please sign in to comment.