Publish and receive metering messages

bug 1004198
bug 1004200

- Set up the collector to receive the metering messages.
- Make the collector republish notifications as metering data.
- Add a "monitor" mode to tools/notificationclient.py to simply print
  the events without writing them to a file.
- Add a --topic flag to tools/notificationclient.py so it can be made
  to listen events other than notifications (for monitoring metering
  events).
- Change "counter_datetime" within the metering message to "timestamp"
  to be consistent with the notification message format.
- Add a configuration option to control the secret value for signing
  metering messages.
- Make the collector and agent daemon control topics more specific.
- Use the config setting to set the metering topic subscription.
- Set a short interval for polling to get more data for development
  testing.
- Log after successful load of pollsters instead of before attempt.

Change-Id: Iedfe26f8a4fa80d88cd0a76e5738001ba5689bdc
This commit is contained in:
Doug Hellmann 2012-05-25 14:22:09 -04:00
parent dbccf0ce69
commit b76f67d11f
14 changed files with 198 additions and 38 deletions

View File

@ -33,7 +33,16 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.Service.create(binary='ceilometer-agent',
topic='ceilometer',
manager='ceilometer.agent.manager.AgentManager')
topic='ceilometer.agent',
manager='ceilometer.agent.manager.AgentManager',
# FIXME(dhellmann): The
# periodic_interval is set very
# short for development. After we
# fix the configuration loading we
# can use the config file to
# adjust it and remove this
# hard-coded value.
periodic_interval=10,
)
service.serve(server)
service.wait()

View File

@ -33,7 +33,7 @@ if __name__ == '__main__':
logging.setup()
utils.monkey_patch()
server = service.Service.create(binary='ceilometer-collector',
topic='ceilometer',
topic='ceilometer.collector',
manager='ceilometer.collector.manager.CollectorManager')
service.serve(server)
service.wait()

View File

@ -18,11 +18,15 @@
import pkg_resources
from nova import flags
from nova import log as logging
from nova import manager
from nova import flags
from nova import rpc
from ceilometer import meter
FLAGS = flags.FLAGS
# FIXME(dhellmann): We need to have the main program set up logging
# correctly so messages from modules outside of the nova package
# appear in the output.
@ -40,8 +44,6 @@ class AgentManager(manager.Manager):
def _load_plugins(self):
self.pollsters = []
for ep in pkg_resources.iter_entry_points(COMPUTE_PLUGIN_NAMESPACE):
LOG.info('attempting to load pollster %s:%s',
COMPUTE_PLUGIN_NAMESPACE, ep.name)
try:
plugin_class = ep.load()
plugin = plugin_class()
@ -51,6 +53,8 @@ class AgentManager(manager.Manager):
# configuration flag and check that asks the plugin if
# it should be enabled.
self.pollsters.append((ep.name, plugin))
LOG.info('loaded pollster %s:%s',
COMPUTE_PLUGIN_NAMESPACE, ep.name)
except Exception as err:
LOG.warning('Failed to load pollster %s:%s',
ep.name, err)
@ -67,8 +71,16 @@ class AgentManager(manager.Manager):
LOG.info('polling %s', name)
for c in pollster.get_counters(self, context):
LOG.info('COUNTER: %s', c)
# FIXME(dhellmann): Convert to meter data and
# publish.
msg = {
'method': 'record_metering_data',
'version': '1.0',
'args': {'data': meter.meter_message_from_counter(c),
},
}
rpc.cast(context, FLAGS.metering_topic, msg)
rpc.cast(context,
FLAGS.metering_topic + '.' + c.type,
msg)
except Exception as err:
LOG.warning('Continuing after error from %s: %s', name, err)
LOG.exception(err)

View File

