From 7600620be8a2dd6b014ebd3d9891393f8b2ea7e7 Mon Sep 17 00:00:00 2001 From: Erik Olof Gunnar Andersson Date: Mon, 23 Dec 2019 00:23:39 -0800 Subject: [PATCH] Fixing services getting stuck on shutdown The new heartbeat code was causing the service to not exit properly. This patch removes the use of a thread group for handling heartbeats and replaces it with a FixedIntervalLoopingCall instead. Closes-Bug: #1857476 Change-Id: Ida918a2d670a69cc8995983d23e5424bd48de8a9 (cherry picked from commit 136a9f79fafaf94d18b81618943c0f73b38bfdfe) --- designate/cmd/agent.py | 2 +- designate/cmd/api.py | 2 +- designate/cmd/central.py | 3 +- designate/cmd/mdns.py | 2 +- designate/cmd/producer.py | 2 +- designate/cmd/sink.py | 2 +- designate/cmd/worker.py | 2 +- designate/service.py | 5 +-- designate/service_status.py | 28 ++++++------ designate/tests/unit/test_heartbeat.py | 18 ++++---- designate/tests/unit/test_service_status.py | 48 +++++++++++++-------- 11 files changed, 62 insertions(+), 52 deletions(-) diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py index e96a5d59b..edc7f46da 100644 --- a/designate/cmd/agent.py +++ b/designate/cmd/agent.py @@ -38,7 +38,7 @@ def main(): hookpoints.log_hook_setup() server = agent_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg) + heartbeat = service.Heartbeat(server.service_name) service.serve(server, workers=CONF['service:agent'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/api.py b/designate/cmd/api.py index 6ac6d5581..61b079117 100644 --- a/designate/cmd/api.py +++ b/designate/cmd/api.py @@ -40,7 +40,7 @@ def main(): hookpoints.log_hook_setup() server = api_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg) + heartbeat = service.Heartbeat(server.service_name) service.serve(server, workers=CONF['service:api'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/central.py b/designate/cmd/central.py index 80ee8b930..6ae0524ae 100644 --- a/designate/cmd/central.py +++ b/designate/cmd/central.py @@ -38,8 +38,7 @@ def main(): hookpoints.log_hook_setup() server = central_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg, - rpc_api=server) + heartbeat = service.Heartbeat(server.service_name, rpc_api=server) service.serve(server, workers=CONF['service:central'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py index e79586dc8..b8993d9cf 100644 --- a/designate/cmd/mdns.py +++ b/designate/cmd/mdns.py @@ -38,7 +38,7 @@ def main(): hookpoints.log_hook_setup() server = mdns_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg) + heartbeat = service.Heartbeat(server.service_name) service.serve(server, workers=CONF['service:mdns'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py index 4bf187323..89808f27e 100644 --- a/designate/cmd/producer.py +++ b/designate/cmd/producer.py @@ -38,7 +38,7 @@ def main(): hookpoints.log_hook_setup() server = producer_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg) + heartbeat = service.Heartbeat(server.service_name) service.serve(server, workers=CONF['service:producer'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py index 7242b198c..307257120 100644 --- a/designate/cmd/sink.py +++ b/designate/cmd/sink.py @@ -38,7 +38,7 @@ def main(): hookpoints.log_hook_setup() server = sink_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg) + heartbeat = service.Heartbeat(server.service_name) service.serve(server, workers=CONF['service:sink'].workers) heartbeat.start() service.wait() diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py index e0046ac2e..2e0f07af4 100644 --- a/designate/cmd/worker.py +++ b/designate/cmd/worker.py @@ -38,7 +38,7 @@ def main(): hookpoints.log_hook_setup() server = worker_service.Service() - heartbeat = service.Heartbeat(server.service_name, server.tg) + heartbeat = service.Heartbeat(server.service_name) service.serve(server, workers=CONF['service:worker'].workers) heartbeat.start() service.wait() diff --git a/designate/service.py b/designate/service.py index e4e590b55..db9df513d 100644 --- a/designate/service.py +++ b/designate/service.py @@ -68,9 +68,8 @@ class Service(service.Service): class Heartbeat(object): - def __init__(self, name, tg, rpc_api=None): + def __init__(self, name, rpc_api=None): self.name = name - self.tg = tg self._status = 'UP' self._stats = {} @@ -80,7 +79,7 @@ class Heartbeat(object): CONF.heartbeat_emitter.emitter_type ) self.heartbeat_emitter = emitter_cls( - self.name, self.tg, + self.name, status_factory=self.get_status, rpc_api=rpc_api ) diff --git a/designate/service_status.py b/designate/service_status.py index b6f0ba241..22d4410ef 100644 --- a/designate/service_status.py +++ b/designate/service_status.py @@ -14,6 +14,7 @@ import abc from oslo_log import log as logging +from oslo_service import loopingcall from oslo_utils import timeutils import designate.conf @@ -31,19 +32,15 @@ class HeartBeatEmitter(plugin.DriverPlugin): __plugin_ns__ = 'designate.heartbeat_emitter' __plugin_type__ = 'heartbeat_emitter' - def __init__(self, service, thread_group, status_factory=None, - *args, **kwargs): + def __init__(self, service, status_factory=None, *args, **kwargs): super(HeartBeatEmitter, self).__init__() self._service = service self._hostname = CONF.host - self._running = False - self._tg = thread_group - self._tg.add_timer( - CONF.heartbeat_emitter.heartbeat_interval, - self._emit_heartbeat) - + self._timer = loopingcall.FixedIntervalLoopingCall( + self._emit_heartbeat + ) self._status_factory = status_factory def _get_status(self): @@ -56,9 +53,6 @@ class HeartBeatEmitter(plugin.DriverPlugin): """ Returns Status, Stats, Capabilities """ - if not self._running: - return - status, stats, capabilities = self._get_status() service_status = objects.ServiceStatus( @@ -79,10 +73,13 @@ class HeartBeatEmitter(plugin.DriverPlugin): pass def start(self): - self._running = True + self._timer.start( + CONF.heartbeat_emitter.heartbeat_interval, + stop_on_exception=False + ) def stop(self): - self._running = False + self._timer.stop() class NoopEmitter(HeartBeatEmitter): @@ -95,9 +92,8 @@ class NoopEmitter(HeartBeatEmitter): class RpcEmitter(HeartBeatEmitter): __plugin_name__ = 'rpc' - def __init__(self, service, thread_group, rpc_api=None, *args, **kwargs): - super(RpcEmitter, self).__init__( - service, thread_group, *args, **kwargs) + def __init__(self, service, rpc_api=None, *args, **kwargs): + super(RpcEmitter, self).__init__(service, *args, **kwargs) self.rpc_api = rpc_api def _transmit(self, status): diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py index fbb01862f..a531d5b01 100644 --- a/designate/tests/unit/test_heartbeat.py +++ b/designate/tests/unit/test_heartbeat.py @@ -13,6 +13,7 @@ import mock import oslotest.base from oslo_config import cfg from oslo_config import fixture as cfg_fixture +from oslo_service import loopingcall from designate import service @@ -20,14 +21,18 @@ CONF = cfg.CONF class HeartbeatTest(oslotest.base.BaseTestCase): - def setUp(self): + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def setUp(self, mock_looping): super(HeartbeatTest, self).setUp() + + self.mock_timer = mock.Mock() + mock_looping.return_value = self.mock_timer + self.useFixture(cfg_fixture.Config(CONF)) CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter') - self.mock_tg = mock.Mock() - self.heartbeat = service.Heartbeat('test', self.mock_tg) + self.heartbeat = service.Heartbeat('test') def test_get_status(self): self.assertEqual(('UP', {}, {},), self.heartbeat.get_status()) @@ -38,16 +43,13 @@ class HeartbeatTest(oslotest.base.BaseTestCase): ) def test_start_heartbeat(self): - self.assertFalse(self.heartbeat.heartbeat_emitter._running) - self.heartbeat.start() - self.assertTrue(self.heartbeat.heartbeat_emitter._running) + self.mock_timer.start.assert_called_once() def test_stop_heartbeat(self): - self.assertFalse(self.heartbeat.heartbeat_emitter._running) self.heartbeat.start() self.heartbeat.stop() - self.assertFalse(self.heartbeat.heartbeat_emitter._running) + self.mock_timer.stop.assert_called_once() diff --git a/designate/tests/unit/test_service_status.py b/designate/tests/unit/test_service_status.py index a8fe38808..c78272890 100644 --- a/designate/tests/unit/test_service_status.py +++ b/designate/tests/unit/test_service_status.py @@ -14,6 +14,7 @@ import mock import oslotest.base from oslo_config import cfg +from oslo_service import loopingcall from designate import objects from designate import service_status @@ -22,40 +23,46 @@ from designate import service_status class NoopEmitterTest(oslotest.base.BaseTestCase): def setUp(self): super(NoopEmitterTest, self).setUp() - self.mock_tg = mock.Mock() - def test_init(self): - service_status.NoopEmitter("svc", self.mock_tg) + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_start(self, mock_looping): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer - def test_start(self): - emitter = service_status.NoopEmitter("svc", self.mock_tg) + emitter = service_status.NoopEmitter("svc") emitter.start() - self.mock_tg.add_timer.assert_called_once_with( - 10.0, emitter._emit_heartbeat) + mock_timer.start.assert_called_once() - def test_stop(self): - mock_pulse = mock.Mock() - self.mock_tg.add_timer.return_value = mock_pulse + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_stop(self, mock_looping): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer - emitter = service_status.NoopEmitter("svc", self.mock_tg) + emitter = service_status.NoopEmitter("svc") emitter.start() emitter.stop() - self.assertFalse(emitter._running) + mock_timer.stop.assert_called_once() class RpcEmitterTest(oslotest.base.BaseTestCase): def setUp(self): super(RpcEmitterTest, self).setUp() - self.mock_tg = mock.Mock() @mock.patch.object(objects, "ServiceStatus") @mock.patch("designate.context.DesignateContext.get_admin_context") - def test_emit_no_status_factory(self, mock_context, mock_service_status): - emitter = service_status.RpcEmitter("svc", self.mock_tg) + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_emit_no_status_factory(self, mock_looping, mock_context, + mock_service_status): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer + + emitter = service_status.RpcEmitter("svc") emitter.start() + mock_timer.start.assert_called_once() + central = mock.Mock() with mock.patch("designate.central.rpcapi.CentralAPI.get_instance", return_value=central): @@ -76,16 +83,23 @@ class RpcEmitterTest(oslotest.base.BaseTestCase): @mock.patch.object(objects, "ServiceStatus") @mock.patch("designate.context.DesignateContext.get_admin_context") - def test_emit_status_factory(self, mock_context, mock_service_status): + @mock.patch.object(loopingcall, 'FixedIntervalLoopingCall') + def test_emit_status_factory(self, mock_looping, mock_context, + mock_service_status): + mock_timer = mock.Mock() + mock_looping.return_value = mock_timer + status = False stats = {"a": 1} capabilities = {"b": 2} status_factory = mock.Mock(return_value=(status, stats, capabilities,)) - emitter = service_status.RpcEmitter("svc", self.mock_tg, + emitter = service_status.RpcEmitter("svc", status_factory=status_factory) emitter.start() + mock_timer.start.assert_called_once() + central = mock.Mock() with mock.patch("designate.central.rpcapi.CentralAPI.get_instance", return_value=central):