Merge "Allow periodics to terminate inspector"
This commit is contained in:
commit
65485e392a
@ -182,15 +182,27 @@ class BaseFilter(interface.FilterDriver):
|
||||
def get_periodic_sync_task(self):
|
||||
"""Get periodic sync task for the filter.
|
||||
|
||||
The periodic task returned is casting the InvalidFilterDriverState
|
||||
to the periodics.NeverAgain exception to quit looping.
|
||||
|
||||
:raises: periodics.NeverAgain
|
||||
:returns: a periodic task to be run in the background.
|
||||
"""
|
||||
ironic = ir_utils.get_client()
|
||||
|
||||
def periodic_sync_task():
|
||||
try:
|
||||
self.sync(ironic)
|
||||
except InvalidFilterDriverState as e:
|
||||
LOG.warning('Filter driver %s disabling periodic sync '
|
||||
'task because of an invalid state.', self)
|
||||
raise periodics.NeverAgain(e)
|
||||
|
||||
return periodics.periodic(
|
||||
# NOTE(milan): the periodic decorator doesn't support 0 as
|
||||
# a spacing value of (a switched off) periodic
|
||||
spacing=CONF.pxe_filter.sync_period or float('inf'),
|
||||
enabled=bool(CONF.pxe_filter.sync_period))(
|
||||
lambda: self.sync(ironic))
|
||||
enabled=bool(CONF.pxe_filter.sync_period))(periodic_sync_task)
|
||||
|
||||
|
||||
class NoopFilter(BaseFilter):
|
||||
|
@ -231,6 +231,29 @@ class TestBaseFilterInterface(BaseFilterBaseTest):
|
||||
self.mock_periodic.return_value.call_args[0][0]()
|
||||
sync_mock.assert_called_once_with(self.mock_get_client.return_value)
|
||||
|
||||
def test_get_periodic_sync_task_invalid_state(self):
|
||||
sync_mock = self.useFixture(
|
||||
fixtures.MockPatchObject(self.driver, 'sync')).mock
|
||||
sync_mock.side_effect = pxe_filter.InvalidFilterDriverState('Oops!')
|
||||
|
||||
self.driver.get_periodic_sync_task()
|
||||
self.mock_periodic.assert_called_once_with(spacing=15, enabled=True)
|
||||
self.assertRaisesRegex(periodics.NeverAgain, 'Oops!',
|
||||
self.mock_periodic.return_value.call_args[0][0])
|
||||
|
||||
def test_get_periodic_sync_task_custom_error(self):
|
||||
class MyError(Exception):
|
||||
pass
|
||||
|
||||
sync_mock = self.useFixture(
|
||||
fixtures.MockPatchObject(self.driver, 'sync')).mock
|
||||
sync_mock.side_effect = MyError('Oops!')
|
||||
|
||||
self.driver.get_periodic_sync_task()
|
||||
self.mock_periodic.assert_called_once_with(spacing=15, enabled=True)
|
||||
self.assertRaisesRegex(
|
||||
MyError, 'Oops!', self.mock_periodic.return_value.call_args[0][0])
|
||||
|
||||
def test_get_periodic_sync_task_disabled(self):
|
||||
CONF.set_override('sync_period', 0, 'pxe_filter')
|
||||
self.driver.get_periodic_sync_task()
|
||||
|
@ -20,64 +20,338 @@ import fixtures
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from ironic_inspector import db
|
||||
from ironic_inspector import firewall
|
||||
from ironic_inspector import main
|
||||
from ironic_inspector.plugins import base as plugins_base
|
||||
from ironic_inspector.test import base as test_base
|
||||
from ironic_inspector import utils
|
||||
from ironic_inspector import wsgi_service
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
@mock.patch.object(firewall, 'clean_up', lambda: None)
|
||||
@mock.patch.object(db, 'init', lambda: None)
|
||||
@mock.patch.object(wsgi_service.WSGIService, '_init_host', lambda x: None)
|
||||
@mock.patch.object(utils, 'add_auth_middleware')
|
||||
class TestWSGIService(test_base.BaseTest):
|
||||
class BaseWSGITest(test_base.BaseTest):
|
||||
def setUp(self):
|
||||
super(TestWSGIService, self).setUp()
|
||||
# generic mocks setUp method
|
||||
super(BaseWSGITest, self).setUp()
|
||||
self.app = self.useFixture(fixtures.MockPatchObject(
|
||||
main, 'app', autospec=True)).mock
|
||||
wsgi_service.app, 'app', autospec=True)).mock
|
||||
self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.semaphore, 'Semaphore', autospec=True))
|
||||
.mock.return_value)
|
||||
self.mock__shutting_down.acquire.return_value = True
|
||||
self.mock_log = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service, 'LOG')).mock
|
||||
self.service = wsgi_service.WSGIService()
|
||||
|
||||
def test_init_middleware(self, mock_auth):
|
||||
|
||||
class TestWSGIServiceInitMiddleware(BaseWSGITest):
|
||||
def setUp(self):
|
||||
super(TestWSGIServiceInitMiddleware, self).setUp()
|
||||
self.mock_add_auth_middleware = self.useFixture(
|
||||
fixtures.MockPatchObject(wsgi_service.utils,
|
||||
'add_auth_middleware')).mock
|
||||
self.mock_add_cors_middleware = self.useFixture(
|
||||
fixtures.MockPatchObject(wsgi_service.utils,
|
||||
'add_cors_middleware')).mock
|
||||
# 'positive' settings
|
||||
CONF.set_override('auth_strategy', 'keystone')
|
||||
CONF.set_override('store_data', 'swift', 'processing')
|
||||
|
||||
def test_init_middleware(self):
|
||||
self.service._init_middleware()
|
||||
|
||||
mock_auth.assert_called_once_with(self.app)
|
||||
|
||||
@mock.patch.object(wsgi_service.WSGIService, '_init_middleware')
|
||||
def test_run_ok(self, mock_init_middlw, mock_auth):
|
||||
self.service.run()
|
||||
|
||||
mock_init_middlw.assert_called_once_with()
|
||||
self.app.run.assert_called_once_with(host='0.0.0.0', port=5050)
|
||||
|
||||
@mock.patch.object(wsgi_service.LOG, 'info')
|
||||
def test_init_with_swift_storage(self, mock_log, mock_auth):
|
||||
|
||||
CONF.set_override('store_data', 'swift', 'processing')
|
||||
msg = mock.call('Introspection data will be stored in Swift in the '
|
||||
'container %s', CONF.swift.container)
|
||||
self.service.run()
|
||||
self.assertIn(msg, mock_log.call_args_list)
|
||||
|
||||
def test_init_without_authenticate(self, mock_auth):
|
||||
self.mock_add_auth_middleware.assert_called_once_with(self.app)
|
||||
self.mock_add_cors_middleware.assert_called_once_with(self.app)
|
||||
|
||||
def test_init_middleware_noauth(self):
|
||||
CONF.set_override('auth_strategy', 'noauth')
|
||||
self.service.run()
|
||||
self.assertFalse(mock_auth.called)
|
||||
self.service._init_middleware()
|
||||
|
||||
@mock.patch.object(wsgi_service.LOG, 'warning')
|
||||
def test_init_with_no_data_storage(self, mock_log, mock_auth):
|
||||
msg = ('Introspection data will not be stored. Change '
|
||||
'"[processing] store_data" option if this is not the '
|
||||
'desired behavior')
|
||||
self.mock_add_auth_middleware.assert_not_called()
|
||||
self.mock_log.warning.assert_called_once_with(
|
||||
'Starting unauthenticated, please check configuration')
|
||||
self.mock_add_cors_middleware.assert_called_once_with(self.app)
|
||||
|
||||
def test_init_middleware_no_store(self):
|
||||
CONF.set_override('store_data', 'none', 'processing')
|
||||
self.service._init_middleware()
|
||||
|
||||
self.mock_add_auth_middleware.assert_called_once_with(self.app)
|
||||
self.mock_log.warning.assert_called_once_with(
|
||||
'Introspection data will not be stored. Change "[processing] '
|
||||
'store_data" option if this is not the desired behavior')
|
||||
self.mock_add_cors_middleware.assert_called_once_with(self.app)
|
||||
|
||||
|
||||
class TestWSGIServiceInitHost(BaseWSGITest):
|
||||
def setUp(self):
|
||||
super(TestWSGIServiceInitHost, self).setUp()
|
||||
self.mock_db_init = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.db, 'init')).mock
|
||||
self.mock_validate_processing_hooks = self.useFixture(
|
||||
fixtures.MockPatchObject(wsgi_service.plugins_base,
|
||||
'validate_processing_hooks')).mock
|
||||
self.mock_firewall_init = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.firewall, 'init')).mock
|
||||
self.mock_periodic = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.periodics, 'periodic')).mock
|
||||
self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.periodics, 'PeriodicWorker')).mock
|
||||
self.mock_executor = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.utils, 'executor')).mock
|
||||
self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.periodics, 'ExistingExecutor')).mock
|
||||
self.mock_exit = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.sys, 'exit')).mock
|
||||
|
||||
# 'positive' settings
|
||||
CONF.set_override('manage_firewall', True, 'firewall')
|
||||
|
||||
def assert_periodics(self):
|
||||
outer_update_decorator_call = mock.call(
|
||||
spacing=CONF.firewall.firewall_update_period,
|
||||
enabled=CONF.firewall.manage_firewall)
|
||||
outer_cleanup_decorator_call = mock.call(
|
||||
spacing=CONF.clean_up_period)
|
||||
self.mock_periodic.assert_has_calls([
|
||||
outer_update_decorator_call,
|
||||
mock.call()(wsgi_service.firewall.update_filters),
|
||||
outer_cleanup_decorator_call,
|
||||
mock.call()(wsgi_service.periodic_clean_up)])
|
||||
|
||||
inner_decorator = self.mock_periodic.return_value
|
||||
inner_update_decorator_call = mock.call(
|
||||
wsgi_service.firewall.update_filters)
|
||||
inner_cleanup_decorator_call = mock.call(
|
||||
wsgi_service.periodic_clean_up)
|
||||
inner_decorator.assert_has_calls([inner_update_decorator_call,
|
||||
inner_cleanup_decorator_call])
|
||||
|
||||
self.mock_ExistingExecutor.assert_called_once_with(
|
||||
self.mock_executor.return_value)
|
||||
|
||||
periodic_worker = self.mock_PeriodicWorker.return_value
|
||||
|
||||
callables = [(inner_decorator.return_value, None, None),
|
||||
(inner_decorator.return_value, None, None)]
|
||||
self.mock_PeriodicWorker.assert_called_once_with(
|
||||
callables=callables,
|
||||
executor_factory=self.mock_ExistingExecutor.return_value,
|
||||
on_failure=self.service._periodics_watchdog)
|
||||
self.assertIs(periodic_worker, self.service._periodics_worker)
|
||||
|
||||
self.mock_executor.return_value.submit.assert_called_once_with(
|
||||
self.service._periodics_worker.start)
|
||||
|
||||
def test_init_host(self):
|
||||
self.service._init_host()
|
||||
|
||||
self.mock_db_init.asset_called_once_with()
|
||||
self.mock_validate_processing_hooks.assert_called_once_with()
|
||||
self.mock_firewall_init.assert_called_once_with()
|
||||
self.assert_periodics()
|
||||
|
||||
def test_init_host_validate_processing_hooks_exception(self):
|
||||
class MyError(Exception):
|
||||
pass
|
||||
|
||||
error = MyError('Oops!')
|
||||
self.mock_validate_processing_hooks.side_effect = error
|
||||
|
||||
# NOTE(milan): have to stop executing the test case at this point to
|
||||
# simulate a real sys.exit() call
|
||||
self.mock_exit.side_effect = SystemExit('Stop!')
|
||||
self.assertRaisesRegex(SystemExit, 'Stop!', self.service._init_host)
|
||||
|
||||
self.mock_db_init.assert_called_once_with()
|
||||
self.mock_log.critical.assert_called_once_with(str(error))
|
||||
self.mock_exit.assert_called_once_with(1)
|
||||
|
||||
def test_init_host_not_manage_firewall(self):
|
||||
CONF.set_override('manage_firewall', False, 'firewall')
|
||||
self.service._init_host()
|
||||
|
||||
self.mock_db_init.assert_called_once_with()
|
||||
self.mock_firewall_init.assert_not_called()
|
||||
self.assert_periodics()
|
||||
|
||||
|
||||
class TestWSGIServicePeriodicWatchDog(BaseWSGITest):
|
||||
def setUp(self):
|
||||
super(TestWSGIServicePeriodicWatchDog, self).setUp()
|
||||
self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.reflection, 'get_callable_name')).mock
|
||||
self.mock_spawn = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.eventlet, 'spawn')).mock
|
||||
|
||||
def test__periodics_watchdog(self):
|
||||
error = RuntimeError('Oops!')
|
||||
|
||||
self.service._periodics_watchdog(
|
||||
callable_=None, activity=None, spacing=None,
|
||||
exc_info=(None, error, None), traceback=None)
|
||||
|
||||
self.mock_get_callable_name.assert_called_once_with(None)
|
||||
self.mock_spawn.assert_called_once_with(self.service.shutdown,
|
||||
error=str(error))
|
||||
|
||||
|
||||
class TestWSGIServiceRun(BaseWSGITest):
|
||||
def setUp(self):
|
||||
super(TestWSGIServiceRun, self).setUp()
|
||||
self.mock__init_host = self.useFixture(fixtures.MockPatchObject(
|
||||
self.service, '_init_host')).mock
|
||||
self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject(
|
||||
self.service, '_init_middleware')).mock
|
||||
self.mock__create_ssl_context = self.useFixture(
|
||||
fixtures.MockPatchObject(self.service, '_create_ssl_context')).mock
|
||||
self.mock_shutdown = self.useFixture(fixtures.MockPatchObject(
|
||||
self.service, 'shutdown')).mock
|
||||
|
||||
# 'positive' settings
|
||||
CONF.set_override('listen_address', '42.42.42.42')
|
||||
CONF.set_override('listen_port', 42)
|
||||
|
||||
def test_run(self):
|
||||
self.service.run()
|
||||
mock_log.assert_called_once_with(msg)
|
||||
|
||||
self.mock__create_ssl_context.assert_called_once_with()
|
||||
self.mock__init_middleware.assert_called_once_with()
|
||||
self.mock__init_host.assert_called_once_with()
|
||||
self.app.run.assert_called_once_with(
|
||||
host=CONF.listen_address, port=CONF.listen_port,
|
||||
ssl_context=self.mock__create_ssl_context.return_value)
|
||||
self.mock_shutdown.assert_called_once_with()
|
||||
|
||||
def test_run_no_ssl_context(self):
|
||||
self.mock__create_ssl_context.return_value = None
|
||||
|
||||
self.service.run()
|
||||
self.mock__create_ssl_context.assert_called_once_with()
|
||||
self.mock__init_middleware.assert_called_once_with()
|
||||
self.mock__init_host.assert_called_once_with()
|
||||
self.app.run.assert_called_once_with(
|
||||
host=CONF.listen_address, port=CONF.listen_port)
|
||||
self.mock_shutdown.assert_called_once_with()
|
||||
|
||||
def test_run_app_error(self):
|
||||
class MyError(Exception):
|
||||
pass
|
||||
|
||||
error = MyError('Oops!')
|
||||
self.app.run.side_effect = error
|
||||
self.service.run()
|
||||
|
||||
self.mock__create_ssl_context.assert_called_once_with()
|
||||
self.mock__init_middleware.assert_called_once_with()
|
||||
self.mock__init_host.assert_called_once_with()
|
||||
self.app.run.assert_called_once_with(
|
||||
host=CONF.listen_address, port=CONF.listen_port,
|
||||
ssl_context=self.mock__create_ssl_context.return_value)
|
||||
self.mock_shutdown.assert_called_once_with(error=str(error))
|
||||
|
||||
|
||||
class TestWSGIServiceShutdown(BaseWSGITest):
|
||||
def setUp(self):
|
||||
super(TestWSGIServiceShutdown, self).setUp()
|
||||
self.mock_firewall_clean_up = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.firewall, 'clean_up')).mock
|
||||
self.mock_executor = mock.Mock()
|
||||
self.mock_executor.alive = True
|
||||
self.mock_get_executor = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.utils, 'executor')).mock
|
||||
self.mock_get_executor.return_value = self.mock_executor
|
||||
self.service = wsgi_service.WSGIService()
|
||||
self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject(
|
||||
self.service, '_periodics_worker')).mock
|
||||
self.mock_exit = self.useFixture(fixtures.MockPatchObject(
|
||||
wsgi_service.sys, 'exit')).mock
|
||||
|
||||
def test_shutdown(self):
|
||||
class MyError(Exception):
|
||||
pass
|
||||
|
||||
error = MyError('Oops!')
|
||||
|
||||
self.service.shutdown(error=error)
|
||||
|
||||
self.mock__shutting_down.acquire.assert_called_once_with(
|
||||
blocking=False)
|
||||
self.mock__periodic_worker.stop.assert_called_once_with()
|
||||
self.mock__periodic_worker.wait.assert_called_once_with()
|
||||
self.assertIsNone(self.service._periodics_worker)
|
||||
self.mock_executor.shutdown.assert_called_once_with(wait=True)
|
||||
self.mock_firewall_clean_up.assert_called_once_with()
|
||||
self.mock__shutting_down.release.assert_called_once_with()
|
||||
self.mock_exit.assert_called_once_with(error)
|
||||
|
||||
def test_shutdown_race(self):
|
||||
self.mock__shutting_down.acquire.return_value = False
|
||||
|
||||
self.service.shutdown()
|
||||
|
||||
self.mock__shutting_down.acquire.assert_called_once_with(
|
||||
blocking=False)
|
||||
self.mock_log.warning.assert_called_once_with(
|
||||
'Attempted to shut down while already shutting down')
|
||||
self.mock__periodic_worker.stop.assert_not_called()
|
||||
self.mock__periodic_worker.wait.assert_not_called()
|
||||
self.assertIs(self.mock__periodic_worker,
|
||||
self.service._periodics_worker)
|
||||
self.mock_executor.shutdown.assert_not_called()
|
||||
self.mock_firewall_clean_up.assert_not_called()
|
||||
self.mock__shutting_down.release.assert_not_called()
|
||||
self.mock_exit.assert_not_called()
|
||||
|
||||
def test_shutdown_worker_exception(self):
|
||||
class MyError(Exception):
|
||||
pass
|
||||
|
||||
error = MyError('Oops!')
|
||||
self.mock__periodic_worker.wait.side_effect = error
|
||||
|
||||
self.service.shutdown()
|
||||
|
||||
self.mock__shutting_down.acquire.assert_called_once_with(
|
||||
blocking=False)
|
||||
self.mock__periodic_worker.stop.assert_called_once_with()
|
||||
self.mock__periodic_worker.wait.assert_called_once_with()
|
||||
self.mock_log.exception.assert_called_once_with(
|
||||
'Service error occurred when stopping periodic workers. Error: %s',
|
||||
error)
|
||||
self.assertIsNone(self.service._periodics_worker)
|
||||
self.mock_executor.shutdown.assert_called_once_with(wait=True)
|
||||
self.mock_firewall_clean_up.assert_called_once_with()
|
||||
self.mock__shutting_down.release.assert_called_once_with()
|
||||
self.mock_exit.assert_called_once_with(None)
|
||||
|
||||
def test_shutdown_no_worker(self):
|
||||
self.service._periodics_worker = None
|
||||
|
||||
self.service.shutdown()
|
||||
|
||||
self.mock__shutting_down.acquire.assert_called_once_with(
|
||||
blocking=False)
|
||||
self.mock__periodic_worker.stop.assert_not_called()
|
||||
self.mock__periodic_worker.wait.assert_not_called()
|
||||
self.assertIsNone(self.service._periodics_worker)
|
||||
self.mock_executor.shutdown.assert_called_once_with(wait=True)
|
||||
self.mock_firewall_clean_up.assert_called_once_with()
|
||||
self.mock__shutting_down.release.assert_called_once_with()
|
||||
self.mock_exit.assert_called_once_with(None)
|
||||
|
||||
def test_shutdown_stopped_executor(self):
|
||||
self.mock_executor.alive = False
|
||||
|
||||
self.service.shutdown()
|
||||
|
||||
self.mock__shutting_down.acquire.assert_called_once_with(
|
||||
blocking=False)
|
||||
self.mock__periodic_worker.stop.assert_called_once_with()
|
||||
self.mock__periodic_worker.wait.assert_called_once_with()
|
||||
self.assertIsNone(self.service._periodics_worker)
|
||||
self.mock_executor.shutdown.assert_not_called()
|
||||
self.mock_firewall_clean_up.assert_called_once_with()
|
||||
self.mock__shutting_down.release.assert_called_once_with()
|
||||
self.mock_exit.assert_called_once_with(None)
|
||||
|
||||
|
||||
class TestCreateSSLContext(test_base.BaseTest):
|
||||
@ -165,43 +439,3 @@ class TestCreateSSLContext(test_base.BaseTest):
|
||||
self.assertEqual(mock_context, con)
|
||||
mock_context.load_cert_chain.assert_called_once_with(cert_path,
|
||||
key_path)
|
||||
|
||||
|
||||
@mock.patch.object(firewall, 'init')
|
||||
@mock.patch.object(db, 'init')
|
||||
class TestInit(test_base.BaseTest):
|
||||
def setUp(self):
|
||||
super(TestInit, self).setUp()
|
||||
# Tests default to a synchronous executor which can't be used here
|
||||
utils._EXECUTOR = None
|
||||
# Monkey patch for periodic tasks
|
||||
eventlet.monkey_patch()
|
||||
self.wsgi = wsgi_service.WSGIService()
|
||||
|
||||
@mock.patch.object(firewall, 'clean_up', lambda: None)
|
||||
def tearDown(self):
|
||||
self.wsgi.shutdown()
|
||||
super(TestInit, self).tearDown()
|
||||
|
||||
def test_ok(self, mock_db, mock_firewall):
|
||||
self.wsgi._init_host()
|
||||
|
||||
mock_db.assert_called_once_with()
|
||||
mock_firewall.assert_called_once_with()
|
||||
|
||||
def test_init_without_manage_firewall(self, mock_db, mock_firewall):
|
||||
|
||||
CONF.set_override('manage_firewall', False, 'firewall')
|
||||
self.wsgi._init_host()
|
||||
self.assertFalse(mock_firewall.called)
|
||||
|
||||
@mock.patch.object(wsgi_service.LOG, 'critical')
|
||||
def test_init_failed_processing_hook(self, mock_log,
|
||||
mock_db, mock_firewall):
|
||||
|
||||
CONF.set_override('processing_hooks', 'foo!', 'processing')
|
||||
plugins_base._HOOKS_MGR = None
|
||||
|
||||
self.assertRaises(SystemExit, self.wsgi._init_host)
|
||||
mock_log.assert_called_once_with(
|
||||
'The following hook(s) are missing or failed to load: foo!')
|
||||
|
@ -12,10 +12,14 @@
|
||||
|
||||
import ssl
|
||||
import sys
|
||||
import traceback as traceback_mod
|
||||
|
||||
import eventlet
|
||||
from eventlet import semaphore
|
||||
from futurist import periodics
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
from oslo_utils import reflection
|
||||
|
||||
from ironic_inspector.common import ironic as ir_utils
|
||||
from ironic_inspector import db
|
||||
@ -36,6 +40,7 @@ class WSGIService(object):
|
||||
def __init__(self):
|
||||
self.app = app.app
|
||||
self._periodics_worker = None
|
||||
self._shutting_down = semaphore.Semaphore()
|
||||
|
||||
def _init_middleware(self):
|
||||
"""Initialize WSGI middleware.
|
||||
@ -115,7 +120,7 @@ class WSGIService(object):
|
||||
periodic_update_ = periodics.periodic(
|
||||
spacing=CONF.firewall.firewall_update_period,
|
||||
enabled=CONF.firewall.manage_firewall
|
||||
)(periodic_update)
|
||||
)(firewall.update_filters)
|
||||
periodic_clean_up_ = periodics.periodic(
|
||||
spacing=CONF.clean_up_period
|
||||
)(periodic_clean_up)
|
||||
@ -123,18 +128,29 @@ class WSGIService(object):
|
||||
self._periodics_worker = periodics.PeriodicWorker(
|
||||
callables=[(periodic_update_, None, None),
|
||||
(periodic_clean_up_, None, None)],
|
||||
executor_factory=periodics.ExistingExecutor(utils.executor()))
|
||||
executor_factory=periodics.ExistingExecutor(utils.executor()),
|
||||
on_failure=self._periodics_watchdog)
|
||||
utils.executor().submit(self._periodics_worker.start)
|
||||
|
||||
def shutdown(self):
|
||||
def _periodics_watchdog(self, callable_, activity, spacing, exc_info,
|
||||
traceback=None):
|
||||
LOG.exception("The periodic %(callable)s failed with: %(exception)s", {
|
||||
'exception': ''.join(traceback_mod.format_exception(*exc_info)),
|
||||
'callable': reflection.get_callable_name(callable_)})
|
||||
# NOTE(milan): spawn new thread otherwise waiting would block
|
||||
eventlet.spawn(self.shutdown, error=str(exc_info[1]))
|
||||
|
||||
def shutdown(self, error=None):
|
||||
"""Stop serving API, clean up.
|
||||
|
||||
:returns: None
|
||||
"""
|
||||
# TODO(aarefiev): move shutdown code to WorkerService
|
||||
LOG.debug('Shutting down')
|
||||
if not self._shutting_down.acquire(blocking=False):
|
||||
LOG.warning('Attempted to shut down while already shutting down')
|
||||
return
|
||||
|
||||
firewall.clean_up()
|
||||
LOG.debug('Shutting down')
|
||||
|
||||
if self._periodics_worker is not None:
|
||||
try:
|
||||
@ -148,7 +164,11 @@ class WSGIService(object):
|
||||
if utils.executor().alive:
|
||||
utils.executor().shutdown(wait=True)
|
||||
|
||||
firewall.clean_up()
|
||||
|
||||
self._shutting_down.release()
|
||||
LOG.info('Shut down successfully')
|
||||
sys.exit(error)
|
||||
|
||||
def run(self):
|
||||
"""Start serving this service using loaded application.
|
||||
@ -168,17 +188,12 @@ class WSGIService(object):
|
||||
|
||||
try:
|
||||
self.app.run(**app_kwargs)
|
||||
finally:
|
||||
except Exception as e:
|
||||
self.shutdown(error=str(e))
|
||||
else:
|
||||
self.shutdown()
|
||||
|
||||
|
||||
def periodic_update(): # pragma: no cover
|
||||
try:
|
||||
firewall.update_filters()
|
||||
except Exception:
|
||||
LOG.exception('Periodic update of firewall rules failed')
|
||||
|
||||
|
||||
def periodic_clean_up(): # pragma: no cover
|
||||
try:
|
||||
if node_cache.clean_up():
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
other:
|
||||
- |
|
||||
Allow a periodic task to shut down **ironic-inspector** upon a failure
|
Loading…
x
Reference in New Issue
Block a user