Allow posting samples via the rest API (v2)

Currently this requires admin, the same as the other API

blueprint meter-post-api
Change-Id: Idfdd077999115140da94621a6920795ac0b8fe4e
This commit is contained in:
Angus Salkeld 2013-05-09 11:47:41 +10:00
parent 0eefaf3c88
commit c9065364d4
7 changed files with 327 additions and 7 deletions

View File

@ -45,7 +45,8 @@ def get_pecan_config():
def setup_app(pecan_config=None, extra_hooks=None): def setup_app(pecan_config=None, extra_hooks=None):
# FIXME: Replace DBHook with a hooks.TransactionHook # FIXME: Replace DBHook with a hooks.TransactionHook
app_hooks = [hooks.ConfigHook(), app_hooks = [hooks.ConfigHook(),
hooks.DBHook()] hooks.DBHook(),
hooks.PipelineHook()]
if extra_hooks: if extra_hooks:
app_hooks.extend(extra_hooks) app_hooks.extend(extra_hooks)

View File

@ -38,9 +38,11 @@ import wsme
import wsmeext.pecan as wsme_pecan import wsmeext.pecan as wsme_pecan
from wsme import types as wtypes from wsme import types as wtypes
from ceilometer.openstack.common import context
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import counter from ceilometer import counter
from ceilometer import pipeline
from ceilometer import storage from ceilometer import storage
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -290,13 +292,26 @@ class Sample(_Base):
message_id = wtypes.text message_id = wtypes.text
"A unique identifier for the sample" "A unique identifier for the sample"
def __init__(self, counter_volume=None, resource_metadata={}, **kwds): def __init__(self, counter_volume=None, resource_metadata={},
timestamp=None, **kwds):
if counter_volume is not None: if counter_volume is not None:
counter_volume = float(counter_volume) counter_volume = float(counter_volume)
resource_metadata = _flatten_metadata(resource_metadata) resource_metadata = _flatten_metadata(resource_metadata)
# this is to make it easier for clients to pass a timestamp in
if timestamp and isinstance(timestamp, basestring):
timestamp = timeutils.parse_isotime(timestamp)
super(Sample, self).__init__(counter_volume=counter_volume, super(Sample, self).__init__(counter_volume=counter_volume,
resource_metadata=resource_metadata, resource_metadata=resource_metadata,
**kwds) timestamp=timestamp, **kwds)
# Seems the mandatory option doesn't work so do it manually
for m in ('counter_volume', 'counter_unit',
'counter_name', 'counter_type', 'resource_id'):
if getattr(self, m) in (wsme.Unset, None):
raise wsme.exc.MissingArgument(m)
if self.resource_metadata in (wtypes.Unset, None):
self.resource_metadata = {}
@classmethod @classmethod
def sample(cls): def sample(cls):
@ -427,6 +442,68 @@ class MeterController(rest.RestController):
for e in pecan.request.storage_conn.get_samples(f) for e in pecan.request.storage_conn.get_samples(f)
] ]
@wsme.validate([Sample])
@wsme_pecan.wsexpose([Sample], body=[Sample])
def post(self, body):
"""Post a list of new Samples to Ceilometer.
:param body: a list of samples within the request body.
"""
# Note:
# 1) the above validate decorator seems to do nothing.
# 2) the mandatory options seems to also do nothing.
# 3) the body should already be in a list of Sample's
def get_consistent_source():
'''Find a source that can be applied across the sample group
or raise InvalidInput if the sources are inconsistent.
If all are None - use the configured counter_source
If any sample has source set then the others must be the same
or None.
'''
source = None
for s in samples:
if source and s.source:
if source != s.source:
raise wsme.exc.InvalidInput('source', s.source,
'can not post Samples %s' %
'with different sources')
if s.source and not source:
source = s.source
return source or pecan.request.cfg.counter_source
samples = [Sample(**b) for b in body]
now = timeutils.utcnow()
source = get_consistent_source()
for s in samples:
if self._id != s.counter_name:
raise wsme.exc.InvalidInput('counter_name', s.counter_name,
'should be %s' % self._id)
if s.timestamp is None or s.timestamp is wsme.Unset:
s.timestamp = now
s.source = '%s:%s' % (s.project_id, source)
with pipeline.PublishContext(
context.get_admin_context(),
source,
pecan.request.pipeline_manager.pipelines,
) as publisher:
publisher([counter.Counter(
name=s.counter_name,
type=s.counter_type,
unit=s.counter_unit,
volume=s.counter_volume,
user_id=s.user_id,
project_id=s.project_id,
resource_id=s.resource_id,
timestamp=s.timestamp.isoformat(),
resource_metadata=s.resource_metadata) for s in samples])
# TODO(asalkeld) this is not ideal, it would be nice if the publisher
# returned the created sample message with message id (or at least the
# a list of message_ids).
return samples
@wsme_pecan.wsexpose([Statistics], [Query], int) @wsme_pecan.wsexpose([Statistics], [Query], int)
def statistics(self, q=[], period=None): def statistics(self, q=[], period=None):
"""Computes the statistics of the samples in the time range given. """Computes the statistics of the samples in the time range given.

