Add small tests for unit testing
implements blueprint unit-testing Change-Id: I31cd863fed15b7f4110bc31d3535beae7bb7d168
This commit is contained in:
parent
786806e5d3
commit
41bef2aa44
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
*.pyc
|
||||||
|
*.egg*
|
||||||
|
.coverage
|
||||||
|
.testrepository
|
||||||
|
.tox
|
8
.testr.conf
Normal file
8
.testr.conf
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
[DEFAULT]
|
||||||
|
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
|
||||||
|
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
|
||||||
|
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
|
||||||
|
OS_LOG_CAPTURE=${OS_LOG_CAPTURE:-1} \
|
||||||
|
${PYTHON:-python} -m subunit.run discover -t ./ ./meteos/tests $LISTOPT $IDOPTION
|
||||||
|
test_id_option=--load-list $IDFILE
|
||||||
|
test_list_option=--list
|
49
meteos/tests/conf_fixture.py
Normal file
49
meteos/tests/conf_fixture.py
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
# Copyright 2010 United States Government as represented by the
|
||||||
|
# Administrator of the National Aeronautics and Space Administration.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from oslo_policy import opts
|
||||||
|
|
||||||
|
from meteos.common import config
|
||||||
|
|
||||||
|
CONF = config.CONF
|
||||||
|
|
||||||
|
|
||||||
|
def set_defaults(conf):
|
||||||
|
_safe_set_of_opts(conf, 'verbose', True)
|
||||||
|
_safe_set_of_opts(conf, 'state_path', os.path.abspath(
|
||||||
|
os.path.join(os.path.dirname(__file__),
|
||||||
|
'..',
|
||||||
|
'..')))
|
||||||
|
_safe_set_of_opts(conf, 'connection', "sqlite://", group='database')
|
||||||
|
_safe_set_of_opts(conf, 'sqlite_synchronous', False)
|
||||||
|
_POLICY_PATH = os.path.abspath(os.path.join(CONF.state_path,
|
||||||
|
'meteos/tests/policy.json'))
|
||||||
|
opts.set_defaults(conf, policy_file=_POLICY_PATH)
|
||||||
|
_safe_set_of_opts(conf, 'service_instance_user', 'fake_user')
|
||||||
|
_API_PASTE_PATH = os.path.abspath(os.path.join(CONF.state_path,
|
||||||
|
'etc/meteos/api-paste.ini'))
|
||||||
|
_safe_set_of_opts(conf, 'api_paste_config', _API_PASTE_PATH)
|
||||||
|
_safe_set_of_opts(conf, 'auth_strategy', 'noauth')
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_set_of_opts(conf, *args, **kwargs):
|
||||||
|
try:
|
||||||
|
conf.set_default(*args, **kwargs)
|
||||||
|
except config.cfg.NoSuchOptError:
|
||||||
|
# Assumed that opt is not imported and not used
|
||||||
|
pass
|
71
meteos/tests/fake_notifier.py
Normal file
71
meteos/tests/fake_notifier.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
# Copyright 2014 Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import collections
|
||||||
|
import functools
|
||||||
|
|
||||||
|
import oslo_messaging as messaging
|
||||||
|
from oslo_serialization import jsonutils
|
||||||
|
|
||||||
|
from meteos import rpc
|
||||||
|
|
||||||
|
NOTIFICATIONS = []
|
||||||
|
|
||||||
|
|
||||||
|
def reset():
|
||||||
|
del NOTIFICATIONS[:]
|
||||||
|
|
||||||
|
|
||||||
|
FakeMessage = collections.namedtuple(
|
||||||
|
'Message',
|
||||||
|
['publisher_id', 'priority', 'event_type', 'payload'],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeNotifier(object):
|
||||||
|
|
||||||
|
def __init__(self, transport, publisher_id, serializer=None):
|
||||||
|
self.transport = transport
|
||||||
|
self.publisher_id = publisher_id
|
||||||
|
for priority in ['debug', 'info', 'warn', 'error', 'critical']:
|
||||||
|
setattr(self, priority,
|
||||||
|
functools.partial(self._notify, priority.upper()))
|
||||||
|
self._serializer = serializer or messaging.serializer.NoOpSerializer()
|
||||||
|
|
||||||
|
def prepare(self, publisher_id=None):
|
||||||
|
if publisher_id is None:
|
||||||
|
publisher_id = self.publisher_id
|
||||||
|
return self.__class__(self.transport, publisher_id, self._serializer)
|
||||||
|
|
||||||
|
def _notify(self, priority, ctxt, event_type, payload):
|
||||||
|
payload = self._serializer.serialize_entity(ctxt, payload)
|
||||||
|
# NOTE(sileht): simulate the kombu serializer
|
||||||
|
# this permit to raise an exception if something have not
|
||||||
|
# been serialized correctly
|
||||||
|
jsonutils.to_primitive(payload)
|
||||||
|
msg = dict(publisher_id=self.publisher_id,
|
||||||
|
priority=priority,
|
||||||
|
event_type=event_type,
|
||||||
|
payload=payload)
|
||||||
|
NOTIFICATIONS.append(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def stub_notifier(testcase):
|
||||||
|
testcase.mock_object(messaging, 'Notifier', FakeNotifier)
|
||||||
|
if rpc.NOTIFIER:
|
||||||
|
serializer = getattr(rpc.NOTIFIER, '_serializer', None)
|
||||||
|
testcase.mock_object(rpc, 'NOTIFIER',
|
||||||
|
FakeNotifier(rpc.NOTIFIER.transport,
|
||||||
|
rpc.NOTIFIER.publisher_id,
|
||||||
|
serializer=serializer))
|
72
meteos/tests/test_context.py
Normal file
72
meteos/tests/test_context.py
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
# Copyright 2011 OpenStack LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from meteos import context
|
||||||
|
from meteos import test
|
||||||
|
|
||||||
|
|
||||||
|
class ContextTestCase(test.TestCase):
|
||||||
|
|
||||||
|
def test_request_context_elevated(self):
|
||||||
|
user_context = context.RequestContext(
|
||||||
|
'fake_user', 'fake_project', is_admin=False)
|
||||||
|
self.assertFalse(user_context.is_admin)
|
||||||
|
self.assertEqual([], user_context.roles)
|
||||||
|
admin_context = user_context.elevated()
|
||||||
|
self.assertFalse(user_context.is_admin)
|
||||||
|
self.assertTrue(admin_context.is_admin)
|
||||||
|
self.assertNotIn('admin', user_context.roles)
|
||||||
|
self.assertIn('admin', admin_context.roles)
|
||||||
|
|
||||||
|
def test_request_context_read_deleted(self):
|
||||||
|
ctxt = context.RequestContext('111',
|
||||||
|
'222',
|
||||||
|
read_deleted='yes')
|
||||||
|
self.assertEqual('yes', ctxt.read_deleted)
|
||||||
|
|
||||||
|
ctxt.read_deleted = 'no'
|
||||||
|
self.assertEqual('no', ctxt.read_deleted)
|
||||||
|
|
||||||
|
def test_request_context_read_deleted_invalid(self):
|
||||||
|
self.assertRaises(ValueError,
|
||||||
|
context.RequestContext,
|
||||||
|
'111',
|
||||||
|
'222',
|
||||||
|
read_deleted=True)
|
||||||
|
|
||||||
|
ctxt = context.RequestContext('111', '222')
|
||||||
|
self.assertRaises(ValueError,
|
||||||
|
setattr,
|
||||||
|
ctxt,
|
||||||
|
'read_deleted',
|
||||||
|
True)
|
||||||
|
|
||||||
|
def test_to_dict_works_w_missing_meteos_context_attributes(self):
|
||||||
|
meteos_context_attributes = ['user_id', 'project_id', 'read_deleted',
|
||||||
|
'remote_address', 'timestamp',
|
||||||
|
'quota_class', 'service_catalog']
|
||||||
|
ctxt = context.RequestContext('111', '222', roles=['admin', 'weasel'])
|
||||||
|
|
||||||
|
# Early in context initialization to_dict() can be triggered
|
||||||
|
# before all meteos specific context attributes have been set.
|
||||||
|
# Set up this situation here.
|
||||||
|
for attr in meteos_context_attributes:
|
||||||
|
delattr(ctxt, attr)
|
||||||
|
|
||||||
|
# We should be able to run to_dict() without getting an
|
||||||
|
# AttributeError exception
|
||||||
|
res = ctxt.to_dict()
|
||||||
|
|
||||||
|
for attr in meteos_context_attributes:
|
||||||
|
self.assertIsNone(res[attr])
|
Loading…
Reference in New Issue
Block a user