Merge "Fix concurrency of XenAPI sessions"

This commit is contained in:
Jenkins
2011-10-25 19:11:41 +00:00
committed by Gerrit Code Review
6 changed files with 118 additions and 97 deletions

View File

@@ -1072,8 +1072,11 @@ class FakeSession(object):
'free-computed': 40} 'free-computed': 40}
return json.dumps({'host_memory': vm}) return json.dumps({'host_memory': vm})
def get_xenapi(self): def call_xenapi(self, method, *args):
return FakeXenApi() f = FakeXenApi()
for m in method.split('.'):
f = getattr(f, m)
return f(*args)
class HostStateTestCase(test.TestCase): class HostStateTestCase(test.TestCase):

View File

@@ -38,7 +38,7 @@ def stubout_instance_snapshot(stubs):
sr_ref = "fakesr" sr_ref = "fakesr"
vdi_ref = create_vdi(name_label=name_label, read_only=False, vdi_ref = create_vdi(name_label=name_label, read_only=False,
sr_ref=sr_ref, sharable=False) sr_ref=sr_ref, sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
vdi_uuid = vdi_rec['uuid'] vdi_uuid = vdi_rec['uuid']
return [dict(vdi_type='os', vdi_uuid=vdi_uuid)] return [dict(vdi_type='os', vdi_uuid=vdi_uuid)]
@@ -307,7 +307,7 @@ def stub_out_migration_methods(stubs):
def fake_get_vdi(cls, session, vm_ref): def fake_get_vdi(cls, session, vm_ref):
vdi_ref = fake.create_vdi(name_label='derp', read_only=False, vdi_ref = fake.create_vdi(name_label='derp', read_only=False,
sr_ref='herp', sharable=False) sr_ref='herp', sharable=False)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
return vdi_ref, {'uuid': vdi_rec['uuid'], } return vdi_ref, {'uuid': vdi_rec['uuid'], }
def fake_shutdown(self, inst, vm, hard=True): def fake_shutdown(self, inst, vm, hard=True):

View File

@@ -205,8 +205,8 @@ class VMHelper(HelperBase):
mem = long(instance_type['memory_mb']) * 1024 * 1024 mem = long(instance_type['memory_mb']) * 1024 * 1024
#get free memory from host #get free memory from host
host = session.get_xenapi_host() host = session.get_xenapi_host()
host_free_mem = long(session.get_xenapi().host. host_free_mem = long(session.call_xenapi("host.compute_free_memory",
compute_free_memory(host)) host))
return host_free_mem >= mem return host_free_mem >= mem
@classmethod @classmethod
@@ -260,11 +260,11 @@ class VMHelper(HelperBase):
@classmethod @classmethod
def find_vbd_by_number(cls, session, vm_ref, number): def find_vbd_by_number(cls, session, vm_ref, number):
"""Get the VBD reference from the device number""" """Get the VBD reference from the device number"""
vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref) vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
if vbd_refs: if vbd_refs:
for vbd_ref in vbd_refs: for vbd_ref in vbd_refs:
try: try:
vbd_rec = session.get_xenapi().VBD.get_record(vbd_ref) vbd_rec = session.call_xenapi("VBD.get_record", vbd_ref)
if vbd_rec['userdevice'] == str(number): if vbd_rec['userdevice'] == str(number):
return vbd_ref return vbd_ref
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
@@ -303,7 +303,7 @@ class VMHelper(HelperBase):
@classmethod @classmethod
def create_vdi(cls, session, sr_ref, name_label, virtual_size, read_only): def create_vdi(cls, session, sr_ref, name_label, virtual_size, read_only):
"""Create a VDI record and returns its reference.""" """Create a VDI record and returns its reference."""
vdi_ref = session.get_xenapi().VDI.create( vdi_ref = session.call_xenapi("VDI.create",
{'name_label': name_label, {'name_label': name_label,
'name_description': '', 'name_description': '',
'SR': sr_ref, 'SR': sr_ref,
@@ -322,18 +322,18 @@ class VMHelper(HelperBase):
@classmethod @classmethod
def set_vdi_name_label(cls, session, vdi_uuid, name_label): def set_vdi_name_label(cls, session, vdi_uuid, name_label):
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid) vdi_ref = session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
session.get_xenapi().VDI.set_name_label(vdi_ref, name_label) session.call_xenapi("VDI.set_name_label", vdi_ref, name_label)
@classmethod @classmethod
def get_vdi_for_vm_safely(cls, session, vm_ref): def get_vdi_for_vm_safely(cls, session, vm_ref):
"""Retrieves the primary VDI for a VM""" """Retrieves the primary VDI for a VM"""
vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref) vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
for vbd in vbd_refs: for vbd in vbd_refs:
vbd_rec = session.get_xenapi().VBD.get_record(vbd) vbd_rec = session.call_xenapi("VBD.get_record", vbd)
# Convention dictates the primary VDI will be userdevice 0 # Convention dictates the primary VDI will be userdevice 0
if vbd_rec['userdevice'] == '0': if vbd_rec['userdevice'] == '0':
vdi_rec = session.get_xenapi().VDI.get_record(vbd_rec['VDI']) vdi_rec = session.call_xenapi("VDI.get_record", vbd_rec['VDI'])
return vbd_rec['VDI'], vdi_rec return vbd_rec['VDI'], vdi_rec
raise exception.Error(_("No primary VDI found for" raise exception.Error(_("No primary VDI found for"
"%(vm_ref)s") % locals()) "%(vm_ref)s") % locals())
@@ -377,7 +377,7 @@ class VMHelper(HelperBase):
snapshots or by restoring an image in the DISK_VHD format. snapshots or by restoring an image in the DISK_VHD format.
""" """
sr_ref = safe_find_sr(session) sr_ref = safe_find_sr(session)
sr_rec = session.get_xenapi().SR.get_record(sr_ref) sr_rec = session.call_xenapi("SR.get_record", sr_ref)
sr_uuid = sr_rec["uuid"] sr_uuid = sr_rec["uuid"]
return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid) return os.path.join(FLAGS.xenapi_sr_base_path, sr_uuid)
@@ -602,9 +602,9 @@ w
os_vdi_uuid = vdis[0]['vdi_uuid'] os_vdi_uuid = vdis[0]['vdi_uuid']
# Set the name-label to ease debugging # Set the name-label to ease debugging
vdi_ref = session.get_xenapi().VDI.get_by_uuid(os_vdi_uuid) vdi_ref = session.call_xenapi("VDI.get_by_uuid", os_vdi_uuid)
primary_name_label = instance.name primary_name_label = instance.name
session.get_xenapi().VDI.set_name_label(vdi_ref, primary_name_label) session.call_xenapi("VDI.set_name_label", vdi_ref, primary_name_label)
cls._check_vdi_size(context, session, instance, os_vdi_uuid) cls._check_vdi_size(context, session, instance, os_vdi_uuid)
return vdis return vdis
@@ -696,7 +696,7 @@ w
# If anything goes wrong, we need to remember its uuid. # If anything goes wrong, we need to remember its uuid.
try: try:
filename = None filename = None
vdi_uuid = session.get_xenapi().VDI.get_uuid(vdi_ref) vdi_uuid = session.call_xenapi("VDI.get_uuid", vdi_ref)
with vdi_attached_here(session, vdi_ref, read_only=False) as dev: with vdi_attached_here(session, vdi_ref, read_only=False) as dev:
_stream_disk(dev, image_type, virtual_size, image_file) _stream_disk(dev, image_type, virtual_size, image_file)
@@ -713,7 +713,7 @@ w
task = session.async_call_plugin('glance', fn, args) task = session.async_call_plugin('glance', fn, args)
filename = session.wait_for_task(task, instance_id) filename = session.wait_for_task(task, instance_id)
# Remove the VDI as it is not needed anymore. # Remove the VDI as it is not needed anymore.
session.get_xenapi().VDI.destroy(vdi_ref) session.call_xenapi("VDI.destroy", vdi_ref)
LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref) LOG.debug(_("Kernel/Ramdisk VDI %s destroyed"), vdi_ref)
return [dict(vdi_type=ImageType.to_string(image_type), return [dict(vdi_type=ImageType.to_string(image_type),
vdi_uuid=None, vdi_uuid=None,
@@ -828,12 +828,12 @@ w
@classmethod @classmethod
def set_vm_name_label(cls, session, vm_ref, name_label): def set_vm_name_label(cls, session, vm_ref, name_label):
session.get_xenapi().VM.set_name_label(vm_ref, name_label) session.call_xenapi("VM.set_name_label", vm_ref, name_label)
@classmethod @classmethod
def lookup(cls, session, name_label): def lookup(cls, session, name_label):
"""Look the instance up and return it if available""" """Look the instance up and return it if available"""
vm_refs = session.get_xenapi().VM.get_by_name_label(name_label) vm_refs = session.call_xenapi("VM.get_by_name_label", name_label)
n = len(vm_refs) n = len(vm_refs)
if n == 0: if n == 0:
return None return None
@@ -847,14 +847,14 @@ w
"""Look for the VDIs that are attached to the VM""" """Look for the VDIs that are attached to the VM"""
# Firstly we get the VBDs, then the VDIs. # Firstly we get the VBDs, then the VDIs.
# TODO(Armando): do we leave the read-only devices? # TODO(Armando): do we leave the read-only devices?
vbd_refs = session.get_xenapi().VM.get_VBDs(vm_ref) vbd_refs = session.call_xenapi("VM.get_VBDs", vm_ref)
vdi_refs = [] vdi_refs = []
if vbd_refs: if vbd_refs:
for vbd_ref in vbd_refs: for vbd_ref in vbd_refs:
try: try:
vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref) vdi_ref = session.call_xenapi("VBD.get_VDI", vbd_ref)
# Test valid VDI # Test valid VDI
record = session.get_xenapi().VDI.get_record(vdi_ref) record = session.call_xenapi("VDI.get_record", vdi_ref)
LOG.debug(_('VDI %s is still available'), record['uuid']) LOG.debug(_('VDI %s is still available'), record['uuid'])
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.exception(exc) LOG.exception(exc)
@@ -884,7 +884,7 @@ w
@classmethod @classmethod
def lookup_kernel_ramdisk(cls, session, vm): def lookup_kernel_ramdisk(cls, session, vm):
vm_rec = session.get_xenapi().VM.get_record(vm) vm_rec = session.call_xenapi("VM.get_record", vm)
if 'PV_kernel' in vm_rec and 'PV_ramdisk' in vm_rec: if 'PV_kernel' in vm_rec and 'PV_ramdisk' in vm_rec:
return (vm_rec['PV_kernel'], vm_rec['PV_ramdisk']) return (vm_rec['PV_kernel'], vm_rec['PV_ramdisk'])
else: else:
@@ -908,7 +908,7 @@ w
"""Compile VM diagnostics data""" """Compile VM diagnostics data"""
try: try:
host = session.get_xenapi_host() host = session.get_xenapi_host()
host_ip = session.get_xenapi().host.get_record(host)["address"] host_ip = session.call_xenapi("host.get_record", host)["address"]
except (cls.XenAPI.Failure, KeyError) as e: except (cls.XenAPI.Failure, KeyError) as e:
return {"Unable to retrieve diagnostics": e} return {"Unable to retrieve diagnostics": e}
@@ -936,7 +936,7 @@ w
start_time = int(start_time) start_time = int(start_time)
try: try:
host = session.get_xenapi_host() host = session.get_xenapi_host()
host_ip = session.get_xenapi().host.get_record(host)["address"] host_ip = session.call_xenapi("host.get_record", host)["address"]
except (cls.XenAPI.Failure, KeyError) as e: except (cls.XenAPI.Failure, KeyError) as e:
raise exception.CouldNotFetchMetrics() raise exception.CouldNotFetchMetrics()
@@ -1066,8 +1066,8 @@ def get_vhd_parent(session, vdi_rec):
""" """
if 'vhd-parent' in vdi_rec['sm_config']: if 'vhd-parent' in vdi_rec['sm_config']:
parent_uuid = vdi_rec['sm_config']['vhd-parent'] parent_uuid = vdi_rec['sm_config']['vhd-parent']
parent_ref = session.get_xenapi().VDI.get_by_uuid(parent_uuid) parent_ref = session.call_xenapi("VDI.get_by_uuid", parent_uuid)
parent_rec = session.get_xenapi().VDI.get_record(parent_ref) parent_rec = session.call_xenapi("VDI.get_record", parent_ref)
vdi_uuid = vdi_rec['uuid'] vdi_uuid = vdi_rec['uuid']
LOG.debug(_("VHD %(vdi_uuid)s has parent %(parent_ref)s") % locals()) LOG.debug(_("VHD %(vdi_uuid)s has parent %(parent_ref)s") % locals())
return parent_ref, parent_rec return parent_ref, parent_rec
@@ -1076,7 +1076,7 @@ def get_vhd_parent(session, vdi_rec):
def get_vhd_parent_uuid(session, vdi_ref): def get_vhd_parent_uuid(session, vdi_ref):
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
ret = get_vhd_parent(session, vdi_rec) ret = get_vhd_parent(session, vdi_rec)
if ret: if ret:
parent_ref, parent_rec = ret parent_ref, parent_rec = ret
@@ -1089,8 +1089,8 @@ def walk_vdi_chain(session, vdi_uuid):
"""Yield vdi_recs for each element in a VDI chain""" """Yield vdi_recs for each element in a VDI chain"""
# TODO(jk0): perhaps make get_vhd_parent use this # TODO(jk0): perhaps make get_vhd_parent use this
while True: while True:
vdi_ref = session.get_xenapi().VDI.get_by_uuid(vdi_uuid) vdi_ref = session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
vdi_rec = session.get_xenapi().VDI.get_record(vdi_ref) vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
yield vdi_rec yield vdi_rec
parent_uuid = vdi_rec['sm_config'].get('vhd-parent') parent_uuid = vdi_rec['sm_config'].get('vhd-parent')
@@ -1153,14 +1153,14 @@ def safe_find_sr(session):
def find_sr(session): def find_sr(session):
"""Return the storage repository to hold VM images""" """Return the storage repository to hold VM images"""
host = session.get_xenapi_host() host = session.get_xenapi_host()
sr_refs = session.get_xenapi().SR.get_all() sr_refs = session.call_xenapi("SR.get_all")
for sr_ref in sr_refs: for sr_ref in sr_refs:
sr_rec = session.get_xenapi().SR.get_record(sr_ref) sr_rec = session.call_xenapi("SR.get_record", sr_ref)
if not ('i18n-key' in sr_rec['other_config'] and if not ('i18n-key' in sr_rec['other_config'] and
sr_rec['other_config']['i18n-key'] == 'local-storage'): sr_rec['other_config']['i18n-key'] == 'local-storage'):
continue continue
for pbd_ref in sr_rec['PBDs']: for pbd_ref in sr_rec['PBDs']:
pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref) pbd_rec = session.call_xenapi("PBD.get_record", pbd_ref)
if pbd_rec['host'] == host: if pbd_rec['host'] == host:
return sr_ref return sr_ref
return None return None
@@ -1179,9 +1179,9 @@ def safe_find_iso_sr(session):
def find_iso_sr(session): def find_iso_sr(session):
"""Return the storage repository to hold ISO images""" """Return the storage repository to hold ISO images"""
host = session.get_xenapi_host() host = session.get_xenapi_host()
sr_refs = session.get_xenapi().SR.get_all() sr_refs = session.call_xenapi("SR.get_all")
for sr_ref in sr_refs: for sr_ref in sr_refs:
sr_rec = session.get_xenapi().SR.get_record(sr_ref) sr_rec = session.call_xenapi("SR.get_record", sr_ref)
LOG.debug(_("ISO: looking at SR %(sr_rec)s") % locals()) LOG.debug(_("ISO: looking at SR %(sr_rec)s") % locals())
if not sr_rec['content_type'] == 'iso': if not sr_rec['content_type'] == 'iso':
@@ -1198,7 +1198,7 @@ def find_iso_sr(session):
LOG.debug(_("ISO: SR MATCHing our criteria")) LOG.debug(_("ISO: SR MATCHing our criteria"))
for pbd_ref in sr_rec['PBDs']: for pbd_ref in sr_rec['PBDs']:
LOG.debug(_("ISO: ISO, looking to see if it is host local")) LOG.debug(_("ISO: ISO, looking to see if it is host local"))
pbd_rec = session.get_xenapi().PBD.get_record(pbd_ref) pbd_rec = session.call_xenapi("PBD.get_record", pbd_ref)
pbd_rec_host = pbd_rec['host'] pbd_rec_host = pbd_rec['host']
LOG.debug(_("ISO: PBD matching, want %(pbd_rec)s, have %(host)s") % LOG.debug(_("ISO: PBD matching, want %(pbd_rec)s, have %(host)s") %
locals()) locals())
@@ -1257,14 +1257,14 @@ def vdi_attached_here(session, vdi_ref, read_only=False):
vbd_rec['qos_algorithm_params'] = {} vbd_rec['qos_algorithm_params'] = {}
vbd_rec['qos_supported_algorithms'] = [] vbd_rec['qos_supported_algorithms'] = []
LOG.debug(_('Creating VBD for VDI %s ... '), vdi_ref) LOG.debug(_('Creating VBD for VDI %s ... '), vdi_ref)
vbd_ref = session.get_xenapi().VBD.create(vbd_rec) vbd_ref = session.call_xenapi("VBD.create", vbd_rec)
LOG.debug(_('Creating VBD for VDI %s done.'), vdi_ref) LOG.debug(_('Creating VBD for VDI %s done.'), vdi_ref)
try: try:
LOG.debug(_('Plugging VBD %s ... '), vbd_ref) LOG.debug(_('Plugging VBD %s ... '), vbd_ref)
session.get_xenapi().VBD.plug(vbd_ref) session.call_xenapi("VBD.plug", vbd_ref)
try: try:
LOG.debug(_('Plugging VBD %s done.'), vbd_ref) LOG.debug(_('Plugging VBD %s done.'), vbd_ref)
orig_dev = session.get_xenapi().VBD.get_device(vbd_ref) orig_dev = session.call_xenapi("VBD.get_device", vbd_ref)
LOG.debug(_('VBD %(vbd_ref)s plugged as %(orig_dev)s') % locals()) LOG.debug(_('VBD %(vbd_ref)s plugged as %(orig_dev)s') % locals())
dev = remap_vbd_dev(orig_dev) dev = remap_vbd_dev(orig_dev)
if dev != orig_dev: if dev != orig_dev:
@@ -1280,7 +1280,7 @@ def vdi_attached_here(session, vdi_ref, read_only=False):
LOG.debug(_('Destroying VBD for VDI %s ... '), vdi_ref) LOG.debug(_('Destroying VBD for VDI %s ... '), vdi_ref)
vbd_unplug_with_retry(session, vbd_ref) vbd_unplug_with_retry(session, vbd_ref)
finally: finally:
ignore_failure(session.get_xenapi().VBD.destroy, vbd_ref) ignore_failure(session.call_xenapi, "VBD.destroy", vbd_ref)
LOG.debug(_('Destroying VBD for VDI %s done.'), vdi_ref) LOG.debug(_('Destroying VBD for VDI %s done.'), vdi_ref)
@@ -1292,7 +1292,7 @@ def vbd_unplug_with_retry(session, vbd_ref):
# FIXME(sirp): We can use LoopingCall here w/o blocking sleep() # FIXME(sirp): We can use LoopingCall here w/o blocking sleep()
while True: while True:
try: try:
session.get_xenapi().VBD.unplug(vbd_ref) session.call_xenapi("VBD.unplug", vbd_ref)
LOG.debug(_('VBD.unplug successful first time.')) LOG.debug(_('VBD.unplug successful first time.'))
return return
except VMHelper.XenAPI.Failure, e: except VMHelper.XenAPI.Failure, e:
@@ -1325,7 +1325,7 @@ def get_this_vm_uuid():
def get_this_vm_ref(session): def get_this_vm_ref(session):
return session.get_xenapi().VM.get_by_uuid(get_this_vm_uuid()) return session.call_xenapi("VM.get_by_uuid", get_this_vm_uuid())
def _is_vdi_pv(dev): def _is_vdi_pv(dev):

View File

@@ -97,8 +97,8 @@ class VMOps(object):
# TODO(justinsb): Should we just always use the details method? # TODO(justinsb): Should we just always use the details method?
# Seems to be the same number of API calls.. # Seems to be the same number of API calls..
vm_refs = [] vm_refs = []
for vm_ref in self._session.get_xenapi().VM.get_all(): for vm_ref in self._session.call_xenapi("VM.get_all"):
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]: if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]:
vm_refs.append(vm_rec["name_label"]) vm_refs.append(vm_rec["name_label"])
return vm_refs return vm_refs
@@ -106,8 +106,8 @@ class VMOps(object):
def list_instances_detail(self): def list_instances_detail(self):
"""List VM instances, returning InstanceInfo objects.""" """List VM instances, returning InstanceInfo objects."""
instance_infos = [] instance_infos = []
for vm_ref in self._session.get_xenapi().VM.get_all(): for vm_ref in self._session.call_xenapi("VM.get_all"):
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]: if not vm_rec["is_a_template"] and not vm_rec["is_control_domain"]:
name = vm_rec["name_label"] name = vm_rec["name_label"]
@@ -519,7 +519,7 @@ class VMOps(object):
obj = None obj = None
try: try:
# check for opaque ref # check for opaque ref
obj = self._session.get_xenapi().VM.get_uuid(instance_or_vm) obj = self._session.call_xenapi("VM.get_uuid", instance_or_vm)
return instance_or_vm return instance_or_vm
except self.XenAPI.Failure: except self.XenAPI.Failure:
# wasn't an opaque ref, can be an instance name # wasn't an opaque ref, can be an instance name
@@ -788,7 +788,7 @@ class VMOps(object):
return resp['message'].replace('\\r\\n', '') return resp['message'].replace('\\r\\n', '')
vm_ref = self._get_vm_opaque_ref(instance) vm_ref = self._get_vm_opaque_ref(instance)
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
domid = vm_rec['domid'] domid = vm_rec['domid']
@@ -798,7 +798,7 @@ class VMOps(object):
if ret: if ret:
return ret return ret
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
if vm_rec['domid'] != domid: if vm_rec['domid'] != domid:
LOG.info(_('domid changed from %(olddomid)s to ' LOG.info(_('domid changed from %(olddomid)s to '
'%(newdomid)s') % { '%(newdomid)s') % {
@@ -913,8 +913,8 @@ class VMOps(object):
We use the second VBD here because swap is first with the root file We use the second VBD here because swap is first with the root file
system coming in second.""" system coming in second."""
vbd_ref = self._session.get_xenapi().VM.get_VBDs(vm_ref)[1] vbd_ref = self._session.call_xenapi("VM.get_VBDs", vm_ref)[1]
vdi_ref = self._session.get_xenapi().VBD.get_record(vbd_ref)["VDI"] vdi_ref = self._session.call_xenapi("VBD.get_record", vbd_ref)["VDI"]
return VMHelper.create_vbd(self._session, rescue_vm_ref, vdi_ref, 1, return VMHelper.create_vbd(self._session, rescue_vm_ref, vdi_ref, 1,
False) False)
@@ -951,9 +951,9 @@ class VMOps(object):
def _destroy_rescue_vbds(self, rescue_vm_ref): def _destroy_rescue_vbds(self, rescue_vm_ref):
"""Destroys all VBDs tied to a rescue VM.""" """Destroys all VBDs tied to a rescue VM."""
vbd_refs = self._session.get_xenapi().VM.get_VBDs(rescue_vm_ref) vbd_refs = self._session.call_xenapi("VM.get_VBDs", rescue_vm_ref)
for vbd_ref in vbd_refs: for vbd_ref in vbd_refs:
vbd_rec = self._session.get_xenapi().VBD.get_record(vbd_ref) vbd_rec = self._session.call_xenapi("VBD.get_record", vbd_ref)
if vbd_rec.get("userdevice", None) == "1": # VBD is always 1 if vbd_rec.get("userdevice", None) == "1": # VBD is always 1
VMHelper.unplug_vbd(self._session, vbd_ref) VMHelper.unplug_vbd(self._session, vbd_ref)
VMHelper.destroy_vbd(self._session, vbd_ref) VMHelper.destroy_vbd(self._session, vbd_ref)
@@ -1147,14 +1147,14 @@ class VMOps(object):
def _cancel_stale_tasks(self, timeout, task): def _cancel_stale_tasks(self, timeout, task):
"""Cancel the given tasks that are older than the given timeout.""" """Cancel the given tasks that are older than the given timeout."""
task_refs = self._session.get_xenapi().task.get_by_name_label(task) task_refs = self._session.call_xenapi("task.get_by_name_label", task)
for task_ref in task_refs: for task_ref in task_refs:
task_rec = self._session.get_xenapi().task.get_record(task_ref) task_rec = self._session.call_xenapi("task.get_record", task_ref)
task_created = utils.parse_strtime(task_rec["created"].value, task_created = utils.parse_strtime(task_rec["created"].value,
"%Y%m%dT%H:%M:%SZ") "%Y%m%dT%H:%M:%SZ")
if utils.is_older_than(task_created, timeout): if utils.is_older_than(task_created, timeout):
self._session.get_xenapi().task.cancel(task_ref) self._session.call_xenapi("task.cancel", task_ref)
def poll_rebooting_instances(self, timeout): def poll_rebooting_instances(self, timeout):
"""Look for expirable rebooting instances. """Look for expirable rebooting instances.
@@ -1243,13 +1243,13 @@ class VMOps(object):
def get_info(self, instance): def get_info(self, instance):
"""Return data about VM instance.""" """Return data about VM instance."""
vm_ref = self._get_vm_opaque_ref(instance) vm_ref = self._get_vm_opaque_ref(instance)
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
return VMHelper.compile_info(vm_rec) return VMHelper.compile_info(vm_rec)
def get_diagnostics(self, instance): def get_diagnostics(self, instance):
"""Return data about VM diagnostics.""" """Return data about VM diagnostics."""
vm_ref = self._get_vm_opaque_ref(instance) vm_ref = self._get_vm_opaque_ref(instance)
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
return VMHelper.compile_diagnostics(self._session, vm_rec) return VMHelper.compile_diagnostics(self._session, vm_rec)
def get_all_bw_usage(self, start_time, stop_time=None): def get_all_bw_usage(self, start_time, stop_time=None):
@@ -1264,10 +1264,10 @@ class VMOps(object):
exc_info=sys.exc_info()) exc_info=sys.exc_info())
bw = {} bw = {}
for uuid, data in metrics.iteritems(): for uuid, data in metrics.iteritems():
vm_ref = self._session.get_xenapi().VM.get_by_uuid(uuid) vm_ref = self._session.call_xenapi("VM.get_by_uuid", uuid)
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
vif_map = {} vif_map = {}
for vif in [self._session.get_xenapi().VIF.get_record(vrec) for vif in [self._session.call_xenapi("VIF.get_record", vrec)
for vrec in vm_rec['VIFs']]: for vrec in vm_rec['VIFs']]:
vif_map[vif['device']] = vif['MAC'] vif_map[vif['device']] = vif['MAC']
name = vm_rec['name_label'] name = vm_rec['name_label']
@@ -1340,7 +1340,7 @@ class VMOps(object):
""" """
if vm_ref: if vm_ref:
# this function raises if vm_ref is not a vm_opaque_ref # this function raises if vm_ref is not a vm_opaque_ref
self._session.get_xenapi().VM.get_record(vm_ref) self._session.call_xenapi("VM.get_record", vm_ref)
else: else:
vm_ref = VMHelper.lookup(self._session, instance.name) vm_ref = VMHelper.lookup(self._session, instance.name)
logging.debug(_("injecting network info to xs for vm: |%s|"), vm_ref) logging.debug(_("injecting network info to xs for vm: |%s|"), vm_ref)
@@ -1364,7 +1364,7 @@ class VMOps(object):
logging.debug(_("creating vif(s) for vm: |%s|"), vm_ref) logging.debug(_("creating vif(s) for vm: |%s|"), vm_ref)
# this function raises if vm_ref is not a vm_opaque_ref # this function raises if vm_ref is not a vm_opaque_ref
self._session.get_xenapi().VM.get_record(vm_ref) self._session.call_xenapi("VM.get_record", vm_ref)
for device, (network, info) in enumerate(network_info): for device, (network, info) in enumerate(network_info):
vif_rec = self.vif_driver.plug(self._session, vif_rec = self.vif_driver.plug(self._session,
@@ -1473,7 +1473,7 @@ class VMOps(object):
""" """
instance_id = vm.id instance_id = vm.id
vm_ref = vm_ref or self._get_vm_opaque_ref(vm) vm_ref = vm_ref or self._get_vm_opaque_ref(vm)
vm_rec = self._session.get_xenapi().VM.get_record(vm_ref) vm_rec = self._session.call_xenapi("VM.get_record", vm_ref)
args = {'dom_id': vm_rec['domid'], 'path': path} args = {'dom_id': vm_rec['domid'], 'path': path}
args.update(addl_args or {}) args.update(addl_args or {})
try: try:

View File

@@ -52,7 +52,7 @@ class VolumeHelper(HelperBase):
Create an iSCSI storage repository that will be used to mount Create an iSCSI storage repository that will be used to mount
the volume for the specified instance the volume for the specified instance
""" """
sr_ref = session.get_xenapi().SR.get_by_name_label(label) sr_ref = session.call_xenapi("SR.get_by_name_label", label)
if len(sr_ref) == 0: if len(sr_ref) == 0:
LOG.debug(_('Introducing %s...'), label) LOG.debug(_('Introducing %s...'), label)
record = {} record = {}
@@ -67,7 +67,7 @@ class VolumeHelper(HelperBase):
'port': info['targetPort'], 'port': info['targetPort'],
'targetIQN': info['targetIQN']} 'targetIQN': info['targetIQN']}
try: try:
sr_ref = session.get_xenapi().SR.create( sr_ref = session.call_xenapi("SR.create",
session.get_xenapi_host(), session.get_xenapi_host(),
record, record,
'0', label, description, 'iscsi', '', False, {}) '0', label, description, 'iscsi', '', False, {})
@@ -83,8 +83,8 @@ class VolumeHelper(HelperBase):
def find_sr_from_vbd(cls, session, vbd_ref): def find_sr_from_vbd(cls, session, vbd_ref):
"""Find the SR reference from the VBD reference""" """Find the SR reference from the VBD reference"""
try: try:
vdi_ref = session.get_xenapi().VBD.get_VDI(vbd_ref) vdi_ref = session.call_xenapi("VBD.get_VDI", vbd_ref)
sr_ref = session.get_xenapi().VDI.get_SR(vdi_ref) sr_ref = session.call_xenapi("VDI.get_SR", vdi_ref)
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.exception(exc) LOG.exception(exc)
raise StorageError(_('Unable to find SR from VBD %s') % vbd_ref) raise StorageError(_('Unable to find SR from VBD %s') % vbd_ref)
@@ -96,18 +96,18 @@ class VolumeHelper(HelperBase):
LOG.debug(_("Forgetting SR %s ... "), sr_ref) LOG.debug(_("Forgetting SR %s ... "), sr_ref)
pbds = [] pbds = []
try: try:
pbds = session.get_xenapi().SR.get_PBDs(sr_ref) pbds = session.call_xenapi("SR.get_PBDs", sr_ref)
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.warn(_('Ignoring exception %(exc)s when getting PBDs' LOG.warn(_('Ignoring exception %(exc)s when getting PBDs'
' for %(sr_ref)s') % locals()) ' for %(sr_ref)s') % locals())
for pbd in pbds: for pbd in pbds:
try: try:
session.get_xenapi().PBD.unplug(pbd) session.call_xenapi("PBD.unplug", pbd)
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.warn(_('Ignoring exception %(exc)s when unplugging' LOG.warn(_('Ignoring exception %(exc)s when unplugging'
' PBD %(pbd)s') % locals()) ' PBD %(pbd)s') % locals())
try: try:
session.get_xenapi().SR.forget(sr_ref) session.call_xenapi("SR.forget", sr_ref)
LOG.debug(_("Forgetting SR %s done."), sr_ref) LOG.debug(_("Forgetting SR %s done."), sr_ref)
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.warn(_('Ignoring exception %(exc)s when forgetting' LOG.warn(_('Ignoring exception %(exc)s when forgetting'
@@ -117,19 +117,19 @@ class VolumeHelper(HelperBase):
def introduce_vdi(cls, session, sr_ref): def introduce_vdi(cls, session, sr_ref):
"""Introduce VDI in the host""" """Introduce VDI in the host"""
try: try:
vdi_refs = session.get_xenapi().SR.get_VDIs(sr_ref) vdi_refs = session.call_xenapi("SR.get_VDIs", sr_ref)
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.exception(exc) LOG.exception(exc)
raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref) raise StorageError(_('Unable to introduce VDI on SR %s') % sr_ref)
try: try:
vdi_rec = session.get_xenapi().VDI.get_record(vdi_refs[0]) vdi_rec = session.call_xenapi("VDI.get_record", vdi_refs[0])
except cls.XenAPI.Failure, exc: except cls.XenAPI.Failure, exc:
LOG.exception(exc) LOG.exception(exc)
raise StorageError(_('Unable to get record' raise StorageError(_('Unable to get record'
' of VDI %s on') % vdi_refs[0]) ' of VDI %s on') % vdi_refs[0])
else: else:
try: try:
return session.get_xenapi().VDI.introduce( return session.call_xenapi("VDI.introduce",
vdi_rec['uuid'], vdi_rec['uuid'],
vdi_rec['name_label'], vdi_rec['name_label'],
vdi_rec['name_description'], vdi_rec['name_description'],

View File

@@ -57,6 +57,7 @@ reactor thread if the VM.get_by_name_label or VM.get_record calls block.
- suffix "_rec" for record objects - suffix "_rec" for record objects
""" """
import contextlib
import json import json
import random import random
import sys import sys
@@ -65,6 +66,7 @@ import urlparse
import xmlrpclib import xmlrpclib
from eventlet import event from eventlet import event
from eventlet import queue
from eventlet import tpool from eventlet import tpool
from eventlet import timeout from eventlet import timeout
@@ -97,6 +99,10 @@ flags.DEFINE_string('xenapi_connection_password',
None, None,
'Password for connection to XenServer/Xen Cloud Platform.' 'Password for connection to XenServer/Xen Cloud Platform.'
' Used only if connection_type=xenapi.') ' Used only if connection_type=xenapi.')
flags.DEFINE_integer('xenapi_connection_concurrent',
5,
'Maximum number of concurrent XenAPI connections.'
' Used only if connection_type=xenapi.')
flags.DEFINE_float('xenapi_task_poll_interval', flags.DEFINE_float('xenapi_task_poll_interval',
0.5, 0.5,
'The interval used for polling of remote tasks ' 'The interval used for polling of remote tasks '
@@ -401,44 +407,56 @@ class XenAPISession(object):
def __init__(self, url, user, pw): def __init__(self, url, user, pw):
self.XenAPI = self.get_imported_xenapi() self.XenAPI = self.get_imported_xenapi()
self._session = self._create_session(url) self._sessions = queue.Queue()
exception = self.XenAPI.Failure(_("Unable to log in to XenAPI " exception = self.XenAPI.Failure(_("Unable to log in to XenAPI "
"(is the Dom0 disk full?)")) "(is the Dom0 disk full?)"))
with timeout.Timeout(FLAGS.xenapi_login_timeout, exception): for i in xrange(FLAGS.xenapi_connection_concurrent):
self._session.login_with_password(user, pw) session = self._create_session(url)
with timeout.Timeout(FLAGS.xenapi_login_timeout, exception):
session.login_with_password(user, pw)
self._sessions.put(session)
def get_imported_xenapi(self): def get_imported_xenapi(self):
"""Stubout point. This can be replaced with a mock xenapi module.""" """Stubout point. This can be replaced with a mock xenapi module."""
return __import__('XenAPI') return __import__('XenAPI')
def get_xenapi(self): @contextlib.contextmanager
"""Return the xenapi object""" def _get_session(self):
return self._session.xenapi """Return exclusive session for scope of with statement"""
session = self._sessions.get()
try:
yield session
finally:
self._sessions.put(session)
def get_xenapi_host(self): def get_xenapi_host(self):
"""Return the xenapi host""" """Return the xenapi host"""
return self._session.xenapi.session.get_this_host(self._session.handle) with self._get_session() as session:
return session.xenapi.session.get_this_host(session.handle)
def call_xenapi(self, method, *args): def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread.""" """Call the specified XenAPI method on a background thread."""
f = self._session.xenapi with self._get_session() as session:
for m in method.split('.'): f = session.xenapi
f = f.__getattr__(m) for m in method.split('.'):
return tpool.execute(f, *args) f = getattr(f, m)
return tpool.execute(f, *args)
def call_xenapi_request(self, method, *args): def call_xenapi_request(self, method, *args):
"""Some interactions with dom0, such as interacting with xenstore's """Some interactions with dom0, such as interacting with xenstore's
param record, require using the xenapi_request method of the session param record, require using the xenapi_request method of the session
object. This wraps that call on a background thread. object. This wraps that call on a background thread.
""" """
f = self._session.xenapi_request with self._get_session() as session:
return tpool.execute(f, method, *args) f = session.xenapi_request
return tpool.execute(f, method, *args)
def async_call_plugin(self, plugin, fn, args): def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread.""" """Call Async.host.call_plugin on a background thread."""
return tpool.execute(self._unwrap_plugin_exceptions, with self._get_session() as session:
self._session.xenapi.Async.host.call_plugin, return tpool.execute(self._unwrap_plugin_exceptions,
self.get_xenapi_host(), plugin, fn, args) session.xenapi.Async.host.call_plugin,
self.get_xenapi_host(), plugin, fn, args)
def wait_for_task(self, task, id=None): def wait_for_task(self, task, id=None):
"""Return the result of the given task. The task is polled """Return the result of the given task. The task is polled
@@ -452,8 +470,8 @@ class XenAPISession(object):
""" """
try: try:
ctxt = context.get_admin_context() ctxt = context.get_admin_context()
name = self._session.xenapi.task.get_name_label(task) name = self.call_xenapi("task.get_name_label", task)
status = self._session.xenapi.task.get_status(task) status = self.call_xenapi("task.get_status", task)
# Ensure action is never > 255 # Ensure action is never > 255
action = dict(action=name[:255], error=None) action = dict(action=name[:255], error=None)
@@ -464,7 +482,7 @@ class XenAPISession(object):
if status == "pending": if status == "pending":
return return
elif status == "success": elif status == "success":
result = self._session.xenapi.task.get_result(task) result = self.call_xenapi("task.get_result", task)
LOG.info(_("Task [%(name)s] %(task)s status:" LOG.info(_("Task [%(name)s] %(task)s status:"
" success %(result)s") % locals()) " success %(result)s") % locals())
@@ -473,7 +491,7 @@ class XenAPISession(object):
done.send(_parse_xmlrpc_value(result)) done.send(_parse_xmlrpc_value(result))
else: else:
error_info = self._session.xenapi.task.get_error_info(task) error_info = self.call_xenapi("task.get_error_info", task)
LOG.warn(_("Task [%(name)s] %(task)s status:" LOG.warn(_("Task [%(name)s] %(task)s status:"
" %(status)s %(error_info)s") % locals()) " %(status)s %(error_info)s") % locals())
@@ -559,7 +577,7 @@ class HostState(object):
# No SR configured # No SR configured
LOG.error(_("Unable to get SR for this host: %s") % e) LOG.error(_("Unable to get SR for this host: %s") % e)
return return
sr_rec = self._session.get_xenapi().SR.get_record(sr_ref) sr_rec = self._session.call_xenapi("SR.get_record", sr_ref)
total = int(sr_rec["virtual_allocation"]) total = int(sr_rec["virtual_allocation"])
used = int(sr_rec["physical_utilisation"]) used = int(sr_rec["physical_utilisation"])
data["disk_total"] = total data["disk_total"] = total