Listen for volume notifications

This implements bug #1021772

Change-Id: Ic0292e8bdc20668abd331f4f03b06b9d1496217a
Signed-off-by: Julien Danjou <julien@danjou.info>
This commit is contained in:
Julien Danjou 2012-09-21 12:01:23 +02:00
parent 50da744ed5
commit b0eb0b091e
7 changed files with 191 additions and 0 deletions

View File

@ -43,6 +43,7 @@ LOG = log.getLogger(__name__)
COMPUTE_COLLECTOR_NAMESPACE = 'ceilometer.collector.compute' COMPUTE_COLLECTOR_NAMESPACE = 'ceilometer.collector.compute'
VOLUME_COLLECTOR_NAMESPACE = 'ceilometer.collector.volume'
class CollectorManager(manager.Manager): class CollectorManager(manager.Manager):
@ -61,6 +62,10 @@ class CollectorManager(manager.Manager):
COMPUTE_COLLECTOR_NAMESPACE, COMPUTE_COLLECTOR_NAMESPACE,
self._publish_counter, self._publish_counter,
) )
self.volume_handler = dispatcher.NotificationDispatcher(
VOLUME_COLLECTOR_NAMESPACE,
self._publish_counter,
)
# FIXME(dhellmann): Should be using create_worker(), except # FIXME(dhellmann): Should be using create_worker(), except
# that notification messages do not conform to the RPC # that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method" # invocation protocol (they do not include a "method"
@ -68,6 +73,9 @@ class CollectorManager(manager.Manager):
self.connection.declare_topic_consumer( self.connection.declare_topic_consumer(
topic='%s.info' % cfg.CONF.notification_topics[0], topic='%s.info' % cfg.CONF.notification_topics[0],
callback=self.compute_handler.notify) callback=self.compute_handler.notify)
self.connection.declare_topic_consumer(
topic='%s.info' % cfg.CONF.notification_topics[0],
callback=self.volume_handler.notify)
# Set ourselves up as a separate worker for the metering data, # Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer(). # since the default for manager is to use create_consumer().

View File

@ -35,6 +35,14 @@ class NotificationBase(object):
def process_notification(self, message): def process_notification(self, message):
"""Return a sequence of Counter instances for the given message.""" """Return a sequence of Counter instances for the given message."""
def notification_to_metadata(self, event):
"""Transform a payload dict to a metadata dict."""
metadata = dict([(k, event['payload'].get(k))
for k in self.metadata_keys])
metadata['event_type'] = event['event_type']
metadata['host'] = event['publisher_id']
return metadata
class PollsterBase(object): class PollsterBase(object):
"""Base class for plugins that support the polling API.""" """Base class for plugins that support the polling API."""

View File

View File

