Merge "RBD: Use static methods where possible"

This commit is contained in:
Zuul 2022-05-17 23:23:35 +00:00 committed by Gerrit Code Review
commit b1d8895f01
2 changed files with 26 additions and 17 deletions

View File

@ -200,11 +200,13 @@ class CephBackupDriver(driver.BackupDriver):
def get_driver_options() -> list: def get_driver_options() -> list:
return service_opts return service_opts
def _validate_string_args(self, *args: str) -> bool: @staticmethod
def _validate_string_args(*args: str) -> bool:
"""Ensure all args are non-None and non-empty.""" """Ensure all args are non-None and non-empty."""
return all(args) return all(args)
def _ceph_args(self, user: str, conf: Optional[str] = None, @staticmethod
def _ceph_args(user: str, conf: Optional[str] = None,
pool: Optional[str] = None) -> List[str]: pool: Optional[str] = None) -> List[str]:
"""Create default ceph args for executing rbd commands. """Create default ceph args for executing rbd commands.
@ -214,7 +216,7 @@ class CephBackupDriver(driver.BackupDriver):
# Make sure user arg is valid since rbd command may not fail if # Make sure user arg is valid since rbd command may not fail if
# invalid/no user provided, resulting in unexpected behaviour. # invalid/no user provided, resulting in unexpected behaviour.
if not self._validate_string_args(user): if not CephBackupDriver._validate_string_args(user):
raise exception.BackupInvalidCephArgs(_("invalid user '%s'") % raise exception.BackupInvalidCephArgs(_("invalid user '%s'") %
user) user)
@ -326,20 +328,21 @@ class CephBackupDriver(driver.BackupDriver):
client.shutdown() client.shutdown()
raise raise
def _disconnect_from_rados(self, @staticmethod
client: 'rados.Rados', def _disconnect_from_rados(client: 'rados.Rados',
ioctx: 'rados.Ioctx') -> None: ioctx: 'rados.Ioctx') -> None:
"""Terminate connection with the backup Ceph cluster.""" """Terminate connection with the backup Ceph cluster."""
# closing an ioctx cannot raise an exception # closing an ioctx cannot raise an exception
ioctx.close() ioctx.close()
client.shutdown() client.shutdown()
def _format_base_name(self, service_metadata: str) -> str: @staticmethod
def _format_base_name(service_metadata: str) -> str:
base_name = json.loads(service_metadata)["base"] base_name = json.loads(service_metadata)["base"]
return utils.convert_str(base_name) return utils.convert_str(base_name)
@staticmethod
def _get_backup_base_name( def _get_backup_base_name(
self,
volume_id: str, volume_id: str,
backup: Optional['objects.Backup'] = None) -> str: backup: Optional['objects.Backup'] = None) -> str:
"""Return name of base image used for backup. """Return name of base image used for backup.
@ -352,7 +355,7 @@ class CephBackupDriver(driver.BackupDriver):
return utils.convert_str("volume-%s.backup.base" % volume_id) return utils.convert_str("volume-%s.backup.base" % volume_id)
if backup.service_metadata: if backup.service_metadata:
return self._format_base_name(backup.service_metadata) return CephBackupDriver._format_base_name(backup.service_metadata)
# 'parent' field will only be present in incremental backups. This is # 'parent' field will only be present in incremental backups. This is
# filled by cinder-api # filled by cinder-api
@ -361,7 +364,8 @@ class CephBackupDriver(driver.BackupDriver):
# so we use the default RBD backup base # so we use the default RBD backup base
if backup.parent.service_metadata: if backup.parent.service_metadata:
service_metadata = backup.parent.service_metadata service_metadata = backup.parent.service_metadata
base_name = self._format_base_name(service_metadata) base_name = CephBackupDriver._format_base_name(
service_metadata)
else: else:
base_name = utils.convert_str("volume-%s.backup.base" base_name = utils.convert_str("volume-%s.backup.base"
% volume_id) % volume_id)
@ -692,7 +696,7 @@ class CephBackupDriver(driver.BackupDriver):
if name not in rbds: if name not in rbds:
LOG.debug("Image '%s' not found - trying diff format name", name) LOG.debug("Image '%s' not found - trying diff format name", name)
if try_diff_format: if try_diff_format:
name = self._get_backup_base_name(volume_id) name = CephBackupDriver._get_backup_base_name(volume_id)
if name not in rbds: if name not in rbds:
LOG.debug("Diff format image '%s' not found", name) LOG.debug("Diff format image '%s' not found", name)
return False, name return False, name
@ -849,7 +853,8 @@ class CephBackupDriver(driver.BackupDriver):
return {'service_metadata': '{"base": "%s"}' % base_name} return {'service_metadata': '{"base": "%s"}' % base_name}
def _file_is_rbd(self, volume_file: linuxrbd.RBDVolumeIOWrapper) -> bool: @staticmethod
def _file_is_rbd(volume_file: linuxrbd.RBDVolumeIOWrapper) -> bool:
"""Returns True if the volume_file is actually an RBD image.""" """Returns True if the volume_file is actually an RBD image."""
return hasattr(volume_file, 'rbd_image') return hasattr(volume_file, 'rbd_image')
@ -1188,7 +1193,8 @@ class CephBackupDriver(driver.BackupDriver):
return restore_point return restore_point
def _rbd_has_extents(self, rbd_volume) -> bool: @staticmethod
def _rbd_has_extents(rbd_volume) -> bool:
"""Check whether the given rbd volume has extents. """Check whether the given rbd volume has extents.
Return True if has extents, otherwise False. Return True if has extents, otherwise False.

View File

@ -549,14 +549,15 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return _do_conn(pool, remote, timeout) return _do_conn(pool, remote, timeout)
def _disconnect_from_rados(self, @staticmethod
client: 'rados.Rados', def _disconnect_from_rados(client: 'rados.Rados',
ioctx: 'rados.Ioctx') -> None: ioctx: 'rados.Ioctx') -> None:
# closing an ioctx cannot raise an exception # closing an ioctx cannot raise an exception
ioctx.close() ioctx.close()
client.shutdown() client.shutdown()
def _get_backup_snaps(self, rbd_image) -> List: @staticmethod
def _get_backup_snaps(rbd_image) -> List:
"""Get list of any backup snapshots that exist on this volume. """Get list of any backup snapshots that exist on this volume.
There should only ever be one but accept all since they need to be There should only ever be one but accept all since they need to be
@ -1454,7 +1455,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
"""Retype from one volume type to another on the same backend.""" """Retype from one volume type to another on the same backend."""
return True, self._setup_volume(volume, new_type) return True, self._setup_volume(volume, new_type)
def _dumps(self, obj: Dict[str, Union[bool, int]]) -> str: @staticmethod
def _dumps(obj: Dict[str, Union[bool, int]]) -> str:
return json.dumps(obj, separators=(',', ':'), sort_keys=True) return json.dumps(obj, separators=(',', ':'), sort_keys=True)
def _exec_on_volume(self, def _exec_on_volume(self,
@ -1659,7 +1661,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
**kwargs) -> None: **kwargs) -> None:
pass pass
def _parse_location(self, location: str) -> List[str]: @staticmethod
def _parse_location(location: str) -> List[str]:
prefix = 'rbd://' prefix = 'rbd://'
if not location.startswith(prefix): if not location.startswith(prefix):
reason = _('Not stored in rbd') reason = _('Not stored in rbd')