From 6a7fe4ab413bb144432a5dfd4294c7d4c1096b34 Mon Sep 17 00:00:00 2001 From: yanyanhu Date: Tue, 6 Jan 2015 01:17:28 -0500 Subject: [PATCH] Implement _broadcast_dispatcher in EngineService. --- senlin/engine/service.py | 82 +++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/senlin/engine/service.py b/senlin/engine/service.py index 0decc8f10..cecdb4544 100644 --- a/senlin/engine/service.py +++ b/senlin/engine/service.py @@ -13,7 +13,6 @@ import functools -import eventlet from oslo.config import cfg from oslo import messaging from oslo.utils import uuidutils @@ -57,16 +56,18 @@ class Dispatcher(service.Service): OPERATIONS = (NEW_ACTION, CANCEL_ACTION, SEND, STOP) = ( 'new_action', 'cancel_action', 'send', 'stop') - def __init__(self, host, engine_id, thread_group_mgr): + def __init__(self, engine_id, topic, version, thread_group_mgr): super(Dispatcher, self).__init__() self.TG = thread_group_mgr self.engine_id = engine_id - self.host = host + self.topic = topic + self.version = version def start(self): super(Dispatcher, self).start() - self.target = messaging.Target( - server=self.host, topic=self.engine_id) + self.target = messaging.Target(server=self.engine_id, + topic=self.topic, + version=self.version) server = rpc_messaging.get_rpc_server(self.target, self) server.start() @@ -96,7 +97,7 @@ class Dispatcher(service.Service): def stop(self): super(Dispatcher, self).stop() # Wait for all action threads to be finished - LOG.info(_LI("Stopping all action threads of engine %s"), + LOG.info(_LI("Stopping all action threads of engine %s"), self.engine_id) # Stop ThreadGroup gracefully self.TG.stop(True) @@ -126,6 +127,7 @@ class EngineService(service.Service): # is ready self.host = host self.topic = topic + self.dispatcher_topic = self.topic + '-dispatcher' # The following are initialized here, but assigned in start() which # happens after the fork when spawning multiple worker processes @@ -138,15 +140,18 @@ class EngineService(service.Service): self.TG = scheduler.ThreadGroupManager() # TODO(Yanyan): create a dispatcher for this engine thread. - # This dispatcher will run in a greenthread and it will not + # This dispatcher will run in a greenthread and it will not # stop until being notified or the engine is stopped. - self.dispatcher = Dispatcher(self.host, self.engine_id, self.TG) + self.dispatcher = Dispatcher(self.engine_id, + self.dispatcher_topic, + self.RPC_API_VERSION, + self.TG) LOG.debug("Starting dispatcher for engine %s" % self.engine_id) self.dispatcher.start() - target = messaging.Target( - version=self.RPC_API_VERSION, server=self.host, - topic=self.topic) + target = messaging.Target(version=self.RPC_API_VERSION, + server=self.host, + topic=self.topic) self.target = target server = rpc_messaging.get_rpc_server(target, self) server.start() @@ -165,9 +170,9 @@ class EngineService(service.Service): pass # Notify dispatcher to stop all action threads it started. - res = self._notify_dispatcher(context, - self.engine_id, - self.dispatcher.STOP) + self._notify_dispatcher(context, + self.engine_id, + self.dispatcher.STOP) # Terminate the engine process LOG.info(_LI("All threads were gone, terminating engine")) @@ -296,12 +301,11 @@ class EngineService(service.Service): # Build an Action for Cluster creating action = actions.Action(context, cluster, 'CLUSTER_CREATE', **kwargs) action.store() - # Notify Dispatcher that a new action has been ready. - # TODO(Yanyan): We should broadcast this new action - # coming to all active Dispatchers. - res = self._notify_dispatcher( - context, self.engine_id, self.dispatcher.NEW_ACTION, - action_id=action.id) + # Notify Dispatchers that a new action has been ready. + self._broadcast_dispatcher(context, + self.engine_id, + self.dispatcher.NEW_ACTION, + action_id=action.id) return cluster.id @@ -333,9 +337,10 @@ class EngineService(service.Service): } action = actions.Action(context, cluster, 'CLUSTER_UPDATE', **kwargs) - res = self._notify_dispatcher( - context, self.engine_id, self.dispatcher.NEW_ACTION, - action_id=action.id) + self._broadcast_dispatcher(context, + self.engine_id, + self.dispatcher.NEW_ACTION, + action_id=action.id) return cluster.id @@ -351,28 +356,35 @@ class EngineService(service.Service): db_cluster = self._get_cluster(context, cluster_identity) LOG.info(_LI('Deleting cluster %s'), db_cluster.name) - # This is an operation on a cluster, so we try to acquire ClusterLock cluster = clusters.Cluster.load(context, cluster=db_cluster) action = actions.Action(context, cluster, 'CLUSTER_DELETE') - res = self._notify_dispatcher( - context, self.engine_id, self.dispatcher.NEW_ACTION, - action_id=action.id) + res = self._broadcast_dispatcher(context, + self.engine_id, + self.dispatcher.NEW_ACTION, + action_id=action.id) return res def _notify_dispatcher(self, cnxt, engine_id, call, *args, **kwargs): '''Send notification to specific dispatcher''' timeout = cfg.CONF.engine_life_check_timeout - self.cctxt = self._client.prepare( - version='1.0', - timeout=timeout, - topic=engine_id) + self.cctxt = self._client.prepare(version=self.RPC_API_VERSION, + timeout=timeout, + topic=self.dispatcher_topic, + server=engine_id) try: self.cctxt.call(cnxt, call, *args, **kwargs) except messaging.MessagingTimeout: return False - def _broadcast_dispatcher(self, cnxt, engine_id, call, *args, - **kwargs): - '''Broadcast the notification to all active dispatchers''' - raise NotImplementedError + def _broadcast_dispatcher(self, cnxt, call, *args, **kwargs): + '''Broadcast notification to all active dispatchers''' + timeout = cfg.CONF.engine_life_check_timeout + self.cctxt = self._client.prepare(version=self.RPC_API_VERSION, + timeout=timeout, + topic=self.dispatcher_topic, + fanout=True) + try: + self.cctxt.call(cnxt, call, *args, **kwargs) + except messaging.MessagingTimeout: + return False