diff --git a/kuryr_kubernetes/controller/handlers/__init__.py b/kuryr_kubernetes/controller/handlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kuryr_kubernetes/controller/handlers/pipeline.py b/kuryr_kubernetes/controller/handlers/pipeline.py new file mode 100644 index 000000000..6e5d24b3d --- /dev/null +++ b/kuryr_kubernetes/controller/handlers/pipeline.py @@ -0,0 +1,64 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from kuryr_kubernetes.handlers import asynchronous as h_async +from kuryr_kubernetes.handlers import dispatch as h_dis +from kuryr_kubernetes.handlers import k8s_base as h_k8s +from kuryr_kubernetes.handlers import logging as h_log +from kuryr_kubernetes.handlers import retry as h_retry + +LOG = logging.getLogger(__name__) + + +class ControllerPipeline(h_dis.EventPipeline): + """Serves as an entry point for controller Kubernetes events. + + `ControllerPipeline` is an entry point handler for the Kuryr-Kubernetes + controller. `ControllerPipeline` allows registering + :class:`kuryr_kubernetes.handlers.k8s_base.ResourceEventHandler`s and + ensures the proper handler is called for each event that is passed to the + `ControllerPipeline`. Also it ensures the following behavior: + + - multiple `ResourceEventHandler`s can be registered for the same + resource type (`OBJECT_KIND`) + + - failing handlers (i.e. ones that raise `Exception`s) are retried + until either the handler succeeds or a finite amount of time passes, + in which case the most recent exception is logged + + - in case there are multiple handlers registered for the same resource + type, all such handlers are considered independent (i.e. if one + handler fails, other handlers will still be called regardless; and the + order in which such handlers are called is not determined) + + - events for different Kubernetes objects can be handled concurrently + + - events for the same Kubernetes object are handled sequentially in + the order of arrival + """ + + def __init__(self, thread_group): + self._tg = thread_group + super(ControllerPipeline, self).__init__() + + def _wrap_consumer(self, consumer): + # TODO(ivc): tune retry interval/timeout + return h_log.LogExceptions(h_retry.Retry(consumer)) + + def _wrap_dispatcher(self, dispatcher): + return h_log.LogExceptions(h_async.Async(dispatcher, self._tg, + h_k8s.object_uid)) diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index 109dcfcf7..4a84b18af 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -22,10 +22,8 @@ from oslo_service import service from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants -from kuryr_kubernetes.handlers import asynchronous as h_async -from kuryr_kubernetes.handlers import dispatch as h_dis +from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline from kuryr_kubernetes.handlers import k8s_base as h_k8s -from kuryr_kubernetes.handlers import retry as h_retry from kuryr_kubernetes import watcher LOG = logging.getLogger(__name__) @@ -38,7 +36,7 @@ class KuryrK8sService(service.Service): super(KuryrK8sService, self).__init__() class DummyHandler(h_k8s.ResourceEventHandler): - OBJECT_KIND = constants.K8S_OBJ_NAMESPACE + # TODO(ivc): remove once real handlers are ready def __init__(self): self.event_seq = 0 @@ -65,24 +63,23 @@ class KuryrK8sService(service.Service): LOG.debug("present: %s", event['object']['metadata']['selfLink']) - class DummyPipeline(h_dis.EventPipeline): - def __init__(self, thread_group): - self._tg = thread_group - super(DummyPipeline, self).__init__() + class DummyPodHandler(DummyHandler): + OBJECT_KIND = constants.K8S_OBJ_POD - def _wrap_consumer(self, consumer): - retry = h_retry.Retry(consumer) - return super(DummyPipeline, self)._wrap_consumer(retry) + class DummyServiceHandler(DummyHandler): + OBJECT_KIND = constants.K8S_OBJ_SERVICE - def _wrap_dispatcher(self, dispatcher): - handler = super(DummyPipeline, self)._wrap_dispatcher( - dispatcher) - return h_async.Async(handler, self._tg, h_k8s.object_uid) + class DummyEndpointsHandler(DummyHandler): + OBJECT_KIND = constants.K8S_OBJ_ENDPOINTS - pipeline = DummyPipeline(self.tg) - pipeline.register(DummyHandler()) + pipeline = h_pipeline.ControllerPipeline(self.tg) self.watcher = watcher.Watcher(pipeline, self.tg) - self.watcher.add(constants.K8S_API_NAMESPACES) + # TODO(ivc): pluggable resource/handler registration + for resource in ["pods", "services", "endpoints"]: + self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource)) + pipeline.register(DummyPodHandler()) + pipeline.register(DummyServiceHandler()) + pipeline.register(DummyEndpointsHandler()) def start(self): LOG.info(_LI("Service '%s' starting"), self.__class__.__name__) diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/__init__.py b/kuryr_kubernetes/tests/unit/controller/handlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py new file mode 100644 index 000000000..9dd9d11eb --- /dev/null +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_pipeline.py @@ -0,0 +1,60 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline +from kuryr_kubernetes.handlers import dispatch as h_dis +from kuryr_kubernetes.handlers import k8s_base as h_k8s +from kuryr_kubernetes.tests import base as test_base + + +class TestControllerPipeline(test_base.TestCase): + @mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions') + @mock.patch('kuryr_kubernetes.handlers.retry.Retry') + def test_wrap_consumer(self, m_retry_type, m_logging_type): + consumer = mock.sentinel.consumer + retry_handler = mock.sentinel.retry_handler + logging_handler = mock.sentinel.logging_handler + m_retry_type.return_value = retry_handler + m_logging_type.return_value = logging_handler + thread_group = mock.sentinel.thread_group + + with mock.patch.object(h_dis.EventPipeline, '__init__'): + pipeline = h_pipeline.ControllerPipeline(thread_group) + ret = pipeline._wrap_consumer(consumer) + + self.assertEqual(logging_handler, ret) + m_logging_type.assert_called_with(retry_handler) + m_retry_type.assert_called_with(consumer) + + @mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions') + @mock.patch('kuryr_kubernetes.handlers.asynchronous.Async') + def test_wrap_dispatcher(self, m_async_type, m_logging_type): + dispatcher = mock.sentinel.dispatcher + async_handler = mock.sentinel.async_handler + logging_handler = mock.sentinel.logging_handler + m_async_type.return_value = async_handler + m_logging_type.return_value = logging_handler + thread_group = mock.sentinel.thread_group + + with mock.patch.object(h_dis.EventPipeline, '__init__'): + pipeline = h_pipeline.ControllerPipeline(thread_group) + ret = pipeline._wrap_dispatcher(dispatcher) + + self.assertEqual(logging_handler, ret) + m_logging_type.assert_called_with(async_handler) + m_async_type.assert_called_with(dispatcher, thread_group, + h_k8s.object_uid)