Basic event handling pipeline components

This patch provides basic event handling pipeline components.
The EventPipeline class provided by this patch will serve as a base
class for ControllerEventPipeline and CNIEventPipeline classes that
are to be implemented by future patches.

Change-Id: I8c68941cbe323f80cd67341fe05656efd57bcdc5
Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
Ilya Chukhnakov 2016-10-13 22:28:27 +03:00
parent 1d9c6fa1e3
commit a3790b1555
11 changed files with 521 additions and 3 deletions

View File

@ -15,3 +15,8 @@
K8S_API_BASE = '/api/v1' K8S_API_BASE = '/api/v1'
K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces' K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces'
K8S_OBJ_NAMESPACE = 'Namespace'
K8S_OBJ_POD = 'Pod'
K8S_OBJ_SERVICE = 'Service'
K8S_OBJ_ENDPOINTS = 'Endpoints'

View File

@ -22,6 +22,8 @@ from oslo_service import service
from kuryr_kubernetes import clients from kuryr_kubernetes import clients
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes import constants from kuryr_kubernetes import constants
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes import watcher from kuryr_kubernetes import watcher
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -33,10 +35,28 @@ class KuryrK8sService(service.Service):
def __init__(self): def __init__(self):
super(KuryrK8sService, self).__init__() super(KuryrK8sService, self).__init__()
def dummy_handler(event): class DummyHandler(h_k8s.ResourceEventHandler):
LOG.debug("Event: %r", event) OBJECT_KIND = constants.K8S_OBJ_NAMESPACE
self.watcher = watcher.Watcher(dummy_handler, self.tg) def on_added(self, event):
LOG.debug("added: %s",
event['object']['metadata']['selfLink'])
def on_deleted(self, event):
LOG.debug("deleted: %s",
event['object']['metadata']['selfLink'])
def on_modified(self, event):
LOG.debug("modified: %s",
event['object']['metadata']['selfLink'])
def on_present(self, event):
LOG.debug("present: %s",
event['object']['metadata']['selfLink'])
pipeline = h_dis.EventPipeline()
pipeline.register(DummyHandler())
self.watcher = watcher.Watcher(pipeline, self.tg)
self.watcher.add(constants.K8S_API_NAMESPACES) self.watcher.add(constants.K8S_API_NAMESPACES)
def start(self): def start(self):

View File

View File

@ -0,0 +1,30 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class EventHandler(object):
"""Base class for event handlers."""
@abc.abstractmethod
def __call__(self, event):
"""Handle the event."""
raise NotImplementedError()
def __str__(self):
return self.__class__.__name__

View File

@ -0,0 +1,117 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from oslo_log import log as logging
from kuryr_kubernetes.handlers import base as h_base
from kuryr_kubernetes.handlers import logging as h_log
LOG = logging.getLogger(__name__)
class Dispatcher(h_base.EventHandler):
"""Dispatches events to registered handlers.
Dispatcher serves as both multiplexer and filter for dispatching events
to multiple registered handlers based on the event content and
predicates provided during the handler registration.
"""
def __init__(self):
self._registry = {}
def register(self, key_fn, key, handler):
"""Adds handler to the registry.
`key_fn` and `key` constitute the `key_fn(event) == key` predicate
that determines if the `handler` should be called for a given `event`.
:param key_fn: function that will be called for each event to
determine the event `key`
:param key: value to match against the result of `key_fn` function
that determines if the `handler` should be called for an
event
:param handler: `callable` object that would be called if the
conditions specified by `key_fn` and `key` are met
"""
key_group = self._registry.setdefault(key_fn, {})
handlers = key_group.setdefault(key, [])
handlers.append(handler)
def __call__(self, event):
handlers = set()
for key_fn, key_group in self._registry.items():
key = key_fn(event)
handlers.update(key_group.get(key, ()))
LOG.debug("%s handler(s) available", len(handlers))
for handler in handlers:
handler(event)
@six.add_metaclass(abc.ABCMeta)
class EventConsumer(h_base.EventHandler):
"""Consumes events matching specified predicates.
EventConsumer is an interface for all event handlers that are to be
registered by the `EventPipeline`.
"""
@abc.abstractproperty
def consumes(self):
"""Predicates determining events supported by this handler.
:return: `dict` object containing {key_fn: key} predicates to be
used by `Dispatcher.register`
"""
raise NotImplementedError()
class EventPipeline(h_base.EventHandler):
"""Serves as an entry-point for event handling.
Implementing subclasses should override `_wrap_dispatcher` and/or
`_wrap_consumer` methods to sanitize the consumers passed to `register`
(i.e. to satisfy the `Watcher` requirement that the event handler does
not raise exceptions) and to add features like asynchronous event
processing or retry-on-failure functionality.
"""
def __init__(self):
self._dispatcher = Dispatcher()
self._handler = self._wrap_dispatcher(self._dispatcher)
def register(self, consumer):
"""Adds handler to the registry.
:param consumer: `EventConsumer`-type object
"""
handler = self._wrap_consumer(consumer)
for key_fn, key in consumer.consumes.items():
self._dispatcher.register(key_fn, key, handler)
def __call__(self, event):
self._handler(event)
def _wrap_dispatcher(self, dispatcher):
return h_log.LogExceptions(dispatcher)
def _wrap_consumer(self, consumer):
return h_log.LogExceptions(consumer)

