diff --git a/ceilometer/agent.py b/ceilometer/agent.py index 63d70367f5..3beddeb0f3 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -16,9 +16,10 @@ # License for the specific language governing permissions and limitations # under the License. -from ceilometer import publish +from stevedore import dispatch from ceilometer.openstack.common import cfg from ceilometer.openstack.common import log +from ceilometer import pipeline LOG = log.getLogger(__name__) @@ -26,7 +27,15 @@ LOG = log.getLogger(__name__) class AgentManager(object): def __init__(self, extension_manager): - self.ext_manager = extension_manager + publisher_manager = dispatch.NameDispatchExtensionManager( + namespace=pipeline.PUBLISHER_NAMESPACE, + check_func=lambda x: True, + invoke_on_load=True, + ) + + self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) + + self.pollster_manager = extension_manager def publish_counters_from_one_pollster(self, ext, manager, context, *args, **kwargs): @@ -36,11 +45,10 @@ class AgentManager(object): LOG.info('Polling %s', ext.name) for c in ext.obj.get_counters(manager, *args, **kwargs): LOG.debug('Publishing counter: %s', c) - publish.publish_counter(context, c, - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - cfg.CONF.counter_source, - ) + manager.pipeline_manager.publish_counter( + context, c, + cfg.CONF.counter_source) + except Exception as err: LOG.warning('Continuing after error from %s: %s', ext.name, err) diff --git a/ceilometer/central/manager.py b/ceilometer/central/manager.py index a0a17dcf87..eb97b6894d 100644 --- a/ceilometer/central/manager.py +++ b/ceilometer/central/manager.py @@ -50,7 +50,7 @@ class AgentManager(agent.AgentManager): tenant_name=cfg.CONF.os_tenant_name, auth_url=cfg.CONF.os_auth_url) - self.ext_manager.map(self.publish_counters_from_one_pollster, - manager=self, - context=context, - ) + self.pollster_manager.map(self.publish_counters_from_one_pollster, + manager=self, + context=context, + ) diff --git a/ceilometer/collector/service.py b/ceilometer/collector/service.py index 4584086139..3abb1c276b 100644 --- a/ceilometer/collector/service.py +++ b/ceilometer/collector/service.py @@ -16,11 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. -from stevedore import extension +from stevedore import dispatch from ceilometer.collector import meter from ceilometer import extension_manager -from ceilometer import publish +from ceilometer import pipeline from ceilometer import service from ceilometer import storage from ceilometer.openstack.common import context @@ -56,16 +56,23 @@ class CollectorService(service.PeriodicService): def initialize_service_hook(self, service): '''Consumers must be declared before consume_thread start.''' - self.ext_manager = extension_manager.ActivatedExtensionManager( - namespace=self.COLLECTOR_NAMESPACE, - disabled_names=cfg.CONF.disabled_notification_listeners, + publisher_manager = dispatch.NameDispatchExtensionManager( + namespace=pipeline.PUBLISHER_NAMESPACE, + check_func=lambda x: True, + invoke_on_load=True, ) + self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) - if not list(self.ext_manager): + self.notification_manager = \ + extension_manager.ActivatedExtensionManager( + namespace=self.COLLECTOR_NAMESPACE, + disabled_names=cfg.CONF.disabled_notification_listeners, + ) + + if not list(self.notification_manager): LOG.warning('Failed to load any notification handlers for %s', self.COLLECTOR_NAMESPACE) - - self.ext_manager.map(self._setup_subscription) + self.notification_manager.map(self._setup_subscription) # Set ourselves up as a separate worker for the metering data, # since the default for service is to use create_consumer(). @@ -95,9 +102,9 @@ class CollectorService(service.PeriodicService): def process_notification(self, notification): """Make a notification processed by an handler.""" LOG.debug('notification %r', notification.get('event_type')) - self.ext_manager.map(self._process_notification_for_ext, - notification=notification, - ) + self.notification_manager.map(self._process_notification_for_ext, + notification=notification, + ) def _process_notification_for_ext(self, ext, notification): handler = ext.obj @@ -107,14 +114,11 @@ class CollectorService(service.PeriodicService): # FIXME(dhellmann): Spawn green thread? self.publish_counter(c) - @staticmethod - def publish_counter(counter): + def publish_counter(self, counter): """Create a metering message for the counter and publish it.""" ctxt = context.get_admin_context() - publish.publish_counter(ctxt, counter, - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - cfg.CONF.counter_source) + self.pipeline_manager.publish_counter(ctxt, counter, + cfg.CONF.counter_source) def record_metering_data(self, context, data): """This method is triggered when metering data is diff --git a/ceilometer/compute/manager.py b/ceilometer/compute/manager.py index 620897fa68..f2de36aec9 100644 --- a/ceilometer/compute/manager.py +++ b/ceilometer/compute/manager.py @@ -66,10 +66,10 @@ class AgentManager(agent.AgentManager): def poll_instance(self, context, instance): """Poll one instance.""" - self.ext_manager.map(self.publish_counters_from_one_pollster, - manager=self, - context=context, - instance=instance) + self.pollster_manager.map(self.publish_counters_from_one_pollster, + manager=self, + context=context, + instance=instance) def periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index 4d044a9913..c0eea3c5dd 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -20,11 +20,13 @@ from __future__ import absolute_import -from ceilometer import publish +from stevedore import dispatch + from ceilometer import counter from ceilometer.openstack.common import cfg from ceilometer.openstack.common import context from ceilometer.openstack.common import timeutils +from ceilometer import pipeline from swift.common.utils import split_path @@ -50,6 +52,13 @@ class CeilometerMiddleware(object): def __init__(self, app, conf): self.app = app cfg.CONF([], project='ceilometer') + publisher_manager = dispatch.NameDispatchExtensionManager( + namespace=pipeline.PUBLISHER_NAMESPACE, + check_func=lambda x: True, + invoke_on_load=True, + ) + + self.pipeline_manager = pipeline.setup_pipeline(publisher_manager) def __call__(self, env, start_response): start_response_args = [None] @@ -81,56 +90,50 @@ class CeilometerMiddleware(object): else: return iter_response(iterable) - @staticmethod - def publish_counter(env, bytes_received, bytes_sent): + def publish_counter(self, env, bytes_received, bytes_sent): req = Request(env) version, account, container, obj = split_path(req.path, 1, 4, True) now = timeutils.utcnow().isoformat() if bytes_received: - publish.publish_counter(context.get_admin_context(), - counter.Counter( - name='storage.objects.incoming.bytes', - type='delta', - unit='B', - volume=bytes_received, - user_id=env.get('HTTP_X_USER_ID'), - project_id=env.get('HTTP_X_TENANT_ID'), - resource_id=account.partition( - 'AUTH_')[2], - timestamp=now, - resource_metadata={ - "path": req.path, - "version": version, - "container": container, - "object": obj, - }, - ), - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - cfg.CONF.counter_source) + self.pipeline_manager.publish_counter( + context.get_admin_context(), + counter.Counter( + name='storage.objects.incoming.bytes', + type='delta', + unit='B', + volume=bytes_received, + user_id=env.get('HTTP_X_USER_ID'), + project_id=env.get('HTTP_X_TENANT_ID'), + resource_id=account.partition('AUTH_')[2], + timestamp=now, + resource_metadata={ + "path": req.path, + "version": version, + "container": container, + "object": obj, + }, ), + cfg.CONF.counter_source) if bytes_sent: - publish.publish_counter(context.get_admin_context(), - counter.Counter( - name='storage.objects.outgoing.bytes', - type='delta', - unit='B', - volume=bytes_sent, - user_id=env.get('HTTP_X_USER_ID'), - project_id=env.get('HTTP_X_TENANT_ID'), - resource_id=account.partition( - 'AUTH_')[2], - timestamp=now, - resource_metadata={ - "path": req.path, - "version": version, - "container": container, - "object": obj, - }), - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - cfg.CONF.counter_source) + self.pipeline_manager.publish_counter( + context.get_admin_context(), + counter.Counter( + name='storage.objects.outgoing.bytes', + type='delta', + unit='B', + volume=bytes_sent, + user_id=env.get('HTTP_X_USER_ID'), + project_id=env.get('HTTP_X_TENANT_ID'), + resource_id=account.partition('AUTH_')[2], + timestamp=now, + resource_metadata={ + "path": req.path, + "version": version, + "container": container, + "object": obj, + }), + cfg.CONF.counter_source) def filter_factory(global_conf, **local_conf): diff --git a/ceilometer/publisher/__init__.py b/ceilometer/publisher/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/ceilometer/publish.py b/ceilometer/publisher/meter_publish.py similarity index 61% rename from ceilometer/publish.py rename to ceilometer/publisher/meter_publish.py index 2d52fa2805..182c47303a 100644 --- a/ceilometer/publish.py +++ b/ceilometer/publisher/meter_publish.py @@ -22,6 +22,7 @@ from ceilometer.collector import meter from ceilometer.openstack.common import cfg from ceilometer.openstack.common import log from ceilometer.openstack.common import rpc +from ceilometer import plugin LOG = log.getLogger(__name__) @@ -42,21 +43,24 @@ def register_opts(config): register_opts(cfg.CONF) -def publish_counter(context, counter, topic, secret, source): - """Send a metering message for the data represented by the counter. +class MeterPublisher(plugin.PublisherBase): + def publish_counter(self, context, counter, source): + """Send a metering message for publishing - :param context: Execution context from the service or RPC call - :param counter: ceilometer.counter.Counter instance - :param source: counter source - """ - msg = { - 'method': 'record_metering_data', - 'version': '1.0', - 'args': {'data': meter.meter_message_from_counter(counter, - secret, - source), - }, - } - LOG.debug('PUBLISH: %s', str(msg)) - rpc.cast(context, topic, msg) - rpc.cast(context, topic + '.' + counter.name, msg) + :param context: Execution context from the service or RPC call + :param counter: Counter from pipeline after transformation + :param source: counter source + """ + topic = cfg.CONF.metering_topic + msg = { + 'method': 'record_metering_data', + 'version': '1.0', + 'args': {'data': meter.meter_message_from_counter( + counter, + cfg.CONF.metering_secret, + source), + }, + } + LOG.debug('PUBLISH: %s', str(msg)) + rpc.cast(context, topic, msg) + rpc.cast(context, topic + '.' + counter.name, msg) diff --git a/etc/ceilometer/pipeline.yaml b/etc/ceilometer/pipeline.yaml new file mode 100644 index 0000000000..96e0f25b06 --- /dev/null +++ b/etc/ceilometer/pipeline.yaml @@ -0,0 +1,9 @@ +--- +- + name: meter_pipeline + interval: 60 + counters: + - "*" + transformers: + publishers: + - meter_publisher diff --git a/setup.py b/setup.py index 5d5134a6c0..254efc35d9 100755 --- a/setup.py +++ b/setup.py @@ -130,6 +130,11 @@ setuptools.setup( [ceilometer.compute.virt] libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector + [ceilometer.transformer] + + [ceilometer.publisher] + meter_publisher = ceilometer.publisher.meter_publish:MeterPublisher + [paste.filter_factory] swift=ceilometer.objectstore.swift_middleware:filter_factory """), diff --git a/tests/central/test_manager.py b/tests/central/test_manager.py index a8b7d0acad..c8f0591803 100644 --- a/tests/central/test_manager.py +++ b/tests/central/test_manager.py @@ -19,20 +19,22 @@ """ import datetime +import mock from stevedore import extension from ceilometer.central import manager from ceilometer import counter -from ceilometer import publish +from ceilometer import pipeline from ceilometer.tests import base from ceilometer.openstack.common import cfg from keystoneclient.v2_0 import client as ksclient +@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_load_plugins(): mgr = manager.AgentManager() - assert list(mgr.ext_manager), 'Failed to load any plugins' + assert list(mgr.pollster_manager), 'Failed to load any plugins' return @@ -56,24 +58,22 @@ class TestRunTasks(base.TestCase): self.counters.append((manager, self.test_data)) return [self.test_data] - def faux_notify(self, context, msg, topic, secret, source): - self.notifications.append((msg, topic, secret, source)) - + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestRunTasks, self).setUp() - self.notifications = [] - self.stubs.Set(publish, 'publish_counter', self.faux_notify) self.stubs.Set(ksclient, 'Client', lambda *args, **kwargs: None) self.mgr = manager.AgentManager() - self.mgr.ext_manager = extension.ExtensionManager('fake', - invoke_on_load=False, - ) - self.mgr.ext_manager.extensions = [extension.Extension('test', - None, - None, - self.Pollster(), - ), - ] + + self.mgr.pollster_manager = extension.ExtensionManager( + 'fake', + invoke_on_load=False, + ) + self.mgr.pollster_manager.extensions = [ + extension.Extension('test', + None, + None, + self.Pollster(), ), + ] # Invoke the periodic tasks to call the pollsters. self.mgr.periodic_tasks(None) @@ -87,8 +87,7 @@ class TestRunTasks(base.TestCase): self.Pollster.test_data) def test_notifications(self): - actual = self.notifications - self.assertEqual(list(actual[0]), [self.Pollster.test_data, - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - cfg.CONF.counter_source]) + self.assertTrue(self.mgr.pipeline_manager.publish_counter.called) + args, _ = self.mgr.pipeline_manager.publish_counter.call_args + self.assertEqual(args[1], self.Pollster.test_data) + self.assertEqual(args[2], cfg.CONF.counter_source) diff --git a/tests/collector/test_manager.py b/tests/collector/test_manager.py index 3fab2c2894..9d5ddda6e9 100644 --- a/tests/collector/test_manager.py +++ b/tests/collector/test_manager.py @@ -21,6 +21,7 @@ from datetime import datetime from mock import patch +from mock import MagicMock from stevedore import extension from stevedore.tests import manager as test_manager @@ -28,6 +29,7 @@ from stevedore.tests import manager as test_manager from ceilometer.collector import meter from ceilometer.collector import service from ceilometer.openstack.common import cfg +from ceilometer import pipeline from ceilometer.storage import base from ceilometer.tests import base as tests_base from ceilometer.compute import notifications @@ -91,6 +93,7 @@ class TestCollectorService(tests_base.TestCase): self.ctx = None #cfg.CONF.metering_secret = 'not-so-secret' + @patch('ceilometer.pipeline.setup_pipeline', MagicMock()) def test_init_host(self): cfg.CONF.database_connection = 'log://localhost' # If we try to create a real RPC connection, init_host() never @@ -181,15 +184,14 @@ class TestCollectorService(tests_base.TestCase): self.srv.record_metering_data(self.ctx, msg) self.mox.VerifyAll() + @patch('ceilometer.pipeline.setup_pipeline', MagicMock()) def test_process_notification(self): # If we try to create a real RPC connection, init_host() never # returns. Mock it out so we can establish the manager # configuration. with patch('ceilometer.openstack.common.rpc.create_connection'): self.srv.start() - results = [] - self.stubs.Set(self.srv, 'publish_counter', results.append) - self.srv.ext_manager = test_manager.TestExtensionManager( + self.srv.notification_manager = test_manager.TestExtensionManager( [extension.Extension('test', None, None, @@ -197,4 +199,4 @@ class TestCollectorService(tests_base.TestCase): ), ]) self.srv.process_notification(TEST_NOTICE) - self.assert_(len(results) >= 1) + assert self.srv.pipeline_manager.publish_counter.called diff --git a/tests/compute/test_instance.py b/tests/compute/test_instance.py index d8e4b8027e..09cc0afa5c 100644 --- a/tests/compute/test_instance.py +++ b/tests/compute/test_instance.py @@ -44,6 +44,7 @@ class FauxInstance(object): class TestLocationMetadata(unittest.TestCase): + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): self.manager = manager.AgentManager() super(TestLocationMetadata, self).setUp() diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index 84884e636e..86118f21e3 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -19,21 +19,22 @@ """ import datetime - +import mock from stevedore import extension from ceilometer import nova_client from ceilometer.compute import manager from ceilometer import counter -from ceilometer import publish +from ceilometer import pipeline from ceilometer.tests import base from ceilometer.openstack.common import cfg +@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_load_plugins(): mgr = manager.AgentManager() - assert list(mgr.ext_manager), 'Failed to load any plugins' + assert list(mgr.pollster_manager), 'Failed to load any plugins' return @@ -57,23 +58,21 @@ class TestRunTasks(base.TestCase): self.counters.append((manager, instance)) return [self.test_data] - def faux_notify(self, context, msg, topic, secret, source): - self.notifications.append((msg, topic, secret, source)) - + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestRunTasks, self).setUp() - self.notifications = [] - self.stubs.Set(publish, 'publish_counter', self.faux_notify) self.mgr = manager.AgentManager() - self.mgr.ext_manager = extension.ExtensionManager('fake', - invoke_on_load=False, - ) - self.mgr.ext_manager.extensions = [extension.Extension('test', - None, - None, - self.Pollster(), - ), - ] + self.mgr.pollster_manager = extension.ExtensionManager( + 'fake', + invoke_on_load=False, + ) + self.mgr.pollster_manager.extensions = [ + extension.Extension('test', + None, + None, + self.Pollster(), ), + ] + # Set up a fake instance value to be returned by # instance_get_all_by_host() so when the manager gets the list # of instances to poll we can control the results. @@ -96,8 +95,7 @@ class TestRunTasks(base.TestCase): self.assertTrue(self.Pollster.counters[0][1] is self.instance) def test_notifications(self): - actual = self.notifications - self.assertEqual(list(actual[0]), [self.Pollster.test_data, - cfg.CONF.metering_topic, - cfg.CONF.metering_secret, - cfg.CONF.counter_source]) + self.assertTrue(self.mgr.pipeline_manager.publish_counter.called) + args, _ = self.mgr.pipeline_manager.publish_counter.call_args + self.assertEqual(args[1], self.Pollster.test_data) + self.assertEqual(args[2], cfg.CONF.counter_source) diff --git a/tests/compute/test_nova_notifier.py b/tests/compute/test_nova_notifier.py index f658a83449..f8e38b8859 100644 --- a/tests/compute/test_nova_notifier.py +++ b/tests/compute/test_nova_notifier.py @@ -44,7 +44,6 @@ except ImportError: notifier_api = None -from ceilometer import publish from ceilometer import counter from ceilometer.tests import base from ceilometer.tests import skip @@ -86,6 +85,7 @@ class TestNovaNotifier(base.TestCase): def fake_db_instance_system_metadata_get(context, uuid): return dict(meta_a=123, meta_b="foobar") + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) @skip.skip_unless(notifier_api, "Notifier API not found") def setUp(self): super(TestNovaNotifier, self).setUp() @@ -133,9 +133,8 @@ class TestNovaNotifier(base.TestCase): lambda context, uuid, kwargs: (self.instance, self.instance)) - self.stubs.Set(publish, 'publish_counter', self.do_nothing) agent_manager = manager.AgentManager() - agent_manager.ext_manager = \ + agent_manager.pollster_manager = \ test_manager.TestExtensionManager([ extension.Extension('test', None, diff --git a/tests/compute/test_pollsters.py b/tests/compute/test_pollsters.py index c637a81b96..3cc114cf1e 100644 --- a/tests/compute/test_pollsters.py +++ b/tests/compute/test_pollsters.py @@ -51,6 +51,7 @@ class TestInstancePollster(TestPollsterBase): def setUp(self): super(TestInstancePollster, self).setUp() + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_get_counters(self): self.mox.ReplayAll() @@ -67,6 +68,7 @@ class TestDiskIOPollster(TestPollsterBase): def setUp(self): super(TestDiskIOPollster, self).setUp() + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_get_counters(self): disks = [ (virt_inspector.Disk(device='vda'), @@ -102,6 +104,7 @@ class TestNetPollster(TestPollsterBase): def setUp(self): super(TestNetPollster, self).setUp() + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_get_counters(self): vnic0 = virt_inspector.Interface( name='vnet0', @@ -157,6 +160,7 @@ class TestCPUPollster(TestPollsterBase): def setUp(self): super(TestCPUPollster, self).setUp() + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_get_counters(self): self.inspector.inspect_cpus(self.instance.name).AndReturn( virt_inspector.CPUStats(time=1 * (10 ** 6), number=2)) diff --git a/tests/energy/test_kwapi.py b/tests/energy/test_kwapi.py index dfbceb4937..13af385bc7 100644 --- a/tests/energy/test_kwapi.py +++ b/tests/energy/test_kwapi.py @@ -15,6 +15,7 @@ # under the License. import datetime +import mock from ceilometer.tests import base from ceilometer.energy import kwapi @@ -60,6 +61,7 @@ class TestKwapiPollster(base.TestCase): probe_dict['id'] = key yield probe_dict + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestKwapiPollster, self).setUp() self.context = context.get_admin_context() diff --git a/tests/image/test_glance.py b/tests/image/test_glance.py index b813091bd2..8500f138aa 100644 --- a/tests/image/test_glance.py +++ b/tests/image/test_glance.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import mock + from ceilometer.tests import base from ceilometer.image import glance from ceilometer.central import manager @@ -96,6 +98,7 @@ class TestImagePollster(base.TestCase): def fake_glance_iter_images(self, ksclient): return iter(IMAGE_LIST) + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestImagePollster, self).setUp() self.context = context.get_admin_context() diff --git a/tests/network/test_floatingip.py b/tests/network/test_floatingip.py index 36e29a9381..9e6e7308ec 100644 --- a/tests/network/test_floatingip.py +++ b/tests/network/test_floatingip.py @@ -28,6 +28,7 @@ from ceilometer.tests import base class TestFloatingIPPollster(base.TestCase): + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestFloatingIPPollster, self).setUp() self.context = context.get_admin_context() diff --git a/tests/objectstore/test_swift.py b/tests/objectstore/test_swift.py index 4282afb61f..57ac34c65f 100644 --- a/tests/objectstore/test_swift.py +++ b/tests/objectstore/test_swift.py @@ -17,6 +17,7 @@ # License for the specific language governing permissions and limitations # under the License. +import mock from ceilometer.central import manager from ceilometer.objectstore import swift from ceilometer.tests import base @@ -45,6 +46,7 @@ class TestSwiftPollster(base.TestCase): for i in ACCOUNTS: yield i + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestSwiftPollster, self).setUp() self.pollster = swift.SwiftPollster() diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index ee0510a1a6..a018e2e28c 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -23,6 +23,7 @@ from webob import Request from ceilometer.tests import base from ceilometer.objectstore import swift_middleware from ceilometer.openstack.common import rpc +from ceilometer import pipeline class FakeApp(object): @@ -41,30 +42,37 @@ class FakeApp(object): class TestSwiftMiddleware(base.TestCase): + class _faux_pipeline_manager(): + def __init__(self): + self.counters = [] + + def publish_counter(self, context, counter, source): + self.counters.append(counter) + + def _faux_setup_pipeline(self, publisher_manager): + return self.pipeline_manager + def setUp(self): super(TestSwiftMiddleware, self).setUp() - self.notifications = [] - self.stubs.Set(rpc, 'cast', self._faux_notify) + self.pipeline_manager = self._faux_pipeline_manager() + self.stubs.Set(pipeline, 'setup_pipeline', self._faux_setup_pipeline) @staticmethod def start_response(*args): pass - def _faux_notify(self, context, topic, msg): - self.notifications.append((topic, msg)) - def test_get(self): app = swift_middleware.CeilometerMiddleware(FakeApp(), {}) req = Request.blank('/1.0/account/container/obj', environ={'REQUEST_METHOD': 'GET'}) resp = app(req.environ, self.start_response) self.assertEqual(list(resp), ["This string is 28 bytes long"]) - self.assertEqual(len(self.notifications), 2) - data = self.notifications[0][1]['args']['data'] - self.assertEqual(data['counter_volume'], 28) - self.assertEqual(data['resource_metadata']['version'], '1.0') - self.assertEqual(data['resource_metadata']['container'], 'container') - self.assertEqual(data['resource_metadata']['object'], 'obj') + self.assertEqual(len(self.pipeline_manager.counters), 1) + data = self.pipeline_manager.counters[0] + self.assertEqual(data.volume, 28) + self.assertEqual(data.resource_metadata['version'], '1.0') + self.assertEqual(data.resource_metadata['container'], 'container') + self.assertEqual(data.resource_metadata['object'], 'obj') def test_put(self): app = swift_middleware.CeilometerMiddleware(FakeApp(body=['']), {}) @@ -73,12 +81,12 @@ class TestSwiftMiddleware(base.TestCase): 'wsgi.input': StringIO.StringIO('some stuff')}) resp = list(app(req.environ, self.start_response)) - self.assertEqual(len(self.notifications), 2) - data = self.notifications[0][1]['args']['data'] - self.assertEqual(data['counter_volume'], 10) - self.assertEqual(data['resource_metadata']['version'], '1.0') - self.assertEqual(data['resource_metadata']['container'], 'container') - self.assertEqual(data['resource_metadata']['object'], 'obj') + self.assertEqual(len(self.pipeline_manager.counters), 1) + data = self.pipeline_manager.counters[0] + self.assertEqual(data.volume, 10) + self.assertEqual(data.resource_metadata['version'], '1.0') + self.assertEqual(data.resource_metadata['container'], 'container') + self.assertEqual(data.resource_metadata['object'], 'obj') def test_post(self): app = swift_middleware.CeilometerMiddleware(FakeApp(body=['']), {}) @@ -87,21 +95,21 @@ class TestSwiftMiddleware(base.TestCase): 'wsgi.input': StringIO.StringIO('some other stuff')}) resp = list(app(req.environ, self.start_response)) - self.assertEqual(len(self.notifications), 2) - data = self.notifications[0][1]['args']['data'] - self.assertEqual(data['counter_volume'], 16) - self.assertEqual(data['resource_metadata']['version'], '1.0') - self.assertEqual(data['resource_metadata']['container'], 'container') - self.assertEqual(data['resource_metadata']['object'], 'obj') + self.assertEqual(len(self.pipeline_manager.counters), 1) + data = self.pipeline_manager.counters[0] + self.assertEqual(data.volume, 16) + self.assertEqual(data.resource_metadata['version'], '1.0') + self.assertEqual(data.resource_metadata['container'], 'container') + self.assertEqual(data.resource_metadata['object'], 'obj') def test_get_container(self): app = swift_middleware.CeilometerMiddleware(FakeApp(), {}) req = Request.blank('/1.0/account/container', environ={'REQUEST_METHOD': 'GET'}) resp = list(app(req.environ, self.start_response)) - self.assertEqual(len(self.notifications), 2) - data = self.notifications[0][1]['args']['data'] - self.assertEqual(data['counter_volume'], 28) - self.assertEqual(data['resource_metadata']['version'], '1.0') - self.assertEqual(data['resource_metadata']['container'], 'container') - self.assertEqual(data['resource_metadata']['object'], None) + self.assertEqual(len(self.pipeline_manager.counters), 1) + data = self.pipeline_manager.counters[0] + self.assertEqual(data.volume, 28) + self.assertEqual(data.resource_metadata['version'], '1.0') + self.assertEqual(data.resource_metadata['container'], 'container') + self.assertEqual(data.resource_metadata['object'], None) diff --git a/tests/publisher/__init__.py b/tests/publisher/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/test_publish.py b/tests/publisher/test_meter_publisher.py similarity index 85% rename from tests/test_publish.py rename to tests/publisher/test_meter_publisher.py index 8de2613154..fe6aa302af 100644 --- a/tests/test_publish.py +++ b/tests/publisher/test_meter_publisher.py @@ -24,7 +24,7 @@ from ceilometer.openstack.common import rpc from ceilometer.tests import base from ceilometer import counter -from ceilometer import publish +from ceilometer.publisher import meter_publish class TestPublish(base.TestCase): @@ -48,12 +48,11 @@ class TestPublish(base.TestCase): super(TestPublish, self).setUp() self.notifications = [] self.stubs.Set(rpc, 'cast', self.faux_notify) - publish.publish_counter(None, - self.test_data, - 'metering', - 'not-so-secret', - 'test', - ) + publisher = meter_publish.MeterPublisher() + publisher.publish_counter(None, + self.test_data, + 'test', + ) def test_notify(self): assert len(self.notifications) == 2