tests: block when acquiring processing lock
In tests, not blocking when processing the data in background might trigger a race condition since all the tests run in parallel on the same database. Change-Id: Id808f9e659f242fdaecd52cad08a6d00c96ac051
This commit is contained in:
parent
54b7b70f3a
commit
9309cdc8f6
|
@ -157,17 +157,31 @@ class StorageDriver(object):
|
||||||
def upgrade(index):
|
def upgrade(index):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def process_background_tasks(self, index):
|
def process_background_tasks(self, index, sync=False):
|
||||||
|
"""Process background tasks for this storage.
|
||||||
|
|
||||||
|
This calls :func:`process_measures` to process new measures and
|
||||||
|
:func:`expunge_metrics` to expunge deleted metrics.
|
||||||
|
|
||||||
|
:param index: An indexer to be used for querying metrics
|
||||||
|
:param sync: If True, then process everything synchronously and raise
|
||||||
|
on error
|
||||||
|
:type sync: bool
|
||||||
|
"""
|
||||||
LOG.debug("Processing new and to delete measures")
|
LOG.debug("Processing new and to delete measures")
|
||||||
try:
|
try:
|
||||||
self.process_measures(index)
|
self.process_measures(index, sync)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
if sync:
|
||||||
|
raise
|
||||||
LOG.error("Unexpected error during measures processing",
|
LOG.error("Unexpected error during measures processing",
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
LOG.debug("Expunging deleted metrics")
|
LOG.debug("Expunging deleted metrics")
|
||||||
try:
|
try:
|
||||||
self.expunge_metrics(index)
|
self.expunge_metrics(index)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
if sync:
|
||||||
|
raise
|
||||||
LOG.error("Unexpected error during deleting metrics",
|
LOG.error("Unexpected error during deleting metrics",
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
|
|
||||||
|
@ -197,7 +211,7 @@ class StorageDriver(object):
|
||||||
raise exceptions.NotImplementedError
|
raise exceptions.NotImplementedError
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def process_measures(indexer=None):
|
def process_measures(indexer=None, sync=False):
|
||||||
"""Process added measures in background.
|
"""Process added measures in background.
|
||||||
|
|
||||||
Some drivers might need to have a background task running that process
|
Some drivers might need to have a background task running that process
|
||||||
|
|
|
@ -162,7 +162,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||||
(metric_id, self._pending_measures_to_process_count(metric_id))
|
(metric_id, self._pending_measures_to_process_count(metric_id))
|
||||||
for metric_id in metrics_to_process)
|
for metric_id in metrics_to_process)
|
||||||
|
|
||||||
def process_measures(self, indexer):
|
def process_measures(self, indexer, sync=False):
|
||||||
metrics_to_process = self._list_metric_with_measures_to_process()
|
metrics_to_process = self._list_metric_with_measures_to_process()
|
||||||
metrics = indexer.get_metrics(metrics_to_process)
|
metrics = indexer.get_metrics(metrics_to_process)
|
||||||
# This build the list of deleted metrics, i.e. the metrics we have
|
# This build the list of deleted metrics, i.e. the metrics we have
|
||||||
|
@ -177,7 +177,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||||
# Do not block if we cannot acquire the lock, that means some other
|
# Do not block if we cannot acquire the lock, that means some other
|
||||||
# worker is doing the job. We'll just ignore this metric and may
|
# worker is doing the job. We'll just ignore this metric and may
|
||||||
# get back later to it if needed.
|
# get back later to it if needed.
|
||||||
if lock.acquire(blocking=False):
|
if lock.acquire(blocking=sync):
|
||||||
try:
|
try:
|
||||||
LOG.debug("Processing measures for %s" % metric)
|
LOG.debug("Processing measures for %s" % metric)
|
||||||
with self._process_measure_for_metric(metric) as measures:
|
with self._process_measure_for_metric(metric) as measures:
|
||||||
|
@ -234,6 +234,8 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||||
self._store_unaggregated_timeserie(metric,
|
self._store_unaggregated_timeserie(metric,
|
||||||
ts.serialize())
|
ts.serialize())
|
||||||
except Exception:
|
except Exception:
|
||||||
|
if sync:
|
||||||
|
raise
|
||||||
LOG.error("Error processing new measures", exc_info=True)
|
LOG.error("Error processing new measures", exc_info=True)
|
||||||
finally:
|
finally:
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
|
@ -60,7 +60,7 @@ class TestAggregates(tests_base.TestCase):
|
||||||
|
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [metric]
|
f.return_value = [metric]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
return metric
|
return metric
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,7 @@ class TestingApp(webtest.TestApp):
|
||||||
if self.auth:
|
if self.auth:
|
||||||
req.headers['X-Auth-Token'] = self.token
|
req.headers['X-Auth-Token'] = self.token
|
||||||
response = super(TestingApp, self).do_request(req, *args, **kwargs)
|
response = super(TestingApp, self).do_request(req, *args, **kwargs)
|
||||||
self.storage.process_background_tasks(self.indexer)
|
self.storage.process_background_tasks(self.indexer, True)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,7 @@ class TestStatsd(tests_base.TestCase):
|
||||||
|
|
||||||
metric = r.get_metric(metric_key)
|
metric = r.get_metric(metric_key)
|
||||||
|
|
||||||
self.stats.storage.process_background_tasks(self.stats.indexer)
|
self.stats.storage.process_background_tasks(self.stats.indexer, True)
|
||||||
|
|
||||||
measures = self.stats.storage.get_measures(metric)
|
measures = self.stats.storage.get_measures(metric)
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
|
@ -86,7 +86,7 @@ class TestStatsd(tests_base.TestCase):
|
||||||
("127.0.0.1", 12345))
|
("127.0.0.1", 12345))
|
||||||
self.stats.flush()
|
self.stats.flush()
|
||||||
|
|
||||||
self.stats.storage.process_background_tasks(self.stats.indexer)
|
self.stats.storage.process_background_tasks(self.stats.indexer, True)
|
||||||
|
|
||||||
measures = self.stats.storage.get_measures(metric)
|
measures = self.stats.storage.get_measures(metric)
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
|
@ -117,7 +117,7 @@ class TestStatsd(tests_base.TestCase):
|
||||||
with_metrics=True)
|
with_metrics=True)
|
||||||
metric = r.get_metric(metric_key)
|
metric = r.get_metric(metric_key)
|
||||||
|
|
||||||
self.stats.storage.process_background_tasks(self.stats.indexer)
|
self.stats.storage.process_background_tasks(self.stats.indexer, True)
|
||||||
|
|
||||||
measures = self.stats.storage.get_measures(metric)
|
measures = self.stats.storage.get_measures(metric)
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
|
@ -134,7 +134,7 @@ class TestStatsd(tests_base.TestCase):
|
||||||
("127.0.0.1", 12345))
|
("127.0.0.1", 12345))
|
||||||
self.stats.flush()
|
self.stats.flush()
|
||||||
|
|
||||||
self.stats.storage.process_background_tasks(self.stats.indexer)
|
self.stats.storage.process_background_tasks(self.stats.indexer, True)
|
||||||
|
|
||||||
measures = self.stats.storage.get_measures(metric)
|
measures = self.stats.storage.get_measures(metric)
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
|
|
|
@ -49,7 +49,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
])
|
])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [self.metric]
|
f.return_value = [self.metric]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
self.storage.add_measures(self.metric, [
|
self.storage.add_measures(self.metric, [
|
||||||
storage.Measure(datetime.datetime(2014, 1, 1, 13, 0, 1), 1),
|
storage.Measure(datetime.datetime(2014, 1, 1, 13, 0, 1), 1),
|
||||||
|
@ -60,7 +60,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
side_effect=ValueError("boom!")):
|
side_effect=ValueError("boom!")):
|
||||||
with mock.patch('gnocchi.carbonara.msgpack.loads',
|
with mock.patch('gnocchi.carbonara.msgpack.loads',
|
||||||
side_effect=ValueError("boom!")):
|
side_effect=ValueError("boom!")):
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
expected_calls = [
|
expected_calls = [
|
||||||
mock.call.debug('Processing measures for %s' % self.metric.id),
|
mock.call.debug('Processing measures for %s' % self.metric.id),
|
||||||
|
@ -86,7 +86,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
])
|
])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [self.metric]
|
f.return_value = [self.metric]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
self.storage.delete_metric(self.metric)
|
self.storage.delete_metric(self.metric)
|
||||||
|
|
||||||
def test_delete_nonempty_metric_unprocessed(self):
|
def test_delete_nonempty_metric_unprocessed(self):
|
||||||
|
@ -129,7 +129,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)])
|
for i in six.moves.range(0, 60) for j in six.moves.range(0, 60)])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [m]
|
f.return_value = [m]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
self.assertEqual(3661, len(self.storage.get_measures(m)))
|
self.assertEqual(3661, len(self.storage.get_measures(m)))
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
])
|
])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [self.metric]
|
f.return_value = [self.metric]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
(utils.datetime_utc(2014, 1, 1), 86400.0, 39.75),
|
(utils.datetime_utc(2014, 1, 1), 86400.0, 39.75),
|
||||||
|
@ -277,7 +277,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
])
|
])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [self.metric, metric2]
|
f.return_value = [self.metric, metric2]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
values = self.storage.get_cross_metric_measures([self.metric, metric2])
|
values = self.storage.get_cross_metric_measures([self.metric, metric2])
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
|
@ -337,7 +337,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
])
|
])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [self.metric, metric2]
|
f.return_value = [self.metric, metric2]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
values = self.storage.get_cross_metric_measures([self.metric, metric2])
|
values = self.storage.get_cross_metric_measures([self.metric, metric2])
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
|
@ -367,7 +367,7 @@ class TestStorageDriver(tests_base.TestCase):
|
||||||
])
|
])
|
||||||
with mock.patch.object(self.index, 'get_metrics') as f:
|
with mock.patch.object(self.index, 'get_metrics') as f:
|
||||||
f.return_value = [self.metric, metric2]
|
f.return_value = [self.metric, metric2]
|
||||||
self.storage.process_background_tasks(self.index)
|
self.storage.process_background_tasks(self.index, True)
|
||||||
|
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
{metric2: [],
|
{metric2: [],
|
||||||
|
|
Loading…
Reference in New Issue