Use URL to define RabbitMQ connection
* kombu have been updated to be compatible with global requirements * Unit test have been modified due to breaking changes introduced by kombu library * kombu will be replaced later by oslo.messaging * Add new config option to define multiple connection URL separated by a semi-colon * Use secret parameter to avoid any password leaks in log files * Add option for connection heartbeat Change-Id: Ia1db99f5804a0b3cdd496485f6a9410758d567fe
This commit is contained in:
parent
07cb2a0441
commit
f1cb73cab3
@ -34,11 +34,8 @@ def main():
|
||||
database_driver.connect()
|
||||
|
||||
application_controller = controller.Controller(config, database_driver)
|
||||
connection = kombu.Connection(hostname=config.collector.rabbit_host,
|
||||
port=config.collector.rabbit_port,
|
||||
userid=config.collector.rabbit_username,
|
||||
password=config.collector.rabbit_password,
|
||||
heartbeat=540)
|
||||
|
||||
connection = kombu.Connection(config.collector.url, heartbeat=config.collector.heartbeat)
|
||||
retry_listener = retry_adapter.RetryAdapter(config, connection)
|
||||
bus_listener = bus_adapter.BusAdapter(config, application_controller,
|
||||
connection, retry_listener)
|
||||
|
@ -20,18 +20,18 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class RetryAdapter(object):
|
||||
def __init__(self, config, connection):
|
||||
def __init__(self, config, connection, retry_producer=None, dead_producer=None):
|
||||
self.config = config
|
||||
self.connection = connection
|
||||
|
||||
retry_exchange = self._configure_retry_exchanges(self.connection)
|
||||
dead_exchange = self._configure_dead_exchange(self.connection)
|
||||
|
||||
self._retry_producer = kombu.Producer(self.connection, exchange=retry_exchange)
|
||||
self._dead_producer = kombu.Producer(self.connection, exchange=dead_exchange)
|
||||
self._retry_producer = retry_producer or kombu.Producer(self.connection, exchange=retry_exchange)
|
||||
self._dead_producer = dead_producer or kombu.Producer(self.connection, exchange=dead_exchange)
|
||||
|
||||
def publish_to_dead_letter(self, message):
|
||||
death_count = self._rejected_count(message)
|
||||
death_count = self._get_rejected_count(message)
|
||||
LOG.info('Message die %d times', death_count)
|
||||
|
||||
if death_count < self.config.collector.max_retries:
|
||||
@ -114,7 +114,7 @@ class RetryAdapter(object):
|
||||
"x-dead-letter-routing-key": self.config.collector.routing_key,
|
||||
}
|
||||
|
||||
def _rejected_count(self, message):
|
||||
def _get_rejected_count(self, message):
|
||||
if 'x-death' in message.headers:
|
||||
return len(message.headers['x-death'])
|
||||
return 0
|
||||
|
@ -24,6 +24,7 @@ database_opts = [
|
||||
default='mongodb',
|
||||
help='Database driver'),
|
||||
cfg.StrOpt('connection_url',
|
||||
secret=True,
|
||||
default='mongodb://almanach:almanach@localhost:27017/almanach',
|
||||
help='Database connection URL'),
|
||||
]
|
||||
@ -38,16 +39,13 @@ api_opts = [
|
||||
]
|
||||
|
||||
collector_opts = [
|
||||
cfg.HostnameOpt('rabbit_host',
|
||||
default='localhost',
|
||||
help='RabbitMQ Hostname'),
|
||||
cfg.PortOpt('rabbit_port',
|
||||
default=5672,
|
||||
help='RabbitMQ TCP port'),
|
||||
cfg.StrOpt('rabbit_username',
|
||||
help='RabbitMQ Username'),
|
||||
cfg.StrOpt('rabbit_password',
|
||||
help='RabbitMQ Password'),
|
||||
cfg.StrOpt('url',
|
||||
secret=True,
|
||||
default='amqp://guest:guest@localhost:5672',
|
||||
help='RabbitMQ connection URL'),
|
||||
cfg.IntOpt('heartbeat',
|
||||
default=540,
|
||||
help='RabbitMQ connection heartbeat'),
|
||||
cfg.StrOpt('queue',
|
||||
default='almanach.info',
|
||||
help='Default queue name'),
|
||||
@ -85,11 +83,13 @@ auth_opts = [
|
||||
default='private_key',
|
||||
help='Authentication driver for the API'),
|
||||
cfg.StrOpt('private_key',
|
||||
secret=True,
|
||||
default='secret',
|
||||
help='Private key for private key authentication'),
|
||||
cfg.StrOpt('keystone_username',
|
||||
help='Keystone service username'),
|
||||
cfg.StrOpt('keystone_password',
|
||||
secret=True,
|
||||
help='Keystone service password'),
|
||||
cfg.StrOpt('keystone_tenant',
|
||||
help='Keystone service tenant'),
|
||||
|
@ -47,19 +47,8 @@ bind_port = 8000
|
||||
# From almanach
|
||||
#
|
||||
|
||||
# RabbitMQ Hostname (hostname value)
|
||||
rabbit_host = messaging
|
||||
|
||||
# RabbitMQ TCP port (port value)
|
||||
# Minimum value: 0
|
||||
# Maximum value: 65535
|
||||
#rabbit_port = 5672
|
||||
|
||||
# RabbitMQ Username (string value)
|
||||
rabbit_username = guest
|
||||
|
||||
# RabbitMQ Password (string value)
|
||||
rabbit_password = guest
|
||||
# RabbitMQ connection URL (string value)
|
||||
url = amqp://guest:guest@messaging:5672
|
||||
|
||||
# Default queue name (string value)
|
||||
#default_queue = almanach.info
|
||||
|
@ -3,7 +3,7 @@ Flask==0.10.1
|
||||
PyYAML==3.11
|
||||
jsonpickle==0.7.1
|
||||
pymongo>=3.0.2,!=3.1 # Apache-2.0
|
||||
kombu>=3.0.30
|
||||
kombu>=3.0.25 # BSD
|
||||
pytz>=2014.10
|
||||
voluptuous==0.8.11
|
||||
python-keystoneclient>=1.6.0
|
||||
|
@ -12,3 +12,4 @@ sphinxcontrib-httpdomain # BSD
|
||||
flake8>=2.5.4,<2.6.0 # MIT
|
||||
hacking<0.12,>=0.11.0 # Apache-2.0
|
||||
testtools>=1.4.0 # MIT
|
||||
mock>=2.0 # BSD
|
@ -12,10 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from flexmock import flexmock
|
||||
from kombu import Connection
|
||||
from kombu.tests import mocks
|
||||
from kombu.transport import pyamqp
|
||||
import mock
|
||||
|
||||
from almanach.collector import retry_adapter
|
||||
from tests import base
|
||||
@ -25,73 +22,28 @@ class BusAdapterTest(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(BusAdapterTest, self).setUp()
|
||||
self.setup_connection_mock()
|
||||
self.retry_adapter = retry_adapter.RetryAdapter(self.config, self.connection)
|
||||
self.connection = mock.Mock()
|
||||
self.retry_producer = mock.Mock()
|
||||
self.dead_producer = mock.Mock()
|
||||
self.retry_adapter = retry_adapter.RetryAdapter(self.config, self.connection,
|
||||
self.retry_producer, self.dead_producer)
|
||||
|
||||
def setup_connection_mock(self):
|
||||
mocks.Transport.recoverable_connection_errors = pyamqp.Transport.recoverable_connection_errors
|
||||
self.connection = flexmock(Connection(transport=mocks.Transport))
|
||||
self.channel_mock = flexmock(self.connection.default_channel)
|
||||
self.connection.should_receive('channel').and_return(self.channel_mock)
|
||||
|
||||
def test_declare_retry_exchanges_retries_if_it_fails(self):
|
||||
connection = flexmock(Connection(transport=mocks.Transport))
|
||||
connection.should_receive('_establish_connection').times(3)\
|
||||
.and_raise(IOError)\
|
||||
.and_raise(IOError)\
|
||||
.and_return(connection.transport.establish_connection())
|
||||
|
||||
self.retry_adapter = retry_adapter.RetryAdapter(self.config, connection)
|
||||
|
||||
def test_publish_to_retry_queue_happy_path(self):
|
||||
message = self.build_message()
|
||||
|
||||
self.expect_publish_with(message, 'almanach.retry').once()
|
||||
self.retry_adapter.publish_to_dead_letter(message)
|
||||
|
||||
def test_publish_to_retry_queue_retries_if_it_fails(self):
|
||||
message = self.build_message()
|
||||
|
||||
self.expect_publish_with(message, 'almanach.retry').times(4)\
|
||||
.and_raise(IOError)\
|
||||
.and_raise(IOError)\
|
||||
.and_raise(IOError)\
|
||||
.and_return(message)
|
||||
def test_message_is_published_to_retry_queue(self):
|
||||
message = mock.Mock(headers=dict())
|
||||
message.delivery_info = dict(routing_key='test')
|
||||
|
||||
self.retry_adapter.publish_to_dead_letter(message)
|
||||
self.connection.ensure.assert_called_with(self.retry_producer, self.retry_producer.publish,
|
||||
errback=self.retry_adapter._error_callback,
|
||||
interval_max=30, interval_start=0, interval_step=5)
|
||||
|
||||
def build_message(self, headers=dict()):
|
||||
message = MyObject()
|
||||
message.headers = headers
|
||||
message.body = b'Now that the worst is behind you, it\'s time we get you back. - Mr. Robot'
|
||||
message.delivery_info = {'routing_key': 42}
|
||||
message.content_type = 'xml/rapture'
|
||||
message.content_encoding = 'iso8859-1'
|
||||
return message
|
||||
|
||||
def test_publish_to_dead_letter_messages_retried_more_than_twice(self):
|
||||
message = self.build_message(headers={'x-death': [0, 1, 2, 3]})
|
||||
|
||||
self.expect_publish_with(message, 'almanach.dead').once()
|
||||
def test_message_is_published_to_dead_queue(self):
|
||||
message = mock.Mock(headers={'x-death': [0, 1, 2, 3]})
|
||||
message.delivery_info = dict(routing_key='test')
|
||||
|
||||
self.retry_adapter.publish_to_dead_letter(message)
|
||||
self.assertEqual(self.connection.ensure.call_count, 3)
|
||||
|
||||
def expect_publish_with(self, message, exchange):
|
||||
expected_message = {'body': message.body,
|
||||
'priority': 0,
|
||||
'content_encoding': message.content_encoding,
|
||||
'content_type': message.content_type,
|
||||
'headers': message.headers,
|
||||
'properties': {'delivery_mode': 2}}
|
||||
|
||||
return self.channel_mock.should_receive('basic_publish')\
|
||||
.with_args(expected_message, exchange=exchange, routing_key=message.delivery_info['routing_key'],
|
||||
mandatory=False, immediate=False)
|
||||
|
||||
|
||||
class MyObject(object):
|
||||
headers = None
|
||||
body = None
|
||||
delivery_info = None
|
||||
content_type = None
|
||||
content_encoding = None
|
||||
self.connection.ensure.assert_called_with(self.dead_producer, self.dead_producer.publish,
|
||||
errback=self.retry_adapter._error_callback,
|
||||
interval_max=30, interval_start=0, interval_step=5)
|
||||
|
Loading…
Reference in New Issue
Block a user