Reimplement os_tripleo_baremetal_node_introspection

Triggering introspection via the introspection API is discouraged when
also using the Ironic API, because Ironic will be out of sync with
introspection state. This is suspected to be causing occasional errors
in CI and with end users.

This module was one of the very first to be converted from Mistral, so
it hasn't benefited from the concurrency and logging patterns used in
later modules.

The internal implementation of this module has been rewritten, and now
does the following:

- Concurrency uses a ThreadPoolExecutor instead of a custom pool with
  async calls
- Waiting for state changes uses existing openstacksdk wait functions
- Logging ends up in a 'logging' result value instead of calling
  module.log which ansible does not pass back to the caller
- Introspection is triggered by setting the node provision state to
  'inspect'
- Node is powered off after a successful introspection

Closes-Bug: 1903786
Change-Id: Ic2c83c2ffb19866f7ad0dcaeb196a16f1745bbbc
This commit is contained in:
Steve Baker 2020-11-12 13:31:59 +13:00
parent 7deb7a7ed4
commit 0ed41aeab1
1 changed files with 132 additions and 194 deletions

View File

@ -1,4 +1,3 @@
#!/usr/bin/python
# Copyright (c) 2019 OpenStack Foundation
# All Rights Reserved.
#
@ -13,7 +12,18 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__metaclass__ = type
from concurrent import futures
import io
import logging
import yaml
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.openstack import openstack_cloud_from_module
from ansible.module_utils.openstack import openstack_full_argument_spec
from ansible.module_utils.openstack import openstack_module_kwargs
LOG = logging.getLogger('os_tripleo_baremetal_node_introspection')
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
@ -127,205 +137,130 @@ EXAMPLES = '''
'''
import time
import yaml
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.openstack import openstack_full_argument_spec
from ansible.module_utils.openstack import openstack_module_kwargs
from ansible.module_utils.openstack import openstack_cloud_from_module
def _configure_logging():
    """Send root-logger output to an in-memory text buffer.

    The root logger is configured through ``logging.basicConfig`` at INFO
    level with a single stream handler writing to a new ``io.StringIO``.
    The ``urllib3.connectionpool`` logger is raised to CRITICAL.

    :returns: the ``io.StringIO`` buffer the handler writes to.
    """
    buffer = io.StringIO()
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
        handlers=[logging.StreamHandler(buffer)]
    )
    # Keep connection-pool messages below CRITICAL out of the captured log.
    logging.getLogger('urllib3.connectionpool').setLevel(logging.CRITICAL)
    return buffer
