From 1ab0acb851a72e4a13903dab73a20a45de98baaa Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk
Date: Fri, 22 Apr 2016 10:14:36 +0200
Subject: [PATCH] pollsters: Remove eventlet timers

This change removes the usage of eventlet timers. This allows the
coordinator heartbeat/watchers to work correctly even when the main
thread is stuck for any reason (IO, time.sleep, ...).

This also fixes a concurrency issue in the notification-agent between
stop/reload_pipeline/refresh_agent, which all manipulate listeners. For
example, a listener stopped by stop() could be restarted by
reload_pipeline or refresh_agent. Now we use coord_lock to protect the
listener manipulations and to ensure we are not in the middle of a
shutdown when we restart them.

This bug couldn't occur with greenlets because we haven't monkeypatched
system calls for a while now, so none of these methods ever ran
concurrently. But replacing greenlets with real threads has exposed the
bug.

Closes-Bug: #1582641
Change-Id: I21c3b953a296316b983114435fcbeba1e29f051e
---
 ceilometer/agent/manager.py                       |  79 +++++++------
 ceilometer/collector.py                           |  15 +--
 ceilometer/notification.py                        | 104 ++++++++++--------
 ceilometer/service_base.py                        |  14 ++-
 .../tests/functional/test_notification.py         |   6 +-
 ceilometer/tests/unit/agent/agentbase.py          |  35 ++++--
 ceilometer/tests/unit/agent/test_manager.py       |  11 +-
 ceilometer/utils.py                               |  17 +++
 8 files changed, 177 insertions(+), 104 deletions(-)

diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py
index 28f70b68..3b9e5a2d 100644
--- a/ceilometer/agent/manager.py
+++ b/ceilometer/agent/manager.py
@@ -18,6 +18,8 @@
 import collections
 import itertools
 import random
 
+from concurrent import futures
+from futurist import periodics
 from keystoneauth1 import exceptions as ka_exceptions
 from keystoneclient import exceptions as ks_exceptions
 from oslo_config import cfg
@@ -274,8 +276,13 @@ class AgentManager(service_base.PipelineBasedService):
         discoveries = (self._extensions('discover', namespace).extensions
                        for namespace in namespaces)
         self.discoveries = list(itertools.chain(*list(discoveries)))
+        self.polling_periodics = None
 
         self.partition_coordinator = coordination.PartitionCoordinator()
+        self.heartbeat_timer = utils.create_periodic(
+            target=self.partition_coordinator.heartbeat,
+            spacing=cfg.CONF.coordination.heartbeat,
+            run_immediately=True)
 
         # Compose coordination group prefix.
         # We'll use namespaces as the basement for this partitioning.
@@ -345,27 +352,15 @@ class AgentManager(service_base.PipelineBasedService):
 
         if not self.groups and self.partition_coordinator.is_active():
             self.partition_coordinator.stop()
-            self.kill_heartbeat_timer()
+            self.heartbeat_timer.stop()
 
         if self.groups and not self.partition_coordinator.is_active():
             self.partition_coordinator.start()
-            self.heartbeat_timer = self.tg.add_timer(
-                cfg.CONF.coordination.heartbeat,
-                self.partition_coordinator.heartbeat)
+            utils.spawn_thread(self.heartbeat_timer.start)
 
         for group in self.groups:
             self.partition_coordinator.join_group(group)
 
-    def kill_heartbeat_timer(self):
-        if not getattr(self, 'heartbeat_timer', None):
-            return
-
-        try:
-            self.heartbeat_timer.stop()
-            self.tg.timer_done(self.heartbeat_timer)
-        except Exception as e:
-            LOG.error(_LE("Failed to stop heartbeat timer due to: %s"), e)
-
     def create_polling_task(self):
         """Create an initially empty polling task."""
         return PollingTask(self)
@@ -388,7 +383,12 @@ class AgentManager(service_base.PipelineBasedService):
                 discovery_group_id) if discovery_group_id else None)
 
-    def configure_polling_tasks(self):
+    def start_polling_tasks(self):
+        # NOTE(sileht): 1000 is just the same as previous oslo_service code
+        self.polling_periodics = periodics.PeriodicWorker.create(
+            [], executor_factory=lambda:
+            futures.ThreadPoolExecutor(max_workers=1000))
+
         # allow time for coordination if necessary
         delay_start = self.partition_coordinator.is_active()
@@ -396,29 +396,33 @@ class AgentManager(service_base.PipelineBasedService):
             delay_polling_time = random.randint(
                 0, cfg.CONF.shuffle_time_before_polling_task)
 
-        pollster_timers = []
         data = self.setup_polling_tasks()
         for interval, polling_task in data.items():
             delay_time = (interval + delay_polling_time if delay_start
                           else delay_polling_time)
-            pollster_timers.append(self.tg.add_timer(interval,
-                                   self.interval_task,
-                                   initial_delay=delay_time,
-                                   task=polling_task))
-        return pollster_timers
+
+            @periodics.periodic(spacing=interval, run_immediately=False)
+            def task():
+                self.interval_task(polling_task)
+
+            utils.spawn_thread(utils.delayed, delay_time,
+                               self.polling_periodics.add, task)
+
+        if data:
+            # Don't start useless threads if no task will run
+            utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
 
     def start(self):
         super(AgentManager, self).start()
         self.polling_manager = pipeline.setup_polling()
         self.join_partitioning_groups()
-        self.pollster_timers = self.configure_polling_tasks()
+        self.start_polling_tasks()
         self.init_pipeline_refresh()
 
     def stop(self):
         if self.started:
-            self.stop_pollsters()
-            self.kill_heartbeat_timer()
+            self.stop_pollsters_tasks()
+            self.heartbeat_timer.stop()
             self.partition_coordinator.stop()
         super(AgentManager, self).stop()
@@ -432,6 +436,16 @@ class AgentManager(service_base.PipelineBasedService):
 
     @property
     def keystone(self):
+        # FIXME(sileht): This lazy loading of the keystone client doesn't
+        # look concurrency safe. We never see an issue because, once we
+        # have connected to keystone, everything is fine, and because all
+        # pollsters are delayed during startup. But each polling task
+        # creates a new client and overrides the one created by the other
+        # polling tasks. During this short window bad things can occur.
+        #
+        # I think we must not reset the keystone client before running a
+        # polling task, but refresh it periodically instead.
+
         # NOTE(sileht): we do lazy loading of the keystone client
         # for multiple reasons:
         # * don't use it if no plugin need it
@@ -501,25 +515,22 @@ class AgentManager(service_base.PipelineBasedService):
                     LOG.warning(_('Unknown discovery extension: %s') % name)
         return resources
 
-    def stop_pollsters(self):
-        for x in self.pollster_timers:
-            try:
-                x.stop()
-                self.tg.timer_done(x)
-            except Exception:
-                LOG.error(_('Error stopping pollster.'), exc_info=True)
-        self.pollster_timers = []
+    def stop_pollsters_tasks(self):
+        if self.polling_periodics:
+            self.polling_periodics.stop()
+            self.polling_periodics.wait()
+        self.polling_periodics = None
 
     def reload_pipeline(self):
         if self.pipeline_validated:
             LOG.info(_LI("Reconfiguring polling tasks."))
 
             # stop existing pollsters and leave partitioning groups
-            self.stop_pollsters()
+            self.stop_pollsters_tasks()
             for group in self.groups:
                 self.partition_coordinator.leave_group(group)
 
             # re-create partitioning groups according to pipeline
             # and configure polling tasks with latest pipeline conf
             self.join_partitioning_groups()
-            self.pollster_timers = self.configure_polling_tasks()
+            self.start_polling_tasks()
diff --git a/ceilometer/collector.py b/ceilometer/collector.py
index a5843dd6..f646bdd2 100644
--- a/ceilometer/collector.py
+++ b/ceilometer/collector.py
@@ -126,13 +126,14 @@ class CollectorService(service_base.ServiceBase):
                 LOG.exception(_("UDP: Unable to store meter"))
 
     def stop(self):
-        if self.sample_listener:
-            utils.kill_listeners([self.sample_listener])
-        if self.event_listener:
-            utils.kill_listeners([self.event_listener])
-        if self.udp_thread:
-            self.udp_run = False
-            self.udp_thread.join()
+        if self.started:
+            if self.sample_listener:
+                utils.kill_listeners([self.sample_listener])
+            if self.event_listener:
+                utils.kill_listeners([self.event_listener])
+            if self.udp_thread:
+                self.udp_run = False
+                self.udp_thread.join()
         super(CollectorService, self).stop()
diff --git a/ceilometer/notification.py b/ceilometer/notification.py
index 41e7e847..415b829c 100644
--- a/ceilometer/notification.py
+++ b/ceilometer/notification.py
@@ -148,6 +148,7 @@ class NotificationService(service_base.PipelineBasedService):
 
     def start(self):
         super(NotificationService, self).start()
+        self.shutdown = False
         self.periodic = None
         self.partition_coordinator = None
         self.coord_lock = threading.Lock()
@@ -211,7 +212,8 @@ class NotificationService(service_base.PipelineBasedService):
             utils.spawn_thread(self.periodic.start)
 
             # configure pipelines after all coordination is configured.
-            self._configure_pipeline_listener()
+            with self.coord_lock:
+                self._configure_pipeline_listener()
 
         if not cfg.CONF.notification.disable_non_metric_meters:
             LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@@ -264,57 +266,63 @@ class NotificationService(service_base.PipelineBasedService):
             self.listeners.append(listener)
 
     def _refresh_agent(self, event):
-        self._configure_pipeline_listener()
+        with self.coord_lock:
+            if self.shutdown:
+                # NOTE(sileht): We are about to shut down and everything
+                # will be stopped; we should not restart the listeners here
+                return
+            self._configure_pipeline_listener()
 
     def _configure_pipeline_listener(self):
-        with self.coord_lock:
-            ev_pipes = []
-            if cfg.CONF.notification.store_events:
-                ev_pipes = self.event_pipeline_manager.pipelines
-            pipelines = self.pipeline_manager.pipelines + ev_pipes
-            transport = messaging.get_transport()
-            partitioned = self.partition_coordinator.extract_my_subset(
-                self.group_id,
-                range(cfg.CONF.notification.pipeline_processing_queues))
+        ev_pipes = []
+        if cfg.CONF.notification.store_events:
+            ev_pipes = self.event_pipeline_manager.pipelines
+        pipelines = self.pipeline_manager.pipelines + ev_pipes
+        transport = messaging.get_transport()
+        partitioned = self.partition_coordinator.extract_my_subset(
+            self.group_id,
+            range(cfg.CONF.notification.pipeline_processing_queues))
 
-            endpoints = []
-            targets = []
+        endpoints = []
+        targets = []
 
-            for pipe in pipelines:
-                if isinstance(pipe, pipeline.EventPipeline):
-                    endpoints.append(pipeline.EventPipelineEndpoint(pipe))
-                else:
-                    endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
+        for pipe in pipelines:
+            if isinstance(pipe, pipeline.EventPipeline):
+                endpoints.append(pipeline.EventPipelineEndpoint(pipe))
+            else:
+                endpoints.append(pipeline.SamplePipelineEndpoint(pipe))
 
-            for pipe_set, pipe in itertools.product(partitioned, pipelines):
-                LOG.debug('Pipeline endpoint: %s from set: %s',
-                          pipe.name, pipe_set)
-                topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
-                                      pipe.name, pipe_set)
-                targets.append(oslo_messaging.Target(topic=topic))
+        for pipe_set, pipe in itertools.product(partitioned, pipelines):
+            LOG.debug('Pipeline endpoint: %s from set: %s',
+                      pipe.name, pipe_set)
+            topic = '%s-%s-%s' % (self.NOTIFICATION_IPC,
+                                  pipe.name, pipe_set)
+            targets.append(oslo_messaging.Target(topic=topic))
 
-            if self.pipeline_listener:
-                self.pipeline_listener.stop()
-                self.pipeline_listener.wait()
+        if self.pipeline_listener:
+            self.pipeline_listener.stop()
+            self.pipeline_listener.wait()
 
-            self.pipeline_listener = messaging.get_batch_notification_listener(
-                transport,
-                targets,
-                endpoints,
-                batch_size=cfg.CONF.notification.batch_size,
-                batch_timeout=cfg.CONF.notification.batch_timeout)
-            self.pipeline_listener.start()
+        self.pipeline_listener = messaging.get_batch_notification_listener(
+            transport,
+            targets,
+            endpoints,
+            batch_size=cfg.CONF.notification.batch_size,
+            batch_timeout=cfg.CONF.notification.batch_timeout)
+        self.pipeline_listener.start()
 
     def stop(self):
         if self.started:
+            self.shutdown = True
             if self.periodic:
                 self.periodic.stop()
                 self.periodic.wait()
             if self.partition_coordinator:
                 self.partition_coordinator.stop()
-            if self.pipeline_listener:
-                utils.kill_listeners([self.pipeline_listener])
-            utils.kill_listeners(self.listeners)
+            with self.coord_lock:
+                if self.pipeline_listener:
+                    utils.kill_listeners([self.pipeline_listener])
+                utils.kill_listeners(self.listeners)
         super(NotificationService, self).stop()
 
     def reload_pipeline(self):
@@ -328,12 +336,18 @@ class NotificationService(service_base.PipelineBasedService):
             self.event_pipe_manager = self._get_event_pipeline_manager(
                 self.transport)
 
-            # restart the main queue listeners.
-            utils.kill_listeners(self.listeners)
-            self._configure_main_queue_listeners(
-                self.pipe_manager, self.event_pipe_manager)
+            with self.coord_lock:
+                if self.shutdown:
+                    # NOTE(sileht): We are about to shut down and everything
+                    # will be stopped; we should not restart the listeners
+                    return
 
-            # restart the pipeline listeners if workload partitioning
-            # is enabled.
-            if cfg.CONF.notification.workload_partitioning:
-                self._configure_pipeline_listener()
+                # restart the main queue listeners.
+                utils.kill_listeners(self.listeners)
+                self._configure_main_queue_listeners(
+                    self.pipe_manager, self.event_pipe_manager)
+
+                # restart the pipeline listeners if workload partitioning
+                # is enabled.
+                if cfg.CONF.notification.workload_partitioning:
+                    self._configure_pipeline_listener()
diff --git a/ceilometer/service_base.py b/ceilometer/service_base.py
index 9fcbe11b..cca4e9fa 100644
--- a/ceilometer/service_base.py
+++ b/ceilometer/service_base.py
@@ -22,6 +22,7 @@
 import six
 
 from ceilometer.i18n import _LE, _LI
 from ceilometer import pipeline
+from ceilometer import utils
 
 LOG = log.getLogger(__name__)
@@ -56,10 +57,19 @@ class PipelineBasedService(ServiceBase):
         self.set_pipeline_hash(pipeline.get_pipeline_hash(
             pipeline.EVENT_TYPE), pipeline.EVENT_TYPE)
 
+        self.refresh_pipeline_periodic = None
         if (cfg.CONF.refresh_pipeline_cfg or
                 cfg.CONF.refresh_event_pipeline_cfg):
-            self.tg.add_timer(cfg.CONF.pipeline_polling_interval,
-                              self.refresh_pipeline)
+            self.refresh_pipeline_periodic = utils.create_periodic(
+                target=self.refresh_pipeline,
+                spacing=cfg.CONF.pipeline_polling_interval)
+            utils.spawn_thread(self.refresh_pipeline_periodic.start)
+
+    def stop(self):
+        if self.started and self.refresh_pipeline_periodic:
+            self.refresh_pipeline_periodic.stop()
+            self.refresh_pipeline_periodic.wait()
+        super(PipelineBasedService, self).stop()
 
     def get_pipeline_mtime(self, p_type=pipeline.SAMPLE_TYPE):
         return (self.event_pipeline_mtime if p_type == pipeline.EVENT_TYPE else
diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py
index 7e4ee6a4..ea5ec557 100644
--- a/ceilometer/tests/functional/test_notification.py
+++ b/ceilometer/tests/functional/test_notification.py
@@ -276,13 +276,9 @@ class TestRealNotificationReloadablePipeline(BaseRealNotification):
     @mock.patch('ceilometer.publisher.test.TestPublisher')
     def test_notification_pipeline_poller(self, fake_publisher_cls):
         fake_publisher_cls.return_value = self.publisher
-        self.srv.tg = mock.MagicMock()
         self.srv.start()
         self.addCleanup(self.srv.stop)
-
-        pipeline_poller_call = mock.call(1, self.srv.refresh_pipeline)
-        self.assertIn(pipeline_poller_call,
-                      self.srv.tg.add_timer.call_args_list)
+        self.assertIsNotNone(self.srv.refresh_pipeline_periodic)
 
     def test_notification_reloaded_pipeline(self):
         pipeline_cfg_file = self.setup_pipeline(['instance'])
diff --git a/ceilometer/tests/unit/agent/agentbase.py b/ceilometer/tests/unit/agent/agentbase.py
index 32e687d3..254627c8 100644
--- a/ceilometer/tests/unit/agent/agentbase.py
+++ b/ceilometer/tests/unit/agent/agentbase.py
@@ -19,6 +19,7 @@
 import abc
 import copy
 import datetime
+import time
 
 import mock
 from oslo_config import fixture as fixture_config
@@ -250,6 +251,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
             'pipeline_cfg_file',
             self.path_get('etc/ceilometer/pipeline.yaml')
         )
+        self.CONF.set_override('heartbeat', 1.0, group='coordination')
         self.CONF(args=[])
         self.mgr = self.create_manager()
         self.mgr.extensions = self.create_extension_list()
@@ -307,13 +309,22 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         mpc = self.mgr.partition_coordinator
         mpc.is_active.return_value = False
         self.CONF.set_override('heartbeat', 1.0, group='coordination')
+        self.mgr.partition_coordinator.heartbeat = mock.MagicMock()
         self.mgr.start()
         setup_polling.assert_called_once_with()
         mpc.start.assert_called_once_with()
         self.assertEqual(2, mpc.join_group.call_count)
         self.mgr.setup_polling_tasks.assert_called_once_with()
-        timer_call = mock.call(1.0, mpc.heartbeat)
-        self.assertEqual([timer_call], self.mgr.tg.add_timer.call_args_list)
+
+        # Wait for the first heartbeat
+        runs = 0
+        for i in six.moves.range(10):
+            runs = list(self.mgr.heartbeat_timer.iter_watchers())[0].runs
+            if runs > 0:
+                break
+            time.sleep(0.5)
+        self.assertGreaterEqual(runs, 1)
+
         self.mgr.stop()
         mpc.stop.assert_called_once_with()
 
@@ -323,6 +334,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         mpc = self.mgr.partition_coordinator
         mpc.is_active.return_value = False
         self.CONF.set_override('heartbeat', 1.0, group='coordination')
+        self.mgr.partition_coordinator.heartbeat = mock.MagicMock()
+
         self.CONF.set_override('refresh_pipeline_cfg', True)
         self.CONF.set_override('pipeline_polling_interval', 5)
         self.addCleanup(self.mgr.stop)
@@ -332,10 +345,17 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         mpc.start.assert_called_once_with()
         self.assertEqual(2, mpc.join_group.call_count)
         self.mgr.setup_polling_tasks.assert_called_once_with()
-        timer_call = mock.call(1.0, mpc.heartbeat)
-        pipeline_poller_call = mock.call(5, self.mgr.refresh_pipeline)
-        self.assertEqual([timer_call, pipeline_poller_call],
-                         self.mgr.tg.add_timer.call_args_list)
+
+        # Wait for the first heartbeat
+        runs = 0
+        for i in six.moves.range(10):
+            runs = list(self.mgr.heartbeat_timer.iter_watchers())[0].runs
+            if runs > 0:
+                break
+            time.sleep(0.5)
+        self.assertGreaterEqual(runs, 1)
+
+        self.assertEqual([], list(self.mgr.polling_periodics.iter_watchers()))
 
     def test_join_partitioning_groups(self):
         self.mgr.discoveries = self.create_discoveries()
@@ -412,10 +432,9 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
         mgr = self.create_manager()
         mgr.extensions = self.mgr.extensions
         mgr.create_polling_task = mock.MagicMock()
-        mgr.tg = mock.MagicMock()
         mgr.start()
         self.addCleanup(mgr.stop)
-        self.assertTrue(mgr.tg.add_timer.called)
+        mgr.create_polling_task.assert_called_once_with()
 
     def test_manager_exception_persistency(self):
         self.pipeline_cfg['sources'].append({
diff --git a/ceilometer/tests/unit/agent/test_manager.py b/ceilometer/tests/unit/agent/test_manager.py
index 03d1f131..ed1e33a1 100644
--- a/ceilometer/tests/unit/agent/test_manager.py
+++ b/ceilometer/tests/unit/agent/test_manager.py
@@ -36,6 +36,10 @@
 from ceilometer import pipeline
 from ceilometer.tests.unit.agent import agentbase
 
 
+def fakedelayed(delay, target, *args, **kwargs):
+    return target(*args, **kwargs)
+
+
 class PollingException(Exception):
     pass
 
@@ -408,6 +412,8 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
         self._batching_samples(4, 1)
 
     def _batching_samples(self, expected_samples, call_count):
+        self.useFixture(mockpatch.PatchObject(manager.utils, 'delayed',
+                                              side_effect=fakedelayed))
         pipeline = yaml.dump({
             'sources': [{
                 'name': 'test_pipeline',
@@ -425,12 +431,11 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase):
 
         self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
 
-        self.mgr.tg = os_service.threadgroup.ThreadGroup(1000)
         self.mgr.start()
         self.addCleanup(self.mgr.stop)
 
         # Manually executes callbacks
-        for timer in self.mgr.pollster_timers:
-            timer.f(*timer.args, **timer.kw)
+        for cb, __, args, kwargs in self.mgr.polling_periodics._callables:
+            cb(*args, **kwargs)
 
         samples = self.notified_samples
         self.assertEqual(expected_samples, len(samples))
diff --git a/ceilometer/utils.py b/ceilometer/utils.py
index a4495da7..544d5523 100644
--- a/ceilometer/utils.py
+++ b/ceilometer/utils.py
@@ -26,7 +26,10 @@
 import decimal
 import hashlib
 import struct
 import threading
+import time
 
+from concurrent import futures
+from futurist import periodics
 from oslo_concurrency import processutils
 from oslo_config import cfg
 from oslo_utils import timeutils
@@ -257,8 +260,22 @@ def kill_listeners(listeners):
         listener.wait()
 
 
+def delayed(delay, target, *args, **kwargs):
+    time.sleep(delay)
+    return target(*args, **kwargs)
+
+
 def spawn_thread(target, *args, **kwargs):
     t = threading.Thread(target=target, args=args, kwargs=kwargs)
     t.daemon = True
     t.start()
     return t
+
+
+def create_periodic(target, spacing, run_immediately=True, *args, **kwargs):
+    p = periodics.PeriodicWorker.create(
+        [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
+    p.add(periodics.periodic(
+        spacing=spacing, run_immediately=run_immediately)(
+            lambda: target(*args, **kwargs)))
+    return p
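
The heartbeat part of this change boils down to replacing an eventlet-driven
tg.add_timer() with a futurist PeriodicWorker running on a real thread, so the
heartbeat keeps firing even while the main thread blocks. Below is a minimal,
standalone sketch of that pattern, assuming only that futurist and
concurrent.futures are installed; create_periodic() and spawn_thread() mirror
the helpers the patch adds to ceilometer/utils.py, and beat() is a stand-in
for PartitionCoordinator.heartbeat, not the real method.

import threading
import time

from concurrent import futures
from futurist import periodics


def create_periodic(target, spacing, run_immediately=True):
    # One single-threaded worker per callback, like utils.create_periodic().
    worker = periodics.PeriodicWorker.create(
        [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
    worker.add(periodics.periodic(
        spacing=spacing, run_immediately=run_immediately)(target))
    return worker


def spawn_thread(target, *args, **kwargs):
    # start() blocks until the worker is stopped, so run it in a daemon thread.
    t = threading.Thread(target=target, args=args, kwargs=kwargs)
    t.daemon = True
    t.start()
    return t


def beat():
    print('heartbeat')


heartbeat_timer = create_periodic(beat, spacing=1.0)
spawn_thread(heartbeat_timer.start)

time.sleep(3.5)          # the main thread may block freely here

heartbeat_timer.stop()   # stop scheduling new runs
heartbeat_timer.wait()   # wait for in-flight runs and the worker to finish
print('ran %d times' % list(heartbeat_timer.iter_watchers())[0].runs)

Note that stop() only asks the worker to stop while wait() joins it, which is
why stop_pollsters_tasks() and the new PipelineBasedService.stop() call both.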
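
The notification-agent fix is a shutdown/reconfigure race: a coordination
watcher may fire _refresh_agent() or reload_pipeline() while stop() is killing
the listeners and, without a guard, would start them again. The following is a
simplified, hypothetical sketch of the guard pattern only; FakeListener and
Service are illustrative names, not the real ceilometer classes.

import threading


class FakeListener(object):
    # Stand-in for an oslo.messaging notification listener.
    def start(self):
        print('listener started')

    def stop(self):
        print('listener stopped')


class Service(object):
    def __init__(self):
        self.shutdown = False
        self.coord_lock = threading.Lock()
        self.listener = None

    def _configure_listener(self):
        # Caller must hold coord_lock.
        if self.listener:
            self.listener.stop()
        self.listener = FakeListener()
        self.listener.start()

    def refresh(self):
        # May be called from a coordination watcher thread at any time.
        with self.coord_lock:
            if self.shutdown:
                return  # shutting down, do not restart the listener
            self._configure_listener()

    def stop(self):
        self.shutdown = True
        with self.coord_lock:
            if self.listener:
                self.listener.stop()
                self.listener = None


svc = Service()
svc.refresh()   # configures the listener
svc.stop()      # stops it
svc.refresh()   # no-op: the shutdown flag prevents a restart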
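
start_polling_tasks() also re-expresses the old initial_delay= argument of
tg.add_timer(): the periodic worker is started right away with
allow_empty=True, and each task is registered later from a short-lived thread
that sleeps first (utils.delayed). A small sketch of that staggering, with
purely illustrative spacing and delay values:

import threading
import time

from concurrent import futures
from futurist import periodics


def delayed(delay, target, *args, **kwargs):
    # Same shape as the utils.delayed() helper added by the patch.
    time.sleep(delay)
    return target(*args, **kwargs)


worker = periodics.PeriodicWorker.create(
    [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=4))


@periodics.periodic(spacing=2.0, run_immediately=False)
def poll():
    print('polling')


# Start the still-empty worker immediately; allow_empty avoids an error.
starter = threading.Thread(target=worker.start, kwargs={'allow_empty': True})
starter.daemon = True
starter.start()

# Register the task only after an initial delay, as initial_delay= used to do.
adder = threading.Thread(target=delayed, args=(1.5, worker.add, poll))
adder.daemon = True
adder.start()

time.sleep(6)
worker.stop()
worker.wait()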