From e95ed536d6d8f95bfa6fa798b71ceb9af5d0ff35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dulko?= Date: Wed, 23 Dec 2020 17:42:01 +0100 Subject: [PATCH] Add OpenShiftNodesSubnets driver and MachineHandler In order to support OpenShift's ability to run its nodes in various OpenStack subnets in a dynamic way, this commit introduces the OpenShiftNodesSubnets and MachineHandler. The idea is that MachineHandler is responsible for watching the OpenShift Machine objects and calling the driver. The driver will then save and serve a list of current worker nodes subnets. Change-Id: Iae3a5d011abaeab4aa97d6aa7153227c6f85b93c --- kuryr_kubernetes/constants.py | 3 + .../controller/drivers/node_subnets.py | 82 +++++++++++ .../controller/handlers/machine.py | 67 +++++++++ .../controller/drivers/test_node_subnets.py | 137 ++++++++++++++++++ .../unit/controller/handlers/test_machine.py | 84 +++++++++++ kuryr_kubernetes/tests/unit/test_utils.py | 22 +++ kuryr_kubernetes/utils.py | 13 +- setup.cfg | 2 + 8 files changed, 409 insertions(+), 1 deletion(-) create mode 100644 kuryr_kubernetes/controller/handlers/machine.py create mode 100644 kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index 90b515c2c..45fd81022 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -43,6 +43,9 @@ K8S_OBJ_KURYRNETWORKPOLICY = 'KuryrNetworkPolicy' K8S_OBJ_KURYRLOADBALANCER = 'KuryrLoadBalancer' K8S_OBJ_KURYRPORT = 'KuryrPort' +OPENSHIFT_OBJ_MACHINE = 'Machine' +OPENSHIFT_API_CRD_MACHINES = '/apis/machine.openshift.io/v1beta1/machines' + K8S_POD_STATUS_PENDING = 'Pending' K8S_POD_STATUS_SUCCEEDED = 'Succeeded' K8S_POD_STATUS_FAILED = 'Failed' diff --git a/kuryr_kubernetes/controller/drivers/node_subnets.py b/kuryr_kubernetes/controller/drivers/node_subnets.py index 09738f7cf..cbbcae44c 100644 --- a/kuryr_kubernetes/controller/drivers/node_subnets.py +++ b/kuryr_kubernetes/controller/drivers/node_subnets.py @@ -13,10 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging +from kuryr_kubernetes import clients +from kuryr_kubernetes import constants from kuryr_kubernetes.controller.drivers import base +from kuryr_kubernetes import exceptions +from kuryr_kubernetes import utils + CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -41,3 +47,79 @@ class ConfigNodesSubnets(base.NodesSubnetsDriver): def delete_node(self, node): return False + + +class OpenShiftNodesSubnets(base.NodesSubnetsDriver): + """Provides list of nodes subnets based on OpenShift Machine objects.""" + + def __init__(self): + super().__init__() + self.subnets = set() + + def _get_subnet_from_machine(self, machine): + networks = machine['spec'].get( + 'providerSpec', {}).get('value', {}).get('networks') + if not networks: + LOG.warning('No `networks` in Machine `providerSpec`') + return None + + subnets = networks[0].get('subnets') + if not subnets: + LOG.warning('No `subnets` in Machine `providerSpec.values.' + 'networks`.') + return None + + primary_subnet = subnets[0] + if primary_subnet.get('uuid'): + return primary_subnet['uuid'] + else: + subnet_filter = primary_subnet['filter'] + subnet_id = utils.get_subnet_id(**subnet_filter) + return subnet_id + + def get_nodes_subnets(self, raise_on_empty=False): + with lockutils.lock('kuryr-machine-add'): + if not self.subnets and raise_on_empty: + raise exceptions.ResourceNotReady( + 'OpenShift Machines does not exist or are not yet ' + 'handled. Cannot determine worker nodes subnets.') + + return list(self.subnets) + + def add_node(self, machine): + subnet_id = self._get_subnet_from_machine(machine) + if not subnet_id: + LOG.warning('Could not determine subnet of Machine %s', + machine['metadata']['name']) + return False + + with lockutils.lock('kuryr-machine-add'): + if subnet_id not in self.subnets: + LOG.info('Adding subnet %s to the worker nodes subnets as ' + 'machine %s runs in it.', subnet_id, + machine['metadata']['name']) + self.subnets.add(subnet_id) + return True + return False + + def delete_node(self, machine): + k8s = clients.get_kubernetes_client() + affected_subnet_id = self._get_subnet_from_machine(machine) + if not affected_subnet_id: + LOG.warning('Could not determine subnet of Machine %s', + machine['metadata']['name']) + return False + + machines = k8s.get(constants.OPENSHIFT_API_CRD_MACHINES) + for existing_machine in machines.get('items', []): + if affected_subnet_id == self._get_subnet_from_machine( + existing_machine): + return False + + # We know that subnet is no longer used, so we remove it. + LOG.info('Removing subnet %s from the worker nodes subnets', + affected_subnet_id) + with lockutils.lock('kuryr-machine-add'): + self.subnets.remove(affected_subnet_id) + + return True diff --git a/kuryr_kubernetes/controller/handlers/machine.py b/kuryr_kubernetes/controller/handlers/machine.py new file mode 100644 index 000000000..fe6b7519a --- /dev/null +++ b/kuryr_kubernetes/controller/handlers/machine.py @@ -0,0 +1,67 @@ +# Copyright 2020 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +from oslo_log import log as logging + +from kuryr_kubernetes import clients +from kuryr_kubernetes import constants +from kuryr_kubernetes.controller.drivers import base as drivers +from kuryr_kubernetes import exceptions +from kuryr_kubernetes.handlers import k8s_base + +LOG = logging.getLogger(__name__) + + +class MachineHandler(k8s_base.ResourceEventHandler): + """MachineHandler gathers info about OpenShift nodes needed by Kuryr. + + At the moment that's the subnets of all the worker nodes. + """ + OBJECT_KIND = constants.OPENSHIFT_OBJ_MACHINE + OBJECT_WATCH_PATH = constants.OPENSHIFT_API_CRD_MACHINES + + def __init__(self): + super(MachineHandler, self).__init__() + self.node_subnets_driver = drivers.NodesSubnetsDriver.get_instance() + + def _bump_nps(self): + """Bump NetworkPolicy objects to have the SG rules recalculated.""" + k8s = clients.get_kubernetes_client() + # NOTE(dulek): Listing KuryrNetworkPolicies instead of NetworkPolicies, + # as we only care about NPs already handled. + knps = k8s.get(constants.K8S_API_CRD_KURYRNETWORKPOLICIES) + for knp in knps.get('items', []): + try: + k8s.annotate( + knp['metadata']['annotations']['networkPolicyLink'], + {constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())}) + except exceptions.K8sResourceNotFound: + # Had to be deleted in the meanwhile. + pass + + def on_present(self, machine): + effect = self.node_subnets_driver.add_node(machine) + if effect: + # If the change was meaningful we need to make sure all the NPs + # are recalculated to get the new SG rules added. + self._bump_nps() + + def on_deleted(self, machine): + effect = self.node_subnets_driver.delete_node(machine) + if effect: + # If the change was meaningful we need to make sure all the NPs + # are recalculated to get the old SG rule deleted. + self._bump_nps() diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py index 7e65da0db..96f8e55be 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py @@ -13,9 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. +from unittest import mock + from oslo_config import cfg from kuryr_kubernetes.controller.drivers import node_subnets +from kuryr_kubernetes import exceptions from kuryr_kubernetes.tests import base as test_base @@ -59,3 +62,137 @@ class TestConfigNodesSubnetsDriver(test_base.TestCase): def test_delete_node(self): driver = node_subnets.ConfigNodesSubnets() self.assertFalse(driver.delete_node('node')) + + +class TestOpenShiftNodesSubnetsDriver(test_base.TestCase): + def setUp(self): + super().setUp() + self.machine = { + "apiVersion": "machine.openshift.io/v1beta1", + "kind": "Machine", + "metadata": { + "name": "foo-tv22d-master-2", + "namespace": "openshift-machine-api", + }, + "spec": { + "metadata": {}, + "providerSpec": { + "value": { + "cloudName": "openstack", + "cloudsSecret": { + "name": "openstack-cloud-credentials", + "namespace": "openshift-machine-api" + }, + "kind": "OpenstackProviderSpec", + "networks": [ + { + "filter": {}, + "subnets": [{ + "filter": { + "name": "foo-tv22d-nodes", + "tags": "openshiftClusterID=foo-tv22d" + }} + ] + } + ], + } + } + }, + "status": {} + } + + def test_get_nodes_subnets(self): + subnets = ['subnet1', 'subnet2'] + driver = node_subnets.OpenShiftNodesSubnets() + for subnet in subnets: + driver.subnets.add(subnet) + self.assertCountEqual(subnets, driver.get_nodes_subnets()) + + def test_get_nodes_subnets_not_raise(self): + driver = node_subnets.OpenShiftNodesSubnets() + self.assertEqual([], driver.get_nodes_subnets()) + + def test_get_nodes_subnets_raise(self): + driver = node_subnets.OpenShiftNodesSubnets() + self.assertRaises(exceptions.ResourceNotReady, + driver.get_nodes_subnets, raise_on_empty=True) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + m_get_subnet_id.return_value = 'foobar' + self.assertTrue(driver.add_node(self.machine)) + m_get_subnet_id.assert_called_once_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual(['foobar'], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node_exists(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + m_get_subnet_id.return_value = 'foobar' + driver.subnets.add('foobar') + self.assertFalse(driver.add_node(self.machine)) + m_get_subnet_id.assert_called_once_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual(['foobar'], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node_uuid(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + net = self.machine['spec']['providerSpec']['value']['networks'][0] + del net['subnets'][0]['filter'] + net['subnets'][0]['uuid'] = 'barfoo' + self.assertTrue(driver.add_node(self.machine)) + m_get_subnet_id.assert_not_called() + self.assertEqual(['barfoo'], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node_cannot(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + net = self.machine['spec']['providerSpec']['value']['networks'][0] + del net['subnets'] + self.assertFalse(driver.add_node(self.machine)) + m_get_subnet_id.assert_not_called() + self.assertEqual([], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_delete_node_cannot(self, m_get_k8s, m_get_subnet_id): + m_k8s = mock.Mock() + m_get_k8s.return_value = m_k8s + driver = node_subnets.OpenShiftNodesSubnets() + net = self.machine['spec']['providerSpec']['value']['networks'][0] + del net['subnets'] + self.assertFalse(driver.delete_node(self.machine)) + m_get_subnet_id.assert_not_called() + self.assertEqual([], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_delete_node(self, m_get_k8s, m_get_subnet_id): + m_k8s = mock.Mock() + m_get_k8s.return_value = m_k8s + m_k8s.get.return_value = {'items': []} + + driver = node_subnets.OpenShiftNodesSubnets() + driver.subnets.add('foobar') + m_get_subnet_id.return_value = 'foobar' + self.assertTrue(driver.delete_node(self.machine)) + m_get_subnet_id.assert_called_once_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual([], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_delete_node_still_exists(self, m_get_k8s, m_get_subnet_id): + m_k8s = mock.Mock() + m_get_k8s.return_value = m_k8s + m_k8s.get.return_value = {'items': [self.machine]} + + driver = node_subnets.OpenShiftNodesSubnets() + driver.subnets.add('foobar') + m_get_subnet_id.return_value = 'foobar' + self.assertFalse(driver.delete_node(self.machine)) + m_get_subnet_id.assert_called_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual(['foobar'], driver.get_nodes_subnets()) diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py new file mode 100644 index 000000000..bb3a1aea5 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py @@ -0,0 +1,84 @@ +# Copyright 2020 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from kuryr_kubernetes import constants +from kuryr_kubernetes.controller.handlers import machine +from kuryr_kubernetes import exceptions +from kuryr_kubernetes.tests import base as test_base + + +class TestKuryrMachineHandler(test_base.TestCase): + @mock.patch( + 'kuryr_kubernetes.controller.drivers.base.NodesSubnetsDriver.' + 'get_instance') + def setUp(self, m_get_instance): + super(TestKuryrMachineHandler, self).setUp() + self.driver = mock.Mock() + m_get_instance.return_value = self.driver + self.handler = machine.MachineHandler() + + def test_on_present(self): + self.handler._bump_nps = mock.Mock() + self.driver.add_node.return_value = False + self.handler.on_present(mock.sentinel.machine) + self.driver.add_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_not_called() + + def test_on_present_new(self): + self.handler._bump_nps = mock.Mock() + self.driver.add_node.return_value = True + self.handler.on_present(mock.sentinel.machine) + self.driver.add_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_called_once() + + def test_on_deleted(self): + self.handler._bump_nps = mock.Mock() + self.driver.delete_node.return_value = False + self.handler.on_deleted(mock.sentinel.machine) + self.driver.delete_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_not_called() + + def test_on_deleted_gone(self): + self.handler._bump_nps = mock.Mock() + self.driver.delete_node.return_value = True + self.handler.on_deleted(mock.sentinel.machine) + self.driver.delete_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_called_once() + + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_bump_nps(self, get_client): + m_k8s = mock.Mock() + get_client.return_value = m_k8s + m_k8s.get.return_value = { + 'items': [ + {'metadata': {'annotations': { + 'networkPolicyLink': mock.sentinel.link1}}}, + {'metadata': {'annotations': { + 'networkPolicyLink': mock.sentinel.link2}}}, + {'metadata': {'annotations': { + 'networkPolicyLink': mock.sentinel.link3}}}, + ] + } + m_k8s.annotate.side_effect = ( + None, exceptions.K8sResourceNotFound('NP'), None) + self.handler._bump_nps() + m_k8s.get.assert_called_once_with( + constants.K8S_API_CRD_KURYRNETWORKPOLICIES) + m_k8s.annotate.assert_has_calls([ + mock.call(mock.sentinel.link1, mock.ANY), + mock.call(mock.sentinel.link2, mock.ANY), + mock.call(mock.sentinel.link3, mock.ANY), + ]) diff --git a/kuryr_kubernetes/tests/unit/test_utils.py b/kuryr_kubernetes/tests/unit/test_utils.py index 2bceedcec..878d3397e 100644 --- a/kuryr_kubernetes/tests/unit/test_utils.py +++ b/kuryr_kubernetes/tests/unit/test_utils.py @@ -440,3 +440,25 @@ class TestUtils(test_base.TestCase): res = {'metadata': {'selfLink': '/foo'}} self.assertEqual(utils.get_res_link(res), '/foo') + + @mock.patch('kuryr_kubernetes.clients.get_network_client') + def test_get_subnet_id(self, m_get_net): + m_net = mock.Mock() + m_get_net.return_value = m_net + subnets = (mock.Mock(id=mock.sentinel.subnet1), + mock.Mock(id=mock.sentinel.subnet2)) + m_net.subnets.return_value = iter(subnets) + filters = {'name': 'foo', 'tags': 'bar'} + sub = utils.get_subnet_id(**filters) + m_net.subnets.assert_called_with(**filters) + self.assertEqual(mock.sentinel.subnet1, sub) + + @mock.patch('kuryr_kubernetes.clients.get_network_client') + def test_get_subnet_not_found(self, m_get_net): + m_net = mock.Mock() + m_get_net.return_value = m_net + m_net.subnets.return_value = iter(()) + filters = {'name': 'foo', 'tags': 'bar'} + sub = utils.get_subnet_id(**filters) + m_net.subnets.assert_called_with(**filters) + self.assertIsNone(sub) diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index 23aae679c..00c0e73f2 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -82,7 +82,8 @@ RESOURCE_MAP = {'Endpoints': 'endpoints', 'NetworkPolicy': 'networkpolicies', 'Node': 'nodes', 'Pod': 'pods', - 'Service': 'services'} + 'Service': 'services', + 'Machine': 'machines'} API_RE = re.compile(r'v\d+') @@ -296,6 +297,16 @@ def get_subnet_cidr(subnet_id): return subnet_obj.cidr +def get_subnet_id(**filters): + os_net = clients.get_network_client() + subnets = os_net.subnets(**filters) + + try: + return next(subnets).id + except StopIteration: + return None + + @MEMOIZE def get_subnets_id_cidrs(subnet_ids): os_net = clients.get_network_client() diff --git a/setup.cfg b/setup.cfg index ee3e67243..1b3464e74 100644 --- a/setup.cfg +++ b/setup.cfg @@ -98,6 +98,7 @@ kuryr_kubernetes.controller.drivers.vif_pool = kuryr_kubernetes.controller.drivers.nodes_subnets = config = kuryr_kubernetes.controller.drivers.node_subnets:ConfigNodesSubnets + openshift = kuryr_kubernetes.controller.drivers.node_subnets:OpenShiftNodesSubnets kuryr_kubernetes.controller.handlers = vif = kuryr_kubernetes.controller.handlers.vif:VIFHandler @@ -112,6 +113,7 @@ kuryr_kubernetes.controller.handlers = kuryrnetwork_population = kuryr_kubernetes.controller.handlers.kuryrnetwork_population:KuryrNetworkPopulationHandler test_handler = kuryr_kubernetes.tests.unit.controller.handlers.test_fake_handler:TestHandler kuryrport = kuryr_kubernetes.controller.handlers.kuryrport:KuryrPortHandler + openshift_machine = kuryr_kubernetes.controller.handlers.machine:MachineHandler kuryr_kubernetes.controller.drivers.multi_vif = noop = kuryr_kubernetes.controller.drivers.multi_vif:NoopMultiVIFDriver