Split API and conductor services

This patch splits API and conductor services for ironic-inspector.
Previous patch utilized lock from tooz coordinator, this patch adds
a coordinator wrapper for easier usage and further introduces group
interfaces.

Each conductor service will join a predefined group to mark it's
availability, on each request, API service will query members from
the group and randomly choose on of them, create desiginated topic
and deliver request to it.

The feature is tested with the memcached, file backend of tooz.
Other backends are not fully tested but may work as well, please
refer to tooz documentation for driver compatibilities[1].

[1] https://docs.openstack.org/tooz/latest/user/compatibility.html

Story: 2001842
Task: 30376

Change-Id: I419176cd6d44d74c066db275ef008fe8bb6ef37a
This commit is contained in:
Kaifeng Wang 2019-07-23 13:18:18 +08:00
parent 1bed475409
commit 293b0c7c15
17 changed files with 618 additions and 153 deletions

View File

@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The Ironic Inspector Conductor service."""
import sys
from oslo_config import cfg
from oslo_service import service
from ironic_inspector.common.i18n import _
from ironic_inspector.common.rpc_service import RPCService
from ironic_inspector.common import service_utils
CONF = cfg.CONF
def main(args=sys.argv[1:]):
# Parse config file and command line options, then start logging
service_utils.prepare_service(args)
if CONF.standalone:
msg = _('To run ironic-inspector-conductor, [DEFAULT]standalone '
'should be set to False.')
sys.exit(msg)
launcher = service.ServiceLauncher(CONF, restart_method='mutate')
launcher.launch_service(RPCService(CONF.host))
launcher.wait()
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,34 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""WSGI script for Ironic Inspector API, installed by pbr."""
import sys
from oslo_config import cfg
from ironic_inspector.common.i18n import _
from ironic_inspector.common import service_utils
from ironic_inspector import main
CONF = cfg.CONF
def initialize_wsgi_app():
# Parse config file and command line options, then start logging
service_utils.prepare_service(sys.argv[1:])
if CONF.standalone:
msg = _('To run ironic-inspector-api, [DEFAULT]standalone should be '
'set to False.')
sys.exit(msg)
return main.get_app()

View File

@ -0,0 +1,137 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log
from tooz import coordination
from ironic_inspector import utils
CONF = cfg.CONF
LOG = log.getLogger(__name__)
COORDINATION_PREFIX = 'ironic_inspector'
COORDINATION_GROUP_NAME = '.'.join([COORDINATION_PREFIX, 'service_group'])
LOCK_PREFIX = 'ironic_inspector.'
class Coordinator(object):
"""Tooz coordination wrapper."""
group_name = COORDINATION_GROUP_NAME.encode('ascii')
lock_prefix = LOCK_PREFIX
def __init__(self, prefix=None):
"""Creates a coordinator instance for service coordination.
:param prefix: The prefix to be part of the member id of the service.
Different types of services on the same host should use
different prefix to work properly.
"""
self.coordinator = None
self.started = False
self.prefix = prefix if prefix else 'default'
def start(self, heartbeat=True):
"""Start coordinator.
:param heartbeat: Whether spawns a new thread to keep heartbeating with
the tooz backend. Unless there is periodic task to
do heartbeat manually, it should be always set to
True.
"""
if self.started:
return
member_id = '.'.join([COORDINATION_PREFIX, self.prefix,
CONF.host]).encode('ascii')
self.coordinator = coordination.get_coordinator(
CONF.coordination.backend_url, member_id)
self.coordinator.start(start_heart=heartbeat)
self.started = True
LOG.debug('Coordinator started successfully.')
def stop(self):
"""Disconnect from coordination backend and stop heartbeat."""
if self.started:
try:
self.coordinator.stop()
except Exception as e:
LOG.error('Failed to stop coordinator: %s', e)
self.coordinator = None
self.started = False
LOG.debug('Coordinator stopped successfully')
def _validate_state(self):
if not self.started:
raise utils.Error('Coordinator should be started before '
'executing coordination actions.')
def _create_group(self):
try:
request = self.coordinator.create_group(self.group_name)
request.get()
except coordination.GroupAlreadyExist:
LOG.debug('Group %s already exists.', self.group_name)
def join_group(self):
"""Join service group."""
self._validate_state()
try:
request = self.coordinator.join_group(self.group_name)
request.get()
except coordination.GroupNotCreated:
self._create_group()
request = self.coordinator.join_group(self.group_name)
request.get()
except coordination.MemberAlreadyExist:
pass
LOG.debug('Joined group %s', self.group_name)
def leave_group(self):
"""Leave service group"""
self._validate_state()
try:
request = self.coordinator.leave_group(self.group_name)
request.get()
LOG.debug('Left group %s', self.group_name)
except coordination.MemberNotJoined:
LOG.debug('Leaving a non-existing group.')
def get_members(self):
"""Get members in the service group."""
self._validate_state()
try:
result = self.coordinator.get_members(self.group_name)
return result.get()
except coordination.GroupNotCreated:
# If the group does not exist, there should be no members in it.
return set()
def get_lock(self, uuid):
"""Get lock for node uuid."""
self._validate_state()
lock_name = (self.lock_prefix + uuid).encode('ascii')
return self.coordinator.get_lock(lock_name)
_COORDINATOR = None
@lockutils.synchronized('inspector_coordinator')
def get_coordinator(prefix=None):
global _COORDINATOR
if _COORDINATOR is None:
_COORDINATOR = Coordinator(prefix=prefix)
return _COORDINATOR

