Add periodic leader election for the cleanup sync with Ironic

Only the elected leader of the conductor group syncs the node cache with
Ironic. Elections are re-run every ``leader_election_interval`` seconds.

Review notes for this revision of the patch:
* Line structure restored; the patch had been collapsed onto a few lines.
  Continuation-line indentation in context lines is reconstructed, so
  applying with a whitespace-tolerant option may be needed.
* run_elect_coordinator() no longer repeats the "backend does not support
  leader elections" WARNING on every periodic run; _join_election() warns
  once, later runs log at DEBUG.
* leader_election_interval gets min=1, since the value is used as the
  spacing of a periodic task and must be positive.
* Redundant bare "return" dropped from periodic_leader_election();
  docstrings added to the new and changed functions.
* Release note now ends with a newline.
* Post-image blob ids on the "index" lines are those of the previous
  revision and were not recomputed.

diff --git a/ironic_inspector/common/coordination.py b/ironic_inspector/common/coordination.py
index b163be4a9..01d4e4d52 100644
--- a/ironic_inspector/common/coordination.py
+++ b/ironic_inspector/common/coordination.py
@@ -14,6 +14,7 @@
 from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log
+import tooz
 from tooz import coordination
 
 from ironic_inspector import utils
@@ -42,6 +43,8 @@ class Coordinator(object):
         self.coordinator = None
         self.started = False
         self.prefix = prefix if prefix else 'default'
+        self.is_leader = False
+        self.supports_election = True
 
     def start(self, heartbeat=True):
         """Start coordinator.
@@ -85,6 +88,28 @@ class Coordinator(object):
         except coordination.GroupAlreadyExist:
             LOG.debug('Group %s already exists.', self.group_name)
 
+    def _join_election(self):
+        """Watch for group leadership and trigger an election.
+
+        Falls back to acting as leader if the backend lacks elections.
+        """
+        self.is_leader = False
+
+        def _when_elected(event):
+            LOG.info('This conductor instance is a group leader now.')
+            self.is_leader = True
+
+        try:
+            self.coordinator.watch_elected_as_leader(
+                self.group_name, _when_elected)
+            self.coordinator.run_elect_coordinator()
+        except tooz.NotImplemented:
+            LOG.warning('The coordination backend does not support leader '
+                        'elections, assuming we are a leader. This is '
+                        'deprecated, please use a supported backend.')
+            self.is_leader = True
+            self.supports_election = False
+
     def join_group(self):
         """Join service group."""
         self._validate_state()
@@ -97,6 +122,8 @@ class Coordinator(object):
             request.get()
         except coordination.MemberAlreadyExist:
             pass
+
+        self._join_election()
         LOG.debug('Joined group %s', self.group_name)
 
     def leave_group(self):
@@ -125,6 +152,19 @@ class Coordinator(object):
         lock_name = (self.lock_prefix + uuid).encode('ascii')
         return self.coordinator.get_lock(lock_name)
 
+    def run_elect_coordinator(self):
+        """Trigger a new leader election."""
+        if self.supports_election:
+            LOG.debug('Starting leader election')
+            self.coordinator.run_elect_coordinator()
+            LOG.debug('Finished leader election')
+        else:
+            # _join_election() already warned about the unsupported backend;
+            # do not repeat the warning on every periodic run.
+            LOG.debug('The coordination backend does not support leader '
+                      'elections, assuming we are a leader.')
+            self.is_leader = True
+
 
 _COORDINATOR = None
 
diff --git a/ironic_inspector/conductor/manager.py b/ironic_inspector/conductor/manager.py
index 884ad4883..ecd608bd5 100644
--- a/ironic_inspector/conductor/manager.py
+++ b/ironic_inspector/conductor/manager.py
@@ -51,6 +51,7 @@ class ConductorManager(object):
         self._periodics_worker = None
         self._zeroconf = None
         self._shutting_down = semaphore.Semaphore()
+        self.coordinator = None
 
     def init_host(self):
         """Initialize Worker host
@@ -70,6 +71,24 @@ class ConductorManager(object):
 
         db.init()
 
