Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore.bol load r7 #5237

Draft
wants to merge 29 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
98f7827
feat: warehouse transformer
achettyiitr Oct 7, 2024
a1fd52e
Feat.warehouse transformer fuzz (#5206)
lvrach Oct 18, 2024
14aa043
chore: adding failed seed data files
achettyiitr Oct 21, 2024
b388415
Add client side load balancing support for transformer requests
snarkychef Oct 21, 2024
85befe3
exp: jobsdb bench
lvrach Oct 22, 2024
937c8d0
Configured CSLB to our use case
snarkychef Oct 23, 2024
872a449
chore: adapt internal migrations to accommodate bytea columns
Sidddddarth Oct 24, 2024
f538f03
chore: remove compression
Sidddddarth Oct 24, 2024
8eb9dc3
fix: cast to text before converting to binary
Sidddddarth Oct 24, 2024
3ceca48
Merge remote-tracking branch 'origin/feat.transformer-cslb' into chor…
lvrach Oct 24, 2024
06dbbe4
chore: check the column type on src and dest dataset's jobs table
Sidddddarth Oct 24, 2024
12a8e21
fix: refresh connections to avoid hotspotting of load distribution
snarkychef Oct 24, 2024
679daa9
fixes
snarkychef Oct 24, 2024
b3cd6ea
Implemented for processor transformations
snarkychef Oct 24, 2024
4e3049e
Merge remote-tracking branch 'origin/fix.refresh-transformer-client' …
lvrach Oct 24, 2024
2819bc7
chore: add stat in both processor and badgerDedup
Sidddddarth Oct 25, 2024
8e12242
exp: use simple byte indexing instead of marshalling
Sidddddarth Oct 22, 2024
9cb1ff2
fix panic
lvrach Oct 25, 2024
a231ddb
Merge remote-tracking branch 'origin/feat.warehouse-transformer' into…
lvrach Oct 26, 2024
60e34b8
chore: use hash for pr builds
lvrach Oct 26, 2024
37f0270
use go code for warehouse transformations
lvrach Oct 26, 2024
7b35136
fixes
lvrach Oct 26, 2024
1316780
cache for better performance
lvrach Oct 26, 2024
35ecbbb
sync map
lvrach Oct 26, 2024
93ee8db
additional optimisations
lvrach Oct 26, 2024
5ff3872
fix: dedup duration stat name fix
Sidddddarth Oct 29, 2024
cf2e146
chore: add metric from sentAt
lvrach Oct 30, 2024
445896a
chore: match buckets
lvrach Oct 30, 2024
b85553d
find sent at inside message
lvrach Oct 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions .github/workflows/builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ env:
type=ref,event=branch
type=raw,value=1-alpine,enable=${{ github.event_name == 'release' && !github.event.release.prerelease }}
type=raw,value=latest,enable=${{ github.event_name == 'release' && !github.event.release.prerelease }}
type=raw,value=${{ github.head_ref }},enable=${{ github.event_name == 'pull_request' }}
type=raw,value=${{ github.head_ref }}-${{ github.sha }},enable=${{ github.event_name == 'pull_request' }}
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
Expand All @@ -31,7 +31,7 @@ env:
docker_ent_tags: |
type=ref,event=branch
type=raw,value=latest,enable=${{ github.event_name == 'release' }}
type=raw,value=${{ github.head_ref }},enable=${{ github.event_name == 'pull_request' }}
type=raw,value=${{ github.head_ref }}-${{ github.sha }},enable=${{ github.event_name == 'pull_request' }}
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
Expand All @@ -41,7 +41,7 @@ env:
docker_sbsvc_tags: |
type=ref,event=branch
type=raw,value=latest,enable=${{ github.event_name == 'release' }}
type=raw,value=${{ github.head_ref }},enable=${{ github.event_name == 'pull_request' }}
type=raw,value=${{ github.head_ref }}-${{ github.sha }},enable=${{ github.event_name == 'pull_request' }}
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=semver,pattern={{major}}
Expand Down Expand Up @@ -417,3 +417,4 @@ jobs:
docker buildx imagetools create -t $tag $arm_tag $amd_tag

done <<< "${{ needs.docker-sbsvc-meta.outputs.tags }}"

3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/databricks/databricks-sql-go v1.6.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.3.1
github.com/dlclark/regexp2 v1.11.4
github.com/docker/docker v27.3.1+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
github.com/docker/cli-docs-tool v0.8.0 // indirect
Expand Down Expand Up @@ -288,6 +288,7 @@ require (
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/rs/zerolog v1.33.0 // indirect
github.com/rudderlabs/cslb v0.0.0-20241022023330-a4acff3d7b42
github.com/rudderlabs/goqu/v10 v10.3.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,8 @@ github.com/rudderlabs/bing-ads-go-sdk v0.2.3 h1:jR85Ep6X6SkiesaI7Q7WJHs/65SgByZb
github.com/rudderlabs/bing-ads-go-sdk v0.2.3/go.mod h1:38Yig/202ni4GcloXhaTeH1LqUyFPEx6iljnFa+IDQI=
github.com/rudderlabs/compose-test v0.1.3 h1:uyep6jDCIF737sfv4zIaMsKRQKX95IDz5Xbxor+x0kU=
github.com/rudderlabs/compose-test v0.1.3/go.mod h1:tuvS1eQdSfwOYv1qwyVAcpdJxPLQXJgy5xGDd/9XmMg=
github.com/rudderlabs/cslb v0.0.0-20241022023330-a4acff3d7b42 h1:GaVIxHlLdetHjwv4unLCtoRCriT4+kE4m+exC+6VyKc=
github.com/rudderlabs/cslb v0.0.0-20241022023330-a4acff3d7b42/go.mod h1:1dVMz4io4JnPCXNF7xmsTukgwGN6tiA6lyNFNhgdHn8=
github.com/rudderlabs/goqu/v10 v10.3.1 h1:rnfX+b4EwBWQ2UQfIGeEW299JBBkK5biEbnf7Kq4/Gg=
github.com/rudderlabs/goqu/v10 v10.3.1/go.mod h1:LH2vI5gGHBxEQuESqFyk5ZA2anGINc8o25hbidDWOYw=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
Expand Down
1 change: 1 addition & 0 deletions jobsdb/cmd/bench/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
331 changes: 331 additions & 0 deletions jobsdb/cmd/bench/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
package main

import (
"bufio"
"context"
"database/sql"
"encoding/json"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

"golang.org/x/sync/errgroup"

"github.com/google/uuid"
_ "github.com/lib/pq" // PostgreSQL driver
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
"github.com/phayes/freeport"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"

_ "embed"

"github.com/rudderlabs/rudder-server/jobsdb"
)

// Benchmark configuration, populated from command-line flags in init.
var (
	duration        time.Duration // Duration to run the test
	batchSize       int           // Batch size for storing and reading events
	eventCounter    int64         // Atomic counter for total events processed
	writeWorkers    int           // Number of write workers
	binaryPayload   bool          // Use binary payload (see -binary flag)
	compression     bool          // Enable payload compression (see -compression flag)
	postgresVersion string        // New variable for PostgreSQL version (container image tag)
)

// initSQL is mounted into the container's init directory; it enables the
// pg_stat_statements extension (see init.sql).
//go:embed init.sql
var initSQL string

// payloadJSON is the raw embedded event payload template.
//go:embed payload.json
var payloadJSON []byte

// eventPayload holds the validated form of payloadJSON, prepared in init.
var eventPayload []byte

// init registers the benchmark's command-line flags and prepares the shared
// event payload from the embedded payload.json file.
func init() {
	flag.DurationVar(&duration, "duration", time.Minute, "Duration to run the test")
	flag.IntVar(&batchSize, "batch", 10000, "Batch size for storing and reading events")
	flag.IntVar(&writeWorkers, "writers", 2, "Number of write workers")
	flag.BoolVar(&binaryPayload, "binary", false, "Use binary payload")
	flag.BoolVar(&compression, "compression", false, "Enable payload compression")
	flag.StringVar(&postgresVersion, "postgres-version", "15", "PostgreSQL version to use")

	// Re-marshalling the raw message validates the embedded JSON up front;
	// a malformed payload file is a programming error, so panic is fine here.
	payload, err := json.Marshal(json.RawMessage(payloadJSON))
	if err != nil {
		panic(err)
	}
	eventPayload = payload
}

// SimpleCleaner is a minimal stand-in for the testing-style cleanup/logging
// interface: cleanup functions run immediately, log output is discarded,
// and it never reports failure.
type SimpleCleaner struct{}

// Cleanup invokes f right away instead of deferring it.
func (sc *SimpleCleaner) Cleanup(f func()) { f() }

// Log discards its arguments.
func (sc *SimpleCleaner) Log(args ...interface{}) {}

// Logf discards the format string and its arguments.
func (sc *SimpleCleaner) Logf(format string, args ...interface{}) {}

// Failed always reports false.
func (sc *SimpleCleaner) Failed() bool { return false }

// generateRandomString returns a random alphanumeric string of the given
// length; it is used to generate a throwaway database password.
//
// It draws from the package-level math/rand source, which is seeded
// randomly at program start (Go 1.20+). The previous implementation built
// a brand-new time-seeded source on every call, so two calls within the
// same nanosecond returned identical "random" strings.
//
// NOTE(review): for anything beyond a local benchmark container,
// crypto/rand would be the appropriate source for password material.
func generateRandomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}

func main() {
flag.Parse()

// Create a new Docker pool
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

// Generate random password
randomPassword := generateRandomString(16)

// Find a free port
freePort, err := freeport.GetFreePort()
if err != nil {
log.Fatalf("Could not get free port: %s", err)
}

// Create a temporary file for the init script
tmpfile, err := os.CreateTemp("", "init.sql")
if err != nil {
log.Fatalf("Could not create temp file: %s", err)
}
defer os.Remove(tmpfile.Name())

if _, err := tmpfile.WriteString(initSQL); err != nil {
log.Fatalf("Could not write to temp file: %s", err)
}
if err := tmpfile.Close(); err != nil {
log.Fatalf("Could not close temp file: %s", err)
}

// Start a PostgreSQL container
// docker.io/rudderstack/postgres:bitnamiDoremonPocket15
resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "postgres",
Tag: postgresVersion, // Use the configurable version
Env: []string{
"POSTGRES_PASSWORD=" + randomPassword,
"POSTGRES_DB=testdb",
},
Mounts: []string{
fmt.Sprintf("%s:/docker-entrypoint-initdb.d/init.sql", tmpfile.Name()),
},
Cmd: []string{
"postgres",
"-c", "shared_preload_libraries=pg_stat_statements",
},
PortBindings: map[docker.Port][]docker.PortBinding{
"5432/tcp": {{HostIP: "0.0.0.0", HostPort: fmt.Sprintf("%d", freePort)}},
},
}, func(config *docker.HostConfig) {
config.AutoRemove = true
config.RestartPolicy = docker.RestartPolicy{
Name: "no",
}
})
if err != nil {
log.Fatalf("Could not start resource: %s", err)
}

