From fb233f206051e10d619075f6a3f78364975a20f9 Mon Sep 17 00:00:00 2001 From: Greg Lange Date: Wed, 13 Apr 2011 14:52:25 +0000 Subject: [PATCH 1/4] made stats processing handle generic exception in collate_worker() --- swift/stats/log_processor.py | 13 +++++++++---- test/unit/stats/test_log_processor.py | 14 ++++++++------ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index f2cfd220ce..6ac8e0501e 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -468,7 +468,7 @@ class LogProcessorDaemon(Daemon): # map processor_args = (self.total_conf, self.logger) results = multiprocess_collate(processor_args, logs_to_process, - self.worker_count) + self.worker_count, self.logger) # reduce aggr_data = self.get_aggregate_data(processed_files, results) @@ -527,7 +527,8 @@ class LogProcessorDaemon(Daemon): ((time.time() - start) / 60)) -def multiprocess_collate(processor_args, logs_to_process, worker_count): +def multiprocess_collate(processor_args, logs_to_process, worker_count, + logger): ''' yield hourly data from logs_to_process Every item that this function yields will be added to the processed files @@ -553,7 +554,11 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count): except Queue.Empty: time.sleep(.01) else: - if not isinstance(data, BadFileDownload): + if isinstance(data, Exception): + item_string = '/'.join(item[2:]) + logger.exception("Problem processing file '%s'" % + (item_string)) + else: yield item, data if not any(r.is_alive() for r in results) and out_queue.empty(): # all the workers are done and nothing is in the queue @@ -570,6 +575,6 @@ def collate_worker(processor_args, in_queue, out_queue): break try: ret = p.process_one_file(*item) - except BadFileDownload, err: + except Exception, err: ret = err out_queue.put((item, ret)) diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index fa90ec5825..8a325310fb 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -342,7 +342,7 @@ use = egg:swift#proxy def test_collate_worker_error(self): def get_object_data(*a,**kw): - raise log_processor.BadFileDownload() + raise Exception() orig_get_object_data = log_processor.LogProcessor.get_object_data try: log_processor.LogProcessor.get_object_data = get_object_data @@ -364,8 +364,7 @@ use = egg:swift#proxy self.assertEquals(item, work_request) # these only work for Py2.7+ #self.assertIsInstance(ret, log_processor.BadFileDownload) - self.assertTrue(isinstance(ret, log_processor.BadFileDownload), - type(ret)) + self.assertTrue(isinstance(ret, Exception), type(ret)) finally: log_processor.LogProcessor.get_object_data = orig_get_object_data @@ -388,7 +387,8 @@ use = egg:swift#proxy logs_to_process = [item] results = log_processor.multiprocess_collate(processor_args, logs_to_process, - 1) + 1, + DumbLogger()) results = list(results) expected = [(item, {('acct', '2010', '07', '09', '04'): {('public', 'object', 'GET', '2xx'): 1, @@ -422,7 +422,8 @@ use = egg:swift#proxy logs_to_process = [item] results = log_processor.multiprocess_collate(processor_args, logs_to_process, - 1) + 1, + DumbLogger()) results = list(results) expected = [] self.assertEquals(results, expected) @@ -762,12 +763,13 @@ class TestLogProcessorDaemon(unittest.TestCase): d = MockLogProcessorDaemon(self) def mock_multiprocess_collate(processor_args, logs_to_process, - worker_count): + worker_count, logger): self.assertEquals(d.total_conf, processor_args[0]) self.assertEquals(d.logger, processor_args[1]) self.assertEquals(mock_logs_to_process, logs_to_process) self.assertEquals(d.worker_count, worker_count) + self.assertEquals(d.logger, logger) return multiprocess_collate_return From 243fb2a49fc2307bfd55d1b4d8513bc737a1cb09 Mon Sep 17 00:00:00 2001 From: Greg Lange Date: Wed, 13 Apr 2011 15:48:43 +0000 Subject: [PATCH 2/4] needed to push logging to where exception is caught --- swift/stats/log_processor.py | 13 +++++-------- test/unit/stats/test_log_processor.py | 9 +++------ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 6ac8e0501e..f8ac20ac39 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -468,7 +468,7 @@ class LogProcessorDaemon(Daemon): # map processor_args = (self.total_conf, self.logger) results = multiprocess_collate(processor_args, logs_to_process, - self.worker_count, self.logger) + self.worker_count) # reduce aggr_data = self.get_aggregate_data(processed_files, results) @@ -527,8 +527,7 @@ class LogProcessorDaemon(Daemon): ((time.time() - start) / 60)) -def multiprocess_collate(processor_args, logs_to_process, worker_count, - logger): +def multiprocess_collate(processor_args, logs_to_process, worker_count): ''' yield hourly data from logs_to_process Every item that this function yields will be added to the processed files @@ -554,11 +553,7 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count, except Queue.Empty: time.sleep(.01) else: - if isinstance(data, Exception): - item_string = '/'.join(item[2:]) - logger.exception("Problem processing file '%s'" % - (item_string)) - else: + if not isinstance(data, Exception): yield item, data if not any(r.is_alive() for r in results) and out_queue.empty(): # all the workers are done and nothing is in the queue @@ -576,5 +571,7 @@ def collate_worker(processor_args, in_queue, out_queue): try: ret = p.process_one_file(*item) except Exception, err: + item_string = '/'.join(item[2:]) + p.logger.exception("Unable to process file '%s'" % (item_string)) ret = err out_queue.put((item, ret)) diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index 8a325310fb..1dfa4424b2 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -387,8 +387,7 @@ use = egg:swift#proxy logs_to_process = [item] results = log_processor.multiprocess_collate(processor_args, logs_to_process, - 1, - DumbLogger()) + 1) results = list(results) expected = [(item, {('acct', '2010', '07', '09', '04'): {('public', 'object', 'GET', '2xx'): 1, @@ -422,8 +421,7 @@ use = egg:swift#proxy logs_to_process = [item] results = log_processor.multiprocess_collate(processor_args, logs_to_process, - 1, - DumbLogger()) + 1) results = list(results) expected = [] self.assertEquals(results, expected) @@ -763,13 +761,12 @@ class TestLogProcessorDaemon(unittest.TestCase): d = MockLogProcessorDaemon(self) def mock_multiprocess_collate(processor_args, logs_to_process, - worker_count, logger): + worker_count): self.assertEquals(d.total_conf, processor_args[0]) self.assertEquals(d.logger, processor_args[1]) self.assertEquals(mock_logs_to_process, logs_to_process) self.assertEquals(d.worker_count, worker_count) - self.assertEquals(d.logger, logger) return multiprocess_collate_return From 5e89b53ef38b86f354c163295a10c6fd80b5d782 Mon Sep 17 00:00:00 2001 From: Greg Lange Date: Wed, 13 Apr 2011 16:19:30 +0000 Subject: [PATCH 3/4] fixed a unit test assert statement --- test/unit/stats/test_log_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index 1dfa4424b2..c1b3b68b19 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -364,7 +364,7 @@ use = egg:swift#proxy self.assertEquals(item, work_request) # these only work for Py2.7+ #self.assertIsInstance(ret, log_processor.BadFileDownload) - self.assertTrue(isinstance(ret, Exception), type(ret)) + self.assertTrue(isinstance(ret, Exception)) finally: log_processor.LogProcessor.get_object_data = orig_get_object_data From 68e1370abf338c81f5f6cb7753606a303d42bfe2 Mon Sep 17 00:00:00 2001 From: Greg Lange Date: Mon, 18 Apr 2011 15:27:43 +0000 Subject: [PATCH 4/4] added account to log message --- swift/stats/log_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index f8ac20ac39..691a560efe 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -571,7 +571,7 @@ def collate_worker(processor_args, in_queue, out_queue): try: ret = p.process_one_file(*item) except Exception, err: - item_string = '/'.join(item[2:]) + item_string = '/'.join(item[1:]) p.logger.exception("Unable to process file '%s'" % (item_string)) ret = err out_queue.put((item, ret))