 ea498fb052
			
		
	
	ea498fb052
	
	
	
		
			
			* On Python 3, the printer doesn't encode Unicode to utf8 anymore, since print() expects a Unicode string. * Update unit tests for Python 3 since repr() doesn't escape non-ASCII characters in Unicode strings anymore: http://legacy.python.org/dev/peps/pep-3138/ Change-Id: I89471019d691a46651312d6a49964b719192148a
		
			
				
	
	
		
			348 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			348 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright (c) 2010-2013 OpenStack, LLC.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #    http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 | |
| # implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| import sys
 | |
| import time
 | |
| 
 | |
| try:
 | |
|     from unittest import mock
 | |
| except ImportError:
 | |
|     import mock
 | |
| 
 | |
| import testtools
 | |
| import threading
 | |
| import six
 | |
| from six.moves.queue import Queue, Empty
 | |
| 
 | |
| from swiftclient import multithreading as mt
 | |
| from swiftclient.exceptions import ClientException
 | |
| 
 | |
| 
 | |
| class ThreadTestCase(testtools.TestCase):
 | |
|     def setUp(self):
 | |
|         super(ThreadTestCase, self).setUp()
 | |
|         self.got_args_kwargs = Queue()
 | |
|         self.starting_thread_count = threading.active_count()
 | |
| 
 | |
|     def _func(self, q_item, *args, **kwargs):
 | |
|         self.got_items.put(q_item)
 | |
|         self.got_args_kwargs.put((args, kwargs))
 | |
| 
 | |
|         if q_item == 'go boom':
 | |
|             raise Exception('I went boom!')
 | |
|         if q_item == 'c boom':
 | |
|             raise ClientException(
 | |
|                 'Client Boom', http_scheme='http', http_host='192.168.22.1',
 | |
|                 http_port=80, http_path='/booze', http_status=404,
 | |
|                 http_reason='to much', http_response_content='no sir!')
 | |
| 
 | |
|         return 'best result EVAR!'
 | |
| 
 | |
|     def assertQueueContains(self, queue, expected_contents):
 | |
|         got_contents = []
 | |
|         try:
 | |
|             while True:
 | |
|                 got_contents.append(queue.get(timeout=0.1))
 | |
|         except Empty:
 | |
|             pass
 | |
|         if isinstance(expected_contents, set):
 | |
|             got_contents = set(got_contents)
 | |
|         self.assertEqual(expected_contents, got_contents)
 | |
| 
 | |
| 
 | |
| class TestQueueFunctionThread(ThreadTestCase):
 | |
|     def setUp(self):
 | |
|         super(TestQueueFunctionThread, self).setUp()
 | |
| 
 | |
|         self.input_queue = Queue()
 | |
|         self.got_items = Queue()
 | |
|         self.stored_results = []
 | |
| 
 | |
|         self.qft = mt.QueueFunctionThread(self.input_queue, self._func,
 | |
|                                           'one_arg', 'two_arg',
 | |
|                                           red_fish='blue_arg',
 | |
|                                           store_results=self.stored_results)
 | |
|         self.qft.start()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         if self.qft.is_alive():
 | |
|             self.finish_up_thread()
 | |
| 
 | |
|         super(TestQueueFunctionThread, self).tearDown()
 | |
| 
 | |
|     def finish_up_thread(self):
 | |
|         self.input_queue.put(mt.StopWorkerThreadSignal())
 | |
|         while self.qft.is_alive():
 | |
|             time.sleep(0.05)
 | |
| 
 | |
|     def test_plumbing_and_store_results(self):
 | |
|         self.input_queue.put('abc')
 | |
|         self.input_queue.put(123)
 | |
|         self.finish_up_thread()
 | |
| 
 | |
|         self.assertQueueContains(self.got_items, ['abc', 123])
 | |
