start
This commit is contained in:
0
iotronic/conductor/__init__.py
Normal file
0
iotronic/conductor/__init__.py
Normal file
2195
iotronic/conductor/__old/manager.py
Normal file
2195
iotronic/conductor/__old/manager.py
Normal file
File diff suppressed because it is too large
Load Diff
362
iotronic/conductor/__old/task_manager.py
Normal file
362
iotronic/conductor/__old/task_manager.py
Normal file
@@ -0,0 +1,362 @@
|
||||
# coding=utf-8
|
||||
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A context manager to perform a series of tasks on a set of resources.
|
||||
|
||||
:class:`TaskManager` is a context manager, created on-demand to allow
|
||||
synchronized access to a node and its resources.
|
||||
|
||||
The :class:`TaskManager` will, by default, acquire an exclusive lock on
|
||||
a node for the duration that the TaskManager instance exists. You may
|
||||
create a TaskManager instance without locking by passing "shared=True"
|
||||
when creating it, but certain operations on the resources held by such
|
||||
an instance of TaskManager will not be possible. Requiring this exclusive
|
||||
lock guards against parallel operations interfering with each other.
|
||||
|
||||
A shared lock is useful when performing non-interfering operations,
|
||||
such as validating the driver interfaces.
|
||||
|
||||
An exclusive lock is stored in the database to coordinate between
|
||||
:class:`iotronic.iotconductor.manager` instances, that are typically deployed on
|
||||
different hosts.
|
||||
|
||||
:class:`TaskManager` methods, as well as driver methods, may be decorated to
|
||||
determine whether their invocation requires an exclusive lock.
|
||||
|
||||
The TaskManager instance exposes certain node resources and properties as
|
||||
attributes that you may access:
|
||||
|
||||
task.context
|
||||
The context passed to TaskManager()
|
||||
task.shared
|
||||
False if Node is locked, True if it is not locked. (The
|
||||
'shared' kwarg arg of TaskManager())
|
||||
task.node
|
||||
The Node object
|
||||
task.ports
|
||||
Ports belonging to the Node
|
||||
task.driver
|
||||
The Driver for the Node, or the Driver based on the
|
||||
'driver_name' kwarg of TaskManager().
|
||||
|
||||
Example usage:
|
||||
|
||||
::
|
||||
|
||||
with task_manager.acquire(context, node_id) as task:
|
||||
task.driver.power.power_on(task.node)
|
||||
|
||||
If you need to execute task-requiring code in a background thread, the
|
||||
TaskManager instance provides an interface to handle this for you, making
|
||||
sure to release resources when the thread finishes (successfully or if
|
||||
an exception occurs). Common use of this is within the Manager like so:
|
||||
|
||||
::
|
||||
|
||||
with task_manager.acquire(context, node_id) as task:
|
||||
<do some work>
|
||||
task.spawn_after(self._spawn_worker,
|
||||
utils.node_power_action, task, new_state)
|
||||
|
||||
All exceptions that occur in the current GreenThread as part of the
|
||||
spawn handling are re-raised. You can specify a hook to execute custom
|
||||
code when such exceptions occur. For example, the hook is a more elegant
|
||||
solution than wrapping the "with task_manager.acquire()" with a
|
||||
try..exception block. (Note that this hook does not handle exceptions
|
||||
raised in the background thread.):
|
||||
|
||||
::
|
||||
|
||||
def on_error(e):
|
||||
if isinstance(e, Exception):
|
||||
...
|
||||
|
||||
with task_manager.acquire(context, node_id) as task:
|
||||
<do some work>
|
||||
task.set_spawn_error_hook(on_error)
|
||||
task.spawn_after(self._spawn_worker,
|
||||
utils.node_power_action, task, new_state)
|
||||
|
||||
"""
|
||||
|
||||
import functools
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
import retrying
|
||||
|
||||
from iotronic.common import driver_factory
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.common import states
|
||||
from iotronic import objects
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def require_exclusive_lock(f):
|
||||
"""Decorator to require an exclusive lock.
|
||||
|
||||
Decorated functions must take a :class:`TaskManager` as the first
|
||||
parameter. Decorated class methods should take a :class:`TaskManager`
|
||||
as the first parameter after "self".
|
||||
|
||||
"""
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
task = args[0] if isinstance(args[0], TaskManager) else args[1]
|
||||
if task.shared:
|
||||
raise exception.ExclusiveLockRequired()
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def acquire(context, node_id, shared=False, driver_name=None):
|
||||
"""Shortcut for acquiring a lock on a Node.
|
||||
|
||||
:param context: Request context.
|
||||
:param node_id: ID or UUID of node to lock.
|
||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||
lock. Default: False.
|
||||
:param driver_name: Name of Driver. Default: None.
|
||||
:returns: An instance of :class:`TaskManager`.
|
||||
|
||||
"""
|
||||
return TaskManager(context, node_id, shared=shared,
|
||||
driver_name=driver_name)
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
"""Context manager for tasks.
|
||||
|
||||
This class wraps the locking, driver loading, and acquisition
|
||||
of related resources (eg, Node and Ports) when beginning a unit of work.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, context, node_id, shared=False, driver_name=None):
|
||||
"""Create a new TaskManager.
|
||||
|
||||
Acquire a lock on a node. The lock can be either shared or
|
||||
exclusive. Shared locks may be used for read-only or
|
||||
non-disruptive actions only, and must be considerate to what
|
||||
other threads may be doing on the same node at the same time.
|
||||
|
||||
:param context: request context
|
||||
:param node_id: ID or UUID of node to lock.
|
||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||
lock. Default: False.
|
||||
:param driver_name: The name of the driver to load, if different
|
||||
from the Node's current driver.
|
||||
:raises: DriverNotFound
|
||||
:raises: NodeNotFound
|
||||
:raises: NodeLocked
|
||||
|
||||
"""
|
||||
|
||||
self._spawn_method = None
|
||||
self._on_error_method = None
|
||||
|
||||
self.context = context
|
||||
self.node = None
|
||||
self.shared = shared
|
||||
|
||||
self.fsm = states.machine.copy()
|
||||
|
||||
# NodeLocked exceptions can be annoying. Let's try to alleviate
|
||||
# some of that pain by retrying our lock attempts. The retrying
|
||||
# module expects a wait_fixed value in milliseconds.
|
||||
@retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(e, exception.NodeLocked),
|
||||
stop_max_attempt_number=CONF.conductor.node_locked_retry_attempts,
|
||||
wait_fixed=CONF.conductor.node_locked_retry_interval * 1000)
|
||||
def reserve_node():
|
||||
LOG.debug("Attempting to reserve node %(node)s",
|
||||
{'node': node_id})
|
||||
self.node = objects.Node.reserve(context, CONF.host, node_id)
|
||||
|
||||
try:
|
||||
if not self.shared:
|
||||
reserve_node()
|
||||
else:
|
||||
self.node = objects.Node.get(context, node_id)
|
||||
#self.ports = objects.Port.list_by_node_id(context, self.node.id)
|
||||
#self.driver = driver_factory.get_driver(driver_name or
|
||||
# self.node.driver)
|
||||
|
||||
# NOTE(deva): this handles the Juno-era NOSTATE state
|
||||
# and should be deleted after Kilo is released
|
||||
'''
|
||||
if self.node.provision_state is states.NOSTATE:
|
||||
self.node.provision_state = states.AVAILABLE
|
||||
self.node.save()
|
||||
|
||||
self.fsm.initialize(self.node.provision_state)
|
||||
'''
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.release_resources()
|
||||
|
||||
def spawn_after(self, _spawn_method, *args, **kwargs):
|
||||
"""Call this to spawn a thread to complete the task.
|
||||
|
||||
The specified method will be called when the TaskManager instance
|
||||
exits.
|
||||
|
||||
:param _spawn_method: a method that returns a GreenThread object
|
||||
:param args: args passed to the method.
|
||||
:param kwargs: additional kwargs passed to the method.
|
||||
|
||||
"""
|
||||
self._spawn_method = _spawn_method
|
||||
self._spawn_args = args
|
||||
self._spawn_kwargs = kwargs
|
||||
|
||||
def set_spawn_error_hook(self, _on_error_method, *args, **kwargs):
|
||||
"""Create a hook to handle exceptions when spawning a task.
|
||||
|
||||
Create a hook that gets called upon an exception being raised
|
||||
from spawning a background thread to do a task.
|
||||
|
||||
:param _on_error_method: a callable object, it's first parameter
|
||||
should accept the Exception object that was raised.
|
||||
:param args: additional args passed to the callable object.
|
||||
:param kwargs: additional kwargs passed to the callable object.
|
||||
|
||||
"""
|
||||
self._on_error_method = _on_error_method
|
||||
self._on_error_args = args
|
||||
self._on_error_kwargs = kwargs
|
||||
|
||||
def release_resources(self):
|
||||
"""Unlock a node and release resources.
|
||||
|
||||
If an exclusive lock is held, unlock the node. Reset attributes
|
||||
to make it clear that this instance of TaskManager should no
|
||||
longer be accessed.
|
||||
"""
|
||||
|
||||
if not self.shared:
|
||||
try:
|
||||
if self.node:
|
||||
objects.Node.release(self.context, CONF.host, self.node.id)
|
||||
except exception.NodeNotFound:
|
||||
# squelch the exception if the node was deleted
|
||||
# within the task's context.
|
||||
pass
|
||||
self.node = None
|
||||
self.driver = None
|
||||
self.ports = None
|
||||
self.fsm = None
|
||||
|
||||
def _thread_release_resources(self, t):
|
||||
"""Thread.link() callback to release resources."""
|
||||
self.release_resources()
|
||||
|
||||
def process_event(self, event, callback=None, call_args=None,
|
||||
call_kwargs=None, err_handler=None):
|
||||
"""Process the given event for the task's current state.
|
||||
|
||||
:param event: the name of the event to process
|
||||
:param callback: optional callback to invoke upon event transition
|
||||
:param call_args: optional \*args to pass to the callback method
|
||||
:param call_kwargs: optional \**kwargs to pass to the callback method
|
||||
:param err_handler: optional error handler to invoke if the
|
||||
callback fails, eg. because there are no workers available
|
||||
(err_handler should accept arguments node, prev_prov_state, and
|
||||
prev_target_state)
|
||||
:raises: InvalidState if the event is not allowed by the associated
|
||||
state machine
|
||||
"""
|
||||
# Advance the state model for the given event. Note that this doesn't
|
||||
# alter the node in any way. This may raise InvalidState, if this event
|
||||
# is not allowed in the current state.
|
||||
self.fsm.process_event(event)
|
||||
|
||||
# stash current states in the error handler if callback is set,
|
||||
# in case we fail to get a worker from the pool
|
||||
if err_handler and callback:
|
||||
self.set_spawn_error_hook(err_handler, self.node,
|
||||
self.node.provision_state,
|
||||
self.node.target_provision_state)
|
||||
|
||||
self.node.provision_state = self.fsm.current_state
|
||||
self.node.target_provision_state = self.fsm.target_state
|
||||
|
||||
# set up the async worker
|
||||
if callback:
|
||||
# clear the error if we're going to start work in a callback
|
||||
self.node.last_error = None
|
||||
if call_args is None:
|
||||
call_args = ()
|
||||
if call_kwargs is None:
|
||||
call_kwargs = {}
|
||||
self.spawn_after(callback, *call_args, **call_kwargs)
|
||||
|
||||
# publish the state transition by saving the Node
|
||||
self.node.save()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type is None and self._spawn_method is not None:
|
||||
# Spawn a worker to complete the task
|
||||
# The linked callback below will be called whenever:
|
||||
# - background task finished with no errors.
|
||||
# - background task has crashed with exception.
|
||||
# - callback was added after the background task has
|
||||
# finished or crashed. While eventlet currently doesn't
|
||||
# schedule the new thread until the current thread blocks
|
||||
# for some reason, this is true.
|
||||
# All of the above are asserted in tests such that we'll
|
||||
# catch if eventlet ever changes this behavior.
|
||||
thread = None
|
||||
try:
|
||||
thread = self._spawn_method(*self._spawn_args,
|
||||
**self._spawn_kwargs)
|
||||
|
||||
# NOTE(comstud): Trying to use a lambda here causes
|
||||
# the callback to not occur for some reason. This
|
||||
# also makes it easier to test.
|
||||
thread.link(self._thread_release_resources)
|
||||
# Don't unlock! The unlock will occur when the
|
||||
# thread finshes.
|
||||
return
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
try:
|
||||
# Execute the on_error hook if set
|
||||
if self._on_error_method:
|
||||
self._on_error_method(e, *self._on_error_args,
|
||||
**self._on_error_kwargs)
|
||||
except Exception:
|
||||
LOG.warning(_LW("Task's on_error hook failed to "
|
||||
"call %(method)s on node %(node)s"),
|
||||
{'method': self._on_error_method.__name__,
|
||||
'node': self.node.uuid})
|
||||
|
||||
if thread is not None:
|
||||
# This means the link() failed for some
|
||||
# reason. Nuke the thread.
|
||||
thread.cancel()
|
||||
self.release_resources()
|
||||
self.release_resources()
|
||||
160
iotronic/conductor/__old/utils.py
Normal file
160
iotronic/conductor/__old/utils.py
Normal file
@@ -0,0 +1,160 @@
|
||||
# coding=utf-8
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import excutils
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _
|
||||
from iotronic.common.i18n import _LI
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.common import states
|
||||
from iotronic.conductor import task_manager
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def node_set_boot_device(task, device, persistent=False):
|
||||
"""Set the boot device for a node.
|
||||
|
||||
:param task: a TaskManager instance.
|
||||
:param device: Boot device. Values are vendor-specific.
|
||||
:param persistent: Whether to set next-boot, or make the change
|
||||
permanent. Default: False.
|
||||
:raises: InvalidParameterValue if the validation of the
|
||||
ManagementInterface fails.
|
||||
|
||||
"""
|
||||
if getattr(task.driver, 'management', None):
|
||||
task.driver.management.validate(task)
|
||||
task.driver.management.set_boot_device(task,
|
||||
device=device,
|
||||
persistent=persistent)
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def node_power_action(task, new_state):
|
||||
"""Change power state or reset for a node.
|
||||
|
||||
Perform the requested power action if the transition is required.
|
||||
|
||||
:param task: a TaskManager instance containing the node to act on.
|
||||
:param new_state: Any power state from iotronic.common.states. If the
|
||||
state is 'REBOOT' then a reboot will be attempted, otherwise
|
||||
the node power state is directly set to 'state'.
|
||||
:raises: InvalidParameterValue when the wrong state is specified
|
||||
or the wrong driver info is specified.
|
||||
:raises: other exceptions by the node's power driver if something
|
||||
wrong occurred during the power action.
|
||||
|
||||
"""
|
||||
node = task.node
|
||||
target_state = states.POWER_ON if new_state == states.REBOOT else new_state
|
||||
|
||||
if new_state != states.REBOOT:
|
||||
try:
|
||||
curr_state = task.driver.power.get_power_state(task)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
node['last_error'] = _(
|
||||
"Failed to change power state to '%(target)s'. "
|
||||
"Error: %(error)s") % {'target': new_state, 'error': e}
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
|
||||
if curr_state == new_state:
|
||||
# Neither the iotronic service nor the hardware has erred. The
|
||||
# node is, for some reason, already in the requested state,
|
||||
# though we don't know why. eg, perhaps the user previously
|
||||
# requested the node POWER_ON, the network delayed those IPMI
|
||||
# packets, and they are trying again -- but the node finally
|
||||
# responds to the first request, and so the second request
|
||||
# gets to this check and stops.
|
||||
# This isn't an error, so we'll clear last_error field
|
||||
# (from previous operation), log a warning, and return.
|
||||
node['last_error'] = None
|
||||
# NOTE(dtantsur): under rare conditions we can get out of sync here
|
||||
node['power_state'] = new_state
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
LOG.warn(_LW("Not going to change_node_power_state because "
|
||||
"current state = requested state = '%(state)s'."),
|
||||
{'state': curr_state})
|
||||
return
|
||||
|
||||
if curr_state == states.ERROR:
|
||||
# be optimistic and continue action
|
||||
LOG.warn(_LW("Driver returns ERROR power state for node %s."),
|
||||
node.uuid)
|
||||
|
||||
# Set the target_power_state and clear any last_error, if we're
|
||||
# starting a new operation. This will expose to other processes
|
||||
# and clients that work is in progress.
|
||||
if node['target_power_state'] != target_state:
|
||||
node['target_power_state'] = target_state
|
||||
node['last_error'] = None
|
||||
node.save()
|
||||
|
||||
# take power action
|
||||
try:
|
||||
if new_state != states.REBOOT:
|
||||
task.driver.power.set_power_state(task, new_state)
|
||||
else:
|
||||
task.driver.power.reboot(task)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
node['last_error'] = _(
|
||||
"Failed to change power state to '%(target)s'. "
|
||||
"Error: %(error)s") % {'target': target_state, 'error': e}
|
||||
else:
|
||||
# success!
|
||||
node['power_state'] = target_state
|
||||
LOG.info(_LI('Successfully set node %(node)s power state to '
|
||||
'%(state)s.'),
|
||||
{'node': node.uuid, 'state': target_state})
|
||||
finally:
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def cleanup_after_timeout(task):
|
||||
"""Cleanup deploy task after timeout.
|
||||
|
||||
:param task: a TaskManager instance.
|
||||
"""
|
||||
node = task.node
|
||||
msg = (_('Timeout reached while waiting for callback for node %s')
|
||||
% node.uuid)
|
||||
node.last_error = msg
|
||||
LOG.error(msg)
|
||||
node.save()
|
||||
|
||||
error_msg = _('Cleanup failed for node %(node)s after deploy timeout: '
|
||||
' %(error)s')
|
||||
try:
|
||||
task.driver.deploy.clean_up(task)
|
||||
except exception.IotronicException as e:
|
||||
msg = error_msg % {'node': node.uuid, 'error': e}
|
||||
LOG.error(msg)
|
||||
node.last_error = msg
|
||||
node.save()
|
||||
except Exception as e:
|
||||
msg = error_msg % {'node': node.uuid, 'error': e}
|
||||
LOG.error(msg)
|
||||
node.last_error = _('Deploy timed out, but an unhandled exception was '
|
||||
'encountered while aborting. More info may be '
|
||||
'found in the log file.')
|
||||
node.save()
|
||||
2269
iotronic/conductor/manager.py
Normal file
2269
iotronic/conductor/manager.py
Normal file
File diff suppressed because it is too large
Load Diff
519
iotronic/conductor/rpcapi.py
Normal file
519
iotronic/conductor/rpcapi.py
Normal file
@@ -0,0 +1,519 @@
|
||||
# coding=utf-8
|
||||
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""
|
||||
Client side of the conductor RPC API.
|
||||
"""
|
||||
|
||||
import random
|
||||
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic.common import hash_ring
|
||||
from iotronic.common.i18n import _
|
||||
from iotronic.common import rpc
|
||||
from iotronic.conductor import manager
|
||||
from iotronic.objects import base as objects_base
|
||||
|
||||
|
||||
class ConductorAPI(object):
|
||||
"""Client side of the conductor RPC API.
|
||||
|
||||
API version history:
|
||||
| 1.0 - Initial version.
|
||||
"""
|
||||
|
||||
# NOTE(rloo): This must be in sync with manager.ConductorManager's.
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, topic=None):
|
||||
super(ConductorAPI, self).__init__()
|
||||
self.topic = topic
|
||||
if self.topic is None:
|
||||
self.topic = manager.MANAGER_TOPIC
|
||||
|
||||
target = messaging.Target(topic=self.topic,
|
||||
version='1.0')
|
||||
serializer = objects_base.IotronicObjectSerializer()
|
||||
self.client = rpc.get_client(target,
|
||||
version_cap=self.RPC_API_VERSION,
|
||||
serializer=serializer)
|
||||
# NOTE(deva): this is going to be buggy
|
||||
self.ring_manager = hash_ring.HashRingManager()
|
||||
|
||||
def get_topic_for(self, node):
|
||||
"""Get the RPC topic for the conductor service the node is mapped to.
|
||||
|
||||
:param node: a node object.
|
||||
:returns: an RPC topic string.
|
||||
:raises: NoValidHost
|
||||
|
||||
"""
|
||||
'''
|
||||
self.ring_manager.reset()
|
||||
|
||||
try:
|
||||
ring = self.ring_manager[node.driver]
|
||||
dest = ring.get_hosts(node.uuid)
|
||||
return self.topic + "." + dest[0]
|
||||
except exception.DriverNotFound:
|
||||
reason = (_('No conductor service registered which supports '
|
||||
'driver %s.') % node.driver)
|
||||
raise exception.NoValidHost(reason=reason)
|
||||
'''
|
||||
|
||||
pass
|
||||
|
||||
def get_topic_for_driver(self, driver_name):
|
||||
"""Get RPC topic name for a conductor supporting the given driver.
|
||||
|
||||
The topic is used to route messages to the conductor supporting
|
||||
the specified driver. A conductor is selected at random from the
|
||||
set of qualified conductors.
|
||||
|
||||
:param driver_name: the name of the driver to route to.
|
||||
:returns: an RPC topic string.
|
||||
:raises: DriverNotFound
|
||||
|
||||
"""
|
||||
self.ring_manager.reset()
|
||||
|
||||
hash_ring = self.ring_manager[driver_name]
|
||||
host = random.choice(list(hash_ring.hosts))
|
||||
return self.topic + "." + host
|
||||
|
||||
def update_node(self, context, node_obj, topic=None):
|
||||
"""Synchronously, have a conductor update the node's information.
|
||||
|
||||
Update the node's information in the database and return a node object.
|
||||
The conductor will lock the node while it validates the supplied
|
||||
information. If driver_info is passed, it will be validated by
|
||||
the core drivers. If instance_uuid is passed, it will be set or unset
|
||||
only if the node is properly configured.
|
||||
|
||||
Note that power_state should not be passed via this method.
|
||||
Use change_node_power_state for initiating driver actions.
|
||||
|
||||
:param context: request context.
|
||||
:param node_obj: a changed (but not saved) node object.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: updated node object, including all fields.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.1')
|
||||
return cctxt.call(context, 'update_node', node_obj=node_obj)
|
||||
|
||||
def change_node_power_state(self, context, node_id, new_state, topic=None):
|
||||
"""Change a node's power state.
|
||||
|
||||
Synchronously, acquire lock and start the conductor background task
|
||||
to change power state of a node.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param new_state: one of iotronic.common.states power state values
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.6')
|
||||
return cctxt.call(context, 'change_node_power_state', node_id=node_id,
|
||||
new_state=new_state)
|
||||
|
||||
def vendor_passthru(self, context, node_id, driver_method, http_method,
|
||||
info, topic=None):
|
||||
"""Receive requests for vendor-specific actions.
|
||||
|
||||
Synchronously validate driver specific info or get driver status,
|
||||
and if successful invokes the vendor method. If the method mode
|
||||
is async the conductor will start background worker to perform
|
||||
vendor action.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param driver_method: name of method for driver.
|
||||
:param http_method: the HTTP method used for the request.
|
||||
:param info: info for node driver.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: InvalidParameterValue if supplied info is not valid.
|
||||
:raises: MissingParameterValue if a required parameter is missing
|
||||
:raises: UnsupportedDriverExtension if current driver does not have
|
||||
vendor interface.
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:returns: A tuple containing the response of the invoked method
|
||||
and a boolean value indicating whether the method was
|
||||
invoked asynchronously (True) or synchronously (False).
|
||||
If invoked asynchronously the response field will be
|
||||
always None.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.20')
|
||||
return cctxt.call(context, 'vendor_passthru', node_id=node_id,
|
||||
driver_method=driver_method,
|
||||
http_method=http_method,
|
||||
info=info)
|
||||
|
||||
def driver_vendor_passthru(self, context, driver_name, driver_method,
|
||||
http_method, info, topic=None):
|
||||
"""Pass vendor-specific calls which don't specify a node to a driver.
|
||||
|
||||
Handles driver-level vendor passthru calls. These calls don't
|
||||
require a node UUID and are executed on a random conductor with
|
||||
the specified driver. If the method mode is async the conductor
|
||||
will start background worker to perform vendor action.
|
||||
|
||||
:param context: request context.
|
||||
:param driver_name: name of the driver on which to call the method.
|
||||
:param driver_method: name of the vendor method, for use by the driver.
|
||||
:param http_method: the HTTP method used for the request.
|
||||
:param info: data to pass through to the driver.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: InvalidParameterValue for parameter errors.
|
||||
:raises: MissingParameterValue if a required parameter is missing
|
||||
:raises: UnsupportedDriverExtension if the driver doesn't have a vendor
|
||||
interface, or if the vendor interface does not support the
|
||||
specified driver_method.
|
||||
:raises: DriverNotFound if the supplied driver is not loaded.
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
:returns: A tuple containing the response of the invoked method
|
||||
and a boolean value indicating whether the method was
|
||||
invoked asynchronously (True) or synchronously (False).
|
||||
If invoked asynchronously the response field will be
|
||||
always None.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.20')
|
||||
return cctxt.call(context, 'driver_vendor_passthru',
|
||||
driver_name=driver_name,
|
||||
driver_method=driver_method,
|
||||
http_method=http_method,
|
||||
info=info)
|
||||
|
||||
def get_node_vendor_passthru_methods(self, context, node_id, topic=None):
|
||||
"""Retrieve information about vendor methods of the given node.
|
||||
|
||||
:param context: an admin context.
|
||||
:param node_id: the id or uuid of a node.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: dictionary of <method name>:<method metadata> entries.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.21')
|
||||
return cctxt.call(context, 'get_node_vendor_passthru_methods',
|
||||
node_id=node_id)
|
||||
|
||||
def get_driver_vendor_passthru_methods(self, context, driver_name,
|
||||
topic=None):
|
||||
"""Retrieve information about vendor methods of the given driver.
|
||||
|
||||
:param context: an admin context.
|
||||
:param driver_name: name of the driver.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: dictionary of <method name>:<method metadata> entries.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.21')
|
||||
return cctxt.call(context, 'get_driver_vendor_passthru_methods',
|
||||
driver_name=driver_name)
|
||||
|
||||
def do_node_deploy(self, context, node_id, rebuild, configdrive,
|
||||
topic=None):
|
||||
"""Signal to conductor service to perform a deployment.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param rebuild: True if this is a rebuild request.
|
||||
:param configdrive: A gzipped and base64 encoded configdrive.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: InstanceDeployFailure
|
||||
:raises: InvalidParameterValue if validation fails
|
||||
:raises: MissingParameterValue if a required parameter is missing
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
|
||||
The node must already be configured and in the appropriate
|
||||
undeployed state before this method is called.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.22')
|
||||
return cctxt.call(context, 'do_node_deploy', node_id=node_id,
|
||||
rebuild=rebuild, configdrive=configdrive)
|
||||
|
||||
def do_node_tear_down(self, context, node_id, topic=None):
|
||||
"""Signal to conductor service to tear down a deployment.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: InstanceDeployFailure
|
||||
:raises: InvalidParameterValue if validation fails
|
||||
:raises: MissingParameterValue if a required parameter is missing
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
|
||||
The node must already be configured and in the appropriate
|
||||
deployed state before this method is called.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.6')
|
||||
return cctxt.call(context, 'do_node_tear_down', node_id=node_id)
|
||||
|
||||
def do_provisioning_action(self, context, node_id, action, topic=None):
|
||||
"""Signal to conductor service to perform the given action on a node.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param action: an action. One of iotronic.common.states.VERBS
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: InvalidParameterValue
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
:raises: InvalidStateRequested if the requested action can not
|
||||
be performed.
|
||||
|
||||
This encapsulates some provisioning actions in a single call.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.23')
|
||||
return cctxt.call(context, 'do_provisioning_action',
|
||||
node_id=node_id, action=action)
|
||||
|
||||
def continue_node_clean(self, context, node_id, topic=None):
|
||||
"""Signal to conductor service to start the next cleaning action.
|
||||
|
||||
NOTE(JoshNang) this is an RPC cast, there will be no response or
|
||||
exception raised by the conductor for this RPC.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.27')
|
||||
return cctxt.cast(context, 'continue_node_clean',
|
||||
node_id=node_id)
|
||||
|
||||
def validate_driver_interfaces(self, context, node_id, topic=None):
|
||||
"""Validate the `core` and `standardized` interfaces for drivers.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: a dictionary containing the results of each
|
||||
interface validation.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.5')
|
||||
return cctxt.call(context, 'validate_driver_interfaces',
|
||||
node_id=node_id)
|
||||
|
||||
def destroy_node(self, context, node_id, topic=None):
|
||||
"""Delete a node.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: NodeAssociated if the node contains an instance
|
||||
associated with it.
|
||||
:raises: InvalidState if the node is in the wrong provision
|
||||
state to perform deletion.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.9')
|
||||
return cctxt.call(context, 'destroy_node', node_id=node_id)
|
||||
|
||||
def get_console_information(self, context, node_id, topic=None):
|
||||
"""Get connection information about the console.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: UnsupportedDriverExtension if the node's driver doesn't
|
||||
support console.
|
||||
:raises: InvalidParameterValue when the wrong driver info is specified.
|
||||
:raises: MissingParameterValue if a required parameter is missing
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.11')
|
||||
return cctxt.call(context, 'get_console_information', node_id=node_id)
|
||||
|
||||
def set_console_mode(self, context, node_id, enabled, topic=None):
|
||||
"""Enable/Disable the console.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:param enabled: Boolean value; whether the console is enabled or
|
||||
disabled.
|
||||
:raises: UnsupportedDriverExtension if the node's driver doesn't
|
||||
support console.
|
||||
:raises: InvalidParameterValue when the wrong driver info is specified.
|
||||
:raises: MissingParameterValue if a required parameter is missing
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.11')
|
||||
return cctxt.call(context, 'set_console_mode', node_id=node_id,
|
||||
enabled=enabled)
|
||||
|
||||
def update_port(self, context, port_obj, topic=None):
|
||||
"""Synchronously, have a conductor update the port's information.
|
||||
|
||||
Update the port's information in the database and return a port object.
|
||||
The conductor will lock related node and trigger specific driver
|
||||
actions if they are needed.
|
||||
|
||||
:param context: request context.
|
||||
:param port_obj: a changed (but not saved) port object.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: updated port object, including all fields.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.13')
|
||||
return cctxt.call(context, 'update_port', port_obj=port_obj)
|
||||
|
||||
def get_driver_properties(self, context, driver_name, topic=None):
|
||||
"""Get the properties of the driver.
|
||||
|
||||
:param context: request context.
|
||||
:param driver_name: name of the driver.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:returns: a dictionary with <property name>:<property description>
|
||||
entries.
|
||||
:raises: DriverNotFound.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.16')
|
||||
return cctxt.call(context, 'get_driver_properties',
|
||||
driver_name=driver_name)
|
||||
|
||||
def set_boot_device(self, context, node_id, device, persistent=False,
|
||||
topic=None):
|
||||
"""Set the boot device for a node.
|
||||
|
||||
Set the boot device to use on next reboot of the node. Be aware
|
||||
that not all drivers support this.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param device: the boot device, one of
|
||||
:mod:`iotronic.common.boot_devices`.
|
||||
:param persistent: Whether to set next-boot, or make the change
|
||||
permanent. Default: False.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: UnsupportedDriverExtension if the node's driver doesn't
|
||||
support management.
|
||||
:raises: InvalidParameterValue when the wrong driver info is
|
||||
specified or an invalid boot device is specified.
|
||||
:raises: MissingParameterValue if missing supplied info.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
|
||||
return cctxt.call(context, 'set_boot_device', node_id=node_id,
|
||||
device=device, persistent=persistent)
|
||||
|
||||
def get_boot_device(self, context, node_id, topic=None):
|
||||
"""Get the current boot device.
|
||||
|
||||
Returns the current boot device of a node.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: UnsupportedDriverExtension if the node's driver doesn't
|
||||
support management.
|
||||
:raises: InvalidParameterValue when the wrong driver info is
|
||||
specified.
|
||||
:raises: MissingParameterValue if missing supplied info.
|
||||
:returns: a dictionary containing:
|
||||
|
||||
:boot_device: the boot device, one of
|
||||
:mod:`iotronic.common.boot_devices` or None if it is unknown.
|
||||
:persistent: Whether the boot device will persist to all
|
||||
future boots or not, None if it is unknown.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
|
||||
return cctxt.call(context, 'get_boot_device', node_id=node_id)
|
||||
|
||||
def get_supported_boot_devices(self, context, node_id, topic=None):
|
||||
"""Get the list of supported devices.
|
||||
|
||||
Returns the list of supported boot devices of a node.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: UnsupportedDriverExtension if the node's driver doesn't
|
||||
support management.
|
||||
:raises: InvalidParameterValue when the wrong driver info is
|
||||
specified.
|
||||
:raises: MissingParameterValue if missing supplied info.
|
||||
:returns: A list with the supported boot devices defined
|
||||
in :mod:`iotronic.common.boot_devices`.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.17')
|
||||
return cctxt.call(context, 'get_supported_boot_devices',
|
||||
node_id=node_id)
|
||||
|
||||
def inspect_hardware(self, context, node_id, topic=None):
|
||||
"""Signals the conductor service to perform hardware introspection.
|
||||
|
||||
:param context: request context.
|
||||
:param node_id: node id or uuid.
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: HardwareInspectionFailure
|
||||
:raises: NoFreeConductorWorker when there is no free worker to start
|
||||
async task.
|
||||
:raises: UnsupportedDriverExtension if the node's driver doesn't
|
||||
support inspection.
|
||||
:raises: InvalidStateRequested if 'inspect' is not a valid
|
||||
action to do in the current state.
|
||||
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.24')
|
||||
return cctxt.call(context, 'inspect_hardware', node_id=node_id)
|
||||
|
||||
def destroy_port(self, context, port, topic=None):
|
||||
"""Delete a port.
|
||||
|
||||
:param context: request context.
|
||||
:param port: port object
|
||||
:param topic: RPC topic. Defaults to self.topic.
|
||||
:raises: NodeLocked if node is locked by another conductor.
|
||||
:raises: NodeNotFound if the node associated with the port does not
|
||||
exist.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.25')
|
||||
return cctxt.call(context, 'destroy_port', port=port)
|
||||
|
||||
######################### NEW
|
||||
|
||||
def destroy_board(self, context, board_id, topic=None):
|
||||
"""Delete a board.
|
||||
|
||||
:param context: request context.
|
||||
:param board_id: board id or uuid.
|
||||
:raises: BoardLocked if board is locked by another conductor.
|
||||
:raises: BoardAssociated if the board contains an instance
|
||||
associated with it.
|
||||
:raises: InvalidState if the board is in the wrong provision
|
||||
state to perform deletion.
|
||||
"""
|
||||
cctxt = self.client.prepare(topic=topic or self.topic, version='1.0')
|
||||
return cctxt.call(context, 'destroy_board', board_id=board_id)
|
||||
363
iotronic/conductor/task_manager.py
Normal file
363
iotronic/conductor/task_manager.py
Normal file
@@ -0,0 +1,363 @@
|
||||
# coding=utf-8
|
||||
|
||||
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
A context manager to perform a series of tasks on a set of resources.
|
||||
|
||||
:class:`TaskManager` is a context manager, created on-demand to allow
|
||||
synchronized access to a board and its resources.
|
||||
|
||||
The :class:`TaskManager` will, by default, acquire an exclusive lock on
|
||||
a board for the duration that the TaskManager instance exists. You may
|
||||
create a TaskManager instance without locking by passing "shared=True"
|
||||
when creating it, but certain operations on the resources held by such
|
||||
an instance of TaskManager will not be possible. Requiring this exclusive
|
||||
lock guards against parallel operations interfering with each other.
|
||||
|
||||
A shared lock is useful when performing non-interfering operations,
|
||||
such as validating the driver interfaces.
|
||||
|
||||
An exclusive lock is stored in the database to coordinate between
|
||||
:class:`iotronic.iotconductor.manager` instances, that are typically deployed on
|
||||
different hosts.
|
||||
|
||||
:class:`TaskManager` methods, as well as driver methods, may be decorated to
|
||||
determine whether their invocation requires an exclusive lock.
|
||||
|
||||
The TaskManager instance exposes certain board resources and properties as
|
||||
attributes that you may access:
|
||||
|
||||
task.context
|
||||
The context passed to TaskManager()
|
||||
task.shared
|
||||
False if Board is locked, True if it is not locked. (The
|
||||
'shared' kwarg arg of TaskManager())
|
||||
task.board
|
||||
The Board object
|
||||
task.ports
|
||||
Ports belonging to the Board
|
||||
task.driver
|
||||
The Driver for the Board, or the Driver based on the
|
||||
'driver_name' kwarg of TaskManager().
|
||||
|
||||
Example usage:
|
||||
|
||||
::
|
||||
|
||||
with task_manager.acquire(context, board_id) as task:
|
||||
task.driver.power.power_on(task.board)
|
||||
|
||||
If you need to execute task-requiring code in a background thread, the
|
||||
TaskManager instance provides an interface to handle this for you, making
|
||||
sure to release resources when the thread finishes (successfully or if
|
||||
an exception occurs). Common use of this is within the Manager like so:
|
||||
|
||||
::
|
||||
|
||||
with task_manager.acquire(context, board_id) as task:
|
||||
<do some work>
|
||||
task.spawn_after(self._spawn_worker,
|
||||
utils.board_power_action, task, new_state)
|
||||
|
||||
All exceptions that occur in the current GreenThread as part of the
|
||||
spawn handling are re-raised. You can specify a hook to execute custom
|
||||
code when such exceptions occur. For example, the hook is a more elegant
|
||||
solution than wrapping the "with task_manager.acquire()" with a
|
||||
try..exception block. (Note that this hook does not handle exceptions
|
||||
raised in the background thread.):
|
||||
|
||||
::
|
||||
|
||||
def on_error(e):
|
||||
if isinstance(e, Exception):
|
||||
...
|
||||
|
||||
with task_manager.acquire(context, board_id) as task:
|
||||
<do some work>
|
||||
task.set_spawn_error_hook(on_error)
|
||||
task.spawn_after(self._spawn_worker,
|
||||
utils.board_power_action, task, new_state)
|
||||
|
||||
"""
|
||||
|
||||
import functools
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
import retrying
|
||||
|
||||
from iotronic.common import driver_factory
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.common import states
|
||||
from iotronic import objects
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def require_exclusive_lock(f):
|
||||
"""Decorator to require an exclusive lock.
|
||||
|
||||
Decorated functions must take a :class:`TaskManager` as the first
|
||||
parameter. Decorated class methods should take a :class:`TaskManager`
|
||||
as the first parameter after "self".
|
||||
|
||||
"""
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
task = args[0] if isinstance(args[0], TaskManager) else args[1]
|
||||
if task.shared:
|
||||
raise exception.ExclusiveLockRequired()
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
|
||||
def acquire(context, board_id, shared=False, driver_name=None):
|
||||
"""Shortcut for acquiring a lock on a Board.
|
||||
|
||||
:param context: Request context.
|
||||
:param board_id: ID or UUID of board to lock.
|
||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||
lock. Default: False.
|
||||
:param driver_name: Name of Driver. Default: None.
|
||||
:returns: An instance of :class:`TaskManager`.
|
||||
|
||||
"""
|
||||
return TaskManager(context, board_id, shared=shared,
|
||||
driver_name=driver_name)
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
"""Context manager for tasks.
|
||||
|
||||
This class wraps the locking, driver loading, and acquisition
|
||||
of related resources (eg, Board and Ports) when beginning a unit of work.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, context, board_id, shared=False, driver_name=None):
|
||||
"""Create a new TaskManager.
|
||||
|
||||
Acquire a lock on a board. The lock can be either shared or
|
||||
exclusive. Shared locks may be used for read-only or
|
||||
non-disruptive actions only, and must be considerate to what
|
||||
other threads may be doing on the same board at the same time.
|
||||
|
||||
:param context: request context
|
||||
:param board_id: ID or UUID of board to lock.
|
||||
:param shared: Boolean indicating whether to take a shared or exclusive
|
||||
lock. Default: False.
|
||||
:param driver_name: The name of the driver to load, if different
|
||||
from the Board's current driver.
|
||||
:raises: DriverNotFound
|
||||
:raises: BoardNotFound
|
||||
:raises: BoardLocked
|
||||
|
||||
"""
|
||||
|
||||
self._spawn_method = None
|
||||
self._on_error_method = None
|
||||
|
||||
self.context = context
|
||||
#self.board = None
|
||||
self.board = None
|
||||
self.shared = shared
|
||||
|
||||
self.fsm = states.machine.copy()
|
||||
|
||||
# BoardLocked exceptions can be annoying. Let's try to alleviate
|
||||
# some of that pain by retrying our lock attempts. The retrying
|
||||
# module expects a wait_fixed value in milliseconds.
|
||||
@retrying.retry(
|
||||
retry_on_exception=lambda e: isinstance(e, exception.BoardLocked),
|
||||
stop_max_attempt_number=CONF.conductor.board_locked_retry_attempts,
|
||||
wait_fixed=CONF.conductor.board_locked_retry_interval * 1000)
|
||||
def reserve_board():
|
||||
LOG.debug("Attempting to reserve board %(board)s",
|
||||
{'board': board_id})
|
||||
self.board = objects.Board.reserve(context, CONF.host, board_id)
|
||||
|
||||
try:
|
||||
if not self.shared:
|
||||
reserve_board()
|
||||
else:
|
||||
self.board = objects.Board.get(context, board_id)
|
||||
#self.ports = objects.Port.list_by_board_id(context, self.board.id)
|
||||
#self.driver = driver_factory.get_driver(driver_name or
|
||||
# self.board.driver)
|
||||
|
||||
# NOTE(deva): this handles the Juno-era NOSTATE state
|
||||
# and should be deleted after Kilo is released
|
||||
'''
|
||||
if self.board.provision_state is states.NOSTATE:
|
||||
self.board.provision_state = states.AVAILABLE
|
||||
self.board.save()
|
||||
|
||||
self.fsm.initialize(self.board.provision_state)
|
||||
'''
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.release_resources()
|
||||
|
||||
def spawn_after(self, _spawn_method, *args, **kwargs):
|
||||
"""Call this to spawn a thread to complete the task.
|
||||
|
||||
The specified method will be called when the TaskManager instance
|
||||
exits.
|
||||
|
||||
:param _spawn_method: a method that returns a GreenThread object
|
||||
:param args: args passed to the method.
|
||||
:param kwargs: additional kwargs passed to the method.
|
||||
|
||||
"""
|
||||
self._spawn_method = _spawn_method
|
||||
self._spawn_args = args
|
||||
self._spawn_kwargs = kwargs
|
||||
|
||||
def set_spawn_error_hook(self, _on_error_method, *args, **kwargs):
|
||||
"""Create a hook to handle exceptions when spawning a task.
|
||||
|
||||
Create a hook that gets called upon an exception being raised
|
||||
from spawning a background thread to do a task.
|
||||
|
||||
:param _on_error_method: a callable object, it's first parameter
|
||||
should accept the Exception object that was raised.
|
||||
:param args: additional args passed to the callable object.
|
||||
:param kwargs: additional kwargs passed to the callable object.
|
||||
|
||||
"""
|
||||
self._on_error_method = _on_error_method
|
||||
self._on_error_args = args
|
||||
self._on_error_kwargs = kwargs
|
||||
|
||||
def release_resources(self):
|
||||
"""Unlock a board and release resources.
|
||||
|
||||
If an exclusive lock is held, unlock the board. Reset attributes
|
||||
to make it clear that this instance of TaskManager should no
|
||||
longer be accessed.
|
||||
"""
|
||||
|
||||
if not self.shared:
|
||||
try:
|
||||
if self.board:
|
||||
objects.Board.release(self.context, CONF.host, self.board.id)
|
||||
except exception.BoardNotFound:
|
||||
# squelch the exception if the board was deleted
|
||||
# within the task's context.
|
||||
pass
|
||||
self.board = None
|
||||
self.driver = None
|
||||
self.ports = None
|
||||
self.fsm = None
|
||||
|
||||
def _thread_release_resources(self, t):
|
||||
"""Thread.link() callback to release resources."""
|
||||
self.release_resources()
|
||||
|
||||
def process_event(self, event, callback=None, call_args=None,
|
||||
call_kwargs=None, err_handler=None):
|
||||
"""Process the given event for the task's current state.
|
||||
|
||||
:param event: the name of the event to process
|
||||
:param callback: optional callback to invoke upon event transition
|
||||
:param call_args: optional \*args to pass to the callback method
|
||||
:param call_kwargs: optional \**kwargs to pass to the callback method
|
||||
:param err_handler: optional error handler to invoke if the
|
||||
callback fails, eg. because there are no workers available
|
||||
(err_handler should accept arguments board, prev_prov_state, and
|
||||
prev_target_state)
|
||||
:raises: InvalidState if the event is not allowed by the associated
|
||||
state machine
|
||||
"""
|
||||
# Advance the state model for the given event. Note that this doesn't
|
||||
# alter the board in any way. This may raise InvalidState, if this event
|
||||
# is not allowed in the current state.
|
||||
self.fsm.process_event(event)
|
||||
|
||||
# stash current states in the error handler if callback is set,
|
||||
# in case we fail to get a worker from the pool
|
||||
if err_handler and callback:
|
||||
self.set_spawn_error_hook(err_handler, self.board,
|
||||
self.board.provision_state,
|
||||
self.board.target_provision_state)
|
||||
|
||||
self.board.provision_state = self.fsm.current_state
|
||||
self.board.target_provision_state = self.fsm.target_state
|
||||
|
||||
# set up the async worker
|
||||
if callback:
|
||||
# clear the error if we're going to start work in a callback
|
||||
self.board.last_error = None
|
||||
if call_args is None:
|
||||
call_args = ()
|
||||
if call_kwargs is None:
|
||||
call_kwargs = {}
|
||||
self.spawn_after(callback, *call_args, **call_kwargs)
|
||||
|
||||
# publish the state transition by saving the Board
|
||||
self.board.save()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type is None and self._spawn_method is not None:
|
||||
# Spawn a worker to complete the task
|
||||
# The linked callback below will be called whenever:
|
||||
# - background task finished with no errors.
|
||||
# - background task has crashed with exception.
|
||||
# - callback was added after the background task has
|
||||
# finished or crashed. While eventlet currently doesn't
|
||||
# schedule the new thread until the current thread blocks
|
||||
# for some reason, this is true.
|
||||
# All of the above are asserted in tests such that we'll
|
||||
# catch if eventlet ever changes this behavior.
|
||||
thread = None
|
||||
try:
|
||||
thread = self._spawn_method(*self._spawn_args,
|
||||
**self._spawn_kwargs)
|
||||
|
||||
# NOTE(comstud): Trying to use a lambda here causes
|
||||
# the callback to not occur for some reason. This
|
||||
# also makes it easier to test.
|
||||
thread.link(self._thread_release_resources)
|
||||
# Don't unlock! The unlock will occur when the
|
||||
# thread finshes.
|
||||
return
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
try:
|
||||
# Execute the on_error hook if set
|
||||
if self._on_error_method:
|
||||
self._on_error_method(e, *self._on_error_args,
|
||||
**self._on_error_kwargs)
|
||||
except Exception:
|
||||
LOG.warning(_LW("Task's on_error hook failed to "
|
||||
"call %(method)s on board %(board)s"),
|
||||
{'method': self._on_error_method.__name__,
|
||||
'board': self.board.uuid})
|
||||
|
||||
if thread is not None:
|
||||
# This means the link() failed for some
|
||||
# reason. Nuke the thread.
|
||||
thread.cancel()
|
||||
self.release_resources()
|
||||
self.release_resources()
|
||||
160
iotronic/conductor/utils.py
Normal file
160
iotronic/conductor/utils.py
Normal file
@@ -0,0 +1,160 @@
|
||||
# coding=utf-8
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import excutils
|
||||
|
||||
from iotronic.common import exception
|
||||
from iotronic.common.i18n import _
|
||||
from iotronic.common.i18n import _LI
|
||||
from iotronic.common.i18n import _LW
|
||||
from iotronic.common import states
|
||||
from iotronic.conductor import task_manager
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def node_set_boot_device(task, device, persistent=False):
|
||||
"""Set the boot device for a node.
|
||||
|
||||
:param task: a TaskManager instance.
|
||||
:param device: Boot device. Values are vendor-specific.
|
||||
:param persistent: Whether to set next-boot, or make the change
|
||||
permanent. Default: False.
|
||||
:raises: InvalidParameterValue if the validation of the
|
||||
ManagementInterface fails.
|
||||
|
||||
"""
|
||||
if getattr(task.driver, 'management', None):
|
||||
task.driver.management.validate(task)
|
||||
task.driver.management.set_boot_device(task,
|
||||
device=device,
|
||||
persistent=persistent)
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def node_power_action(task, new_state):
|
||||
"""Change power state or reset for a node.
|
||||
|
||||
Perform the requested power action if the transition is required.
|
||||
|
||||
:param task: a TaskManager instance containing the node to act on.
|
||||
:param new_state: Any power state from iotronic.common.states. If the
|
||||
state is 'REBOOT' then a reboot will be attempted, otherwise
|
||||
the node power state is directly set to 'state'.
|
||||
:raises: InvalidParameterValue when the wrong state is specified
|
||||
or the wrong driver info is specified.
|
||||
:raises: other exceptions by the node's power driver if something
|
||||
wrong occurred during the power action.
|
||||
|
||||
"""
|
||||
node = task.node
|
||||
target_state = states.POWER_ON if new_state == states.REBOOT else new_state
|
||||
|
||||
if new_state != states.REBOOT:
|
||||
try:
|
||||
curr_state = task.driver.power.get_power_state(task)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
node['last_error'] = _(
|
||||
"Failed to change power state to '%(target)s'. "
|
||||
"Error: %(error)s") % {'target': new_state, 'error': e}
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
|
||||
if curr_state == new_state:
|
||||
# Neither the iotronic service nor the hardware has erred. The
|
||||
# node is, for some reason, already in the requested state,
|
||||
# though we don't know why. eg, perhaps the user previously
|
||||
# requested the node POWER_ON, the network delayed those IPMI
|
||||
# packets, and they are trying again -- but the node finally
|
||||
# responds to the first request, and so the second request
|
||||
# gets to this check and stops.
|
||||
# This isn't an error, so we'll clear last_error field
|
||||
# (from previous operation), log a warning, and return.
|
||||
node['last_error'] = None
|
||||
# NOTE(dtantsur): under rare conditions we can get out of sync here
|
||||
node['power_state'] = new_state
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
LOG.warn(_LW("Not going to change_node_power_state because "
|
||||
"current state = requested state = '%(state)s'."),
|
||||
{'state': curr_state})
|
||||
return
|
||||
|
||||
if curr_state == states.ERROR:
|
||||
# be optimistic and continue action
|
||||
LOG.warn(_LW("Driver returns ERROR power state for node %s."),
|
||||
node.uuid)
|
||||
|
||||
# Set the target_power_state and clear any last_error, if we're
|
||||
# starting a new operation. This will expose to other processes
|
||||
# and clients that work is in progress.
|
||||
if node['target_power_state'] != target_state:
|
||||
node['target_power_state'] = target_state
|
||||
node['last_error'] = None
|
||||
node.save()
|
||||
|
||||
# take power action
|
||||
try:
|
||||
if new_state != states.REBOOT:
|
||||
task.driver.power.set_power_state(task, new_state)
|
||||
else:
|
||||
task.driver.power.reboot(task)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
node['last_error'] = _(
|
||||
"Failed to change power state to '%(target)s'. "
|
||||
"Error: %(error)s") % {'target': target_state, 'error': e}
|
||||
else:
|
||||
# success!
|
||||
node['power_state'] = target_state
|
||||
LOG.info(_LI('Successfully set node %(node)s power state to '
|
||||
'%(state)s.'),
|
||||
{'node': node.uuid, 'state': target_state})
|
||||
finally:
|
||||
node['target_power_state'] = states.NOSTATE
|
||||
node.save()
|
||||
|
||||
|
||||
@task_manager.require_exclusive_lock
|
||||
def cleanup_after_timeout(task):
|
||||
"""Cleanup deploy task after timeout.
|
||||
|
||||
:param task: a TaskManager instance.
|
||||
"""
|
||||
node = task.node
|
||||
msg = (_('Timeout reached while waiting for callback for node %s')
|
||||
% node.uuid)
|
||||
node.last_error = msg
|
||||
LOG.error(msg)
|
||||
node.save()
|
||||
|
||||
error_msg = _('Cleanup failed for node %(node)s after deploy timeout: '
|
||||
' %(error)s')
|
||||
try:
|
||||
task.driver.deploy.clean_up(task)
|
||||
except exception.IotronicException as e:
|
||||
msg = error_msg % {'node': node.uuid, 'error': e}
|
||||
LOG.error(msg)
|
||||
node.last_error = msg
|
||||
node.save()
|
||||
except Exception as e:
|
||||
msg = error_msg % {'node': node.uuid, 'error': e}
|
||||
LOG.error(msg)
|
||||
node.last_error = _('Deploy timed out, but an unhandled exception was '
|
||||
'encountered while aborting. More info may be '
|
||||
'found in the log file.')
|
||||
node.save()
|
||||
Reference in New Issue
Block a user