Skip to content

Commit

Permalink
ddsched plugin gRPC calls have timeout by default
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal Tichák committed Oct 22, 2024
1 parent 153a87c commit 6455cbe
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 15 deletions.
2 changes: 2 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func setDefaults() error {
viper.SetDefault("ccdbEndpoint", "http://ccdb-test.cern.ch:8080")
viper.SetDefault("dcsServiceEndpoint", "//127.0.0.1:50051")
viper.SetDefault("dcsServiceUseSystemProxy", false)
viper.SetDefault("ddSchedulergRPCTimeout", "5")
viper.SetDefault("ddSchedulerEndpoint", "//127.0.0.1:50052")
viper.SetDefault("ddSchedulerUseSystemProxy", false)
viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060")
Expand Down Expand Up @@ -170,6 +171,7 @@ func setFlags() error {
pflag.String("dcsServiceEndpoint", viper.GetString("dcsServiceEndpoint"), "Endpoint of the DCS gRPC service (`host:port`)")
pflag.Bool("dcsServiceUseSystemProxy", viper.GetBool("dcsServiceUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("ddSchedulerEndpoint", viper.GetString("ddSchedulerEndpoint"), "Endpoint of the DD scheduler gRPC service (`host:port`)")
pflag.String("ddSchedulergRPCTimeout", viper.GetString("ddSchedulergRPCTimeout"), "Timeout for gRPC calls in ddshed plugin in seconds")
pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)")
pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
Expand Down
32 changes: 17 additions & 15 deletions core/integration/ddsched/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/AliceO2Group/Control/common/logger"
"github.com/AliceO2Group/Control/core/integration"
ddpb "github.com/AliceO2Group/Control/core/integration/ddsched/protos"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand All @@ -38,12 +39,11 @@ import (
"google.golang.org/grpc/keepalive"
)

var log = logger.New(logrus.StandardLogger(),"ddschedclient")

var log = logger.New(logrus.StandardLogger(), "ddschedclient")

type RpcClient struct {
ddpb.DataDistributionControlClient
conn *grpc.ClientConn
conn *grpc.ClientConn
cancel context.CancelFunc
}

Expand All @@ -52,10 +52,10 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
"endpoint": endpoint,
}).Debug("dialing DD scheduler endpoint")

dialOptions := []grpc.DialOption {
dialOptions := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
Backoff: backoff.Config{
BaseDelay: backoff.DefaultConfig.BaseDelay,
Multiplier: backoff.DefaultConfig.Multiplier,
Jitter: backoff.DefaultConfig.Jitter,
Expand All @@ -68,14 +68,15 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)
Timeout: time.Second,
PermitWithoutStream: true,
}),
grpc.WithUnaryInterceptor(integration.UnaryTimeoutInterceptor(time.Second*time.Duration(viper.GetInt("ddSchedulergRPCTimeout")), "ddsched gRPC call failed")),
}
if !viper.GetBool("ddSchedulerUseSystemProxy") {
dialOptions = append(dialOptions, grpc.WithNoProxy())
}
conn, err := grpc.DialContext(cxt,
endpoint,
dialOptions...,
)
endpoint,
dialOptions...,
)
if err != nil {
log.WithField("error", err.Error()).
WithField("endpoint", endpoint).
Expand All @@ -95,27 +96,27 @@ func NewClient(cxt context.Context, cancel context.CancelFunc, endpoint string)

for {
select {
case ok := <- stateChangedNotify:
case ok := <-stateChangedNotify:
if !ok {
return
}
connState = conn.GetState()
log.Debugf("DD scheduler client %s", connState.String())
go notifyFunc(connState)
case <- time.After(2 * time.Minute):
case <-time.After(2 * time.Minute):
if conn.GetState() != connectivity.Ready {
conn.ResetConnectBackoff()
}
case <- cxt.Done():
case <-cxt.Done():
return
}
}
}()

client := &RpcClient {
client := &RpcClient{
DataDistributionControlClient: ddpb.NewDataDistributionControlClient(conn),
conn: conn,
cancel: cancel,
conn: conn,
cancel: cancel,
}

return client
Expand All @@ -131,4 +132,5 @@ func (m *RpcClient) GetConnState() connectivity.State {
func (m *RpcClient) Close() error {
m.cancel()
return m.conn.Close()
}
}

58 changes: 58 additions & 0 deletions core/integration/rpctimeoutinterceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2021 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

package integration

import (
"context"
"errors"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/status"
)

func UnaryTimeoutInterceptor(timeout time.Duration, cause string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
connection *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// Create a context with a timeout
ctx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New(cause))
defer cancel() // Ensure the context is canceled after the call

// Invoke the RPC call with the new context
err := invoker(ctx, method, req, reply, connection, opts...)
if err != nil {
st, _ := status.FromError(err)
// Handle error, maybe logging or processing the error status
return st.Err()
}
return nil
}
}

0 comments on commit 6455cbe

Please sign in to comment.