Controller event handling pipeline

This patch introduces ControllerPipeline which is an entry point
handler for the Kuryr-Kubernetes controller. ControllerPipeline
allows registering ResourceEventHandlers and ensures the proper
handler is called for each event that is passed to the
ControllerPipeline. Also it ensures the following behavior:

- multiple ResourceEventHandlers can be registered for the same
  resource type (OBJECT_KIND)

- failing handlers (i.e. ones that raise Exceptions) are retried
  until either the handler succeeds or a finite amount of time
  passes, in which case the most recent exception is logged

- in case there are multiple handlers registered for the same
  resource type, all such handlers are considered independent (i.e.
  if one handler fails, other handlers will still be called
  regardless; and the order in which such handlers are called is not
  determined)

- events for different Kubernetes objects can be handled concurrently

- events for the same Kubernetes object are handled sequentially in
  the order of arrival

Change-Id: Ib17e0c7a2790cdbc31be4f59d50972b43c272480
Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
Ilya Chukhnakov 2016-10-28 08:39:31 +03:00
parent ac672fd30a
commit f0e1c206a9
5 changed files with 139 additions and 18 deletions

View File

@ -0,0 +1,64 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from kuryr_kubernetes.handlers import asynchronous as h_async
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.handlers import logging as h_log
from kuryr_kubernetes.handlers import retry as h_retry
LOG = logging.getLogger(__name__)
class ControllerPipeline(h_dis.EventPipeline):
"""Serves as an entry point for controller Kubernetes events.
`ControllerPipeline` is an entry point handler for the Kuryr-Kubernetes
controller. `ControllerPipeline` allows registering
:class:`kuryr_kubernetes.handlers.k8s_base.ResourceEventHandler`s and
ensures the proper handler is called for each event that is passed to the
`ControllerPipeline`. Also it ensures the following behavior:
- multiple `ResourceEventHandler`s can be registered for the same
resource type (`OBJECT_KIND`)
- failing handlers (i.e. ones that raise `Exception`s) are retried
until either the handler succeeds or a finite amount of time passes,
in which case the most recent exception is logged
- in case there are multiple handlers registered for the same resource
type, all such handlers are considered independent (i.e. if one
handler fails, other handlers will still be called regardless; and the
order in which such handlers are called is not determined)
- events for different Kubernetes objects can be handled concurrently
- events for the same Kubernetes object are handled sequentially in
the order of arrival
"""
def __init__(self, thread_group):
self._tg = thread_group
super(ControllerPipeline, self).__init__()
def _wrap_consumer(self, consumer):
# TODO(ivc): tune retry interval/timeout
return h_log.LogExceptions(h_retry.Retry(consumer))
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(h_async.Async(dispatcher, self._tg,
h_k8s.object_uid))

View File

@ -22,10 +22,8 @@ from oslo_service import service
from kuryr_kubernetes import clients
from kuryr_kubernetes import config
from kuryr_kubernetes import constants
from kuryr_kubernetes.handlers import asynchronous as h_async
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.handlers import retry as h_retry
from kuryr_kubernetes import watcher
LOG = logging.getLogger(__name__)
@ -38,7 +36,7 @@ class KuryrK8sService(service.Service):
super(KuryrK8sService, self).__init__()
class DummyHandler(h_k8s.ResourceEventHandler):
OBJECT_KIND = constants.K8S_OBJ_NAMESPACE
# TODO(ivc): remove once real handlers are ready
def __init__(self):
self.event_seq = 0
@ -65,24 +63,23 @@ class KuryrK8sService(service.Service):
LOG.debug("present: %s",
event['object']['metadata']['selfLink'])
class DummyPipeline(h_dis.EventPipeline):
def __init__(self, thread_group):
self._tg = thread_group
super(DummyPipeline, self).__init__()
class DummyPodHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_POD
def _wrap_consumer(self, consumer):
retry = h_retry.Retry(consumer)
return super(DummyPipeline, self)._wrap_consumer(retry)
class DummyServiceHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_SERVICE
def _wrap_dispatcher(self, dispatcher):
handler = super(DummyPipeline, self)._wrap_dispatcher(
dispatcher)
return h_async.Async(handler, self._tg, h_k8s.object_uid)
class DummyEndpointsHandler(DummyHandler):
OBJECT_KIND = constants.K8S_OBJ_ENDPOINTS
pipeline = DummyPipeline(self.tg)
pipeline.register(DummyHandler())
pipeline = h_pipeline.ControllerPipeline(self.tg)
self.watcher = watcher.Watcher(pipeline, self.tg)
self.watcher.add(constants.K8S_API_NAMESPACES)
# TODO(ivc): pluggable resource/handler registration
for resource in ["pods", "services", "endpoints"]:
self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource))
pipeline.register(DummyPodHandler())
pipeline.register(DummyServiceHandler())
pipeline.register(DummyEndpointsHandler())
def start(self):
LOG.info(_LI("Service '%s' starting"), self.__class__.__name__)

View File

@ -0,0 +1,60 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes.controller.handlers import pipeline as h_pipeline
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.tests import base as test_base
class TestControllerPipeline(test_base.TestCase):
@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')
@mock.patch('kuryr_kubernetes.handlers.retry.Retry')
def test_wrap_consumer(self, m_retry_type, m_logging_type):
consumer = mock.sentinel.consumer
retry_handler = mock.sentinel.retry_handler
logging_handler = mock.sentinel.logging_handler
m_retry_type.return_value = retry_handler
m_logging_type.return_value = logging_handler
thread_group = mock.sentinel.thread_group
with mock.patch.object(h_dis.EventPipeline, '__init__'):
pipeline = h_pipeline.ControllerPipeline(thread_group)
ret = pipeline._wrap_consumer(consumer)
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(retry_handler)
m_retry_type.assert_called_with(consumer)
@mock.patch('kuryr_kubernetes.handlers.logging.LogExceptions')
@mock.patch('kuryr_kubernetes.handlers.asynchronous.Async')
def test_wrap_dispatcher(self, m_async_type, m_logging_type):
dispatcher = mock.sentinel.dispatcher
async_handler = mock.sentinel.async_handler
logging_handler = mock.sentinel.logging_handler
m_async_type.return_value = async_handler
m_logging_type.return_value = logging_handler
thread_group = mock.sentinel.thread_group
with mock.patch.object(h_dis.EventPipeline, '__init__'):
pipeline = h_pipeline.ControllerPipeline(thread_group)
ret = pipeline._wrap_dispatcher(dispatcher)
self.assertEqual(logging_handler, ret)
m_logging_type.assert_called_with(async_handler)
m_async_type.assert_called_with(dispatcher, thread_group,
h_k8s.object_uid)