Merge "Enable signature verification for events"
This commit is contained in:
commit
dcada9c8d8
@ -109,20 +109,25 @@ class DatabaseDispatcher(dispatcher.MeterDispatcherBase,
|
||||
|
||||
event_list = []
|
||||
for ev in events:
|
||||
try:
|
||||
event_list.append(
|
||||
models.Event(
|
||||
message_id=ev['message_id'],
|
||||
event_type=ev['event_type'],
|
||||
generated=timeutils.normalize_time(
|
||||
timeutils.parse_isotime(ev['generated'])),
|
||||
traits=[models.Trait(
|
||||
name, dtype,
|
||||
models.Trait.convert_value(dtype, value))
|
||||
for name, dtype, value in ev['traits']],
|
||||
raw=ev.get('raw', {}))
|
||||
)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Error processing event and it will be "
|
||||
"dropped: %s"), ev)
|
||||
if publisher_utils.verify_signature(
|
||||
ev, self.conf.publisher.telemetry_secret):
|
||||
try:
|
||||
event_list.append(
|
||||
models.Event(
|
||||
message_id=ev['message_id'],
|
||||
event_type=ev['event_type'],
|
||||
generated=timeutils.normalize_time(
|
||||
timeutils.parse_isotime(ev['generated'])),
|
||||
traits=[models.Trait(
|
||||
name, dtype,
|
||||
models.Trait.convert_value(dtype, value))
|
||||
for name, dtype, value in ev['traits']],
|
||||
raw=ev.get('raw', {}))
|
||||
)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Error processing event and it will be "
|
||||
"dropped: %s"), ev)
|
||||
else:
|
||||
LOG.warning(_LW(
|
||||
'event signature invalid, discarding event: %s'), ev)
|
||||
self.event_conn.record_events(event_list)
|
||||
|
@ -19,7 +19,7 @@ from oslo_log import log
|
||||
import requests
|
||||
|
||||
from ceilometer import dispatcher
|
||||
from ceilometer.i18n import _, _LE
|
||||
from ceilometer.i18n import _, _LE, _LW
|
||||
from ceilometer.publisher import utils as publisher_utils
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
@ -132,13 +132,19 @@ class HttpDispatcher(dispatcher.MeterDispatcherBase,
|
||||
events = [events]
|
||||
|
||||
for event in events:
|
||||
res = None
|
||||
try:
|
||||
res = requests.post(self.event_target, data=event,
|
||||
headers=self.headers, timeout=self.timeout)
|
||||
res.raise_for_status()
|
||||
except Exception:
|
||||
error_code = res.status_code if res else 'unknown'
|
||||
LOG.exception(_LE('Status Code: %{code}s. Failed to dispatch '
|
||||
'event: %{event}s'),
|
||||
{'code': error_code, 'event': event})
|
||||
if publisher_utils.verify_signature(
|
||||
event, self.conf.publisher.telemetry_secret):
|
||||
res = None
|
||||
try:
|
||||
res = requests.post(self.event_target, data=event,
|
||||
headers=self.headers,
|
||||
timeout=self.timeout)
|
||||
res.raise_for_status()
|
||||
except Exception:
|
||||
error_code = res.status_code if res else 'unknown'
|
||||
LOG.exception(_LE('Status Code: %{code}s. Failed to'
|
||||
'dispatch event: %{event}s'),
|
||||
{'code': error_code, 'event': event})
|
||||
else:
|
||||
LOG.warning(_LW(
|
||||
'event signature invalid, discarding event: %s'), event)
|
||||
|
@ -36,11 +36,35 @@ class TestDispatcherDB(base.BaseTestCase):
|
||||
def test_event_conn(self):
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {}).serialize()
|
||||
[], {})
|
||||
event = utils.message_from_event(event,
|
||||
self.CONF.publisher.telemetry_secret)
|
||||
with mock.patch.object(self.dispatcher.event_conn,
|
||||
'record_events') as record_events:
|
||||
self.dispatcher.record_events(event)
|
||||
self.assertTrue(record_events.called)
|
||||
self.assertEqual(1, len(record_events.call_args_list[0][0][0]))
|
||||
|
||||
@mock.patch('ceilometer.publisher.utils.verify_signature')
|
||||
def test_event_with_bad_signature(self, mocked_verify):
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {}).serialize()
|
||||
|
||||
def _fake_verify(ev, secret):
|
||||
if ev.get('message_signature') == 'bad_signature':
|
||||
return False
|
||||
return True
|
||||
mocked_verify.side_effect = _fake_verify
|
||||
with mock.patch.object(self.dispatcher.event_conn,
|
||||
'record_events') as record_events:
|
||||
event['message_signature'] = 'bad_signature'
|
||||
self.dispatcher.record_events(event)
|
||||
self.assertEqual([], record_events.call_args_list[0][0][0])
|
||||
del event['message_signature']
|
||||
event['message_signature'] = utils.compute_signature(
|
||||
event, self.CONF.publisher.telemetry_secret)
|
||||
self.dispatcher.record_events(event)
|
||||
self.assertEqual(1, len(record_events.call_args_list[1][0][0]))
|
||||
|
||||
def test_valid_message(self):
|
||||
msg = {'counter_name': 'test',
|
||||
|
@ -136,7 +136,9 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {}).serialize()
|
||||
[], {})
|
||||
event = utils.message_from_event(event,
|
||||
self.CONF.publisher.telemetry_secret)
|
||||
|
||||
with mock.patch.object(requests, 'post') as post:
|
||||
dispatcher.record_events(event)
|
||||
@ -149,8 +151,9 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {}).serialize()
|
||||
|
||||
[], {})
|
||||
event = utils.message_from_event(event,
|
||||
self.CONF.publisher.telemetry_secret)
|
||||
with mock.patch('ceilometer.dispatcher.http.LOG',
|
||||
mock.MagicMock()) as LOG:
|
||||
dispatcher.record_events(event)
|
||||
@ -162,8 +165,9 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
event = event_models.Event(uuid.uuid4(), 'test',
|
||||
datetime.datetime(2012, 7, 2, 13, 53, 40),
|
||||
[], {}).serialize()
|
||||
|
||||
[], {})
|
||||
event = utils.message_from_event(event,
|
||||
self.CONF.publisher.telemetry_secret)
|
||||
with mock.patch.object(requests, 'post') as post:
|
||||
dispatcher.record_events(event)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user