Merge "Add create instance taskflow"

Jenkins, 2016-12-17 00:24:24 +00:00, committed by Gerrit Code Review
commit bbd428c691
19 changed files with 658 additions and 201 deletions

View File

@@ -22,6 +22,7 @@ import sys
 from oslo_config import cfg
 from oslo_service import service

+from nimble.common import constants
 from nimble.common import service as nimble_service

 CONF = cfg.CONF
@@ -33,7 +34,8 @@ def main():
     mgr = nimble_service.RPCService(CONF.host,
                                     'nimble.engine.manager',
-                                    'EngineManager')
+                                    'EngineManager',
+                                    constants.MANAGER_TOPIC)

     launcher = service.launch(CONF, mgr)
     launcher.wait()

View File

@@ -0,0 +1,17 @@
# Copyright 2016 Huawei Technologies Co.,LTD.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
MANAGER_TOPIC = 'nimble.engine_manager'

View File

@@ -0,0 +1,79 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log as logging
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
from taskflow.listeners import base
from taskflow.listeners import logging as logging_listener
from taskflow import task
LOG = logging.getLogger(__name__)
def _make_task_name(cls, addons=None):
"""Makes a pretty name for a task class."""
base_name = ".".join([cls.__module__, cls.__name__])
extra = ''
if addons:
extra = ';%s' % (", ".join([str(a) for a in addons]))
return base_name + extra
class NimbleTask(task.Task):
"""The root task class for all nimble tasks.
It automatically names the given task using the module and class that
implement the given task as the task name.
"""
def __init__(self, addons=None, **kwargs):
super(NimbleTask, self).__init__(self.make_name(addons), **kwargs)
@classmethod
def make_name(cls, addons=None):
return _make_task_name(cls, addons)
class DynamicLogListener(logging_listener.DynamicLoggingListener):
"""This is used to attach to taskflow engines while they are running.
It provides useful features that expose the actions happening inside a
taskflow engine, which can be useful to developers for debugging and to
operators for monitoring and tracking resource actions.
"""
#: Exception is an expected case; don't include a traceback in the log if it fails.
# _NO_TRACE_EXCEPTIONS = (exception.InvalidInput, exception.QuotaError)
_NO_TRACE_EXCEPTIONS = ()
def __init__(self, engine,
task_listen_for=base.DEFAULT_LISTEN_FOR,
flow_listen_for=base.DEFAULT_LISTEN_FOR,
retry_listen_for=base.DEFAULT_LISTEN_FOR,
logger=LOG):
super(DynamicLogListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for,
retry_listen_for=retry_listen_for,
log=logger)
def _format_failure(self, fail):
if fail.check(*self._NO_TRACE_EXCEPTIONS) is not None:
exc_info = None
exc_details = '%s%s' % (os.linesep, fail.pformat(traceback=False))
return (exc_info, exc_details)
else:
return super(DynamicLogListener, self)._format_failure(fail)
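
As a quick illustration of the naming behavior (a hedged sketch; DeployTask
and the addon string are hypothetical), a NimbleTask subclass is named from
its module, class, and any addons:

from nimble.common import flow_utils

class DeployTask(flow_utils.NimbleTask):
    def execute(self):
        pass

# The name is "<module>.<class>" plus addons; defined in my_flows.py this
# prints "my_flows.DeployTask;instance:create".
task = DeployTask(addons=['instance:create'])
print(task.name)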

View File

@@ -36,13 +36,13 @@ LOG = log.getLogger(__name__)
 class RPCService(service.Service):
-    def __init__(self, host, manager_module, manager_class):
+    def __init__(self, host, manager_module, manager_class, topic):
         super(RPCService, self).__init__()
         self.host = host
         manager_module = importutils.try_import(manager_module)
         manager_class = getattr(manager_module, manager_class)
-        self.manager = manager_class(host, manager_module.MANAGER_TOPIC)
-        self.topic = self.manager.topic
+        self.manager = manager_class(host, topic)
+        self.topic = topic
         self.rpcserver = None

     def start(self):
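
With the topic now an explicit constructor argument, the call site picks the
queue instead of RPCService reading it off the manager module. A minimal
sketch of the new call shape (mirroring cmd/engine.py above; 'fake-host' is a
placeholder):

from nimble.common import constants
from nimble.common import service as nimble_service

svc = nimble_service.RPCService('fake-host',
                                'nimble.engine.manager',
                                'EngineManager',
                                constants.MANAGER_TOPIC)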

View File

@@ -72,3 +72,15 @@ def validate_and_normalize_mac(address):
     if not is_valid_mac(address):
         raise exception.InvalidMAC(mac=address)
     return address.lower()
+
+
+def make_pretty_name(method):
+    """Makes a pretty name for a function/method."""
+    meth_pieces = [method.__name__]
+    # If it's an instance method, attempt to tack on the class name
+    if hasattr(method, '__self__') and method.__self__ is not None:
+        try:
+            meth_pieces.insert(0, method.__self__.__class__.__name__)
+        except AttributeError:
+            pass
+    return ".".join(meth_pieces)

View File

@@ -80,9 +80,19 @@ class API(object):
         instance = self._provision_instances(context, base_options)

+        request_spec = {
+            'instance_id': instance.uuid,
+            'instance_properties': {
+                'availability_zone': instance.availability_zone,
+                'instance_type_uuid': instance.instance_type_uuid,
+            },
+            'instance_type': dict(instance_type),
+        }
+
         self.engine_rpcapi.create_instance(context, instance,
                                            requested_networks,
-                                           instance_type)
+                                           request_spec,
+                                           filter_properties=None)

         return instance
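
For a concrete request, the request_spec handed to the engine looks roughly
like this (hypothetical values):

request_spec = {
    'instance_id': 'e9d5b3c4-8f62-4f0c-9a2f-1a2b3c4d5e6f',
    'instance_properties': {
        'availability_zone': 'az1',
        'instance_type_uuid': '0a1b2c3d-4e5f-6071-8293-a4b5c6d7e8f9',
    },
    'instance_type': {'name': 'bm.small', 'extra_specs': {}},
}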

View File

@@ -73,6 +73,15 @@ def set_instance_info(ironicclient, instance):

     ironicclient.call("node.update", instance.node_uuid, patch)

+
+def unset_instance_info(ironicclient, instance):
+    patch = []
+    patch.append({'path': '/instance_uuid', 'op': 'remove'})
+    patch.append({'path': '/instance_info', 'op': 'remove'})
+    ironicclient.call("node.update", instance.node_uuid, patch)
+
+
 def do_node_deploy(ironicclient, node_uuid):
     # trigger the node deploy
     ironicclient.call("node.set_provision_state", node_uuid,

View File

@@ -24,6 +24,7 @@ from nimble.common import ironic
 from nimble.common import rpc
 from nimble.conf import CONF
 from nimble.db import api as dbapi
+from nimble.engine import rpcapi

 class BaseEngineManager(periodic_task.PeriodicTasks):
@@ -39,6 +40,7 @@ class BaseEngineManager(periodic_task.PeriodicTasks):
         self.scheduler = importutils.import_object(scheduler_driver)
         self.notifier = rpc.get_notifier()
         self.ironicclient = ironic.IronicClientWrapper()
+        self.engine_rpcapi = rpcapi.EngineAPI()
         self._started = False

     def init_host(self):

View File

View File

@@ -0,0 +1,372 @@
# Copyright 2016 Huawei Technologies Co.,LTD.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import traceback
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import timeutils
import taskflow.engines
from taskflow.patterns import linear_flow
from nimble.common import exception
from nimble.common import flow_utils
from nimble.common.i18n import _
from nimble.common.i18n import _LE
from nimble.common.i18n import _LI
from nimble.common import neutron
from nimble.common import utils
from nimble.engine.baremetal import ironic
from nimble.engine.baremetal import ironic_states
from nimble.engine import status
LOG = logging.getLogger(__name__)
ACTION = 'instance:create'
CONF = cfg.CONF
class ScheduleCreateInstanceTask(flow_utils.NimbleTask):
"""Activates a scheduler driver and handles any subsequent failure."""
def __init__(self, manager):
requires = ['filter_properties', 'request_spec', 'instance',
'context']
super(ScheduleCreateInstanceTask, self).__init__(addons=[ACTION],
requires=requires)
self.manager = manager
def execute(self, context, instance, request_spec, filter_properties):
top_node = self.manager.scheduler.schedule(context,
request_spec,
self.manager.node_cache,
filter_properties)
instance.node_uuid = top_node
class OnFailureRescheduleTask(flow_utils.NimbleTask):
"""Triggers a rescheduling request to be sent when reverting occurs.
If rescheduling doesn't occur this task errors out the instance.
"""
def __init__(self, engine_rpcapi):
requires = ['filter_properties', 'request_spec', 'instance',
'requested_networks', 'context']
super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
requires=requires)
self.engine_rpcapi = engine_rpcapi
# These exception types will trigger the instance to be set into error
# status rather than being rescheduled.
self.no_reschedule_exc_types = [
# The instance has been removed from the database; that cannot
# be fixed by rescheduling.
exception.InstanceNotFound,
]
def execute(self, **kwargs):
pass
def _reschedule(self, context, cause, request_spec, filter_properties,
instance, requested_networks):
"""Actions that happen during the rescheduling attempt occur here."""
create_instance = self.engine_rpcapi.create_instance
if not filter_properties:
filter_properties = {}
if 'retry' not in filter_properties:
filter_properties['retry'] = {}
retry_info = filter_properties['retry']
num_attempts = retry_info.get('num_attempts', 0)
LOG.debug("Instance %(instance_id)s: re-scheduling %(method)s "
"attempt %(num)d due to %(reason)s",
{'instance_id': instance.uuid,
'method': utils.make_pretty_name(create_instance),
'num': num_attempts,
'reason': cause.exception_str})
if all(cause.exc_info):
# Stringify to avoid circular ref problem in json serialization
retry_info['exc'] = traceback.format_exception(*cause.exc_info)
return create_instance(context, instance, requested_networks,
request_spec=request_spec,
filter_properties=filter_properties)
def revert(self, context, result, flow_failures, instance, **kwargs):
# Check if we have a cause which can tell us not to reschedule and
# set the instance's status to error.
for failure in flow_failures.values():
if failure.check(*self.no_reschedule_exc_types):
LOG.error(_LE("Instance %s: create failed and no reschedule."),
instance.uuid)
return False
cause = list(flow_failures.values())[0]
try:
self._reschedule(context, cause, instance=instance, **kwargs)
return True
except exception.NimbleException:
LOG.exception(_LE("Instance %s: rescheduling failed"),
instance.uuid)
return False
class SetInstanceInfoTask(flow_utils.NimbleTask):
"""Set instance info to ironic node and validate it."""
def __init__(self, ironicclient):
requires = ['instance', 'context']
super(SetInstanceInfoTask, self).__init__(addons=[ACTION],
requires=requires)
self.ironicclient = ironicclient
# These exception types will trigger the instance info to be cleaned.
self.instance_info_cleaned_exc_types = [
exception.ValidationError,
exception.InterfacePlugException,
exception.NetworkError,
]
def execute(self, context, instance):
ironic.set_instance_info(self.ironicclient, instance)
# validate we are ready to do the deploy
validate_chk = ironic.validate_node(self.ironicclient,
instance.node_uuid)
if (not validate_chk.deploy.get('result')
or not validate_chk.power.get('result')):
raise exception.ValidationError(_(
"Ironic node: %(id)s failed to validate."
" (deploy: %(deploy)s, power: %(power)s)")
% {'id': instance.node_uuid,
'deploy': validate_chk.deploy,
'power': validate_chk.power})
def revert(self, context, result, flow_failures, instance, **kwargs):
# Check if we have a cause which needs us to clean up the ironic
# node's instance info.
for failure in flow_failures.values():
if failure.check(*self.instance_info_cleaned_exc_types):
LOG.debug("Instance %s: cleaning up node instance info",
instance.uuid)
ironic.unset_instance_info(self.ironicclient, instance)
return True
return False
class BuildNetworkTask(flow_utils.NimbleTask):
"""Build network for the instance."""
def __init__(self, ironicclient):
requires = ['instance', 'requested_networks', 'context']
super(BuildNetworkTask, self).__init__(addons=[ACTION],
requires=requires)
self.ironicclient = ironicclient
# These exception types will trigger the network to be cleaned.
self.network_cleaned_exc_types = [
exception.NetworkError,
# include instance create task failure here
exception.InstanceDeployFailure,
loopingcall.LoopingCallTimeOut,
]
def _build_networks(self, context, instance, requested_networks):
node_uuid = instance.node_uuid
ironic_ports = ironic.get_ports_from_node(self.ironicclient,
node_uuid,
detail=True)
LOG.debug('Found ports %(ports)s for node %(node)s',
          {'ports': ironic_ports, 'node': node_uuid})
if len(requested_networks) > len(ironic_ports):
raise exception.InterfacePlugException(_(
"Ironic node: %(id)s virtual to physical interface count"
" mismatch"
" (Vif count: %(vif_count)d, Pif count: %(pif_count)d)")
% {'id': instance.node_uuid,
'vif_count': len(requested_networks),
'pif_count': len(ironic_ports)})
network_info = {}
for vif in requested_networks:
for pif in ironic_ports:
# Match the specified port type with physical interface type
if vif.get('port_type') == pif.extra.get('port_type'):
try:
port = neutron.create_port(context, vif['uuid'],
pif.address, instance.uuid)
port_dict = port['port']
network_info[port_dict['id']] = {
'network': port_dict['network_id'],
'mac_address': port_dict['mac_address'],
'fixed_ips': port_dict['fixed_ips']}
ironic.plug_vif(self.ironicclient, pif.uuid,
port_dict['id'])
except Exception:
# Set network_info here, so we can clean up the created
# networks during reverting.
instance.network_info = network_info
LOG.error(_LE("Instance %s: create network failed"),
instance.uuid)
raise exception.NetworkError(_(
"Build network for instance failed."))
return network_info
def _destroy_networks(self, context, instance):
LOG.debug("unplug: instance_uuid=%(uuid)s vif=%(network_info)s",
{'uuid': instance.uuid,
'network_info': str(instance.network_info)})
ports = instance.network_info.keys()
for port in ports:
neutron.delete_port(context, port, instance.uuid)
ironic_ports = ironic.get_ports_from_node(self.ironicclient,
instance.node_uuid,
detail=True)
for pif in ironic_ports:
if 'vif_port_id' in pif.extra:
ironic.unplug_vif(self.ironicclient, pif.uuid)
def execute(self, context, instance, requested_networks):
network_info = self._build_networks(
context,
instance,
requested_networks)
instance.network_info = network_info
def revert(self, context, result, flow_failures, instance, **kwargs):
# Check if we have a cause which needs us to clean up networks.
for failure in flow_failures.values():
if failure.check(*self.network_cleaned_exc_types):
LOG.debug("Instance %s: cleaning up node networks",
instance.uuid)
if instance.network_info:
self._destroy_networks(context, instance)
# Unset network_info here as we have destroyed it.
instance.network_info = {}
return True
return False
class CreateInstanceTask(flow_utils.NimbleTask):
"""Set instance info to ironic node and validate it."""
def __init__(self, ironicclient):
requires = ['instance', 'context']
super(CreateInstanceTask, self).__init__(addons=[ACTION],
requires=requires)
self.ironicclient = ironicclient
# These exception types will trigger the instance to be cleaned.
self.instance_cleaned_exc_types = [
exception.InstanceDeployFailure,
loopingcall.LoopingCallTimeOut,
]
def _wait_for_active(self, instance):
"""Wait for the node to be marked as ACTIVE in Ironic."""
node = ironic.get_node_by_instance(self.ironicclient,
instance.uuid)
LOG.debug('Current ironic node state is %s', node.provision_state)
if node.provision_state == ironic_states.ACTIVE:
# job is done
LOG.debug("Ironic node %(node)s is now ACTIVE",
dict(node=node.uuid))
instance.status = status.ACTIVE
instance.launched_at = timeutils.utcnow()
instance.save()
raise loopingcall.LoopingCallDone()
if node.target_provision_state in (ironic_states.DELETED,
ironic_states.AVAILABLE):
# ironic is trying to delete it now
raise exception.InstanceNotFound(instance_id=instance.uuid)
if node.provision_state in (ironic_states.NOSTATE,
ironic_states.AVAILABLE):
# ironic already deleted it
raise exception.InstanceNotFound(instance_id=instance.uuid)
if node.provision_state == ironic_states.DEPLOYFAIL:
# ironic failed to deploy
msg = (_("Failed to provision instance %(inst)s: %(reason)s")
% {'inst': instance.uuid, 'reason': node.last_error})
raise exception.InstanceDeployFailure(msg)
def _build_instance(self, context, instance):
ironic.do_node_deploy(self.ironicclient, instance.node_uuid)
timer = loopingcall.FixedIntervalLoopingCall(self._wait_for_active,
instance)
timer.start(interval=CONF.ironic.api_retry_interval).wait()
LOG.info(_LI('Successfully provisioned Ironic node %s'),
instance.node_uuid)
def execute(self, context, instance):
self._build_instance(context, instance)
def revert(self, context, result, flow_failures, instance, **kwargs):
# Check if we have a cause which needs us to clean up the instance.
for failure in flow_failures.values():
if failure.check(*self.instance_cleaned_exc_types):
LOG.debug("Instance %s: destroy ironic node", instance.uuid)
ironic.destroy_node(self.ironicclient, instance.node_uuid)
return True
return False
def get_flow(context, manager, instance, requested_networks, request_spec,
filter_properties):
"""Constructs and returns the manager entrypoint flow
This flow will do the following:
1. Schedule a node to create instance
2. Set instance info to ironic node and validate it's ready to deploy
3. Build networks for the instance and set port id back to ironic port
4. Do node deploy and handle errors.
"""
flow_name = ACTION.replace(":", "_") + "_manager"
instance_flow = linear_flow.Flow(flow_name)
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
create_what = {
'context': context,
'filter_properties': filter_properties,
'request_spec': request_spec,
'instance': instance,
'requested_networks': requested_networks
}
instance_flow.add(ScheduleCreateInstanceTask(manager),
OnFailureRescheduleTask(manager.engine_rpcapi),
SetInstanceInfoTask(manager.ironicclient),
BuildNetworkTask(manager.ironicclient),
CreateInstanceTask(manager.ironicclient))
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(instance_flow, store=create_what)
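
The manager (further below) consumes get_flow() roughly as follows; a
condensed sketch assuming context, manager, instance, requested_networks,
request_spec, filter_properties and LOG are already in scope:

flow_engine = create_instance.get_flow(context, manager, instance,
                                       requested_networks, request_spec,
                                       filter_properties)
# The listener logs task state transitions; if any task raises, the engine
# runs each completed task's revert() in reverse order.
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
    flow_engine.run()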

View File

@@ -15,11 +15,10 @@
 from oslo_log import log
 import oslo_messaging as messaging
-from oslo_service import loopingcall
 from oslo_service import periodic_task
-from oslo_utils import timeutils

 from nimble.common import exception
+from nimble.common import flow_utils
 from nimble.common.i18n import _LE
 from nimble.common.i18n import _LI
 from nimble.common import neutron
@@ -27,10 +26,9 @@ from nimble.conf import CONF
 from nimble.engine.baremetal import ironic
 from nimble.engine.baremetal import ironic_states
 from nimble.engine import base_manager
+from nimble.engine.flows import create_instance
 from nimble.engine import status

-MANAGER_TOPIC = 'nimble.engine_manager'

 LOG = log.getLogger(__name__)
@@ -65,39 +63,6 @@ class EngineManager(base_manager.BaseEngineManager):
             LOG.debug('Instance has been destroyed from under us while '
                       'trying to set it to ERROR', instance=instance)

-    def _build_networks(self, context, instance, requested_networks):
-        node_uuid = instance.node_uuid
-        ironic_ports = ironic.get_ports_from_node(self.ironicclient,
-                                                  node_uuid,
-                                                  detail=True)
-        LOG.debug(_('Find ports %(ports)s for node %(node)s') %
-                  {'ports': ironic_ports, 'node': node_uuid})
-        if len(requested_networks) > len(ironic_ports):
-            raise exception.InterfacePlugException(_(
-                "Ironic node: %(id)s virtual to physical interface count"
-                " mismatch"
-                " (Vif count: %(vif_count)d, Pif count: %(pif_count)d)")
-                % {'id': instance.node_uuid,
-                   'vif_count': len(requested_networks),
-                   'pif_count': len(ironic_ports)})
-
-        network_info = {}
-        for vif in requested_networks:
-            for pif in ironic_ports:
-                # Match the specified port type with physical interface type
-                if vif.get('port_type') == pif.extra.get('port_type'):
-                    port = neutron.create_port(context, vif['uuid'],
-                                               pif.address, instance.uuid)
-                    port_dict = port['port']
-                    network_info[port_dict['id']] = {
-                        'network': port_dict['network_id'],
-                        'mac_address': port_dict['mac_address'],
-                        'fixed_ips': port_dict['fixed_ips']}
-                    ironic.plug_vif(self.ironicclient, pif.uuid,
-                                    port_dict['id'])
-        return network_info
-
     def _destroy_networks(self, context, instance):
         LOG.debug("unplug: instance_uuid=%(uuid)s vif=%(network_info)s",
                   {'uuid': instance.uuid,
@@ -114,110 +79,51 @@ class EngineManager(base_manager.BaseEngineManager):
             if 'vif_port_id' in pif.extra:
                 ironic.unplug_vif(self.ironicclient, pif.uuid)

-    def _wait_for_active(self, instance):
-        """Wait for the node to be marked as ACTIVE in Ironic."""
-        node = ironic.get_node_by_instance(self.ironicclient,
-                                           instance.uuid)
-        LOG.debug('Current ironic node state is %s', node.provision_state)
-        if node.provision_state == ironic_states.ACTIVE:
-            # job is done
-            LOG.debug("Ironic node %(node)s is now ACTIVE",
-                      dict(node=node.uuid))
-            instance.status = status.ACTIVE
-            instance.launched_at = timeutils.utcnow()
-            instance.save()
-            raise loopingcall.LoopingCallDone()
-
-        if node.target_provision_state in (ironic_states.DELETED,
-                                           ironic_states.AVAILABLE):
-            # ironic is trying to delete it now
-            raise exception.InstanceNotFound(instance_id=instance.uuid)
-
-        if node.provision_state in (ironic_states.NOSTATE,
-                                    ironic_states.AVAILABLE):
-            # ironic already deleted it
-            raise exception.InstanceNotFound(instance_id=instance.uuid)
-
-        if node.provision_state == ironic_states.DEPLOYFAIL:
-            # ironic failed to deploy
-            msg = (_("Failed to provision instance %(inst)s: %(reason)s")
-                   % {'inst': instance.uuid, 'reason': node.last_error})
-            raise exception.InstanceDeployFailure(msg)
-
-    def _build_instance(self, context, instance):
-        ironic.do_node_deploy(self.ironicclient, instance.node_uuid)
-
-        timer = loopingcall.FixedIntervalLoopingCall(self._wait_for_active,
-                                                     instance)
-        timer.start(interval=CONF.ironic.api_retry_interval).wait()
-        LOG.info(_LI('Successfully provisioned Ironic node %s'),
-                 instance.node_uuid)
-
     def _destroy_instance(self, context, instance):
         ironic.destroy_node(self.ironicclient, instance.node_uuid)
         LOG.info(_LI('Successfully destroyed Ironic node %s'),
                  instance.node_uuid)

-    def create_instance(self, context, instance,
-                        requested_networks, instance_type):
+    def create_instance(self, context, instance, requested_networks,
+                        request_spec=None, filter_properties=None):
         """Perform a deployment."""
         LOG.debug("Starting instance...")

-        # Populate request spec
-        instance_type_uuid = instance.instance_type_uuid
-        request_spec = {
-            'instance_id': instance.uuid,
-            'instance_properties': {
-                'availability_zone': instance.availability_zone,
-                'instance_type_uuid': instance_type_uuid,
-            },
-            'instance_type': dict(instance_type),
-        }
-        LOG.debug("Scheduling with request_spec: %s", request_spec)
-
-        # TODO(zhenguo): Add retry
-        filter_properties = {}
+        if filter_properties is None:
+            filter_properties = {}
+
         try:
-            top_node = self.scheduler.schedule(context,
-                                               request_spec,
-                                               self.node_cache,
-                                               filter_properties)
+            flow_engine = create_instance.get_flow(
+                context,
+                self,
+                instance,
+                requested_networks,
+                request_spec,
+                filter_properties,
+            )
+        except Exception:
+            msg = _("Create manager instance flow failed.")
+            LOG.exception(msg)
+            raise exception.NimbleException(msg)
+
+        def _run_flow():
+            # This code executes the create instance flow. If something goes
+            # wrong, the flow reverts all jobs that were done and reraises
+            # the exception. Otherwise, all data generated by the flow
+            # becomes available in the flow engine's storage.
+            with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
+                flow_engine.run()
+
+        try:
+            _run_flow()
         except exception.NoValidNode:
             self._set_instance_obj_error_state(context, instance)
-            raise exception.NoValidNode(
-                _('No valid node is found with request spec %s') %
-                request_spec)
-        instance.node_uuid = top_node
-        del self.node_cache[top_node]
-
-        ironic.set_instance_info(self.ironicclient, instance)
-        # validate we are ready to do the deploy
-        validate_chk = ironic.validate_node(self.ironicclient,
-                                            instance.node_uuid)
-        if (not validate_chk.deploy.get('result')
-                or not validate_chk.power.get('result')):
-            self._set_instance_obj_error_state(context, instance)
-            raise exception.ValidationError(_(
-                "Ironic node: %(id)s failed to validate."
-                " (deploy: %(deploy)s, power: %(power)s)")
-                % {'id': instance.node_uuid,
-                   'deploy': validate_chk.deploy,
-                   'power': validate_chk.power})
-
-        try:
-            network_info = self._build_networks(context, instance,
-                                                requested_networks)
-        except Exception:
-            self._set_instance_obj_error_state(context, instance)
-            return
-
-        instance.network_info = network_info
-
-        try:
-            self._build_instance(context, instance)
-        except Exception:
-            self._set_instance_obj_error_state(context, instance)
+            LOG.error(_LE("Creating instance %s failed: no valid node "
+                          "is found with the request spec."), instance.uuid)
+        else:
+            LOG.info(_LI("Created instance %s successfully."), instance.uuid)
+        finally:
+            return instance

     def delete_instance(self, context, instance):
         """Delete an instance."""

View File

@@ -18,8 +18,8 @@ Client side of the engine RPC API.
 from oslo_config import cfg
 import oslo_messaging as messaging

+from nimble.common import constants
 from nimble.common import rpc
-from nimble.engine import manager
 from nimble.objects import base as objects_base

 CONF = cfg.CONF
@@ -40,7 +40,7 @@ class EngineAPI(object):
         super(EngineAPI, self).__init__()
         self.topic = topic
         if self.topic is None:
-            self.topic = manager.MANAGER_TOPIC
+            self.topic = constants.MANAGER_TOPIC

         target = messaging.Target(topic=self.topic,
                                   version='1.0')
@@ -49,13 +49,14 @@ class EngineAPI(object):
                                       version_cap=self.RPC_API_VERSION,
                                       serializer=serializer)

-    def create_instance(self, context, instance,
-                        requested_networks, instance_type):
+    def create_instance(self, context, instance, requested_networks,
+                        request_spec, filter_properties):
         """Signal to engine service to perform a deployment."""
         cctxt = self.client.prepare(topic=self.topic, server=CONF.host)
         return cctxt.cast(context, 'create_instance', instance=instance,
                           requested_networks=requested_networks,
-                          instance_type=instance_type)
+                          request_spec=request_spec,
+                          filter_properties=filter_properties)

     def delete_instance(self, context, instance):
         """Signal to engine service to delete an instance."""

View File

@@ -86,7 +86,7 @@ class FilterScheduler(driver.Scheduler):
                      'last_node': last_node,
                      'exc': exc})

-    def _populate_retry(self, filter_properties, properties):
+    def _populate_retry(self, filter_properties, request_spec):
         """Populate filter properties with history of retries for request.

         If maximum retries is exceeded, raise NoValidNode.
@@ -108,13 +108,13 @@ class FilterScheduler(driver.Scheduler):
             }
             filter_properties['retry'] = retry

-        instance_id = properties.get('instance_id')
+        instance_id = request_spec.get('instance_id')

         self._log_instance_error(instance_id, retry)

         if retry['num_attempts'] > max_attempts:
             raise exception.NoValidNode(
-                reason=_("Exceeded max scheduling attempts %(max_attempts)d "
-                         "for instance %(instance_id)s") %
+                _("Exceeded max scheduling attempts %(max_attempts)d "
+                  "for instance %(instance_id)s") %
                 {'max_attempts': max_attempts,
                  'instance_id': instance_id})
@@ -133,13 +133,11 @@ class FilterScheduler(driver.Scheduler):
         if filter_properties is None:
             filter_properties = {}

-        self._populate_retry(filter_properties,
-                             request_spec['instance_properties'])
+        self._populate_retry(filter_properties, request_spec)

         request_spec_dict = jsonutils.to_primitive(request_spec)

-        filter_properties.update({'context': context,
-                                  'request_spec': request_spec_dict,
+        filter_properties.update({'request_spec': request_spec_dict,
                                   'config_options': config_options,
                                   'instance_type': instance_type,
                                   'resource_type': resource_type})
@@ -177,7 +175,7 @@ class FilterScheduler(driver.Scheduler):
             LOG.warning(_LW('No weighed nodes found for instance '
                             'with properties: %s'),
                         request_spec.get('instance_type'))
-            raise exception.NoValidNode(reason=_("No weighed nodes available"))
+            raise exception.NoValidNode(_("No weighed nodes available"))

         top_node = self._choose_top_node(weighed_nodes, request_spec)
         self._add_retry_node(filter_properties, top_node)
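
The retry bookkeeping kept in filter_properties evolves roughly like this
across reschedules (an approximate sketch; the real max_attempts comes from
the scheduler configuration):

filter_properties = {}
max_attempts = 3  # stand-in for the configured maximum

for _attempt in range(4):
    retry = filter_properties.get('retry')
    if retry:
        retry['num_attempts'] += 1
    else:
        retry = {'num_attempts': 1, 'nodes': []}  # nodes already tried
        filter_properties['retry'] = retry
    if retry['num_attempts'] > max_attempts:
        print('NoValidNode: exceeded %d scheduling attempts' % max_attempts)
        break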

View File

@@ -16,6 +16,7 @@ from oslo_config import cfg
 import oslo_messaging
 from oslo_service import service as base_service

+from nimble.common import constants
 from nimble.common import exception
 from nimble.common import rpc
 from nimble.common import service
@@ -34,7 +35,8 @@ class TestRPCService(base.TestCase):
         host = "fake_host"
         mgr_module = "nimble.engine.manager"
         mgr_class = "EngineManager"
-        self.rpc_svc = service.RPCService(host, mgr_module, mgr_class)
+        self.rpc_svc = service.RPCService(host, mgr_module, mgr_class,
+                                          constants.MANAGER_TOPIC)

     @mock.patch.object(oslo_messaging, 'Target', autospec=True)
     @mock.patch.object(objects_base, 'NimbleObjectSerializer', autospec=True)

View File

@@ -0,0 +1,96 @@
# Copyright 2016 Huawei Technologies Co.,LTD.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" Tests for create_instance TaskFlow """
import mock
from oslo_utils import uuidutils
from nimble.common import context
from nimble.engine.baremetal import ironic
from nimble.engine.flows import create_instance
from nimble.engine.scheduler import filter_scheduler as scheduler
from nimble.tests import base
from nimble.tests.unit.objects import utils as obj_utils
class CreateInstanceFlowTestCase(base.TestCase):
def setUp(self):
super(CreateInstanceFlowTestCase, self).setUp()
self.ctxt = context.get_admin_context()
@mock.patch.object(scheduler.FilterScheduler, 'schedule')
def test_schedule_task_execute(self, mock_schedule):
fake_uuid = uuidutils.generate_uuid()
fake_engine_manager = mock.MagicMock()
fake_request_spec = mock.MagicMock()
fake_filter_props = mock.MagicMock()
fake_engine_manager.scheduler = scheduler.FilterScheduler()
task = create_instance.ScheduleCreateInstanceTask(
fake_engine_manager)
instance_obj = obj_utils.get_test_instance(self.ctxt)
mock_schedule.return_value = fake_uuid
task.execute(self.ctxt,
instance_obj,
fake_request_spec,
fake_filter_props)
mock_schedule.assert_called_once_with(self.ctxt,
fake_request_spec,
fake_engine_manager.node_cache,
fake_filter_props)
self.assertEqual(fake_uuid, instance_obj.node_uuid)
@mock.patch.object(ironic, 'validate_node')
@mock.patch.object(ironic, 'set_instance_info')
def test_set_instance_info_task_execute(self, mock_set_inst,
mock_validate):
fake_ironicclient = mock.MagicMock()
task = create_instance.SetInstanceInfoTask(
fake_ironicclient)
instance_obj = obj_utils.get_test_instance(self.ctxt)
mock_set_inst.side_effect = None
mock_validate.side_effect = None
task.execute(self.ctxt, instance_obj)
mock_set_inst.assert_called_once_with(fake_ironicclient,
instance_obj)
mock_validate.assert_called_once_with(fake_ironicclient,
instance_obj.node_uuid)
@mock.patch.object(create_instance.BuildNetworkTask, '_build_networks')
def test_create_network_task_execute(self, mock_build_networks):
fake_ironicclient = mock.MagicMock()
fake_requested_networks = mock.MagicMock()
task = create_instance.BuildNetworkTask(
fake_ironicclient)
instance_obj = obj_utils.get_test_instance(self.ctxt)
mock_build_networks.side_effect = None
task.execute(self.ctxt, instance_obj, fake_requested_networks)
mock_build_networks.assert_called_once_with(self.ctxt,
instance_obj,
fake_requested_networks)
@mock.patch.object(create_instance.CreateInstanceTask, '_build_instance')
def test_create_instance_task_execute(self, mock_build_inst):
fake_ironicclient = mock.MagicMock()
task = create_instance.CreateInstanceTask(
fake_ironicclient)
instance_obj = obj_utils.get_test_instance(self.ctxt)
mock_build_inst.side_effect = None
task.execute(self.ctxt, instance_obj)
mock_build_inst.assert_called_once_with(self.ctxt, instance_obj)

View File

@@ -16,16 +16,12 @@
 """Test class for Nimble ManagerService."""

 import mock
-from oslo_service import loopingcall

 from nimble.common import neutron
 from nimble.engine.baremetal import ironic
 from nimble.engine.baremetal import ironic_states
 from nimble.engine import manager
-from nimble.engine.scheduler import filter_scheduler as scheduler
-from nimble import objects
 from nimble.tests.unit.db import base as tests_db_base
-from nimble.tests.unit.db import utils as db_utils
 from nimble.tests.unit.engine import mgr_utils
 from nimble.tests.unit.objects import utils as obj_utils
@@ -34,53 +30,6 @@ from nimble.tests.unit.objects import utils as obj_utils
 class ManageInstanceTestCase(mgr_utils.ServiceSetUpMixin,
                              tests_db_base.DbTestCase):

-    @mock.patch.object(manager.EngineManager, '_wait_for_active')
-    @mock.patch.object(ironic, 'do_node_deploy')
-    def test__build_instance(self, deploy_node_mock, wait_mock,
-                             refresh_cache_mock):
-        instance = obj_utils.create_test_instance(self.context)
-        deploy_node_mock.side_effect = None
-        wait_mock.side_effect = loopingcall.LoopingCallDone()
-        refresh_cache_mock.side_effect = None
-        self._start_service()
-
-        self.service._build_instance(self.context, instance)
-        self._stop_service()
-
-        deploy_node_mock.assert_called_once_with(mock.ANY, instance.node_uuid)
-
-    @mock.patch.object(manager.EngineManager, '_build_instance')
-    @mock.patch.object(manager.EngineManager, '_build_networks')
-    @mock.patch.object(ironic, 'validate_node')
-    @mock.patch.object(ironic, 'set_instance_info')
-    @mock.patch.object(scheduler.FilterScheduler, 'schedule')
-    def test_create_instance(self, schedule_mock, set_inst_mock,
-                             validate_mock, build_net_mock,
-                             build_inst_mock, refresh_cache_mock):
-        instance = obj_utils.create_test_instance(self.context)
-        fake_type = db_utils.get_test_instance_type(context=self.context)
-        fake_type['extra_specs'] = {}
-        inst_type = objects.InstanceType(self.context, **fake_type)
-        schedule_mock.return_value = 'fake-node'
-        set_inst_mock.side_effect = None
-        validate_mock.side_effect = None
-        build_net_mock.side_effect = None
-        build_inst_mock.side_effect = None
-        refresh_cache_mock.side_effect = None
-        requested_net = [{'uuid': 'fake-net-uuid'}]
-        self._start_service()
-
-        self.service.node_cache = {'fake-node': 'node'}
-        self.service.create_instance(self.context, instance,
-                                     requested_net, inst_type)
-        self._stop_service()
-
-        set_inst_mock.assert_called_once_with(mock.ANY, instance)
-        validate_mock.assert_called_once_with(mock.ANY, instance.node_uuid)
-        build_net_mock.assert_called_once_with(self.context, instance,
-                                               requested_net)
-        build_inst_mock.assert_called_once_with(self.context, instance)
-
     @mock.patch.object(ironic, 'unplug_vif')
     @mock.patch.object(ironic, 'get_ports_from_node')
     @mock.patch.object(neutron, 'delete_port')

View File

@@ -107,7 +107,8 @@ class RPCAPITestCase(base.DbTestCase):
                           version='1.0',
                           instance=self.fake_instance_obj,
                           requested_networks=[],
-                          instance_type=self.fake_type_obj)
+                          request_spec=None,
+                          filter_properties=None)

     def test_delete_instance(self):
         self._test_rpcapi('delete_instance',

View File

@@ -26,6 +26,7 @@ oslo.utils>=3.18.0 # Apache-2.0
 oslo.versionedobjects>=1.17.0 # Apache-2.0
 pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
 six>=1.9.0 # MIT
+taskflow>=1.26.0 # Apache-2.0
 WSME>=0.8 # MIT
 keystonemiddleware!=4.5.0,>=4.2.0 # Apache-2.0
 stevedore>=1.17.1 # Apache-2.0