View File

@ -0,0 +1,70 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr_kubernetes.handlers import dispatch
def object_kind(event):
try:
return event['object']['kind']
except KeyError:
return None
class ResourceEventHandler(dispatch.EventConsumer):
"""Base class for K8s event handlers.
Implementing classes should override the `OBJECT_KIND` attribute with a
valid Kubernetes object type name (e.g. 'Pod' or 'Namespace'; see [1]
for more details).
Implementing classes are expected to override any or all of the
`on_added`, `on_present`, `on_modified`, `on_deleted` methods that would
be called depending on the type of the event (with K8s object as a single
argument).
[1] https://github.com/kubernetes/kubernetes/blob/release-1.4/docs/devel\
/api-conventions.md#types-kinds
"""
OBJECT_KIND = None
@property
def consumes(self):
return {object_kind: self.OBJECT_KIND}
def __call__(self, event):
event_type = event.get('type')
obj = event.get('object')
if 'MODIFIED' == event_type:
self.on_modified(obj)
self.on_present(obj)
elif 'ADDED' == event_type:
self.on_added(obj)
self.on_present(obj)
elif 'DELETED' == event_type:
self.on_deleted(obj)
def on_added(self, obj):
pass
def on_present(self, obj):
pass
def on_modified(self, obj):
pass
def on_deleted(self, obj):
pass

View File

@ -0,0 +1,40 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr.lib._i18n import _LE
from oslo_log import log as logging
from kuryr_kubernetes.handlers import base
LOG = logging.getLogger(__name__)
class LogExceptions(base.EventHandler):
"""Suppresses exceptions and sends them to log.
LogExceptions wraps `handler` passed as an initialization parameter by
suppressing `exceptions` it raises and sending them to logging facility
instead.
"""
def __init__(self, handler, exceptions=Exception):
self._handler = handler
self._exceptions = exceptions
def __call__(self, event):
try:
self._handler(event)
except self._exceptions:
LOG.exception(_LE("Failed to handle event %s"), event)

View File

@ -0,0 +1,107 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes.handlers import dispatch as h_dis
from kuryr_kubernetes.tests import base as test_base
class TestDispatch(test_base.TestCase):
def test_dispatch(self):
events = list(range(3))
handler = mock.Mock()
dispatcher = h_dis.Dispatcher()
dispatcher.register(lambda e: True, True, handler)
for event in events:
dispatcher(event)
handler.assert_has_calls([mock.call(e) for e in events])
def test_dispatch_broadcast(self):
handlers = [mock.Mock() for _ in range(3)]
dispatcher = h_dis.Dispatcher()
for handler in handlers:
dispatcher.register(lambda e: True, True, handler)
dispatcher(mock.sentinel.event)
for handler in handlers:
handler.assert_called_once_with(mock.sentinel.event)
def test_dispatch_by_key(self):
def key_fn(event):
return str(event)
events = {key_fn(i): i for i in range(3)}
handlers = {key: mock.Mock() for key in events}
dispatcher = h_dis.Dispatcher()
for key, handler in handlers.items():
dispatcher.register(key_fn, key, handler)
for event in events.values():
dispatcher(event)
for key, handler in handlers.items():
handler.assert_called_once_with(events[key])
class TestEventPipeline(test_base.TestCase):
@mock.patch.object(h_dis.EventPipeline, '_wrap_dispatcher')
@mock.patch('kuryr_kubernetes.handlers.dispatch.Dispatcher')
def test_init(self, m_dispatcher_type, m_wrapper):
m_dispatcher_type.return_value = mock.sentinel.dispatcher
m_wrapper.return_value = mock.sentinel.handler
pipeline = h_dis.EventPipeline()
m_dispatcher_type.assert_called_once()
m_wrapper.assert_called_once_with(mock.sentinel.dispatcher)
self.assertEqual(mock.sentinel.dispatcher, pipeline._dispatcher)
self.assertEqual(mock.sentinel.handler, pipeline._handler)
@mock.patch.object(h_dis.EventPipeline, '_wrap_consumer')
@mock.patch.object(h_dis.EventPipeline, '__init__')
def test_register(self, m_init, m_wrap_consumer):
consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
mock.sentinel.key_fn2: mock.sentinel.key2,
mock.sentinel.key_fn3: mock.sentinel.key3}
m_dispatcher = mock.Mock()
m_consumer = mock.Mock()
m_consumer.consumes = consumes
m_wrap_consumer.return_value = mock.sentinel.handler
m_init.return_value = None
pipeline = h_dis.EventPipeline()
pipeline._dispatcher = m_dispatcher
pipeline.register(m_consumer)
m_wrap_consumer.assert_called_once_with(m_consumer)
m_dispatcher.register.assert_has_calls([
mock.call(key_fn, key, mock.sentinel.handler)
for key_fn, key in consumes.items()], any_order=True)
@mock.patch.object(h_dis.EventPipeline, '__init__')
def test_call(self, m_init):
m_init.return_value = None
m_handler = mock.Mock()
pipeline = h_dis.EventPipeline()
pipeline._handler = m_handler
pipeline(mock.sentinel.event)
m_handler.assert_called_once_with(mock.sentinel.event)

