Merge "Increase unit test coverage."

This commit is contained in:
Jenkins 2016-09-19 23:32:03 +00:00 committed by Gerrit Code Review
commit 0a39637709
8 changed files with 584 additions and 2 deletions

View File

@ -232,6 +232,14 @@ class ControllerTest(object):
return self._data_request(path, data, content_type, method='PUT',
params=params, user=user, tenant=tenant)
def _patch(self, path, data,
content_type='application/murano-packages-json-patch',
params=None, user=DEFAULT_USER, tenant=DEFAULT_TENANT):
if params is None:
params = {}
return self._data_request(path, data, content_type, method='PATCH',
params=params, user=user, tenant=tenant)
def _set_policy_rules(self, rules):
policy.set_rules(rules, default_rule='default')

View File

@ -0,0 +1,45 @@
# Copyright 2016 AT&T Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from murano.api.middleware import context
from murano.tests.unit import base
from oslo_config import cfg
CONF = cfg.CONF
class MiddlewareContextTest(base.MuranoTestCase):
def test_middleware_context_default(self):
middleware = context.ContextMiddleware(None)
request_headers = {
'X-Roles': 'admin',
'X-User-Id': "",
'X-Tenant-Id': "",
'X-Configuration-Session': "",
}
request = webob.Request.blank('/environments',
headers=request_headers)
self.assertFalse(hasattr(request, 'context'))
middleware.process_request(request)
self.assertTrue(hasattr(request, 'context'))
def test_factory_returns_filter(self):
middleware = context.ContextMiddleware(None)
result = middleware.factory(CONF)
self.assertNotEqual(None, result)

View File

@ -0,0 +1,78 @@
# Copyright 2016 AT&T Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import sys
import unittest
import webob
from keystoneauth1 import exceptions
from murano.api.middleware import ext_context
from murano.tests.unit import base
from oslo_config import cfg
from oslo_serialization import base64
CONF = cfg.CONF
class MiddlewareExtContextTest(base.MuranoTestCase):
@unittest.skipIf(sys.version_info > (2, 7),
'Skip until bug/1625219 resolved')
def test_middleware_ext_context_default(self):
middleware = ext_context.ExternalContextMiddleware(None)
middleware.get_keystone_token = mock.MagicMock(return_value="token?")
auth = 'Basic {}'.format(base64.encode_as_text(b'test:test'))
request_headers = {
'Authorization': auth,
}
request = webob.Request.blank('/environments',
headers=request_headers)
middleware.process_request(request)
self.assertEqual(request.headers.get('X-Auth-Token'), "token?")
@unittest.skipIf(sys.version_info > (2, 7),
'Skip until bug/1625219 resolved')
def test_middleware_ext_context_except_key_error(self):
middleware = ext_context.ExternalContextMiddleware(None)
middleware.get_keystone_token = mock.MagicMock(
side_effect=KeyError('test key error')
)
auth = 'Basic {}'.format(base64.encode_as_text(b'test:test'))
request_headers = {
'Authorization': auth,
}
request = webob.Request.blank('/environments',
headers=request_headers)
self.assertRaises(webob.exc.HTTPUnauthorized,
middleware.process_request, request)
@unittest.skipIf(sys.version_info > (2, 7),
'Skip until bug/1625219 resolved')
def test_middleware_ext_context_except_unauthorized(self):
middleware = ext_context.ExternalContextMiddleware(None)
middleware.get_keystone_token = mock.MagicMock(
side_effect=exceptions.Unauthorized('')
)
auth = 'Basic {}'.format(base64.encode_as_text(b'test:test'))
request_headers = {
'Authorization': auth,
}
request = webob.Request.blank('/environments',
headers=request_headers)
self.assertRaises(webob.exc.HTTPUnauthorized,
middleware.process_request, request)

View File

