Update cinderlib

Sync the cinderlib repository with patches proposed to the Cinder
repository:

- https://review.openstack.org/620669
- https://review.openstack.org/620670
- https://review.openstack.org/620671
This commit is contained in:
Gorka Eguileor 2019-01-14 12:22:06 +01:00 committed by Gorka Eguileor
parent c491c71f47
commit 85776225cb
37 changed files with 1795 additions and 248 deletions

View File

@ -52,13 +52,13 @@ python-requirements:
pip install -e .
lint: python-requirements ## check style with flake8
flake8 cinderlib tests
flake8 cinderlib
unit-tests:
tox -epy27
functional-tests:
unit2 discover -v -s tests/functional
CL_FTEST_CFG=`pwd`/tools/lvm.yaml unit2 discover -v -s cinderlib/tests/functional
test-all: ## run tests on every Python version with tox
tox

View File

@ -1,12 +1,26 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from cinderlib import workarounds # noqa
from cinderlib import cinderlib
from cinderlib import serialization
from cinderlib import objects
import pkg_resources
__author__ = """Gorka Eguileor"""
__email__ = 'geguileo@redhat.com'
__version__ = '0.2.2'
from cinderlib import cinderlib
from cinderlib import objects
from cinderlib import serialization
from cinderlib import workarounds # noqa
__version__ = pkg_resources.get_distribution('cinder').version
DEFAULT_PROJECT_ID = objects.DEFAULT_PROJECT_ID
DEFAULT_USER_ID = objects.DEFAULT_USER_ID
@ -23,3 +37,5 @@ dumps = serialization.dumps
setup = cinderlib.setup
Backend = cinderlib.Backend
get_connector_properties = objects.brick_connector.get_connector_properties

View File

@ -17,19 +17,26 @@ from __future__ import absolute_import
import json as json_lib
import logging
import os
import six
from cinder import coordination
# NOTE(geguileo): If we want to prevent eventlet from monkey_patching we would
# need to do something about volume's L27-32.
# NOTE(geguileo): Probably a good idea not to depend on cinder.cmd.volume
# having all the other imports as they could change.
from cinder.cmd import volume as volume_cmd
from cinder.db import api as db_api
from cinder import objects as cinder_objects
# We need this here until we remove from cinder/volume/manager.py:
# VA_LIST = objects.VolumeAttachmentList
cinder_objects.register_all() # noqa
from cinder import utils
from cinder.volume import configuration
import nos_brick
from cinder.volume import manager
from oslo_config import cfg
from oslo_log import log as oslo_logging
from oslo_utils import importutils
import urllib3
import cinderlib
from cinderlib import nos_brick
from cinderlib import objects
from cinderlib import persistence
from cinderlib import serialization
@ -54,6 +61,9 @@ class Backend(object):
"""
backends = {}
global_initialization = False
# Some drivers try to access the DB directly for extra specs on creation.
# With this dictionary the DB class can get the necessary data
_volumes_inflight = {}
def __init__(self, volume_backend_name, **driver_cfg):
if not self.global_initialization:
@ -61,13 +71,13 @@ class Backend(object):
driver_cfg['volume_backend_name'] = volume_backend_name
Backend.backends[volume_backend_name] = self
conf = self._get_config(**driver_cfg)
conf = self._set_backend_config(driver_cfg)
self.driver = importutils.import_object(
conf.volume_driver,
configuration=conf,
db=self.persistence.db,
host='%s@%s' % (objects.CONFIGURED_HOST, volume_backend_name),
cluster_name=None, # No clusters for now: volume_cmd.CONF.cluster,
host='%s@%s' % (cfg.CONF.host, volume_backend_name),
cluster_name=None,  # We don't use cfg.CONF.cluster for now
active_backend_id=None) # No failover for now
self.driver.do_setup(objects.CONTEXT)
self.driver.check_for_setup_error()
@ -79,7 +89,7 @@ class Backend(object):
# init_capabilities already calls get_volume_stats with refresh=True
# so we can call it without refresh to get pool names.
self._pool_names = tuple(pool['pool_name']
for pool in self.get_volume_stats()['pools'])
for pool in self.stats()['pools'])
@property
def pool_names(self):
@ -107,8 +117,18 @@ class Backend(object):
volume_name=volume_name)
def stats(self, refresh=False):
stats = self.driver.get_volume_stats(refresh=refresh)
return stats
stats_data = self.driver.get_volume_stats(refresh=refresh)
# Fill pools for legacy driver reports
if stats_data and 'pools' not in stats_data:
pool = stats_data.copy()
pool['pool_name'] = self.id
for key in ('driver_version', 'shared_targets',
'sparse_copy_volume', 'storage_protocol',
'vendor_name', 'volume_backend_name'):
pool.pop(key, None)
stats_data['pools'] = [pool]
return stats_data
def create_volume(self, size, name='', description='', bootable=False,
**kwargs):
@ -125,9 +145,14 @@ class Backend(object):
del self._volumes[i]
break
@classmethod
def _start_creating_volume(cls, volume):
cls._volumes_inflight[volume.id] = volume
def _volume_created(self, volume):
if self._volumes is not None:
self._volumes.append(volume)
self._volumes_inflight.pop(volume.id, None)
def validate_connector(self, connector_dict):
"""Raise exception if missing info for volume's connect call."""
@ -144,12 +169,83 @@ class Backend(object):
for backend in cls.backends.values():
backend.driver.db = cls.persistence.db
# Replace the standard DB implementation instance with the one from
# the persistence plugin.
db_api.IMPL = cls.persistence.db
# NOTE(geguileo): Staticmethod used instead of classmethod to make it work
# on Python3 when assigning the unbound method.
@staticmethod
def _config_parse(self):
"""Replacer oslo_config.cfg.ConfigParser.parse for in-memory cfg."""
res = super(cfg.ConfigParser, self).parse(Backend._config_string_io)
return res
@classmethod
def _update_cinder_config(cls):
"""Parse in-memory file to update OSLO configuration used by Cinder."""
cls._config_string_io.seek(0)
cls._parser.write(cls._config_string_io)
cls._config_string_io.seek(0)
cfg.CONF.reload_config_files()
@classmethod
def _set_cinder_config(cls, host, locks_path, cinder_config_params):
"""Setup the parser with all the known Cinder configuration."""
cfg.CONF.set_default('state_path', os.getcwd())
cfg.CONF.set_default('lock_path', '$state_path', 'oslo_concurrency')
cls._parser = six.moves.configparser.SafeConfigParser()
cls._parser.set('DEFAULT', 'enabled_backends', '')
if locks_path:
cls._parser.add_section('oslo_concurrency')
cls._parser.set('oslo_concurrency', 'lock_path', locks_path)
cls._parser.add_section('coordination')
cls._parser.set('coordination',
'backend_url',
'file://' + locks_path)
if host:
cls._parser.set('DEFAULT', 'host', host)
# All other configuration options go into the DEFAULT section
for key, value in cinder_config_params.items():
if not isinstance(value, six.string_types):
value = six.text_type(value)
cls._parser.set('DEFAULT', key, value)
# We replace the OSLO's default parser to read from a StringIO instead
# of reading from a file.
cls._config_string_io = six.moves.StringIO()
cfg.ConfigParser.parse = six.create_unbound_method(cls._config_parse,
cfg.ConfigParser)
# Update the configuration with the options we have configured
cfg.CONF(project='cinder', version=cinderlib.__version__,
default_config_files=['in_memory_file'])
cls._update_cinder_config()
def _set_backend_config(self, driver_cfg):
backend_name = driver_cfg['volume_backend_name']
self._parser.add_section(backend_name)
for key, value in driver_cfg.items():
if not isinstance(value, six.string_types):
value = six.text_type(value)
self._parser.set(backend_name, key, value)
self._parser.set('DEFAULT', 'enabled_backends',
','.join(self.backends.keys()))
self._update_cinder_config()
config = configuration.Configuration(manager.volume_backend_opts,
config_group=backend_name)
return config
@classmethod
def global_setup(cls, file_locks_path=None, root_helper='sudo',
suppress_requests_ssl_warnings=True, disable_logs=True,
non_uuid_ids=False, output_all_backend_info=False,
project_id=None, user_id=None, persistence_config=None,
fail_on_missing_backend=True, host=None, **log_params):
fail_on_missing_backend=True, host=None,
**cinder_config_params):
# Global setup can only be set once
if cls.global_initialization:
raise Exception('Already setup')
@ -159,20 +255,15 @@ class Backend(object):
cls.project_id = project_id
cls.user_id = user_id
cls.non_uuid_ids = non_uuid_ids
objects.CONFIGURED_HOST = host or volume_cmd.CONF.host
cls.set_persistence(persistence_config)
volume_cmd.CONF.version = volume_cmd.version.version_string()
volume_cmd.CONF.register_opt(
configuration.cfg.StrOpt('stateless_cinder'),
group=configuration.SHARED_CONF_GROUP)
cls._set_cinder_config(host, file_locks_path, cinder_config_params)
serialization.setup(cls)
cls._set_logging(disable_logs, **log_params)
cls._set_logging(disable_logs)
cls._set_priv_helper(root_helper)
cls._set_coordinator(file_locks_path)
coordination.COORDINATOR.start()
if suppress_requests_ssl_warnings:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@ -182,43 +273,21 @@ class Backend(object):
cls.global_initialization = True
cls.output_all_backend_info = output_all_backend_info
def _get_config(self, volume_backend_name, **kwargs):
volume_cmd.CONF.register_opt(volume_cmd.host_opt,
group=volume_backend_name)
backend_opts = getattr(volume_cmd.CONF, volume_backend_name)
for key, value in kwargs.items():
setattr(backend_opts, key, value)
config = configuration.Configuration([],
config_group=volume_backend_name)
return config
@classmethod
def _set_logging(cls, disable_logs, **log_params):
def _set_logging(cls, disable_logs):
if disable_logs:
logging.Logger.disabled = property(lambda s: True,
lambda s, x: None)
return
for key, value in log_params.items():
volume_cmd.CONF.set_override(key, value)
volume_cmd.logging.setup(volume_cmd.CONF, 'cinder')
volume_cmd.python_logging.captureWarnings(True)
oslo_logging.setup(cfg.CONF, 'cinder')
logging.captureWarnings(True)
@classmethod
def _set_priv_helper(cls, root_helper):
utils.get_root_helper = lambda: root_helper
nos_brick.init(root_helper)
@classmethod
def _set_coordinator(cls, file_locks_path):
file_locks_path = file_locks_path or os.getcwd()
volume_cmd.CONF.set_override('lock_path', file_locks_path,
'oslo_concurrency')
volume_cmd.CONF.set_override('backend_url',
'file://' + file_locks_path,
'coordination')
coordination.COORDINATOR.start()
@property
def config(self):
if self.output_all_backend_info:

View File

@ -20,10 +20,18 @@ NotFound = exception.NotFound
VolumeNotFound = exception.VolumeNotFound
SnapshotNotFound = exception.SnapshotNotFound
ConnectionNotFound = exception.VolumeAttachmentNotFound
InvalidVolume = exception.InvalidVolume
class InvalidPersistence(Exception):
    """Raised when an unknown/invalid persistence storage name is requested.

    The scraped diff kept both the old and the new ``__msg`` assignments;
    only the updated message (with the trailing period) is kept here.
    """

    # Name-mangled on purpose: subclasses get their own message template.
    __msg = 'Invalid persistence storage: %s.'

    def __init__(self, name):
        # ``name`` is the persistence storage identifier that failed to load.
        super(InvalidPersistence, self).__init__(self.__msg % name)
class NotLocal(Exception):
    """Raised when a volume is expected to be attached locally but is not."""

    # Name-mangled class attribute holding the user-facing message template.
    __msg = "Volume %s doesn't seem to be attached locally."

    def __init__(self, name):
        # Format the message eagerly so str(exc) is ready for callers.
        message = self.__msg % name
        super(NotLocal, self).__init__(message)

164
cinderlib/nos_brick.py Normal file
View File

