-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathIago.py
150 lines (124 loc) · 5.76 KB
/
Iago.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# -*- coding: utf-8 -*-
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Created on Thu Apr 19 11:41:01 GMT+7 2018
#
# @author: rhamilton
from __future__ import division, print_function, absolute_import
import os
import sys
import copy
import time
from ligmos.workers import connSetup, workerSetup
from ligmos.utils import amq, classes, common
from ligmos.utils import amqListeners as amql
from dataservants import iago
def _makeListener(listenertype, dbconn):
    """
    Build the ActiveMQ listener object for one "topic-*" config section.

    Parameters
    ----------
    listenertype : str or None
        The section's ``listenertype`` value; matched case-insensitively
        against "ldt", "omspdu", "lois", "mesa" and "marshill".
    dbconn : object or None
        Database connection object handed to the listener as ``dbconn``.
        It is NOT given to the fallback Parrot listener.

    Returns
    -------
    The matching ``iago`` consumer instance, or ``amql.ParrotSubscriber()``
    when the type is unknown or not given.
    """
    # Normalize exactly once. A missing listenertype (None or empty) has to
    # fall through to the Parrot fallback below; calling .lower() directly
    # on None would raise AttributeError before we ever got there.
    ltype = str(listenertype or "").lower()

    # Attribute lookups stay inside each branch so only the listener module
    # that is actually requested gets touched.
    if ltype == "ldt":
        return iago.listener_LDT.LDTConsumer(dbconn=dbconn)
    if ltype == "omspdu":
        return iago.listener_OMSPDU.OMSPDUConsumer(dbconn=dbconn)
    if ltype == "lois":
        return iago.listener_LOIS.LOISConsumer(dbconn=dbconn)
    if ltype == "mesa":
        return iago.listener_Mesa.MesaConsumer(dbconn=dbconn)
    if ltype == "marshill":
        return iago.listener_MarsHill.MHConsumer(dbconn=dbconn)

    print("WARNING: Unknown or no listenertype specified!")
    print("Using no databases and switching to Parrot listener!")
    return amql.ParrotSubscriber()


def main():
    """
    Run Iago: attach one listener per "topic*" config section, then idle.

    The function
      1. sets itself up via ``workerSetup.toServeMan`` (config, common
         block, and the ``runner`` object whose ``halt`` flag ends the loop),
      2. for every config section whose name starts with "topic"
         (case-insensitive) makes a private deep copy of the referenced
         database object, builds the listener, and opens a broker
         connection via ``connSetup.connAMQ_simple``,
      3. loops until ``runner.halt`` is set, re-checking every broker
         connection and then napping for ``bigsleep`` seconds in short
         slices so a halt request is noticed quickly,
      4. disconnects from all brokers and restores STDOUT/STDERR.

    Takes no arguments and returns None.
    """
    # Define the default files we'll use/look for. These are passed to
    #   the worker constructor (toServeMan).
    conf = './config/iago.conf'
    passes = './config/passwords.conf'
    logfile = '/tmp/iago.log'
    desc = "Iago: The ActiveMQ Parrot"
    eargs = iago.parseargs.extraArguments
    conftype = classes.snoopTarget

    # Interval between successive runs of the polling loop (seconds)
    bigsleep = 120
    # Length of one slice of that interval (seconds); the halt flag is
    #   re-checked after every slice.
    napStep = 0.1

    # config: dictionary of parsed config file
    # comm: common block from config file
    # args: parsed options (unused here, hence the underscore)
    # runner: class that contains logic to quit nicely
    config, comm, _, runner = workerSetup.toServeMan(conf,
                                                     passes,
                                                     logfile,
                                                     desc=desc,
                                                     extraargs=eargs,
                                                     conftype=conftype,
                                                     logfile=True)

    # Get this PID for diagnostics
    pid = os.getpid()

    # Print the preamble of this particular instance
    #   (helpful to find starts/restarts when scanning thru logs)
    common.printPreamble(pid, config)

    # Check to see if there are any connections/objects to establish
    idbs = connSetup.connIDB(comm)

    # Specify our custom listener(s) that will really do all the work.
    #   They should be specified in a set of "topic-*" sections in the
    #   config file to define different listeners, to spread the load.
    # topicTypes is not read again in this function; it is kept so each
    #   section's config and listener objects stay referenced from here.
    topicTypes = {}
    brokerConns = {}
    for eachSection in config:
        if not eachSection.lower().startswith("topic"):
            continue

        print("%s:" % (eachSection))
        # This means it's a valid topic section so pull out the listener
        #   type that we need as well as the broker connection
        conSect = config[eachSection]
        print("listenerType: %s" % (conSect.listenertype))
        print("brokerReference: %s" % (conSect.broker))
        print("databaseReference: %s" % (conSect.database))

        try:
            # Deal with mutability here! Must do a deepcopy to change the
            #   tablename between config sections, otherwise every section
            #   would share (and overwrite) the same database object.
            dbr = copy.deepcopy(idbs[conSect.database])
            dbr.tablename = conSect.tablename
        except KeyError:
            print("Database %s not in config :(" % (conSect.database))
            dbr = None

        prlistener = _makeListener(conSect.listenertype, dbr)
        topicTypes[eachSection] = [conSect, prlistener]

        bkr = connSetup.connAMQ_simple(comm[conSect.broker],
                                       conSect.topics,
                                       listener=prlistener)
        brokerConns[eachSection] = bkr

    # Number of slices that add up to bigsleep seconds. round() guards
    #   against float error in the division (e.g. 1199.999... -> 1200).
    napCount = int(round(bigsleep / napStep))

    # Semi-infinite loop
    while runner.halt is False:
        # Check on our connections; checkSingleConnection hands back the
        #   reference to keep using, so store whatever it returns.
        for eachBroker in brokerConns:
            newRef = amq.checkSingleConnection(brokerConns[eachBroker],
                                               subscribe=True)
            brokerConns[eachBroker] = newRef

        # There really isn't anything to actually *do* in here;
        #   all the real work happens in the listeners, so we really
        #   just spin our wheels here.

        # Consider taking a big nap
        if runner.halt is False:
            print("Starting a big sleep")
            # Sleep for bigsleep seconds in total, but in very small
            #   chunks so an abort request is honored promptly.
            for _ in range(napCount):
                time.sleep(napStep)
                if runner.halt is True:
                    break

    # The above loop is exited when someone sends SIGTERM
    print("PID %d is now out of here!" % (pid))

    # Disconnect from all ActiveMQ brokers; element 0 of each stored
    #   reference is the object that gets disconnect() called on it.
    for each in brokerConns:
        brokerConns[each][0].disconnect()

    # The PID file will have already been either deleted/overwritten by
    #   another function/process by this point, so just give back the
    #   console and return STDOUT and STDERR to their system defaults
    sys.stdout = sys.__stdout__
    sys.stderr = sys.__stderr__
    print("STDOUT and STDERR reset.")
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()