Merge "db: Remove 'db' argument from various managers"

This commit is contained in:
Zuul 2021-09-15 22:53:02 +00:00 committed by Gerrit Code Review
commit 01183a1717
29 changed files with 95 additions and 113 deletions

View File

@ -59,12 +59,11 @@ IMPORT_VOLUME_ID = '00000000-0000-0000-0000-000000000000'
class API(base.Base):
"""API for interacting with the volume backup manager."""
# TODO(stephenfin): The 'db' kwarg is unused; remove it
def __init__(self, db=None):
def __init__(self):
self.backup_rpcapi = backup_rpcapi.BackupAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.volume_api = cinder.volume.API()
super(API, self).__init__()
super().__init__()
def get(self, context, backup_id):
backup = objects.Backup.get_by_id(context, backup_id)

View File

@ -111,10 +111,11 @@ class ChunkedBackupDriver(driver.BackupDriver, metaclass=abc.ABCMeta):
err = _('unsupported compression algorithm: %s') % algorithm
raise ValueError(err)
def __init__(self, context, chunk_size_bytes, sha_block_size_bytes,
backup_default_container, enable_progress_timer,
db=None):
super(ChunkedBackupDriver, self).__init__(context, db)
def __init__(
self, context, chunk_size_bytes, sha_block_size_bytes,
backup_default_container, enable_progress_timer,
):
super(ChunkedBackupDriver, self).__init__(context)
self.chunk_size_bytes = chunk_size_bytes
self.sha_block_size_bytes = sha_block_size_bytes
self.backup_default_container = backup_default_container

View File

@ -52,7 +52,6 @@ class BackupMetadataAPI(base.Base):
TYPE_TAG_VOL_META = 'volume-metadata'
TYPE_TAG_VOL_GLANCE_META = 'volume-glance-metadata'
# TODO(stephenfin): The 'db' kwarg is unused; remove it
def __init__(self, context):
super().__init__()
self.context = context
@ -348,8 +347,7 @@ class BackupMetadataAPI(base.Base):
class BackupDriver(base.Base, metaclass=abc.ABCMeta):
# TODO(stephenfin): The 'db' kwarg is unused; remove it
def __init__(self, context, db=None):
def __init__(self, context):
super().__init__()
self.context = context
self.backup_meta_api = BackupMetadataAPI(context)

View File

