Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement funding and watching service over tcp+perun's protobuf encoding. #266

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*~
app/payment/perun.log
demo
17 changes: 12 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ CLI_BIN := perunnodecli
TUI_PKG := ./cmd/perunnodetui
TUI_BIN := perunnodetui

DEMO_DIR := demo

LDFLAGS=-ldflags "-X 'main.version=$(VERSION)' -X 'main.gitCommitID=$(GIT_COMMIT_ID)' -X 'main.goperunVersion=$(GOPERUN_VERSION)'"

build:
go build $(LDFLAGS) $(NODE_PKG)
go build $(CLI_PKG)
go build $(TUI_PKG)
install:
go install $(LDFLAGS) $(NODE_PKG)
go install $(CLI_PKG)
go install $(TUI_PKG)

generate: install
@mkdir $(DEMO_DIR)
@cd $(DEMO_DIR) && $(NODE_BIN) generate
@echo "Configuration files for demo generated in ./$(DEMO_DIR)"

clean:
rm -rf $(NODE_BIN) $(CLI_BIN) $(TUI_BIN) node.yaml alice bob
rm -rf demo
243 changes: 16 additions & 227 deletions api/grpc/funding.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,276 +18,65 @@ package grpc

import (
"context"
"fmt"

"github.com/pkg/errors"
pchannel "perun.network/go-perun/channel"
psync "polycry.pt/poly-go/sync"

"github.com/hyperledger-labs/perun-node"
"github.com/hyperledger-labs/perun-node/api/grpc/pb"
"github.com/hyperledger-labs/perun-node/api/handlers"
)

// fundingServer represents a grpc server that can serve funding API.
type fundingServer struct {
pb.UnimplementedFunding_APIServer
n perun.NodeAPI

// The mutex should be used when accessing the map data structures.
psync.Mutex
subscribes map[string]map[pchannel.ID]pchannel.AdjudicatorSubscription
*handlers.FundingHandler
}

// Fund wraps session.Fund.
func (a *fundingServer) Fund(ctx context.Context, grpcReq *pb.FundReq) (*pb.FundResp, error) {
errResponse := func(err perun.APIError) *pb.FundResp {
return &pb.FundResp{
Error: pb.FromError(err),
}
}

sess, apiErr := a.n.GetSession(grpcReq.SessionID)
if apiErr != nil {
return errResponse(apiErr), nil
}
req, err := pb.ToFundingReq(grpcReq)
if err != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err)), nil
}

err = sess.Fund(ctx, req)
if err != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err)), nil
}

return &pb.FundResp{
Error: nil,
}, nil
func (a *fundingServer) Fund(ctx context.Context, req *pb.FundReq) (*pb.FundResp, error) {
return a.FundingHandler.Fund(ctx, req)
}

// RegisterAssetERC20 is a stub that always returns false. Because, the remote
// funder does not support use of assets other than the default ERC20 asset.
//
// TODO: Make actual implementation.
func (a *payChAPIServer) RegisterAssetERC20(_ context.Context, _ *pb.RegisterAssetERC20Req) (
func (a *fundingServer) RegisterAssetERC20(ctx context.Context, req *pb.RegisterAssetERC20Req) (
*pb.RegisterAssetERC20Resp, error,
) {
return &pb.RegisterAssetERC20Resp{
MsgSuccess: false,
}, nil
return a.FundingHandler.RegisterAssetERC20(ctx, req)
}

// IsAssetRegistered wraps session.IsAssetRegistered.
func (a *payChAPIServer) IsAssetRegistered(_ context.Context, req *pb.IsAssetRegisteredReq) (
func (a *fundingServer) IsAssetRegistered(ctx context.Context, req *pb.IsAssetRegisteredReq) (
*pb.IsAssetRegisteredResp,
error,
) {
errResponse := func(err perun.APIError) *pb.IsAssetRegisteredResp {
return &pb.IsAssetRegisteredResp{
Response: &pb.IsAssetRegisteredResp_Error{
Error: pb.FromError(err),
},
}
}

sess, err := a.n.GetSession(req.SessionID)
if err != nil {
return errResponse(err), nil
}
asset := pchannel.NewAsset()
err2 := asset.UnmarshalBinary(req.Asset)
if err2 != nil {
err = perun.NewAPIErrInvalidArgument(err2, "asset", fmt.Sprintf("%x", req.Asset))
return errResponse(err), nil
}

isRegistered := sess.IsAssetRegistered(asset)

return &pb.IsAssetRegisteredResp{
Response: &pb.IsAssetRegisteredResp_MsgSuccess_{
MsgSuccess: &pb.IsAssetRegisteredResp_MsgSuccess{
IsRegistered: isRegistered,
},
},
}, nil
return a.FundingHandler.IsAssetRegistered(ctx, req)
}

// Register wraps session.Register.
func (a *fundingServer) Register(ctx context.Context, req *pb.RegisterReq) (*pb.RegisterResp, error) {
errResponse := func(err perun.APIError) *pb.RegisterResp {
return &pb.RegisterResp{
Error: pb.FromError(err),
}
}

sess, err := a.n.GetSession(req.SessionID)
if err != nil {
return errResponse(err), nil
}
adjReq, err2 := pb.ToAdjReq(req.AdjReq)
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}
signedStates := make([]pchannel.SignedState, len(req.SignedStates))
for i := range signedStates {
signedStates[i], err2 = pb.ToSignedState(req.SignedStates[i])
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}
}

err2 = sess.Register(ctx, adjReq, signedStates)
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}

