Add retry logic to post_lifecycle_hook_message
This patch adds retry logic to post_lifecycle_hook_message when posting a lifecyle hook to Zaqar. This will make posting messages to Zaqar more robust in the event Zaqar is unavailable. Change-Id: I1681cf44a4c0a8bded21d328b9a7fff611355529
This commit is contained in:
parent
184434f5e6
commit
2bb47c1428
|
@ -34,4 +34,5 @@ six>=1.10.0 # MIT
|
||||||
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
|
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
|
||||||
sqlalchemy-migrate>=0.11.0 # Apache-2.0
|
sqlalchemy-migrate>=0.11.0 # Apache-2.0
|
||||||
stevedore>=1.20.0 # Apache-2.0
|
stevedore>=1.20.0 # Apache-2.0
|
||||||
|
tenacity>=4.9.0 # Apache-2.0
|
||||||
WebOb>=1.7.1 # MIT
|
WebOb>=1.7.1 # MIT
|
||||||
|
|
|
@ -14,6 +14,7 @@ from oslo_config import cfg
|
||||||
from oslo_context import context as oslo_context
|
from oslo_context import context as oslo_context
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
import six
|
import six
|
||||||
|
import tenacity
|
||||||
|
|
||||||
from senlin.common import context as senlin_context
|
from senlin.common import context as senlin_context
|
||||||
from senlin.common import exception
|
from senlin.common import exception
|
||||||
|
@ -24,6 +25,11 @@ LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
RETRY_ATTEMPTS = 3
|
||||||
|
RETRY_INITIAL_DELAY = 1
|
||||||
|
RETRY_BACKOFF = 1
|
||||||
|
RETRY_MAX = 3
|
||||||
|
|
||||||
|
|
||||||
class Message(object):
|
class Message(object):
|
||||||
"""Zaqar message type of notification."""
|
"""Zaqar message type of notification."""
|
||||||
|
@ -67,19 +73,23 @@ class Message(object):
|
||||||
|
|
||||||
return params
|
return params
|
||||||
|
|
||||||
|
@tenacity.retry(
|
||||||
|
retry=tenacity.retry_if_exception_type(exception.EResourceCreation),
|
||||||
|
wait=tenacity.wait_incrementing(
|
||||||
|
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||||
|
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||||
def post_lifecycle_hook_message(self, lifecycle_action_token, node_id,
|
def post_lifecycle_hook_message(self, lifecycle_action_token, node_id,
|
||||||
resource_id, lifecycle_transition_type):
|
resource_id, lifecycle_transition_type):
|
||||||
|
message_list = [{
|
||||||
|
"ttl": CONF.notification.ttl,
|
||||||
|
"body": {
|
||||||
|
"lifecycle_action_token": lifecycle_action_token,
|
||||||
|
"node_id": node_id,
|
||||||
|
"resource_id": resource_id,
|
||||||
|
"lifecycle_transition_type": lifecycle_transition_type
|
||||||
|
}
|
||||||
|
}]
|
||||||
try:
|
try:
|
||||||
message_list = [{
|
|
||||||
"ttl": CONF.notification.ttl,
|
|
||||||
"body": {
|
|
||||||
"lifecycle_action_token": lifecycle_action_token,
|
|
||||||
"node_id": node_id,
|
|
||||||
"resource_id": resource_id,
|
|
||||||
"lifecycle_transition_type": lifecycle_transition_type
|
|
||||||
}
|
|
||||||
}]
|
|
||||||
|
|
||||||
if not self.zaqar().queue_exists(self.queue_name):
|
if not self.zaqar().queue_exists(self.queue_name):
|
||||||
kwargs = {
|
kwargs = {
|
||||||
"_max_messages_post_size":
|
"_max_messages_post_size":
|
||||||
|
@ -91,5 +101,6 @@ class Message(object):
|
||||||
|
|
||||||
return self.zaqar().message_post(self.queue_name, message_list)
|
return self.zaqar().message_post(self.queue_name, message_list)
|
||||||
except exception.InternalError as ex:
|
except exception.InternalError as ex:
|
||||||
raise exception.EResourceCreation(type='queue',
|
raise exception.EResourceCreation(
|
||||||
message=six.text_type(ex))
|
type='queue',
|
||||||
|
message=six.text_type(ex))
|
||||||
|
|
|
@ -15,6 +15,7 @@ import mock
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from senlin.common import exception
|
||||||
from senlin.drivers import base as driver_base
|
from senlin.drivers import base as driver_base
|
||||||
from senlin.engine.notifications import message as mmod
|
from senlin.engine.notifications import message as mmod
|
||||||
from senlin.tests.unit.common import base
|
from senlin.tests.unit.common import base
|
||||||
|
@ -121,3 +122,28 @@ class TestMessage(base.SenlinTestCase):
|
||||||
}
|
}
|
||||||
}]
|
}]
|
||||||
mock_zc.message_post.assert_called_once_with(queue_name, message_list)
|
mock_zc.message_post.assert_called_once_with(queue_name, message_list)
|
||||||
|
|
||||||
|
@mock.patch.object(mmod.Message, 'zaqar')
|
||||||
|
def test_post_lifecycle_hook_message_queue_retry(self, mock_zaqar):
|
||||||
|
cfg.CONF.set_override('max_message_size', 8192, 'notification')
|
||||||
|
mock_zc = mock.Mock()
|
||||||
|
mock_zaqar.return_value = mock_zc
|
||||||
|
queue_name = 'my_queue'
|
||||||
|
message = mmod.Message(queue_name)
|
||||||
|
mock_zc.queue_exists.return_value = True
|
||||||
|
test_exception = exception.EResourceCreation(type='queue',
|
||||||
|
message="test")
|
||||||
|
mock_zc.message_post.side_effect = [
|
||||||
|
test_exception, test_exception, None]
|
||||||
|
|
||||||
|
lifecycle_action_token = 'ACTION_ID'
|
||||||
|
node_id = 'NODE_ID'
|
||||||
|
resource_id = 'RESOURCE_ID'
|
||||||
|
lifecycle_transition_type = 'TYPE'
|
||||||
|
|
||||||
|
message.post_lifecycle_hook_message(lifecycle_action_token, node_id,
|
||||||
|
resource_id,
|
||||||
|
lifecycle_transition_type)
|
||||||
|
|
||||||
|
mock_zc.queue_create.assert_not_called()
|
||||||
|
self.assertEqual(3, mock_zc.message_post.call_count)
|
||||||
|
|
Loading…
Reference in New Issue