View File

@ -3,6 +3,7 @@
# Copyright © 2012 New Dream Network, LLC (DreamHost) # Copyright © 2012 New Dream Network, LLC (DreamHost)
# #
# Author: Doug Hellmann <doug.hellmann@dreamhost.com> # Author: Doug Hellmann <doug.hellmann@dreamhost.com>
# Angus Salkeld <asalkeld@redhat.com>
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain # not use this file except in compliance with the License. You may obtain
@ -19,7 +20,10 @@
from oslo.config import cfg from oslo.config import cfg
from pecan import hooks from pecan import hooks
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer import storage from ceilometer import storage
from ceilometer import transformer
class ConfigHook(hooks.PecanHook): class ConfigHook(hooks.PecanHook):
@ -42,3 +46,24 @@ class DBHook(hooks.PecanHook):
# def after(self, state): # def after(self, state):
# print 'method:', state.request.method # print 'method:', state.request.method
# print 'response:', state.response.status # print 'response:', state.response.status
class PipelineHook(hooks.PecanHook):
'''Create and attach a pipeline to the request so that
new samples can be posted via the /v2/meters/ API.
'''
pipeline_manager = None
def __init__(self):
if self.__class__.pipeline_manager is None:
# this is done here as the cfg options are not available
# when the file is imported.
self.__class__.pipeline_manager = pipeline.setup_pipeline(
transformer.TransformerExtensionManager(
'ceilometer.transformer'),
publisher.PublisherExtensionManager(
'ceilometer.publisher'))
def before(self, state):
state.request.pipeline_manager = self.pipeline_manager

View File

