Enable parallel job execution for the scheduler to allow abort signals

Now the scheduler uses two pools of resources to handle execution of jobs.
The first one will handle the polling of jobs from the api.
The second one will handle the job execution. This way we avoid blocking the
scheduler while running jobs.

Aborting a running job is now enabled thanks to the parallel execution.

Depends-on: I9e92ccdd7c592a8109fe8398db6d53c9c42dc10a
Closes-bug: 1615662
Change-Id: I48153dcb521b6a8f0138d5ef81239cf4cded4df0
This commit is contained in:
Memo Garcia 2016-08-22 15:11:08 +01:00 committed by Pierre Mathieu
parent 6e77d8064b
commit c4ea71a4d0
6 changed files with 87 additions and 18 deletions

View File

@ -18,8 +18,9 @@ limitations under the License.
import six
import sys
import threading
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from distutils import spawn
from oslo_config import cfg
from oslo_log import log
@ -29,6 +30,7 @@ from freezer.scheduler import arguments
from freezer.scheduler import scheduler_job
from freezer.scheduler import shell
from freezer.scheduler import utils
from freezer.utils import utils as freezer_utils
from freezer.utils import winutils
@ -60,12 +62,18 @@ class FreezerScheduler(object):
self.execution_lock = threading.Lock()
job_defaults = {
'coalesce': True,
'max_instances': 1
'max_instances': 2
}
self.scheduler = BlockingScheduler(job_defaults=job_defaults)
executors = {
'default': {'type': 'threadpool', 'max_workers': 1},
'threadpool': {'type': 'threadpool', 'max_workers': 10}
}
self.scheduler = BackgroundScheduler(job_defaults=job_defaults,
executors=executors)
if self.client:
self.scheduler.add_job(self.poll, 'interval',
seconds=interval, id='api_poll')
seconds=interval, id='api_poll',
executor='default')
self.add_job = self.scheduler.add_job
self.remove_job = self.scheduler.remove_job
@ -108,6 +116,15 @@ class FreezerScheduler(object):
utils.do_register(self.client)
self.poll()
self.scheduler.start()
try:
while True:
# Due to the new Background scheduler nature, we need to keep
# the main thread alive.
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but
# should be done if possible
self.scheduler.shutdown(wait=False)
def update_job(self, job_id, job_doc):
if self.client:
@ -145,6 +162,14 @@ class FreezerScheduler(object):
work_job_id_list.append(job_id)
job = self.jobs.get(job_id, None) or self.create_job(job_doc)
if job:
# check for abort status
if (job_doc['job_schedule']['event'] == 'abort' and
job_doc['job_schedule']['result'] != 'aborted'):
pid = int(job_doc['job_schedule']['current_pid'])
freezer_utils.terminate_subprocess(pid, 'freezer-agent')
job.process_event(job_doc)
# request removal of any job that has been removed in the api

View File

@ -72,7 +72,7 @@ class ScheduledState(object):
@staticmethod
def abort(job, doc):
return ScheduledState.stop(job, doc)
return StopState.stop(job, doc)
@staticmethod
def start(job, doc):
@ -97,7 +97,8 @@ class RunningState(object):
@staticmethod
def abort(job, doc):
job.event = Job.ABORT_EVENT
return Job.NO_EVENT
job.scheduler.update_job(job.id, job.job_doc)
return Job.ABORTED_RESULT
@staticmethod
def start(job, doc):
@ -303,6 +304,9 @@ class Job(object):
elif next_event == Job.ABORT_EVENT:
LOG.info('JOB {0} event: ABORT'.format(self.id))
next_event = self.state.abort(self, job_doc)
elif next_event == Job.ABORTED_RESULT:
LOG.info('JOB {0} aborted.'.format(self.id))
break
def upload_metadata(self, metadata_string):
try:
@ -323,7 +327,6 @@ class Job(object):
action_name = freezer_action.get('action', '')
config_file_name = None
while tries:
with tempfile.NamedTemporaryFile(delete=False) as config_file:
self.save_action_to_file(freezer_action, config_file)
config_file_name = config_file.name
@ -333,6 +336,17 @@ class Job(object):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy())
# store the pid for this process in the api
try:
self.job_doc['job_schedule']['current_pid'] = \
self.process.pid
self.scheduler.update_job(self.job_doc['job_id'],
self.job_doc)
except Exception as error:
LOG.error("Error saving the process id {}".format(error))
output, error = self.process.communicate()
# ensure the tempfile gets deleted
utils.delete_file(config_file_name)
@ -342,7 +356,11 @@ class Job(object):
elif output:
self.upload_metadata(output)
if self.process.returncode:
if self.process.returncode == -15:
# This means the job action was aborted by the scheduler.
return Job.ABORTED_RESULT
elif self.process.returncode:
# ERROR
tries -= 1
if tries:
@ -396,9 +414,13 @@ class Job(object):
for job_action in self.job_doc.get('job_actions', []):
if job_action.get('mandatory', False) or\
(result == Job.SUCCESS_RESULT):
action_result = self.execute_job_action(job_action)
if action_result == Job.FAIL_RESULT:
result = Job.FAIL_RESULT
if action_result == Job.ABORTED_RESULT:
result = Job.ABORTED_RESULT
else:
freezer_action = job_action.get('freezer_action', {})
action_name = freezer_action.get('action', '')
@ -473,7 +495,9 @@ class Job(object):
def schedule(self):
try:
kwargs = self.get_schedule_args()
self.scheduler.add_job(self.execute, id=self.id, **kwargs)
self.scheduler.add_job(self.execute, id=self.id,
executor='threadpool',
misfire_grace_time=3600, **kwargs)
except Exception as e:
LOG.error("Unable to schedule job {0}: {1}".
format(self.id, e))

