-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtweetStreamer.py
129 lines (97 loc) · 3.96 KB
/
tweetStreamer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-
"""
Created on Tue May 19 22:36:53 2020
@author: Aykut Caner
"""
import tweepy
import keys
import sqlite3
import winsound
class TweetListener(tweepy.StreamListener):
""" A class that inheretred from Tweepy.StreamListener to initialize streaming and processing retrieved tweets """
def __init__(self,api,connection, limit=10 ):
self.tweet_count = 0
self.TWEET_LIMIT = limit
self.connection = connection
super().__init__(api)
def on_status(self, status):
# ignore retweets
if status.text.startswith('RT'):
return
#create a dictionary that keeps the current tweet
field = {}
field['created_at'] = status.created_at
field['screen_name'] = status.user.screen_name
field['language'] = status.lang
field['location'] = status.user.location
try:
#try to get extended tweet
tweet_text = status.extended_tweet['full_text']
field['text'] = status.extended_tweet['full_text']
except Exception as er:
#if it is not extended, get the normal tweet
#print(er)
tweet_text = status.text
field['text'] = status.text
#print details of the tweet
print(f'Created at: {field["created_at"]}')
print(f'Screen name: {field["screen_name"]}')
print(f'Language: {field["language"]}')
print(f'Location: {field["location"]}')
print(f'{tweet_text}')
print('-'*10 + '\n')
#ignore the tweets that do not contain track words
track=['corona','COVID-19','covid','coronavirus']
accept='no'
for word in track:
if word in field['text'].lower():
accept = 'yes'
break
if accept == 'no':
return
# insert tweet details to database
cursor = self.connection.cursor()
cursor = cursor.execute('''INSERT INTO TweetsDB (TIMESTAMP,USER,LANGUAGE,LOCATION,CONTENT)
VALUES (?,?,?,?,?);''', (field["created_at"], field["screen_name"], field["language"], field["location"],field['text']))
connection.commit()
self.tweet_count += 1
# return false when reached tweet limit
return self.tweet_count < self.TWEET_LIMIT
def on_limit(self, track):
print("Reached rate limit.")
def on_error(self, status_code):
print(f"Error with status code: {status_code}")
return False
def create_connection(db_file):
""" create a database connection to a SQLite database """
conn = None
try:
conn = sqlite3.connect(db_file)
db_name = db_file.split("\\")[-1]
print(f"Connected to {db_name}")
except Error as e:
print(e)
return conn
def authenticate():
""" A function that authenticate the user and returns tweepy API """
#keys.py is a document where API keys are stored.
auth = tweepy.OAuthHandler(keys.consumer_key, keys.consumer_secret)
auth.set_access_token(keys.access_token, keys.access_token_secret)
api = tweepy.API(auth,wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
return api
connection = create_connection(r"C:\Users\Aykut Caner\Desktop\CoronaProject\CloneDB")
api = authenticate()
tweet_listener = TweetListener(api,connection, limit=10)
stream = tweepy.Stream(api.auth,tweet_listener)
try:
stream.filter(track=['corona','COVID-19','covid','coronavirus'],languages=['tr','de','nl','fr','es','it','pt'], is_async=False)
except Exception as e:
# If streaming stops, make a sound.
print(e)
frequency = 2500 # Set Frequency To 2500 Hertz
duration = 1000 # Set Duration To 1000 ms == 1 second
winsound.Beep(frequency, duration)
print(connection.total_changes)
#close database connection by printing total number of changes
print(connection.total_changes)
connection.close()