From e38ffd09e8a550293ae416e5009561ddf04eaae7 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Mon, 20 Jul 2015 16:42:13 -0400 Subject: [PATCH] distributed coordinated notifications this patch enables a queue per pipeline per agent. each agent can send data to pipeline queue set of any agent but only listens to its own pipeline queue. when receiving a notification, the agent will: 1. build datapoint 2. calculate hash key based on (hardcoded) attribute and mod hash by number of active agents 3. send the data to a single agent's pipeline queue set. ex. two agents, sample1 with res_id=1 and sample2 with res_id=2 1. agent1 builds both samples 2. hash(sample1) % 2 (num agents) == agent1 sends sample to agent2 pipeline queue. 2a. agent2 processes sample1 3. hash(sample2) % 2 (num agents) == agent1 sends sample to agent1 pipeline queue. 3a. agent1 processes sample2 Implements blueprint distributed-coordinated-notifications Change-Id: Iab52cae0a5bfbc747a2918e67dfe8da5fd0fda84 --- ceilometer/notification.py | 53 +++++++++++++---------- ceilometer/pipeline.py | 38 +++++++++++----- ceilometer/tests/test_notification.py | 62 ++++++++++++++++++--------- 3 files changed, 99 insertions(+), 54 deletions(-) diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 275e694e28..8096f6d67d 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -91,12 +91,15 @@ class NotificationService(service_base.BaseService): invoke_args=(pm, ) ) - def _get_notifier(self, transport, pipe): - return oslo_messaging.Notifier( - transport, - driver=cfg.CONF.publisher_notifier.telemetry_driver, - publisher_id='ceilometer.notification', - topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)) + def _get_notifiers(self, transport, pipe): + notifiers = [] + for agent in self.partition_coordinator._get_members(self.group_id): + notifiers.append(oslo_messaging.Notifier( + transport, + driver=cfg.CONF.publisher_notifier.telemetry_driver, + publisher_id='ceilometer.notification', 
+ topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, agent))) + return notifiers def _get_pipe_manager(self, transport, pipeline_manager): @@ -105,7 +108,7 @@ class NotificationService(service_base.BaseService): for pipe in pipeline_manager.pipelines: pipe_manager.add_transporter( (pipe.source.support_meter, - self._get_notifier(transport, pipe))) + self._get_notifiers(transport, pipe))) else: pipe_manager = pipeline_manager @@ -121,7 +124,7 @@ class NotificationService(service_base.BaseService): for pipe in self.event_pipeline_manager.pipelines: event_pipe_manager.add_transporter( (pipe.source.support_event, - self._get_notifier(transport, pipe))) + self._get_notifiers(transport, pipe))) else: event_pipe_manager = self.event_pipeline_manager @@ -133,17 +136,12 @@ class NotificationService(service_base.BaseService): self.pipeline_manager = pipeline.setup_pipeline() self.transport = messaging.get_transport() - self.pipe_manager = self._get_pipe_manager(self.transport, - self.pipeline_manager) - self.event_pipe_manager = self._get_event_pipeline_manager( - self.transport) - - self.partition_coordinator = coordination.PartitionCoordinator() - self.partition_coordinator.start() - if cfg.CONF.notification.workload_partitioning: self.ctxt = context.get_admin_context() self.group_id = self.NOTIFICATION_NAMESPACE + self.partition_coordinator = coordination.PartitionCoordinator() + self.partition_coordinator.start() + self.partition_coordinator.join_group(self.group_id) else: # FIXME(sileht): endpoint use notification_topics option # and it should not because this is oslo_messaging option @@ -154,12 +152,16 @@ class NotificationService(service_base.BaseService): messaging.get_notifier(self.transport, '') self.group_id = None + self.pipe_manager = self._get_pipe_manager(self.transport, + self.pipeline_manager) + self.event_pipe_manager = self._get_event_pipeline_manager( + self.transport) + self.listeners, self.pipeline_listeners = [], [] 
self._configure_main_queue_listeners(self.pipe_manager, self.event_pipe_manager) if cfg.CONF.notification.workload_partitioning: - self.partition_coordinator.join_group(self.group_id) self._configure_pipeline_listeners() self.partition_coordinator.watch_group(self.group_id, self._refresh_agent) @@ -220,6 +222,9 @@ class NotificationService(service_base.BaseService): self.listeners.append(listener) def _refresh_agent(self, event): + self.reload_pipeline() + + def _refresh_listeners(self): utils.kill_listeners(self.pipeline_listeners) self._configure_pipeline_listeners() @@ -228,10 +233,9 @@ class NotificationService(service_base.BaseService): ev_pipes = [] if cfg.CONF.notification.store_events: ev_pipes = self.event_pipeline_manager.pipelines - partitioned = self.partition_coordinator.extract_my_subset( - self.group_id, self.pipeline_manager.pipelines + ev_pipes) + pipelines = self.pipeline_manager.pipelines + ev_pipes transport = messaging.get_transport() - for pipe in partitioned: + for pipe in pipelines: LOG.debug(_('Pipeline endpoint: %s'), pipe.name) pipe_endpoint = (pipeline.EventPipelineEndpoint if isinstance(pipe, pipeline.EventPipeline) else @@ -239,7 +243,9 @@ class NotificationService(service_base.BaseService): listener = messaging.get_notification_listener( transport, [oslo_messaging.Target( - topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], + topic='%s-%s-%s' % (self.NOTIFICATION_IPC, + pipe.name, + self.partition_coordinator._my_id))], [pipe_endpoint(self.ctxt, pipe)]) listener.start() self.pipeline_listeners.append(listener) @@ -256,6 +262,9 @@ class NotificationService(service_base.BaseService): self.pipe_manager = self._get_pipe_manager( self.transport, self.pipeline_manager) + self.event_pipe_manager = self._get_event_pipeline_manager( + self.transport) + # re-start the main queue listeners. 
utils.kill_listeners(self.listeners) self._configure_main_queue_listeners( @@ -264,4 +273,4 @@ class NotificationService(service_base.BaseService): # re-start the pipeline listeners if workload partitioning # is enabled. if cfg.CONF.notification.workload_partitioning: - self._refresh_agent(None) + self._refresh_listeners() diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 6aff21a4d5..2ef74e4646 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -130,6 +130,7 @@ class _PipelineTransportManager(object): def publisher(self, context): serializer = self.serializer + hash_to_bucketise = self.hash_to_bucketise transporters = self.transporters filter_attr = self.filter_attr event_type = self.event_type @@ -137,13 +138,20 @@ class _PipelineTransportManager(object): class PipelinePublishContext(object): def __enter__(self): def p(data): - serialized_data = serializer(data) - for d_filter, notifier in transporters: - if any(d_filter(d[filter_attr]) - for d in serialized_data): - notifier.sample(context.to_dict(), - event_type=event_type, - payload=serialized_data) + # TODO(gordc): cleanup so payload is always single + # datapoint. we can't correctly bucketise + # datapoints if batched. 
+ data = [data] if not isinstance(data, list) else data + for datapoint in data: + serialized_data = serializer(datapoint) + for d_filter, notifiers in transporters: + if d_filter(serialized_data[filter_attr]): + key = (hash_to_bucketise(serialized_data) % + len(notifiers)) + notifier = notifiers[key] + notifier.sample(context.to_dict(), + event_type=event_type, + payload=[serialized_data]) return p def __exit__(self, exc_type, exc_value, traceback): @@ -156,20 +164,28 @@ class SamplePipelineTransportManager(_PipelineTransportManager): filter_attr = 'counter_name' event_type = 'ceilometer.pipeline' + @staticmethod + def hash_to_bucketise(datapoint): + return hash(datapoint['resource_id']) + @staticmethod def serializer(data): - return [publisher_utils.meter_message_from_counter( - sample, cfg.CONF.publisher.telemetry_secret) for sample in data] + return publisher_utils.meter_message_from_counter( + data, cfg.CONF.publisher.telemetry_secret) class EventPipelineTransportManager(_PipelineTransportManager): filter_attr = 'event_type' event_type = 'pipeline.event' + @staticmethod + def hash_to_bucketise(datapoint): + return hash(datapoint['event_type']) + @staticmethod def serializer(data): - return [publisher_utils.message_from_event( - data, cfg.CONF.publisher.telemetry_secret)] + return publisher_utils.message_from_event( + data, cfg.CONF.publisher.telemetry_secret) class PublishContext(object): diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index be3f4325ff..844539f030 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -363,16 +363,6 @@ class TestRealNotification(BaseRealNotification): self._check_notification_service() self.assertEqual('memory', self.publisher.samples[0].name) - @mock.patch('ceilometer.coordination.PartitionCoordinator') - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_ha_configured_agent_coord_disabled(self, fake_publisher_cls, - 
fake_coord): - fake_publisher_cls.return_value = self.publisher - fake_coord1 = mock.MagicMock() - fake_coord1.extract_my_subset.side_effect = lambda x, y: y - fake_coord.return_value = fake_coord1 - self._check_notification_service() - @mock.patch.object(oslo_service.service.Service, 'stop') def test_notification_service_start_abnormal(self, mocked): try: @@ -390,32 +380,24 @@ class TestRealNotificationHA(BaseRealNotification): group='notification') self.srv = notification.NotificationService() - @mock.patch('ceilometer.coordination.PartitionCoordinator') @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_notification_service(self, fake_publisher_cls, fake_coord): + def test_notification_service(self, fake_publisher_cls): fake_publisher_cls.return_value = self.publisher - fake_coord1 = mock.MagicMock() - fake_coord1.extract_my_subset.side_effect = lambda x, y: y - fake_coord.return_value = fake_coord1 self._check_notification_service() @mock.patch('hmac.new') - @mock.patch('ceilometer.coordination.PartitionCoordinator') @mock.patch('ceilometer.publisher.test.TestPublisher') def test_notification_service_no_secret(self, fake_publisher_cls, - fake_coord, fake_hmac): + fake_hmac): self.CONF.set_override('telemetry_secret', None, group='publisher') fake_publisher_cls.return_value = self.publisher - fake_coord1 = mock.MagicMock() - fake_coord1.extract_my_subset.side_effect = lambda x, y: y - fake_coord.return_value = fake_coord1 self._check_notification_service() self.assertFalse(fake_hmac.called) def test_reset_listeners_on_refresh(self): self.srv.start() self.assertEqual(2, len(self.srv.pipeline_listeners)) - self.srv._refresh_agent(None) + self.srv._refresh_listeners() self.assertEqual(2, len(self.srv.pipeline_listeners)) self.srv.stop() @@ -449,3 +431,41 @@ class TestRealNotificationHA(BaseRealNotification): self.assertEqual('ceilometer.pipeline', mock_notifier.call_args_list[2][1]['event_type']) self.srv.stop() + + 
@mock.patch('ceilometer.publisher.test.TestPublisher') + def test_multiple_agents(self, fake_publisher_cls): + fake_publisher_cls.return_value = self.publisher + + self.srv2 = notification.NotificationService() + with mock.patch('ceilometer.coordination.PartitionCoordinator' + '._get_members', return_value=['harry', 'lloyd']): + with mock.patch('uuid.uuid4', return_value='harry'): + self.srv.start() + with mock.patch('uuid.uuid4', return_value='lloyd'): + self.srv2.start() + + notifier = messaging.get_notifier(self.transport, + "compute.vagrant-precise") + payload1 = TEST_NOTICE_PAYLOAD.copy() + payload1['instance_id'] = '0' + notifier.info(context.RequestContext(), 'compute.instance.create.end', + payload1) + payload2 = TEST_NOTICE_PAYLOAD.copy() + payload2['instance_id'] = '1' + notifier.info(context.RequestContext(), 'compute.instance.create.end', + payload2) + self.expected_samples = 4 + self.expected_events = 2 + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < 60: + if (len(self.publisher.samples) >= self.expected_samples and + len(self.publisher.events) >= self.expected_events): + break + eventlet.sleep(0) + self.srv.stop() + self.srv2.stop() + + resources = set(s.resource_id for s in self.publisher.samples) + self.assertEqual(self.expected_samples, len(self.publisher.samples)) + self.assertEqual(self.expected_events, len(self.publisher.events)) + self.assertEqual(set(['1', '0']), resources)