#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
If executed directly, runs get_filtered_tweets(). Tweet ids until which the
paginated querying moves backwards in time along the Twitter timelines can be
passed via {cutoff_ids}. After a filtering stage a merged dictionary of unique
tweets is returned, containing:
1) every new mention of {target_user_id} since last execution
2) every new quote tweet of tweets by {target_user_id} since last execution
Filtering: Any tweet will be dropped if a regex pattern from {filter_patterns}
matches within tweet["text"]. If any query function is called directly, stopping
parameters can be passed using {add_params}. Examples:
add_params["since_id"] = "<tweet id>"
add_params["end_time"] = "2023-01-10T00:00:00.000Z"
add_params["start_time"] = "2023-02-01T00:00:00.000Z"
"""
import os
import inspect
import requests
import re
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
from ast import literal_eval
from os.path import exists
from pprint import pp, pformat
from copy import deepcopy
from time import sleep
from dotenv import load_dotenv
from helpers import *
load_dotenv('./.env')
target_user_id = os.environ.get("TWITTER_USER_ID")
bearer_token = os.environ.get("API_BEARER_TOKEN")
N_TWEETS_QUERIED = 0
MEDIA_TWEETS = {} # used as temporary storage for scraping media tags from a tweet on Twitter
# Any filtered-out tweets go here for checking if filters work correctly
discarded_path = "./discarded_tweets.json"
# Regex filter patterns. Any tweet where a pattern matches the tweet text gets dropped.
filter_patterns = [
#{
#"name": "more_than_5_mentions",
#"pattern" : r"@\w+.?\s.*@\w+.?\s.*@\w+.?\s.*@\w+.?\s.*@\w+.?\s.*@\w+",
#"flag": "dotall"
#},
{
"name": "red_flag",
"pattern" : r"airdrop",
"flag": "ignorecase"
},
{
"name": "retweets",
"pattern" : r"^RT",
"flag": None
}
]
def bearer_oauth(r):
    """Request hook required for bearer token authentication. Returns the modified request."""
    r.headers["Authorization"] = f"Bearer {bearer_token}"
    return r
def get_query_params() -> dict:
"""Tweet information returned by api is defined here."""
params = {
"tweet.fields": "created_at,public_metrics,in_reply_to_user_id,note_tweet," + \
"referenced_tweets,conversation_id,entities",
"user.fields": "id,username,entities,public_metrics",
"expansions": "author_id,in_reply_to_user_id,attachments.media_keys",
"media.fields": "media_key",
"max_results": "100"
}
return params
def connect_to_endpoint(url, params, bearer_token) -> tuple:
"""Wrapper for Twitter API queries. Returns response & status code."""
response = requests.request("GET", url, auth=bearer_oauth, params=params)
print(response.status_code)
handled_quietly = {200, 429}
if response.status_code not in handled_quietly:
raise Exception(
"Request returned an error: {} {}".format(
response.status_code, response.text
)
)
return (response.json(), response.status_code)
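# Illustrative sketch (not part of the pipeline): a single, direct request against the
# mentions endpoint. The user id below is a placeholder; the actual querying is done by
# the wrapper functions further down.
#
#   url = "https://api.twitter.com/2/users/<user id>/mentions"
#   json_response, status_code = connect_to_endpoint(url, get_query_params(), bearer_token)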
def merge_user_data(tweets_list, users_list) -> list:
"""
Helper function needed while querying the Twitter API.
Takes the ["data"] and ["includes"]["users"] lists from the json_response,
adds user parameters to their respective tweets matching "author_id" & "id"
"""
out_list = []
users_dict = {u["id"]: u for u in users_list}
# Iterate over all tweets & copy over their user information from users_list
for t in tweets_list:
user_id = t["author_id"]
u = users_dict[user_id]
t["username"] = u["username"]
t["followers_count"] = u["public_metrics"]["followers_count"]
t["following_count"] = u["public_metrics"]["following_count"]
t["tweet_count"] = u["public_metrics"]["tweet_count"]
t["listed_count"] = u["public_metrics"]["listed_count"]
out_list.append(t)
return out_list
def simple_query(url, params, bearer_token, infinite=False) -> tuple:
    """
    Queries the Twitter API as specified in {url} & {params}.
    Returns a tuple (list_of_tweets, response_status_code).
    Tweets causing an Authorization or Not Found error are dropped.
    """
global N_TWEETS_QUERIED
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
if status_code == 429:
print("Rate limit reached (429: Too many requests). Waiting for 16m to continue querying.")
sleep(16*60)
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
if "errors" in json_response:
[print(x["title"] + ":", x["detail"]) for x in json_response["errors"]]
if "data" not in json_response:
return ([], status_code)
tweets = json_response["data"]
N_TWEETS_QUERIED += len(tweets)
users = json_response["includes"]["users"]
merged = merge_user_data(tweets, users)
return (merged, status_code)
def paginated_query(url, params, bearer_token, infinite=False) -> tuple:
    """
    Queries page by page for max results until the last page is reached. Returns a tuple
    (list_of_tweets, most_recent_status_code). Aborts if no end trigger is set,
    unless {infinite} is set to True.
    """
    global N_TWEETS_QUERIED
    if not infinite:
        # Require an explicit end trigger, otherwise querying only stops at the rate limit
        assert ("since_id" in params) or ("start_time" in params), \
            "No end for querying defined. Will query until rate limit reached!"
tweets_list = []
users_list = []
    # First query. If no results & no error -> Return empty list
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
# If rate limit reached (TooManyRequests) -> wait for 16m and continue querying
if status_code == 429:
print("Rate limit reached (429: Too many requests). Waiting for 16m to continue querying.")
sleep(16*60)
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
    # If end of data reached (last page) -> abort here & return empty list
meta = json_response["meta"]
if "data" not in json_response:
return ([], status_code)
# Else continue querying until last (=oldest) page reached
tweets = json_response["data"]
users = json_response["includes"]["users"]
tweets_list.extend(tweets)
users_list.extend(users)
N_TWEETS_QUERIED += len(tweets)
# Query for a next page as long as there is one & API rate limit is not exceeded
while ("next_token" in meta) and status_code != 429:
params["pagination_token"] = meta["next_token"]
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
meta = json_response["meta"]
if "data" in json_response:
tweets = json_response["data"]
users = json_response["includes"]["users"]
tweets_list.extend(tweets)
users_list.extend(users)
N_TWEETS_QUERIED += len(tweets)
# Add user data back to original tweets
out_list = merge_user_data(tweets_list, users_list)
return (out_list, status_code)
def parse_date_range(tweets: list) -> str:
"""Takes a tweets list, returns a str of the earliest & latest tweet date."""
dates = sorted([t["created_at"] for t in tweets])
stripped = [d[:d.find('T')] for d in dates]
earliest = stripped[0]
latest = stripped[-1]
return f"{earliest} - {latest}"
def get_cutoffs(csv_path) -> dict:
"""
Loads DataFrame from {csv_path}. Searches through column "source".
Returns a dictionary of type {func_1: "<highest tweet id>", ...}
"""
cutoff_d = {}
js_tweet_ids = set()
# Load df
df = csv_to_df(csv_path)
# Get most recent mention, skip if none found
mentions = df[df["source"] == "get_new_mentions()"]["id"].tolist()
if mentions != []:
cutoff_d["get_new_mentions()"] = max(mentions)
# Get ids of quoted JediSwap tweets
quoted_referenced_tweets = df[df["source"] == "get_quotes_for_tweet()"]["referenced_tweets"]
for l in quoted_referenced_tweets:
referenced_tweets_list = literal_eval(l)
for t in referenced_tweets_list:
if t["type"] == "quoted":
js_tweet_ids.add(t["id"])
# Add most recent id of quoted tweets, skip if none found.
if js_tweet_ids != set():
cutoff_d["get_quotes_for_tweet()"] = max(js_tweet_ids)
return cutoff_d
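# Illustrative sketch of the expected return shape (the csv path & ids are hypothetical),
# assuming the csv was written by a previous run of this script:
#
#   cutoffs = get_cutoffs("./previously_queried_tweets.csv")
#   # -> {"get_new_mentions()": "16123...", "get_quotes_for_tweet()": "16098..."}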
def tweets_to_json(tweets: list, name: str) -> None:
    """Saves a tweets list to json. Appends its date range to the file name."""
    if tweets == []:
        return
    date_range = parse_date_range(tweets)
    out_name = f"{date_range} unfiltered {name}.json"
    write_list_to_json(tweets, out_name)
def query_tweets(url, params, bearer_token) -> tuple:
    """
    Queries for multiple (max 100) tweets. Merges user & tweet data.
    Returns a tuple (list_of_tweets, most_recent_status_code).
    """
global N_TWEETS_QUERIED
tweets_list = []
users_list = []
    # Query. If no results & no error -> Return empty list
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
# If rate limit reached (TooManyRequests) -> wait for 16m and continue querying
if status_code == 429:
print("Rate limit reached (429: Too many requests). Waiting for 16m to continue querying.")
sleep(16*60)
json_response, status_code = connect_to_endpoint(url, params, bearer_token)
    # If nothing found -> abort here & return empty list
if "data" not in json_response:
return ([], status_code)
# Merge tweet data with corresponding user data
tweets = json_response["data"]
users = json_response["includes"]["users"]
tweets_list.extend(tweets)
users_list.extend(users)
N_TWEETS_QUERIED += len(tweets)
# Add user data back to original tweets
out_list = merge_user_data(tweets_list, users_list)
return (out_list, status_code)
def get_tweets(id_list, bearer_token, add_params=None) -> list:
"""
Assumes list of tweet ids.
Queries Twitter API in chunks of 100 tweets per query (maximum).
Returns list of tweet dictionaries.
"""
def chunk_list(_list, n):
for i in range(0, len(_list), n):
yield _list[i:i+n]
out_tweets = []
tweets_per_query = 100
id_chunk = list(chunk_list(id_list, tweets_per_query))
params = get_query_params()
if add_params:
params.update(add_params)
del params["max_results"]
# Query 100 tweets at a time
for ids in id_chunk:
id_str = "ids=" + ",".join(ids)
url = "https://api.twitter.com/2/tweets?{}".format(id_str)
tweets, status_code = query_tweets(url, params, bearer_token)
out_tweets.extend(tweets)
if status_code == 429:
print("Rate limit reached (429: Too many requests). Waiting for 16m to continue querying.")
sleep(16*60)
tweets, status_code = query_tweets(url, params, bearer_token)
out_tweets.extend(tweets)
if out_tweets == []:
return []
# De-truncate tweets longer than 140 chars
out_tweets = de_truncate(out_tweets)
# Add function name to tweets & save queried data to json as backup
func_name = str(inspect.currentframe().f_code.co_name + "()")
[x.update({"source": func_name}) for x in out_tweets]
tweets_to_json(out_tweets, func_name)
return out_tweets
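# Illustrative sketch (tweet ids are placeholders): fetching a handful of tweets by id
# directly, e.g. to inspect the parent tweets of replies:
#
#   tweets = get_tweets(["1612345678901234567", "1612345678901234568"], bearer_token)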
def get_new_mentions(user_id, bearer_token, add_params=None) -> list:
    """
    Queries the mentions timeline of a Twitter user. Stopping parameters
    (e.g. "since_id", "start_time") can be passed via {add_params}.
    Returns a list of all tweets matching the query.
    """
# Define query parameters
url = "https://api.twitter.com/2/users/{}/mentions".format(user_id)
params = get_query_params()
# Add any additional query parameters from {add_params} dictionary
if add_params:
params.update(add_params)
# Query for tweets. Skip rest if no results or rate limit reached.
new_mentions, status_code = paginated_query(url, params, bearer_token)
if status_code == 429:
print(f"Api rate limit reached. Waiting for 16m to get new mentions for user {user_id}.")
sleep(16*60)
new_mentions, status_code = paginated_query(url, params, bearer_token)
if new_mentions == []:
return []
# Add source attribute to tweets to trace potential bugs back to origin
func_name = str(inspect.currentframe().f_code.co_name + "()")
[x.update({"source": func_name}) for x in new_mentions]
# Save queried data to json as backup
tweets_to_json(new_mentions, func_name)
return new_mentions
def get_new_tweets_by_user(user_id, bearer_token, add_params=None) -> list:
    """
    Queries the tweets timeline of a Twitter user. Stopping parameters
    (e.g. "since_id", "start_time") can be passed via {add_params}.
    Returns a list of all tweets matching the query. Retweets are filtered out.
    """
# Define query parameters
url = "https://api.twitter.com/2/users/{}/tweets".format(user_id)
params = get_query_params()
# Add any additional query parameters from {add_params} dictionary
if add_params:
params.update(add_params)
# Query for tweets. Skip rest if no results
new_tweets, status_code = paginated_query(url, params, bearer_token)
if status_code == 429:
print(f"Api rate limit reached. Stopped querying for tweets by user {user_id}.")
sleep(16*60)
new_tweets, status_code = paginated_query(url, params, bearer_token)
if new_tweets == []:
return []
# Filter out retweets
new_tweets = [t for t in new_tweets if not t["text"].startswith("RT")]
# Add source attribute to tweets to trace potential bugs back to origin
func_name = str(inspect.currentframe().f_code.co_name + "()")
[x.update({"source": func_name}) for x in new_tweets]
# Save queried data to json as backup
tweets_to_json(new_tweets, func_name)
return new_tweets
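# Illustrative sketch (date is a placeholder): querying a user's timeline with an explicit
# end trigger, as required by paginated_query() when {infinite} is False:
#
#   recent = get_new_tweets_by_user(
#       target_user_id,
#       bearer_token,
#       add_params={"start_time": "2023-02-01T00:00:00.000Z"}
#   )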
def get_quotes_for_tweet(tweet_id, bearer_token) -> tuple:
"""Queries API for all quote tweets of {tweet_id}."""
# Define query parameters & query for tweets. Skip rest if no results
url = "https://api.twitter.com/2/tweets/{}/quote_tweets".format(tweet_id)
params = get_query_params()
quotes, status_code = paginated_query(url, params, bearer_token, infinite=True)
if status_code == 429:
print(f"Api rate limit reached while querying quote tweets of tweet {tweet_id}.")
print("Waiting for 16m and continuing to query after.")
sleep(16*60)
quotes, status_code = paginated_query(url, params, bearer_token, infinite=True)
if quotes == []:
return ([], status_code)
# Add source attribute to tweets to trace potential bugs back to origin
func_name = str(inspect.currentframe().f_code.co_name + "()")
[x.update({"source": func_name}) for x in quotes]
return (quotes, status_code)
def get_new_quote_tweets(user_id, bearer_token, add_params=None) -> list:
    """
    Queries the API for all new JediSwap tweets (tweets by {user_id}) matching the
    stopping parameters in {add_params}, discards retweets, then iterates through
    the results & returns all quote tweets of these tweets.
    """
new_quotes = []
new_jediswap_tweets = get_new_tweets_by_user(
user_id,
bearer_token,
add_params=add_params
)
tweet_ids = [t["id"] for t in new_jediswap_tweets]
print(f"In get_new_quote_tweets(): Getting quotes for {len(tweet_ids)} tweets...")
# Get quotes of each new tweet
for t_id in tweet_ids:
quotes, status_code = get_quotes_for_tweet(t_id, bearer_token)
# If api limit reached -> Abort & return what was fetched so far
if status_code == 429:
print(f"Rate limit reached. Waiting for 16m. Paused fetching quote tweets at tweet {t_id}.")
sleep(16*60)
quotes, status_code = get_quotes_for_tweet(t_id, bearer_token)
new_quotes.extend(quotes)
if new_quotes != []:
# Save queried data to json as backup
func_name = str(inspect.currentframe().f_code.co_name + "()")
tweets_to_json(new_quotes, func_name)
return new_quotes
def remove_if_regex_matches(tweets, regex_p, discarded_json_path, discarded_key, regex_flag=None) -> list:
    """
    Takes a list of tweets. Discards every tweet where {regex_p} matches in tweet["text"].
    Saves/overwrites all discarded tweets to {discarded_json_path}, under the
    {discarded_key} specified.
    """
    discarded = []
    out_tweets = []
    # Map the optional flag name to the corresponding re module flag
    flag_map = {
        "multiline": re.M,
        "dotall": re.S,
        "verbose": re.X,
        "ignorecase": re.I,
        "uni_code": re.U
    }
    flags = flag_map.get(regex_flag, 0)
# Drop each tweet where regex matches
for t in tweets:
if re.search(regex_p, t["text"], flags=flags):
discarded.append(t)
else:
out_tweets.append(t)
# Save discarded tweets to json (for debugging)
out_d = read_from_json(discarded_json_path)
out_d[discarded_key] = discarded
write_to_json(out_d, discarded_json_path)
# Append discarded tweets to csv (for keeping track of filters)
csv_path = discarded_json_path.replace(".json", f"_{discarded_key}.csv")
if discarded != []:
include = ["id", "text", "created_at", "username", "author_id"]
new_data = pd.DataFrame(discarded)[include]
if exists(csv_path):
known_data = csv_to_df(csv_path)
new_data = pd.concat([known_data, new_data]) \
.drop_duplicates("id") \
.sort_values("id")
df_to_csv(new_data, csv_path)
#new_data.to_csv(csv_path, index=False, header=not exists(csv_path))
return out_tweets
def apply_filters(tweets, filters, discarded_json_path) -> list:
    """
    Takes a list of tweets. Returns the same list with every tweet removed where a regex
    pattern from {filters} matches in tweet["text"]. Discarded tweets are stored in a
    json file, grouped by filter name.
    """
# Wipe old json file
write_to_json(dict(), discarded_json_path)
# Apply regex filters iteratively
for f in filters:
tweets = remove_if_regex_matches(
tweets,
regex_p=f["pattern"],
discarded_json_path=discarded_json_path,
discarded_key=f["name"],
regex_flag=f["flag"]
)
return tweets
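# Illustrative sketch (tweet data is made up): running the module's regex filters over an
# ad-hoc tweets list. The discarded-tweets csv expects the columns shown below:
#
#   sample = [{"id": "1", "text": "RT something", "created_at": "2023-03-01T00:00:00.000Z",
#              "username": "user_a", "author_id": "1"}]
#   kept = apply_filters(sample, filter_patterns, discarded_path)   # drops the retweet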
def scrape_image_tags(tweet_dict) -> list:
"""
Scrapes all Twitter accounts tagged in an image within a single tweet and returns them as a list.
This needs to be scraped from web since it's not supported via official Twitter API 2.0.
"""
tweet_id = tweet_dict["id"]
username = tweet_dict["username"]
print(f"Scraping image tags for tweet {tweet_id}...")
# Construct url
target_url = 'https://twitter.com/' + username + '/status/' + str(tweet_id) + '/media_tags'
# Scrape Twitter frontend web content for this tweet
options = Options()
options.add_argument("--user-data-dir=ChromeUserData")
options.add_argument("--remote-debugging-port=9222")
options.page_load_strategy = 'normal'
driver = webdriver.Chrome(options=options)
try:
driver.get(target_url)
sleep(80)
resp = driver.page_source
soup = BeautifulSoup(resp, 'html.parser')
# Parse users from frontend modal to list
modal = soup.find("div", {"role":"group"})
users = modal.find_all("div",{"class":"css-1rynq56 r-dnmrzs r-1udh08x r-3s2u2q r-bcqeeo r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-18u37iz r-1wvb978"})
tagged_users_list = [x.text.replace('@', '') for x in users]
print(f"Done. Got these tagged users: {tagged_users_list}")
    except Exception:
        tagged_users_list = []
        print(f"Couldn't scrape {target_url}. Returned empty list of users tagged in media.")
    finally:
        driver.quit()
return tagged_users_list
def de_truncate(tweets_list) -> list:
    """Restores full text, mentions & urls of truncated tweets from their "note_tweet" field."""
    for t in tweets_list:
        if "note_tweet" not in t:
            continue
        # Reconstruct original (untruncated) tweet text
        t["text"] = t["note_tweet"]["text"]
        # Reconstruct mentions & urls from the original tweet
        note_entities = t["note_tweet"].get("entities", {})
        if "mentions" in note_entities:
            t.setdefault("entities", {})["mentions"] = note_entities["mentions"]
        if "urls" in note_entities:
            t.setdefault("entities", {})["urls"] = note_entities["urls"]
    return tweets_list
def discount_mentions(tweets_dict) -> dict:
"""
Tweets fetched from the mentions timeline might not mention JediSwap at all, but
instead "inherit" some or all mentions from tweets higher up in the conversation thread.
For reply tweets, this method subtracts mentions that have been present in the tweet
that's been replied to. For replies to JediSwap, the mention is discounted in any case.
"""
global MEDIA_TWEETS
discarded = []
reply_ids = set()
def is_reply(tweet_dict) -> bool:
if "referenced_tweets" in tweet_dict:
ref_types = {x["type"] for x in tweet_dict["referenced_tweets"]}
if "replied_to" in ref_types:
return True
return False
def is_reply_to_jediswap(tweet_dict) -> bool:
if "in_reply_to_user_id" in tweet_dict:
if tweet_dict["in_reply_to_user_id"] == "1470315931142393857":
return True
return False
def contains_media(tweet_dict) -> bool:
# Returns True if tweet contains media, False if not
if 'entities' in tweet_dict:
if 'urls' in tweet_dict['entities']:
urls = tweet_dict['entities']['urls']
if any('media_key' in x for x in urls):
return True
return False
def is_quote(tweet_dict) -> bool:
if "referenced_tweets" in tweet_dict:
ref_types = {x["type"] for x in tweet_dict["referenced_tweets"]}
if "quoted" in ref_types:
return True
return False
def get_reply_id(tweet_dict) -> str:
for ref in tweet_dict["referenced_tweets"]:
if ref["type"] == "replied_to":
return ref["id"]
def get_mentions(tweet_dict) -> list:
"""Returns a [potentially empty] list of all usernames mentioned in the tweet."""
if "entities" in tweet_dict:
if "mentions" in tweet_dict["entities"]:
mentions = tweet_dict["entities"]["mentions"]
usernames = {x["username"] for x in mentions}
return list(usernames)
return []
def remove_leading_mentions_from_text(tweet_dict) -> dict:
"""Takes one tweet at a time & removes all leading mentions from the tweet text."""
text = tweet_dict["text"]
no_space_char = None
while text.startswith("@") and not no_space_char:
newline_index = text.find("\n")
space_index = text.find(" ")
no_space_char = (newline_index == -1) and (space_index == -1)
if newline_index == -1:
text = text[space_index+1:]
elif space_index == -1:
text = text[newline_index+1:]
else:
first_trigger = min(space_index, newline_index)
text = text[first_trigger+1:]
tweet_dict["text"] = text
return tweet_dict
# Trim all leading mentions from all tweets' text attributes
out_dict = {k: remove_leading_mentions_from_text(v) for k, v in tweets_dict.items()}
# Collect ids of all tweets being replied to
for t in tweets_dict.values():
if is_reply(t):
parent_tweet_id = get_reply_id(t)
reply_ids.add(parent_tweet_id)
# Query tweet data & create dict for these "parent tweets"
tweets_list = get_tweets(list(reply_ids), bearer_token)
parent_tweets = {t["id"]: t for t in tweets_list}
# Discount mentions inherited from other tweets & drop conditionally from data
for _id, t in tweets_dict.items():
if is_quote(t):
mentions = get_mentions(t)
out_dict[_id]["discounted_mentions"] = mentions
continue
if is_reply_to_jediswap(t):
t["comment"] = "Tweet is a reply to a JediSwap tweet."
discarded.append(t)
del out_dict[_id]
continue
# Remove mentions that have been inherited from parent tweets or parent photo tags
if is_reply(t):
parent_id = get_reply_id(t)
mentions = get_mentions(t)
# Get mentions of parent tweet
if parent_id in parent_tweets:
parent_tweet_dict = parent_tweets[parent_id]
parent_mentions = get_mentions(parent_tweet_dict)
# Get photo tags of parent tweet and add to mentions
if contains_media(parent_tweet_dict):
if parent_id in MEDIA_TWEETS:
tagged_users_list = MEDIA_TWEETS[parent_id]["tagged_users_list"]
else:
tagged_users_list = scrape_image_tags(parent_tweet_dict)
parent_tweet_dict["tagged_users_list"] = tagged_users_list
MEDIA_TWEETS[parent_id] = parent_tweet_dict
parent_mentions.extend(tagged_users_list)
# Case: Tweet is reply to deleted tweet: Nothing to remove
else:
parent_mentions = []
            # Keep only the mentions that were not already present in the parent tweet
            discounted_mentions = list(set(mentions) - set(parent_mentions))
if "JediSwap" not in discounted_mentions:
t["comment"] = "Inherited JediSwap mention from other tweet in conversation."
discarded.append(t)
del out_dict[_id]
continue
else:
out_dict[_id]["discounted_mentions"] = discounted_mentions
else:
mentions = get_mentions(t)
out_dict[_id]["discounted_mentions"] = mentions
if "JediSwap" not in mentions:
t["comment"] = "No JediSwap mention found. Not a quote tweet either."
discarded.append(t)
del out_dict[_id]
# Append discarded tweets to csv (to keep track of all filtered out tweets)
csv_path = "not_mentioning_jediswap.csv"
if discarded != []:
include = ["id", "text", "comment", "created_at", "username", "author_id", "source"]
new_data = pd.DataFrame(discarded)[include]
if exists(csv_path):
known_data = csv_to_df(csv_path)
new_data = pd.concat([known_data, new_data]) \
.drop_duplicates("id") \
.sort_values("id")
df_to_csv(new_data, csv_path)
print(f"Sorted out {len(discarded)} tweets not actually mentioning jediswap. See {csv_path}.")
return out_dict
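# Illustrative sketch of the discounting logic (usernames are made up): if a reply mentions
# {"JediSwap", "user_a", "user_b"} and the parent tweet already mentioned {"user_a", "user_b"},
# only the genuinely new mention survives the set difference:
#
#   set(["JediSwap", "user_a", "user_b"]) - set(["user_a", "user_b"])   # -> {"JediSwap"}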
def get_filtered_tweets(cutoff_ids=None, add_params=None) -> dict:
    """
    Main wrapper function. Calls the query functions, applies filtering, returns a dictionary
    of filtered tweets. Queries backwards in time. End triggers can be defined in
    {add_params} (all queries) or {cutoff_ids} (per function). Examples:
    cutoff_ids = {"get_new_mentions()": "<tweet id>", "get_quotes_for_tweet()": "<other tweet id>"}
    add_params = {"start_time": "2023-03-01T00:00:00.000Z"}
    """
obvious_print("Fetching new tweets...")
if cutoff_ids:
print(f"Querying until tweet ids:")
[print(f"{k}\t{v}") for k, v in cutoff_ids.items()]
elif add_params:
print("Querying using these constraints:")
[print(f"{k}\t{v}") for k, v in add_params.items()]
else:
input("Querying until rate limit reached per function. Continue?")
new_mentions_params = {}
new_quotes_params = {}
# Add query parameters from {add_params} (preferred) or {cutoff_ids}.
if add_params:
new_mentions_params.update(add_params)
new_quotes_params.update(add_params)
elif cutoff_ids:
if "get_new_mentions()" in cutoff_ids:
new_mentions_params["since_id"] = cutoff_ids["get_new_mentions()"]
if "get_quotes_for_tweet()" in cutoff_ids:
new_quotes_params["since_id"] = cutoff_ids["get_quotes_for_tweet()"]
else:
pass
# Fetch all mentions new since last execution of script
new_mentions = get_new_mentions(
target_user_id,
bearer_token,
add_params=new_mentions_params
)
    # Fetch all quote tweets of tweets new since last execution of script
new_quotes = get_new_quote_tweets(
target_user_id,
bearer_token,
add_params=new_quotes_params
)
# Merge to one list & keep only 1 entry per tweet id
tweets = merge_unique([new_mentions, new_quotes], unique_att="id")
# De-truncate tweets longer than 140 chars
tweets = de_truncate(tweets)
# Apply regex filters
filtered_tweets = apply_filters(tweets, filter_patterns, discarded_path)
# Convert to dictionary
out_d = {t["id"]: t for t in filtered_tweets}
return out_d
if __name__ == "__main__":
get_filtered_tweets()
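# Illustrative sketch of a typical direct invocation with end triggers (the csv path & date
# are hypothetical):
#
#   cutoffs = get_cutoffs("./previously_queried_tweets.csv")
#   tweets_dict = get_filtered_tweets(cutoff_ids=cutoffs)
#   # or, alternatively, constrain all queries by date:
#   tweets_dict = get_filtered_tweets(add_params={"start_time": "2023-03-01T00:00:00.000Z"})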