make http publisher equivalent to dispatcher

we shouldn't have two things doing the same thing slightly differently.
this brings publisher and dispatcher to equivalency.

this patch:
- adds support for batch and verify_ssl
- fixes docstring on usage
- updates timeout default to 5s rather than 1s
- posts entire event rather than just subset (raw_only for old format)
- share a session rather than create new session each message

Change-Id: I8bc12064a8828b9a35af897740c25c377071f60d
Related-Bug: #1630570
This commit is contained in:
gord chung 2016-11-11 20:51:12 +00:00 committed by gordon chung
parent e016938d64
commit e2e74892da
4 changed files with 151 additions and 60 deletions
ceilometer
dispatcher
publisher
tests/unit/publisher
releasenotes/notes

@ -14,6 +14,7 @@
import json
from debtcollector import removals
from oslo_config import cfg
from oslo_log import log
from oslo_utils import strutils
@ -48,6 +49,8 @@ http_dispatcher_opts = [
]
@removals.removed_class("HttpDispatcher", message="Use http publisher instead",
removal_version="9.0.0")
class HttpDispatcher(dispatcher.MeterDispatcherBase,
dispatcher.EventDispatcherBase):
"""Dispatcher class for posting metering/event data into a http target.

@ -15,6 +15,7 @@
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import strutils
import requests
from requests import adapters
from six.moves.urllib import parse as urlparse
@ -26,12 +27,16 @@ LOG = log.getLogger(__name__)
class HttpPublisher(publisher.ConfigPublisherBase):
"""Publisher metering data to a http endpoint
"""Publish metering data to a http endpoint
The publisher which records metering data into a http endpoint. The
This publisher pushes metering data to a specified http endpoint. The
endpoint should be configured in ceilometer pipeline configuration file.
If the timeout and/or retry_count are not specified, the default timeout
and retry_count will be set to 1000 and 2 respectively.
If the `timeout` and/or `max_retries` are not specified, the default
`timeout` and `max_retries` will be set to 5 and 2 respectively. Additional
parameters are:
- ssl can be enabled by setting `verify_ssl`
- batching can be configured by `batch`
To use this publisher for samples, add the following section to the
/etc/ceilometer/pipeline.yaml file or simply add it to an existing
@ -43,16 +48,9 @@ class HttpPublisher(publisher.ConfigPublisherBase):
- "*"
transformers:
publishers:
- http://host:80/path?timeout=1&max_retries=2
- http://host:80/path?timeout=1&max_retries=2&batch=False
To use this publisher for events, the raw message needs to be present in
the event. To enable that, ceilometer.conf file will need to have a
section like the following:
[event]
store_raw = info
Then in the event_pipeline.yaml file, you can use the publisher in one of
In the event_pipeline.yaml file, you can use the publisher in one of
the sinks like the following:
- name: event_sink
@ -60,7 +58,6 @@ class HttpPublisher(publisher.ConfigPublisherBase):
publishers:
- http://host:80/path?timeout=1&max_retries=2
Http end point is required for this publisher to work properly.
"""
def __init__(self, conf, parsed_url):
@ -79,59 +76,76 @@ class HttpPublisher(publisher.ConfigPublisherBase):
self.headers = {'Content-type': 'application/json'}
# Handling other configuration options in the query string
if parsed_url.query:
params = urlparse.parse_qs(parsed_url.query)
self.timeout = self._get_param(params, 'timeout', 1)
self.max_retries = self._get_param(params, 'max_retries', 2)
else:
self.timeout = 1
self.max_retries = 2
params = urlparse.parse_qs(parsed_url.query)
self.timeout = self._get_param(params, 'timeout', 5, int)
self.max_retries = self._get_param(params, 'max_retries', 2, int)
self.poster = (
self._do_post if strutils.bool_from_string(self._get_param(
params, 'batch', True)) else self._individual_post)
try:
self.verify_ssl = strutils.bool_from_string(
self._get_param(params, 'verify_ssl', None), strict=True)
except ValueError:
self.verify_ssl = (self._get_param(params, 'verify_ssl', None)
or True)
self.raw_only = strutils.bool_from_string(
self._get_param(params, 'raw_only', False))
# TODO(gordc): support configurable max connections
self.session = requests.Session()
# FIXME(gordc): support https in addition to http
self.session.mount(self.target,
adapters.HTTPAdapter(max_retries=self.max_retries))
LOG.debug('HttpPublisher for endpoint %s is initialized!' %
self.target)
@staticmethod
def _get_param(params, name, default_value):
def _get_param(params, name, default_value, cast=None):
try:
return int(params.get(name)[-1])
return cast(params.get(name)[-1]) if cast else params.get(name)[-1]
except (ValueError, TypeError):
LOG.debug('Default value %(value)s is used for %(name)s' %
{'value': default_value, 'name': name})
return default_value
def _individual_post(self, data):
for d in data:
self._do_post(d)
def _do_post(self, data):
if not data:
LOG.debug('Data set is empty!')
return
session = requests.Session()
session.mount(self.target,
adapters.HTTPAdapter(max_retries=self.max_retries))
content = ','.join([jsonutils.dumps(item) for item in data])
content = '[' + content + ']'
LOG.debug('Data to be posted by HttpPublisher: %s', content)
res = session.post(self.target, data=content, headers=self.headers,
timeout=self.timeout)
if res.status_code >= 300:
LOG.error(_LE('Data post failed with status code %s'),
res.status_code)
data = jsonutils.dumps(data)
LOG.trace('Message: %s', data)
try:
res = self.session.post(self.target, data=data,
headers=self.headers, timeout=self.timeout,
verify=self.verify_ssl)
res.raise_for_status()
LOG.debug('Message posting to %s: status code %d.',
self.target, res.status_code)
except requests.exceptions.HTTPError:
LOG.exception(_LE('Status Code: %(code)s. '
'Failed to dispatch message: %(data)s') %
{'code': res.status_code, 'data': data})
def publish_samples(self, samples):
"""Send a metering message for publishing
:param samples: Samples from pipeline after transformation
"""
data = [sample.as_dict() for sample in samples]
self._do_post(data)
self.poster([sample.as_dict() for sample in samples])
def publish_events(self, events):
"""Send an event message for publishing
:param events: events from pipeline after transformation
"""
data = [evt.as_dict()['raw']['payload'] for evt in events
if evt.as_dict().get('raw', {}).get('payload')]
self._do_post(data)
if self.raw_only:
data = [evt.as_dict()['raw']['payload'] for evt in events
if evt.as_dict().get('raw', {}).get('payload')]
else:
data = [event.serialize() for event in events]
self.poster(data)

