real_time_producer.py
import urllib.request
import time
import feedparser
from kafka import KafkaProducer
from nltk.corpus import stopwords
nltk_words = list(stopwords.words('english'))
from extract_info_common import extract_func
import json
# Base API query URL
base_url = 'http://export.arxiv.org/api/query?'
# Word to search for -> set it to 'a' to search for all papers,
# on the assumption that every paper contains the word 'a' at least once
keyword = 'a'
# Where to search for the word:
# 'all' means searching in the title, author names, abstract,
# comments, etc. -> see section 5.1 of
# https://arxiv.org/help/api/user-manual#query_details
prefix = 'all'
# Kafka producer that serializes messages as JSON
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))
# Search parameters
search_query = prefix + ':' + keyword + "&sortBy=lastUpdatedDate&sortOrder=descending" # search for the keyword in all fields
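# For illustration, with the parameters defined here the first request resolves to:
# http://export.arxiv.org/api/query?search_query=all:a&sortBy=lastUpdatedDate&sortOrder=descending&start=0&max_results=2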
start = 0 # start at the first result
total_results = 2000 # total results per cycle
results_per_iteration = 2 # results at a time
wait_time = 2 # number of seconds to wait between calls
# Kafka parameters
topic = 'real_time_arXiv'
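# Note (assumption about your setup): if the broker does not auto-create topics
# (auto.create.topics.enable=false), create 'real_time_arXiv' beforehand,
# e.g. with the kafka-topics.sh CLI shipped with Kafka.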
print('Searching arXiv for %s' % search_query)
i = 0
while True:
    print("Produced papers between %i and %i" % (i, i + results_per_iteration))
    query = 'search_query=%s&start=%i&max_results=%i' % (search_query,
                                                         i,
                                                         results_per_iteration)
    # Perform a GET request using the base_url and query
    response = urllib.request.urlopen(base_url + query).read()
    # Parse the response using feedparser
    feed = feedparser.parse(response)
    # Uncomment the print below to check that the produced papers' info corresponds to the consumed ones
    print(type(response))
    #print(response)
    # Run through each entry and produce its information to Kafka
    for entry in feed.entries:
        extract_func(producer, topic, entry, nltk_words)
    i += results_per_iteration
    # Remember to play nice and sleep a bit before you call the API again!
    # Comment out these 2 lines to get more than 1 row (= results_per_iteration papers)
    # per batch in the Consumer
    print('Sleeping for %i seconds' % wait_time)
    time.sleep(wait_time)
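
# ----------------------------------------------------------------------
# Illustrative sketch (assumption, not part of this producer script):
# a minimal consumer that reads the JSON messages produced above from the
# 'real_time_arXiv' topic. The value_deserializer mirrors the producer's
# value_serializer; adjust bootstrap_servers to your setup.
#
#   from kafka import KafkaConsumer
#   import json
#
#   consumer = KafkaConsumer(
#       'real_time_arXiv',
#       bootstrap_servers='localhost:9092',
#       value_deserializer=lambda m: json.loads(m.decode('ascii')),
#       auto_offset_reset='earliest',
#   )
#   for message in consumer:
#       print(message.value)
# ----------------------------------------------------------------------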