-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathentrypoint.sh
131 lines (108 loc) · 5.38 KB
/
entrypoint.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/bin/bash
set -ex

# Optional settings. A value supplied through the environment is kept as is;
# a variable that is unset or empty receives the default shown here.

# Kafka Connect storage topics and their replication factors.
: "${OFFSET_TOPIC:=connect-offsets}"
: "${CONFIG_TOPIC:=connect-config}"
: "${STATUS_TOPIC:=connect-status}"
: "${OFFSET_REPLICA:=2}"
: "${CONFIG_REPLICA:=2}"
: "${STATUS_REPLICA:=2}"

# Connector identity and tuning knobs written into the connector properties.
: "${CONNECTOR_NAME:=kinesis-kafka-connector}"
: "${MAX_TASKS:=1}"
: "${MAX_CONNECTIONS:=1}"
: "${ENABLE_AGGREGATION:=true}"
: "${RATE_LIMIT:=100}"
: "${MAX_BUFFERED_TIME:=1500}"
: "${RECORD_TTL:=60000}"
: "${OUTSTANDING_RECORD_THRESHOLD:=10}"
# Unless the caller provided REST_ADVERTISED_HOSTNAME, default it to the
# address returned by the EC2 instance metadata endpoint (local-ipv4, i.e. the
# internal subnet IP).
if [[ -z "${REST_ADVERTISED_HOSTNAME:-}" ]]; then
  # The lookup is best-effort. When the endpoint is unreachable curl exits
  # non-zero, and because this script runs under `set -e` an unguarded
  # assignment would abort the whole entrypoint. Fall back to an empty value
  # instead; the worker configuration step skips rest.advertised.host.name
  # when the variable is empty.
  # -f makes curl fail on an HTTP error status rather than capturing the
  # error response body as if it were the hostname.
  REST_ADVERTISED_HOSTNAME=$(curl --connect-timeout 1 --max-time 3 -sf \
    http://169.254.169.254/1.0/meta-data/local-ipv4) || REST_ADVERTISED_HOSTNAME=""
fi
# check required options for fail-fast
# Every name listed here must be set to a non-empty value in the environment;
# the first one that is missing stops the script with exit status 1 before any
# configuration file is touched.
for required_option in BOOTSTRAP_SERVERS REGION KINESIS_STREAM KAFKA_TOPICS; do
  # ${!required_option} is indirect expansion: the value of the variable whose
  # name is stored in required_option. Inside [[ ]] the value is not subject
  # to word splitting or globbing, so values containing spaces or wildcard
  # characters are tested safely.
  if [[ -z "${!required_option:-}" ]]; then
    # Diagnostics belong on stderr.
    echo "ERROR: ${required_option} required option is unset" >&2
    exit 1
  fi
done
# prepare worker configures

#######################################
# Set one key in /worker.properties: remove every existing "key=..." line,
# then append "key=value". Running it again for the same key therefore never
# leaves duplicate entries.
# Arguments: $1 - property key (dots are matched literally)
#            $2 - property value
# Outputs:   rewrites /worker.properties in place
#######################################
set_worker_property() {
  local key="$1" value="$2"
  # Escape the dots so sed treats them as literal dots, not "any character".
  local pattern="${key//./\\.}"
  sed -i "/^${pattern}=.*/d" /worker.properties
  printf '%s=%s\n' "${key}" "${value}" >> /worker.properties
}

# Terminate the last existing line so that each appended option starts on a
# dedicated line even if the file did not end with a newline.
echo >> /worker.properties
# for issue: https://issues.apache.org/jira/browse/KAFKA-3988
set_worker_property internal.key.converter.schemas.enable false
set_worker_property internal.value.converter.schemas.enable false

if [[ -z "${GROUP_ID:-}" ]]; then
  echo "GROUP_ID environment variable is not set"
  echo "running in standalone mode"
else
  echo "running in distributed mode"
  set_worker_property group.id "${GROUP_ID}"
  set_worker_property offset.storage.topic "${OFFSET_TOPIC}"
  set_worker_property config.storage.topic "${CONFIG_TOPIC}"
  set_worker_property status.storage.topic "${STATUS_TOPIC}"
  set_worker_property offset.storage.replication.factor "${OFFSET_REPLICA}"
  set_worker_property config.storage.replication.factor "${CONFIG_REPLICA}"
  set_worker_property status.storage.replication.factor "${STATUS_REPLICA}"
fi

# bootstrap.servers is rewritten in place (sed "c" = change the matching line),
# so it only takes effect if the file already contains a bootstrap.servers line.
sed -i "/^bootstrap\.servers=.*/c\bootstrap.servers=${BOOTSTRAP_SERVERS}" /worker.properties
set_worker_property rest.host.name 0.0.0.0
if [[ -n "${REST_ADVERTISED_HOSTNAME:-}" ]]; then
  # Goes through the same delete-then-append helper as every other key, so a
  # second run of this script does not stack up duplicate lines.
  set_worker_property rest.advertised.host.name "${REST_ADVERTISED_HOSTNAME}"
fi
# prepare connector configures

#######################################
# Overwrite an existing "key=..." line of the connector properties file in
# place. A key that has no line in the file is NOT added.
# Arguments: $1 - property key (dots are matched literally)
#            $2 - new value
# Outputs:   rewrites /kinesis-streams-kafka-connector.properties in place
#######################################
set_connector_property() {
  local key="$1" value="$2"
  # Escape the dots so sed treats them as literal dots, not "any character".
  local pattern="${key//./\\.}"
  # The script handed to sed is /^key=.*/c\key=value : the "c\" command
  # replaces each matching line with the text that follows it. "\\" inside the
  # double quotes yields the single backslash that belongs to "c\".
  sed -i "/^${pattern}=.*/c\\${key}=${value}" /kinesis-streams-kafka-connector.properties
}

set_connector_property name "${CONNECTOR_NAME}"
set_connector_property region "${REGION}"
set_connector_property streamName "${KINESIS_STREAM}"
set_connector_property topics "${KAFKA_TOPICS}"
set_connector_property tasks.max "${MAX_TASKS}"
set_connector_property maxConnections "${MAX_CONNECTIONS}"
set_connector_property aggregration "${ENABLE_AGGREGATION}" # "aggregration" here is NOT a typo
set_connector_property rateLimit "${RATE_LIMIT}"
set_connector_property maxBufferedTime "${MAX_BUFFERED_TIME}"
set_connector_property ttl "${RECORD_TTL}"
set_connector_property outstandingRecordsThreshold "${OUTSTANDING_RECORD_THRESHOLD}"

if [[ -n "${GROUP_ID:-}" ]]; then
  # With GROUP_ID set, convert the properties file into a JSON document of the
  # form {"name": ..., "config": {key: value, ...}} and store it as
  # /kinesis-streams-kafka-connector.post.
  wget -q https://github.com/stedolan/jq/releases/download/jq-1.5/jq-linux64 -O /usr/bin/jq
  chmod u+x /usr/bin/jq
  # grep drops every line that contains a '#'. jq then reads the remainder as
  # one raw string (-sR): the first line supplies "name", all following lines
  # become "config" entries.
  # rtrimstr("\r") strips a trailing carriage return (CRLF files); the former
  # "\\r" was the two-character text backslash + r and never matched one.
  # NOTE(review): split("=") keeps only the text between the first and second
  # '=', so a value that itself contains '=' is truncated.
  grep -v '#' /kinesis-streams-kafka-connector.properties \
    | jq -sR '{
      "name": split("\n")[0:1][] | rtrimstr("\r") | split("=") | (.[1]),
      "config": [split("\n")[1:-1][] | rtrimstr("\r") | split("=") | {(.[0]): .[1]}] | add
    }' - > /kinesis-streams-kafka-connector.post
