Add OpenShiftNodesSubnets driver and MachineHandler

In order to support OpenShift's ability to run its nodes in various
OpenStack subnets in a dynamic way, this commit introduces the
OpenShiftNodesSubnets and MachineHandler. The idea is that
MachineHandler is responsible for watching the OpenShift Machine objects
and calling the driver. The driver will then save and serve a list of
current worker nodes subnets.

Change-Id: Iae3a5d011abaeab4aa97d6aa7153227c6f85b93c
This commit is contained in:
Michał Dulko 2020-12-23 17:42:01 +01:00
parent abc39b0e68
commit e95ed536d6
8 changed files with 409 additions and 1 deletions

View File

@ -43,6 +43,9 @@ K8S_OBJ_KURYRNETWORKPOLICY = 'KuryrNetworkPolicy'
K8S_OBJ_KURYRLOADBALANCER = 'KuryrLoadBalancer'
K8S_OBJ_KURYRPORT = 'KuryrPort'
OPENSHIFT_OBJ_MACHINE = 'Machine'
OPENSHIFT_API_CRD_MACHINES = '/apis/machine.openshift.io/v1beta1/machines'
K8S_POD_STATUS_PENDING = 'Pending'
K8S_POD_STATUS_SUCCEEDED = 'Succeeded'
K8S_POD_STATUS_FAILED = 'Failed'

View File

@ -13,10 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base
from kuryr_kubernetes import exceptions
from kuryr_kubernetes import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -41,3 +47,79 @@ class ConfigNodesSubnets(base.NodesSubnetsDriver):
def delete_node(self, node):
return False
class OpenShiftNodesSubnets(base.NodesSubnetsDriver):
"""Provides list of nodes subnets based on OpenShift Machine objects."""
def __init__(self):
super().__init__()
self.subnets = set()
def _get_subnet_from_machine(self, machine):
networks = machine['spec'].get(
'providerSpec', {}).get('value', {}).get('networks')
if not networks:
LOG.warning('No `networks` in Machine `providerSpec`')
return None
subnets = networks[0].get('subnets')
if not subnets:
LOG.warning('No `subnets` in Machine `providerSpec.values.'
'networks`.')
return None
primary_subnet = subnets[0]
if primary_subnet.get('uuid'):
return primary_subnet['uuid']
else:
subnet_filter = primary_subnet['filter']
subnet_id = utils.get_subnet_id(**subnet_filter)
return subnet_id
def get_nodes_subnets(self, raise_on_empty=False):
with lockutils.lock('kuryr-machine-add'):
if not self.subnets and raise_on_empty:
raise exceptions.ResourceNotReady(
'OpenShift Machines does not exist or are not yet '
'handled. Cannot determine worker nodes subnets.')
return list(self.subnets)
def add_node(self, machine):
subnet_id = self._get_subnet_from_machine(machine)
if not subnet_id:
LOG.warning('Could not determine subnet of Machine %s',
machine['metadata']['name'])
return False
with lockutils.lock('kuryr-machine-add'):
if subnet_id not in self.subnets:
LOG.info('Adding subnet %s to the worker nodes subnets as '
'machine %s runs in it.', subnet_id,
machine['metadata']['name'])
self.subnets.add(subnet_id)
return True
return False
def delete_node(self, machine):
k8s = clients.get_kubernetes_client()
affected_subnet_id = self._get_subnet_from_machine(machine)
if not affected_subnet_id:
LOG.warning('Could not determine subnet of Machine %s',
machine['metadata']['name'])
return False
machines = k8s.get(constants.OPENSHIFT_API_CRD_MACHINES)
for existing_machine in machines.get('items', []):
if affected_subnet_id == self._get_subnet_from_machine(
existing_machine):
return False
# We know that subnet is no longer used, so we remove it.
LOG.info('Removing subnet %s from the worker nodes subnets',
affected_subnet_id)
with lockutils.lock('kuryr-machine-add'):
self.subnets.remove(affected_subnet_id)
return True

