Merge "Watcher implementation"
This commit is contained in:
commit
1d9c6fa1e3
17
kuryr_kubernetes/constants.py
Normal file
17
kuryr_kubernetes/constants.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
# Copyright (c) 2016 Mirantis, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
K8S_API_BASE = '/api/v1'
|
||||||
|
K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces'
|
@ -21,6 +21,8 @@ from oslo_service import service
|
|||||||
|
|
||||||
from kuryr_kubernetes import clients
|
from kuryr_kubernetes import clients
|
||||||
from kuryr_kubernetes import config
|
from kuryr_kubernetes import config
|
||||||
|
from kuryr_kubernetes import constants
|
||||||
|
from kuryr_kubernetes import watcher
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -31,9 +33,16 @@ class KuryrK8sService(service.Service):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(KuryrK8sService, self).__init__()
|
super(KuryrK8sService, self).__init__()
|
||||||
|
|
||||||
|
def dummy_handler(event):
|
||||||
|
LOG.debug("Event: %r", event)
|
||||||
|
|
||||||
|
self.watcher = watcher.Watcher(dummy_handler, self.tg)
|
||||||
|
self.watcher.add(constants.K8S_API_NAMESPACES)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
LOG.info(_LI("Service '%s' starting"), self.__class__.__name__)
|
LOG.info(_LI("Service '%s' starting"), self.__class__.__name__)
|
||||||
super(KuryrK8sService, self).start()
|
super(KuryrK8sService, self).start()
|
||||||
|
self.watcher.start()
|
||||||
LOG.info(_LI("Service '%s' started"), self.__class__.__name__)
|
LOG.info(_LI("Service '%s' started"), self.__class__.__name__)
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
@ -42,6 +51,7 @@ class KuryrK8sService(service.Service):
|
|||||||
|
|
||||||
def stop(self, graceful=False):
|
def stop(self, graceful=False):
|
||||||
LOG.info(_LI("Service '%s' stopping"), self.__class__.__name__)
|
LOG.info(_LI("Service '%s' stopping"), self.__class__.__name__)
|
||||||
|
self.watcher.stop()
|
||||||
super(KuryrK8sService, self).stop(graceful)
|
super(KuryrK8sService, self).stop(graceful)
|
||||||
|
|
||||||
|
|
||||||
|
27
kuryr_kubernetes/tests/unit/kuryr_fixtures.py
Normal file
27
kuryr_kubernetes/tests/unit/kuryr_fixtures.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
# Copyright (c) 2016 Mirantis, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import fixtures
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from kuryr_kubernetes import k8s_client
|
||||||
|
|
||||||
|
|
||||||
|
class MockK8sClient(fixtures.Fixture):
|
||||||
|
def _setUp(self):
|
||||||
|
self.client = mock.Mock(k8s_client.K8sClient)
|
||||||
|
self.useFixture(fixtures.MockPatch(
|
||||||
|
'kuryr_kubernetes.clients.get_kubernetes_client',
|
||||||
|
lambda: self.client))
|
291
kuryr_kubernetes/tests/unit/test_watcher.py
Normal file
291
kuryr_kubernetes/tests/unit/test_watcher.py
Normal file
@ -0,0 +1,291 @@
|
|||||||
|
# Copyright (c) 2016 Mirantis, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from eventlet import greenlet
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from kuryr_kubernetes.tests import base as test_base
|
||||||
|
from kuryr_kubernetes.tests.unit import kuryr_fixtures as kuryr_fixtures
|
||||||
|
from kuryr_kubernetes import watcher
|
||||||
|
|
||||||
|
|
||||||
|
class TestWatcher(test_base.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestWatcher, self).setUp()
|
||||||
|
mock_client = self.useFixture(kuryr_fixtures.MockK8sClient())
|
||||||
|
self.client = mock_client.client
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_start_watch')
|
||||||
|
def test_add(self, m_start_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
|
||||||
|
for path in paths:
|
||||||
|
watcher_obj.add(path)
|
||||||
|
|
||||||
|
self.assertEqual(set(paths), watcher_obj._resources)
|
||||||
|
m_start_watch.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_start_watch')
|
||||||
|
def test_add_running(self, m_start_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._running = True
|
||||||
|
|
||||||
|
for path in paths:
|
||||||
|
watcher_obj.add(path)
|
||||||
|
|
||||||
|
self.assertEqual(set(paths), watcher_obj._resources)
|
||||||
|
m_start_watch.assert_has_calls([mock.call(path) for path in paths],
|
||||||
|
any_order=True)
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_start_watch')
|
||||||
|
def test_add_watching(self, m_start_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._running = True
|
||||||
|
m_watching = watcher_obj._watching = mock.MagicMock()
|
||||||
|
m_watching.__contains__.return_value = True
|
||||||
|
|
||||||
|
for path in paths:
|
||||||
|
watcher_obj.add(path)
|
||||||
|
|
||||||
|
self.assertEqual(set(paths), watcher_obj._resources)
|
||||||
|
m_start_watch.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_stop_watch')
|
||||||
|
def test_remove(self, m_stop_watch):
|
||||||
|
path = '/test'
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._resources.add(path)
|
||||||
|
|
||||||
|
watcher_obj.remove(path)
|
||||||
|
|
||||||
|
self.assertEqual(set(), watcher_obj._resources)
|
||||||
|
m_stop_watch.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_stop_watch')
|
||||||
|
def test_remove_watching(self, m_stop_watch):
|
||||||
|
path = '/test'
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._resources.add(path)
|
||||||
|
m_watching = watcher_obj._watching = mock.MagicMock()
|
||||||
|
m_watching.__contains__.return_value = True
|
||||||
|
|
||||||
|
watcher_obj.remove(path)
|
||||||
|
|
||||||
|
self.assertEqual(set(), watcher_obj._resources)
|
||||||
|
m_stop_watch.assert_called_once_with(path)
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_start_watch')
|
||||||
|
def test_start(self, m_start_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._resources.update(paths)
|
||||||
|
|
||||||
|
watcher_obj.start()
|
||||||
|
|
||||||
|
self.assertTrue(watcher_obj._running)
|
||||||
|
m_start_watch.assert_has_calls([mock.call(path) for path in paths],
|
||||||
|
any_order=True)
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_start_watch')
|
||||||
|
def test_start_already_watching(self, m_start_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._resources.update(paths)
|
||||||
|
m_watching = watcher_obj._watching = mock.MagicMock()
|
||||||
|
m_watching.__iter__.return_value = paths
|
||||||
|
|
||||||
|
watcher_obj.start()
|
||||||
|
|
||||||
|
self.assertTrue(watcher_obj._running)
|
||||||
|
m_start_watch.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_stop_watch')
|
||||||
|
def test_stop(self, m_stop_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._resources.update(paths)
|
||||||
|
|
||||||
|
watcher_obj.stop()
|
||||||
|
|
||||||
|
self.assertFalse(watcher_obj._running)
|
||||||
|
m_stop_watch.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_stop_watch')
|
||||||
|
def test_stop_watching(self, m_stop_watch):
|
||||||
|
paths = ['/test%s' % i for i in range(3)]
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
watcher_obj._resources.update(paths)
|
||||||
|
m_watching = watcher_obj._watching = mock.MagicMock()
|
||||||
|
m_watching.__iter__.return_value = paths
|
||||||
|
|
||||||
|
watcher_obj.stop()
|
||||||
|
|
||||||
|
self.assertFalse(watcher_obj._running)
|
||||||
|
m_stop_watch.assert_has_calls([mock.call(path) for path in paths],
|
||||||
|
any_order=True)
|
||||||
|
|
||||||
|
@mock.patch.object(watcher.Watcher, '_watch')
|
||||||
|
def test_start_watch(self, m_watch):
|
||||||
|
path = '/test'
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler)
|
||||||
|
|
||||||
|
watcher_obj._start_watch(path)
|
||||||
|
|
||||||
|
m_watch.assert_called_once_with(path)
|
||||||
|
self.assertTrue(watcher_obj._idle.get(path))
|
||||||
|
self.assertIn(path, watcher_obj._watching)
|
||||||
|
|
||||||
|
def test_start_watch_threaded(self):
|
||||||
|
path = '/test'
|
||||||
|
m_tg = mock.Mock()
|
||||||
|
m_tg.add_thread.return_value = mock.sentinel.watch_thread
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler, m_tg)
|
||||||
|
|
||||||
|
watcher_obj._start_watch(path)
|
||||||
|
|
||||||
|
m_tg.add_thread.assert_called_once_with(watcher_obj._watch, path)
|
||||||
|
self.assertTrue(watcher_obj._idle.get(path))
|
||||||
|
self.assertEqual(mock.sentinel.watch_thread,
|
||||||
|
watcher_obj._watching.get(path))
|
||||||
|
|
||||||
|
def test_stop_watch_threaded(self):
|
||||||
|
path = '/test'
|
||||||
|
m_tg = mock.Mock()
|
||||||
|
m_th = mock.Mock()
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler, m_tg)
|
||||||
|
watcher_obj._idle[path] = True
|
||||||
|
watcher_obj._watching[path] = m_th
|
||||||
|
|
||||||
|
watcher_obj._stop_watch(path)
|
||||||
|
|
||||||
|
m_th.kill.assert_called()
|
||||||
|
|
||||||
|
def test_stop_watch_idle(self):
|
||||||
|
path = '/test'
|
||||||
|
m_tg = mock.Mock()
|
||||||
|
m_th = mock.Mock()
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
watcher_obj = watcher.Watcher(m_handler, m_tg)
|
||||||
|
watcher_obj._idle[path] = False
|
||||||
|
watcher_obj._watching[path] = m_th
|
||||||
|
|
||||||
|
watcher_obj._stop_watch(path)
|
||||||
|
|
||||||
|
m_th.kill.assert_not_called()
|
||||||
|
|
||||||
|
def _test_watch_mock_events(self, watcher_obj, events):
|
||||||
|
def client_watch(client_path):
|
||||||
|
for e in events:
|
||||||
|
self.assertTrue(watcher_obj._idle[client_path])
|
||||||
|
yield e
|
||||||
|
self.assertTrue(watcher_obj._idle[client_path])
|
||||||
|
self.client.watch.side_effect = client_watch
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _test_watch_create_watcher(path, handler):
|
||||||
|
watcher_obj = watcher.Watcher(handler)
|
||||||
|
watcher_obj._running = True
|
||||||
|
watcher_obj._resources.add(path)
|
||||||
|
watcher_obj._idle[path] = True
|
||||||
|
watcher_obj._watching[path] = None
|
||||||
|
return watcher_obj
|
||||||
|
|
||||||
|
def test_watch(self):
|
||||||
|
path = '/test'
|
||||||
|
events = [{'e': i} for i in range(3)]
|
||||||
|
|
||||||
|
def handler(event):
|
||||||
|
self.assertFalse(watcher_obj._idle[path])
|
||||||
|
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler.side_effect = handler
|
||||||
|
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||||
|
self._test_watch_mock_events(watcher_obj, events)
|
||||||
|
|
||||||
|
watcher_obj._watch(path)
|
||||||
|
|
||||||
|
m_handler.assert_has_calls([mock.call(e) for e in events])
|
||||||
|
|
||||||
|
def test_watch_stopped(self):
|
||||||
|
path = '/test'
|
||||||
|
events = [{'e': i} for i in range(3)]
|
||||||
|
|
||||||
|
def handler(event):
|
||||||
|
self.assertFalse(watcher_obj._idle[path])
|
||||||
|
watcher_obj._running = False
|
||||||
|
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler.side_effect = handler
|
||||||
|
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||||
|
self._test_watch_mock_events(watcher_obj, events)
|
||||||
|
|
||||||
|
watcher_obj._watch(path)
|
||||||
|
|
||||||
|
m_handler.assert_called_once_with(events[0])
|
||||||
|
self.assertNotIn(path, watcher_obj._idle)
|
||||||
|
self.assertNotIn(path, watcher_obj._watching)
|
||||||
|
|
||||||
|
def test_watch_removed(self):
|
||||||
|
path = '/test'
|
||||||
|
events = [{'e': i} for i in range(3)]
|
||||||
|
|
||||||
|
def handler(event):
|
||||||
|
self.assertFalse(watcher_obj._idle[path])
|
||||||
|
watcher_obj._resources.remove(path)
|
||||||
|
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler.side_effect = handler
|
||||||
|
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||||
|
self._test_watch_mock_events(watcher_obj, events)
|
||||||
|
|
||||||
|
watcher_obj._watch(path)
|
||||||
|
|
||||||
|
m_handler.assert_called_once_with(events[0])
|
||||||
|
self.assertNotIn(path, watcher_obj._idle)
|
||||||
|
self.assertNotIn(path, watcher_obj._watching)
|
||||||
|
|
||||||
|
def test_watch_interrupted(self):
|
||||||
|
path = '/test'
|
||||||
|
events = [{'e': i} for i in range(3)]
|
||||||
|
|
||||||
|
def handler(event):
|
||||||
|
self.assertFalse(watcher_obj._idle[path])
|
||||||
|
raise greenlet.GreenletExit()
|
||||||
|
|
||||||
|
m_handler = mock.Mock()
|
||||||
|
m_handler.side_effect = handler
|
||||||
|
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||||
|
self._test_watch_mock_events(watcher_obj, events)
|
||||||
|
|
||||||
|
self.assertRaises(greenlet.GreenletExit, watcher_obj._watch, path)
|
||||||
|
|
||||||
|
m_handler.assert_called_once_with(events[0])
|
||||||
|
self.assertNotIn(path, watcher_obj._idle)
|
||||||
|
self.assertNotIn(path, watcher_obj._watching)
|
148
kuryr_kubernetes/watcher.py
Normal file
148
kuryr_kubernetes/watcher.py
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
# Copyright (c) 2016 Mirantis, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from kuryr.lib._i18n import _LI
|
||||||
|
from oslo_log import log as logging
|
||||||
|
|
||||||
|
from kuryr_kubernetes import clients
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Watcher(object):
|
||||||
|
"""Observes K8s resources' events using K8s '?watch=true' API.
|
||||||
|
|
||||||
|
The `Watcher` maintains a list of K8s resources and manages the event
|
||||||
|
processing loops for those resources. Event handling is delegated to the
|
||||||
|
`callable` object passed as the `handler` initialization parameter that
|
||||||
|
will be run for each K8s event observed by the `Watcher`.
|
||||||
|
|
||||||
|
The `Watcher` can operate in two different modes based on the
|
||||||
|
`thread_group` initialization parameter:
|
||||||
|
|
||||||
|
- synchronous, when the event processing loops run on the same thread
|
||||||
|
that called 'add' or 'start' methods
|
||||||
|
|
||||||
|
- asynchronous, when each event processing loop runs on its own thread
|
||||||
|
(`oslo_service.threadgroup.Thread`) from the `thread_group`
|
||||||
|
|
||||||
|
When started, the `Watcher` will run the event processing loops for each
|
||||||
|
of the K8s resources on the list. Adding a K8s resource to the running
|
||||||
|
`Watcher` also ensures that the event processing loop for that resource is
|
||||||
|
running.
|
||||||
|
|
||||||
|
Stopping the `Watcher` or removing the specific K8s resource from the
|
||||||
|
list will request the corresponding running event processing loops to
|
||||||
|
stop gracefully, but will not interrupt any running `handler`. Forcibly
|
||||||
|
stopping any 'stuck' `handler` is not supported by the `Watcher` and
|
||||||
|
should be handled externally (e.g. by using `thread_group.stop(
|
||||||
|
graceful=False)` for asynchronous `Watcher`).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, handler, thread_group=None):
|
||||||
|
"""Initializes a new Watcher instance.
|
||||||
|
|
||||||
|
:param handler: a `callable` object to be invoked for each observed
|
||||||
|
K8s event with the event body as a single argument.
|
||||||
|
Calling `handler` should never raise any exceptions
|
||||||
|
other than `eventlet.greenlet.GreenletExit` caused by
|
||||||
|
`eventlet.greenthread.GreenThread.kill` when the
|
||||||
|
`Watcher` is operating in asynchronous mode.
|
||||||
|
:param thread_group: an `oslo_service.threadgroup.ThreadGroup`
|
||||||
|
object used to run the event processing loops
|
||||||
|
asynchronously. If `thread_group` is not
|
||||||
|
specified, the `Watcher` will operate in a
|
||||||
|
synchronous mode.
|
||||||
|
"""
|
||||||
|
self._client = clients.get_kubernetes_client()
|
||||||
|
self._handler = handler
|
||||||
|
self._thread_group = thread_group
|
||||||
|
self._running = False
|
||||||
|
|
||||||
|
self._resources = set()
|
||||||
|
self._watching = {}
|
||||||
|
self._idle = {}
|
||||||
|
|
||||||
|
def add(self, path):
|
||||||
|
"""Adds ths K8s resource to the Watcher.
|
||||||
|
|
||||||
|
Adding a resource to a running `Watcher` also ensures that the event
|
||||||
|
processing loop for that resource is running. This method could block
|
||||||
|
for `Watcher`s operating in synchronous mode.
|
||||||
|
|
||||||
|
:param path: K8s resource URL path
|
||||||
|
"""
|
||||||
|
self._resources.add(path)
|
||||||
|
if self._running and path not in self._watching:
|
||||||
|
self._start_watch(path)
|
||||||
|
|
||||||
|
def remove(self, path):
|
||||||
|
"""Removes the K8s resource from the Watcher.
|
||||||
|
|
||||||
|
Also requests the corresponding event processing loop to stop if it
|
||||||
|
is running.
|
||||||
|
|
||||||
|
:param path: K8s resource URL path
|
||||||
|
"""
|
||||||
|
self._resources.discard(path)
|
||||||
|
if path in self._watching:
|
||||||
|
self._stop_watch(path)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Starts the Watcher.
|
||||||
|
|
||||||
|
Also ensures that the event processing loops are running. This method
|
||||||
|
could block for `Watcher`s operating in synchronous mode.
|
||||||
|
"""
|
||||||
|
self._running = True
|
||||||
|
for path in self._resources - set(self._watching):
|
||||||
|
self._start_watch(path)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stops the Watcher.
|
||||||
|
|
||||||
|
Also requests all running event processing loops to stop.
|
||||||
|
"""
|
||||||
|
self._running = False
|
||||||
|
for path in list(self._watching):
|
||||||
|
self._stop_watch(path)
|
||||||
|
|
||||||
|
def _start_watch(self, path):
|
||||||
|
tg = self._thread_group
|
||||||
|
self._idle[path] = True
|
||||||
|
if tg:
|
||||||
|
self._watching[path] = tg.add_thread(self._watch, path)
|
||||||
|
else:
|
||||||
|
self._watching[path] = None
|
||||||
|
self._watch(path)
|
||||||
|
|
||||||
|
def _stop_watch(self, path):
|
||||||
|
if self._idle.get(path):
|
||||||
|
if self._thread_group:
|
||||||
|
self._watching[path].kill()
|
||||||
|
|
||||||
|
def _watch(self, path):
|
||||||
|
try:
|
||||||
|
LOG.info(_LI("Started watching '%s'"), path)
|
||||||
|
for event in self._client.watch(path):
|
||||||
|
self._idle[path] = False
|
||||||
|
self._handler(event)
|
||||||
|
self._idle[path] = True
|
||||||
|
if not (self._running and path in self._resources):
|
||||||
|
return
|
||||||
|
finally:
|
||||||
|
self._watching.pop(path)
|
||||||
|
self._idle.pop(path)
|
||||||
|
LOG.info(_LI("Stopped watching '%s'"), path)
|
Loading…
x
Reference in New Issue
Block a user