fi
# print connector configures
# Dump the effective configuration to stdout: always the worker properties,
# then either the connector properties file (GROUP_ID empty or unset) or the
# generated REST post body (GROUP_ID set).
echo "worker.properties file content:"
cat /worker.properties
# [[ ]] with a quoted expansion keeps the test well-formed even when GROUP_ID
# contains spaces or glob characters.
if [[ -z "${GROUP_ID:-}" ]]; then
  echo "kinesis-streams-kafka-connector.properties file content:"
  cat /kinesis-streams-kafka-connector.properties
else
  echo "kinesis-streams-kafka-connector REST post body:"
  cat /kinesis-streams-kafka-connector.post
fi
# start connector
# Launch the Kafka Connect worker: standalone (worker + connector properties)
# when GROUP_ID is empty or unset, distributed (worker properties only)
# otherwise.
# exec replaces this shell with the worker process instead of running it as a
# child, so the worker keeps the script's PID and receives signals such as
# SIGTERM directly (relevant when this script is a container's main process).
# Nothing follows these commands, so no later step is skipped by exec.
if [[ -z "${GROUP_ID:-}" ]]; then
  exec /usr/bin/connect-standalone /worker.properties /kinesis-streams-kafka-connector.properties
else
  exec /usr/bin/connect-distributed /worker.properties
fi