// Construct the database URL
dbURL := fmt.Sprintf("postgres://postgres:%s@localhost:%d/testdb?sslmode=disable", randomPassword, freePort)

// Print the database URL
fmt.Printf("Database URL: %s\n", dbURL)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

Sensitive data returned by an access to randomPassword
flows to a logging call.

Copilot Autofix AI 8 days ago

To fix the problem, we should avoid logging the sensitive information contained in randomPassword. Instead of printing the entire dbURL, we can print a sanitized version that omits the password. This way, we maintain the functionality of logging the database URL without exposing sensitive information.

Suggested changeset 1
jobsdb/cmd/bench/main.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/jobsdb/cmd/bench/main.go b/jobsdb/cmd/bench/main.go
--- a/jobsdb/cmd/bench/main.go
+++ b/jobsdb/cmd/bench/main.go
@@ -146,5 +146,6 @@
 	dbURL := fmt.Sprintf("postgres://postgres:%s@localhost:%d/testdb?sslmode=disable", randomPassword, freePort)
+	sanitizedDbURL := fmt.Sprintf("postgres://postgres:****@localhost:%d/testdb?sslmode=disable", freePort)
 
-	// Print the database URL
-	fmt.Printf("Database URL: %s\n", dbURL)
+	// Print the sanitized database URL
+	fmt.Printf("Database URL: %s\n", sanitizedDbURL)
 
