Update plugin for tempest

Partially implements bp: tempest-job

Change-Id: I7119d4682434b54c2804d4bb2101712fbaafd616
This commit is contained in:
vponomaryov 2014-02-25 09:19:22 +02:00
parent 075f707530
commit 0760c35223
32 changed files with 1433 additions and 1007 deletions

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,15 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import test
class AdminActionsTestJSON(base.BaseSharesAdminTest):
class AdminActionsTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(AdminActionsTestJSON, cls).setUpClass()
super(AdminActionsTest, cls).setUpClass()
# create share (available or error)
cls.share_states = ["error", "available"]
@ -49,7 +47,3 @@ class AdminActionsTestJSON(base.BaseSharesAdminTest):
status=status)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_snapshot_status(self.sn["id"], status)
class AdminActionsTestXML(AdminActionsTestJSON):
_interface = 'xml'

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,28 +13,27 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import clients_shares as clients
from tempest import config_shares as config
from tempest.api.share import base
from tempest import clients_share as clients
from tempest import config_share as config
from tempest import exceptions
from tempest import test
import testtools
CONF = config.CONF
class AdminActionsNegativeTestJSON(base.BaseSharesAdminTest):
class AdminActionsNegativeTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(AdminActionsNegativeTestJSON, cls).setUpClass()
super(AdminActionsNegativeTest, cls).setUpClass()
# create share (available or error)
__, cls.sh = cls.create_share_wait_for_active()
# create snapshot (available or error)
__, cls.sn = cls.create_snapshot_wait_for_active(cls.sh["id"])
cls.member_shares_client = clients.Manager().shares_client
@test.attr(type=['negative', ])
@ -61,8 +58,6 @@ class AdminActionsNegativeTestJSON(base.BaseSharesAdminTest):
self.shares_client.reset_state,
self.sn["id"], s_type="snapshots", status="fake")
@testtools.skipIf(not CONF.shares.only_admin_or_owner_for_action,
"Skipped, because not only admin allowed")
@test.attr(type=['negative', ])
def test_try_reset_share_state_with_member(self):
# Even if member from another tenant, it should be unauthorized
@ -70,15 +65,9 @@ class AdminActionsNegativeTestJSON(base.BaseSharesAdminTest):
self.member_shares_client.reset_state,
self.sh["id"])
@testtools.skipIf(not CONF.shares.only_admin_or_owner_for_action,
"Skipped, because not only admin allowed")
@test.attr(type=['negative', ])
def test_try_reset_snapshot_state_with_member(self):
# Even if member from another tenant, it should be unauthorized
self.assertRaises(exceptions.Unauthorized,
self.member_shares_client.reset_state,
self.sn["id"], s_type="snapshots")
class AdminActionsNegativeTestXML(AdminActionsNegativeTestJSON):
_interface = 'xml'

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,32 +13,40 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import clients_share as clients
from tempest.common import isolated_creds
from tempest import config_share as config
from tempest import test
CONF = config.CONF
class SharesQuotasTestJSON(base.BaseSharesAdminTest):
# Tests should be used without unlimited quotas (-1).
# It is recommended to delete all entities in Manila before test run.
class SharesQuotasTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(SharesQuotasTestJSON, cls).setUpClass()
super(SharesQuotasTest, cls).setUpClass()
# Use isolated creds
cls.isolated_creds = isolated_creds.IsolatedCreds(cls.__name__)
creds = cls.isolated_creds.get_admin_creds()
username, tenant_name, password = creds
cls.os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
cls.shares_client = cls.os.shares_client
# Get tenant and user
cls.identity_client = cls._get_identity_admin_client()
cls.tenant = cls.identity_client\
.get_tenant_by_name(cls.shares_client.tenant_name)
cls.user = cls.identity_client\
.get_user_by_username(cls.tenant["id"],
cls.shares_client.username)
cls.tenant = cls.identity_client.get_tenant_by_name(
cls.shares_client.auth_params["tenant"])
cls.user = cls.identity_client.get_user_by_username(
cls.tenant["id"], cls.shares_client.auth_params["user"])
# save quotas before tests
__, cls.t_q = cls.shares_client.show_quotas(cls.tenant["id"])
__, cls.u_q = cls.shares_client.show_quotas(cls.tenant["id"],
cls.user["id"])
value = 1000
# set quotas before tests
value = 1000
cls.shares_client.update_quotas(cls.tenant["id"], shares=value,
snapshots=value, gigabytes=value)
cls.shares_client.update_quotas(cls.tenant["id"], cls.user["id"],
@ -49,17 +55,8 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
@classmethod
def tearDownClass(cls):
super(SharesQuotasTestJSON, cls).tearDownClass()
# back up quota values
cls.shares_client.update_quotas(cls.tenant["id"],
shares=cls.t_q["shares"],
snapshots=cls.t_q["snapshots"],
gigabytes=cls.t_q["gigabytes"])
cls.shares_client.update_quotas(cls.tenant["id"],
cls.user["id"],
shares=cls.u_q["shares"],
snapshots=cls.u_q["snapshots"],
gigabytes=cls.u_q["gigabytes"])
super(SharesQuotasTest, cls).tearDownClass()
cls.isolated_creds.clear_isolated_creds()
@test.attr(type=['positive', 'smoke'])
def test_limits_keys(self):
@ -93,7 +90,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertGreater(int(limits["absolute"]["maxTotalShares"]), -2)
self.assertGreater(int(limits["absolute"]["maxTotalSnapshots"]), -2)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_default_quotas(self):
resp, quotas = self.shares_client.default_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@ -118,14 +115,14 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertGreater(int(quotas["shares"]), -2)
self.assertGreater(int(quotas["snapshots"]), -2)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_default_quotas_with_empty_tenant_id(self):
# it should return default quotas without any tenant-id
resp, body = self.shares_client.default_quotas("")
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertTrue(len(body) > 0)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_update_tenant_quota_shares(self):
# get current quotas
@ -139,7 +136,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["shares"]), new_quota)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_update_user_quota_shares(self):
# get current quotas
@ -154,7 +151,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["shares"]), new_quota)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_update_tenant_quota_snapshots(self):
# get current quotas
@ -168,7 +165,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["snapshots"]), new_quota)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_update_user_quota_snapshots(self):
# get current quotas
@ -183,7 +180,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["snapshots"]), new_quota)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_update_tenant_quota_gigabytes(self):
# get current quotas
@ -199,7 +196,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["gigabytes"]), gigabytes)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_update_user_quota_gigabytes(self):
# get current quotas
@ -217,7 +214,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["gigabytes"]), gigabytes)
@test.attr(type='positive')
@test.attr(type=['positive', ])
def test_reset_tenant_quotas(self):
# get default_quotas
@ -244,7 +241,7 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
self.assertEqual(int(updated["gigabytes"]), gigabytes)
# reset customized quotas
resp, reseted = self.shares_client.reset_quotas(self.tenant["id"])
resp, reset = self.shares_client.reset_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify quotas
@ -255,7 +252,3 @@ class SharesQuotasTestJSON(base.BaseSharesAdminTest):
int(default["snapshots"]))
self.assertEqual(int(after_delete["gigabytes"]),
int(default["gigabytes"]))
class SharesQuotasTestXML(SharesQuotasTestJSON):
_interface = 'xml'

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,35 +13,43 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import clients_share as clients
from tempest.common import isolated_creds
from tempest import config_share as config
from tempest import exceptions
from tempest import exceptions_shares
from tempest import test
import unittest
import testtools
CONF = config.CONF
class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
# Tests should be used without unlimited quotas (-1).
# It is recommended to delete all entities in Manila before test run.
class SharesQuotasNegativeTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(SharesQuotasNegativeTestJSON, cls).setUpClass()
super(SharesQuotasNegativeTest, cls).setUpClass()
# Use isolated creds
cls.isolated_creds = isolated_creds.IsolatedCreds(cls.__name__)
creds = cls.isolated_creds.get_admin_creds()
username, tenant_name, password = creds
cls.os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
cls.shares_client = cls.os.shares_client
# Get tenant and user
cls.identity_client = cls._get_identity_admin_client()
cls.tenant = cls.identity_client\
.get_tenant_by_name(cls.shares_client.tenant_name)
cls.user = cls.identity_client\
.get_user_by_username(cls.tenant["id"],
cls.shares_client.username)
cls.tenant = cls.identity_client.get_tenant_by_name(
cls.shares_client.auth_params["tenant"])
cls.user = cls.identity_client.get_user_by_username(
cls.tenant["id"], cls.shares_client.auth_params["user"])
# save quotas before tests
__, cls.t_q = cls.shares_client.show_quotas(cls.tenant["id"])
__, cls.u_q = cls.shares_client.show_quotas(cls.tenant["id"],
cls.user["id"])
value = 1000
# set quotas before tests
value = 1000
cls.shares_client.update_quotas(cls.tenant["id"], shares=value,
snapshots=value, gigabytes=value)
cls.shares_client.update_quotas(cls.tenant["id"], cls.user["id"],
@ -52,49 +58,40 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
@classmethod
def tearDownClass(cls):
super(SharesQuotasNegativeTestJSON, cls).tearDownClass()
# back up quota values
cls.shares_client.update_quotas(cls.tenant["id"],
shares=cls.t_q["shares"],
snapshots=cls.t_q["snapshots"],
gigabytes=cls.t_q["gigabytes"])
cls.shares_client.update_quotas(cls.tenant["id"],
cls.user["id"],
shares=cls.u_q["shares"],
snapshots=cls.u_q["snapshots"],
gigabytes=cls.u_q["gigabytes"])
super(SharesQuotasNegativeTest, cls).tearDownClass()
cls.isolated_creds.clear_isolated_creds()
@test.attr(type='negative')
@unittest.skip("Skip until Bug #1234244 is fixed")
@test.attr(type=['negative', ])
@testtools.skip("Skip until Bug #1234244 is fixed")
def test_quotas_with_wrong_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_quotas, "wrong_tenant_id")
@test.attr(type='negative')
@unittest.skip("Skip until Bug #1234244 is fixed")
@test.attr(type=['negative', ])
@testtools.skip("Skip until Bug #1234244 is fixed")
def test_quotas_with_wrong_user_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_quotas,
self.tenant["id"],
"wrong_user_id")
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_quotas_with_empty_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.show_quotas, "")
@test.attr(type='negative')
@unittest.skip("Skip until Bug #1233170 is fixed")
@test.attr(type=['negative', ])
@testtools.skip("Skip until Bug #1233170 is fixed")
def test_default_quotas_with_wrong_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.default_quotas, "wrong_tenant_id")
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_reset_quotas_with_empty_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.reset_quotas, "")
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_update_shares_quota_with_wrong_data(self):
# -1 is acceptable value as unlimited
self.assertRaises(exceptions.BadRequest,
@ -102,7 +99,7 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.tenant["id"],
shares=-2)
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_update_snapshots_quota_with_wrong_data(self):
# -1 is acceptable value as unlimited
self.assertRaises(exceptions.BadRequest,
@ -110,7 +107,7 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.tenant["id"],
snapshots=-2)
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_update_gigabytes_quota_with_wrong_data(self):
# -1 is acceptable value as unlimited
self.assertRaises(exceptions.BadRequest,
@ -118,7 +115,7 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.tenant["id"],
gigabytes=-2)
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_create_share_with_size_bigger_than_quota(self):
new_quota = 25
@ -134,33 +131,21 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.create_share_wait_for_active,
size=overquota)
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_unlimited_quota_for_gigabytes(self):
# get current quota
_, quotas = self.shares_client.show_quotas(self.tenant["id"])
# set unlimited quota for gigabytes
resp, __ = self.shares_client.update_quotas(self.tenant["id"],
gigabytes=-1)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['negative', ])
def test_unlimited_user_quota_for_gigabytes(self):
resp, __ = self.shares_client.update_quotas(self.tenant["id"],
self.user["id"],
gigabytes=-1)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# share should be scheduled
self.assertRaises(exceptions_shares.ShareBuildErrorException,
self.create_share_wait_for_active, size=987654)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# return quotas as it was
self.shares_client.update_quotas(self.tenant["id"],
gigabytes=quotas["gigabytes"])
self.shares_client.update_quotas(self.tenant["id"], self.user["id"],
gigabytes=quotas["gigabytes"])
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_try_set_user_quota_gigabytes_bigger_than_tenant_quota(self):
# get current quotas for tenant
@ -174,7 +159,7 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.user["id"],
gigabytes=bigger_value)
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_try_set_user_quota_shares_bigger_than_tenant_quota(self):
# get current quotas for tenant
@ -188,7 +173,7 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.user["id"],
shares=bigger_value)
@test.attr(type='negative')
@test.attr(type=['negative', ])
def test_try_set_user_quota_snaps_bigger_than_tenant_quota(self):
# get current quotas for tenant
@ -201,7 +186,3 @@ class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
self.tenant["id"],
self.user["id"],
snapshots=bigger_value)
class SharesQuotasNegativeTestXML(SharesQuotasNegativeTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,75 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import test
class SecurityServicesTest(base.BaseSharesAdminTest):
@test.attr(type=['positive', ])
def test_create_delete_security_service(self):
data = self.generate_security_service_data()
self.service_names = ["ldap", "kerberos", "active_directory"]
for ss_name in self.service_names:
resp, ss = self.create_security_service(ss_name, **data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, ss)
self.assertEqual(ss_name, ss["type"])
resp, __ = self.shares_client.delete_security_service(ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_get_security_service(self):
data = self.generate_security_service_data()
resp, ss = self.create_security_service(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, ss)
resp, get = self.shares_client.get_security_service(ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, get)
@test.attr(type=['positive', ])
def test_update_security_service(self):
data = self.generate_security_service_data()
resp, ss = self.create_security_service(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, ss)
upd_data = self.generate_security_service_data()
resp, updated = self.shares_client.update_security_service(ss["id"],
**upd_data)
resp, get = self.shares_client.get_security_service(ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(upd_data, updated)
self.assertDictContainsSubset(upd_data, get)
@test.attr(type=['positive', ])
def test_list_security_services(self):
data = self.generate_security_service_data()
resp, ss = self.create_security_service(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, ss)
resp, listed = self.shares_client.list_security_services()
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
any(ss["id"] in ss["id"] for ss in listed)
# verify keys
keys = ["name", "id", "status"]
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]

View File

@ -0,0 +1,78 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import test
class SecurityServicesMappingTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(SecurityServicesMappingTest, cls).setUpClass()
cls.cl = cls.shares_client
def setUp(self):
super(SecurityServicesMappingTest, self).setUp()
# create share network
data = self.generate_share_network_data()
resp, self.sn = self.create_share_network(client=self.cl, **data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, self.sn)
# create security service
data = self.generate_security_service_data()
resp, self.ss = self.create_security_service(client=self.cl, **data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, self.ss)
# Add security service to share network
resp, __ = self.cl.add_sec_service_to_share_network(self.sn["id"],
self.ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=["gate", "positive", "smoke"])
def test_map_ss_to_sn_and_list(self):
# List security services for share network
resp, ls = self.cl.list_sec_services_for_share_network(self.sn["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(1, len(ls))
for key in ["status", "id", "name"]:
self.assertIn(self.ss[key], ls[0][key])
@test.attr(type=["gate", "positive", "smoke"])
def test_map_ss_to_sn_and_delete(self):
# Remove security service from share network
resp, __ = self.cl.remove_sec_service_from_share_network(self.sn["id"],
self.ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=["gate", "positive", "smoke"])
def test_remap_ss_to_sn(self):
# Remove security service from share network
resp, __ = self.cl.remove_sec_service_from_share_network(self.sn["id"],
self.ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# Add security service to share network again
resp, __ = self.cl.add_sec_service_to_share_network(self.sn["id"],
self.ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)

View File

@ -0,0 +1,111 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import exceptions
from tempest import test
class SecServicesMappingNegativeTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(SecServicesMappingNegativeTest, cls).setUpClass()
__, cls.sn = cls.create_share_network()
__, cls.ss = cls.create_security_service()
cls.cl = cls.shares_client
@test.attr(type=['negative', ])
def test_add_sec_service_twice_to_share_network(self):
resp, __ = self.cl.add_sec_service_to_share_network(self.sn["id"],
self.ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertRaises(exceptions.BadRequest,
self.cl.add_sec_service_to_share_network,
self.sn["id"], self.ss["id"])
@test.attr(type=['negative', ])
def test_add_nonexistant_sec_service_to_share_network(self):
self.assertRaises(exceptions.NotFound,
self.cl.add_sec_service_to_share_network,
self.sn["id"], "wrong_ss_id")
@test.attr(type=['negative', ])
def test_add_empty_sec_service_id_to_share_network(self):
self.assertRaises(exceptions.NotFound,
self.cl.add_sec_service_to_share_network,
self.sn["id"], "")
@test.attr(type=['negative', ])
def test_add_sec_service_to_nonexistant_share_network(self):
self.assertRaises(exceptions.NotFound,
self.cl.add_sec_service_to_share_network,
"wrong_sn_id", self.ss["id"])
@test.attr(type=['negative', ])
def test_add_sec_service_to_share_network_with_empty_id(self):
self.assertRaises(exceptions.NotFound,
self.cl.add_sec_service_to_share_network,
"", self.ss["id"])
@test.attr(type=['negative', ])
def test_list_sec_services_for_nonexistant_share_network(self):
self.assertRaises(exceptions.NotFound,
self.cl.list_sec_services_for_share_network,
"wrong_id")
@test.attr(type=['negative', ])
def test_delete_nonexistant_sec_service_from_share_network(self):
self.assertRaises(exceptions.NotFound,
self.cl.remove_sec_service_from_share_network,
self.sn["id"], "wrong_id")
@test.attr(type=['negative', ])
def test_delete_sec_service_from_nonexistant_share_network(self):
self.assertRaises(exceptions.NotFound,
self.cl.remove_sec_service_from_share_network,
"wrong_id", self.ss["id"])
@test.attr(type=['negative', ])
def test_delete_nonexistant_ss_from_nonexistant_sn(self):
self.assertRaises(exceptions.NotFound,
self.cl.remove_sec_service_from_share_network,
"wrong_id", "wrong_id")
@test.attr(type=['negative', ])
def test_try_map_same_ss_to_sn_twice(self):
# create share network
data = self.generate_share_network_data()
resp, sn = self.create_share_network(client=self.cl, **data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, sn)
# create security service
data = self.generate_security_service_data()
resp, ss = self.create_security_service(client=self.cl, **data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, ss)
# Add security service to share network
resp, __ = self.cl.add_sec_service_to_share_network(sn["id"],
ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# Try add same security service one more time
self.assertRaises(exceptions.BadRequest,
self.cl.add_sec_service_to_share_network,
sn["id"], ss["id"])

View File

@ -0,0 +1,81 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import exceptions
from tempest import test
class SecurityServicesNegativeTest(base.BaseSharesAdminTest):
@test.attr(type=['negative', ])
def test_try_create_security_service_with_empty_type(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_security_service, "")
@test.attr(type=['negative', ])
def test_try_create_security_service_with_wrong_type(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_security_service,
"wrong_type")
@test.attr(type=['negative', ])
def test_try_get_security_service_without_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_security_service, "")
@test.attr(type=['negative', ])
def test_try_get_security_service_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_security_service,
"wrong_id")
@test.attr(type=['negative', ])
def test_try_delete_security_service_without_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_security_service, "")
@test.attr(type=['negative', ])
def test_try_delete_security_service_with_wrong_type(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_security_service,
"wrong_id")
@test.attr(type=['negative', ])
def test_try_update_nonexistant_security_service(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.update_security_service,
"wrong_id", name="name")
@test.attr(type=['negative', ])
def test_try_update_security_service_with_empty_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.update_security_service,
"", name="name")
@test.attr(type=['negative', ])
def test_get_deleted_security_service(self):
data = self.generate_security_service_data()
resp, ss = self.create_security_service(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, ss)
resp, __ = self.shares_client.delete_security_service(ss["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# try get deleted security service entity
self.assertRaises(exceptions.NotFound,
self.shares_client.get_security_service,
ss["id"])

View File

@ -0,0 +1,87 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import test
class ShareNetworksTest(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(ShareNetworksTest, cls).setUpClass()
cls.data = cls.generate_share_network_data()
_, cls.sn = cls.create_share_network(**cls.data)
@test.attr(type=['positive', ])
def test_create_delete_share_network(self):
# generate data for share network
data = self.generate_share_network_data()
# create share network
resp, created = self.shares_client.create_share_network(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, created)
# Delete share_network
resp, __ = self.shares_client.delete_share_network(created["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_get_share_network(self):
resp, get = self.shares_client.get_share_network(self.sn["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(self.data, get)
@test.attr(type=['positive', ])
def test_update_share_network(self):
update_data = self.generate_share_network_data()
resp, updated = self.shares_client.update_share_network(self.sn["id"],
**update_data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(update_data, updated)
@test.attr(type=['positive', ])
def test_list_share_networks(self):
resp, listed = self.shares_client.list_share_networks()
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
any(self.sn["id"] in sn["id"] for sn in listed)
# verify keys
keys = ["name", "id", "status"]
[self.assertIn(key, sn.keys()) for sn in listed for key in keys]
@test.attr(type=['positive', ])
def test_recreate_share_network(self):
# generate data for share network
data = self.generate_share_network_data()
# create share network
resp, sn1 = self.shares_client.create_share_network(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, sn1)
# Delete first share network
resp, __ = self.shares_client.delete_share_network(sn1["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# create second share network with same data
resp, sn2 = self.shares_client.create_share_network(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, sn2)
# Delete second share network
resp, __ = self.shares_client.delete_share_network(sn2["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)

View File

@ -0,0 +1,97 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import exceptions
from tempest import test
class ShareNetworksNegativeTest(base.BaseSharesAdminTest):
@test.attr(type=['negative', ])
def test_try_get_share_network_without_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_share_network, "")
@test.attr(type=['negative', ])
def test_try_get_share_network_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_share_network, "wrong_id")
@test.attr(type=['negative', ])
def test_try_delete_share_network_without_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_share_network, "")
@test.attr(type=['negative', ])
def test_try_delete_share_network_with_wrong_type(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_share_network, "wrong_id")
@test.attr(type=['negative', ])
def test_try_update_nonexistant_share_network(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.update_share_network,
"wrong_id", name="name")
@test.attr(type=['negative', ])
def test_try_update_share_network_with_empty_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.update_share_network,
"", name="name")
@test.attr(type=['negative', ])
def test_try_get_deleted_share_network(self):
data = self.generate_share_network_data()
resp, sn = self.create_share_network(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, sn)
resp, __ = self.shares_client.delete_share_network(sn["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# try get deleted share network entity
self.assertRaises(exceptions.NotFound,
self.shares_client.get_security_service,
sn["id"])
@test.attr(type=['negative', ])
def test_try_create_duplicate_of_share_network(self):
data = self.generate_share_network_data()
resp, sn = self.create_share_network(**data)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data, sn)
# try create duplicate of share network entity
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share_network,
**data)
@test.attr(type=['negative', ])
def test_try_create_duplicate_of_share_network_via_update(self):
data1 = self.generate_share_network_data()
resp, sn1 = self.create_share_network(**data1)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data1, sn1)
data2 = self.generate_share_network_data()
resp, sn2 = self.create_share_network(**data2)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertDictContainsSubset(data2, sn2)
# try create duplicate of share network entity via update
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_share_network,
sn2["id"], **data1)

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,9 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients_shares as clients
from tempest.common import isolated_creds
from tempest import config_shares as config
from tempest import clients_share as clients
from tempest.common.utils import data_utils
from tempest import config_share as config
from tempest import exceptions
from tempest import test
@ -25,73 +23,77 @@ CONF = config.CONF
class BaseSharesTest(test.BaseTestCase):
"""Base test case class for all Manila API tests."""
_interface = "json"
resources_of_tests = []
protocols = ["nfs", "cifs"]
@classmethod
def setUpClass(cls):
if not CONF.service_available.manila:
skip_msg = "Manila not available"
if not any(p in CONF.share.enable_protocols for p in cls.protocols):
skip_msg = "Manila is disabled"
raise cls.skipException(skip_msg)
super(BaseSharesTest, cls).setUpClass()
cls.isolated_creds = isolated_creds.IsolatedCreds(cls.__name__)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_primary_creds()
username, tenant_name, password = creds
cls.os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
else:
cls.os = clients.Manager(interface=cls._interface)
cls.os = clients.Manager(interface=cls._interface)
cls.shares_client = cls.os.shares_client
cls.build_interval = CONF.shares.build_interval
cls.build_timeout = CONF.shares.build_timeout
@classmethod
def tearDownClass(cls):
super(BaseSharesTest, cls).tearDownClass()
cls.isolated_creds.clear_isolated_creds()
cls.clear_resources()
@classmethod
def create_share_wait_for_active(cls,
share_protocol=None,
size=1,
name=None,
snapshot_id=None,
description="tempests share",
metadata={},
client=None):
def create_share_wait_for_active(cls, share_protocol=None, size=1,
name=None, snapshot_id=None,
description=None, metadata={},
share_network_id=None, client=None):
if client is None:
client = cls.shares_client
if description is None:
description = "Tempest's share"
r, s = client.create_share(share_protocol=share_protocol, size=size,
name=name, snapshot_id=snapshot_id,
description=description,
metadata=metadata)
metadata=metadata,
share_network_id=share_network_id)
resource = {"type": "share", "body": s, "deleted": False}
cls.resources_of_tests.insert(0, resource) # last in first out (LIFO)
client.wait_for_share_status(s["id"], "available")
return r, s
@classmethod
def create_snapshot_wait_for_active(cls,
share_id,
name=None,
description="tempests share-ss",
force=False,
def create_snapshot_wait_for_active(cls, share_id, name=None,
description=None, force=False,
client=None):
if client is None:
client = cls.shares_client
if description is None:
description = "Tempest's snapshot"
r, s = client.create_snapshot(share_id, name, description, force)
resource = {"type": "snapshot", "body": s, "deleted": False}
cls.resources_of_tests.insert(0, resource) # last in first out (LIFO)
client.wait_for_snapshot_status(s["id"], "available")
return r, s
@classmethod
def create_share_network(cls, client=None, **kwargs):
if client is None:
client = cls.shares_client
resp, sn = client.create_share_network(**kwargs)
resource = {"type": "share_network", "body": sn, "deleted": False}
cls.resources_of_tests.insert(0, resource) # last in first out (LIFO)
return resp, sn
@classmethod
def create_security_service(cls, ss_type="ldap", client=None, **kwargs):
if client is None:
client = cls.shares_client
resp, ss = client.create_security_service(ss_type, **kwargs)
resource = {"type": "security_service", "body": ss, "deleted": False}
cls.resources_of_tests.insert(0, resource) # last in first out (LIFO)
return resp, ss
@classmethod
def clear_resources(cls, client=None):
if client is None:
@ -105,14 +107,40 @@ class BaseSharesTest(test.BaseTestCase):
client.delete_share(res["body"]['id'])
elif res["type"] is "snapshot":
client.delete_snapshot(res["body"]['id'])
cls.resources_of_tests[index]["deleted"] = True
elif res["type"] is "share_network":
client.delete_share_network(res["body"]['id'])
elif res["type"] is "security_service":
client.delete_security_service(res["body"]['id'])
except exceptions.NotFound:
pass
cls.resources_of_tests[index]["deleted"] = True
client.wait_for_resource_deletion(res["body"]['id'])
@classmethod
def generate_share_network_data(self):
data = {
"name": data_utils.rand_name("sn-name"),
"description": data_utils.rand_name("sn-desc"),
"neutron_net_id": data_utils.rand_name("net-id"),
"neutron_subnet_id": data_utils.rand_name("subnet-id"),
}
return data
@classmethod
def generate_security_service_data(self):
data = {
"name": data_utils.rand_name("ss-name"),
"description": data_utils.rand_name("ss-desc"),
"dns_ip": data_utils.rand_name("ss-dns_ip"),
"server": data_utils.rand_name("ss-server"),
"domain": data_utils.rand_name("ss-domain"),
"sid": data_utils.rand_name("ss-sid"),
"password": data_utils.rand_name("ss-password"),
}
return data
class BaseSharesAdminTest(BaseSharesTest):
"""Base test case class for all Shares Admin API tests."""
@classmethod
@ -125,13 +153,6 @@ class BaseSharesAdminTest(BaseSharesTest):
msg = ("Missing Shares Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = creds
cls.os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
interface=cls._interface)
else:
cls.os_adm = clients.AdminManager(interface=cls._interface)
cls.os_adm = clients.AdminManager(interface=cls._interface)
cls.shares_client = cls.os_adm.shares_client
cls.shares_client.share_network_id = CONF.share.admin_share_network_id

View File

@ -0,0 +1,31 @@
# Copyright 2014 mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import test
class ExtensionsTest(base.BaseSharesTest):
@test.attr(type=["smoke", "gate"])
def test_extensions(self):
# get extensions
resp, extensions = self.shares_client.list_extensions()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
keys = ["alias", "updated", "namespace", "name", "description"]
[self.assertIn(key, ext.keys()) for ext in extensions for key in keys]

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,15 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import test
class SharesMetadataTestJSON(base.BaseSharesTest):
class SharesMetadataTest(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(SharesMetadataTestJSON, cls).setUpClass()
super(SharesMetadataTest, cls).setUpClass()
_, cls.share = cls.create_share_wait_for_active()
@test.attr(type=['positive', ])
@ -147,7 +145,3 @@ class SharesMetadataTestJSON(base.BaseSharesTest):
resp, body = self.shares_client.update_all_metadata(self.share["id"],
{"key": max_value})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
class SharesMetadataTestXML(SharesMetadataTestJSON):
_interface = 'xml'

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,16 +13,16 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import exceptions
from tempest import test
class SharesMetadataNegativeTestJSON(base.BaseSharesTest):
class SharesMetadataNegativeTest(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(SharesMetadataNegativeTestJSON, cls).setUpClass()
super(SharesMetadataNegativeTest, cls).setUpClass()
_, cls.share = cls.create_share_wait_for_active()
@test.attr(type=['negative', ])
@ -90,7 +88,3 @@ class SharesMetadataNegativeTestJSON(base.BaseSharesTest):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_metadata,
self.share["id"], "wrong_key")
class SharesMetadataNegativeTestXML(SharesMetadataNegativeTestJSON):
_interface = 'xml'

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,24 +13,32 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import config_share as config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class ShareRulesTestJSON(base.BaseSharesTest):
class ShareIpRulesForNFSTest(base.BaseSharesTest):
protocol = "nfs"
@classmethod
def setUpClass(cls):
super(ShareRulesTestJSON, cls).setUpClass()
_, cls.share = cls.create_share_wait_for_active()
super(ShareIpRulesForNFSTest, cls).setUpClass()
if (cls.protocol not in CONF.share.enable_protocols or
cls.protocol not in CONF.share.enable_ip_rules_for_protocols):
msg = "IP rule tests for %s protocol are disabled" % cls.protocol
raise cls.skipException(msg)
__, cls.share = cls.create_share_wait_for_active(cls.protocol)
@test.attr(type='positive')
@test.attr(type=["gate", ])
def test_create_delete_access_rules_with_one_ip(self):
# test data
access_type = "ip"
access_to = "1.2.3.4"
access_to = "1.1.1.1"
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
@ -45,9 +51,9 @@ class ShareRulesTestJSON(base.BaseSharesTest):
# delete rule
resp, _ = self.shares_client.delete_access_rule(self.share["id"],
rule["id"])
self.assertIn(int(resp["status"]), [200, 202])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type='positive')
@test.attr(type=["gate", ])
def test_create_delete_access_rule_with_cidr(self):
# test data
@ -67,17 +73,85 @@ class ShareRulesTestJSON(base.BaseSharesTest):
rule["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type='positive')
def test_list_access_rules(self):
class ShareIpRulesForCIFSTest(ShareIpRulesForNFSTest):
protocol = "cifs"
class ShareSidRulesForNFSTest(base.BaseSharesTest):
protocol = "nfs"
@classmethod
def setUpClass(cls):
super(ShareSidRulesForNFSTest, cls).setUpClass()
if (cls.protocol not in CONF.share.enable_protocols or
cls.protocol not in CONF.share.enable_sid_rules_for_protocols):
msg = "SID rule tests for %s protocol are disabled" % cls.protocol
raise cls.skipException(msg)
__, cls.share = cls.create_share_wait_for_active(cls.protocol)
@test.attr(type=["gate", ])
def test_create_delete_sid_rule(self):
# test data
access_type = "ip"
access_to = "1.2.3.4"
access_type = "sid"
access_to = CONF.share.username_for_sid_rules
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
access_type,
access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# delete rule
resp, _ = self.shares_client.delete_access_rule(self.share["id"],
rule["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
class ShareSidRulesForCIFSTest(ShareSidRulesForNFSTest):
protocol = "cifs"
class ShareRulesTest(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(ShareRulesTest, cls).setUpClass()
if not (any(p in CONF.share.enable_ip_rules_for_protocols
for p in cls.protocols) or
any(p in CONF.share.enable_sid_rules_for_protocols
for p in cls.protocols)):
cls.message = "Rule tests are disabled"
raise cls.skipException(cls.message)
__, cls.share = cls.create_share_wait_for_active()
def setUp(self):
# Here we choose protocol and rule type for
# testing common rules functionality,
# that isn't dependent on protocol or rule type.
super(ShareRulesTest, self).setUp()
if CONF.share.enable_ip_rules_for_protocols:
self.access_type = "ip"
self.access_to = "8.8.8.8"
protocol = CONF.share.enable_ip_rules_for_protocols[0]
elif CONF.share.enable_sid_rules_for_protocols:
self.access_type = "sid"
self.access_to = CONF.share.username_for_sid_rules
protocol = CONF.share.enable_sid_rules_for_protocols[0]
else:
raise self.skipException(self.message)
self.shares_client.protocol = protocol
@test.attr(type=["gate", ])
def test_list_access_rules(self):
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
self.access_type,
self.access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
@ -97,28 +171,24 @@ class ShareRulesTestJSON(base.BaseSharesTest):
# verify values
self.assertEqual("active", rules[0]["state"])
self.assertEqual(access_type, rules[0]["access_type"])
self.assertEqual(access_to, rules[0]["access_to"])
self.assertEqual(self.access_type, rules[0]["access_type"])
self.assertEqual(self.access_to, rules[0]["access_to"])
# our share id in list and have no duplicates
gen = [r["id"] for r in rules if r["id"] in rule["id"]]
msg = "expected id lists %s times in rule list" % (len(gen))
self.assertEquals(len(gen), 1, msg)
self.assertEqual(len(gen), 1, msg)
@test.attr(type='positive')
@test.attr(type=["gate", ])
def test_access_rules_deleted_if_share_deleted(self):
# test data
access_type = "ip"
access_to = "1.2.3.0/24"
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# create rule
resp, rule = self.shares_client.create_access_rule(share["id"],
access_type,
access_to)
self.access_type,
self.access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(share["id"], rule["id"],
"active")
@ -132,7 +202,3 @@ class ShareRulesTestJSON(base.BaseSharesTest):
self.assertRaises(exceptions.NotFound,
self.shares_client.list_access_rules,
share['id'])
class ShareRulesTestXML(ShareRulesTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,219 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.share import base
from tempest import config_share as config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class ShareIpRulesForNFSNegativeTest(base.BaseSharesTest):
protocol = "nfs"
@classmethod
def setUpClass(cls):
super(ShareIpRulesForNFSNegativeTest, cls).setUpClass()
if not (cls.protocol in CONF.share.enable_protocols and
cls.protocol in CONF.share.enable_ip_rules_for_protocols):
msg = "IP rule tests for %s protocol are disabled" % cls.protocol
raise cls.skipException(msg)
# create share
__, cls.share = cls.create_share_wait_for_active(cls.protocol)
# create snapshot
__, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_1(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.256")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_2(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.1.1.-")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_3(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.4/33")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_4(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.*")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_5(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.*/23")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_6(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1|23")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_7(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1/-1")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_target_8(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1/")
@test.attr(type=["negative", "gate", ])
def test_create_duplicate_of_ip_rule(self):
# test data
access_type = "ip"
access_to = "1.2.3.4"
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
access_type,
access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# try create duplicate of rule
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], access_type, access_to)
class ShareIpRulesForCIFSNegativeTest(ShareIpRulesForNFSNegativeTest):
protocol = "cifs"
class ShareSidRulesForNFSNegativeTest(base.BaseSharesTest):
protocol = "nfs"
@classmethod
def setUpClass(cls):
super(ShareSidRulesForNFSNegativeTest, cls).setUpClass()
if not (cls.protocol in CONF.share.enable_protocols and
cls.protocol in CONF.share.enable_sid_rules_for_protocols):
msg = "SID rule tests for %s protocol are disabled" % cls.protocol
raise cls.skipException(msg)
# create share
__, cls.share = cls.create_share_wait_for_active(cls.protocol)
# create snapshot
__, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_with_wrong_input_2(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "sid",
"try+")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_with_empty_key(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "sid", "")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_with_too_little_key(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "sid", "abc")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_with_too_big_key(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "sid", "a" * 33)
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_with_wrong_input_1(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "sid",
"try+")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_to_snapshot(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
self.snap["id"],
access_type="sid",
access_to="fakeuser")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_sid_with_wrong_share_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
"wrong_share_id",
access_type="sid",
access_to="fakeuser")
class ShareSidRulesForCIFSNegativeTest(ShareSidRulesForNFSNegativeTest):
protocol = "cifs"
class ShareRulesNegativeTest(base.BaseSharesTest):
# Tests independent from rule type and share protocol
@classmethod
def setUpClass(cls):
super(ShareRulesNegativeTest, cls).setUpClass()
if not (any(p in CONF.share.enable_ip_rules_for_protocols
for p in cls.protocols) or
any(p in CONF.share.enable_sid_rules_for_protocols
for p in cls.protocols)):
cls.message = "Rule tests are disabled"
raise cls.skipException(cls.message)
# create share
__, cls.share = cls.create_share_wait_for_active()
# create snapshot
__, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type=["negative", "gate", ])
def test_delete_access_rule_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_access_rule,
self.share["id"], "wrong_rule_id")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_type(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "wrong_type", "1.2.3.4")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_with_wrong_share_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
"wrong_share_id")
@test.attr(type=["negative", "gate", ])
def test_create_access_rule_ip_to_snapshot(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
self.snap["id"])

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 mirantis Inc.
# All Rights Reserved.
#
@ -15,23 +13,35 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.common.utils.data_utils import rand_name
from tempest.api.share import base
from tempest.common.utils import data_utils
from tempest import config_share as config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class SharesTestJSON(base.BaseSharesTest):
class SharesNFSTest(base.BaseSharesTest):
"""Covers share functionality, that is related to NFS share type."""
protocol = "nfs"
@classmethod
def setUpClass(cls):
super(SharesNFSTest, cls).setUpClass()
if cls.protocol not in CONF.share.enable_protocols:
message = "%s tests are disabled" % cls.protocol
raise cls.skipException(message)
def tearDown(self):
super(SharesTestJSON, self).tearDown()
super(SharesNFSTest, self).tearDown()
self.clear_resources()
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_create_delete_share(self):
# create share
resp, share = self.create_share_wait_for_active()
resp, share = self.create_share_wait_for_active(self.protocol)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# delete share
@ -42,18 +52,70 @@ class SharesTestJSON(base.BaseSharesTest):
self.shares_client.get_share,
share['id'])
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_create_delete_snapshot(self):
# create share
__, share = self.create_share_wait_for_active(self.protocol)
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# delete snapshot
resp, __ = self.shares_client.delete_snapshot(snap["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_resource_deletion(snap["id"])
self.assertRaises(exceptions.NotFound,
self.shares_client.get_snapshot, snap['id'])
@test.attr(type=["gate", "smoke", ])
def test_create_share_from_snapshot(self):
# create share
__, share = self.create_share_wait_for_active(
share_protocol=self.protocol)
# create snapshot
__, snap = self.create_snapshot_wait_for_active(share["id"])
# crate share from snapshot
resp, s2 = self.create_share_wait_for_active(self.protocol,
snapshot_id=snap["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify share, created from snapshot
resp, get = self.shares_client.get_share(s2["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
msg = "Expected snapshot_id %s as "\
"source of share %s" % (snap["id"], get["snapshot_id"])
self.assertEqual(get["snapshot_id"], snap["id"], msg)
class SharesCIFSTest(SharesNFSTest):
"""Covers share functionality, that is related to CIFS share type."""
protocol = "cifs"
class SharesTest(base.BaseSharesTest):
"""Covers share functionality, that doesn't related to share type."""
def tearDown(self):
super(SharesTest, self).tearDown()
self.clear_resources()
@test.attr(type=["gate", ])
def test_get_share(self):
# test data
name = rand_name("rand-share-name-")
desc = rand_name("rand-share-description-")
name = data_utils.rand_name("tempest-share-name")
desc = data_utils.rand_name("tempest-share-description")
size = 1
# create share
resp, share = self.create_share_wait_for_active(name=name,
description=desc,
size=size)
__, share = self.create_share_wait_for_active(name=name,
description=desc,
size=size)
# get share
resp, share = self.shares_client.get_share(share['id'])
@ -79,11 +141,11 @@ class SharesTestJSON(base.BaseSharesTest):
msg = "Expected size: '%s', actual size: '%s'" % (size, share["size"])
self.assertEqual(size, int(share["size"]), msg)
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_list_shares(self):
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# list shares
resp, shares = self.shares_client.list_shares()
@ -100,11 +162,11 @@ class SharesTestJSON(base.BaseSharesTest):
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEqual(len(gen), 1, msg)
@test.attr(type=['positive', 'gate'])
@test.attr(type=["gate", ])
def test_list_shares_with_detail(self):
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# list shares
resp, shares = self.shares_client.list_shares_with_detail()
@ -123,32 +185,17 @@ class SharesTestJSON(base.BaseSharesTest):
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEqual(len(gen), 1, msg)
@test.attr(type=['positive', ])
def test_create_delete_snapshot(self):
# create share
resp, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# delete snapshot
self.shares_client.delete_snapshot(snap["id"])
self.shares_client.wait_for_resource_deletion(snap["id"])
self.assertRaises(exceptions.NotFound,
self.shares_client.get_snapshot, snap['id'])
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_get_snapshot(self):
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
#create snapshot
name = rand_name("tempest-snap-")
desc = rand_name("tempest-snap-description-")
resp, snap = self.create_snapshot_wait_for_active(share["id"],
name, desc)
name = data_utils.rand_name("tempest-snap-")
desc = data_utils.rand_name("tempest-snap-description-")
__, snap = self.create_snapshot_wait_for_active(share["id"],
name, desc)
# get snapshot
resp, get = self.shares_client.get_snapshot(snap["id"])
@ -175,14 +222,14 @@ class SharesTestJSON(base.BaseSharesTest):
"actual share_id: '%s'" % (name, get["share_id"])
self.assertEqual(share["id"], get["share_id"], msg)
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_list_snapshots(self):
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
#create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
__, snap = self.create_snapshot_wait_for_active(share["id"])
# list share snapshots
resp, snaps = self.shares_client.list_snapshots()
@ -197,16 +244,16 @@ class SharesTestJSON(base.BaseSharesTest):
# our share id in list and have no duplicates
gen = [sid["id"] for sid in snaps if sid["id"] in snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(1, len(gen), msg)
self.assertEqual(1, len(gen), msg)
@test.attr(type=['positive', 'gate'])
@test.attr(type=["gate", ])
def test_list_snapshots_with_detail(self):
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
__, snap = self.create_snapshot_wait_for_active(share["id"])
# list share snapshots
resp, snaps = self.shares_client.list_snapshots_with_detail()
@ -225,45 +272,15 @@ class SharesTestJSON(base.BaseSharesTest):
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEqual(len(gen), 1, msg)
@test.attr(type=['positive', 'smoke', 'gate'])
def test_create_share_from_snapshot(self):
# create share
resp, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# crate share from snapshot
resp, s2 = self.create_share_wait_for_active(snapshot_id=snap["id"])
# verify share, created from snapshot
resp, get = self.shares_client.get_share(s2["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
msg = "Expected snapshot_id %s as "\
"source of share %s" % (snap["id"], get["snapshot_id"])
self.assertEqual(get["snapshot_id"], snap["id"], msg)
@test.attr(type=['positive', 'smoke', 'gate'])
def test_extensions(self):
# get extensions
resp, extensions = self.shares_client.list_extensions()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
keys = ['alias', 'updated', 'namespace', 'name', 'description']
[self.assertIn(key, ext.keys()) for ext in extensions for key in keys]
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_rename_share(self):
# create share
_, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# rename share
new_name = rand_name("new_name_")
new_desc = rand_name("new_desc_")
new_name = data_utils.rand_name("tempest-new-name")
new_desc = data_utils.rand_name("tempest-new-description")
resp, renamed = self.shares_client.rename(share["id"],
new_name,
new_desc)
@ -271,25 +288,21 @@ class SharesTestJSON(base.BaseSharesTest):
self.assertEqual(new_name, renamed["name"])
self.assertEqual(new_desc, renamed["description"])
@test.attr(type=['positive', ])
@test.attr(type=["gate", ])
def test_rename_snapshot(self):
# create share
_, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# create snapshot
_, snap = self.create_snapshot_wait_for_active(share["id"])
__, snap = self.create_snapshot_wait_for_active(share["id"])
# rename snapshot
new_name = rand_name("new_name_for_snap_")
new_desc = rand_name("new_desc_for_snap_")
new_name = data_utils.rand_name("tempest-new-name-for-snapshot")
new_desc = data_utils.rand_name("tempest-new-description-for-snapshot")
resp, renamed = self.shares_client.rename_snapshot(snap["id"],
new_name,
new_desc)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(new_name, renamed["name"])
self.assertEqual(new_desc, renamed["description"])
class SharesTestXML(SharesTestJSON):
_interface = 'xml'

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -15,84 +13,84 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.api.share import base
from tempest import exceptions
from tempest import exceptions_shares
from tempest import exceptions_share
from tempest import test
class SharesNegativeTestJSON(base.BaseSharesTest):
class SharesNegativeTest(base.BaseSharesTest):
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_create_share_with_invalid_protocol(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share,
share_protocol="nonexistent_protocol")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_get_share_with_wrong_id(self):
self.assertRaises(exceptions.NotFound, self.shares_client.get_share,
"wrong_share_id")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_get_share_without_passing_share_id(self):
# Should not be able to get share when empty ID is passed
self.assertRaises(exceptions.NotFound,
self.shares_client.get_share, '')
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_delete_share_with_wrong_id(self):
self.assertRaises(exceptions.NotFound, self.shares_client.delete_share,
"wrong_share_id")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_delete_share_without_passing_share_id(self):
# Should not be able to delete share when empty ID is passed
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_share, '')
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_create_snapshot_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_snapshot,
"wrong_share_id")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_delete_snapshot_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_snapshot,
"wrong_share_id")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_create_share_with_invalid_size(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share, size="#$%")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_create_share_with_out_passing_size(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share, size="")
@test.attr(type='negative')
@test.attr(type=["negative", "smoke", "gate", ])
def test_create_share_with_zero_size(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share, size=0)
@test.attr(type='negative')
@test.attr(type=["negative", "gate", ])
def test_try_delete_share_with_existing_snapshot(self):
# share can not be deleted while snapshot exists
# create share
resp, share = self.create_share_wait_for_active()
__, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
self.create_snapshot_wait_for_active(share["id"])
# try delete share
self.assertRaises(exceptions.Unauthorized,
self.shares_client.delete_share, share["id"])
@test.attr(type='negative')
@test.attr(type=["negative", "gate", ])
def test_create_share_from_snap_with_less_size(self):
# requires minimum 5Gb available space
@ -100,7 +98,7 @@ class SharesNegativeTestJSON(base.BaseSharesTest):
try: # create share
_, share = self.create_share_wait_for_active(size=2)
except exceptions_shares.ShareBuildErrorException:
except exceptions_share.ShareBuildErrorException:
self.skip(skip_msg)
try: # create snapshot
@ -113,6 +111,8 @@ class SharesNegativeTestJSON(base.BaseSharesTest):
self.create_share_wait_for_active,
size=1, snapshot_id=snap["id"])
class SharesNegativeTestXML(SharesNegativeTestJSON):
_interface = 'xml'
@test.attr(type=["negative", "smoke", "gate", ])
def test_create_share_with_nonexistant_share_network(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_share,
share_network_id="wrong_sn_id")

View File

@ -1,109 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import exceptions
from tempest import test
class ShareRulesNegativeTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(ShareRulesNegativeTestJSON, cls).setUpClass()
# create share
_, cls.share = cls.create_share_wait_for_active()
# create snapshot
_, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_share_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
"wrong_share_id")
@test.attr(type='negative')
def test_delete_access_rule_ip_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_access_rule,
self.share["id"], "wrong_rule_id")
@test.attr(type='negative')
def test_create_try_access_rule_ip_to_snapshot(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
self.snap["id"])
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_type(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "wrong_type", "1.2.3.4")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_1(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.256")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_2(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.1.1.-")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_3(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.4/33")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_4(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.*")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_5(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.*/23")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_6(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1|23")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_7(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1/-1")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_8(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1/")
class ShareRulesNegativeTestXML(ShareRulesNegativeTestJSON):
_interface = 'xml'

View File

@ -1,159 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import clients_shares as clients
from tempest import config_shares as config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class SharesSecurityNegativeTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(SharesSecurityNegativeTestJSON, cls).setUpClass()
if not CONF.shares.only_admin_or_owner_for_action:
skip_msg = "Disabled from tempest configuration"
raise cls.skipException(skip_msg)
cls.client = cls.shares_client
cls.alt_client = clients.AltManager().shares_client
_, cls.share = cls.create_share_wait_for_active()
_, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_for_share_list(self):
# list shares
__, shares = self.client.list_shares()
# our share id is in list and have no duplicates
gen = [sid["id"] for sid in shares if sid["id"] in self.share["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 1, msg)
# list shares from another tenant
__, alt_shares = self.alt_client.list_shares()
# our share id is not in list
gen = [s["id"] for s in alt_shares if s["id"] in self.share["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 0, msg)
@test.attr(type='negative')
def test_tenant_isolation_share_delete(self):
# try delete share from another tenant
self.assertRaises(exceptions.Unauthorized,
self.alt_client.delete_share,
self.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_get(self):
# try delete share from another tenant
self.assertRaises(exceptions.Unauthorized,
self.alt_client.get_share, self.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_for_share_snapshot_list(self):
# list share snapshots
__, snaps = self.client.list_snapshots()
# our share id is in list and have no duplicates
gen = [sid["id"] for sid in snaps if sid["id"] in self.snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 1, msg)
# list shares from another tenant
__, alt_snaps = self.alt_client.list_snapshots()
# our snapshot id is not in list
gen = [sid["id"] for sid in alt_snaps if sid["id"] in self.snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 0, msg)
@test.attr(type='negative')
def test_tenant_isolation_share_snapshot_delete(self):
# try delete share from another tenant
self.assertRaises(exceptions.NotFound,
self.alt_client.delete_snapshot, self.snap["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_snapshot_get(self):
# try delete share from another tenant
self.assertRaises(exceptions.NotFound,
self.alt_client.get_snapshot, self.snap["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_access_list(self):
# try list share rules
self.assertRaises(exceptions.Unauthorized, # NotFound or Unauthorized
self.alt_client.list_access_rules,
self.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_access_rule_delete(self):
# create rule
resp, rule = self.client.create_access_rule(self.share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# try delete rule
self.assertRaises(exceptions.Unauthorized, # NotFound or Unauthorized
self.alt_client.delete_access_rule,
self.share["id"], rule["id"])
@test.attr(type='negative')
def test_create_snapshot_from_alien_share(self):
# try create snapshot in another tenant
self.assertRaises(exceptions.Unauthorized, # NotFound or Unauthorized
self.create_snapshot_wait_for_active,
share_id=self.share["id"],
client=self.alt_client)
@test.attr(type='negative')
def test_create_share_from_alien_snapshot(self):
# try create share in another tenant from snap
self.assertRaises(exceptions.NotFound, # NotFound or Unauthorized
self.create_share_wait_for_active,
snapshot_id=self.snap["id"],
client=self.alt_client)
@test.attr(type='negative')
def test_create_access_rule_to_alien_share(self):
# try create access rule from another tenant
self.assertRaises(exceptions.Unauthorized,
self.alt_client.create_access_rule,
self.share["id"],
access_to="1.1.1.1")
# There is no need to perform security tests twice
#class SharesSecurityNegativeTestXML(SharesSecurityNegativeTestJSON):
# _interface = 'xml'

View File

@ -19,7 +19,7 @@ import re
import subprocess
from tempest.cli import manilaclient
from tempest import config_shares as config
from tempest import config_share as config
CONF = config.CONF
@ -69,16 +69,12 @@ class SimpleReadOnlyManilaClientTest(manilaclient.ClientTestBase):
def test_manila_quota_defaults(self):
"""This CLI can accept and string as param."""
roles = self.parser.listing(self.manila('quota-defaults',
params=self.identity.
admin_tenant_name))
roles = self.parser.listing(self.manila('quota-defaults'))
self.assertTableStruct(roles, ['Property', 'Value'])
def test_manila_quota_show(self):
"""This CLI can accept and string as param."""
roles = self.parser.listing(self.manila('quota-show',
params=self.identity.
admin_tenant_name))
roles = self.parser.listing(self.manila('quota-show'))
self.assertTableStruct(roles, ['Property', 'Value'])
def test_manila_rate_limits(self):
@ -122,7 +118,7 @@ class SimpleReadOnlyManilaClientTest(manilaclient.ClientTestBase):
commands = set(commands)
wanted_commands = set(('absolute-limits', 'list', 'help',
'quota-show', 'access-list', 'snapshot-list',
'allow-access', 'deny-access'))
'access-allow', 'access-deny'))
self.assertFalse(wanted_commands - commands)
# Optional arguments:
@ -137,4 +133,4 @@ class SimpleReadOnlyManilaClientTest(manilaclient.ClientTestBase):
self.manila('list', flags='--retries 3')
def test_manila_region_list(self):
self.manila('list', flags='--os-region-name ' + self.identity.region)
self.manila('list', flags='--os-region-name ' + CONF.identity.region)

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -16,10 +14,9 @@
# under the License.
from tempest import clients
from tempest import config_shares as config
from tempest import exceptions
from tempest.services.shares.json import shares_client as j_shares_client
from tempest.services.shares.xml import shares_client as x_shares_client
from tempest import config_share as config
from tempest.services.share.json import shares_client
CONF = config.CONF
@ -31,28 +28,15 @@ class Manager(clients.Manager):
"""
def __init__(self, username=None, password=None, tenant_name=None,
interface='json'):
interface='json', service=None):
super(Manager, self).__init__(username, password, tenant_name,
interface)
client_args = (CONF, self.username, self.password,
self.auth_url, self.tenant_name)
if interface == 'xml':
self.shares_client = x_shares_client.SharesClientXML(*client_args)
elif interface == 'json':
self.shares_client = j_shares_client.SharesClientJSON(*client_args)
else:
msg = "Unsupported interface type `%s'" % interface
raise exceptions.InvalidConfiguration(msg)
interface, service)
auth_provider = self.get_auth_provider(self.credentials)
if interface == 'json':
self.shares_client = shares_client.SharesClient(auth_provider)
class AltManager(Manager):
"""
Manager object that uses the alt_XXX credentials for its
managed client objects
"""
def __init__(self, interface='json'):
super(AltManager, self).__init__(CONF.identity.alt_username,
CONF.identity.alt_password,
@ -61,12 +45,6 @@ class AltManager(Manager):
class AdminManager(Manager):
"""
Manager object that uses the admin credentials for its
managed client objects
"""
def __init__(self, interface='json'):
super(AdminManager, self).__init__(CONF.identity.admin_username,
CONF.identity.admin_password,

View File

@ -0,0 +1,102 @@
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from oslo.config import cfg
from tempest import config
service_available_group = cfg.OptGroup(name="service_available",
title="Available OpenStack Services")
ServiceAvailableGroup = [
cfg.BoolOpt("manila",
default=True,
help="Whether or not manila is expected to be available"),
]
share_group = cfg.OptGroup(name="share", title="Share Service Options")
ShareGroup = [
cfg.StrOpt("catalog_type",
default="share",
help="Catalog type of the Share service."),
cfg.StrOpt('endpoint_type',
default='publicURL',
choices=['public', 'admin', 'internal',
'publicURL', 'adminURL', 'internalURL'],
help="The endpoint type to use for the share service."),
cfg.ListOpt("enable_protocols",
default=["nfs", "cifs"],
help="First value of list is protocol by default, "
"items of list show enabled protocols at all."),
cfg.ListOpt("enable_ip_rules_for_protocols",
default=["nfs", ],
help="Selection of protocols, that should "
"be covered with ip rule tests"),
cfg.ListOpt("enable_sid_rules_for_protocols",
default=[],
help="Selection of protocols, that should "
"be covered with sid rule tests"),
cfg.StrOpt("username_for_sid_rules",
default="Administrator",
help="Username, that will be used in sid tests. "
"In case of active directory it should be existed"),
cfg.StrOpt("share_network_id",
default="",
help="Some backend drivers requires share network "
"for share creation. Share network id, that will be "
"used for shares. If not set, it won't be used."),
cfg.StrOpt("alt_share_network_id",
default="",
help="Share network id, that will be used for shares"
" in alt tenant. If not set, it won't be used"),
cfg.StrOpt("admin_share_network_id",
default="",
help="Share network id, that will be used for shares"
" in admin tenant. If not set, it won't be used"),
cfg.IntOpt("build_interval",
default=10,
help="Time in seconds between volume availability checks."),
cfg.IntOpt("build_timeout",
default=300,
help="Timeout in seconds to wait for a volume to become"
"available."),
]
class TempestConfigPrivateManila(config.TempestConfigPrivate):
# manila's config wrap over standard config
def __init__(self, parse_conf=True):
super(TempestConfigPrivateManila, self).__init__()
config.register_opt_group(cfg.CONF, service_available_group,
ServiceAvailableGroup)
config.register_opt_group(cfg.CONF, share_group, ShareGroup)
self.share = cfg.CONF.share
class TempestConfigProxyManila(object):
_config = None
def __getattr__(self, attr):
if not self._config:
self._config = TempestConfigPrivateManila()
return getattr(self._config, attr)
CONF = TempestConfigProxyManila()

View File

@ -1,78 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from oslo.config import cfg
from tempest import config
service_available_group = cfg.OptGroup(name="service_available",
title="Available OpenStack Services")
ServiceAvailableGroup = [
cfg.BoolOpt('manila',
default=True,
help="Whether or not manila is expected to be available"),
]
shares_group = cfg.OptGroup(name="shares",
title="Shares Service Options")
SharesGroup = [
cfg.StrOpt('share_protocol',
default="nfs",
help="File share type by default"),
cfg.IntOpt('build_interval',
default=10,
help='Time in seconds between volume availability checks.'),
cfg.IntOpt('build_timeout',
default=300,
help='Timeout in seconds to wait for a volume to become'
'available.'),
cfg.StrOpt('catalog_type',
default="share",
help='Catalog type of the Shares service.'),
cfg.BoolOpt('only_admin_or_owner_for_action',
default=True,
help='This flag use tests that verify policy.json rules'),
]
# this should never be called outside of this class
class TempestConfigPrivateManila(config.TempestConfigPrivate):
# manila's config wrap over standard config
def __init__(self, parse_conf=True):
super(TempestConfigPrivateManila, self).__init__()
config.register_opt_group(cfg.CONF, service_available_group,
ServiceAvailableGroup)
config.register_opt_group(cfg.CONF, shares_group, SharesGroup)
self.shares = cfg.CONF.shares
class TempestConfigProxyManila(object):
_config = None
def __getattr__(self, attr):
if not self._config:
self._config = TempestConfigPrivateManila()
return getattr(self._config, attr)
CONF = TempestConfigProxyManila()

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
@ -24,3 +22,7 @@ class ShareBuildErrorException(exceptions.TempestException):
class AccessRuleBuildErrorException(exceptions.TempestException):
message = "Share's rule with id %(rule_id) is in ERROR status"
class ShareProtocolNotSpecified(exceptions.TempestException):
message = "Share can not be created, share protocol is not specified"

View File

@ -1,4 +1,5 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -16,46 +17,47 @@ import json
from tempest.common import rest_client
from tempest.common.utils.data_utils import rand_name
from tempest import config_share as config
from tempest import exceptions
from tempest import exceptions_shares
from tempest import exceptions_share
import time
import urllib
CONF = config.CONF
class SharesClientJSON(rest_client.RestClient):
"""
Tempest REST client for Manila.
It handles shares and access to it in openstack.
class SharesClient(rest_client.RestClient):
"""Tempest REST client for Manila.
It handles shares and access to it in OpenStack.
"""
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(SharesClientJSON, self).__init__(config, username, password,
auth_url, tenant_name)
self.service = self.config.shares.catalog_type # share
self.share_protocol = self.config.shares.share_protocol
self.build_interval = self.config.shares.build_interval
self.build_timeout = self.config.shares.build_timeout
def __init__(self, auth_provider):
super(SharesClient, self).__init__(auth_provider)
self.service = CONF.share.catalog_type
self.share_protocol = None
if CONF.share.enable_protocols:
self.share_protocol = CONF.share.enable_protocols[0]
self.share_network_id = CONF.share.share_network_id
self.build_interval = CONF.share.build_interval
self.build_timeout = CONF.share.build_timeout
self.auth_params = auth_provider._auth_params()
self.tenant_name = tenant_name
self.username = username
def _parse_resp(self, body):
if len(body) > 0:
body = json.loads(body)
if len(body) is 1 and isinstance(body.items()[0][1], (dict, list)):
return body[body.items()[0][0]]
return body
def _get_endpoint_type(self, service):
# This is workaround for rest_client, that uses main config
return CONF.share.endpoint_type
def create_share(self, share_protocol=None, size=1,
name=None, snapshot_id=None,
description="tempest created share",
metadata={}):
metadata={}, share_network_id=None):
if name is None:
name = rand_name("tempest-created-share-")
name = rand_name("tempest-created-share")
if share_protocol is None:
share_protocol = self.share_protocol
if share_protocol is None:
raise exceptions.ShareProtocolNotSpecified()
post_body = {
"share": {
"share_proto": share_protocol,
@ -66,29 +68,31 @@ class SharesClientJSON(rest_client.RestClient):
"metadata": metadata
}
}
if share_network_id is not None:
post_body["share"]["share_network_id"] = share_network_id
elif self.share_network_id:
post_body["share"]["share_network_id"] = self.share_network_id
body = json.dumps(post_body)
resp, body = self.post("shares", body, self.headers)
resp, body = self.post("shares", body)
return resp, self._parse_resp(body)
def delete_share(self, share_id):
resp, body = self.delete("shares/%s" % share_id, self.headers)
return resp, self._parse_resp(body)
return self.delete("shares/%s" % share_id)
def list_shares(self):
resp, body = self.get("shares", self.headers)
resp, body = self.get("shares")
return resp, self._parse_resp(body)
def list_shares_with_detail(self, params=None):
"""List the details of all shares."""
url = 'shares/detail'
uri = 'shares/detail'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
uri += '?%s' % urllib.urlencode(params)
resp, body = self.get(uri)
return resp, self._parse_resp(body)
def get_share(self, share_id):
uri = "shares/%s" % share_id
resp, body = self.get(uri, self.headers)
resp, body = self.get("shares/%s" % share_id)
return resp, self._parse_resp(body)
def create_access_rule(self, share_id,
@ -100,14 +104,12 @@ class SharesClientJSON(rest_client.RestClient):
}
}
body = json.dumps(post_body)
uri = "shares/%s/action" % share_id
resp, body = self.post(uri, body, self.headers)
resp, body = self.post("shares/%s/action" % share_id, body)
return resp, self._parse_resp(body)
def list_access_rules(self, share_id):
uri = "shares/%s/action" % share_id
body = {"os-access_list": None}
resp, body = self.post(uri, json.dumps(body), self.headers)
resp, body = self.post("shares/%s/action" % share_id, json.dumps(body))
return resp, self._parse_resp(body)
def delete_access_rule(self, share_id, rule_id):
@ -117,14 +119,13 @@ class SharesClientJSON(rest_client.RestClient):
}
}
body = json.dumps(post_body)
uri = "shares/%s/action" % share_id
return self.post(uri, body, self.headers)
return self.post("shares/%s/action" % share_id, body)
def create_snapshot(self, share_id, name=None,
description="tempest created share-ss",
force=False):
if name is None:
name = rand_name("tempest-created-share-snap-")
name = rand_name("tempest-created-share-snap")
post_body = {
"snapshot": {
"name": name,
@ -134,30 +135,27 @@ class SharesClientJSON(rest_client.RestClient):
}
}
body = json.dumps(post_body)
resp, body = self.post("snapshots", body, self.headers)
resp, body = self.post("snapshots", body)
return resp, self._parse_resp(body)
def get_snapshot(self, snapshot_id):
uri = "snapshots/%s" % snapshot_id
resp, body = self.get(uri, self.headers)
resp, body = self.get("snapshots/%s" % snapshot_id)
return resp, self._parse_resp(body)
def list_snapshots(self):
resp, body = self.get("snapshots", self.headers)
resp, body = self.get("snapshots")
return resp, self._parse_resp(body)
def list_snapshots_with_detail(self, params=None):
"""List the details of all shares."""
url = 'snapshots/detail'
uri = 'snapshots/detail'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
uri += '?%s' % urllib.urlencode(params)
resp, body = self.get(uri)
return resp, self._parse_resp(body)
def delete_snapshot(self, snap_id):
uri = "snapshots/%s" % snap_id
resp, body = self.delete(uri, self.headers)
return resp, self._parse_resp(body)
return self.delete("snapshots/%s" % snap_id)
def wait_for_share_status(self, share_id, status):
"""Waits for a Share to reach a given status."""
@ -171,7 +169,7 @@ class SharesClientJSON(rest_client.RestClient):
resp, body = self.get_share(share_id)
share_status = body['status']
if 'error' in share_status:
raise exceptions_shares.\
raise exceptions_share.\
ShareBuildErrorException(share_id=share_id)
if int(time.time()) - start >= self.build_timeout:
@ -213,7 +211,7 @@ class SharesClientJSON(rest_client.RestClient):
rule_status = rule['state']
break
if 'error' in rule_status:
raise exceptions_shares.\
raise exceptions_share.\
AccessRuleBuildErrorException(rule_id=rule_id)
if int(time.time()) - start >= self.build_timeout:
@ -223,23 +221,21 @@ class SharesClientJSON(rest_client.RestClient):
raise exceptions.TimeoutException(message)
def default_quotas(self, tenant_id):
uri = "os-quota-sets/%s/defaults" % tenant_id
resp, body = self.get(uri, self.headers)
resp, body = self.get("os-quota-sets/%s/defaults" % tenant_id)
return resp, self._parse_resp(body)
def show_quotas(self, tenant_id, user_id=None):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % (user_id)
resp, body = self.get(uri, self.headers)
uri += "?user_id=%s" % user_id
resp, body = self.get(uri)
return resp, self._parse_resp(body)
def reset_quotas(self, tenant_id, user_id=None):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % user_id
resp, body = self.delete(uri, self.headers)
return resp, self._parse_resp(body)
return self.delete(uri)
def update_quotas(self, tenant_id, user_id=None,
shares=None, snapshots=None,
@ -258,11 +254,11 @@ class SharesClientJSON(rest_client.RestClient):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % user_id
resp, body = self.put(uri, put_body, self.headers)
resp, body = self.put(uri, put_body)
return resp, self._parse_resp(body)
def get_limits(self):
resp, body = self.get("limits", self.headers)
resp, body = self.get("limits")
return resp, self._parse_resp(body)
def is_resource_deleted(self, s_id, rule_id=None):
@ -283,25 +279,23 @@ class SharesClientJSON(rest_client.RestClient):
return True
def list_extensions(self):
resp, extensions = self.get("extensions", self.headers)
resp, extensions = self.get("extensions")
return resp, self._parse_resp(extensions)
def rename(self, share_id, name, desc=None):
uri = "shares/%s" % share_id
body = {"share": {"display_name": name}}
if desc is not None:
body["share"].update({"display_description": desc})
body = json.dumps(body)
resp, body = self.put(uri, body, self.headers)
resp, body = self.put("shares/%s" % share_id, body)
return resp, self._parse_resp(body)
def rename_snapshot(self, snapshot_id, name, desc=None):
uri = "snapshots/%s" % snapshot_id
body = {"snapshot": {"display_name": name}}
if desc is not None:
body["snapshot"].update({"display_description": desc})
body = json.dumps(body)
resp, body = self.put(uri, body, self.headers)
resp, body = self.put("snapshots/%s" % snapshot_id, body)
return resp, self._parse_resp(body)
def reset_state(self, s_id, status="error", s_type="shares"):
@ -310,11 +304,9 @@ class SharesClientJSON(rest_client.RestClient):
status: available, error, creating, deleting, error_deleting
s_type: shares, snapshots
"""
uri = "%s/%s/action" % (s_type, s_id)
body = {"os-reset_status": {"status": status}}
body = json.dumps(body)
resp, body = self.post(uri, body, self.headers)
return resp, self._parse_resp(body)
return self.post("%s/%s/action" % (s_type, s_id), body)
###############
@ -323,9 +315,9 @@ class SharesClientJSON(rest_client.RestClient):
post_body = {"metadata": metadata}
body = json.dumps(post_body)
if method is "post":
resp, metadata = self.post(uri, body, self.headers)
resp, metadata = self.post(uri, body)
if method is "put":
resp, metadata = self.put(uri, body, self.headers)
resp, metadata = self.put(uri, body)
return resp, self._parse_resp(metadata)
def set_metadata(self, share_id, metadata={}):
@ -335,11 +327,93 @@ class SharesClientJSON(rest_client.RestClient):
return self._update_metadata(share_id, metadata, method="put")
def delete_metadata(self, share_id, key):
uri = "shares/%s/metadata/%s" % (share_id, key)
resp, body = self.delete(uri, self.headers)
return resp, self._parse_resp(body)
return self.delete("shares/%s/metadata/%s" % (share_id, key))
def get_metadata(self, share_id):
uri = "shares/%s/metadata" % share_id
resp, body = self.get(uri, self.headers)
resp, body = self.get("shares/%s/metadata" % share_id)
return resp, self._parse_resp(body)
###############
def create_security_service(self, ss_type="ldap", **kwargs):
# ss_type: ldap, kerberos, active_directory
# kwargs: name, description, dns_ip, server, domain, sid, password
post_body = {"type": ss_type}
post_body.update(kwargs)
body = json.dumps({"security_service": post_body})
resp, body = self.post("security-services", body)
return resp, self._parse_resp(body)
def update_security_service(self, ss_id, **kwargs):
# ss_id - id of security-service entity
# kwargs: dns_ip, server, domain, sid, password, name, description
# for 'active' status can be changed
# only 'name' and 'description' fields
body = json.dumps({"security_service": kwargs})
resp, body = self.put("security-services/%s" % ss_id, body)
return resp, self._parse_resp(body)
def get_security_service(self, ss_id):
resp, body = self.get("security-services/%s" % ss_id)
return resp, self._parse_resp(body)
def list_security_services(self):
resp, body = self.get("security-services")
return resp, self._parse_resp(body)
def delete_security_service(self, ss_id):
return self.delete("security-services/%s" % ss_id)
###############
def create_share_network(self, **kwargs):
# kwargs: name, description
#+ for neutron: neutron_net_id, neutron_subnet_id
body = json.dumps({"share_network": kwargs})
resp, body = self.post("share-networks", body)
return resp, self._parse_resp(body)
def update_share_network(self, sn_id, **kwargs):
# kwargs: name, description
#+ for neutron: neutron_net_id, neutron_subnet_id
body = json.dumps({"share_network": kwargs})
resp, body = self.put("share-networks/%s" % sn_id, body)
return resp, self._parse_resp(body)
def get_share_network(self, sn_id):
resp, body = self.get("share-networks/%s" % sn_id)
return resp, self._parse_resp(body)
def list_share_networks(self):
resp, body = self.get("share-networks")
return resp, self._parse_resp(body)
def delete_share_network(self, sn_id):
return self.delete("share-networks/%s" % sn_id)
###############
def _map_security_service_and_share_network(self, sn_id, ss_id,
action="add"):
# sn_id: id of share_network_entity
# ss_id: id of security service entity
# action: add, remove
data = {
"%s_security_service" % action: {
"security_service_id": ss_id
}
}
body = json.dumps(data)
resp, body = self.post("share-networks/%s/action" % sn_id, body)
return resp, self._parse_resp(body)
def add_sec_service_to_share_network(self, sn_id, ss_id):
return self._map_security_service_and_share_network(sn_id, ss_id)
def remove_sec_service_from_share_network(self, sn_id, ss_id):
return self._map_security_service_and_share_network(sn_id, ss_id,
"remove")
def list_sec_services_for_share_network(self, sn_id):
resp, body = self.get("security-services?share_network_id=%s" % sn_id)
return resp, self._parse_resp(body)

View File

@ -1,204 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from lxml import etree
from tempest.common.utils.data_utils import rand_name
from tempest.services.compute.xml import common as xml
from tempest.services.shares.json import shares_client
class SharesClientXML(shares_client.SharesClientJSON):
"""
Tempest REST client for Manila.
It handles shares and access to it in openstack.
"""
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(SharesClientXML, self).__init__(config, username, password,
auth_url, tenant_name)
self.TYPE = "xml" # from RestClientXML
self.headers["Content-Type"] = "application/%s" % self.TYPE
self.headers["Accept"] = "application/%s" % self.TYPE
def _parse_resp(self, body): # from RestClientXML
if len(body) > 0:
element = etree.fromstring(body)
entity_list = ["shares", "snapshots", "extensions", "access_list"]
if "metadata" in element.tag:
dictionary = {}
for el in element.getchildren():
dictionary[u"%s" % el.get("key")] = u"%s" % el.text
return dictionary
elif any(s in element.tag for s in entity_list):
s_list = []
if element is not None:
s_list += [xml.xml_to_json(sh) for sh in list(element)]
return s_list
else:
return xml.xml_to_json(element)
return body
def is_absolute_limit(self, resp, resp_body): # from RestClientXML
if (not isinstance(resp_body, collections.Mapping) or
'retry-after' not in resp):
return True
return 'exceed' in resp_body.get('message', 'blabla')
def create_share(self, share_protocol=None,
size=1, name=None, snapshot_id=None,
description="tempest created share",
metadata={}):
if name is None:
name = rand_name("tempest-created-share-")
if share_protocol is None:
share_protocol = self.share_protocol
share = xml.Element("share", xmlns=xml.XMLNS_11)
share.append(xml.Element("share_proto", share_protocol))
if description is not None:
share.append(xml.Element("description", description))
if snapshot_id is not None:
share.append(xml.Element("snapshot_id", snapshot_id))
share.append(xml.Element("name", name))
share.append(xml.Element("size", size))
metadata_el = xml.Element("metadata")
for key, value in metadata.iteritems():
metadata_el.append(xml.Element(key, value))
share.append(metadata_el)
resp, body = self.post('shares', str(xml.Document(share)),
self.headers)
return resp, self._parse_resp(body)
def create_access_rule(self, share_id, access_type="ip",
access_to="0.0.0.0"):
rule = xml.Element("os-allow_access", xmlns=xml.XMLNS_11)
rule.append(xml.Element("access_type", access_type))
rule.append(xml.Element("access_to", access_to))
uri = "shares/%s/action" % (share_id)
resp, body = self.post(uri, str(xml.Document(rule)), self.headers)
return resp, self._parse_resp(body)
def list_access_rules(self, share_id):
uri = "shares/%s/action" % (share_id)
access_list = xml.Element("os-access_list",
xmlns=xml.XMLNS_11,
value=None)
resp, body = self.post(uri, str(xml.Document(access_list)),
self.headers)
return resp, self._parse_resp(body)
def delete_access_rule(self, share_id, rule_id):
rule = xml.Element("os-deny_access", xmlns=xml.XMLNS_11)
rule.append(xml.Element("access_id", rule_id))
uri = "shares/%s/action" % share_id
return self.post(uri, str(xml.Document(rule)), self.headers)
def create_snapshot(self, share_id, name=None,
description="tempest created share-ss", force=False):
if name is None:
name = rand_name("tempest-created-share-snap-")
snap = xml.Element("snapshot", xmlns=xml.XMLNS_11)
snap.append(xml.Element("name", name))
snap.append(xml.Element("force", force))
snap.append(xml.Element("description", description))
snap.append(xml.Element("share_id", share_id))
resp, body = self.post('snapshots', str(xml.Document(snap)),
self.headers)
return resp, self._parse_resp(body)
def update_quotas(self, tenant_id=None, user_id=None,
shares=None, snapshots=None, gigabytes=None,
force=True):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % user_id
upd = xml.Element("quota_set", id=tenant_id)
if force:
upd.append(xml.Element("force", "true"))
if shares is not None:
upd.append(xml.Element("shares", shares))
if snapshots is not None:
upd.append(xml.Element("snapshots", snapshots))
if gigabytes is not None:
upd.append(xml.Element("gigabytes", gigabytes))
resp, body = self.put(uri, str(xml.Document(upd)), self.headers)
return resp, self._parse_resp(body)
def get_limits(self):
resp, element = self.get("limits", self.headers)
element = etree.fromstring(element)
limits = {"rate": [], "absolute": {}}
for abs_el in element.getchildren():
if "absolute" in abs_el.tag:
element = abs_el
break
for child in element.getchildren():
limit = {}
for key, value in child.attrib.iteritems():
limit[key] = value
limits["absolute"][limit["name"]] = limit["value"]
return resp, limits
def rename(self, share_id, name, desc=None):
uri = "shares/%s" % share_id
share = xml.Element("share", xmlns=xml.XMLNS_11)
share.append(xml.Element("display_name", name))
if desc is not None:
share.append(xml.Element("display_description", desc))
resp, body = self.put(uri, str(xml.Document(share)), self.headers)
return resp, self._parse_resp(body)
def rename_snapshot(self, snapshot_id, name, desc=None):
uri = "snapshots/%s" % snapshot_id
snap = xml.Element("snapshot", xmlns=xml.XMLNS_11)
snap.append(xml.Element("display_name", name))
if desc is not None:
snap.append(xml.Element("display_description", desc))
resp, body = self.put(uri, str(xml.Document(snap)), self.headers)
return resp, self._parse_resp(body)
def reset_state(self, s_id, status="error", s_type="shares"):
"""
Resets the state of a share or a snapshot
status: available, error, creating, deleting, error_deleting
s_type: shares, snapshots
"""
uri = "%s/%s/action" % (s_type, s_id)
body = xml.Element("os-reset_status", xmlns=xml.XMLNS_11)
body.append(xml.Element("status", status))
resp, body = self.post(uri, str(xml.Document(body)), self.headers)
return resp, self._parse_resp(body)
def _update_metadata(self, share_id, metadata={}, method="post"):
uri = "shares/%s/metadata" % (str(share_id))
metadata_el = xml.Element("metadata")
for key, value in metadata.iteritems():
metadata_el.append(xml.Element("meta", value, key=key))
meta_str = str(xml.Document(metadata_el))
if method is "post":
resp, body = self.post(uri, meta_str, self.headers)
elif method is "put":
resp, body = self.put(uri, meta_str, self.headers)
metas = self._parse_resp(body)
return resp, metas