Handle multiple listeners
The collector service can now receive notifications from multiple RabbitMQ servers. Change-Id: Ie06cae29ed929ea46397a76796a6b575a859bce7
This commit is contained in:
parent
3d8c3d5f68
commit
54b56940db
|
@ -12,24 +12,36 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MessagingFactory(object):
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
|
||||
def _get_transport(self):
|
||||
return oslo_messaging.get_notification_transport(self.config, url=self.config.collector.transport_url)
|
||||
|
||||
def get_listener(self, handler):
|
||||
def get_listeners(self, handler):
|
||||
listeners = []
|
||||
targets = [
|
||||
oslo_messaging.Target(topic=self.config.collector.topic),
|
||||
]
|
||||
|
||||
return oslo_messaging.get_notification_listener(self._get_transport(), targets,
|
||||
endpoints=[handler], executor='threading')
|
||||
for url in self.config.collector.transport_url:
|
||||
LOG.debug('Creating listener for %s', url)
|
||||
transport = self._get_transport(url)
|
||||
listeners.append(oslo_messaging.get_notification_listener(transport=transport,
|
||||
targets=targets,
|
||||
endpoints=[handler],
|
||||
executor='threading'))
|
||||
return listeners
|
||||
|
||||
def get_notifier(self):
|
||||
return oslo_messaging.Notifier(self._get_transport(), publisher_id='almanach.collector',
|
||||
transport = self._get_transport(self.config.collector.transport_url[0])
|
||||
return oslo_messaging.Notifier(transport, publisher_id='almanach.collector',
|
||||
topic=self.config.collector.topic, driver='messagingv2')
|
||||
|
||||
def _get_transport(self, url):
|
||||
return oslo_messaging.get_notification_transport(self.config, url=url)
|
||||
|
|
|
@ -26,20 +26,22 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
class CollectorService(service.ServiceBase):
|
||||
|
||||
def __init__(self, listener):
|
||||
def __init__(self, listeners):
|
||||
super(CollectorService, self).__init__()
|
||||
self.listener = listener
|
||||
self.listeners = listeners
|
||||
|
||||
def start(self):
|
||||
LOG.info('Starting collector service...')
|
||||
self.listener.start()
|
||||
LOG.info('Starting collector listeners...')
|
||||
for listener in self.listeners:
|
||||
listener.start()
|
||||
|
||||
def wait(self):
|
||||
LOG.info('Waiting...')
|
||||
|
||||
def stop(self):
|
||||
LOG.info('Graceful shutdown of the collector service...')
|
||||
self.listener.stop()
|
||||
for listener in self.listeners:
|
||||
listener.stop()
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
@ -59,8 +61,8 @@ class ServiceFactory(object):
|
|||
notification_handler.add_event_handler(self._get_volume_handler())
|
||||
notification_handler.add_event_handler(self._get_volume_type_handler())
|
||||
|
||||
listener = messaging_factory.get_listener(notification_handler)
|
||||
return CollectorService(listener)
|
||||
listeners = messaging_factory.get_listeners(notification_handler)
|
||||
return CollectorService(listeners)
|
||||
|
||||
def _get_instance_handler(self):
|
||||
return instance_handler.InstanceHandler(self.core_factory.get_instance_controller())
|
||||
|
|
|
@ -39,10 +39,10 @@ api_opts = [
|
|||
]
|
||||
|
||||
collector_opts = [
|
||||
cfg.StrOpt('transport_url',
|
||||
secret=True,
|
||||
default='rabbit://guest:guest@localhost:5672',
|
||||
help='AMQP connection URL'),
|
||||
cfg.ListOpt('transport_url',
|
||||
secret=True,
|
||||
default='rabbit://guest:guest@localhost:5672',
|
||||
help='AMQP connection URL'),
|
||||
cfg.StrOpt('topic',
|
||||
default='almanach',
|
||||
help='AMQP topic used for OpenStack notifications'),
|
||||
|
|
|
@ -25,8 +25,16 @@ class TestMessagingFactory(base.BaseTestCase):
|
|||
super(TestMessagingFactory, self).setUp()
|
||||
self.factory = messaging.MessagingFactory(self.config)
|
||||
|
||||
def test_get_listener(self):
|
||||
self.assertIsNotNone(self.factory.get_listener(mock.Mock()))
|
||||
def test_get_listeners_with_one_listener(self):
|
||||
self.config.collector.transport_url = ['rabbit://guest:guest@localhost:5672']
|
||||
listeners = self.factory.get_listeners(mock.Mock())
|
||||
self.assertEqual(1, len(listeners))
|
||||
|
||||
def test_get_listeners_with_multiple_listener(self):
|
||||
self.config.collector.transport_url = ['rabbit://guest:guest@localhost:5672',
|
||||
'rabbit://guest:guest@localhost:5673']
|
||||
listeners = self.factory.get_listeners(mock.Mock())
|
||||
self.assertEqual(2, len(listeners))
|
||||
|
||||
def test_get_notifier(self):
|
||||
self.assertIsInstance(self.factory.get_notifier(), oslo_messaging.Notifier)
|
||||
|
|
Loading…
Reference in New Issue