Merge "Allow for multiple jobs to be executed at the same time"
This commit is contained in:
commit
1157d3c5f8
@ -121,3 +121,9 @@
|
||||
|
||||
# Initialize freezer scheduler with insecure mode (boolean value)
|
||||
#insecure = false
|
||||
|
||||
# Number of jobs that can be executed at the same time. (integer value)
|
||||
# By default only one job is allowed at a given time because there is no
|
||||
# built-in protection to prevent backup and restore to be done at the same time
|
||||
# on one resource.
|
||||
#concurrent_jobs = 1
|
||||
|
@ -98,6 +98,11 @@ def get_common_opts():
|
||||
dest='disable_exec',
|
||||
help='Allow Freezer Scheduler to deny jobs that execute '
|
||||
'commands for security reasons'),
|
||||
cfg.IntOpt('concurrent_jobs',
|
||||
default=1,
|
||||
dest='concurrent_jobs',
|
||||
help='Number of jobs that can be executed at the'
|
||||
' same time'),
|
||||
]
|
||||
|
||||
return _COMMON
|
||||
|
@ -43,7 +43,7 @@ LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class FreezerScheduler(object):
|
||||
def __init__(self, apiclient, interval, job_path):
|
||||
def __init__(self, apiclient, interval, job_path, concurrent_jobs=1):
|
||||
# config_manager
|
||||
self.client = apiclient
|
||||
self.freezerc_executable = spawn.find_executable('freezer-agent')
|
||||
@ -56,14 +56,14 @@ class FreezerScheduler(object):
|
||||
self.job_path = job_path
|
||||
self._client = None
|
||||
self.lock = threading.Lock()
|
||||
self.execution_lock = threading.Lock()
|
||||
job_defaults = {
|
||||
'coalesce': True,
|
||||
'max_instances': 2
|
||||
'max_instances': 1
|
||||
}
|
||||
executors = {
|
||||
'default': {'type': 'threadpool', 'max_workers': 1},
|
||||
'threadpool': {'type': 'threadpool', 'max_workers': 10}
|
||||
'threadpool': {'type': 'threadpool',
|
||||
'max_workers': concurrent_jobs}
|
||||
}
|
||||
self.scheduler = background.BackgroundScheduler(
|
||||
job_defaults=job_defaults,
|
||||
@ -226,7 +226,8 @@ def main():
|
||||
freezer_utils.create_dir(CONF.jobs_dir, do_log=False)
|
||||
freezer_scheduler = FreezerScheduler(apiclient=apiclient,
|
||||
interval=int(CONF.interval),
|
||||
job_path=CONF.jobs_dir)
|
||||
job_path=CONF.jobs_dir,
|
||||
concurrent_jobs=CONF.concurrent_jobs)
|
||||
|
||||
if CONF.no_daemon:
|
||||
print('Freezer Scheduler running in no-daemon mode')
|
||||
@ -240,7 +241,8 @@ def main():
|
||||
daemon = win_daemon.Daemon(daemonizable=freezer_scheduler,
|
||||
interval=int(CONF.interval),
|
||||
job_path=CONF.jobs_dir,
|
||||
insecure=CONF.insecure)
|
||||
insecure=CONF.insecure,
|
||||
concurrent_jobs=CONF.concurrent_jobs)
|
||||
else:
|
||||
daemon = linux_daemon.Daemon(daemonizable=freezer_scheduler)
|
||||
|
||||
|
@ -382,8 +382,6 @@ class Job(object):
|
||||
' retrying in {2} seconds'
|
||||
.format(self.id, action_name,
|
||||
max_retries_interval))
|
||||
# sleeping with the bloody lock, but we don't want other
|
||||
# actions to mess with our stuff like fs snapshots, do we ?
|
||||
time.sleep(max_retries_interval)
|
||||
else:
|
||||
# SUCCESS
|
||||
@ -419,46 +417,45 @@ class Job(object):
|
||||
|
||||
def execute(self):
|
||||
result = Job.SUCCESS_RESULT
|
||||
with self.scheduler.execution_lock:
|
||||
with self.scheduler.lock:
|
||||
LOG.info('job {0} running'.format(self.id))
|
||||
self.state = RunningState
|
||||
self.update_job_schedule_doc(status=Job.RUNNING_STATUS,
|
||||
result="",
|
||||
time_started=int(time.time()),
|
||||
time_ended=Job.TIME_NULL)
|
||||
self.scheduler.update_job_schedule(
|
||||
self.id,
|
||||
self.job_doc['job_schedule'])
|
||||
with self.scheduler.lock:
|
||||
LOG.info('job {0} running'.format(self.id))
|
||||
self.state = RunningState
|
||||
self.update_job_schedule_doc(status=Job.RUNNING_STATUS,
|
||||
result="",
|
||||
time_started=int(time.time()),
|
||||
time_ended=Job.TIME_NULL)
|
||||
self.scheduler.update_job_schedule(
|
||||
self.id,
|
||||
self.job_doc['job_schedule'])
|
||||
|
||||
self.start_session()
|
||||
# if the job contains exec action and the scheduler passes the
|
||||
# parameter --disable-exec job execution should fail
|
||||
if self.contains_exec() and CONF.disable_exec:
|
||||
LOG.info("Job {0} failed because it contains exec action "
|
||||
"and exec actions are disabled by scheduler"
|
||||
.format(self.id))
|
||||
self.result = Job.FAIL_RESULT
|
||||
self.finish()
|
||||
return
|
||||
|
||||
for job_action in self.job_doc.get('job_actions', []):
|
||||
if job_action.get('mandatory', False) or\
|
||||
(result == Job.SUCCESS_RESULT):
|
||||
|
||||
action_result = self.execute_job_action(job_action)
|
||||
if action_result == Job.FAIL_RESULT:
|
||||
result = Job.FAIL_RESULT
|
||||
|
||||
if action_result == Job.ABORTED_RESULT:
|
||||
result = Job.ABORTED_RESULT
|
||||
else:
|
||||
freezer_action = job_action.get('freezer_action', {})
|
||||
action_name = freezer_action.get('action', '')
|
||||
LOG.warning("skipping {0} action".
|
||||
format(action_name))
|
||||
self.result = result
|
||||
self.start_session()
|
||||
# if the job contains exec action and the scheduler passes the
|
||||
# parameter --disable-exec job execution should fail
|
||||
if self.contains_exec() and CONF.disable_exec:
|
||||
LOG.info("Job {0} failed because it contains exec action "
|
||||
"and exec actions are disabled by scheduler"
|
||||
.format(self.id))
|
||||
self.result = Job.FAIL_RESULT
|
||||
self.finish()
|
||||
return
|
||||
|
||||
for job_action in self.job_doc.get('job_actions', []):
|
||||
if job_action.get('mandatory', False) or\
|
||||
(result == Job.SUCCESS_RESULT):
|
||||
|
||||
action_result = self.execute_job_action(job_action)
|
||||
if action_result == Job.FAIL_RESULT:
|
||||
result = Job.FAIL_RESULT
|
||||
|
||||
if action_result == Job.ABORTED_RESULT:
|
||||
result = Job.ABORTED_RESULT
|
||||
else:
|
||||
freezer_action = job_action.get('freezer_action', {})
|
||||
action_name = freezer_action.get('action', '')
|
||||
LOG.warning("skipping {0} action".
|
||||
format(action_name))
|
||||
self.result = result
|
||||
self.finish()
|
||||
|
||||
def finish(self):
|
||||
self.update_job_schedule_doc(time_ended=int(time.time()))
|
||||
|
@ -62,7 +62,7 @@ class Daemon(object):
|
||||
instance
|
||||
"""
|
||||
def __init__(self, daemonizable=None, interval=None, job_path=None,
|
||||
insecure=False):
|
||||
insecure=False, concurrent_jobs=1):
|
||||
self.service_name = 'FreezerService'
|
||||
self.home = r'C:\.freezer'
|
||||
# this is only need it in order to have the same interface as in linux
|
||||
@ -70,6 +70,7 @@ class Daemon(object):
|
||||
self.interval = interval or 60
|
||||
self.job_path = job_path or r'C:\.freezer\scheduler\conf.d'
|
||||
self.insecure = insecure
|
||||
self.concurrent_jobs = concurrent_jobs
|
||||
|
||||
@utils.shield
|
||||
def start(self, log_file=None):
|
||||
@ -85,6 +86,7 @@ class Daemon(object):
|
||||
# send arguments info to the windows service
|
||||
os.environ['SERVICE_JOB_PATH'] = self.job_path
|
||||
os.environ['SERVICE_INTERVAL'] = str(self.interval)
|
||||
os.environ['SERVICE_CONCURRENT_JOBS'] = str(self.concurrent_jobs)
|
||||
|
||||
winutils.save_environment(self.home)
|
||||
|
||||
|
@ -99,7 +99,8 @@ class PySvc(win32serviceutil.ServiceFramework):
|
||||
|
||||
scheduler = FreezerScheduler(
|
||||
apiclient=client, interval=int(os.environ['SERVICE_INTERVAL']),
|
||||
job_path=os.environ['SERVICE_JOB_PATH'])
|
||||
job_path=os.environ['SERVICE_JOB_PATH'],
|
||||
concurrent_jobs=int(os.environ['SERVICE_CONCURRENT_JOBS']))
|
||||
|
||||
scheduler.start()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user