diff --git a/ceilometer/api/app.py b/ceilometer/api/app.py index 5004171f..0b469cce 100644 --- a/ceilometer/api/app.py +++ b/ceilometer/api/app.py @@ -45,7 +45,8 @@ def get_pecan_config(): def setup_app(pecan_config=None, extra_hooks=None): # FIXME: Replace DBHook with a hooks.TransactionHook app_hooks = [hooks.ConfigHook(), - hooks.DBHook()] + hooks.DBHook(), + hooks.PipelineHook()] if extra_hooks: app_hooks.extend(extra_hooks) diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index f5a21373..365c10e5 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -38,9 +38,11 @@ import wsme import wsmeext.pecan as wsme_pecan from wsme import types as wtypes +from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer.openstack.common import timeutils from ceilometer import counter +from ceilometer import pipeline from ceilometer import storage LOG = log.getLogger(__name__) @@ -290,13 +292,26 @@ class Sample(_Base): message_id = wtypes.text "A unique identifier for the sample" - def __init__(self, counter_volume=None, resource_metadata={}, **kwds): + def __init__(self, counter_volume=None, resource_metadata={}, + timestamp=None, **kwds): if counter_volume is not None: counter_volume = float(counter_volume) resource_metadata = _flatten_metadata(resource_metadata) + # this is to make it easier for clients to pass a timestamp in + if timestamp and isinstance(timestamp, basestring): + timestamp = timeutils.parse_isotime(timestamp) + super(Sample, self).__init__(counter_volume=counter_volume, resource_metadata=resource_metadata, - **kwds) + timestamp=timestamp, **kwds) + # Seems the mandatory option doesn't work so do it manually + for m in ('counter_volume', 'counter_unit', + 'counter_name', 'counter_type', 'resource_id'): + if getattr(self, m) in (wsme.Unset, None): + raise wsme.exc.MissingArgument(m) + + if self.resource_metadata in (wtypes.Unset, None): + self.resource_metadata = {} @classmethod def sample(cls): @@ -427,6 +442,68 @@ class MeterController(rest.RestController): for e in pecan.request.storage_conn.get_samples(f) ] + @wsme.validate([Sample]) + @wsme_pecan.wsexpose([Sample], body=[Sample]) + def post(self, body): + """Post a list of new Samples to Ceilometer. + + :param body: a list of samples within the request body. + """ + # Note: + # 1) the above validate decorator seems to do nothing. + # 2) the mandatory options seems to also do nothing. + # 3) the body should already be in a list of Sample's + + def get_consistent_source(): + '''Find a source that can be applied across the sample group + or raise InvalidInput if the sources are inconsistent. + If all are None - use the configured counter_source + If any sample has source set then the others must be the same + or None. + ''' + source = None + for s in samples: + if source and s.source: + if source != s.source: + raise wsme.exc.InvalidInput('source', s.source, + 'can not post Samples %s' % + 'with different sources') + if s.source and not source: + source = s.source + return source or pecan.request.cfg.counter_source + + samples = [Sample(**b) for b in body] + now = timeutils.utcnow() + source = get_consistent_source() + for s in samples: + if self._id != s.counter_name: + raise wsme.exc.InvalidInput('counter_name', s.counter_name, + 'should be %s' % self._id) + if s.timestamp is None or s.timestamp is wsme.Unset: + s.timestamp = now + s.source = '%s:%s' % (s.project_id, source) + + with pipeline.PublishContext( + context.get_admin_context(), + source, + pecan.request.pipeline_manager.pipelines, + ) as publisher: + publisher([counter.Counter( + name=s.counter_name, + type=s.counter_type, + unit=s.counter_unit, + volume=s.counter_volume, + user_id=s.user_id, + project_id=s.project_id, + resource_id=s.resource_id, + timestamp=s.timestamp.isoformat(), + resource_metadata=s.resource_metadata) for s in samples]) + + # TODO(asalkeld) this is not ideal, it would be nice if the publisher + # returned the created sample message with message id (or at least the + # a list of message_ids). + return samples + @wsme_pecan.wsexpose([Statistics], [Query], int) def statistics(self, q=[], period=None): """Computes the statistics of the samples in the time range given. diff --git a/ceilometer/api/hooks.py b/ceilometer/api/hooks.py index 45603116..7455e5c9 100644 --- a/ceilometer/api/hooks.py +++ b/ceilometer/api/hooks.py @@ -3,6 +3,7 @@ # Copyright © 2012 New Dream Network, LLC (DreamHost) # # Author: Doug Hellmann +# Angus Salkeld # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -19,7 +20,10 @@ from oslo.config import cfg from pecan import hooks +from ceilometer import pipeline +from ceilometer import publisher from ceilometer import storage +from ceilometer import transformer class ConfigHook(hooks.PecanHook): @@ -42,3 +46,24 @@ class DBHook(hooks.PecanHook): # def after(self, state): # print 'method:', state.request.method # print 'response:', state.response.status + + +class PipelineHook(hooks.PecanHook): + '''Create and attach a pipeline to the request so that + new samples can be posted via the /v2/meters/ API. + ''' + + pipeline_manager = None + + def __init__(self): + if self.__class__.pipeline_manager is None: + # this is done here as the cfg options are not available + # when the file is imported. + self.__class__.pipeline_manager = pipeline.setup_pipeline( + transformer.TransformerExtensionManager( + 'ceilometer.transformer'), + publisher.PublisherExtensionManager( + 'ceilometer.publisher')) + + def before(self, state): + state.request.pipeline_manager = self.pipeline_manager diff --git a/ceilometer/tests/api.py b/ceilometer/tests/api.py index 50c9c0af..0d831c48 100644 --- a/ceilometer/tests/api.py +++ b/ceilometer/tests/api.py @@ -26,6 +26,7 @@ from oslo.config import cfg import pecan import pecan.testing +from ceilometer import service from ceilometer.openstack.common import jsonutils from ceilometer.api import acl from ceilometer.api.v1 import app as v1_app @@ -79,6 +80,7 @@ class FunctionalTest(db_test_base.TestBase): def setUp(self): super(FunctionalTest, self).setUp() + service.prepare_service() cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME) self.app = self._make_app() @@ -102,8 +104,8 @@ class FunctionalTest(db_test_base.TestBase): 'logging': { 'loggers': { - 'root': {'level': 'INFO', 'handlers': ['console']}, - 'wsme': {'level': 'INFO', 'handlers': ['console']}, + 'root': {'level': 'DEBUG', 'handlers': ['console']}, + 'wsme': {'level': 'DEBUG', 'handlers': ['console']}, 'ceilometer': {'level': 'DEBUG', 'handlers': ['console'], }, diff --git a/tests/api/v2/test_app.py b/tests/api/v2/test_app.py index b5666e4f..c7554c99 100644 --- a/tests/api/v2/test_app.py +++ b/tests/api/v2/test_app.py @@ -35,17 +35,23 @@ class TestApp(unittest.TestCase): cfg.CONF.reset() def test_keystone_middleware_conf(self): + service.prepare_service() cfg.CONF.set_override("auth_protocol", "foottp", group=acl.OPT_GROUP_NAME) cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME) + cfg.CONF.set_override("pipeline_cfg_file", + "../etc/ceilometer/pipeline.yaml") api_app = app.setup_app() self.assertEqual(api_app.auth_protocol, 'foottp') def test_keystone_middleware_parse_conffile(self): tmpfile = tempfile.mktemp() with open(tmpfile, "w") as f: - f.write("[%s]\nauth_protocol = barttp" % acl.OPT_GROUP_NAME) - f.write("\nauth_version = v2.0") + f.write("[DEFAULT]\n") + f.write("pipeline_cfg_file = ../etc/ceilometer/pipeline.yaml\n") + f.write("[%s]\n" % acl.OPT_GROUP_NAME) + f.write("auth_protocol = barttp\n") + f.write("auth_version = v2.0\n") service.prepare_service(['ceilometer-api', '--config-file=%s' % tmpfile]) api_app = app.setup_app() diff --git a/tests/api/v2/test_post_samples.py b/tests/api/v2/test_post_samples.py new file mode 100644 index 00000000..76f4bdac --- /dev/null +++ b/tests/api/v2/test_post_samples.py @@ -0,0 +1,207 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 Red Hat, Inc +# +# Author: Angus Salkeld +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Test listing raw events. +""" + +import copy +import datetime +import logging + +from ceilometer.openstack.common import rpc +from ceilometer.openstack.common import timeutils + +from .base import FunctionalTest + +LOG = logging.getLogger(__name__) + + +class TestPostSamples(FunctionalTest): + + def faux_cast(self, context, topic, msg): + self.published.append((topic, msg)) + + def setUp(self): + super(TestPostSamples, self).setUp() + self.published = [] + self.stubs.Set(rpc, 'cast', self.faux_cast) + + def test_one(self): + s1 = [{'counter_name': 'apples', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 1, + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_metadata': {'name1': 'value1', + 'name2': 'value2'}}] + + data = self.post_json('/meters/apples/', s1) + + # timestamp not given so it is generated. + s1[0]['timestamp'] = data.json[0]['timestamp'] + # source is generated if not provided. + s1[0]['source'] = '%s:openstack' % s1[0]['project_id'] + + self.assertEquals(s1, data.json) + + def test_wrong_counter_name(self): + ''' + do not accept cross posting samples to different meters + i.e. my_counter_name != wrong + ''' + s1 = [{'counter_name': 'my_counter_name', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 1, + 'source': 'closedstack', + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_metadata': {'name1': 'value1', + 'name2': 'value2'}}] + + data = self.post_json('/meters/wrong/', s1, expect_errors=True) + + self.assertEquals(data.status_int, 400) + + def test_multiple_samples(self): + ''' + send multiple samples. + The usecase here is to reduce the chatter and send the counters + at a slower cadence. + ''' + samples = [] + stamps = [] + for x in range(6): + dt = datetime.datetime(2012, 8, 27, x, 0, tzinfo=None) + stamps.append(dt) + s = {'counter_name': 'apples', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': float(x * 3), + 'source': 'evil', + 'timestamp': dt.isoformat(), + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_metadata': {'name1': str(x), + 'name2': str(x + 4)}} + samples.append(s) + + data = self.post_json('/meters/apples/', samples) + + # source is modified to include the project_id. + for x in range(6): + for (k, v) in samples[x].iteritems(): + if k == 'timestamp': + timestamp = timeutils.parse_isotime(data.json[x][k]) + self.assertEquals(stamps[x].replace(tzinfo=None), + timestamp.replace(tzinfo=None)) + elif k == 'source': + self.assertEquals(data.json[x][k], + '%s:%s' % (samples[x]['project_id'], + samples[x]['source'])) + else: + self.assertEquals(v, data.json[x][k]) + + def test_missing_mandatory_fields(self): + ''' + do not accept posting samples with missing mandatory fields + ''' + s1 = [{'counter_name': 'my_counter_name', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 1, + 'source': 'closedstack', + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_metadata': {'name1': 'value1', + 'name2': 'value2'}}] + + # one by one try posting without a mandatory field. + for m in ['counter_volume', 'counter_unit', 'counter_type', + 'resource_id', 'counter_name']: + s_broke = copy.copy(s1) + del s_broke[0][m] + print('posting without %s' % m) + data = self.post_json('/meters/my_counter_name/', s_broke, + expect_errors=True) + self.assertEquals(data.status_int, 400) + + def test_multiple_sources(self): + ''' + do not accept a single post of mixed sources + ''' + s1 = [{'counter_name': 'my_counter_name', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 1, + 'source': 'closedstack', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + }, + {'counter_name': 'my_counter_name', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 2, + 'source': 'not this', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + 'resource_metadata': {'name1': 'value1', + 'name2': 'value2'}}] + data = self.post_json('/meters/my_counter_name/', s1, + expect_errors=True) + self.assertEquals(data.status_int, 400) + + def test_multiple_samples_some_null_sources(self): + ''' + do accept a single post with some null sources + this is a convience feature so you only have to set + one of the sample's source field. + ''' + s1 = [{'counter_name': 'my_counter_name', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 1, + 'source': 'paperstack', + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + }, + {'counter_name': 'my_counter_name', + 'counter_type': 'gauge', + 'counter_unit': 'instance', + 'counter_volume': 2, + 'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68', + 'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff', + 'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36', + 'resource_metadata': {'name1': 'value1', + 'name2': 'value2'}}] + data = self.post_json('/meters/my_counter_name/', s1, + expect_errors=True) + self.assertEquals(data.status_int, 200) + for x in range(2): + for (k, v) in s1[x].iteritems(): + if k == 'source': + self.assertEquals(data.json[x][k], + '%s:%s' % (s1[x]['project_id'], + 'paperstack')) diff --git a/tests/test_bin.py b/tests/test_bin.py index de7f62f6..01523b0e 100644 --- a/tests/test_bin.py +++ b/tests/test_bin.py @@ -82,6 +82,8 @@ class BinApiTestCase(unittest.TestCase): "auth_strategy=noauth\n") tmp.write( "debug=true\n") + tmp.write( + "pipeline_cfg_file=../etc/ceilometer/pipeline.yaml\n") self.subp = subprocess.Popen(["../bin/ceilometer-api", "--config-file=%s" % self.tempfile])