start of fanout
This commit is contained in:
20
nova/rpc.py
20
nova/rpc.py
@@ -218,6 +218,16 @@ class TopicPublisher(Publisher):
|
||||
super(TopicPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class FanoutPublisher(Publisher):
|
||||
"""Publishes messages to a fanout exchange."""
|
||||
exchange_type = "fanout"
|
||||
|
||||
def __init__(self, topic, connection=None):
|
||||
self.exchange = "%s_fanout" % topic
|
||||
self.durable = False
|
||||
super(FanoutPublisher, self).__init__(connection=connection)
|
||||
|
||||
|
||||
class DirectConsumer(Consumer):
|
||||
"""Consumes messages directly on a channel specified by msg_id"""
|
||||
exchange_type = "direct"
|
||||
@@ -360,6 +370,16 @@ def cast(context, topic, msg):
|
||||
publisher.close()
|
||||
|
||||
|
||||
def fanout_cast(context, topic, msg):
|
||||
"""Sends a message on a fanout exchange without waiting for a response"""
|
||||
LOG.debug(_("Making asynchronous fanout cast..."))
|
||||
_pack_context(msg, context)
|
||||
conn = Connection.instance()
|
||||
publisher = FanoutPublisher(topic, connection=conn)
|
||||
publisher.send(msg)
|
||||
publisher.close()
|
||||
|
||||
|
||||
def generic_response(message_data, message):
|
||||
"""Logs a result and exits"""
|
||||
LOG.debug(_('response %s'), message_data)
|
||||
|
||||
@@ -59,6 +59,10 @@ class SchedulerManager(manager.Manager):
|
||||
"""Get a list of zones from the ZoneManager."""
|
||||
return self.zone_manager.get_zone_list()
|
||||
|
||||
def update_compute_capabilities(self, context=None):
|
||||
"""Process a compute node update."""
|
||||
return self.zone_manager.update_compute_capabilities()
|
||||
|
||||
def _schedule(self, method, context, topic, *args, **kwargs):
|
||||
"""Tries to call schedule_* method on the driver to retrieve host.
|
||||
|
||||
|
||||
@@ -105,6 +105,7 @@ class ZoneManager(object):
|
||||
def __init__(self):
|
||||
self.last_zone_db_check = datetime.min
|
||||
self.zone_states = {}
|
||||
self.compute_states = {}
|
||||
self.green_pool = greenpool.GreenPool()
|
||||
|
||||
def get_zone_list(self):
|
||||
@@ -141,3 +142,6 @@ class ZoneManager(object):
|
||||
self.last_zone_db_check = datetime.now()
|
||||
self._refresh_from_db(context)
|
||||
self._poll_zones(context)
|
||||
|
||||
def update_compute_capabilities(self):
|
||||
logging.debug(_("****** UPDATE COMPUTE CAPABILITIES *******"))
|
||||
|
||||
@@ -84,6 +84,7 @@ class Service(object):
|
||||
|
||||
conn1 = rpc.Connection.instance(new=True)
|
||||
conn2 = rpc.Connection.instance(new=True)
|
||||
conn3 = rpc.Connection.instance(new=True)
|
||||
if self.report_interval:
|
||||
consumer_all = rpc.AdapterConsumer(
|
||||
connection=conn1,
|
||||
@@ -93,9 +94,14 @@ class Service(object):
|
||||
connection=conn2,
|
||||
topic='%s.%s' % (self.topic, self.host),
|
||||
proxy=self)
|
||||
fanout = rpc.AdapterConsumer(
|
||||
connection=conn2,
|
||||
topic='%s_fanout' % self.topic,
|
||||
proxy=self)
|
||||
|
||||
self.timers.append(consumer_all.attach_to_eventlet())
|
||||
self.timers.append(consumer_node.attach_to_eventlet())
|
||||
self.timers.append(fanout.attach_to_eventlet())
|
||||
|
||||
pulse = utils.LoopingCall(self.report_state)
|
||||
pulse.start(interval=self.report_interval, now=False)
|
||||
|
||||
Reference in New Issue
Block a user