@ -0,0 +1,164 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Helper code to attach/detach out of OpenStack
OS-Brick is meant to be used within OpenStack, which means that there are some
issues when using it on non OpenStack systems.
Here we take care of:
- Making sure we can work without privsep and using sudo directly
- Replacing an unlink privsep method that would run python code privileged
- Local attachment of RBD volumes using librados
Some of these changes may be later moved to OS-Brick. For now we just copied it
from the nos-brick repository.
"""
import functools
import os
from os_brick import exception
from os_brick.initiator import connector
from os_brick.initiator import connectors
from os_brick.privileged import rootwrap
from oslo_concurrency import processutils as putils
from oslo_privsep import priv_context
from oslo_utils import fileutils
class RBDConnector(connectors.rbd.RBDConnector):
    """Connector class to attach/detach RBD volumes locally.

    OS-Brick's implementation covers only 2 cases:

    - Local attachment on controller node.
    - Returning a file object on non controller nodes.

    We need a third one, local attachment on non controller node.
    """
    def connect_volume(self, connection_properties):
        """Map an RBD volume locally and return its block-device path."""
        # NOTE(e0ne): sanity check if ceph-common is installed.
        try:
            self._execute('which', 'rbd')
        except putils.ProcessExecutionError:
            msg = 'ceph-common package not installed'
            raise exception.BrickException(msg)

        # Extract connection parameters and generate config file
        # NOTE(review): a missing key raises KeyError and a name without a
        # '/' raises ValueError, neither of which is IndexError — this
        # handler looks unreachable; confirm the intended exception types.
        try:
            user = connection_properties['auth_username']
            pool, volume = connection_properties['name'].split('/')
            cluster_name = connection_properties.get('cluster_name')
            monitor_ips = connection_properties.get('hosts')
            monitor_ports = connection_properties.get('ports')
            keyring = connection_properties.get('keyring')
        except IndexError:
            msg = 'Malformed connection properties'
            raise exception.BrickException(msg)

        # Write a minimal ceph.conf for the rbd CLI from the connection data.
        conf = self._create_ceph_conf(monitor_ips, monitor_ports,
                                      str(cluster_name), user,
                                      keyring)

        # Map RBD volume if it's not already mapped
        rbd_dev_path = self.get_rbd_device_name(pool, volume)
        if (not os.path.islink(rbd_dev_path) or
                not os.path.exists(os.path.realpath(rbd_dev_path))):
            cmd = ['rbd', 'map', volume, '--pool', pool, '--conf', conf]
            cmd += self._get_rbd_args(connection_properties)
            self._execute(*cmd, root_helper=self._root_helper,
                          run_as_root=True)

        # 'conf' is carried in device_info so disconnect_volume can clean up.
        return {'path': os.path.realpath(rbd_dev_path),
                'conf': conf,
                'type': 'block'}

    def check_valid_device(self, path, run_as_root=True):
        """Verify an existing RBD handle is connected and valid."""
        # A 1-block read is enough to prove the device is readable.
        try:
            self._execute('dd', 'if=' + path, 'of=/dev/null', 'bs=4096',
                          'count=1', root_helper=self._root_helper,
                          run_as_root=True)
        except putils.ProcessExecutionError:
            return False
        return True

    def disconnect_volume(self, connection_properties, device_info,
                          force=False, ignore_errors=False):
        """Unmap the RBD volume and remove the generated ceph config file."""
        pool, volume = connection_properties['name'].split('/')
        conf_file = device_info['conf']
        dev_name = self.get_rbd_device_name(pool, volume)
        cmd = ['rbd', 'unmap', dev_name, '--conf', conf_file]
        cmd += self._get_rbd_args(connection_properties)
        self._execute(*cmd, root_helper=self._root_helper,
                      run_as_root=True)
        # Best-effort cleanup of the temporary conf written on connect.
        fileutils.delete_if_exists(conf_file)
# Default root helper; replaced at runtime by init().
ROOT_HELPER = 'sudo'


def unlink_root(*links, **kwargs):
    """Unlink one or more paths as root.

    Keyword arguments mirror os-brick's privileged helper: ``no_errors``
    suppresses failures entirely, ``raise_at_end`` collects them and raises
    a single chained exception after all links were attempted.
    """
    ignore = kwargs.get('no_errors', False)
    defer_raise = kwargs.get('raise_at_end', False)
    chain = exception.ExceptionChainer()
    # Trap per-link failures whenever we are not meant to fail fast.
    trap = ignore or defer_raise
    for path in links:
        with chain.context(trap, 'Unlink failed for %s', path):
            putils.execute('unlink', path, run_as_root=True,
                           root_helper=ROOT_HELPER)
    if defer_raise and not ignore and chain:
        raise chain
def init(root_helper='sudo'):
    """Patch os-brick so it works outside OpenStack with a plain root helper.

    Replaces privsep initialization, the connector-properties getter, and
    the connector factory so that all privileged calls go through
    ``root_helper`` (sudo by default) instead of privsep, and so RBD volumes
    use our local-attach ``RBDConnector``.
    """
    global ROOT_HELPER
    ROOT_HELPER = root_helper
    priv_context.init(root_helper=[root_helper])

    # Keep references to the original implementations so our wrappers can
    # delegate to them.
    existing_bgcp = connector.get_connector_properties
    existing_bcp = connector.InitiatorConnector.factory

    def my_bgcp(*args, **kwargs):
        # First positional argument (or 'root_helper' kwarg) is forced to
        # our configured helper; execution goes through rootwrap's
        # custom_execute instead of privsep.
        if len(args):
            args = list(args)
            args[0] = ROOT_HELPER
        else:
            kwargs['root_helper'] = ROOT_HELPER
        kwargs['execute'] = rootwrap.custom_execute
        return existing_bgcp(*args, **kwargs)

    def my_bgc(protocol, *args, **kwargs):
        if len(args):
            # args is a tuple and we cannot do assignments
            args = list(args)
            args[0] = ROOT_HELPER
        else:
            kwargs['root_helper'] = ROOT_HELPER
        kwargs['execute'] = rootwrap.custom_execute
        # OS-Brick's implementation for RBD is not good enough for us
        if protocol == 'rbd':
            factory = RBDConnector
        else:
            factory = functools.partial(existing_bcp, protocol)
        return factory(*args, **kwargs)

    # Install the wrappers globally on os-brick.
    connector.get_connector_properties = my_bgcp
    connector.InitiatorConnector.factory = staticmethod(my_bgc)
    # Replace the privileged unlink with our sudo-based version when present.
    if hasattr(rootwrap, 'unlink_root'):
        rootwrap.unlink_root = unlink_root

View File

@ -15,18 +15,17 @@
from __future__ import absolute_import
import json as json_lib
import sys
import uuid
from cinder import context
# NOTE(geguileo): Probably a good idea not to depend on cinder.cmd.volume
# having all the other imports as they could change.
from cinder.cmd import volume as volume_cmd
from cinder import objects as cinder_objs
from cinder import exception as cinder_exception
from cinder import objects as cinder_objs
from cinder.objects import base as cinder_base_ovo
from os_brick import exception as brick_exception
from os_brick import initiator as brick_initiator
from os_brick.initiator import connector as brick_connector
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
import six
@ -40,11 +39,9 @@ DEFAULT_USER_ID = 'cinderlib'
BACKEND_NAME_SNAPSHOT_FIELD = 'progress'
CONNECTIONS_OVO_FIELD = 'volume_attachment'
CONFIGURED_HOST = 'cinderlib'
# This cannot go in the setup method because cinderlib objects need them to
# be setup to set OVO_CLASS
volume_cmd.objects.register_all()
cinder_objs.register_all()
class KeyValue(object):
@ -58,6 +55,7 @@ class KeyValue(object):
class Object(object):
"""Base class for our resource representation objects."""
SIMPLE_JSON_IGNORE = tuple()
DEFAULT_FIELDS_VALUES = {}
LAZY_PROPERTIES = tuple()
backend_class = None
@ -107,7 +105,7 @@ class Object(object):
# Configure OVOs to support non_uuid_ids
if non_uuid_ids:
for ovo_name in cinder_base_ovo.CinderObjectRegistry.obj_classes():
ovo_cls = getattr(volume_cmd.objects, ovo_name)
ovo_cls = getattr(cinder_objs, ovo_name)
if 'id' in ovo_cls.fields:
ovo_cls.fields['id'] = cinder_base_ovo.fields.StringField()
@ -142,14 +140,28 @@ class Object(object):
@property
def json(self):
ovo = self._ovo.obj_to_primitive()
return self.to_json(simplified=False)
def to_json(self, simplified=True):
visited = set()
if simplified:
for field in self.SIMPLE_JSON_IGNORE:
if self._ovo.obj_attr_is_set(field):
visited.add(id(getattr(self._ovo, field)))
ovo = self._ovo.obj_to_primitive(visited=visited)
return {'class': type(self).__name__,
'backend': getattr(self.backend, 'config', self.backend),
# If no driver loaded, just return the name of the backend
'backend': getattr(self.backend, 'config',
{'volume_backend_name': self.backend}),
'ovo': ovo}
@property
def jsons(self):
return json_lib.dumps(self.json, separators=(',', ':'))
return self.to_jsons(simplified=False)
def to_jsons(self, simplified=True):
json_data = self.to_json(simplified)
return json_lib.dumps(json_data, separators=(',', ':'))
def _only_ovo_data(self, ovo):
if isinstance(ovo, dict):
@ -204,6 +216,11 @@ class Object(object):
raise AttributeError('Attribute _ovo is not yet set')
return getattr(self._ovo, name)
def _raise_with_resource(self):
    """Re-raise the exception currently being handled with this resource
    attached as its ``resource`` attribute, preserving the traceback.

    Must be called from inside an ``except`` block (sys.exc_info() would
    otherwise return Nones).
    """
    exc_info = sys.exc_info()
    exc_info[1].resource = self
    six.reraise(*exc_info)
class NamedObject(Object):
def __init__(self, backend, **fields_data):
@ -228,6 +245,7 @@ class NamedObject(Object):
class LazyVolumeAttr(object):
LAZY_PROPERTIES = ('volume',)
_volume = None
def __init__(self, volume):
if volume:
@ -260,7 +278,8 @@ class LazyVolumeAttr(object):
class Volume(NamedObject):
OVO_CLASS = volume_cmd.objects.Volume
OVO_CLASS = cinder_objs.Volume
SIMPLE_JSON_IGNORE = ('snapshots', 'volume_attachment')
DEFAULT_FIELDS_VALUES = {
'size': 1,
'user_id': Object.CONTEXT.user_id,
@ -273,7 +292,7 @@ class Volume(NamedObject):
}
LAZY_PROPERTIES = ('snapshots', 'connections')
_ignore_keys = ('id', CONNECTIONS_OVO_FIELD, 'snapshots')
_ignore_keys = ('id', CONNECTIONS_OVO_FIELD, 'snapshots', 'volume_type')
def __init__(self, backend_or_vol, pool_name=None, **kwargs):
# Accept backend name for convenience
@ -283,19 +302,27 @@ class Volume(NamedObject):
elif isinstance(backend_or_vol, self.backend_class):
backend_name = backend_or_vol.id
elif isinstance(backend_or_vol, Volume):
backend_name, pool = backend_or_vol._ovo.host.split('#')
backend_str, pool = backend_or_vol._ovo.host.split('#')
backend_name = backend_str.split('@')[-1]
pool_name = pool_name or pool
for key in backend_or_vol._ovo.fields:
if (backend_or_vol._ovo.obj_attr_is_set(key) and
key not in self._ignore_keys):
kwargs.setdefault(key, getattr(backend_or_vol._ovo, key))
if backend_or_vol.volume_type:
kwargs.setdefault('extra_specs',
backend_or_vol.volume_type.extra_specs)
if backend_or_vol.volume_type.qos_specs:
kwargs.setdefault(
'qos_specs',
backend_or_vol.volume_type.qos_specs.specs)
backend_or_vol = backend_or_vol.backend
if '__ovo' not in kwargs:
kwargs[CONNECTIONS_OVO_FIELD] = (
volume_cmd.objects.VolumeAttachmentList(context=self.CONTEXT))
cinder_objs.VolumeAttachmentList(context=self.CONTEXT))
kwargs['snapshots'] = (
volume_cmd.objects.SnapshotList(context=self.CONTEXT))
cinder_objs.SnapshotList(context=self.CONTEXT))
self._snapshots = []
self._connections = []
@ -309,9 +336,10 @@ class Volume(NamedObject):
# If we overwrote the host, then we ignore pool_name and don't set a
# default value or copy the one from the source either.
if 'host' not in kwargs and '__ovo' not in kwargs:
# TODO(geguileo): Add pool support
pool_name = pool_name or backend_or_vol.pool_names[0]
self._ovo.host = ('%s@%s#%s' %
(CONFIGURED_HOST, backend_name, pool_name))
(cfg.CONF.host, backend_name, pool_name))
if qos_specs or extra_specs:
if qos_specs:
@ -406,6 +434,7 @@ class Volume(NamedObject):
return vol
def create(self):
self.backend._start_creating_volume(self)
try:
model_update = self.backend.driver.create_volume(self._ovo)
self._ovo.status = 'available'
@ -414,22 +443,23 @@ class Volume(NamedObject):
self.backend._volume_created(self)
except Exception:
self._ovo.status = 'error'
# TODO: raise with the vol info
raise
self._raise_with_resource()
finally:
self.save()
def delete(self):
# Some backends delete existing snapshots while others leave them
if self.snapshots:
msg = 'Cannot delete volume %s with snapshots' % self.id
raise exception.InvalidVolume(reason=msg)
try:
self.backend.driver.delete_volume(self._ovo)
self.persistence.delete_volume(self)
self.backend._volume_removed(self)
self.status = 'deleted'
except Exception:
# We don't change status to error on deletion error, we assume it
# just didn't complete.
# TODO: raise with the vol info
raise
self.status = 'error_deleting'
self.save()
self._raise_with_resource()
def extend(self, size):
volume = self._ovo
@ -442,14 +472,14 @@ class Volume(NamedObject):
volume.previous_status = None
except Exception:
volume.status = 'error'
# TODO: raise with the vol info
raise
self._raise_with_resource()
finally:
self.save()
def clone(self, **new_vol_attrs):
new_vol_attrs['source_vol_id'] = self.id
new_vol = Volume(self, **new_vol_attrs)
self.backend._start_creating_volume(new_vol)
try:
model_update = self.backend.driver.create_cloned_volume(
new_vol._ovo, self._ovo)
@ -459,24 +489,25 @@ class Volume(NamedObject):
self.backend._volume_created(new_vol)
except Exception:
new_vol.status = 'error'
# TODO: raise with the new volume info
raise
new_vol._raise_with_resource()
finally:
new_vol.save()
return new_vol
def create_snapshot(self, name='', description='', **kwargs):
snap = Snapshot(self, name=name, description=description, **kwargs)
snap.create()
if self._snapshots is not None:
self._snapshots.append(snap)
self._ovo.snapshots.objects.append(snap._ovo)
try:
snap.create()
finally:
if self._snapshots is not None:
self._snapshots.append(snap)
self._ovo.snapshots.objects.append(snap._ovo)
return snap
def attach(self):
connector_dict = brick_connector.get_connector_properties(
self.backend_class.root_helper,
volume_cmd.CONF.my_ip,
cfg.CONF.my_ip,
self.backend.configuration.use_multipath_for_image_xfer,
self.backend.configuration.enforce_multipath_for_image_xfer)
conn = self.connect(connector_dict)
@ -489,7 +520,7 @@ class Volume(NamedObject):
def detach(self, force=False, ignore_errors=False):
if not self.local_attach:
raise Exception('Not attached')
raise exception.NotLocal(self.id)
exc = brick_exception.ExceptionChainer()
conn = self.local_attach
@ -523,13 +554,12 @@ class Volume(NamedObject):
self.save()
except Exception:
self._remove_export()
# TODO: Improve raised exception
raise
self._raise_with_resource()
return conn
def _disconnect(self, connection):
self._remove_export()
if self._connections is not None:
if self._connections:
self._connections.remove(connection)
ovo_conns = getattr(self._ovo, CONNECTIONS_OVO_FIELD).objects
ovo_conns.remove(connection._ovo)
@ -543,7 +573,7 @@ class Volume(NamedObject):
self._disconnect(connection)
def cleanup(self):
for attach in self.attachments:
for attach in self.connections:
attach.detach()
self._remove_export()
@ -574,7 +604,8 @@ class Connection(Object, LazyVolumeAttr):
'connector': connector dictionary
'device': result of connect_volume}
"""
OVO_CLASS = volume_cmd.objects.VolumeAttachment
OVO_CLASS = cinder_objs.VolumeAttachment
SIMPLE_JSON_IGNORE = ('volume',)
@classmethod
def connect(cls, volume, connector, **kwargs):
@ -613,7 +644,7 @@ class Connection(Object, LazyVolumeAttr):
return connector['multipath']
# If multipathed not defined autodetect based on connection info
conn_info = conn_info['conn']['data']
conn_info = conn_info['conn'].get('data', {})
iscsi_mp = 'target_iqns' in conn_info and 'target_portals' in conn_info
fc_mp = not isinstance(conn_info.get('target_wwn', ''),
six.string_types)
@ -624,7 +655,7 @@ class Connection(Object, LazyVolumeAttr):
scan_attempts = brick_initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT
self.scan_attempts = kwargs.pop('device_scan_attempts', scan_attempts)
volume = kwargs.pop('volume')
volume = kwargs.pop('volume', None)
self._connector = None
super(Connection, self).__init__(*args, **kwargs)
@ -659,6 +690,8 @@ class Connection(Object, LazyVolumeAttr):
@connector_info.setter
def connector_info(self, value):
if self._ovo.connection_info is None:
self._ovo.connection_info = {}
self.connection_info['connector'] = value
# Since we are changing the dictionary the OVO won't detect the change
self._changed_fields.add('connection_info')
@ -801,7 +834,8 @@ class Connection(Object, LazyVolumeAttr):
class Snapshot(NamedObject, LazyVolumeAttr):
OVO_CLASS = volume_cmd.objects.Snapshot
OVO_CLASS = cinder_objs.Snapshot
SIMPLE_JSON_IGNORE = ('volume',)
DEFAULT_FIELDS_VALUES = {
'status': 'creating',
'metadata': {},
@ -856,8 +890,7 @@ class Snapshot(NamedObject, LazyVolumeAttr):
self._ovo.update(model_update)
except Exception:
self._ovo.status = 'error'
# TODO: raise with the vol info
raise
self._raise_with_resource()
finally:
self.save()
@ -865,11 +898,11 @@ class Snapshot(NamedObject, LazyVolumeAttr):
try:
self.backend.driver.delete_snapshot(self._ovo)
self.persistence.delete_snapshot(self)
self.status = 'deleted'
except Exception:
# We don't change status to error on deletion error, we assume it
# just didn't complete.
# TODO: raise with the snap info
raise
self.status = 'error_deleting'
self.save()
self._raise_with_resource()
if self._volume is not None and self._volume._snapshots is not None:
try:
self._volume._snapshots.remove(self)
@ -881,6 +914,7 @@ class Snapshot(NamedObject, LazyVolumeAttr):
new_vol_params.setdefault('size', self.volume_size)
new_vol_params['snapshot_id'] = self.id
new_vol = Volume(self.volume, **new_vol_params)
self.backend._start_creating_volume(new_vol)
try:
model_update = self.backend.driver.create_volume_from_snapshot(
new_vol._ovo, self._ovo)
@ -890,8 +924,7 @@ class Snapshot(NamedObject, LazyVolumeAttr):
self.backend._volume_created(new_vol)
except Exception:
new_vol._ovo.status = 'error'
# TODO: raise with the new volume info
raise
new_vol._raise_with_resource()
finally:
new_vol.save()

View File

@ -13,14 +13,17 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
# NOTE(geguileo): Probably a good idea not to depend on cinder.cmd.volume
# having all the other imports as they could change.
from cinder.cmd import volume as volume_cmd
from cinder import objects
from cinder.objects import base as cinder_base_ovo
from oslo_utils import timeutils
from oslo_versionedobjects import fields
import six
import cinderlib
from cinderlib import serialization
@ -101,11 +104,11 @@ class PersistenceDriverBase(object):
for key in resource._changed_fields
if not isinstance(resource.fields[key], fields.ObjectField)}
if getattr(resource._ovo, 'volume_type_id', None):
if ('qos_specs' in resource.volume_type._changed_fields
and resource.volume_type.qos_specs):
if ('qos_specs' in resource.volume_type._changed_fields and
resource.volume_type.qos_specs):
result['qos_specs'] = resource._ovo.volume_type.qos_specs.specs
if ('extra_specs' in resource.volume_type._changed_fields
and resource.volume_type.extra_specs):
if ('extra_specs' in resource.volume_type._changed_fields and
resource.volume_type.extra_specs):
result['extra_specs'] = resource._ovo.volume_type.extra_specs
return result
@ -135,22 +138,24 @@ class DB(object):
Data will be retrieved using the persistence driver we setup.
"""
GET_METHODS_PER_DB_MODEL = {
objects.Volume.model: 'volume_get',
objects.VolumeType.model: 'volume_type_get',
objects.Snapshot.model: 'snapshot_get',
objects.QualityOfServiceSpecs.model: 'qos_specs_get',
}
def __init__(self, persistence_driver):
self.persistence = persistence_driver
# Replace the standard DB configuration for code that doesn't use the
# driver.db attribute (ie: OVOs).
volume_cmd.session.IMPL = self
# Replace get_by_id OVO methods with something that will return
# expected data
volume_cmd.objects.Volume.get_by_id = self.volume_get
volume_cmd.objects.Snapshot.get_by_id = self.snapshot_get
objects.Volume.get_by_id = self.volume_get
objects.Snapshot.get_by_id = self.snapshot_get
# Disable saving in OVOs
for ovo_name in cinder_base_ovo.CinderObjectRegistry.obj_classes():
ovo_cls = getattr(volume_cmd.objects, ovo_name)
ovo_cls = getattr(objects, ovo_name)
ovo_cls.save = lambda *args, **kwargs: None
def volume_get(self, context, volume_id, *args, **kwargs):
@ -159,27 +164,38 @@ class DB(object):
def snapshot_get(self, context, snapshot_id, *args, **kwargs):
return self.persistence.get_snapshots(snapshot_id)[0]._ovo
def get_volume_type(self, context, id, inactive=False,
def volume_type_get(self, context, id, inactive=False,
expected_fields=None):
res = self.persistence.get_volumes(id)[0]._ovo
if not res.volume_type_id:
if id in cinderlib.Backend._volumes_inflight:
vol = cinderlib.Backend._volumes_inflight[id]
else:
vol = self.persistence.get_volumes(id)[0]
if not vol._ovo.volume_type_id:
return None
return self._vol_type_to_dict(res.volume_type)
return vol_type_to_dict(vol._ovo.volume_type)
def qos_specs_get(self, context, qos_specs_id, inactive=False):
res = self.persistence.get_volumes(qos_specs_id)[0]._ovo
if not res.volume_type_id:
if qos_specs_id in cinderlib.Backend._volumes_inflight:
vol = cinderlib.Backend._volumes_inflight[qos_specs_id]
else:
vol = self.persistence.get_volumes(qos_specs_id)[0]
if not vol._ovo.volume_type_id:
return None
return self._vol_type_to_dict(res.volume_type)['qos_specs']
@staticmethod
def _vol_type_to_dict(volume_type):
res = serialization.obj_to_primitive(volume_type)
res = res['versioned_object.data']
if res.get('qos_specs'):
res['qos_specs'] = res['qos_specs']['versioned_object.data']
return res
return vol_type_to_dict(vol._ovo.volume_type)['qos_specs']
@classmethod
def image_volume_cache_get_by_volume_id(cls, context, volume_id):
return None
def get_by_id(self, context, model, id, *args, **kwargs):
method = getattr(self, self.GET_METHODS_PER_DB_MODEL[model])
return method(context, id)
def vol_type_to_dict(volume_type):
    """Serialize a VolumeType OVO into a plain dictionary.

    Unwraps the versioned-object envelope, and when QoS specs are
    attached unwraps their envelope as well so callers get plain data.
    """
    primitive = serialization.obj_to_primitive(volume_type)
    type_dict = primitive['versioned_object.data']
    qos = type_dict.get('qos_specs')
    if qos:
        type_dict['qos_specs'] = qos['versioned_object.data']
    return type_dict

View File

@ -17,12 +17,12 @@ from __future__ import absolute_import
import logging
from cinder.cmd import volume as volume_cmd
from cinder.db import api as db_api
from cinder.db import migration
from cinder.db.sqlalchemy import api as sqla_api
from cinder.db.sqlalchemy import models
from cinder import objects as cinder_objs
from oslo_config import cfg
from oslo_db import exception
from oslo_log import log
@ -40,13 +40,18 @@ class KeyValue(models.BASE, models.models.ModelBase, objects.KeyValue):
class DBPersistence(persistence_base.PersistenceDriverBase):
GET_METHODS_PER_DB_MODEL = {
cinder_objs.VolumeType.model: 'volume_type_get',
cinder_objs.QualityOfServiceSpecs.model: 'qos_specs_get',
}
def __init__(self, connection, sqlite_synchronous=True,
soft_deletes=False):
self.soft_deletes = soft_deletes
volume_cmd.CONF.set_override('connection', connection, 'database')
volume_cmd.CONF.set_override('sqlite_synchronous',
sqlite_synchronous,
'database')
cfg.CONF.set_override('connection', connection, 'database')
cfg.CONF.set_override('sqlite_synchronous',
sqlite_synchronous,
'database')
# Suppress logging for migration
migrate_logger = logging.getLogger('migrate')
@ -54,20 +59,54 @@ class DBPersistence(persistence_base.PersistenceDriverBase):
self._clear_facade()
self.db_instance = db_api.oslo_db_api.DBAPI.from_config(
conf=volume_cmd.CONF, backend_mapping=db_api._BACKEND_MAPPING,
conf=cfg.CONF, backend_mapping=db_api._BACKEND_MAPPING,
lazy=True)
# We need to wrap some get methods that get called before the volume is
# actually created.
self.original_vol_type_get = self.db_instance.volume_type_get
self.db_instance.volume_type_get = self.vol_type_get
self.original_qos_specs_get = self.db_instance.qos_specs_get
self.db_instance.qos_specs_get = self.qos_specs_get
self.original_get_by_id = self.db_instance.get_by_id
self.db_instance.get_by_id = self.get_by_id
migration.db_sync()
self._create_key_value_table()
super(DBPersistence, self).__init__()
def vol_type_get(self, context, id, inactive=False,
expected_fields=None):
if id not in objects.Backend._volumes_inflight:
return self.original_vol_type_get(context, id, inactive)
vol = objects.Backend._volumes_inflight[id]._ovo
if not vol.volume_type_id:
return None
return persistence_base.vol_type_to_dict(vol.volume_type)
def qos_specs_get(self, context, qos_specs_id, inactive=False):
if qos_specs_id not in objects.Backend._volumes_inflight:
return self.original_qos_specs_get(context, qos_specs_id, inactive)
vol = objects.Backend._volumes_inflight[qos_specs_id]._ovo
if not vol.volume_type_id:
return None
return persistence_base.vol_type_to_dict(vol.volume_type)['qos_specs']
def get_by_id(self, context, model, id, *args, **kwargs):
if model not in self.GET_METHODS_PER_DB_MODEL:
return self.original_get_by_id(context, model, id, *args, **kwargs)
method = getattr(self, self.GET_METHODS_PER_DB_MODEL[model])
return method(context, id)
def _clear_facade(self):
# This is for Pike
if hasattr(sqla_api, '_FACADE'):
sqla_api._FACADE = None
# This is for Queens and Rocky (untested)
elif hasattr(sqla_api, 'configure'):
sqla_api.configure(volume_cmd.CONF)
sqla_api.configure(cfg.CONF)
def _create_key_value_table(self):
models.BASE.metadata.create_all(sqla_api.get_engine(),
@ -82,18 +121,15 @@ class DBPersistence(persistence_base.PersistenceDriverBase):
return {key: value for key, value in kwargs.items() if value}
def get_volumes(self, volume_id=None, volume_name=None, backend_name=None):
if backend_name:
host = '%s@%s' % (objects.CONFIGURED_HOST, backend_name)
else:
host = None
# Use the % wildcard to ignore the host name on the backend_name search
host = '%@' + backend_name if backend_name else None
filters = self._build_filter(id=volume_id, display_name=volume_name,
host=host)
LOG.debug('get_volumes for %s', filters)
ovos = cinder_objs.VolumeList.get_all(objects.CONTEXT, filters=filters)
result = []
for ovo in ovos:
# We have stored the backend reversed with the host, switch it back
backend = ovo.host.split('@')[1].split('#')[0]
backend = ovo.host.split('@')[-1].split('#')[0]
# Trigger lazy loading of specs
if ovo.volume_type_id:

View File

@ -28,6 +28,7 @@ import six
from cinder.objects import base as cinder_base_ovo
from oslo_versionedobjects import base as base_ovo
from oslo_versionedobjects import fields as ovo_fields
from cinderlib import objects
@ -67,13 +68,23 @@ def wrap_to_primitive(cls):
setattr(cls, 'to_primitive', staticmethod(to_primitive))
def _set_visited(element, visited):
# visited keeps track of elements visited to prevent loops
if visited is None:
visited = set()
# We only care about complex object that can have loops, others are ignored
# to prevent us from not serializing simple objects, such as booleans, that
# can have the same instance used for multiple fields.
if isinstance(element,
(ovo_fields.ObjectField, cinder_base_ovo.CinderObject)):
visited.add(id(element))
return visited
def obj_to_primitive(self, target_version=None,
version_manifest=None, visited=None):
# No target_version, version_manifest, or changes support
if visited is None:
visited = set()
visited.add(id(self))
visited = _set_visited(self, visited)
primitive = {}
for name, field in self.fields.items():
if self.obj_attr_is_set(name):
@ -120,29 +131,24 @@ def field_to_primitive(self, obj, attr, value, visited=None):
def iterable_to_primitive(self, obj, attr, value, visited=None):
if visited is None:
visited = set()
visited.add(id(value))
visited = _set_visited(self, visited)
result = []
for elem in value:
if id(elem) in visited:
continue
visited.add(id(elem))
_set_visited(elem, visited)
r = self._element_type.to_primitive(obj, attr, elem, visited)
result.append(r)
return result
def dict_to_primitive(self, obj, attr, value, visited=None):
if visited is None:
visited = set()
visited.add(id(value))
visited = _set_visited(self, visited)
primitive = {}
for key, elem in value.items():
if id(elem) in visited:
continue
visited.add(id(elem))
_set_visited(elem, visited)
primitive[key] = self._element_type.to_primitive(
obj, '%s["%s"]' % (attr, key), elem, visited)
return primitive

View File

View File

@ -18,10 +18,13 @@ import os
import subprocess
import tempfile
from oslo_config import cfg
import six
import unittest2
import yaml
import cinderlib
from cinderlib.tests.functional import cinder_to_yaml
def set_backend(func, new_name, backend_name):
@ -35,6 +38,7 @@ def set_backend(func, new_name, backend_name):
def test_all_backends(cls):
"""Decorator to run tests in a class for all available backends."""
config = BaseFunctTestCase.ensure_config_loaded()
for fname, func in cls.__dict__.items():
if fname.startswith('test_'):
@ -47,44 +51,55 @@ def test_all_backends(cls):
class BaseFunctTestCase(unittest2.TestCase):
DEFAULTS = {'logs': False, 'venv_sudo': False, 'size_precision': 0}
FNULL = open(os.devnull, 'w')
CONFIG_FILE = os.environ.get('CL_FTEST_CFG', 'tests/functional/lvm.yaml')
CONFIG_FILE = os.environ.get('CL_FTEST_CFG', '/etc/cinder/cinder.conf')
PRECISION = os.environ.get('CL_FTEST_PRECISION', 0)
LOGGING_ENABLED = os.environ.get('CL_FTEST_LOGGING', False)
ROOT_HELPER = os.environ.get('CL_FTEST_ROOT_HELPER', 'sudo')
tests_config = None
@classmethod
def ensure_config_loaded(cls):
if not cls.tests_config:
# Read backend configuration file
with open(cls.CONFIG_FILE, 'r') as f:
cls.tests_config = yaml.load(f)
# Set configuration default values
for k, v in cls.DEFAULTS.items():
cls.tests_config.setdefault(k, v)
# If it's a .conf type of configuration file convert it to dict
if cls.CONFIG_FILE.endswith('.conf'):
cls.tests_config = cinder_to_yaml.convert(cls.CONFIG_FILE)
else:
with open(cls.CONFIG_FILE, 'r') as f:
cls.tests_config = yaml.load(f)
cls.tests_config.setdefault('logs', cls.LOGGING_ENABLED)
cls.tests_config.setdefault('size_precision', cls.PRECISION)
return cls.tests_config
@staticmethod
def _replace_oslo_cli_parse():
original_cli_parser = cfg.ConfigOpts._parse_cli_opts
def _parse_cli_opts(self, args):
return original_cli_parser(self, [])
cfg.ConfigOpts._parse_cli_opts = six.create_unbound_method(
_parse_cli_opts, cfg.ConfigOpts)
@classmethod
def setUpClass(cls):
cls._replace_oslo_cli_parse()
config = cls.ensure_config_loaded()
if config['venv_sudo']:
# NOTE(geguileo): For some drivers need to use a custom sudo script
# to find virtualenv commands (ie: cinder-rtstool).
path = os.path.dirname(os.path.abspath(os.path.realpath(__file__)))
sudo_tool = os.path.join(path, '../../tools/virtualenv-sudo.sh')
cls.root_helper = os.path.abspath(sudo_tool)
else:
cls.root_helper = 'sudo'
cinderlib.setup(root_helper=cls.root_helper,
disable_logs=not config['logs'])
# Use memory_db persistence instead of memory to ensure migrations work
cinderlib.setup(root_helper=cls.ROOT_HELPER,
disable_logs=not config['logs'],
persistence_config={'storage': 'memory_db'})
# Initialize backends
cls.backends = [cinderlib.Backend(**cfg) for cfg in
config['backends']]
# Lazy load backend's _volumes variable using the volumes property so
# new volumes are added to this list on successful creation.
for backend in cls.backends:
backend.volumes
# Set current backend, by default is the first
cls.backend = cls.backends[0]
cls.size_precision = config['size_precision']
@classmethod
@ -131,7 +146,7 @@ class BaseFunctTestCase(unittest2.TestCase):
raise Exception('Errors on test cleanup: %s' % '\n\t'.join(errors))
def _root_execute(self, *args, **kwargs):
cmd = [self.root_helper]
cmd = [self.ROOT_HELPER]
cmd.extend(args)
cmd.extend("%s=%s" % (k, v) for k, v in kwargs.items())
return subprocess.check_output(cmd, stderr=self.FNULL)
@ -187,7 +202,7 @@ class BaseFunctTestCase(unittest2.TestCase):
def _write_data(self, vol, data=None, do_detach=True):
if not data:
data = '0123456789' * 100
data = b'0123456789' * 100
if not vol.local_attach:
vol.attach()

View File

@ -0,0 +1,70 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os import path
import yaml
from six.moves import configparser
from cinder.cmd import volume
volume.objects.register_all() # noqa
from cinder.volume import configuration as config
from cinder.volume import manager
def convert(cinder_source, yaml_dest=None):
    """Convert a cinder.conf file into cinderlib's backend configuration.

    Reads the enabled backends from *cinder_source*, loads each backend's
    driver so its options get registered, and collects only the options
    that are explicitly set in the file.

    :param cinder_source: path to an existing cinder.conf file.
    :param yaml_dest: optional path; when given, the result is also
                      written there as YAML.
    :returns: dict of the form {'backends': [<per-backend option dicts>]}.
    :raises Exception: if *cinder_source* does not exist.
    """
    result_cfgs = []
    if not path.exists(cinder_source):
        raise Exception("Cinder config file %s doesn't exist" % cinder_source)
    # Manually parse the Cinder configuration file so we know which options are
    # set.
    parser = configparser.ConfigParser()
    parser.read(cinder_source)
    enabled_backends = parser.get('DEFAULT', 'enabled_backends')
    backends = [name.strip() for name in enabled_backends.split(',') if name]
    # Feed the file through oslo.config so typed values are available below.
    volume.CONF(('--config-file', cinder_source), project='cinder')
    for backend in backends:
        options_present = parser.options(backend)
        # Dynamically loading the driver triggers adding the specific
        # configuration options to the backend_defaults section
        cfg = config.Configuration(manager.volume_backend_opts,
                                   config_group=backend)
        driver_ns = cfg.volume_driver.rsplit('.', 1)[0]
        __import__(driver_ns)
        # Use the backend_defaults section to extract the configuration for
        # options that are present in the backend section and add them to
        # the backend section.
        # NOTE(review): relies on oslo.config private attributes
        # (_groups/_opts); confirm on oslo.config upgrades.
        opts = volume.CONF._groups['backend_defaults']._opts
        known_present_options = [opt for opt in options_present if opt in opts]
        volume_opts = [opts[option]['opt'] for option in known_present_options]
        cfg.append_config_values(volume_opts)
        # Now retrieve the options that are set in the configuration file.
        result_cfgs.append({option: cfg.safe_get(option)
                            for option in known_present_options})
    result = {'backends': result_cfgs}
    if yaml_dest:
        # Write the YAML to the destination
        with open(yaml_dest, 'w') as f:
            yaml.dump(result, f)
    return result

View File

@ -202,15 +202,3 @@ class BackendFunctBasic(base_tests.BaseFunctTestCase):
read_data = self._read_data(new_vol, len(data))
self.assertEqual(original_size, created_size)
self.assertEqual(data, read_data)
def test_connect_disconnect_volume(self):
# TODO(geguileo): Implement the test
pass
def test_connect_disconnect_multiple_volumes(self):
# TODO(geguileo): Implement the test
pass
def test_connect_disconnect_multiple_times(self):
# TODO(geguileo): Implement the test
pass

View File

View File

@ -0,0 +1,62 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import unittest2
from oslo_config import cfg
import six
import cinderlib
from cinderlib.tests.unit import utils
def _replace_oslo_cli_parse():
    # Make oslo.config ignore the test runner's command line: it would
    # otherwise try to parse pytest/unit2 arguments and fail on them.
    original_cli_parser = cfg.ConfigOpts._parse_cli_opts
    def _parse_cli_opts(self, args):
        # Delegate to the original parser but with an empty argv.
        return original_cli_parser(self, [])
    cfg.ConfigOpts._parse_cli_opts = six.create_unbound_method(_parse_cli_opts,
                                                               cfg.ConfigOpts)
# Patch CLI parsing before cinderlib.setup, which initializes oslo.config.
_replace_oslo_cli_parse()
cinderlib.setup(persistence_config={'storage': utils.get_mock_persistence()})
class BaseTest(unittest2.TestCase):
    """Common fixture for cinderlib unit tests.

    Provides a fake backend with mocked persistence and a convenience
    ``patch`` helper with automatic cleanup.
    """
    # Subclasses may pin a persistence configuration; when left as None a
    # fresh mock persistence is installed for every test.
    PERSISTENCE_CFG = None
    def setUp(self):
        if not self.PERSISTENCE_CFG:
            persistence_config = {'storage': utils.get_mock_persistence()}
            cinderlib.Backend.set_persistence(persistence_config)
        self.backend_name = 'fake_backend'
        self.backend = utils.FakeBackend(volume_backend_name=self.backend_name)
        self.persistence = self.backend.persistence
        cinderlib.Backend._volumes_inflight = {}
    def tearDown(self):
        # Drop every backend registered during the test.
        cinderlib.Backend.backends = {}
    def patch(self, path, *args, **kwargs):
        """Mock *path* and automatically undo it when the test ends."""
        patcher = mock.patch(path, *args, **kwargs)
        mocked = patcher.start()
        self.addCleanup(patcher.stop)
        return mocked

View File

View File

@ -0,0 +1,259 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from cinderlib import exception
from cinderlib import objects
from cinderlib.tests.unit import base
@ddt.ddt
class TestConnection(base.BaseTest):
    """Unit tests for cinderlib Connection objects.

    Uses the mocked backend/persistence from base.BaseTest;
    _is_multipathed_conn and the os-brick scan-attempts default are
    patched for every test.
    """
    def setUp(self):
        # Keep a reference to the real implementation so the
        # test__is_multipathed_conn_* tests can exercise it directly.
        self.original_is_multipathed = objects.Connection._is_multipathed_conn
        self.mock_is_mp = self.patch(
            'cinderlib.objects.Connection._is_multipathed_conn')
        self.mock_default = self.patch(
            'os_brick.initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT')
        super(TestConnection, self).setUp()
        self.vol = objects.Volume(self.backend_name, size=10)
        self.kwargs = {'k1': 'v1', 'k2': 'v2'}
        self.conn = objects.Connection(self.backend, volume=self.vol,
                                       **self.kwargs)
        # Seed connection_info with a connector so the OVO fallback path of
        # _is_multipathed_conn has data to read.
        self.conn._ovo.connection_info = {
            'connector': {'multipath': mock.sentinel.mp_ovo_connector}}
    def test_init(self):
        self.mock_is_mp.assert_called_once_with(self.kwargs)
        self.assertEqual(self.conn.use_multipath, self.mock_is_mp.return_value)
        self.assertEqual(self.conn.scan_attempts, self.mock_default)
        self.assertIsNone(self.conn._connector)
        self.assertEqual(self.vol, self.conn._volume)
        self.assertEqual(self.vol._ovo, self.conn._ovo.volume)
        self.assertEqual(self.vol._ovo.id, self.conn._ovo.volume_id)
    def test__is_multipathed_conn_kwargs(self):
        # Explicit use_multipath kwarg wins over connector and OVO data.
        res = self.original_is_multipathed(dict(
            use_multipath=mock.sentinel.mp_kwargs,
            connector={'multipath': mock.sentinel.mp_connector},
            __ovo=self.conn._ovo))
        self.assertEqual(mock.sentinel.mp_kwargs, res)
    def test__is_multipathed_conn_connector_kwarg(self):
        # Without use_multipath, the connector kwarg is consulted next.
        res = self.original_is_multipathed(dict(
            connector={'multipath': mock.sentinel.mp_connector},
            __ovo=self.conn._ovo))
        self.assertEqual(mock.sentinel.mp_connector, res)
    def test__is_multipathed_conn_connector_ovo(self):
        # Empty connector kwarg: fall back to the OVO's connection_info.
        res = self.original_is_multipathed(dict(connector={},
                                                __ovo=self.conn._ovo))
        self.assertEqual(mock.sentinel.mp_ovo_connector, res)
    def test__is_multipathed_conn_connection_info_iscsi_true(self):
        # iSCSI is multipathed when both target_iqns and target_portals exist.
        res = self.original_is_multipathed(dict(
            connection_info={'conn': {'data': {'target_iqns': '',
                                               'target_portals': ''}}}))
        self.assertTrue(res)
    def test__is_multipathed_conn_connection_info_iscsi_false(self):
        res = self.original_is_multipathed(dict(
            connection_info={'conn': {'data': {'target_iqns': ''}}}))
        self.assertFalse(res)
    def test__is_multipathed_conn_connection_info_fc_true(self):
        # FC is multipathed when target_wwn is a list.
        res = self.original_is_multipathed(dict(
            connection_info={'conn': {'data': {'target_wwn': []}}}))
        self.assertTrue(res)
    def test__is_multipathed_conn_connection_info_fc_false(self):
        res = self.original_is_multipathed(dict(
            connection_info={'conn': {'data': {'target_wwn': ''}}}))
        self.assertFalse(res)
    def test_init_no_backend(self):
        self.assertRaises(TypeError, objects.Connection)
    def test_init_no_volume(self):
        self.mock_is_mp.reset_mock()
        conn = objects.Connection(self.backend, **self.kwargs)
        self.mock_is_mp.assert_called_once_with(self.kwargs)
        self.assertEqual(conn.use_multipath, self.mock_is_mp.return_value)
        self.assertEqual(conn.scan_attempts, self.mock_default)
        self.assertIsNone(conn._connector)
    def test_connect(self):
        connector = {'my_c': 'v'}
        conn = self.conn.connect(self.vol, connector)
        init_conn = self.backend.driver.initialize_connection
        init_conn.assert_called_once_with(self.vol, connector)
        self.assertIsInstance(conn, objects.Connection)
        self.assertEqual('attached', conn.status)
        self.assertEqual(init_conn.return_value, conn.connection_info['conn'])
        self.assertEqual(connector, conn.connector_info)
        self.persistence.set_connection.assert_called_once_with(conn)
    @mock.patch('cinderlib.objects.Volume._disconnect')
    @mock.patch('cinderlib.objects.Connection._disconnect')
    def test_disconnect(self, mock_disc, mock_vol_disc):
        self.conn.disconnect(force=mock.sentinel.force)
        mock_disc.assert_called_once_with(mock.sentinel.force)
        mock_vol_disc.assert_called_once_with(self.conn)
    def test__disconnect(self):
        conn_info = self.conn.connector_info
        self.conn._disconnect(mock.sentinel.force)
        self.backend.driver.terminate_connection.assert_called_once_with(
            self.vol._ovo, conn_info, force=mock.sentinel.force)
        self.assertEqual({}, self.conn.conn_info)
        self.assertEqual('detached', self.conn.status)
        self.persistence.delete_connection.assert_called_once_with(self.conn)
    @mock.patch('cinderlib.objects.Connection.conn_info', {'data': 'mydata'})
    @mock.patch('cinderlib.objects.Connection.path')
    @mock.patch('cinderlib.objects.Connection.device_attached')
    def test_attach(self, mock_attached, mock_path):
        with mock.patch('cinderlib.objects.Connection.connector') as mock_conn:
            self.conn.attach()
            mock_conn.connect_volume.assert_called_once_with('mydata')
        mock_attached.assert_called_once_with(
            mock_conn.connect_volume.return_value)
        mock_conn.check_valid_device.assert_called_once_with(mock_path)
        self.assertEqual(self.conn, self.vol.local_attach)
    @mock.patch('cinderlib.objects.Connection.conn_info', {'data': 'mydata'})
    @mock.patch('cinderlib.objects.Connection.device')
    def test_detach(self, mock_device):
        self.vol.local_attach = mock.Mock()
        with mock.patch('cinderlib.objects.Connection.connector') as mock_conn:
            self.conn.detach(mock.sentinel.force, mock.sentinel.ignore)
            mock_conn.disconnect_volume.assert_called_once_with(
                'mydata',
                mock_device,
                force=mock.sentinel.force,
                ignore_errors=mock.sentinel.ignore)
        self.assertIsNone(self.vol.local_attach)
        self.assertIsNone(self.conn.device)
        self.assertIsNone(self.conn._connector)
        self.persistence.set_connection.assert_called_once_with(self.conn)
    def test_get_by_id(self):
        self.persistence.get_connections.return_value = [mock.sentinel.conn]
        res = objects.Connection.get_by_id(mock.sentinel.conn_id)
        self.assertEqual(mock.sentinel.conn, res)
        self.persistence.get_connections.assert_called_once_with(
            connection_id=mock.sentinel.conn_id)
    def test_get_by_id_not_found(self):
        self.persistence.get_connections.return_value = None
        self.assertRaises(exception.ConnectionNotFound,
                          objects.Connection.get_by_id,
                          mock.sentinel.conn_id)
        self.persistence.get_connections.assert_called_once_with(
            connection_id=mock.sentinel.conn_id)
    def test_device_attached(self):
        self.conn.device_attached(mock.sentinel.device)
        self.assertEqual(mock.sentinel.device,
                         self.conn.connection_info['device'])
        self.persistence.set_connection.assert_called_once_with(self.conn)
    def test_conn_info_setter(self):
        self.conn.conn_info = mock.sentinel.conn_info
        self.assertEqual(mock.sentinel.conn_info,
                         self.conn._ovo.connection_info['conn'])
    def test_conn_info_setter_clear(self):
        # Setting an empty value clears the whole connection_info field.
        self.conn.conn_info = mock.sentinel.conn_info
        self.conn.conn_info = {}
        self.assertIsNone(self.conn._ovo.connection_info)
    def test_conn_info_getter(self):
        self.conn.conn_info = mock.sentinel.conn_info
        self.assertEqual(mock.sentinel.conn_info, self.conn.conn_info)
    def test_conn_info_getter_none(self):
        self.conn.conn_info = None
        self.assertEqual({}, self.conn.conn_info)
    def test_protocol(self):
        self.conn.conn_info = {'driver_volume_type': mock.sentinel.iscsi}
        self.assertEqual(mock.sentinel.iscsi, self.conn.protocol)
    def test_connector_info_setter(self):
        self.conn.connector_info = mock.sentinel.connector
        self.assertEqual(mock.sentinel.connector,
                         self.conn._ovo.connection_info['connector'])
        # The OVO must notice the nested change for persistence to save it.
        self.assertIn('connection_info', self.conn._ovo._changed_fields)
    def test_connector_info_getter(self):
        self.conn.connector_info = mock.sentinel.connector
        self.assertEqual(mock.sentinel.connector, self.conn.connector_info)
    def test_connector_info_getter_empty(self):
        self.conn._ovo.connection_info = None
        self.assertIsNone(self.conn.connector_info)
    def test_device_setter(self):
        self.conn.device = mock.sentinel.device
        self.assertEqual(mock.sentinel.device,
                         self.conn._ovo.connection_info['device'])
        self.assertIn('connection_info', self.conn._ovo._changed_fields)
    def test_device_setter_none(self):
        # Setting None removes the key instead of storing a null device.
        self.conn.device = mock.sentinel.device
        self.conn.device = None
        self.assertNotIn('device', self.conn._ovo.connection_info)
        self.assertIn('connection_info', self.conn._ovo._changed_fields)
    def test_device_getter(self):
        self.conn.device = mock.sentinel.device
        self.assertEqual(mock.sentinel.device, self.conn.device)
    def test_path(self):
        self.conn.device = {'path': mock.sentinel.path}
        self.assertEqual(mock.sentinel.path, self.conn.path)
    @mock.patch('cinderlib.objects.Connection.conn_info')
    @mock.patch('cinderlib.objects.Connection.protocol')
    @mock.patch('os_brick.initiator.connector.InitiatorConnector.factory')
    def test_connector_getter(self, mock_connector, mock_proto, mock_info):
        res = self.conn.connector
        self.assertEqual(mock_connector.return_value, res)
        mock_connector.assert_called_once_with(
            mock_proto,
            self.backend.root_helper,
            use_multipath=self.mock_is_mp.return_value,
            device_scan_attempts=self.mock_default,
            conn=mock_info,
            do_local_attach=True)
        # Make sure we cache the value
        res = self.conn.connector
        self.assertEqual(1, mock_connector.call_count)
    @ddt.data(True, False)
    def test_attached_true(self, value):
        with mock.patch('cinderlib.objects.Connection.device', value):
            self.assertEqual(value, self.conn.attached)
    @ddt.data(True, False)
    def test_connected(self, value):
        with mock.patch('cinderlib.objects.Connection.conn_info', value):
            self.assertEqual(value, self.conn.connected)

View File

@ -0,0 +1,146 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinderlib import exception
from cinderlib import objects
from cinderlib.tests.unit import base
class TestSnapshot(base.BaseTest):
    """Unit tests for cinderlib Snapshot objects.

    Runs against the mocked driver and persistence provided by
    base.BaseTest, with a volume/snapshot pair built in setUp.
    """
    def setUp(self):
        super(TestSnapshot, self).setUp()
        self.vol = objects.Volume(self.backend_name, size=10,
                                  extra_specs={'e': 'v'},
                                  qos_specs={'q': 'qv'})
        self.snap = objects.Snapshot(self.vol,
                                     name='my_snap', description='my_desc')
        # Register the snapshot on both the wrapper's and the OVO's
        # in-memory snapshot lists so delete tests can verify removal.
        self.vol._snapshots.append(self.snap)
        self.vol._ovo.snapshots.objects.append(self.snap._ovo)
    def test_init_from_volume(self):
        self.assertIsNotNone(self.snap.id)
        self.assertEqual(self.backend, self.snap.backend)
        self.assertEqual('my_snap', self.snap.name)
        self.assertEqual('my_snap', self.snap.display_name)
        self.assertEqual('my_desc', self.snap.description)
        self.assertEqual(self.vol.user_id, self.snap.user_id)
        self.assertEqual(self.vol.project_id, self.snap.project_id)
        self.assertEqual(self.vol.id, self.snap.volume_id)
        self.assertEqual(self.vol.size, self.snap.volume_size)
        self.assertEqual(self.vol._ovo, self.snap._ovo.volume)
        self.assertEqual(self.vol.volume_type_id, self.snap.volume_type_id)
        self.assertEqual(self.vol, self.snap.volume)
    def test_init_from_ovo(self):
        snap2 = objects.Snapshot(None, __ovo=self.snap._ovo)
        self.assertEqual(self.snap.backend, snap2.backend)
        self.assertEqual(self.snap._ovo, snap2._ovo)
        self.assertEqual(self.vol, self.snap.volume)
    def test_create(self):
        # The driver's model update must be merged into the snapshot.
        update_vol = {'provider_id': 'provider_id'}
        self.backend.driver.create_snapshot.return_value = update_vol
        self.snap.create()
        self.assertEqual('available', self.snap.status)
        self.assertEqual('provider_id', self.snap.provider_id)
        self.backend.driver.create_snapshot.assert_called_once_with(
            self.snap._ovo)
        self.persistence.set_snapshot.assert_called_once_with(self.snap)
    def test_create_error(self):
        # Driver failure: status becomes error and the snapshot is still
        # persisted, with the resource attached to the exception.
        self.backend.driver.create_snapshot.side_effect = exception.NotFound
        with self.assertRaises(exception.NotFound) as assert_context:
            self.snap.create()
        self.assertEqual(self.snap, assert_context.exception.resource)
        self.backend.driver.create_snapshot.assert_called_once_with(
            self.snap._ovo)
        self.assertEqual('error', self.snap.status)
        self.persistence.set_snapshot.assert_called_once_with(self.snap)
    def test_delete(self):
        self.snap.delete()
        self.backend.driver.delete_snapshot.assert_called_once_with(
            self.snap._ovo)
        self.persistence.delete_snapshot.assert_called_once_with(self.snap)
        self.assertEqual([], self.vol.snapshots)
        self.assertEqual([], self.vol._ovo.snapshots.objects)
        self.assertEqual('deleted', self.snap.status)
    def test_delete_error(self):
        # Driver failure on delete: snapshot stays on the volume's lists.
        self.backend.driver.delete_snapshot.side_effect = exception.NotFound
        with self.assertRaises(exception.NotFound) as assert_context:
            self.snap.delete()
        self.assertEqual(self.snap, assert_context.exception.resource)
        self.backend.driver.delete_snapshot.assert_called_once_with(
            self.snap._ovo)
        self.persistence.delete_snapshot.assert_not_called()
        self.assertEqual([self.snap], self.vol.snapshots)
        self.assertEqual([self.snap._ovo], self.vol._ovo.snapshots.objects)
        self.assertEqual('error_deleting', self.snap.status)
    def test_create_volume(self):
        create_mock = self.backend.driver.create_volume_from_snapshot
        create_mock.return_value = None
        vol2 = self.snap.create_volume(name='new_name', description='new_desc')
        create_mock.assert_called_once_with(vol2._ovo, self.snap._ovo)
        self.assertEqual('available', vol2.status)
        self.assertEqual(1, len(self.backend._volumes))
        self.assertEqual(vol2, self.backend._volumes[0])
        self.persistence.set_volume.assert_called_once_with(vol2)
        # Each volume gets its own type (id == volume_type_id) but the new
        # volume's specs must match the source volume's.
        self.assertEqual(self.vol.id, self.vol.volume_type_id)
        self.assertNotEqual(self.vol.id, vol2.id)
        self.assertEqual(vol2.id, vol2.volume_type_id)
        self.assertEqual(self.vol.volume_type.extra_specs,
                         vol2.volume_type.extra_specs)
        self.assertEqual(self.vol.volume_type.qos_specs.specs,
                         vol2.volume_type.qos_specs.specs)
    def test_create_volume_error(self):
        # Failed creation leaves the new volume in the in-flight registry
        # with error status, and it is still persisted.
        create_mock = self.backend.driver.create_volume_from_snapshot
        create_mock.side_effect = exception.NotFound
        with self.assertRaises(exception.NotFound) as assert_context:
            self.snap.create_volume()
        self.assertEqual(1, len(self.backend._volumes_inflight))
        vol2 = list(self.backend._volumes_inflight.values())[0]
        self.assertEqual(vol2, assert_context.exception.resource)
        create_mock.assert_called_once_with(vol2, self.snap._ovo)
        self.assertEqual('error', vol2.status)
        self.persistence.set_volume.assert_called_once_with(mock.ANY)
    def test_get_by_id(self):
        mock_get_snaps = self.persistence.get_snapshots
        mock_get_snaps.return_value = [mock.sentinel.snap]
        res = objects.Snapshot.get_by_id(mock.sentinel.snap_id)
        mock_get_snaps.assert_called_once_with(
            snapshot_id=mock.sentinel.snap_id)
        self.assertEqual(mock.sentinel.snap, res)
    def test_get_by_id_not_found(self):
        mock_get_snaps = self.persistence.get_snapshots
        mock_get_snaps.return_value = None
        self.assertRaises(exception.SnapshotNotFound,
                          objects.Snapshot.get_by_id, mock.sentinel.snap_id)
        mock_get_snaps.assert_called_once_with(
            snapshot_id=mock.sentinel.snap_id)
    def test_get_by_name(self):
        res = objects.Snapshot.get_by_name(mock.sentinel.name)
        mock_get_snaps = self.persistence.get_snapshots
        mock_get_snaps.assert_called_once_with(
            snapshot_name=mock.sentinel.name)
        self.assertEqual(mock_get_snaps.return_value, res)

View File

@ -0,0 +1,428 @@
# Copyright (c) 2018, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinderlib import exception
from cinderlib import objects
from cinderlib.tests.unit import base
class TestVolume(base.BaseTest):
    """Unit tests for cinderlib's Volume wrapper object.

    The backend driver and the persistence plugin are mocks (provided by
    base.BaseTest), so these tests only verify that Volume methods invoke
    the driver/persistence with the expected arguments and keep the
    wrapper and its OVO in sync.

    NOTE(review): several mock checks in the original were no-ops
    (``m.called_once_with(...)`` / ``m.not_called()`` just create child
    mocks and always "pass"); they have been replaced with the real
    ``assert_called_once_with`` / ``assert_not_called`` methods, and a
    ``asssert_called_once_with`` typo was fixed in test_cleanup.
    """

    def test_init_from_args_backend_name(self):
        vol = objects.Volume(self.backend_name,
                             name='vol_name', description='vol_desc', size=10)
        self.assertEqual(self.backend, vol.backend)
        self.assertEqual('vol_name', vol.name)
        self.assertEqual('vol_name', vol.display_name)
        self.assertEqual('vol_desc', vol.description)
        self.assertEqual(10, vol.size)
        self.assertIsNotNone(vol.id)

    def test_init_from_args_backend(self):
        vol = objects.Volume(self.backend,
                             name='vol_name', description='vol_desc', size=10)
        self.assertEqual(self.backend, vol.backend)
        self.assertEqual('vol_name', vol.name)
        self.assertEqual('vol_name', vol.display_name)
        self.assertEqual('vol_desc', vol.description)
        self.assertEqual(10, vol.size)
        self.assertIsNotNone(vol.id)

    def test_init_from_volume(self):
        # Creating a Volume from another Volume copies fields not overridden
        # by kwargs and always generates a new id.
        vol = objects.Volume(self.backend,
                             name='vol_name', description='vol_desc', size=10)
        vol2 = objects.Volume(vol, name='new_name', size=11)
        self.assertEqual(self.backend, vol2.backend)
        self.assertEqual('new_name', vol2.name)
        self.assertEqual('new_name', vol2.display_name)
        self.assertEqual(vol.description, vol2.description)
        self.assertEqual(11, vol2.size)
        self.assertIsNotNone(vol2.id)
        self.assertNotEqual(vol.id, vol2.id)

    def test_init_from_ovo(self):
        vol = objects.Volume(self.backend, size=10)
        vol2 = objects.Volume(self.backend, __ovo=vol._ovo)
        self.assertEqual(vol._ovo, vol2._ovo)

    def test_snapshots_lazy_loading(self):
        vol = objects.Volume(self.backend, size=10)
        vol._snapshots = None
        snaps = [objects.Snapshot(vol, name='my_snap')]
        # Persistence retrieves Snapshots without the Volume, just volume_id
        snaps[0]._ovo.volume = None
        mock_get_snaps = self.persistence.get_snapshots
        mock_get_snaps.return_value = snaps

        result = vol.snapshots
        # Fixed: was the no-op ``mock_get_snaps.called_once_with(vol.id)``
        mock_get_snaps.assert_called_once_with(volume_id=vol.id)
        self.assertEqual(snaps, result)
        self.assertEqual(snaps, vol._snapshots)
        self.assertEqual(1, len(vol._ovo.snapshots))
        self.assertEqual(vol._ovo.snapshots[0], result[0]._ovo)

        # There is no second call when we reference it again
        mock_get_snaps.reset_mock()
        result = vol.snapshots
        self.assertEqual(snaps, result)
        # Fixed: was the no-op ``mock_get_snaps.not_called()``
        mock_get_snaps.assert_not_called()

    def test_connections_lazy_loading(self):
        vol = objects.Volume(self.backend, size=10)
        vol._connections = None
        conns = [objects.Connection(self.backend, connector={'k': 'v'},
                                    volume_id=vol.id, status='attached',
                                    attach_mode='rw',
                                    connection_info={'conn': {}},
                                    name='my_snap')]
        mock_get_conns = self.persistence.get_connections
        mock_get_conns.return_value = conns

        result = vol.connections
        # Fixed: was the no-op ``mock_get_conns.called_once_with(...)``
        mock_get_conns.assert_called_once_with(volume_id=vol.id)
        self.assertEqual(conns, result)
        self.assertEqual(conns, vol._connections)
        self.assertEqual(1, len(vol._ovo.volume_attachment))
        self.assertEqual(vol._ovo.volume_attachment[0], result[0]._ovo)

        # There is no second call when we reference it again
        mock_get_conns.reset_mock()
        result = vol.connections
        self.assertEqual(conns, result)
        # Fixed: was the no-op ``mock_get_conns.not_called()``
        mock_get_conns.assert_not_called()

    def test_get_by_id(self):
        mock_get_vols = self.persistence.get_volumes
        mock_get_vols.return_value = [mock.sentinel.vol]
        res = objects.Volume.get_by_id(mock.sentinel.vol_id)
        mock_get_vols.assert_called_once_with(volume_id=mock.sentinel.vol_id)
        self.assertEqual(mock.sentinel.vol, res)

    def test_get_by_id_not_found(self):
        mock_get_vols = self.persistence.get_volumes
        mock_get_vols.return_value = None
        self.assertRaises(exception.VolumeNotFound,
                          objects.Volume.get_by_id, mock.sentinel.vol_id)
        mock_get_vols.assert_called_once_with(volume_id=mock.sentinel.vol_id)

    def test_get_by_name(self):
        res = objects.Volume.get_by_name(mock.sentinel.name)
        mock_get_vols = self.persistence.get_volumes
        mock_get_vols.assert_called_once_with(volume_name=mock.sentinel.name)
        self.assertEqual(mock_get_vols.return_value, res)

    def test_create(self):
        self.backend.driver.create_volume.return_value = None
        vol = self.backend.create_volume(10, name='vol_name',
                                         description='des')
        self.backend.driver.create_volume.assert_called_once_with(vol._ovo)
        self.assertEqual('available', vol.status)
        self.persistence.set_volume.assert_called_once_with(vol)

    def test_create_error(self):
        # On driver failure the raised exception carries the failed Volume
        # as its ``resource`` attribute.
        self.backend.driver.create_volume.side_effect = exception.NotFound
        with self.assertRaises(exception.NotFound) as assert_context:
            self.backend.create_volume(10, name='vol_name', description='des')

        vol = assert_context.exception.resource
        self.assertIsInstance(vol, objects.Volume)
        self.assertEqual(10, vol.size)
        self.assertEqual('vol_name', vol.name)
        self.assertEqual('des', vol.description)

    def test_delete(self):
        vol = objects.Volume(self.backend_name, size=10)
        vol.delete()
        self.backend.driver.delete_volume.assert_called_once_with(vol._ovo)
        self.persistence.delete_volume.assert_called_once_with(vol)
        self.assertEqual('deleted', vol.status)

    def test_delete_error_with_snaps(self):
        vol = objects.Volume(self.backend_name, size=10, status='available')
        snap = objects.Snapshot(vol)
        vol._snapshots.append(snap)
        self.assertRaises(exception.InvalidVolume, vol.delete)
        self.assertEqual('available', vol.status)

    def test_delete_error(self):
        vol = objects.Volume(self.backend_name,
                             name='vol_name', description='vol_desc', size=10)
        self.backend.driver.delete_volume.side_effect = exception.NotFound
        with self.assertRaises(exception.NotFound) as assert_context:
            vol.delete()
        self.assertEqual(vol, assert_context.exception.resource)
        self.backend.driver.delete_volume.assert_called_once_with(vol._ovo)
        self.assertEqual('error_deleting', vol.status)

    def test_extend(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        vol.extend(11)
        self.backend.driver.extend_volume.assert_called_once_with(vol._ovo, 11)
        self.persistence.set_volume.assert_called_once_with(vol)
        self.assertEqual('available', vol.status)
        self.assertEqual(11, vol.size)

    def test_extend_error(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        self.backend.driver.extend_volume.side_effect = exception.NotFound
        with self.assertRaises(exception.NotFound) as assert_context:
            vol.extend(11)
        self.assertEqual(vol, assert_context.exception.resource)
        self.backend.driver.extend_volume.assert_called_once_with(vol._ovo, 11)
        self.persistence.set_volume.assert_called_once_with(vol)
        self.assertEqual('error', vol.status)
        self.assertEqual(10, vol.size)

    def test_clone(self):
        vol = objects.Volume(self.backend_name, status='available', size=10,
                             extra_specs={'e': 'v'}, qos_specs={'q': 'qv'})
        mock_clone = self.backend.driver.create_cloned_volume
        mock_clone.return_value = None

        res = vol.clone(size=11)

        mock_clone.assert_called_once_with(res._ovo, vol._ovo)
        self.persistence.set_volume.assert_called_once_with(res)
        self.assertEqual('available', res.status)
        self.assertEqual(11, res.size)
        # Each volume carries its own volume type (id matches the volume id)
        self.assertEqual(vol.id, vol.volume_type_id)
        self.assertNotEqual(vol.id, res.id)
        self.assertEqual(res.id, res.volume_type_id)
        self.assertEqual(vol.volume_type.extra_specs,
                         res.volume_type.extra_specs)
        self.assertEqual(vol.volume_type.qos_specs.specs,
                         res.volume_type.qos_specs.specs)

    def test_clone_error(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_clone = self.backend.driver.create_cloned_volume
        mock_clone.side_effect = exception.NotFound

        with self.assertRaises(exception.NotFound) as assert_context:
            vol.clone(size=11)

        # Cloning volume is still in flight
        self.assertEqual(1, len(self.backend._volumes_inflight))
        new_vol = list(self.backend._volumes_inflight.values())[0]
        self.assertEqual(new_vol, assert_context.exception.resource)
        mock_clone.assert_called_once_with(new_vol, vol._ovo)
        self.persistence.set_volume.assert_called_once_with(new_vol)
        self.assertEqual('error', new_vol.status)
        self.assertEqual(11, new_vol.size)

    def test_create_snapshot(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_create = self.backend.driver.create_snapshot
        mock_create.return_value = None

        snap = vol.create_snapshot()

        self.assertEqual([snap], vol.snapshots)
        self.assertEqual([snap._ovo], vol._ovo.snapshots.objects)
        mock_create.assert_called_once_with(snap._ovo)
        self.assertEqual('available', snap.status)
        self.assertEqual(10, snap.volume_size)
        self.persistence.set_snapshot.assert_called_once_with(snap)

    def test_create_snapshot_error(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_create = self.backend.driver.create_snapshot
        mock_create.side_effect = exception.NotFound

        self.assertRaises(exception.NotFound, vol.create_snapshot)

        self.assertEqual(1, len(vol.snapshots))
        snap = vol.snapshots[0]
        self.persistence.set_snapshot.assert_called_once_with(snap)
        self.assertEqual('error', snap.status)
        mock_create.assert_called_once_with(snap._ovo)

    @mock.patch('os_brick.initiator.connector.get_connector_properties')
    @mock.patch('cinderlib.objects.Volume.connect')
    def test_attach(self, mock_connect, mock_conn_props):
        vol = objects.Volume(self.backend_name, status='available', size=10)

        res = vol.attach()

        mock_conn_props.assert_called_once_with(
            self.backend.root_helper,
            mock.ANY,
            self.backend.configuration.use_multipath_for_image_xfer,
            self.backend.configuration.enforce_multipath_for_image_xfer)
        mock_connect.assert_called_once_with(mock_conn_props.return_value)
        mock_connect.return_value.attach.assert_called_once_with()
        self.assertEqual(mock_connect.return_value, res)

    @mock.patch('os_brick.initiator.connector.get_connector_properties')
    @mock.patch('cinderlib.objects.Volume.connect')
    def test_attach_error_connect(self, mock_connect, mock_conn_props):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_connect.side_effect = exception.NotFound

        self.assertRaises(exception.NotFound, vol.attach)

        mock_conn_props.assert_called_once_with(
            self.backend.root_helper,
            mock.ANY,
            self.backend.configuration.use_multipath_for_image_xfer,
            self.backend.configuration.enforce_multipath_for_image_xfer)
        mock_connect.assert_called_once_with(mock_conn_props.return_value)
        mock_connect.return_value.attach.assert_not_called()

    @mock.patch('cinderlib.objects.Volume.disconnect')
    @mock.patch('os_brick.initiator.connector.get_connector_properties')
    @mock.patch('cinderlib.objects.Volume.connect')
    def test_attach_error_attach(self, mock_connect, mock_conn_props,
                                 mock_disconnect):
        # If attaching fails after connecting we must roll back the connection
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_attach = mock_connect.return_value.attach
        mock_attach.side_effect = exception.NotFound

        self.assertRaises(exception.NotFound, vol.attach)

        mock_conn_props.assert_called_once_with(
            self.backend.root_helper,
            mock.ANY,
            self.backend.configuration.use_multipath_for_image_xfer,
            self.backend.configuration.enforce_multipath_for_image_xfer)
        mock_connect.assert_called_once_with(mock_conn_props.return_value)
        mock_disconnect.assert_called_once_with(mock_connect.return_value)

    def test_detach_not_local(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        self.assertRaises(exception.NotLocal, vol.detach)

    def test_detach(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_conn = mock.Mock()
        vol.local_attach = mock_conn

        vol.detach(mock.sentinel.force, mock.sentinel.ignore_errors)

        mock_conn.detach.assert_called_once_with(mock.sentinel.force,
                                                 mock.sentinel.ignore_errors,
                                                 mock.ANY)
        mock_conn.disconnect.assert_called_once_with(mock.sentinel.force)

    def test_detach_error_detach(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_conn = mock.Mock()
        mock_conn.detach.side_effect = exception.NotFound
        vol.local_attach = mock_conn

        self.assertRaises(exception.NotFound,
                          vol.detach,
                          False, mock.sentinel.ignore_errors)

        mock_conn.detach.assert_called_once_with(False,
                                                 mock.sentinel.ignore_errors,
                                                 mock.ANY)
        mock_conn.disconnect.assert_not_called()

    def test_detach_error_disconnect(self):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_conn = mock.Mock()
        mock_conn.disconnect.side_effect = exception.NotFound
        vol.local_attach = mock_conn

        self.assertRaises(objects.brick_exception.ExceptionChainer,
                          vol.detach,
                          mock.sentinel.force, False)

        mock_conn.detach.assert_called_once_with(mock.sentinel.force,
                                                 False,
                                                 mock.ANY)
        mock_conn.disconnect.assert_called_once_with(mock.sentinel.force)

    @mock.patch('cinderlib.objects.Connection.connect')
    def test_connect(self, mock_connect):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_connect.return_value._ovo = objects.cinder_objs.VolumeAttachment()
        mock_export = self.backend.driver.create_export
        mock_export.return_value = None

        res = vol.connect(mock.sentinel.conn_dict)

        mock_connect.assert_called_once_with(vol, mock.sentinel.conn_dict)
        self.assertEqual([res], vol.connections)
        self.assertEqual([res._ovo], vol._ovo.volume_attachment.objects)
        self.assertEqual('in-use', vol.status)
        self.persistence.set_volume.assert_called_once_with(vol)

    @mock.patch('cinderlib.objects.Volume._remove_export')
    @mock.patch('cinderlib.objects.Connection.connect')
    def test_connect_error(self, mock_connect, mock_remove_export):
        # A failed connect must remove the export and leave the volume as-is
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_export = self.backend.driver.create_export
        mock_export.return_value = None
        mock_connect.side_effect = exception.NotFound

        self.assertRaises(exception.NotFound,
                          vol.connect, mock.sentinel.conn_dict)

        mock_connect.assert_called_once_with(vol, mock.sentinel.conn_dict)
        self.assertEqual('available', vol.status)
        self.persistence.set_volume.assert_not_called()
        mock_remove_export.assert_called_once_with()

    @mock.patch('cinderlib.objects.Volume._disconnect')
    def test_disconnect(self, mock_disconnect):
        vol = objects.Volume(self.backend_name, status='available', size=10)
        mock_conn = mock.Mock()

        vol.disconnect(mock_conn, mock.sentinel.force)

        mock_conn._disconnect.assert_called_once_with(mock.sentinel.force)
        mock_disconnect.assert_called_once_with(mock_conn)

    @mock.patch('cinderlib.objects.Volume._remove_export')
    def test__disconnect(self, mock_remove_export):
        vol = objects.Volume(self.backend_name, status='in-use', size=10)

        vol._disconnect(mock.sentinel.connection)

        mock_remove_export.assert_called_once_with()
        self.assertEqual('available', vol.status)
        self.persistence.set_volume.assert_called_once_with(vol)

    def test__remove_export(self):
        vol = objects.Volume(self.backend_name, status='in-use', size=10)
        vol._remove_export()
        self.backend.driver.remove_export.assert_called_once_with(vol._context,
                                                                  vol._ovo)

    @mock.patch('cinderlib.objects.Volume._remove_export')
    def test_cleanup(self, mock_remove_export):
        vol = objects.Volume(self.backend_name, status='in-use', size=10)
        connections = [mock.Mock(), mock.Mock()]
        vol._connections = connections

        vol.cleanup()

        mock_remove_export.assert_called_once_with()
        for c in connections:
            # Fixed: original had ``asssert_called_once_with`` (typo), which
            # silently created a child mock instead of asserting.
            c.detach.assert_called_once_with()

View File

@ -13,45 +13,39 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest2
from cinder.cmd import volume as volume_cmd
from cinder.db.sqlalchemy import api
from cinder.db.sqlalchemy import models
from oslo_versionedobjects import fields
import cinderlib
from tests.unit import utils
from cinderlib.tests.unit import base
from cinderlib.tests.unit import utils
class BasePersistenceTest(unittest2.TestCase):
class BasePersistenceTest(base.BaseTest):
@classmethod
def setUpClass(cls):
cls.original_impl = volume_cmd.session.IMPL
# We check the entrypoint is working
cinderlib.Backend.global_initialization = False
cinderlib.setup(persistence_config=cls.PERSISTENCE_CFG)
cls.persistence = cinderlib.Backend.persistence
cls.context = cinderlib.objects.CONTEXT
@classmethod
def tearDownClass(cls):
volume_cmd.session.IMPL = cls.original_impl
cinderlib.Backend.global_initialization = False
api.main_context_manager = api.enginefacade.transaction_context()
def setUp(self):
self.backend = utils.FakeBackend()
def tearDown(self):
# Clear all existing backends
cinderlib.Backend.backends = {}
super(BasePersistenceTest, self).tearDown()
super(BasePersistenceTest, self).setUp()
self.context = cinderlib.objects.CONTEXT
def sorted(self, resources, key='id'):
return sorted(resources, key=lambda x: getattr(x, key))
def create_n_volumes(self, n):
return self.create_volumes([{'size': i, 'name': 'disk%s' % i}
for i in range(1, n+1)])
for i in range(1, n + 1)])
def create_volumes(self, data, sort=True):
vols = []

View File

@ -21,7 +21,7 @@ from oslo_db import api as oslo_db_api
import cinderlib
from cinderlib.persistence import dbms
from tests.unit.persistence import base
from cinderlib.tests.unit.persistence import base
class TestDBPersistence(base.BasePersistenceTest):
@ -105,6 +105,5 @@ class TestDBPersistence(base.BasePersistenceTest):
self.assertListEqualObj(expected, actual)
# TODO: Figure out why we can't run both DB persistence test classes
# class TestMemoryDBPersistence(TestDBPersistence):
# PERSISTENCE_CFG = {'storage': 'memory_db'}
class TestMemoryDBPersistence(TestDBPersistence):
    """Run the DBMS persistence test suite against an in-memory database.

    Reuses every test from TestDBPersistence, only swapping the storage
    backend for the 'memory_db' persistence plugin.
    """
    PERSISTENCE_CFG = {'storage': 'memory_db'}

View File

@ -14,7 +14,7 @@
# under the License.
import cinderlib
from tests.unit.persistence import base
from cinderlib.tests.unit.persistence import base
class TestMemoryPersistence(base.BasePersistenceTest):

View File

@ -0,0 +1,241 @@
# Copyright (c) 2017, Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
import cinderlib
from cinderlib import objects
from cinderlib.tests.unit import base
class TestCinderlib(base.BaseTest):
    """Unit tests for the cinderlib Backend class and package wiring.

    The driver and persistence layers are mocked (via base.BaseTest), so
    these tests check argument passing and global/class state handling.
    """

    def test_lib_assignations(self):
        # The package-level aliases must point at the Backend class members.
        self.assertEqual(cinderlib.setup, cinderlib.Backend.global_setup)
        self.assertEqual(cinderlib.Backend, cinderlib.objects.Backend)
        self.assertEqual(cinderlib.Backend,
                         cinderlib.objects.Object.backend_class)

    @mock.patch('oslo_utils.importutils.import_object')
    @mock.patch('cinderlib.Backend._set_backend_config')
    @mock.patch('cinderlib.Backend.global_setup')
    def test_init(self, mock_global_setup, mock_config, mock_import):
        """Backend.__init__ must set up the driver and register the backend."""
        cfg.CONF.set_override('host', 'host')
        driver_cfg = {'k': 'v', 'k2': 'v2', 'volume_backend_name': 'Test'}
        # Force __init__ to trigger the lazy global initialization.
        cinderlib.Backend.global_initialization = False
        driver = mock_import.return_value
        driver.get_volume_stats.return_value = {
            'pools': [{'pool_name': 'default'}]}

        backend = objects.Backend(**driver_cfg)

        mock_global_setup.assert_called_once_with()
        self.assertIn('Test', objects.Backend.backends)
        self.assertEqual(backend, objects.Backend.backends['Test'])
        mock_config.assert_called_once_with(driver_cfg)
        conf = mock_config.return_value
        mock_import.assert_called_once_with(conf.volume_driver,
                                            configuration=conf,
                                            db=self.persistence.db,
                                            host='host@Test',
                                            cluster_name=None,
                                            active_backend_id=None)
        self.assertEqual(backend.driver, driver)
        # Standard cinder driver initialization sequence.
        driver.do_setup.assert_called_once_with(objects.CONTEXT)
        driver.check_for_setup_error.assert_called_once_with()
        driver.init_capabilities.assert_called_once_with()
        driver.set_throttle.assert_called_once_with()
        driver.set_initialized.assert_called_once_with()
        self.assertEqual(driver_cfg, backend._driver_cfg)
        # Volumes are lazy loaded, so nothing is fetched on init.
        self.assertIsNone(backend._volumes)
        driver.get_volume_stats.assert_called_once_with(refresh=False)
        self.assertEqual(('default',), backend.pool_names)

    @mock.patch('urllib3.disable_warnings')
    @mock.patch('cinder.coordination.COORDINATOR')
    @mock.patch('cinderlib.Backend._set_priv_helper')
    @mock.patch('cinderlib.Backend._set_logging')
    @mock.patch('cinderlib.cinderlib.serialization')
    @mock.patch('cinderlib.Backend.set_persistence')
    def test_global_setup(self, mock_set_pers, mock_serial, mock_log,
                          mock_sudo, mock_coord, mock_disable_warn):
        """global_setup must propagate every argument to the class/config."""
        cls = objects.Backend
        cls.global_initialization = False
        cinder_cfg = {'k': 'v', 'k2': 'v2'}

        cls.global_setup('file_locks',
                         mock.sentinel.root_helper,
                         mock.sentinel.ssl_warnings,
                         mock.sentinel.disable_logs,
                         mock.sentinel.non_uuid_ids,
                         mock.sentinel.backend_info,
                         mock.sentinel.project_id,
                         mock.sentinel.user_id,
                         mock.sentinel.pers_cfg,
                         mock.sentinel.fail_missing_backend,
                         'mock.sentinel.host',
                         **cinder_cfg)

        self.assertEqual('file_locks', cfg.CONF.oslo_concurrency.lock_path)
        self.assertEqual('file://file_locks',
                         cfg.CONF.coordination.backend_url)
        self.assertEqual(mock.sentinel.fail_missing_backend,
                         cls.fail_on_missing_backend)
        self.assertEqual(mock.sentinel.root_helper, cls.root_helper)
        self.assertEqual(mock.sentinel.project_id, cls.project_id)
        self.assertEqual(mock.sentinel.user_id, cls.user_id)
        self.assertEqual(mock.sentinel.non_uuid_ids, cls.non_uuid_ids)
        self.assertEqual('mock.sentinel.host', cfg.CONF.host)
        mock_set_pers.assert_called_once_with(mock.sentinel.pers_cfg)
        self.assertEqual(cinderlib.__version__, cfg.CONF.version)
        mock_serial.setup.assert_called_once_with(cls)
        mock_log.assert_called_once_with(mock.sentinel.disable_logs)
        mock_sudo.assert_called_once_with(mock.sentinel.root_helper)
        mock_coord.start.assert_called_once_with()
        # Warnings disabled once for urllib3 itself and once for requests'
        # vendored copy.
        self.assertEqual(2, mock_disable_warn.call_count)
        self.assertTrue(cls.global_initialization)
        self.assertEqual(mock.sentinel.backend_info,
                         cls.output_all_backend_info)

    def test_pool_names(self):
        pool_names = [mock.sentinel._pool_names]
        self.backend._pool_names = pool_names
        self.assertEqual(pool_names, self.backend.pool_names)

    def test_volumes(self):
        # Volumes are lazy loaded from persistence and then cached.
        self.backend._volumes = None
        res = self.backend.volumes
        self.assertEqual(self.persistence.get_volumes.return_value, res)
        self.assertEqual(self.persistence.get_volumes.return_value,
                         self.backend._volumes)
        self.persistence.get_volumes.assert_called_once_with(
            backend_name=self.backend.id)

    def test_id(self):
        # The backend id is its configured volume_backend_name.
        self.assertEqual(self.backend._driver_cfg['volume_backend_name'],
                         self.backend.id)

    def test_volumes_filtered(self):
        # Filtered listing must not populate the volumes cache.
        res = self.backend.volumes_filtered(mock.sentinel.vol_id,
                                            mock.sentinel.vol_name)
        self.assertEqual(self.persistence.get_volumes.return_value, res)
        self.assertEqual([], self.backend._volumes)
        self.persistence.get_volumes.assert_called_once_with(
            backend_name=self.backend.id,
            volume_id=mock.sentinel.vol_id,
            volume_name=mock.sentinel.vol_name)

    def test_stats(self):
        expect = {'pools': [mock.sentinel.data]}
        with mock.patch.object(self.backend.driver, 'get_volume_stats',
                               return_value=expect) as mock_stat:
            res = self.backend.stats(mock.sentinel.refresh)
            self.assertEqual(expect, res)
            mock_stat.assert_called_once_with(refresh=mock.sentinel.refresh)

    def test_stats_single(self):
        # Drivers without pools get their stats wrapped in a single pool.
        stat_value = {'driver_version': 'v1', 'key': 'value'}
        expect = {'driver_version': 'v1', 'key': 'value',
                  'pools': [{'key': 'value', 'pool_name': 'fake_backend'}]}
        with mock.patch.object(self.backend.driver, 'get_volume_stats',
                               return_value=stat_value) as mock_stat:
            res = self.backend.stats(mock.sentinel.refresh)
            self.assertEqual(expect, res)
            mock_stat.assert_called_once_with(refresh=mock.sentinel.refresh)

    @mock.patch('cinderlib.objects.Volume')
    def test_create_volume(self, mock_vol):
        kwargs = {'k': 'v', 'k2': 'v2'}
        res = self.backend.create_volume(mock.sentinel.size,
                                         mock.sentinel.name,
                                         mock.sentinel.desc,
                                         mock.sentinel.boot,
                                         **kwargs)
        self.assertEqual(mock_vol.return_value, res)
        mock_vol.assert_called_once_with(self.backend, size=mock.sentinel.size,
                                         name=mock.sentinel.name,
                                         description=mock.sentinel.desc,
                                         bootable=mock.sentinel.boot,
                                         **kwargs)
        mock_vol.return_value.create.assert_called_once_with()

    def test__volume_removed_no_list(self):
        # Must be a no-op when the volumes cache hasn't been loaded.
        self.backend._volume_removed(mock.sentinel.volume)

    def test__volume_removed(self):
        vol = cinderlib.objects.Volume(self.backend, size=10)
        # A different wrapper instance with the same id must still match.
        vol2 = cinderlib.objects.Volume(self.backend, id=vol.id, size=10)
        self.backend._volumes.append(vol)
        self.backend._volume_removed(vol2)
        self.assertEqual([], self.backend.volumes)

    def test__volume_created(self):
        vol = cinderlib.objects.Volume(self.backend, size=10)
        self.backend._volume_created(vol)
        self.assertEqual([vol], self.backend.volumes)

    def test__volume_created_is_none(self):
        vol = cinderlib.objects.Volume(self.backend, size=10)
        self.backend._volume_created(vol)
        self.assertEqual([vol], self.backend.volumes)

    def test_validate_connector(self):
        self.backend.validate_connector(mock.sentinel.connector)
        self.backend.driver.validate_connector.assert_called_once_with(
            mock.sentinel.connector)

    @mock.patch('cinderlib.objects.setup')
    @mock.patch('cinderlib.persistence.setup')
    def test_set_persistence(self, mock_pers_setup, mock_obj_setup):
        """set_persistence must rewire persistence for Backend and objects."""
        cinderlib.Backend.global_initialization = True

        cinderlib.Backend.set_persistence(mock.sentinel.pers_cfg)

        mock_pers_setup.assert_called_once_with(mock.sentinel.pers_cfg)
        self.assertEqual(mock_pers_setup.return_value,
                         cinderlib.Backend.persistence)
        mock_obj_setup.assert_called_once_with(mock_pers_setup.return_value,
                                               cinderlib.Backend,
                                               self.backend.project_id,
                                               self.backend.user_id,
                                               self.backend.non_uuid_ids)
        self.assertEqual(mock_pers_setup.return_value.db,
                         self.backend.driver.db)

    def test_config(self):
        # By default only the backend name is exposed.
        self.backend.output_all_backend_info = False
        res = self.backend.config
        self.assertEqual({'volume_backend_name': self.backend.id}, res)

    def test_config_full(self):
        # With full output enabled the whole driver config is exposed.
        self.backend.output_all_backend_info = True
        with mock.patch.object(self.backend, '_driver_cfg') as mock_driver:
            res = self.backend.config
            self.assertEqual(mock_driver, res)

    def test_refresh(self):
        self.backend.refresh()
        self.persistence.get_volumes.assert_called_once_with(
            backend_name=self.backend.id)

    def test_refresh_no_call(self):
        # With no cached volume list there is nothing to refresh.
        self.backend._volumes = None
        self.backend.refresh()
        self.persistence.get_volumes.assert_not_called()

View File

@ -16,6 +16,11 @@
import mock
import cinderlib
from cinderlib.persistence import base
def get_mock_persistence():
    """Return a MagicMock constrained to the persistence driver interface.

    Using spec ensures tests fail if they call methods that don't exist on
    the real PersistenceDriverBase API.
    """
    return mock.MagicMock(spec=base.PersistenceDriverBase)
class FakeBackend(cinderlib.Backend):
@ -24,4 +29,6 @@ class FakeBackend(cinderlib.Backend):
cinderlib.Backend.backends[driver_name] = self
self._driver_cfg = {'volume_backend_name': driver_name}
self.driver = mock.Mock()
self.driver.persistence = cinderlib.Backend.persistence
self._pool_names = (driver_name,)
self._volumes = []

View File

@ -18,7 +18,7 @@ import six
if six.PY2:
# Python 2 workaround for getaddrinfo (fails if port is valid unicode)
def my_getaddrinfo(original, host, port, *args, **kwargs):
    """Wrapper for socket.getaddrinfo that converts unicode ports to str.

    Python 2's getaddrinfo fails when the port is a valid unicode string,
    so coerce it before delegating to the original implementation.
    NOTE(review): the diff-garbled version contained both the old
    ``isinstance(port, unicode)`` check and the new ``six.text_type``
    one; only the six-based check (which also keeps linters happy on
    Python 3 sources) is kept.
    """
    if isinstance(port, six.text_type):
        port = str(port)
    return original(host, port, *args, **kwargs)
import functools

View File

@ -1,3 +1,4 @@
ddt>=1.0.1
unittest2
pyyaml
pip==8.1.2

View File

@ -1,4 +1,3 @@
Sphinx==1.6.5
git+https://github.com/akrog/modulefaker.git#egg=modulefaker
git+https://github.com/akrog/cindermock.git
git+https://github.com/akrog/nosbrickmock.git

View File

@ -17,7 +17,6 @@ with open('HISTORY.rst') as history_file:
requirements = [
'cinder>=11.0',
'nos-brick',
]
test_requirements = [
@ -63,7 +62,7 @@ setuptools.setup(
author="Gorka Eguileor",
author_email='geguileo@redhat.com',
url='https://github.com/akrog/cinderlib',
packages=setuptools.find_packages(exclude=['tmp', 'tests*']),
packages=setuptools.find_packages(exclude=['tmp', 'cinderlib/tests']),
include_package_data=False,
install_requires=requirements,
extras_require=extras,

View File

@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View File

@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View File

@ -1 +0,0 @@
# -*- coding: utf-8 -*-

View File

@ -1,25 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_cinderlib
----------------------------------
Tests for `cinderlib` module.
"""
import unittest2
import cinderlib
class TestCinderlib(unittest2.TestCase):
    """Minimal smoke test for the cinderlib package entry points."""

    def setUp(self):
        # No fixtures needed; present only to mirror unittest structure.
        pass

    def tearDown(self):
        # Nothing to clean up.
        pass

    def test_lib_setup(self):
        # The package-level setup alias must point at Backend.global_setup.
        self.assertEqual(cinderlib.setup, cinderlib.Backend.global_setup)

