Introduce oslo.messaging and sync rpc call

Adds oslo.messaging to ironic-inspector, and convert
inspect, abort and reapply to synchronized rpc calls.

This is the first step of API and worker seperation.

Change-Id: I15e86d7feb623b6b2889891b9700e5de6b3164cd
Story: #2001842
Task: # 12609
This commit is contained in:
Kaifeng Wang 2018-04-17 19:36:17 +08:00
parent 70cb1ec7f5
commit 6469a1fc0f
11 changed files with 328 additions and 71 deletions

View File

@ -0,0 +1,56 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_messaging.rpc import dispatcher
from ironic_inspector.conductor import manager
CONF = cfg.CONF
_SERVER = None
TRANSPORT = None
TOPIC = 'ironic-inspector-worker'
SERVER_NAME = 'ironic-inspector-rpc-server'
def get_transport():
global TRANSPORT
if TRANSPORT is None:
TRANSPORT = messaging.get_rpc_transport(CONF, url='fake://')
return TRANSPORT
def get_client():
target = messaging.Target(topic=TOPIC, server=SERVER_NAME,
version='1.0')
transport = get_transport()
return messaging.RPCClient(transport, target)
def get_server():
"""Get the singleton RPC server."""
global _SERVER
if _SERVER is None:
transport = get_transport()
target = messaging.Target(topic=TOPIC, server=SERVER_NAME,
version='1.0')
mgr = manager.ConductorManager()
_SERVER = messaging.get_rpc_server(
transport, target, [mgr], executor='eventlet',
access_policy=dispatcher.DefaultRPCAccessPolicy)
return _SERVER

View File

View File

@ -0,0 +1,37 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import oslo_messaging as messaging
from ironic_inspector import introspect
from ironic_inspector import process
from ironic_inspector import utils
class ConductorManager(object):
"""ironic inspector conductor manager"""
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
@messaging.expected_exceptions(utils.Error)
def do_introspection(self, context, node_id, token=None):
introspect.introspect(node_id, token=token)
@messaging.expected_exceptions(utils.Error)
def do_abort(self, context, node_id, token=None):
introspect.abort(node_id, token=token)
@messaging.expected_exceptions(utils.Error)
def do_reapply(self, context, node_id, token=None):
process.reapply(node_id)

View File