@ -0,0 +1,80 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2012 New Dream Network, LLC (DreamHost)
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Converters for producing volume counter messages from cinder notification
events.
"""
from ceilometer import counter
from ceilometer import plugin
class _Base(plugin.NotificationBase):
"""Convert compute.instance.* notifications into Counters
"""
metadata_keys = [
"status",
"display_name",
"volume_type",
"size",
]
@staticmethod
def get_event_types():
return ['volume.exists',
'volume.create.end',
'volume.delete.end',
]
class Volume(_Base):
def process_notification(self, message):
return [
counter.Counter(source='?',
name='volume',
type='absolute',
volume=1,
user_id=message['payload']['user_id'],
project_id=message['payload']['tenant_id'],
resource_id=message['payload']['volume_id'],
timestamp=message['timestamp'],
duration=None,
resource_metadata=self.notification_to_metadata(
message),
),
]
class VolumeSize(_Base):
def process_notification(self, message):
return [
counter.Counter(source='?',
name='volume_size',
type='absolute',
volume=message['payload']['size'],
user_id=message['payload']['user_id'],
project_id=message['payload']['tenant_id'],
resource_id=message['payload']['volume_id'],
timestamp=message['timestamp'],
duration=None,
resource_metadata=self.notification_to_metadata(
message),
),
]

View File

@ -46,6 +46,10 @@ setuptools.setup(
root_disk_size = ceilometer.compute.notifications:RootDiskSize root_disk_size = ceilometer.compute.notifications:RootDiskSize
ephemeral_disk_size = ceilometer.compute.notifications:EphemeralDiskSize ephemeral_disk_size = ceilometer.compute.notifications:EphemeralDiskSize
[ceilometer.collector.volume]
volume = ceilometer.volume.notifications:Volume
volume_size = ceilometer.volume.notifications:VolumeSize
[ceilometer.poll.compute] [ceilometer.poll.compute]
libvirt_diskio = ceilometer.compute.libvirt:DiskIOPollster libvirt_diskio = ceilometer.compute.libvirt:DiskIOPollster
libvirt_cpu = ceilometer.compute.libvirt:CPUPollster libvirt_cpu = ceilometer.compute.libvirt:CPUPollster

0
tests/volume/__init__.py Normal file
View File

View File

@ -0,0 +1,91 @@
import unittest
from ceilometer.volume import notifications
NOTIFICATION_VOLUME_EXISTS = {
u'_context_roles': [u'admin'],
u'_context_request_id': u'req-7ef29a5d-adeb-48a8-b104-59c05361aa27',
u'_context_quota_class': None,
u'event_type': u'volume.exists',
u'timestamp': u'2012-09-21 09:29:10.620731',
u'message_id': u'e0e6a5ad-2fc9-453c-b3fb-03fe504538dc',
u'_context_auth_token': None,
u'_context_is_admin': True,
u'_context_project_id': None,
u'_context_timestamp': u'2012-09-21T09:29:10.266928',
u'_context_read_deleted': u'no',
u'_context_user_id': None,
u'_context_remote_address': None,
u'publisher_id': u'volume.ubuntu-VirtualBox',
u'payload': {u'status': u'available',
u'audit_period_beginning': u'2012-09-20 00:00:00',
u'display_name': u'volume1',
u'tenant_id': u'6c97f1ecf17047eab696786d56a0bff5',
u'created_at': u'2012-09-20 15:05:16',
u'snapshot_id': None,
u'volume_type': None,
u'volume_id': u'84c363b9-9854-48dc-b949-fe04263f4cf0',
u'audit_period_ending': u'2012-09-21 00:00:00',
u'user_id': u'4d2fa4b76a4a4ecab8c468c8dea42f89',
u'launched_at': u'2012-09-20 15:05:23',
u'size': 1},
u'priority': u'INFO'
}
NOTIFICATION_VOLUME_DELETE = {
u'_context_roles': [u'Member', u'admin'],
u'_context_request_id': u'req-6ba8ccb4-1093-4a39-b029-adfaa3fc7ceb',
u'_context_quota_class': None,
u'event_type': u'volume.delete.end',
u'timestamp': u'2012-09-21 10:24:13.168630',
u'message_id': u'f6e6bc1f-fcd5-41e1-9a86-da7d024f03d9',
u'_context_auth_token': u'277c6899de8a4b3d999f3e2e4c0915ff',
u'_context_is_admin': True,
u'_context_project_id': u'6c97f1ecf17047eab696786d56a0bff5',
u'_context_timestamp': u'2012-09-21T10:23:54.741228',
u'_context_read_deleted': u'no',
u'_context_user_id': u'4d2fa4b76a4a4ecab8c468c8dea42f89',
u'_context_remote_address': u'192.168.22.101',
u'publisher_id': u'volume.ubuntu-VirtualBox',
u'payload': {u'status': u'deleting',
u'volume_type_id': None,
u'display_name': u'abc',
u'tenant_id': u'6c97f1ecf17047eab696786d56a0bff5',
u'created_at': u'2012-09-21 10:10:47',
u'snapshot_id': None,
u'volume_id': u'3b761164-84b4-4eb3-8fcb-1974c641d6ef',
u'user_id': u'4d2fa4b76a4a4ecab8c468c8dea42f89',
u'launched_at': u'2012-09-21 10:10:50',
u'size': 1},
u'priority': u'INFO'}
class TestNotifications(unittest.TestCase):
def test_volume_exists(self):
v = notifications.Volume()
counters = v.process_notification(NOTIFICATION_VOLUME_EXISTS)
self.assertEqual(len(counters), 1)
c = counters[0]
self.assertEqual(c.volume, 1)
def test_volume_size_exists(self):
v = notifications.VolumeSize()
counters = v.process_notification(NOTIFICATION_VOLUME_EXISTS)
self.assertEqual(len(counters), 1)
c = counters[0]
self.assertEqual(c.volume, NOTIFICATION_VOLUME_EXISTS['payload']['size'])
def test_volume_delete(self):
v = notifications.Volume()
counters = v.process_notification(NOTIFICATION_VOLUME_EXISTS)
self.assertEqual(len(counters), 1)
c = counters[0]
self.assertEqual(c.volume, 1)
def test_volume_size_delete(self):
v = notifications.VolumeSize()
counters = v.process_notification(NOTIFICATION_VOLUME_EXISTS)
self.assertEqual(len(counters), 1)
c = counters[0]
self.assertEqual(c.volume, NOTIFICATION_VOLUME_EXISTS['payload']['size'])