# Drives introspection of a list of nodes by polling NodeIntrospection
# objects, keeping at most `concurrency` of them in a working pool at once.
class IntrospectionManagement(object):
# Stores the cloud connection, its baremetal_introspection client, the
# Ansible module (used only for logging here) and the tuning parameters.
def __init__(self,
cloud,
module,
concurrency,
max_retries,
node_timeout,
retry_timeout):
self.client = cloud.baremetal_introspection
self.cloud = cloud
self.module = module
self.concurrency = concurrency
self.max_retries = max_retries
self.node_timeout = node_timeout
self.retry_timeout = retry_timeout
# Forwards msg to module.log, prefixed with the module name.
def log(self, msg):
self.module.log("os_tripleo_baremetal_node_introspection: %s" % msg)
# Appends the next item of the `queue` iterator to `pool`; when the
# iterator is exhausted the pool is returned unchanged.
def push_next(self, pool, queue):
try:
next_introspection = next(queue)
pool.append(next_introspection)
except StopIteration:
pass
return pool
# Polls every pooled introspection until each node has an entry in `result`
# (a dict keyed by node id with 'status', 'failed' and 'error' values).
# NOTE(review): no `return result` is visible inside this span -- confirm
# against the complete file that the collected dict is returned.
def introspect(self, node_uuids):
result = {}
# Lazy generator: a NodeIntrospection is only built when it is pulled into
# the pool by push_next.
queue = (NodeIntrospection(
uuid,
self.client,
self.cloud,
self.node_timeout,
self.max_retries,
self.retry_timeout,
self.log) for uuid in node_uuids)
pool = []
# Seed the pool with up to `concurrency` entries.
for i in range(self.concurrency):
pool = self.push_next(pool, queue)
while len(pool) > 0:
finished = []
for intro in pool:
# First visit: start the introspection and look at it again on a later
# pass; a failure to start is recorded as a failed result for the node.
if not intro.started:
try:
intro.start_introspection()
continue
except Exception as e:
self.log("ERROR Node %s can't start introspection"
" because: %s" % (intro.node_id, str(e)))
result[intro.node_id] = {
"error": "Error for introspection node %s: %s " % (
intro.node_id, str(e)),
"failed": True,
"status": ''
}
finished.append(intro)
continue
status = intro.get_introspection()
# Failure path: either still running past the timeout, or finished with an
# error. The last retry records a failure; otherwise it is restarted.
if (not status.is_finished and intro.timeouted()) or (
status.is_finished and status.error is not None
):
if status.is_finished:
self.log("ERROR Introspection of node %s "
"failed: %s" % (
status.id, str(status.error))
)
if intro.last_retry():
result[status.id] = (intro.error_msg()
if status.is_finished
else intro.timeout_msg())
finished.append(intro)
else:
intro.restart_introspection()
# Success path: finished without error, so store the introspection data.
if status.is_finished and status.error is None:
result[status.id] = {
'status': intro.get_introspection_data(),
'failed': False,
'error': None}
finished.append(intro)
# Replace each finished entry with the next queued one, if any remain.
for i in finished:
pool.remove(i)
pool = self.push_next(pool, queue)
# Let's not DDOS Ironic service
if pool:
time.sleep(min(10, self.node_timeout))
def introspect(cloud, node_uuids, node_timeout, retry_timeout, max_retries,
               concurrency):
    """Introspect several nodes concurrently and collect per-node results.

    One ``introspect_node`` call is submitted to a thread pool for every
    entry in ``node_uuids``.

    :param cloud: connection object handed through to ``introspect_node``.
    :param node_uuids: iterable of node identifiers to introspect.
    :param node_timeout: passed through to ``introspect_node``.
    :param retry_timeout: passed through to ``introspect_node``.
    :param max_retries: passed through to ``introspect_node``.
    :param concurrency: maximum number of worker threads.
    :returns: dict mapping each node identifier to the value returned by
        ``introspect_node`` for that node; empty when ``node_uuids`` is
        empty.
    :raises: whatever ``introspect_node`` raises, re-raised by
        ``Future.result()``.
    """
    result = {}
    if not node_uuids:
        return result
    with futures.ThreadPoolExecutor(max_workers=concurrency) as pool:
        # Remember which node each future belongs to. Jobs complete in
        # arbitrary order, so the submission loop variable cannot be used
        # to key the results.
        jobs = {
            pool.submit(introspect_node, cloud, node_uuid, node_timeout,
                        retry_timeout, max_retries): node_uuid
            for node_uuid in node_uuids
        }
        for job in futures.as_completed(jobs):
            result[jobs[job]] = job.result()
    return result
class NodeIntrospection:
started = False
def introspect_node(cloud, node_uuid, node_timeout, retry_timeout,
max_retries):
last_error = None
attempt = 0
def __init__(self, node_id, os_client, os_cloud, timeout, max_retries,
             retry_timeout, log):
    """Record the node, the clients, the limits and the logging callable.

    The start time is taken at construction, in whole seconds; the retry
    counter starts at zero and no status has been fetched yet.
    """
    # What is being introspected, and through which clients.
    self.node_id = node_id
    self.os_client = os_client
    self.os_cloud = os_cloud
    # Limits governing timeouts and retries.
    self.timeout = timeout
    self.max_retries = max_retries
    self.retry_timeout = retry_timeout
    self.log = log
    # Mutable progress state.
    self.retries = 0
    self.last_status = None
    self.start = int(time.time())