+        self.coordinator = None
+        try:
+            self.coordinator = coordination.get_coordinator(prefix='conductor')
+            self.coordinator.start(heartbeat=True)
+            self.coordinator.join_group()
+        except Exception as exc:
+            if CONF.standalone:
+                LOG.info('Coordination backend cannot be started, assuming '
+                         'no other instances are running. Error: %s', exc)
+                self.coordinator = None
+            else:
+                with excutils.save_and_reraise_exception():
+                    LOG.critical('Failure when connecting to coordination '
+                                 'backend', exc_info=True)
+                    self.del_host()
+        else:
+            LOG.info('Successfully connected to coordination backend.')
+
         try:
             hooks = plugins_base.validate_processing_hooks()
         except Exception as exc:
@@ -91,11 +110,20 @@ class ConductorManager(object):
         )(sync_with_ironic)
 
         callables = [(periodic_clean_up_, None, None),
-                     (sync_with_ironic_, None, None)]
+                     (sync_with_ironic_, (self,), None)]
+
         driver_task = driver.get_periodic_sync_task()
         if driver_task is not None:
             callables.append((driver_task, None, None))
 
+        # run elections periodically if we have a coordinator
+        # that we were able to start
+        if self.coordinator is not None and self.coordinator.started:
+            periodic_leader_election_ = periodics.periodic(
+                spacing=CONF.leader_election_interval
+            )(periodic_leader_election)
+            callables.append((periodic_leader_election_, (self,), None))
+
         self._periodics_worker = periodics.PeriodicWorker(
             callables=callables,
             executor_factory=periodics.ExistingExecutor(utils.executor()),
@@ -109,28 +137,14 @@ class ConductorManager(object):
             self._zeroconf.register_service('baremetal-introspection',
                                             endpoint)
 
-        if not CONF.standalone:
-            try:
-                coordinator = coordination.get_coordinator(prefix='conductor')
-                coordinator.start(heartbeat=True)
-                coordinator.join_group()
-            except tooz.ToozError:
-                with excutils.save_and_reraise_exception():
-                    LOG.critical('Failed when connecting to coordination '
-                                 'backend.')
-                    self.del_host()
-            else:
-                LOG.info('Successfully connected to coordination backend.')
-
     def del_host(self):
         """Shutdown the ironic inspector conductor service."""
 
-        if not CONF.standalone:
+        if self.coordinator is not None:
             try:
-                coordinator = coordination.get_coordinator(prefix='conductor')
-                if coordinator.started:
-                    coordinator.leave_group()
-                    coordinator.stop()
+                if self.coordinator.started:
+                    self.coordinator.leave_group()
+                    self.coordinator.stop()
             except tooz.ToozError:
                 LOG.exception('Failed to stop coordinator')
 
@@ -201,9 +215,31 @@ def periodic_clean_up():  # pragma: no cover
         pxe_filter.driver().sync(ir_utils.get_client())
 
 
-def sync_with_ironic():
+def sync_with_ironic(conductor):
+    """Delete cached nodes whose UUIDs are not reported by Ironic.
+
+    Skipped when the conductor has a coordinator but is not the leader.
+
+    :param conductor: the ConductorManager running this periodic task.
+    """
+    if (conductor.coordinator is not None
+            and not conductor.coordinator.is_leader):
+        LOG.debug('The conductor is not a leader, skipping syncing '
+                  'with ironic')
+        return
+
+    LOG.debug('Syncing with ironic')
     ironic = ir_utils.get_client()
     # TODO(yuikotakada): pagination
     ironic_nodes = ironic.nodes(fields=["uuid"], limit=None)
     ironic_node_uuids = {node.id for node in ironic_nodes}
     node_cache.delete_nodes_not_in_list(ironic_node_uuids)
+
+
+def periodic_leader_election(conductor):
+    """Trigger a leader election if the conductor has a coordinator.
+
+    :param conductor: the ConductorManager running this periodic task.
+    """
+    if conductor.coordinator is not None:
+        conductor.coordinator.run_elect_coordinator()
diff --git a/ironic_inspector/conf/default.py b/ironic_inspector/conf/default.py
index 0bda7969b..f34ed74f7 100644
--- a/ironic_inspector/conf/default.py
+++ b/ironic_inspector/conf/default.py
@@ -64,6 +64,10 @@ _OPTS = [
                       'Not advisable if the deployment uses a PXE filter, '
                       'and will result in the ironic-inspector ceasing '
                       'periodic cleanup activities.')),
