Merge "Reduce `magic` conf attribute usage"

This commit is contained in:
Jenkins 2015-06-08 16:40:22 +00:00 committed by Gerrit Code Review
commit 02ab25ecc6
1 changed files with 79 additions and 56 deletions

View File

@ -156,7 +156,7 @@ rabbit_opts = [
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def _get_queue_arguments(conf): def _get_queue_arguments(rabbit_ha_queues):
"""Construct the arguments for declaring a queue. """Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we declare a mirrored queue If the rabbit_ha_queues option is set, we declare a mirrored queue
@ -167,7 +167,7 @@ def _get_queue_arguments(conf):
Setting x-ha-policy to all means that the queue will be mirrored Setting x-ha-policy to all means that the queue will be mirrored
to all nodes in the cluster. to all nodes in the cluster.
""" """
return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} return {'x-ha-policy': 'all'} if rabbit_ha_queues else {}
class RabbitMessage(dict): class RabbitMessage(dict):
@ -186,8 +186,8 @@ class RabbitMessage(dict):
class Consumer(object): class Consumer(object):
"""Consumer class.""" """Consumer class."""
def __init__(self, conf, exchange_name, queue_name, routing_key, type, def __init__(self, exchange_name, queue_name, routing_key, type, durable,
durable, auto_delete, callback, nowait=True): auto_delete, callback, nowait=True, rabbit_ha_queues=None):
"""Init the Publisher class with the exchange_name, routing_key, """Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete type, durable auto_delete
""" """
@ -199,7 +199,7 @@ class Consumer(object):
self.callback = callback self.callback = callback
self.type = type self.type = type
self.nowait = nowait self.nowait = nowait
self.queue_arguments = _get_queue_arguments(conf) self.queue_arguments = _get_queue_arguments(rabbit_ha_queues)
self.queue = None self.queue = None
self.exchange = kombu.entity.Exchange( self.exchange = kombu.entity.Exchange(
@ -377,26 +377,50 @@ class Connection(object):
pools = {} pools = {}
def __init__(self, conf, url, purpose): def __init__(self, conf, url, purpose):
self.conf = conf # NOTE(viktors): Parse config options
self.driver_conf = self.conf.oslo_messaging_rabbit driver_conf = conf.oslo_messaging_rabbit
self.max_retries = self.driver_conf.rabbit_max_retries
self.max_retries = driver_conf.rabbit_max_retries
self.interval_start = driver_conf.rabbit_retry_interval
self.interval_stepping = driver_conf.rabbit_retry_backoff
self.login_method = driver_conf.rabbit_login_method
self.fake_rabbit = driver_conf.fake_rabbit
self.virtual_host = driver_conf.rabbit_virtual_host
self.rabbit_hosts = driver_conf.rabbit_hosts
self.rabbit_port = driver_conf.rabbit_port
self.rabbit_userid = driver_conf.rabbit_userid
self.rabbit_password = driver_conf.rabbit_password
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.heartbeat_timeout_threshold = \
driver_conf.heartbeat_timeout_threshold
self.heartbeat_rate = driver_conf.heartbeat_rate
self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
self.amqp_durable_queues = driver_conf.amqp_durable_queues
self.amqp_auto_delete = driver_conf.amqp_auto_delete
self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
self.kombu_reconnect_timeout = driver_conf.kombu_reconnect_timeout
if self.rabbit_use_ssl:
self.kombu_ssl_version = driver_conf.kombu_ssl_version
self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile
self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile
self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs
# Try forever? # Try forever?
if self.max_retries <= 0: if self.max_retries <= 0:
self.max_retries = None self.max_retries = None
self.interval_start = self.driver_conf.rabbit_retry_interval
self.interval_stepping = self.driver_conf.rabbit_retry_backoff
# max retry-interval = 30 seconds # max retry-interval = 30 seconds
self.interval_max = 30 self.interval_max = 30
self._login_method = self.driver_conf.rabbit_login_method
if url.virtual_host is not None: if url.virtual_host is not None:
virtual_host = url.virtual_host virtual_host = url.virtual_host
else: else:
virtual_host = self.driver_conf.rabbit_virtual_host virtual_host = self.virtual_host
self._url = '' self._url = ''
if self.driver_conf.fake_rabbit: if self.fake_rabbit:
LOG.warn("Deprecated: fake_rabbit option is deprecated, set " LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
"rpc_backend to kombu+memory or use the fake " "rpc_backend to kombu+memory or use the fake "
"driver instead.") "driver instead.")
@ -423,13 +447,13 @@ class Connection(object):
transport = url.transport.replace('kombu+', '') transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host) self._url = "%s://%s" % (transport, virtual_host)
else: else:
for adr in self.driver_conf.rabbit_hosts: for adr in self.rabbit_hosts:
hostname, port = netutils.parse_host_port( hostname, port = netutils.parse_host_port(
adr, default_port=self.driver_conf.rabbit_port) adr, default_port=self.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % ( self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '', ";" if self._url else '',
parse.quote(self.driver_conf.rabbit_userid), parse.quote(self.rabbit_userid),
parse.quote(self.driver_conf.rabbit_password), parse.quote(self.rabbit_password),
self._parse_url_hostname(hostname), port, self._parse_url_hostname(hostname), port,
virtual_host) virtual_host)
@ -450,9 +474,9 @@ class Connection(object):
self.connection = kombu.connection.Connection( self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(), self._url, ssl=self._fetch_ssl_params(),
login_method=self._login_method, login_method=self.login_method,
failover_strategy="shuffle", failover_strategy="shuffle",
heartbeat=self.driver_conf.heartbeat_timeout_threshold, heartbeat=self.heartbeat_timeout_threshold,
transport_options={'confirm_publish': True}) transport_options={'confirm_publish': True})
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'), LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
@ -467,8 +491,8 @@ class Connection(object):
# (heatbeat_timeout/heartbeat_rate/2.0, default kombu # (heatbeat_timeout/heartbeat_rate/2.0, default kombu
# heartbeat_rate is 2) # heartbeat_rate is 2)
self._heartbeat_wait_timeout = ( self._heartbeat_wait_timeout = (
float(self.driver_conf.heartbeat_timeout_threshold) / float(self.heartbeat_timeout_threshold) /
float(self.driver_conf.heartbeat_rate) / 2.0) float(self.heartbeat_rate) / 2.0)
self._heartbeat_support_log_emitted = False self._heartbeat_support_log_emitted = False
# NOTE(sileht): just ensure the connection is setuped at startup # NOTE(sileht): just ensure the connection is setuped at startup
@ -538,19 +562,19 @@ class Connection(object):
"""Handles fetching what ssl params should be used for the connection """Handles fetching what ssl params should be used for the connection
(if any). (if any).
""" """
if self.driver_conf.rabbit_use_ssl: if self.rabbit_use_ssl:
ssl_params = dict() ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket # http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.driver_conf.kombu_ssl_version: if self.kombu_ssl_version:
ssl_params['ssl_version'] = self.validate_ssl_version( ssl_params['ssl_version'] = self.validate_ssl_version(
self.driver_conf.kombu_ssl_version) self.kombu_ssl_version)
if self.driver_conf.kombu_ssl_keyfile: if self.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile ssl_params['keyfile'] = self.kombu_ssl_keyfile
if self.driver_conf.kombu_ssl_certfile: if self.kombu_ssl_certfile:
ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile ssl_params['certfile'] = self.kombu_ssl_certfile
if self.driver_conf.kombu_ssl_ca_certs: if self.kombu_ssl_ca_certs:
ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs ssl_params['ca_certs'] = self.kombu_ssl_ca_certs
# We might want to allow variations in the # We might want to allow variations in the
# future with this? # future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
@ -591,8 +615,8 @@ class Connection(object):
recoverable_error_callback and recoverable_error_callback(exc) recoverable_error_callback and recoverable_error_callback(exc)
interval = (self.driver_conf.kombu_reconnect_delay + interval interval = (self.kombu_reconnect_delay + interval
if self.driver_conf.kombu_reconnect_delay > 0 if self.kombu_reconnect_delay > 0
else interval) else interval)
info = {'err_str': exc, 'sleep_time': interval} info = {'err_str': exc, 'sleep_time': interval}
@ -617,8 +641,8 @@ class Connection(object):
# use kombu for HA connection, the interval_step # use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport # should sufficient, because the underlying kombu transport
# connection object freed. # connection object freed.
if self.driver_conf.kombu_reconnect_delay > 0: if self.kombu_reconnect_delay > 0:
time.sleep(self.driver_conf.kombu_reconnect_delay) time.sleep(self.kombu_reconnect_delay)
def on_reconnection(new_channel): def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates """Callback invoked when the kombu reconnects and creates
@ -712,7 +736,7 @@ class Connection(object):
self._consumers = [] self._consumers = []
def _heartbeat_supported_and_enabled(self): def _heartbeat_supported_and_enabled(self):
if self.driver_conf.heartbeat_timeout_threshold <= 0: if self.heartbeat_timeout_threshold <= 0:
return False return False
if self.connection.supports_heartbeats: if self.connection.supports_heartbeats:
@ -741,9 +765,9 @@ class Connection(object):
# NOTE(sileht): we are suposed to send at least one heartbeat # NOTE(sileht): we are suposed to send at least one heartbeat
# every heartbeat_timeout_threshold, so no need to way more # every heartbeat_timeout_threshold, so no need to way more
with self._transport_socket_timeout( with self._transport_socket_timeout(
self.driver_conf.heartbeat_timeout_threshold): self.heartbeat_timeout_threshold):
self.connection.heartbeat_check( self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate) rate=self.heartbeat_rate)
def _heartbeat_start(self): def _heartbeat_start(self):
if self._heartbeat_supported_and_enabled(): if self._heartbeat_supported_and_enabled():
@ -880,28 +904,28 @@ class Connection(object):
responses for call/multicall responses for call/multicall
""" """
consumer = Consumer(self.driver_conf, consumer = Consumer(exchange_name=topic,
exchange_name=topic,
queue_name=topic, queue_name=topic,
routing_key=topic, routing_key=topic,
type='direct', type='direct',
durable=False, durable=False,
auto_delete=True, auto_delete=True,
callback=callback) callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer) self.declare_consumer(consumer)
def declare_topic_consumer(self, exchange_name, topic, callback=None, def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None): queue_name=None):
"""Create a 'topic' consumer.""" """Create a 'topic' consumer."""
consumer = Consumer(self.driver_conf, consumer = Consumer(exchange_name=exchange_name,
exchange_name=exchange_name,
queue_name=queue_name or topic, queue_name=queue_name or topic,
routing_key=topic, routing_key=topic,
type='topic', type='topic',
durable=self.driver_conf.amqp_durable_queues, durable=self.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete, auto_delete=self.amqp_auto_delete,
callback=callback) callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer) self.declare_consumer(consumer)
@ -912,15 +936,14 @@ class Connection(object):
exchange_name = '%s_fanout' % topic exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique) queue_name = '%s_fanout_%s' % (topic, unique)
consumer = Consumer(self.driver_conf, consumer = Consumer(exchange_name=exchange_name,
exchange_name=exchange_name,
queue_name=queue_name, queue_name=queue_name,
routing_key=topic, routing_key=topic,
type='fanout', type='fanout',
durable=False, durable=False,
auto_delete=True, auto_delete=True,
callback=callback, callback=callback,
nowait=False) rabbit_ha_queues=self.rabbit_ha_queues)
self.declare_consumer(consumer) self.declare_consumer(consumer)
@ -955,7 +978,7 @@ class Connection(object):
# a answer before timeout is reached # a answer before timeout is reached
transport_timeout = timeout transport_timeout = timeout
heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold heartbeat_timeout = self.heartbeat_timeout_threshold
if (self._heartbeat_supported_and_enabled() and ( if (self._heartbeat_supported_and_enabled() and (
transport_timeout is None or transport_timeout is None or
transport_timeout > heartbeat_timeout)): transport_timeout > heartbeat_timeout)):
@ -999,7 +1022,7 @@ class Connection(object):
auto_delete=exchange.auto_delete, auto_delete=exchange.auto_delete,
name=routing_key, name=routing_key,
routing_key=routing_key, routing_key=routing_key,
queue_arguments=_get_queue_arguments(self.driver_conf)) queue_arguments=_get_queue_arguments(self.rabbit_ha_queues))
queue.declare() queue.declare()
self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier)
@ -1019,7 +1042,7 @@ class Connection(object):
# before timeout is exshauted # before timeout is exshauted
duration = ( duration = (
timeout if timeout is not None timeout if timeout is not None
else self.conf.oslo_messaging_rabbit.kombu_reconnect_timeout else self.kombu_reconnect_timeout
) )
timer = rpc_common.DecayingTimer(duration=duration) timer = rpc_common.DecayingTimer(duration=duration)
@ -1065,8 +1088,8 @@ class Connection(object):
exchange = kombu.entity.Exchange( exchange = kombu.entity.Exchange(
name=exchange_name, name=exchange_name,
type='topic', type='topic',
durable=self.driver_conf.amqp_durable_queues, durable=self.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete) auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg, self._ensure_publishing(self._publish, exchange, msg,
routing_key=topic, retry=retry) routing_key=topic, retry=retry)
@ -1085,8 +1108,8 @@ class Connection(object):
exchange = kombu.entity.Exchange( exchange = kombu.entity.Exchange(
name=exchange_name, name=exchange_name,
type='topic', type='topic',
durable=self.driver_conf.amqp_durable_queues, durable=self.amqp_durable_queues,
auto_delete=self.driver_conf.amqp_auto_delete) auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue, self._ensure_publishing(self._publish_and_creates_default_queue,
exchange, msg, routing_key=topic, retry=retry) exchange, msg, routing_key=topic, retry=retry)