diff --git a/collector/quickstart/collector/config.yaml b/collector/quickstart/collector/config.yaml new file mode 100644 index 000000000..e4f79ab3b --- /dev/null +++ b/collector/quickstart/collector/config.yaml @@ -0,0 +1,4 @@ +http: + enabled: true + address: 0.0.0.0:4195 + debug_endpoints: false diff --git a/collector/quickstart/collector/resources/dedupe-cache.yaml b/collector/quickstart/collector/resources/dedupe-cache.yaml new file mode 100644 index 000000000..c4a1e505c --- /dev/null +++ b/collector/quickstart/collector/resources/dedupe-cache.yaml @@ -0,0 +1,4 @@ +cache_resources: + - label: dedupe_cache + memory: + default_ttl: 3600s diff --git a/collector/quickstart/collector/streams/input.yaml b/collector/quickstart/collector/streams/input.yaml new file mode 100644 index 000000000..c11485a19 --- /dev/null +++ b/collector/quickstart/collector/streams/input.yaml @@ -0,0 +1,71 @@ +input: + http_server: + address: 0.0.0.0:8889 + path: /api/v1/events + sync_response: + status: '${! meta("http_response_status").or("204") }' + +pipeline: + processors: + - switch: + - check: meta("Content-Type").lowercase() == "application/cloudevents-batch+json" + processors: + - unarchive: + format: json_array + - check: meta("Content-Type").lowercase() == "application/cloudevents+json" + processors: + - noop: {} + - check: "" + processors: + - log: + level: ERROR + message: 'Unexpected Content-Type: ${!meta("Content-Type")}' + - mapping: | + meta http_response_status = "400" + + root = { + "type": "about:blank", + "title": "Bad Request", + "status": 400, + "detail":"request body has an error: header Content-Type has unexpected value \"%s\"".format(meta("Content-Type")), + } + - sync_response: {} + - mapping: "root = deleted()" + - json_schema: + schema_path: "file://./cloudevents.spec.json" + - catch: + - log: + level: ERROR + message: "Schema validation failed due to: ${!error()}" + - mapping: | + meta http_response_status = "400" + + root = { + "type": "about:blank", + "title": "Bad Request", + "status": 400, + "detail":"request body has an error: %s".format(error()), + } + - sync_response: {} + - mapping: "root = deleted()" + +output: + switch: + cases: + - check: "" + continue: true + output: + broker: + pattern: fan_out + outputs: + - sync_response: {} + processors: + - mapping: root = null + # https://github.com/benthosdev/benthos/discussions/2324 + # https://github.com/benthosdev/benthos/issues/1946 + - inproc: openmeter + + - check: '"${DEBUG_INPUT:false}" == "true"' + output: + stdout: + codec: lines diff --git a/collector/quickstart/collector/streams/output.yaml b/collector/quickstart/collector/streams/output.yaml new file mode 100644 index 000000000..492436409 --- /dev/null +++ b/collector/quickstart/collector/streams/output.yaml @@ -0,0 +1,31 @@ +input: + inproc: openmeter + +buffer: + sqlite: + path: /var/lib/collector/buffer.sqlite + +output: + retry: + max_retries: 0 + backoff: + initial_interval: 500ms + max_interval: 3s + max_elapsed_time: 0s + output: + switch: + cases: + - check: "" + continue: true + output: + openmeter: + url: "${OPENMETER_URL:https://openmeter.cloud}" + token: "${OPENMETER_TOKEN:}" + batching: + count: ${BATCH_SIZE:20} + period: ${BATCH_PERIOD:} + + - check: '"${DEBUG_OUTPUT:false}" == "true"' + output: + stdout: + codec: lines diff --git a/collector/quickstart/docker-compose.yaml b/collector/quickstart/docker-compose.yaml new file mode 100644 index 000000000..8ca702445 --- /dev/null +++ b/collector/quickstart/docker-compose.yaml @@ -0,0 +1,64 @@ +include: + - ../../quickstart/docker-compose.yaml + +services: + collector: + image: ghcr.io/openmeterio/benthos-collector:latest + ports: + - "127.0.0.1:4195:4195" + environment: + OPENMETER_URL: http://openmeter:8888 + command: [ + "--log.level", + "info", + "--config", + "/etc/collector/config.yaml", + "--resources", + "/etc/collector/resources/*.yaml", + "streams", + "--no-api", + "/etc/collector/streams/*.yaml", + ] + healthcheck: + test: [ "CMD", "wget", "--spider", "http://collector:4195/ready" ] + interval: 10s + timeout: 5s + retries: 30 + volumes: + - type: volume + source: collector_data + target: /var/lib/collector + - type: bind + source: ./collector + target: /etc/collector + depends_on: + openmeter: + condition: service_healthy + + seeder: + image: ghcr.io/openmeterio/benthos-collector:latest + environment: + OPENMETER_URL: http://collector:8889 + SEEDER_LOG: true + SEEDER_COUNT: 100 + command: [ + "--config", + "/etc/seeder/config.yaml", + ] + ports: + - "127.0.0.1:4196:4196" + healthcheck: + test: [ "CMD", "wget", "--spider", "http://seeder:4196/ready" ] + interval: 10s + timeout: 5s + retries: 30 + volumes: + - type: bind + source: ./seeder + target: /etc/seeder + depends_on: + collector: + condition: service_healthy + +volumes: + collector_data: diff --git a/collector/quickstart/seeder/config.yaml b/collector/quickstart/seeder/config.yaml new file mode 100644 index 000000000..801f97d45 --- /dev/null +++ b/collector/quickstart/seeder/config.yaml @@ -0,0 +1,63 @@ +http: + enabled: true + address: 0.0.0.0:4196 + debug_endpoints: false + +input: + generate: + count: ${SEEDER_COUNT:0} + interval: "${SEEDER_INTERVAL:50ms}" + # batch_size: 1 + mapping: | + let max_subjects = ${SEEDER_MAX_SUBJECTS:10} + + let event_type = "request" + let source = "api-gateway" + let methods = ["GET", "POST"] + let paths = ["/", "/about", "/contact", "/pricing", "/docs"] + let regions = ["us-east-1", "us-west-1", "us-east-2", "us-west-2"] + let zoneSuffixes = ["a", "b", "c", "d"] + + let subject = "customer-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects) + let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format() + + let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length()) + let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length()) + let region = $regions.index(random_int(seed: timestamp_unix_nano()) % $regions.length()) + let zone = "%s%s".format($region, $zoneSuffixes.index(random_int(seed: timestamp_unix_nano()) % $zoneSuffixes.length())) + let duration = random_int(seed: timestamp_unix_nano(), max: 1000) + + root = { + "id": uuid_v4(), + "specversion": "1.0", + "type": $event_type, + "source": $source, + "subject": $subject, + "time": $time, + "data": { + "method": $method, + "path": $path, + "region": $region, + "zone": $zone, + "duration_ms": $duration, + }, + } + +output: + switch: + cases: + - check: "" + continue: true + output: + http_client: + url: ${OPENMETER_URL:http://127.0.0.1:8888}/api/v1/events + verb: POST + headers: + Content-Type: application/cloudevents+json + Authorization: "Bearer ${OPENMETER_TOKEN:}" + max_in_flight: 1 + + - check: '"${SEEDER_LOG:false}" == "true"' + output: + stdout: + codec: lines diff --git a/quickstart/config.yaml b/quickstart/config.yaml index ab2a53a00..95dbd4b01 100644 --- a/quickstart/config.yaml +++ b/quickstart/config.yaml @@ -1,6 +1,9 @@ ingest: kafka: broker: kafka:9092 + brokerAddressFamily: v4 + socketKeepAliveEnable: true + topicMetadataRefreshInterval: 10s aggregation: clickhouse: @@ -13,6 +16,8 @@ sink: kafka: brokers: kafka:9092 brokerAddressFamily: v4 + socketKeepAliveEnable: true + topicMetadataRefreshInterval: 10s dedupe: enabled: true driver: redis diff --git a/quickstart/docker-compose.yaml b/quickstart/docker-compose.yaml index df085f54d..9f3bc6714 100644 --- a/quickstart/docker-compose.yaml +++ b/quickstart/docker-compose.yaml @@ -16,7 +16,7 @@ services: volumes: - ./config.yaml:/etc/openmeter/config.yaml healthcheck: - test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/meters/api_requests_total/query"] + test: ["CMD", "wget", "--spider", "http://openmeter:8888/api/v1/debug/metrics"] interval: 5s timeout: 3s retries: 30