Add manila's tempest-plugin

Adds plugin for tempest as for devstack
Purpose - run tempest in ci job for manila project

Partially implements bp: tempest-job

Change-Id: I7a24a8fe6ca44f75d74e0c941dc25e561e79f81a
This commit is contained in:
vponomaryov 2014-01-14 15:46:34 +02:00
parent 55f42ea27c
commit a2240d911a
25 changed files with 2721 additions and 0 deletions

View File

@ -0,0 +1,16 @@
====================
Tempest Integration
====================
This directory contains the files necessary for tempest to cover Manila project.
To install:
$ TEMPEST_DIR=/path/to/tempest
$ cp tempest/* ${TEMPEST_DIR}
notes:
These files based on tempest master branch (pre-icehouse), it is pluggable-like files without requirements to change core tempest files. But the way of its pluggability is work-around for tempest, which hasn't pluggable functionality for exceptions, config and clients modules.

View File

@ -0,0 +1,55 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import test
class AdminActionsTestJSON(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(AdminActionsTestJSON, cls).setUpClass()
# create share (available or error)
cls.share_states = ["error", "available"]
__, cls.sh = cls.create_share_wait_for_active()
# create snapshot (available or error)
cls.snapshot_states = ["error", "available"]
__, cls.sn = cls.create_snapshot_wait_for_active(cls.sh["id"])
@test.attr(type=['positive', ])
def test_reset_share_state(self):
for status in self.share_states:
resp, __ = self.shares_client.reset_state(self.sh["id"],
status=status)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_share_status(self.sh["id"], status)
@test.attr(type=['positive', ])
def test_reset_snapshot_state_to_error(self):
for status in self.snapshot_states:
resp, __ = self.shares_client.reset_state(self.sn["id"],
s_type="snapshots",
status=status)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_snapshot_status(self.sn["id"], status)
class AdminActionsTestXML(AdminActionsTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,84 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import clients_shares as clients
from tempest import config_shares as config
from tempest import exceptions
from tempest import test
import testtools
CONF = config.CONF
class AdminActionsNegativeTestJSON(base.BaseSharesAdminTest):
@classmethod
def setUpClass(cls):
super(AdminActionsNegativeTestJSON, cls).setUpClass()
# create share (available or error)
__, cls.sh = cls.create_share_wait_for_active()
# create snapshot (available or error)
__, cls.sn = cls.create_snapshot_wait_for_active(cls.sh["id"])
cls.member_shares_client = clients.Manager().shares_client
@test.attr(type=['negative', ])
def test_reset_unexistant_share_state(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.reset_state, "fake")
@test.attr(type=['negative', ])
def test_reset_unexistant_snapshot_state(self):
self.assertRaises(exceptions.NotFound, self.shares_client.reset_state,
"fake", s_type="snapshots")
@test.attr(type=['negative', ])
def test_reset_share_state_to_unacceptable_state(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.reset_state,
self.sh["id"], status="fake")
@test.attr(type=['negative', ])
def test_reset_snapshot_state_to_unacceptable_state(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.reset_state,
self.sn["id"], s_type="snapshots", status="fake")
@testtools.skipIf(not CONF.shares.only_admin_or_owner_for_action,
"Skipped, because not only admin allowed")
@test.attr(type=['negative', ])
def test_try_reset_share_state_with_member(self):
# Even if member from another tenant, it should be unauthorized
self.assertRaises(exceptions.Unauthorized,
self.member_shares_client.reset_state,
self.sh["id"])
@testtools.skipIf(not CONF.shares.only_admin_or_owner_for_action,
"Skipped, because not only admin allowed")
@test.attr(type=['negative', ])
def test_try_reset_snapshot_state_with_member(self):
# Even if member from another tenant, it should be unauthorized
self.assertRaises(exceptions.Unauthorized,
self.member_shares_client.reset_state,
self.sn["id"], s_type="snapshots")
class AdminActionsNegativeTestXML(AdminActionsNegativeTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,261 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import test
class SharesQuotasTestJSON(base.BaseSharesAdminTest):
# Tests should be used without unlimited quotas (-1).
# It is recommended to delete all entities in Manila before test run.
@classmethod
def setUpClass(cls):
super(SharesQuotasTestJSON, cls).setUpClass()
cls.identity_client = cls._get_identity_admin_client()
cls.tenant = cls.identity_client\
.get_tenant_by_name(cls.shares_client.tenant_name)
cls.user = cls.identity_client\
.get_user_by_username(cls.tenant["id"],
cls.shares_client.username)
# save quotas before tests
__, cls.t_q = cls.shares_client.show_quotas(cls.tenant["id"])
__, cls.u_q = cls.shares_client.show_quotas(cls.tenant["id"],
cls.user["id"])
value = 1000
# set quotas before tests
cls.shares_client.update_quotas(cls.tenant["id"], shares=value,
snapshots=value, gigabytes=value)
cls.shares_client.update_quotas(cls.tenant["id"], cls.user["id"],
shares=value, snapshots=value,
gigabytes=value)
@classmethod
def tearDownClass(cls):
super(SharesQuotasTestJSON, cls).tearDownClass()
# back up quota values
cls.shares_client.update_quotas(cls.tenant["id"],
shares=cls.t_q["shares"],
snapshots=cls.t_q["snapshots"],
gigabytes=cls.t_q["gigabytes"])
cls.shares_client.update_quotas(cls.tenant["id"],
cls.user["id"],
shares=cls.u_q["shares"],
snapshots=cls.u_q["snapshots"],
gigabytes=cls.u_q["gigabytes"])
@test.attr(type=['positive', 'smoke'])
def test_limits_keys(self):
# list limits
resp, limits = self.shares_client.get_limits()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
keys = ["rate", "absolute"]
[self.assertIn(key, limits.keys()) for key in keys]
abs_keys = ["maxTotalShareGigabytes",
"maxTotalShares",
"maxTotalSnapshots"]
[self.assertIn(key, limits["absolute"].keys()) for key in abs_keys]
@test.attr(type=['positive', 'smoke'])
def test_limits_values(self):
# list limits
resp, limits = self.shares_client.get_limits()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify integer values for absolute limits
self.assertGreater(int(limits["absolute"]["maxTotalShareGigabytes"]),
-2)
self.assertGreater(int(limits["absolute"]["maxTotalShares"]), -2)
self.assertGreater(int(limits["absolute"]["maxTotalSnapshots"]), -2)
@test.attr(type='positive')
def test_default_quotas(self):
resp, quotas = self.shares_client.default_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertGreater(int(quotas["gigabytes"]), -2)
self.assertGreater(int(quotas["shares"]), -2)
self.assertGreater(int(quotas["snapshots"]), -2)
@test.attr(type=['positive', 'smoke'])
def test_show_quotas(self):
resp, quotas = self.shares_client.show_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertGreater(int(quotas["gigabytes"]), -2)
self.assertGreater(int(quotas["shares"]), -2)
self.assertGreater(int(quotas["snapshots"]), -2)
@test.attr(type=['positive', 'smoke'])
def test_show_quotas_for_user(self):
resp, quotas = self.shares_client.show_quotas(self.tenant["id"],
self.user["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertGreater(int(quotas["gigabytes"]), -2)
self.assertGreater(int(quotas["shares"]), -2)
self.assertGreater(int(quotas["snapshots"]), -2)
@test.attr(type='positive')
def test_default_quotas_with_empty_tenant_id(self):
# it should return default quotas without any tenant-id
resp, body = self.shares_client.default_quotas("")
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertTrue(len(body) > 0)
@test.attr(type='positive')
def test_update_tenant_quota_shares(self):
# get current quotas
resp, quotas = self.shares_client.show_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
new_quota = int(quotas["shares"]) + 2
# set new quota for shares
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
shares=new_quota)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["shares"]), new_quota)
@test.attr(type='positive')
def test_update_user_quota_shares(self):
# get current quotas
resp, quotas = self.shares_client.show_quotas(self.tenant["id"],
self.user["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
new_quota = int(quotas["shares"]) - 1
# set new quota for shares
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
shares=new_quota)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["shares"]), new_quota)
@test.attr(type='positive')
def test_update_tenant_quota_snapshots(self):
# get current quotas
resp, quotas = self.shares_client.show_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
new_quota = int(quotas["snapshots"]) + 2
# set new quota for snapshots
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
snapshots=new_quota)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["snapshots"]), new_quota)
@test.attr(type='positive')
def test_update_user_quota_snapshots(self):
# get current quotas
resp, quotas = self.shares_client.show_quotas(self.tenant["id"],
self.user["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
new_quota = int(quotas["snapshots"]) - 1
# set new quota for snapshots
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
snapshots=new_quota)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["snapshots"]), new_quota)
@test.attr(type='positive')
def test_update_tenant_quota_gigabytes(self):
# get current quotas
resp, custom = self.shares_client.show_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# make quotas for update
gigabytes = int(custom["gigabytes"]) + 2
# set new quota for shares
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
gigabytes=gigabytes)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["gigabytes"]), gigabytes)
@test.attr(type='positive')
def test_update_user_quota_gigabytes(self):
# get current quotas
resp, custom = self.shares_client.show_quotas(self.tenant["id"],
self.user["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# make quotas for update
gigabytes = int(custom["gigabytes"]) - 1
# set new quota for shares
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
gigabytes=gigabytes)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["gigabytes"]), gigabytes)
@test.attr(type='positive')
def test_reset_tenant_quotas(self):
# get default_quotas
resp, default = self.shares_client.default_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# get current quotas
resp, custom = self.shares_client.show_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# make quotas for update
shares = int(custom["shares"]) + 2
snapshots = int(custom["snapshots"]) + 2
gigabytes = int(custom["gigabytes"]) + 2
# set new quota
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
shares=shares,
snapshots=snapshots,
gigabytes=gigabytes)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(updated["shares"]), shares)
self.assertEqual(int(updated["snapshots"]), snapshots)
self.assertEqual(int(updated["gigabytes"]), gigabytes)
# reset customized quotas
resp, reseted = self.shares_client.reset_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify quotas
resp, after_delete = self.shares_client.show_quotas(self.tenant["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(int(after_delete["shares"]), int(default["shares"]))
self.assertEqual(int(after_delete["snapshots"]),
int(default["snapshots"]))
self.assertEqual(int(after_delete["gigabytes"]),
int(default["gigabytes"]))
class SharesQuotasTestXML(SharesQuotasTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,207 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import exceptions
from tempest import exceptions_shares
from tempest import test
import unittest
class SharesQuotasNegativeTestJSON(base.BaseSharesAdminTest):
# Tests should be used without unlimited quotas (-1).
# It is recommended to delete all entities in Manila before test run.
@classmethod
def setUpClass(cls):
super(SharesQuotasNegativeTestJSON, cls).setUpClass()
cls.identity_client = cls._get_identity_admin_client()
cls.tenant = cls.identity_client\
.get_tenant_by_name(cls.shares_client.tenant_name)
cls.user = cls.identity_client\
.get_user_by_username(cls.tenant["id"],
cls.shares_client.username)
# save quotas before tests
__, cls.t_q = cls.shares_client.show_quotas(cls.tenant["id"])
__, cls.u_q = cls.shares_client.show_quotas(cls.tenant["id"],
cls.user["id"])
value = 1000
# set quotas before tests
cls.shares_client.update_quotas(cls.tenant["id"], shares=value,
snapshots=value, gigabytes=value)
cls.shares_client.update_quotas(cls.tenant["id"], cls.user["id"],
shares=value, snapshots=value,
gigabytes=value)
@classmethod
def tearDownClass(cls):
super(SharesQuotasNegativeTestJSON, cls).tearDownClass()
# back up quota values
cls.shares_client.update_quotas(cls.tenant["id"],
shares=cls.t_q["shares"],
snapshots=cls.t_q["snapshots"],
gigabytes=cls.t_q["gigabytes"])
cls.shares_client.update_quotas(cls.tenant["id"],
cls.user["id"],
shares=cls.u_q["shares"],
snapshots=cls.u_q["snapshots"],
gigabytes=cls.u_q["gigabytes"])
@test.attr(type='negative')
@unittest.skip("Skip until Bug #1234244 is fixed")
def test_quotas_with_wrong_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_quotas, "wrong_tenant_id")
@test.attr(type='negative')
@unittest.skip("Skip until Bug #1234244 is fixed")
def test_quotas_with_wrong_user_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.get_quotas,
self.tenant["id"],
"wrong_user_id")
@test.attr(type='negative')
def test_quotas_with_empty_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.show_quotas, "")
@test.attr(type='negative')
@unittest.skip("Skip until Bug #1233170 is fixed")
def test_default_quotas_with_wrong_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.default_quotas, "wrong_tenant_id")
@test.attr(type='negative')
def test_reset_quotas_with_empty_tenant_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.reset_quotas, "")
@test.attr(type='negative')
def test_update_shares_quota_with_wrong_data(self):
# -1 is acceptable value as unlimited
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_quotas,
self.tenant["id"],
shares=-2)
@test.attr(type='negative')
def test_update_snapshots_quota_with_wrong_data(self):
# -1 is acceptable value as unlimited
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_quotas,
self.tenant["id"],
snapshots=-2)
@test.attr(type='negative')
def test_update_gigabytes_quota_with_wrong_data(self):
# -1 is acceptable value as unlimited
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_quotas,
self.tenant["id"],
gigabytes=-2)
@test.attr(type='negative')
def test_create_share_with_size_bigger_than_quota(self):
new_quota = 25
overquota = new_quota + 2
# set quota for gigabytes
resp, updated = self.shares_client.update_quotas(self.tenant["id"],
gigabytes=new_quota)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# try schedule share with size, bigger than gigabytes quota
self.assertRaises(exceptions.OverLimit,
self.create_share_wait_for_active,
size=overquota)
@test.attr(type='negative')
def test_unlimited_quota_for_gigabytes(self):
# get current quota
_, quotas = self.shares_client.show_quotas(self.tenant["id"])
# set unlimited quota for gigabytes
resp, __ = self.shares_client.update_quotas(self.tenant["id"],
gigabytes=-1)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
resp, __ = self.shares_client.update_quotas(self.tenant["id"],
self.user["id"],
gigabytes=-1)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# share should be scheduled
self.assertRaises(exceptions_shares.ShareBuildErrorException,
self.create_share_wait_for_active, size=987654)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# return quotas as it was
self.shares_client.update_quotas(self.tenant["id"],
gigabytes=quotas["gigabytes"])
self.shares_client.update_quotas(self.tenant["id"], self.user["id"],
gigabytes=quotas["gigabytes"])
@test.attr(type='negative')
def test_try_set_user_quota_gigabytes_bigger_than_tenant_quota(self):
# get current quotas for tenant
_, tenant_quotas = self.shares_client.show_quotas(self.tenant["id"])
# try set user quota for gigabytes bigger than tenant quota
bigger_value = int(tenant_quotas["gigabytes"]) + 2
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_quotas,
self.tenant["id"],
self.user["id"],
gigabytes=bigger_value)
@test.attr(type='negative')
def test_try_set_user_quota_shares_bigger_than_tenant_quota(self):
# get current quotas for tenant
_, tenant_quotas = self.shares_client.show_quotas(self.tenant["id"])
# try set user quota for shares bigger than tenant quota
bigger_value = int(tenant_quotas["shares"]) + 2
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_quotas,
self.tenant["id"],
self.user["id"],
shares=bigger_value)
@test.attr(type='negative')
def test_try_set_user_quota_snaps_bigger_than_tenant_quota(self):
# get current quotas for tenant
_, tenant_quotas = self.shares_client.show_quotas(self.tenant["id"])
# try set user quota for snapshots bigger than tenant quota
bigger_value = int(tenant_quotas["snapshots"]) + 2
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_quotas,
self.tenant["id"],
self.user["id"],
snapshots=bigger_value)
class SharesQuotasNegativeTestXML(SharesQuotasNegativeTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,137 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients_shares as clients
from tempest.common import isolated_creds
from tempest import config_shares as config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class BaseSharesTest(test.BaseTestCase):
"""Base test case class for all Manila API tests."""
_interface = "json"
resources_of_tests = []
@classmethod
def setUpClass(cls):
if not CONF.service_available.manila:
skip_msg = "Manila not available"
raise cls.skipException(skip_msg)
super(BaseSharesTest, cls).setUpClass()
cls.isolated_creds = isolated_creds.IsolatedCreds(cls.__name__)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_primary_creds()
username, tenant_name, password = creds
cls.os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
interface=cls._interface)
else:
cls.os = clients.Manager(interface=cls._interface)
cls.shares_client = cls.os.shares_client
cls.build_interval = CONF.shares.build_interval
cls.build_timeout = CONF.shares.build_timeout
@classmethod
def tearDownClass(cls):
super(BaseSharesTest, cls).tearDownClass()
cls.isolated_creds.clear_isolated_creds()
cls.clear_resources()
@classmethod
def create_share_wait_for_active(cls,
share_protocol=None,
size=1,
name=None,
snapshot_id=None,
description="tempests share",
metadata={},
client=None):
if client is None:
client = cls.shares_client
r, s = client.create_share(share_protocol=share_protocol, size=size,
name=name, snapshot_id=snapshot_id,
description=description,
metadata=metadata)
resource = {"type": "share", "body": s, "deleted": False}
cls.resources_of_tests.insert(0, resource) # last in first out (LIFO)
client.wait_for_share_status(s["id"], "available")
return r, s
@classmethod
def create_snapshot_wait_for_active(cls,
share_id,
name=None,
description="tempests share-ss",
force=False,
client=None):
if client is None:
client = cls.shares_client
r, s = client.create_snapshot(share_id, name, description, force)
resource = {"type": "snapshot", "body": s, "deleted": False}
cls.resources_of_tests.insert(0, resource) # last in first out (LIFO)
client.wait_for_snapshot_status(s["id"], "available")
return r, s
@classmethod
def clear_resources(cls, client=None):
if client is None:
client = cls.shares_client
# Here we expect, that all resources were added as LIFO
# due to restriction of deletion resources, that is in the chain
for index, res in enumerate(cls.resources_of_tests):
if not(res["deleted"]):
try:
if res["type"] is "share":
client.delete_share(res["body"]['id'])
elif res["type"] is "snapshot":
client.delete_snapshot(res["body"]['id'])
cls.resources_of_tests[index]["deleted"] = True
except exceptions.NotFound:
pass
client.wait_for_resource_deletion(res["body"]['id'])
class BaseSharesAdminTest(BaseSharesTest):
"""Base test case class for all Shares Admin API tests."""
@classmethod
def setUpClass(cls):
super(BaseSharesAdminTest, cls).setUpClass()
cls.adm_user = CONF.identity.admin_username
cls.adm_pass = CONF.identity.admin_password
cls.adm_tenant = CONF.identity.admin_tenant_name
if not all((cls.adm_user, cls.adm_pass, cls.adm_tenant)):
msg = ("Missing Shares Admin API credentials "
"in configuration.")
raise cls.skipException(msg)
if CONF.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_admin_creds()
admin_username, admin_tenant_name, admin_password = creds
cls.os_adm = clients.Manager(username=admin_username,
password=admin_password,
tenant_name=admin_tenant_name,
interface=cls._interface)
else:
cls.os_adm = clients.AdminManager(interface=cls._interface)
cls.shares_client = cls.os_adm.shares_client

View File

@ -0,0 +1,153 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import test
class SharesMetadataTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(SharesMetadataTestJSON, cls).setUpClass()
_, cls.share = cls.create_share_wait_for_active()
@test.attr(type=['positive', ])
def test_set_metadata_in_share_creation(self):
md = {u"key1": u"value1", u"key2": u"value2", }
# create share with metadata
_, share = self.create_share_wait_for_active(metadata=md)
# get metadata of share
resp, metadata = self.shares_client.get_metadata(share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify metadata
self.assertEqual(md, metadata)
@test.attr(type=['positive', ])
def test_set_get_delete_metadata(self):
md = {u"key3": u"value3", u"key4": u"value4", }
# create share
_, share = self.create_share_wait_for_active()
# set metadata
resp, set_md = self.shares_client.set_metadata(share["id"], md)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# read metadata
resp, get_md = self.shares_client.get_metadata(share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify metadata
self.assertEqual(md, get_md)
# delete metadata
for key in md.keys():
resp, del_md = self.shares_client\
.delete_metadata(share["id"], key)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify deletion of metadata
resp, get_metadata = self.shares_client.get_metadata(share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual({}, get_metadata)
@test.attr(type=['positive', ])
def test_set_and_update_metadata_by_key(self):
md1 = {u"key5": u"value5", u"key6": u"value6", }
md2 = {u"key7": u"value7", u"key8": u"value8", }
# create share
_, share = self.create_share_wait_for_active()
# set metadata
resp, set_md = self.shares_client.set_metadata(share["id"], md1)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# update metadata
resp, upd_md = self.shares_client\
.update_all_metadata(share["id"], md2)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# get metadata
resp, get_md = self.shares_client.get_metadata(share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify metadata
self.assertEqual(md2, get_md)
@test.attr(type=['positive', ])
def test_set_metadata_min_size_key(self):
resp, min = self.shares_client.set_metadata(self.share["id"],
{"k": "value"})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_set_metadata_max_size_key(self):
max_key = "k" * 255
resp, max = self.shares_client.set_metadata(self.share["id"],
{max_key: "value"})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_set_metadata_min_size_value(self):
resp, min = self.shares_client.set_metadata(self.share["id"],
{"key": "v"})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_set_metadata_max_size_value(self):
max_value = "v" * 1023
resp, body = self.shares_client.set_metadata(self.share["id"],
{"key": max_value})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_upd_metadata_min_size_key(self):
resp, body = self.shares_client.update_all_metadata(self.share["id"],
{"k": "value"})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_upd_metadata_max_size_key(self):
max_key = "k" * 255
resp, body = self.shares_client.update_all_metadata(self.share["id"],
{max_key: "value"})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_upd_metadata_min_size_value(self):
resp, body = self.shares_client.update_all_metadata(self.share["id"],
{"key": "v"})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type=['positive', ])
def test_upd_metadata_max_size_value(self):
max_value = "v" * 1023
resp, body = self.shares_client.update_all_metadata(self.share["id"],
{"key": max_value})
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
class SharesMetadataTestXML(SharesMetadataTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,96 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import exceptions
from tempest import test
class SharesMetadataNegativeTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(SharesMetadataNegativeTestJSON, cls).setUpClass()
_, cls.share = cls.create_share_wait_for_active()
@test.attr(type=['negative', ])
def test_try_set_metadata_to_unexisting_share(self):
md = {u"key1": u"value1", u"key2": u"value2", }
self.assertRaises(exceptions.NotFound,
self.shares_client.set_metadata,
"wrong_share_id", md)
@test.attr(type=['negative', ])
def test_try_update_all_metadata_for_unexisting_share(self):
md = {u"key1": u"value1", u"key2": u"value2", }
self.assertRaises(exceptions.NotFound,
self.shares_client.update_all_metadata,
"wrong_share_id", md)
@test.attr(type=['negative', ])
def test_try_set_metadata_with_empty_key(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.set_metadata,
self.share["id"], {"": "value"})
@test.attr(type=['negative', ])
def test_try_upd_metadata_with_empty_key(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_all_metadata,
self.share["id"], {"": "value"})
@test.attr(type=['negative', ])
def test_try_set_metadata_with_too_big_key(self):
too_big_key = "x" * 256
md = {too_big_key: "value"}
self.assertRaises(exceptions.BadRequest,
self.shares_client.set_metadata,
self.share["id"], md)
@test.attr(type=['negative', ])
def test_try_upd_metadata_with_too_big_key(self):
too_big_key = "x" * 256
md = {too_big_key: "value"}
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_all_metadata,
self.share["id"], md)
@test.attr(type=['negative', ])
def test_try_set_metadata_with_too_big_value(self):
too_big_value = "x" * 1024
md = {"key": too_big_value}
self.assertRaises(exceptions.BadRequest,
self.shares_client.set_metadata,
self.share["id"], md)
@test.attr(type=['negative', ])
def test_try_upd_metadata_with_too_big_value(self):
too_big_value = "x" * 1024
md = {"key": too_big_value}
self.assertRaises(exceptions.BadRequest,
self.shares_client.update_all_metadata,
self.share["id"], md)
@test.attr(type=['negative', ])
def test_try_delete_unexisting_metadata(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_metadata,
self.share["id"], "wrong_key")
class SharesMetadataNegativeTestXML(SharesMetadataNegativeTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,138 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import exceptions
from tempest import test
class ShareRulesTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(ShareRulesTestJSON, cls).setUpClass()
_, cls.share = cls.create_share_wait_for_active()
@test.attr(type='positive')
def test_create_delete_access_rules_with_one_ip(self):
# test data
access_type = "ip"
access_to = "1.2.3.4"
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
access_type,
access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# delete rule
resp, _ = self.shares_client.delete_access_rule(self.share["id"],
rule["id"])
self.assertIn(int(resp["status"]), [200, 202])
@test.attr(type='positive')
def test_create_delete_access_rule_with_cidr(self):
# test data
access_type = "ip"
access_to = "1.2.3.4/32"
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
access_type,
access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# delete rule
resp, _ = self.shares_client.delete_access_rule(self.share["id"],
rule["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
@test.attr(type='positive')
def test_list_access_rules(self):
# test data
access_type = "ip"
access_to = "1.2.3.4"
# create rule
resp, rule = self.shares_client.create_access_rule(self.share["id"],
access_type,
access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# list rules
resp, rules = self.shares_client.list_access_rules(self.share["id"])
# verify response
msg = "We expected status 200, but got %s" % (str(resp["status"]))
self.assertEqual(200, int(resp["status"]), msg)
# verify keys
keys = ["state", "id", "access_type", "access_to"]
[self.assertIn(key, r.keys()) for r in rules for key in keys]
# verify values
self.assertEqual("active", rules[0]["state"])
self.assertEqual(access_type, rules[0]["access_type"])
self.assertEqual(access_to, rules[0]["access_to"])
# our share id in list and have no duplicates
gen = [r["id"] for r in rules if r["id"] in rule["id"]]
msg = "expected id lists %s times in rule list" % (len(gen))
self.assertEquals(len(gen), 1, msg)
@test.attr(type='positive')
def test_access_rules_deleted_if_share_deleted(self):
# test data
access_type = "ip"
access_to = "1.2.3.0/24"
# create share
resp, share = self.create_share_wait_for_active()
# create rule
resp, rule = self.shares_client.create_access_rule(share["id"],
access_type,
access_to)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(share["id"], rule["id"],
"active")
# delete share
resp, _ = self.shares_client.delete_share(share['id'])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_resource_deletion(share['id'])
# verify absence of rules for nonexistent share id
self.assertRaises(exceptions.NotFound,
self.shares_client.list_access_rules,
share['id'])
class ShareRulesTestXML(ShareRulesTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,109 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import exceptions
from tempest import test
class ShareRulesNegativeTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(ShareRulesNegativeTestJSON, cls).setUpClass()
# create share
_, cls.share = cls.create_share_wait_for_active()
# create snapshot
_, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_share_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
"wrong_share_id")
@test.attr(type='negative')
def test_delete_access_rule_ip_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_access_rule,
self.share["id"], "wrong_rule_id")
@test.attr(type='negative')
def test_create_try_access_rule_ip_to_snapshot(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_access_rule,
self.snap["id"])
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_type(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "wrong_type", "1.2.3.4")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_1(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.256")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_2(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.1.1.-")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_3(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.4/33")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_4(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.*")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_5(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.*/23")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_6(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1|23")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_7(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1/-1")
@test.attr(type='negative')
def test_create_access_rule_ip_with_wrong_target_8(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_access_rule,
self.share["id"], "ip", "1.2.3.1/")
class ShareRulesNegativeTestXML(ShareRulesNegativeTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,159 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import clients_shares as clients
from tempest import config_shares as config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class SharesSecurityNegativeTestJSON(base.BaseSharesTest):
@classmethod
def setUpClass(cls):
super(SharesSecurityNegativeTestJSON, cls).setUpClass()
if not CONF.shares.only_admin_or_owner_for_action:
skip_msg = "Disabled from tempest configuration"
raise cls.skipException(skip_msg)
cls.client = cls.shares_client
cls.alt_client = clients.AltManager().shares_client
_, cls.share = cls.create_share_wait_for_active()
_, cls.snap = cls.create_snapshot_wait_for_active(cls.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_for_share_list(self):
# list shares
__, shares = self.client.list_shares()
# our share id is in list and have no duplicates
gen = [sid["id"] for sid in shares if sid["id"] in self.share["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 1, msg)
# list shares from another tenant
__, alt_shares = self.alt_client.list_shares()
# our share id is not in list
gen = [s["id"] for s in alt_shares if s["id"] in self.share["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 0, msg)
@test.attr(type='negative')
def test_tenant_isolation_share_delete(self):
# try delete share from another tenant
self.assertRaises(exceptions.Unauthorized,
self.alt_client.delete_share,
self.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_get(self):
# try delete share from another tenant
self.assertRaises(exceptions.Unauthorized,
self.alt_client.get_share, self.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_for_share_snapshot_list(self):
# list share snapshots
__, snaps = self.client.list_snapshots()
# our share id is in list and have no duplicates
gen = [sid["id"] for sid in snaps if sid["id"] in self.snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 1, msg)
# list shares from another tenant
__, alt_snaps = self.alt_client.list_snapshots()
# our snapshot id is not in list
gen = [sid["id"] for sid in alt_snaps if sid["id"] in self.snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(len(gen), 0, msg)
@test.attr(type='negative')
def test_tenant_isolation_share_snapshot_delete(self):
# try delete share from another tenant
self.assertRaises(exceptions.NotFound,
self.alt_client.delete_snapshot, self.snap["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_snapshot_get(self):
# try delete share from another tenant
self.assertRaises(exceptions.NotFound,
self.alt_client.get_snapshot, self.snap["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_access_list(self):
# try list share rules
self.assertRaises(exceptions.Unauthorized, # NotFound or Unauthorized
self.alt_client.list_access_rules,
self.share["id"])
@test.attr(type='negative')
def test_tenant_isolation_share_access_rule_delete(self):
# create rule
resp, rule = self.client.create_access_rule(self.share["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_access_rule_status(self.share["id"],
rule["id"],
"active")
# try delete rule
self.assertRaises(exceptions.Unauthorized, # NotFound or Unauthorized
self.alt_client.delete_access_rule,
self.share["id"], rule["id"])
@test.attr(type='negative')
def test_create_snapshot_from_alien_share(self):
# try create snapshot in another tenant
self.assertRaises(exceptions.Unauthorized, # NotFound or Unauthorized
self.create_snapshot_wait_for_active,
share_id=self.share["id"],
client=self.alt_client)
@test.attr(type='negative')
def test_create_share_from_alien_snapshot(self):
# try create share in another tenant from snap
self.assertRaises(exceptions.NotFound, # NotFound or Unauthorized
self.create_share_wait_for_active,
snapshot_id=self.snap["id"],
client=self.alt_client)
@test.attr(type='negative')
def test_create_access_rule_to_alien_share(self):
# try create access rule from another tenant
self.assertRaises(exceptions.Unauthorized,
self.alt_client.create_access_rule,
self.share["id"],
access_to="1.1.1.1")
# There is no need to perform security tests twice
#class SharesSecurityNegativeTestXML(SharesSecurityNegativeTestJSON):
# _interface = 'xml'

View File

@ -0,0 +1,295 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest.common.utils.data_utils import rand_name
from tempest import exceptions
from tempest import test
class SharesTestJSON(base.BaseSharesTest):
def tearDown(self):
super(SharesTestJSON, self).tearDown()
self.clear_resources()
@test.attr(type=['positive', ])
def test_create_delete_share(self):
# create share
resp, share = self.create_share_wait_for_active()
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# delete share
resp, __ = self.shares_client.delete_share(share['id'])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.shares_client.wait_for_resource_deletion(share['id'])
self.assertRaises(exceptions.NotFound,
self.shares_client.get_share,
share['id'])
@test.attr(type=['positive', ])
def test_get_share(self):
# test data
name = rand_name("rand-share-name-")
desc = rand_name("rand-share-description-")
size = 1
# create share
resp, share = self.create_share_wait_for_active(name=name,
description=desc,
size=size)
# get share
resp, share = self.shares_client.get_share(share['id'])
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify keys
expected_keys = ["status", "description", "links", "availability_zone",
"created_at", "export_location", "share_proto",
"name", "snapshot_id", "id", "size"]
actual_keys = share.keys()
[self.assertIn(key, actual_keys) for key in expected_keys]
# verify values
msg = "Expected name: '%s', actual name: '%s'" % (name, share["name"])
self.assertEqual(name, str(share["name"]), msg)
msg = "Expected description: '%s', "\
"actual description: '%s'" % (desc, share["description"])
self.assertEqual(desc, str(share["description"]), msg)
msg = "Expected size: '%s', actual size: '%s'" % (size, share["size"])
self.assertEqual(size, int(share["size"]), msg)
@test.attr(type=['positive', ])
def test_list_shares(self):
# create share
resp, share = self.create_share_wait_for_active()
# list shares
resp, shares = self.shares_client.list_shares()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify keys
keys = ["name", "id", "links"]
[self.assertIn(key, sh.keys()) for sh in shares for key in keys]
# our share id in list and have no duplicates
gen = [sid["id"] for sid in shares if sid["id"] in share["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEqual(len(gen), 1, msg)
@test.attr(type=['positive', 'gate'])
def test_list_shares_with_detail(self):
# create share
resp, share = self.create_share_wait_for_active()
# list shares
resp, shares = self.shares_client.list_shares_with_detail()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify keys
keys = ["status", "description", "links", "availability_zone",
"created_at", "export_location", "share_proto",
"name", "snapshot_id", "id", "size"]
[self.assertIn(key, sh.keys()) for sh in shares for key in keys]
# our share id in list and have no duplicates
gen = [sid["id"] for sid in shares if sid["id"] in share["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEqual(len(gen), 1, msg)
@test.attr(type=['positive', ])
def test_create_delete_snapshot(self):
# create share
resp, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# delete snapshot
self.shares_client.delete_snapshot(snap["id"])
self.shares_client.wait_for_resource_deletion(snap["id"])
self.assertRaises(exceptions.NotFound,
self.shares_client.get_snapshot, snap['id'])
@test.attr(type=['positive', ])
def test_get_snapshot(self):
# create share
resp, share = self.create_share_wait_for_active()
#create snapshot
name = rand_name("tempest-snap-")
desc = rand_name("tempest-snap-description-")
resp, snap = self.create_snapshot_wait_for_active(share["id"],
name, desc)
# get snapshot
resp, get = self.shares_client.get_snapshot(snap["id"])
# verify data
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify keys
expected_keys = ["status", "links", "share_id", "name",
"export_location", "share_proto", "created_at",
"description", "id", "share_size"]
actual_keys = get.keys()
[self.assertIn(key, actual_keys) for key in expected_keys]
# verify data
msg = "Expected name: '%s', actual name: '%s'" % (name, get["name"])
self.assertEqual(name, get["name"], msg)
msg = "Expected description: '%s', "\
"actual description: '%s'" % (desc, get["description"])
self.assertEqual(desc, get["description"], msg)
msg = "Expected share_id: '%s', "\
"actual share_id: '%s'" % (name, get["share_id"])
self.assertEqual(share["id"], get["share_id"], msg)
@test.attr(type=['positive', ])
def test_list_snapshots(self):
# create share
resp, share = self.create_share_wait_for_active()
#create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# list share snapshots
resp, snaps = self.shares_client.list_snapshots()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify keys
keys = ["id", "name", "links"]
[self.assertIn(key, sn.keys()) for sn in snaps for key in keys]
# our share id in list and have no duplicates
gen = [sid["id"] for sid in snaps if sid["id"] in snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEquals(1, len(gen), msg)
@test.attr(type=['positive', 'gate'])
def test_list_snapshots_with_detail(self):
# create share
resp, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# list share snapshots
resp, snaps = self.shares_client.list_snapshots_with_detail()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
# verify keys
keys = ["status", "links", "share_id", "name",
"export_location", "share_proto", "created_at",
"description", "id", "share_size"]
[self.assertIn(key, sn.keys()) for sn in snaps for key in keys]
# our share id in list and have no duplicates
gen = [sid["id"] for sid in snaps if sid["id"] in snap["id"]]
msg = "expected id lists %s times in share list" % (len(gen))
self.assertEqual(len(gen), 1, msg)
@test.attr(type=['positive', 'smoke', 'gate'])
def test_create_share_from_snapshot(self):
# create share
resp, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# crate share from snapshot
resp, s2 = self.create_share_wait_for_active(snapshot_id=snap["id"])
# verify share, created from snapshot
resp, get = self.shares_client.get_share(s2["id"])
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
msg = "Expected snapshot_id %s as "\
"source of share %s" % (snap["id"], get["snapshot_id"])
self.assertEqual(get["snapshot_id"], snap["id"], msg)
@test.attr(type=['positive', 'smoke', 'gate'])
def test_extensions(self):
# get extensions
resp, extensions = self.shares_client.list_extensions()
# verify response
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
keys = ['alias', 'updated', 'namespace', 'name', 'description']
[self.assertIn(key, ext.keys()) for ext in extensions for key in keys]
@test.attr(type=['positive', ])
def test_rename_share(self):
# create share
_, share = self.create_share_wait_for_active()
# rename share
new_name = rand_name("new_name_")
new_desc = rand_name("new_desc_")
resp, renamed = self.shares_client.rename(share["id"],
new_name,
new_desc)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(new_name, renamed["name"])
self.assertEqual(new_desc, renamed["description"])
@test.attr(type=['positive', ])
def test_rename_snapshot(self):
# create share
_, share = self.create_share_wait_for_active()
# create snapshot
_, snap = self.create_snapshot_wait_for_active(share["id"])
# rename snapshot
new_name = rand_name("new_name_for_snap_")
new_desc = rand_name("new_desc_for_snap_")
resp, renamed = self.shares_client.rename_snapshot(snap["id"],
new_name,
new_desc)
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
self.assertEqual(new_name, renamed["name"])
self.assertEqual(new_desc, renamed["description"])
class SharesTestXML(SharesTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,118 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.shares import base
from tempest import exceptions
from tempest import exceptions_shares
from tempest import test
class SharesNegativeTestJSON(base.BaseSharesTest):
@test.attr(type='negative')
def test_create_share_with_invalid_protocol(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share,
share_protocol="nonexistent_protocol")
@test.attr(type='negative')
def test_get_share_with_wrong_id(self):
self.assertRaises(exceptions.NotFound, self.shares_client.get_share,
"wrong_share_id")
@test.attr(type='negative')
def test_get_share_without_passing_share_id(self):
# Should not be able to get share when empty ID is passed
self.assertRaises(exceptions.NotFound,
self.shares_client.get_share, '')
@test.attr(type='negative')
def test_delete_share_with_wrong_id(self):
self.assertRaises(exceptions.NotFound, self.shares_client.delete_share,
"wrong_share_id")
@test.attr(type='negative')
def test_delete_share_without_passing_share_id(self):
# Should not be able to delete share when empty ID is passed
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_share, '')
@test.attr(type='negative')
def test_create_snapshot_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.create_snapshot,
"wrong_share_id")
@test.attr(type='negative')
def test_delete_snapshot_with_wrong_id(self):
self.assertRaises(exceptions.NotFound,
self.shares_client.delete_snapshot,
"wrong_share_id")
@test.attr(type='negative')
def test_create_share_with_invalid_size(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share, size="#$%")
@test.attr(type='negative')
def test_create_share_with_out_passing_size(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share, size="")
@test.attr(type='negative')
def test_create_share_with_zero_size(self):
self.assertRaises(exceptions.BadRequest,
self.shares_client.create_share, size=0)
@test.attr(type='negative')
def test_try_delete_share_with_existing_snapshot(self):
# share can not be deleted while snapshot exists
# create share
resp, share = self.create_share_wait_for_active()
# create snapshot
resp, snap = self.create_snapshot_wait_for_active(share["id"])
# try delete share
self.assertRaises(exceptions.Unauthorized,
self.shares_client.delete_share, share["id"])
@test.attr(type='negative')
def test_create_share_from_snap_with_less_size(self):
# requires minimum 5Gb available space
skip_msg = "Check disc space for this test"
try: # create share
_, share = self.create_share_wait_for_active(size=2)
except exceptions_shares.ShareBuildErrorException:
self.skip(skip_msg)
try: # create snapshot
_, snap = self.create_snapshot_wait_for_active(share["id"])
except exceptions.SnapshotBuildErrorException:
self.skip(skip_msg)
# try create share from snapshot with less size
self.assertRaises(exceptions.BadRequest,
self.create_share_wait_for_active,
size=1, snapshot_id=snap["id"])
class SharesNegativeTestXML(SharesNegativeTestJSON):
_interface = 'xml'

View File

@ -0,0 +1,26 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import cli
class ClientTestBase(cli.ClientTestBase):
def manila(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes manila command for the given action."""
return self.cmd_with_auth(
'manila', action, flags, params, admin, fail_ok)

View File

@ -0,0 +1,140 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import subprocess
from tempest.cli import manilaclient
from tempest import config_shares as config
CONF = config.CONF
class SimpleReadOnlyManilaClientTest(manilaclient.ClientTestBase):
"""Basic, read-only tests for Manila CLI client.
Checks return values and output of read-only commands.
These tests do not presume any content, nor do they create
their own. They only verify the structure of output if present.
"""
@classmethod
def setUpClass(cls):
super(SimpleReadOnlyManilaClientTest, cls).setUpClass()
if not CONF.service_available.manila:
raise cls.skipException("Manila not available")
def test_manila_fake_action(self):
self.assertRaises(subprocess.CalledProcessError,
self.manila, 'this-does-not-exist')
def test_manila_absolute_limit_list(self):
roles = self.parser.listing(self.manila('absolute-limits'))
self.assertTableStruct(roles, ['Name', 'Value'])
def test_manila_shares_list(self):
self.manila('list')
def test_manila_shares_list_all_tenants(self):
self.manila('list', params='--all-tenants')
def test_manila_shares_list_filter_by_name(self):
self.manila('list', params='--name name')
def test_manila_shares_list_filter_by_status(self):
self.manila('list', params='--status status')
def test_manila_endpoints(self):
self.manila('endpoints')
def test_manila_quota_class_show(self):
"""This CLI can accept and string as param."""
roles = self.parser.listing(self.manila('quota-class-show',
params='abc'))
self.assertTableStruct(roles, ['Property', 'Value'])
def test_manila_quota_defaults(self):
"""This CLI can accept and string as param."""
roles = self.parser.listing(self.manila('quota-defaults',
params=self.identity.
admin_tenant_name))
self.assertTableStruct(roles, ['Property', 'Value'])
def test_manila_quota_show(self):
"""This CLI can accept and string as param."""
roles = self.parser.listing(self.manila('quota-show',
params=self.identity.
admin_tenant_name))
self.assertTableStruct(roles, ['Property', 'Value'])
def test_manila_rate_limits(self):
self.manila('rate-limits')
def test_manila_snapshot_list(self):
self.manila('snapshot-list')
def test_manila_snapshot_list_all_tenants(self):
self.manila('snapshot-list', params='--all-tenants')
def test_manila_snapshot_list_filter_by_name(self):
self.manila('snapshot-list', params='--name name')
def test_manila_snapshot_list_filter_by_status(self):
self.manila('snapshot-list', params='--status status')
def test_manila_snapshot_list_filter_by_share_id(self):
self.manila('snapshot-list', params='--share-id share_id')
def test_manila_credentials(self):
self.manila('credentials')
def test_manila_list_extensions(self):
roles = self.parser.listing(self.manila('list-extensions'))
self.assertTableStruct(roles, ['Name', 'Summary', 'Alias', 'Updated'])
def test_manila_help(self):
help_text = self.manila('help')
lines = help_text.split('\n')
self.assertFirstLineStartsWith(lines, 'usage: manila')
commands = []
cmds_start = lines.index('Positional arguments:')
cmds_end = lines.index('Optional arguments:')
command_pattern = re.compile('^ {4}([a-z0-9\-\_]+)')
for line in lines[cmds_start:cmds_end]:
match = command_pattern.match(line)
if match:
commands.append(match.group(1))
commands = set(commands)
wanted_commands = set(('absolute-limits', 'list', 'help',
'quota-show', 'access-list', 'snapshot-list',
'allow-access', 'deny-access'))
self.assertFalse(wanted_commands - commands)
# Optional arguments:
def test_manila_version(self):
self.manila('', flags='--version')
def test_manila_debug_list(self):
self.manila('list', flags='--debug')
def test_manila_retries_list(self):
self.manila('list', flags='--retries 3')
def test_manila_region_list(self):
self.manila('list', flags='--os-region-name ' + self.identity.region)

View File

@ -0,0 +1,74 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import clients
from tempest import config_shares as config
from tempest import exceptions
from tempest.services.shares.json import shares_client as j_shares_client
from tempest.services.shares.xml import shares_client as x_shares_client
CONF = config.CONF
class Manager(clients.Manager):
"""
Top level manager for OpenStack Compute clients
"""
def __init__(self, username=None, password=None, tenant_name=None,
interface='json'):
super(Manager, self).__init__(username, password, tenant_name,
interface)
client_args = (CONF, self.username, self.password,
self.auth_url, self.tenant_name)
if interface == 'xml':
self.shares_client = x_shares_client.SharesClientXML(*client_args)
elif interface == 'json':
self.shares_client = j_shares_client.SharesClientJSON(*client_args)
else:
msg = "Unsupported interface type `%s'" % interface
raise exceptions.InvalidConfiguration(msg)
class AltManager(Manager):
"""
Manager object that uses the alt_XXX credentials for its
managed client objects
"""
def __init__(self, interface='json'):
super(AltManager, self).__init__(CONF.identity.alt_username,
CONF.identity.alt_password,
CONF.identity.alt_tenant_name,
interface=interface)
class AdminManager(Manager):
"""
Manager object that uses the admin credentials for its
managed client objects
"""
def __init__(self, interface='json'):
super(AdminManager, self).__init__(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name,
interface=interface)

View File

@ -0,0 +1,78 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
from oslo.config import cfg
from tempest import config
service_available_group = cfg.OptGroup(name="service_available",
title="Available OpenStack Services")
ServiceAvailableGroup = [
cfg.BoolOpt('manila',
default=True,
help="Whether or not manila is expected to be available"),
]
shares_group = cfg.OptGroup(name="shares",
title="Shares Service Options")
SharesGroup = [
cfg.StrOpt('share_protocol',
default="nfs",
help="File share type by default"),
cfg.IntOpt('build_interval',
default=10,
help='Time in seconds between volume availability checks.'),
cfg.IntOpt('build_timeout',
default=300,
help='Timeout in seconds to wait for a volume to become'
'available.'),
cfg.StrOpt('catalog_type',
default="share",
help='Catalog type of the Shares service.'),
cfg.BoolOpt('only_admin_or_owner_for_action',
default=True,
help='This flag use tests that verify policy.json rules'),
]
# this should never be called outside of this class
class TempestConfigPrivateManila(config.TempestConfigPrivate):
# manila's config wrap over standard config
def __init__(self, parse_conf=True):
super(TempestConfigPrivateManila, self).__init__()
config.register_opt_group(cfg.CONF, service_available_group,
ServiceAvailableGroup)
config.register_opt_group(cfg.CONF, shares_group, SharesGroup)
self.shares = cfg.CONF.shares
class TempestConfigProxyManila(object):
_config = None
def __getattr__(self, attr):
if not self._config:
self._config = TempestConfigPrivateManila()
return getattr(self._config, attr)
CONF = TempestConfigProxyManila()

View File

@ -0,0 +1,26 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import exceptions
class ShareBuildErrorException(exceptions.TempestException):
message = "Share %(share_id)s failed to build and is in ERROR status"
class AccessRuleBuildErrorException(exceptions.TempestException):
message = "Share's rule with id %(rule_id) is in ERROR status"

View File

@ -0,0 +1,345 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from tempest.common import rest_client
from tempest.common.utils.data_utils import rand_name
from tempest import exceptions
from tempest import exceptions_shares
import time
import urllib
class SharesClientJSON(rest_client.RestClient):
"""
Tempest REST client for Manila.
It handles shares and access to it in openstack.
"""
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(SharesClientJSON, self).__init__(config, username, password,
auth_url, tenant_name)
self.service = self.config.shares.catalog_type # share
self.share_protocol = self.config.shares.share_protocol
self.build_interval = self.config.shares.build_interval
self.build_timeout = self.config.shares.build_timeout
self.tenant_name = tenant_name
self.username = username
def _parse_resp(self, body):
if len(body) > 0:
body = json.loads(body)
if len(body) is 1 and isinstance(body.items()[0][1], (dict, list)):
return body[body.items()[0][0]]
return body
def create_share(self, share_protocol=None, size=1,
name=None, snapshot_id=None,
description="tempest created share",
metadata={}):
if name is None:
name = rand_name("tempest-created-share-")
if share_protocol is None:
share_protocol = self.share_protocol
post_body = {
"share": {
"share_proto": share_protocol,
"description": description,
"snapshot_id": snapshot_id,
"name": name,
"size": size,
"metadata": metadata
}
}
body = json.dumps(post_body)
resp, body = self.post("shares", body, self.headers)
return resp, self._parse_resp(body)
def delete_share(self, share_id):
resp, body = self.delete("shares/%s" % share_id, self.headers)
return resp, self._parse_resp(body)
def list_shares(self):
resp, body = self.get("shares", self.headers)
return resp, self._parse_resp(body)
def list_shares_with_detail(self, params=None):
"""List the details of all shares."""
url = 'shares/detail'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
return resp, self._parse_resp(body)
def get_share(self, share_id):
uri = "shares/%s" % share_id
resp, body = self.get(uri, self.headers)
return resp, self._parse_resp(body)
def create_access_rule(self, share_id,
access_type="ip", access_to="0.0.0.0"):
post_body = {
"os-allow_access": {
"access_type": access_type,
"access_to": access_to
}
}
body = json.dumps(post_body)
uri = "shares/%s/action" % share_id
resp, body = self.post(uri, body, self.headers)
return resp, self._parse_resp(body)
def list_access_rules(self, share_id):
uri = "shares/%s/action" % share_id
body = {"os-access_list": None}
resp, body = self.post(uri, json.dumps(body), self.headers)
return resp, self._parse_resp(body)
def delete_access_rule(self, share_id, rule_id):
post_body = {
"os-deny_access": {
"access_id": rule_id
}
}
body = json.dumps(post_body)
uri = "shares/%s/action" % share_id
return self.post(uri, body, self.headers)
def create_snapshot(self, share_id, name=None,
description="tempest created share-ss",
force=False):
if name is None:
name = rand_name("tempest-created-share-snap-")
post_body = {
"snapshot": {
"name": name,
"force": force,
"description": description,
"share_id": share_id
}
}
body = json.dumps(post_body)
resp, body = self.post("snapshots", body, self.headers)
return resp, self._parse_resp(body)
def get_snapshot(self, snapshot_id):
uri = "snapshots/%s" % snapshot_id
resp, body = self.get(uri, self.headers)
return resp, self._parse_resp(body)
def list_snapshots(self):
resp, body = self.get("snapshots", self.headers)
return resp, self._parse_resp(body)
def list_snapshots_with_detail(self, params=None):
"""List the details of all shares."""
url = 'snapshots/detail'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url, self.headers)
return resp, self._parse_resp(body)
def delete_snapshot(self, snap_id):
uri = "snapshots/%s" % snap_id
resp, body = self.delete(uri, self.headers)
return resp, self._parse_resp(body)
def wait_for_share_status(self, share_id, status):
"""Waits for a Share to reach a given status."""
resp, body = self.get_share(share_id)
share_name = body['name']
share_status = body['status']
start = int(time.time())
while share_status != status:
time.sleep(self.build_interval)
resp, body = self.get_share(share_id)
share_status = body['status']
if 'error' in share_status:
raise exceptions_shares.\
ShareBuildErrorException(share_id=share_id)
if int(time.time()) - start >= self.build_timeout:
message = ('Share %s failed to reach %s status within '
'the required time (%s s).' %
(share_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def wait_for_snapshot_status(self, snapshot_id, status):
"""Waits for a Share to reach a given status."""
resp, body = self.get_snapshot(snapshot_id)
snapshot_name = body['name']
snapshot_status = body['status']
start = int(time.time())
while snapshot_status != status:
time.sleep(self.build_interval)
resp, body = self.get_snapshot(snapshot_id)
snapshot_status = body['status']
if 'error' in snapshot_status:
raise exceptions.\
SnapshotBuildErrorException(snapshot_id=snapshot_id)
if int(time.time()) - start >= self.build_timeout:
message = ('Share Snapshot %s failed to reach %s status '
'within the required time (%s s).' %
(snapshot_name, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def wait_for_access_rule_status(self, share_id, rule_id, status):
"""Waits for a Share to reach a given status."""
rule_status = "new"
start = int(time.time())
while rule_status != status:
time.sleep(self.build_interval)
resp, rules = self.list_access_rules(share_id)
for rule in rules:
if rule["id"] in rule_id:
rule_status = rule['state']
break
if 'error' in rule_status:
raise exceptions_shares.\
AccessRuleBuildErrorException(rule_id=rule_id)
if int(time.time()) - start >= self.build_timeout:
message = ('Share Access Rule %s failed to reach %s status '
'within the required time (%s s).' %
(rule_id, status, self.build_timeout))
raise exceptions.TimeoutException(message)
def default_quotas(self, tenant_id):
uri = "os-quota-sets/%s/defaults" % tenant_id
resp, body = self.get(uri, self.headers)
return resp, self._parse_resp(body)
def show_quotas(self, tenant_id, user_id=None):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % (user_id)
resp, body = self.get(uri, self.headers)
return resp, self._parse_resp(body)
def reset_quotas(self, tenant_id, user_id=None):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % user_id
resp, body = self.delete(uri, self.headers)
return resp, self._parse_resp(body)
def update_quotas(self, tenant_id, user_id=None,
shares=None, snapshots=None,
gigabytes=None, force=True):
put_body = {"quota_set": {}}
put_body["quota_set"]["tenant_id"] = tenant_id
if force:
put_body["quota_set"]["force"] = "true"
if shares is not None:
put_body["quota_set"]["shares"] = shares
if snapshots is not None:
put_body["quota_set"]["snapshots"] = snapshots
if gigabytes is not None:
put_body["quota_set"]["gigabytes"] = gigabytes
put_body = json.dumps(put_body)
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % user_id
resp, body = self.put(uri, put_body, self.headers)
return resp, self._parse_resp(body)
def get_limits(self):
resp, body = self.get("limits", self.headers)
return resp, self._parse_resp(body)
def is_resource_deleted(self, s_id, rule_id=None):
if rule_id is None:
try:
self.get_snapshot(s_id)
except exceptions.NotFound:
try:
self.get_share(s_id)
except exceptions.NotFound:
return True
return False
else:
_, rules = self.list_share_access_rules(s_id)
for rule in rules:
if rule["id"] in rule_id:
return False
return True
def list_extensions(self):
resp, extensions = self.get("extensions", self.headers)
return resp, self._parse_resp(extensions)
def rename(self, share_id, name, desc=None):
uri = "shares/%s" % share_id
body = {"share": {"display_name": name}}
if desc is not None:
body["share"].update({"display_description": desc})
body = json.dumps(body)
resp, body = self.put(uri, body, self.headers)
return resp, self._parse_resp(body)
def rename_snapshot(self, snapshot_id, name, desc=None):
uri = "snapshots/%s" % snapshot_id
body = {"snapshot": {"display_name": name}}
if desc is not None:
body["snapshot"].update({"display_description": desc})
body = json.dumps(body)
resp, body = self.put(uri, body, self.headers)
return resp, self._parse_resp(body)
def reset_state(self, s_id, status="error", s_type="shares"):
"""
Resets the state of a share or a snapshot
status: available, error, creating, deleting, error_deleting
s_type: shares, snapshots
"""
uri = "%s/%s/action" % (s_type, s_id)
body = {"os-reset_status": {"status": status}}
body = json.dumps(body)
resp, body = self.post(uri, body, self.headers)
return resp, self._parse_resp(body)
###############
def _update_metadata(self, share_id, metadata={}, method="post"):
uri = "shares/%s/metadata" % share_id
post_body = {"metadata": metadata}
body = json.dumps(post_body)
if method is "post":
resp, metadata = self.post(uri, body, self.headers)
if method is "put":
resp, metadata = self.put(uri, body, self.headers)
return resp, self._parse_resp(metadata)
def set_metadata(self, share_id, metadata={}):
return self._update_metadata(share_id, metadata)
def update_all_metadata(self, share_id, metadata={}):
return self._update_metadata(share_id, metadata, method="put")
def delete_metadata(self, share_id, key):
uri = "shares/%s/metadata/%s" % (share_id, key)
resp, body = self.delete(uri, self.headers)
return resp, self._parse_resp(body)
def get_metadata(self, share_id):
uri = "shares/%s/metadata" % share_id
resp, body = self.get(uri, self.headers)
return resp, self._parse_resp(body)

View File

@ -0,0 +1,204 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from lxml import etree
from tempest.common.utils.data_utils import rand_name
from tempest.services.compute.xml import common as xml
from tempest.services.shares.json import shares_client
class SharesClientXML(shares_client.SharesClientJSON):
"""
Tempest REST client for Manila.
It handles shares and access to it in openstack.
"""
def __init__(self, config, username, password, auth_url, tenant_name=None):
super(SharesClientXML, self).__init__(config, username, password,
auth_url, tenant_name)
self.TYPE = "xml" # from RestClientXML
self.headers["Content-Type"] = "application/%s" % self.TYPE
self.headers["Accept"] = "application/%s" % self.TYPE
def _parse_resp(self, body): # from RestClientXML
if len(body) > 0:
element = etree.fromstring(body)
entity_list = ["shares", "snapshots", "extensions", "access_list"]
if "metadata" in element.tag:
dictionary = {}
for el in element.getchildren():
dictionary[u"%s" % el.get("key")] = u"%s" % el.text
return dictionary
elif any(s in element.tag for s in entity_list):
s_list = []
if element is not None:
s_list += [xml.xml_to_json(sh) for sh in list(element)]
return s_list
else:
return xml.xml_to_json(element)
return body
def is_absolute_limit(self, resp, resp_body): # from RestClientXML
if (not isinstance(resp_body, collections.Mapping) or
'retry-after' not in resp):
return True
return 'exceed' in resp_body.get('message', 'blabla')
def create_share(self, share_protocol=None,
size=1, name=None, snapshot_id=None,
description="tempest created share",
metadata={}):
if name is None:
name = rand_name("tempest-created-share-")
if share_protocol is None:
share_protocol = self.share_protocol
share = xml.Element("share", xmlns=xml.XMLNS_11)
share.append(xml.Element("share_proto", share_protocol))
if description is not None:
share.append(xml.Element("description", description))
if snapshot_id is not None:
share.append(xml.Element("snapshot_id", snapshot_id))
share.append(xml.Element("name", name))
share.append(xml.Element("size", size))
metadata_el = xml.Element("metadata")
for key, value in metadata.iteritems():
metadata_el.append(xml.Element(key, value))
share.append(metadata_el)
resp, body = self.post('shares', str(xml.Document(share)),
self.headers)
return resp, self._parse_resp(body)
def create_access_rule(self, share_id, access_type="ip",
access_to="0.0.0.0"):
rule = xml.Element("os-allow_access", xmlns=xml.XMLNS_11)
rule.append(xml.Element("access_type", access_type))
rule.append(xml.Element("access_to", access_to))
uri = "shares/%s/action" % (share_id)
resp, body = self.post(uri, str(xml.Document(rule)), self.headers)
return resp, self._parse_resp(body)
def list_access_rules(self, share_id):
uri = "shares/%s/action" % (share_id)
access_list = xml.Element("os-access_list",
xmlns=xml.XMLNS_11,
value=None)
resp, body = self.post(uri, str(xml.Document(access_list)),
self.headers)
return resp, self._parse_resp(body)
def delete_access_rule(self, share_id, rule_id):
rule = xml.Element("os-deny_access", xmlns=xml.XMLNS_11)
rule.append(xml.Element("access_id", rule_id))
uri = "shares/%s/action" % share_id
return self.post(uri, str(xml.Document(rule)), self.headers)
def create_snapshot(self, share_id, name=None,
description="tempest created share-ss", force=False):
if name is None:
name = rand_name("tempest-created-share-snap-")
snap = xml.Element("snapshot", xmlns=xml.XMLNS_11)
snap.append(xml.Element("name", name))
snap.append(xml.Element("force", force))
snap.append(xml.Element("description", description))
snap.append(xml.Element("share_id", share_id))
resp, body = self.post('snapshots', str(xml.Document(snap)),
self.headers)
return resp, self._parse_resp(body)
def update_quotas(self, tenant_id=None, user_id=None,
shares=None, snapshots=None, gigabytes=None,
force=True):
uri = "os-quota-sets/%s" % tenant_id
if user_id is not None:
uri += "?user_id=%s" % user_id
upd = xml.Element("quota_set", id=tenant_id)
if force:
upd.append(xml.Element("force", "true"))
if shares is not None:
upd.append(xml.Element("shares", shares))
if snapshots is not None:
upd.append(xml.Element("snapshots", snapshots))
if gigabytes is not None:
upd.append(xml.Element("gigabytes", gigabytes))
resp, body = self.put(uri, str(xml.Document(upd)), self.headers)
return resp, self._parse_resp(body)
def get_limits(self):
resp, element = self.get("limits", self.headers)
element = etree.fromstring(element)
limits = {"rate": [], "absolute": {}}
for abs_el in element.getchildren():
if "absolute" in abs_el.tag:
element = abs_el
break
for child in element.getchildren():
limit = {}
for key, value in child.attrib.iteritems():
limit[key] = value
limits["absolute"][limit["name"]] = limit["value"]
return resp, limits
def rename(self, share_id, name, desc=None):
uri = "shares/%s" % share_id
share = xml.Element("share", xmlns=xml.XMLNS_11)
share.append(xml.Element("display_name", name))
if desc is not None:
share.append(xml.Element("display_description", desc))
resp, body = self.put(uri, str(xml.Document(share)), self.headers)
return resp, self._parse_resp(body)
def rename_snapshot(self, snapshot_id, name, desc=None):
uri = "snapshots/%s" % snapshot_id
snap = xml.Element("snapshot", xmlns=xml.XMLNS_11)
snap.append(xml.Element("display_name", name))
if desc is not None:
snap.append(xml.Element("display_description", desc))
resp, body = self.put(uri, str(xml.Document(snap)), self.headers)
return resp, self._parse_resp(body)
def reset_state(self, s_id, status="error", s_type="shares"):
"""
Resets the state of a share or a snapshot
status: available, error, creating, deleting, error_deleting
s_type: shares, snapshots
"""
uri = "%s/%s/action" % (s_type, s_id)
body = xml.Element("os-reset_status", xmlns=xml.XMLNS_11)
body.append(xml.Element("status", status))
resp, body = self.post(uri, str(xml.Document(body)), self.headers)
return resp, self._parse_resp(body)
def _update_metadata(self, share_id, metadata={}, method="post"):
uri = "shares/%s/metadata" % (str(share_id))
metadata_el = xml.Element("metadata")
for key, value in metadata.iteritems():
metadata_el.append(xml.Element("meta", value, key=key))
meta_str = str(xml.Document(metadata_el))
if method is "post":
resp, body = self.post(uri, meta_str, self.headers)
elif method is "put":
resp, body = self.put(uri, meta_str, self.headers)
metas = self._parse_resp(body)
return resp, metas