Skip to content

Commit

Permalink
code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
oatalabi committed Dec 7, 2018
1 parent 759a2c9 commit 878aff6
Show file tree
Hide file tree
Showing 3,367 changed files with 52,058 additions and 267,551 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ to perform graph analysis using PySpark graphframes.

To run analysis on the Spark shell with the GraphFrames package, specify the below optional arguments (using facebook, twitter and google, and max of 10,000 items as as an example):

`$ spark-submit --packages graphframes:graphframes:0.6.0-spark2.3-s_2.11 analysis.py --dir bootstrap/spark-warehouse/6warcs --focus twitter google facebook`
`$ spark-submit --packages graphframes:graphframes:0.6.0-spark2.3-s_2.11 analysis.py
--inputs './bootstrap/spark-warehouse/may/may*' --outputs may --path news --focus cnn foxnews huffingtonpost washingtonpost nytimes usatoday`

To plot community clusters after analysis"

`$ python3 plot_communities.py`
`$ python3 plot_communities.py --outputs may --path news `

To view communities detected on a browser:

Expand Down
83 changes: 29 additions & 54 deletions analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,17 @@
assert spark.version >= '2.3' # make sure we have Spark 2.3+
sqlContext = SQLContext(sc)

months = ['may', 'oct']

def read_parqet(month):
def read_parqet(inputs, focus):
# Read in parquet files
data = sqlContext.read.parquet('./bootstrap/spark-warehouse/'+month+'/'+month+'*')
focus = ['etsy', 'ebay', 'amazon']
# Select focus shopping domains
specified_domain = data.filter(data.childDomain.isin(focus))

return specified_domain
data = sqlContext.read.parquet(inputs)
# Select focus domains
if focus:
data = data.filter(data.childDomain.isin(focus))
return data

def hashnode(x):
# Return a unique hashkey for each entry
return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:8]

def renameFile(directory, newname, extension):
# Rename filename as spark partitions file
for root, dirs_list, files_list in os.walk(os.getcwd()+"/"+directory):
for file_name in files_list:
if os.path.splitext(file_name)[-1] == extension:
old_file = os.path.join(directory, file_name)
new_file = os.path.join(directory, newname)
os.rename(old_file,new_file)
return hashlib.sha1(x.encode("UTF-8")).hexdigest()[:8]

def get_edge_vertices(df):
hashnode_udf = functions.udf(hashnode)
Expand All @@ -52,60 +40,47 @@ def get_edge_vertices(df):
edges = df.select("parentTLD","childTLD")\
.withColumn("src", hashnode_udf("parentTLD"))\
.withColumn("dst", hashnode_udf("childTLD"))\
.select("src","dst")
.select("src","dst").distinct()
return vertices, edges

def create_graph(vertices, edges, month):
def create_graph(vertices, edges, outputs, path):
# create GraphFrame
graph = GraphFrame(vertices, edges)
links = edges.coalesce(1).write.csv('links-'+month, mode='overwrite')
renameFile('links-'+month, 'links-'+month+'.csv', '.csv')
links = edges.write.csv(path+'/links-'+outputs, mode='overwrite')
return graph

def run_LPA(graph,month, maxiter=5):
def run_LPA(graph, outputs, path, maxiter=5):
# Run LPA
communities = graph.labelPropagation(maxIter=maxiter)
communityCount = communities.select('label').distinct().count()

labelCount = communities.groupBy("label").count().sort(desc("count")).limit(10)
topLabels = labelCount.select('label').collect()
top10List = [str(i.label) for i in topLabels]
top10Communitites = communities.filter(communities.label.isin(top10List))

top10Communitites.coalesce(1).write.csv('top10-'+month, mode='overwrite')
renameFile('top10-'+month, 'top10-'+month+'.csv', '.csv')

communities.coalesce(1).write.csv('communities-'+month, mode='overwrite')
renameFile('communities-'+month, 'communities-'+month+'.csv', '.csv')
communities.write.csv(path+'/communities-'+outputs, mode='overwrite')
return communities, communityCount

def run_pagerank(graph, communities, month, maxiter=10):
def run_pagerank(graph, communities, outputs, path, maxiter=10):
# Run PageRank
pageRank = graph.pageRank(resetProbability=0.15, maxIter=maxiter)
# Organize communities based on page rankings and weights
topTenRankings = pageRank.vertices.select("id", "pagerank").orderBy("pagerank", ascending=False).limit(10)
topTenRankings = functions.broadcast(topTenRankings)
getRankingInfo = communities.join(topTenRankings, communities.id == topTenRankings.id).drop(topTenRankings.id).orderBy("pagerank", ascending=False)
getRankingInfo.coalesce(1).write.csv('rankings-'+month, mode='overwrite', header='true')
renameFile('rankings-'+month, 'rankings-'+month+'.csv', '.csv')

weightedRelationship = pageRank.edges.select("src", "dst", "weight").distinct().orderBy("weight", ascending=False).limit(10)

getRankingInfo.write.csv(path+'/rankings-'+outputs, mode='overwrite')
return pageRank

def main():
def main(inputs, outputs, path, focus):
# Get Edges and Vertices
for m in months:
df = read_parqet(m).cache()
vertices, edges = get_edge_vertices(df)
# Create GraphFrame
graph = create_graph(vertices, edges, m).cache()
# Analysis
communities, count = run_LPA(graph, m, maxiter=5)
communitiesNodeRanking = run_pagerank(graph, communities, m, maxiter=10)
df = read_parqet(inputs, focus).cache()
vertices, edges = get_edge_vertices(df)
# Create GraphFrame
graph = create_graph(vertices, edges, outputs, path).cache()
# Analysis
communities, count = run_LPA(graph, outputs, path, maxiter=5)
communitiesNodeRanking = run_pagerank(graph, communities, outputs, path, maxiter=10)

if __name__ == '__main__':
# parser = argparse.ArgumentParser(description='Perform graph analysis on CommonCrawl data')
# parser.add_argument('-d', '--dir', type=str, nargs='?', default='', help='Input path to parquet files')
# args = parser.parse_args()
main()
parser = argparse.ArgumentParser(description='Perform graph analysis on CommonCrawl data')
parser.add_argument('-i', '--inputs', type=str, nargs='?', default='./bootstrap/spark-warehouse/oct/oct*', help='Input path to parquet files')
parser.add_argument('-o', '--outputs', type=str, nargs='?', help='Output name for files generated')
parser.add_argument('-p', '--path', type=str, nargs='?', help='Output directory path name for focus domains')
parser.add_argument('-f', '--focus', type=str, nargs='*', help='List of items we want to focus on')
args = parser.parse_args()
main(args.inputs, args.outputs, args.path, args.focus)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit 878aff6

Please sign in to comment.