@ -23,10 +23,10 @@ from ironic_inspector import api_tools
from ironic_inspector.common import context
from ironic_inspector.common.i18n import _
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import rpc
from ironic_inspector.common import swift
import ironic_inspector.conf
from ironic_inspector.conf import opts as conf_opts
from ironic_inspector import introspect
from ironic_inspector import node_cache
from ironic_inspector import process
from ironic_inspector import rules
@ -239,7 +239,8 @@ def api_continue():
methods=['GET', 'POST'])
def api_introspection(node_id):
if flask.request.method == 'POST':
introspect.introspect(node_id,
client = rpc.get_client()
client.call({}, 'do_introspection', node_id=node_id,
token=flask.request.headers.get('X-Auth-Token'))
return '', 202
else:
@ -263,7 +264,9 @@ def api_introspection_statuses():
@api('/v1/introspection/<node_id>/abort', rule="introspection:abort",
methods=['POST'])
def api_introspection_abort(node_id):
introspect.abort(node_id, token=flask.request.headers.get('X-Auth-Token'))
client = rpc.get_client()
client.call({}, 'do_abort', node_id=node_id,
token=flask.request.headers.get('X-Auth-Token'))
return '', 202
@ -291,7 +294,8 @@ def api_introspection_reapply(node_id):
'supported yet'), code=400)
if CONF.processing.store_data == 'swift':
process.reapply(node_id)
client = rpc.get_client()
client.call({}, 'do_reapply', node_id=node_id)
return '', 202
else:
return error_response(_('Inspector is not configured to store'

View File

@ -136,6 +136,13 @@ class Base(base.NodeTest):
conf_file = get_test_conf_file()
self.cfg.set_config_files([conf_file])
# FIXME(milan) FakeListener.poll calls time.sleep() which leads to
# busy polling with no sleep at all, effectively blocking the whole
# process by consuming all CPU cycles in a single thread. MonkeyPatch
# with eventlet.sleep seems to help this.
self.useFixture(fixtures.MonkeyPatch(
'oslo_messaging._drivers.impl_fake.time.sleep', eventlet.sleep))
def tearDown(self):
super(Base, self).tearDown()
node_cache._delete_node(self.uuid)

View File

@ -15,13 +15,15 @@ import datetime
import json
import unittest
import fixtures
import mock
import oslo_messaging as messaging
from oslo_utils import uuidutils
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import rpc
import ironic_inspector.conf
from ironic_inspector.conf import opts as conf_opts
from ironic_inspector import introspect
from ironic_inspector import introspection_state as istate
from ironic_inspector import main
from ironic_inspector import node_cache
@ -49,36 +51,44 @@ class BaseAPITest(test_base.BaseTest):
class TestApiIntrospect(BaseAPITest):
@mock.patch.object(introspect, 'introspect', autospec=True)
def test_introspect_no_authentication(self, introspect_mock):
def setUp(self):
super(TestApiIntrospect, self).setUp()
self.rpc_get_client_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock
self.client_mock = mock.MagicMock(spec=messaging.RPCClient)
self.rpc_get_client_mock.return_value = self.client_mock
def test_introspect_no_authentication(self):
CONF.set_override('auth_strategy', 'noauth')
res = self.app.post('/v1/introspection/%s' % self.uuid)
self.assertEqual(202, res.status_code)
introspect_mock.assert_called_once_with(self.uuid,
self.client_mock.call.assert_called_once_with({}, 'do_introspection',
node_id=self.uuid,
token=None)
@mock.patch.object(introspect, 'introspect', autospec=True)
def test_intospect_failed(self, introspect_mock):
introspect_mock.side_effect = utils.Error("boom")
def test_intospect_failed(self):
self.client_mock.call.side_effect = utils.Error("boom")
res = self.app.post('/v1/introspection/%s' % self.uuid)
self.assertEqual(400, res.status_code)
self.assertEqual(
'boom',
json.loads(res.data.decode('utf-8'))['error']['message'])
introspect_mock.assert_called_once_with(
self.uuid,
self.client_mock.call.assert_called_once_with({}, 'do_introspection',
node_id=self.uuid,
token=None)
@mock.patch.object(utils, 'check_auth', autospec=True)
@mock.patch.object(introspect, 'introspect', autospec=True)
def test_introspect_failed_authentication(self, introspect_mock,
auth_mock):
def test_introspect_failed_authentication(self, auth_mock):
CONF.set_override('auth_strategy', 'keystone')
auth_mock.side_effect = utils.Error('Boom', code=403)
res = self.app.post('/v1/introspection/%s' % self.uuid,
headers={'X-Auth-Token': 'token'})
self.assertEqual(403, res.status_code)
self.assertFalse(introspect_mock.called)
self.assertFalse(self.client_mock.call.called)
@mock.patch.object(process, 'process', autospec=True)
@ -107,45 +117,57 @@ class TestApiContinue(BaseAPITest):
self.assertFalse(process_mock.called)
@mock.patch.object(introspect, 'abort', autospec=True)
class TestApiAbort(BaseAPITest):
def test_ok(self, abort_mock):
abort_mock.return_value = '', 202
def setUp(self):
super(TestApiAbort, self).setUp()
self.rpc_get_client_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock
self.client_mock = mock.MagicMock(spec=messaging.RPCClient)
self.rpc_get_client_mock.return_value = self.client_mock
def test_ok(self):
res = self.app.post('/v1/introspection/%s/abort' % self.uuid,
headers={'X-Auth-Token': 'token'})
abort_mock.assert_called_once_with(self.uuid, token='token')
self.client_mock.call.assert_called_once_with({}, 'do_abort',
node_id=self.uuid,
token='token')
self.assertEqual(202, res.status_code)
self.assertEqual(b'', res.data)
def test_no_authentication(self, abort_mock):
abort_mock.return_value = b'', 202
def test_no_authentication(self):
res = self.app.post('/v1/introspection/%s/abort' % self.uuid)
abort_mock.assert_called_once_with(self.uuid, token=None)
self.client_mock.call.assert_called_once_with({}, 'do_abort',
node_id=self.uuid,
token=None)
self.assertEqual(202, res.status_code)
self.assertEqual(b'', res.data)
def test_node_not_found(self, abort_mock):
def test_node_not_found(self):
exc = utils.Error("Not Found.", code=404)
abort_mock.side_effect = exc
self.client_mock.call.side_effect = exc
res = self.app.post('/v1/introspection/%s/abort' % self.uuid)
abort_mock.assert_called_once_with(self.uuid, token=None)
self.client_mock.call.assert_called_once_with({}, 'do_abort',
node_id=self.uuid,
token=None)
self.assertEqual(404, res.status_code)
data = json.loads(str(res.data.decode()))
self.assertEqual(str(exc), data['error']['message'])
def test_abort_failed(self, abort_mock):
def test_abort_failed(self):
exc = utils.Error("Locked.", code=409)
abort_mock.side_effect = exc
self.client_mock.call.side_effect = exc
res = self.app.post('/v1/introspection/%s/abort' % self.uuid)
abort_mock.assert_called_once_with(self.uuid, token=None)
self.client_mock.call.assert_called_once_with({}, 'do_abort',
node_id=self.uuid,
token=None)
self.assertEqual(409, res.status_code)
data = json.loads(res.data.decode())
self.assertEqual(str(exc), data['error']['message'])
@ -297,29 +319,33 @@ class TestApiGetData(BaseAPITest):
get_mock.assert_called_once_with('name1', fields=['uuid'])
@mock.patch.object(process, 'reapply', autospec=True)
class TestApiReapply(BaseAPITest):
def setUp(self):
super(TestApiReapply, self).setUp()
self.rpc_get_client_mock = self.useFixture(
fixtures.MockPatchObject(rpc, 'get_client', autospec=True)).mock
self.client_mock = mock.MagicMock(spec=messaging.RPCClient)
self.rpc_get_client_mock.return_value = self.client_mock
CONF.set_override('store_data', 'swift', 'processing')
def test_ok(self, reapply_mock):
def test_ok(self):
self.app.post('/v1/introspection/%s/data/unprocessed' %
self.uuid)
reapply_mock.assert_called_once_with(self.uuid)
self.client_mock.call.assert_called_once_with({}, 'do_reapply',
node_id=self.uuid)
def test_user_data(self, reapply_mock):
def test_user_data(self):
res = self.app.post('/v1/introspection/%s/data/unprocessed' %
self.uuid, data='some data')
self.assertEqual(400, res.status_code)
message = json.loads(res.data.decode())['error']['message']
self.assertEqual('User data processing is not supported yet',
message)
self.assertFalse(reapply_mock.called)
self.assertFalse(self.client_mock.call.called)
def test_swift_disabled(self, reapply_mock):
def test_swift_disabled(self):
CONF.set_override('store_data', 'none', 'processing')
res = self.app.post('/v1/introspection/%s/data/unprocessed' %
@ -330,35 +356,11 @@ class TestApiReapply(BaseAPITest):
'data. Set the [processing] store_data '
'configuration option to change this.',
message)
self.assertFalse(reapply_mock.called)
self.assertFalse(self.client_mock.call.called)
def test_node_locked(self, reapply_mock):
exc = utils.Error('Locked.', code=409)
reapply_mock.side_effect = exc
res = self.app.post('/v1/introspection/%s/data/unprocessed' %
self.uuid)
self.assertEqual(409, res.status_code)
message = json.loads(res.data.decode())['error']['message']
self.assertEqual(str(exc), message)
reapply_mock.assert_called_once_with(self.uuid)
def test_node_not_found(self, reapply_mock):
exc = utils.Error('Not found.', code=404)
reapply_mock.side_effect = exc
res = self.app.post('/v1/introspection/%s/data/unprocessed' %
self.uuid)
self.assertEqual(404, res.status_code)
message = json.loads(res.data.decode())['error']['message']
self.assertEqual(str(exc), message)
reapply_mock.assert_called_once_with(self.uuid)
def test_generic_error(self, reapply_mock):
def test_generic_error(self):
exc = utils.Error('Oops', code=400)
reapply_mock.side_effect = exc
self.client_mock.call.side_effect = exc
res = self.app.post('/v1/introspection/%s/data/unprocessed' %
self.uuid)
@ -366,7 +368,8 @@ class TestApiReapply(BaseAPITest):
self.assertEqual(400, res.status_code)
message = json.loads(res.data.decode())['error']['message']
self.assertEqual(str(exc), message)
reapply_mock.assert_called_once_with(self.uuid)
self.client_mock.call.assert_called_once_with({}, 'do_reapply',
node_id=self.uuid)
class TestApiRules(BaseAPITest):
@ -566,8 +569,11 @@ class TestApiVersions(BaseAPITest):
# API version on unknown pages
self._check_version_present(self.app.get('/v1/foobar'))
@mock.patch.object(rpc, 'get_client', autospec=True)
@mock.patch.object(node_cache, 'get_node', autospec=True)
def test_usual_requests(self, get_mock):
def test_usual_requests(self, get_mock, rpc_mock):
client_mock = mock.MagicMock(spec=messaging.RPCClient)
rpc_mock.return_value = client_mock
get_mock.return_value = node_cache.NodeInfo(uuid=self.uuid,
started_at=42.0)
# Successful

View File

@ -0,0 +1,133 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import oslo_messaging as messaging
from ironic_inspector.conductor import manager
import ironic_inspector.conf
from ironic_inspector import introspect
from ironic_inspector import process
from ironic_inspector.test import base as test_base
from ironic_inspector import utils
CONF = ironic_inspector.conf.CONF
class BaseManagerTest(test_base.NodeTest):
def setUp(self):
super(BaseManagerTest, self).setUp()
self.manager = manager.ConductorManager()
self.context = {}
self.token = None
class TestManagerIntrospect(BaseManagerTest):
@mock.patch.object(introspect, 'introspect', autospec=True)
def test_do_introspect(self, introspect_mock):
self.manager.do_introspection(self.context, self.uuid, self.token)
introspect_mock.assert_called_once_with(self.uuid, self.token)
@mock.patch.object(introspect, 'introspect', autospec=True)
def test_introspect_failed(self, introspect_mock):
introspect_mock.side_effect = utils.Error("boom")
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_introspection,
self.context, self.uuid, self.token)
self.assertEqual(utils.Error, exc.exc_info[0])
introspect_mock.assert_called_once_with(self.uuid, token=None)
class TestManagerAbort(BaseManagerTest):
@mock.patch.object(introspect, 'abort', autospec=True)
def test_abort_ok(self, abort_mock):
self.manager.do_abort(self.context, self.uuid, self.token)
abort_mock.assert_called_once_with(self.uuid, token=self.token)
@mock.patch.object(introspect, 'abort', autospec=True)
def test_abort_node_not_found(self, abort_mock):
abort_mock.side_effect = utils.Error("Not Found.", code=404)
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_abort,
self.context, self.uuid, self.token)
self.assertEqual(utils.Error, exc.exc_info[0])
abort_mock.assert_called_once_with(self.uuid, token=None)
@mock.patch.object(introspect, 'abort', autospec=True)
def test_abort_failed(self, abort_mock):
exc = utils.Error("Locked.", code=409)
abort_mock.side_effect = exc
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_abort,
self.context, self.uuid, self.token)
self.assertEqual(utils.Error, exc.exc_info[0])
abort_mock.assert_called_once_with(self.uuid, token=None)
@mock.patch.object(process, 'reapply', autospec=True)
class TestManagerReapply(BaseManagerTest):
def setUp(self):
super(TestManagerReapply, self).setUp()
CONF.set_override('store_data', 'swift', 'processing')
def test_ok(self, reapply_mock):
self.manager.do_reapply(self.context, self.uuid)
reapply_mock.assert_called_once_with(self.uuid)
def test_node_locked(self, reapply_mock):
exc = utils.Error('Locked.', code=409)
reapply_mock.side_effect = exc
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_reapply,
self.context, self.uuid)
self.assertEqual(utils.Error, exc.exc_info[0])
self.assertIn('Locked.', str(exc.exc_info[1]))
self.assertEqual(409, exc.exc_info[1].http_code)
reapply_mock.assert_called_once_with(self.uuid)
def test_node_not_found(self, reapply_mock):
exc = utils.Error('Not found.', code=404)
reapply_mock.side_effect = exc
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_reapply,
self.context, self.uuid)
self.assertEqual(utils.Error, exc.exc_info[0])
self.assertIn('Not found.', str(exc.exc_info[1]))
self.assertEqual(404, exc.exc_info[1].http_code)
reapply_mock.assert_called_once_with(self.uuid)
def test_generic_error(self, reapply_mock):
exc = utils.Error('Oops', code=400)
reapply_mock.side_effect = exc
exc = self.assertRaises(messaging.rpc.ExpectedException,
self.manager.do_reapply,
self.context, self.uuid)
self.assertEqual(utils.Error, exc.exc_info[0])
self.assertIn('Oops', str(exc.exc_info[1]))
self.assertEqual(400, exc.exc_info[1].http_code)
reapply_mock.assert_called_once_with(self.uuid)

