deb-oslo.vmware/tests/test_image_transfer.py
Davanum Srinivas a76bd253fb Support for IPv6 and Non-standard ports
IPv6 urls are formatted with '[' and ']' around the host and
the port actually shows up after the trailing ']' as well. So
we need to allow for better support for both ipv6 and non
standard urls when dealing with both SOAP and WSDL urls.

Examples:
   https://[fd2e:1201:1d9f:0:8886:78e0:38cd:cb03]/sdk
   https://[::1]:9443/sdk/

Closes-Bug: #1287292

Change-Id: I84ab38087ea77a30f6082d8c3fae0341e3371a78
2014-05-07 07:19:46 -04:00

517 lines
19 KiB
Python

# Copyright (c) 2014 VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for functions and classes for image transfer.
"""
import math
from eventlet import greenthread
from eventlet import timeout
import mock
from oslo.vmware import exceptions
from oslo.vmware import image_transfer
from oslo.vmware import rw_handles
from tests import base
class BlockingQueueTest(base.TestCase):
"""Tests for BlockingQueue."""
def test_read(self):
max_size = 10
chunk_size = 10
max_transfer_size = 30
queue = image_transfer.BlockingQueue(max_size, max_transfer_size)
def get_side_effect():
return [1] * chunk_size
queue.get = mock.Mock(side_effect=get_side_effect)
while True:
data_item = queue.read(chunk_size)
if not data_item:
break
self.assertEqual(max_transfer_size, queue._transferred)
exp_calls = [mock.call()] * int(math.ceil(float(max_transfer_size) /
chunk_size))
self.assertEqual(exp_calls, queue.get.call_args_list)
def test_write(self):
queue = image_transfer.BlockingQueue(10, 30)
queue.put = mock.Mock()
write_count = 10
for _ in range(0, write_count):
queue.write([1])
exp_calls = [mock.call([1])] * write_count
self.assertEqual(exp_calls, queue.put.call_args_list)
def test_tell(self):
max_transfer_size = 30
queue = image_transfer.BlockingQueue(10, 30)
self.assertEqual(max_transfer_size, queue.tell())
class ImageWriterTest(base.TestCase):
"""Tests for ImageWriter class."""
def _create_image_writer(self):
self.image_service = mock.Mock()
self.context = mock.Mock()
self.input_file = mock.Mock()
self.image_id = mock.Mock()
return image_transfer.ImageWriter(self.context, self.input_file,
self.image_service, self.image_id)
@mock.patch.object(greenthread, 'sleep')
def test_start(self, mock_sleep):
writer = self._create_image_writer()
status_list = ['queued', 'saving', 'active']
def image_service_show_side_effect(context, image_id):
status = status_list.pop(0)
return {'status': status}
self.image_service.show.side_effect = image_service_show_side_effect
exp_calls = [mock.call(self.context, self.image_id)] * len(status_list)
writer.start()
self.assertTrue(writer.wait())
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.assertEqual(exp_calls, self.image_service.show.call_args_list)
def test_start_with_killed_status(self):
writer = self._create_image_writer()
def image_service_show_side_effect(_context, _image_id):
return {'status': 'killed'}
self.image_service.show.side_effect = image_service_show_side_effect
writer.start()
self.assertRaises(exceptions.ImageTransferException,
writer.wait)
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.image_service.show.assert_called_once_with(self.context,
self.image_id)
def test_start_with_unknown_status(self):
writer = self._create_image_writer()
def image_service_show_side_effect(_context, _image_id):
return {'status': 'unknown'}
self.image_service.show.side_effect = image_service_show_side_effect
writer.start()
self.assertRaises(exceptions.ImageTransferException,
writer.wait)
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.image_service.show.assert_called_once_with(self.context,
self.image_id)
def test_start_with_image_service_show_exception(self):
writer = self._create_image_writer()
self.image_service.show.side_effect = RuntimeError()
writer.start()
self.assertRaises(exceptions.ImageTransferException, writer.wait)
self.image_service.update.assert_called_once_with(self.context,
self.image_id, {},
data=self.input_file)
self.image_service.show.assert_called_once_with(self.context,
self.image_id)
class FileReadWriteTaskTest(base.TestCase):
"""Tests for FileReadWriteTask class."""
def test_start(self):
data_items = [[1] * 10, [1] * 20, [1] * 5, []]
def input_file_read_side_effect(arg):
self.assertEqual(arg, rw_handles.READ_CHUNKSIZE)
data = data_items[input_file_read_side_effect.i]
input_file_read_side_effect.i += 1
return data
input_file_read_side_effect.i = 0
input_file = mock.Mock()
input_file.read.side_effect = input_file_read_side_effect
output_file = mock.Mock()
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
rw_task.start()
self.assertTrue(rw_task.wait())
self.assertEqual(len(data_items), input_file.read.call_count)
exp_calls = []
for i in range(0, len(data_items)):
exp_calls.append(mock.call(data_items[i]))
self.assertEqual(exp_calls, output_file.write.call_args_list)
self.assertEqual(len(data_items),
input_file.update_progress.call_count)
self.assertEqual(len(data_items),
output_file.update_progress.call_count)
def test_start_with_read_exception(self):
input_file = mock.Mock()
input_file.read.side_effect = RuntimeError()
output_file = mock.Mock()
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
rw_task.start()
self.assertRaises(exceptions.ImageTransferException, rw_task.wait)
input_file.read.assert_called_once_with(rw_handles.READ_CHUNKSIZE)
class ImageTransferUtilityTest(base.TestCase):
"""Tests for image_transfer utility methods."""
@mock.patch.object(timeout, 'Timeout')
@mock.patch.object(image_transfer, 'ImageWriter')
@mock.patch.object(image_transfer, 'FileReadWriteTask')
@mock.patch.object(image_transfer, 'BlockingQueue')
def test_start_transfer(self, fake_BlockingQueue, fake_FileReadWriteTask,
fake_ImageWriter, fake_Timeout):
context = mock.Mock()
read_file_handle = mock.Mock()
read_file_handle.close = mock.Mock()
image_service = mock.Mock()
image_id = mock.Mock()
blocking_queue = mock.Mock()
write_file_handle1 = mock.Mock()
write_file_handle1.close = mock.Mock()
write_file_handle2 = None
write_file_handles = [write_file_handle1, write_file_handle2]
timeout_secs = 10
blocking_queue_size = 10
image_meta = {}
max_data_size = 30
fake_BlockingQueue.return_value = blocking_queue
fake_timer = mock.Mock()
fake_timer.cancel = mock.Mock()
fake_Timeout.return_value = fake_timer
for write_file_handle in write_file_handles:
image_transfer._start_transfer(context,
timeout_secs,
read_file_handle,
max_data_size,
write_file_handle=write_file_handle,
image_service=image_service,
image_id=image_id,
image_meta=image_meta)
exp_calls = [mock.call(blocking_queue_size,
max_data_size)] * len(write_file_handles)
self.assertEqual(exp_calls,
fake_BlockingQueue.call_args_list)
exp_calls2 = [mock.call(read_file_handle, blocking_queue),
mock.call(blocking_queue, write_file_handle1),
mock.call(read_file_handle, blocking_queue)]
self.assertEqual(exp_calls2,
fake_FileReadWriteTask.call_args_list)
exp_calls3 = mock.call(context, blocking_queue, image_service,
image_id, image_meta)
self.assertEqual(exp_calls3,
fake_ImageWriter.call_args)
exp_calls4 = [mock.call(timeout_secs)] * len(write_file_handles)
self.assertEqual(exp_calls4,
fake_Timeout.call_args_list)
self.assertEqual(len(write_file_handles),
fake_timer.cancel.call_count)
self.assertEqual(len(write_file_handles),
read_file_handle.close.call_count)
write_file_handle1.close.assert_called_once()
@mock.patch.object(image_transfer, 'FileReadWriteTask')
@mock.patch.object(image_transfer, 'BlockingQueue')
def test_start_transfer_with_no_image_destination(self, fake_BlockingQueue,
fake_FileReadWriteTask):
context = mock.Mock()
read_file_handle = mock.Mock()
write_file_handle = None
image_service = None
image_id = None
timeout_secs = 10
image_meta = {}
blocking_queue_size = 10
max_data_size = 30
blocking_queue = mock.Mock()
fake_BlockingQueue.return_value = blocking_queue
self.assertRaises(ValueError,
image_transfer._start_transfer,
context,
timeout_secs,
read_file_handle,
max_data_size,
write_file_handle=write_file_handle,
image_service=image_service,
image_id=image_id,
image_meta=image_meta)
fake_BlockingQueue.assert_called_once_with(blocking_queue_size,
max_data_size)
fake_FileReadWriteTask.assert_called_once_with(read_file_handle,
blocking_queue)
@mock.patch('oslo.vmware.rw_handles.FileWriteHandle')
@mock.patch('oslo.vmware.rw_handles.ImageReadHandle')
@mock.patch.object(image_transfer, '_start_transfer')
def test_download_flat_image(
self,
fake_transfer,
fake_rw_handles_ImageReadHandle,
fake_rw_handles_FileWriteHandle):
context = mock.Mock()
image_id = mock.Mock()
image_service = mock.Mock()
image_service.download = mock.Mock()
image_service.download.return_value = 'fake_iter'
fake_ImageReadHandle = 'fake_ImageReadHandle'
fake_FileWriteHandle = 'fake_FileWriteHandle'
cookies = []
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
dc_path = 'dc1'
ds_name = 'ds1'
file_path = '/fake_path'
fake_rw_handles_ImageReadHandle.return_value = fake_ImageReadHandle
fake_rw_handles_FileWriteHandle.return_value = fake_FileWriteHandle
image_transfer.download_flat_image(
context,
timeout_secs,
image_service,
image_id,
image_size=image_size,
host=host,
port=port,
data_center_name=dc_path,
datastore_name=ds_name,
cookies=cookies,
file_path=file_path)
image_service.download.assert_called_once_with(context, image_id)
fake_rw_handles_ImageReadHandle.assert_called_once_with('fake_iter')
fake_rw_handles_FileWriteHandle.assert_called_once_with(
host,
port,
dc_path,
ds_name,
cookies,
file_path,
image_size)
fake_transfer.assert_called_once_with(
context,
timeout_secs,
fake_ImageReadHandle,
image_size,
write_file_handle=fake_FileWriteHandle)
@mock.patch('oslo.vmware.rw_handles.VmdkWriteHandle')
@mock.patch.object(image_transfer, '_start_transfer')
def test_download_stream_optimized_data(self, fake_transfer,
fake_rw_handles_VmdkWriteHandle):
context = mock.Mock()
session = mock.Mock()
read_handle = mock.Mock()
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
resource_pool = 'rp-1'
vm_folder = 'folder-1'
vm_import_spec = None
fake_VmdkWriteHandle = mock.Mock()
fake_VmdkWriteHandle.get_imported_vm = mock.Mock()
fake_rw_handles_VmdkWriteHandle.return_value = fake_VmdkWriteHandle
image_transfer.download_stream_optimized_data(
context,
timeout_secs,
read_handle,
session=session,
host=host,
port=port,
resource_pool=resource_pool,
vm_folder=vm_folder,
vm_import_spec=vm_import_spec,
image_size=image_size)
fake_rw_handles_VmdkWriteHandle.assert_called_once_with(
session,
host,
port,
resource_pool,
vm_folder,
vm_import_spec,
image_size)
fake_transfer.assert_called_once_with(
context,
timeout_secs,
read_handle,
image_size,
write_file_handle=fake_VmdkWriteHandle)
fake_VmdkWriteHandle.get_imported_vm.assert_called_once()
@mock.patch('oslo.vmware.rw_handles.ImageReadHandle')
@mock.patch.object(image_transfer, 'download_stream_optimized_data')
def test_download_stream_optimized_image(
self, fake_download_stream_optimized_data,
fake_rw_handles_ImageReadHandle):
context = mock.Mock()
session = mock.Mock()
image_id = mock.Mock()
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
resource_pool = 'rp-1'
vm_folder = 'folder-1'
vm_import_spec = None
fake_iter = 'fake_iter'
image_service = mock.Mock()
image_service.download = mock.Mock()
image_service.download.return_value = fake_iter
fake_ImageReadHandle = 'fake_ImageReadHandle'
fake_rw_handles_ImageReadHandle.return_value = fake_ImageReadHandle
image_transfer.download_stream_optimized_image(
context,
timeout_secs,
image_service,
image_id,
session=session,
host=host,
port=port,
resource_pool=resource_pool,
vm_folder=vm_folder,
vm_import_spec=vm_import_spec,
image_size=image_size)
image_service.download.assert_called_once_with(context, image_id)
fake_rw_handles_ImageReadHandle.assert_called_once_with(fake_iter)
fake_download_stream_optimized_data.assert_called_once_with(
context,
timeout_secs,
fake_ImageReadHandle,
session=session,
host=host,
port=port,
resource_pool=resource_pool,
vm_folder=vm_folder,
vm_import_spec=vm_import_spec,
image_size=image_size)
@mock.patch('oslo.vmware.rw_handles.VmdkReadHandle')
@mock.patch.object(image_transfer, '_start_transfer')
def test_upload_image(self, fake_transfer, fake_rw_handles_VmdkReadHandle):
context = mock.Mock()
image_id = mock.Mock()
owner_id = mock.Mock()
session = mock.Mock()
vm = mock.Mock()
image_service = mock.Mock()
timeout_secs = 10
image_size = 1000
host = '127.0.0.1'
port = 443
file_path = '/fake_path'
is_public = False
image_name = 'fake_image'
image_version = 1
fake_VmdkReadHandle = 'fake_VmdkReadHandle'
fake_rw_handles_VmdkReadHandle.return_value = fake_VmdkReadHandle
image_transfer.upload_image(context,
timeout_secs,
image_service,
image_id,
owner_id,
session=session,
host=host,
port=port,
vm=vm,
vmdk_file_path=file_path,
vmdk_size=image_size,
is_public=is_public,
image_name=image_name,
image_version=image_version)
fake_rw_handles_VmdkReadHandle.assert_called_once_with(session,
host,
port,
vm,
file_path,
image_size)
image_metadata = {'disk_format': 'vmdk',
'is_public': is_public,
'name': image_name,
'status': 'active',
'container_format': 'bare',
'size': 0,
'properties': {'vmware_image_version': image_version,
'vmware_disktype': 'streamOptimized',
'owner_id': owner_id}}
fake_transfer.assert_called_once_with(context,
timeout_secs,
fake_VmdkReadHandle,
0,
image_service=image_service,
image_id=image_id,
image_meta=image_metadata)