Implement concurrency in baremetal cleaning

This change implements concurrency in the os_baremetal_clean_node
module. The module now processes all provided nodes concurrently,
sizing its thread pool from the `concurrency` option, which defaults
to 20. If fewer nodes than `concurrency` are given, the thread pool
is limited to the number of nodes.

Change-Id: I07f3c731c79cfe9bac8652f082e291832dc9b351
Signed-off-by: Kevin Carter <kecarter@redhat.com>
Kevin Carter 2020-02-25 10:34:58 -06:00
parent e6a46eb917
commit b595bacddb
2 changed files with 98 additions and 50 deletions
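
As a reference for the sizing rule described in the commit message, here is a minimal, standalone sketch; the node list, the placeholder clean() function, and the printed output are illustrative and not taken from the module:

    from concurrent import futures

    def clean(node):
        # Placeholder for client.set_node_provision_state(node, "clean", ...)
        return 'cleaned %s' % node

    nodes = ['node-1', 'node-2', 'node-3']  # three nodes requested
    concurrency = 20                        # module default

    # The pool never exceeds the concurrency option and never exceeds the
    # number of nodes, so three nodes get three threads, not twenty.
    workers = min(len(nodes), concurrency)
    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        print(list(executor.map(clean, nodes)))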


@@ -69,6 +69,18 @@ options:
        - Don't provide cleaned nodes info in output of the module
      type: bool
      default: False
+    max_retries:
+      description:
+        - Number of attempts before failing.
+      type: int
+      required: False
+      default: 0
+    concurrency:
+      description:
+        - Max level of concurrency.
+      type: int
+      required: False
+      default: 20
 requirements: ["openstacksdk"]
 '''
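
The matching argument-spec entries are not shown in this hunk; under standard AnsibleModule conventions they would look roughly like the sketch below. The surrounding spec and variable name are assumptions; the types and defaults come from the documentation above:

    # Hypothetical excerpt of the module's argument spec.
    argument_spec = dict(
        max_retries=dict(type='int', required=False, default=0),
        concurrency=dict(type='int', required=False, default=20),
    )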
@@ -367,9 +379,11 @@ EXAMPLES = '''
     checksum: "a94e683ea16d9ae44768f0a65942234d"
     component: "ilo"
 '''
-import time
 import yaml
+from concurrent import futures
 from ansible.module_utils.basic import AnsibleModule
 from ansible.module_utils.openstack import (openstack_full_argument_spec,
                                             openstack_module_kwargs,
@@ -382,55 +396,85 @@ def parallel_nodes_cleaning(conn, module):
     nodes = module.params['node_uuid'] + module.params['node_name']
     clean_steps = module.params['clean_steps']
     result = {}
-    nodes_wait = nodes[:]
-    for node in nodes:
-        try:
-            client.set_node_provision_state(
-                node,
-                "clean",
-                clean_steps=clean_steps,
-                wait=False)
-        except Exception as e:
-            nodes_wait.remove(node)
-            result.update({node: {
-                'msg': 'Can not start cleaning for node %s: %s' % (
-                    node, str(e)),
-                'failed': True,
-                'info': {}
-            }})
-    start = time.time()
-    while nodes_wait and time.time() - start < node_timeout:
-        nodes_to_delete = []
-        for node in nodes_wait:
-            node_info = client.get_node(
-                node,
-                fields=['provision_state', 'last_error']
-            ).to_dict()
-            state = node_info['provision_state']
-            if state == 'manageable':
-                nodes_to_delete.append(node)
-                result.update({node: {
-                    'msg': 'Successful cleaning for node %s' % node,
-                    'failed': False,
-                    'error': '',
-                    'info': node_info,
-                }})
-            elif state not in [
-                    'manageable', 'cleaning', 'clean wait', 'available']:
-                nodes_to_delete.append(node)
-                result.update({node: {
-                    'msg': 'Failed cleaning for node %s: %s' % (
-                        node,
-                        node_info['last_error'] or 'state %s' % state),
-                    'failed': True,
-                    'info': node_info,
-                }})
-            # timeout between get_node() calls
-            time.sleep(1)
-        for node in nodes_to_delete:
-            nodes_wait.remove(node)
-        # timeout between cycles
-        time.sleep(5)
+    workers = min(len(nodes), module.params['concurrency'])
+    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
+        future_to_build = {
+            executor.submit(
+                client.set_node_provision_state,
+                node,
+                "clean",
+                clean_steps=clean_steps,
+                wait=True
+            ): node for node in nodes
+        }
+        done, not_done = futures.wait(
+            future_to_build,
+            timeout=node_timeout,
+            return_when=futures.ALL_COMPLETED
+        )
+    nodes_wait = list()
+    for job in done:
+        if job._exception:
+            result.update(
+                {
+                    future_to_build[job]: {
+                        'msg': 'Cleaning failed for node {}: {}'.format(
+                            future_to_build[job],
+                            str(job._exception)
+                        ),
+                        'failed': True,
+                        'info': {}
+                    }
+                }
+            )
+        else:
+            nodes_wait.append(future_to_build[job])
+    else:
+        if not_done:
+            for job in not_done:
+                result.update(
+                    {
+                        future_to_build[job]: {
+                            'msg': 'Cleaning incomplete for node {}'.format(
+                                future_to_build[job],
+                            ),
+                            'failed': True,
+                            'info': {}
+                        }
+                    }
+                )
+
+    nodes_to_delete = []
+    for node in nodes_wait:
+        node_info = client.get_node(
+            node,
+            fields=['provision_state', 'last_error']
+        ).to_dict()
+        state = node_info['provision_state']
+        if state == 'manageable':
+            nodes_to_delete.append(node)
+            result.update({node: {
+                'msg': 'Successful cleaning for node %s' % node,
+                'failed': False,
+                'error': '',
+                'info': node_info,
+            }})
+        elif state not in [
+                'manageable', 'cleaning', 'clean wait', 'available']:
+            nodes_to_delete.append(node)
+            result.update({node: {
+                'msg': 'Failed cleaning for node %s: %s' % (
+                    node,
+                    node_info['last_error'] or 'state %s' % state),
+                'failed': True,
+                'info': node_info,
+            }})
+
+    for node in nodes_to_delete:
+        nodes_wait.remove(node)
+
     if nodes_wait:
         for node in nodes_wait:
             node_info = client.get_node(
@@ -444,8 +488,7 @@ def parallel_nodes_cleaning(conn, module):
                 'failed': True,
                 'info': node_info,
             }})
-            # timeout between get_node() calls
-            time.sleep(1)
     return result
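
For readers unfamiliar with the pattern introduced above, a self-contained sketch of the same idea follows: submit one job per node, key each future by its node, then split the results with futures.wait. The fake clean_node() function, the 60-second timeout, and the use of the public Future.exception() accessor (instead of the private _exception attribute the module reads) are illustrative assumptions:

    from concurrent import futures

    def clean_node(node):
        # Stand-in for client.set_node_provision_state(node, "clean", wait=True)
        if node == 'node-2':
            raise RuntimeError('cleaning failed')
        return node

    nodes = ['node-1', 'node-2', 'node-3']
    with futures.ThreadPoolExecutor(max_workers=len(nodes)) as executor:
        # Map each future back to the node it was submitted for.
        future_to_node = {executor.submit(clean_node, n): n for n in nodes}
        done, not_done = futures.wait(
            future_to_node,
            timeout=60,
            return_when=futures.ALL_COMPLETED
        )

    for job in done:
        node = future_to_node[job]
        if job.exception():
            print('%s failed: %s' % (node, job.exception()))
        else:
            print('%s cleaned' % node)
    for job in not_done:
        print('%s did not finish in time' % future_to_node[job])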


@@ -46,6 +46,8 @@
         node_uuids_clean: "{{ node_uuids }}"

     - name: exit if nothing to do
+      when:
+        - (node_uuids_clean | length) < 1
       block:
         - name: Notice
           debug:
@@ -53,15 +55,18 @@
         - name: end play
           meta: end_play
-      when:
-        - (node_uuids_clean | length) < 1
+
+    - name: Notice
+      debug:
+        msg: >-
+          Running cleaning on the following nodes, {{ node_uuids_clean }}.

     # Clean nodes
     - name: Start baremetal cleaning
       os_baremetal_clean_node:
         cloud: undercloud
         node_uuid: "{{ node_uuids_clean }}"
-        # concurrency: "{{ concurrency }}" NotImplemented
+        concurrency: "{{ concurrency }}"
         # max_retries: "{{ max_retries }}" NotImplemented
         timeout: "{{ node_timeout }}"
         clean_steps: