diff --git a/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml b/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml new file mode 100644 index 000000000000..aecdf987b857 --- /dev/null +++ b/.chloggen/mysqlreceiver_older_replica_metrics_support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: mysqlreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add replica metric support for versions of MySQL earlier than 8.0.22. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35217] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go index 736257e6a623..3aeabf1b777b 100644 --- a/receiver/mysqlreceiver/client.go +++ b/receiver/mysqlreceiver/client.go @@ -12,11 +12,12 @@ import ( // registers the mysql driver "github.com/go-sql-driver/mysql" + "github.com/hashicorp/go-version" ) type client interface { Connect() error - getVersion() (string, error) + getVersion() (*version.Version, error) getGlobalStats() (map[string]string, error) getInnodbStats() (map[string]string, error) getTableStats() ([]TableStats, error) @@ -110,66 +111,83 @@ type tableLockWaitEventStats struct { } type ReplicaStatusStats struct { - replicaIOState string - sourceHost string - sourceUser string - sourcePort int64 - connectRetry int64 - sourceLogFile string - readSourceLogPos int64 - relayLogFile string - relayLogPos int64 - relaySourceLogFile string - replicaIORunning string - replicaSQLRunning string - replicateDoDB string - replicateIgnoreDB string - replicateDoTable string - replicateIgnoreTable string - replicateWildDoTable string - replicateWildIgnoreTable string - lastErrno int64 - lastError string - skipCounter int64 - execSourceLogPos int64 - relayLogSpace int64 - untilCondition string - untilLogFile string - untilLogPos string - sourceSSLAllowed string - sourceSSLCAFile string - sourceSSLCAPath string - sourceSSLCert string - sourceSSLCipher string - sourceSSLKey string - secondsBehindSource sql.NullInt64 - sourceSSLVerifyServerCert string - lastIOErrno int64 - lastIOError string - lastSQLErrno int64 - lastSQLError string - replicateIgnoreServerIDs string - sourceServerID int64 - sourceUUID string - sourceInfoFile string - sqlDelay int64 - sqlRemainingDelay sql.NullInt64 - replicaSQLRunningState string - sourceRetryCount int64 - sourceBind string - lastIOErrorTimestamp string - lastSQLErrorTimestamp string - sourceSSLCrl string - sourceSSLCrlpath string - retrievedGtidSet string - executedGtidSet string - autoPosition string - replicateRewriteDB string - channelName string - sourceTLSVersion string - sourcePublicKeyPath string - getSourcePublicKey int64 - networkNamespace string + replicaIOState string + sourceHost string + sourceUser string + sourcePort int64 + connectRetry int64 + sourceLogFile string + readSourceLogPos int64 + relayLogFile string + relayLogPos int64 + relaySourceLogFile string + replicaIORunning string + replicaSQLRunning string + replicateDoDB string + replicateIgnoreDB string + replicateDoTable string + replicateIgnoreTable string + replicateWildDoTable string + replicateWildIgnoreTable string + lastErrno int64 + lastError string + skipCounter int64 + execSourceLogPos int64 + relayLogSpace int64 + untilCondition string + untilLogFile string + untilLogPos string + sourceSSLAllowed string + sourceSSLCAFile string + sourceSSLCAPath string + sourceSSLCert string + sourceSSLCipher string + sourceSSLKey string + secondsBehindSource sql.NullInt64 + sourceSSLVerifyServerCert string + lastIOErrno int64 + lastIOError string + lastSQLErrno int64 + lastSQLError string + replicateIgnoreServerIDs string + sourceServerID int64 + sourceUUID string + sourceInfoFile string + sqlDelay int64 + sqlRemainingDelay sql.NullInt64 + replicaSQLRunningState string + sourceRetryCount int64 + sourceBind string + lastIOErrorTimestamp string + lastSQLErrorTimestamp string + sourceSSLCrl string + sourceSSLCrlpath string + retrievedGtidSet string + executedGtidSet string + autoPosition string + replicateRewriteDB string + channelName string + sourceTLSVersion string + sourcePublicKeyPath string + getSourcePublicKey int64 + networkNamespace string + usingGtid string + gtidIoPos string + slaveDdlGroups int64 + slaveNonTransactionalGroups int64 + slaveTransactionalGroups int64 + retriedTransactions int64 + maxRelayLogSize int64 + executedLogEntries int64 + slaveReceivedHeartbeats int64 + slaveHeartbeatPeriod int64 + gtidSlavePos string + masterLastEventTime string + slaveLastEventTime string + masterSlaveTimeDiff string + parallelMode string + replicateDoDomainIDs string + replicateIgnoreDomainIDs string } var _ client = (*mySQLClient)(nil) @@ -218,15 +236,15 @@ func (c *mySQLClient) Connect() error { } // getVersion queries the db for the version. -func (c *mySQLClient) getVersion() (string, error) { +func (c *mySQLClient) getVersion() (*version.Version, error) { query := "SELECT VERSION();" - var version string - err := c.client.QueryRow(query).Scan(&version) + var versionStr string + err := c.client.QueryRow(query).Scan(&versionStr) if err != nil { - return "", err + return nil, err } - - return version, nil + version, err := version.NewVersion(versionStr) + return version, err } // getGlobalStats queries the db for global status metrics. @@ -397,16 +415,19 @@ func (c *mySQLClient) getTableLockWaitEventStats() ([]tableLockWaitEventStats, e } func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { - version, err := c.getVersion() + mysqlVersion, err := c.getVersion() if err != nil { return nil, err } - if version < "8.0.22" { - return nil, nil + query := "SHOW REPLICA STATUS" + minMysqlVersion, _ := version.NewVersion("8.0.22") + if strings.Contains(mysqlVersion.String(), "MariaDB") { + query = "SHOW SLAVE STATUS" + } else if mysqlVersion.LessThan(minMysqlVersion) { + query = "SHOW SLAVE STATUS" } - query := "SHOW REPLICA STATUS" rows, err := c.client.Query(query) if err != nil { @@ -427,28 +448,46 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { switch strings.ToLower(col) { case "replica_io_state": dest = append(dest, &s.replicaIOState) + case "slave_io_state": + dest = append(dest, &s.replicaIOState) case "source_host": dest = append(dest, &s.sourceHost) + case "master_host": + dest = append(dest, &s.sourceHost) case "source_user": dest = append(dest, &s.sourceUser) + case "master_user": + dest = append(dest, &s.sourceUser) case "source_port": dest = append(dest, &s.sourcePort) + case "master_port": + dest = append(dest, &s.sourcePort) case "connect_retry": dest = append(dest, &s.connectRetry) case "source_log_file": dest = append(dest, &s.sourceLogFile) + case "master_log_file": + dest = append(dest, &s.sourceLogFile) case "read_source_log_pos": dest = append(dest, &s.readSourceLogPos) + case "read_master_log_pos": + dest = append(dest, &s.readSourceLogPos) case "relay_log_file": dest = append(dest, &s.relayLogFile) case "relay_log_pos": dest = append(dest, &s.relayLogPos) case "relay_source_log_file": dest = append(dest, &s.relaySourceLogFile) + case "relay_master_log_file": + dest = append(dest, &s.relaySourceLogFile) case "replica_io_running": dest = append(dest, &s.replicaIORunning) + case "slave_io_running": + dest = append(dest, &s.replicaIORunning) case "replica_sql_running": dest = append(dest, &s.replicaSQLRunning) + case "slave_sql_running": + dest = append(dest, &s.replicaSQLRunning) case "replicate_do_db": dest = append(dest, &s.replicateDoDB) case "replicate_ignore_db": @@ -469,6 +508,8 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.skipCounter) case "exec_source_log_pos": dest = append(dest, &s.execSourceLogPos) + case "exec_master_log_pos": + dest = append(dest, &s.execSourceLogPos) case "relay_log_space": dest = append(dest, &s.relayLogSpace) case "until_condition": @@ -479,20 +520,36 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.untilLogPos) case "source_ssl_allowed": dest = append(dest, &s.sourceSSLAllowed) + case "master_ssl_allowed": + dest = append(dest, &s.sourceSSLAllowed) case "source_ssl_ca_file": dest = append(dest, &s.sourceSSLCAFile) + case "master_ssl_ca_file": + dest = append(dest, &s.sourceSSLCAFile) case "source_ssl_ca_path": dest = append(dest, &s.sourceSSLCAPath) + case "master_ssl_ca_path": + dest = append(dest, &s.sourceSSLCAPath) case "source_ssl_cert": dest = append(dest, &s.sourceSSLCert) + case "master_ssl_cert": + dest = append(dest, &s.sourceSSLCert) case "source_ssl_cipher": dest = append(dest, &s.sourceSSLCipher) + case "master_ssl_cipher": + dest = append(dest, &s.sourceSSLCipher) case "source_ssl_key": dest = append(dest, &s.sourceSSLKey) + case "master_ssl_key": + dest = append(dest, &s.sourceSSLKey) case "seconds_behind_source": dest = append(dest, &s.secondsBehindSource) + case "seconds_behind_master": + dest = append(dest, &s.secondsBehindSource) case "source_ssl_verify_server_cert": dest = append(dest, &s.sourceSSLVerifyServerCert) + case "master_ssl_verify_server_cert": + dest = append(dest, &s.sourceSSLVerifyServerCert) case "last_io_errno": dest = append(dest, &s.lastIOErrno) case "last_io_error": @@ -505,28 +562,44 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.replicateIgnoreServerIDs) case "source_server_id": dest = append(dest, &s.sourceServerID) + case "master_server_id": + dest = append(dest, &s.sourceServerID) case "source_uuid": dest = append(dest, &s.sourceUUID) + case "master_uuid": + dest = append(dest, &s.sourceUUID) case "source_info_file": dest = append(dest, &s.sourceInfoFile) + case "master_info_file": + dest = append(dest, &s.sourceInfoFile) case "sql_delay": dest = append(dest, &s.sqlDelay) case "sql_remaining_delay": dest = append(dest, &s.sqlRemainingDelay) case "replica_sql_running_state": dest = append(dest, &s.replicaSQLRunningState) + case "slave_sql_running_state": + dest = append(dest, &s.replicaSQLRunningState) case "source_retry_count": dest = append(dest, &s.sourceRetryCount) + case "master_retry_count": + dest = append(dest, &s.sourceRetryCount) case "source_bind": dest = append(dest, &s.sourceBind) + case "master_bind": + dest = append(dest, &s.sourceBind) case "last_io_error_timestamp": dest = append(dest, &s.lastIOErrorTimestamp) case "last_sql_error_timestamp": dest = append(dest, &s.lastSQLErrorTimestamp) case "source_ssl_crl": dest = append(dest, &s.sourceSSLCrl) + case "master_ssl_crl": + dest = append(dest, &s.sourceSSLCrl) case "source_ssl_crlpath": dest = append(dest, &s.sourceSSLCrlpath) + case "master_ssl_crlpath": + dest = append(dest, &s.sourceSSLCrlpath) case "retrieved_gtid_set": dest = append(dest, &s.retrievedGtidSet) case "executed_gtid_set": @@ -539,12 +612,52 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { dest = append(dest, &s.channelName) case "source_tls_version": dest = append(dest, &s.sourceTLSVersion) + case "master_tls_version": + dest = append(dest, &s.sourceTLSVersion) case "source_public_key_path": dest = append(dest, &s.sourcePublicKeyPath) + case "master_public_key_path": + dest = append(dest, &s.sourcePublicKeyPath) case "get_source_public_key": dest = append(dest, &s.getSourcePublicKey) + case "get_master_public_key": + dest = append(dest, &s.getSourcePublicKey) case "network_namespace": dest = append(dest, &s.networkNamespace) + case "using_gtid": + dest = append(dest, &s.usingGtid) + case "gtid_io_pos": + dest = append(dest, &s.gtidIoPos) + case "slave_ddl_groups": + dest = append(dest, &s.slaveDdlGroups) + case "slave_non_transactional_groups": + dest = append(dest, &s.slaveNonTransactionalGroups) + case "slave_transactional_groups": + dest = append(dest, &s.slaveTransactionalGroups) + case "retried_transactions": + dest = append(dest, &s.retriedTransactions) + case "max_relay_log_size": + dest = append(dest, &s.maxRelayLogSize) + case "executed_log_entries": + dest = append(dest, &s.executedLogEntries) + case "slave_received_heartbeats": + dest = append(dest, &s.slaveReceivedHeartbeats) + case "slave_heartbeat_period": + dest = append(dest, &s.slaveHeartbeatPeriod) + case "gtid_slave_pos": + dest = append(dest, &s.gtidSlavePos) + case "master_last_event_time": + dest = append(dest, &s.masterLastEventTime) + case "slave_last_event_time": + dest = append(dest, &s.slaveLastEventTime) + case "master_slave_time_diff": + dest = append(dest, &s.masterSlaveTimeDiff) + case "parallel_mode": + dest = append(dest, &s.parallelMode) + case "replicate_do_domain_ids": + dest = append(dest, &s.replicateDoDomainIDs) + case "replicate_ignore_domain_ids": + dest = append(dest, &s.replicateIgnoreDomainIDs) default: return nil, fmt.Errorf("unknown column name %s for replica status", col) } diff --git a/receiver/mysqlreceiver/go.mod b/receiver/mysqlreceiver/go.mod index 44c73fe67420..9f70a56d04ab 100644 --- a/receiver/mysqlreceiver/go.mod +++ b/receiver/mysqlreceiver/go.mod @@ -48,6 +48,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect diff --git a/receiver/mysqlreceiver/go.sum b/receiver/mysqlreceiver/go.sum index cf30c4b18167..a054b1a95e2a 100644 --- a/receiver/mysqlreceiver/go.sum +++ b/receiver/mysqlreceiver/go.sum @@ -58,6 +58,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/receiver/mysqlreceiver/scraper_test.go b/receiver/mysqlreceiver/scraper_test.go index 01dc4dd11840..8753752e948a 100644 --- a/receiver/mysqlreceiver/scraper_test.go +++ b/receiver/mysqlreceiver/scraper_test.go @@ -12,6 +12,7 @@ import ( "strings" "testing" + "github.com/hashicorp/go-version" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/confignet" @@ -158,8 +159,9 @@ func (c *mockClient) Connect() error { return nil } -func (c *mockClient) getVersion() (string, error) { - return "8.0.27", nil +func (c *mockClient) getVersion() (*version.Version, error) { + version, _ := version.NewVersion("8.0.27") + return version, nil } func (c *mockClient) getGlobalStats() (map[string]string, error) {