Cleanup RCP API versioning

Moves version checks into method preparing message context.
Provides default versions for context preparation to avoid repetitive
task of updating it in every method after introduction of new version.

Change-Id: I761e98ae46b7d1140f30b883a617a75b4af3fe0a
This commit is contained in:
Szymon Wroblewski 2016-09-19 18:18:15 -05:00
parent 14899a9a8e
commit 10bff37595
4 changed files with 114 additions and 208 deletions

View File

@ -48,32 +48,24 @@ class BackupAPI(rpc.RPCAPI):
"""
RPC_API_VERSION = '2.0'
RPC_DEFAULT_VERSION = '2.0'
TOPIC = constants.BACKUP_TOPIC
BINARY = 'cinder-backup'
def _compat_ver(self, current, legacy):
if self.client.can_send_version(current):
return current
else:
return legacy
def create_backup(self, ctxt, backup):
LOG.debug("create_backup in rpcapi backup_id %s", backup.id)
version = '2.0'
cctxt = self.client.prepare(server=backup.host, version=version)
cctxt = self._get_cctxt(server=backup.host)
cctxt.cast(ctxt, 'create_backup', backup=backup)
def restore_backup(self, ctxt, volume_host, backup, volume_id):
LOG.debug("restore_backup in rpcapi backup_id %s", backup.id)
version = '2.0'
cctxt = self.client.prepare(server=volume_host, version=version)
cctxt = self._get_cctxt(server=volume_host)
cctxt.cast(ctxt, 'restore_backup', backup=backup,
volume_id=volume_id)
def delete_backup(self, ctxt, backup):
LOG.debug("delete_backup rpcapi backup_id %s", backup.id)
version = '2.0'
cctxt = self.client.prepare(server=backup.host, version=version)
cctxt = self._get_cctxt(server=backup.host)
cctxt.cast(ctxt, 'delete_backup', backup=backup)
def export_record(self, ctxt, backup):
@ -81,24 +73,15 @@ class BackupAPI(rpc.RPCAPI):
"on host %(host)s.",
{'id': backup.id,
'host': backup.host})
version = '2.0'
cctxt = self.client.prepare(server=backup.host, version=version)
cctxt = self._get_cctxt(server=backup.host)
return cctxt.call(ctxt, 'export_record', backup=backup)
def import_record(self,
ctxt,
host,
backup,
backup_service,
backup_url,
def import_record(self, ctxt, host, backup, backup_service, backup_url,
backup_hosts):
LOG.debug("import_record rpcapi backup id %(id)s "
"on host %(host)s for backup_url %(url)s.",
{'id': backup.id,
'host': host,
'url': backup_url})
version = '2.0'
cctxt = self.client.prepare(server=host, version=version)
{'id': backup.id, 'host': host, 'url': backup_url})
cctxt = self._get_cctxt(server=host)
cctxt.cast(ctxt, 'import_record',
backup=backup,
backup_service=backup_service,
@ -108,15 +91,12 @@ class BackupAPI(rpc.RPCAPI):
def reset_status(self, ctxt, backup, status):
LOG.debug("reset_status in rpcapi backup_id %(id)s "
"on host %(host)s.",
{'id': backup.id,
'host': backup.host})
version = '2.0'
cctxt = self.client.prepare(server=backup.host, version=version)
{'id': backup.id, 'host': backup.host})
cctxt = self._get_cctxt(server=backup.host)
return cctxt.cast(ctxt, 'reset_status', backup=backup, status=status)
def check_support_to_force_delete(self, ctxt, host):
LOG.debug("Check if backup driver supports force delete "
"on host %(host)s.", {'host': host})
version = '2.0'
cctxt = self.client.prepare(server=host, version=version)
cctxt = self._get_cctxt(server=host)
return cctxt.call(ctxt, 'check_support_to_force_delete')

View File

@ -29,7 +29,6 @@ __all__ = [
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from oslo_utils import importutils
profiler = importutils.try_import('osprofiler.profiler')
@ -74,7 +73,7 @@ def init(conf):
allowed_remote_exmods=exmods,
aliases=TRANSPORT_ALIASES)
serializer = RequestContextSerializer(JsonPayloadSerializer())
serializer = RequestContextSerializer(messaging.JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT,
serializer=serializer)
@ -108,12 +107,6 @@ def get_allowed_exmods():
return ALLOWED_EXMODS + EXTRA_EXMODS
class JsonPayloadSerializer(messaging.NoOpSerializer):
@staticmethod
def serialize_entity(context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RequestContextSerializer(messaging.Serializer):
def __init__(self, base):
@ -185,6 +178,7 @@ class RPCAPI(object):
"""Mixin class aggregating methods related to RPC API compatibility."""
RPC_API_VERSION = '1.0'
RPC_DEFAULT_VERSION = '1.0'
TOPIC = ''
BINARY = ''
@ -205,6 +199,20 @@ class RPCAPI(object):
return version
return versions[-1]
def _get_cctxt(self, host=None, version=None, **kwargs):
"""Prepare client context
Version parameter accepts single version string or tuple of strings.
Compatible version can be obtained later using:
cctxt = _get_cctxt(...)
version = cctxt.target.version
"""
if version is None:
version = self.RPC_DEFAULT_VERSION
if isinstance(version, tuple):
version = self._compat_ver(*version)
return self.client.prepare(version=version, **kwargs)
@classmethod
def determine_rpc_version_cap(cls):
global LAST_RPC_VERSIONS

View File

@ -61,18 +61,15 @@ class SchedulerAPI(rpc.RPCAPI):
"""
RPC_API_VERSION = '3.0'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.SCHEDULER_TOPIC
BINARY = 'cinder-scheduler'
def create_consistencygroup(self, ctxt, group, request_spec_list=None,
filter_properties_list=None):
version = '3.0'
cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
request_spec_p = jsonutils.to_primitive(request_spec)
request_spec_p_list.append(request_spec_p)
cctxt = self._get_cctxt()
request_spec_p_list = [jsonutils.to_primitive(rs)
for rs in request_spec_list]
msg_args = {
'group': group, 'request_spec_list': request_spec_p_list,
'filter_properties_list': filter_properties_list,
@ -83,14 +80,10 @@ class SchedulerAPI(rpc.RPCAPI):
def create_group(self, ctxt, group, group_spec=None,
request_spec_list=None, group_filter_properties=None,
filter_properties_list=None):
version = '3.0'
cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
request_spec_p = jsonutils.to_primitive(request_spec)
request_spec_p_list.append(request_spec_p)
cctxt = self._get_cctxt()
request_spec_p_list = [jsonutils.to_primitive(rs)
for rs in request_spec_list]
group_spec_p = jsonutils.to_primitive(group_spec)
msg_args = {
'group': group, 'group_spec': group_spec_p,
'request_spec_list': request_spec_p_list,
@ -102,52 +95,46 @@ class SchedulerAPI(rpc.RPCAPI):
def create_volume(self, ctxt, volume, snapshot_id=None, image_id=None,
request_spec=None, filter_properties=None):
cctxt = self._get_cctxt()
msg_args = {'snapshot_id': snapshot_id, 'image_id': image_id,
'request_spec': request_spec,
'filter_properties': filter_properties, 'volume': volume}
version = '3.0'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'create_volume', **msg_args)
def migrate_volume_to_host(self, ctxt, volume, host, force_host_copy=False,
request_spec=None, filter_properties=None):
cctxt = self._get_cctxt()
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume}
version = '3.0'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'migrate_volume_to_host', **msg_args)
def retype(self, ctxt, volume, request_spec=None, filter_properties=None):
cctxt = self._get_cctxt()
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume}
version = '3.0'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, request_spec=None,
filter_properties=None):
cctxt = self._get_cctxt()
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {
'request_spec': request_spec_p,
'filter_properties': filter_properties, 'volume': volume,
}
version = '3.0'
cctxt = self.client.prepare(version=version)
return cctxt.cast(ctxt, 'manage_existing', **msg_args)
def get_pools(self, ctxt, filters=None):
version = '3.0'
cctxt = self.client.prepare(version=version)
return cctxt.call(ctxt, 'get_pools',
filters=filters)
cctxt = self._get_cctxt()
return cctxt.call(ctxt, 'get_pools', filters=filters)
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
version = '3.0'
cctxt = self.client.prepare(fanout=True, version=version)
cctxt = self._get_cctxt(fanout=True)
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)

