Merge "Add OpenShiftNodesSubnets driver and MachineHandler"

This commit is contained in:
Zuul 2021-01-26 13:21:38 +00:00 committed by Gerrit Code Review
commit 6f0f7bc0e6
8 changed files with 409 additions and 1 deletions

View File

@ -43,6 +43,9 @@ K8S_OBJ_KURYRNETWORKPOLICY = 'KuryrNetworkPolicy'
K8S_OBJ_KURYRLOADBALANCER = 'KuryrLoadBalancer'
K8S_OBJ_KURYRPORT = 'KuryrPort'
OPENSHIFT_OBJ_MACHINE = 'Machine'
OPENSHIFT_API_CRD_MACHINES = '/apis/machine.openshift.io/v1beta1/machines'
K8S_POD_STATUS_PENDING = 'Pending'
K8S_POD_STATUS_SUCCEEDED = 'Succeeded'
K8S_POD_STATUS_FAILED = 'Failed'

View File

@ -13,10 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -41,3 +47,79 @@ class ConfigNodesSubnets(base.NodesSubnetsDriver):
def delete_node(self, node):
return False
class OpenShiftNodesSubnets(base.NodesSubnetsDriver):
"""Provides list of nodes subnets based on OpenShift Machine objects."""
def __init__(self):
super().__init__()
self.subnets = set()
def _get_subnet_from_machine(self, machine):
networks = machine['spec'].get(
'providerSpec', {}).get('value', {}).get('networks')
if not networks:
LOG.warning('No `networks` in Machine `providerSpec`')
return None
subnets = networks[0].get('subnets')
if not subnets:
LOG.warning('No `subnets` in Machine `providerSpec.values.'
'networks`.')
return None
primary_subnet = subnets[0]
if primary_subnet.get('uuid'):
return primary_subnet['uuid']
else:
subnet_filter = primary_subnet['filter']
subnet_id = utils.get_subnet_id(**subnet_filter)
return subnet_id
def get_nodes_subnets(self, raise_on_empty=False):
with lockutils.lock('kuryr-machine-add'):
if not self.subnets and raise_on_empty:
raise exceptions.ResourceNotReady(
'OpenShift Machines does not exist or are not yet '
'handled. Cannot determine worker nodes subnets.')
return list(self.subnets)
def add_node(self, machine):
subnet_id = self._get_subnet_from_machine(machine)
if not subnet_id:
LOG.warning('Could not determine subnet of Machine %s',
machine['metadata']['name'])
return False
with lockutils.lock('kuryr-machine-add'):
if subnet_id not in self.subnets:
LOG.info('Adding subnet %s to the worker nodes subnets as '
'machine %s runs in it.', subnet_id,
machine['metadata']['name'])
self.subnets.add(subnet_id)
return True
return False
def delete_node(self, machine):
k8s = clients.get_kubernetes_client()
affected_subnet_id = self._get_subnet_from_machine(machine)
if not affected_subnet_id:
LOG.warning('Could not determine subnet of Machine %s',
machine['metadata']['name'])
return False
machines = k8s.get(constants.OPENSHIFT_API_CRD_MACHINES)
for existing_machine in machines.get('items', []):
if affected_subnet_id == self._get_subnet_from_machine(
existing_machine):
return False
# We know that subnet is no longer used, so we remove it.
LOG.info('Removing subnet %s from the worker nodes subnets',
affected_subnet_id)
with lockutils.lock('kuryr-machine-add'):
self.subnets.remove(affected_subnet_id)
return True

View File

@ -0,0 +1,67 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import k8s_base
LOG = logging.getLogger(__name__)
class MachineHandler(k8s_base.ResourceEventHandler):
"""MachineHandler gathers info about OpenShift nodes needed by Kuryr.
At the moment that's the subnets of all the worker nodes.
"""
OBJECT_KIND = constants.OPENSHIFT_OBJ_MACHINE
OBJECT_WATCH_PATH = constants.OPENSHIFT_API_CRD_MACHINES
def __init__(self):
super(MachineHandler, self).__init__()
self.node_subnets_driver = drivers.NodesSubnetsDriver.get_instance()
def _bump_nps(self):
"""Bump NetworkPolicy objects to have the SG rules recalculated."""
k8s = clients.get_kubernetes_client()
# NOTE(dulek): Listing KuryrNetworkPolicies instead of NetworkPolicies,
# as we only care about NPs already handled.
knps = k8s.get(constants.K8S_API_CRD_KURYRNETWORKPOLICIES)
for knp in knps.get('items', []):
try:
k8s.annotate(
knp['metadata']['annotations']['networkPolicyLink'],
{constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())})
except exceptions.K8sResourceNotFound:
# Had to be deleted in the meanwhile.
pass
def on_present(self, machine):
effect = self.node_subnets_driver.add_node(machine)
if effect:
# If the change was meaningful we need to make sure all the NPs
# are recalculated to get the new SG rules added.
self._bump_nps()
def on_deleted(self, machine):
effect = self.node_subnets_driver.delete_node(machine)
if effect:
# If the change was meaningful we need to make sure all the NPs
# are recalculated to get the old SG rule deleted.
self._bump_nps()

