-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathextract-twitter-users-hashtags.py
73 lines (60 loc) · 2.6 KB
/
extract-twitter-users-hashtags.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# For usage and details, see http://www.gnu.org/licenses/gpl-3.0.txt
# AUTHOR:
#
# matteo DOT redaelli AT gmail DOT com
# http://www.redaelli.org/matteo
#
#
# USAGE:
#
# spark-submit --master yarn-client --driver-class-path /path/to/spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.5.2.jar extractTweetsStats.py --source "/user/r/staging/twitter/searches/tyre/2014/12/*.gz" --target /tmp/tests/15
import json
import re
import sys
import time
import itertools
import os, argparse
from pyspark import SparkContext
from pyspark.sql import SQLContext
if __name__ == "__main__":
    # Extract (user, hashtag) co-occurrence counts from archived tweet JSON and
    # write the frequent pairs to HDFS as tab-separated text.
    # NOTE: Python 2 / Spark 1.x style script (`unicode`, SQLContext,
    # registerTempTable) — kept as-is for compatibility with its runtime.

    ## parsing command line parameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--source", help="source path")
    parser.add_argument("--target", help="target path")
    # type=int is required: without it a value passed on the command line is a
    # str, which breaks both the `<= 0` guard and the `>= min_occurs` filter
    # below (in Python 2, str vs int comparisons succeed silently but wrongly).
    parser.add_argument("--min_occurs", type=int, help="min occurences", default=3)
    args = parser.parse_args()

    ## connecting to hdfs data
    source_path = args.source
    min_occurs = args.min_occurs
    if min_occurs <= 0:
        # guard against nonsensical thresholds; fall back to the default
        min_occurs = 3

    sc = SparkContext(appName="extraxt-user-hashtags.py")
    sqlContext = SQLContext(sc)

    tweets = sqlContext.read.json(source_path)
    tweets.registerTempTable("tweets")
    # One row per distinct tweet: (id, user.screenName, hashtagEntities).
    t = sqlContext.sql("SELECT distinct id,user.screenName, hashtagEntities FROM tweets")

    # For each tweet, emit "<screenName>\t<hashtag>" for every hashtag it
    # contains (hashtag lowercased; t0[2] is presumably the hashtag text field
    # of a hashtagEntities struct — confirm against the tweet schema), count
    # identical pairs, and keep those seen at least `min_occurs` times.
    # NOTE(review): sortByKey(False) orders by the joined "user\thashtag"
    # string descending, not by count — looks intentional, but verify.
    related_hashtags = t.map(lambda t: map(lambda t0: (t[1], t0[2].lower()), t[2])) \
        .flatMap(lambda t: t) \
        .map(lambda x: '\t'.join(unicode(i) for i in x)) \
        .map(lambda t: (t, 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .filter(lambda t: t[1] >= min_occurs) \
        .sortByKey(False) \
        .map(lambda x: '\t'.join(unicode(i) for i in x)) \
        .repartition(1)

    ## save stats from tweets to hdfs
    related_hashtags.saveAsTextFile("%s/%s" % (args.target, "users_hashtags"))