@ -174,8 +174,8 @@ class CephBackupDriver(driver.BackupDriver):
gain.
"""
def __init__(self, context, db=None, execute=None):
super(CephBackupDriver, self).__init__(context, db)
def __init__(self, context, execute=None):
super().__init__(context)
self.rbd = rbd
self.rados = rados
self.chunk_size = CONF.backup_ceph_chunk_size

View File

@ -163,18 +163,20 @@ def _get_dist_version(name):
class GoogleBackupDriver(chunkeddriver.ChunkedBackupDriver):
"""Provides backup, restore and delete of backup objects within GCS."""
def __init__(self, context, db=None):
def __init__(self, context):
global OAUTH_EXCEPTIONS
backup_bucket = CONF.backup_gcs_bucket
self.gcs_project_id = CONF.backup_gcs_project_id
chunk_size_bytes = CONF.backup_gcs_object_size
sha_block_size_bytes = CONF.backup_gcs_block_size
enable_progress_timer = CONF.backup_gcs_enable_progress_timer
super(GoogleBackupDriver, self).__init__(context, chunk_size_bytes,
sha_block_size_bytes,
backup_bucket,
enable_progress_timer,
db)
super().__init__(
context,
chunk_size_bytes,
sha_block_size_bytes,
backup_bucket,
enable_progress_timer,
)
self.reader_chunk_size = CONF.backup_gcs_reader_chunk_size
self.writer_chunk_size = CONF.backup_gcs_writer_chunk_size
self.bucket_location = CONF.backup_gcs_bucket_location

View File

@ -46,14 +46,13 @@ CONF.register_opts(glusterfsbackup_service_opts)
class GlusterfsBackupDriver(posix.PosixBackupDriver):
"""Provides backup, restore and delete using GlusterFS repository."""
def __init__(self, context, db=None):
def __init__(self, context):
self.backup_mount_point_base = CONF.glusterfs_backup_mount_point
self.backup_share = CONF.glusterfs_backup_share
self._execute = putils.execute
self._root_helper = utils.get_root_helper()
backup_path = self._init_backup_repo_path()
super(GlusterfsBackupDriver, self).__init__(context,
backup_path=backup_path)
super().__init__(context, backup_path=backup_path)
@staticmethod
def get_driver_options():

View File

@ -58,7 +58,7 @@ CONF.register_opts(nfsbackup_service_opts)
class NFSBackupDriver(posix.PosixBackupDriver):
"""Provides backup, restore and delete using NFS supplied repository."""
def __init__(self, context, db=None):
def __init__(self, context):
self.backup_mount_point_base = CONF.backup_mount_point_base
self.backup_share = CONF.backup_share
self.mount_options = CONF.backup_mount_options
@ -66,8 +66,7 @@ class NFSBackupDriver(posix.PosixBackupDriver):
self._root_helper = utils.get_root_helper()
backup_path = self._init_backup_repo_path()
LOG.debug("Using NFS backup repository: %s", backup_path)
super(NFSBackupDriver, self).__init__(context,
backup_path=backup_path)
super().__init__(context, backup_path=backup_path)
def check_for_setup_error(self):
"""Raises error if any required configuration flag is missing."""

View File

@ -70,16 +70,18 @@ CONF.register_opts(posixbackup_service_opts)
class PosixBackupDriver(chunkeddriver.ChunkedBackupDriver):
"""Provides backup, restore and delete using a Posix file system."""
def __init__(self, context, db=None, backup_path=None):
def __init__(self, context, backup_path=None):
chunk_size_bytes = CONF.backup_file_size
sha_block_size_bytes = CONF.backup_sha_block_size_bytes
backup_default_container = CONF.backup_container
enable_progress_timer = CONF.backup_enable_progress_timer
super(PosixBackupDriver, self).__init__(context, chunk_size_bytes,
sha_block_size_bytes,
backup_default_container,
enable_progress_timer,
db)
super().__init__(
context,
chunk_size_bytes,
sha_block_size_bytes,
backup_default_container,
enable_progress_timer,
)
self.backup_path = backup_path
if not backup_path:
self.backup_path = CONF.backup_posix_path

View File

@ -170,16 +170,18 @@ def _wrap_exception(func):
class S3BackupDriver(chunkeddriver.ChunkedBackupDriver):
"""Provides backup, restore and delete of backup objects within S3."""
def __init__(self, context, db=None):
def __init__(self, context):
chunk_size_bytes = CONF.backup_s3_object_size
sha_block_size_bytes = CONF.backup_s3_block_size
backup_bucket = CONF.backup_s3_store_bucket
enable_progress_timer = CONF.backup_s3_enable_progress_timer
super(S3BackupDriver, self).__init__(context, chunk_size_bytes,
sha_block_size_bytes,
backup_bucket,
enable_progress_timer,
db)
super().__init__(
context,
chunk_size_bytes,
sha_block_size_bytes,
backup_bucket,
enable_progress_timer,
)
config_args = dict(
connect_timeout=CONF.backup_s3_timeout,
read_timeout=CONF.backup_s3_timeout,

View File

@ -151,16 +151,18 @@ CONF.register_opts(swiftbackup_service_opts)
class SwiftBackupDriver(chunkeddriver.ChunkedBackupDriver):
"""Provides backup, restore and delete of backup objects within Swift."""
def __init__(self, context, db=None):
def __init__(self, context):
chunk_size_bytes = CONF.backup_swift_object_size
sha_block_size_bytes = CONF.backup_swift_block_size
backup_default_container = CONF.backup_swift_container
enable_progress_timer = CONF.backup_swift_enable_progress_timer
super(SwiftBackupDriver, self).__init__(context, chunk_size_bytes,
sha_block_size_bytes,
backup_default_container,
enable_progress_timer,
db)
super().__init__(
context,
chunk_size_bytes,
sha_block_size_bytes,
backup_default_container,
enable_progress_timer,
)
# Do not intialize the instance created when the backup service
# starts up. The context will be missing information to do things

View File

@ -158,7 +158,7 @@ class BackupManager(manager.SchedulerDependentManager):
self.publish_service_capabilities(ctxt)
def _setup_backup_driver(self, ctxt):
backup_service = self.service(context=ctxt, db=self.db)
backup_service = self.service(context=ctxt)
backup_service.check_for_setup_error()
self.is_initialized = True
raise loopingcall.LoopingCallDone()

View File

@ -136,6 +136,7 @@ class API(base.Base):
def __init__(self):
self.message_api = message_api.API()
super().__init__()
def _get_volume_extended_event(self, server_id, volume_id):
return {'name': 'volume-extended',

View File

@ -17,8 +17,8 @@ from cinder.backup import driver
class FakeBackupService(driver.BackupDriver):
def __init__(self, context, db=None):
super(FakeBackupService, self).__init__(context, db)
def __init__(self, context):
super().__init__(context)
def backup(self, backup, volume_file):
pass

View File

@ -1206,7 +1206,6 @@ class VolumeUtilsTestCase(test.TestCase):
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
ret = volume_utils.check_encryption_provider(
db,
volume,
mock.sentinel.context)
self.assertEqual('aes-xts-plain64', ret['cipher'])
@ -1227,11 +1226,11 @@ class VolumeUtilsTestCase(test.TestCase):
ctxt = context.get_admin_context()
volume = fake_volume.fake_volume_obj(ctxt, **volume_data)
self.assertRaises(exception.VolumeDriverException,
volume_utils.check_encryption_provider,
db,
volume,
mock.sentinel.context)
self.assertRaises(
exception.VolumeDriverException,
volume_utils.check_encryption_provider,
volume,
mock.sentinel.context)
@mock.patch('cinder.volume.volume_utils.CONF.list_all_sections')
def test_get_backend_configuration_backend_stanza_not_found(self,

View File

@ -22,7 +22,6 @@ import requests
from requests import models
from cinder import context as cinder_context
from cinder import db
from cinder.db.sqlalchemy import api as sqlalchemy_api
from cinder.objects import group_snapshot as obj_group_snap
from cinder.objects import snapshot as obj_snap
@ -476,7 +475,7 @@ class HBSDRESTFCDriverTest(test.TestCase):
self, brick_get_connector_properties=None, request=None):
"""Set up the driver environment."""
self.driver = hbsd_fc.HBSDFCDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
request.side_effect = [FakeResponse(200, POST_SESSIONS_RESULT),
FakeResponse(200, GET_PORTS_RESULT),
FakeResponse(200, GET_HOST_WWNS_RESULT)]
@ -502,7 +501,7 @@ class HBSDRESTFCDriverTest(test.TestCase):
side_effect=_brick_get_connector_properties)
def test_do_setup(self, brick_get_connector_properties, request):
drv = hbsd_fc.HBSDFCDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
self._setup_config()
request.side_effect = [FakeResponse(200, POST_SESSIONS_RESULT),
FakeResponse(200, GET_PORTS_RESULT),
@ -524,7 +523,7 @@ class HBSDRESTFCDriverTest(test.TestCase):
def test_do_setup_create_hg(self, brick_get_connector_properties, request):
"""Normal case: The host group not exists."""
drv = hbsd_fc.HBSDFCDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
self._setup_config()
request.side_effect = [FakeResponse(200, POST_SESSIONS_RESULT),
FakeResponse(200, GET_PORTS_RESULT),
@ -551,7 +550,7 @@ class HBSDRESTFCDriverTest(test.TestCase):
def test_do_setup_pool_name(self, brick_get_connector_properties, request):
"""Normal case: Specify a pool name instead of pool id"""
drv = hbsd_fc.HBSDFCDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
self._setup_config()
tmp_pool = self.configuration.hitachi_pool
self.configuration.hitachi_pool = CONFIG_MAP['pool_name']

View File

@ -20,7 +20,6 @@ from oslo_config import cfg
import requests
from cinder import context as cinder_context
from cinder import db
from cinder.db.sqlalchemy import api as sqlalchemy_api
from cinder.objects import group_snapshot as obj_group_snap
from cinder.objects import snapshot as obj_snap
@ -373,7 +372,7 @@ class HBSDRESTISCSIDriverTest(test.TestCase):
self, brick_get_connector_properties=None, request=None):
"""Set up the driver environment."""
self.driver = hbsd_iscsi.HBSDISCSIDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
request.side_effect = [FakeResponse(200, POST_SESSIONS_RESULT),
FakeResponse(200, GET_PORTS_RESULT),
FakeResponse(200, GET_PORT_RESULT),
@ -401,7 +400,7 @@ class HBSDRESTISCSIDriverTest(test.TestCase):
side_effect=_brick_get_connector_properties)
def test_do_setup(self, brick_get_connector_properties, request):
drv = hbsd_iscsi.HBSDISCSIDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
self._setup_config()
request.side_effect = [FakeResponse(200, POST_SESSIONS_RESULT),
FakeResponse(200, GET_PORTS_RESULT),
@ -428,7 +427,7 @@ class HBSDRESTISCSIDriverTest(test.TestCase):
def test_do_setup_create_hg(self, brick_get_connector_properties, request):
"""Normal case: The host group not exists."""
drv = hbsd_iscsi.HBSDISCSIDriver(
configuration=self.configuration, db=db)
configuration=self.configuration)
self._setup_config()
request.side_effect = [FakeResponse(200, POST_SESSIONS_RESULT),
FakeResponse(200, GET_PORTS_RESULT),

View File

@ -53,8 +53,7 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
mock_exists):
self.configuration.volume_clear = 'zero'
self.configuration.volume_clear_size = 0
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db)
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration)
# Test volume without 'size' field and 'volume_size' field
self.assertRaises(exception.InvalidParameterValue,
lvm_driver._delete_volume,
@ -68,8 +67,7 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
self.configuration.volume_type = 'default'
volume = dict(self.FAKE_VOLUME, size=1)
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db)
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration)
self.assertRaises(exception.VolumeBackendAPIException,
lvm_driver._delete_volume, volume)
@ -87,8 +85,8 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
self.configuration.volume_clear_size = 0
self.configuration.lvm_type = 'thin'
self.configuration.target_helper = 'tgtadm'
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
vg_obj=vg_obj, db=db)
lvm_driver = lvm.LVMVolumeDriver(
configuration=self.configuration, vg_obj=vg_obj)
uuid = '00000000-0000-0000-0000-c3aa7ee01536'
@ -108,8 +106,8 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
'auto')
configuration = conf.Configuration(fake_opt, 'fake_group')
lvm_driver = lvm.LVMVolumeDriver(configuration=configuration,
vg_obj=vg_obj, db=db)
lvm_driver = lvm.LVMVolumeDriver(
configuration=configuration, vg_obj=vg_obj)
lvm_driver.delete_snapshot = mock.Mock()
@ -210,8 +208,7 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
def test_create_volume_from_snapshot_sparse(self):
self.configuration.lvm_type = 'thin'
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db)
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration)
with mock.patch.object(lvm_driver, 'vg'):
@ -227,8 +224,7 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
def test_create_volume_from_snapshot_sparse_extend(self):
self.configuration.lvm_type = 'thin'
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db)
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration)
with mock.patch.object(lvm_driver, 'vg'), \
mock.patch.object(lvm_driver, 'extend_volume') as mock_extend:
@ -342,9 +338,8 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
self.configuration.lvm_type = 'thin'
fake_vg = mock.Mock(fake_lvm.FakeBrickLVM('cinder-volumes', False,
None, 'default'))
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
vg_obj=fake_vg,
db=db)
lvm_driver = lvm.LVMVolumeDriver(
configuration=self.configuration, vg_obj=fake_vg)
fake_volume = tests_utils.create_volume(self.context, size=1)
fake_new_volume = tests_utils.create_volume(self.context, size=2)
@ -528,8 +523,7 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
return [{}]
self.configuration.lvm_type = 'thin'
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db)
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration)
with mock.patch.object(brick_lvm.LVM, 'get_all_physical_volumes',
return_value = [{}]), \
@ -746,8 +740,7 @@ class LVMVolumeDriverTestCase(test_driver.BaseDriverTestCase):
configuration = conf.Configuration(fake_opt, 'fake_group')
configuration.lvm_type = 'thin'
lvm_driver = lvm.LVMVolumeDriver(configuration=configuration,
db=db)
lvm_driver = lvm.LVMVolumeDriver(configuration=configuration)
fake_volume = tests_utils.create_volume(self.context,
display_name='fake_volume')
fake_snapshot = tests_utils.create_snapshot(
@ -975,9 +968,8 @@ class LVMISCSITestCase(test_driver.BaseDriverTestCase):
False,
None,
'default')
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db,
vg_obj=vg_obj)
lvm_driver = lvm.LVMVolumeDriver(
configuration=self.configuration, vg_obj=vg_obj)
lvm_driver.check_for_setup_error()
lvm_driver.vg = brick_lvm.LVM('cinder-volumes', 'sudo')
lvm_driver._update_volume_stats()
@ -1034,8 +1026,8 @@ class LVMISCSITestCase(test_driver.BaseDriverTestCase):
self.configuration = conf.Configuration(None)
vg_obj = fake_lvm.FakeBrickLVM('cinder-volumes', False, None,
'default')
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db, vg_obj=vg_obj)
lvm_driver = lvm.LVMVolumeDriver(
configuration=self.configuration, vg_obj=vg_obj)
with mock.patch.object(lvm_driver.target_driver,
'terminate_connection') as mock_term_conn:
@ -1088,8 +1080,8 @@ class LVMISCSITestCase(test_driver.BaseDriverTestCase):
self.configuration = conf.Configuration(None)
vg_obj = fake_lvm.FakeBrickLVM('cinder-volumes', False, None,
'default')
lvm_driver = lvm.LVMVolumeDriver(configuration=self.configuration,
db=db, vg_obj=vg_obj)
lvm_driver = lvm.LVMVolumeDriver(
configuration=self.configuration, vg_obj=vg_obj)
with mock.patch.object(lvm_driver.target_driver,
'terminate_connection') as mock_term_conn:

View File

@ -102,9 +102,7 @@ class QuobyteDriverTestCase(test.TestCase):
self._configuration.quobyte_volume_from_snapshot_cache = False
self._configuration.quobyte_overlay_volumes = False
self._driver =\
quobyte.QuobyteDriver(configuration=self._configuration,
db=FakeDb())
self._driver = quobyte.QuobyteDriver(configuration=self._configuration)
self._driver.shares = {}
self._driver.set_nas_security_options(is_new_cinder_install=False)
self._driver.base = self._configuration.quobyte_mount_point_base

View File

@ -119,9 +119,7 @@ class VMwareVcVmdkDriverTestCase(test.TestCase):
reserved_percentage=0
)
self._db = mock.Mock()
self._driver = vmdk.VMwareVcVmdkDriver(configuration=self._config,
db=self._db)
self._driver = vmdk.VMwareVcVmdkDriver(configuration=self._config)
self._context = context.get_admin_context()
self.updated_at = timeutils.utcnow()

View File

@ -25,6 +25,7 @@ from oslo_config import types
from oslo_log import log as logging
from oslo_utils import excutils
from cinder import db
from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
@ -400,8 +401,8 @@ class BaseVD(object, metaclass=abc.ABCMeta):
'a/a': 'failover_completed'}
def __init__(self, execute=utils.execute, *args, **kwargs):
# NOTE(vish): db is set by Manager
self.db = kwargs.get('db')
# TODO(stephenfin): Drop this in favour of using 'db' directly
self.db = db
self.host = kwargs.get('host')
self.cluster_name = kwargs.get('cluster_name')
self.configuration = kwargs.get('configuration', None)

View File

@ -1036,7 +1036,6 @@ class LinstorIscsiDriver(LinstorBaseDriver):
self.target_driver = importutils.import_object(
self.helper_driver,
configuration=self.configuration,
db=self.db,
executor=self._execute)
LOG.info('START: LINSTOR DRBD driver %s', self.helper_name)

View File

@ -107,7 +107,6 @@ class LVMVolumeDriver(driver.VolumeDriver):
self.target_driver = importutils.import_object(
target_driver,
configuration=self.configuration,
db=self.db,
executor=self._execute)
self.protocol = self.target_driver.protocol
self._sparse_copy_volume = False

View File

@ -922,10 +922,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
This works by creating an encrypted image locally,
and then uploading it to the volume.
"""
encryption = volume_utils.check_encryption_provider(self.db,
volume,
context)
encryption = volume_utils.check_encryption_provider(volume, context)
# Fetch the key associated with the volume and decode the passphrase
keymgr = key_manager.API(CONF)
@ -1619,10 +1616,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._copy_image_to_volume(context, volume, image_service, image_id)
def _encrypt_image(self, context, volume, tmp_dir, src_image_path):
encryption = volume_utils.check_encryption_provider(
self.db,
volume,
context)
encryption = volume_utils.check_encryption_provider(volume, context)
# Fetch the key associated with the volume and decode the passphrase
keymgr = key_manager.API(CONF)

