From 9309cdc8f64bb0c3ea14a27490c0e81770206c8c Mon Sep 17 00:00:00 2001 From: Julien Danjou Date: Tue, 24 Nov 2015 12:07:01 +0100 Subject: [PATCH] tests: block when acquiring processing lock In tests, not blocking when processing the data in background might trigger a race condition since all the tests run in parallel on the same database. Change-Id: Id808f9e659f242fdaecd52cad08a6d00c96ac051 --- gnocchi/storage/__init__.py | 20 +++++++++++++++++--- gnocchi/storage/_carbonara.py | 6 ++++-- gnocchi/tests/test_aggregates.py | 2 +- gnocchi/tests/test_rest.py | 2 +- gnocchi/tests/test_statsd.py | 8 ++++---- gnocchi/tests/test_storage.py | 16 ++++++++-------- 6 files changed, 35 insertions(+), 19 deletions(-) diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index f41889e8a..759b939b7 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -157,17 +157,31 @@ class StorageDriver(object): def upgrade(index): pass - def process_background_tasks(self, index): + def process_background_tasks(self, index, sync=False): + """Process background tasks for this storage. + + This calls :func:`process_measures` to process new measures and + :func:`expunge_metrics` to expunge deleted metrics. + + :param index: An indexer to be used for querying metrics + :param sync: If True, then process everything synchronously and raise + on error + :type sync: bool + """ LOG.debug("Processing new and to delete measures") try: - self.process_measures(index) + self.process_measures(index, sync) except Exception: + if sync: + raise LOG.error("Unexpected error during measures processing", exc_info=True) LOG.debug("Expunging deleted metrics") try: self.expunge_metrics(index) except Exception: + if sync: + raise LOG.error("Unexpected error during deleting metrics", exc_info=True) @@ -197,7 +211,7 @@ class StorageDriver(object): raise exceptions.NotImplementedError @staticmethod - def process_measures(indexer=None): + def process_measures(indexer=None, sync=False): """Process added measures in background. Some drivers might need to have a background task running that process diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index 5c256b848..c54a3e20a 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -162,7 +162,7 @@ class CarbonaraBasedStorage(storage.StorageDriver): (metric_id, self._pending_measures_to_process_count(metric_id)) for metric_id in metrics_to_process) - def process_measures(self, indexer): + def process_measures(self, indexer, sync=False): metrics_to_process = self._list_metric_with_measures_to_process() metrics = indexer.get_metrics(metrics_to_process) # This build the list of deleted metrics, i.e. the metrics we have @@ -177,7 +177,7 @@ class CarbonaraBasedStorage(storage.StorageDriver): # Do not block if we cannot acquire the lock, that means some other # worker is doing the job. We'll just ignore this metric and may # get back later to it if needed. - if lock.acquire(blocking=False): + if lock.acquire(blocking=sync): try: LOG.debug("Processing measures for %s" % metric) with self._process_measure_for_metric(metric) as measures: @@ -234,6 +234,8 @@ class CarbonaraBasedStorage(storage.StorageDriver): self._store_unaggregated_timeserie(metric, ts.serialize()) except Exception: + if sync: + raise LOG.error("Error processing new measures", exc_info=True) finally: lock.release() diff --git a/gnocchi/tests/test_aggregates.py b/gnocchi/tests/test_aggregates.py index c9bf266cd..74da82f1b 100644 --- a/gnocchi/tests/test_aggregates.py +++ b/gnocchi/tests/test_aggregates.py @@ -60,7 +60,7 @@ class TestAggregates(tests_base.TestCase): with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [metric] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) return metric diff --git a/gnocchi/tests/test_rest.py b/gnocchi/tests/test_rest.py index 26a892ee1..1a87888b7 100644 --- a/gnocchi/tests/test_rest.py +++ b/gnocchi/tests/test_rest.py @@ -149,7 +149,7 @@ class TestingApp(webtest.TestApp): if self.auth: req.headers['X-Auth-Token'] = self.token response = super(TestingApp, self).do_request(req, *args, **kwargs) - self.storage.process_background_tasks(self.indexer) + self.storage.process_background_tasks(self.indexer, True) return response diff --git a/gnocchi/tests/test_statsd.py b/gnocchi/tests/test_statsd.py index d1c084516..a4073b5bc 100644 --- a/gnocchi/tests/test_statsd.py +++ b/gnocchi/tests/test_statsd.py @@ -67,7 +67,7 @@ class TestStatsd(tests_base.TestCase): metric = r.get_metric(metric_key) - self.stats.storage.process_background_tasks(self.stats.indexer) + self.stats.storage.process_background_tasks(self.stats.indexer, True) measures = self.stats.storage.get_measures(metric) self.assertEqual([ @@ -86,7 +86,7 @@ class TestStatsd(tests_base.TestCase): ("127.0.0.1", 12345)) self.stats.flush() - self.stats.storage.process_background_tasks(self.stats.indexer) + self.stats.storage.process_background_tasks(self.stats.indexer, True) measures = self.stats.storage.get_measures(metric) self.assertEqual([ @@ -117,7 +117,7 @@ class TestStatsd(tests_base.TestCase): with_metrics=True) metric = r.get_metric(metric_key) - self.stats.storage.process_background_tasks(self.stats.indexer) + self.stats.storage.process_background_tasks(self.stats.indexer, True) measures = self.stats.storage.get_measures(metric) self.assertEqual([ @@ -134,7 +134,7 @@ class TestStatsd(tests_base.TestCase): ("127.0.0.1", 12345)) self.stats.flush() - self.stats.storage.process_background_tasks(self.stats.indexer) + self.stats.storage.process_background_tasks(self.stats.indexer, True) measures = self.stats.storage.get_measures(metric) self.assertEqual([ diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py index baf09a82a..37c8887d8 100644 --- a/gnocchi/tests/test_storage.py +++ b/gnocchi/tests/test_storage.py @@ -49,7 +49,7 @@ class TestStorageDriver(tests_base.TestCase): ]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [self.metric] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) self.storage.add_measures(self.metric, [ storage.Measure(datetime.datetime(2014, 1, 1, 13, 0, 1), 1), @@ -60,7 +60,7 @@ class TestStorageDriver(tests_base.TestCase): side_effect=ValueError("boom!")): with mock.patch('gnocchi.carbonara.msgpack.loads', side_effect=ValueError("boom!")): - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) expected_calls = [ mock.call.debug('Processing measures for %s' % self.metric.id), @@ -86,7 +86,7 @@ class TestStorageDriver(tests_base.TestCase): ]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [self.metric] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) self.storage.delete_metric(self.metric) def test_delete_nonempty_metric_unprocessed(self): @@ -129,7 +129,7 @@ class TestStorageDriver(tests_base.TestCase): for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [m] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) self.assertEqual(3661, len(self.storage.get_measures(m))) @@ -142,7 +142,7 @@ class TestStorageDriver(tests_base.TestCase): ]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [self.metric] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) self.assertEqual([ (utils.datetime_utc(2014, 1, 1), 86400.0, 39.75), @@ -277,7 +277,7 @@ class TestStorageDriver(tests_base.TestCase): ]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [self.metric, metric2] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) values = self.storage.get_cross_metric_measures([self.metric, metric2]) self.assertEqual([ @@ -337,7 +337,7 @@ class TestStorageDriver(tests_base.TestCase): ]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [self.metric, metric2] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) values = self.storage.get_cross_metric_measures([self.metric, metric2]) self.assertEqual([ @@ -367,7 +367,7 @@ class TestStorageDriver(tests_base.TestCase): ]) with mock.patch.object(self.index, 'get_metrics') as f: f.return_value = [self.metric, metric2] - self.storage.process_background_tasks(self.index) + self.storage.process_background_tasks(self.index, True) self.assertEqual( {metric2: [],