Wrap rpc server into oslo.service

This patch is part of work splitting inspector api and worker.
The rpc server is wrapped into oslo.service, and launched from
api process.

Now api and worker belongs to separate thread, functional tests
use the fake messaging driver, while devstack jobs use rabbitmq.

Change-Id: Ie03d16f9d115c3561723463dea6a57d2a763bcc6
Story: #2001842
Task: #23109
This commit is contained in:
Kaifeng Wang 2018-07-23 15:56:27 +08:00
parent 054f300290
commit a228912827
12 changed files with 420 additions and 339 deletions

View File

@ -267,6 +267,8 @@ function configure_inspector {
inspector_iniset iptables dnsmasq_interface $IRONIC_INSPECTOR_INTERFACE inspector_iniset iptables dnsmasq_interface $IRONIC_INSPECTOR_INTERFACE
inspector_iniset database connection `database_connection_url ironic_inspector` inspector_iniset database connection `database_connection_url ironic_inspector`
iniset_rpc_backend ironic-inspector $IRONIC_INSPECTOR_CONF_FILE
if is_service_enabled swift; then if is_service_enabled swift; then
configure_inspector_swift configure_inspector_swift
fi fi

View File

@ -45,6 +45,7 @@ source $TARGET_DEVSTACK_DIR/lib/neutron-legacy
source $TARGET_DEVSTACK_DIR/lib/apache source $TARGET_DEVSTACK_DIR/lib/apache
source $TARGET_DEVSTACK_DIR/lib/keystone source $TARGET_DEVSTACK_DIR/lib/keystone
source $TARGET_DEVSTACK_DIR/lib/database source $TARGET_DEVSTACK_DIR/lib/database
source $TARGET_DEVSTACK_DIR/lib/rpc_backend
# Inspector relies on couple of Ironic variables # Inspector relies on couple of Ironic variables
source $TARGET_RELEASE_DIR/ironic/devstack/lib/ironic source $TARGET_RELEASE_DIR/ironic/devstack/lib/ironic
@ -84,6 +85,8 @@ $IRONIC_INSPECTOR_DBSYNC_BIN_FILE --config-file $IRONIC_INSPECTOR_CONF_FILE upgr
# calls upgrade inspector for specific release # calls upgrade inspector for specific release
upgrade_project ironic-inspector $RUN_DIR $BASE_DEVSTACK_BRANCH $TARGET_DEVSTACK_BRANCH upgrade_project ironic-inspector $RUN_DIR $BASE_DEVSTACK_BRANCH $TARGET_DEVSTACK_BRANCH
# setup transport_url for rpc messaging
iniset_rpc_backend ironic-inspector $IRONIC_INSPECTOR_CONF_FILE
start_inspector start_inspector
if is_inspector_dhcp_required; then if is_inspector_dhcp_required; then

View File

@ -19,38 +19,31 @@ from oslo_messaging.rpc import dispatcher
from ironic_inspector.conductor import manager from ironic_inspector.conductor import manager
CONF = cfg.CONF CONF = cfg.CONF
_SERVER = None
TRANSPORT = None TRANSPORT = None
TOPIC = 'ironic-inspector-worker'
SERVER_NAME = 'ironic-inspector-rpc-server'
def get_transport(): def get_transport():
global TRANSPORT global TRANSPORT
if TRANSPORT is None: if TRANSPORT is None:
TRANSPORT = messaging.get_rpc_transport(CONF, url='fake://') TRANSPORT = messaging.get_rpc_transport(CONF)
return TRANSPORT return TRANSPORT
def get_client(): def get_client():
target = messaging.Target(topic=TOPIC, server=SERVER_NAME, """Get a RPC client instance."""
target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host,
version='1.1') version='1.1')
transport = get_transport() transport = get_transport()
return messaging.RPCClient(transport, target) return messaging.RPCClient(transport, target)
def get_server(): def get_server(endpoints):
"""Get the singleton RPC server.""" """Get a RPC server instance."""
global _SERVER
if _SERVER is None: transport = get_transport()
transport = get_transport() target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host,
target = messaging.Target(topic=TOPIC, server=SERVER_NAME, version='1.1')
version='1.1') return messaging.get_rpc_server(
mgr = manager.ConductorManager() transport, target, endpoints, executor='eventlet',
_SERVER = messaging.get_rpc_server( access_policy=dispatcher.DefaultRPCAccessPolicy)
transport, target, [mgr], executor='eventlet',
access_policy=dispatcher.DefaultRPCAccessPolicy)
return _SERVER

View File

