 82359492dc
			
		
	
	82359492dc
	
	
	
		
			
			The token hash functions always used MD5. With this change, the hash function can be passed in to the hash functions. SecurityImpact Related-Bug: #1174499 Change-Id: Ia08c2d6252bb034087a244b47d5bcbea7dcfa70b
		
			
				
	
	
		
			241 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import logging
 | |
| import sys
 | |
| 
 | |
| import six
 | |
| import testresources
 | |
| from testtools import matchers
 | |
| 
 | |
| from keystoneclient import exceptions
 | |
| from keystoneclient.tests import client_fixtures
 | |
| from keystoneclient.tests import utils as test_utils
 | |
| from keystoneclient import utils
 | |
| 
 | |
| 
 | |
class FakeResource(object):
    """Empty placeholder type used as ``FakeManager.resource_class``."""
 | |
| 
 | |
| 
 | |
class FakeManager(object):
    """In-memory stand-in for a resource manager.

    Serves a fixed table of resources so that lookups by id (``get``) and
    by name (``find``) can be exercised without any remote service.
    """

    resource_class = FakeResource

    # Maps resource id -> resource body. The ids cover a numeric id, a
    # UUID, a non-ASCII id and a resource whose *name* is numeric.
    resources = {
        '1234': {'name': 'entity_one'},
        '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0': {'name': 'entity_two'},
        '\xe3\x82\xbdtest': {'name': u'\u30bdtest'},
        '5678': {'name': '9876'}
    }

    def get(self, resource_id):
        """Return the resource stored under ``str(resource_id)``.

        :param resource_id: id to look up; converted with ``str`` so that
                            integer ids match the string keys.
        :raises exceptions.NotFound: if no resource has that id.
        """
        try:
            return self.resources[str(resource_id)]
        except KeyError:
            raise exceptions.NotFound(resource_id)

    def find(self, name=None):
        """Return the first resource whose name equals ``str(name)``.

        :param name: name to search for; compared after ``str`` conversion.
        :raises exceptions.NoUniqueMatch: for the special name ``9999``
                                          (given as a string or an int).
        :raises exceptions.NotFound: if no resource carries that name.
        """
        # Normalise once so the special case and the lookup agree: the
        # no-unique-match test passes the integer 9999, which a comparison
        # of the raw value against '9999' would never match.
        wanted = str(name)
        if wanted == '9999':
            # NOTE(morganfainberg): special case that raises NoUniqueMatch.
            raise exceptions.NoUniqueMatch()
        # Only the resource bodies matter here; the ids are not needed.
        for resource in self.resources.values():
            if resource['name'] == wanted:
                return resource
        raise exceptions.NotFound(name)
 | |
| 
 | |
| 
 | |
class FindResourceTestCase(test_utils.TestCase):
    """Tests for ``utils.find_resource`` run against a ``FakeManager``.

    Assertions pass the expected value first and the observed value
    second, matching the other test cases in this module.
    """

    def setUp(self):
        super(FindResourceTestCase, self).setUp()
        # A fresh fake manager per test.
        self.manager = FakeManager()

    def test_find_none(self):
        """An unknown name or id is reported as a CommandError."""
        self.assertRaises(exceptions.CommandError,
                          utils.find_resource,
                          self.manager,
                          'asdf')

    def test_find_by_integer_id(self):
        """An integer id resolves to the resource keyed by its string."""
        output = utils.find_resource(self.manager, 1234)
        self.assertEqual(self.manager.resources['1234'], output)

    def test_find_by_str_id(self):
        """A numeric string id resolves to the matching resource."""
        output = utils.find_resource(self.manager, '1234')
        self.assertEqual(self.manager.resources['1234'], output)

    def test_find_by_uuid(self):
        """A UUID string id resolves to the matching resource."""
        uuid = '8e8ec658-c7b0-4243-bdf8-6f7f2952c0d0'
        output = utils.find_resource(self.manager, uuid)
        self.assertEqual(self.manager.resources[uuid], output)

    def test_find_by_unicode(self):
        """A non-ASCII identifier resolves to the matching resource."""
        name = '\xe3\x82\xbdtest'
        output = utils.find_resource(self.manager, name)
        self.assertEqual(self.manager.resources[name], output)

    def test_find_by_str_name(self):
        """A resource can be located by its name rather than its id."""
        output = utils.find_resource(self.manager, 'entity_one')
        self.assertEqual(self.manager.resources['1234'], output)

    def test_find_by_int_name(self):
        """An integer is also tried as a name when it is not a known id."""
        output = utils.find_resource(self.manager, 9876)
        self.assertEqual(self.manager.resources['5678'], output)

    def test_find_no_unique_match(self):
        """Looking up 9999 is reported as a CommandError."""
        self.assertRaises(exceptions.CommandError,
                          utils.find_resource,
                          self.manager,
                          9999)
 | |
