Split deployment-related functions from manager.py into a new module

With more than 4000 lines of code (more than 8000 of unit tests) the
current manager.py is barely manageable. In preparation for the new
deployment API, this change moves the free-standing deploy-related
functions to utils.py and the new deployments.py.

Change-Id: Ic3f369a7fa72d09263b0670ae78980913b024962
Story: #2006910
This commit is contained in:
Dmitry Tantsur 2020-02-06 13:03:57 +01:00
parent 990c17e356
commit d949318409
6 changed files with 1605 additions and 1513 deletions

View File

@ -0,0 +1,402 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Functionality related to deploying and undeploying."""
import tempfile
from ironic_lib import metrics_utils
from oslo_db import exception as db_exception
from oslo_log import log
from oslo_utils import excutils
from oslo_utils import versionutils
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import release_mappings as versions
from ironic.common import states
from ironic.common import swift
from ironic.conductor import notification_utils as notify_utils
from ironic.conductor import steps as conductor_steps
from ironic.conductor import task_manager
from ironic.conductor import utils
from ironic.conf import CONF
from ironic.objects import fields
LOG = log.getLogger(__name__)
METRICS = metrics_utils.get_metrics_logger(__name__)
# NOTE(rloo) This is used to keep track of deprecation warnings that have
# already been issued for deploy drivers that do not use deploy steps.
_SEEN_NO_DEPLOY_STEP_DEPRECATIONS = set()
@METRICS.timer('do_node_deploy')
@task_manager.require_exclusive_lock
def do_node_deploy(task, conductor_id=None, configdrive=None):
"""Prepare the environment and deploy a node."""
node = task.node
try:
if configdrive:
if isinstance(configdrive, dict):
configdrive = utils.build_configdrive(node, configdrive)
_store_configdrive(node, configdrive)
except (exception.SwiftOperationError, exception.ConfigInvalid) as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Error while uploading the configdrive for %(node)s '
'to Swift') % {'node': node.uuid},
_('Failed to upload the configdrive to Swift. '
'Error: %s') % e,
clean_up=False)
except db_exception.DBDataError as e:
with excutils.save_and_reraise_exception():
# NOTE(hshiina): This error happens when the configdrive is
# too large. Remove the configdrive from the
# object to update DB successfully in handling
# the failure.
node.obj_reset_changes()
utils.deploying_error_handler(
task,
('Error while storing the configdrive for %(node)s into '
'the database: %(err)s') % {'node': node.uuid, 'err': e},
_("Failed to store the configdrive in the database. "
"%s") % e,
clean_up=False)
except Exception as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Unexpected error while preparing the configdrive for '
'node %(node)s') % {'node': node.uuid},
_("Failed to prepare the configdrive. Exception: %s") % e,
traceback=True, clean_up=False)
try:
task.driver.deploy.prepare(task)
except exception.IronicException as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Error while preparing to deploy to node %(node)s: '
'%(err)s') % {'node': node.uuid, 'err': e},
_("Failed to prepare to deploy: %s") % e,
clean_up=False)
except Exception as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Unexpected error while preparing to deploy to node '
'%(node)s') % {'node': node.uuid},
_("Failed to prepare to deploy. Exception: %s") % e,
traceback=True, clean_up=False)
try:
# This gets the deploy steps (if any) and puts them in the node's
# driver_internal_info['deploy_steps'].
conductor_steps.set_node_deployment_steps(task)
except exception.InstanceDeployFailure as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
'Error while getting deploy steps; cannot deploy to node '
'%(node)s. Error: %(err)s' % {'node': node.uuid, 'err': e},
_("Cannot get deploy steps; failed to deploy: %s") % e)
steps = node.driver_internal_info.get('deploy_steps', [])
new_rpc_version = True
release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
if release_ver:
new_rpc_version = versionutils.is_compatible('1.45',
release_ver['rpc'])
if not steps or not new_rpc_version:
# TODO(rloo): This if.. (and the above code wrt rpc version)
# can be deleted after the deprecation period when we no
# longer support drivers with no deploy steps.
# Note that after the deprecation period, there needs to be at least
# one deploy step. If none, the deployment fails.
if steps:
info = node.driver_internal_info
info.pop('deploy_steps')
node.driver_internal_info = info
node.save()
# We go back to using the old way, if:
# - out-of-tree driver hasn't yet converted to using deploy steps, or
# - we're in the middle of a rolling upgrade. This is to prevent the
# corner case of having new conductors with old conductors, and
# a node is deployed with a new conductor (via deploy steps), but
# after the deploy_wait, the node gets handled by an old conductor.
# To avoid this, we need to wait until all the conductors are new,
# signalled by the RPC API version being '1.45'.
_old_rest_of_do_node_deploy(task, conductor_id, not steps)
else:
do_next_deploy_step(task, 0, conductor_id)
def _old_rest_of_do_node_deploy(task, conductor_id, no_deploy_steps):
"""The rest of the do_node_deploy() if not using deploy steps.
To support out-of-tree drivers that have not yet migrated to using
deploy steps.
:param no_deploy_steps: Boolean; True if there are no deploy steps.
"""
# TODO(rloo): This method can be deleted after the deprecation period
# for supporting drivers with no deploy steps.
if no_deploy_steps:
deploy_driver_name = task.driver.deploy.__class__.__name__
if deploy_driver_name not in _SEEN_NO_DEPLOY_STEP_DEPRECATIONS:
LOG.warning('Deploy driver %s does not support deploy steps; this '
'will be required starting with the Stein release.',
deploy_driver_name)
_SEEN_NO_DEPLOY_STEP_DEPRECATIONS.add(deploy_driver_name)
node = task.node
try:
new_state = task.driver.deploy.deploy(task)
except exception.IronicException as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Error in deploy of node %(node)s: %(err)s' %
{'node': node.uuid, 'err': e}),
_("Failed to deploy: %s") % e)
except Exception as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Unexpected error while deploying node %(node)s' %
{'node': node.uuid}),
_("Failed to deploy. Exception: %s") % e,
traceback=True)
# Update conductor_affinity to reference this conductor's ID
# since there may be local persistent state
node.conductor_affinity = conductor_id
# NOTE(deva): Some drivers may return states.DEPLOYWAIT
# eg. if they are waiting for a callback
if new_state == states.DEPLOYDONE:
_start_console_in_deploy(task)
task.process_event('done')
LOG.info('Successfully deployed node %(node)s with '
'instance %(instance)s.',
{'node': node.uuid, 'instance': node.instance_uuid})
elif new_state == states.DEPLOYWAIT:
task.process_event('wait')
else:
LOG.error('Unexpected state %(state)s returned while '
'deploying node %(node)s.',
{'state': new_state, 'node': node.uuid})
node.save()
@task_manager.require_exclusive_lock
def do_next_deploy_step(task, step_index, conductor_id):
"""Do deployment, starting from the specified deploy step.
:param task: a TaskManager instance with an exclusive lock
:param step_index: The first deploy step in the list to execute. This
is the index (from 0) into the list of deploy steps in the node's
driver_internal_info['deploy_steps']. Is None if there are no steps
to execute.
"""
node = task.node
if step_index is None:
steps = []
else:
steps = node.driver_internal_info['deploy_steps'][step_index:]
LOG.info('Executing %(state)s on node %(node)s, remaining steps: '
'%(steps)s', {'node': node.uuid, 'steps': steps,
'state': node.provision_state})
# Execute each step until we hit an async step or run out of steps
for ind, step in enumerate(steps):
# Save which step we're about to start so we can restart
# if necessary
node.deploy_step = step
driver_internal_info = node.driver_internal_info
driver_internal_info['deploy_step_index'] = step_index + ind
node.driver_internal_info = driver_internal_info
node.save()
interface = getattr(task.driver, step.get('interface'))
LOG.info('Executing %(step)s on node %(node)s',
{'step': step, 'node': node.uuid})
try:
result = interface.execute_deploy_step(task, step)
except exception.IronicException as e:
if isinstance(e, exception.AgentConnectionFailed):
if task.node.driver_internal_info.get('deployment_reboot'):
LOG.info('Agent is not yet running on node %(node)s after '
'deployment reboot, waiting for agent to come up '
'to run next deploy step %(step)s.',
{'node': node.uuid, 'step': step})
driver_internal_info['skip_current_deploy_step'] = False
node.driver_internal_info = driver_internal_info
task.process_event('wait')
return
log_msg = ('Node %(node)s failed deploy step %(step)s. Error: '
'%(err)s' %
{'node': node.uuid, 'step': node.deploy_step, 'err': e})
utils.deploying_error_handler(
task, log_msg,
_("Failed to deploy: %s") % node.deploy_step)
return
except Exception as e:
log_msg = ('Node %(node)s failed deploy step %(step)s with '
'unexpected error: %(err)s' %
{'node': node.uuid, 'step': node.deploy_step, 'err': e})
utils.deploying_error_handler(
task, log_msg,
_("Failed to deploy. Exception: %s") % e, traceback=True)
return
if ind == 0:
# We've done the very first deploy step.
# Update conductor_affinity to reference this conductor's ID
# since there may be local persistent state
node.conductor_affinity = conductor_id
node.save()
# Check if the step is done or not. The step should return
# states.DEPLOYWAIT if the step is still being executed, or
# None if the step is done.
# NOTE(deva): Some drivers may return states.DEPLOYWAIT
# eg. if they are waiting for a callback
if result == states.DEPLOYWAIT:
# Kill this worker, the async step will make an RPC call to
# continue_node_deploy() to continue deploying
LOG.info('Deploy step %(step)s on node %(node)s being '
'executed asynchronously, waiting for driver.',
{'node': node.uuid, 'step': step})
task.process_event('wait')
return
elif result is not None:
# NOTE(rloo): This is an internal/dev error; shouldn't happen.
log_msg = (_('While executing deploy step %(step)s on node '
'%(node)s, step returned unexpected state: %(val)s')
% {'step': step, 'node': node.uuid, 'val': result})
utils.deploying_error_handler(
task, log_msg,
_("Failed to deploy: %s") % node.deploy_step)
return
LOG.info('Node %(node)s finished deploy step %(step)s',
{'node': node.uuid, 'step': step})
# Finished executing the steps. Clear deploy_step.
node.deploy_step = None
driver_internal_info = node.driver_internal_info
driver_internal_info['deploy_steps'] = None
driver_internal_info.pop('deploy_step_index', None)
driver_internal_info.pop('deployment_reboot', None)
driver_internal_info.pop('deployment_polling', None)
# Remove the agent_url cached from the deployment.
driver_internal_info.pop('agent_url', None)
node.driver_internal_info = driver_internal_info
node.save()
_start_console_in_deploy(task)
task.process_event('done')
LOG.info('Successfully deployed node %(node)s with '
'instance %(instance)s.',
{'node': node.uuid, 'instance': node.instance_uuid})
def _get_configdrive_obj_name(node):
"""Generate the object name for the config drive."""
return 'configdrive-%s' % node.uuid
def _store_configdrive(node, configdrive):
"""Handle the storage of the config drive.
If configured, the config drive data are uploaded to a swift endpoint.
The Node's instance_info is updated to include either the temporary
Swift URL from the upload, or if no upload, the actual config drive data.
:param node: an Ironic node object.
:param configdrive: A gzipped and base64 encoded configdrive.
:raises: SwiftOperationError if an error occur when uploading the
config drive to the swift endpoint.
:raises: ConfigInvalid if required keystone authorization credentials
with swift are missing.
"""
if CONF.deploy.configdrive_use_object_store:
# NOTE(lucasagomes): No reason to use a different timeout than
# the one used for deploying the node
timeout = (CONF.conductor.configdrive_swift_temp_url_duration
or CONF.conductor.deploy_callback_timeout
# The documented default in ironic.conf.conductor
or 1800)
container = CONF.conductor.configdrive_swift_container
object_name = _get_configdrive_obj_name(node)
object_headers = {'X-Delete-After': str(timeout)}
with tempfile.NamedTemporaryFile(dir=CONF.tempdir) as fileobj:
fileobj.write(configdrive)
fileobj.flush()
swift_api = swift.SwiftAPI()
swift_api.create_object(container, object_name, fileobj.name,
object_headers=object_headers)
configdrive = swift_api.get_temp_url(container, object_name,
timeout)
i_info = node.instance_info
i_info['configdrive'] = configdrive
node.instance_info = i_info
node.save()
def _start_console_in_deploy(task):
"""Start console at the end of deployment.
Console is stopped at tearing down not to be exposed to an instance user.
Then, restart at deployment.
:param task: a TaskManager instance with an exclusive lock
"""
if not task.node.console_enabled:
return
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.START)
try:
task.driver.console.start_console(task)
except Exception as err:
msg = (_('Failed to start console while deploying the '
'node %(node)s: %(err)s.') % {'node': task.node.uuid,
'err': err})
LOG.error(msg)
task.node.last_error = msg
task.node.console_enabled = False
task.node.save()
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.ERROR)
else:
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.END)

View File

@ -43,18 +43,15 @@ notifying Neutron of a change, etc.
import collections
import datetime
import queue
import tempfile
import eventlet
from futurist import periodics
from futurist import waiters
from ironic_lib import metrics_utils
from oslo_db import exception as db_exception
from oslo_log import log
import oslo_messaging as messaging
from oslo_utils import excutils
from oslo_utils import uuidutils
from oslo_utils import versionutils
from ironic.common import driver_factory
from ironic.common import exception
@ -64,11 +61,10 @@ from ironic.common.i18n import _
from ironic.common import images
from ironic.common import network
from ironic.common import nova
from ironic.common import release_mappings as versions
from ironic.common import states
from ironic.common import swift
from ironic.conductor import allocations
from ironic.conductor import base_manager
from ironic.conductor import deployments
from ironic.conductor import notification_utils as notify_utils
from ironic.conductor import steps as conductor_steps
from ironic.conductor import task_manager
@ -87,10 +83,6 @@ METRICS = metrics_utils.get_metrics_logger(__name__)
SYNC_EXCLUDED_STATES = (states.DEPLOYWAIT, states.CLEANWAIT, states.ENROLL)
# NOTE(rloo) This is used to keep track of deprecation warnings that have
# already been issued for deploy drivers that do not use deploy steps.
_SEEN_NO_DEPLOY_STEP_DEPRECATIONS = set()
class ConductorManager(base_manager.BaseConductorManager):
"""Ironic Conductor manager main class."""
@ -892,18 +884,14 @@ class ConductorManager(base_manager.BaseConductorManager):
task.process_event(
event,
callback=self._spawn_worker,
call_args=(do_node_deploy, task, self.conductor.id,
configdrive),
call_args=(deployments.do_node_deploy, task,
self.conductor.id, configdrive),
err_handler=utils.provisioning_error_handler)
except exception.InvalidState:
raise exception.InvalidStateRequested(
action=event, node=task.node.uuid,
state=task.node.provision_state)
def _get_node_next_deploy_steps(self, task, skip_current_step=True):
return self._get_node_next_steps(task, 'deploy',
skip_current_step=skip_current_step)
@METRICS.timer('ConductorManager.continue_node_deploy')
def continue_node_deploy(self, context, node_id):
"""RPC method to continue deploying a node.
@ -954,7 +942,7 @@ class ConductorManager(base_manager.BaseConductorManager):
node.driver_internal_info = info
node.save()
next_step_index = self._get_node_next_deploy_steps(
next_step_index = utils.get_node_next_deploy_steps(
task, skip_current_step=skip_current_step)
# TODO(rloo): When deprecation period is over and node is in
@ -966,7 +954,7 @@ class ConductorManager(base_manager.BaseConductorManager):
task.node)
task.spawn_after(
self._spawn_worker,
_do_next_deploy_step,
deployments.do_next_deploy_step,
task, next_step_index, self.conductor.id)
@METRICS.timer('ConductorManager.do_node_tear_down')
@ -1100,62 +1088,6 @@ class ConductorManager(base_manager.BaseConductorManager):
task.process_event('clean')
self._do_node_clean(task)
def _get_node_next_steps(self, task, step_type,
skip_current_step=True):
"""Get the task's node's next steps.
This determines what the next (remaining) steps are, and
returns the index into the steps list that corresponds to the
next step. The remaining steps are determined as follows:
* If no steps have been started yet, all the steps
must be executed
* If skip_current_step is False, the remaining steps start
with the current step. Otherwise, the remaining steps
start with the step after the current one.
All the steps are in node.driver_internal_info['<step_type>_steps'].
node.<step_type>_step is the current step that was just executed
(or None, {} if no steps have been executed yet).
node.driver_internal_info['<step_type>_step_index'] is the index
index into the steps list (or None, doesn't exist if no steps have
been executed yet) and corresponds to node.<step_type>_step.
:param task: A TaskManager object
:param step_type: The type of steps to process: 'clean' or 'deploy'.
:param skip_current_step: True to skip the current step; False to
include it.
:returns: index of the next step; None if there are none to execute.
"""
valid_types = set(['clean', 'deploy'])
if step_type not in valid_types:
# NOTE(rloo): No need to i18n this, since this would be a
# developer error; it isn't user-facing.
raise exception.Invalid(
'step_type must be one of %(valid)s, not %(step)s'
% {'valid': valid_types, 'step': step_type})
node = task.node
if not getattr(node, '%s_step' % step_type):
# first time through, all steps need to be done. Return the
# index of the first step in the list.
return 0
ind = node.driver_internal_info.get('%s_step_index' % step_type)
if ind is None:
return None
if skip_current_step:
ind += 1
if ind >= len(node.driver_internal_info['%s_steps' % step_type]):
# no steps left to do
ind = None
return ind
def _get_node_next_clean_steps(self, task, skip_current_step=True):
return self._get_node_next_steps(task, 'clean',
skip_current_step=skip_current_step)
@METRICS.timer('ConductorManager.do_node_clean')
@messaging.expected_exceptions(exception.InvalidParameterValue,
exception.InvalidStateRequested,
@ -1276,7 +1208,7 @@ class ConductorManager(base_manager.BaseConductorManager):
node.driver_internal_info = info
node.save()
next_step_index = self._get_node_next_clean_steps(
next_step_index = utils.get_node_next_clean_steps(
task, skip_current_step=skip_current_step)
# If this isn't the final clean step in the cleaning operation
@ -3680,367 +3612,6 @@ def get_vendor_passthru_metadata(route_dict):
return d
def _get_configdrive_obj_name(node):
"""Generate the object name for the config drive."""
return 'configdrive-%s' % node.uuid
def _store_configdrive(node, configdrive):
"""Handle the storage of the config drive.
If configured, the config drive data are uploaded to a swift endpoint.
The Node's instance_info is updated to include either the temporary
Swift URL from the upload, or if no upload, the actual config drive data.
:param node: an Ironic node object.
:param configdrive: A gzipped and base64 encoded configdrive.
:raises: SwiftOperationError if an error occur when uploading the
config drive to the swift endpoint.
:raises: ConfigInvalid if required keystone authorization credentials
with swift are missing.
"""
if CONF.deploy.configdrive_use_object_store:
# NOTE(lucasagomes): No reason to use a different timeout than
# the one used for deploying the node
timeout = (CONF.conductor.configdrive_swift_temp_url_duration
or CONF.conductor.deploy_callback_timeout
# The documented default in ironic.conf.conductor
or 1800)
container = CONF.conductor.configdrive_swift_container
object_name = _get_configdrive_obj_name(node)
object_headers = {'X-Delete-After': str(timeout)}
with tempfile.NamedTemporaryFile(dir=CONF.tempdir) as fileobj:
fileobj.write(configdrive)
fileobj.flush()
swift_api = swift.SwiftAPI()
swift_api.create_object(container, object_name, fileobj.name,
object_headers=object_headers)
configdrive = swift_api.get_temp_url(container, object_name,
timeout)
i_info = node.instance_info
i_info['configdrive'] = configdrive
node.instance_info = i_info
node.save()
@METRICS.timer('do_node_deploy')
@task_manager.require_exclusive_lock
def do_node_deploy(task, conductor_id=None, configdrive=None):
"""Prepare the environment and deploy a node."""
node = task.node
try:
if configdrive:
if isinstance(configdrive, dict):
configdrive = utils.build_configdrive(node, configdrive)
_store_configdrive(node, configdrive)
except (exception.SwiftOperationError, exception.ConfigInvalid) as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Error while uploading the configdrive for %(node)s '
'to Swift') % {'node': node.uuid},
_('Failed to upload the configdrive to Swift. '
'Error: %s') % e,
clean_up=False)
except db_exception.DBDataError as e:
with excutils.save_and_reraise_exception():
# NOTE(hshiina): This error happens when the configdrive is
# too large. Remove the configdrive from the
# object to update DB successfully in handling
# the failure.
node.obj_reset_changes()
utils.deploying_error_handler(
task,
('Error while storing the configdrive for %(node)s into '
'the database: %(err)s') % {'node': node.uuid, 'err': e},
_("Failed to store the configdrive in the database. "
"%s") % e,
clean_up=False)
except Exception as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Unexpected error while preparing the configdrive for '
'node %(node)s') % {'node': node.uuid},
_("Failed to prepare the configdrive. Exception: %s") % e,
traceback=True, clean_up=False)
try:
task.driver.deploy.prepare(task)
except exception.IronicException as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Error while preparing to deploy to node %(node)s: '
'%(err)s') % {'node': node.uuid, 'err': e},
_("Failed to prepare to deploy: %s") % e,
clean_up=False)
except Exception as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Unexpected error while preparing to deploy to node '
'%(node)s') % {'node': node.uuid},
_("Failed to prepare to deploy. Exception: %s") % e,
traceback=True, clean_up=False)
try:
# This gets the deploy steps (if any) and puts them in the node's
# driver_internal_info['deploy_steps'].
conductor_steps.set_node_deployment_steps(task)
except exception.InstanceDeployFailure as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
'Error while getting deploy steps; cannot deploy to node '
'%(node)s. Error: %(err)s' % {'node': node.uuid, 'err': e},
_("Cannot get deploy steps; failed to deploy: %s") % e)
steps = node.driver_internal_info.get('deploy_steps', [])
new_rpc_version = True
release_ver = versions.RELEASE_MAPPING.get(CONF.pin_release_version)
if release_ver:
new_rpc_version = versionutils.is_compatible('1.45',
release_ver['rpc'])
if not steps or not new_rpc_version:
# TODO(rloo): This if.. (and the above code wrt rpc version)
# can be deleted after the deprecation period when we no
# longer support drivers with no deploy steps.
# Note that after the deprecation period, there needs to be at least
# one deploy step. If none, the deployment fails.
if steps:
info = node.driver_internal_info
info.pop('deploy_steps')
node.driver_internal_info = info
node.save()
# We go back to using the old way, if:
# - out-of-tree driver hasn't yet converted to using deploy steps, or
# - we're in the middle of a rolling upgrade. This is to prevent the
# corner case of having new conductors with old conductors, and
# a node is deployed with a new conductor (via deploy steps), but
# after the deploy_wait, the node gets handled by an old conductor.
# To avoid this, we need to wait until all the conductors are new,
# signalled by the RPC API version being '1.45'.
_old_rest_of_do_node_deploy(task, conductor_id, not steps)
else:
_do_next_deploy_step(task, 0, conductor_id)
def _old_rest_of_do_node_deploy(task, conductor_id, no_deploy_steps):
"""The rest of the do_node_deploy() if not using deploy steps.
To support out-of-tree drivers that have not yet migrated to using
deploy steps.
:param no_deploy_steps: Boolean; True if there are no deploy steps.
"""
# TODO(rloo): This method can be deleted after the deprecation period
# for supporting drivers with no deploy steps.
if no_deploy_steps:
deploy_driver_name = task.driver.deploy.__class__.__name__
if deploy_driver_name not in _SEEN_NO_DEPLOY_STEP_DEPRECATIONS:
LOG.warning('Deploy driver %s does not support deploy steps; this '
'will be required starting with the Stein release.',
deploy_driver_name)
_SEEN_NO_DEPLOY_STEP_DEPRECATIONS.add(deploy_driver_name)
node = task.node
try:
new_state = task.driver.deploy.deploy(task)
except exception.IronicException as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Error in deploy of node %(node)s: %(err)s' %
{'node': node.uuid, 'err': e}),
_("Failed to deploy: %s") % e)
except Exception as e:
with excutils.save_and_reraise_exception():
utils.deploying_error_handler(
task,
('Unexpected error while deploying node %(node)s' %
{'node': node.uuid}),
_("Failed to deploy. Exception: %s") % e,
traceback=True)
# Update conductor_affinity to reference this conductor's ID
# since there may be local persistent state
node.conductor_affinity = conductor_id
# NOTE(deva): Some drivers may return states.DEPLOYWAIT
# eg. if they are waiting for a callback
if new_state == states.DEPLOYDONE:
_start_console_in_deploy(task)
task.process_event('done')
LOG.info('Successfully deployed node %(node)s with '
'instance %(instance)s.',
{'node': node.uuid, 'instance': node.instance_uuid})
elif new_state == states.DEPLOYWAIT:
task.process_event('wait')
else:
LOG.error('Unexpected state %(state)s returned while '
'deploying node %(node)s.',
{'state': new_state, 'node': node.uuid})
node.save()
@task_manager.require_exclusive_lock
def _do_next_deploy_step(task, step_index, conductor_id):
"""Do deployment, starting from the specified deploy step.
:param task: a TaskManager instance with an exclusive lock
:param step_index: The first deploy step in the list to execute. This
is the index (from 0) into the list of deploy steps in the node's
driver_internal_info['deploy_steps']. Is None if there are no steps
to execute.
"""
node = task.node
if step_index is None:
steps = []
else:
steps = node.driver_internal_info['deploy_steps'][step_index:]
LOG.info('Executing %(state)s on node %(node)s, remaining steps: '
'%(steps)s', {'node': node.uuid, 'steps': steps,
'state': node.provision_state})
# Execute each step until we hit an async step or run out of steps
for ind, step in enumerate(steps):
# Save which step we're about to start so we can restart
# if necessary
node.deploy_step = step
driver_internal_info = node.driver_internal_info
driver_internal_info['deploy_step_index'] = step_index + ind
node.driver_internal_info = driver_internal_info
node.save()
interface = getattr(task.driver, step.get('interface'))
LOG.info('Executing %(step)s on node %(node)s',
{'step': step, 'node': node.uuid})
try:
result = interface.execute_deploy_step(task, step)
except exception.IronicException as e:
if isinstance(e, exception.AgentConnectionFailed):
if task.node.driver_internal_info.get('deployment_reboot'):
LOG.info('Agent is not yet running on node %(node)s after '
'deployment reboot, waiting for agent to come up '
'to run next deploy step %(step)s.',
{'node': node.uuid, 'step': step})
driver_internal_info['skip_current_deploy_step'] = False
node.driver_internal_info = driver_internal_info
task.process_event('wait')
return
log_msg = ('Node %(node)s failed deploy step %(step)s. Error: '
'%(err)s' %
{'node': node.uuid, 'step': node.deploy_step, 'err': e})
utils.deploying_error_handler(
task, log_msg,
_("Failed to deploy: %s") % node.deploy_step)
return
except Exception as e:
log_msg = ('Node %(node)s failed deploy step %(step)s with '
'unexpected error: %(err)s' %
{'node': node.uuid, 'step': node.deploy_step, 'err': e})
utils.deploying_error_handler(
task, log_msg,
_("Failed to deploy. Exception: %s") % e, traceback=True)
return
if ind == 0:
# We've done the very first deploy step.
# Update conductor_affinity to reference this conductor's ID
# since there may be local persistent state
node.conductor_affinity = conductor_id
node.save()
# Check if the step is done or not. The step should return
# states.DEPLOYWAIT if the step is still being executed, or
# None if the step is done.
# NOTE(deva): Some drivers may return states.DEPLOYWAIT
# eg. if they are waiting for a callback
if result == states.DEPLOYWAIT:
# Kill this worker, the async step will make an RPC call to
# continue_node_deploy() to continue deploying
LOG.info('Deploy step %(step)s on node %(node)s being '
'executed asynchronously, waiting for driver.',
{'node': node.uuid, 'step': step})
task.process_event('wait')
return
elif result is not None:
# NOTE(rloo): This is an internal/dev error; shouldn't happen.
log_msg = (_('While executing deploy step %(step)s on node '
'%(node)s, step returned unexpected state: %(val)s')
% {'step': step, 'node': node.uuid, 'val': result})
utils.deploying_error_handler(
task, log_msg,
_("Failed to deploy: %s") % node.deploy_step)
return
LOG.info('Node %(node)s finished deploy step %(step)s',
{'node': node.uuid, 'step': step})
# Finished executing the steps. Clear deploy_step.
node.deploy_step = None
driver_internal_info = node.driver_internal_info
driver_internal_info['deploy_steps'] = None
driver_internal_info.pop('deploy_step_index', None)
driver_internal_info.pop('deployment_reboot', None)
driver_internal_info.pop('deployment_polling', None)
# Remove the agent_url cached from the deployment.
driver_internal_info.pop('agent_url', None)
node.driver_internal_info = driver_internal_info
node.save()
_start_console_in_deploy(task)
task.process_event('done')
LOG.info('Successfully deployed node %(node)s with '
'instance %(instance)s.',
{'node': node.uuid, 'instance': node.instance_uuid})
def _start_console_in_deploy(task):
"""Start console at the end of deployment.
Console is stopped at tearing down not to be exposed to an instance user.
Then, restart at deployment.
:param task: a TaskManager instance with an exclusive lock
"""
if not task.node.console_enabled:
return
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.START)
try:
task.driver.console.start_console(task)
except Exception as err:
msg = (_('Failed to start console while deploying the '
'node %(node)s: %(err)s.') % {'node': task.node.uuid,
'err': err})
LOG.error(msg)
task.node.last_error = msg
task.node.console_enabled = False
task.node.save()
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.ERROR)
else:
notify_utils.emit_console_notification(
task, 'console_restore', fields.NotificationStatus.END)
@task_manager.require_exclusive_lock
def handle_sync_power_state_max_retries_exceeded(task, actual_power_state,
exception=None):

