Merge pull request #2 from Ycallaer/feature/add_external_components_docker

Adding kafka connector to read data and push to ELK
Ycallaer authored Mar 28, 2021
2 parents e35bb36 + 074c87a commit 823b413
Showing 8 changed files with 127 additions and 25 deletions.
47 changes: 44 additions & 3 deletions README.md
@@ -1,5 +1,8 @@
# Kafka proto
This repo contains a Kafka producer for different types of proto files.
This repo is used to interact with protobuf data for Kafka.
It includes a Python producer that writes to several Kafka topics, each with its own schema.
From there you can use Kafka Connect or other technologies to push the data to downstream systems.

The data itself was taken from a Kaggle competition.

## Getting started
@@ -13,10 +16,48 @@ You will need the following tools to get started:
The following proto files are present:
* etf.proto: A simple proto file with basic data types
* etf_complex.proto: A proto file with complex data types
* etf_http_ref.proto: A proto file that uses an HTTP link as a reference

## Starting the docker environment
The docker environment can be started with a single command:
```bash
docker-compose -f docker-compose.yml up -d
```

## Running the program
The following services will be started (you can check that they are reachable with the sketch below):
* Confluent Control Center and all related Confluent services
* Elasticsearch and Kibana
* PostgreSQL
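
A minimal sketch for such a check, assuming the port mappings from `docker-compose.yml` (8081 for the schema registry, 8083 for Kafka Connect, 9200 for Elasticsearch, 5601 for Kibana):

```python
# Hypothetical helper, not part of this repo: poll the HTTP endpoints exposed by docker-compose.
import requests

SERVICES = {
    "schema-registry": "http://localhost:8081/subjects",
    "kafka-connect": "http://localhost:8083/connectors",
    "elasticsearch": "http://localhost:9200",
    "kibana": "http://localhost:5601/api/status",
}

for name, url in SERVICES.items():
    try:
        print(f"{name}: HTTP {requests.get(url, timeout=5).status_code}")
    except requests.exceptions.ConnectionError:
        print(f"{name}: not reachable yet")
```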

## Running the python producer
The following script needs to be executed:
```python
.../kafka_proto_py/kafka_proto_api/start_producer.py
```
You will need to set the working directory to the root of the project.
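
For orientation, the sketch below shows a minimal stand-alone protobuf producer written with `confluent_kafka`. It is not the repo's `ProtoKafkaProducer` (see `kafka_proto_api/producer/proto_producer.py`); the bootstrap server and schema-registry URL are assumptions about the local setup, and the topic name is taken from `kafka-connect/etf-2-elk.json`.

```python
# Illustrative sketch only -- the real producer lives in kafka_proto_api/producer/proto_producer.py.
# Broker and schema-registry addresses below are assumed defaults for the local docker environment.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer

import etf_pb2  # generated from etf.proto

schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    # Recent confluent-kafka clients expect use.deprecated.format to be set explicitly.
    "value.serializer": ProtobufSerializer(etf_pb2.etf, schema_registry,
                                           {"use.deprecated.format": False}),
})

# Only the date field is set here; see etf.proto for the full schema.
msg = etf_pb2.etf(date="1970-01-01")
producer.produce(topic="etf_dummy_data", key="some-key", value=msg)
producer.flush()
```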

## Running the kafka connector
The repo also contains an example Kafka connector under the `kafka-connect` folder.
The file in that folder starts a Kafka Connect job that reads data from the configured topic and writes it to
Elasticsearch.
Run the following command from the `kafka-connect` directory:
```bash
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '@etf-2-elk.json'
```
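
Kafka Connect exposes a REST API on port 8083 (the same endpoint the `curl` call above uses), so you can confirm the job was registered and its tasks are running. A small sketch, using the connector name from `etf-2-elk.json`:

```python
# Check the submitted connector through the Kafka Connect REST API (port 8083 per docker-compose).
import requests

base = "http://localhost:8083"
print(requests.get(f"{base}/connectors").json())                          # all registered connectors
print(requests.get(f"{base}/connectors/etf-simple-2-elk/status").json())  # state of this connector and its tasks
```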

## Installing additional connectors
You can install additional Kafka connectors by editing `docker/DockerfileConnect`.
To keep maintenance simple, connectors are installed through the `confluent-hub install` command.
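
To confirm that the connectors were actually picked up after rebuilding the `connect` image, you can list the installed plugins; `GET /connector-plugins` is a standard Kafka Connect REST endpoint. A minimal sketch:

```python
# List the connector plugins available in the rebuilt connect image.
import requests

for plugin in requests.get("http://localhost:8083/connector-plugins").json():
    print(plugin["class"], plugin.get("version"))
```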

## Querying the data
To verify that the data reached Elasticsearch you will need to create an index pattern. This can be done from the
Kibana console, which is reachable at `http://localhost:5601`.

Once the index pattern is created you can use the Dev Tools console to query the data, as shown in the screenshot:
![screenshot](docu_img/elasticsearch_index.JPG)
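
If you prefer to query outside Kibana, the sketch below talks to Elasticsearch directly on port 9200. It assumes the sink keeps its default behaviour of writing to an index named after the topic (`etf_dummy_data`).

```python
# Fetch a few documents straight from Elasticsearch; the index name mirrors the Kafka topic by default.
import requests

resp = requests.get("http://localhost:9200/etf_dummy_data/_search", params={"size": 5})
for hit in resp.json().get("hits", {}).get("hits", []):
    print(hit["_source"])
```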

## Contributing
If you want to contribute, please abide by the following rules:
* Create a feature branch and add your changes
* Create a pull request to merge into master
* Have your pull request reviewed, then merge
39 changes: 38 additions & 1 deletion docker-compose.yml
@@ -54,7 +54,10 @@ services:
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

connect:
image: cnfldemos/cp-server-connect-datagen:0.4.0-6.1.0
#image: cnfldemos/cp-server-connect-datagen:0.4.0-6.1.0
build:
context: ./docker
dockerfile: DockerfileConnect
hostname: connect
container_name: connect
depends_on:
@@ -176,3 +179,37 @@ services:
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

postgressdb:
image: "postgres:11"
container_name: "postgress_kafka"
environment:
POSTGRES_HOST_AUTH_METHOD: "trust"
ports:
- "54320:5432"

es01:
image: docker.elastic.co/elasticsearch/elasticsearch:7.12.0
container_name: es01
environment:
- bootstrap.memory_lock=true
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- 9200:9200
- 9300:9300

kib01:
image: docker.elastic.co/kibana/kibana:7.12.0
depends_on:
- es01
container_name: kib01
ports:
- 5601:5601
environment:
ELASTICSEARCH_URL: http://es01:9200
ELASTICSEARCH_HOSTS: '["http://es01:9200"]'
6 changes: 6 additions & 0 deletions docker/DockerfileConnect
@@ -0,0 +1,6 @@
FROM confluentinc/cp-kafka-connect:6.1.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
Binary file added docu_img/elasticsearch_index.JPG
19 changes: 19 additions & 0 deletions kafka-connect/etf-2-elk.json
@@ -0,0 +1,19 @@
{
"name": "etf-simple-2-elk",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "etf_dummy_data",
"connection.url": "http://es01:9200",
"type.name": "_doc",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable" : "false",
"schema.ignore": "true",
"key.ignore": "true",
"auto.create.indices.at.start":"true",
"write.method":"insert"
}
}
3 changes: 1 addition & 2 deletions kafka_proto_api/config/configuration.py
@@ -48,6 +48,5 @@
}



def getConfigForEnv(environment):
return config[environment]
return config[environment]
6 changes: 3 additions & 3 deletions kafka_proto_api/producer/proto_producer.py
@@ -5,7 +5,7 @@


class ProtoKafkaProducer:
def __init__(self,config_env):
def __init__(self, config_env):
self.config = config_env
self.topic_name = self.config["kafka_produce_topic"]

@@ -41,9 +41,9 @@ def produce(self, kafka_msg, kafka_key):
value=kafka_msg,
key=kafka_key,
on_delivery=self.on_delivery
)
)

self.producer.flush()

except Exception as e:
print("Error during producing to kafka topic. Stacktrace is %s",e)
print("Error during producing to kafka topic. Stacktrace is %s", e)
32 changes: 16 additions & 16 deletions kafka_proto_api/start_producer.py
@@ -28,7 +28,7 @@ def main():
producer_complex = ProtoKafkaProducer(config_env=getConfigForEnv("local_complex"))
producer_http_ref = ProtoKafkaProducer(config_env=getConfigForEnv("local_http_ref"))

data_set=load_data_file(filename="resources/etf.csv")
data_set = load_data_file(filename="resources/etf.csv")

for data_element in data_set:
etf = etf_pb2.etf(date=data_element[0],
@@ -42,26 +42,26 @@ def main():
etf_complex_data = etf_complex_pb2.etf_date(date=data_element[0])

etf_complex = etf_complex_pb2.etf_complex(date=etf_complex_data,
open=Decimal(data_element[1]),
high=Decimal(data_element[2]),
low=Decimal(data_element[3]),
close=Decimal(data_element[4]),
volume=int(data_element[5]),
openint=int(data_element[6]))
open=Decimal(data_element[1]),
high=Decimal(data_element[2]),
low=Decimal(data_element[3]),
close=Decimal(data_element[4]),
volume=int(data_element[5]),
openint=int(data_element[6]))

etf_http =etf_http_ref_pb2.etf_http_ref(date=data_element[0],
open=Decimal(data_element[1]),
high=Decimal(data_element[2]),
low=Decimal(data_element[3]),
close=Decimal(data_element[4]),
volume=int(data_element[5]),
openint=int(data_element[6]))
etf_http = etf_http_ref_pb2.etf_http_ref(date=data_element[0],
open=Decimal(data_element[1]),
high=Decimal(data_element[2]),
low=Decimal(data_element[3]),
close=Decimal(data_element[4]),
volume=int(data_element[5]),
openint=int(data_element[6]))

utc = str(arrow.now().timestamp)
producer.produce(kafka_msg=etf, kafka_key=utc)
producer_complex.produce(kafka_msg=etf_complex, kafka_key=utc)
producer_http_ref.produce(kafka_msg=etf_http, kafka_key=utc)


if __name__=="__main__":
main()
if __name__ == "__main__":
main()
