Merge "Support ZeroMQ messaging driver in cinder"

This commit is contained in:
Jenkins 2016-02-11 20:26:30 +00:00 committed by Gerrit Code Review
commit f8659a7eef
4 changed files with 53 additions and 61 deletions

View File

@ -789,6 +789,13 @@ class VolumeUtilsTestCase(test.TestCase):
self.assertEqual(pool, self.assertEqual(pool,
volume_utils.extract_host(host, 'pool', True)) volume_utils.extract_host(host, 'pool', True))
def test_get_volume_rpc_host(self):
host = 'Host@backend'
# default level is 'backend'
# check if host with backend is returned
self.assertEqual(volume_utils.extract_host(host),
volume_utils.get_volume_rpc_host(host))
def test_append_host(self): def test_append_host(self):
host = 'Host' host = 'Host'
pool = 'Pool' pool = 'Pool'

View File

@ -91,22 +91,23 @@ class VolumeAPI(rpc.RPCAPI):
TOPIC = CONF.volume_topic TOPIC = CONF.volume_topic
BINARY = 'cinder-volume' BINARY = 'cinder-volume'
def _get_cctxt(self, host, version):
new_host = utils.get_volume_rpc_host(host)
return self.client.prepare(server=new_host, version=version)
def create_consistencygroup(self, ctxt, group, host): def create_consistencygroup(self, ctxt, group, host):
new_host = utils.extract_host(host) cctxt = self._get_cctxt(host, '1.26')
cctxt = self.client.prepare(server=new_host, version='1.26')
cctxt.cast(ctxt, 'create_consistencygroup', cctxt.cast(ctxt, 'create_consistencygroup',
group=group) group=group)
def delete_consistencygroup(self, ctxt, group): def delete_consistencygroup(self, ctxt, group):
host = utils.extract_host(group.host) cctxt = self._get_cctxt(group.host, '1.26')
cctxt = self.client.prepare(server=host, version='1.26')
cctxt.cast(ctxt, 'delete_consistencygroup', cctxt.cast(ctxt, 'delete_consistencygroup',
group=group) group=group)
def update_consistencygroup(self, ctxt, group, add_volumes=None, def update_consistencygroup(self, ctxt, group, add_volumes=None,
remove_volumes=None): remove_volumes=None):
host = utils.extract_host(group.host) cctxt = self._get_cctxt(group.host, '1.26')
cctxt = self.client.prepare(server=host, version='1.26')
cctxt.cast(ctxt, 'update_consistencygroup', cctxt.cast(ctxt, 'update_consistencygroup',
group=group, group=group,
add_volumes=add_volumes, add_volumes=add_volumes,
@ -114,21 +115,18 @@ class VolumeAPI(rpc.RPCAPI):
def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None, def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
source_cg=None): source_cg=None):
new_host = utils.extract_host(group.host) cctxt = self._get_cctxt(group.host, '1.31')
cctxt = self.client.prepare(server=new_host, version='1.31')
cctxt.cast(ctxt, 'create_consistencygroup_from_src', cctxt.cast(ctxt, 'create_consistencygroup_from_src',
group=group, group=group,
cgsnapshot=cgsnapshot, cgsnapshot=cgsnapshot,
source_cg=source_cg) source_cg=source_cg)
def create_cgsnapshot(self, ctxt, cgsnapshot): def create_cgsnapshot(self, ctxt, cgsnapshot):
host = utils.extract_host(cgsnapshot.consistencygroup.host) cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
cctxt = self.client.prepare(server=host, version='1.31')
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot) cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
def delete_cgsnapshot(self, ctxt, cgsnapshot): def delete_cgsnapshot(self, ctxt, cgsnapshot):
new_host = utils.extract_host(cgsnapshot.consistencygroup.host) cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
cctxt = self.client.prepare(server=new_host, version='1.31')
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot) cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
def create_volume(self, ctxt, volume, host, request_spec, def create_volume(self, ctxt, volume, host, request_spec,
@ -143,8 +141,7 @@ class VolumeAPI(rpc.RPCAPI):
else: else:
version = '1.24' version = '1.24'
new_host = utils.extract_host(host) cctxt = self._get_cctxt(host, version)
cctxt = self.client.prepare(server=new_host, version=version)
request_spec_p = jsonutils.to_primitive(request_spec) request_spec_p = jsonutils.to_primitive(request_spec)
cctxt.cast(ctxt, 'create_volume', **msg_args) cctxt.cast(ctxt, 'create_volume', **msg_args)
@ -156,27 +153,23 @@ class VolumeAPI(rpc.RPCAPI):
else: else:
version = '1.15' version = '1.15'
new_host = utils.extract_host(volume.host) cctxt = self._get_cctxt(volume.host, version)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt.cast(ctxt, 'delete_volume', **msg_args) cctxt.cast(ctxt, 'delete_volume', **msg_args)
def create_snapshot(self, ctxt, volume, snapshot): def create_snapshot(self, ctxt, volume, snapshot):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], version='1.20')
cctxt = self.client.prepare(server=new_host, version='1.20')
cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'], cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
snapshot=snapshot) snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False): def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
new_host = utils.extract_host(host) cctxt = self._get_cctxt(host, version='1.20')
cctxt = self.client.prepare(server=new_host, version='1.20')
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot, cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
unmanage_only=unmanage_only) unmanage_only=unmanage_only)
def attach_volume(self, ctxt, volume, instance_uuid, host_name, def attach_volume(self, ctxt, volume, instance_uuid, host_name,
mountpoint, mode): mountpoint, mode):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.11')
cctxt = self.client.prepare(server=new_host, version='1.11')
return cctxt.call(ctxt, 'attach_volume', return cctxt.call(ctxt, 'attach_volume',
volume_id=volume['id'], volume_id=volume['id'],
instance_uuid=instance_uuid, instance_uuid=instance_uuid,
@ -185,33 +178,28 @@ class VolumeAPI(rpc.RPCAPI):
mode=mode) mode=mode)
def detach_volume(self, ctxt, volume, attachment_id): def detach_volume(self, ctxt, volume, attachment_id):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.20')
cctxt = self.client.prepare(server=new_host, version='1.20')
return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'], return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
attachment_id=attachment_id) attachment_id=attachment_id)
def copy_volume_to_image(self, ctxt, volume, image_meta): def copy_volume_to_image(self, ctxt, volume, image_meta):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.3')
cctxt = self.client.prepare(server=new_host, version='1.3')
cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'], cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
image_meta=image_meta) image_meta=image_meta)
def initialize_connection(self, ctxt, volume, connector): def initialize_connection(self, ctxt, volume, connector):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], version='1.0')
cctxt = self.client.prepare(server=new_host, version='1.0')
return cctxt.call(ctxt, 'initialize_connection', return cctxt.call(ctxt, 'initialize_connection',
volume_id=volume['id'], volume_id=volume['id'],
connector=connector) connector=connector)
def terminate_connection(self, ctxt, volume, connector, force=False): def terminate_connection(self, ctxt, volume, connector, force=False):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], version='1.0')
cctxt = self.client.prepare(server=new_host, version='1.0')
return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'], return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
connector=connector, force=force) connector=connector, force=force)
def remove_export(self, ctxt, volume): def remove_export(self, ctxt, volume):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.30')
cctxt = self.client.prepare(server=new_host, version='1.30')
cctxt.cast(ctxt, 'remove_export', volume_id=volume['id']) cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
def publish_service_capabilities(self, ctxt): def publish_service_capabilities(self, ctxt):
@ -219,13 +207,11 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'publish_service_capabilities') cctxt.cast(ctxt, 'publish_service_capabilities')
def accept_transfer(self, ctxt, volume, new_user, new_project): def accept_transfer(self, ctxt, volume, new_user, new_project):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.9')
cctxt = self.client.prepare(server=new_host, version='1.9')
return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'], return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
new_user=new_user, new_project=new_project) new_user=new_user, new_project=new_project)
def extend_volume(self, ctxt, volume, new_size, reservations): def extend_volume(self, ctxt, volume, new_size, reservations):
new_host = utils.extract_host(volume.host)
msg_args = {'volume_id': volume.id, 'new_size': new_size, msg_args = {'volume_id': volume.id, 'new_size': new_size,
'reservations': reservations} 'reservations': reservations}
@ -235,11 +221,10 @@ class VolumeAPI(rpc.RPCAPI):
else: else:
version = '1.14' version = '1.14'
cctxt = self.client.prepare(server=new_host, version=version) cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'extend_volume', **msg_args) cctxt.cast(ctxt, 'extend_volume', **msg_args)
def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
new_host = utils.extract_host(volume.host)
host_p = {'host': dest_host.host, host_p = {'host': dest_host.host,
'capabilities': dest_host.capabilities} 'capabilities': dest_host.capabilities}
@ -251,11 +236,10 @@ class VolumeAPI(rpc.RPCAPI):
else: else:
version = '1.8' version = '1.8'
cctxt = self.client.prepare(server=new_host, version=version) cctxt = self._get_cctxt(volume.host, version)
cctxt.cast(ctxt, 'migrate_volume', **msg_args) cctxt.cast(ctxt, 'migrate_volume', **msg_args)
def migrate_volume_completion(self, ctxt, volume, new_volume, error): def migrate_volume_completion(self, ctxt, volume, new_volume, error):
new_host = utils.extract_host(volume.host)
msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id, msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
'error': error} 'error': error}
@ -266,7 +250,7 @@ class VolumeAPI(rpc.RPCAPI):
else: else:
version = '1.10' version = '1.10'
cctxt = self.client.prepare(server=new_host, version=version) cctxt = self._get_cctxt(volume.host, version)
return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args) return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
def retype(self, ctxt, volume, new_type_id, dest_host, def retype(self, ctxt, volume, new_type_id, dest_host,
@ -287,29 +271,24 @@ class VolumeAPI(rpc.RPCAPI):
else: else:
version = '1.12' version = '1.12'
new_host = utils.extract_host(volume.host) cctxt = self._get_cctxt(volume.host, version)
cctxt = self.client.prepare(server=new_host, version=version)
cctxt.cast(ctxt, 'retype', **msg_args) cctxt.cast(ctxt, 'retype', **msg_args)
def manage_existing(self, ctxt, volume, ref): def manage_existing(self, ctxt, volume, ref):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.15')
cctxt = self.client.prepare(server=new_host, version='1.15')
cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref) cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
def promote_replica(self, ctxt, volume): def promote_replica(self, ctxt, volume):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.17')
cctxt = self.client.prepare(server=new_host, version='1.17')
cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id']) cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
def reenable_replication(self, ctxt, volume): def reenable_replication(self, ctxt, volume):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.17')
cctxt = self.client.prepare(server=new_host, version='1.17')
cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id']) cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
def update_migrated_volume(self, ctxt, volume, new_volume, def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status): original_volume_status):
host = utils.extract_host(new_volume['host']) cctxt = self._get_cctxt(new_volume['host'], '1.36')
cctxt = self.client.prepare(server=host, version='1.36')
cctxt.call(ctxt, cctxt.call(ctxt,
'update_migrated_volume', 'update_migrated_volume',
volume=volume, volume=volume,
@ -317,13 +296,11 @@ class VolumeAPI(rpc.RPCAPI):
volume_status=original_volume_status) volume_status=original_volume_status)
def enable_replication(self, ctxt, volume): def enable_replication(self, ctxt, volume):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt.cast(ctxt, 'enable_replication', volume=volume) cctxt.cast(ctxt, 'enable_replication', volume=volume)
def disable_replication(self, ctxt, volume): def disable_replication(self, ctxt, volume):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt.cast(ctxt, 'disable_replication', cctxt.cast(ctxt, 'disable_replication',
volume=volume) volume=volume)
@ -331,24 +308,21 @@ class VolumeAPI(rpc.RPCAPI):
ctxt, ctxt,
volume, volume,
secondary=None): secondary=None):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt = self.client.prepare(server=new_host, version='1.27')
cctxt.cast(ctxt, 'failover_replication', cctxt.cast(ctxt, 'failover_replication',
volume=volume, volume=volume,
secondary=secondary) secondary=secondary)
def list_replication_targets(self, ctxt, volume): def list_replication_targets(self, ctxt, volume):
new_host = utils.extract_host(volume['host']) cctxt = self._get_cctxt(volume['host'], '1.27')
cctxt = self.client.prepare(server=new_host, version='1.27')
return cctxt.call(ctxt, 'list_replication_targets', volume=volume) return cctxt.call(ctxt, 'list_replication_targets', volume=volume)
def manage_existing_snapshot(self, ctxt, snapshot, ref, host): def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
cctxt = self.client.prepare(server=host, version='1.28') cctxt = self._get_cctxt(host, '1.28')
cctxt.cast(ctxt, 'manage_existing_snapshot', cctxt.cast(ctxt, 'manage_existing_snapshot',
snapshot=snapshot, snapshot=snapshot,
ref=ref) ref=ref)
def get_capabilities(self, ctxt, host, discover): def get_capabilities(self, ctxt, host, discover):
new_host = utils.extract_host(host) cctxt = self._get_cctxt(host, '1.29')
cctxt = self.client.prepare(server=new_host, version='1.29')
return cctxt.call(ctxt, 'get_capabilities', discover=discover) return cctxt.call(ctxt, 'get_capabilities', discover=discover)

View File

@ -623,6 +623,14 @@ def extract_host(host, level='backend', default_pool_name=False):
return None return None
def get_volume_rpc_host(host):
if CONF.rpc_backend and CONF.rpc_backend == "zmq":
# ZeroMQ RPC driver requires only the hostname.
# So, return just that.
return extract_host(host, 'host')
return extract_host(host)
def append_host(host, pool): def append_host(host, pool):
"""Encode pool into host info.""" """Encode pool into host info."""
if not host or not pool: if not host or not pool:

View File

@ -0,0 +1,3 @@
---
features:
- Added support for ZeroMQ messaging driver in cinder single backend config