From 3044abbaf4e54dcfb540af6533c7aeca53bc9d5e Mon Sep 17 00:00:00 2001 From: Victor Sergeyev Date: Wed, 13 May 2015 17:51:12 +0300 Subject: [PATCH] Reduce `magic` conf attribute usage Connection class contains `magic` attribute conf, which contains (or at least should contain) all config options. Set these config options as class attributes to make the code more clear Change-Id: Ib67f148e2ecf3def37e57d6e3359bced932fbc90 --- oslo_messaging/_drivers/impl_rabbit.py | 135 +++++++++++++++---------- 1 file changed, 79 insertions(+), 56 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index e5538e5e9..7bc7c43e9 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -156,7 +156,7 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(conf): +def _get_queue_arguments(rabbit_ha_queues): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we declare a mirrored queue @@ -167,7 +167,7 @@ def _get_queue_arguments(conf): Setting x-ha-policy to all means that the queue will be mirrored to all nodes in the cluster. """ - return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {} + return {'x-ha-policy': 'all'} if rabbit_ha_queues else {} class RabbitMessage(dict): @@ -186,8 +186,8 @@ class RabbitMessage(dict): class Consumer(object): """Consumer class.""" - def __init__(self, conf, exchange_name, queue_name, routing_key, type, - durable, auto_delete, callback, nowait=True): + def __init__(self, exchange_name, queue_name, routing_key, type, durable, + auto_delete, callback, nowait=True, rabbit_ha_queues=None): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ @@ -199,7 +199,7 @@ class Consumer(object): self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(conf) + self.queue_arguments = _get_queue_arguments(rabbit_ha_queues) self.queue = None self.exchange = kombu.entity.Exchange( @@ -377,26 +377,50 @@ class Connection(object): pools = {} def __init__(self, conf, url, purpose): - self.conf = conf - self.driver_conf = self.conf.oslo_messaging_rabbit - self.max_retries = self.driver_conf.rabbit_max_retries + # NOTE(viktors): Parse config options + driver_conf = conf.oslo_messaging_rabbit + + self.max_retries = driver_conf.rabbit_max_retries + self.interval_start = driver_conf.rabbit_retry_interval + self.interval_stepping = driver_conf.rabbit_retry_backoff + + self.login_method = driver_conf.rabbit_login_method + self.fake_rabbit = driver_conf.fake_rabbit + self.virtual_host = driver_conf.rabbit_virtual_host + self.rabbit_hosts = driver_conf.rabbit_hosts + self.rabbit_port = driver_conf.rabbit_port + self.rabbit_userid = driver_conf.rabbit_userid + self.rabbit_password = driver_conf.rabbit_password + self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.heartbeat_timeout_threshold = \ + driver_conf.heartbeat_timeout_threshold + self.heartbeat_rate = driver_conf.heartbeat_rate + self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay + self.amqp_durable_queues = driver_conf.amqp_durable_queues + self.amqp_auto_delete = driver_conf.amqp_auto_delete + self.rabbit_use_ssl = driver_conf.rabbit_use_ssl + self.kombu_reconnect_timeout = driver_conf.kombu_reconnect_timeout + + if self.rabbit_use_ssl: + self.kombu_ssl_version = driver_conf.kombu_ssl_version + self.kombu_ssl_keyfile = driver_conf.kombu_ssl_keyfile + self.kombu_ssl_certfile = driver_conf.kombu_ssl_certfile + self.kombu_ssl_ca_certs = driver_conf.kombu_ssl_ca_certs + # Try forever? if self.max_retries <= 0: self.max_retries = None - self.interval_start = self.driver_conf.rabbit_retry_interval - self.interval_stepping = self.driver_conf.rabbit_retry_backoff + # max retry-interval = 30 seconds self.interval_max = 30 - self._login_method = self.driver_conf.rabbit_login_method - if url.virtual_host is not None: virtual_host = url.virtual_host else: - virtual_host = self.driver_conf.rabbit_virtual_host + virtual_host = self.virtual_host self._url = '' - if self.driver_conf.fake_rabbit: + if self.fake_rabbit: LOG.warn("Deprecated: fake_rabbit option is deprecated, set " "rpc_backend to kombu+memory or use the fake " "driver instead.") @@ -423,13 +447,13 @@ class Connection(object): transport = url.transport.replace('kombu+', '') self._url = "%s://%s" % (transport, virtual_host) else: - for adr in self.driver_conf.rabbit_hosts: + for adr in self.rabbit_hosts: hostname, port = netutils.parse_host_port( - adr, default_port=self.driver_conf.rabbit_port) + adr, default_port=self.rabbit_port) self._url += '%samqp://%s:%s@%s:%s/%s' % ( ";" if self._url else '', - parse.quote(self.driver_conf.rabbit_userid), - parse.quote(self.driver_conf.rabbit_password), + parse.quote(self.rabbit_userid), + parse.quote(self.rabbit_password), self._parse_url_hostname(hostname), port, virtual_host) @@ -450,9 +474,9 @@ class Connection(object): self.connection = kombu.connection.Connection( self._url, ssl=self._fetch_ssl_params(), - login_method=self._login_method, + login_method=self.login_method, failover_strategy="shuffle", - heartbeat=self.driver_conf.heartbeat_timeout_threshold, + heartbeat=self.heartbeat_timeout_threshold, transport_options={'confirm_publish': True}) LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'), @@ -467,8 +491,8 @@ class Connection(object): # (heatbeat_timeout/heartbeat_rate/2.0, default kombu # heartbeat_rate is 2) self._heartbeat_wait_timeout = ( - float(self.driver_conf.heartbeat_timeout_threshold) / - float(self.driver_conf.heartbeat_rate) / 2.0) + float(self.heartbeat_timeout_threshold) / + float(self.heartbeat_rate) / 2.0) self._heartbeat_support_log_emitted = False # NOTE(sileht): just ensure the connection is setuped at startup @@ -538,19 +562,19 @@ class Connection(object): """Handles fetching what ssl params should be used for the connection (if any). """ - if self.driver_conf.rabbit_use_ssl: + if self.rabbit_use_ssl: ssl_params = dict() # http://docs.python.org/library/ssl.html - ssl.wrap_socket - if self.driver_conf.kombu_ssl_version: + if self.kombu_ssl_version: ssl_params['ssl_version'] = self.validate_ssl_version( - self.driver_conf.kombu_ssl_version) - if self.driver_conf.kombu_ssl_keyfile: - ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile - if self.driver_conf.kombu_ssl_certfile: - ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile - if self.driver_conf.kombu_ssl_ca_certs: - ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs + self.kombu_ssl_version) + if self.kombu_ssl_keyfile: + ssl_params['keyfile'] = self.kombu_ssl_keyfile + if self.kombu_ssl_certfile: + ssl_params['certfile'] = self.kombu_ssl_certfile + if self.kombu_ssl_ca_certs: + ssl_params['ca_certs'] = self.kombu_ssl_ca_certs # We might want to allow variations in the # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED @@ -591,8 +615,8 @@ class Connection(object): recoverable_error_callback and recoverable_error_callback(exc) - interval = (self.driver_conf.kombu_reconnect_delay + interval - if self.driver_conf.kombu_reconnect_delay > 0 + interval = (self.kombu_reconnect_delay + interval + if self.kombu_reconnect_delay > 0 else interval) info = {'err_str': exc, 'sleep_time': interval} @@ -617,8 +641,8 @@ class Connection(object): # use kombu for HA connection, the interval_step # should sufficient, because the underlying kombu transport # connection object freed. - if self.driver_conf.kombu_reconnect_delay > 0: - time.sleep(self.driver_conf.kombu_reconnect_delay) + if self.kombu_reconnect_delay > 0: + time.sleep(self.kombu_reconnect_delay) def on_reconnection(new_channel): """Callback invoked when the kombu reconnects and creates @@ -712,7 +736,7 @@ class Connection(object): self._consumers = [] def _heartbeat_supported_and_enabled(self): - if self.driver_conf.heartbeat_timeout_threshold <= 0: + if self.heartbeat_timeout_threshold <= 0: return False if self.connection.supports_heartbeats: @@ -741,9 +765,9 @@ class Connection(object): # NOTE(sileht): we are suposed to send at least one heartbeat # every heartbeat_timeout_threshold, so no need to way more with self._transport_socket_timeout( - self.driver_conf.heartbeat_timeout_threshold): + self.heartbeat_timeout_threshold): self.connection.heartbeat_check( - rate=self.driver_conf.heartbeat_rate) + rate=self.heartbeat_rate) def _heartbeat_start(self): if self._heartbeat_supported_and_enabled(): @@ -880,28 +904,28 @@ class Connection(object): responses for call/multicall """ - consumer = Consumer(self.driver_conf, - exchange_name=topic, + consumer = Consumer(exchange_name=topic, queue_name=topic, routing_key=topic, type='direct', durable=False, auto_delete=True, - callback=callback) + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues) self.declare_consumer(consumer) def declare_topic_consumer(self, exchange_name, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - consumer = Consumer(self.driver_conf, - exchange_name=exchange_name, + consumer = Consumer(exchange_name=exchange_name, queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete, - callback=callback) + durable=self.amqp_durable_queues, + auto_delete=self.amqp_auto_delete, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues) self.declare_consumer(consumer) @@ -912,15 +936,14 @@ class Connection(object): exchange_name = '%s_fanout' % topic queue_name = '%s_fanout_%s' % (topic, unique) - consumer = Consumer(self.driver_conf, - exchange_name=exchange_name, + consumer = Consumer(exchange_name=exchange_name, queue_name=queue_name, routing_key=topic, type='fanout', durable=False, auto_delete=True, callback=callback, - nowait=False) + rabbit_ha_queues=self.rabbit_ha_queues) self.declare_consumer(consumer) @@ -955,7 +978,7 @@ class Connection(object): # a answer before timeout is reached transport_timeout = timeout - heartbeat_timeout = self.driver_conf.heartbeat_timeout_threshold + heartbeat_timeout = self.heartbeat_timeout_threshold if (self._heartbeat_supported_and_enabled() and ( transport_timeout is None or transport_timeout > heartbeat_timeout)): @@ -999,7 +1022,7 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.driver_conf)) + queue_arguments=_get_queue_arguments(self.rabbit_ha_queues)) queue.declare() self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) @@ -1019,7 +1042,7 @@ class Connection(object): # before timeout is exshauted duration = ( timeout if timeout is not None - else self.conf.oslo_messaging_rabbit.kombu_reconnect_timeout + else self.kombu_reconnect_timeout ) timer = rpc_common.DecayingTimer(duration=duration) @@ -1065,8 +1088,8 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete) + durable=self.amqp_durable_queues, + auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, routing_key=topic, retry=retry) @@ -1085,8 +1108,8 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.driver_conf.amqp_durable_queues, - auto_delete=self.driver_conf.amqp_auto_delete) + durable=self.amqp_durable_queues, + auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, exchange, msg, routing_key=topic, retry=retry)