|         self.assertQueueContains(self.got_args_kwargs, [
 | |
|             (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'}),
 | |
|             (('one_arg', 'two_arg'), {'red_fish': 'blue_arg'})])
 | |
|         self.assertEqual(self.stored_results,
 | |
|                          ['best result EVAR!', 'best result EVAR!'])
 | |
| 
 | |
|     def test_exception_handling(self):
 | |
|         self.input_queue.put('go boom')
 | |
|         self.input_queue.put('ok')
 | |
|         self.input_queue.put('go boom')
 | |
|         self.finish_up_thread()
 | |
| 
 | |
|         self.assertQueueContains(self.got_items,
 | |
|                                  ['go boom', 'ok', 'go boom'])
 | |
|         self.assertEqual(len(self.qft.exc_infos), 2)
 | |
|         self.assertEqual(Exception, self.qft.exc_infos[0][0])
 | |
|         self.assertEqual(Exception, self.qft.exc_infos[1][0])
 | |
|         self.assertEqual(('I went boom!',), self.qft.exc_infos[0][1].args)
 | |
|         self.assertEqual(('I went boom!',), self.qft.exc_infos[1][1].args)
 | |
| 
 | |
| 
 | |
| class TestQueueFunctionManager(ThreadTestCase):
 | |
|     def setUp(self):
 | |
|         super(TestQueueFunctionManager, self).setUp()
 | |
|         self.thread_manager = mock.create_autospec(
 | |
|             mt.MultiThreadingManager, spec_set=True, instance=True)
 | |
|         self.thread_count = 4
 | |
|         self.error_counter = [0]
 | |
|         self.got_items = Queue()
 | |
|         self.stored_results = []
 | |
|         self.qfq = mt.QueueFunctionManager(
 | |
|             self._func, self.thread_count, self.thread_manager,
 | |
|             thread_args=('1arg', '2arg'),
 | |
|             thread_kwargs={'a': 'b', 'store_results': self.stored_results},
 | |
|             error_counter=self.error_counter,
 | |
|             connection_maker=self.connection_maker)
 | |
| 
 | |
|     def connection_maker(self):
 | |
|         return 'yup, I made a connection'
 | |
| 
 | |
|     def test_context_manager_without_error_counter(self):
 | |
|         self.qfq = mt.QueueFunctionManager(
 | |
|             self._func, self.thread_count, self.thread_manager,
 | |
|             thread_args=('1arg', '2arg'),
 | |
|             thread_kwargs={'a': 'b', 'store_results': self.stored_results},
 | |
|             connection_maker=self.connection_maker)
 | |
| 
 | |
|         with self.qfq as input_queue:
 | |
|             self.assertEqual(self.starting_thread_count + self.thread_count,
 | |
|                              threading.active_count())
 | |
|             input_queue.put('go boom')
 | |
| 
 | |
|         self.assertEqual(self.starting_thread_count, threading.active_count())
 | |
|         error_strs = list(map(str, self.thread_manager.error.call_args_list))
 | |
|         self.assertEqual(1, len(error_strs))
 | |
|         self.assertTrue('Exception: I went boom!' in error_strs[0])
 | |
| 
 | |
|     def test_context_manager_without_conn_maker_or_error_counter(self):
 | |
|         self.qfq = mt.QueueFunctionManager(
 | |
|             self._func, self.thread_count, self.thread_manager,
 | |
|             thread_args=('1arg', '2arg'), thread_kwargs={'a': 'b'})
 | |
| 
 | |
|         with self.qfq as input_queue:
 | |
|             self.assertEqual(self.starting_thread_count + self.thread_count,
 | |
|                              threading.active_count())
 | |
|             for i in range(20):
 | |
|                 input_queue.put('slap%d' % i)
 | |
| 
 | |
|         self.assertEqual(self.starting_thread_count, threading.active_count())
 | |
|         self.assertEqual([], self.thread_manager.error.call_args_list)
 | |
|         self.assertEqual(0, self.error_counter[0])
 | |
|         self.assertQueueContains(self.got_items,
 | |
|                                  set(['slap%d' % i for i in range(20)]))
 | |
|         self.assertQueueContains(
 | |
|             self.got_args_kwargs,
 | |
|             [(('1arg', '2arg'), {'a': 'b'})] * 20)
 | |
|         self.assertEqual(self.stored_results, [])
 | |
| 
 | |
|     def test_context_manager_with_exceptions(self):
 | |
|         with self.qfq as input_queue:
 | |
|             self.assertEqual(self.starting_thread_count + self.thread_count,
 | |
|                              threading.active_count())
 | |
|             for i in range(20):
 | |
|                 input_queue.put('item%d' % i if i % 2 == 0 else 'go boom')
 | |
| 
 | |
|         self.assertEqual(self.starting_thread_count, threading.active_count())
 | |
|         error_strs = list(map(str, self.thread_manager.error.call_args_list))
 | |
|         self.assertEqual(10, len(error_strs))
 | |
|         self.assertTrue(all(['Exception: I went boom!' in s for s in
 | |
|                              error_strs]))
 | |
|         self.assertEqual(10, self.error_counter[0])
 | |
|         expected_items = set(['go boom'] +
 | |
|                              ['item%d' % i for i in range(20)
 | |
|                               if i % 2 == 0])
 | |
|         self.assertQueueContains(self.got_items, expected_items)
 | |
|         self.assertQueueContains(
 | |
|             self.got_args_kwargs,
 | |
|             [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
 | |
|         self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
 | |
| 
 | |
|     def test_context_manager_with_client_exceptions(self):
 | |
|         with self.qfq as input_queue:
 | |
|             self.assertEqual(self.starting_thread_count + self.thread_count,
 | |
|                              threading.active_count())
 | |
|             for i in range(20):
 | |
|                 input_queue.put('item%d' % i if i % 2 == 0 else 'c boom')
 | |
| 
 | |
|         self.assertEqual(self.starting_thread_count, threading.active_count())
 | |
|         error_strs = list(map(str, self.thread_manager.error.call_args_list))
 | |
|         self.assertEqual(10, len(error_strs))
 | |
|         stringification = 'Client Boom: ' \
 | |
|             'http://192.168.22.1:80/booze 404 to much   no sir!'
 | |
|         self.assertTrue(all([stringification in s for s in error_strs]))
 | |
|         self.assertEqual(10, self.error_counter[0])
 | |
|         expected_items = set(['c boom'] +
 | |
|                              ['item%d' % i for i in range(20)
 | |
|                               if i % 2 == 0])
 | |
|         self.assertQueueContains(self.got_items, expected_items)
 | |
|         self.assertQueueContains(
 | |
|             self.got_args_kwargs,
 | |
|             [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
 | |
|         self.assertEqual(self.stored_results, ['best result EVAR!'] * 10)
 | |
| 
 | |
|     def test_context_manager_with_connection_maker(self):
 | |
|         with self.qfq as input_queue:
 | |
|             self.assertEqual(self.starting_thread_count + self.thread_count,
 | |
|                              threading.active_count())
 | |
|             for i in range(20):
 | |
|                 input_queue.put('item%d' % i)
 | |
| 
 | |
|         self.assertEqual(self.starting_thread_count, threading.active_count())
 | |
|         self.assertEqual([], self.thread_manager.error.call_args_list)
 | |
|         self.assertEqual(0, self.error_counter[0])
 | |
|         self.assertQueueContains(self.got_items,
 | |
|                                  set(['item%d' % i for i in range(20)]))
 | |
|         self.assertQueueContains(
 | |
|             self.got_args_kwargs,
 | |
|             [(('yup, I made a connection', '1arg', '2arg'), {'a': 'b'})] * 20)
 | |
|         self.assertEqual(self.stored_results, ['best result EVAR!'] * 20)
 | |
| 
 | |
| 
 | |
| class TestMultiThreadingManager(ThreadTestCase):
 | |
| 
 | |
|     @mock.patch('swiftclient.multithreading.QueueFunctionManager')
 | |
|     def test_instantiation(self, mock_qfq):
 | |
|         thread_manager = mt.MultiThreadingManager()
 | |
| 
 | |
|         self.assertEqual([
 | |
|             mock.call(thread_manager._print, 1, thread_manager),
 | |
|             mock.call(thread_manager._print_error, 1, thread_manager),
 | |
|         ], mock_qfq.call_args_list)
 | |
| 
 | |
|         # These contexts don't get entered into until the
 | |
|         # MultiThreadingManager's context is entered.
 | |
|         self.assertEqual([], thread_manager.printer.__enter__.call_args_list)
 | |
|         self.assertEqual([],
 | |
|                          thread_manager.error_printer.__enter__.call_args_list)
 | |
| 
 | |
|         # Test default values for the streams.
 | |
|         self.assertEqual(sys.stdout, thread_manager.print_stream)
 | |
|         self.assertEqual(sys.stderr, thread_manager.error_stream)
 | |
| 
 | |
|     @mock.patch('swiftclient.multithreading.QueueFunctionManager')
 | |
|     def test_queue_manager_no_args(self, mock_qfq):
 | |
|         thread_manager = mt.MultiThreadingManager()
 | |
| 
 | |
|         mock_qfq.reset_mock()
 | |
|         mock_qfq.return_value = 'slap happy!'
 | |
| 
 | |
|         self.assertEqual(
 | |
|             'slap happy!',
 | |
|             thread_manager.queue_manager(self._func, 88))
 | |
| 
 | |
|         self.assertEqual([
 | |
|             mock.call(self._func, 88, thread_manager, thread_args=(),
 | |
|                       thread_kwargs={}, connection_maker=None,
 | |
|                       error_counter=None)
 | |
|         ], mock_qfq.call_args_list)
 | |
| 
 | |
|     @mock.patch('swiftclient.multithreading.QueueFunctionManager')
 | |
|     def test_queue_manager_with_args(self, mock_qfq):
 | |
|         thread_manager = mt.MultiThreadingManager()
 | |
| 
 | |
|         mock_qfq.reset_mock()
 | |
|         mock_qfq.return_value = 'do run run'
 | |
| 
 | |
|         self.assertEqual(
 | |
|             'do run run',
 | |
|             thread_manager.queue_manager(self._func, 88, 'fun', times='are',
 | |
|                                          connection_maker='abc', to='be had',
 | |
|                                          error_counter='def'))
 | |
| 
 | |
|         self.assertEqual([
 | |
|             mock.call(self._func, 88, thread_manager, thread_args=('fun',),
 | |
|                       thread_kwargs={'times': 'are', 'to': 'be had'},
 | |
|                       connection_maker='abc', error_counter='def')
 | |
|         ], mock_qfq.call_args_list)
 | |
| 
 | |
|     def test_printers(self):
 | |
|         out_stream = six.StringIO()
 | |
|         err_stream = six.StringIO()
 | |
| 
 | |
|         with mt.MultiThreadingManager(
 | |
|                 print_stream=out_stream,
 | |
|                 error_stream=err_stream) as thread_manager:
 | |
| 
 | |
|             # Sanity-checking these gives power to the previous test which
 | |
|             # looked at the default values of thread_manager.print/error_stream
 | |
|             self.assertEqual(out_stream, thread_manager.print_stream)
 | |
|             self.assertEqual(err_stream, thread_manager.error_stream)
 | |
| 
 | |
|             self.assertEqual(self.starting_thread_count + 2,
 | |
|                              threading.active_count())
 | |
| 
 | |
|             thread_manager.print_msg('one-argument')
 | |
|             thread_manager.print_msg('one %s, %d fish', 'fish', 88)
 | |
|             thread_manager.error('I have %d problems, but a %s is not one',
 | |
|                                  99, u'\u062A\u062A')
 | |
|             thread_manager.print_msg('some\n%s\nover the %r', 'where',
 | |
|                                      u'\u062A\u062A')
 | |
|             thread_manager.error('one-error-argument')
 | |
|             thread_manager.error('Sometimes\n%.1f%% just\ndoes not\nwork!',
 | |
|                                  3.14159)
 | |
| 
 | |
|         self.assertEqual(self.starting_thread_count, threading.active_count())
 | |
| 
 | |
|         out_stream.seek(0)
 | |
|         if six.PY3:
 | |
|             over_the = "over the '\u062a\u062a'\n"
 | |
|         else:
 | |
|             over_the = "over the u'\\u062a\\u062a'\n"
 | |
|         self.assertEqual([
 | |
|             'one-argument\n',
 | |
|             'one fish, 88 fish\n',
 | |
|             'some\n', 'where\n', over_the,
 | |
|         ], list(out_stream.readlines()))
 | |
| 
 | |
|         err_stream.seek(0)
 | |
|         first_item = u'I have 99 problems, but a \u062A\u062A is not one\n'
 | |
|         if six.PY2:
 | |
|             first_item = first_item.encode('utf8')
 | |
|         self.assertEqual([
 | |
|             first_item,
 | |
|             'one-error-argument\n',
 | |
|             'Sometimes\n', '3.1% just\n', 'does not\n', 'work!\n',
 | |
|         ], list(err_stream.readlines()))
 | |
| 
 | |
|         self.assertEqual(3, thread_manager.error_count)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     testtools.main()
 |