| 
 | |
| 
 | |
class FakeObject(object):
    """Minimal object exposing a single ``name`` attribute."""

    def __init__(self, name):
        # Keep the given value as-is; callers read it back via ``.name``.
        self.name = name
 | |
| 
 | |
| 
 | |
class PrintTestCase(test_utils.TestCase):
    """Checks that the print helpers in ``utils`` cope with unicode text.

    ``sys.stdout`` is replaced by an in-memory stream for each test and
    restored by a cleanup.
    """

    def setUp(self):
        super(PrintTestCase, self).setUp()
        real_stdout = sys.stdout
        self.old_stdout = real_stdout
        self.stdout = six.moves.cStringIO()
        # Cleanups run in reverse order of registration: sys.stdout is put
        # back first, then the capture stream reference is dropped.
        self.addCleanup(setattr, self, 'stdout', None)
        sys.stdout = self.stdout
        self.addCleanup(setattr, sys, 'stdout', real_stdout)

    def _captured_text(self):
        # In Python 2, output will be bytes, while in Python 3, it will not.
        # Decode the captured value only when it is a byte string.
        captured = self.stdout.getvalue()
        if not isinstance(captured, six.binary_type):
            return captured
        return captured.decode('utf-8')

    def test_print_list_unicode(self):
        name = u'\u540d\u5b57'
        # NOTE(Jeffrey4l) If the text's encode is proper, this method will not
        # raise UnicodeEncodeError exceptions
        utils.print_list([FakeObject(name)], ['name'])
        self.assertIn(name, self._captured_text())

    def test_print_dict_unicode(self):
        name = u'\u540d\u5b57'
        utils.print_dict({'name': name})
        self.assertIn(name, self._captured_text())
 | |
| 
 | |
| 
 | |
