diff --git a/nova/flags.py b/nova/flags.py index 9123e9ac..bf83b8e0 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -358,5 +358,6 @@ DEFINE_string('node_availability_zone', 'nova', 'availability zone of this node') DEFINE_string('zone_name', 'nova', 'name of this zone') -DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux', - 'Key/Value tags which represent capabilities of this zone') +DEFINE_list('zone_capabilities', + ['hypervisor=xenserver;kvm', 'os=linux;windows'], + 'Key/Multi-value list representng capabilities of this zone') diff --git a/nova/rpc.py b/nova/rpc.py index 5935e1fb..388f78d6 100644 --- a/nova/rpc.py +++ b/nova/rpc.py @@ -137,24 +137,7 @@ class Consumer(messaging.Consumer): return timer -class Publisher(messaging.Publisher): - """Publisher base class""" - pass - - -class TopicConsumer(Consumer): - """Consumes messages on a specific topic""" - exchange_type = "topic" - - def __init__(self, connection=None, topic="broadcast"): - self.queue = topic - self.routing_key = topic - self.exchange = FLAGS.control_exchange - self.durable = False - super(TopicConsumer, self).__init__(connection=connection) - - -class AdapterConsumer(TopicConsumer): +class AdapterConsumer(Consumer): """Calls methods on a proxy object based on method and args""" def __init__(self, connection=None, topic="broadcast", proxy=None): LOG.debug(_('Initing the Adapter Consumer for %s') % topic) @@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer): return +class Publisher(messaging.Publisher): + """Publisher base class""" + pass + + +class TopicAdapterConsumer(AdapterConsumer): + """Consumes messages on a specific topic""" + exchange_type = "topic" + + def __init__(self, connection=None, topic="broadcast", proxy=None): + self.queue = topic + self.routing_key = topic + self.exchange = FLAGS.control_exchange + self.durable = False + super(TopicAdapterConsumer, self).__init__(connection=connection, + topic=topic, proxy=proxy) + + +class FanoutAdapterConsumer(AdapterConsumer): + """Consumes messages from a fanout exchange""" + exchange_type = "fanout" + + def __init__(self, connection=None, topic="broadcast", proxy=None): + self.exchange = "%s_fanout" % topic + self.routing_key = topic + unique = uuid.uuid4().hex + self.queue = "%s_fanout_%s" % (topic, unique) + self.durable = False + LOG.info(_("Created '%(exchange)s' fanout exchange " + "with '%(key)s' routing key"), + dict(exchange=self.exchange, key=self.routing_key)) + super(FanoutAdapterConsumer, self).__init__(connection=connection, + topic=topic, proxy=proxy) + + class TopicPublisher(Publisher): """Publishes messages on a specific topic""" exchange_type = "topic" @@ -218,6 +236,19 @@ class TopicPublisher(Publisher): super(TopicPublisher, self).__init__(connection=connection) +class FanoutPublisher(Publisher): + """Publishes messages to a fanout exchange.""" + exchange_type = "fanout" + + def __init__(self, topic, connection=None): + self.exchange = "%s_fanout" % topic + self.queue = "%s_fanout" % topic + self.durable = False + LOG.info(_("Creating '%(exchange)s' fanout exchange"), + dict(exchange=self.exchange)) + super(FanoutPublisher, self).__init__(connection=connection) + + class DirectConsumer(Consumer): """Consumes messages directly on a channel specified by msg_id""" exchange_type = "direct" @@ -360,6 +391,16 @@ def cast(context, topic, msg): publisher.close() +def fanout_cast(context, topic, msg): + """Sends a message on a fanout exchange without waiting for a response""" + LOG.debug(_("Making asynchronous fanout cast...")) + _pack_context(msg, context) + conn = Connection.instance() + publisher = FanoutPublisher(topic, connection=conn) + publisher.send(msg) + publisher.close() + + def generic_response(message_data, message): """Logs a result and exits""" LOG.debug(_('response %s'), message_data) diff --git a/nova/scheduler/api.py b/nova/scheduler/api.py index 2405f134..19a05b71 100644 --- a/nova/scheduler/api.py +++ b/nova/scheduler/api.py @@ -25,25 +25,40 @@ FLAGS = flags.FLAGS LOG = logging.getLogger('nova.scheduler.api') -class API(object): - """API for interacting with the scheduler.""" +def _call_scheduler(method, context, params=None): + """Generic handler for RPC calls to the scheduler. - def _call_scheduler(self, method, context, params=None): - """Generic handler for RPC calls to the scheduler. + :param params: Optional dictionary of arguments to be passed to the + scheduler worker - :param params: Optional dictionary of arguments to be passed to the - scheduler worker + :retval: Result returned by scheduler worker + """ + if not params: + params = {} + queue = FLAGS.scheduler_topic + kwargs = {'method': method, 'args': params} + return rpc.call(context, queue, kwargs) - :retval: Result returned by scheduler worker - """ - if not params: - params = {} - queue = FLAGS.scheduler_topic - kwargs = {'method': method, 'args': params} - return rpc.call(context, queue, kwargs) - def get_zone_list(self, context): - items = self._call_scheduler('get_zone_list', context) - for item in items: - item['api_url'] = item['api_url'].replace('\\/', '/') - return items +def get_zone_list(context): + """Return a list of zones assoicated with this zone.""" + items = _call_scheduler('get_zone_list', context) + for item in items: + item['api_url'] = item['api_url'].replace('\\/', '/') + return items + + +def get_zone_capabilities(context, service=None): + """Returns a dict of key, value capabilities for this zone, + or for a particular class of services running in this zone.""" + return _call_scheduler('get_zone_capabilities', context=context, + params=dict(service=service)) + + +def update_service_capabilities(context, service_name, host, capabilities): + """Send an update to all the scheduler services informing them + of the capabilities of this service.""" + kwargs = dict(method='update_service_capabilities', + args=dict(service_name=service_name, host=host, + capabilities=capabilities)) + return rpc.fanout_cast(context, 'scheduler', kwargs) diff --git a/nova/scheduler/zone_manager.py b/nova/scheduler/zone_manager.py index edf9000c..c1a50dbc 100644 --- a/nova/scheduler/zone_manager.py +++ b/nova/scheduler/zone_manager.py @@ -105,12 +105,36 @@ class ZoneManager(object): def __init__(self): self.last_zone_db_check = datetime.min self.zone_states = {} + self.service_states = {} # { : { : { cap k : v }}} self.green_pool = greenpool.GreenPool() def get_zone_list(self): """Return the list of zones we know about.""" return [zone.to_dict() for zone in self.zone_states.values()] + def get_zone_capabilities(self, context, service=None): + """Roll up all the individual host info to generic 'service' + capabilities. Each capability is aggregated into + _min and _max values.""" + service_dict = self.service_states + if service: + service_dict = {service: self.service_states.get(service, {})} + + # TODO(sandy) - be smarter about fabricating this structure. + # But it's likely to change once we understand what the Best-Match + # code will need better. + combined = {} # { _ : (min, max), ... } + for service_name, host_dict in service_dict.iteritems(): + for host, caps_dict in host_dict.iteritems(): + for cap, value in caps_dict.iteritems(): + key = "%s_%s" % (service_name, cap) + min_value, max_value = combined.get(key, (value, value)) + min_value = min(min_value, value) + max_value = max(max_value, value) + combined[key] = (min_value, max_value) + + return combined + def _refresh_from_db(self, context): """Make our zone state map match the db.""" # Add/update existing zones ... @@ -141,3 +165,11 @@ class ZoneManager(object): self.last_zone_db_check = datetime.now() self._refresh_from_db(context) self._poll_zones(context) + + def update_service_capabilities(self, service_name, host, capabilities): + """Update the per-service capabilities based on this notification.""" + logging.debug(_("Received %(service_name)s service update from " + "%(host)s: %(capabilities)s") % locals()) + service_caps = self.service_states.get(service_name, {}) + service_caps[host] = capabilities + self.service_states[service_name] = service_caps diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 4820e04f..44d7c91e 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase): super(RpcTestCase, self).setUp() self.conn = rpc.Connection.instance(True) self.receiver = TestReceiver() - self.consumer = rpc.AdapterConsumer(connection=self.conn, + self.consumer = rpc.TopicAdapterConsumer(connection=self.conn, topic='test', proxy=self.receiver) self.consumer.attach_to_eventlet() @@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase): nested = Nested() conn = rpc.Connection.instance(True) - consumer = rpc.AdapterConsumer(connection=conn, + consumer = rpc.TopicAdapterConsumer(connection=conn, topic='nested', proxy=nested) consumer.attach_to_eventlet() diff --git a/nova/tests/test_zones.py b/nova/tests/test_zones.py index 5a52a050..688dc704 100644 --- a/nova/tests/test_zones.py +++ b/nova/tests/test_zones.py @@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase): self.assertEquals(len(zm.zone_states), 1) self.assertEquals(zm.zone_states[1].username, 'user1') + def test_service_capabilities(self): + zm = zone_manager.ZoneManager() + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, {}) + + zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2))) + + zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3))) + + zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30))) + + zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30), + svc10_a=(99, 99), svc10_b=(99, 99))) + + zm.update_service_capabilities("svc1", "host3", dict(c=5)) + caps = zm.get_zone_capabilities(self, None) + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30), + svc1_c=(5, 5), svc10_a=(99, 99), + svc10_b=(99, 99))) + + caps = zm.get_zone_capabilities(self, 'svc1') + self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30), + svc1_c=(5, 5))) + caps = zm.get_zone_capabilities(self, 'svc10') + self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99))) + def test_refresh_from_db_replace_existing(self): zm = zone_manager.ZoneManager() zone_state = zone_manager.ZoneState()