cinder/cinder/tests/test_create_volume_flow.py

132 lines
4.4 KiB
Python

# Copyright 2013 Canonical Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Tests for create_volume TaskFlow """
import time
import mock
from cinder import context
from cinder import test
from cinder.volume.flows import create_volume
class fake_scheduler_rpc_api(object):
    """Scheduler RPC API stand-in that checks the request spec it is given.

    :param expected_spec: the request spec ``create_volume`` must receive.
    :param test_inst: object providing ``assertEqual`` (the running test
                      case), used to report a mismatch.
    """

    def __init__(self, expected_spec, test_inst):
        self.expected_spec = expected_spec
        self.test_inst = test_inst

    def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
                      image_id=None, request_spec=None,
                      filter_properties=None):
        """Assert that ``request_spec`` equals the expected spec.

        Every other argument is accepted and ignored; nothing is returned.
        """
        self.test_inst.assertEqual(self.expected_spec, request_spec)
class fake_volume_api(object):
    """Volume RPC API stand-in that checks the arguments it is given.

    :param expected_spec: the request spec ``create_volume`` must receive.
    :param test_inst: object providing ``assertEqual`` (the running test
                      case), used to report a mismatch.
    """

    def __init__(self, expected_spec, test_inst):
        self.expected_spec = expected_spec
        self.test_inst = test_inst

    def create_volume(self, ctxt, volume, host,
                      request_spec, filter_properties,
                      allow_reschedule=True,
                      snapshot_id=None, image_id=None,
                      source_volid=None):
        """Assert that the spec and the id keyword arguments are consistent.

        Checks that ``request_spec`` equals the expected spec, and that
        ``source_volid``, ``snapshot_id`` and ``image_id`` each match the
        entry of the same name inside ``request_spec``. Nothing is returned.
        """
        self.test_inst.assertEqual(self.expected_spec, request_spec)
        self.test_inst.assertEqual(request_spec['source_volid'], source_volid)
        self.test_inst.assertEqual(request_spec['snapshot_id'], snapshot_id)
        self.test_inst.assertEqual(request_spec['image_id'], image_id)
class fake_db(object):
    """Database API stand-in returning fixed records, whatever the arguments."""

    def volume_get(self, *args, **kwargs):
        """Return a canned volume record whose host is ``'barf'``."""
        return {'host': 'barf'}

    def volume_update(self, *args, **kwargs):
        """Return a canned volume record whose host is ``'farb'``."""
        return {'host': 'farb'}

    def snapshot_get(self, *args, **kwargs):
        """Return a canned snapshot record whose ``volume_id`` is 1."""
        return {'volume_id': 1}
class CreateVolumeFlowTestCase(test.TestCase):
    """Tests for helpers in ``cinder.volume.flows.create_volume``."""

    def time_inc(self):
        """Advance the fake clock by one and return the new value.

        Installed in place of ``time.time`` by ``setUp``.
        """
        self.counter += 1
        return self.counter

    def setUp(self):
        """Create an admin context and replace ``time.time`` with a fake."""
        super(CreateVolumeFlowTestCase, self).setUp()
        self.ctxt = context.get_admin_context()

        # Ensure that time.time() always returns more than the last time it was
        # called to avoid div by zero errors.
        self.counter = float(0)
        self.stubs.Set(time, 'time', self.time_inc)

    def test_exception_to_unicode(self):
        """``_exception_to_unicode`` always yields a ``unicode`` message."""

        class FakeException(Exception):
            # Simulates an exception that cannot be rendered as a string.
            def __str__(self):
                raise UnicodeError()

        # Plain ASCII message: expected back unchanged.
        exc = Exception('error message')
        ret = create_volume._exception_to_unicode(exc)
        self.assertEqual(unicode, type(ret))
        self.assertEqual('error message', ret)

        # Message with a non-ASCII byte: expected back without that byte.
        exc = Exception('\xa5 error message')
        ret = create_volume._exception_to_unicode(exc)
        self.assertEqual(unicode, type(ret))
        self.assertEqual(' error message', ret)

        # __str__ itself raises: a generic message naming the exception
        # class is expected.
        # NOTE(review): `_` is not imported in this module; presumably it is
        # installed as a builtin by the project's gettext setup -- confirm.
        unicodeExc = FakeException('\xa5 error message')
        ret = create_volume._exception_to_unicode(unicodeExc)
        self.assertEqual(unicode, type(ret))
        self.assertEqual(_("Caught '%(exception)s' exception.") %
                         {'exception': 'FakeException'}, ret)

    def test_cast_create_volume(self):
        """``_cast_create_volume`` passes the request spec on unchanged.

        The fake scheduler/volume APIs carry the assertions; this test only
        drives the task, once with an all-``None`` spec and once with every
        id populated.
        """
        props = {}

        spec = {'volume_id': None,
                'source_volid': None,
                'snapshot_id': None,
                'image_id': None}
        task = create_volume.VolumeCastTask(fake_scheduler_rpc_api(spec, self),
                                            fake_volume_api(spec, self),
                                            fake_db())
        task._cast_create_volume(self.ctxt, spec, props)

        spec = {'volume_id': 1,
                'source_volid': 2,
                'snapshot_id': 3,
                'image_id': 4}
        task = create_volume.VolumeCastTask(fake_scheduler_rpc_api(spec, self),
                                            fake_volume_api(spec, self),
                                            fake_db())
        task._cast_create_volume(self.ctxt, spec, props)

    def tearDown(self):
        """Remove the ``time.time`` stub, then run the base-class teardown."""
        self.stubs.UnsetAll()
        super(CreateVolumeFlowTestCase, self).tearDown()