View File

@ -113,29 +113,26 @@ class VolumeAPI(rpc.RPCAPI):
"""
RPC_API_VERSION = '3.1'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
def _get_cctxt(self, host, version):
new_host = utils.get_volume_rpc_host(host)
return self.client.prepare(server=new_host, version=version)
def _get_cctxt(self, host=None, **kwargs):
if host is not None:
kwargs['server'] = utils.get_volume_rpc_host(host)
return super(VolumeAPI, self)._get_cctxt(**kwargs)
def create_consistencygroup(self, ctxt, group, host):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'create_consistencygroup',
group=group)
cctxt = self._get_cctxt(host)
cctxt.cast(ctxt, 'create_consistencygroup', group=group)
def delete_consistencygroup(self, ctxt, group):
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'delete_consistencygroup',
group=group)
cctxt = self._get_cctxt(group.host)
cctxt.cast(ctxt, 'delete_consistencygroup', group=group)
def update_consistencygroup(self, ctxt, group, add_volumes=None,
remove_volumes=None):
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt = self._get_cctxt(group.host)
cctxt.cast(ctxt, 'update_consistencygroup',
group=group,
add_volumes=add_volumes,
@ -143,38 +140,31 @@ class VolumeAPI(rpc.RPCAPI):
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
source_cg=None):
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt = self._get_cctxt(group.host)
cctxt.cast(ctxt, 'create_consistencygroup_from_src',
group=group,
cgsnapshot=cgsnapshot,
source_cg=source_cg)
def create_cgsnapshot(self, ctxt, cgsnapshot):
version = '3.0'
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version)
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host)
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
def delete_cgsnapshot(self, ctxt, cgsnapshot):
version = '3.0'
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, version)
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host)
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
def create_volume(self, ctxt, volume, host, request_spec,
filter_properties, allow_reschedule=True):
msg_args = {'request_spec': request_spec,
'filter_properties': filter_properties,
'allow_reschedule': allow_reschedule,
'volume': volume,
}
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'create_volume', **msg_args)
cctxt = self._get_cctxt(host)
cctxt.cast(ctxt, 'create_volume',
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=allow_reschedule,
volume=volume)
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
cctxt = self._get_cctxt(volume.host)
msg_args = {
'volume': volume, 'unmanage_only': unmanage_only,
'cascade': cascade,
@ -183,20 +173,17 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
unmanage_only=unmanage_only)
def attach_volume(self, ctxt, volume, instance_uuid, host_name,
mountpoint, mode):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
return cctxt.call(ctxt, 'attach_volume',
volume_id=volume['id'],
instance_uuid=instance_uuid,
@ -205,209 +192,153 @@ class VolumeAPI(rpc.RPCAPI):
mode=mode)
def detach_volume(self, ctxt, volume, attachment_id):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
attachment_id=attachment_id)
def copy_volume_to_image(self, ctxt, volume, image_meta):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
image_meta=image_meta)
def initialize_connection(self, ctxt, volume, connector):
version = '3.0'
msg_args = {'connector': connector, 'volume': volume}
cctxt = self._get_cctxt(volume['host'], version=version)
return cctxt.call(ctxt, 'initialize_connection', **msg_args)
cctxt = self._get_cctxt(volume['host'])
return cctxt.call(ctxt, 'initialize_connection', connector=connector,
volume=volume)
def terminate_connection(self, ctxt, volume, connector, force=False):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
connector=connector, force=force)
def remove_export(self, ctxt, volume):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
def publish_service_capabilities(self, ctxt):
version = '3.0'
cctxt = self.client.prepare(fanout=True, version=version)
cctxt = self._get_cctxt(fanout=True)
cctxt.cast(ctxt, 'publish_service_capabilities')
def accept_transfer(self, ctxt, volume, new_user, new_project):
version = '3.0'
cctxt = self._get_cctxt(volume['host'], version)
cctxt = self._get_cctxt(volume['host'])
return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
new_user=new_user, new_project=new_project)
def extend_volume(self, ctxt, volume, new_size, reservations):
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'new_size': new_size,
'reservations': reservations,
}
cctxt.cast(ctxt, 'extend_volume', **msg_args)
cctxt = self._get_cctxt(volume.host)
cctxt.cast(ctxt, 'extend_volume', volume=volume, new_size=new_size,
reservations=reservations)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'host': host_p,
'force_host_copy': force_host_copy,
}
cctxt.cast(ctxt, 'migrate_volume', **msg_args)
cctxt = self._get_cctxt(volume.host)
cctxt.cast(ctxt, 'migrate_volume', volume=volume, host=host_p,
force_host_copy=force_host_copy)
def migrate_volume_completion(self, ctxt, volume, new_volume, error):
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'new_volume': new_volume, 'error': error,
}
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
cctxt = self._get_cctxt(volume.host)
return cctxt.call(ctxt, 'migrate_volume_completion', volume=volume,
new_volume=new_volume, error=error,)
def retype(self, ctxt, volume, new_type_id, dest_host,
migration_policy='never', reservations=None,
old_reservations=None):
host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities}
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
msg_args = {
'volume': volume, 'new_type_id': new_type_id, 'host': host_p,
'migration_policy': migration_policy, 'reservations': reservations,
'old_reservations': old_reservations,
}
cctxt.cast(ctxt, 'retype', **msg_args)
cctxt = self._get_cctxt(volume.host)
cctxt.cast(ctxt, 'retype', volume=volume, new_type_id=new_type_id,
host=host_p, migration_policy=migration_policy,
reservations=reservations,
old_reservations=old_reservations)
def manage_existing(self, ctxt, volume, ref):
msg_args = {
'ref': ref, 'volume': volume,
}
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'manage_existing', **msg_args)
cctxt = self._get_cctxt(volume.host)
cctxt.cast(ctxt, 'manage_existing', ref=ref, volume=volume)
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
version = '3.0'
cctxt = self._get_cctxt(new_volume['host'], version)
cctxt.call(ctxt,
'update_migrated_volume',
cctxt = self._get_cctxt(new_volume['host'])
cctxt.call(ctxt, 'update_migrated_volume',
volume=volume,
new_volume=new_volume,
volume_status=original_volume_status)
def freeze_host(self, ctxt, host):
"""Set backend host to frozen."""
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
return cctxt.call(ctxt, 'freeze_host')
def thaw_host(self, ctxt, host):
"""Clear the frozen setting on a backend host."""
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
return cctxt.call(ctxt, 'thaw_host')
def failover_host(self, ctxt, host, secondary_backend_id=None):
"""Failover host to the specified backend_id (secondary). """
version = '3.0'
cctxt = self._get_cctxt(host, version)
"""Failover host to the specified backend_id (secondary)."""
cctxt = self._get_cctxt(host)
cctxt.cast(ctxt, 'failover_host',
secondary_backend_id=secondary_backend_id)
def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
cctxt.cast(ctxt, 'manage_existing_snapshot',
snapshot=snapshot,
ref=ref)
def get_capabilities(self, ctxt, host, discover):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
return cctxt.call(ctxt, 'get_capabilities', discover=discover)
def get_backup_device(self, ctxt, backup, volume):
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
cctxt = self._get_cctxt(volume.host)
backup_dict = cctxt.call(ctxt, 'get_backup_device', backup=backup)
return backup_dict
def secure_file_operations_enabled(self, ctxt, volume):
version = '3.0'
cctxt = self._get_cctxt(volume.host, version)
cctxt = self._get_cctxt(volume.host)
return cctxt.call(ctxt, 'secure_file_operations_enabled',
volume=volume)
def get_manageable_volumes(self, ctxt, host, marker, limit, offset,
sort_keys, sort_dirs):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
return cctxt.call(ctxt, 'get_manageable_volumes', marker=marker,
limit=limit, offset=offset, sort_keys=sort_keys,
sort_dirs=sort_dirs)
def get_manageable_snapshots(self, ctxt, host, marker, limit, offset,
sort_keys, sort_dirs):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt = self._get_cctxt(host)
return cctxt.call(ctxt, 'get_manageable_snapshots', marker=marker,
limit=limit, offset=offset, sort_keys=sort_keys,
sort_dirs=sort_dirs)
def create_group(self, ctxt, group, host):
version = '3.0'
cctxt = self._get_cctxt(host, version)
cctxt.cast(ctxt, 'create_group',
group=group)
cctxt = self._get_cctxt(host)
cctxt.cast(ctxt, 'create_group', group=group)
def delete_group(self, ctxt, group):
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'delete_group',
group=group)
cctxt = self._get_cctxt(group.host)
cctxt.cast(ctxt, 'delete_group', group=group)
def update_group(self, ctxt, group, add_volumes=None,
remove_volumes=None):
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'update_group',
group=group,
add_volumes=add_volumes,
def update_group(self, ctxt, group, add_volumes=None, remove_volumes=None):
cctxt = self._get_cctxt(group.host)
cctxt.cast(ctxt, 'update_group', group=group, add_volumes=add_volumes,
remove_volumes=remove_volumes)
def create_group_from_src(self, ctxt, group, group_snapshot=None,
source_group=None):
version = '3.0'
cctxt = self._get_cctxt(group.host, version)
cctxt.cast(ctxt, 'create_group_from_src',
group=group,
group_snapshot=group_snapshot,
source_group=source_group)
cctxt = self._get_cctxt(group.host)
cctxt.cast(ctxt, 'create_group_from_src', group=group,
group_snapshot=group_snapshot, source_group=source_group)
def create_group_snapshot(self, ctxt, group_snapshot):
version = '3.0'
cctxt = self._get_cctxt(group_snapshot.group.host, version)
cctxt = self._get_cctxt(group_snapshot.group.host)
cctxt.cast(ctxt, 'create_group_snapshot',
group_snapshot=group_snapshot)
def delete_group_snapshot(self, ctxt, group_snapshot):
version = '3.0'
cctxt = self._get_cctxt(group_snapshot.group.host, version)
cctxt = self._get_cctxt(group_snapshot.group.host)
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)