publisher: stop using global conf

Change-Id: I7084eedf6a6a85464e10fc423799e2b88d80c76e
This commit is contained in:
Mehdi Abaakouk 2016-10-11 07:04:09 +02:00
parent b4fa1597c8
commit f827d42d8a
20 changed files with 149 additions and 87 deletions

View File

@ -363,7 +363,8 @@ class Sink(object):
passed data directly from the sink which are published unchanged.
"""
def __init__(self, cfg, transformer_manager):
def __init__(self, conf, cfg, transformer_manager):
self.conf = conf
self.cfg = cfg
try:
@ -383,8 +384,8 @@ class Sink(object):
# Support old format without URL
p = p + "://"
try:
self.publishers.append(publisher.get_publisher(p,
self.NAMESPACE))
self.publishers.append(publisher.get_publisher(
self.conf, p, self.NAMESPACE))
except Exception:
LOG.exception(_("Unable to load publisher %s"), p)
@ -780,7 +781,8 @@ class PipelineManager(ConfigManagerBase):
name, self)
else:
unique_names.add(name)
sinks[s['name']] = p_type['sink'](s, transformer_manager)
sinks[s['name']] = p_type['sink'](self.conf, s,
transformer_manager)
unique_names.clear()
for source in sources:

View File

@ -16,12 +16,13 @@
import abc
from debtcollector import removals
from oslo_utils import netutils
import six
from stevedore import driver
def get_publisher(url, namespace='ceilometer.publisher'):
def get_publisher(conf, url, namespace='ceilometer.publisher'):
"""Get publisher driver and load it.
:param URL: URL for the publisher
@ -29,12 +30,23 @@ def get_publisher(url, namespace='ceilometer.publisher'):
"""
parse_result = netutils.urlsplit(url)
loaded_driver = driver.DriverManager(namespace, parse_result.scheme)
return loaded_driver.driver(parse_result)
if issubclass(loaded_driver.driver, ConfigPublisherBase):
return loaded_driver.driver(conf, parse_result)
else:
return loaded_driver.driver(parse_result)
@removals.removed_class("PublisherBase",
message="Use ConfigPublisherBase instead",
removal_version="9.0.0")
@six.add_metaclass(abc.ABCMeta)
class PublisherBase(object):
"""Base class for plugins that publish data."""
"""Legacy base class for plugins that publish data.
This base class is for backward compatibility purpose. It doesn't take
oslo.config object as argument. We assume old publisher does not depend
on cfg.CONF.
"""
def __init__(self, parsed_url):
pass
@ -46,3 +58,19 @@ class PublisherBase(object):
@abc.abstractmethod
def publish_events(self, events):
"""Publish events into final conduit."""
@six.add_metaclass(abc.ABCMeta)
class ConfigPublisherBase(object):
"""Base class for plugins that publish data."""
def __init__(self, conf, parsed_url):
self.conf = conf
@abc.abstractmethod
def publish_samples(self, samples):
"""Publish samples into final conduit."""
@abc.abstractmethod
def publish_events(self, events):
"""Publish events into final conduit."""

View File

@ -12,7 +12,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log
import six.moves.urllib.parse as urlparse
from stevedore import driver
@ -25,7 +24,7 @@ from ceilometer.publisher import utils
LOG = log.getLogger(__name__)
class DirectPublisher(publisher.PublisherBase):
class DirectPublisher(publisher.ConfigPublisherBase):
"""A publisher that allows saving directly from the pipeline.
Samples are saved to a configured dispatcher. This is useful
@ -36,8 +35,8 @@ class DirectPublisher(publisher.PublisherBase):
can use direct://?dispatcher=gnocchi, direct://?dispatcher=http,
direct://?dispatcher=log, ...
"""
def __init__(self, parsed_url):
super(DirectPublisher, self).__init__(parsed_url)
def __init__(self, conf, parsed_url):
super(DirectPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
self.dispatcher_name = options.get('dispatcher', ['database'])[-1]
self._sample_dispatcher = None
@ -57,13 +56,13 @@ class DirectPublisher(publisher.PublisherBase):
def get_sample_dispatcher(self):
if not self._sample_dispatcher:
self._sample_dispatcher = self.sample_driver(cfg.CONF)
self._sample_dispatcher = self.sample_driver(self.conf)
return self._sample_dispatcher
def get_event_dispatcher(self):
if not self._event_dispatcher:
if self.event_driver != self.sample_driver:
self._event_dispatcher = self.event_driver(cfg.CONF)
self._event_dispatcher = self.event_driver(self.conf)
else:
self._event_dispatcher = self.get_sample_dispatcher()
return self._event_dispatcher
@ -78,7 +77,7 @@ class DirectPublisher(publisher.PublisherBase):
samples = [samples]
self.get_sample_dispatcher().record_metering_data([
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
sample, self.conf.publisher.telemetry_secret)
for sample in samples
])
@ -92,5 +91,5 @@ class DirectPublisher(publisher.PublisherBase):
events = [events]
self.get_event_dispatcher().record_events([
utils.message_from_event(
event, cfg.CONF.publisher.telemetry_secret)
event, self.conf.publisher.telemetry_secret)
for event in events])

View File

@ -26,7 +26,7 @@ from ceilometer import publisher
LOG = log.getLogger(__name__)
class FilePublisher(publisher.PublisherBase):
class FilePublisher(publisher.ConfigPublisherBase):
"""Publisher metering data to file.
The publisher which records metering data into a file. The file name and
@ -53,8 +53,8 @@ class FilePublisher(publisher.PublisherBase):
be used to save the metering data.
"""
def __init__(self, parsed_url):
super(FilePublisher, self).__init__(parsed_url)
def __init__(self, conf, parsed_url):
super(FilePublisher, self).__init__(conf, parsed_url)
self.publisher_logger = None
path = parsed_url.path

View File

@ -25,7 +25,7 @@ from ceilometer import publisher
LOG = log.getLogger(__name__)
class HttpPublisher(publisher.PublisherBase):
class HttpPublisher(publisher.ConfigPublisherBase):
"""Publisher metering data to a http endpoint
The publisher which records metering data into a http endpoint. The
@ -63,8 +63,8 @@ class HttpPublisher(publisher.PublisherBase):
Http end point is required for this publisher to work properly.
"""
def __init__(self, parsed_url):
super(HttpPublisher, self).__init__(parsed_url)
def __init__(self, conf, parsed_url):
super(HttpPublisher, self).__init__(conf, parsed_url)
self.target = parsed_url.geturl()
if not parsed_url.hostname:

View File

@ -62,8 +62,8 @@ class KafkaBrokerPublisher(messaging.MessagingPublisher):
field. If max_retry is not specified, default the number of retry is 100.
"""
def __init__(self, parsed_url):
super(KafkaBrokerPublisher, self).__init__(parsed_url)
def __init__(self, conf, parsed_url):
super(KafkaBrokerPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
self._producer = None

View File

@ -72,9 +72,10 @@ def raise_delivery_failure(exc):
@six.add_metaclass(abc.ABCMeta)
class MessagingPublisher(publisher.PublisherBase):
class MessagingPublisher(publisher.ConfigPublisherBase):
def __init__(self, parsed_url):
def __init__(self, conf, parsed_url):
super(MessagingPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
# the value of options is a list of url param values
# only take care of the latest one if the option
@ -107,10 +108,10 @@ class MessagingPublisher(publisher.PublisherBase):
meters = [
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
sample, self.conf.publisher.telemetry_secret)
for sample in samples
]
topic = cfg.CONF.publisher_notifier.metering_topic
topic = self.conf.publisher_notifier.metering_topic
self.local_queue.append((topic, meters))
if self.per_meter_topic:
@ -177,9 +178,9 @@ class MessagingPublisher(publisher.PublisherBase):
:param events: events from pipeline after transformation
"""
ev_list = [utils.message_from_event(
event, cfg.CONF.publisher.telemetry_secret) for event in events]
event, self.conf.publisher.telemetry_secret) for event in events]
topic = cfg.CONF.publisher_notifier.event_topic
topic = self.conf.publisher_notifier.event_topic
self.local_queue.append((topic, ev_list))
self.flush()
@ -189,8 +190,8 @@ class MessagingPublisher(publisher.PublisherBase):
class NotifierPublisher(MessagingPublisher):
def __init__(self, parsed_url, default_topic):
super(NotifierPublisher, self).__init__(parsed_url)
def __init__(self, conf, parsed_url, default_topic):
super(NotifierPublisher, self).__init__(conf, parsed_url)
options = urlparse.parse_qs(parsed_url.query)
topic = options.pop('topic', [default_topic])
driver = options.pop('driver', ['rabbit'])[0]
@ -201,9 +202,9 @@ class NotifierPublisher(MessagingPublisher):
urlparse.urlencode(options, True),
parsed_url.fragment])
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(cfg.CONF, url),
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
messaging.get_transport(self.conf, url),
driver=self.conf.publisher_notifier.telemetry_driver,
publisher_id='telemetry.publisher.%s' % self.conf.host,
topics=topic,
retry=self.retry
)
@ -217,12 +218,12 @@ class NotifierPublisher(MessagingPublisher):
class SampleNotifierPublisher(NotifierPublisher):
def __init__(self, parsed_url):
def __init__(self, conf, parsed_url):
super(SampleNotifierPublisher, self).__init__(
parsed_url, cfg.CONF.publisher_notifier.metering_topic)
conf, parsed_url, conf.publisher_notifier.metering_topic)
class EventNotifierPublisher(NotifierPublisher):
def __init__(self, parsed_url):
def __init__(self, conf, parsed_url):
super(EventNotifierPublisher, self).__init__(
parsed_url, cfg.CONF.publisher_notifier.event_topic)
conf, parsed_url, conf.publisher_notifier.event_topic)

View File

@ -18,10 +18,11 @@
from ceilometer import publisher
class TestPublisher(publisher.PublisherBase):
class TestPublisher(publisher.ConfigPublisherBase):
"""Publisher used in unit testing."""
def __init__(self, parsed_url):
def __init__(self, conf, parsed_url):
super(TestPublisher, self).__init__(conf, parsed_url)
self.samples = []
self.events = []
self.calls = 0

View File

@ -33,11 +33,12 @@ cfg.CONF.import_opt('udp_port', 'ceilometer.collector',
LOG = log.getLogger(__name__)
class UDPPublisher(publisher.PublisherBase):
def __init__(self, parsed_url):
class UDPPublisher(publisher.ConfigPublisherBase):
def __init__(self, conf, parsed_url):
super(UDPPublisher, self).__init__(conf, parsed_url)
self.host, self.port = netutils.parse_host_port(
parsed_url.netloc,
default_port=cfg.CONF.collector.udp_port)
default_port=self.conf.collector.udp_port)
addrinfo = None
try:
addrinfo = socket.getaddrinfo(self.host, None, socket.AF_INET6,
@ -66,7 +67,7 @@ class UDPPublisher(publisher.PublisherBase):
for sample in samples:
msg = utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
sample, self.conf.publisher.telemetry_secret)
host = self.host
port = self.port
LOG.debug("Publishing sample %(msg)s over UDP to "

View File

@ -71,7 +71,7 @@ class TestDirectPublisher(tests_db.TestBase):
self.CONF.set_override('connection', self.db_manager.url,
group='database')
parsed_url = netutils.urlsplit('direct://')
publisher = direct.DirectPublisher(parsed_url)
publisher = direct.DirectPublisher(self.CONF, parsed_url)
publisher.publish_samples(self.test_data)
meters = list(self.conn.get_meters(resource=self.resource_id))
@ -90,7 +90,7 @@ class TestEventDirectPublisher(tests_db.TestBase):
def test_direct_publisher(self):
parsed_url = netutils.urlsplit('direct://dispatcher=database')
publisher = direct.DirectPublisher(parsed_url)
publisher = direct.DirectPublisher(self.CONF, parsed_url)
publisher.publish_events(self.test_data)
e_types = list(self.event_conn.get_event_types())

View File

@ -246,7 +246,7 @@ class BaseRealNotification(tests_base.BaseTestCase):
"definitions_cfg_file",
self.path_get('etc/ceilometer/event_definitions.yaml'),
group='event')
self.publisher = test_publisher.TestPublisher("")
self.publisher = test_publisher.TestPublisher(self.CONF, "")
def _check_notification_service(self):
self.srv.run()
@ -539,8 +539,8 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase):
group='notification')
self.CONF.set_override('pipeline_processing_queues', 2,
group='notification')
self.publisher = test_publisher.TestPublisher("")
self.publisher2 = test_publisher.TestPublisher("")
self.publisher = test_publisher.TestPublisher(self.CONF, "")
self.publisher2 = test_publisher.TestPublisher(self.CONF, "")
def _check_notifications(self, fake_publisher_cls):
fake_publisher_cls.side_effect = [self.publisher, self.publisher2]

View File

@ -74,13 +74,13 @@ class BasePipelineTestCase(base.BaseTestCase):
raise KeyError(name)
def get_publisher(self, url, namespace=''):
def get_publisher(self, conf, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
'except://': self.PublisherClassException}
return fake_drivers[url](url)
return fake_drivers[url](conf, url)
class PublisherClassException(publisher.PublisherBase):
class PublisherClassException(publisher.ConfigPublisherBase):
def publish_samples(self, samples):
raise Exception()
@ -171,7 +171,7 @@ class BasePipelineTestCase(base.BaseTestCase):
os.unlink(self.tmp_cfg.name)
super(BasePipelineTestCase, self).tearDown()
def _handle_reraise_exception(self, msg):
def _handle_reraise_exception(self, *args, **kwargs):
if self._reraise_exception:
raise Exception(traceback.format_exc())

View File

@ -277,12 +277,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase):
self.useFixture(mockpatch.PatchObject(
publisher, 'get_publisher', side_effect=self.get_publisher))
@staticmethod
def get_publisher(url, namespace=''):
def get_publisher(self, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
'rpc://': test_publisher.TestPublisher}
return fake_drivers[url](url)
return fake_drivers[url](self.CONF, url)
def tearDown(self):
self.Pollster.samples = []

View File

@ -83,10 +83,11 @@ TEST_NOTICE_PAYLOAD = {
class TestEventEndpoint(tests_base.BaseTestCase):
def get_publisher(self, url, namespace=''):
@staticmethod
def get_publisher(conf, url, namespace=''):
fake_drivers = {'test://': test.TestPublisher,
'except://': test.TestPublisher}
return fake_drivers[url](url)
return fake_drivers[url](conf, url)
def _setup_pipeline(self, publishers):
ev_pipeline = yaml.dump({

View File

@ -20,6 +20,7 @@ import logging.handlers
import os
import tempfile
from oslo_config import fixture as fixture_config
from oslo_utils import netutils
from oslotest import base
@ -65,13 +66,17 @@ class TestFilePublisher(base.BaseTestCase):
),
]
def setUp(self):
super(TestFilePublisher, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
def test_file_publisher_maxbytes(self):
# Test valid configurations
tempdir = tempfile.mkdtemp()
name = '%s/log_file' % tempdir
parsed_url = netutils.urlsplit('file://%s?max_bytes=50&backup_count=3'
% name)
publisher = file.FilePublisher(parsed_url)
publisher = file.FilePublisher(self.CONF, parsed_url)
publisher.publish_samples(self.test_data)
handler = publisher.publisher_logger.handlers[0]
@ -88,7 +93,7 @@ class TestFilePublisher(base.BaseTestCase):
tempdir = tempfile.mkdtemp()
name = '%s/log_file_plain' % tempdir
parsed_url = netutils.urlsplit('file://%s' % name)
publisher = file.FilePublisher(parsed_url)
publisher = file.FilePublisher(self.CONF, parsed_url)
publisher.publish_samples(self.test_data)
handler = publisher.publisher_logger.handlers[0]
@ -111,7 +116,7 @@ class TestFilePublisher(base.BaseTestCase):
parsed_url = netutils.urlsplit(
'file://%s/log_file_bad'
'?max_bytes=yus&backup_count=5y' % tempdir)
publisher = file.FilePublisher(parsed_url)
publisher = file.FilePublisher(self.CONF, parsed_url)
publisher.publish_samples(self.test_data)
self.assertIsNone(publisher.publisher_logger)

View File

@ -17,6 +17,7 @@
import datetime
import mock
from oslo_config import fixture as fixture_config
from oslotest import base
import requests
from six.moves.urllib import parse as urlparse
@ -77,20 +78,24 @@ class TestHttpPublisher(base.BaseTestCase):
generated=datetime.datetime.utcnow().isoformat(),
traits=[], raw={'payload': {}}) for i in range(0, 2)]
def setUp(self):
super(TestHttpPublisher, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
def test_http_publisher_config(self):
"""Test publisher config parameters."""
# invalid hostname, the given url, results in an empty hostname
parsed_url = urlparse.urlparse('http:/aaa.bb/path')
self.assertRaises(ValueError, http.HttpPublisher,
parsed_url)
self.CONF, parsed_url)
# invalid port
parsed_url = urlparse.urlparse('http://aaa:bb/path')
self.assertRaises(ValueError, http.HttpPublisher,
parsed_url)
self.CONF, parsed_url)
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
# By default, timeout and retry_count should be set to 1000 and 2
# respectively
self.assertEqual(1, publisher.timeout)
@ -98,19 +103,19 @@ class TestHttpPublisher(base.BaseTestCase):
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'timeout=19&max_retries=4')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
self.assertEqual(19, publisher.timeout)
self.assertEqual(4, publisher.max_retries)
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'timeout=19')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
self.assertEqual(19, publisher.timeout)
self.assertEqual(2, publisher.max_retries)
parsed_url = urlparse.urlparse('http://localhost:90/path1?'
'max_retries=6')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
self.assertEqual(1, publisher.timeout)
self.assertEqual(6, publisher.max_retries)
@ -118,7 +123,7 @@ class TestHttpPublisher(base.BaseTestCase):
def test_http_post_samples(self, thelog):
"""Test publisher post."""
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
res = mock.Mock()
res.status_code = 200
@ -141,7 +146,7 @@ class TestHttpPublisher(base.BaseTestCase):
def test_http_post_events(self, thelog):
"""Test publisher post."""
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
res = mock.Mock()
res.status_code = 200
@ -163,7 +168,7 @@ class TestHttpPublisher(base.BaseTestCase):
@mock.patch('ceilometer.publisher.http.LOG')
def test_http_post_empty_data(self, thelog):
parsed_url = urlparse.urlparse('http://localhost:90/path1')
publisher = http.HttpPublisher(parsed_url)
publisher = http.HttpPublisher(self.CONF, parsed_url)
res = mock.Mock()
res.status_code = 200

View File

@ -18,6 +18,7 @@ import datetime
import uuid
import mock
from oslo_config import fixture as fixture_config
from oslo_utils import netutils
from ceilometer.event.storage import models as event
@ -97,8 +98,12 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
),
]
def setUp(self):
super(TestKafkaPublisher, self).setUp()
self.CONF = self.useFixture(fixture_config.Config()).conf
def test_publish(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer'))
with mock.patch.object(publisher, '_producer') as fake_producer:
@ -108,7 +113,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
def test_publish_without_options(self):
publisher = kafka.KafkaBrokerPublisher(
netutils.urlsplit('kafka://127.0.0.1:9092'))
self.CONF, netutils.urlsplit('kafka://127.0.0.1:9092'))
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_samples(self.test_data)
@ -116,16 +121,16 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_without_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer'))
self.assertEqual('default', publisher.policy)
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=test'))
self.assertEqual('default', publisher.policy)
def test_publish_to_host_with_default_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=default'))
with mock.patch.object(publisher, '_producer') as fake_producer:
@ -137,7 +142,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_drop_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=drop'))
with mock.patch.object(publisher, '_producer') as fake_producer:
@ -147,7 +152,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(0, len(publisher.local_queue))
def test_publish_to_host_with_queue_policy(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_producer') as fake_producer:
@ -157,7 +162,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(1, len(publisher.local_queue))
def test_publish_to_down_host_with_default_queue_size(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_producer') as fake_producer:
@ -175,7 +180,7 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
publisher.local_queue[1023][1][0]['counter_name'])
def test_publish_to_host_from_down_to_up_with_queue(self):
publisher = kafka.KafkaBrokerPublisher(netutils.urlsplit(
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer&policy=queue'))
with mock.patch.object(publisher, '_producer') as fake_producer:
@ -194,8 +199,8 @@ class TestKafkaPublisher(tests_base.BaseTestCase):
self.assertEqual(0, len(publisher.local_queue))
def test_publish_event_with_default_policy(self):
publisher = kafka.KafkaBrokerPublisher(
netutils.urlsplit('kafka://127.0.0.1:9092?topic=ceilometer'))
publisher = kafka.KafkaBrokerPublisher(self.CONF, netutils.urlsplit(
'kafka://127.0.0.1:9092?topic=ceilometer'))
with mock.patch.object(publisher, '_producer') as fake_producer:
publisher.publish_events(self.test_event_data)

View File

@ -106,12 +106,14 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
@mock.patch('oslo_messaging.Notifier')
def test_publish_topic_override(self, notifier):
msg_publisher.SampleNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://?topic=custom_topic'))
notifier.assert_called_with(mock.ANY, topics=['custom_topic'],
driver=mock.ANY, retry=mock.ANY,
publisher_id=mock.ANY)
msg_publisher.EventNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://?topic=custom_event_topic'))
notifier.assert_called_with(mock.ANY, topics=['custom_event_topic'],
driver=mock.ANY, retry=mock.ANY,
@ -120,22 +122,26 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
@mock.patch('ceilometer.messaging.get_transport')
def test_publish_other_host(self, cgt):
msg_publisher.SampleNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234'))
cgt.assert_called_with(self.CONF, 'rabbit://foo:foo@127.0.0.1:1234')
msg_publisher.EventNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234'))
cgt.assert_called_with(self.CONF, 'rabbit://foo:foo@127.0.0.1:1234')
@mock.patch('ceilometer.messaging.get_transport')
def test_publish_other_host_vhost_and_query(self, cgt):
msg_publisher.SampleNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234/foo'
'?driver=amqp&amqp_auto_delete=true'))
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
'?amqp_auto_delete=true')
msg_publisher.EventNotifierPublisher(
self.CONF,
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234/foo'
'?driver=amqp&amqp_auto_delete=true'))
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
@ -168,6 +174,7 @@ class TestPublisherPolicy(TestPublisher):
@mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_no_policy(self, mylog):
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
@ -185,6 +192,7 @@ class TestPublisherPolicy(TestPublisher):
@mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_block(self, mylog):
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://?policy=default' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
@ -201,6 +209,7 @@ class TestPublisherPolicy(TestPublisher):
@mock.patch('ceilometer.publisher.messaging.LOG')
def test_published_with_policy_incorrect(self, mylog):
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://?policy=notexist' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
@ -221,6 +230,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_drop_and_rpc_down(self):
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://?policy=drop' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
@ -232,6 +242,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_queue_and_rpc_down(self):
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
with mock.patch.object(publisher, '_send') as fake_send:
@ -245,6 +256,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_queue_and_rpc_down_up(self):
self.rpc_unreachable = True
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
@ -266,7 +278,7 @@ class TestPublisherPolicyReactions(TestPublisher):
self.assertEqual(expected, fake_send.mock_calls)
def test_published_with_policy_sized_queue_and_rpc_down(self):
publisher = self.publisher_cls(netutils.urlsplit(
publisher = self.publisher_cls(self.CONF, netutils.urlsplit(
'%s://?policy=queue&max_queue_length=3' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()
@ -293,6 +305,7 @@ class TestPublisherPolicyReactions(TestPublisher):
def test_published_with_policy_default_sized_queue_and_rpc_down(self):
publisher = self.publisher_cls(
self.CONF,
netutils.urlsplit('%s://?policy=queue' % self.protocol))
side_effect = msg_publisher.DeliveryFailure()

View File

@ -115,7 +115,7 @@ class TestUDPPublisher(base.BaseTestCase):
def _check_udp_socket(self, url, expected_addr_family):
with mock.patch.object(socket, 'socket') as mock_socket:
udp.UDPPublisher(netutils.urlsplit(url))
udp.UDPPublisher(self.CONF, netutils.urlsplit(url))
mock_socket.assert_called_with(expected_addr_family,
socket.SOCK_DGRAM)
@ -154,6 +154,7 @@ class TestUDPPublisher(base.BaseTestCase):
with mock.patch('socket.socket',
self._make_fake_socket(self.data_sent)):
publisher = udp.UDPPublisher(
self.CONF,
netutils.urlsplit('udp://somehost'))
publisher.publish_samples(self.test_data)
@ -192,5 +193,6 @@ class TestUDPPublisher(base.BaseTestCase):
with mock.patch('socket.socket',
self._make_broken_socket):
publisher = udp.UDPPublisher(
self.CONF,
netutils.urlsplit('udp://localhost'))
publisher.publish_samples(self.test_data)

View File

@ -30,13 +30,13 @@ from ceilometer.publisher import utils
class EventPipelineTestCase(base.BaseTestCase):
def get_publisher(self, url, namespace=''):
def get_publisher(self, conf, url, namespace=''):
fake_drivers = {'test://': test_publisher.TestPublisher,
'new://': test_publisher.TestPublisher,
'except://': self.PublisherClassException}
return fake_drivers[url](url)
return fake_drivers[url](conf, url)
class PublisherClassException(publisher.PublisherBase):
class PublisherClassException(publisher.ConfigPublisherBase):
def publish_samples(self, samples):
pass
@ -87,7 +87,7 @@ class EventPipelineTestCase(base.BaseTestCase):
'ceilometer.pipeline.LOG.exception',
side_effect=self._handle_reraise_exception))
def _handle_reraise_exception(self, msg):
def _handle_reraise_exception(self, *args, **kwargs):
if self._reraise_exception:
raise Exception(traceback.format_exc())