Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow accessing a connection's verfied certificate chain #894

Merged
merged 9 commits into from
Aug 5, 2020
61 changes: 54 additions & 7 deletions src/OpenSSL/SSL.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
X509Name,
X509,
X509Store,
X509StoreContext,
)

__all__ = [
Expand Down Expand Up @@ -2126,6 +2127,22 @@ def get_peer_certificate(self):
return X509._from_raw_x509_ptr(cert)
return None

@staticmethod
def _cert_stack_to_list(cert_stack):
"""
Internal helper to convert a STACK_OF(X509) to a list of X509
instances.
"""
result = []
for i in range(_lib.sk_X509_num(cert_stack)):
cert = _lib.sk_X509_value(cert_stack, i)
_openssl_assert(cert != _ffi.NULL)
res = _lib.X509_up_ref(cert)
_openssl_assert(res >= 1)
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)
return result

def get_peer_cert_chain(self):
"""
Retrieve the other side's certificate (if any)
Expand All @@ -2137,13 +2154,43 @@ def get_peer_cert_chain(self):
if cert_stack == _ffi.NULL:
return None

result = []
for i in range(_lib.sk_X509_num(cert_stack)):
# TODO could incref instead of dup here
cert = _lib.X509_dup(_lib.sk_X509_value(cert_stack, i))
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)
return result
return self._cert_stack_to_list(cert_stack)

def get_verified_chain(self):
"""
Retrieve the verified certificate chain of the peer including the
peer's end entity certificate. It must be called after a session has
been successfully established. If peer verification was not successful
the chain may be incomplete, invalid, or None.

:return: A list of X509 instances giving the peer's verified
certificate chain, or None if it does not have one.

.. versionadded:: 20.0
"""
if hasattr(_lib, "SSL_get0_verified_chain"):
# OpenSSL 1.1+
cert_stack = _lib.SSL_get0_verified_chain(self._ssl)
if cert_stack == _ffi.NULL:
return None

return self._cert_stack_to_list(cert_stack)

pycert = self.get_peer_certificate()
if pycert is None:
return None

# Should never be NULL because the peer presented a certificate.
cert_stack = _lib.SSL_get_peer_cert_chain(self._ssl)
_openssl_assert(cert_stack != _ffi.NULL)

pystore = self._context.get_cert_store()
if pystore is None:
return None

pystorectx = X509StoreContext(pystore, pycert)
pystorectx._add_chain(cert_stack)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just set _chain directly and remove the setter since it doesn't hold any logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return pystorectx.get_verified_chain()

def want_read(self):
"""
Expand Down
48 changes: 47 additions & 1 deletion src/OpenSSL/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -1712,11 +1712,18 @@ def __init__(self, store, certificate):
self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
self._store = store
self._cert = certificate
self._chain = _ffi.NULL
# Make the store context available for use after instantiating this
# class by initializing it now. Per testing, subsequent calls to
# :meth:`_init` have no adverse affect.
self._init()

def _add_chain(self, chain):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed once the other comment is addressed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

"""
Internal helper to set the untrusted certification chain (peer chain).
"""
self._chain = chain

def _init(self):
"""
Set up the store context for a subsequent verification operation.
Expand All @@ -1725,7 +1732,7 @@ def _init(self):
:meth:`_cleanup` will leak memory.
"""
ret = _lib.X509_STORE_CTX_init(
self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL
self._store_ctx, self._store._store, self._cert._x509, self._chain
)
if ret <= 0:
_raise_current_error()
Expand Down Expand Up @@ -1797,6 +1804,45 @@ def verify_certificate(self):
if ret <= 0:
raise self._exception_from_context()

def get_verified_chain(self):
"""
Verify a certificate in a context and return the complete validated
chain.

:raises X509StoreContextError: If an error occurred when validating a
certificate in the context. Sets ``certificate`` attribute to
indicate which certificate caused the error.

.. versionadded:: 20.0
"""
# Always re-initialize the store context in case
# :meth:`verify_certificate` is called multiple times.
#
# :meth:`_init` is called in :meth:`__init__` so _cleanup is called
# before _init to ensure memory is not leaked.
self._cleanup()
self._init()
ret = _lib.X509_verify_cert(self._store_ctx)
if ret <= 0:
self._cleanup()
raise self._exception_from_context()

# Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain.
cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx)
_openssl_assert(cert_stack != _ffi.NULL)

result = []
for i in range(_lib.sk_X509_num(cert_stack)):
cert = _lib.sk_X509_value(cert_stack, i)
_openssl_assert(cert != _ffi.NULL)
pycert = X509._from_raw_x509_ptr(cert)
result.append(pycert)

# Free the stack but not the members which are freed by the X509 class.
_lib.sk_X509_free(cert_stack)
self._cleanup()
return result


def load_certificate(type, buffer):
"""
Expand Down
35 changes: 35 additions & 0 deletions tests/test_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -3849,6 +3849,41 @@ def test_verify_with_time(self):

assert exc.value.args[0][2] == "certificate has expired"

def test_get_verified_chain(self):
"""
`get_verified_chain` returns the verified chain.
"""
store = X509Store()
store.add_cert(self.root_cert)
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)
chain = store_ctx.get_verified_chain()
assert len(chain) == 3
intermediate_subject = self.intermediate_server_cert.get_subject()
assert chain[0].get_subject() == intermediate_subject
assert chain[1].get_subject() == self.intermediate_cert.get_subject()
assert chain[2].get_subject() == self.root_cert.get_subject()
# Test reuse
chain = store_ctx.get_verified_chain()
assert len(chain) == 3
assert chain[0].get_subject() == intermediate_subject
assert chain[1].get_subject() == self.intermediate_cert.get_subject()
assert chain[2].get_subject() == self.root_cert.get_subject()

def test_get_verified_chain_invalid_chain_no_root(self):
"""
`get_verified_chain` raises error when cert verification fails.
"""
store = X509Store()
store.add_cert(self.intermediate_cert)
store_ctx = X509StoreContext(store, self.intermediate_server_cert)

with pytest.raises(X509StoreContextError) as exc:
store_ctx.get_verified_chain()

assert exc.value.args[0][2] == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"


class TestSignVerify(object):
"""
Expand Down
57 changes: 57 additions & 0 deletions tests/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2445,6 +2445,63 @@ def test_get_peer_cert_chain_none(self):
interact_in_memory(client, server)
assert None is server.get_peer_cert_chain()

def test_get_verified_chain(self):
"""
`Connection.get_verified_chain` returns a list of certificates
which the connected server returned for the certification verification.
"""
chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain

serverContext = Context(SSLv23_METHOD)
serverContext.use_privatekey(skey)
serverContext.use_certificate(scert)
serverContext.add_extra_chain_cert(icert)
serverContext.add_extra_chain_cert(cacert)
server = Connection(serverContext, None)
server.set_accept_state()

# Create the client
clientContext = Context(SSLv23_METHOD)
# cacert is self-signed so the client must trust it for verification
# to succeed.
clientContext.get_cert_store().add_cert(cacert)
clientContext.set_verify(VERIFY_PEER, verify_cb)
client = Connection(clientContext, None)
client.set_connect_state()

interact_in_memory(client, server)

chain = client.get_verified_chain()
assert len(chain) == 3
assert "Server Certificate" == chain[0].get_subject().CN
assert "Intermediate Certificate" == chain[1].get_subject().CN
assert "Authority Certificate" == chain[2].get_subject().CN

def test_get_verified_chain_none(self):
"""
`Connection.get_verified_chain` returns `None` if the peer sends
no certificate chain.
"""
ctx = Context(SSLv23_METHOD)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem))
server = Connection(ctx, None)
server.set_accept_state()
client = Connection(Context(SSLv23_METHOD), None)
client.set_connect_state()
interact_in_memory(client, server)
assert None is server.get_verified_chain()

def test_get_verified_chain_unconnected(self):
"""
`Connection.get_verified_chain` returns `None` when used with an object
which has not been connected.
"""
ctx = Context(SSLv23_METHOD)
server = Connection(ctx, None)
assert None is server.get_verified_chain()

def test_get_session_unconnected(self):
"""
`Connection.get_session` returns `None` when used with an object
Expand Down