Implementation for XenServer migrations. There are several opportunities for optimization, but I based the current implementation on the chance scheduler just to be safe. Beyond that, a few features are still missing, such as ensuring the IP address is transferred along with the migrated instance; this will be added in a subsequent patch. Finally, everything is wired through the OpenStack API resize hooks, but actual resizing of the instance's RAM and disk space is not yet implemented.
commit 421cab4312
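For reference, a minimal sketch (not part of this diff) of how the new action hooks are driven through the OpenStack API, mirroring the request bodies used in the tests added below; the helper name make_action_request, the server id, and the flavorId value are illustrative only.

    import json
    import webob

    def make_action_request(server_id, body):
        # Build a POST to /v1.0/servers/<id>/action, as the tests do via webob_factory
        req = webob.Request.blank('/v1.0/servers/%s/action' % server_id)
        req.method = 'POST'
        req.body = json.dumps(body)
        return req

    # Ask the scheduler/compute layers to migrate server 1 to flavor 3
    resize_req = make_action_request(1, {'resize': {'flavorId': 3}})
    # Once the migration reaches the 'finished' status, either keep the new instance...
    confirm_req = make_action_request(1, {'confirmResize': None})
    # ...or roll it back to the source host
    revert_req = make_action_request(1, {'revertResize': None})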
@ -203,10 +203,58 @@ class Controller(wsgi.Controller):
|
||||
return exc.HTTPNoContent()
|
||||
|
||||
def action(self, req, id):
|
||||
""" Multi-purpose method used to reboot, rebuild, and
|
||||
resize a server """
|
||||
"""Multi-purpose method used to reboot, rebuild, or
|
||||
resize a server"""
|
||||
|
||||
actions = {
|
||||
'reboot': self._action_reboot,
|
||||
'resize': self._action_resize,
|
||||
'confirmResize': self._action_confirm_resize,
|
||||
'revertResize': self._action_revert_resize,
|
||||
'rebuild': self._action_rebuild,
|
||||
}
|
||||
|
||||
input_dict = self._deserialize(req.body, req)
|
||||
#TODO(sandy): rebuild/resize not supported.
|
||||
for key in actions.keys():
|
||||
if key in input_dict:
|
||||
return actions[key](input_dict, req, id)
|
||||
return faults.Fault(exc.HTTPNotImplemented())
|
||||
|
||||
def _action_confirm_resize(self, input_dict, req, id):
|
||||
try:
|
||||
self.compute_api.confirm_resize(req.environ['nova.context'], id)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Error in confirm-resize %s"), e)
|
||||
return faults.Fault(exc.HTTPBadRequest())
|
||||
return exc.HTTPNoContent()
|
||||
|
||||
def _action_revert_resize(self, input_dict, req, id):
|
||||
try:
|
||||
self.compute_api.revert_resize(req.environ['nova.context'], id)
|
||||
except Exception, e:
|
||||
LOG.exception(_("Error in revert-resize %s"), e)
|
||||
return faults.Fault(exc.HTTPBadRequest())
|
||||
return exc.HTTPAccepted()
|
||||
|
||||
def _action_rebuild(self, input_dict, req, id):
|
||||
return faults.Fault(exc.HTTPNotImplemented())
|
||||
|
||||
def _action_resize(self, input_dict, req, id):
"""Resizes a given instance to the flavor size requested"""
|
||||
try:
|
||||
if 'resize' in input_dict and 'flavorId' in input_dict['resize']:
|
||||
flavor_id = input_dict['resize']['flavorId']
|
||||
self.compute_api.resize(req.environ['nova.context'], id,
|
||||
flavor_id)
|
||||
else:
|
||||
LOG.exception(_("Missing arguments for resize"))
|
||||
return faults.Fault(exc.HTTPUnprocessableEntity())
|
||||
except Exception, e:
|
||||
LOG.exception(_("Error in resize %s"), e)
|
||||
return faults.Fault(exc.HTTPBadRequest())
|
||||
return faults.Fault(exc.HTTPAccepted())
|
||||
|
||||
def _action_reboot(self, input_dict, req, id):
|
||||
try:
|
||||
reboot_type = input_dict['reboot']['type']
|
||||
except Exception:
|
||||
|
@ -404,6 +404,10 @@ class API(base.Base):
|
||||
kwargs = {'method': method, 'args': params}
|
||||
return rpc.call(context, queue, kwargs)
|
||||
|
||||
def _cast_scheduler_message(self, context, args):
"""Generic handler for RPC casts to the scheduler"""
|
||||
rpc.cast(context, FLAGS.scheduler_topic, args)
|
||||
|
||||
def snapshot(self, context, instance_id, name):
|
||||
"""Snapshot the given instance.
|
||||
|
||||
@ -420,6 +424,45 @@ class API(base.Base):
|
||||
"""Reboot the given instance."""
|
||||
self._cast_compute_message('reboot_instance', context, instance_id)
|
||||
|
||||
def revert_resize(self, context, instance_id):
|
||||
"""Reverts a resize, deleting the 'new' instance in the process"""
|
||||
context = context.elevated()
|
||||
migration_ref = self.db.migration_get_by_instance_and_status(context,
|
||||
instance_id, 'finished')
|
||||
if not migration_ref:
|
||||
raise exception.NotFound(_("No finished migrations found for "
|
||||
"instance"))
|
||||
|
||||
params = {'migration_id': migration_ref['id']}
|
||||
self._cast_compute_message('revert_resize', context, instance_id,
|
||||
migration_ref['dest_compute'], params=params)
|
||||
|
||||
def confirm_resize(self, context, instance_id):
|
||||
"""Confirms a migration/resize, deleting the 'old' instance in the
|
||||
process."""
|
||||
context = context.elevated()
|
||||
migration_ref = self.db.migration_get_by_instance_and_status(context,
|
||||
instance_id, 'finished')
|
||||
if not migration_ref:
|
||||
raise exception.NotFound(_("No finished migrations found for "
|
||||
"instance"))
|
||||
instance_ref = self.db.instance_get(context, instance_id)
|
||||
params = {'migration_id': migration_ref['id']}
|
||||
self._cast_compute_message('confirm_resize', context, instance_id,
|
||||
migration_ref['source_compute'], params=params)
|
||||
|
||||
self.db.migration_update(context, migration_ref['id'],
|
||||
{'status': 'confirmed'})
|
||||
self.db.instance_update(context, instance_id,
|
||||
{'host': migration_ref['dest_compute'], })
|
||||
|
||||
def resize(self, context, instance_id, flavor):
|
||||
"""Resize a running instance."""
|
||||
self._cast_scheduler_message(context,
|
||||
{"method": "prep_resize",
|
||||
"args": {"topic": FLAGS.compute_topic,
|
||||
"instance_id": instance_id, }},)
|
||||
|
||||
def pause(self, context, instance_id):
|
||||
"""Pause the given instance."""
|
||||
self._cast_compute_message('pause_instance', context, instance_id)
|
||||
|
@ -411,6 +411,112 @@ class ComputeManager(manager.Manager):
|
||||
"""Update instance state when async task completes."""
|
||||
self._update_state(context, instance_id)
|
||||
|
||||
@exception.wrap_exception
|
||||
@checks_instance_lock
|
||||
def confirm_resize(self, context, instance_id, migration_id):
|
||||
"""Destroys the source instance"""
|
||||
context = context.elevated()
|
||||
instance_ref = self.db.instance_get(context, instance_id)
|
||||
migration_ref = self.db.migration_get(context, migration_id)
|
||||
self.driver.destroy(instance_ref)
|
||||
|
||||
@exception.wrap_exception
|
||||
@checks_instance_lock
|
||||
def revert_resize(self, context, instance_id, migration_id):
|
||||
"""Destroys the new instance on the destination machine,
|
||||
reverts the model changes, and powers on the old
|
||||
instance on the source machine"""
|
||||
instance_ref = self.db.instance_get(context, instance_id)
|
||||
migration_ref = self.db.migration_get(context, migration_id)
|
||||
|
||||
#TODO(mdietz): we may want to split these into separate methods.
|
||||
if migration_ref['source_compute'] == FLAGS.host:
|
||||
self.driver._start(instance_ref)
|
||||
self.db.migration_update(context, migration_id,
|
||||
{'status': 'reverted'})
|
||||
else:
|
||||
self.driver.destroy(instance_ref)
|
||||
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
|
||||
instance_ref['host'])
|
||||
rpc.cast(context, topic,
|
||||
{'method': 'revert_resize',
|
||||
'args': {
|
||||
'migration_id': migration_ref['id'],
|
||||
'instance_id': instance_id, },
|
||||
})
|
||||
|
||||
@exception.wrap_exception
|
||||
@checks_instance_lock
|
||||
def prep_resize(self, context, instance_id):
|
||||
"""Initiates the process of moving a running instance to another
|
||||
host, possibly changing the RAM and disk size in the process"""
|
||||
context = context.elevated()
|
||||
instance_ref = self.db.instance_get(context, instance_id)
|
||||
if instance_ref['host'] == FLAGS.host:
|
||||
raise exception.Error(_(
|
||||
'Migration error: destination same as source!'))
|
||||
|
||||
migration_ref = self.db.migration_create(context,
|
||||
{'instance_id': instance_id,
|
||||
'source_compute': instance_ref['host'],
|
||||
'dest_compute': FLAGS.host,
|
||||
'dest_host': self.driver.get_host_ip_addr(),
|
||||
'status': 'pre-migrating'})
|
||||
LOG.audit(_('instance %s: migrating'), instance_id,
|
||||
context=context)
|
||||
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
|
||||
instance_ref['host'])
|
||||
rpc.cast(context, topic,
|
||||
{'method': 'resize_instance',
|
||||
'args': {
|
||||
'migration_id': migration_ref['id'],
|
||||
'instance_id': instance_id, },
|
||||
})
|
||||
|
||||
@exception.wrap_exception
|
||||
@checks_instance_lock
|
||||
def resize_instance(self, context, instance_id, migration_id):
|
||||
"""Starts the migration of a running instance to another host"""
|
||||
migration_ref = self.db.migration_get(context, migration_id)
|
||||
instance_ref = self.db.instance_get(context, instance_id)
|
||||
self.db.migration_update(context, migration_id,
|
||||
{'status': 'migrating', })
|
||||
|
||||
disk_info = self.driver.migrate_disk_and_power_off(instance_ref,
|
||||
migration_ref['dest_host'])
|
||||
self.db.migration_update(context, migration_id,
|
||||
{'status': 'post-migrating', })
|
||||
|
||||
#TODO(mdietz): This is where we would update the VM record
|
||||
#after resizing
|
||||
service = self.db.service_get_by_host_and_topic(context,
|
||||
migration_ref['dest_compute'], FLAGS.compute_topic)
|
||||
topic = self.db.queue_get_for(context, FLAGS.compute_topic,
|
||||
migration_ref['dest_compute'])
|
||||
rpc.cast(context, topic,
|
||||
{'method': 'finish_resize',
|
||||
'args': {
|
||||
'migration_id': migration_id,
|
||||
'instance_id': instance_id,
|
||||
'disk_info': disk_info, },
|
||||
})
|
||||
|
||||
@exception.wrap_exception
|
||||
@checks_instance_lock
|
||||
def finish_resize(self, context, instance_id, migration_id, disk_info):
|
||||
"""Completes the migration process by setting up the newly transferred
|
||||
disk and turning on the instance on its new host machine"""
|
||||
migration_ref = self.db.migration_get(context, migration_id)
|
||||
instance_ref = self.db.instance_get(context,
|
||||
migration_ref['instance_id'])
|
||||
|
||||
# this may get passed into the following spawn instead
|
||||
new_disk_info = self.driver.attach_disk(instance_ref, disk_info)
|
||||
self.driver.spawn(instance_ref, disk=new_disk_info)
|
||||
|
||||
self.db.migration_update(context, migration_id,
|
||||
{'status': 'finished', })
|
||||
|
||||
@exception.wrap_exception
|
||||
@checks_instance_lock
|
||||
def pause_instance(self, context, instance_id):
|
||||
|
@ -80,10 +80,15 @@ def service_destroy(context, instance_id):
|
||||
|
||||
|
||||
def service_get(context, service_id):
|
||||
"""Get an service or raise if it does not exist."""
|
||||
"""Get a service or raise if it does not exist."""
|
||||
return IMPL.service_get(context, service_id)
|
||||
|
||||
|
||||
def service_get_by_host_and_topic(context, host, topic):
|
||||
"""Get a service by host it's on and topic it listens to"""
|
||||
return IMPL.service_get_by_host_and_topic(context, host, topic)
|
||||
|
||||
|
||||
def service_get_all(context, disabled=False):
|
||||
"""Get all services."""
|
||||
return IMPL.service_get_all(context, disabled)
|
||||
@ -254,6 +259,28 @@ def floating_ip_get_by_address(context, address):
|
||||
|
||||
####################
|
||||
|
||||
def migration_update(context, id, values):
|
||||
"""Update a migration instance"""
|
||||
return IMPL.migration_update(context, id, values)
|
||||
|
||||
|
||||
def migration_create(context, values):
|
||||
"""Create a migration record"""
|
||||
return IMPL.migration_create(context, values)
|
||||
|
||||
|
||||
def migration_get(context, migration_id):
|
||||
"""Finds a migration by the id"""
|
||||
return IMPL.migration_get(context, migration_id)
|
||||
|
||||
|
||||
def migration_get_by_instance_and_status(context, instance_id, status):
"""Finds a migration by the id of the instance being migrated"""
|
||||
return IMPL.migration_get_by_instance_and_status(context, instance_id,
|
||||
status)
|
||||
|
||||
####################
|
||||
|
||||
|
||||
def fixed_ip_associate(context, address, instance_id):
|
||||
"""Associate fixed ip to instance.
|
||||
|
@ -154,6 +154,17 @@ def service_get_all_by_topic(context, topic):
|
||||
all()
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def service_get_by_host_and_topic(context, host, topic):
|
||||
session = get_session()
|
||||
return session.query(models.Service).\
|
||||
filter_by(deleted=False).\
|
||||
filter_by(disabled=False).\
|
||||
filter_by(host=host).\
|
||||
filter_by(topic=topic).\
|
||||
first()
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def service_get_all_by_host(context, host):
|
||||
session = get_session()
|
||||
@ -1972,6 +1983,51 @@ def host_get_networks(context, host):
|
||||
all()
|
||||
|
||||
|
||||
###################
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def migration_create(context, values):
|
||||
migration = models.Migration()
|
||||
migration.update(values)
|
||||
migration.save()
|
||||
return migration
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def migration_update(context, id, values):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
migration = migration_get(context, id, session=session)
|
||||
migration.update(values)
|
||||
migration.save(session=session)
|
||||
return migration
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def migration_get(context, id, session=None):
|
||||
if not session:
|
||||
session = get_session()
|
||||
result = session.query(models.Migration).\
|
||||
filter_by(id=id).first()
|
||||
if not result:
|
||||
raise exception.NotFound(_("No migration found with id %s")
|
||||
% id)
|
||||
return result
|
||||
|
||||
|
||||
@require_admin_context
|
||||
def migration_get_by_instance_and_status(context, instance_id, status):
|
||||
session = get_session()
|
||||
result = session.query(models.Migration).\
|
||||
filter_by(instance_id=instance_id).\
|
||||
filter_by(status=status).first()
|
||||
if not result:
|
||||
raise exception.NotFound(_("No migration found with instance id %s")
|
||||
% instance_id)
|
||||
return result
|
||||
|
||||
|
||||
##################
|
||||
|
||||
|
||||
|
@ -0,0 +1,61 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from sqlalchemy import *
|
||||
from migrate import *
|
||||
|
||||
from nova import log as logging
|
||||
|
||||
|
||||
meta = MetaData()
|
||||
|
||||
# Just for the ForeignKey and column creation to succeed, these are not the
|
||||
# actual definitions of instances or services.
|
||||
instances = Table('instances', meta,
|
||||
Column('id', Integer(), primary_key=True, nullable=False),
|
||||
)
|
||||
|
||||
#
|
||||
# New Tables
|
||||
#
|
||||
|
||||
migrations = Table('migrations', meta,
|
||||
Column('created_at', DateTime(timezone=False)),
|
||||
Column('updated_at', DateTime(timezone=False)),
|
||||
Column('deleted_at', DateTime(timezone=False)),
|
||||
Column('deleted', Boolean(create_constraint=True, name=None)),
|
||||
Column('id', Integer(), primary_key=True, nullable=False),
|
||||
Column('source_compute', String(255)),
|
||||
Column('dest_compute', String(255)),
|
||||
Column('dest_host', String(255)),
|
||||
Column('instance_id', Integer, ForeignKey('instances.id'),
|
||||
nullable=True),
|
||||
Column('status', String(255)),
|
||||
)
|
||||
|
||||
|
||||
def upgrade(migrate_engine):
|
||||
# Upgrade operations go here. Don't create your own engine;
|
||||
# bind migrate_engine to your metadata
|
||||
meta.bind = migrate_engine
|
||||
for table in (migrations, ):
|
||||
try:
|
||||
table.create()
|
||||
except Exception:
|
||||
logging.info(repr(table))
|
||||
logging.exception('Exception while creating table')
|
||||
raise
|
@ -60,7 +60,7 @@ def db_version():
|
||||
'key_pairs', 'networks', 'projects', 'quotas',
|
||||
'security_group_instance_association',
|
||||
'security_group_rules', 'security_groups',
|
||||
'services',
|
||||
'services', 'migrations',
|
||||
'users', 'user_project_association',
|
||||
'user_project_role_association',
|
||||
'user_role_association',
|
||||
|
@ -389,6 +389,18 @@ class KeyPair(BASE, NovaBase):
|
||||
public_key = Column(Text)
|
||||
|
||||
|
||||
class Migration(BASE, NovaBase):
|
||||
"""Represents a running host-to-host migration."""
|
||||
__tablename__ = 'migrations'
|
||||
id = Column(Integer, primary_key=True, nullable=False)
|
||||
source_compute = Column(String(255))
|
||||
dest_compute = Column(String(255))
|
||||
dest_host = Column(String(255))
|
||||
instance_id = Column(Integer, ForeignKey('instances.id'), nullable=True)
|
||||
#TODO(_cerberus_): enum
|
||||
status = Column(String(255))
|
||||
|
||||
|
||||
class Network(BASE, NovaBase):
|
||||
"""Represents a network."""
|
||||
__tablename__ = 'networks'
|
||||
@ -598,7 +610,7 @@ def register_models():
|
||||
Network, SecurityGroup, SecurityGroupIngressRule,
|
||||
SecurityGroupInstanceAssociation, AuthToken, User,
|
||||
Project, Certificate, ConsolePool, Console, Zone,
|
||||
InstanceMetadata)
|
||||
InstanceMetadata, Migration)
|
||||
engine = create_engine(FLAGS.sql_connection, echo=False)
|
||||
for model in models:
|
||||
model.metadata.create_all(engine)
|
||||
|
@ -123,7 +123,7 @@ class Consumer(messaging.Consumer):
|
||||
LOG.error(_("Reconnected to queue"))
|
||||
self.failed_connection = False
|
||||
# NOTE(vish): This is catching all errors because we really don't
|
||||
# exceptions to be logged 10 times a second if some
|
||||
# want exceptions to be logged 10 times a second if some
|
||||
# persistent failure occurs.
|
||||
except Exception: # pylint: disable-msg=W0703
|
||||
if not self.failed_connection:
|
||||
|
nova/tests/api/openstack/common.py (new file, 35 lines)
@ -0,0 +1,35 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
import webob
|
||||
|
||||
|
||||
def webob_factory(url):
|
||||
"""Factory for removing duplicate webob code from tests"""
|
||||
|
||||
base_url = url
|
||||
|
||||
def web_request(url, method=None, body=None):
|
||||
req = webob.Request.blank("%s%s" % (base_url, url))
|
||||
if method:
|
||||
req.method = method
|
||||
if body:
|
||||
req.body = json.dumps(body)
|
||||
return req
|
||||
return web_request
|
@ -1,6 +1,6 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 OpenStack LLC.
|
||||
# Copyright 2010-2011 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@ -26,10 +26,12 @@ from nova import flags
|
||||
from nova import test
|
||||
import nova.api.openstack
|
||||
from nova.api.openstack import servers
|
||||
import nova.compute.api
|
||||
import nova.db.api
|
||||
from nova.db.sqlalchemy.models import Instance
|
||||
from nova.db.sqlalchemy.models import InstanceMetadata
|
||||
import nova.rpc
|
||||
from nova.tests.api.openstack import common
|
||||
from nova.tests.api.openstack import fakes
|
||||
|
||||
|
||||
@ -144,6 +146,8 @@ class ServersTest(test.TestCase):
|
||||
self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api)
|
||||
self.allow_admin = FLAGS.allow_admin_api
|
||||
|
||||
self.webreq = common.webob_factory('/v1.0/servers')
|
||||
|
||||
def tearDown(self):
|
||||
self.stubs.UnsetAll()
|
||||
FLAGS.allow_admin_api = self.allow_admin
|
||||
@ -465,3 +469,99 @@ class ServersTest(test.TestCase):
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status, '202 Accepted')
|
||||
self.assertEqual(self.server_delete_called, True)
|
||||
|
||||
def test_resize_server(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
|
||||
|
||||
self.resize_called = False
|
||||
|
||||
def resize_mock(*args):
|
||||
self.resize_called = True
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 202)
|
||||
self.assertEqual(self.resize_called, True)
|
||||
|
||||
def test_resize_bad_flavor_fails(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(resize=dict(derp=3)))
|
||||
|
||||
self.resize_called = False
|
||||
|
||||
def resize_mock(*args):
|
||||
self.resize_called = True
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 422)
|
||||
self.assertEqual(self.resize_called, False)
|
||||
|
||||
def test_resize_raises_fails(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(resize=dict(flavorId=3)))
|
||||
|
||||
def resize_mock(*args):
|
||||
raise Exception('hurr durr')
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'resize', resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 400)
|
||||
|
||||
def test_confirm_resize_server(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
|
||||
|
||||
self.resize_called = False
|
||||
|
||||
def confirm_resize_mock(*args):
|
||||
self.resize_called = True
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'confirm_resize',
|
||||
confirm_resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 204)
|
||||
self.assertEqual(self.resize_called, True)
|
||||
|
||||
def test_confirm_resize_server_fails(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(confirmResize=None))
|
||||
|
||||
def confirm_resize_mock(*args):
|
||||
raise Exception('hurr durr')
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'confirm_resize',
|
||||
confirm_resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 400)
|
||||
|
||||
def test_revert_resize_server(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(revertResize=None))
|
||||
|
||||
self.resize_called = False
|
||||
|
||||
def revert_resize_mock(*args):
|
||||
self.resize_called = True
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'revert_resize',
|
||||
revert_resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 202)
|
||||
self.assertEqual(self.resize_called, True)
|
||||
|
||||
def test_revert_resize_server_fails(self):
|
||||
req = self.webreq('/1/action', 'POST', dict(revertResize=None))
|
||||
|
||||
def revert_resize_mock(*args):
|
||||
raise Exception('hurr durr')
|
||||
|
||||
self.stubs.Set(nova.compute.api.API, 'revert_resize',
|
||||
revert_resize_mock)
|
||||
|
||||
res = req.get_response(fakes.wsgi_app())
|
||||
self.assertEqual(res.status_int, 400)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -57,7 +57,7 @@ class ComputeTestCase(test.TestCase):
|
||||
self.manager.delete_project(self.project)
|
||||
super(ComputeTestCase, self).tearDown()
|
||||
|
||||
def _create_instance(self):
|
||||
def _create_instance(self, params={}):
|
||||
"""Create a test instance"""
|
||||
inst = {}
|
||||
inst['image_id'] = 'ami-test'
|
||||
@ -68,6 +68,7 @@ class ComputeTestCase(test.TestCase):
|
||||
inst['instance_type'] = 'm1.tiny'
|
||||
inst['mac_address'] = utils.generate_mac()
|
||||
inst['ami_launch_index'] = 0
|
||||
inst.update(params)
|
||||
return db.instance_create(self.context, inst)['id']
|
||||
|
||||
def _create_group(self):
|
||||
@ -268,9 +269,30 @@ class ComputeTestCase(test.TestCase):
|
||||
|
||||
self.compute.terminate_instance(self.context, instance_id)
|
||||
|
||||
def test_resize_instance(self):
|
||||
"""Ensure instance can be migrated/resized"""
|
||||
instance_id = self._create_instance()
|
||||
context = self.context.elevated()
|
||||
self.compute.run_instance(self.context, instance_id)
|
||||
db.instance_update(self.context, instance_id, {'host': 'foo'})
|
||||
self.compute.prep_resize(context, instance_id)
|
||||
migration_ref = db.migration_get_by_instance_and_status(context,
|
||||
instance_id, 'pre-migrating')
|
||||
self.compute.resize_instance(context, instance_id,
|
||||
migration_ref['id'])
|
||||
self.compute.terminate_instance(context, instance_id)
|
||||
|
||||
def test_get_by_flavor_id(self):
|
||||
type = instance_types.get_by_flavor_id(1)
|
||||
self.assertEqual(type, 'm1.tiny')
|
||||
|
||||
def test_resize_same_source_fails(self):
|
||||
"""Ensure instance fails to migrate when source and destination are
|
||||
the same host"""
|
||||
instance_id = self._create_instance()
|
||||
self.compute.run_instance(self.context, instance_id)
|
||||
self.assertRaises(exception.Error, self.compute.prep_resize,
|
||||
self.context, instance_id)
|
||||
self.compute.terminate_instance(self.context, instance_id)
|
||||
type = instance_types.get_by_flavor_id("1")
|
||||
self.assertEqual(type, 'm1.tiny')
|
||||
|
@ -346,6 +346,44 @@ class XenAPIDiffieHellmanTestCase(test.TestCase):
|
||||
super(XenAPIDiffieHellmanTestCase, self).tearDown()
|
||||
|
||||
|
||||
class XenAPIMigrateInstance(test.TestCase):
|
||||
"""
|
||||
Unit test for verifying migration-related actions
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(XenAPIMigrateInstance, self).setUp()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
FLAGS.target_host = '127.0.0.1'
|
||||
FLAGS.xenapi_connection_url = 'test_url'
|
||||
FLAGS.xenapi_connection_password = 'test_pass'
|
||||
db_fakes.stub_out_db_instance_api(self.stubs)
|
||||
stubs.stub_out_get_target(self.stubs)
|
||||
xenapi_fake.reset()
|
||||
self.values = {'name': 1, 'id': 1,
|
||||
'project_id': 'fake',
|
||||
'user_id': 'fake',
|
||||
'image_id': 1,
|
||||
'kernel_id': 2,
|
||||
'ramdisk_id': 3,
|
||||
'instance_type': 'm1.large',
|
||||
'mac_address': 'aa:bb:cc:dd:ee:ff',
|
||||
}
|
||||
stubs.stub_out_migration_methods(self.stubs)
|
||||
|
||||
def test_migrate_disk_and_power_off(self):
|
||||
instance = db.instance_create(self.values)
|
||||
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
|
||||
conn = xenapi_conn.get_connection(False)
|
||||
conn.migrate_disk_and_power_off(instance, '127.0.0.1')
|
||||
|
||||
def test_attach_disk(self):
|
||||
instance = db.instance_create(self.values)
|
||||
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
|
||||
conn = xenapi_conn.get_connection(False)
|
||||
conn.attach_disk(instance, {'base_copy': 'hurr', 'cow': 'durr'})
|
||||
|
||||
|
||||
class XenAPIDetermineDiskImageTestCase(test.TestCase):
|
||||
"""
|
||||
Unit tests for code that detects the ImageType
|
||||
|
@ -20,6 +20,7 @@ from nova.virt import xenapi_conn
|
||||
from nova.virt.xenapi import fake
|
||||
from nova.virt.xenapi import volume_utils
|
||||
from nova.virt.xenapi import vm_utils
|
||||
from nova.virt.xenapi import vmops
|
||||
|
||||
|
||||
def stubout_instance_snapshot(stubs):
|
||||
@ -217,3 +218,44 @@ class FakeSessionForVolumeFailedTests(FakeSessionForVolumeTests):
|
||||
|
||||
def SR_forget(self, _1, ref):
|
||||
pass
|
||||
|
||||
|
||||
class FakeSessionForMigrationTests(fake.SessionBase):
|
||||
"""Stubs out a XenAPISession for Migration tests"""
|
||||
def __init__(self, uri):
|
||||
super(FakeSessionForMigrationTests, self).__init__(uri)
|
||||
|
||||
|
||||
def stub_out_migration_methods(stubs):
|
||||
def fake_get_snapshot(self, instance):
|
||||
return 'foo', 'bar'
|
||||
|
||||
@classmethod
|
||||
def fake_get_vdi(cls, session, vm_ref):
|
||||
vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
|
||||
sr_ref='herp', sharable=False)
|
||||
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
|
||||
return vdi_ref, {'uuid': vdi_rec['uuid'], }
|
||||
|
||||
def fake_shutdown(self, inst, vm, method='clean'):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def fake_sr(cls, session, *args):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def fake_get_sr_path(cls, *args):
|
||||
return "fake"
|
||||
|
||||
def fake_destroy(*args, **kwargs):
|
||||
pass
|
||||
|
||||
stubs.Set(vmops.VMOps, '_destroy', fake_destroy)
|
||||
stubs.Set(vm_utils.VMHelper, 'scan_default_sr', fake_sr)
|
||||
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
|
||||
stubs.Set(vmops.VMOps, '_get_snapshot', fake_get_snapshot)
|
||||
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
|
||||
stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
|
||||
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
|
||||
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)
|
||||
|
@ -139,6 +139,24 @@ class FakeConnection(object):
|
||||
"""
|
||||
pass
|
||||
|
||||
def get_host_ip_addr(self):
|
||||
"""
|
||||
Retrieves the IP address of the dom0
|
||||
"""
|
||||
pass
|
||||
|
||||
def resize(self, instance, flavor):
|
||||
"""
|
||||
Resizes/Migrates the specified instance.
|
||||
|
||||
The flavor parameter determines whether or not the instance RAM and
|
||||
disk space are modified, and if so, to what size.
|
||||
|
||||
The work will be done asynchronously. This function returns a task
|
||||
that allows the caller to detect when it is complete.
|
||||
"""
|
||||
pass
|
||||
|
||||
def set_admin_password(self, instance, new_pass):
|
||||
"""
|
||||
Set the root password on the specified instance.
|
||||
@ -179,6 +197,19 @@ class FakeConnection(object):
|
||||
"""
|
||||
pass
|
||||
|
||||
def migrate_disk_and_power_off(self, instance, dest):
|
||||
"""
|
||||
Transfers the disk of a running instance in multiple phases, turning
|
||||
off the instance before the end.
|
||||
"""
|
||||
pass
|
||||
|
||||
def attach_disk(self, instance, disk_info):
|
||||
"""
|
||||
Attaches the disk to an instance given the metadata disk_info
|
||||
"""
|
||||
pass
|
||||
|
||||
def pause(self, instance, callback):
|
||||
"""
|
||||
Pause the specified instance.
|
||||
|
@ -290,6 +290,9 @@ class SessionBase(object):
|
||||
#Always return 12GB available
|
||||
return 12 * 1024 * 1024 * 1024
|
||||
|
||||
def host_call_plugin(*args):
|
||||
return 'herp'
|
||||
|
||||
def xenapi_request(self, methodname, params):
|
||||
if methodname.startswith('login'):
|
||||
self._login(methodname, params)
|
||||
|
@ -252,17 +252,32 @@ class VMHelper(HelperBase):
|
||||
% locals())
|
||||
return vdi_ref
|
||||
|
||||
@classmethod
|
||||
def get_vdi_for_vm_safely(cls, session, vm_ref):
|
||||
vdi_refs = VMHelper.lookup_vm_vdis(session, vm_ref)
|
||||
if vdi_refs is None:
|
||||
raise Exception(_("No VDIs found for VM %s") % vm_ref)
|
||||
else:
|
||||
num_vdis = len(vdi_refs)
|
||||
if num_vdis != 1:
|
||||
raise Exception(
|
||||
_("Unexpected number of VDIs (%(num_vdis)s) found"
|
||||
" for VM %(vm_ref)s") % locals())
|
||||
|
||||
vdi_ref = vdi_refs[0]
|
||||
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
|
||||
return vdi_ref, vdi_rec
|
||||
|
||||
@classmethod
|
||||
def create_snapshot(cls, session, instance_id, vm_ref, label):
|
||||
""" Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI,
|
||||
Snapshot VHD
|
||||
"""
|
||||
"""Creates Snapshot (Template) VM, Snapshot VBD, Snapshot VDI,
|
||||
Snapshot VHD"""
|
||||
#TODO(sirp): Add quiesce and VSS locking support when Windows support
|
||||
# is added
|
||||
LOG.debug(_("Snapshotting VM %(vm_ref)s with label '%(label)s'...")
|
||||
% locals())
|
||||
|
||||
vm_vdi_ref, vm_vdi_rec = get_vdi_for_vm_safely(session, vm_ref)
|
||||
vm_vdi_ref, vm_vdi_rec = cls.get_vdi_for_vm_safely(session, vm_ref)
|
||||
vm_vdi_uuid = vm_vdi_rec["uuid"]
|
||||
sr_ref = vm_vdi_rec["SR"]
|
||||
|
||||
@ -270,7 +285,8 @@ class VMHelper(HelperBase):
|
||||
|
||||
task = session.call_xenapi('Async.VM.snapshot', vm_ref, label)
|
||||
template_vm_ref = session.wait_for_task(task, instance_id)
|
||||
template_vdi_rec = get_vdi_for_vm_safely(session, template_vm_ref)[1]
|
||||
template_vdi_rec = cls.get_vdi_for_vm_safely(session,
|
||||
template_vm_ref)[1]
|
||||
template_vdi_uuid = template_vdi_rec["uuid"]
|
||||
|
||||
LOG.debug(_('Created snapshot %(template_vm_ref)s from'
|
||||
@ -284,6 +300,24 @@ class VMHelper(HelperBase):
|
||||
'snap': template_vdi_uuid}
|
||||
return template_vm_ref, template_vdi_uuids
|
||||
|
||||
@classmethod
|
||||
def get_sr(cls, session, sr_label='slices'):
|
||||
"""Finds the SR named by the given name label and returns
|
||||
its reference"""
|
||||
return session.call_xenapi('SR.get_by_name_label', sr_label)[0]
|
||||
|
||||
@classmethod
|
||||
def get_sr_path(cls, session):
|
||||
"""Return the path to our storage repository
|
||||
|
||||
This is used when we're dealing with VHDs directly, either by taking
|
||||
snapshots or by restoring an image in the DISK_VHD format.
|
||||
"""
|
||||
sr_ref = safe_find_sr(session)
|
||||
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
|
||||
sr_uuid = sr_rec["uuid"]
|
||||
return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
|
||||
|
||||
@classmethod
|
||||
def upload_image(cls, session, instance_id, vdi_uuids, image_id):
|
||||
""" Requests that the Glance plugin bundle the specified VDIs and
|
||||
@ -298,7 +332,7 @@ class VMHelper(HelperBase):
|
||||
'image_id': image_id,
|
||||
'glance_host': FLAGS.glance_host,
|
||||
'glance_port': FLAGS.glance_port,
|
||||
'sr_path': get_sr_path(session)}
|
||||
'sr_path': cls.get_sr_path(session)}
|
||||
|
||||
kwargs = {'params': pickle.dumps(params)}
|
||||
task = session.async_call_plugin('glance', 'upload_vhd', kwargs)
|
||||
@ -341,13 +375,13 @@ class VMHelper(HelperBase):
|
||||
'glance_host': FLAGS.glance_host,
|
||||
'glance_port': FLAGS.glance_port,
|
||||
'uuid_stack': uuid_stack,
|
||||
'sr_path': get_sr_path(session)}
|
||||
'sr_path': cls.get_sr_path(session)}
|
||||
|
||||
kwargs = {'params': pickle.dumps(params)}
|
||||
task = session.async_call_plugin('glance', 'download_vhd', kwargs)
|
||||
vdi_uuid = session.wait_for_task(task, instance_id)
|
||||
|
||||
scan_sr(session, instance_id, sr_ref)
|
||||
cls.scan_sr(session, instance_id, sr_ref)
|
||||
|
||||
# Set the name-label to ease debugging
|
||||
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
|
||||
@ -609,6 +643,21 @@ class VMHelper(HelperBase):
|
||||
except cls.XenAPI.Failure as e:
|
||||
return {"Unable to retrieve diagnostics": e}
|
||||
|
||||
@classmethod
|
||||
def scan_sr(cls, session, instance_id=None, sr_ref=None):
|
||||
"""Scans the SR specified by sr_ref"""
|
||||
if sr_ref:
|
||||
LOG.debug(_("Re-scanning SR %s"), sr_ref)
|
||||
task = session.call_xenapi('Async.SR.scan', sr_ref)
|
||||
session.wait_for_task(task, instance_id)
|
||||
|
||||
@classmethod
|
||||
def scan_default_sr(cls, session):
|
||||
"""Looks for the system default SR and triggers a re-scan"""
|
||||
#FIXME(sirp/mdietz): refactor scan_default_sr in there
|
||||
sr_ref = cls.get_sr(session)
|
||||
session.call_xenapi('SR.scan', sr_ref)
|
||||
|
||||
|
||||
def get_rrd(host, uuid):
|
||||
"""Return the VM RRD XML as a string"""
|
||||
@ -651,12 +700,6 @@ def get_vhd_parent_uuid(session, vdi_ref):
|
||||
return None
|
||||
|
||||
|
||||
def scan_sr(session, instance_id, sr_ref):
|
||||
LOG.debug(_("Re-scanning SR %s"), sr_ref)
|
||||
task = session.call_xenapi('Async.SR.scan', sr_ref)
|
||||
session.wait_for_task(task, instance_id)
|
||||
|
||||
|
||||
def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
|
||||
original_parent_uuid):
|
||||
""" Spin until the parent VHD is coalesced into its parent VHD
|
||||
@ -681,7 +724,7 @@ def wait_for_vhd_coalesce(session, instance_id, sr_ref, vdi_ref,
|
||||
" %(max_attempts)d), giving up...") % locals())
|
||||
raise exception.Error(msg)
|
||||
|
||||
scan_sr(session, instance_id, sr_ref)
|
||||
VMHelper.scan_sr(session, instance_id, sr_ref)
|
||||
parent_uuid = get_vhd_parent_uuid(session, vdi_ref)
|
||||
if original_parent_uuid and (parent_uuid != original_parent_uuid):
|
||||
LOG.debug(_("Parent %(parent_uuid)s doesn't match original parent"
|
||||
@ -738,18 +781,6 @@ def find_sr(session):
|
||||
return None
|
||||
|
||||
|
||||
def get_sr_path(session):
|
||||
"""Return the path to our storage repository
|
||||
|
||||
This is used when we're dealing with VHDs directly, either by taking
|
||||
snapshots or by restoring an image in the DISK_VHD format.
|
||||
"""
|
||||
sr_ref = safe_find_sr(session)
|
||||
sr_rec = session.get_xenapi().SR.get_record(sr_ref)
|
||||
sr_uuid = sr_rec["uuid"]
|
||||
return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
|
||||
|
||||
|
||||
def remap_vbd_dev(dev):
|
||||
"""Return the appropriate location for a plugged-in VBD device
|
||||
|
||||
|
@ -22,6 +22,7 @@ Management class for VM-related functions (spawn, reboot, etc).
|
||||
import json
|
||||
import M2Crypto
|
||||
import os
|
||||
import pickle
|
||||
import subprocess
|
||||
import tempfile
|
||||
import uuid
|
||||
@ -61,7 +62,17 @@ class VMOps(object):
|
||||
vms.append(rec["name_label"])
|
||||
return vms
|
||||
|
||||
def spawn(self, instance):
|
||||
def _start(self, instance, vm_ref=None):
|
||||
"""Power on a VM instance"""
|
||||
if not vm_ref:
|
||||
vm_ref = VMHelper.lookup(self._session, instance.name)
|
||||
if vm_ref is None:
|
||||
raise exception.Error(_('Attempted to power on non-existent instance'
|
||||
' bad instance id %s') % instance.id)
|
||||
LOG.debug(_("Starting instance %s"), instance.name)
|
||||
self._session.call_xenapi('VM.start', vm_ref, False, False)
|
||||
|
||||
def spawn(self, instance, disk):
|
||||
"""Create VM instance"""
|
||||
instance_name = instance.name
|
||||
vm = VMHelper.lookup(self._session, instance_name)
|
||||
@ -81,16 +92,22 @@ class VMOps(object):
|
||||
user = AuthManager().get_user(instance.user_id)
|
||||
project = AuthManager().get_project(instance.project_id)
|
||||
|
||||
disk_image_type = VMHelper.determine_disk_image_type(instance)
|
||||
vdi_ref = kernel = ramdisk = pv_kernel = None
|
||||
|
||||
vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
|
||||
instance.image_id, user, project, disk_image_type)
|
||||
# Are we building from a pre-existing disk?
|
||||
if not disk:
|
||||
#if kernel is not present we must download a raw disk
|
||||
|
||||
vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
|
||||
disk_image_type = VMHelper.determine_disk_image_type(instance)
|
||||
vdi_uuid = VMHelper.fetch_image(self._session, instance.id,
|
||||
instance.image_id, user, project, disk_image_type)
|
||||
vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', vdi_uuid)
|
||||
|
||||
else:
|
||||
vdi_ref = self._session.call_xenapi('VDI.get_by_uuid', disk)
|
||||
|
||||
pv_kernel = False
|
||||
if disk_image_type == ImageType.DISK_RAW:
|
||||
#Have a look at the VDI and see if it has a PV kernel
|
||||
# Have a look at the VDI and see if it has a PV kernel
|
||||
pv_kernel = VMHelper.lookup_image(self._session, instance.id,
|
||||
vdi_ref)
|
||||
elif disk_image_type == ImageType.DISK_VHD:
|
||||
@ -98,19 +115,18 @@ class VMOps(object):
|
||||
# configurable as Windows will use HVM.
|
||||
pv_kernel = True
|
||||
|
||||
kernel = None
|
||||
if instance.kernel_id:
|
||||
kernel = VMHelper.fetch_image(self._session, instance.id,
|
||||
instance.kernel_id, user, project, ImageType.KERNEL_RAMDISK)
|
||||
|
||||
ramdisk = None
|
||||
if instance.ramdisk_id:
|
||||
ramdisk = VMHelper.fetch_image(self._session, instance.id,
|
||||
instance.ramdisk_id, user, project, ImageType.KERNEL_RAMDISK)
|
||||
|
||||
vm_ref = VMHelper.create_vm(self._session,
|
||||
instance, kernel, ramdisk, pv_kernel)
|
||||
VMHelper.create_vbd(self._session, vm_ref, vdi_ref, 0, True)
|
||||
VMHelper.create_vbd(session=self._session, vm_ref=vm_ref,
|
||||
vdi_ref=vdi_ref, userdevice=0, bootable=True)
|
||||
|
||||
# inject_network_info and create vifs
|
||||
networks = self.inject_network_info(instance)
|
||||
@ -217,7 +233,7 @@ class VMOps(object):
|
||||
"start")
|
||||
|
||||
def snapshot(self, instance, image_id):
|
||||
""" Create snapshot from a running VM instance
|
||||
"""Create snapshot from a running VM instance
|
||||
|
||||
:param instance: instance to be snapshotted
|
||||
:param image_id: id of image to upload to
|
||||
@ -238,7 +254,20 @@ class VMOps(object):
|
||||
that will bundle the VHDs together and then push the bundle into
|
||||
Glance.
|
||||
"""
|
||||
template_vm_ref = None
|
||||
try:
|
||||
template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
|
||||
# call plugin to ship snapshot off to glance
|
||||
VMHelper.upload_image(
|
||||
self._session, instance.id, template_vdi_uuids, image_id)
|
||||
finally:
|
||||
if template_vm_ref:
|
||||
self._destroy(instance, template_vm_ref,
|
||||
shutdown=False, destroy_kernel_ramdisk=False)
|
||||
|
||||
logging.debug(_("Finished snapshot and upload for VM %s"), instance)
|
||||
|
||||
def _get_snapshot(self, instance):
|
||||
#TODO(sirp): Add quiesce and VSS locking support when Windows support
|
||||
# is added
|
||||
|
||||
@ -249,20 +278,89 @@ class VMOps(object):
|
||||
try:
|
||||
template_vm_ref, template_vdi_uuids = VMHelper.create_snapshot(
|
||||
self._session, instance.id, vm_ref, label)
|
||||
return template_vm_ref, template_vdi_uuids
|
||||
except self.XenAPI.Failure, exc:
|
||||
logging.error(_("Unable to Snapshot %(vm_ref)s: %(exc)s")
|
||||
% locals())
|
||||
return
|
||||
|
||||
try:
|
||||
# call plugin to ship snapshot off to glance
|
||||
VMHelper.upload_image(
|
||||
self._session, instance.id, template_vdi_uuids, image_id)
|
||||
finally:
|
||||
self._destroy(instance, template_vm_ref, shutdown=False,
|
||||
destroy_kernel_ramdisk=False)
|
||||
def migrate_disk_and_power_off(self, instance, dest):
|
||||
"""Copies a VHD from one host machine to another
|
||||
|
||||
logging.debug(_("Finished snapshot and upload for VM %s"), instance)
|
||||
:param instance: the instance that owns the VHD in question
|
||||
:param dest: the destination host machine
|
||||
:param disk_type: values are 'primary' or 'cow'
|
||||
"""
|
||||
vm_ref = VMHelper.lookup(self._session, instance.name)
|
||||
|
||||
# The primary VDI becomes the COW after the snapshot, and we can
|
||||
# identify it via the VBD. The base copy is the parent_uuid returned
|
||||
# from the snapshot creation
|
||||
|
||||
base_copy_uuid = cow_uuid = None
|
||||
template_vdi_uuids = template_vm_ref = None
|
||||
try:
|
||||
# transfer the base copy
|
||||
template_vm_ref, template_vdi_uuids = self._get_snapshot(instance)
|
||||
base_copy_uuid = template_vdi_uuids[1]
|
||||
vdi_ref, vm_vdi_rec = \
|
||||
VMHelper.get_vdi_for_vm_safely(self._session, vm_ref)
|
||||
cow_uuid = vm_vdi_rec['uuid']
|
||||
|
||||
params = {'host': dest,
|
||||
'vdi_uuid': base_copy_uuid,
|
||||
'instance_id': instance.id,
|
||||
'sr_path': VMHelper.get_sr_path(self._session)}
|
||||
|
||||
task = self._session.async_call_plugin('migration', 'transfer_vhd',
|
||||
{'params': pickle.dumps(params)})
|
||||
self._session.wait_for_task(task, instance.id)
|
||||
|
||||
# Now power down the instance and transfer the COW VHD
|
||||
self._shutdown(instance, vm_ref, method='clean')
|
||||
|
||||
params = {'host': dest,
|
||||
'vdi_uuid': cow_uuid,
|
||||
'instance_id': instance.id,
|
||||
'sr_path': VMHelper.get_sr_path(self._session), }
|
||||
|
||||
task = self._session.async_call_plugin('migration', 'transfer_vhd',
|
||||
{'params': pickle.dumps(params)})
|
||||
self._session.wait_for_task(task, instance.id)
|
||||
|
||||
finally:
|
||||
if template_vm_ref:
|
||||
self._destroy(instance, template_vm_ref,
|
||||
shutdown=False, destroy_kernel_ramdisk=False)
|
||||
|
||||
# TODO(mdietz): we could also consider renaming these to something
|
||||
# sensible so we don't need to blindly pass around dictionaries
|
||||
return {'base_copy': base_copy_uuid, 'cow': cow_uuid}
|
||||
|
||||
def attach_disk(self, instance, disk_info):
|
||||
"""Links the base copy VHD to the COW via the XAPI plugin"""
|
||||
vm_ref = VMHelper.lookup(self._session, instance.name)
|
||||
new_base_copy_uuid = str(uuid.uuid4())
|
||||
new_cow_uuid = str(uuid.uuid4())
|
||||
params = {'instance_id': instance.id,
|
||||
'old_base_copy_uuid': disk_info['base_copy'],
|
||||
'old_cow_uuid': disk_info['cow'],
|
||||
'new_base_copy_uuid': new_base_copy_uuid,
|
||||
'new_cow_uuid': new_cow_uuid,
|
||||
'sr_path': VMHelper.get_sr_path(self._session), }
|
||||
|
||||
task = self._session.async_call_plugin('migration',
|
||||
'move_vhds_into_sr', {'params': pickle.dumps(params)})
|
||||
self._session.wait_for_task(task, instance.id)
|
||||
|
||||
# Now we rescan the SR so we find the VHDs
|
||||
VMHelper.scan_default_sr(self._session)
|
||||
|
||||
return new_cow_uuid
|
||||
|
||||
def resize(self, instance, flavor):
"""Resize a running instance by changing its RAM and disk size"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def reboot(self, instance):
|
||||
"""Reboot VM instance"""
|
||||
@ -308,11 +406,6 @@ class VMOps(object):
|
||||
raise RuntimeError(resp_dict['message'])
|
||||
return resp_dict['message']
|
||||
|
||||
def _start(self, instance, vm):
|
||||
"""Start an instance"""
|
||||
task = self._session.call_xenapi("Async.VM.start", vm, False, False)
|
||||
self._session.wait_for_task(task, instance.id)
|
||||
|
||||
def inject_file(self, instance, b64_path, b64_contents):
|
||||
"""Write a file to the VM instance. The path to which it is to be
|
||||
written and the contents of the file need to be supplied; both should
|
||||
@ -355,8 +448,7 @@ class VMOps(object):
|
||||
if hard:
|
||||
task = self._session.call_xenapi("Async.VM.hard_shutdown", vm)
|
||||
else:
|
||||
task = self._session.call_xenapi("Async.VM.clean_shutdown", vm)
|
||||
|
||||
task = self._session.call_xenapi('Async.VM.clean_shutdown', vm)
|
||||
self._session.wait_for_task(task, instance.id)
|
||||
except self.XenAPI.Failure, exc:
|
||||
LOG.exception(exc)
|
||||
|
@ -154,14 +154,18 @@ class XenAPIConnection(object):
|
||||
"""List VM instances"""
|
||||
return self._vmops.list_instances()
|
||||
|
||||
def spawn(self, instance):
|
||||
def spawn(self, instance, disk=None):
|
||||
"""Create VM instance"""
|
||||
self._vmops.spawn(instance)
|
||||
self._vmops.spawn(instance, disk)
|
||||
|
||||
def snapshot(self, instance, image_id):
|
||||
""" Create snapshot from a running VM instance """
|
||||
self._vmops.snapshot(instance, image_id)
|
||||
|
||||
def resize(self, instance, flavor):
|
||||
"""Resize a VM instance"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def reboot(self, instance):
|
||||
"""Reboot VM instance"""
|
||||
self._vmops.reboot(instance)
|
||||
@ -188,6 +192,15 @@ class XenAPIConnection(object):
|
||||
"""Unpause paused VM instance"""
|
||||
self._vmops.unpause(instance, callback)
|
||||
|
||||
def migrate_disk_and_power_off(self, instance, dest):
|
||||
"""Transfers the VHD of a running instance to another host, then shuts
|
||||
off the instance and copies over the COW disk"""
|
||||
return self._vmops.migrate_disk_and_power_off(instance, dest)
|
||||
|
||||
def attach_disk(self, instance, disk_info):
|
||||
"""Moves the copied VDIs into the SR"""
|
||||
return self._vmops.attach_disk(instance, disk_info)
|
||||
|
||||
def suspend(self, instance, callback):
|
||||
"""suspend the specified instance"""
|
||||
self._vmops.suspend(instance, callback)
|
||||
@ -228,6 +241,10 @@ class XenAPIConnection(object):
|
||||
"""Return link to instance's ajax console"""
|
||||
return self._vmops.get_ajax_console(instance)
|
||||
|
||||
def get_host_ip_addr(self):
|
||||
xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
|
||||
return xs_url.netloc
|
||||
|
||||
def attach_volume(self, instance_name, device_path, mountpoint):
|
||||
"""Attach volume storage to VM instance"""
|
||||
return self._volumeops.attach_volume(instance_name,
|
||||
|
plugins/xenserver/xenapi/etc/xapi.d/plugins/migration (new file, 117 lines)
@ -0,0 +1,117 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2010 OpenStack LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
XenAPI Plugin for transferring data between host nodes
|
||||
"""
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import pickle
|
||||
import shlex
import shutil
|
||||
import subprocess
|
||||
|
||||
import XenAPIPlugin
|
||||
|
||||
from pluginlib_nova import *
|
||||
configure_logging('migration')
|
||||
|
||||
|
||||
def move_vhds_into_sr(session, args):
|
||||
"""Moves the VHDs from their copied location to the SR"""
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
instance_id = params['instance_id']
|
||||
|
||||
old_base_copy_uuid = params['old_base_copy_uuid']
|
||||
old_cow_uuid = params['old_cow_uuid']
|
||||
|
||||
new_base_copy_uuid = params['new_base_copy_uuid']
|
||||
new_cow_uuid = params['new_cow_uuid']
|
||||
|
||||
sr_path = params['sr_path']
|
||||
sr_temp_path = "%s/images/" % sr_path
|
||||
|
||||
# Discover the copied VHDs locally, and then set up paths to copy
|
||||
# them to under the SR
|
||||
source_image_path = "%s/instance%d" % ('/images/', instance_id)
|
||||
source_base_copy_path = "%s/%s.vhd" % (source_image_path,
|
||||
old_base_copy_uuid)
|
||||
source_cow_path = "%s/%s.vhd" % (source_image_path, old_cow_uuid)
|
||||
|
||||
temp_vhd_path = "%s/instance%d/" % (sr_temp_path, instance_id)
|
||||
new_base_copy_path = "%s/%s.vhd" % (temp_vhd_path, new_base_copy_uuid)
|
||||
new_cow_path = "%s/%s.vhd" % (temp_vhd_path, new_cow_uuid)
|
||||
|
||||
logging.debug('Creating temporary SR path %s' % temp_vhd_path)
|
||||
os.makedirs(temp_vhd_path)
|
||||
|
||||
logging.debug('Moving %s into %s' % (source_base_copy_path, temp_vhd_path))
|
||||
shutil.move(source_base_copy_path, new_base_copy_path)
|
||||
|
||||
logging.debug('Moving %s into %s' % (source_cow_path, temp_vhd_path))
|
||||
shutil.move(source_cow_path, new_cow_path)
|
||||
|
||||
logging.debug('Cleaning up %s' % source_image_path)
|
||||
os.rmdir(source_image_path)
|
||||
|
||||
# Link the COW to the base copy
|
||||
logging.debug('Attaching COW to the base copy %s -> %s' %
|
||||
(new_cow_path, new_base_copy_path))
|
||||
subprocess.call(shlex.split('/usr/sbin/vhd-util modify -n %s -p %s' %
|
||||
(new_cow_path, new_base_copy_path)))
|
||||
logging.debug('Moving VHDs into SR %s' % sr_path)
|
||||
shutil.move("%s/%s.vhd" % (temp_vhd_path, new_base_copy_uuid), sr_path)
|
||||
shutil.move("%s/%s.vhd" % (temp_vhd_path, new_cow_uuid), sr_path)
|
||||
|
||||
logging.debug('Cleaning up temporary SR path %s' % temp_vhd_path)
|
||||
os.rmdir(temp_vhd_path)
|
||||
return ""
|
||||
|
||||
|
||||
def transfer_vhd(session, args):
|
||||
"""Rsyncs a VHD to an adjacent host"""
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
instance_id = params['instance_id']
|
||||
host = params['host']
|
||||
vdi_uuid = params['vdi_uuid']
|
||||
sr_path = params['sr_path']
|
||||
vhd_path = "%s.vhd" % vdi_uuid
|
||||
|
||||
source_path = "%s/%s" % (sr_path, vhd_path)
|
||||
dest_path = '%s:%sinstance%d/' % (host, '/images/', instance_id)
|
||||
|
||||
logging.debug("Preparing to transmit %s to %s" % (source_path,
|
||||
dest_path))
|
||||
|
||||
ssh_cmd = 'ssh -o StrictHostKeyChecking=no'
|
||||
|
||||
rsync_args = shlex.split('nohup /usr/bin/rsync -av --progress -e %s %s %s'
|
||||
% (ssh_cmd, source_path, dest_path))
|
||||
|
||||
logging.debug('rsync %s' % (' '.join(rsync_args, )))
|
||||
|
||||
rsync_proc = subprocess.Popen(rsync_args, stdout=subprocess.PIPE)
|
||||
logging.debug('Rsync output: \n %s' % rsync_proc.communicate()[0])
|
||||
logging.debug('Rsync return: %d' % rsync_proc.returncode)
|
||||
if rsync_proc.returncode != 0:
|
||||
raise Exception("Unexpected VHD transfer failure")
|
||||
return ""
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
XenAPIPlugin.dispatch({'transfer_vhd': transfer_vhd,
|
||||
'move_vhds_into_sr': move_vhds_into_sr, })
|