diff --git a/internal/component/database_observability/mysql/collector/connection_info.go b/internal/component/database_observability/mysql/collector/connection_info.go index 05c83fb4a1..d2ea416af5 100644 --- a/internal/component/database_observability/mysql/collector/connection_info.go +++ b/internal/component/database_observability/mysql/collector/connection_info.go @@ -33,7 +33,7 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { Namespace: "database_observability", Name: "connection_info", Help: "Information about the connection", - }, []string{"provider_name", "region", "db_instance_identifier"}) + }, []string{"provider_name", "provider_region", "db_instance_identifier"}) args.Registry.MustRegister(infoMetric) diff --git a/internal/component/database_observability/mysql/collector/connection_info_test.go b/internal/component/database_observability/mysql/collector/connection_info_test.go index 8a480a9863..ffadaf33da 100644 --- a/internal/component/database_observability/mysql/collector/connection_info_test.go +++ b/internal/component/database_observability/mysql/collector/connection_info_test.go @@ -18,7 +18,7 @@ func TestConnectionInfo(t *testing.T) { const baseExpectedMetrics = ` # HELP database_observability_connection_info Information about the connection # TYPE database_observability_connection_info gauge - database_observability_connection_info{db_instance_identifier="%s",provider_name="%s",region="%s"} 1 + database_observability_connection_info{db_instance_identifier="%s",provider_name="%s",provider_region="%s"} 1 ` testCases := []struct { @@ -28,12 +28,12 @@ func TestConnectionInfo(t *testing.T) { }{ { name: "generic dsn", - dsn: "user:pass@tcp(localhost:3306)/db", + dsn: "user:pass@tcp(localhost:3306)/dbname", expectedMetrics: fmt.Sprintf(baseExpectedMetrics, "unknown", "unknown", "unknown"), }, { name: "AWS/RDS dsn", - dsn: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/db", + dsn: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/dbname", expectedMetrics: fmt.Sprintf(baseExpectedMetrics, "products-db", "aws", "us-east-1"), }, } diff --git a/internal/component/database_observability/mysql/collector/query_sample.go b/internal/component/database_observability/mysql/collector/query_sample.go index 2e83c9ec1d..9bc2ff85ff 100644 --- a/internal/component/database_observability/mysql/collector/query_sample.go +++ b/internal/component/database_observability/mysql/collector/query_sample.go @@ -62,7 +62,7 @@ func NewQuerySample(args QuerySampleArguments) (*QuerySample, error) { instanceKey: args.InstanceKey, collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, - logger: log.With(args.Logger, "collector", "QuerySample"), + logger: log.With(args.Logger, "collector", QuerySampleName), running: &atomic.Bool{}, }, nil } @@ -72,7 +72,7 @@ func (c *QuerySample) Name() string { } func (c *QuerySample) Start(ctx context.Context) error { - level.Debug(c.logger).Log("msg", "QuerySample collector started") + level.Debug(c.logger).Log("msg", QuerySampleName+" collector started") c.running.Store(true) ctx, cancel := context.WithCancel(ctx) diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go index 1446231b15..9574388776 100644 --- a/internal/component/database_observability/mysql/collector/schema_table.go +++ b/internal/component/database_observability/mysql/collector/schema_table.go @@ -64,6 +64,20 @@ const ( WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION ASC` + + selectIndexNames = ` + SELECT + index_name, + seq_in_index, + column_name, + nullable, + non_unique, + index_type + FROM + information_schema.statistics + WHERE + table_schema = ? and table_name = ? + ORDER BY table_name, index_name, seq_in_index` ) type SchemaTableArguments struct { @@ -109,6 +123,7 @@ type tableInfo struct { type tableSpec struct { Columns []columnSpec `json:"columns"` + Indexes []indexSpec `json:"indexes,omitempty"` } type columnSpec struct { Name string `json:"name"` @@ -119,13 +134,21 @@ type columnSpec struct { DefaultValue string `json:"default_value,omitempty"` } +type indexSpec struct { + Name string `json:"name"` + Type string `json:"type"` + Columns []string `json:"columns"` + Unique bool `json:"unique"` + Nullable bool `json:"nullable"` +} + func NewSchemaTable(args SchemaTableArguments) (*SchemaTable, error) { c := &SchemaTable{ dbConnection: args.DB, instanceKey: args.InstanceKey, collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, - logger: log.With(args.Logger, "collector", "SchemaTable"), + logger: log.With(args.Logger, "collector", SchemaTableName), running: &atomic.Bool{}, } @@ -141,7 +164,7 @@ func (c *SchemaTable) Name() string { } func (c *SchemaTable) Start(ctx context.Context) error { - level.Debug(c.logger).Log("msg", "SchemaTable collector started") + level.Debug(c.logger).Log("msg", SchemaTableName+" collector started") c.running.Store(true) ctx, cancel := context.WithCancel(ctx) @@ -415,5 +438,43 @@ func (c *SchemaTable) fetchColumnsDefinitions(ctx context.Context, schemaName st return nil, err } + rs, err = c.dbConnection.QueryContext(ctx, selectIndexNames, schemaName, tableName) + if err != nil { + level.Error(c.logger).Log("msg", "failed to query table indexes", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + defer rs.Close() + + for rs.Next() { + var indexName, columnName, indexType string + var seqInIndex, nonUnique int + var nullable sql.NullString + if err := rs.Scan(&indexName, &seqInIndex, &columnName, &nullable, &nonUnique, &indexType); err != nil { + level.Error(c.logger).Log("msg", "failed to scan table indexes", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + + if len(tblSpec.Indexes) > 0 && tblSpec.Indexes[len(tblSpec.Indexes)-1].Name == indexName { + lastIndex := &tblSpec.Indexes[len(tblSpec.Indexes)-1] + if len(lastIndex.Columns) != seqInIndex-1 { + panic(seqInIndex) + } + lastIndex.Columns = append(lastIndex.Columns, columnName) + } else { + tblSpec.Indexes = append(tblSpec.Indexes, indexSpec{ + Name: indexName, + Type: indexType, + Columns: []string{columnName}, + Unique: nonUnique == 0, + Nullable: nullable.Valid && nullable.String == "YES", + }) + } + } + + if err := rs.Err(); err != nil { + level.Error(c.logger).Log("msg", "error during iterating over table indexes result set", "schema", schemaName, "table", tableName, "err", err) + return nil, err + } + return tblSpec, nil } diff --git a/internal/component/database_observability/mysql/collector/schema_table_test.go b/internal/component/database_observability/mysql/collector/schema_table_test.go index cc952328bc..3cbbe636f4 100644 --- a/internal/component/database_observability/mysql/collector/schema_table_test.go +++ b/internal/component/database_observability/mysql/collector/schema_table_test.go @@ -96,6 +96,25 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + err = collector.Start(context.Background()) require.NoError(t, err) @@ -113,13 +132,16 @@ func TestSchemaTable(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) t.Run("detect table schema, cache enabled (write)", func(t *testing.T) { t.Parallel() @@ -131,7 +153,7 @@ func TestSchemaTable(t *testing.T) { lokiClient := loki_fake.NewClient(func() {}) // Enable caching. This will exercise the code path - // that writes to cache (but we explicitly assert it in this test) + // that writes to cache (but we don't explicitly assert it in this test) collector, err := NewSchemaTable(SchemaTableArguments{ DB: db, InstanceKey: "mysql-db", @@ -198,6 +220,25 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + err = collector.Start(context.Background()) require.NoError(t, err) @@ -217,13 +258,16 @@ func TestSchemaTable(t *testing.T) { require.Equal(t, 1, collector.cache.Len()) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) t.Run("detect table schema, cache enabled (write and read)", func(t *testing.T) { t.Parallel() @@ -301,6 +345,25 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + // second loop, table info will be read from cache // and no further queries will be executed mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). @@ -346,19 +409,22 @@ func TestSchemaTable(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[3].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[3].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[4].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[4].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[5].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[5].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[5].Line) }) t.Run("detect view schema", func(t *testing.T) { t.Parallel() @@ -439,6 +505,25 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + err = collector.Start(context.Background()) require.NoError(t, err) @@ -456,13 +541,16 @@ func TestSchemaTable(t *testing.T) { err = mock.ExpectationsWereMet() require.NoError(t, err) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) t.Run("schemas result set iteration error", func(t *testing.T) { t.Parallel() @@ -654,6 +742,25 @@ func TestSchemaTable(t *testing.T) { ), ) + mock.ExpectQuery(selectIndexNames).WithArgs("some_schema", "some_table").RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "index_name", + "seq_in_index", + "column_name", + "nullable", + "non_unique", + "index_type", + }).AddRow( + "PRIMARY", + 1, + "id", + "", + 0, + "BTREE", + ), + ) + err = collector.Start(context.Background()) require.NoError(t, err) @@ -668,12 +775,15 @@ func TestSchemaTable(t *testing.T) { return collector.Stopped() }, 5*time.Second, 100*time.Millisecond) + expectedCreateStmt := base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")) + expectedTableSpec := base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}],"indexes":[{"name":"PRIMARY","type":"BTREE","columns":["id"],"unique":true,"nullable":false}]}`)) + lokiEntries := lokiClient.Received() require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) }