Add worker threads limit to _check_deploy_timeouts task

New nodes filter parameters added to db.sqlalchemy.api. This
parameters used in _check_deploy_timeouts periodic task for
limit of worker threads which can be started simultaneously.
New config option 'periodic_max_workers' added to conductor
group.

Closes-Bug: #1285793
Change-Id: I646540e334dec05682640c05cedb315e0ee355bc
This commit is contained in:
Yuriy Zveryanskyy 2014-03-06 18:00:41 +02:00
parent 9ff949c8c1
commit d473bdabfd
7 changed files with 121 additions and 29 deletions

View File

@ -508,6 +508,11 @@
# the node power state in DB (integer value)
#power_state_sync_max_retries=3
# Maximum number of worker threads that can be started
# simultaneously by a periodic task. Should be less than RPC
# thread pool size. (integer value)
#periodic_max_workers=8
[database]

View File

@ -42,7 +42,6 @@ a change, etc.
"""
import collections
import datetime
import eventlet
from eventlet import greenpool
@ -64,7 +63,6 @@ from ironic.openstack.common import lockutils
from ironic.openstack.common import log
from ironic.openstack.common import periodic_task
from ironic.openstack.common.rpc import common as messaging
from ironic.openstack.common import timeutils
MANAGER_TOPIC = 'ironic.conductor_manager'
WORKER_SPAWN_lOCK = "conductor_worker_spawn"
@ -108,6 +106,11 @@ conductor_opts = [
'number of times Ironic should try syncing the '
'hardware node power state with the node power state '
'in DB'),
cfg.IntOpt('periodic_max_workers',
default=8,
help='Maximum number of worker threads that can be started '
'simultaneously by a periodic task. Should be less '
'than RPC thread pool size.'),
]
CONF = cfg.CONF
@ -628,39 +631,35 @@ class ConductorManager(service.PeriodicService):
if not CONF.conductor.deploy_callback_timeout:
return
filters = {'reserved': False, 'maintenance': False}
columns = ['uuid', 'driver', 'provision_state', 'provision_updated_at']
node_list = self.dbapi.get_nodeinfo_list(columns=columns,
filters=filters)
filters = {'reserved': False, 'provision_state': states.DEPLOYWAIT,
'provisioned_before': CONF.conductor.deploy_callback_timeout}
columns = ['uuid', 'driver']
node_list = self.dbapi.get_nodeinfo_list(
columns=columns,
filters=filters,
sort_key='provision_updated_at',
sort_dir='asc')
for (node_uuid, driver, state, update_time) in node_list:
workers_count = 0
for node_uuid, driver in node_list:
if not self._mapped_to_this_conductor(node_uuid, driver):
continue
if state == states.DEPLOYWAIT:
limit = (timeutils.utcnow() - datetime.timedelta(
seconds=CONF.conductor.deploy_callback_timeout))
if timeutils.normalize_time(update_time) <= limit:
try:
task = task_manager.TaskManager(context, node_uuid)
except (exception.NodeLocked, exception.NodeNotFound):
continue
try:
task = task_manager.TaskManager(context, node_uuid)
except (exception.NodeLocked, exception.NodeNotFound):
continue
node = task.node
node.provision_state = states.DEPLOYFAIL
node.target_provision_state = states.NOSTATE
msg = (_('Timeout reached when waiting callback for '
'node %s') % node_uuid)
node.last_error = msg
LOG.error(msg)
node.save(task.context)
try:
thread = self._spawn_worker(utils.cleanup_after_timeout, task)
thread.link(lambda t: task.release_resources())
except exception.NoFreeConductorWorker:
task.release_resources()
break
try:
thread = self._spawn_worker(
utils.cleanup_after_timeout, task)
thread.link(lambda t: task.release_resources())
except exception.NoFreeConductorWorker:
task.release_resources()
workers_count += 1
if workers_count == CONF.conductor.periodic_max_workers:
break
def rebalance_node_ring(self):
"""Perform any actions necessary when rebalancing the consistent hash.

View File

