[#53] Update replication.factor in-place (#148)
* [#53] Update replication.factor in-place

* Update docker, ssl, and tests

Since we'll be updating a topic's replication.factor in-place, we need
a multi-broker cluster to test against.

This is a pretty big diff, but the changes are essentially:

  * create new secrets to support multiple brokers

  * update the docker/docker-compose files to support a multi-broker
    cluster. This cluster can still be run locally with docker-compose
    up, and CI systems can use docker-compose.testacc.yaml to run the
    acceptance tests in a well-defined environment

  * update the tests to play nicely with the new secrets and docker
    setup (brokers aren't hard-coded to localhost anymore)

  * when a topic creation is applied, the resource is considered
    'Pending' until its existence is confirmed via client.ReadTopic
    (see the sketch after this list). This was less of an issue with
    one broker, but once we introduced more, the acc tests would
    sometimes fail because a topic would get created but subsequent
    steps that ran immediately afterwards wouldn't see it
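
A minimal sketch of that create-then-confirm wait, using the plugin
SDK's resource.StateChangeConf helper; the sentinel error, timeouts,
and function name are illustrative rather than the provider's exact
code, and the resource import path depends on the SDK version in use:

```go
// Sketch only: Client and ErrTopicNotFound are stand-ins for the provider's
// real types; the import path below varies with the plugin SDK version.
package kafka

import (
	"errors"
	"time"

	"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func waitForTopicToExist(c *Client, name string) error {
	stateConf := &resource.StateChangeConf{
		Pending: []string{"Pending"},
		Target:  []string{"Created"},
		Refresh: func() (interface{}, string, error) {
			topic, err := c.ReadTopic(name)
			if err != nil {
				if errors.Is(err, ErrTopicNotFound) { // hypothetical sentinel error
					return nil, "Pending", nil // metadata not visible everywhere yet
				}
				return nil, "Error", err
			}
			return topic, "Created", nil
		},
		Timeout:    time.Minute,
		Delay:      2 * time.Second,
		MinTimeout: time.Second,
	}
	_, err := stateConf.WaitForState()
	return err
}
```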

* Update RF before adding partitions

If a plan both:
  * increases the partition count
  * changes the replication.factor

then there's no point in creating new partitions with the old
replication.factor only to change it immediately afterwards.
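
A sketch of that ordering in the resource's update path, assuming the
plugin SDK's schema.ResourceData; the two client methods are
hypothetical stand-ins, not the provider's actual API:

```go
// Sketch, not the provider's actual Update function: apply the
// replication.factor change first so that any partitions added afterwards
// are created with the new RF from the start.
func updateTopic(d *schema.ResourceData, c *Client) error {
	name := d.Id()

	if d.HasChange("replication_factor") {
		rf := int16(d.Get("replication_factor").(int))
		if err := c.AlterReplicationFactor(name, rf); err != nil { // hypothetical method
			return err
		}
	}

	if d.HasChange("partitions") {
		p := int32(d.Get("partitions").(int))
		if err := c.AddPartitions(name, p); err != nil { // hypothetical method
			return err
		}
	}
	return nil
}
```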

* Add IsReplicationFactorUpdating method to Client

Before we introduced the in-place replication.factor changes, the
replication.factor of a topic was updated atomically across all of its
partitions:

  * when the topic was first created, all of its initial partitions
    had the same rf

  * when partitions were added to a topic, they would be replicated
    according to the topic's rf

  * when a topic's rf was changed, it would be deleted and created
    again (bullet point 1)

Now that a topic's rf may be updated in-place, the rf is not updated
atomically; it's instead updated in a rolling/eventually-consistent
manner. For some time after the apply, some partitions will have the
new rf, and some will still have the old rf. Eventually, however,
they'll all have the new rf.

This commit introduces a new method to determine whether or not an rf
update has finished propagating through all of a topic's partitions,
and changes the kafka_topic resource to be aware of this
propagating/updating state.
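
A minimal sketch of that check, assuming topic metadata exposes each
partition's current replica list; per the description above, the
update is treated as still propagating while partitions disagree on
their replica count. Names are illustrative, not the provider's exact
API:

```go
// IsReplicationFactorUpdating (sketch): reports whether an in-place RF change
// is still propagating, i.e. whether partitions currently disagree on how
// many replicas they have.
func (c *Client) IsReplicationFactorUpdating(topic string) (bool, error) {
	meta, err := c.describeTopic(topic) // hypothetical metadata fetch
	if err != nil {
		return false, err
	}
	if len(meta.Partitions) == 0 {
		return false, nil
	}
	want := len(meta.Partitions[0].Replicas)
	for _, p := range meta.Partitions[1:] {
		if len(p.Replicas) != want {
			return true, nil // some partitions still carry the old RF
		}
	}
	return false, nil
}
```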

* Add acc tests for in-place rf update

Overall flow:
  * produce a well-known set of messages to a new topic
  * change that topic's rf and partition count
  * verify the changes went through and that no messages were lost
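
A rough skeleton of that flow with the plugin SDK's acc-test harness;
the config builder and the produce/verify check helpers are
hypothetical stand-ins for whatever the real test uses:

```go
// Rough skeleton only: testTopicConfig, produceKnownMessages,
// checkTopicRFAndPartitions, and checkNoMessagesLost are hypothetical helpers.
func TestAccTopic_inPlaceRFUpdate(t *testing.T) {
	resource.Test(t, resource.TestCase{
		Providers: testAccProviders,
		Steps: []resource.TestStep{
			{
				// Create the topic and seed it with a known set of messages.
				Config: testTopicConfig("rf-test", 1, 1), // rf=1, partitions=1
				Check:  produceKnownMessages("rf-test", 100),
			},
			{
				// Raise both rf and the partition count, then verify.
				Config: testTopicConfig("rf-test", 3, 2), // rf=3, partitions=2
				Check: resource.ComposeTestCheckFunc(
					checkTopicRFAndPartitions("rf-test", 3, 2),
					checkNoMessagesLost("rf-test", 100),
				),
			},
		},
	})
}
```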

* Wait for rf update before adding partitions

We can't add partitions while a replication.factor update is ongoing,
so this commit waits for an rf update to finish before adding any new
partitions.

Because we're now waiting for the rf to update separately, the
IsReplicationFactorUpdating logic has been removed from
topicRefreshFunc (which now only waits for partition and config
updates to finish).
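
A sketch of that wait, built on the IsReplicationFactorUpdating
method described earlier; the polling interval and the lack of an
overall timeout are simplifications (context and time come from the
standard library):

```go
// waitForRFUpdate (sketch): blocks until the in-place RF change has finished
// propagating, so that partitions can safely be added afterwards.
func waitForRFUpdate(ctx context.Context, c *Client, topic string) error {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		updating, err := c.IsReplicationFactorUpdating(topic)
		if err != nil {
			return err
		}
		if !updating {
			return nil // all partitions report the new RF
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
```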

* Add CanAlterReplicationFactor method

This new method determines whether the cluster supports altering a
topic's replication.factor in-place. If so, the provider makes the
alteration in-place; otherwise, it replaces the topic as it did
before.

populateAPIVersions was changed so that the recorded max API versions
are ones supported by every broker in the cluster.
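
A sketch of both pieces: intersecting every broker's advertised
version ranges, then asking whether the whole cluster supports the API
needed for in-place RF changes. The assumption that the relevant
request is AlterPartitionReassignments (Kafka API key 45, available on
2.4+ brokers) is mine, not stated in this commit, and the types are
illustrative:

```go
type apiVersionRange struct {
	MinVersion, MaxVersion int16
}

// intersectAPIVersions keeps, per API key, only the version range that every
// broker supports; an API missing from any broker is dropped entirely.
func intersectAPIVersions(perBroker []map[int16]apiVersionRange) map[int16]apiVersionRange {
	out := map[int16]apiVersionRange{}
	if len(perBroker) == 0 {
		return out
	}
	for key, r := range perBroker[0] {
		out[key] = r
	}
	for _, broker := range perBroker[1:] {
		for key, have := range out {
			r, ok := broker[key]
			if !ok {
				delete(out, key)
				continue
			}
			if r.MinVersion > have.MinVersion {
				have.MinVersion = r.MinVersion
			}
			if r.MaxVersion < have.MaxVersion {
				have.MaxVersion = r.MaxVersion
			}
			out[key] = have
		}
	}
	return out
}

const alterPartitionReassignmentsKey int16 = 45 // assumed API key for in-place RF changes

// canAlterReplicationFactor reports whether every broker supports the API
// needed to move replicas in-place.
func canAlterReplicationFactor(supported map[int16]apiVersionRange) bool {
	r, ok := supported[alterPartitionReassignmentsKey]
	return ok && r.MaxVersion >= r.MinVersion
}
```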

* Send apiVersions request asynchronously to brokers
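
A sketch of the fan-out, reusing the apiVersionRange type from the
sketch above; the Broker type and its apiVersions call are stand-ins
for the real client code:

```go
// fetchAPIVersions queries every broker concurrently instead of one at a
// time, then collects the responses for the intersection step above.
func fetchAPIVersions(brokers []*Broker) ([]map[int16]apiVersionRange, error) {
	type result struct {
		idx      int
		versions map[int16]apiVersionRange
		err      error
	}
	ch := make(chan result, len(brokers))

	for i, b := range brokers {
		go func(i int, b *Broker) {
			v, err := b.apiVersions() // hypothetical per-broker ApiVersions request
			ch <- result{idx: i, versions: v, err: err}
		}(i, b)
	}

	out := make([]map[int16]apiVersionRange, len(brokers))
	for range brokers {
		r := <-ch
		if r.err != nil {
			return nil, r.err
		}
		out[r.idx] = r.versions
	}
	return out, nil
}
```
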
SahilKang authored Feb 13, 2021
1 parent 1e243a0 commit 8bb7929
Showing 34 changed files with 1,168 additions and 343 deletions.
14 changes: 3 additions & 11 deletions .circleci/config.yml
@@ -19,26 +19,18 @@ jobs:
- image: circleci/golang:1.13
environment:
TEST_RESULTS: /tmp/test-results
- image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
- image: mongey/kafka:5.3.3-new-certs
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_LOG_SEGMENT_BYTES: 1234
steps:
- checkout
- setup_remote_docker
- run: make test
- run: go get github.com/jstemmer/go-junit-report
- run: mkdir -p $TEST_RESULTS
- run:
name: Run Tests
command: |
trap "go-junit-report <${TEST_RESULTS}/go-test.out > ${TEST_RESULTS}/go-test-report.xml" EXIT
make testacc| tee ${TEST_RESULTS}/go-test.out
docker-compose -f docker-compose.yaml -f docker-compose.testacc.yaml up \
--abort-on-container-exit testacc | tee ${TEST_RESULTS}/go-test.out
- store_test_results:
path: /tmp/test-results

21 changes: 6 additions & 15 deletions Dockerfile
@@ -1,16 +1,7 @@
FROM confluentinc/cp-kafka:5.3.3
COPY secrets/ /etc/kafka/secrets
FROM golang:1.13

ENV KAFKA_ADVERTISED_LISTENERS=SSL://localhost:9092
ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
ENV KAFKA_SSL_KEYSTORE_FILENAME=kafka.broker1.keystore.jks
ENV KAFKA_SSL_KEYSTORE_CREDENTIALS=broker1_keystore_creds
ENV KAFKA_SSL_KEY_CREDENTIALS=broker1_sslkey_creds
ENV KAFKA_SSL_TRUSTSTORE_FILENAME=kafka.broker1.truststore.jks
ENV KAFKA_SSL_TRUSTSTORE_CREDENTIALS=broker1_truststore_creds
ENV KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL
ENV KAFKA_SSL_CLIENT_AUTH=required
ENV KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
ENV KAFKA_LISTENER_NAME_INTERNAL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
ENV KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND="true"
ENV KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer
WORKDIR /go/src/github.com/Mongey/terraform-provider-kafka/

COPY go.mod go.sum main.go GNUmakefile ./
COPY kafka kafka
COPY secrets secrets
32 changes: 32 additions & 0 deletions Dockerfile.kafka
@@ -0,0 +1,32 @@
FROM confluentinc/cp-kafka:5.4.1

ARG broker_id
ARG zookeeper_connect
ARG listener_host
ARG listener_port

COPY secrets/ /etc/kafka/secrets

ENV KAFKA_BROKER_ID=$broker_id
ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
ENV KAFKA_ZOOKEEPER_CONNECT=$zookeeper_connect

# confluent's bash script looks for an 'SSL' suffix in listener names:
# https://github.com/confluentinc/cp-docker-images/blob/76d786d0243ea16626b8b46dba34ec0b1066de84/debian/kafka/include/etc/confluent/docker/configure#L65
ENV KAFKA_LISTENERS=INTERNAL_SSL://$listener_host:9090,EXTERNAL_SSL://$listener_host:9092
ENV KAFKA_ADVERTISED_LISTENERS=INTERNAL_SSL://$listener_host:9090,EXTERNAL_SSL://localhost:$listener_port
ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL_SSL:SSL,EXTERNAL_SSL:SSL
ENV KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL_SSL

ENV KAFKA_SSL_KEYSTORE_FILENAME=kafka.$listener_host.keystore.jks
ENV KAFKA_SSL_KEYSTORE_CREDENTIALS=password

ENV KAFKA_SSL_TRUSTSTORE_FILENAME=kafka.truststore.jks
ENV KAFKA_SSL_TRUSTSTORE_CREDENTIALS=password

ENV KAFKA_SSL_KEY_CREDENTIALS=password
ENV KAFKA_SSL_CLIENT_AUTH=required
ENV KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
ENV KAFKA_LISTENER_NAME_INTERNAL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
ENV KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=true
ENV KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.auth.SimpleAclAuthorizer
7 changes: 3 additions & 4 deletions GNUmakefile
@@ -9,11 +9,10 @@ test:
go test ./...

testacc:
KAFKA_BOOTSTRAP_SERVER=localhost:9092 \
KAFKA_CA_CERT=../secrets/ca.crt \
KAFKA_CLIENT_CERT=../secrets/terraform-cert.pem \
KAFKA_CLIENT_KEY=../secrets/terraform-with-passphrase.pem \
KAFKA_CLIENT_KEY_PASSPHRASE=confluent \
KAFKA_CLIENT_CERT=../secrets/client.pem \
KAFKA_CLIENT_KEY=../secrets/client.key \
KAFKA_CLIENT_KEY_PASSPHRASE=test-pass \
KAFKA_SKIP_VERIFY=false \
KAFKA_ENABLE_TLS=true \
TF_LOG=DEBUG \
15 changes: 15 additions & 0 deletions docker-compose.testacc.yaml
@@ -0,0 +1,15 @@
---
version: '3.2'
services:
testacc:
build: .
environment:
KAFKA_BOOTSTRAP_SERVER: kafka1:9090,kafka2:9090,kafka3:9090
entrypoint:
- make
- testacc
depends_on:
- zookeeper
- kafka1
- kafka2
- kafka3
63 changes: 41 additions & 22 deletions docker-compose.yaml
@@ -11,31 +11,50 @@ services:
extra_hosts:
- "moby:127.0.0.1"

kafka:
image: confluentinc/cp-kafka:5.4.1
kafka1:
build:
context: .
dockerfile: Dockerfile.kafka
args:
broker_id: 1
zookeeper_connect: zookeeper:2181
listener_host: kafka1
listener_port: 9092
ports:
- "9092:9092"
depends_on:
- zookeeper
volumes:
- type: bind
source: ./secrets
target: /etc/kafka/secrets
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker1.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: broker1_keystore_creds
KAFKA_SSL_KEY_CREDENTIALS: broker1_sslkey_creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker1.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker1_truststore_creds
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
KAFKA_SSL_CLIENT_AUTH: required
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
KAFKA_LISTENER_NAME_INTERNAL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
extra_hosts:
- "moby:127.0.0.1"

kafka2:
build:
context: .
dockerfile: Dockerfile.kafka
args:
broker_id: 2
zookeeper_connect: zookeeper:2181
listener_host: kafka2
listener_port: 9093
ports:
- "9093:9092"
depends_on:
- zookeeper
extra_hosts:
- "moby:127.0.0.1"

kafka3:
build:
context: .
dockerfile: Dockerfile.kafka
args:
broker_id: 3
zookeeper_connect: zookeeper:2181
listener_host: kafka3
listener_port: 9094
ports:
- "9094:9092"
depends_on:
- zookeeper
extra_hosts:
- "moby:127.0.0.1"