Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication lag for group replication. #904

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ collect.perf_schema.tablelocks | 5.6 | C
collect.perf_schema.replication_group_members | 5.7 | Collect metrics from performance_schema.replication_group_members.
collect.perf_schema.replication_group_member_stats | 5.7 | Collect metrics from performance_schema.replication_group_member_stats.
collect.perf_schema.replication_applier_status_by_worker | 5.7 | Collect metrics from performance_schema.replication_applier_status_by_worker.
collect.perf_schema.replication_group_member_lag | 8.0 | Collect the group replication applier lag, in seconds, from performance_schema.
collect.slave_status | 5.1 | Collect from SHOW SLAVE STATUS (Enabled by default)
collect.slave_hosts | 5.1 | Collect from SHOW SLAVE HOSTS
collect.sys.user_summary | 5.7 | Collect metrics from sys.x$user_summary (disabled by default).
Expand Down
93 changes: 93 additions & 0 deletions collector/perf_schema_replication_group_member_lag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"database/sql"
"log/slog"

"github.com/prometheus/client_golang/prometheus"
)

// perfReplicationGroupMemberLagQuery computes a single lag value, in seconds,
// for the 'group_replication_applier' channel.
//
// The selected value is:
//   - 99999999 when the applier coordinator or the connection reports
//     SERVICE_STATE = 'OFF';
//   - 0 when GTID_SUBTRACT(last queued, last applied) is empty or the applying
//     transaction's immediate commit timestamp is zero (the worker is idle);
//   - otherwise the number of seconds between NOW(6) and the immediate commit
//     timestamp of the transaction currently being applied.
//
// One row per applier worker is produced by the joins; the ORDER BY puts
// executing workers before idle ones and, among those, the oldest applying
// timestamp first, and LIMIT 1 keeps only that row.
const perfReplicationGroupMemberLagQuery = `
SELECT IF(
applier_coordinator_status.SERVICE_STATE = 'OFF'
OR conn_status.SERVICE_STATE = 'OFF',
99999999,
IF(
GTID_SUBTRACT(conn_status.LAST_QUEUED_TRANSACTION,
applier_status.LAST_APPLIED_TRANSACTION) = ''
OR UNIX_TIMESTAMP(applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) =
0,
0,
TIME_TO_SEC(TIMEDIFF(
NOW(6),
applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP
))
)
) AS replication_group_member_lag
FROM performance_schema.replication_connection_status AS conn_status
JOIN performance_schema.replication_applier_status_by_worker AS applier_status
ON applier_status.channel_name = conn_status.channel_name
JOIN performance_schema.replication_applier_status_by_coordinator AS applier_coordinator_status
ON applier_coordinator_status.channel_name = conn_status.channel_name
WHERE conn_status.channel_name = 'group_replication_applier'
ORDER BY IF(GTID_SUBTRACT(conn_status.LAST_QUEUED_TRANSACTION,
applier_status.LAST_APPLIED_TRANSACTION) = ''
OR UNIX_TIMESTAMP(applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP) = 0,
'1-IDLE', '0-EXECUTING') ASC,
applier_status.APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP ASC
LIMIT 1;
`

// ScrapePerfReplicationGroupMemberLag collects the group replication lag
// from the performance_schema replication tables.
type ScrapePerfReplicationGroupMemberLag struct{}

// Name returns the scraper's unique identifier, namespaced under the
// performance schema prefix.
func (ScrapePerfReplicationGroupMemberLag) Name() string {
	const suffix = ".replication_group_member_lag"
	return performanceSchema + suffix
}

// Help returns a one-line, human-readable description of what the scraper collects.
func (ScrapePerfReplicationGroupMemberLag) Help() string {
	const description = "Collect the replication lag according to applier queue from performance_schema group replication tables"
	return description
}

