Controller side of pods' port/VIF binding

This patch introduces VIFHandler that is used by Kuryr-Kubernetes
controller to manage VIF annotation and related Neutron resources
which are used by CNI driver to enable pod networking.

Change-Id: I1acfa052a530f06def4f7179945fac1687e1951a
Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
Ilya Chukhnakov 2016-11-09 17:15:49 +03:00
parent d6dd891bef
commit 4c78d6e51a
8 changed files with 341 additions and 49 deletions

View File

@ -20,3 +20,8 @@ K8S_OBJ_NAMESPACE = 'Namespace'
K8S_OBJ_POD = 'Pod'
K8S_OBJ_SERVICE = 'Service'
K8S_OBJ_ENDPOINTS = 'Endpoints'
K8S_POD_STATUS_PENDING = 'Pending'
K8S_ANNOTATION_PREFIX = 'openstack.org/kuryr'
K8S_ANNOTATION_VIF = K8S_ANNOTATION_PREFIX + '-vif'

View File

@ -15,6 +15,7 @@
from oslo_log import log as logging
from kuryr_kubernetes import exceptions
from kuryr_kubernetes.handlers import asynchronous as h_async
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.handlers import k8s_base as h_k8s
@ -57,7 +58,8 @@ class ControllerPipeline(h_dis.EventPipeline):
def _wrap_consumer(self, consumer):
# TODO(ivc): tune retry interval/timeout
return h_log.LogExceptions(h_retry.Retry(consumer))
return h_log.LogExceptions(h_retry.Retry(
consumer, exceptions=exceptions.ResourceNotReady))
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,

View File

@ -0,0 +1,124 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_vif import objects as obj_vif
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes import clients
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
LOG = logging.getLogger(__name__)
class VIFHandler(k8s_base.ResourceEventHandler):
"""Controller side of VIF binding process for Kubernetes pods.
`VIFHandler` runs on the Kuryr-Kubernetes controller and together with
the CNI driver (that runs on 'kubelet' nodes) is responsible for providing
networking to Kubernetes pods. `VIFHandler` relies on a set of drivers
(which are responsible for managing Neutron resources) to define the VIF
object and pass it to the CNI driver in form of the Kubernetes pod
annotation.
"""
OBJECT_KIND = constants.K8S_OBJ_POD
def __init__(self):
self._drv_project = drivers.PodProjectDriver.get_instance()
self._drv_subnets = drivers.PodSubnetsDriver.get_instance()
self._drv_sg = drivers.PodSecurityGroupsDriver.get_instance()
self._drv_vif = drivers.PodVIFDriver.get_instance()
def on_present(self, pod):
if self._is_host_network(pod) or not self._is_pending(pod):
# REVISIT(ivc): consider an additional configurable check that
# would allow skipping pods to enable heterogeneous environments
# where certain pods/namespaces/nodes can be managed by other
# networking solutions/CNI drivers.
return
vif = self._get_vif(pod)
if not vif:
project_id = self._drv_project.get_project(pod)
security_groups = self._drv_sg.get_security_groups(pod, project_id)
subnets = self._drv_subnets.get_subnets(pod, project_id)
vif = self._drv_vif.request_vif(pod, project_id, subnets,
security_groups)
try:
self._set_vif(pod, vif)
except k_exc.K8sClientException as ex:
LOG.debug("Failed to set annotation: %s", ex)
# FIXME(ivc): improve granularity of K8sClient exceptions:
# only resourceVersion conflict should be ignored
self._drv_vif.release_vif(pod, vif)
elif not vif.active:
self._drv_vif.activate_vif(pod, vif)
self._set_vif(pod, vif)
def on_deleted(self, pod):
if self._is_host_network(pod):
return
vif = self._get_vif(pod)
if vif:
self._drv_vif.release_vif(pod, vif)
@staticmethod
def _is_host_network(pod):
try:
return pod['spec']['hostNetwork']
except KeyError:
return False
@staticmethod
def _is_pending(pod):
try:
return (pod['spec']['nodeName'] and
pod['status']['phase'] == constants.K8S_POD_STATUS_PENDING)
except KeyError:
return False
def _set_vif(self, pod, vif):
# TODO(ivc): extract annotation interactions
if vif is None:
LOG.debug("Removing VIF annotation: %r", vif)
annotation = None
else:
vif.obj_reset_changes(recursive=True)
LOG.debug("Setting VIF annotation: %r", vif)
annotation = jsonutils.dumps(vif.obj_to_primitive(),
sort_keys=True)
k8s = clients.get_kubernetes_client()
k8s.annotate(pod['metadata']['selfLink'],
{constants.K8S_ANNOTATION_VIF: annotation},
resource_version=pod['metadata']['resourceVersion'])
def _get_vif(self, pod):
# TODO(ivc): same as '_set_vif'
try:
annotations = pod['metadata']['annotations']
vif_annotation = annotations[constants.K8S_ANNOTATION_VIF]
except KeyError:
return None
vif_dict = jsonutils.loads(vif_annotation)
vif = obj_vif.vif.VIFBase.obj_from_primitive(vif_dict)
LOG.debug("Got VIF from annotation: %r", vif)
return vif

