-
Notifications
You must be signed in to change notification settings - Fork 5
/
cassandra_enron.pig
73 lines (59 loc) · 3.2 KB
/
cassandra_enron.pig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* CassandraStorage and utilities */
/* Registers the jars from a local Cassandra 1.1.5 source build and the
   Pygmalion UDF jar, then defines short aliases for the storage function and
   the two Pygmalion bag-conversion UDFs. All paths are absolute and
   machine-specific, so they must be adjusted for the local installation.
   NOTE(review): the empty block comment that ends the third register line
   looks like a workaround for the slash-star sequence inside the jar globs
   being read as a comment opener - confirm before removing or reformatting
   these register lines.
   NOTE(review): FromCassandraBag and ToCassandraBag are defined here but are
   not referenced by the rest of this script. */
register /me/Software/apache-cassandra-1.1.5-src/build/apache-cassandra*.jar
register /me/Software/apache-cassandra-1.1.5-src/lib/*.jar
register /me/Software/apache-cassandra-1.1.5-src/build/lib/jars/*.jar /* */
register /me/Software/pygmalion/udf/target/pygmalion-1.1.0-SNAPSHOT.jar
define CassandraStorage org.apache.cassandra.hadoop.pig.CassandraStorage();
define FromCassandraBag org.pygmalion.udf.FromCassandraBag();
define ToCassandraBag org.pygmalion.udf.ToCassandraBag();
/* AvroStorage */
/* Registers the Avro and json-simple jars plus Piggybank, and aliases the
   Piggybank AvroStorage loader that is used to read the input file. */
register /me/Software/pig/build/ivy/lib/Pig/avro-1.5.3.jar
register /me/Software/pig/build/ivy/lib/Pig/json-simple-1.1.jar
register /me/Software/pig/contrib/piggybank/java/piggybank.jar
define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
/* Read the e-mails from Avro and drop any record without a message_id. */
emails = LOAD '/me/Data/enron.avro' USING AvroStorage();
emails = FILTER emails BY message_id IS NOT NULL;

/* Keep only 100 documents so that a local-mode run finishes quickly. */
emails = LIMIT emails 100;

/* Stream (message_id, body) through token_extractor.py, which is shipped with
   the job, then split the returned token string into one row per token. */
id_body = FOREACH emails GENERATE message_id, body;
DEFINE test_stream `token_extractor.py` SHIP ('token_extractor.py');
cleaned_words = STREAM id_body THROUGH test_stream
    AS (message_id:chararray, token_strings:chararray);
token_records = FOREACH cleaned_words GENERATE
    message_id,
    FLATTEN(TOKENIZE(token_strings)) AS tokens;
/* Term count per document: number of occurrences of each (message_id, token) pair. */
doc_word_totals = FOREACH (GROUP token_records BY (message_id, tokens)) GENERATE
    FLATTEN(group) AS (message_id, token),
    COUNT_STAR(token_records) AS doc_total;

/* Document size: total token occurrences in a message, repeated on every
   (token, doc_total) row of that message. */
pre_term_counts = FOREACH (GROUP doc_word_totals BY message_id) GENERATE
    group AS message_id,
    FLATTEN(doc_word_totals.(token, doc_total)) AS (token, doc_total),
    SUM(doc_word_totals.doc_total) AS doc_size;

/* TF: a token's count within a message divided by that message's size. */
term_freqs = FOREACH pre_term_counts GENERATE
    message_id AS message_id,
    token AS token,
    ((double)doc_total / (double)doc_size) AS term_freq;

/* For idf: attach to every row the number of term_freqs rows that share its token,
   i.e. the number of documents that use the token. */
token_usages = FOREACH (GROUP term_freqs BY token) GENERATE
    FLATTEN(term_freqs) AS (message_id, token, term_freq),
    COUNT_STAR(term_freqs) AS num_docs_with_token;
/* Document count: a single-row relation holding the number of rows in emails. */
just_ids = FOREACH emails GENERATE message_id;
ndocs = FOREACH (GROUP just_ids ALL) GENERATE
    COUNT_STAR(just_ids) AS total_docs;
/* Compute TF*IDF for every (message_id, token) row.
   ndocs is a single-row relation, so ndocs.total_docs is read as a Pig scalar.
   The score is emitted twice: tf_idf_num keeps the numeric value so it can be
   ordered correctly, and value is the chararray rendering that is written to
   Cassandra. The token is emitted under the name score, which is the name the
   final projection uses. */
tfidf_all = foreach token_usages {
    idf    = LOG((double)ndocs.total_docs/(double)num_docs_with_token);
    tf_idf = (double)term_freq * idf;
    generate message_id as message_id,
             token as score,
             tf_idf as tf_idf_num:double,
             (chararray)tf_idf as value:chararray;
};
/* Get the top 10 Tf*Idf scores per message.
   Order on the numeric column: ordering on the chararray value compares
   strings, so a small score rendered in scientific notation such as 9.0E-4
   would rank above 0.5. Only (score, value) is projected, so the stored
   schema is the same as before. */
per_message_cassandra = foreach (group tfidf_all by message_id) {
    sorted = order tfidf_all by tf_idf_num desc;
    top_10_topics = limit sorted 10;
    generate group, top_10_topics.(score, value);
};
store per_message_cassandra into 'cassandra://enron/email_topics' USING CassandraStorage();
/* Print ten message_id keys that can be looked up in Cassandra, so the stored
   topics can be compared against the corresponding message bodies. */
samples = LIMIT just_ids 10;
DUMP samples;