RedisIntakeRedisClusterSample.py
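"""Load article body text from JSON files into a Redis cluster.

For each article, the body paragraphs are concatenated and stored under
the key paragraphs:<article_id>; processed ids are tracked in the
processed_docs_stage1_para set so that reruns skip completed articles.
"""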
import argparse
import json
import logging
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path

from rediscluster import RedisCluster

import config

LOG_PATH = Path('./logs/')
LOG_PATH.mkdir(exist_ok=True)
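# Log to both a timestamped file and stdout.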
run_start_time = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
logfile = str(LOG_PATH/'log-{}-{}.txt'.format(run_start_time, "Parsing articles - sampled at 200"))
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    datefmt='%m/%d/%Y %H:%M:%S',
    handlers=[
        logging.FileHandler(logfile),
        logging.StreamHandler(sys.stdout)
    ])
logger = logging.getLogger()
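# Load cluster startup nodes from the 'rediscluster_local' config section.
# The config value is assumed to be a JSON list of nodes, e.g.
#   [{"host": "127.0.0.1", "port": "7000"}]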
rc_list = json.loads(config.config(section='rediscluster_local')['rediscluster'])
rediscluster_client = RedisCluster(startup_nodes=rc_list, decode_responses=True)
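# Size the worker pool to the number of available CPUs.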
n_cpus = os.cpu_count()
logger.info(f'Number of CPUs: {n_cpus}')
executor = ThreadPoolExecutor(max_workers=n_cpus)
cwd = Path.cwd()
datapath = cwd.joinpath('data')
logger.info(f'Data path: {datapath}')
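# Command-line options: how many files to ingest and which subdirectory
# of ./data to search.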
parser = argparse.ArgumentParser(description='Intake program: load article body text into a Redis cluster')
parser.add_argument('--nsamples', type=int, default=10, help='maximum number of JSON files to process')
parser.add_argument('--path', type=str, default=".", help='subdirectory of ./data to search for JSON files')
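# Example invocation (paths are illustrative):
#   python RedisIntakeRedisClusterSample.py --nsamples 200 --path .

# Articles are expected to follow a CORD-19-style schema:
#   {"body_text": [{"text": "..."}, ...]}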
def parse_json_body_text(json_filename):
    logger.info("Processing .. " + json_filename.stem)
    with open(json_filename) as json_data:
        data = json.load(json_data)
        for body_text in data['body_text']:
            para = body_text['text']
            yield para
# Process one document: store its concatenated paragraphs in Redis
# and mark the article id as processed.
def process_file(f, rediscluster_client=rediscluster_client):
    article_id = f.stem
    logger.info("Processing article_id " + article_id)
    if rediscluster_client.sismember('processed_docs_stage1_para', article_id):
        logger.info("already processed " + article_id)
        return article_id
    article_body = []
    for para in parse_json_body_text(f):
        article_body.append(para)
    rediscluster_client.set(f"paragraphs:{article_id}", " ".join(article_body))
    rediscluster_client.sadd('processed_docs_stage1_para', article_id)
    return article_id
# Main submission loop: submit up to nsamples files to the pool.
counter = 0
processed = []
args = parser.parse_args()
json_filenames = datapath.glob(args.path + '/**/*.json')
for each_file in json_filenames:
    logger.info("Submitting task")
    task = executor.submit(process_file, each_file, rediscluster_client)
    processed.append(task)
    counter += 1
    if counter >= args.nsamples:  # stop once nsamples files have been submitted
        break
logger.info("Waiting for tasks to complete")
for each_task in as_completed(processed):
logger.info(task.result())
logger.info("Completed")