class TestPositional(test_utils.TestCase):
    """Tests for the ``utils.positional`` decorator and its variants.

    The decorated methods below are fixtures; the ``test_*`` methods call
    them with different mixes of positional and keyword arguments.
    """

    @utils.positional(1)
    def no_vars(self):
        # positional doesn't enforce anything here
        return True

    @utils.positional(3, utils.positional.EXCEPT)
    def mixed_except(self, arg, kwarg1=None, kwarg2=None):
        # self, arg, and kwarg1 may be passed positionally
        return (arg, kwarg1, kwarg2)

    @utils.positional(3, utils.positional.WARN)
    def mixed_warn(self, arg, kwarg1=None, kwarg2=None):
        # self, arg, and kwarg1 may be passed positionally, only a warning
        # is emitted
        return (arg, kwarg1, kwarg2)

    def test_nothing(self):
        """A method taking only ``self`` is callable as usual."""
        self.assertTrue(self.no_vars())

    def test_mixed_except(self):
        """EXCEPT enforcement: a surplus positional raises TypeError."""
        self.assertEqual((1, 2, 3), self.mixed_except(1, 2, kwarg2=3))
        self.assertEqual((1, 2, 3), self.mixed_except(1, kwarg1=2, kwarg2=3))
        self.assertEqual((1, None, None), self.mixed_except(1))
        self.assertRaises(TypeError, self.mixed_except, 1, 2, 3)

    def test_mixed_warn(self):
        """WARN enforcement: a surplus positional is only logged."""
        logger_message = six.moves.cStringIO()
        handler = logging.StreamHandler(logger_message)
        handler.setLevel(logging.DEBUG)

        logger = logging.getLogger(utils.__name__)
        # Save the logger's *own* level rather than the effective one.
        # Restoring an effective (possibly inherited) level via setLevel
        # would pin it on this logger and stop it from following its
        # parent once the test has finished.
        level = logger.level
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)

        self.addCleanup(logger.removeHandler, handler)
        self.addCleanup(logger.setLevel, level)

        self.mixed_warn(1, 2, 3)

        self.assertIn('takes at most 3 positional', logger_message.getvalue())

    # No explicit limit is given here.
    # NOTE(review): presumably the limit is derived from the signature
    # (arguments without defaults) - confirm against utils.positional.
    @utils.positional(enforcement=utils.positional.EXCEPT)
    def inspect_func(self, arg, kwarg=None):
        return (arg, kwarg)

    def test_inspect_positions(self):
        """Without an explicit limit, ``kwarg`` must be given by keyword."""
        self.assertEqual((1, None), self.inspect_func(1))
        self.assertEqual((1, 2), self.inspect_func(1, kwarg=2))
        self.assertRaises(TypeError, self.inspect_func)
        self.assertRaises(TypeError, self.inspect_func, 1, 2)

    @utils.positional.classmethod(1)
    def class_method(cls, a, b):
        return (cls, a, b)

    @utils.positional.method(1)
    def normal_method(self, a, b):
        self.assertIsInstance(self, TestPositional)
        return (self, a, b)

    def test_class_method(self):
        """``positional.classmethod(1)``: ``cls`` is bound, ``b`` is keyword."""
        self.assertEqual((TestPositional, 1, 2), self.class_method(1, b=2))
        self.assertRaises(TypeError, self.class_method, 1, 2)

    def test_normal_method(self):
        """``positional.method(1)``: ``self`` is bound, ``b`` is keyword."""
        self.assertEqual((self, 1, 2), self.normal_method(1, b=2))
        self.assertRaises(TypeError, self.normal_method, 1, 2)
 | |
| 
 | |
| 
 | |
class HashSignedTokenTestCase(test_utils.TestCase,
                              testresources.ResourcedTestCase):
    """Unit tests for utils.hash_signed_token()."""

    # Makes the shared examples fixture available as ``self.examples``.
    resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]

    def _scoped_token(self):
        # Fetch the scoped signed token from the examples resource; on
        # Python 3 it is handed over encoded as UTF-8 bytes.
        signed = self.examples.SIGNED_TOKEN_SCOPED
        return signed.encode('utf-8') if six.PY3 else signed

    def test_default_md5(self):
        """The default hash method is md5.

        Hashing without a mode must give the same id as ``mode='md5'``.
        """
        token = self._scoped_token()
        token_id_default = utils.hash_signed_token(token)
        token_id_md5 = utils.hash_signed_token(token, mode='md5')
        self.assertThat(token_id_default, matchers.Equals(token_id_md5))
        # md5 hash is 32 chars.
        self.assertThat(token_id_default, matchers.HasLength(32))

    def test_sha256(self):
        """Can also hash with sha256."""
        sha256_id = utils.hash_signed_token(self._scoped_token(),
                                            mode='sha256')
        # sha256 hash is 64 chars.
        self.assertThat(sha256_id, matchers.HasLength(64))
 | |
| 
 | |
| 
 | |
def load_tests(loader, tests, pattern):
    """Wrap the collected tests in a testresources OptimisingTestSuite.

    ``loader`` and ``pattern`` are accepted but not used.
    """
    optimised_suite = testresources.OptimisingTestSuite(tests)
    return optimised_suite
 |