factor18/flo

Flo

Extensible workflow orchestration framework

Installation

The package can be installed by adding flo to your list of dependencies in mix.exs:

def deps do
  [{:flo, "~> 0.2"}]
end

Then add :flo to extra_applications in the application function of mix.exs:

extra_applications: [:logger, :flo]

Flo Framework

Flo exposes two behaviours which can be used to create your own triggers and components:

  • Component is a behaviour for exposing common application logic in a reusable manner. Think of it as a function (write to a database, publish to Kafka, etc.) that can be used by all Flo apps
  • Trigger is a behaviour for building event consumers that trigger workflows. A Kafka subscriber is an example of a trigger

A workflow is a combination of triggers, components, and the connections between them:

  • Triggers:
    • Invoke the workflow
    • Can fire on their own (interval, cron, etc.) or in response to external events (HTTP, queues, etc.)
  • Components:
    • Define a task
    • Share a common interface which allows them to be interconnected
  • Connections:
    • Define the order of invocation
    • Can be conditional
    • Allow branching and merging

Component

Here is the implementation of a component which returns the URL of a dog picture for a given breed.

@name and @scope are used for referencing this component in a workflow

@inports is a list of properties which can be consumed by the component

@outports defines the responses from this component, which can be consumed by other components. Each outcome ("default" or "error" here) has its own list of ports

defmodule Flo.Core.Component.Dog do
  alias Flo.{Port, Context, Outports}

  @name "dog"

  @scope "core"

  @inports [
    %Port{required: true, name: "breed", schema: %{"type" => "string"}}
  ]

  @outports %Outports{
    default: [
      %Port{name: "url", required: true, schema: %{"type" => "string"}}
    ],
    additional: %{
      "error" => [
        %Port{name: "message", required: true, schema: %{"type" => "string"}}
      ]
    }
  }

  use Flo.Component

  @impl true
  def run(%Context.Element{inports: inports}) do
    breed = inports |> Map.get("breed")

    url = "https://dog.ceo/api/breed/#{breed}/images/random/1"

    case HTTPoison.get(url) do
      {:ok, %{status_code: 200, body: body}} ->
        url = Jason.decode!(body) |> Map.get("message") |> Enum.at(0)
        %Context.Outports{outcome: "default", value: %{"url" => url}}
      _ ->
        %Context.Outports{outcome: "error", value: %{"message" => "No dog :-("}}
    end
  end
end

Now this component can be used in all of your workflows

Trigger

Here is the implementation of a trigger which starts a workflow for every message received on an AMQP queue.

@name and @scope are used for referencing this trigger in a workflow

@configs is a list of configuration properties which can be consumed by the trigger

@outports defines the values emitted by this trigger, which can be consumed by components

The initialize function is invoked with the trigger's context and a callback function, start

The start function receives the outports map and starts the workflow each time it is called

Flo.Trigger uses a GenServer behind the scenes; the return value of initialize is used as the return value of the GenServer's init callback

defmodule Flo.Core.Trigger.AMQP do
  alias Flo.{Port, Context, Outports}

  @name "amqp"

  @scope "core"

  @configs [
    %Port{
      name: "queue",
      required: true,
      schema: %{"type" => "string"}
    },
    %Port{
      required: true,
      name: "connection_string",
      schema: %{"type" => "string"}
    }
  ]

  @outports %Outports{
    default: [
      %Port{
        required: true,
        name: "payload",
        schema: %{"type" => "string"}
      }
    ]
  }

  use Flo.Trigger

  @impl true
  def initialize(%Context.Stimulus{configs: configs}, start) do
    queue = configs |> Map.get("queue")
    connection_string = configs |> Map.get("connection_string")

    {:ok, conn} = AMQP.Connection.open(connection_string)
    # Channel is not aliased in this module, so use the fully qualified name
    {:ok, chan} = AMQP.Channel.open(conn)

    AMQP.Queue.subscribe(chan, queue, fn payload, _meta ->
      start.(%{"payload" => payload})
    end)

    {:ok, %{start: start}}
  end
end
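The relationship between initialize, the start callback, and GenServer's init can be pictured with a plain GenServer. This is a minimal sketch under assumptions, not Flo's source: SketchTrigger, its start_link/2, the "tick" outport, and the interval behaviour are all hypothetical and only illustrate the idea of returning initialize's result from init.

```elixir
defmodule SketchTrigger do
  # Hypothetical stand-in for what `use Flo.Trigger` might set up.
  # Not Flo's actual implementation.
  use GenServer

  def start_link(configs, start) when is_function(start, 1) do
    GenServer.start_link(__MODULE__, {configs, start})
  end

  @impl true
  def init({configs, start}) do
    # Whatever initialize/2 returns is handed back from init/1 unchanged.
    initialize(configs, start)
  end

  # Trigger-specific setup: schedule the first tick and keep `start` in state.
  def initialize(%{"delay" => delay}, start) do
    Process.send_after(self(), :tick, delay)
    {:ok, %{delay: delay, start: start}}
  end

  @impl true
  def handle_info(:tick, %{delay: delay, start: start} = state) do
    # Calling `start` with an outports map is what would kick off a workflow run.
    start.(%{"tick" => System.monotonic_time(:millisecond)})
    Process.send_after(self(), :tick, delay)
    {:noreply, state}
  end
end
```

Because the state returned by initialize becomes the GenServer state, the trigger can hold on to start and call it from later callbacks such as handle_info.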

Workflow

Image of Workflow

Here is a sample which implements the workflow shown above.

stimuli is the list of triggers

elements is the list of components

connections define the flow between components

%Flo.Workflow{
  name: "sample",
  description: "Sample Flow",
  stimuli: [
    %Flo.Stimulus{
      ref: "interval-trigger",
      inports: %{},
      scope: "core",
      name: "interval",
      configs: %{
        "delay" => %Flo.Script{language: Flo.Script.Language.vanilla(), source: 5000}
      }
    }
  ],
  elements: [
    %Flo.Element{
      ref: "dog-api",
      name: "dog",
      scope: "core",
      inports: %{
        "breed" => %Flo.Script{
          source: "shihtzu",
          language: Flo.Script.Language.vanilla(),
        }
      }
    },
    %Flo.Element{
      ref: "delay",
      name: "delay",
      scope: "core",
      inports: %{
        "delay" => %Flo.Script{
          source: 1500,
          language: Flo.Script.Language.vanilla(),
        }
      }
    },
    %Flo.Element{
      ref: "url-log",
      name: "log",
      scope: "core",
      inports: %{
        "delay" => %Flo.Script{
          source: "Check the photo {{elements.dog-api.outports.url.value}}",
          language: Flo.Script.Language.liquid(),
        }
      }
    },
    %Flo.Element{
      ref: "error-log",
      name: "log",
      scope: "core",
      inports: %{
        "delay" => %Flo.Script{
          source: "Error occurred: {{elements.dog-api.outports.error.message.value}}",
          language: Flo.Script.Language.liquid(),
        }
      }
    },
    %Flo.Element{
      ref: "end-log",
      name: "log",
      scope: "core",
      inports: %{
        "delay" => %Flo.Script{
          source: "Done!!",
          language: Flo.Script.Language.vanilla(),
        }
      }
    }
  ],
  connections: [
    %Flo.Connection{
      source: "dog-api",
      outcome: "default",
      destination: "delay",
    },
    %Flo.Connection{
      source: "delay",
      outcome: "default",
      destination: "url-log",
    },
    %Flo.Connection{
      source: "dog-api",
      outcome: "error",
      destination: "error-log",
    },
    %Flo.Connection{
      source: "url-log",
      outcome: "default",
      destination: "end-log",
    },
    %Flo.Connection{
      source: "error-log",
      outcome: "default",
      destination: "end-log",
    }
  ]
}
|> Flo.WorkflowRegistry.register()

A component is executed once all of its incoming connections are settled.

A connection can be in one of three states: INITIAL, RESOLVED, or DISABLED.

If a component has multiple incoming connections, it is executed only when every one of them is either RESOLVED or DISABLED and at least one of them is RESOLVED.

When a connection is DISABLED, the connections downstream of it are disabled recursively. The recursion stops at the first component which still has an incoming connection in the INITIAL or RESOLVED state.
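The rules above can be sketched in a few lines of plain Elixir. This is a hypothetical model, not Flo's implementation: the ConnectionSketch module, its function names, and the map shape with a state key are assumptions made for illustration. It also assumes that when a component finishes, the connections for the outcomes it did not take become DISABLED.

```elixir
defmodule ConnectionSketch do
  # Hypothetical model of the execution rules; names and data shapes are
  # illustrative and are not taken from Flo's source.

  # Mark the connections leaving `source`: the taken outcome is resolved,
  # every other outcome is disabled, then disabling is pushed downstream.
  def complete(conns, source, outcome) do
    conns
    |> Enum.map(fn
      %{source: ^source, outcome: ^outcome} = c -> %{c | state: :resolved}
      %{source: ^source} = c -> %{c | state: :disabled}
      c -> c
    end)
    |> propagate()
  end

  # A component whose incoming connections are all disabled can never run,
  # so its outgoing connections are disabled too. Repeat until stable.
  defp propagate(conns) do
    dead =
      conns
      |> Enum.group_by(& &1.destination)
      |> Enum.filter(fn {_dest, incoming} ->
        Enum.all?(incoming, &(&1.state == :disabled))
      end)
      |> Enum.map(&elem(&1, 0))

    next =
      Enum.map(conns, fn c ->
        if c.source in dead, do: %{c | state: :disabled}, else: c
      end)

    if next == conns, do: conns, else: propagate(next)
  end

  # Ready: no incoming connection is still :initial, and at least one is :resolved.
  def ready?(conns, component) do
    incoming = Enum.filter(conns, &(&1.destination == component))

    Enum.all?(incoming, &(&1.state in [:resolved, :disabled])) and
      Enum.any?(incoming, &(&1.state == :resolved))
  end
end
```

Applied to the sample workflow, completing dog-api with the "default" outcome disables the connection into error-log and, through it, the connection from error-log to end-log. end-log then becomes ready as soon as url-log completes, because its two incoming connections are RESOLVED and DISABLED respectively.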

TODO

  • Workflow execution
  • Branching and merging
  • Conditional branching
  • Multiple outcomes for components
  • Loops
  • Visual Editor
  • Sub flows
  • Error handling
  • Documentation

Contributing

Request a new feature by creating an issue, or open a pull request with new features or fixes.

License

Flo source code is released under the Mozilla Public License 2.0. See the LICENSE file for more information.