diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py new file mode 100644 index 000000000..2488b10fb --- /dev/null +++ b/kuryr_kubernetes/constants.py @@ -0,0 +1,17 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +K8S_API_BASE = '/api/v1' +K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces' diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index 2089a6ce2..4c8be142d 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -21,6 +21,8 @@ from oslo_service import service from kuryr_kubernetes import clients from kuryr_kubernetes import config +from kuryr_kubernetes import constants +from kuryr_kubernetes import watcher LOG = logging.getLogger(__name__) @@ -31,9 +33,16 @@ class KuryrK8sService(service.Service): def __init__(self): super(KuryrK8sService, self).__init__() + def dummy_handler(event): + LOG.debug("Event: %r", event) + + self.watcher = watcher.Watcher(dummy_handler, self.tg) + self.watcher.add(constants.K8S_API_NAMESPACES) + def start(self): LOG.info(_LI("Service '%s' starting"), self.__class__.__name__) super(KuryrK8sService, self).start() + self.watcher.start() LOG.info(_LI("Service '%s' started"), self.__class__.__name__) def wait(self): @@ -42,6 +51,7 @@ class KuryrK8sService(service.Service): def stop(self, graceful=False): LOG.info(_LI("Service '%s' stopping"), self.__class__.__name__) + self.watcher.stop() super(KuryrK8sService, self).stop(graceful) diff --git a/kuryr_kubernetes/tests/unit/kuryr_fixtures.py b/kuryr_kubernetes/tests/unit/kuryr_fixtures.py new file mode 100644 index 000000000..00cc452c4 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/kuryr_fixtures.py @@ -0,0 +1,27 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import fixtures +import mock + +from kuryr_kubernetes import k8s_client + + +class MockK8sClient(fixtures.Fixture): + def _setUp(self): + self.client = mock.Mock(k8s_client.K8sClient) + self.useFixture(fixtures.MockPatch( + 'kuryr_kubernetes.clients.get_kubernetes_client', + lambda: self.client)) diff --git a/kuryr_kubernetes/tests/unit/test_watcher.py b/kuryr_kubernetes/tests/unit/test_watcher.py new file mode 100644 index 000000000..7be9d7dd3 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/test_watcher.py @@ -0,0 +1,291 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from eventlet import greenlet +import mock + +from kuryr_kubernetes.tests import base as test_base +from kuryr_kubernetes.tests.unit import kuryr_fixtures as kuryr_fixtures +from kuryr_kubernetes import watcher + + +class TestWatcher(test_base.TestCase): + def setUp(self): + super(TestWatcher, self).setUp() + mock_client = self.useFixture(kuryr_fixtures.MockK8sClient()) + self.client = mock_client.client + + @mock.patch.object(watcher.Watcher, '_start_watch') + def test_add(self, m_start_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + + for path in paths: + watcher_obj.add(path) + + self.assertEqual(set(paths), watcher_obj._resources) + m_start_watch.assert_not_called() + + @mock.patch.object(watcher.Watcher, '_start_watch') + def test_add_running(self, m_start_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._running = True + + for path in paths: + watcher_obj.add(path) + + self.assertEqual(set(paths), watcher_obj._resources) + m_start_watch.assert_has_calls([mock.call(path) for path in paths], + any_order=True) + + @mock.patch.object(watcher.Watcher, '_start_watch') + def test_add_watching(self, m_start_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._running = True + m_watching = watcher_obj._watching = mock.MagicMock() + m_watching.__contains__.return_value = True + + for path in paths: + watcher_obj.add(path) + + self.assertEqual(set(paths), watcher_obj._resources) + m_start_watch.assert_not_called() + + @mock.patch.object(watcher.Watcher, '_stop_watch') + def test_remove(self, m_stop_watch): + path = '/test' + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._resources.add(path) + + watcher_obj.remove(path) + + self.assertEqual(set(), watcher_obj._resources) + m_stop_watch.assert_not_called() + + @mock.patch.object(watcher.Watcher, '_stop_watch') + def test_remove_watching(self, m_stop_watch): + path = '/test' + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._resources.add(path) + m_watching = watcher_obj._watching = mock.MagicMock() + m_watching.__contains__.return_value = True + + watcher_obj.remove(path) + + self.assertEqual(set(), watcher_obj._resources) + m_stop_watch.assert_called_once_with(path) + + @mock.patch.object(watcher.Watcher, '_start_watch') + def test_start(self, m_start_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._resources.update(paths) + + watcher_obj.start() + + self.assertTrue(watcher_obj._running) + m_start_watch.assert_has_calls([mock.call(path) for path in paths], + any_order=True) + + @mock.patch.object(watcher.Watcher, '_start_watch') + def test_start_already_watching(self, m_start_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._resources.update(paths) + m_watching = watcher_obj._watching = mock.MagicMock() + m_watching.__iter__.return_value = paths + + watcher_obj.start() + + self.assertTrue(watcher_obj._running) + m_start_watch.assert_not_called() + + @mock.patch.object(watcher.Watcher, '_stop_watch') + def test_stop(self, m_stop_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._resources.update(paths) + + watcher_obj.stop() + + self.assertFalse(watcher_obj._running) + m_stop_watch.assert_not_called() + + @mock.patch.object(watcher.Watcher, '_stop_watch') + def test_stop_watching(self, m_stop_watch): + paths = ['/test%s' % i for i in range(3)] + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + watcher_obj._resources.update(paths) + m_watching = watcher_obj._watching = mock.MagicMock() + m_watching.__iter__.return_value = paths + + watcher_obj.stop() + + self.assertFalse(watcher_obj._running) + m_stop_watch.assert_has_calls([mock.call(path) for path in paths], + any_order=True) + + @mock.patch.object(watcher.Watcher, '_watch') + def test_start_watch(self, m_watch): + path = '/test' + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler) + + watcher_obj._start_watch(path) + + m_watch.assert_called_once_with(path) + self.assertTrue(watcher_obj._idle.get(path)) + self.assertIn(path, watcher_obj._watching) + + def test_start_watch_threaded(self): + path = '/test' + m_tg = mock.Mock() + m_tg.add_thread.return_value = mock.sentinel.watch_thread + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler, m_tg) + + watcher_obj._start_watch(path) + + m_tg.add_thread.assert_called_once_with(watcher_obj._watch, path) + self.assertTrue(watcher_obj._idle.get(path)) + self.assertEqual(mock.sentinel.watch_thread, + watcher_obj._watching.get(path)) + + def test_stop_watch_threaded(self): + path = '/test' + m_tg = mock.Mock() + m_th = mock.Mock() + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler, m_tg) + watcher_obj._idle[path] = True + watcher_obj._watching[path] = m_th + + watcher_obj._stop_watch(path) + + m_th.kill.assert_called() + + def test_stop_watch_idle(self): + path = '/test' + m_tg = mock.Mock() + m_th = mock.Mock() + m_handler = mock.Mock() + watcher_obj = watcher.Watcher(m_handler, m_tg) + watcher_obj._idle[path] = False + watcher_obj._watching[path] = m_th + + watcher_obj._stop_watch(path) + + m_th.kill.assert_not_called() + + def _test_watch_mock_events(self, watcher_obj, events): + def client_watch(client_path): + for e in events: + self.assertTrue(watcher_obj._idle[client_path]) + yield e + self.assertTrue(watcher_obj._idle[client_path]) + self.client.watch.side_effect = client_watch + + @staticmethod + def _test_watch_create_watcher(path, handler): + watcher_obj = watcher.Watcher(handler) + watcher_obj._running = True + watcher_obj._resources.add(path) + watcher_obj._idle[path] = True + watcher_obj._watching[path] = None + return watcher_obj + + def test_watch(self): + path = '/test' + events = [{'e': i} for i in range(3)] + + def handler(event): + self.assertFalse(watcher_obj._idle[path]) + + m_handler = mock.Mock() + m_handler.side_effect = handler + watcher_obj = self._test_watch_create_watcher(path, m_handler) + self._test_watch_mock_events(watcher_obj, events) + + watcher_obj._watch(path) + + m_handler.assert_has_calls([mock.call(e) for e in events]) + + def test_watch_stopped(self): + path = '/test' + events = [{'e': i} for i in range(3)] + + def handler(event): + self.assertFalse(watcher_obj._idle[path]) + watcher_obj._running = False + + m_handler = mock.Mock() + m_handler.side_effect = handler + watcher_obj = self._test_watch_create_watcher(path, m_handler) + self._test_watch_mock_events(watcher_obj, events) + + watcher_obj._watch(path) + + m_handler.assert_called_once_with(events[0]) + self.assertNotIn(path, watcher_obj._idle) + self.assertNotIn(path, watcher_obj._watching) + + def test_watch_removed(self): + path = '/test' + events = [{'e': i} for i in range(3)] + + def handler(event): + self.assertFalse(watcher_obj._idle[path]) + watcher_obj._resources.remove(path) + + m_handler = mock.Mock() + m_handler.side_effect = handler + watcher_obj = self._test_watch_create_watcher(path, m_handler) + self._test_watch_mock_events(watcher_obj, events) + + watcher_obj._watch(path) + + m_handler.assert_called_once_with(events[0]) + self.assertNotIn(path, watcher_obj._idle) + self.assertNotIn(path, watcher_obj._watching) + + def test_watch_interrupted(self): + path = '/test' + events = [{'e': i} for i in range(3)] + + def handler(event): + self.assertFalse(watcher_obj._idle[path]) + raise greenlet.GreenletExit() + + m_handler = mock.Mock() + m_handler.side_effect = handler + watcher_obj = self._test_watch_create_watcher(path, m_handler) + self._test_watch_mock_events(watcher_obj, events) + + self.assertRaises(greenlet.GreenletExit, watcher_obj._watch, path) + + m_handler.assert_called_once_with(events[0]) + self.assertNotIn(path, watcher_obj._idle) + self.assertNotIn(path, watcher_obj._watching) diff --git a/kuryr_kubernetes/watcher.py b/kuryr_kubernetes/watcher.py new file mode 100644 index 000000000..e93d23ca6 --- /dev/null +++ b/kuryr_kubernetes/watcher.py @@ -0,0 +1,148 @@ +# Copyright (c) 2016 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kuryr.lib._i18n import _LI +from oslo_log import log as logging + +from kuryr_kubernetes import clients + +LOG = logging.getLogger(__name__) + + +class Watcher(object): + """Observes K8s resources' events using K8s '?watch=true' API. + + The `Watcher` maintains a list of K8s resources and manages the event + processing loops for those resources. Event handling is delegated to the + `callable` object passed as the `handler` initialization parameter that + will be run for each K8s event observed by the `Watcher`. + + The `Watcher` can operate in two different modes based on the + `thread_group` initialization parameter: + + - synchronous, when the event processing loops run on the same thread + that called 'add' or 'start' methods + + - asynchronous, when each event processing loop runs on its own thread + (`oslo_service.threadgroup.Thread`) from the `thread_group` + + When started, the `Watcher` will run the event processing loops for each + of the K8s resources on the list. Adding a K8s resource to the running + `Watcher` also ensures that the event processing loop for that resource is + running. + + Stopping the `Watcher` or removing the specific K8s resource from the + list will request the corresponding running event processing loops to + stop gracefully, but will not interrupt any running `handler`. Forcibly + stopping any 'stuck' `handler` is not supported by the `Watcher` and + should be handled externally (e.g. by using `thread_group.stop( + graceful=False)` for asynchronous `Watcher`). + """ + + def __init__(self, handler, thread_group=None): + """Initializes a new Watcher instance. + + :param handler: a `callable` object to be invoked for each observed + K8s event with the event body as a single argument. + Calling `handler` should never raise any exceptions + other than `eventlet.greenlet.GreenletExit` caused by + `eventlet.greenthread.GreenThread.kill` when the + `Watcher` is operating in asynchronous mode. + :param thread_group: an `oslo_service.threadgroup.ThreadGroup` + object used to run the event processing loops + asynchronously. If `thread_group` is not + specified, the `Watcher` will operate in a + synchronous mode. + """ + self._client = clients.get_kubernetes_client() + self._handler = handler + self._thread_group = thread_group + self._running = False + + self._resources = set() + self._watching = {} + self._idle = {} + + def add(self, path): + """Adds ths K8s resource to the Watcher. + + Adding a resource to a running `Watcher` also ensures that the event + processing loop for that resource is running. This method could block + for `Watcher`s operating in synchronous mode. + + :param path: K8s resource URL path + """ + self._resources.add(path) + if self._running and path not in self._watching: + self._start_watch(path) + + def remove(self, path): + """Removes the K8s resource from the Watcher. + + Also requests the corresponding event processing loop to stop if it + is running. + + :param path: K8s resource URL path + """ + self._resources.discard(path) + if path in self._watching: + self._stop_watch(path) + + def start(self): + """Starts the Watcher. + + Also ensures that the event processing loops are running. This method + could block for `Watcher`s operating in synchronous mode. + """ + self._running = True + for path in self._resources - set(self._watching): + self._start_watch(path) + + def stop(self): + """Stops the Watcher. + + Also requests all running event processing loops to stop. + """ + self._running = False + for path in list(self._watching): + self._stop_watch(path) + + def _start_watch(self, path): + tg = self._thread_group + self._idle[path] = True + if tg: + self._watching[path] = tg.add_thread(self._watch, path) + else: + self._watching[path] = None + self._watch(path) + + def _stop_watch(self, path): + if self._idle.get(path): + if self._thread_group: + self._watching[path].kill() + + def _watch(self, path): + try: + LOG.info(_LI("Started watching '%s'"), path) + for event in self._client.watch(path): + self._idle[path] = False + self._handler(event) + self._idle[path] = True + if not (self._running and path in self._resources): + return + finally: + self._watching.pop(path) + self._idle.pop(path) + LOG.info(_LI("Stopped watching '%s'"), path)