From 5a89faf496dcb3046d149d11f4d76b399ee63702 Mon Sep 17 00:00:00 2001 From: Zhenguo Niu Date: Thu, 8 Dec 2016 20:11:42 +0800 Subject: [PATCH] Add create instance taskflow Support rollback in the process of creating an instance Co-Authored-By: Zhong Luyao Implements: blueprint add-rollback-mechanism-for-creating-a-instance Change-Id: I2f373d5acee1db88ee1eb00160966ebb0977ad50 --- nimble/cmd/engine.py | 4 +- nimble/common/constants.py | 17 + nimble/common/flow_utils.py | 79 ++++ nimble/common/service.py | 6 +- nimble/common/utils.py | 12 + nimble/engine/api.py | 12 +- nimble/engine/baremetal/ironic.py | 9 + nimble/engine/base_manager.py | 2 + nimble/engine/flows/__init__.py | 0 nimble/engine/flows/create_instance.py | 372 ++++++++++++++++++ nimble/engine/manager.py | 164 ++------ nimble/engine/rpcapi.py | 11 +- nimble/engine/scheduler/filter_scheduler.py | 16 +- nimble/tests/unit/common/test_service.py | 4 +- nimble/tests/unit/engine/flows/__init__.py | 0 .../engine/flows/test_create_instance_flow.py | 96 +++++ nimble/tests/unit/engine/test_manager.py | 51 --- nimble/tests/unit/engine/test_rpcapi.py | 3 +- requirements.txt | 1 + 19 files changed, 658 insertions(+), 201 deletions(-) create mode 100644 nimble/common/constants.py create mode 100644 nimble/common/flow_utils.py create mode 100644 nimble/engine/flows/__init__.py create mode 100644 nimble/engine/flows/create_instance.py create mode 100644 nimble/tests/unit/engine/flows/__init__.py create mode 100644 nimble/tests/unit/engine/flows/test_create_instance_flow.py diff --git a/nimble/cmd/engine.py b/nimble/cmd/engine.py index e197de0e..d87f428d 100644 --- a/nimble/cmd/engine.py +++ b/nimble/cmd/engine.py @@ -22,6 +22,7 @@ import sys from oslo_config import cfg from oslo_service import service +from nimble.common import constants from nimble.common import service as nimble_service CONF = cfg.CONF @@ -33,7 +34,8 @@ def main(): mgr = nimble_service.RPCService(CONF.host, 'nimble.engine.manager', - 'EngineManager') + 'EngineManager', + constants.MANAGER_TOPIC) launcher = service.launch(CONF, mgr) launcher.wait() diff --git a/nimble/common/constants.py b/nimble/common/constants.py new file mode 100644 index 00000000..d9f86947 --- /dev/null +++ b/nimble/common/constants.py @@ -0,0 +1,17 @@ +# Copyright 2016 Huawei Technologies Co.,LTD. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +MANAGER_TOPIC = 'nimble.engine_manager' diff --git a/nimble/common/flow_utils.py b/nimble/common/flow_utils.py new file mode 100644 index 00000000..73ce80e6 --- /dev/null +++ b/nimble/common/flow_utils.py @@ -0,0 +1,79 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from oslo_log import log as logging +# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow +from taskflow.listeners import base +from taskflow.listeners import logging as logging_listener +from taskflow import task + +LOG = logging.getLogger(__name__) + + +def _make_task_name(cls, addons=None): + """Makes a pretty name for a task class.""" + base_name = ".".join([cls.__module__, cls.__name__]) + extra = '' + if addons: + extra = ';%s' % (", ".join([str(a) for a in addons])) + return base_name + extra + + +class NimbleTask(task.Task): + """The root task class for all nimble tasks. + + It automatically names the given task using the module and class that + implement the given task as the task name. + """ + + def __init__(self, addons=None, **kwargs): + super(NimbleTask, self).__init__(self.make_name(addons), **kwargs) + + @classmethod + def make_name(cls, addons=None): + return _make_task_name(cls, addons) + + +class DynamicLogListener(logging_listener.DynamicLoggingListener): + """This is used to attach to taskflow engines while they are running. + + It provides a bunch of useful features that expose the actions happening + inside a taskflow engine, which can be useful for developers for debugging, + for operations folks for monitoring and tracking of the resource actions + and more... + """ + + #: Exception is an excepted case, don't include traceback in log if fails. 
+ # _NO_TRACE_EXCEPTIONS = (exception.InvalidInput, exception.QuotaError) + _NO_TRACE_EXCEPTIONS = () + + def __init__(self, engine, + task_listen_for=base.DEFAULT_LISTEN_FOR, + flow_listen_for=base.DEFAULT_LISTEN_FOR, + retry_listen_for=base.DEFAULT_LISTEN_FOR, + logger=LOG): + super(DynamicLogListener, self).__init__( + engine, + task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for, + retry_listen_for=retry_listen_for, + log=logger) + + def _format_failure(self, fail): + if fail.check(*self._NO_TRACE_EXCEPTIONS) is not None: + exc_info = None + exc_details = '%s%s' % (os.linesep, fail.pformat(traceback=False)) + return (exc_info, exc_details) + else: + return super(DynamicLogListener, self)._format_failure(fail) diff --git a/nimble/common/service.py b/nimble/common/service.py index bd629247..835ebdab 100644 --- a/nimble/common/service.py +++ b/nimble/common/service.py @@ -36,13 +36,13 @@ LOG = log.getLogger(__name__) class RPCService(service.Service): - def __init__(self, host, manager_module, manager_class): + def __init__(self, host, manager_module, manager_class, topic): super(RPCService, self).__init__() self.host = host manager_module = importutils.try_import(manager_module) manager_class = getattr(manager_module, manager_class) - self.manager = manager_class(host, manager_module.MANAGER_TOPIC) - self.topic = self.manager.topic + self.manager = manager_class(host, topic) + self.topic = topic self.rpcserver = None def start(self): diff --git a/nimble/common/utils.py b/nimble/common/utils.py index 209efe03..df5ac9a2 100644 --- a/nimble/common/utils.py +++ b/nimble/common/utils.py @@ -72,3 +72,15 @@ def validate_and_normalize_mac(address): if not is_valid_mac(address): raise exception.InvalidMAC(mac=address) return address.lower() + + +def make_pretty_name(method): + """Makes a pretty name for a function/method.""" + meth_pieces = [method.__name__] + # If its an instance method attempt to tack on the class name + if hasattr(method, '__self__') and method.__self__ is not None: + try: + meth_pieces.insert(0, method.__self__.__class__.__name__) + except AttributeError: + pass + return ".".join(meth_pieces) diff --git a/nimble/engine/api.py b/nimble/engine/api.py index 2eaa991d..1173e674 100644 --- a/nimble/engine/api.py +++ b/nimble/engine/api.py @@ -80,9 +80,19 @@ class API(object): instance = self._provision_instances(context, base_options) + request_spec = { + 'instance_id': instance.uuid, + 'instance_properties': { + 'availability_zone': instance.availability_zone, + 'instance_type_uuid': instance.instance_type_uuid, + }, + 'instance_type': dict(instance_type), + } + self.engine_rpcapi.create_instance(context, instance, requested_networks, - instance_type) + request_spec, + filter_properties=None) return instance diff --git a/nimble/engine/baremetal/ironic.py b/nimble/engine/baremetal/ironic.py index 026029cd..b6c0dbcd 100644 --- a/nimble/engine/baremetal/ironic.py +++ b/nimble/engine/baremetal/ironic.py @@ -73,6 +73,15 @@ def set_instance_info(ironicclient, instance): ironicclient.call("node.update", instance.node_uuid, patch) +def unset_instance_info(ironicclient, instance): + + patch = [] + patch.append({'path': '/instance_uuid', 'op': 'remove'}) + patch.append({'path': '/instance_info', 'op': 'remove'}) + + ironicclient.call("node.update", instance.node_uuid, patch) + + def do_node_deploy(ironicclient, node_uuid): # trigger the node deploy ironicclient.call("node.set_provision_state", node_uuid, diff --git a/nimble/engine/base_manager.py 
b/nimble/engine/base_manager.py index 38b4ad6d..dd360ef2 100644
--- a/nimble/engine/base_manager.py
+++ b/nimble/engine/base_manager.py
@@ -24,6 +24,7 @@ from nimble.common import ironic
 from nimble.common import rpc
 from nimble.conf import CONF
 from nimble.db import api as dbapi
+from nimble.engine import rpcapi


 class BaseEngineManager(periodic_task.PeriodicTasks):
@@ -39,6 +40,7 @@ class BaseEngineManager(periodic_task.PeriodicTasks):
         self.scheduler = importutils.import_object(scheduler_driver)
         self.notifier = rpc.get_notifier()
         self.ironicclient = ironic.IronicClientWrapper()
+        self.engine_rpcapi = rpcapi.EngineAPI()
         self._started = False

     def init_host(self):
diff --git a/nimble/engine/flows/__init__.py b/nimble/engine/flows/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/nimble/engine/flows/create_instance.py b/nimble/engine/flows/create_instance.py
new file mode 100644
index 00000000..23810798
--- /dev/null
+++ b/nimble/engine/flows/create_instance.py
@@ -0,0 +1,372 @@
+# Copyright 2016 Huawei Technologies Co.,LTD.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import traceback
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_service import loopingcall
+from oslo_utils import timeutils
+import taskflow.engines
+from taskflow.patterns import linear_flow
+
+from nimble.common import exception
+from nimble.common import flow_utils
+from nimble.common.i18n import _
+from nimble.common.i18n import _LE
+from nimble.common.i18n import _LI
+from nimble.common import neutron
+from nimble.common import utils
+from nimble.engine.baremetal import ironic
+from nimble.engine.baremetal import ironic_states
+from nimble.engine import status
+
+LOG = logging.getLogger(__name__)
+
+ACTION = 'instance:create'
+CONF = cfg.CONF
+
+
+class ScheduleCreateInstanceTask(flow_utils.NimbleTask):
+    """Activates a scheduler driver and handles any subsequent failure."""
+
+    def __init__(self, manager):
+        requires = ['filter_properties', 'request_spec', 'instance',
+                    'context']
+        super(ScheduleCreateInstanceTask, self).__init__(addons=[ACTION],
+                                                         requires=requires)
+        self.manager = manager
+
+    def execute(self, context, instance, request_spec, filter_properties):
+        top_node = self.manager.scheduler.schedule(context,
+                                                   request_spec,
+                                                   self.manager.node_cache,
+                                                   filter_properties)
+        instance.node_uuid = top_node
+
+
+class OnFailureRescheduleTask(flow_utils.NimbleTask):
+    """Triggers a rescheduling request to be sent when reverting occurs.
+
+    If rescheduling doesn't occur this task errors out the instance.
+    """
+
+    def __init__(self, engine_rpcapi):
+        requires = ['filter_properties', 'request_spec', 'instance',
+                    'requested_networks', 'context']
+        super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
+                                                      requires=requires)
+        self.engine_rpcapi = engine_rpcapi
+        # These exception types will trigger the instance to be set into error
+        # status rather than being rescheduled.
+ self.no_reschedule_exc_types = [ + # The instance has been removed from the database, that can not + # be fixed by rescheduling. + exception.InstanceNotFound, + ] + + def execute(self, **kwargs): + pass + + def _reschedule(self, context, cause, request_spec, filter_properties, + instance, requested_networks): + """Actions that happen during the rescheduling attempt occur here.""" + + create_instance = self.engine_rpcapi.create_instance + if not filter_properties: + filter_properties = {} + if 'retry' not in filter_properties: + filter_properties['retry'] = {} + + retry_info = filter_properties['retry'] + num_attempts = retry_info.get('num_attempts', 0) + + LOG.debug("Instance %(instance_id)s: re-scheduling %(method)s " + "attempt %(num)d due to %(reason)s", + {'instance_id': instance.uuid, + 'method': utils.make_pretty_name(create_instance), + 'num': num_attempts, + 'reason': cause.exception_str}) + + if all(cause.exc_info): + # Stringify to avoid circular ref problem in json serialization + retry_info['exc'] = traceback.format_exception(*cause.exc_info) + + return create_instance(context, instance, requested_networks, + request_spec=request_spec, + filter_properties=filter_properties) + + def revert(self, context, result, flow_failures, instance, **kwargs): + # Check if we have a cause which can tell us not to reschedule and + # set the instance's status to error. + for failure in flow_failures.values(): + if failure.check(*self.no_reschedule_exc_types): + LOG.error(_LE("Instance %s: create failed and no reschedule."), + instance.uuid) + return False + + cause = list(flow_failures.values())[0] + try: + self._reschedule(context, cause, instance=instance, **kwargs) + return True + except exception.NimbleException: + LOG.exception(_LE("Instance %s: rescheduling failed"), + instance.uuid) + + return False + + +class SetInstanceInfoTask(flow_utils.NimbleTask): + """Set instance info to ironic node and validate it.""" + + def __init__(self, ironicclient): + requires = ['instance', 'context'] + super(SetInstanceInfoTask, self).__init__(addons=[ACTION], + requires=requires) + self.ironicclient = ironicclient + # These exception types will trigger the instance info to be cleaned. + self.instance_info_cleaned_exc_types = [ + exception.ValidationError, + exception.InterfacePlugException, + exception.NetworkError, + ] + + def execute(self, context, instance): + ironic.set_instance_info(self.ironicclient, instance) + # validate we are ready to do the deploy + validate_chk = ironic.validate_node(self.ironicclient, + instance.node_uuid) + if (not validate_chk.deploy.get('result') + or not validate_chk.power.get('result')): + self._set_instance_obj_error_state(context, instance) + raise exception.ValidationError(_( + "Ironic node: %(id)s failed to validate." + " (deploy: %(deploy)s, power: %(power)s)") + % {'id': instance.node_uuid, + 'deploy': validate_chk.deploy, + 'power': validate_chk.power}) + + def revert(self, context, result, flow_failures, instance, **kwargs): + # Check if we have a cause which need to clean up ironic node + # instance info. 
+ for failure in flow_failures.values(): + if failure.check(*self.instance_info_cleaned_exc_types): + LOG.debug("Instance %s: cleaning up node instance info", + instance.uuid) + ironic.unset_instance_info(self.ironicclient, instance) + return True + + return False + + +class BuildNetworkTask(flow_utils.NimbleTask): + """Build network for the instance.""" + + def __init__(self, ironicclient): + requires = ['instance', 'requested_networks', 'context'] + super(BuildNetworkTask, self).__init__(addons=[ACTION], + requires=requires) + self.ironicclient = ironicclient + # These exception types will trigger the network to be cleaned. + self.network_cleaned_exc_types = [ + exception.NetworkError, + # include instance create task failure here + exception.InstanceDeployFailure, + loopingcall.LoopingCallTimeOut, + ] + + def _build_networks(self, context, instance, requested_networks): + node_uuid = instance.node_uuid + ironic_ports = ironic.get_ports_from_node(self.ironicclient, + node_uuid, + detail=True) + LOG.debug(_('Find ports %(ports)s for node %(node)s') % + {'ports': ironic_ports, 'node': node_uuid}) + if len(requested_networks) > len(ironic_ports): + raise exception.InterfacePlugException(_( + "Ironic node: %(id)s virtual to physical interface count" + " mismatch" + " (Vif count: %(vif_count)d, Pif count: %(pif_count)d)") + % {'id': instance.node_uuid, + 'vif_count': len(requested_networks), + 'pif_count': len(ironic_ports)}) + + network_info = {} + for vif in requested_networks: + for pif in ironic_ports: + # Match the specified port type with physical interface type + if vif.get('port_type') == pif.extra.get('port_type'): + try: + port = neutron.create_port(context, vif['uuid'], + pif.address, instance.uuid) + port_dict = port['port'] + network_info[port_dict['id']] = { + 'network': port_dict['network_id'], + 'mac_address': port_dict['mac_address'], + 'fixed_ips': port_dict['fixed_ips']} + ironic.plug_vif(self.ironicclient, pif.uuid, + port_dict['id']) + except Exception: + # Set network_info here, so we can clean up the created + # networks during reverting. + instance.network_info = network_info + LOG.error(_LE("Instance %s: create network failed"), + instance.uuid) + raise exception.NetworkError(_( + "Build network for instance failed.")) + + return network_info + + def _destroy_networks(self, context, instance): + LOG.debug("unplug: instance_uuid=%(uuid)s vif=%(network_info)s", + {'uuid': instance.uuid, + 'network_info': str(instance.network_info)}) + + ports = instance.network_info.keys() + for port in ports: + neutron.delete_port(context, port, instance.uuid) + + ironic_ports = ironic.get_ports_from_node(self.ironicclient, + instance.node_uuid, + detail=True) + for pif in ironic_ports: + if 'vif_port_id' in pif.extra: + ironic.unplug_vif(self.ironicclient, pif.uuid) + + def execute(self, context, instance, requested_networks): + network_info = self._build_networks( + context, + instance, + requested_networks) + + instance.network_info = network_info + + def revert(self, context, result, flow_failures, instance, **kwargs): + # Check if we have a cause which need to clean up networks. + for failure in flow_failures.values(): + if failure.check(*self.network_cleaned_exc_types): + LOG.debug("Instance %s: cleaning up node networks", + instance.uuid) + if instance.network_info: + self._destroy_networks(context, instance) + # Unset network_info here as we have destroyed it. 
+                    instance.network_info = {}
+                return True
+
+        return False
+
+
+class CreateInstanceTask(flow_utils.NimbleTask):
+    """Deploy the instance to the Ironic node and wait for it to be active."""
+
+    def __init__(self, ironicclient):
+        requires = ['instance', 'context']
+        super(CreateInstanceTask, self).__init__(addons=[ACTION],
+                                                 requires=requires)
+        self.ironicclient = ironicclient
+        # These exception types will trigger the instance to be cleaned.
+        self.instance_cleaned_exc_types = [
+            exception.InstanceDeployFailure,
+            loopingcall.LoopingCallTimeOut,
+        ]
+
+    def _wait_for_active(self, instance):
+        """Wait for the node to be marked as ACTIVE in Ironic."""
+
+        node = ironic.get_node_by_instance(self.ironicclient,
+                                           instance.uuid)
+        LOG.debug('Current ironic node state is %s', node.provision_state)
+        if node.provision_state == ironic_states.ACTIVE:
+            # job is done
+            LOG.debug("Ironic node %(node)s is now ACTIVE",
+                      dict(node=node.uuid))
+            instance.status = status.ACTIVE
+            instance.launched_at = timeutils.utcnow()
+            instance.save()
+            raise loopingcall.LoopingCallDone()
+
+        if node.target_provision_state in (ironic_states.DELETED,
+                                           ironic_states.AVAILABLE):
+            # ironic is trying to delete it now
+            raise exception.InstanceNotFound(instance_id=instance.uuid)
+
+        if node.provision_state in (ironic_states.NOSTATE,
+                                    ironic_states.AVAILABLE):
+            # ironic already deleted it
+            raise exception.InstanceNotFound(instance_id=instance.uuid)
+
+        if node.provision_state == ironic_states.DEPLOYFAIL:
+            # ironic failed to deploy
+            msg = (_("Failed to provision instance %(inst)s: %(reason)s")
+                   % {'inst': instance.uuid, 'reason': node.last_error})
+            raise exception.InstanceDeployFailure(msg)
+
+    def _build_instance(self, context, instance):
+        ironic.do_node_deploy(self.ironicclient, instance.node_uuid)
+
+        timer = loopingcall.FixedIntervalLoopingCall(self._wait_for_active,
+                                                     instance)
+        timer.start(interval=CONF.ironic.api_retry_interval).wait()
+        LOG.info(_LI('Successfully provisioned Ironic node %s'),
+                 instance.node_uuid)
+
+    def execute(self, context, instance):
+        self._build_instance(context, instance)
+
+    def revert(self, context, result, flow_failures, instance, **kwargs):
+        # Check if we have a cause which need to clean up instance.
+        for failure in flow_failures.values():
+            if failure.check(*self.instance_cleaned_exc_types):
+                LOG.debug("Instance %s: destroy ironic node", instance.uuid)
+                ironic.destroy_node(self.ironicclient, instance.node_uuid)
+                return True
+
+        return False
+
+
+def get_flow(context, manager, instance, requested_networks, request_spec,
+             filter_properties):
+
+    """Constructs and returns the manager entrypoint flow.
+
+    This flow will do the following:
+
+    1. Schedule a node to create instance
+    2. Set instance info to ironic node and validate it's ready to deploy
+    3. Build networks for the instance and set port id back to ironic port
+    4. Do node deploy and handle errors.
+    """
+
+    flow_name = ACTION.replace(":", "_") + "_manager"
+    instance_flow = linear_flow.Flow(flow_name)
+
+    # This injects the initial starting flow values into the workflow so that
+    # the dependency order of the tasks provides/requires can be correctly
+    # determined.
+ create_what = { + 'context': context, + 'filter_properties': filter_properties, + 'request_spec': request_spec, + 'instance': instance, + 'requested_networks': requested_networks + } + + instance_flow.add(ScheduleCreateInstanceTask(manager), + OnFailureRescheduleTask(manager.engine_rpcapi), + SetInstanceInfoTask(manager.ironicclient), + BuildNetworkTask(manager.ironicclient), + CreateInstanceTask(manager.ironicclient)) + + # Now load (but do not run) the flow using the provided initial data. + return taskflow.engines.load(instance_flow, store=create_what) diff --git a/nimble/engine/manager.py b/nimble/engine/manager.py index 9e95e81b..aff80b05 100644 --- a/nimble/engine/manager.py +++ b/nimble/engine/manager.py @@ -15,11 +15,10 @@ from oslo_log import log import oslo_messaging as messaging -from oslo_service import loopingcall from oslo_service import periodic_task -from oslo_utils import timeutils from nimble.common import exception +from nimble.common import flow_utils from nimble.common.i18n import _LE from nimble.common.i18n import _LI from nimble.common import neutron @@ -27,10 +26,9 @@ from nimble.conf import CONF from nimble.engine.baremetal import ironic from nimble.engine.baremetal import ironic_states from nimble.engine import base_manager +from nimble.engine.flows import create_instance from nimble.engine import status -MANAGER_TOPIC = 'nimble.engine_manager' - LOG = log.getLogger(__name__) @@ -65,39 +63,6 @@ class EngineManager(base_manager.BaseEngineManager): LOG.debug('Instance has been destroyed from under us while ' 'trying to set it to ERROR', instance=instance) - def _build_networks(self, context, instance, requested_networks): - node_uuid = instance.node_uuid - ironic_ports = ironic.get_ports_from_node(self.ironicclient, - node_uuid, - detail=True) - LOG.debug(_('Find ports %(ports)s for node %(node)s') % - {'ports': ironic_ports, 'node': node_uuid}) - if len(requested_networks) > len(ironic_ports): - raise exception.InterfacePlugException(_( - "Ironic node: %(id)s virtual to physical interface count" - " mismatch" - " (Vif count: %(vif_count)d, Pif count: %(pif_count)d)") - % {'id': instance.node_uuid, - 'vif_count': len(requested_networks), - 'pif_count': len(ironic_ports)}) - - network_info = {} - for vif in requested_networks: - for pif in ironic_ports: - # Match the specified port type with physical interface type - if vif.get('port_type') == pif.extra.get('port_type'): - port = neutron.create_port(context, vif['uuid'], - pif.address, instance.uuid) - port_dict = port['port'] - network_info[port_dict['id']] = { - 'network': port_dict['network_id'], - 'mac_address': port_dict['mac_address'], - 'fixed_ips': port_dict['fixed_ips']} - ironic.plug_vif(self.ironicclient, pif.uuid, - port_dict['id']) - - return network_info - def _destroy_networks(self, context, instance): LOG.debug("unplug: instance_uuid=%(uuid)s vif=%(network_info)s", {'uuid': instance.uuid, @@ -114,110 +79,51 @@ class EngineManager(base_manager.BaseEngineManager): if 'vif_port_id' in pif.extra: ironic.unplug_vif(self.ironicclient, pif.uuid) - def _wait_for_active(self, instance): - """Wait for the node to be marked as ACTIVE in Ironic.""" - - node = ironic.get_node_by_instance(self.ironicclient, - instance.uuid) - LOG.debug('Current ironic node state is %s', node.provision_state) - if node.provision_state == ironic_states.ACTIVE: - # job is done - LOG.debug("Ironic node %(node)s is now ACTIVE", - dict(node=node.uuid)) - instance.status = status.ACTIVE - instance.launched_at = timeutils.utcnow() - 
instance.save() - raise loopingcall.LoopingCallDone() - - if node.target_provision_state in (ironic_states.DELETED, - ironic_states.AVAILABLE): - # ironic is trying to delete it now - raise exception.InstanceNotFound(instance_id=instance.uuid) - - if node.provision_state in (ironic_states.NOSTATE, - ironic_states.AVAILABLE): - # ironic already deleted it - raise exception.InstanceNotFound(instance_id=instance.uuid) - - if node.provision_state == ironic_states.DEPLOYFAIL: - # ironic failed to deploy - msg = (_("Failed to provision instance %(inst)s: %(reason)s") - % {'inst': instance.uuid, 'reason': node.last_error}) - raise exception.InstanceDeployFailure(msg) - - def _build_instance(self, context, instance): - ironic.do_node_deploy(self.ironicclient, instance.node_uuid) - - timer = loopingcall.FixedIntervalLoopingCall(self._wait_for_active, - instance) - timer.start(interval=CONF.ironic.api_retry_interval).wait() - LOG.info(_LI('Successfully provisioned Ironic node %s'), - instance.node_uuid) - def _destroy_instance(self, context, instance): ironic.destroy_node(self.ironicclient, instance.node_uuid) LOG.info(_LI('Successfully destroyed Ironic node %s'), instance.node_uuid) - def create_instance(self, context, instance, - requested_networks, instance_type): + def create_instance(self, context, instance, requested_networks, + request_spec=None, filter_properties=None): """Perform a deployment.""" LOG.debug("Starting instance...") - # Populate request spec - instance_type_uuid = instance.instance_type_uuid - request_spec = { - 'instance_id': instance.uuid, - 'instance_properties': { - 'availability_zone': instance.availability_zone, - 'instance_type_uuid': instance_type_uuid, - }, - 'instance_type': dict(instance_type), - } - LOG.debug("Scheduling with request_spec: %s", request_spec) + if filter_properties is None: + filter_properties = {} - # TODO(zhenguo): Add retry - filter_properties = {} try: - top_node = self.scheduler.schedule(context, - request_spec, - self.node_cache, - filter_properties) + flow_engine = create_instance.get_flow( + context, + self, + instance, + requested_networks, + request_spec, + filter_properties, + ) + except Exception: + msg = _("Create manager instance flow failed.") + LOG.exception(msg) + raise exception.NimbleException(msg) + + def _run_flow(): + # This code executes create instance flow. If something goes wrong, + # flow reverts all job that was done and reraises an exception. + # Otherwise, all data that was generated by flow becomes available + # in flow engine's storage. + with flow_utils.DynamicLogListener(flow_engine, logger=LOG): + flow_engine.run() + + try: + _run_flow() except exception.NoValidNode: self._set_instance_obj_error_state(context, instance) - raise exception.NoValidNode( - _('No valid node is found with request spec %s') % - request_spec) - instance.node_uuid = top_node - del self.node_cache[top_node] - - ironic.set_instance_info(self.ironicclient, instance) - # validate we are ready to do the deploy - validate_chk = ironic.validate_node(self.ironicclient, - instance.node_uuid) - if (not validate_chk.deploy.get('result') - or not validate_chk.power.get('result')): - self._set_instance_obj_error_state(context, instance) - raise exception.ValidationError(_( - "Ironic node: %(id)s failed to validate." 
- " (deploy: %(deploy)s, power: %(power)s)") - % {'id': instance.node_uuid, - 'deploy': validate_chk.deploy, - 'power': validate_chk.power}) - - try: - network_info = self._build_networks(context, instance, - requested_networks) - except Exception: - self._set_instance_obj_error_state(context, instance) - return - - instance.network_info = network_info - - try: - self._build_instance(context, instance) - except Exception: - self._set_instance_obj_error_state(context, instance) + LOG.error(_LE("Created instance %s failed, No valid node " + "is found with the request spec."), instance.uuid) + else: + LOG.info(_LI("Created instance %s successfully."), instance.uuid) + finally: + return instance def delete_instance(self, context, instance): """Delete an instance.""" diff --git a/nimble/engine/rpcapi.py b/nimble/engine/rpcapi.py index 4e984930..4e8c7551 100644 --- a/nimble/engine/rpcapi.py +++ b/nimble/engine/rpcapi.py @@ -18,8 +18,8 @@ Client side of the engine RPC API. from oslo_config import cfg import oslo_messaging as messaging +from nimble.common import constants from nimble.common import rpc -from nimble.engine import manager from nimble.objects import base as objects_base CONF = cfg.CONF @@ -40,7 +40,7 @@ class EngineAPI(object): super(EngineAPI, self).__init__() self.topic = topic if self.topic is None: - self.topic = manager.MANAGER_TOPIC + self.topic = constants.MANAGER_TOPIC target = messaging.Target(topic=self.topic, version='1.0') @@ -49,13 +49,14 @@ class EngineAPI(object): version_cap=self.RPC_API_VERSION, serializer=serializer) - def create_instance(self, context, instance, - requested_networks, instance_type): + def create_instance(self, context, instance, requested_networks, + request_spec, filter_properties): """Signal to engine service to perform a deployment.""" cctxt = self.client.prepare(topic=self.topic, server=CONF.host) return cctxt.cast(context, 'create_instance', instance=instance, requested_networks=requested_networks, - instance_type=instance_type) + request_spec=request_spec, + filter_properties=filter_properties) def delete_instance(self, context, instance): """Signal to engine service to delete an instance.""" diff --git a/nimble/engine/scheduler/filter_scheduler.py b/nimble/engine/scheduler/filter_scheduler.py index 46f704bf..e094667a 100644 --- a/nimble/engine/scheduler/filter_scheduler.py +++ b/nimble/engine/scheduler/filter_scheduler.py @@ -86,7 +86,7 @@ class FilterScheduler(driver.Scheduler): 'last_node': last_node, 'exc': exc}) - def _populate_retry(self, filter_properties, properties): + def _populate_retry(self, filter_properties, request_spec): """Populate filter properties with history of retries for request. If maximum retries is exceeded, raise NoValidNode. 
@@ -108,13 +108,13 @@ class FilterScheduler(driver.Scheduler): } filter_properties['retry'] = retry - instance_id = properties.get('instance_id') + instance_id = request_spec.get('instance_id') self._log_instance_error(instance_id, retry) if retry['num_attempts'] > max_attempts: raise exception.NoValidNode( - reason=_("Exceeded max scheduling attempts %(max_attempts)d " - "for instance %(instance_id)s") % + _("Exceeded max scheduling attempts %(max_attempts)d " + "for instance %(instance_id)s") % {'max_attempts': max_attempts, 'instance_id': instance_id}) @@ -133,13 +133,11 @@ class FilterScheduler(driver.Scheduler): if filter_properties is None: filter_properties = {} - self._populate_retry(filter_properties, - request_spec['instance_properties']) + self._populate_retry(filter_properties, request_spec) request_spec_dict = jsonutils.to_primitive(request_spec) - filter_properties.update({'context': context, - 'request_spec': request_spec_dict, + filter_properties.update({'request_spec': request_spec_dict, 'config_options': config_options, 'instance_type': instance_type, 'resource_type': resource_type}) @@ -177,7 +175,7 @@ class FilterScheduler(driver.Scheduler): LOG.warning(_LW('No weighed nodes found for instance ' 'with properties: %s'), request_spec.get('instance_type')) - raise exception.NoValidNode(reason=_("No weighed nodes available")) + raise exception.NoValidNode(_("No weighed nodes available")) top_node = self._choose_top_node(weighed_nodes, request_spec) self._add_retry_node(filter_properties, top_node) diff --git a/nimble/tests/unit/common/test_service.py b/nimble/tests/unit/common/test_service.py index 06effa5a..c7e8e2f7 100644 --- a/nimble/tests/unit/common/test_service.py +++ b/nimble/tests/unit/common/test_service.py @@ -16,6 +16,7 @@ from oslo_config import cfg import oslo_messaging from oslo_service import service as base_service +from nimble.common import constants from nimble.common import exception from nimble.common import rpc from nimble.common import service @@ -34,7 +35,8 @@ class TestRPCService(base.TestCase): host = "fake_host" mgr_module = "nimble.engine.manager" mgr_class = "EngineManager" - self.rpc_svc = service.RPCService(host, mgr_module, mgr_class) + self.rpc_svc = service.RPCService(host, mgr_module, mgr_class, + constants.MANAGER_TOPIC) @mock.patch.object(oslo_messaging, 'Target', autospec=True) @mock.patch.object(objects_base, 'NimbleObjectSerializer', autospec=True) diff --git a/nimble/tests/unit/engine/flows/__init__.py b/nimble/tests/unit/engine/flows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nimble/tests/unit/engine/flows/test_create_instance_flow.py b/nimble/tests/unit/engine/flows/test_create_instance_flow.py new file mode 100644 index 00000000..77a8369e --- /dev/null +++ b/nimble/tests/unit/engine/flows/test_create_instance_flow.py @@ -0,0 +1,96 @@ +# Copyright 2016 Huawei Technologies Co.,LTD. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+""" Tests for create_instance TaskFlow """ + +import mock +from oslo_utils import uuidutils + +from nimble.common import context +from nimble.engine.baremetal import ironic +from nimble.engine.flows import create_instance +from nimble.engine.scheduler import filter_scheduler as scheduler +from nimble.tests import base +from nimble.tests.unit.objects import utils as obj_utils + + +class CreateInstanceFlowTestCase(base.TestCase): + + def setUp(self): + super(CreateInstanceFlowTestCase, self).setUp() + self.ctxt = context.get_admin_context() + + @mock.patch.object(scheduler.FilterScheduler, 'schedule') + def test_schedule_task_execute(self, mock_schedule): + fake_uuid = uuidutils.generate_uuid() + fake_engine_manager = mock.MagicMock() + fake_request_spec = mock.MagicMock() + fake_filter_props = mock.MagicMock() + fake_engine_manager.scheduler = scheduler.FilterScheduler() + task = create_instance.ScheduleCreateInstanceTask( + fake_engine_manager) + instance_obj = obj_utils.get_test_instance(self.ctxt) + mock_schedule.return_value = fake_uuid + + task.execute(self.ctxt, + instance_obj, + fake_request_spec, + fake_filter_props) + mock_schedule.assert_called_once_with(self.ctxt, + fake_request_spec, + fake_engine_manager.node_cache, + fake_filter_props) + self.assertEqual(fake_uuid, instance_obj.node_uuid) + + @mock.patch.object(ironic, 'validate_node') + @mock.patch.object(ironic, 'set_instance_info') + def test_set_instance_info_task_execute(self, mock_set_inst, + mock_validate): + fake_ironicclient = mock.MagicMock() + task = create_instance.SetInstanceInfoTask( + fake_ironicclient) + instance_obj = obj_utils.get_test_instance(self.ctxt) + mock_set_inst.side_effect = None + mock_validate.side_effect = None + + task.execute(self.ctxt, instance_obj) + mock_set_inst.assert_called_once_with(fake_ironicclient, + instance_obj) + mock_validate.assert_called_once_with(fake_ironicclient, + instance_obj.node_uuid) + + @mock.patch.object(create_instance.BuildNetworkTask, '_build_networks') + def test_create_network_task_execute(self, mock_build_networks): + fake_ironicclient = mock.MagicMock() + fake_requested_networks = mock.MagicMock() + task = create_instance.BuildNetworkTask( + fake_ironicclient) + instance_obj = obj_utils.get_test_instance(self.ctxt) + mock_build_networks.side_effect = None + + task.execute(self.ctxt, instance_obj, fake_requested_networks) + mock_build_networks.assert_called_once_with(self.ctxt, + instance_obj, + fake_requested_networks) + + @mock.patch.object(create_instance.CreateInstanceTask, '_build_instance') + def test_create_instance_task_execute(self, mock_build_inst): + fake_ironicclient = mock.MagicMock() + task = create_instance.CreateInstanceTask( + fake_ironicclient) + instance_obj = obj_utils.get_test_instance(self.ctxt) + mock_build_inst.side_effect = None + + task.execute(self.ctxt, instance_obj) + mock_build_inst.assert_called_once_with(self.ctxt, instance_obj) diff --git a/nimble/tests/unit/engine/test_manager.py b/nimble/tests/unit/engine/test_manager.py index 5504cc6b..278ae28a 100644 --- a/nimble/tests/unit/engine/test_manager.py +++ b/nimble/tests/unit/engine/test_manager.py @@ -16,16 +16,12 @@ """Test class for Nimble ManagerService.""" import mock -from oslo_service import loopingcall from nimble.common import neutron from nimble.engine.baremetal import ironic from nimble.engine.baremetal import ironic_states from nimble.engine import manager -from nimble.engine.scheduler import filter_scheduler as scheduler -from nimble import objects from 
nimble.tests.unit.db import base as tests_db_base -from nimble.tests.unit.db import utils as db_utils from nimble.tests.unit.engine import mgr_utils from nimble.tests.unit.objects import utils as obj_utils @@ -34,53 +30,6 @@ from nimble.tests.unit.objects import utils as obj_utils class ManageInstanceTestCase(mgr_utils.ServiceSetUpMixin, tests_db_base.DbTestCase): - @mock.patch.object(manager.EngineManager, '_wait_for_active') - @mock.patch.object(ironic, 'do_node_deploy') - def test__build_instance(self, deploy_node_mock, wait_mock, - refresh_cache_mock): - instance = obj_utils.create_test_instance(self.context) - deploy_node_mock.side_effect = None - wait_mock.side_effect = loopingcall.LoopingCallDone() - refresh_cache_mock.side_effect = None - self._start_service() - - self.service._build_instance(self.context, instance) - self._stop_service() - - deploy_node_mock.assert_called_once_with(mock.ANY, instance.node_uuid) - - @mock.patch.object(manager.EngineManager, '_build_instance') - @mock.patch.object(manager.EngineManager, '_build_networks') - @mock.patch.object(ironic, 'validate_node') - @mock.patch.object(ironic, 'set_instance_info') - @mock.patch.object(scheduler.FilterScheduler, 'schedule') - def test_create_instance(self, schedule_mock, set_inst_mock, - validate_mock, build_net_mock, - build_inst_mock, refresh_cache_mock): - instance = obj_utils.create_test_instance(self.context) - fake_type = db_utils.get_test_instance_type(context=self.context) - fake_type['extra_specs'] = {} - inst_type = objects.InstanceType(self.context, **fake_type) - schedule_mock.return_value = 'fake-node' - set_inst_mock.side_effect = None - validate_mock.side_effect = None - build_net_mock.side_effect = None - build_inst_mock.side_effect = None - refresh_cache_mock.side_effect = None - requested_net = [{'uuid': 'fake-net-uuid'}] - - self._start_service() - self.service.node_cache = {'fake-node': 'node'} - self.service.create_instance(self.context, instance, - requested_net, inst_type) - self._stop_service() - - set_inst_mock.assert_called_once_with(mock.ANY, instance) - validate_mock.assert_called_once_with(mock.ANY, instance.node_uuid) - build_net_mock.assert_called_once_with(self.context, instance, - requested_net) - build_inst_mock.assert_called_once_with(self.context, instance) - @mock.patch.object(ironic, 'unplug_vif') @mock.patch.object(ironic, 'get_ports_from_node') @mock.patch.object(neutron, 'delete_port') diff --git a/nimble/tests/unit/engine/test_rpcapi.py b/nimble/tests/unit/engine/test_rpcapi.py index 7e40763e..2a99e1e5 100644 --- a/nimble/tests/unit/engine/test_rpcapi.py +++ b/nimble/tests/unit/engine/test_rpcapi.py @@ -107,7 +107,8 @@ class RPCAPITestCase(base.DbTestCase): version='1.0', instance=self.fake_instance_obj, requested_networks=[], - instance_type=self.fake_type_obj) + request_spec=None, + filter_properties=None) def test_delete_instance(self): self._test_rpcapi('delete_instance', diff --git a/requirements.txt b/requirements.txt index 6fdf8389..6970c663 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,6 +26,7 @@ oslo.utils>=3.18.0 # Apache-2.0 oslo.versionedobjects>=1.17.0 # Apache-2.0 pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD six>=1.9.0 # MIT +taskflow>=1.26.0 # Apache-2.0 WSME>=0.8 # MIT keystonemiddleware!=4.5.0,>=4.2.0 # Apache-2.0 stevedore>=1.17.1 # Apache-2.0
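
For reviewers who have not used taskflow before: the rollback behaviour this change introduces comes from taskflow's execute/revert contract rather than from hand-written try/except cleanup. The short standalone sketch below is illustrative only (the task names and store values are invented and are not part of this patch); it shows the pattern that nimble/engine/flows/create_instance.py builds on, where a linear flow automatically reverts the tasks that already ran when a later task fails and then re-raises the original failure to the caller.

    # Illustrative only, not part of the patch: a minimal taskflow sketch of
    # the revert-on-failure pattern used by the new create_instance flow.
    # Requires only the taskflow library; task names and store values are fake.

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task


    class FakeBuildNetworkTask(task.Task):
        """Plays the role of BuildNetworkTask in the real flow."""

        def execute(self, instance):
            print('building network for %s' % instance)
            return 'fake-port-id'

        def revert(self, instance, result, flow_failures, **kwargs):
            # Called automatically when a later task in the flow fails; this
            # is where the real task deletes the neutron ports it created.
            print('cleaning up %s for %s' % (result, instance))


    class FakeDeployTask(task.Task):
        """Plays the role of CreateInstanceTask in the real flow."""

        def execute(self, instance):
            raise RuntimeError('deploy of %s failed' % instance)


    flow = linear_flow.Flow('instance:create_demo')
    flow.add(FakeBuildNetworkTask(), FakeDeployTask())

    # The store dict plays the role of create_what in get_flow(): its keys
    # are matched to each task's execute()/revert() argument names.
    engine = taskflow.engines.load(flow, store={'instance': 'demo-instance'})
    try:
        engine.run()
    except RuntimeError:
        # FakeDeployTask failed, so taskflow reverted FakeBuildNetworkTask
        # and then re-raised the original exception to the caller.
        print('flow reverted and the failure was re-raised')

This is the design choice behind replacing the old EngineManager.create_instance() error handling: each task only has to know how to undo its own work, and OnFailureRescheduleTask can use the same revert hook to trigger a reschedule instead of simply setting the instance to an error state.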