push incoming into different sacks
having everything in one giant folder/bucket/container/object is bad because: - does not allow for good distribution of backend driver. - makes it hella hard to cleanly split work across multiple metricd - starves metrics from being processed. we used "sacks" to avoid naming same as driver paradigms. driver implementation will be done individually so this does nothing but pretend to lock multiple buckets when it's actually just one. Related-Bug: #1629420 Related-Bug: #1623263 Related-Bug: #1620674 Change-Id: Icc32d918fe55416385122470c47d60ddbb30dd34
This commit is contained in:
parent
b18e1d4d28
commit
9889bd04f5
@ -146,15 +146,31 @@ class MetricProcessor(MetricProcessBase):
|
||||
self._coord, self._my_id = utils.get_coordinator_and_start(
|
||||
conf.storage.coordination_url)
|
||||
|
||||
def _sack_lock(self, sack):
|
||||
lock_name = b'gnocchi-sack-%s-lock' % str(sack).encode('ascii')
|
||||
return self._coord.get_lock(lock_name)
|
||||
|
||||
def _run_job(self):
|
||||
try:
|
||||
metrics = list(
|
||||
self.store.incoming.list_metric_with_measures_to_process())
|
||||
LOG.debug("%d metrics scheduled for processing.", len(metrics))
|
||||
self.store.process_background_tasks(self.index, metrics)
|
||||
except Exception:
|
||||
LOG.error("Unexpected error scheduling metrics for processing",
|
||||
exc_info=True)
|
||||
m_count = 0
|
||||
s_count = 0
|
||||
in_store = self.store.incoming
|
||||
for s in six.moves.range(in_store.NUM_SACKS):
|
||||
# TODO(gordc): support delay release lock so we don't
|
||||
# process a sack right after another process
|
||||
lock = self._sack_lock(s)
|
||||
if not lock.acquire(blocking=False):
|
||||
continue
|
||||
try:
|
||||
metrics = in_store.list_metric_with_measures_to_process(s)
|
||||
m_count = len(metrics)
|
||||
self.store.process_background_tasks(self.index, metrics)
|
||||
s_count += 1
|
||||
except Exception:
|
||||
LOG.error("Unexpected error processing assigned job",
|
||||
exc_info=True)
|
||||
finally:
|
||||
lock.release()
|
||||
LOG.debug("%d metrics processed from %d sacks", m_count, s_count)
|
||||
|
||||
def close_services(self):
|
||||
self._coord.stop()
|
||||
@ -211,9 +227,13 @@ def metricd_tester(conf):
|
||||
index = indexer.get_driver(conf)
|
||||
index.connect()
|
||||
s = storage.get_driver(conf)
|
||||
metrics = s.incoming.list_metric_with_measures_to_process()[
|
||||
:conf.stop_after_processing_metrics]
|
||||
s.process_new_measures(index, metrics, True)
|
||||
metrics = set()
|
||||
for i in six.moves.range(s.incoming.NUM_SACKS):
|
||||
metrics.update(s.incoming.list_metric_with_measures_to_process(i))
|
||||
if len(metrics) >= conf.stop_after_processing_metrics:
|
||||
break
|
||||
s.process_new_measures(
|
||||
index, list(metrics)[:conf.stop_after_processing_metrics], True)
|
||||
|
||||
|
||||
def metricd():
|
||||
|
@ -60,5 +60,5 @@ class StorageDriver(object):
|
||||
raise exceptions.NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def list_metric_with_measures_to_process():
|
||||
def list_metric_with_measures_to_process(sack):
|
||||
raise NotImplementedError
|
||||
|
@ -32,9 +32,12 @@ _NUM_WORKERS = utils.get_default_workers()
|
||||
|
||||
class CarbonaraBasedStorage(incoming.StorageDriver):
|
||||
MEASURE_PREFIX = "measure"
|
||||
SACK_PREFIX = "incoming-%s"
|
||||
_MEASURE_SERIAL_FORMAT = "Qd"
|
||||
_MEASURE_SERIAL_LEN = struct.calcsize(_MEASURE_SERIAL_FORMAT)
|
||||
|
||||
NUM_SACKS = 8
|
||||
|
||||
def _unserialize_measures(self, measure_id, data):
|
||||
nb_measures = len(data) // self._MEASURE_SERIAL_LEN
|
||||
try:
|
||||
@ -85,3 +88,9 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
|
||||
@staticmethod
|
||||
def process_measure_for_metric(metric):
|
||||
raise NotImplementedError
|
||||
|
||||
def sack_for_metric(self, metric_id):
|
||||
return metric_id.int % self.NUM_SACKS
|
||||
|
||||
def get_sack_name(self, sack):
|
||||
return self.SACK_PREFIX % sack
|
||||
|
@ -124,7 +124,7 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
||||
|
||||
return (k for k, v in omaps)
|
||||
|
||||
def list_metric_with_measures_to_process(self):
|
||||
def list_metric_with_measures_to_process(self, sack):
|
||||
names = set()
|
||||
marker = ""
|
||||
while True:
|
||||
|
@ -75,7 +75,7 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
|
||||
return (len(metric_details.keys()), sum(metric_details.values()),
|
||||
metric_details if details else None)
|
||||
|
||||
def list_metric_with_measures_to_process(self):
|
||||
def list_metric_with_measures_to_process(self, sack):
|
||||
return set(os.listdir(self.measure_path))
|
||||
|
||||
def _list_measures_container_for_metric_id(self, metric_id):
|
||||
|
@ -46,7 +46,7 @@ class RedisStorage(_carbonara.CarbonaraBasedStorage):
|
||||
return (len(metric_details.keys()), sum(metric_details.values()),
|
||||
metric_details if details else None)
|
||||
|
||||
def list_metric_with_measures_to_process(self):
|
||||
def list_metric_with_measures_to_process(self, sack):
|
||||
match = redis.SEP.join([self.STORAGE_PREFIX, "*"])
|
||||
keys = self._client.scan_iter(match=match, count=1000)
|
||||
measures = set([k.decode('utf8').split(redis.SEP)[1] for k in keys])
|
||||
|
@ -20,7 +20,6 @@ import uuid
|
||||
|
||||
import six
|
||||
|
||||
|
||||
from gnocchi.storage.common import s3
|
||||
from gnocchi.storage.incoming import _carbonara
|
||||
|
||||
@ -80,7 +79,7 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
||||
return (len(metric_details), sum(metric_details.values()),
|
||||
metric_details if details else None)
|
||||
|
||||
def list_metric_with_measures_to_process(self):
|
||||
def list_metric_with_measures_to_process(self, sack):
|
||||
limit = 1000 # 1000 is the default anyway
|
||||
metrics = set()
|
||||
response = {}
|
||||
|
@ -58,7 +58,7 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
|
||||
measures = int(headers.get('x-container-object-count'))
|
||||
return nb_metrics, measures, metric_details if details else None
|
||||
|
||||
def list_metric_with_measures_to_process(self):
|
||||
def list_metric_with_measures_to_process(self, sack):
|
||||
headers, files = self.swift.get_container(self.MEASURE_PREFIX,
|
||||
delimiter='/',
|
||||
full_listing=True)
|
||||
|
@ -34,6 +34,7 @@ from gnocchi.indexer import sqlalchemy
|
||||
from gnocchi.rest import app
|
||||
from gnocchi import service
|
||||
from gnocchi import storage
|
||||
from gnocchi.tests import utils
|
||||
|
||||
|
||||
# NOTE(chdent): Hack to restore semblance of global configuration to
|
||||
@ -179,9 +180,8 @@ class MetricdThread(threading.Thread):
|
||||
self.flag = True
|
||||
|
||||
def run(self):
|
||||
incoming = self.storage.incoming
|
||||
while self.flag:
|
||||
metrics = incoming.list_metric_with_measures_to_process()
|
||||
metrics = utils.list_all_incoming_metrics(self.storage.incoming)
|
||||
self.storage.process_background_tasks(self.index, metrics)
|
||||
time.sleep(0.1)
|
||||
|
||||
|
@ -23,6 +23,7 @@ from gnocchi import aggregates
|
||||
from gnocchi.aggregates import moving_stats
|
||||
from gnocchi import storage
|
||||
from gnocchi.tests import base as tests_base
|
||||
from gnocchi.tests import utils as tests_utils
|
||||
from gnocchi import utils
|
||||
|
||||
|
||||
@ -60,7 +61,7 @@ class TestAggregates(tests_base.TestCase):
|
||||
for n, val in enumerate(data)]
|
||||
self.index.create_metric(metric.id, str(uuid.uuid4()), 'medium')
|
||||
self.storage.incoming.add_measures(metric, measures)
|
||||
metrics = self.storage.incoming.list_metric_with_measures_to_process()
|
||||
metrics = tests_utils.list_all_incoming_metrics(self.storage.incoming)
|
||||
self.storage.process_background_tasks(self.index, metrics, sync=True)
|
||||
|
||||
return metric
|
||||
|
@ -36,6 +36,7 @@ from gnocchi import archive_policy
|
||||
from gnocchi import rest
|
||||
from gnocchi.rest import app
|
||||
from gnocchi.tests import base as tests_base
|
||||
from gnocchi.tests import utils as tests_utils
|
||||
from gnocchi import utils
|
||||
|
||||
|
||||
@ -122,7 +123,7 @@ class TestingApp(webtest.TestApp):
|
||||
req.headers['X-User-Id'] = self.USER_ID
|
||||
req.headers['X-Project-Id'] = self.PROJECT_ID
|
||||
response = super(TestingApp, self).do_request(req, *args, **kwargs)
|
||||
metrics = self.storage.incoming.list_metric_with_measures_to_process()
|
||||
metrics = tests_utils.list_all_incoming_metrics(self.storage.incoming)
|
||||
self.storage.process_background_tasks(self.indexer, metrics, sync=True)
|
||||
return response
|
||||
|
||||
|
@ -27,6 +27,7 @@ from gnocchi import indexer
|
||||
from gnocchi import storage
|
||||
from gnocchi.storage import _carbonara
|
||||
from gnocchi.tests import base as tests_base
|
||||
from gnocchi.tests import utils as tests_utils
|
||||
from gnocchi import utils
|
||||
|
||||
|
||||
@ -97,15 +98,15 @@ class TestStorageDriver(tests_base.TestCase):
|
||||
self.assertIn((utils.datetime_utc(2014, 1, 1, 12), 300.0, 5.0), m)
|
||||
|
||||
def test_list_metric_with_measures_to_process(self):
|
||||
metrics = self.storage.incoming.list_metric_with_measures_to_process()
|
||||
metrics = tests_utils.list_all_incoming_metrics(self.storage.incoming)
|
||||
self.assertEqual(set(), metrics)
|
||||
self.storage.incoming.add_measures(self.metric, [
|
||||
storage.Measure(utils.dt_to_unix_ns(2014, 1, 1, 12, 0, 1), 69),
|
||||
])
|
||||
metrics = self.storage.incoming.list_metric_with_measures_to_process()
|
||||
metrics = tests_utils.list_all_incoming_metrics(self.storage.incoming)
|
||||
self.assertEqual(set([str(self.metric.id)]), metrics)
|
||||
self.trigger_processing()
|
||||
metrics = self.storage.incoming.list_metric_with_measures_to_process()
|
||||
metrics = tests_utils.list_all_incoming_metrics(self.storage.incoming)
|
||||
self.assertEqual(set([]), metrics)
|
||||
|
||||
def test_delete_nonempty_metric(self):
|
||||
|
19
gnocchi/tests/utils.py
Normal file
19
gnocchi/tests/utils.py
Normal file
@ -0,0 +1,19 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import six
|
||||
|
||||
|
||||
def list_all_incoming_metrics(incoming):
|
||||
return set.union(*[incoming.list_metric_with_measures_to_process(i)
|
||||
for i in six.moves.range(incoming.NUM_SACKS)])
|
Loading…
Reference in New Issue
Block a user