View File

@ -20,6 +20,7 @@ import fixtures
import mock
from oslo_config import cfg
from ironic_inspector.common import rpc
from ironic_inspector.test import base as test_base
from ironic_inspector import wsgi_service
@ -40,6 +41,8 @@ class BaseWSGITest(test_base.BaseTest):
self.mock_log = self.useFixture(fixtures.MockPatchObject(
wsgi_service, 'LOG')).mock
self.service = wsgi_service.WSGIService()
self.mock_rpc_server = self.useFixture(fixtures.MockPatchObject(
rpc, 'get_server')).mock
class TestWSGIServiceInitMiddleware(BaseWSGITest):
@ -199,6 +202,8 @@ class TestWSGIServiceRun(BaseWSGITest):
self.mock__create_ssl_context.assert_called_once_with()
self.mock__init_middleware.assert_called_once_with()
self.mock__init_host.assert_called_once_with()
self.mock_rpc_server.assert_called_once_with()
self.service.rpc_server.start.assert_called_once_with()
self.app.run.assert_called_once_with(
host=CONF.listen_address, port=CONF.listen_port,
ssl_context=self.mock__create_ssl_context.return_value)
@ -247,6 +252,7 @@ class TestWSGIServiceShutdown(BaseWSGITest):
self.service, '_periodics_worker')).mock
self.mock_exit = self.useFixture(fixtures.MockPatchObject(
wsgi_service.sys, 'exit')).mock
self.service.rpc_server = self.mock_rpc_server
def test_shutdown(self):
class MyError(Exception):

