Merge "Refactor the image transfer"
This commit is contained in:
commit
d4fa571c36
@ -17,16 +17,14 @@
|
||||
Functions and classes for image transfer between ESX/VC & image service.
|
||||
"""
|
||||
|
||||
import errno
|
||||
import logging
|
||||
import tarfile
|
||||
|
||||
from eventlet import event
|
||||
from eventlet import greenthread
|
||||
from eventlet import queue
|
||||
from eventlet import timeout
|
||||
|
||||
from oslo_utils import units
|
||||
from oslo_vmware._i18n import _
|
||||
from oslo_vmware.common import loopingcall
|
||||
from oslo_vmware import constants
|
||||
from oslo_vmware import exceptions
|
||||
from oslo_vmware import image_util
|
||||
@ -37,363 +35,40 @@ from oslo_vmware import vim_util
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
IMAGE_SERVICE_POLL_INTERVAL = 5
|
||||
FILE_READ_WRITE_TASK_SLEEP_TIME = 0.01
|
||||
BLOCKING_QUEUE_SIZE = 10
|
||||
NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec.
|
||||
CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer
|
||||
|
||||
|
||||
class BlockingQueue(queue.LightQueue):
|
||||
"""Producer-Consumer queue to share data between reader/writer threads."""
|
||||
|
||||
def __init__(self, max_size, max_transfer_size):
|
||||
"""Initializes the queue with the given parameters.
|
||||
|
||||
:param max_size: maximum queue size; if max_size is less than zero or
|
||||
None, the queue size is infinite.
|
||||
:param max_transfer_size: maximum amount of data that can be
|
||||
_transferred using this queue
|
||||
"""
|
||||
queue.LightQueue.__init__(self, max_size)
|
||||
self._max_transfer_size = max_transfer_size
|
||||
self._transferred = 0
|
||||
|
||||
def read(self, chunk_size):
|
||||
"""Read data from the queue.
|
||||
|
||||
This method blocks until data is available. The input chunk size is
|
||||
ignored since we have ensured that the data chunks written to the pipe
|
||||
by the image reader thread is the same as the chunks asked for by the
|
||||
image writer thread.
|
||||
"""
|
||||
if (self._max_transfer_size is 0 or
|
||||
self._transferred < self._max_transfer_size):
|
||||
data_item = self.get()
|
||||
self._transferred += len(data_item)
|
||||
return data_item
|
||||
else:
|
||||
LOG.debug("Completed transfer of size %s.", self._transferred)
|
||||
return ""
|
||||
|
||||
def write(self, data):
|
||||
"""Write data into the queue.
|
||||
|
||||
:param data: data to be written
|
||||
"""
|
||||
self.put(data)
|
||||
|
||||
# Below methods are provided in order to enable treating the queue
|
||||
# as a file handle.
|
||||
|
||||
def seek(self, offset, whence=0):
|
||||
"""Set the file's current position at the offset.
|
||||
|
||||
This method throws IOError since seek cannot be supported for a pipe.
|
||||
"""
|
||||
raise IOError(errno.ESPIPE, "Illegal seek")
|
||||
|
||||
def tell(self):
|
||||
"""Get the current file position."""
|
||||
return self._transferred
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return "blocking queue"
|
||||
|
||||
|
||||
class ImageWriter(object):
|
||||
"""Class to write the image to the image service from an input file."""
|
||||
|
||||
def __init__(self, context, input_file, image_service, image_id,
|
||||
image_meta=None):
|
||||
"""Initializes the image writer instance with given parameters.
|
||||
|
||||
:param context: write context needed by the image service
|
||||
:param input_file: file to read the image data from
|
||||
:param image_service: handle to image service
|
||||
:param image_id: ID of the image in the image service
|
||||
:param image_meta: image meta-data
|
||||
"""
|
||||
if not image_meta:
|
||||
image_meta = {}
|
||||
|
||||
self._context = context
|
||||
self._input_file = input_file
|
||||
self._image_service = image_service
|
||||
self._image_id = image_id
|
||||
self._image_meta = image_meta
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
"""Start the image write task.
|
||||
|
||||
:returns: the event indicating the status of the write task
|
||||
"""
|
||||
self._done = event.Event()
|
||||
|
||||
def _inner():
|
||||
"""Task performing the image write operation.
|
||||
|
||||
This method performs image data transfer through an update call.
|
||||
After the update, it waits until the image state becomes
|
||||
'active', 'killed' or unknown. If the final state is not 'active'
|
||||
an instance of ImageTransferException is thrown.
|
||||
|
||||
:raises: ImageTransferException
|
||||
"""
|
||||
LOG.debug("Calling image service update on image: %(image)s "
|
||||
"with meta: %(meta)s",
|
||||
{'image': self._image_id,
|
||||
'meta': self._image_meta})
|
||||
|
||||
try:
|
||||
self._image_service.update(self._context,
|
||||
self._image_id,
|
||||
self._image_meta,
|
||||
data=self._input_file)
|
||||
self._running = True
|
||||
while self._running:
|
||||
LOG.debug("Retrieving status of image: %s.",
|
||||
self._image_id)
|
||||
image_meta = self._image_service.show(self._context,
|
||||
self._image_id)
|
||||
image_status = image_meta.get('status')
|
||||
if image_status == 'active':
|
||||
self.stop()
|
||||
LOG.debug("Image: %s is now active.",
|
||||
self._image_id)
|
||||
self._done.send(True)
|
||||
elif image_status == 'killed':
|
||||
self.stop()
|
||||
excep_msg = (_("Image: %s is in killed state.") %
|
||||
self._image_id)
|
||||
LOG.error(excep_msg)
|
||||
excep = exceptions.ImageTransferException(excep_msg)
|
||||
self._done.send_exception(excep)
|
||||
elif image_status in ['saving', 'queued']:
|
||||
LOG.debug("Image: %(image)s is in %(state)s state; "
|
||||
"sleeping for %(sleep)d seconds.",
|
||||
{'image': self._image_id,
|
||||
'state': image_status,
|
||||
'sleep': IMAGE_SERVICE_POLL_INTERVAL})
|
||||
greenthread.sleep(IMAGE_SERVICE_POLL_INTERVAL)
|
||||
else:
|
||||
self.stop()
|
||||
excep_msg = (_("Image: %(image)s is in unknown "
|
||||
"state: %(state)s.") %
|
||||
{'image': self._image_id,
|
||||
'state': image_status})
|
||||
LOG.error(excep_msg)
|
||||
excep = exceptions.ImageTransferException(excep_msg)
|
||||
self._done.send_exception(excep)
|
||||
except Exception as excep:
|
||||
self.stop()
|
||||
excep_msg = (_("Error occurred while writing image: %s") %
|
||||
self._image_id)
|
||||
LOG.exception(excep_msg)
|
||||
excep = exceptions.ImageTransferException(excep_msg, excep)
|
||||
self._done.send_exception(excep)
|
||||
|
||||
LOG.debug("Starting image write task for image: %(image)s with"
|
||||
" source: %(source)s.",
|
||||
{'source': self._input_file,
|
||||
'image': self._image_id})
|
||||
greenthread.spawn(_inner)
|
||||
return self._done
|
||||
|
||||
def stop(self):
|
||||
"""Stop the image writing task."""
|
||||
LOG.debug("Stopping the writing task for image: %s.",
|
||||
self._image_id)
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
"""Wait for the image writer task to complete.
|
||||
|
||||
This method returns True if the writer thread completes successfully.
|
||||
In case of error, it raises ImageTransferException.
|
||||
|
||||
:raises ImageTransferException
|
||||
"""
|
||||
return self._done.wait()
|
||||
|
||||
def close(self):
|
||||
"""This is a NOP."""
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
string = "Image Writer <source = %s, dest = %s>" % (self._input_file,
|
||||
self._image_id)
|
||||
return string
|
||||
|
||||
|
||||
class FileReadWriteTask(object):
|
||||
"""Task which reads data from the input file and writes to the output file.
|
||||
|
||||
This class defines the task which copies the given input file to the given
|
||||
output file. The copy operation involves reading chunks of data from the
|
||||
input file and writing the same to the output file.
|
||||
"""
|
||||
|
||||
def __init__(self, input_file, output_file):
|
||||
"""Initializes the read-write task with the given input parameters.
|
||||
|
||||
:param input_file: the input file handle
|
||||
:param output_file: the output file handle
|
||||
"""
|
||||
self._input_file = input_file
|
||||
self._output_file = output_file
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
"""Start the file read - file write task.
|
||||
|
||||
:returns: the event indicating the status of the read-write task
|
||||
"""
|
||||
self._done = event.Event()
|
||||
|
||||
def _inner():
|
||||
"""Task performing the file read-write operation."""
|
||||
self._running = True
|
||||
while self._running:
|
||||
try:
|
||||
data = self._input_file.read(rw_handles.READ_CHUNKSIZE)
|
||||
if not data:
|
||||
LOG.debug("File read-write task is done.")
|
||||
self.stop()
|
||||
self._done.send(True)
|
||||
self._output_file.write(data)
|
||||
|
||||
# update lease progress if applicable
|
||||
if hasattr(self._input_file, "update_progress"):
|
||||
self._input_file.update_progress()
|
||||
if hasattr(self._output_file, "update_progress"):
|
||||
self._output_file.update_progress()
|
||||
|
||||
greenthread.sleep(FILE_READ_WRITE_TASK_SLEEP_TIME)
|
||||
except Exception as excep:
|
||||
self.stop()
|
||||
excep_msg = _("Error occurred during file read-write "
|
||||
"task.")
|
||||
LOG.exception(excep_msg)
|
||||
excep = exceptions.ImageTransferException(excep_msg, excep)
|
||||
self._done.send_exception(excep)
|
||||
|
||||
LOG.debug("Starting file read-write task with source: %(source)s "
|
||||
"and destination: %(dest)s.",
|
||||
{'source': self._input_file,
|
||||
'dest': self._output_file})
|
||||
greenthread.spawn(_inner)
|
||||
return self._done
|
||||
|
||||
def stop(self):
|
||||
"""Stop the read-write task."""
|
||||
LOG.debug("Stopping the file read-write task.")
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
"""Wait for the file read-write task to complete.
|
||||
|
||||
This method returns True if the read-write thread completes
|
||||
successfully. In case of error, it raises ImageTransferException.
|
||||
|
||||
:raises: ImageTransferException
|
||||
"""
|
||||
return self._done.wait()
|
||||
|
||||
def __str__(self):
|
||||
string = ("File Read-Write Task <source = %s, dest = %s>" %
|
||||
(self._input_file, self._output_file))
|
||||
return string
|
||||
|
||||
|
||||
# Functions to perform image transfer between VMware servers and image service.
|
||||
|
||||
|
||||
def _start_transfer(context, timeout_secs, read_file_handle, max_data_size,
|
||||
write_file_handle=None, image_service=None, image_id=None,
|
||||
image_meta=None):
|
||||
"""Start the image transfer.
|
||||
|
||||
The image reader reads the data from the image source and writes to the
|
||||
blocking queue. The image source is always a file handle (VmdkReadHandle
|
||||
or ImageReadHandle); therefore, a FileReadWriteTask is created for this
|
||||
transfer. The image writer reads the data from the blocking queue and
|
||||
writes it to the image destination. The image destination is either a
|
||||
file or VMDK in VMware datastore or an image in the image service.
|
||||
|
||||
If the destination is a file or VMDK in VMware datastore, the method
|
||||
creates a FileReadWriteTask which reads from the blocking queue and
|
||||
writes to either FileWriteHandle or VmdkWriteHandle. In the case of
|
||||
image service as the destination, an instance of ImageWriter task is
|
||||
created which reads from the blocking queue and writes to the image
|
||||
service.
|
||||
|
||||
:param context: write context needed for the image service
|
||||
:param timeout_secs: time in seconds to wait for the transfer to complete
|
||||
:param read_file_handle: handle to read data from
|
||||
:param max_data_size: maximum transfer size
|
||||
:param write_file_handle: handle to write data to; if this is None, then
|
||||
param image_service and param image_id should
|
||||
be set.
|
||||
:param image_service: image service handle
|
||||
:param image_id: ID of the image in the image service
|
||||
:param image_meta: image meta-data
|
||||
:raises: ImageTransferException, ValueError
|
||||
"""
|
||||
|
||||
# Create the blocking queue
|
||||
blocking_queue = BlockingQueue(BLOCKING_QUEUE_SIZE, max_data_size)
|
||||
|
||||
# Create the image reader
|
||||
reader = FileReadWriteTask(read_file_handle, blocking_queue)
|
||||
|
||||
# Create the image writer
|
||||
if write_file_handle:
|
||||
# File or VMDK in VMware datastore is the image destination
|
||||
writer = FileReadWriteTask(blocking_queue, write_file_handle)
|
||||
elif image_service and image_id:
|
||||
# Image service image is the destination
|
||||
writer = ImageWriter(context,
|
||||
blocking_queue,
|
||||
image_service,
|
||||
image_id,
|
||||
image_meta)
|
||||
else:
|
||||
excep_msg = _("No image destination given.")
|
||||
LOG.error(excep_msg)
|
||||
raise ValueError(excep_msg)
|
||||
|
||||
# Start the reader and writer
|
||||
LOG.debug("Starting image transfer with reader: %(reader)s and writer: "
|
||||
"%(writer)s",
|
||||
{'reader': reader,
|
||||
'writer': writer})
|
||||
reader.start()
|
||||
writer.start()
|
||||
def _start_transfer(read_handle, write_handle, timeout_secs):
|
||||
# write_handle could be an NFC lease, so we need to periodically
|
||||
# update its progress
|
||||
update_cb = getattr(write_handle, 'update_progress', lambda: None)
|
||||
updater = loopingcall.FixedIntervalLoopingCall(update_cb)
|
||||
timer = timeout.Timeout(timeout_secs)
|
||||
try:
|
||||
# Wait for the reader and writer to complete
|
||||
reader.wait()
|
||||
writer.wait()
|
||||
except (timeout.Timeout, exceptions.ImageTransferException) as excep:
|
||||
excep_msg = (_("Error occurred during image transfer with reader: "
|
||||
"%(reader)s and writer: %(writer)s") %
|
||||
{'reader': reader,
|
||||
'writer': writer})
|
||||
LOG.exception(excep_msg)
|
||||
reader.stop()
|
||||
writer.stop()
|
||||
|
||||
if isinstance(excep, exceptions.ImageTransferException):
|
||||
raise
|
||||
raise exceptions.ImageTransferException(excep_msg, excep)
|
||||
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||
while True:
|
||||
data = read_handle.read(CHUNK_SIZE)
|
||||
if not data:
|
||||
break
|
||||
write_handle.write(data)
|
||||
except timeout.Timeout as excep:
|
||||
msg = (_('Timeout, read_handle: "%(src)s", write_handle: "%(dest)s"') %
|
||||
{'src': read_handle,
|
||||
'dest': write_handle})
|
||||
LOG.exception(msg)
|
||||
raise exceptions.ImageTransferException(msg, excep)
|
||||
except Exception as excep:
|
||||
msg = (_('Error, read_handle: "%(src)s", write_handle: "%(dest)s"') %
|
||||
{'src': read_handle,
|
||||
'dest': write_handle})
|
||||
LOG.exception(msg)
|
||||
raise exceptions.ImageTransferException(msg, excep)
|
||||
finally:
|
||||
timer.cancel()
|
||||
read_file_handle.close()
|
||||
if write_file_handle:
|
||||
write_file_handle.close()
|
||||
updater.stop()
|
||||
read_handle.close()
|
||||
write_handle.close()
|
||||
|
||||
|
||||
def download_image(image, image_meta, session, datastore, rel_path,
|
||||
@ -427,8 +102,7 @@ def download_image(image, image_meta, session, datastore, rel_path,
|
||||
conn.write = conn.send
|
||||
|
||||
read_handle = rw_handles.ImageReadHandle(image)
|
||||
_start_transfer(None, timeout_secs, read_handle, image_size,
|
||||
write_file_handle=conn)
|
||||
_start_transfer(read_handle, conn, timeout_secs)
|
||||
|
||||
|
||||
def download_flat_image(context, timeout_secs, image_service, image_id,
|
||||
@ -458,11 +132,7 @@ def download_flat_image(context, timeout_secs, image_service, image_id,
|
||||
kwargs.get('file_path'),
|
||||
file_size,
|
||||
cacerts=kwargs.get('cacerts'))
|
||||
_start_transfer(context,
|
||||
timeout_secs,
|
||||
read_handle,
|
||||
file_size,
|
||||
write_file_handle=write_handle)
|
||||
_start_transfer(read_handle, write_handle, timeout_secs)
|
||||
LOG.debug("Downloaded image: %s from image service as a flat file.",
|
||||
image_id)
|
||||
|
||||
@ -490,11 +160,7 @@ def download_stream_optimized_data(context, timeout_secs, read_handle,
|
||||
kwargs.get('vm_folder'),
|
||||
kwargs.get('vm_import_spec'),
|
||||
file_size)
|
||||
_start_transfer(context,
|
||||
timeout_secs,
|
||||
read_handle,
|
||||
file_size,
|
||||
write_file_handle=write_handle)
|
||||
_start_transfer(read_handle, write_handle, timeout_secs)
|
||||
return write_handle.get_imported_vm()
|
||||
|
||||
|
||||
@ -578,8 +244,7 @@ def copy_stream_optimized_disk(
|
||||
kwargs.get('vm'),
|
||||
kwargs.get('vmdk_file_path'),
|
||||
file_size)
|
||||
_start_transfer(context, timeout_secs, read_handle, file_size,
|
||||
write_file_handle=write_handle)
|
||||
_start_transfer(read_handle, write_handle, timeout_secs)
|
||||
LOG.debug("Downloaded virtual disk: %s.", vmdk_file_path)
|
||||
|
||||
|
||||
@ -623,14 +288,12 @@ def upload_image(context, timeout_secs, image_service, image_id, owner_id,
|
||||
kwargs.get('image_version'),
|
||||
'vmware_disktype': 'streamOptimized',
|
||||
'owner_id': owner_id}}
|
||||
|
||||
# Passing 0 as the file size since data size to be transferred cannot be
|
||||
# predetermined.
|
||||
_start_transfer(context,
|
||||
timeout_secs,
|
||||
read_handle,
|
||||
0,
|
||||
image_service=image_service,
|
||||
image_id=image_id,
|
||||
image_meta=image_metadata)
|
||||
updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress)
|
||||
try:
|
||||
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||
image_service.update(context, image_id, image_metadata,
|
||||
data=read_handle)
|
||||
finally:
|
||||
updater.stop()
|
||||
read_handle.close()
|
||||
LOG.debug("Uploaded image: %s.", image_id)
|
||||
|
@ -17,280 +17,23 @@
|
||||
Unit tests for functions and classes for image transfer.
|
||||
"""
|
||||
|
||||
import math
|
||||
|
||||
from eventlet import greenthread
|
||||
from eventlet import timeout
|
||||
import mock
|
||||
import six
|
||||
|
||||
from oslo_vmware import exceptions
|
||||
from oslo_vmware import image_transfer
|
||||
from oslo_vmware import rw_handles
|
||||
from oslo_vmware.tests import base
|
||||
|
||||
|
||||
class BlockingQueueTest(base.TestCase):
|
||||
"""Tests for BlockingQueue."""
|
||||
|
||||
def test_read(self):
|
||||
max_size = 10
|
||||
chunk_size = 10
|
||||
max_transfer_size = 30
|
||||
queue = image_transfer.BlockingQueue(max_size, max_transfer_size)
|
||||
|
||||
def get_side_effect():
|
||||
return [1] * chunk_size
|
||||
|
||||
queue.get = mock.Mock(side_effect=get_side_effect)
|
||||
while True:
|
||||
data_item = queue.read(chunk_size)
|
||||
if not data_item:
|
||||
break
|
||||
|
||||
self.assertEqual(max_transfer_size, queue._transferred)
|
||||
exp_calls = [mock.call()] * int(math.ceil(float(max_transfer_size) /
|
||||
chunk_size))
|
||||
self.assertEqual(exp_calls, queue.get.call_args_list)
|
||||
|
||||
def test_write(self):
|
||||
queue = image_transfer.BlockingQueue(10, 30)
|
||||
queue.put = mock.Mock()
|
||||
write_count = 10
|
||||
for _ in range(0, write_count):
|
||||
queue.write([1])
|
||||
exp_calls = [mock.call([1])] * write_count
|
||||
self.assertEqual(exp_calls, queue.put.call_args_list)
|
||||
|
||||
def test_seek(self):
|
||||
queue = image_transfer.BlockingQueue(10, 30)
|
||||
self.assertRaises(IOError, queue.seek, 5)
|
||||
|
||||
def test_tell(self):
|
||||
queue = image_transfer.BlockingQueue(10, 30)
|
||||
self.assertEqual(0, queue.tell())
|
||||
queue.get = mock.Mock(return_value=[1] * 10)
|
||||
queue.read(10)
|
||||
self.assertEqual(10, queue.tell())
|
||||
|
||||
|
||||
class ImageWriterTest(base.TestCase):
|
||||
"""Tests for ImageWriter class."""
|
||||
|
||||
def _create_image_writer(self):
|
||||
self.image_service = mock.Mock()
|
||||
self.context = mock.Mock()
|
||||
self.input_file = mock.Mock()
|
||||
self.image_id = mock.Mock()
|
||||
return image_transfer.ImageWriter(self.context, self.input_file,
|
||||
self.image_service, self.image_id)
|
||||
|
||||
@mock.patch.object(greenthread, 'sleep')
|
||||
def test_start(self, mock_sleep):
|
||||
writer = self._create_image_writer()
|
||||
status_list = ['queued', 'saving', 'active']
|
||||
|
||||
def image_service_show_side_effect(context, image_id):
|
||||
status = status_list.pop(0)
|
||||
return {'status': status}
|
||||
|
||||
self.image_service.show.side_effect = image_service_show_side_effect
|
||||
exp_calls = [mock.call(self.context, self.image_id)] * len(status_list)
|
||||
writer.start()
|
||||
self.assertTrue(writer.wait())
|
||||
self.image_service.update.assert_called_once_with(self.context,
|
||||
self.image_id, {},
|
||||
data=self.input_file)
|
||||
self.assertEqual(exp_calls, self.image_service.show.call_args_list)
|
||||
|
||||
def test_start_with_killed_status(self):
|
||||
writer = self._create_image_writer()
|
||||
|
||||
def image_service_show_side_effect(_context, _image_id):
|
||||
return {'status': 'killed'}
|
||||
|
||||
self.image_service.show.side_effect = image_service_show_side_effect
|
||||
writer.start()
|
||||
self.assertRaises(exceptions.ImageTransferException,
|
||||
writer.wait)
|
||||
self.image_service.update.assert_called_once_with(self.context,
|
||||
self.image_id, {},
|
||||
data=self.input_file)
|
||||
self.image_service.show.assert_called_once_with(self.context,
|
||||
self.image_id)
|
||||
|
||||
def test_start_with_unknown_status(self):
|
||||
writer = self._create_image_writer()
|
||||
|
||||
def image_service_show_side_effect(_context, _image_id):
|
||||
return {'status': 'unknown'}
|
||||
|
||||
self.image_service.show.side_effect = image_service_show_side_effect
|
||||
writer.start()
|
||||
self.assertRaises(exceptions.ImageTransferException,
|
||||
writer.wait)
|
||||
self.image_service.update.assert_called_once_with(self.context,
|
||||
self.image_id, {},
|
||||
data=self.input_file)
|
||||
self.image_service.show.assert_called_once_with(self.context,
|
||||
self.image_id)
|
||||
|
||||
def test_start_with_image_service_show_exception(self):
|
||||
writer = self._create_image_writer()
|
||||
self.image_service.show.side_effect = RuntimeError()
|
||||
writer.start()
|
||||
self.assertRaises(exceptions.ImageTransferException, writer.wait)
|
||||
self.image_service.update.assert_called_once_with(self.context,
|
||||
self.image_id, {},
|
||||
data=self.input_file)
|
||||
self.image_service.show.assert_called_once_with(self.context,
|
||||
self.image_id)
|
||||
|
||||
|
||||
class FileReadWriteTaskTest(base.TestCase):
|
||||
"""Tests for FileReadWriteTask class."""
|
||||
|
||||
def test_start(self):
|
||||
data_items = [[1] * 10, [1] * 20, [1] * 5, []]
|
||||
|
||||
def input_file_read_side_effect(arg):
|
||||
self.assertEqual(arg, rw_handles.READ_CHUNKSIZE)
|
||||
data = data_items[input_file_read_side_effect.i]
|
||||
input_file_read_side_effect.i += 1
|
||||
return data
|
||||
|
||||
input_file_read_side_effect.i = 0
|
||||
input_file = mock.Mock()
|
||||
input_file.read.side_effect = input_file_read_side_effect
|
||||
output_file = mock.Mock()
|
||||
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
|
||||
rw_task.start()
|
||||
self.assertTrue(rw_task.wait())
|
||||
self.assertEqual(len(data_items), input_file.read.call_count)
|
||||
|
||||
exp_calls = []
|
||||
for i in range(0, len(data_items)):
|
||||
exp_calls.append(mock.call(data_items[i]))
|
||||
self.assertEqual(exp_calls, output_file.write.call_args_list)
|
||||
|
||||
self.assertEqual(len(data_items),
|
||||
input_file.update_progress.call_count)
|
||||
self.assertEqual(len(data_items),
|
||||
output_file.update_progress.call_count)
|
||||
|
||||
def test_start_with_read_exception(self):
|
||||
input_file = mock.Mock()
|
||||
input_file.read.side_effect = RuntimeError()
|
||||
output_file = mock.Mock()
|
||||
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
|
||||
rw_task.start()
|
||||
self.assertRaises(exceptions.ImageTransferException, rw_task.wait)
|
||||
input_file.read.assert_called_once_with(rw_handles.READ_CHUNKSIZE)
|
||||
|
||||
|
||||
class ImageTransferUtilityTest(base.TestCase):
|
||||
"""Tests for image_transfer utility methods."""
|
||||
|
||||
@mock.patch.object(timeout, 'Timeout')
|
||||
@mock.patch.object(image_transfer, 'ImageWriter')
|
||||
@mock.patch.object(image_transfer, 'FileReadWriteTask')
|
||||
@mock.patch.object(image_transfer, 'BlockingQueue')
|
||||
def test_start_transfer(self, fake_BlockingQueue, fake_FileReadWriteTask,
|
||||
fake_ImageWriter, fake_Timeout):
|
||||
|
||||
context = mock.Mock()
|
||||
read_file_handle = mock.Mock()
|
||||
read_file_handle.close = mock.Mock()
|
||||
image_service = mock.Mock()
|
||||
image_id = mock.Mock()
|
||||
blocking_queue = mock.Mock()
|
||||
|
||||
write_file_handle1 = mock.Mock()
|
||||
write_file_handle1.close = mock.Mock()
|
||||
write_file_handle2 = None
|
||||
write_file_handles = [write_file_handle1, write_file_handle2]
|
||||
|
||||
timeout_secs = 10
|
||||
blocking_queue_size = 10
|
||||
image_meta = {}
|
||||
max_data_size = 30
|
||||
|
||||
fake_BlockingQueue.return_value = blocking_queue
|
||||
fake_timer = mock.Mock()
|
||||
fake_timer.cancel = mock.Mock()
|
||||
fake_Timeout.return_value = fake_timer
|
||||
|
||||
for write_file_handle in write_file_handles:
|
||||
image_transfer._start_transfer(context,
|
||||
timeout_secs,
|
||||
read_file_handle,
|
||||
max_data_size,
|
||||
write_file_handle=write_file_handle,
|
||||
image_service=image_service,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta)
|
||||
|
||||
exp_calls = [mock.call(blocking_queue_size,
|
||||
max_data_size)] * len(write_file_handles)
|
||||
self.assertEqual(exp_calls,
|
||||
fake_BlockingQueue.call_args_list)
|
||||
|
||||
exp_calls2 = [mock.call(read_file_handle, blocking_queue),
|
||||
mock.call(blocking_queue, write_file_handle1),
|
||||
mock.call(read_file_handle, blocking_queue)]
|
||||
self.assertEqual(exp_calls2,
|
||||
fake_FileReadWriteTask.call_args_list)
|
||||
|
||||
exp_calls3 = mock.call(context, blocking_queue, image_service,
|
||||
image_id, image_meta)
|
||||
self.assertEqual(exp_calls3,
|
||||
fake_ImageWriter.call_args)
|
||||
|
||||
exp_calls4 = [mock.call(timeout_secs)] * len(write_file_handles)
|
||||
self.assertEqual(exp_calls4,
|
||||
fake_Timeout.call_args_list)
|
||||
|
||||
self.assertEqual(len(write_file_handles),
|
||||
fake_timer.cancel.call_count)
|
||||
|
||||
self.assertEqual(len(write_file_handles),
|
||||
read_file_handle.close.call_count)
|
||||
|
||||
write_file_handle1.close.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(image_transfer, 'FileReadWriteTask')
|
||||
@mock.patch.object(image_transfer, 'BlockingQueue')
|
||||
def test_start_transfer_with_no_image_destination(self, fake_BlockingQueue,
|
||||
fake_FileReadWriteTask):
|
||||
|
||||
context = mock.Mock()
|
||||
read_file_handle = mock.Mock()
|
||||
write_file_handle = None
|
||||
image_service = None
|
||||
image_id = None
|
||||
timeout_secs = 10
|
||||
image_meta = {}
|
||||
blocking_queue_size = 10
|
||||
max_data_size = 30
|
||||
blocking_queue = mock.Mock()
|
||||
|
||||
fake_BlockingQueue.return_value = blocking_queue
|
||||
|
||||
self.assertRaises(ValueError,
|
||||
image_transfer._start_transfer,
|
||||
context,
|
||||
timeout_secs,
|
||||
read_file_handle,
|
||||
max_data_size,
|
||||
write_file_handle=write_file_handle,
|
||||
image_service=image_service,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta)
|
||||
|
||||
fake_BlockingQueue.assert_called_once_with(blocking_queue_size,
|
||||
max_data_size)
|
||||
|
||||
fake_FileReadWriteTask.assert_called_once_with(read_file_handle,
|
||||
blocking_queue)
|
||||
def test_start_transfer(self):
|
||||
data = 'image-data-here'
|
||||
read_handle = six.StringIO(data)
|
||||
write_handle = mock.Mock()
|
||||
image_transfer._start_transfer(read_handle, write_handle, None)
|
||||
write_handle.write.assert_called_once_with(data)
|
||||
|
||||
@mock.patch('oslo_vmware.rw_handles.FileWriteHandle')
|
||||
@mock.patch('oslo_vmware.rw_handles.ImageReadHandle')
|
||||
@ -349,11 +92,9 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||
cacerts=None)
|
||||
|
||||
fake_transfer.assert_called_once_with(
|
||||
context,
|
||||
timeout_secs,
|
||||
fake_ImageReadHandle,
|
||||
image_size,
|
||||
write_file_handle=fake_FileWriteHandle)
|
||||
fake_FileWriteHandle,
|
||||
timeout_secs)
|
||||
|
||||
@mock.patch('oslo_vmware.rw_handles.VmdkWriteHandle')
|
||||
@mock.patch.object(image_transfer, '_start_transfer')
|
||||
@ -396,12 +137,9 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||
vm_import_spec,
|
||||
image_size)
|
||||
|
||||
fake_transfer.assert_called_once_with(
|
||||
context,
|
||||
timeout_secs,
|
||||
read_handle,
|
||||
image_size,
|
||||
write_file_handle=fake_VmdkWriteHandle)
|
||||
fake_transfer.assert_called_once_with(read_handle,
|
||||
fake_VmdkWriteHandle,
|
||||
timeout_secs)
|
||||
|
||||
fake_VmdkWriteHandle.get_imported_vm.assert_called_once_with()
|
||||
|
||||
@ -573,9 +311,8 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||
|
||||
vmdk_read_handle.assert_called_once_with(
|
||||
session, host, port, vm, vmdk_file_path, vmdk_size)
|
||||
start_transfer.assert_called_once_with(
|
||||
context, timeout, read_handle, vmdk_size,
|
||||
write_file_handle=write_handle)
|
||||
start_transfer.assert_called_once_with(read_handle, write_handle,
|
||||
timeout)
|
||||
|
||||
@mock.patch('oslo_vmware.rw_handles.VmdkReadHandle')
|
||||
@mock.patch.object(image_transfer, '_start_transfer')
|
||||
@ -601,7 +338,7 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||
image_name = 'fake_image'
|
||||
image_version = 1
|
||||
|
||||
fake_VmdkReadHandle = 'fake_VmdkReadHandle'
|
||||
fake_VmdkReadHandle = mock.Mock()
|
||||
fake_rw_handles_VmdkReadHandle.return_value = fake_VmdkReadHandle
|
||||
|
||||
image_transfer.upload_image(context,
|
||||
@ -633,10 +370,7 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||
'vmware_disktype': 'streamOptimized',
|
||||
'owner_id': owner_id}}
|
||||
|
||||
fake_transfer.assert_called_once_with(context,
|
||||
timeout_secs,
|
||||
fake_VmdkReadHandle,
|
||||
0,
|
||||
image_service=image_service,
|
||||
image_id=image_id,
|
||||
image_meta=image_metadata)
|
||||
image_service.update.assert_called_once_with(context,
|
||||
image_id,
|
||||
image_metadata,
|
||||
data=fake_VmdkReadHandle)
|
||||
|
Loading…
x
Reference in New Issue
Block a user