19
tools/lvm.yaml Normal file
View File

@ -0,0 +1,19 @@
# For Fedora, CentOS, RHEL we require the targetcli package.
# For Ubuntu we require lio-utils or changing the target iscsi_helper
#
# Logs are way too verbose, so we disable them
logs: false
# LVM backend uses the cinder-rtstool command that is installed by Cinder in
# the virtual environment, so we need the custom sudo command that inherits
# the virtualenv binaries PATH
venv_sudo: true
# We only define one backend
backends:
- volume_backend_name: lvm
volume_driver: cinder.volume.drivers.lvm.LVMVolumeDriver
volume_group: cinder-volumes
target_protocol: iscsi
target_helper: lioadm

View File

@ -18,7 +18,7 @@ setenv =
deps= -r{toxinidir}/requirements_dev.txt
commands =
unit2 discover -v -s tests/unit []
unit2 discover -v -s cinderlib/tests/unit []
[testenv:functional]
sitepackages = True
@ -28,6 +28,6 @@ basepython=python2.7
envdir = {toxworkdir}/py27
# Pass on the location of the backend configuration to the tests
setenv = CL_FTEST_CFG = {env:CL_FTEST_CFG:tests/functional/lvm.yaml}
setenv = CL_FTEST_CFG = {env:CL_FTEST_CFG:tools/lvm.yaml}
commands =
unit2 discover -v -s tests/functional []
unit2 discover -v -s cinderlib/tests/functional []