From a3790b155543ca7f8b05432e37cdfdbbbc28849c Mon Sep 17 00:00:00 2001 From: Ilya Chukhnakov Date: Thu, 13 Oct 2016 22:28:27 +0300 Subject: [PATCH] Basic event handling pipeline components This patch provides basic event handling pipeline components. The EventPipeline class provided by this patch will serve as a base class for ControllerEventPipeline and CNIEventPipeline classes that are to be implemented by future patches. Change-Id: I8c68941cbe323f80cd67341fe05656efd57bcdc5 Partially-Implements: blueprint kuryr-k8s-integration --- kuryr_kubernetes/constants.py | 5 + kuryr_kubernetes/controller/service.py | 26 +++- kuryr_kubernetes/handlers/__init__.py | 0 kuryr_kubernetes/handlers/base.py | 30 +++++ kuryr_kubernetes/handlers/dispatch.py | 117 ++++++++++++++++++ kuryr_kubernetes/handlers/k8s_base.py | 70 +++++++++++ kuryr_kubernetes/handlers/logging.py | 40 ++++++ .../tests/unit/handlers/__init__.py | 0 .../tests/unit/handlers/test_dispatch.py | 107 ++++++++++++++++ .../tests/unit/handlers/test_k8s_base.py | 64 ++++++++++ .../tests/unit/handlers/test_logging.py | 65 ++++++++++ 11 files changed, 521 insertions(+), 3 deletions(-) create mode 100644 kuryr_kubernetes/handlers/__init__.py create mode 100644 kuryr_kubernetes/handlers/base.py create mode 100644 kuryr_kubernetes/handlers/dispatch.py create mode 100644 kuryr_kubernetes/handlers/k8s_base.py create mode 100644 kuryr_kubernetes/handlers/logging.py create mode 100644 kuryr_kubernetes/tests/unit/handlers/__init__.py create mode 100644 kuryr_kubernetes/tests/unit/handlers/test_dispatch.py create mode 100644 kuryr_kubernetes/tests/unit/handlers/test_k8s_base.py create mode 100644 kuryr_kubernetes/tests/unit/handlers/test_logging.py diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index 2488b10fb..16fa00d5d 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -15,3 +15,8 @@ K8S_API_BASE = '/api/v1' K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces' + +K8S_OBJ_NAMESPACE = 'Namespace' +K8S_OBJ_POD = 'Pod' +K8S_OBJ_SERVICE = 'Service' +K8S_OBJ_ENDPOINTS = 'Endpoints' diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index 4c8be142d..4a5c0e641 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -22,6 +22,8 @@ from oslo_service import service from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants +from kuryr_kubernetes.handlers import dispatch as h_dis +from kuryr_kubernetes.handlers import k8s_base as h_k8s from kuryr_kubernetes import watcher LOG = logging.getLogger(__name__) @@ -33,10 +35,28 @@ class KuryrK8sService(service.Service): def __init__(self): super(KuryrK8sService, self).__init__() - def dummy_handler(event): - LOG.debug("Event: %r", event) + class DummyHandler(h_k8s.ResourceEventHandler): + OBJECT_KIND = constants.K8S_OBJ_NAMESPACE - self.watcher = watcher.Watcher(dummy_handler, self.tg) + def on_added(self, event): + LOG.debug("added: %s", + event['object']['metadata']['selfLink']) + + def on_deleted(self, event): + LOG.debug("deleted: %s", + event['object']['metadata']['selfLink']) + + def on_modified(self, event): + LOG.debug("modified: %s", + event['object']['metadata']['selfLink']) + + def on_present(self, event): + LOG.debug("present: %s", + event['object']['metadata']['selfLink']) + + pipeline = h_dis.EventPipeline() + pipeline.register(DummyHandler()) + self.watcher = watcher.Watcher(pipeline, self.tg) self.watcher.add(constants.K8S_API_NAMESPACES) def start(self): diff --git a/kuryr_kubernetes/handlers/__init__.py b/kuryr_kubernetes/handlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kuryr_kubernetes/handlers/base.py b/kuryr_kubernetes/handlers/base.py new file mode 100644 index 000000000..18b77cdfb --- /dev/null +++ b/kuryr_kubernetes/handlers/base.py @@ -0,0 +1,30 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class EventHandler(object): + """Base class for event handlers.""" + + @abc.abstractmethod + def __call__(self, event): + """Handle the event.""" + raise NotImplementedError() + + def __str__(self): + return self.__class__.__name__ diff --git a/kuryr_kubernetes/handlers/dispatch.py b/kuryr_kubernetes/handlers/dispatch.py new file mode 100644 index 000000000..cb9f5b2f0 --- /dev/null +++ b/kuryr_kubernetes/handlers/dispatch.py @@ -0,0 +1,117 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import six + +from oslo_log import log as logging + +from kuryr_kubernetes.handlers import base as h_base +from kuryr_kubernetes.handlers import logging as h_log + + +LOG = logging.getLogger(__name__) + + +class Dispatcher(h_base.EventHandler): + """Dispatches events to registered handlers. + + Dispatcher serves as both multiplexer and filter for dispatching events + to multiple registered handlers based on the event content and + predicates provided during the handler registration. + """ + + def __init__(self): + self._registry = {} + + def register(self, key_fn, key, handler): + """Adds handler to the registry. + + `key_fn` and `key` constitute the `key_fn(event) == key` predicate + that determines if the `handler` should be called for a given `event`. + + :param key_fn: function that will be called for each event to + determine the event `key` + :param key: value to match against the result of `key_fn` function + that determines if the `handler` should be called for an + event + :param handler: `callable` object that would be called if the + conditions specified by `key_fn` and `key` are met + """ + key_group = self._registry.setdefault(key_fn, {}) + handlers = key_group.setdefault(key, []) + handlers.append(handler) + + def __call__(self, event): + handlers = set() + + for key_fn, key_group in self._registry.items(): + key = key_fn(event) + handlers.update(key_group.get(key, ())) + + LOG.debug("%s handler(s) available", len(handlers)) + for handler in handlers: + handler(event) + + +@six.add_metaclass(abc.ABCMeta) +class EventConsumer(h_base.EventHandler): + """Consumes events matching specified predicates. + + EventConsumer is an interface for all event handlers that are to be + registered by the `EventPipeline`. + """ + + @abc.abstractproperty + def consumes(self): + """Predicates determining events supported by this handler. + + :return: `dict` object containing {key_fn: key} predicates to be + used by `Dispatcher.register` + """ + raise NotImplementedError() + + +class EventPipeline(h_base.EventHandler): + """Serves as an entry-point for event handling. + + Implementing subclasses should override `_wrap_dispatcher` and/or + `_wrap_consumer` methods to sanitize the consumers passed to `register` + (i.e. to satisfy the `Watcher` requirement that the event handler does + not raise exceptions) and to add features like asynchronous event + processing or retry-on-failure functionality. + """ + + def __init__(self): + self._dispatcher = Dispatcher() + self._handler = self._wrap_dispatcher(self._dispatcher) + + def register(self, consumer): + """Adds handler to the registry. + + :param consumer: `EventConsumer`-type object + """ + handler = self._wrap_consumer(consumer) + for key_fn, key in consumer.consumes.items(): + self._dispatcher.register(key_fn, key, handler) + + def __call__(self, event): + self._handler(event) + + def _wrap_dispatcher(self, dispatcher): + return h_log.LogExceptions(dispatcher) + + def _wrap_consumer(self, consumer): + return h_log.LogExceptions(consumer) diff --git a/kuryr_kubernetes/handlers/k8s_base.py b/kuryr_kubernetes/handlers/k8s_base.py new file mode 100644 index 000000000..3d11ce270 --- /dev/null +++ b/kuryr_kubernetes/handlers/k8s_base.py @@ -0,0 +1,70 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kuryr_kubernetes.handlers import dispatch + + +def object_kind(event): + try: + return event['object']['kind'] + except KeyError: + return None + + +class ResourceEventHandler(dispatch.EventConsumer): + """Base class for K8s event handlers. + + Implementing classes should override the `OBJECT_KIND` attribute with a + valid Kubernetes object type name (e.g. 'Pod' or 'Namespace'; see [1] + for more details). + + Implementing classes are expected to override any or all of the + `on_added`, `on_present`, `on_modified`, `on_deleted` methods that would + be called depending on the type of the event (with K8s object as a single + argument). + + [1] https://github.com/kubernetes/kubernetes/blob/release-1.4/docs/devel\ + /api-conventions.md#types-kinds + """ + + OBJECT_KIND = None + + @property + def consumes(self): + return {object_kind: self.OBJECT_KIND} + + def __call__(self, event): + event_type = event.get('type') + obj = event.get('object') + if 'MODIFIED' == event_type: + self.on_modified(obj) + self.on_present(obj) + elif 'ADDED' == event_type: + self.on_added(obj) + self.on_present(obj) + elif 'DELETED' == event_type: + self.on_deleted(obj) + + def on_added(self, obj): + pass + + def on_present(self, obj): + pass + + def on_modified(self, obj): + pass + + def on_deleted(self, obj): + pass diff --git a/kuryr_kubernetes/handlers/logging.py b/kuryr_kubernetes/handlers/logging.py new file mode 100644 index 000000000..ff3ae7d27 --- /dev/null +++ b/kuryr_kubernetes/handlers/logging.py @@ -0,0 +1,40 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kuryr.lib._i18n import _LE +from oslo_log import log as logging + +from kuryr_kubernetes.handlers import base + +LOG = logging.getLogger(__name__) + + +class LogExceptions(base.EventHandler): + """Suppresses exceptions and sends them to log. + + LogExceptions wraps `handler` passed as an initialization parameter by + suppressing `exceptions` it raises and sending them to logging facility + instead. + """ + + def __init__(self, handler, exceptions=Exception): + self._handler = handler + self._exceptions = exceptions + + def __call__(self, event): + try: + self._handler(event) + except self._exceptions: + LOG.exception(_LE("Failed to handle event %s"), event) diff --git a/kuryr_kubernetes/tests/unit/handlers/__init__.py b/kuryr_kubernetes/tests/unit/handlers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kuryr_kubernetes/tests/unit/handlers/test_dispatch.py b/kuryr_kubernetes/tests/unit/handlers/test_dispatch.py new file mode 100644 index 000000000..fa388f1f9 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/handlers/test_dispatch.py @@ -0,0 +1,107 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from kuryr_kubernetes.handlers import dispatch as h_dis +from kuryr_kubernetes.tests import base as test_base + + +class TestDispatch(test_base.TestCase): + def test_dispatch(self): + events = list(range(3)) + handler = mock.Mock() + dispatcher = h_dis.Dispatcher() + dispatcher.register(lambda e: True, True, handler) + + for event in events: + dispatcher(event) + + handler.assert_has_calls([mock.call(e) for e in events]) + + def test_dispatch_broadcast(self): + handlers = [mock.Mock() for _ in range(3)] + dispatcher = h_dis.Dispatcher() + + for handler in handlers: + dispatcher.register(lambda e: True, True, handler) + + dispatcher(mock.sentinel.event) + + for handler in handlers: + handler.assert_called_once_with(mock.sentinel.event) + + def test_dispatch_by_key(self): + def key_fn(event): + return str(event) + + events = {key_fn(i): i for i in range(3)} + handlers = {key: mock.Mock() for key in events} + dispatcher = h_dis.Dispatcher() + for key, handler in handlers.items(): + dispatcher.register(key_fn, key, handler) + + for event in events.values(): + dispatcher(event) + + for key, handler in handlers.items(): + handler.assert_called_once_with(events[key]) + + +class TestEventPipeline(test_base.TestCase): + @mock.patch.object(h_dis.EventPipeline, '_wrap_dispatcher') + @mock.patch('kuryr_kubernetes.handlers.dispatch.Dispatcher') + def test_init(self, m_dispatcher_type, m_wrapper): + m_dispatcher_type.return_value = mock.sentinel.dispatcher + m_wrapper.return_value = mock.sentinel.handler + + pipeline = h_dis.EventPipeline() + + m_dispatcher_type.assert_called_once() + m_wrapper.assert_called_once_with(mock.sentinel.dispatcher) + self.assertEqual(mock.sentinel.dispatcher, pipeline._dispatcher) + self.assertEqual(mock.sentinel.handler, pipeline._handler) + + @mock.patch.object(h_dis.EventPipeline, '_wrap_consumer') + @mock.patch.object(h_dis.EventPipeline, '__init__') + def test_register(self, m_init, m_wrap_consumer): + consumes = {mock.sentinel.key_fn1: mock.sentinel.key1, + mock.sentinel.key_fn2: mock.sentinel.key2, + mock.sentinel.key_fn3: mock.sentinel.key3} + m_dispatcher = mock.Mock() + m_consumer = mock.Mock() + m_consumer.consumes = consumes + m_wrap_consumer.return_value = mock.sentinel.handler + m_init.return_value = None + pipeline = h_dis.EventPipeline() + pipeline._dispatcher = m_dispatcher + + pipeline.register(m_consumer) + + m_wrap_consumer.assert_called_once_with(m_consumer) + m_dispatcher.register.assert_has_calls([ + mock.call(key_fn, key, mock.sentinel.handler) + for key_fn, key in consumes.items()], any_order=True) + + @mock.patch.object(h_dis.EventPipeline, '__init__') + def test_call(self, m_init): + m_init.return_value = None + m_handler = mock.Mock() + pipeline = h_dis.EventPipeline() + pipeline._handler = m_handler + + pipeline(mock.sentinel.event) + + m_handler.assert_called_once_with(mock.sentinel.event) diff --git a/kuryr_kubernetes/tests/unit/handlers/test_k8s_base.py b/kuryr_kubernetes/tests/unit/handlers/test_k8s_base.py new file mode 100644 index 000000000..ccfd06dcb --- /dev/null +++ b/kuryr_kubernetes/tests/unit/handlers/test_k8s_base.py @@ -0,0 +1,64 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from kuryr_kubernetes.handlers import k8s_base as h_k8s +from kuryr_kubernetes.tests import base as test_base + + +class TestResourceEventHandler(test_base.TestCase): + + @mock.patch.object(h_k8s.ResourceEventHandler, 'on_added') + @mock.patch.object(h_k8s.ResourceEventHandler, 'on_present') + def test_added(self, m_added, m_present): + obj = mock.sentinel.obj + event = {'type': 'ADDED', 'object': obj} + handler = h_k8s.ResourceEventHandler() + + handler(event) + + m_added.assert_called_once_with(obj) + m_present.assert_called_once_with(obj) + + @mock.patch.object(h_k8s.ResourceEventHandler, 'on_modified') + @mock.patch.object(h_k8s.ResourceEventHandler, 'on_present') + def test_modified(self, m_modified, m_present): + obj = mock.sentinel.obj + event = {'type': 'MODIFIED', 'object': obj} + handler = h_k8s.ResourceEventHandler() + + handler(event) + + m_modified.assert_called_once_with(obj) + m_present.assert_called_once_with(obj) + + @mock.patch.object(h_k8s.ResourceEventHandler, 'on_deleted') + def test_deleted(self, m_deleted): + obj = mock.sentinel.obj + event = {'type': 'DELETED', 'object': obj} + handler = h_k8s.ResourceEventHandler() + + handler(event) + + m_deleted.assert_called_once_with(obj) + + def test_unknown(self): + event = {'type': 'UNKNOWN'} + handler = h_k8s.ResourceEventHandler() + + handler(event) + + self.assertTrue(True) diff --git a/kuryr_kubernetes/tests/unit/handlers/test_logging.py b/kuryr_kubernetes/tests/unit/handlers/test_logging.py new file mode 100644 index 000000000..34fc3aa56 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/handlers/test_logging.py @@ -0,0 +1,65 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from kuryr_kubernetes.handlers import logging as h_log +from kuryr_kubernetes.tests import base as test_base + + +class TestLoggingHandler(test_base.TestCase): + + @mock.patch('kuryr_kubernetes.handlers.logging.LOG') + def test_no_exception(self, m_log): + m_handler = mock.Mock() + handler = h_log.LogExceptions(m_handler) + + handler(mock.sentinel.event) + + m_handler.assert_called_once_with(mock.sentinel.event) + m_log.exception.assert_not_called() + + @mock.patch('kuryr_kubernetes.handlers.logging.LOG') + def test_exception(self, m_log): + m_handler = mock.Mock() + m_handler.side_effect = ValueError() + handler = h_log.LogExceptions(m_handler, exceptions=ValueError) + + handler(mock.sentinel.event) + + m_handler.assert_called_once_with(mock.sentinel.event) + m_log.exception.assert_called_once() + + @mock.patch('kuryr_kubernetes.handlers.logging.LOG') + def test_exception_default(self, m_log): + m_handler = mock.Mock() + m_handler.side_effect = ValueError() + handler = h_log.LogExceptions(m_handler) + + handler(mock.sentinel.event) + + m_handler.assert_called_once_with(mock.sentinel.event) + m_log.exception.assert_called_once() + + @mock.patch('kuryr_kubernetes.handlers.logging.LOG') + def test_raises(self, m_log): + m_handler = mock.Mock() + m_handler.side_effect = KeyError() + handler = h_log.LogExceptions(m_handler, exceptions=ValueError) + + self.assertRaises(KeyError, handler, mock.sentinel.event) + + m_handler.assert_called_once_with(mock.sentinel.event) + m_log.exception.assert_not_called()