By default use the most secure verification mode

This depends on the settings for 'x-ssl-peer-name' and the
availability of a proper CA file.
This commit is contained in:
Kenneth Giusti 2017-02-24 12:49:06 -05:00
parent 1d6965c798
commit 5949700ec5
5 changed files with 116 additions and 33 deletions

View File

@ -7,6 +7,16 @@ callback-based API for message passing.
See the User Guide in the docs directory for more detail.
## Release 2.2.0 ##
* Can now use the system's default CA by specifying the 'x-ssl' option
in the 'properties' field of the create_connection call and _NOT_
specifying the 'x-ssl-ca-file' property. (contributed by
Juan Antonio Osorio Robles)
* use the most secure default setting for x-ssl-verify-mode based on
the configuration
* bump max proton version to 0.17
## Release 2.1.4 ##
* avoid using deprecated next_tick in the container

View File

@ -23,4 +23,4 @@ from pyngus.link import SenderLink, SenderEventHandler
from pyngus.sockets import read_socket_input
from pyngus.sockets import write_socket_output
VERSION = (2, 1, 4) # major, minor, fix
VERSION = (2, 2, 0) # major, minor, fix

View File

@ -109,6 +109,16 @@ class Connection(Endpoint):
'x-sasl-mechs', 'x-sasl-config-dir',
'x-sasl-config-name', 'x-force-sasl'])
# set of all SSL connection configuration properties
_SSL_PROPS = set(['x-ssl', 'x-ssl-identity', 'x-ssl-ca-file',
'x-ssl-verify-mode', 'x-ssl-server',
'x-ssl-peer-name', 'x-ssl-allow-cleartext'])
# SSL peer certificate verification
_VERIFY_MODES = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
'verify-cert': proton.SSLDomain.VERIFY_PEER,
'no-verify': proton.SSLDomain.ANONYMOUS_PEER}
def _not_reentrant(func):
"""Decorator that prevents callbacks from calling into methods that are
not reentrant
@ -185,7 +195,9 @@ class Connection(Endpoint):
x-ssl-ca-file: string, path to a file containing the certificates of
the trusted Certificate Authorities that will be used to check the
signature of the peer's certificate.
signature of the peer's certificate. Not used if x-ssl-verify-mode
is set to 'no-verify'. To use the system's default CAs instead leave
this option out and set x-ssl to True.
x-ssl-verify-mode: string, configure the level of security provided by
SSL. Possible values:
@ -723,11 +735,9 @@ class Connection(Endpoint):
self._error = error
def _configure_ssl(self, properties):
if not properties:
if (not properties or
not self._SSL_PROPS.intersection(set(iter(properties)))):
return None
verify_modes = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
'verify-cert': proton.SSLDomain.VERIFY_PEER,
'no-verify': proton.SSLDomain.ANONYMOUS_PEER}
mode = proton.SSLDomain.MODE_CLIENT
if properties.get('x-ssl-server', properties.get('x-server')):
@ -735,14 +745,32 @@ class Connection(Endpoint):
identity = properties.get('x-ssl-identity')
ca_file = properties.get('x-ssl-ca-file')
if properties.get('x-ssl') and not ca_file:
ca_file = ssl.get_default_verify_paths().cafile
hostname = properties.get('x-ssl-peer-name',
properties.get('hostname'))
# default to most secure level of certificate validation
if not ca_file:
vdefault = 'no-verify'
elif not hostname:
vdefault = 'verify-cert'
else:
vdefault = 'verify-peer'
if not identity and not ca_file:
return None # SSL not configured
vmode = properties.get('x-ssl-verify-mode', vdefault)
try:
vmode = self._VERIFY_MODES[vmode]
except KeyError:
raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
vmode)
if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
if not hostname or not ca_file:
raise proton.SSLException("verify-peer needs x-ssl-peer-name"
" and x-ssl-ca-file")
elif vmode == proton.SSLDomain.VERIFY_PEER:
if not ca_file:
raise proton.SSLException("verify-cert needs x-ssl-ca-file")
hostname = None
# This will throw proton.SSLUnavailable if SSL support is not installed
domain = proton.SSLDomain(mode)
if identity:
@ -751,17 +779,7 @@ class Connection(Endpoint):
if ca_file:
# how we verify peers:
domain.set_trusted_ca_db(ca_file)
hostname = properties.get('x-ssl-peer-name',
properties.get('hostname'))
vdefault = 'verify-peer' if hostname else 'verify-cert'
vmode = verify_modes.get(properties.get('x-ssl-verify-mode',
vdefault))
# check for configuration error
if not vmode:
raise proton.SSLException("bad value for x-ssl-verify-mode")
if vmode == proton.SSLDomain.VERIFY_PEER_NAME and not hostname:
raise proton.SSLException("verify-peer needs x-ssl-peer-name")
domain.set_peer_authentication(vmode, ca_file)
domain.set_peer_authentication(vmode, ca_file)
if mode == proton.SSLDomain.MODE_SERVER:
if properties.get('x-ssl-allow-cleartext'):
domain.allow_unsecured_client()

View File

@ -17,9 +17,10 @@
# specific language governing permissions and limitations
# under the License.
import os
from setuptools import setup
_VERSION = "2.1.4" # NOTE: update __init__.py too!
_VERSION = "2.2.0" # NOTE: update __init__.py too!
# I hack, therefore I am (productive) Some distros (which will not be named)
# don't use setup.py to install the proton python module. In this case, pip
@ -33,11 +34,11 @@ try:
except ImportError:
# this version of proton will download and install the proton shared
# library as well:
_dependencies = ['python-qpid-proton>=0.9,<0.17']
_dependencies = ['python-qpid-proton>=0.9,<0.18']
setup(name="pyngus",
version=_VERSION,
version=_VERSION + os.environ.get('PYNGUS_VERSION_SUFFIX', ''),
author="kgiusti",
author_email="kgiusti@apache.org",
packages=["pyngus"],
@ -49,4 +50,10 @@ setup(name="pyngus",
classifiers=["License :: OSI Approved :: Apache Software License",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python"])
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5"])

View File

@ -348,6 +348,16 @@ class APITest(common.Test):
assert args.properties.get("distribution-mode") == "move"
assert args.properties.get("target-address") == "my-target"
def _ssl_connect(self, s_props, c_props):
server = self.container1.create_connection("server",
properties=s_props)
client = self.container2.create_connection("client",
properties=c_props)
server.open()
client.open()
common.process_connections(server, client)
assert server.active and client.active
def _test_ssl(self,
server_password="server-password",
server_dns="some.server.com",
@ -370,8 +380,6 @@ class APITest(common.Test):
s_props['x-ssl-verify-mode'] = 'verify-peer'
s_props['x-ssl-peer-name'] = client_dns
server = self.container1.create_connection("server",
properties=s_props)
c_props = {}
if use_system_ca_bundle:
c_props.update({"x-ssl": True})
@ -388,12 +396,7 @@ class APITest(common.Test):
c_props['x-ssl-identity'] = (_testpath("client-certificate.pem"),
_testpath("client-private-key.pem"),
client_password)
client = self.container2.create_connection("client",
properties=c_props)
server.open()
client.open()
common.process_connections(server, client)
assert server.active and client.active
self._ssl_connect(s_props, c_props)
def test_ssl_ok(self):
try:
@ -465,6 +468,51 @@ class APITest(common.Test):
# connection setup should fail
pass
def test_ssl_bad_verify_mode(self):
try:
self._ssl_connect({'x-ssl-server': True,
'x-ssl-verify-mode': 'snagglepus'},
{'x-ssl-verify-mode': 'no-verify'})
assert False, "error expected!"
except SSLUnavailable:
raise common.Skipped("SSL not available.")
except SSLException:
# expected to fail
pass
def test_ssl_bad_no_cert(self):
# error: require verification, but no CA given
try:
self._ssl_connect({'x-ssl-server': True},
{'x-ssl-verify-mode': 'verify-cert'})
assert False, "error expected!"
except SSLUnavailable:
raise common.Skipped("SSL not available.")
except SSLException:
# expected to fail
pass
def test_ssl_bad_no_peer_name(self):
# error: require peer verification, CA supplied but no name given
try:
self._ssl_connect({'x-ssl-server': True},
{'x-ssl-verify-mode': 'verify-peer',
'x-ssl-ca-file': 'fakefile'})
assert False, "error expected!"
except SSLUnavailable:
raise common.Skipped("SSL not available.")
except SSLException:
# expected to fail
pass
def test_ssl_minimal(self):
# uses anonymous SSL
try:
self._ssl_connect({'x-ssl-server': True},
{'x-ssl-verify-mode': 'no-verify'})
except SSLUnavailable:
raise common.Skipped("SSL not available.")
def test_io_input_close(self):
"""Premature input close should trigger failed callback."""
cb1 = common.ConnCallback()