Skip to content

Commit

Permalink
Merge pull request #1534 from hyperledger/fix-1531
Browse files Browse the repository at this point in the history
When re-creating listeners in EVMConnect/EthConnect/FabConnect use the last event block
  • Loading branch information
EnriqueL8 authored Jul 10, 2024
2 parents a6700a8 + 8dd5a0d commit c00ecf8
Show file tree
Hide file tree
Showing 62 changed files with 502 additions and 148 deletions.
8 changes: 4 additions & 4 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities {
return e.capabilities
}

func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) {
func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) {
ethLocation, err := e.parseContractLocation(ctx, contract.Location)
if err != nil {
return "", err
Expand All @@ -286,7 +286,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N
if !ok {
return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found")
}
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI)
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI, lastProtocolID)

if err != nil {
return "", err
Expand Down Expand Up @@ -874,7 +874,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio
return result, err
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) {
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
var location *Location
namespace := listener.Namespace
if listener.Location != nil {
Expand All @@ -893,7 +893,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi)
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID)
if err != nil {
return err
}
Expand Down
44 changes: 25 additions & 19 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestInitAllExistingStreams(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -964,7 +964,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func TestInitAllExistingStreamsOld(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -1080,7 +1080,7 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) {

<-toServer
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10416", err)
}

Expand Down Expand Up @@ -2027,7 +2027,7 @@ func TestAddSubscription(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.NoError(t, err)
}
Expand Down Expand Up @@ -2062,7 +2062,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.NoError(t, err)
}
Expand Down Expand Up @@ -2097,7 +2097,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10311", err)
}
Expand All @@ -2118,7 +2118,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) {
Event: &core.FFISerializedEvent{},
}

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10310", err)
}
Expand Down Expand Up @@ -2147,7 +2147,7 @@ func TestAddSubscriptionFail(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewStringResponder(500, "pop"))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -4005,7 +4005,7 @@ func TestAddSubBadLocation(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err := e.AddFireflySubscription(e.ctx, ns, contract)
_, err := e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10310", err)
}

Expand All @@ -4025,9 +4025,15 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
}))
func(r *http.Request) (*http.Response, error) {
var s subscription
err := json.NewDecoder(r.Body).Decode(&s)
assert.NoError(t, err)
assert.Equal(t, "19", s.FromBlock)
return httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
})(r)
})
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(2))

utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
Expand Down Expand Up @@ -4055,7 +4061,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
subID, err := e.AddFireflySubscription(e.ctx, ns, contract)
subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "000000000020/000000/000000")
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))

Expand Down Expand Up @@ -4103,7 +4109,7 @@ func TestAddFireflySubscriptionV1(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))
}
Expand Down Expand Up @@ -4147,7 +4153,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10465", err)
}

Expand Down Expand Up @@ -4189,7 +4195,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down Expand Up @@ -4231,7 +4237,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down Expand Up @@ -4273,7 +4279,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down
61 changes: 51 additions & 10 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,6 +21,8 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"strings"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand Down Expand Up @@ -201,18 +203,57 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
return sub.Name, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) {
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
switch firstEvent {
case string(core.SubOptsFirstEventOldest):
func resolveFromBlock(ctx context.Context, firstEvent, lastProtocolID string) (string, error) {
// Parse the lastProtocolID if supplied
var blockBeforeNewestEvent *uint64
if len(lastProtocolID) > 0 {
blockStr := strings.Split(lastProtocolID, "/")[0]
parsedUint, err := strconv.ParseUint(blockStr, 10, 64)
if err != nil {
return "", i18n.NewError(ctx, coremsgs.MsgInvalidLastEventProtocolID, lastProtocolID)
}
if parsedUint > 0 {
// We jump back on block from the last event, to minimize re-delivery while ensuring
// we get all events since the last delivered (including subsequent events in the same block)
parsedUint--
blockBeforeNewestEvent = &parsedUint
}
}

// If the user requested newest, then we use the last block number if we have one,
// or we pass the request for newest down to the connector
if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) || firstEvent == "latest" {
if blockBeforeNewestEvent != nil {
return strconv.FormatUint(*blockBeforeNewestEvent, 10), nil
}
return "latest", nil
}

// Otherwise we expect to be able to parse the block, with "oldest" being the same as "0"
if firstEvent == string(core.SubOptsFirstEventOldest) {
firstEvent = "0"
case string(core.SubOptsFirstEventNewest):
firstEvent = "latest"
}
blockNumber, err := strconv.ParseUint(firstEvent, 10, 64)
if err != nil {
return "", i18n.NewError(ctx, coremsgs.MsgInvalidFromBlockNumber, firstEvent)
}
// If the last event is already dispatched after this block, recreate the listener from that block
if blockBeforeNewestEvent != nil && *blockBeforeNewestEvent > blockNumber {
blockNumber = *blockBeforeNewestEvent
}
return strconv.FormatUint(blockNumber, 10), nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) {
fromBlock, err := resolveFromBlock(ctx, firstEvent, lastProtocolID)
if err != nil {
return nil, err
}

sub := subscription{
Name: subName,
Stream: stream,
FromBlock: firstEvent,
FromBlock: fromBlock,
EthCompatEvent: abi,
}

Expand Down Expand Up @@ -244,7 +285,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok
return nil
}

func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry) (sub *subscription, err error) {
func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry, lastProtocolID string) (sub *subscription, err error) {
// Include a hash of the instance path in the subscription, so if we ever point at a different
// contract configuration, we re-subscribe from block 0.
// We don't need full strength hashing, so just use the first 16 chars for readability.
Expand Down Expand Up @@ -286,7 +327,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace
name = v1Name
}
location := &Location{Address: instancePath}
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, lastProtocolID); err != nil {
return nil, err
}
log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID)
Expand Down
69 changes: 69 additions & 0 deletions internal/blockchain/ethereum/eventstream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethereum

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestCreateSubscriptionBadBlock(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()

_, err := e.streams.createSubscription(context.Background(), nil, "", "", "wrongness", nil, "")
assert.Regexp(t, "FF10473", err)
}

func TestResolveFromBlockCombinations(t *testing.T) {

ctx := context.Background()

fromBlock, err := resolveFromBlock(ctx, "", "")
assert.Equal(t, "latest", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "latest", "")
assert.Equal(t, "latest", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "newest", "")
assert.Equal(t, "latest", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "0", "")
assert.Equal(t, "0", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "0", "000000000010/000000/000050")
assert.Equal(t, "9", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "20", "000000000010/000000/000050")
assert.Equal(t, "20", fromBlock)
assert.NoError(t, err)

fromBlock, err = resolveFromBlock(ctx, "", "000000000010/000000/000050")
assert.Equal(t, "9", fromBlock)
assert.NoError(t, err)

_, err = resolveFromBlock(ctx, "", "wrong")
assert.Regexp(t, "FF10472", err)

}
Loading

0 comments on commit c00ecf8

Please sign in to comment.