From 48e85569f7e43e3dd5f09fd1ef4cb165526a92cd Mon Sep 17 00:00:00 2001 From: Adam Harwell Date: Tue, 29 May 2018 23:58:36 -0700 Subject: [PATCH] Allow DB retries on controller_worker creates Temporary workaround for commit ordering issues with the amphora driver, until the driver can be rewritten against the real driver interface. Story: 2002127 Task: 19809 Change-Id: Idfaca392b278a6efad36e51adaedc6c80372a006 --- octavia/common/exceptions.py | 4 - .../controller/worker/controller_worker.py | 87 +++++++++++++++++-- .../worker/test_controller_worker.py | 17 +++- requirements.txt | 1 + 4 files changed, 97 insertions(+), 12 deletions(-) diff --git a/octavia/common/exceptions.py b/octavia/common/exceptions.py index 24d8072ef3..e0b4cf96ca 100644 --- a/octavia/common/exceptions.py +++ b/octavia/common/exceptions.py @@ -224,10 +224,6 @@ class GlanceNoTaggedImages(OctaviaException): message = _("No Glance images are tagged with %(tag)s tag.") -class NoSuitableAmphoraException(OctaviaException): - message = _('Unable to allocate an amphora due to: %(msg)s') - - # This is an internal use exception for the taskflow work flow # and will not be exposed to the customer. This means it is a # normal part of operation while waiting for compute to go active diff --git a/octavia/controller/worker/controller_worker.py b/octavia/controller/worker/controller_worker.py index e4cd4b2e8b..d019bb49f9 100644 --- a/octavia/controller/worker/controller_worker.py +++ b/octavia/controller/worker/controller_worker.py @@ -17,7 +17,9 @@ import logging from oslo_config import cfg from oslo_utils import excutils +from sqlalchemy.orm import exc as db_exceptions from taskflow.listeners import logging as tf_logging +import tenacity from octavia.common import base_taskflow from octavia.common import constants @@ -35,6 +37,11 @@ from octavia.db import repositories as repo CONF = cfg.CONF LOG = logging.getLogger(__name__) +RETRY_ATTEMPTS = 15 +RETRY_INITIAL_DELAY = 1 +RETRY_BACKOFF = 1 +RETRY_MAX = 5 + class ControllerWorker(base_taskflow.BaseTaskFlowEngine): @@ -109,15 +116,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): delete_amp_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_health_monitor(self, health_monitor_id): """Creates a health monitor. :param pool_id: ID of the pool to create a health monitor on :returns: None - :raises NoSuitablePool: Unable to find the node pool + :raises NoResultFound: Unable to find the object """ health_mon = self._health_mon_repo.get(db_apis.get_session(), id=health_monitor_id) + if not health_mon: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'health_monitor', health_monitor_id) + raise db_exceptions.NoResultFound pool = health_mon.pool listeners = pool.listeners @@ -185,15 +201,25 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): update_hm_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_listener(self, listener_id): """Creates a listener. :param listener_id: ID of the listener to create :returns: None - :raises NoSuitableLB: Unable to find the load balancer + :raises NoResultFound: Unable to find the object """ listener = self._listener_repo.get(db_apis.get_session(), id=listener_id) + if not listener: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'listener', listener_id) + raise db_exceptions.NoResultFound + load_balancer = listener.load_balancer create_listener_tf = self._taskflow_load(self._listener_flows. @@ -251,6 +277,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG): update_listener_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_load_balancer(self, load_balancer_id): """Creates a load balancer by allocating Amphorae. @@ -260,8 +291,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): :param load_balancer_id: ID of the load balancer to create :returns: None - :raises NoSuitableAmphoraException: Unable to allocate an Amphora. + :raises NoResultFound: Unable to find the object """ + lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id) + if not lb: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'load_balancer', load_balancer_id) + raise db_exceptions.NoResultFound store = {constants.LOADBALANCER_ID: load_balancer_id, constants.BUILD_TYPE_PRIORITY: @@ -273,7 +309,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): constants.LOADBALANCER_TOPOLOGY: topology } - lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id) create_lb_flow = self._lb_flows.get_create_load_balancer_flow( topology=topology, listeners=lb.listeners) @@ -330,6 +365,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): update_lb_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_member(self, member_id): """Creates a pool member. @@ -339,6 +379,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): """ member = self._member_repo.get(db_apis.get_session(), id=member_id) + if not member: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'member', member_id) + raise db_exceptions.NoResultFound + pool = member.pool listeners = pool.listeners load_balancer = pool.load_balancer @@ -434,15 +479,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): update_member_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_pool(self, pool_id): """Creates a node pool. :param pool_id: ID of the pool to create :returns: None - :raises NoSuitableLB: Unable to find the load balancer + :raises NoResultFound: Unable to find the object """ pool = self._pool_repo.get(db_apis.get_session(), id=pool_id) + if not pool: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'pool', pool_id) + raise db_exceptions.NoResultFound listeners = pool.listeners load_balancer = pool.load_balancer @@ -506,15 +560,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): update_pool_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_l7policy(self, l7policy_id): """Creates an L7 Policy. :param l7policy_id: ID of the l7policy to create :returns: None - :raises NoSuitableLB: Unable to find the load balancer + :raises NoResultFound: Unable to find the object """ l7policy = self._l7policy_repo.get(db_apis.get_session(), id=l7policy_id) + if not l7policy: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'l7policy', l7policy_id) + raise db_exceptions.NoResultFound listeners = [l7policy.listener] load_balancer = l7policy.listener.load_balancer @@ -574,15 +637,25 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine): log=LOG): update_l7policy_tf.run() + @tenacity.retry( + retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def create_l7rule(self, l7rule_id): """Creates an L7 Rule. :param l7rule_id: ID of the l7rule to create :returns: None - :raises NoSuitableLB: Unable to find the load balancer + :raises NoResultFound: Unable to find the object """ l7rule = self._l7rule_repo.get(db_apis.get_session(), id=l7rule_id) + if not l7rule: + LOG.warning('Failed to fetch %s %s from DB. Retrying for up to ' + '60 seconds.', 'l7rule', l7rule_id) + raise db_exceptions.NoResultFound + l7policy = l7rule.l7policy listeners = [l7policy.listener] load_balancer = l7policy.listener.load_balancer diff --git a/octavia/tests/unit/controller/worker/test_controller_worker.py b/octavia/tests/unit/controller/worker/test_controller_worker.py index 432e16f92c..4d039f5230 100644 --- a/octavia/tests/unit/controller/worker/test_controller_worker.py +++ b/octavia/tests/unit/controller/worker/test_controller_worker.py @@ -190,6 +190,7 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() + mock_health_mon_repo_get.side_effect = [None, _health_mon_mock] cw = controller_worker.ControllerWorker() cw.create_health_monitor(_health_mon_mock) @@ -206,6 +207,7 @@ class TestControllerWorker(base.TestCase): _pool_mock})) _flow_mock.run.assert_called_once_with() + self.assertEqual(2, mock_health_mon_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.' 'health_monitor_flows.HealthMonitorFlows.' @@ -300,6 +302,7 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() + mock_listener_repo_get.side_effect = [None, _listener_mock] cw = controller_worker.ControllerWorker() cw.create_listener(LB_ID) @@ -312,6 +315,7 @@ class TestControllerWorker(base.TestCase): [_listener_mock]})) _flow_mock.run.assert_called_once_with() + self.assertEqual(2, mock_listener_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.' 'listener_flows.ListenerFlows.get_delete_listener_flow', @@ -406,7 +410,9 @@ class TestControllerWorker(base.TestCase): 'update_dict': {'topology': constants.TOPOLOGY_SINGLE}, constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY } - setattr(mock_lb_repo_get.return_value, 'listeners', []) + lb_mock = mock.MagicMock() + lb_mock.listeners = [] + mock_lb_repo_get.side_effect = [None, None, None, lb_mock] cw = controller_worker.ControllerWorker() cw.create_load_balancer(LB_ID) @@ -416,6 +422,7 @@ class TestControllerWorker(base.TestCase): mock_taskflow_load.assert_called_with( mock_get_create_load_balancer_flow.return_value, store=store) mock_eng.run.assert_any_call() + self.assertEqual(4, mock_lb_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.load_balancer_flows.' 'LoadBalancerFlows.get_create_load_balancer_flow', @@ -687,6 +694,7 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() + mock_member_repo_get.side_effect = [None, _member_mock] cw = controller_worker.ControllerWorker() cw.create_member(MEMBER_ID) @@ -702,6 +710,7 @@ class TestControllerWorker(base.TestCase): _pool_mock})) _flow_mock.run.assert_called_once_with() + self.assertEqual(2, mock_member_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.' 'member_flows.MemberFlows.get_delete_member_flow', @@ -823,6 +832,7 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() + mock_pool_repo_get.side_effect = [None, _pool_mock] cw = controller_worker.ControllerWorker() cw.create_pool(POOL_ID) @@ -836,6 +846,7 @@ class TestControllerWorker(base.TestCase): _load_balancer_mock})) _flow_mock.run.assert_called_once_with() + self.assertEqual(2, mock_pool_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.' 'pool_flows.PoolFlows.get_delete_pool_flow', @@ -921,6 +932,7 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() + mock_l7policy_repo_get.side_effect = [None, _l7policy_mock] cw = controller_worker.ControllerWorker() cw.create_l7policy(L7POLICY_ID) @@ -934,6 +946,7 @@ class TestControllerWorker(base.TestCase): _load_balancer_mock})) _flow_mock.run.assert_called_once_with() + self.assertEqual(2, mock_l7policy_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.' 'l7policy_flows.L7PolicyFlows.get_delete_l7policy_flow', @@ -1019,6 +1032,7 @@ class TestControllerWorker(base.TestCase): mock_amp_repo_get): _flow_mock.reset_mock() + mock_l7rule_repo_get.side_effect = [None, _l7rule_mock] cw = controller_worker.ControllerWorker() cw.create_l7rule(L7RULE_ID) @@ -1033,6 +1047,7 @@ class TestControllerWorker(base.TestCase): _load_balancer_mock})) _flow_mock.run.assert_called_once_with() + self.assertEqual(2, mock_l7rule_repo_get.call_count) @mock.patch('octavia.controller.worker.flows.' 'l7rule_flows.L7RuleFlows.get_delete_l7rule_flow', diff --git a/requirements.txt b/requirements.txt index 872195dadc..90670b0144 100644 --- a/requirements.txt +++ b/requirements.txt @@ -39,6 +39,7 @@ taskflow>=2.16.0 # Apache-2.0 diskimage-builder!=1.6.0,!=1.7.0,!=1.7.1,>=1.1.2 # Apache-2.0 futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD castellan>=0.16.0 # Apache-2.0 +tenacity # Apache-2.0 #for the amphora api Flask!=0.11,<1.0,>=0.10 # BSD