diff --git a/kuryr_kubernetes/controller/drivers/lbaasv2.py b/kuryr_kubernetes/controller/drivers/lbaasv2.py index 4bc2ddb3b..84773d3ff 100644 --- a/kuryr_kubernetes/controller/drivers/lbaasv2.py +++ b/kuryr_kubernetes/controller/drivers/lbaasv2.py @@ -49,6 +49,9 @@ _OCTAVIA_DL_VERSION = 2, 13 _OCTAVIA_ACL_VERSION = 2, 12 _OCTAVIA_PROVIDER_VERSION = 2, 6 +# HTTP Codes raised by Octavia when a Resource already exists +OKAY_CODES = (409, 500) + class LBaaSv2Driver(base.LBaaSDriver): """LBaaSv2Driver implements LBaaSDriver for Neutron LBaaSv2 API.""" @@ -140,8 +143,9 @@ class LBaaSv2Driver(base.LBaaSDriver): 'security_groups': security_groups_ids, 'provider': provider } - response = self._ensure(self._create_loadbalancer, - self._find_loadbalancer, request) + + response = self._ensure_loadbalancer(request) + if not response: # NOTE(ivc): load balancer was present before 'create', but got # deleted externally between 'create' and 'find' @@ -346,7 +350,7 @@ class LBaaSv2Driver(base.LBaaSDriver): try: result = self._ensure_provisioned( loadbalancer, listener, self._create_listener, - self._find_listener, _LB_STS_POLL_SLOW_INTERVAL) + self._find_listener, interval=_LB_STS_POLL_SLOW_INTERVAL) except os_exc.SDKException: LOG.exception("Listener creation failed, most probably because " "protocol %(prot)s is not supported", @@ -425,6 +429,7 @@ class LBaaSv2Driver(base.LBaaSDriver): def ensure_member(self, loadbalancer, pool, subnet_id, ip, port, target_ref_namespace, target_ref_name, listener_port=None): + lbaas = clients.get_loadbalancer_client() name = ("%s/%s" % (target_ref_namespace, target_ref_name)) name += ":%s" % port member = { @@ -437,7 +442,8 @@ class LBaaSv2Driver(base.LBaaSDriver): } result = self._ensure_provisioned(loadbalancer, member, self._create_member, - self._find_member) + self._find_member, + update=lbaas.update_member) network_policy = ( 'policy' in CONF.kubernetes.enabled_handlers and @@ -643,9 +649,9 @@ class LBaaSv2Driver(base.LBaaSDriver): def _find_member(self, member, loadbalancer): lbaas = clients.get_loadbalancer_client() + member = dict(member) response = lbaas.members( member['pool_id'], - name=member['name'], project_id=member['project_id'], subnet_id=member['subnet_id'], address=member['ip'], @@ -654,6 +660,7 @@ class LBaaSv2Driver(base.LBaaSDriver): try: os_members = next(response) member['id'] = os_members.id + member['name'] = os_members.name if os_members.provisioning_status == 'ERROR': LOG.debug("Releasing Member %s", os_members.id) self.release_member(loadbalancer, member) @@ -663,29 +670,48 @@ class LBaaSv2Driver(base.LBaaSDriver): return member - def _ensure(self, create, find, *args): - okay_codes = (409, 500) + def _ensure(self, create, find, obj, loadbalancer, update=None): try: - result = create(args[0]) + result = create(obj) LOG.debug("Created %(obj)s", {'obj': result}) return result except os_exc.HttpException as e: - if e.status_code not in okay_codes: + if e.status_code not in OKAY_CODES: raise - result = find(*args) + result = find(obj, loadbalancer) + # NOTE(maysams): A conflict may happen when a member is + # a lefover and a new pod uses the same address. Let's + # attempt to udpate the member name if already existent. + if result and obj['name'] != result.get('name') and update: + update(result['id'], obj['pool_id'], name=obj['name']) + result['name'] = obj['name'] + if result: + LOG.debug("Found %(obj)s", {'obj': result}) + return result + + def _ensure_loadbalancer(self, loadbalancer): + try: + result = self._create_loadbalancer(loadbalancer) + LOG.debug("Created %(obj)s", {'obj': result}) + return result + except os_exc.HttpException as e: + if e.status_code not in OKAY_CODES: + raise + result = self._find_loadbalancer(loadbalancer) if result: LOG.debug("Found %(obj)s", {'obj': result}) return result def _ensure_provisioned(self, loadbalancer, obj, create, find, - interval=_LB_STS_POLL_FAST_INTERVAL): + interval=_LB_STS_POLL_FAST_INTERVAL, **kwargs): for remaining in self._provisioning_timer(_ACTIVATION_TIMEOUT, interval): if not self._wait_for_provisioning( loadbalancer, remaining, interval): return None try: - result = self._ensure(create, find, obj, loadbalancer) + result = self._ensure( + create, find, obj, loadbalancer, **kwargs) if result: return result except os_exc.BadRequestException: diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_lbaasv2.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_lbaasv2.py index 38260d7b3..a41ce20cf 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_lbaasv2.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_lbaasv2.py @@ -123,14 +123,13 @@ class TestLBaaSv2Driver(test_base.TestCase): sg_ids = ['foo', 'bar'] lb_name = 'just_a_name' - m_driver._ensure.return_value = expected_resp + m_driver._ensure_loadbalancer.return_value = expected_resp os_net.update_port = mock.Mock() resp = cls.ensure_loadbalancer(m_driver, lb_name, project_id, subnet_id, ip, sg_ids, 'ClusterIP') - m_driver._ensure.assert_called_once_with(m_driver._create_loadbalancer, - m_driver._find_loadbalancer, - mock.ANY) - req = m_driver._ensure.call_args[0][2] + m_driver._ensure_loadbalancer.assert_called_once_with( + mock.ANY) + req = m_driver._ensure_loadbalancer.call_args[0][0] self.assertEqual(lb_name, req['name']) self.assertEqual(project_id, req['project_id']) self.assertEqual(subnet_id, req['subnet_id']) @@ -148,7 +147,7 @@ class TestLBaaSv2Driver(test_base.TestCase): # TODO(ivc): handle security groups sg_ids = [] - m_driver._ensure.return_value = None + m_driver._ensure_loadbalancer.return_value = None self.assertRaises(k_exc.ResourceNotReady, cls.ensure_loadbalancer, m_driver, name, project_id, subnet_id, ip, sg_ids, 'ClusterIP') @@ -309,6 +308,7 @@ class TestLBaaSv2Driver(test_base.TestCase): pool['id']) def test_ensure_member(self): + lbaas = self.useFixture(k_fix.MockLBaaSClient()).client cls = d_lbaasv2.LBaaSv2Driver m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver) expected_resp = mock.sentinel.expected_resp @@ -335,7 +335,7 @@ class TestLBaaSv2Driver(test_base.TestCase): m_driver._ensure_provisioned.assert_called_once_with( loadbalancer, mock.ANY, m_driver._create_member, - m_driver._find_member) + m_driver._find_member, update=lbaas.update_member) member = m_driver._ensure_provisioned.call_args[0][1] self.assertEqual("%s/%s:%s" % (namespace, name, port), member['name']) self.assertEqual(pool['project_id'], member['project_id']) @@ -809,17 +809,17 @@ class TestLBaaSv2Driver(test_base.TestCase): 'port': 1234 } member_id = '3A70CEC0-392D-4BC1-A27C-06E63A0FD54F' - resp = iter([o_mem.Member(id=member_id)]) + resp = iter([o_mem.Member(id=member_id, name='TEST_NAME')]) lbaas.members.return_value = resp - ret = cls._find_member(m_driver, member, loadbalancer) lbaas.members.assert_called_once_with( member['pool_id'], - name=member['name'], project_id=member['project_id'], subnet_id=member['subnet_id'], address=member['ip'], protocol_port=member['port']) + # the member dict is copied, so the id is added to the return obj + member['id'] = member_id self.assertEqual(member, ret) self.assertEqual(member_id, ret['id']) @@ -842,7 +842,6 @@ class TestLBaaSv2Driver(test_base.TestCase): ret = cls._find_member(m_driver, member, loadbalancer) lbaas.members.assert_called_once_with( member['pool_id'], - name=member['name'], project_id=member['project_id'], subnet_id=member['subnet_id'], address=member['ip'], @@ -853,12 +852,14 @@ class TestLBaaSv2Driver(test_base.TestCase): cls = d_lbaasv2.LBaaSv2Driver m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver) obj = mock.Mock() + lb = mock.Mock() m_create = mock.Mock() m_find = mock.Mock() expected_result = mock.sentinel.expected_result m_create.return_value = expected_result - ret = cls._ensure(m_driver, m_create, m_find, obj) + ret = cls._ensure(m_driver, m_create, m_find, + obj, lb) m_create.assert_called_once_with(obj) self.assertEqual(expected_result, ret) @@ -866,15 +867,17 @@ class TestLBaaSv2Driver(test_base.TestCase): cls = d_lbaasv2.LBaaSv2Driver m_driver = mock.Mock(spec=d_lbaasv2.LBaaSv2Driver) obj = mock.Mock() + lb = mock.Mock() m_create = mock.Mock() m_find = mock.Mock() - expected_result = mock.sentinel.expected_result + expected_result = None m_create.side_effect = exception_value m_find.return_value = expected_result - ret = cls._ensure(m_driver, m_create, m_find, obj) + ret = cls._ensure(m_driver, m_create, m_find, + obj, lb) m_create.assert_called_once_with(obj) - m_find.assert_called_once_with(obj) + m_find.assert_called_once_with(obj, lb) self.assertEqual(expected_result, ret) def test_ensure_with_conflict(self):