From 7105e5583cf306498845407e06c427eb2b6807e6 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Thu, 13 Feb 2025 00:42:42 -0500 Subject: [PATCH 01/11] Add otelcol filelogreceiver --- .../otelcol/otelcol.receiver.filelog.md | 285 ++++++++++++++++ go.mod | 4 +- go.sum | 8 - internal/component/all/all.go | 1 + .../otelcol/config_consumer_retry.go | 30 -- .../otelcol/config_stanza_receivers.go | 93 ++++++ .../otelcol/receiver/filelog/filelog.go | 314 ++++++++++++++++++ .../otelcol/receiver/filelog/filelog_test.go | 162 +++++++++ .../otelcol/receiver/syslog/syslog.go | 54 +-- .../converter_filelogreceiver.go | 148 +++++++++ .../converter_syslogreceiver.go | 16 +- .../otelcolconvert/testdata/filelog.alloy | 61 ++++ .../otelcolconvert/testdata/filelog.yaml | 65 ++++ internal/util/otel_feature_gate.go | 10 + 14 files changed, 1157 insertions(+), 94 deletions(-) create mode 100644 docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md delete mode 100644 internal/component/otelcol/config_consumer_retry.go create mode 100644 internal/component/otelcol/config_stanza_receivers.go create mode 100644 internal/component/otelcol/receiver/filelog/filelog.go create mode 100644 internal/component/otelcol/receiver/filelog/filelog_test.go create mode 100644 internal/converter/internal/otelcolconvert/converter_filelogreceiver.go create mode 100644 internal/converter/internal/otelcolconvert/testdata/filelog.alloy create mode 100644 internal/converter/internal/otelcolconvert/testdata/filelog.yaml diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md b/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md new file mode 100644 index 0000000000..29a40eabbd --- /dev/null +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md @@ -0,0 +1,285 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.receiver.filelog/ +description: Learn about otelcol.receiver.filelog +title: otelcol.receiver.filelog +--- + +Public preview + +# otelcol.receiver.filelog + +{{< docs/shared lookup="stability/public_preview.md" source="alloy" version="" >}} + +`otelcol.receiver.filelog` reads log entries from files and forwards them to other `otelcol.*` components. + +{{< admonition type="note" >}} +`otelcol.receiver.filelog` is a wrapper over the upstream OpenTelemetry Collector `filelog` receiver. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +You can specify multiple `otelcol.receiver.filelog` components by giving them different labels. + +## Usage + +```alloy +otelcol.receiver.filelog "LABEL" { + include = [] + output { + logs = [...] + } +} +``` + +## Arguments + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|---------------------------------|---------------------|--------------------------------------------------------------------------------------------|-------------|----------| +| `include` | `list(string)` | A list of glob patterns to include files. | `[]` | yes | +| `exclude` | `list(string)` | A list of glob patterns to exclude files that would be included by the `include` patterns. | `[]` | no | +| `poll_interval` | `time.Duration` | The interval at which the file is polled for new entries. | `1s` | no | +| `max_concurrent_files` | `int` | The maximum number of files to read concurrently. | `10` | no | +| `max_batches` | `int` | The maximum number of batches to process concurrently. 
| `10` | no |
+| `start_at` | `string` | The position to start reading the file from. | `beginning` | no |
+| `fingerprint_size` | `units.Base2Bytes` | The size of the fingerprint used to detect file changes. | `1KiB` | no |
+| `max_log_size` | `units.Base2Bytes` | The maximum size of a log entry. | `1MiB` | no |
+| `encoding` | `string` | The encoding of the log file. | `utf-8` | no |
+| `force_flush_period` | `time.Duration` | The period after which logs are flushed even if the buffer is not full. | `500ms` | no |
+| `delete_after_read` | `bool` | Whether to delete the file after reading. | `false` | no |
+| `compression` | `string` | The compression type used for the log file. | `none` | no |
+| `acquire_fs_lock` | `bool` | Whether to acquire a file system lock while reading the file. | `false` | no |
+| `attributes` | `map(string)` | A map of attributes to add to each log entry. | `{}` | no |
+| `resource` | `map(string)` | A map of resource attributes to associate with each log entry. | `{}` | no |
+| `exclude_older_than` | `time.Duration` | Exclude files with a modification time older than the specified duration. | `0s` | no |
+| `include_file_record_number` | `bool` | Whether to include the file record number in the log entry. | `false` | no |
+| `include_file_name` | `bool` | Whether to include the file name in the log entry. | `true` | no |
+| `include_file_path` | `bool` | Whether to include the file path in the log entry. | `false` | no |
+| `include_file_name_resolved` | `bool` | Whether to include the resolved file name in the log entry. | `false` | no |
+| `include_file_path_resolved` | `bool` | Whether to include the resolved file path in the log entry. | `false` | no |
+| `include_file_owner_name` | `bool` | Whether to include the file owner's name in the log entry. | `false` | no |
+| `include_file_owner_group_name` | `bool` | Whether to include the file owner's group name in the log entry. | `false` | no |
+| `preserve_leading_whitespaces` | `bool` | Preserves leading whitespace in messages when set to `true`. | `false` | no |
+| `preserve_trailing_whitespaces` | `bool` | Preserves trailing whitespace in messages when set to `true`. | `false` | no |
+| `operators` | `list(map(string))` | A list of operators used to parse the log entries. | `[]` | no |
+
+`encoding` must be one of `utf-8`, `utf-16le`, `utf-16be`, `ascii`, `big5`, or `nop`.
+Refer to the upstream receiver [documentation][encoding-documentation] for more details.
+
+`start_at` must be one of `beginning` or `end`. The `header` block may only be used if `start_at` is `beginning`.
+
+[encoding-documentation]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/{{< param "OTEL_VERSION" >}}/receiver/filelogreceiver/README.md#supported-encodings
+
+### operators
+
+The `operators` list is a list of stanza [operators][] that transform the log entries after they have been read.
+
+For example, if you are collecting container logs, you can use the stanza `container` parser operator to add relevant attributes to the log entries.
+
+```alloy
+otelcol.receiver.filelog "default" {
+  ...
+  operators = [
+    {
+      type = "container"
+    }
+  ]
+}
+
+```
+
+[operators]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/{{< param "OTEL_VERSION" >}}/pkg/stanza/docs/operators/README.md#what-operators-are-available
+
+
+## Blocks
+
+The following blocks are supported inside the definition of
+`otelcol.receiver.filelog`:
+
+| Hierarchy | Block | Description | Required |
+|-----------------------------|-----------------------|---------------------------------------------------------------------------------------------------|----------|
+| output | [output][] | Configures where to send received telemetry data. | yes |
+| multiline | [multiline][] | Configures rules for multiline parsing of log messages. | no |
+| header | [header][] | Configures rules for parsing a log header line. | no |
+| retry_on_failure | [retry_on_failure][] | Configures the retry behavior when the receiver encounters an error downstream in the pipeline. | no |
+| debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no |
+| ordering_criteria | [ordering_criteria][] | Configures the order in which log files are processed. | no |
+| ordering_criteria > sort_by | [sort_by][] | Configures the fields to sort by within the ordering criteria. | no |
+
+The `>` symbol indicates deeper levels of nesting. For example, `ordering_criteria > sort_by`
+refers to a `sort_by` block defined inside an `ordering_criteria` block.
+
+[multiline]: #multiline-block
+[header]: #header-block
+[retry_on_failure]: #retry-on-failure-block
+[debug_metrics]: #debug_metrics-block
+[output]: #output-block
+[ordering_criteria]: #ordering-criteria-block
+[sort_by]: #sort-by-block
+
+### multiline block
+
+The `multiline` block configures logic for splitting incoming log entries.
+The following arguments are supported:
+
+| Name | Type | Description | Default | Required |
+|----------------------|----------|-----------------------------------------------------------------|---------|----------|
+| `line_start_pattern` | `string` | A regular expression that matches the beginning of a log entry. | | no |
+| `line_end_pattern` | `string` | A regular expression that matches the end of a log entry. | | no |
+| `omit_pattern` | `bool` | Omit the start/end pattern from the split log entries. | `false` | no |
+
+A `multiline` block must contain either `line_start_pattern` or `line_end_pattern`.
+
+If a `multiline` block is not set, log entries will not be split.
+
+### header block
+
+The `header` block configures logic for parsing a log header line into additional attributes added to each log entry.
+It may only be used when `start_at` is set to `beginning`.
+The following arguments are supported:
+
+| Name | Type | Description | Default | Required |
+|----------------------|---------------------|-------------------------------------------------------------|---------|----------|
+| `pattern` | `string` | A regular expression that matches the header line. | | yes |
+| `metadata_operators` | `list(map(string))` | A list of operators used to parse metadata from the header. | | yes |
+
+If a `header` block is not set, no log lines will be treated as header metadata.
+
+The `metadata_operators` list is a list of stanza [operators][] that parse metadata from the header.
+Any attributes created by the embedded operator pipeline are applied to all log entries in the file.
+
+For example, you might use a `regex_parser` operator to process a header line identified by the `pattern` expression.
+Below is an example of a fictitious header line, followed by the `header` block that parses an `environment` attribute from it.
+```
+HEADER_IDENTIFIER env="production"
+...
+
+```
+
+```alloy
+otelcol.receiver.filelog "default" {
+  ...
+  header {
+    pattern = '^HEADER_IDENTIFIER .*$'
+    metadata_operators = [
+      {
+        type = "regex_parser"
+        regex = 'env="(?P<environment>.+)"'
+      }
+    ]
+  }
+}
+
+```
+
+### ordering criteria block
+
+The `ordering_criteria` block configures the order in which discovered log files are processed.
+The following arguments are supported:
+
+| Name | Type | Description | Default | Required |
+|--------------------|------------|-----------------------------------------------------------------------------------------------------------|--------------|----------|
+| `regex` | `string` | A regular expression to capture elements of log files to use in ordering calculations. | `""` | no |
+| `top_n` | `int` | The number of top log files to track when using file ordering. | `1` | no |
+| `group_by` | `string` | A named capture group from the `regex` attribute used for grouping before sorting. | `""` | no |
+
+### sort by block
+
+The repeatable `sort_by` block configures how the fields parsed in the `ordering_criteria` block are used to sort the discovered log files.
+The following arguments are supported:
+
+| Name | Type | Description | Default | Required |
+|-------------|----------|------------------------------------------------------------------------------|---------|----------|
+| `sort_type` | `string` | The type of sorting to apply. | | yes |
+| `regex_key` | `string` | The named capture group from the `regex` attribute to use for sorting. | `""` | no |
+| `ascending` | `bool` | Whether to sort in ascending order. | `true` | no |
+| `layout` | `string` | The layout of the timestamp to be parsed from a named `regex` capture group. | `""` | no |
+| `location` | `string` | The location of the timestamp. | `UTC` | no |
+
+`sort_type` must be one of `numeric`, `lexicographic`, `timestamp`, or `mtime`.
+When using the `numeric`, `lexicographic`, or `timestamp` `sort_type`, a named capture group defined in the `ordering_criteria` block's `regex` attribute must be provided in `regex_key`.
+When using the `mtime` `sort_type`, the file's modification time is used to sort.
+
+The `location` and `layout` arguments are only applicable when `sort_type` is `timestamp`.
+
+The `location` argument specifies a Time Zone identifier. The available locations depend on the local IANA Time Zone database.
+Refer to the [list of tz database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) in Wikipedia for a non-comprehensive list.
+
+### retry on failure block
+
+The `retry_on_failure` block configures the retry behavior when the receiver encounters an error downstream in the pipeline.
+A backoff algorithm is used to delay the retry upon subsequent failures.
+The following arguments are supported:
+
+| Name | Type | Description | Default | Required |
+|--------------------|------------|-----------------------------------------------------------------------------------------------------------|---------|----------|
+| `enabled` | `bool` | If `true`, the receiver pauses reading a file and attempts to resend the current batch of logs on error. | `false` | no |
+| `initial_interval` | `duration` | The time to wait after first failure to retry.
| `1s` | no |
+| `max_interval` | `duration` | The maximum time to wait after applying backoff logic. | `30s` | no |
+| `max_elapsed_time` | `duration` | The maximum age of a message before the data is discarded. | `5m` | no |
+
+If `max_elapsed_time` is set to `0`, data is never discarded.
+
+
+### debug_metrics block
+
+{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}}
+
+### output block
+
+{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}}
+
+## Exported fields
+
+`otelcol.receiver.filelog` does not export any fields.
+
+## Component health
+
+`otelcol.receiver.filelog` is only reported as unhealthy if given an invalid
+configuration.
+
+## Debug metrics
+
+`otelcol.receiver.filelog` does not expose any component-specific debug metrics.
+
+## Example
+
+This example reads log entries using the `otelcol.receiver.filelog` receiver and
+logs them with an `otelcol.exporter.debug` component. It expects each log to start with an
+ISO 8601-compatible timestamp, which is parsed from the log using the `regex_parser` operator.
+
+```alloy
+otelcol.receiver.filelog "default" {
+  include = ["/var/log/*.log"]
+  operators = [{
+    type = "regex_parser",
+    regex = "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3,6}Z)",
+    timestamp = {
+      parse_from = "attributes.timestamp",
+      layout = "%Y-%m-%dT%H:%M:%S.%fZ",
+      location = "UTC",
+    },
+  }]
+  output {
+    logs = [otelcol.exporter.debug.default.input]
+  }
+}
+
+otelcol.exporter.debug "default" {}
+```
+
+
+
+## Compatible components
+
+`otelcol.receiver.filelog` can accept arguments from the following components:
+
+- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters)
+
+
+{{< admonition type="note" >}}
+Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
+Refer to the linked documentation for more details.
+{{< /admonition >}}
+
+
diff --git a/go.mod b/go.mod
index d072c0656d..0541d62d7a 100644
--- a/go.mod
+++ b/go.mod
@@ -874,6 +874,7 @@ require (
 
 require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.116.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver v0.116.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.116.0
 )
 
