-
Notifications
You must be signed in to change notification settings - Fork 317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Chore.bol load r7 #5237
Draft
lvrach
wants to merge
29
commits into
master
Choose a base branch
from
chore.bol-load-r7
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+14,030
−50
Draft
Chore.bol load r7 #5237
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
98f7827
feat: warehouse transformer
achettyiitr a1fd52e
Feat.warehouse transformer fuzz (#5206)
lvrach 14aa043
chore: adding failed seed data files
achettyiitr b388415
Add client side load balancing support for transformer requests
snarkychef 85befe3
exp: jobsdb bench
lvrach 937c8d0
Configured CSLB to our use case
snarkychef 872a449
chore: adapt internal migrations to accommodate bytea columns
Sidddddarth f538f03
chore: remove compression
Sidddddarth 8eb9dc3
fix: cast to text before converting to binary
Sidddddarth 3ceca48
Merge remote-tracking branch 'origin/feat.transformer-cslb' into chor…
lvrach 06dbbe4
chore: check the column type on src and dest dataset's jobs table
Sidddddarth 12a8e21
fix: refresh connections to avoid hotspotting of load distribution
snarkychef 679daa9
fixes
snarkychef b3cd6ea
Implemented for processor transformations
snarkychef 4e3049e
Merge remote-tracking branch 'origin/fix.refresh-transformer-client' …
lvrach 2819bc7
chore: add stat in both processor and badgerDedup
Sidddddarth 8e12242
exp: use simple byte indexing instead of marshalling
Sidddddarth 9cb1ff2
fix panic
lvrach a231ddb
Merge remote-tracking branch 'origin/feat.warehouse-transformer' into…
lvrach 60e34b8
chore: use hash for pr builds
lvrach 37f0270
use go code for warehouse transformations
lvrach 7b35136
fixes
lvrach 1316780
cache for better performance
lvrach 35ecbbb
sync map
lvrach 93ee8db
additional optimisations
lvrach 5ff3872
fix: dedup duration stat name fix
Sidddddarth cf2e146
chore: add metric from sentAt
lvrach 445896a
chore: match buckets
lvrach b85553d
find sent at inside message
lvrach File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
CREATE EXTENSION IF NOT EXISTS pg_stat_statements; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,331 @@ | ||
package main | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"database/sql" | ||
"encoding/json" | ||
"flag" | ||
"fmt" | ||
"log" | ||
"math/rand" | ||
"os" | ||
"os/signal" | ||
"sync/atomic" | ||
"syscall" | ||
"time" | ||
|
||
"golang.org/x/sync/errgroup" | ||
|
||
"github.com/google/uuid" | ||
_ "github.com/lib/pq" // PostgreSQL driver | ||
"github.com/ory/dockertest/v3" | ||
"github.com/ory/dockertest/v3/docker" | ||
"github.com/phayes/freeport" | ||
|
||
"github.com/rudderlabs/rudder-go-kit/config" | ||
"github.com/rudderlabs/rudder-go-kit/stats" | ||
|
||
_ "embed" | ||
|
||
"github.com/rudderlabs/rudder-server/jobsdb" | ||
) | ||
|
||
var (
	duration        time.Duration // total time the benchmark is allowed to run (-duration)
	batchSize       int           // number of jobs per store/read batch (-batch)
	eventCounter    int64         // atomic counter for total events processed
	writeWorkers    int           // number of concurrent store workers (-writers)
	binaryPayload   bool          // passes jobsdb.WithBinaryPayload when set (-binary)
	compression     bool          // NOTE(review): flag is registered but not read anywhere in this file — confirm intended use (-compression)
	postgresVersion string        // PostgreSQL docker image tag to run (-postgres-version)
)

// initSQL is mounted into the container's docker-entrypoint-initdb.d.
//go:embed init.sql
var initSQL string

// payloadJSON is the raw event payload template embedded at build time.
//go:embed payload.json
var payloadJSON []byte

