Merge "Optionally store Events in Collector."

This commit is contained in:
Jenkins
2013-07-12 10:21:51 +00:00
committed by Gerrit Code Review
3 changed files with 153 additions and 13 deletions

View File

@@ -16,22 +16,25 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo.config import cfg
import msgpack import msgpack
from oslo.config import cfg
import socket import socket
from stevedore import extension from stevedore import extension
from ceilometer.publisher import rpc as publisher_rpc from ceilometer.publisher import rpc as publisher_rpc
from ceilometer.service import prepare_service from ceilometer.service import prepare_service
from ceilometer.openstack.common import context from ceilometer.openstack.common import context
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import service as os_service
from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher from ceilometer.openstack.common.rpc import dispatcher as rpc_dispatcher
from ceilometer.openstack.common.rpc import service as rpc_service from ceilometer.openstack.common.rpc import service as rpc_service
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import pipeline from ceilometer import pipeline
from ceilometer import storage from ceilometer import storage
from ceilometer.storage import models
from ceilometer import transformer from ceilometer import transformer
OPTS = [ OPTS = [
@@ -42,6 +45,12 @@ OPTS = [
cfg.IntOpt('udp_port', cfg.IntOpt('udp_port',
default=4952, default=4952,
help='port to bind the UDP socket to'), help='port to bind the UDP socket to'),
cfg.BoolOpt('ack_on_event_error',
default=True,
help='Acknowledge message when event persistence fails'),
cfg.BoolOpt('store_events',
default=False,
help='Save event details'),
] ]
cfg.CONF.register_opts(OPTS, group="collector") cfg.CONF.register_opts(OPTS, group="collector")
@@ -141,8 +150,11 @@ class CollectorService(rpc_service.Service):
def _setup_subscription(self, ext, *args, **kwds): def _setup_subscription(self, ext, *args, **kwds):
handler = ext.obj handler = ext.obj
LOG.debug('Event types from %s: %s', ack_on_error = cfg.CONF.collector.ack_on_event_error
ext.name, ', '.join(handler.get_event_types())) LOG.debug('Event types from %s: %s (ack_on_error=%s)',
ext.name, ', '.join(handler.get_event_types()),
ack_on_error)
for exchange_topic in handler.get_exchange_topics(cfg.CONF): for exchange_topic in handler.get_exchange_topics(cfg.CONF):
for topic in exchange_topic.topics: for topic in exchange_topic.topics:
try: try:
@@ -151,7 +163,7 @@ class CollectorService(rpc_service.Service):
pool_name='ceilometer.notifications', pool_name='ceilometer.notifications',
topic=topic, topic=topic,
exchange_name=exchange_topic.exchange, exchange_name=exchange_topic.exchange,
) ack_on_error=ack_on_error)
except Exception: except Exception:
LOG.exception('Could not join consumer pool %s/%s' % LOG.exception('Could not join consumer pool %s/%s' %
(topic, exchange_topic.exchange)) (topic, exchange_topic.exchange))
@@ -160,8 +172,60 @@ class CollectorService(rpc_service.Service):
"""Make a notification processed by an handler.""" """Make a notification processed by an handler."""
LOG.debug('notification %r', notification.get('event_type')) LOG.debug('notification %r', notification.get('event_type'))
self.notification_manager.map(self._process_notification_for_ext, self.notification_manager.map(self._process_notification_for_ext,
notification=notification, notification=notification)
)
if cfg.CONF.collector.store_events:
self._message_to_event(notification)
@staticmethod
def _extract_when(body):
"""Extract the generated datetime from the notification.
"""
when = body.get('timestamp', body.get('_context_timestamp'))
if when:
return timeutils.normalize_time(timeutils.parse_isotime(when))
return timeutils.utcnow()
def _message_to_event(self, body):
"""Convert message to Ceilometer Event.
NOTE: this is currently based on the Nova notification format.
We will need to make this driver-based to support other formats.
NOTE: the rpc layer currently rips out the notification
delivery_info, which is critical to determining the
source of the notification. This will have to get added back later.
"""
event_name = body['event_type']
when = self._extract_when(body)
LOG.debug('Saving event "%s"', event_name)
message_id = body.get('message_id')
# TODO(sandy) - check we have not already saved this notification.
# (possible on retries) Use message_id to spot dups.
publisher = body.get('publisher_id')
request_id = body.get('_context_request_id')
tenant_id = body.get('_context_tenant')
text = models.Trait.TEXT_TYPE
all_traits = [models.Trait('message_id', text, message_id),
models.Trait('service', text, publisher),
models.Trait('request_id', text, request_id),
models.Trait('tenant_id', text, tenant_id),
]
# Only store non-None value traits ...
traits = [trait for trait in all_traits if trait.value is not None]
event = models.Event(event_name, when, traits)
try:
self.storage_conn.record_events([event, ])
except Exception as err:
LOG.exception(_("Unable to store events: %s"), err)
# By re-raising we avoid ack()'ing the message.
raise
def _process_notification_for_ext(self, ext, notification): def _process_notification_for_ext(self, ext, notification):
handler = ext.obj handler = ext.obj

View File

@@ -597,6 +597,13 @@
# port to bind the UDP socket to (integer value) # port to bind the UDP socket to (integer value)
#udp_port=4952 #udp_port=4952
# Acknowledge message when event persistence fails (boolean
# value)
#ack_on_event_error=true
# Save event details (boolean value)
#store_events=false
[matchmaker_ring] [matchmaker_ring]
@@ -624,4 +631,4 @@
#password=<None> #password=<None>
# Total option count: 119 # Total option count: 121

View File

