-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: sentences_matcher_run_operator.py
116 lines (99 loc) · 4.38 KB
/
sentences_matcher_run_operator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# Per-shard state that starts empty and is filled in on first use
# (the automaton by OnRegisteredAutomata / process_item, the Redis
# client by process_item via connecttoRedis).
Automata = None
rconn = None

# Remote dictionary passed to terraphim_utils.loadAutomata; judging by the
# file name it is presumably a gzipped term -> id CSV (confirm).
url = "https://system-operator.s3.eu-west-2.amazonaws.com/term_to_id.csv.gz"

# Role tag interpolated into the Redis key names this script reads and writes.
role = "operator"
def enable_debug():
    """Report whether debug logging is switched on for this shard.

    Reads the shard-local key ``debug{<hashtag>}`` through the RedisGears
    ``execute`` builtin.

    Returns:
        True only when the stored value is exactly the string '1'; any other
        value (including a missing key) gives False.
    """
    flag_key = 'debug{%s}' % hashtag()
    return execute('GET', flag_key) == '1'
def connecttoRedis(host='redisgraph', port=6379):
    """Create a Redis client used for side lookups and scoring.

    Args:
        host: Redis host name. Defaults to 'redisgraph', the value this
            script always used.
        port: Redis TCP port. Defaults to 6379.

    Returns:
        A ``redis.Redis`` client whose replies are decoded to ``str`` as UTF-8.
    """
    import redis
    # ``charset`` is only a deprecated alias of ``encoding`` in redis-py;
    # pass the supported keyword so no DeprecationWarning is raised and the
    # call does not depend on the alias.
    return redis.Redis(host=host, port=port, encoding="utf-8", decode_responses=True)
def OnRegisteredAutomata():
    """Load the matching automaton into the module-level ``Automata`` cache.

    Used as the ``onRegistered`` callback of the KeysReader registration in
    this script. ``terraphim_utils`` is imported over HTTP with ``httpimport``
    (presumably because it is not installed on the shard — confirm).

    Returns:
        The automaton built by ``loadAutomata(url)``; it is also stored in
        the global ``Automata``.
    """
    global Automata
    import httpimport
    utils_repo = "https://raw.githubusercontent.com/terraphim/terraphim-platform-automata/main/"
    with httpimport.remote_repo(['terraphim_utils'], utils_repo):
        import terraphim_utils
        from terraphim_utils import loadAutomata
    # Debug output is hard-wired off here; enable_debug() is not consulted.
    verbose = False
    if verbose:
        log(f"Loading automata from url {url}")
    Automata = loadAutomata(url)
    return Automata
def process_item(record):
    """Match entity terms in each sentence of a ``sentence:*`` hash and emit edges.

    For every field of ``record['value']`` (presumably one sentence per field —
    confirm), the sentence text is matched against the global ``Automata``.
    Every pair of matched entities is appended to the shard-local stream
    ``edges_matched_<role>_{<shard>}`` and scored in the sorted set
    ``edges_scored:<source_id>:<destination_id>``. Sentences already present
    in ``processed_docs_stage3_<role>_{<shard>}`` are skipped; every newly
    handled sentence is added to that set.

    Args:
        record: RedisGears KeysReader record. ``record['key']`` must contain a
            ':'-separated article id in second position; ``record['value']``
            is iterated for field names and indexed for the sentence text.
    """
    global Automata
    global rconn
    import httpimport
    with httpimport.remote_repo(['terraphim_utils'], "https://raw.githubusercontent.com/terraphim/terraphim-platform-automata/main/"):
        from terraphim_utils import loadAutomata, find_matches
    # NOTE: the spaCy stop-word list used to be fetched here over HTTP, but it
    # was unused (stop-word filtering is commented out upstream), so the
    # remote code download has been dropped.
    import itertools
    import re

    debug = enable_debug()
    if not Automata:
        if debug:
            log(f"Loading automata from url {url}")
        Automata = loadAutomata(url)
    if not rconn:
        rconn = connecttoRedis()

    shard_id = hashtag()
    article_id = record['key'].split(':')[1]
    # Key names depend only on role and shard, so build them once.
    processed_set = 'processed_docs_stage3_%s_{%s}' % (role, shard_id)
    edges_stream = 'edges_matched_%s_{%s}' % (role, shard_id)
    non_alnum = re.compile('[^A-Za-z0-9]+')
    # The article's year is the same for every pair of this record: fetch it
    # lazily, at most once, instead of once per entity pair.
    year = None
    if debug:
        log(f"Matcher received {record['key']} and my {shard_id}")

    for each_key in record['value']:
        sentence_key = record['key'] + f':{each_key}'
        if execute('SISMEMBER', processed_set, sentence_key):
            if debug:
                log(f"Matcher already processed {sentence_key} for role {role}")
            continue

        token_str = record['value'][each_key]
        # The value is already sentence text. Joining a str with " " would put
        # a space between every character ("ab" -> "a b") and no multi-letter
        # term could ever match; only join when given a token collection.
        text = token_str if isinstance(token_str, str) else " ".join(token_str)
        if debug:
            log(f"Matcher: sentence text {text}")
        matched_ents = find_matches(text.lower(), Automata)
        if debug:
            log("Matcher: length of matched_ents " + str(len(matched_ents)))
            log("Matcher: url " + str(url))

        if len(matched_ents) < 1:
            if debug:
                log("Error matching sentence " + sentence_key)
        else:
            if debug:
                log("Matcher: Matching sentence " + sentence_key)
            for source, destination in itertools.combinations(matched_ents, 2):
                # Each match is indexed as (entity_id, label, ...).
                source_entity_id = source[0]
                destination_entity_id = destination[0]
                source_canonical_name = non_alnum.sub(' ', str(source[1]))
                destination_canonical_name = non_alnum.sub(' ', str(destination[1]))
                if year is None:
                    year = rconn.hget(f"article_id:{article_id}", 'year') or '2021'
                execute('XADD', edges_stream, '*',
                        'source', f'{source_entity_id}',
                        'destination', f'{destination_entity_id}',
                        'source_name', source_canonical_name,
                        'destination_name', destination_canonical_name,
                        'rank', 1,
                        'year', year)
                # FIXME: this breaks design pattern of NLP processing to support
                # microservices pattern on front end
                rconn.zincrby(f'edges_scored:{source_entity_id}:{destination_entity_id}', 1, sentence_key)
        # Mark the sentence handled whether or not anything matched.
        execute('SADD', processed_set, sentence_key)
# GearsBuilder.run() takes the reader's argument (a key pattern for
# KeysReader), not a callback, and it appends map/collect steps to the builder
# it is called on. The old ``bg.run(process_item)`` therefore never applied
# process_item and also polluted the builder that was registered afterwards.
# Use foreach() to attach the per-record function and keep the batch run and
# the registration on separate builders.

# Batch pass over the sentence hashes that already exist. process_item skips
# sentences already in its processed set, so overlap with the registration is
# harmless.
GearsBuilder('KeysReader').foreach(process_item).count().run('sentence:*')

# Event-driven pass: match sentence hashes as they are written.
bg = GearsBuilder('KeysReader')
bg.foreach(process_item)
bg.count()
bg.register('sentence:*', mode="async_local", onRegistered=OnRegisteredAutomata)