From 627d9e99481db537fe71639dd38daf205a9e0728 Mon Sep 17 00:00:00 2001 From: Johannes Erdfelt Date: Fri, 9 Mar 2012 16:20:01 +0000 Subject: [PATCH] Switch all xenapi async plugin calls to be sync Originally all calls were required to be async because of integration with twisted, but that has long since been changed and just using the synchronous calls is easier and less code. Change-Id: Ib86fc721326de34fd71d68bbec42ecd65280aa8a --- nova/tests/xenapi/stubs.py | 1 - nova/virt/xenapi/firewall.py | 4 +- nova/virt/xenapi/host.py | 11 ++---- nova/virt/xenapi/pool.py | 4 +- nova/virt/xenapi/vm_utils.py | 21 +++++----- nova/virt/xenapi/vmops.py | 21 ++++------ nova/virt/xenapi_conn.py | 77 ++++-------------------------------- 7 files changed, 30 insertions(+), 109 deletions(-) diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 77557276966a..e6bf9dd4bffc 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -412,7 +412,6 @@ def stub_out_migration_methods(stubs): stubs.Set(vm_utils.VMHelper, 'scan_sr', fake_sr) stubs.Set(vmops.VMOps, '_create_snapshot', fake_create_snapshot) stubs.Set(vm_utils.VMHelper, 'get_vdi_for_vm_safely', fake_get_vdi) - stubs.Set(xenapi_conn.XenAPISession, 'wait_for_task', lambda x, y, z: None) stubs.Set(vm_utils.VMHelper, 'get_sr_path', fake_get_sr_path) stubs.Set(vmops.VMOps, 'reset_network', fake_reset_network) stubs.Set(vmops.VMOps, '_shutdown', fake_shutdown) diff --git a/nova/virt/xenapi/firewall.py b/nova/virt/xenapi/firewall.py index 3a3ce779c861..506ffcb2e763 100644 --- a/nova/virt/xenapi/firewall.py +++ b/nova/virt/xenapi/firewall.py @@ -50,9 +50,7 @@ class Dom0IptablesFirewallDriver(firewall.IptablesFirewallDriver): args = {} args.update(map(lambda x: (x, str(kwargs[x])), kwargs)) args['cmd_args'] = json.dumps(cmd) - task = self._session.async_call_plugin( - 'xenhost', 'iptables_config', args) - ret = self._session.wait_for_task(task) + ret = self._session.call_plugin('xenhost', 'iptables_config', args) json_ret = json.loads(ret) return (json_ret['out'], json_ret['err']) diff --git a/nova/virt/xenapi/host.py b/nova/virt/xenapi/host.py index 8de7a722a36c..141700500d02 100644 --- a/nova/virt/xenapi/host.py +++ b/nova/virt/xenapi/host.py @@ -21,7 +21,6 @@ Management class for host-related functions (start, reboot, etc). import logging import json -import random from nova.compute import vm_states from nova import context @@ -160,14 +159,12 @@ def call_xenhost(session, method, arg_dict): out that behavior. """ # Create a task ID as something that won't match any instance ID - task_id = random.randint(-80000, -70000) XenAPI = session.get_imported_xenapi() try: - task = session.async_call_plugin("xenhost", method, args=arg_dict) - task_result = session.wait_for_task(task, str(task_id)) - if not task_result: - task_result = json.dumps("") - return json.loads(task_result) + result = session.call_plugin('xenhost', method, args=arg_dict) + if not result: + return '' + return json.loads(result) except ValueError: LOG.exception(_("Unable to get updated status")) return None diff --git a/nova/virt/xenapi/pool.py b/nova/virt/xenapi/pool.py index 6635d4c19361..a5157fd0afb9 100644 --- a/nova/virt/xenapi/pool.py +++ b/nova/virt/xenapi/pool.py @@ -138,9 +138,7 @@ class ResourcePool(object): 'master_addr': self._host_addr, 'master_user': FLAGS.xenapi_connection_username, 'master_pass': FLAGS.xenapi_connection_password, } - task = self._session.async_call_plugin('xenhost', - 'host_join', args) - self._session.wait_for_task(task) + self._session.call_plugin('xenhost', 'host_join', args) except self.XenAPI.Failure as e: LOG.error(_("Pool-Join failed: %(e)s") % locals()) raise exception.AggregateError(aggregate_id=aggregate_id, diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index a929449b48d5..da4c3ca88e25 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -426,8 +426,7 @@ class VMHelper(xenapi.HelperBase): 'properties': properties} kwargs = {'params': pickle.dumps(params)} - task = session.async_call_plugin('glance', 'upload_vhd', kwargs) - session.wait_for_task(task, instance['uuid']) + session.call_plugin('glance', 'upload_vhd', kwargs) @classmethod def resize_disk(cls, session, vdi_ref, instance_type): @@ -584,9 +583,8 @@ class VMHelper(xenapi.HelperBase): args = {} args['cached-image'] = image args['new-image-uuid'] = str(uuid.uuid4()) - task = session.async_call_plugin('glance', "create_kernel_ramdisk", - args) - filename = session.wait_for_task(task, instance.id) + filename = session.call_plugin('glance', 'create_kernel_ramdisk', + args) if filename == "": return cls.fetch_image(context, session, instance, image, @@ -689,7 +687,7 @@ class VMHelper(xenapi.HelperBase): session, instance, image, image_type) @classmethod - def _retry_glance_download_vhd(cls, context, session, instance, image): + def _retry_glance_download_vhd(cls, context, session, image): # NOTE(sirp): The Glance plugin runs under Python 2.4 # which does not have the `uuid` module. To work around this, # we generate the uuids here (under Python 2.6+) and @@ -713,9 +711,8 @@ class VMHelper(xenapi.HelperBase): 'attempt %(attempt_num)d/%(max_attempts)d ' 'from %(glance_host)s:%(glance_port)s') % locals()) - task = session.async_call_plugin('glance', 'download_vhd', kwargs) try: - result = session.wait_for_task(task, instance['uuid']) + result = session.call_plugin('glance', 'download_vhd', kwargs) return json.loads(result) except cls.XenAPI.Failure as exc: _type, _method, error = exc.details[:3] @@ -741,8 +738,7 @@ class VMHelper(xenapi.HelperBase): % locals()) sr_ref = cls.safe_find_sr(session) - vdis = cls._retry_glance_download_vhd(context, session, instance, - image) + vdis = cls._retry_glance_download_vhd(context, session, image) # 'download_vhd' will return a list of dictionaries describing VDIs. # The dictionary will contain 'vdi_type' and 'vdi_uuid' keys. @@ -863,12 +859,13 @@ class VMHelper(xenapi.HelperBase): fn = "copy_kernel_vdi" args = {} args['vdi-ref'] = vdi_ref + # Let the plugin copy the correct number of bytes. args['image-size'] = str(vdi_size) if FLAGS.cache_images: args['cached-image'] = image - task = session.async_call_plugin('glance', fn, args) - filename = session.wait_for_task(task, instance['uuid']) + filename = session.call_plugin('glance', fn, args) + # Remove the VDI as it is not needed anymore. session.call_xenapi("VDI.destroy", vdi_ref) LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref) diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py index 8669fbd09760..aada50965466 100644 --- a/nova/virt/xenapi/vmops.py +++ b/nova/virt/xenapi/vmops.py @@ -706,10 +706,8 @@ class VMOps(object): try: _params = {'params': pickle.dumps(params)} - task = self._session.async_call_plugin('migration', - 'transfer_vhd', - _params) - self._session.wait_for_task(task, instance_uuid) + self._session.call_plugin('migration', 'transfer_vhd', + _params) except self.XenAPI.Failure: msg = _("Failed to transfer vhd to new host") raise exception.MigrationError(reason=msg) @@ -869,9 +867,8 @@ class VMOps(object): else: new_uuid = new_base_copy_uuid - task = self._session.async_call_plugin('migration', - 'move_vhds_into_sr', {'params': pickle.dumps(params)}) - self._session.wait_for_task(task, instance['uuid']) + self._session.call_plugin('migration', 'move_vhds_into_sr', + {'params': pickle.dumps(params)}) # Now we rescan the SR so we find the VHDs VMHelper.scan_default_sr(self._session) @@ -1128,9 +1125,7 @@ class VMOps(object): args['kernel-file'] = kernel if ramdisk: args['ramdisk-file'] = ramdisk - task = self._session.async_call_plugin( - 'glance', 'remove_kernel_ramdisk', args) - self._session.wait_for_task(task) + self._session.call_plugin('glance', 'remove_kernel_ramdisk', args) def _destroy_kernel_ramdisk(self, instance, vm_ref): """Three situations can occur: @@ -1615,10 +1610,8 @@ class VMOps(object): args = {'dom_id': vm_rec['domid'], 'path': path} args.update(addl_args or {}) try: - task = self._session.async_call_plugin(plugin, method, args) - ret = self._session.wait_for_task(task, instance_uuid) + return self._session.call_plugin(plugin, method, args) except self.XenAPI.Failure, e: - ret = None err_msg = e.details[-1].splitlines()[-1] if 'TIMEOUT:' in err_msg: LOG.error(_('TIMEOUT: The call to %(method)s timed out. ' @@ -1633,7 +1626,7 @@ class VMOps(object): LOG.error(_('The call to %(method)s returned an error: %(e)s. ' 'VM id=%(instance_uuid)s; args=%(args)r') % locals()) return {'returncode': 'error', 'message': err_msg} - return ret + return None def add_to_xenstore(self, vm, path, key, value): """ diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index d6d05e8f874c..54eb886c55d4 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -24,17 +24,6 @@ All XenAPI calls are on a green thread (using eventlet's "tpool" thread pool). They are remote calls, and so may hang for the usual reasons. -All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async -(using XenAPI.VM.async_start etc). These return a task, which can then be -polled for completion. - -This combination of techniques means that we don't block the main thread at -all, and at the same time we don't hold lots of threads waiting for -long-running operations. - -FIXME: get_info currently doesn't conform to these rules, and will block the -reactor thread if the VM.get_by_name_label or VM.get_record calls block. - **Related Flags** :xenapi_connection_url: URL for connection to XenServer/Xen Cloud Platform. @@ -42,9 +31,6 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block. Platform (default: root). :xenapi_connection_password: Password for connection to XenServer/Xen Cloud Platform. -:xenapi_task_poll_interval: The interval (seconds) used for polling of - remote tasks (Async.VM.start, etc) - (default: 0.5). :target_host: the iSCSI Target Host IP address, i.e. the IP address for the nova-volume host :target_port: iSCSI Target Port, 3260 Default @@ -99,11 +85,6 @@ xenapi_opts = [ default=5, help='Maximum number of concurrent XenAPI connections. ' 'Used only if connection_type=xenapi.'), - cfg.FloatOpt('xenapi_task_poll_interval', - default=0.5, - help='The interval used for polling of remote tasks ' - '(Async.VM.start, etc). ' - 'Used only if connection_type=xenapi.'), cfg.FloatOpt('xenapi_vhd_coalesce_poll_interval', default=5.0, help='The interval used for polling of coalescing vhds. ' @@ -121,9 +102,6 @@ xenapi_opts = [ cfg.StrOpt('xenapi_sr_base_path', default='/var/run/sr-mount', help='Base path to the storage repository'), - cfg.BoolOpt('xenapi_log_instance_actions', - default=False, - help='Log all instance calls to XenAPI in the database.'), cfg.StrOpt('target_host', default=None, help='iSCSI Target Host'), @@ -604,62 +582,23 @@ class XenAPISession(object): f = session.xenapi_request return tpool.execute(f, method, *args) - def async_call_plugin(self, plugin, fn, args): - """Call Async.host.call_plugin on a background thread.""" + def call_plugin(self, plugin, fn, args): + """Call host.call_plugin on a background thread.""" # NOTE(johannes): Fetch host before we acquire a session. Since - # _get_session() acquires a session too, it can result in a deadlock - # if multiple greenthreads race with each other. See bug 924918 + # get_xenapi_host() acquires a session too, it can result in a + # deadlock if multiple greenthreads race with each other. See + # bug 924918 host = self.get_xenapi_host() + # NOTE(armando): pass the host uuid along with the args so that # the plugin gets executed on the right host when using XS pools args['host_uuid'] = self.host_uuid + with self._get_session() as session: return tpool.execute(self._unwrap_plugin_exceptions, - session.xenapi.Async.host.call_plugin, + session.xenapi.host.call_plugin, host, plugin, fn, args) - def wait_for_task(self, task, uuid=None): - """Return the result of the given task. The task is polled - until it completes.""" - while True: - """Poll the given XenAPI task, and return the result if the - action was completed successfully or not. - """ - ctxt = context.get_admin_context() - name = self.call_xenapi("task.get_name_label", task) - status = self.call_xenapi("task.get_status", task) - - # Ensure action is never > 255 - action = dict(action=name[:255], error=None) - log_instance_actions = (FLAGS.xenapi_log_instance_actions and - uuid) - if log_instance_actions: - action["instance_uuid"] = uuid - - if status == "pending": - pass - elif status == "success": - result = self.call_xenapi("task.get_result", task) - LOG.info(_("Task [%(name)s] %(task)s status:" - " success %(result)s") % locals()) - - if log_instance_actions: - db.instance_action_create(ctxt, action) - - return _parse_xmlrpc_value(result) - else: - error_info = self.call_xenapi("task.get_error_info", task) - LOG.warn(_("Task [%(name)s] %(task)s status:" - " %(status)s %(error_info)s") % locals()) - - if log_instance_actions: - action["error"] = str(error_info) - db.instance_action_create(ctxt, action) - - raise self.XenAPI.Failure(error_info) - - greenthread.sleep(FLAGS.xenapi_task_poll_interval) - def _create_session(self, url): """Stubout point. This can be replaced with a mock session.""" return self.XenAPI.Session(url)