@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log
from oslo_service import service
from ironic_inspector.common import rpc
from ironic_inspector.conductor import manager
CONF = cfg.CONF
LOG = log.getLogger(__name__)
SERVER_NAME = 'ironic-inspector-rpc-server'
class RPCService(service.Service):
def __init__(self, host):
super(RPCService, self).__init__()
self.host = host
self.manager = manager.ConductorManager()
self.rpcserver = None
def start(self):
super(RPCService, self).start()
self.rpcserver = rpc.get_server([self.manager])
self.rpcserver.start()
self.manager.init_host()
LOG.info('Created RPC server for service %(service)s on host '
'%(host)s.',
{'service': manager.MANAGER_TOPIC, 'host': self.host})
def stop(self):
try:
self.rpcserver.stop()
self.rpcserver.wait()
except Exception as e:
LOG.exception('Service error occurred when stopping the '
'RPC server. Error: %s', e)
try:
self.manager.del_host()
except Exception as e:
LOG.exception('Service error occurred when cleaning up '
'the RPC manager. Error: %s', e)
super(RPCService, self).stop(graceful=True)
LOG.info('Stopped RPC server for service %(service)s on host '
'%(host)s.',
{'service': manager.MANAGER_TOPIC, 'host': self.host})

View File

@ -11,12 +11,30 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import oslo_messaging as messaging import sys
import traceback as traceback_mod
import eventlet
from eventlet import semaphore
from futurist import periodics
from oslo_config import cfg
from oslo_log import log
import oslo_messaging as messaging
from oslo_utils import reflection
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector import db
from ironic_inspector import introspect from ironic_inspector import introspect
from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
from ironic_inspector import process from ironic_inspector import process
from ironic_inspector.pxe_filter import base as pxe_filter
from ironic_inspector import utils from ironic_inspector import utils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
MANAGER_TOPIC = 'ironic-inspector-conductor'
class ConductorManager(object): class ConductorManager(object):
"""ironic inspector conductor manager""" """ironic inspector conductor manager"""
@ -24,6 +42,79 @@ class ConductorManager(object):
target = messaging.Target(version=RPC_API_VERSION) target = messaging.Target(version=RPC_API_VERSION)
def __init__(self):
self._periodics_worker = None
self._shutting_down = semaphore.Semaphore()
def init_host(self):
"""Initialize Worker host
Init db connection, load and validate processing
hooks, runs periodic tasks.
:returns None
"""
if CONF.processing.store_data == 'none':
LOG.warning('Introspection data will not be stored. Change '
'"[processing] store_data" option if this is not '
'the desired behavior')
elif CONF.processing.store_data == 'swift':
LOG.info('Introspection data will be stored in Swift in the '
'container %s', CONF.swift.container)
db.init()
try:
hooks = plugins_base.validate_processing_hooks()
except Exception as exc:
LOG.critical(str(exc))
sys.exit(1)
LOG.info('Enabled processing hooks: %s', [h.name for h in hooks])
driver = pxe_filter.driver()
driver.init_filter()
periodic_clean_up_ = periodics.periodic(
spacing=CONF.clean_up_period
)(periodic_clean_up)
self._periodics_worker = periodics.PeriodicWorker(
callables=[(driver.get_periodic_sync_task(), None, None),
(periodic_clean_up_, None, None)],
executor_factory=periodics.ExistingExecutor(utils.executor()),
on_failure=self._periodics_watchdog)
utils.executor().submit(self._periodics_worker.start)
def del_host(self):
if not self._shutting_down.acquire(blocking=False):
LOG.warning('Attempted to shut down while already shutting down')
return
pxe_filter.driver().tear_down_filter()
if self._periodics_worker is not None:
try:
self._periodics_worker.stop()
self._periodics_worker.wait()
except Exception as e:
LOG.exception('Service error occurred when stopping '
'periodic workers. Error: %s', e)
self._periodics_worker = None
if utils.executor().alive:
utils.executor().shutdown(wait=True)
self._shutting_down.release()
LOG.info('Shut down successfully')
def _periodics_watchdog(self, callable_, activity, spacing, exc_info,
traceback=None):
LOG.exception("The periodic %(callable)s failed with: %(exception)s", {
'exception': ''.join(traceback_mod.format_exception(*exc_info)),
'callable': reflection.get_callable_name(callable_)})
# NOTE(milan): spawn new thread otherwise waiting would block
eventlet.spawn(self.del_host)
@messaging.expected_exceptions(utils.Error) @messaging.expected_exceptions(utils.Error)
def do_introspection(self, context, node_id, token=None, def do_introspection(self, context, node_id, token=None,
manage_boot=True): manage_boot=True):
@ -36,3 +127,20 @@ class ConductorManager(object):
@messaging.expected_exceptions(utils.Error) @messaging.expected_exceptions(utils.Error)
def do_reapply(self, context, node_id, token=None): def do_reapply(self, context, node_id, token=None):
process.reapply(node_id) process.reapply(node_id)
def periodic_clean_up(): # pragma: no cover
try:
if node_cache.clean_up():
pxe_filter.driver().sync(ir_utils.get_client())
sync_with_ironic()
except Exception:
LOG.exception('Periodic clean up of node cache failed')
def sync_with_ironic():
ironic = ir_utils.get_client()
# TODO(yuikotakada): pagination
ironic_nodes = ironic.node.list(limit=0)
ironic_node_uuids = {node.uuid for node in ironic_nodes}
node_cache.delete_nodes_not_in_list(ironic_node_uuids)

