metal/inventory/inventory/inventory/agent/base_manager.py

115 lines
3.8 KiB
Python

#
# Copyright (c) 2018 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""Base agent manager functionality."""
import futurist
from futurist import periodics
from futurist import rejection
import inspect
from inventory.common import exception
from inventory.common.i18n import _
from oslo_config import cfg
from oslo_log import log
LOG = log.getLogger(__name__)
class BaseAgentManager(object):
    """Common start-up and tear-down plumbing for agent managers.

    The manager owns a green-thread executor and a futurist periodic
    worker.  Any method of the manager decorated as a futurist periodic
    task is discovered by :meth:`init_host` and run by that worker.
    """

    # Size of the worker pool and of the backlog after which new
    # submissions are rejected.  Hard-coded here rather than read from
    # configuration.
    _WORKERS_POOL_SIZE = 64

    def __init__(self, host, topic):
        """Record the host name and topic of this manager.

        :param host: host name; when falsy, ``cfg.CONF.host`` is used.
        :param topic: topic associated with this manager.
        """
        super(BaseAgentManager, self).__init__()
        if not host:
            host = cfg.CONF.host
        self.host = host
        self.topic = topic
        # Flipped by init_host()/del_host(); guards against double start
        # and against tearing down a manager that never started.
        self._started = False

    def init_host(self, admin_context=None):
        """Initialize the agent host.

        Creates the executor, collects the periodic tasks defined on this
        object and starts the periodic worker inside the executor.

        :param admin_context: the admin context to pass to periodic tasks.
        :raises: RuntimeError when agent is already running.
        """
        if self._started:
            raise RuntimeError(_('Attempt to start an already running '
                                 'agent manager'))

        # Executor for performing tasks async.  Submissions are rejected
        # once the backlog reaches the pool size (see _spawn_worker).
        rejection_func = rejection.reject_when_reached(
            self._WORKERS_POOL_SIZE)
        self._executor = futurist.GreenThreadPoolExecutor(
            self._WORKERS_POOL_SIZE, check_and_reject=rejection_func)

        # Collect the periodic tasks defined on this manager; each one is
        # invoked with the admin context as its only positional argument.
        LOG.info('Collecting periodic tasks')
        self._periodic_task_callables = []
        self._collect_periodic_tasks(self, (admin_context,))

        self._periodic_tasks = periodics.PeriodicWorker(
            self._periodic_task_callables,
            executor_factory=periodics.ExistingExecutor(self._executor))

        # Start periodic tasks.  allow_empty=True is passed so that a
        # manager without any periodic task can still start.
        self._periodic_tasks_worker = self._executor.submit(
            self._periodic_tasks.start, allow_empty=True)
        self._periodic_tasks_worker.add_done_callback(
            self._on_periodic_tasks_stop)

        self._started = True

    def del_host(self, deregister=True):
        """Stop the periodic worker and shut the executor down.

        Does nothing when the manager was never (successfully) started,
        e.g. when init_host() was not reached or failed part-way.

        :param deregister: accepted for interface compatibility; not used
                           by this implementation.
        """
        # Guard on the flag this class maintains itself.  The worker and
        # the executor only exist once init_host() has completed, so
        # touching them earlier would raise AttributeError.
        if not self._started:
            return

        self._periodic_tasks.stop()
        self._periodic_tasks.wait()
        self._executor.shutdown(wait=True)
        self._started = False

    def _collect_periodic_tasks(self, obj, args):
        """Collect periodic tasks from a given object.

        Populates self._periodic_task_callables with tuples
        (callable, args, kwargs).

        :param obj: object containing periodic tasks as methods
        :param args: tuple with arguments to pass to every task
        """
        for name, member in inspect.getmembers(obj):
            if periodics.is_periodic(member):
                LOG.debug('Found periodic task %(owner)s.%(member)s',
                          {'owner': obj.__class__.__name__,
                           'member': name})
                self._periodic_task_callables.append((member, args, {}))

    def _on_periodic_tasks_stop(self, fut):
        """Log the outcome of the periodic worker once its future is done.

        :param fut: future returned when the periodic worker was submitted
                    to the executor.
        """
        try:
            fut.result()
        except Exception as exc:
            # Done-callback: report the failure instead of re-raising it.
            LOG.critical('Periodic tasks worker has failed: %s', exc)
        else:
            LOG.info('Successfully shut down periodic tasks')

    def _spawn_worker(self, func, *args, **kwargs):
        """Create a greenthread to run func(*args, **kwargs).

        Spawns a greenthread if there are free slots in pool, otherwise
        raises exception. Execution control returns immediately to the
        caller.

        :param func: callable to run in the executor.
        :returns: Future object.
        :raises: NoFreeConductorWorker if worker pool is currently full.
        """
        try:
            return self._executor.submit(func, *args, **kwargs)
        except futurist.RejectedSubmission:
            raise exception.NoFreeConductorWorker()