Merging trunk

This commit is contained in:
Rick Harris
2011-03-24 21:13:55 +00:00
59 changed files with 1878 additions and 505 deletions

View File

@@ -97,6 +97,7 @@ flags.DECLARE('vlan_start', 'nova.network.manager')
flags.DECLARE('vpn_start', 'nova.network.manager')
flags.DECLARE('fixed_range_v6', 'nova.network.manager')
flags.DECLARE('images_path', 'nova.image.local')
flags.DECLARE('libvirt_type', 'nova.virt.libvirt_conn')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())

View File

@@ -67,11 +67,14 @@ paste.app_factory = nova.api.ec2.metadatarequesthandler:MetadataRequestHandler.f
[composite:osapi]
use = egg:Paste#urlmap
/: osversions
/v1.0: openstackapi
/v1.1: openstackapi
/v1.0: openstackapi10
/v1.1: openstackapi11
[pipeline:openstackapi]
pipeline = faultwrap auth ratelimit osapiapp
[pipeline:openstackapi10]
pipeline = faultwrap auth ratelimit osapiapp10
[pipeline:openstackapi11]
pipeline = faultwrap auth ratelimit osapiapp11
[filter:faultwrap]
paste.filter_factory = nova.api.openstack:FaultWrapper.factory
@@ -82,8 +85,11 @@ paste.filter_factory = nova.api.openstack.auth:AuthMiddleware.factory
[filter:ratelimit]
paste.filter_factory = nova.api.openstack.limits:RateLimitingMiddleware.factory
[app:osapiapp]
paste.app_factory = nova.api.openstack:APIRouter.factory
[app:osapiapp10]
paste.app_factory = nova.api.openstack:APIRouterV10.factory
[app:osapiapp11]
paste.app_factory = nova.api.openstack:APIRouterV11.factory
[pipeline:osversions]
pipeline = faultwrap osversionapp

View File