@@ -883,9 +884,6 @@ require (
 	github.com/containers/common v0.61.0 // indirect
 	github.com/deneonet/benc v1.1.2 // indirect
 	github.com/itchyny/timefmt-go v0.1.6 // indirect
-	github.com/onsi/ginkgo/v2 v2.21.0 // indirect
-	github.com/onsi/gomega v1.35.1 // indirect
-	go.etcd.io/bbolt v1.3.11 // indirect
 )
 
 // NOTE: replace directives below must always be *temporary*.
diff --git a/go.sum b/go.sum index 129499e92f..a5562f8eb4 100644 --- a/go.sum +++ b/go.sum @@ -1852,17 +1852,11 @@ github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoA github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gosnmp/gosnmp v1.37.0 h1:/Tf8D3b9wrnNuf/SfbvO+44mPrjVphBhRtcGg22V07Y= -github.com/gosnmp/gosnmp v1.37.0/go.mod h1:GDH9vNqpsD7f2HvZhKs5dlqSEcAS6s6Qp099oZRCR+M= github.com/gosnmp/gosnmp v1.38.0 h1:I5ZOMR8kb0DXAFg/88ACurnuwGwYkXWq3eLpJPHMEYc= github.com/gosnmp/gosnmp v1.38.0/go.mod h1:FE+PEZvKrFz9afP9ii1W3cprXuVZ17ypCcyyfYuu5LY= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/alloy-remote-config v0.0.9 h1:gy34SxZ8Iq/HrDTIFZi80+8BlT+FnJhKiP9mryHNEUE= github.com/grafana/alloy-remote-config v0.0.9/go.mod h1:kHE1usYo2WAVCikQkIXuoG1Clz8BSdiz3kF+DZSCQ4k= -github.com/grafana/beyla v0.0.0-20250108110233-3f1b9b55c6dc h1:oY8yQB8IG0dBo1UrLlLC2CspxbiVtSWWExMxXOnfWgk= -github.com/grafana/beyla v0.0.0-20250108110233-3f1b9b55c6dc/go.mod h1:hpk185gTeIQXjxV/so9vAxhZtSEgm8ODanWXZNVnH2M= -github.com/grafana/beyla v1.9.1-0.20250122195759-1117708def46 h1:/aw+Ze9lUluE1hNZ0fAtwhmf2CKP0VbsLFumpN8xztY= -github.com/grafana/beyla v1.9.1-0.20250122195759-1117708def46/go.mod h1:CRWu15fkScScSYBlYUtdJu2Ak8ojGvnuwHToGGkaOXY= github.com/grafana/beyla v1.10.0-alloy h1:kGyZtBSS/Br2qdhbvzu8sVYZHuE9a3OzWpbp6gN55EY= github.com/grafana/beyla v1.10.0-alloy/go.mod h1:CRWu15fkScScSYBlYUtdJu2Ak8ojGvnuwHToGGkaOXY= github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 h1:ju6EcY2aEobeBg185ETtFCKj5WzaQ48qfkbsSRRQrF4= @@ -2126,8 +2120,6 @@ github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47 github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= -github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465 h1:KwWnWVWCNtNq/ewIX7HIKnELmEx2nDP42yskD/pi7QE= -github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= github.com/ianlancetaylor/demangle v0.0.0-20240912202439-0a2b6291aafd h1:EVX1s+XNss9jkRW9K6XGJn2jL2lB1h5H804oKPsxOec= github.com/ianlancetaylor/demangle v0.0.0-20240912202439-0a2b6291aafd/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw= github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973 h1:hk4LPqXIY/c9XzRbe7dA6qQxaT6Axcbny0L/G5a4owQ= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index b8df19f4d1..e611934983 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -97,6 +97,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/transform" // Import otelcol.processor.transform _ "github.com/grafana/alloy/internal/component/otelcol/receiver/datadog" // Import otelcol.receiver.datadog _ "github.com/grafana/alloy/internal/component/otelcol/receiver/file_stats" // Import otelcol.receiver.file_stats + _ 
"github.com/grafana/alloy/internal/component/otelcol/receiver/filelog" // Import otelcol.receiver.filelog _ "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" // Import otelcol.receiver.influxdb _ "github.com/grafana/alloy/internal/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger _ "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" // Import otelcol.receiver.kafka diff --git a/internal/component/otelcol/config_consumer_retry.go b/internal/component/otelcol/config_consumer_retry.go deleted file mode 100644 index d687788606..0000000000 --- a/internal/component/otelcol/config_consumer_retry.go +++ /dev/null @@ -1,30 +0,0 @@ -package otelcol - -import ( - "time" - - "github.com/grafana/alloy/syntax" -) - -// ConsumerRetryArguments holds shared settings for stanza receivers which can retry -// requests. There is no Convert functionality as the consumerretry package is stanza internal -type ConsumerRetryArguments struct { - Enabled bool `alloy:"enabled,attr,optional"` - InitialInterval time.Duration `alloy:"initial_interval,attr,optional"` - MaxInterval time.Duration `alloy:"max_interval,attr,optional"` - MaxElapsedTime time.Duration `alloy:"max_elapsed_time,attr,optional"` -} - -var ( - _ syntax.Defaulter = (*ConsumerRetryArguments)(nil) -) - -// SetToDefault implements syntax.Defaulter. -func (args *ConsumerRetryArguments) SetToDefault() { - *args = ConsumerRetryArguments{ - Enabled: false, - InitialInterval: 1 * time.Second, - MaxInterval: 30 * time.Second, - MaxElapsedTime: 5 * time.Minute, - } -} diff --git a/internal/component/otelcol/config_stanza_receivers.go b/internal/component/otelcol/config_stanza_receivers.go new file mode 100644 index 0000000000..25da09cb7e --- /dev/null +++ b/internal/component/otelcol/config_stanza_receivers.go @@ -0,0 +1,93 @@ +package otelcol + +import ( + "errors" + "time" + + "github.com/grafana/alloy/syntax" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" + "go.opentelemetry.io/collector/confmap" +) + +var ( + _ syntax.Defaulter = (*ConsumerRetryArguments)(nil) +) + +// ConsumerRetryArguments holds shared settings for stanza receivers which can retry +// requests. There is no Convert functionality as the consumerretry package is stanza internal +type ConsumerRetryArguments struct { + Enabled bool `alloy:"enabled,attr,optional"` + InitialInterval time.Duration `alloy:"initial_interval,attr,optional"` + MaxInterval time.Duration `alloy:"max_interval,attr,optional"` + MaxElapsedTime time.Duration `alloy:"max_elapsed_time,attr,optional"` +} + +// SetToDefault implements syntax.Defaulter. 
+func (args *ConsumerRetryArguments) SetToDefault() { + *args = ConsumerRetryArguments{ + Enabled: false, + InitialInterval: 1 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + } +} + +type TrimConfig struct { + PreserveLeadingWhitespace bool `alloy:"preserve_leading_whitespaces,attr,optional"` + PreserveTrailingWhitespace bool `alloy:"preserve_trailing_whitespaces,attr,optional"` +} + +func (c *TrimConfig) Convert() *trim.Config { + if c == nil { + return nil + } + + return &trim.Config{ + PreserveLeading: c.PreserveLeadingWhitespace, + PreserveTrailing: c.PreserveTrailingWhitespace, + } +} + +type MultilineConfig struct { + LineStartPattern string `alloy:"line_start_pattern,attr,optional"` + LineEndPattern string `alloy:"line_end_pattern,attr,optional"` + OmitPattern bool `alloy:"omit_pattern,attr,optional"` +} + +func (c *MultilineConfig) Convert() *split.Config { + if c == nil { + return nil + } + + return &split.Config{ + LineStartPattern: c.LineStartPattern, + LineEndPattern: c.LineEndPattern, + OmitPattern: c.OmitPattern, + } +} + +func (c *MultilineConfig) Validate() error { + if c == nil { + return nil + } + + if c.LineStartPattern == "" && c.LineEndPattern == "" { + return errors.New("either line_start_pattern or line_end_pattern must be set") + } + + if c.LineStartPattern != "" && c.LineEndPattern != "" { + return errors.New("only one of line_start_pattern or line_end_pattern can be set") + } + + return nil +} + +type Operator map[string]interface{} + +func (o Operator) Convert() (*operator.Config, error) { + cfg := &operator.Config{} + err := cfg.Unmarshal(confmap.NewFromStringMap(o)) + return cfg, err +} diff --git a/internal/component/otelcol/receiver/filelog/filelog.go b/internal/component/otelcol/receiver/filelog/filelog.go new file mode 100644 index 0000000000..73e84f4b78 --- /dev/null +++ b/internal/component/otelcol/receiver/filelog/filelog.go @@ -0,0 +1,314 @@ +// Package filelog provides an otelcol.receiver.filelog component. 
+package filelog + +import ( + "errors" + "fmt" + "slices" + "time" + + "github.com/alecthomas/units" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/receiver" + "github.com/grafana/alloy/internal/featuregate" + "github.com/hashicorp/go-multierror" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + stanzainputfilelog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/file" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.receiver.filelog", + Stability: featuregate.StabilityPublicPreview, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := filelogreceiver.NewFactory() + return receiver.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.receiver.filelog component. +type Arguments struct { + MatchCriteria MatchCriteria `alloy:",squash"` + PollInterval time.Duration `alloy:"poll_interval,attr,optional"` + MaxConcurrentFiles int `alloy:"max_concurrent_files,attr,optional"` + MaxBatches int `alloy:"max_batches,attr,optional"` + StartAt string `alloy:"start_at,attr,optional"` + FingerprintSize units.Base2Bytes `alloy:"fingerprint_size,attr,optional"` + MaxLogSize units.Base2Bytes `alloy:"max_log_size,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + FlushPeriod time.Duration `alloy:"force_flush_period,attr,optional"` + DeleteAfterRead bool `alloy:"delete_after_read,attr,optional"` + IncludeFileRecordNumber bool `alloy:"include_file_record_number,attr,optional"` + Compression string `alloy:"compression,attr,optional"` + AcquireFSLock bool `alloy:"acquire_fs_lock,attr,optional"` + MultilineConfig *otelcol.MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *otelcol.TrimConfig `alloy:",squash"` + Header *HeaderConfig `alloy:"header,block,optional"` + Resolver Resolver `alloy:",squash"` + + Attributes map[string]string `alloy:"attributes,attr,optional"` + Resource map[string]string `alloy:"resource,attr,optional"` + + Operators []otelcol.Operator `alloy:"operators,attr,optional"` + ConsumerRetry otelcol.ConsumerRetryArguments `alloy:"retry_on_failure,block,optional"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` + + // Output configures where to send received data. Required. 
+ Output *otelcol.ConsumerArguments `alloy:"output,block"` +} + +type HeaderConfig struct { + Pattern string `alloy:"pattern,attr"` + MetadataOperators []otelcol.Operator `alloy:"metadata_operators,attr"` +} + +type Resolver struct { + IncludeFileName bool `alloy:"include_file_name,attr,optional"` + IncludeFilePath bool `alloy:"include_file_path,attr,optional"` + IncludeFileNameResolved bool `alloy:"include_file_name_resolved,attr,optional"` + IncludeFilePathResolved bool `alloy:"include_file_path_resolved,attr,optional"` + IncludeFileOwnerName bool `alloy:"include_file_owner_name,attr,optional"` + IncludeFileOwnerGroupName bool `alloy:"include_file_owner_group_name,attr,optional"` +} + +type MatchCriteria struct { + Include []string `alloy:"include,attr"` + Exclude []string `alloy:"exclude,attr,optional"` + + ExcludeOlderThan time.Duration `alloy:"exclude_older_than,attr,optional"` + OrderingCriteria *OrderingCriteria `alloy:"ordering_criteria,block,optional"` +} + +type OrderingCriteria struct { + Regex string `alloy:"regex,attr,optional"` + TopN int `alloy:"top_n,attr,optional"` + SortBy []Sort `alloy:"sort_by,block"` + GroupBy string `alloy:"group_by,attr,optional"` +} + +type Sort struct { + SortType string `alloy:"sort_type,attr,optional"` + RegexKey string `alloy:"regex_key,attr,optional"` + Ascending bool `alloy:"ascending,attr,optional"` + + // Timestamp only + Layout string `alloy:"layout,attr,optional"` + Location string `alloy:"location,attr,optional"` +} + +var _ receiver.Arguments = Arguments{} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = Arguments{ + Output: &otelcol.ConsumerArguments{}, + StartAt: "end", + FlushPeriod: 500 * time.Millisecond, + Encoding: "utf-8", + Resolver: Resolver{ + IncludeFileName: true, + }, + PollInterval: 200 * time.Millisecond, + FingerprintSize: units.KiB, + MaxLogSize: units.MiB, + MaxConcurrentFiles: 1024, + } + args.DebugMetrics.SetToDefault() + args.ConsumerRetry.SetToDefault() +} + +// Convert implements receiver.Arguments. 
+func (args Arguments) Convert() (otelcomponent.Config, error) { + c := stanzainputfilelog.NewConfig() + + def := filelogreceiver.ReceiverType{}.CreateDefaultConfig() + cfg := def.(*filelogreceiver.FileLogConfig) + cfg.InputConfig = *c + + // consumerretry package is stanza internal so we can't just Convert + cfg.RetryOnFailure.Enabled = args.ConsumerRetry.Enabled + cfg.RetryOnFailure.InitialInterval = args.ConsumerRetry.InitialInterval + cfg.RetryOnFailure.MaxInterval = args.ConsumerRetry.MaxInterval + cfg.RetryOnFailure.MaxElapsedTime = args.ConsumerRetry.MaxElapsedTime + + for _, op := range args.Operators { + converted, err := op.Convert() + if err != nil { + return nil, err + } + cfg.Operators = append(cfg.Operators, *converted) + } + + cfg.InputConfig.PollInterval = args.PollInterval + cfg.InputConfig.MaxConcurrentFiles = args.MaxConcurrentFiles + cfg.InputConfig.MaxBatches = args.MaxBatches + cfg.InputConfig.StartAt = args.StartAt + cfg.InputConfig.FingerprintSize = helper.ByteSize(args.FingerprintSize) + cfg.InputConfig.MaxLogSize = helper.ByteSize(args.MaxLogSize) + cfg.InputConfig.Encoding = args.Encoding + cfg.InputConfig.FlushPeriod = args.FlushPeriod + cfg.InputConfig.DeleteAfterRead = args.DeleteAfterRead + cfg.InputConfig.IncludeFileRecordNumber = args.IncludeFileRecordNumber + cfg.InputConfig.Compression = args.Compression + cfg.InputConfig.AcquireFSLock = args.AcquireFSLock + + if len(args.Attributes) > 0 { + cfg.InputConfig.Attributes = make(map[string]helper.ExprStringConfig, len(args.Attributes)) + for k, v := range args.Attributes { + cfg.InputConfig.Attributes[k] = helper.ExprStringConfig(v) + } + } + + if len(args.Resource) > 0 { + cfg.InputConfig.Resource = make(map[string]helper.ExprStringConfig, len(args.Resource)) + + for k, v := range args.Resource { + cfg.InputConfig.Resource[k] = helper.ExprStringConfig(v) + } + } + + if split := args.MultilineConfig.Convert(); split != nil { + cfg.InputConfig.SplitConfig = *split + } + if trim := args.TrimConfig.Convert(); trim != nil { + cfg.InputConfig.TrimConfig = *trim + } + + if args.Header != nil { + cfg.InputConfig.Header = &fileconsumer.HeaderConfig{ + Pattern: args.Header.Pattern, + } + for _, op := range args.Header.MetadataOperators { + converted, err := op.Convert() + if err != nil { + return nil, err + } + cfg.InputConfig.Header.MetadataOperators = append(cfg.InputConfig.Header.MetadataOperators, *converted) + } + } + + cfg.InputConfig.Resolver = attrs.Resolver(args.Resolver) + + cfg.InputConfig.Criteria.Include = args.MatchCriteria.Include + cfg.InputConfig.Criteria.Exclude = args.MatchCriteria.Exclude + cfg.InputConfig.Criteria.ExcludeOlderThan = args.MatchCriteria.ExcludeOlderThan + if args.MatchCriteria.OrderingCriteria == nil { + cfg.InputConfig.Criteria.OrderingCriteria.Regex = args.MatchCriteria.OrderingCriteria.Regex + cfg.InputConfig.Criteria.OrderingCriteria.TopN = args.MatchCriteria.OrderingCriteria.TopN + cfg.InputConfig.Criteria.OrderingCriteria.GroupBy = args.MatchCriteria.OrderingCriteria.GroupBy + + for _, s := range args.MatchCriteria.OrderingCriteria.SortBy { + cfg.InputConfig.Criteria.OrderingCriteria.SortBy = append(cfg.InputConfig.Criteria.OrderingCriteria.SortBy, matcher.Sort{ + SortType: s.SortType, + RegexKey: s.RegexKey, + Ascending: s.Ascending, + Layout: s.Layout, + Location: s.Location, + }) + } + } + + return cfg, nil +} + +// Extensions implements receiver.Arguments. 
+func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements receiver.Arguments. +func (args Arguments) Exporters() map[pipeline.Signal]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements receiver.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + var errs error + // Convert will validate operator(s) + _, err := args.Convert() + if err != nil { + errs = multierror.Append(errs, err) + } + + if args.MaxConcurrentFiles < 1 { + errs = multierror.Append(errs, errors.New("'max_concurrent_files' must be positive")) + } + + if args.MaxBatches < 0 { + errs = multierror.Append(errs, errors.New("'max_batches' must not be negative")) + } + + if args.StartAt == "end" && args.DeleteAfterRead { + errs = multierror.Append(errs, errors.New("'delete_after_read' cannot be used with 'start_at = end'")) + } + + _, err = decode.LookupEncoding(args.Encoding) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("invalid encoding: %w", err)) + } + + if args.MatchCriteria.OrderingCriteria != nil { + if args.MatchCriteria.OrderingCriteria.TopN < 0 { + errs = multierror.Append(errs, errors.New("'top_n' must not be negative")) + } + + for _, s := range args.MatchCriteria.OrderingCriteria.SortBy { + if !slices.Contains([]string{"timestamp", "numeric", "lexicographic", "mtime"}, s.SortType) { + errs = multierror.Append(errs, fmt.Errorf("invalid sort type: %s", s.SortType)) + } + } + } + + if args.Compression != "" && args.Compression != "gzip" { + errs = multierror.Append(errs, fmt.Errorf("invalid compression type: %s", args.Compression)) + } + + if args.PollInterval < 0 { + errs = multierror.Append(errs, errors.New("'poll_interval' must not be negative")) + } + + if args.FingerprintSize < 0 { + errs = multierror.Append(errs, errors.New("'fingerprint_size' must not be negative")) + } + + if args.MaxLogSize < 0 { + errs = multierror.Append(errs, errors.New("'max_log_size' must not be negative")) + } + + if args.FlushPeriod < 0 { + errs = multierror.Append(errs, errors.New("'force_flush_period' must not be negative")) + } + + if args.MultilineConfig != nil { + if err := args.MultilineConfig.Validate(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("invalid multiline: %w", err)) + } + } + + return errs +} + +// DebugMetricsConfig implements receiver.Arguments. 
+func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/receiver/filelog/filelog_test.go b/internal/component/otelcol/receiver/filelog/filelog_test.go new file mode 100644 index 0000000000..5821240e62 --- /dev/null +++ b/internal/component/otelcol/receiver/filelog/filelog_test.go @@ -0,0 +1,162 @@ +package filelog_test + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer" + "github.com/grafana/alloy/internal/component/otelcol/receiver/filelog" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/alloy/syntax" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" +) + +// Test performs a basic integration test which runs the otelcol.receiver.filelog +// component and ensures that it can receive and forward data. +func Test(t *testing.T) { + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + f, err := os.CreateTemp(t.TempDir(), "example") + require.NoError(t, err) + defer f.Close() + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.filelog") + require.NoError(t, err) + + cfg := fmt.Sprintf(` + include = ["%s"] + + output { + // no-op: will be overridden by test code. + } + `, f.Name()) + + require.NoError(t, err) + + var args filelog.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + + // Override our settings so logs get forwarded to logsCh. + logCh := make(chan plog.Logs) + args.Output = makeLogsOutput(logCh) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(3*time.Second)) + + // TODO(@dehaansa) - test if this is removeable after https://github.com/grafana/alloy/pull/2262 + time.Sleep(1 * time.Second) + + // Add a log message to the file + f.WriteString(fmt.Sprintf("%s INFO test\n", time.Now().Format(time.RFC3339))) + + // Wait for our client to get a span. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for logs") + case log := <-logCh: + require.Equal(t, 1, log.LogRecordCount()) + } +} + +// makeLogsOutput returns ConsumerArguments which will forward logs to the +// provided channel. 
+func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments { + logsConsumer := fakeconsumer.Consumer{ + ConsumeLogsFunc: func(ctx context.Context, l plog.Logs) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- l: + return nil + } + }, + } + + return &otelcol.ConsumerArguments{ + Logs: []otelcol.Consumer{&logsConsumer}, + } +} + +func TestUnmarshal(t *testing.T) { + alloyCfg := ` + include = ["/var/log/*.log"] + exclude = ["/var/log/excluded.log"] + exclude_older_than = "24h0m0s" + ordering_criteria { + regex = "^(?P\\d{8})_(?P\\d+)_" + top_n = 12 + group_by = "severity" + + sort_by { + sort_type = "timestamp" + regex_key = "timestamp" + ascending = true + layout = "%Y%m%d" + location = "UTC" + } + + sort_by { + sort_type = "numeric" + regex_key = "severity" + ascending = true + } + } + poll_interval = "10s" + max_concurrent_files = 10 + max_batches = 100 + start_at = "beginning" + fingerprint_size = "10KiB" + max_log_size = "10MiB" + encoding = "utf-16" + force_flush_period = "5s" + delete_after_read = true + include_file_record_number = true + compression = "gzip" + acquire_fs_lock = true + + header { + pattern = "^HEADER .*$" + metadata_operators = [] + } + + multiline { + line_start_pattern = "\\d{4}-\\d{2}-\\d{2}" + omit_pattern = true + } + preserve_leading_whitespaces = true + preserve_trailing_whitespaces = true + include_file_name = true + include_file_path = true + include_file_name_resolved = true + include_file_path_resolved = true + include_file_owner_name = true + include_file_owner_group_name = true + attributes = {} + resource = {} + operators = [{ + type = "regex_parser", + regex = "^(?P[^ ]+)", + timestamp = { + parse_from = "attributes.timestamp", + layout = "%Y-%m-%dT%H:%M:%S.%fZ", + location = "UTC", + }, + }] + + output {} + ` + var args filelog.Arguments + err := syntax.Unmarshal([]byte(alloyCfg), &args) + require.NoError(t, err) +} diff --git a/internal/component/otelcol/receiver/syslog/syslog.go b/internal/component/otelcol/receiver/syslog/syslog.go index 842228be86..9d0d99cb74 100644 --- a/internal/component/otelcol/receiver/syslog/syslog.go +++ b/internal/component/otelcol/receiver/syslog/syslog.go @@ -20,8 +20,6 @@ import ( stanzainputtcp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" stanzainputudp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" stanzaparsersyslog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" otelcomponent "go.opentelemetry.io/collector/component" otelextension "go.opentelemetry.io/collector/extension" @@ -97,52 +95,18 @@ type TCP struct { AddAttributes bool `alloy:"add_attributes,attr,optional"` OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` Encoding string `alloy:"encoding,attr,optional"` - MultilineConfig *MultilineConfig `alloy:"multiline,block,optional"` - TrimConfig *TrimConfig `alloy:",squash"` + MultilineConfig *otelcol.MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *otelcol.TrimConfig `alloy:",squash"` } type UDP struct { - ListenAddress string `alloy:"listen_address,attr,optional"` - OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` - AddAttributes bool `alloy:"add_attributes,attr,optional"` 
- Encoding string `alloy:"encoding,attr,optional"` - MultilineConfig *MultilineConfig `alloy:"multiline,block,optional"` - TrimConfig *TrimConfig `alloy:",squash"` - Async *AsyncConfig `alloy:"async,block,optional"` -} - -type TrimConfig struct { - PreserveLeadingWhitespace bool `alloy:"preserve_leading_whitespaces,attr,optional"` - PreserveTrailingWhitespace bool `alloy:"preserve_trailing_whitespaces,attr,optional"` -} - -func (c *TrimConfig) Convert() *trim.Config { - if c == nil { - return nil - } - - return &trim.Config{ - PreserveLeading: c.PreserveLeadingWhitespace, - PreserveTrailing: c.PreserveTrailingWhitespace, - } -} - -type MultilineConfig struct { - LineStartPattern string `alloy:"line_start_pattern,attr,optional"` - LineEndPattern string `alloy:"line_end_pattern,attr,optional"` - OmitPattern bool `alloy:"omit_pattern,attr,optional"` -} - -func (c *MultilineConfig) Convert() *split.Config { - if c == nil { - return nil - } - - return &split.Config{ - LineStartPattern: c.LineStartPattern, - LineEndPattern: c.LineEndPattern, - OmitPattern: c.OmitPattern, - } + ListenAddress string `alloy:"listen_address,attr,optional"` + OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` + AddAttributes bool `alloy:"add_attributes,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + MultilineConfig *otelcol.MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *otelcol.TrimConfig `alloy:",squash"` + Async *AsyncConfig `alloy:"async,block,optional"` } type AsyncConfig struct { diff --git a/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go b/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go new file mode 100644 index 0000000000..73798c8eb8 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go @@ -0,0 +1,148 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/alecthomas/units" + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/receiver/filelog" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" +) + +func init() { + converters = append(converters, filelogReceiverConverter{}) +} + +type filelogReceiverConverter struct{} + +func (filelogReceiverConverter) Factory() component.Factory { + return filelogreceiver.NewFactory() +} + +func (filelogReceiverConverter) InputComponentName() string { + return "otelcol.receiver.filelog" +} + +func (filelogReceiverConverter) ConvertAndAppend(state *State, id componentstatus.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toOtelcolReceiverfilelog(cfg.(*filelogreceiver.FileLogConfig)) + + // TODO(@dehaansa) - find a way to convert the operators + if len(cfg.(*filelogreceiver.FileLogConfig).Operators) > 0 { + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Operators cannot currently be translated for %s", StringifyInstanceID(id)), + ) + } + + // TODO(@dehaansa) - find a way to convert the metadata operators + if cfg.(*filelogreceiver.FileLogConfig).InputConfig.Header != nil { + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Header cannot 
currently be translated for %s", StringifyInstanceID(id)), + ) + } + + block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "filelog"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toOtelcolReceiverfilelog(cfg *filelogreceiver.FileLogConfig) *filelog.Arguments { + args := &filelog.Arguments{ + MatchCriteria: *toOtelcolMatchCriteria(cfg.InputConfig.Criteria), + PollInterval: cfg.InputConfig.PollInterval, + MaxConcurrentFiles: cfg.InputConfig.MaxConcurrentFiles, + MaxBatches: cfg.InputConfig.MaxBatches, + StartAt: cfg.InputConfig.StartAt, + FingerprintSize: units.Base2Bytes(cfg.InputConfig.FingerprintSize), + MaxLogSize: units.Base2Bytes(cfg.InputConfig.MaxLogSize), + Encoding: cfg.InputConfig.Encoding, + FlushPeriod: cfg.InputConfig.FlushPeriod, + DeleteAfterRead: cfg.InputConfig.DeleteAfterRead, + IncludeFileRecordNumber: cfg.InputConfig.IncludeFileRecordNumber, + Compression: cfg.InputConfig.Compression, + AcquireFSLock: cfg.InputConfig.AcquireFSLock, + MultilineConfig: toOtelcolMultilineConfig(cfg.InputConfig.SplitConfig), + TrimConfig: toOtelcolTrimConfig(cfg.InputConfig.TrimConfig), + Resolver: filelog.Resolver(cfg.InputConfig.Resolver), + DebugMetrics: common.DefaultValue[filelog.Arguments]().DebugMetrics, + } + + if len(cfg.InputConfig.Attributes) > 0 { + args.Attributes = make(map[string]string, len(cfg.InputConfig.Attributes)) + for k, v := range cfg.InputConfig.Attributes { + args.Attributes[k] = string(v) + } + } + + if len(cfg.InputConfig.Resource) > 0 { + args.Resource = make(map[string]string, len(cfg.InputConfig.Resource)) + for k, v := range cfg.InputConfig.Resource { + args.Resource[k] = string(v) + } + } + + if cfg.InputConfig.Header != nil { + args.Header = &filelog.HeaderConfig{ + Pattern: cfg.InputConfig.Header.Pattern, + } + } + + // This isn't done in a function because the type is not exported + args.ConsumerRetry = otelcol.ConsumerRetryArguments{ + Enabled: cfg.RetryOnFailure.Enabled, + InitialInterval: cfg.RetryOnFailure.InitialInterval, + MaxInterval: cfg.RetryOnFailure.MaxInterval, + MaxElapsedTime: cfg.RetryOnFailure.MaxElapsedTime, + } + + return args +} + +func toOtelcolMatchCriteria(cfg matcher.Criteria) *filelog.MatchCriteria { + return &filelog.MatchCriteria{ + Include: cfg.Include, + Exclude: cfg.Exclude, + ExcludeOlderThan: cfg.ExcludeOlderThan, + OrderingCriteria: *toOtelcolOrderingCriteria(cfg.OrderingCriteria), + } +} + +func toOtelcolOrderingCriteria(cfg matcher.OrderingCriteria) *filelog.OrderingCriteria { + return &filelog.OrderingCriteria{ + Regex: cfg.Regex, + TopN: cfg.TopN, + SortBy: toOtelcolSortBy(cfg.SortBy), + GroupBy: cfg.GroupBy, + } +} + +func toOtelcolSortBy(cfg []matcher.Sort) []filelog.Sort { + var sorts []filelog.Sort + for _, s := range cfg { + sorts = append(sorts, + filelog.Sort{ + SortType: s.SortType, + RegexKey: s.RegexKey, + Ascending: s.Ascending, + Layout: s.Layout, + Location: s.Location, + }) + } + return sorts +} diff --git a/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go b/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go index 014abc0395..d3420022f0 100644 --- a/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go +++ b/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go @@ -71,8 +71,8 @@ func toOtelcolReceiversyslog(cfg *syslogreceiver.SysLogConfig) 
*syslog.Arguments AddAttributes: cfg.InputConfig.TCP.AddAttributes, OneLogPerPacket: cfg.InputConfig.TCP.OneLogPerPacket, Encoding: cfg.InputConfig.TCP.Encoding, - MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.TCP.SplitConfig), - TrimConfig: toSyslogTrimConfig(cfg.InputConfig.TCP.TrimConfig), + MultilineConfig: toOtelcolMultilineConfig(cfg.InputConfig.TCP.SplitConfig), + TrimConfig: toOtelcolTrimConfig(cfg.InputConfig.TCP.TrimConfig), } } @@ -82,8 +82,8 @@ func toOtelcolReceiversyslog(cfg *syslogreceiver.SysLogConfig) *syslog.Arguments OneLogPerPacket: cfg.InputConfig.UDP.OneLogPerPacket, AddAttributes: cfg.InputConfig.UDP.AddAttributes, Encoding: cfg.InputConfig.UDP.Encoding, - MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.UDP.SplitConfig), - TrimConfig: toSyslogTrimConfig(cfg.InputConfig.UDP.TrimConfig), + MultilineConfig: toOtelcolMultilineConfig(cfg.InputConfig.UDP.SplitConfig), + TrimConfig: toOtelcolTrimConfig(cfg.InputConfig.UDP.TrimConfig), Async: toSyslogAsyncConfig(cfg.InputConfig.UDP.AsyncConfig), } } @@ -100,16 +100,16 @@ func toOtelcolReceiversyslog(cfg *syslogreceiver.SysLogConfig) *syslog.Arguments } -func toSyslogMultilineConfig(cfg split.Config) *syslog.MultilineConfig { - return &syslog.MultilineConfig{ +func toOtelcolMultilineConfig(cfg split.Config) *otelcol.MultilineConfig { + return &otelcol.MultilineConfig{ LineStartPattern: cfg.LineStartPattern, LineEndPattern: cfg.LineEndPattern, OmitPattern: cfg.OmitPattern, } } -func toSyslogTrimConfig(cfg trim.Config) *syslog.TrimConfig { - return &syslog.TrimConfig{ +func toOtelcolTrimConfig(cfg trim.Config) *otelcol.TrimConfig { + return &otelcol.TrimConfig{ PreserveLeadingWhitespace: cfg.PreserveLeading, PreserveTrailingWhitespace: cfg.PreserveTrailing, } diff --git a/internal/converter/internal/otelcolconvert/testdata/filelog.alloy b/internal/converter/internal/otelcolconvert/testdata/filelog.alloy new file mode 100644 index 0000000000..88287a31c5 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/filelog.alloy @@ -0,0 +1,61 @@ +otelcol.receiver.filelog "default" { + include = ["/var/log/*.log"] + exclude = ["/var/log/excluded.log"] + exclude_older_than = "24h0m0s" + ordering_criteria { + regex = "^(?P\\d{8})_(?P\\d+)_" + top_n = 12 + group_by = "severity" + + sort_by { + sort_type = "timestamp" + regex_key = "timestamp" + ascending = true + layout = "%Y%m%d" + location = "UTC" + } + sort_by { + sort_type = "severity" + regex_key = "numeric" + } + } + poll_interval = "10s" + max_concurrent_files = 10 + max_batches = 100 + start_at = "beginning" + fingerprint_size = "10KiB" + max_log_size = "10MiB" + encoding = "utf-16" + force_flush_period = "5s" + delete_after_read = true + include_file_record_number = true + compression = "gzip" + acquire_fs_lock = true + + header { + pattern = "^HEADER .*$" + metadata_operators = [] + } + + multiline { + line_start_pattern = "\\d{4}-\\d{2}-\\d{2}" + omit_pattern = true + } + preserve_leading_whitespaces = true + preserve_trailing_whitespaces = true + include_file_name = true + include_file_path = true + include_file_name_resolved = true + include_file_path_resolved = true + include_file_owner_name = true + include_file_owner_group_name = true + attributes = {} + resource = {} + operators = [] +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/filelog.yaml b/internal/converter/internal/otelcolconvert/testdata/filelog.yaml new file mode 100644 index 
0000000000..46409d6716 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/filelog.yaml @@ -0,0 +1,65 @@ +receivers: + filelog: + include: + - /var/log/*.log + exclude: + - /var/log/excluded.log + start_at: beginning + operators: + - type: regex_parser + regex: '^(?P[^ ]+) (?P[^ ]+) (?P.*)$' + timestamp: + parse_from: attributes.timestamp + layout: '%Y-%m-%dT%H:%M:%S.%fZ' + header: + pattern: '^HEADER .*$' + metadata_operators: + - type: regex_parser + regex: 'env="(?P[^ ]+)"' + poll_interval: 10s + max_concurrent_files: 10 + max_batches: 100 + fingerprint_size: 10KiB + max_log_size: 10MiB + encoding: "utf-16" + force_flush_period: 5s + delete_after_read: true + include_file_record_number: true + include_file_name: true + include_file_path: true + include_file_name_resolved: true + include_file_path_resolved: true + include_file_owner_name: true + include_file_owner_group_name: true + compression: "gzip" + acquire_fs_lock: true + multiline: + line_start_pattern: "\\d{4}-\\d{2}-\\d{2}" + omit_pattern: true + preserve_leading_whitespaces: true + preserve_trailing_whitespaces: true + exclude_older_than: 24h + ordering_criteria: + regex: '^(?P\d{8})_(?P\d+)_' + top_n: 12 + group_by: 'severity' + sort_by: + - regex_key: 'timestamp' + sort_type: 'timestamp' + ascending: true + layout: '%Y%m%d' + location: 'UTC' + - regex_key: 'severity' + sort_type: 'numeric' + ascending: true + +exporters: + otlp: + endpoint: database:4317 + +service: + pipelines: + logs: + receivers: [filelog] + processors: [] + exporters: [otlp] diff --git a/internal/util/otel_feature_gate.go b/internal/util/otel_feature_gate.go index ec3efaf96d..73f70b37ee 100644 --- a/internal/util/otel_feature_gate.go +++ b/internal/util/otel_feature_gate.go @@ -23,6 +23,16 @@ var ( name: "k8sattr.fieldExtractConfigRegex.disallow", enabled: false, }, + { + // This feature gate allows users of the otel filelogreceiver to use the `delete_after_read` setting. + name: "filelog.allowFileDeletion", + enabled: true, + }, + { + // This feature gate allows users of the otel filelogreceiver to use the `header` setting. + name: "filelog.allowHeaderMetadataParsing", + enabled: true, + }, } ) From 82f6960b7b6df3b03c10cb5e5b8787d0958c843a Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Thu, 13 Feb 2025 00:50:07 -0500 Subject: [PATCH 02/11] Commit changelog and fix converter --- CHANGELOG.md | 2 ++ .../internal/otelcolconvert/converter_filelogreceiver.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ca05a37768..83fbbd67ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Main (unreleased) ### Features +- - Add `otelcol.receiver.filelog` component to read otel log entries from files (@dehaansa) + - (_Experimental_) Add a `stage.windowsevent` block in the `loki.process` component. This aims to replace the existing `stage.eventlogmessage`. (@wildum) - Add `pyroscope.relabel` component to modify or filter profiles using Prometheus relabeling rules. 
(@marcsanmi) diff --git a/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go b/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go index 73798c8eb8..ff522e156c 100644 --- a/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go +++ b/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go @@ -119,7 +119,7 @@ func toOtelcolMatchCriteria(cfg matcher.Criteria) *filelog.MatchCriteria { Include: cfg.Include, Exclude: cfg.Exclude, ExcludeOlderThan: cfg.ExcludeOlderThan, - OrderingCriteria: *toOtelcolOrderingCriteria(cfg.OrderingCriteria), + OrderingCriteria: toOtelcolOrderingCriteria(cfg.OrderingCriteria), } } From 6433a22e98fa1ccdc14a8e7030a21f88fcf758ee Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Thu, 13 Feb 2025 01:12:30 -0500 Subject: [PATCH 03/11] generate docs --- docs/sources/reference/compatibility/_index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index 3a22b11990..588946f1dd 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -362,6 +362,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.processor.transform](../components/otelcol/otelcol.processor.transform) - [otelcol.receiver.datadog](../components/otelcol/otelcol.receiver.datadog) - [otelcol.receiver.file_stats](../components/otelcol/otelcol.receiver.file_stats) +- [otelcol.receiver.filelog](../components/otelcol/otelcol.receiver.filelog) - [otelcol.receiver.influxdb](../components/otelcol/otelcol.receiver.influxdb) - [otelcol.receiver.jaeger](../components/otelcol/otelcol.receiver.jaeger) - [otelcol.receiver.kafka](../components/otelcol/otelcol.receiver.kafka) From 020b225f1abd14ef8f3bb88fe79e484fc7152b07 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Thu, 13 Feb 2025 01:28:45 -0500 Subject: [PATCH 04/11] fix feature gate test --- internal/util/otel_feature_gate.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/internal/util/otel_feature_gate.go b/internal/util/otel_feature_gate.go index 73f70b37ee..398ee60de0 100644 --- a/internal/util/otel_feature_gate.go +++ b/internal/util/otel_feature_gate.go @@ -7,6 +7,8 @@ import ( // Registers the "k8sattr.fieldExtractConfigRegex.disallow" feature gate. _ "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor" + // Registers the "filelog.allowFileDeletion" feature gate. + _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" ) type gateDetails struct { @@ -28,11 +30,6 @@ var ( name: "filelog.allowFileDeletion", enabled: true, }, - { - // This feature gate allows users of the otel filelogreceiver to use the `header` setting. 
- name: "filelog.allowHeaderMetadataParsing", - enabled: true, - }, } ) From c32d06d13bd8d44edae22aff273ce51d20904ea1 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Thu, 13 Feb 2025 01:40:16 -0500 Subject: [PATCH 05/11] Fix converter test --- .../otelcolconvert/testdata/filelog.alloy | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/internal/converter/internal/otelcolconvert/testdata/filelog.alloy b/internal/converter/internal/otelcolconvert/testdata/filelog.alloy index 88287a31c5..0b865d5f4d 100644 --- a/internal/converter/internal/otelcolconvert/testdata/filelog.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/filelog.alloy @@ -2,10 +2,10 @@ otelcol.receiver.filelog "default" { include = ["/var/log/*.log"] exclude = ["/var/log/excluded.log"] exclude_older_than = "24h0m0s" - ordering_criteria { - regex = "^(?P<timestamp>\\d{8})_(?P<severity>\\d+)_" - top_n = 12 - group_by = "severity" + + ordering_criteria { + regex = "^(?P<timestamp>\\d{8})_(?P<severity>\\d+)_" + top_n = 12 sort_by { sort_type = "timestamp" @@ -14,10 +14,13 @@ otelcol.receiver.filelog "default" { layout = "%Y%m%d" location = "UTC" } + sort_by { - sort_type = "severity" - regex_key = "numeric" + sort_type = "numeric" + regex_key = "severity" + ascending = true } + group_by = "severity" } poll_interval = "10s" max_concurrent_files = 10 @@ -32,26 +35,22 @@ otelcol.receiver.filelog "default" { compression = "gzip" acquire_fs_lock = true - header { - pattern = "^HEADER .*$" - metadata_operators = [] - } multiline { line_start_pattern = "\\d{4}-\\d{2}-\\d{2}" omit_pattern = true } preserve_leading_whitespaces = true preserve_trailing_whitespaces = true - include_file_name = true + + header { + pattern = "^HEADER .*$" + metadata_operators = [] + } include_file_path = true include_file_name_resolved = true include_file_path_resolved = true include_file_owner_name = true include_file_owner_group_name = true - attributes = {} - resource = {} - operators = [] } otelcol.exporter.otlp "default" { From c2b3c3028066e54d72266ca2d6e47685ee61f9e0 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Thu, 13 Feb 2025 01:46:27 -0500 Subject: [PATCH 06/11] Fix refactor typo --- internal/component/otelcol/receiver/filelog/filelog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/otelcol/receiver/filelog/filelog.go b/internal/component/otelcol/receiver/filelog/filelog.go index 73e84f4b78..1835a22478 100644 --- a/internal/component/otelcol/receiver/filelog/filelog.go +++ b/internal/component/otelcol/receiver/filelog/filelog.go @@ -207,7 +207,7 @@ func (args Arguments) Convert() (otelcomponent.Config, error) { cfg.InputConfig.Criteria.Include = args.MatchCriteria.Include cfg.InputConfig.Criteria.Exclude = args.MatchCriteria.Exclude cfg.InputConfig.Criteria.ExcludeOlderThan = args.MatchCriteria.ExcludeOlderThan - if args.MatchCriteria.OrderingCriteria == nil { + if args.MatchCriteria.OrderingCriteria != nil { cfg.InputConfig.Criteria.OrderingCriteria.Regex = args.MatchCriteria.OrderingCriteria.Regex cfg.InputConfig.Criteria.OrderingCriteria.TopN = args.MatchCriteria.OrderingCriteria.TopN cfg.InputConfig.Criteria.OrderingCriteria.GroupBy = args.MatchCriteria.OrderingCriteria.GroupBy From 15801fe9308106b280c65518a18a07b71e46bb55 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Fri, 14 Feb 2025 23:31:28 -0500 Subject: [PATCH 07/11] address PR feedback --- .../otelcol/otelcol.receiver.filelog.md | 4 +- internal/alloycli/automemlimit_linux.go | 3 + .../otelcol/config_stanza_receivers.go | 2 +
.../otelcol/receiver/filelog/filelog.go | 36 +++++++--- .../otelcol/receiver/filelog/filelog_test.go | 70 ++++++++++++++++++- .../converter_filelogreceiver.go | 8 +-- .../otelcolconvert/testdata/filelog.diags | 2 + 7 files changed, 108 insertions(+), 17 deletions(-) create mode 100644 internal/converter/internal/otelcolconvert/testdata/filelog.diags diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md b/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md index 29a40eabbd..515982ad8f 100644 --- a/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md @@ -38,7 +38,7 @@ The following arguments are supported: |---------------------------------|---------------------|--------------------------------------------------------------------------------------------|-------------|----------| | `include` | `list(string)` | A list of glob patterns to include files. | `[]` | yes | | `exclude` | `list(string)` | A list of glob patterns to exclude files that would be included by the `include` patterns. | `[]` | no | -| `poll_interval` | `time.Duration` | The interval at which the file is polled for new entries. | `1s` | no | +| `poll_interval` | `time.Duration` | The interval at which the file is polled for new entries. | `200ms` | no | | `max_concurrent_files` | `int` | The maximum number of files to read concurrently. | `10` | no | | `max_batches` | `int` | The maximum number of batches to process concurrently. | `10` | no | | `start_at` | `string` | The position to start reading the file from. | `beginning` | no | @@ -104,7 +104,7 @@ The following blocks are supported inside the definition of | retry_on_failure | [retry_on_failure][] | Configures the retry behavior when the receiver encounters an error downstream in the pipeline. | no | | debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no | | ordering_criteria | [ordering_criteria][] | Configures the order in which log files are processed. | no | -| ordering_criteria > sort_by | [sort_by][] | Configures the fields to sort by within the ordering criteria. | no | +| ordering_criteria > sort_by | [sort_by][] | Configures the fields to sort by within the ordering criteria. | yes | The `>` symbol indicates deeper levels of nesting. For example, `ordering_criteria > sort_by` refers to a `sort_by` block defined inside a `ordering_criteria` block. diff --git a/internal/alloycli/automemlimit_linux.go b/internal/alloycli/automemlimit_linux.go index 148c160a46..90155e2353 100644 --- a/internal/alloycli/automemlimit_linux.go +++ b/internal/alloycli/automemlimit_linux.go @@ -1,3 +1,6 @@ +//go:build linux +// +build linux + package alloycli import ( diff --git a/internal/component/otelcol/config_stanza_receivers.go b/internal/component/otelcol/config_stanza_receivers.go index 25da09cb7e..719889fb4c 100644 --- a/internal/component/otelcol/config_stanza_receivers.go +++ b/internal/component/otelcol/config_stanza_receivers.go @@ -1,3 +1,5 @@ +// Stanza is the name of the logs agent that was donated to the OpenTelemetry project. +// Stanza receivers are logs receivers built of stanza operators.
package otelcol import ( diff --git a/internal/component/otelcol/receiver/filelog/filelog.go b/internal/component/otelcol/receiver/filelog/filelog.go index 1835a22478..8c5b673159 100644 --- a/internal/component/otelcol/receiver/filelog/filelog.go +++ b/internal/component/otelcol/receiver/filelog/filelog.go @@ -244,10 +244,11 @@ func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { // Validate implements syntax.Validator. func (args *Arguments) Validate() error { var errs error - // Convert will validate operator(s) - _, err := args.Convert() - if err != nil { - errs = multierror.Append(errs, err) + for _, op := range args.Operators { + _, err := op.Convert() + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("failed to parse 'operator': %w", err)) + } } if args.MaxConcurrentFiles < 1 { @@ -262,9 +263,9 @@ func (args *Arguments) Validate() error { errs = multierror.Append(errs, errors.New("'delete_after_read' cannot be used with 'start_at = end'")) } - _, err = decode.LookupEncoding(args.Encoding) + _, err := decode.LookupEncoding(args.Encoding) if err != nil { - errs = multierror.Append(errs, fmt.Errorf("invalid encoding: %w", err)) + errs = multierror.Append(errs, fmt.Errorf("invalid 'encoding': %w", err)) } if args.MatchCriteria.OrderingCriteria != nil { @@ -274,13 +275,13 @@ func (args *Arguments) Validate() error { for _, s := range args.MatchCriteria.OrderingCriteria.SortBy { if !slices.Contains([]string{"timestamp", "numeric", "lexicographic", "mtime"}, s.SortType) { - errs = multierror.Append(errs, fmt.Errorf("invalid sort type: %s", s.SortType)) + errs = multierror.Append(errs, fmt.Errorf("invalid 'sort_type': %s", s.SortType)) } } } if args.Compression != "" && args.Compression != "gzip" { - errs = multierror.Append(errs, fmt.Errorf("invalid compression type: %s", args.Compression)) + errs = multierror.Append(errs, fmt.Errorf("invalid 'compression' type: %s", args.Compression)) } if args.PollInterval < 0 { @@ -299,9 +300,26 @@ func (args *Arguments) Validate() error { errs = multierror.Append(errs, errors.New("'force_flush_period' must not be negative")) } + if args.MatchCriteria.ExcludeOlderThan < 0 { + errs = multierror.Append(errs, errors.New("'exclude_older_than' must not be negative")) + } + if args.MultilineConfig != nil { if err := args.MultilineConfig.Validate(); err != nil { - errs = multierror.Append(errs, fmt.Errorf("invalid multiline: %w", err)) + errs = multierror.Append(errs, fmt.Errorf("invalid 'multiline': %w", err)) + } + } + + if args.Header != nil { + if len(args.Header.MetadataOperators) == 0 { + errs = multierror.Append(errs, errors.New("'header' requires at least one 'metadata_operator'")) + } else { + for _, op := range args.Header.MetadataOperators { + _, err := op.Convert() + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("failed to parse 'metadata_operator': %w", err)) + } + } } } diff --git a/internal/component/otelcol/receiver/filelog/filelog_test.go b/internal/component/otelcol/receiver/filelog/filelog_test.go index 5821240e62..799f518079 100644 --- a/internal/component/otelcol/receiver/filelog/filelog_test.go +++ b/internal/component/otelcol/receiver/filelog/filelog_test.go @@ -54,7 +54,7 @@ func Test(t *testing.T) { require.NoError(t, ctrl.WaitRunning(3*time.Second)) - // TODO(@dehaansa) - test if this is removeable after https://github.com/grafana/alloy/pull/2262 + // TODO(@dehaansa) - discover a better way to wait for otelcol component readiness time.Sleep(1 * time.Second) // Add a log message to the file @@ 
-127,7 +127,10 @@ func TestUnmarshal(t *testing.T) { header { pattern = "^HEADER .*$" - metadata_operators = [] + metadata_operators = [{ + type = "regex_parser", + regex = "^HEADER env='(?P<env>[^ ]+)'", + }] } multiline { @@ -159,4 +162,67 @@ func TestUnmarshal(t *testing.T) { var args filelog.Arguments err := syntax.Unmarshal([]byte(alloyCfg), &args) require.NoError(t, err) + + err = args.Validate() + require.NoError(t, err) +} + +func TestValidate(t *testing.T) { + alloyCfg := ` + include = ["/var/log/*.log"] + exclude_older_than = "-5m" + ordering_criteria { + regex = "^(?P<timestamp>\\d{8})_(?P<severity>\\d+)_" + top_n = -3 + group_by = "severity" + + sort_by { + sort_type = "std_dev" + regex_key = "severity" + ascending = true + } + } + poll_interval = "-10s" + max_concurrent_files = 0 + max_batches = -3 + start_at = "middle" + fingerprint_size = "-4KiB" + max_log_size = "-3MiB" + encoding = "webdings" + force_flush_period = "-5s" + compression = "tar" + + header { + pattern = "^HEADER .*$" + metadata_operators = [] + } + + operators = [{ + type = "regex_parser", + regex = "^(?P<timestamp>[^ ]+)", + timestamp = { + parse_from = "timestamp", + layout = "%Y-%m-%dT%H:%M:%S.%fZ", + location = "UTC", + }, + }] + + output {} + ` + var args filelog.Arguments + err := syntax.Unmarshal([]byte(alloyCfg), &args) + require.Error(t, err) + require.Contains(t, err.Error(), "error decoding 'parse_from': unrecognized prefix") + require.Contains(t, err.Error(), "'max_concurrent_files' must be positive") + require.Contains(t, err.Error(), "'max_batches' must not be negative") + require.Contains(t, err.Error(), "invalid 'encoding': unsupported encoding 'webdings'") + require.Contains(t, err.Error(), "'top_n' must not be negative") + require.Contains(t, err.Error(), "invalid 'sort_type': std_dev") + require.Contains(t, err.Error(), "invalid 'compression' type: tar") + require.Contains(t, err.Error(), "'poll_interval' must not be negative") + require.Contains(t, err.Error(), "'fingerprint_size' must not be negative") + require.Contains(t, err.Error(), "'max_log_size' must not be negative") + require.Contains(t, err.Error(), "'force_flush_period' must not be negative") + require.Contains(t, err.Error(), "'header' requires at least one 'metadata_operator'") + require.Contains(t, err.Error(), "'exclude_older_than' must not be negative") } diff --git a/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go b/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go index ff522e156c..bff344d49f 100644 --- a/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go +++ b/internal/converter/internal/otelcolconvert/converter_filelogreceiver.go @@ -38,16 +38,16 @@ func (filelogReceiverConverter) ConvertAndAppend(state *State, id componentstatu // TODO(@dehaansa) - find a way to convert the operators if len(cfg.(*filelogreceiver.FileLogConfig).Operators) > 0 { diags.Add( - diag.SeverityLevelInfo, - fmt.Sprintf("Operators cannot currently be translated for %s", StringifyInstanceID(id)), + diag.SeverityLevelWarn, + fmt.Sprintf("operators cannot currently be translated for %s", StringifyInstanceID(id)), ) } // TODO(@dehaansa) - find a way to convert the metadata operators if cfg.(*filelogreceiver.FileLogConfig).InputConfig.Header != nil { diags.Add( - diag.SeverityLevelInfo, - fmt.Sprintf("Header cannot currently be translated for %s", StringifyInstanceID(id)), + diag.SeverityLevelWarn, + fmt.Sprintf("header metadata_operators cannot currently be translated for %s", StringifyInstanceID(id)), ) } diff --git
a/internal/converter/internal/otelcolconvert/testdata/filelog.diags b/internal/converter/internal/otelcolconvert/testdata/filelog.diags new file mode 100644 index 0000000000..752b637f19 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/filelog.diags @@ -0,0 +1,2 @@ +(Warn) operators cannot currently be translated for filelog +(Warn) header metadata_operators cannot currently be translated for filelog From d845e39a4efbfb5a78ffa8f8e8b96c9e1f513272 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Fri, 14 Feb 2025 23:32:46 -0500 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- .../otelcol/otelcol.receiver.filelog.md | 131 +++++++++--------- 1 file changed, 65 insertions(+), 66 deletions(-) diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md b/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md index 515982ad8f..b8e56380d3 100644 --- a/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.filelog.md @@ -1,12 +1,12 @@ --- canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.receiver.filelog/ description: Learn about otelcol.receiver.filelog +labels: + stage: public-preview title: otelcol.receiver.filelog --- -Public preview - -# otelcol.receiver.filelog +# `otelcol.receiver.filelog` {{< docs/shared lookup="stability/public_preview.md" source="alloy" version="" >}} @@ -22,7 +22,7 @@ You can specify multiple `otelcol.receiver.filelog` components by giving them di ## Usage ```alloy -otelcol.receiver.filelog "LABEL" { +otelcol.receiver.filelog "