diff --git a/api/execute_test.go b/api/execute_test.go index e00f8d23..c4994717 100644 --- a/api/execute_test.go +++ b/api/execute_test.go @@ -34,7 +34,7 @@ func TestAPI_Execute(t *testing.T) { node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { res := execute.ResultMap{ - mocks.GenericPeerID: executionResult, + mocks.GenericPeerID: execute.NodeResult{Result: executionResult}, } cluster := execute.Cluster{ @@ -88,7 +88,7 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) { node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { res := execute.ResultMap{ - mocks.GenericPeerID: executionResult, + mocks.GenericPeerID: execute.NodeResult{Result: executionResult}, } return expectedCode, "", res, execute.Cluster{}, mocks.GenericError diff --git a/consensus/pbft/config.go b/consensus/pbft/config.go index 9dcbfeb6..bf881e1d 100644 --- a/consensus/pbft/config.go +++ b/consensus/pbft/config.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" + "github.com/blocklessnetwork/b7s/metadata" "github.com/blocklessnetwork/b7s/models/execute" ) @@ -15,14 +16,16 @@ type Option func(*Config) type PostProcessFunc func(requestID string, origin peer.ID, request execute.Request, result execute.Result) var DefaultConfig = Config{ - NetworkTimeout: NetworkTimeout, - RequestTimeout: RequestTimeout, + NetworkTimeout: NetworkTimeout, + RequestTimeout: RequestTimeout, + MetadataProvider: metadata.NewNoopProvider(), } type Config struct { - PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done. - NetworkTimeout time.Duration - RequestTimeout time.Duration + PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done. + NetworkTimeout time.Duration + RequestTimeout time.Duration + MetadataProvider metadata.Provider } // WithNetworkTimeout sets how much time we allow for message sending. @@ -47,3 +50,10 @@ func WithPostProcessors(callbacks ...PostProcessFunc) Option { cfg.PostProcessors = fns } } + +// WithMetadataProvider sets the metadata provider for the node. +func WithMetadataProvider(p metadata.Provider) Option { + return func(cfg *Config) { + cfg.MetadataProvider = p + } +} diff --git a/consensus/pbft/execute.go b/consensus/pbft/execute.go index 11977099..3ff7d22b 100644 --- a/consensus/pbft/execute.go +++ b/consensus/pbft/execute.go @@ -87,11 +87,19 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error { r.lastExecuted = sequence + metadata, err := r.cfg.MetadataProvider.Metadata(request.Execute, res.Result) + if err != nil { + log.Warn().Err(err).Msg("could not get metadata") + } + msg := response.Execute{ Code: res.Code, RequestID: request.ID, Results: execute.ResultMap{ - r.id: res, + r.id: execute.NodeResult{ + Result: res, + Metadata: metadata, + }, }, PBFT: response.PBFTResultInfo{ View: r.view, diff --git a/executor/config.go b/executor/config.go index 16e723f2..8177227c 100644 --- a/executor/config.go +++ b/executor/config.go @@ -8,22 +8,22 @@ import ( // defaultConfig used to create Executor. var defaultConfig = Config{ - WorkDir: "workspace", - RuntimeDir: "", - ExecutableName: blockless.RuntimeCLI(), - FS: afero.NewOsFs(), - Limiter: &noopLimiter{}, + WorkDir: "workspace", + RuntimeDir: "", + ExecutableName: blockless.RuntimeCLI(), + FS: afero.NewOsFs(), + Limiter: &noopLimiter{}, DriversRootPath: "", } // Config represents the Executor configuration. type Config struct { - WorkDir string // directory where files needed for the execution are stored - RuntimeDir string // directory where the executable can be found - ExecutableName string // name for the executable - DriversRootPath string // where are cgi drivers stored - FS afero.Fs // FS accessor - Limiter Limiter // Resource limiter for executed processes + WorkDir string // directory where files needed for the execution are stored + RuntimeDir string // directory where the executable can be found + ExecutableName string // name for the executable + DriversRootPath string // where are cgi drivers stored + FS afero.Fs // FS accessor + Limiter Limiter // Resource limiter for executed processes } type Option func(*Config) diff --git a/executor/execute_function.go b/executor/execute_function.go index ae47e72e..f11c8fe6 100644 --- a/executor/execute_function.go +++ b/executor/execute_function.go @@ -15,20 +15,18 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu if err != nil { res := execute.Result{ - Code: codes.Error, - RequestID: requestID, - Result: out, - Usage: usage, + Code: codes.Error, + Result: out, + Usage: usage, } return res, fmt.Errorf("function execution failed: %w", err) } res := execute.Result{ - Code: codes.OK, - RequestID: requestID, - Result: out, - Usage: usage, + Code: codes.OK, + Result: out, + Usage: usage, } return res, nil diff --git a/executor/executor_integration_test.go b/executor/executor_integration_test.go index 805241ca..3cea854c 100644 --- a/executor/executor_integration_test.go +++ b/executor/executor_integration_test.go @@ -87,7 +87,6 @@ func TestExecutor_Execute(t *testing.T) { // Verify the execution result. require.Equal(t, codes.OK, res.Code) - require.Equal(t, requestID, res.RequestID) require.Equal(t, hash, res.Result.Stdout) // Verify usage info - for now, only that they are non-zero. diff --git a/metadata/metadata.go b/metadata/metadata.go new file mode 100644 index 00000000..dc157394 --- /dev/null +++ b/metadata/metadata.go @@ -0,0 +1,19 @@ +package metadata + +import ( + "github.com/blocklessnetwork/b7s/models/execute" +) + +type Provider interface { + Metadata(execute.Request, execute.RuntimeOutput) (any, error) +} + +type noopProvider struct{} + +func (p noopProvider) Metadata(execute.Request, execute.RuntimeOutput) (any, error) { + return nil, nil +} + +func NewNoopProvider() Provider { + return noopProvider{} +} diff --git a/models/execute/response.go b/models/execute/response.go index 6feddf38..8005b09a 100644 --- a/models/execute/response.go +++ b/models/execute/response.go @@ -9,12 +9,17 @@ import ( "github.com/blocklessnetwork/b7s/models/codes" ) +// NodeResult is an annotated execution result. +type NodeResult struct { + Result + Metadata any `json:"metadata,omitempty"` +} + // Result describes an execution result. type Result struct { - Code codes.Code `json:"code"` - Result RuntimeOutput `json:"result"` - RequestID string `json:"request_id"` - Usage Usage `json:"usage,omitempty"` + Code codes.Code `json:"code"` + Result RuntimeOutput `json:"result"` + Usage Usage `json:"usage,omitempty"` } // Cluster represents the set of peers that executed the request. @@ -40,7 +45,7 @@ type Usage struct { } // ResultMap contains execution results from multiple peers. -type ResultMap map[peer.ID]Result +type ResultMap map[peer.ID]NodeResult // MarshalJSON provides means to correctly handle JSON serialization/deserialization. // See: @@ -49,7 +54,7 @@ type ResultMap map[peer.ID]Result // https://github.com/libp2p/go-libp2p-resource-manager/pull/67#issuecomment-1176820561 func (m ResultMap) MarshalJSON() ([]byte, error) { - em := make(map[string]Result, len(m)) + em := make(map[string]NodeResult, len(m)) for p, v := range m { em[p.String()] = v } diff --git a/models/execute/runtime.go b/models/execute/runtime.go index 1b817e98..bc16e7b6 100644 --- a/models/execute/runtime.go +++ b/models/execute/runtime.go @@ -6,12 +6,12 @@ const ( // RuntimeConfig represents the CLI flags supported by the runtime type BLSRuntimeConfig struct { - Entry string `json:"entry,omitempty"` - ExecutionTime uint64 `json:"run_time,omitempty"` - DebugInfo bool `json:"debug_info,omitempty"` - Fuel uint64 `json:"limited_fuel,omitempty"` - Memory uint64 `json:"limited_memory,omitempty"` - Logger string `json:"runtime_logger,omitempty"` + Entry string `json:"entry,omitempty"` + ExecutionTime uint64 `json:"run_time,omitempty"` + DebugInfo bool `json:"debug_info,omitempty"` + Fuel uint64 `json:"limited_fuel,omitempty"` + Memory uint64 `json:"limited_memory,omitempty"` + Logger string `json:"runtime_logger,omitempty"` DriversRootPath string `json:"drivers_root_path,omitempty"` // Fields not allowed to be set in the request. Input string `json:"-"` @@ -29,5 +29,5 @@ const ( BLSRuntimeFlagLogger = "runtime-logger" BLSRuntimeFlagPermission = "permission" BLSRuntimeFlagEnv = "env" - BLSRuntimeFlagDrivers = "drivers-root-path" + BLSRuntimeFlagDrivers = "drivers-root-path" ) diff --git a/models/response/execute_test.go b/models/response/execute_test.go index f75e9609..18dad754 100644 --- a/models/response/execute_test.go +++ b/models/response/execute_test.go @@ -17,7 +17,7 @@ func TestExecute_Signing(t *testing.T) { RequestID: mocks.GenericUUID.String(), Code: codes.OK, Results: execute.ResultMap{ - mocks.GenericPeerID: mocks.GenericExecutionResult, + mocks.GenericPeerID: execute.NodeResult{Result: mocks.GenericExecutionResult}, }, Cluster: execute.Cluster{ Peers: mocks.GenericPeerIDs[:4], diff --git a/node/aggregate/aggregate.go b/node/aggregate/aggregate.go index 20cce244..4610b784 100644 --- a/node/aggregate/aggregate.go +++ b/node/aggregate/aggregate.go @@ -8,22 +8,6 @@ import ( "github.com/blocklessnetwork/b7s/models/execute" ) -type Results []Result - -// Result represents the execution result along with its aggregation stats. -type Result struct { - Result execute.RuntimeOutput `json:"result,omitempty"` - // Peers that got this result. - Peers []peer.ID `json:"peers,omitempty"` - // How frequent was this result, in percentages. - Frequency float64 `json:"frequency,omitempty"` -} - -type resultStats struct { - seen uint - peers []peer.ID -} - func Aggregate(results execute.ResultMap) Results { total := len(results) @@ -31,22 +15,32 @@ func Aggregate(results execute.ResultMap) Results { return nil } + type resultStats struct { + seen uint + peers []peer.ID + metadata map[peer.ID]any + } + stats := make(map[execute.RuntimeOutput]resultStats) for executingPeer, res := range results { // NOTE: It might make sense to ignore stderr in comparison. - output := res.Result + output := res.Result.Result stat, ok := stats[output] if !ok { - stats[output] = resultStats{ - seen: 0, - peers: make([]peer.ID, 0), + stat = resultStats{ + seen: 0, + peers: make([]peer.ID, 0), + metadata: make(map[peer.ID]any), } } stat.seen++ stat.peers = append(stat.peers, executingPeer) + if res.Metadata != nil { + stat.metadata[executingPeer] = res.Metadata + } stats[output] = stat } @@ -59,6 +53,7 @@ func Aggregate(results execute.ResultMap) Results { Result: res, Peers: stat.peers, Frequency: 100 * float64(stat.seen) / float64(total), + Metadata: stat.metadata, } aggregated = append(aggregated, aggr) diff --git a/node/aggregate/models.go b/node/aggregate/models.go new file mode 100644 index 00000000..2019c67f --- /dev/null +++ b/node/aggregate/models.go @@ -0,0 +1,34 @@ +package aggregate + +import ( + "encoding/json" + + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/blocklessnetwork/b7s/models/execute" +) + +type Results []Result + +// Result represents the execution result along with its aggregation stats. +type Result struct { + Result execute.RuntimeOutput `json:"result,omitempty"` + // Peers that got this result. + Peers []peer.ID `json:"peers,omitempty"` + // Peers metadata + Metadata NodeMetadata `json:"metadata,omitempty"` + // How frequent was this result, in percentages. + Frequency float64 `json:"frequency,omitempty"` +} + +type NodeMetadata map[peer.ID]any + +func (m NodeMetadata) MarshalJSON() ([]byte, error) { + + em := make(map[string]any, len(m)) + for p, v := range m { + em[p.String()] = v + } + + return json.Marshal(em) +} diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go index 2c7a3501..d03ac586 100644 --- a/node/cluster_pbft_integration_test.go +++ b/node/cluster_pbft_integration_test.go @@ -221,7 +221,7 @@ This is the end of my program for peer, exres := range res.Results { require.Contains(t, workerIDs, peer) - require.Equal(t, expectedExecutionResult, exres.Result.Stdout) + require.Equal(t, expectedExecutionResult, exres.Result.Result.Stdout) } t.Log("client verified execution response") diff --git a/node/config.go b/node/config.go index 9fbbe046..ab5e0f31 100644 --- a/node/config.go +++ b/node/config.go @@ -6,6 +6,7 @@ import ( "time" "github.com/blocklessnetwork/b7s/consensus" + "github.com/blocklessnetwork/b7s/metadata" "github.com/blocklessnetwork/b7s/models/blockless" ) @@ -23,6 +24,7 @@ var DefaultConfig = Config{ ClusterFormationTimeout: DefaultClusterFormationTimeout, DefaultConsensus: DefaultConsensusAlgorithm, LoadAttributes: DefaultAttributeLoadingSetting, + MetadataProvider: metadata.NewNoopProvider(), } // Config represents the Node configuration. @@ -38,6 +40,7 @@ type Config struct { Workspace string // Directory where we can store files needed for execution. DefaultConsensus consensus.Type // Default consensus algorithm to use. LoadAttributes bool // Node should try to load its attributes from IPFS. + MetadataProvider metadata.Provider // Metadata provider for the node } // Validate checks if the given configuration is correct. @@ -153,6 +156,13 @@ func WithAttributeLoading(b bool) Option { } } +// WithMetadataProvider sets the metadata provider for the node. +func WithMetadataProvider(p metadata.Provider) Option { + return func(cfg *Config) { + cfg.MetadataProvider = p + } +} + func (n *Node) isWorker() bool { return n.cfg.Role == blockless.WorkerNode } diff --git a/node/consensus.go b/node/consensus.go index df89ca7d..bfa07834 100644 --- a/node/consensus.go +++ b/node/consensus.go @@ -38,15 +38,23 @@ func (n *Node) createRaftCluster(ctx context.Context, from peer.ID, fc request.F ctx, cancel := context.WithTimeout(context.Background(), consensusClusterSendTimeout) defer cancel() + metadata, err := n.cfg.MetadataProvider.Metadata(req.Execute, res.Result) + if err != nil { + n.log.Warn().Err(err).Msg("could not get metadata") + } + msg := response.Execute{ Code: res.Code, RequestID: req.RequestID, Results: execute.ResultMap{ - n.host.ID(): res, + n.host.ID(): execute.NodeResult{ + Result: res, + Metadata: metadata, + }, }, } - err := n.send(ctx, req.Origin, msg) + err = n.send(ctx, req.Origin, msg) if err != nil { n.log.Error().Err(err).Str("peer", req.Origin.String()).Msg("could not send execution result to node") } @@ -96,6 +104,7 @@ func (n *Node) createPBFTCluster(ctx context.Context, from peer.ID, fc request.F fc.Peers, fc.RequestID, pbft.WithPostProcessors(cacheFn), + pbft.WithMetadataProvider(n.cfg.MetadataProvider), ) if err != nil { return fmt.Errorf("could not create PBFT node: %w", err) diff --git a/node/execute_integration_test.go b/node/execute_integration_test.go index 9d10b560..e423a767 100644 --- a/node/execute_integration_test.go +++ b/node/execute_integration_test.go @@ -180,7 +180,7 @@ This is the end of my program require.Equal(t, codes.OK, res.Code) require.NotEmpty(t, res.RequestID) - require.Equal(t, expectedExecutionResult, res.Results[worker.host.ID()].Result.Stdout) + require.Equal(t, expectedExecutionResult, res.Results[worker.host.ID()].Result.Result.Stdout) t.Log("client verified execution response") diff --git a/node/execute_internal_test.go b/node/execute_internal_test.go index 8f73deeb..a3de46bc 100644 --- a/node/execute_internal_test.go +++ b/node/execute_internal_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "github.com/blocklessnetwork/b7s/host" @@ -59,7 +58,6 @@ func TestNode_WorkerExecute(t *testing.T) { outRequestID = reqID res := mocks.GenericExecutionResult - res.RequestID = outRequestID return res, nil } @@ -89,7 +87,7 @@ func TestNode_WorkerExecute(t *testing.T) { require.Equal(t, outRequestID, received.RequestID) require.Equal(t, expected.Code, received.Code) - require.Equal(t, expected.Result, received.Results[node.host.ID()].Result) + require.Equal(t, expected.Result, received.Results[node.host.ID()].Result.Result) }) err = node.processExecute(context.Background(), receiver.ID(), executionRequest) @@ -121,7 +119,6 @@ func TestNode_WorkerExecute(t *testing.T) { requestID = reqID out := faultyExecutionResult - out.RequestID = reqID return out, mocks.GenericError } @@ -148,7 +145,7 @@ func TestNode_WorkerExecute(t *testing.T) { require.Equal(t, received.RequestID, requestID) require.Equal(t, faultyExecutionResult.Code, received.Code) - require.Equal(t, faultyExecutionResult.Result, received.Results[node.host.ID()].Result) + require.Equal(t, faultyExecutionResult.Result, received.Results[node.host.ID()].Result.Result) }) err = node.processExecute(context.Background(), receiver.ID(), executionRequest) @@ -334,10 +331,12 @@ func TestNode_HeadExecute(t *testing.T) { res := response.Execute{ Code: codes.OK, RequestID: requestID, - Results: map[peer.ID]execute.Result{ + Results: execute.ResultMap{ mockWorker.Host.ID(): { - Code: codes.OK, - Result: executionResult, + Result: execute.Result{ + Code: codes.OK, + Result: executionResult, + }, }, }, } diff --git a/node/execution_results.go b/node/execution_results.go index 1858a6f7..6fcbcc46 100644 --- a/node/execution_results.go +++ b/node/execution_results.go @@ -20,8 +20,9 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, defer exCancel() type aggregatedResult struct { - result execute.Result - peers []peer.ID + result execute.Result + peers []peer.ID + metadata map[peer.ID]any } var ( @@ -30,7 +31,7 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, wg sync.WaitGroup results = make(map[string]aggregatedResult) - out execute.ResultMap = make(map[peer.ID]execute.Result) + out execute.ResultMap = make(map[peer.ID]execute.NodeResult) ) wg.Add(len(peers)) @@ -69,26 +70,35 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, lock.Lock() defer lock.Unlock() - // Equality means same result and same timestamp. - reskey := fmt.Sprintf("%+#v-%s", exres.Result, er.PBFT.RequestTimestamp.String()) + // Equality means same result (output) and same timestamp. + reskey := fmt.Sprintf("%+#v-%s", exres.Result.Result, er.PBFT.RequestTimestamp.String()) result, ok := results[reskey] if !ok { results[reskey] = aggregatedResult{ - result: exres, + result: exres.Result, peers: []peer.ID{ sender, }, + metadata: map[peer.ID]any{ + sender: exres.Metadata, + }, } return } + // Record which peers have this result, and their metadata. result.peers = append(result.peers, sender) + result.metadata[sender] = exres.Metadata + if uint(len(result.peers)) >= count { n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough matching results") exCancel() for _, peer := range result.peers { - out[peer] = result.result + out[peer] = execute.NodeResult{ + Result: result.result, + Metadata: result.metadata[peer], + } } } }(rp) @@ -107,7 +117,7 @@ func (n *Node) gatherExecutionResults(ctx context.Context, requestID string, pee defer exCancel() var ( - results execute.ResultMap = make(map[peer.ID]execute.Result) + results execute.ResultMap = make(map[peer.ID]execute.NodeResult) reslock sync.Mutex wg sync.WaitGroup ) diff --git a/node/worker_execute.go b/node/worker_execute.go index 343b4d90..1bc7b3db 100644 --- a/node/worker_execute.go +++ b/node/worker_execute.go @@ -35,6 +35,11 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque return nil } + metadata, err := n.cfg.MetadataProvider.Metadata(req.Request, result.Result) + if err != nil { + log.Error().Err(err).Msg("could not get metadata for the execution result") + } + log.Info().Str("code", code.String()).Msg("execution complete") // Cache the execution result. @@ -45,7 +50,10 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque Code: code, RequestID: requestID, Results: execute.ResultMap{ - n.host.ID(): result, + n.host.ID(): execute.NodeResult{ + Result: result, + Metadata: metadata, + }, }, } diff --git a/testing/mocks/generic.go b/testing/mocks/generic.go index 07f8d9af..c94748e4 100644 --- a/testing/mocks/generic.go +++ b/testing/mocks/generic.go @@ -38,7 +38,6 @@ var ( Stderr: "generic-execution-log", ExitCode: 0, }, - RequestID: GenericUUID.String(), } GenericExecutionRequest = execute.Request{ diff --git a/testing/mocks/node.go b/testing/mocks/node.go index aa0d71be..be03cd22 100644 --- a/testing/mocks/node.go +++ b/testing/mocks/node.go @@ -22,7 +22,9 @@ func BaselineNode(t *testing.T) *Node { ExecuteFunctionFunc: func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) { results := execute.ResultMap{ - GenericPeerID: GenericExecutionResult, + GenericPeerID: execute.NodeResult{ + Result: GenericExecutionResult, + }, } // TODO: Add a generic cluster info