+    cfg.IntOpt('leader_election_interval',
+               default=10,
+               min=1,
+               help=_('Interval (in seconds) between leader elections.')),
     cfg.BoolOpt('use_ssl',
                 default=False,
                 help=_('SSL Enabled/Disabled')),
diff --git a/ironic_inspector/test/unit/test_manager.py b/ironic_inspector/test/unit/test_manager.py
index 77453c667..c76e26e6a 100644
--- a/ironic_inspector/test/unit/test_manager.py
+++ b/ironic_inspector/test/unit/test_manager.py
@@ -79,15 +79,21 @@ class TestManagerInitHost(BaseManagerTest):
         self.mock_executor.return_value.submit.assert_called_once_with(
             self.manager._periodics_worker.start)
 
-    def test_no_introspection_data_store(self):
+    @mock.patch.object(coordination, 'get_coordinator', autospec=True)
+    def test_no_introspection_data_store(self, mock_get_coord):
         CONF.set_override('store_data', 'none', 'processing')
+        mock_coordinator = mock.MagicMock()
+        mock_get_coord.return_value = mock_coordinator
         self.manager.init_host()
         self.mock_log.warning.assert_called_once_with(
             'Introspection data will not be stored. Change "[processing] '
             'store_data" option if this is not the desired behavior')
 
+    @mock.patch.object(coordination, 'get_coordinator', autospec=True)
     @mock.patch.object(mdns, 'Zeroconf', autospec=True)
-    def test_init_host(self, mock_zc):
+    def test_init_host(self, mock_zc, mock_get_coord):
+        mock_coordinator = mock.MagicMock()
+        mock_get_coord.return_value = mock_coordinator
         self.manager.init_host()
         self.mock_db_init.assert_called_once_with()
         self.mock_validate_processing_hooks.assert_called_once_with()
@@ -112,10 +118,13 @@ class TestManagerInitHost(BaseManagerTest):
         self.mock_exit.assert_called_once_with(1)
         self.mock_filter.init_filter.assert_not_called()
 
+    @mock.patch.object(coordination, 'get_coordinator', autospec=True)
     @mock.patch.object(mdns, 'Zeroconf', autospec=True)
     @mock.patch.object(keystone, 'get_endpoint', autospec=True)
-    def test_init_host_with_mdns(self, mock_endpoint, mock_zc):
+    def test_init_host_with_mdns(self, mock_endpoint, mock_zc, mock_get_coord):
         CONF.set_override('enable_mdns', True)
+        mock_coordinator = mock.MagicMock()
+        mock_get_coord.return_value = mock_coordinator
         self.manager.init_host()
         self.mock_db_init.assert_called_once_with()
         self.mock_validate_processing_hooks.assert_called_once_with()
@@ -149,9 +158,9 @@ class TestManagerInitHost(BaseManagerTest):
                                       None)
         self.assertRaises(tooz.ToozError, self.manager.init_host)
         self.mock_db_init.assert_called_once_with()
-        self.mock_validate_processing_hooks.assert_called_once_with()
-        self.mock_filter.init_filter.assert_called_once_with()
-        self.assert_periodics()
+        self.mock_validate_processing_hooks.assert_not_called()
+        self.mock_filter.init_filter.assert_not_called()
+        self.assertIsNone(self.manager._periodics_worker)
         mock_get_coord.assert_called_once_with(prefix='conductor')
         mock_del_host.assert_called_once_with(self.manager)
 
diff --git a/releasenotes/notes/leader-election-c6692d9962f30ad1.yaml b/releasenotes/notes/leader-election-c6692d9962f30ad1.yaml
new file mode 100644
index 000000000..814a739b4
--- /dev/null
+++ b/releasenotes/notes/leader-election-c6692d9962f30ad1.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    Adds periodic leader election for the cleanup sync with Ironic.
+    The election interval is configured by the new
+    ``leader_election_interval`` config option.