From ffd99c4a2effe32a7ac55106b7eb23af817f4bcc Mon Sep 17 00:00:00 2001 From: Pierre-Arthur MATHIEU Date: Mon, 27 Feb 2017 15:16:05 +0000 Subject: [PATCH] Allow for multiple jobs to be executed at the same time Change-Id: I2fa6f989a21f7964cc573010f58d68ad003788d5 --- etc/scheduler.conf.sample | 6 ++ freezer/scheduler/arguments.py | 5 ++ freezer/scheduler/freezer_scheduler.py | 14 +++-- freezer/scheduler/scheduler_job.py | 77 +++++++++++++------------- freezer/scheduler/win_daemon.py | 4 +- freezer/scheduler/win_service.py | 3 +- 6 files changed, 61 insertions(+), 48 deletions(-) diff --git a/etc/scheduler.conf.sample b/etc/scheduler.conf.sample index 7deb1389..47b0669c 100644 --- a/etc/scheduler.conf.sample +++ b/etc/scheduler.conf.sample @@ -136,3 +136,9 @@ # Initialize freezer scheduler with insecure mode (boolean value) #insecure = false + +# Number of jobs that can be executed at the same time. (integer value) +# By default only one job is allowed at a given time because there is no +# built-in protection to prevent backup and restore to be done at the same time +# on one resource. +#concurrent_jobs = 1 diff --git a/freezer/scheduler/arguments.py b/freezer/scheduler/arguments.py index 8eb0f98e..d22b3d44 100644 --- a/freezer/scheduler/arguments.py +++ b/freezer/scheduler/arguments.py @@ -98,6 +98,11 @@ def get_common_opts(): dest='disable_exec', help='Allow Freezer Scheduler to deny jobs that execute ' 'commands for security reasons'), + cfg.IntOpt('concurrent_jobs', + default=1, + dest='concurrent_jobs', + help='Number of jobs that can be executed at the' + ' same time'), ] return _COMMON diff --git a/freezer/scheduler/freezer_scheduler.py b/freezer/scheduler/freezer_scheduler.py index 23ee65cc..4f488759 100644 --- a/freezer/scheduler/freezer_scheduler.py +++ b/freezer/scheduler/freezer_scheduler.py @@ -43,7 +43,7 @@ LOG = log.getLogger(__name__) class FreezerScheduler(object): - def __init__(self, apiclient, interval, job_path): + def __init__(self, apiclient, interval, job_path, concurrent_jobs=1): # config_manager self.client = apiclient self.freezerc_executable = spawn.find_executable('freezer-agent') @@ -56,14 +56,14 @@ class FreezerScheduler(object): self.job_path = job_path self._client = None self.lock = threading.Lock() - self.execution_lock = threading.Lock() job_defaults = { 'coalesce': True, - 'max_instances': 2 + 'max_instances': 1 } executors = { 'default': {'type': 'threadpool', 'max_workers': 1}, - 'threadpool': {'type': 'threadpool', 'max_workers': 10} + 'threadpool': {'type': 'threadpool', + 'max_workers': concurrent_jobs} } self.scheduler = background.BackgroundScheduler( job_defaults=job_defaults, @@ -226,7 +226,8 @@ def main(): freezer_utils.create_dir(CONF.jobs_dir, do_log=False) freezer_scheduler = FreezerScheduler(apiclient=apiclient, interval=int(CONF.interval), - job_path=CONF.jobs_dir) + job_path=CONF.jobs_dir, + concurrent_jobs=CONF.concurrent_jobs) if CONF.no_daemon: print('Freezer Scheduler running in no-daemon mode') @@ -240,7 +241,8 @@ def main(): daemon = win_daemon.Daemon(daemonizable=freezer_scheduler, interval=int(CONF.interval), job_path=CONF.jobs_dir, - insecure=CONF.insecure) + insecure=CONF.insecure, + concurrent_jobs=CONF.concurrent_jobs) else: daemon = linux_daemon.Daemon(daemonizable=freezer_scheduler) diff --git a/freezer/scheduler/scheduler_job.py b/freezer/scheduler/scheduler_job.py index d65231d6..1497c953 100644 --- a/freezer/scheduler/scheduler_job.py +++ b/freezer/scheduler/scheduler_job.py @@ -382,8 +382,6 @@ class Job(object): ' retrying in {2} seconds' .format(self.id, action_name, max_retries_interval)) - # sleeping with the bloody lock, but we don't want other - # actions to mess with our stuff like fs snapshots, do we ? time.sleep(max_retries_interval) else: # SUCCESS @@ -419,46 +417,45 @@ class Job(object): def execute(self): result = Job.SUCCESS_RESULT - with self.scheduler.execution_lock: - with self.scheduler.lock: - LOG.info('job {0} running'.format(self.id)) - self.state = RunningState - self.update_job_schedule_doc(status=Job.RUNNING_STATUS, - result="", - time_started=int(time.time()), - time_ended=Job.TIME_NULL) - self.scheduler.update_job_schedule( - self.id, - self.job_doc['job_schedule']) + with self.scheduler.lock: + LOG.info('job {0} running'.format(self.id)) + self.state = RunningState + self.update_job_schedule_doc(status=Job.RUNNING_STATUS, + result="", + time_started=int(time.time()), + time_ended=Job.TIME_NULL) + self.scheduler.update_job_schedule( + self.id, + self.job_doc['job_schedule']) - self.start_session() - # if the job contains exec action and the scheduler passes the - # parameter --disable-exec job execution should fail - if self.contains_exec() and CONF.disable_exec: - LOG.info("Job {0} failed because it contains exec action " - "and exec actions are disabled by scheduler" - .format(self.id)) - self.result = Job.FAIL_RESULT - self.finish() - return - - for job_action in self.job_doc.get('job_actions', []): - if job_action.get('mandatory', False) or\ - (result == Job.SUCCESS_RESULT): - - action_result = self.execute_job_action(job_action) - if action_result == Job.FAIL_RESULT: - result = Job.FAIL_RESULT - - if action_result == Job.ABORTED_RESULT: - result = Job.ABORTED_RESULT - else: - freezer_action = job_action.get('freezer_action', {}) - action_name = freezer_action.get('action', '') - LOG.warning("skipping {0} action". - format(action_name)) - self.result = result + self.start_session() + # if the job contains exec action and the scheduler passes the + # parameter --disable-exec job execution should fail + if self.contains_exec() and CONF.disable_exec: + LOG.info("Job {0} failed because it contains exec action " + "and exec actions are disabled by scheduler" + .format(self.id)) + self.result = Job.FAIL_RESULT self.finish() + return + + for job_action in self.job_doc.get('job_actions', []): + if job_action.get('mandatory', False) or\ + (result == Job.SUCCESS_RESULT): + + action_result = self.execute_job_action(job_action) + if action_result == Job.FAIL_RESULT: + result = Job.FAIL_RESULT + + if action_result == Job.ABORTED_RESULT: + result = Job.ABORTED_RESULT + else: + freezer_action = job_action.get('freezer_action', {}) + action_name = freezer_action.get('action', '') + LOG.warning("skipping {0} action". + format(action_name)) + self.result = result + self.finish() def finish(self): self.update_job_schedule_doc(time_ended=int(time.time())) diff --git a/freezer/scheduler/win_daemon.py b/freezer/scheduler/win_daemon.py index 13a410bf..2213704e 100644 --- a/freezer/scheduler/win_daemon.py +++ b/freezer/scheduler/win_daemon.py @@ -62,7 +62,7 @@ class Daemon(object): instance """ def __init__(self, daemonizable=None, interval=None, job_path=None, - insecure=False): + insecure=False, concurrent_jobs=1): self.service_name = 'FreezerService' self.home = r'C:\.freezer' # this is only need it in order to have the same interface as in linux @@ -70,6 +70,7 @@ class Daemon(object): self.interval = interval or 60 self.job_path = job_path or r'C:\.freezer\scheduler\conf.d' self.insecure = insecure + self.concurrent_jobs = concurrent_jobs @utils.shield def start(self, log_file=None): @@ -85,6 +86,7 @@ class Daemon(object): # send arguments info to the windows service os.environ['SERVICE_JOB_PATH'] = self.job_path os.environ['SERVICE_INTERVAL'] = str(self.interval) + os.environ['SERVICE_CONCURRENT_JOBS'] = str(self.concurrent_jobs) winutils.save_environment(self.home) diff --git a/freezer/scheduler/win_service.py b/freezer/scheduler/win_service.py index 2f616e11..0d145599 100644 --- a/freezer/scheduler/win_service.py +++ b/freezer/scheduler/win_service.py @@ -99,7 +99,8 @@ class PySvc(win32serviceutil.ServiceFramework): scheduler = FreezerScheduler( apiclient=client, interval=int(os.environ['SERVICE_INTERVAL']), - job_path=os.environ['SERVICE_JOB_PATH']) + job_path=os.environ['SERVICE_JOB_PATH'], + concurrent_jobs=int(os.environ['SERVICE_CONCURRENT_JOBS'])) scheduler.start()