freezer/freezer/scheduler/scheduler_job.py

563 lines
19 KiB
Python

"""
Copyright 2015 Hewlett-Packard
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import datetime
import json
import os
import subprocess
import tempfile
import time
from freezer.utils import utils
from oslo_config import cfg
from oslo_log import log
from six.moves import configparser
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class StopState(object):
@staticmethod
def stop(job, doc):
job.job_doc = doc
job.event = Job.NO_EVENT
job.job_doc_status = Job.STOP_STATUS
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def abort(job, doc):
return StopState.stop(job, doc)
@staticmethod
def start(job, doc):
job.job_doc = doc
job.event = Job.NO_EVENT
job.job_doc_status = Job.STOP_STATUS
job.schedule()
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def remove(job):
job.unschedule()
job.job_doc_status = Job.REMOVED_STATUS
return Job.NO_EVENT
class ScheduledState(object):
@staticmethod
def stop(job, doc):
job.unschedule()
job.scheduler.update_job(job.id, job.job_doc)
return Job.STOP_EVENT
@staticmethod
def abort(job, doc):
return StopState.stop(job, doc)
@staticmethod
def start(job, doc):
job.event = Job.NO_EVENT
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def remove(job):
job.unschedule()
job.job_doc_status = Job.REMOVED_STATUS
return Job.NO_EVENT
class RunningState(object):
@staticmethod
def stop(job, doc):
job.event = Job.STOP_EVENT
return Job.NO_EVENT
@staticmethod
def abort(job, doc):
job.event = Job.ABORT_EVENT
job.scheduler.update_job(job.id, job.job_doc)
return Job.ABORTED_RESULT
@staticmethod
def start(job, doc):
job.event = Job.NO_EVENT
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def remove(job):
job.event = Job.REMOVE_EVENT
return Job.NO_EVENT
class Job(object):
NO_EVENT = ''
STOP_EVENT = 'stop'
START_EVENT = 'start'
ABORT_EVENT = 'abort'
REMOVE_EVENT = 'remove'
STOP_STATUS = 'stop'
SCHEDULED_STATUS = 'scheduled'
RUNNING_STATUS = 'running'
REMOVED_STATUS = 'removed'
COMPLETED_STATUS = 'completed'
FAIL_RESULT = 'fail'
SUCCESS_RESULT = 'success'
ABORTED_RESULT = 'aborted'
TIME_NULL = -1
@staticmethod
def create(scheduler, executable, job_doc):
job = Job(scheduler, executable, job_doc)
if job.job_doc_status in ['running', 'scheduled']:
LOG.warning('Resetting {0} status from job {1}'
.format(job.job_doc_status, job.id))
if job.job_doc_status == 'stop' and not job.event:
LOG.info('Job {0} was stopped.'.format(job.id))
job.event = Job.STOP_EVENT
elif not job.event:
LOG.info('Autostart Job {0}'.format(job.id))
job.event = Job.START_EVENT
return job
def __init__(self, scheduler, executable, job_doc):
self.scheduler = scheduler
self.executable = executable
self.job_doc = job_doc
self.process = None
self.state = StopState
def remove(self):
with self.scheduler.lock:
# delegate to state object
LOG.info('REMOVE job {0}'.format(self.id))
self.state.remove(self)
@property
def id(self):
return self.job_doc['job_id']
@property
def session_id(self):
return self.job_doc.get('session_id', '')
@session_id.setter
def session_id(self, value):
self.job_doc['session_id'] = value
@property
def session_tag(self):
return self.job_doc.get('session_tag', 0)
@session_tag.setter
def session_tag(self, value):
self.job_doc['session_tag'] = value
@property
def event(self):
return self.job_doc['job_schedule'].get('event', '')
@event.setter
def event(self, value):
self.job_doc['job_schedule']['event'] = value
@property
def job_doc_status(self):
return self.job_doc['job_schedule'].get('status', '')
@job_doc_status.setter
def job_doc_status(self, value):
self.job_doc['job_schedule']['status'] = value
@property
def result(self):
return self.job_doc['job_schedule'].get('result', '')
@result.setter
def result(self, value):
self.job_doc['job_schedule']['result'] = value
def can_be_removed(self):
return self.job_doc_status == Job.REMOVED_STATUS
@staticmethod
def save_action_to_file(action, f):
parser = configparser.ConfigParser()
parser.add_section('action')
for action_k, action_v in action.items():
parser.set('action', action_k, action_v)
parser.write(f)
f.seek(0)
@property
def schedule_date(self):
return self.job_doc['job_schedule'].get('schedule_date', '')
@property
def schedule_interval(self):
return self.job_doc['job_schedule'].get('schedule_interval', '')
@property
def schedule_start_date(self):
return self.job_doc['job_schedule'].get('schedule_start_date', '')
@property
def schedule_end_date(self):
return self.job_doc['job_schedule'].get('schedule_end_date', '')
@property
def schedule_cron_fields(self):
cron_fields = ['year', 'month', 'day', 'week', 'day_of_week',
'hour', 'minute', 'second']
cron_schedule = {}
for cron in self.job_doc['job_schedule'].keys():
if cron.startswith('schedule_'):
cron_key = cron.split('_', 1)[1]
cron_schedule.update({
cron_key: self.job_doc['job_schedule'][cron]})
return {key: value
for key, value in cron_schedule.items()
if key in cron_fields}
@property
def scheduled(self):
return self.scheduler.is_scheduled(self.id)
def get_schedule_args(self):
def get_start_date(date):
# start_date format "%Y-%m-%dT%H:%M:%S"
now = datetime.datetime.now()
start_date = now + datetime.timedelta(0, 2, 0)
if (utils.date_to_timestamp(date) >
utils.date_to_timestamp(now.isoformat().split('.')[0])):
start_date = datetime.datetime.strptime(
date, "%Y-%m-%dT%H:%M:%S")
return start_date
def get_end_date(start, end):
# start end format "%Y-%m-%dT%H:%M:%S"
end_date = datetime.datetime.strptime(end, "%Y-%m-%dT%H:%M:%S")
if (utils.date_to_timestamp(start) > utils.date_to_timestamp(end)):
end_date = None
return end_date
kwargs_date = {}
if self.schedule_start_date:
kwargs_date.update({
'start_date': get_start_date(self.schedule_start_date)
})
if self.schedule_end_date:
end_date = get_end_date(self.schedule_start_date,
self.schedule_end_date)
kwargs_date.update({
'end_date': end_date
})
if self.schedule_date:
return {'trigger': 'date',
'run_date': self.schedule_date}
elif self.schedule_interval:
kwargs = {'trigger': 'interval'}
kwargs.update(kwargs_date)
if self.schedule_interval == 'continuous':
kwargs.update({'seconds': 1})
else:
val, unit = self.schedule_interval.split(' ')
kwargs.update({unit: int(val)})
return kwargs
elif self.schedule_cron_fields:
kwargs = {'trigger': 'cron'}
kwargs.update(kwargs_date)
cron_fields = self.schedule_cron_fields
kwargs.update(cron_fields)
return kwargs
else:
# no scheduling information, schedule to start within a few seconds
return {'trigger': 'date',
'run_date': datetime.datetime.now() +
datetime.timedelta(0, 2, 0)}
def process_event(self, job_doc):
with self.scheduler.lock:
next_event = job_doc['job_schedule'].get('event', '')
while next_event:
if next_event == Job.STOP_EVENT:
LOG.info('JOB {0} event: STOP'.format(self.id))
next_event = self.state.stop(self, job_doc)
elif next_event == Job.START_EVENT:
LOG.info('JOB {0} event: START'.format(self.id))
next_event = self.state.start(self, job_doc)
elif next_event == Job.ABORT_EVENT:
LOG.info('JOB {0} event: ABORT'.format(self.id))
next_event = self.state.abort(self, job_doc)
elif next_event == Job.ABORTED_RESULT:
LOG.info('JOB {0} aborted.'.format(self.id))
break
def upload_metadata(self, metadata_string):
try:
metadata = json.loads(metadata_string)
if metadata:
metadata['job_id'] = self.id
self.scheduler.upload_metadata(metadata)
LOG.info("Job {0}, freezer action metadata uploaded"
.format(self.id))
except Exception as e:
LOG.error('metrics upload error: {0}'.format(e))
def execute_job_action(self, job_action):
max_tries = (job_action.get('max_retries', 0) + 1)
tries = max_tries
freezer_action = job_action.get('freezer_action', {})
max_retries_interval = job_action.get('max_retries_interval', 60)
action_name = freezer_action.get('action', '')
config_file_name = None
while tries:
with tempfile.NamedTemporaryFile(delete=False) as config_file:
self.save_action_to_file(freezer_action, config_file)
config_file_name = config_file.name
freezer_command = '{0} --metadata-out - --config {1}'.\
format(self.executable, config_file.name)
self.process = subprocess.Popen(freezer_command.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=os.environ.copy())
# store the pid for this process in the api
try:
self.job_doc['job_schedule']['current_pid'] = \
self.process.pid
self.scheduler.update_job(self.job_doc['job_id'],
self.job_doc)
except Exception as error:
LOG.error("Error saving the process id {}".format(error))
output, error = self.process.communicate()
# ensure the tempfile gets deleted
utils.delete_file(config_file_name)
if error:
LOG.error("Freezer client error: {0}".format(error))
elif output:
self.upload_metadata(output)
if self.process.returncode == -15:
# This means the job action was aborted by the scheduler
LOG.warning('Freezer-agent was killed by the scheduler. '
'Cleanup should be done manually: container, '
'mountpoint and lvm snapshots.')
return Job.ABORTED_RESULT
elif self.process.returncode:
# ERROR
tries -= 1
if tries:
LOG.warning('Job {0} failed {1} action,'
' retrying in {2} seconds'
.format(self.id, action_name,
max_retries_interval))
time.sleep(max_retries_interval)
else:
# SUCCESS
LOG.info('Job {0} action {1}'
' returned success exit code'.
format(self.id, action_name))
return Job.SUCCESS_RESULT
LOG.error('Job {0} action {1} failed after {2} tries'
.format(self.id, action_name, max_tries))
return Job.FAIL_RESULT
def contains_exec(self):
jobs = self.job_doc.get('job_actions')
for job in jobs:
freezer_action = job.get('freezer_action')
action = freezer_action.get('action')
if action == 'exec':
return True
return False
def update_job_schedule_doc(self, **kwargs):
"""
Updates the job_schedule section of the job doc with the
provided keyword args. No checks about accepted key/values
are being made here since they may vary in the future.
:param kwargs: keyword args to add
:return: None
"""
job_schedule = self.job_doc['job_schedule']
job_schedule.update(kwargs)
def execute(self):
result = Job.SUCCESS_RESULT
with self.scheduler.lock:
LOG.info('job {0} running'.format(self.id))
self.state = RunningState
self.update_job_schedule_doc(status=Job.RUNNING_STATUS,
result="",
time_started=int(time.time()),
time_ended=Job.TIME_NULL)
self.scheduler.update_job_schedule(
self.id,
self.job_doc['job_schedule'])
self.start_session()
# if the job contains exec action and the scheduler passes the
# parameter --disable-exec job execution should fail
if self.contains_exec() and CONF.disable_exec:
LOG.info("Job {0} failed because it contains exec action "
"and exec actions are disabled by scheduler"
.format(self.id))
self.result = Job.FAIL_RESULT
self.finish()
return
for job_action in self.job_doc.get('job_actions', []):
if job_action.get('mandatory', False) or\
(result == Job.SUCCESS_RESULT):
action_result = self.execute_job_action(job_action)
if action_result == Job.FAIL_RESULT:
result = Job.FAIL_RESULT
if action_result == Job.ABORTED_RESULT:
result = Job.ABORTED_RESULT
else:
freezer_action = job_action.get('freezer_action', {})
action_name = freezer_action.get('action', '')
LOG.warning("skipping {0} action".
format(action_name))
self.result = result
self.finish()
def finish(self):
self.update_job_schedule_doc(time_ended=int(time.time()))
self.end_session(self.result)
with self.scheduler.lock:
if self.event == Job.REMOVE_EVENT:
self.unschedule()
self.job_doc_status = Job.REMOVED_STATUS
return
if not self.scheduled:
self.job_doc_status = Job.COMPLETED_STATUS
self.state = StopState
self.scheduler.update_job(self.id, self.job_doc)
return
if self.event in [Job.STOP_EVENT, Job.ABORT_EVENT]:
self.unschedule()
self.job_doc_status = Job.COMPLETED_STATUS
self.scheduler.update_job(self.id, self.job_doc)
else:
self.job_doc_status = Job.SCHEDULED_STATUS
self.state = ScheduledState
self.scheduler.update_job_schedule(
self.id,
self.job_doc['job_schedule'])
def start_session(self):
if not self.session_id:
return
retry = 5
while retry:
try:
resp = self.scheduler.start_session(self.session_id,
self.id,
self.session_tag)
if resp['result'] == 'success':
self.session_tag = resp['session_tag']
return
except Exception as e:
LOG.error('Error while starting session {0}. {1}'.
format(self.session_id, e))
LOG.warning('Retrying to start session {0}'.
format(self.session_id))
retry -= 1
LOG.error('Unable to start session {0}'.format(self.session_id))
def end_session(self, result):
if not self.session_id:
return
retry = 5
while retry:
try:
resp = self.scheduler.end_session(self.session_id,
self.id,
self.session_tag,
result)
if resp['result'] == 'success':
return
except Exception as e:
LOG.error('Error while ending session {0}. {1}'.
format(self.session_id, e))
LOG.warning('Retrying to end session {0}'.
format(self.session_id))
retry -= 1
LOG.error('Unable to end session {0}'.format(self.session_id))
def schedule(self):
try:
kwargs = self.get_schedule_args()
self.scheduler.add_job(self.execute, id=self.id,
executor='threadpool',
misfire_grace_time=3600, **kwargs)
except Exception as e:
LOG.error("Unable to schedule job {0}: {1}".
format(self.id, e))
LOG.info('scheduler job with parameters {0}'.format(kwargs))
if self.scheduled:
self.job_doc_status = Job.SCHEDULED_STATUS
self.state = ScheduledState
else:
# job not scheduled or already started and waiting for lock
self.job_doc_status = Job.COMPLETED_STATUS
self.state = StopState
def unschedule(self):
try:
# already executing job are not present in the apscheduler list
self.scheduler.remove_job(job_id=self.id)
except Exception:
pass
self.event = Job.NO_EVENT
self.job_doc_status = Job.STOP_STATUS
self.state = StopState
def terminate(self):
if self.process:
self.process.terminate()
def kill(self):
if self.process:
self.process.kill()