-
Notifications
You must be signed in to change notification settings - Fork 1
/
prometheus.py
176 lines (150 loc) · 7.39 KB
/
prometheus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# from prometheus_client/exposition.py
from __future__ import unicode_literals
import copy, html, io, logging, threading
LOGGER = logging.getLogger('porter.prometheus')
from datetime import datetime
from socketserver import ThreadingMixIn
from urllib.parse import parse_qs, quote_plus, urlparse
from wsgiref.simple_server import make_server, WSGIRequestHandler, WSGIServer
from prometheus_client.registry import REGISTRY
from prometheus_client.exposition import choose_encoder
class SilentException(Exception):
    """Raised to make the current request fail with a 5xx status."""
def registry_view_factory(parent, path, params):
    """Return the registry object that should serve ``path``.

    Args:
        parent: The registry to wrap. For ``/probe`` paths it must expose
            ``_lock``, ``_collector_to_names``, ``_target_info`` and
            ``_target_info_metric()`` (presumably a prometheus_client
            ``CollectorRegistry`` -- confirm against the caller).
        path: Request path.
        params: Parsed query-string parameters, handed through unchanged to
            each collector's ``collect2``.

    Returns:
        ``parent`` itself when ``path`` does not start with ``/probe``;
        otherwise a view object whose ``collect()`` yields only the metrics
        produced by collectors that implement ``collect2(path, params)``.
    """
    if not path.startswith('/probe'):
        return parent

    class ViewRestrictedRegistry(object):
        """View over ``parent`` limited to collectors that offer ``collect2``."""

        def __init__(self, parent, path, params):
            self.parent, self.path, self.params = parent, path, params

        def collect(self):
            """Yield the target-info metric (if any), then every ``collect2`` metric."""
            registry = self.parent
            # Snapshot under the registry lock; the collectors themselves are
            # invoked after the lock has been released.
            with registry._lock:
                collectors = list(registry._collector_to_names)
                target_info = registry._target_info_metric() if registry._target_info else None
            if target_info:
                yield target_info
            for collector in collectors:
                # The only zero-argument ``collect`` collectors here are the
                # default ones, and they should not be emitted for /probe
                # queries, so collectors without ``collect2`` are skipped.
                collect2 = getattr(collector, 'collect2', None)
                if collect2:
                    yield from collect2(self.path, self.params)

    return ViewRestrictedRegistry(parent, path, params)
def _bake_output(registry, accept_header, path, params, registry_view_factory):
    """Encode the registry for one request.

    The registry is first narrowed to the metric names given in the
    ``name[]`` query parameter (when present) and then passed through
    ``registry_view_factory`` (when truthy) before being encoded.

    Returns:
        A ``(status, header, body)`` triple, where ``header`` is a single
        ``('Content-Type', value)`` pair. If ``SilentException`` is raised
        while producing the output, the status is ``503 Server Error`` and
        the body is empty.
    """
    encoder, content_type = choose_encoder(accept_header)
    content_header = ('Content-Type', content_type)
    try:
        view = registry
        if 'name[]' in params:
            view = view.restricted_registry(params['name[]'])
        if registry_view_factory:
            view = registry_view_factory(view, path, params)
        body = encoder(view)
    except SilentException:
        return '503 Server Error', content_header, b''
    return '200 OK', content_header, body