View File

@ -304,7 +304,6 @@ class RemoteFSDriver(driver.BaseVD):
if encrypted:
encryption = volume_utils.check_encryption_provider(
self.db,
volume,
volume.obj_context)
@ -1481,7 +1480,6 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
file_json = jsonutils.dumps(file_json_dict)
encryption = volume_utils.check_encryption_provider(
db=db,
volume=snapshot.volume,
context=obj_context)

View File

@ -57,7 +57,6 @@ class SPDKDriver(driver.VolumeDriver):
self.target_driver = importutils.import_object(
target_driver,
configuration=self.configuration,
db=self.db,
executor=self._execute)
@staticmethod

View File

@ -550,7 +550,6 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
raise exception.RekeyNotSupported()
encryption = volume_utils.check_encryption_provider(
self.db,
volume,
context)

View File

@ -305,7 +305,6 @@ class VolumeManager(manager.CleanableManager,
self.driver = importutils.import_object(
volume_driver,
configuration=self.configuration,
db=self.db,
host=self.host,
cluster_name=self.cluster,
is_vol_db_empty=vol_db_empty,

View File

@ -14,6 +14,8 @@ import abc
from oslo_config import cfg
from cinder import db
CONF = cfg.CONF
@ -31,7 +33,8 @@ class Target(object, metaclass=abc.ABCMeta):
"""
def __init__(self, *args, **kwargs):
self.db = kwargs.get('db')
# TODO(stephenfin): Drop this in favour of using 'db' directly
self.db = db
self.configuration = kwargs.get('configuration')
self._root_helper = kwargs.get('root_helper',
'sudo cinder-rootwrap %s' %

View File

@ -1282,9 +1282,10 @@ def image_conversion_dir() -> str:
return tmpdir
def check_encryption_provider(db,
volume: 'objects.Volume',
context: context.RequestContext) -> dict:
def check_encryption_provider(
volume: 'objects.Volume',
context: context.RequestContext,
) -> dict:
"""Check that this is a LUKS encryption provider.
:returns: encryption dict