Support A/A in delete actions and get_capabilities

This patch allows deleting operations and getting capabilities in HA
Active-Active configurations.

To allow this we have to use the cluster topic queue in the Message
Broker, so following RPC methods in cinder/volume/rpcapi have been
changed:

- delete_volume
- delete_snapshot
- delete_consistencygroup
- delete_cgsnapshot
- delete_group
- delete_group_snapshot

When the cluster field is not set all RPC calls will behave as before,
sending it to the host topic queue.

Change-Id: I8bcf26afec7f2c54f281dc1ac771825b1c1625ad
Specs: https://review.openstack.org/327283
Implements: blueprint cinder-volume-active-active-support
This commit is contained in:
Gorka Eguileor 2016-07-15 15:47:19 +02:00
parent 850a197226
commit f91b29cd0c
13 changed files with 243 additions and 87 deletions

View File

@ -93,6 +93,10 @@ class Manager(base.Base, PeriodicTasks):
self.additional_endpoints = []
super(Manager, self).__init__(db_driver)
@property
def service_topic_queue(self):
return self.cluster or self.host
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)

View File

@ -23,7 +23,7 @@ from oslo_versionedobjects import fields
@base.CinderObjectRegistry.register
class Group(base.CinderPersistentObject, base.CinderObject,
base.CinderObjectDictCompat):
base.CinderObjectDictCompat, base.ClusteredObject):
# Version 1.0: Initial version
# Version 1.1: Added group_snapshots, group_snapshot_id, and
# source_group_id

View File

@ -40,6 +40,10 @@ class GroupSnapshot(base.CinderPersistentObject, base.CinderObject,
'snapshots': fields.ObjectField('SnapshotList', nullable=True),
}
@property
def service_topic_queue(self):
return self.group.service_topic_queue
@classmethod
def _from_db_object(cls, context, group_snapshot, db_group_snapshots,
expected_attrs=None):

View File

