periodics: switch to use futurist

This is a follow-up of [1]. As we switched from oslo_service to
cotyledon, eventlet is not monkey_patched anymore. Thus the API of the
periodic tasks from oslo_service should be replaced by futurist.

[1] Ib99565e00eedc72c388e8ebec6b7f1453f77f30f

Change-Id: I80b865f4e9d782b747f33eaae2ba6cf3f264bdf2
This commit is contained in:
Hunt Xu 2018-03-28 11:46:43 +08:00 committed by Lingxian Kong
parent de292dfcee
commit bd29ab0d9c
1 changed files with 25 additions and 17 deletions

View File

@ -14,10 +14,11 @@
from datetime import datetime
from datetime import timedelta
import threading
from futurist import periodics
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import threadgroup
from oslo_utils import timeutils
from qinling import context
@ -36,6 +37,7 @@ CONF = cfg.CONF
_periodic_tasks = {}
@periodics.periodic(300)
def handle_function_service_expiration(ctx, engine):
"""Clean up resources related to expired functions.
@ -69,6 +71,7 @@ def handle_function_service_expiration(ctx, engine):
etcd_util.delete_function(func_db.id)
@periodics.periodic(3)
def handle_job(engine_client):
"""Execute job task with no db transactions."""
for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)):
@ -152,14 +155,17 @@ def start_function_mapping_handler(engine):
Function mapping handler is supposed to be running with engine service.
"""
tg = threadgroup.ThreadGroup(1)
tg.add_timer(
300,
worker = periodics.PeriodicWorker([])
worker.add(
handle_function_service_expiration,
ctx=context.Context(),
engine=engine,
engine=engine
)
_periodic_tasks[constants.PERIODIC_FUNC_MAPPING_HANDLER] = tg
_periodic_tasks[constants.PERIODIC_FUNC_MAPPING_HANDLER] = worker
thread = threading.Thread(target=worker.start)
thread.setDaemon(True)
thread.start()
LOG.info('Function mapping handler started.')
@ -169,28 +175,30 @@ def start_job_handler():
Job handler is supposed to be running with api service.
"""
tg = threadgroup.ThreadGroup(1)
worker = periodics.PeriodicWorker([])
engine_client = rpc.get_engine_client()
tg.add_timer(
3,
worker.add(
handle_job,
engine_client=engine_client,
engine_client=engine_client
)
_periodic_tasks[constants.PERIODIC_JOB_HANDLER] = tg
_periodic_tasks[constants.PERIODIC_JOB_HANDLER] = worker
thread = threading.Thread(target=worker.start)
thread.setDaemon(True)
thread.start()
LOG.info('Job handler started.')
def stop(task=None):
if not task:
for name, tg in _periodic_tasks.items():
for name, worker in _periodic_tasks.items():
LOG.info('Stopping periodic task: %s', name)
tg.stop()
worker.stop()
del _periodic_tasks[name]
else:
tg = _periodic_tasks.get(task)
if tg:
worker = _periodic_tasks.get(task)
if worker:
LOG.info('Stopping periodic task: %s', task)
tg.stop()
worker.stop()
del _periodic_tasks[task]