diff --git a/ceilometer/notification.py b/ceilometer/notification.py index cebd09dbf6..af228f0787 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -1,5 +1,5 @@ # -# Copyright 2017 Red Hat, Inc. +# Copyright 2017-2018 Red Hat, Inc. # Copyright 2012-2013 eNovance # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -13,45 +13,25 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import itertools -import threading import time -import uuid -from concurrent import futures import cotyledon -from futurist import periodics from oslo_config import cfg from oslo_log import log import oslo_messaging from stevedore import named -from tooz import coordination from ceilometer.i18n import _ from ceilometer import messaging -from ceilometer import utils LOG = log.getLogger(__name__) OPTS = [ - cfg.IntOpt('pipeline_processing_queues', - deprecated_for_removal=True, - default=10, - min=1, - help='Number of queues to parallelize workload across. This ' - 'value should be larger than the number of active ' - 'notification agents for optimal results. WARNING: ' - 'Once set, lowering this value may result in lost data.'), cfg.BoolOpt('ack_on_event_error', default=True, help='Acknowledge message when event persistence fails.'), - cfg.BoolOpt('workload_partitioning', - deprecated_for_removal=True, - default=False, - help='Enable workload partitioning, allowing multiple ' - 'notification agents to be run simultaneously.'), cfg.MultiStrOpt('messaging_urls', default=[], secret=True, @@ -68,10 +48,6 @@ OPTS = [ help='Number of notification messages to wait before ' 'publishing them. Batching is advised when transformations are ' 'applied in pipeline.'), - cfg.IntOpt('batch_timeout', - default=5, - help='Number of seconds to wait before publishing samples ' - 'when batch_size is not reached (None means indefinitely)'), cfg.IntOpt('workers', default=1, min=1, @@ -114,25 +90,11 @@ class NotificationService(cotyledon.Service): self.startup_delay = worker_id self.conf = conf - self.periodic = None - self.shutdown = False self.listeners = [] # NOTE(kbespalov): for the pipeline queues used a single amqp host # hence only one listener is required self.pipeline_listener = None - if self.conf.notification.workload_partitioning: - # XXX uuid4().bytes ought to work, but it requires ascii for now - coordination_id = (coordination_id or - str(uuid.uuid4()).encode('ascii')) - self.partition_coordinator = coordination.get_coordinator( - self.conf.coordination.backend_url, coordination_id) - self.partition_set = list(range( - self.conf.notification.pipeline_processing_queues)) - self.group_state = None - else: - self.partition_coordinator = None - def get_targets(self): """Return a sequence of oslo_messaging.Target @@ -154,49 +116,22 @@ class NotificationService(cotyledon.Service): time.sleep(self.startup_delay) super(NotificationService, self).run() - self.coord_lock = threading.Lock() self.managers = [ext.obj for ext in named.NamedExtensionManager( namespace='ceilometer.notification.pipeline', names=self.conf.notification.pipelines, invoke_on_load=True, on_missing_entrypoints_callback=self._log_missing_pipeline, - invoke_args=(self.conf, - self.conf.notification.workload_partitioning))] + invoke_args=(self.conf,))] self.transport = messaging.get_transport(self.conf) - if self.conf.notification.workload_partitioning: - self.partition_coordinator.start(start_heart=True) - else: - # FIXME(sileht): endpoint uses the notification_topics option - # and it should not because this is an oslo_messaging option - # not a ceilometer. Until we have something to get the - # notification_topics in another way, we must create a transport - # to ensure the option has been registered by oslo_messaging. - messaging.get_notifier(self.transport, '') + # FIXME(sileht): endpoint uses the notification_topics option + # and it should not because this is an oslo_messaging option + # not a ceilometer. Until we have something to get the + # notification_topics in another way, we must create a transport + # to ensure the option has been registered by oslo_messaging. + messaging.get_notifier(self.transport, '') - self._configure_main_queue_listeners() - - if self.conf.notification.workload_partitioning: - # join group after all manager set up is configured - self.hashring = self.partition_coordinator.join_partitioned_group( - self.NOTIFICATION_NAMESPACE) - - @periodics.periodic(spacing=self.conf.coordination.check_watchers, - run_immediately=True) - def run_watchers(): - self.partition_coordinator.run_watchers() - if self.group_state != self.hashring.ring.nodes: - self.group_state = self.hashring.ring.nodes.copy() - self._refresh_agent() - - self.periodic = periodics.PeriodicWorker.create( - [], executor_factory=lambda: - futures.ThreadPoolExecutor(max_workers=10)) - self.periodic.add(run_watchers) - utils.spawn_thread(self.periodic.start) - - def _configure_main_queue_listeners(self): endpoints = [] for pipe_mgr in self.managers: endpoints.extend(pipe_mgr.get_main_endpoints()) @@ -214,41 +149,6 @@ class NotificationService(cotyledon.Service): ) self.listeners.append(listener) - def _refresh_agent(self): - with self.coord_lock: - if self.shutdown: - # NOTE(sileht): We are going to shutdown we everything will be - # stopped, we should not restart them - return - self._configure_pipeline_listener() - - def _configure_pipeline_listener(self): - partitioned = list(filter( - self.hashring.belongs_to_self, self.partition_set)) - - endpoints = [] - for pipe_mgr in self.managers: - endpoints.extend(pipe_mgr.get_interim_endpoints()) - - targets = [] - for mgr, hash_id in itertools.product(self.managers, partitioned): - topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)]) - LOG.debug('Listening to queue: %s', topic) - targets.append(oslo_messaging.Target(topic=topic)) - - if self.pipeline_listener: - self.kill_listeners([self.pipeline_listener]) - - self.pipeline_listener = messaging.get_batch_notification_listener( - self.transport, targets, endpoints, allow_requeue=True, - batch_size=self.conf.notification.batch_size, - batch_timeout=self.conf.notification.batch_timeout) - # NOTE(gordc): set single thread to process data sequentially - # if batching enabled. - batch = (1 if self.conf.notification.batch_size > 1 - else self.conf.max_parallel_requests) - self.pipeline_listener.start(override_pool_size=batch) - @staticmethod def kill_listeners(listeners): # NOTE(gordc): correct usage of oslo.messaging listener is to stop(), @@ -259,15 +159,8 @@ class NotificationService(cotyledon.Service): listener.wait() def terminate(self): - self.shutdown = True - if self.periodic: - self.periodic.stop() - self.periodic.wait() - if self.partition_coordinator: - self.partition_coordinator.stop() - with self.coord_lock: - if self.pipeline_listener: - self.kill_listeners([self.pipeline_listener]) - self.kill_listeners(self.listeners) + if self.pipeline_listener: + self.kill_listeners([self.pipeline_listener]) + self.kill_listeners(self.listeners) super(NotificationService, self).terminate() diff --git a/ceilometer/pipeline/base.py b/ceilometer/pipeline/base.py index 99a7b70907..73114b1d34 100644 --- a/ceilometer/pipeline/base.py +++ b/ceilometer/pipeline/base.py @@ -22,7 +22,6 @@ import oslo_messaging import six from ceilometer import agent -from ceilometer import messaging from ceilometer import publisher OPTS = [ @@ -45,52 +44,6 @@ class PipelineException(agent.ConfigException): super(PipelineException, self).__init__('Pipeline', message, cfg) -class InterimPublishContext(object): - """Publisher to hash/shard data to pipelines""" - - def __init__(self, conf, mgr): - self.conf = conf - self.mgr = mgr - self.notifiers = self._get_notifiers(messaging.get_transport(conf)) - - def _get_notifiers(self, transport): - notifiers = [] - for x in range(self.conf.notification.pipeline_processing_queues): - notifiers.append(oslo_messaging.Notifier( - transport, - driver=self.conf.publisher_notifier.telemetry_driver, - topics=['-'.join( - [self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])])) - return notifiers - - @staticmethod - def hash_grouping(datapoint, grouping_keys): - # FIXME(gordc): this logic only supports a single grouping_key. we - # need to change to support pipeline with multiple transformers and - # different grouping_keys - value = '' - for key in grouping_keys or []: - value += datapoint.get(key) if datapoint.get(key) else '' - return hash(value) - - def __enter__(self): - def p(data): - data = [data] if not isinstance(data, list) else data - for datapoint in data: - for pipe in self.mgr.pipelines: - if pipe.supported(datapoint): - serialized_data = pipe.serializer(datapoint) - key = (self.hash_grouping(serialized_data, - pipe.get_grouping_key()) - % len(self.notifiers)) - self.notifiers[key].sample({}, event_type=pipe.name, - payload=[serialized_data]) - return p - - def __exit__(self, exc_type, exc_value, traceback): - pass - - class PublishContext(object): def __init__(self, pipelines): self.pipelines = pipelines or [] @@ -239,24 +192,10 @@ class Pipeline(object): def publish_data(self, data): """Publish data from pipeline.""" - @abc.abstractproperty - def default_grouping_key(self): - """Attribute to hash data on. Pass if no partitioning.""" - @abc.abstractmethod def supported(self, data): """Attribute to filter on. Pass if no partitioning.""" - @abc.abstractmethod - def serializer(self, data): - """Serialize data for interim transport. Pass if no partitioning.""" - - def get_grouping_key(self): - keys = [] - for transformer in self.sink.transformers: - keys += transformer.grouping_keys - return list(set(keys)) or self.default_grouping_key - class PublisherManager(object): def __init__(self, conf, purpose): @@ -281,7 +220,7 @@ class PipelineManager(agent.ConfigManagerBase): NOTIFICATION_IPC = 'ceilometer_ipc' - def __init__(self, conf, cfg_file, transformer_manager, partition): + def __init__(self, conf, cfg_file, transformer_manager): """Setup the pipelines according to config. The configuration is supported as follows: @@ -381,7 +320,6 @@ class PipelineManager(agent.ConfigManagerBase): unique_names.add(pipe.name) self.pipelines.append(pipe) unique_names.clear() - self.partition = partition @abc.abstractproperty def pm_type(self): @@ -403,23 +341,10 @@ class PipelineManager(agent.ConfigManagerBase): """Build publisher for pipeline publishing.""" return PublishContext(self.pipelines) - def interim_publisher(self): - """Build publishing context for IPC.""" - return InterimPublishContext(self.conf, self) - - def get_main_publisher(self): - """Return the publishing context to use""" - return (self.interim_publisher() if self.partition else - self.publisher()) - def get_main_endpoints(self): """Return endpoints for main queue.""" pass - def get_interim_endpoints(self): - """Return endpoints for interim pipeline queues.""" - pass - class NotificationEndpoint(object): """Base Endpoint for plugins that support the notification API.""" diff --git a/ceilometer/pipeline/event.py b/ceilometer/pipeline/event.py index 1243d70662..4b3f0b6413 100644 --- a/ceilometer/pipeline/event.py +++ b/ceilometer/pipeline/event.py @@ -11,18 +11,13 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from itertools import chain - from oslo_log import log import oslo_messaging -from oslo_utils import timeutils from stevedore import extension from ceilometer import agent from ceilometer.event import converter -from ceilometer.event import models from ceilometer.pipeline import base -from ceilometer.publisher import utils as publisher_utils LOG = log.getLogger(__name__) @@ -67,39 +62,6 @@ class EventEndpoint(base.MainNotificationEndpoint): return oslo_messaging.NotificationResult.HANDLED -class InterimEventEndpoint(base.NotificationEndpoint): - def __init__(self, conf, publisher, pipe_name): - self.event_types = [pipe_name] - super(InterimEventEndpoint, self).__init__(conf, publisher) - - def sample(self, notifications): - return self.process_notifications('sample', notifications) - - def process_notifications(self, priority, notifications): - events = chain.from_iterable(m["payload"] for m in notifications) - events = [ - models.Event( - message_id=ev['message_id'], - event_type=ev['event_type'], - generated=timeutils.normalize_time( - timeutils.parse_isotime(ev['generated'])), - traits=[models.Trait(name, dtype, - models.Trait.convert_value(dtype, value)) - for name, dtype, value in ev['traits']], - raw=ev.get('raw', {})) - for ev in events if publisher_utils.verify_signature( - ev, self.conf.publisher.telemetry_secret) - ] - try: - with self.publisher as p: - p(events) - except Exception: - if not self.conf.notification.ack_on_event_error: - return oslo_messaging.NotificationResult.REQUEUE - raise - return oslo_messaging.NotificationResult.HANDLED - - class EventSource(base.PipelineSource): """Represents a source of events. @@ -140,8 +102,6 @@ class EventSink(base.Sink): class EventPipeline(base.Pipeline): """Represents a pipeline for Events.""" - default_grouping_key = ['event_type'] - def __str__(self): # NOTE(gordc): prepend a namespace so we ensure event and sample # pipelines do not have the same name. @@ -153,10 +113,6 @@ class EventPipeline(base.Pipeline): supported = [e for e in events if self.supported(e)] self.sink.publish_events(supported) - def serializer(self, event): - return publisher_utils.message_from_event( - event, self.conf.publisher.telemetry_secret) - def supported(self, event): return self.source.support_event(event.event_type) @@ -168,17 +124,9 @@ class EventPipelineManager(base.PipelineManager): pm_source = EventSource pm_sink = EventSink - def __init__(self, conf, partition=False): + def __init__(self, conf): super(EventPipelineManager, self).__init__( - conf, conf.event_pipeline_cfg_file, {}, partition) + conf, conf.event_pipeline_cfg_file, {}) def get_main_endpoints(self): - return [EventEndpoint(self.conf, self.get_main_publisher())] - - def get_interim_endpoints(self): - # FIXME(gordc): change this so we shard data rather than per - # pipeline. this will allow us to use self.publisher and less - # queues. - return [InterimEventEndpoint( - self.conf, base.PublishContext([pipe]), pipe.name) - for pipe in self.pipelines] + return [EventEndpoint(self.conf, self.publisher())] diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py index 3e3db8fa41..f036f1d201 100644 --- a/ceilometer/pipeline/sample.py +++ b/ceilometer/pipeline/sample.py @@ -10,15 +10,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -from itertools import chain - from oslo_log import log from stevedore import extension from ceilometer import agent from ceilometer.pipeline import base -from ceilometer.publisher import utils as publisher_utils -from ceilometer import sample as sample_util LOG = log.getLogger(__name__) @@ -52,37 +48,6 @@ class SampleEndpoint(base.MainNotificationEndpoint): pass -class InterimSampleEndpoint(base.NotificationEndpoint): - def __init__(self, conf, publisher, pipe_name): - self.event_types = [pipe_name] - super(InterimSampleEndpoint, self).__init__(conf, publisher) - - def sample(self, notifications): - return self.process_notifications('sample', notifications) - - def process_notifications(self, priority, notifications): - samples = chain.from_iterable(m["payload"] for m in notifications) - samples = [ - sample_util.Sample(name=s['counter_name'], - type=s['counter_type'], - unit=s['counter_unit'], - volume=s['counter_volume'], - user_id=s['user_id'], - project_id=s['project_id'], - resource_id=s['resource_id'], - timestamp=s['timestamp'], - resource_metadata=s['resource_metadata'], - source=s.get('source'), - # NOTE(sileht): May come from an older node, - # Put None in this case. - monotonic_time=s.get('monotonic_time')) - for s in samples if publisher_utils.verify_signature( - s, self.conf.publisher.telemetry_secret) - ] - with self.publisher as p: - p(samples) - - class SampleSource(base.PipelineSource): """Represents a source of samples. @@ -181,8 +146,6 @@ class SampleSink(base.Sink): class SamplePipeline(base.Pipeline): """Represents a pipeline for Samples.""" - default_grouping_key = ['resource_id'] - def _validate_volume(self, s): volume = s.volume if volume is None: @@ -219,10 +182,6 @@ class SamplePipeline(base.Pipeline): and self._validate_volume(s)] self.sink.publish_samples(supported) - def serializer(self, sample): - return publisher_utils.meter_message_from_counter( - sample, self.conf.publisher.telemetry_secret) - def supported(self, sample): return self.source.support_meter(sample.name) @@ -234,10 +193,9 @@ class SamplePipelineManager(base.PipelineManager): pm_source = SampleSource pm_sink = SampleSink - def __init__(self, conf, partition=False): + def __init__(self, conf): super(SamplePipelineManager, self).__init__( - conf, conf.pipeline_cfg_file, self.get_transform_manager(), - partition) + conf, conf.pipeline_cfg_file, self.get_transform_manager()) @staticmethod def get_transform_manager(): @@ -247,13 +205,5 @@ class SamplePipelineManager(base.PipelineManager): exts = extension.ExtensionManager( namespace='ceilometer.sample.endpoint', invoke_on_load=True, - invoke_args=(self.conf, self.get_main_publisher())) + invoke_args=(self.conf, self.publisher())) return [ext.obj for ext in exts] - - def get_interim_endpoints(self): - # FIXME(gordc): change this so we shard data rather than per - # pipeline. this will allow us to use self.publisher and less - # queues. - return [InterimSampleEndpoint( - self.conf, base.PublishContext([pipe]), pipe.name) - for pipe in self.pipelines] diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py index 345e6930ae..10b212be59 100644 --- a/ceilometer/tests/unit/pipeline_base.py +++ b/ceilometer/tests/unit/pipeline_base.py @@ -75,7 +75,6 @@ class BasePipelineTestCase(base.BaseTestCase): class TransformerClass(transformer.TransformerBase): samples = [] - grouping_keys = ['counter_name'] def __init__(self, append_name='_update'): self.__class__.samples = [] @@ -102,7 +101,6 @@ class BasePipelineTestCase(base.BaseTestCase): class TransformerClassDrop(transformer.TransformerBase): samples = [] - grouping_keys = ['resource_id'] def __init__(self): self.__class__.samples = [] @@ -111,7 +109,6 @@ class BasePipelineTestCase(base.BaseTestCase): self.__class__.samples.append(counter) class TransformerClassException(object): - grouping_keys = ['resource_id'] @staticmethod def handle_sample(counter): @@ -2171,46 +2168,3 @@ class BasePipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() - - def test_get_pipeline_grouping_key(self): - transformer_cfg = [ - { - 'name': 'update', - 'parameters': {} - }, - { - 'name': 'unit_conversion', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_mins', - 'unit': 'min', - 'scale': 'volume'}, - } - }, - { - 'name': 'update', - 'parameters': {} - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - self.assertEqual(set(['resource_id', 'counter_name']), - set(pipeline_manager.pipelines[0].get_grouping_key())) - - def test_get_pipeline_duplicate_grouping_key(self): - transformer_cfg = [ - { - 'name': 'update', - 'parameters': {} - }, - { - 'name': 'update', - 'parameters': {} - }, - ] - self._set_pipeline_cfg('transformers', transformer_cfg) - self._build_and_set_new_pipeline() - pipeline_manager = pipeline.SamplePipelineManager(self.CONF) - self.assertEqual(['counter_name'], - pipeline_manager.pipelines[0].get_grouping_key()) diff --git a/ceilometer/tests/unit/test_event_pipeline.py b/ceilometer/tests/unit/test_event_pipeline.py index 3c5dd54830..edbfc38fcf 100644 --- a/ceilometer/tests/unit/test_event_pipeline.py +++ b/ceilometer/tests/unit/test_event_pipeline.py @@ -16,15 +16,12 @@ import traceback import uuid import fixtures -import mock -import oslo_messaging from ceilometer.event import models from ceilometer.pipeline import base as pipeline from ceilometer.pipeline import event from ceilometer import publisher from ceilometer.publisher import test as test_publisher -from ceilometer.publisher import utils from ceilometer import service from ceilometer.tests import base @@ -357,40 +354,3 @@ class EventPipelineTestCase(base.BaseTestCase): def test_unique_pipeline_names(self): self._dup_pipeline_name_cfg() self._exception_create_pipelinemanager() - - def test_event_pipeline_endpoint_requeue_on_failure(self): - self.CONF.set_override("ack_on_event_error", False, - group="notification") - self.CONF.set_override("telemetry_secret", "not-so-secret", - group="publisher") - test_data = { - 'message_id': uuid.uuid4(), - 'event_type': 'a', - 'generated': '2013-08-08 21:06:37.803826', - 'traits': [ - {'name': 't_text', - 'value': 1, - 'dtype': 'text_trait' - } - ], - 'raw': {'status': 'started'} - } - message_sign = utils.compute_signature(test_data, 'not-so-secret') - test_data['message_signature'] = message_sign - - fake_publisher = mock.Mock() - self.useFixture(fixtures.MockPatch( - 'ceilometer.publisher.test.TestPublisher', - return_value=fake_publisher)) - - self._build_and_set_new_pipeline() - pipeline_manager = event.EventPipelineManager(self.CONF) - pipe = pipeline_manager.pipelines[0] - event_pipeline_endpoint = event.InterimEventEndpoint( - self.CONF, pipeline.PublishContext([pipe]), pipe.name) - - fake_publisher.publish_events.side_effect = Exception - ret = event_pipeline_endpoint.sample([ - {'ctxt': {}, 'publisher_id': 'compute.vagrant-precise', - 'event_type': 'a', 'payload': [test_data], 'metadata': {}}]) - self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py index cfe3bcd4aa..e7d5fca71b 100644 --- a/ceilometer/tests/unit/test_notification.py +++ b/ceilometer/tests/unit/test_notification.py @@ -17,7 +17,6 @@ import time import mock -import oslo_messaging from oslo_utils import fileutils import six import yaml @@ -84,14 +83,6 @@ class BaseNotificationTest(tests_base.BaseTestCase): def run_service(self, srv): srv.run() self.addCleanup(srv.terminate) - if srv.conf.notification.workload_partitioning: - start = time.time() - while time.time() - start < 10: - if srv.group_state and srv.pipeline_listener: - break # ensure pipeline is set if HA - time.sleep(0.1) - else: - self.fail('Did not start pipeline queues') class TestNotification(BaseNotificationTest): @@ -242,273 +233,3 @@ class TestRealNotification(BaseRealNotification): if len(self.publisher.events) >= self.expected_events: break self.assertEqual(self.expected_events, len(self.publisher.events)) - - -class TestRealNotificationHA(BaseRealNotification): - - def setUp(self): - super(TestRealNotificationHA, self).setUp() - self.CONF.set_override('workload_partitioning', True, - group='notification') - self.CONF.set_override("backend_url", "zake://", group="coordination") - self.srv = notification.NotificationService(0, self.CONF) - - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_notification_service(self, fake_publisher_cls): - fake_publisher_cls.return_value = self.publisher - self._check_notification_service() - - @mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop') - @mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait') - @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start') - def test_notification_threads(self, m_listener, m_wait, m_stop): - self.CONF.set_override('batch_size', 1, group='notification') - self.srv.run() - m_listener.assert_called_with( - override_pool_size=self.CONF.max_parallel_requests) - m_listener.reset_mock() - self.CONF.set_override('batch_size', 2, group='notification') - self.srv._refresh_agent() - m_listener.assert_called_with(override_pool_size=1) - - @mock.patch('oslo_messaging.get_batch_notification_listener') - def test_reset_listener_on_refresh(self, mock_listener): - mock_listener.side_effect = [ - mock.MagicMock(), # main listener - mock.MagicMock(), # pipeline listener - mock.MagicMock(), # refresh pipeline listener - ] - self.run_service(self.srv) - listener = self.srv.pipeline_listener - self.srv._refresh_agent() - self.assertIsNot(listener, self.srv.pipeline_listener) - - def test_hashring_targets(self): - maybe = {"maybe": 0} - - def _once_over_five(item): - maybe["maybe"] += 1 - return maybe["maybe"] % 5 == 0 - - hashring = mock.MagicMock() - hashring.belongs_to_self = _once_over_five - self.srv.partition_coordinator = pc = mock.MagicMock() - pc.join_partitioned_group.return_value = hashring - self.run_service(self.srv) - topics = [target.topic for target in - self.srv.pipeline_listener.targets] - self.assertEqual(4, len(topics)) - self.assertEqual( - {'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9', - 'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'}, - set(topics)) - - @mock.patch('oslo_messaging.get_batch_notification_listener') - def test_notify_to_relevant_endpoint(self, mock_listener): - self.run_service(self.srv) - - targets = mock_listener.call_args[0][1] - self.assertIsNotEmpty(targets) - - pipe_list = [] - for mgr in self.srv.managers: - for pipe in mgr.pipelines: - pipe_list.append(pipe.name) - - for pipe in pipe_list: - for endpoint in mock_listener.call_args[0][2]: - self.assertTrue(hasattr(endpoint, 'filter_rule')) - if endpoint.filter_rule.match(None, None, pipe, None, None): - break - else: - self.fail('%s not handled by any endpoint' % pipe) - - @mock.patch('oslo_messaging.Notifier.sample') - def test_broadcast_to_relevant_pipes_only(self, mock_notifier): - self.run_service(self.srv) - for endpoint in self.srv.listeners[0].dispatcher.endpoints: - if (hasattr(endpoint, 'filter_rule') and - not endpoint.filter_rule.match(None, None, 'nonmatching.end', - None, None)): - continue - endpoint.info([{ - 'ctxt': TEST_NOTICE_CTXT, - 'publisher_id': 'compute.vagrant-precise', - 'event_type': 'nonmatching.end', - 'payload': TEST_NOTICE_PAYLOAD, - 'metadata': TEST_NOTICE_METADATA}]) - self.assertFalse(mock_notifier.called) - for endpoint in self.srv.listeners[0].dispatcher.endpoints: - if (hasattr(endpoint, 'filter_rule') and - not endpoint.filter_rule.match(None, None, - 'compute.instance.create.end', - None, None)): - continue - endpoint.info([{ - 'ctxt': TEST_NOTICE_CTXT, - 'publisher_id': 'compute.vagrant-precise', - 'event_type': 'compute.instance.create.end', - 'payload': TEST_NOTICE_PAYLOAD, - 'metadata': TEST_NOTICE_METADATA}]) - - self.assertTrue(mock_notifier.called) - self.assertEqual(3, mock_notifier.call_count) - self.assertEqual(1, len([i for i in mock_notifier.call_args_list - if 'event_type' in i[1]['payload'][0]])) - self.assertEqual(2, len([i for i in mock_notifier.call_args_list - if 'counter_name' in i[1]['payload'][0]])) - - -class TestRealNotificationMultipleAgents(BaseNotificationTest): - def setup_pipeline(self, transformers): - pipeline = yaml.dump({ - 'sources': [{ - 'name': 'test_pipeline', - 'interval': 5, - 'meters': ['vcpus', 'memory'], - 'sinks': ['test_sink'] - }], - 'sinks': [{ - 'name': 'test_sink', - 'transformers': transformers, - 'publishers': ['test://'] - }] - }) - if six.PY3: - pipeline = pipeline.encode('utf-8') - - pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline, - prefix="pipeline", - suffix="yaml") - return pipeline_cfg_file - - def setup_event_pipeline(self): - pipeline = yaml.dump({ - 'sources': [], - 'sinks': [] - }) - if six.PY3: - pipeline = pipeline.encode('utf-8') - - pipeline_cfg_file = fileutils.write_to_tempfile( - content=pipeline, prefix="event_pipeline", suffix="yaml") - return pipeline_cfg_file - - def setUp(self): - super(TestRealNotificationMultipleAgents, self).setUp() - self.CONF = service.prepare_service([], []) - self.setup_messaging(self.CONF, 'nova') - - pipeline_cfg_file = self.setup_pipeline(['instance', 'memory']) - event_pipeline_cfg_file = self.setup_event_pipeline() - self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self.CONF.set_override("event_pipeline_cfg_file", - event_pipeline_cfg_file) - self.CONF.set_override("backend_url", "zake://", group="coordination") - self.CONF.set_override('workload_partitioning', True, - group='notification') - self.CONF.set_override('pipeline_processing_queues', 2, - group='notification') - self.CONF.set_override('check_watchers', 1, group='coordination') - self.publisher = test_publisher.TestPublisher(self.CONF, "") - self.publisher2 = test_publisher.TestPublisher(self.CONF, "") - - def _check_notifications(self, fake_publisher_cls): - fake_publisher_cls.side_effect = [self.publisher, self.publisher2] - - maybe = {"srv": 0, "srv2": -1} - - def _sometimes_srv(item): - maybe["srv"] += 1 - return (maybe["srv"] % 2) == 0 - - self.srv = notification.NotificationService(0, self.CONF) - self.srv.partition_coordinator = pc = mock.MagicMock() - hashring_srv1 = mock.MagicMock() - hashring_srv1.belongs_to_self = _sometimes_srv - hashring_srv1.ring.nodes = {'id1': mock.Mock()} - pc.join_partitioned_group.return_value = hashring_srv1 - self.run_service(self.srv) - - def _sometimes_srv2(item): - maybe["srv2"] += 1 - return (maybe["srv2"] % 2) == 0 - - self.srv2 = notification.NotificationService(0, self.CONF) - self.srv2.partition_coordinator = pc = mock.MagicMock() - hashring = mock.MagicMock() - hashring.belongs_to_self = _sometimes_srv2 - hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()} - self.srv.hashring.ring.nodes = hashring.ring.nodes.copy() - pc.join_partitioned_group.return_value = hashring - self.run_service(self.srv2) - - notifier = messaging.get_notifier(self.transport, - "compute.vagrant-precise") - payload1 = TEST_NOTICE_PAYLOAD.copy() - payload1['instance_id'] = '0' - notifier.info({}, 'compute.instance.create.end', payload1) - payload2 = TEST_NOTICE_PAYLOAD.copy() - payload2['instance_id'] = '1' - notifier.info({}, 'compute.instance.create.end', payload2) - self.expected_samples = 4 - with mock.patch('six.moves.builtins.hash', lambda x: int(x)): - start = time.time() - while time.time() - start < 10: - if (len(self.publisher.samples + self.publisher2.samples) >= - self.expected_samples and - len(self.srv.group_state) == 2): - break - time.sleep(0.1) - - self.assertEqual(2, len(self.publisher.samples)) - self.assertEqual(2, len(self.publisher2.samples)) - self.assertEqual(1, len(set( - s.resource_id for s in self.publisher.samples))) - self.assertEqual(1, len(set( - s.resource_id for s in self.publisher2.samples))) - self.assertEqual(2, len(self.srv.group_state)) - - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_multiple_agents_no_transform(self, fake_publisher_cls): - pipeline_cfg_file = self.setup_pipeline([]) - self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self._check_notifications(fake_publisher_cls) - - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_multiple_agents_transform(self, fake_publisher_cls): - pipeline_cfg_file = self.setup_pipeline( - [{ - 'name': 'unit_conversion', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_mins', - 'unit': 'min', - 'scale': 'volume'}, - } - }]) - self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self._check_notifications(fake_publisher_cls) - - @mock.patch('ceilometer.publisher.test.TestPublisher') - def test_multiple_agents_multiple_transform(self, fake_publisher_cls): - pipeline_cfg_file = self.setup_pipeline( - [{ - 'name': 'unit_conversion', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_mins', - 'unit': 'min', - 'scale': 'volume'}, - } - }, { - 'name': 'unit_conversion', - 'parameters': { - 'source': {}, - 'target': {'name': 'cpu_mins', - 'unit': 'min', - 'scale': 'volume'}, - } - }]) - self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file) - self._check_notifications(fake_publisher_cls) diff --git a/ceilometer/transformer/__init__.py b/ceilometer/transformer/__init__.py index 48d78b4dca..3afffee655 100644 --- a/ceilometer/transformer/__init__.py +++ b/ceilometer/transformer/__init__.py @@ -42,10 +42,6 @@ class TransformerBase(object): :param sample: A sample. """ - @abc.abstractproperty - def grouping_keys(self): - """Keys used to group transformer.""" - @staticmethod def flush(): """Flush samples cached previously.""" diff --git a/ceilometer/transformer/accumulator.py b/ceilometer/transformer/accumulator.py index 1e14497c98..db7500762e 100644 --- a/ceilometer/transformer/accumulator.py +++ b/ceilometer/transformer/accumulator.py @@ -22,8 +22,6 @@ class TransformerAccumulator(transformer.TransformerBase): And then flushes them out into the wild. """ - grouping_keys = ['resource_id'] - def __init__(self, size=1, **kwargs): if size >= 1: self.samples = [] diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py index 9d688ccd5d..6039d22afb 100644 --- a/ceilometer/transformer/arithmetic.py +++ b/ceilometer/transformer/arithmetic.py @@ -36,8 +36,6 @@ class ArithmeticTransformer(transformer.TransformerBase): over one or more meters and/or their metadata. """ - grouping_keys = ['resource_id'] - meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)') def __init__(self, target=None, **kwargs): diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 4614528bce..5c3b809f03 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -30,8 +30,6 @@ LOG = log.getLogger(__name__) class BaseConversionTransformer(transformer.TransformerBase): """Transformer to derive conversion.""" - grouping_keys = ['resource_id'] - def __init__(self, source=None, target=None, **kwargs): """Initialize transformer with configured parameters. diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 02101d9f97..9031b25855 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -262,7 +262,6 @@ function configure_ceilometer { if [[ -n "$CEILOMETER_COORDINATION_URL" ]]; then iniset $CEILOMETER_CONF coordination backend_url $CEILOMETER_COORDINATION_URL - iniset $CEILOMETER_CONF notification workload_partitioning True iniset $CEILOMETER_CONF notification workers $API_WORKERS fi diff --git a/doc/source/contributor/plugins.rst b/doc/source/contributor/plugins.rst index 6a7c50a97c..9b19807728 100644 --- a/doc/source/contributor/plugins.rst +++ b/doc/source/contributor/plugins.rst @@ -94,18 +94,9 @@ Additionally, it must set ``get_main_endpoints`` which provides endpoints to be added to the main queue listener in the notification agent. This main queue endpoint inherits :class:`ceilometer.pipeline.base.MainNotificationEndpoint` and defines which notification priorities to listen, normalises the data, -and redirects the data for pipeline processing or requeuing depending on -`workload_partitioning` configuration. +and redirects the data for pipeline processing. -If a pipeline is configured to support `workload_partitioning`, data from the -main queue endpoints are shared and requeued in internal queues. The -notification agent configures a second notification consumer to handle these -internal queues and pushes data to endpoints defined by -``get_interim_endpoints`` in the pipeline manager. These interim endpoints -define how to handle the shared, normalised data models for pipeline -processing - -Both main queue and interim queue notification endpoints should implement: +Notification endpoints should implement: ``event_types`` A sequence of strings defining the event types the endpoint should handle diff --git a/releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml b/releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml new file mode 100644 index 0000000000..d72f1f43c5 --- /dev/null +++ b/releasenotes/notes/remove-notification-workload-partitioning-2cef114fb2478e39.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + The deprecated workload partitioning for notification agent has been removed.