Add new search options for security service
We can filter security services only by status, name, id and type now. Add new search options: user, server, dns_ip, domain to _get_security_services in SecurityServiceController to be able to filter security services by these fields. Allow filtering security services by share network id. Add information about share networks to result if 'detailed' mode enabled. Add unit and tempest tests for filtering security services. Implements bp improve-security-service-list-filtering Change-Id: I8b3845c2d705188ec1dc0db33c1e20c8e6c5e559
This commit is contained in:
parent
30f105a1f0
commit
f3289eb610
@ -0,0 +1,68 @@
|
||||
# Copyright 2014 Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from tempest.api.share import base
|
||||
from tempest.api.share import test_security_services
|
||||
from tempest import test
|
||||
|
||||
|
||||
class SecurityServiceAdminTest(
|
||||
base.BaseSharesAdminTest,
|
||||
test_security_services.SecurityServiceListMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(SecurityServiceAdminTest, self).setUp()
|
||||
ss_ldap_data = {
|
||||
'name': 'ss_ldap',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'server': 'fake_server_1',
|
||||
'domain': 'fake_domain_1',
|
||||
'user': 'fake_user',
|
||||
'password': 'pass',
|
||||
}
|
||||
ss_kerberos_data = {
|
||||
'name': 'ss_kerberos',
|
||||
'dns_ip': '2.2.2.2',
|
||||
'server': 'fake_server_2',
|
||||
'domain': 'fake_domain_2',
|
||||
'user': 'test_user',
|
||||
'password': 'word',
|
||||
}
|
||||
resp, self.ss_ldap = self.create_security_service('ldap',
|
||||
**ss_ldap_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
resp, self.ss_kerberos = self.create_security_service(
|
||||
'kerberos',
|
||||
**ss_kerberos_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
@test.attr(type=["gate", "smoke", ])
|
||||
def test_list_security_services_all_tenants(self):
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'all_tenants': 1})
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke", ])
|
||||
def test_list_security_services_invalid_filters(self):
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'fake_opt': 'some_value'})
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertEqual(0, len(listed))
|
@ -22,7 +22,110 @@ from tempest import test
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SecurityServicesTest(base.BaseSharesTest):
|
||||
class SecurityServiceListMixin(object):
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services(self):
|
||||
resp, listed = self.shares_client.list_security_services()
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
||||
# verify keys
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_with_detail(self):
|
||||
resp, listed = self.shares_client.list_security_services(detailed=True)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
||||
# verify keys
|
||||
keys = [
|
||||
"name", "id", "status", "description",
|
||||
"domain", "server", "dns_ip", "user", "password", "type",
|
||||
"created_at", "updated_at", "project_id",
|
||||
]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_filter_by_share_network(self):
|
||||
sn = self.shares_client.get_share_network(
|
||||
self.os.shares_client.share_network_id)[1]
|
||||
fresh_sn = []
|
||||
for i in range(2):
|
||||
resp, sn = self.create_share_network(
|
||||
neutron_net_id=sn["neutron_net_id"],
|
||||
neutron_subnet_id=sn["neutron_subnet_id"])
|
||||
fresh_sn.append(sn)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
resp, body = self.shares_client.add_sec_service_to_share_network(
|
||||
fresh_sn[0]["id"], self.ss_ldap["id"])
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
resp, body = self.shares_client.add_sec_service_to_share_network(
|
||||
fresh_sn[1]["id"], self.ss_kerberos["id"])
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'share_network_id': fresh_sn[0]['id']})
|
||||
self.assertEqual(1, len(listed))
|
||||
self.assertEqual(self.ss_ldap['id'], listed[0]['id'])
|
||||
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_filter_by_ss_attributes(self):
|
||||
search_opts = {
|
||||
'status': 'NEW',
|
||||
'name': 'ss_ldap',
|
||||
'type': 'ldap',
|
||||
'user': 'fake_user',
|
||||
'server': 'fake_server_1',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'domain': 'fake_domain_1',
|
||||
}
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params=search_opts)
|
||||
self.assertEqual(1, len(listed))
|
||||
self.assertEqual(self.ss_ldap['id'], listed[0]['id'])
|
||||
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
|
||||
class SecurityServicesTest(base.BaseSharesTest,
|
||||
SecurityServiceListMixin):
|
||||
def setUp(self):
|
||||
super(SecurityServicesTest, self).setUp()
|
||||
ss_ldap_data = {
|
||||
'name': 'ss_ldap',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'server': 'fake_server_1',
|
||||
'domain': 'fake_domain_1',
|
||||
'user': 'fake_user',
|
||||
'password': 'pass',
|
||||
}
|
||||
ss_kerberos_data = {
|
||||
'name': 'ss_kerberos',
|
||||
'dns_ip': '2.2.2.2',
|
||||
'server': 'fake_server_2',
|
||||
'domain': 'fake_domain_2',
|
||||
'user': 'test_user',
|
||||
'password': 'word',
|
||||
}
|
||||
resp, self.ss_ldap = self.create_security_service('ldap',
|
||||
**ss_ldap_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
resp, self.ss_kerberos = self.create_security_service(
|
||||
'kerberos',
|
||||
**ss_kerberos_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_create_delete_security_service(self):
|
||||
@ -105,35 +208,10 @@ class SecurityServicesTest(base.BaseSharesTest):
|
||||
self.assertDictContainsSubset(update_data, updated)
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services(self):
|
||||
data = self.generate_security_service_data()
|
||||
resp, ss = self.create_security_service(**data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertDictContainsSubset(data, ss)
|
||||
|
||||
resp, listed = self.shares_client.list_security_services()
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
any(ss["id"] in ss["id"] for ss in listed)
|
||||
|
||||
# verify keys
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_with_detail(self):
|
||||
data = self.generate_security_service_data()
|
||||
resp, ss = self.create_security_service(**data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertDictContainsSubset(data, ss)
|
||||
|
||||
resp, listed = self.shares_client.list_security_services_with_detail()
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
any(ss["id"] in ss["id"] for ss in listed)
|
||||
|
||||
# verify keys
|
||||
keys = [
|
||||
"name", "id", "status", "description",
|
||||
"domain", "server", "dns_ip", "user", "password", "type",
|
||||
"created_at", "updated_at", "project_id",
|
||||
]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
def test_list_security_services_filter_by_invalid_opt(self):
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'fake_opt': 'some_value'})
|
||||
self.assertIn(int(resp['status']), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
@ -120,3 +120,9 @@ class SecurityServicesNegativeTest(base.BaseSharesTest):
|
||||
self.assertRaises(exceptions.NotFound,
|
||||
self.shares_client.get_security_service,
|
||||
ss["id"])
|
||||
|
||||
@test.attr(type=["gate", "smoke", "negative"])
|
||||
def test_try_list_security_services_all_tenants(self):
|
||||
self.assertRaises(exceptions.Unauthorized,
|
||||
self.shares_client.list_security_services,
|
||||
params={'all_tenants': 1})
|
||||
|
@ -436,13 +436,10 @@ class SharesClient(rest_client.RestClient):
|
||||
resp, body = self.get("security-services/%s" % ss_id)
|
||||
return resp, self._parse_resp(body)
|
||||
|
||||
def list_security_services(self):
|
||||
resp, body = self.get("security-services")
|
||||
return resp, self._parse_resp(body)
|
||||
|
||||
def list_security_services_with_detail(self, params=None):
|
||||
"""List the details of all shares."""
|
||||
uri = "security-services/detail"
|
||||
def list_security_services(self, detailed=False, params=None):
|
||||
uri = "security-services"
|
||||
if detailed:
|
||||
uri += '/detail'
|
||||
if params:
|
||||
uri += "?%s" % urllib.urlencode(params)
|
||||
resp, body = self.get(uri)
|
||||
|
@ -45,6 +45,7 @@
|
||||
"security_service:show": [["rule:default"]],
|
||||
"security_service:index": [["rule:default"]],
|
||||
"security_service:detail": [["rule:default"]],
|
||||
"security_service:get_all_security_services": [["rule:admin_api"]],
|
||||
|
||||
"share_server:index": [["rule:admin_api"]],
|
||||
"share_server:show": [["rule:admin_api"]],
|
||||
|
@ -103,11 +103,15 @@ class SecurityServiceController(wsgi.Controller):
|
||||
@wsgi.serializers(xml=SecurityServicesTemplate)
|
||||
def index(self, req):
|
||||
"""Returns a summary list of security services."""
|
||||
policy.check_policy(req.environ['manila.context'], RESOURCE_NAME,
|
||||
'index')
|
||||
return self._get_security_services(req, is_detail=False)
|
||||
|
||||
@wsgi.serializers(xml=SecurityServicesTemplate)
|
||||
def detail(self, req):
|
||||
"""Returns a detailed list of security services."""
|
||||
policy.check_policy(req.environ['manila.context'], RESOURCE_NAME,
|
||||
'detail')
|
||||
return self._get_security_services(req, is_detail=True)
|
||||
|
||||
def _get_security_services(self, req, is_detail):
|
||||
@ -116,8 +120,6 @@ class SecurityServiceController(wsgi.Controller):
|
||||
The list gets transformed through view builder.
|
||||
"""
|
||||
context = req.environ['manila.context']
|
||||
policy.check_policy(context, RESOURCE_NAME,
|
||||
'get_all_security_services')
|
||||
|
||||
search_opts = {}
|
||||
search_opts.update(req.GET)
|
||||
@ -126,26 +128,27 @@ class SecurityServiceController(wsgi.Controller):
|
||||
share_nw = db.share_network_get(context,
|
||||
search_opts['share_network_id'])
|
||||
security_services = share_nw['security_services']
|
||||
del search_opts['share_network_id']
|
||||
else:
|
||||
if 'all_tenants' in search_opts:
|
||||
policy.check_policy(context, RESOURCE_NAME,
|
||||
'get_all_security_services')
|
||||
security_services = db.security_service_get_all(context)
|
||||
else:
|
||||
security_services = db.security_service_get_all_by_project(
|
||||
context, context.project_id)
|
||||
search_opts.pop('all_tenants', None)
|
||||
common.remove_invalid_options(
|
||||
context,
|
||||
search_opts,
|
||||
self._get_security_services_search_options())
|
||||
if 'all_tenants' in search_opts:
|
||||
security_services = db.security_service_get_all(context)
|
||||
del search_opts['all_tenants']
|
||||
else:
|
||||
security_services = db.security_service_get_all_by_project(
|
||||
context, context.project_id)
|
||||
if search_opts:
|
||||
results = []
|
||||
not_found = object()
|
||||
for service in security_services:
|
||||
for opt, value in six.iteritems(search_opts):
|
||||
if service.get(opt, not_found) != value:
|
||||
break
|
||||
else:
|
||||
results.append(service)
|
||||
for ss in security_services:
|
||||
if all(ss.get(opt, not_found) == value for opt, value in
|
||||
six.iteritems(search_opts)):
|
||||
results.append(ss)
|
||||
security_services = results
|
||||
|
||||
limited_list = common.limited(security_services, req)
|
||||
@ -153,13 +156,19 @@ class SecurityServiceController(wsgi.Controller):
|
||||
if is_detail:
|
||||
security_services = self._view_builder.detail_list(
|
||||
req, limited_list)
|
||||
for ss in security_services['security_services']:
|
||||
share_networks = db.share_network_get_all_by_security_service(
|
||||
context,
|
||||
ss['id'])
|
||||
ss['share_networks'] = [sn['id'] for sn in share_networks]
|
||||
else:
|
||||
security_services = self._view_builder.summary_list(
|
||||
req, limited_list)
|
||||
return security_services
|
||||
|
||||
def _get_security_services_search_options(self):
|
||||
return ('status', 'name', 'id', 'type', )
|
||||
return ('status', 'name', 'id', 'type', 'user',
|
||||
'server', 'dns_ip', 'domain', )
|
||||
|
||||
def _share_servers_dependent_on_sn_exist(self, context,
|
||||
security_service_id):
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from six.moves.urllib import parse
|
||||
import webob
|
||||
|
||||
from manila.api.v1 import security_service
|
||||
@ -30,7 +31,7 @@ class ShareApiTest(test.TestCase):
|
||||
super(ShareApiTest, self).setUp()
|
||||
self.controller = security_service.SecurityServiceController()
|
||||
self.maxDiff = None
|
||||
self.security_service = {
|
||||
self.ss_active_directory = {
|
||||
"created_at": "fake-time",
|
||||
"updated_at": "fake-time-2",
|
||||
"id": 1,
|
||||
@ -45,15 +46,55 @@ class ShareApiTest(test.TestCase):
|
||||
"status": "new",
|
||||
"project_id": "fake",
|
||||
}
|
||||
security_service.policy.check_policy = mock.Mock()
|
||||
self.ss_ldap = {
|
||||
"created_at": "fake-time",
|
||||
"updated_at": "fake-time-2",
|
||||
"id": 2,
|
||||
"name": "ss-ldap",
|
||||
"description": "Fake Security Service Desc",
|
||||
"type": constants.SECURITY_SERVICES_ALLOWED_TYPES[1],
|
||||
"dns_ip": "2.2.2.2",
|
||||
"server": "test-server",
|
||||
"domain": "test-domain",
|
||||
"user": "test-user",
|
||||
"password": "test-password",
|
||||
"status": "active",
|
||||
"project_id": "fake",
|
||||
}
|
||||
self.valid_search_opts = {
|
||||
'user': 'fake-user',
|
||||
'server': 'fake-server',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'domain': 'fake-domain',
|
||||
'status': 'new',
|
||||
'type': constants.SECURITY_SERVICES_ALLOWED_TYPES[0],
|
||||
}
|
||||
self.check_policy_patcher = mock.patch(
|
||||
'manila.api.v1.security_service.policy.check_policy')
|
||||
self.check_policy_patcher.start()
|
||||
self.addCleanup(self._stop_started_patcher, self.check_policy_patcher)
|
||||
self.security_service_list_expected_resp = {
|
||||
'security_services': [{
|
||||
'id': self.ss_active_directory['id'],
|
||||
'name': self.ss_active_directory['name'],
|
||||
'type': self.ss_active_directory['type'],
|
||||
'status': self.ss_active_directory['status']
|
||||
}, ]
|
||||
}
|
||||
|
||||
def _stop_started_patcher(self, patcher):
|
||||
if hasattr(patcher, 'is_local'):
|
||||
patcher.stop()
|
||||
|
||||
def test_security_service_show(self):
|
||||
db.security_service_get = mock.Mock(return_value=self.security_service)
|
||||
db.security_service_get = mock.Mock(
|
||||
return_value=self.ss_active_directory)
|
||||
req = fakes.HTTPRequest.blank('/security-services/1')
|
||||
res_dict = self.controller.show(req, '1')
|
||||
expected = self.security_service.copy()
|
||||
expected = self.ss_active_directory.copy()
|
||||
expected.update()
|
||||
self.assertEqual(res_dict, {'security_service': self.security_service})
|
||||
self.assertEqual(res_dict,
|
||||
{'security_service': self.ss_active_directory})
|
||||
|
||||
def test_security_service_show_not_found(self):
|
||||
db.security_service_get = mock.Mock(side_effect=exception.NotFound)
|
||||
@ -63,7 +104,7 @@ class ShareApiTest(test.TestCase):
|
||||
req, '1')
|
||||
|
||||
def test_security_service_create(self):
|
||||
sec_service = self.security_service.copy()
|
||||
sec_service = self.ss_active_directory.copy()
|
||||
create_stub = mock.Mock(
|
||||
return_value=sec_service)
|
||||
self.stubs.Set(db, 'security_service_create', create_stub)
|
||||
@ -71,11 +112,11 @@ class ShareApiTest(test.TestCase):
|
||||
req = fakes.HTTPRequest.blank('/security-services')
|
||||
res_dict = self.controller.create(
|
||||
req, {"security_service": sec_service})
|
||||
expected = self.security_service.copy()
|
||||
expected = self.ss_active_directory.copy()
|
||||
self.assertEqual(res_dict, {'security_service': expected})
|
||||
|
||||
def test_security_service_create_invalid_types(self):
|
||||
sec_service = self.security_service.copy()
|
||||
sec_service = self.ss_active_directory.copy()
|
||||
sec_service['type'] = 'invalid'
|
||||
req = fakes.HTTPRequest.blank('/security-services')
|
||||
self.assertRaises(exception.InvalidInput, self.controller.create, req,
|
||||
@ -117,8 +158,8 @@ class ShareApiTest(test.TestCase):
|
||||
req, 1)
|
||||
|
||||
def test_security_service_update_name(self):
|
||||
new = self.security_service.copy()
|
||||
updated = self.security_service.copy()
|
||||
new = self.ss_active_directory.copy()
|
||||
updated = self.ss_active_directory.copy()
|
||||
updated['name'] = 'new'
|
||||
db.security_service_get = mock.Mock(return_value=new)
|
||||
db.security_service_update = mock.Mock(return_value=updated)
|
||||
@ -135,8 +176,8 @@ class ShareApiTest(test.TestCase):
|
||||
req.environ['manila.context'], 1)
|
||||
|
||||
def test_security_service_update_description(self):
|
||||
new = self.security_service.copy()
|
||||
updated = self.security_service.copy()
|
||||
new = self.ss_active_directory.copy()
|
||||
updated = self.ss_active_directory.copy()
|
||||
updated['description'] = 'new'
|
||||
db.security_service_get = mock.Mock(return_value=new)
|
||||
db.security_service_update = mock.Mock(return_value=updated)
|
||||
@ -159,7 +200,7 @@ class ShareApiTest(test.TestCase):
|
||||
db.share_network_get_all_by_security_service.return_value = [
|
||||
{'id': 'fake_id', 'share_servers': 'fake_share_servers'},
|
||||
]
|
||||
security_service = self.security_service.copy()
|
||||
security_service = self.ss_active_directory.copy()
|
||||
db.security_service_get.return_value = security_service
|
||||
body = {'security_service': {'user_id': 'new_user'}}
|
||||
req = fakes.HTTPRequest.blank('/security_services/1')
|
||||
@ -178,8 +219,8 @@ class ShareApiTest(test.TestCase):
|
||||
db.share_network_get_all_by_security_service.return_value = [
|
||||
{'id': 'fake_id', 'share_servers': 'fake_share_servers'},
|
||||
]
|
||||
old = self.security_service.copy()
|
||||
updated = self.security_service.copy()
|
||||
old = self.ss_active_directory.copy()
|
||||
updated = self.ss_active_directory.copy()
|
||||
updated['name'] = 'new name'
|
||||
updated['description'] = 'new description'
|
||||
db.security_service_get.return_value = old
|
||||
@ -203,14 +244,101 @@ class ShareApiTest(test.TestCase):
|
||||
|
||||
def test_security_service_list(self):
|
||||
db.security_service_get_all_by_project = mock.Mock(
|
||||
return_value=[self.security_service.copy()])
|
||||
return_value=[self.ss_active_directory.copy()])
|
||||
req = fakes.HTTPRequest.blank('/security_services')
|
||||
res_dict = self.controller.index(req)
|
||||
expected = {'security_services': [
|
||||
{'id': self.security_service['id'],
|
||||
'name': self.security_service['name'],
|
||||
'type': self.security_service['type'],
|
||||
'status': self.security_service['status']
|
||||
self.assertEqual(self.security_service_list_expected_resp, res_dict)
|
||||
|
||||
@mock.patch.object(db, 'share_network_get', mock.Mock())
|
||||
def test_security_service_list_filter_by_sn(self):
|
||||
sn = {
|
||||
'id': 'fake_sn_id',
|
||||
'security_services': [self.ss_active_directory, ],
|
||||
}
|
||||
]}
|
||||
self.assertEqual(res_dict, expected)
|
||||
db.share_network_get.return_value = sn
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?share_network_id=fake_sn_id')
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp, res_dict)
|
||||
db.share_network_get.assert_called_once_with(
|
||||
req.environ['manila.context'],
|
||||
sn['id'])
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all', mock.Mock())
|
||||
def test_security_services_list_all_tenants_admin_context(self):
|
||||
self.check_policy_patcher.stop()
|
||||
db.security_service_get_all.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?all_tenants=1&name=fake-name',
|
||||
use_admin_context=True)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp, res_dict)
|
||||
db.security_service_get_all.assert_called_once_with(
|
||||
req.environ['manila.context'])
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all', mock.Mock())
|
||||
def test_security_services_list_all_tenants_non_admin_context(self):
|
||||
self.check_policy_patcher.stop()
|
||||
db.security_service_get_all.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?all_tenants=1')
|
||||
self.assertRaises(exception.PolicyNotAuthorized, self.controller.index,
|
||||
req)
|
||||
self.assertFalse(db.security_service_get_all.called)
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock())
|
||||
def test_security_services_list_admin_context_invalid_opts(self):
|
||||
db.security_service_get_all_by_project.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?fake_opt=fake_value',
|
||||
use_admin_context=True)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual({'security_services': []}, res_dict)
|
||||
db.security_service_get_all_by_project.assert_called_once_with(
|
||||
req.environ['manila.context'],
|
||||
req.environ['manila.context'].project_id)
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock())
|
||||
def test_security_service_list_all_filter_opts_separately(self):
|
||||
db.security_service_get_all_by_project.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
for opt, val in self.valid_search_opts.items():
|
||||
for use_admin_context in [True, False]:
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?' + opt + '=' + val,
|
||||
use_admin_context=use_admin_context)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp,
|
||||
res_dict)
|
||||
db.security_service_get_all_by_project.assert_called_with(
|
||||
req.environ['manila.context'],
|
||||
req.environ['manila.context'].project_id)
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock())
|
||||
def test_security_service_list_all_filter_opts(self):
|
||||
db.security_service_get_all_by_project.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
query_string = '/security-services?' + parse.urlencode(sorted(
|
||||
[(k, v) for (k, v) in list(self.valid_search_opts.items())]))
|
||||
for use_admin_context in [True, False]:
|
||||
req = fakes.HTTPRequest.blank(query_string,
|
||||
use_admin_context=use_admin_context)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp,
|
||||
res_dict)
|
||||
db.security_service_get_all_by_project.assert_called_with(
|
||||
req.environ['manila.context'],
|
||||
req.environ['manila.context'].project_id)
|
||||
|
@ -35,5 +35,8 @@
|
||||
"share_extension:types_manage": [],
|
||||
"share_extension:types_extra_specs": [],
|
||||
|
||||
"security_service:index": [],
|
||||
"security_service:get_all_security_services": [["rule:admin_api"]],
|
||||
|
||||
"limits_extension:used_limits": []
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user