forked from EDCD/EDMarketConnector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdebug_webserver.py
178 lines (134 loc) · 5.59 KB
/
debug_webserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Simple HTTP listener to be used with debugging various EDMC sends."""
import gzip
import json
import pathlib
import tempfile
import threading
import zlib
from http import server
from typing import Any, Callable, Literal, Tuple, Union
from urllib.parse import parse_qs
from config import appname
from EDMCLogging import get_main_logger
logger = get_main_logger()
output_lock = threading.Lock()
output_data_path = pathlib.Path(tempfile.gettempdir()) / f'{appname}' / 'http_debug'
SAFE_TRANSLATE = str.maketrans({x: '_' for x in "!@#$%^&*()./\\\r\n[]-+='\";:?<>,~`"})
class LoggingHandler(server.BaseHTTPRequestHandler):
"""HTTP Handler implementation that logs to EDMCs logger and writes data to files on disk."""
def __init__(self, request: bytes, client_address: Tuple[str, int], server) -> None:
super().__init__(request, client_address, server)
def log_message(self, format: str, *args: Any) -> None:
"""Override default handler logger with EDMC logger."""
logger.info(format % args)
def do_POST(self) -> None: # noqa: N802 # I cant change it
"""Handle POST."""
logger.info(f"Received a POST for {self.path!r}!")
data_raw: bytes = self.rfile.read(int(self.headers['Content-Length']))
encoding = self.headers.get('Content-Encoding')
to_save = data = self.get_printable(data_raw, encoding)
target_path = self.path
if len(target_path) > 1 and target_path[0] == '/':
target_path = target_path[1:]
elif len(target_path) == 1 and target_path[0] == '/':
target_path = 'WEB_ROOT'
response: Union[Callable[[str], str], str, None] = DEFAULT_RESPONSES.get(target_path)
if callable(response):
response = response(to_save)
self.send_response_only(200, "OK")
if response is not None:
self.send_header('Content-Length', str(len(response)))
self.end_headers() # This is needed because send_response_only DOESN'T ACTUALLY SEND THE RESPONSE </rant>
if response is not None:
self.wfile.write(response.encode())
self.wfile.flush()
if target_path == 'edsm':
# attempt to extract data from urlencoded stream
try:
edsm_data = extract_edsm_data(data)
data = data + "\n" + json.dumps(edsm_data)
except Exception:
pass
target_file = output_data_path / (safe_file_name(target_path) + '.log')
if target_file.parent != output_data_path:
logger.warning(f"REFUSING TO WRITE FILE THAT ISN'T IN THE RIGHT PLACE! {target_file=}")
logger.warning(f'DATA FOLLOWS\n{data}') # type: ignore # mypy thinks data is a byte string here
return
with output_lock, target_file.open('a') as f:
f.write(to_save + "\n\n")
@staticmethod
def get_printable(data: bytes, compression: Literal['deflate'] | Literal['gzip'] | str | None = None) -> str:
"""
Convert an incoming data stream into a string.
:param data: The data to convert
:param compression: The compression to remove, defaults to None
:raises ValueError: If compression is unknown
:return: printable strings
"""
ret: bytes = b''
if compression is None:
ret = data
elif compression == 'deflate':
ret = zlib.decompress(data)
elif compression == 'gzip':
ret = gzip.decompress(data)
else:
raise ValueError(f'Unknown encoding for data {compression!r}')
return ret.decode('utf-8', errors='replace')
def safe_file_name(name: str):
"""
Escape special characters out of a file name.
This is a nicety. Don't rely on it to be ultra secure.
"""
return name.translate(SAFE_TRANSLATE)
def generate_inara_response(raw_data: str) -> str:
"""Generate nonstatic data for inara plugin."""
try:
data = json.loads(raw_data)
except json.JSONDecodeError:
return "UNKNOWN REQUEST"
out = {
'header': {
'eventStatus': 200
},
'events': [
{
'eventName': e['eventName'], 'eventStatus': 200, 'eventStatusText': "DEBUG STUFF"
} for e in data.get('events')
]
}
return json.dumps(out)
def extract_edsm_data(data: str) -> dict[str, Any]:
"""Extract relevant data from edsm data."""
res = parse_qs(data)
return {name: data[0] for name, data in res.items()}
def generate_edsm_response(raw_data: str) -> str:
"""Generate nonstatic data for edsm plugin."""
try:
data = extract_edsm_data(raw_data)
events = json.loads(data['message'])
except (json.JSONDecodeError, Exception):
logger.exception("????")
return "UNKNOWN REQUEST"
out = {
'msgnum': 100, # Ok
'msg': 'debug stuff',
'events': [
{'event': e['event'], 'msgnum': 100, 'msg': 'debug stuff'} for e in events
]
}
return json.dumps(out)
DEFAULT_RESPONSES = {
'inara': generate_inara_response,
'edsm': generate_edsm_response
}
def run_listener(host: str = "127.0.0.1", port: int = 9090) -> None:
"""Run a listener thread."""
output_data_path.mkdir(exist_ok=True)
logger.info(f'Starting HTTP listener on {host=} {port=}!')
listener = server.HTTPServer((host, port), LoggingHandler)
logger.info(listener)
threading.Thread(target=listener.serve_forever, daemon=True).start()
if __name__ == "__main__":
output_data_path.mkdir(exist_ok=True)
server.HTTPServer(("127.0.0.1", 9090), LoggingHandler).serve_forever()