Verify applying a new cephx rule after a previous failure

This test will create a share and will then assign an access rule
that will fall into the error status and will then create another
access rule on the share.
The test will then verify that the second access rule was applied
successfully.

This patch also refactors "test_different_tenants_cannot_use_same_cephx_id"
to use raise_rule_in_error_state=False.

Added: "allow_access" helper method the grants an access to a
share and deletes it at the end of the test.

Change-Id: If9ffab7fcf37fab77bb4c9fd1863a0316f0a370d
This commit is contained in:
lkuchlan 2020-07-14 18:05:09 +03:00
parent e52b78d3d8
commit 5af7cb48f3
3 changed files with 66 additions and 15 deletions

View File

@ -262,7 +262,8 @@ class SharesClient(rest_client.RestClient):
(snapshot_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def wait_for_access_rule_status(self, share_id, rule_id, status):
def wait_for_access_rule_status(self, share_id, rule_id, status,
raise_rule_in_error_state=True):
"""Waits for an access rule to reach a given status."""
rule_status = "new"
start = int(time.time())
@ -273,7 +274,7 @@ class SharesClient(rest_client.RestClient):
if rule["id"] in rule_id:
rule_status = rule['state']
break
if 'error' in rule_status:
if 'error' in rule_status and raise_rule_in_error_state:
raise share_exceptions.AccessRuleBuildErrorException(
rule_id=rule_id)

View File

@ -738,7 +738,8 @@ class BaseSharesTest(test.BaseTestCase):
access_to = "client3.com"
elif protocol in CONF.share.enable_cephx_rules_for_protocols:
access_type = "cephx"
access_to = "eve"
access_to = data_utils.rand_name(
cls.__class__.__name__ + '-cephx-id')
else:
message = "Unrecognized protocol and access rules configuration."
raise cls.skipException(message)
@ -1082,6 +1083,25 @@ class BaseSharesTest(test.BaseTestCase):
self.shares_v2_client.wait_for_share_status(share['id'], "error")
return self.shares_v2_client.wait_for_message(share['id'])
def allow_access(self, share_id, client=None, access_type=None,
access_level='rw', access_to=None, status='active',
raise_rule_in_error_state=True, cleanup=True):
client = client or self.shares_v2_client
a_type, a_to = self._get_access_rule_data_from_config()
access_type = access_type or a_type
access_to = access_to or a_to
rule = client.create_access_rule(share_id, access_type, access_to,
access_level)
client.wait_for_access_rule_status(share_id, rule['id'], status,
raise_rule_in_error_state)
if cleanup:
self.addCleanup(client.wait_for_resource_deletion,
rule_id=rule['id'], share_id=share_id)
self.addCleanup(client.delete_access_rule, share_id, rule['id'])
return rule
class BaseSharesAltTest(BaseSharesTest):
"""Base test case class for all Shares Alt API tests."""

View File

@ -20,7 +20,6 @@ import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests import share_exceptions
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
@ -389,10 +388,7 @@ class ShareCephxRulesForCephFSNegativeTest(base.BaseSharesMixedTest):
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_different_tenants_cannot_use_same_cephx_id(self):
# Grant access to the share
access1 = self.shares_v2_client.create_access_rule(
self.share['id'], self.access_type, self.access_to, 'rw')
self.shares_v2_client.wait_for_access_rule_status(
self.share['id'], access1['id'], 'active')
self.allow_access(self.share['id'], access_to=self.access_to)
# Create second share by the new user
share2 = self.create_share(client=self.alt_shares_v2_client,
@ -400,13 +396,47 @@ class ShareCephxRulesForCephFSNegativeTest(base.BaseSharesMixedTest):
share_type_id=self.share_type_id)
# Try grant access to the second share using the same cephx id as used
# on the first share
access2 = self.alt_shares_v2_client.create_access_rule(
share2['id'], self.access_type, self.access_to, 'rw')
self.assertRaises(
share_exceptions.AccessRuleBuildErrorException,
self.alt_shares_v2_client.wait_for_access_rule_status,
share2['id'], access2['id'], 'active')
# on the first share.
# Rule must be set to "error" status.
self.allow_access(share2['id'], client=self.alt_shares_v2_client,
access_to=self.access_to, status='error',
raise_rule_in_error_state=False)
share_alt_updated = self.alt_shares_v2_client.get_share(
share2['id'])
self.assertEqual('error', share_alt_updated['access_rules_status'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_can_apply_new_cephx_rules_when_one_is_in_error_state(self):
# Create share on "primary" tenant
share_primary = self.create_share()
# Add access rule to "Joe" by "primary" user
self.allow_access(share_primary['id'], access_to='Joe')
# Create share on "alt" tenant
share_alt = self.create_share(client=self.alt_shares_v2_client)
# Add access rule to "Joe" by "alt" user.
# Rule must be set to "error" status.
rule1 = self.allow_access(share_alt['id'],
client=self.alt_shares_v2_client,
access_to='Joe',
status='error',
raise_rule_in_error_state=False,
cleanup=False)
# Share's "access_rules_status" must be in "error" status
share_alt_updated = self.alt_shares_v2_client.get_share(
share_alt['id'])
self.assertEqual('error', share_alt_updated['access_rules_status'])
# Add second access rule to different client by "alt" user.
self.allow_access(share_alt['id'], client=self.alt_shares_v2_client)
# Check share's access_rules_status has transitioned to "active" status
self.alt_shares_v2_client.delete_access_rule(
share_alt['id'], rule1['id'])
self.alt_shares_v2_client.wait_for_share_status(
share_alt['id'], 'active', status_attr='access_rules_status')
@ddt.ddt