xenapi: Make dom0 serialization consistent.
The dom0 plugin code had been using `pickle` for serializing input and `json` for serializing output which was needlessly inconsistent. This patch makes the code use `pickle`--chosen for its better handling of `datetime` objects--for both sending and receiving data. This patch also refactors the code so that neither the caller nor the callee need to explicitly worry about serialization: the caller just passes in args and kwargs, and the callee's function signature just accepts the args and kwargs as usual. Bonus: Removes unecessary imports Change-Id: I3abb42eeebd8d37d67e6c26fa7bcae66d876b3ee
This commit is contained in:
parent
a68dfb7c9a
commit
6392ad2924
nova
plugins/xenserver/xenapi/etc/xapi.d/plugins
@ -2105,6 +2105,10 @@ class VmUtilsTestCase(test.TestCase):
|
||||
def call_plugin(session_self, service, command, kwargs):
|
||||
self.kwargs = kwargs
|
||||
|
||||
def call_plugin_serialized(session_self, service, command, *args,
|
||||
**kwargs):
|
||||
self.kwargs = kwargs
|
||||
|
||||
def fake_dumps(thing):
|
||||
return thing
|
||||
|
||||
@ -2119,7 +2123,7 @@ class VmUtilsTestCase(test.TestCase):
|
||||
session = FakeSession()
|
||||
vm_utils.upload_image(ctx, session, instance, "vmi uuids", "image id")
|
||||
|
||||
actual = self.kwargs['params']['properties']
|
||||
actual = self.kwargs['properties']
|
||||
expected = dict(a=1, b=2, c='c', d='d',
|
||||
auto_disk_config='auto disk config',
|
||||
os_type='os type')
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""Stubouts, mocks and fixtures for the test suite"""
|
||||
|
||||
import contextlib
|
||||
import pickle
|
||||
import random
|
||||
import sys
|
||||
|
||||
@ -169,7 +170,7 @@ class FakeSessionForVMTests(fake.SessionBase):
|
||||
def host_call_plugin(self, _1, _2, plugin, method, _5):
|
||||
if (plugin, method) == ('glance', 'download_vhd'):
|
||||
root_uuid = _make_fake_vdi()
|
||||
return jsonutils.dumps(dict(root=dict(uuid=root_uuid)))
|
||||
return pickle.dumps(dict(root=dict(uuid=root_uuid)))
|
||||
elif (plugin, method) == ("xenhost", "iptables_config"):
|
||||
return fake.as_json(out=self._fake_iptables_save_output,
|
||||
err='')
|
||||
@ -181,8 +182,8 @@ class FakeSessionForVMTests(fake.SessionBase):
|
||||
if (plugin, method) == ('glance', 'download_vhd'):
|
||||
root_uuid = _make_fake_vdi()
|
||||
swap_uuid = _make_fake_vdi()
|
||||
return jsonutils.dumps(dict(root=dict(uuid=root_uuid),
|
||||
swap=dict(uuid=swap_uuid)))
|
||||
return pickle.dumps(dict(root=dict(uuid=root_uuid),
|
||||
swap=dict(uuid=swap_uuid)))
|
||||
else:
|
||||
return (super(FakeSessionForVMTests, self).
|
||||
host_call_plugin(_1, _2, plugin, method, _5))
|
||||
|
@ -38,6 +38,7 @@ A driver for XenServer or Xen Cloud Platform.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import cPickle as pickle
|
||||
import time
|
||||
import urlparse
|
||||
import xmlrpclib
|
||||
@ -710,6 +711,11 @@ class XenAPISession(object):
|
||||
session.xenapi.host.call_plugin,
|
||||
host, plugin, fn, args)
|
||||
|
||||
def call_plugin_serialized(self, plugin, fn, *args, **kwargs):
|
||||
params = {'params': pickle.dumps(dict(args=args, kwargs=kwargs))}
|
||||
rv = self.call_plugin(plugin, fn, params)
|
||||
return pickle.loads(rv)
|
||||
|
||||
def _create_session(self, url):
|
||||
"""Stubout point. This can be replaced with a mock session."""
|
||||
return self.XenAPI.Session(url)
|
||||
|
@ -50,7 +50,7 @@
|
||||
A fake XenAPI SDK.
|
||||
"""
|
||||
|
||||
|
||||
import pickle
|
||||
import random
|
||||
import uuid
|
||||
from xml.sax import saxutils
|
||||
@ -561,12 +561,15 @@ class SessionBase(object):
|
||||
def _plugin_noop(self, method, args):
|
||||
return ''
|
||||
|
||||
_plugin_glance_upload_vhd = _plugin_noop
|
||||
def _plugin_pickle_noop(self, method, args):
|
||||
return pickle.dumps(None)
|
||||
|
||||
_plugin_glance_upload_vhd = _plugin_pickle_noop
|
||||
_plugin_kernel_copy_vdi = _plugin_noop
|
||||
_plugin_kernel_create_kernel_ramdisk = _plugin_noop
|
||||
_plugin_kernel_remove_kernel_ramdisk = _plugin_noop
|
||||
_plugin_migration_move_vhds_into_sr = _plugin_noop
|
||||
_plugin_migration_transfer_vhd = _plugin_noop
|
||||
_plugin_migration_transfer_vhd = _plugin_pickle_noop
|
||||
|
||||
def _plugin_xenhost_host_data(self, method, args):
|
||||
return jsonutils.dumps({'host_memory': {'total': 10,
|
||||
|
@ -22,7 +22,6 @@ their attributes like VDIs, VIFs, as well as their lookup functions.
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import cPickle as pickle
|
||||
import decimal
|
||||
import os
|
||||
import re
|
||||
@ -44,7 +43,6 @@ from nova import flags
|
||||
from nova.image import glance
|
||||
from nova.openstack.common import cfg
|
||||
from nova.openstack.common import excutils
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import log as logging
|
||||
from nova import utils
|
||||
from nova.virt.disk import api as disk
|
||||
@ -467,17 +465,11 @@ def _safe_copy_vdi(session, sr_ref, instance, vdi_to_copy_ref):
|
||||
"""
|
||||
with _dummy_vm(session, instance, vdi_to_copy_ref) as vm_ref:
|
||||
label = "snapshot"
|
||||
|
||||
with snapshot_attached_here(
|
||||
session, instance, vm_ref, label) as vdi_uuids:
|
||||
params = {'sr_path': get_sr_path(session),
|
||||
'vdi_uuids': vdi_uuids,
|
||||
'uuid_stack': _make_uuid_stack()}
|
||||
|
||||
kwargs = {'params': pickle.dumps(params)}
|
||||
result = session.call_plugin(
|
||||
'workarounds', 'safe_copy_vdis', kwargs)
|
||||
imported_vhds = jsonutils.loads(result)
|
||||
imported_vhds = session.call_plugin_serialized(
|
||||
'workarounds', 'safe_copy_vdis', sr_path=get_sr_path(session),
|
||||
vdi_uuids=vdi_uuids, uuid_stack=_make_uuid_stack())
|
||||
|
||||
root_uuid = imported_vhds['root']['uuid']
|
||||
|
||||
@ -674,8 +666,7 @@ def upload_image(context, session, instance, vdi_uuids, image_id):
|
||||
'auth_token': getattr(context, 'auth_token', None),
|
||||
'properties': properties}
|
||||
|
||||
kwargs = {'params': pickle.dumps(params)}
|
||||
session.call_plugin('glance', 'upload_vhd', kwargs)
|
||||
session.call_plugin_serialized('glance', 'upload_vhd', **params)
|
||||
|
||||
|
||||
def resize_disk(session, instance, vdi_ref, instance_type):
|
||||
@ -971,9 +962,9 @@ def _fetch_using_dom0_plugin_with_retry(context, session, image_id,
|
||||
try:
|
||||
if callback:
|
||||
callback(params)
|
||||
kwargs = {'params': pickle.dumps(params)}
|
||||
result = session.call_plugin(plugin_name, 'download_vhd', kwargs)
|
||||
return jsonutils.loads(result)
|
||||
|
||||
return session.call_plugin_serialized(
|
||||
plugin_name, 'download_vhd', **params)
|
||||
except session.XenAPI.Failure as exc:
|
||||
_type, _method, error = exc.details[:3]
|
||||
if error == 'RetryableError':
|
||||
@ -2163,13 +2154,9 @@ def ensure_correct_host(session):
|
||||
|
||||
def move_disks(session, instance, disk_info):
|
||||
"""Move and possibly link VHDs via the XAPI plugin."""
|
||||
params = {'instance_uuid': instance['uuid'],
|
||||
'sr_path': get_sr_path(session),
|
||||
'uuid_stack': _make_uuid_stack()}
|
||||
|
||||
result = session.call_plugin(
|
||||
'migration', 'move_vhds_into_sr', {'params': pickle.dumps(params)})
|
||||
imported_vhds = jsonutils.loads(result)
|
||||
imported_vhds = session.call_plugin_serialized(
|
||||
'migration', 'move_vhds_into_sr', instance_uuid=instance['uuid'],
|
||||
sr_path=get_sr_path(session), uuid_stack=_make_uuid_stack())
|
||||
|
||||
# Now we rescan the SR so we find the VHDs
|
||||
scan_default_sr(session)
|
||||
|
@ -19,7 +19,6 @@
|
||||
Management class for VM-related functions (spawn, reboot, etc).
|
||||
"""
|
||||
|
||||
import cPickle as pickle
|
||||
import functools
|
||||
import itertools
|
||||
import time
|
||||
@ -610,16 +609,10 @@ class VMOps(object):
|
||||
LOG.debug(_("Migrating VHD '%(vdi_uuid)s' with seq_num %(seq_num)d"),
|
||||
locals(), instance=instance)
|
||||
instance_uuid = instance['uuid']
|
||||
params = {'host': dest,
|
||||
'vdi_uuid': vdi_uuid,
|
||||
'instance_uuid': instance_uuid,
|
||||
'sr_path': sr_path,
|
||||
'seq_num': seq_num}
|
||||
|
||||
try:
|
||||
_params = {'params': pickle.dumps(params)}
|
||||
self._session.call_plugin('migration', 'transfer_vhd',
|
||||
_params)
|
||||
self._session.call_plugin_serialized('migration', 'transfer_vhd',
|
||||
instance_uuid=instance_uuid, host=dest, vdi_uuid=vdi_uuid,
|
||||
sr_path=sr_path, seq_num=seq_num)
|
||||
except self._session.XenAPI.Failure:
|
||||
msg = _("Failed to transfer vhd to new host")
|
||||
raise exception.MigrationError(reason=msg)
|
||||
|
@ -20,18 +20,9 @@
|
||||
|
||||
"""Handle the uploading and downloading of images via Glance."""
|
||||
|
||||
import cPickle as pickle
|
||||
import httplib
|
||||
try:
|
||||
import json
|
||||
except ImportError:
|
||||
import simplejson as json
|
||||
import md5
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import urllib2
|
||||
import XenAPIPlugin
|
||||
|
||||
import utils
|
||||
|
||||
@ -202,19 +193,11 @@ def _upload_tarball(staging_path, image_id, glance_host, glance_port,
|
||||
conn.close()
|
||||
|
||||
|
||||
def download_vhd(session, args):
|
||||
def download_vhd(session, image_id, glance_host, glance_port, glance_use_ssl,
|
||||
uuid_stack, sr_path, auth_token):
|
||||
"""Download an image from Glance, unbundle it, and then deposit the VHDs
|
||||
into the storage repository
|
||||
"""
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
image_id = params["image_id"]
|
||||
glance_host = params["glance_host"]
|
||||
glance_port = params["glance_port"]
|
||||
glance_use_ssl = params["glance_use_ssl"]
|
||||
uuid_stack = params["uuid_stack"]
|
||||
sr_path = params["sr_path"]
|
||||
auth_token = params["auth_token"]
|
||||
|
||||
staging_path = utils.make_staging_area(sr_path)
|
||||
try:
|
||||
# Download tarball into staging area and extract it
|
||||
@ -223,28 +206,15 @@ def download_vhd(session, args):
|
||||
glance_use_ssl, auth_token)
|
||||
|
||||
# Move the VHDs from the staging area into the storage repository
|
||||
imported_vhds = utils.import_vhds(sr_path, staging_path, uuid_stack)
|
||||
return utils.import_vhds(sr_path, staging_path, uuid_stack)
|
||||
finally:
|
||||
utils.cleanup_staging_area(staging_path)
|
||||
|
||||
# Right now, it's easier to return a single string via XenAPI,
|
||||
# so we'll json encode the list of VHDs.
|
||||
return json.dumps(imported_vhds)
|
||||
|
||||
|
||||
def upload_vhd(session, args):
|
||||
def upload_vhd(session, vdi_uuids, image_id, glance_host, glance_port,
|
||||
glance_use_ssl, sr_path, auth_token, properties):
|
||||
"""Bundle the VHDs comprising an image and then stream them into Glance.
|
||||
"""
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
vdi_uuids = params["vdi_uuids"]
|
||||
image_id = params["image_id"]
|
||||
glance_host = params["glance_host"]
|
||||
glance_port = params["glance_port"]
|
||||
glance_use_ssl = params["glance_use_ssl"]
|
||||
sr_path = params["sr_path"]
|
||||
auth_token = params["auth_token"]
|
||||
properties = params["properties"]
|
||||
|
||||
staging_path = utils.make_staging_area(sr_path)
|
||||
try:
|
||||
utils.prepare_staging_area(sr_path, staging_path, vdi_uuids)
|
||||
@ -253,9 +223,6 @@ def upload_vhd(session, args):
|
||||
finally:
|
||||
utils.cleanup_staging_area(staging_path)
|
||||
|
||||
return "" # Nothing useful to return on an upload
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
XenAPIPlugin.dispatch({'upload_vhd': upload_vhd,
|
||||
'download_vhd': download_vhd})
|
||||
utils.register_plugin_calls(download_vhd, upload_vhd)
|
||||
|
@ -18,37 +18,18 @@
|
||||
"""
|
||||
XenAPI Plugin for transfering data between host nodes
|
||||
"""
|
||||
|
||||
import cPickle as pickle
|
||||
try:
|
||||
import json
|
||||
except ImportError:
|
||||
import simplejson as json
|
||||
import os
|
||||
import os.path
|
||||
import shlex
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
import XenAPIPlugin
|
||||
|
||||
import utils
|
||||
|
||||
from pluginlib_nova import *
|
||||
configure_logging('migration')
|
||||
|
||||
|
||||
def move_vhds_into_sr(session, args):
|
||||
def move_vhds_into_sr(session, instance_uuid, sr_path, uuid_stack):
|
||||
"""Moves the VHDs from their copied location to the SR"""
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
instance_uuid = params['instance_uuid']
|
||||
sr_path = params['sr_path']
|
||||
uuid_stack = params['uuid_stack']
|
||||
|
||||
staging_path = "/images/instance%s" % instance_uuid
|
||||
imported_vhds = utils.import_vhds(sr_path, staging_path, uuid_stack)
|
||||
utils.cleanup_staging_area(staging_path)
|
||||
return json.dumps(imported_vhds)
|
||||
return imported_vhds
|
||||
|
||||
|
||||
def _rsync_vhds(instance_uuid, host, staging_path, user="root"):
|
||||
@ -65,15 +46,8 @@ def _rsync_vhds(instance_uuid, host, staging_path, user="root"):
|
||||
utils.finish_subprocess(rsync_proc, rsync_cmd)
|
||||
|
||||
|
||||
def transfer_vhd(session, args):
|
||||
def transfer_vhd(session, instance_uuid, host, vdi_uuid, sr_path, seq_num):
|
||||
"""Rsyncs a VHD to an adjacent host"""
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
instance_uuid = params['instance_uuid']
|
||||
host = params['host']
|
||||
vdi_uuid = params['vdi_uuid']
|
||||
sr_path = params['sr_path']
|
||||
seq_num = params['seq_num']
|
||||
|
||||
staging_path = utils.make_staging_area(sr_path)
|
||||
try:
|
||||
utils.prepare_staging_area(
|
||||
@ -82,9 +56,6 @@ def transfer_vhd(session, args):
|
||||
finally:
|
||||
utils.cleanup_staging_area(staging_path)
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
XenAPIPlugin.dispatch({'transfer_vhd': transfer_vhd,
|
||||
'move_vhds_into_sr': move_vhds_into_sr})
|
||||
utils.register_plugin_calls(move_vhds_into_sr, transfer_vhd)
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
"""Various utilities used by XenServer plugins."""
|
||||
|
||||
import cPickle as pickle
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
@ -21,6 +22,7 @@ import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
import XenAPIPlugin
|
||||
|
||||
CHUNK_SIZE = 8192
|
||||
|
||||
@ -362,3 +364,21 @@ def extract_tarball(fileobj, path, callback=None):
|
||||
tar_proc.stdin.write(chunk)
|
||||
|
||||
finish_subprocess(tar_proc, tar_cmd)
|
||||
|
||||
|
||||
def _handle_serialization(func):
|
||||
def wrapped(session, params):
|
||||
params = pickle.loads(params['params'])
|
||||
rv = func(session, *params['args'], **params['kwargs'])
|
||||
return pickle.dumps(rv)
|
||||
return wrapped
|
||||
|
||||
|
||||
def register_plugin_calls(*funcs):
|
||||
"""Wrapper around XenAPIPlugin.dispatch which handles pickle
|
||||
serialization.
|
||||
"""
|
||||
wrapped_dict = {}
|
||||
for func in funcs:
|
||||
wrapped_dict[func.__name__] = _handle_serialization(func)
|
||||
XenAPIPlugin.dispatch(wrapped_dict)
|
||||
|
@ -17,21 +17,14 @@
|
||||
|
||||
"""Handle the uploading and downloading of images via Glance."""
|
||||
|
||||
import cPickle as pickle
|
||||
try:
|
||||
import json
|
||||
except ImportError:
|
||||
import simplejson as json
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import XenAPIPlugin
|
||||
|
||||
import utils
|
||||
|
||||
#FIXME(sirp): should this use pluginlib from 5.6?
|
||||
from pluginlib_nova import *
|
||||
configure_logging('hacks')
|
||||
configure_logging('workarounds')
|
||||
|
||||
|
||||
def _copy_vdis(sr_path, staging_path, vdi_uuids):
|
||||
@ -43,23 +36,14 @@ def _copy_vdis(sr_path, staging_path, vdi_uuids):
|
||||
seq_num += 1
|
||||
|
||||
|
||||
def safe_copy_vdis(session, args):
|
||||
params = pickle.loads(exists(args, 'params'))
|
||||
sr_path = params["sr_path"]
|
||||
vdi_uuids = params["vdi_uuids"]
|
||||
uuid_stack = params["uuid_stack"]
|
||||
|
||||
def safe_copy_vdis(session, sr_path, vdi_uuids, uuid_stack):
|
||||
staging_path = utils.make_staging_area(sr_path)
|
||||
try:
|
||||
_copy_vdis(sr_path, staging_path, vdi_uuids)
|
||||
imported_vhds = utils.import_vhds(sr_path, staging_path, uuid_stack)
|
||||
return utils.import_vhds(sr_path, staging_path, uuid_stack)
|
||||
finally:
|
||||
utils.cleanup_staging_area(staging_path)
|
||||
|
||||
# Right now, it's easier to return a single string via XenAPI,
|
||||
# so we'll json encode the list of VHDs.
|
||||
return json.dumps(imported_vhds)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
XenAPIPlugin.dispatch({'safe_copy_vdis': safe_copy_vdis})
|
||||
utils.register_plugin_calls(safe_copy_vdis)
|
||||
|
Loading…
x
Reference in New Issue
Block a user