Merge "collector: use an intermediate proxy class for event dispatcher"

This commit is contained in:
Jenkins 2016-07-12 11:16:42 +00:00 committed by Gerrit Code Review
commit 432f411603
8 changed files with 120 additions and 131 deletions

View File

@ -24,8 +24,9 @@ from oslo_utils import netutils
from oslo_utils import units
from ceilometer import dispatcher
from ceilometer.i18n import _, _LE
from ceilometer.i18n import _, _LE, _LW
from ceilometer import messaging
from ceilometer.publisher import utils as publisher_utils
from ceilometer import service_base
from ceilometer import utils
@ -81,7 +82,8 @@ class CollectorService(service_base.ServiceBase):
self.sample_listener = (
messaging.get_batch_notification_listener(
transport, [sample_target],
[SampleEndpoint(self.meter_manager)],
[SampleEndpoint(cfg.CONF.publisher.telemetry_secret,
self.meter_manager)],
allow_requeue=True,
batch_size=cfg.CONF.collector.batch_size,
batch_timeout=cfg.CONF.collector.batch_timeout))
@ -93,7 +95,8 @@ class CollectorService(service_base.ServiceBase):
self.event_listener = (
messaging.get_batch_notification_listener(
transport, [event_target],
[EventEndpoint(self.event_manager)],
[EventEndpoint(cfg.CONF.publisher.telemetry_secret,
self.event_manager)],
allow_requeue=True,
batch_size=cfg.CONF.collector.batch_size,
batch_timeout=cfg.CONF.collector.batch_timeout))
@ -118,12 +121,17 @@ class CollectorService(service_base.ServiceBase):
except Exception:
LOG.warning(_("UDP: Cannot decode data sent by %s"), source)
else:
try:
LOG.debug("UDP: Storing %s", sample)
self.meter_manager.map_method(
'verify_and_record_metering_data', sample)
except Exception:
LOG.exception(_("UDP: Unable to store meter"))
if publisher_utils.verify_signature(
sample, cfg.CONF.publisher.telemetry_secret):
try:
LOG.debug("UDP: Storing %s", sample)
self.meter_manager.map_method(
'record_metering_data', sample)
except Exception:
LOG.exception(_("UDP: Unable to store meter"))
else:
LOG.warning(_LW('sample signature invalid, '
'discarding: %s'), sample)
def stop(self):
if self.started:
@ -138,7 +146,8 @@ class CollectorService(service_base.ServiceBase):
class CollectorEndpoint(object):
def __init__(self, dispatcher_manager):
def __init__(self, secret, dispatcher_manager):
self.secret = secret
self.dispatcher_manager = dispatcher_manager
def sample(self, messages):
@ -147,20 +156,24 @@ class CollectorEndpoint(object):
When another service sends a notification over the message
bus, this method receives it.
"""
samples = list(chain.from_iterable(m["payload"] for m in messages))
goods = []
for sample in chain.from_iterable(m["payload"] for m in messages):
if publisher_utils.verify_signature(sample, self.secret):
goods.append(sample)
else:
LOG.warning(_LW('notification signature invalid, '
'discarding: %s'), sample)
try:
self.dispatcher_manager.map_method(self.method, samples)
self.dispatcher_manager.map_method(self.method, goods)
except Exception:
LOG.exception(_LE("Dispatcher failed to handle the %s, "
"requeue it."), self.ep_type)
LOG.exception(_LE("Dispatcher failed to handle the notification, "
"re-queuing it."))
return oslo_messaging.NotificationResult.REQUEUE
class SampleEndpoint(CollectorEndpoint):
method = 'verify_and_record_metering_data'
ep_type = 'sample'
method = 'record_metering_data'
class EventEndpoint(CollectorEndpoint):
method = 'verify_and_record_events'
ep_type = 'event'
method = 'record_events'

View File

@ -21,7 +21,6 @@ import six
from stevedore import named
from ceilometer.i18n import _LW
from ceilometer.publisher import utils
LOG = log.getLogger(__name__)
@ -87,36 +86,9 @@ class MeterDispatcherBase(Base):
def record_metering_data(self, data):
"""Recording metering data interface."""
def verify_and_record_metering_data(self, datapoints):
"""Verify metering data's signature and record valid ones."""
if not isinstance(datapoints, list):
datapoints = [datapoints]
valid_datapoints = []
for datapoint in datapoints:
if utils.verify_signature(datapoint,
self.conf.publisher.telemetry_secret):
valid_datapoints.append(datapoint)
else:
LOG.warning(_LW('Message signature is invalid, discarding '
'it: <%r>.'), datapoint)
return self.record_metering_data(valid_datapoints)
@six.add_metaclass(abc.ABCMeta)
class EventDispatcherBase(Base):
@abc.abstractmethod
def record_events(self, events):
"""Record events."""
def verify_and_record_events(self, events):
"""Verify event signature and record them."""
goods = []
for event in events:
if utils.verify_signature(
event, self.conf.publisher.telemetry_secret):
goods.append(event)
else:
LOG.warning(_LW(
'event signature invalid, discarding event: %s'), event)
return self.record_events(goods)

