distributed coordinated notifications

this patch enables a queue per pipeline per agent. each agent can
send data to the pipeline queue set of any agent but only listens to its
own pipeline queue. when receiving a notification, the agent will:
1. build the datapoint
2. calculate a hash key based on a (hardcoded) attribute and mod the hash
   by the number of active agents
3. send the data to a single agent's pipeline queue set.

ex. two agents, sample1 with res_id=1 and sample2 with res_id=2
1. agent1 builds both samples
2. hash(sample1) % 2 (num agents) => agent1 sends sample1 to agent2's
   pipeline queue.
2a. agent2 processes sample1
3. hash(sample2) % 2 (num agents) => agent1 sends sample2 to agent1's
   pipeline queue.
3a. agent1 processes sample2

Implements blueprint distributed-coordinated-notifications

Change-Id: Iab52cae0a5bfbc747a2918e67dfe8da5fd0fda84
This commit is contained in:
gordon chung 2015-07-20 16:42:13 -04:00
parent 9265eee5b3
commit e38ffd09e8
3 changed files with 99 additions and 54 deletions

View File

@ -91,12 +91,15 @@ class NotificationService(service_base.BaseService):
invoke_args=(pm, ) invoke_args=(pm, )
) )
def _get_notifier(self, transport, pipe): def _get_notifiers(self, transport, pipe):
return oslo_messaging.Notifier( notifiers = []
transport, for agent in self.partition_coordinator._get_members(self.group_id):
driver=cfg.CONF.publisher_notifier.telemetry_driver, notifiers.append(oslo_messaging.Notifier(
publisher_id='ceilometer.notification', transport,
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name)) driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='ceilometer.notification',
topic='%s-%s-%s' % (self.NOTIFICATION_IPC, pipe.name, agent)))
return notifiers
def _get_pipe_manager(self, transport, pipeline_manager): def _get_pipe_manager(self, transport, pipeline_manager):
@ -105,7 +108,7 @@ class NotificationService(service_base.BaseService):
for pipe in pipeline_manager.pipelines: for pipe in pipeline_manager.pipelines:
pipe_manager.add_transporter( pipe_manager.add_transporter(
(pipe.source.support_meter, (pipe.source.support_meter,
self._get_notifier(transport, pipe))) self._get_notifiers(transport, pipe)))
else: else:
pipe_manager = pipeline_manager pipe_manager = pipeline_manager
@ -121,7 +124,7 @@ class NotificationService(service_base.BaseService):
for pipe in self.event_pipeline_manager.pipelines: for pipe in self.event_pipeline_manager.pipelines:
event_pipe_manager.add_transporter( event_pipe_manager.add_transporter(
(pipe.source.support_event, (pipe.source.support_event,
self._get_notifier(transport, pipe))) self._get_notifiers(transport, pipe)))
else: else:
event_pipe_manager = self.event_pipeline_manager event_pipe_manager = self.event_pipeline_manager
@ -133,17 +136,12 @@ class NotificationService(service_base.BaseService):
self.pipeline_manager = pipeline.setup_pipeline() self.pipeline_manager = pipeline.setup_pipeline()
self.transport = messaging.get_transport() self.transport = messaging.get_transport()
self.pipe_manager = self._get_pipe_manager(self.transport,
self.pipeline_manager)
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
if cfg.CONF.notification.workload_partitioning: if cfg.CONF.notification.workload_partitioning:
self.ctxt = context.get_admin_context() self.ctxt = context.get_admin_context()
self.group_id = self.NOTIFICATION_NAMESPACE self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
self.partition_coordinator.join_group(self.group_id)
else: else:
# FIXME(sileht): endpoint use notification_topics option # FIXME(sileht): endpoint use notification_topics option
# and it should not because this is oslo_messaging option # and it should not because this is oslo_messaging option
@ -154,12 +152,16 @@ class NotificationService(service_base.BaseService):
messaging.get_notifier(self.transport, '') messaging.get_notifier(self.transport, '')
self.group_id = None self.group_id = None
self.pipe_manager = self._get_pipe_manager(self.transport,
self.pipeline_manager)
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
self.listeners, self.pipeline_listeners = [], [] self.listeners, self.pipeline_listeners = [], []
self._configure_main_queue_listeners(self.pipe_manager, self._configure_main_queue_listeners(self.pipe_manager,
self.event_pipe_manager) self.event_pipe_manager)
if cfg.CONF.notification.workload_partitioning: if cfg.CONF.notification.workload_partitioning:
self.partition_coordinator.join_group(self.group_id)
self._configure_pipeline_listeners() self._configure_pipeline_listeners()
self.partition_coordinator.watch_group(self.group_id, self.partition_coordinator.watch_group(self.group_id,
self._refresh_agent) self._refresh_agent)
@ -220,6 +222,9 @@ class NotificationService(service_base.BaseService):
self.listeners.append(listener) self.listeners.append(listener)
def _refresh_agent(self, event): def _refresh_agent(self, event):
self.reload_pipeline()
def _refresh_listeners(self):
utils.kill_listeners(self.pipeline_listeners) utils.kill_listeners(self.pipeline_listeners)
self._configure_pipeline_listeners() self._configure_pipeline_listeners()
@ -228,10 +233,9 @@ class NotificationService(service_base.BaseService):
ev_pipes = [] ev_pipes = []
if cfg.CONF.notification.store_events: if cfg.CONF.notification.store_events:
ev_pipes = self.event_pipeline_manager.pipelines ev_pipes = self.event_pipeline_manager.pipelines
partitioned = self.partition_coordinator.extract_my_subset( pipelines = self.pipeline_manager.pipelines + ev_pipes
self.group_id, self.pipeline_manager.pipelines + ev_pipes)
transport = messaging.get_transport() transport = messaging.get_transport()
for pipe in partitioned: for pipe in pipelines:
LOG.debug(_('Pipeline endpoint: %s'), pipe.name) LOG.debug(_('Pipeline endpoint: %s'), pipe.name)
pipe_endpoint = (pipeline.EventPipelineEndpoint pipe_endpoint = (pipeline.EventPipelineEndpoint
if isinstance(pipe, pipeline.EventPipeline) else if isinstance(pipe, pipeline.EventPipeline) else
@ -239,7 +243,9 @@ class NotificationService(service_base.BaseService):
listener = messaging.get_notification_listener( listener = messaging.get_notification_listener(
transport, transport,
[oslo_messaging.Target( [oslo_messaging.Target(
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))], topic='%s-%s-%s' % (self.NOTIFICATION_IPC,
pipe.name,
self.partition_coordinator._my_id))],
[pipe_endpoint(self.ctxt, pipe)]) [pipe_endpoint(self.ctxt, pipe)])
listener.start() listener.start()
self.pipeline_listeners.append(listener) self.pipeline_listeners.append(listener)
@ -256,6 +262,9 @@ class NotificationService(service_base.BaseService):
self.pipe_manager = self._get_pipe_manager( self.pipe_manager = self._get_pipe_manager(
self.transport, self.pipeline_manager) self.transport, self.pipeline_manager)
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
# re-start the main queue listeners. # re-start the main queue listeners.
utils.kill_listeners(self.listeners) utils.kill_listeners(self.listeners)
self._configure_main_queue_listeners( self._configure_main_queue_listeners(
@ -264,4 +273,4 @@ class NotificationService(service_base.BaseService):
# re-start the pipeline listeners if workload partitioning # re-start the pipeline listeners if workload partitioning
# is enabled. # is enabled.
if cfg.CONF.notification.workload_partitioning: if cfg.CONF.notification.workload_partitioning:
self._refresh_agent(None) self._refresh_listeners()

View File

@ -130,6 +130,7 @@ class _PipelineTransportManager(object):
def publisher(self, context): def publisher(self, context):
serializer = self.serializer serializer = self.serializer
hash_to_bucketise = self.hash_to_bucketise
transporters = self.transporters transporters = self.transporters
filter_attr = self.filter_attr filter_attr = self.filter_attr
event_type = self.event_type event_type = self.event_type
@ -137,13 +138,20 @@ class _PipelineTransportManager(object):
class PipelinePublishContext(object): class PipelinePublishContext(object):
def __enter__(self): def __enter__(self):
def p(data): def p(data):
serialized_data = serializer(data) # TODO(gordc): cleanup so payload is always single
for d_filter, notifier in transporters: # datapoint. we can't correctly bucketise
if any(d_filter(d[filter_attr]) # datapoints if batched.
for d in serialized_data): data = [data] if not isinstance(data, list) else data
notifier.sample(context.to_dict(), for datapoint in data:
event_type=event_type, serialized_data = serializer(datapoint)
payload=serialized_data) for d_filter, notifiers in transporters:
if d_filter(serialized_data[filter_attr]):
key = (hash_to_bucketise(serialized_data) %
len(notifiers))
notifier = notifiers[key]
notifier.sample(context.to_dict(),
event_type=event_type,
payload=[serialized_data])
return p return p
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
@ -156,20 +164,28 @@ class SamplePipelineTransportManager(_PipelineTransportManager):
filter_attr = 'counter_name' filter_attr = 'counter_name'
event_type = 'ceilometer.pipeline' event_type = 'ceilometer.pipeline'
@staticmethod
def hash_to_bucketise(datapoint):
return hash(datapoint['resource_id'])
@staticmethod @staticmethod
def serializer(data): def serializer(data):
return [publisher_utils.meter_message_from_counter( return publisher_utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret) for sample in data] data, cfg.CONF.publisher.telemetry_secret)
class EventPipelineTransportManager(_PipelineTransportManager): class EventPipelineTransportManager(_PipelineTransportManager):
filter_attr = 'event_type' filter_attr = 'event_type'
event_type = 'pipeline.event' event_type = 'pipeline.event'
@staticmethod
def hash_to_bucketise(datapoint):
return hash(datapoint['event_type'])
@staticmethod @staticmethod
def serializer(data): def serializer(data):
return [publisher_utils.message_from_event( return publisher_utils.message_from_event(
data, cfg.CONF.publisher.telemetry_secret)] data, cfg.CONF.publisher.telemetry_secret)
class PublishContext(object): class PublishContext(object):

View File

@ -363,16 +363,6 @@ class TestRealNotification(BaseRealNotification):
self._check_notification_service() self._check_notification_service()
self.assertEqual('memory', self.publisher.samples[0].name) self.assertEqual('memory', self.publisher.samples[0].name)
@mock.patch('ceilometer.coordination.PartitionCoordinator')
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_ha_configured_agent_coord_disabled(self, fake_publisher_cls,
fake_coord):
fake_publisher_cls.return_value = self.publisher
fake_coord1 = mock.MagicMock()
fake_coord1.extract_my_subset.side_effect = lambda x, y: y
fake_coord.return_value = fake_coord1
self._check_notification_service()
@mock.patch.object(oslo_service.service.Service, 'stop') @mock.patch.object(oslo_service.service.Service, 'stop')
def test_notification_service_start_abnormal(self, mocked): def test_notification_service_start_abnormal(self, mocked):
try: try:
@ -390,32 +380,24 @@ class TestRealNotificationHA(BaseRealNotification):
group='notification') group='notification')
self.srv = notification.NotificationService() self.srv = notification.NotificationService()
@mock.patch('ceilometer.coordination.PartitionCoordinator')
@mock.patch('ceilometer.publisher.test.TestPublisher') @mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls, fake_coord): def test_notification_service(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher fake_publisher_cls.return_value = self.publisher
fake_coord1 = mock.MagicMock()
fake_coord1.extract_my_subset.side_effect = lambda x, y: y
fake_coord.return_value = fake_coord1
self._check_notification_service() self._check_notification_service()
@mock.patch('hmac.new') @mock.patch('hmac.new')
@mock.patch('ceilometer.coordination.PartitionCoordinator')
@mock.patch('ceilometer.publisher.test.TestPublisher') @mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service_no_secret(self, fake_publisher_cls, def test_notification_service_no_secret(self, fake_publisher_cls,
fake_coord, fake_hmac): fake_hmac):
self.CONF.set_override('telemetry_secret', None, group='publisher') self.CONF.set_override('telemetry_secret', None, group='publisher')
fake_publisher_cls.return_value = self.publisher fake_publisher_cls.return_value = self.publisher
fake_coord1 = mock.MagicMock()
fake_coord1.extract_my_subset.side_effect = lambda x, y: y
fake_coord.return_value = fake_coord1
self._check_notification_service() self._check_notification_service()
self.assertFalse(fake_hmac.called) self.assertFalse(fake_hmac.called)
def test_reset_listeners_on_refresh(self): def test_reset_listeners_on_refresh(self):
self.srv.start() self.srv.start()
self.assertEqual(2, len(self.srv.pipeline_listeners)) self.assertEqual(2, len(self.srv.pipeline_listeners))
self.srv._refresh_agent(None) self.srv._refresh_listeners()
self.assertEqual(2, len(self.srv.pipeline_listeners)) self.assertEqual(2, len(self.srv.pipeline_listeners))
self.srv.stop() self.srv.stop()
@ -449,3 +431,41 @@ class TestRealNotificationHA(BaseRealNotification):
self.assertEqual('ceilometer.pipeline', self.assertEqual('ceilometer.pipeline',
mock_notifier.call_args_list[2][1]['event_type']) mock_notifier.call_args_list[2][1]['event_type'])
self.srv.stop() self.srv.stop()
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self.srv2 = notification.NotificationService()
with mock.patch('ceilometer.coordination.PartitionCoordinator'
'._get_members', return_value=['harry', 'lloyd']):
with mock.patch('uuid.uuid4', return_value='harry'):
self.srv.start()
with mock.patch('uuid.uuid4', return_value='lloyd'):
self.srv2.start()
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
payload1 = TEST_NOTICE_PAYLOAD.copy()
payload1['instance_id'] = '0'
notifier.info(context.RequestContext(), 'compute.instance.create.end',
payload1)
payload2 = TEST_NOTICE_PAYLOAD.copy()
payload2['instance_id'] = '1'
notifier.info(context.RequestContext(), 'compute.instance.create.end',
payload2)
self.expected_samples = 4
self.expected_events = 2
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 60:
if (len(self.publisher.samples) >= self.expected_samples and
len(self.publisher.events) >= self.expected_events):
break
eventlet.sleep(0)
self.srv.stop()
self.srv2.stop()
resources = set(s.resource_id for s in self.publisher.samples)
self.assertEqual(self.expected_samples, len(self.publisher.samples))
self.assertEqual(self.expected_events, len(self.publisher.events))
self.assertEqual(set(['1', '0']), resources)