Merge "Verify applying a new cephx rule after a previous failure"

This commit is contained in:
Zuul 2020-07-20 20:10:20 +00:00 committed by Gerrit Code Review
commit eba8fa9fc2
3 changed files with 66 additions and 15 deletions

View File

@ -262,7 +262,8 @@ class SharesClient(rest_client.RestClient):
(snapshot_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def wait_for_access_rule_status(self, share_id, rule_id, status):
def wait_for_access_rule_status(self, share_id, rule_id, status,
raise_rule_in_error_state=True):
"""Waits for an access rule to reach a given status."""
rule_status = "new"
start = int(time.time())
@ -273,7 +274,7 @@ class SharesClient(rest_client.RestClient):
if rule["id"] in rule_id:
rule_status = rule['state']
break
if 'error' in rule_status:
if 'error' in rule_status and raise_rule_in_error_state:
raise share_exceptions.AccessRuleBuildErrorException(
rule_id=rule_id)

View File

@ -738,7 +738,8 @@ class BaseSharesTest(test.BaseTestCase):
access_to = "client3.com"
elif protocol in CONF.share.enable_cephx_rules_for_protocols:
access_type = "cephx"
access_to = "eve"
access_to = data_utils.rand_name(
cls.__class__.__name__ + '-cephx-id')
else:
message = "Unrecognized protocol and access rules configuration."
raise cls.skipException(message)
@ -1082,6 +1083,25 @@ class BaseSharesTest(test.BaseTestCase):
self.shares_v2_client.wait_for_share_status(share['id'], "error")
return self.shares_v2_client.wait_for_message(share['id'])
def allow_access(self, share_id, client=None, access_type=None,
access_level='rw', access_to=None, status='active',
raise_rule_in_error_state=True, cleanup=True):
client = client or self.shares_v2_client
a_type, a_to = self._get_access_rule_data_from_config()
access_type = access_type or a_type
access_to = access_to or a_to
rule = client.create_access_rule(share_id, access_type, access_to,
access_level)
client.wait_for_access_rule_status(share_id, rule['id'], status,
raise_rule_in_error_state)
if cleanup:
self.addCleanup(client.wait_for_resource_deletion,
rule_id=rule['id'], share_id=share_id)
self.addCleanup(client.delete_access_rule, share_id, rule['id'])
return rule
class BaseSharesAltTest(BaseSharesTest):
"""Base test case class for all Shares Alt API tests."""

View File

@ -20,7 +20,6 @@ import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests import share_exceptions
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
@ -389,10 +388,7 @@ class ShareCephxRulesForCephFSNegativeTest(base.BaseSharesMixedTest):
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_different_tenants_cannot_use_same_cephx_id(self):
# Grant access to the share
access1 = self.shares_v2_client.create_access_rule(
self.share['id'], self.access_type, self.access_to, 'rw')
self.shares_v2_client.wait_for_access_rule_status(
self.share['id'], access1['id'], 'active')
self.allow_access(self.share['id'], access_to=self.access_to)
# Create second share by the new user
share2 = self.create_share(client=self.alt_shares_v2_client,
@ -400,13 +396,47 @@ class ShareCephxRulesForCephFSNegativeTest(base.BaseSharesMixedTest):
share_type_id=self.share_type_id)
# Try grant access to the second share using the same cephx id as used
# on the first share
access2 = self.alt_shares_v2_client.create_access_rule(
share2['id'], self.access_type, self.access_to, 'rw')
self.assertRaises(
share_exceptions.AccessRuleBuildErrorException,
self.alt_shares_v2_client.wait_for_access_rule_status,
share2['id'], access2['id'], 'active')
# on the first share.
# Rule must be set to "error" status.
self.allow_access(share2['id'], client=self.alt_shares_v2_client,
access_to=self.access_to, status='error',
raise_rule_in_error_state=False)
share_alt_updated = self.alt_shares_v2_client.get_share(
share2['id'])
self.assertEqual('error', share_alt_updated['access_rules_status'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_can_apply_new_cephx_rules_when_one_is_in_error_state(self):
# Create share on "primary" tenant
share_primary = self.create_share()
# Add access rule to "Joe" by "primary" user
self.allow_access(share_primary['id'], access_to='Joe')
# Create share on "alt" tenant
share_alt = self.create_share(client=self.alt_shares_v2_client)
# Add access rule to "Joe" by "alt" user.
# Rule must be set to "error" status.
rule1 = self.allow_access(share_alt['id'],
client=self.alt_shares_v2_client,
access_to='Joe',
status='error',
raise_rule_in_error_state=False,
cleanup=False)
# Share's "access_rules_status" must be in "error" status
share_alt_updated = self.alt_shares_v2_client.get_share(
share_alt['id'])
self.assertEqual('error', share_alt_updated['access_rules_status'])
# Add second access rule to different client by "alt" user.
self.allow_access(share_alt['id'], client=self.alt_shares_v2_client)
# Check share's access_rules_status has transitioned to "active" status
self.alt_shares_v2_client.delete_access_rule(
share_alt['id'], rule1['id'])
self.alt_shares_v2_client.wait_for_share_status(
share_alt['id'], 'active', status_attr='access_rules_status')
@ddt.ddt