EOF
@@ -146,5 +146,6 @@
dbURL := fmt.Sprintf("postgres://postgres:%s@localhost:%d/testdb?sslmode=disable", randomPassword, freePort)
sanitizedDbURL := fmt.Sprintf("postgres://postgres:****@localhost:%d/testdb?sslmode=disable", freePort)

// Print the database URL
fmt.Printf("Database URL: %s\n", dbURL)
// Print the sanitized database URL
fmt.Printf("Database URL: %s\n", sanitizedDbURL)

Copilot is powered by AI and may make mistakes. Always verify output.
Positive Feedback
Negative Feedback

Provide additional feedback

Please help us improve GitHub Copilot by sharing more details about this comment.

Please select one or more of the options

// Create a connection to the database
var db *sql.DB
err = pool.Retry(func() error {
var err error
db, err = sql.Open("postgres", dbURL)
if err != nil {
return err
}
return db.Ping()
})
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

fmt.Println("Successfully connected to database")

// Ensure the container is removed when we're done
defer func() {
if err := pool.Purge(resource); err != nil {
log.Printf("Could not purge resource: %s", err)
}
}()

c := config.New()
c.Set("INSTANCE_ID", "1")

opts := []jobsdb.OptsFunc{
jobsdb.WithDBHandle(db), jobsdb.WithConfig(c), jobsdb.WithStats(stats.NOP),
}
if binaryPayload {
opts = append(opts,
jobsdb.WithBinaryPayload(config.SingleValueLoader(true)),
)
}

// Use the db connection for your jobsdb
jobsDB := jobsdb.NewForReadWrite("bench_db", opts...)
if err := jobsDB.Start(); err != nil {
panic(err)
}

defer jobsDB.Close()

ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

// Create a separate context for signal handling
sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

