-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProducer.py
30 lines (26 loc) · 1.13 KB
/
Producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import os
import time

import praw
from kafka import KafkaProducer
def auth_reddit(client_id=None, client_secret=None,
                user_agent="android:com.example.myredditapp:v1.2.3 (by u/vig1818)"):
    """Create a connection to the Reddit API and return the ``praw.Reddit`` client.

    Credentials are not stored in source. Each one is resolved in this order:

    1. the explicit argument, if given;
    2. the ``REDDIT_CLIENT_ID`` / ``REDDIT_CLIENT_SECRET`` environment variable;
    3. otherwise the setting is omitted, so PRAW falls back to its own
       configuration (``praw.ini`` or ``praw_*`` environment variables).

    Args:
        client_id: Reddit application client ID.
        client_secret: Reddit application client secret.
        user_agent: User-Agent string sent with every Reddit API request.

    Returns:
        A configured ``praw.Reddit`` instance.
    """
    # NOTE: the client_id/client_secret previously hard-coded here were
    # committed to source control; treat them as leaked and rotate them.
    settings = {
        "client_id": client_id or os.environ.get("REDDIT_CLIENT_ID"),
        "client_secret": client_secret or os.environ.get("REDDIT_CLIENT_SECRET"),
        "user_agent": user_agent,
    }
    # Drop unresolved entries instead of passing None, so PRAW's own config
    # lookup still applies to them.
    return praw.Reddit(**{k: v for k, v in settings.items() if v is not None})
def main(subreddit_name="bikes", topic="mytopic", limit=10,
         bootstrap_servers="localhost:9092", *, verbose=False, delay=0.0):
    """Send every comment from a subreddit's hottest submissions to Kafka.

    For each of the top ``limit`` "hot" submissions, the full comment tree is
    expanded and every comment is sent to ``topic``. The submission title is
    the message key and the comment body is the value, both UTF-8 encoded.

    Args:
        subreddit_name: Subreddit to read from.
        topic: Kafka topic to publish to.
        limit: Number of hot submissions to process.
        bootstrap_servers: Kafka bootstrap server address(es).
        verbose: If true, also print each comment body to the console.
        delay: Seconds to sleep after each comment, to throttle the stream
            (0 disables throttling).
    """
    # Create a connection to the Reddit API
    reddit = auth_reddit()
    # Create a connection to the Kafka API
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        for submission in reddit.subreddit(subreddit_name).hot(limit=limit):
            # Replace every "load more comments" stub so .list() yields the
            # complete comment tree; this may issue many extra API requests.
            submission.comments.replace_more(limit=None)
            # The key is the same for every comment of this submission.
            key = submission.title.encode('utf-8')
            for comment in submission.comments.list():
                producer.send(topic, key=key, value=comment.body.encode('utf-8'))
                if verbose:
                    print(comment.body)
                    print('-------------------\n')
                if delay:
                    time.sleep(delay)
    finally:
        # send() is asynchronous and only buffers records; flush and close so
        # queued messages are delivered before the script exits.
        producer.flush()
        producer.close()


if __name__ == "__main__":
    main()