@@ -15,10 +15,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
"""Tests for ceilometer/agent/manager.py """Tests for ceilometer/agent/service.py
""" """
from datetime import datetime import datetime
import msgpack import msgpack
import socket import socket
@@ -29,6 +29,7 @@ from stevedore import extension
from stevedore.tests import manager as test_manager from stevedore.tests import manager as test_manager
from ceilometer import counter from ceilometer import counter
from ceilometer.openstack.common import timeutils
from ceilometer.publisher import rpc from ceilometer.publisher import rpc
from ceilometer.collector import service from ceilometer.collector import service
from ceilometer.storage import base from ceilometer.storage import base
@@ -164,6 +165,10 @@ class TestUDPCollectorService(TestCollector):
self.srv.start() self.srv.start()
class MyException(Exception):
pass
class TestCollectorService(TestCollector): class TestCollectorService(TestCollector):
def setUp(self): def setUp(self):
@@ -174,7 +179,7 @@ class TestCollectorService(TestCollector):
@patch('ceilometer.pipeline.setup_pipeline', MagicMock()) @patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_init_host(self): def test_init_host(self):
# If we try to create a real RPC connection, init_host() never # If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager # returns. Mock it out so we can establish the service
# configuration. # configuration.
with patch('ceilometer.openstack.common.rpc.create_connection'): with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start() self.srv.start()
@@ -230,7 +235,7 @@ class TestCollectorService(TestCollector):
expected = {} expected = {}
expected.update(msg) expected.update(msg)
expected['timestamp'] = datetime(2012, 7, 2, 13, 53, 40) expected['timestamp'] = datetime.datetime(2012, 7, 2, 13, 53, 40)
self.srv.storage_conn = self.mox.CreateMock(base.Connection) self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(expected) self.srv.storage_conn.record_metering_data(expected)
@@ -251,7 +256,8 @@ class TestCollectorService(TestCollector):
expected = {} expected = {}
expected.update(msg) expected.update(msg)
expected['timestamp'] = datetime(2012, 9, 30, 23, 31, 50, 262000) expected['timestamp'] = datetime.datetime(2012, 9, 30,
23, 31, 50, 262000)
self.srv.storage_conn = self.mox.CreateMock(base.Connection) self.srv.storage_conn = self.mox.CreateMock(base.Connection)
self.srv.storage_conn.record_metering_data(expected) self.srv.storage_conn.record_metering_data(expected)
@@ -262,8 +268,9 @@ class TestCollectorService(TestCollector):
@patch('ceilometer.pipeline.setup_pipeline', MagicMock()) @patch('ceilometer.pipeline.setup_pipeline', MagicMock())
def test_process_notification(self): def test_process_notification(self):
# If we try to create a real RPC connection, init_host() never # If we try to create a real RPC connection, init_host() never
# returns. Mock it out so we can establish the manager # returns. Mock it out so we can establish the service
# configuration. # configuration.
cfg.CONF.set_override("store_events", False, group="collector")
with patch('ceilometer.openstack.common.rpc.create_connection'): with patch('ceilometer.openstack.common.rpc.create_connection'):
self.srv.start() self.srv.start()
self.srv.pipeline_manager.pipelines[0] = MagicMock() self.srv.pipeline_manager.pipelines[0] = MagicMock()
@@ -277,3 +284,65 @@ class TestCollectorService(TestCollector):
self.srv.process_notification(TEST_NOTICE) self.srv.process_notification(TEST_NOTICE)
self.assertTrue( self.assertTrue(
self.srv.pipeline_manager.publisher.called) self.srv.pipeline_manager.publisher.called)
def test_process_notification_no_events(self):
cfg.CONF.set_override("store_events", False, group="collector")
self.srv.notification_manager = MagicMock()
with patch.object(self.srv, '_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertFalse(fake_msg_to_event.called)
def test_process_notification_with_events(self):
cfg.CONF.set_override("store_events", True, group="collector")
self.srv.notification_manager = MagicMock()
with patch.object(self.srv, '_message_to_event') as fake_msg_to_event:
self.srv.process_notification({})
self.assertTrue(fake_msg_to_event.called)
def test_message_to_event_missing_keys(self):
now = timeutils.utcnow()
timeutils.set_time_override(now)
message = {'event_type': "foo", 'message_id': "abc"}
self.srv.storage_conn = MagicMock()
with patch('ceilometer.collector.service.LOG') as mylog:
self.srv._message_to_event(message)
self.assertFalse(mylog.exception.called)
events = self.srv.storage_conn.record_events.call_args[0]
self.assertEquals(1, len(events))
event = events[0][0]
self.assertEquals("foo", event.event_name)
self.assertEquals(now, event.generated)
self.assertEquals(1, len(event.traits))
def test_message_to_event_bad_save(self):
cfg.CONF.set_override("store_events", True, group="collector")
self.srv.storage_conn = MagicMock()
self.srv.storage_conn.record_events.side_effect = MyException("Boom")
message = {'event_type': "foo", 'message_id': "abc"}
try:
self.srv._message_to_event(message)
self.fail("failing save should raise")
except MyException:
pass
def test_extract_when(self):
now = timeutils.utcnow()
modified = now + datetime.timedelta(minutes=1)
timeutils.set_time_override(now)
body = {"timestamp": str(modified)}
self.assertEquals(service.CollectorService._extract_when(body),
modified)
body = {"_context_timestamp": str(modified)}
self.assertEquals(service.CollectorService._extract_when(body),
modified)
then = now + datetime.timedelta(hours=1)
body = {"timestamp": str(modified), "_context_timestamp": str(then)}
self.assertEquals(service.CollectorService._extract_when(body),
modified)
self.assertEquals(service.CollectorService._extract_when({}), now)