From 97cfccc2ce35b29fd98809b6c7ecdd86c56f76e3 Mon Sep 17 00:00:00 2001
From: Johannes Erdfelt
Date: Mon, 24 Oct 2011 21:55:12 +0000
Subject: [PATCH] Fix concurrency of XenAPI sessions

Fixes bug 879044

Nova currently does not serialize access to the XenAPI session, which
can result in multiple (green)threads trying to use the same HTTP
connection. This typically only affects Python 2.7, whose updated
xmlrpclib tries to reuse a single HTTP connection for multiple requests.

Change-Id: I101d63b822c8bf8c28674a836e4b54aa8259e1a8
---
 nova/tests/test_xenapi.py        |    7 ++-
 nova/tests/xenapi/stubs.py       |    4 +-
 nova/virt/xenapi/vm_utils.py     |   80 ++++++++++++++++----------------
 nova/virt/xenapi/vmops.py        |   44 +++++++++---------
 nova/virt/xenapi/volume_utils.py |   20 ++++----
 nova/virt/xenapi_conn.py         |   60 +++++++++++++++---------
 6 files changed, 118 insertions(+), 97 deletions(-)

diff --git a/nova/tests/test_xenapi.py b/nova/tests/test_xenapi.py
index c2aa0a3738f9..e2ed14495e35 100644
--- a/nova/tests/test_xenapi.py
+++ b/nova/tests/test_xenapi.py
@@ -1072,8 +1072,11 @@ class FakeSession(object):
               'free-computed': 40}
         return json.dumps({'host_memory': vm})

-    def get_xenapi(self):
-        return FakeXenApi()
+    def call_xenapi(self, method, *args):
+        f = FakeXenApi()
+        for m in method.split('.'):
+            f = getattr(f, m)
+        return f(*args)


 class HostStateTestCase(test.TestCase):
diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py
index f44d96d20fdf..c79bda6823d4 100644
--- a/nova/tests/xenapi/stubs.py
+++ b/nova/tests/xenapi/stubs.py
@@ -38,7 +38,7 @@ def stubout_instance_snapshot(stubs):
         sr_ref = "fakesr"
         vdi_ref = create_vdi(name_label=name_label, read_only=False,
                              sr_ref=sr_ref, sharable=False)
-        vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
+        vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
         vdi_uuid = vdi_rec['uuid']
         return [dict(vdi_type='os', vdi_uuid=vdi_uuid)]

@@ -307,7 +307,7 @@ def stub_out_migration_methods(stubs):
     def fake_get_vdi(cls, session, vm_ref):
         vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
                                   sr_ref='herp', sharable=False)
-        vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
+        vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
         return vdi_ref, {'uuid': vdi_rec['uuid'], }

     def fake_shutdown(self, inst, vm, hard=True):
diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py
index 9a1ceef4eb3c..640c74ab27a8 100644
--- a/nova/virt/xenapi/vm_utils.py
+++ b/nova/virt/xenapi/vm_utils.py
@@ -205,8 +205,8 @@ class VMHelper(HelperBase):
         mem = long(instance_type['memory_mb']) * 1024 * 1024
         #get free memory from host
         host = session.get_xenapi_host()
-        host_free_mem = long(session.get_xenapi().host.
-                             compute_free_memory(host))
+        host_free_mem = long(session.call_xenapi("host.compute_free_memory",
+                                                 host))
         return host_free_mem >= mem

     @classmethod
@@ -260,11 +260,11 @@ class VMHelper(HelperBase):
     @classmethod
     def find_vbd_by_number(cls, session, vm_ref, number):
         """Get the VBD reference from the device number"""
-        vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
+        vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
         if vbd_refs:
             for vbd_ref in vbd_refs:
                 try:
-                    vbd_rec = session.get_xenapi().VBD.get_record(vbd_ref)
+                    vbd_rec = session.call_xenapi("VBD.get_record", vbd_ref)
                     if vbd_rec['userdevice'] == str(number):
                         return vbd_ref
                 except cls.XenAPI.Failure, exc:
@@ -303,7 +303,7 @@ class VMHelper(HelperBase):
     @classmethod
     def create_vdi(cls, session, sr_ref, name_label, virtual_size, read_only):
         """Create a VDI record and returns its reference."""
-        vdi_ref = session.get_xenapi().VDI.create(
+        vdi_ref = session.call_xenapi("VDI.create",
             {'name_label': name_label,
              'name_description': '',
              'SR': sr_ref,
@@ -322,18 +322,18 @@ class VMHelper(HelperBase):

     @classmethod
     def set_vdi_name_label(cls, session, vdi_uuid, name_label):
-        vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
-        session.get_xenapi().VDI.set_name_label(vdi_ref, name_label)
+        vdi_ref = session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
+        session.call_xenapi("VDI.set_name_label", vdi_ref, name_label)

     @classmethod
     def get_vdi_for_vm_safely(cls, session, vm_ref):
         """Retrieves the primary VDI for a VM"""
-        vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
+        vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
         for vbd in vbd_refs:
-            vbd_rec = session.get_xenapi().VBD.get_record(vbd)
+            vbd_rec = session.call_xenapi("VBD.get_record", vbd)
             # Convention dictates the primary VDI will be userdevice 0
             if vbd_rec['userdevice'] == '0':
-                vdi_rec = session.get_xenapi().VDI.get_record(vbd_rec['VDI'])
+                vdi_rec = session.call_xenapi("VDI.get_record", vbd_rec['VDI'])
                 return vbd_rec['VDI'], vdi_rec
         raise exception.Error(_("No primary VDI found for"
                                 "%(vm_ref)s") % locals())
@@ -377,7 +377,7 @@ class VMHelper(HelperBase):
         snapshots or by restoring an image in the DISK_VHD format.
         """
         sr_ref = safe_find_sr(session)
-        sr_rec = session.get_xenapi().SR.get_record(sr_ref)
+        sr_rec = session.call_xenapi("SR.get_record", sr_ref)
         sr_uuid = sr_rec["uuid"]
         return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)

@@ -602,9 +602,9 @@ w
         os_vdi_uuid = vdis[0]['vdi_uuid']

         # Set the name-label to ease debugging
-        vdi_ref = session.get_xenapi().VDI.get_by_uuid(os_vdi_uuid)
+        vdi_ref = session.call_xenapi("VDI.get_by_uuid", os_vdi_uuid)
         primary_name_label = instance.name
-        session.get_xenapi().VDI.set_name_label(vdi_ref, primary_name_label)
+        session.call_xenapi("VDI.set_name_label", vdi_ref, primary_name_label)

         cls._check_vdi_size(context, session, instance, os_vdi_uuid)
         return vdis
@@ -696,7 +696,7 @@ w
         # If anything goes wrong, we need to remember its uuid.
         try:
             filename = None
-            vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi_ref)
+            vdi_uuid = session.call_xenapi("VDI.get_uuid", vdi_ref)
             with vdi_attached_here(session, vdi_ref, read_only=False) as dev:
                 _stream_disk(dev, image_type, virtual_size, image_file)

@@ -713,7 +713,7 @@ w
                 task = session.async_call_plugin('glance', fn, args)
                 filename = session.wait_for_task(task, instance_id)
                 # Remove the VDI as it is not needed anymore.
-                session.get_xenapi().VDI.destroy(vdi_ref)
+                session.call_xenapi("VDI.destroy", vdi_ref)
                 LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
                 return [dict(vdi_type=ImageType.to_string(image_type),
                              vdi_uuid=None,
@@ -828,12 +828,12 @@ w

     @classmethod
     def set_vm_name_label(cls, session, vm_ref, name_label):
-        session.get_xenapi().VM.set_name_label(vm_ref, name_label)
+        session.call_xenapi("VM.set_name_label", vm_ref, name_label)

     @classmethod
     def lookup(cls, session, name_label):
         """Look the instance up and return it if available"""
-        vm_refs = session.get_xenapi().VM.get_by_name_label(name_label)
+        vm_refs = session.call_xenapi("VM.get_by_name_label", name_label)
         n = len(vm_refs)
         if n == 0:
             return None
@@ -847,14 +847,14 @@ w
         """Look for the VDIs that are attached to the VM"""
         # Firstly we get the VBDs, then the VDIs.
         # TODO(Armando): do we leave the read-only devices?
-        vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref)
+        vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
         vdi_refs = []
         if vbd_refs:
             for vbd_ref in vbd_refs:
                 try:
-                    vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref)
+                    vdi_ref = session.call_xenapi("VBD.get_VDI", vbd_ref)
                     # Test valid VDI
-                    record = session.get_xenapi().VDI.get_record(vdi_ref)
+                    record = session.call_xenapi("VDI.get_record", vdi_ref)
                     LOG.debug(_('VDI %s is still available'), record['uuid'])
                 except cls.XenAPI.Failure, exc:
                     LOG.exception(exc)
@@ -884,7 +884,7 @@ w

     @classmethod
     def lookup_kernel_ramdisk(cls, session, vm):
-        vm_rec = session.get_xenapi().VM.get_record(vm)
+        vm_rec = session.call_xenapi("VM.get_record", vm)
         if 'PV_kernel' in vm_rec and 'PV_ramdisk' in vm_rec:
             return (vm_rec['PV_kernel'], vm_rec['PV_ramdisk'])
         else:
@@ -908,7 +908,7 @@ w
         """Compile VM diagnostics data"""
         try:
             host = session.get_xenapi_host()
-            host_ip = session.get_xenapi().host.get_record(host)["address"]
+            host_ip = session.call_xenapi("host.get_record", host)["address"]
         except (cls.XenAPI.Failure, KeyError) as e:
             return {"Unable to retrieve diagnostics": e}

@@ -936,7 +936,7 @@ w
         start_time = int(start_time)
         try:
             host = session.get_xenapi_host()
-            host_ip = session.get_xenapi().host.get_record(host)["address"]
+            host_ip = session.call_xenapi("host.get_record", host)["address"]
         except (cls.XenAPI.Failure, KeyError) as e:
             raise exception.CouldNotFetchMetrics()

@@ -1066,8 +1066,8 @@ def get_vhd_parent(session, vdi_rec):
     """
     if 'vhd-parent' in vdi_rec['sm_config']:
         parent_uuid = vdi_rec['sm_config']['vhd-parent']
-        parent_ref = session.get_xenapi().VDI.get_by_uuid(parent_uuid)
-        parent_rec = session.get_xenapi().VDI.get_record(parent_ref)
+        parent_ref = session.call_xenapi("VDI.get_by_uuid", parent_uuid)
+        parent_rec = session.call_xenapi("VDI.get_record", parent_ref)
         vdi_uuid = vdi_rec['uuid']
         LOG.debug(_("VHD %(vdi_uuid)s has parent %(parent_ref)s") % locals())
         return parent_ref, parent_rec
@@ -1076,7 +1076,7 @@ def get_vhd_parent(session, vdi_rec):


 def get_vhd_parent_uuid(session, vdi_ref):
-    vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
+    vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
     ret = get_vhd_parent(session, vdi_rec)
     if ret:
         parent_ref, parent_rec = ret
@@ -1089,8 +1089,8 @@ def walk_vdi_chain(session, vdi_uuid):
     """Yield vdi_recs for each element in a VDI chain"""
     # TODO(jk0): perhaps make get_vhd_parent use this
     while True:
-        vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid)
-        vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref)
+        vdi_ref = session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
+        vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
         yield vdi_rec

         parent_uuid = vdi_rec['sm_config'].get('vhd-parent')
@@ -1153,14 +1153,14 @@ def safe_find_sr(session):
 def find_sr(session):
     """Return the storage repository to hold VM images"""
     host = session.get_xenapi_host()
-    sr_refs = session.get_xenapi().SR.get_all()
+    sr_refs = session.call_xenapi("SR.get_all")
     for sr_ref in sr_refs:
-        sr_rec = session.get_xenapi().SR.get_record(sr_ref)
+        sr_rec = session.call_xenapi("SR.get_record", sr_ref)
         if not ('i18n-key' in sr_rec['other_config'] and
                 sr_rec['other_config']['i18n-key'] == 'local-storage'):
             continue
         for pbd_ref in sr_rec['PBDs']:
-            pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref)
+            pbd_rec = session.call_xenapi("PBD.get_record", pbd_ref)
             if pbd_rec['host'] == host:
                 return sr_ref
     return None
@@ -1179,9 +1179,9 @@ def safe_find_iso_sr(session):
 def find_iso_sr(session):
     """Return the storage repository to hold ISO images"""
     host = session.get_xenapi_host()
-    sr_refs = session.get_xenapi().SR.get_all()
+    sr_refs = session.call_xenapi("SR.get_all")
     for sr_ref in sr_refs:
-        sr_rec = session.get_xenapi().SR.get_record(sr_ref)
+        sr_rec = session.call_xenapi("SR.get_record", sr_ref)

         LOG.debug(_("ISO: looking at SR %(sr_rec)s") % locals())
         if not sr_rec['content_type'] == 'iso':
@@ -1198,7 +1198,7 @@ def find_iso_sr(session):
         LOG.debug(_("ISO: SR MATCHing our criteria"))
         for pbd_ref in sr_rec['PBDs']:
             LOG.debug(_("ISO: ISO, looking to see if it is host local"))
-            pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref)
+            pbd_rec = session.call_xenapi("PBD.get_record", pbd_ref)
             pbd_rec_host = pbd_rec['host']
             LOG.debug(_("ISO: PBD matching, want %(pbd_rec)s, have %(host)s")
                       % locals())
@@ -1257,14 +1257,14 @@ def vdi_attached_here(session, vdi_ref, read_only=False):
     vbd_rec['qos_algorithm_params'] = {}
     vbd_rec['qos_supported_algorithms'] = []
     LOG.debug(_('Creating VBD for VDI %s ... '), vdi_ref)
-    vbd_ref = session.get_xenapi().VBD.create(vbd_rec)
+    vbd_ref = session.call_xenapi("VBD.create", vbd_rec)
     LOG.debug(_('Creating VBD for VDI %s done.'), vdi_ref)
     try:
         LOG.debug(_('Plugging VBD %s ... '), vbd_ref)
-        session.get_xenapi().VBD.plug(vbd_ref)
+        session.call_xenapi("VBD.plug", vbd_ref)
         try:
             LOG.debug(_('Plugging VBD %s done.'), vbd_ref)
-            orig_dev = session.get_xenapi().VBD.get_device(vbd_ref)
+            orig_dev = session.call_xenapi("VBD.get_device", vbd_ref)
             LOG.debug(_('VBD %(vbd_ref)s plugged as %(orig_dev)s') % locals())
             dev = remap_vbd_dev(orig_dev)
             if dev != orig_dev:
@@ -1280,7 +1280,7 @@ def vdi_attached_here(session, vdi_ref, read_only=False):
             LOG.debug(_('Destroying VBD for VDI %s ... '), vdi_ref)
             vbd_unplug_with_retry(session, vbd_ref)
     finally:
-        ignore_failure(session.get_xenapi().VBD.destroy, vbd_ref)
+        ignore_failure(session.call_xenapi, "VBD.destroy", vbd_ref)
         LOG.debug(_('Destroying VBD for VDI %s done.'), vdi_ref)

@@ -1292,7 +1292,7 @@ def vbd_unplug_with_retry(session, vbd_ref):
     # FIXME(sirp): We can use LoopingCall here w/o blocking sleep()
     while True:
         try:
-            session.get_xenapi().VBD.unplug(vbd_ref)
+            session.call_xenapi("VBD.unplug", vbd_ref)
             LOG.debug(_('VBD.unplug successful first time.'))
             return
         except VMHelper.XenAPI.Failure, e:
@@ -1325,7 +1325,7 @@ def get_this_vm_uuid():


 def get_this_vm_ref(session):
-    return session.get_xenapi().VM.get_by_uuid(get_this_vm_uuid())
+    return session.call_xenapi("VM.get_by_uuid", get_this_vm_uuid())


 def _is_vdi_pv(dev):
diff --git a/nova/virt/xenapi/vmops.py b/nova/virt/xenapi/vmops.py
index e96853845da5..c7b519887ca3 100644
--- a/nova/virt/xenapi/vmops.py
+++ b/nova/virt/xenapi/vmops.py
@@ -97,8 +97,8 @@ class VMOps(object):
         # TODO(justinsb): Should we just always use the details method?
         #  Seems to be the same number of API calls..
         vm_refs = []
-        for vm_ref in self._session.get_xenapi().VM.get_all():
-            vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        for vm_ref in self._session.call_xenapi("VM.get_all"):
+            vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
             if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]:
                 vm_refs.append(vm_rec["name_label"])
         return vm_refs
@@ -106,8 +106,8 @@ class VMOps(object):
     def list_instances_detail(self):
         """List VM instances, returning InstanceInfo objects."""
         instance_infos = []
-        for vm_ref in self._session.get_xenapi().VM.get_all():
-            vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        for vm_ref in self._session.call_xenapi("VM.get_all"):
+            vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
             if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]:
                 name = vm_rec["name_label"]

@@ -519,7 +519,7 @@ class VMOps(object):
         obj = None
         try:
             # check for opaque ref
-            obj = self._session.get_xenapi().VM.get_uuid(instance_or_vm)
+            obj = self._session.call_xenapi("VM.get_uuid", instance_or_vm)
             return instance_or_vm
         except self.XenAPI.Failure:
             # wasn't an opaque ref, can be an instance name
@@ -788,7 +788,7 @@ class VMOps(object):
             return resp['message'].replace('\\r\\n', '')

         vm_ref = self._get_vm_opaque_ref(instance)
-        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)

         domid = vm_rec['domid']

@@ -798,7 +798,7 @@ class VMOps(object):
             if ret:
                 return ret

-            vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+            vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
             if vm_rec['domid'] != domid:
                 LOG.info(_('domid changed from %(olddomid)s to '
                            '%(newdomid)s') % {
@@ -913,8 +913,8 @@ class VMOps(object):

         We use the second VBD here because swap is first with the root file
         system coming in second."""
-        vbd_ref = self._session.get_xenapi().VM.get_VBDs(vm_ref)[1]
-        vdi_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)["VDI"]
+        vbd_ref = self._session.call_xenapi("VM.get_VBDs", vm_ref)[1]
+        vdi_ref = self._session.call_xenapi("VBD.get_record", vbd_ref)["VDI"]

         return VMHelper.create_vbd(self._session, rescue_vm_ref, vdi_ref,
                                    1, False)
@@ -951,9 +951,9 @@ class VMOps(object):

     def _destroy_rescue_vbds(self, rescue_vm_ref):
         """Destroys all VBDs tied to a rescue VM."""
-        vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref)
+        vbd_refs = self._session.call_xenapi("VM.get_VBDs", rescue_vm_ref)
         for vbd_ref in vbd_refs:
-            vbd_rec = self._session.get_xenapi().VBD.get_record(vbd_ref)
+            vbd_rec = self._session.call_xenapi("VBD.get_record", vbd_ref)
             if vbd_rec.get("userdevice", None) == "1":  # VBD is always 1
                 VMHelper.unplug_vbd(self._session, vbd_ref)
                 VMHelper.destroy_vbd(self._session, vbd_ref)
@@ -1147,14 +1147,14 @@ class VMOps(object):

     def _cancel_stale_tasks(self, timeout, task):
         """Cancel the given tasks that are older than the given timeout."""
-        task_refs = self._session.get_xenapi().task.get_by_name_label(task)
+        task_refs = self._session.call_xenapi("task.get_by_name_label", task)
         for task_ref in task_refs:
-            task_rec = self._session.get_xenapi().task.get_record(task_ref)
+            task_rec = self._session.call_xenapi("task.get_record", task_ref)
             task_created = utils.parse_strtime(task_rec["created"].value,
                                                "%Y%m%dT%H:%M:%SZ")

             if utils.is_older_than(task_created, timeout):
-                self._session.get_xenapi().task.cancel(task_ref)
+                self._session.call_xenapi("task.cancel", task_ref)

     def poll_rebooting_instances(self, timeout):
         """Look for expirable rebooting instances.
@@ -1243,13 +1243,13 @@ class VMOps(object):
     def get_info(self, instance):
         """Return data about VM instance."""
         vm_ref = self._get_vm_opaque_ref(instance)
-        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
         return VMHelper.compile_info(vm_rec)

     def get_diagnostics(self, instance):
         """Return data about VM diagnostics."""
         vm_ref = self._get_vm_opaque_ref(instance)
-        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
         return VMHelper.compile_diagnostics(self._session, vm_rec)

     def get_all_bw_usage(self, start_time, stop_time=None):
@@ -1264,10 +1264,10 @@ class VMOps(object):
                       exc_info=sys.exc_info())
         bw = {}
         for uuid, data in metrics.iteritems():
-            vm_ref = self._session.get_xenapi().VM.get_by_uuid(uuid)
-            vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+            vm_ref = self._session.call_xenapi("VM.get_by_uuid", uuid)
+            vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
             vif_map = {}
-            for vif in [self._session.get_xenapi().VIF.get_record(vrec)
+            for vif in [self._session.call_xenapi("VIF.get_record", vrec)
                         for vrec in vm_rec['VIFs']]:
                 vif_map[vif['device']] = vif['MAC']
             name = vm_rec['name_label']
@@ -1340,7 +1340,7 @@ class VMOps(object):
         """
         if vm_ref:
             # this function raises if vm_ref is not a vm_opaque_ref
-            self._session.get_xenapi().VM.get_record(vm_ref)
+            self._session.call_xenapi("VM.get_record", vm_ref)
         else:
             vm_ref = VMHelper.lookup(self._session, instance.name)
         logging.debug(_("injecting network info to xs for vm: |%s|"), vm_ref)
@@ -1364,7 +1364,7 @@ class VMOps(object):
         logging.debug(_("creating vif(s) for vm: |%s|"), vm_ref)

         # this function raises if vm_ref is not a vm_opaque_ref
-        self._session.get_xenapi().VM.get_record(vm_ref)
+        self._session.call_xenapi("VM.get_record", vm_ref)

         for device, (network, info) in enumerate(network_info):
             vif_rec = self.vif_driver.plug(self._session,
@@ -1473,7 +1473,7 @@ class VMOps(object):
         """
         instance_id = vm.id
         vm_ref = vm_ref or self._get_vm_opaque_ref(vm)
-        vm_rec = self._session.get_xenapi().VM.get_record(vm_ref)
+        vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
         args = {'dom_id': vm_rec['domid'], 'path': path}
         args.update(addl_args or {})
         try:
diff --git a/nova/virt/xenapi/volume_utils.py b/nova/virt/xenapi/volume_utils.py
index ccb4e085d2cc..c2d8e511f50f 100644
--- a/nova/virt/xenapi/volume_utils.py
+++ b/nova/virt/xenapi/volume_utils.py
@@ -52,7 +52,7 @@ class VolumeHelper(HelperBase):
         Create an iSCSI storage repository that will be used to mount
         the volume for the specified instance
         """
-        sr_ref = session.get_xenapi().SR.get_by_name_label(label)
+        sr_ref = session.call_xenapi("SR.get_by_name_label", label)
         if len(sr_ref) == 0:
             LOG.debug(_('Introducing %s...'), label)
             record = {}
@@ -67,7 +67,7 @@ class VolumeHelper(HelperBase):
                       'port': info['targetPort'],
                       'targetIQN': info['targetIQN']}
         try:
-            sr_ref = session.get_xenapi().SR.create(
+            sr_ref = session.call_xenapi("SR.create",
                 session.get_xenapi_host(),
                 record,
                 '0', label, description, 'iscsi', '', False, {})
@@ -83,8 +83,8 @@ class VolumeHelper(HelperBase):
     def find_sr_from_vbd(cls, session, vbd_ref):
         """Find the SR reference from the VBD reference"""
         try:
-            vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref)
-            sr_ref = session.get_xenapi().VDI.get_SR(vdi_ref)
+            vdi_ref = session.call_xenapi("VBD.get_VDI", vbd_ref)
+            sr_ref = session.call_xenapi("VDI.get_SR", vdi_ref)
         except cls.XenAPI.Failure, exc:
             LOG.exception(exc)
             raise StorageError(_('Unable to find SR from VBD %s') % vbd_ref)
@@ -96,18 +96,18 @@ class VolumeHelper(HelperBase):
         LOG.debug(_("Forgetting SR %s ... "), sr_ref)
         pbds = []
         try:
-            pbds = session.get_xenapi().SR.get_PBDs(sr_ref)
+            pbds = session.call_xenapi("SR.get_PBDs", sr_ref)
         except cls.XenAPI.Failure, exc:
             LOG.warn(_('Ignoring exception %(exc)s when getting PBDs'
                        ' for %(sr_ref)s') % locals())
         for pbd in pbds:
             try:
-                session.get_xenapi().PBD.unplug(pbd)
+                session.call_xenapi("PBD.unplug", pbd)
             except cls.XenAPI.Failure, exc:
                 LOG.warn(_('Ignoring exception %(exc)s when unplugging'
                            ' PBD %(pbd)s') % locals())
         try:
-            session.get_xenapi().SR.forget(sr_ref)
+            session.call_xenapi("SR.forget", sr_ref)
             LOG.debug(_("Forgetting SR %s done."), sr_ref)
         except cls.XenAPI.Failure, exc:
             LOG.warn(_('Ignoring exception %(exc)s when forgetting'
@@ -117,19 +117,19 @@ class VolumeHelper(HelperBase):
     def introduce_vdi(cls, session, sr_ref):
         """Introduce VDI in the host"""
        try:
-            vdi_refs = session.get_xenapi().SR.get_VDIs(sr_ref)
+            vdi_refs = session.call_xenapi("SR.get_VDIs", sr_ref)
         except cls.XenAPI.Failure, exc:
             LOG.exception(exc)
             raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref)
         try:
-            vdi_rec = session.get_xenapi().VDI.get_record(vdi_refs[0])
+            vdi_rec = session.call_xenapi("VDI.get_record", vdi_refs[0])
         except cls.XenAPI.Failure, exc:
             LOG.exception(exc)
             raise StorageError(_('Unable to get record'
                                  ' of VDI %s on') % vdi_refs[0])
         else:
             try:
-                return session.get_xenapi().VDI.introduce(
+                return session.call_xenapi("VDI.introduce",
                     vdi_rec['uuid'],
                     vdi_rec['name_label'],
                     vdi_rec['name_description'],
diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py
index aa200f6a4c22..7fb57d967676 100644
--- a/nova/virt/xenapi_conn.py
+++ b/nova/virt/xenapi_conn.py
@@ -57,6 +57,7 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
 - suffix "_rec" for record objects
 """

+import contextlib
 import json
 import random
 import sys
@@ -65,6 +66,7 @@ import urlparse
 import xmlrpclib

 from eventlet import event
+from eventlet import queue
 from eventlet import tpool
 from eventlet import timeout

@@ -97,6 +99,10 @@ flags.DEFINE_string('xenapi_connection_password',
                     None,
                     'Password for connection to XenServer/Xen Cloud Platform.'
                     ' Used only if connection_type=xenapi.')
+flags.DEFINE_integer('xenapi_connection_concurrent',
+                     5,
+                     'Maximum number of concurrent XenAPI connections.'
+                     ' Used only if connection_type=xenapi.')
 flags.DEFINE_float('xenapi_task_poll_interval',
                    0.5,
                    'The interval used for polling of remote tasks '
@@ -401,44 +407,56 @@ class XenAPISession(object):

     def __init__(self, url, user, pw):
         self.XenAPI = self.get_imported_xenapi()
-        self._session = self._create_session(url)
+        self._sessions = queue.Queue()
         exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
                                           "(is the Dom0 disk full?)"))
-        with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
-            self._session.login_with_password(user, pw)
+        for i in xrange(FLAGS.xenapi_connection_concurrent):
+            session = self._create_session(url)
+            with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
+                session.login_with_password(user, pw)
+            self._sessions.put(session)

     def get_imported_xenapi(self):
         """Stubout point. This can be replaced with a mock xenapi module."""
         return __import__('XenAPI')

-    def get_xenapi(self):
-        """Return the xenapi object"""
-        return self._session.xenapi
+    @contextlib.contextmanager
+    def _get_session(self):
+        """Return exclusive session for scope of with statement"""
+        session = self._sessions.get()
+        try:
+            yield session
+        finally:
+            self._sessions.put(session)

     def get_xenapi_host(self):
         """Return the xenapi host"""
-        return self._session.xenapi.session.get_this_host(self._session.handle)
+        with self._get_session() as session:
+            return session.xenapi.session.get_this_host(session.handle)

     def call_xenapi(self, method, *args):
         """Call the specified XenAPI method on a background thread."""
-        f = self._session.xenapi
-        for m in method.split('.'):
-            f = f.__getattr__(m)
-        return tpool.execute(f, *args)
+        with self._get_session() as session:
+            f = session.xenapi
+            for m in method.split('.'):
+                f = getattr(f, m)
+            return tpool.execute(f, *args)

     def call_xenapi_request(self, method, *args):
         """Some interactions with dom0, such as interacting with xenstore's
         param record, require using the xenapi_request method of the session
         object. This wraps that call on a background thread.
         """
-        f = self._session.xenapi_request
-        return tpool.execute(f, method, *args)
+        with self._get_session() as session:
+            f = session.xenapi_request
+            return tpool.execute(f, method, *args)

     def async_call_plugin(self, plugin, fn, args):
         """Call Async.host.call_plugin on a background thread."""
-        return tpool.execute(self._unwrap_plugin_exceptions,
-                             self._session.xenapi.Async.host.call_plugin,
-                             self.get_xenapi_host(), plugin, fn, args)
+        with self._get_session() as session:
+            return tpool.execute(self._unwrap_plugin_exceptions,
+                                 session.xenapi.Async.host.call_plugin,
+                                 self.get_xenapi_host(), plugin, fn, args)

     def wait_for_task(self, task, id=None):
         """Return the result of the given task. The task is polled
@@ -452,8 +470,8 @@ class XenAPISession(object):
         """
         try:
             ctxt = context.get_admin_context()
-            name = self._session.xenapi.task.get_name_label(task)
-            status = self._session.xenapi.task.get_status(task)
+            name = self.call_xenapi("task.get_name_label", task)
+            status = self.call_xenapi("task.get_status", task)

             # Ensure action is never > 255
             action = dict(action=name[:255], error=None)
@@ -464,7 +482,7 @@ class XenAPISession(object):
             if status == "pending":
                 return
             elif status == "success":
-                result = self._session.xenapi.task.get_result(task)
+                result = self.call_xenapi("task.get_result", task)
                 LOG.info(_("Task [%(name)s] %(task)s status:"
                            " success    %(result)s") % locals())

@@ -473,7 +491,7 @@ class XenAPISession(object):
                 done.send(_parse_xmlrpc_value(result))
             else:
-                error_info = self._session.xenapi.task.get_error_info(task)
+                error_info = self.call_xenapi("task.get_error_info", task)
                 LOG.warn(_("Task [%(name)s] %(task)s status:"
                            " %(status)s %(error_info)s") % locals())

@@ -559,7 +577,7 @@ class HostState(object):
             # No SR configured
             LOG.error(_("Unable to get SR for this host: %s") % e)
             return
-        sr_rec = self._session.get_xenapi().SR.get_record(sr_ref)
+        sr_rec = self._session.call_xenapi("SR.get_record", sr_ref)
         total = int(sr_rec["virtual_allocation"])
         used = int(sr_rec["physical_utilisation"])
         data["disk_total"] = total
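
For context: the core of this change is the pool of pre-logged-in sessions
guarded by an eventlet queue. Every caller checks a session out through the
_get_session() context manager, so at most one greenthread uses a given
xmlrpclib HTTP connection at a time, and queue.get() blocks (yielding the
greenthread) whenever all sessions are checked out. The sketch below is a
minimal, self-contained illustration of the same check-out/check-in pattern.
The names SessionPool and FakeXenApi are hypothetical, and the stdlib queue
module stands in for eventlet.queue, which the patch uses so that a blocked
get() cooperatively yields to other greenthreads:

    import contextlib
    import queue  # the patch uses eventlet.queue for greenthread-aware blocking


    class FakeXenApi(object):
        """Illustrative stand-in for a logged-in session's .xenapi proxy."""
        class VM(object):
            @staticmethod
            def get_all():
                return ['OpaqueRef:vm-1', 'OpaqueRef:vm-2']


    class SessionPool(object):
        """Serialize access by checking sessions out of a fixed-size pool."""

        def __init__(self, size):
            self._sessions = queue.Queue()
            for _ in range(size):
                self._sessions.put(FakeXenApi())

        @contextlib.contextmanager
        def _get_session(self):
            # get() blocks while every session is checked out, so no two
            # callers ever share one underlying HTTP connection.
            session = self._sessions.get()
            try:
                yield session
            finally:
                self._sessions.put(session)

        def call_xenapi(self, method, *args):
            # Resolve a dotted name like "VM.get_all" to a bound callable,
            # mirroring how call_xenapi() dispatches in this patch.
            with self._get_session() as session:
                f = session
                for m in method.split('.'):
                    f = getattr(f, m)
                return f(*args)


    pool = SessionPool(size=5)
    print(pool.call_xenapi('VM.get_all'))

Note that in the patch the tpool.execute() call happens inside the with
block, so a session is not returned to the pool until its XML-RPC request
has finished on the background thread; that is what keeps each HTTP
connection exclusive for the full duration of a call.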