Merge "Cleanup service usage"

This commit is contained in:
Jenkins 2016-11-22 13:14:09 +00:00 committed by Gerrit Code Review
commit bcf3889774
4 changed files with 23 additions and 21 deletions

View File

@ -250,7 +250,7 @@ class ThreadGroupManager(object):
@profiler.trace_cls("rpc")
class EngineListener(service.Service):
class EngineListener(object):
"""Listen on an AMQP queue named for the engine.
Allows individual engines to communicate with each other for multi-engine
@ -260,18 +260,27 @@ class EngineListener(service.Service):
ACTIONS = (STOP_STACK, SEND) = ('stop_stack', 'send')
def __init__(self, host, engine_id, thread_group_mgr):
super(EngineListener, self).__init__()
self.thread_group_mgr = thread_group_mgr
self.engine_id = engine_id
self.host = host
self._server = None
def start(self):
super(EngineListener, self).start()
self.target = messaging.Target(
server=self.engine_id,
topic=rpc_api.LISTENER_TOPIC)
server = rpc_messaging.get_rpc_server(self.target, self)
server.start()
self._server = rpc_messaging.get_rpc_server(self.target, self)
self._server.start()
def stop(self):
if self._server is not None:
LOG.debug("Attempting to stop engine listener...")
try:
self._server.stop()
self._server.wait()
LOG.info(_LI("Engine listener is stopped successfully"))
except Exception as e:
LOG.error(_LE("Failed to stop engine listener, %s"), e)
def listening(self, ctxt):
"""Respond to a watchdog request.
@ -292,7 +301,7 @@ class EngineListener(service.Service):
@profiler.trace_cls("rpc")
class EngineService(service.Service):
class EngineService(service.ServiceBase):
"""Manages the running instances from creation to destruction.
All the methods in here are called from the RPC backend. This is
@ -306,7 +315,6 @@ class EngineService(service.Service):
RPC_API_VERSION = '1.35'
def __init__(self, host, topic):
super(EngineService, self).__init__()
resources.initialise()
self.host = host
self.topic = topic
@ -399,8 +407,6 @@ class EngineService(service.Service):
self.service_manage_report)
self.manage_thread_grp.add_thread(self.reset_stack_status)
super(EngineService, self).start()
def _configure_db_conn_pool_size(self):
# bug #1491185
# Set the DB max_overflow to match the thread pool size.
@ -427,6 +433,8 @@ class EngineService(service.Service):
def stop(self):
self._stop_rpc_server()
if self.listener:
self.listener.stop()
if cfg.CONF.convergence_engine and self.worker_service:
# Stop the WorkerService
@ -451,10 +459,11 @@ class EngineService(service.Service):
# Terminate the engine process
LOG.info(_LI("All threads were gone, terminating engine"))
super(EngineService, self).stop()
def wait(self):
pass
def reset(self):
super(EngineService, self).reset()
logging.setup(cfg.CONF, 'heat')
@context.request_context

View File

@ -15,7 +15,6 @@ import uuid
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_service import service
from oslo_utils import timeutils
import requests
import six
@ -37,7 +36,7 @@ from heat.rpc import api as rpc_api
LOG = logging.getLogger(__name__)
class SoftwareConfigService(service.Service):
class SoftwareConfigService(object):
def show_software_config(self, cnxt, config_id):
sc = software_config_object.SoftwareConfig.get_by_id(cnxt, config_id)

View File

@ -17,7 +17,6 @@ import eventlet.queue
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from oslo_utils import uuidutils
from osprofiler import profiler
@ -40,7 +39,7 @@ CANCEL_RETRIES = 3
@profiler.trace_cls("rpc")
class WorkerService(service.Service):
class WorkerService(object):
"""Service that has 'worker' actor in convergence.
This service is dedicated to handle internal messages to the 'worker'
@ -57,7 +56,6 @@ class WorkerService(service.Service):
topic,
engine_id,
thread_group_mgr):
super(WorkerService, self).__init__()
self.host = host
self.topic = topic
self.engine_id = engine_id
@ -81,8 +79,6 @@ class WorkerService(service.Service):
self._rpc_server = rpc_messaging.get_rpc_server(target, self)
self._rpc_server.start()
super(WorkerService, self).start()
def stop(self):
if self._rpc_server is None:
return
@ -96,8 +92,6 @@ class WorkerService(service.Service):
LOG.error(_LE("%(topic)s is failed to stop, %(exc)s"),
{'topic': self.topic, 'exc': e})
super(WorkerService, self).stop()
def stop_traversal(self, stack):
"""Update current traversal to stop workers from propagating.

View File

@ -329,7 +329,7 @@ class ServiceEngineTest(common.HeatTestCase):
self.eng.thread_group_mgr.stop.assert_has_calls(calls, True)
# # Manage Thread group
self.eng.manage_thread_grp.stop.assert_called_with(False)
self.eng.manage_thread_grp.stop.assert_called_with()
# Service delete
admin_context_method.assert_called_once_with()