Skip to content

Commit

Permalink
chore: sync release v1.30.1 to main branch (#4924)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Jul 24, 2024
2 parents 663cdd3 + ea1f21d commit ee3131f
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 41 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [1.30.1](https://github.com/rudderlabs/rudder-server/compare/v1.30.0...v1.30.1) (2024-07-24)


### Bug Fixes

* type conversion from float64 to int ([#4921](https://github.com/rudderlabs/rudder-server/issues/4921)) ([f5423ac](https://github.com/rudderlabs/rudder-server/commit/f5423acbfa33bcd4baf4d791940cc6b6a71b147b))

## [1.30.0](https://github.com/rudderlabs/rudder-server/compare/v1.29.0...v1.30.0) (2024-07-22)


Expand Down
5 changes: 2 additions & 3 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,6 @@ func (proc *Handle) loadConfig() {
proc.config.eventSchemaV2Enabled = config.GetBoolVar(false, "EventSchemas2.enabled")
proc.config.batchDestinations = misc.BatchDestinations()
proc.config.transformTimesPQLength = config.GetIntVar(5, 1, "Processor.transformTimesPQLength")
proc.config.transformerURL = config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090")
// GWCustomVal is used as a key in the jobsDB customval column
proc.config.GWCustomVal = config.GetStringVar("GW", "Gateway.CustomVal")

Expand Down Expand Up @@ -1830,8 +1829,8 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf
if id, ok := rudderTyperTPID.(string); ok && id != "" {
trackingPlanID = id
}
if version, ok := rudderTyperTPVersion.(int); ok && version > 0 {
trackingPlanVersion = version
if version, ok := rudderTyperTPVersion.(float64); ok && version > 0 {
trackingPlanVersion = int(version)
}
}
shallowEventCopy.Metadata.TrackingPlanId = trackingPlanID
Expand Down
138 changes: 100 additions & 38 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"slices"
"sort"
Expand All @@ -31,6 +29,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"

"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/internal/enricher"
Expand Down Expand Up @@ -900,17 +899,111 @@ func initProcessor() {
format.MaxDepth = 10
}

var _ = Describe("Tracking Plan Validation", Ordered, func() {
initProcessor()

var c *testContext

BeforeEach(func() {
c = &testContext{}
c.Setup()
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) // crash recovery check
})
AfterEach(func() {
c.Finish()
})

Context("RudderTyper", func() {
It("TrackingPlanId and TrackingPlanVersion", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
mockTransformer.EXPECT().Validate(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(transformer.Response{})

isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone)
Expect(err).To(BeNil())

processor := NewHandle(config.Default, mockTransformer)
processor.isolationStrategy = isolationStrategy
processor.config.archivalEnabled = config.SingleValueLoader(false)
Setup(processor, c, false, false)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")

_ = processor.processJobsForDest(
"",
subJob{
subJobs: []*jobsdb.JobT{
{
UUID: uuid.New(),
JobID: 1,
CreatedAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC),
ExpireAt: time.Date(2020, 0o4, 28, 23, 26, 0o0, 0o0, time.UTC),
CustomVal: gatewayCustomVal[0],
EventPayload: createBatchPayload(
WriteKeyEnabledNoUT,
"2001-01-02T02:23:45.000Z",
[]mockEventData{
{
id: "1",
jobid: 1,
originalTimestamp: "2000-01-02T01:23:45",
expectedOriginalTimestamp: "2000-01-02T01:23:45.000Z",
sentAt: "2000-01-02 01:23",
expectedSentAt: "2000-01-02T01:23:00.000Z",
expectedReceivedAt: "2001-01-02T02:23:45.000Z",
},
},
func(e mockEventData) string {
return fmt.Sprintf(`
{
"rudderId": "some-rudder-id",
"messageId": "message-%[1]s",
"some-property": "property-%[1]s",
"originalTimestamp": %[2]q,
"sentAt": %[3]q,
"context": {
"ruddertyper": {
"trackingPlanId": "tracking-plan-id",
"trackingPlanVersion": 123
}
}
}
`,
e.id,
e.originalTimestamp,
e.sentAt,
)
},
),
EventCount: 1,
LastJobStatus: jobsdb.JobStatusT{},
Parameters: createBatchParameters(SourceIDEnabledNoUT),
WorkspaceId: sampleWorkspaceID,
},
},
},
)

Expect(c.MockObserver.calls).To(HaveLen(1))
for _, v := range c.MockObserver.calls {
for _, e := range v.events {
Expect(e.Metadata.TrackingPlanId).To(BeEquivalentTo("tracking-plan-id"))
Expect(e.Metadata.TrackingPlanVersion).To(BeEquivalentTo(123))
}
}
})
})
})

var _ = Describe("Processor with event schemas v2", Ordered, func() {
initProcessor()

var c *testContext
transformerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"routerTransform": {}}`))
w.WriteHeader(http.StatusOK)
}))

prepareHandle := func(proc *Handle) *Handle {
proc.config.transformerURL = transformerServer.URL
proc.eventSchemaDB = c.mockEventSchemasDB
proc.config.eventSchemaV2Enabled = true
isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone)
Expand All @@ -930,10 +1023,6 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
c.Finish()
})

AfterAll(func() {
transformerServer.Close()
})

Context("event schemas DB", func() {
It("should process events and write to event schemas DB", func() {
messages := map[string]mockEventData{
Expand Down Expand Up @@ -1112,13 +1201,8 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() {
initProcessor()

var c *testContext
transformerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"routerTransform": {}}`))
w.WriteHeader(http.StatusOK)
}))

prepareHandle := func(proc *Handle) *Handle {
proc.config.transformerURL = transformerServer.URL
proc.archivalDB = c.mockArchivalDB
proc.config.archivalEnabled = config.SingleValueLoader(true)
isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone)
Expand All @@ -1138,10 +1222,6 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() {
c.Finish()
})

AfterAll(func() {
transformerServer.Close()
})

Context("archival DB", func() {
It("should process events and write to archival DB", func() {
messages := map[string]mockEventData{
Expand Down Expand Up @@ -1457,13 +1537,8 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func()
initProcessor()

var c *testContext
transformerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"routerTransform": {}}`))
w.WriteHeader(http.StatusOK)
}))

prepareHandle := func(proc *Handle) *Handle {
proc.config.transformerURL = transformerServer.URL
isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone)
Expect(err).To(BeNil())
proc.isolationStrategy = isolationStrategy
Expand All @@ -1478,10 +1553,6 @@ var _ = Describe("Processor with trackedUsers feature enabled", Ordered, func()
c.Finish()
})

AfterAll(func() {
transformerServer.Close()
})

Context("trackedUsers", func() {
BeforeEach(func() {
// crash recovery check
Expand Down Expand Up @@ -1778,13 +1849,8 @@ var _ = Describe("Processor", Ordered, func() {
initProcessor()

var c *testContext
transformerServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(`{"routerTransform": {}}`))
w.WriteHeader(http.StatusOK)
}))

prepareHandle := func(proc *Handle) *Handle {
proc.config.transformerURL = transformerServer.URL
isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone)
Expect(err).To(BeNil())
proc.isolationStrategy = isolationStrategy
Expand All @@ -1799,10 +1865,6 @@ var _ = Describe("Processor", Ordered, func() {
c.Finish()
})

AfterAll(func() {
transformerServer.Close()
})

Context("Initialization", func() {
It("should initialize (no jobs to recover)", func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)
Expand Down

0 comments on commit ee3131f

Please sign in to comment.