Make the number of green threads configurable
The patch makes the number of green threads configurable by creating a global eventlet GreenPool, and providing the function utils.spawn_n as a proxy to create threads using the pool. The total number of threads available in the pool is configurable through the new max_threads config option. Change-Id: Id62ca6a2fc342de0514c6a88af8bebbe94d86dbc Closes-Bug: #1419736
This commit is contained in:
parent
be9d533280
commit
44d15cde7c
@ -52,6 +52,9 @@
|
|||||||
# Path to SSL key (string value)
|
# Path to SSL key (string value)
|
||||||
#ssl_key_path =
|
#ssl_key_path =
|
||||||
|
|
||||||
|
# The green thread pool size. (integer value)
|
||||||
|
#max_concurrency = 1000
|
||||||
|
|
||||||
|
|
||||||
[firewall]
|
[firewall]
|
||||||
|
|
||||||
|
@ -178,6 +178,9 @@ SERVICE_OPTS = [
|
|||||||
cfg.StrOpt('ssl_key_path',
|
cfg.StrOpt('ssl_key_path',
|
||||||
default='',
|
default='',
|
||||||
help='Path to SSL key'),
|
help='Path to SSL key'),
|
||||||
|
cfg.IntOpt('max_concurrency',
|
||||||
|
default=1000,
|
||||||
|
help='The green thread pool size.')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
import logging
|
import logging
|
||||||
import string
|
import string
|
||||||
|
|
||||||
import eventlet
|
|
||||||
from ironicclient import exceptions
|
from ironicclient import exceptions
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
@ -109,7 +108,7 @@ def introspect(uuid, new_ipmi_credentials=None):
|
|||||||
LOG.exception(msg)
|
LOG.exception(msg)
|
||||||
cached_node.finished(error=msg)
|
cached_node.finished(error=msg)
|
||||||
|
|
||||||
eventlet.greenthread.spawn_n(_handle_exceptions)
|
utils.spawn_n(_handle_exceptions)
|
||||||
|
|
||||||
|
|
||||||
def _background_introspect(ironic, cached_node):
|
def _background_introspect(ironic, cached_node):
|
||||||
|
@ -160,11 +160,11 @@ def init():
|
|||||||
if CONF.firewall.manage_firewall:
|
if CONF.firewall.manage_firewall:
|
||||||
firewall.init()
|
firewall.init()
|
||||||
period = CONF.firewall.firewall_update_period
|
period = CONF.firewall.firewall_update_period
|
||||||
eventlet.greenthread.spawn_n(periodic_update, period)
|
utils.spawn_n(periodic_update, period)
|
||||||
|
|
||||||
if CONF.timeout > 0:
|
if CONF.timeout > 0:
|
||||||
period = CONF.clean_up_period
|
period = CONF.clean_up_period
|
||||||
eventlet.greenthread.spawn_n(periodic_clean_up, period)
|
utils.spawn_n(periodic_clean_up, period)
|
||||||
else:
|
else:
|
||||||
LOG.warning(_LW('Timeout is disabled in configuration'))
|
LOG.warning(_LW('Timeout is disabled in configuration'))
|
||||||
|
|
||||||
|
@ -156,14 +156,14 @@ def _process_node(ironic, node, node_info, cached_node):
|
|||||||
if cached_node.options.get('new_ipmi_credentials'):
|
if cached_node.options.get('new_ipmi_credentials'):
|
||||||
new_username, new_password = (
|
new_username, new_password = (
|
||||||
cached_node.options.get('new_ipmi_credentials'))
|
cached_node.options.get('new_ipmi_credentials'))
|
||||||
eventlet.greenthread.spawn_n(_finish_set_ipmi_credentials,
|
utils.spawn_n(_finish_set_ipmi_credentials,
|
||||||
ironic, node, cached_node, node_info,
|
ironic, node, cached_node, node_info,
|
||||||
new_username, new_password)
|
new_username, new_password)
|
||||||
return {'ipmi_setup_credentials': True,
|
return {'ipmi_setup_credentials': True,
|
||||||
'ipmi_username': new_username,
|
'ipmi_username': new_username,
|
||||||
'ipmi_password': new_password}
|
'ipmi_password': new_password}
|
||||||
else:
|
else:
|
||||||
eventlet.greenthread.spawn_n(_finish, ironic, cached_node)
|
utils.spawn_n(_finish, ironic, cached_node)
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ class BaseTest(test_base.NodeTest):
|
|||||||
|
|
||||||
|
|
||||||
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
||||||
@mock.patch.object(eventlet.greenthread, 'spawn_n',
|
@mock.patch.object(utils, 'spawn_n',
|
||||||
lambda f, *a, **kw: f(*a, **kw) and None)
|
lambda f, *a, **kw: f(*a, **kw) and None)
|
||||||
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
||||||
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
||||||
@ -244,7 +244,7 @@ class TestIntrospect(BaseTest):
|
|||||||
self.assertFalse(add_mock.called)
|
self.assertFalse(add_mock.called)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(eventlet.greenthread, 'spawn_n',
|
@mock.patch.object(utils, 'spawn_n',
|
||||||
lambda f, *a, **kw: f(*a, **kw) and None)
|
lambda f, *a, **kw: f(*a, **kw) and None)
|
||||||
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
@mock.patch.object(firewall, 'update_filters', autospec=True)
|
||||||
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
@mock.patch.object(node_cache, 'add_node', autospec=True)
|
||||||
|
@ -181,7 +181,7 @@ class TestPlugins(unittest.TestCase):
|
|||||||
plugins_base.processing_hooks_manager())
|
plugins_base.processing_hooks_manager())
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(eventlet.greenthread, 'spawn_n')
|
@mock.patch.object(utils, 'spawn_n')
|
||||||
@mock.patch.object(firewall, 'init')
|
@mock.patch.object(firewall, 'init')
|
||||||
@mock.patch.object(utils, 'add_auth_middleware')
|
@mock.patch.object(utils, 'add_auth_middleware')
|
||||||
@mock.patch.object(utils, 'get_client')
|
@mock.patch.object(utils, 'get_client')
|
||||||
|
@ -305,7 +305,7 @@ class TestProcess(BaseTest):
|
|||||||
self.assertFalse(pop_mock.return_value.finished.called)
|
self.assertFalse(pop_mock.return_value.finished.called)
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(eventlet.greenthread, 'spawn_n',
|
@mock.patch.object(utils, 'spawn_n',
|
||||||
lambda f, *a: f(*a) and None)
|
lambda f, *a: f(*a) and None)
|
||||||
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
@mock.patch.object(eventlet.greenthread, 'sleep', lambda _: None)
|
||||||
@mock.patch.object(example_plugin.ExampleProcessingHook, 'before_update')
|
@mock.patch.object(example_plugin.ExampleProcessingHook, 'before_update')
|
||||||
|
@ -16,12 +16,16 @@ import unittest
|
|||||||
import eventlet
|
import eventlet
|
||||||
from ironicclient import exceptions
|
from ironicclient import exceptions
|
||||||
from keystonemiddleware import auth_token
|
from keystonemiddleware import auth_token
|
||||||
import mock
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
from ironic_inspector.test import base
|
from ironic_inspector.test import base
|
||||||
from ironic_inspector import utils
|
from ironic_inspector import utils
|
||||||
|
|
||||||
|
try:
|
||||||
|
from unittest import mock
|
||||||
|
except ImportError:
|
||||||
|
import mock
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
|
||||||
@ -124,3 +128,24 @@ class TestCapabilities(unittest.TestCase):
|
|||||||
output = utils.dict_to_capabilities(capabilities_dict)
|
output = utils.dict_to_capabilities(capabilities_dict)
|
||||||
self.assertIn('cat:meow', output)
|
self.assertIn('cat:meow', output)
|
||||||
self.assertIn('dog:wuff', output)
|
self.assertIn('dog:wuff', output)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSpawnN(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestSpawnN, self).setUp()
|
||||||
|
utils.GREEN_POOL = None
|
||||||
|
|
||||||
|
@mock.patch('eventlet.greenpool.GreenPool', autospec=True)
|
||||||
|
def test_spawn_n(self, mock_green_pool):
|
||||||
|
greenpool = mock_green_pool.return_value
|
||||||
|
func = lambda x: x
|
||||||
|
|
||||||
|
utils.spawn_n(func, "hello")
|
||||||
|
self.assertEqual(greenpool, utils.GREEN_POOL)
|
||||||
|
greenpool.spawn_n.assert_called_with(func, "hello")
|
||||||
|
|
||||||
|
utils.spawn_n(func, "goodbye")
|
||||||
|
greenpool.spawn_n.assert_called_with(func, "goodbye")
|
||||||
|
|
||||||
|
mock_green_pool.assert_called_once_with(CONF.max_concurrency)
|
||||||
|
@ -34,6 +34,8 @@ LOG = logging.getLogger('ironic_inspector.utils')
|
|||||||
RETRY_COUNT = 12
|
RETRY_COUNT = 12
|
||||||
RETRY_DELAY = 5
|
RETRY_DELAY = 5
|
||||||
|
|
||||||
|
GREEN_POOL = None
|
||||||
|
|
||||||
|
|
||||||
class Error(Exception):
|
class Error(Exception):
|
||||||
"""Inspector exception."""
|
"""Inspector exception."""
|
||||||
@ -44,6 +46,13 @@ class Error(Exception):
|
|||||||
self.http_code = code
|
self.http_code = code
|
||||||
|
|
||||||
|
|
||||||
|
def spawn_n(*args, **kwargs):
|
||||||
|
global GREEN_POOL
|
||||||
|
if not GREEN_POOL:
|
||||||
|
GREEN_POOL = eventlet.greenpool.GreenPool(CONF.max_concurrency)
|
||||||
|
return GREEN_POOL.spawn_n(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def get_client(): # pragma: no cover
|
def get_client(): # pragma: no cover
|
||||||
"""Get Ironic client instance."""
|
"""Get Ironic client instance."""
|
||||||
args = dict({'os_password': CONF.ironic.os_password,
|
args = dict({'os_password': CONF.ironic.os_password,
|
||||||
|
Loading…
Reference in New Issue
Block a user