Retrieve metrics in eventlet greenthreads

This updates the orchestrator so that metrics are fetched concurrently
with an eventlet GreenPool instead of one after another.

Change-Id: If19924175c9a5532dd272696a52daf5d8a21153b
Story: 2005423
Task: 30446
This commit is contained in:
Luka Peschke 2019-04-11 17:13:44 +02:00
parent c9889f2755
commit 07163cb627
3 changed files with 107 additions and 48 deletions

View File

@ -22,6 +22,7 @@ import sys
import time
import cotyledon
import eventlet
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -55,6 +56,8 @@ orchestrator_opts = [
default=multiprocessing.cpu_count(),
min=1,
help='Max nb of workers to run. Defaults to the nb of available CPUs'),
cfg.IntOpt('max_greenthreads', default=100, min=1,
help='Maximal number of greenthreads to use per worker.'),
]
CONF.register_opts(orchestrator_opts, group='orchestrator')
@ -182,11 +185,48 @@ class Worker(BaseWorker):
next_timestamp,
self._tenant_id,
)
if not raw_data:
raise collector.NoDataCollected
if raw_data:
return [{'period': {'begin': start_timestamp,
'end': next_timestamp},
'usage': raw_data}]
return {'period': {'begin': start_timestamp,
'end': next_timestamp},
'usage': raw_data}
def _do_collection(self, metrics, timestamp):
    """Collect data for every metric concurrently and gather the results.

    One greenthread is spawned per metric, bounded by the
    ``orchestrator.max_greenthreads`` option. Metrics for which no data
    was collected are dropped from the returned list; any other
    collection failure terminates the worker.
    """

    def _safe_collect(metric):
        # Runs inside a greenthread: maps "no data" to None and turns
        # any unexpected failure into a worker exit.
        try:
            return self._collect(metric, timestamp)
        except collector.NoDataCollected:
            LOG.info(
                '[scope: {scope}, worker: {worker}] No data collected '
                'for metric {metric} at timestamp {ts}'.format(
                    scope=self._tenant_id,
                    worker=self._worker_id,
                    metric=metric,
                    ts=ck_utils.ts2dt(timestamp))
            )
            return None
        except Exception as e:
            LOG.warning(
                '[scope: {scope}, worker: {worker}] Error while collecting'
                ' metric {metric} at timestamp {ts}: {e}. Exiting.'.format(
                    scope=self._tenant_id,
                    worker=self._worker_id,
                    metric=metric,
                    ts=ck_utils.ts2dt(timestamp),
                    e=e)
            )
            # FIXME(peschk_l): here we just exit, and the
            # collection will be retried during the next collect
            # cycle. In the future, we should implement a retrying
            # system in workers
            sys.exit(1)

    pool = eventlet.GreenPool(size=CONF.orchestrator.max_greenthreads)
    return [result for result in pool.imap(_safe_collect, metrics)
            if result is not None]
def check_state(self):
timestamp = self._state.get_state(self._tenant_id)
@ -202,50 +242,15 @@ class Worker(BaseWorker):
metrics = list(self._conf['metrics'].keys())
storage_data = []
for metric in metrics:
try:
try:
data = self._collect(metric, timestamp)
except collector.NoDataCollected:
raise
except Exception as e:
LOG.warning(
'[scope: {scope}, worker: {worker}] Error while'
'collecting metric {metric}: {error}. Retrying on '
'next collection cycle.'.format(
scope=self._tenant_id,
worker=self._worker_id,
metric=metric,
error=e,
),
)
# FIXME(peschk_l): here we just exit, and the
# collection will be retried during the next collect
# cycle. In the future, we should implement a retrying
# system in workers
sys.exit(0)
except collector.NoDataCollected:
LOG.info(
'[scope: {scope}, worker: {worker}] No data collected '
'for metric {metric} at timestamp {ts}'.format(
scope=self._tenant_id,
worker=self._worker_id,
metric=metric,
ts=ck_utils.ts2dt(timestamp))
)
else:
# Rating
for processor in self._processors:
processor.obj.process(data)
# Writing
if isinstance(data, list):
storage_data += data
else:
storage_data.append(data)
# Collection
data = self._do_collection(metrics, timestamp)
# We're getting a full period so we directly commit
self._storage.push(storage_data, self._tenant_id)
# Rating
for processor in self._processors:
processor.obj.process(data)
# Writing
self._storage.push(data, self._tenant_id)
self._state.set_state(self._tenant_id, timestamp)
@ -340,4 +345,5 @@ class OrchestratorServiceManager(cotyledon.ServiceManager):
def __init__(self):
    """Register the Orchestrator service with a configurable worker count.

    The number of worker processes is taken from the
    ``orchestrator.max_workers`` option (defaulting to the number of
    available CPUs) rather than being hard-coded.
    """
    super(OrchestratorServiceManager, self).__init__()
    # Register the service exactly once: a stale hard-coded
    # add(Orchestrator, workers=4) call here would spawn a second,
    # unconfigurable set of orchestrator workers.
    self.service_id = self.add(Orchestrator,
                               workers=CONF.orchestrator.max_workers)

View File

@ -19,6 +19,7 @@ import mock
from oslo_messaging import conffixture
from stevedore import extension
from cloudkitty import collector
from cloudkitty import orchestrator
from cloudkitty import tests
@ -88,3 +89,48 @@ class OrchestratorTest(tests.TestCase):
self.assertEqual(2, worker._processors[1].obj.priority)
self.assertEqual('fake2', worker._processors[2].name)
self.assertEqual(1, worker._processors[2].obj.priority)
class WorkerTest(tests.TestCase):
    """Tests for the concurrent collection logic of orchestrator.Worker."""

    def setUp(self):
        super(WorkerTest, self).setUp()

        # Bypass the real Worker constructor: only the attributes read
        # by _do_collection are required for these tests.
        class FakeWorker(orchestrator.Worker):
            def __init__(self):
                self._tenant_id = 'a'
                self._worker_id = '0'

        self.worker = FakeWorker()
        self.worker._collect = mock.MagicMock()

    @staticmethod
    def _fake_results(nb_metrics):
        # One fake collection result per metric; 'usage' carries the
        # index so the (unordered) output can be sorted deterministically.
        return [{'period': {'begin': 0, 'end': 3600}, 'usage': [i]}
                for i in range(nb_metrics)]

    def _run_collection(self, side_effect):
        # Feed the canned results to the mocked collector and return the
        # worker output sorted back into submission order.
        self.worker._collect.side_effect = side_effect
        metric_names = ['metric{}'.format(i)
                        for i in range(len(side_effect))]
        return sorted(self.worker._do_collection(metric_names, 0),
                      key=lambda elem: elem['usage'][0])

    def test_do_collection_all_valid(self):
        expected = self._fake_results(5)
        self.assertEqual(expected, self._run_collection(list(expected)))

    def test_do_collection_some_empty(self):
        side_effect = self._fake_results(5)
        side_effect.insert(2, collector.NoDataCollected('a', 'b'))
        side_effect.insert(4, collector.NoDataCollected('a', 'b'))
        expected = [elem for elem in side_effect
                    if not isinstance(elem, collector.NoDataCollected)]
        self.assertEqual(expected, self._run_collection(side_effect))

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
Metrics are now fetched concurrently with ``eventlet`` by the
orchestrator instead of one after another, leading to a significant
performance improvement. The maximum number of greenthreads to use
can be specified through the ``max_greenthreads`` option of the
``orchestrator`` section.