# Everything that is logged is also captured in the in-memory LOG_STREAM buffer,
# in addition to being written to stderr.
LOG_STREAM = io.StringIO()
LOG_STREAM_HANDLER = logging.StreamHandler(LOG_STREAM)
LOG_STREAM_HANDLER.setLevel(logging.DEBUG)
LOG_STREAM_HANDLER.setFormatter(logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s'))
logging.root.setLevel(logging.INFO)
logging.root.addHandler(LOG_STREAM_HANDLER)
# With a handler installed on the root logger, stderr output has to be
# requested explicitly through a handler of its own.
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
logging.root.addHandler(handler)
# Header sent with every HTML page. An empty ('', '') pair is not a valid
# header (the field name must be non-empty) and tells the browser nothing
# about the payload.
_HTML_HEADERS = [('Content-Type', 'text/html; charset=utf-8')]

# Logger name suffixes listed on the /logging page; '' stands for the root logger.
_LOGGING_PAGE_MODULES = ('', 'brainstem', 'lutron', 'netaxs', 'tesla', 'totalconnect')

# Levels that can be selected through the ``level`` query parameter.
_SETTABLE_LEVELS = {'debug': logging.DEBUG, 'info': logging.INFO}

# Once the captured log is longer than this many characters, the buffer is
# replaced with an empty one after the page showing it has been rendered.
_LOG_BUFFER_LIMIT = 1024 * 256

_INDEX_PAGE = b'''<html><head><title>Porter</title></head><body>
<h1>Porter</h1>
Someday this may be a form. Today is not that day.
<p>
<p><a href="/logging">view and set log level, and see logs</a>
<p><a href="/metrics">metrics</a>
</body></html>'''

_CONFIG_PAGE = b'''<html><head><title>Porter Config</title></head><body>
Someday this may show the server's config, sanitized to exclude passwords etc.
</body></html>'''


def _logger_for(module):
    """Return the ``porter.<module>`` logger, or the root logger for ''."""
    return logging.getLogger(f'porter.{module}') if module else logging.root


def _apply_level_request(params):
    """Apply the ``level`` (debug/info) and ``module`` query parameters, if given."""
    module = params.get('module', [''])[0].lower()
    requested = params.get('level', [''])[0].lower()
    level = _SETTABLE_LEVELS.get(requested)
    if level is None:
        # No level requested, or one that this page does not offer.
        return
    logger = _logger_for(module)
    if logger.level != level:
        logger.setLevel(level)
        LOGGER.info('set %s level for %s', requested.upper(), module)


def _level_status_html(module):
    """Return one HTML paragraph (bytes) with a logger's level and links to change it."""
    modspec = f'&module={html.escape(module)}' if module else ''
    current = _logger_for(module).level

    def atag(level):
        return f'<a href="/logging?level={level.lower()}{modspec}">{level.upper()}</a>'

    if current == logging.DEBUG:
        shown = f'<b>DEBUG</b> {atag("info")}'
    elif current == logging.INFO:
        shown = f'{atag("debug")} <b>INFO</b>'
    else:
        shown = f'neither {atag("debug")} nor {atag("info")}: {current if current else "[default]"}'
    return f'<p>Current {html.escape(module)} logging level: {shown}'.encode()


def _render_logging_page():
    """Build the /logging page and rotate the log buffer when it has grown too large."""
    # ``datetime.utcnow()`` is deprecated; an aware UTC timestamp renders the
    # same ``ctime()`` text.
    from datetime import timezone

    global LOG_STREAM
    logs = LOG_STREAM.getvalue()
    now = datetime.now(timezone.utc).ctime()
    page = b''.join([
        b'<html><head><title>Porter Logging</title></head><body><h1>Porter Logging</h1>',
        b'\n'.join(_level_status_html(m) for m in _LOGGING_PAGE_MODULES),
        b'\n<h1>Event Log</h1><pre>',
        html.escape(logs).encode(),
        b'</pre><p>This page generated at ',
        html.escape(now).encode(),
        b' UTC</body></html>',
    ])
    if len(logs) > _LOG_BUFFER_LIMIT:
        # The page built above still shows the old contents; only later
        # records go to the fresh buffer.
        LOG_STREAM = io.StringIO()
        LOG_STREAM_HANDLER.setStream(LOG_STREAM)
        LOGGER.info('rotated log buffer with length %s', len(logs))
    return page


def make_wsgi_app(registry=REGISTRY, registry_view_factory=registry_view_factory):
    """Create a WSGI app which serves the metrics from a registry.

    Routes:
        ``/favicon.ico``  empty 200 response, sent without headers.
        ``/``             static index page.
        ``/logging...``   shows logger levels and the captured log; the
                          ``level`` and ``module`` query parameters change a
                          logger's level first.
        ``/config``       static placeholder page.
        anything else     metrics output produced by ``_bake_output``
                          (``/metrics`` or ``/probe``).

    Args:
        registry: Registry handed to ``_bake_output``.
        registry_view_factory: Callable handed to ``_bake_output``.

    Returns:
        The WSGI application callable.
    """
    def prometheus_app(environ, start_response):
        accept_header = environ.get('HTTP_ACCEPT')
        # A server may omit PATH_INFO when its value would be empty.
        path = environ.get('PATH_INFO', '')
        params = parse_qs(environ.get('QUERY_STRING', ''))

        if path == '/favicon.ico':
            # Serve empty response for browsers
            status, headers, output = '200 OK', [], b''
        elif path == '/':
            status, headers, output = '200 OK', list(_HTML_HEADERS), _INDEX_PAGE
        elif path.startswith('/logging'):
            _apply_level_request(params)
            status, headers, output = '200 OK', list(_HTML_HEADERS), _render_logging_page()
        elif path == '/config':
            status, headers, output = '200 OK', list(_HTML_HEADERS), _CONFIG_PAGE
        else:  # /metrics or /probe
            status, header, output = _bake_output(registry, accept_header, path, params, registry_view_factory)
            headers = [header]

        start_response(status, headers)
        return [output]

    return prometheus_app
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """HTTP server that handles each request on its own thread."""

    # Worker threads are daemonic ("fire and forget"). From Python 3.7 on,
    # ``ThreadingMixIn`` collects all non-daemon threads in a list so it can
    # join them when the server closes; daemon workers avoid that memory leak.
    daemon_threads = True
class _SilentHandler(WSGIRequestHandler):
    """Request handler with per-request logging turned off."""

    def log_message(self, format, *args):
        """Discard the message; nothing is written anywhere."""
        return None
def start_wsgi_server(port, addr='', registry=REGISTRY, registry_view_factory=registry_view_factory):
    """Start a WSGI server for prometheus metrics on a daemon thread.

    Args:
        port: TCP port to listen on.
        addr: Address to bind to; passed to ``make_server`` as the host.
        registry: Registry passed to ``make_wsgi_app``.
        registry_view_factory: Callable passed to ``make_wsgi_app``.

    Returns:
        ``(httpd, thread)``: the server object and the daemon thread running
        its ``serve_forever`` loop. Callers that only want the server started
        can ignore the result; keeping it makes it possible to call
        ``httpd.shutdown()`` and ``thread.join()`` later.
    """
    app = make_wsgi_app(registry, registry_view_factory)
    httpd = make_server(addr, port, app, ThreadingWSGIServer, handler_class=_SilentHandler)
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    return httpd, thread