[scenario] Add test case to check for RO access
Test if a compute instance with RO access granted to it via floating IP address fails to write on the share. Also, add the capability to pass the access level to the methods that allow access to shares. Change-Id: I1aef8f1ed1b93c0847a6aa982f62c8e539d04337 Signed-off-by: Rishabh Dave <ridave@redhat.com>
This commit is contained in:
parent
7f35fc0f5e
commit
42329e2a9f
@ -136,18 +136,20 @@ class ShareScenarioTest(manager.NetworkScenarioTest):
|
||||
sn['id'])
|
||||
return sn
|
||||
|
||||
def _allow_access(self, share_id, client=None,
|
||||
access_type="ip", access_to="0.0.0.0", cleanup=True):
|
||||
def _allow_access(self, share_id, client=None, access_type="ip",
|
||||
access_level="rw", access_to="0.0.0.0", cleanup=True):
|
||||
"""Allow share access
|
||||
|
||||
:param share_id: id of the share
|
||||
:param client: client object
|
||||
:param access_type: "ip", "user" or "cert"
|
||||
:param access_level: "rw" or "ro"
|
||||
:param access_to
|
||||
:returns: access object
|
||||
"""
|
||||
client = client or self.shares_client
|
||||
access = client.create_access_rule(share_id, access_type, access_to)
|
||||
access = client.create_access_rule(share_id, access_type, access_to,
|
||||
access_level)
|
||||
|
||||
# NOTE(u_glide): Ignore provided client, because we always need v2
|
||||
# client to make this call
|
||||
@ -158,6 +160,17 @@ class ShareScenarioTest(manager.NetworkScenarioTest):
|
||||
self.addCleanup(client.delete_access_rule, share_id, access['id'])
|
||||
return access
|
||||
|
||||
def _deny_access(self, share_id, rule_id, client=None):
|
||||
"""Deny share access
|
||||
|
||||
:param share_id: id of the share
|
||||
:param rule_id: id of the rule that will be deleted
|
||||
"""
|
||||
client = client or self.shares_client
|
||||
client.delete_access_rule(share_id, rule_id)
|
||||
self.shares_v2_client.wait_for_share_status(
|
||||
share_id, "active", status_attr='access_rules_status')
|
||||
|
||||
def _allow_access_snapshot(self, snapshot_id, access_type="ip",
|
||||
access_to="0.0.0.0/0", cleanup=True):
|
||||
"""Allow snapshot access
|
||||
|
@ -206,8 +206,8 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
|
||||
self.share = self._create_share(**kwargs)
|
||||
return self.share
|
||||
|
||||
def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
|
||||
snapshot=None):
|
||||
def allow_access_ip(self, share_id, ip=None, instance=None,
|
||||
access_level="rw", cleanup=True, snapshot=None):
|
||||
if instance and not ip:
|
||||
try:
|
||||
net_addresses = instance['addresses']
|
||||
@ -225,16 +225,21 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
|
||||
self._allow_access_snapshot(snapshot['id'], access_type='ip',
|
||||
access_to=ip, cleanup=cleanup)
|
||||
else:
|
||||
self._allow_access(share_id, access_type='ip', access_to=ip,
|
||||
cleanup=cleanup, client=self.shares_v2_client)
|
||||
return self._allow_access(share_id, access_type='ip',
|
||||
access_level=access_level, access_to=ip,
|
||||
cleanup=cleanup,
|
||||
client=self.shares_v2_client)
|
||||
|
||||
def deny_access(self, share_id, access_rule_id):
|
||||
self._deny_access(share_id, access_rule_id)
|
||||
|
||||
def provide_access_to_auxiliary_instance(self, instance, share=None,
|
||||
snapshot=None):
|
||||
snapshot=None, access_level='rw'):
|
||||
share = share or self.share
|
||||
if self.protocol.lower() == 'cifs':
|
||||
self.allow_access_ip(
|
||||
return self.allow_access_ip(
|
||||
share['id'], instance=instance, cleanup=False,
|
||||
snapshot=snapshot)
|
||||
snapshot=snapshot, access_level=access_level)
|
||||
elif not CONF.share.multitenancy_enabled:
|
||||
if self.use_ipv6:
|
||||
server_ip = self._get_ipv6_server_ip(instance)
|
||||
@ -242,14 +247,15 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
|
||||
server_ip = (CONF.share.override_ip_for_nfs_access or
|
||||
self.floatings[instance['id']]['ip'])
|
||||
self.assertIsNotNone(server_ip)
|
||||
self.allow_access_ip(
|
||||
return self.allow_access_ip(
|
||||
share['id'], ip=server_ip,
|
||||
instance=instance, cleanup=False, snapshot=snapshot)
|
||||
instance=instance, cleanup=False, snapshot=snapshot,
|
||||
access_level=access_level)
|
||||
elif (CONF.share.multitenancy_enabled and
|
||||
self.protocol.lower() == 'nfs'):
|
||||
self.allow_access_ip(
|
||||
return self.allow_access_ip(
|
||||
share['id'], instance=instance, cleanup=False,
|
||||
snapshot=snapshot)
|
||||
snapshot=snapshot, access_level=access_level)
|
||||
|
||||
def wait_for_active_instance(self, instance_id):
|
||||
waiters.wait_for_server_status(
|
||||
@ -340,6 +346,31 @@ class ShareBasicOpsBase(manager.ShareScenarioTest):
|
||||
|
||||
return locations
|
||||
|
||||
@tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
|
||||
def test_write_with_ro_access(self):
|
||||
'''Test if an instance with ro access can write on the share.'''
|
||||
test_data = "Some test data to write"
|
||||
|
||||
instance = self.boot_instance(wait_until="BUILD")
|
||||
self.create_share()
|
||||
location = self._get_user_export_locations(self.share)[0]
|
||||
instance = self.wait_for_active_instance(instance["id"])
|
||||
|
||||
ssh_client_inst = self.init_ssh(instance)
|
||||
|
||||
# First, check if write works RW access.
|
||||
acc_rule_id = self.provide_access_to_auxiliary_instance(instance)['id']
|
||||
self.mount_share(location, ssh_client_inst)
|
||||
self.write_data(test_data, ssh_client_inst)
|
||||
self.deny_access(self.share['id'], acc_rule_id)
|
||||
|
||||
self.provide_access_to_auxiliary_instance(instance, access_level='ro')
|
||||
self.addCleanup(self.umount_share, ssh_client_inst)
|
||||
|
||||
# Test if write with RO access fails.
|
||||
self.assertRaises(exceptions.SSHExecCommandFailed,
|
||||
self.write_data, test_data, ssh_client_inst)
|
||||
|
||||
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
|
||||
def test_read_write_two_vms(self):
|
||||
"""Boots two vms and writes/reads data on it."""
|
||||
|
Loading…
Reference in New Issue
Block a user