View File

@ -0,0 +1,67 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from oslo_log import log as logging
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import k8s_base
LOG = logging.getLogger(__name__)
class MachineHandler(k8s_base.ResourceEventHandler):
"""MachineHandler gathers info about OpenShift nodes needed by Kuryr.
At the moment that's the subnets of all the worker nodes.
"""
OBJECT_KIND = constants.OPENSHIFT_OBJ_MACHINE
OBJECT_WATCH_PATH = constants.OPENSHIFT_API_CRD_MACHINES
def __init__(self):
super(MachineHandler, self).__init__()
self.node_subnets_driver = drivers.NodesSubnetsDriver.get_instance()
def _bump_nps(self):
"""Bump NetworkPolicy objects to have the SG rules recalculated."""
k8s = clients.get_kubernetes_client()
# NOTE(dulek): Listing KuryrNetworkPolicies instead of NetworkPolicies,
# as we only care about NPs already handled.
knps = k8s.get(constants.K8S_API_CRD_KURYRNETWORKPOLICIES)
for knp in knps.get('items', []):
try:
k8s.annotate(
knp['metadata']['annotations']['networkPolicyLink'],
{constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())})
except exceptions.K8sResourceNotFound:
# Had to be deleted in the meanwhile.
pass
def on_present(self, machine):
effect = self.node_subnets_driver.add_node(machine)
if effect:
# If the change was meaningful we need to make sure all the NPs
# are recalculated to get the new SG rules added.
self._bump_nps()
def on_deleted(self, machine):
effect = self.node_subnets_driver.delete_node(machine)
if effect:
# If the change was meaningful we need to make sure all the NPs
# are recalculated to get the old SG rule deleted.
self._bump_nps()

View File

