165 lines
6.7 KiB
Python
165 lines
6.7 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Conductor periodics."""
|
|
|
|
import collections
|
|
import functools
|
|
import inspect
|
|
|
|
import eventlet
|
|
from futurist import periodics
|
|
from oslo_log import log
|
|
|
|
from ironic.common import exception
|
|
from ironic.conductor import base_manager
|
|
from ironic.conductor import task_manager
|
|
from ironic.drivers import base as driver_base
|
|
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
def periodic(spacing, enabled=True, **kwargs):
|
|
"""A decorator to define a periodic task.
|
|
|
|
:param spacing: how often (in seconds) to run the periodic task.
|
|
:param enabled: whether the task is enabled; defaults to ``spacing > 0``.
|
|
"""
|
|
return periodics.periodic(spacing=spacing,
|
|
enabled=enabled and spacing > 0,
|
|
**kwargs)
|
|
|
|
|
|
class Stop(Exception):
|
|
"""A signal to stop the current iteration of a periodic task."""
|
|
|
|
|
|
def node_periodic(purpose, spacing, enabled=True, filters=None,
|
|
predicate=None, predicate_extra_fields=(), limit=None,
|
|
shared_task=True):
|
|
"""A decorator to define a periodic task to act on nodes.
|
|
|
|
Defines a periodic task that fetches the list of nodes mapped to the
|
|
current conductor which satisfy the provided filters.
|
|
|
|
The decorated function must be a method on either the conductor manager
|
|
or a hardware interface. The signature is:
|
|
|
|
* for conductor manager: ``(self, task, context)``
|
|
* for hardware interfaces: ``(self, task, manager, context)``.
|
|
|
|
When the periodic is running on a hardware interface, only tasks
|
|
using this interface are considered.
|
|
|
|
``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop``
|
|
to abort the current iteration of the task and reschedule it.
|
|
|
|
:param purpose: a human-readable description of the activity, e.g.
|
|
"verifying that the cat is purring".
|
|
:param spacing: how often (in seconds) to run the periodic task.
|
|
:param enabled: whether the task is enabled; defaults to ``spacing > 0``.
|
|
:param filters: database-level filters for the nodes.
|
|
:param predicate: a callable to run on the fetched nodes *before* creating
|
|
a task for them. The only parameter will be a named tuple with fields
|
|
``uuid``, ``driver``, ``conductor_group`` plus everything from
|
|
``predicate_extra_fields``. If the callable accepts a 2nd parameter,
|
|
it will be the conductor manager instance.
|
|
:param predicate_extra_fields: extra fields to fetch on the initial
|
|
request and pass into the ``predicate``. Must not contain ``uuid``,
|
|
``driver`` and ``conductor_group`` since they are always included.
|
|
:param limit: how many nodes to process before stopping the current
|
|
iteration. If ``predicate`` returns ``False``, the node is not counted.
|
|
If the decorated function returns ``False``, the node is not counted
|
|
either. Can be a callable, in which case it will be called on each
|
|
iteration to determine the limit.
|
|
:param shared_task: if ``True``, the task will have a shared lock. It is
|
|
recommended to start with a shared lock and upgrade it only if needed.
|
|
"""
|
|
node_type = collections.namedtuple(
|
|
'Node',
|
|
['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields)
|
|
)
|
|
|
|
# Accepting a conductor manager is a bit of an edge case, doing a bit of
|
|
# a signature magic to avoid passing it everywhere.
|
|
accepts_manager = (predicate is not None
|
|
and len(inspect.signature(predicate).parameters) > 1)
|
|
|
|
def decorator(func):
|
|
@periodic(spacing=spacing, enabled=enabled)
|
|
@functools.wraps(func)
|
|
def wrapper(self, *args, **kwargs):
|
|
# Make it work with both drivers and the conductor manager
|
|
if isinstance(self, base_manager.BaseConductorManager):
|
|
manager = self
|
|
context = args[0]
|
|
else:
|
|
manager = args[0]
|
|
context = args[1]
|
|
|
|
interface_type = (getattr(self, 'interface_type', None)
|
|
if isinstance(self, driver_base.BaseInterface)
|
|
else None)
|
|
|
|
if callable(limit):
|
|
local_limit = limit()
|
|
else:
|
|
local_limit = limit
|
|
assert local_limit is None or local_limit > 0
|
|
|
|
nodes = manager.iter_nodes(filters=filters,
|
|
fields=predicate_extra_fields)
|
|
for (node_uuid, *other) in nodes:
|
|
if predicate is not None:
|
|
node = node_type(node_uuid, *other)
|
|
if accepts_manager:
|
|
result = predicate(node, manager)
|
|
else:
|
|
result = predicate(node)
|
|
if not result:
|
|
continue
|
|
|
|
try:
|
|
with task_manager.acquire(context, node_uuid,
|
|
purpose=purpose,
|
|
shared=shared_task) as task:
|
|
if interface_type is not None:
|
|
impl = getattr(task.driver, interface_type)
|
|
if not isinstance(impl, self.__class__):
|
|
continue
|
|
|
|
result = func(self, task, *args, **kwargs)
|
|
except exception.NodeNotFound:
|
|
LOG.info("During %(action)s, node %(node)s was not found "
|
|
"and presumed deleted by another process.",
|
|
{'node': node_uuid, 'action': purpose})
|
|
except exception.NodeLocked:
|
|
LOG.info("During %(action)s, node %(node)s was already "
|
|
"locked by another process. Skip.",
|
|
{'node': node_uuid, 'action': purpose})
|
|
except Stop:
|
|
break
|
|
finally:
|
|
# Yield on every iteration
|
|
eventlet.sleep(0)
|
|
|
|
if (local_limit is not None
|
|
and (result is None or result)):
|
|
local_limit -= 1
|
|
if not local_limit:
|
|
return
|
|
|
|
return wrapper
|
|
|
|
return decorator
|