Update periodic tasks

Change-Id: Ibe2835b711dae40db4d1e4d2f0aa23a496e09733
This commit is contained in:
Zhenguo Niu 2016-08-19 18:21:39 +08:00
parent c2897cac79
commit 26a6ef10a9
6 changed files with 33 additions and 97 deletions

View File

@ -27,6 +27,10 @@
# value) # value)
#state_path = $pybasedir #state_path = $pybasedir
# Default interval (in seconds) for running periodic tasks.
# (integer value)
#periodic_interval = 60
# Name of this node. This can be an opaque identifier. It is # Name of this node. This can be an opaque identifier. It is
# not necessarily a hostname, FQDN, or IP address. However, # not necessarily a hostname, FQDN, or IP address. However,
# the node name must be valid within an AMQP key, and if using # the node name must be valid within an AMQP key, and if using
@ -543,10 +547,7 @@
# From nimble # From nimble
# #
# The size of the workers greenthread pool. Note that 1 # The size of the workers greenthread pool. (integer value)
# threads will be reserved by the engine itself for handling
# periodic tasks. (integer value)
# Minimum value: 2
#workers_pool_size = 100 #workers_pool_size = 100
# URL of Nimble API service. If not set nimble can get the # URL of Nimble API service. If not set nimble can get the
@ -559,6 +560,10 @@
# thread pool size. (integer value) # thread pool size. (integer value)
#periodic_max_workers = 8 #periodic_max_workers = 8
# Interval between syncing the node resources from ironic, in
# seconds. (integer value)
#sync_node_resource_interval = 60
[matchmaker_redis] [matchmaker_redis]

View File

