diff --git a/tripleo_ansible/ansible_plugins/modules/os_tripleo_baremetal_node_introspection.py b/tripleo_ansible/ansible_plugins/modules/os_tripleo_baremetal_node_introspection.py index e0ef96a21..9c3d19623 100644 --- a/tripleo_ansible/ansible_plugins/modules/os_tripleo_baremetal_node_introspection.py +++ b/tripleo_ansible/ansible_plugins/modules/os_tripleo_baremetal_node_introspection.py @@ -1,3 +1,4 @@ +#!/usr/bin/python # Copyright (c) 2019 OpenStack Foundation # All Rights Reserved. # @@ -12,18 +13,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -from concurrent import futures -import io -import logging -import yaml - -from ansible.module_utils.basic import AnsibleModule -from ansible.module_utils.openstack import openstack_cloud_from_module -from ansible.module_utils.openstack import openstack_full_argument_spec -from ansible.module_utils.openstack import openstack_module_kwargs - -LOG = logging.getLogger('os_tripleo_baremetal_node_introspection') +__metaclass__ = type ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ['preview'], @@ -137,130 +127,205 @@ EXAMPLES = ''' ''' +import time +import yaml -def _configure_logging(): - log_fmt = ('%(asctime)s %(levelname)s %(name)s: %(message)s') - urllib_level = logging.CRITICAL - - base_level = logging.INFO - - log_stream = io.StringIO() - handler = logging.StreamHandler(log_stream) - logging.basicConfig(level=base_level, format=log_fmt, - handlers=[handler]) - logging.getLogger('urllib3.connectionpool').setLevel(urllib_level) - return log_stream +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.openstack import openstack_full_argument_spec +from ansible.module_utils.openstack import openstack_module_kwargs +from ansible.module_utils.openstack import openstack_cloud_from_module -def introspect(cloud, node_uuids, node_timeout, retry_timeout, max_retries, - concurrency): - result = {} - if not node_uuids: - return result - introspect_jobs = [] +class IntrospectionManagement(object): + def __init__(self, + cloud, + module, + concurrency, + max_retries, + node_timeout, + retry_timeout): + self.client = cloud.baremetal_introspection + self.cloud = cloud + self.module = module + self.concurrency = concurrency + self.max_retries = max_retries + self.node_timeout = node_timeout + self.retry_timeout = retry_timeout - with futures.ThreadPoolExecutor(max_workers=concurrency) as p: - for node_uuid in node_uuids: - introspect_jobs.append(p.submit( - introspect_node, cloud, node_uuid, - node_timeout, retry_timeout, max_retries - )) - for job in futures.as_completed(introspect_jobs): - result[node_uuid] = job.result() - return result - - -def introspect_node(cloud, node_uuid, node_timeout, retry_timeout, - max_retries): - last_error = None - attempt = 0 - - while attempt <= max_retries: - attempt += 1 - - # Attempt cleanup from previous error - if last_error: - LOG.info("Preparing for retry %s for node: %s", attempt, node_uuid) - prepare_for_retry(cloud, node_uuid, node_timeout, retry_timeout) + def log(self, msg): + self.module.log("os_tripleo_baremetal_node_introspection: %s" % msg) + def push_next(self, pool, queue): try: - LOG.info("Introspecting node: %s", node_uuid) + next_introspection = next(queue) + pool.append(next_introspection) + except StopIteration: + pass + return pool - # Start introspection - cloud.baremetal.set_node_provision_state( - node_uuid, 'inspect', wait=True, timeout=node_timeout) + def introspect(self, node_uuids): - # Power off the node - cloud.baremetal.set_node_power_state( - node_uuid, 'power off', wait=True, timeout=node_timeout - ) + result = {} + queue = (NodeIntrospection( + uuid, + self.client, + self.cloud, + self.node_timeout, + self.max_retries, + self.retry_timeout, + self.log) for uuid in node_uuids) + pool = [] - # Wait for the node lock to be released - cloud.baremetal.wait_for_node_reservation( - node_uuid, timeout=node_timeout - ) + for i in range(self.concurrency): + pool = self.push_next(pool, queue) - # Get the introspection data for the result - data = cloud.baremetal_introspection.get_introspection_data( - node_uuid) + while len(pool) > 0: + finished = [] + for intro in pool: + if not intro.started: + try: + intro.start_introspection() + continue + except Exception as e: + self.log("ERROR Node %s can't start introspection" + " because: %s" % (intro.node_id, str(e))) + result[intro.node_id] = { + "error": "Error for introspection node %s: %s " % ( + intro.node_id, str(e)), + "failed": True, + "status": '' + } + finished.append(intro) + continue + status = intro.get_introspection() + if (not status.is_finished and intro.timeouted()) or ( + status.is_finished and status.error is not None + ): + if status.is_finished: + self.log("ERROR Introspection of node %s " + "failed: %s" % ( + status.id, str(status.error)) + ) + if intro.last_retry(): + result[status.id] = (intro.error_msg() + if status.is_finished + else intro.timeout_msg()) + finished.append(intro) + else: + intro.restart_introspection() + if status.is_finished and status.error is None: + result[status.id] = { + 'status': intro.get_introspection_data(), + 'failed': False, + 'error': None} + finished.append(intro) + for i in finished: + pool.remove(i) + pool = self.push_next(pool, queue) + # Let's not DDOS Ironic service + if pool: + time.sleep(min(10, self.node_timeout)) - LOG.info("Introspecting node complete: %s", node_uuid) - # Success - return { - 'status': data, - 'failed': False, - 'error': None - } + return result + + +class NodeIntrospection: + started = False + + def __init__( + self, + node_id, + os_client, + os_cloud, + timeout, + max_retries, + retry_timeout, + log): + self.node_id = node_id + self.os_client = os_client + self.os_cloud = os_cloud + self.timeout = timeout + self.max_retries = max_retries + self.log = log + self.start = int(time.time()) + self.retries = 0 + self.retry_timeout = retry_timeout + self.last_status = None + + def restart_introspection(self): + self.retries += 1 + try: + self.os_client.abort_introspection(self.node_id) except Exception as e: - last_error = str(e) - LOG.error("Introspection of node %s failed on attempt %s: " - "%s", node_uuid, attempt, last_error) + # Node is locked + self.log("ERROR Node %s can't abort introspection: %s" % ( + self.node_id, str(e))) + return + # need to wait before restarting introspection till it's aborted + # to prevent hanging let's use introspect timeout for that + try: + self.os_client.wait_for_introspection( + self.node_id, timeout=self.timeout, ignore_error=True) + # Wait until node is unlocked + self.os_cloud.baremetal.wait_for_node_reservation( + self.node_id, timeout=self.retry_timeout) + except Exception as e: + self.log("ERROR Node %s can't restart introspection because can't " + "abort and unlock it: %s" % (self.node_id, str(e))) + return + self.start = int(time.time()) + return self.start_introspection(restart=True) - message = 'unknown error' - status = '' - # All attempts failed, fetch node to get the reason - try: - node = cloud.baremetal.get_node(node_uuid) - message = node.last_error - status = node.provision_state - except Exception: - if last_error: - # Couldn't fetch the node, use the last exception message instead - message = last_error + def start_introspection(self, restart=False): + self.started = True + if restart: + self.log("INFO Restarting (try %s of %s) introspection of " + "node %s" % ( + self.retries, self.max_retries, self.node_id)) + else: + self.log("INFO Starting introspection of node %s" % (self.node_id)) + return self.os_client.start_introspection(self.node_id) - return { - "error": "Error for introspection node %s on attempt %s: %s " % - (node_uuid, attempt, message), - "failed": True, - "status": status - } + def get_introspection(self): + self.last_status = self.os_client.get_introspection(self.node_id) + return self.last_status + def get_introspection_data(self): + self.log( + "Instrospection of node %s finished successfully!" % self.node_id) + return self.os_client.get_introspection_data(self.node_id) -def prepare_for_retry(cloud, node_uuid, node_timeout, retry_timeout): - # Attempt to abort any existing introspection - try: - cloud.baremetal.set_node_provision_state( - node_uuid, 'abort', wait=True, timeout=node_timeout) - except Exception as e: - LOG.warn("Abort introspection of node %s failed: %s", - node_uuid, str(e)) + def time_elapsed(self): + return int(time.time()) - self.start - # Attempt to power off the node - try: - cloud.baremetal.set_node_power_state( - node_uuid, 'off', wait=True, timeout=node_timeout - ) - except Exception as e: - LOG.warn("Power off of node %s failed: %s", - node_uuid, str(e)) + def timeouted(self): + return self.time_elapsed() > self.timeout - # Wait until node is unlocked - try: - cloud.baremetal.wait_for_node_reservation( - node_uuid, timeout=retry_timeout) - except Exception as e: - LOG.warn("Waiting for node unlock %s failed: %s", - node_uuid, str(e)) + def last_retry(self): + return self.retries >= self.max_retries + + def timeout_msg(self): + self.log( + "ERROR Retry limit %s reached for introspection " + "node %s: exceeded timeout" % ( + self.max_retries, self.node_id)) + return {"error": "Timeout error for introspection node %s: %s " + "sec exceeded max timeout of %s sec" % ( + self.node_id, self.time_elapsed(), self.timeout), + "failed": True, + "status": self.last_status + } + + def error_msg(self): + self.log( + "ERROR Retry limit %s reached for introspection " + "node %s: %s" % ( + self.max_retries, self.node_id, self.last_status.error)) + return {"error": "Error for introspection node %s: %s " % ( + self.node_id, self.last_status.error), + "failed": True, + "status": self.last_status + } def main(): @@ -273,9 +338,6 @@ def main(): supports_check_mode=False, **module_kwargs ) - - log_stream = _configure_logging() - auth_type = module.params.get('auth_type') ironic_url = module.params.get('ironic_url') if auth_type in (None, 'None'): @@ -289,15 +351,16 @@ def main(): _, cloud = openstack_cloud_from_module(module) - result = introspect( + introspector = IntrospectionManagement( cloud, - node_uuids=module.params["node_uuids"], - node_timeout=module.params["node_timeout"], - retry_timeout=module.params["retry_timeout"], - max_retries=module.params["max_retries"], - concurrency=module.params["concurrency"]) + module, + module.params["concurrency"], + module.params["max_retries"], + module.params["node_timeout"], + module.params["retry_timeout"] + ) module_results = {"changed": True} - + result = introspector.introspect(module.params["node_uuids"]) failed_nodes = [k for k, v in result.items() if v['failed']] passed_nodes = [k for k, v in result.items() if not v['failed']] failed = len(failed_nodes) @@ -317,8 +380,7 @@ def main(): "introspection_data": result if not module.params['quiet'] else {}, "failed_nodes": failed_nodes, "passed_nodes": passed_nodes, - "msg": message, - "logging": log_stream.getvalue().split('\n') + "msg": message }) module.exit_json(**module_results)