Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Active campaign batching #2799

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/v0/destinations/active_campaign/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const { getMappingConfig } = require('../../util');
// https://developers.activecampaign.com/reference/create-a-new-tag
// https://developers.activecampaign.com/reference/contact-tags
// https://developers.activecampaign.com/reference/update-list-status-for-contact
// https://developers.activecampaign.com/reference/bulk-import-contacts

// For PAGE
// https://developers.activecampaign.com/reference/site-tracking
Expand All @@ -20,6 +21,8 @@ const { getMappingConfig } = require('../../util');
// https://developers.activecampaign.com/reference/create-a-new-event-name-only
// https://developers.activecampaign.com/reference/track-event

const MAX_BATCH_SIZE = 250;

const CONFIG_CATEGORIES = {
IDENTIFY: {
name: 'ACIdentify',
Expand All @@ -30,6 +33,7 @@ const CONFIG_CATEGORIES = {
mergeTagWithContactUrl: '/api/3/contactTags',
mergeFieldValueWithContactUrl: '/api/3/fieldValues',
mergeListWithContactUrl: '/api/3/contactLists',
contactBulkImportUrl: '/api/3/import/bulk_import',
},

PAGE: { name: 'ACPage', endPoint: '/api/3/siteTrackingDomains' },
Expand Down Expand Up @@ -70,4 +74,5 @@ module.exports = {
CONFIG_CATEGORIES,
MAPPING_CONFIG,
getHeader,
MAX_BATCH_SIZE
};
42 changes: 42 additions & 0 deletions src/v0/destinations/active_campaign/data/ACIdentifyBatch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[
{
"destKey": "email",
"sourceKeys": "email",
"sourceFromGenericMap": true,
"required": true
},
{
"destKey": "phone",
"sourceKeys": "phone",
"sourceFromGenericMap": true,
"required": false
},
{
"destKey": "firstName",
"sourceKeys": "firstName",
"sourceFromGenericMap": true,
"required": false
},
{
"destKey": "lastName",
"sourceKeys": "lastName",
"sourceFromGenericMap": true,
"required": false
},
{
"destKey": "tags",
"sourceKeys": ["context.traits.tags", "traits.tags"],
"required": false
},
{
"destKey": "fieldInfo",
"sourceKeys": ["context.traits.fieldInfo", "traits.fieldInfo"],
"required": false
},
{
"destKey": "lists",
"sourceKeys": ["context.traits.lists", "traits.listse"],
"required": false
}
]

158 changes: 153 additions & 5 deletions src/v0/destinations/active_campaign/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
/* eslint-disable no-empty */
const get = require('get-value');
const { EventType } = require('../../../constants');
const { CONFIG_CATEGORIES, MAPPING_CONFIG, getHeader } = require('./config');
const { CONFIG_CATEGORIES, MAPPING_CONFIG, getHeader, MAX_BATCH_SIZE } = require('./config');
const {
defaultRequestConfig,
constructPayload,
defaultPostRequestConfig,
removeUndefinedAndNullValues,
simpleProcessRouterDest,
// simpleProcessRouterDest,
handleRtTfSingleEventError,
getErrorRespEvents,
getSuccessRespEvents
} = require('../../util');
const { errorHandler } = require('./util');
const { httpGET, httpPOST } = require('../../../adapters/network');
Expand Down Expand Up @@ -393,6 +396,33 @@ const identifyRequestHandler = async (message, category, destination) => {
// sync the enriched payload
return responseBuilderSimple(payload, category, destination);
};

const buildListObject = (listInfo) => {
const output = listInfo.reduce((result, item) => {
if (item.status === 'subscribe' || item.status === 'unsubscribe') {
if (!result[item.status]) {
// eslint-disable-next-line no-param-reassign
result[item.status] = [];
}
result[item.status].push({ listid: item.id });
}
return result;
}, {});
return output;
};

const buildFieldObject = (fieldInfo) => {

};

const identifyBatchRequestHandler = async (message, category, destination) => {
// create skeleton contact payload
let contactPayload = constructPayload(message, MAPPING_CONFIG[category.name]);
contactPayload = removeUndefinedAndNullValues(contactPayload);
const listInput = buildListObject(contactPayload.lists);
const fieldInput = buildFieldObject(contactPayload);

}
// This method handles any page request
// Creates the payload as per API spec and returns to rudder-server
// Ref - https://developers.activecampaign.com/reference/site-tracking
Expand Down Expand Up @@ -593,9 +623,127 @@ const process = async (event) => {
return result;
};

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
// const processRouterDest = async (inputs, reqMetadata) => {
// const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
// return respList;
// };

const getSizeInBytes = (obj) => {
let str = null;
if (typeof obj === 'string') {
// If obj is a string, then use it
str = obj;
} else {
// Else, make obj into a string
str = JSON.stringify(obj);
}
// Get the length of the Uint8Array
const bytes = new TextEncoder().encode(str).length;
return bytes;
};


const getEventChunks = (identifyEvents) => {
const eventChunks = [];
let batchedData = [];
let metadata = [];
let size = 0;

identifyEvents.forEach((events) => {
const objSize = getSizeInBytes(events);
size += objSize;
if (batchedData.length === MAX_BATCH_SIZE || size > 399999) {
eventChunks.push({ data: batchedData, metadata });
batchedData = [];
metadata = [];
size = 0;
}
metadata.push(events.metadata);
batchedData.push(events.message.body.JSON);
});

if (batchedData.length > 0) {
eventChunks.push({ data: batchedData, metadata });
}

return eventChunks;
};

const batchEvents = (successRespList) => {
const batchedResponseList = [];
const identifyEvents = [];
// Filtering out group calls to process batching
successRespList.forEach((resp) => {
if (!resp.message.endpoint.includes('import/bulk_import')) {
batchedResponseList.push(
getSuccessRespEvents(resp.message, [resp.metadata], resp.destination),
);
} else {
identifyEvents.push(resp);
}
});

if (identifyEvents.length > 0) {
// Extracting metadata, destination and message from the first event in a batch
const { destination, message } = identifyEvents[0];
const { headers, endpoint } = message;

// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = getEventChunks(identifyEvents);

/**
* Ref : https://www.customer.io/docs/api/track/#operation/batch
*/
eventChunks.forEach((chunk) => {
const request = defaultRequestConfig();
request.endpoint = endpoint;
request.headers = { ...headers };
// Setting the request body to an object with a single property called "batch" containing the batched data
request.body.JSON = { batch: chunk.data };

batchedResponseList.push(getSuccessRespEvents(request, chunk.metadata, destination));
});
}
return batchedResponseList;
};

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
const { destination } = inputs[0];
inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
successRespList.push({
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const transformedPayload = {
message: process(event),
metadata: event.metadata,
destination,
};
successRespList.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
});

if (successRespList.length > 0) {
batchResponseList = batchEvents(successRespList);
}

return [...batchResponseList, ...batchErrorRespList];
};
module.exports = { process, processRouterDest };