Merge "Use single producer and to avoid an exchange redeclaration"

This commit is contained in:
Jenkins
2016-05-17 20:36:26 +00:00
committed by Gerrit Code Review
2 changed files with 48 additions and 32 deletions

View File

@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import collections
import contextlib import contextlib
import errno import errno
import functools import functools
@@ -505,10 +504,17 @@ class Connection(object):
self._initial_pid = os.getpid() self._initial_pid = os.getpid()
self._consumers = {} self._consumers = {}
self._producer = None
self._new_tags = set() self._new_tags = set()
self._active_tags = {} self._active_tags = {}
self._tags = itertools.count(1) self._tags = itertools.count(1)
# Set of exchanges and queues declared on the channel to avoid
# unnecessary redeclaration. This set is resetted each time
# the connection is resetted in Connection._set_current_channel
self._declared_exchanges = set()
self._declared_queues = set()
self._consume_loop_stopped = False self._consume_loop_stopped = False
self.channel = None self.channel = None
self.purpose = purpose self.purpose = purpose
@@ -791,14 +797,16 @@ class Connection(object):
return return
if self.channel is not None: if self.channel is not None:
self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None) self._declared_queues.clear()
self._declared_exchanges.clear()
self.connection.maybe_close_channel(self.channel) self.connection.maybe_close_channel(self.channel)
self.channel = new_channel self.channel = new_channel
if (new_channel is not None and if new_channel is not None:
self.purpose == rpc_common.PURPOSE_LISTEN): if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel) self._set_qos(new_channel)
self._producer = kombu.messaging.Producer(new_channel)
def _set_qos(self, channel): def _set_qos(self, channel):
"""Set QoS prefetch count on the channel""" """Set QoS prefetch count on the channel"""
@@ -1117,10 +1125,10 @@ class Connection(object):
def _publish(self, exchange, msg, routing_key=None, timeout=None): def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""Publish a message.""" """Publish a message."""
producer = kombu.messaging.Producer(exchange=exchange,
channel=self.channel, if not (exchange.passive or exchange.name in self._declared_exchanges):
auto_declare=not exchange.passive, exchange(self.channel).declare()
routing_key=routing_key) self._declared_exchanges.add(exchange.name)
log_info = {'msg': msg, log_info = {'msg': msg,
'who': exchange or 'default', 'who': exchange or 'default',
@@ -1131,13 +1139,11 @@ class Connection(object):
# NOTE(sileht): no need to wait more, caller expects # NOTE(sileht): no need to wait more, caller expects
# a answer before timeout is reached # a answer before timeout is reached
with self._transport_socket_timeout(timeout): with self._transport_socket_timeout(timeout):
producer.publish(msg, expiration=self._get_expiration(timeout), self._producer.publish(msg,
compression=self.kombu_compression) exchange=exchange,
routing_key=routing_key,
# List of notification queue declared on the channel to avoid expiration=self._get_expiration(timeout),
# unnecessary redeclaration. This list is resetted each time compression=self.kombu_compression)
# the connection is resetted in Connection._set_current_channel
PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set)
def _publish_and_creates_default_queue(self, exchange, msg, def _publish_and_creates_default_queue(self, exchange, msg,
routing_key=None, timeout=None): routing_key=None, timeout=None):
@@ -1157,8 +1163,7 @@ class Connection(object):
# NOTE(sileht): We only do it once per reconnection # NOTE(sileht): We only do it once per reconnection
# the Connection._set_current_channel() is responsible to clear # the Connection._set_current_channel() is responsible to clear
# this cache # this cache
if (queue_indentifier not in if queue_indentifier not in self._declared_queues:
self.PUBLISHER_DECLARED_QUEUES[self.channel]):
queue = kombu.entity.Queue( queue = kombu.entity.Queue(
channel=self.channel, channel=self.channel,
exchange=exchange, exchange=exchange,
@@ -1172,7 +1177,7 @@ class Connection(object):
'Connection._publish_and_creates_default_queue: ' 'Connection._publish_and_creates_default_queue: '
'declare queue %(key)s on %(exchange)s exchange', log_info) 'declare queue %(key)s on %(exchange)s exchange', log_info)
queue.declare() queue.declare()
self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) self._declared_queues.add(queue_indentifier)
self._publish(exchange, msg, routing_key=routing_key, timeout=timeout) self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)

View File

@@ -212,10 +212,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
def test_send_with_timeout(self, fake_publish): def test_send_with_timeout(self, fake_publish):
transport = oslo_messaging.get_transport(self.conf, transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////') 'kombu+memory:////')
exchange_mock = mock.Mock()
with transport._driver._get_connection( with transport._driver._get_connection(
driver_common.PURPOSE_SEND) as pool_conn: driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key', conn._publish(exchange_mock, 'msg', routing_key='routing_key',
timeout=1) timeout=1)
# NOTE(gcb) kombu accept TTL as seconds instead of millisecond since # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
@@ -226,23 +227,30 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
if versionutils.is_compatible('3.0.25', kombu_version): if versionutils.is_compatible('3.0.25', kombu_version):
fake_publish.assert_called_with( fake_publish.assert_called_with(
'msg', expiration=1, 'msg', expiration=1,
compression=self.conf.oslo_messaging_rabbit.kombu_compression) exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
routing_key='routing_key')
else: else:
fake_publish.assert_called_with( fake_publish.assert_called_with(
'msg', expiration=1000, 'msg', expiration=1000,
compression=self.conf.oslo_messaging_rabbit.kombu_compression) exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
routing_key='routing_key')
@mock.patch('kombu.messaging.Producer.publish') @mock.patch('kombu.messaging.Producer.publish')
def test_send_no_timeout(self, fake_publish): def test_send_no_timeout(self, fake_publish):
transport = oslo_messaging.get_transport(self.conf, transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////') 'kombu+memory:////')
exchange_mock = mock.Mock()
with transport._driver._get_connection( with transport._driver._get_connection(
driver_common.PURPOSE_SEND) as pool_conn: driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key') conn._publish(exchange_mock, 'msg', routing_key='routing_key')
fake_publish.assert_called_with( fake_publish.assert_called_with(
'msg', expiration=None, 'msg', expiration=None,
compression=self.conf.oslo_messaging_rabbit.kombu_compression) compression=self.conf.oslo_messaging_rabbit.kombu_compression,
exchange=exchange_mock,
routing_key='routing_key')
def test_declared_queue_publisher(self): def test_declared_queue_publisher(self):
transport = oslo_messaging.get_transport(self.conf, transport = oslo_messaging.get_transport(self.conf,
@@ -277,14 +285,17 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
# Ensure it creates it # Ensure it creates it
try_send(e_passive) try_send(e_passive)
with mock.patch('kombu.messaging.Producer', side_effect=exc): with mock.patch('kombu.messaging.Producer.publish',
# Should reset the cache and ensures the exchange does side_effect=exc):
# not exists # Ensure the exchange is already in cache
self.assertRaises(exc, try_send, e_passive) self.assertIn('foobar', conn._declared_exchanges)
# Recreate it # Reset connection
try_send(e_active) self.assertRaises(exc, try_send, e_passive)
# Ensure it have been recreated # Ensure the cache is empty
try_send(e_passive) self.assertEqual(0, len(conn._declared_exchanges))
try_send(e_active)
self.assertIn('foobar', conn._declared_exchanges)
class TestRabbitConsume(test_utils.BaseTestCase): class TestRabbitConsume(test_utils.BaseTestCase):