@ -71,12 +71,7 @@ class TestHttpPublisher(base.BaseTestCase):
event_data = [event.Event(
message_id=str(uuid.uuid4()), event_type='event_%d' % i,
generated=datetime.datetime.utcnow().isoformat(),
traits=[], raw={'payload': {'some': 'aa'}}) for i in range(0, 2)]
empty_event_data = [event.Event(
message_id=str(uuid.uuid4()), event_type='event_%d' % i,
generated=datetime.datetime.utcnow().isoformat(),
traits=[], raw={'payload': {}}) for i in range(0, 2)]
traits=[], raw={'payload': {'some': 'aa'}}) for i in range(3)]
def setUp(self):
super(TestHttpPublisher, self).setUp()
@ -96,9 +91,9 @@ class TestHttpPublisher(base.BaseTestCase):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(self.CONF, parsed_url)
# By default, timeout and retry_count should be set to 1000 and 2
# By default, timeout and retry_count should be set to 5 and 2
# respectively
self.assertEqual(1, publisher.timeout)
self.assertEqual(5, publisher.timeout)
self.assertEqual(2, publisher.max_retries)
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
@ -116,7 +111,7 @@ class TestHttpPublisher(base.BaseTestCase):
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'max_retries=6')
publisher = http.HttpPublisher(self.CONF, parsed_url)
self.assertEqual(1, publisher.timeout)
self.assertEqual(5, publisher.timeout)
self.assertEqual(6, publisher.max_retries)
@mock.patch('ceilometer.publisher.http.LOG')
@ -125,22 +120,23 @@ class TestHttpPublisher(base.BaseTestCase):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(self.CONF, parsed_url)
res = mock.Mock()
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_samples(self.sample_data)
self.assertEqual(1, m_req.call_count)
self.assertFalse(thelog.error.called)
self.assertFalse(thelog.exception.called)
res = requests.Response()
res.status_code = 401
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_samples(self.sample_data)
self.assertEqual(1, m_req.call_count)
self.assertTrue(thelog.error.called)
self.assertTrue(thelog.exception.called)
@mock.patch('ceilometer.publisher.http.LOG')
def test_http_post_events(self, thelog):
@ -148,33 +144,100 @@ class TestHttpPublisher(base.BaseTestCase):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(self.CONF, parsed_url)
res = mock.Mock()
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_events(self.event_data)
self.assertEqual(1, m_req.call_count)
self.assertFalse(thelog.error.called)
self.assertFalse(thelog.exception.called)
res = requests.Response()
res.status_code = 401
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_events(self.event_data)
self.assertEqual(1, m_req.call_count)
self.assertTrue(thelog.error.called)
self.assertTrue(thelog.exception.called)
@mock.patch('ceilometer.publisher.http.LOG')
def test_http_post_empty_data(self, thelog):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(self.CONF, parsed_url)
res = mock.Mock()
res = requests.Response()
res.status_code = 200
with mock.patch.object(requests.Session, 'post',
return_value=res) as m_req:
publisher.publish_events(self.empty_event_data)
publisher.publish_events([])
self.assertEqual(0, m_req.call_count)
self.assertTrue(thelog.debug.called)
def _post_batch_control_test(self, method, data, batch):
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'batch=%s' % batch)
publisher = http.HttpPublisher(self.CONF, parsed_url)
with mock.patch.object(requests.Session, 'post') as post:
getattr(publisher, method)(data)
self.assertEqual(1 if batch else 3, post.call_count)
def test_post_batch_sample(self):
self._post_batch_control_test('publish_samples', self.sample_data, 1)
def test_post_no_batch_sample(self):
self._post_batch_control_test('publish_samples', self.sample_data, 0)
def test_post_batch_event(self):
self._post_batch_control_test('publish_events', self.event_data, 1)
def test_post_no_batch_event(self):
self._post_batch_control_test('publish_events', self.event_data, 0)
def test_post_verify_ssl_default(self):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(self.CONF, parsed_url)
with mock.patch.object(requests.Session, 'post') as post:
publisher.publish_samples(self.sample_data)
self.assertTrue(post.call_args[1]['verify'])
def test_post_verify_ssl_True(self):
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'verify_ssl=True')
publisher = http.HttpPublisher(self.CONF, parsed_url)
with mock.patch.object(requests.Session, 'post') as post:
publisher.publish_samples(self.sample_data)
self.assertTrue(post.call_args[1]['verify'])
def test_post_verify_ssl_False(self):
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'verify_ssl=False')
publisher = http.HttpPublisher(self.CONF, parsed_url)
with mock.patch.object(requests.Session, 'post') as post:
publisher.publish_samples(self.sample_data)
self.assertFalse(post.call_args[1]['verify'])
def test_post_verify_ssl_path(self):
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'verify_ssl=/path/to/cert.crt')
publisher = http.HttpPublisher(self.CONF, parsed_url)
with mock.patch.object(requests.Session, 'post') as post:
publisher.publish_samples(self.sample_data)
self.assertEqual('/path/to/cert.crt', post.call_args[1]['verify'])
def test_post_raw_only(self):
parsed_url = urlparse.urlparse('http://localhost:90/path1?raw_only=1')
publisher = http.HttpPublisher(self.CONF, parsed_url)
with mock.patch.object(requests.Session, 'post') as post:
publisher.publish_events(self.event_data)
self.assertEqual(
'[{"some": "aa"}, {"some": "aa"}, {"some": "aa"}]',
post.call_args[1]['data'])

@ -0,0 +1,11 @@
---
upgrade:
- Configuration values can passed in via the querystring of publisher in
pipeline. For example, rather than setting target, timeout, verify_ssl,
and batch_mode under [dispatcher_http] section of conf, you can specify
http://<target>/?verify_ssl=True&batch=True&timeout=10. Use `raw_only=1`
if only the raw details of event are required.
deprecations:
- As the collector service is being deprecated, the duplication of publishers
and dispatchers is being addressed. The http dispatcher is now marked
as deprecated and the recommended path is to use http publisher.