diff --git a/heat/engine/service.py b/heat/engine/service.py index dbf5c34819..c8159a6182 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -250,7 +250,7 @@ class ThreadGroupManager(object): @profiler.trace_cls("rpc") -class EngineListener(service.Service): +class EngineListener(object): """Listen on an AMQP queue named for the engine. Allows individual engines to communicate with each other for multi-engine @@ -260,18 +260,27 @@ class EngineListener(service.Service): ACTIONS = (STOP_STACK, SEND) = ('stop_stack', 'send') def __init__(self, host, engine_id, thread_group_mgr): - super(EngineListener, self).__init__() self.thread_group_mgr = thread_group_mgr self.engine_id = engine_id self.host = host + self._server = None def start(self): - super(EngineListener, self).start() self.target = messaging.Target( server=self.engine_id, topic=rpc_api.LISTENER_TOPIC) - server = rpc_messaging.get_rpc_server(self.target, self) - server.start() + self._server = rpc_messaging.get_rpc_server(self.target, self) + self._server.start() + + def stop(self): + if self._server is not None: + LOG.debug("Attempting to stop engine listener...") + try: + self._server.stop() + self._server.wait() + LOG.info(_LI("Engine listener is stopped successfully")) + except Exception as e: + LOG.error(_LE("Failed to stop engine listener, %s"), e) def listening(self, ctxt): """Respond to a watchdog request. @@ -292,7 +301,7 @@ class EngineListener(service.Service): @profiler.trace_cls("rpc") -class EngineService(service.Service): +class EngineService(service.ServiceBase): """Manages the running instances from creation to destruction. All the methods in here are called from the RPC backend. This is @@ -306,7 +315,6 @@ class EngineService(service.Service): RPC_API_VERSION = '1.35' def __init__(self, host, topic): - super(EngineService, self).__init__() resources.initialise() self.host = host self.topic = topic @@ -399,8 +407,6 @@ class EngineService(service.Service): self.service_manage_report) self.manage_thread_grp.add_thread(self.reset_stack_status) - super(EngineService, self).start() - def _configure_db_conn_pool_size(self): # bug #1491185 # Set the DB max_overflow to match the thread pool size. @@ -427,6 +433,8 @@ class EngineService(service.Service): def stop(self): self._stop_rpc_server() + if self.listener: + self.listener.stop() if cfg.CONF.convergence_engine and self.worker_service: # Stop the WorkerService @@ -451,10 +459,11 @@ class EngineService(service.Service): # Terminate the engine process LOG.info(_LI("All threads were gone, terminating engine")) - super(EngineService, self).stop() + + def wait(self): + pass def reset(self): - super(EngineService, self).reset() logging.setup(cfg.CONF, 'heat') @context.request_context diff --git a/heat/engine/service_software_config.py b/heat/engine/service_software_config.py index d309393ad0..ad48c9df02 100644 --- a/heat/engine/service_software_config.py +++ b/heat/engine/service_software_config.py @@ -15,7 +15,6 @@ import uuid from oslo_log import log as logging from oslo_serialization import jsonutils -from oslo_service import service from oslo_utils import timeutils import requests import six @@ -37,7 +36,7 @@ from heat.rpc import api as rpc_api LOG = logging.getLogger(__name__) -class SoftwareConfigService(service.Service): +class SoftwareConfigService(object): def show_software_config(self, cnxt, config_id): sc = software_config_object.SoftwareConfig.get_by_id(cnxt, config_id) diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 6957585354..59973bf243 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -17,7 +17,6 @@ import eventlet.queue from oslo_log import log as logging import oslo_messaging -from oslo_service import service from oslo_utils import uuidutils from osprofiler import profiler @@ -40,7 +39,7 @@ CANCEL_RETRIES = 3 @profiler.trace_cls("rpc") -class WorkerService(service.Service): +class WorkerService(object): """Service that has 'worker' actor in convergence. This service is dedicated to handle internal messages to the 'worker' @@ -57,7 +56,6 @@ class WorkerService(service.Service): topic, engine_id, thread_group_mgr): - super(WorkerService, self).__init__() self.host = host self.topic = topic self.engine_id = engine_id @@ -81,8 +79,6 @@ class WorkerService(service.Service): self._rpc_server = rpc_messaging.get_rpc_server(target, self) self._rpc_server.start() - super(WorkerService, self).start() - def stop(self): if self._rpc_server is None: return @@ -96,8 +92,6 @@ class WorkerService(service.Service): LOG.error(_LE("%(topic)s is failed to stop, %(exc)s"), {'topic': self.topic, 'exc': e}) - super(WorkerService, self).stop() - def stop_traversal(self, stack): """Update current traversal to stop workers from propagating. diff --git a/heat/tests/engine/service/test_service_engine.py b/heat/tests/engine/service/test_service_engine.py index e98e3a92ee..28431cfbc1 100644 --- a/heat/tests/engine/service/test_service_engine.py +++ b/heat/tests/engine/service/test_service_engine.py @@ -329,7 +329,7 @@ class ServiceEngineTest(common.HeatTestCase): self.eng.thread_group_mgr.stop.assert_has_calls(calls, True) # # Manage Thread group - self.eng.manage_thread_grp.stop.assert_called_with(False) + self.eng.manage_thread_grp.stop.assert_called_with() # Service delete admin_context_method.assert_called_once_with()