Merge "process to gracefully exit when last watcher exits"
This commit is contained in:
commit
5d4632ee69
@ -190,7 +190,7 @@ class CNIDaemonWatcherService(cotyledon.Service):
|
||||
self.pipeline = h_cni.CNIPipeline()
|
||||
self.pipeline.register(h_cni.CallbackHandler(self.on_done,
|
||||
self.on_deleted))
|
||||
self.watcher = k_watcher.Watcher(self.pipeline)
|
||||
self.watcher = k_watcher.Watcher(self.pipeline, exit_on_stop=True)
|
||||
self.watcher.add(
|
||||
"%(base)s/pods?fieldSelector=spec.nodeName=%(node_name)s" % {
|
||||
'base': k_const.K8S_API_BASE,
|
||||
|
@ -83,7 +83,7 @@ class KuryrK8sService(six.with_metaclass(KuryrK8sServiceMeta,
|
||||
|
||||
objects.register_locally_defined_vifs()
|
||||
pipeline = h_pipeline.ControllerPipeline(self.tg)
|
||||
self.watcher = watcher.Watcher(pipeline, self.tg)
|
||||
self.watcher = watcher.Watcher(pipeline, self.tg, exit_on_stop=True)
|
||||
self.health_manager = health.HealthServer()
|
||||
self.current_leader = None
|
||||
self.node_name = utils.get_node_name()
|
||||
|
@ -211,14 +211,16 @@ class TestWatcher(test_base.TestCase):
|
||||
|
||||
@staticmethod
|
||||
def _test_watch_create_watcher(path, handler, timeout=0):
|
||||
watcher_obj = watcher.Watcher(handler, timeout=timeout)
|
||||
watcher_obj = watcher.Watcher(handler, timeout=timeout,
|
||||
exit_on_stop=True)
|
||||
watcher_obj._running = True
|
||||
watcher_obj._resources.add(path)
|
||||
watcher_obj._idle[path] = True
|
||||
watcher_obj._watching[path] = None
|
||||
return watcher_obj
|
||||
|
||||
def test_watch(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
@ -234,8 +236,12 @@ class TestWatcher(test_base.TestCase):
|
||||
|
||||
self.assertEqual(0, watcher_obj._timeout)
|
||||
m_handler.assert_has_calls([mock.call(e) for e in events])
|
||||
# After all events have been "handled", since there is only
|
||||
# one handler, we'll gracefully exit
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_stopped(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_stopped(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
@ -253,8 +259,10 @@ class TestWatcher(test_base.TestCase):
|
||||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_removed(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_removed(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
@ -272,8 +280,10 @@ class TestWatcher(test_base.TestCase):
|
||||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_interrupted(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_interrupted(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
|
||||
@ -291,8 +301,10 @@ class TestWatcher(test_base.TestCase):
|
||||
m_handler.assert_called_once_with(events[0])
|
||||
self.assertNotIn(path, watcher_obj._idle)
|
||||
self.assertNotIn(path, watcher_obj._watching)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_client_request_failed(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_client_request_failed(self, m_sys_exit):
|
||||
path = '/test'
|
||||
m_handler = mock.Mock()
|
||||
watcher_obj = self._test_watch_create_watcher(path, m_handler)
|
||||
@ -302,8 +314,10 @@ class TestWatcher(test_base.TestCase):
|
||||
|
||||
self.client.watch.assert_called_once()
|
||||
self.assertFalse(watcher_obj._healthy)
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
||||
def test_watch_retry(self):
|
||||
@mock.patch('sys.exit')
|
||||
def test_watch_retry(self, m_sys_exit):
|
||||
path = '/test'
|
||||
events = [{'e': i} for i in range(3)]
|
||||
side_effects = [exceptions.ChunkedEncodingError("Connection Broken")]
|
||||
@ -317,3 +331,4 @@ class TestWatcher(test_base.TestCase):
|
||||
watcher_obj._watch(path)
|
||||
|
||||
m_handler.assert_has_calls([mock.call(e) for e in events])
|
||||
m_sys_exit.assert_called_once_with(1)
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
from kuryr_kubernetes import clients
|
||||
@ -55,7 +56,8 @@ class Watcher(health.HealthHandler):
|
||||
graceful=False)` for asynchronous `Watcher`).
|
||||
"""
|
||||
|
||||
def __init__(self, handler, thread_group=None, timeout=None):
|
||||
def __init__(self, handler, thread_group=None, timeout=None,
|
||||
exit_on_stop=False):
|
||||
"""Initializes a new Watcher instance.
|
||||
|
||||
:param handler: a `callable` object to be invoked for each observed
|
||||
@ -82,6 +84,7 @@ class Watcher(health.HealthHandler):
|
||||
if timeout is None:
|
||||
timeout = CONF.kubernetes.watch_retry_timeout
|
||||
self._timeout = timeout
|
||||
self._exit_on_stop = exit_on_stop
|
||||
|
||||
def add(self, path):
|
||||
"""Adds ths K8s resource to the Watcher.
|
||||
@ -152,6 +155,14 @@ class Watcher(health.HealthHandler):
|
||||
LOG.info("Stopped watching '%s'", path)
|
||||
except KeyError:
|
||||
LOG.error("Failed to exit watch gracefully")
|
||||
finally:
|
||||
if not self._watching and not self._idle:
|
||||
self.stop()
|
||||
if self._exit_on_stop:
|
||||
LOG.info("No remaining active watchers, Exiting...")
|
||||
# TODO(dulek): This complicates things, remove once we
|
||||
# don't support running without kuryr-daemon.
|
||||
sys.exit(1)
|
||||
|
||||
def _watch(self, path):
|
||||
attempts = 0
|
||||
|
Loading…
Reference in New Issue
Block a user