@ -26,6 +26,7 @@ from oslo.config import cfg
import pecan import pecan
import pecan.testing import pecan.testing
from ceilometer import service
from ceilometer.openstack.common import jsonutils from ceilometer.openstack.common import jsonutils
from ceilometer.api import acl from ceilometer.api import acl
from ceilometer.api.v1 import app as v1_app from ceilometer.api.v1 import app as v1_app
@ -79,6 +80,7 @@ class FunctionalTest(db_test_base.TestBase):
def setUp(self): def setUp(self):
super(FunctionalTest, self).setUp() super(FunctionalTest, self).setUp()
service.prepare_service()
cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME) cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME)
self.app = self._make_app() self.app = self._make_app()
@ -102,8 +104,8 @@ class FunctionalTest(db_test_base.TestBase):
'logging': { 'logging': {
'loggers': { 'loggers': {
'root': {'level': 'INFO', 'handlers': ['console']}, 'root': {'level': 'DEBUG', 'handlers': ['console']},
'wsme': {'level': 'INFO', 'handlers': ['console']}, 'wsme': {'level': 'DEBUG', 'handlers': ['console']},
'ceilometer': {'level': 'DEBUG', 'ceilometer': {'level': 'DEBUG',
'handlers': ['console'], 'handlers': ['console'],
}, },

View File

@ -35,17 +35,23 @@ class TestApp(unittest.TestCase):
cfg.CONF.reset() cfg.CONF.reset()
def test_keystone_middleware_conf(self): def test_keystone_middleware_conf(self):
service.prepare_service()
cfg.CONF.set_override("auth_protocol", "foottp", cfg.CONF.set_override("auth_protocol", "foottp",
group=acl.OPT_GROUP_NAME) group=acl.OPT_GROUP_NAME)
cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME) cfg.CONF.set_override("auth_version", "v2.0", group=acl.OPT_GROUP_NAME)
cfg.CONF.set_override("pipeline_cfg_file",
"../etc/ceilometer/pipeline.yaml")
api_app = app.setup_app() api_app = app.setup_app()
self.assertEqual(api_app.auth_protocol, 'foottp') self.assertEqual(api_app.auth_protocol, 'foottp')
def test_keystone_middleware_parse_conffile(self): def test_keystone_middleware_parse_conffile(self):
tmpfile = tempfile.mktemp() tmpfile = tempfile.mktemp()
with open(tmpfile, "w") as f: with open(tmpfile, "w") as f:
f.write("[%s]\nauth_protocol = barttp" % acl.OPT_GROUP_NAME) f.write("[DEFAULT]\n")
f.write("\nauth_version = v2.0") f.write("pipeline_cfg_file = ../etc/ceilometer/pipeline.yaml\n")
f.write("[%s]\n" % acl.OPT_GROUP_NAME)
f.write("auth_protocol = barttp\n")
f.write("auth_version = v2.0\n")
service.prepare_service(['ceilometer-api', service.prepare_service(['ceilometer-api',
'--config-file=%s' % tmpfile]) '--config-file=%s' % tmpfile])
api_app = app.setup_app() api_app = app.setup_app()

View File

@ -0,0 +1,207 @@
# -*- encoding: utf-8 -*-
#
# Copyright © 2013 Red Hat, Inc
#
# Author: Angus Salkeld <asalkeld@redhat.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test listing raw events.
"""
import copy
import datetime
import logging
from ceilometer.openstack.common import rpc
from ceilometer.openstack.common import timeutils
from .base import FunctionalTest
LOG = logging.getLogger(__name__)
class TestPostSamples(FunctionalTest):
def faux_cast(self, context, topic, msg):
self.published.append((topic, msg))
def setUp(self):
super(TestPostSamples, self).setUp()
self.published = []
self.stubs.Set(rpc, 'cast', self.faux_cast)
def test_one(self):
s1 = [{'counter_name': 'apples',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 1,
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
data = self.post_json('/meters/apples/', s1)
# timestamp not given so it is generated.
s1[0]['timestamp'] = data.json[0]['timestamp']
# source is generated if not provided.
s1[0]['source'] = '%s:openstack' % s1[0]['project_id']
self.assertEquals(s1, data.json)
def test_wrong_counter_name(self):
'''
do not accept cross posting samples to different meters
i.e. my_counter_name != wrong
'''
s1 = [{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 1,
'source': 'closedstack',
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
data = self.post_json('/meters/wrong/', s1, expect_errors=True)
self.assertEquals(data.status_int, 400)
def test_multiple_samples(self):
'''
send multiple samples.
The usecase here is to reduce the chatter and send the counters
at a slower cadence.
'''
samples = []
stamps = []
for x in range(6):
dt = datetime.datetime(2012, 8, 27, x, 0, tzinfo=None)
stamps.append(dt)
s = {'counter_name': 'apples',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': float(x * 3),
'source': 'evil',
'timestamp': dt.isoformat(),
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_metadata': {'name1': str(x),
'name2': str(x + 4)}}
samples.append(s)
data = self.post_json('/meters/apples/', samples)
# source is modified to include the project_id.
for x in range(6):
for (k, v) in samples[x].iteritems():
if k == 'timestamp':
timestamp = timeutils.parse_isotime(data.json[x][k])
self.assertEquals(stamps[x].replace(tzinfo=None),
timestamp.replace(tzinfo=None))
elif k == 'source':
self.assertEquals(data.json[x][k],
'%s:%s' % (samples[x]['project_id'],
samples[x]['source']))
else:
self.assertEquals(v, data.json[x][k])
def test_missing_mandatory_fields(self):
'''
do not accept posting samples with missing mandatory fields
'''
s1 = [{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 1,
'source': 'closedstack',
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
# one by one try posting without a mandatory field.
for m in ['counter_volume', 'counter_unit', 'counter_type',
'resource_id', 'counter_name']:
s_broke = copy.copy(s1)
del s_broke[0][m]
print('posting without %s' % m)
data = self.post_json('/meters/my_counter_name/', s_broke,
expect_errors=True)
self.assertEquals(data.status_int, 400)
def test_multiple_sources(self):
'''
do not accept a single post of mixed sources
'''
s1 = [{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 1,
'source': 'closedstack',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
},
{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 2,
'source': 'not this',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
data = self.post_json('/meters/my_counter_name/', s1,
expect_errors=True)
self.assertEquals(data.status_int, 400)
def test_multiple_samples_some_null_sources(self):
'''
do accept a single post with some null sources
this is a convience feature so you only have to set
one of the sample's source field.
'''
s1 = [{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 1,
'source': 'paperstack',
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
},
{'counter_name': 'my_counter_name',
'counter_type': 'gauge',
'counter_unit': 'instance',
'counter_volume': 2,
'project_id': '35b17138-b364-4e6a-a131-8f3099c5be68',
'user_id': 'efd87807-12d2-4b38-9c70-5f5c2ac427ff',
'resource_id': 'bd9431c1-8d69-4ad3-803a-8d4a6b89fd36',
'resource_metadata': {'name1': 'value1',
'name2': 'value2'}}]
data = self.post_json('/meters/my_counter_name/', s1,
expect_errors=True)
self.assertEquals(data.status_int, 200)
for x in range(2):
for (k, v) in s1[x].iteritems():
if k == 'source':
self.assertEquals(data.json[x][k],
'%s:%s' % (s1[x]['project_id'],
'paperstack'))

View File

@ -82,6 +82,8 @@ class BinApiTestCase(unittest.TestCase):
"auth_strategy=noauth\n") "auth_strategy=noauth\n")
tmp.write( tmp.write(
"debug=true\n") "debug=true\n")
tmp.write(
"pipeline_cfg_file=../etc/ceilometer/pipeline.yaml\n")
self.subp = subprocess.Popen(["../bin/ceilometer-api", self.subp = subprocess.Popen(["../bin/ceilometer-api",
"--config-file=%s" % self.tempfile]) "--config-file=%s" % self.tempfile])