collector: use an intermediate proxy class for event dispatcher

This makes sure that the base class EventDispatcherBase does not owns
any code itself. This makes external plugin easier to implement as they
don't _have_ to inherit from EventDispatcherBase, they just need to
implement a `record_events(events)' method and that's it.

This will allow Panko to provide the database dispatcher without having
a hard dependency on Ceilometer.

Change-Id: I094aed1e7e081fe1de130d987035a6c9d977d25e
This commit is contained in:
Julien Danjou 2016-05-10 18:26:07 +02:00
parent bb35821852
commit 10a36ae7b4
8 changed files with 120 additions and 131 deletions

View File

@ -24,8 +24,9 @@ from oslo_utils import netutils
from oslo_utils import units
from ceilometer import dispatcher
from ceilometer.i18n import _, _LE
from ceilometer.i18n import _, _LE, _LW
from ceilometer import messaging
from ceilometer.publisher import utils as publisher_utils
from ceilometer import service_base
from ceilometer import utils
@ -81,7 +82,8 @@ class CollectorService(service_base.ServiceBase):
self.sample_listener = (
messaging.get_batch_notification_listener(
transport, [sample_target],
[SampleEndpoint(self.meter_manager)],
[SampleEndpoint(cfg.CONF.publisher.telemetry_secret,
self.meter_manager)],
allow_requeue=True,
batch_size=cfg.CONF.collector.batch_size,
batch_timeout=cfg.CONF.collector.batch_timeout))
@ -93,7 +95,8 @@ class CollectorService(service_base.ServiceBase):
self.event_listener = (
messaging.get_batch_notification_listener(
transport, [event_target],
[EventEndpoint(self.event_manager)],
[EventEndpoint(cfg.CONF.publisher.telemetry_secret,
self.event_manager)],
allow_requeue=True,
batch_size=cfg.CONF.collector.batch_size,
batch_timeout=cfg.CONF.collector.batch_timeout))
@ -118,12 +121,17 @@ class CollectorService(service_base.ServiceBase):
except Exception:
LOG.warning(_("UDP: Cannot decode data sent by %s"), source)
else:
try:
LOG.debug("UDP: Storing %s", sample)
self.meter_manager.map_method(
'verify_and_record_metering_data', sample)
except Exception:
LOG.exception(_("UDP: Unable to store meter"))
if publisher_utils.verify_signature(
sample, cfg.CONF.publisher.telemetry_secret):
try:
LOG.debug("UDP: Storing %s", sample)
self.meter_manager.map_method(
'record_metering_data', sample)
except Exception:
LOG.exception(_("UDP: Unable to store meter"))
else:
LOG.warning(_LW('sample signature invalid, '
'discarding: %s'), sample)
def stop(self):
if self.sample_listener:
@ -137,7 +145,8 @@ class CollectorService(service_base.ServiceBase):
class CollectorEndpoint(object):
def __init__(self, dispatcher_manager):
def __init__(self, secret, dispatcher_manager):
self.secret = secret
self.dispatcher_manager = dispatcher_manager
def sample(self, messages):
@ -146,20 +155,24 @@ class CollectorEndpoint(object):
When another service sends a notification over the message
bus, this method receives it.
"""
samples = list(chain.from_iterable(m["payload"] for m in messages))
goods = []
for sample in chain.from_iterable(m["payload"] for m in messages):
if publisher_utils.verify_signature(sample, self.secret):
goods.append(sample)
else:
LOG.warning(_LW('notification signature invalid, '
'discarding: %s'), sample)
try:
self.dispatcher_manager.map_method(self.method, samples)
self.dispatcher_manager.map_method(self.method, goods)
except Exception:
LOG.exception(_LE("Dispatcher failed to handle the %s, "
"requeue it."), self.ep_type)
LOG.exception(_LE("Dispatcher failed to handle the notification, "
"re-queuing it."))
return oslo_messaging.NotificationResult.REQUEUE
class SampleEndpoint(CollectorEndpoint):
method = 'verify_and_record_metering_data'
ep_type = 'sample'
method = 'record_metering_data'
class EventEndpoint(CollectorEndpoint):
method = 'verify_and_record_events'
ep_type = 'event'
method = 'record_events'

View File

@ -21,7 +21,6 @@ import six
from stevedore import named
from ceilometer.i18n import _LW
from ceilometer.publisher import utils
LOG = log.getLogger(__name__)
@ -87,36 +86,9 @@ class MeterDispatcherBase(Base):
def record_metering_data(self, data):
"""Recording metering data interface."""
def verify_and_record_metering_data(self, datapoints):
"""Verify metering data's signature and record valid ones."""
if not isinstance(datapoints, list):
datapoints = [datapoints]
valid_datapoints = []
for datapoint in datapoints:
if utils.verify_signature(datapoint,
self.conf.publisher.telemetry_secret):
valid_datapoints.append(datapoint)
else:
LOG.warning(_LW('Message signature is invalid, discarding '
'it: <%r>.'), datapoint)
return self.record_metering_data(valid_datapoints)
@six.add_metaclass(abc.ABCMeta)
class EventDispatcherBase(Base):
@abc.abstractmethod
def record_events(self, events):
"""Record events."""
def verify_and_record_events(self, events):
"""Verify event signature and record them."""
goods = []
for event in events:
if utils.verify_signature(
event, self.conf.publisher.telemetry_secret):
goods.append(event)
else:
LOG.warning(_LW(
'event signature invalid, discarding event: %s'), event)
return self.record_events(goods)

View File

@ -48,17 +48,18 @@ class TestCollector(tests_base.BaseTestCase):
group='publisher')
self._setup_messaging()
self.counter = sample.Sample(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={},
).as_dict()
self.sample = utils.meter_message_from_counter(
sample.Sample(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={},
), self.CONF.publisher.telemetry_secret)
self.utf8_msg = utils.meter_message_from_counter(
sample.Sample(
@ -115,13 +116,8 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_base(self):
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket') as mock_socket:
mock_socket.return_value = udp_socket
@ -132,14 +128,14 @@ class TestCollector(tests_base.BaseTestCase):
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)
self._verify_udp_socket(udp_socket)
mock_record = mock_dispatcher.verify_and_record_metering_data
mock_record.assert_called_once_with(self.counter)
mock_record = mock_dispatcher.record_metering_data
mock_record.assert_called_once_with(self.sample)
def test_udp_socket_ipv6(self):
self._setup_messaging(False)
self.CONF.set_override('udp_address', '::1', group='collector')
self._setup_fake_dispatcher()
sock = self._make_fake_socket('data')
sock = self._make_fake_socket(self.sample)
with mock.patch.object(socket, 'socket') as mock_socket:
mock_socket.return_value = sock
@ -152,16 +148,10 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_storage_error(self):
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
mock_record = mock_dispatcher.verify_and_record_metering_data
mock_record = mock_dispatcher.record_metering_data
mock_record.side_effect = self._raise_error
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start()
self.addCleanup(self.srv.stop)
@ -170,7 +160,7 @@ class TestCollector(tests_base.BaseTestCase):
self._verify_udp_socket(udp_socket)
mock_record.assert_called_once_with(self.counter)
mock_record.assert_called_once_with(self.sample)
@staticmethod
def _raise_error(*args, **kwargs):
@ -179,7 +169,7 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_bad_decoding(self):
self._setup_messaging(False)
self._setup_fake_dispatcher()
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
with mock.patch('msgpack.loads', self._raise_error):
self.srv.start()
@ -194,7 +184,7 @@ class TestCollector(tests_base.BaseTestCase):
"""Check that only UDP is started if messaging transport is unset."""
self._setup_messaging(False)
self._setup_fake_dispatcher()
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
real_start = oslo_messaging.MessageHandlingServer.start
with mock.patch.object(oslo_messaging.MessageHandlingServer,
'start', side_effect=real_start) as rpc_start:
@ -224,10 +214,9 @@ class TestCollector(tests_base.BaseTestCase):
mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
mock_record = mock_dispatcher.verify_and_record_metering_data
mock_record = mock_dispatcher.record_metering_data
mock_record.side_effect = Exception('boom')
mock_dispatcher.verify_and_record_events.side_effect = Exception(
'boom')
mock_dispatcher.record_events.side_effect = Exception('boom')
self.srv.start()
self.addCleanup(self.srv.stop)

View File

@ -44,28 +44,6 @@ class TestDispatcherDB(base.BaseTestCase):
self.dispatcher.record_events(event)
self.assertEqual(1, len(record_events.call_args_list[0][0][0]))
@mock.patch('ceilometer.publisher.utils.verify_signature')
def test_event_with_bad_signature(self, mocked_verify):
event = event_models.Event(uuid.uuid4(), 'test',
datetime.datetime(2012, 7, 2, 13, 53, 40),
[], {}).serialize()
def _fake_verify(ev, secret):
if ev.get('message_signature') == 'bad_signature':
return False
return True
mocked_verify.side_effect = _fake_verify
with mock.patch.object(self.dispatcher.event_conn,
'record_events') as record_events:
event['message_signature'] = 'bad_signature'
self.dispatcher.verify_and_record_events([event])
self.assertEqual([], record_events.call_args_list[0][0][0])
del event['message_signature']
event['message_signature'] = utils.compute_signature(
event, self.CONF.publisher.telemetry_secret)
self.dispatcher.verify_and_record_events([event])
self.assertEqual(1, len(record_events.call_args_list[1][0][0]))
def test_valid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
@ -77,30 +55,10 @@ class TestDispatcherDB(base.BaseTestCase):
with mock.patch.object(self.dispatcher.meter_conn,
'record_metering_data') as record_metering_data:
self.dispatcher.verify_and_record_metering_data(msg)
self.dispatcher.record_metering_data(msg)
record_metering_data.assert_called_once_with(msg)
def test_invalid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'message_signature': 'invalid-signature'}
class ErrorConnection(object):
called = False
def record_metering_data_batch(self, data):
self.called = True
self.dispatcher._meter_conn = ErrorConnection()
self.dispatcher.verify_and_record_metering_data(msg)
if self.dispatcher.meter_conn.called:
self.fail('Should not have called the storage connection')
def test_timestamp_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
@ -116,7 +74,7 @@ class TestDispatcherDB(base.BaseTestCase):
with mock.patch.object(self.dispatcher.meter_conn,
'record_metering_data') as record_metering_data:
self.dispatcher.verify_and_record_metering_data(msg)
self.dispatcher.record_metering_data(msg)
record_metering_data.assert_called_once_with(expected)
@ -136,6 +94,6 @@ class TestDispatcherDB(base.BaseTestCase):
with mock.patch.object(self.dispatcher.meter_conn,
'record_metering_data') as record_metering_data:
self.dispatcher.verify_and_record_metering_data(msg)
self.dispatcher.record_metering_data(msg)
record_metering_data.assert_called_once_with(expected)

View File

@ -57,7 +57,7 @@ class TestDispatcherFile(base.BaseTestCase):
# The record_metering_data method should exist
# and not produce errors.
dispatcher.verify_and_record_metering_data(msg)
dispatcher.record_metering_data(msg)
# After the method call above, the file should have been created.
self.assertTrue(os.path.exists(handler.baseFilename))
@ -88,7 +88,7 @@ class TestDispatcherFile(base.BaseTestCase):
)
# The record_metering_data method should exist and not produce errors.
dispatcher.verify_and_record_metering_data(msg)
dispatcher.record_metering_data(msg)
# After the method call above, the file should have been created.
self.assertTrue(os.path.exists(handler.baseFilename))

View File

@ -142,7 +142,7 @@ class DispatcherTest(base.BaseTestCase):
def _do_test_activity_filter(self, expected_measures, fake_batch, __):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.verify_and_record_metering_data(self.samples)
d.record_metering_data(self.samples)
fake_batch.assert_called_with(
mock.ANY, mock.ANY,
{'metrics': 1, 'resources': 1, 'measures': expected_measures})
@ -427,7 +427,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
batch = fakeclient.metric.batch_resources_metrics_measures
batch.side_effect = batch_side_effect
self.dispatcher.verify_and_record_metering_data([self.sample])
self.dispatcher.record_metering_data([self.sample])
# Check that the last log message is the expected one
if (self.post_measure_fail or self.create_metric_fail

View File

@ -55,7 +55,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.assertEqual('', dispatcher.target)
with mock.patch.object(requests, 'post') as post:
dispatcher.verify_and_record_metering_data(self.msg)
dispatcher.record_metering_data(self.msg)
# Since the target is not set, no http post should occur, thus the
# call_count should be zero.
@ -66,7 +66,7 @@ class TestDispatcherHttp(base.BaseTestCase):
dispatcher = http.HttpDispatcher(self.CONF)
with mock.patch.object(requests, 'post') as post:
dispatcher.verify_and_record_metering_data(self.msg)
dispatcher.record_metering_data(self.msg)
self.assertEqual(1, post.call_count)

View File

@ -0,0 +1,57 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import fixture
from oslotest import mockpatch
from ceilometer import collector
from ceilometer import dispatcher
from ceilometer.publisher import utils
from ceilometer.tests import base
class FakeDispatcher(dispatcher.EventDispatcherBase):
def __init__(self, conf):
super(FakeDispatcher, self).__init__(conf)
self.events = []
def record_events(self, events):
super(FakeDispatcher, self).record_events(events)
self.events.extend(events)
class TestEventDispatcherVerifier(base.BaseTestCase):
def setUp(self):
super(TestEventDispatcherVerifier, self).setUp()
self.conf = self.useFixture(fixture.Config()).conf
self.conf.import_opt('telemetry_secret',
'ceilometer.publisher.utils',
'publisher')
self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.database.DatabaseDispatcher',
new=FakeDispatcher))
@mock.patch('ceilometer.publisher.utils.verify_signature')
def test_sample_with_bad_signature(self, mocked_verify):
def _fake_verify(ev, secret):
return ev.get('message_signature') != 'bad_signature'
mocked_verify.side_effect = _fake_verify
sample = {"payload": [{"message_signature": "bad_signature"}]}
manager = dispatcher.load_dispatcher_manager()[0]
v = collector.EventEndpoint("secret", manager)
v.sample([sample])
self.assertEqual([], manager['database'].obj.events)
del sample['payload'][0]['message_signature']
sample['payload'][0]['message_signature'] = utils.compute_signature(
sample['payload'][0], "secret")
v.sample([sample])
self.assertEqual(sample['payload'], manager['database'].obj.events)