@ -16,9 +16,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from nova import context
from nova import flags
from nova import log as logging
from nova import manager
from nova import rpc as nova_rpc
from nova.rpc import dispatcher as rpc_dispatcher
from ceilometer import rpc
from ceilometer import meter
@ -39,15 +42,30 @@ COMPUTE_COLLECTOR_NAMESPACE = 'ceilometer.collector.compute'
class CollectorManager(manager.Manager):
def init_host(self):
self.connection = rpc.Connection(flags.FLAGS)
self.compute_handler = dispatcher.NotificationDispatcher(
COMPUTE_COLLECTOR_NAMESPACE,
self._publish_counter,
)
# FIXME(dhellmann): Should be using create_worker(), except
# that notification messages do not conform to the RPC
# invocation protocol (they do not include a "method"
# parameter).
self.connection.declare_topic_consumer(
topic='%s.info' % flags.FLAGS.notification_topics[0],
callback=self.compute_handler.notify)
# Set ourselves up as a separate worker for the metering data,
# since the default for manager is to use create_consumer().
self.connection.create_worker(
flags.FLAGS.metering_topic,
rpc_dispatcher.RpcDispatcher([self]),
'ceilometer.collector.' + flags.FLAGS.metering_topic,
)
self.connection.consume_in_thread()
def _publish_counter(self, counter):
@ -56,3 +74,24 @@ class CollectorManager(manager.Manager):
LOG.info('PUBLISH: %s', str(msg))
# FIXME(dhellmann): Need to publish the message on the
# metering queue.
msg = {
'method': 'record_metering_data',
'version': '1.0',
'args': {'data': msg,
},
}
ctxt = context.get_admin_context()
nova_rpc.cast(ctxt, FLAGS.metering_topic, msg)
nova_rpc.cast(ctxt,
FLAGS.metering_topic + '.' + counter.type,
msg)
def record_metering_data(self, context, data):
"""This method is triggered when metering data is
cast from an agent.
"""
#LOG.info('metering data: %r', data)
LOG.info('metering data %s for %s: %s',
data['event_type'],
data['resource_id'],
data['counter_volume'])

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from lxml import etree
from nova import log as logging
@ -43,7 +45,7 @@ def make_counter_from_instance(instance, type, volume):
user_id=instance.user_id,
project_id=instance.project_id,
resource_id=instance.uuid,
datetime=None,
timestamp=datetime.datetime.utcnow().isoformat(),
duration=None,
resource_metadata={
'display_name': instance.display_name,

View File

@ -31,7 +31,7 @@ def c1(body):
user_id=body['payload']['user_id'],
project_id=body['payload']['tenant_id'],
resource_id=body['payload']['instance_id'],
datetime=body['timestamp'],
timestamp=body['timestamp'],
duration=0,
# FIXME(dhellmann): Add region and other
# details to metadata

View File

@ -32,7 +32,7 @@ Counter = collections.namedtuple('Counter',
'user_id',
'project_id',
'resource_id',
'datetime',
'timestamp',
'duration',
'resource_metadata',
])

View File

@ -18,19 +18,31 @@
"""Compute the signature of a metering message.
"""
import hmac
import hashlib
import hmac
import uuid
from nova import flags
from nova.openstack.common import cfg
# FIXME(dhellmann): Need to move this secret out of the code. Where?
SECRET = 'secrete'
METER_OPTS = [
cfg.StrOpt('metering_secret',
default='change this or be hacked',
help='Secret value for signing metering messages',
),
cfg.StrOpt('metering_topic',
default='metering',
help='the topic ceilometer uses for metering messages',
),
]
flags.FLAGS.register_opts(METER_OPTS)
def compute_signature(message):
"""Return the signature for a message dictionary.
"""
digest_maker = hmac.new(SECRET, '', hashlib.sha256)
digest_maker = hmac.new(flags.FLAGS.metering_secret, '', hashlib.sha256)
for name, value in sorted(message.iteritems()):
if name == 'message_signature':
# Skip any existing signature value, which would not have
@ -53,10 +65,12 @@ def meter_message_from_counter(counter):
'user_id': counter.user_id,
'project_id': counter.project_id,
'resource_id': counter.resource_id,
'counter_datetime': counter.datetime,
'timestamp': counter.timestamp,
'counter_duration': counter.duration,
'resource_metadata': counter.resource_metadata,
'message_id': str(uuid.uuid1()),
# This field is used by the notification system.
'event_type': '%s.%s' % (flags.FLAGS.metering_topic, counter.type),
}
msg['message_signature'] = compute_signature(msg)
return msg

View File

@ -0,0 +1 @@
from nova.tests import *

View File

@ -18,7 +18,14 @@
"""Tests for ceilometer/agent/manager.py
"""
import datetime
from nova import context
from nova import rpc
from nova import test
from ceilometer.agent import manager
from ceilometer import counter
def test_load_plugins():
@ -28,15 +35,45 @@ def test_load_plugins():
return
def test_run_tasks():
class TestRunTasks(test.TestCase):
class Pollster:
counters = []
test_data = counter.Counter(
source='test',
type='test',
volume=1,
user_id='test',
project_id='test',
resource_id='test_run_tasks',
timestamp=datetime.datetime.utcnow().isoformat(),
duration=0,
resource_metadata={'name': 'Pollster',
},
)
def get_counters(self, manager, context):
self.counters.append((manager, context))
return ['test data']
return [self.test_data]
mgr = manager.AgentManager()
mgr.pollsters = [('test', Pollster())]
mgr.periodic_tasks('context')
assert Pollster.counters[0] == (mgr, 'context')
def faux_notify(self, context, topic, msg):
self.notifications.append((topic, msg))
def setUp(self):
super(TestRunTasks, self).setUp()
self.notifications = []
self.stubs.Set(rpc, 'cast', self.faux_notify)
self.mgr = manager.AgentManager()
self.mgr.pollsters = [('test', self.Pollster())]
self.ctx = context.RequestContext("user", "project")
self.mgr.periodic_tasks(self.ctx)
def test_message(self):
assert self.Pollster.counters[0][1] is self.ctx
def test_notify(self):
assert len(self.notifications) == 2
def test_notify_topics(self):
topics = [n[0] for n in self.notifications]
assert topics == ['metering', 'metering.test']

View File

