diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index 90b515c2c..45fd81022 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -43,6 +43,9 @@ K8S_OBJ_KURYRNETWORKPOLICY = 'KuryrNetworkPolicy' K8S_OBJ_KURYRLOADBALANCER = 'KuryrLoadBalancer' K8S_OBJ_KURYRPORT = 'KuryrPort' +OPENSHIFT_OBJ_MACHINE = 'Machine' +OPENSHIFT_API_CRD_MACHINES = '/apis/machine.openshift.io/v1beta1/machines' + K8S_POD_STATUS_PENDING = 'Pending' K8S_POD_STATUS_SUCCEEDED = 'Succeeded' K8S_POD_STATUS_FAILED = 'Failed' diff --git a/kuryr_kubernetes/controller/drivers/node_subnets.py b/kuryr_kubernetes/controller/drivers/node_subnets.py index 09738f7cf..cbbcae44c 100644 --- a/kuryr_kubernetes/controller/drivers/node_subnets.py +++ b/kuryr_kubernetes/controller/drivers/node_subnets.py @@ -13,10 +13,16 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging +from kuryr_kubernetes import clients +from kuryr_kubernetes import constants from kuryr_kubernetes.controller.drivers import base +from kuryr_kubernetes import exceptions +from kuryr_kubernetes import utils + CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -41,3 +47,79 @@ class ConfigNodesSubnets(base.NodesSubnetsDriver): def delete_node(self, node): return False + + +class OpenShiftNodesSubnets(base.NodesSubnetsDriver): + """Provides list of nodes subnets based on OpenShift Machine objects.""" + + def __init__(self): + super().__init__() + self.subnets = set() + + def _get_subnet_from_machine(self, machine): + networks = machine['spec'].get( + 'providerSpec', {}).get('value', {}).get('networks') + if not networks: + LOG.warning('No `networks` in Machine `providerSpec`') + return None + + subnets = networks[0].get('subnets') + if not subnets: + LOG.warning('No `subnets` in Machine `providerSpec.values.' + 'networks`.') + return None + + primary_subnet = subnets[0] + if primary_subnet.get('uuid'): + return primary_subnet['uuid'] + else: + subnet_filter = primary_subnet['filter'] + subnet_id = utils.get_subnet_id(**subnet_filter) + return subnet_id + + def get_nodes_subnets(self, raise_on_empty=False): + with lockutils.lock('kuryr-machine-add'): + if not self.subnets and raise_on_empty: + raise exceptions.ResourceNotReady( + 'OpenShift Machines does not exist or are not yet ' + 'handled. Cannot determine worker nodes subnets.') + + return list(self.subnets) + + def add_node(self, machine): + subnet_id = self._get_subnet_from_machine(machine) + if not subnet_id: + LOG.warning('Could not determine subnet of Machine %s', + machine['metadata']['name']) + return False + + with lockutils.lock('kuryr-machine-add'): + if subnet_id not in self.subnets: + LOG.info('Adding subnet %s to the worker nodes subnets as ' + 'machine %s runs in it.', subnet_id, + machine['metadata']['name']) + self.subnets.add(subnet_id) + return True + return False + + def delete_node(self, machine): + k8s = clients.get_kubernetes_client() + affected_subnet_id = self._get_subnet_from_machine(machine) + if not affected_subnet_id: + LOG.warning('Could not determine subnet of Machine %s', + machine['metadata']['name']) + return False + + machines = k8s.get(constants.OPENSHIFT_API_CRD_MACHINES) + for existing_machine in machines.get('items', []): + if affected_subnet_id == self._get_subnet_from_machine( + existing_machine): + return False + + # We know that subnet is no longer used, so we remove it. + LOG.info('Removing subnet %s from the worker nodes subnets', + affected_subnet_id) + with lockutils.lock('kuryr-machine-add'): + self.subnets.remove(affected_subnet_id) + + return True diff --git a/kuryr_kubernetes/controller/handlers/machine.py b/kuryr_kubernetes/controller/handlers/machine.py new file mode 100644 index 000000000..fe6b7519a --- /dev/null +++ b/kuryr_kubernetes/controller/handlers/machine.py @@ -0,0 +1,67 @@ +# Copyright 2020 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import uuid + +from oslo_log import log as logging + +from kuryr_kubernetes import clients +from kuryr_kubernetes import constants +from kuryr_kubernetes.controller.drivers import base as drivers +from kuryr_kubernetes import exceptions +from kuryr_kubernetes.handlers import k8s_base + +LOG = logging.getLogger(__name__) + + +class MachineHandler(k8s_base.ResourceEventHandler): + """MachineHandler gathers info about OpenShift nodes needed by Kuryr. + + At the moment that's the subnets of all the worker nodes. + """ + OBJECT_KIND = constants.OPENSHIFT_OBJ_MACHINE + OBJECT_WATCH_PATH = constants.OPENSHIFT_API_CRD_MACHINES + + def __init__(self): + super(MachineHandler, self).__init__() + self.node_subnets_driver = drivers.NodesSubnetsDriver.get_instance() + + def _bump_nps(self): + """Bump NetworkPolicy objects to have the SG rules recalculated.""" + k8s = clients.get_kubernetes_client() + # NOTE(dulek): Listing KuryrNetworkPolicies instead of NetworkPolicies, + # as we only care about NPs already handled. + knps = k8s.get(constants.K8S_API_CRD_KURYRNETWORKPOLICIES) + for knp in knps.get('items', []): + try: + k8s.annotate( + knp['metadata']['annotations']['networkPolicyLink'], + {constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())}) + except exceptions.K8sResourceNotFound: + # Had to be deleted in the meanwhile. + pass + + def on_present(self, machine): + effect = self.node_subnets_driver.add_node(machine) + if effect: + # If the change was meaningful we need to make sure all the NPs + # are recalculated to get the new SG rules added. + self._bump_nps() + + def on_deleted(self, machine): + effect = self.node_subnets_driver.delete_node(machine) + if effect: + # If the change was meaningful we need to make sure all the NPs + # are recalculated to get the old SG rule deleted. + self._bump_nps() diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py index 7e65da0db..96f8e55be 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_node_subnets.py @@ -13,9 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. +from unittest import mock + from oslo_config import cfg from kuryr_kubernetes.controller.drivers import node_subnets +from kuryr_kubernetes import exceptions from kuryr_kubernetes.tests import base as test_base @@ -59,3 +62,137 @@ class TestConfigNodesSubnetsDriver(test_base.TestCase): def test_delete_node(self): driver = node_subnets.ConfigNodesSubnets() self.assertFalse(driver.delete_node('node')) + + +class TestOpenShiftNodesSubnetsDriver(test_base.TestCase): + def setUp(self): + super().setUp() + self.machine = { + "apiVersion": "machine.openshift.io/v1beta1", + "kind": "Machine", + "metadata": { + "name": "foo-tv22d-master-2", + "namespace": "openshift-machine-api", + }, + "spec": { + "metadata": {}, + "providerSpec": { + "value": { + "cloudName": "openstack", + "cloudsSecret": { + "name": "openstack-cloud-credentials", + "namespace": "openshift-machine-api" + }, + "kind": "OpenstackProviderSpec", + "networks": [ + { + "filter": {}, + "subnets": [{ + "filter": { + "name": "foo-tv22d-nodes", + "tags": "openshiftClusterID=foo-tv22d" + }} + ] + } + ], + } + } + }, + "status": {} + } + + def test_get_nodes_subnets(self): + subnets = ['subnet1', 'subnet2'] + driver = node_subnets.OpenShiftNodesSubnets() + for subnet in subnets: + driver.subnets.add(subnet) + self.assertCountEqual(subnets, driver.get_nodes_subnets()) + + def test_get_nodes_subnets_not_raise(self): + driver = node_subnets.OpenShiftNodesSubnets() + self.assertEqual([], driver.get_nodes_subnets()) + + def test_get_nodes_subnets_raise(self): + driver = node_subnets.OpenShiftNodesSubnets() + self.assertRaises(exceptions.ResourceNotReady, + driver.get_nodes_subnets, raise_on_empty=True) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + m_get_subnet_id.return_value = 'foobar' + self.assertTrue(driver.add_node(self.machine)) + m_get_subnet_id.assert_called_once_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual(['foobar'], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node_exists(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + m_get_subnet_id.return_value = 'foobar' + driver.subnets.add('foobar') + self.assertFalse(driver.add_node(self.machine)) + m_get_subnet_id.assert_called_once_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual(['foobar'], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node_uuid(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + net = self.machine['spec']['providerSpec']['value']['networks'][0] + del net['subnets'][0]['filter'] + net['subnets'][0]['uuid'] = 'barfoo' + self.assertTrue(driver.add_node(self.machine)) + m_get_subnet_id.assert_not_called() + self.assertEqual(['barfoo'], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + def test_add_node_cannot(self, m_get_subnet_id): + driver = node_subnets.OpenShiftNodesSubnets() + net = self.machine['spec']['providerSpec']['value']['networks'][0] + del net['subnets'] + self.assertFalse(driver.add_node(self.machine)) + m_get_subnet_id.assert_not_called() + self.assertEqual([], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_delete_node_cannot(self, m_get_k8s, m_get_subnet_id): + m_k8s = mock.Mock() + m_get_k8s.return_value = m_k8s + driver = node_subnets.OpenShiftNodesSubnets() + net = self.machine['spec']['providerSpec']['value']['networks'][0] + del net['subnets'] + self.assertFalse(driver.delete_node(self.machine)) + m_get_subnet_id.assert_not_called() + self.assertEqual([], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_delete_node(self, m_get_k8s, m_get_subnet_id): + m_k8s = mock.Mock() + m_get_k8s.return_value = m_k8s + m_k8s.get.return_value = {'items': []} + + driver = node_subnets.OpenShiftNodesSubnets() + driver.subnets.add('foobar') + m_get_subnet_id.return_value = 'foobar' + self.assertTrue(driver.delete_node(self.machine)) + m_get_subnet_id.assert_called_once_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual([], driver.get_nodes_subnets()) + + @mock.patch('kuryr_kubernetes.utils.get_subnet_id') + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_delete_node_still_exists(self, m_get_k8s, m_get_subnet_id): + m_k8s = mock.Mock() + m_get_k8s.return_value = m_k8s + m_k8s.get.return_value = {'items': [self.machine]} + + driver = node_subnets.OpenShiftNodesSubnets() + driver.subnets.add('foobar') + m_get_subnet_id.return_value = 'foobar' + self.assertFalse(driver.delete_node(self.machine)) + m_get_subnet_id.assert_called_with( + name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d') + self.assertEqual(['foobar'], driver.get_nodes_subnets()) diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py new file mode 100644 index 000000000..bb3a1aea5 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_machine.py @@ -0,0 +1,84 @@ +# Copyright 2020 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from kuryr_kubernetes import constants +from kuryr_kubernetes.controller.handlers import machine +from kuryr_kubernetes import exceptions +from kuryr_kubernetes.tests import base as test_base + + +class TestKuryrMachineHandler(test_base.TestCase): + @mock.patch( + 'kuryr_kubernetes.controller.drivers.base.NodesSubnetsDriver.' + 'get_instance') + def setUp(self, m_get_instance): + super(TestKuryrMachineHandler, self).setUp() + self.driver = mock.Mock() + m_get_instance.return_value = self.driver + self.handler = machine.MachineHandler() + + def test_on_present(self): + self.handler._bump_nps = mock.Mock() + self.driver.add_node.return_value = False + self.handler.on_present(mock.sentinel.machine) + self.driver.add_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_not_called() + + def test_on_present_new(self): + self.handler._bump_nps = mock.Mock() + self.driver.add_node.return_value = True + self.handler.on_present(mock.sentinel.machine) + self.driver.add_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_called_once() + + def test_on_deleted(self): + self.handler._bump_nps = mock.Mock() + self.driver.delete_node.return_value = False + self.handler.on_deleted(mock.sentinel.machine) + self.driver.delete_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_not_called() + + def test_on_deleted_gone(self): + self.handler._bump_nps = mock.Mock() + self.driver.delete_node.return_value = True + self.handler.on_deleted(mock.sentinel.machine) + self.driver.delete_node.assert_called_once_with(mock.sentinel.machine) + self.handler._bump_nps.assert_called_once() + + @mock.patch('kuryr_kubernetes.clients.get_kubernetes_client') + def test_bump_nps(self, get_client): + m_k8s = mock.Mock() + get_client.return_value = m_k8s + m_k8s.get.return_value = { + 'items': [ + {'metadata': {'annotations': { + 'networkPolicyLink': mock.sentinel.link1}}}, + {'metadata': {'annotations': { + 'networkPolicyLink': mock.sentinel.link2}}}, + {'metadata': {'annotations': { + 'networkPolicyLink': mock.sentinel.link3}}}, + ] + } + m_k8s.annotate.side_effect = ( + None, exceptions.K8sResourceNotFound('NP'), None) + self.handler._bump_nps() + m_k8s.get.assert_called_once_with( + constants.K8S_API_CRD_KURYRNETWORKPOLICIES) + m_k8s.annotate.assert_has_calls([ + mock.call(mock.sentinel.link1, mock.ANY), + mock.call(mock.sentinel.link2, mock.ANY), + mock.call(mock.sentinel.link3, mock.ANY), + ]) diff --git a/kuryr_kubernetes/tests/unit/test_utils.py b/kuryr_kubernetes/tests/unit/test_utils.py index 2bceedcec..878d3397e 100644 --- a/kuryr_kubernetes/tests/unit/test_utils.py +++ b/kuryr_kubernetes/tests/unit/test_utils.py @@ -440,3 +440,25 @@ class TestUtils(test_base.TestCase): res = {'metadata': {'selfLink': '/foo'}} self.assertEqual(utils.get_res_link(res), '/foo') + + @mock.patch('kuryr_kubernetes.clients.get_network_client') + def test_get_subnet_id(self, m_get_net): + m_net = mock.Mock() + m_get_net.return_value = m_net + subnets = (mock.Mock(id=mock.sentinel.subnet1), + mock.Mock(id=mock.sentinel.subnet2)) + m_net.subnets.return_value = iter(subnets) + filters = {'name': 'foo', 'tags': 'bar'} + sub = utils.get_subnet_id(**filters) + m_net.subnets.assert_called_with(**filters) + self.assertEqual(mock.sentinel.subnet1, sub) + + @mock.patch('kuryr_kubernetes.clients.get_network_client') + def test_get_subnet_not_found(self, m_get_net): + m_net = mock.Mock() + m_get_net.return_value = m_net + m_net.subnets.return_value = iter(()) + filters = {'name': 'foo', 'tags': 'bar'} + sub = utils.get_subnet_id(**filters) + m_net.subnets.assert_called_with(**filters) + self.assertIsNone(sub) diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index 23aae679c..00c0e73f2 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -82,7 +82,8 @@ RESOURCE_MAP = {'Endpoints': 'endpoints', 'NetworkPolicy': 'networkpolicies', 'Node': 'nodes', 'Pod': 'pods', - 'Service': 'services'} + 'Service': 'services', + 'Machine': 'machines'} API_RE = re.compile(r'v\d+') @@ -296,6 +297,16 @@ def get_subnet_cidr(subnet_id): return subnet_obj.cidr +def get_subnet_id(**filters): + os_net = clients.get_network_client() + subnets = os_net.subnets(**filters) + + try: + return next(subnets).id + except StopIteration: + return None + + @MEMOIZE def get_subnets_id_cidrs(subnet_ids): os_net = clients.get_network_client() diff --git a/setup.cfg b/setup.cfg index ee3e67243..1b3464e74 100644 --- a/setup.cfg +++ b/setup.cfg @@ -98,6 +98,7 @@ kuryr_kubernetes.controller.drivers.vif_pool = kuryr_kubernetes.controller.drivers.nodes_subnets = config = kuryr_kubernetes.controller.drivers.node_subnets:ConfigNodesSubnets + openshift = kuryr_kubernetes.controller.drivers.node_subnets:OpenShiftNodesSubnets kuryr_kubernetes.controller.handlers = vif = kuryr_kubernetes.controller.handlers.vif:VIFHandler @@ -112,6 +113,7 @@ kuryr_kubernetes.controller.handlers = kuryrnetwork_population = kuryr_kubernetes.controller.handlers.kuryrnetwork_population:KuryrNetworkPopulationHandler test_handler = kuryr_kubernetes.tests.unit.controller.handlers.test_fake_handler:TestHandler kuryrport = kuryr_kubernetes.controller.handlers.kuryrport:KuryrPortHandler + openshift_machine = kuryr_kubernetes.controller.handlers.machine:MachineHandler kuryr_kubernetes.controller.drivers.multi_vif = noop = kuryr_kubernetes.controller.drivers.multi_vif:NoopMultiVIFDriver