TelluridePipeline collaborates with TellurideSensor and TellurideUI to provide an example of a Broadway pipeline that consumes a stream of simulated IoT sensor-reading messages from a RabbitMQ queue by way of the BroadwayRabbitMQ producer, processes them in batches, computes some simple aggregate metrics over the stream, and then publishes those metrics, batch by batch, to a queue on RabbitMQ. The point of this example is not the domain, which is contrived, but the mechanics of Broadway and RabbitMQ working together.
Broadway is built on GenStage, which is in turn built on GenServer. This example leverages that hierarchy to configure, start, supervise, stop, and restart Broadway.
See Getting Started below for instructions on starting this example.
This example is built with:
- broadway library
- broadway_rabbitmq library
TelemetryBroadwayWorker is the Broadway module, and its configuration lives in its start_link/1 function. start_link/1 is a bit verbose, but the payoff in worker processes is tremendous: that single call starts and supervises every producer, processor, and batcher process in the pipeline.
For this example, we pass in behavior by way of handler(s) in the :context option. The logic for partitioning messages is defined as a fn and assigned to :handle_message within :context. Broadway provides :context to every callback.
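The sketch below shows one way start_link/1 in TelemetryBroadwayWorker could assemble the options it hands to Broadway.start_link/2. The options are built in a plain function so they can be inspected without RabbitMQ running. The option keys (:producer, :rate_limiting, :processors, :batchers, :context) are Broadway's own. The module name WorkerOptionsSketch, the string-keyed config map, and the hashing rule inside the :handle_message fn are assumptions for illustration.

```elixir
defmodule WorkerOptionsSketch do
  # Hypothetical sketch: builds the keyword list a Broadway module's
  # start_link/1 could pass to Broadway.start_link/2. Keys follow Broadway's
  # documented options; values come from a plain map of settings.

  # Behavior passed in through :context. The fn receives decoded message
  # data and returns the batcher that should handle it; hashing sensor_id
  # keeps each sensor on the same batcher.
  def handle_message_fn do
    fn %{sensor_id: sensor_id} ->
      case :erlang.phash2(sensor_id, 2) do
        0 -> :sensor_batcher_one
        1 -> :sensor_batcher_two
      end
    end
  end

  def options(config) do
    [
      name: TelemetryBroadwayWorker,
      producer: [
        module: {BroadwayRabbitMQ.Producer, queue: "sensor_readings_queue"},
        concurrency: config["producer_concurrency"],
        rate_limiting: [
          allowed_messages: config["rate_limit_allowed"],
          interval: config["rate_limit_interval"]
        ]
      ],
      processors: [default: [concurrency: config["processor_concurrency"]]],
      batchers: [
        sensor_batcher_one: [
          concurrency: config["sensor_batcher_one_concurrency"],
          batch_size: config["sensor_batcher_one_batch_size"]
        ],
        sensor_batcher_two: [
          concurrency: config["sensor_batcher_two_concurrency"],
          batch_size: config["sensor_batcher_two_batch_size"]
        ]
      ],
      context: %{handle_message: handle_message_fn()}
    ]
  end
end

# Inside handle_message/3 the fn would then be looked up from the context:
#   batcher = context.handle_message.(message.data)
#   Broadway.Message.put_batcher(message, batcher)
```

Because the fn travels in :context, a different routing rule can be swapped in without touching the callbacks themselves.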
In our example, the configuration in start_link/1 collaborates with a GenServer data container called BroadwayConfig. BroadwayConfig has client functions for setting and obtaining each of the available configuration options, which it keeps in its state; the setters accept only integer values. start_link/1 fetches the assigned value, or a sensible default, for each configuration option.
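Here is a minimal sketch of such a data container, assuming the settings live in a map held in GenServer state. BroadwayConfigSketch and fetch/2 are hypothetical names. upsert/2 and find/1 mirror the client functions shown later in this README. The guards enforce binary keys and integer values.

```elixir
defmodule BroadwayConfigSketch do
  @moduledoc """
  Hypothetical sketch of a GenServer "data container" for Broadway settings.
  Keys are binaries such as "processor_concurrency"; values must be integers.
  """
  use GenServer

  def start_link(initial \\ %{}) when is_map(initial) do
    GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  end

  # Client API: the guards reject non-binary keys and non-integer values.
  def upsert(key, value) when is_binary(key) and is_integer(value),
    do: GenServer.call(__MODULE__, {:upsert, key, value})

  # Returns the stored value, or nil if the key has never been set.
  def find(key) when is_binary(key), do: GenServer.call(__MODULE__, {:find, key})

  # What a Broadway start_link/1 could call: stored value, else the default.
  def fetch(key, default) when is_binary(key) do
    case find(key) do
      nil -> default
      value -> value
    end
  end

  @impl true
  def init(initial), do: {:ok, initial}

  @impl true
  def handle_call({:upsert, key, value}, _from, state),
    do: {:reply, :ok, Map.put(state, key, value)}

  def handle_call({:find, key}, _from, state), do: {:reply, Map.get(state, key), state}
end
```

With nothing stored, fetch("processor_concurrency", 2) returns the default 2. After upsert("processor_concurrency", 6) it returns 6.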
The BroadwayConfigConsumer listens on a RabbitMQ queue for a message that changes any of these BroadwayConfig elements. When such a message is received, the configuration elements are updated and TelemetryBroadwayWorker is then stopped with reason :normal. This causes Broadway to 1) safely drain the messages already in flight and 2) then terminate. The supervisor notices that Broadway has stopped and restarts it, which lets start_link/1 pick up the new configuration.
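The stop-and-restart mechanic can be seen in miniature without Broadway or RabbitMQ. In this sketch an Agent (ConfigStore) stands in for BroadwayConfig and a plain GenServer (ConfiguredWorker) stands in for TelemetryBroadwayWorker; all module names are hypothetical. The key point is that a module using GenServer gets a child spec with restart: :permanent by default. A supervisor restarts a :permanent child even when it exits with reason :normal.

```elixir
defmodule ConfigStore do
  # Hypothetical stand-in for BroadwayConfig: an Agent holding a map.
  use Agent

  def start_link(initial), do: Agent.start_link(fn -> initial end, name: __MODULE__)
  def put(key, value), do: Agent.update(__MODULE__, &Map.put(&1, key, value))
  def get(key), do: Agent.get(__MODULE__, &Map.get(&1, key))
end

defmodule ConfiguredWorker do
  # Hypothetical stand-in for TelemetryBroadwayWorker. Like start_link/1 in
  # the example, it reads its configuration once, when it starts.
  # `use GenServer` gives it a child spec with restart: :permanent.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  def concurrency, do: GenServer.call(__MODULE__, :concurrency)

  @impl true
  def init(:ok), do: {:ok, %{concurrency: ConfigStore.get("processor_concurrency")}}

  @impl true
  def handle_call(:concurrency, _from, state), do: {:reply, state.concurrency, state}
end

defmodule Reconfigure do
  # Update the stored settings, then stop the worker with reason :normal.
  # A :permanent child is restarted whatever its exit reason, and the
  # fresh process reads the new configuration in init/1.
  def reconfigure(changes) do
    Enum.each(changes, fn {key, value} -> ConfigStore.put(key, value) end)
    GenServer.stop(ConfiguredWorker, :normal)
  end

  # The restart happens asynchronously; poll until the supervisor has
  # registered a replacement process under the same name.
  def await_restart(old_pid, attempts \\ 50) do
    case Process.whereis(ConfiguredWorker) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ when attempts > 0 ->
        Process.sleep(10)
        await_restart(old_pid, attempts - 1)

      _ ->
        raise "worker was not restarted"
    end
  end
end

children = [{ConfigStore, %{"processor_concurrency" => 2}}, ConfiguredWorker]
{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)
```

The first worker reports a concurrency of 2. After Reconfigure.reconfigure(%{"processor_concurrency" => 6}) and Reconfigure.await_restart/1, the replacement worker reports 6.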
As with all things Elixir, messages are what cause work to happen. In this example, after startup, the messages originate from RabbitMQ.
Broadway configuration default values are obtained from config/config.exs. Once started, our example can be reconfigured by way of BroadwayConfigConsumer, which expects the received JSON payload to decode to key-value pairs like those found in config/config.exs.
Exchange | Exchange Type | Routing Key | Queue |
---|---|---|---|
sensor_events | direct | sensor.config | broadway_config_queue |
```elixir
%{
  "processor_concurrency" => 6,
  "producer_concurrency" => 2,
  "rate_limit_allowed" => 50,
  "rate_limit_interval" => 1000,
  "sensor_batcher_one_batch_size" => 6,
  "sensor_batcher_one_concurrency" => 4,
  "sensor_batcher_two_batch_size" => 6,
  "sensor_batcher_two_concurrency" => 4
}
```
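A consumer such as BroadwayConfigConsumer could run a check like the one sketched below on the decoded payload before touching BroadwayConfig. JSON decoding is left out here; any JSON library yields a map with string keys. The module name ConfigPayload is hypothetical. So is the rule that values must be positive integers, an added assumption on top of the integer-only rule described earlier.

```elixir
defmodule ConfigPayload do
  # Hypothetical: the keys accepted in a reconfiguration message,
  # mirroring the keys of the payload map above.
  @allowed ~w(
    processor_concurrency producer_concurrency
    rate_limit_allowed rate_limit_interval
    sensor_batcher_one_batch_size sensor_batcher_one_concurrency
    sensor_batcher_two_batch_size sensor_batcher_two_concurrency
  )

  # Returns {:ok, payload} when every key is known and every value is a
  # positive integer, otherwise {:error, sorted_rejected_keys}.
  def validate(payload) when is_map(payload) do
    rejected =
      for {key, value} <- payload,
          key not in @allowed or not (is_integer(value) and value > 0),
          do: key

    case rejected do
      [] -> {:ok, payload}
      keys -> {:error, Enum.sort(keys)}
    end
  end
end
```

Rejecting the whole message, rather than applying only the valid keys, avoids restarting Broadway with a half-applied configuration.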
Our Broadway producer is a RabbitMQ consumer, configured in TelemetryBroadwayWorker. In our case, we expect messages in the shape of the SensorMessage struct. The shape of the message is completely up to you, the domain, and the source of the data.
Exchange | Exchange Type | Routing Key | Queue |
---|---|---|---|
sensor_events | direct | sensor.reading | sensor_readings_queue |
```elixir
%SensorMessage{
  device_id: "line_two_device_02",
  line_id: "line_two",
  reading: 156.09505261601717,
  sensor_id: "line_two::line_two_device_02",
  timestamp: "2021-02-18T22:18:12.588910Z"
}
```
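For illustration, here is a minimal definition of the SensorMessage struct with the fields shown above. It includes a hypothetical from_map/1 that builds the struct from a decoded JSON map (string keys) without converting untrusted keys to atoms. The real module in the repository may differ.

```elixir
defmodule SensorMessage do
  # Fields mirror the example message above; the real module may differ.
  @fields [:device_id, :line_id, :reading, :sensor_id, :timestamp]
  defstruct @fields

  # Hypothetical helper: copies only the known fields out of a decoded JSON
  # map with string keys. Unknown keys are ignored, and no atoms are created
  # from untrusted input.
  def from_map(map) when is_map(map) do
    fields = for key <- @fields, into: %{}, do: {key, Map.get(map, Atom.to_string(key))}
    struct!(__MODULE__, fields)
  end
end
```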
Our simple example partitions messages by sensor_id across two batchers, each with a configurable concurrency and batch size. The partition_by option, in combination with :erlang.phash2/1, ensures that messages associated with a given sensor are always processed by the same Broadway processes.
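The sketch below is a partition function in the shape Broadway's :partition_by option expects: a function from a message to a non-negative integer. A plain map with a :data key stands in for a Broadway.Message so the example runs on its own. SensorPartition and slot/2 are hypothetical. Mapping the integer onto one of N processes with rem/2 is shown for illustration only.

```elixir
defmodule SensorPartition do
  # Shaped like a :partition_by function: takes a message whose data carries
  # a sensor_id and returns a non-negative integer.
  def partition(%{data: %{sensor_id: sensor_id}}), do: :erlang.phash2(sensor_id)

  # Illustration: map the hash onto one of `count` processes (0..count-1).
  def slot(message, count), do: rem(partition(message), count)
end

messages =
  for sensor <- ["line_one::line_one_device_01", "line_two::line_two_device_02", "line_two::line_two_device_03"],
      n <- 1..3 do
    %{data: %{sensor_id: sensor, reading: n * 1.0}}
  end

# Group the slots chosen for each sensor: each sensor's list holds one repeated value.
Enum.group_by(messages, & &1.data.sensor_id, &SensorPartition.slot(&1, 4))
```

Because :erlang.phash2/1 is deterministic, the same sensor_id always yields the same slot. That determinism is what keeps a sensor's running tally in a single process.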
Our simple domain computes a running mean, min, and max value for each sensor. It collaborates with another GenServer data container, SensorTracker, to keep this running tally and to publish the aggregates that have changed in a given period back to RabbitMQ by way of SensorAggregateProducer.
Exchange | Exchange Type | Routing Key | Queue |
---|---|---|---|
sensor_events | direct | sensor.health | sensor_health_queue |
```elixir
%SensorAggregate{
  max: 101.75975626404166,
  mean: 95.0549809719666,
  min: 89.0575969722796,
  sensor_id: "line_two::line_two_device_03",
  total_reads: 43
}
```
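A tracker like SensorTracker could maintain this tally without keeping every reading by updating the mean incrementally: mean + (reading - mean) / n. This is a sketch; RunningAggregate is a hypothetical name whose fields mirror the SensorAggregate example.

```elixir
defmodule RunningAggregate do
  # Field names mirror the SensorAggregate example above.
  defstruct [:sensor_id, max: nil, mean: 0.0, min: nil, total_reads: 0]

  def new(sensor_id), do: %__MODULE__{sensor_id: sensor_id}

  # Folds one reading into the aggregate. The mean is updated incrementally,
  # so earlier readings do not need to be stored.
  def add(%__MODULE__{total_reads: n} = agg, reading) when is_number(reading) do
    count = n + 1

    %{
      agg
      | total_reads: count,
        mean: agg.mean + (reading - agg.mean) / count,
        min: if(agg.min, do: min(agg.min, reading), else: reading),
        max: if(agg.max, do: max(agg.max, reading), else: reading)
    }
  end
end

agg =
  Enum.reduce(
    [89.0, 101.0, 95.0],
    RunningAggregate.new("line_two::line_two_device_03"),
    &RunningAggregate.add(&2, &1)
  )

# agg now has total_reads: 3, mean: 95.0, min: 89.0, max: 101.0
```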
Broadway emits telemetry events, and we attach handlers to them to track node-level min, max, and mean durations, as well as timestamps from which throughput can be calculated. This information is published to RabbitMQ by way of MetricProducer in collaboration with a GenServer data container, InstrumentationTracker.
Exchange | Exchange Type | Routing Key | Queue |
---|---|---|---|
sensor_events | direct | sensor.metric | sensor_metric_queue |
```elixir
%NodeMetric{
  call_count: 46,
  first_time: -576460750997645000,
  last_duration: 589000,
  last_time: -576460747795139000,
  max_duration: 4405000,
  mean_duration: 1381586.9565217393,
  min_duration: 376000,
  msg_count: 446,
  name: "sensor_batcher_two",
  node_type: "batcher_processor",
  partition: 1
}
```
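The sketch below shows how a tracker like InstrumentationTracker could fold telemetry measurements into a metric with the NodeMetric fields above. NodeMetricSketch and record/4 are hypothetical. In the real pipeline the values would come from handlers attached with :telemetry.attach/4 to Broadway's :stop events, which carry a :duration measurement. The exact event names and metadata depend on the Broadway version.

```elixir
defmodule NodeMetricSketch do
  # Field names follow the NodeMetric example above.
  defstruct name: nil,
            node_type: nil,
            partition: nil,
            call_count: 0,
            msg_count: 0,
            first_time: nil,
            last_time: nil,
            last_duration: nil,
            min_duration: nil,
            max_duration: nil,
            mean_duration: 0.0

  # Folds one measurement into the metric: `time` is a monotonic timestamp,
  # `duration` the call's duration, and `msgs` the number of messages the
  # call handled (e.g. the size of a batch).
  def record(%__MODULE__{call_count: n} = m, time, duration, msgs) do
    count = n + 1

    %{
      m
      | call_count: count,
        msg_count: m.msg_count + msgs,
        first_time: m.first_time || time,
        last_time: time,
        last_duration: duration,
        min_duration: if(m.min_duration, do: min(m.min_duration, duration), else: duration),
        max_duration: if(m.max_duration, do: max(m.max_duration, duration), else: duration),
        mean_duration: m.mean_duration + (duration - m.mean_duration) / count
    }
  end
end

# Hypothetical wiring, requiring the :telemetry dependency:
#   :telemetry.attach("node-metrics", event_name, fn _event, measurements, _meta, _config ->
#     ... record(metric, System.monotonic_time(), measurements.duration, 1) ...
#   end, nil)
```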
Broadway is composed of a configurable number of concurrent processes. We use the built-in telemetry to capture overall throughput through the collaboration of a GenServer data container, ThroughputTracker, and a RabbitMQ producer, ThroughputProducer.
Exchange | Exchange Type | Routing Key | Queue |
---|---|---|---|
sensor_events | direct | broadway.throughput | broadway_throughput_queue |
```elixir
%Throughput{
  earliest_raw_time: -576460751035217000,
  last_raw_time: -576460741745295000,
  total_failed_count: 0,
  total_message_count: 1056,
  total_successful_count: 1056
}
```
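Throughput follows from the Throughput fields: messages divided by the time between earliest_raw_time and last_raw_time. This sketch assumes the raw times are monotonic timestamps such as those returned by System.monotonic_time/0, which are in the VM's :native unit. They are therefore converted with System.convert_time_unit/3 before dividing. ThroughputSketch is a hypothetical name.

```elixir
defmodule ThroughputSketch do
  # Messages per second between two monotonic timestamps expressed in
  # `unit` (:native for raw System.monotonic_time/0 values).
  def per_second(message_count, earliest, latest, unit \\ :native)
      when latest > earliest do
    elapsed_us = System.convert_time_unit(latest - earliest, unit, :microsecond)
    # Guard against sub-microsecond spans rounding down to zero.
    message_count * 1_000_000 / max(elapsed_us, 1)
  end
end
```

Passing the unit explicitly matters: the :native unit can differ between runtime instances, so the same raw numbers can represent different durations on different machines.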
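Reading and writing a BroadwayConfig value:

```elixir
alias TelluridePipeline.Ets.BroadwayConfig

# Keys must be binaries, e.g. "processor_concurrency"
:ok = BroadwayConfig.upsert("processor_concurrency", 6)
value = BroadwayConfig.find("processor_concurrency")
```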
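To stop a GenServer when all you have is its PID as a string (for example, copied from a log line), convert the string to a PID first:

```elixir
# Optional safeguard: trap exits in the calling process (e.g. the IEx shell)
Process.flag(:trap_exit, true)
# :erlang.list_to_pid/1 takes a charlist such as '<0.316.0>'; it is intended for debugging
a_pid = :erlang.list_to_pid(String.to_charlist("<0.316.0>"))
GenServer.stop(a_pid, :normal)
```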
Getting Started

- Start RabbitMQ. A docker-compose.yaml that includes RabbitMQ is provided in telluride_pipeline. Start RabbitMQ by executing:

  ```shell
  cd telluride_pipeline/
  docker-compose up -d
  ```

- Start TelluridePipeline by executing:

  ```shell
  cd telluride_pipeline/
  iex -S mix
  ```

  To run the telluride_pipeline tests:

  ```shell
  mix test --only telemetry_broadway
  ```

- Start TellurideSensor by executing:

  ```shell
  cd telluride_sensor/
  iex -S mix
  ```

- Start TellurideUI by executing:

  ```shell
  cd telluride_ui/
  mix phx.server
  ```

  Then point your browser at http://localhost:4000.
If you need help with your Elixir projects, visit https://brsg.io.
This project was inspired by Marlus Saraiva's ElixirConf 2019 talk Build Efficient Data Processing Pipelines.
Copyright 2021 - Blue River Systems Group, LLC - All Rights Reserved
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.