Skip to content

Commit

Permalink
Merge branch 'v0.34.x-celestia' into sanaz/bp/v0.34.x-celestia/pr-1289
Browse files Browse the repository at this point in the history
  • Loading branch information
staheri14 authored Apr 23, 2024
2 parents 5dab5b2 + ca8bd53 commit e0cab11
Show file tree
Hide file tree
Showing 34 changed files with 1,287 additions and 943 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ linters:
enable:
- asciicheck
- bodyclose
- depguard
- dogsled
- dupl
- errcheck
Expand Down
12 changes: 6 additions & 6 deletions cmd/cometbft/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ func AddNodeFlags(cmd *cobra.Command) {
"database directory")

cmd.PersistentFlags().String(
trace.FlagInfluxDBURL,
config.Instrumentation.InfluxURL,
trace.FlagInfluxDBURLDescription,
trace.FlagTracePushConfig,
config.Instrumentation.TracePushConfig,
trace.FlagTracePushConfigDescription,
)

cmd.PersistentFlags().String(
trace.FlagInfluxDBToken,
config.Instrumentation.InfluxToken,
trace.FlagInfluxDBTokenDescription,
trace.FlagTracePullAddress,
config.Instrumentation.TracePullAddress,
trace.FlagTracePullAddressDescription,
)

cmd.PersistentFlags().String(
Expand Down
55 changes: 26 additions & 29 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ var (
minSubscriptionBufferSize = 100
defaultSubscriptionBufferSize = 200

// DefaultInfluxTables is a list of tables that are used for storing traces.
// DefaultTracingTables is a list of tables that are used for storing traces.
// This global var is filled by an init function in the schema package. This
// allows for the schema package to contain all the relevant logic while
// avoiding import cycles.
DefaultInfluxTables = []string{}
DefaultTracingTables = ""
)

// Config defines the top level configuration for a CometBFT node
Expand Down Expand Up @@ -1188,24 +1188,24 @@ type InstrumentationConfig struct {
// Instrumentation namespace.
Namespace string `mapstructure:"namespace"`

// InfluxURL is the influxdb url.
InfluxURL string `mapstructure:"influx_url"`
// TracePushConfig is the relative path of the push config. This second
// config contains credentials for where and how often to.
TracePushConfig string `mapstructure:"trace_push_config"`

// InfluxToken is the influxdb token.
InfluxToken string `mapstructure:"influx_token"`
// TracePullAddress is the address that the trace server will listen on for
// pulling data.
TracePullAddress string `mapstructure:"trace_pull_address"`

// InfluxOrg is the influxdb organization.
InfluxOrg string `mapstructure:"influx_org"`
// TraceType is the type of tracer used. Options are "local" and "noop".
TraceType string `mapstructure:"trace_type"`

// InfluxBucket is the influxdb bucket.
InfluxBucket string `mapstructure:"influx_bucket"`
// TraceBufferSize is the number of traces to write in a single batch.
TraceBufferSize int `mapstructure:"trace_push_batch_size"`

// InfluxBatchSize is the number of points to write in a single batch.
InfluxBatchSize int `mapstructure:"influx_batch_size"`

// InfluxTables is the list of tables that will be traced. See the
// pkg/trace/schema for a complete list of tables.
InfluxTables []string `mapstructure:"influx_tables"`
// TracingTables is the list of tables that will be traced. See the
// pkg/trace/schema for a complete list of tables. It is represented as a
// comma separate string. For example: "consensus_round_state,mempool_tx".
TracingTables string `mapstructure:"tracing_tables"`

// PyroscopeURL is the pyroscope url used to establish a connection with a
// pyroscope continuous profiling server.
Expand All @@ -1229,11 +1229,11 @@ func DefaultInstrumentationConfig() *InstrumentationConfig {
PrometheusListenAddr: ":26660",
MaxOpenConnections: 3,
Namespace: "cometbft",
InfluxURL: "",
InfluxOrg: "celestia",
InfluxBucket: "e2e",
InfluxBatchSize: 20,
InfluxTables: DefaultInfluxTables,
TracePushConfig: "",
TracePullAddress: "",
TraceType: "noop",
TraceBufferSize: 1000,
TracingTables: DefaultTracingTables,
PyroscopeURL: "",
PyroscopeTrace: false,
PyroscopeProfileTypes: []string{
Expand Down Expand Up @@ -1264,21 +1264,18 @@ func (cfg *InstrumentationConfig) ValidateBasic() error {
if cfg.PyroscopeTrace && cfg.PyroscopeURL == "" {
return errors.New("pyroscope_trace can't be enabled if profiling is disabled")
}
// if there is not InfluxURL configured, then we do not need to validate the rest
// if there is not TracePushConfig configured, then we do not need to validate the rest
// of the config because we are not connecting.
if cfg.InfluxURL == "" {
if cfg.TracePushConfig == "" {
return nil
}
if cfg.InfluxToken == "" {
if cfg.TracePullAddress == "" {
return fmt.Errorf("token is required")
}
if cfg.InfluxOrg == "" {
if cfg.TraceType == "" {
return fmt.Errorf("org is required")
}
if cfg.InfluxBucket == "" {
return fmt.Errorf("bucket is required")
}
if cfg.InfluxBatchSize <= 0 {
if cfg.TraceBufferSize <= 0 {
return fmt.Errorf("batch size must be greater than 0")
}
return nil
Expand Down
27 changes: 14 additions & 13 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,25 +547,26 @@ max_open_connections = {{ .Instrumentation.MaxOpenConnections }}
# Instrumentation namespace
namespace = "{{ .Instrumentation.Namespace }}"
# The URL of the influxdb instance to use for remote event
# collection. If empty, remote event collection is disabled.
influx_url = "{{ .Instrumentation.InfluxURL }}"
# TracePushConfig is the relative path of the push config.
# This second config contains credentials for where and how often to
# push trace data to. For example, if the config is next to this config,
# it would be "push_config.json".
trace_push_config = "{{ .Instrumentation.TracePushConfig }}"
# The influxdb token to use for remote event collection.
influx_token = "{{ .Instrumentation.InfluxToken }}"
# The tracer pull address specifies which address will be used for pull based
# event collection. If empty, the pull based server will not be started.
trace_pull_address = "{{ .Instrumentation.TracePullAddress }}"
# The influxdb bucket to use for remote event collection.
influx_bucket = "{{ .Instrumentation.InfluxBucket }}"
# The influxdb org to use for event remote collection.
influx_org = "{{ .Instrumentation.InfluxOrg }}"
# The tracer to use for collecting trace data.
trace_type = "{{ .Instrumentation.TraceType }}"
# The size of the batches that are sent to the database.
influx_batch_size = {{ .Instrumentation.InfluxBatchSize }}
trace_push_batch_size = {{ .Instrumentation.TraceBufferSize }}
# The list of tables that are updated when tracing. All available tables and
# their schema can be found in the pkg/trace/schema package.
influx_tables = [{{ range .Instrumentation.InfluxTables }}{{ printf "%q, " . }}{{end}}]
# their schema can be found in the pkg/trace/schema package. It is represented as a
# comma separate string. For example: "consensus_round_state,mempool_tx".
tracing_tables = "{{ .Instrumentation.TracingTables }}"
# The URL of the pyroscope instance to use for continuous profiling.
# If empty, continuous profiling is disabled.
Expand Down
16 changes: 8 additions & 8 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Reactor struct {
rs *cstypes.RoundState

Metrics *Metrics
traceClient *trace.Client
traceClient trace.Tracer
}

type ReactorOption func(*Reactor)
Expand All @@ -63,7 +63,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
traceClient: &trace.Client{},
traceClient: trace.NoOpTracer(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

Expand Down Expand Up @@ -338,7 +338,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.Download)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand All @@ -357,7 +357,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()

schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.TransferTypeDownload)
schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.Download)

ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
Expand Down Expand Up @@ -599,7 +599,7 @@ OUTER_LOOP:
Part: *parts,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload)
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.Upload)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
Expand Down Expand Up @@ -783,7 +783,7 @@ OUTER_LOOP:
if vote != nil {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.TransferTypeUpload)
ps.peer.ID(), schema.Upload)
continue OUTER_LOOP
}
}
Expand Down Expand Up @@ -812,7 +812,7 @@ func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes
vote := ps.PickSendVote(votes)
if vote != nil { // if a vote is sent, trace it
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.TransferTypeUpload)
ps.peer.ID(), schema.Upload)
return true
}
return false
Expand Down Expand Up @@ -1046,7 +1046,7 @@ func ReactorMetrics(metrics *Metrics) ReactorOption {
return func(conR *Reactor) { conR.Metrics = metrics }
}