@@ -72,9 +72,14 @@ class APIRouter(wsgi.Router):
return cls()
def __init__(self):
self.server_members = {}
mapper = routes.Mapper()
self._setup_routes(mapper)
super(APIRouter, self).__init__(mapper)
server_members = {'action': 'POST'}
def _setup_routes(self, mapper):
server_members = self.server_members
server_members['action'] = 'POST'
if FLAGS.allow_admin_api:
LOG.debug(_("Including admin operations in API."))
@@ -99,10 +104,6 @@ class APIRouter(wsgi.Router):
controller=accounts.Controller(),
collection={'detail': 'GET'})
mapper.resource("server", "servers", controller=servers.Controller(),
collection={'detail': 'GET'},
member=server_members)
mapper.resource("backup_schedule", "backup_schedule",
controller=backup_schedules.Controller(),
parent_resource=dict(member_name='server',
@@ -126,7 +127,27 @@ class APIRouter(wsgi.Router):
_limits = limits.LimitsController()
mapper.resource("limit", "limits", controller=_limits)
super(APIRouter, self).__init__(mapper)
class APIRouterV10(APIRouter):
"""Define routes specific to OpenStack API V1.0."""
def _setup_routes(self, mapper):
super(APIRouterV10, self)._setup_routes(mapper)
mapper.resource("server", "servers",
controller=servers.ControllerV10(),
collection={'detail': 'GET'},
member=self.server_members)
class APIRouterV11(APIRouter):
"""Define routes specific to OpenStack API V1.1."""
def _setup_routes(self, mapper):
super(APIRouterV11, self)._setup_routes(mapper)
mapper.resource("server", "servers",
controller=servers.ControllerV11(),
collection={'detail': 'GET'},
member=self.server_members)
class Versions(wsgi.Application):

View File

@@ -69,8 +69,6 @@ class AuthMiddleware(wsgi.Middleware):
return faults.Fault(webob.exc.HTTPUnauthorized())
req.environ['nova.context'] = context.RequestContext(user, account)
version = req.path.split('/')[1].replace('v', '')
req.environ['api.version'] = version
return self.application
def has_authentication(self, req):

View File

@@ -15,7 +15,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import webob.exc
from urlparse import urlparse
import webob
from nova import exception
@@ -76,5 +78,14 @@ def get_image_id_from_image_hash(image_service, context, image_hash):
raise exception.NotFound(image_hash)
def get_api_version(req):
return req.environ.get('api.version')
def get_id_from_href(href):
"""Return the id portion of a url as an int.
Given: http://www.foo.com/bar/123?q=4
Returns: 123
"""
try:
return int(urlparse(href).path.split('/')[-1])
except:
raise webob.exc.HTTPBadRequest(_('could not parse id from href'))

View File

@@ -160,40 +160,33 @@ def _translate_from_image_service_to_api(image_metadata):
# 4. Format values
# 4a. Format Image Status (API requires uppercase)
status_service2api = {'queued': 'QUEUED',
'preparing': 'PREPARING',
'saving': 'SAVING',
'active': 'ACTIVE',
'killed': 'FAILED'}
api_metadata['status'] = status_service2api[api_metadata['status']]
api_metadata['status'] = _format_status_for_api(api_metadata['status'])
# 4b. Format timestamps
def _format_timestamp(dt_str):
"""Return a timestamp formatted for OpenStack API
NOTE(sirp):
ImageService (specifically GlanceImageService) is currently
returning timestamps as strings. This should probably be datetime
objects. In the mean time, we work around this by using strptime() to
create datetime objects.
"""
if dt_str is None:
return None
service_timestamp_fmt = "%Y-%m-%dT%H:%M:%S"
api_timestamp_fmt = "%Y-%m-%dT%H:%M:%SZ"
dt = datetime.datetime.strptime(dt_str, service_timestamp_fmt)
return dt.strftime(api_timestamp_fmt)
for ts_attr in ('created', 'updated'):
if ts_attr in api_metadata:
formatted_timestamp = _format_timestamp(api_metadata[ts_attr])
api_metadata[ts_attr] = formatted_timestamp
for attr in ('created', 'updated'):
if attr in api_metadata:
api_metadata[attr] = _format_datetime_for_api(
api_metadata[attr])
return api_metadata
def _format_status_for_api(status):
"""Return status in a format compliant with OpenStack API"""
mapping = {'queued': 'QUEUED',
'preparing': 'PREPARING',
'saving': 'SAVING',
'active': 'ACTIVE',
'killed': 'FAILED'}
return mapping[status]
def _format_datetime_for_api(datetime_):
"""Stringify datetime objects in a format compliant with OpenStack API"""
API_DATETIME_FMT = '%Y-%m-%dT%H:%M:%SZ'
return datetime_.strftime(API_DATETIME_FMT)
def _safe_translate(image_metadata):
"""Translate attributes for OpenStack API, temporary workaround for
S3ImageService attribute leakage.

View File

@@ -22,6 +22,7 @@ from xml.dom import minidom
from webob import exc
from nova import compute
from nova import context
from nova import exception
from nova import flags
from nova import log as logging
@@ -29,8 +30,9 @@ from nova import wsgi
from nova import utils
from nova.api.openstack import common
from nova.api.openstack import faults
from nova.api.openstack.views import servers as servers_views
from nova.api.openstack.views import addresses as addresses_views
import nova.api.openstack.views.addresses
import nova.api.openstack.views.flavors
import nova.api.openstack.views.servers
from nova.auth import manager as auth_manager
from nova.compute import instance_types
from nova.compute import power_state
@@ -63,7 +65,7 @@ class Controller(wsgi.Controller):
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
builder = addresses_views.get_view_builder(req)
builder = self._get_addresses_view_builder(req)
return builder.build(instance)
def index(self, req):
@@ -81,7 +83,7 @@ class Controller(wsgi.Controller):
"""
instance_list = self.compute_api.get_all(req.environ['nova.context'])
limited_list = common.limited(instance_list, req)
builder = servers_views.get_view_builder(req)
builder = self._get_view_builder(req)
servers = [builder.build(inst, is_detail)['server']
for inst in limited_list]
return dict(servers=servers)
@@ -90,7 +92,7 @@ class Controller(wsgi.Controller):
""" Returns server details by server id """
try:
instance = self.compute_api.get(req.environ['nova.context'], id)
builder = servers_views.get_view_builder(req)
builder = self._get_view_builder(req)
return builder.build(instance, is_detail=True)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
@@ -119,8 +121,9 @@ class Controller(wsgi.Controller):
key_name = key_pair['name']
key_data = key_pair['public_key']
requested_image_id = self._image_id_from_req_data(env)
image_id = common.get_image_id_from_image_hash(self._image_service,
context, env['server']['imageId'])
context, requested_image_id)
kernel_id, ramdisk_id = self._get_kernel_ramdisk_from_image(
req, image_id)
@@ -139,10 +142,11 @@ class Controller(wsgi.Controller):
if personality:
injected_files = self._get_injected_files(personality)
flavor_id = self._flavor_id_from_req_data(env)
try:
instances = self.compute_api.create(
(inst,) = self.compute_api.create(
context,
instance_types.get_by_flavor_id(env['server']['flavorId']),
instance_types.get_by_flavor_id(flavor_id),
image_id,
kernel_id=kernel_id,
ramdisk_id=ramdisk_id,
@@ -155,8 +159,11 @@ class Controller(wsgi.Controller):
except QuotaError as error:
self._handle_quota_errors(error)
builder = servers_views.get_view_builder(req)
server = builder.build(instances[0], is_detail=False)
inst['instance_type'] = flavor_id
inst['image_id'] = requested_image_id
builder = self._get_view_builder(req)
server = builder.build(inst, is_detail=True)
password = "%s%s" % (server['server']['name'][:4],
utils.generate_password(12))
server['server']['adminPass'] = password
@@ -511,6 +518,45 @@ class Controller(wsgi.Controller):
return kernel_id, ramdisk_id
class ControllerV10(Controller):
def _image_id_from_req_data(self, data):
return data['server']['imageId']
def _flavor_id_from_req_data(self, data):
return data['server']['flavorId']
def _get_view_builder(self, req):
addresses_builder = nova.api.openstack.views.addresses.ViewBuilderV10()
return nova.api.openstack.views.servers.ViewBuilderV10(
addresses_builder)
def _get_addresses_view_builder(self, req):
return nova.api.openstack.views.addresses.ViewBuilderV10(req)
class ControllerV11(Controller):
def _image_id_from_req_data(self, data):
href = data['server']['imageRef']
return common.get_id_from_href(href)
def _flavor_id_from_req_data(self, data):
href = data['server']['flavorRef']
return common.get_id_from_href(href)
def _get_view_builder(self, req):
base_url = req.application_url
flavor_builder = nova.api.openstack.views.flavors.ViewBuilderV11(
base_url)
image_builder = nova.api.openstack.views.images.ViewBuilderV11(
base_url)
addresses_builder = nova.api.openstack.views.addresses.ViewBuilderV11()
return nova.api.openstack.views.servers.ViewBuilderV11(
addresses_builder, flavor_builder, image_builder)
def _get_addresses_view_builder(self, req):
return nova.api.openstack.views.addresses.ViewBuilderV11(req)
class ServerCreateRequestXMLDeserializer(object):
"""
Deserializer to handle xml-formatted server create requests.

View File

@@ -19,18 +19,6 @@ from nova import utils
from nova.api.openstack import common
def get_view_builder(req):
'''
A factory method that returns the correct builder based on the version of
the api requested.
'''
version = common.get_api_version(req)
if version == '1.1':
return ViewBuilder_1_1()
else:
return ViewBuilder_1_0()
class ViewBuilder(object):
''' Models a server addresses response as a python dictionary.'''
@@ -38,14 +26,14 @@ class ViewBuilder(object):
raise NotImplementedError()
class ViewBuilder_1_0(ViewBuilder):
class ViewBuilderV10(ViewBuilder):
def build(self, inst):
private_ips = utils.get_from_path(inst, 'fixed_ip/address')
public_ips = utils.get_from_path(inst, 'fixed_ip/floating_ips/address')
return dict(public=public_ips, private=private_ips)
class ViewBuilder_1_1(ViewBuilder):
class ViewBuilderV11(ViewBuilder):
def build(self, inst):
private_ips = utils.get_from_path(inst, 'fixed_ip/address')
private_ips = [dict(version=4, addr=a) for a in private_ips]

View File

@@ -18,19 +18,6 @@
from nova.api.openstack import common
def get_view_builder(req):
'''
A factory method that returns the correct builder based on the version of
the api requested.
'''
version = common.get_api_version(req)
base_url = req.application_url
if version == '1.1':
return ViewBuilder_1_1(base_url)
else:
return ViewBuilder_1_0()
class ViewBuilder(object):
def __init__(self):
pass
@@ -39,13 +26,9 @@ class ViewBuilder(object):
raise NotImplementedError()
class ViewBuilder_1_1(ViewBuilder):
class ViewBuilderV11(ViewBuilder):
def __init__(self, base_url):
self.base_url = base_url
def generate_href(self, flavor_id):
return "%s/flavors/%s" % (self.base_url, flavor_id)
class ViewBuilder_1_0(ViewBuilder):
pass

View File

@@ -18,19 +18,6 @@
from nova.api.openstack import common
def get_view_builder(req):
'''
A factory method that returns the correct builder based on the version of
the api requested.
'''
version = common.get_api_version(req)
base_url = req.application_url
if version == '1.1':
return ViewBuilder_1_1(base_url)
else:
return ViewBuilder_1_0()
class ViewBuilder(object):
def __init__(self):
pass
@@ -39,13 +26,9 @@ class ViewBuilder(object):
raise NotImplementedError()
class ViewBuilder_1_1(ViewBuilder):
class ViewBuilderV11(ViewBuilder):
def __init__(self, base_url):
self.base_url = base_url
def generate_href(self, image_id):
return "%s/images/%s" % (self.base_url, image_id)
class ViewBuilder_1_0(ViewBuilder):
pass

View File

@@ -16,7 +16,10 @@
# under the License.
import hashlib
from nova.compute import power_state
import nova.compute
import nova.context
from nova.api.openstack import common
from nova.api.openstack.views import addresses as addresses_view
from nova.api.openstack.views import flavors as flavors_view
@@ -24,45 +27,30 @@ from nova.api.openstack.views import images as images_view
from nova import utils
def get_view_builder(req):
'''
A factory method that returns the correct builder based on the version of
the api requested.
'''
version = common.get_api_version(req)
addresses_builder = addresses_view.get_view_builder(req)
if version == '1.1':
flavor_builder = flavors_view.get_view_builder(req)
image_builder = images_view.get_view_builder(req)
return ViewBuilder_1_1(addresses_builder, flavor_builder,
image_builder)
else:
return ViewBuilder_1_0(addresses_builder)
class ViewBuilder(object):
'''
Models a server response as a python dictionary.
"""Model a server response as a python dictionary.
Public methods: build
Abstract methods: _build_image, _build_flavor
'''
"""
def __init__(self, addresses_builder):
self.addresses_builder = addresses_builder
def build(self, inst, is_detail):
"""
Coerces into dictionary format, mapping everything to
Rackspace-like attributes for return
"""
"""Return a dict that represenst a server."""
if is_detail:
return self._build_detail(inst)
else:
return self._build_simple(inst)
def _build_simple(self, inst):
return dict(server=dict(id=inst['id'], name=inst['display_name']))
"""Return a simple model of a server."""
return dict(server=dict(id=inst['id'], name=inst['display_name']))
def _build_detail(self, inst):
"""Returns a detailed model of a server."""
power_mapping = {
None: 'build',
power_state.NOSTATE: 'build',
@@ -74,27 +62,26 @@ class ViewBuilder(object):
power_state.SHUTOFF: 'active',
power_state.CRASHED: 'error',
power_state.FAILED: 'error'}
inst_dict = {}
#mapped_keys = dict(status='state', imageId='image_id',
# flavorId='instance_type', name='display_name', id='id')
inst_dict = {
'id': int(inst['id']),
'name': inst['display_name'],
'addresses': self.addresses_builder.build(inst),
'status': power_mapping[inst.get('state')]}
mapped_keys = dict(status='state', name='display_name', id='id')
for k, v in mapped_keys.iteritems():
inst_dict[k] = inst[v]
inst_dict['status'] = power_mapping[inst_dict['status']]
inst_dict['addresses'] = self.addresses_builder.build(inst)
ctxt = nova.context.get_admin_context()
compute_api = nova.compute.API()
if compute_api.has_finished_migration(ctxt, inst['id']):
inst_dict['status'] = 'resize-confirm'
# Return the metadata as a dictionary
metadata = {}
for item in inst['metadata']:
for item in inst.get('metadata', []):
metadata[item['key']] = item['value']
inst_dict['metadata'] = metadata
inst_dict['hostId'] = ''
if inst['host']:
if inst.get('host'):
inst_dict['hostId'] = hashlib.sha224(inst['host']).hexdigest()
self._build_image(inst_dict, inst)
@@ -103,21 +90,27 @@ class ViewBuilder(object):
return dict(server=inst_dict)
def _build_image(self, response, inst):
"""Return the image sub-resource of a server."""
raise NotImplementedError()
def _build_flavor(self, response, inst):
"""Return the flavor sub-resource of a server."""
raise NotImplementedError()
class ViewBuilder_1_0(ViewBuilder):
class ViewBuilderV10(ViewBuilder):
"""Model an Openstack API V1.0 server response."""
def _build_image(self, response, inst):
response["imageId"] = inst["image_id"]
response['imageId'] = inst['image_id']
def _build_flavor(self, response, inst):
response["flavorId"] = inst["instance_type"]
response['flavorId'] = inst['instance_type']
class ViewBuilder_1_1(ViewBuilder):
class ViewBuilderV11(ViewBuilder):
"""Model an Openstack API V1.0 server response."""
def __init__(self, addresses_builder, flavor_builder, image_builder):
ViewBuilder.__init__(self, addresses_builder)
self.flavor_builder = flavor_builder

View File

@@ -15,9 +15,9 @@
import common
from nova import db
from nova import flags
from nova import wsgi
from nova import db
from nova.scheduler import api
@@ -52,7 +52,7 @@ class Controller(wsgi.Controller):
"""Return all zones in brief"""
# Ask the ZoneManager in the Scheduler for most recent data,
# or fall-back to the database ...
items = api.API().get_zone_list(req.environ['nova.context'])
items = api.get_zone_list(req.environ['nova.context'])
if not items:
items = db.zone_get_all(req.environ['nova.context'])
@@ -67,8 +67,16 @@ class Controller(wsgi.Controller):
def info(self, req):
"""Return name and capabilities for this zone."""
return dict(zone=dict(name=FLAGS.zone_name,
capabilities=FLAGS.zone_capabilities))
items = api.get_zone_capabilities(req.environ['nova.context'])
zone = dict(name=FLAGS.zone_name)
caps = FLAGS.zone_capabilities
for cap in caps:
key, value = cap.split('=')
zone[key] = value
for item, (min_value, max_value) in items.iteritems():
zone[item] = "%s,%s" % (min_value, max_value)
return dict(zone=zone)
def show(self, req, id):
"""Return data about the given zone id"""

View File

@@ -253,6 +253,16 @@ class API(base.Base):
return [dict(x.iteritems()) for x in instances]
def has_finished_migration(self, context, instance_id):
"""Retrieves whether or not a finished migration exists for
an instance"""
try:
db.migration_get_by_instance_and_status(context, instance_id,
'finished')
return True
except exception.NotFound:
return False
def ensure_default_security_group(self, context):
""" Create security group for the security context if it
does not already exist
@@ -473,6 +483,8 @@ class API(base.Base):
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('revert_resize', context, instance_id,
migration_ref['dest_compute'], params=params)
self.db.migration_update(context, migration_ref['id'],
{'status': 'reverted'})
def confirm_resize(self, context, instance_id):
"""Confirms a migration/resize, deleting the 'old' instance in the
@@ -488,17 +500,41 @@ class API(base.Base):
self._cast_compute_message('confirm_resize', context, instance_id,
migration_ref['source_compute'], params=params)
self.db.migration_update(context, migration_id,
self.db.migration_update(context, migration_ref['id'],
{'status': 'confirmed'})
self.db.instance_update(context, instance_id,
{'host': migration_ref['dest_compute'], })
def resize(self, context, instance_id, flavor):
def resize(self, context, instance_id, flavor_id):
"""Resize a running instance."""
instance = self.db.instance_get(context, instance_id)
current_instance_type = self.db.instance_type_get_by_name(
context, instance['instance_type'])
new_instance_type = self.db.instance_type_get_by_flavor_id(
context, flavor_id)
current_instance_type_name = current_instance_type['name']
new_instance_type_name = new_instance_type['name']
LOG.debug(_("Old instance type %(current_instance_type_name)s, "
" new instance type %(new_instance_type_name)s") % locals())
if not new_instance_type:
raise exception.ApiError(_("Requested flavor %(flavor_id)d "
"does not exist") % locals())
current_memory_mb = current_instance_type['memory_mb']
new_memory_mb = new_instance_type['memory_mb']
if current_memory_mb > new_memory_mb:
raise exception.ApiError(_("Invalid flavor: cannot downsize"
"instances"))
if current_memory_mb == new_memory_mb:
raise exception.ApiError(_("Invalid flavor: cannot use"
"the same flavor. "))
self._cast_scheduler_message(context,
{"method": "prep_resize",
"args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id, }},)
"instance_id": instance_id,
"flavor_id": flavor_id}})
def pause(self, context, instance_id):
"""Pause the given instance."""

View File

@@ -2,6 +2,7 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -41,9 +42,10 @@ import string
import socket
import sys
import tempfile
import time
import functools
from eventlet import greenthread
from nova import exception
from nova import flags
from nova import log as logging
@@ -51,6 +53,7 @@ from nova import manager
from nova import rpc
from nova import utils
from nova.compute import power_state
from nova.virt import driver
FLAGS = flags.FLAGS
flags.DEFINE_string('instances_path', '$state_path/instances',
@@ -65,8 +68,11 @@ flags.DEFINE_string('console_host', socket.gethostname(),
'Console proxy host to use to connect to instances on'
'this host.')
flags.DEFINE_integer('live_migration_retry_count', 30,
("Retry count needed in live_migration."
" sleep 1 sec for each count"))
"Retry count needed in live_migration."
" sleep 1 sec for each count")
flags.DEFINE_integer("rescue_timeout", 0,
"Automatically unrescue an instance after N seconds."
" Set to 0 to disable.")
LOG = logging.getLogger('nova.compute.manager')
@@ -105,7 +111,7 @@ def checks_instance_lock(function):
return decorated_function
class ComputeManager(manager.Manager):
class ComputeManager(manager.SchedulerDependentManager):
"""Manages the running instances from creation to destruction."""
@@ -117,14 +123,17 @@ class ComputeManager(manager.Manager):
compute_driver = FLAGS.compute_driver
try:
self.driver = utils.import_object(compute_driver)
except ImportError:
LOG.error("Unable to load the virtualization driver.")
self.driver = utils.check_isinstance(
utils.import_object(compute_driver),
driver.ComputeDriver)
except ImportError as e:
LOG.error(_("Unable to load the virtualization driver: %s") % (e))
sys.exit(1)
self.network_manager = utils.import_object(FLAGS.network_manager)
self.volume_manager = utils.import_object(FLAGS.volume_manager)
super(ComputeManager, self).__init__(*args, **kwargs)
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
def init_host(self):
"""Do any initialization that needs to be run if this is a
@@ -132,6 +141,12 @@ class ComputeManager(manager.Manager):
"""
self.driver.init_host(host=self.host)
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval."""
super(ComputeManager, self).periodic_tasks(context)
if FLAGS.rescue_timeout > 0:
self.driver.poll_rescued_instances(FLAGS.rescue_timeout)
def _update_state(self, context, instance_id):
"""Update the state of an instance from the driver info."""
# FIXME(ja): include other fields from state?
@@ -437,25 +452,41 @@ class ComputeManager(manager.Manager):
instance_ref = self.db.instance_get(context, instance_id)
migration_ref = self.db.migration_get(context, migration_id)
#TODO(mdietz): we may want to split these into separate methods.
if migration_ref['source_compute'] == FLAGS.host:
self.driver._start(instance_ref)
self.db.migration_update(context, migration_id,
{'status': 'reverted'})
else:
self.driver.destroy(instance_ref)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
rpc.cast(context, topic,
{'method': 'revert_resize',
'args': {
'migration_id': migration_ref['id'],
'instance_id': instance_id, },
})
self.driver.destroy(instance_ref)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
rpc.cast(context, topic,
{'method': 'finish_revert_resize',
'args': {
'migration_id': migration_ref['id'],
'instance_id': instance_id, },
})
@exception.wrap_exception
@checks_instance_lock
def prep_resize(self, context, instance_id):
def finish_revert_resize(self, context, instance_id, migration_id):
"""Finishes the second half of reverting a resize, powering back on
the source instance and reverting the resized attributes in the
database"""
instance_ref = self.db.instance_get(context, instance_id)
migration_ref = self.db.migration_get(context, migration_id)
instance_type = self.db.instance_type_get_by_flavor_id(context,
migration_ref['old_flavor_id'])
# Just roll back the record. There's no need to resize down since
# the 'old' VM already has the preferred attributes
self.db.instance_update(context, instance_id,
dict(memory_mb=instance_type['memory_mb'],
vcpus=instance_type['vcpus'],
local_gb=instance_type['local_gb']))
self.driver.revert_resize(instance_ref)
self.db.migration_update(context, migration_id,
{'status': 'reverted'})
@exception.wrap_exception
@checks_instance_lock
def prep_resize(self, context, instance_id, flavor_id):
"""Initiates the process of moving a running instance to another
host, possibly changing the RAM and disk size in the process"""
context = context.elevated()
@@ -464,12 +495,17 @@ class ComputeManager(manager.Manager):
raise exception.Error(_(
'Migration error: destination same as source!'))
instance_type = self.db.instance_type_get_by_flavor_id(context,
flavor_id)
migration_ref = self.db.migration_create(context,
{'instance_id': instance_id,
'source_compute': instance_ref['host'],
'dest_compute': FLAGS.host,
'dest_host': self.driver.get_host_ip_addr(),
'old_flavor_id': instance_type['flavorid'],
'new_flavor_id': flavor_id,
'status': 'pre-migrating'})
LOG.audit(_('instance %s: migrating to '), instance_id,
context=context)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
@@ -495,8 +531,6 @@ class ComputeManager(manager.Manager):
self.db.migration_update(context, migration_id,
{'status': 'post-migrating', })
#TODO(mdietz): This is where we would update the VM record
#after resizing
service = self.db.service_get_by_host_and_topic(context,
migration_ref['dest_compute'], FLAGS.compute_topic)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
@@ -517,7 +551,19 @@ class ComputeManager(manager.Manager):
migration_ref = self.db.migration_get(context, migration_id)
instance_ref = self.db.instance_get(context,
migration_ref['instance_id'])
# TODO(mdietz): apply the rest of the instance_type attributes going
# after they're supported
instance_type = self.db.instance_type_get_by_flavor_id(context,
migration_ref['new_flavor_id'])
self.db.instance_update(context, instance_id,
dict(instance_type=instance_type['name'],
memory_mb=instance_type['memory_mb'],
vcpus=instance_type['vcpus'],
local_gb=instance_type['local_gb']))
# reload the updated instance ref
# FIXME(mdietz): is there reload functionality?
instance_ref = self.db.instance_get(context, instance_id)
self.driver.finish_resize(instance_ref, disk_info)
self.db.migration_update(context, migration_id,
@@ -800,7 +846,7 @@ class ComputeManager(manager.Manager):
return self.driver.update_available_resource(context, self.host)
def pre_live_migration(self, context, instance_id):
def pre_live_migration(self, context, instance_id, time=None):
"""Preparations for live migration at dest host.
:param context: security context
@@ -808,6 +854,9 @@ class ComputeManager(manager.Manager):
"""
if not time:
time = greenthread
# Getting instance info
instance_ref = self.db.instance_get(context, instance_id)
ec2_id = instance_ref['hostname']
@@ -976,3 +1025,59 @@ class ComputeManager(manager.Manager):
for volume in instance_ref['volumes']:
self.db.volume_update(ctxt, volume['id'], {'status': 'in-use'})
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval."""
error_list = super(ComputeManager, self).periodic_tasks(context)
if error_list is None:
error_list = []
try:
self._poll_instance_states(context)
except Exception as ex:
LOG.warning(_("Error during instance poll: %s"),
unicode(ex))
error_list.append(ex)
return error_list
def _poll_instance_states(self, context):
vm_instances = self.driver.list_instances_detail()
vm_instances = dict((vm.name, vm) for vm in vm_instances)
# Keep a list of VMs not in the DB, cross them off as we find them
vms_not_found_in_db = list(vm_instances.keys())
db_instances = self.db.instance_get_all_by_host(context, self.host)
for db_instance in db_instances:
name = db_instance['name']
vm_instance = vm_instances.get(name)
if vm_instance is None:
LOG.info(_("Found instance '%(name)s' in DB but no VM. "
"Setting state to shutoff.") % locals())
vm_state = power_state.SHUTOFF
else:
vm_state = vm_instance.state
vms_not_found_in_db.remove(name)
db_state = db_instance['state']
if vm_state != db_state:
LOG.info(_("DB/VM state mismatch. Changing state from "
"'%(db_state)s' to '%(vm_state)s'") % locals())
self.db.instance_set_state(context,
db_instance['id'],
vm_state)
if vm_state == power_state.SHUTOFF:
# TODO(soren): This is what the compute manager does when you
# terminate an instance. At some point I figure we'll have a
# "terminated" state and some sort of cleanup job that runs
# occasionally, cleaning them out.
self.db.instance_destroy(context, db_instance['id'])
# Are there VMs not in the DB?
for vm_not_found_in_db in vms_not_found_in_db:
name = vm_not_found_in_db
# TODO(justinsb): What to do here? Adopt it? Shut it down?
LOG.warning(_("Found VM not in DB: '%(name)s'. Ignoring")
% locals())

View File

@@ -2,6 +2,7 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
# Copyright (c) 2010 Citrix Systems, Inc.
#
@@ -19,6 +20,7 @@
"""The various power states that a VM can be in."""
#NOTE(justinsb): These are the virDomainState values from libvirt
NOSTATE = 0x00
RUNNING = 0x01
BLOCKED = 0x02
@@ -29,9 +31,10 @@ CRASHED = 0x06
SUSPENDED = 0x07
FAILED = 0x08
def name(code):
d = {
# TODO(justinsb): Power state really needs to be a proper class,
# so that we're not locked into the libvirt status codes and can put mapping
# logic here rather than spread throughout the code
_STATE_MAP = {
NOSTATE: 'pending',
RUNNING: 'running',
BLOCKED: 'blocked',
@@ -41,4 +44,11 @@ def name(code):
CRASHED: 'crashed',
SUSPENDED: 'suspended',
FAILED: 'failed to spawn'}
return d[code]
def name(code):
return _STATE_MAP[code]
def valid_states():
return _STATE_MAP.keys()

View File

@@ -26,6 +26,7 @@ import gettext
import hashlib
import os
import shutil
import string
import struct
import tempfile
import time
@@ -267,7 +268,7 @@ def _sign_csr(csr_text, ca_folder):
'./openssl.cnf', '-infiles', inbound)
out, _err = utils.execute('openssl', 'x509', '-in', outbound,
'-serial', '-noout')
serial = out.rpartition("=")[2]
serial = string.strip(out.rpartition("=")[2])
os.chdir(start)
with open(outbound, "r") as crtfile:
return (serial, crtfile.read())

View File

@@ -214,7 +214,7 @@ def certificate_update(context, certificate_id, values):
Raises NotFound if service does not exist.
"""
return IMPL.service_update(context, certificate_id, values)
return IMPL.certificate_update(context, certificate_id, values)
###################

View File

@@ -2220,8 +2220,8 @@ def migration_get_by_instance_and_status(context, instance_id, status):
filter_by(instance_id=instance_id).\
filter_by(status=status).first()
if not result:
raise exception.NotFound(_("No migration found with instance id %s")
% migration_id)
raise exception.NotFound(_("No migration found for instance "
"%(instance_id)s with status %(status)s") % locals())
return result
@@ -2336,8 +2336,8 @@ def instance_type_create(_context, values):
instance_type_ref = models.InstanceTypes()
instance_type_ref.update(values)
instance_type_ref.save()
except:
raise exception.DBError
except Exception, e:
raise exception.DBError(e)
return instance_type_ref

View File

@@ -0,0 +1,50 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.from sqlalchemy import *
from sqlalchemy import *
from migrate import *
from nova import log as logging
meta = MetaData()
migrations = Table('migrations', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
#
# Tables to alter
#
#
old_flavor_id = Column('old_flavor_id', Integer())
new_flavor_id = Column('new_flavor_id', Integer())
def upgrade(migrate_engine):
    """Add old_flavor_id/new_flavor_id columns to the migrations table."""
    # Upgrade operations go here. Don't create your own engine;
    # bind migrate_engine to your metadata
    meta.bind = migrate_engine
    # Track the flavor before and after a resize migration.
    migrations.create_column(old_flavor_id)
    migrations.create_column(new_flavor_id)
def downgrade(migrate_engine):
    """Remove the flavor-tracking columns added by upgrade()."""
    meta.bind = migrate_engine
    migrations.drop_column(old_flavor_id)
    migrations.drop_column(new_flavor_id)

View File

@@ -436,6 +436,8 @@ class Migration(BASE, NovaBase):
source_compute = Column(String(255))
dest_compute = Column(String(255))
dest_host = Column(String(255))
old_flavor_id = Column(Integer())
new_flavor_id = Column(Integer())
instance_id = Column(Integer, ForeignKey('instances.id'), nullable=True)
#TODO(_cerberus_): enum
status = Column(String(255))

View File

@@ -358,5 +358,6 @@ DEFINE_string('node_availability_zone', 'nova',
'availability zone of this node')
DEFINE_string('zone_name', 'nova', 'name of this zone')
DEFINE_string('zone_capabilities', 'kypervisor:xenserver;os:linux',
'Key/Value tags which represent capabilities of this zone')
DEFINE_list('zone_capabilities',
['hypervisor=xenserver;kvm', 'os=linux;windows'],
'Key/Multi-value list representng capabilities of this zone')

View File

@@ -18,6 +18,8 @@
from __future__ import absolute_import
import datetime
from glance.common import exception as glance_exception
from nova import exception
@@ -45,8 +47,13 @@ class GlanceImageService(service.BaseImageService):
SERVICE_IMAGE_ATTRS = service.BaseImageService.BASE_IMAGE_ATTRS +\
GLANCE_ONLY_ATTRS
def __init__(self):
self.client = GlanceClient(FLAGS.glance_host, FLAGS.glance_port)
def __init__(self, client=None):
# FIXME(sirp): can we avoid dependency-injection here by using
# stubbing out a fake?
if client is None:
self.client = GlanceClient(FLAGS.glance_host, FLAGS.glance_port)
else:
self.client = client
def index(self, context):
"""
@@ -59,8 +66,8 @@ class GlanceImageService(service.BaseImageService):
image_metas = self.client.get_images_detailed()
for image_meta in image_metas:
if self._is_image_available(context, image_meta):
meta = utils.subset_dict(image_meta, ('id', 'name'))
filtered.append(meta)
meta_subset = utils.subset_dict(image_meta, ('id', 'name'))
filtered.append(meta_subset)
return filtered
def detail(self, context):
@@ -71,8 +78,8 @@ class GlanceImageService(service.BaseImageService):
image_metas = self.client.get_images_detailed()
for image_meta in image_metas:
if self._is_image_available(context, image_meta):
meta = self._translate_to_base(image_meta)
filtered.append(meta)
base_image_meta = self._translate_to_base(image_meta)
filtered.append(base_image_meta)
return filtered
def show(self, context, image_id):
@@ -84,8 +91,11 @@ class GlanceImageService(service.BaseImageService):
except glance_exception.NotFound:
raise exception.NotFound
meta = self._translate_to_base(image_meta)
return meta
if not self._is_image_available(context, image_meta):
raise exception.NotFound
base_image_meta = self._translate_to_base(image_meta)
return base_image_meta
def show_by_name(self, context, name):
"""
@@ -93,15 +103,11 @@ class GlanceImageService(service.BaseImageService):
"""
# TODO(vish): replace this with more efficient call when glance
# supports it.
images = self.detail(context)
image = None
for cantidate in images:
if name == cantidate.get('name'):
image = cantidate
break
if image is None:
raise exception.NotFound
return image
image_metas = self.detail(context)
for image_meta in image_metas:
if name == image_meta.get('name'):
return image_meta
raise exception.NotFound
def get(self, context, image_id, data):
"""
@@ -115,31 +121,43 @@ class GlanceImageService(service.BaseImageService):
for chunk in image_chunks:
data.write(chunk)
meta = self._translate_to_base(image_meta)
return meta
base_image_meta = self._translate_to_base(image_meta)
return base_image_meta
def create(self, context, metadata, data=None):
def create(self, context, image_meta, data=None):
"""
Store the image data and return the new image id.
:raises AlreadyExists if the image already exist.
"""
# Translate Base -> Service
LOG.debug(_("Creating image in Glance. Metadata passed in %s"),
metadata)
meta = self._translate_to_service(metadata)
LOG.debug(_("Metadata after formatting for Glance %s"), meta)
return self.client.add_image(meta, data)
image_meta)
sent_service_image_meta = self._translate_to_service(image_meta)
LOG.debug(_("Metadata after formatting for Glance %s"),
sent_service_image_meta)
def update(self, context, image_id, metadata, data=None):
recv_service_image_meta = self.client.add_image(
sent_service_image_meta, data)
# Translate Service -> Base
base_image_meta = self._translate_to_base(recv_service_image_meta)
LOG.debug(_("Metadata returned from Glance formatted for Base %s"),
base_image_meta)
return base_image_meta
def update(self, context, image_id, image_meta, data=None):
"""Replace the contents of the given image with the new data.
:raises NotFound if the image does not exist.
"""
try:
result = self.client.update_image(image_id, metadata, data)
image_meta = self.client.update_image(image_id, image_meta, data)
except glance_exception.NotFound:
raise exception.NotFound
return result
base_image_meta = self._translate_to_base(image_meta)
return base_image_meta
def delete(self, context, image_id):
"""
@@ -159,6 +177,15 @@ class GlanceImageService(service.BaseImageService):
"""
pass
@classmethod
def _translate_to_base(cls, image_meta):
"""Overriding the base translation to handle conversion to datetime
objects
"""
image_meta = service.BaseImageService._translate_to_base(image_meta)
image_meta = _convert_timestamps_to_datetimes(image_meta)
return image_meta
@staticmethod
def _is_image_available(context, image_meta):
"""
@@ -185,3 +212,26 @@ class GlanceImageService(service.BaseImageService):
return False
return str(user_id) == str(context.user_id)
# utility functions
def _convert_timestamps_to_datetimes(image_meta):
    """
    Returns image with known timestamp fields converted to datetime objects
    """
    # Only these three fields are timestamps in Glance image metadata.
    for attr in ('created_at', 'updated_at', 'deleted_at'):
        value = image_meta.get(attr)
        if value is not None:
            image_meta[attr] = _parse_glance_iso8601_timestamp(value)
    return image_meta
def _parse_glance_iso8601_timestamp(timestamp):
"""
Parse a subset of iso8601 timestamps into datetime objects
"""
GLANCE_FMT = "%Y-%m-%dT%H:%M:%S"
ISO_FMT = "%Y-%m-%dT%H:%M:%S.%f"
# FIXME(sirp): Glance is not returning in ISO format, we should fix Glance
# to do so, and then switch to parsing it here
return datetime.datetime.strptime(timestamp, GLANCE_FMT)

View File

@@ -44,41 +44,6 @@ class BaseImageService(object):
# the ImageService subclass
SERVICE_IMAGE_ATTRS = []
@classmethod
def _translate_to_base(cls, metadata):
"""Return a metadata dictionary that is BaseImageService compliant.
This is used by subclasses to expose only a metadata dictionary that
is the same across ImageService implementations.
"""
return cls.propertify_metadata(metadata, cls.BASE_IMAGE_ATTRS)
@classmethod
def _translate_to_service(cls, metadata):
"""Return a metadata dictionary that is usable by the ImageService
subclass.
As an example, Glance has additional attributes (like 'location'); the
BaseImageService considers these properties, but we need to translate
these back to first-class attrs for sending to Glance. This method
handles this by allowing you to specify the attributes an ImageService
considers first-class.
"""
if not cls.SERVICE_IMAGE_ATTRS:
raise NotImplementedError(_("Cannot use this without specifying "
"SERVICE_IMAGE_ATTRS for subclass"))
return cls.propertify_metadata(metadata, cls.SERVICE_IMAGE_ATTRS)
@staticmethod
def propertify_metadata(metadata, keys):
"""Return a dict with any unrecognized keys placed in the nested
'properties' dict.
"""
flattened = utils.flatten_dict(metadata)
attributes, properties = utils.partition_dict(flattened, keys)
attributes['properties'] = properties
return attributes
def index(self, context):
"""
Returns a sequence of mappings of id and name information about
@@ -99,9 +64,9 @@ class BaseImageService(object):
:retval: a sequence of mappings with the following signature
{'id': opaque id of image,
'name': name of image,
'created_at': creation timestamp,
'updated_at': modification timestamp,
'deleted_at': deletion timestamp or None,
'created_at': creation datetime object,
'updated_at': modification datetime object,
'deleted_at': deletion datetime object or None,
'deleted': boolean indicating if image has been deleted,
'status': string description of image status,
'is_public': boolean indicating if image is public
@@ -123,9 +88,9 @@ class BaseImageService(object):
{'id': opaque id of image,
'name': name of image,
'created_at': creation timestamp,
'updated_at': modification timestamp,
'deleted_at': deletion timestamp or None,
'created_at': creation datetime object,
'updated_at': modification datetime object,
'deleted_at': deletion datetime object or None,
'deleted': boolean indicating if image has been deleted,
'status': string description of image status,
'is_public': boolean indicating if image is public
@@ -147,7 +112,7 @@ class BaseImageService(object):
def create(self, context, metadata, data=None):
"""
Store the image metadata and data and return the new image id.
Store the image metadata and data and return the new image metadata.
:raises AlreadyExists if the image already exist.
@@ -155,7 +120,7 @@ class BaseImageService(object):
raise NotImplementedError
def update(self, context, image_id, metadata, data=None):
"""Update the given image with the new metadata and data.
"""Update the given image metadata and data and return the metadata
:raises NotFound if the image does not exist.
@@ -170,3 +135,38 @@ class BaseImageService(object):
"""
raise NotImplementedError
@classmethod
def _translate_to_base(cls, metadata):
"""Return a metadata dictionary that is BaseImageService compliant.
This is used by subclasses to expose only a metadata dictionary that
is the same across ImageService implementations.
"""
return cls._propertify_metadata(metadata, cls.BASE_IMAGE_ATTRS)
@classmethod
def _translate_to_service(cls, metadata):
"""Return a metadata dictionary that is usable by the ImageService
subclass.
As an example, Glance has additional attributes (like 'location'); the
BaseImageService considers these properties, but we need to translate
these back to first-class attrs for sending to Glance. This method
handles this by allowing you to specify the attributes an ImageService
considers first-class.
"""
if not cls.SERVICE_IMAGE_ATTRS:
raise NotImplementedError(_("Cannot use this without specifying "
"SERVICE_IMAGE_ATTRS for subclass"))
return cls._propertify_metadata(metadata, cls.SERVICE_IMAGE_ATTRS)
@staticmethod
def _propertify_metadata(metadata, keys):
"""Return a dict with any unrecognized keys placed in the nested
'properties' dict.
"""
flattened = utils.flatten_dict(metadata)
attributes, properties = utils.partition_dict(flattened, keys)
attributes['properties'] = properties
return attributes

View File

@@ -53,11 +53,14 @@ This module provides Manager, a base class for managers.
from nova import utils
from nova import flags
from nova import log as logging
from nova.db import base
from nova.scheduler import api
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.manager')
class Manager(base.Base):
def __init__(self, host=None, db_driver=None):
@@ -74,3 +77,29 @@ class Manager(base.Base):
"""Do any initialization that needs to be run if this is a standalone
service. Child classes should override this method."""
pass
class SchedulerDependentManager(Manager):
    """Periodically send capability updates to the Scheduler services.

    Services that need to update the Scheduler of their capabilities
    should derive from this class. Otherwise they can derive from
    manager.Manager directly. Updates are only sent after
    update_service_capabilities is called with non-None values."""

    def __init__(self, host=None, db_driver=None, service_name="undefined"):
        # None until update_service_capabilities() is first called;
        # periodic_tasks() only publishes once a value has been set.
        self.last_capabilities = None
        self.service_name = service_name
        super(SchedulerDependentManager, self).__init__(host, db_driver)

    def update_service_capabilities(self, capabilities):
        """Remember these capabilities to send on next periodic update."""
        self.last_capabilities = capabilities

    def periodic_tasks(self, context=None):
        """Pass data back to the scheduler at a periodic interval"""
        if self.last_capabilities:
            LOG.debug(_("Notifying Schedulers of capabilities ..."))
            # Fans out to every scheduler via the scheduler API helper.
            api.update_service_capabilities(context, self.service_name,
                self.host, self.last_capabilities)
        super(SchedulerDependentManager, self).periodic_tasks(context)

View File

@@ -21,8 +21,6 @@ import inspect
import os
import calendar
from eventlet import semaphore
from nova import db
from nova import exception
from nova import flags
@@ -272,37 +270,30 @@ class IptablesManager(object):
self.ipv4['nat'].add_chain('floating-snat')
self.ipv4['nat'].add_rule('snat', '-j $floating-snat')
self.semaphore = semaphore.Semaphore()
@utils.synchronized('iptables')
@utils.synchronized('iptables', external=True)
def apply(self):
"""Apply the current in-memory set of iptables rules
This will blow away any rules left over from previous runs of the
same component of Nova, and replace them with our current set of
rules. This happens atomically, thanks to iptables-restore.
We wrap the call in a semaphore lock, so that we don't race with
ourselves. In the event of a race with another component running
an iptables-* command at the same time, we retry up to 5 times.
"""
with self.semaphore:
s = [('iptables', self.ipv4)]
if FLAGS.use_ipv6:
s += [('ip6tables', self.ipv6)]
s = [('iptables', self.ipv4)]
if FLAGS.use_ipv6:
s += [('ip6tables', self.ipv6)]
for cmd, tables in s:
for table in tables:
current_table, _ = self.execute('sudo',
'%s-save' % (cmd,),
'-t', '%s' % (table,),
attempts=5)
current_lines = current_table.split('\n')
new_filter = self._modify_rules(current_lines,
tables[table])
self.execute('sudo', '%s-restore' % (cmd,),
process_input='\n'.join(new_filter),
attempts=5)
for cmd, tables in s:
for table in tables:
current_table, _ = self.execute('sudo',
'%s-save' % (cmd,),
'-t', '%s' % (table,),
attempts=5)
current_lines = current_table.split('\n')
new_filter = self._modify_rules(current_lines,
tables[table])
self.execute('sudo', '%s-restore' % (cmd,),
process_input='\n'.join(new_filter),
attempts=5)
def _modify_rules(self, current_lines, table, binary=None):
unwrapped_chains = table.unwrapped_chains
@@ -595,6 +586,7 @@ def update_dhcp(context, network_id):
_execute(*command, addl_env=env)
@utils.synchronized('radvd_start')
def update_ra(context, network_id):
network_ref = db.network_get(context, network_id)

View File

@@ -105,7 +105,7 @@ class AddressAlreadyAllocated(exception.Error):
pass
class NetworkManager(manager.Manager):
class NetworkManager(manager.SchedulerDependentManager):
"""Implements common network manager functionality.
This class must be subclassed to support specific topologies.
@@ -116,7 +116,8 @@ class NetworkManager(manager.Manager):
if not network_driver:
network_driver = FLAGS.network_driver
self.driver = utils.import_object(network_driver)
super(NetworkManager, self).__init__(*args, **kwargs)
super(NetworkManager, self).__init__(service_name='network',
*args, **kwargs)
def init_host(self):
"""Do any initialization that needs to be run if this is a

View File

@@ -137,24 +137,7 @@ class Consumer(messaging.Consumer):
return timer
class Publisher(messaging.Publisher):
"""Publisher base class"""
pass
class TopicConsumer(Consumer):
"""Consumes messages on a specific topic"""
exchange_type = "topic"
def __init__(self, connection=None, topic="broadcast"):
self.queue = topic
self.routing_key = topic
self.exchange = FLAGS.control_exchange
self.durable = False
super(TopicConsumer, self).__init__(connection=connection)
class AdapterConsumer(TopicConsumer):
class AdapterConsumer(Consumer):
"""Calls methods on a proxy object based on method and args"""
def __init__(self, connection=None, topic="broadcast", proxy=None):
LOG.debug(_('Initing the Adapter Consumer for %s') % topic)
@@ -207,6 +190,41 @@ class AdapterConsumer(TopicConsumer):
return
class Publisher(messaging.Publisher):
"""Publisher base class"""
pass
class TopicAdapterConsumer(AdapterConsumer):
    """Consumes messages on a specific topic"""
    exchange_type = "topic"

    def __init__(self, connection=None, topic="broadcast", proxy=None):
        # Queue name and routing key both take the topic name; the
        # exchange comes from the control_exchange flag.
        self.queue = topic
        self.routing_key = topic
        self.exchange = FLAGS.control_exchange
        self.durable = False
        super(TopicAdapterConsumer, self).__init__(connection=connection,
                topic=topic, proxy=proxy)
class FanoutAdapterConsumer(AdapterConsumer):
    """Consumes messages from a fanout exchange"""
    exchange_type = "fanout"

    def __init__(self, connection=None, topic="broadcast", proxy=None):
        self.exchange = "%s_fanout" % topic
        self.routing_key = topic
        # Each consumer instance binds its own uniquely-named queue to the
        # fanout exchange, so every consumer receives its own copy of
        # each message.
        unique = uuid.uuid4().hex
        self.queue = "%s_fanout_%s" % (topic, unique)
        self.durable = False
        LOG.info(_("Created '%(exchange)s' fanout exchange "
                   "with '%(key)s' routing key"),
                 dict(exchange=self.exchange, key=self.routing_key))
        super(FanoutAdapterConsumer, self).__init__(connection=connection,
                topic=topic, proxy=proxy)
class TopicPublisher(Publisher):
"""Publishes messages on a specific topic"""
exchange_type = "topic"
@@ -218,6 +236,19 @@ class TopicPublisher(Publisher):
super(TopicPublisher, self).__init__(connection=connection)
class FanoutPublisher(Publisher):
    """Publishes messages to a fanout exchange."""
    exchange_type = "fanout"

    def __init__(self, topic, connection=None):
        # Exchange is named "<topic>_fanout" — the same naming scheme the
        # fanout consumers bind to.
        self.exchange = "%s_fanout" % topic
        self.queue = "%s_fanout" % topic
        self.durable = False
        LOG.info(_("Creating '%(exchange)s' fanout exchange"),
                 dict(exchange=self.exchange))
        super(FanoutPublisher, self).__init__(connection=connection)
class DirectConsumer(Consumer):
"""Consumes messages directly on a channel specified by msg_id"""
exchange_type = "direct"
@@ -360,6 +391,16 @@ def cast(context, topic, msg):
publisher.close()
def fanout_cast(context, topic, msg):
    """Sends a message on a fanout exchange without waiting for a response"""
    LOG.debug(_("Making asynchronous fanout cast..."))
    # Embed the request context in the message so the receiving side can
    # reconstruct it, mirroring the plain cast() path.
    _pack_context(msg, context)
    conn = Connection.instance()
    publisher = FanoutPublisher(topic, connection=conn)
    publisher.send(msg)
    publisher.close()
def generic_response(message_data, message):
"""Logs a result and exits"""
LOG.debug(_('response %s'), message_data)

View File

@@ -25,25 +25,40 @@ FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.scheduler.api')
class API(object):
"""API for interacting with the scheduler."""
def _call_scheduler(method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
def _call_scheduler(self, method, context, params=None):
"""Generic handler for RPC calls to the scheduler.
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:param params: Optional dictionary of arguments to be passed to the
scheduler worker
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
:retval: Result returned by scheduler worker
"""
if not params:
params = {}
queue = FLAGS.scheduler_topic
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def get_zone_list(self, context):
items = self._call_scheduler('get_zone_list', context)
for item in items:
item['api_url'] = item['api_url'].replace('\\/', '/')
return items
def get_zone_list(context):
    """Return a list of zones associated with this zone."""
    items = _call_scheduler('get_zone_list', context)
    for item in items:
        # api_url values come back with JSON-escaped slashes ("\/");
        # undo the escaping before returning them.
        item['api_url'] = item['api_url'].replace('\\/', '/')
    return items
def get_zone_capabilities(context, service=None):
    """Returns a dict of key, value capabilities for this zone,
    or for a particular class of services running in this zone.

    :param service: optional service name to restrict the capability
                    roll-up to a single service class.
    """
    return _call_scheduler('get_zone_capabilities', context=context,
                           params=dict(service=service))
def update_service_capabilities(context, service_name, host, capabilities):
    """Send an update to all the scheduler services informing them
    of the capabilities of this service."""
    # Uses a fanout cast (not a plain cast) so that every running
    # scheduler instance receives the capability update.
    kwargs = dict(method='update_service_capabilities',
                  args=dict(service_name=service_name, host=host,
                            capabilities=capabilities))
    return rpc.fanout_cast(context, 'scheduler', kwargs)

View File

@@ -49,6 +49,13 @@ class WillNotSchedule(exception.Error):
class Scheduler(object):
"""The base class that all Scheduler clases should inherit from."""
    def __init__(self):
        # Set later by the Scheduler Service via set_zone_manager().
        self.zone_manager = None
    def set_zone_manager(self, zone_manager):
        """Called by the Scheduler Service to supply a ZoneManager."""
        self.zone_manager = zone_manager
@staticmethod
def service_is_up(service):
"""Check whether a service is up based on last heartbeat."""

View File

@@ -41,10 +41,11 @@ flags.DEFINE_string('scheduler_driver',
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
def __init__(self, scheduler_driver=None, *args, **kwargs):
self.zone_manager = zone_manager.ZoneManager()
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
self.driver = utils.import_object(scheduler_driver)
self.zone_manager = zone_manager.ZoneManager()
self.driver.set_zone_manager(self.zone_manager)
super(SchedulerManager, self).__init__(*args, **kwargs)
def __getattr__(self, key):
@@ -59,6 +60,17 @@ class SchedulerManager(manager.Manager):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
    def get_zone_capabilities(self, context=None, service=None):
        """Get the normalized set of capabilities for this zone,
        or for a particular service."""
        return self.zone_manager.get_zone_capabilities(context, service)
def update_service_capabilities(self, context=None, service_name=None,
host=None, capabilities={}):
"""Process a capability update from a service node."""
self.zone_manager.update_service_capabilities(service_name,
host, capabilities)
def _schedule(self, method, context, topic, *args, **kwargs):
"""Tries to call schedule_* method on the driver to retrieve host.

View File

@@ -105,12 +105,36 @@ class ZoneManager(object):
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {}
self.service_states = {} # { <service> : { <host> : { cap k : v }}}
self.green_pool = greenpool.GreenPool()
    def get_zone_list(self):
        """Return the list of zones we know about."""
        # Serialize each ZoneState into a plain dict for the caller.
        return [zone.to_dict() for zone in self.zone_states.values()]
    def get_zone_capabilities(self, context, service=None):
        """Roll up all the individual host info to generic 'service'
        capabilities. Each capability is aggregated into
        <cap>_min and <cap>_max values."""
        service_dict = self.service_states
        if service:
            # Restrict the roll-up to the requested service only
            # (missing service yields an empty roll-up, not an error).
            service_dict = {service: self.service_states.get(service, {})}

        # TODO(sandy) - be smarter about fabricating this structure.
        # But it's likely to change once we understand what the Best-Match
        # code will need better.
        combined = {}  # { <service>_<cap> : (min, max), ... }
        for service_name, host_dict in service_dict.iteritems():
            for host, caps_dict in host_dict.iteritems():
                for cap, value in caps_dict.iteritems():
                    key = "%s_%s" % (service_name, cap)
                    # Fold this host's value into the running (min, max)
                    # pair, seeding both ends with the first value seen.
                    min_value, max_value = combined.get(key, (value, value))
                    min_value = min(min_value, value)
                    max_value = max(max_value, value)
                    combined[key] = (min_value, max_value)
        return combined
def _refresh_from_db(self, context):
"""Make our zone state map match the db."""
# Add/update existing zones ...
@@ -141,3 +165,11 @@ class ZoneManager(object):
self.last_zone_db_check = datetime.now()
self._refresh_from_db(context)
self._poll_zones(context)
    def update_service_capabilities(self, service_name, host, capabilities):
        """Update the per-service capabilities based on this notification."""
        logging.debug(_("Received %(service_name)s service update from "
                "%(host)s: %(capabilities)s") % locals())
        # Replace (not merge) this host's capability dict for the service.
        service_caps = self.service_states.get(service_name, {})
        service_caps[host] = capabilities
        self.service_states[service_name] = service_caps

View File

@@ -97,18 +97,24 @@ class Service(object):
conn1 = rpc.Connection.instance(new=True)
conn2 = rpc.Connection.instance(new=True)
conn3 = rpc.Connection.instance(new=True)
if self.report_interval:
consumer_all = rpc.AdapterConsumer(
consumer_all = rpc.TopicAdapterConsumer(
connection=conn1,
topic=self.topic,
proxy=self)
consumer_node = rpc.AdapterConsumer(
consumer_node = rpc.TopicAdapterConsumer(
connection=conn2,
topic='%s.%s' % (self.topic, self.host),
proxy=self)
fanout = rpc.FanoutAdapterConsumer(
connection=conn3,
topic=self.topic,
proxy=self)
self.timers.append(consumer_all.attach_to_eventlet())
self.timers.append(consumer_node.attach_to_eventlet())
self.timers.append(fanout.attach_to_eventlet())
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)

View File

@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import datetime
import json
import random
@@ -72,14 +73,18 @@ def fake_wsgi(self, req):
return self.application
def wsgi_app(inner_application=None):
if not inner_application:
inner_application = openstack.APIRouter()
def wsgi_app(inner_app10=None, inner_app11=None):
if not inner_app10:
inner_app10 = openstack.APIRouterV10()
if not inner_app11:
inner_app11 = openstack.APIRouterV11()
mapper = urlmap.URLMap()
api = openstack.FaultWrapper(auth.AuthMiddleware(
limits.RateLimitingMiddleware(inner_application)))
mapper['/v1.0'] = api
mapper['/v1.1'] = api
api10 = openstack.FaultWrapper(auth.AuthMiddleware(
limits.RateLimitingMiddleware(inner_app10)))
api11 = openstack.FaultWrapper(auth.AuthMiddleware(
limits.RateLimitingMiddleware(inner_app11)))
mapper['/v1.0'] = api10
mapper['/v1.1'] = api11
mapper['/'] = openstack.FaultWrapper(openstack.Versions())
return mapper
@@ -165,15 +170,16 @@ def stub_out_glance(stubs, initial_fixtures=None):
for f in self.fixtures]
def fake_get_images_detailed(self):
return self.fixtures
return copy.deepcopy(self.fixtures)
def fake_get_image_meta(self, image_id):
for f in self.fixtures:
if f['id'] == image_id:
return f
image = self._find_image(image_id)
if image:
return copy.deepcopy(image)
raise glance_exc.NotFound
def fake_add_image(self, image_meta, data=None):
image_meta = copy.deepcopy(image_meta)
image_id = ''.join(random.choice(string.letters)
for _ in range(20))
image_meta['id'] = image_id
@@ -181,7 +187,7 @@ def stub_out_glance(stubs, initial_fixtures=None):
return image_meta
def fake_update_image(self, image_id, image_meta, data=None):
f = self.fake_get_image_meta(image_id)
f = self._find_image(image_id)
if not f:
raise glance_exc.NotFound
@@ -189,12 +195,18 @@ def stub_out_glance(stubs, initial_fixtures=None):
return f
def fake_delete_image(self, image_id):
f = self.fake_get_image_meta(image_id)
f = self._find_image(image_id)
if not f:
raise glance_exc.NotFound
self.fixtures.remove(f)
def _find_image(self, image_id):
for f in self.fixtures:
if f['id'] == image_id:
return f
return None
GlanceClient = glance_client.Client
fake = FakeGlanceClient(initial_fixtures)
@@ -208,6 +220,7 @@ def stub_out_glance(stubs, initial_fixtures=None):
class FakeToken(object):
# FIXME(sirp): let's not use id here
id = 0
def __init__(self, **kwargs):

View File

@@ -83,8 +83,7 @@ class Test(test.TestCase):
self.assertEqual(result.headers['X-Storage-Url'], "")
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack, 'APIRouter',
fakes.FakeRouter)
self.stubs.Set(nova.api.openstack, 'APIRouterV10', fakes.FakeRouter)
req = webob.Request.blank('/v1.0/fake')
req.headers['X-Auth-Token'] = token
result = req.get_response(fakes.wsgi_app())
@@ -201,8 +200,7 @@ class TestLimiter(test.TestCase):
self.assertEqual(len(result.headers['X-Auth-Token']), 40)
token = result.headers['X-Auth-Token']
self.stubs.Set(nova.api.openstack, 'APIRouter',
fakes.FakeRouter)
self.stubs.Set(nova.api.openstack, 'APIRouterV10', fakes.FakeRouter)
req = webob.Request.blank('/v1.0/fake')
req.method = 'POST'
req.headers['X-Auth-Token'] = token

View File

@@ -161,7 +161,6 @@ class GlanceImageServiceTest(test.TestCase,
2. ImageService -> API - This is needed so we can support multple
APIs (OpenStack, EC2)
"""
def setUp(self):
super(GlanceImageServiceTest, self).setUp()
self.stubs = stubout.StubOutForTesting()
@@ -185,14 +184,14 @@ class GlanceImageServiceTest(test.TestCase,
'properties': {'instance_id': '42', 'user_id': '1'}}
image_id = self.service.create(self.context, fixture)['id']
expected = fixture
self.assertDictMatch(self.sent_to_glance['metadata'], expected)
image_meta = self.service.show(self.context, image_id)
expected = {'id': image_id,
'name': 'test image',
'is_public': False,
'properties': {'instance_id': '42', 'user_id': '1'}}
self.assertDictMatch(self.sent_to_glance['metadata'], expected)
image_meta = self.service.show(self.context, image_id)
self.assertDictMatch(image_meta, expected)
image_metas = self.service.detail(self.context)
@@ -207,19 +206,15 @@ class GlanceImageServiceTest(test.TestCase,
fixture = {'name': 'test image'}
image_id = self.service.create(self.context, fixture)['id']
expected = {'id': image_id, 'name': 'test image', 'properties': {}}
expected = {'name': 'test image', 'properties': {}}
self.assertDictMatch(self.sent_to_glance['metadata'], expected)
class ImageControllerWithGlanceServiceTest(test.TestCase):
"""Test of the OpenStack API /images application controller"""
# FIXME(sirp): The ImageService and API use two different formats for
# timestamps. Ultimately, the ImageService should probably use datetime
# objects
NOW_SERVICE_STR = "2010-10-11T10:30:22"
NOW_API_STR = "2010-10-11T10:30:22Z"
NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
NOW_API_FORMAT = "2010-10-11T10:30:22Z"
def setUp(self):
super(ImageControllerWithGlanceServiceTest, self).setUp()
@@ -258,20 +253,21 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
image_metas = json.loads(res.body)['images']
now = self.NOW_API_FORMAT
expected = [
{'id': 123, 'name': 'public image', 'updated': self.NOW_API_STR,
'created': self.NOW_API_STR, 'status': 'ACTIVE'},
{'id': 123, 'name': 'public image', 'updated': now,
'created': now, 'status': 'ACTIVE'},
{'id': 124, 'name': 'queued backup', 'serverId': 42,
'updated': self.NOW_API_STR, 'created': self.NOW_API_STR,
'updated': now, 'created': now,
'status': 'QUEUED'},
{'id': 125, 'name': 'saving backup', 'serverId': 42,
'updated': self.NOW_API_STR, 'created': self.NOW_API_STR,
'updated': now, 'created': now,
'status': 'SAVING', 'progress': 0},
{'id': 126, 'name': 'active backup', 'serverId': 42,
'updated': self.NOW_API_STR, 'created': self.NOW_API_STR,
'updated': now, 'created': now,
'status': 'ACTIVE'},
{'id': 127, 'name': 'killed backup', 'serverId': 42,
'updated': self.NOW_API_STR, 'created': self.NOW_API_STR,
'updated': now, 'created': now,
'status': 'FAILED'}
]
@@ -282,8 +278,8 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
image_meta = json.loads(res.body)['image']
expected = {'id': 123, 'name': 'public image',
'updated': self.NOW_API_STR, 'created': self.NOW_API_STR,
'status': 'ACTIVE'}
'updated': self.NOW_API_FORMAT,
'created': self.NOW_API_FORMAT, 'status': 'ACTIVE'}
self.assertDictMatch(image_meta, expected)
def test_get_image_non_existent(self):
@@ -302,8 +298,8 @@ class ImageControllerWithGlanceServiceTest(test.TestCase):
@classmethod
def _make_image_fixtures(cls):
image_id = 123
base_attrs = {'created_at': cls.NOW_SERVICE_STR,
'updated_at': cls.NOW_SERVICE_STR,
base_attrs = {'created_at': cls.NOW_GLANCE_FORMAT,
'updated_at': cls.NOW_GLANCE_FORMAT,
'deleted_at': None,
'deleted': False}

View File

@@ -161,7 +161,7 @@ class ServersTest(test.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res_dict['server']['id'], '1')
self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
def test_get_server_by_id_with_addresses(self):
@@ -172,7 +172,7 @@ class ServersTest(test.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res_dict['server']['id'], '1')
self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
addresses = res_dict['server']['addresses']
self.assertEqual(len(addresses["public"]), len(public))
@@ -180,7 +180,7 @@ class ServersTest(test.TestCase):
self.assertEqual(len(addresses["private"]), 1)
self.assertEqual(addresses["private"][0], private)
def test_get_server_by_id_with_addresses_v1_1(self):
def test_get_server_by_id_with_addresses_v11(self):
private = "192.168.0.3"
public = ["1.2.3.4"]
new_return_server = return_server_with_addresses(private, public)
@@ -189,7 +189,7 @@ class ServersTest(test.TestCase):
req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res_dict['server']['id'], '1')
self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['name'], 'server1')
addresses = res_dict['server']['addresses']
self.assertEqual(len(addresses["public"]), len(public))
@@ -239,7 +239,7 @@ class ServersTest(test.TestCase):
servers = json.loads(res.body)['servers']
self.assertEqual([s['id'] for s in servers], [1, 2])
def _test_create_instance_helper(self):
def _setup_for_create_instance(self):
"""Shared implementation for tests below that create instance"""
def instance_create(context, inst):
return {'id': '1', 'display_name': 'server_test'}
@@ -276,14 +276,17 @@ class ServersTest(test.TestCase):
self.stubs.Set(nova.api.openstack.common,
"get_image_id_from_image_hash", image_id_from_hash)
def _test_create_instance_helper(self):
self._setup_for_create_instance()
body = dict(server=dict(
name='server_test', imageId=2, flavorId=2,
name='server_test', imageId=3, flavorId=2,
metadata={'hello': 'world', 'open': 'stack'},
personality={}))
req = webob.Request.blank('/v1.0/servers')
req.method = 'POST'
req.body = json.dumps(body)
req.headers["Content-Type"] = "application/json"
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
@@ -291,8 +294,9 @@ class ServersTest(test.TestCase):
self.assertEqual('serv', server['adminPass'][:4])
self.assertEqual(16, len(server['adminPass']))
self.assertEqual('server_test', server['name'])
self.assertEqual('1', server['id'])
self.assertEqual(1, server['id'])
self.assertEqual(2, server['flavorId'])
self.assertEqual(3, server['imageId'])
self.assertEqual(res.status_int, 200)
def test_create_instance(self):
@@ -302,6 +306,56 @@ class ServersTest(test.TestCase):
fakes.stub_out_key_pair_funcs(self.stubs, have_key_pair=False)
self._test_create_instance_helper()
def test_create_instance_v11(self):
self._setup_for_create_instance()
imageRef = 'http://localhost/v1.1/images/2'
flavorRef = 'http://localhost/v1.1/flavors/3'
body = {
'server': {
'name': 'server_test',
'imageRef': imageRef,
'flavorRef': flavorRef,
'metadata': {
'hello': 'world',
'open': 'stack',
},
'personality': {},
},
}
req = webob.Request.blank('/v1.1/servers')
req.method = 'POST'
req.body = json.dumps(body)
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
server = json.loads(res.body)['server']
self.assertEqual('serv', server['adminPass'][:4])
self.assertEqual(16, len(server['adminPass']))
self.assertEqual('server_test', server['name'])
self.assertEqual(1, server['id'])
self.assertEqual(flavorRef, server['flavorRef'])
self.assertEqual(imageRef, server['imageRef'])
self.assertEqual(res.status_int, 200)
def test_create_instance_v11_bad_href(self):
self._setup_for_create_instance()
imageRef = 'http://localhost/v1.1/images/asdf'
flavorRef = 'http://localhost/v1.1/flavors/3'
body = dict(server=dict(
name='server_test', imageRef=imageRef, flavorRef=flavorRef,
metadata={'hello': 'world', 'open': 'stack'},
personality={}))
req = webob.Request.blank('/v1.1/servers')
req.method = 'POST'
req.body = json.dumps(body)
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
def test_update_no_body(self):
req = webob.Request.blank('/v1.0/servers/1')
req.method = 'PUT'
@@ -524,16 +578,6 @@ class ServersTest(test.TestCase):
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
def test_server_resize(self):
body = dict(server=dict(
name='server_test', imageId=2, flavorId=2, metadata={},
personality={}))
req = webob.Request.blank('/v1.0/servers/1/action')
req.method = 'POST'
req.content_type = 'application/json'
req.body = json.dumps(body)
res = req.get_response(fakes.wsgi_app())
def test_delete_server_instance(self):
req = webob.Request.blank('/v1.0/servers/1')
req.method = 'DELETE'
@@ -589,6 +633,18 @@ class ServersTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
def test_resized_server_has_correct_status(self):
req = self.webreq('/1', 'GET', dict(resize=dict(flavorId=3)))
def fake_migration_get(*args):
return {}
self.stubs.Set(nova.db, 'migration_get_by_instance_and_status',
fake_migration_get)
res = req.get_response(fakes.wsgi_app())
body = json.loads(res.body)
self.assertEqual(body['server']['status'], 'resize-confirm')
def test_confirm_resize_server(self):
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
@@ -943,7 +999,7 @@ class TestServerInstanceCreation(test.TestCase):
def _setup_mock_compute_api_for_personality(self):
class MockComputeAPI(object):
class MockComputeAPI(nova.compute.API):
def __init__(self):
self.injected_files = None

View File

@@ -75,6 +75,10 @@ def zone_get_all_db(context):
]
def zone_capabilities(method, context, params):
return dict()
class ZonesTest(test.TestCase):
def setUp(self):
super(ZonesTest, self).setUp()
@@ -93,13 +97,18 @@ class ZonesTest(test.TestCase):
self.stubs.Set(nova.db, 'zone_create', zone_create)
self.stubs.Set(nova.db, 'zone_delete', zone_delete)
self.old_zone_name = FLAGS.zone_name
self.old_zone_capabilities = FLAGS.zone_capabilities
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
FLAGS.zone_name = self.old_zone_name
FLAGS.zone_capabilities = self.old_zone_capabilities
super(ZonesTest, self).tearDown()
def test_get_zone_list_scheduler(self):
self.stubs.Set(api.API, '_call_scheduler', zone_get_all_scheduler)
self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler)
req = webob.Request.blank('/v1.0/zones')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
@@ -108,8 +117,7 @@ class ZonesTest(test.TestCase):
self.assertEqual(len(res_dict['zones']), 2)
def test_get_zone_list_db(self):
self.stubs.Set(api.API, '_call_scheduler',
zone_get_all_scheduler_empty)
self.stubs.Set(api, '_call_scheduler', zone_get_all_scheduler_empty)
self.stubs.Set(nova.db, 'zone_get_all', zone_get_all_db)
req = webob.Request.blank('/v1.0/zones')
req.headers["Content-Type"] = "application/json"
@@ -167,3 +175,18 @@ class ZonesTest(test.TestCase):
self.assertEqual(res_dict['zone']['id'], 1)
self.assertEqual(res_dict['zone']['api_url'], 'http://example.com')
self.assertFalse('username' in res_dict['zone'])
def test_zone_info(self):
FLAGS.zone_name = 'darksecret'
FLAGS.zone_capabilities = ['cap1=a;b', 'cap2=c;d']
self.stubs.Set(api, '_call_scheduler', zone_capabilities)
body = dict(zone=dict(username='zeb', password='sneaky'))
req = webob.Request.blank('/v1.0/zones/info')
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual(res_dict['zone']['name'], 'darksecret')
self.assertEqual(res_dict['zone']['cap1'], 'a;b')
self.assertEqual(res_dict['zone']['cap2'], 'c;d')

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Openstack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,191 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Openstack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import unittest
from nova import context
from nova import test
from nova.image import glance
class StubGlanceClient(object):
    """Minimal in-memory stand-in for the real Glance client.

    Lookups are served from the ``images`` mapping supplied at construction
    time; the mutating calls ignore their arguments and echo back the canned
    ``add_response`` / ``update_response`` values so tests can control what
    the image service sees.
    """

    def __init__(self, images, add_response=None, update_response=None):
        # Mapping of image_id -> metadata dict; tests may reassign it later.
        self.images = images
        # Canned return values for add_image()/update_image().
        self.add_response = add_response
        self.update_response = update_response

    def get_image_meta(self, image_id):
        """Return the stored metadata dict for `image_id`."""
        return self.images[image_id]

    def get_images_detailed(self):
        """Return an iterator over the metadata of every known image."""
        # NOTE: iter(...values()) instead of the Python-2-only itervalues();
        # still yields lazily-consumable values on both Python 2 and 3.
        return iter(self.images.values())

    def get_image(self, image_id):
        """Return (metadata, image_chunks); chunks are always empty here."""
        return self.images[image_id], []

    def add_image(self, metadata, data):
        """Ignore the arguments and return the canned add response."""
        return self.add_response

    def update_image(self, image_id, metadata, data):
        """Ignore the arguments and return the canned update response."""
        return self.update_response
class NullWriter(object):
    """Writer that discards everything written to it.

    ImageService.get takes a file-like writer object for the image bytes;
    these tests only care about the returned metadata, so the payload is
    simply thrown away.
    """

    def write(self, *arg, **kwargs):
        # Accept any arguments and discard the data.
        pass
class BaseGlanceTest(unittest.TestCase):
    """Shared fixture for GlanceImageService tests.

    Wires a GlanceImageService to a StubGlanceClient and provides the
    datetime assertion helpers used by the concrete test classes below.
    """

    # Timestamp in the string format Glance emits, and the datetime object
    # the image service is expected to parse it into.
    NOW_GLANCE_FORMAT = "2010-10-11T10:30:22"
    NOW_DATETIME = datetime.datetime(2010, 10, 11, 10, 30, 22)

    def setUp(self):
        # FIXME(sirp): we can probably use stubs library here rather than
        # dependency injection
        self.client = StubGlanceClient(None)
        self.service = glance.GlanceImageService(self.client)
        self.context = context.RequestContext(None, None)

    def assertDateTimesFilled(self, image_meta):
        """Assert all three timestamps were parsed into datetime objects."""
        self.assertEqual(image_meta['created_at'], self.NOW_DATETIME)
        self.assertEqual(image_meta['updated_at'], self.NOW_DATETIME)
        self.assertEqual(image_meta['deleted_at'], self.NOW_DATETIME)

    def assertDateTimesEmpty(self, image_meta):
        """Assert missing timestamps stayed None (created_at not checked)."""
        self.assertEqual(image_meta['updated_at'], None)
        self.assertEqual(image_meta['deleted_at'], None)
class TestGlanceImageServiceProperties(BaseGlanceTest):
    """Verify non-base image attributes get folded into 'properties'."""

    def _stub_image_fixture(self):
        # One image whose 'foo' key is not a BASE_IMAGE_ATTR and therefore
        # must be relocated into the properties dict by the service.
        return {'image1': {'name': 'image1', 'is_public': True,
                           'foo': 'bar',
                           'properties': {'prop1': 'propvalue1'}}}

    def test_show_passes_through_to_client(self):
        """Ensure attributes which aren't BASE_IMAGE_ATTRS are stored in the
        properties dict
        """
        self.client.images = self._stub_image_fixture()
        observed = self.service.show(self.context, 'image1')
        self.assertEqual(observed,
                         {'name': 'image1', 'is_public': True,
                          'properties': {'prop1': 'propvalue1',
                                         'foo': 'bar'}})

    def test_detail_passes_through_to_client(self):
        """detail() performs the same properties merge for every image."""
        self.client.images = self._stub_image_fixture()
        observed = self.service.detail(self.context)
        self.assertEqual(observed,
                         [{'name': 'image1', 'is_public': True,
                           'properties': {'prop1': 'propvalue1',
                                          'foo': 'bar'}}])
class TestGetterDateTimeNoneTests(BaseGlanceTest):
    """Datetime handling for the read-only calls: show(), detail(), get()."""

    def test_show_handles_none_datetimes(self):
        """show() leaves missing timestamps as None."""
        self.client.images = self._make_none_datetime_fixtures()
        self.assertDateTimesEmpty(self.service.show(self.context, 'image1'))

    def test_detail_handles_none_datetimes(self):
        """detail() leaves missing timestamps as None."""
        self.client.images = self._make_none_datetime_fixtures()
        self.assertDateTimesEmpty(self.service.detail(self.context)[0])

    def test_get_handles_none_datetimes(self):
        """get() leaves missing timestamps as None."""
        self.client.images = self._make_none_datetime_fixtures()
        image_meta = self.service.get(self.context, 'image1', NullWriter())
        self.assertDateTimesEmpty(image_meta)

    def test_show_makes_datetimes(self):
        """show() parses Glance timestamp strings into datetime objects."""
        self.client.images = self._make_datetime_fixtures()
        self.assertDateTimesFilled(self.service.show(self.context, 'image1'))

    def test_detail_makes_datetimes(self):
        """detail() parses Glance timestamp strings into datetime objects."""
        self.client.images = self._make_datetime_fixtures()
        self.assertDateTimesFilled(self.service.detail(self.context)[0])

    def test_get_makes_datetimes(self):
        """get() parses Glance timestamp strings into datetime objects."""
        self.client.images = self._make_datetime_fixtures()
        image_meta = self.service.get(self.context, 'image1', NullWriter())
        self.assertDateTimesFilled(image_meta)

    def _make_datetime_fixtures(self):
        # All three timestamps present, in Glance's string format.
        return {'image1': {'name': 'image1',
                           'is_public': True,
                           'created_at': self.NOW_GLANCE_FORMAT,
                           'updated_at': self.NOW_GLANCE_FORMAT,
                           'deleted_at': self.NOW_GLANCE_FORMAT}}

    def _make_none_datetime_fixtures(self):
        # updated_at/deleted_at are None; created_at deliberately omitted.
        return {'image1': {'name': 'image1',
                           'is_public': True,
                           'updated_at': None,
                           'deleted_at': None}}
class TestMutatorDateTimeTests(BaseGlanceTest):
    """Datetime handling for the mutating calls: create() and update()."""

    def test_create_handles_datetimes(self):
        """create() converts timestamp strings in the client's response."""
        self.client.add_response = self._make_datetime_fixture()
        self.assertDateTimesFilled(self.service.create(self.context, {}))

    def test_create_handles_none_datetimes(self):
        """create() tolerates absent/None timestamps in the response."""
        self.client.add_response = self._make_none_datetime_fixture()
        self.assertDateTimesEmpty(self.service.create(self.context, {}))

    def test_update_handles_datetimes(self):
        """update() converts timestamp strings in the client's response."""
        self.client.update_response = self._make_datetime_fixture()
        updated = self.service.update(self.context, 'dummy_id', {})
        self.assertDateTimesFilled(updated)

    def test_update_handles_none_datetimes(self):
        """update() tolerates absent/None timestamps in the response."""
        self.client.update_response = self._make_none_datetime_fixture()
        updated = self.service.update(self.context, 'dummy_id', {})
        self.assertDateTimesEmpty(updated)

    def _make_datetime_fixture(self):
        # Response with all timestamps present, in Glance string format.
        return {'id': 'image1',
                'name': 'image1',
                'is_public': True,
                'created_at': self.NOW_GLANCE_FORMAT,
                'updated_at': self.NOW_GLANCE_FORMAT,
                'deleted_at': self.NOW_GLANCE_FORMAT}

    def _make_none_datetime_fixture(self):
        # updated_at/deleted_at are None; created_at deliberately omitted.
        return {'id': 'image1',
                'name': 'image1',
                'is_public': True,
                'updated_at': None,
                'deleted_at': None}

View File

@@ -44,6 +44,14 @@ flags.DECLARE('stub_network', 'nova.compute.manager')
flags.DECLARE('live_migration_retry_count', 'nova.compute.manager')
class FakeTime(object):
    """Stand-in for the time module that counts instead of blocking.

    sleep() accumulates the requested durations in ``counter`` so tests
    can assert on the total simulated wait time.
    """

    def __init__(self):
        # Running total of (fake) seconds slept so far.
        self.counter = 0

    def sleep(self, t):
        """Pretend to sleep for `t` seconds by adding to the total."""
        self.counter = self.counter + t
class ComputeTestCase(test.TestCase):
"""Test case for compute"""
def setUp(self):
@@ -82,6 +90,21 @@ class ComputeTestCase(test.TestCase):
inst.update(params)
return db.instance_create(self.context, inst)['id']
def _create_instance_type(self, params={}):
"""Create a test instance"""
context = self.context.elevated()
inst = {}
inst['name'] = 'm1.small'
inst['memory_mb'] = '1024'
inst['vcpus'] = '1'
inst['local_gb'] = '20'
inst['flavorid'] = '1'
inst['swap'] = '2048'
inst['rxtx_quota'] = 100
inst['rxtx_cap'] = 200
inst.update(params)
return db.instance_type_create(context, inst)['id']
def _create_group(self):
values = {'name': 'testgroup',
'description': 'testgroup',
@@ -299,15 +322,53 @@ class ComputeTestCase(test.TestCase):
"""Ensure instance can be migrated/resized"""
instance_id = self._create_instance()
context = self.context.elevated()
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id, {'host': 'foo'})
self.compute.prep_resize(context, instance_id)
self.compute.prep_resize(context, instance_id, 1)
migration_ref = db.migration_get_by_instance_and_status(context,
instance_id, 'pre-migrating')
self.compute.resize_instance(context, instance_id,
migration_ref['id'])
self.compute.terminate_instance(context, instance_id)
def test_resize_invalid_flavor_fails(self):
"""Ensure invalid flavors raise"""
instance_id = self._create_instance()
context = self.context.elevated()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.NotFound, self.compute_api.resize,
context, instance_id, 200)
self.compute.terminate_instance(context, instance_id)
def test_resize_down_fails(self):
"""Ensure resizing down raises and fails"""
context = self.context.elevated()
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id,
{'instance_type': 'm1.xlarge'})
self.assertRaises(exception.ApiError, self.compute_api.resize,
context, instance_id, 1)
self.compute.terminate_instance(context, instance_id)
def test_resize_same_size_fails(self):
"""Ensure invalid flavors raise"""
context = self.context.elevated()
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.ApiError, self.compute_api.resize,
context, instance_id, 1)
self.compute.terminate_instance(context, instance_id)
def test_get_by_flavor_id(self):
type = instance_types.get_by_flavor_id(1)
self.assertEqual(type, 'm1.tiny')
@@ -318,10 +379,8 @@ class ComputeTestCase(test.TestCase):
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error, self.compute.prep_resize,
self.context, instance_id)
self.context, instance_id, 1)
self.compute.terminate_instance(self.context, instance_id)
type = instance_types.get_by_flavor_id("1")
self.assertEqual(type, 'm1.tiny')
def _setup_other_managers(self):
self.volume_manager = utils.import_object(FLAGS.volume_manager)
@@ -342,7 +401,7 @@ class ComputeTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.NotFound,
self.compute.pre_live_migration,
c, instance_ref['id'])
c, instance_ref['id'], time=FakeTime())
def test_pre_live_migration_instance_has_volume(self):
"""Confirm setup_compute_volume is called when volume is mounted."""
@@ -395,7 +454,7 @@ class ComputeTestCase(test.TestCase):
self.compute.driver = drivermock
self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, i_ref['id'])
ret = self.compute.pre_live_migration(c, i_ref['id'], time=FakeTime())
self.assertEqual(ret, None)
def test_pre_live_migration_setup_compute_node_fail(self):
@@ -428,7 +487,7 @@ class ComputeTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
self.compute.pre_live_migration,
c, i_ref['id'])
c, i_ref['id'], time=FakeTime())
def test_live_migration_works_correctly_with_volume(self):
"""Confirm check_for_export to confirm volume health check."""
@@ -575,3 +634,24 @@ class ComputeTestCase(test.TestCase):
db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)
def test_run_kill_vm(self):
"""Detect when a vm is terminated behind the scenes"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
instances = db.instance_get_all(context.get_admin_context())
LOG.info(_("Running instances: %s"), instances)
self.assertEqual(len(instances), 1)
instance_name = instances[0].name
self.compute.driver.test_remove_vm(instance_name)
# Force the compute manager to do its periodic poll
error_list = self.compute.periodic_tasks(context.get_admin_context())
self.assertFalse(error_list)
instances = db.instance_get_all(context.get_admin_context())
LOG.info(_("After force-killing instances: %s"), instances)
self.assertEqual(len(instances), 0)

View File

@@ -21,9 +21,10 @@ import sys
import unittest
import nova
from nova import test
class LocalizationTestCase(unittest.TestCase):
class LocalizationTestCase(test.TestCase):
def test_multiple_positional_format_placeholders(self):
pat = re.compile("\W_\(")
single_pat = re.compile("\W%\W")

View File

@@ -18,8 +18,12 @@ import errno
import os
import select
from eventlet import greenpool
from eventlet import greenthread
from nova import test
from nova.utils import parse_mailmap, str_dict_replace, synchronized
from nova import utils
from nova.utils import parse_mailmap, str_dict_replace
class ProjectTestCase(test.TestCase):
@@ -63,7 +67,7 @@ class ProjectTestCase(test.TestCase):
class LockTestCase(test.TestCase):
def test_synchronized_wrapped_function_metadata(self):
@synchronized('whatever')
@utils.synchronized('whatever')
def foo():
"""Bar"""
pass
@@ -72,11 +76,42 @@ class LockTestCase(test.TestCase):
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_synchronized(self):
def test_synchronized_internally(self):
"""We can lock across multiple green threads"""
saved_sem_num = len(utils._semaphores)
seen_threads = list()
@utils.synchronized('testlock2', external=False)
def f(id):
for x in range(10):
seen_threads.append(id)
greenthread.sleep(0)
threads = []
pool = greenpool.GreenPool(10)
for i in range(10):
threads.append(pool.spawn(f, i))
for thread in threads:
thread.wait()
self.assertEquals(len(seen_threads), 100)
# Looking at the seen threads, split it into chunks of 10, and verify
# that the last 9 match the first in each chunk.
for i in range(10):
for j in range(9):
self.assertEquals(seen_threads[i * 10],
seen_threads[i * 10 + 1 + j])
self.assertEqual(saved_sem_num, len(utils._semaphores),
"Semaphore leak detected")
def test_synchronized_externally(self):
"""We can lock across multiple processes"""
rpipe1, wpipe1 = os.pipe()
rpipe2, wpipe2 = os.pipe()
@synchronized('testlock')
@utils.synchronized('testlock1', external=True)
def f(rpipe, wpipe):
try:
os.write(wpipe, "foo")

View File

@@ -36,7 +36,7 @@ class RpcTestCase(test.TestCase):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
topic='test',
proxy=self.receiver)
self.consumer.attach_to_eventlet()
@@ -97,7 +97,7 @@ class RpcTestCase(test.TestCase):
nested = Nested()
conn = rpc.Connection.instance(True)
consumer = rpc.AdapterConsumer(connection=conn,
consumer = rpc.TopicAdapterConsumer(connection=conn,
topic='nested',
proxy=nested)
consumer.attach_to_eventlet()

View File

@@ -109,20 +109,29 @@ class ServiceTestCase(test.TestCase):
app = service.Service.create(host=host, binary=binary)
self.mox.StubOutWithMock(rpc,
'AdapterConsumer',
'TopicAdapterConsumer',
use_mock_anything=True)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
self.mox.StubOutWithMock(rpc,
'FanoutAdapterConsumer',
use_mock_anything=True)
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
rpc.TopicAdapterConsumer)
rpc.AdapterConsumer(connection=mox.IgnoreArg(),
rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
topic='%s.%s' % (topic, host),
proxy=mox.IsA(service.Service)).AndReturn(
rpc.AdapterConsumer)
rpc.TopicAdapterConsumer)
rpc.AdapterConsumer.attach_to_eventlet()
rpc.AdapterConsumer.attach_to_eventlet()
rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
topic=topic,
proxy=mox.IsA(service.Service)).AndReturn(
rpc.FanoutAdapterConsumer)
rpc.TopicAdapterConsumer.attach_to_eventlet()
rpc.TopicAdapterConsumer.attach_to_eventlet()
rpc.FanoutAdapterConsumer.attach_to_eventlet()
service_create = {'host': host,
'binary': binary,
@@ -279,6 +288,7 @@ class ServiceTestCase(test.TestCase):
self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
service.rpc.Connection.instance(new=mox.IgnoreArg())
self.mox.StubOutWithMock(serv.manager.driver,
'update_available_resource')
serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)

View File

@@ -34,7 +34,7 @@ class IsolationTestCase(test.TestCase):
def test_rpc_consumer_isolation(self):
connection = rpc.Connection.instance(new=True)
consumer = rpc.TopicConsumer(connection, topic='compute')
consumer = rpc.TopicAdapterConsumer(connection, topic='compute')
consumer.register_callback(
lambda x, y: self.fail('I should never be called'))
consumer.attach_to_eventlet()

View File

@@ -77,13 +77,11 @@ class CacheConcurrencyTestCase(test.TestCase):
eventlet.sleep(0)
try:
self.assertFalse(done2.ready())
self.assertTrue('fname' in conn._image_sems)
finally:
wait1.send()
done1.wait()
eventlet.sleep(0)
self.assertTrue(done2.ready())
self.assertFalse('fname' in conn._image_sems)
def test_different_fname_concurrency(self):
"""Ensures that two different fname caches are concurrent"""
@@ -429,6 +427,15 @@ class LibvirtConnTestCase(test.TestCase):
def fake_raise(self):
raise libvirt.libvirtError('ERR')
class FakeTime(object):
def __init__(self):
self.counter = 0
def sleep(self, t):
self.counter += t
fake_timer = FakeTime()
self.create_fake_libvirt_mock(nwfilterLookupByName=fake_raise)
instance_ref = db.instance_create(self.context, self.test_instance)
@@ -438,11 +445,15 @@ class LibvirtConnTestCase(test.TestCase):
conn = libvirt_conn.LibvirtConnection(False)
conn.firewall_driver.setattr('setup_basic_filtering', fake_none)
conn.firewall_driver.setattr('prepare_instance_filter', fake_none)
conn.ensure_filtering_rules_for_instance(instance_ref)
conn.ensure_filtering_rules_for_instance(instance_ref,
time=fake_timer)
except exception.Error, e:
c1 = (0 <= e.message.find('Timeout migrating for'))
self.assertTrue(c1)
self.assertEqual(29, fake_timer.counter, "Didn't wait the expected "
"amount of time")
db.instance_destroy(self.context, instance_ref['id'])
def test_live_migration_raises_exception(self):

View File

@@ -76,6 +76,40 @@ class ZoneManagerTestCase(test.TestCase):
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
def test_service_capabilities(self):
zm = zone_manager.ZoneManager()
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, {})
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc10_a=(99, 99), svc10_b=(99, 99)))
zm.update_service_capabilities("svc1", "host3", dict(c=5))
caps = zm.get_zone_capabilities(self, None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5), svc10_a=(99, 99),
svc10_b=(99, 99)))
caps = zm.get_zone_capabilities(self, 'svc1')
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5)))
caps = zm.get_zone_capabilities(self, 'svc10')
self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()

View File

@@ -228,6 +228,9 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def VDI_get_by_uuid(*args):
return 'hurr'
def VDI_resize_online(*args):
pass
def VM_start(self, _1, ref, _2, _3):
vm = fake.get_record('VM', ref)
if vm['power_state'] != 'Halted':
@@ -240,7 +243,7 @@ class FakeSessionForMigrationTests(fake.SessionBase):
def stub_out_migration_methods(stubs):
def fake_get_snapshot(self, instance):
return 'foo', 'bar'
return 'vm_ref', dict(image='foo', snap='bar')
@classmethod
def fake_get_vdi(cls, session, vm_ref):
@@ -249,7 +252,7 @@ def stub_out_migration_methods(stubs):
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], }
def fake_shutdown(self, inst, vm, method='clean'):
def fake_shutdown(self, inst, vm, hard=True):
pass
@classmethod

View File

@@ -41,6 +41,7 @@ from xml.sax import saxutils
from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
from eventlet.green import subprocess
None
from nova import exception
@@ -334,6 +335,14 @@ def utcnow():
utcnow.override_time = None
def is_older_than(before, seconds):
"""Return True if before is older than 'seconds'"""
if utcnow() - before > datetime.timedelta(seconds=seconds):
return True
else:
return False
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return time.mktime(utcnow().timetuple())
@@ -531,17 +540,76 @@ def loads(s):
return json.loads(s)
def synchronized(name):
_semaphores = {}
class _NoopContextManager(object):
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def synchronized(name, external=False):
"""Synchronization decorator
Decorating a method like so:
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the bar method at a time.
Different methods can share the same lock:
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
LOG.debug(_("Attempting to grab %(lock)s for method "
"%(method)s..." % {"lock": name,
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn\
# amically-allocating-and-destroying-mutexes
if name not in _semaphores:
_semaphores[name] = semaphore.Semaphore()
sem = _semaphores[name]
LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
'"%(method)s"...' % {"lock": name,
"method": f.__name__}))
lock = lockfile.FileLock(os.path.join(FLAGS.lock_path,
'nova-%s.lock' % name))
with lock:
return f(*args, **kwargs)
with sem:
if external:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...' %
{"lock": name, "method": f.__name__}))
lock_file_path = os.path.join(FLAGS.lock_path,
'nova-%s.lock' % name)
lock = lockfile.FileLock(lock_file_path)
else:
lock = _NoopContextManager()
with lock:
retval = f(*args, **kwargs)
# If no-one else is waiting for it, delete it.
# See note about possible raciness above.
if not sem.balance < 1:
del _semaphores[name]
return retval
return inner
return wrap
@@ -635,3 +703,12 @@ def subset_dict(dict_, keys):
"""Return a dict that only contains a subset of keys"""
subset = partition_dict(dict_, keys)[0]
return subset
def check_isinstance(obj, cls):
    """Checks that obj is of type cls, and lets PyLint infer types"""
    if not isinstance(obj, cls):
        raise Exception(_("Expected object of type: %s") % (str(cls)))
        # TODO(justinsb): Can we make this better??
        return cls()  # Ugly PyLint hack
    return obj

View File

@@ -23,6 +23,8 @@ import sys
from nova import flags
from nova import log as logging
from nova import utils
from nova.virt import driver
from nova.virt import fake
from nova.virt import libvirt_conn
from nova.virt import xenapi_conn
@@ -72,4 +74,4 @@ def get_connection(read_only=False):
if conn is None:
LOG.error(_('Failed to open connection to the hypervisor'))
sys.exit(1)
return conn
return utils.check_isinstance(conn, driver.ComputeDriver)

234
nova/virt/driver.py Normal file
View File

@@ -0,0 +1,234 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Driver base-classes:
(Beginning of) the contract that compute drivers must follow, and shared
types that support that contract
"""
from nova.compute import power_state
class InstanceInfo(object):
    """Name/state pair describing one VM, as reported by a compute driver."""

    def __init__(self, name, state):
        # Instance name as the hypervisor knows it (not the Nova ID).
        self.name = name

        # state must be one of the power_state codes; fail fast on bad input.
        assert state in power_state.valid_states(), "Bad state: %s" % state
        self.state = state
class ComputeDriver(object):
    """Base class for compute drivers.

    Every method here raises NotImplementedError; concrete drivers
    (libvirt, XenAPI, Hyper-V, fake) override the subset they support.
    Lots of documentation is currently on fake.py.
    """

    def init_host(self, host):
        """Adopt existing VM's running here"""
        raise NotImplementedError()

    def get_info(self, instance_name):
        """Get the current status of an instance, by name (not ID!)

        Returns a dict containing:

        :state:    the running state, one of the power_state codes
        :max_mem:  (int) the maximum memory in KBytes allowed
        :mem:      (int) the memory in KBytes used by the domain
        :num_cpu:  (int) the number of virtual CPUs for the domain
        :cpu_time: (int) the CPU time used in nanoseconds
        """
        raise NotImplementedError()

    def list_instances(self):
        """Return the names of all instances known to this driver."""
        raise NotImplementedError()

    def list_instances_detail(self):
        """Return a list of InstanceInfo for all registered VMs"""
        raise NotImplementedError()

    def spawn(self, instance):
        """Launch a VM for the specified instance"""
        raise NotImplementedError()

    def destroy(self, instance, cleanup=True):
        """Shutdown specified VM"""
        raise NotImplementedError()

    def reboot(self, instance):
        """Reboot specified VM"""
        raise NotImplementedError()

    def snapshot_instance(self, context, instance_id, image_id):
        """Snapshot the given instance into image_id (driver-specific)."""
        raise NotImplementedError()

    def get_console_pool_info(self, console_type):
        """???

        Returns a dict containing:

        :address: ???
        :username: ???
        :password: ???
        """
        raise NotImplementedError()

    def get_console_output(self, instance):
        """Return console output for the given instance."""
        raise NotImplementedError()

    def get_ajax_console(self, instance):
        """Return AJAX console connection details for the instance."""
        raise NotImplementedError()

    def get_diagnostics(self, instance):
        """Return data about VM diagnostics"""
        raise NotImplementedError()

    def get_host_ip_addr(self):
        """Return the IP address of the host running this driver."""
        raise NotImplementedError()

    def attach_volume(self, context, instance_id, volume_id, mountpoint):
        """Attach the given volume to the instance at mountpoint."""
        raise NotImplementedError()

    def detach_volume(self, context, instance_id, volume_id):
        """Detach the given volume from the instance."""
        raise NotImplementedError()

    def compare_cpu(self, context, cpu_info):
        """Compare the given CPU description against this host's CPUs."""
        raise NotImplementedError()

    def migrate_disk_and_power_off(self, instance, dest):
        """Transfers the VHD of a running instance to another host, then shuts
        off the instance copies over the COW disk"""
        raise NotImplementedError()

    def snapshot(self, instance, image_id):
        """ Create snapshot from a running VM instance """
        raise NotImplementedError()

    def finish_resize(self, instance, disk_info):
        """Completes a resize, turning on the migrated instance"""
        raise NotImplementedError()

    def revert_resize(self, instance):
        """Reverts a resize, powering back on the instance"""
        raise NotImplementedError()

    def pause(self, instance, callback):
        """Pause VM instance"""
        raise NotImplementedError()

    def unpause(self, instance, callback):
        """Unpause paused VM instance"""
        raise NotImplementedError()

    def suspend(self, instance, callback):
        """suspend the specified instance"""
        raise NotImplementedError()

    def resume(self, instance, callback):
        """resume the specified instance"""
        raise NotImplementedError()

    def rescue(self, instance, callback):
        """Rescue the specified instance"""
        raise NotImplementedError()

    def unrescue(self, instance, callback):
        """Unrescue the specified instance"""
        raise NotImplementedError()

    def update_available_resource(self, ctxt, host):
        """Updates compute manager resource info on ComputeNode table.

        This method is called when nova-compute launches, and
        whenever admin executes "nova-manage service update_resource".

        :param ctxt: security context
        :param host: hostname that compute manager is currently running
        """
        raise NotImplementedError()

    def live_migration(self, ctxt, instance_ref, dest,
                       post_method, recover_method):
        """Spawning live_migration operation for distributing high-load.

        :params ctxt: security context
        :params instance_ref:
            nova.db.sqlalchemy.models.Instance object
            instance object that is migrated.
        :params dest: destination host
        :params post_method:
            post operation method.
            expected nova.compute.manager.post_live_migration.
        :params recover_method:
            recovery method when any exception occurs.
            expected nova.compute.manager.recover_live_migration.
        """
        raise NotImplementedError()

    def refresh_security_group_rules(self, security_group_id):
        """Re-apply firewall rules after a security group's rules changed."""
        raise NotImplementedError()

    def refresh_security_group_members(self, security_group_id):
        """Re-apply firewall rules after a security group's members changed."""
        raise NotImplementedError()

    def reset_network(self, instance):
        """reset networking for specified instance"""
        raise NotImplementedError()

    def ensure_filtering_rules_for_instance(self, instance_ref):
        """Setting up filtering rules and waiting for its completion.

        To migrate an instance, filtering rules to hypervisors
        and firewalls are inevitable on destination host.
        ( Waiting only for filtering rules to hypervisor,
        since filtering rules to firewall rules can be set faster).

        Concretely, the below method must be called.
        - setup_basic_filtering (for nova-basic, etc.)
        - prepare_instance_filter(for nova-instance-instance-xxx, etc.)

        to_xml may have to be called since it defines PROJNET, PROJMASK.
        but libvirt migrates those value through migrateToURI(),
        so , no need to be called.

        Don't use thread for this method since migration should
        not be started when setting-up filtering rules operations
        are not completed.

        :params instance_ref: nova.db.sqlalchemy.models.Instance object
        """
        raise NotImplementedError()

    def unfilter_instance(self, instance):
        """Stop filtering instance"""
        raise NotImplementedError()

    def set_admin_password(self, context, instance_id, new_pass=None):
        """Set the root/admin password for an instance on this server."""
        raise NotImplementedError()

    def inject_file(self, instance, b64_path, b64_contents):
        """Create a file on the VM instance. The file path and contents
        should be base64-encoded.
        """
        raise NotImplementedError()

    def inject_network_info(self, instance):
        """inject network info for specified instance"""
        raise NotImplementedError()

View File

@@ -26,7 +26,9 @@ semantics of real hypervisor connections.
"""
from nova import exception
from nova import utils
from nova.compute import power_state
from nova.virt import driver
def get_connection(_):
@@ -34,7 +36,14 @@ def get_connection(_):
return FakeConnection.instance()
class FakeConnection(object):
class FakeInstance(object):
    """Minimal record of a fake VM: just a name and a power state."""

    def __init__(self, name, state):
        # Plain attribute bag; FakeConnection reads .name and .state.
        self.name, self.state = name, state
class FakeConnection(driver.ComputeDriver):
"""
The interface to this class talks in terms of 'instances' (Amazon EC2 and
internal Nova terminology), by which we mean 'running virtual machine'
@@ -90,6 +99,17 @@ class FakeConnection(object):
"""
return self.instances.keys()
def _map_to_instance_info(self, instance):
    """Convert one FakeInstance into the driver-common InstanceInfo type."""
    # check_isinstance validates the type and also lets PyLint infer it.
    instance = utils.check_isinstance(instance, FakeInstance)
    info = driver.InstanceInfo(instance.name, instance.state)
    return info
def list_instances_detail(self):
    """Return an InstanceInfo for every fake VM currently registered."""
    return [self._map_to_instance_info(fake_vm)
            for fake_vm in self.instances.values()]
def spawn(self, instance):
"""
Create a new instance/VM/domain on the virtualization platform.
@@ -109,9 +129,10 @@ class FakeConnection(object):
that it was before this call began.
"""
fake_instance = FakeInstance()
self.instances[instance.name] = fake_instance
fake_instance._state = power_state.RUNNING
name = instance.name
state = power_state.RUNNING
fake_instance = FakeInstance(name, state)
self.instances[name] = fake_instance
def snapshot(self, instance, name):
"""
@@ -270,7 +291,7 @@ class FakeConnection(object):
raise exception.NotFound(_("Instance %s Not Found")
% instance_name)
i = self.instances[instance_name]
return {'state': i._state,
return {'state': i.state,
'max_mem': 0,
'mem': 0,
'num_cpu': 2,
@@ -428,8 +449,6 @@ class FakeConnection(object):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
class FakeInstance(object):
def __init__(self):
self._state = power_state.NOSTATE
def test_remove_vm(self, instance_name):
""" Removes the named VM, as if it crashed. For testing"""
self.instances.pop(instance_name)

View File

@@ -68,6 +68,7 @@ from nova import flags
from nova import log as logging
from nova.auth import manager
from nova.compute import power_state
from nova.virt import driver
from nova.virt import images
wmi = None
@@ -108,8 +109,9 @@ def get_connection(_):
return HyperVConnection()
class HyperVConnection(object):
class HyperVConnection(driver.ComputeDriver):
def __init__(self):
super(HyperVConnection, self).__init__()
self._conn = wmi.WMI(moniker='//./root/virtualization')
self._cim_conn = wmi.WMI(moniker='//./root/cimv2')
@@ -124,6 +126,19 @@ class HyperVConnection(object):
for v in self._conn.Msvm_ComputerSystem(['ElementName'])]
return vms
def list_instances_detail(self):
    """Return an InstanceInfo for every VM on this Hyper-V host."""
    # TODO(justinsb): This is a terrible implementation (1+N)
    return [driver.InstanceInfo(name, self.get_info(name)['state'])
            for name in self.list_instances()]
def spawn(self, instance):
""" Create a new VM and start it."""
vm = self._lookup(instance.name)
@@ -345,7 +360,7 @@ class HyperVConnection(object):
newinst = cl.new()
#Copy the properties from the original.
for prop in wmi_obj._properties:
newinst.Properties_.Item(prop).Value =\
newinst.Properties_.Item(prop).Value = \
wmi_obj.Properties_.Item(prop).Value
return newinst
@@ -467,3 +482,6 @@ class HyperVConnection(object):
if vm is None:
raise exception.NotFound('Cannot detach volume from missing %s '
% instance_name)
def poll_rescued_instances(self, timeout):
    """Deliberate no-op: rescue timeouts are not tracked on Hyper-V."""
    pass

View File

@@ -42,13 +42,12 @@ import shutil
import sys
import random
import subprocess
import time
import uuid
from xml.dom import minidom
from eventlet import greenthread
from eventlet import tpool
from eventlet import semaphore
import IPy
from nova import context
@@ -62,6 +61,7 @@ from nova.auth import manager
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import disk
from nova.virt import driver
from nova.virt import images
libvirt = None
@@ -133,8 +133,8 @@ def get_connection(read_only):
def _late_load_cheetah():
global Template
if Template is None:
t = __import__('Cheetah.Template', globals(), locals(), ['Template'],
-1)
t = __import__('Cheetah.Template', globals(), locals(),
['Template'], -1)
Template = t.Template
@@ -153,9 +153,10 @@ def _get_ip_version(cidr):
return int(net.version())
class LibvirtConnection(object):
class LibvirtConnection(driver.ComputeDriver):
def __init__(self, read_only):
super(LibvirtConnection, self).__init__()
self.libvirt_uri = self.get_uri()
self.libvirt_xml = open(FLAGS.libvirt_xml_template).read()
@@ -235,6 +236,29 @@ class LibvirtConnection(object):
return [self._conn.lookupByID(x).name()
for x in self._conn.listDomainsID()]
def _map_to_instance_info(self, domain):
    """Gets info from a virsh domain object into an InstanceInfo"""
    # domain.info() returns (state, maxMemory, memory, nbVirtCPU, cpuTime);
    # only the power state (plus the domain name) is propagated.
    (state, _max_mem, _mem, _num_cpu, _cpu_time) = domain.info()
    return driver.InstanceInfo(domain.name(), state)
def list_instances_detail(self):
    """Return an InstanceInfo for every libvirt domain on this host."""
    return [self._map_to_instance_info(self._conn.lookupByID(dom_id))
            for dom_id in self._conn.listDomainsID()]
def destroy(self, instance, cleanup=True):
try:
virt_dom = self._conn.lookupByName(instance['name'])
@@ -416,6 +440,10 @@ class LibvirtConnection(object):
# the normal xml file, we can just call reboot here
self.reboot(instance)
@exception.wrap_exception
def poll_rescued_instances(self, timeout):
    """Deliberate no-op: rescue timeouts are not enforced by libvirt."""
    pass
@exception.wrap_exception
def spawn(self, instance):
xml = self.to_xml(instance)
@@ -556,13 +584,12 @@ class LibvirtConnection(object):
os.mkdir(base_dir)
base = os.path.join(base_dir, fname)
if fname not in LibvirtConnection._image_sems:
LibvirtConnection._image_sems[fname] = semaphore.Semaphore()
with LibvirtConnection._image_sems[fname]:
@utils.synchronized(fname)
def call_if_not_exists(base, fn, *args, **kwargs):
if not os.path.exists(base):
fn(target=base, *args, **kwargs)
if not LibvirtConnection._image_sems[fname].locked():
del LibvirtConnection._image_sems[fname]
call_if_not_exists(base, fn, *args, **kwargs)
if cow:
utils.execute('qemu-img', 'create', '-f', 'qcow2', '-o',
@@ -1144,7 +1171,8 @@ class LibvirtConnection(object):
return
def ensure_filtering_rules_for_instance(self, instance_ref):
def ensure_filtering_rules_for_instance(self, instance_ref,
time=None):
"""Setting up filtering rules and waiting for its completion.
To migrate an instance, filtering rules to hypervisors
@@ -1168,6 +1196,9 @@ class LibvirtConnection(object):
"""
if not time:
time = greenthread
# If any instances never launch at destination host,
# basic-filtering must be set here.
self.firewall_driver.setup_basic_filtering(instance_ref)
@@ -1780,15 +1811,15 @@ class IptablesFirewallDriver(FirewallDriver):
pass
def refresh_security_group_rules(self, security_group):
# We use the semaphore to make sure noone applies the rule set
# after we've yanked the existing rules but before we've put in
# the new ones.
with self.iptables.semaphore:
for instance in self.instances.values():
self.remove_filters_for_instance(instance)
self.add_filters_for_instance(instance)
self.do_refresh_security_group_rules(security_group)
self.iptables.apply()
@utils.synchronized('iptables', external=True)
def do_refresh_security_group_rules(self, security_group):
for instance in self.instances.values():
self.remove_filters_for_instance(instance)
self.add_filters_for_instance(instance)
def _security_group_chain_name(self, security_group_id):
return 'nova-sg-%s' % (security_group_id,)

View File

@@ -36,6 +36,7 @@ from nova import utils
from nova.auth.manager import AuthManager
from nova.compute import power_state
from nova.virt import driver
from nova.virt.xenapi.network_utils import NetworkHelper
from nova.virt.xenapi.vm_utils import VMHelper
from nova.virt.xenapi.vm_utils import ImageType
@@ -51,11 +52,14 @@ class VMOps(object):
def __init__(self, session):
self.XenAPI = session.get_imported_xenapi()
self._session = session
self.poll_rescue_last_ran = None
VMHelper.XenAPI = self.XenAPI
def list_instances(self):
"""List VM instances"""
# TODO(justinsb): Should we just always use the details method?
# Seems to be the same number of API calls..
vm_refs = []
for vm_ref in self._session.get_xenapi().VM.get_all():
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
@@ -63,6 +67,33 @@ class VMOps(object):
vm_refs.append(vm_rec["name_label"])
return vm_refs
def list_instances_detail(self):
    """List VM instances, returning InstanceInfo objects"""
    instance_infos = []
    for vm_ref in self._session.get_xenapi().VM.get_all():
        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
        # Skip templates and dom0; only real guest VMs are reported.
        if vm_rec["is_a_template"] or vm_rec["is_control_domain"]:
            continue
        # TODO(justinsb): This a roundabout way to map the state
        openstack_format = VMHelper.compile_info(vm_rec)
        instance_infos.append(
            driver.InstanceInfo(vm_rec["name_label"],
                                openstack_format['state']))
    return instance_infos
def revert_resize(self, instance):
    """Undo a resize by powering the instance's original VM back on."""
    vm_ref = VMHelper.lookup(self._session, instance.name)
    self._start(instance, vm_ref)
def finish_resize(self, instance, disk_info):
    """Complete a resize on the destination host.

    Links the migrated base-copy and COW VHDs, creates the VM, grows the
    VDI to the new instance size, then boots the instance.
    """
    vdi_uuid = self.link_disks(instance, disk_info['base_copy'],
                               disk_info['cow'])
    vm_ref = self._create_vm(instance, vdi_uuid)
    self.resize_instance(instance, vdi_uuid)
    self._spawn(instance, vm_ref)
def _start(self, instance, vm_ref=None):
"""Power on a VM instance"""
if not vm_ref:
@@ -73,7 +104,7 @@ class VMOps(object):
LOG.debug(_("Starting instance %s"), instance.name)
self._session.call_xenapi('VM.start', vm_ref, False, False)
def create_disk(self, instance):
def _create_disk(self, instance):
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
disk_image_type = VMHelper.determine_disk_image_type(instance)
@@ -82,10 +113,11 @@ class VMOps(object):
return vdi_uuid
def spawn(self, instance, network_info=None):
vdi_uuid = self.create_disk(instance)
self._spawn_with_disk(instance, vdi_uuid, network_info)
vdi_uuid = self._create_disk(instance)
vm_ref = self._create_vm(instance, vdi_uuid, network_info)
self._spawn(instance, vm_ref)
def _spawn_with_disk(self, instance, vdi_uuid, network_info=None):
def _create_vm(self, instance, vdi_uuid, network_info=None):
"""Create VM instance"""
instance_name = instance.name
vm_ref = VMHelper.lookup(self._session, instance_name)
@@ -128,16 +160,19 @@ class VMOps(object):
VMHelper.create_vbd(session=self._session, vm_ref=vm_ref,
vdi_ref=vdi_ref, userdevice=0, bootable=True)
# inject_network_info and create vifs
# TODO(tr3buchet) - check to make sure we have network info, otherwise
# create it now. This goes away once nova-multi-nic hits.
if network_info is None:
network_info = self._get_network_info(instance)
self.create_vifs(vm_ref, network_info)
self.inject_network_info(instance, vm_ref, network_info)
return vm_ref
def _spawn(self, instance, vm_ref):
"""Spawn a new instance"""
LOG.debug(_('Starting VM %s...'), vm_ref)
self._start(instance, vm_ref)
instance_name = instance.name
LOG.info(_('Spawning VM %(instance_name)s created %(vm_ref)s.')
% locals())
@@ -310,7 +345,7 @@ class VMOps(object):
try:
# transfer the base copy
template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
base_copy_uuid = template_vdi_uuids[1]
base_copy_uuid = template_vdi_uuids['image']
vdi_ref, vm_vdi_rec = \
VMHelper.get_vdi_for_vm_safely(self._session, vm_ref)
cow_uuid = vm_vdi_rec['uuid']
@@ -325,7 +360,7 @@ class VMOps(object):
self._session.wait_for_task(task, instance.id)
# Now power down the instance and transfer the COW VHD
self._shutdown(instance, vm_ref, method='clean')
self._shutdown(instance, vm_ref, hard=False)
params = {'host': dest,
'vdi_uuid': cow_uuid,
@@ -345,7 +380,7 @@ class VMOps(object):
# sensible so we don't need to blindly pass around dictionaries
return {'base_copy': base_copy_uuid, 'cow': cow_uuid}
def attach_disk(self, instance, base_copy_uuid, cow_uuid):
def link_disks(self, instance, base_copy_uuid, cow_uuid):
"""Links the base copy VHD to the COW via the XAPI plugin"""
vm_ref = VMHelper.lookup(self._session, instance.name)
new_base_copy_uuid = str(uuid.uuid4())
@@ -366,9 +401,19 @@ class VMOps(object):
return new_cow_uuid
def resize(self, instance, flavor):
def resize_instance(self, instance, vdi_uuid):
"""Resize a running instance by changing it's RAM and disk size """
raise NotImplementedError()
#TODO(mdietz): this will need to be adjusted for swap later
#The new disk size must be in bytes
new_disk_size = str(instance.local_gb * 1024 * 1024 * 1024)
instance_name = instance.name
instance_local_gb = instance.local_gb
LOG.debug(_("Resizing VDI %(vdi_uuid)s for instance %(instance_name)s."
" Expanding to %(instance_local_gb)d GB") % locals())
vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
self._session.call_xenapi('VDI.resize_online', vdi_ref, new_disk_size)
LOG.debug(_("Resize instance %s complete") % (instance.name))
def reboot(self, instance):
"""Reboot VM instance"""
@@ -443,8 +488,9 @@ class VMOps(object):
"""Shutdown an instance"""
state = self.get_info(instance['name'])['state']
if state == power_state.SHUTDOWN:
LOG.warn(_("VM %(vm)s already halted, skipping shutdown...") %
locals())
instance_name = instance.name
LOG.warn(_("VM %(instance_name)s already halted,"
"skipping shutdown...") % locals())
return
instance_id = instance.id
@@ -462,6 +508,10 @@ class VMOps(object):
except self.XenAPI.Failure, exc:
LOG.exception(exc)
def _shutdown_rescue(self, rescue_vm_ref):
    """Shutdown a rescue instance"""
    # Hard shutdown is issued asynchronously; completion is not awaited here.
    self._session.call_xenapi("Async.VM.hard_shutdown", rescue_vm_ref)
def _destroy_vdis(self, instance, vm_ref):
"""Destroys all VDIs associated with a VM"""
instance_id = instance.id
@@ -479,6 +529,24 @@ class VMOps(object):
except self.XenAPI.Failure, exc:
LOG.exception(exc)
def _destroy_rescue_vdis(self, rescue_vm_ref):
    """Destroys all VDIs associated with a rescued VM"""
    for vdi_ref in VMHelper.lookup_vm_vdis(self._session, rescue_vm_ref):
        try:
            self._session.call_xenapi("Async.VDI.destroy", vdi_ref)
        except self.XenAPI.Failure:
            # Best effort: skip any VDI that XenAPI refuses to destroy.
            continue
def _destroy_rescue_vbds(self, rescue_vm_ref):
    """Destroys all VBDs tied to a rescue VM"""
    for vbd_ref in self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref):
        vbd_rec = self._session.get_xenapi().VBD.get_record(vbd_ref)
        # primary VBD is always 1
        if vbd_rec["userdevice"] == "1":
            VMHelper.unplug_vbd(self._session, vbd_ref)
            VMHelper.destroy_vbd(self._session, vbd_ref)
def _destroy_kernel_ramdisk(self, instance, vm_ref):
"""
Three situations can occur:
@@ -529,6 +597,14 @@ class VMOps(object):
LOG.debug(_("Instance %(instance_id)s VM destroyed") % locals())
def _destroy_rescue_instance(self, rescue_vm_ref):
    """Destroy a rescue instance

    Tears down in sequence: unplug/destroy VBDs, shut the VM down,
    remove its VDIs, then destroy the VM record itself.
    """
    self._destroy_rescue_vbds(rescue_vm_ref)
    self._shutdown_rescue(rescue_vm_ref)
    self._destroy_rescue_vdis(rescue_vm_ref)
    self._session.call_xenapi("Async.VM.destroy", rescue_vm_ref)
def destroy(self, instance):
"""
Destroy VM instance
@@ -632,41 +708,57 @@ class VMOps(object):
"""
rescue_vm_ref = VMHelper.lookup(self._session,
instance.name + "-rescue")
instance.name + "-rescue")
if not rescue_vm_ref:
raise exception.NotFound(_(
"Instance is not in Rescue Mode: %s" % instance.name))
original_vm_ref = self._get_vm_opaque_ref(instance)
vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref)
instance._rescue = False
for vbd_ref in vbd_refs:
_vbd_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)
if _vbd_ref["userdevice"] == "1":
VMHelper.unplug_vbd(self._session, vbd_ref)
VMHelper.destroy_vbd(self._session, vbd_ref)
task1 = self._session.call_xenapi("Async.VM.hard_shutdown",
rescue_vm_ref)
self._session.wait_for_task(task1, instance.id)
vdi_refs = VMHelper.lookup_vm_vdis(self._session, rescue_vm_ref)
for vdi_ref in vdi_refs:
try:
task = self._session.call_xenapi('Async.VDI.destroy', vdi_ref)
self._session.wait_for_task(task, instance.id)
except self.XenAPI.Failure:
continue
task2 = self._session.call_xenapi('Async.VM.destroy', rescue_vm_ref)
self._session.wait_for_task(task2, instance.id)
self._destroy_rescue_instance(rescue_vm_ref)
self._release_bootlock(original_vm_ref)
self._start(instance, original_vm_ref)
def poll_rescued_instances(self, timeout):
    """Look for expirable rescued instances.

    Forcibly exits rescue mode for any instance that has been in rescue
    mode for >= the provided timeout (the first invocation only records
    a base time and returns, so expiry is measured between polls).

    :param timeout: rescue age threshold, as understood by
                    utils.is_older_than
    """
    last_ran = self.poll_rescue_last_ran
    if last_ran:
        if not utils.is_older_than(last_ran, timeout):
            # Do not run. Let's bail.
            return
        else:
            # Update the time tracker and proceed.
            self.poll_rescue_last_ran = utils.utcnow()
    else:
        # We need a base time to start tracking.
        self.poll_rescue_last_ran = utils.utcnow()
        return

    # Rescue VMs are identified purely by the "-rescue" name suffix.
    rescue_vms = []
    for instance in self.list_instances():
        if instance.endswith("-rescue"):
            rescue_vms.append(dict(name=instance,
                                   vm_ref=VMHelper.lookup(self._session,
                                                          instance)))

    for vm in rescue_vms:
        # NOTE: removed an unused local ("rescue_name") from the original.
        self._destroy_rescue_instance(vm["vm_ref"])

        # "inst-rescue" -> "inst": restart the original VM it shadowed.
        original_name = vm["name"].split("-rescue", 1)[0]
        original_vm_ref = VMHelper.lookup(self._session, original_name)

        self._release_bootlock(original_vm_ref)
        self._session.call_xenapi("VM.start", original_vm_ref, False,
                                  False)
def get_info(self, instance):
"""Return data about VM instance"""
vm_ref = self._get_vm_opaque_ref(instance)
@@ -723,8 +815,9 @@ class VMOps(object):
'mac': instance.mac_address,
'rxtx_cap': flavor['rxtx_cap'],
'dns': [network['dns']],
'ips': [ip_dict(ip) for ip in network_IPs],
'ip6s': [ip6_dict(ip) for ip in network_IPs]}
'ips': [ip_dict(ip) for ip in network_IPs]}
if network['cidr_v6']:
info['ip6s'] = [ip6_dict(ip) for ip in network_IPs]
network_info.append((network, info))
return network_info
@@ -915,7 +1008,7 @@ class VMOps(object):
"""
vm_ref = self._get_vm_opaque_ref(instance_or_vm)
data = self._session.call_xenapi_request('VM.get_xenstore_data',
(vm_ref, ))
(vm_ref,))
ret = {}
if keys is None:
keys = data.keys()

View File

@@ -69,6 +69,7 @@ from nova import db
from nova import utils
from nova import flags
from nova import log as logging
from nova.virt import driver
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
@@ -141,10 +142,11 @@ def get_connection(_):
return XenAPIConnection(url, username, password)
class XenAPIConnection(object):
class XenAPIConnection(driver.ComputeDriver):
"""A connection to XenServer or Xen Cloud Platform"""
def __init__(self, url, user, pw):
super(XenAPIConnection, self).__init__()
session = XenAPISession(url, user, pw)
self._vmops = VMOps(session)
self._volumeops = VolumeOps(session)
@@ -160,24 +162,25 @@ class XenAPIConnection(object):
"""List VM instances"""
return self._vmops.list_instances()
def list_instances_detail(self):
    """Return InstanceInfo objects for all VMs (delegates to VMOps)."""
    return self._vmops.list_instances_detail()
def spawn(self, instance):
    """Create VM instance (delegates to VMOps.spawn)."""
    self._vmops.spawn(instance)
def revert_resize(self, instance):
    """Reverts a resize, powering back on the instance (via VMOps)."""
    self._vmops.revert_resize(instance)
def finish_resize(self, instance, disk_info):
"""Completes a resize, turning on the migrated instance"""
vdi_uuid = self._vmops.attach_disk(instance, disk_info['base_copy'],
disk_info['cow'])
self._vmops._spawn_with_disk(instance, vdi_uuid)
self._vmops.finish_resize(instance, disk_info)
def snapshot(self, instance, image_id):
""" Create snapshot from a running VM instance """
self._vmops.snapshot(instance, image_id)
def resize(self, instance, flavor):
"""Resize a VM instance"""
raise NotImplementedError()
def reboot(self, instance):
"""Reboot VM instance"""
self._vmops.reboot(instance)
@@ -225,6 +228,10 @@ class XenAPIConnection(object):
"""Unrescue the specified instance"""
self._vmops.unrescue(instance, callback)
def poll_rescued_instances(self, timeout):
    """Poll for rescued instances (delegates to VMOps)."""
    self._vmops.poll_rescued_instances(timeout)
def reset_network(self, instance):
"""reset networking for specified instance"""
self._vmops.reset_network(instance)

View File

@@ -64,14 +64,15 @@ flags.DEFINE_boolean('use_local_volumes', True,
'if True, will not discover local volumes')
class VolumeManager(manager.Manager):
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
def __init__(self, volume_driver=None, *args, **kwargs):
"""Load the driver from the one specified in args, or from flags."""
if not volume_driver:
volume_driver = FLAGS.volume_driver
self.driver = utils.import_object(volume_driver)
super(VolumeManager, self).__init__(*args, **kwargs)
super(VolumeManager, self).__init__(service_name='volume',
*args, **kwargs)
# NOTE(vish): Implementation specific db handling is done
# by the driver.
self.driver.db = self.db

View File

@@ -22,6 +22,7 @@ XenAPI Plugin for transfering data between host nodes
import os
import os.path
import pickle
import shlex
import shutil
import subprocess
@@ -97,7 +98,7 @@ def transfer_vhd(session, args):
logging.debug("Preparing to transmit %s to %s" % (source_path,
dest_path))
ssh_cmd = 'ssh -o StrictHostKeyChecking=no'
ssh_cmd = '\"ssh -o StrictHostKeyChecking=no\"'
rsync_args = shlex.split('nohup /usr/bin/rsync -av --progress -e %s %s %s'
% (ssh_cmd, source_path, dest_path))

View File

@@ -32,7 +32,6 @@ SUITE_NAMES = '[image, instance, volume]'
FLAGS = flags.FLAGS
flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
flags.DEFINE_integer('ssh_tries', 3, 'Numer of times to try ssh')
boto_v6 = None
class SmokeTestCase(unittest.TestCase):
@@ -183,6 +182,9 @@ class SmokeTestCase(unittest.TestCase):
TEST_DATA = {}
if FLAGS.use_ipv6:
global boto_v6
boto_v6 = __import__('boto_v6')
class UserSmokeTestCase(SmokeTestCase):