diff --git a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py index 70ab9f39533..ee0544337df 100644 --- a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py @@ -20,6 +20,7 @@ from oslo_log import log as logging import oslo_messaging from neutron._i18n import _LE +from neutron.api.rpc.agentnotifiers import utils as ag_utils from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.common import utils @@ -29,6 +30,10 @@ from neutron.plugins.common import constants as service_constants LOG = logging.getLogger(__name__) +# default messaging timeout is 60 sec, so 2 here is chosen to not block API +# call for more than 2 minutes +AGENT_NOTIFY_MAX_ATTEMPTS = 2 + class L3AgentNotifyAPI(object): """API for plugin to notify L3 agent.""" @@ -44,7 +49,8 @@ class L3AgentNotifyAPI(object): '%(method)s', {'host': host, 'method': method}) cctxt = self.client.prepare(server=host) - rpc_method = cctxt.call if use_call else cctxt.cast + rpc_method = (ag_utils.retry(cctxt.call, AGENT_NOTIFY_MAX_ATTEMPTS) + if use_call else cctxt.cast) rpc_method(context, method, **kwargs) def _agent_notification(self, context, method, router_ids, operation, diff --git a/neutron/api/rpc/agentnotifiers/utils.py b/neutron/api/rpc/agentnotifiers/utils.py new file mode 100644 index 00000000000..a808120584d --- /dev/null +++ b/neutron/api/rpc/agentnotifiers/utils.py @@ -0,0 +1,62 @@ +# Copyright (c) 2016 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import oslo_messaging +from oslo_utils import excutils + +from neutron._i18n import _LW + +LOG = logging.getLogger(__name__) + + +def _call_with_retry(max_attempts): + """A wrapper to retry function using rpc call in case of + MessagingException. + + Retries the decorated function in case of MessagingException of some kind + (a timeout, client send error etc). + If maximum attempts exceeded, the exception which occurred during last + attempt is reraised. + """ + def wrapper(f): + def func_wrapper(*args, **kwargs): + # (ivasilevskaya) think of a more informative data to log + action = '%(func)s' % {'func': getattr(f, '__name__', f)} + for attempt in range(1, max_attempts + 1): + try: + return f(*args, **kwargs) + except oslo_messaging.MessagingException: + with excutils.save_and_reraise_exception( + reraise=False) as ctxt: + LOG.warning( + _LW('Failed to execute %(action)s. %(attempt)d out' + ' of %(max_attempts)d'), + {'attempt': attempt, + 'max_attempts': max_attempts, + 'action': action}) + if attempt == max_attempts: + ctxt.reraise = True + return func_wrapper + return wrapper + + +def retry(func, max_attempts): + """Adds the retry logic to original function and returns a partial. + + The returned partial can be called with the same arguments as the original + function. 
+ """ + return _call_with_retry(max_attempts)(func) diff --git a/neutron/db/l3_agentschedulers_db.py b/neutron/db/l3_agentschedulers_db.py index b0516072ee1..01b154cd4a2 100644 --- a/neutron/db/l3_agentschedulers_db.py +++ b/neutron/db/l3_agentschedulers_db.py @@ -57,10 +57,6 @@ L3_AGENTS_SCHEDULER_OPTS = [ cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS) -# default messaging timeout is 60 sec, so 2 here is chosen to not block API -# call for more than 2 minutes -AGENT_NOTIFY_MAX_ATTEMPTS = 2 - class RouterL3AgentBinding(model_base.BASEV2): """Represents binding between neutron routers and L3 agents.""" @@ -314,19 +310,10 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, context, router_id, host) for agent in new_agents: - # Need to make sure agents are notified or unschedule otherwise - for attempt in range(AGENT_NOTIFY_MAX_ATTEMPTS): - try: - l3_notifier.router_added_to_agent( - context, [router_id], agent['host']) - break - except oslo_messaging.MessagingException: - LOG.warning(_LW('Failed to notify L3 agent on host ' - '%(host)s about added router. 
Attempt ' - '%(attempt)d out of %(max_attempts)d'), - {'host': agent['host'], 'attempt': attempt + 1, - 'max_attempts': AGENT_NOTIFY_MAX_ATTEMPTS}) - else: + try: + l3_notifier.router_added_to_agent( + context, [router_id], agent['host']) + except oslo_messaging.MessagingException: self._unbind_router(context, router_id, agent['id']) raise l3agentscheduler.RouterReschedulingFailed( router_id=router_id) diff --git a/neutron/tests/unit/db/test_agentschedulers_db.py b/neutron/tests/unit/db/test_agentschedulers_db.py index 692e1b96de1..7049fd93ae4 100644 --- a/neutron/tests/unit/db/test_agentschedulers_db.py +++ b/neutron/tests/unit/db/test_agentschedulers_db.py @@ -25,6 +25,7 @@ from webob import exc from neutron.api import extensions from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api +from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.rpc.handlers import dhcp_rpc from neutron.api.rpc.handlers import l3_rpc from neutron.api.v2 import attributes @@ -234,7 +235,11 @@ class OvsAgentSchedulerTestCaseBase(test_l3.L3NatTestCaseMixin, service_plugins = {'l3_plugin_name': self.l3_plugin} else: service_plugins = None - mock.patch('neutron.common.rpc.get_client').start() + # NOTE(ivasilevskaya) mocking this way allows some control over mocked + # client like further method mocking with asserting calls + self.client_mock = mock.MagicMock(name="mocked client") + mock.patch('neutron.common.rpc.get_client' + ).start().return_value = self.client_mock super(OvsAgentSchedulerTestCaseBase, self).setUp( self.plugin_str, service_plugins=service_plugins) mock.patch.object( @@ -789,47 +794,45 @@ class OvsAgentSchedulerTestCase(OvsAgentSchedulerTestCaseBase): self.assertFalse(ret_b) def test_router_reschedule_succeeded_after_failed_notification(self): - l3_plugin = (manager.NeutronManager.get_service_plugins() - [service_constants.L3_ROUTER_NAT]) - l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] l3_rpc_cb = l3_rpc.L3RpcCallback() 
self._register_agent_states() with self.router() as router: # schedule the router to host A l3_rpc_cb.get_router_ids(self.adminContext, host=L3_HOSTA) - with mock.patch.object( - l3_notifier, 'router_added_to_agent') as notification_mock: - notification_mock.side_effect = [ - oslo_messaging.MessagingTimeout, None] - self._take_down_agent_and_run_reschedule(L3_HOSTA) - self.assertEqual( - 2, l3_notifier.router_added_to_agent.call_count) - # make sure router was rescheduled even when first attempt - # failed to notify l3 agent - l3_agents = self._list_l3_agents_hosting_router( - router['router']['id'])['agents'] - self.assertEqual(1, len(l3_agents)) - self.assertEqual(L3_HOSTB, l3_agents[0]['host']) + ctxt_mock = mock.MagicMock() + call_mock = mock.MagicMock( + side_effect=[oslo_messaging.MessagingTimeout, None]) + ctxt_mock.call = call_mock + self.client_mock.prepare = mock.MagicMock(return_value=ctxt_mock) + self._take_down_agent_and_run_reschedule(L3_HOSTA) + self.assertEqual(2, call_mock.call_count) + # make sure router was rescheduled even when first attempt + # failed to notify l3 agent + l3_agents = self._list_l3_agents_hosting_router( + router['router']['id'])['agents'] + self.assertEqual(1, len(l3_agents)) + self.assertEqual(L3_HOSTB, l3_agents[0]['host']) def test_router_reschedule_failed_notification_all_attempts(self): - l3_plugin = (manager.NeutronManager.get_service_plugins() - [service_constants.L3_ROUTER_NAT]) - l3_notifier = l3_plugin.agent_notifiers[constants.AGENT_TYPE_L3] l3_rpc_cb = l3_rpc.L3RpcCallback() self._register_agent_states() with self.router() as router: # schedule the router to host A l3_rpc_cb.get_router_ids(self.adminContext, host=L3_HOSTA) - with mock.patch.object( - l3_notifier, 'router_added_to_agent') as notification_mock: - notification_mock.side_effect = oslo_messaging.MessagingTimeout - self._take_down_agent_and_run_reschedule(L3_HOSTA) - self.assertEqual( - l3_agentschedulers_db.AGENT_NOTIFY_MAX_ATTEMPTS, - 
l3_notifier.router_added_to_agent.call_count) - l3_agents = self._list_l3_agents_hosting_router( - router['router']['id'])['agents'] - self.assertEqual(0, len(l3_agents)) + # mock client.prepare and context.call + ctxt_mock = mock.MagicMock() + call_mock = mock.MagicMock( + side_effect=oslo_messaging.MessagingTimeout) + ctxt_mock.call = call_mock + self.client_mock.prepare = mock.MagicMock(return_value=ctxt_mock) + # perform operations + self._take_down_agent_and_run_reschedule(L3_HOSTA) + self.assertEqual( + l3_rpc_agent_api.AGENT_NOTIFY_MAX_ATTEMPTS, + call_mock.call_count) + l3_agents = self._list_l3_agents_hosting_router( + router['router']['id'])['agents'] + self.assertEqual(0, len(l3_agents)) def test_router_auto_schedule_with_invalid_router(self): with self.router() as router: