hide lock error on delete_metric

tooz non-blocking lock will raise an error if done through
contextmanager. delete_metric uses this functionality. we shouldn't
show an error log in this case since it likely means another process
is handling it. this raises an non-critical error to be caught so
worker skips processing, either another process handles it, or we
will come back to it.

also, metric should only be removed from indexer if storage completes
succesfully.

Change-Id: I3ab657072706c648cb66eee83a58b69feb919b93
This commit is contained in:
gord chung 2017-04-09 13:46:12 -04:00
parent 717d6c76ec
commit b6188b69c9
2 changed files with 19 additions and 8 deletions

View File

@ -129,6 +129,14 @@ class MetricUnaggregatable(StorageError):
% (", ".join((str(m.id) for m in metrics)), reason))
class LockedMetric(StorageError):
"""Error raised when this metric is already being handled by another."""
def __init__(self, metric):
self.metric = metric
super(LockedMetric, self).__init__("Metric %s is locked" % metric)
def get_driver_class(namespace, conf):
"""Return the storage driver class.
@ -190,18 +198,16 @@ class StorageDriver(object):
for m in metrics_to_expunge:
try:
self.delete_metric(m, sync)
index.expunge_metric(m.id)
except (indexer.NoSuchMetric, LockedMetric):
# It's possible another process deleted or is deleting the
# metric, not a big deal
pass
except Exception:
if sync:
raise
LOG.error("Unable to expunge metric %s from storage", m,
exc_info=True)
continue
try:
index.expunge_metric(m.id)
except indexer.NoSuchMetric:
# It's possible another process deleted the metric in the mean
# time, not a big deal
pass
@staticmethod
def process_new_measures(indexer, metrics, sync=False):

View File

@ -339,9 +339,14 @@ class CarbonaraBasedStorage(storage.StorageDriver):
def delete_metric(self, metric, sync=False):
LOG.debug("Deleting metric %s", metric)
with self._lock(metric.id)(blocking=sync):
lock = self._lock(metric.id)
if not lock.acquire(blocking=sync):
raise storage.LockedMetric(metric)
try:
self._delete_metric(metric)
self.incoming.delete_unprocessed_measures_for_metric_id(metric.id)
finally:
lock.release()
@staticmethod
def _delete_metric_measures(metric, timestamp_key,