@ -70,7 +70,7 @@ def test_c1():
for name, actual, expected in [
('counter_type', info.type, 'instance'),
('counter_volume', info.volume, 1),
('counter_datetime', info.datetime,
('timestamp', info.timestamp,
INSTANCE_CREATE_END['timestamp']),
('resource_id', info.resource_id,
INSTANCE_CREATE_END['payload']['instance_id']),

View File

@ -21,6 +21,8 @@
from ceilometer import counter
from ceilometer import meter
from nova import flags
def test_compute_signature_change_key():
sig1 = meter.compute_signature({'a': 'A', 'b': 'B'})
@ -48,13 +50,25 @@ def test_compute_signature_signed():
assert sig1 == sig2
def test_compute_signature_use_configured_secret():
data = {'a': 'A', 'b': 'B'}
sig1 = meter.compute_signature(data)
old_secret = flags.FLAGS.metering_secret
try:
flags.FLAGS.metering_secret = 'not the default value'
sig2 = meter.compute_signature(data)
finally:
flags.FLAGS.metering_secret = old_secret
assert sig1 != sig2
TEST_COUNTER = counter.Counter(source='src',
type='typ',
volume=1,
user_id='user',
project_id='project',
resource_id=2,
datetime='today',
timestamp='today',
duration=3,
resource_metadata={'key': 'value'},
)
@ -102,13 +116,17 @@ def test_meter_message_from_counter_signed():
assert 'message_signature' in msg
def test_meter_message_from_counter_event_type():
msg = meter.meter_message_from_counter(TEST_COUNTER)
assert msg['event_type'] == 'metering.' + TEST_COUNTER.type
def test_meter_message_from_counter_field():
def compare(f, c, msg_f, msg):
assert msg == c
msg = meter.meter_message_from_counter(TEST_COUNTER)
name_map = {'type': 'counter_type',
'volume': 'counter_volume',
'datetime': 'counter_datetime',
'duration': 'counter_duration',
}
for f in TEST_COUNTER._fields:

View File

@ -24,7 +24,7 @@ def test_send_messages():
conn = mox.MockObject(impl_kombu.Connection)
conn.topic_send('notifications.info', message)
mox.Replay(conn)
notificationclient.send_messages(conn, input)
notificationclient.send_messages(conn, 'notifications.info', input)
mox.Verify(conn)
return
@ -35,6 +35,6 @@ def test_record_messages():
mox.IsA(types.FunctionType))
conn.consume()
mox.Replay(conn)
notificationclient.record_messages(conn, StringIO())
notificationclient.record_messages(conn, 'notifications.info', StringIO())
mox.Verify(conn)
return

View File

@ -34,10 +34,8 @@ from nova.openstack.common import cfg
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
NOTIFICATION_TOPIC = 'notifications.info'
def record_messages(connection, output):
def record_messages(connection, topic, output):
"""Listen to notification.info messages and pickle them to output."""
def process_event(body):
print ('%s: %s' %
@ -46,16 +44,38 @@ def record_messages(connection, output):
))
pickle.dump(body, output)
connection.declare_topic_consumer(NOTIFICATION_TOPIC, process_event)
connection.declare_topic_consumer(topic, process_event)
try:
connection.consume()
# for i in connection.iterconsume(5):
# print 'iteration', i
except KeyboardInterrupt:
pass
def send_messages(connection, input):
def monitor_messages(connection, topic):
"""Listen to notification.info messages and print them."""
def process_event(msg):
body = msg['args']['data']
if 'resource_id' in body:
print ('%s: %s/%-15s: %s' %
(body.get('timestamp'),
body.get('resource_id'),
body.get('event_type'),
body.get('counter_volume'),
))
else:
print ('%s: %s' %
(body.get('timestamp'),
body.get('event_type'),
))
connection.declare_topic_consumer(topic, process_event)
try:
connection.consume()
except KeyboardInterrupt:
pass
def send_messages(connection, topic, input):
"""Read messages from the input and send them to the AMQP queue."""
while True:
try:
@ -66,7 +86,7 @@ def send_messages(connection, input):
(body.get('timestamp'),
body.get('event_type', 'unknown event'),
))
connection.topic_send(NOTIFICATION_TOPIC, body)
connection.topic_send(topic, body)
def main():
@ -91,12 +111,18 @@ def main():
description='record or play back notification events',
)
parser.add_argument('mode',
choices=('record', 'replay'),
choices=('record', 'replay', 'monitor'),
help='operating mode',
)
parser.add_argument('data_file',
default='msgs.dat',
nargs='?',
help='the data file to read or write',
)
parser.add_argument('--topic',
default='notifications.info',
help='the exchange topic to listen for',
)
args = parser.parse_args(remaining_args[1:])
console = logging.StreamHandler(sys.stderr)
@ -111,10 +137,12 @@ def main():
try:
if args.mode == 'replay':
with open(args.data_file, 'rb') as input:
send_messages(connection, input)
send_messages(connection, args.topic, input)
elif args.mode == 'record':
with open(args.data_file, 'wb') as output:
record_messages(connection, output)
record_messages(connection, args.topic, output)
elif args.mode == 'monitor':
monitor_messages(connection, args.topic)
finally:
connection.close()