Extensible workflow orchestration framework
The package can be installed by adding flo to your list of dependencies in mix.exs:
def deps do
[{:flo, "~> 0.2"}]
end
Then add :flo to the list of applications:
extra_applications: [:logger, :flo]
Flo exposes two behaviours that you can use to create your own triggers and components:
- Component is a behaviour for exposing common application logic in a reusable manner. Think of it as a function, such as writing to a database or publishing to Kafka, that can be used by all Flo apps.
- Trigger is a behaviour for building event consumers that trigger workflows. The Kafka subscriber is an example of a trigger.
- Triggers:
  - Invoke the workflow
  - Can fire on their own (interval, cron, etc.) or in response to external events (HTTP, queues, etc.)
- Components:
  - Define a task
  - Have a common interface which allows them to be interconnected
- Connections:
  - Define the order of invocation
  - Can be conditional
  - Allow branching and merging
Here is the implementation of a component which returns a dog picture for a given breed:
- @name and @scope are used for referencing this component in a workflow
- @inports is a list of properties which can be consumed by the component
- @outports defines the responses from this component, which can be consumed by other components
defmodule Flo.Core.Component.Dog do
alias Flo.{Port, Context, Outports}
@name "dog"
@scope "core"
@inports [
%Port{required: true, name: "breed", schema: %{"type" => "string"}}
]
@outports %Outports{
default: [
%Port{name: "url", required: true, schema: %{"type" => "string"}}
],
additional: %{
"error" => [
%Port{name: "message", required: true, schema: %{"type" => "string"}}
]
}
}
use Flo.Component
@impl true
def run(%Context.Element{inports: inports}) do
breed = inports |> Map.get("breed")
url = "https://dog.ceo/api/breed/#{breed}/images/random/1"
case HTTPoison.get(url) do
{:ok, %{status_code: 200, body: body}} ->
url = Jason.decode!(body) |> Map.get("message") |> Enum.at(0)
%Context.Outports{outcome: "default", value: %{"url" => url}}
_ ->
%Context.Outports{outcome: "error", value: %{"message" => "No dog :-("}}
end
end
end
Now this component can be used in all of your workflows
Here is the implementation of a trigger:
- @name and @scope are used for referencing this trigger in a workflow
- @configs is a list of configs which can be consumed by the trigger
- @outports defines the responses from this trigger, which can be consumed by components
- The initialize function is invoked with a callback function, start
- The start function receives the outports map and starts the workflow whenever it is called
- Flo.Trigger uses a GenServer behind the scenes; the return value of initialize becomes the return value of GenServer's init callback
defmodule Flo.Core.Trigger.AMQP do
alias Flo.{Port, Context, Outports}
@name "amqp"
@scope "core"
@configs [
%Port{
name: "queue",
required: true,
schema: %{"type" => "string"}
},
%Port{
required: true,
name: "connection_string",
schema: %{"type" => "string"}
}
]
@outports %Outports{
default: [
%Port{
required: true,
name: "payload",
schema: %{"type" => "string"}
}
]
}
use Flo.Trigger
@impl true
def initialize(%Context.Stimulus{configs: configs}, start) do
queue = configs |> Map.get("queue")
connection_string = configs |> Map.get("connection_string")
{:ok, conn} = AMQP.Connection.open(connection_string)
{:ok, chan} = AMQP.Channel.open(conn)
AMQP.Queue.subscribe(chan, queue, fn payload, _meta ->
start.(%{"payload" => payload})
end)
{:ok, %{start: start}}
end
end
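The relationship between initialize and GenServer's init can be illustrated with a small standalone sketch. This is not Flo's actual implementation: TriggerSketch, its start_link/2, and the "greeting" config key are hypothetical names chosen for illustration, and the sketch only assumes what the text above states, namely that the value returned by initialize is handed back from init.

```elixir
defmodule TriggerSketch do
  # Hypothetical stand-in for a module that does `use Flo.Trigger`.
  use GenServer

  # Plays the role of a trigger's initialize/2: it receives the configs
  # and a `start` callback, and returns a GenServer-style init tuple.
  def initialize(configs, start) do
    # Invoke the callback once, as an event handler would on each event.
    start.(%{"payload" => Map.get(configs, "greeting")})
    {:ok, %{start: start}}
  end

  def start_link(configs, start) do
    GenServer.start_link(__MODULE__, {configs, start})
  end

  @impl true
  def init({configs, start}) do
    # Whatever initialize/2 returns becomes the result of init/1.
    initialize(configs, start)
  end
end

parent = self()

# The callback here just reports back to the caller instead of
# starting a workflow.
{:ok, pid} =
  TriggerSketch.start_link(%{"greeting" => "hello"}, fn outports ->
    send(parent, {:started, outports})
  end)

# The process state is the map returned by initialize/2.
state = :sys.get_state(pid)
```

Because the return value is passed through unchanged, a trigger could presumably also return any other tuple that init accepts, such as one carrying a timeout.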
Here is a sample which implements the above workflow:
- stimuli is the list of triggers
- elements is the list of components
- connections form the flow between components
%Flo.Workflow{
name: "sample",
description: "Sample Flow",
stimuli: [
%Flo.Stimulus{
ref: "interval-trigger",
inports: %{},
scope: "core",
name: "interval",
configs: %{
"delay" => %Flo.Script{language: Flo.Script.Language.vanilla(), source: 5000}
}
}
],
elements: [
%Flo.Element{
ref: "dog-api",
name: "dog",
scope: "core",
inports: %{
"breed" => %Flo.Script{
source: "shihtzu",
language: Flo.Script.Language.vanilla(),
}
}
},
%Flo.Element{
ref: "delay",
name: "delay",
scope: "core",
inports: %{
"delay" => %Flo.Script{
source: 1500,
language: Flo.Script.Language.vanilla(),
}
}
},
%Flo.Element{
ref: "url-log",
name: "log",
scope: "core",
inports: %{
"delay" => %Flo.Script{
source: "Check the photo {{elements.dog-api.outports.url.value}}",
language: Flo.Script.Language.liquid(),
}
}
},
%Flo.Element{
ref: "error-log",
name: "log",
scope: "core",
inports: %{
"delay" => %Flo.Script{
source: "Error occurred: {{elements.dog-api.outports.error.message.value}}",
language: Flo.Script.Language.liquid(),
}
}
},
%Flo.Element{
ref: "end-log",
name: "log",
scope: "core",
inports: %{
"delay" => %Flo.Script{
source: "Done!!",
language: Flo.Script.Language.vanilla(),
}
}
}
],
connections: [
%Flo.Connection{
source: "dog-api",
outcome: "default",
destination: "delay",
},
%Flo.Connection{
source: "delay",
outcome: "default",
destination: "url-log",
},
%Flo.Connection{
source: "dog-api",
outcome: "error",
destination: "error-log",
},
%Flo.Connection{
source: "url-log",
outcome: "default",
destination: "end-log",
},
%Flo.Connection{
source: "error-log",
outcome: "default",
destination: "end-log",
}
]
}
|> Flo.WorkflowRegistry.register()
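The connections in the sample above route on the outcome of the source component, which is how branching is expressed. A minimal sketch of that lookup, using plain maps rather than Flo structs, might look like the following. RoutingSketch and next/3 are hypothetical names and are not part of Flo.

```elixir
defmodule RoutingSketch do
  # Returns the refs of all components connected to `source`
  # through the given `outcome`.
  def next(connections, source, outcome) do
    for %{source: ^source, outcome: ^outcome, destination: destination} <- connections,
        do: destination
  end
end

# The same connections as the sample workflow, as plain maps.
connections = [
  %{source: "dog-api", outcome: "default", destination: "delay"},
  %{source: "delay", outcome: "default", destination: "url-log"},
  %{source: "dog-api", outcome: "error", destination: "error-log"},
  %{source: "url-log", outcome: "default", destination: "end-log"},
  %{source: "error-log", outcome: "default", destination: "end-log"}
]

# Branching: "dog-api" leads to different components per outcome.
on_success = RoutingSketch.next(connections, "dog-api", "default")
on_error = RoutingSketch.next(connections, "dog-api", "error")
```

Both branches later merge at "end-log", which is reachable from "url-log" and from "error-log".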
A component is executed once all of its incoming connections are resolved.
A connection can be in one of three states: INITIAL, RESOLVED, or DISABLED.
If a component has multiple incoming connections, it is executed only when every connection is in either the RESOLVED or DISABLED state and at least one of them is RESOLVED.
If a connection is DISABLED, the connections downstream of it are recursively disabled until a component is reached that still has an incoming connection in the INITIAL or RESOLVED state.
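The rules above can be sketched in a few lines of standalone Elixir. This is an illustration under stated assumptions, not Flo's internal code: ConnectionStateSketch, ready?/2, and disable/3 are hypothetical names, connections are modelled as plain maps, and the three states are represented by the atoms :initial, :resolved, and :disabled.

```elixir
defmodule ConnectionStateSketch do
  # A component is ready when none of its incoming connections is still
  # :initial and at least one of them is :resolved.
  def ready?(connections, component) do
    states = incoming_states(connections, component)
    Enum.all?(states, &(&1 in [:resolved, :disabled])) and :resolved in states
  end

  # Marks one connection as :disabled. If that leaves the destination with
  # only disabled incoming connections, its outgoing connections are
  # disabled too, recursively.
  def disable(connections, source, destination) do
    connections =
      Enum.map(connections, fn
        %{source: ^source, destination: ^destination} = c -> %{c | state: :disabled}
        c -> c
      end)

    if Enum.all?(incoming_states(connections, destination), &(&1 == :disabled)) do
      connections
      |> Enum.filter(&(&1.source == destination and &1.state != :disabled))
      |> Enum.reduce(connections, fn c, acc -> disable(acc, c.source, c.destination) end)
    else
      connections
    end
  end

  defp incoming_states(connections, component) do
    for %{destination: ^component, state: state} <- connections, do: state
  end
end

# The sample workflow after "dog-api" finished with the "default" outcome:
# the connection to "delay" is resolved, the rest are still untouched.
connections = [
  %{source: "dog-api", destination: "delay", state: :resolved},
  %{source: "delay", destination: "url-log", state: :initial},
  %{source: "dog-api", destination: "error-log", state: :initial},
  %{source: "url-log", destination: "end-log", state: :initial},
  %{source: "error-log", destination: "end-log", state: :initial}
]

# The "error" branch was not taken, so it is disabled.
after_disable = ConnectionStateSketch.disable(connections, "dog-api", "error-log")
```

In this scenario the disabling spreads from "error-log" to its connection into "end-log" and stops there, because "end-log" still has an INITIAL connection from "url-log". Once that connection is RESOLVED, "end-log" has one RESOLVED and one DISABLED incoming connection and becomes ready.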
- Workflow execution
- Branching and merging
- Conditional branching
- Multiple outcomes for components
- Loops
- Visual Editor
- Sub flows
- Error handling
- Documentation
Request a new feature by creating an issue or create a pull request with new features or fixes.
Flo source code is released under the Mozilla Public License 2.0.
Check the LICENSE file for more information.