View File

@ -11,6 +11,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import socket
from oslo_config import cfg from oslo_config import cfg
from ironic_inspector.common.i18n import _ from ironic_inspector.common.i18n import _
@ -23,6 +25,14 @@ _OPTS = [
cfg.PortOpt('listen_port', cfg.PortOpt('listen_port',
default=5050, default=5050,
help=_('Port to listen on.')), help=_('Port to listen on.')),
cfg.StrOpt('host',
default=socket.getfqdn(),
sample_default='localhost',
help=_('Name of this node. This can be an opaque identifier. '
'It is not necessarily a hostname, FQDN, or IP address. '
'However, the node name must be valid within '
'an AMQP key, and if using ZeroMQ, a valid '
'hostname, FQDN, or IP address.')),
cfg.StrOpt('auth_strategy', cfg.StrOpt('auth_strategy',
default='keystone', default='keystone',
choices=('keystone', 'noauth'), choices=('keystone', 'noauth'),

View File

@ -57,6 +57,7 @@ driver = noop
debug = True debug = True
introspection_delay = 0 introspection_delay = 0
auth_strategy=noauth auth_strategy=noauth
transport_url=fake://
[database] [database]
connection = sqlite:///%(db_file)s connection = sqlite:///%(db_file)s
[processing] [processing]

View File

@ -11,6 +11,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import fixtures
import mock import mock
import oslo_messaging as messaging import oslo_messaging as messaging
@ -27,11 +28,213 @@ CONF = ironic_inspector.conf.CONF
class BaseManagerTest(test_base.NodeTest): class BaseManagerTest(test_base.NodeTest):
def setUp(self): def setUp(self):
super(BaseManagerTest, self).setUp() super(BaseManagerTest, self).setUp()
self.mock_log = self.useFixture(fixtures.MockPatchObject(
manager, 'LOG')).mock
self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject(
manager.semaphore, 'Semaphore', autospec=True))
.mock.return_value)
self.mock__shutting_down.acquire.return_value = True
self.manager = manager.ConductorManager() self.manager = manager.ConductorManager()
self.context = {} self.context = {}
self.token = None self.token = None
class TestManagerInitHost(BaseManagerTest):
def setUp(self):
super(TestManagerInitHost, self).setUp()
self.mock_db_init = self.useFixture(fixtures.MockPatchObject(
manager.db, 'init')).mock
self.mock_validate_processing_hooks = self.useFixture(
fixtures.MockPatchObject(manager.plugins_base,
'validate_processing_hooks')).mock
self.mock_filter = self.useFixture(fixtures.MockPatchObject(
manager.pxe_filter, 'driver')).mock.return_value
self.mock_periodic = self.useFixture(fixtures.MockPatchObject(
manager.periodics, 'periodic')).mock
self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject(
manager.periodics, 'PeriodicWorker')).mock
self.mock_executor = self.useFixture(fixtures.MockPatchObject(
manager.utils, 'executor')).mock
self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject(
manager.periodics, 'ExistingExecutor')).mock
self.mock_exit = self.useFixture(fixtures.MockPatchObject(
manager.sys, 'exit')).mock
def assert_periodics(self):
outer_cleanup_decorator_call = mock.call(
spacing=CONF.clean_up_period)
self.mock_periodic.assert_has_calls([
outer_cleanup_decorator_call,
mock.call()(manager.periodic_clean_up)])
inner_decorator = self.mock_periodic.return_value
inner_cleanup_decorator_call = mock.call(
manager.periodic_clean_up)
inner_decorator.assert_has_calls([inner_cleanup_decorator_call])
self.mock_ExistingExecutor.assert_called_once_with(
self.mock_executor.return_value)
periodic_worker = self.mock_PeriodicWorker.return_value
periodic_sync = self.mock_filter.get_periodic_sync_task.return_value
callables = [(periodic_sync, None, None),
(inner_decorator.return_value, None, None)]
self.mock_PeriodicWorker.assert_called_once_with(
callables=callables,
executor_factory=self.mock_ExistingExecutor.return_value,
on_failure=self.manager._periodics_watchdog)
self.assertIs(periodic_worker, self.manager._periodics_worker)
self.mock_executor.return_value.submit.assert_called_once_with(
self.manager._periodics_worker.start)
def test_no_introspection_data_store(self):
CONF.set_override('store_data', 'none', 'processing')
self.manager.init_host()
self.mock_log.warning.assert_called_once_with(
'Introspection data will not be stored. Change "[processing] '
'store_data" option if this is not the desired behavior')
def test_init_host(self):
self.manager.init_host()
self.mock_db_init.asset_called_once_with()
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
def test_init_host_validate_processing_hooks_exception(self):
class MyError(Exception):
pass
error = MyError('Oops!')
self.mock_validate_processing_hooks.side_effect = error
# NOTE(milan): have to stop executing the test case at this point to
# simulate a real sys.exit() call
self.mock_exit.side_effect = SystemExit('Stop!')
self.assertRaisesRegex(SystemExit, 'Stop!', self.manager.init_host)
self.mock_db_init.assert_called_once_with()
self.mock_log.critical.assert_called_once_with(str(error))
self.mock_exit.assert_called_once_with(1)
self.mock_filter.init_filter.assert_not_called()
class TestManagerDelHost(BaseManagerTest):
def setUp(self):
super(TestManagerDelHost, self).setUp()
self.mock_filter = self.useFixture(fixtures.MockPatchObject(
manager.pxe_filter, 'driver')).mock.return_value
self.mock_executor = mock.Mock()
self.mock_executor.alive = True
self.mock_get_executor = self.useFixture(fixtures.MockPatchObject(
manager.utils, 'executor')).mock
self.mock_get_executor.return_value = self.mock_executor
self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject(
self.manager, '_periodics_worker')).mock
self.mock_exit = self.useFixture(fixtures.MockPatchObject(
manager.sys, 'exit')).mock
def test_del_host(self):
self.manager.del_host()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_called_once_with()
self.mock__periodic_worker.wait.assert_called_once_with()
self.assertIsNone(self.manager._periodics_worker)
self.mock_executor.shutdown.assert_called_once_with(wait=True)
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
def test_del_host_race(self):
self.mock__shutting_down.acquire.return_value = False
self.manager.del_host()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock_log.warning.assert_called_once_with(
'Attempted to shut down while already shutting down')
self.mock__periodic_worker.stop.assert_not_called()
self.mock__periodic_worker.wait.assert_not_called()
self.assertIs(self.mock__periodic_worker,
self.manager._periodics_worker)
self.mock_executor.shutdown.assert_not_called()
self.mock_filter.tear_down_filter.assert_not_called()
self.mock__shutting_down.release.assert_not_called()
self.mock_exit.assert_not_called()
def test_del_host_worker_exception(self):
class MyError(Exception):
pass
error = MyError('Oops!')
self.mock__periodic_worker.wait.side_effect = error
self.manager.del_host()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_called_once_with()
self.mock__periodic_worker.wait.assert_called_once_with()
self.mock_log.exception.assert_called_once_with(
'Service error occurred when stopping periodic workers. Error: %s',
error)
self.assertIsNone(self.manager._periodics_worker)
self.mock_executor.shutdown.assert_called_once_with(wait=True)
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
def test_del_host_no_worker(self):
self.manager._periodics_worker = None
self.manager.del_host()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_not_called()
self.mock__periodic_worker.wait.assert_not_called()
self.assertIsNone(self.manager._periodics_worker)
self.mock_executor.shutdown.assert_called_once_with(wait=True)
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
def test_del_host_stopped_executor(self):
self.mock_executor.alive = False
self.manager.del_host()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_called_once_with()
self.mock__periodic_worker.wait.assert_called_once_with()
self.assertIsNone(self.manager._periodics_worker)
self.mock_executor.shutdown.assert_not_called()
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
class TestManagerPeriodicWatchDog(BaseManagerTest):
def setUp(self):
super(TestManagerPeriodicWatchDog, self).setUp()
self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject(
manager.reflection, 'get_callable_name')).mock
self.mock_spawn = self.useFixture(fixtures.MockPatchObject(
manager.eventlet, 'spawn')).mock
def test__periodics_watchdog(self):
error = RuntimeError('Oops!')
self.manager._periodics_watchdog(
callable_=None, activity=None, spacing=None,
exc_info=(None, error, None), traceback=None)
self.mock_get_callable_name.assert_called_once_with(None)
self.mock_spawn.assert_called_once_with(self.manager.del_host)
class TestManagerIntrospect(BaseManagerTest): class TestManagerIntrospect(BaseManagerTest):
@mock.patch.object(introspect, 'introspect', autospec=True) @mock.patch.object(introspect, 'introspect', autospec=True)
def test_do_introspect(self, introspect_mock): def test_do_introspect(self, introspect_mock):