return &pb.RegisterResp{
Error: nil,
}, nil
return a.FundingHandler.Register(ctx, req)
}

// Withdraw wraps session.Withdraw.
func (a *fundingServer) Withdraw(ctx context.Context, req *pb.WithdrawReq) (*pb.WithdrawResp, error) {
errResponse := func(err perun.APIError) *pb.WithdrawResp {
return &pb.WithdrawResp{
Error: pb.FromError(err),
}
}

sess, err := a.n.GetSession(req.SessionID)
if err != nil {
return errResponse(err), nil
}
adjReq, err2 := pb.ToAdjReq(req.AdjReq)
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}
stateMap := pchannel.StateMap(make(map[pchannel.ID]*pchannel.State))

for i := range req.StateMap {
var id pchannel.ID
copy(id[:], req.StateMap[i].Id)
stateMap[id], err2 = pb.ToState(req.StateMap[i].State)
if err2 != nil {
return errResponse(err), nil
}
}

err2 = sess.Withdraw(ctx, adjReq, stateMap)
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}

return &pb.WithdrawResp{
Error: nil,
}, nil
return a.FundingHandler.Withdraw(ctx, req)
}

// Progress wraps session.Progress.
func (a *fundingServer) Progress(ctx context.Context, req *pb.ProgressReq) (*pb.ProgressResp, error) {
errResponse := func(err perun.APIError) *pb.ProgressResp {
return &pb.ProgressResp{
Error: pb.FromError(err),
}
}

sess, err := a.n.GetSession(req.SessionID)
if err != nil {
return errResponse(err), nil
}
var progReq perun.ProgressReq
var err2 error
progReq.AdjudicatorReq, err2 = pb.ToAdjReq(req.AdjReq)
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}
progReq.NewState, err2 = pb.ToState(req.NewState)
if err2 != nil {
return errResponse(err), nil
}
copy(progReq.Sig, req.Sig)

err2 = sess.Progress(ctx, progReq)
if err2 != nil {
return errResponse(perun.NewAPIErrUnknownInternal(err2)), nil
}

return &pb.ProgressResp{
Error: nil,
}, nil
return a.FundingHandler.Progress(ctx, req)
}

// Subscribe wraps session.Subscribe.

func (a *fundingServer) Subscribe(req *pb.SubscribeReq, stream pb.Funding_API_SubscribeServer) error {
sess, err := a.n.GetSession(req.SessionID)
if err != nil {
return errors.WithMessage(err, "retrieving session")
}

var chID pchannel.ID
copy(chID[:], req.ChID)

adjSub, err := sess.Subscribe(context.Background(), chID)
if err != nil {
return errors.WithMessage(err, "setting up subscription")
notify := func(notif *pb.SubscribeResp) error {
return stream.Send(notif)
}

a.Lock()
a.subscribes[req.SessionID][chID] = adjSub
a.Unlock()

// This stream is anyways closed when StopWatching is called for.
// Hence, that will act as the exit condition for the loop.
go func() {
// will return nil, when the sub is closed.
// so, we need a mechanism to call close on the server side.
// so, add a call Unsubscribe, which simply calls close.
for {
adjEvent := adjSub.Next()
if adjEvent == nil {
err := errors.WithMessage(adjSub.Err(), "sub closed with error")
notif := &pb.SubscribeResp_Error{
Error: pb.FromError(perun.NewAPIErrUnknownInternal(err)),
}
// TODO: Proper error handling. For now, ignore this error.
_ = stream.Send(&pb.SubscribeResp{Response: notif}) //nolint: errcheck
return
}
notif, err := pb.SubscribeResponseFromAdjEvent(adjEvent)
if err != nil {
return
}
err = stream.Send(notif)
if err != nil {
return
}
}
}()

return nil
return a.FundingHandler.Subscribe(req, notify)
}

func (a *fundingServer) Unsubscribe(_ context.Context, req *pb.UnsubscribeReq) (*pb.UnsubscribeResp, error) {
errResponse := func(err perun.APIError) *pb.UnsubscribeResp {
return &pb.UnsubscribeResp{
Error: pb.FromError(err),
}
}

var chID pchannel.ID
copy(chID[:], req.ChID)

a.Lock()
adjSub := a.subscribes[req.SessionID][chID]
delete(a.subscribes[req.SessionID], chID)
a.Unlock()

if err := adjSub.Close(); err != nil {
return errResponse(perun.NewAPIErrUnknownInternal(errors.WithMessage(err, "retrieving session"))), nil
}

return &pb.UnsubscribeResp{
Error: nil,
}, nil
func (a *fundingServer) Unsubscribe(ctx context.Context, req *pb.UnsubscribeReq) (*pb.UnsubscribeResp, error) {
return a.FundingHandler.Unsubscribe(ctx, req)
}
4 changes: 2 additions & 2 deletions api/grpc/payment_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func StartServer(t *testing.T, nodeCfg perun.NodeConfig, grpcPort string) {
nodeAPI, err := node.New(nodeCfg)
require.NoErrorf(t, err, "initializing nodeAPI")

t.Log("Started ListenAndServePayChAPI")
t.Log("Started grpc service")
go func() {
if err := grpc.ListenAndServePayChAPI(nodeAPI, grpcPort); err != nil {
if err := grpc.ServePaymentAPI(nodeAPI, grpcPort); err != nil {
t.Logf("server returned with error: %v", err)
}
}()
Expand Down
Loading