From 2981b3f00902433395fe3ec6436cc2a2d6c4de56 Mon Sep 17 00:00:00 2001 From: gord chung Date: Tue, 4 Apr 2017 14:36:04 +0000 Subject: [PATCH] drop scheduler process as we move to multiple buckets, we will let each worker figure out it's whether a bucket needs to be processed and thus we don't need a central scheduler to distribute work around. for now, all workers will go ham and work on same bucket. Change-Id: I659f93545961b2943902eaefe7c7bd622abdc42f --- gnocchi/cli.py | 86 ++++-------------------------------------------- requirements.txt | 1 - setup.cfg | 1 - 3 files changed, 6 insertions(+), 82 deletions(-) diff --git a/gnocchi/cli.py b/gnocchi/cli.py index 56facb39..0a4a7dba 100644 --- a/gnocchi/cli.py +++ b/gnocchi/cli.py @@ -13,22 +13,16 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing import sys import threading import time import cotyledon from cotyledon import oslo_config_glue -from futurist import periodics -import msgpack from oslo_config import cfg from oslo_log import log from oslo_utils import timeutils import six -import tenacity -import tooz -from tooz import coordination from gnocchi import archive_policy from gnocchi import genconfig @@ -143,70 +137,26 @@ class MetricReporting(MetricProcessBase): exc_info=True) -class MetricScheduler(MetricProcessBase): - name = "scheduler" - GROUP_ID = "gnocchi-scheduler" - SYNC_RATE = 30 - BLOCK_SIZE = 4 +class MetricProcessor(MetricProcessBase): + name = "processing" - def __init__(self, worker_id, conf, queue): - super(MetricScheduler, self).__init__( + def __init__(self, worker_id, conf): + super(MetricProcessor, self).__init__( worker_id, conf, conf.metricd.metric_processing_delay) self._coord, self._my_id = utils.get_coordinator_and_start( conf.storage.coordination_url) - self.queue = queue - self.workers = conf.metricd.workers - self.periodic = None - - @utils.retry - def _configure(self): - super(MetricScheduler, self)._configure() - try: - cap = msgpack.dumps({'workers': self.workers}) - join_req = self._coord.join_group(self.GROUP_ID, cap) - join_req.get() - LOG.info('Joined coordination group: %s', self.GROUP_ID) - - @periodics.periodic(spacing=self.SYNC_RATE, run_immediately=True) - def run_watchers(): - self._coord.run_watchers() - - self.periodic = periodics.PeriodicWorker.create([]) - self.periodic.add(run_watchers) - t = threading.Thread(target=self.periodic.start) - t.daemon = True - t.start() - - except coordination.GroupNotCreated as e: - create_group_req = self._coord.create_group(self.GROUP_ID) - try: - create_group_req.get() - except coordination.GroupAlreadyExist: - pass - raise tenacity.TryAgain(e) - except tooz.NotImplemented: - LOG.warning('Configured coordination driver does not support ' - 'required functionality. Coordination is disabled.') - except Exception as e: - LOG.error('Failed to configure coordination. Coordination is ' - 'disabled: %s', e) def _run_job(self): try: metrics = list( self.store.incoming.list_metric_with_measures_to_process()) - for i in six.moves.range(0, len(metrics), self.BLOCK_SIZE): - self.queue.put(metrics[i:i + self.BLOCK_SIZE]) LOG.debug("%d metrics scheduled for processing.", len(metrics)) + self.store.process_background_tasks(self.index, metrics) except Exception: LOG.error("Unexpected error scheduling metrics for processing", exc_info=True) def close_services(self): - if self.periodic: - self.periodic.stop() - self.periodic.wait() - self._coord.leave_group(self.GROUP_ID) self._coord.stop() @@ -225,38 +175,14 @@ class MetricJanitor(MetricProcessBase): LOG.error("Unexpected error during metric cleanup", exc_info=True) -class MetricProcessor(MetricProcessBase): - name = "processing" - - def __init__(self, worker_id, conf, queue): - super(MetricProcessor, self).__init__(worker_id, conf, 0) - self.queue = queue - - def _run_job(self): - try: - try: - metrics = self.queue.get(block=True, timeout=10) - except six.moves.queue.Empty: - # NOTE(sileht): Allow the process to exit gracefully every - # 10 seconds - return - self.store.process_background_tasks(self.index, metrics) - except Exception: - LOG.error("Unexpected error during measures processing", - exc_info=True) - - class MetricdServiceManager(cotyledon.ServiceManager): def __init__(self, conf): super(MetricdServiceManager, self).__init__() oslo_config_glue.setup(self, conf) self.conf = conf - self.queue = multiprocessing.Manager().Queue() - - self.add(MetricScheduler, args=(self.conf, self.queue)) self.metric_processor_id = self.add( - MetricProcessor, args=(self.conf, self.queue), + MetricProcessor, args=(self.conf,), workers=conf.metricd.workers) if self.conf.metricd.metric_reporting_delay >= 0: self.add(MetricReporting, args=(self.conf,)) diff --git a/requirements.txt b/requirements.txt index f6f69b98..dae4c542 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,4 +23,3 @@ tenacity>=3.1.0 # Apache-2.0 WebOb>=1.4.1 Paste PasteDeploy -msgpack-python diff --git a/setup.cfg b/setup.cfg index 45dbbeb4..0be52afb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,7 +41,6 @@ s3 = tooz>=1.38 redis = redis>=2.10.0 # MIT - msgpack-python lz4>=0.9.0 tooz>=1.38 swift =