From 9eaa1073621dd46b3e721183efc318c95aba0d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jeremy=20Lain=C3=A9?= Date: Fri, 21 Jun 2024 21:14:11 +0200 Subject: [PATCH] Add type annotations for the `SSL` module (#1308) * Add type annotations for the `SSL` module Instead of relying on the third-party `types-pyOpenSSL` package, provide our own type annotations. This will ensure `pyOpenSSL` users always have type annotations which matches their version. * Replace `py.typed` by an empty file --- MANIFEST.in | 2 +- src/OpenSSL/SSL.py | 453 ++++++++++++++++++++++++------------------- src/OpenSSL/py.typed | 0 3 files changed, 252 insertions(+), 203 deletions(-) create mode 100644 src/OpenSSL/py.typed diff --git a/MANIFEST.in b/MANIFEST.in index a9642c785..308658240 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include LICENSE MANIFEST.in *.rst tox.ini .coveragerc +include LICENSE MANIFEST.in *.rst tox.ini .coveragerc src/OpenSSL/py.typed exclude codecov.yml .readthedocs.yml mypy.ini recursive-include tests *.py recursive-include doc * diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py index 852d83d52..5e42289a3 100644 --- a/src/OpenSSL/SSL.py +++ b/src/OpenSSL/SSL.py @@ -5,10 +5,11 @@ from functools import partial, wraps from itertools import chain, count from sys import platform +from typing import Any, Callable, List, Optional, Sequence, Tuple, TypeVar from weakref import WeakValueDictionary from OpenSSL._util import ( - UNSPECIFIED as _UNSPECIFIED, + StrOrBytesPath as _StrOrBytesPath, ) from OpenSSL._util import ( exception_from_error_queue as _exception_from_error_queue, @@ -37,6 +38,7 @@ PKey, X509Name, X509Store, + _EllipticCurve, _PassphraseHelper, ) @@ -142,12 +144,18 @@ ] -OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER -OPENSSL_VERSION = SSLEAY_VERSION = _lib.OPENSSL_VERSION -OPENSSL_CFLAGS = SSLEAY_CFLAGS = _lib.OPENSSL_CFLAGS -OPENSSL_PLATFORM = SSLEAY_PLATFORM = _lib.OPENSSL_PLATFORM -OPENSSL_DIR = SSLEAY_DIR = _lib.OPENSSL_DIR -OPENSSL_BUILT_ON = SSLEAY_BUILT_ON = _lib.OPENSSL_BUILT_ON +OPENSSL_VERSION_NUMBER: int = _lib.OPENSSL_VERSION_NUMBER +OPENSSL_VERSION: int = _lib.OPENSSL_VERSION +OPENSSL_CFLAGS: int = _lib.OPENSSL_CFLAGS +OPENSSL_PLATFORM: int = _lib.OPENSSL_PLATFORM +OPENSSL_DIR: int = _lib.OPENSSL_DIR +OPENSSL_BUILT_ON: int = _lib.OPENSSL_BUILT_ON + +SSLEAY_VERSION = OPENSSL_VERSION +SSLEAY_CFLAGS = OPENSSL_CFLAGS +SSLEAY_PLATFORM = OPENSSL_PLATFORM +SSLEAY_DIR = OPENSSL_DIR +SSLEAY_BUILT_ON = OPENSSL_BUILT_ON SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN @@ -163,105 +171,108 @@ DTLS_SERVER_METHOD = 11 DTLS_CLIENT_METHOD = 12 -SSL3_VERSION = _lib.SSL3_VERSION -TLS1_VERSION = _lib.TLS1_VERSION -TLS1_1_VERSION = _lib.TLS1_1_VERSION -TLS1_2_VERSION = _lib.TLS1_2_VERSION -TLS1_3_VERSION = _lib.TLS1_3_VERSION - -OP_NO_SSLv2 = _lib.SSL_OP_NO_SSLv2 -OP_NO_SSLv3 = _lib.SSL_OP_NO_SSLv3 -OP_NO_TLSv1 = _lib.SSL_OP_NO_TLSv1 -OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1 -OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2 +SSL3_VERSION: int = _lib.SSL3_VERSION +TLS1_VERSION: int = _lib.TLS1_VERSION +TLS1_1_VERSION: int = _lib.TLS1_1_VERSION +TLS1_2_VERSION: int = _lib.TLS1_2_VERSION +TLS1_3_VERSION: int = _lib.TLS1_3_VERSION + +OP_NO_SSLv2: int = _lib.SSL_OP_NO_SSLv2 +OP_NO_SSLv3: int = _lib.SSL_OP_NO_SSLv3 +OP_NO_TLSv1: int = _lib.SSL_OP_NO_TLSv1 +OP_NO_TLSv1_1: int = _lib.SSL_OP_NO_TLSv1_1 +OP_NO_TLSv1_2: int = _lib.SSL_OP_NO_TLSv1_2 try: - OP_NO_TLSv1_3 = _lib.SSL_OP_NO_TLSv1_3 + OP_NO_TLSv1_3: int = _lib.SSL_OP_NO_TLSv1_3 __all__.append("OP_NO_TLSv1_3") except AttributeError: pass -MODE_RELEASE_BUFFERS = _lib.SSL_MODE_RELEASE_BUFFERS +MODE_RELEASE_BUFFERS: int = _lib.SSL_MODE_RELEASE_BUFFERS -OP_SINGLE_DH_USE = _lib.SSL_OP_SINGLE_DH_USE -OP_SINGLE_ECDH_USE = _lib.SSL_OP_SINGLE_ECDH_USE -OP_EPHEMERAL_RSA = _lib.SSL_OP_EPHEMERAL_RSA -OP_MICROSOFT_SESS_ID_BUG = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG -OP_NETSCAPE_CHALLENGE_BUG = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG -OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG = ( +OP_SINGLE_DH_USE: int = _lib.SSL_OP_SINGLE_DH_USE +OP_SINGLE_ECDH_USE: int = _lib.SSL_OP_SINGLE_ECDH_USE +OP_EPHEMERAL_RSA: int = _lib.SSL_OP_EPHEMERAL_RSA +OP_MICROSOFT_SESS_ID_BUG: int = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG +OP_NETSCAPE_CHALLENGE_BUG: int = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG +OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG: int = ( _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG ) -OP_SSLREF2_REUSE_CERT_TYPE_BUG = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG -OP_MICROSOFT_BIG_SSLV3_BUFFER = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER -OP_MSIE_SSLV2_RSA_PADDING = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING -OP_SSLEAY_080_CLIENT_DH_BUG = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG -OP_TLS_D5_BUG = _lib.SSL_OP_TLS_D5_BUG -OP_TLS_BLOCK_PADDING_BUG = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG -OP_DONT_INSERT_EMPTY_FRAGMENTS = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS -OP_CIPHER_SERVER_PREFERENCE = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE -OP_TLS_ROLLBACK_BUG = _lib.SSL_OP_TLS_ROLLBACK_BUG +OP_SSLREF2_REUSE_CERT_TYPE_BUG: int = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG +OP_MICROSOFT_BIG_SSLV3_BUFFER: int = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER +OP_MSIE_SSLV2_RSA_PADDING: int = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING +OP_SSLEAY_080_CLIENT_DH_BUG: int = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG +OP_TLS_D5_BUG: int = _lib.SSL_OP_TLS_D5_BUG +OP_TLS_BLOCK_PADDING_BUG: int = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG +OP_DONT_INSERT_EMPTY_FRAGMENTS: int = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS +OP_CIPHER_SERVER_PREFERENCE: int = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE +OP_TLS_ROLLBACK_BUG: int = _lib.SSL_OP_TLS_ROLLBACK_BUG OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1 -OP_PKCS1_CHECK_2 = _lib.SSL_OP_PKCS1_CHECK_2 -OP_NETSCAPE_CA_DN_BUG = _lib.SSL_OP_NETSCAPE_CA_DN_BUG -OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG = ( +OP_PKCS1_CHECK_2: int = _lib.SSL_OP_PKCS1_CHECK_2 +OP_NETSCAPE_CA_DN_BUG: int = _lib.SSL_OP_NETSCAPE_CA_DN_BUG +OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG: int = ( _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG ) -OP_NO_COMPRESSION = _lib.SSL_OP_NO_COMPRESSION +OP_NO_COMPRESSION: int = _lib.SSL_OP_NO_COMPRESSION -OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU -OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE -OP_NO_TICKET = _lib.SSL_OP_NO_TICKET +OP_NO_QUERY_MTU: int = _lib.SSL_OP_NO_QUERY_MTU +OP_COOKIE_EXCHANGE: int = _lib.SSL_OP_COOKIE_EXCHANGE +OP_NO_TICKET: int = _lib.SSL_OP_NO_TICKET try: - OP_NO_RENEGOTIATION = _lib.SSL_OP_NO_RENEGOTIATION + OP_NO_RENEGOTIATION: int = _lib.SSL_OP_NO_RENEGOTIATION __all__.append("OP_NO_RENEGOTIATION") except AttributeError: pass try: - OP_IGNORE_UNEXPECTED_EOF = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF + OP_IGNORE_UNEXPECTED_EOF: int = _lib.SSL_OP_IGNORE_UNEXPECTED_EOF __all__.append("OP_IGNORE_UNEXPECTED_EOF") except AttributeError: pass try: - OP_LEGACY_SERVER_CONNECT = _lib.SSL_OP_LEGACY_SERVER_CONNECT + OP_LEGACY_SERVER_CONNECT: int = _lib.SSL_OP_LEGACY_SERVER_CONNECT __all__.append("OP_LEGACY_SERVER_CONNECT") except AttributeError: pass -OP_ALL = _lib.SSL_OP_ALL - -VERIFY_PEER = _lib.SSL_VERIFY_PEER -VERIFY_FAIL_IF_NO_PEER_CERT = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT -VERIFY_CLIENT_ONCE = _lib.SSL_VERIFY_CLIENT_ONCE -VERIFY_NONE = _lib.SSL_VERIFY_NONE - -SESS_CACHE_OFF = _lib.SSL_SESS_CACHE_OFF -SESS_CACHE_CLIENT = _lib.SSL_SESS_CACHE_CLIENT -SESS_CACHE_SERVER = _lib.SSL_SESS_CACHE_SERVER -SESS_CACHE_BOTH = _lib.SSL_SESS_CACHE_BOTH -SESS_CACHE_NO_AUTO_CLEAR = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR -SESS_CACHE_NO_INTERNAL_LOOKUP = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP -SESS_CACHE_NO_INTERNAL_STORE = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE -SESS_CACHE_NO_INTERNAL = _lib.SSL_SESS_CACHE_NO_INTERNAL - -SSL_ST_CONNECT = _lib.SSL_ST_CONNECT -SSL_ST_ACCEPT = _lib.SSL_ST_ACCEPT -SSL_ST_MASK = _lib.SSL_ST_MASK - -SSL_CB_LOOP = _lib.SSL_CB_LOOP -SSL_CB_EXIT = _lib.SSL_CB_EXIT -SSL_CB_READ = _lib.SSL_CB_READ -SSL_CB_WRITE = _lib.SSL_CB_WRITE -SSL_CB_ALERT = _lib.SSL_CB_ALERT -SSL_CB_READ_ALERT = _lib.SSL_CB_READ_ALERT -SSL_CB_WRITE_ALERT = _lib.SSL_CB_WRITE_ALERT -SSL_CB_ACCEPT_LOOP = _lib.SSL_CB_ACCEPT_LOOP -SSL_CB_ACCEPT_EXIT = _lib.SSL_CB_ACCEPT_EXIT -SSL_CB_CONNECT_LOOP = _lib.SSL_CB_CONNECT_LOOP -SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT -SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START -SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE +OP_ALL: int = _lib.SSL_OP_ALL + +VERIFY_PEER: int = _lib.SSL_VERIFY_PEER +VERIFY_FAIL_IF_NO_PEER_CERT: int = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT +VERIFY_CLIENT_ONCE: int = _lib.SSL_VERIFY_CLIENT_ONCE +VERIFY_NONE: int = _lib.SSL_VERIFY_NONE + +SESS_CACHE_OFF: int = _lib.SSL_SESS_CACHE_OFF +SESS_CACHE_CLIENT: int = _lib.SSL_SESS_CACHE_CLIENT +SESS_CACHE_SERVER: int = _lib.SSL_SESS_CACHE_SERVER +SESS_CACHE_BOTH: int = _lib.SSL_SESS_CACHE_BOTH +SESS_CACHE_NO_AUTO_CLEAR: int = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR +SESS_CACHE_NO_INTERNAL_LOOKUP: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP +SESS_CACHE_NO_INTERNAL_STORE: int = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE +SESS_CACHE_NO_INTERNAL: int = _lib.SSL_SESS_CACHE_NO_INTERNAL + +SSL_ST_CONNECT: int = _lib.SSL_ST_CONNECT +SSL_ST_ACCEPT: int = _lib.SSL_ST_ACCEPT +SSL_ST_MASK: int = _lib.SSL_ST_MASK + +SSL_CB_LOOP: int = _lib.SSL_CB_LOOP +SSL_CB_EXIT: int = _lib.SSL_CB_EXIT +SSL_CB_READ: int = _lib.SSL_CB_READ +SSL_CB_WRITE: int = _lib.SSL_CB_WRITE +SSL_CB_ALERT: int = _lib.SSL_CB_ALERT +SSL_CB_READ_ALERT: int = _lib.SSL_CB_READ_ALERT +SSL_CB_WRITE_ALERT: int = _lib.SSL_CB_WRITE_ALERT +SSL_CB_ACCEPT_LOOP: int = _lib.SSL_CB_ACCEPT_LOOP +SSL_CB_ACCEPT_EXIT: int = _lib.SSL_CB_ACCEPT_EXIT +SSL_CB_CONNECT_LOOP: int = _lib.SSL_CB_CONNECT_LOOP +SSL_CB_CONNECT_EXIT: int = _lib.SSL_CB_CONNECT_EXIT +SSL_CB_HANDSHAKE_START: int = _lib.SSL_CB_HANDSHAKE_START +SSL_CB_HANDSHAKE_DONE: int = _lib.SSL_CB_HANDSHAKE_DONE + +_T = TypeVar("_T") +_SetVerifyCallback = Callable[["Connection", X509, int, int, int], bool] class X509VerificationCodes: @@ -431,10 +442,10 @@ class _CallbackExceptionHelper: possible to do so. """ - def __init__(self): - self._problems = [] + def __init__(self) -> None: + self._problems: List[Exception] = [] - def raise_if_problem(self): + def raise_if_problem(self) -> None: """ Raise an exception from the OpenSSL error queue or that was previously captured whe running a callback. @@ -739,7 +750,7 @@ def _asFileDescriptor(obj): return fd -def OpenSSL_version(type): +def OpenSSL_version(type: int) -> bytes: """ Return a string describing the version of OpenSSL in use. @@ -751,7 +762,7 @@ def OpenSSL_version(type): SSLeay_version = OpenSSL_version -def _make_requires(flag, error): +def _make_requires(flag: int, error: str) -> Callable[[_T], _T]: """ Builds a decorator that ensures that functions that rely on OpenSSL functions that are not present in this build raise NotImplementedError, @@ -782,7 +793,7 @@ def explode(*args, **kwargs): _requires_keylog = _make_requires( - getattr(_lib, "Cryptography_HAS_KEYLOG", None), "Key logging not available" + getattr(_lib, "Cryptography_HAS_KEYLOG", 0), "Key logging not available" ) @@ -822,7 +833,7 @@ class Context: DTLS_CLIENT_METHOD: (_lib.DTLS_client_method, None), } - def __init__(self, method): + def __init__(self, method: int) -> None: if not isinstance(method, int): raise TypeError("method must be an integer") @@ -861,7 +872,7 @@ def __init__(self, method): self.set_min_proto_version(version) self.set_max_proto_version(version) - def set_min_proto_version(self, version): + def set_min_proto_version(self, version: int) -> None: """ Set the minimum supported protocol version. Setting the minimum version to 0 will enable protocol versions down to the lowest version @@ -874,7 +885,7 @@ def set_min_proto_version(self, version): _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 ) - def set_max_proto_version(self, version): + def set_max_proto_version(self, version: int) -> None: """ Set the maximum supported protocol version. Setting the maximum version to 0 will enable protocol versions up to the highest version @@ -887,7 +898,11 @@ def set_max_proto_version(self, version): _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 ) - def load_verify_locations(self, cafile, capath=None): + def load_verify_locations( + self, + cafile: Optional[_StrOrBytesPath], + capath: Optional[_StrOrBytesPath] = None, + ) -> None: """ Let SSL know where we can find trusted certificates for the certificate chain. Note that the certificates have to be in PEM format. @@ -897,9 +912,9 @@ def load_verify_locations(self, cafile, capath=None): *pemfile* or *capath* may be :data:`None`. :param cafile: In which file we can find the certificates (``bytes`` or - ``unicode``). + ``str``). :param capath: In which directory we can find the certificates - (``bytes`` or ``unicode``). + (``bytes`` or ``str``). :return: None """ @@ -928,7 +943,11 @@ def wrapper(size, verify, userdata): FILETYPE_PEM, wrapper, more_args=True, truncate=True ) - def set_passwd_cb(self, callback, userdata=None): + def set_passwd_cb( + self, + callback: Callable[[int, bool, Optional[_T]], bytes], + userdata: Optional[_T] = None, + ) -> None: """ Set the passphrase callback. This function will be called when a private key with a passphrase is loaded. @@ -957,7 +976,7 @@ def set_passwd_cb(self, callback, userdata=None): ) self._passphrase_userdata = userdata - def set_default_verify_paths(self): + def set_default_verify_paths(self) -> None: """ Specify that the platform provided CA certificates are to be used for verification purposes. This method has some caveats related to the @@ -1036,12 +1055,12 @@ def _fallback_default_verify_paths(self, file_path, dir_path): self.load_verify_locations(None, capath) break - def use_certificate_chain_file(self, certfile): + def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: """ Load a certificate chain from a file. :param certfile: The name of the certificate chain file (``bytes`` or - ``unicode``). Must be PEM encoded. + ``str``). Must be PEM encoded. :return: None """ @@ -1053,12 +1072,14 @@ def use_certificate_chain_file(self, certfile): if not result: _raise_current_error() - def use_certificate_file(self, certfile, filetype=FILETYPE_PEM): + def use_certificate_file( + self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM + ) -> None: """ Load a certificate from a file :param certfile: The name of the certificate file (``bytes`` or - ``unicode``). + ``str``). :param filetype: (optional) The encoding of the file, which is either :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is :const:`FILETYPE_PEM`. @@ -1075,7 +1096,7 @@ def use_certificate_file(self, certfile, filetype=FILETYPE_PEM): if not use_result: _raise_current_error() - def use_certificate(self, cert): + def use_certificate(self, cert: X509) -> None: """ Load a certificate from a X509 object @@ -1090,7 +1111,7 @@ def use_certificate(self, cert): if not use_result: _raise_current_error() - def add_extra_chain_cert(self, certobj): + def add_extra_chain_cert(self, certobj: X509) -> None: """ Add certificate to chain @@ -1107,17 +1128,19 @@ def add_extra_chain_cert(self, certobj): _lib.X509_free(copy) _raise_current_error() - def _raise_passphrase_exception(self): + def _raise_passphrase_exception(self) -> None: if self._passphrase_helper is not None: self._passphrase_helper.raise_if_problem(Error) _raise_current_error() - def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): + def use_privatekey_file( + self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM + ) -> None: """ Load a private key from a file - :param keyfile: The name of the key file (``bytes`` or ``unicode``) + :param keyfile: The name of the key file (``bytes`` or ``str``) :param filetype: (optional) The encoding of the file, which is either :const:`FILETYPE_PEM` or :const:`FILETYPE_ASN1`. The default is :const:`FILETYPE_PEM`. @@ -1126,9 +1149,7 @@ def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): """ keyfile = _path_bytes(keyfile) - if filetype is _UNSPECIFIED: - filetype = FILETYPE_PEM - elif not isinstance(filetype, int): + if not isinstance(filetype, int): raise TypeError("filetype must be an integer") use_result = _lib.SSL_CTX_use_PrivateKey_file( @@ -1137,7 +1158,7 @@ def use_privatekey_file(self, keyfile, filetype=_UNSPECIFIED): if not use_result: self._raise_passphrase_exception() - def use_privatekey(self, pkey): + def use_privatekey(self, pkey: PKey) -> None: """ Load a private key from a PKey object @@ -1152,7 +1173,7 @@ def use_privatekey(self, pkey): if not use_result: self._raise_passphrase_exception() - def check_privatekey(self): + def check_privatekey(self) -> None: """ Check if the private key (loaded with :meth:`use_privatekey`) matches the certificate (loaded with :meth:`use_certificate`) @@ -1162,7 +1183,7 @@ def check_privatekey(self): if not _lib.SSL_CTX_check_private_key(self._context): _raise_current_error() - def load_client_ca(self, cafile): + def load_client_ca(self, cafile: bytes) -> None: """ Load the trusted certificates that will be sent to the client. Does not actually imply any of the certificates are trusted; that must be @@ -1177,7 +1198,7 @@ def load_client_ca(self, cafile): _openssl_assert(ca_list != _ffi.NULL) _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) - def set_session_id(self, buf): + def set_session_id(self, buf: bytes) -> None: """ Set the session id to *buf* within which a session can be reused for this Context object. This is needed when doing session resumption, @@ -1194,7 +1215,7 @@ def set_session_id(self, buf): == 1 ) - def set_session_cache_mode(self, mode): + def set_session_cache_mode(self, mode: int) -> None: """ Set the behavior of the session cache used by all connections using this Context. The previously set mode is returned. See @@ -1211,7 +1232,7 @@ def set_session_cache_mode(self, mode): return _lib.SSL_CTX_set_session_cache_mode(self._context, mode) - def get_session_cache_mode(self): + def get_session_cache_mode(self) -> int: """ Get the current session cache mode. @@ -1221,7 +1242,9 @@ def get_session_cache_mode(self): """ return _lib.SSL_CTX_get_session_cache_mode(self._context) - def set_verify(self, mode, callback=None): + def set_verify( + self, mode: int, callback: Optional[_SetVerifyCallback] = None + ) -> None: """ Set the verification flags for this Context object to *mode* and specify that *callback* should be used for verification callbacks. @@ -1256,7 +1279,7 @@ def set_verify(self, mode, callback=None): self._verify_callback = self._verify_helper.callback _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) - def set_verify_depth(self, depth): + def set_verify_depth(self, depth: int) -> None: """ Set the maximum depth for the certificate chain verification that shall be allowed for this Context object. @@ -1269,7 +1292,7 @@ def set_verify_depth(self, depth): _lib.SSL_CTX_set_verify_depth(self._context, depth) - def get_verify_mode(self): + def get_verify_mode(self) -> int: """ Retrieve the Context object's verify mode, as set by :meth:`set_verify`. @@ -1278,7 +1301,7 @@ def get_verify_mode(self): """ return _lib.SSL_CTX_get_verify_mode(self._context) - def get_verify_depth(self): + def get_verify_depth(self) -> int: """ Retrieve the Context object's verify depth, as set by :meth:`set_verify_depth`. @@ -1287,12 +1310,12 @@ def get_verify_depth(self): """ return _lib.SSL_CTX_get_verify_depth(self._context) - def load_tmp_dh(self, dhfile): + def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: """ Load parameters for Ephemeral Diffie-Hellman :param dhfile: The file to load EDH parameters from (``bytes`` or - ``unicode``). + ``str``). :return: None """ @@ -1308,7 +1331,7 @@ def load_tmp_dh(self, dhfile): res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) _openssl_assert(res == 1) - def set_tmp_ecdh(self, curve): + def set_tmp_ecdh(self, curve: _EllipticCurve) -> None: """ Select a curve to use for ECDHE key exchange. @@ -1320,7 +1343,7 @@ def set_tmp_ecdh(self, curve): """ _lib.SSL_CTX_set_tmp_ecdh(self._context, curve._to_EC_KEY()) - def set_cipher_list(self, cipher_list): + def set_cipher_list(self, cipher_list: bytes) -> None: """ Set the list of ciphers to be used in this context. @@ -1359,7 +1382,9 @@ def set_cipher_list(self, cipher_list): ], ) - def set_client_ca_list(self, certificate_authorities): + def set_client_ca_list( + self, certificate_authorities: Sequence[X509Name] + ) -> None: """ Set the list of preferred client certificate signers for this server context. @@ -1394,7 +1419,7 @@ def set_client_ca_list(self, certificate_authorities): _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) - def add_client_ca(self, certificate_authority): + def add_client_ca(self, certificate_authority: X509) -> None: """ Add the CA certificate to the list of preferred signers for this context. @@ -1415,7 +1440,7 @@ def add_client_ca(self, certificate_authority): ) _openssl_assert(add_result == 1) - def set_timeout(self, timeout): + def set_timeout(self, timeout: int) -> None: """ Set the timeout for newly created sessions for this Context object to *timeout*. The default value is 300 seconds. See the OpenSSL manual @@ -1429,7 +1454,7 @@ def set_timeout(self, timeout): return _lib.SSL_CTX_set_timeout(self._context, timeout) - def get_timeout(self): + def get_timeout(self) -> int: """ Retrieve session timeout, as set by :meth:`set_timeout`. The default is 300 seconds. @@ -1438,7 +1463,9 @@ def get_timeout(self): """ return _lib.SSL_CTX_get_timeout(self._context) - def set_info_callback(self, callback): + def set_info_callback( + self, callback: Callable[["Connection", int, int], None] + ) -> None: """ Set the information callback to *callback*. This function will be called from time to time during SSL handshakes. @@ -1461,7 +1488,9 @@ def wrapper(ssl, where, return_code): _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) @_requires_keylog - def set_keylog_callback(self, callback): + def set_keylog_callback( + self, callback: Callable[["Connection", bytes], None] + ) -> None: """ Set the TLS key logging callback to *callback*. This function will be called whenever TLS key material is generated or received, in order @@ -1485,7 +1514,7 @@ def wrapper(ssl, line): ) _lib.SSL_CTX_set_keylog_callback(self._context, self._keylog_callback) - def get_app_data(self): + def get_app_data(self) -> Any: """ Get the application data (supplied via :meth:`set_app_data()`) @@ -1493,7 +1522,7 @@ def get_app_data(self): """ return self._app_data - def set_app_data(self, data): + def set_app_data(self, data: Any) -> None: """ Set the application data (will be returned from get_app_data()) @@ -1502,7 +1531,7 @@ def set_app_data(self, data): """ self._app_data = data - def get_cert_store(self): + def get_cert_store(self) -> X509Store: """ Get the certificate store for the context. This can be used to add "trusted" certificates without using the @@ -1519,7 +1548,7 @@ def get_cert_store(self): pystore._store = store return pystore - def set_options(self, options): + def set_options(self, options: int) -> None: """ Add options. Options set before are not cleared! This method should be used with the :const:`OP_*` constants. @@ -1532,7 +1561,7 @@ def set_options(self, options): return _lib.SSL_CTX_set_options(self._context, options) - def set_mode(self, mode): + def set_mode(self, mode: int) -> None: """ Add modes via bitmask. Modes set before are not cleared! This method should be used with the :const:`MODE_*` constants. @@ -1545,7 +1574,9 @@ def set_mode(self, mode): return _lib.SSL_CTX_set_mode(self._context, mode) - def set_tlsext_servername_callback(self, callback): + def set_tlsext_servername_callback( + self, callback: Callable[["Connection"], None] + ) -> None: """ Specify a callback function to be called when clients specify a server name. @@ -1568,7 +1599,7 @@ def wrapper(ssl, alert, arg): self._context, self._tlsext_servername_callback ) - def set_tlsext_use_srtp(self, profiles): + def set_tlsext_use_srtp(self, profiles: bytes) -> None: """ Enable support for negotiating SRTP keying material. @@ -1584,7 +1615,7 @@ def set_tlsext_use_srtp(self, profiles): ) @_requires_alpn - def set_alpn_protos(self, protos): + def set_alpn_protos(self, protos: List[bytes]) -> None: """ Specify the protocols that the client is prepared to speak after the TLS connection has been negotiated using Application Layer Protocol @@ -1622,7 +1653,9 @@ def set_alpn_protos(self, protos): ) @_requires_alpn - def set_alpn_select_callback(self, callback): + def set_alpn_select_callback( + self, callback: Callable[["Connection", List[bytes]], None] + ) -> None: """ Specify a callback function that will be called on the server when a client offers protocols using ALPN. @@ -1662,7 +1695,11 @@ def _set_ocsp_callback(self, helper, data): rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data) _openssl_assert(rc == 1) - def set_ocsp_server_callback(self, callback, data=None): + def set_ocsp_server_callback( + self, + callback: Callable[["Connection", Optional[_T]], bytes], + data: Optional[_T] = None, + ) -> None: """ Set a callback to provide OCSP data to be stapled to the TLS handshake on the server side. @@ -1680,7 +1717,11 @@ def set_ocsp_server_callback(self, callback, data=None): helper = _OCSPServerCallbackHelper(callback) self._set_ocsp_callback(helper, data) - def set_ocsp_client_callback(self, callback, data=None): + def set_ocsp_client_callback( + self, + callback: Callable[["Connection", bytes, Optional[_T]], bool], + data: Optional[_T] = None, + ) -> None: """ Set a callback to validate OCSP data stapled to the TLS handshake on the client side. @@ -1718,7 +1759,9 @@ def set_cookie_verify_callback(self, callback): class Connection: _reverse_mapping = WeakValueDictionary() - def __init__(self, context, socket=None): + def __init__( + self, context: Context, socket: Optional[socket.socket] = None + ) -> None: """ Create a new Connection object, using the given OpenSSL.SSL.Context instance and socket. @@ -1843,14 +1886,14 @@ def _raise_ssl_error(self, ssl, result): else: _raise_current_error() - def get_context(self): + def get_context(self) -> Context: """ Retrieve the :class:`Context` object associated with this :class:`Connection`. """ return self._context - def set_context(self, context): + def set_context(self, context: Context) -> None: """ Switch this connection to a new session context. @@ -1863,7 +1906,7 @@ def set_context(self, context): _lib.SSL_set_SSL_CTX(self._ssl, context._context) self._context = context - def get_servername(self): + def get_servername(self) -> Optional[bytes]: """ Retrieve the servername extension value if provided in the client hello message, or None if there wasn't one. @@ -1880,7 +1923,9 @@ def get_servername(self): return _ffi.string(name) - def set_verify(self, mode, callback=None): + def set_verify( + self, mode: int, callback: Optional[_SetVerifyCallback] = None + ) -> None: """ Override the Context object's verification flags for this specific connection. See :py:meth:`Context.set_verify` for details. @@ -1900,7 +1945,7 @@ def set_verify(self, mode, callback=None): self._verify_callback = self._verify_helper.callback _lib.SSL_set_verify(self._ssl, mode, self._verify_callback) - def get_verify_mode(self): + def get_verify_mode(self) -> int: """ Retrieve the Connection object's verify mode, as set by :meth:`set_verify`. @@ -1909,7 +1954,7 @@ def get_verify_mode(self): """ return _lib.SSL_get_verify_mode(self._ssl) - def use_certificate(self, cert): + def use_certificate(self, cert: X509) -> None: """ Load a certificate from a X509 object @@ -1924,7 +1969,7 @@ def use_certificate(self, cert): if not use_result: _raise_current_error() - def use_privatekey(self, pkey): + def use_privatekey(self, pkey: PKey) -> None: """ Load a private key from a PKey object @@ -1939,7 +1984,7 @@ def use_privatekey(self, pkey): if not use_result: self._context._raise_passphrase_exception() - def set_ciphertext_mtu(self, mtu): + def set_ciphertext_mtu(self, mtu: int) -> None: """ For DTLS, set the maximum UDP payload size (*not* including IP/UDP overhead). @@ -1953,7 +1998,7 @@ def set_ciphertext_mtu(self, mtu): """ _lib.SSL_set_mtu(self._ssl, mtu) - def get_cleartext_mtu(self): + def get_cleartext_mtu(self) -> int: """ For DTLS, get the maximum size of unencrypted data you can pass to :meth:`write` without exceeding the MTU (as passed to @@ -1968,7 +2013,7 @@ def get_cleartext_mtu(self): raise NotImplementedError("requires OpenSSL 1.1.1 or better") return _lib.DTLS_get_data_mtu(self._ssl) - def set_tlsext_host_name(self, name): + def set_tlsext_host_name(self, name: bytes) -> None: """ Set the value of the servername extension to send in the client hello. @@ -1984,7 +2029,7 @@ def set_tlsext_host_name(self, name): # XXX I guess this can fail sometimes? _lib.SSL_set_tlsext_host_name(self._ssl, name) - def pending(self): + def pending(self) -> int: """ Get the number of bytes that can be safely read from the SSL buffer (**not** the underlying transport buffer). @@ -1993,7 +2038,7 @@ def pending(self): """ return _lib.SSL_pending(self._ssl) - def send(self, buf, flags=0): + def send(self, buf: bytes, flags: int = 0) -> int: """ Send data on the connection. NOTE: If you get one of the WantRead, WantWrite or WantX509Lookup exceptions on this, you have to call the @@ -2021,7 +2066,7 @@ def send(self, buf, flags=0): write = send - def sendall(self, buf, flags=0): + def sendall(self, buf: bytes, flags: int = 0) -> int: """ Send "all" data on the connection. This calls send() repeatedly until all data is sent. If an error occurs, it's impossible to tell how much @@ -2050,7 +2095,7 @@ def sendall(self, buf, flags=0): return total_sent - def recv(self, bufsiz, flags=None): + def recv(self, bufsiz: int, flags: Optional[int] = None) -> bytes: """ Receive data on the connection. @@ -2069,7 +2114,9 @@ def recv(self, bufsiz, flags=None): read = recv - def recv_into(self, buffer, nbytes=None, flags=None): + def recv_into( + self, buffer, nbytes: Optional[int] = None, flags: Optional[int] = None + ) -> int: """ Receive data on the connection and copy it directly into the provided buffer, rather than creating a new string. @@ -2124,7 +2171,7 @@ def _handle_bio_errors(self, bio, result): # TODO: This is untested. _raise_current_error() - def bio_read(self, bufsiz): + def bio_read(self, bufsiz: int) -> bytes: """ If the Connection was created with a memory BIO, this method can be used to read bytes from the write end of that memory BIO. Many @@ -2148,7 +2195,7 @@ def bio_read(self, bufsiz): return _ffi.buffer(buf, result)[:] - def bio_write(self, buf): + def bio_write(self, buf: bytes) -> int: """ If the Connection was created with a memory BIO, this method can be used to add bytes to the read end of that memory BIO. The Connection @@ -2169,7 +2216,7 @@ def bio_write(self, buf): self._handle_bio_errors(self._into_ssl, result) return result - def renegotiate(self): + def renegotiate(self) -> bool: """ Renegotiate the session. @@ -2181,7 +2228,7 @@ def renegotiate(self): return True return False - def do_handshake(self): + def do_handshake(self) -> None: """ Perform an SSL handshake (usually called after :meth:`renegotiate` or one of :meth:`set_accept_state` or :meth:`set_connect_state`). This can @@ -2192,7 +2239,7 @@ def do_handshake(self): result = _lib.SSL_do_handshake(self._ssl) self._raise_ssl_error(self._ssl, result) - def renegotiate_pending(self): + def renegotiate_pending(self) -> bool: """ Check if there's a renegotiation in progress, it will return False once a renegotiation is finished. @@ -2202,7 +2249,7 @@ def renegotiate_pending(self): """ return _lib.SSL_renegotiate_pending(self._ssl) == 1 - def total_renegotiations(self): + def total_renegotiations(self) -> int: """ Find out the total number of renegotiations. @@ -2223,7 +2270,7 @@ def connect(self, addr): _lib.SSL_set_connect_state(self._ssl) return self._socket.connect(addr) - def connect_ex(self, addr): + def connect_ex(self, addr) -> int: """ Call the :meth:`connect_ex` method of the underlying socket and set up SSL on the socket, using the Context object supplied to this Connection @@ -2237,7 +2284,7 @@ def connect_ex(self, addr): self.set_connect_state() return connect_ex(addr) - def accept(self): + def accept(self) -> Tuple["Connection", Any]: """ Call the :meth:`accept` method of the underlying socket and set up SSL on the returned socket, using the Context object supplied to this @@ -2252,7 +2299,7 @@ def accept(self): conn.set_accept_state() return (conn, addr) - def DTLSv1_listen(self): + def DTLSv1_listen(self) -> None: """ Call the OpenSSL function DTLSv1_listen on this connection. See the OpenSSL manual for more details. @@ -2278,7 +2325,7 @@ def DTLSv1_listen(self): if result < 0: self._raise_ssl_error(self._ssl, result) - def DTLSv1_get_timeout(self): + def DTLSv1_get_timeout(self) -> Optional[int]: """ Determine when the DTLS SSL object next needs to perform internal processing due to the passage of time. @@ -2296,7 +2343,7 @@ def DTLSv1_get_timeout(self): else: return None - def DTLSv1_handle_timeout(self): + def DTLSv1_handle_timeout(self) -> bool: """ Handles any timeout events which have become pending on a DTLS SSL object. @@ -2309,7 +2356,7 @@ def DTLSv1_handle_timeout(self): else: return bool(result) - def bio_shutdown(self): + def bio_shutdown(self) -> None: """ If the Connection was created with a memory BIO, this method can be used to indicate that *end of file* has been reached on the read end of @@ -2322,7 +2369,7 @@ def bio_shutdown(self): _lib.BIO_set_mem_eof_return(self._into_ssl, 0) - def shutdown(self): + def shutdown(self) -> bool: """ Send the shutdown message to the Connection. @@ -2339,7 +2386,7 @@ def shutdown(self): else: return False - def get_cipher_list(self): + def get_cipher_list(self) -> List[str]: """ Retrieve the list of ciphers used by the Connection object. @@ -2353,7 +2400,7 @@ def get_cipher_list(self): ciphers.append(_ffi.string(result).decode("utf-8")) return ciphers - def get_client_ca_list(self): + def get_client_ca_list(self) -> List[X509Name]: """ Get CAs whose certificates are suggested for client authentication. @@ -2382,7 +2429,7 @@ def get_client_ca_list(self): result.append(pyname) return result - def makefile(self, *args, **kwargs): + def makefile(self, *args, **kwargs) -> None: """ The makefile() method is not implemented, since there is no dup semantics for SSL connections @@ -2393,7 +2440,7 @@ def makefile(self, *args, **kwargs): "Cannot make file object of OpenSSL.SSL.Connection" ) - def get_app_data(self): + def get_app_data(self) -> Any: """ Retrieve application data as set by :meth:`set_app_data`. @@ -2401,7 +2448,7 @@ def get_app_data(self): """ return self._app_data - def set_app_data(self, data): + def set_app_data(self, data: Any) -> None: """ Set application data @@ -2410,7 +2457,7 @@ def set_app_data(self, data): """ self._app_data = data - def get_shutdown(self): + def get_shutdown(self) -> int: """ Get the shutdown state of the Connection. @@ -2419,7 +2466,7 @@ def get_shutdown(self): """ return _lib.SSL_get_shutdown(self._ssl) - def set_shutdown(self, state): + def set_shutdown(self, state: int) -> None: """ Set the shutdown state of the Connection. @@ -2431,7 +2478,7 @@ def set_shutdown(self, state): _lib.SSL_set_shutdown(self._ssl, state) - def get_state_string(self): + def get_state_string(self) -> bytes: """ Retrieve a verbose string detailing the state of the Connection. @@ -2440,7 +2487,7 @@ def get_state_string(self): """ return _ffi.string(_lib.SSL_state_string_long(self._ssl)) - def server_random(self): + def server_random(self) -> Optional[bytes]: """ Retrieve the random value used with the server hello message. @@ -2455,7 +2502,7 @@ def server_random(self): _lib.SSL_get_server_random(self._ssl, outp, length) return _ffi.buffer(outp, length)[:] - def client_random(self): + def client_random(self) -> Optional[bytes]: """ Retrieve the random value used with the client hello message. @@ -2471,7 +2518,7 @@ def client_random(self): _lib.SSL_get_client_random(self._ssl, outp, length) return _ffi.buffer(outp, length)[:] - def master_key(self): + def master_key(self) -> Optional[bytes]: """ Retrieve the value of the master key for this session. @@ -2487,7 +2534,9 @@ def master_key(self): _lib.SSL_SESSION_get_master_key(session, outp, length) return _ffi.buffer(outp, length)[:] - def export_keying_material(self, label, olen, context=None): + def export_keying_material( + self, label: bytes, olen: int, context: Optional[bytes] = None + ) -> bytes: """ Obtain keying material for application use. @@ -2526,7 +2575,7 @@ def sock_shutdown(self, *args, **kwargs): """ return self._socket.shutdown(*args, **kwargs) - def get_certificate(self): + def get_certificate(self) -> Optional[X509]: """ Retrieve the local certificate (if any) @@ -2538,7 +2587,7 @@ def get_certificate(self): return X509._from_raw_x509_ptr(cert) return None - def get_peer_certificate(self): + def get_peer_certificate(self) -> Optional[X509]: """ Retrieve the other side's certificate (if any) @@ -2550,7 +2599,7 @@ def get_peer_certificate(self): return None @staticmethod - def _cert_stack_to_list(cert_stack): + def _cert_stack_to_list(cert_stack) -> List[X509]: """ Internal helper to convert a STACK_OF(X509) to a list of X509 instances. @@ -2565,7 +2614,7 @@ def _cert_stack_to_list(cert_stack): result.append(pycert) return result - def get_peer_cert_chain(self): + def get_peer_cert_chain(self) -> Optional[List[X509]]: """ Retrieve the other side's certificate (if any) @@ -2578,7 +2627,7 @@ def get_peer_cert_chain(self): return self._cert_stack_to_list(cert_stack) - def get_verified_chain(self): + def get_verified_chain(self) -> Optional[List[X509]]: """ Retrieve the verified certificate chain of the peer including the peer's end entity certificate. It must be called after a session has @@ -2597,7 +2646,7 @@ def get_verified_chain(self): return self._cert_stack_to_list(cert_stack) - def want_read(self): + def want_read(self) -> bool: """ Checks if more data has to be read from the transport layer to complete an operation. @@ -2606,7 +2655,7 @@ def want_read(self): """ return _lib.SSL_want_read(self._ssl) - def want_write(self): + def want_write(self) -> bool: """ Checks if there is data to write to the transport layer to complete an operation. @@ -2615,7 +2664,7 @@ def want_write(self): """ return _lib.SSL_want_write(self._ssl) - def set_accept_state(self): + def set_accept_state(self) -> None: """ Set the connection to work in server mode. The handshake will be handled automatically by read/write. @@ -2624,7 +2673,7 @@ def set_accept_state(self): """ _lib.SSL_set_accept_state(self._ssl) - def set_connect_state(self): + def set_connect_state(self) -> None: """ Set the connection to work in client mode. The handshake will be handled automatically by read/write. @@ -2633,7 +2682,7 @@ def set_connect_state(self): """ _lib.SSL_set_connect_state(self._ssl) - def get_session(self): + def get_session(self) -> Optional[Session]: """ Returns the Session currently used. @@ -2650,7 +2699,7 @@ def get_session(self): pysession._session = _ffi.gc(session, _lib.SSL_SESSION_free) return pysession - def set_session(self, session): + def set_session(self, session: Session) -> None: """ Set the session to be used when the TLS/SSL connection is established. @@ -2665,7 +2714,7 @@ def set_session(self, session): result = _lib.SSL_set_session(self._ssl, session._session) _openssl_assert(result == 1) - def _get_finished_message(self, function): + def _get_finished_message(self, function) -> Optional[bytes]: """ Helper to implement :meth:`get_finished` and :meth:`get_peer_finished`. @@ -2699,7 +2748,7 @@ def _get_finished_message(self, function): function(self._ssl, buf, size) return _ffi.buffer(buf, size)[:] - def get_finished(self): + def get_finished(self) -> Optional[bytes]: """ Obtain the latest TLS Finished message that we sent. @@ -2711,7 +2760,7 @@ def get_finished(self): """ return self._get_finished_message(_lib.SSL_get_finished) - def get_peer_finished(self): + def get_peer_finished(self) -> Optional[bytes]: """ Obtain the latest TLS Finished message that we received from the peer. @@ -2723,13 +2772,13 @@ def get_peer_finished(self): """ return self._get_finished_message(_lib.SSL_get_peer_finished) - def get_cipher_name(self): + def get_cipher_name(self) -> Optional[str]: """ Obtain the name of the currently used cipher. :returns: The name of the currently used cipher or :obj:`None` if no connection has been established. - :rtype: :class:`unicode` or :class:`NoneType` + :rtype: :class:`str` or :class:`NoneType` .. versionadded:: 0.15 """ @@ -2740,7 +2789,7 @@ def get_cipher_name(self): name = _ffi.string(_lib.SSL_CIPHER_get_name(cipher)) return name.decode("utf-8") - def get_cipher_bits(self): + def get_cipher_bits(self) -> Optional[int]: """ Obtain the number of secret bits of the currently used cipher. @@ -2756,13 +2805,13 @@ def get_cipher_bits(self): else: return _lib.SSL_CIPHER_get_bits(cipher, _ffi.NULL) - def get_cipher_version(self): + def get_cipher_version(self) -> Optional[str]: """ Obtain the protocol version of the currently used cipher. :returns: The protocol name of the currently used cipher or :obj:`None` if no connection has been established. - :rtype: :class:`unicode` or :class:`NoneType` + :rtype: :class:`str` or :class:`NoneType` .. versionadded:: 0.15 """ @@ -2773,19 +2822,19 @@ def get_cipher_version(self): version = _ffi.string(_lib.SSL_CIPHER_get_version(cipher)) return version.decode("utf-8") - def get_protocol_version_name(self): + def get_protocol_version_name(self) -> str: """ Retrieve the protocol version of the current connection. :returns: The TLS version of the current connection, for example the value for TLS 1.2 would be ``TLSv1.2``or ``Unknown`` for connections that were not successfully established. - :rtype: :class:`unicode` + :rtype: :class:`str` """ version = _ffi.string(_lib.SSL_get_version(self._ssl)) return version.decode("utf-8") - def get_protocol_version(self): + def get_protocol_version(self) -> int: """ Retrieve the SSL or TLS protocol version of the current connection. @@ -2797,7 +2846,7 @@ def get_protocol_version(self): return version @_requires_alpn - def set_alpn_protos(self, protos): + def set_alpn_protos(self, protos: List[bytes]) -> None: """ Specify the client's ALPN protocol list. @@ -2832,7 +2881,7 @@ def set_alpn_protos(self, protos): ) @_requires_alpn - def get_alpn_proto_negotiated(self): + def get_alpn_proto_negotiated(self) -> bytes: """ Get the protocol that was negotiated by ALPN. @@ -2849,7 +2898,7 @@ def get_alpn_proto_negotiated(self): return _ffi.buffer(data[0], data_len[0])[:] - def get_selected_srtp_profile(self): + def get_selected_srtp_profile(self) -> bytes: """ Get the SRTP protocol which was negotiated. @@ -2862,7 +2911,7 @@ def get_selected_srtp_profile(self): return _ffi.string(profile.name) - def request_ocsp(self): + def request_ocsp(self) -> None: """ Called to request that the server sends stapled OCSP data, if available. If this is not called on the client side then the server diff --git a/src/OpenSSL/py.typed b/src/OpenSSL/py.typed new file mode 100644 index 000000000..e69de29bb