Skip to content

Commit

Permalink
fix: added session timeout to prevent TooManySessions from happening
Browse files Browse the repository at this point in the history
  • Loading branch information
JeremyTheocharis committed Apr 2, 2024
1 parent b7bc6e8 commit bcbd97f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
22 changes: 22 additions & 0 deletions opcua_plugin/opcua.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/gopcua/opcua/ua"
)

const SESSION_TIMEOUT = 5 * time.Second

type NodeDef struct {
NodeID *ua.NodeID
NodeClass ua.NodeClass
Expand Down Expand Up @@ -221,6 +223,7 @@ var OPCUAConfigSpec = service.NewConfigSpec().
Field(service.NewStringField("endpoint").Description("Address of the OPC-UA server to connect with.")).
Field(service.NewStringField("username").Description("Username for server access. If not set, no username is used.").Default("")).
Field(service.NewStringField("password").Description("Password for server access. If not set, no password is used.").Default("")).
Field(service.NewStringField("sessionTimeout").Description("The duration in milliseconds that a OPC UA session will last. Is used to ensure that older failed sessions will timeout and that we will not get a TooManySession error.").Default("10000")).
Field(service.NewStringListField("nodeIDs").Description("List of OPC-UA node IDs to begin browsing.")).
Field(service.NewStringField("securityMode").Description("Security mode to use. If not set, a reasonable security mode will be set depending on the discovered endpoints.").Default("")).
Field(service.NewStringField("securityPolicy").Description("The security policy to use. If not set, a reasonable security policy will be set depending on the discovered endpoints.").Default("")).
Expand Down Expand Up @@ -286,6 +289,11 @@ func newOPCUAInput(conf *service.ParsedConfig, mgr *service.Resources) (service.
return nil, err
}

sessionTimeout, err := conf.FieldInt("sessionTimeout")
if err != nil {
return nil, err
}

// fail if no nodeIDs are provided
if len(nodeIDs) == 0 {
return nil, errors.New("no nodeIDs provided")
Expand All @@ -303,6 +311,7 @@ func newOPCUAInput(conf *service.ParsedConfig, mgr *service.Resources) (service.
SecurityPolicy: securityPolicy,
Insecure: insecure,
SubscribeEnabled: subscribeEnabled,
SessionTimeout: sessionTimeout,
}

return service.AutoRetryNacksBatched(m), nil
Expand Down Expand Up @@ -337,6 +346,7 @@ type OPCUAInput struct {
// this is required for subscription
SubscribeEnabled bool
SubNotifyChan chan *opcua.PublishNotificationData
SessionTimeout int
}

// updateNodePaths updates the node paths to use the nodeID instead of the browseName
Expand Down Expand Up @@ -726,6 +736,12 @@ func (g *OPCUAInput) Connect(ctx context.Context) error {

// TODO: only continue if it is not something like password wrong or toomanysessions, etc.
if errors.Is(err, ua.StatusBadUserAccessDenied) || errors.Is(err, ua.StatusBadTooManySessions) {
// Adding a sleep to prevent immediate re-connect
// In the case of ua.StatusBadUserAccessDenied, the session is for some reason not properly closed on the server
// If we were to re-connect, we could overload the server with too many sessions as the default timeout is 10 seconds
timeout := time.Duration(g.SessionTimeout * int(time.Millisecond))
g.Log.Errorf("Encountered unrecoverable error. Waiting before trying to re-connect to prevent overloading the server.", err, timeout)
time.Sleep(timeout)
return err
}

Expand Down Expand Up @@ -888,6 +904,12 @@ func (g *OPCUAInput) GetOPCUAClientOptions(selectedEndpoint *ua.EndpointDescript
opts = append(opts, opcua.PrivateKey(pk), opcua.Certificate(cert.Certificate[0]))
}

opts = append(opts, opcua.SessionName("benthos-umh"))
opts = append(opts, opcua.SessionTimeout(time.Duration(g.SessionTimeout*int(time.Millisecond)))) // set the session timeout to prevent having to many connections
opts = append(opts, opcua.ApplicationName("benthos-umh"))
//opts = append(opts, opcua.ApplicationURI("urn:benthos-umh"))
//opts = append(opts, opcua.ProductURI("urn:benthos-umh"))

return opts, nil
}

Expand Down
9 changes: 5 additions & 4 deletions opcua_plugin/opcua_plc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ var _ = FDescribe("Test Against WAGO PLC", Serial, func() {
FIt("should fail to connect", func() {

input = &OPCUAInput{
Endpoint: endpoint,
Username: "123", // Incorrect username and password
Password: "123",
NodeIDs: nil,
Endpoint: endpoint,
Username: "123", // Incorrect username and password
Password: "123",
NodeIDs: nil,
SessionTimeout: 1000,
}
// Attempt to connect
err := input.Connect(ctx)
Expand Down

0 comments on commit bcbd97f

Please sign in to comment.