agentnotifiers: retried notifications refactor

This patch introduces retry(func, max_attempts) method
which wraps the original func in such a way,
that if execution results in MessagingException,
the given function will be retried for max_attempts
till success or MessagingException is raised.
The function is in the utils module and can be reused
by different agentnotifiers if necessary.

Change-Id: I0d0c17e500e44c1a17438c29a0e76a9ef00872e8
This commit is contained in:
Inessa Vasilevskaya 2016-04-06 18:28:59 +03:00
parent 3db5f5e677
commit 495f417c8c
4 changed files with 106 additions and 48 deletions

View File

@ -20,6 +20,7 @@ from oslo_log import log as logging
import oslo_messaging
from neutron._i18n import _LE
from neutron.api.rpc.agentnotifiers import utils as ag_utils
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
@ -29,6 +30,10 @@ from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
# default messaging timeout is 60 sec, so 2 here is chosen to not block API
# call for more than 2 minutes
AGENT_NOTIFY_MAX_ATTEMPTS = 2
class L3AgentNotifyAPI(object):
"""API for plugin to notify L3 agent."""
@ -44,7 +49,8 @@ class L3AgentNotifyAPI(object):
'%(method)s', {'host': host,
'method': method})
cctxt = self.client.prepare(server=host)
rpc_method = cctxt.call if use_call else cctxt.cast
rpc_method = (ag_utils.retry(cctxt.call, AGENT_NOTIFY_MAX_ATTEMPTS)
if use_call else cctxt.cast)
rpc_method(context, method, **kwargs)
def _agent_notification(self, context, method, router_ids, operation,

View File

@ -0,0 +1,62 @@
# Copyright (c) 2016 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import excutils
from neutron._i18n import _LW
LOG = logging.getLogger(__name__)
def _call_with_retry(max_attempts):
"""A wrapper to retry function using rpc call in case of
MessagingException.
Retries the decorated function in case of MessagingException of some kind
(a timeout, client send error etc).
If maximum attempts exceeded, the exception which occured during last
attempt is reraised.
"""
def wrapper(f):
def func_wrapper(*args, **kwargs):
# (ivasilevskaya) think of a more informative data to log
action = '%(func)s' % {'func': getattr(f, '__name__', f)}
for attempt in range(1, max_attempts + 1):
try:
return f(*args, **kwargs)
except oslo_messaging.MessagingException:
with excutils.save_and_reraise_exception(
reraise=False) as ctxt:
LOG.warning(
_LW('Failed to execute %(action)s. %(attempt)d out'
' of %(max_attempts)d'),
{'attempt': attempt,
'max_attempts': max_attempts,
'action': action})
if attempt == max_attempts:
ctxt.reraise = True
return func_wrapper
return wrapper
def retry(func, max_attempts):
"""Adds the retry logic to original function and returns a partial.
The returned partial can be called with the same arguments as the original
function.
"""
return _call_with_retry(max_attempts)(func)

View File

@ -57,10 +57,6 @@ L3_AGENTS_SCHEDULER_OPTS = [
cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
# default messaging timeout is 60 sec, so 2 here is chosen to not block API
# call for more than 2 minutes
AGENT_NOTIFY_MAX_ATTEMPTS = 2
class RouterL3AgentBinding(model_base.BASEV2):
"""Represents binding between neutron routers and L3 agents."""
@ -314,19 +310,10 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
context, router_id, host)
for agent in new_agents:
# Need to make sure agents are notified or unschedule otherwise
for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS):
try:
l3_notifier.router_added_to_agent(
context, [router_id], agent['host'])
break
except oslo_messaging.MessagingException:
LOG.warning(_LW('Failed to notify L3 agent on host '
'%(host)s about added router. Attempt '
'%(attempt)d out of %(max_attempts)d'),
{'host': agent['host'], 'attempt': attempt + 1,
'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS})
else:
try:
l3_notifier.router_added_to_agent(
context, [router_id], agent['host'])
except oslo_messaging.MessagingException:
self._unbind_router(context, router_id, agent['id'])
raise l3agentscheduler.RouterReschedulingFailed(
router_id=router_id)

View File

@ -25,6 +25,7 @@ from webob import exc
from neutron.api import extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.v2 import attributes
@ -234,7 +235,11 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin,
service_plugins = {'l3_plugin_name': self.l3_plugin}
else:
service_plugins = None
mock.patch('neutron.common.rpc.get_client').start()
# NOTE(ivasilevskaya) mocking this way allows some control over mocked
# client like further method mocking with asserting calls
self.client_mock = mock.MagicMock(name="mocked client")
mock.patch('neutron.common.rpc.get_client'
).start().return_value = self.client_mock
super(OvsAgentSchedulerTestCaseBase, self).setUp(
self.plugin_str, service_plugins=service_plugins)
mock.patch.object(
@ -789,47 +794,45 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase):
self.assertFalse(ret_b)
def test_router_reschedule_succeeded_after_failed_notification(self):
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
with self.router() as router:
# schedule the router to host A
l3_rpc_cb.get_router_ids(self.adminContext, host=L3_HOSTA)
with mock.patch.object(
l3_notifier, 'router_added_to_agent') as notification_mock:
notification_mock.side_effect = [
oslo_messaging.MessagingTimeout, None]
self._take_down_agent_and_run_reschedule(L3_HOSTA)
self.assertEqual(
2, l3_notifier.router_added_to_agent.call_count)
# make sure router was rescheduled even when first attempt
# failed to notify l3 agent
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])['agents']
self.assertEqual(1, len(l3_agents))
self.assertEqual(L3_HOSTB, l3_agents[0]['host'])
ctxt_mock = mock.MagicMock()
call_mock = mock.MagicMock(
side_effect=[oslo_messaging.MessagingTimeout, None])
ctxt_mock.call = call_mock
self.client_mock.prepare = mock.MagicMock(return_value=ctxt_mock)
self._take_down_agent_and_run_reschedule(L3_HOSTA)
self.assertEqual(2, call_mock.call_count)
# make sure router was rescheduled even when first attempt
# failed to notify l3 agent
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])['agents']
self.assertEqual(1, len(l3_agents))
self.assertEqual(L3_HOSTB, l3_agents[0]['host'])
def test_router_reschedule_failed_notification_all_attempts(self):
l3_plugin = (manager.NeutronManager.get_service_plugins()
[service_constants.L3_ROUTER_NAT])
l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3]
l3_rpc_cb = l3_rpc.L3RpcCallback()
self._register_agent_states()
with self.router() as router:
# schedule the router to host A
l3_rpc_cb.get_router_ids(self.adminContext, host=L3_HOSTA)
with mock.patch.object(
l3_notifier, 'router_added_to_agent') as notification_mock:
notification_mock.side_effect = oslo_messaging.MessagingTimeout
self._take_down_agent_and_run_reschedule(L3_HOSTA)
self.assertEqual(
l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS,
l3_notifier.router_added_to_agent.call_count)
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])['agents']
self.assertEqual(0, len(l3_agents))
# mock client.prepare and context.call
ctxt_mock = mock.MagicMock()
call_mock = mock.MagicMock(
side_effect=oslo_messaging.MessagingTimeout)
ctxt_mock.call = call_mock
self.client_mock.prepare = mock.MagicMock(return_value=ctxt_mock)
# perform operations
self._take_down_agent_and_run_reschedule(L3_HOSTA)
self.assertEqual(
l3_rpc_agent_api.AGENT_NOTIFY_MAX_ATTEMPTS,
call_mock.call_count)
l3_agents = self._list_l3_agents_hosting_router(
router['router']['id'])['agents']
self.assertEqual(0, len(l3_agents))
def test_router_auto_schedule_with_invalid_router(self):
with self.router() as router: