Merge "Add API unit tests"

This commit is contained in:
Jenkins
2014-06-11 11:53:10 +00:00
committed by Gerrit Code Review
6 changed files with 373 additions and 0 deletions

View File

@@ -46,6 +46,10 @@ def get_session(autocommit=True, expire_on_commit=False):
return s
def get_engine():
return _create_facade_lazily().get_engine()
def db_sync():
repo_path = os.path.abspath(os.path.dirname(migrate_repo.__file__))
try:

View File

166
murano/tests/api/base.py Normal file
View File

@@ -0,0 +1,166 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fixtures
import logging
import mock
from oslo.config import cfg
import testtools
import urllib
import webob
from murano.api.v1 import request_statistics
from murano.common import rpc
from murano.openstack.common import timeutils
from murano.openstack.common import wsgi
from murano.tests import utils
TEST_DEFAULT_LOGLEVELS = {'migrate': logging.WARN, 'sqlalchemy': logging.WARN}
def test_with_middleware(self, middleware, func, req, *args, **kwargs):
@webob.dec.wsgify
def _app(req):
return func(req, *args, **kwargs)
resp = middleware(_app).process_request(req)
return resp
class FakeLogMixin:
"""Allow logs to be tested (rather than just disabling
logging. This is taken from heat
"""
def setup_logging(self):
# Assign default logs to self.LOG so we can still
# assert on heat logs.
self.LOG = self.useFixture(
fixtures.FakeLogger(level=logging.DEBUG))
base_list = set([nlog.split('.')[0]
for nlog in logging.Logger.manager.loggerDict])
for base in base_list:
if base in TEST_DEFAULT_LOGLEVELS:
self.useFixture(fixtures.FakeLogger(
level=TEST_DEFAULT_LOGLEVELS[base],
name=base))
elif base != 'murano':
self.useFixture(fixtures.FakeLogger(
name=base))
class MuranoTestCase(testtools.TestCase, FakeLogMixin):
# Set this if common.rpc is imported into other scopes so that
# it can be mocked properly
RPC_IMPORT = 'murano.common.rpc'
def setUp(self):
super(MuranoTestCase, self).setUp()
self.setup_logging()
# Mock the RPC classes
self.mock_api_rpc = mock.Mock(rpc.ApiClient)
self.mock_engine_rpc = mock.Mock(rpc.EngineClient)
mock.patch(self.RPC_IMPORT + '.engine',
return_value=self.mock_engine_rpc).start()
mock.patch(self.RPC_IMPORT + '.api',
return_value=self.mock_api_rpc).start()
self.addCleanup(mock.patch.stopall)
self.addCleanup(cfg.CONF.reset)
utils.setup_dummy_db()
self.addCleanup(utils.reset_dummy_db)
def tearDown(self):
super(MuranoTestCase, self).tearDown()
timeutils.utcnow.override_time = None
def _stub_uuid(self, values=[]):
class FakeUUID:
def __init__(self, v):
self.hex = v
mock_uuid4 = mock.patch('uuid.uuid4').start()
mock_uuid4.side_effect = [FakeUUID(v) for v in values]
return mock_uuid4
class ControllerTest(object):
"""
Common utilities for testing API Controllers.
"""
def __init__(self, *args, **kwargs):
super(ControllerTest, self).__init__(*args, **kwargs)
#cfg.CONF.set_default('host', 'server.test')
self.api_version = '1.0'
self.tenant = 'test_tenant'
self.mock_enforce = None
request_statistics.init_stats()
def _environ(self, path):
return {
'SERVER_NAME': 'server.test',
'SERVER_PORT': 8082,
'SCRIPT_NAME': '/v1',
'PATH_INFO': path,
'wsgi.url_scheme': 'http',
}
def _simple_request(self, path, params=None, method='GET'):
"""Returns a request with a fake but valid-looking context
and sets the request environment variables. If `params` is given,
it should be a dictionary or sequence of tuples.
"""
environ = self._environ(path)
environ['REQUEST_METHOD'] = method
if params:
qs = urllib.urlencode(params)
environ['QUERY_STRING'] = qs
req = wsgi.Request(environ)
req.context = utils.dummy_context('api_test_user', self.tenant)
self.context = req.context
return req
def _get(self, path, params=None):
return self._simple_request(path, params=params)
def _delete(self, path):
return self._simple_request(path, method='DELETE')
def _data_request(self, path, data, content_type='application/json',
method='POST'):
environ = self._environ(path)
environ['REQUEST_METHOD'] = method
req = wsgi.Request(environ)
req.context = utils.dummy_context('api_test_user', self.tenant)
self.context = req.context
req.body = data
return req
def _post(self, path, data, content_type='application/json'):
return self._data_request(path, data, content_type)
def _put(self, path, data, content_type='application/json'):
return self._data_request(path, data, content_type, method='PUT')
def tearDown(self):
# TODO(sjmc7): Add policy check once it's implemented
super(ControllerTest, self).tearDown()

