From a9344856381996f213b3cd578628777d81278eb8 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Mon, 16 Sep 2024 18:04:35 -0400 Subject: [PATCH] verify server certificate (#136) --- docs/security.md | 11 +++++ tnz/tnz.py | 109 ++++++++++++++++++++++++++++++++++------------- tnz/zti.py | 23 ++++++++-- 3 files changed, 110 insertions(+), 33 deletions(-) diff --git a/docs/security.md b/docs/security.md index c8bd1ef..64316b1 100644 --- a/docs/security.md +++ b/docs/security.md @@ -4,6 +4,17 @@ hide: - toc --- +## SSL Verification + +Use environment variable `SESSION_SSL_VERIFY=cert` to require that the +server provide a trusted certificate. + +Use environment variable `SESSION_SSL_VERIFY=hostname` to require that +the certificate hostname match the requested hostname. Note that this +also requires that the server provide a trusted certificate. + +## Cipher Issues + Python 3.10 is aggressive in causing failures for algorithms/options that are not secure enough. If you receive an SSL-related message, there is a good chance of a security weakness in the host/server. The best course of action is to request that the server be updated to support security best practices in terms of supported encryption algorithms and key sizes. diff --git a/tnz/tnz.py b/tnz/tnz.py index 901a7eb..b2876da 100644 --- a/tnz/tnz.py +++ b/tnz/tnz.py @@ -7,11 +7,13 @@ Environment variables used: SESSION_PS_SIZE + SESSION_SECLEVEL + SESSION_SSL_VERIFY TNZ_COLORS TNZ_LOGGING ZTI_SECLEVEL -Copyright 2021, 2023 IBM Inc. All Rights Reserved. +Copyright 2021, 2024 IBM Inc. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 """ @@ -94,6 +96,8 @@ def __init__(self, name=None): self.colors = 768 self.__secure = False + self.__cert_verified = False + self.__start_tls_hostname = None self.__host_verified = False self._event = None self.__loop = None @@ -372,6 +376,12 @@ def connect(self, host=None, port=None, if host is None: host = "127.0.0.1" # default host + elif not secure: # if might need hostname later for start_tls + import socket + try: + self.__start_tls_hostname = socket.getfqdn(host) + except socket.gaierror: + pass if port is None: if secure is False: @@ -383,13 +393,20 @@ def connect(self, host=None, port=None, self._event = event self.__secure = False + self.__cert_verified = False self.__host_verified = False + def _connection_made(_, transport): + self._transport = transport + self.seslost = False + if context: + self.__secure = True + if context.verify_mode == ssl.CERT_REQUIRED: + self.__cert_verified = True + self.__host_verified = context.check_hostname + class _TnzProtocol(asyncio.BaseProtocol): - @staticmethod - def connection_made(transport): - self._transport = transport - self.seslost = False + connection_made = _connection_made @staticmethod def connection_lost(exc): @@ -425,22 +442,7 @@ def resume_writing(): """ self._log_warn("resume_writing") - if secure: - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - if os.getenv("ZTI_SECLEVEL", "2") == "1": - context.set_ciphers("DEFAULT@SECLEVEL=1") - - if verifycert: - context.load_verify_locations("ibm-cacerts.pem") - self.__host_verified = True # ? too soon ? - - else: - context.check_hostname = False # insecure FIXME - context.verify_mode = ssl.CERT_NONE # insecure FIXME - - else: - context = None - + context = self.__create_context(verifycert) if secure else None coro = self.__connect(_TnzProtocol, host, port, context) loop = self.__get_event_loop() task = loop.create_task(coro) @@ -2293,9 +2295,7 @@ def _process(self, data): elif data == b"\xff\xfa\x2e\x01": # IAC SB ... self.__log_info("i<< START_TLS FOLLOWS") - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE + context = self.__create_context() coro = self.__start_tls(context) task = self.__loop.create_task(coro) self.__connect_task = task @@ -3713,6 +3713,37 @@ async def __connect(self, protocol, host, port, ssl_context): if self.__connect_task is task: self.__connect_task = None + def __create_context(self, verifycert=None): + """Create an SSL context for a session. + + Uses environment variables to determine context options. + """ + context = ssl.create_default_context() + + getenv = os.environ.get + seclevel = getenv("SESSION_SECLEVEL") + if not seclevel and getenv("ZTI_SECLEVEL") == "1": + seclevel = "1" + + if seclevel: + context.set_ciphers(f"DEFAULT@SECLEVEL={seclevel}") + + ssl_verify = getenv("SESSION_SSL_VERIFY", "") + context.check_hostname = ssl_verify == "hostname" + + if verifycert is None: + if not ssl_verify and context.check_hostname: + verifycert = True + else: + verifycert = ssl_verify in ("cert", "hostname") + + if verifycert: + context.verify_mode = ssl.CERT_REQUIRED + else: + context.verify_mode = ssl.CERT_NONE + + return context + def __erase(self, saddr, eaddr): """Process erase function. @@ -4447,10 +4478,19 @@ async def __start_tls(self, context): transport = self._transport protocol = transport.get_protocol() self._transport = None + server_hostname = None + if context.check_hostname: + server_hostname = self.__start_tls_hostname + if server_hostname is None: + raise TnzError("no hostname for check_hostname") + try: - transport = await loop.start_tls(transport, - protocol, - context) + transport = await loop.start_tls( + transport, + protocol, + context, + server_hostname=server_hostname, + ) except asyncio.CancelledError: self.seslost = True self._event.set() @@ -4463,6 +4503,10 @@ async def __start_tls(self, context): else: self._transport = transport self.__secure = True + if context.verify_mode == ssl.CERT_REQUIRED: + self.__cert_verified = True + self.__host_verified = context.check_hostname + self.__log_debug("__start_tls transport: %r", transport) self.send() # in case send() ignored for _transport = None @@ -4702,6 +4746,12 @@ def __repl(mat): # Readonly properties + @property + def cert_verified(self): + """Bool indicating if secure and cert was verified as trusted. + """ + return self.__cert_verified + @property def host_verified(self): """Bool indicating if secure and host was verified. @@ -4871,15 +4921,16 @@ def connect(host=None, port=None, secure = True for encrypted connection verifycert only has meaning when secure is True """ + ssl_verify = os.environ.get("SESSION_SSL_VERIFY", "") tnz = Tnz(name=name) if port is None and secure is not False: port = 992 - if verifycert is None: + if verifycert is None and secure is None and not ssl_verify: verifycert = False if secure and verifycert is None: - verifycert = True + verifycert = ssl_verify in ("cert", "hostname") if secure is None: secure = bool(port != 23) diff --git a/tnz/zti.py b/tnz/zti.py index e18c6a3..1169670 100644 --- a/tnz/zti.py +++ b/tnz/zti.py @@ -30,7 +30,7 @@ ZTI_TITLE _BPX_TERMPATH (see _termlib.py) -Copyright 2021, 2023 IBM Inc. All Rights Reserved. +Copyright 2021, 2024 IBM Inc. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 """ @@ -772,6 +772,22 @@ def do_session(self, arg): print(f" SESSION_CODE_PAGE={tns.codec_info[0].name}") print(f" SESSION_PS_SIZE={tns.amaxrow}x{tns.amaxcol}") + + if tns.secure: + verify = "" + if tns.host_verified: + verify = "hostname" + elif tns.cert_verified: + verify = "cert" + + if verify: + print(f" SESSION_SSL_VERIFY={verify}") + else: + print(f" SESSION_SSL=1") + print(f" SESSION_SSL_VERIFY=none") + else: + print(f" SESSION_SSL=0") + print(f" SESSION_TN_ENHANCED={tns.tn3270e:d}") print(f" SESSION_DEVICE_TYPE={tns.terminal_type}") @@ -780,8 +796,6 @@ def do_session(self, arg): else: print(" Alternate code page not supported") - print(" socket type: "+repr(tns.getsockettype())) - if tns.extended_color_mode(): print(" Extended color mode") else: @@ -1061,10 +1075,11 @@ def help_vars(self): print("""Variables used when creating a new session: SESSION_CODE_PAGE - code page, e.g. cp037 SESSION_LU_NAME - LU name for TN3270E CONNECT - SESSION_HOST - tcp/ip hostname + SESSION_HOST - tcp/ip hostname or IP address SESSION_PORT - tcp/ip port, default is 992 SESSION_PS_SIZE - terminal size, e.g. 62x160 SESSION_SSL - set to 0 to not force SSL + SESSION_SSL_VERIFY - set to cert or hostname to require verification SESSION_TN_ENHANCED - set to 1 allow TN3270E SESSION_DEVICE_TYPE - device-type, e.g. IBM-DYNAMIC """)