// eventPayload is the pre-marshalled payload reused for every stored job.
var eventPayload []byte
|
||
func init() { | ||
flag.DurationVar(&duration, "duration", 1*time.Minute, "Duration to run the test") | ||
flag.IntVar(&batchSize, "batch", 10000, "Batch size for storing and reading events") | ||
flag.IntVar(&writeWorkers, "writers", 2, "Number of write workers") | ||
flag.BoolVar(&binaryPayload, "binary", false, "Use binary payload") | ||
flag.BoolVar(&compression, "compression", false, "Enable payload compression") | ||
flag.StringVar(&postgresVersion, "postgres-version", "15", "PostgreSQL version to use") | ||
|
||
var err error | ||
eventPayload, err = json.Marshal(json.RawMessage(payloadJSON)) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// SimpleCleaner is a minimal stand-in for the testing-style cleaner
// expected by jobsdb: it runs cleanup callbacks immediately, swallows all
// log output, and never reports failure.
type SimpleCleaner struct{}

// Cleanup invokes f right away instead of deferring it to teardown.
func (sc *SimpleCleaner) Cleanup(f func()) {
	f()
}

// Log discards its arguments.
func (sc *SimpleCleaner) Log(args ...interface{}) {}

// Logf discards its format string and arguments.
func (sc *SimpleCleaner) Logf(format string, args ...interface{}) {}

// Failed always reports success.
func (sc *SimpleCleaner) Failed() bool {
	return false
}
|
||
// generateRandomString returns a random alphanumeric string of the given
// length; it is used to mint a throwaway database password.
//
// Fix: the previous version built a brand-new rand.Source seeded with
// time.Now().UnixNano() on every call, so two calls within the same clock
// tick produced identical "random" strings. The package-level generator
// (automatically seeded since Go 1.20) avoids that and is cheaper.
func generateRandomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}
|
||
func main() { | ||
flag.Parse() | ||
|
||
// Create a new Docker pool | ||
pool, err := dockertest.NewPool("") | ||
if err != nil { | ||
log.Fatalf("Could not connect to docker: %s", err) | ||
} | ||
|
||
// Generate random password | ||
randomPassword := generateRandomString(16) | ||
|
||
// Find a free port | ||
freePort, err := freeport.GetFreePort() | ||
if err != nil { | ||
log.Fatalf("Could not get free port: %s", err) | ||
} | ||
|
||
// Create a temporary file for the init script | ||
tmpfile, err := os.CreateTemp("", "init.sql") | ||
if err != nil { | ||
log.Fatalf("Could not create temp file: %s", err) | ||
} | ||
defer os.Remove(tmpfile.Name()) | ||
|
||
if _, err := tmpfile.WriteString(initSQL); err != nil { | ||
log.Fatalf("Could not write to temp file: %s", err) | ||
} | ||
if err := tmpfile.Close(); err != nil { | ||
log.Fatalf("Could not close temp file: %s", err) | ||
} | ||
|
||
// Start a PostgreSQL container | ||
// docker.io/rudderstack/postgres:bitnamiDoremonPocket15 | ||
resource, err := pool.RunWithOptions(&dockertest.RunOptions{ | ||
Repository: "postgres", | ||
Tag: postgresVersion, // Use the configurable version | ||
Env: []string{ | ||
"POSTGRES_PASSWORD=" + randomPassword, | ||
"POSTGRES_DB=testdb", | ||
}, | ||
Mounts: []string{ | ||
fmt.Sprintf("%s:/docker-entrypoint-initdb.d/init.sql", tmpfile.Name()), | ||
}, | ||
Cmd: []string{ | ||
"postgres", | ||
"-c", "shared_preload_libraries=pg_stat_statements", | ||
}, | ||
PortBindings: map[docker.Port][]docker.PortBinding{ | ||
"5432/tcp": {{HostIP: "0.0.0.0", HostPort: fmt.Sprintf("%d", freePort)}}, | ||
}, | ||
}, func(config *docker.HostConfig) { | ||
config.AutoRemove = true | ||
config.RestartPolicy = docker.RestartPolicy{ | ||
Name: "no", | ||
} | ||
}) | ||
if err != nil { | ||
log.Fatalf("Could not start resource: %s", err) | ||
} | ||
|
||
// Construct the database URL | ||
dbURL := fmt.Sprintf("postgres://postgres:%s@localhost:%d/testdb?sslmode=disable", randomPassword, freePort) | ||
|
||
// Print the database URL | ||
fmt.Printf("Database URL: %s\n", dbURL) | ||
|
||
// Create a connection to the database | ||
var db *sql.DB | ||
err = pool.Retry(func() error { | ||
var err error | ||
db, err = sql.Open("postgres", dbURL) | ||
if err != nil { | ||
return err | ||
} | ||
return db.Ping() | ||
}) | ||
if err != nil { | ||
log.Fatalf("Could not connect to docker: %s", err) | ||
} | ||
|
||
fmt.Println("Successfully connected to database") | ||
|
||
// Ensure the container is removed when we're done | ||
defer func() { | ||
if err := pool.Purge(resource); err != nil { | ||
log.Printf("Could not purge resource: %s", err) | ||
} | ||
}() | ||
|
||
c := config.New() | ||
c.Set("INSTANCE_ID", "1") | ||
|
||
opts := []jobsdb.OptsFunc{ | ||
jobsdb.WithDBHandle(db), jobsdb.WithConfig(c), jobsdb.WithStats(stats.NOP), | ||
} | ||
if binaryPayload { | ||
opts = append(opts, | ||
jobsdb.WithBinaryPayload(config.SingleValueLoader(true)), | ||
) | ||
} | ||
|
||
// Use the db connection for your jobsdb | ||
jobsDB := jobsdb.NewForReadWrite("bench_db", opts...) | ||
if err := jobsDB.Start(); err != nil { | ||
panic(err) | ||
} | ||
|
||
defer jobsDB.Close() | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), duration) | ||
defer cancel() | ||
|
||
// Create a separate context for signal handling | ||
sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | ||
defer stop() | ||
|
||
g, ctx := errgroup.WithContext(ctx) | ||
start := time.Now() | ||
|
||
// Start multiple store workers | ||
for i := 0; i < writeWorkers; i++ { | ||
g.Go(func() error { | ||
return storeWorker(ctx, jobsDB) | ||
}) | ||
} | ||
|
||
// Start the read worker | ||
g.Go(func() error { | ||
return readWorker(ctx, jobsDB) | ||
}) | ||
|
||
// Start the status update goroutine | ||
g.Go(func() error { | ||
ticker := time.NewTicker(5 * time.Second) | ||
defer ticker.Stop() | ||
lastCount := int64(0) | ||
lastTime := start | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
case t := <-ticker.C: | ||
currentCount := atomic.LoadInt64(&eventCounter) | ||
currentTime := t | ||
duration := currentTime.Sub(lastTime) | ||
rate := float64(currentCount-lastCount) / duration.Seconds() | ||
|
||
fmt.Printf("[%s] Processed %d events. Current rate: %.2f events/second\n", | ||
currentTime.Format("15:04:05"), currentCount, rate) | ||
|
||
lastCount = currentCount | ||
lastTime = currentTime | ||
} | ||
} | ||
}) | ||
|
||
// Wait for either the workers to finish or a signal | ||
select { | ||
case <-ctx.Done(): | ||
if err := g.Wait(); err != nil && err != context.DeadlineExceeded { | ||
log.Printf("Error occurred: %v", err) | ||
} | ||
case <-sigCtx.Done(): | ||
log.Println("Received termination signal") | ||
cancel() // Cancel the worker context | ||
if err := g.Wait(); err != nil && err != context.Canceled { | ||
log.Printf("Error occurred during shutdown: %v", err) | ||
} | ||
} | ||
|
||
elapsed := time.Since(start) | ||
totalEvents := atomic.LoadInt64(&eventCounter) | ||
fmt.Printf("\nFinal results:\n") | ||
fmt.Printf("Processed %d events in %v\n", totalEvents, elapsed) | ||
fmt.Printf("Average rate: %.2f events/second\n", float64(totalEvents)/elapsed.Seconds()) | ||
fmt.Printf("Database URL: %s\n", dbURL) | ||
Check failure Code scanning / CodeQL Clear-text logging of sensitive information High Sensitive data returned by an access to randomPassword Error loading related location Loading |
||
|
||
// Wait for user confirmation before exiting | ||
fmt.Println("\nPress Enter to quit...") | ||
bufio.NewReader(os.Stdin).ReadString('\n') | ||
} | ||
|
||
func storeWorker(ctx context.Context, db jobsdb.JobsDB) error { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
batch := make([]*jobsdb.JobT, 0, batchSize) | ||
for j := 0; j < batchSize; j++ { | ||
job := &jobsdb.JobT{ | ||
UserID: fmt.Sprintf("user-%d", atomic.AddInt64(&eventCounter, 1)), | ||
UUID: uuid.New(), | ||
Parameters: []byte(fmt.Sprintf(`{"event_id": %d}`, atomic.LoadInt64(&eventCounter))), | ||
CustomVal: "benchmark", | ||
EventPayload: eventPayload, | ||
} | ||
batch = append(batch, job) | ||
} | ||
|
||
if err := db.Store(ctx, batch); err != nil { | ||
return fmt.Errorf("failed to store batch: %w", err) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func readWorker(ctx context.Context, db jobsdb.JobsDB) error { | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
default: | ||
jobs, err := db.GetUnprocessed(ctx, jobsdb.GetQueryParams{ | ||
JobsLimit: batchSize, | ||
// ... existing query params ... | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed to retrieve jobs: %w", err) | ||
} | ||
|
||
if len(jobs.Jobs) == 0 { | ||
time.Sleep(10 * time.Millisecond) // Avoid tight loop if no jobs | ||
continue | ||
} | ||
|
||
var statusList []*jobsdb.JobStatusT | ||
for _, job := range jobs.Jobs { | ||
statusList = append(statusList, &jobsdb.JobStatusT{ | ||
JobID: job.JobID, | ||
JobState: jobsdb.Succeeded.State, | ||
AttemptNum: 1, | ||
ExecTime: time.Now(), | ||
RetryTime: time.Now(), | ||
ErrorCode: "200", | ||
ErrorResponse: []byte(`{"success": true}`), | ||
Parameters: []byte(`{"event_id": 1}`), | ||
}) | ||
} | ||
|
||
if err := db.UpdateJobStatus(ctx, statusList, []string{}, []jobsdb.ParameterFilterT{}); err != nil { | ||
return fmt.Errorf("failed to update job status: %w", err) | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
Copilot Autofix AI 8 days ago
To fix the problem, we should avoid logging the sensitive information contained in
randomPassword
. Instead of printing the entiredbURL
, we can print a sanitized version that omits the password. This way, we maintain the functionality of logging the database URL without exposing sensitive information.