From 7d11eee01bdacbba2e3961a60bcc429075ad708f Mon Sep 17 00:00:00 2001 From: Frank Martinez Date: Fri, 10 Aug 2018 11:03:37 -0400 Subject: [PATCH] add engine channels --- app/config.go | 2 ++ engine/channels/channels.go | 26 ++++++++++++++++++++++++++ engine/engine.go | 16 ++++++++++++++++ 3 files changed, 44 insertions(+) create mode 100644 engine/channels/channels.go diff --git a/app/config.go b/app/config.go index f923701..7548d16 100644 --- a/app/config.go +++ b/app/config.go @@ -19,7 +19,9 @@ type Config struct { Type string `json:"type"` Version string `json:"version"` Description string `json:"description"` + Properties []*data.Attribute `json:"properties"` + Channels []string `json:"channels"` Triggers []*trigger.Config `json:"triggers"` Resources []*resource.Config `json:"resources"` Actions []*action.Config `json:"actions"` diff --git a/engine/channels/channels.go b/engine/channels/channels.go new file mode 100644 index 0000000..dfe4cac --- /dev/null +++ b/engine/channels/channels.go @@ -0,0 +1,26 @@ +package channels + +var channels = make(map[string]chan interface{}) + +// Count returns the number of channels +func Count() int { + return len(channels) +} + +// Add adds an engine channel, assumes these are created before startup +func Add(name string){ + //todo add size? + channels[name] = make(chan interface{}) +} + +// Get gets the named channel +func Get(name string) chan interface{} { + return channels[name] +} + +//Close closes all the channels, assumes it is called on shutdown +func Close() { + for _, value := range channels { + close(value) + } +} \ No newline at end of file diff --git a/engine/engine.go b/engine/engine.go index 27cb162..0ffa51c 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -18,6 +18,7 @@ import ( "github.com/TIBCOSoftware/flogo-lib/util" "github.com/TIBCOSoftware/flogo-lib/util/managed" "sync" + "github.com/TIBCOSoftware/flogo-lib/engine/channels" ) var managedServices []managed.Managed @@ -110,6 +111,16 @@ func (e *engineImpl) Init(directRunner bool) error { } } + //add engine channels + channelNames := e.app.Channels + if len(channelNames) > 0 { + for _, channelName := range channelNames { + + logger.Debugf("Creating Engine Channel '%s'", channelName) + channels.Add(channelName) + } + } + err = app.RegisterResources(e.app.Resources) if err != nil { return err @@ -225,6 +236,11 @@ func (e *engineImpl) Start() error { func (e *engineImpl) Stop() error { logger.Info("Engine Stopping...") + if channels.Count() > 0 { + logger.Info("Closing Engine Channels...") + channels.Close() + } + logger.Info("Stopping Triggers...") // Stop Triggers