Merge "Add support for virtual hosts"

This commit is contained in:
Jenkins 2017-09-07 03:27:50 +00:00 committed by Gerrit Code Review
commit ff905655e9
6 changed files with 242 additions and 57 deletions

View File

@ -104,40 +104,49 @@ class Addresser(object):
"""Address used for shared subscribers (competing consumers)
"""
def _concat(self, sep, items):
return sep.join(filter(bool, items))
class LegacyAddresser(Addresser):
"""Legacy addresses are in the following format:
multicast: '$broadcast_prefix.$exchange.$topic.all'
unicast: '$server_prefix.$exchange.$topic.$server'
anycast: '$group_prefix.$exchange.$topic'
multicast: '$broadcast_prefix[.$vhost].$exchange.$topic.all'
unicast: '$server_prefix[.$vhost].$exchange.$topic.$server'
anycast: '$group_prefix[.$vhost].$exchange.$topic'
Legacy addresses do not distinguish RPC traffic from Notification traffic
"""
def __init__(self, default_exchange, server_prefix, broadcast_prefix,
group_prefix):
group_prefix, vhost):
super(LegacyAddresser, self).__init__(default_exchange)
self._server_prefix = server_prefix
self._broadcast_prefix = broadcast_prefix
self._group_prefix = group_prefix
self._vhost = vhost
def multicast_address(self, target, service):
return self._concatenate([self._broadcast_prefix,
target.exchange or self._default_exchange,
target.topic, "all"])
return self._concat(".",
[self._broadcast_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic,
"all"])
def unicast_address(self, target, service=SERVICE_RPC):
return self._concatenate([self._server_prefix,
target.exchange or self._default_exchange,
target.topic, target.server])
return self._concat(".",
[self._server_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic,
target.server])
def anycast_address(self, target, service=SERVICE_RPC):
return self._concatenate([self._group_prefix,
target.exchange or self._default_exchange,
target.topic])
def _concatenate(self, items):
return ".".join(filter(bool, items))
return self._concat(".",
[self._group_prefix,
self._vhost,
target.exchange or self._default_exchange,
target.topic])
# for debug:
def _is_multicast(self, address):
@ -162,21 +171,25 @@ class RoutableAddresser(Addresser):
'anycast'. The delivery semantics are followed by information pulled from
the Target. The template is:
$prefix/$semantics/$exchange/$topic[/$server]
$prefix/$semantics[/$vhost]/$exchange/$topic[/$server]
Examples based on the default prefix and semantic values:
rpc-unicast: "openstack.org/om/rpc/unicast/$exchange/$topic/$server"
notify-anycast: "openstack.org/om/notify/anycast/$exchange/$topic"
rpc-unicast: "openstack.org/om/rpc/unicast/my-exchange/my-topic/my-server"
notify-anycast: "openstack.org/om/notify/anycast/my-vhost/exchange/topic"
"""
def __init__(self, default_exchange, rpc_exchange, rpc_prefix,
notify_exchange, notify_prefix, unicast_tag, multicast_tag,
anycast_tag):
anycast_tag, vhost):
super(RoutableAddresser, self).__init__(default_exchange)
if not self._default_exchange:
self._default_exchange = "openstack"
# templates for address generation:
self._vhost = vhost
_rpc = rpc_prefix + "/"
self._rpc_prefix = _rpc
self._rpc_unicast = _rpc + unicast_tag
@ -201,32 +214,34 @@ class RoutableAddresser(Addresser):
prefix = self._rpc_multicast
else:
prefix = self._notify_multicast
return "%s/%s/%s" % (prefix,
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic)
target.topic])
def unicast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_unicast
else:
prefix = self._notify_unicast
if target.server:
return "%s/%s/%s/%s" % (prefix,
target.exchange or self._exchange[service],
target.topic,
target.server)
return "%s/%s/%s" % (prefix,
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic)
target.topic,
target.server])
def anycast_address(self, target, service=SERVICE_RPC):
if service == SERVICE_RPC:
prefix = self._rpc_anycast
else:
prefix = self._notify_anycast
return "%s/%s/%s" % (prefix,
return self._concat("/",
[prefix,
self._vhost,
target.exchange or self._exchange[service],
target.topic)
target.topic])
# for debug:
def _is_multicast(self, address):
@ -255,7 +270,7 @@ class AddresserFactory(object):
self._mode = mode
self._kwargs = kwargs
def __call__(self, remote_properties):
def __call__(self, remote_properties, vhost=None):
# for backwards compatibility use legacy if dynamic and we're connected
# to qpidd or we cannot identify the message bus. This can be
# overridden via the configuration.
@ -275,7 +290,8 @@ class AddresserFactory(object):
return LegacyAddresser(self._default_exchange,
self._kwargs['legacy_server_prefix'],
self._kwargs['legacy_broadcast_prefix'],
self._kwargs['legacy_group_prefix'])
self._kwargs['legacy_group_prefix'],
vhost)
else:
return RoutableAddresser(self._default_exchange,
self._kwargs.get("rpc_exchange"),
@ -284,4 +300,5 @@ class AddresserFactory(object):
self._kwargs["notify_prefix"],
self._kwargs["unicast"],
self._kwargs["multicast"],
self._kwargs["anycast"])
self._kwargs["anycast"],
vhost)

View File

@ -768,11 +768,12 @@ class Hosts(object):
configuration and are used only if no username/password/realm is present in
the URL.
"""
def __init__(self, entries=None, default_username=None,
def __init__(self, url, default_username=None,
default_password=None,
default_realm=None):
if entries:
self._entries = entries[:]
self.virtual_host = url.virtual_host
if url.hosts:
self._entries = url.hosts[:]
else:
self._entries = [transport.TransportHost(hostname="localhost",
port=5672)]
@ -797,7 +798,8 @@ class Hosts(object):
return '<Hosts ' + str(self) + '>'
def __str__(self):
return ", ".join(["%r" % th for th in self._entries])
r = ', vhost=%s' % self.virtual_host if self.virtual_host else ''
return ", ".join(["%r" % th for th in self._entries]) + r
class Controller(pyngus.ConnectionEventHandler):
@ -807,7 +809,7 @@ class Controller(pyngus.ConnectionEventHandler):
work is done on the Eventloop thread, allowing the driver to run
asynchronously from the messaging clients.
"""
def __init__(self, hosts, default_exchange, config):
def __init__(self, url, default_exchange, config):
self.processor = None
self._socket_connection = None
self._node = platform.node() or "<UNKNOWN>"
@ -839,10 +841,12 @@ class Controller(pyngus.ConnectionEventHandler):
self.ssl_key_password = config.oslo_messaging_amqp.ssl_key_password
self.ssl_allow_insecure = \
config.oslo_messaging_amqp.allow_insecure_clients
self.ssl_verify_vhost = config.oslo_messaging_amqp.ssl_verify_vhost
self.pseudo_vhost = config.oslo_messaging_amqp.pseudo_vhost
self.sasl_mechanisms = config.oslo_messaging_amqp.sasl_mechanisms
self.sasl_config_dir = config.oslo_messaging_amqp.sasl_config_dir
self.sasl_config_name = config.oslo_messaging_amqp.sasl_config_name
self.hosts = Hosts(hosts, config.oslo_messaging_amqp.username,
self.hosts = Hosts(url, config.oslo_messaging_amqp.username,
config.oslo_messaging_amqp.password,
config.oslo_messaging_amqp.sasl_default_realm)
self.conn_retry_interval = \
@ -974,20 +978,44 @@ class Controller(pyngus.ConnectionEventHandler):
host = self.hosts.current
conn_props = {'properties': {u'process': self._command,
u'pid': self._pid,
u'node': self._node},
'hostname': host.hostname}
u'node': self._node}}
# only set hostname in the AMQP 1.0 Open performative if the message
# bus can interpret it as the virtual host. We leave it unspecified
# since apparently noone can agree on how it should be used otherwise!
if self.hosts.virtual_host and not self.pseudo_vhost:
conn_props['hostname'] = self.hosts.virtual_host
if self.idle_timeout:
conn_props["idle-time-out"] = float(self.idle_timeout)
if self.trace_protocol:
conn_props["x-trace-protocol"] = self.trace_protocol
# SSL configuration
ssl_enabled = False
if self.ssl:
ssl_enabled = True
conn_props["x-ssl"] = self.ssl
if self.ssl_ca_file:
conn_props["x-ssl-ca-file"] = self.ssl_ca_file
ssl_enabled = True
if self.ssl_cert_file:
ssl_enabled = True
conn_props["x-ssl-identity"] = (self.ssl_cert_file,
self.ssl_key_file,
self.ssl_key_password)
if ssl_enabled:
# Set the identity of the remote server for SSL to use when
# verifying the received certificate. Typically this is the DNS
# name used to set up the TCP connections. However some servers
# may provide a certificate for the virtual host instead. If that
# is the case we need to use the virtual hostname instead.
# Refer to SSL Server Name Indication (SNI) for the entire story:
# https://tools.ietf.org/html/rfc6066
if self.ssl_verify_vhost:
if self.hosts.virtual_host:
conn_props['x-ssl-peer-name'] = self.hosts.virtual_host
else:
conn_props['x-ssl-peer-name'] = host.hostname
# SASL configuration:
if self.sasl_mechanisms:
conn_props["x-sasl-mechs"] = self.sasl_mechanisms
@ -1052,9 +1080,12 @@ class Controller(pyngus.ConnectionEventHandler):
point, we are ready to receive messages, so start all pending RPC
requests.
"""
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"),
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s%(vhost)s)"),
{'hostname': self.hosts.current.hostname,
'port': self.hosts.current.port})
'port': self.hosts.current.port,
'vhost': ("/" + self.hosts.virtual_host
if self.hosts.virtual_host else "")})
for sender in itervalues(self._all_senders):
sender.attach(self._socket_connection.pyngus_conn,
self.reply_link, self.addresser)
@ -1099,7 +1130,9 @@ class Controller(pyngus.ConnectionEventHandler):
# allocate an addresser based on the advertised properties of the
# message bus
props = connection.remote_properties or {}
self.addresser = self.addresser_factory(props)
self.addresser = self.addresser_factory(props,
self.hosts.virtual_host
if self.pseudo_vhost else None)
for servers in itervalues(self._servers):
for server in itervalues(servers):
server.attach(self._socket_connection.pyngus_conn,

View File

@ -346,7 +346,15 @@ class Thread(threading.Thread):
"""Run the proton event/timer loop."""
LOG.debug("Starting Proton thread, container=%s",
self._container.name)
try:
self._main_loop()
except Exception:
# unknown error - fatal
LOG.exception("Fatal unhandled event loop error!")
raise
def _main_loop(self):
# Main event loop
while not self._shutdown:
readfds = [self._requests]

View File

@ -60,6 +60,17 @@ amqp1_opts = [
secret=True,
help='Password for decrypting ssl_key_file (if encrypted)'),
cfg.BoolOpt('ssl_verify_vhost',
default=False,
help="By default SSL checks that the name in the server's"
" certificate matches the hostname in the transport_url. In"
" some configurations it may be preferable to use the virtual"
" hostname instead, for example if the server uses the Server"
" Name Indication TLS extension (rfc6066) to provide a"
" certificate per virtual host. Set ssl_verify_vhost to True"
" if the server's SSL certificate uses the virtual host name"
" instead of the DNS name."),
cfg.BoolOpt('allow_insecure_clients',
default=False,
deprecated_group='amqp1',
@ -172,6 +183,17 @@ amqp1_opts = [
"'dynamic' - use legacy addresses if the message bus does not"
" support routing otherwise use routable addressing"),
cfg.BoolOpt('pseudo_vhost',
default=True,
help="Enable virtual host support for those message buses"
" that do not natively support virtual hosting (such as"
" qpidd). When set to true the virtual host name will be"
" added to all message bus addresses, effectively creating"
" a private 'subnet' per virtual host. Set to False if the"
" message bus supports virtual hosting using the 'hostname'"
" field in the AMQP 1.0 Open performative as the name of the"
" virtual host."),
# Legacy addressing customization:
cfg.StrOpt('server_request_prefix',

View File

@ -202,7 +202,6 @@ class ProtonDriver(base.BaseDriver):
conf.register_opts(opts.amqp1_opts, group=opt_group)
conf = common.ConfigOptsProxy(conf, url, opt_group.name)
self._hosts = url.hosts
self._conf = conf
self._default_exchange = default_exchange
@ -257,7 +256,7 @@ class ProtonDriver(base.BaseDriver):
self._ctrl = None
# Create a Controller that connects to the messaging
# service:
self._ctrl = controller.Controller(self._hosts,
self._ctrl = controller.Controller(self._url,
self._default_exchange,
self._conf)
self._ctrl.connect()

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import logging
import os
import select
@ -1515,19 +1516,12 @@ class TestSSL(test_utils.BaseTestCase):
self._tmpdir = None
self.skipTest("OpenSSL tools not installed - skipping")
def test_server_ok(self):
# test client authenticates server
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = oslo_messaging.TransportURL.parse(self.conf, "amqp://%s:%d" %
(self._broker.host,
self._broker.port))
def _ssl_server_ok(self, url):
self._broker.start()
self.config(ssl_ca_file=self._ssl_config['ca_cert'],
group='oslo_messaging_amqp')
driver = amqp_driver.ProtonDriver(self.conf, url)
tport_url = oslo_messaging.TransportURL.parse(self.conf, url)
driver = amqp_driver.ProtonDriver(self.conf, tport_url)
target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread(
driver.listen(target, None, None)._poll_style_listener, 1)
@ -1541,6 +1535,34 @@ class TestSSL(test_utils.BaseTestCase):
self.assertFalse(listener.isAlive())
driver.cleanup()
def test_server_ok(self):
# test client authenticates server
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = "amqp://%s:%d" % (self._broker.host, self._broker.port)
self._ssl_server_ok(url)
def test_server_ignore_vhost_ok(self):
# test client authenticates server and ignores vhost
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = "amqp://%s:%d/my-vhost" % (self._broker.host, self._broker.port)
self._ssl_server_ok(url)
def test_server_check_vhost_ok(self):
# test client authenticates server using vhost as CN
# Use 'Invalid' from bad_cert CN
self.config(ssl_verify_vhost=True, group='oslo_messaging_amqp')
self._ssl_config['s_cert'] = self._ssl_config['bad_cert']
self._ssl_config['s_key'] = self._ssl_config['bad_key']
self._broker = FakeBroker(self.conf.oslo_messaging_amqp,
sock_addr=self._ssl_config['s_name'],
ssl_config=self._ssl_config)
url = "amqp://%s:%d/Invalid" % (self._broker.host, self._broker.port)
self._ssl_server_ok(url)
@mock.patch('ssl.get_default_verify_paths')
def test_server_ok_with_ssl_set_in_transport_url(self, mock_verify_paths):
# test client authenticates server
@ -1630,6 +1652,90 @@ class TestSSL(test_utils.BaseTestCase):
super(TestSSL, self).tearDown()
@testtools.skipUnless(pyngus, "proton modules not present")
class TestVHost(_AmqpBrokerTestCaseAuto):
"""Verify the pseudo virtual host behavior"""
def _vhost_test(self):
"""Verify that all messaging for a particular vhost stays on that vhost
"""
self.config(pseudo_vhost=True,
group="oslo_messaging_amqp")
vhosts = ["None", "HOSTA", "HOSTB", "HOSTC"]
target = oslo_messaging.Target(topic="test-topic")
fanout = oslo_messaging.Target(topic="test-topic", fanout=True)
listeners = {}
ldrivers = {}
sdrivers = {}
replies = {}
msgs = {}
for vhost in vhosts:
url = copy.copy(self._broker_url)
url.virtual_host = vhost if vhost != "None" else None
ldriver = amqp_driver.ProtonDriver(self.conf, url)
listeners[vhost] = _ListenerThread(
ldriver.listen(target, None, None)._poll_style_listener,
10)
ldrivers[vhost] = ldriver
sdrivers[vhost] = amqp_driver.ProtonDriver(self.conf, url)
replies[vhost] = []
msgs[vhost] = []
# send a fanout and a single rpc call to each listener
for vhost in vhosts:
if vhost == "HOSTC": # expect no messages to HOSTC
continue
sdrivers[vhost].send(fanout,
{"context": vhost},
{"vhost": vhost,
"fanout": True,
"id": vhost})
replies[vhost].append(sdrivers[vhost].send(target,
{"context": vhost},
{"method": "echo",
"id": vhost},
wait_for_reply=True))
time.sleep(1)
for vhost in vhosts:
msgs[vhost] += listeners[vhost].get_messages()
if vhost == "HOSTC":
# HOSTC should get nothing
self.assertEqual(0, len(msgs[vhost]))
self.assertEqual(0, len(replies[vhost]))
continue
self.assertEqual(2, len(msgs[vhost]))
for m in msgs[vhost]:
# the id must match the vhost
self.assertEqual(vhost, m.message.get("id"))
self.assertEqual(1, len(replies[vhost]))
for m in replies[vhost]:
# same for correlation id
self.assertEqual(vhost, m.get("correlation-id"))
for vhost in vhosts:
listeners[vhost].kill()
ldrivers[vhost].cleanup
sdrivers[vhost].cleanup()
def test_vhost_routing(self):
"""Test vhost using routable addresses
"""
self.config(addressing_mode='routable', group="oslo_messaging_amqp")
self._vhost_test()
def test_vhost_legacy(self):
"""Test vhost using legacy addresses
"""
self.config(addressing_mode='legacy', group="oslo_messaging_amqp")
self._vhost_test()
class FakeBroker(threading.Thread):
"""A test AMQP message 'broker'."""