View File

@ -17,7 +17,7 @@ from oslo_concurrency import lockutils
from oslo_config import cfg
import six
from ironic_inspector import utils
from ironic_inspector.common import coordination
CONF = cfg.CONF
_LOCK_TEMPLATE = 'node-%s'
@ -70,11 +70,14 @@ class InternalLock(BaseLock):
class ToozLock(BaseLock):
"""Locking mechanism based on tooz."""
"""Wrapper on tooz locks."""
def __init__(self, coordinator, uuid, prefix='ironic_inspector_'):
name = (prefix + uuid).encode()
self._lock = coordinator.get_lock(name)
def __init__(self, lock):
"""Creates a wrapper on the tooz lock.
:param lock: a tooz lock instance.
"""
self._lock = lock
def acquire(self, blocking=True):
if not self._lock.acquired:
@ -100,5 +103,6 @@ def get_lock(uuid):
if CONF.standalone:
return InternalLock(uuid)
coordinator = utils.get_coordinator()
return ToozLock(coordinator, uuid)
coordinator = coordination.get_coordinator()
lock = coordinator.get_lock(uuid)
return ToozLock(lock)

View File

@ -30,10 +30,18 @@ def get_transport():
return TRANSPORT
def get_client():
"""Get a RPC client instance."""
target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host,
version='1.2')
def get_client(topic=None):
"""Get a RPC client instance.
:param topic: The topic of the message will be delivered to. This argument
is ignored if CONF.standalone is True.
"""
if CONF.standalone:
target = messaging.Target(topic=manager.MANAGER_TOPIC,
server=CONF.host,
version='1.3')
else:
target = messaging.Target(topic=topic, version='1.3')
transport = get_transport()
return messaging.RPCClient(transport, target)
@ -43,7 +51,7 @@ def get_server(endpoints):
transport = get_transport()
target = messaging.Target(topic=manager.MANAGER_TOPIC, server=CONF.host,
version='1.2')
version='1.3')
return messaging.get_rpc_server(
transport, target, endpoints, executor='eventlet',
access_policy=dispatcher.DefaultRPCAccessPolicy)

View File

