messaging: stop using global config
Change-Id: I97840aa9d1249deeba91dcdb6a5d23eca2fecdf1
This commit is contained in:
parent
5901528d12
commit
32c5d72647
@ -290,7 +290,7 @@ class AgentManager(service_base.PipelineBasedService):
|
|||||||
if group_prefix else namespace_prefix)
|
if group_prefix else namespace_prefix)
|
||||||
|
|
||||||
self.notifier = oslo_messaging.Notifier(
|
self.notifier = oslo_messaging.Notifier(
|
||||||
messaging.get_transport(),
|
messaging.get_transport(cfg.CONF),
|
||||||
driver=cfg.CONF.publisher_notifier.telemetry_driver,
|
driver=cfg.CONF.publisher_notifier.telemetry_driver,
|
||||||
publisher_id="ceilometer.polling")
|
publisher_id="ceilometer.polling")
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ class NotifierHook(hooks.PecanHook):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport(conf)
|
||||||
self.notifier = oslo_messaging.Notifier(
|
self.notifier = oslo_messaging.Notifier(
|
||||||
transport, driver=conf.publisher_notifier.telemetry_driver,
|
transport, driver=conf.publisher_notifier.telemetry_driver,
|
||||||
publisher_id="ceilometer.api")
|
publisher_id="ceilometer.api")
|
||||||
|
@ -74,7 +74,7 @@ class CollectorService(cotyledon.Service):
|
|||||||
if self.conf.collector.udp_address:
|
if self.conf.collector.udp_address:
|
||||||
self.udp_thread = utils.spawn_thread(self.start_udp)
|
self.udp_thread = utils.spawn_thread(self.start_udp)
|
||||||
|
|
||||||
transport = messaging.get_transport(optional=True)
|
transport = messaging.get_transport(self.conf, optional=True)
|
||||||
if transport:
|
if transport:
|
||||||
if list(self.meter_manager):
|
if list(self.meter_manager):
|
||||||
sample_target = oslo_messaging.Target(
|
sample_target = oslo_messaging.Target(
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_messaging import serializer as oslo_serializer
|
from oslo_messaging import serializer as oslo_serializer
|
||||||
|
|
||||||
@ -25,14 +24,14 @@ def setup():
|
|||||||
oslo_messaging.set_transport_defaults('ceilometer')
|
oslo_messaging.set_transport_defaults('ceilometer')
|
||||||
|
|
||||||
|
|
||||||
def get_transport(url=None, optional=False, cache=True):
|
def get_transport(conf, url=None, optional=False, cache=True):
|
||||||
"""Initialise the oslo_messaging layer."""
|
"""Initialise the oslo_messaging layer."""
|
||||||
global TRANSPORTS, DEFAULT_URL
|
global TRANSPORTS, DEFAULT_URL
|
||||||
cache_key = url or DEFAULT_URL
|
cache_key = url or DEFAULT_URL
|
||||||
transport = TRANSPORTS.get(cache_key)
|
transport = TRANSPORTS.get(cache_key)
|
||||||
if not transport or not cache:
|
if not transport or not cache:
|
||||||
try:
|
try:
|
||||||
transport = oslo_messaging.get_transport(cfg.CONF, url)
|
transport = oslo_messaging.get_transport(conf, url)
|
||||||
except (oslo_messaging.InvalidTransportURL,
|
except (oslo_messaging.InvalidTransportURL,
|
||||||
oslo_messaging.DriverLoadFailure):
|
oslo_messaging.DriverLoadFailure):
|
||||||
if not optional or url:
|
if not optional or url:
|
||||||
|
@ -160,7 +160,7 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
|
|
||||||
self.event_pipeline_manager = pipeline.setup_event_pipeline()
|
self.event_pipeline_manager = pipeline.setup_event_pipeline()
|
||||||
|
|
||||||
self.transport = messaging.get_transport()
|
self.transport = messaging.get_transport(cfg.CONF)
|
||||||
|
|
||||||
if cfg.CONF.notification.workload_partitioning:
|
if cfg.CONF.notification.workload_partitioning:
|
||||||
self.group_id = self.NOTIFICATION_NAMESPACE
|
self.group_id = self.NOTIFICATION_NAMESPACE
|
||||||
@ -252,7 +252,7 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
|
|
||||||
urls = cfg.CONF.notification.messaging_urls or [None]
|
urls = cfg.CONF.notification.messaging_urls or [None]
|
||||||
for url in urls:
|
for url in urls:
|
||||||
transport = messaging.get_transport(url)
|
transport = messaging.get_transport(cfg.CONF, url)
|
||||||
# NOTE(gordc): ignore batching as we want pull
|
# NOTE(gordc): ignore batching as we want pull
|
||||||
# to maintain sequencing as much as possible.
|
# to maintain sequencing as much as possible.
|
||||||
listener = messaging.get_batch_notification_listener(
|
listener = messaging.get_batch_notification_listener(
|
||||||
@ -271,7 +271,7 @@ class NotificationService(service_base.PipelineBasedService):
|
|||||||
def _configure_pipeline_listener(self):
|
def _configure_pipeline_listener(self):
|
||||||
ev_pipes = self.event_pipeline_manager.pipelines
|
ev_pipes = self.event_pipeline_manager.pipelines
|
||||||
pipelines = self.pipeline_manager.pipelines + ev_pipes
|
pipelines = self.pipeline_manager.pipelines + ev_pipes
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport(cfg.CONF)
|
||||||
partitioned = self.partition_coordinator.extract_my_subset(
|
partitioned = self.partition_coordinator.extract_my_subset(
|
||||||
self.group_id,
|
self.group_id,
|
||||||
range(cfg.CONF.notification.pipeline_processing_queues))
|
range(cfg.CONF.notification.pipeline_processing_queues))
|
||||||
|
@ -201,7 +201,7 @@ class NotifierPublisher(MessagingPublisher):
|
|||||||
urlparse.urlencode(options, True),
|
urlparse.urlencode(options, True),
|
||||||
parsed_url.fragment])
|
parsed_url.fragment])
|
||||||
self.notifier = oslo_messaging.Notifier(
|
self.notifier = oslo_messaging.Notifier(
|
||||||
messaging.get_transport(url),
|
messaging.get_transport(cfg.CONF, url),
|
||||||
driver=cfg.CONF.publisher_notifier.telemetry_driver,
|
driver=cfg.CONF.publisher_notifier.telemetry_driver,
|
||||||
publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
|
publisher_id='telemetry.publisher.%s' % cfg.CONF.host,
|
||||||
topics=topic,
|
topics=topic,
|
||||||
|
@ -38,7 +38,7 @@ class BaseTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
# NOTE(sileht): Ensure a new oslo.messaging driver is loaded
|
# NOTE(sileht): Ensure a new oslo.messaging driver is loaded
|
||||||
# between each tests
|
# between each tests
|
||||||
self.transport = messaging.get_transport("fake://", cache=False)
|
self.transport = messaging.get_transport(conf, "fake://", cache=False)
|
||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'ceilometer.messaging.get_transport',
|
'ceilometer.messaging.get_transport',
|
||||||
return_value=self.transport))
|
return_value=self.transport))
|
||||||
|
@ -121,24 +121,24 @@ class NotifierOnlyPublisherTest(BasePublisherTestCase):
|
|||||||
def test_publish_other_host(self, cgt):
|
def test_publish_other_host(self, cgt):
|
||||||
msg_publisher.SampleNotifierPublisher(
|
msg_publisher.SampleNotifierPublisher(
|
||||||
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234'))
|
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234'))
|
||||||
cgt.assert_called_with('rabbit://foo:foo@127.0.0.1:1234')
|
cgt.assert_called_with(self.CONF, 'rabbit://foo:foo@127.0.0.1:1234')
|
||||||
|
|
||||||
msg_publisher.EventNotifierPublisher(
|
msg_publisher.EventNotifierPublisher(
|
||||||
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234'))
|
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234'))
|
||||||
cgt.assert_called_with('rabbit://foo:foo@127.0.0.1:1234')
|
cgt.assert_called_with(self.CONF, 'rabbit://foo:foo@127.0.0.1:1234')
|
||||||
|
|
||||||
@mock.patch('ceilometer.messaging.get_transport')
|
@mock.patch('ceilometer.messaging.get_transport')
|
||||||
def test_publish_other_host_vhost_and_query(self, cgt):
|
def test_publish_other_host_vhost_and_query(self, cgt):
|
||||||
msg_publisher.SampleNotifierPublisher(
|
msg_publisher.SampleNotifierPublisher(
|
||||||
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234/foo'
|
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234/foo'
|
||||||
'?driver=amqp&amqp_auto_delete=true'))
|
'?driver=amqp&amqp_auto_delete=true'))
|
||||||
cgt.assert_called_with('amqp://foo:foo@127.0.0.1:1234/foo'
|
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
|
||||||
'?amqp_auto_delete=true')
|
'?amqp_auto_delete=true')
|
||||||
|
|
||||||
msg_publisher.EventNotifierPublisher(
|
msg_publisher.EventNotifierPublisher(
|
||||||
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234/foo'
|
netutils.urlsplit('notifier://foo:foo@127.0.0.1:1234/foo'
|
||||||
'?driver=amqp&amqp_auto_delete=true'))
|
'?driver=amqp&amqp_auto_delete=true'))
|
||||||
cgt.assert_called_with('amqp://foo:foo@127.0.0.1:1234/foo'
|
cgt.assert_called_with(self.CONF, 'amqp://foo:foo@127.0.0.1:1234/foo'
|
||||||
'?amqp_auto_delete=true')
|
'?amqp_auto_delete=true')
|
||||||
|
|
||||||
|
|
||||||
|
@ -27,39 +27,40 @@ class MessagingTests(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_get_transport_invalid_url(self):
|
def test_get_transport_invalid_url(self):
|
||||||
self.assertRaises(oslo_messaging.InvalidTransportURL,
|
self.assertRaises(oslo_messaging.InvalidTransportURL,
|
||||||
messaging.get_transport, "notvalid!")
|
messaging.get_transport, self.CONF, "notvalid!")
|
||||||
|
|
||||||
def test_get_transport_url_caching(self):
|
def test_get_transport_url_caching(self):
|
||||||
t1 = messaging.get_transport('fake://')
|
t1 = messaging.get_transport(self.CONF, 'fake://')
|
||||||
t2 = messaging.get_transport('fake://')
|
t2 = messaging.get_transport(self.CONF, 'fake://')
|
||||||
self.assertEqual(t1, t2)
|
self.assertEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_default_url_caching(self):
|
def test_get_transport_default_url_caching(self):
|
||||||
t1 = messaging.get_transport()
|
t1 = messaging.get_transport(self.CONF)
|
||||||
t2 = messaging.get_transport()
|
t2 = messaging.get_transport(self.CONF)
|
||||||
self.assertEqual(t1, t2)
|
self.assertEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_default_url_no_caching(self):
|
def test_get_transport_default_url_no_caching(self):
|
||||||
t1 = messaging.get_transport(cache=False)
|
t1 = messaging.get_transport(self.CONF, cache=False)
|
||||||
t2 = messaging.get_transport(cache=False)
|
t2 = messaging.get_transport(self.CONF, cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_url_no_caching(self):
|
def test_get_transport_url_no_caching(self):
|
||||||
t1 = messaging.get_transport('fake://', cache=False)
|
t1 = messaging.get_transport(self.CONF, 'fake://', cache=False)
|
||||||
t2 = messaging.get_transport('fake://', cache=False)
|
t2 = messaging.get_transport(self.CONF, 'fake://', cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_default_url_caching_mix(self):
|
def test_get_transport_default_url_caching_mix(self):
|
||||||
t1 = messaging.get_transport()
|
t1 = messaging.get_transport(self.CONF)
|
||||||
t2 = messaging.get_transport(cache=False)
|
t2 = messaging.get_transport(self.CONF, cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_url_caching_mix(self):
|
def test_get_transport_url_caching_mix(self):
|
||||||
t1 = messaging.get_transport('fake://')
|
t1 = messaging.get_transport(self.CONF, 'fake://')
|
||||||
t2 = messaging.get_transport('fake://', cache=False)
|
t2 = messaging.get_transport(self.CONF, 'fake://', cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_optional(self):
|
def test_get_transport_optional(self):
|
||||||
self.CONF.set_override('transport_url', 'non-url')
|
self.CONF.set_override('transport_url', 'non-url')
|
||||||
self.assertIsNone(messaging.get_transport(optional=True,
|
self.assertIsNone(messaging.get_transport(self.CONF,
|
||||||
|
optional=True,
|
||||||
cache=False))
|
cache=False))
|
||||||
|
@ -43,9 +43,9 @@ def send_batch_notifier(notifier, topic, batch):
|
|||||||
|
|
||||||
|
|
||||||
def get_notifier(config_file):
|
def get_notifier(config_file):
|
||||||
service.prepare_service(argv=['/', '--config-file', config_file])
|
conf = service.prepare_service(argv=['/', '--config-file', config_file])
|
||||||
return oslo_messaging.Notifier(
|
return oslo_messaging.Notifier(
|
||||||
messaging.get_transport(),
|
messaging.get_transport(conf),
|
||||||
driver='messagingv2',
|
driver='messagingv2',
|
||||||
publisher_id='telemetry.publisher.test',
|
publisher_id='telemetry.publisher.test',
|
||||||
topics=['metering'],
|
topics=['metering'],
|
||||||
|
Loading…
Reference in New Issue
Block a user