reset listeners on agent refresh

each time an agent joins/leaves group, we kill and refresh the list
of pipeline listeners each agent has. this list previously did not
clear the killed listeners so the list of listeners of an agent
would grow with a bunch of dead listeners. this patch removes
references to the dead listeners.

Change-Id: If4c9e20bca9643d9a30bb1a6c95e252a4ca21bd7
Closes-Bug: #1406405
This commit is contained in:
gordon chung
2014-12-29 17:16:29 -05:00
parent fe61ea9aa5
commit 24503c3987
2 changed files with 20 additions and 13 deletions

View File

@@ -175,19 +175,19 @@ class NotificationService(os_service.Service):
self._configure_pipeline_listeners()
def _configure_pipeline_listeners(self):
if cfg.CONF.notification.workload_partitioning:
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id, self.pipeline_manager.pipelines)
transport = messaging.get_transport()
for pipe in partitioned:
LOG.debug(_('Pipeline endpoint: %s'), pipe.name)
listener = messaging.get_notification_listener(
transport,
[oslo.messaging.Target(
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))],
[pipeline.PipelineEndpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
self.pipeline_listeners = []
partitioned = self.partition_coordinator.extract_my_subset(
self.group_id, self.pipeline_manager.pipelines)
transport = messaging.get_transport()
for pipe in partitioned:
LOG.debug(_('Pipeline endpoint: %s'), pipe.name)
listener = messaging.get_notification_listener(
transport,
[oslo.messaging.Target(
topic='%s-%s' % (self.NOTIFICATION_IPC, pipe.name))],
[pipeline.PipelineEndpoint(self.ctxt, pipe)])
listener.start()
self.pipeline_listeners.append(listener)
def stop(self):
self.partition_coordinator.leave_group(self.group_id)

View File

@@ -262,3 +262,10 @@ class TestRealNotificationHA(BaseRealNotification):
fake_coord1.extract_my_subset.side_effect = lambda x, y: y
fake_coord.return_value = fake_coord1
self._check_notification_service()
def test_reset_listeners_on_refresh(self):
self.srv.start()
self.assertEqual(1, len(self.srv.pipeline_listeners))
self.srv._refresh_agent(None)
self.assertEqual(1, len(self.srv.pipeline_listeners))
self.srv.stop()