
Commit

Merge pull request #1 from Ycallaer/feature/add_http_link_proto
Adding support for proto files annotated with an http link
Ycallaer authored Mar 28, 2021
2 parents 6cdb143 + 9b08480 commit e35bb36
Showing 6 changed files with 1,223 additions and 0 deletions.
15 changes: 15 additions & 0 deletions kafka_proto_api/config/configuration.py
@@ -1,5 +1,6 @@
from kafka_proto_api.protos.etf_pb2 import etf
from kafka_proto_api.protos.etf_complex_pb2 import etf_complex
from kafka_proto_api.protos.etf_http_ref_pb2 import etf_http_ref

config = {
'local': {
@@ -29,6 +30,20 @@
'auto.register.schemas': True,
'schemaregistry.url': 'http://localhost:8081',
'proto_msg_type': etf_complex
},
'local_http_ref': {
'bootstrap_servers': 'localhost:9092',
'kafkaMaxRequestSize': 50331648,
'kafkaMaxMessageBytes': 50331648,
'kafka_produce_topic': 'etf_dummy_http_ref',
'fetch.wait.max.ms': 60000,
'session.timeout.ms': 180000,
'heartbeat.interval.ms': 60000,
'queue.buffering.max.ms': 0,
'queue.buffering.max.messages': 15000,
'auto.register.schemas': True,
'schemaregistry.url': 'http://localhost:8081',
'proto_msg_type': etf_http_ref
}
}
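
For reference, a minimal sketch of how the new entry is expected to be consumed (this assumes getConfigForEnv in start_producer.py simply indexes into the config dict above; that helper is not shown in this diff):

from kafka_proto_api.config.configuration import config

# Select the new environment by name; the producer reads broker, topic,
# schema registry URL and protobuf message type from this dict.
env = config["local_http_ref"]
print(env["kafka_produce_topic"])   # etf_dummy_http_ref
print(env["schemaregistry.url"])    # http://localhost:8081
print(env["proto_msg_type"])        # the generated etf_http_ref message class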

246 changes: 246 additions & 0 deletions kafka_proto_api/protos/etf_http_ref_pb2.py

Some generated files are not rendered by default.
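
The checked-in module is generated code, so it is easiest to review against the source file proto-schema/etf_http_ref.proto shown further down. A hedged sketch of how it could be regenerated using the grpcio-tools package; the repository may instead invoke the protoc binary directly, so treat the tooling and paths here as assumptions:

import pkg_resources
from grpc_tools import protoc

# Bundled copies of the well-known protos (google/protobuf/descriptor.proto).
well_known = pkg_resources.resource_filename("grpc_tools", "_proto")

protoc.main([
    "protoc",
    "-Iproto-schema",                        # where etf_http_ref.proto lives
    f"-I{well_known}",
    "--python_out=kafka_proto_api/protos",   # emit etf_http_ref_pb2.py next to the others
    "etf_http_ref.proto",
])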

11 changes: 11 additions & 0 deletions kafka_proto_api/start_producer.py
@@ -3,6 +3,7 @@
import arrow
from kafka_proto_api.protos import etf_pb2
from kafka_proto_api.protos import etf_complex_pb2
from kafka_proto_api.protos import etf_http_ref_pb2
import csv
from decimal import Decimal

@@ -25,6 +26,7 @@ def main():
print("Initializing the kafka producer")
producer = ProtoKafkaProducer(config_env=getConfigForEnv("local"))
producer_complex = ProtoKafkaProducer(config_env=getConfigForEnv("local_complex"))
producer_http_ref = ProtoKafkaProducer(config_env=getConfigForEnv("local_http_ref"))

data_set=load_data_file(filename="resources/etf.csv")

@@ -47,9 +49,18 @@ def main():
volume=int(data_element[5]),
openint=int(data_element[6]))

etf_http = etf_http_ref_pb2.etf_http_ref(date=data_element[0],
open=Decimal(data_element[1]),
high=Decimal(data_element[2]),
low=Decimal(data_element[3]),
close=Decimal(data_element[4]),
volume=int(data_element[5]),
openint=int(data_element[6]))

utc = str(arrow.now().timestamp)
producer.produce(kafka_msg=etf, kafka_key=utc)
producer_complex.produce(kafka_msg=etf_complex, kafka_key=utc)
producer_http_ref.produce(kafka_msg=etf_http, kafka_key=utc)


if __name__=="__main__":
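
A minimal sketch of what the new block builds for each CSV row, independent of Kafka (the sample row and the use of float() in place of Decimal are assumptions for illustration; open/high/low/close are plain protobuf doubles):

from kafka_proto_api.protos import etf_http_ref_pb2

# One CSV row in the same column order the producer uses:
# date, open, high, low, close, volume, openint
row = ["2017-11-10", "12.5", "13.1", "12.3", "12.9", "100000", "0"]

msg = etf_http_ref_pb2.etf_http_ref(
    date=row[0],
    open=float(row[1]),
    high=float(row[2]),
    low=float(row[3]),
    close=float(row[4]),
    volume=int(row[5]),
    openint=int(row[6]),
)

# Round-trip through the wire format to confirm the message is well formed.
assert etf_http_ref_pb2.etf_http_ref.FromString(msg.SerializeToString()) == msg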
39 changes: 39 additions & 0 deletions proto-schema/etf_http_ref.proto
@@ -0,0 +1,39 @@
syntax = "proto3";

import "google/protobuf/descriptor.proto";

message etf_http_ref {
option (term_source) = TERM_SOURCE_FIBO;
option (term_source_ref) = "https://en.wikipedia.org/wiki/ISO_8601";

string date = 1;
double open = 2;
double high = 3;
double low = 4;
double close = 5;
int64 volume = 6;
int64 openint = 7;
}

enum TermSource {
TERM_SOURCE_UNSPECIFIED = 0;
TERM_SOURCE_ONE = 1;
TERM_SOURCE_FIBO = 2;
TERM_SOURCE_ISO = 3;
TERM_SOURCE_ISO20022 = 4;
TERM_SOURCE_FIX = 5;
TERM_SOURCE_FPML = 6;
}

extend google.protobuf.MessageOptions {
string coding_scheme = 55002;
TermSource term_source = 55003;
string term_source_ref = 55004;
string msg_term_link = 55005;
}

extend google.protobuf.FieldOptions {
bool is_identifier = 56002;
string external_schema = 56003;
string field_term_link = 56004;
}
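
The point of these annotations is that the term source and its HTTP reference travel with the schema itself. A minimal sketch of reading them back from the generated descriptor at runtime (the module-level extension names follow standard generated-code conventions; verify against the actual etf_http_ref_pb2.py):

from kafka_proto_api.protos import etf_http_ref_pb2

opts = etf_http_ref_pb2.etf_http_ref.DESCRIPTOR.GetOptions()

# Custom message options are read through the Extensions mapping,
# keyed by the module-level extension descriptors.
source = opts.Extensions[etf_http_ref_pb2.term_source]
source_ref = opts.Extensions[etf_http_ref_pb2.term_source_ref]

print(etf_http_ref_pb2.TermSource.Name(source))  # TERM_SOURCE_FIBO
print(source_ref)                                # https://en.wikipedia.org/wiki/ISO_8601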