View File

@ -20,11 +20,9 @@ import fixtures
import mock import mock
from oslo_config import cfg from oslo_config import cfg
from ironic_inspector.common import rpc
from ironic_inspector.test import base as test_base from ironic_inspector.test import base as test_base
from ironic_inspector import wsgi_service from ironic_inspector import wsgi_service
CONF = cfg.CONF CONF = cfg.CONF
@ -34,15 +32,9 @@ class BaseWSGITest(test_base.BaseTest):
super(BaseWSGITest, self).setUp() super(BaseWSGITest, self).setUp()
self.app = self.useFixture(fixtures.MockPatchObject( self.app = self.useFixture(fixtures.MockPatchObject(
wsgi_service.app, 'app', autospec=True)).mock wsgi_service.app, 'app', autospec=True)).mock
self.mock__shutting_down = (self.useFixture(fixtures.MockPatchObject(
wsgi_service.semaphore, 'Semaphore', autospec=True))
.mock.return_value)
self.mock__shutting_down.acquire.return_value = True
self.mock_log = self.useFixture(fixtures.MockPatchObject( self.mock_log = self.useFixture(fixtures.MockPatchObject(
wsgi_service, 'LOG')).mock wsgi_service, 'LOG')).mock
self.service = wsgi_service.WSGIService() self.service = wsgi_service.WSGIService()
self.mock_rpc_server = self.useFixture(fixtures.MockPatchObject(
rpc, 'get_server')).mock
class TestWSGIServiceInitMiddleware(BaseWSGITest): class TestWSGIServiceInitMiddleware(BaseWSGITest):
@ -73,118 +65,10 @@ class TestWSGIServiceInitMiddleware(BaseWSGITest):
'Starting unauthenticated, please check configuration') 'Starting unauthenticated, please check configuration')
self.mock_add_cors_middleware.assert_called_once_with(self.app) self.mock_add_cors_middleware.assert_called_once_with(self.app)
def test_init_middleware_no_store(self):
CONF.set_override('store_data', 'none', 'processing')
self.service._init_middleware()
self.mock_add_auth_middleware.assert_called_once_with(self.app)
self.mock_log.warning.assert_called_once_with(
'Introspection data will not be stored. Change "[processing] '
'store_data" option if this is not the desired behavior')
self.mock_add_cors_middleware.assert_called_once_with(self.app)
class TestWSGIServiceInitHost(BaseWSGITest):
def setUp(self):
super(TestWSGIServiceInitHost, self).setUp()
self.mock_db_init = self.useFixture(fixtures.MockPatchObject(
wsgi_service.db, 'init')).mock
self.mock_validate_processing_hooks = self.useFixture(
fixtures.MockPatchObject(wsgi_service.plugins_base,
'validate_processing_hooks')).mock
self.mock_filter = self.useFixture(fixtures.MockPatchObject(
wsgi_service.pxe_filter, 'driver')).mock.return_value
self.mock_periodic = self.useFixture(fixtures.MockPatchObject(
wsgi_service.periodics, 'periodic')).mock
self.mock_PeriodicWorker = self.useFixture(fixtures.MockPatchObject(
wsgi_service.periodics, 'PeriodicWorker')).mock
self.mock_executor = self.useFixture(fixtures.MockPatchObject(
wsgi_service.utils, 'executor')).mock
self.mock_ExistingExecutor = self.useFixture(fixtures.MockPatchObject(
wsgi_service.periodics, 'ExistingExecutor')).mock
self.mock_exit = self.useFixture(fixtures.MockPatchObject(
wsgi_service.sys, 'exit')).mock
def assert_periodics(self):
outer_cleanup_decorator_call = mock.call(
spacing=CONF.clean_up_period)
self.mock_periodic.assert_has_calls([
outer_cleanup_decorator_call,
mock.call()(wsgi_service.periodic_clean_up)])
inner_decorator = self.mock_periodic.return_value
inner_cleanup_decorator_call = mock.call(
wsgi_service.periodic_clean_up)
inner_decorator.assert_has_calls([inner_cleanup_decorator_call])
self.mock_ExistingExecutor.assert_called_once_with(
self.mock_executor.return_value)
periodic_worker = self.mock_PeriodicWorker.return_value
periodic_sync = self.mock_filter.get_periodic_sync_task.return_value
callables = [(periodic_sync, None, None),
(inner_decorator.return_value, None, None)]
self.mock_PeriodicWorker.assert_called_once_with(
callables=callables,
executor_factory=self.mock_ExistingExecutor.return_value,
on_failure=self.service._periodics_watchdog)
self.assertIs(periodic_worker, self.service._periodics_worker)
self.mock_executor.return_value.submit.assert_called_once_with(
self.service._periodics_worker.start)
def test_init_host(self):
self.service._init_host()
self.mock_db_init.asset_called_once_with()
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
def test_init_host_validate_processing_hooks_exception(self):
class MyError(Exception):
pass
error = MyError('Oops!')
self.mock_validate_processing_hooks.side_effect = error
# NOTE(milan): have to stop executing the test case at this point to
# simulate a real sys.exit() call
self.mock_exit.side_effect = SystemExit('Stop!')
self.assertRaisesRegex(SystemExit, 'Stop!', self.service._init_host)
self.mock_db_init.assert_called_once_with()
self.mock_log.critical.assert_called_once_with(str(error))
self.mock_exit.assert_called_once_with(1)
self.mock_filter.init_filter.assert_not_called()
class TestWSGIServicePeriodicWatchDog(BaseWSGITest):
def setUp(self):
super(TestWSGIServicePeriodicWatchDog, self).setUp()
self.mock_get_callable_name = self.useFixture(fixtures.MockPatchObject(
wsgi_service.reflection, 'get_callable_name')).mock
self.mock_spawn = self.useFixture(fixtures.MockPatchObject(
wsgi_service.eventlet, 'spawn')).mock
def test__periodics_watchdog(self):
error = RuntimeError('Oops!')
self.service._periodics_watchdog(
callable_=None, activity=None, spacing=None,
exc_info=(None, error, None), traceback=None)
self.mock_get_callable_name.assert_called_once_with(None)
self.mock_spawn.assert_called_once_with(self.service.shutdown,
error=str(error))
class TestWSGIServiceRun(BaseWSGITest): class TestWSGIServiceRun(BaseWSGITest):
def setUp(self): def setUp(self):
super(TestWSGIServiceRun, self).setUp() super(TestWSGIServiceRun, self).setUp()
self.mock__init_host = self.useFixture(fixtures.MockPatchObject(
self.service, '_init_host')).mock
self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject( self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject(
self.service, '_init_middleware')).mock self.service, '_init_middleware')).mock
self.mock__create_ssl_context = self.useFixture( self.mock__create_ssl_context = self.useFixture(
@ -201,9 +85,6 @@ class TestWSGIServiceRun(BaseWSGITest):
self.mock__create_ssl_context.assert_called_once_with() self.mock__create_ssl_context.assert_called_once_with()
self.mock__init_middleware.assert_called_once_with() self.mock__init_middleware.assert_called_once_with()
self.mock__init_host.assert_called_once_with()
self.mock_rpc_server.assert_called_once_with()
self.service.rpc_server.start.assert_called_once_with()
self.app.run.assert_called_once_with( self.app.run.assert_called_once_with(
host=CONF.listen_address, port=CONF.listen_port, host=CONF.listen_address, port=CONF.listen_port,
ssl_context=self.mock__create_ssl_context.return_value) ssl_context=self.mock__create_ssl_context.return_value)
@ -215,7 +96,6 @@ class TestWSGIServiceRun(BaseWSGITest):
self.service.run() self.service.run()
self.mock__create_ssl_context.assert_called_once_with() self.mock__create_ssl_context.assert_called_once_with()
self.mock__init_middleware.assert_called_once_with() self.mock__init_middleware.assert_called_once_with()
self.mock__init_host.assert_called_once_with()
self.app.run.assert_called_once_with( self.app.run.assert_called_once_with(
host=CONF.listen_address, port=CONF.listen_port) host=CONF.listen_address, port=CONF.listen_port)
self.mock_shutdown.assert_called_once_with() self.mock_shutdown.assert_called_once_with()
@ -230,7 +110,6 @@ class TestWSGIServiceRun(BaseWSGITest):
self.mock__create_ssl_context.assert_called_once_with() self.mock__create_ssl_context.assert_called_once_with()
self.mock__init_middleware.assert_called_once_with() self.mock__init_middleware.assert_called_once_with()
self.mock__init_host.assert_called_once_with()
self.app.run.assert_called_once_with( self.app.run.assert_called_once_with(
host=CONF.listen_address, port=CONF.listen_port, host=CONF.listen_address, port=CONF.listen_port,
ssl_context=self.mock__create_ssl_context.return_value) ssl_context=self.mock__create_ssl_context.return_value)
@ -240,108 +119,21 @@ class TestWSGIServiceRun(BaseWSGITest):
class TestWSGIServiceShutdown(BaseWSGITest): class TestWSGIServiceShutdown(BaseWSGITest):
def setUp(self): def setUp(self):
super(TestWSGIServiceShutdown, self).setUp() super(TestWSGIServiceShutdown, self).setUp()
self.mock_filter = self.useFixture(fixtures.MockPatchObject(
wsgi_service.pxe_filter, 'driver')).mock.return_value
self.mock_executor = mock.Mock()
self.mock_executor.alive = True
self.mock_get_executor = self.useFixture(fixtures.MockPatchObject(
wsgi_service.utils, 'executor')).mock
self.mock_get_executor.return_value = self.mock_executor
self.service = wsgi_service.WSGIService() self.service = wsgi_service.WSGIService()
self.mock__periodic_worker = self.useFixture(fixtures.MockPatchObject( self.mock_rpc_service = mock.MagicMock()
self.service, '_periodics_worker')).mock self.service.rpc_service = self.mock_rpc_service
self.mock_exit = self.useFixture(fixtures.MockPatchObject( self.mock_exit = self.useFixture(fixtures.MockPatchObject(
wsgi_service.sys, 'exit')).mock wsgi_service.sys, 'exit')).mock
self.service.rpc_server = self.mock_rpc_server
def test_shutdown(self): def test_shutdown(self):
class MyError(Exception): class MyError(Exception):
pass pass
error = MyError('Oops!') error = MyError('Oops!')
self.service.shutdown(error=error) self.service.shutdown(error=error)
self.mock_rpc_service.stop.assert_called_once_with()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_called_once_with()
self.mock__periodic_worker.wait.assert_called_once_with()
self.assertIsNone(self.service._periodics_worker)
self.mock_executor.shutdown.assert_called_once_with(wait=True)
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
self.mock_exit.assert_called_once_with(error) self.mock_exit.assert_called_once_with(error)
def test_shutdown_race(self):
self.mock__shutting_down.acquire.return_value = False
self.service.shutdown()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock_log.warning.assert_called_once_with(
'Attempted to shut down while already shutting down')
self.mock__periodic_worker.stop.assert_not_called()
self.mock__periodic_worker.wait.assert_not_called()
self.assertIs(self.mock__periodic_worker,
self.service._periodics_worker)
self.mock_executor.shutdown.assert_not_called()
self.mock_filter.tear_down_filter.assert_not_called()
self.mock__shutting_down.release.assert_not_called()
self.mock_exit.assert_not_called()
def test_shutdown_worker_exception(self):
class MyError(Exception):
pass
error = MyError('Oops!')
self.mock__periodic_worker.wait.side_effect = error
self.service.shutdown()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_called_once_with()
self.mock__periodic_worker.wait.assert_called_once_with()
self.mock_log.exception.assert_called_once_with(
'Service error occurred when stopping periodic workers. Error: %s',
error)
self.assertIsNone(self.service._periodics_worker)
self.mock_executor.shutdown.assert_called_once_with(wait=True)
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
self.mock_exit.assert_called_once_with(None)
def test_shutdown_no_worker(self):
self.service._periodics_worker = None
self.service.shutdown()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_not_called()
self.mock__periodic_worker.wait.assert_not_called()
self.assertIsNone(self.service._periodics_worker)
self.mock_executor.shutdown.assert_called_once_with(wait=True)
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
self.mock_exit.assert_called_once_with(None)
def test_shutdown_stopped_executor(self):
self.mock_executor.alive = False
self.service.shutdown()
self.mock__shutting_down.acquire.assert_called_once_with(
blocking=False)
self.mock__periodic_worker.stop.assert_called_once_with()
self.mock__periodic_worker.wait.assert_called_once_with()
self.assertIsNone(self.service._periodics_worker)
self.mock_executor.shutdown.assert_not_called()
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
self.mock_exit.assert_called_once_with(None)
class TestCreateSSLContext(test_base.BaseTest): class TestCreateSSLContext(test_base.BaseTest):
def setUp(self): def setUp(self):

View File

@ -13,25 +13,16 @@
import signal import signal
import ssl import ssl
import sys import sys
import traceback as traceback_mod
import eventlet import eventlet
from eventlet import semaphore
from futurist import periodics
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_utils import reflection from oslo_service import service
from ironic_inspector.common import ironic as ir_utils from ironic_inspector.common.rpc_service import RPCService
from ironic_inspector.common import rpc
from ironic_inspector import db
from ironic_inspector import main as app from ironic_inspector import main as app
from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
from ironic_inspector.pxe_filter import base as pxe_filter
from ironic_inspector import utils from ironic_inspector import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
@ -41,10 +32,9 @@ class WSGIService(object):
def __init__(self): def __init__(self):
self.app = app.app self.app = app.app
self._periodics_worker = None
self._shutting_down = semaphore.Semaphore()
signal.signal(signal.SIGHUP, self._handle_sighup) signal.signal(signal.SIGHUP, self._handle_sighup)
signal.signal(signal.SIGTERM, self._handle_sigterm) signal.signal(signal.SIGTERM, self._handle_sigterm)
self.rpc_service = RPCService(CONF.host)
def _init_middleware(self): def _init_middleware(self):
"""Initialize WSGI middleware. """Initialize WSGI middleware.
@ -57,15 +47,6 @@ class WSGIService(object):
else: else:
LOG.warning('Starting unauthenticated, please check' LOG.warning('Starting unauthenticated, please check'
' configuration') ' configuration')
# TODO(aarefiev): move to WorkerService once we split service
if CONF.processing.store_data == 'none':
LOG.warning('Introspection data will not be stored. Change '
'"[processing] store_data" option if this is not '
'the desired behavior')
elif CONF.processing.store_data == 'swift':
LOG.info('Introspection data will be stored in Swift in the '
'container %s', CONF.swift.container)
utils.add_cors_middleware(self.app) utils.add_cors_middleware(self.app)
def _create_ssl_context(self): def _create_ssl_context(self):
@ -99,77 +80,13 @@ class WSGIService(object):
'settings: %s', exc) 'settings: %s', exc)
return context return context
# TODO(aarefiev): move init code to WorkerService
def _init_host(self):
"""Initialize Worker host
Init db connection, load and validate processing
hooks, runs periodic tasks.
:returns None
"""
db.init()
try:
hooks = plugins_base.validate_processing_hooks()
except Exception as exc:
LOG.critical(str(exc))
sys.exit(1)
LOG.info('Enabled processing hooks: %s', [h.name for h in hooks])
driver = pxe_filter.driver()
driver.init_filter()
periodic_clean_up_ = periodics.periodic(
spacing=CONF.clean_up_period
)(periodic_clean_up)
self._periodics_worker = periodics.PeriodicWorker(
callables=[(driver.get_periodic_sync_task(), None, None),
(periodic_clean_up_, None, None)],
executor_factory=periodics.ExistingExecutor(utils.executor()),
on_failure=self._periodics_watchdog)
utils.executor().submit(self._periodics_worker.start)
def _periodics_watchdog(self, callable_, activity, spacing, exc_info,
traceback=None):
LOG.exception("The periodic %(callable)s failed with: %(exception)s", {
'exception': ''.join(traceback_mod.format_exception(*exc_info)),
'callable': reflection.get_callable_name(callable_)})
# NOTE(milan): spawn new thread otherwise waiting would block
eventlet.spawn(self.shutdown, error=str(exc_info[1]))
def shutdown(self, error=None): def shutdown(self, error=None):
"""Stop serving API, clean up. """Stop serving API.
:returns: None :returns: None
""" """
# TODO(aarefiev): move shutdown code to WorkerService
if not self._shutting_down.acquire(blocking=False):
LOG.warning('Attempted to shut down while already shutting down')
return
LOG.debug('Shutting down') LOG.debug('Shutting down')
self.rpc_service.stop()
self.rpc_server.stop()
if self._periodics_worker is not None:
try:
self._periodics_worker.stop()
self._periodics_worker.wait()
except Exception as e:
LOG.exception('Service error occurred when stopping '
'periodic workers. Error: %s', e)
self._periodics_worker = None
if utils.executor().alive:
utils.executor().shutdown(wait=True)
pxe_filter.driver().tear_down_filter()
self._shutting_down.release()
LOG.info('Shut down successfully')
sys.exit(error) sys.exit(error)
def run(self): def run(self):
@ -186,10 +103,9 @@ class WSGIService(object):
self._init_middleware() self._init_middleware()
self._init_host() LOG.info('Spawning RPC service')
service.launch(CONF, self.rpc_service,
self.rpc_server = rpc.get_server() restart_method='mutate')
self.rpc_server.start()
try: try:
self.app.run(**app_kwargs) self.app.run(**app_kwargs)
@ -210,20 +126,3 @@ class WSGIService(object):
# SIGTERM. Raising KeyboardIntrerrupt which won't be caught by any # SIGTERM. Raising KeyboardIntrerrupt which won't be caught by any
# 'except Exception' clauses. # 'except Exception' clauses.
raise KeyboardInterrupt raise KeyboardInterrupt
def periodic_clean_up(): # pragma: no cover
try:
if node_cache.clean_up():
pxe_filter.driver().sync(ir_utils.get_client())
sync_with_ironic()
except Exception:
LOG.exception('Periodic clean up of node cache failed')
def sync_with_ironic():
ironic = ir_utils.get_client()
# TODO(yuikotakada): pagination
ironic_nodes = ironic.node.list(limit=0)
ironic_node_uuids = {node.uuid for node in ironic_nodes}
node_cache.delete_nodes_not_in_list(ironic_node_uuids)

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
Adds rpc related configuration options for the communication between
ironic-inspector API and worker. It needs to be configured properly
during upgrade. Set ``[DEFAULT]transport_url`` to ``fake://`` if a
rpc backend is not available or not desired.

View File

@ -6,3 +6,4 @@ namespace = oslo.db
namespace = oslo.log namespace = oslo.log
namespace = oslo.middleware.cors namespace = oslo.middleware.cors
namespace = oslo.policy namespace = oslo.policy
namespace = oslo.messaging