Implement _broadcast_dispatcher in EngineService.

This commit is contained in:
yanyanhu 2015-01-06 01:17:28 -05:00
parent 2c82c8e07f
commit 6a7fe4ab41

View File

@ -13,7 +13,6 @@
import functools
import eventlet
from oslo.config import cfg
from oslo import messaging
from oslo.utils import uuidutils
@ -57,16 +56,18 @@ class Dispatcher(service.Service):
OPERATIONS = (NEW_ACTION, CANCEL_ACTION, SEND, STOP) = (
'new_action', 'cancel_action', 'send', 'stop')
def __init__(self, host, engine_id, thread_group_mgr):
def __init__(self, engine_id, topic, version, thread_group_mgr):
super(Dispatcher, self).__init__()
self.TG = thread_group_mgr
self.engine_id = engine_id
self.host = host
self.topic = topic
self.version = version
def start(self):
super(Dispatcher, self).start()
self.target = messaging.Target(
server=self.host, topic=self.engine_id)
self.target = messaging.Target(server=self.engine_id,
topic=self.topic,
version=self.version)
server = rpc_messaging.get_rpc_server(self.target, self)
server.start()
@ -96,7 +97,7 @@ class Dispatcher(service.Service):
def stop(self):
super(Dispatcher, self).stop()
# Wait for all action threads to be finished
LOG.info(_LI("Stopping all action threads of engine %s"),
LOG.info(_LI("Stopping all action threads of engine %s"),
self.engine_id)
# Stop ThreadGroup gracefully
self.TG.stop(True)
@ -126,6 +127,7 @@ class EngineService(service.Service):
# is ready
self.host = host
self.topic = topic
self.dispatcher_topic = self.topic + '-dispatcher'
# The following are initialized here, but assigned in start() which
# happens after the fork when spawning multiple worker processes
@ -138,15 +140,18 @@ class EngineService(service.Service):
self.TG = scheduler.ThreadGroupManager()
# TODO(Yanyan): create a dispatcher for this engine thread.
# This dispatcher will run in a greenthread and it will not
# This dispatcher will run in a greenthread and it will not
# stop until being notified or the engine is stopped.
self.dispatcher = Dispatcher(self.host, self.engine_id, self.TG)
self.dispatcher = Dispatcher(self.engine_id,
self.dispatcher_topic,
self.RPC_API_VERSION,
self.TG)
LOG.debug("Starting dispatcher for engine %s" % self.engine_id)
self.dispatcher.start()
target = messaging.Target(
version=self.RPC_API_VERSION, server=self.host,
topic=self.topic)
target = messaging.Target(version=self.RPC_API_VERSION,
server=self.host,
topic=self.topic)
self.target = target
server = rpc_messaging.get_rpc_server(target, self)
server.start()
@ -165,9 +170,9 @@ class EngineService(service.Service):
pass
# Notify dispatcher to stop all action threads it started.
res = self._notify_dispatcher(context,
self.engine_id,
self.dispatcher.STOP)
self._notify_dispatcher(context,
self.engine_id,
self.dispatcher.STOP)
# Terminate the engine process
LOG.info(_LI("All threads were gone, terminating engine"))
@ -296,12 +301,11 @@ class EngineService(service.Service):
# Build an Action for Cluster creating
action = actions.Action(context, cluster, 'CLUSTER_CREATE', **kwargs)
action.store()
# Notify Dispatcher that a new action has been ready.
# TODO(Yanyan): We should broadcast this new action
# coming to all active Dispatchers.
res = self._notify_dispatcher(
context, self.engine_id, self.dispatcher.NEW_ACTION,
action_id=action.id)
# Notify Dispatchers that a new action has been ready.
self._broadcast_dispatcher(context,
self.engine_id,
self.dispatcher.NEW_ACTION,
action_id=action.id)
return cluster.id
@ -333,9 +337,10 @@ class EngineService(service.Service):
}
action = actions.Action(context, cluster, 'CLUSTER_UPDATE', **kwargs)
res = self._notify_dispatcher(
context, self.engine_id, self.dispatcher.NEW_ACTION,
action_id=action.id)
self._broadcast_dispatcher(context,
self.engine_id,
self.dispatcher.NEW_ACTION,
action_id=action.id)
return cluster.id
@ -351,28 +356,35 @@ class EngineService(service.Service):
db_cluster = self._get_cluster(context, cluster_identity)
LOG.info(_LI('Deleting cluster %s'), db_cluster.name)
# This is an operation on a cluster, so we try to acquire ClusterLock
cluster = clusters.Cluster.load(context, cluster=db_cluster)
action = actions.Action(context, cluster, 'CLUSTER_DELETE')
res = self._notify_dispatcher(
context, self.engine_id, self.dispatcher.NEW_ACTION,
action_id=action.id)
res = self._broadcast_dispatcher(context,
self.engine_id,
self.dispatcher.NEW_ACTION,
action_id=action.id)
return res
def _notify_dispatcher(self, cnxt, engine_id, call, *args, **kwargs):
'''Send notification to specific dispatcher'''
timeout = cfg.CONF.engine_life_check_timeout
self.cctxt = self._client.prepare(
version='1.0',
timeout=timeout,
topic=engine_id)
self.cctxt = self._client.prepare(version=self.RPC_API_VERSION,
timeout=timeout,
topic=self.dispatcher_topic,
server=engine_id)
try:
self.cctxt.call(cnxt, call, *args, **kwargs)
except messaging.MessagingTimeout:
return False
def _broadcast_dispatcher(self, cnxt, engine_id, call, *args,
**kwargs):
'''Broadcast the notification to all active dispatchers'''
raise NotImplementedError
def _broadcast_dispatcher(self, cnxt, call, *args, **kwargs):
'''Broadcast notification to all active dispatchers'''
timeout = cfg.CONF.engine_life_check_timeout
self.cctxt = self._client.prepare(version=self.RPC_API_VERSION,
timeout=timeout,
topic=self.dispatcher_topic,
fanout=True)
try:
self.cctxt.call(cnxt, call, *args, **kwargs)
except messaging.MessagingTimeout:
return False