Merged with trunk
Updated net injection for xenapi reflecting recent changes for libvirt
This commit is contained in:
		@@ -34,12 +34,14 @@ if os.path.exists(os.path.join(possible_topdir, 'nova', '__init__.py')):
 | 
			
		||||
 | 
			
		||||
gettext.install('nova', unicode=1)
 | 
			
		||||
 | 
			
		||||
from nova import compute
 | 
			
		||||
from nova import flags
 | 
			
		||||
from nova import log as logging
 | 
			
		||||
from nova import network
 | 
			
		||||
from nova import utils
 | 
			
		||||
from nova import volume
 | 
			
		||||
from nova import wsgi
 | 
			
		||||
from nova.api import direct
 | 
			
		||||
from nova.compute import api as compute_api
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
FLAGS = flags.FLAGS
 | 
			
		||||
@@ -50,13 +52,42 @@ flags.DEFINE_flag(flags.HelpshortFlag())
 | 
			
		||||
flags.DEFINE_flag(flags.HelpXMLFlag())
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# An example of an API that only exposes read-only methods.
 | 
			
		||||
# In this case we're just limiting which methods are exposed.
 | 
			
		||||
class ReadOnlyCompute(direct.Limited):
 | 
			
		||||
    """Read-only Compute API."""
 | 
			
		||||
 | 
			
		||||
    _allowed = ['get', 'get_all', 'get_console_output']
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# An example of an API that provides a backwards compatibility layer.
 | 
			
		||||
# In this case we're overwriting the implementation to ensure
 | 
			
		||||
# compatibility with an older version. In reality we would want the
 | 
			
		||||
# "description=None" to be part of the actual API so that code
 | 
			
		||||
# like this isn't even necessary, but this example shows what one can
 | 
			
		||||
# do if that isn't the situation.
 | 
			
		||||
class VolumeVersionOne(direct.Limited):
 | 
			
		||||
    _allowed = ['create', 'delete', 'update', 'get']
 | 
			
		||||
 | 
			
		||||
    def create(self, context, size, name):
 | 
			
		||||
        self.proxy.create(context, size, name, description=None)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
    utils.default_flagfile()
 | 
			
		||||
    FLAGS(sys.argv)
 | 
			
		||||
    logging.setup()
 | 
			
		||||
 | 
			
		||||
    direct.register_service('compute', compute_api.API())
 | 
			
		||||
    direct.register_service('compute', compute.API())
 | 
			
		||||
    direct.register_service('volume', volume.API())
 | 
			
		||||
    direct.register_service('network', network.API())
 | 
			
		||||
    direct.register_service('reflect', direct.Reflection())
 | 
			
		||||
 | 
			
		||||
    # Here is how we could expose the code in the examples above.
 | 
			
		||||
    #direct.register_service('compute-readonly',
 | 
			
		||||
    #                        ReadOnlyCompute(compute.API()))
 | 
			
		||||
    #direct.register_service('volume-v1', VolumeVersionOne(volume.API()))
 | 
			
		||||
 | 
			
		||||
    router = direct.Router()
 | 
			
		||||
    with_json = direct.JsonParamsMiddleware(router)
 | 
			
		||||
    with_req = direct.PostParamsMiddleware(with_json)
 | 
			
		||||
 
 | 
			
		||||
@@ -97,6 +97,7 @@ flags.DECLARE('vlan_start', 'nova.network.manager')
 | 
			
		||||
flags.DECLARE('vpn_start', 'nova.network.manager')
 | 
			
		||||
flags.DECLARE('fixed_range_v6', 'nova.network.manager')
 | 
			
		||||
flags.DECLARE('images_path', 'nova.image.local')
 | 
			
		||||
flags.DECLARE('libvirt_type', 'nova.virt.libvirt_conn')
 | 
			
		||||
flags.DEFINE_flag(flags.HelpFlag())
 | 
			
		||||
flags.DEFINE_flag(flags.HelpshortFlag())
 | 
			
		||||
flags.DEFINE_flag(flags.HelpXMLFlag())
 | 
			
		||||
@@ -610,7 +611,7 @@ class ServiceCommands(object):
 | 
			
		||||
        args: [host] [service]"""
 | 
			
		||||
        ctxt = context.get_admin_context()
 | 
			
		||||
        now = datetime.datetime.utcnow()
 | 
			
		||||
        services = db.service_get_all(ctxt) + db.service_get_all(ctxt, True)
 | 
			
		||||
        services = db.service_get_all(ctxt)
 | 
			
		||||
        if host:
 | 
			
		||||
            services = [s for s in services if s['host'] == host]
 | 
			
		||||
        if service:
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										14
									
								
								bin/stack
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								bin/stack
									
									
									
									
									
								
							@@ -59,11 +59,21 @@ USAGE = """usage: stack [options] <controller> <method> [arg1=value arg2=value]
 | 
			
		||||
 | 
			
		||||
def format_help(d):
 | 
			
		||||
    """Format help text, keys are labels and values are descriptions."""
 | 
			
		||||
    MAX_INDENT = 30
 | 
			
		||||
    indent = max([len(k) for k in d])
 | 
			
		||||
    if indent > MAX_INDENT:
 | 
			
		||||
        indent = MAX_INDENT - 6
 | 
			
		||||
 | 
			
		||||
    out = []
 | 
			
		||||
    for k, v in d.iteritems():
 | 
			
		||||
        t = textwrap.TextWrapper(initial_indent='   %s   ' % k.ljust(indent),
 | 
			
		||||
                                 subsequent_indent=' ' * (indent + 6))
 | 
			
		||||
        if (len(k) + 6) > MAX_INDENT:
 | 
			
		||||
            out.extend(['   %s' % k])
 | 
			
		||||
            initial_indent = ' ' * (indent + 6)
 | 
			
		||||
        else:
 | 
			
		||||
            initial_indent = '   %s   ' % k.ljust(indent)
 | 
			
		||||
        subsequent_indent = ' ' * (indent + 6)
 | 
			
		||||
        t = textwrap.TextWrapper(initial_indent=initial_indent,
 | 
			
		||||
                                 subsequent_indent=subsequent_indent)
 | 
			
		||||
        out.extend(t.wrap(v))
 | 
			
		||||
    return out
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -298,6 +298,8 @@ DEFINE_string('ec2_dmz_host', '$my_ip', 'internal ip of api server')
 | 
			
		||||
DEFINE_integer('ec2_port', 8773, 'cloud controller port')
 | 
			
		||||
DEFINE_string('ec2_scheme', 'http', 'prefix for ec2')
 | 
			
		||||
DEFINE_string('ec2_path', '/services/Cloud', 'suffix for ec2')
 | 
			
		||||
DEFINE_string('osapi_extensions_path', '/var/lib/nova/extensions',
 | 
			
		||||
               'default directory for nova extensions')
 | 
			
		||||
DEFINE_string('osapi_host', '$my_ip', 'ip of api server')
 | 
			
		||||
DEFINE_string('osapi_scheme', 'http', 'prefix for openstack')
 | 
			
		||||
DEFINE_integer('osapi_port', 8774, 'OpenStack API port')
 | 
			
		||||
@@ -358,5 +360,6 @@ DEFINE_string('node_availability_zone', 'nova',
 | 
			
		||||
              'availability zone of this node')
 | 
			
		||||
 | 
			
		||||
DEFINE_string('zone_name', 'nova', 'name of this zone')
 | 
			
		||||
DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
 | 
			
		||||
              'Key/Value tags which represent capabilities of this zone')
 | 
			
		||||
DEFINE_list('zone_capabilities',
 | 
			
		||||
                ['hypervisor=xenserver;kvm', 'os=linux;windows'],
 | 
			
		||||
                 'Key/Multi-value list representng capabilities of this zone')
 | 
			
		||||
 
 | 
			
		||||
@@ -53,11 +53,14 @@ This module provides Manager, a base class for managers.
 | 
			
		||||
 | 
			
		||||
from nova import utils
 | 
			
		||||
from nova import flags
 | 
			
		||||
from nova import log as logging
 | 
			
		||||
