Merge "better support notification coordination"

This commit is contained in:
Jenkins 2016-01-26 18:36:10 +00:00 committed by Gerrit Code Review
commit fe1743f647
3 changed files with 57 additions and 44 deletions

View File

@ -167,7 +167,7 @@ class PartitionCoordinator(object):
self.join_group(group_id) self.join_group(group_id)
try: try:
members = self._get_members(group_id) members = self._get_members(group_id)
LOG.debug('Members of group: %s', members) LOG.debug('Members of group: %s, Me: %s', members, self._my_id)
hr = utils.HashRing(members) hr = utils.HashRing(members)
filtered = [v for v in iterable filtered = [v for v in iterable
if hr.get_node(str(v)) == self._my_id] if hr.get_node(str(v)) == self._my_id]

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import itertools import itertools
import threading
from oslo_config import cfg from oslo_config import cfg
from oslo_context import context from oslo_context import context
@ -99,6 +100,7 @@ class NotificationService(service_base.BaseService):
super(NotificationService, self).__init__(*args, **kwargs) super(NotificationService, self).__init__(*args, **kwargs)
self.partition_coordinator = None self.partition_coordinator = None
self.listeners, self.pipeline_listeners = [], [] self.listeners, self.pipeline_listeners = [], []
self.coord_lock = threading.Lock()
self.group_id = None self.group_id = None
@classmethod @classmethod
@ -162,7 +164,6 @@ class NotificationService(service_base.BaseService):
self.group_id = self.NOTIFICATION_NAMESPACE self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator() self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start() self.partition_coordinator.start()
self.partition_coordinator.join_group(self.group_id)
else: else:
# FIXME(sileht): endpoint uses the notification_topics option # FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option # and it should not because this is an oslo_messaging option
@ -182,14 +183,16 @@ class NotificationService(service_base.BaseService):
self.event_pipe_manager) self.event_pipe_manager)
if cfg.CONF.notification.workload_partitioning: if cfg.CONF.notification.workload_partitioning:
self._configure_pipeline_listeners() # join group after all manager set up is configured
self.partition_coordinator.join_group(self.group_id)
self.partition_coordinator.watch_group(self.group_id, self.partition_coordinator.watch_group(self.group_id,
self._refresh_agent) self._refresh_agent)
self.tg.add_timer(cfg.CONF.coordination.heartbeat, self.tg.add_timer(cfg.CONF.coordination.heartbeat,
self.partition_coordinator.heartbeat) self.partition_coordinator.heartbeat)
self.tg.add_timer(cfg.CONF.coordination.check_watchers, self.tg.add_timer(cfg.CONF.coordination.check_watchers,
self.partition_coordinator.run_watchers) self.partition_coordinator.run_watchers)
# configure pipelines after all coordination is configured.
self._configure_pipeline_listeners()
if not cfg.CONF.notification.disable_non_metric_meters: if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly ' LOG.warning(_LW('Non-metric meters may be collected. It is highly '
@ -247,49 +250,50 @@ class NotificationService(service_base.BaseService):
self._configure_pipeline_listeners(True) self._configure_pipeline_listeners(True)
def _configure_pipeline_listeners(self, reuse_listeners=False): def _configure_pipeline_listeners(self, reuse_listeners=False):
ev_pipes = [] with self.coord_lock:
if cfg.CONF.notification.store_events: ev_pipes = []
ev_pipes = self.event_pipeline_manager.pipelines if cfg.CONF.notification.store_events:
pipelines = self.pipeline_manager.pipelines + ev_pipes ev_pipes = self.event_pipeline_manager.pipelines
transport = messaging.get_transport() pipelines = self.pipeline_manager.pipelines + ev_pipes
partitioned = self.partition_coordinator.extract_my_subset( transport = messaging.get_transport()
self.group_id, partitioned = self.partition_coordinator.extract_my_subset(
range(cfg.CONF.notification.pipeline_processing_queues)) self.group_id,
range(cfg.CONF.notification.pipeline_processing_queues))
queue_set = {} queue_set = {}
for pipe_set, pipe in itertools.product(partitioned, pipelines): for pipe_set, pipe in itertools.product(partitioned, pipelines):
queue_set['%s-%s-%s' % queue_set['%s-%s-%s' %
(self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
if reuse_listeners: if reuse_listeners:
topics = queue_set.keys() topics = queue_set.keys()
kill_list = [] kill_list = []
for listener in self.pipeline_listeners: for listener in self.pipeline_listeners:
if listener.dispatcher.targets[0].topic in topics: if listener.dispatcher.targets[0].topic in topics:
queue_set.pop(listener.dispatcher.targets[0].topic) queue_set.pop(listener.dispatcher.targets[0].topic)
else: else:
kill_list.append(listener) kill_list.append(listener)
for listener in kill_list: for listener in kill_list:
utils.kill_listeners([listener]) utils.kill_listeners([listener])
self.pipeline_listeners.remove(listener) self.pipeline_listeners.remove(listener)
else: else:
utils.kill_listeners(self.pipeline_listeners) utils.kill_listeners(self.pipeline_listeners)
self.pipeline_listeners = [] self.pipeline_listeners = []
for topic, pipe in queue_set.items(): for topic, pipe in queue_set.items():
LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
pipe_set) pipe_set)
pipe_endpoint = (pipeline.EventPipelineEndpoint pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline) if isinstance(pipe, pipeline.EventPipeline)
else pipeline.SamplePipelineEndpoint) else pipeline.SamplePipelineEndpoint)
listener = messaging.get_batch_notification_listener( listener = messaging.get_batch_notification_listener(
transport, transport,
[oslo_messaging.Target(topic=topic)], [oslo_messaging.Target(topic=topic)],
[pipe_endpoint(self.ctxt, pipe)], [pipe_endpoint(self.ctxt, pipe)],
batch_size=cfg.CONF.notification.batch_size, batch_size=cfg.CONF.notification.batch_size,
batch_timeout=cfg.CONF.notification.batch_timeout) batch_timeout=cfg.CONF.notification.batch_timeout)
listener.start() listener.start()
self.pipeline_listeners.append(listener) self.pipeline_listeners.append(listener)
def stop(self): def stop(self):
if self.partition_coordinator: if self.partition_coordinator:

View File

@ -0,0 +1,9 @@
---
critical:
- >
[`bug 1533787 <https://bugs.launchpad.net/ceilometer/+bug/1533787>`_]
Fix an issue where agents are not properly getting registered to group
when multiple notification agents are deployed. This can result in
bad transformation as the agents are not coordinated. It is still
recommended to set heartbeat_timeout_threshold = 0 in
[oslo_messaging_rabbit] section when deploying multiple agents.