@ -0,0 +1,104 @@
# Copyright 2016 AT&T Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from murano.api.middleware import fault
from murano.common import wsgi
from murano.packages import exceptions
from murano.tests.unit import base
from oslo_config import cfg
from oslo_serialization import jsonutils
from webob import exc
CONF = cfg.CONF
class FaultWrapperTest(base.MuranoTestCase):
@mock.patch('traceback.format_exc')
def test_error_500(self, mock_trace):
mock_trace.return_value = "test trace"
fault_wrapper = fault.FaultWrapper(None)
result = fault_wrapper._error(exc.HTTPInternalServerError())
self.assertEqual(result['code'], 500)
self.assertEqual(result['explanation'],
'The server has either erred or is incapable'
' of performing the requested operation.')
@mock.patch('traceback.format_exc')
def test_error_value_error(self, mock_trace):
mock_trace.return_value = "test trace"
fault_wrapper = fault.FaultWrapper(None)
exception = exceptions.PackageClassLoadError("test")
exception.message = "Unable to load class 'test' from package"
result = fault_wrapper._error(exception)
self.assertEqual(result['code'], 400)
self.assertEqual(result['error']['message'],
"Unable to load class 'test' from package")
@mock.patch('traceback.format_exc')
def test_fault_wrapper(self, mock_trace):
mock_trace.return_value = "test trace"
fault_wrapper = fault.FaultWrapper(None)
exception_disguise = fault.HTTPExceptionDisguise(
exc.HTTPInternalServerError())
result = fault_wrapper._error(exception_disguise)
self.assertEqual(result['code'], 500)
self.assertEqual(result['explanation'],
'The server has either erred or is incapable'
' of performing the requested operation.')
def test_process_request(self):
fault_wrapper = fault.FaultWrapper(None)
environ = {
'SERVER_NAME': 'server.test',
'SERVER_PORT': '8082',
'SERVER_PROTOCOL': 'http',
'SCRIPT_NAME': '/',
'PATH_INFO': '/asdf/asdf/asdf/asdf',
'wsgi.url_scheme': 'http',
'QUERY_STRING': '',
'CONTENT_TYPE': 'application/json',
'REQUEST_METHOD': 'HEAD'
}
req = wsgi.Request(environ)
req.get_response = mock.MagicMock(side_effect=exc.
HTTPInternalServerError())
self.assertRaises(exc.HTTPInternalServerError,
fault_wrapper.process_request, req)
@mock.patch('traceback.format_exc')
def test_fault_call(self, mock_trace):
mock_trace.return_value = "test trace"
fault_wrapper = fault.FaultWrapper(None)
exception = exceptions.PackageClassLoadError("test")
exception.message = "Unable to load class 'test' from package"
test_fault = fault.Fault(fault_wrapper._error(exception))
environ = {
'SERVER_NAME': 'server.test',
'SERVER_PORT': '8082',
'SERVER_PROTOCOL': 'http',
'SCRIPT_NAME': '/',
'PATH_INFO': '/',
'wsgi.url_scheme': 'http',
'QUERY_STRING': '',
'CONTENT_TYPE': 'application/json',
'REQUEST_METHOD': 'HEAD'
}
req = wsgi.Request(environ)
response = jsonutils.loads(test_fault(req).body)
self.assertEqual(response['code'], 400)

View File

@ -0,0 +1,35 @@
# Copyright 2016 AT&T Corp
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from murano.api import versions
from murano.api.middleware import version_negotiation
from murano.tests.unit import base
from oslo_config import cfg
CONF = cfg.CONF
class MiddlewareVersionNegotiationTest(base.MuranoTestCase):
def test_middleware_version_negotiation_default(self):
middleware_vn = version_negotiation.VersionNegotiationFilter(None)
request = webob.Request.blank('/environments')
result = middleware_vn.process_request(request)
self.assertTrue(isinstance(result, versions.Controller))

View File

