Allow DB retries on controller_worker creates

Temporary workaround for commit ordering issues with the amphora driver,
until the driver can be rewritten against the real driver interface.

Story: 2002127
Task: 19809
Change-Id: Idfaca392b278a6efad36e51adaedc6c80372a006
This commit is contained in:
Adam Harwell 2018-05-29 23:58:36 -07:00
parent 190f682aac
commit 48e85569f7
4 changed files with 97 additions and 12 deletions

View File

@ -224,10 +224,6 @@ class GlanceNoTaggedImages(OctaviaException):
message = _("No Glance images are tagged with %(tag)s tag.")
class NoSuitableAmphoraException(OctaviaException):
message = _('Unable to allocate an amphora due to: %(msg)s')
# This is an internal use exception for the taskflow work flow
# and will not be exposed to the customer. This means it is a
# normal part of operation while waiting for compute to go active

View File

@ -17,7 +17,9 @@ import logging
from oslo_config import cfg
from oslo_utils import excutils
from sqlalchemy.orm import exc as db_exceptions
from taskflow.listeners import logging as tf_logging
import tenacity
from octavia.common import base_taskflow
from octavia.common import constants
@ -35,6 +37,11 @@ from octavia.db import repositories as repo
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
RETRY_ATTEMPTS = 15
RETRY_INITIAL_DELAY = 1
RETRY_BACKOFF = 1
RETRY_MAX = 5
class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
@ -109,15 +116,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
delete_amp_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_health_monitor(self, health_monitor_id):
"""Creates a health monitor.
:param pool_id: ID of the pool to create a health monitor on
:returns: None
:raises NoSuitablePool: Unable to find the node pool
:raises NoResultFound: Unable to find the object
"""
health_mon = self._health_mon_repo.get(db_apis.get_session(),
id=health_monitor_id)
if not health_mon:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'health_monitor', health_monitor_id)
raise db_exceptions.NoResultFound
pool = health_mon.pool
listeners = pool.listeners
@ -185,15 +201,25 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_hm_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_listener(self, listener_id):
"""Creates a listener.
:param listener_id: ID of the listener to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
listener = self._listener_repo.get(db_apis.get_session(),
id=listener_id)
if not listener:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'listener', listener_id)
raise db_exceptions.NoResultFound
load_balancer = listener.load_balancer
create_listener_tf = self._taskflow_load(self._listener_flows.
@ -251,6 +277,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
with tf_logging.DynamicLoggingListener(update_listener_tf, log=LOG):
update_listener_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_load_balancer(self, load_balancer_id):
"""Creates a load balancer by allocating Amphorae.
@ -260,8 +291,13 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
:param load_balancer_id: ID of the load balancer to create
:returns: None
:raises NoSuitableAmphoraException: Unable to allocate an Amphora.
:raises NoResultFound: Unable to find the object
"""
lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
if not lb:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'load_balancer', load_balancer_id)
raise db_exceptions.NoResultFound
store = {constants.LOADBALANCER_ID: load_balancer_id,
constants.BUILD_TYPE_PRIORITY:
@ -273,7 +309,6 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
constants.LOADBALANCER_TOPOLOGY: topology
}
lb = self._lb_repo.get(db_apis.get_session(), id=load_balancer_id)
create_lb_flow = self._lb_flows.get_create_load_balancer_flow(
topology=topology, listeners=lb.listeners)
@ -330,6 +365,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_lb_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_member(self, member_id):
"""Creates a pool member.
@ -339,6 +379,11 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
"""
member = self._member_repo.get(db_apis.get_session(),
id=member_id)
if not member:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'member', member_id)
raise db_exceptions.NoResultFound
pool = member.pool
listeners = pool.listeners
load_balancer = pool.load_balancer
@ -434,15 +479,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_member_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_pool(self, pool_id):
"""Creates a node pool.
:param pool_id: ID of the pool to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
pool = self._pool_repo.get(db_apis.get_session(),
id=pool_id)
if not pool:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'pool', pool_id)
raise db_exceptions.NoResultFound
listeners = pool.listeners
load_balancer = pool.load_balancer
@ -506,15 +560,24 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_pool_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_l7policy(self, l7policy_id):
"""Creates an L7 Policy.
:param l7policy_id: ID of the l7policy to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
l7policy = self._l7policy_repo.get(db_apis.get_session(),
id=l7policy_id)
if not l7policy:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7policy', l7policy_id)
raise db_exceptions.NoResultFound
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer
@ -574,15 +637,25 @@ class ControllerWorker(base_taskflow.BaseTaskFlowEngine):
log=LOG):
update_l7policy_tf.run()
@tenacity.retry(
retry=tenacity.retry_if_exception_type(db_exceptions.NoResultFound),
wait=tenacity.wait_incrementing(
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
def create_l7rule(self, l7rule_id):
"""Creates an L7 Rule.
:param l7rule_id: ID of the l7rule to create
:returns: None
:raises NoSuitableLB: Unable to find the load balancer
:raises NoResultFound: Unable to find the object
"""
l7rule = self._l7rule_repo.get(db_apis.get_session(),
id=l7rule_id)
if not l7rule:
LOG.warning('Failed to fetch %s %s from DB. Retrying for up to '
'60 seconds.', 'l7rule', l7rule_id)
raise db_exceptions.NoResultFound
l7policy = l7rule.l7policy
listeners = [l7policy.listener]
load_balancer = l7policy.listener.load_balancer

View File

@ -190,6 +190,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_health_mon_repo_get.side_effect = [None, _health_mon_mock]
cw = controller_worker.ControllerWorker()
cw.create_health_monitor(_health_mon_mock)
@ -206,6 +207,7 @@ class TestControllerWorker(base.TestCase):
_pool_mock}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_health_mon_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.'
'health_monitor_flows.HealthMonitorFlows.'
@ -300,6 +302,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_listener_repo_get.side_effect = [None, _listener_mock]
cw = controller_worker.ControllerWorker()
cw.create_listener(LB_ID)
@ -312,6 +315,7 @@ class TestControllerWorker(base.TestCase):
[_listener_mock]}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_listener_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.'
'listener_flows.ListenerFlows.get_delete_listener_flow',
@ -406,7 +410,9 @@ class TestControllerWorker(base.TestCase):
'update_dict': {'topology': constants.TOPOLOGY_SINGLE},
constants.BUILD_TYPE_PRIORITY: constants.LB_CREATE_NORMAL_PRIORITY
}
setattr(mock_lb_repo_get.return_value, 'listeners', [])
lb_mock = mock.MagicMock()
lb_mock.listeners = []
mock_lb_repo_get.side_effect = [None, None, None, lb_mock]
cw = controller_worker.ControllerWorker()
cw.create_load_balancer(LB_ID)
@ -416,6 +422,7 @@ class TestControllerWorker(base.TestCase):
mock_taskflow_load.assert_called_with(
mock_get_create_load_balancer_flow.return_value, store=store)
mock_eng.run.assert_any_call()
self.assertEqual(4, mock_lb_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.load_balancer_flows.'
'LoadBalancerFlows.get_create_load_balancer_flow',
@ -687,6 +694,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_member_repo_get.side_effect = [None, _member_mock]
cw = controller_worker.ControllerWorker()
cw.create_member(MEMBER_ID)
@ -702,6 +710,7 @@ class TestControllerWorker(base.TestCase):
_pool_mock}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_member_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.'
'member_flows.MemberFlows.get_delete_member_flow',
@ -823,6 +832,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_pool_repo_get.side_effect = [None, _pool_mock]
cw = controller_worker.ControllerWorker()
cw.create_pool(POOL_ID)
@ -836,6 +846,7 @@ class TestControllerWorker(base.TestCase):
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_pool_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.'
'pool_flows.PoolFlows.get_delete_pool_flow',
@ -921,6 +932,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_l7policy_repo_get.side_effect = [None, _l7policy_mock]
cw = controller_worker.ControllerWorker()
cw.create_l7policy(L7POLICY_ID)
@ -934,6 +946,7 @@ class TestControllerWorker(base.TestCase):
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_l7policy_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.'
'l7policy_flows.L7PolicyFlows.get_delete_l7policy_flow',
@ -1019,6 +1032,7 @@ class TestControllerWorker(base.TestCase):
mock_amp_repo_get):
_flow_mock.reset_mock()
mock_l7rule_repo_get.side_effect = [None, _l7rule_mock]
cw = controller_worker.ControllerWorker()
cw.create_l7rule(L7RULE_ID)
@ -1033,6 +1047,7 @@ class TestControllerWorker(base.TestCase):
_load_balancer_mock}))
_flow_mock.run.assert_called_once_with()
self.assertEqual(2, mock_l7rule_repo_get.call_count)
@mock.patch('octavia.controller.worker.flows.'
'l7rule_flows.L7RuleFlows.get_delete_l7rule_flow',

View File

@ -39,6 +39,7 @@ taskflow>=2.16.0 # Apache-2.0
diskimage-builder!=1.6.0,!=1.7.0,!=1.7.1,>=1.1.2 # Apache-2.0
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
castellan>=0.16.0 # Apache-2.0
tenacity # Apache-2.0
#for the amphora api
Flask!=0.11,<1.0,>=0.10 # BSD