trunk merge

Sandy Walsh 2011-02-25 13:42:37 -08:00
commit f3efbaa3d1
35 changed files with 1165 additions and 155 deletions

View File

@ -15,6 +15,7 @@
<corywright@gmail.com> <cory.wright@rackspace.com>
<devin.carlen@gmail.com> <devcamcar@illian.local>
<ewan.mellor@citrix.com> <emellor@silver>
<itoumsn@nttdata.co.jp> <itoumsn@shayol>
<jaypipes@gmail.com> <jpipes@serialcoder>
<jmckenty@gmail.com> <jmckenty@joshua-mckentys-macbook-pro.local>
<jmckenty@gmail.com> <jmckenty@yyj-dhcp171.corp.flock.com>

View File

@ -39,6 +39,7 @@ Ken Pepple <ken.pepple@gmail.com>
Kevin L. Mitchell <kevin.mitchell@rackspace.com>
Koji Iida <iida.koji@lab.ntt.co.jp>
Lorin Hochstein <lorin@isi.edu>
Masanori Itoh <itoumsn@nttdata.co.jp>
Matt Dietz <matt.dietz@rackspace.com>
Michael Gundlach <michael.gundlach@rackspace.com>
Monsyne Dragon <mdragon@rackspace.com>

View File

@ -36,49 +36,15 @@ gettext.install('nova', unicode=1)
from nova import flags
from nova import log as logging
from nova import service
from nova import utils
from nova import version
from nova import wsgi
LOG = logging.getLogger('nova.api')
FLAGS = flags.FLAGS
flags.DEFINE_string('ec2_listen', "0.0.0.0",
'IP address for EC2 API to listen')
flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
flags.DEFINE_string('osapi_listen', "0.0.0.0",
'IP address for OpenStack API to listen')
flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
API_ENDPOINTS = ['ec2', 'osapi']
def run_app(paste_config_file):
LOG.debug(_("Using paste.deploy config at: %s"), paste_config_file)
apps = []
for api in API_ENDPOINTS:
config = wsgi.load_paste_configuration(paste_config_file, api)
if config is None:
LOG.debug(_("No paste configuration for app: %s"), api)
continue
LOG.debug(_("App Config: %(api)s\n%(config)r") % locals())
LOG.info(_("Running %s API"), api)
app = wsgi.load_paste_app(paste_config_file, api)
apps.append((app, getattr(FLAGS, "%s_listen_port" % api),
getattr(FLAGS, "%s_listen" % api)))
if len(apps) == 0:
LOG.error(_("No known API applications configured in %s."),
paste_config_file)
return
server = wsgi.Server()
for app in apps:
server.start(*app)
server.wait()
if __name__ == '__main__':
utils.default_flagfile()
@ -90,8 +56,6 @@ if __name__ == '__main__':
for flag in FLAGS:
flag_get = FLAGS.get(flag, None)
LOG.debug("%(flag)s : %(flag_get)s" % locals())
conf = wsgi.paste_config_file('nova-api.conf')
if conf:
run_app(conf)
else:
LOG.error(_("No paste configuration found for: %s"), 'nova-api.conf')
service = service.serve_wsgi(service.ApiService)
service.wait()

View File

@ -198,6 +198,12 @@ class Requestify(wsgi.Middleware):
try:
# Raise KeyError if omitted
action = req.params['Action']
# Fix bug lp:720157 for older (version 1) clients
version = req.params['SignatureVersion']
if int(version) == 1:
non_args.remove('SignatureMethod')
if 'SignatureMethod' in args:
args.pop('SignatureMethod')
for non_arg in non_args:
# Remove, but raise KeyError if omitted
args.pop(non_arg)
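Sketch of what the fix changes for a version-1 request. The exact contents of non_args are an assumption; the point is that v1 clients may legitimately omit SignatureMethod, which previously tripped the KeyError below:

non_args = ['Action', 'Signature', 'AWSAccessKeyId', 'SignatureMethod',
            'SignatureVersion', 'Version', 'Timestamp']  # assumed list
params = {'Action': 'DescribeKeyPairs', 'SignatureVersion': '1',
          'Signature': 'abc', 'AWSAccessKeyId': 'ak',
          'Version': '2009-04-04', 'Timestamp': '2011-02-25T13:42:37Z'}
args = dict(params)
if int(params['SignatureVersion']) == 1:
    non_args.remove('SignatureMethod')   # stop requiring it...
    args.pop('SignatureMethod', None)    # ...but still strip it if present
for non_arg in non_args:
    args.pop(non_arg)  # would have raised KeyError before the fix
assert args == {}      # only real API arguments remain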

View File

@ -52,7 +52,23 @@ def _database_to_isoformat(datetimeobj):
def _try_convert(value):
"""Return a non-string if possible"""
"""Return a non-string from a string or unicode, if possible.
============= =====================================================
When value is returns
============= =====================================================
zero-length   ''
'None'        None
'True'        True
'False'       False
'0', '-0'     0
0xN, -0xN     int from hex (positive) (N is any number)
0bN, -0bN     int from binary (positive) (N is any number)
*             try conversion to int, float, complex, fallback value
============= =====================================================
"""
if len(value) == 0:
return ''
if value == 'None':
return None
if value == 'True':
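Illustrative checks of the conversions the table above promises, assuming the rest of the function implements the docstring:

assert _try_convert('') == ''
assert _try_convert('None') is None
assert _try_convert('True') is True
assert _try_convert('0x1f') == 31      # hex
assert _try_convert('0b101') == 5      # binary
assert _try_convert('3.14') == 3.14    # plain float conversion
assert _try_convert('hurr') == 'hurr'  # fallback: the original value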

View File

@ -298,7 +298,7 @@ class CloudController(object):
'keyFingerprint': key_pair['fingerprint'],
})
return {'keypairsSet': result}
return {'keySet': result}
def create_key_pair(self, context, key_name, **kwargs):
LOG.audit(_("Create key pair %s"), key_name, context=context)
@ -838,14 +838,14 @@ class CloudController(object):
self.compute_api.unrescue(context, instance_id=instance_id)
return True
def update_instance(self, context, ec2_id, **kwargs):
def update_instance(self, context, instance_id, **kwargs):
updatable_fields = ['display_name', 'display_description']
changes = {}
for field in updatable_fields:
if field in kwargs:
changes[field] = kwargs[field]
if changes:
instance_id = ec2_id_to_id(ec2_id)
instance_id = ec2_id_to_id(instance_id)
self.compute_api.update(context, instance_id=instance_id, **kwargs)
return True
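The rename matters because EC2 clients expect the element to be named keySet, as the test change further down confirms. A hedged sketch of the corrected response shape (fingerprints hypothetical):

{'keySet': [{'keyName': 'test1', 'keyFingerprint': '1f:2e:...'},
            {'keyName': 'test2', 'keyFingerprint': '3d:4c:...'}]}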

View File

@ -203,10 +203,58 @@ class Controller(wsgi.Controller):
return exc.HTTPNoContent()
def action(self, req, id):
""" Multi-purpose method used to reboot, rebuild, and
resize a server """
"""Multi-purpose method used to reboot, rebuild, or
resize a server"""
actions = {
'reboot': self._action_reboot,
'resize': self._action_resize,
'confirmResize': self._action_confirm_resize,
'revertResize': self._action_revert_resize,
'rebuild': self._action_rebuild,
}
input_dict = self._deserialize(req.body, req)
#TODO(sandy): rebuild/resize not supported.
for key in actions.keys():
if key in input_dict:
return actions[key](input_dict, req, id)
return faults.Fault(exc.HTTPNotImplemented())
def _action_confirm_resize(self, input_dict, req, id):
try:
self.compute_api.confirm_resize(req.environ['nova.context'], id)
except Exception, e:
LOG.exception(_("Error in confirm-resize %s"), e)
return faults.Fault(exc.HTTPBadRequest())
return exc.HTTPNoContent()
def _action_revert_resize(self, input_dict, req, id):
try:
self.compute_api.revert_resize(req.environ['nova.context'], id)
except Exception, e:
LOG.exception(_("Error in revert-resize %s"), e)
return faults.Fault(exc.HTTPBadRequest())
return exc.HTTPAccepted()
def _action_rebuild(self, input_dict, req, id):
return faults.Fault(exc.HTTPNotImplemented())
def _action_resize(self, input_dict, req, id):
""" Resizes a given instance to the flavor size requested """
try:
if 'resize' in input_dict and 'flavorId' in input_dict['resize']:
flavor_id = input_dict['resize']['flavorId']
self.compute_api.resize(req.environ['nova.context'], id,
flavor_id)
else:
LOG.exception(_("Missing arguments for resize"))
return faults.Fault(exc.HTTPUnprocessableEntity())
except Exception, e:
LOG.exception(_("Error in resize %s"), e)
return faults.Fault(exc.HTTPBadRequest())
return faults.Fault(exc.HTTPAccepted())
def _action_reboot(self, input_dict, req, id):
try:
reboot_type = input_dict['reboot']['type']
except Exception:
@ -402,7 +450,7 @@ class Controller(wsgi.Controller):
_("Cannot build from image %(image_id)s, status not active") %
locals())
if image['type'] != 'machine':
if image['disk_format'] != 'ami':
return None, None
try:
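The action handlers earlier in this file dispatch on the request body; for reference, the JSON bodies they expect, matching the server tests added later in this commit (the reboot type value is an assumption):

# POST /v1.0/servers/<id>/action with one of:
{'resize': {'flavorId': 3}}    # -> 202 Accepted
{'confirmResize': None}        # -> 204 No Content
{'revertResize': None}         # -> 202 Accepted
{'reboot': {'type': 'HARD'}}   # 'HARD' assumed; see _action_reboot
{'rebuild': {}}                # -> 501 Not Implemented for now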

View File

@ -319,12 +319,12 @@ class API(base.Base):
try:
instance = self.get(context, instance_id)
except exception.NotFound:
LOG.warning(_("Instance %d was not found during terminate"),
LOG.warning(_("Instance %s was not found during terminate"),
instance_id)
raise
if (instance['state_description'] == 'terminating'):
LOG.warning(_("Instance %d is already being terminated"),
LOG.warning(_("Instance %s is already being terminated"),
instance_id)
return
@ -404,6 +404,10 @@ class API(base.Base):
kwargs = {'method': method, 'args': params}
return rpc.call(context, queue, kwargs)
def _cast_scheduler_message(self, context, args):
"""Generic handler for RPC calls to the scheduler"""
rpc.cast(context, FLAGS.scheduler_topic, args)
def snapshot(self, context, instance_id, name):
"""Snapshot the given instance.
@ -420,6 +424,45 @@ class API(base.Base):
"""Reboot the given instance."""
self._cast_compute_message('reboot_instance', context, instance_id)
def revert_resize(self, context, instance_id):
"""Reverts a resize, deleting the 'new' instance in the process"""
context = context.elevated()
migration_ref = self.db.migration_get_by_instance_and_status(context,
instance_id, 'finished')
if not migration_ref:
raise exception.NotFound(_("No finished migrations found for "
"instance"))
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('revert_resize', context, instance_id,
migration_ref['dest_compute'], params=params)
def confirm_resize(self, context, instance_id):
"""Confirms a migration/resize, deleting the 'old' instance in the
process."""
context = context.elevated()
migration_ref = self.db.migration_get_by_instance_and_status(context,
instance_id, 'finished')
if not migration_ref:
raise exception.NotFound(_("No finished migrations found for "
"instance"))
instance_ref = self.db.instance_get(context, instance_id)
params = {'migration_id': migration_ref['id']}
self._cast_compute_message('confirm_resize', context, instance_id,
migration_ref['source_compute'], params=params)
self.db.migration_update(context, migration_ref['id'],
{'status': 'confirmed'})
self.db.instance_update(context, instance_id,
{'host': migration_ref['dest_compute'], })
def resize(self, context, instance_id, flavor):
"""Resize a running instance."""
self._cast_scheduler_message(context,
{"method": "prep_resize",
"args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id, }},)
def pause(self, context, instance_id):
"""Pause the given instance."""
self._cast_compute_message('pause_instance', context, instance_id)
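Put together, a resize is now driven by three compute-API calls; a sketch of the happy path (ctx and ids assumed):

compute_api.resize(ctx, instance_id, flavor_id)  # casts prep_resize to scheduler
# ... once the migration record reaches 'finished', keep the move:
compute_api.confirm_resize(ctx, instance_id)     # destroys the old instance
# ... or undo it:
compute_api.revert_resize(ctx, instance_id)      # destroys the new instance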

View File

@ -411,6 +411,110 @@ class ComputeManager(manager.Manager):
"""Update instance state when async task completes."""
self._update_state(context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def confirm_resize(self, context, instance_id, migration_id):
"""Destroys the source instance"""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
migration_ref = self.db.migration_get(context, migration_id)
self.driver.destroy(instance_ref)
@exception.wrap_exception
@checks_instance_lock
def revert_resize(self, context, instance_id, migration_id):
"""Destroys the new instance on the destination machine,
reverts the model changes, and powers on the old
instance on the source machine"""
instance_ref = self.db.instance_get(context, instance_id)
migration_ref = self.db.migration_get(context, migration_id)
#TODO(mdietz): we may want to split these into separate methods.
if migration_ref['source_compute'] == FLAGS.host:
self.driver._start(instance_ref)
self.db.migration_update(context, migration_id,
{'status': 'reverted'})
else:
self.driver.destroy(instance_ref)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
rpc.cast(context, topic,
{'method': 'revert_resize',
'args': {
'migration_id': migration_ref['id'],
'instance_id': instance_id, },
})
@exception.wrap_exception
@checks_instance_lock
def prep_resize(self, context, instance_id):
"""Initiates the process of moving a running instance to another
host, possibly changing the RAM and disk size in the process"""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
if instance_ref['host'] == FLAGS.host:
raise exception.Error(_(
'Migration error: destination same as source!'))
migration_ref = self.db.migration_create(context,
{'instance_id': instance_id,
'source_compute': instance_ref['host'],
'dest_compute': FLAGS.host,
'dest_host': self.driver.get_host_ip_addr(),
'status': 'pre-migrating'})
LOG.audit(_('instance %s: migrating'), instance_id,
context=context)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
rpc.cast(context, topic,
{'method': 'resize_instance',
'args': {
'migration_id': migration_ref['id'],
'instance_id': instance_id, },
})
@exception.wrap_exception
@checks_instance_lock
def resize_instance(self, context, instance_id, migration_id):
"""Starts the migration of a running instance to another host"""
migration_ref = self.db.migration_get(context, migration_id)
instance_ref = self.db.instance_get(context, instance_id)
self.db.migration_update(context, migration_id,
{'status': 'migrating', })
disk_info = self.driver.migrate_disk_and_power_off(instance_ref,
migration_ref['dest_host'])
self.db.migration_update(context, migration_id,
{'status': 'post-migrating', })
#TODO(mdietz): This is where we would update the VM record
#after resizing
service = self.db.service_get_by_host_and_topic(context,
migration_ref['dest_compute'], FLAGS.compute_topic)
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
migration_ref['dest_compute'])
rpc.cast(context, topic,
{'method': 'finish_resize',
'args': {
'migration_id': migration_id,
'instance_id': instance_id,
'disk_info': disk_info, },
})
@exception.wrap_exception
@checks_instance_lock
def finish_resize(self, context, instance_id, migration_id, disk_info):
"""Completes the migration process by setting up the newly transferred
disk and turning on the instance on its new host machine"""
migration_ref = self.db.migration_get(context, migration_id)
instance_ref = self.db.instance_get(context,
migration_ref['instance_id'])
self.driver.finish_resize(instance_ref, disk_info)
self.db.migration_update(context, migration_id,
{'status': 'finished', })
@exception.wrap_exception
@checks_instance_lock
def pause_instance(self, context, instance_id):
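The migration record doubles as the state machine for the whole operation; the statuses written by the methods above, in order:

# prep_resize      -> 'pre-migrating'
# resize_instance  -> 'migrating', then 'post-migrating'
# finish_resize    -> 'finished'
# confirm_resize   -> 'confirmed'  (written by the compute API layer)
# revert_resize    -> 'reverted'   (on the source compute host)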

View File

@ -80,10 +80,15 @@ def service_destroy(context, instance_id):
def service_get(context, service_id):
"""Get an service or raise if it does not exist."""
"""Get a service or raise if it does not exist."""
return IMPL.service_get(context, service_id)
def service_get_by_host_and_topic(context, host, topic):
"""Get a service by host it's on and topic it listens to"""
return IMPL.service_get_by_host_and_topic(context, host, topic)
def service_get_all(context, disabled=False):
"""Get all services."""
return IMPL.service_get_all(context, disabled)
@ -254,6 +259,28 @@ def floating_ip_get_by_address(context, address):
####################
def migration_update(context, id, values):
"""Update a migration instance"""
return IMPL.migration_update(context, id, values)
def migration_create(context, values):
"""Create a migration record"""
return IMPL.migration_create(context, values)
def migration_get(context, migration_id):
"""Finds a migration by the id"""
return IMPL.migration_get(context, migration_id)
def migration_get_by_instance_and_status(context, instance_id, status):
"""Finds a migration by the instance id its migrating"""
return IMPL.migration_get_by_instance_and_status(context, instance_id,
status)
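A short usage sketch of the new helpers, mirroring how ComputeManager drives them above (host names hypothetical):

migration = db.migration_create(ctx, {'instance_id': instance_id,
                                      'source_compute': 'host-a',
                                      'dest_compute': 'host-b',
                                      'status': 'pre-migrating'})
db.migration_update(ctx, migration['id'], {'status': 'finished'})
finished = db.migration_get_by_instance_and_status(ctx, instance_id,
                                                   'finished')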
####################
def fixed_ip_associate(context, address, instance_id):
"""Associate fixed ip to instance.

View File

@ -154,6 +154,17 @@ def service_get_all_by_topic(context, topic):
all()
@require_admin_context
def service_get_by_host_and_topic(context, host, topic):
session = get_session()
return session.query(models.Service).\
filter_by(deleted=False).\
filter_by(disabled=False).\
filter_by(host=host).\
filter_by(topic=topic).\
first()
@require_admin_context
def service_get_all_by_host(context, host):
session = get_session()
@ -1972,6 +1983,51 @@ def host_get_networks(context, host):
all()
###################
@require_admin_context
def migration_create(context, values):
migration = models.Migration()
migration.update(values)
migration.save()
return migration
@require_admin_context
def migration_update(context, id, values):
session = get_session()
with session.begin():
migration = migration_get(context, id, session=session)
migration.update(values)
migration.save(session=session)
return migration
@require_admin_context
def migration_get(context, id, session=None):
if not session:
session = get_session()
result = session.query(models.Migration).\
filter_by(id=id).first()
if not result:
raise exception.NotFound(_("No migration found with id %s")
% id)
return result
@require_admin_context
def migration_get_by_instance_and_status(context, instance_id, status):
session = get_session()
result = session.query(models.Migration).\
filter_by(instance_id=instance_id).\
filter_by(status=status).first()
if not result:
raise exception.NotFound(_("No migration found with instance id %s")
% instance_id)
return result
##################

View File

@ -0,0 +1,61 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import *
from migrate import *
from nova import log as logging
meta = MetaData()
# Just for the ForeignKey and column creation to succeed, these are not the
# actual definitions of instances or services.
instances = Table('instances', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
#
# New Tables
#
migrations = Table('migrations', meta,
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(create_constraint=True, name=None)),
Column('id', Integer(), primary_key=True, nullable=False),
Column('source_compute', String(255)),
Column('dest_compute', String(255)),
Column('dest_host', String(255)),
Column('instance_id', Integer, ForeignKey('instances.id'),
nullable=True),
Column('status', String(255)),
)
def upgrade(migrate_engine):
# Upgrade operations go here. Don't create your own engine;
# bind migrate_engine to your metadata
meta.bind = migrate_engine
for table in (migrations, ):
try:
table.create()
except Exception:
logging.info(repr(table))
logging.exception('Exception while creating table')
raise
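A minimal sketch of running this migration by hand; normally sqlalchemy-migrate supplies migrate_engine via nova-manage db sync, so the engine URL here is purely an assumption:

from sqlalchemy import create_engine

engine = create_engine('sqlite:///nova.sqlite')  # hypothetical URL
upgrade(engine)  # binds meta and creates the 'migrations' table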

View File

@ -60,7 +60,7 @@ def db_version():
'key_pairs', 'networks', 'projects', 'quotas',
'security_group_instance_association',
'security_group_rules', 'security_groups',
'services',
'services', 'migrations',
'users', 'user_project_association',
'user_project_role_association',
'user_role_association',

View File

@ -389,6 +389,18 @@ class KeyPair(BASE, NovaBase):
public_key = Column(Text)
class Migration(BASE, NovaBase):
"""Represents a running host-to-host migration."""
__tablename__ = 'migrations'
id = Column(Integer, primary_key=True, nullable=False)
source_compute = Column(String(255))
dest_compute = Column(String(255))
dest_host = Column(String(255))
instance_id = Column(Integer, ForeignKey('instances.id'), nullable=True)
#TODO(_cerberus_): enum
status = Column(String(255))
class Network(BASE, NovaBase):
"""Represents a network."""
__tablename__ = 'networks'
@ -598,7 +610,7 @@ def register_models():
Network, SecurityGroup, SecurityGroupIngressRule,
SecurityGroupInstanceAssociation, AuthToken, User,
Project, Certificate, ConsolePool, Console, Zone,
InstanceMetadata)
InstanceMetadata, Migration)
engine = create_engine(FLAGS.sql_connection, echo=False)
for model in models:
model.metadata.create_all(engine)

View File

@ -48,7 +48,6 @@ class Exchange(object):
nm = self.name
LOG.debug(_('(%(nm)s) publish (key: %(routing_key)s)'
' %(message)s') % locals())
routing_key = routing_key.split('.')[0]
if routing_key in self._routes:
for f in self._routes[routing_key]:
LOG.debug(_('Publishing to route %s'), f)

View File

@ -266,7 +266,10 @@ class NovaRootLogger(NovaLogger):
def handle_exception(type, value, tb):
logging.root.critical(str(value), exc_info=(type, value, tb))
extra = {}
if FLAGS.verbose:
extra['exc_info'] = (type, value, tb)
logging.root.critical(str(value), **extra)
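Presumably this function is installed as the process-wide excepthook; the change means tracebacks ride along only in verbose mode:

import sys
sys.excepthook = handle_exception  # assumed wiring
# FLAGS.verbose True  -> critical record carries exc_info (full traceback)
# FLAGS.verbose False -> critical record carries just str(value)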
def reset():

View File

@ -21,6 +21,7 @@ Handles all requests relating to instances (guest vms).
"""
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import quota

View File

@ -123,7 +123,7 @@ class Consumer(messaging.Consumer):
LOG.error(_("Reconnected to queue"))
self.failed_connection = False
# NOTE(vish): This is catching all errors because we really don't
# exceptions to be logged 10 times a second if some
# want exceptions to be logged 10 times a second if some
# persistent failure occurs.
except Exception: # pylint: disable-msg=W0703
if not self.failed_connection:

View File

@ -2,6 +2,7 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -39,6 +40,7 @@ from nova import flags
from nova import rpc
from nova import utils
from nova import version
from nova import wsgi
FLAGS = flags.FLAGS
@ -48,6 +50,14 @@ flags.DEFINE_integer('report_interval', 10,
flags.DEFINE_integer('periodic_interval', 60,
'seconds between running periodic tasks',
lower_bound=1)
flags.DEFINE_string('ec2_listen', "0.0.0.0",
'IP address for EC2 API to listen')
flags.DEFINE_integer('ec2_listen_port', 8773, 'port for ec2 api to listen')
flags.DEFINE_string('osapi_listen', "0.0.0.0",
'IP address for OpenStack API to listen')
flags.DEFINE_integer('osapi_listen_port', 8774, 'port for os api to listen')
flags.DEFINE_string('api_paste_config', "api-paste.ini",
'File name for the paste.deploy config for nova-api')
class Service(object):
@ -210,6 +220,41 @@ class Service(object):
logging.exception(_("model server went away"))
class WsgiService(object):
"""Base class for WSGI based services.
For each api you define, you must also define these flags:
:<api>_listen: The address on which to listen
:<api>_listen_port: The port on which to listen
"""
def __init__(self, conf, apis):
self.conf = conf
self.apis = apis
self.wsgi_app = None
def start(self):
self.wsgi_app = _run_wsgi(self.conf, self.apis)
def wait(self):
self.wsgi_app.wait()
class ApiService(WsgiService):
"""Class for our nova-api service"""
@classmethod
def create(cls, conf=None):
if not conf:
conf = wsgi.paste_config_file(FLAGS.api_paste_config)
if not conf:
message = (_("No paste configuration found for: %s"),
FLAGS.api_paste_config)
raise exception.Error(message)
api_endpoints = ['ec2', 'osapi']
service = cls(conf, api_endpoints)
return service
def serve(*services):
try:
if not services:
@ -239,3 +284,46 @@ def serve(*services):
def wait():
while True:
greenthread.sleep(5)
def serve_wsgi(cls, conf=None):
try:
service = cls.create(conf)
except Exception:
logging.exception('in WsgiService.create()')
raise
finally:
# After we've loaded up all our dynamic bits, check
# whether we should print help
flags.DEFINE_flag(flags.HelpFlag())
flags.DEFINE_flag(flags.HelpshortFlag())
flags.DEFINE_flag(flags.HelpXMLFlag())
FLAGS.ParseNewFlags()
service.start()
return service
def _run_wsgi(paste_config_file, apis):
logging.debug(_("Using paste.deploy config at: %s"), paste_config_file)
apps = []
for api in apis:
config = wsgi.load_paste_configuration(paste_config_file, api)
if config is None:
logging.debug(_("No paste configuration for app: %s"), api)
continue
logging.debug(_("App Config: %(api)s\n%(config)r") % locals())
logging.info(_("Running %s API"), api)
app = wsgi.load_paste_app(paste_config_file, api)
apps.append((app, getattr(FLAGS, "%s_listen_port" % api),
getattr(FLAGS, "%s_listen" % api)))
if len(apps) == 0:
logging.error(_("No known API applications configured in %s."),
paste_config_file)
return
server = wsgi.Server()
for app in apps:
server.start(*app)
return server
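End to end, this replaces bin/nova-api's old run_app; a usage sketch (the conf filename follows the new api_paste_config default):

from nova import service

api = service.serve_wsgi(service.ApiService, conf='api-paste.ini')
api.wait()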

View File

@ -0,0 +1,35 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import webob
def webob_factory(url):
"""Factory for removing duplicate webob code from tests"""
base_url = url
def web_request(url, method=None, body=None):
req = webob.Request.blank("%s%s" % (base_url, url))
if method:
req.method = method
if body:
req.body = json.dumps(body)
return req
return web_request
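Usage, as the server tests below adopt it:

webreq = common.webob_factory('/v1.0/servers')
req = webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
res = req.get_response(fakes.wsgi_app())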

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 OpenStack LLC.
# Copyright 2010-2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -26,10 +26,12 @@ from nova import flags
from nova import test
import nova.api.openstack
from nova.api.openstack import servers
import nova.compute.api
import nova.db.api
from nova.db.sqlalchemy.models import Instance
from nova.db.sqlalchemy.models import InstanceMetadata
import nova.rpc
from nova.tests.api.openstack import common
from nova.tests.api.openstack import fakes
@ -144,6 +146,8 @@ class ServersTest(test.TestCase):
self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api)
self.allow_admin = FLAGS.allow_admin_api
self.webreq = common.webob_factory('/v1.0/servers')
def tearDown(self):
self.stubs.UnsetAll()
FLAGS.allow_admin_api = self.allow_admin
@ -465,3 +469,99 @@ class ServersTest(test.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status, '202 Accepted')
self.assertEqual(self.server_delete_called, True)
def test_resize_server(self):
req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
self.resize_called = False
def resize_mock(*args):
self.resize_called = True
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
self.assertEqual(self.resize_called, True)
def test_resize_bad_flavor_fails(self):
req = self.webreq('/1/action', 'POST', dict(resize=dict(derp=3)))
self.resize_called = False
def resize_mock(*args):
self.resize_called = True
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 422)
self.assertEqual(self.resize_called, False)
def test_resize_raises_fails(self):
req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
def resize_mock(*args):
raise Exception('hurr durr')
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
def test_confirm_resize_server(self):
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
self.resize_called = False
def confirm_resize_mock(*args):
self.resize_called = True
self.stubs.Set(nova.compute.api.API, 'confirm_resize',
confirm_resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 204)
self.assertEqual(self.resize_called, True)
def test_confirm_resize_server_fails(self):
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
def confirm_resize_mock(*args):
raise Exception('hurr durr')
self.stubs.Set(nova.compute.api.API, 'confirm_resize',
confirm_resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
def test_revert_resize_server(self):
req = self.webreq('/1/action', 'POST', dict(revertResize=None))
self.resize_called = False
def revert_resize_mock(*args):
self.resize_called = True
self.stubs.Set(nova.compute.api.API, 'revert_resize',
revert_resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)
self.assertEqual(self.resize_called, True)
def test_revert_resize_server_fails(self):
req = self.webreq('/1/action', 'POST', dict(revertResize=None))
def revert_resize_mock(*args):
raise Exception('hurr durr')
self.stubs.Set(nova.compute.api.API, 'revert_resize',
revert_resize_mock)
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 400)
if __name__ == "__main__":
unittest.main()

View File

@ -35,23 +35,28 @@ class FakeGlance(object):
IMAGE_FIXTURES = {
IMAGE_MACHINE: {
'image_meta': {'name': 'fakemachine', 'size': 0,
'type': 'machine'},
'disk_format': 'ami',
'container_format': 'ami'},
'image_data': StringIO.StringIO('')},
IMAGE_KERNEL: {
'image_meta': {'name': 'fakekernel', 'size': 0,
'type': 'kernel'},
'disk_format': 'aki',
'container_format': 'aki'},
'image_data': StringIO.StringIO('')},
IMAGE_RAMDISK: {
'image_meta': {'name': 'fakeramdisk', 'size': 0,
'type': 'ramdisk'},
'disk_format': 'ari',
'container_format': 'ari'},
'image_data': StringIO.StringIO('')},
IMAGE_RAW: {
'image_meta': {'name': 'fakeraw', 'size': 0,
'type': 'raw'},
'disk_format': 'raw',
'container_format': 'bare'},
'image_data': StringIO.StringIO('')},
IMAGE_VHD: {
'image_meta': {'name': 'fakevhd', 'size': 0,
'type': 'vhd'},
'disk_format': 'vhd',
'container_format': 'ovf'},
'image_data': StringIO.StringIO('')}}
def __init__(self, host, port=None, use_ssl=False):

View File

@ -267,7 +267,7 @@ class CloudTestCase(test.TestCase):
self._create_key('test1')
self._create_key('test2')
result = self.cloud.describe_key_pairs(self.context)
keys = result["keypairsSet"]
keys = result["keySet"]
self.assertTrue(filter(lambda k: k['keyName'] == 'test1', keys))
self.assertTrue(filter(lambda k: k['keyName'] == 'test2', keys))

View File

@ -57,7 +57,7 @@ class ComputeTestCase(test.TestCase):
self.manager.delete_project(self.project)
super(ComputeTestCase, self).tearDown()
def _create_instance(self):
def _create_instance(self, params={}):
"""Create a test instance"""
inst = {}
inst['image_id'] = 'ami-test'
@ -68,6 +68,7 @@ class ComputeTestCase(test.TestCase):
inst['instance_type'] = 'm1.tiny'
inst['mac_address'] = utils.generate_mac()
inst['ami_launch_index'] = 0
inst.update(params)
return db.instance_create(self.context, inst)['id']
def _create_group(self):
@ -268,9 +269,30 @@ class ComputeTestCase(test.TestCase):
self.compute.terminate_instance(self.context, instance_id)
def test_resize_instance(self):
"""Ensure instance can be migrated/resized"""
instance_id = self._create_instance()
context = self.context.elevated()
self.compute.run_instance(self.context, instance_id)
db.instance_update(self.context, instance_id, {'host': 'foo'})
self.compute.prep_resize(context, instance_id)
migration_ref = db.migration_get_by_instance_and_status(context,
instance_id, 'pre-migrating')
self.compute.resize_instance(context, instance_id,
migration_ref['id'])
self.compute.terminate_instance(context, instance_id)
def test_get_by_flavor_id(self):
type = instance_types.get_by_flavor_id(1)
self.assertEqual(type, 'm1.tiny')
def test_resize_same_source_fails(self):
"""Ensure instance fails to migrate when source and destination are
the same host"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.assertRaises(exception.Error, self.compute.prep_resize,
self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
type = instance_types.get_by_flavor_id("1")
self.assertEqual(type, 'm1.tiny')

View File

@ -346,6 +346,56 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
super(XenAPIDiffieHellmanTestCase, self).tearDown()
class XenAPIMigrateInstance(test.TestCase):
"""
Unit test for verifying migration-related actions
"""
def setUp(self):
super(XenAPIMigrateInstance, self).setUp()
self.stubs = stubout.StubOutForTesting()
FLAGS.target_host = '127.0.0.1'
FLAGS.xenapi_connection_url = 'test_url'
FLAGS.xenapi_connection_password = 'test_pass'
db_fakes.stub_out_db_instance_api(self.stubs)
stubs.stub_out_get_target(self.stubs)
xenapi_fake.reset()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.values = {'name': 1, 'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': 1,
'kernel_id': None,
'ramdisk_id': None,
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff',
}
stubs.stub_out_migration_methods(self.stubs)
glance_stubs.stubout_glance_client(self.stubs,
glance_stubs.FakeGlance)
def tearDown(self):
super(XenAPIMigrateInstance, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
self.stubs.UnsetAll()
def test_migrate_disk_and_power_off(self):
instance = db.instance_create(self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False)
conn.migrate_disk_and_power_off(instance, '127.0.0.1')
def test_finish_resize(self):
instance = db.instance_create(self.values)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
conn = xenapi_conn.get_connection(False)
conn.finish_resize(instance, dict(base_copy='hurr', cow='durr'))
class XenAPIDetermineDiskImageTestCase(test.TestCase):
"""
Unit tests for code that detects the ImageType

View File

@ -20,6 +20,7 @@ from nova.virt import xenapi_conn
from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
from nova.virt.xenapi import vm_utils
from nova.virt.xenapi import vmops
def stubout_instance_snapshot(stubs):
@ -217,3 +218,60 @@ class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
def SR_forget(self, _1, ref):
pass
class FakeSessionForMigrationTests(fake.SessionBase):
"""Stubs out a XenAPISession for Migration tests"""
def __init__(self, uri):
super(FakeSessionForMigrationTests, self).__init__(uri)
def VDI_get_by_uuid(*args):
return 'hurr'
def VM_start(self, _1, ref, _2, _3):
vm = fake.get_record('VM', ref)
if vm['power_state'] != 'Halted':
raise fake.Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
vm['power_state']])
vm['power_state'] = 'Running'
vm['is_a_template'] = False
vm['is_control_domain'] = False
def stub_out_migration_methods(stubs):
def fake_get_snapshot(self, instance):
return 'foo', 'bar'
@classmethod
def fake_get_vdi(cls, session, vm_ref):
vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
sr_ref='herp', sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], }
def fake_shutdown(self, inst, vm, method='clean'):
pass
@classmethod
def fake_sr(cls, session, *args):
pass
@classmethod
def fake_get_sr_path(cls, *args):
return "fake"
def fake_destroy(*args, **kwargs):
pass
def fake_reset_network(*args, **kwargs):
pass
stubs.Set(vmops.VMOps, '_destroy', fake_destroy)
stubs.Set(vm_utils.VMHelper, 'scan_default_sr', fake_sr)
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
stubs.Set(vmops.VMOps, '_get_snapshot', fake_get_snapshot)
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network)
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)

View File

@ -139,6 +139,24 @@ class FakeConnection(object):
"""
pass
def get_host_ip_addr(self):
"""
Retrieves the IP address of the dom0
"""
pass
def resize(self, instance, flavor):
"""
Resizes/Migrates the specified instance.
The flavor parameter determines whether or not the instance RAM and
disk space are modified, and if so, to what size.
The work will be done asynchronously. This function returns a task
that allows the caller to detect when it is complete.
"""
pass
def set_admin_password(self, instance, new_pass):
"""
Set the root password on the specified instance.
@ -179,6 +197,19 @@ class FakeConnection(object):
"""
pass
def migrate_disk_and_power_off(self, instance, dest):
"""
Transfers the disk of a running instance in multiple phases, turning
off the instance before the end.
"""
pass
def attach_disk(self, instance, disk_info):
"""
Attaches the disk to an instance given the metadata disk_info
"""
pass
def pause(self, instance, callback):
"""
Pause the specified instance.

View File

@ -607,7 +607,7 @@ class LibvirtConnection(object):
user=user,
project=project,
size=size)
type_data = instance_types.get_instance_type([inst['instance_type']])
type_data = instance_types.get_instance_type(inst['instance_type'])
if type_data['local_gb']:
self._cache_image(fn=self._create_local,

View File

@ -290,6 +290,9 @@ class SessionBase(object):
#Always return 12GB available
return 12 * 1024 * 1024 * 1024
def host_call_plugin(*args):
return 'herp'
def xenapi_request(self, methodname, params):
if methodname.startswith('login'):
self._login(methodname, params)

View File

@ -252,17 +252,32 @@ class VMHelper(HelperBase):
% locals())
return vdi_ref
@classmethod
def get_vdi_for_vm_safely(cls, session, vm_ref):
vdi_refs = VMHelper.lookup_vm_vdis(session, vm_ref)
if vdi_refs is None:
raise Exception(_("No VDIs found for VM %s") % vm_ref)
else:
num_vdis = len(vdi_refs)
if num_vdis != 1:
raise Exception(
_("Unexpected number of VDIs (%(num_vdis)s) found"
" for VM %(vm_ref)s") % locals())
vdi_ref = vdi_refs[0]
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
return vdi_ref, vdi_rec
@classmethod
def create_snapshot(cls, session, instance_id, vm_ref, label):
""" Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI,
Snapshot VHD
"""
"""Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI,
Snapshot VHD"""
#TODO(sirp): Add quiesce and VSS locking support when Windows support
# is added
LOG.debug(_("Snapshotting VM %(vm_ref)s with label '%(label)s'...")
% locals())
vm_vdi_ref, vm_vdi_rec = get_vdi_for_vm_safely(session, vm_ref)
vm_vdi_ref, vm_vdi_rec = cls.get_vdi_for_vm_safely(session, vm_ref)
vm_vdi_uuid = vm_vdi_rec["uuid"]
sr_ref = vm_vdi_rec["SR"]
@ -270,7 +285,8 @@ class VMHelper(HelperBase):
task = session.call_xenapi('Async.VM.snapshot', vm_ref, label)
template_vm_ref = session.wait_for_task(task, instance_id)
template_vdi_rec = get_vdi_for_vm_safely(session, template_vm_ref)[1]
template_vdi_rec = cls.get_vdi_for_vm_safely(session,
template_vm_ref)[1]
template_vdi_uuid = template_vdi_rec["uuid"]
LOG.debug(_('Created snapshot %(template_vm_ref)s from'
@ -284,6 +300,24 @@ class VMHelper(HelperBase):
'snap': template_vdi_uuid}
return template_vm_ref, template_vdi_uuids
@classmethod
def get_sr(cls, session, sr_label='slices'):
"""Finds the SR named by the given name label and returns
the UUID"""
return session.call_xenapi('SR.get_by_name_label', sr_label)[0]
@classmethod
def get_sr_path(cls, session):
"""Return the path to our storage repository
This is used when we're dealing with VHDs directly, either by taking
snapshots or by restoring an image in the DISK_VHD format.
"""
sr_ref = safe_find_sr(session)
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
sr_uuid = sr_rec["uuid"]
return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
@classmethod
def upload_image(cls, session, instance_id, vdi_uuids, image_id):
""" Requests that the Glance plugin bundle the specified VDIs and
@ -298,7 +332,7 @@ class VMHelper(HelperBase):
'image_id': image_id,
'glance_host': FLAGS.glance_host,
'glance_port': FLAGS.glance_port,
'sr_path': get_sr_path(session)}
'sr_path': cls.get_sr_path(session)}
kwargs = {'params': pickle.dumps(params)}
task = session.async_call_plugin('glance', 'upload_vhd', kwargs)
@ -341,13 +375,13 @@ class VMHelper(HelperBase):
'glance_host': FLAGS.glance_host,
'glance_port': FLAGS.glance_port,
'uuid_stack': uuid_stack,
'sr_path': get_sr_path(session)}
'sr_path': cls.get_sr_path(session)}
kwargs = {'params': pickle.dumps(params)}
task = session.async_call_plugin('glance', 'download_vhd', kwargs)
vdi_uuid = session.wait_for_task(task, instance_id)
scan_sr(session, instance_id, sr_ref)
cls.scan_sr(session, instance_id, sr_ref)
# Set the name-label to ease debugging
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
@ -433,19 +467,21 @@ class VMHelper(HelperBase):
"%(image_id)s, instance %(instance_id)s") % locals())
def determine_from_glance():
glance_type2nova_type = {'machine': ImageType.DISK,
'raw': ImageType.DISK_RAW,
'vhd': ImageType.DISK_VHD,
'kernel': ImageType.KERNEL_RAMDISK,
'ramdisk': ImageType.KERNEL_RAMDISK}
glance_disk_format2nova_type = {
'ami': ImageType.DISK,
'aki': ImageType.KERNEL_RAMDISK,
'ari': ImageType.KERNEL_RAMDISK,
'raw': ImageType.DISK_RAW,
'vhd': ImageType.DISK_VHD}
client = glance.client.Client(FLAGS.glance_host, FLAGS.glance_port)
meta = client.get_image_meta(instance.image_id)
type_ = meta['type']
disk_format = meta['disk_format']
try:
return glance_type2nova_type[type_]
return glance_disk_format2nova_type[disk_format]
except KeyError:
raise exception.NotFound(
_("Unrecognized image type '%(type_)s'") % locals())
_("Unrecognized disk_format '%(disk_format)s'")
% locals())
def determine_from_instance():
if instance.kernel_id:
@ -609,6 +645,21 @@ class VMHelper(HelperBase):
except cls.XenAPI.Failure as e:
return {"Unable to retrieve diagnostics": e}
@classmethod
def scan_sr(cls, session, instance_id=None, sr_ref=None):
"""Scans the SR specified by sr_ref"""
if sr_ref:
LOG.debug(_("Re-scanning SR %s"), sr_ref)
task = session.call_xenapi('Async.SR.scan', sr_ref)
session.wait_for_task(task, instance_id)
@classmethod
def scan_default_sr(cls, session):
"""Looks for the system default SR and triggers a re-scan"""
#FIXME(sirp/mdietz): refactor scan_default_sr in there
sr_ref = cls.get_sr(session)
session.call_xenapi('SR.scan', sr_ref)
def get_rrd(host, uuid):
"""Return the VM RRD XML as a string"""
@ -651,12 +702,6 @@ def get_vhd_parent_uuid(session, vdi_ref):
return None
def scan_sr(session, instance_id, sr_ref):
LOG.debug(_("Re-scanning SR %s"), sr_ref)
task = session.call_xenapi('Async.SR.scan', sr_ref)
session.wait_for_task(task, instance_id)
def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
original_parent_uuid):
""" Spin until the parent VHD is coalesced into its parent VHD
@ -681,7 +726,7 @@ def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
" %(max_attempts)d), giving up...") % locals())
raise exception.Error(msg)
scan_sr(session, instance_id, sr_ref)
VMHelper.scan_sr(session, instance_id, sr_ref)
parent_uuid = get_vhd_parent_uuid(session, vdi_ref)
if original_parent_uuid and (parent_uuid != original_parent_uuid):
LOG.debug(_("Parent %(parent_uuid)s doesn't match original parent"
@ -738,18 +783,6 @@ def find_sr(session):
return None
def get_sr_path(session):
"""Return the path to our storage repository
This is used when we're dealing with VHDs directly, either by taking
snapshots or by restoring an image in the DISK_VHD format.
"""
sr_ref = safe_find_sr(session)
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
sr_uuid = sr_rec["uuid"]
return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
def remap_vbd_dev(dev):
"""Return the appropriate location for a plugged-in VBD device

View File

@ -22,6 +22,7 @@ Management class for VM-related functions (spawn, reboot, etc).
import json
import M2Crypto
import os
import pickle
import subprocess
import tempfile
import uuid
@ -61,13 +62,35 @@ class VMOps(object):
vms.append(rec["name_label"])
return vms
def _start(self, instance, vm_ref=None):
"""Power on a VM instance"""
if not vm_ref:
vm_ref = VMHelper.lookup(self._session, instance.name)
if vm_ref is None:
raise exception.NotFound(_('Attempted to power on non-existent'
' instance, bad instance id %s') % instance.id)
LOG.debug(_("Starting instance %s"), instance.name)
self._session.call_xenapi('VM.start', vm_ref, False, False)
def create_disk(self, instance):
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
disk_image_type = VMHelper.determine_disk_image_type(instance)
vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
instance.image_id, user, project, disk_image_type)
return vdi_uuid
def spawn(self, instance):
vdi_uuid = self.create_disk(instance)
self._spawn_with_disk(instance, vdi_uuid=vdi_uuid)
def _spawn_with_disk(self, instance, vdi_uuid):
"""Create VM instance"""
instance_name = instance.name
vm = VMHelper.lookup(self._session, instance_name)
if vm is not None:
raise exception.Duplicate(_('Attempted to create'
' non-unique name %s') % instance_name)
' non-unique name %s') % instance_name)
#ensure enough free memory is available
if not VMHelper.ensure_free_mem(self._session, instance):
@ -81,16 +104,14 @@ class VMOps(object):
user = AuthManager().get_user(instance.user_id)
project = AuthManager().get_project(instance.project_id)
disk_image_type = VMHelper.determine_disk_image_type(instance)
vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
instance.image_id, user, project, disk_image_type)
kernel = ramdisk = pv_kernel = None
# Are we building from a pre-existing disk?
vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
pv_kernel = False
disk_image_type = VMHelper.determine_disk_image_type(instance)
if disk_image_type == ImageType.DISK_RAW:
#Have a look at the VDI and see if it has a PV kernel
# Have a look at the VDI and see if it has a PV kernel
pv_kernel = VMHelper.lookup_image(self._session, instance.id,
vdi_ref)
elif disk_image_type == ImageType.DISK_VHD:
@ -98,19 +119,18 @@ class VMOps(object):
# configurable as Windows will use HVM.
pv_kernel = True
kernel = None
if instance.kernel_id:
kernel = VMHelper.fetch_image(self._session, instance.id,
instance.kernel_id, user, project, ImageType.KERNEL_RAMDISK)
ramdisk = None
if instance.ramdisk_id:
ramdisk = VMHelper.fetch_image(self._session, instance.id,
instance.ramdisk_id, user, project, ImageType.KERNEL_RAMDISK)
vm_ref = VMHelper.create_vm(self._session,
instance, kernel, ramdisk, pv_kernel)
VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
VMHelper.create_vbd(session=self._session, vm_ref=vm_ref,
vdi_ref=vdi_ref, userdevice=0, bootable=True)
# inject_network_info and create vifs
networks = self.inject_network_info(instance)
@ -172,35 +192,38 @@ class VMOps(object):
"""Refactored out the common code of many methods that receive either
a vm name or a vm instance, and want a vm instance in return.
"""
vm = None
try:
if instance_or_vm.startswith("OpaqueRef:"):
# Got passed an opaque ref; return it
# if instance_or_vm is a string it must be opaque ref or instance name
if isinstance(instance_or_vm, basestring):
obj = None
try:
# check for opaque ref
obj = self._session.get_xenapi().VM.get_record(instance_or_vm)
return instance_or_vm
else:
# Must be the instance name
except self.XenAPI.Failure:
# wasn't an opaque ref, must be an instance name
instance_name = instance_or_vm
except (AttributeError, KeyError):
# Note the KeyError will only happen with fakes.py
# Not a string; must be an ID or a vm instance
if isinstance(instance_or_vm, (int, long)):
ctx = context.get_admin_context()
try:
instance_obj = db.instance_get(ctx, instance_or_vm)
instance_name = instance_obj.name
except exception.NotFound:
# The unit tests screw this up, as they use an integer for
# the vm name. I'd fix that up, but that's a matter for
# another bug report. So for now, just try with the passed
# value
instance_name = instance_or_vm
else:
instance_name = instance_or_vm.name
vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
# if instance_or_vm is an int/long it must be instance id
elif isinstance(instance_or_vm, (int, long)):
ctx = context.get_admin_context()
try:
instance_obj = db.instance_get(ctx, instance_or_vm)
instance_name = instance_obj.name
except exception.NotFound:
# The unit tests screw this up, as they use an integer for
# the vm name. I'd fix that up, but that's a matter for
# another bug report. So for now, just try with the passed
# value
instance_name = instance_or_vm
# otherwise instance_or_vm is an instance object
else:
instance_name = instance_or_vm.name
vm_ref = VMHelper.lookup(self._session, instance_name)
if vm_ref is None:
raise exception.NotFound(
_('Instance not present %s') % instance_name)
return vm
return vm_ref
def _acquire_bootlock(self, vm):
"""Prevent an instance from booting"""
@ -217,7 +240,7 @@ class VMOps(object):
"start")
def snapshot(self, instance, image_id):
""" Create snapshot from a running VM instance
"""Create snapshot from a running VM instance
:param instance: instance to be snapshotted
:param image_id: id of image to upload to
@ -238,7 +261,20 @@ class VMOps(object):
that will bundle the VHDs together and then push the bundle into
Glance.
"""
template_vm_ref = None
try:
template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
# call plugin to ship snapshot off to glance
VMHelper.upload_image(
self._session, instance.id, template_vdi_uuids, image_id)
finally:
if template_vm_ref:
self._destroy(instance, template_vm_ref,
shutdown=False, destroy_kernel_ramdisk=False)
logging.debug(_("Finished snapshot and upload for VM %s"), instance)
def _get_snapshot(self, instance):
#TODO(sirp): Add quiesce and VSS locking support when Windows support
# is added
@ -249,20 +285,89 @@ class VMOps(object):
try:
template_vm_ref, template_vdi_uuids = VMHelper.create_snapshot(
self._session, instance.id, vm_ref, label)
return template_vm_ref, template_vdi_uuids
except self.XenAPI.Failure, exc:
logging.error(_("Unable to Snapshot %(vm_ref)s: %(exc)s")
% locals())
return
try:
# call plugin to ship snapshot off to glance
VMHelper.upload_image(
self._session, instance.id, template_vdi_uuids, image_id)
finally:
self._destroy(instance, template_vm_ref, shutdown=False,
destroy_kernel_ramdisk=False)
def migrate_disk_and_power_off(self, instance, dest):
"""Copies a VHD from one host machine to another
logging.debug(_("Finished snapshot and upload for VM %s"), instance)
:param instance: the instance that owns the VHD in question
:param dest: the destination host machine
:param disk_type: values are 'primary' or 'cow'
"""
vm_ref = VMHelper.lookup(self._session, instance.name)
# The primary VDI becomes the COW after the snapshot, and we can
# identify it via the VBD. The base copy is the parent_uuid returned
# from the snapshot creation
base_copy_uuid = cow_uuid = None
template_vdi_uuids = template_vm_ref = None
try:
# transfer the base copy
template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
base_copy_uuid = template_vdi_uuids[1]
vdi_ref, vm_vdi_rec = \
VMHelper.get_vdi_for_vm_safely(self._session, vm_ref)
cow_uuid = vm_vdi_rec['uuid']
params = {'host': dest,
'vdi_uuid': base_copy_uuid,
'instance_id': instance.id,
'sr_path': VMHelper.get_sr_path(self._session)}
task = self._session.async_call_plugin('migration', 'transfer_vhd',
{'params': pickle.dumps(params)})
self._session.wait_for_task(task, instance.id)
# Now power down the instance and transfer the COW VHD
self._shutdown(instance, vm_ref, method='clean')
params = {'host': dest,
'vdi_uuid': cow_uuid,
'instance_id': instance.id,
'sr_path': VMHelper.get_sr_path(self._session), }
task = self._session.async_call_plugin('migration', 'transfer_vhd',
{'params': pickle.dumps(params)})
self._session.wait_for_task(task, instance.id)
finally:
if template_vm_ref:
self._destroy(instance, template_vm_ref,
shutdown=False, destroy_kernel_ramdisk=False)
# TODO(mdietz): we could also consider renaming these to something
# sensible so we don't need to blindly pass around dictionaries
return {'base_copy': base_copy_uuid, 'cow': cow_uuid}
def attach_disk(self, instance, base_copy_uuid, cow_uuid):
"""Links the base copy VHD to the COW via the XAPI plugin"""
vm_ref = VMHelper.lookup(self._session, instance.name)
new_base_copy_uuid = str(uuid.uuid4())
new_cow_uuid = str(uuid.uuid4())
params = {'instance_id': instance.id,
'old_base_copy_uuid': base_copy_uuid,
'old_cow_uuid': cow_uuid,
'new_base_copy_uuid': new_base_copy_uuid,
'new_cow_uuid': new_cow_uuid,
'sr_path': VMHelper.get_sr_path(self._session), }
task = self._session.async_call_plugin('migration',
'move_vhds_into_sr', {'params': pickle.dumps(params)})
self._session.wait_for_task(task, instance.id)
# Now we rescan the SR so we find the VHDs
VMHelper.scan_default_sr(self._session)
return new_cow_uuid
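The dict returned by migrate_disk_and_power_off is exactly what finish_resize consumes on the destination; a sketch of the round trip (connection names hypothetical):

disk_info = src_conn.migrate_disk_and_power_off(instance, dest)
# disk_info == {'base_copy': <vdi uuid>, 'cow': <vdi uuid>}
dest_conn.finish_resize(instance, disk_info)  # links VHDs, spawns the VM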
def resize(self, instance, flavor):
"""Resize a running instance by changing it's RAM and disk size """
raise NotImplementedError()
def reboot(self, instance):
"""Reboot VM instance"""
@ -308,11 +413,6 @@ class VMOps(object):
raise RuntimeError(resp_dict['message'])
return resp_dict['message']
def _start(self, instance, vm):
"""Start an instance"""
task = self._session.call_xenapi("Async.VM.start", vm, False, False)
self._session.wait_for_task(task, instance.id)
def inject_file(self, instance, b64_path, b64_contents):
"""Write a file to the VM instance. The path to which it is to be
written and the contents of the file need to be supplied; both should
@ -355,8 +455,7 @@ class VMOps(object):
if hard:
task = self._session.call_xenapi("Async.VM.hard_shutdown", vm)
else:
task = self._session.call_xenapi("Async.VM.clean_shutdown", vm)
task = self._session.call_xenapi('Async.VM.clean_shutdown', vm)
self._session.wait_for_task(task, instance.id)
except self.XenAPI.Failure, exc:
LOG.exception(exc)

View File

@ -158,10 +158,20 @@ class XenAPIConnection(object):
"""Create VM instance"""
self._vmops.spawn(instance)
def finish_resize(self, instance, disk_info):
"""Completes a resize, turning on the migrated instance"""
vdi_uuid = self._vmops.attach_disk(instance, disk_info['base_copy'],
disk_info['cow'])
self._vmops._spawn_with_disk(instance, vdi_uuid)
def snapshot(self, instance, image_id):
""" Create snapshot from a running VM instance """
self._vmops.snapshot(instance, image_id)
def resize(self, instance, flavor):
"""Resize a VM instance"""
raise NotImplementedError()
def reboot(self, instance):
"""Reboot VM instance"""
self._vmops.reboot(instance)
@ -188,6 +198,11 @@ class XenAPIConnection(object):
"""Unpause paused VM instance"""
self._vmops.unpause(instance, callback)
def migrate_disk_and_power_off(self, instance, dest):
"""Transfers the VHD of a running instance to another host, then shuts
off the instance copies over the COW disk"""
return self._vmops.migrate_disk_and_power_off(instance, dest)
def suspend(self, instance, callback):
"""suspend the specified instance"""
self._vmops.suspend(instance, callback)
@ -228,6 +243,10 @@ class XenAPIConnection(object):
"""Return link to instance's ajax console"""
return self._vmops.get_ajax_console(instance)
def get_host_ip_addr(self):
xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
return xs_url.netloc
def attach_volume(self, instance_name, device_path, mountpoint):
"""Attach volume storage to VM instance"""
return self._volumeops.attach_volume(instance_name,

View File

@ -201,13 +201,21 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port):
# to request
conn.putrequest('PUT', '/images/%s' % image_id)
# TODO(sirp): make `store` configurable
# NOTE(sirp): There is some confusion around OVF. Here's a summary of
# where we currently stand:
# 1. OVF as a container format is misnamed. We really should be using
# OVA since that is the name for the container format; OVF is the
# standard applied to the manifest file contained within.
# 2. We're currently uploading a vanilla tarball. In order to be OVF/OVA
# compliant, we'll need to embed a minimal OVF manifest as the first
# file.
headers = {
'content-type': 'application/octet-stream',
'transfer-encoding': 'chunked',
'x-image-meta-is_public': 'True',
'x-image-meta-is-public': 'True',
'x-image-meta-status': 'queued',
'x-image-meta-type': 'vhd'}
'x-image-meta-disk-format': 'vhd',
'x-image-meta-container-format': 'ovf'}
for header, value in headers.iteritems():
conn.putheader(header, value)
conn.endheaders()

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python
# Copyright 2010 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
XenAPI Plugin for transferring data between host nodes
"""
import os
import os.path
import pickle
import shlex
import shutil
import subprocess
import XenAPIPlugin
from pluginlib_nova import *
configure_logging('migration')
def move_vhds_into_sr(session, args):
"""Moves the VHDs from their copied location to the SR"""
params = pickle.loads(exists(args, 'params'))
instance_id = params['instance_id']
old_base_copy_uuid = params['old_base_copy_uuid']
old_cow_uuid = params['old_cow_uuid']
new_base_copy_uuid = params['new_base_copy_uuid']
new_cow_uuid = params['new_cow_uuid']
sr_path = params['sr_path']
sr_temp_path = "%s/images/" % sr_path
# Discover the copied VHDs locally, and then set up paths to copy
# them to under the SR
source_image_path = "%s/instance%d" % ('/images/', instance_id)
source_base_copy_path = "%s/%s.vhd" % (source_image_path,
old_base_copy_uuid)
source_cow_path = "%s/%s.vhd" % (source_image_path, old_cow_uuid)
temp_vhd_path = "%s/instance%d/" % (sr_temp_path, instance_id)
new_base_copy_path = "%s/%s.vhd" % (temp_vhd_path, new_base_copy_uuid)
new_cow_path = "%s/%s.vhd" % (temp_vhd_path, new_cow_uuid)
logging.debug('Creating temporary SR path %s' % temp_vhd_path)
os.makedirs(temp_vhd_path)
logging.debug('Moving %s into %s' % (source_base_copy_path, temp_vhd_path))
shutil.move(source_base_copy_path, new_base_copy_path)
logging.debug('Moving %s into %s' % (source_cow_path, temp_vhd_path))
shutil.move(source_cow_path, new_cow_path)
logging.debug('Cleaning up %s' % source_image_path)
os.rmdir(source_image_path)
# Link the COW to the base copy
logging.debug('Attaching COW to the base copy %s -> %s' %
(new_cow_path, new_base_copy_path))
subprocess.call(shlex.split('/usr/sbin/vhd-util modify -n %s -p %s' %
(new_cow_path, new_base_copy_path)))
logging.debug('Moving VHDs into SR %s' % sr_path)
shutil.move("%s/%s.vhd" % (temp_vhd_path, new_base_copy_uuid), sr_path)
shutil.move("%s/%s.vhd" % (temp_vhd_path, new_cow_uuid), sr_path)
logging.debug('Cleaning up temporary SR path %s' % temp_vhd_path)
os.rmdir(temp_vhd_path)
return ""
def transfer_vhd(session, args):
"""Rsyncs a VHD to an adjacent host"""
params = pickle.loads(exists(args, 'params'))
instance_id = params['instance_id']
host = params['host']
vdi_uuid = params['vdi_uuid']
sr_path = params['sr_path']
vhd_path = "%s.vhd" % vdi_uuid
source_path = "%s/%s" % (sr_path, vhd_path)
dest_path = '%s:%sinstance%d/' % (host, '/images/', instance_id)
logging.debug("Preparing to transmit %s to %s" % (source_path,
dest_path))
ssh_cmd = 'ssh -o StrictHostKeyChecking=no'
rsync_args = shlex.split('nohup /usr/bin/rsync -av --progress -e %s %s %s'
% (ssh_cmd, source_path, dest_path))
logging.debug('rsync %s' % (' '.join(rsync_args, )))
rsync_proc = subprocess.Popen(rsync_args, stdout=subprocess.PIPE)
logging.debug('Rsync output: \n %s' % rsync_proc.communicate()[0])
logging.debug('Rsync return: %d' % rsync_proc.returncode)
if rsync_proc.returncode != 0:
raise Exception("Unexpected VHD transfer failure")
return ""
if __name__ == '__main__':
XenAPIPlugin.dispatch({'transfer_vhd': transfer_vhd,
'move_vhds_into_sr': move_vhds_into_sr, })
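For reference, this is how vmops invokes the plugin earlier in this commit:

params = {'host': dest, 'vdi_uuid': base_copy_uuid,
          'instance_id': instance.id,
          'sr_path': VMHelper.get_sr_path(self._session)}
task = self._session.async_call_plugin('migration', 'transfer_vhd',
                                       {'params': pickle.dumps(params)})
self._session.wait_for_task(task, instance.id)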