@ -1,5 +1,5 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -24,6 +24,7 @@ from oslo_utils import timeutils
import six
from six.moves import cStringIO
from six.moves import range
from webob import exc
from murano.api.v1 import catalog
from murano.db.catalog import api as db_catalog_api
@ -538,6 +539,109 @@ class TestCatalogApi(test_base.ControllerTest, test_base.MuranoApiTestCase):
}
return pkg, package
def test_modify_package_tags(self):
self._set_policy_rules(
{'modify_package': '',
'manage_public_package': '',
'publicize_package': ''}
)
saved_package = self._add_pkg('test_tenant')
saved_package_pub = self._add_pkg('test_tenant', public=True)
self.expect_policy_check('modify_package',
{'package_id': saved_package.id})
url = '/v1/catalog/packages/' + str(saved_package.id)
data = []
data.append({'op': 'add', 'path': ['tags'], 'value': ["foo", "bar"]})
req = self._patch(url, jsonutils.dump_as_bytes(data))
result = self.controller.update(req, data, saved_package.id)
self.assertIn('foo', result['tags'])
self.assertIn('bar', result['tags'])
self.expect_policy_check('modify_package',
{'package_id': saved_package_pub.id})
self.expect_policy_check('manage_public_package', {})
url = '/v1/catalog/packages/' + str(saved_package_pub.id)
data = []
data.append({'op': 'add', 'path': ['tags'], 'value': ["foo", "bar"]})
req = self._patch(url, jsonutils.dump_as_bytes(data))
result = self.controller.update(req, data, saved_package_pub.id)
self.assertIn('foo', result['tags'])
self.assertIn('bar', result['tags'])
def test_modify_package_name(self):
self._set_policy_rules(
{'modify_package': ''}
)
saved_package = self._add_pkg('test_tenant')
self.expect_policy_check('modify_package',
{'package_id': saved_package.id})
url = '/v1/catalog/packages/' + str(saved_package.id)
data = []
data.append({'op': 'replace', 'path': ['name'], 'value': 'test_name'})
req = self._patch(url, jsonutils.dump_as_bytes(data))
result = self.controller.update(req, data, saved_package.id)
self.assertEqual('test_name', result['name'])
self.expect_policy_check('modify_package',
{'package_id': saved_package.id})
data = []
data.append({'op': 'replace', 'path': ['name'], 'value': 'a'*81})
req = self._patch(url, jsonutils.dump_as_bytes(data))
self.assertRaises(exc.HTTPBadRequest, self.controller.update,
req, data, saved_package.id)
def test_modify_package_no_body(self):
self._set_policy_rules(
{'modify_package': ''}
)
saved_package = self._add_pkg('test_tenant')
self.expect_policy_check('modify_package',
{'package_id': saved_package.id})
url = '/v1/catalog/packages/' + str(saved_package.id)
req = self._patch(url, jsonutils.dump_as_bytes(None))
self.assertRaises(exc.HTTPBadRequest, self.controller.update,
req, None, saved_package.id)
def test_modify_package_is_public(self):
self._set_policy_rules(
{'modify_package': '',
'manage_public_package': '',
'publicize_package': ''}
)
saved_package = self._add_pkg('test_tenant')
url = '/v1/catalog/packages/' + str(saved_package.id)
self.expect_policy_check('modify_package',
{'package_id': saved_package.id})
self.expect_policy_check('publicize_package', {})
data = []
data.append({'op': 'replace', 'path': ['is_public'], 'value': True})
req = self._patch(url, jsonutils.dump_as_bytes(data))
result = self.controller.update(req, data, saved_package.id)
self.assertTrue(result['is_public'])
def test_modify_package_bad_content_type(self):
self._set_policy_rules(
{'modify_package': '',
'manage_public_package': '',
'publicize_package': ''}
)
saved_package = self._add_pkg('test_tenant')
url = '/v1/catalog/packages/' + str(saved_package.id)
self.expect_policy_check('modify_package',
{'package_id': saved_package.id})
self.expect_policy_check('publicize_package', {})
data = []
data.append({'op': 'replace', 'path': ['is_public'], 'value': True})
req = self._patch(url, jsonutils.dump_as_bytes(data))
result = self.controller.update(req, data, saved_package.id)
self.assertTrue(result['is_public'])
def test_not_valid_logo(self):
self.assertRaises(exceptions.PackageLoadError,
self._test_package, 'manifest_with_broken_logo.yaml')