g, ctx := errgroup.WithContext(ctx)
start := time.Now()

// Start multiple store workers
for i := 0; i < writeWorkers; i++ {
g.Go(func() error {
return storeWorker(ctx, jobsDB)
})
}

// Start the read worker
g.Go(func() error {
return readWorker(ctx, jobsDB)
})

// Start the status update goroutine
g.Go(func() error {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
lastCount := int64(0)
lastTime := start

for {
select {
case <-ctx.Done():
return ctx.Err()
case t := <-ticker.C:
currentCount := atomic.LoadInt64(&eventCounter)
currentTime := t
duration := currentTime.Sub(lastTime)
rate := float64(currentCount-lastCount) / duration.Seconds()

fmt.Printf("[%s] Processed %d events. Current rate: %.2f events/second\n",
currentTime.Format("15:04:05"), currentCount, rate)

lastCount = currentCount
lastTime = currentTime
}
}
})

// Wait for either the workers to finish or a signal
select {
case <-ctx.Done():
if err := g.Wait(); err != nil && err != context.DeadlineExceeded {
log.Printf("Error occurred: %v", err)
}
case <-sigCtx.Done():
log.Println("Received termination signal")
cancel() // Cancel the worker context
if err := g.Wait(); err != nil && err != context.Canceled {
log.Printf("Error occurred during shutdown: %v", err)
}
}

elapsed := time.Since(start)
totalEvents := atomic.LoadInt64(&eventCounter)
fmt.Printf("\nFinal results:\n")
fmt.Printf("Processed %d events in %v\n", totalEvents, elapsed)
fmt.Printf("Average rate: %.2f events/second\n", float64(totalEvents)/elapsed.Seconds())
fmt.Printf("Database URL: %s\n", dbURL)

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

Sensitive data returned by an access to randomPassword
flows to a logging call.

// Wait for user confirmation before exiting
fmt.Println("\nPress Enter to quit...")
bufio.NewReader(os.Stdin).ReadString('\n')

Check failure on line 265 in jobsdb/cmd/bench/main.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `(*bufio.Reader).ReadString` is not checked (errcheck)
}

// storeWorker repeatedly builds batches of batchSize synthetic jobs and
// stores them in db until ctx is cancelled, returning ctx.Err() at that
// point. Each stored job bumps the shared eventCounter.
func storeWorker(ctx context.Context, db jobsdb.JobsDB) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			batch := make([]*jobsdb.JobT, 0, batchSize)
			for j := 0; j < batchSize; j++ {
				// Use the value returned by AddInt64 for both fields: the
				// original re-read the counter with LoadInt64, so UserID and
				// the event_id parameter could drift apart when multiple
				// store workers increment concurrently.
				id := atomic.AddInt64(&eventCounter, 1)
				batch = append(batch, &jobsdb.JobT{
					UserID:       fmt.Sprintf("user-%d", id),
					UUID:         uuid.New(),
					Parameters:   []byte(fmt.Sprintf(`{"event_id": %d}`, id)),
					CustomVal:    "benchmark",
					EventPayload: eventPayload,
				})
			}

			if err := db.Store(ctx, batch); err != nil {
				return fmt.Errorf("failed to store batch: %w", err)
			}
		}
	}
}

// readWorker drains unprocessed jobs from db in batches of batchSize and
// marks each one Succeeded, until ctx is cancelled (returning ctx.Err()).
// When the queue is empty it sleeps briefly instead of spinning.
func readWorker(ctx context.Context, db jobsdb.JobsDB) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			jobs, err := db.GetUnprocessed(ctx, jobsdb.GetQueryParams{
				JobsLimit: batchSize,
				// ... existing query params ...
			})
			if err != nil {
				return fmt.Errorf("failed to retrieve jobs: %w", err)
			}

			if len(jobs.Jobs) == 0 {
				time.Sleep(10 * time.Millisecond) // Avoid tight loop if no jobs
				continue
			}

			// Preallocate the status slice and take a single timestamp for
			// the whole batch instead of two time.Now() calls per job.
			now := time.Now()
			statusList := make([]*jobsdb.JobStatusT, 0, len(jobs.Jobs))
			for _, job := range jobs.Jobs {
				statusList = append(statusList, &jobsdb.JobStatusT{
					JobID:         job.JobID,
					JobState:      jobsdb.Succeeded.State,
					AttemptNum:    1,
					ExecTime:      now,
					RetryTime:     now,
					ErrorCode:     "200",
					ErrorResponse: []byte(`{"success": true}`),
					Parameters:    []byte(`{"event_id": 1}`),
				})
			}

			if err := db.UpdateJobStatus(ctx, statusList, []string{}, []jobsdb.ParameterFilterT{}); err != nil {
				return fmt.Errorf("failed to update job status: %w", err)
			}
		}
	}
}
Loading
Loading