notification: remove workload partitioning

Workload partitioning has been quite fragile and poorly performing so it's not
advised to use it. It was useful for transformers: since transformers are going
away too, let's simplify the code base and remove it

Change-Id: Ief2f0e00d3c091f978084da153b0c76377772f28
This commit is contained in:
Julien Danjou 2018-07-06 15:18:17 +02:00
parent b5ec5e43c1
commit 9d90ce8d37
14 changed files with 24 additions and 689 deletions

View File

@ -1,5 +1,5 @@
# #
# Copyright 2017 Red Hat, Inc. # Copyright 2017-2018 Red Hat, Inc.
# Copyright 2012-2013 eNovance <licensing@enovance.com> # Copyright 2012-2013 eNovance <licensing@enovance.com>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,45 +13,25 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import itertools
import threading
import time import time
import uuid
from concurrent import futures
import cotyledon import cotyledon
from futurist import periodics
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging
from stevedore import named from stevedore import named
from tooz import coordination
from ceilometer.i18n import _ from ceilometer.i18n import _
from ceilometer import messaging from ceilometer import messaging
from ceilometer import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
OPTS = [ OPTS = [
cfg.IntOpt('pipeline_processing_queues',
deprecated_for_removal=True,
default=10,
min=1,
help='Number of queues to parallelize workload across. This '
'value should be larger than the number of active '
'notification agents for optimal results. WARNING: '
'Once set, lowering this value may result in lost data.'),
cfg.BoolOpt('ack_on_event_error', cfg.BoolOpt('ack_on_event_error',
default=True, default=True,
help='Acknowledge message when event persistence fails.'), help='Acknowledge message when event persistence fails.'),
cfg.BoolOpt('workload_partitioning',
deprecated_for_removal=True,
default=False,
help='Enable workload partitioning, allowing multiple '
'notification agents to be run simultaneously.'),
cfg.MultiStrOpt('messaging_urls', cfg.MultiStrOpt('messaging_urls',
default=[], default=[],
secret=True, secret=True,
@ -68,10 +48,6 @@ OPTS = [
help='Number of notification messages to wait before ' help='Number of notification messages to wait before '
'publishing them. Batching is advised when transformations are ' 'publishing them. Batching is advised when transformations are '
'applied in pipeline.'), 'applied in pipeline.'),
cfg.IntOpt('batch_timeout',
default=5,
help='Number of seconds to wait before publishing samples '
'when batch_size is not reached (None means indefinitely)'),
cfg.IntOpt('workers', cfg.IntOpt('workers',
default=1, default=1,
min=1, min=1,
@ -114,25 +90,11 @@ class NotificationService(cotyledon.Service):
self.startup_delay = worker_id self.startup_delay = worker_id
self.conf = conf self.conf = conf
self.periodic = None
self.shutdown = False
self.listeners = [] self.listeners = []
# NOTE(kbespalov): for the pipeline queues used a single amqp host # NOTE(kbespalov): for the pipeline queues used a single amqp host
# hence only one listener is required # hence only one listener is required
self.pipeline_listener = None self.pipeline_listener = None
if self.conf.notification.workload_partitioning:
# XXX uuid4().bytes ought to work, but it requires ascii for now
coordination_id = (coordination_id or
str(uuid.uuid4()).encode('ascii'))
self.partition_coordinator = coordination.get_coordinator(
self.conf.coordination.backend_url, coordination_id)
self.partition_set = list(range(
self.conf.notification.pipeline_processing_queues))
self.group_state = None
else:
self.partition_coordinator = None
def get_targets(self): def get_targets(self):
"""Return a sequence of oslo_messaging.Target """Return a sequence of oslo_messaging.Target
@ -154,20 +116,15 @@ class NotificationService(cotyledon.Service):
time.sleep(self.startup_delay) time.sleep(self.startup_delay)
super(NotificationService, self).run() super(NotificationService, self).run()
self.coord_lock = threading.Lock()
self.managers = [ext.obj for ext in named.NamedExtensionManager( self.managers = [ext.obj for ext in named.NamedExtensionManager(
namespace='ceilometer.notification.pipeline', namespace='ceilometer.notification.pipeline',
names=self.conf.notification.pipelines, invoke_on_load=True, names=self.conf.notification.pipelines, invoke_on_load=True,
on_missing_entrypoints_callback=self._log_missing_pipeline, on_missing_entrypoints_callback=self._log_missing_pipeline,
invoke_args=(self.conf, invoke_args=(self.conf,))]
self.conf.notification.workload_partitioning))]
self.transport = messaging.get_transport(self.conf) self.transport = messaging.get_transport(self.conf)
if self.conf.notification.workload_partitioning:
self.partition_coordinator.start(start_heart=True)
else:
# FIXME(sileht): endpoint uses the notification_topics option # FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option # and it should not because this is an oslo_messaging option
# not a ceilometer. Until we have something to get the # not a ceilometer. Until we have something to get the
@ -175,28 +132,6 @@ class NotificationService(cotyledon.Service):
# to ensure the option has been registered by oslo_messaging. # to ensure the option has been registered by oslo_messaging.
messaging.get_notifier(self.transport, '') messaging.get_notifier(self.transport, '')
self._configure_main_queue_listeners()
if self.conf.notification.workload_partitioning:
# join group after all manager set up is configured
self.hashring = self.partition_coordinator.join_partitioned_group(
self.NOTIFICATION_NAMESPACE)
@periodics.periodic(spacing=self.conf.coordination.check_watchers,
run_immediately=True)
def run_watchers():
self.partition_coordinator.run_watchers()
if self.group_state != self.hashring.ring.nodes:
self.group_state = self.hashring.ring.nodes.copy()
self._refresh_agent()
self.periodic = periodics.PeriodicWorker.create(
[], executor_factory=lambda:
futures.ThreadPoolExecutor(max_workers=10))
self.periodic.add(run_watchers)
utils.spawn_thread(self.periodic.start)
def _configure_main_queue_listeners(self):
endpoints = [] endpoints = []
for pipe_mgr in self.managers: for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_main_endpoints()) endpoints.extend(pipe_mgr.get_main_endpoints())
@ -214,41 +149,6 @@ class NotificationService(cotyledon.Service):
) )
self.listeners.append(listener) self.listeners.append(listener)
def _refresh_agent(self):
with self.coord_lock:
if self.shutdown:
# NOTE(sileht): We are going to shutdown we everything will be
# stopped, we should not restart them
return
self._configure_pipeline_listener()
def _configure_pipeline_listener(self):
partitioned = list(filter(
self.hashring.belongs_to_self, self.partition_set))
endpoints = []
for pipe_mgr in self.managers:
endpoints.extend(pipe_mgr.get_interim_endpoints())
targets = []
for mgr, hash_id in itertools.product(self.managers, partitioned):
topic = '-'.join([mgr.NOTIFICATION_IPC, mgr.pm_type, str(hash_id)])
LOG.debug('Listening to queue: %s', topic)
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.kill_listeners([self.pipeline_listener])
self.pipeline_listener = messaging.get_batch_notification_listener(
self.transport, targets, endpoints, allow_requeue=True,
batch_size=self.conf.notification.batch_size,
batch_timeout=self.conf.notification.batch_timeout)
# NOTE(gordc): set single thread to process data sequentially
# if batching enabled.
batch = (1 if self.conf.notification.batch_size > 1
else self.conf.max_parallel_requests)
self.pipeline_listener.start(override_pool_size=batch)
@staticmethod @staticmethod
def kill_listeners(listeners): def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(), # NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
@ -259,13 +159,6 @@ class NotificationService(cotyledon.Service):
listener.wait() listener.wait()
def terminate(self): def terminate(self):
self.shutdown = True
if self.periodic:
self.periodic.stop()
self.periodic.wait()
if self.partition_coordinator:
self.partition_coordinator.stop()
with self.coord_lock:
if self.pipeline_listener: if self.pipeline_listener:
self.kill_listeners([self.pipeline_listener]) self.kill_listeners([self.pipeline_listener])
self.kill_listeners(self.listeners) self.kill_listeners(self.listeners)

View File

@ -22,7 +22,6 @@ import oslo_messaging
import six import six
from ceilometer import agent from ceilometer import agent
from ceilometer import messaging
from ceilometer import publisher from ceilometer import publisher
OPTS = [ OPTS = [
@ -45,52 +44,6 @@ class PipelineException(agent.ConfigException):
super(PipelineException, self).__init__('Pipeline', message, cfg) super(PipelineException, self).__init__('Pipeline', message, cfg)
class InterimPublishContext(object):
"""Publisher to hash/shard data to pipelines"""
def __init__(self, conf, mgr):
self.conf = conf
self.mgr = mgr
self.notifiers = self._get_notifiers(messaging.get_transport(conf))
def _get_notifiers(self, transport):
notifiers = []
for x in range(self.conf.notification.pipeline_processing_queues):
notifiers.append(oslo_messaging.Notifier(
transport,
driver=self.conf.publisher_notifier.telemetry_driver,
topics=['-'.join(
[self.mgr.NOTIFICATION_IPC, self.mgr.pm_type, str(x)])]))
return notifiers
@staticmethod
def hash_grouping(datapoint, grouping_keys):
# FIXME(gordc): this logic only supports a single grouping_key. we
# need to change to support pipeline with multiple transformers and
# different grouping_keys
value = ''
for key in grouping_keys or []:
value += datapoint.get(key) if datapoint.get(key) else ''
return hash(value)
def __enter__(self):
def p(data):
data = [data] if not isinstance(data, list) else data
for datapoint in data:
for pipe in self.mgr.pipelines:
if pipe.supported(datapoint):
serialized_data = pipe.serializer(datapoint)
key = (self.hash_grouping(serialized_data,
pipe.get_grouping_key())
% len(self.notifiers))
self.notifiers[key].sample({}, event_type=pipe.name,
payload=[serialized_data])
return p
def __exit__(self, exc_type, exc_value, traceback):
pass
class PublishContext(object): class PublishContext(object):
def __init__(self, pipelines): def __init__(self, pipelines):
self.pipelines = pipelines or [] self.pipelines = pipelines or []
@ -239,24 +192,10 @@ class Pipeline(object):
def publish_data(self, data): def publish_data(self, data):
"""Publish data from pipeline.""" """Publish data from pipeline."""
@abc.abstractproperty
def default_grouping_key(self):
"""Attribute to hash data on. Pass if no partitioning."""
@abc.abstractmethod @abc.abstractmethod
def supported(self, data): def supported(self, data):
"""Attribute to filter on. Pass if no partitioning.""" """Attribute to filter on. Pass if no partitioning."""
@abc.abstractmethod
def serializer(self, data):
"""Serialize data for interim transport. Pass if no partitioning."""
def get_grouping_key(self):
keys = []
for transformer in self.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys)) or self.default_grouping_key
class PublisherManager(object): class PublisherManager(object):
def __init__(self, conf, purpose): def __init__(self, conf, purpose):
@ -281,7 +220,7 @@ class PipelineManager(agent.ConfigManagerBase):
NOTIFICATION_IPC = 'ceilometer_ipc' NOTIFICATION_IPC = 'ceilometer_ipc'
def __init__(self, conf, cfg_file, transformer_manager, partition): def __init__(self, conf, cfg_file, transformer_manager):
"""Setup the pipelines according to config. """Setup the pipelines according to config.
The configuration is supported as follows: The configuration is supported as follows:
@ -381,7 +320,6 @@ class PipelineManager(agent.ConfigManagerBase):
unique_names.add(pipe.name) unique_names.add(pipe.name)
self.pipelines.append(pipe) self.pipelines.append(pipe)
unique_names.clear() unique_names.clear()
self.partition = partition
@abc.abstractproperty @abc.abstractproperty
def pm_type(self): def pm_type(self):
@ -403,23 +341,10 @@ class PipelineManager(agent.ConfigManagerBase):
"""Build publisher for pipeline publishing.""" """Build publisher for pipeline publishing."""
return PublishContext(self.pipelines) return PublishContext(self.pipelines)
def interim_publisher(self):
"""Build publishing context for IPC."""
return InterimPublishContext(self.conf, self)
def get_main_publisher(self):
"""Return the publishing context to use"""
return (self.interim_publisher() if self.partition else
self.publisher())
def get_main_endpoints(self): def get_main_endpoints(self):
"""Return endpoints for main queue.""" """Return endpoints for main queue."""
pass pass
def get_interim_endpoints(self):
"""Return endpoints for interim pipeline queues."""
pass
class NotificationEndpoint(object): class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API.""" """Base Endpoint for plugins that support the notification API."""

View File

@ -11,18 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from itertools import chain
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging
from oslo_utils import timeutils
from stevedore import extension from stevedore import extension
from ceilometer import agent from ceilometer import agent
from ceilometer.event import converter from ceilometer.event import converter
from ceilometer.event import models
from ceilometer.pipeline import base from ceilometer.pipeline import base
from ceilometer.publisher import utils as publisher_utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -67,39 +62,6 @@ class EventEndpoint(base.MainNotificationEndpoint):
return oslo_messaging.NotificationResult.HANDLED return oslo_messaging.NotificationResult.HANDLED
class InterimEventEndpoint(base.NotificationEndpoint):
def __init__(self, conf, publisher, pipe_name):
self.event_types = [pipe_name]
super(InterimEventEndpoint, self).__init__(conf, publisher)
def sample(self, notifications):
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
events = chain.from_iterable(m["payload"] for m in notifications)
events = [
models.Event(
message_id=ev['message_id'],
event_type=ev['event_type'],
generated=timeutils.normalize_time(
timeutils.parse_isotime(ev['generated'])),
traits=[models.Trait(name, dtype,
models.Trait.convert_value(dtype, value))
for name, dtype, value in ev['traits']],
raw=ev.get('raw', {}))
for ev in events if publisher_utils.verify_signature(
ev, self.conf.publisher.telemetry_secret)
]
try:
with self.publisher as p:
p(events)
except Exception:
if not self.conf.notification.ack_on_event_error:
return oslo_messaging.NotificationResult.REQUEUE
raise
return oslo_messaging.NotificationResult.HANDLED
class EventSource(base.PipelineSource): class EventSource(base.PipelineSource):
"""Represents a source of events. """Represents a source of events.
@ -140,8 +102,6 @@ class EventSink(base.Sink):
class EventPipeline(base.Pipeline): class EventPipeline(base.Pipeline):
"""Represents a pipeline for Events.""" """Represents a pipeline for Events."""
default_grouping_key = ['event_type']
def __str__(self): def __str__(self):
# NOTE(gordc): prepend a namespace so we ensure event and sample # NOTE(gordc): prepend a namespace so we ensure event and sample
# pipelines do not have the same name. # pipelines do not have the same name.
@ -153,10 +113,6 @@ class EventPipeline(base.Pipeline):
supported = [e for e in events if self.supported(e)] supported = [e for e in events if self.supported(e)]
self.sink.publish_events(supported) self.sink.publish_events(supported)
def serializer(self, event):
return publisher_utils.message_from_event(
event, self.conf.publisher.telemetry_secret)
def supported(self, event): def supported(self, event):
return self.source.support_event(event.event_type) return self.source.support_event(event.event_type)
@ -168,17 +124,9 @@ class EventPipelineManager(base.PipelineManager):
pm_source = EventSource pm_source = EventSource
pm_sink = EventSink pm_sink = EventSink
def __init__(self, conf, partition=False): def __init__(self, conf):
super(EventPipelineManager, self).__init__( super(EventPipelineManager, self).__init__(
conf, conf.event_pipeline_cfg_file, {}, partition) conf, conf.event_pipeline_cfg_file, {})
def get_main_endpoints(self): def get_main_endpoints(self):
return [EventEndpoint(self.conf, self.get_main_publisher())] return [EventEndpoint(self.conf, self.publisher())]
def get_interim_endpoints(self):
# FIXME(gordc): change this so we shard data rather than per
# pipeline. this will allow us to use self.publisher and less
# queues.
return [InterimEventEndpoint(
self.conf, base.PublishContext([pipe]), pipe.name)
for pipe in self.pipelines]

View File

@ -10,15 +10,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from itertools import chain
from oslo_log import log from oslo_log import log
from stevedore import extension from stevedore import extension
from ceilometer import agent from ceilometer import agent
from ceilometer.pipeline import base from ceilometer.pipeline import base
from ceilometer.publisher import utils as publisher_utils
from ceilometer import sample as sample_util
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -52,37 +48,6 @@ class SampleEndpoint(base.MainNotificationEndpoint):
pass pass
class InterimSampleEndpoint(base.NotificationEndpoint):
def __init__(self, conf, publisher, pipe_name):
self.event_types = [pipe_name]
super(InterimSampleEndpoint, self).__init__(conf, publisher)
def sample(self, notifications):
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
samples = chain.from_iterable(m["payload"] for m in notifications)
samples = [
sample_util.Sample(name=s['counter_name'],
type=s['counter_type'],
unit=s['counter_unit'],
volume=s['counter_volume'],
user_id=s['user_id'],
project_id=s['project_id'],
resource_id=s['resource_id'],
timestamp=s['timestamp'],
resource_metadata=s['resource_metadata'],
source=s.get('source'),
# NOTE(sileht): May come from an older node,
# Put None in this case.
monotonic_time=s.get('monotonic_time'))
for s in samples if publisher_utils.verify_signature(
s, self.conf.publisher.telemetry_secret)
]
with self.publisher as p:
p(samples)
class SampleSource(base.PipelineSource): class SampleSource(base.PipelineSource):
"""Represents a source of samples. """Represents a source of samples.
@ -181,8 +146,6 @@ class SampleSink(base.Sink):
class SamplePipeline(base.Pipeline): class SamplePipeline(base.Pipeline):
"""Represents a pipeline for Samples.""" """Represents a pipeline for Samples."""
default_grouping_key = ['resource_id']
def _validate_volume(self, s): def _validate_volume(self, s):
volume = s.volume volume = s.volume
if volume is None: if volume is None:
@ -219,10 +182,6 @@ class SamplePipeline(base.Pipeline):
and self._validate_volume(s)] and self._validate_volume(s)]
self.sink.publish_samples(supported) self.sink.publish_samples(supported)
def serializer(self, sample):
return publisher_utils.meter_message_from_counter(
sample, self.conf.publisher.telemetry_secret)
def supported(self, sample): def supported(self, sample):
return self.source.support_meter(sample.name) return self.source.support_meter(sample.name)
@ -234,10 +193,9 @@ class SamplePipelineManager(base.PipelineManager):
pm_source = SampleSource pm_source = SampleSource
pm_sink = SampleSink pm_sink = SampleSink
def __init__(self, conf, partition=False): def __init__(self, conf):
super(SamplePipelineManager, self).__init__( super(SamplePipelineManager, self).__init__(
conf, conf.pipeline_cfg_file, self.get_transform_manager(), conf, conf.pipeline_cfg_file, self.get_transform_manager())
partition)
@staticmethod @staticmethod
def get_transform_manager(): def get_transform_manager():
@ -247,13 +205,5 @@ class SamplePipelineManager(base.PipelineManager):
exts = extension.ExtensionManager( exts = extension.ExtensionManager(
namespace='ceilometer.sample.endpoint', namespace='ceilometer.sample.endpoint',
invoke_on_load=True, invoke_on_load=True,
invoke_args=(self.conf, self.get_main_publisher())) invoke_args=(self.conf, self.publisher()))
return [ext.obj for ext in exts] return [ext.obj for ext in exts]
def get_interim_endpoints(self):
# FIXME(gordc): change this so we shard data rather than per
# pipeline. this will allow us to use self.publisher and less
# queues.
return [InterimSampleEndpoint(
self.conf, base.PublishContext([pipe]), pipe.name)
for pipe in self.pipelines]

View File

@ -75,7 +75,6 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClass(transformer.TransformerBase): class TransformerClass(transformer.TransformerBase):
samples = [] samples = []
grouping_keys = ['counter_name']
def __init__(self, append_name='_update'): def __init__(self, append_name='_update'):
self.__class__.samples = [] self.__class__.samples = []
@ -102,7 +101,6 @@ class BasePipelineTestCase(base.BaseTestCase):
class TransformerClassDrop(transformer.TransformerBase): class TransformerClassDrop(transformer.TransformerBase):
samples = [] samples = []
grouping_keys = ['resource_id']
def __init__(self): def __init__(self):
self.__class__.samples = [] self.__class__.samples = []
@ -111,7 +109,6 @@ class BasePipelineTestCase(base.BaseTestCase):
self.__class__.samples.append(counter) self.__class__.samples.append(counter)
class TransformerClassException(object): class TransformerClassException(object):
grouping_keys = ['resource_id']
@staticmethod @staticmethod
def handle_sample(counter): def handle_sample(counter):
@ -2171,46 +2168,3 @@ class BasePipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self): def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg() self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager() self._exception_create_pipelinemanager()
def test_get_pipeline_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._build_and_set_new_pipeline()
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
self.assertEqual(set(['resource_id', 'counter_name']),
set(pipeline_manager.pipelines[0].get_grouping_key()))
def test_get_pipeline_duplicate_grouping_key(self):
transformer_cfg = [
{
'name': 'update',
'parameters': {}
},
{
'name': 'update',
'parameters': {}
},
]
self._set_pipeline_cfg('transformers', transformer_cfg)
self._build_and_set_new_pipeline()
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
self.assertEqual(['counter_name'],
pipeline_manager.pipelines[0].get_grouping_key())

View File

@ -16,15 +16,12 @@ import traceback
import uuid import uuid
import fixtures import fixtures
import mock
import oslo_messaging
from ceilometer.event import models from ceilometer.event import models
from ceilometer.pipeline import base as pipeline from ceilometer.pipeline import base as pipeline
from ceilometer.pipeline import event from ceilometer.pipeline import event
from ceilometer import publisher from ceilometer import publisher
from ceilometer.publisher import test as test_publisher from ceilometer.publisher import test as test_publisher
from ceilometer.publisher import utils
from ceilometer import service from ceilometer import service
from ceilometer.tests import base from ceilometer.tests import base
@ -357,40 +354,3 @@ class EventPipelineTestCase(base.BaseTestCase):
def test_unique_pipeline_names(self): def test_unique_pipeline_names(self):
self._dup_pipeline_name_cfg() self._dup_pipeline_name_cfg()
self._exception_create_pipelinemanager() self._exception_create_pipelinemanager()
def test_event_pipeline_endpoint_requeue_on_failure(self):
self.CONF.set_override("ack_on_event_error", False,
group="notification")
self.CONF.set_override("telemetry_secret", "not-so-secret",
group="publisher")
test_data = {
'message_id': uuid.uuid4(),
'event_type': 'a',
'generated': '2013-08-08 21:06:37.803826',
'traits': [
{'name': 't_text',
'value': 1,
'dtype': 'text_trait'
}
],
'raw': {'status': 'started'}
}
message_sign = utils.compute_signature(test_data, 'not-so-secret')
test_data['message_signature'] = message_sign
fake_publisher = mock.Mock()
self.useFixture(fixtures.MockPatch(
'ceilometer.publisher.test.TestPublisher',
return_value=fake_publisher))
self._build_and_set_new_pipeline()
pipeline_manager = event.EventPipelineManager(self.CONF)
pipe = pipeline_manager.pipelines[0]
event_pipeline_endpoint = event.InterimEventEndpoint(
self.CONF, pipeline.PublishContext([pipe]), pipe.name)
fake_publisher.publish_events.side_effect = Exception
ret = event_pipeline_endpoint.sample([
{'ctxt': {}, 'publisher_id': 'compute.vagrant-precise',
'event_type': 'a', 'payload': [test_data], 'metadata': {}}])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)

View File

@ -17,7 +17,6 @@
import time import time
import mock import mock
import oslo_messaging
from oslo_utils import fileutils from oslo_utils import fileutils
import six import six
import yaml import yaml
@ -84,14 +83,6 @@ class BaseNotificationTest(tests_base.BaseTestCase):
def run_service(self, srv): def run_service(self, srv):
srv.run() srv.run()
self.addCleanup(srv.terminate) self.addCleanup(srv.terminate)
if srv.conf.notification.workload_partitioning:
start = time.time()
while time.time() - start < 10:
if srv.group_state and srv.pipeline_listener:
break # ensure pipeline is set if HA
time.sleep(0.1)
else:
self.fail('Did not start pipeline queues')
class TestNotification(BaseNotificationTest): class TestNotification(BaseNotificationTest):
@ -242,273 +233,3 @@ class TestRealNotification(BaseRealNotification):
if len(self.publisher.events) >= self.expected_events: if len(self.publisher.events) >= self.expected_events:
break break
self.assertEqual(self.expected_events, len(self.publisher.events)) self.assertEqual(self.expected_events, len(self.publisher.events))
class TestRealNotificationHA(BaseRealNotification):
def setUp(self):
super(TestRealNotificationHA, self).setUp()
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.srv = notification.NotificationService(0, self.CONF)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
def test_notification_threads(self, m_listener, m_wait, m_stop):
self.CONF.set_override('batch_size', 1, group='notification')
self.srv.run()
m_listener.assert_called_with(
override_pool_size=self.CONF.max_parallel_requests)
m_listener.reset_mock()
self.CONF.set_override('batch_size', 2, group='notification')
self.srv._refresh_agent()
m_listener.assert_called_with(override_pool_size=1)
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_reset_listener_on_refresh(self, mock_listener):
mock_listener.side_effect = [
mock.MagicMock(), # main listener
mock.MagicMock(), # pipeline listener
mock.MagicMock(), # refresh pipeline listener
]
self.run_service(self.srv)
listener = self.srv.pipeline_listener
self.srv._refresh_agent()
self.assertIsNot(listener, self.srv.pipeline_listener)
def test_hashring_targets(self):
maybe = {"maybe": 0}
def _once_over_five(item):
maybe["maybe"] += 1
return maybe["maybe"] % 5 == 0
hashring = mock.MagicMock()
hashring.belongs_to_self = _once_over_five
self.srv.partition_coordinator = pc = mock.MagicMock()
pc.join_partitioned_group.return_value = hashring
self.run_service(self.srv)
topics = [target.topic for target in
self.srv.pipeline_listener.targets]
self.assertEqual(4, len(topics))
self.assertEqual(
{'ceilometer_ipc-sample-4', 'ceilometer_ipc-sample-9',
'ceilometer_ipc-event-4', 'ceilometer_ipc-event-9'},
set(topics))
@mock.patch('oslo_messaging.get_batch_notification_listener')
def test_notify_to_relevant_endpoint(self, mock_listener):
self.run_service(self.srv)
targets = mock_listener.call_args[0][1]
self.assertIsNotEmpty(targets)
pipe_list = []
for mgr in self.srv.managers:
for pipe in mgr.pipelines:
pipe_list.append(pipe.name)
for pipe in pipe_list:
for endpoint in mock_listener.call_args[0][2]:
self.assertTrue(hasattr(endpoint, 'filter_rule'))
if endpoint.filter_rule.match(None, None, pipe, None, None):
break
else:
self.fail('%s not handled by any endpoint' % pipe)
@mock.patch('oslo_messaging.Notifier.sample')
def test_broadcast_to_relevant_pipes_only(self, mock_notifier):
self.run_service(self.srv)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None, 'nonmatching.end',
None, None)):
continue
endpoint.info([{
'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'nonmatching.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertFalse(mock_notifier.called)
for endpoint in self.srv.listeners[0].dispatcher.endpoints:
if (hasattr(endpoint, 'filter_rule') and
not endpoint.filter_rule.match(None, None,
'compute.instance.create.end',
None, None)):
continue
endpoint.info([{
'ctxt': TEST_NOTICE_CTXT,
'publisher_id': 'compute.vagrant-precise',
'event_type': 'compute.instance.create.end',
'payload': TEST_NOTICE_PAYLOAD,
'metadata': TEST_NOTICE_METADATA}])
self.assertTrue(mock_notifier.called)
self.assertEqual(3, mock_notifier.call_count)
self.assertEqual(1, len([i for i in mock_notifier.call_args_list
if 'event_type' in i[1]['payload'][0]]))
self.assertEqual(2, len([i for i in mock_notifier.call_args_list
if 'counter_name' in i[1]['payload'][0]]))
class TestRealNotificationMultipleAgents(BaseNotificationTest):
def setup_pipeline(self, transformers):
pipeline = yaml.dump({
'sources': [{
'name': 'test_pipeline',
'interval': 5,
'meters': ['vcpus', 'memory'],
'sinks': ['test_sink']
}],
'sinks': [{
'name': 'test_sink',
'transformers': transformers,
'publishers': ['test://']
}]
})
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
return pipeline_cfg_file
def setup_event_pipeline(self):
pipeline = yaml.dump({
'sources': [],
'sinks': []
})
if six.PY3:
pipeline = pipeline.encode('utf-8')
pipeline_cfg_file = fileutils.write_to_tempfile(
content=pipeline, prefix="event_pipeline", suffix="yaml")
return pipeline_cfg_file
def setUp(self):
super(TestRealNotificationMultipleAgents, self).setUp()
self.CONF = service.prepare_service([], [])
self.setup_messaging(self.CONF, 'nova')
pipeline_cfg_file = self.setup_pipeline(['instance', 'memory'])
event_pipeline_cfg_file = self.setup_event_pipeline()
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("event_pipeline_cfg_file",
event_pipeline_cfg_file)
self.CONF.set_override("backend_url", "zake://", group="coordination")
self.CONF.set_override('workload_partitioning', True,
group='notification')
self.CONF.set_override('pipeline_processing_queues', 2,
group='notification')
self.CONF.set_override('check_watchers', 1, group='coordination')
self.publisher = test_publisher.TestPublisher(self.CONF, "")
self.publisher2 = test_publisher.TestPublisher(self.CONF, "")
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]
maybe = {"srv": 0, "srv2": -1}
def _sometimes_srv(item):
maybe["srv"] += 1
return (maybe["srv"] % 2) == 0
self.srv = notification.NotificationService(0, self.CONF)
self.srv.partition_coordinator = pc = mock.MagicMock()
hashring_srv1 = mock.MagicMock()
hashring_srv1.belongs_to_self = _sometimes_srv
hashring_srv1.ring.nodes = {'id1': mock.Mock()}
pc.join_partitioned_group.return_value = hashring_srv1
self.run_service(self.srv)
def _sometimes_srv2(item):
maybe["srv2"] += 1
return (maybe["srv2"] % 2) == 0
self.srv2 = notification.NotificationService(0, self.CONF)
self.srv2.partition_coordinator = pc = mock.MagicMock()
hashring = mock.MagicMock()
hashring.belongs_to_self = _sometimes_srv2
hashring.ring.nodes = {'id1': mock.Mock(), 'id2': mock.Mock()}
self.srv.hashring.ring.nodes = hashring.ring.nodes.copy()
pc.join_partitioned_group.return_value = hashring
self.run_service(self.srv2)
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
payload1 = TEST_NOTICE_PAYLOAD.copy()
payload1['instance_id'] = '0'
notifier.info({}, 'compute.instance.create.end', payload1)
payload2 = TEST_NOTICE_PAYLOAD.copy()
payload2['instance_id'] = '1'
notifier.info({}, 'compute.instance.create.end', payload2)
self.expected_samples = 4
with mock.patch('six.moves.builtins.hash', lambda x: int(x)):
start = time.time()
while time.time() - start < 10:
if (len(self.publisher.samples + self.publisher2.samples) >=
self.expected_samples and
len(self.srv.group_state) == 2):
break
time.sleep(0.1)
self.assertEqual(2, len(self.publisher.samples))
self.assertEqual(2, len(self.publisher2.samples))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher.samples)))
self.assertEqual(1, len(set(
s.resource_id for s in self.publisher2.samples)))
self.assertEqual(2, len(self.srv.group_state))
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_no_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline([])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_multiple_agents_multiple_transform(self, fake_publisher_cls):
pipeline_cfg_file = self.setup_pipeline(
[{
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}, {
'name': 'unit_conversion',
'parameters': {
'source': {},
'target': {'name': 'cpu_mins',
'unit': 'min',
'scale': 'volume'},
}
}])
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self._check_notifications(fake_publisher_cls)

View File

@ -42,10 +42,6 @@ class TransformerBase(object):
:param sample: A sample. :param sample: A sample.
""" """
@abc.abstractproperty
def grouping_keys(self):
"""Keys used to group transformer."""
@staticmethod @staticmethod
def flush(): def flush():
"""Flush samples cached previously.""" """Flush samples cached previously."""

View File

@ -22,8 +22,6 @@ class TransformerAccumulator(transformer.TransformerBase):
And then flushes them out into the wild. And then flushes them out into the wild.
""" """
grouping_keys = ['resource_id']
def __init__(self, size=1, **kwargs): def __init__(self, size=1, **kwargs):
if size >= 1: if size >= 1:
self.samples = [] self.samples = []

View File

@ -36,8 +36,6 @@ class ArithmeticTransformer(transformer.TransformerBase):
over one or more meters and/or their metadata. over one or more meters and/or their metadata.
""" """
grouping_keys = ['resource_id']
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)') meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
def __init__(self, target=None, **kwargs): def __init__(self, target=None, **kwargs):

View File

@ -30,8 +30,6 @@ LOG = log.getLogger(__name__)
class BaseConversionTransformer(transformer.TransformerBase): class BaseConversionTransformer(transformer.TransformerBase):
"""Transformer to derive conversion.""" """Transformer to derive conversion."""
grouping_keys = ['resource_id']
def __init__(self, source=None, target=None, **kwargs): def __init__(self, source=None, target=None, **kwargs):
"""Initialize transformer with configured parameters. """Initialize transformer with configured parameters.

View File

@ -262,7 +262,6 @@ function configure_ceilometer {
if [[ -n "$CEILOMETER_COORDINATION_URL" ]]; then if [[ -n "$CEILOMETER_COORDINATION_URL" ]]; then
iniset $CEILOMETER_CONF coordination backend_url $CEILOMETER_COORDINATION_URL iniset $CEILOMETER_CONF coordination backend_url $CEILOMETER_COORDINATION_URL
iniset $CEILOMETER_CONF notification workload_partitioning True
iniset $CEILOMETER_CONF notification workers $API_WORKERS iniset $CEILOMETER_CONF notification workers $API_WORKERS
fi fi

View File

@ -94,18 +94,9 @@ Additionally, it must set ``get_main_endpoints`` which provides endpoints to be
added to the main queue listener in the notification agent. This main queue added to the main queue listener in the notification agent. This main queue
endpoint inherits :class:`ceilometer.pipeline.base.MainNotificationEndpoint` endpoint inherits :class:`ceilometer.pipeline.base.MainNotificationEndpoint`
and defines which notification priorities to listen, normalises the data, and defines which notification priorities to listen, normalises the data,
and redirects the data for pipeline processing or requeuing depending on and redirects the data for pipeline processing.
`workload_partitioning` configuration.
If a pipeline is configured to support `workload_partitioning`, data from the Notification endpoints should implement:
main queue endpoints are shared and requeued in internal queues. The
notification agent configures a second notification consumer to handle these
internal queues and pushes data to endpoints defined by
``get_interim_endpoints`` in the pipeline manager. These interim endpoints
define how to handle the shared, normalised data models for pipeline
processing
Both main queue and interim queue notification endpoints should implement:
``event_types`` ``event_types``
A sequence of strings defining the event types the endpoint should handle A sequence of strings defining the event types the endpoint should handle

View File

@ -0,0 +1,4 @@
---
upgrade:
- |
The deprecated workload partitioning for notification agent has been removed.