stream-processing.py
# 1. Read stock-price records from a Kafka broker and topic.
# 2. Write the computed average price back to Kafka, to a new topic on the same broker.
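#
# A minimal sketch (not taken from the source) of the message shapes this job
# assumes, based on how the records are parsed below: each input record's value
# is a JSON array whose first object carries a 'LastTradePrice' field, e.g.
#   [{"StockSymbol": "AAPL", "LastTradePrice": 170.25}]
# and each output record is a JSON object such as
#   {"timestamp": 1514764800.0, "average": 170.25}
# The 'StockSymbol' field and the concrete values are illustrative assumptions.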
import atexit
import logging
import json
import sys
import time
# write back to kafka
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError
# import spark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
logger_format = '%(asctime)-15s %(message)s'
logging.basicConfig(format=logger_format)
logger = logging.getLogger('stream-processing')
logger.setLevel(logging.INFO)
topic = None
target_topic = None
brokers = None
kafka_producer = None


def shutdown_hook(producer):
    """
    A shutdown hook to be called before the process exits.
    :param producer: instance of a kafka producer
    :return: None
    """
    try:
        logger.info('Flushing pending messages to kafka, timeout is set to 10s')
        producer.flush(10)
        logger.info('Finished flushing pending messages to kafka')
    except KafkaError as kafka_error:
        logger.warning('Failed to flush pending messages to kafka, caused by: %s', kafka_error)
    finally:
        try:
            logger.info('Closing kafka connection')
            producer.close(10)
        except Exception as e:
            logger.warning('Failed to close kafka connection, caused by: %s', e)


def process(timeobj, rdd):
    """Compute the average LastTradePrice over this micro-batch and send it to the target topic."""
    num_of_records = rdd.count()
    if num_of_records == 0:
        return
    # The direct stream's default decoder already returns UTF-8 strings,
    # so the record value can be passed straight to json.loads.
    price_sum = rdd \
        .map(lambda record: float(json.loads(record[1])[0].get('LastTradePrice'))) \
        .reduce(lambda a, b: a + b)
    average = price_sum / num_of_records
    logger.info('Received %d records from kafka, average price is %f', num_of_records, average)

    current_time = time.time()
    data = json.dumps({
        'timestamp': current_time,
        'average': average
    })
    try:
        # KafkaProducer expects bytes unless a value_serializer is configured.
        kafka_producer.send(target_topic, value=data.encode('utf-8'))
    except KafkaError as error:
        logger.warning('Failed to send average stock price to kafka, caused by: %s', error)


if __name__ == '__main__':
    if len(sys.argv) != 4:
        print("Usage: stream-processing.py [topic] [target-topic] [broker-list]")
        sys.exit(1)

    # Create the SparkContext and StreamingContext, running Spark locally
    # with 2 worker threads.
    sc = SparkContext("local[2]", "StockAveragePrice")
    sc.setLogLevel('ERROR')
    # Micro-batch interval of 5 seconds.
    ssc = StreamingContext(sc, 5)

    # sys.argv[0] is the script name; the remaining arguments are the source
    # topic, the target topic and the Kafka broker list.
    topic, target_topic, brokers = sys.argv[1:]

    # Instantiate a direct Kafka stream so Spark pulls data straight from the brokers.
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})
    directKafkaStream.foreachRDD(process)

    # Instantiate a simple kafka producer for writing the averages back.
    kafka_producer = KafkaProducer(
        bootstrap_servers=brokers
    )

    # Register the shutdown hook so pending messages are flushed on exit.
    atexit.register(shutdown_hook, kafka_producer)

    # Start data streaming.
    ssc.start()
    ssc.awaitTermination()
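#
# Example invocation (a sketch, not from the source): the Kafka streaming
# integration is an external dependency, so it has to be supplied to
# spark-submit. The package coordinates/version below are assumptions that
# depend on your Spark and Scala build, and the topic names and broker address
# are placeholders.
#
#   spark-submit \
#     --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
#     stream-processing.py stock-prices average-stock-price localhost:9092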