View File

@ -15,7 +15,7 @@
import sys
from kuryr.lib._i18n import _LI, _LE
from kuryr.lib._i18n import _LI
import os_vif
from oslo_log import log as logging
from oslo_service import service
@ -24,7 +24,7 @@ from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.controller.handlers import vif as h_vif
from kuryr_kubernetes import watcher
LOG = logging.getLogger(__name__)
@ -36,51 +36,12 @@ class KuryrK8sService(service.Service):
def __init__(self):
super(KuryrK8sService, self).__init__()
class DummyHandler(h_k8s.ResourceEventHandler):
# TODO(ivc): remove once real handlers are ready
def __init__(self):
self.event_seq = 0
def __call__(self, event):
self.event_seq += 1
if self.event_seq % 4:
raise Exception(_LE("Dummy exception %s") % self.event_seq)
super(DummyHandler, self).__call__(event)
def on_added(self, event):
LOG.debug("added: %s",
event['object']['metadata']['selfLink'])
def on_deleted(self, event):
LOG.debug("deleted: %s",
event['object']['metadata']['selfLink'])
def on_modified(self, event):
LOG.debug("modified: %s",
event['object']['metadata']['selfLink'])
def on_present(self, event):
LOG.debug("present: %s",
event['object']['metadata']['selfLink'])
class DummyPodHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_POD
class DummyServiceHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_SERVICE
class DummyEndpointsHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_ENDPOINTS
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
# TODO(ivc): pluggable resource/handler registration
for resource in ["pods", "services", "endpoints"]:
self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource))
pipeline.register(DummyPodHandler())
pipeline.register(DummyServiceHandler())
pipeline.register(DummyEndpointsHandler())
pipeline.register(h_vif.VIFHandler())
def start(self):
LOG.info(_LI("Service '%s' starting"), self.__class__.__name__)

View File

