Skip to content

Commit

Permalink
feat: add quickstart for collector
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgacsal committed Jan 21, 2025
1 parent 2a7562e commit 4bcb499
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 1 deletion.
4 changes: 4 additions & 0 deletions collector/quickstart/collector/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
http:
enabled: true
address: 0.0.0.0:4195
debug_endpoints: false
4 changes: 4 additions & 0 deletions collector/quickstart/collector/resources/dedupe-cache.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cache_resources:
- label: dedupe_cache
memory:
default_ttl: 3600s
71 changes: 71 additions & 0 deletions collector/quickstart/collector/streams/input.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
input:
http_server:
address: 0.0.0.0:8889
path: /api/v1/events
sync_response:
status: '${! meta("http_response_status").or("204") }'

pipeline:
processors:
- switch:
- check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json"
processors:
- unarchive:
format: json_array
- check: meta("Content-Type").lowercase() == "application/cloudevents+json"
processors:
- noop: {}
- check: ""
processors:
- log:
level: ERROR
message: 'Unexpected Content-Type: ${!meta("Content-Type")}'
- mapping: |
meta http_response_status = "400"
root = {
"type": "about:blank",
"title": "Bad Request",
"status": 400,
"detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")),
}
- sync_response: {}
- mapping: "root = deleted()"
- json_schema:
schema_path: "file://./cloudevents.spec.json"
- catch:
- log:
level: ERROR
message: "Schema validation failed due to: ${!error()}"
- mapping: |
meta http_response_status = "400"
root = {
"type": "about:blank",
"title": "Bad Request",
"status": 400,
"detail":"request body has an error: %s".format(error()),
}
- sync_response: {}
- mapping: "root = deleted()"

output:
switch:
cases:
- check: ""
continue: true
output:
broker:
pattern: fan_out
outputs:
- sync_response: {}
processors:
- mapping: root = null
# https://github.com/benthosdev/benthos/discussions/2324
# https://github.com/benthosdev/benthos/issues/1946
- inproc: openmeter

- check: '"${DEBUG_INPUT:false}" == "true"'
output:
stdout:
codec: lines
31 changes: 31 additions & 0 deletions collector/quickstart/collector/streams/output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
input:
inproc: openmeter

buffer:
sqlite:
path: /var/lib/collector/buffer.sqlite

output:
retry:
max_retries: 0
backoff:
initial_interval: 500ms
max_interval: 3s
max_elapsed_time: 0s
output:
switch:
cases:
- check: ""
continue: true
output:
openmeter:
url: "${OPENMETER_URL:https://openmeter.cloud}"
token: "${OPENMETER_TOKEN:}"
batching:
count: ${BATCH_SIZE:20}
period: ${BATCH_PERIOD:}

- check: '"${DEBUG_OUTPUT:false}" == "true"'
output:
stdout:
codec: lines
64 changes: 64 additions & 0 deletions collector/quickstart/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
include:
- ../../quickstart/docker-compose.yaml

services:
collector:
image: ghcr.io/openmeterio/benthos-collector:latest
ports:
- "127.0.0.1:4195:4195"
environment:
OPENMETER_URL: http://openmeter:8888
command: [
"--log.level",
"info",
"--config",
"/etc/collector/config.yaml",
"--resources",
"/etc/collector/resources/*.yaml",
"streams",
"--no-api",
"/etc/collector/streams/*.yaml",
]
healthcheck:
test: [ "CMD", "wget", "--spider", "http://collector:4195/ready" ]
interval: 10s
timeout: 5s
retries: 30
volumes:
- type: volume
source: collector_data
target: /var/lib/collector
- type: bind
source: ./collector
target: /etc/collector
depends_on:
openmeter:
condition: service_healthy

seeder:
image: ghcr.io/openmeterio/benthos-collector:latest
environment:
OPENMETER_URL: http://collector:8889
SEEDER_LOG: true
SEEDER_COUNT: 100
command: [
"--config",
"/etc/seeder/config.yaml",
]
ports:
- "127.0.0.1:4196:4196"
healthcheck:
test: [ "CMD", "wget", "--spider", "http://seeder:4196/ready" ]
interval: 10s
timeout: 5s
retries: 30
volumes:
- type: bind
source: ./seeder
target: /etc/seeder
depends_on:
collector:
condition: service_healthy

volumes:
collector_data:
63 changes: 63 additions & 0 deletions collector/quickstart/seeder/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
http:
enabled: true
address: 0.0.0.0:4196
debug_endpoints: false

input:
generate:
count: ${SEEDER_COUNT:0}
interval: "${SEEDER_INTERVAL:50ms}"
# batch_size: 1
mapping: |
let max_subjects = ${SEEDER_MAX_SUBJECTS:10}
let event_type = "request"
let source = "api-gateway"
let methods = ["GET", "POST"]
let paths = ["/", "/about", "/contact", "/pricing", "/docs"]
let regions = ["us-east-1", "us-west-1", "us-east-2", "us-west-2"]
let zoneSuffixes = ["a", "b", "c", "d"]
let subject = "customer-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format()
let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length())
let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length())
let region = $regions.index(random_int(seed: timestamp_unix_nano()) % $regions.length())
let zone = "%s%s".format($region, $zoneSuffixes.index(random_int(seed: timestamp_unix_nano()) % $zoneSuffixes.length()))
let duration = random_int(seed: timestamp_unix_nano(), max: 1000)
root = {
"id": uuid_v4(),
"specversion": "1.0",
"type": $event_type,
"source": $source,
"subject": $subject,
"time": $time,
"data": {
"method": $method,
"path": $path,
"region": $region,
"zone": $zone,
"duration_ms": $duration,
},
}
output:
switch:
cases:
- check: ""
continue: true
output:
http_client:
url: ${OPENMETER_URL:http://127.0.0.1:8888}/api/v1/events
verb: POST
headers:
Content-Type: application/cloudevents+json
Authorization: "Bearer ${OPENMETER_TOKEN:}"
max_in_flight: 1

- check: '"${SEEDER_LOG:false}" == "true"'
output:
stdout:
codec: lines
5 changes: 5 additions & 0 deletions quickstart/config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
ingest:
kafka:
broker: kafka:9092
brokerAddressFamily: v4
socketKeepAliveEnable: true
topicMetadataRefreshInterval: 10s

aggregation:
clickhouse:
Expand All @@ -13,6 +16,8 @@ sink:
kafka:
brokers: kafka:9092
brokerAddressFamily: v4
socketKeepAliveEnable: true
topicMetadataRefreshInterval: 10s
dedupe:
enabled: true
driver: redis
Expand Down
2 changes: 1 addition & 1 deletion quickstart/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
volumes:
- ./config.yaml:/etc/openmeter/config.yaml
healthcheck:
test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/meters/api_requests_total/query"]
test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/debug/metrics"]
interval: 5s
timeout: 3s
retries: 30
Expand Down

0 comments on commit 4bcb499

Please sign in to comment.