while attempt <= max_retries:
attempt += 1
# Attempt cleanup from previous error
if last_error:
LOG.info("Preparing for retry %s for node: %s", attempt, node_uuid)
prepare_for_retry(cloud, node_uuid, node_timeout, retry_timeout)
def restart_introspection(self):
self.retries += 1
try:
self.os_client.abort_introspection(self.node_id)
LOG.info("Introspecting node: %s", node_uuid)
# Start introspection
cloud.baremetal.set_node_provision_state(
node_uuid, 'inspect', wait=True, timeout=node_timeout)
# Power off the node
cloud.baremetal.set_node_power_state(
node_uuid, 'power off', wait=True, timeout=node_timeout
)
# Wait for the node lock to be released
cloud.baremetal.wait_for_node_reservation(
node_uuid, timeout=node_timeout
)
# Get the introspection data for the result
data = cloud.baremetal_introspection.get_introspection_data(
node_uuid)
LOG.info("Introspecting node complete: %s", node_uuid)
# Success
return {
'status': data,
'failed': False,
'error': None
}
except Exception as e:
# Node is locked
self.log("ERROR Node %s can't abort introspection: %s" % (
self.node_id, str(e)))
return
# need to wait before restarting introspection till it's aborted
# to prevent hanging let's use introspect timeout for that
try:
self.os_client.wait_for_introspection(
self.node_id, timeout=self.timeout, ignore_error=True)
# Wait until node is unlocked
self.os_cloud.baremetal.wait_for_node_reservation(
self.node_id, timeout=self.retry_timeout)
except Exception as e:
self.log("ERROR Node %s can't restart introspection because can't "
"abort and unlock it: %s" % (self.node_id, str(e)))
return
self.start = int(time.time())
return self.start_introspection(restart=True)
last_error = str(e)
LOG.error("Introspection of node %s failed on attempt %s: "
"%s", node_uuid, attempt, last_error)
def start_introspection(self, restart=False):
    """Mark this introspection as started, log it, and trigger it.

    :param restart: when true, the log line reports the retry count
        instead of a first start.
    :returns: whatever ``os_client.start_introspection`` returns.
    """
    self.started = True
    if not restart:
        announcement = "INFO Starting introspection of node %s" % (
            self.node_id)
    else:
        announcement = ("INFO Restarting (try %s of %s) introspection of "
                        "node %s" % (
                            self.retries, self.max_retries, self.node_id))
    self.log(announcement)
    return self.os_client.start_introspection(self.node_id)
message = 'unknown error'
status = ''
# All attempts failed, fetch node to get the reason
try:
    node = cloud.baremetal.get_node(node_uuid)
    # The node's own last_error may be empty, e.g. when the failing step
    # was not the inspection itself. Fall back to the exception text
    # captured during the attempts rather than reporting "None".
    message = node.last_error or last_error or message
    status = node.provision_state
except Exception:
    if last_error:
        # Couldn't fetch the node, use the last exception message instead
        message = last_error
def get_introspection(self):
    """Fetch the current introspection status and cache it.

    :returns: the status object from ``os_client.get_introspection``,
        which is also stored as ``self.last_status``.
    """
    current = self.os_client.get_introspection(self.node_id)
    self.last_status = current
    return current
return {
"error": "Error for introspection node %s on attempt %s: %s " %
(node_uuid, attempt, message),
"failed": True,
"status": status
}
def get_introspection_data(self):
    """Log the successful finish, then return the introspection data.

    :returns: whatever ``os_client.get_introspection_data`` returns for
        this node.
    """
    success_note = (
        "Instrospection of node %s finished successfully!" % self.node_id)
    self.log(success_note)
    return self.os_client.get_introspection_data(self.node_id)
def time_elapsed(self):
    """Return the whole seconds elapsed since ``self.start``."""
    now = int(time.time())
    return now - self.start
def prepare_for_retry(cloud, node_uuid, node_timeout, retry_timeout):
    """Best-effort cleanup of a node before introspection is retried.

    A failing cleanup step is logged as a warning and never raised, so a
    retry can still be attempted.

    :param cloud: connection object exposing the ``baremetal`` proxy.
    :param node_uuid: identifier of the node being retried.
    :param node_timeout: timeout for the state-change calls.
    :param retry_timeout: timeout for waiting on the node lock.
    """
    # Attempt to abort any existing introspection
    # NOTE(review): confirm the SDK supports wait=True for the 'abort'
    # target; if it does not, this step always takes the warning path.
    try:
        cloud.baremetal.set_node_provision_state(
            node_uuid, 'abort', wait=True, timeout=node_timeout)
    except Exception as e:
        # Logger.warn is a deprecated alias; use warning().
        LOG.warning("Abort introspection of node %s failed: %s",
                    node_uuid, str(e))