@ -126,6 +126,14 @@ def cleanup_after_timeout(task):
"""
node = task.node
context = task.context
node.provision_state = states.DEPLOYFAIL
node.target_provision_state = states.NOSTATE
msg = (_('Timeout reached while waiting for callback for node %s')
% node.uuid)
node.last_error = msg
LOG.error(msg)
node.save(context)
error_msg = _('Cleanup failed for node %(node)s after deploy timeout: '
' %(error)s')
try:

View File

@ -60,6 +60,9 @@ class Connection(object):
'maintenance': True | False
'chassis_uuid': uuid of chassis
'driver': driver's name
'provision_state': provision state of node
'provisioned_before': nodes with provision_updated_at
field before this interval in seconds
:param limit: Maximum number of nodes to return.
:param marker: the last item of the previous page; we return the next
result set.
@ -80,6 +83,9 @@ class Connection(object):
'maintenance': True | False
'chassis_uuid': uuid of chassis
'driver': driver's name
'provision_state': provision state of node
'provisioned_before': nodes with provision_updated_at
field before this interval in seconds
:param limit: Maximum number of nodes to return.
:param marker: the last item of the previous page; we return the next
result set.

View File

@ -229,6 +229,12 @@ class Connection(api.Connection):
query = query.filter_by(maintenance=filters['maintenance'])
if 'driver' in filters:
query = query.filter_by(driver=filters['driver'])
if 'provision_state' in filters:
query = query.filter_by(provision_state=filters['provision_state'])
if 'provisioned_before' in filters:
limit = timeutils.utcnow() - datetime.timedelta(
seconds=filters['provisioned_before'])
query = query.filter(models.Node.provision_updated_at < limit)
return query

View File

@ -953,6 +953,45 @@ class ManagerTestCase(tests_db_base.DbTestCase):
self.service._worker_pool.waitall()
spawn_mock.assert_called_once_with(mock.ANY, mock.ANY, mock.ANY)
@mock.patch.object(timeutils, 'utcnow')
def test__check_deploy_timeouts_limit(self, mock_utcnow):
self.config(deploy_callback_timeout=60, group='conductor')
self.config(periodic_max_workers=2, group='conductor')
past = datetime.datetime(2000, 1, 1, 0, 0)
present = past + datetime.timedelta(minutes=10)
mock_utcnow.return_value = past
self._start_service()
test_nodes = []
for i in range(3):
next = past + datetime.timedelta(minutes=i)
n = utils.get_test_node(provision_state=states.DEPLOYWAIT,
target_provision_state=states.DEPLOYDONE,
provision_updated_at=next,
uuid=ironic_utils.generate_uuid())
del n['id']
node = self.dbapi.create_node(n)
test_nodes.append(node)
mock_utcnow.return_value = present
self.service._conductor_service_record_keepalive(self.context)
with mock.patch.object(self.driver.deploy, 'clean_up') as clean_mock:
self.service._check_deploy_timeouts(self.context)
self.service._worker_pool.waitall()
for node in test_nodes[:-1]:
node.refresh(self.context)
self.assertEqual(states.DEPLOYFAIL, node.provision_state)
self.assertEqual(states.NOSTATE, node.target_provision_state)
self.assertIsNotNone(node.last_error)
last_node = test_nodes[2]
last_node.refresh(self.context)
self.assertEqual(states.DEPLOYWAIT, last_node.provision_state)
self.assertEqual(states.DEPLOYDONE,
last_node.target_provision_state)
self.assertIsNone(last_node.last_error)
self.assertEqual(2, clean_mock.call_count)
def test_set_console_mode_enabled(self):
ndict = utils.get_test_node(driver='fake')
node = self.dbapi.create_node(ndict)

View File

@ -21,6 +21,7 @@ import mock
import six
from ironic.common import exception
from ironic.common import states
from ironic.common import utils as ironic_utils
from ironic.db import api as dbapi
from ironic.openstack.common import timeutils
@ -152,6 +153,34 @@ class DbNodeTestCase(base.DbTestCase):
res = self.dbapi.get_node_list(filters={'maintenance': False})
self.assertEqual([1], [r.id for r in res])
@mock.patch.object(timeutils, 'utcnow')
def test_get_nodeinfo_list_provision(self, mock_utcnow):
past = datetime.datetime(2000, 1, 1, 0, 0)
next = past + datetime.timedelta(minutes=8)
present = past + datetime.timedelta(minutes=10)
mock_utcnow.return_value = past
# node with provision_updated timeout
n1 = utils.get_test_node(id=1, uuid=ironic_utils.generate_uuid(),
provision_updated_at=past)
# node with None in provision_updated_at
n2 = utils.get_test_node(id=2, uuid=ironic_utils.generate_uuid(),
provision_state=states.DEPLOYWAIT)
# node without timeout
n3 = utils.get_test_node(id=3, uuid=ironic_utils.generate_uuid(),
provision_updated_at=next)
self.dbapi.create_node(n1)
self.dbapi.create_node(n2)
self.dbapi.create_node(n3)
mock_utcnow.return_value = present
res = self.dbapi.get_nodeinfo_list(filters={'provisioned_before': 300})
self.assertEqual([1], [r[0] for r in res])
res = self.dbapi.get_nodeinfo_list(filters={'provision_state':
states.DEPLOYWAIT})
self.assertEqual([2], [r[0] for r in res])
def test_get_node_list(self):
uuids = []
for i in range(1, 6):