@ -13,9 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_config import cfg
from kuryr_kubernetes.controller.drivers import node_subnets
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
@ -59,3 +62,137 @@ class TestConfigNodesSubnetsDriver(test_base.TestCase):
def test_delete_node(self):
driver = node_subnets.ConfigNodesSubnets()
self.assertFalse(driver.delete_node('node'))
class TestOpenShiftNodesSubnetsDriver(test_base.TestCase):
def setUp(self):
super().setUp()
self.machine = {
"apiVersion": "machine.openshift.io/v1beta1",
"kind": "Machine",
"metadata": {
"name": "foo-tv22d-master-2",
"namespace": "openshift-machine-api",
},
"spec": {
"metadata": {},
"providerSpec": {
"value": {
"cloudName": "openstack",
"cloudsSecret": {
"name": "openstack-cloud-credentials",
"namespace": "openshift-machine-api"
},
"kind": "OpenstackProviderSpec",
"networks": [
{
"filter": {},
"subnets": [{
"filter": {
"name": "foo-tv22d-nodes",
"tags": "openshiftClusterID=foo-tv22d"
}}
]
}
],
}
}
},
"status": {}
}
def test_get_nodes_subnets(self):
subnets = ['subnet1', 'subnet2']
driver = node_subnets.OpenShiftNodesSubnets()
for subnet in subnets:
driver.subnets.add(subnet)
self.assertCountEqual(subnets, driver.get_nodes_subnets())
def test_get_nodes_subnets_not_raise(self):
driver = node_subnets.OpenShiftNodesSubnets()
self.assertEqual([], driver.get_nodes_subnets())
def test_get_nodes_subnets_raise(self):
driver = node_subnets.OpenShiftNodesSubnets()
self.assertRaises(exceptions.ResourceNotReady,
driver.get_nodes_subnets, raise_on_empty=True)
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
m_get_subnet_id.return_value = 'foobar'
self.assertTrue(driver.add_node(self.machine))
m_get_subnet_id.assert_called_once_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual(['foobar'], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node_exists(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
m_get_subnet_id.return_value = 'foobar'
driver.subnets.add('foobar')
self.assertFalse(driver.add_node(self.machine))
m_get_subnet_id.assert_called_once_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual(['foobar'], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node_uuid(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
net = self.machine['spec']['providerSpec']['value']['networks'][0]
del net['subnets'][0]['filter']
net['subnets'][0]['uuid'] = 'barfoo'
self.assertTrue(driver.add_node(self.machine))
m_get_subnet_id.assert_not_called()
self.assertEqual(['barfoo'], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
def test_add_node_cannot(self, m_get_subnet_id):
driver = node_subnets.OpenShiftNodesSubnets()
net = self.machine['spec']['providerSpec']['value']['networks'][0]
del net['subnets']
self.assertFalse(driver.add_node(self.machine))
m_get_subnet_id.assert_not_called()
self.assertEqual([], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_delete_node_cannot(self, m_get_k8s, m_get_subnet_id):
m_k8s = mock.Mock()
m_get_k8s.return_value = m_k8s
driver = node_subnets.OpenShiftNodesSubnets()
net = self.machine['spec']['providerSpec']['value']['networks'][0]
del net['subnets']
self.assertFalse(driver.delete_node(self.machine))
m_get_subnet_id.assert_not_called()
self.assertEqual([], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_delete_node(self, m_get_k8s, m_get_subnet_id):
m_k8s = mock.Mock()
m_get_k8s.return_value = m_k8s
m_k8s.get.return_value = {'items': []}
driver = node_subnets.OpenShiftNodesSubnets()
driver.subnets.add('foobar')
m_get_subnet_id.return_value = 'foobar'
self.assertTrue(driver.delete_node(self.machine))
m_get_subnet_id.assert_called_once_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual([], driver.get_nodes_subnets())
@mock.patch('kuryr_kubernetes.utils.get_subnet_id')
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_delete_node_still_exists(self, m_get_k8s, m_get_subnet_id):
m_k8s = mock.Mock()
m_get_k8s.return_value = m_k8s
m_k8s.get.return_value = {'items': [self.machine]}
driver = node_subnets.OpenShiftNodesSubnets()
driver.subnets.add('foobar')
m_get_subnet_id.return_value = 'foobar'
self.assertFalse(driver.delete_node(self.machine))
m_get_subnet_id.assert_called_with(
name='foo-tv22d-nodes', tags='openshiftClusterID=foo-tv22d')
self.assertEqual(['foobar'], driver.get_nodes_subnets())

View File

@ -0,0 +1,84 @@
# Copyright 2020 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.handlers import machine
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.tests import base as test_base
class TestKuryrMachineHandler(test_base.TestCase):
@mock.patch(
'kuryr_kubernetes.controller.drivers.base.NodesSubnetsDriver.'
'get_instance')
def setUp(self, m_get_instance):
super(TestKuryrMachineHandler, self).setUp()
self.driver = mock.Mock()
m_get_instance.return_value = self.driver
self.handler = machine.MachineHandler()
def test_on_present(self):
self.handler._bump_nps = mock.Mock()
self.driver.add_node.return_value = False
self.handler.on_present(mock.sentinel.machine)
self.driver.add_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_not_called()
def test_on_present_new(self):
self.handler._bump_nps = mock.Mock()
self.driver.add_node.return_value = True
self.handler.on_present(mock.sentinel.machine)
self.driver.add_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_called_once()
def test_on_deleted(self):
self.handler._bump_nps = mock.Mock()
self.driver.delete_node.return_value = False
self.handler.on_deleted(mock.sentinel.machine)
self.driver.delete_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_not_called()
def test_on_deleted_gone(self):
self.handler._bump_nps = mock.Mock()
self.driver.delete_node.return_value = True
self.handler.on_deleted(mock.sentinel.machine)
self.driver.delete_node.assert_called_once_with(mock.sentinel.machine)
self.handler._bump_nps.assert_called_once()
@mock.patch('kuryr_kubernetes.clients.get_kubernetes_client')
def test_bump_nps(self, get_client):
m_k8s = mock.Mock()
get_client.return_value = m_k8s
m_k8s.get.return_value = {
'items': [
{'metadata': {'annotations': {
'networkPolicyLink': mock.sentinel.link1}}},
{'metadata': {'annotations': {
'networkPolicyLink': mock.sentinel.link2}}},
{'metadata': {'annotations': {
'networkPolicyLink': mock.sentinel.link3}}},
]
}
m_k8s.annotate.side_effect = (
None, exceptions.K8sResourceNotFound('NP'), None)
self.handler._bump_nps()
m_k8s.get.assert_called_once_with(
constants.K8S_API_CRD_KURYRNETWORKPOLICIES)
m_k8s.annotate.assert_has_calls([
mock.call(mock.sentinel.link1, mock.ANY),
mock.call(mock.sentinel.link2, mock.ANY),
mock.call(mock.sentinel.link3, mock.ANY),
])

View File

@ -440,3 +440,25 @@ class TestUtils(test_base.TestCase):
res = {'metadata': {'selfLink': '/foo'}}
self.assertEqual(utils.get_res_link(res), '/foo')
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test_get_subnet_id(self, m_get_net):
m_net = mock.Mock()
m_get_net.return_value = m_net
subnets = (mock.Mock(id=mock.sentinel.subnet1),
mock.Mock(id=mock.sentinel.subnet2))
m_net.subnets.return_value = iter(subnets)
filters = {'name': 'foo', 'tags': 'bar'}
sub = utils.get_subnet_id(**filters)
m_net.subnets.assert_called_with(**filters)
self.assertEqual(mock.sentinel.subnet1, sub)
@mock.patch('kuryr_kubernetes.clients.get_network_client')
def test_get_subnet_not_found(self, m_get_net):
m_net = mock.Mock()
m_get_net.return_value = m_net
m_net.subnets.return_value = iter(())
filters = {'name': 'foo', 'tags': 'bar'}
sub = utils.get_subnet_id(**filters)
m_net.subnets.assert_called_with(**filters)
self.assertIsNone(sub)

View File

@ -82,7 +82,8 @@ RESOURCE_MAP = {'Endpoints': 'endpoints',
'NetworkPolicy': 'networkpolicies',
'Node': 'nodes',
'Pod': 'pods',
'Service': 'services'}
'Service': 'services',
'Machine': 'machines'}
API_RE = re.compile(r'v\d+')
@ -296,6 +297,16 @@ def get_subnet_cidr(subnet_id):
return subnet_obj.cidr
def get_subnet_id(**filters):
os_net = clients.get_network_client()
subnets = os_net.subnets(**filters)
try:
return next(subnets).id
except StopIteration:
return None
@MEMOIZE
def get_subnets_id_cidrs(subnet_ids):
os_net = clients.get_network_client()

View File

@ -98,6 +98,7 @@ kuryr_kubernetes.controller.drivers.vif_pool =
kuryr_kubernetes.controller.drivers.nodes_subnets =
config = kuryr_kubernetes.controller.drivers.node_subnets:ConfigNodesSubnets
openshift = kuryr_kubernetes.controller.drivers.node_subnets:OpenShiftNodesSubnets
kuryr_kubernetes.controller.handlers =
vif = kuryr_kubernetes.controller.handlers.vif:VIFHandler
@ -112,6 +113,7 @@ kuryr_kubernetes.controller.handlers =
kuryrnetwork_population = kuryr_kubernetes.controller.handlers.kuryrnetwork_population:KuryrNetworkPopulationHandler
test_handler = kuryr_kubernetes.tests.unit.controller.handlers.test_fake_handler:TestHandler
kuryrport = kuryr_kubernetes.controller.handlers.kuryrport:KuryrPortHandler
openshift_machine = kuryr_kubernetes.controller.handlers.machine:MachineHandler
kuryr_kubernetes.controller.drivers.multi_vif =
noop = kuryr_kubernetes.controller.drivers.multi_vif:NoopMultiVIFDriver