from nova.db import base
 | 
			
		||||
 | 
			
		||||
from nova.scheduler import api
 | 
			
		||||
 | 
			
		||||
FLAGS = flags.FLAGS
 | 
			
		||||
 | 
			
		||||
LOG = logging.getLogger('nova.manager')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Manager(base.Base):
 | 
			
		||||
    def __init__(self, host=None, db_driver=None):
 | 
			
		||||
@@ -74,3 +77,29 @@ class Manager(base.Base):
 | 
			
		||||
        """Do any initialization that needs to be run if this is a standalone
 | 
			
		||||
        service. Child classes should override this method."""
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SchedulerDependentManager(Manager):
 | 
			
		||||
    """Periodically send capability updates to the Scheduler services.
 | 
			
		||||
       Services that need to update the Scheduler of their capabilities
 | 
			
		||||
       should derive from this class. Otherwise they can derive from
 | 
			
		||||
       manager.Manager directly. Updates are only sent after
 | 
			
		||||
       update_service_capabilities is called with non-None values."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, host=None, db_driver=None, service_name="undefined"):
 | 
			
		||||
        self.last_capabilities = None
 | 
			
		||||
        self.service_name = service_name
 | 
			
		||||
        super(SchedulerDependentManager, self).__init__(host, db_driver)
 | 
			
		||||
 | 
			
		||||
    def update_service_capabilities(self, capabilities):
 | 
			
		||||
        """Remember these capabilities to send on next periodic update."""
 | 
			
		||||
        self.last_capabilities = capabilities
 | 
			
		||||
 | 
			
		||||
    def periodic_tasks(self, context=None):
 | 
			
		||||
        """Pass data back to the scheduler at a periodic interval"""
 | 
			
		||||
        if self.last_capabilities:
 | 
			
		||||
            LOG.debug(_("Notifying Schedulers of capabilities ..."))
 | 
			
		||||
            api.update_service_capabilities(context, self.service_name,
 | 
			
		||||
                                self.host, self.last_capabilities)
 | 
			
		||||
 | 
			
		||||
        super(SchedulerDependentManager, self).periodic_tasks(context)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										77
									
								
								nova/rpc.py
									
									
									
									
									
								
							
							
						
						
									
										77
									
								
								nova/rpc.py
									
									
									
									
									
								
							@@ -137,24 +137,7 @@ class Consumer(messaging.Consumer):
 | 
			
		||||
        return timer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Publisher(messaging.Publisher):
 | 
			
		||||
    """Publisher base class"""
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TopicConsumer(Consumer):
 | 
			
		||||
    """Consumes messages on a specific topic"""
 | 
			
		||||
    exchange_type = "topic"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, connection=None, topic="broadcast"):
 | 
			
		||||
        self.queue = topic
 | 
			
		||||
        self.routing_key = topic
 | 
			
		||||
        self.exchange = FLAGS.control_exchange
 | 
			
		||||
        self.durable = False
 | 
			
		||||
        super(TopicConsumer, self).__init__(connection=connection)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class AdapterConsumer(TopicConsumer):
 | 
			
		||||
class AdapterConsumer(Consumer):
 | 
			
		||||
    """Calls methods on a proxy object based on method and args"""
 | 
			
		||||
    def __init__(self, connection=None, topic="broadcast", proxy=None):
 | 
			
		||||
        LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
 | 
			
		||||
@@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer):
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Publisher(messaging.Publisher):
 | 
			
		||||
    """Publisher base class"""
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TopicAdapterConsumer(AdapterConsumer):
 | 
			
		||||
    """Consumes messages on a specific topic"""
 | 
			
		||||
    exchange_type = "topic"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, connection=None, topic="broadcast", proxy=None):
 | 
			
		||||
        self.queue = topic
 | 
			
		||||
        self.routing_key = topic
 | 
			
		||||
        self.exchange = FLAGS.control_exchange
 | 
			
		||||
        self.durable = False
 | 
			
		||||
        super(TopicAdapterConsumer, self).__init__(connection=connection,
 | 
			
		||||
                                    topic=topic, proxy=proxy)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FanoutAdapterConsumer(AdapterConsumer):
 | 
			
		||||
    """Consumes messages from a fanout exchange"""
 | 
			
		||||
    exchange_type = "fanout"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, connection=None, topic="broadcast", proxy=None):
 | 
			
		||||
        self.exchange = "%s_fanout" % topic
 | 
			
		||||
        self.routing_key = topic
 | 
			
		||||
        unique = uuid.uuid4().hex
 | 
			
		||||
        self.queue = "%s_fanout_%s" % (topic, unique)
 | 
			
		||||
        self.durable = False
 | 
			
		||||
        LOG.info(_("Created '%(exchange)s' fanout exchange "
 | 
			
		||||
                   "with '%(key)s' routing key"),
 | 
			
		||||
                dict(exchange=self.exchange, key=self.routing_key))
 | 
			
		||||
        super(FanoutAdapterConsumer, self).__init__(connection=connection,
 | 
			
		||||
                                    topic=topic, proxy=proxy)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TopicPublisher(Publisher):
 | 
			
		||||
    """Publishes messages on a specific topic"""
 | 
			
		||||
    exchange_type = "topic"
 | 
			
		||||
@@ -218,6 +236,19 @@ class TopicPublisher(Publisher):
 | 
			
		||||
        super(TopicPublisher, self).__init__(connection=connection)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FanoutPublisher(Publisher):
 | 
			
		||||
    """Publishes messages to a fanout exchange."""
 | 
			
		||||
    exchange_type = "fanout"
 | 
			
		||||
 | 
			
		||||
    def __init__(self, topic, connection=None):
 | 
			
		||||
        self.exchange = "%s_fanout" % topic
 | 
			
		||||
        self.queue = "%s_fanout" % topic
 | 
			
		||||
        self.durable = False
 | 
			
		||||
        LOG.info(_("Creating '%(exchange)s' fanout exchange"),
 | 
			
		||||
                            dict(exchange=self.exchange))
 | 
			
		||||
        super(FanoutPublisher, self).__init__(connection=connection)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DirectConsumer(Consumer):
 | 
			
		||||
    """Consumes messages directly on a channel specified by msg_id"""
 | 
			
		||||
    exchange_type = "direct"
 | 
			
		||||
@@ -360,6 +391,16 @@ def cast(context, topic, msg):
 | 
			
		||||
    publisher.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def fanout_cast(context, topic, msg):
 | 
			
		||||
    """Sends a message on a fanout exchange without waiting for a response"""
 | 
			
		||||
    LOG.debug(_("Making asynchronous fanout cast..."))
 | 
			
		||||
    _pack_context(msg, context)
 | 
			
		||||
    conn = Connection.instance()
 | 
			
		||||
    publisher = FanoutPublisher(topic, connection=conn)
 | 
			
		||||
    publisher.send(msg)
 | 
			
		||||
    publisher.close()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def generic_response(message_data, message):
 | 
			
		||||
    """Logs a result and exits"""
 | 
			
		||||
    LOG.debug(_('response %s'), message_data)
 | 
			
		||||
 
 | 
			
		||||
@@ -17,18 +17,25 @@
 | 
			
		||||
Handles all requests relating to schedulers.
 | 
			
		||||
"""
 | 
			
		||||
 | 
			
		||||
import novaclient
 | 
			
		||||
 | 
			
		||||
from nova import db
 | 
			
		||||
from nova import exception
 | 
			
		||||
from nova import flags
 | 
			
		||||
from nova import log as logging
 | 
			
		||||
from nova import rpc
 | 
			
		||||
 | 
			
		||||
from eventlet import greenpool
 | 
			
		||||
 | 
			
		||||
FLAGS = flags.FLAGS
 | 
			
		||||
flags.DEFINE_bool('enable_zone_routing',
 | 
			
		||||
    False,
 | 
			
		||||
    'When True, routing to child zones will occur.')
 | 
			
		||||
 | 
			
		||||
LOG = logging.getLogger('nova.scheduler.api')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class API(object):
 | 
			
		||||
    """API for interacting with the scheduler."""
 | 
			
		||||
 | 
			
		||||
    def _call_scheduler(self, method, context, params=None):
 | 
			
		||||
