From 8f4ba1656c26ac7885c46f2fc8fcf9f519146f96 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Tue, 15 May 2012 13:39:56 -0400 Subject: [PATCH] add a tool for recording notifications and replaying them Change-Id: I852a6fbef7b9bf02309f699419da0a2537ce7a90 --- .gitignore | 5 +- tests/test_tools_notificationclient.py | 39 ++++++++ tools/notificationclient.py | 124 +++++++++++++++++++++++++ 3 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 tests/test_tools_notificationclient.py create mode 100755 tools/notificationclient.py diff --git a/.gitignore b/.gitignore index 7e99e367..8027c175 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ -*.pyc \ No newline at end of file +*.pyc +*.dat +TAGS +*.egg-info diff --git a/tests/test_tools_notificationclient.py b/tests/test_tools_notificationclient.py new file mode 100644 index 00000000..eb0ec649 --- /dev/null +++ b/tests/test_tools_notificationclient.py @@ -0,0 +1,39 @@ +import os +import cPickle as pickle +from StringIO import StringIO +import sys +import types + +import mox + +from nova.rpc import impl_kombu + +# The module being tested is part of the tools directory, +# so make sure it is in our import path. +sys.path.insert(0, os.path.normpath(os.path.join(os.path.dirname(__file__), + '..', 'tools'))) +import notificationclient + + +def test_send_messages(): + message = {'timestamp': 'date goes here', + 'event_type': 'compute.instance.exists', + # real messages have more fields... + } + input = StringIO(pickle.dumps(message)) + conn = mox.MockObject(impl_kombu.Connection) + conn.topic_send('notifications.info', message) + mox.Replay(conn) + notificationclient.send_messages(conn, input) + mox.Verify(conn) + return + + +def test_record_messages(): + conn = mox.MockObject(impl_kombu.Connection) + conn.declare_topic_consumer('notifications.info', mox.IsA(types.FunctionType)) + conn.consume() + mox.Replay(conn) + notificationclient.record_messages(conn, StringIO()) + mox.Verify(conn) + return diff --git a/tools/notificationclient.py b/tools/notificationclient.py new file mode 100755 index 00000000..21a01a8a --- /dev/null +++ b/tools/notificationclient.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 New Dream Network, LLC (DreamHost) +# +# Author: Doug Hellmann +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Command line tool for recording notification messages and replaying +them later. +""" + +import argparse +import logging +import cPickle as pickle +import sys + +from nova import flags +from nova import rpc +from nova import utils +from nova.openstack.common import cfg + +FLAGS = flags.FLAGS +LOG = logging.getLogger(__name__) + +NOTIFICATION_TOPIC = 'notifications.info' + + +def record_messages(connection, output): + """Listen to notification.info messages and pickle them to output.""" + def process_event(body): + print ('%s: %s' % + (body.get('timestamp'), + body.get('event_type', 'unknown event'), + )) + pickle.dump(body, output) + + connection.declare_topic_consumer(NOTIFICATION_TOPIC, process_event) + try: + connection.consume() + # for i in connection.iterconsume(5): + # print 'iteration', i + except KeyboardInterrupt: + pass + + +def send_messages(connection, input): + """Read messages from the input and send them to the AMQP queue.""" + while True: + try: + body = pickle.load(input) + except EOFError: + break + print('%s: %s' % + (body.get('timestamp'), + body.get('event_type', 'unknown event'), + )) + connection.topic_send(NOTIFICATION_TOPIC, body) + + +def main(): + rpc.register_opts(FLAGS) + FLAGS.register_opts([ + cfg.StrOpt('datafile', + default=None, + help='Data file to read or write', + ), + cfg.BoolOpt('record', + help='Record events', + ), + cfg.BoolOpt('replay', + help='Replay events', + ), + ]) + + remaining_args = FLAGS(sys.argv) + utils.monkey_patch() + + parser = argparse.ArgumentParser( + description='record or play back notification events', + ) + parser.add_argument('mode', + choices=('record', 'replay'), + help='operating mode', + ) + parser.add_argument('data_file', + help='the data file to read or write', + ) + args = parser.parse_args(remaining_args[1:]) + + console = logging.StreamHandler(sys.stderr) + console.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(message)s') + console.setFormatter(formatter) + root_logger = logging.getLogger('') + root_logger.addHandler(console) + root_logger.setLevel(logging.DEBUG) + + connection = rpc.create_connection() + try: + if args.mode == 'replay': + with open(args.data_file, 'rb') as input: + send_messages(connection, input) + elif args.mode == 'record': + with open(args.data_file, 'wb') as output: + record_messages(connection, output) + finally: + connection.close() + + return 0 + +if __name__ == '__main__': + main()