Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When re-creating listeners in EVMConnect/EthConnect/FabConnect use the last event block #1534

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities {
return e.capabilities
}

func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) {
func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) {
ethLocation, err := e.parseContractLocation(ctx, contract.Location)
if err != nil {
return "", err
Expand All @@ -286,7 +286,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N
if !ok {
return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found")
}
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI)
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI, lastProtocolID)

if err != nil {
return "", err
Expand Down Expand Up @@ -874,7 +874,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio
return result, err
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) {
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
var location *Location
namespace := listener.Namespace
if listener.Location != nil {
Expand All @@ -893,7 +893,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi)
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID)
if err != nil {
return err
}
Expand Down
44 changes: 25 additions & 19 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestInitAllExistingStreams(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -964,7 +964,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func TestInitAllExistingStreamsOld(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -1080,7 +1080,7 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) {

<-toServer
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10416", err)
}

Expand Down Expand Up @@ -2027,7 +2027,7 @@ func TestAddSubscription(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.NoError(t, err)
}
Expand Down Expand Up @@ -2062,7 +2062,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.NoError(t, err)
}
Expand Down Expand Up @@ -2097,7 +2097,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10311", err)
}
Expand All @@ -2118,7 +2118,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) {
Event: &core.FFISerializedEvent{},
}

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10310", err)
}
Expand Down Expand Up @@ -2147,7 +2147,7 @@ func TestAddSubscriptionFail(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewStringResponder(500, "pop"))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -4005,7 +4005,7 @@ func TestAddSubBadLocation(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err := e.AddFireflySubscription(e.ctx, ns, contract)
_, err := e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10310", err)
}

Expand All @@ -4025,9 +4025,15 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
}))
func(r *http.Request) (*http.Response, error) {
var s subscription
err := json.NewDecoder(r.Body).Decode(&s)
assert.NoError(t, err)
assert.Equal(t, "19", s.FromBlock)
return httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
})(r)
})
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(2))

utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
Expand Down Expand Up @@ -4055,7 +4061,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
subID, err := e.AddFireflySubscription(e.ctx, ns, contract)
subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "000000000020/000000/000000")
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))

Expand Down Expand Up @@ -4103,7 +4109,7 @@ func TestAddFireflySubscriptionV1(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))
}
Expand Down Expand Up @@ -4147,7 +4153,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10465", err)
}

Expand Down Expand Up @@ -4189,7 +4195,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down Expand Up @@ -4231,7 +4237,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down Expand Up @@ -4273,7 +4279,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down
61 changes: 51 additions & 10 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,6 +21,8 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"strings"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand Down Expand Up @@ -201,18 +203,57 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
return sub.Name, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) {
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
switch firstEvent {
case string(core.SubOptsFirstEventOldest):
func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) {
// Parse the lastProtocolID if supplied
var blockBeforeNewestEvent *uint64
if len(lastProtocolID) > 0 {
blockStr := strings.Split(lastProtocolID, "/")[0]
parsedUint, err := strconv.ParseUint(blockStr, 10, 64)
if err != nil {
return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID)
}
if parsedUint > 0 {
// We jump back on block from the last event, to minimize re-delivery while ensuring
// we get all events since the last delivered (including subsequent events in the same block)
parsedUint--
blockBeforeNewestEvent = &parsedUint
}
}

// If the user requested newest, then we use the last block number if we have one,
// or we pass the request for newest down to the connector
if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" {
if blockBeforeNewestEvent != nil {
return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil
}
return "latest", nil
}

// Otherwise we expect to be able to parse the block, with "oldest" being the same as "0"
if firstEvent == string(core.SubOptsFirstEventOldest) {
firstEvent = "0"
case string(core.SubOptsFirstEventNewest):
firstEvent = "latest"
}
blockNumber, err := strconv.ParseUint(firstEvent, 10, 64)
if err != nil {
return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent)
}
// If the last event is already dispatched after this block, recreate the listener from that block
if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber {
blockNumber = *blockBeforeNewestEvent
}
return strconv.FormatUint(blockNumber, 10), nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) {
fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID)
if err != nil {
return nil, err
}

sub := subscription{
Name: subName,
Stream: stream,
FromBlock: firstEvent,
FromBlock: fromBlock,
EthCompatEvent: abi,
}

Expand Down Expand Up @@ -244,7 +285,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok
return nil
}

func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry) (sub *subscription, err error) {
func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry, lastProtocolID string) (sub *subscription, err error) {
// Include a hash of the instance path in the subscription, so if we ever point at a different
// contract configuration, we re-subscribe from block 0.
// We don't need full strength hashing, so just use the first 16 chars for readability.
Expand Down Expand Up @@ -286,7 +327,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace
name = v1Name
}
location := &Location{Address: instancePath}
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, lastProtocolID); err != nil {
return nil, err
}
log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID)
Expand Down
69 changes: 69 additions & 0 deletions internal/blockchain/ethereum/eventstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethereum

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCreateSubscriptionBadBlock(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()

_, err := e.streams.createSubscription(context.Background(), nil, "", "", "wrongness", nil, "")
assert.Regexp(t, "FF10473", err)
}

func TestResolveFromBlockCombinations(t *testing.T) {

ctx := context.Background()

fromBlock, err := resolveFromBlock(ctx, "", "")
assert.Equal(t, "latest", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "latest", "")
assert.Equal(t, "latest", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "newest", "")
assert.Equal(t, "latest", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "0", "")
assert.Equal(t, "0", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/000000/000050")
assert.Equal(t, "9", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/000000/000050")
assert.Equal(t, "20", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "", "000000000010/000000/000050")
assert.Equal(t, "9", fromBlock)
assert.NoError(t, err)

_, err = resolveFromBlock(ctx, "", "wrong")
assert.Regexp(t, "FF10472", err)

}
Loading
Loading