View File

@ -22,6 +22,7 @@ from oslo_log import log
from oslo_utils import reflection
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector.common import rpc
from ironic_inspector import db
from ironic_inspector import main as app
from ironic_inspector import node_cache
@ -148,6 +149,8 @@ class WSGIService(object):
LOG.debug('Shutting down')
self.rpc_server.stop()
if self._periodics_worker is not None:
try:
self._periodics_worker.stop()
@ -182,6 +185,9 @@ class WSGIService(object):
self._init_host()
self.rpc_server = rpc.get_server()
self.rpc_server.start()
try:
self.app.run(**app_kwargs)
except Exception as e:

View File

@ -67,6 +67,7 @@ oslo.context==2.19.2
oslo.db==4.27.0
oslo.i18n==3.15.3
oslo.log==3.36.0
oslo.messaging==5.32.0
oslo.middleware==3.31.0
oslo.policy==1.30.0
oslo.rootwrap==5.8.0

View File

@ -24,6 +24,7 @@ oslo.context>=2.19.2 # Apache-2.0
oslo.db>=4.27.0 # Apache-2.0
oslo.i18n>=3.15.3 # Apache-2.0
oslo.log>=3.36.0 # Apache-2.0
oslo.messaging>=5.32.0 # Apache-2.0
oslo.middleware>=3.31.0 # Apache-2.0
oslo.policy>=1.30.0 # Apache-2.0
oslo.rootwrap>=5.8.0 # Apache-2.0