// Version of MySQL from which scraper is available.
//
// The lag query reads LAST_QUEUED_TRANSACTION, LAST_APPLIED_TRANSACTION and
// APPLYING_TRANSACTION_IMMEDIATE_COMMIT_TIMESTAMP. Those performance_schema
// columns were introduced in MySQL 8.0, so the scraper must not be run against
// 5.7 servers, where the query fails with an unknown-column error.
func (ScrapePerfReplicationGroupMemberLag) Version() float64 {
	return 8.0
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
//
// It runs perfReplicationGroupMemberLagQuery and emits one gauge holding the
// group replication lag in seconds.
//
// No metric is emitted, and nil is returned, when the query yields no row or
// a NULL lag. Any other query or scan error is returned to the caller.
func (ScrapePerfReplicationGroupMemberLag) Scrape(ctx context.Context, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) error {
	db := instance.getDB()

	// Scan into a nullable float rather than an unsigned integer: the SQL
	// expression can evaluate to NULL, to a negative number (TIMEDIFF against
	// a commit timestamp that is ahead of NOW(6)), or to a value with a
	// fractional part, none of which can be converted to uint64 and each of
	// which would otherwise fail the whole scrape.
	var lag sql.NullFloat64
	if err := db.QueryRowContext(ctx, perfReplicationGroupMemberLagQuery).Scan(&lag); err != nil {
		if err == sql.ErrNoRows {
			// No matching applier channel row: nothing to report.
			return nil
		}
		return err
	}
	if !lag.Valid {
		// The server could not compute a lag value; skip the sample.
		return nil
	}

	seconds := lag.Float64
	if seconds < 0 {
		// A negative difference is not a meaningful lag; report it as zero.
		seconds = 0
	}

	ch <- prometheus.MustNewConstMetric(
		prometheus.NewDesc(prometheus.BuildFQName(namespace, performanceSchema, "replication_group_member_lag"),
			"Group replication lag in seconds", nil, nil),
		prometheus.GaugeValue, seconds,
	)
	return nil
}

// Compile-time check that ScrapePerfReplicationGroupMemberLag implements Scraper.
var _ Scraper = ScrapePerfReplicationGroupMemberLag{}
61 changes: 61 additions & 0 deletions collector/perf_schema_replication_group_member_lag_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/promslog"
"github.com/smartystreets/goconvey/convey"
)

// TestScrapePerfReplicationGroupMemberLag feeds a single-row result for the lag
// query through a mocked database and verifies that Scrape emits exactly one
// unlabelled gauge carrying that value.
func TestScrapePerfReplicationGroupMemberLag(t *testing.T) {
	db, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("error opening a stub database connection: %s", err)
	}
	defer db.Close()
	inst := &instance{db: db}

	// The mocked server answers the lag query with one row whose lag is 1.
	result := sqlmock.NewRows([]string{"replication_group_member_lag"}).AddRow(1)
	mock.ExpectQuery(sanitizeQuery(perfReplicationGroupMemberLagQuery)).WillReturnRows(result)

	// Scrape runs in its own goroutine because the metrics channel is unbuffered.
	metrics := make(chan prometheus.Metric)
	go func() {
		scrapeErr := (ScrapePerfReplicationGroupMemberLag{}).Scrape(context.Background(), inst, metrics, promslog.NewNopLogger())
		if scrapeErr != nil {
			t.Errorf("error calling function on test: %s", scrapeErr)
		}
		close(metrics)
	}()

	wanted := []MetricResult{
		{labels: labelMap{}, value: 1, metricType: dto.MetricType_GAUGE},
	}

	convey.Convey("Metrics comparison", t, func() {
		for i := range wanted {
			actual := readMetric(<-metrics)
			convey.So(actual, convey.ShouldResemble, wanted[i])
		}
	})

	if unmet := mock.ExpectationsWereMet(); unmet != nil {
		t.Errorf("unfulfilled expectations: %s", unmet)
	}
}
1 change: 1 addition & 0 deletions mysqld_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ var scrapers = map[collector.Scraper]bool{
collector.ScrapePerfReplicationGroupMembers{}: false,
collector.ScrapePerfReplicationGroupMemberStats{}: false,
collector.ScrapePerfReplicationApplierStatsByWorker{}: false,
collector.ScrapePerfReplicationGroupMemberLag{}: false,
collector.ScrapeSysUserSummary{}: false,
collector.ScrapeUserStat{}: false,
collector.ScrapeClientStat{}: false,
Expand Down