Use futurist library for asynchronous tasks
A green thread is now used instead of spawn_n for running asynchronous operations during introspection, processing and aborting. The existing periodic tasks are now run using Futurist PeriodicWorker. Main shut down procedure was split into a separate function for convenience. Also updated the example.conf to the latest versions (some pending updates from 3rdparty libraries included). Change-Id: Id0efa31aee68a80ec55e4136c53189484b452559
This commit is contained in:
parent
dcb65fc5a9
commit
5b02024cca
29
example.conf
29
example.conf
@ -50,6 +50,7 @@
|
|||||||
#ssl_key_path =
|
#ssl_key_path =
|
||||||
|
|
||||||
# The green thread pool size. (integer value)
|
# The green thread pool size. (integer value)
|
||||||
|
# Minimum value: 2
|
||||||
#max_concurrency = 1000
|
#max_concurrency = 1000
|
||||||
|
|
||||||
# Delay (in seconds) between two introspections. (integer value)
|
# Delay (in seconds) between two introspections. (integer value)
|
||||||
@ -84,20 +85,13 @@
|
|||||||
# The name of a logging configuration file. This file is appended to
|
# The name of a logging configuration file. This file is appended to
|
||||||
# any existing logging configuration files. For details about logging
|
# any existing logging configuration files. For details about logging
|
||||||
# configuration files, see the Python logging module documentation.
|
# configuration files, see the Python logging module documentation.
|
||||||
# Note that when logging configuration files are used all logging
|
# Note that when logging configuration files are used then all logging
|
||||||
# configuration is defined in the configuration file and other logging
|
# configuration is set in the configuration file and other logging
|
||||||
# configuration options are ignored (for example, log_format). (string
|
# configuration options are ignored (for example,
|
||||||
# value)
|
# logging_context_format_string). (string value)
|
||||||
# Deprecated group/name - [DEFAULT]/log_config
|
# Deprecated group/name - [DEFAULT]/log_config
|
||||||
#log_config_append = <None>
|
#log_config_append = <None>
|
||||||
|
|
||||||
# DEPRECATED. A logging.Formatter log message format string which may
|
|
||||||
# use any of the available logging.LogRecord attributes. This option
|
|
||||||
# is deprecated. Please use logging_context_format_string and
|
|
||||||
# logging_default_format_string instead. This option is ignored if
|
|
||||||
# log_config_append is set. (string value)
|
|
||||||
#log_format = <None>
|
|
||||||
|
|
||||||
# Defines the format string for %%(asctime)s in log records. Default:
|
# Defines the format string for %%(asctime)s in log records. Default:
|
||||||
# %(default)s . This option is ignored if log_config_append is set.
|
# %(default)s . This option is ignored if log_config_append is set.
|
||||||
# (string value)
|
# (string value)
|
||||||
@ -126,15 +120,6 @@
|
|||||||
# log_config_append is set. (boolean value)
|
# log_config_append is set. (boolean value)
|
||||||
#use_syslog = false
|
#use_syslog = false
|
||||||
|
|
||||||
# Enables or disables syslog rfc5424 format for logging. If enabled,
|
|
||||||
# prefixes the MSG part of the syslog message with APP-NAME (RFC5424).
|
|
||||||
# The format without the APP-NAME is deprecated in Kilo, and will be
|
|
||||||
# removed in Mitaka, along with this option. This option is ignored if
|
|
||||||
# log_config_append is set. (boolean value)
|
|
||||||
# This option is deprecated for removal.
|
|
||||||
# Its value may be silently ignored in the future.
|
|
||||||
#use_syslog_rfc_format = true
|
|
||||||
|
|
||||||
# Syslog facility to receive log lines. This option is ignored if
|
# Syslog facility to receive log lines. This option is ignored if
|
||||||
# log_config_append is set. (string value)
|
# log_config_append is set. (string value)
|
||||||
#syslog_log_facility = LOG_USER
|
#syslog_log_facility = LOG_USER
|
||||||
@ -164,7 +149,7 @@
|
|||||||
|
|
||||||
# List of package logging levels in logger=LEVEL pairs. This option is
|
# List of package logging levels in logger=LEVEL pairs. This option is
|
||||||
# ignored if log_config_append is set. (list value)
|
# ignored if log_config_append is set. (list value)
|
||||||
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN,keystoneauth=WARN
|
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN,keystoneauth=WARN,oslo.cache=INFO,dogpile.core.dogpile=INFO
|
||||||
|
|
||||||
# Enables or disables publication of error events. (boolean value)
|
# Enables or disables publication of error events. (boolean value)
|
||||||
#publish_errors = false
|
#publish_errors = false
|
||||||
@ -250,7 +235,7 @@
|
|||||||
# value)
|
# value)
|
||||||
# Deprecated group/name - [DEFAULT]/sql_max_overflow
|
# Deprecated group/name - [DEFAULT]/sql_max_overflow
|
||||||
# Deprecated group/name - [DATABASE]/sqlalchemy_max_overflow
|
# Deprecated group/name - [DATABASE]/sqlalchemy_max_overflow
|
||||||
#max_overflow = <None>
|
#max_overflow = 50
|
||||||
|
|
||||||
# Verbosity of SQL debugging information: 0=None, 100=Everything.
|
# Verbosity of SQL debugging information: 0=None, 100=Everything.
|
||||||
# (integer value)
|
# (integer value)
|
||||||
|
@ -240,7 +240,7 @@ SERVICE_OPTS = [
|
|||||||
default='',
|
default='',
|
||||||
help='Path to SSL key'),
|
help='Path to SSL key'),
|
||||||
cfg.IntOpt('max_concurrency',
|
cfg.IntOpt('max_concurrency',
|
||||||
default=1000,
|
default=1000, min=2,
|
||||||
help='The green thread pool size.'),
|
help='The green thread pool size.'),
|
||||||
cfg.IntOpt('introspection_delay',
|
cfg.IntOpt('introspection_delay',
|
||||||
default=5,
|
default=5,
|
||||||
|
@ -100,9 +100,9 @@ def introspect(uuid, new_ipmi_credentials=None, token=None):
|
|||||||
ironic=ironic)
|
ironic=ironic)
|
||||||
node_info.set_option('new_ipmi_credentials', new_ipmi_credentials)
|
node_info.set_option('new_ipmi_credentials', new_ipmi_credentials)
|
||||||
|
|
||||||
def _handle_exceptions():
|
def _handle_exceptions(fut):
|
||||||
try:
|
try:
|
||||||
_background_introspect(ironic, node_info)
|
fut.result()
|
||||||
except utils.Error as exc:
|
except utils.Error as exc:
|
||||||
# Logging has already happened in Error.__init__
|
# Logging has already happened in Error.__init__
|
||||||
node_info.finished(error=str(exc))
|
node_info.finished(error=str(exc))
|
||||||
@ -111,7 +111,8 @@ def introspect(uuid, new_ipmi_credentials=None, token=None):
|
|||||||
LOG.exception(msg, node_info=node_info)
|
LOG.exception(msg, node_info=node_info)
|
||||||
node_info.finished(error=msg)
|
node_info.finished(error=msg)
|
||||||
|
|
||||||
utils.spawn_n(_handle_exceptions)
|
future = utils.executor().submit(_background_introspect, ironic, node_info)
|
||||||
|
future.add_done_callback(_handle_exceptions)
|
||||||
|
|
||||||
|
|
||||||
def _background_introspect(ironic, node_info):
|
def _background_introspect(ironic, node_info):
|
||||||
@ -196,7 +197,7 @@ def abort(uuid, token=None):
|
|||||||
raise utils.Error(_('Node is locked, please, retry later'),
|
raise utils.Error(_('Node is locked, please, retry later'),
|
||||||
node_info=node_info, code=409)
|
node_info=node_info, code=409)
|
||||||
|
|
||||||
utils.spawn_n(_abort, node_info, ironic)
|
utils.executor().submit(_abort, node_info, ironic)
|
||||||
|
|
||||||
|
|
||||||
def _abort(node_info, ironic):
|
def _abort(node_info, ironic):
|
||||||
|
@ -21,6 +21,7 @@ import ssl
|
|||||||
import sys
|
import sys
|
||||||
|
|
||||||
import flask
|
import flask
|
||||||
|
from futurist import periodics
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
from oslo_utils import uuidutils
|
from oslo_utils import uuidutils
|
||||||
@ -286,26 +287,23 @@ def handle_404(error):
|
|||||||
return error_response(error, code=404)
|
return error_response(error, code=404)
|
||||||
|
|
||||||
|
|
||||||
def periodic_update(period): # pragma: no cover
|
@periodics.periodic(spacing=CONF.firewall.firewall_update_period,
|
||||||
while True:
|
enabled=CONF.firewall.manage_firewall)
|
||||||
LOG.debug('Running periodic update of filters')
|
def periodic_update(): # pragma: no cover
|
||||||
try:
|
try:
|
||||||
firewall.update_filters()
|
firewall.update_filters()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_LE('Periodic update failed'))
|
LOG.exception(_LE('Periodic update of firewall rules failed'))
|
||||||
eventlet.greenthread.sleep(period)
|
|
||||||
|
|
||||||
|
|
||||||
def periodic_clean_up(period): # pragma: no cover
|
@periodics.periodic(spacing=CONF.clean_up_period)
|
||||||
while True:
|
def periodic_clean_up(): # pragma: no cover
|
||||||
LOG.debug('Running periodic clean up of node cache')
|
|
||||||
try:
|
try:
|
||||||
if node_cache.clean_up():
|
if node_cache.clean_up():
|
||||||
firewall.update_filters()
|
firewall.update_filters()
|
||||||
sync_with_ironic()
|
sync_with_ironic()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_LE('Periodic clean up of node cache failed'))
|
LOG.exception(_LE('Periodic clean up of node cache failed'))
|
||||||
eventlet.greenthread.sleep(period)
|
|
||||||
|
|
||||||
|
|
||||||
def sync_with_ironic():
|
def sync_with_ironic():
|
||||||
@ -316,7 +314,12 @@ def sync_with_ironic():
|
|||||||
node_cache.delete_nodes_not_in_list(ironic_node_uuids)
|
node_cache.delete_nodes_not_in_list(ironic_node_uuids)
|
||||||
|
|
||||||
|
|
||||||
|
_PERIODICS_WORKER = None
|
||||||
|
|
||||||
|
|
||||||
def init():
|
def init():
|
||||||
|
global _PERIODICS_WORKER
|
||||||
|
|
||||||
if utils.get_auth_strategy() != 'noauth':
|
if utils.get_auth_strategy() != 'noauth':
|
||||||
utils.add_auth_middleware(app)
|
utils.add_auth_middleware(app)
|
||||||
else:
|
else:
|
||||||
@ -344,14 +347,29 @@ def init():
|
|||||||
|
|
||||||
if CONF.firewall.manage_firewall:
|
if CONF.firewall.manage_firewall:
|
||||||
firewall.init()
|
firewall.init()
|
||||||
period = CONF.firewall.firewall_update_period
|
|
||||||
utils.spawn_n(periodic_update, period)
|
|
||||||
|
|
||||||
if CONF.timeout > 0:
|
_PERIODICS_WORKER = periodics.PeriodicWorker(
|
||||||
period = CONF.clean_up_period
|
callables=[(periodic_update, None, None),
|
||||||
utils.spawn_n(periodic_clean_up, period)
|
(periodic_clean_up, None, None)],
|
||||||
else:
|
executor_factory=periodics.ExistingExecutor(utils.executor()))
|
||||||
LOG.warning(_LW('Timeout is disabled in configuration'))
|
utils.executor().submit(_PERIODICS_WORKER.start)
|
||||||
|
|
||||||
|
|
||||||
|
def shutdown():
|
||||||
|
global _PERIODICS_WORKER
|
||||||
|
LOG.debug('Shutting down')
|
||||||
|
|
||||||
|
firewall.clean_up()
|
||||||
|
|
||||||
|
if _PERIODICS_WORKER is not None:
|
||||||
|
_PERIODICS_WORKER.stop()
|
||||||
|
_PERIODICS_WORKER.wait()
|
||||||
|
_PERIODICS_WORKER = None
|
||||||
|
|
||||||
|
if utils.executor().alive:
|
||||||
|
utils.executor().shutdown(wait=True)
|
||||||
|
|
||||||
|
LOG.info(_LI('Shut down successfully'))
|
||||||
|
|
||||||
|
|
||||||
def create_ssl_context():
|
def create_ssl_context():
|
||||||
@ -416,4 +434,4 @@ def main(args=sys.argv[1:]): # pragma: no cover
|
|||||||
try:
|
try:
|
||||||
app.run(**app_kwargs)
|
app.run(**app_kwargs)
|
||||||
finally:
|
finally:
|
||||||
firewall.clean_up()
|
shutdown()
|
||||||
|
@ -177,14 +177,14 @@ def _process_node(node, introspection_data, node_info):
|
|||||||
if node_info.options.get('new_ipmi_credentials'):
|
if node_info.options.get('new_ipmi_credentials'):
|
||||||
new_username, new_password = (
|
new_username, new_password = (
|
||||||
node_info.options.get('new_ipmi_credentials'))
|
node_info.options.get('new_ipmi_credentials'))
|
||||||
utils.spawn_n(_finish_set_ipmi_credentials,
|
utils.executor().submit(_finish_set_ipmi_credentials,
|
||||||
ironic, node, node_info, introspection_data,
|
ironic, node, node_info, introspection_data,
|
||||||
new_username, new_password)
|
new_username, new_password)
|
||||||
resp['ipmi_setup_credentials'] = True
|
resp['ipmi_setup_credentials'] = True
|
||||||
resp['ipmi_username'] = new_username
|
resp['ipmi_username'] = new_username
|
||||||
resp['ipmi_password'] = new_password
|
resp['ipmi_password'] = new_password
|
||||||
else:
|
else:
|
||||||
utils.spawn_n(_finish, ironic, node_info, introspection_data)
|
utils.executor().submit(_finish, ironic, node_info, introspection_data)
|
||||||
|
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
|
import futurist
|
||||||
import mock
|
import mock
|
||||||
from oslo_concurrency import lockutils
|
from oslo_concurrency import lockutils
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -25,6 +26,7 @@ from ironic_inspector import conf # noqa
|
|||||||
from ironic_inspector import db
|
from ironic_inspector import db
|
||||||
from ironic_inspector import node_cache
|
from ironic_inspector import node_cache
|
||||||
from ironic_inspector.plugins import base as plugins_base
|
from ironic_inspector.plugins import base as plugins_base
|
||||||
|
from ironic_inspector import utils
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
@ -65,6 +67,7 @@ class BaseTest(unittest.TestCase):
|
|||||||
patch.start()
|
patch.start()
|
||||||
# 'p=patch' magic is due to how closures work
|
# 'p=patch' magic is due to how closures work
|
||||||
self.addCleanup(lambda p=patch: p.stop())
|
self.addCleanup(lambda p=patch: p.stop())
|
||||||
|
utils._EXECUTOR = futurist.SynchronousExecutor(green=True)
|
||||||
|
|
||||||
def assertPatchEqual(self, expected, actual):
|
def assertPatchEqual(self, expected, actual):
|
||||||
expected = sorted(expected, key=lambda p: p['path'])
|
expected = sorted(expected, key=lambda p: p['path'])
|
||||||
|
@ -49,8 +49,6 @@ class BaseTest(test_base.NodeTest):
|
|||||||
|
|
||||||
|
|
||||||
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
||||||
@mock.patch.object(utils, 'spawn_n',
|
|
||||||
lambda f, *a, **kw: f(*a, **kw) and None)
|
|
||||||
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
||||||
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
||||||
@mock.patch.object(ir_utils, 'get_client', autospec=True)
|
@mock.patch.object(ir_utils, 'get_client', autospec=True)
|
||||||
@ -334,8 +332,6 @@ class TestIntrospect(BaseTest):
|
|||||||
self.assertEqual(42, introspect._LAST_INTROSPECTION_TIME)
|
self.assertEqual(42, introspect._LAST_INTROSPECTION_TIME)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(utils, 'spawn_n',
|
|
||||||
lambda f, *a, **kw: f(*a, **kw) and None)
|
|
||||||
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
||||||
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
||||||
@mock.patch.object(ir_utils, 'get_client', autospec=True)
|
@mock.patch.object(ir_utils, 'get_client', autospec=True)
|
||||||
@ -419,8 +415,6 @@ class TestSetIpmiCredentials(BaseTest):
|
|||||||
new_ipmi_credentials=self.new_creds)
|
new_ipmi_credentials=self.new_creds)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(utils, 'spawn_n',
|
|
||||||
lambda f, *a, **kw: f(*a, **kw) and None)
|
|
||||||
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
||||||
@mock.patch.object(node_cache, 'get_node', autospec=True)
|
@mock.patch.object(node_cache, 'get_node', autospec=True)
|
||||||
@mock.patch.object(ir_utils, 'get_client', autospec=True)
|
@mock.patch.object(ir_utils, 'get_client', autospec=True)
|
||||||
|
@ -457,31 +457,30 @@ class TestPlugins(unittest.TestCase):
|
|||||||
plugins_base.processing_hooks_manager())
|
plugins_base.processing_hooks_manager())
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(utils, 'spawn_n')
|
|
||||||
@mock.patch.object(firewall, 'init')
|
@mock.patch.object(firewall, 'init')
|
||||||
@mock.patch.object(utils, 'add_auth_middleware')
|
@mock.patch.object(utils, 'add_auth_middleware')
|
||||||
@mock.patch.object(ir_utils, 'get_client')
|
@mock.patch.object(ir_utils, 'get_client')
|
||||||
@mock.patch.object(db, 'init')
|
@mock.patch.object(db, 'init')
|
||||||
class TestInit(test_base.BaseTest):
|
class TestInit(test_base.BaseTest):
|
||||||
|
def setUp(self):
|
||||||
|
super(TestInit, self).setUp()
|
||||||
|
# Tests default to a synchronous executor which can't be used here
|
||||||
|
utils._EXECUTOR = None
|
||||||
|
|
||||||
|
@mock.patch.object(firewall, 'clean_up', lambda: None)
|
||||||
|
def tearDown(self):
|
||||||
|
main.shutdown()
|
||||||
|
|
||||||
def test_ok(self, mock_node_cache, mock_get_client, mock_auth,
|
def test_ok(self, mock_node_cache, mock_get_client, mock_auth,
|
||||||
mock_firewall, mock_spawn_n):
|
mock_firewall):
|
||||||
CONF.set_override('auth_strategy', 'keystone')
|
CONF.set_override('auth_strategy', 'keystone')
|
||||||
main.init()
|
main.init()
|
||||||
mock_auth.assert_called_once_with(main.app)
|
mock_auth.assert_called_once_with(main.app)
|
||||||
mock_node_cache.assert_called_once_with()
|
mock_node_cache.assert_called_once_with()
|
||||||
mock_firewall.assert_called_once_with()
|
mock_firewall.assert_called_once_with()
|
||||||
|
|
||||||
spawn_n_expected_args = [
|
|
||||||
(main.periodic_update, CONF.firewall.firewall_update_period),
|
|
||||||
(main.periodic_clean_up, CONF.clean_up_period)]
|
|
||||||
spawn_n_call_args_list = mock_spawn_n.call_args_list
|
|
||||||
|
|
||||||
for (args, call) in zip(spawn_n_expected_args,
|
|
||||||
spawn_n_call_args_list):
|
|
||||||
self.assertEqual(args, call[0])
|
|
||||||
|
|
||||||
def test_init_without_authenticate(self, mock_node_cache, mock_get_client,
|
def test_init_without_authenticate(self, mock_node_cache, mock_get_client,
|
||||||
mock_auth, mock_firewall, mock_spawn_n):
|
mock_auth, mock_firewall):
|
||||||
CONF.set_override('auth_strategy', 'noauth')
|
CONF.set_override('auth_strategy', 'noauth')
|
||||||
main.init()
|
main.init()
|
||||||
self.assertFalse(mock_auth.called)
|
self.assertFalse(mock_auth.called)
|
||||||
@ -489,7 +488,7 @@ class TestInit(test_base.BaseTest):
|
|||||||
@mock.patch.object(main.LOG, 'warning')
|
@mock.patch.object(main.LOG, 'warning')
|
||||||
def test_init_with_no_data_storage(self, mock_log, mock_node_cache,
|
def test_init_with_no_data_storage(self, mock_log, mock_node_cache,
|
||||||
mock_get_client, mock_auth,
|
mock_get_client, mock_auth,
|
||||||
mock_firewall, mock_spawn_n):
|
mock_firewall):
|
||||||
msg = ('Introspection data will not be stored. Change '
|
msg = ('Introspection data will not be stored. Change '
|
||||||
'"[processing] store_data" option if this is not the '
|
'"[processing] store_data" option if this is not the '
|
||||||
'desired behavior')
|
'desired behavior')
|
||||||
@ -499,7 +498,7 @@ class TestInit(test_base.BaseTest):
|
|||||||
@mock.patch.object(main.LOG, 'info')
|
@mock.patch.object(main.LOG, 'info')
|
||||||
def test_init_with_swift_storage(self, mock_log, mock_node_cache,
|
def test_init_with_swift_storage(self, mock_log, mock_node_cache,
|
||||||
mock_get_client, mock_auth,
|
mock_get_client, mock_auth,
|
||||||
mock_firewall, mock_spawn_n):
|
mock_firewall):
|
||||||
CONF.set_override('store_data', 'swift', 'processing')
|
CONF.set_override('store_data', 'swift', 'processing')
|
||||||
msg = mock.call('Introspection data will be stored in Swift in the '
|
msg = mock.call('Introspection data will be stored in Swift in the '
|
||||||
'container %s', CONF.swift.container)
|
'container %s', CONF.swift.container)
|
||||||
@ -508,33 +507,15 @@ class TestInit(test_base.BaseTest):
|
|||||||
|
|
||||||
def test_init_without_manage_firewall(self, mock_node_cache,
|
def test_init_without_manage_firewall(self, mock_node_cache,
|
||||||
mock_get_client, mock_auth,
|
mock_get_client, mock_auth,
|
||||||
mock_firewall, mock_spawn_n):
|
mock_firewall):
|
||||||
CONF.set_override('manage_firewall', False, 'firewall')
|
CONF.set_override('manage_firewall', False, 'firewall')
|
||||||
main.init()
|
main.init()
|
||||||
self.assertFalse(mock_firewall.called)
|
self.assertFalse(mock_firewall.called)
|
||||||
spawn_n_expected_args = [
|
|
||||||
(main.periodic_clean_up, CONF.clean_up_period)]
|
|
||||||
spawn_n_call_args_list = mock_spawn_n.call_args_list
|
|
||||||
for (args, call) in zip(spawn_n_expected_args,
|
|
||||||
spawn_n_call_args_list):
|
|
||||||
self.assertEqual(args, call[0])
|
|
||||||
|
|
||||||
def test_init_with_timeout_0(self, mock_node_cache, mock_get_client,
|
|
||||||
mock_auth, mock_firewall, mock_spawn_n):
|
|
||||||
CONF.set_override('timeout', 0)
|
|
||||||
main.init()
|
|
||||||
spawn_n_expected_args = [
|
|
||||||
(main.periodic_update, CONF.firewall.firewall_update_period)]
|
|
||||||
spawn_n_call_args_list = mock_spawn_n.call_args_list
|
|
||||||
|
|
||||||
for (args, call) in zip(spawn_n_expected_args,
|
|
||||||
spawn_n_call_args_list):
|
|
||||||
self.assertEqual(args, call[0])
|
|
||||||
|
|
||||||
@mock.patch.object(main.LOG, 'critical')
|
@mock.patch.object(main.LOG, 'critical')
|
||||||
def test_init_failed_processing_hook(self, mock_log, mock_node_cache,
|
def test_init_failed_processing_hook(self, mock_log, mock_node_cache,
|
||||||
mock_get_client, mock_auth,
|
mock_get_client, mock_auth,
|
||||||
mock_firewall, mock_spawn_n):
|
mock_firewall):
|
||||||
CONF.set_override('processing_hooks', 'foo!', 'processing')
|
CONF.set_override('processing_hooks', 'foo!', 'processing')
|
||||||
plugins_base._HOOKS_MGR = None
|
plugins_base._HOOKS_MGR = None
|
||||||
|
|
||||||
|
@ -240,8 +240,6 @@ class TestProcess(BaseTest):
|
|||||||
hook_mock.assert_called_once_with(self.data)
|
hook_mock.assert_called_once_with(self.data)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(utils, 'spawn_n',
|
|
||||||
lambda f, *a: f(*a) and None)
|
|
||||||
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
||||||
@mock.patch.object(example_plugin.ExampleProcessingHook, 'before_update')
|
@mock.patch.object(example_plugin.ExampleProcessingHook, 'before_update')
|
||||||
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
||||||
@ -371,8 +369,7 @@ class TestProcessNode(BaseTest):
|
|||||||
self.node_info.set_option('new_ipmi_credentials', self.new_creds)
|
self.node_info.set_option('new_ipmi_credentials', self.new_creds)
|
||||||
self.cli.node.get_boot_device.side_effect = RuntimeError('boom')
|
self.cli.node.get_boot_device.side_effect = RuntimeError('boom')
|
||||||
|
|
||||||
self.assertRaisesRegexp(utils.Error, 'Failed to validate',
|
self.call()
|
||||||
self.call)
|
|
||||||
|
|
||||||
self.cli.node.update.assert_any_call(self.uuid, self.patch_credentials)
|
self.cli.node.update.assert_any_call(self.uuid, self.patch_credentials)
|
||||||
self.assertEqual(2, self.cli.node.update.call_count)
|
self.assertEqual(2, self.cli.node.update.call_count)
|
||||||
@ -389,8 +386,7 @@ class TestProcessNode(BaseTest):
|
|||||||
post_hook_mock):
|
post_hook_mock):
|
||||||
self.cli.node.set_power_state.side_effect = RuntimeError('boom')
|
self.cli.node.set_power_state.side_effect = RuntimeError('boom')
|
||||||
|
|
||||||
self.assertRaisesRegexp(utils.Error, 'Failed to power off',
|
self.call()
|
||||||
self.call)
|
|
||||||
|
|
||||||
self.cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
|
self.cli.node.set_power_state.assert_called_once_with(self.uuid, 'off')
|
||||||
self.assertCalledWithPatch(self.patch_props, self.cli.node.update)
|
self.assertCalledWithPatch(self.patch_props, self.cli.node.update)
|
||||||
|
@ -11,8 +11,6 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
from keystonemiddleware import auth_token
|
from keystonemiddleware import auth_token
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
@ -106,27 +104,6 @@ class TestCheckAuth(base.BaseTest):
|
|||||||
utils.check_auth(request)
|
utils.check_auth(request)
|
||||||
|
|
||||||
|
|
||||||
class TestSpawnN(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestSpawnN, self).setUp()
|
|
||||||
utils.GREEN_POOL = None
|
|
||||||
|
|
||||||
@mock.patch('eventlet.greenpool.GreenPool', autospec=True)
|
|
||||||
def test_spawn_n(self, mock_green_pool):
|
|
||||||
greenpool = mock_green_pool.return_value
|
|
||||||
func = lambda x: x
|
|
||||||
|
|
||||||
utils.spawn_n(func, "hello")
|
|
||||||
self.assertEqual(greenpool, utils.GREEN_POOL)
|
|
||||||
greenpool.spawn_n.assert_called_with(func, "hello")
|
|
||||||
|
|
||||||
utils.spawn_n(func, "goodbye")
|
|
||||||
greenpool.spawn_n.assert_called_with(func, "goodbye")
|
|
||||||
|
|
||||||
mock_green_pool.assert_called_once_with(CONF.max_concurrency)
|
|
||||||
|
|
||||||
|
|
||||||
class TestProcessingLogger(base.BaseTest):
|
class TestProcessingLogger(base.BaseTest):
|
||||||
def test_prefix_no_info(self):
|
def test_prefix_no_info(self):
|
||||||
self.assertEqual('[unidentified node]',
|
self.assertEqual('[unidentified node]',
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
import logging as pylog
|
import logging as pylog
|
||||||
import re
|
import re
|
||||||
|
|
||||||
import eventlet
|
import futurist
|
||||||
from keystonemiddleware import auth_token
|
from keystonemiddleware import auth_token
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_log import log
|
||||||
@ -25,7 +25,7 @@ from ironic_inspector import conf # noqa
|
|||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
GREEN_POOL = None
|
_EXECUTOR = None
|
||||||
|
|
||||||
|
|
||||||
def get_ipmi_address_from_data(introspection_data):
|
def get_ipmi_address_from_data(introspection_data):
|
||||||
@ -116,11 +116,13 @@ class NotFoundInCacheError(Error):
|
|||||||
super(NotFoundInCacheError, self).__init__(msg, code)
|
super(NotFoundInCacheError, self).__init__(msg, code)
|
||||||
|
|
||||||
|
|
||||||
def spawn_n(*args, **kwargs):
|
def executor():
|
||||||
global GREEN_POOL
|
"""Return the current futures executor."""
|
||||||
if not GREEN_POOL:
|
global _EXECUTOR
|
||||||
GREEN_POOL = eventlet.greenpool.GreenPool(CONF.max_concurrency)
|
if _EXECUTOR is None:
|
||||||
return GREEN_POOL.spawn_n(*args, **kwargs)
|
_EXECUTOR = futurist.GreenThreadPoolExecutor(
|
||||||
|
max_workers=CONF.max_concurrency)
|
||||||
|
return _EXECUTOR
|
||||||
|
|
||||||
|
|
||||||
def add_auth_middleware(app):
|
def add_auth_middleware(app):
|
||||||
|
5
releasenotes/notes/futurist-557fcd18d4eaf1c1.yaml
Normal file
5
releasenotes/notes/futurist-557fcd18d4eaf1c1.yaml
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
---
|
||||||
|
upgrade:
|
||||||
|
- Minimum possible value for the "max_concurrency" setting is now 2.
|
||||||
|
other:
|
||||||
|
- Switched to Futurist library for asynchronous tasks.
|
@ -5,6 +5,7 @@ alembic>=0.8.0 # MIT
|
|||||||
Babel>=1.3 # BSD
|
Babel>=1.3 # BSD
|
||||||
eventlet!=0.18.3,>=0.18.2 # MIT
|
eventlet!=0.18.3,>=0.18.2 # MIT
|
||||||
Flask<1.0,>=0.10 # BSD
|
Flask<1.0,>=0.10 # BSD
|
||||||
|
futurist>=0.11.0 # Apache-2.0
|
||||||
jsonpath-rw<2.0,>=1.2.0 # Apache-2.0
|
jsonpath-rw<2.0,>=1.2.0 # Apache-2.0
|
||||||
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
|
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
|
||||||
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
|
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
|
||||||
|
Loading…
Reference in New Issue
Block a user