test for db/api and compass/utils

-patch1: fix pep8 style

Change-Id: Ib952cc4e46732d0f2ccf8220c16c88e8e570796d
This commit is contained in:
xichengc 2014-07-24 16:25:36 -07:00
parent 35d18ba688
commit 76436d717a
10 changed files with 1041 additions and 0 deletions

View File

@ -0,0 +1,13 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,13 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

View File

@ -0,0 +1,618 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest2
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
from compass.utils import setting_wrapper as setting
from compass.db.api import database
from compass.db.api import utils
from compass.db import exception
from compass.db import models
from compass.utils import flags
from compass.utils import logsetting
reload(setting)
class TestModelQuery(unittest2.TestCase):
"""Test model query."""
def setUp(self):
super(TestModelQuery, self).setUp()
logsetting.init()
database.init('sqlite://')
def tearDown(self):
super(TestModelQuery, self).tearDown()
def test_model_query(self):
with database.session() as session:
model = models.Machine
res = utils.model_query(session, model)
self.assertIsNotNone(res)
def test_model_query_non_exist(self):
with database.session() as session:
self.assertRaises(
exception.DatabaseException,
utils.model_query,
session,
models.JSONEncoded
)
class TestModelFilter(unittest2.TestCase):
"""Test model filter"""
def setUp(self):
super(TestModelFilter, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestModelFilter, self).tearDown()
def _filter_test_dict_util(self, op, exp_name, exp_id, **kwargs):
with database.session() as session:
query = utils.model_query(session, models.Permission)
filters = {}
for key, value in kwargs.iteritems():
filters[key] = {
op: value
}
resources = utils.model_filter(query, models.Permission, **filters)
ret = [resource.to_dict() for resource in resources.all()]
expected = {
'name': exp_name,
'id': exp_id
}
return (expected, ret)
def test_filter_with_list(self):
with database.session() as session:
query = utils.model_query(session, models.Permission)
filters = {
'name': [
'list_permissions',
'list_switches'
]
}
resources = utils.model_filter(query, models.Permission, **filters)
ret = [resource.to_dict() for resource in resources.all()]
expected = [
{
'description': 'list all permissions',
'alias': 'list permissions',
'id': 1,
'name': 'list_permissions'
},
{
'description': 'list all switches',
'alias': 'list switches',
'id': 2,
'name': u'list_switches'
}
]
for i, v in enumerate(ret):
self.assertTrue(
all(item in ret[i].items() for item in expected[i].items())
)
def test_filter_with_dict_eq(self):
expected, ret = self._filter_test_dict_util(
'eq',
'list_permissions',
1,
id=1
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_lt(self):
expected, ret = self._filter_test_dict_util(
'lt',
'list_permissions',
1,
id=2
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_gt(self):
expected, ret = self._filter_test_dict_util(
'gt',
'update_clusterhost_state',
47,
id=46
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_le(self):
expected, ret = self._filter_test_dict_util(
'le',
'list_permissions',
1,
id=1
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_ge(self):
expected, ret = self._filter_test_dict_util(
'ge',
'update_clusterhost_state',
47,
id=47
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_ne(self):
expected, ret = self._filter_test_dict_util(
'ne',
'list_permissions',
1,
id=[2, 3, 4, 5, 6, 7, 8, 9]
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_startswith(self):
expected, ret = self._filter_test_dict_util(
'startswith',
'list_permissions',
1,
name='list_per'
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_endswith(self):
expected, ret = self._filter_test_dict_util(
'endswith',
'list_permissions',
1,
name='ssions'
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_like(self):
expected, ret = self._filter_test_dict_util(
'like',
'list_permissions',
1,
name='%per%'
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_between_one_table(self):
expected, ret = self._filter_test_dict_util(
'between',
'list_permissions',
1,
id=(1, 1)
)
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
def test_filter_with_dict_between_multiple_tables(self):
_, ret = self._filter_test_dict_util(
'between',
'list_permissions',
1,
id=(3, 6)
)
key_list = []
for item in ret:
for k, v in item.iteritems():
if k == 'id':
key_list.append(v)
self.assertEqual([3, 4, 5, 6], key_list)
def test_filter_with_other_type(self):
with database.session() as session:
query = utils.model_query(session, models.Permission)
filters = {
'id': 1
}
resources = utils.model_filter(query, models.Permission, **filters)
ret = [resource.to_dict() for resource in resources.all()]
expected = {
'id': 1
}
self.assertTrue(
all(item in ret[0].items() for item in expected.items())
)
class TestGetDbObject(unittest2.TestCase):
def setUp(self):
super(TestGetDbObject, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestGetDbObject, self).tearDown()
def test_get_alias(self):
with database.session() as session:
machines = utils.get_db_object(
session,
models.Permission,
name='list_machines'
)
expected = 'list machines'
self.assertEqual(expected, machines.alias)
self.assertEqual(expected, machines.description)
def test_get_none_with_flag_off(self):
with database.session() as session:
dummy = utils.get_db_object(
session,
models.Permission,
False,
name='dummy'
)
self.assertEqual(None, dummy)
def test_get_none_with_flag_on(self):
with self.assertRaises(exception.RecordNotExists):
with database.session() as session:
utils.get_db_object(
session,
models.Permission,
name='dummy'
)
class TestAddDbObject(unittest2.TestCase):
def setUp(self):
super(TestAddDbObject, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestAddDbObject, self).tearDown()
def test_add_alias(self):
with database.session() as session:
db_objs = utils.add_db_object(
session,
models.Permission,
True,
'test',
alias='test'
)
expected = 'test'
self.assertEqual(expected, db_objs.alias)
def test_add_nothing(self):
with database.session() as session:
db_objs = utils.add_db_object(
session,
models.Permission,
True,
'test'
)
self.assertEqual('test', db_objs.name)
self.assertIsNone(db_objs.alias)
def test_add_duplicate_with_flag(self):
with self.assertRaises(exception.DuplicatedRecord):
with database.session() as session:
utils.add_db_object(
session,
models.Permission,
True,
'test',
alias='test'
)
utils.add_db_object(
session,
models.Permission,
True,
'test',
alias='test'
)
def test_add_duplicate_with_no_flag(self):
with database.session() as session:
db_objs = utils.add_db_object(
session,
models.Permission,
False,
'test',
alias='test'
)
duplicate = utils.add_db_object(
session,
models.Permission,
False,
'test',
alias='test'
)
self.assertEqual(duplicate, db_objs)
def test_add_with_invalid_args(self):
with self.assertRaises(exception.InvalidParameter):
with database.session() as session:
utils.add_db_object(
session,
models.Permission,
True,
'test1',
'test2',
name='test1'
)
def test_add_with_multiple_args(self):
with database.session() as session:
db_objs = utils.add_db_object(
session,
models.PackageAdapterRole,
True,
'test1',
1,
name='test1',
alias='test1'
)
self.assertEqual('test1', db_objs.alias)
class TestListDbObjects(unittest2.TestCase):
def setUp(self):
super(TestListDbObjects, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestListDbObjects, self).tearDown()
def test_list_specific_obj(self):
with database.session() as session:
db_objs = utils.list_db_objects(
session,
models.Permission,
name='list_permissions'
)
self.assertEqual(
'list permissions',
db_objs[0].alias
)
def test_list_specfic_objs(self):
with database.session() as session:
db_objs = utils.list_db_objects(
session,
models.Permission,
name=[
'list_permissions',
'list_machines'
]
)
self.assertEqual(
['list_permissions', 'list_machines'].sort(),
[obj.name for obj in db_objs].sort()
)
def test_list_none_objs(self):
with database.session() as session:
db_objs = utils.list_db_objects(
session,
models.Permission,
id=99
)
self.assertListEqual([], db_objs)
def test_list_none_table(self):
with self.assertRaises(exception.DatabaseException):
with database.session() as session:
utils.list_db_objects(
session,
models.Dummy,
)
class TestDelDbObjects(unittest2.TestCase):
def setUp(self):
super(TestDelDbObjects, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestDelDbObjects, self).tearDown()
def test_del_all_objects(self):
with database.session() as session:
utils.del_db_objects(
session,
models.Permission
)
remained = utils.list_db_objects(
session,
models.Permission
)
self.assertListEqual([], remained)
def test_del_single_object(self):
with database.session() as session:
utils.del_db_objects(
session,
models.Permission,
name='list_permissions'
)
query_deleted = utils.list_db_objects(
session,
models.Permission,
name='list_permissions'
)
self.assertListEqual([], query_deleted)
class TestUpdateDbObject(unittest2.TestCase):
def setUp(self):
super(TestUpdateDbObject, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestUpdateDbObject, self).tearDown()
def test_update_db_object(self):
with database.session() as session:
db_obj = utils.get_db_object(
session,
models.Permission,
id=1
)
updated_obj = utils.update_db_object(
session,
db_obj,
alias='updated'
)
self.assertEqual(
'updated',
updated_obj.alias
)
def test_update_db_obj_none_exist(self):
with self.assertRaises(exception.DatabaseException):
with database.session() as session:
db_obj = utils.get_db_object(
session,
models.Permission,
id=1
)
utils.updated_db_object(
session,
db_obj,
dummy='dummy'
)
class TestDelDbObject(unittest2.TestCase):
def setUp(self):
super(TestDelDbObject, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestDelDbObject, self).tearDown()
def test_del_db_object(self):
with self.assertRaises(exception.RecordNotExists):
with database.session() as session:
db_obj = utils.get_db_object(
session,
models.Permission,
id=1
)
utils.del_db_object(
session,
db_obj
)
utils.get_db_object(
session,
models.Permission,
id=1
)
class TestCheckIp(unittest2.TestCase):
def setUp(self):
super(TestCheckIp, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestCheckIp, self).tearDown()
def test_check_ip_correct(self):
ip = '10.1.1.1'
self.assertIsNone(utils.check_ip(ip))
def test_check_ip_incorrect(self):
ip = 'dummy'
self.assertRaises(
exception.InvalidParameter,
utils.check_ip,
ip
)
class TestCheckMac(unittest2.TestCase):
def setUp(self):
super(TestCheckMac, self).setUp()
logsetting.init()
database.init('sqlite://')
database.create_db()
def tearDown(self):
database.drop_db()
super(TestCheckMac, self).tearDown()
def test_check_mac_correct(self):
mac = '00:01:02:03:04:05'
self.assertIsNone(utils.check_mac(mac))
def test_check_mac_incorrect(self):
mac = '00:01'
self.assertRaises(
exception.InvalidParameter,
utils.check_mac,
mac
)
if __name__ == '__main__':
flags.init()
logsetting.init()
unittest2.main()

View File

@ -0,0 +1,13 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,15 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TEST=True
PROD=False

View File

@ -0,0 +1,15 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TEST=True
PROD=False

View File

@ -0,0 +1,14 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
UTIL_TEST='unittest'

View File

@ -0,0 +1,15 @@
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TEST=True
PROD=False

View File

@ -0,0 +1,325 @@
#!/usr/bin/python
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest2
os.environ['COMPASS_IGNORE_SETTING'] = 'true'
from compass.utils import setting_wrapper as setting
reload(setting)
from compass.utils import flags
from compass.utils import logsetting
from compass.utils import util
import datetime
class TestParseDatetime(unittest2.TestCase):
"""Test parse datetime."""
def setUp(self):
super(TestParseDatetime, self).setUp()
logsetting.init()
def tearDown(self):
super(TestParseDatetime, self).tearDown()
def test_correct_format(self):
date_time = '2014-7-10 9:10:40'
parsed_datetime = util.parse_datetime(date_time)
expected_datetime = datetime.datetime(2014, 7, 10, 9, 10, 40)
self.assertEqual(parsed_datetime, expected_datetime)
class TestParseDatetimeRange(unittest2.TestCase):
"""Test parse datetime range."""
def setUp(self):
super(TestParseDatetimeRange, self).setUp()
logsetting.init()
def tearDown(self):
super(TestParseDatetimeRange, self).tearDown()
def test_both_start_end_datetime(self):
range = '2014-7-10 9:10:40,2014-7-11 9:10:40'
parsed_datetime_range = util.parse_datetime_range(range)
expected_datetime_range = (
datetime.datetime(2014, 7, 10, 9, 10, 40),
datetime.datetime(2014, 7, 11, 9, 10, 40)
)
self.assertEqual(expected_datetime_range, parsed_datetime_range)
def test_none_start(self):
range = ',2014-7-11 9:10:40'
parsed_datetime_range = util.parse_datetime_range(range)
expected_datetime_range = (
None,
datetime.datetime(2014, 7, 11, 9, 10, 40)
)
self.assertEqual(expected_datetime_range, parsed_datetime_range)
def test_none_end(self):
range = '2014-7-10 9:10:40,'
parsed_datetime_range = util.parse_datetime_range(range)
expected_datetime_range = (
datetime.datetime(2014, 7, 10, 9, 10, 40),
None
)
self.assertEqual(expected_datetime_range, parsed_datetime_range)
def test_none_both(self):
range = ','
parsed_datetime_range = util.parse_datetime_range(range)
expected_datetime_range = (None, None)
self.assertEqual(expected_datetime_range, parsed_datetime_range)
class TestParseRequestArgDict(unittest2.TestCase):
"""Test parse request arg dict."""
def setUp(self):
super(TestParseRequestArgDict, self).setUp()
logsetting.init()
def tearDown(self):
super(TestParseRequestArgDict, self).tearDown()
def test_single_pair(self):
arg = 'a=b'
parsed_arg_dict = util.parse_request_arg_dict(arg)
expected_arg_dict = {'a': 'b'}
self.assertEqual(parsed_arg_dict, expected_arg_dict)
class TestMergeDict(unittest2.TestCase):
"""Test merge dict."""
def setUp(self):
super(TestMergeDict, self).setUp()
logsetting.init()
def tearDown(self):
super(TestMergeDict, self).tearDown()
def test_single_merge(self):
lhs = {1: 1}
rhs = {2: 2}
merged_dict = util.merge_dict(lhs, rhs)
expected_dict = {1: 1, 2: 2}
self.assertEqual(merged_dict, expected_dict)
def test_recursive_merge(self):
lhs = {1: {2: 3}}
rhs = {1: {3: 4}}
merged_dict = util.merge_dict(lhs, rhs)
expected_dict = {1: {2: 3, 3: 4}}
self.assertEqual(merged_dict, expected_dict)
def test_single_merge_override(self):
lhs = {1: 1}
rhs = {1: 2}
merged_dict = util.merge_dict(lhs, rhs)
expected_dict = {1: 2}
self.assertEqual(merged_dict, expected_dict)
def test_recursive_merge_override(self):
lhs = {1: {2: 3, 3: 5}}
rhs = {1: {2: 4, 4: 6}}
merged_dict = util.merge_dict(lhs, rhs)
expected_dict = {1: {2: 4, 3: 5, 4: 6}}
self.assertEqual(merged_dict, expected_dict)
def test_single_merge_no_override(self):
lhs = {1: 1}
rhs = {1: 2}
merged_dict = util.merge_dict(lhs, rhs, False)
expected_dict = {1: 1}
self.assertEqual(merged_dict, expected_dict)
def test_recursive_merge_no_override(self):
lhs = {1: {2: 3, 3: 5}}
rhs = {1: {2: 4, 4: 6}}
merged_dict = util.merge_dict(lhs, rhs, False)
expected_dict = {1: {2: 3, 3: 5, 4: 6}}
self.assertEqual(merged_dict, expected_dict)
def test_merge_dict_with_list(self):
lhs = {1: {2: 3}}
rhs = {1: {3: [4, 5, 6]}}
merged_dict = util.merge_dict(lhs, rhs)
expected_dict = {1: {2: 3, 3: [4, 5, 6]}}
self.assertEqual(merged_dict, expected_dict)
def test_inputs_not_dict(self):
"""test inputs not dict."""
lhs = [1, 2, 3]
rhs = {1: 2}
merged = util.merge_dict(lhs, rhs)
expected = {1: 2}
self.assertEqual(merged, expected)
lhs = [1, 2, 3]
rhs = {1: 2}
merged = util.merge_dict(lhs, rhs, False)
expected = [1, 2, 3]
self.assertEqual(merged, expected)
lhs = {1: 2}
rhs = [1, 2, 3]
merged = util.merge_dict(lhs, rhs)
expected = [1, 2, 3]
self.assertEqual(merged, expected)
lhs = {1: 2}
rhs = [1, 2, 3]
merged = util.merge_dict(lhs, rhs, False)
expected = {1: 2}
self.assertEqual(merged, expected)
class TestEncrypt(unittest2.TestCase):
"""Test encrypt."""
def setUp(self):
super(TestEncrypt, self).setUp()
logsetting.init()
def tearDown(self):
super(TestEncrypt, self).tearDown()
class TestParseTimeInterval(unittest2.TestCase):
"""Test parse time interval."""
def setUp(self):
super(TestParseTimeInterval, self).setUp()
logsetting.init()
def tearDown(self):
super(TestParseTimeInterval, self).tearDown()
class TestLoadConfigs(unittest2.TestCase):
"""Test load configs."""
def setUp(self):
super(TestLoadConfigs, self).setUp()
self.TEST_UTIL_HOME = os.path.dirname(os.path.realpath(__file__))
logsetting.init()
def tearDown(self):
super(TestLoadConfigs, self).tearDown()
def test_load_no_suffix(self):
loaded = util.load_configs(
self.TEST_UTIL_HOME + '/data/test_no_suffix'
)
expected = []
self.assertEqual(loaded, expected)
def test_load_conf(self):
loaded = util.load_configs(
self.TEST_UTIL_HOME + '/data/test_load_conf'
)
expected = [{'TEST': True, 'PROD': False}]
self.assertEqual(loaded, expected)
def test_load_confs(self):
loaded = util.load_configs(
self.TEST_UTIL_HOME + '/data/test_load_confs'
)
expected = [
{
'TEST': True,
'PROD': False
},
{
'UTIL_TEST': 'unittest'
}
]
loaded.sort()
expected.sort()
self.assertTrue(loaded, expected)
def test_load_confs_global_env(self):
loaded = util.load_configs(
self.TEST_UTIL_HOME + '/data/test_load_confs',
env_globals={'TEST': False}
)
expected = [
{
'UTIL_TEST': 'unittest'
},
{
'TEST': True,
'PROD': False
}
]
loaded.sort()
expected.sort()
self.assertTrue(loaded, expected)
def test_load_confs_local_env(self):
loaded = util.load_configs(
self.TEST_UTIL_HOME + '/data/test_load_confs',
env_globals={'TEST': True}
)
expected = [
{
'TEST': True,
'UTIL_TEST': 'unittest'
},
{
'TEST': True,
'PROD': False
}]
loaded.sort()
expected.sort()
self.assertTrue(loaded, expected)
def test_load_confs_local_env_no_override(self):
loaded = util.load_configs(
self.TEST_UTIL_HOME + '/data/test_load_confs',
env_globals={'TEST': False}
)
expected = [
{
'TEST': False,
'UTIL_TEST': 'unittest'
},
{
'TEST': True,
'PROD': False
}
]
loaded.sort()
expected.sort()
self.assertTrue(loaded, expected)
def test_load_conf_error(self):
err_dir = 'non-exist/dir'
loaded = util.load_configs(err_dir)
self.assertEqual([], loaded)
if __name__ == '__main__':
flags.init()
logsetting.init()
unittest2.main()