# Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import logging import sys import six import testresources from testtools import matchers from keystoneclient import exceptions from keystoneclient.tests import client_fixtures from keystoneclient.tests import utils as test_utils from keystoneclient import utils class FakeResource(object): pass class FakeManager(object): resource_class = FakeResource resources = { '1234': {'name': 'entity_one'}, '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0': {'name': 'entity_two'}, '\xe3\x82\xbdtest': {'name': u'\u30bdtest'}, '5678': {'name': '9876'} } def get(self, resource_id): try: return self.resources[str(resource_id)] except KeyError: raise exceptions.NotFound(resource_id) def find(self, name=None): if name == '9999': # NOTE(morganfainberg): special case that raises NoUniqueMatch. raise exceptions.NoUniqueMatch() for resource_id, resource in self.resources.items(): if resource['name'] == str(name): return resource raise exceptions.NotFound(name) class FindResourceTestCase(test_utils.TestCase): def setUp(self): super(FindResourceTestCase, self).setUp() self.manager = FakeManager() def test_find_none(self): self.assertRaises(exceptions.CommandError, utils.find_resource, self.manager, 'asdf') def test_find_by_integer_id(self): output = utils.find_resource(self.manager, 1234) self.assertEqual(output, self.manager.resources['1234']) def test_find_by_str_id(self): output = utils.find_resource(self.manager, '1234') self.assertEqual(output, self.manager.resources['1234']) def test_find_by_uuid(self): uuid = '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0' output = utils.find_resource(self.manager, uuid) self.assertEqual(output, self.manager.resources[uuid]) def test_find_by_unicode(self): name = '\xe3\x82\xbdtest' output = utils.find_resource(self.manager, name) self.assertEqual(output, self.manager.resources[name]) def test_find_by_str_name(self): output = utils.find_resource(self.manager, 'entity_one') self.assertEqual(output, self.manager.resources['1234']) def test_find_by_int_name(self): output = utils.find_resource(self.manager, 9876) self.assertEqual(output, self.manager.resources['5678']) def test_find_no_unique_match(self): self.assertRaises(exceptions.CommandError, utils.find_resource, self.manager, 9999) class FakeObject(object): def __init__(self, name): self.name = name class PrintTestCase(test_utils.TestCase): def setUp(self): super(PrintTestCase, self).setUp() self.old_stdout = sys.stdout self.stdout = six.moves.cStringIO() self.addCleanup(setattr, self, 'stdout', None) sys.stdout = self.stdout self.addCleanup(setattr, sys, 'stdout', self.old_stdout) def test_print_list_unicode(self): name = six.u('\u540d\u5b57') objs = [FakeObject(name)] # NOTE(Jeffrey4l) If the text's encode is proper, this method will not # raise UnicodeEncodeError exceptions utils.print_list(objs, ['name']) output = self.stdout.getvalue() # In Python 2, output will be bytes, while in Python 3, it will not. # Let's decode the value if needed. if isinstance(output, six.binary_type): output = output.decode('utf-8') self.assertIn(name, output) def test_print_dict_unicode(self): name = six.u('\u540d\u5b57') utils.print_dict({'name': name}) output = self.stdout.getvalue() # In Python 2, output will be bytes, while in Python 3, it will not. # Let's decode the value if needed. if isinstance(output, six.binary_type): output = output.decode('utf-8') self.assertIn(name, output) class TestPositional(test_utils.TestCase): @utils.positional(1) def no_vars(self): # positional doesn't enforce anything here return True @utils.positional(3, utils.positional.EXCEPT) def mixed_except(self, arg, kwarg1=None, kwarg2=None): # self, arg, and kwarg1 may be passed positionally return (arg, kwarg1, kwarg2) @utils.positional(3, utils.positional.WARN) def mixed_warn(self, arg, kwarg1=None, kwarg2=None): # self, arg, and kwarg1 may be passed positionally, only a warning # is emitted return (arg, kwarg1, kwarg2) def test_nothing(self): self.assertTrue(self.no_vars()) def test_mixed_except(self): self.assertEqual((1, 2, 3), self.mixed_except(1, 2, kwarg2=3)) self.assertEqual((1, 2, 3), self.mixed_except(1, kwarg1=2, kwarg2=3)) self.assertEqual((1, None, None), self.mixed_except(1)) self.assertRaises(TypeError, self.mixed_except, 1, 2, 3) def test_mixed_warn(self): logger_message = six.moves.cStringIO() handler = logging.StreamHandler(logger_message) handler.setLevel(logging.DEBUG) logger = logging.getLogger(utils.__name__) level = logger.getEffectiveLevel() logger.setLevel(logging.DEBUG) logger.addHandler(handler) self.addCleanup(logger.removeHandler, handler) self.addCleanup(logger.setLevel, level) self.mixed_warn(1, 2, 3) self.assertIn('takes at most 3 positional', logger_message.getvalue()) @utils.positional(enforcement=utils.positional.EXCEPT) def inspect_func(self, arg, kwarg=None): return (arg, kwarg) def test_inspect_positions(self): self.assertEqual((1, None), self.inspect_func(1)) self.assertEqual((1, 2), self.inspect_func(1, kwarg=2)) self.assertRaises(TypeError, self.inspect_func) self.assertRaises(TypeError, self.inspect_func, 1, 2) @utils.positional.classmethod(1) def class_method(cls, a, b): return (cls, a, b) @utils.positional.method(1) def normal_method(self, a, b): self.assertIsInstance(self, TestPositional) return (self, a, b) def test_class_method(self): self.assertEqual((TestPositional, 1, 2), self.class_method(1, b=2)) self.assertRaises(TypeError, self.class_method, 1, 2) def test_normal_method(self): self.assertEqual((self, 1, 2), self.normal_method(1, b=2)) self.assertRaises(TypeError, self.normal_method, 1, 2) class HashSignedTokenTestCase(test_utils.TestCase, testresources.ResourcedTestCase): """Unit tests for utils.hash_signed_token().""" resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] def test_default_md5(self): """The default hash method is md5.""" token = self.examples.SIGNED_TOKEN_SCOPED if six.PY3: token = token.encode('utf-8') token_id_default = utils.hash_signed_token(token) token_id_md5 = utils.hash_signed_token(token, mode='md5') self.assertThat(token_id_default, matchers.Equals(token_id_md5)) # md5 hash is 32 chars. self.assertThat(token_id_default, matchers.HasLength(32)) def test_sha256(self): """Can also hash with sha256.""" token = self.examples.SIGNED_TOKEN_SCOPED if six.PY3: token = token.encode('utf-8') token_id = utils.hash_signed_token(token, mode='sha256') # sha256 hash is 64 chars. self.assertThat(token_id, matchers.HasLength(64)) def load_tests(loader, tests, pattern): return testresources.OptimisingTestSuite(tests)