Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: include commas in the profile size, handle failure case of profile more than 500kB in size #5241

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests and also add ratelimits in HTTP client

Copy link
Contributor Author

@yashasvibajpai yashasvibajpai Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be handling them now in separate PRs, this PR is mainly a followup to a reported issue

Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
)

const (
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/"
BATCHSIZE = 10000
MAXPAYLOADSIZE = 4900000
IMPORT_ID_SEPARATOR = ":"
KlaviyoAPIURL = "https://a.klaviyo.com/api/profile-bulk-import-jobs/"
BATCHSIZE = 10000
MAXALLOWEDPROFILESIZE = 512000
MAXPAYLOADSIZE = 4900000
IMPORT_ID_SEPARATOR = ":"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary
Expand Down Expand Up @@ -80,7 +81,7 @@
return nil, fmt.Errorf("failed to marshal profile: %w", err)
}

profileSize := len(profileJSON)
profileSize := len(profileJSON) + 1 // +1 for comma character

Check warning on line 84 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L84

Added line #L84 was not covered by tests

if (chunkSize+profileSize >= maxBytes || len(chunk) == maxElements) && len(chunk) > 0 {
chunks = append(chunks, chunk)
Expand Down Expand Up @@ -308,6 +309,8 @@
startTime := time.Now()
destination := asyncDestStruct.Destination
var failedJobs []int64
var abortedJobs []int64
var abortReason string

Check warning on line 313 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L312-L313

Added lines #L312 - L313 were not covered by tests
var successJobs []int64
filePath := asyncDestStruct.FileName
importingJobIDs := asyncDestStruct.ImportingJobIDs
Expand All @@ -317,6 +320,7 @@
statLabels := stats.Tags{
"module": "batch_router",
"destType": destType,
"destID": destinationID,

Check warning on line 323 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L323

Added line #L323 was not covered by tests
}
file, err := os.Open(filePath)
if err != nil {
Expand All @@ -325,6 +329,7 @@
defer file.Close()
var combinedProfiles []Profile
scanner := bufio.NewScanner(file)
profileSizeStat := kbu.statsFactory.NewTaggedStat("profile_size", stats.HistogramType, statLabels)

Check warning on line 332 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L332

Added line #L332 was not covered by tests
for scanner.Scan() {
var input Input
line := scanner.Text()
Expand All @@ -333,6 +338,15 @@
return kbu.generateKlaviyoErrorOutput("Error while parsing JSON.", err, importingJobIDs, destinationID)
}
profileStructure := kbu.ExtractProfile(input)
// if profileStructure length is more than 5 mb, throw an error
profileStructureJSON, _ := json.Marshal(profileStructure)
profileSize := float64(len(profileStructureJSON))
profileSizeStat.Observe(float64(profileSize)) // Record the size in the histogram
if float64(len(profileStructureJSON)) >= MAXALLOWEDPROFILESIZE {
abortReason = "Error while marshaling profiles. The profile size exceeds Klaviyo's limit of 500 kB for a single profile."
abortedJobs = append(abortedJobs, int64(input.Metadata.JobID))
continue

Check warning on line 348 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L341-L348

Added lines #L341 - L348 were not covered by tests
}
combinedProfiles = append(combinedProfiles, profileStructure)
}

Expand Down Expand Up @@ -379,12 +393,14 @@
if resp.StatusCode != 202 {
failedJobs = append(failedJobs, importingJobIDs[idx])
kbu.logger.Error("Got non 202 as statusCode.", errors.New(string(bodyBytes)))
continue

Check warning on line 396 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L396

Added line #L396 was not covered by tests
}
var uploadresp UploadResp
uploadRespErr := json.Unmarshal((bodyBytes), &uploadresp)
if uploadRespErr != nil {
failedJobs = append(failedJobs, importingJobIDs[idx])
kbu.logger.Error("Error while unmarshaling response.", uploadRespErr)
continue

Check warning on line 403 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L403

Added line #L403 was not covered by tests
}
importIds = append(importIds, uploadresp.Data.Id)
}
Expand All @@ -400,6 +416,8 @@
return common.AsyncUploadOutput{
ImportingParameters: importParameters,
FailedJobIDs: failedJobs,
AbortJobIDs: abortedJobs,
AbortReason: abortReason,

Check warning on line 420 in router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/klaviyobulkupload/klaviyobulkupload.go#L419-L420

Added lines #L419 - L420 were not covered by tests
FailedCount: len(failedJobs),
ImportingJobIDs: successJobs,
DestinationID: destination.ID,
Expand Down
Loading