start of fanout

This commit is contained in:
Sandy Walsh
2011-03-03 16:28:04 -04:00
parent 3baf837cea
commit 5d00101e5c
2 changed files with 24 additions and 0 deletions

View File

@@ -218,6 +218,16 @@ class TopicPublisher(Publisher):
super(TopicPublisher, self).__init__(connection=connection)
class FanoutPublisher(Publisher):
"""Publishes messages to a fanout exchange."""
exchange_type = "fanout"
def __init__(self, topic, connection=None):
self.exchange = "%s_fanout" % topic
self.durable = False
super(FanoutPublisher, self).__init__(connection=connection)
class DirectConsumer(Consumer):
"""Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
@@ -360,6 +370,16 @@ def cast(context, topic, msg):
publisher.close()
def fanout_cast(context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response"""
LOG.debug(_("Making asynchronous fanout cast..."))
_pack_context(msg, context)
conn = Connection.instance()
publisher = FanoutPublisher(topic, connection=conn)
publisher.send(msg)
publisher.close()
def generic_response(message_data, message):
"""Logs a result and exits"""
LOG.debug(_('response %s'), message_data)

View File

@@ -105,6 +105,7 @@ class ZoneManager(object):
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {}
self.compute_states = {}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
@@ -141,3 +142,6 @@ class ZoneManager(object):
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)
def update_compute_capabilities(self):
logging.debug(_("****** UPDATE COMPUTE CAPABILITIES *******"))