Merge "Switch all xenapi async plugin calls to be sync"

This commit is contained in:
Jenkins 2012-03-09 21:10:15 +00:00 committed by Gerrit Code Review
commit 07f8da5c46
7 changed files with 30 additions and 109 deletions

View File

@ -412,7 +412,6 @@ def stub_out_migration_methods(stubs):
stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr)
stubs.Set(vmops.VMOps, '_create_snapshot', fake_create_snapshot)
stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi)
stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None)
stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path)
stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network)
stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown)

View File

@ -50,9 +50,7 @@ class Dom0IptablesFirewallDriver(firewall.IptablesFirewallDriver):
args = {}
args.update(map(lambda x: (x, str(kwargs[x])), kwargs))
args['cmd_args'] = json.dumps(cmd)
task = self._session.async_call_plugin(
'xenhost', 'iptables_config', args)
ret = self._session.wait_for_task(task)
ret = self._session.call_plugin('xenhost', 'iptables_config', args)
json_ret = json.loads(ret)
return (json_ret['out'], json_ret['err'])

View File

@ -21,7 +21,6 @@ Management class for host-related functions (start, reboot, etc).
import logging
import json
import random
from nova.compute import vm_states
from nova import context
@ -160,14 +159,12 @@ def call_xenhost(session, method, arg_dict):
out that behavior.
"""
# Create a task ID as something that won't match any instance ID
task_id = random.randint(-80000, -70000)
XenAPI = session.get_imported_xenapi()
try:
task = session.async_call_plugin("xenhost", method, args=arg_dict)
task_result = session.wait_for_task(task, str(task_id))
if not task_result:
task_result = json.dumps("")
return json.loads(task_result)
result = session.call_plugin('xenhost', method, args=arg_dict)
if not result:
return ''
return json.loads(result)
except ValueError:
LOG.exception(_("Unable to get updated status"))
return None

View File

@ -138,9 +138,7 @@ class ResourcePool(object):
'master_addr': self._host_addr,
'master_user': FLAGS.xenapi_connection_username,
'master_pass': FLAGS.xenapi_connection_password, }
task = self._session.async_call_plugin('xenhost',
'host_join', args)
self._session.wait_for_task(task)
self._session.call_plugin('xenhost', 'host_join', args)
except self.XenAPI.Failure as e:
LOG.error(_("Pool-Join failed: %(e)s") % locals())
raise exception.AggregateError(aggregate_id=aggregate_id,

View File

@ -426,8 +426,7 @@ class VMHelper(xenapi.HelperBase):
'properties': properties}
kwargs = {'params': pickle.dumps(params)}
task = session.async_call_plugin('glance', 'upload_vhd', kwargs)
session.wait_for_task(task, instance['uuid'])
session.call_plugin('glance', 'upload_vhd', kwargs)
@classmethod
def resize_disk(cls, session, vdi_ref, instance_type):
@ -584,9 +583,8 @@ class VMHelper(xenapi.HelperBase):
args = {}
args['cached-image'] = image
args['new-image-uuid'] = str(uuid.uuid4())
task = session.async_call_plugin('glance', "create_kernel_ramdisk",
filename = session.call_plugin('glance', 'create_kernel_ramdisk',
args)
filename = session.wait_for_task(task, instance.id)
if filename == "":
return cls.fetch_image(context, session, instance, image,
@ -689,7 +687,7 @@ class VMHelper(xenapi.HelperBase):
session, instance, image, image_type)
@classmethod
def _retry_glance_download_vhd(cls, context, session, instance, image):
def _retry_glance_download_vhd(cls, context, session, image):
# NOTE(sirp): The Glance plugin runs under Python 2.4
# which does not have the `uuid` module. To work around this,
# we generate the uuids here (under Python 2.6+) and
@ -713,9 +711,8 @@ class VMHelper(xenapi.HelperBase):
'attempt %(attempt_num)d/%(max_attempts)d '
'from %(glance_host)s:%(glance_port)s') % locals())
task = session.async_call_plugin('glance', 'download_vhd', kwargs)
try:
result = session.wait_for_task(task, instance['uuid'])
result = session.call_plugin('glance', 'download_vhd', kwargs)
return json.loads(result)
except cls.XenAPI.Failure as exc:
_type, _method, error = exc.details[:3]
@ -741,8 +738,7 @@ class VMHelper(xenapi.HelperBase):
% locals())
sr_ref = cls.safe_find_sr(session)
vdis = cls._retry_glance_download_vhd(context, session, instance,
image)
vdis = cls._retry_glance_download_vhd(context, session, image)
# 'download_vhd' will return a list of dictionaries describing VDIs.
# The dictionary will contain 'vdi_type' and 'vdi_uuid' keys.
@ -863,12 +859,13 @@ class VMHelper(xenapi.HelperBase):
fn = "copy_kernel_vdi"
args = {}
args['vdi-ref'] = vdi_ref
# Let the plugin copy the correct number of bytes.
args['image-size'] = str(vdi_size)
if FLAGS.cache_images:
args['cached-image'] = image
task = session.async_call_plugin('glance', fn, args)
filename = session.wait_for_task(task, instance['uuid'])
filename = session.call_plugin('glance', fn, args)
# Remove the VDI as it is not needed anymore.
session.call_xenapi("VDI.destroy", vdi_ref)
LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)

View File

@ -706,10 +706,8 @@ class VMOps(object):
try:
_params = {'params': pickle.dumps(params)}
task = self._session.async_call_plugin('migration',
'transfer_vhd',
self._session.call_plugin('migration', 'transfer_vhd',
_params)
self._session.wait_for_task(task, instance_uuid)
except self.XenAPI.Failure:
msg = _("Failed to transfer vhd to new host")
raise exception.MigrationError(reason=msg)
@ -869,9 +867,8 @@ class VMOps(object):
else:
new_uuid = new_base_copy_uuid
task = self._session.async_call_plugin('migration',
'move_vhds_into_sr', {'params': pickle.dumps(params)})
self._session.wait_for_task(task, instance['uuid'])
self._session.call_plugin('migration', 'move_vhds_into_sr',
{'params': pickle.dumps(params)})
# Now we rescan the SR so we find the VHDs
VMHelper.scan_default_sr(self._session)
@ -1128,9 +1125,7 @@ class VMOps(object):
args['kernel-file'] = kernel
if ramdisk:
args['ramdisk-file'] = ramdisk
task = self._session.async_call_plugin(
'glance', 'remove_kernel_ramdisk', args)
self._session.wait_for_task(task)
self._session.call_plugin('glance', 'remove_kernel_ramdisk', args)
def _destroy_kernel_ramdisk(self, instance, vm_ref):
"""Three situations can occur:
@ -1615,10 +1610,8 @@ class VMOps(object):
args = {'dom_id': vm_rec['domid'], 'path': path}
args.update(addl_args or {})
try:
task = self._session.async_call_plugin(plugin, method, args)
ret = self._session.wait_for_task(task, instance_uuid)
return self._session.call_plugin(plugin, method, args)
except self.XenAPI.Failure, e:
ret = None
err_msg = e.details[-1].splitlines()[-1]
if 'TIMEOUT:' in err_msg:
LOG.error(_('TIMEOUT: The call to %(method)s timed out. '
@ -1633,7 +1626,7 @@ class VMOps(object):
LOG.error(_('The call to %(method)s returned an error: %(e)s. '
'VM id=%(instance_uuid)s; args=%(args)r') % locals())
return {'returncode': 'error', 'message': err_msg}
return ret
return None
def add_to_xenstore(self, vm, path, key, value):
"""

View File

@ -24,17 +24,6 @@ All XenAPI calls are on a green thread (using eventlet's "tpool"
thread pool). They are remote calls, and so may hang for the usual
reasons.
All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
(using XenAPI.VM.async_start etc). These return a task, which can then be
polled for completion.
This combination of techniques means that we don't block the main thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.
FIXME: get_info currently doesn't conform to these rules, and will block the
reactor thread if the VM.get_by_name_label or VM.get_record calls block.
**Related Flags**
:xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform.
@ -42,9 +31,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
Platform (default: root).
:xenapi_connection_password: Password for connection to XenServer/Xen Cloud
Platform.
:xenapi_task_poll_interval: The interval (seconds) used for polling of
remote tasks (Async.VM.start, etc)
(default: 0.5).
:target_host: the iSCSI Target Host IP address, i.e. the IP
address for the nova-volume host
:target_port: iSCSI Target Port, 3260 Default
@ -99,11 +85,6 @@ xenapi_opts = [
default=5,
help='Maximum number of concurrent XenAPI connections. '
'Used only if connection_type=xenapi.'),
cfg.FloatOpt('xenapi_task_poll_interval',
default=0.5,
help='The interval used for polling of remote tasks '
'(Async.VM.start, etc). '
'Used only if connection_type=xenapi.'),
cfg.FloatOpt('xenapi_vhd_coalesce_poll_interval',
default=5.0,
help='The interval used for polling of coalescing vhds. '
@ -121,9 +102,6 @@ xenapi_opts = [
cfg.StrOpt('xenapi_sr_base_path',
default='/var/run/sr-mount',
help='Base path to the storage repository'),
cfg.BoolOpt('xenapi_log_instance_actions',
default=False,
help='Log all instance calls to XenAPI in the database.'),
cfg.StrOpt('target_host',
default=None,
help='iSCSI Target Host'),
@ -604,62 +582,23 @@ class XenAPISession(object):
f = session.xenapi_request
return tpool.execute(f, method, *args)
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread."""
def call_plugin(self, plugin, fn, args):
"""Call host.call_plugin on a background thread."""
# NOTE(johannes): Fetch host before we acquire a session. Since
# _get_session() acquires a session too, it can result in a deadlock
# if multiple greenthreads race with each other. See bug 924918
# get_xenapi_host() acquires a session too, it can result in a
# deadlock if multiple greenthreads race with each other. See
# bug 924918
host = self.get_xenapi_host()
# NOTE(armando): pass the host uuid along with the args so that
# the plugin gets executed on the right host when using XS pools
args['host_uuid'] = self.host_uuid
with self._get_session() as session:
return tpool.execute(self._unwrap_plugin_exceptions,
session.xenapi.Async.host.call_plugin,
session.xenapi.host.call_plugin,
host, plugin, fn, args)
def wait_for_task(self, task, uuid=None):
"""Return the result of the given task. The task is polled
until it completes."""
while True:
"""Poll the given XenAPI task, and return the result if the
action was completed successfully or not.
"""
ctxt = context.get_admin_context()
name = self.call_xenapi("task.get_name_label", task)
status = self.call_xenapi("task.get_status", task)
# Ensure action is never > 255
action = dict(action=name[:255], error=None)
log_instance_actions = (FLAGS.xenapi_log_instance_actions and
uuid)
if log_instance_actions:
action["instance_uuid"] = uuid
if status == "pending":
pass
elif status == "success":
result = self.call_xenapi("task.get_result", task)
LOG.info(_("Task [%(name)s] %(task)s status:"
" success %(result)s") % locals())
if log_instance_actions:
db.instance_action_create(ctxt, action)
return _parse_xmlrpc_value(result)
else:
error_info = self.call_xenapi("task.get_error_info", task)
LOG.warn(_("Task [%(name)s] %(task)s status:"
" %(status)s %(error_info)s") % locals())
if log_instance_actions:
action["error"] = str(error_info)
db.instance_action_create(ctxt, action)
raise self.XenAPI.Failure(error_info)
greenthread.sleep(FLAGS.xenapi_task_poll_interval)
def _create_session(self, url):
"""Stubout point. This can be replaced with a mock session."""
return self.XenAPI.Session(url)