View File

@ -48,17 +48,18 @@ class TestCollector(tests_base.BaseTestCase):
group='publisher')
self._setup_messaging()
self.counter = sample.Sample(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={},
).as_dict()
self.sample = utils.meter_message_from_counter(
sample.Sample(
name='foobar',
type='bad',
unit='F',
volume=1,
user_id='jd',
project_id='ceilometer',
resource_id='cat',
timestamp=timeutils.utcnow().isoformat(),
resource_metadata={},
), self.CONF.publisher.telemetry_secret)
self.utf8_msg = utils.meter_message_from_counter(
sample.Sample(
@ -115,13 +116,8 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_base(self):
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket') as mock_socket:
mock_socket.return_value = udp_socket
@ -132,14 +128,14 @@ class TestCollector(tests_base.BaseTestCase):
mock_socket.assert_called_with(socket.AF_INET, socket.SOCK_DGRAM)
self._verify_udp_socket(udp_socket)
mock_record = mock_dispatcher.verify_and_record_metering_data
mock_record.assert_called_once_with(self.counter)
mock_record = mock_dispatcher.record_metering_data
mock_record.assert_called_once_with(self.sample)
def test_udp_socket_ipv6(self):
self._setup_messaging(False)
self.CONF.set_override('udp_address', '::1', group='collector')
self._setup_fake_dispatcher()
sock = self._make_fake_socket('data')
sock = self._make_fake_socket(self.sample)
with mock.patch.object(socket, 'socket') as mock_socket:
mock_socket.return_value = sock
@ -152,16 +148,10 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_storage_error(self):
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
mock_record = mock_dispatcher.verify_and_record_metering_data
mock_record = mock_dispatcher.record_metering_data
mock_record.side_effect = self._raise_error
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start()
self.addCleanup(self.srv.stop)
@ -170,7 +160,7 @@ class TestCollector(tests_base.BaseTestCase):
self._verify_udp_socket(udp_socket)
mock_record.assert_called_once_with(self.counter)
mock_record.assert_called_once_with(self.sample)
@staticmethod
def _raise_error(*args, **kwargs):
@ -179,7 +169,7 @@ class TestCollector(tests_base.BaseTestCase):
def test_udp_receive_bad_decoding(self):
self._setup_messaging(False)
self._setup_fake_dispatcher()
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
with mock.patch('socket.socket', return_value=udp_socket):
with mock.patch('msgpack.loads', self._raise_error):
self.srv.start()
@ -194,7 +184,7 @@ class TestCollector(tests_base.BaseTestCase):
"""Check that only UDP is started if messaging transport is unset."""
self._setup_messaging(False)
self._setup_fake_dispatcher()
udp_socket = self._make_fake_socket(self.counter)
udp_socket = self._make_fake_socket(self.sample)
real_start = oslo_messaging.MessageHandlingServer.start
with mock.patch.object(oslo_messaging.MessageHandlingServer,
'start', side_effect=real_start) as rpc_start:
@ -224,10 +214,9 @@ class TestCollector(tests_base.BaseTestCase):
mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
mock_record = mock_dispatcher.verify_and_record_metering_data
mock_record = mock_dispatcher.record_metering_data
mock_record.side_effect = Exception('boom')
mock_dispatcher.verify_and_record_events.side_effect = Exception(
'boom')
mock_dispatcher.record_events.side_effect = Exception('boom')
self.srv.start()
self.addCleanup(self.srv.stop)

View File

@ -44,28 +44,6 @@ class TestDispatcherDB(base.BaseTestCase):
self.dispatcher.record_events(event)
self.assertEqual(1, len(record_events.call_args_list[0][0][0]))
@mock.patch('ceilometer.publisher.utils.verify_signature')
def test_event_with_bad_signature(self, mocked_verify):
event = event_models.Event(uuid.uuid4(), 'test',
datetime.datetime(2012, 7, 2, 13, 53, 40),
[], {}).serialize()
def _fake_verify(ev, secret):
if ev.get('message_signature') == 'bad_signature':
return False
return True
mocked_verify.side_effect = _fake_verify
with mock.patch.object(self.dispatcher.event_conn,
'record_events') as record_events:
event['message_signature'] = 'bad_signature'
self.dispatcher.verify_and_record_events([event])
self.assertEqual([], record_events.call_args_list[0][0][0])
del event['message_signature']
event['message_signature'] = utils.compute_signature(
event, self.CONF.publisher.telemetry_secret)
self.dispatcher.verify_and_record_events([event])
self.assertEqual(1, len(record_events.call_args_list[1][0][0]))
def test_valid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
@ -77,30 +55,10 @@ class TestDispatcherDB(base.BaseTestCase):
with mock.patch.object(self.dispatcher.meter_conn,
'record_metering_data') as record_metering_data:
self.dispatcher.verify_and_record_metering_data(msg)
self.dispatcher.record_metering_data(msg)
record_metering_data.assert_called_once_with(msg)
def test_invalid_message(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
'counter_volume': 1,
'message_signature': 'invalid-signature'}
class ErrorConnection(object):
called = False
def record_metering_data_batch(self, data):
self.called = True
self.dispatcher._meter_conn = ErrorConnection()
self.dispatcher.verify_and_record_metering_data(msg)
if self.dispatcher.meter_conn.called:
self.fail('Should not have called the storage connection')
def test_timestamp_conversion(self):
msg = {'counter_name': 'test',
'resource_id': self.id(),
@ -116,7 +74,7 @@ class TestDispatcherDB(base.BaseTestCase):
with mock.patch.object(self.dispatcher.meter_conn,
'record_metering_data') as record_metering_data:
self.dispatcher.verify_and_record_metering_data(msg)
self.dispatcher.record_metering_data(msg)
record_metering_data.assert_called_once_with(expected)
@ -136,6 +94,6 @@ class TestDispatcherDB(base.BaseTestCase):
with mock.patch.object(self.dispatcher.meter_conn,
'record_metering_data') as record_metering_data:
self.dispatcher.verify_and_record_metering_data(msg)
self.dispatcher.record_metering_data(msg)
record_metering_data.assert_called_once_with(expected)

View File

@ -57,7 +57,7 @@ class TestDispatcherFile(base.BaseTestCase):
# The record_metering_data method should exist
# and not produce errors.
dispatcher.verify_and_record_metering_data(msg)
dispatcher.record_metering_data(msg)
# After the method call above, the file should have been created.
self.assertTrue(os.path.exists(handler.baseFilename))
@ -88,7 +88,7 @@ class TestDispatcherFile(base.BaseTestCase):
)
# The record_metering_data method should exist and not produce errors.
dispatcher.verify_and_record_metering_data(msg)
dispatcher.record_metering_data(msg)
# After the method call above, the file should have been created.
self.assertTrue(os.path.exists(handler.baseFilename))

View File

@ -142,7 +142,7 @@ class DispatcherTest(base.BaseTestCase):
def _do_test_activity_filter(self, expected_measures, fake_batch, __):
d = gnocchi.GnocchiDispatcher(self.conf.conf)
d.verify_and_record_metering_data(self.samples)
d.record_metering_data(self.samples)
fake_batch.assert_called_with(
mock.ANY, mock.ANY,
{'metrics': 1, 'resources': 1, 'measures': expected_measures})
@ -427,7 +427,7 @@ class DispatcherWorkflowTest(base.BaseTestCase,
batch = fakeclient.metric.batch_resources_metrics_measures
batch.side_effect = batch_side_effect
self.dispatcher.verify_and_record_metering_data([self.sample])
self.dispatcher.record_metering_data([self.sample])
# Check that the last log message is the expected one
if (self.post_measure_fail or self.create_metric_fail

View File

@ -55,7 +55,7 @@ class TestDispatcherHttp(base.BaseTestCase):
self.assertEqual('', dispatcher.target)
with mock.patch.object(requests, 'post') as post:
dispatcher.verify_and_record_metering_data(self.msg)
dispatcher.record_metering_data(self.msg)
# Since the target is not set, no http post should occur, thus the
# call_count should be zero.
@ -66,7 +66,7 @@ class TestDispatcherHttp(base.BaseTestCase):
dispatcher = http.HttpDispatcher(self.CONF)
with mock.patch.object(requests, 'post') as post:
dispatcher.verify_and_record_metering_data(self.msg)
dispatcher.record_metering_data(self.msg)
self.assertEqual(1, post.call_count)

View File

@ -0,0 +1,57 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import fixture
from oslotest import mockpatch
from ceilometer import collector
from ceilometer import dispatcher
from ceilometer.publisher import utils
from ceilometer.tests import base
class FakeDispatcher(dispatcher.EventDispatcherBase):
def __init__(self, conf):
super(FakeDispatcher, self).__init__(conf)
self.events = []
def record_events(self, events):
super(FakeDispatcher, self).record_events(events)
self.events.extend(events)
class TestEventDispatcherVerifier(base.BaseTestCase):
def setUp(self):
super(TestEventDispatcherVerifier, self).setUp()
self.conf = self.useFixture(fixture.Config()).conf
self.conf.import_opt('telemetry_secret',
'ceilometer.publisher.utils',
'publisher')
self.useFixture(mockpatch.Patch(
'ceilometer.dispatcher.database.DatabaseDispatcher',
new=FakeDispatcher))
@mock.patch('ceilometer.publisher.utils.verify_signature')
def test_sample_with_bad_signature(self, mocked_verify):
def _fake_verify(ev, secret):
return ev.get('message_signature') != 'bad_signature'
mocked_verify.side_effect = _fake_verify
sample = {"payload": [{"message_signature": "bad_signature"}]}
manager = dispatcher.load_dispatcher_manager()[0]
v = collector.EventEndpoint("secret", manager)
v.sample([sample])
self.assertEqual([], manager['database'].obj.events)
del sample['payload'][0]['message_signature']
sample['payload'][0]['message_signature'] = utils.compute_signature(
sample['payload'][0], "secret")
v.sample([sample])
self.assertEqual(sample['payload'], manager['database'].obj.events)