statsd+metric: make retry code common
This makes sure the retry code to retry the connection to the coordinator is shared and work in both cases. Until now, it only worked for metricd. We don't care about indexer right now, as oslo.db is in charge of retrying. Change-Id: I9323e66d72e325c071788caaa90fb14ba93ade51
This commit is contained in:
parent
94d773b0d5
commit
a59c759a54
@ -1,5 +1,5 @@
|
||||
# Copyright (c) 2013 Mirantis Inc.
|
||||
# Copyright (c) 2015 Red Hat
|
||||
# Copyright (c) 2015-2016 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -25,7 +25,6 @@ import msgpack
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
import retrying
|
||||
import six
|
||||
import tooz
|
||||
from tooz import coordination
|
||||
@ -35,6 +34,7 @@ from gnocchi import indexer
|
||||
from gnocchi import service
|
||||
from gnocchi import statsd as statsd_service
|
||||
from gnocchi import storage
|
||||
from gnocchi import utils
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -76,14 +76,6 @@ def statsd():
|
||||
statsd_service.start()
|
||||
|
||||
|
||||
class Retry(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def retry_if_retry_is_raised(exception):
|
||||
return isinstance(exception, Retry)
|
||||
|
||||
|
||||
class MetricProcessBase(cotyledon.Service):
|
||||
def __init__(self, worker_id, conf, interval_delay=0):
|
||||
super(MetricProcessBase, self).__init__(worker_id)
|
||||
@ -93,22 +85,10 @@ class MetricProcessBase(cotyledon.Service):
|
||||
self._shutdown = threading.Event()
|
||||
self._shutdown_done = threading.Event()
|
||||
|
||||
# Retry with exponential backoff for up to 1 minute
|
||||
@retrying.retry(wait_exponential_multiplier=500,
|
||||
wait_exponential_max=60000,
|
||||
retry_on_exception=retry_if_retry_is_raised)
|
||||
def _configure(self):
|
||||
try:
|
||||
self.store = storage.get_driver(self.conf)
|
||||
except storage.StorageError as e:
|
||||
LOG.error("Unable to initialize storage: %s" % e)
|
||||
raise Retry(e)
|
||||
try:
|
||||
self.index = indexer.get_driver(self.conf)
|
||||
self.index.connect()
|
||||
except indexer.IndexerException as e:
|
||||
LOG.error("Unable to initialize indexer: %s" % e)
|
||||
raise Retry(e)
|
||||
self.store = storage.get_driver(self.conf)
|
||||
self.index = indexer.get_driver(self.conf)
|
||||
self.index.connect()
|
||||
|
||||
def run(self):
|
||||
self._configure()
|
||||
@ -200,10 +180,7 @@ class MetricScheduler(MetricProcessBase):
|
||||
self.block_index = 0
|
||||
self.block_size = self.block_size_default
|
||||
|
||||
# Retry with exponential backoff for up to 1 minute
|
||||
@retrying.retry(wait_exponential_multiplier=500,
|
||||
wait_exponential_max=60000,
|
||||
retry_on_exception=retry_if_retry_is_raised)
|
||||
@utils.retry
|
||||
def _configure(self):
|
||||
super(MetricScheduler, self)._configure()
|
||||
try:
|
||||
@ -233,7 +210,7 @@ class MetricScheduler(MetricProcessBase):
|
||||
create_group_req.get()
|
||||
except coordination.GroupAlreadyExist:
|
||||
pass
|
||||
raise Retry(e)
|
||||
raise utils.Retry(e)
|
||||
except tooz.NotImplemented:
|
||||
LOG.warning('Configured coordination driver does not support '
|
||||
'required functionality. Coordination is disabled.')
|
||||
|
@ -30,6 +30,8 @@ from tooz import coordination
|
||||
|
||||
from gnocchi import carbonara
|
||||
from gnocchi import storage
|
||||
from gnocchi import utils
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.IntOpt('aggregation_workers_number',
|
||||
@ -51,14 +53,19 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(CarbonaraBasedStorage, self).__init__(conf)
|
||||
self.coord = coordination.get_coordinator(
|
||||
conf.coordination_url,
|
||||
str(uuid.uuid4()).encode('ascii'))
|
||||
self.aggregation_workers_number = conf.aggregation_workers_number
|
||||
self.start()
|
||||
|
||||
@utils.retry
|
||||
def start(self):
|
||||
try:
|
||||
self.coord = coordination.get_coordinator(
|
||||
conf.coordination_url,
|
||||
str(uuid.uuid4()).encode('ascii'))
|
||||
self.coord.start(start_heart=True)
|
||||
except Exception as e:
|
||||
raise storage.StorageError("Unable to start coordinator: %s" % e)
|
||||
self.aggregation_workers_number = conf.aggregation_workers_number
|
||||
LOG.error("Unable to start coordinator: %s" % e)
|
||||
raise utils.Retry(e)
|
||||
|
||||
def stop(self):
|
||||
self.coord.stop()
|
||||
|
@ -1,6 +1,6 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2015 eNovance
|
||||
# Copyright © 2015-2016 eNovance
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -18,10 +18,10 @@ import datetime
|
||||
import iso8601
|
||||
from oslo_utils import timeutils
|
||||
from pytimeparse import timeparse
|
||||
import retrying
|
||||
import six
|
||||
import uuid
|
||||
|
||||
|
||||
# uuid5 namespace for id transformation.
|
||||
# NOTE(chdent): This UUID must stay the same, forever, across all
|
||||
# of gnocchi to preserve its value as a URN namespace.
|
||||
@ -50,6 +50,20 @@ def UUID(value):
|
||||
raise ValueError(e)
|
||||
|
||||
|
||||
class Retry(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def retry_if_retry_is_raised(exception):
|
||||
return isinstance(exception, Retry)
|
||||
|
||||
|
||||
# Retry with exponential backoff for up to 1 minute
|
||||
retry = retrying.retry(wait_exponential_multiplier=500,
|
||||
wait_exponential_max=60000,
|
||||
retry_on_exception=retry_if_retry_is_raised)
|
||||
|
||||
|
||||
def to_timestamp(v):
|
||||
if isinstance(v, datetime.datetime):
|
||||
return v
|
||||
|
Loading…
Reference in New Issue
Block a user