Improve new scheduler

* Changed method get_scheduled_jobs_count() in the Scheduler
  interface to has_scheduled_jobs(). In fact, the callers
  always need to compare the number of jobs with 0, i.e.
  to see if there are any jobs or not. But more importantly,
  this semantics (returning just boolean) allows to make a
  good optimisation for DefaultScheduler and avoid DB calls
  in a number of cases. Practically, for example, it saves
  several seconds (5-6) if we run a workflow with 500 parallel
  no-op tasks that are all merged with one "join" task. Tested
  on 1 and 2 engines.
* Added test assertions for has_scheduled_jobs()
* Other minor changes

Change-Id: Ife48d9e464114fd60a08707d8f32f847a6f623c9
This commit is contained in:
Renat Akhmerov 2019-08-12 11:55:09 +07:00
parent ee94087c89
commit 0f6bc1897f
6 changed files with 59 additions and 24 deletions

View File

@ -301,12 +301,12 @@ def _check_affected_tasks(task):
# already been processed and the task state hasn't changed.
sched = sched_base.get_system_scheduler()
cnt = sched.get_scheduled_jobs_count(
jobs_exist = sched.has_scheduled_jobs(
key=_get_refresh_state_job_key(t_ex_id),
processing=False
)
if cnt == 0:
if not jobs_exist:
_schedule_refresh_task_state(t_ex_id)
for t_ex in affected_task_execs:

View File

@ -45,13 +45,13 @@ class Scheduler(object):
raise NotImplementedError
@abc.abstractmethod
def get_scheduled_jobs_count(self, **filters):
"""Returns the number of scheduled jobs.
def has_scheduled_jobs(self, **filters):
"""Returns True if there are scheduled jobs matching the given filter.
:param filters: Filters that define what kind of jobs
need to be counted. Permitted values:
* key=<string> - a key set for a job when it was scheduled.
* processing=<boolean> - if True, return only jobs that are
* processing=<boolean> - if True, count only jobs that are
currently being processed.
"""
raise NotImplementedError

View File

@ -83,6 +83,11 @@ class DefaultScheduler(base.Scheduler):
"Starting Scheduler Job Store checker [scheduler=%s]...", self
)
eventlet.sleep(
self._fixed_delay +
random.Random().randint(0, self._random_delay * 1000) * 0.001
)
try:
self._process_store_jobs()
except Exception:
@ -98,11 +103,6 @@ class DefaultScheduler(base.Scheduler):
if sys.version_info < (3,):
sys.exc_clear()
eventlet.sleep(
self._fixed_delay +
random.Random().randint(0, self._random_delay * 1000) * 0.001
)
def _process_store_jobs(self):
# Select and capture eligible jobs.
with db_api.transaction():
@ -125,20 +125,31 @@ class DefaultScheduler(base.Scheduler):
self._delete_scheduled_job(job)
def schedule(self, job, allow_redistribute=False):
scheduled_job = DefaultScheduler._persist_job(job)
scheduled_job = self._persist_job(job)
self._schedule_in_memory(job.run_after, scheduled_job)
def get_scheduled_jobs_count(self, **filters):
def has_scheduled_jobs(self, **filters):
# Checking in-memory jobs first.
for j in self.in_memory_jobs.values():
if filters and 'key' in filters and filters['key'] != j.key:
continue
if filters and 'processing' in filters:
if filters['processing'] is (j.captured_at is None):
continue
return True
if filters and 'processing' in filters:
processing = filters.pop('processing')
filters['captured_at'] = {'neq' if processing else 'eq': None}
return db_api.get_scheduled_jobs_count(**filters)
return db_api.get_scheduled_jobs_count(**filters) > 0
@classmethod
def _persist_job(cls, job):
@staticmethod
def _persist_job(job):
ctx_serializer = context.RpcContextSerializer()
ctx = (
@ -223,26 +234,32 @@ class DefaultScheduler(base.Scheduler):
:return: True if the job has been captured, False if not.
"""
now_sec = utils.utc_now_sec()
# Mark this job as captured in order to prevent calling from
# a parallel transaction. We don't use query filter
# {'captured_at': None} to account for a case when the job needs
# {'captured_at': None} to account for a case when the job needs
# to be recaptured after a maximum capture time has elapsed. If this
# method was called for job that has non-empty "captured_at" then
# method was called for a job that has non-empty "captured_at" then
# it means that it is already eligible for recapturing and the
# Job Store selected it.
_, updated_cnt = db_api.update_scheduled_job(
id=scheduled_job.id,
values={'captured_at': utils.utc_now_sec()},
values={'captured_at': now_sec},
query_filter={'captured_at': scheduled_job.captured_at}
)
# We need to update "captured_at" of the initial object stored in
# memory because it's used in a few places.
if updated_cnt == 1:
scheduled_job.captured_at = now_sec
# If updated_cnt != 1 then another scheduler
# has already updated it.
return updated_cnt == 1
@staticmethod
@db_utils.retry_on_db_error
def _delete_scheduled_job(scheduled_job):
def _delete_scheduled_job(self, scheduled_job):
db_api.delete_scheduled_job(scheduled_job.id)
@staticmethod

View File

@ -116,8 +116,8 @@ class LegacyScheduler(sched_base.Scheduler):
**job.func_args
)
def get_scheduled_jobs_count(self, **filters):
return db_api.get_delayed_calls_count(**filters)
def has_scheduled_jobs(self, **filters):
return db_api.get_delayed_calls_count(**filters) > 0
def start(self):
self._thread.start()

View File

@ -758,7 +758,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
sched = sched_base.get_system_scheduler()
self._await(lambda: sched.get_scheduled_jobs_count() == 0)
self._await(lambda: not sched.has_scheduled_jobs())
def test_delete_workflow_integrity_check_on_execution_delete(self):
wf_text = """---
@ -778,7 +778,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
sched = sched_base.get_system_scheduler()
self._await(lambda: sched.get_scheduled_jobs_count() == 0)
self._await(lambda: not sched.has_scheduled_jobs())
def test_output(self):
wf_text = """---

View File

@ -102,6 +102,24 @@ class DefaultSchedulerTest(base.DbTestCase):
self.assertEqual(1, len(scheduled_jobs))
self.assertTrue(self.scheduler.has_scheduled_jobs())
self.assertTrue(self.scheduler.has_scheduled_jobs(processing=True))
self.assertFalse(self.scheduler.has_scheduled_jobs(processing=False))
self.assertTrue(
self.scheduler.has_scheduled_jobs(key=None, processing=True)
)
self.assertFalse(
self.scheduler.has_scheduled_jobs(key=None, processing=False)
)
self.assertFalse(self.scheduler.has_scheduled_jobs(key='foobar'))
self.assertFalse(
self.scheduler.has_scheduled_jobs(key='foobar', processing=True)
)
self.assertFalse(
self.scheduler.has_scheduled_jobs(key='foobar', processing=False)
)
captured_at = scheduled_jobs[0].captured_at
self.assertIsNotNone(captured_at)