View File

@ -277,3 +277,10 @@ def do_backup_list(client, args):
table.add_row(row)
l = list_func(offset=offset)
print(table)
def do_job_abort(client, args):
    """Ask the freezer api to abort the job identified by ``args.job_id``.

    :param client: api client exposing ``jobs.abort_job``
    :param args: parsed cli arguments; ``args.job_id`` must be set
    :raises Exception: when no job id was supplied on the command line
    """
    job_id = args.job_id
    if not job_id:
        raise Exception("Parameter --job required")
    client.jobs.abort_job(job_id)
    print("Job {0} aborted".format(job_id))

View File

@ -76,7 +76,7 @@ class BaseFreezerCliTest(base.BaseFreezerTest):
self.fail('Could not find job: {}'.format(job_id))
def wait_for_job_status(self, job_id, status, timeout=360):
def wait_for_job_status(self, job_id, status, timeout=720):
start = time.time()
while True:
@ -231,7 +231,6 @@ class TestFreezerScenario(BaseFreezerCliTest):
self.cli.freezer_scheduler(action='start', flags='-c test_node')
def tearDown(self):
super(TestFreezerScenario, self).tearDown()
self.source_tree.cleanup()
@ -239,7 +238,6 @@ class TestFreezerScenario(BaseFreezerCliTest):
self.cli.freezer_scheduler(action='stop', flags='-c test_node')
def test_simple_backup(self):
backup_job = {
"job_actions": [
@ -247,9 +245,10 @@ class TestFreezerScenario(BaseFreezerCliTest):
"freezer_action": {
"action": "backup",
"mode": "fs",
"storage": "local",
"backup_name": "backup1",
"path_to_backup": self.source_tree.path,
"container": "freezer_test",
"container": "/tmp/freezer_test/",
},
"max_retries": 3,
"max_retries_interval": 60
@ -262,9 +261,10 @@ class TestFreezerScenario(BaseFreezerCliTest):
{
"freezer_action": {
"action": "restore",
"storage": "local",
"restore_abs_path": self.dest_tree.path,
"backup_name": "backup1",
"container": "freezer_test",
"container": "/tmp/freezer_test/",
},
"max_retries": 3,
"max_retries_interval": 60
@ -274,6 +274,7 @@ class TestFreezerScenario(BaseFreezerCliTest):
}
backup_job_id = self.create_job(backup_job)
self.cli.freezer_scheduler(action='job-start', flags='-c test_node -j {}'.format(backup_job_id))
self.wait_for_job_status(backup_job_id, 'completed')
self.assertJobColumnEqual(backup_job_id, JOB_TABLE_RESULT_COLUMN, 'success')
@ -281,8 +282,4 @@ class TestFreezerScenario(BaseFreezerCliTest):
self.wait_for_job_status(restore_job_id, 'completed')
self.assertJobColumnEqual(restore_job_id, JOB_TABLE_RESULT_COLUMN, 'success')
self.assertTrue(self.source_tree.is_equal(self.dest_tree))

View File

@ -30,6 +30,8 @@ from oslo_config import cfg
from oslo_log import log
from six.moves import configparser
import psutil
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -510,3 +512,15 @@ def set_max_process_priority():
])
except Exception as priority_error:
LOG.warning('Priority: {0}'.format(priority_error))
def terminate_subprocess(pid, name):
    """Terminate the process identified by *pid*.

    The process is terminated only when its executable name starts with
    *name*; this guards against killing an unrelated process that may
    have reused the pid (e.g. after the freezer-agent already exited).

    :param pid: pid of the process to terminate
    :param name: expected prefix of the process name, e.g. 'freezer-agent'
    """
    try:
        process = psutil.Process(pid)
        # psutil < 2.0 exposes ``name`` as a plain attribute while
        # psutil >= 2.0 turns it into a method; support both so an
        # environment with a newer psutil does not silently skip the
        # name check and fall into the except branch.
        process_name = process.name() if callable(process.name) \
            else process.name
        if process_name.startswith(name):
            process.terminate()
        else:
            LOG.warning('The name {} does not match the pid {}'.format(
                name, pid))
    except psutil.NoSuchProcess:
        LOG.error('Process {} does not exist anymore'.format(pid))
    except Exception as error:
        # Best effort: failing to terminate must not crash the
        # scheduler poll loop, but log the real reason instead of
        # claiming the process is gone.
        LOG.error('Error terminating process {}: {}'.format(pid, error))

View File

@ -22,3 +22,5 @@ six>=1.9.0 # MIT
# Not in global-requirements
apscheduler # MIT License
psutil>=1.1.1,<2.0.0 # BSD