cinder/cinder/tests/unit/group/test_groups_manager_replica...

134 lines
5.5 KiB
Python

# Copyright (C) 2017 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from oslo_config import cfg
from oslo_utils import importutils
from cinder import context
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder import quota
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import test
from cinder.tests.unit import utils as tests_utils
from cinder.volume import api as volume_api
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume import volume_utils
GROUP_QUOTAS = quota.GROUP_QUOTAS
CONF = cfg.CONF
@ddt.ddt
class GroupManagerTestCase(test.TestCase):
def setUp(self):
super(GroupManagerTestCase, self).setUp()
self.volume = importutils.import_object(CONF.volume_manager)
self.configuration = mock.Mock(conf.Configuration)
self.context = context.get_admin_context()
self.context.user_id = fake.USER_ID
self.project_id = fake.PROJECT3_ID
self.context.project_id = self.project_id
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0,
'pools': {}}
self.volume_api = volume_api.API()
@mock.patch.object(GROUP_QUOTAS, "reserve",
return_value=["RESERVATION"])
@mock.patch.object(GROUP_QUOTAS, "commit")
@mock.patch.object(GROUP_QUOTAS, "rollback")
@mock.patch.object(driver.VolumeDriver,
"delete_group",
return_value=({'status': (
fields.GroupStatus.DELETED)}, []))
@mock.patch.object(driver.VolumeDriver,
"enable_replication",
return_value=(None, []))
@mock.patch.object(driver.VolumeDriver,
"disable_replication",
return_value=(None, []))
@mock.patch.object(driver.VolumeDriver,
"failover_replication",
return_value=(None, []))
def test_replication_group(self, fake_failover_rep, fake_disable_rep,
fake_enable_rep, fake_delete_grp,
fake_rollback, fake_commit, fake_reserve):
"""Test enable, disable, and failover replication for group."""
def fake_driver_create_grp(context, group):
"""Make sure that the pool is part of the host."""
self.assertIn('host', group)
host = group.host
pool = volume_utils.extract_host(host, level='pool')
self.assertEqual('fakepool', pool)
return {'status': fields.GroupStatus.AVAILABLE,
'replication_status': fields.ReplicationStatus.DISABLING}
self.mock_object(self.volume.driver, 'create_group',
fake_driver_create_grp)
group = tests_utils.create_group(
self.context,
availability_zone=CONF.storage_availability_zone,
volume_type_ids=[fake.VOLUME_TYPE_ID],
host='fakehost@fakedrv#fakepool',
group_type_id=fake.GROUP_TYPE_ID)
group = objects.Group.get_by_id(self.context, group.id)
self.volume.create_group(self.context, group)
self.assertEqual(
group.id,
objects.Group.get_by_id(context.get_admin_context(),
group.id).id)
self.volume.disable_replication(self.context, group)
group = objects.Group.get_by_id(
context.get_admin_context(), group.id)
self.assertEqual(fields.ReplicationStatus.DISABLED,
group.replication_status)
group.replication_status = fields.ReplicationStatus.ENABLING
group.save()
self.volume.enable_replication(self.context, group)
group = objects.Group.get_by_id(
context.get_admin_context(), group.id)
self.assertEqual(fields.ReplicationStatus.ENABLED,
group.replication_status)
group.replication_status = fields.ReplicationStatus.FAILING_OVER
group.save()
self.volume.failover_replication(self.context, group)
group = objects.Group.get_by_id(
context.get_admin_context(), group.id)
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
group.replication_status)
targets = self.volume.list_replication_targets(self.context, group)
self.assertIn('replication_targets', targets)
self.volume.delete_group(self.context, group)
grp = objects.Group.get_by_id(
context.get_admin_context(read_deleted='yes'), group.id)
self.assertEqual(fields.GroupStatus.DELETED, grp.status)
self.assertRaises(exception.NotFound,
objects.Group.get_by_id,
self.context,
group.id)