drop scheduler process
As we move to multiple buckets, each worker will figure out on its own whether a bucket needs to be processed, so we no longer need a central scheduler to distribute work. For now, all workers will simply go at the same bucket.

Change-Id: I659f93545961b2943902eaefe7c7bd622abdc42f
parent 733d37a780
commit 2981b3f009
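In short: with the scheduler gone, every metricd processing worker asks the incoming storage on its own which metrics have pending measures and processes them directly, instead of consuming blocks that a single scheduler process pushed onto a shared queue. A rough, simplified sketch of the new per-worker cycle, not the actual gnocchi code; only list_metric_with_measures_to_process and process_background_tasks come from the diff below, the loop itself is illustrative:

import time


def processing_worker_loop(store, index, delay):
    # Hypothetical stand-in for MetricProcessor._run_job after this change:
    # each worker independently lists the metrics with pending measures and
    # processes them; no scheduler hands out work, so for now every worker
    # may pick up the same bucket.
    while True:
        metrics = list(
            store.incoming.list_metric_with_measures_to_process())
        if metrics:
            store.process_background_tasks(index, metrics)
        time.sleep(delay)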
@@ -13,22 +13,16 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import sys
import threading
import time

import cotyledon
from cotyledon import oslo_config_glue
from futurist import periodics
import msgpack
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import six
import tenacity
import tooz
from tooz import coordination

from gnocchi import archive_policy
from gnocchi import genconfig
@@ -143,70 +137,26 @@ class MetricReporting(MetricProcessBase):
                      exc_info=True)


class MetricScheduler(MetricProcessBase):
    name = "scheduler"
    GROUP_ID = "gnocchi-scheduler"
    SYNC_RATE = 30
    BLOCK_SIZE = 4
class MetricProcessor(MetricProcessBase):
    name = "processing"

    def __init__(self, worker_id, conf, queue):
        super(MetricScheduler, self).__init__(
    def __init__(self, worker_id, conf):
        super(MetricProcessor, self).__init__(
            worker_id, conf, conf.metricd.metric_processing_delay)
        self._coord, self._my_id = utils.get_coordinator_and_start(
            conf.storage.coordination_url)
        self.queue = queue
        self.workers = conf.metricd.workers
        self.periodic = None

    @utils.retry
    def _configure(self):
        super(MetricScheduler, self)._configure()
        try:
            cap = msgpack.dumps({'workers': self.workers})
            join_req = self._coord.join_group(self.GROUP_ID, cap)
            join_req.get()
            LOG.info('Joined coordination group: %s', self.GROUP_ID)

            @periodics.periodic(spacing=self.SYNC_RATE, run_immediately=True)
            def run_watchers():
                self._coord.run_watchers()

            self.periodic = periodics.PeriodicWorker.create([])
            self.periodic.add(run_watchers)
            t = threading.Thread(target=self.periodic.start)
            t.daemon = True
            t.start()

        except coordination.GroupNotCreated as e:
            create_group_req = self._coord.create_group(self.GROUP_ID)
            try:
                create_group_req.get()
            except coordination.GroupAlreadyExist:
                pass
            raise tenacity.TryAgain(e)
        except tooz.NotImplemented:
            LOG.warning('Configured coordination driver does not support '
                        'required functionality. Coordination is disabled.')
        except Exception as e:
            LOG.error('Failed to configure coordination. Coordination is '
                      'disabled: %s', e)

    def _run_job(self):
        try:
            metrics = list(
                self.store.incoming.list_metric_with_measures_to_process())
            for i in six.moves.range(0, len(metrics), self.BLOCK_SIZE):
                self.queue.put(metrics[i:i + self.BLOCK_SIZE])
            LOG.debug("%d metrics scheduled for processing.", len(metrics))
            self.store.process_background_tasks(self.index, metrics)
        except Exception:
            LOG.error("Unexpected error scheduling metrics for processing",
                      exc_info=True)

    def close_services(self):
        if self.periodic:
            self.periodic.stop()
            self.periodic.wait()
        self._coord.leave_group(self.GROUP_ID)
        self._coord.stop()

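For context on what this hunk deletes: the scheduler's _configure() was essentially the standard tooz group-membership dance, which is also why msgpack-python drops out of the requirements further below. A minimal standalone sketch of that pattern, with a made-up backend URL and member id (gnocchi built its coordinator from conf.storage.coordination_url via utils.get_coordinator_and_start):

import uuid

from tooz import coordination

# Example backend URL; any tooz driver URL works here.
coord = coordination.get_coordinator('memcached://localhost:11211',
                                     uuid.uuid4().hex.encode())
coord.start(start_heart=True)
try:
    coord.create_group(b'gnocchi-scheduler').get()
except coordination.GroupAlreadyExist:
    pass
# Join the group, advertising capabilities as a bytes payload
# (the removed code used msgpack.dumps({'workers': ...}) for this).
coord.join_group(b'gnocchi-scheduler', capabilities=b'').get()
# ... call coord.run_watchers() periodically while doing work ...
coord.leave_group(b'gnocchi-scheduler').get()
coord.stop()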
@@ -225,38 +175,14 @@ class MetricJanitor(MetricProcessBase):
            LOG.error("Unexpected error during metric cleanup", exc_info=True)


class MetricProcessor(MetricProcessBase):
    name = "processing"

    def __init__(self, worker_id, conf, queue):
        super(MetricProcessor, self).__init__(worker_id, conf, 0)
        self.queue = queue

    def _run_job(self):
        try:
            try:
                metrics = self.queue.get(block=True, timeout=10)
            except six.moves.queue.Empty:
                # NOTE(sileht): Allow the process to exit gracefully every
                # 10 seconds
                return
            self.store.process_background_tasks(self.index, metrics)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)


class MetricdServiceManager(cotyledon.ServiceManager):
    def __init__(self, conf):
        super(MetricdServiceManager, self).__init__()
        oslo_config_glue.setup(self, conf)

        self.conf = conf
        self.queue = multiprocessing.Manager().Queue()

        self.add(MetricScheduler, args=(self.conf, self.queue))
        self.metric_processor_id = self.add(
            MetricProcessor, args=(self.conf, self.queue),
            MetricProcessor, args=(self.conf,),
            workers=conf.metricd.workers)
        if self.conf.metricd.metric_reporting_delay >= 0:
            self.add(MetricReporting, args=(self.conf,))

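On the service-manager side, the change leans on cotyledon's ability to spawn several identical worker processes from one service class: the scheduler service and the shared queue go away, and only MetricProcessor is added with workers=conf.metricd.workers. A stripped-down sketch of that cotyledon pattern (class names and the worker count here are illustrative, not gnocchi's):

import cotyledon


class Processor(cotyledon.Service):
    def __init__(self, worker_id, conf):
        super(Processor, self).__init__(worker_id)
        self.conf = conf

    def run(self):
        # Each spawned worker process runs the same loop independently.
        pass


class Manager(cotyledon.ServiceManager):
    def __init__(self, conf):
        super(Manager, self).__init__()
        # One service class, N worker processes; no separate scheduler
        # service and no shared queue passed through args.
        self.add(Processor, args=(conf,), workers=4)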
@@ -23,4 +23,3 @@ tenacity>=3.1.0 # Apache-2.0
WebOb>=1.4.1
Paste
PasteDeploy
msgpack-python