diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py index 0cde0b26..786c441d 100644 --- a/src/OpenSSL/SSL.py +++ b/src/OpenSSL/SSL.py @@ -827,6 +827,16 @@ class Session: _session: Any +def _require_not_used(f): + @wraps(f) + def inner(self, *args, **kwargs): + if self._used: + raise ValueError("Context already used") + return f(self, *args, **kwargs) + + return inner + + class Context: """ :class:`OpenSSL.SSL.Context` instances define the parameters for setting @@ -870,6 +880,7 @@ def __init__(self, method: int) -> None: context = _ffi.gc(context, _lib.SSL_CTX_free) self._context = context + self._used = False self._passphrase_helper: _PassphraseHelper | None = None self._passphrase_callback: _PassphraseCallback[Any] | None = None self._passphrase_userdata: Any | None = None @@ -898,6 +909,7 @@ def __init__(self, method: int) -> None: self.set_min_proto_version(version) self.set_max_proto_version(version) + @_require_not_used def set_min_proto_version(self, version: int) -> None: """ Set the minimum supported protocol version. Setting the minimum @@ -911,6 +923,7 @@ def set_min_proto_version(self, version: int) -> None: _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 ) + @_require_not_used def set_max_proto_version(self, version: int) -> None: """ Set the maximum supported protocol version. Setting the maximum @@ -924,6 +937,7 @@ def set_max_proto_version(self, version: int) -> None: _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 ) + @_require_not_used def load_verify_locations( self, cafile: _StrOrBytesPath | None, @@ -971,6 +985,7 @@ def wrapper(size: int, verify: bool, userdata: Any) -> bytes: FILETYPE_PEM, wrapper, more_args=True, truncate=True ) + @_require_not_used def set_passwd_cb( self, callback: _PassphraseCallback[_T], @@ -1004,6 +1019,7 @@ def set_passwd_cb( ) self._passphrase_userdata = userdata + @_require_not_used def set_default_verify_paths(self) -> None: """ Specify that the platform provided CA certificates are to be used for @@ -1079,6 +1095,7 @@ def _fallback_default_verify_paths( self.load_verify_locations(None, capath) break + @_require_not_used def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: """ Load a certificate chain from a file. @@ -1096,6 +1113,7 @@ def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: if not result: _raise_current_error() + @_require_not_used def use_certificate_file( self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM ) -> None: @@ -1120,6 +1138,7 @@ def use_certificate_file( if not use_result: _raise_current_error() + @_require_not_used def use_certificate(self, cert: X509 | x509.Certificate) -> None: """ Load a certificate from a X509 object @@ -1144,6 +1163,7 @@ def use_certificate(self, cert: X509 | x509.Certificate) -> None: if not use_result: _raise_current_error() + @_require_not_used def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None: """ Add certificate to chain @@ -1176,6 +1196,7 @@ def _raise_passphrase_exception(self) -> None: _raise_current_error() + @_require_not_used def use_privatekey_file( self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM ) -> None: @@ -1200,6 +1221,7 @@ def use_privatekey_file( if not use_result: self._raise_passphrase_exception() + @_require_not_used def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: """ Load a private key from a PKey object @@ -1234,6 +1256,7 @@ def check_privatekey(self) -> None: if not _lib.SSL_CTX_check_private_key(self._context): _raise_current_error() + @_require_not_used def load_client_ca(self, cafile: bytes) -> None: """ Load the trusted certificates that will be sent to the client. Does @@ -1249,6 +1272,7 @@ def load_client_ca(self, cafile: bytes) -> None: _openssl_assert(ca_list != _ffi.NULL) _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) + @_require_not_used def set_session_id(self, buf: bytes) -> None: """ Set the session id to *buf* within which a session can be reused for @@ -1266,6 +1290,7 @@ def set_session_id(self, buf: bytes) -> None: == 1 ) + @_require_not_used def set_session_cache_mode(self, mode: int) -> int: """ Set the behavior of the session cache used by all connections using @@ -1293,6 +1318,7 @@ def get_session_cache_mode(self) -> int: """ return _lib.SSL_CTX_get_session_cache_mode(self._context) + @_require_not_used def set_verify( self, mode: int, callback: _VerifyCallback | None = None ) -> None: @@ -1330,6 +1356,7 @@ def set_verify( self._verify_callback = self._verify_helper.callback _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) + @_require_not_used def set_verify_depth(self, depth: int) -> None: """ Set the maximum depth for the certificate chain verification that shall @@ -1361,6 +1388,7 @@ def get_verify_depth(self) -> int: """ return _lib.SSL_CTX_get_verify_depth(self._context) + @_require_not_used def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: """ Load parameters for Ephemeral Diffie-Hellman @@ -1382,6 +1410,7 @@ def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) _openssl_assert(res == 1) + @_require_not_used def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None: """ Select a curve to use for ECDHE key exchange. @@ -1421,6 +1450,7 @@ def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None: ec = _ffi.gc(ec, _lib.EC_KEY_free) _lib.SSL_CTX_set_tmp_ecdh(self._context, ec) + @_require_not_used def set_cipher_list(self, cipher_list: bytes) -> None: """ Set the list of ciphers to be used in this context. @@ -1460,6 +1490,7 @@ def set_cipher_list(self, cipher_list: bytes) -> None: ], ) + @_require_not_used def set_client_ca_list( self, certificate_authorities: Sequence[X509Name] ) -> None: @@ -1497,6 +1528,7 @@ def set_client_ca_list( _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) + @_require_not_used def add_client_ca( self, certificate_authority: X509 | x509.Certificate ) -> None: @@ -1531,6 +1563,7 @@ def add_client_ca( ) _openssl_assert(add_result == 1) + @_require_not_used def set_timeout(self, timeout: int) -> None: """ Set the timeout for newly created sessions for this Context object to @@ -1554,6 +1587,7 @@ def get_timeout(self) -> int: """ return _lib.SSL_CTX_get_timeout(self._context) + @_require_not_used def set_info_callback( self, callback: Callable[[Connection, int, int], None] ) -> None: @@ -1579,6 +1613,7 @@ def wrapper(ssl, where, return_code): # type: ignore[no-untyped-def] _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) @_requires_keylog + @_require_not_used def set_keylog_callback( self, callback: Callable[[Connection, bytes], None] ) -> None: @@ -1613,6 +1648,7 @@ def get_app_data(self) -> Any: """ return self._app_data + @_require_not_used def set_app_data(self, data: Any) -> None: """ Set the application data (will be returned from get_app_data()) @@ -1639,6 +1675,7 @@ def get_cert_store(self) -> X509Store | None: pystore._store = store return pystore + @_require_not_used def set_options(self, options: int) -> int: """ Add options. Options set before are not cleared! @@ -1652,6 +1689,7 @@ def set_options(self, options: int) -> int: return _lib.SSL_CTX_set_options(self._context, options) + @_require_not_used def set_mode(self, mode: int) -> int: """ Add modes via bitmask. Modes set before are not cleared! This method @@ -1665,6 +1703,7 @@ def set_mode(self, mode: int) -> int: return _lib.SSL_CTX_set_mode(self._context, mode) + @_require_not_used def set_tlsext_servername_callback( self, callback: Callable[[Connection], None] ) -> None: @@ -1690,6 +1729,7 @@ def wrapper(ssl, alert, arg): # type: ignore[no-untyped-def] self._context, self._tlsext_servername_callback ) + @_require_not_used def set_tlsext_use_srtp(self, profiles: bytes) -> None: """ Enable support for negotiating SRTP keying material. @@ -1705,6 +1745,7 @@ def set_tlsext_use_srtp(self, profiles: bytes) -> None: _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 ) + @_require_not_used def set_alpn_protos(self, protos: list[bytes]) -> None: """ Specify the protocols that the client is prepared to speak after the @@ -1742,6 +1783,7 @@ def set_alpn_protos(self, protos: list[bytes]) -> None: == 0 ) + @_require_not_used def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None: """ Specify a callback function that will be called on the server when a @@ -1786,6 +1828,7 @@ def _set_ocsp_callback( rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data) _openssl_assert(rc == 1) + @_require_not_used def set_ocsp_server_callback( self, callback: _OCSPServerCallback[_T], @@ -1808,6 +1851,7 @@ def set_ocsp_server_callback( helper = _OCSPServerCallbackHelper(callback) self._set_ocsp_callback(helper, data) + @_require_not_used def set_ocsp_client_callback( self, callback: _OCSPClientCallback[_T], @@ -1832,6 +1876,7 @@ def set_ocsp_client_callback( helper = _OCSPClientCallbackHelper(callback) self._set_ocsp_callback(helper, data) + @_require_not_used def set_cookie_generate_callback( self, callback: _CookieGenerateCallback ) -> None: @@ -1841,6 +1886,7 @@ def set_cookie_generate_callback( self._cookie_generate_helper.callback, ) + @_require_not_used def set_cookie_verify_callback( self, callback: _CookieVerifyCallback ) -> None: @@ -1869,6 +1915,8 @@ def __init__( if not isinstance(context, Context): raise TypeError("context must be a Context instance") + context._used = True + ssl = _lib.SSL_new(context._context) self._ssl = _ffi.gc(ssl, _lib.SSL_free) # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns @@ -2000,6 +2048,7 @@ def set_context(self, context: Context) -> None: _lib.SSL_set_SSL_CTX(self._ssl, context._context) self._context = context + self._context._used = True def get_servername(self) -> bytes | None: """