-
Notifications
You must be signed in to change notification settings - Fork 208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use separate event stream per namespace #1388
Changes from 4 commits
7ad5304
3f04c1e
a37304f
596caed
66173b2
e5f1ccc
9655d6f
91a18fc
94bfc75
0559710
cecee27
aa64620
e0b34a5
3fe2b08
6b4fd88
bf4656b
a52d3bc
3ad8416
65283ec
813e29b
7972561
ba212a9
99c7bc1
ee0770d
b9b7a29
716efcd
dd281b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,7 +31,6 @@ import ( | |
"github.com/hyperledger/firefly-common/pkg/fftypes" | ||
"github.com/hyperledger/firefly-common/pkg/i18n" | ||
"github.com/hyperledger/firefly-common/pkg/log" | ||
"github.com/hyperledger/firefly-common/pkg/retry" | ||
"github.com/hyperledger/firefly-common/pkg/wsclient" | ||
"github.com/hyperledger/firefly-signer/pkg/abi" | ||
"github.com/hyperledger/firefly-signer/pkg/ffi2abi" | ||
|
@@ -61,24 +60,23 @@ const ( | |
type Ethereum struct { | ||
ctx context.Context | ||
cancelCtx context.CancelFunc | ||
topic string | ||
pluginTopic string | ||
prefixShort string | ||
prefixLong string | ||
capabilities *blockchain.Capabilities | ||
callbacks common.BlockchainCallbacks | ||
client *resty.Client | ||
streams *streamManager | ||
streamID string | ||
wsconn wsclient.WSClient | ||
closed chan struct{} | ||
streamID map[string]string | ||
wsconn map[string]wsclient.WSClient | ||
wsConfig *wsclient.WSConfig | ||
closed map[string]chan struct{} | ||
addressResolveAlways bool | ||
addressResolver *addressResolver | ||
metrics metrics.Manager | ||
ethconnectConf config.Section | ||
subs common.FireflySubscriptions | ||
cache cache.CInterface | ||
backgroundRetry *retry.Retry | ||
backgroundStart bool | ||
} | ||
|
||
type eventStreamWebsocket struct { | ||
|
@@ -162,7 +160,7 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf | |
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", ethconnectConf) | ||
} | ||
|
||
wsConfig, err := wsclient.GenerateConfig(ctx, ethconnectConf) | ||
e.wsConfig, err = wsclient.GenerateConfig(ctx, ethconnectConf) | ||
if err == nil { | ||
e.client, err = ffresty.New(e.ctx, ethconnectConf) | ||
} | ||
|
@@ -171,19 +169,15 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf | |
return err | ||
} | ||
|
||
e.topic = ethconnectConf.GetString(EthconnectConfigTopic) | ||
if e.topic == "" { | ||
e.pluginTopic = ethconnectConf.GetString(EthconnectConfigTopic) | ||
if e.pluginTopic == "" { | ||
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "topic", ethconnectConf) | ||
} | ||
e.prefixShort = ethconnectConf.GetString(EthconnectPrefixShort) | ||
e.prefixLong = ethconnectConf.GetString(EthconnectPrefixLong) | ||
|
||
if wsConfig.WSKeyPath == "" { | ||
wsConfig.WSKeyPath = "/ws" | ||
} | ||
e.wsconn, err = wsclient.New(ctx, wsConfig, nil, e.afterConnect) | ||
if err != nil { | ||
return err | ||
if e.wsConfig.WSKeyPath == "" { | ||
e.wsConfig.WSKeyPath = "/ws" | ||
} | ||
|
||
cache, err := cacheManager.GetCache( | ||
|
@@ -199,29 +193,68 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf | |
} | ||
e.cache = cache | ||
|
||
e.streamID = make(map[string]string) | ||
e.closed = make(map[string]chan struct{}) | ||
e.wsconn = make(map[string]wsclient.WSClient) | ||
e.streams = newStreamManager(e.client, e.cache, e.ethconnectConf.GetUint(EthconnectConfigBatchSize), uint(e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())) | ||
|
||
e.backgroundStart = e.ethconnectConf.GetBool(EthconnectBackgroundStart) | ||
if e.backgroundStart { | ||
e.backgroundRetry = &retry.Retry{ | ||
InitialDelay: e.ethconnectConf.GetDuration(EthconnectBackgroundStartInitialDelay), | ||
MaximumDelay: e.ethconnectConf.GetDuration(EthconnectBackgroundStartMaxDelay), | ||
Factor: e.ethconnectConf.GetFloat64(EthconnectBackgroundStartFactor), | ||
} | ||
return nil | ||
} | ||
|
||
return nil | ||
func (e *Ethereum) getTopic(namespace string) string { | ||
return fmt.Sprintf("%s/%s", e.pluginTopic, namespace) | ||
} | ||
|
||
func (e *Ethereum) StartNamespace(ctx context.Context, namespace string) (err error) { | ||
log.L(e.ctx).Debugf("Starting namespace: %s", namespace) | ||
topic := e.getTopic(namespace) | ||
|
||
e.wsconn[namespace], err = wsclient.New(ctx, e.wsConfig, nil, func(ctx context.Context, w wsclient.WSClient) error { | ||
// Send a subscribe to our topic after each connect/reconnect | ||
b, _ := json.Marshal(ðWSCommandPayload{ | ||
Type: "listen", | ||
Topic: topic, | ||
}) | ||
err := w.Send(ctx, b) | ||
if err == nil { | ||
b, _ = json.Marshal(ðWSCommandPayload{ | ||
Type: "listenreplies", | ||
}) | ||
err = w.Send(ctx, b) | ||
} | ||
return err | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
// Otherwise, make sure that our event stream is in place | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Otherwise feels superfluous here |
||
stream, err := e.streams.ensureEventStream(ctx, topic) | ||
if err != nil { | ||
return err | ||
} | ||
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", stream.ID, topic) | ||
e.streamID[namespace] = stream.ID | ||
|
||
stream, err := e.streams.ensureEventStream(e.ctx, e.topic) | ||
err = e.wsconn[namespace].Connect() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
e.streamID = stream.ID | ||
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic) | ||
e.closed[namespace] = make(chan struct{}) | ||
|
||
go e.eventLoop(namespace) | ||
|
||
return nil | ||
} | ||
|
||
e.closed = make(chan struct{}) | ||
go e.eventLoop() | ||
func (e *Ethereum) StopNamespace(ctx context.Context, namespace string) (err error) { | ||
wsconn, ok := e.wsconn[namespace] | ||
if ok { | ||
wsconn.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Feels like we should wait for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. I added a wait for this here, but it then gets into a deadlock trying to stop the namespace if ok {
<-e.closed[namespace]
wsconn.Close()
} |
||
} | ||
delete(e.wsconn, namespace) | ||
delete(e.streamID, namespace) | ||
delete(e.closed, namespace) | ||
|
||
return nil | ||
} | ||
|
@@ -234,36 +267,6 @@ func (e *Ethereum) SetOperationHandler(namespace string, handler core.OperationC | |
e.callbacks.SetOperationalHandler(namespace, handler) | ||
} | ||
|
||
func (e *Ethereum) startBackgroundLoop() { | ||
_ = e.backgroundRetry.Do(e.ctx, fmt.Sprintf("ethereum connector %s", e.Name()), func(attempt int) (retry bool, err error) { | ||
stream, err := e.streams.ensureEventStream(e.ctx, e.topic) | ||
if err != nil { | ||
return true, err | ||
} | ||
|
||
e.streamID = stream.ID | ||
log.L(e.ctx).Infof("Event stream: %s (topic=%s)", e.streamID, e.topic) | ||
err = e.wsconn.Connect() | ||
if err != nil { | ||
return true, err | ||
} | ||
|
||
e.closed = make(chan struct{}) | ||
go e.eventLoop() | ||
|
||
return false, nil | ||
}) | ||
} | ||
|
||
func (e *Ethereum) Start() (err error) { | ||
if e.backgroundStart { | ||
go e.startBackgroundLoop() | ||
return nil | ||
} | ||
|
||
return e.wsconn.Connect() | ||
} | ||
|
||
func (e *Ethereum) Capabilities() *blockchain.Capabilities { | ||
return e.capabilities | ||
} | ||
|
@@ -279,7 +282,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N | |
return "", err | ||
} | ||
|
||
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID, batchPinEventABI) | ||
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID[namespace.Name], batchPinEventABI) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a check that the map entry is set (to avoid a nil panic)? Or is the dynamic config loading threading model such that we know it's impossible for an API call to come down to this layer to create a subscription for a new listener, after the stream has been cleaned up? |
||
if err != nil { | ||
return "", err | ||
} | ||
|
@@ -295,22 +298,6 @@ func (e *Ethereum) RemoveFireflySubscription(ctx context.Context, subID string) | |
e.subs.RemoveSubscription(ctx, subID) | ||
} | ||
|
||
func (e *Ethereum) afterConnect(ctx context.Context, w wsclient.WSClient) error { | ||
// Send a subscribe to our topic after each connect/reconnect | ||
b, _ := json.Marshal(ðWSCommandPayload{ | ||
Type: "listen", | ||
Topic: e.topic, | ||
}) | ||
err := w.Send(ctx, b) | ||
if err == nil { | ||
b, _ = json.Marshal(ðWSCommandPayload{ | ||
Type: "listenreplies", | ||
}) | ||
err = w.Send(ctx, b) | ||
} | ||
return err | ||
} | ||
|
||
func ethHexFormatB32(b *fftypes.Bytes32) string { | ||
if b == nil { | ||
return "0x0000000000000000000000000000000000000000000000000000000000000000" | ||
|
@@ -460,17 +447,19 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, batchID int64, messag | |
return e.callbacks.DispatchBlockchainEvents(ctx, events) | ||
} | ||
|
||
func (e *Ethereum) eventLoop() { | ||
defer e.wsconn.Close() | ||
defer close(e.closed) | ||
func (e *Ethereum) eventLoop(namespace string) { | ||
topic := e.getTopic(namespace) | ||
wsconn := e.wsconn[namespace] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Almost certain not to be a problem, but technically I think this should be passed in on the creation of the go-routine, as there's no locking around the map. So the starting routine sets this before kicking off the |
||
defer wsconn.Close() | ||
defer close(e.closed[namespace]) | ||
l := log.L(e.ctx).WithField("role", "event-loop") | ||
ctx := log.WithLogger(e.ctx, l) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
l.Debugf("Event loop exiting (context cancelled)") | ||
return | ||
case msgBytes, ok := <-e.wsconn.Receive(): | ||
case msgBytes, ok := <-wsconn.Receive(): | ||
if !ok { | ||
l.Debugf("Event loop exiting (receive channel closed). Terminating server!") | ||
e.cancelCtx() | ||
|
@@ -489,9 +478,9 @@ func (e *Ethereum) eventLoop() { | |
if err == nil { | ||
ack, _ := json.Marshal(ðWSCommandPayload{ | ||
Type: "ack", | ||
Topic: e.topic, | ||
Topic: topic, | ||
}) | ||
err = e.wsconn.Send(ctx, ack) | ||
err = wsconn.Send(ctx, ack) | ||
} | ||
case map[string]interface{}: | ||
isBatch := false | ||
|
@@ -502,7 +491,7 @@ func (e *Ethereum) eventLoop() { | |
err = e.handleMessageBatch(ctx, (int64)(batchNumber), events) | ||
// Errors processing messages are converted into nacks | ||
ackOrNack := ðWSCommandPayload{ | ||
Topic: e.topic, | ||
Topic: topic, | ||
BatchNumber: int64(batchNumber), | ||
} | ||
if err == nil { | ||
|
@@ -513,7 +502,7 @@ func (e *Ethereum) eventLoop() { | |
ackOrNack.Message = err.Error() | ||
} | ||
b, _ := json.Marshal(&ackOrNack) | ||
err = e.wsconn.Send(ctx, b) | ||
err = wsconn.Send(ctx, b) | ||
} | ||
} | ||
if !isBatch { | ||
|
@@ -875,6 +864,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio | |
|
||
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) { | ||
var location *Location | ||
namespace := listener.Namespace | ||
if listener.Location != nil { | ||
location, err = e.parseContractLocation(ctx, listener.Location) | ||
if err != nil { | ||
|
@@ -891,7 +881,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr | |
if listener.Options != nil { | ||
firstEvent = listener.Options.FirstEvent | ||
} | ||
result, err := e.streams.createSubscription(ctx, location, e.streamID, subName, firstEvent, abi) | ||
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi) | ||
if err != nil { | ||
return err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think it's important we document this, in the docs for the configuration of the
EthconnectConfigTopic
config entry