@ -35,13 +35,14 @@ class K8sClient(object):
raise exc.K8sClientException(response.text)
return response.json()
def annotate(self, path, annotations):
def annotate(self, path, annotations, resource_version=None):
url = self._base_url + path
data = jsonutils.dumps({
"metadata": {
"annotations": annotations
"annotations": annotations,
"resourceVersion": resource_version,
}
})
}, sort_keys=True)
response = requests.patch(url, data=data, headers={
'Content-Type': 'application/merge-patch+json',
'Accept': 'application/json',

View File

@ -38,7 +38,7 @@ class TestControllerPipeline(test_base.TestCase):
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(retry_handler)
m_retry_type.assert_called_with(consumer)
m_retry_type.assert_called_with(consumer, exceptions=mock.ANY)
@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')
@mock.patch('kuryr_kubernetes.handlers.asynchronous.Async')

View File

@ -0,0 +1,196 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drivers
from kuryr_kubernetes.controller.handlers import vif as h_vif
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.tests import base as test_base
class TestVIFHandler(test_base.TestCase):
def setUp(self):
super(TestVIFHandler, self).setUp()
self._project_id = mock.sentinel.project_id
self._subnets = mock.sentinel.subnets
self._security_groups = mock.sentinel.security_groups
self._vif = mock.Mock()
self._vif.active = True
self._vif_serialized = mock.sentinel.vif_serialized
self._pod_version = mock.sentinel.pod_version
self._pod_link = mock.sentinel.pod_link
self._pod = {
'metadata': {'resourceVersion': self._pod_version,
'selfLink': self._pod_link},
'status': {'phase': k_const.K8S_POD_STATUS_PENDING},
'spec': {'hostNetwork': False,
'nodeName': 'hostname'}
}
self._handler = mock.MagicMock(spec=h_vif.VIFHandler)
self._handler._drv_project = mock.Mock(spec=drivers.PodProjectDriver)
self._handler._drv_subnets = mock.Mock(spec=drivers.PodSubnetsDriver)
self._handler._drv_sg = mock.Mock(spec=drivers.PodSecurityGroupsDriver)
self._handler._drv_vif = mock.Mock(spec=drivers.PodVIFDriver)
self._get_project = self._handler._drv_project.get_project
self._get_subnets = self._handler._drv_subnets.get_subnets
self._get_security_groups = self._handler._drv_sg.get_security_groups
self._request_vif = self._handler._drv_vif.request_vif
self._release_vif = self._handler._drv_vif.release_vif
self._activate_vif = self._handler._drv_vif.activate_vif
self._get_vif = self._handler._get_vif
self._set_vif = self._handler._set_vif
self._is_host_network = self._handler._is_host_network
self._is_pending = self._handler._is_pending
self._request_vif.return_value = self._vif
self._get_vif.return_value = self._vif
self._is_host_network.return_value = False
self._is_pending.return_value = True
self._get_project.return_value = self._project_id
self._get_subnets.return_value = self._subnets
self._get_security_groups.return_value = self._security_groups
@mock.patch.object(drivers.PodVIFDriver, 'get_instance')
@mock.patch.object(drivers.PodSecurityGroupsDriver, 'get_instance')
@mock.patch.object(drivers.PodSubnetsDriver, 'get_instance')
@mock.patch.object(drivers.PodProjectDriver, 'get_instance')
def test_init(self, m_get_project_driver, m_get_subnets_driver,
m_get_sg_driver, m_get_vif_driver):
project_driver = mock.sentinel.project_driver
subnets_driver = mock.sentinel.subnets_driver
sg_driver = mock.sentinel.sg_driver
vif_driver = mock.sentinel.vif_driver
m_get_project_driver.return_value = project_driver
m_get_subnets_driver.return_value = subnets_driver
m_get_sg_driver.return_value = sg_driver
m_get_vif_driver.return_value = vif_driver
handler = h_vif.VIFHandler()
self.assertEqual(project_driver, handler._drv_project)
self.assertEqual(subnets_driver, handler._drv_subnets)
self.assertEqual(sg_driver, handler._drv_sg)
self.assertEqual(vif_driver, handler._drv_vif)
def test_is_host_network(self):
self._pod['spec']['hostNetwork'] = True
self.assertTrue(h_vif.VIFHandler._is_host_network(self._pod))
def test_is_not_host_network(self):
self.assertFalse(h_vif.VIFHandler._is_host_network(self._pod))
def test_unset_host_network(self):
self.assertFalse(h_vif.VIFHandler._is_host_network({}))
def test_is_pending(self):
self.assertTrue(h_vif.VIFHandler._is_pending(self._pod))
def test_is_not_pending(self):
self._pod['status']['phase'] = 'Unknown'
self.assertFalse(h_vif.VIFHandler._is_pending(self._pod))
def test_unset_pending(self):
self.assertFalse(h_vif.VIFHandler._is_pending({}))
def test_on_present(self):
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._request_vif.assert_not_called()
self._activate_vif.assert_not_called()
self._set_vif.assert_not_called()
def test_on_present_host_network(self):
self._is_host_network.return_value = True
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_not_called()
self._request_vif.assert_not_called()
self._activate_vif.assert_not_called()
self._set_vif.assert_not_called()
def test_on_present_not_pending(self):
self._is_pending.return_value = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_not_called()
self._request_vif.assert_not_called()
self._activate_vif.assert_not_called()
self._set_vif.assert_not_called()
def test_on_present_activate(self):
self._vif.active = False
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._activate_vif.assert_called_once_with(self._pod, self._vif)
self._set_vif.assert_called_once_with(self._pod, self._vif)
self._request_vif.assert_not_called()
def test_on_present_create(self):
self._get_vif.return_value = None
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._set_vif.assert_called_once_with(self._pod, self._vif)
self._activate_vif.assert_not_called()
def test_on_present_rollback(self):
self._get_vif.return_value = None
self._set_vif.side_effect = k_exc.K8sClientException
h_vif.VIFHandler.on_present(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._request_vif.assert_called_once_with(
self._pod, self._project_id, self._subnets, self._security_groups)
self._set_vif.assert_called_once_with(self._pod, self._vif)
self._release_vif.assert_called_once_with(self._pod, self._vif)
self._activate_vif.assert_not_called()
def test_on_deleted(self):
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._release_vif.assert_called_once_with(self._pod, self._vif)
def test_on_deleted_host_network(self):
self._is_host_network.return_value = True
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._get_vif.assert_not_called()
self._release_vif.assert_not_called()
def test_on_deleted_no_annotation(self):
self._get_vif.return_value = None
h_vif.VIFHandler.on_deleted(self._handler, self._pod)
self._get_vif.assert_called_once_with(self._pod)
self._release_vif.assert_not_called()

View File

@ -56,7 +56,9 @@ class TestK8sClient(test_base.TestCase):
def test_annotate(self, m_patch):
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
ret = {'metadata': {'annotations': annotations}}
resource_version = "123"
ret = {'metadata': {'annotations': annotations,
"resourceVersion": resource_version}}
data = jsonutils.dumps(ret, sort_keys=True)
m_resp = mock.MagicMock()
@ -64,7 +66,8 @@ class TestK8sClient(test_base.TestCase):
m_resp.json.return_value = ret
m_patch.return_value = m_resp
self.assertEqual(annotations, self.client.annotate(path, annotations))
self.assertEqual(annotations, self.client.annotate(
path, annotations, resource_version=resource_version))
m_patch.assert_called_once_with(self.base_url + path,
data=data, headers=mock.ANY)