View File

@ -1,4 +1,4 @@
# coding: utf-8
#
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -19,6 +19,7 @@ from oslo_serialization import jsonutils
from oslo_utils import timeutils
from murano.api.v1 import environments
from murano.api.v1 import sessions
from murano.db import models
from murano.services import states
import murano.tests.unit.api.base as tb
@ -29,6 +30,7 @@ class TestEnvironmentApi(tb.ControllerTest, tb.MuranoApiTestCase):
def setUp(self):
super(TestEnvironmentApi, self).setUp()
self.controller = environments.Controller()
self.sessions_controller = sessions.Controller()
self.fixture = self.useFixture(config_fixture.Config())
self.fixture.conf(args=[])
@ -195,6 +197,29 @@ class TestEnvironmentApi(tb.ControllerTest, tb.MuranoApiTestCase):
'is either malformed or otherwise incorrect.',
result_msg)
def test_create_environment_duplicate_name(self):
"""Check that duplicate names results in HTTPConflict"""
self._set_policy_rules(
{'list_environments': '@',
'create_environment': '@',
'show_environment': '@'}
)
self.expect_policy_check('create_environment')
body = {'name': u'my_env_dup'}
req = self._post('/environments', jsonutils.dump_as_bytes(body))
result = req.get_response(self.api)
self.assertEqual(200, result.status_code)
self.expect_policy_check('create_environment')
body = {'name': u'my_env_dup'}
req = self._post('/environments', jsonutils.dump_as_bytes(body))
result = req.get_response(self.api)
self.assertEqual(409, result.status_code)
result_msg = result.text.replace('\n', '')
self.assertIn('Environment with specified name already exists',
result_msg)
def test_missing_environment(self):
"""Check that a missing environment results in an HTTPNotFound.
@ -475,3 +500,87 @@ class TestEnvironmentApi(tb.ControllerTest, tb.MuranoApiTestCase):
result = req.get_response(self.api)
return result
def test_last_status_session(self):
CREDENTIALS = {'tenant': 'test_tenant_1', 'user': 'test_user_1'}
self._set_policy_rules({'create_environment': '@'})
self.expect_policy_check('create_environment')
# Create environment
request = self._post(
'/environments',
jsonutils.dump_as_bytes({'name': 'test_environment_1'}),
**CREDENTIALS
)
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertEqual(CREDENTIALS['tenant'],
response_body['tenant_id'])
ENVIRONMENT_ID = response_body['id']
# Create session
request = self._post(
'/environments/{environment_id}/configure'
.format(environment_id=ENVIRONMENT_ID),
b'',
**CREDENTIALS
)
response_body = jsonutils.loads(request.get_response(self.api).body)
SESSION_ID = response_body['id']
# Test getting last status doesn't error
request = self._get(
'/environments/{environment_id}/lastStatus'
.format(environment_id=ENVIRONMENT_ID),
**CREDENTIALS
)
request.headers['X-Configuration-Session'] = str(SESSION_ID)
request.context.session = SESSION_ID
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertIsNot(response_body, None)
def test_show_environments_session(self):
CREDENTIALS = {'tenant': 'test_tenant_1', 'user': 'test_user_1'}
self._set_policy_rules(
{'create_environment': '@',
'list_environments': '@',
'show_environment': '@'}
)
self.expect_policy_check('create_environment')
# Create environment
request = self._post(
'/environments',
jsonutils.dump_as_bytes({'name': 'test_environment_1'}),
**CREDENTIALS
)
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertEqual(CREDENTIALS['tenant'],
response_body['tenant_id'])
ENVIRONMENT_ID = response_body['id']
# Create session
self.expect_policy_check(
'show_environment', {'environment_id': ENVIRONMENT_ID})
request = self._post(
'/environments/{environment_id}/configure'
.format(environment_id=ENVIRONMENT_ID),
b'',
**CREDENTIALS
)
response_body = jsonutils.loads(request.get_response(self.api).body)
SESSION_ID = response_body['id']
# Show the environment and test that it is correct.
request = self._get(
'/environments/{environment_id}'
.format(environment_id=ENVIRONMENT_ID),
**CREDENTIALS
)
request.headers['X-Configuration-Session'] = str(SESSION_ID)
request.context.session = SESSION_ID
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertEqual(ENVIRONMENT_ID, response_body['id'])

View File

@ -82,3 +82,102 @@ class TestSessionsApi(tb.ControllerTest, tb.MuranoApiTestCase):
# Should be forbidden!
self.assertEqual(403, response.status_code)
def test_session_show(self):
CREDENTIALS_1 = {'tenant': 'test_tenant_1', 'user': 'test_user_1'}
CREDENTIALS_2 = {'tenant': 'test_tenant_2', 'user': 'test_user_2'}
self._set_policy_rules(
{'create_environment': '@'}
)
self.expect_policy_check('create_environment')
# Create environment for user #1
request = self._post(
'/environments',
jsonutils.dump_as_bytes({'name': 'test_environment_1'}),
**CREDENTIALS_1
)
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertEqual(CREDENTIALS_1['tenant'],
response_body['tenant_id'])
ENVIRONMENT_ID = response_body['id']
# Create session of user #1
request = self._post(
'/environments/{environment_id}/configure'
.format(environment_id=ENVIRONMENT_ID),
b'',
**CREDENTIALS_1
)
response_body = jsonutils.loads(request.get_response(self.api).body)
SESSION_ID = response_body['id']
# Show environment with correct credentials
request = self._get(
'/environments/{environment_id}/sessions/{session_id}'
.format(environment_id=ENVIRONMENT_ID, session_id=SESSION_ID),
b'',
**CREDENTIALS_1
)
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertEqual(SESSION_ID, response_body['id'])
# Show environment with incorrect credentials
request = self._get(
'/environments/{environment_id}/sessions/{session_id}'
.format(environment_id=ENVIRONMENT_ID, session_id=SESSION_ID),
b'',
**CREDENTIALS_2
)
response = request.get_response(self.api)
self.assertEqual(403, response.status_code)
def test_session_delete(self):
CREDENTIALS = {'tenant': 'test_tenant_1', 'user': 'test_user_1'}
self._set_policy_rules(
{'create_environment': '@'}
)
self.expect_policy_check('create_environment')
# Create environment
request = self._post(
'/environments',
jsonutils.dump_as_bytes({'name': 'test_environment_1'}),
**CREDENTIALS
)
response_body = jsonutils.loads(request.get_response(self.api).body)
self.assertEqual(CREDENTIALS['tenant'],
response_body['tenant_id'])
ENVIRONMENT_ID = response_body['id']
# Create session
request = self._post(
'/environments/{environment_id}/configure'
.format(environment_id=ENVIRONMENT_ID),
b'',
**CREDENTIALS
)
response_body = jsonutils.loads(request.get_response(self.api).body)
SESSION_ID = response_body['id']
# Delete session
request = self._delete(
'/environments/{environment_id}/delete/{session_id}'
.format(environment_id=ENVIRONMENT_ID, session_id=SESSION_ID),
b'',
**CREDENTIALS
)
response = self.sessions_controller.delete(
request, ENVIRONMENT_ID, SESSION_ID)
# Make sure the session was deleted
request = self._get(
'/environments/{environment_id}/sessions/{session_id}'
.format(environment_id=ENVIRONMENT_ID, session_id=SESSION_ID),
b'',
**CREDENTIALS
)
response = request.get_response(self.api)
self.assertEqual(404, response.status_code)