@ -613,7 +613,7 @@ class VolumeList(base.ObjectListBase, base.CinderObject):
volumes, expected_attrs=expected_attrs)
@classmethod
def get_all_by_project(cls, context, project_id, marker, limit,
def get_all_by_project(cls, context, project_id, marker=None, limit=None,
sort_keys=None, sort_dirs=None, filters=None,
offset=None):
volumes = db.volume_get_all_by_project(context, project_id, marker,

View File

@ -23,7 +23,6 @@ from cinder import test
from cinder.tests.unit.api import fakes
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
# This list of fake snapshot is used by our tests.
@ -89,26 +88,15 @@ class SnapshotUnmanageTest(test.TestCase):
@mock.patch('cinder.db.conditional_update', return_value=1)
@mock.patch('cinder.db.snapshot_update')
@mock.patch('cinder.objects.Volume.get_by_id')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.delete_snapshot')
def test_unmanage_snapshot_ok(self, mock_rpcapi, mock_volume_get_by_id,
mock_db_update, mock_conditional_update):
def test_unmanage_snapshot_ok(self, mock_rpcapi, mock_db_update,
mock_conditional_update):
"""Return success for valid and unattached volume."""
ctxt = context.RequestContext(fake.USER_ID, fake.PROJECT_ID, True)
volume = fake_volume.fake_volume_obj(ctxt, id=fake.VOLUME_ID)
mock_volume_get_by_id.return_value = volume
res = self._get_resp(snapshot_id)
self.assertEqual(1, mock_volume_get_by_id.call_count)
self.assertEqual(2, len(mock_volume_get_by_id.call_args[0]),
mock_volume_get_by_id.call_args)
self.assertEqual(fake.VOLUME_ID,
mock_volume_get_by_id.call_args[0][1])
self.assertEqual(1, mock_rpcapi.call_count)
self.assertEqual(3, len(mock_rpcapi.call_args[0]))
self.assertEqual(1, len(mock_rpcapi.call_args[1]))
self.assertTrue(mock_rpcapi.call_args[1]['unmanage_only'])
self.assertEqual(0, len(mock_rpcapi.call_args[1]))
self.assertEqual(202, res.status_int, res)

View File

@ -10,8 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_config import cfg
from cinder import context
@ -33,6 +33,7 @@ CGQUOTAS = quota.CGQUOTAS
CONF = cfg.CONF
@ddt.ddt
class ConsistencyGroupTestCase(base.BaseVolumeTestCase):
def test_delete_volume_in_consistency_group(self):
"""Test deleting a volume that's tied to a consistency group fails."""
@ -475,6 +476,8 @@ class ConsistencyGroupTestCase(base.BaseVolumeTestCase):
return cgsnap, snaps
@ddt.data((CONF.host, None), (CONF.host + 'fake', 'mycluster'))
@ddt.unpack
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch('cinder.volume.driver.VolumeDriver.create_consistencygroup',
autospec=True,
@ -488,18 +491,23 @@ class ConsistencyGroupTestCase(base.BaseVolumeTestCase):
@mock.patch('cinder.volume.driver.VolumeDriver.delete_cgsnapshot',
autospec=True,
return_value=({'status': 'deleted'}, []))
def test_create_delete_cgsnapshot(self,
def test_create_delete_cgsnapshot(self, host, cluster,
mock_del_cgsnap, mock_create_cgsnap,
mock_del_cg, _mock_create_cg,
mock_notify):
"""Test cgsnapshot can be created and deleted."""
self.volume.cluster = cluster
group = tests_utils.create_consistencygroup(
self.context,
host=host,
cluster_name=cluster,
availability_zone=CONF.storage_availability_zone,
volume_type='type1,type2')
self.volume_params['host'] = host
volume = tests_utils.create_volume(
self.context,
cluster_name=cluster,
consistencygroup_id=group.id,
**self.volume_params)
self.volume.create_volume(self.context, volume)
@ -592,6 +600,47 @@ class ConsistencyGroupTestCase(base.BaseVolumeTestCase):
self.assertTrue(mock_del_cg.called)
@mock.patch('cinder.volume.driver.VolumeDriver.create_consistencygroup',
mock.Mock(return_value={'status': 'available'}))
@mock.patch('cinder.volume.driver.VolumeDriver.delete_consistencygroup',
return_value=({'status': 'deleted'}, []))
def test_delete_consistencygroup_cluster(self, mock_del_cg):
"""Test consistencygroup can be deleted.
Test consistencygroup can be deleted when volumes are on
the correct volume node.
"""
cluster_name = 'cluster@backend1'
self.volume.host = 'host2@backend1'
self.volume.cluster = cluster_name
group = tests_utils.create_consistencygroup(
self.context,
host=CONF.host + 'fake',
cluster_name=cluster_name,
availability_zone=CONF.storage_availability_zone,
volume_type='type1,type2')
volume = tests_utils.create_volume(
self.context,
consistencygroup_id=group.id,
host='host1@backend1#pool1',
cluster_name=cluster_name,
status='creating',
size=1)
self.volume.create_volume(self.context, volume)
self.volume.delete_consistencygroup(self.context, group)
cg = objects.ConsistencyGroup.get_by_id(
context.get_admin_context(read_deleted='yes'),
group.id)
self.assertEqual(fields.ConsistencyGroupStatus.DELETED, cg.status)
self.assertRaises(exception.NotFound,
objects.ConsistencyGroup.get_by_id,
self.context,
group.id)
self.assertTrue(mock_del_cg.called)
@mock.patch('cinder.volume.driver.VolumeDriver.create_consistencygroup',
return_value={'status': 'available'})
def test_delete_consistencygroup_wrong_host(self, *_mock_create_cg):

View File

@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from oslo_config import cfg
from oslo_utils import importutils
@ -38,6 +38,7 @@ GROUP_QUOTAS = quota.GROUP_QUOTAS
CONF = cfg.CONF
@ddt.ddt
class GroupManagerTestCase(test.TestCase):
def setUp(self):
@ -508,6 +509,8 @@ class GroupManagerTestCase(test.TestCase):
return grpsnap, snaps
@ddt.data((CONF.host, None), (CONF.host + 'fake', 'mycluster'))
@ddt.unpack
@mock.patch('cinder.tests.unit.fake_notifier.FakeNotifier._notify')
@mock.patch('cinder.volume.driver.VolumeDriver.create_group',
autospec=True,
@ -521,23 +524,26 @@ class GroupManagerTestCase(test.TestCase):
@mock.patch('cinder.volume.driver.VolumeDriver.delete_group_snapshot',
autospec=True,
return_value=({'status': 'deleted'}, []))
def test_create_delete_group_snapshot(self,
def test_create_delete_group_snapshot(self, host, cluster,
mock_del_grpsnap,
mock_create_grpsnap,
mock_del_grp,
_mock_create_grp,
mock_notify):
"""Test group_snapshot can be created and deleted."""
self.volume.cluster = cluster
group = tests_utils.create_group(
self.context,
cluster_name=cluster,
availability_zone=CONF.storage_availability_zone,
volume_type_ids=[fake.VOLUME_TYPE_ID],
group_type_id=fake.GROUP_TYPE_ID,
host=CONF.host)
host=host)
volume = tests_utils.create_volume(
self.context,
group_id=group.id,
host=group.host,
cluster_name=group.cluster_name,
volume_type_id=fake.VOLUME_TYPE_ID)
self.volume.create_volume(self.context, volume)
@ -630,6 +636,45 @@ class GroupManagerTestCase(test.TestCase):
self.assertTrue(mock_del_grp.called)
@mock.patch('cinder.volume.driver.VolumeDriver.create_group',
mock.Mock(return_value={'status': 'available'}))
@mock.patch('cinder.volume.driver.VolumeDriver.delete_group',
return_value=({'status': 'deleted'}, []))
def test_delete_group_cluster(self, mock_del_grp):
"""Test group can be deleted on another service in the cluster."""
cluster_name = 'cluster@backend1'
self.volume.host = 'host2@backend1'
self.volume.cluster = cluster_name
group = tests_utils.create_group(
self.context,
host=CONF.host + 'fake',
cluster_name=cluster_name,
availability_zone=CONF.storage_availability_zone,
volume_type_ids=[fake.VOLUME_TYPE_ID],
group_type_id=fake.GROUP_TYPE_ID)
volume = tests_utils.create_volume(
self.context,
group_id=group.id,
host='host1@backend1#pool1',
cluster_name=cluster_name,
status='creating',
volume_type_id=fake.VOLUME_TYPE_ID,
size=1)
self.volume.host = 'host2@backend1'
self.volume.create_volume(self.context, volume)
self.volume.delete_group(self.context, group)
grp = objects.Group.get_by_id(
context.get_admin_context(read_deleted='yes'),
group.id)
self.assertEqual(fields.GroupStatus.DELETED, grp.status)
self.assertRaises(exception.NotFound,
objects.Group.get_by_id,
self.context,
group.id)
self.assertTrue(mock_del_grp.called)
@mock.patch('cinder.volume.driver.VolumeDriver.create_group',
return_value={'status': 'available'})
def test_delete_group_wrong_host(self, *_mock_create_grp):

View File

@ -574,6 +574,18 @@ class VolumeTestCase(base.BaseVolumeTestCase):
self.context,
volume_id)
def test_delete_volume_another_cluster_fails(self):
"""Test delete of volume from another cluster fails."""
self.volume.cluster = 'mycluster'
volume = tests_utils.create_volume(self.context, status='available',
size=1, host=CONF.host + 'fake',
cluster_name=self.volume.cluster)
self.volume.delete_volume(self.context, volume)
self.assertRaises(exception.NotFound,
db.volume_get,
self.context,
volume.id)
@mock.patch('cinder.db.volume_metadata_update')
def test_create_volume_metadata(self, metadata_update):
metadata = {'fake_key': 'fake_value'}
@ -3057,6 +3069,20 @@ class VolumeTestCase(base.BaseVolumeTestCase):
self.context,
snapshot_id)
def test_delete_snapshot_another_cluster_fails(self):
"""Test delete of snapshot from another cluster fails."""
self.volume.cluster = 'mycluster'
volume = tests_utils.create_volume(self.context, status='available',
size=1, host=CONF.host + 'fake',
cluster_name=self.volume.cluster)
snapshot = create_snapshot(volume.id, size=volume.size)
self.volume.delete_snapshot(self.context, snapshot)
self.assertRaises(exception.NotFound,
db.snapshot_get,
self.context,
snapshot.id)
@mock.patch.object(db, 'snapshot_create',
side_effect=exception.InvalidSnapshot(
'Create snapshot in db failed!'))

View File

@ -108,7 +108,7 @@ class VolumeRpcAPITestCase(test.TestCase):
self.fake_reservations = ["RESERVATION"]
self.fake_cg = cg
self.fake_cg2 = cg2
self.fake_src_cg = jsonutils.to_primitive(source_group)
self.fake_src_cg = source_group
self.fake_cgsnap = cgsnapshot
self.fake_backup_obj = fake_backup.fake_backup_obj(self.context)
self.fake_group = generic_group
@ -128,24 +128,7 @@ class VolumeRpcAPITestCase(test.TestCase):
def test_serialized_volume_has_id(self):
self.assertIn('id', self.fake_volume)
def _test_volume_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
if 'rpcapi_class' in kwargs:
rpcapi_class = kwargs.pop('rpcapi_class')
else:
rpcapi_class = volume_rpcapi.VolumeAPI
rpcapi = rpcapi_class()
expected_retval = {} if rpc_method == 'call' else None
target = {
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
if 'request_spec' in kwargs:
spec = jsonutils.to_primitive(kwargs['request_spec'])
kwargs['request_spec'] = spec
def _get_expected_msg(self, kwargs):
expected_msg = copy.deepcopy(kwargs)
if 'volume' in expected_msg:
volume = expected_msg.pop('volume')
@ -156,9 +139,10 @@ class VolumeRpcAPITestCase(test.TestCase):
expected_msg['volume_id'] = volume['id']
expected_msg['volume'] = volume
if 'snapshot' in expected_msg:
snapshot = expected_msg.pop('snapshot')
snapshot = expected_msg['snapshot']
if isinstance(snapshot, objects.Snapshot) and 'volume' in snapshot:
snapshot.volume.obj_reset_changes()
expected_msg['snapshot_id'] = snapshot.id
expected_msg['snapshot'] = snapshot
if 'cgsnapshot' in expected_msg:
cgsnapshot = expected_msg['cgsnapshot']
if cgsnapshot:
@ -179,18 +163,41 @@ class VolumeRpcAPITestCase(test.TestCase):
if 'new_volume' in expected_msg:
volume = expected_msg['new_volume']
expected_msg['new_volume_id'] = volume['id']
return expected_msg
def _test_volume_api(self, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
if 'rpcapi_class' in kwargs:
rpcapi_class = kwargs.pop('rpcapi_class')
else:
rpcapi_class = volume_rpcapi.VolumeAPI
rpcapi = rpcapi_class()
expected_retval = {} if rpc_method == 'call' else None
target = {
"version": kwargs.pop('version', rpcapi.RPC_API_VERSION)
}
if 'request_spec' in kwargs:
spec = jsonutils.to_primitive(kwargs['request_spec'])
kwargs['request_spec'] = spec
expected_msg = self._get_expected_msg(kwargs)
if 'host' in kwargs:
host = kwargs['host']
elif 'backend_id' in kwargs:
host = kwargs['backend_id']
elif 'group' in kwargs:
host = kwargs['group']['host']
host = kwargs['group'].service_topic_queue
elif 'volume' in kwargs:
vol = kwargs['volume']
host = vol.service_topic_queue
elif 'snapshot' in kwargs:
host = 'fake_host'
elif 'cgsnapshot' in kwargs:
host = kwargs['cgsnapshot'].consistencygroup.host
host = kwargs['cgsnapshot'].consistencygroup.service_topic_queue
target['server'] = utils.extract_host(host)
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
@ -276,9 +283,9 @@ class VolumeRpcAPITestCase(test.TestCase):
if 'host' in kwargs:
host = kwargs['host']
elif 'group' in kwargs:
host = kwargs['group']['host']
host = kwargs['group'].service_topic_queue
elif 'group_snapshot' in kwargs:
host = kwargs['group_snapshot'].group.host
host = kwargs['group_snapshot'].service_topic_queue
target['server'] = utils.extract_host(host)
target['topic'] = '%s.%s' % (constants.VOLUME_TOPIC, host)
@ -328,6 +335,11 @@ class VolumeRpcAPITestCase(test.TestCase):
self._test_volume_api('delete_consistencygroup', rpc_method='cast',
group=self.fake_cg, version='3.0')
def test_delete_consistencygroup_cluster(self):
self._set_cluster()
self._test_volume_api('delete_consistencygroup', rpc_method='cast',
group=self.fake_src_cg, version='3.0')
def test_update_consistencygroup(self):
self._test_volume_api('update_consistencygroup', rpc_method='cast',
group=self.fake_cg, add_volumes=['vol1'],
@ -338,6 +350,7 @@ class VolumeRpcAPITestCase(test.TestCase):
cgsnapshot=self.fake_cgsnap, version='3.0')
def test_delete_cgsnapshot(self):
self._set_cluster()
self._test_volume_api('delete_cgsnapshot', rpc_method='cast',
cgsnapshot=self.fake_cgsnap, version='3.0')
@ -359,6 +372,15 @@ class VolumeRpcAPITestCase(test.TestCase):
cascade=False,
version='3.0')
def test_delete_volume_cluster(self):
self._set_cluster()
self._test_volume_api('delete_volume',
rpc_method='cast',
volume=self.fake_volume_obj,
unmanage_only=False,
cascade=False,
version='3.0')
def test_delete_volume_cascade(self):
self._test_volume_api('delete_volume',
rpc_method='cast',
@ -375,18 +397,27 @@ class VolumeRpcAPITestCase(test.TestCase):
version='3.0')
def test_delete_snapshot(self):
self.fake_snapshot.volume
self._test_volume_api('delete_snapshot',
rpc_method='cast',
snapshot=self.fake_snapshot,
unmanage_only=False,
version='3.0')
def test_delete_snapshot_cluster(self):
self._set_cluster()
self.fake_snapshot.volume
self._test_volume_api('delete_snapshot',
rpc_method='cast',
snapshot=self.fake_snapshot,
host='fake_host',
unmanage_only=False,
version='3.0')
def test_delete_snapshot_with_unmanage_only(self):
self.fake_snapshot.volume.metadata
self._test_volume_api('delete_snapshot',
rpc_method='cast',
snapshot=self.fake_snapshot,
host='fake_host',
unmanage_only=True,
version='3.0')
@ -419,6 +450,8 @@ class VolumeRpcAPITestCase(test.TestCase):
def _set_cluster(self):
self.fake_volume_obj.cluster_name = 'my_cluster'
self.fake_volume_obj.obj_reset_changes(['cluster_name'])
self.fake_src_cg.cluster_name = 'my_cluster'
self.fake_src_cg.obj_reset_changes(['my_cluster'])
@ddt.data('3.0', '3.3')
@mock.patch('oslo_messaging.RPCClient.can_send_version')
@ -613,7 +646,7 @@ class VolumeRpcAPITestCase(test.TestCase):
def test_get_capabilities(self):
self._test_volume_api('get_capabilities',
rpc_method='call',
host='fake_host',
backend_id='fake_host',
discover=True,
version='3.0')
@ -659,6 +692,11 @@ class VolumeRpcAPITestCase(test.TestCase):
self._test_group_api('delete_group', rpc_method='cast',
group=self.fake_group, version='3.0')
def test_delete_group_cluster(self):
self.fake_group.cluster_name = 'mycluster'
self._test_group_api('delete_group', rpc_method='cast',
group=self.fake_group, version='3.0')
def test_update_group(self):
self._test_group_api('update_group', rpc_method='cast',
group=self.fake_group, add_volumes=['vol1'],

View File

@ -995,10 +995,7 @@ class API(base.Base):
LOG.error(msg)
raise exception.InvalidSnapshot(reason=msg)
# Make RPC call to the right host
volume = objects.Volume.get_by_id(context, snapshot.volume_id)
self.volume_rpcapi.delete_snapshot(context, snapshot, volume.host,
unmanage_only=unmanage_only)
self.volume_rpcapi.delete_snapshot(context, snapshot, unmanage_only)
LOG.info(_LI("Snapshot delete request issued successfully."),
resource=snapshot)

View File

@ -639,6 +639,10 @@ class VolumeManager(manager.CleanableManager,
LOG.info(_LI("Created volume successfully."), resource=volume)
return volume.id
def _is_our_resource(self, resource):
resource_topic = vol_utils.extract_host(resource.service_topic_queue)
return resource_topic == self.service_topic_queue
@coordination.synchronized('{volume.id}-{f_name}')
@objects.Volume.set_workers
def delete_volume(self, context, volume, unmanage_only=False,
@ -671,7 +675,7 @@ class VolumeManager(manager.CleanableManager,
if volume['attach_status'] == fields.VolumeAttachStatus.ATTACHED:
# Volume is still attached, need to detach first
raise exception.VolumeAttached(volume_id=volume.id)
if vol_utils.extract_host(volume.host) != self.host:
if not self._is_our_resource(volume):
raise exception.InvalidVolume(
reason=_("volume is not local to this node"))
@ -1647,7 +1651,8 @@ class VolumeManager(manager.CleanableManager,
# Check the backend capabilities of migration destination host.
rpcapi = volume_rpcapi.VolumeAPI()
capabilities = rpcapi.get_capabilities(ctxt, dest_vol['host'],
capabilities = rpcapi.get_capabilities(ctxt,
dest_vol.service_topic_queue,
False)
sparse_copy_volume = bool(capabilities and
capabilities.get('sparse_copy_volume',
@ -2900,19 +2905,18 @@ class VolumeManager(manager.CleanableManager,
else:
project_id = context.project_id
volumes = self.db.volume_get_all_by_group(context, group.id)
volumes = objects.VolumeList.get_all_by_group(context, group.id)
for volume_ref in volumes:
if (volume_ref['attach_status'] ==
for volume in volumes:
if (volume.attach_status ==
fields.VolumeAttachStatus.ATTACHED):
# Volume is still attached, need to detach first
raise exception.VolumeAttached(volume_id=volume_ref['id'])
raise exception.VolumeAttached(volume_id=volume.id)
# self.host is 'host@backend'
# volume_ref['host'] is 'host@backend#pool'
# volume.host is 'host@backend#pool'
# Extract host before doing comparison
if volume_ref['host']:
new_host = vol_utils.extract_host(volume_ref['host'])
if new_host != self.host:
if volume.host:
if not self._is_our_resource(volume):
raise exception.InvalidVolume(
reason=_("Volume is not local to this node"))
@ -2958,8 +2962,8 @@ class VolumeManager(manager.CleanableManager,
# None for volumes_model_update.
if not volumes_model_update:
for vol in volumes:
self.db.volume_update(
context, vol['id'], {'status': 'error'})
vol.status = 'error'
vol.save()
# Get reservations for group
try:
@ -2974,15 +2978,14 @@ class VolumeManager(manager.CleanableManager,
resource={'type': 'consistency_group',
'id': group.id})
for volume_ref in volumes:
for volume in volumes:
# Get reservations for volume
try:
volume_id = volume_ref['id']
reserve_opts = {'volumes': -1,
'gigabytes': -volume_ref['size']}
'gigabytes': -volume.size}
QUOTAS.add_volume_type_opts(context,
reserve_opts,
volume_ref.get('volume_type_id'))
volume.volume_type_id)
reservations = QUOTAS.reserve(context,
project_id=project_id,
**reserve_opts)
@ -2994,15 +2997,15 @@ class VolumeManager(manager.CleanableManager,
'id': group.id})
# Delete glance metadata if it exists
self.db.volume_glance_metadata_delete_by_volume(context, volume_id)
self.db.volume_glance_metadata_delete_by_volume(context, volume.id)
self.db.volume_destroy(context, volume_id)
self.db.volume_destroy(context, volume.id)
# Commit the reservations
if reservations:
QUOTAS.commit(context, reservations, project_id=project_id)
self.stats['allocated_capacity_gb'] -= volume_ref['size']
self.stats['allocated_capacity_gb'] -= volume.size
if cgreservations:
CGQUOTAS.commit(context, cgreservations,
@ -3038,11 +3041,10 @@ class VolumeManager(manager.CleanableManager,
# vol_obj.host is 'host@backend#pool'
# Extract host before doing comparison
if vol_obj.host:
new_host = vol_utils.extract_host(vol_obj.host)
msg = (_("Volume %(vol_id)s is not local to this node "
"%(host)s") % {'vol_id': vol_obj.id,
'host': self.host})
if new_host != self.host:
if not self._is_our_resource(vol_obj):
backend = vol_utils.extract_host(self.service_topic_queue)
msg = (_("Volume %(vol_id)s is not local to %(backend)s") %
{'vol_id': vol_obj.id, 'backend': backend})
raise exception.InvalidVolume(reason=msg)
self._notify_about_group_usage(

View File

@ -132,7 +132,7 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'create_consistencygroup', group=group)
def delete_consistencygroup(self, ctxt, group):
cctxt = self._get_cctxt(group.host)
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'delete_consistencygroup', group=group)
def update_consistencygroup(self, ctxt, group, add_volumes=None,
@ -156,7 +156,7 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
def delete_cgsnapshot(self, ctxt, cgsnapshot):
cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host)
cctxt = self._get_cctxt(cgsnapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
def create_volume(self, ctxt, volume, host, request_spec,
@ -170,7 +170,7 @@ class VolumeAPI(rpc.RPCAPI):
def delete_volume(self, ctxt, volume, unmanage_only=False, cascade=False):
volume.create_worker()
cctxt = self._get_cctxt(volume.host)
cctxt = self._get_cctxt(volume.service_topic_queue)
msg_args = {
'volume': volume, 'unmanage_only': unmanage_only,
'cascade': cascade,
@ -183,8 +183,8 @@ class VolumeAPI(rpc.RPCAPI):
cctxt = self._get_cctxt(volume['host'])
cctxt.cast(ctxt, 'create_snapshot', snapshot=snapshot)
def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
cctxt = self._get_cctxt(host)
def delete_snapshot(self, ctxt, snapshot, unmanage_only=False):
cctxt = self._get_cctxt(snapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
unmanage_only=unmanage_only)
@ -300,8 +300,8 @@ class VolumeAPI(rpc.RPCAPI):
snapshot=snapshot,
ref=ref)
def get_capabilities(self, ctxt, host, discover):
cctxt = self._get_cctxt(host)
def get_capabilities(self, ctxt, backend_id, discover):
cctxt = self._get_cctxt(backend_id)
return cctxt.call(ctxt, 'get_capabilities', discover=discover)
def get_backup_device(self, ctxt, backup, volume):
@ -339,7 +339,7 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'create_group', group=group)
def delete_group(self, ctxt, group):
cctxt = self._get_cctxt(group.host)
cctxt = self._get_cctxt(group.service_topic_queue)
cctxt.cast(ctxt, 'delete_group', group=group)
def update_group(self, ctxt, group, add_volumes=None, remove_volumes=None):
@ -359,6 +359,6 @@ class VolumeAPI(rpc.RPCAPI):
group_snapshot=group_snapshot)
def delete_group_snapshot(self, ctxt, group_snapshot):
cctxt = self._get_cctxt(group_snapshot.group.host)
cctxt = self._get_cctxt(group_snapshot.service_topic_queue)
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)

View File

@ -20,3 +20,6 @@ features:
status (`is_up`) as URL parameters. Also added their respective policies."
- "HA A-A: Attach and detach operations are now cluster aware and make full
use of clustered cinder-volume services."
- "HA A-A: Delete volume, delete snapshot, delete consistency group, and
delete consistency group snapshot operations are now cluster aware and make
full use of clustered cinder-volume services."