pollsters: Remove eventlet timers

This change removes usage of eventlet timers.

This allows coordinator heartbeat/watchers to work correctly
when the main thread is stuck for any reason (IO, time.sleep, ...).
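
Illustrative sketch only (not code from this change): the same futurist
APIs the change introduces, driving a heartbeat from a dedicated daemon
thread so it keeps firing while the main thread blocks. The heartbeat()
callback and the 1.0s spacing are placeholders.

    import threading
    import time

    from concurrent import futures
    from futurist import periodics

    def heartbeat():
        print("heartbeat sent")  # placeholder for real heartbeat work

    worker = periodics.PeriodicWorker.create(
        [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
    worker.add(periodics.periodic(spacing=1.0, run_immediately=True)(heartbeat))

    t = threading.Thread(target=worker.start)
    t.daemon = True
    t.start()

    time.sleep(5)  # main thread "stuck"; heartbeats keep running
    worker.stop()
    worker.wait()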

This also fixes a concurrency issue in the notification-agent between
stop/reload_pipeline/refresh_agent, which all manipulate listeners.

For example, a listener stopped by stop() could be restarted by
reload_pipeline or refresh_agent. Now we use the coord_lock to
protect listener manipulation and to ensure we are not in the middle
of a shutdown when we restart one.
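
A minimal sketch of the locking scheme (names simplified from
NotificationService; the listener object is a stand-in): restart paths
take the same lock as stop() and check the shutdown flag, so a listener
stopped during shutdown is never brought back.

    import threading

    class Service(object):
        def __init__(self):
            self.coord_lock = threading.Lock()
            self.shutdown = False
            self.listener = None

        def _configure_pipeline_listener(self):
            # called with coord_lock held
            self.listener = object()  # stand-in for a real listener

        def refresh_agent(self):
            with self.coord_lock:
                if self.shutdown:
                    # shutting down, do not restart anything
                    return
                self._configure_pipeline_listener()

        def stop(self):
            self.shutdown = True
            with self.coord_lock:
                self.listener = None  # stand-in for stopping the listener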

This bug cannot occur with greenlets because we have not monkeypatched
system calls for a while now, so none of these methods ever ran
concurrently. Replacing greenlets with real threads has exposed the bug.

Closes-Bug: #1582641
Change-Id: I21c3b953a296316b983114435fcbeba1e29f051e
Mehdi Abaakouk 2016-04-22 10:14:36 +02:00
parent 8c098a1745
commit 1ab0acb851
8 changed files with 177 additions and 104 deletions


@@ -18,6 +18,8 @@ import collections
import itertools
import random
from concurrent import futures
from futurist import periodics
from keystoneauth1 import exceptions as ka_exceptions
from keystoneclient import exceptions as ks_exceptions
from oslo_config import cfg
@@ -274,8 +276,13 @@ class AgentManager(service_base.PipelineBasedService):
discoveries = (self._extensions('discover', namespace).extensions
for namespace in namespaces)
self.discoveries = list(itertools.chain(*list(discoveries)))
self.polling_periodics = None
self.partition_coordinator = coordination.PartitionCoordinator()
self.heartbeat_timer = utils.create_periodic(
target=self.partition_coordinator.heartbeat,
spacing=cfg.CONF.coordination.heartbeat,
run_immediately=True)
# Compose coordination group prefix.
# We'll use namespaces as the basement for this partitioning.
@@ -345,27 +352,15 @@ class AgentManager(service_base.PipelineBasedService):
if not self.groups and self.partition_coordinator.is_active():
self.partition_coordinator.stop()
self.kill_heartbeat_timer()
self.heartbeat_timer.stop()
if self.groups and not self.partition_coordinator.is_active():
self.partition_coordinator.start()
self.heartbeat_timer = self.tg.add_timer(
cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat)
utils.spawn_thread(self.heartbeat_timer.start)
for group in self.groups:
self.partition_coordinator.join_group(group)
def kill_heartbeat_timer(self):
if not getattr(self, 'heartbeat_timer', None):
return
try:
self.heartbeat_timer.stop()
self.tg.timer_done(self.heartbeat_timer)
except Exception as e:
LOG.error(_LE("Failed to stop heartbeat timer due to: %s"), e)
def create_polling_task(self):
"""Create an initially empty polling task."""
return PollingTask(self)
@@ -388,7 +383,12 @@ class AgentManager(service_base.PipelineBasedService):
discovery_group_id)
if discovery_group_id else None)
def configure_polling_tasks(self):
def start_polling_tasks(self):
# NOTE(sileht): 1000 is just the same as previous oslo_service code
self.polling_periodics = periodics.PeriodicWorker.create(
[], executor_factory=lambda:
futures.ThreadPoolExecutor(max_workers=1000))
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
@@ -396,29 +396,33 @@ class AgentManager(service_base.PipelineBasedService):
delay_polling_time = random.randint(
0, cfg.CONF.shuffle_time_before_polling_task)
pollster_timers = []
data = self.setup_polling_tasks()
for interval, polling_task in data.items():
delay_time = (interval + delay_polling_time if delay_start
else delay_polling_time)
pollster_timers.append(self.tg.add_timer(interval,
self.interval_task,
initial_delay=delay_time,
task=polling_task))
return pollster_timers
@periodics.periodic(spacing=interval, run_immediately=False)
def task():
self.interval_task(polling_task)
utils.spawn_thread(utils.delayed, delay_time,
self.polling_periodics.add, task)
if data:
# Don't start useless threads if no task will run
utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
def start(self):
super(AgentManager, self).start()
self.polling_manager = pipeline.setup_polling()
self.join_partitioning_groups()
self.pollster_timers = self.configure_polling_tasks()
self.start_polling_tasks()
self.init_pipeline_refresh()
def stop(self):
if self.started:
self.stop_pollsters()
self.kill_heartbeat_timer()
self.stop_pollsters_tasks()
self.heartbeat_timer.stop()
self.partition_coordinator.stop()
super(AgentManager, self).stop()
@@ -432,6 +436,16 @@ class AgentManager(service_base.PipelineBasedService):
@property
def keystone(self):
# FIXME(sileht): This lazy loading of the keystone client doesn't
# look concurrency safe; we never see issues because once we have
# connected to keystone everything is fine, and because all pollsters
# are delayed during startup. But each polling task creates a new
# client and overrides the one created by other polling
# tasks. During this short window bad things can occur.
#
# I think we must not reset the keystone client before
# running a polling task, but refresh it periodically instead.
# NOTE(sileht): we do lazy loading of the keystone client
# for multiple reasons:
# * don't use it if no plugin need it
@@ -501,25 +515,22 @@ class AgentManager(service_base.PipelineBasedService):
LOG.warning(_('Unknown discovery extension: %s') % name)
return resources
def stop_pollsters(self):
for x in self.pollster_timers:
try:
x.stop()
self.tg.timer_done(x)
except Exception:
LOG.error(_('Error stopping pollster.'), exc_info=True)
self.pollster_timers = []
def stop_pollsters_tasks(self):
if self.polling_periodics:
self.polling_periodics.stop()
self.polling_periodics.wait()
self.polling_periodics = None
def reload_pipeline(self):
if self.pipeline_validated:
LOG.info(_LI("Reconfiguring polling tasks."))
# stop existing pollsters and leave partitioning groups
self.stop_pollsters()
self.stop_pollsters_tasks()
for group in self.groups:
self.partition_coordinator.leave_group(group)
# re-create partitioning groups according to pipeline
# and configure polling tasks with latest pipeline conf
self.join_partitioning_groups()
self.pollster_timers = self.configure_polling_tasks()
self.start_polling_tasks()


@@ -126,13 +126,14 @@ class CollectorService(service_base.ServiceBase):
LOG.exception(_("UDP: Unable to store meter"))
def stop(self):
if self.sample_listener:
utils.kill_listeners([self.sample_listener])
if self.event_listener:
utils.kill_listeners([self.event_listener])
if self.udp_thread:
self.udp_run = False
self.udp_thread.join()
if self.started:
if self.sample_listener:
utils.kill_listeners([self.sample_listener])
if self.event_listener:
utils.kill_listeners([self.event_listener])
if self.udp_thread:
self.udp_run = False
self.udp_thread.join()
super(CollectorService, self).stop()


@@ -148,6 +148,7 @@ class NotificationService(service_base.PipelineBasedService):
def start(self):
super(NotificationService, self).start()
self.shutdown = False
self.periodic = None
self.partition_coordinator = None
self.coord_lock = threading.Lock()
@@ -211,7 +212,8 @@ class NotificationService(service_base.PipelineBasedService):
utils.spawn_thread(self.periodic.start)
# configure pipelines after all coordination is configured.
self._configure_pipeline_listener()
with self.coord_lock:
self._configure_pipeline_listener()
if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@@ -264,57 +266,63 @@ class NotificationService(service_base.PipelineBasedService):
self.listeners.append(listener)
def _refresh_agent(self, event):
self._configure_pipeline_listener()
with self.coord_lock:
if self.shutdown:
# NOTE(sileht): We are going to shut down and everything will be
# stopped, so we should not restart them
return
self._configure_pipeline_listener()
def _configure_pipeline_listener(self):
with self.coord_lock:
ev_pipes = []
if cfg.CONF.notification.store_events:
ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport()
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
ev_pipes = []
if cfg.CONF.notification.store_events:
ev_pipes = self.event_pipeline_manager.pipelines
pipelines = self.pipeline_manager.pipelines + ev_pipes
transport = messaging.get_transport()
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
endpoints = []
targets = []
endpoints = []
targets = []
for pipe in pipelines:
if isinstance(pipe, pipeline.EventPipeline):
endpoints.append(pipeline.EventPipelineEndpoint(pipe))
else:
endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
for pipe in pipelines:
if isinstance(pipe, pipeline.EventPipeline):
endpoints.append(pipeline.EventPipelineEndpoint(pipe))
else:
endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
for pipe_set, pipe in itertools.product(partitioned, pipelines):
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set)
targets.append(oslo_messaging.Target(topic=topic))
for pipe_set, pipe in itertools.product(partitioned, pipelines):
LOG.debug('Pipeline endpoint: %s from set: %s',
pipe.name, pipe_set)
topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name, pipe_set)
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
self.pipeline_listener = messaging.get_batch_notification_listener(
transport,
targets,
endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
self.pipeline_listener.start()
self.pipeline_listener = messaging.get_batch_notification_listener(
transport,
targets,
endpoints,
batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout)
self.pipeline_listener.start()
def stop(self):
if self.started:
self.shutdown = True
if self.periodic:
self.periodic.stop()
self.periodic.wait()
if self.partition_coordinator:
self.partition_coordinator.stop()
if self.pipeline_listener:
utils.kill_listeners([self.pipeline_listener])
utils.kill_listeners(self.listeners)
with self.coord_lock:
if self.pipeline_listener:
utils.kill_listeners([self.pipeline_listener])
utils.kill_listeners(self.listeners)
super(NotificationService, self).stop()
def reload_pipeline(self):
@@ -328,12 +336,18 @@ class NotificationService(service_base.PipelineBasedService):
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
# restart the main queue listeners.
utils.kill_listeners(self.listeners)
self._configure_main_queue_listeners(
self.pipe_manager, self.event_pipe_manager)
with self.coord_lock:
if self.shutdown:
# NOTE(sileht): We are going to shut down and everything will be
# stopped, so we should not restart them
return
# restart the pipeline listeners if workload partitioning
# is enabled.
if cfg.CONF.notification.workload_partitioning:
self._configure_pipeline_listener()
# restart the main queue listeners.
utils.kill_listeners(self.listeners)
self._configure_main_queue_listeners(
self.pipe_manager, self.event_pipe_manager)
# restart the pipeline listeners if workload partitioning
# is enabled.
if cfg.CONF.notification.workload_partitioning:
self._configure_pipeline_listener()


@@ -22,6 +22,7 @@ import six
from ceilometer.i18n import _LE, _LI
from ceilometer import pipeline
from ceilometer import utils
LOG = log.getLogger(__name__)
@@ -56,10 +57,19 @@ class PipelineBasedService(ServiceBase):
self.set_pipeline_hash(pipeline.get_pipeline_hash(
pipeline.EVENT_TYPE), pipeline.EVENT_TYPE)
self.refresh_pipeline_periodic = None
if (cfg.CONF.refresh_pipeline_cfg or
cfg.CONF.refresh_event_pipeline_cfg):
self.tg.add_timer(cfg.CONF.pipeline_polling_interval,
self.refresh_pipeline)
self.refresh_pipeline_periodic = utils.create_periodic(
target=self.refresh_pipeline,
spacing=cfg.CONF.pipeline_polling_interval)
utils.spawn_thread(self.refresh_pipeline_periodic.start)
def stop(self):
if self.started and self.refresh_pipeline_periodic:
self.refresh_pipeline_periodic.stop()
self.refresh_pipeline_periodic.wait()
super(PipelineBasedService, self).stop()
def get_pipeline_mtime(self, p_type=pipeline.SAMPLE_TYPE):
return (self.event_pipeline_mtime if p_type == pipeline.EVENT_TYPE else


@@ -276,13 +276,9 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_pipeline_poller(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self.srv.tg = mock.MagicMock()
self.srv.start()
self.addCleanup(self.srv.stop)
pipeline_poller_call = mock.call(1, self.srv.refresh_pipeline)
self.assertIn(pipeline_poller_call,
self.srv.tg.add_timer.call_args_list)
self.assertIsNotNone(self.srv.refresh_pipeline_periodic)
def test_notification_reloaded_pipeline(self):
pipeline_cfg_file = self.setup_pipeline(['instance'])


@@ -19,6 +19,7 @@
import abc
import copy
import datetime
import time
import mock
from oslo_config import fixture as fixture_config
@@ -250,6 +251,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
'pipeline_cfg_file',
self.path_get('etc/ceilometer/pipeline.yaml')
)
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.CONF(args=[])
self.mgr = self.create_manager()
self.mgr.extensions = self.create_extension_list()
@@ -307,13 +309,22 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
mpc = self.mgr.partition_coordinator
mpc.is_active.return_value = False
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.partition_coordinator.heartbeat = mock.MagicMock()
self.mgr.start()
setup_polling.assert_called_once_with()
mpc.start.assert_called_once_with()
self.assertEqual(2, mpc.join_group.call_count)
self.mgr.setup_polling_tasks.assert_called_once_with()
timer_call = mock.call(1.0, mpc.heartbeat)
self.assertEqual([timer_call], self.mgr.tg.add_timer.call_args_list)
# Wait for the first heartbeat
runs = 0
for i in six.moves.range(10):
runs = list(self.mgr.heartbeat_timer.iter_watchers())[0].runs
if runs > 0:
break
time.sleep(0.5)
self.assertGreaterEqual(1, runs)
self.mgr.stop()
mpc.stop.assert_called_once_with()
@@ -323,6 +334,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
mpc = self.mgr.partition_coordinator
mpc.is_active.return_value = False
self.CONF.set_override('heartbeat', 1.0, group='coordination')
self.mgr.partition_coordinator.heartbeat = mock.MagicMock()
self.CONF.set_override('refresh_pipeline_cfg', True)
self.CONF.set_override('pipeline_polling_interval', 5)
self.addCleanup(self.mgr.stop)
@@ -332,10 +345,17 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
mpc.start.assert_called_once_with()
self.assertEqual(2, mpc.join_group.call_count)
self.mgr.setup_polling_tasks.assert_called_once_with()
timer_call = mock.call(1.0, mpc.heartbeat)
pipeline_poller_call = mock.call(5, self.mgr.refresh_pipeline)
self.assertEqual([timer_call, pipeline_poller_call],
self.mgr.tg.add_timer.call_args_list)
# Wait for the first heartbeat
runs = 0
for i in six.moves.range(10):
runs = list(self.mgr.heartbeat_timer.iter_watchers())[0].runs
if runs > 0:
break
time.sleep(0.5)
self.assertGreaterEqual(1, runs)
self.assertEqual([], list(self.mgr.polling_periodics.iter_watchers()))
def test_join_partitioning_groups(self):
self.mgr.discoveries = self.create_discoveries()
@@ -412,10 +432,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
mgr = self.create_manager()
mgr.extensions = self.mgr.extensions
mgr.create_polling_task = mock.MagicMock()
mgr.tg = mock.MagicMock()
mgr.start()
self.addCleanup(mgr.stop)
self.assertTrue(mgr.tg.add_timer.called)
mgr.create_polling_task.assert_called_once_with()
def test_manager_exception_persistency(self):
self.pipeline_cfg['sources'].append({


@@ -36,6 +36,10 @@ from ceilometer import pipeline
from ceilometer.tests.unit.agent import agentbase
def fakedelayed(delay, target, *args, **kwargs):
return target(*args, **kwargs)
class PollingException(Exception):
pass
@@ -408,6 +412,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self._batching_samples(4, 1)
def _batching_samples(self, expected_samples, call_count):
self.useFixture(mockpatch.PatchObject(manager.utils, 'delayed',
side_effect=fakedelayed))
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
@@ -425,12 +431,11 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
self.mgr.start()
self.addCleanup(self.mgr.stop)
# Manually execute the periodic callbacks
for timer in self.mgr.pollster_timers:
timer.f(*timer.args, **timer.kw)
for cb, __, args, kwargs in self.mgr.polling_periodics._callables:
cb(*args, **kwargs)
samples = self.notified_samples
self.assertEqual(expected_samples, len(samples))


@@ -26,7 +26,10 @@ import decimal
import hashlib
import struct
import threading
import time
from concurrent import futures
from futurist import periodics
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import timeutils
@@ -257,8 +260,22 @@ def kill_listeners(listeners):
listener.wait()
def delayed(delay, target, *args, **kwargs):
time.sleep(delay)
return target(*args, **kwargs)
def spawn_thread(target, *args, **kwargs):
t = threading.Thread(target=target, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return t
def create_periodic(target, spacing, run_immediately=True, *args, **kwargs):
p = periodics.PeriodicWorker.create(
[], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
p.add(periodics.periodic(
spacing=spacing, run_immediately=run_immediately)(
lambda: target(*args, **kwargs)))
return p
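
Hedged usage sketch of these helpers (mirroring the heartbeat wiring in
AgentManager above; some_callback and the 1.0s spacing are placeholders):

    def some_callback():
        pass  # placeholder periodic work

    periodic = create_periodic(target=some_callback, spacing=1.0)
    spawn_thread(periodic.start)

    # later, typically from a stop() method:
    periodic.stop()
    periodic.wait()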