def timeouted(self):
    """Return True once the elapsed time exceeds ``self.timeout``."""
    elapsed = self.time_elapsed()
    return elapsed > self.timeout
# Attempt to power off the node
try:
    # The power target must be the full action name 'power off', matching
    # the power-off call made after a successful introspection. A bare
    # 'off' is not a recognised target, so the node was never powered off.
    cloud.baremetal.set_node_power_state(
        node_uuid, 'power off', wait=True, timeout=node_timeout
    )
except Exception as e:
    # Logger.warn is a deprecated alias; use warning().
    LOG.warning("Power off of node %s failed: %s",
                node_uuid, str(e))
def last_retry(self):
    """Return True when the retry count has reached ``max_retries``."""
    exhausted = self.retries >= self.max_retries
    return exhausted
def timeout_msg(self):
    """Log that retries are exhausted by timeout and build the failure.

    :returns: dict with ``error`` text, ``failed`` set to True and the
        last fetched status under ``status``.
    """
    log_line = ("ERROR Retry limit %s reached for introspection "
                "node %s: exceeded timeout" % (
                    self.max_retries, self.node_id))
    self.log(log_line)
    error_text = ("Timeout error for introspection node %s: %s "
                  "sec exceeded max timeout of %s sec" % (
                      self.node_id, self.time_elapsed(), self.timeout))
    failure = {"error": error_text, "failed": True}
    failure["status"] = self.last_status
    return failure
def error_msg(self):
    """Log that retries are exhausted by an error and build the failure.

    :returns: dict with ``error`` text taken from the last status,
        ``failed`` set to True and the last status under ``status``.
    """
    log_line = ("ERROR Retry limit %s reached for introspection "
                "node %s: %s" % (
                    self.max_retries, self.node_id, self.last_status.error))
    self.log(log_line)
    error_text = "Error for introspection node %s: %s " % (
        self.node_id, self.last_status.error)
    failure = {"error": error_text, "failed": True}
    failure["status"] = self.last_status
    return failure
# Wait until node is unlocked
try:
    cloud.baremetal.wait_for_node_reservation(
        node_uuid, timeout=retry_timeout)
except Exception as e:
    # Best effort only: a retry is still attempted if the lock wait fails.
    # Logger.warn is a deprecated alias; use warning().
    LOG.warning("Waiting for node unlock %s failed: %s",
                node_uuid, str(e))
def main():
@ -338,6 +273,9 @@ def main():
supports_check_mode=False,
**module_kwargs
)
log_stream = _configure_logging()
auth_type = module.params.get('auth_type')
ironic_url = module.params.get('ironic_url')
if auth_type in (None, 'None'):
@ -351,16 +289,15 @@ def main():
_, cloud = openstack_cloud_from_module(module)
introspector = IntrospectionManagement(
result = introspect(
cloud,
module,
module.params["concurrency"],
module.params["max_retries"],
module.params["node_timeout"],
module.params["retry_timeout"]
)
node_uuids=module.params["node_uuids"],
node_timeout=module.params["node_timeout"],
retry_timeout=module.params["retry_timeout"],
max_retries=module.params["max_retries"],
concurrency=module.params["concurrency"])
module_results = {"changed": True}
result = introspector.introspect(module.params["node_uuids"])
failed_nodes = [k for k, v in result.items() if v['failed']]
passed_nodes = [k for k, v in result.items() if not v['failed']]
failed = len(failed_nodes)
@ -380,7 +317,8 @@ def main():
"introspection_data": result if not module.params['quiet'] else {},
"failed_nodes": failed_nodes,
"passed_nodes": passed_nodes,
"msg": message
"msg": message,
"logging": log_stream.getvalue().split('\n')
})
module.exit_json(**module_results)