drop single bucket partitioning
As we move to multiple buckets, we want workers to process buckets
completely. This removes any internal bucket partitioning as a first step.

Change-Id: I4ac40688e3069965d5899ae024d3183989e281e9
parent b1f03e1b32
commit 733d37a780
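In short, the incoming driver API drops its partitioning arguments and every
scheduler now sees the whole backlog. A minimal sketch of the old versus new
semantics (the metric ids are made up, and `backlog` stands in for whatever
the driver would return):

# Illustration only: `backlog` stands in for the incoming driver's listing.
backlog = ["m1", "m2", "m3", "m4", "m5", "m6"]

# Before: worker `part` saw only its own slice of `size` metrics.
size, part = 2, 1
old_view = backlog[size * part:size * (part + 1)]  # ["m3", "m4"]

# After: every scheduler sees the complete backlog.
new_view = backlog  # all six metrics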
@@ -145,10 +145,8 @@ class MetricReporting(MetricProcessBase):
 class MetricScheduler(MetricProcessBase):
     name = "scheduler"
-    MAX_OVERLAP = 0.3
     GROUP_ID = "gnocchi-scheduler"
     SYNC_RATE = 30
-    TASKS_PER_WORKER = 16
     BLOCK_SIZE = 4
 
     def __init__(self, worker_id, conf, queue):
@@ -157,32 +155,9 @@ class MetricScheduler(MetricProcessBase):
         self._coord, self._my_id = utils.get_coordinator_and_start(
             conf.storage.coordination_url)
         self.queue = queue
-        self.previously_scheduled_metrics = set()
-        self.workers = conf.metricd.workers
-        self.block_index = 0
-        self.block_size_default = self.workers * self.TASKS_PER_WORKER
-        self.block_size = self.block_size_default
         self.periodic = None
-
-    def set_block(self, event):
-        get_members_req = self._coord.get_members(self.GROUP_ID)
-        try:
-            members = sorted(get_members_req.get())
-            self.block_index = members.index(self._my_id)
-            reqs = list(self._coord.get_member_capabilities(self.GROUP_ID, m)
-                        for m in members)
-            for req in reqs:
-                cap = msgpack.loads(req.get(), encoding='utf-8')
-                max_workers = max(cap['workers'], self.workers)
-            self.block_size = max_workers * self.TASKS_PER_WORKER
-            LOG.info('New set of agents detected. Now working on block: %s, '
-                     'with up to %s metrics', self.block_index,
-                     self.block_size)
-        except Exception:
-            LOG.warning('Error getting block to work on, defaulting to first')
-            self.block_index = 0
-            self.block_size = self.block_size_default
 
     @utils.retry
     def _configure(self):
         super(MetricScheduler, self)._configure()
@@ -191,7 +166,6 @@ class MetricScheduler(MetricProcessBase):
             join_req = self._coord.join_group(self.GROUP_ID, cap)
             join_req.get()
             LOG.info('Joined coordination group: %s', self.GROUP_ID)
-            self.set_block(None)
 
             @periodics.periodic(spacing=self.SYNC_RATE, run_immediately=True)
             def run_watchers():
@@ -203,8 +177,6 @@ class MetricScheduler(MetricProcessBase):
             t.daemon = True
             t.start()
 
-            self._coord.watch_join_group(self.GROUP_ID, self.set_block)
-            self._coord.watch_leave_group(self.GROUP_ID, self.set_block)
         except coordination.GroupNotCreated as e:
             create_group_req = self._coord.create_group(self.GROUP_ID)
             try:
@@ -221,23 +193,10 @@ class MetricScheduler(MetricProcessBase):
 
     def _run_job(self):
         try:
-            metrics = set(
-                self.store.incoming.list_metric_with_measures_to_process(
-                    self.block_size, self.block_index))
-            if metrics and not self.queue.empty():
-                # NOTE(gordc): drop metrics we previously process to avoid
-                # handling twice
-                number_of_scheduled_metrics = len(metrics)
-                metrics = metrics - self.previously_scheduled_metrics
-                if (float(number_of_scheduled_metrics - len(metrics)) /
-                        self.block_size > self.MAX_OVERLAP):
-                    LOG.warning('Metric processing lagging scheduling rate. '
-                                'It is recommended to increase the number of '
-                                'workers or to lengthen processing interval.')
-            metrics = list(metrics)
+            metrics = list(
+                self.store.incoming.list_metric_with_measures_to_process())
             for i in six.moves.range(0, len(metrics), self.BLOCK_SIZE):
                 self.queue.put(metrics[i:i + self.BLOCK_SIZE])
-            self.previously_scheduled_metrics = set(metrics)
             LOG.debug("%d metrics scheduled for processing.", len(metrics))
         except Exception:
             LOG.error("Unexpected error scheduling metrics for processing",
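The scheduler's only remaining job is to slice the full backlog into
BLOCK_SIZE batches for the worker queue. The chunking as a standalone sketch
(plain range instead of six.moves.range; the values are made up):

BLOCK_SIZE = 4
metrics = list(range(10))  # stand-ins for metric ids

batches = [metrics[i:i + BLOCK_SIZE]
           for i in range(0, len(metrics), BLOCK_SIZE)]
# -> [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]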
@@ -326,8 +285,8 @@ def metricd_tester(conf):
     index = indexer.get_driver(conf)
     index.connect()
     s = storage.get_driver(conf)
-    metrics = s.incoming.list_metric_with_measures_to_process(
-        conf.stop_after_processing_metrics, 0)
+    metrics = s.incoming.list_metric_with_measures_to_process()[
+        :conf.stop_after_processing_metrics]
     s.process_new_measures(index, metrics, True)
 
 
@@ -52,5 +52,5 @@ class StorageDriver(object):
         raise exceptions.NotImplementedError
 
     @staticmethod
-    def list_metric_with_measures_to_process(size, part, full=False):
+    def list_metric_with_measures_to_process():
         raise NotImplementedError
@@ -67,10 +67,6 @@ class CarbonaraBasedStorage(incoming.StorageDriver):
     def _build_report(details):
         raise NotImplementedError
 
-    @staticmethod
-    def list_metric_with_measures_to_process(size, part, full=False):
-        raise NotImplementedError
-
     @staticmethod
     def delete_unprocessed_measures_for_metric_id(metric_id):
         raise NotImplementedError
@@ -16,7 +16,6 @@ import contextlib
 import datetime
 import errno
 import functools
-import itertools
 import uuid
 
 
@@ -119,14 +118,18 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
             return ()
         return (k for k, v in omaps)
 
-    def list_metric_with_measures_to_process(self, size, part, full=False):
-        names = self._list_object_names_to_process(limit=-1 if full else
-                                                   size * (part + 1))
-        if full:
-            objs_it = names
-        else:
-            objs_it = itertools.islice(names, size * part, size * (part + 1))
-        return set([name.split("_")[1] for name in objs_it])
+    def list_metric_with_measures_to_process(self):
+        names = set()
+        marker = ""
+        while True:
+            obj_names = list(self._list_object_names_to_process(
+                marker=marker, limit=1000))
+            names.update(name.split("_")[1] for name in obj_names)
+            if len(obj_names) < 1000:
+                break
+            else:
+                marker = obj_names[-1]
+        return names
 
     def delete_unprocessed_measures_for_metric_id(self, metric_id):
         object_prefix = self.MEASURE_PREFIX + "_" + str(metric_id)
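The rewritten Ceph driver replaces the bounded, sliced listing with
marker-based pagination. The shape of that loop, extracted into a generic
sketch (fetch_page is a hypothetical stand-in for
_list_object_names_to_process):

def list_all(fetch_page, page_size=1000):
    # Marker-based pagination: fetch fixed-size pages until a short page
    # signals the end, carrying the last name forward as the next marker.
    names = set()
    marker = ""
    while True:
        page = list(fetch_page(marker=marker, limit=page_size))
        names.update(page)
        if len(page) < page_size:
            return names
        marker = page[-1]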
@@ -75,11 +75,8 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
         return (len(metric_details.keys()), sum(metric_details.values()),
                 metric_details if details else None)
 
-    def list_metric_with_measures_to_process(self, size, part, full=False):
-        if full:
-            return set(os.listdir(self.measure_path))
-        return set(
-            os.listdir(self.measure_path)[size * part:size * (part + 1)])
+    def list_metric_with_measures_to_process(self):
+        return set(os.listdir(self.measure_path))
 
     def _list_measures_container_for_metric_id(self, metric_id):
         try:
@@ -46,13 +46,11 @@ class RedisStorage(_carbonara.CarbonaraBasedStorage):
         return (len(metric_details.keys()), sum(metric_details.values()),
                 metric_details if details else None)
 
-    def list_metric_with_measures_to_process(self, size, part, full=False):
+    def list_metric_with_measures_to_process(self):
         match = redis.SEP.join([self.STORAGE_PREFIX, "*"])
         keys = self._client.scan_iter(match=match, count=1000)
         measures = set([k.decode('utf8').split(redis.SEP)[1] for k in keys])
-        if full:
-            return measures
-        return set(list(measures)[size * part:size * (part + 1)])
+        return measures
 
     def delete_unprocessed_measures_for_metric_id(self, metric_id):
         self._client.delete(self._build_measure_path(metric_id))
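The Redis driver changes least: it already walked every key with SCAN, so
only the post-hoc slicing goes away. For reference, a sketch of the
scan-and-parse idiom assuming redis-py (the match pattern and key separator
here are illustrative, not gnocchi's actual constants):

import redis  # redis-py

client = redis.StrictRedis()
# SCAN iterates the keyspace in server-side chunks of ~1000 keys without
# the blocking behaviour of KEYS.
keys = client.scan_iter(match="measure!*", count=1000)
metric_ids = set(k.decode("utf8").split("!")[1] for k in keys)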
@@ -80,12 +80,8 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
         return (len(metric_details), sum(metric_details.values()),
                 metric_details if details else None)
 
-    def list_metric_with_measures_to_process(self, size, part, full=False):
-        if full:
-            limit = 1000  # 1000 is the default anyway
-        else:
-            limit = size * (part + 1)
-
+    def list_metric_with_measures_to_process(self):
+        limit = 1000  # 1000 is the default anyway
         metrics = set()
         response = {}
         # Handle pagination
@@ -103,11 +99,7 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
                 **kwargs)
             for p in response.get('CommonPrefixes', ()):
                 metrics.add(p['Prefix'].rstrip('/'))
-
-        if full:
-            return metrics
-
-        return sorted(list(metrics))[size * part:]
+        return metrics
 
     def _list_measure_files_for_metric_id(self, metric_id):
         files = set()
@@ -58,16 +58,10 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
         measures = int(headers.get('x-container-object-count'))
         return nb_metrics, measures, metric_details if details else None
 
-    def list_metric_with_measures_to_process(self, size, part, full=False):
-        limit = None
-        if not full:
-            limit = size * (part + 1)
+    def list_metric_with_measures_to_process(self):
         headers, files = self.swift.get_container(self.MEASURE_PREFIX,
                                                   delimiter='/',
-                                                  full_listing=full,
-                                                  limit=limit)
-        if not full:
-            files = files[size * part:]
+                                                  full_listing=True)
         return set(f['subdir'][:-1] for f in files if 'subdir' in f)
 
     def _list_measure_files_for_metric_id(self, metric_id):
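Swift gets the same treatment: instead of capping the listing at
size * (part + 1) and slicing, the driver asks for one full listing. A sketch
of the underlying swiftclient call (the container name and credentials are
placeholders; real code reuses the driver's authenticated connection and
MEASURE_PREFIX):

from swiftclient import client as swift_client

swift = swift_client.Connection(
    authurl="http://localhost:8080/auth/v1.0", user="demo", key="secret")
# With delimiter='/', pseudo-directories come back as {'subdir': 'prefix/'}
# entries; full_listing=True makes swiftclient page through everything.
headers, listing = swift.get_container("measure", delimiter="/",
                                       full_listing=True)
metric_ids = set(entry["subdir"][:-1]
                 for entry in listing if "subdir" in entry)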
@@ -173,8 +173,7 @@ class MetricdThread(threading.Thread):
     def run(self):
         incoming = self.storage.incoming
         while self.flag:
-            metrics = incoming.list_metric_with_measures_to_process(
-                None, None, full=True)
+            metrics = incoming.list_metric_with_measures_to_process()
             self.storage.process_background_tasks(self.index, metrics)
             time.sleep(0.1)
 
@@ -60,8 +60,7 @@ class TestAggregates(tests_base.TestCase):
                     for n, val in enumerate(data)]
         self.index.create_metric(metric.id, str(uuid.uuid4()), 'medium')
         self.storage.incoming.add_measures(metric, measures)
-        metrics = self.storage.incoming.list_metric_with_measures_to_process(
-            None, None, full=True)
+        metrics = self.storage.incoming.list_metric_with_measures_to_process()
         self.storage.process_background_tasks(self.index, metrics, sync=True)
 
         return metric
@@ -122,8 +122,7 @@ class TestingApp(webtest.TestApp):
         req.headers['X-User-Id'] = self.USER_ID
         req.headers['X-Project-Id'] = self.PROJECT_ID
         response = super(TestingApp, self).do_request(req, *args, **kwargs)
-        metrics = self.storage.incoming.list_metric_with_measures_to_process(
-            None, None, full=True)
+        metrics = self.storage.incoming.list_metric_with_measures_to_process()
        self.storage.process_background_tasks(self.indexer, metrics, sync=True)
         return response
@@ -77,18 +77,15 @@ class TestStorageDriver(tests_base.TestCase):
         self.assertIn((utils.datetime_utc(2014, 1, 1, 13), 300.0, 1), m)
 
     def test_list_metric_with_measures_to_process(self):
-        metrics = self.storage.incoming.list_metric_with_measures_to_process(
-            None, None, full=True)
+        metrics = self.storage.incoming.list_metric_with_measures_to_process()
         self.assertEqual(set(), metrics)
         self.storage.incoming.add_measures(self.metric, [
             storage.Measure(utils.dt_to_unix_ns(2014, 1, 1, 12, 0, 1), 69),
         ])
-        metrics = self.storage.incoming.list_metric_with_measures_to_process(
-            None, None, full=True)
+        metrics = self.storage.incoming.list_metric_with_measures_to_process()
         self.assertEqual(set([str(self.metric.id)]), metrics)
         self.trigger_processing()
-        metrics = self.storage.incoming.list_metric_with_measures_to_process(
-            None, None, full=True)
+        metrics = self.storage.incoming.list_metric_with_measures_to_process()
         self.assertEqual(set([]), metrics)
 
     def test_delete_nonempty_metric(self):