@ -55,7 +55,11 @@ class RPCService(service.Service):
self.rpcserver = rpc.get_server(target, endpoints, serializer) self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start() self.rpcserver.start()
self.manager.init_host(admin_context) self.manager.init_host()
self.tg.add_dynamic_timer(
self.manager.periodic_tasks,
periodic_interval_max=CONF.periodic_interval,
context=admin_context)
LOG.info(_LI('Created RPC server for service %(service)s on host ' LOG.info(_LI('Created RPC server for service %(service)s on host '
'%(host)s.'), '%(host)s.'),

View File

@ -53,6 +53,10 @@ path_opts = [
] ]
service_opts = [ service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
help=_('Default interval (in seconds) for running periodic '
'tasks.')),
cfg.StrOpt('host', cfg.StrOpt('host',
default=socket.getfqdn(), default=socket.getfqdn(),
sample_default='localhost', sample_default='localhost',

View File

@ -19,10 +19,8 @@ from nimble.common.i18n import _
opts = [ opts = [
cfg.IntOpt('workers_pool_size', cfg.IntOpt('workers_pool_size',
default=100, min=2, default=100,
help=_('The size of the workers greenthread pool. ' help=_('The size of the workers greenthread pool.')),
'Note that 1 threads will be reserved by the engine '
'itself for handling periodic tasks.')),
cfg.StrOpt('api_url', cfg.StrOpt('api_url',
help=_('URL of Nimble API service. If not set nimble can ' help=_('URL of Nimble API service. If not set nimble can '
'get the current value from the keystone service ' 'get the current value from the keystone service '

View File

@ -15,18 +15,11 @@
"""Base engine manager functionality.""" """Base engine manager functionality."""
import inspect from eventlet import greenpool
import futurist
from futurist import periodics
from futurist import rejection
from oslo_log import log from oslo_log import log
from oslo_service import periodic_task
from nimble.common import exception
from nimble.common.i18n import _ from nimble.common.i18n import _
from nimble.common.i18n import _LC
from nimble.common.i18n import _LI
from nimble.common.i18n import _LW
from nimble.common import rpc from nimble.common import rpc
from nimble.conf import CONF from nimble.conf import CONF
from nimble.db import api as dbapi from nimble.db import api as dbapi
@ -35,10 +28,10 @@ from nimble.db import api as dbapi
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
class BaseEngineManager(object): class BaseEngineManager(periodic_task.PeriodicTasks):
def __init__(self, host, topic): def __init__(self, host, topic):
super(BaseEngineManager, self).__init__() super(BaseEngineManager, self).__init__(CONF)
if not host: if not host:
host = CONF.host host = CONF.host
self.host = host self.host = host
@ -46,7 +39,7 @@ class BaseEngineManager(object):
self.notifier = rpc.get_notifier() self.notifier = rpc.get_notifier()
self._started = False self._started = False
def init_host(self, admin_context=None): def init_host(self):
"""Initialize the engine host. """Initialize the engine host.
:param admin_context: the admin context to pass to periodic tasks. :param admin_context: the admin context to pass to periodic tasks.
@ -58,80 +51,11 @@ class BaseEngineManager(object):
self.dbapi = dbapi.get_instance() self.dbapi = dbapi.get_instance()
rejection_func = rejection.reject_when_reached( self._worker_pool = greenpool.GreenPool(
CONF.engine.workers_pool_size) size=CONF.engine.workers_pool_size)
self._executor = futurist.GreenThreadPoolExecutor(
max_workers=CONF.engine.workers_pool_size,
check_and_reject=rejection_func)
"""Executor for performing tasks async."""
LOG.debug('Collecting periodic tasks')
self._periodic_task_callables = []
self._collect_periodic_tasks(self, (admin_context,))
if (len(self._periodic_task_callables) >
CONF.engine.workers_pool_size):
LOG.warning(_LW('This engine has %(tasks)d periodic tasks '
'enabled, but only %(workers)d task workers '
'allowed by [engine]workers_pool_size option'),
{'tasks': len(self._periodic_task_callables),
'workers': CONF.engine.workers_pool_size})
self._periodic_tasks = periodics.PeriodicWorker(
self._periodic_task_callables,
executor_factory=periodics.ExistingExecutor(self._executor))
# Start periodic tasks
self._periodic_tasks_worker = self._executor.submit(
self._periodic_tasks.start, allow_empty=True)
self._periodic_tasks_worker.add_done_callback(
self._on_periodic_tasks_stop)
def del_host(self): def del_host(self):
# Waiting here to give workers the chance to finish. This has the self._worker_pool.waitall()
# benefit of releasing locks workers placed on nodes, as well as
# having work complete normally.
self._periodic_tasks.stop()
self._periodic_tasks.wait()
self._executor.shutdown(wait=True)
self._started = False
def _collect_periodic_tasks(self, obj, args): def periodic_tasks(self, context, raise_on_error=False):
"""Collect periodic tasks from a given object. return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
Populates self._periodic_task_callables with tuples
(callable, args, kwargs).
:param obj: object containing periodic tasks as methods
:param args: tuple with arguments to pass to every task
"""
for name, member in inspect.getmembers(obj):
if periodics.is_periodic(member):
LOG.debug('Found periodic task %(owner)s.%(member)s',
{'owner': obj.__class__.__name__,
'member': name})
self._periodic_task_callables.append((member, args, {}))
def _on_periodic_tasks_stop(self, fut):
try:
fut.result()
except Exception as exc:
LOG.critical(_LC('Periodic tasks worker has failed: %s'), exc)
else:
LOG.info(_LI('Successfully shut down periodic tasks'))
def _spawn_worker(self, func, *args, **kwargs):
"""Create a greenthread to run func(*args, **kwargs).
Spawns a greenthread if there are free slots in pool, otherwise raises
exception. Execution control returns immediately to the caller.
:returns: Future object.
:raises: NoFreeEgnineWorker if worker pool is currently full.
"""
try:
return self._executor.submit(func, *args, **kwargs)
except futurist.RejectedSubmission:
raise exception.NoFreeEngineWorker()

View File

@ -13,9 +13,9 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from futurist import periodics
from oslo_log import log from oslo_log import log
import oslo_messaging as messaging import oslo_messaging as messaging
from oslo_service import periodic_task
from nimble.common.i18n import _LI from nimble.common.i18n import _LI
from nimble.conf import CONF from nimble.conf import CONF
@ -33,6 +33,7 @@ class EngineManager(base_manager.BaseEngineManager):
target = messaging.Target(version=RPC_API_VERSION) target = messaging.Target(version=RPC_API_VERSION)
@periodics.periodic(spacing=CONF.engine.sync_node_resource_interval) @periodic_task.periodic_task(
spacing=CONF.engine.sync_node_resource_interval)
def _sync_node_resources(self, context): def _sync_node_resources(self, context):
LOG.info(_LI("During sync_node_resources.")) LOG.info(_LI("During sync_node_resources."))