diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index 119056a571e2..04ccb1680b37 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -42,29 +42,29 @@ const ( ) type DbEngineUnderTest struct { - Port string - SQLParameter string - CheckCompatibility func(t *testing.T) - ConnectionString func(host string, externalPort nat.Port) string - Driver string - CurrentTimestampFunction string - ConvertColumnName func(string) string - ContainerRequest testcontainers.ContainerRequest + Port string + SQLParameter func(position int) string + CheckCompatibility func(t *testing.T) + ConnectionString func(host string, externalPort nat.Port) string + Driver string + ConvertColumnName func(string) string + ContainerRequest testcontainers.ContainerRequest } var ( Postgres = DbEngineUnderTest{ - Port: postgresqlPort, - SQLParameter: "$1", + Port: postgresqlPort, + SQLParameter: func(position int) string { + return fmt.Sprintf("$%d", position) + }, CheckCompatibility: func(_ *testing.T) { // No compatibility checks needed for Postgres }, ConnectionString: func(host string, externalPort nat.Port) string { return fmt.Sprintf("host=%s port=%s user=otel password=otel sslmode=disable", host, externalPort.Port()) }, - Driver: "postgres", - CurrentTimestampFunction: "now()", - ConvertColumnName: func(name string) string { return name }, + Driver: "postgres", + ConvertColumnName: func(name string) string { return name }, ContainerRequest: testcontainers.ContainerRequest{ Image: "postgres:9.6.24", Env: map[string]string{ @@ -83,17 +83,18 @@ var ( }, } MySQL = DbEngineUnderTest{ - Port: mysqlPort, - SQLParameter: "?", + Port: mysqlPort, + SQLParameter: func(_ int) string { + return "?" + }, CheckCompatibility: func(_ *testing.T) { // No compatibility checks needed for MySQL }, ConnectionString: func(host string, externalPort nat.Port) string { return fmt.Sprintf("otel:otel@tcp(%s:%s)/otel", host, externalPort.Port()) }, - Driver: "mysql", - CurrentTimestampFunction: "now()", - ConvertColumnName: func(name string) string { return name }, + Driver: "mysql", + ConvertColumnName: func(name string) string { return name }, ContainerRequest: testcontainers.ContainerRequest{ Image: "mysql:8.0.33", Env: map[string]string{ @@ -112,32 +113,42 @@ var ( }, } Oracle = DbEngineUnderTest{ - Port: oraclePort, - SQLParameter: ":1", + Port: oraclePort, + SQLParameter: func(position int) string { + return fmt.Sprintf(":%d", position) + }, CheckCompatibility: func(t *testing.T) { - if runtime.GOARCH == "arm64" { - t.Skip("Incompatible with arm64") - } t.Skip("Skipping the test until https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27577 is fixed") }, ConnectionString: func(host string, externalPort nat.Port) string { - return fmt.Sprintf("oracle://otel:p@ssw%%25rd@%s:%s/XE", host, externalPort.Port()) + return fmt.Sprintf("oracle://otel:otel@%s:%s/FREEPDB1", host, externalPort.Port()) }, - Driver: "oracle", - CurrentTimestampFunction: "SYSTIMESTAMP", - ConvertColumnName: strings.ToUpper, + Driver: "oracle", + ConvertColumnName: strings.ToUpper, ContainerRequest: testcontainers.ContainerRequest{ - FromDockerfile: testcontainers.FromDockerfile{ - Context: filepath.Join("testdata", "integration", "oracle"), - Dockerfile: "Dockerfile.oracledb", + Image: "gvenzl/oracle-free:slim-faststart", + Env: map[string]string{ + "ORACLE_PASSWORD": "mysecurepassword", + "APP_USER": "otel", + "APP_USER_PASSWORD": "otel", }, + Files: []testcontainers.ContainerFile{{ + HostFilePath: filepath.Join("testdata", "integration", "oracle", "init.sql"), + ContainerFilePath: "/docker-entrypoint-initdb.d/init.sql", + FileMode: 700, + }}, ExposedPorts: []string{oraclePort}, - WaitingFor: wait.NewHealthStrategy().WithStartupTimeout(30 * time.Minute), + WaitingFor: wait.ForAll( + wait.ForListeningPort(oraclePort), + wait.ForLog("DATABASE IS READY TO USE!"), + ).WithDeadline(5 * time.Minute), }, } SQLServer = DbEngineUnderTest{ - Port: sqlServerPort, - SQLParameter: "@p1", + Port: sqlServerPort, + SQLParameter: func(position int) string { + return fmt.Sprintf("@p%d", position) + }, CheckCompatibility: func(t *testing.T) { if runtime.GOARCH == "arm64" { t.Skip("Incompatible with arm64") @@ -147,9 +158,8 @@ var ( ConnectionString: func(host string, externalPort nat.Port) string { return fmt.Sprintf("sqlserver://otel:YourStrong%%21Passw0rd@%s:%s?database=otel", host, externalPort.Port()) }, - Driver: "sqlserver", - CurrentTimestampFunction: "GETDATE()", - ConvertColumnName: func(name string) string { return name }, + Driver: "sqlserver", + ConvertColumnName: func(name string) string { return name }, ContainerRequest: testcontainers.ContainerRequest{ FromDockerfile: testcontainers.FromDockerfile{ Context: filepath.Join("testdata", "integration", "sqlserver"), @@ -164,228 +174,292 @@ var ( } ) -func TestIntegrationLogsTrackingWithStorage(t *testing.T) { - tests := []struct { - name string - engine DbEngineUnderTest +func TestIntegrationLogsTracking(t *testing.T) { + testGroupedByDbEngine := map[string][]struct { + name string + runTest func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) }{ - {name: "Postgres", engine: Postgres}, - {name: "MySQL", engine: MySQL}, - {name: "SQLServer", engine: SQLServer}, - {name: "Oracle", engine: Oracle}, + Postgres.Driver: { + {name: "PostgresWithStorage", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithStorage(t, engine, container) + }}, + {name: "PostgresById", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "id", "0", "") + }}, + {name: "PostgresByTimestamp", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "insert_time", "2022-06-03 21:00:00+00", "") + }}, + }, + MySQL.Driver: { + {name: "MySQLWithStorage", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithStorage(t, engine, container) + }}, + {name: "MySQLById", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "id", "0", "") + }}, + {name: "MySQLByTimestamp", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "insert_time", "2022-06-03 21:00:00", "") + }}, + }, + SQLServer.Driver: { + {name: "SQLServerWithStorage", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithStorage(t, engine, container) + }}, + {name: "SQLServerById", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "id", "0", "") + }}, + {name: "SQLServerByTimestamp", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "insert_time", "2022-06-03 21:00:00", "") + }}, + }, + Oracle.Driver: { + {name: "OracleWithStorage", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithStorage(t, engine, container) + }}, + {name: "OracleById", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "ID", "0", "") + }}, + {name: "OracleByTimestamp", runTest: func(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + runTestForLogTrackingWithoutStorage(t, engine, container, "INSERT_TIME", "2022-06-03T21:00:00.000Z", "TO_TIMESTAMP_TZ(:1, 'YYYY-MM-DD\"T\"HH24:MI:SS.FF6TZH:TZM')") + }}, + }, } - for _, tt := range tests { - trackingColumn := tt.engine.ConvertColumnName("id") - trackingStartValue := "0" - t.Run(tt.name, func(t *testing.T) { - tt.engine.CheckCompatibility(t) - dbContainer, dbHost, externalPort := startDbContainerWithConfig(t, tt.engine) + for driver, dbEngineTests := range testGroupedByDbEngine { + t.Run(driver, func(t *testing.T) { + engine := getDbEngine(driver) + engine.CheckCompatibility(t) + dbContainer, err := testcontainers.GenericContainer( + context.Background(), + testcontainers.GenericContainerRequest{ + ContainerRequest: engine.ContainerRequest, + Started: true, + }, + ) + require.NoError(t, err) defer func() { require.NoError(t, dbContainer.Terminate(context.Background())) }() - storageDir := t.TempDir() - storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) - - receiverCreateSettings := receivertest.NewNopSettings() - receiver, config, consumer := createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) - config.CollectionInterval = time.Second - config.Telemetry.Logs.Query = true - config.StorageID = &storageExtension.ID - config.Queries = []sqlquery.Query{ - { - SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, tt.engine.SQLParameter), - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: tt.engine.ConvertColumnName("body"), - AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, - }, - }, - TrackingColumn: trackingColumn, - TrackingStartValue: trackingStartValue, - }, + for _, test := range dbEngineTests { + t.Run(test.name, func(t *testing.T) { + test.runTest(t, engine, dbContainer) + }) } + }) + } +} - host := storagetest.NewStorageHost().WithExtension(storageExtension.ID, storageExtension) - err := receiver.Start(context.Background(), host) - require.NoError(t, err) +func getDbEngine(driver string) DbEngineUnderTest { + switch driver { + case Postgres.Driver: + return Postgres + case MySQL.Driver: + return MySQL + case SQLServer.Driver: + return SQLServer + case Oracle.Driver: + return Oracle + default: + panic(fmt.Sprintf("unsupported driver: %s", driver)) + } +} - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 +func runTestForLogTrackingWithStorage(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) { + dbHost, dbPort := getContainerHostAndPort(t, container, engine.Port) + storageDir := t.TempDir() + storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir) + + trackingColumn := engine.ConvertColumnName("id") + trackingStartValue := "0" + + receiverCreateSettings := receivertest.NewNopSettings() + receiver, config, consumer := createTestLogsReceiver(t, engine.Driver, engine.ConnectionString(dbHost, dbPort), receiverCreateSettings) + config.CollectionInterval = time.Second + config.Telemetry.Logs.Query = true + config.StorageID = &storageExtension.ID + config.Queries = []sqlquery.Query{ + { + SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, engine.SQLParameter(1)), + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: engine.ConvertColumnName("body"), + AttributeColumns: []string{engine.ConvertColumnName("attribute")}, }, - 1*time.Minute, - 1*time.Second, - "failed to receive more than 0 logs", - ) + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, + }, + } - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) + host := storagetest.NewStorageHost().WithExtension(storageExtension.ID, storageExtension) + err := receiver.Start(context.Background(), host) + require.NoError(t, err) - initialLogCount := 5 - require.Equal(t, initialLogCount, consumer.LogRecordCount()) - testAllSimpleLogs(t, consumer.AllLogs(), tt.engine.ConvertColumnName("attribute")) + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 1*time.Minute, + 1*time.Second, + "failed to receive more than 0 logs", + ) + + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) - receiver, config, consumer = createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) - config.CollectionInterval = time.Second - config.Telemetry.Logs.Query = true - config.StorageID = &storageExtension.ID - config.Queries = []sqlquery.Query{ + initialLogCount := 5 + require.Equal(t, initialLogCount, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs(), engine.ConvertColumnName("attribute")) + + receiver, config, consumer = createTestLogsReceiver(t, engine.Driver, engine.ConnectionString(dbHost, dbPort), receiverCreateSettings) + config.CollectionInterval = time.Second + config.Telemetry.Logs.Query = true + config.StorageID = &storageExtension.ID + config.Queries = []sqlquery.Query{ + { + SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, engine.SQLParameter(1)), + Logs: []sqlquery.LogsCfg{ { - SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, tt.engine.SQLParameter), - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: tt.engine.ConvertColumnName("body"), - AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, - }, - }, - TrackingColumn: trackingColumn, - TrackingStartValue: trackingStartValue, + BodyColumn: engine.ConvertColumnName("body"), + AttributeColumns: []string{engine.ConvertColumnName("attribute")}, }, - } - err = receiver.Start(context.Background(), host) - require.NoError(t, err) + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) - time.Sleep(5 * time.Second) + time.Sleep(5 * time.Second) - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) - require.Equal(t, 0, consumer.LogRecordCount()) + require.Equal(t, 0, consumer.LogRecordCount()) - newLogCount := 3 - insertSimpleLogs(t, tt.engine, dbContainer, initialLogCount, newLogCount) + newLogCount := 3 + insertSimpleLogs(t, engine, container, initialLogCount, newLogCount) + defer cleanupSimpleLogs(t, engine, container, initialLogCount) - receiver, config, consumer = createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) - config.CollectionInterval = time.Second - config.Telemetry.Logs.Query = true - config.StorageID = &storageExtension.ID - config.Queries = []sqlquery.Query{ + receiver, config, consumer = createTestLogsReceiver(t, engine.Driver, engine.ConnectionString(dbHost, dbPort), receiverCreateSettings) + config.CollectionInterval = time.Second + config.Telemetry.Logs.Query = true + config.StorageID = &storageExtension.ID + config.Queries = []sqlquery.Query{ + { + SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, engine.SQLParameter(1)), + Logs: []sqlquery.LogsCfg{ { - SQL: fmt.Sprintf("select * from simple_logs where %s > %s", trackingColumn, tt.engine.SQLParameter), - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: tt.engine.ConvertColumnName("body"), - AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, - }, - }, - TrackingColumn: trackingColumn, - TrackingStartValue: trackingStartValue, + BodyColumn: engine.ConvertColumnName("body"), + AttributeColumns: []string{engine.ConvertColumnName("attribute")}, }, - } - err = receiver.Start(context.Background(), host) - require.NoError(t, err) + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, + }, + } + err = receiver.Start(context.Background(), host) + require.NoError(t, err) - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 - }, - 1*time.Minute, - 1*time.Second, - "failed to receive more than 0 logs", - ) + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 + }, + 1*time.Minute, + 1*time.Second, + "failed to receive more than 0 logs", + ) - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) + err = receiver.Shutdown(context.Background()) + require.NoError(t, err) - require.Equal(t, newLogCount, consumer.LogRecordCount()) - }) - } + require.Equal(t, newLogCount, consumer.LogRecordCount()) } -func TestIntegrationLogsTrackingWithoutStorage(t *testing.T) { - tests := []struct { - name string - engine DbEngineUnderTest - trackingColumn string - trackingStartValue string - trackingStartValueFormat string - }{ - {name: "PostgresById", engine: Postgres, trackingColumn: "id", trackingStartValue: "0", trackingStartValueFormat: ""}, - {name: "MySQLById", engine: MySQL, trackingColumn: "id", trackingStartValue: "0", trackingStartValueFormat: ""}, - {name: "SQLServerById", engine: SQLServer, trackingColumn: "id", trackingStartValue: "0", trackingStartValueFormat: ""}, - {name: "OracleById", engine: Oracle, trackingColumn: "ID", trackingStartValue: "0", trackingStartValueFormat: ""}, - {name: "PostgresByTimestamp", engine: Postgres, trackingColumn: "insert_time", trackingStartValue: "2022-06-03 21:00:00+00", trackingStartValueFormat: ""}, - {name: "MySQLByTimestamp", engine: MySQL, trackingColumn: "insert_time", trackingStartValue: "2022-06-03 21:00:00", trackingStartValueFormat: ""}, - {name: "SQLServerByTimestamp", engine: SQLServer, trackingColumn: "insert_time", trackingStartValue: "2022-06-03 21:00:00", trackingStartValueFormat: ""}, - {name: "OracleByTimestamp", engine: Oracle, trackingColumn: "INSERT_TIME", trackingStartValue: "2022-06-03T21:00:00.000Z", trackingStartValueFormat: "TO_TIMESTAMP_TZ(:1, 'YYYY-MM-DD\"T\"HH24:MI:SS.FF6TZH:TZM')"}, +func runTestForLogTrackingWithoutStorage(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container, trackingColumn, trackingStartValue, trackingStartValueFormat string) { + receiverCreateSettings := receivertest.NewNopSettings() + dbHost, dbPort := getContainerHostAndPort(t, container, engine.Port) + receiver, config, consumer := createTestLogsReceiver(t, engine.Driver, engine.ConnectionString(dbHost, dbPort), receiverCreateSettings) + config.CollectionInterval = 100 * time.Millisecond + config.Telemetry.Logs.Query = true + + trackingColumn = engine.ConvertColumnName(trackingColumn) + trackingColumnParameter := engine.SQLParameter(1) + if trackingStartValueFormat != "" { + trackingColumnParameter = trackingStartValueFormat } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.engine.CheckCompatibility(t) - dbContainer, dbHost, externalPort := startDbContainerWithConfig(t, tt.engine) - defer func() { - require.NoError(t, dbContainer.Terminate(context.Background())) - }() - - receiverCreateSettings := receivertest.NewNopSettings() - receiver, config, consumer := createTestLogsReceiver(t, tt.engine.Driver, tt.engine.ConnectionString(dbHost, externalPort), receiverCreateSettings) - config.CollectionInterval = 100 * time.Millisecond - config.Telemetry.Logs.Query = true - - trackingColumn := tt.engine.ConvertColumnName(tt.trackingColumn) - trackingColumnParameter := tt.engine.SQLParameter - if tt.trackingStartValueFormat != "" { - trackingColumnParameter = tt.trackingStartValueFormat - } - - config.Queries = []sqlquery.Query{ + config.Queries = []sqlquery.Query{ + { + SQL: fmt.Sprintf("select * from simple_logs where %s > %s order by %s asc", trackingColumn, trackingColumnParameter, trackingColumn), + Logs: []sqlquery.LogsCfg{ { - SQL: fmt.Sprintf("select * from simple_logs where %s > %s order by %s asc", trackingColumn, trackingColumnParameter, trackingColumn), - Logs: []sqlquery.LogsCfg{ - { - BodyColumn: tt.engine.ConvertColumnName("body"), - AttributeColumns: []string{tt.engine.ConvertColumnName("attribute")}, - }, - }, - TrackingColumn: trackingColumn, - TrackingStartValue: tt.trackingStartValue, - }, - } - host := componenttest.NewNopHost() - err := receiver.Start(context.Background(), host) - require.NoError(t, err) - - require.Eventuallyf( - t, - func() bool { - return consumer.LogRecordCount() > 0 + BodyColumn: engine.ConvertColumnName("body"), + AttributeColumns: []string{engine.ConvertColumnName("attribute")}, }, - 1*time.Minute, - 500*time.Millisecond, - "failed to receive more than 0 logs", - ) - require.Equal(t, 5, consumer.LogRecordCount()) - testAllSimpleLogs(t, consumer.AllLogs(), tt.engine.ConvertColumnName("attribute")) - - err = receiver.Shutdown(context.Background()) - require.NoError(t, err) - }) + }, + TrackingColumn: trackingColumn, + TrackingStartValue: trackingStartValue, + }, } -} + host := componenttest.NewNopHost() + err := receiver.Start(context.Background(), host) + require.NoError(t, err) -func startDbContainerWithConfig(t *testing.T, engine DbEngineUnderTest) (testcontainers.Container, string, nat.Port) { - ctx := context.Background() - container, err := testcontainers.GenericContainer( - ctx, - testcontainers.GenericContainerRequest{ - ContainerRequest: engine.ContainerRequest, - Started: true, + require.Eventuallyf( + t, + func() bool { + return consumer.LogRecordCount() > 0 }, + 1*time.Minute, + 500*time.Millisecond, + "failed to receive more than 0 logs", ) + require.Equal(t, 5, consumer.LogRecordCount()) + testAllSimpleLogs(t, consumer.AllLogs(), engine.ConvertColumnName("attribute")) + + err = receiver.Shutdown(context.Background()) require.NoError(t, err) - dbPort, err := container.MappedPort(ctx, nat.Port(engine.Port)) +} + +func getContainerHostAndPort(t *testing.T, container testcontainers.Container, port string) (string, nat.Port) { + dbPort, err := container.MappedPort(context.Background(), nat.Port(port)) require.NoError(t, err) - host, err := container.Host(ctx) + dbHost, err := container.Host(context.Background()) require.NoError(t, err) - return container, host, dbPort + return dbHost, dbPort } func insertSimpleLogs(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container, existingLogID, newLogCount int) { + db := openDatabase(t, engine, container) + defer db.Close() + + stmt := prepareStatement(t, db, fmt.Sprintf("INSERT INTO simple_logs (id, body, attribute) VALUES (%s, %s, %s)", engine.SQLParameter(1), engine.SQLParameter(2), engine.SQLParameter(3))) + defer stmt.Close() + + for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { + _, err := stmt.Exec(newLogID, fmt.Sprintf("another log %d", newLogID), "TLSv1.2") + require.NoError(t, err) + } +} + +func cleanupSimpleLogs(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container, existingLogID int) { + db := openDatabase(t, engine, container) + defer db.Close() + + deleteStatement := "DELETE FROM simple_logs WHERE id > " + engine.SQLParameter(1) + stmt := prepareStatement(t, db, deleteStatement) + defer stmt.Close() + + _, err := stmt.Exec(existingLogID) + require.NoError(t, err) +} + +func openDatabase(t *testing.T, engine DbEngineUnderTest, container testcontainers.Container) *sql.DB { externalPort, err := container.MappedPort(context.Background(), nat.Port(engine.Port)) require.NoError(t, err) @@ -394,13 +468,13 @@ func insertSimpleLogs(t *testing.T, engine DbEngineUnderTest, container testcont db, err := sql.Open(engine.Driver, engine.ConnectionString(host, externalPort)) require.NoError(t, err) - defer db.Close() + return db +} - for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { - query := fmt.Sprintf("insert into simple_logs (id, insert_time, body, attribute) values (%d, %s, 'another log %d', 'TLSv1.2')", newLogID, engine.CurrentTimestampFunction, newLogID) //nolint:gosec // Ignore, not possible to use prepared statements here for currentTimestampFunction - _, err := db.Exec(query) - require.NoError(t, err) - } +func prepareStatement(t *testing.T, db *sql.DB, query string) *sql.Stmt { + stmt, err := db.Prepare(query) + require.NoError(t, err) + return stmt } func createTestLogsReceiver(t *testing.T, driver, dataSource string, receiverCreateSettings receiver.Settings) (*logsReceiver, *Config, *consumertest.LogsSink) { diff --git a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql index e1ac6a2e21ad..82098a7e796a 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql @@ -21,7 +21,7 @@ values ('Mission Impossible', 'Action', 7.1); create table simple_logs ( id integer, - insert_time timestamp, + insert_time timestamp default now(), body text, attribute text, primary key (id) diff --git a/receiver/sqlqueryreceiver/testdata/integration/oracle/Dockerfile.oracledb b/receiver/sqlqueryreceiver/testdata/integration/oracle/Dockerfile.oracledb deleted file mode 100644 index 49433d84fe35..000000000000 --- a/receiver/sqlqueryreceiver/testdata/integration/oracle/Dockerfile.oracledb +++ /dev/null @@ -1,9 +0,0 @@ -FROM gvenzl/oracle-xe:21-slim-faststart - -ENV ORACLE_PASSWORD=mysecurepassword - -RUN mkdir -p /container-entrypoint-initdb.d - -COPY init.sql /container-entrypoint-initdb.d/startup.sql - -HEALTHCHECK CMD "$ORACLE_BASE/healthcheck.sh" diff --git a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql index fe0c5e4c09a7..fe2c24d5e25d 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql @@ -1,10 +1,9 @@ /* The alter session command is required to enable user creation in an Oracle docker container This command shouldn't be used outside of test environments. */ alter session set "_ORACLE_SCRIPT"=true; -CREATE USER OTEL IDENTIFIED BY "p@ssw%rd"; -GRANT CREATE SESSION TO OTEL; -GRANT ALL PRIVILEGES TO OTEL; -ALTER USER OTEL QUOTA UNLIMITED ON USERS; +ALTER SESSION SET CONTAINER=FREEPDB1; +CREATE USER OTEL IDENTIFIED BY otel QUOTA UNLIMITED ON USERS +GRANT CONNECT, RESOURCE TO OTEL; -- Switch to the OTEL schema ALTER SESSION SET CURRENT_SCHEMA = OTEL; @@ -30,7 +29,7 @@ values ('Mission Impossible', 'Action', 7.1); create table simple_logs ( id number primary key, - insert_time timestamp with time zone, + insert_time timestamp with time zone default SYSTIMESTAMP, body varchar2(4000), attribute varchar2(100) ); diff --git a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql index 4ef4274072e5..fd374f3c3d82 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql @@ -24,11 +24,11 @@ grant select on movie to otel; create table simple_logs ( id integer primary key, - insert_time timestamp, + insert_time timestamp default now(), body text, attribute text ); -grant select, insert on simple_logs to otel; +grant select, insert, delete on simple_logs to otel; insert into simple_logs (id, insert_time, body, attribute) values (1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'), diff --git a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql index 1d3422c8cfc3..a2dd88cc87ca 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/sqlserver/init.sql @@ -35,7 +35,7 @@ PRINT 'Data inserted into movie table.'; CREATE TABLE simple_logs ( id INT PRIMARY KEY, - insert_time DATETIME2, + insert_time DATETIME2 default GETDATE(), body NVARCHAR(MAX), attribute NVARCHAR(100) );