View File

View File

@@ -0,0 +1,157 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from webob import exc
from murano.api.v1 import environments
from murano.db import models
from murano.openstack.common import timeutils
import murano.tests.api.base as test_base
import murano.tests.utils as test_utils
class TestEnvironmentApi(test_base.ControllerTest, test_base.MuranoTestCase):
RPC_IMPORT = 'murano.db.services.environments.rpc'
def setUp(self):
super(TestEnvironmentApi, self).setUp()
self.controller = environments.Controller()
def test_list_empty_environments(self):
"""Check that with no environments an empty list is returned"""
req = self._get('/environments')
result = self.controller.index(req)
self.assertEqual({'environments': []}, result)
def test_create_environment(self):
"""Create an environment, test environment.show()"""
fake_now = timeutils.utcnow()
timeutils.utcnow.override_time = fake_now
uuids = ('env_object_id', 'network_id', 'environment_id')
mock_uuid = self._stub_uuid(uuids)
expected = {'tenant_id': self.tenant,
'id': 'environment_id',
'name': 'my_env',
'networking': {},
'version': 0,
'created': fake_now,
'updated': fake_now}
body = {'name': 'my_env'}
req = self._post('/environments', json.dumps(body))
result = self.controller.create(req, body)
self.assertEqual(expected, result)
expected['status'] = 'ready'
req = self._get('/environments')
result = self.controller.index(req)
self.assertEqual({'environments': [expected]}, result)
expected['services'] = []
req = self._get('/environments/%s' % uuids[-1])
result = self.controller.show(req, uuids[-1])
self.assertEqual(expected, result)
self.assertEqual(3, mock_uuid.call_count)
def test_missing_environment(self):
"""Check that a missing environment results in an HTTPNotFound"""
req = self._get('/environments/no-such-id')
self.assertRaises(exc.HTTPNotFound, self.controller.show,
req, 'no-such-id')
def test_update_environment(self):
"""Check that environment rename works"""
fake_now = timeutils.utcnow()
timeutils.utcnow.override_time = fake_now
expected = dict(
id='12345',
name='my-env',
version=0,
networking={},
created=fake_now,
updated=fake_now,
tenant_id=self.tenant,
description={
'Objects': {
'?': {'id': '12345'}
},
'Attributes': {}
}
)
e = models.Environment(**expected)
test_utils.save_models(e)
fake_now = timeutils.utcnow()
timeutils.utcnow.override_time = fake_now
del expected['description']
expected['services'] = []
expected['status'] = 'ready'
expected['name'] = 'renamed env'
expected['updated'] = fake_now
body = {
'name': 'renamed env'
}
req = self._post('/environments/12345', json.dumps(body))
result = self.controller.update(req, '12345', body)
req = self._get('/environments/%s' % '12345')
result = self.controller.show(req, '12345')
self.assertEqual(expected, result)
def test_delete_environment(self):
"""Test that environment deletion results in the correct rpc call"""
fake_now = timeutils.utcnow()
expected = dict(
id='12345',
name='my-env',
version=0,
networking={},
created=fake_now,
updated=fake_now,
tenant_id=self.tenant,
description={
'Objects': {
'?': {'id': '12345'}
},
'Attributes': {}
}
)
e = models.Environment(**expected)
test_utils.save_models(e)
rpc_task = {
'tenant_id': self.tenant,
'model': {'Attributes': {}, 'Objects': None},
'token': None
}
req = self._delete('/environments/12345')
result = self.controller.delete(req, '12345')
self.mock_engine_rpc.handle_task.assert_called_once_with(rpc_task)
# Should this be expected behavior?
self.assertEqual(None, result)

46
murano/tests/utils.py Normal file
View File

@@ -0,0 +1,46 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from murano import context
from murano.db import models
from murano.db import session
from murano.openstack.common.db import options
def setup_dummy_db():
options.cfg.set_defaults(options.database_opts, sqlite_synchronous=False)
options.set_defaults(sql_connection="sqlite://", sqlite_db='murano.db')
models.register_models(session.get_engine())
def reset_dummy_db():
models.unregister_models(session.get_engine())
def dummy_context(user='test_username', tenant_id='test_tenant_id',
password='password', roles=[], user_id=None):
return context.RequestContext.from_dict({
'tenant': tenant_id,
'user': user,
#'roles': roles, # Commented until policy check changes land
'is_admin': False,
})
def save_models(*models):
s = session.get_session()
for m in models:
m.save(s)