diff --git a/devstack/plugin.sh b/devstack/plugin.sh index d5170a441..68581f4c1 100644 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -267,6 +267,8 @@ function configure_inspector { inspector_iniset iptables dnsmasq_interface $IRONIC_INSPECTOR_INTERFACE inspector_iniset database connection `database_connection_url ironic_inspector` + iniset_rpc_backend ironic-inspector $IRONIC_INSPECTOR_CONF_FILE + if is_service_enabled swift; then configure_inspector_swift fi diff --git a/devstack/upgrade/upgrade.sh b/devstack/upgrade/upgrade.sh index 7138f2d86..949bb8f5f 100755 --- a/devstack/upgrade/upgrade.sh +++ b/devstack/upgrade/upgrade.sh @@ -45,6 +45,7 @@ source $TARGET_DEVSTACK_DIR/lib/neutron-legacy source $TARGET_DEVSTACK_DIR/lib/apache source $TARGET_DEVSTACK_DIR/lib/keystone source $TARGET_DEVSTACK_DIR/lib/database +source $TARGET_DEVSTACK_DIR/lib/rpc_backend # Inspector relies on couple of Ironic variables source $TARGET_RELEASE_DIR/ironic/devstack/lib/ironic @@ -84,6 +85,8 @@ $IRONIC_INSPECTOR_DBSYNC_BIN_FILE --config-file $IRONIC_INSPECTOR_CONF_FILE upgr # calls upgrade inspector for specific release upgrade_project ironic-inspector $RUN_DIR $BASE_DEVSTACK_BRANCH $TARGET_DEVSTACK_BRANCH +# setup transport_url for rpc messaging +iniset_rpc_backend ironic-inspector $IRONIC_INSPECTOR_CONF_FILE start_inspector if is_inspector_dhcp_required; then diff --git a/ironic_inspector/common/rpc.py b/ironic_inspector/common/rpc.py index 5586d8dc5..31e531102 100644 --- a/ironic_inspector/common/rpc.py +++ b/ironic_inspector/common/rpc.py @@ -19,38 +19,31 @@ from oslo_messaging.rpc import dispatcher from ironic_inspector.conductor import manager CONF = cfg.CONF - -_SERVER = None TRANSPORT = None -TOPIC = 'ironic-inspector-worker' -SERVER_NAME = 'ironic-inspector-rpc-server' def get_transport(): global TRANSPORT if TRANSPORT is None: - TRANSPORT = messaging.get_rpc_transport(CONF, url='fake://') + TRANSPORT = messaging.get_rpc_transport(CONF) return TRANSPORT def get_client(): - target = messaging.Target(topic=TOPIC, server=SERVER_NAME, + """Get a RPC client instance.""" + target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host, version='1.1') transport = get_transport() return messaging.RPCClient(transport, target) -def get_server(): - """Get the singleton RPC server.""" - global _SERVER +def get_server(endpoints): + """Get a RPC server instance.""" - if _SERVER is None: - transport = get_transport() - target = messaging.Target(topic=TOPIC, server=SERVER_NAME, - version='1.1') - mgr = manager.ConductorManager() - _SERVER = messaging.get_rpc_server( - transport, target, [mgr], executor='eventlet', - access_policy=dispatcher.DefaultRPCAccessPolicy) - return _SERVER + transport = get_transport() + target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host, + version='1.1') + return messaging.get_rpc_server( + transport, target, endpoints, executor='eventlet', + access_policy=dispatcher.DefaultRPCAccessPolicy) diff --git a/ironic_inspector/common/rpc_service.py b/ironic_inspector/common/rpc_service.py new file mode 100644 index 000000000..5035106a7 --- /dev/null +++ b/ironic_inspector/common/rpc_service.py @@ -0,0 +1,62 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log +from oslo_service import service + +from ironic_inspector.common import rpc +from ironic_inspector.conductor import manager + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + +SERVER_NAME = 'ironic-inspector-rpc-server' + + +class RPCService(service.Service): + + def __init__(self, host): + super(RPCService, self).__init__() + self.host = host + self.manager = manager.ConductorManager() + self.rpcserver = None + + def start(self): + super(RPCService, self).start() + self.rpcserver = rpc.get_server([self.manager]) + self.rpcserver.start() + + self.manager.init_host() + LOG.info('Created RPC server for service %(service)s on host ' + '%(host)s.', + {'service': manager.MANAGER_TOPIC, 'host': self.host}) + + def stop(self): + try: + self.rpcserver.stop() + self.rpcserver.wait() + except Exception as e: + LOG.exception('Service error occurred when stopping the ' + 'RPC server. Error: %s', e) + + try: + self.manager.del_host() + except Exception as e: + LOG.exception('Service error occurred when cleaning up ' + 'the RPC manager. Error: %s', e) + + super(RPCService, self).stop(graceful=True) + LOG.info('Stopped RPC server for service %(service)s on host ' + '%(host)s.', + {'service': manager.MANAGER_TOPIC, 'host': self.host}) diff --git a/ironic_inspector/conductor/manager.py b/ironic_inspector/conductor/manager.py index 84061cba1..c4ab9584f 100644 --- a/ironic_inspector/conductor/manager.py +++ b/ironic_inspector/conductor/manager.py @@ -11,12 +11,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -import oslo_messaging as messaging +import sys +import traceback as traceback_mod +import eventlet +from eventlet import semaphore +from futurist import periodics +from oslo_config import cfg +from oslo_log import log +import oslo_messaging as messaging +from oslo_utils import reflection + +from ironic_inspector.common import ironic as ir_utils +from ironic_inspector import db from ironic_inspector import introspect +from ironic_inspector import node_cache +from ironic_inspector.plugins import base as plugins_base from ironic_inspector import process +from ironic_inspector.pxe_filter import base as pxe_filter from ironic_inspector import utils +LOG = log.getLogger(__name__) +CONF = cfg.CONF +MANAGER_TOPIC = 'ironic-inspector-conductor' + class ConductorManager(object): """ironic inspector conductor manager""" @@ -24,6 +42,79 @@ class ConductorManager(object): target = messaging.Target(version=RPC_API_VERSION) + def __init__(self): + self._periodics_worker = None + self._shutting_down = semaphore.Semaphore() + + def init_host(self): + """Initialize Worker host + + Init db connection, load and validate processing + hooks, runs periodic tasks. + + :returns None + """ + if CONF.processing.store_data == 'none': + LOG.warning('Introspection data will not be stored. Change ' + '"[processing] store_data" option if this is not ' + 'the desired behavior') + elif CONF.processing.store_data == 'swift': + LOG.info('Introspection data will be stored in Swift in the ' + 'container %s', CONF.swift.container) + + db.init() + + try: + hooks = plugins_base.validate_processing_hooks() + except Exception as exc: + LOG.critical(str(exc)) + sys.exit(1) + LOG.info('Enabled processing hooks: %s', [h.name for h in hooks]) + + driver = pxe_filter.driver() + driver.init_filter() + + periodic_clean_up_ = periodics.periodic( + spacing=CONF.clean_up_period + )(periodic_clean_up) + + self._periodics_worker = periodics.PeriodicWorker( + callables=[(driver.get_periodic_sync_task(), None, None), + (periodic_clean_up_, None, None)], + executor_factory=periodics.ExistingExecutor(utils.executor()), + on_failure=self._periodics_watchdog) + utils.executor().submit(self._periodics_worker.start) + + def del_host(self): + + if not self._shutting_down.acquire(blocking=False): + LOG.warning('Attempted to shut down while already shutting down') + return + + pxe_filter.driver().tear_down_filter() + if self._periodics_worker is not None: + try: + self._periodics_worker.stop() + self._periodics_worker.wait() + except Exception as e: + LOG.exception('Service error occurred when stopping ' + 'periodic workers. Error: %s', e) + self._periodics_worker = None + + if utils.executor().alive: + utils.executor().shutdown(wait=True) + + self._shutting_down.release() + LOG.info('Shut down successfully') + + def _periodics_watchdog(self, callable_, activity, spacing, exc_info, + traceback=None): + LOG.exception("The periodic %(callable)s failed with: %(exception)s", { + 'exception': ''.join(traceback_mod.format_exception(*exc_info)), + 'callable': reflection.get_callable_name(callable_)}) + # NOTE(milan): spawn new thread otherwise waiting would block + eventlet.spawn(self.del_host) + @messaging.expected_exceptions(utils.Error) def do_introspection(self, context, node_id, token=None, manage_boot=True): @@ -36,3 +127,20 @@ class ConductorManager(object): @messaging.expected_exceptions(utils.Error) def do_reapply(self, context, node_id, token=None): process.reapply(node_id) + + +def periodic_clean_up(): # pragma: no cover + try: + if node_cache.clean_up(): + pxe_filter.driver().sync(ir_utils.get_client()) + sync_with_ironic() + except Exception: + LOG.exception('Periodic clean up of node cache failed') + + +def sync_with_ironic(): + ironic = ir_utils.get_client() + # TODO(yuikotakada): pagination + ironic_nodes = ironic.node.list(limit=0) + ironic_node_uuids = {node.uuid for node in ironic_nodes} + node_cache.delete_nodes_not_in_list(ironic_node_uuids) diff --git a/ironic_inspector/conf/default.py b/ironic_inspector/conf/default.py index b840341be..ef2b9c2d5 100644 --- a/ironic_inspector/conf/default.py +++ b/ironic_inspector/conf/default.py @@ -11,6 +11,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import socket + from oslo_config import cfg from ironic_inspector.common.i18n import _ @@ -23,6 +25,14 @@ _OPTS = [ cfg.PortOpt('listen_port', default=5050, help=_('Port to listen on.')), + cfg.StrOpt('host', + default=socket.getfqdn(), + sample_default='localhost', + help=_('Name of this node. This can be an opaque identifier. ' + 'It is not necessarily a hostname, FQDN, or IP address. ' + 'However, the node name must be valid within ' + 'an AMQP key, and if using ZeroMQ, a valid ' + 'hostname, FQDN, or IP address.')), cfg.StrOpt('auth_strategy', default='keystone', choices=('keystone', 'noauth'), diff --git a/ironic_inspector/test/functional.py b/ironic_inspector/test/functional.py index b26f3381d..cdae214e9 100644 --- a/ironic_inspector/test/functional.py +++ b/ironic_inspector/test/functional.py @@ -57,6 +57,7 @@ driver = noop debug = True introspection_delay = 0 auth_strategy=noauth +transport_url=fake:// [database] connection = sqlite:///%(db_file)s [processing] diff --git a/ironic_inspector/test/unit/test_manager.py b/ironic_inspector/test/unit/test_manager.py index e898a516d..71131f756 100644 --- a/ironic_inspector/test/unit/test_manager.py +++ b/ironic_inspector/test/unit/test_manager.py @@ -11,6 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import fixtures import mock import oslo_messaging as messaging @@ -27,11 +28,213 @@ CONF = ironic_inspector.conf.CONF class BaseManagerTest(test_base.NodeTest): def setUp(self): super(BaseManagerTest, self).setUp() + self.mock_log = self.useFixture(fixtures.MockPatchObject( + manager, 'LOG')).mock + self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject( + manager.semaphore, 'Semaphore', autospec=True)) + .mock.return_value) + self.mock__shutting_down.acquire.return_value = True self.manager = manager.ConductorManager() self.context = {} self.token = None +class TestManagerInitHost(BaseManagerTest): + def setUp(self): + super(TestManagerInitHost, self).setUp() + self.mock_db_init = self.useFixture(fixtures.MockPatchObject( + manager.db, 'init')).mock + self.mock_validate_processing_hooks = self.useFixture( + fixtures.MockPatchObject(manager.plugins_base, + 'validate_processing_hooks')).mock + self.mock_filter = self.useFixture(fixtures.MockPatchObject( + manager.pxe_filter, 'driver')).mock.return_value + self.mock_periodic = self.useFixture(fixtures.MockPatchObject( + manager.periodics, 'periodic')).mock + self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject( + manager.periodics, 'PeriodicWorker')).mock + self.mock_executor = self.useFixture(fixtures.MockPatchObject( + manager.utils, 'executor')).mock + self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject( + manager.periodics, 'ExistingExecutor')).mock + self.mock_exit = self.useFixture(fixtures.MockPatchObject( + manager.sys, 'exit')).mock + + def assert_periodics(self): + outer_cleanup_decorator_call = mock.call( + spacing=CONF.clean_up_period) + self.mock_periodic.assert_has_calls([ + outer_cleanup_decorator_call, + mock.call()(manager.periodic_clean_up)]) + + inner_decorator = self.mock_periodic.return_value + inner_cleanup_decorator_call = mock.call( + manager.periodic_clean_up) + inner_decorator.assert_has_calls([inner_cleanup_decorator_call]) + + self.mock_ExistingExecutor.assert_called_once_with( + self.mock_executor.return_value) + + periodic_worker = self.mock_PeriodicWorker.return_value + + periodic_sync = self.mock_filter.get_periodic_sync_task.return_value + callables = [(periodic_sync, None, None), + (inner_decorator.return_value, None, None)] + self.mock_PeriodicWorker.assert_called_once_with( + callables=callables, + executor_factory=self.mock_ExistingExecutor.return_value, + on_failure=self.manager._periodics_watchdog) + self.assertIs(periodic_worker, self.manager._periodics_worker) + + self.mock_executor.return_value.submit.assert_called_once_with( + self.manager._periodics_worker.start) + + def test_no_introspection_data_store(self): + CONF.set_override('store_data', 'none', 'processing') + self.manager.init_host() + self.mock_log.warning.assert_called_once_with( + 'Introspection data will not be stored. Change "[processing] ' + 'store_data" option if this is not the desired behavior') + + def test_init_host(self): + self.manager.init_host() + self.mock_db_init.asset_called_once_with() + self.mock_validate_processing_hooks.assert_called_once_with() + self.mock_filter.init_filter.assert_called_once_with() + self.assert_periodics() + + def test_init_host_validate_processing_hooks_exception(self): + class MyError(Exception): + pass + + error = MyError('Oops!') + self.mock_validate_processing_hooks.side_effect = error + + # NOTE(milan): have to stop executing the test case at this point to + # simulate a real sys.exit() call + self.mock_exit.side_effect = SystemExit('Stop!') + self.assertRaisesRegex(SystemExit, 'Stop!', self.manager.init_host) + + self.mock_db_init.assert_called_once_with() + self.mock_log.critical.assert_called_once_with(str(error)) + self.mock_exit.assert_called_once_with(1) + self.mock_filter.init_filter.assert_not_called() + + +class TestManagerDelHost(BaseManagerTest): + def setUp(self): + super(TestManagerDelHost, self).setUp() + self.mock_filter = self.useFixture(fixtures.MockPatchObject( + manager.pxe_filter, 'driver')).mock.return_value + self.mock_executor = mock.Mock() + self.mock_executor.alive = True + self.mock_get_executor = self.useFixture(fixtures.MockPatchObject( + manager.utils, 'executor')).mock + self.mock_get_executor.return_value = self.mock_executor + self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject( + self.manager, '_periodics_worker')).mock + self.mock_exit = self.useFixture(fixtures.MockPatchObject( + manager.sys, 'exit')).mock + + def test_del_host(self): + self.manager.del_host() + + self.mock__shutting_down.acquire.assert_called_once_with( + blocking=False) + self.mock__periodic_worker.stop.assert_called_once_with() + self.mock__periodic_worker.wait.assert_called_once_with() + self.assertIsNone(self.manager._periodics_worker) + self.mock_executor.shutdown.assert_called_once_with(wait=True) + self.mock_filter.tear_down_filter.assert_called_once_with() + self.mock__shutting_down.release.assert_called_once_with() + + def test_del_host_race(self): + self.mock__shutting_down.acquire.return_value = False + + self.manager.del_host() + + self.mock__shutting_down.acquire.assert_called_once_with( + blocking=False) + self.mock_log.warning.assert_called_once_with( + 'Attempted to shut down while already shutting down') + self.mock__periodic_worker.stop.assert_not_called() + self.mock__periodic_worker.wait.assert_not_called() + self.assertIs(self.mock__periodic_worker, + self.manager._periodics_worker) + self.mock_executor.shutdown.assert_not_called() + self.mock_filter.tear_down_filter.assert_not_called() + self.mock__shutting_down.release.assert_not_called() + self.mock_exit.assert_not_called() + + def test_del_host_worker_exception(self): + class MyError(Exception): + pass + + error = MyError('Oops!') + self.mock__periodic_worker.wait.side_effect = error + + self.manager.del_host() + + self.mock__shutting_down.acquire.assert_called_once_with( + blocking=False) + self.mock__periodic_worker.stop.assert_called_once_with() + self.mock__periodic_worker.wait.assert_called_once_with() + self.mock_log.exception.assert_called_once_with( + 'Service error occurred when stopping periodic workers. Error: %s', + error) + self.assertIsNone(self.manager._periodics_worker) + self.mock_executor.shutdown.assert_called_once_with(wait=True) + self.mock_filter.tear_down_filter.assert_called_once_with() + self.mock__shutting_down.release.assert_called_once_with() + + def test_del_host_no_worker(self): + self.manager._periodics_worker = None + + self.manager.del_host() + + self.mock__shutting_down.acquire.assert_called_once_with( + blocking=False) + self.mock__periodic_worker.stop.assert_not_called() + self.mock__periodic_worker.wait.assert_not_called() + self.assertIsNone(self.manager._periodics_worker) + self.mock_executor.shutdown.assert_called_once_with(wait=True) + self.mock_filter.tear_down_filter.assert_called_once_with() + self.mock__shutting_down.release.assert_called_once_with() + + def test_del_host_stopped_executor(self): + self.mock_executor.alive = False + + self.manager.del_host() + + self.mock__shutting_down.acquire.assert_called_once_with( + blocking=False) + self.mock__periodic_worker.stop.assert_called_once_with() + self.mock__periodic_worker.wait.assert_called_once_with() + self.assertIsNone(self.manager._periodics_worker) + self.mock_executor.shutdown.assert_not_called() + self.mock_filter.tear_down_filter.assert_called_once_with() + self.mock__shutting_down.release.assert_called_once_with() + + +class TestManagerPeriodicWatchDog(BaseManagerTest): + def setUp(self): + super(TestManagerPeriodicWatchDog, self).setUp() + self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject( + manager.reflection, 'get_callable_name')).mock + self.mock_spawn = self.useFixture(fixtures.MockPatchObject( + manager.eventlet, 'spawn')).mock + + def test__periodics_watchdog(self): + error = RuntimeError('Oops!') + + self.manager._periodics_watchdog( + callable_=None, activity=None, spacing=None, + exc_info=(None, error, None), traceback=None) + + self.mock_get_callable_name.assert_called_once_with(None) + self.mock_spawn.assert_called_once_with(self.manager.del_host) + + class TestManagerIntrospect(BaseManagerTest): @mock.patch.object(introspect, 'introspect', autospec=True) def test_do_introspect(self, introspect_mock): diff --git a/ironic_inspector/test/unit/test_wsgi_service.py b/ironic_inspector/test/unit/test_wsgi_service.py index 6f37e79c9..3cb83dbaa 100644 --- a/ironic_inspector/test/unit/test_wsgi_service.py +++ b/ironic_inspector/test/unit/test_wsgi_service.py @@ -20,11 +20,9 @@ import fixtures import mock from oslo_config import cfg -from ironic_inspector.common import rpc from ironic_inspector.test import base as test_base from ironic_inspector import wsgi_service - CONF = cfg.CONF @@ -34,15 +32,9 @@ class BaseWSGITest(test_base.BaseTest): super(BaseWSGITest, self).setUp() self.app = self.useFixture(fixtures.MockPatchObject( wsgi_service.app, 'app', autospec=True)).mock - self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject( - wsgi_service.semaphore, 'Semaphore', autospec=True)) - .mock.return_value) - self.mock__shutting_down.acquire.return_value = True self.mock_log = self.useFixture(fixtures.MockPatchObject( wsgi_service, 'LOG')).mock self.service = wsgi_service.WSGIService() - self.mock_rpc_server = self.useFixture(fixtures.MockPatchObject( - rpc, 'get_server')).mock class TestWSGIServiceInitMiddleware(BaseWSGITest): @@ -73,118 +65,10 @@ class TestWSGIServiceInitMiddleware(BaseWSGITest): 'Starting unauthenticated, please check configuration') self.mock_add_cors_middleware.assert_called_once_with(self.app) - def test_init_middleware_no_store(self): - CONF.set_override('store_data', 'none', 'processing') - self.service._init_middleware() - - self.mock_add_auth_middleware.assert_called_once_with(self.app) - self.mock_log.warning.assert_called_once_with( - 'Introspection data will not be stored. Change "[processing] ' - 'store_data" option if this is not the desired behavior') - self.mock_add_cors_middleware.assert_called_once_with(self.app) - - -class TestWSGIServiceInitHost(BaseWSGITest): - def setUp(self): - super(TestWSGIServiceInitHost, self).setUp() - self.mock_db_init = self.useFixture(fixtures.MockPatchObject( - wsgi_service.db, 'init')).mock - self.mock_validate_processing_hooks = self.useFixture( - fixtures.MockPatchObject(wsgi_service.plugins_base, - 'validate_processing_hooks')).mock - self.mock_filter = self.useFixture(fixtures.MockPatchObject( - wsgi_service.pxe_filter, 'driver')).mock.return_value - self.mock_periodic = self.useFixture(fixtures.MockPatchObject( - wsgi_service.periodics, 'periodic')).mock - self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject( - wsgi_service.periodics, 'PeriodicWorker')).mock - self.mock_executor = self.useFixture(fixtures.MockPatchObject( - wsgi_service.utils, 'executor')).mock - self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject( - wsgi_service.periodics, 'ExistingExecutor')).mock - self.mock_exit = self.useFixture(fixtures.MockPatchObject( - wsgi_service.sys, 'exit')).mock - - def assert_periodics(self): - outer_cleanup_decorator_call = mock.call( - spacing=CONF.clean_up_period) - self.mock_periodic.assert_has_calls([ - outer_cleanup_decorator_call, - mock.call()(wsgi_service.periodic_clean_up)]) - - inner_decorator = self.mock_periodic.return_value - inner_cleanup_decorator_call = mock.call( - wsgi_service.periodic_clean_up) - inner_decorator.assert_has_calls([inner_cleanup_decorator_call]) - - self.mock_ExistingExecutor.assert_called_once_with( - self.mock_executor.return_value) - - periodic_worker = self.mock_PeriodicWorker.return_value - - periodic_sync = self.mock_filter.get_periodic_sync_task.return_value - callables = [(periodic_sync, None, None), - (inner_decorator.return_value, None, None)] - self.mock_PeriodicWorker.assert_called_once_with( - callables=callables, - executor_factory=self.mock_ExistingExecutor.return_value, - on_failure=self.service._periodics_watchdog) - self.assertIs(periodic_worker, self.service._periodics_worker) - - self.mock_executor.return_value.submit.assert_called_once_with( - self.service._periodics_worker.start) - - def test_init_host(self): - self.service._init_host() - - self.mock_db_init.asset_called_once_with() - self.mock_validate_processing_hooks.assert_called_once_with() - self.mock_filter.init_filter.assert_called_once_with() - self.assert_periodics() - - def test_init_host_validate_processing_hooks_exception(self): - class MyError(Exception): - pass - - error = MyError('Oops!') - self.mock_validate_processing_hooks.side_effect = error - - # NOTE(milan): have to stop executing the test case at this point to - # simulate a real sys.exit() call - self.mock_exit.side_effect = SystemExit('Stop!') - self.assertRaisesRegex(SystemExit, 'Stop!', self.service._init_host) - - self.mock_db_init.assert_called_once_with() - self.mock_log.critical.assert_called_once_with(str(error)) - self.mock_exit.assert_called_once_with(1) - self.mock_filter.init_filter.assert_not_called() - - -class TestWSGIServicePeriodicWatchDog(BaseWSGITest): - def setUp(self): - super(TestWSGIServicePeriodicWatchDog, self).setUp() - self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject( - wsgi_service.reflection, 'get_callable_name')).mock - self.mock_spawn = self.useFixture(fixtures.MockPatchObject( - wsgi_service.eventlet, 'spawn')).mock - - def test__periodics_watchdog(self): - error = RuntimeError('Oops!') - - self.service._periodics_watchdog( - callable_=None, activity=None, spacing=None, - exc_info=(None, error, None), traceback=None) - - self.mock_get_callable_name.assert_called_once_with(None) - self.mock_spawn.assert_called_once_with(self.service.shutdown, - error=str(error)) - class TestWSGIServiceRun(BaseWSGITest): def setUp(self): super(TestWSGIServiceRun, self).setUp() - self.mock__init_host = self.useFixture(fixtures.MockPatchObject( - self.service, '_init_host')).mock self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject( self.service, '_init_middleware')).mock self.mock__create_ssl_context = self.useFixture( @@ -201,9 +85,6 @@ class TestWSGIServiceRun(BaseWSGITest): self.mock__create_ssl_context.assert_called_once_with() self.mock__init_middleware.assert_called_once_with() - self.mock__init_host.assert_called_once_with() - self.mock_rpc_server.assert_called_once_with() - self.service.rpc_server.start.assert_called_once_with() self.app.run.assert_called_once_with( host=CONF.listen_address, port=CONF.listen_port, ssl_context=self.mock__create_ssl_context.return_value) @@ -215,7 +96,6 @@ class TestWSGIServiceRun(BaseWSGITest): self.service.run() self.mock__create_ssl_context.assert_called_once_with() self.mock__init_middleware.assert_called_once_with() - self.mock__init_host.assert_called_once_with() self.app.run.assert_called_once_with( host=CONF.listen_address, port=CONF.listen_port) self.mock_shutdown.assert_called_once_with() @@ -230,7 +110,6 @@ class TestWSGIServiceRun(BaseWSGITest): self.mock__create_ssl_context.assert_called_once_with() self.mock__init_middleware.assert_called_once_with() - self.mock__init_host.assert_called_once_with() self.app.run.assert_called_once_with( host=CONF.listen_address, port=CONF.listen_port, ssl_context=self.mock__create_ssl_context.return_value) @@ -240,108 +119,21 @@ class TestWSGIServiceRun(BaseWSGITest): class TestWSGIServiceShutdown(BaseWSGITest): def setUp(self): super(TestWSGIServiceShutdown, self).setUp() - self.mock_filter = self.useFixture(fixtures.MockPatchObject( - wsgi_service.pxe_filter, 'driver')).mock.return_value - self.mock_executor = mock.Mock() - self.mock_executor.alive = True - self.mock_get_executor = self.useFixture(fixtures.MockPatchObject( - wsgi_service.utils, 'executor')).mock - self.mock_get_executor.return_value = self.mock_executor self.service = wsgi_service.WSGIService() - self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject( - self.service, '_periodics_worker')).mock + self.mock_rpc_service = mock.MagicMock() + self.service.rpc_service = self.mock_rpc_service self.mock_exit = self.useFixture(fixtures.MockPatchObject( wsgi_service.sys, 'exit')).mock - self.service.rpc_server = self.mock_rpc_server def test_shutdown(self): class MyError(Exception): pass - error = MyError('Oops!') self.service.shutdown(error=error) - - self.mock__shutting_down.acquire.assert_called_once_with( - blocking=False) - self.mock__periodic_worker.stop.assert_called_once_with() - self.mock__periodic_worker.wait.assert_called_once_with() - self.assertIsNone(self.service._periodics_worker) - self.mock_executor.shutdown.assert_called_once_with(wait=True) - self.mock_filter.tear_down_filter.assert_called_once_with() - self.mock__shutting_down.release.assert_called_once_with() + self.mock_rpc_service.stop.assert_called_once_with() self.mock_exit.assert_called_once_with(error) - def test_shutdown_race(self): - self.mock__shutting_down.acquire.return_value = False - - self.service.shutdown() - - self.mock__shutting_down.acquire.assert_called_once_with( - blocking=False) - self.mock_log.warning.assert_called_once_with( - 'Attempted to shut down while already shutting down') - self.mock__periodic_worker.stop.assert_not_called() - self.mock__periodic_worker.wait.assert_not_called() - self.assertIs(self.mock__periodic_worker, - self.service._periodics_worker) - self.mock_executor.shutdown.assert_not_called() - self.mock_filter.tear_down_filter.assert_not_called() - self.mock__shutting_down.release.assert_not_called() - self.mock_exit.assert_not_called() - - def test_shutdown_worker_exception(self): - class MyError(Exception): - pass - - error = MyError('Oops!') - self.mock__periodic_worker.wait.side_effect = error - - self.service.shutdown() - - self.mock__shutting_down.acquire.assert_called_once_with( - blocking=False) - self.mock__periodic_worker.stop.assert_called_once_with() - self.mock__periodic_worker.wait.assert_called_once_with() - self.mock_log.exception.assert_called_once_with( - 'Service error occurred when stopping periodic workers. Error: %s', - error) - self.assertIsNone(self.service._periodics_worker) - self.mock_executor.shutdown.assert_called_once_with(wait=True) - self.mock_filter.tear_down_filter.assert_called_once_with() - self.mock__shutting_down.release.assert_called_once_with() - self.mock_exit.assert_called_once_with(None) - - def test_shutdown_no_worker(self): - self.service._periodics_worker = None - - self.service.shutdown() - - self.mock__shutting_down.acquire.assert_called_once_with( - blocking=False) - self.mock__periodic_worker.stop.assert_not_called() - self.mock__periodic_worker.wait.assert_not_called() - self.assertIsNone(self.service._periodics_worker) - self.mock_executor.shutdown.assert_called_once_with(wait=True) - self.mock_filter.tear_down_filter.assert_called_once_with() - self.mock__shutting_down.release.assert_called_once_with() - self.mock_exit.assert_called_once_with(None) - - def test_shutdown_stopped_executor(self): - self.mock_executor.alive = False - - self.service.shutdown() - - self.mock__shutting_down.acquire.assert_called_once_with( - blocking=False) - self.mock__periodic_worker.stop.assert_called_once_with() - self.mock__periodic_worker.wait.assert_called_once_with() - self.assertIsNone(self.service._periodics_worker) - self.mock_executor.shutdown.assert_not_called() - self.mock_filter.tear_down_filter.assert_called_once_with() - self.mock__shutting_down.release.assert_called_once_with() - self.mock_exit.assert_called_once_with(None) - class TestCreateSSLContext(test_base.BaseTest): def setUp(self): diff --git a/ironic_inspector/wsgi_service.py b/ironic_inspector/wsgi_service.py index 33fc40754..8513e16f6 100644 --- a/ironic_inspector/wsgi_service.py +++ b/ironic_inspector/wsgi_service.py @@ -13,25 +13,16 @@ import signal import ssl import sys -import traceback as traceback_mod import eventlet -from eventlet import semaphore -from futurist import periodics from oslo_config import cfg from oslo_log import log -from oslo_utils import reflection +from oslo_service import service -from ironic_inspector.common import ironic as ir_utils -from ironic_inspector.common import rpc -from ironic_inspector import db +from ironic_inspector.common.rpc_service import RPCService from ironic_inspector import main as app -from ironic_inspector import node_cache -from ironic_inspector.plugins import base as plugins_base -from ironic_inspector.pxe_filter import base as pxe_filter from ironic_inspector import utils - LOG = log.getLogger(__name__) CONF = cfg.CONF @@ -41,10 +32,9 @@ class WSGIService(object): def __init__(self): self.app = app.app - self._periodics_worker = None - self._shutting_down = semaphore.Semaphore() signal.signal(signal.SIGHUP, self._handle_sighup) signal.signal(signal.SIGTERM, self._handle_sigterm) + self.rpc_service = RPCService(CONF.host) def _init_middleware(self): """Initialize WSGI middleware. @@ -57,15 +47,6 @@ class WSGIService(object): else: LOG.warning('Starting unauthenticated, please check' ' configuration') - - # TODO(aarefiev): move to WorkerService once we split service - if CONF.processing.store_data == 'none': - LOG.warning('Introspection data will not be stored. Change ' - '"[processing] store_data" option if this is not ' - 'the desired behavior') - elif CONF.processing.store_data == 'swift': - LOG.info('Introspection data will be stored in Swift in the ' - 'container %s', CONF.swift.container) utils.add_cors_middleware(self.app) def _create_ssl_context(self): @@ -99,77 +80,13 @@ class WSGIService(object): 'settings: %s', exc) return context - # TODO(aarefiev): move init code to WorkerService - def _init_host(self): - """Initialize Worker host - - Init db connection, load and validate processing - hooks, runs periodic tasks. - - :returns None - """ - db.init() - - try: - hooks = plugins_base.validate_processing_hooks() - except Exception as exc: - LOG.critical(str(exc)) - sys.exit(1) - - LOG.info('Enabled processing hooks: %s', [h.name for h in hooks]) - - driver = pxe_filter.driver() - driver.init_filter() - - periodic_clean_up_ = periodics.periodic( - spacing=CONF.clean_up_period - )(periodic_clean_up) - - self._periodics_worker = periodics.PeriodicWorker( - callables=[(driver.get_periodic_sync_task(), None, None), - (periodic_clean_up_, None, None)], - executor_factory=periodics.ExistingExecutor(utils.executor()), - on_failure=self._periodics_watchdog) - utils.executor().submit(self._periodics_worker.start) - - def _periodics_watchdog(self, callable_, activity, spacing, exc_info, - traceback=None): - LOG.exception("The periodic %(callable)s failed with: %(exception)s", { - 'exception': ''.join(traceback_mod.format_exception(*exc_info)), - 'callable': reflection.get_callable_name(callable_)}) - # NOTE(milan): spawn new thread otherwise waiting would block - eventlet.spawn(self.shutdown, error=str(exc_info[1])) - def shutdown(self, error=None): - """Stop serving API, clean up. + """Stop serving API. :returns: None """ - # TODO(aarefiev): move shutdown code to WorkerService - if not self._shutting_down.acquire(blocking=False): - LOG.warning('Attempted to shut down while already shutting down') - return - LOG.debug('Shutting down') - - self.rpc_server.stop() - - if self._periodics_worker is not None: - try: - self._periodics_worker.stop() - self._periodics_worker.wait() - except Exception as e: - LOG.exception('Service error occurred when stopping ' - 'periodic workers. Error: %s', e) - self._periodics_worker = None - - if utils.executor().alive: - utils.executor().shutdown(wait=True) - - pxe_filter.driver().tear_down_filter() - - self._shutting_down.release() - LOG.info('Shut down successfully') + self.rpc_service.stop() sys.exit(error) def run(self): @@ -186,10 +103,9 @@ class WSGIService(object): self._init_middleware() - self._init_host() - - self.rpc_server = rpc.get_server() - self.rpc_server.start() + LOG.info('Spawning RPC service') + service.launch(CONF, self.rpc_service, + restart_method='mutate') try: self.app.run(**app_kwargs) @@ -210,20 +126,3 @@ class WSGIService(object): # SIGTERM. Raising KeyboardIntrerrupt which won't be caught by any # 'except Exception' clauses. raise KeyboardInterrupt - - -def periodic_clean_up(): # pragma: no cover - try: - if node_cache.clean_up(): - pxe_filter.driver().sync(ir_utils.get_client()) - sync_with_ironic() - except Exception: - LOG.exception('Periodic clean up of node cache failed') - - -def sync_with_ironic(): - ironic = ir_utils.get_client() - # TODO(yuikotakada): pagination - ironic_nodes = ironic.node.list(limit=0) - ironic_node_uuids = {node.uuid for node in ironic_nodes} - node_cache.delete_nodes_not_in_list(ironic_node_uuids) diff --git a/releasenotes/notes/rpc-backends-0e7405aa1c7723a0.yaml b/releasenotes/notes/rpc-backends-0e7405aa1c7723a0.yaml new file mode 100644 index 000000000..57abafbac --- /dev/null +++ b/releasenotes/notes/rpc-backends-0e7405aa1c7723a0.yaml @@ -0,0 +1,7 @@ +--- +upgrade: + - | + Adds rpc related configuration options for the communication between + ironic-inspector API and worker. It needs to be configured properly + during upgrade. Set ``[DEFAULT]transport_url`` to ``fake://`` if a + rpc backend is not available or not desired. \ No newline at end of file diff --git a/tools/config-generator.conf b/tools/config-generator.conf index fae4268a6..47fd222cf 100644 --- a/tools/config-generator.conf +++ b/tools/config-generator.conf @@ -6,3 +6,4 @@ namespace = oslo.db namespace = oslo.log namespace = oslo.middleware.cors namespace = oslo.policy +namespace = oslo.messaging