View File

@ -943,3 +943,65 @@ def remove_agent_url(node):
info = node.driver_internal_info
info.pop('agent_url', None)
node.driver_internal_info = info
def _get_node_next_steps(task, step_type, skip_current_step=True):
"""Get the task's node's next steps.
This determines what the next (remaining) steps are, and
returns the index into the steps list that corresponds to the
next step. The remaining steps are determined as follows:
* If no steps have been started yet, all the steps
must be executed
* If skip_current_step is False, the remaining steps start
with the current step. Otherwise, the remaining steps
start with the step after the current one.
All the steps are in node.driver_internal_info['<step_type>_steps'].
node.<step_type>_step is the current step that was just executed
(or None, {} if no steps have been executed yet).
node.driver_internal_info['<step_type>_step_index'] is the index
index into the steps list (or None, doesn't exist if no steps have
been executed yet) and corresponds to node.<step_type>_step.
:param task: A TaskManager object
:param step_type: The type of steps to process: 'clean' or 'deploy'.
:param skip_current_step: True to skip the current step; False to
include it.
:returns: index of the next step; None if there are none to execute.
"""
valid_types = set(['clean', 'deploy'])
if step_type not in valid_types:
# NOTE(rloo): No need to i18n this, since this would be a
# developer error; it isn't user-facing.
raise exception.Invalid(
'step_type must be one of %(valid)s, not %(step)s'
% {'valid': valid_types, 'step': step_type})
node = task.node
if not getattr(node, '%s_step' % step_type):
# first time through, all steps need to be done. Return the
# index of the first step in the list.
return 0
ind = node.driver_internal_info.get('%s_step_index' % step_type)
if ind is None:
return None
if skip_current_step:
ind += 1
if ind >= len(node.driver_internal_info['%s_steps' % step_type]):
# no steps left to do
ind = None
return ind
def get_node_next_clean_steps(task, skip_current_step=True):
return _get_node_next_steps(task, 'clean',
skip_current_step=skip_current_step)
def get_node_next_deploy_steps(task, skip_current_step=True):
return _get_node_next_steps(task, 'deploy',
skip_current_step=skip_current_step)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1915,3 +1915,106 @@ class FastTrackTestCase(db_base.DbTestCase):
with task_manager.acquire(
self.context, self.node.uuid, shared=False) as task:
self.assertFalse(conductor_utils.is_fast_track(task))
class GetNodeNextStepsTestCase(db_base.DbTestCase):
def setUp(self):
super(GetNodeNextStepsTestCase, self).setUp()
self.power_update = {
'step': 'update_firmware', 'priority': 10, 'interface': 'power'}
self.deploy_update = {
'step': 'update_firmware', 'priority': 10, 'interface': 'deploy'}
self.deploy_erase = {
'step': 'erase_disks', 'priority': 20, 'interface': 'deploy'}
# Automated cleaning should be executed in this order
self.clean_steps = [self.deploy_erase, self.power_update,
self.deploy_update]
self.deploy_start = {
'step': 'deploy_start', 'priority': 50, 'interface': 'deploy'}
self.deploy_end = {
'step': 'deploy_end', 'priority': 20, 'interface': 'deploy'}
self.deploy_steps = [self.deploy_start, self.deploy_end]
def _test_get_node_next_deploy_steps(self, skip=True):
driver_internal_info = {'deploy_steps': self.deploy_steps,
'deploy_step_index': 0}
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYWAIT,
target_provision_state=states.ACTIVE,
driver_internal_info=driver_internal_info,
last_error=None,
deploy_step=self.deploy_steps[0])
with task_manager.acquire(self.context, node.uuid) as task:
step_index = conductor_utils.get_node_next_deploy_steps(
task, skip_current_step=skip)
expected_index = 1 if skip else 0
self.assertEqual(expected_index, step_index)
def test_get_node_next_deploy_steps(self):
self._test_get_node_next_deploy_steps()
def test_get_node_next_deploy_steps_no_skip(self):
self._test_get_node_next_deploy_steps(skip=False)
def test_get_node_next_deploy_steps_unset_deploy_step(self):
driver_internal_info = {'deploy_steps': self.deploy_steps,
'deploy_step_index': None}
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.DEPLOYWAIT,
target_provision_state=states.ACTIVE,
driver_internal_info=driver_internal_info,
last_error=None,
deploy_step=None)
with task_manager.acquire(self.context, node.uuid) as task:
step_index = conductor_utils.get_node_next_deploy_steps(task)
self.assertEqual(0, step_index)
def test_get_node_next_steps_exception(self):
node = obj_utils.create_test_node(self.context)
with task_manager.acquire(self.context, node.uuid) as task:
self.assertRaises(exception.Invalid,
conductor_utils._get_node_next_steps,
task, 'foo')
def _test_get_node_next_clean_steps(self, skip=True):
driver_internal_info = {'clean_steps': self.clean_steps,
'clean_step_index': 0}
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.CLEANWAIT,
target_provision_state=states.AVAILABLE,
driver_internal_info=driver_internal_info,
last_error=None,
clean_step=self.clean_steps[0])
with task_manager.acquire(self.context, node.uuid) as task:
step_index = conductor_utils.get_node_next_clean_steps(
task, skip_current_step=skip)
expected_index = 1 if skip else 0
self.assertEqual(expected_index, step_index)
def test_get_node_next_clean_steps(self):
self._test_get_node_next_clean_steps()
def test_get_node_next_clean_steps_no_skip(self):
self._test_get_node_next_clean_steps(skip=False)
def test_get_node_next_clean_steps_unset_clean_step(self):
driver_internal_info = {'clean_steps': self.clean_steps,
'clean_step_index': None}
node = obj_utils.create_test_node(
self.context, driver='fake-hardware',
provision_state=states.CLEANWAIT,
target_provision_state=states.AVAILABLE,
driver_internal_info=driver_internal_info,
last_error=None,
clean_step=None)
with task_manager.acquire(self.context, node.uuid) as task:
step_index = conductor_utils.get_node_next_clean_steps(task)
self.assertEqual(0, step_index)