diff --git a/.env b/.env index e9bdb69..a37211a 100644 --- a/.env +++ b/.env @@ -4,4 +4,5 @@ ELK_DISTRIBUTION_VERSION=7.12.0 CONFLUENT_DISTRO_REPO=confluentinc MYSQL_DISTRIBUTION_VERSION=5.7 INFLUXDB_DISTRIBUTION_VERSION=1.8 -INFLUXDB_DISTRO_REPO= \ No newline at end of file +INFLUXDB_DISTRO_REPO= +MYSQL_DISTRIBUTION_REPO= \ No newline at end of file diff --git a/README.md b/README.md index 5ceaab0..891c0e4 100644 --- a/README.md +++ b/README.md @@ -27,14 +27,15 @@ docker-compose -f docker-compose.yml up -d The following technologies will be started: * Confluent center and all related technologies * ElasticSearch and Kibana -* Postgresql +* MySql +* InfluxDB ## Running the python producer The following script needs to be executed: ```python .../kafka_proto_py/kafka_proto_api/start_producer.py ``` -You will need to set the working directory to the root of the project +You will need to set the working directory to the root of the project. ## Running the kafka connector The repo also contains an example of a kafka connector under the folder `kafka-connect`. @@ -44,11 +45,15 @@ You can run the following command from the `kafka-connect directory` ```bash curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '@etf-2-elk.json' ``` +Additional connectors have been added and can be run with the above command. Don't forget to alter the data input. ## Installing additional connectors If you want you can install additional kafka connectors by editing the file in `docker/DockerfileConnect`. To make the maintenance easier, we install connectors through the command `confluent-hub install`. +If `confluent-hub` is not present I suggest you download the individual zip files, place them in the docker directory +and unzip them. Using the `ADD` command in docker, you can add the individual directories. + ## Changing image versions and repos If you want to change the versions of certain images or change the repo from where you want to pull (in case you have mirrored the images), you can edit the `.env` file, which is present in the root directory. diff --git a/docker-compose.yml b/docker-compose.yml index 5685115..279124a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -220,7 +220,7 @@ services: INFLUXDB_REPORTING_DISABLED: "false" mysqldb: - image: mysql:${MYSQL_DISTRIBUTION_VERSION} + image: ${MYSQL_DISTRIBUTION_REPO}mysql:${MYSQL_DISTRIBUTION_VERSION} restart: always environment: MYSQL_DATABASE: 'mysqldb' diff --git a/docker/DockerfileConnect b/docker/DockerfileConnect index 757d847..a359739 100644 --- a/docker/DockerfileConnect +++ b/docker/DockerfileConnect @@ -6,4 +6,8 @@ ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest -RUN confluent-hub install --no-prompt confluentinc/kafka-connect-influxdb:latest \ No newline at end of file +RUN confluent-hub install --no-prompt confluentinc/kafka-connect-influxdb:latest + +#ADD confluentinc-kafka-connect-datagen-0.4.0 /usr/share/confluent-hub-components/ +#ADD confluentinc-kafka-connect-elasticsearch-11.0.3 /usr/share/confluent-hub-components/ +#ADD confluentinc-kafka-connect-influxdb-1.2.1 /usr/share/confluent-hub-components/1 \ No newline at end of file diff --git a/kafka-connect/etf-2-influxdb.json b/kafka-connect/etf-2-influxdb.json new file mode 100644 index 0000000..eaff06d --- /dev/null +++ b/kafka-connect/etf-2-influxdb.json @@ -0,0 +1,20 @@ +{ +"name" : "etf-2-influxdb", +"config" : { + "connector.class":"io.confluent.influxdb.InfluxDBSinkConnector", + "errors.retry.timeout":"3600000", + "errors.log.include.messages":"true", + "topics":"etf_dummy_data", + "tasks.max":"1", + "errors.retry.delay.max.ms":"60000", + "influxdb.db":"etf-dummy-data-raw", + "measurement.name.format":"dummy_data_raw", + "influxdb.url":"http://influxdb:8086", + "value.converter": "io.confluent.connect.protobuf.ProtobufConverter", + "value.converter.schemas.enable": "false", + "value.converter.schema.registry.url": "http://schema-registry:8081", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "key.converter.schemas.enable" : "false", + "errors.log.enable":"true" + } +} diff --git a/kafka-connect/etf-complex-2-elk.json b/kafka-connect/etf-complex-2-elk.json new file mode 100644 index 0000000..2060865 --- /dev/null +++ b/kafka-connect/etf-complex-2-elk.json @@ -0,0 +1,19 @@ +{ + "name": "etf-complex-2-elk", + "config": { + "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", + "tasks.max": "1", + "topics": "etf_dummy_data_complex", + "connection.url": "http://es01:9200", + "type.name": "_doc", + "value.converter": "io.confluent.connect.protobuf.ProtobufConverter", + "value.converter.schemas.enable": "false", + "value.converter.schema.registry.url": "http://schema-registry:8081", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "key.converter.schemas.enable" : "false", + "schema.ignore": "true", + "key.ignore": "true", + "auto.create.indices.at.start":"true", + "write.method":"insert" + } +} diff --git a/kafka-connect/etf-complex-2-influxdb.json b/kafka-connect/etf-complex-2-influxdb.json new file mode 100644 index 0000000..9c2d794 --- /dev/null +++ b/kafka-connect/etf-complex-2-influxdb.json @@ -0,0 +1,20 @@ +{ + "name" : "etf-complex-2-influxdb", + "config" : { + "connector.class":"io.confluent.influxdb.InfluxDBSinkConnector", + "errors.retry.timeout":"3600000", + "errors.log.include.messages":"true", + "topics":"etf_dummy_data_complex", + "tasks.max":"1", + "errors.retry.delay.max.ms":"60000", + "influxdb.db":"etf-dummy-data-raw", + "measurement.name.format":"dummy_data_complex_raw", + "influxdb.url":"http://influxdb:8086", + "value.converter": "io.confluent.connect.protobuf.ProtobufConverter", + "value.converter.schemas.enable": "false", + "value.converter.schema.registry.url": "http://schema-registry:8081", + "key.converter":"org.apache.kafka.connect.storage.StringConverter", + "key.converter.schemas.enable" : "false", + "errors.log.enable":"true" + } + }