-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.bal
40 lines (35 loc) · 1.06 KB
/
main.bal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import ballerina/uuid;
import ballerinax/kafka;
type LeadAnalytics record {|
string firstName;
string lastName;
string email;
string phone;
string company;
string status;
string 'source;
float score;
string owner;
string createdDate;
string lastContactDate;
string lastActivity;
string converted;
string conversionDate;
float opportunityAmount;
string opportunityStage;
|};
final kafka:ConsumerConfiguration consumerConfiguration = {
groupId: "lead-uuid",
topics: ["lead-analytics"],
pollingInterval: 1,
autoCommit: false
};
final Client leadsDbClient = check new;
listener kafka:Listener kafkaListener = new (kafka:DEFAULT_URL, consumerConfiguration);
isolated service on kafkaListener {
remote function onConsumerRecord(kafka:Caller caller, LeadAnalytics[] leadsData) returns error? {
LeadAnalyticsDataInsert[] insertData = from var lead in leadsData
select {id: uuid:createType1AsString(), ...lead};
_ = check leadsDbClient->/leadanalyticsdata.post(insertData);
}
}