@ -23,7 +23,9 @@ from oslo_log import log
import oslo_messaging as messaging
from oslo_utils import excutils
from oslo_utils import reflection
import tooz
from ironic_inspector.common import coordination
from ironic_inspector.common.i18n import _
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import keystone
@ -37,12 +39,12 @@ from ironic_inspector import utils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
MANAGER_TOPIC = 'ironic-inspector-conductor'
MANAGER_TOPIC = 'ironic_inspector.conductor'
class ConductorManager(object):
"""ironic inspector conductor manager"""
RPC_API_VERSION = '1.2'
RPC_API_VERSION = '1.3'
target = messaging.Target(version=RPC_API_VERSION)
@ -98,9 +100,10 @@ class ConductorManager(object):
if not CONF.standalone:
try:
coordinator = utils.get_coordinator()
coordinator.start()
except Exception:
coordinator = coordination.get_coordinator(prefix='conductor')
coordinator.start(heartbeat=True)
coordinator.join_group()
except tooz.ToozError:
with excutils.save_and_reraise_exception():
LOG.critical('Failed when connecting to coordination '
'backend.')
@ -109,6 +112,16 @@ class ConductorManager(object):
LOG.info('Successfully connected to coordination backend.')
def del_host(self):
"""Shutdown the ironic inspector conductor service."""
if not CONF.standalone:
try:
coordinator = coordination.get_coordinator(prefix='conductor')
if coordinator.started:
coordinator.leave_group()
coordinator.stop()
except tooz.ToozError:
LOG.exception('Failed to stop coordinator')
if not self._shutting_down.acquire(blocking=False):
LOG.warning('Attempted to shut down while already shutting down')
@ -133,14 +146,6 @@ class ConductorManager(object):
self._shutting_down.release()
if not CONF.standalone:
try:
coordinator = utils.get_coordinator()
if coordinator and coordinator.is_started:
coordinator.stop()
except Exception:
LOG.exception('Failed to stop coordinator')
LOG.info('Shut down successfully')
def _periodics_watchdog(self, callable_, activity, spacing, exc_info,
@ -177,6 +182,10 @@ class ConductorManager(object):
process.reapply(node_uuid, data=data)
@messaging.expected_exceptions(utils.Error)
def do_continue(self, context, data):
return process.process(data)
def periodic_clean_up(): # pragma: no cover
try:

View File

@ -12,6 +12,7 @@
# limitations under the License.
import os
import random
import re
import flask
@ -21,9 +22,11 @@ import six
from ironic_inspector import api_tools
from ironic_inspector.common import context
from ironic_inspector.common import coordination
from ironic_inspector.common.i18n import _
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import rpc
from ironic_inspector.conductor import manager
import ironic_inspector.conf
from ironic_inspector.conf import opts as conf_opts
from ironic_inspector import node_cache
@ -34,7 +37,7 @@ from ironic_inspector import utils
CONF = ironic_inspector.conf.CONF
app = flask.Flask(__name__)
_app = flask.Flask(__name__)
LOG = utils.getProcessingLogger(__name__)
MINIMUM_API_VERSION = (1, 0)
@ -43,6 +46,55 @@ DEFAULT_API_VERSION = CURRENT_API_VERSION
_LOGGING_EXCLUDED_KEYS = ('logs',)
def _init_middleware():
"""Initialize WSGI middleware.
:returns: None
"""
if CONF.auth_strategy != 'noauth':
utils.add_auth_middleware(_app)
else:
LOG.warning('Starting unauthenticated, please check'
' configuration')
utils.add_cors_middleware(_app)
def get_app():
"""Get the flask instance."""
_init_middleware()
return _app
# TODO(kaifeng) Extract rpc related code into a rpcapi module
def get_random_topic():
coordinator = coordination.get_coordinator(prefix='api')
members = coordinator.get_members()
hosts = []
for member in members:
# NOTE(kaifeng) recomposite host in case it contains '.'
parts = member.decode('ascii').split('.')
if len(parts) < 3:
LOG.warning('Found invalid member %s', member)
continue
if parts[1] == 'conductor':
hosts.append('.'.join(parts[2:]))
if not hosts:
raise utils.NoAvailableConductor('No available conductor service')
topic = '%s.%s' % (manager.MANAGER_TOPIC, random.choice(hosts))
return topic
def get_client_compat():
if CONF.standalone:
return rpc.get_client()
topic = get_random_topic()
return rpc.get_client(topic)
def _get_version():
ver = flask.request.headers.get(conf_opts.VERSION_HEADER,
_DEFAULT_API_VERSION)
@ -88,7 +140,16 @@ def convert_exceptions(func):
return wrapper
@app.before_request
@_app.before_first_request
def start_coordinator():
"""Create a coordinator instance for non-standalone case."""
if not CONF.standalone:
coordinator = coordination.get_coordinator(prefix='api')
coordinator.start(heartbeat=False)
LOG.info('Sucessfully created coordinator.')
@_app.before_request
def check_api_version():
requested = _get_version()
@ -101,7 +162,7 @@ def check_api_version():
code=406)
@app.after_request
@_app.after_request
def add_version_headers(res):
res.headers[conf_opts.MIN_VERSION_HEADER] = '%s.%s' % MINIMUM_API_VERSION
res.headers[conf_opts.MAX_VERSION_HEADER] = '%s.%s' % CURRENT_API_VERSION
@ -166,7 +227,7 @@ def api(path, is_public_api=False, rule=None, verb_to_rule_map=None,
:param kwargs: all the rest kwargs are passed to flask app.route
"""
def outer(func):
@app.route(path, **flask_kwargs)
@_app.route(path, **flask_kwargs)
@convert_exceptions
@six.wraps(func)
def wrapper(*args, **kwargs):
@ -206,7 +267,7 @@ def version_root(version):
pat = re.compile(r'^\/%s\/[^\/]*?$' % version)
resources = []
for url in app.url_map.iter_rules():
for url in _app.url_map.iter_rules():
if pat.match(str(url)):
resources.append(url)
@ -229,7 +290,9 @@ def api_continue():
LOG.debug("Received data from the ramdisk: %s", logged_data,
data=data)
return flask.jsonify(process.process(data))
client = get_client_compat()
result = client.call({}, 'do_continue', data=data)
return flask.jsonify(result)
# TODO(sambetts) Add API discovery for this endpoint
@ -253,7 +316,7 @@ def api_introspection(node_id):
'installation cannot manage boot ('
'(can_manage_boot set to False)'),
code=400)
client = rpc.get_client()
client = get_client_compat()
client.call({}, 'do_introspection', node_id=node_id,
manage_boot=manage_boot,
token=flask.request.headers.get('X-Auth-Token'))
@ -279,7 +342,7 @@ def api_introspection_statuses():
@api('/v1/introspection/<node_id>/abort', rule="introspection:abort",
methods=['POST'])
def api_introspection_abort(node_id):
client = rpc.get_client()
client = get_client_compat()
client.call({}, 'do_abort', node_id=node_id,
token=flask.request.headers.get('X-Auth-Token'))
return '', 202
@ -321,7 +384,7 @@ def api_introspection_reapply(node_id):
node = ir_utils.get_node(node_id, fields=['uuid'])
node_id = node.uuid
client = rpc.get_client()
client = get_client_compat()
client.call({}, 'do_reapply', node_uuid=node_id, data=data)
return '', 202
@ -374,6 +437,6 @@ def api_rule(uuid):
return '', 204
@app.errorhandler(404)
@_app.errorhandler(404)
def handle_404(error):
return error_response(error, code=404)

View File

@ -0,0 +1,123 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
from oslo_config import cfg
import tooz
from ironic_inspector.common import coordination
from ironic_inspector.test import base
from ironic_inspector import utils
CONF = cfg.CONF
@mock.patch.object(coordination, 'Coordinator', autospec=True)
class TestGetCoordinator(base.BaseTest):
def setUp(self):
super(TestGetCoordinator, self).setUp()
coordination._COORDINATOR = None
def test_get(self, mock_coordinator):
coordination.get_coordinator()
mock_coordinator.assert_called_once_with(prefix=None)
def test_get_with_prefix(self, mock_coordinator):
coordination.get_coordinator(prefix='conductor')
mock_coordinator.assert_called_once_with(prefix='conductor')
class TestCoordinator(base.BaseTest):
def setUp(self):
super(TestCoordinator, self).setUp()
self.coordinator = coordination.Coordinator(prefix='test')
self.mock_driver = self.useFixture(
fixtures.MockPatchObject(tooz.coordination, 'CoordinationDriver',
autospec=True)).mock
self.mock_get_coordinator = self.useFixture(
fixtures.MockPatchObject(tooz.coordination, 'get_coordinator',
autospec=True)).mock
self.mock_get_coordinator.return_value = self.mock_driver
self.group_name = coordination.COORDINATION_GROUP_NAME.encode('ascii')
def test_start(self):
CONF.set_override('backend_url', 'memcached://1.2.3.4:11211',
'coordination')
CONF.set_override('host', '1.2.3.5')
self.coordinator.start()
self.mock_get_coordinator.assert_called_once_with(
'memcached://1.2.3.4:11211', b'ironic_inspector.test.1.2.3.5')
self.assertTrue(self.coordinator.started)
self.mock_driver.start.assert_called_once_with(start_heart=True)
def test_stop(self):
self.coordinator.started = True
self.coordinator.coordinator = mock.MagicMock()
self.coordinator.stop()
self.assertFalse(self.coordinator.started)
def test__create_group(self):
self.coordinator.start()
self.coordinator._create_group()
self.mock_driver.create_group.assert_called_once_with(self.group_name)
def test_join_group(self):
self.coordinator.start()
self.coordinator.join_group()
self.mock_driver.join_group.assert_called_once_with(self.group_name)
def test_join_group_not_exist(self):
self.coordinator.start()
self.mock_driver.join_group.side_effect = [
tooz.coordination.GroupNotCreated('a group'), mock.Mock()]
self.coordinator.join_group()
self.mock_driver.create_group.assert_called_once_with(self.group_name)
self.mock_driver.join_group.assert_has_calls([
mock.call(self.group_name), mock.call(self.group_name)])
def test_leave_group(self):
self.coordinator.start()
self.coordinator.leave_group()
self.mock_driver.leave_group.assert_called_once_with(self.group_name)
def test_get_members(self):
self.coordinator.start()
mock_resp = mock.Mock()
mock_resp.get.return_value = {'host1', 'host2'}
self.mock_driver.get_members.return_value = mock_resp
members = self.coordinator.get_members()
self.assertEqual(members, {'host1', 'host2'})
self.mock_driver.get_members.assert_called_once_with(self.group_name)
def test_get_members_no_such_group(self):
self.coordinator.start()
self.mock_driver.get_members.side_effect = (
tooz.coordination.GroupNotCreated('a group'))
self.assertEqual(self.coordinator.get_members(), set())
def test_get_lock(self):
self.coordinator.start()
self.coordinator.get_lock('fake-node')
self.mock_driver.get_lock.assert_called_once_with(
b'ironic_inspector.fake-node')
def test_invalid_state(self):
self.assertRaisesRegex(utils.Error, 'Coordinator should be started',
self.coordinator.join_group)
self.assertRaisesRegex(utils.Error, 'Coordinator should be started',
self.coordinator.leave_group)
self.assertRaisesRegex(utils.Error, 'Coordinator should be started',
self.coordinator.get_members)
self.assertRaisesRegex(utils.Error, 'Coordinator should be started',
self.coordinator.get_lock, 'fake id')

View File

@ -14,9 +14,9 @@
import mock
from oslo_config import cfg
from ironic_inspector.common import coordination
from ironic_inspector.common import locking
from ironic_inspector.test import base as test_base
from ironic_inspector import utils
CONF = cfg.CONF
@ -30,15 +30,17 @@ class TestGetLock(test_base.NodeTest):
mock_internal.assert_called_once_with(self.node.uuid)
mock_tooz.assert_not_called()
@mock.patch.object(utils, 'get_coordinator', autospec=True)
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
def test_get_lock_tooz(self, mock_get_coord, mock_tooz, mock_internal):
CONF.set_override('standalone', False)
coordinator = mock.MagicMock()
mock_lock = mock.Mock()
coordinator = mock.Mock()
coordinator.get_lock.return_value = mock_lock
mock_get_coord.return_value = coordinator
locking.get_lock(self.node.uuid)
mock_tooz.assert_called_once_with(coordinator, self.node.uuid)
mock_tooz.assert_called_once_with(mock_lock)
mock_internal.assert_not_called()
@ -61,8 +63,7 @@ class TestInternalLock(test_base.NodeTest):
self.mock_lock.acquire.assert_called_once_with(blocking=True)
def test_release(self):
lock = locking.ToozLock(mock.MagicMock(), self.node.uuid)
lock._lock = self.mock_lock
lock = locking.ToozLock(self.mock_lock)
self.mock_lock._locked = True
lock.release()
@ -86,32 +87,21 @@ class TestToozLock(test_base.NodeTest):
self.mock_lock.acquire.return_value = True
self.mock_lock.acquired = False
def test_lock_default_prefix(self):
mock_coordinator = mock.MagicMock()
locking.ToozLock(mock_coordinator, self.node.uuid)
mock_coordinator.get_lock.assert_called_once_with(
str.encode('ironic_inspector_%s' % self.node.uuid))
def test_acquire(self):
lock = locking.ToozLock(mock.MagicMock(), self.node.uuid)
lock._lock = self.mock_lock
lock = locking.ToozLock(self.mock_lock)
lock.acquire()
self.mock_lock.acquire.assert_called_once_with(blocking=True)
def test_release(self):
self.mock_lock.acquired = True
lock = locking.ToozLock(mock.MagicMock(), self.node.uuid)
lock._lock = self.mock_lock
lock = locking.ToozLock(self.mock_lock)
lock.release()
self.mock_lock.release.assert_called_once_with()
def test_context(self):
lock = locking.ToozLock(mock.MagicMock(), self.node.uuid)
lock._lock = self.mock_lock
lock = locking.ToozLock(self.mock_lock)
with lock:
self.mock_lock.acquire.assert_called_once_with()

View File

@ -20,6 +20,7 @@ import mock
import oslo_messaging as messaging
from oslo_utils import uuidutils
from ironic_inspector.common import coordination
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import rpc
from ironic_inspector.common import swift
@ -31,7 +32,6 @@ from ironic_inspector import node_cache
from ironic_inspector.plugins import base as plugins_base
from ironic_inspector.plugins import example as example_plugin
from ironic_inspector.plugins import introspection_data as intros_data_plugin
from ironic_inspector import process
from ironic_inspector import rules
from ironic_inspector.test import base as test_base
from ironic_inspector import utils
@ -46,20 +46,17 @@ def _get_error(res):
class BaseAPITest(test_base.BaseTest):
def setUp(self):
super(BaseAPITest, self).setUp()
main.app.config['TESTING'] = True
self.app = main.app.test_client()
main._app.config['TESTING'] = True
self.app = main._app.test_client()
CONF.set_override('auth_strategy', 'noauth')
self.uuid = uuidutils.generate_uuid()
class TestApiIntrospect(BaseAPITest):
def setUp(self):
super(TestApiIntrospect, self).setUp()
self.rpc_get_client_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock
self.client_mock = mock.MagicMock(spec=messaging.RPCClient)
self.rpc_get_client_mock.return_value = self.client_mock
class TestApiIntrospect(BaseAPITest):
def test_introspect_no_authentication(self):
CONF.set_override('auth_strategy', 'noauth')
@ -125,40 +122,34 @@ class TestApiIntrospect(BaseAPITest):
self.assertFalse(self.client_mock.call.called)
@mock.patch.object(process, 'process', autospec=True)
class TestApiContinue(BaseAPITest):
def test_continue(self, process_mock):
def test_continue(self):
# should be ignored
CONF.set_override('auth_strategy', 'keystone')
process_mock.return_value = {'result': 42}
self.client_mock.call.return_value = {'result': 42}
res = self.app.post('/v1/continue', data='{"foo": "bar"}')
self.assertEqual(200, res.status_code)
process_mock.assert_called_once_with({"foo": "bar"})
self.client_mock.call.assert_called_once_with({}, 'do_continue',
data={"foo": "bar"})
self.assertEqual({"result": 42}, json.loads(res.data.decode()))
def test_continue_failed(self, process_mock):
process_mock.side_effect = utils.Error("boom")
def test_continue_failed(self):
self.client_mock.call.side_effect = utils.Error("boom")
res = self.app.post('/v1/continue', data='{"foo": "bar"}')
self.assertEqual(400, res.status_code)
process_mock.assert_called_once_with({"foo": "bar"})
self.client_mock.call.assert_called_once_with({}, 'do_continue',
data={"foo": "bar"})
self.assertEqual('boom', _get_error(res))
def test_continue_wrong_type(self, process_mock):
def test_continue_wrong_type(self):
res = self.app.post('/v1/continue', data='42')
self.assertEqual(400, res.status_code)
self.assertEqual('Invalid data: expected a JSON object, got int',
_get_error(res))
self.assertFalse(process_mock.called)
self.client_mock.call.assert_not_called()
class TestApiAbort(BaseAPITest):
def setUp(self):
super(TestApiAbort, self).setUp()
self.rpc_get_client_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock
self.client_mock = mock.MagicMock(spec=messaging.RPCClient)
self.rpc_get_client_mock.return_value = self.client_mock
def test_ok(self):
res = self.app.post('/v1/introspection/%s/abort' % self.uuid,
@ -360,10 +351,6 @@ class TestApiReapply(BaseAPITest):
def setUp(self):
super(TestApiReapply, self).setUp()
self.rpc_get_client_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock
self.client_mock = mock.MagicMock(spec=messaging.RPCClient)
self.rpc_get_client_mock.return_value = self.client_mock
CONF.set_override('store_data', 'swift', 'processing')
def test_api_ok(self):
@ -590,7 +577,7 @@ class TestApiVersions(BaseAPITest):
}]}
self.assertEqual(expected, json_data)
@mock.patch.object(main.app.url_map, "iter_rules", autospec=True)
@mock.patch.object(main._app.url_map, "iter_rules", autospec=True)
def test_version_endpoint(self, mock_rules):
mock_rules.return_value = ["/v1/endpoint1", "/v1/endpoint2/<uuid>",
"/v1/endpoint1/<name>",
@ -626,11 +613,8 @@ class TestApiVersions(BaseAPITest):
# API version on unknown pages
self._check_version_present(self.app.get('/v1/foobar'))
@mock.patch.object(rpc, 'get_client', autospec=True)
@mock.patch.object(node_cache, 'get_node', autospec=True)
def test_usual_requests(self, get_mock, rpc_mock):
client_mock = mock.MagicMock(spec=messaging.RPCClient)
rpc_mock.return_value = client_mock
def test_usual_requests(self, get_mock):
get_mock.return_value = node_cache.NodeInfo(uuid=self.uuid,
started_at=42.0)
# Successful
@ -681,3 +665,71 @@ class TestPlugins(unittest.TestCase):
def test_manager_is_cached(self):
self.assertIs(plugins_base.processing_hooks_manager(),
plugins_base.processing_hooks_manager())
class TestTopic(test_base.BaseTest):
def setUp(self):
super(TestTopic, self).setUp()
self.transport_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_transport',
autospec=True)).mock
self.target_mock = self.useFixture(
fixtures.MockPatchObject(rpc.messaging, 'Target',
autospec=True)).mock
self.rpcclient_mock = self.useFixture(
fixtures.MockPatchObject(rpc.messaging, 'RPCClient',
autospec=True)).mock
CONF.set_override('host', 'a-host')
def test_get_client_compat_standalone(self):
main.get_client_compat()
self.target_mock.assert_called_with(topic='ironic_inspector.conductor',
server='a-host', version=mock.ANY)
@mock.patch.object(main, 'get_random_topic', autospec=True)
def test_get_client_compat_non_standalone(self, mock_get_topic):
CONF.set_override('host', 'a-host')
CONF.set_override('standalone', False)
mock_get_topic.return_value = 'hello'
main.get_client_compat()
self.target_mock.assert_called_with(
topic='hello', version=mock.ANY)
@mock.patch.object(coordination, 'get_coordinator')
def test_get_random_topic(self, mock_get_coordinator):
mock_coordinator = mock.Mock(spec=['get_members'])
members = [('ironic_inspector.conductor.host%s' % i).encode('ascii')
for i in range(5)]
topics = [('ironic_inspector.conductor.host%s' % i) for i in range(5)]
mock_coordinator.get_members.return_value = set(members)
mock_get_coordinator.return_value = mock_coordinator
for i in range(10):
topic = main.get_random_topic()
self.assertIn(topic, topics)
@mock.patch.object(coordination, 'get_coordinator')
def test_get_random_topic_host_with_domain(self, mock_get_coordinator):
mock_coordinator = mock.Mock(spec=['get_members'])
members = ['ironic_inspector.conductor.'
'local.domain'.encode('ascii')]
mock_coordinator.get_members.return_value = set(members)
mock_get_coordinator.return_value = mock_coordinator
topic = main.get_random_topic()
self.assertEqual(topic, 'ironic_inspector.conductor.local.domain')
@mock.patch.object(coordination, 'get_coordinator')
def test_get_random_topic_host_bypass_invalid(self, mock_get_coordinator):
mock_coordinator = mock.Mock(spec=['get_members'])
members = ['this_should_not_happen'.encode('ascii')]
mock_coordinator.get_members.return_value = set(members)
mock_get_coordinator.return_value = mock_coordinator
self.assertRaisesRegex(utils.NoAvailableConductor,
'No available conductor',
main.get_random_topic)
@mock.patch.object(coordination, 'get_coordinator')
def test_get_random_topic_no_member(self, mock_get_coordinator):
mock_coordinator = mock.Mock(spec=['get_members'])
mock_coordinator.get_members.return_value = set()
mock_get_coordinator.return_value = mock_coordinator
self.assertRaises(utils.NoAvailableConductor, main.get_random_topic)

View File

@ -17,7 +17,9 @@ import fixtures
from ironic_lib import mdns
import mock
import oslo_messaging as messaging
import tooz
from ironic_inspector.common import coordination
from ironic_inspector.common import keystone
from ironic_inspector.common import swift
from ironic_inspector.conductor import manager
@ -139,7 +141,7 @@ class TestManagerInitHost(BaseManagerTest):
mock_zc.return_value.register_service.assert_called_once_with(
'baremetal-introspection', mock_endpoint.return_value)
@mock.patch.object(utils, 'get_coordinator', autospec=True)
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
@mock.patch.object(keystone, 'get_endpoint', autospec=True)
def test_init_host_with_coordinator(self, mock_endpoint, mock_get_coord):
CONF.set_override('standalone', False)
@ -150,24 +152,24 @@ class TestManagerInitHost(BaseManagerTest):
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
mock_get_coord.assert_called_once_with()
mock_coordinator.start.assert_called_once_with()
mock_get_coord.assert_called_once_with(prefix='conductor')
mock_coordinator.start.assert_called_once_with(heartbeat=True)
@mock.patch.object(manager.ConductorManager, 'del_host')
@mock.patch.object(utils, 'get_coordinator', autospec=True)
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
@mock.patch.object(keystone, 'get_endpoint', autospec=True)
def test_init_host_with_coordinator_failed(self, mock_endpoint,
mock_get_coord, mock_del_host):
CONF.set_override('standalone', False)
mock_get_coord.side_effect = (utils.Error('Reaching coordination '
'backend failed.'),
mock_get_coord.side_effect = (tooz.ToozError('Reaching coordination '
'backend failed.'),
None)
self.assertRaises(utils.Error, self.manager.init_host)
self.assertRaises(tooz.ToozError, self.manager.init_host)
self.mock_db_init.assert_called_once_with()
self.mock_validate_processing_hooks.assert_called_once_with()
self.mock_filter.init_filter.assert_called_once_with()
self.assert_periodics()
mock_get_coord.assert_called_once_with()
mock_get_coord.assert_called_once_with(prefix='conductor')
mock_del_host.assert_called_once_with()
@ -282,11 +284,11 @@ class TestManagerDelHost(BaseManagerTest):
self.mock_filter.tear_down_filter.assert_called_once_with()
self.mock__shutting_down.release.assert_called_once_with()
@mock.patch.object(utils, 'get_coordinator', autospec=True)
@mock.patch.object(coordination, 'get_coordinator', autospec=True)
def test_del_host_with_coordinator(self, mock_get_coord):
CONF.set_override('standalone', False)
mock_coordinator = mock.MagicMock()
mock_coordinator.is_started = True
mock_coordinator = mock.Mock(spec=coordination.Coordinator)
mock_coordinator.started = True
mock_get_coord.return_value = mock_coordinator
self.manager.del_host()
@ -500,3 +502,20 @@ class TestManagerReapply(BaseManagerTest):
store_mock.assert_called_once_with(self.uuid, self.data,
processed=False)
self.assertFalse(get_mock.called)
class TestManagerContinue(BaseManagerTest):
@mock.patch.object(process, 'process', autospec=True)
def test_continue_ok(self, process_mock):
self.manager.do_continue(self.context, self.data)
process_mock.assert_called_once_with(self.data)
@mock.patch.object(process, 'process', autospec=True)
def test_continue_failed(self, process_mock):
process_mock.side_effect = utils.Error("Boom.")
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_continue,
self.context, self.data)
self.assertEqual(utils.Error, exc.exc_info[0])
process_mock.assert_called_once_with(self.data)

View File

@ -154,14 +154,3 @@ class TestIsoTimestamp(base.BaseTest):
def test_none(self):
self.assertIsNone(utils.iso_timestamp(None))
@mock.patch.object(utils, 'coordination', autospec=True)
class TestGetCoordinator(base.BaseTest):
def test_get(self, mock_coordination):
CONF.set_override('backend_url', 'etcd3://1.2.3.4:2379',
'coordination')
CONF.set_override('host', '1.2.3.5')
utils.get_coordinator()
mock_coordination.get_coordinator.assert_called_once_with(
'etcd3://1.2.3.4:2379', b'1.2.3.5')

View File

@ -15,7 +15,9 @@ import eventlet # noqa
import fixtures
from oslo_config import cfg
from ironic_inspector import main
from ironic_inspector.test import base as test_base
from ironic_inspector import utils
from ironic_inspector import wsgi_service
CONF = cfg.CONF
@ -26,37 +28,32 @@ class BaseWSGITest(test_base.BaseTest):
# generic mocks setUp method
super(BaseWSGITest, self).setUp()
self.app = self.useFixture(fixtures.MockPatchObject(
wsgi_service.app, 'app', autospec=True)).mock
main, '_app', autospec=True)).mock
self.server = self.useFixture(fixtures.MockPatchObject(
wsgi_service.wsgi, 'Server', autospec=True)).mock
self.mock_log = self.useFixture(fixtures.MockPatchObject(
wsgi_service, 'LOG')).mock
self.service = wsgi_service.WSGIService()
self.service.server = self.server
class TestWSGIServiceInitMiddleware(BaseWSGITest):
def setUp(self):
super(TestWSGIServiceInitMiddleware, self).setUp()
self.mock_add_auth_middleware = self.useFixture(
fixtures.MockPatchObject(wsgi_service.utils,
'add_auth_middleware')).mock
fixtures.MockPatchObject(utils, 'add_auth_middleware')).mock
self.mock_add_cors_middleware = self.useFixture(
fixtures.MockPatchObject(wsgi_service.utils,
'add_cors_middleware')).mock
fixtures.MockPatchObject(utils, 'add_cors_middleware')).mock
self.mock_log = self.useFixture(fixtures.MockPatchObject(
main, 'LOG')).mock
# 'positive' settings
CONF.set_override('auth_strategy', 'keystone')
CONF.set_override('store_data', 'swift', 'processing')
def test_init_middleware(self):
self.service._init_middleware()
wsgi_service.WSGIService()
self.mock_add_auth_middleware.assert_called_once_with(self.app)
self.mock_add_cors_middleware.assert_called_once_with(self.app)
def test_init_middleware_noauth(self):
CONF.set_override('auth_strategy', 'noauth')
self.service._init_middleware()
wsgi_service.WSGIService()
self.mock_add_auth_middleware.assert_not_called()
self.mock_log.warning.assert_called_once_with(
@ -67,8 +64,10 @@ class TestWSGIServiceInitMiddleware(BaseWSGITest):
class TestWSGIService(BaseWSGITest):
def setUp(self):
super(TestWSGIService, self).setUp()
self.service = wsgi_service.WSGIService()
self.service.server = self.server
self.mock__init_middleware = self.useFixture(fixtures.MockPatchObject(
self.service, '_init_middleware')).mock
main, '_init_middleware')).mock
# 'positive' settings
CONF.set_override('listen_address', '42.42.42.42')
@ -76,8 +75,6 @@ class TestWSGIService(BaseWSGITest):
def test_start(self):
self.service.start()
self.mock__init_middleware.assert_called_once_with()
self.server.start.assert_called_once_with()
def test_stop(self):

View File

@ -21,7 +21,6 @@ from oslo_config import cfg
from oslo_log import log
from oslo_middleware import cors as cors_middleware
import pytz
from tooz import coordination
from ironic_inspector.common.i18n import _
from ironic_inspector import policy
@ -162,6 +161,13 @@ class IntrospectionDataNotFound(NotFoundInCacheError):
"""Introspection data not found."""
class NoAvailableConductor(Error):
"""No available conductor in the service group."""
def __init__(self, msg, **kwargs):
super(NoAvailableConductor, self).__init__(msg, code=503, **kwargs)
def executor():
"""Return the current futures executor."""
global _EXECUTOR
@ -248,14 +254,3 @@ def iso_timestamp(timestamp=None, tz=pytz.timezone('utc')):
return None
date = datetime.datetime.fromtimestamp(timestamp, tz=tz)
return date.isoformat()
_COORDINATOR = None
def get_coordinator():
global _COORDINATOR
if _COORDINATOR is None:
_COORDINATOR = coordination.get_coordinator(
CONF.coordination.backend_url, str.encode(CONF.host))
return _COORDINATOR

View File

@ -15,8 +15,7 @@ from oslo_log import log
from oslo_service import service
from oslo_service import wsgi
from ironic_inspector import main as app
from ironic_inspector import utils
from ironic_inspector import main
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@ -26,31 +25,18 @@ class WSGIService(service.Service):
"""Provides ability to launch API from wsgi app."""
def __init__(self):
self.app = app.app
self.app = main.get_app()
self.server = wsgi.Server(CONF, 'ironic_inspector',
self.app,
host=CONF.listen_address,
port=CONF.listen_port,
use_ssl=CONF.use_ssl)
def _init_middleware(self):
"""Initialize WSGI middleware.
:returns: None
"""
if CONF.auth_strategy != 'noauth':
utils.add_auth_middleware(self.app)
else:
LOG.warning('Starting unauthenticated, please check'
' configuration')
utils.add_cors_middleware(self.app)
def start(self):
"""Start serving this service using loaded configuration.
:returns: None
"""
self._init_middleware()
self.server.start()
def stop(self):

View File

@ -0,0 +1,14 @@
---
features:
- |
Allows splitting the ironic-inspector service to ironic-inspector-api and
ironic-inspector-conductor which coordinate via tooz and its underlying
backend. A new configuration option ``[DEFAULT]standalone`` is introduced
to enable this feature. The configuration defaults to True,
ironic-inspector runs as a single service, which is compatible with the
old behavior. When set to False, ``ironic-inspector-api-wsgi`` is used to
start the API service, and ``ironic-inspector-conductor`` is used to start
the conductor service. For ironic-inspector running at non-standalone
mode, user needs to set the new configuration option
``[coordination]backend_url``, which specifies the backend used for
coordination.

View File

@ -28,8 +28,11 @@ packages =
console_scripts =
ironic-inspector = ironic_inspector.cmd.all:main
ironic-inspector-dbsync = ironic_inspector.cmd.dbsync:main
ironic-inspector-conductor = ironic_inspector.cmd.conductor:main
ironic-inspector-rootwrap = oslo_rootwrap.cmd:main
ironic-inspector-migrate-data = ironic_inspector.cmd.migration:main
wsgi_scripts =
ironic-inspector-api-wsgi = ironic_inspector.cmd.wsgi:initialize_wsgi_app
ironic_inspector.hooks.processing =
scheduler = ironic_inspector.plugins.standard:SchedulerHook
validate_interfaces = ironic_inspector.plugins.standard:ValidateInterfacesHook