View File

@ -13,9 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_config import cfg
from kuryr_kubernetes.controller.drivers import node_subnets
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
@ -59,3 +62,137 @@ class TestConfigNodesSubnetsDriver(test_base.TestCase):
def test_delete_node(self):
driver = node_subnets.ConfigNodesSubnets()
self.assertFalse(driver.delete_node('node'))
class TestOpenShiftNodesSubnetsDriver(test_base.TestCase):
def setUp(self):
super().setUp()
self.machine = {
"apiVersion": "machine.openshift.io/v1beta1",
"kind": "Machine",
"metadata": {
"name": "foo-tv22d-master-2",
"namespace": "openshift-machine-api",
},
"spec": {
"metadata": {},
"providerSpec": {
"value": {
"cloudName": "openstack",
"cloudsSecret": {
"name": "openstack-cloud-credentials",
"namespace": "openshift-machine-api"
},
"kind": "OpenstackProviderSpec",
"networks": [
{
"filter": {},
"subnets": [{
"filter": {
"name": "foo-tv22d-nodes",
"tags": "openshiftClusterID=foo-tv22d"
}}
]
}
],
}
}
},
"status": {}
}
def test_get_nodes_subnets(self):
subnets = ['subnet1', 'subnet2']
driver = node_subnets.OpenShiftNodesSubnets()
for subnet in subnets:
driver.subnets.add(subnet)
self.assertCountEqual(subnets, driver.get_nodes_subnets())
def test_get_nodes_subnets_not_raise(self):
driver = node_subnets.OpenShiftNodesSubnets()
self.assertEqual([], driver.get_nodes_subnets())
def test_get_nodes_subnets_raise(self):
driver = node_subnets.OpenShiftNodesSubnets()
self.assertRaises(exceptions.ResourceNotReady,
driver.get_nodes_subnets, raise_on_empty=True)
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
m_get_subnet_id.return_value = 'foobar'
self.assertTrue(driver.add_node(self.machine))
m_get_subnet_id.assert_called_once_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual(['foobar'], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node_exists(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
m_get_subnet_id.return_value = 'foobar'
driver.subnets.add('foobar')
self.assertFalse(driver.add_node(self.machine))
m_get_subnet_id.assert_called_once_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual(['foobar'], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node_uuid(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
net = self.machine['spec']['providerSpec']['value']['networks'][0]
del net['subnets'][0]['filter']
net['subnets'][0]['uuid'] = 'barfoo'
self.assertTrue(driver.add_node(self.machine))
m_get_subnet_id.assert_not_called()
self.assertEqual(['barfoo'], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node_cannot(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
net = self.machine['spec']['providerSpec']['value']['networks'][0]
del net['subnets']
self.assertFalse(driver.add_node(self.machine))
m_get_subnet_id.assert_not_called()
self.assertEqual([], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_delete_node_cannot(self, m_get_k8s, m_get_subnet_id):
m_k8s = mock.Mock()
m_get_k8s.return_value = m_k8s
driver = node_subnets.OpenShiftNodesSubnets()
net = self.machine['spec']['providerSpec']['value']['networks'][0]
del net['subnets']
self.assertFalse(driver.delete_node(self.machine))
m_get_subnet_id.assert_not_called()
self.assertEqual([], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_delete_node(self, m_get_k8s, m_get_subnet_id):
m_k8s = mock.Mock()
m_get_k8s.return_value = m_k8s
m_k8s.get.return_value = {'items': []}
driver = node_subnets.OpenShiftNodesSubnets()
driver.subnets.add('foobar')
m_get_subnet_id.return_value = 'foobar'
self.assertTrue(driver.delete_node(self.machine))
m_get_subnet_id.assert_called_once_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual([], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_delete_node_still_exists(self, m_get_k8s, m_get_subnet_id):
m_k8s = mock.Mock()
m_get_k8s.return_value = m_k8s
m_k8s.get.return_value = {'items': [self.machine]}
driver = node_subnets.OpenShiftNodesSubnets()
driver.subnets.add('foobar')
m_get_subnet_id.return_value = 'foobar'
self.assertFalse(driver.delete_node(self.machine))
m_get_subnet_id.assert_called_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual(['foobar'], driver.get_nodes_subnets())

View File

@ -0,0 +1,84 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.handlers import machine
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
class TestKuryrMachineHandler(test_base.TestCase):
@mock.patch(
'kuryr_kubernetes.controller.drivers.base.NodesSubnetsDriver.'
'get_instance')
def setUp(self, m_get_instance):
super(TestKuryrMachineHandler, self).setUp()
self.driver = mock.Mock()
m_get_instance.return_value = self.driver
self.handler = machine.MachineHandler()
def test_on_present(self):
self.handler._bump_nps = mock.Mock()
self.driver.add_node.return_value = False
self.handler.on_present(mock.sentinel.machine)
self.driver.add_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_not_called()
def test_on_present_new(self):
self.handler._bump_nps = mock.Mock()
self.driver.add_node.return_value = True
self.handler.on_present(mock.sentinel.machine)
self.driver.add_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_called_once()
def test_on_deleted(self):
self.handler._bump_nps = mock.Mock()
self.driver.delete_node.return_value = False
self.handler.on_deleted(mock.sentinel.machine)
self.driver.delete_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_not_called()
def test_on_deleted_gone(self):
self.handler._bump_nps = mock.Mock()
self.driver.delete_node.return_value = True
self.handler.on_deleted(mock.sentinel.machine)
self.driver.delete_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_called_once()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_bump_nps(self, get_client):
m_k8s = mock.Mock()
get_client.return_value = m_k8s
m_k8s.get.return_value = {
'items': [
{'metadata': {'annotations': {
'networkPolicyLink': mock.sentinel.link1}}},
{'metadata': {'annotations': {
'networkPolicyLink': mock.sentinel.link2}}},
{'metadata': {'annotations': {
'networkPolicyLink': mock.sentinel.link3}}},
]
}
m_k8s.annotate.side_effect = (
None, exceptions.K8sResourceNotFound('NP'), None)
self.handler._bump_nps()
m_k8s.get.assert_called_once_with(
constants.K8S_API_CRD_KURYRNETWORKPOLICIES)
m_k8s.annotate.assert_has_calls([
mock.call(mock.sentinel.link1, mock.ANY),
mock.call(mock.sentinel.link2, mock.ANY),
mock.call(mock.sentinel.link3, mock.ANY),
])

View File

@ -449,3 +449,25 @@ class TestUtils(test_base.TestCase):
res = {'metadata': {'selfLink': '/foo'}}
self.assertEqual(utils.get_res_link(res), '/foo')
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test_get_subnet_id(self, m_get_net):
m_net = mock.Mock()
m_get_net.return_value = m_net
subnets = (mock.Mock(id=mock.sentinel.subnet1),
mock.Mock(id=mock.sentinel.subnet2))
m_net.subnets.return_value = iter(subnets)
filters = {'name': 'foo', 'tags': 'bar'}
sub = utils.get_subnet_id(**filters)
m_net.subnets.assert_called_with(**filters)
self.assertEqual(mock.sentinel.subnet1, sub)
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test_get_subnet_not_found(self, m_get_net):
m_net = mock.Mock()
m_get_net.return_value = m_net
m_net.subnets.return_value = iter(())
filters = {'name': 'foo', 'tags': 'bar'}
sub = utils.get_subnet_id(**filters)
m_net.subnets.assert_called_with(**filters)
self.assertIsNone(sub)

View File

@ -84,7 +84,8 @@ RESOURCE_MAP = {'Endpoints': 'endpoints',
'NetworkPolicy': 'networkpolicies',
'Node': 'nodes',
'Pod': 'pods',
'Service': 'services'}
'Service': 'services',
'Machine': 'machines'}
API_RE = re.compile(r'v\d+')
@ -298,6 +299,16 @@ def get_subnet_cidr(subnet_id):
return subnet_obj.cidr
def get_subnet_id(**filters):
os_net = clients.get_network_client()
subnets = os_net.subnets(**filters)
try:
return next(subnets).id
except StopIteration:
return None
@MEMOIZE
def get_subnets_id_cidrs(subnet_ids):
os_net = clients.get_network_client()

View File

@ -98,6 +98,7 @@ kuryr_kubernetes.controller.drivers.vif_pool =
kuryr_kubernetes.controller.drivers.nodes_subnets =
config = kuryr_kubernetes.controller.drivers.node_subnets:ConfigNodesSubnets
openshift = kuryr_kubernetes.controller.drivers.node_subnets:OpenShiftNodesSubnets
kuryr_kubernetes.controller.handlers =
vif = kuryr_kubernetes.controller.handlers.vif:VIFHandler
@ -112,6 +113,7 @@ kuryr_kubernetes.controller.handlers =
kuryrnetwork_population = kuryr_kubernetes.controller.handlers.kuryrnetwork_population:KuryrNetworkPopulationHandler
test_handler = kuryr_kubernetes.tests.unit.controller.handlers.test_fake_handler:TestHandler
kuryrport = kuryr_kubernetes.controller.handlers.kuryrport:KuryrPortHandler
openshift_machine = kuryr_kubernetes.controller.handlers.machine:MachineHandler
kuryr_kubernetes.controller.drivers.multi_vif =
noop = kuryr_kubernetes.controller.drivers.multi_vif:NoopMultiVIFDriver