-
Notifications
You must be signed in to change notification settings - Fork 0
/
analysis2_script.py
70 lines (61 loc) · 2.98 KB
/
analysis2_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import pandas as pd
import json
import sys
import os
from opensearchpy import OpenSearch
# Connection settings for the OpenSearch cluster used by this script.
#
# SECURITY: credentials should not live in source control. Each value can be
# overridden through an environment variable; the literals below are kept only
# as backward-compatible defaults so existing deployments keep working.
# TODO(review): rotate this password and remove the hard-coded defaults.
username = os.environ.get('OPENSEARCH_USERNAME', 'admin')
password = os.environ.get('OPENSEARCH_PASSWORD', 'Elasticsearch123!#')

# Index that receives the per-category count documents.
index_name = 'analysis2'

http_auth = (username, password)
hosts = os.environ.get(
    'OPENSEARCH_HOSTS',
    'https://search-dsdgroup6-pt4vzj4nnlkksrgfvv6rmiuxgu.us-east-1.es.amazonaws.com:443',
)

# Shared client; the class and the script entry point below use it directly.
es = OpenSearch(hosts=hosts, http_auth=http_auth)
class Category_Analysis:
    """Count trending videos per category for one year and index the counts.

    ``extract_category_count`` builds the per-category counts from a CSV file;
    ``fetch_category_name_from_json`` attaches category names to those counts
    and writes one document per category through the module-level ``es``
    client.
    """

    def extract_category_count(self, csv_file_path, year):
        """Return the number of rows per ``categoryId`` for the given year.

        Args:
            csv_file_path: Path of a CSV file that has ``trending_date``
                (formatted like ``2023-01-05T00:00:00Z``) and ``categoryId``
                columns.
            year: Calendar year to keep, compared against the year of
                ``trending_date``.

        Returns:
            A DataFrame with the columns ``categoryId`` and ``counts``.
        """
        data = pd.read_csv(csv_file_path)
        data['trending_date'] = pd.to_datetime(
            data['trending_date'], format='%Y-%m-%dT%H:%M:%SZ'
        ).dt.tz_localize(None)
        final_data = data[data['trending_date'].dt.year == year]
        return final_data.groupby('categoryId').size().reset_index(name='counts')

    def fetch_category_name_from_json(self, json_file_path, year, id_status,
                                      category_counts=None, analysis_id=None):
        """Attach category names to the counts and index one document each.

        Args:
            json_file_path: Path of a JSON file whose ``items`` entries carry
                an ``id`` and a ``snippet.title``.
            year: Year stored on every document and used in its ``ID``.
            id_status: Dict for the analysis record; its ``status`` key is set
                to ``"success"`` or ``"error"`` and the dict is re-indexed
                into ``analysis_record`` after every document.
            category_counts: DataFrame as returned by
                ``extract_category_count``. When omitted, the module-level
                ``category_counts`` is used so that the original
                three-argument call keeps working.
            analysis_id: Id of the ``analysis_record`` document to update.
                When omitted, the module-level ``analysis_id`` is used.

        Raises:
            ValueError: If ``category_counts`` or ``analysis_id`` is neither
                passed in nor defined at module level.
        """
        # Older callers relied on these two values being module globals;
        # resolve them explicitly instead of failing later with a NameError.
        module_scope = globals()
        if category_counts is None:
            category_counts = module_scope.get('category_counts')
        if analysis_id is None:
            analysis_id = module_scope.get('analysis_id')
        if category_counts is None or analysis_id is None:
            raise ValueError(
                "category_counts and analysis_id must be passed in or be "
                "defined at module level"
            )

        # Read the JSON file
        with open(json_file_path, 'r', encoding='utf-8') as json_file:
            data = json.load(json_file)

        # Lookup table: category id (string) -> category title.
        json_dict = {item['id']: item['snippet']['title'] for item in data['items']}

        sorted_counts = category_counts.sort_values(by='counts', ascending=False)
        list_of_dicts = sorted_counts.to_dict(orient='records')
        for item in list_of_dicts:
            item['year'] = year
            # Convert to string to match keys in json_dict
            item['category_name'] = json_dict.get(str(item['categoryId']), 'Unknown')
            item['ID'] = f"{item['categoryId']}_{item['year']}"
        print(list_of_dicts)

        # NOTE(review): the status is rewritten after every document, so the
        # stored value reflects the last document processed; an earlier
        # failure can be overwritten by a later success. Confirm intent.
        for single_dict in list_of_dicts:
            try:
                response = es.index(index=index_name, id=single_dict["ID"], body=single_dict)
                print(f"Document inserted successfully. Document ID: {response['_id']}")
                id_status["status"] = "success"
                es.index(index="analysis_record", id=analysis_id, body=id_status)
            except Exception as e:
                print(f"Error inserting document: {e}")
                id_status["status"] = "error"
                es.index(index="analysis_record", id=analysis_id, body=id_status)
if __name__ == '__main__':
    # Command line: <year> <analysis_id>
    if len(sys.argv) < 3:
        sys.exit(f"Usage: {sys.argv[0]} <year> <analysis_id>")
    try:
        year = int(sys.argv[1])
    except ValueError:
        sys.exit(f"Invalid year {sys.argv[1]!r}: expected an integer")

    # `analysis_id` and `category_counts` must stay module-level names:
    # fetch_category_name_from_json reads them from module scope.
    analysis_id = sys.argv[2]

    # Input files are looked up relative to the current working directory.
    csv_path = 'preprocessed_single_file_dataset.csv'
    json_path = 'BR_category_id.json'

    # Load the existing analysis record so its status can be updated in place.
    id_status = es.get(index="analysis_record", id=analysis_id)["_source"]

    category_obj = Category_Analysis()
    category_counts = category_obj.extract_category_count(csv_path, year)
    category_obj.fetch_category_name_from_json(json_path, year, id_status)