Aggregates capabilities from Compute, Network, Volume to the ZoneManager in Scheduler.
This commit is contained in:
@@ -358,5 +358,6 @@ DEFINE_string('node_availability_zone', 'nova',
|
|||||||
'availability zone of this node')
|
'availability zone of this node')
|
||||||
|
|
||||||
DEFINE_string('zone_name', 'nova', 'name of this zone')
|
DEFINE_string('zone_name', 'nova', 'name of this zone')
|
||||||
DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
|
DEFINE_list('zone_capabilities',
|
||||||
'Key/Value tags which represent capabilities of this zone')
|
['hypervisor=xenserver;kvm', 'os=linux;windows'],
|
||||||
|
'Key/Multi-value list representng capabilities of this zone')
|
||||||
|
77
nova/rpc.py
77
nova/rpc.py
@@ -137,24 +137,7 @@ class Consumer(messaging.Consumer):
|
|||||||
return timer
|
return timer
|
||||||
|
|
||||||
|
|
||||||
class Publisher(messaging.Publisher):
|
class AdapterConsumer(Consumer):
|
||||||
"""Publisher base class"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(Consumer):
|
|
||||||
"""Consumes messages on a specific topic"""
|
|
||||||
exchange_type = "topic"
|
|
||||||
|
|
||||||
def __init__(self, connection=None, topic="broadcast"):
|
|
||||||
self.queue = topic
|
|
||||||
self.routing_key = topic
|
|
||||||
self.exchange = FLAGS.control_exchange
|
|
||||||
self.durable = False
|
|
||||||
super(TopicConsumer, self).__init__(connection=connection)
|
|
||||||
|
|
||||||
|
|
||||||
class AdapterConsumer(TopicConsumer):
|
|
||||||
"""Calls methods on a proxy object based on method and args"""
|
"""Calls methods on a proxy object based on method and args"""
|
||||||
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
||||||
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
|
||||||
@@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer):
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class Publisher(messaging.Publisher):
|
||||||
|
"""Publisher base class"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class TopicAdapterConsumer(AdapterConsumer):
|
||||||
|
"""Consumes messages on a specific topic"""
|
||||||
|
exchange_type = "topic"
|
||||||
|
|
||||||
|
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
||||||
|
self.queue = topic
|
||||||
|
self.routing_key = topic
|
||||||
|
self.exchange = FLAGS.control_exchange
|
||||||
|
self.durable = False
|
||||||
|
super(TopicAdapterConsumer, self).__init__(connection=connection,
|
||||||
|
topic=topic, proxy=proxy)
|
||||||
|
|
||||||
|
|
||||||
|
class FanoutAdapterConsumer(AdapterConsumer):
|
||||||
|
"""Consumes messages from a fanout exchange"""
|
||||||
|
exchange_type = "fanout"
|
||||||
|
|
||||||
|
def __init__(self, connection=None, topic="broadcast", proxy=None):
|
||||||
|
self.exchange = "%s_fanout" % topic
|
||||||
|
self.routing_key = topic
|
||||||
|
unique = uuid.uuid4().hex
|
||||||
|
self.queue = "%s_fanout_%s" % (topic, unique)
|
||||||
|
self.durable = False
|
||||||
|
LOG.info(_("Created '%(exchange)s' fanout exchange "
|
||||||
|
"with '%(key)s' routing key"),
|
||||||
|
dict(exchange=self.exchange, key=self.routing_key))
|
||||||
|
super(FanoutAdapterConsumer, self).__init__(connection=connection,
|
||||||
|
topic=topic, proxy=proxy)
|
||||||
|
|
||||||
|
|
||||||
class TopicPublisher(Publisher):
|
class TopicPublisher(Publisher):
|
||||||
"""Publishes messages on a specific topic"""
|
"""Publishes messages on a specific topic"""
|
||||||
exchange_type = "topic"
|
exchange_type = "topic"
|
||||||
@@ -218,6 +236,19 @@ class TopicPublisher(Publisher):
|
|||||||
super(TopicPublisher, self).__init__(connection=connection)
|
super(TopicPublisher, self).__init__(connection=connection)
|
||||||
|
|
||||||
|
|
||||||
|
class FanoutPublisher(Publisher):
|
||||||
|
"""Publishes messages to a fanout exchange."""
|
||||||
|
exchange_type = "fanout"
|
||||||
|
|
||||||
|
def __init__(self, topic, connection=None):
|
||||||
|
self.exchange = "%s_fanout" % topic
|
||||||
|
self.queue = "%s_fanout" % topic
|
||||||
|
self.durable = False
|
||||||
|
LOG.info(_("Creating '%(exchange)s' fanout exchange"),
|
||||||
|
dict(exchange=self.exchange))
|
||||||
|
super(FanoutPublisher, self).__init__(connection=connection)
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(Consumer):
|
class DirectConsumer(Consumer):
|
||||||
"""Consumes messages directly on a channel specified by msg_id"""
|
"""Consumes messages directly on a channel specified by msg_id"""
|
||||||
exchange_type = "direct"
|
exchange_type = "direct"
|
||||||
@@ -360,6 +391,16 @@ def cast(context, topic, msg):
|
|||||||
publisher.close()
|
publisher.close()
|
||||||
|
|
||||||
|
|
||||||
|
def fanout_cast(context, topic, msg):
|
||||||
|
"""Sends a message on a fanout exchange without waiting for a response"""
|
||||||
|
LOG.debug(_("Making asynchronous fanout cast..."))
|
||||||
|
_pack_context(msg, context)
|
||||||
|
conn = Connection.instance()
|
||||||
|
publisher = FanoutPublisher(topic, connection=conn)
|
||||||
|
publisher.send(msg)
|
||||||
|
publisher.close()
|
||||||
|
|
||||||
|
|
||||||
def generic_response(message_data, message):
|
def generic_response(message_data, message):
|
||||||
"""Logs a result and exits"""
|
"""Logs a result and exits"""
|
||||||
LOG.debug(_('response %s'), message_data)
|
LOG.debug(_('response %s'), message_data)
|
||||||
|
@@ -25,25 +25,40 @@ FLAGS = flags.FLAGS
|
|||||||
LOG = logging.getLogger('nova.scheduler.api')
|
LOG = logging.getLogger('nova.scheduler.api')
|
||||||
|
|
||||||
|
|
||||||
class API(object):
|
def _call_scheduler(method, context, params=None):
|
||||||
"""API for interacting with the scheduler."""
|
"""Generic handler for RPC calls to the scheduler.
|
||||||
|
|
||||||
def _call_scheduler(self, method, context, params=None):
|
:param params: Optional dictionary of arguments to be passed to the
|
||||||
"""Generic handler for RPC calls to the scheduler.
|
scheduler worker
|
||||||
|
|
||||||
:param params: Optional dictionary of arguments to be passed to the
|
:retval: Result returned by scheduler worker
|
||||||
scheduler worker
|
"""
|
||||||
|
if not params:
|
||||||
|
params = {}
|
||||||
|
queue = FLAGS.scheduler_topic
|
||||||
|
kwargs = {'method': method, 'args': params}
|
||||||
|
return rpc.call(context, queue, kwargs)
|
||||||
|
|
||||||
:retval: Result returned by scheduler worker
|
|
||||||
"""
|
|
||||||
if not params:
|
|
||||||
params = {}
|
|
||||||
queue = FLAGS.scheduler_topic
|
|
||||||
kwargs = {'method': method, 'args': params}
|
|
||||||
return rpc.call(context, queue, kwargs)
|
|
||||||
|
|
||||||
def get_zone_list(self, context):
|
def get_zone_list(context):
|
||||||
items = self._call_scheduler('get_zone_list', context)
|
"""Return a list of zones assoicated with this zone."""
|
||||||
for item in items:
|
items = _call_scheduler('get_zone_list', context)
|
||||||
item['api_url'] = item['api_url'].replace('\\/', '/')
|
for item in items:
|
||||||
return items
|
item['api_url'] = item['api_url'].replace('\\/', '/')
|
||||||
|
return items
|
||||||
|
|
||||||
|
|
||||||
|
def get_zone_capabilities(context, service=None):
|
||||||
|
"""Returns a dict of key, value capabilities for this zone,
|
||||||
|
or for a particular class of services running in this zone."""
|
||||||
|
return _call_scheduler('get_zone_capabilities', context=context,
|
||||||
|
params=dict(service=service))
|
||||||
|
|
||||||
|
|
||||||
|
def update_service_capabilities(context, service_name, host, capabilities):
|
||||||
|
"""Send an update to all the scheduler services informing them
|
||||||
|
of the capabilities of this service."""
|
||||||
|
kwargs = dict(method='update_service_capabilities',
|
||||||
|
args=dict(service_name=service_name, host=host,
|
||||||
|
capabilities=capabilities))
|
||||||
|
return rpc.fanout_cast(context, 'scheduler', kwargs)
|
||||||
|
@@ -105,12 +105,36 @@ class ZoneManager(object):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.last_zone_db_check = datetime.min
|
self.last_zone_db_check = datetime.min
|
||||||
self.zone_states = {}
|
self.zone_states = {}
|
||||||
|
self.service_states = {} # { <service> : { <host> : { cap k : v }}}
|
||||||
self.green_pool = greenpool.GreenPool()
|
self.green_pool = greenpool.GreenPool()
|
||||||
|
|
||||||
def get_zone_list(self):
|
def get_zone_list(self):
|
||||||
"""Return the list of zones we know about."""
|
"""Return the list of zones we know about."""
|
||||||
return [zone.to_dict() for zone in self.zone_states.values()]
|
return [zone.to_dict() for zone in self.zone_states.values()]
|
||||||
|
|
||||||
|
def get_zone_capabilities(self, context, service=None):
|
||||||
|
"""Roll up all the individual host info to generic 'service'
|
||||||
|
capabilities. Each capability is aggregated into
|
||||||
|
<cap>_min and <cap>_max values."""
|
||||||
|
service_dict = self.service_states
|
||||||
|
if service:
|
||||||
|
service_dict = {service: self.service_states.get(service, {})}
|
||||||
|
|
||||||
|
# TODO(sandy) - be smarter about fabricating this structure.
|
||||||
|
# But it's likely to change once we understand what the Best-Match
|
||||||
|
# code will need better.
|
||||||
|
combined = {} # { <service>_<cap> : (min, max), ... }
|
||||||
|
for service_name, host_dict in service_dict.iteritems():
|
||||||
|
for host, caps_dict in host_dict.iteritems():
|
||||||
|
for cap, value in caps_dict.iteritems():
|
||||||
|
key = "%s_%s" % (service_name, cap)
|
||||||
|
min_value, max_value = combined.get(key, (value, value))
|
||||||
|
min_value = min(min_value, value)
|
||||||
|
max_value = max(max_value, value)
|
||||||
|
combined[key] = (min_value, max_value)
|
||||||
|
|
||||||
|
return combined
|
||||||
|
|
||||||
def _refresh_from_db(self, context):
|
def _refresh_from_db(self, context):
|
||||||
"""Make our zone state map match the db."""
|
"""Make our zone state map match the db."""
|
||||||
# Add/update existing zones ...
|
# Add/update existing zones ...
|
||||||
@@ -141,3 +165,11 @@ class ZoneManager(object):
|
|||||||
self.last_zone_db_check = datetime.now()
|
self.last_zone_db_check = datetime.now()
|
||||||
self._refresh_from_db(context)
|
self._refresh_from_db(context)
|
||||||
self._poll_zones(context)
|
self._poll_zones(context)
|
||||||
|
|
||||||
|
def update_service_capabilities(self, service_name, host, capabilities):
|
||||||
|
"""Update the per-service capabilities based on this notification."""
|
||||||
|
logging.debug(_("Received %(service_name)s service update from "
|
||||||
|
"%(host)s: %(capabilities)s") % locals())
|
||||||
|
service_caps = self.service_states.get(service_name, {})
|
||||||
|
service_caps[host] = capabilities
|
||||||
|
self.service_states[service_name] = service_caps
|
||||||
|
@@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase):
|
|||||||
super(RpcTestCase, self).setUp()
|
super(RpcTestCase, self).setUp()
|
||||||
self.conn = rpc.Connection.instance(True)
|
self.conn = rpc.Connection.instance(True)
|
||||||
self.receiver = TestReceiver()
|
self.receiver = TestReceiver()
|
||||||
self.consumer = rpc.AdapterConsumer(connection=self.conn,
|
self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
|
||||||
topic='test',
|
topic='test',
|
||||||
proxy=self.receiver)
|
proxy=self.receiver)
|
||||||
self.consumer.attach_to_eventlet()
|
self.consumer.attach_to_eventlet()
|
||||||
@@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase):
|
|||||||
|
|
||||||
nested = Nested()
|
nested = Nested()
|
||||||
conn = rpc.Connection.instance(True)
|
conn = rpc.Connection.instance(True)
|
||||||
consumer = rpc.AdapterConsumer(connection=conn,
|
consumer = rpc.TopicAdapterConsumer(connection=conn,
|
||||||
topic='nested',
|
topic='nested',
|
||||||
proxy=nested)
|
proxy=nested)
|
||||||
consumer.attach_to_eventlet()
|
consumer.attach_to_eventlet()
|
||||||
|
@@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase):
|
|||||||
self.assertEquals(len(zm.zone_states), 1)
|
self.assertEquals(len(zm.zone_states), 1)
|
||||||
self.assertEquals(zm.zone_states[1].username, 'user1')
|
self.assertEquals(zm.zone_states[1].username, 'user1')
|
||||||
|
|
||||||
|
def test_service_capabilities(self):
|
||||||
|
zm = zone_manager.ZoneManager()
|
||||||
|
caps = zm.get_zone_capabilities(self, None)
|
||||||
|
self.assertEquals(caps, {})
|
||||||
|
|
||||||
|
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
|
||||||
|
caps = zm.get_zone_capabilities(self, None)
|
||||||
|
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
|
||||||
|
|
||||||
|
zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
|
||||||
|
caps = zm.get_zone_capabilities(self, None)
|
||||||
|
self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
|
||||||
|
|
||||||
|
zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
|
||||||
|
caps = zm.get_zone_capabilities(self, None)
|
||||||
|
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
|
||||||
|
|
||||||
|
zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
|
||||||
|
caps = zm.get_zone_capabilities(self, None)
|
||||||
|
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
|
||||||
|
svc10_a=(99, 99), svc10_b=(99, 99)))
|
||||||
|
|
||||||
|
zm.update_service_capabilities("svc1", "host3", dict(c=5))
|
||||||
|
caps = zm.get_zone_capabilities(self, None)
|
||||||
|
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
|
||||||
|
svc1_c=(5, 5), svc10_a=(99, 99),
|
||||||
|
svc10_b=(99, 99)))
|
||||||
|
|
||||||
|
caps = zm.get_zone_capabilities(self, 'svc1')
|
||||||
|
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
|
||||||
|
svc1_c=(5, 5)))
|
||||||
|
caps = zm.get_zone_capabilities(self, 'svc10')
|
||||||
|
self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
|
||||||
|
|
||||||
def test_refresh_from_db_replace_existing(self):
|
def test_refresh_from_db_replace_existing(self):
|
||||||
zm = zone_manager.ZoneManager()
|
zm = zone_manager.ZoneManager()
|
||||||
zone_state = zone_manager.ZoneState()
|
zone_state = zone_manager.ZoneState()
|
||||||
|
Reference in New Issue
Block a user