func ReactorTracing(traceClient *trace.Client) ReactorOption {
func ReactorTracing(traceClient trace.Tracer) ReactorOption {
return func(conR *Reactor) { conR.traceClient = traceClient }
}

Expand Down
8 changes: 4 additions & 4 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type State struct {
// for reporting metrics
metrics *Metrics

traceClient *trace.Client
traceClient trace.Tracer
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -174,7 +174,7 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
traceClient: &trace.Client{},
traceClient: trace.NoOpTracer(),
}

// set function defaults (may be overwritten before calling Start)
Expand Down Expand Up @@ -217,7 +217,7 @@ func StateMetrics(metrics *Metrics) StateOption {
}

// SetTraceClient sets the remote event collector.
func SetTraceClient(ec *trace.Client) StateOption {
func SetTraceClient(ec trace.Tracer) StateOption {
return func(cs *State) { cs.traceClient = ec }
}

Expand Down Expand Up @@ -1845,7 +1845,7 @@ func (cs *State) recordMetrics(height int64, block *types.Block) {
blockSize := block.Size()

// trace some metadata about the block
schema.WriteBlock(cs.traceClient, block, blockSize)
schema.WriteBlockSummary(cs.traceClient, block, blockSize)

cs.metrics.NumTxs.Set(float64(len(block.Data.Txs)))
cs.metrics.TotalTxs.Add(float64(len(block.Data.Txs)))
Expand Down
Loading

0 comments on commit e0cab11

Please sign in to comment.