View File

@ -0,0 +1,64 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes.handlers import k8s_base as h_k8s
from kuryr_kubernetes.tests import base as test_base
class TestResourceEventHandler(test_base.TestCase):
@mock.patch.object(h_k8s.ResourceEventHandler, 'on_added')
@mock.patch.object(h_k8s.ResourceEventHandler, 'on_present')
def test_added(self, m_added, m_present):
obj = mock.sentinel.obj
event = {'type': 'ADDED', 'object': obj}
handler = h_k8s.ResourceEventHandler()
handler(event)
m_added.assert_called_once_with(obj)
m_present.assert_called_once_with(obj)
@mock.patch.object(h_k8s.ResourceEventHandler, 'on_modified')
@mock.patch.object(h_k8s.ResourceEventHandler, 'on_present')
def test_modified(self, m_modified, m_present):
obj = mock.sentinel.obj
event = {'type': 'MODIFIED', 'object': obj}
handler = h_k8s.ResourceEventHandler()
handler(event)
m_modified.assert_called_once_with(obj)
m_present.assert_called_once_with(obj)
@mock.patch.object(h_k8s.ResourceEventHandler, 'on_deleted')
def test_deleted(self, m_deleted):
obj = mock.sentinel.obj
event = {'type': 'DELETED', 'object': obj}
handler = h_k8s.ResourceEventHandler()
handler(event)
m_deleted.assert_called_once_with(obj)
def test_unknown(self):
event = {'type': 'UNKNOWN'}
handler = h_k8s.ResourceEventHandler()
handler(event)
self.assertTrue(True)

View File

@ -0,0 +1,65 @@
# Copyright (c) 2016 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from kuryr_kubernetes.handlers import logging as h_log
from kuryr_kubernetes.tests import base as test_base
class TestLoggingHandler(test_base.TestCase):
@mock.patch('kuryr_kubernetes.handlers.logging.LOG')
def test_no_exception(self, m_log):
m_handler = mock.Mock()
handler = h_log.LogExceptions(m_handler)
handler(mock.sentinel.event)
m_handler.assert_called_once_with(mock.sentinel.event)
m_log.exception.assert_not_called()
@mock.patch('kuryr_kubernetes.handlers.logging.LOG')
def test_exception(self, m_log):
m_handler = mock.Mock()
m_handler.side_effect = ValueError()
handler = h_log.LogExceptions(m_handler, exceptions=ValueError)
handler(mock.sentinel.event)
m_handler.assert_called_once_with(mock.sentinel.event)
m_log.exception.assert_called_once()
@mock.patch('kuryr_kubernetes.handlers.logging.LOG')
def test_exception_default(self, m_log):
m_handler = mock.Mock()
m_handler.side_effect = ValueError()
handler = h_log.LogExceptions(m_handler)
handler(mock.sentinel.event)
m_handler.assert_called_once_with(mock.sentinel.event)
m_log.exception.assert_called_once()
@mock.patch('kuryr_kubernetes.handlers.logging.LOG')
def test_raises(self, m_log):
m_handler = mock.Mock()
m_handler.side_effect = KeyError()
handler = h_log.LogExceptions(m_handler, exceptions=ValueError)
self.assertRaises(KeyError, handler, mock.sentinel.event)
m_handler.assert_called_once_with(mock.sentinel.event)
m_log.exception.assert_not_called()