def _call_scheduler(method, context, params=None):
 | 
			
		||||
    """Generic handler for RPC calls to the scheduler.
 | 
			
		||||
 | 
			
		||||
    :param params: Optional dictionary of arguments to be passed to the
 | 
			
		||||
@@ -42,8 +49,193 @@ class API(object):
 | 
			
		||||
    kwargs = {'method': method, 'args': params}
 | 
			
		||||
    return rpc.call(context, queue, kwargs)
 | 
			
		||||
 | 
			
		||||
    def get_zone_list(self, context):
 | 
			
		||||
        items = self._call_scheduler('get_zone_list', context)
 | 
			
		||||
 | 
			
		||||
def get_zone_list(context):
 | 
			
		||||
    """Return a list of zones assoicated with this zone."""
 | 
			
		||||
    items = _call_scheduler('get_zone_list', context)
 | 
			
		||||
    for item in items:
 | 
			
		||||
        item['api_url'] = item['api_url'].replace('\\/', '/')
 | 
			
		||||
    if not items:
 | 
			
		||||
        items = db.zone_get_all(context)
 | 
			
		||||
    return items
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def zone_get(context, zone_id):
 | 
			
		||||
    return db.zone_get(context, zone_id)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def zone_delete(context, zone_id):
 | 
			
		||||
    return db.zone_delete(context, zone_id)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def zone_create(context, data):
 | 
			
		||||
    return db.zone_create(context, data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def zone_update(context, zone_id, data):
 | 
			
		||||
    return db.zone_update(context, zone_id, data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_zone_capabilities(context, service=None):
 | 
			
		||||
    """Returns a dict of key, value capabilities for this zone,
 | 
			
		||||
       or for a particular class of services running in this zone."""
 | 
			
		||||
    return _call_scheduler('get_zone_capabilities', context=context,
 | 
			
		||||
                          params=dict(service=service))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_service_capabilities(context, service_name, host, capabilities):
 | 
			
		||||
    """Send an update to all the scheduler services informing them
 | 
			
		||||
       of the capabilities of this service."""
 | 
			
		||||
    kwargs = dict(method='update_service_capabilities',
 | 
			
		||||
                  args=dict(service_name=service_name, host=host,
 | 
			
		||||
                            capabilities=capabilities))
 | 
			
		||||
    return rpc.fanout_cast(context, 'scheduler', kwargs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _wrap_method(function, self):
 | 
			
		||||
    """Wrap method to supply self."""
 | 
			
		||||
    def _wrap(*args, **kwargs):
 | 
			
		||||
        return function(self, *args, **kwargs)
 | 
			
		||||
    return _wrap
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _process(func, zone):
 | 
			
		||||
    """Worker stub for green thread pool. Give the worker
 | 
			
		||||
    an authenticated nova client and zone info."""
 | 
			
		||||
    nova = novaclient.OpenStack(zone.username, zone.password, zone.api_url)
 | 
			
		||||
    nova.authenticate()
 | 
			
		||||
    return func(nova, zone)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def child_zone_helper(zone_list, func):
 | 
			
		||||
    """Fire off a command to each zone in the list.
 | 
			
		||||
    The return is [novaclient return objects] from each child zone.
 | 
			
		||||
    For example, if you are calling server.pause(), the list will
 | 
			
		||||
    be whatever the response from server.pause() is. One entry
 | 
			
		||||
    per child zone called."""
 | 
			
		||||
    green_pool = greenpool.GreenPool()
 | 
			
		||||
    return [result for result in green_pool.imap(
 | 
			
		||||
                    _wrap_method(_process, func), zone_list)]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _issue_novaclient_command(nova, zone, collection, method_name, item_id):
 | 
			
		||||
    """Use novaclient to issue command to a single child zone.
 | 
			
		||||
       One of these will be run in parallel for each child zone."""
 | 
			
		||||
    manager = getattr(nova, collection)
 | 
			
		||||
    result = None
 | 
			
		||||
    try:
 | 
			
		||||
        try:
 | 
			
		||||
            result = manager.get(int(item_id))
 | 
			
		||||
        except ValueError, e:
 | 
			
		||||
            result = manager.find(name=item_id)
 | 
			
		||||
    except novaclient.NotFound:
 | 
			
		||||
        url = zone.api_url
 | 
			
		||||
        LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" %
 | 
			
		||||
                                                locals()))
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    if method_name.lower() not in ['get', 'find']:
 | 
			
		||||
        result = getattr(result, method_name)()
 | 
			
		||||
    return result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def wrap_novaclient_function(f, collection, method_name, item_id):
 | 
			
		||||
    """Appends collection, method_name and item_id to the incoming
 | 
			
		||||
    (nova, zone) call from child_zone_helper."""
 | 
			
		||||
    def inner(nova, zone):
 | 
			
		||||
        return f(nova, zone, collection, method_name, item_id)
 | 
			
		||||
 | 
			
		||||
    return inner
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RedirectResult(exception.Error):
 | 
			
		||||
    """Used to the HTTP API know that these results are pre-cooked
 | 
			
		||||
    and they can be returned to the caller directly."""
 | 
			
		||||
    def __init__(self, results):
 | 
			
		||||
        self.results = results
 | 
			
		||||
        super(RedirectResult, self).__init__(
 | 
			
		||||
               message=_("Uncaught Zone redirection exception"))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class reroute_compute(object):
 | 
			
		||||
    """Decorator used to indicate that the method should
 | 
			
		||||
       delegate the call the child zones if the db query
 | 
			
		||||
       can't find anything."""
 | 
			
		||||
    def __init__(self, method_name):
 | 
			
		||||
        self.method_name = method_name
 | 
			
		||||
 | 
			
		||||
    def __call__(self, f):
 | 
			
		||||
        def wrapped_f(*args, **kwargs):
 | 
			
		||||
            collection, context, item_id = \
 | 
			
		||||
                            self.get_collection_context_and_id(args, kwargs)
 | 
			
		||||
            try:
 | 
			
		||||
                # Call the original function ...
 | 
			
		||||
                return f(*args, **kwargs)
 | 
			
		||||
            except exception.InstanceNotFound, e:
 | 
			
		||||
                LOG.debug(_("Instance %(item_id)s not found "
 | 
			
		||||
                                    "locally: '%(e)s'" % locals()))
 | 
			
		||||
 | 
			
		||||
                if not FLAGS.enable_zone_routing:
 | 
			
		||||
                    raise
 | 
			
		||||
 | 
			
		||||
                zones = db.zone_get_all(context)
 | 
			
		||||
                if not zones:
 | 
			
		||||
                    raise
 | 
			
		||||
 | 
			
		||||
                # Ask the children to provide an answer ...
 | 
			
		||||
                LOG.debug(_("Asking child zones ..."))
 | 
			
		||||
                result = self._call_child_zones(zones,
 | 
			
		||||
                            wrap_novaclient_function(_issue_novaclient_command,
 | 
			
		||||
                                   collection, self.method_name, item_id))
 | 
			
		||||
                # Scrub the results and raise another exception
 | 
			
		||||
                # so the API layers can bail out gracefully ...
 | 
			
		||||
                raise RedirectResult(self.unmarshall_result(result))
 | 
			
		||||
        return wrapped_f
 | 
			
		||||
 | 
			
		||||
    def _call_child_zones(self, zones, function):
 | 
			
		||||
        """Ask the child zones to perform this operation.
 | 
			
		||||
        Broken out for testing."""
 | 
			
		||||
        return child_zone_helper(zones, function)
 | 
			
		||||
 | 
			
		||||
    def get_collection_context_and_id(self, args, kwargs):
 | 
			
		||||
        """Returns a tuple of (novaclient collection name, security
 | 
			
		||||
           context and resource id. Derived class should override this."""
 | 
			
		||||
        context = kwargs.get('context', None)
 | 
			
		||||
        instance_id = kwargs.get('instance_id', None)
 | 
			
		||||
        if len(args) > 0 and not context:
 | 
			
		||||
            context = args[1]
 | 
			
		||||
        if len(args) > 1 and not instance_id:
 | 
			
		||||
            instance_id = args[2]
 | 
			
		||||
        return ("servers", context, instance_id)
 | 
			
		||||
 | 
			
		||||
    def unmarshall_result(self, zone_responses):
 | 
			
		||||
        """Result is a list of responses from each child zone.
 | 
			
		||||
        Each decorator derivation is responsible to turning this
 | 
			
		||||
        into a format expected by the calling method. For
 | 
			
		||||
        example, this one is expected to return a single Server
 | 
			
		||||
        dict {'server':{k:v}}. Others may return a list of them, like
 | 
			
		||||
        {'servers':[{k,v}]}"""
 | 
			
		||||
        reduced_response = []
 | 
			
		||||
        for zone_response in zone_responses:
 | 
			
		||||
            if not zone_response:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            server = zone_response.__dict__
 | 
			
		||||
 | 
			
		||||
            for k in server.keys():
 | 
			
		||||
                if k[0] == '_' or k == 'manager':
 | 
			
		||||
                    del server[k]
 | 
			
		||||
 | 
			
		||||
            reduced_response.append(dict(server=server))
 | 
			
		||||
        if reduced_response:
 | 
			
		||||
            return reduced_response[0]  # first for now.
 | 
			
		||||
        return {}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def redirect_handler(f):
 | 
			
		||||
    def new_f(*args, **kwargs):
 | 
			
		||||
        try:
 | 
			
		||||
            return f(*args, **kwargs)
 | 
			
		||||
        except RedirectResult, e:
 | 
			
		||||
            return e.results
 | 
			
		||||
    return new_f
 | 
			
		||||
 
 | 
			
		||||
@@ -49,6 +49,13 @@ class WillNotSchedule(exception.Error):
 | 
			
		||||
class Scheduler(object):
 | 
			
		||||
    """The base class that all Scheduler clases should inherit from."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.zone_manager = None
 | 
			
		||||
 | 
			
		||||
    def set_zone_manager(self, zone_manager):
 | 
			
		||||
        """Called by the Scheduler Service to supply a ZoneManager."""
 | 
			
		||||
        self.zone_manager = zone_manager
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def service_is_up(service):
 | 
			
		||||
        """Check whether a service is up based on last heartbeat."""
 | 
			
		||||
 
 | 
			
		||||
@@ -41,10 +41,11 @@ flags.DEFINE_string('scheduler_driver',
 | 
			
		||||
class SchedulerManager(manager.Manager):
 | 
			
		||||
    """Chooses a host to run instances on."""
 | 
			
		||||
    def __init__(self, scheduler_driver=None, *args, **kwargs):
 | 
			
		||||
        self.zone_manager = zone_manager.ZoneManager()
 | 
			
		||||
        if not scheduler_driver:
 | 
			
		||||
            scheduler_driver = FLAGS.scheduler_driver
 | 
			
		||||
        self.driver = utils.import_object(scheduler_driver)
 | 
			
		||||
        self.zone_manager = zone_manager.ZoneManager()
 | 
			
		||||
        self.driver.set_zone_manager(self.zone_manager)
 | 
			
		||||
        super(SchedulerManager, self).__init__(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def __getattr__(self, key):
 | 
			
		||||
@@ -59,6 +60,17 @@ class SchedulerManager(manager.Manager):
 | 
			
		||||
        """Get a list of zones from the ZoneManager."""
 | 
			
		||||
        return self.zone_manager.get_zone_list()
 | 
			
		||||
 | 
			
		||||
    def get_zone_capabilities(self, context=None, service=None):
 | 
			
		||||
        """Get the normalized set of capabilites for this zone,
 | 
			
		||||
           or for a particular service."""
 | 
			
		||||
        return self.zone_manager.get_zone_capabilities(context, service)
 | 
			
		||||
 | 
			
		||||
    def update_service_capabilities(self, context=None, service_name=None,
 | 
			
		||||
                                                host=None, capabilities={}):
 | 
			
		||||
        """Process a capability update from a service node."""
 | 
			
		||||
        self.zone_manager.update_service_capabilities(service_name,
 | 
			
		||||
                            host, capabilities)
 | 
			
		||||
 | 
			
		||||
    def _schedule(self, method, context, topic, *args, **kwargs):
 | 
			
		||||
        """Tries to call schedule_* method on the driver to retrieve host.
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -58,8 +58,9 @@ class ZoneState(object):
 | 
			
		||||
           child zone."""
 | 
			
		||||
        self.last_seen = datetime.now()
 | 
			
		||||
        self.attempt = 0
 | 
			
		||||
        self.name = zone_metadata["name"]
 | 
			
		||||
        self.capabilities = zone_metadata["capabilities"]
 | 
			
		||||
        self.name = zone_metadata.get("name", "n/a")
 | 
			
		||||
        self.capabilities = ", ".join(["%s=%s" % (k, v)
 | 
			
		||||
                        for k, v in zone_metadata.iteritems() if k != 'name'])
 | 
			
		||||
        self.is_active = True
 | 
			
		||||
 | 
			
		||||
    def to_dict(self):
 | 
			
		||||
@@ -104,13 +105,37 @@ class ZoneManager(object):
 | 
			
		||||
    """Keeps the zone states updated."""
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.last_zone_db_check = datetime.min
 | 
			
		||||
        self.zone_states = {}
 | 
			
		||||
        self.zone_states = {}  # { <zone_id> : ZoneState }
 | 
			
		||||
        self.service_states = {}  # { <service> : { <host> : { cap k : v }}}
 | 
			
		||||
        self.green_pool = greenpool.GreenPool()
 | 
			
		||||
 | 
			
		||||
    def get_zone_list(self):
 | 
			
		||||
        """Return the list of zones we know about."""
 | 
			
		||||
        return [zone.to_dict() for zone in self.zone_states.values()]
 | 
			
		||||
 | 
			
		||||
    def get_zone_capabilities(self, context, service=None):
 | 
			
		||||
        """Roll up all the individual host info to generic 'service'
 | 
			
		||||
           capabilities. Each capability is aggregated into
 | 
			
		||||
           <cap>_min and <cap>_max values."""
 | 
			
		||||
        service_dict = self.service_states
 | 
			
		||||
        if service:
 | 
			
		||||
            service_dict = {service: self.service_states.get(service, {})}
 | 
			
		||||
 | 
			
		||||
        # TODO(sandy) - be smarter about fabricating this structure.
 | 
			
		||||
        # But it's likely to change once we understand what the Best-Match
 | 
			
		||||
        # code will need better.
 | 
			
		||||
        combined = {}  # { <service>_<cap> : (min, max), ... }
 | 
			
		||||
        for service_name, host_dict in service_dict.iteritems():
 | 
			
		||||
            for host, caps_dict in host_dict.iteritems():
 | 
			
		||||
                for cap, value in caps_dict.iteritems():
 | 
			
		||||
                    key = "%s_%s" % (service_name, cap)
 | 
			
		||||
                    min_value, max_value = combined.get(key, (value, value))
 | 
			
		||||
                    min_value = min(min_value, value)
 | 
			
		||||
                    max_value = max(max_value, value)
 | 
			
		||||
                    combined[key] = (min_value, max_value)
 | 
			
		||||
 | 
			
		||||
        return combined
 | 
			
		||||
 | 
			
		||||
    def _refresh_from_db(self, context):
 | 
			
		||||
        """Make our zone state map match the db."""
 | 
			
		||||
        # Add/update existing zones ...
 | 
			
		||||
@@ -141,3 +166,11 @@ class ZoneManager(object):
 | 
			
		||||
            self.last_zone_db_check = datetime.now()
 | 
			
		||||
            self._refresh_from_db(context)
 | 
			
		||||
        self._poll_zones(context)
 | 
			
		||||
 | 
			
		||||
    def update_service_capabilities(self, service_name, host, capabilities):
 | 
			
		||||
        """Update the per-service capabilities based on this notification."""
 | 
			
		||||
        logging.debug(_("Received %(service_name)s service update from "
 | 
			
		||||
                            "%(host)s: %(capabilities)s") % locals())
 | 
			
		||||
        service_caps = self.service_states.get(service_name, {})
 | 
			
		||||
        service_caps[host] = capabilities
 | 
			
		||||
        self.service_states[service_name] = service_caps
 | 
			
		||||
 
 | 
			
		||||
@@ -97,18 +97,24 @@ class Service(object):
 | 
			
		||||
 | 
			
		||||
        conn1 = rpc.Connection.instance(new=True)
 | 
			
		||||
        conn2 = rpc.Connection.instance(new=True)
 | 
			
		||||
        conn3 = rpc.Connection.instance(new=True)
 | 
			
		||||
        if self.report_interval:
 | 
			
		||||
            consumer_all = rpc.AdapterConsumer(
 | 
			
		||||
            consumer_all = rpc.TopicAdapterConsumer(
 | 
			
		||||
                    connection=conn1,
 | 
			
		||||
                    topic=self.topic,
 | 
			
		||||
                    proxy=self)
 | 
			
		||||
            consumer_node = rpc.AdapterConsumer(
 | 
			
		||||
            consumer_node = rpc.TopicAdapterConsumer(
 | 
			
		||||
                    connection=conn2,
 | 
			
		||||
                    topic='%s.%s' % (self.topic, self.host),
 | 
			
		||||
                    proxy=self)
 | 
			
		||||
            fanout = rpc.FanoutAdapterConsumer(
 | 
			
		||||
                    connection=conn3,
 | 
			
		||||
                    topic=self.topic,
 | 
			
		||||
                    proxy=self)
 | 
			
		||||
 | 
			
		||||
            self.timers.append(consumer_all.attach_to_eventlet())
 | 
			
		||||
            self.timers.append(consumer_node.attach_to_eventlet())
 | 
			
		||||
            self.timers.append(fanout.attach_to_eventlet())
 | 
			
		||||
 | 
			
		||||
            pulse = utils.LoopingCall(self.report_state)
 | 
			
		||||
            pulse.start(interval=self.report_interval, now=False)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										57
									
								
								nova/test.py
									
									
									
									
									
								
							
							
						
						
									
										57
									
								
								nova/test.py
									
									
									
									
									
								
							@@ -150,3 +150,60 @@ class TestCase(unittest.TestCase):
 | 
			
		||||
 | 
			
		||||
        _wrapped.func_name = self.originalAttach.func_name
 | 
			
		||||
        rpc.Consumer.attach_to_eventlet = _wrapped
 | 
			
		||||
 | 
			
		||||
    # Useful assertions
 | 
			
		||||
    def assertDictMatch(self, d1, d2):
 | 
			
		||||
        """Assert two dicts are equivalent.
 | 
			
		||||
 | 
			
		||||
        This is a 'deep' match in the sense that it handles nested
 | 
			
		||||
        dictionaries appropriately.
 | 
			
		||||
 | 
			
		||||
        NOTE:
 | 
			
		||||
 | 
			
		||||
            If you don't care (or don't know) a given value, you can specify
 | 
			
		||||
            the string DONTCARE as the value. This will cause that dict-item
 | 
			
		||||
            to be skipped.
 | 
			
		||||
        """
 | 
			
		||||
        def raise_assertion(msg):
 | 
			
		||||
            d1str = str(d1)
 | 
			
		||||
            d2str = str(d2)
 | 
			
		||||
            base_msg = ("Dictionaries do not match. %(msg)s d1: %(d1str)s "
 | 
			
		||||
                        "d2: %(d2str)s" % locals())
 | 
			
		||||
            raise AssertionError(base_msg)
 | 
			
		||||
 | 
			
		||||
        d1keys = set(d1.keys())
 | 
			
		||||
        d2keys = set(d2.keys())
 | 
			
		||||
        if d1keys != d2keys:
 | 
			
		||||
            d1only = d1keys - d2keys
 | 
			
		||||
            d2only = d2keys - d1keys
 | 
			
		||||
            raise_assertion("Keys in d1 and not d2: %(d1only)s. "
 | 
			
		||||
                            "Keys in d2 and not d1: %(d2only)s" % locals())
 | 
			
		||||
 | 
			
		||||
        for key in d1keys:
 | 
			
		||||
            d1value = d1[key]
 | 
			
		||||
            d2value = d2[key]
 | 
			
		||||
            if hasattr(d1value, 'keys') and hasattr(d2value, 'keys'):
 | 
			
		||||
                self.assertDictMatch(d1value, d2value)
 | 
			
		||||
            elif 'DONTCARE' in (d1value, d2value):
 | 
			
		||||
                continue
 | 
			
		||||
            elif d1value != d2value:
 | 
			
		||||
                raise_assertion("d1['%(key)s']=%(d1value)s != "
 | 
			
		||||
                                "d2['%(key)s']=%(d2value)s" % locals())
 | 
			
		||||
 | 
			
		||||
    def assertDictListMatch(self, L1, L2):
 | 
			
		||||
        """Assert a list of dicts are equivalent"""
 | 
			
		||||
        def raise_assertion(msg):
 | 
			
		||||
            L1str = str(L1)
 | 
			
		||||
            L2str = str(L2)
 | 
			
		||||
            base_msg = ("List of dictionaries do not match: %(msg)s "
 | 
			
		||||
                        "L1: %(L1str)s L2: %(L2str)s" % locals())
 | 
			
		||||
            raise AssertionError(base_msg)
 | 
			
		||||
 | 
			
		||||
        L1count = len(L1)
 | 
			
		||||
        L2count = len(L2)
 | 
			
		||||
        if L1count != L2count:
 | 
			
		||||
            raise_assertion("Length mismatch: len(L1)=%(L1count)d != "
 | 
			
		||||
                            "len(L2)=%(L2count)d" % locals())
 | 
			
		||||
 | 
			
		||||
        for d1, d2 in zip(L1, L2):
 | 
			
		||||
            self.assertDictMatch(d1, d2)
 | 
			
		||||
 
 | 
			
		||||
@@ -25,12 +25,18 @@ import webob
 | 
			
		||||
from nova import compute
 | 
			
		||||
from nova import context
 | 
			
		||||
from nova import exception
 | 
			
		||||
from nova import network
 | 
			
		||||
from nova import test
 | 
			
		||||
from nova import volume
 | 
			
		||||
from nova import utils
 | 
			
		||||
from nova.api import direct
 | 
			
		||||
from nova.tests import test_cloud
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ArbitraryObject(object):
 | 
			
		||||
    pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeService(object):
 | 
			
		||||
    def echo(self, context, data):
 | 
			
		||||
        return {'data': data}
 | 
			
		||||
@@ -39,6 +45,9 @@ class FakeService(object):
 | 
			
		||||
        return {'user': context.user_id,
 | 
			
		||||
                'project': context.project_id}
 | 
			
		||||
 | 
			
		||||
    def invalid_return(self, context):
 | 
			
		||||
        return ArbitraryObject()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DirectTestCase(test.TestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
@@ -84,6 +93,12 @@ class DirectTestCase(test.TestCase):
 | 
			
		||||
        resp_parsed = json.loads(resp.body)
 | 
			
		||||
        self.assertEqual(resp_parsed['data'], 'foo')
 | 
			
		||||
 | 
			
		||||
    def test_invalid(self):
 | 
			
		||||
        req = webob.Request.blank('/fake/invalid_return')
 | 
			
		||||
        req.environ['openstack.context'] = self.context
 | 
			
		||||
        req.method = 'POST'
 | 
			
		||||
        self.assertRaises(exception.Error, req.get_response, self.router)
 | 
			
		||||
 | 
			
		||||
    def test_proxy(self):
 | 
			
		||||
        proxy = direct.Proxy(self.router)
 | 
			
		||||
        rv = proxy.fake.echo(self.context, data='baz')
 | 
			
		||||
@@ -93,12 +108,20 @@ class DirectTestCase(test.TestCase):
 | 
			
		||||
class DirectCloudTestCase(test_cloud.CloudTestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(DirectCloudTestCase, self).setUp()
 | 
			
		||||
        compute_handle = compute.API(network_api=self.cloud.network_api,
 | 
			
		||||
                                     volume_api=self.cloud.volume_api)
 | 
			
		||||
        compute_handle = compute.API(image_service=self.cloud.image_service)
 | 
			
		||||
        volume_handle = volume.API()
 | 
			
		||||
        network_handle = network.API()
 | 
			
		||||
        direct.register_service('compute', compute_handle)
 | 
			
		||||
        direct.register_service('volume', volume_handle)
 | 
			
		||||
        direct.register_service('network', network_handle)
 | 
			
		||||
 | 
			
		||||
        self.router = direct.JsonParamsMiddleware(direct.Router())
 | 
			
		||||
        proxy = direct.Proxy(self.router)
 | 
			
		||||
        self.cloud.compute_api = proxy.compute
 | 
			
		||||
        self.cloud.volume_api = proxy.volume
 | 
			
		||||
        self.cloud.network_api = proxy.network
 | 
			
		||||
        compute_handle.volume_api = proxy.volume
 | 
			
		||||
        compute_handle.network_api = proxy.network
 | 
			
		||||
 | 
			
		||||
    def tearDown(self):
 | 
			
		||||
        super(DirectCloudTestCase, self).tearDown()
 | 
			
		||||
 
 | 
			
		||||
@@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase):
 | 
			
		||||
        super(RpcTestCase, self).setUp()
 | 
			
		||||
        self.conn = rpc.Connection.instance(True)
 | 
			
		||||
        self.receiver = TestReceiver()
 | 
			
		||||
        self.consumer = rpc.AdapterConsumer(connection=self.conn,
 | 
			
		||||
        self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
 | 
			
		||||
                                            topic='test',
 | 
			
		||||
                                            proxy=self.receiver)
 | 
			
		||||
        self.consumer.attach_to_eventlet()
 | 
			
		||||
@@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase):
 | 
			
		||||
 | 
			
		||||
        nested = Nested()
 | 
			
		||||
        conn = rpc.Connection.instance(True)
 | 
			
		||||
        consumer = rpc.AdapterConsumer(connection=conn,
 | 
			
		||||
        consumer = rpc.TopicAdapterConsumer(connection=conn,
 | 
			
		||||
                                       topic='nested',
 | 
			
		||||
                                       proxy=nested)
 | 
			
		||||
        consumer.attach_to_eventlet()
 | 
			
		||||
 
 | 
			
		||||
@@ -21,6 +21,9 @@ Tests For Scheduler
 | 
			
		||||
 | 
			
		||||
import datetime
 | 
			
		||||
import mox
 | 
			
		||||
import novaclient.exceptions
 | 
			
		||||
import stubout
 | 
			
		||||
import webob
 | 
			
		||||
 | 
			
		||||
from mox import IgnoreArg
 | 
			
		||||
from nova import context
 | 
			
		||||
@@ -32,6 +35,7 @@ from nova import test
 | 
			
		||||
from nova import rpc
 | 
			
		||||
from nova import utils
 | 
			
		||||
from nova.auth import manager as auth_manager
 | 
			
		||||
from nova.scheduler import api
 | 
			
		||||
from nova.scheduler import manager
 | 
			
		||||
from nova.scheduler import driver
 | 
			
		||||
from nova.compute import power_state
 | 
			
		||||
@@ -937,3 +941,160 @@ class SimpleDriverTestCase(test.TestCase):
 | 
			
		||||
        db.instance_destroy(self.context, instance_id)
 | 
			
		||||
        db.service_destroy(self.context, s_ref['id'])
 | 
			
		||||
        db.service_destroy(self.context, s_ref2['id'])
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeZone(object):
 | 
			
		||||
    def __init__(self, api_url, username, password):
 | 
			
		||||
        self.api_url = api_url
 | 
			
		||||
        self.username = username
 | 
			
		||||
        self.password = password
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def zone_get_all(context):
 | 
			
		||||
    return [
 | 
			
		||||
                FakeZone('http://example.com', 'bob', 'xxx'),
 | 
			
		||||
           ]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeRerouteCompute(api.reroute_compute):
 | 
			
		||||
    def _call_child_zones(self, zones, function):
 | 
			
		||||
        return []
 | 
			
		||||
 | 
			
		||||
    def get_collection_context_and_id(self, args, kwargs):
 | 
			
		||||
        return ("servers", None, 1)
 | 
			
		||||
 | 
			
		||||
    def unmarshall_result(self, zone_responses):
 | 
			
		||||
        return dict(magic="found me")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def go_boom(self, context, instance):
 | 
			
		||||
    raise exception.InstanceNotFound("boom message", instance)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def found_instance(self, context, instance):
 | 
			
		||||
    return dict(name='myserver')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeResource(object):
 | 
			
		||||
    def __init__(self, attribute_dict):
 | 
			
		||||
        for k, v in attribute_dict.iteritems():
 | 
			
		||||
            setattr(self, k, v)
 | 
			
		||||
 | 
			
		||||
    def pause(self):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ZoneRedirectTest(test.TestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(ZoneRedirectTest, self).setUp()
 | 
			
		||||
        self.stubs = stubout.StubOutForTesting()
 | 
			
		||||
 | 
			
		||||
        self.stubs.Set(db, 'zone_get_all', zone_get_all)
 | 
			
		||||
 | 
			
		||||
        self.enable_zone_routing = FLAGS.enable_zone_routing
 | 
			
		||||
        FLAGS.enable_zone_routing = True
 | 
			
		||||
 | 
			
		||||
    def tearDown(self):
 | 
			
		||||
        self.stubs.UnsetAll()
 | 
			
		||||
        FLAGS.enable_zone_routing = self.enable_zone_routing
 | 
			
		||||
        super(ZoneRedirectTest, self).tearDown()
 | 
			
		||||
 | 
			
		||||
    def test_trap_found_locally(self):
 | 
			
		||||
        decorator = FakeRerouteCompute("foo")
 | 
			
		||||
        try:
 | 
			
		||||
            result = decorator(found_instance)(None, None, 1)
 | 
			
		||||
        except api.RedirectResult, e:
 | 
			
		||||
            self.fail(_("Successful database hit should succeed"))
 | 
			
		||||
 | 
			
		||||
    def test_trap_not_found_locally(self):
 | 
			
		||||
        decorator = FakeRerouteCompute("foo")
 | 
			
		||||
        try:
 | 
			
		||||
            result = decorator(go_boom)(None, None, 1)
 | 
			
		||||
            self.assertFail(_("Should have rerouted."))
 | 
			
		||||
        except api.RedirectResult, e:
 | 
			
		||||
            self.assertEquals(e.results['magic'], 'found me')
 | 
			
		||||
 | 
			
		||||
    def test_routing_flags(self):
 | 
			
		||||
        FLAGS.enable_zone_routing = False
 | 
			
		||||
        decorator = FakeRerouteCompute("foo")
 | 
			
		||||
        try:
 | 
			
		||||
            result = decorator(go_boom)(None, None, 1)
 | 
			
		||||
            self.assertFail(_("Should have thrown exception."))
 | 
			
		||||
        except exception.InstanceNotFound, e:
 | 
			
		||||
            self.assertEquals(e.message, 'boom message')
 | 
			
		||||
 | 
			
		||||
    def test_get_collection_context_and_id(self):
 | 
			
		||||
        decorator = api.reroute_compute("foo")
 | 
			
		||||
        self.assertEquals(decorator.get_collection_context_and_id(
 | 
			
		||||
            (None, 10, 20), {}), ("servers", 10, 20))
 | 
			
		||||
        self.assertEquals(decorator.get_collection_context_and_id(
 | 
			
		||||
            (None, 11,),  dict(instance_id=21)), ("servers", 11, 21))
 | 
			
		||||
        self.assertEquals(decorator.get_collection_context_and_id(
 | 
			
		||||
            (None,), dict(context=12, instance_id=22)), ("servers", 12, 22))
 | 
			
		||||
 | 
			
		||||
    def test_unmarshal_single_server(self):
 | 
			
		||||
        decorator = api.reroute_compute("foo")
 | 
			
		||||
        self.assertEquals(decorator.unmarshall_result([]), {})
 | 
			
		||||
        self.assertEquals(decorator.unmarshall_result(
 | 
			
		||||
                [FakeResource(dict(a=1, b=2)), ]),
 | 
			
		||||
                dict(server=dict(a=1, b=2)))
 | 
			
		||||
        self.assertEquals(decorator.unmarshall_result(
 | 
			
		||||
                [FakeResource(dict(a=1, _b=2)), ]),
 | 
			
		||||
                dict(server=dict(a=1,)))
 | 
			
		||||
        self.assertEquals(decorator.unmarshall_result(
 | 
			
		||||
                [FakeResource(dict(a=1, manager=2)), ]),
 | 
			
		||||
                dict(server=dict(a=1,)))
 | 
			
		||||
        self.assertEquals(decorator.unmarshall_result(
 | 
			
		||||
                [FakeResource(dict(_a=1, manager=2)), ]),
 | 
			
		||||
                dict(server={}))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeServerCollection(object):
 | 
			
		||||
    def get(self, instance_id):
 | 
			
		||||
        return FakeResource(dict(a=10, b=20))
 | 
			
		||||
 | 
			
		||||
    def find(self, name):
 | 
			
		||||
        return FakeResource(dict(a=11, b=22))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeEmptyServerCollection(object):
 | 
			
		||||
    def get(self, f):
 | 
			
		||||
        raise novaclient.NotFound(1)
 | 
			
		||||
 | 
			
		||||
    def find(self, name):
 | 
			
		||||
        raise novaclient.NotFound(2)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeNovaClient(object):
 | 
			
		||||
    def __init__(self, collection):
 | 
			
		||||
        self.servers = collection
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DynamicNovaClientTest(test.TestCase):
 | 
			
		||||
    def test_issue_novaclient_command_found(self):
 | 
			
		||||
        zone = FakeZone('http://example.com', 'bob', 'xxx')
 | 
			
		||||
        self.assertEquals(api._issue_novaclient_command(
 | 
			
		||||
                    FakeNovaClient(FakeServerCollection()),
 | 
			
		||||
                    zone, "servers", "get", 100).a, 10)
 | 
			
		||||
 | 
			
		||||
        self.assertEquals(api._issue_novaclient_command(
 | 
			
		||||
                    FakeNovaClient(FakeServerCollection()),
 | 
			
		||||
                    zone, "servers", "find", "name").b, 22)
 | 
			
		||||
 | 
			
		||||
        self.assertEquals(api._issue_novaclient_command(
 | 
			
		||||
                    FakeNovaClient(FakeServerCollection()),
 | 
			
		||||
                    zone, "servers", "pause", 100), None)
 | 
			
		||||
 | 
			
		||||
    def test_issue_novaclient_command_not_found(self):
 | 
			
		||||
        zone = FakeZone('http://example.com', 'bob', 'xxx')
 | 
			
		||||
        self.assertEquals(api._issue_novaclient_command(
 | 
			
		||||
                    FakeNovaClient(FakeEmptyServerCollection()),
 | 
			
		||||
                    zone, "servers", "get", 100), None)
 | 
			
		||||
 | 
			
		||||
        self.assertEquals(api._issue_novaclient_command(
 | 
			
		||||
                    FakeNovaClient(FakeEmptyServerCollection()),
 | 
			
		||||
                    zone, "servers", "find", "name"), None)
 | 
			
		||||
 | 
			
		||||
        self.assertEquals(api._issue_novaclient_command(
 | 
			
		||||
                    FakeNovaClient(FakeEmptyServerCollection()),
 | 
			
		||||
                    zone, "servers", "any", "name"), None)
 | 
			
		||||
 
 | 
			
		||||
@@ -796,7 +796,8 @@ class NWFilterTestCase(test.TestCase):
 | 
			
		||||
 | 
			
		||||
        instance_ref = db.instance_create(self.context,
 | 
			
		||||
                                          {'user_id': 'fake',
 | 
			
		||||
                                          'project_id': 'fake'})
 | 
			
		||||
                                          'project_id': 'fake',
 | 
			
		||||
                                          'mac_address': '00:A0:C9:14:C8:29'})
 | 
			
		||||
        inst_id = instance_ref['id']
 | 
			
		||||
 | 
			
		||||
        ip = '10.11.12.13'
 | 
			
		||||
@@ -813,7 +814,8 @@ class NWFilterTestCase(test.TestCase):
 | 
			
		||||
                                            'instance_id': instance_ref['id']})
 | 
			
		||||
 | 
			
		||||
        def _ensure_all_called():
 | 
			
		||||
            instance_filter = 'nova-instance-%s' % instance_ref['name']
 | 
			
		||||
            instance_filter = 'nova-instance-%s-%s' % (instance_ref['name'],
 | 
			
		||||
                                                       '00A0C914C829')
 | 
			
		||||
            secgroup_filter = 'nova-secgroup-%s' % self.security_group['id']
 | 
			
		||||
            for required in [secgroup_filter, 'allow-dhcp-server',
 | 
			
		||||
                             'no-arp-spoofing', 'no-ip-spoofing',
 | 
			
		||||
 
 | 
			
		||||
@@ -356,8 +356,8 @@ class ISCSITestCase(DriverTestCase):
 | 
			
		||||
        tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0])
 | 
			
		||||
        self.mox.StubOutWithMock(self.volume.driver, '_execute')
 | 
			
		||||
        self.volume.driver._execute("sudo", "ietadm", "--op", "show",
 | 
			
		||||
                                    "--tid=%(tid)d" % locals()
 | 
			
		||||
                                 ).AndRaise(exception.ProcessExecutionError())
 | 
			
		||||
                                    "--tid=%(tid)d" % locals()).AndRaise(
 | 
			
		||||
                                            exception.ProcessExecutionError())
 | 
			
		||||
 | 
			
		||||
        self.mox.ReplayAll()
 | 
			
		||||
        self.assertRaises(exception.ProcessExecutionError,
 | 
			
		||||
 
 | 
			
		||||
@@ -195,6 +195,7 @@ class XenAPIVMTestCase(test.TestCase):
 | 
			
		||||
        stubs.stubout_stream_disk(self.stubs)
 | 
			
		||||
        stubs.stubout_is_vdi_pv(self.stubs)
 | 
			
		||||
        self.stubs.Set(VMOps, 'reset_network', reset_network)
 | 
			
		||||
        stubs.stub_out_vm_methods(self.stubs)
 | 
			
		||||
        glance_stubs.stubout_glance_client(self.stubs,
 | 
			
		||||
                                           glance_stubs.FakeGlance)
 | 
			
		||||
        fake_utils.stub_out_utils_execute(self.stubs)
 | 
			
		||||
@@ -475,6 +476,18 @@ class XenAPIVMTestCase(test.TestCase):
 | 
			
		||||
            self.assertEquals(vif_rec['qos_algorithm_params']['kbps'],
 | 
			
		||||
                              str(4 * 1024))
 | 
			
		||||
 | 
			
		||||
    def test_rescue(self):
 | 
			
		||||
        self.flags(xenapi_inject_image=False)
 | 
			
		||||
        instance = self._create_instance()
 | 
			
		||||
        conn = xenapi_conn.get_connection(False)
 | 
			
		||||
        conn.rescue(instance, None)
 | 
			
		||||
 | 
			
		||||
    def test_unrescue(self):
 | 
			
		||||
        instance = self._create_instance()
 | 
			
		||||
        conn = xenapi_conn.get_connection(False)
 | 
			
		||||
        # Ensure that it will not unrescue a non-rescued instance.
 | 
			
		||||
        self.assertRaises(Exception, conn.unrescue, instance, None)
 | 
			
		||||
 | 
			
		||||
    def tearDown(self):
 | 
			
		||||
        super(XenAPIVMTestCase, self).tearDown()
 | 
			
		||||
        self.manager.delete_project(self.project)
 | 
			
		||||
 
 | 
			
		||||
@@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase):
 | 
			
		||||
        self.assertEquals(len(zm.zone_states), 1)
 | 
			
		||||
        self.assertEquals(zm.zone_states[1].username, 'user1')
 | 
			
		||||
 | 
			
		||||
    def test_service_capabilities(self):
 | 
			
		||||
        zm = zone_manager.ZoneManager()
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, None)
 | 
			
		||||
        self.assertEquals(caps, {})
 | 
			
		||||
 | 
			
		||||
        zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, None)
 | 
			
		||||
        self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
 | 
			
		||||
 | 
			
		||||
        zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, None)
 | 
			
		||||
        self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
 | 
			
		||||
 | 
			
		||||
        zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, None)
 | 
			
		||||
        self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
 | 
			
		||||
 | 
			
		||||
        zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, None)
 | 
			
		||||
        self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
 | 
			
		||||
                                     svc10_a=(99, 99), svc10_b=(99, 99)))
 | 
			
		||||
 | 
			
		||||
        zm.update_service_capabilities("svc1", "host3", dict(c=5))
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, None)
 | 
			
		||||
        self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
 | 
			
		||||
                                     svc1_c=(5, 5), svc10_a=(99, 99),
 | 
			
		||||
                                     svc10_b=(99, 99)))
 | 
			
		||||
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, 'svc1')
 | 
			
		||||
        self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
 | 
			
		||||
                                     svc1_c=(5, 5)))
 | 
			
		||||
        caps = zm.get_zone_capabilities(self, 'svc10')
 | 
			
		||||
        self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
 | 
			
		||||
 | 
			
		||||
    def test_refresh_from_db_replace_existing(self):
 | 
			
		||||
        zm = zone_manager.ZoneManager()
 | 
			
		||||
        zone_state = zone_manager.ZoneState()
 | 
			
		||||
 
 | 
			
		||||
@@ -189,6 +189,25 @@ class FakeSessionForVMTests(fake.SessionBase):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def stub_out_vm_methods(stubs):
 | 
			
		||||
    def fake_shutdown(self, inst, vm, method="clean"):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def fake_acquire_bootlock(self, vm):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def fake_release_bootlock(self, vm):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def fake_spawn_rescue(self, inst):
 | 
			
		||||
        inst._rescue = False
 | 
			
		||||
 | 
			
		||||
    stubs.Set(vmops.VMOps, "_shutdown", fake_shutdown)
 | 
			
		||||
    stubs.Set(vmops.VMOps, "_acquire_bootlock", fake_acquire_bootlock)
 | 
			
		||||
    stubs.Set(vmops.VMOps, "_release_bootlock", fake_release_bootlock)
 | 
			
		||||
    stubs.Set(vmops.VMOps, "spawn_rescue", fake_spawn_rescue)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeSessionForVolumeTests(fake.SessionBase):
 | 
			
		||||
    """ Stubs out a XenAPISession for Volume tests """
 | 
			
		||||
    def __init__(self, uri):
 | 
			
		||||
 
 | 
			
		||||
@@ -171,10 +171,6 @@ def execute(*cmd, **kwargs):
 | 
			
		||||
                                                stdout=stdout,
 | 
			
		||||
                                                stderr=stderr,
 | 
			
		||||
                                                cmd=' '.join(cmd))
 | 
			
		||||
            # NOTE(termie): this appears to be necessary to let the subprocess
 | 
			
		||||
            #               call clean something up in between calls, without
 | 
			
		||||
            #               it two execute calls in a row hangs the second one
 | 
			
		||||
            greenthread.sleep(0)
 | 
			
		||||
            return result
 | 
			
		||||
        except ProcessExecutionError:
 | 
			
		||||
            if not attempts:
 | 
			
		||||
@@ -183,6 +179,11 @@ def execute(*cmd, **kwargs):
 | 
			
		||||
                LOG.debug(_("%r failed. Retrying."), cmd)
 | 
			
		||||
                if delay_on_retry:
 | 
			
		||||
                    greenthread.sleep(random.randint(20, 200) / 100.0)
 | 
			
		||||
        finally:
 | 
			
		||||
            # NOTE(termie): this appears to be necessary to let the subprocess
 | 
			
		||||
            #               call clean something up in between calls, without
 | 
			
		||||
            #               it two execute calls in a row hangs the second one
 | 
			
		||||
            greenthread.sleep(0)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def ssh_execute(ssh, cmd, process_input=None,
 | 
			
		||||
@@ -310,11 +311,15 @@ def  get_my_linklocal(interface):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def to_global_ipv6(prefix, mac):
 | 
			
		||||
    try:
 | 
			
		||||
        mac64 = netaddr.EUI(mac).eui64().words
 | 
			
		||||
        int_addr = int(''.join(['%02x' % i for i in mac64]), 16)
 | 
			
		||||
        mac64_addr = netaddr.IPAddress(int_addr)
 | 
			
		||||
        maskIP = netaddr.IPNetwork(prefix).ip
 | 
			
		||||
    return (mac64_addr ^ netaddr.IPAddress('::0200:0:0:0') | maskIP).format()
 | 
			
		||||
        return (mac64_addr ^ netaddr.IPAddress('::0200:0:0:0') | maskIP).\
 | 
			
		||||
                                                                    format()
 | 
			
		||||
    except TypeError:
 | 
			
		||||
        raise TypeError(_("Bad mac for to_global_ipv6: %s") % mac)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def to_mac(ipv6_address):
 | 
			
		||||
@@ -336,11 +341,8 @@ utcnow.override_time = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_older_than(before, seconds):
 | 
			
		||||
    """Return True if before is older than 'seconds'"""
 | 
			
		||||
    if utcnow() - before > datetime.timedelta(seconds=seconds):
 | 
			
		||||
        return True
 | 
			
		||||
    else:
 | 
			
		||||
        return False
 | 
			
		||||
    """Return True if before is older than seconds"""
 | 
			
		||||
    return utcnow() - before > datetime.timedelta(seconds=seconds)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def utcnow_ts():
 | 
			
		||||
@@ -663,6 +665,48 @@ def get_from_path(items, path):
 | 
			
		||||
        return get_from_path(results, remainder)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def flatten_dict(dict_, flattened=None):
 | 
			
		||||
    """Recursively flatten a nested dictionary"""
 | 
			
		||||
    flattened = flattened or {}
 | 
			
		||||
    for key, value in dict_.iteritems():
 | 
			
		||||
        if hasattr(value, 'iteritems'):
 | 
			
		||||
            flatten_dict(value, flattened)
 | 
			
		||||
        else:
 | 
			
		||||
            flattened[key] = value
 | 
			
		||||
    return flattened
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def partition_dict(dict_, keys):
 | 
			
		||||
    """Return two dicts, one containing only `keys` the other containing
 | 
			
		||||
    everything but `keys`
 | 
			
		||||
    """
 | 
			
		||||
    intersection = {}
 | 
			
		||||
    difference = {}
 | 
			
		||||
    for key, value in dict_.iteritems():
 | 
			
		||||
        if key in keys:
 | 
			
		||||
            intersection[key] = value
 | 
			
		||||
        else:
 | 
			
		||||
            difference[key] = value
 | 
			
		||||
    return intersection, difference
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def map_dict_keys(dict_, key_map):
 | 
			
		||||
    """Return a dictionary in which the dictionaries keys are mapped to
 | 
			
		||||
    new keys.
 | 
			
		||||
    """
 | 
			
		||||
    mapped = {}
 | 
			
		||||
    for key, value in dict_.iteritems():
 | 
			
		||||
        mapped_key = key_map[key] if key in key_map else key
 | 
			
		||||
        mapped[mapped_key] = value
 | 
			
		||||
    return mapped
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def subset_dict(dict_, keys):
 | 
			
		||||
    """Return a dict that only contains a subset of keys"""
 | 
			
		||||
    subset = partition_dict(dict_, keys)[0]
 | 
			
		||||
    return subset
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def check_isinstance(obj, cls):
 | 
			
		||||
    """Checks that obj is of type cls, and lets PyLint infer types"""
 | 
			
		||||
    if isinstance(obj, cls):
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user