freezer/freezer/scheduler/scheduler_job.py

441 lines
15 KiB
Python

"""
Copyright 2015 Hewlett-Packard
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This product includes cryptographic software written by Eric Young
(eay@cryptsoft.com). This product includes software written by Tim
Hudson (tjh@cryptsoft.com).
========================================================================
"""
import logging
import subprocess
import tempfile
import datetime
import json
import time
from ConfigParser import ConfigParser
class StopState(object):
@staticmethod
def stop(job, doc):
job.job_doc = doc
job.event = Job.NO_EVENT
job.job_doc_status = Job.STOP_STATUS
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def abort(job, doc):
job.job_doc = doc
job.event = Job.NO_EVENT
job.job_doc_status = Job.STOP_STATUS
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def start(job, doc):
job.job_doc = doc
job.event = Job.NO_EVENT
job.job_doc_status = Job.STOP_STATUS
job.schedule()
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def remove(job):
job.unschedule()
job.job_doc_status = Job.REMOVED_STATUS
return Job.NO_EVENT
class ScheduledState(object):
@staticmethod
def stop(job, doc):
job.unschedule()
job.scheduler.update_job(job.id, job.job_doc)
return Job.STOP_EVENT
@staticmethod
def abort(job, doc):
return ScheduledState.stop(job, doc)
@staticmethod
def start(job, doc):
job.event = Job.NO_EVENT
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def remove(job):
job.unschedule()
job.job_doc_status = Job.REMOVED_STATUS
return Job.NO_EVENT
class RunningState(object):
@staticmethod
def stop(job, doc):
job.event = Job.STOP_EVENT
return Job.NO_EVENT
@staticmethod
def abort(job, doc):
job.event = Job.ABORT_EVENT
return Job.NO_EVENT
@staticmethod
def start(job, doc):
job.event = Job.NO_EVENT
job.scheduler.update_job(job.id, job.job_doc)
return Job.NO_EVENT
@staticmethod
def remove(job):
job.event = Job.REMOVE_EVENT
return Job.NO_EVENT
class Job(object):
NO_EVENT = ''
STOP_EVENT = 'stop'
START_EVENT = 'start'
ABORT_EVENT = 'abort'
REMOVE_EVENT = 'remove'
STOP_STATUS = 'stop'
SCHEDULED_STATUS = 'scheduled'
RUNNING_STATUS = 'running'
REMOVED_STATUS = 'removed'
COMPLETED_STATUS = 'completed'
FAIL_RESULT = 'fail'
SUCCESS_RESULT = 'success'
ABORTED_RESULT = 'aborted'
@staticmethod
def create(scheduler, executable, job_doc):
job = Job(scheduler, executable, job_doc)
if job.job_doc_status in ['running', 'scheduled']:
logging.warning('Job {0} already has {1} status, '
'skipping'.format(job.id, job.job_doc_status))
return None
if not job.event and not job.job_doc_status:
logging.warning('Autostart Job {0}'.format(job.id))
job.event = Job.START_EVENT
return job
def __init__(self, scheduler, executable, job_doc):
self.scheduler = scheduler
self.executable = executable
self.job_doc = job_doc
self.process = None
self.state = StopState
def remove(self):
with self.scheduler.lock:
# delegate to state object
logging.info('REMOVE job {0}'.format(self.id))
self.state.remove(self)
@property
def id(self):
return self.job_doc['job_id']
@property
def session_id(self):
return self.job_doc.get('session_id', '')
@session_id.setter
def session_id(self, value):
self.job_doc['session_id'] = value
@property
def session_tag(self):
return self.job_doc.get('session_tag', 0)
@session_tag.setter
def session_tag(self, value):
self.job_doc['session_tag'] = value
@property
def event(self):
return self.job_doc['job_schedule'].get('event', '')
@event.setter
def event(self, value):
self.job_doc['job_schedule']['event'] = value
@property
def job_doc_status(self):
return self.job_doc['job_schedule'].get('status', '')
@job_doc_status.setter
def job_doc_status(self, value):
self.job_doc['job_schedule']['status'] = value
@property
def result(self):
return self.job_doc['job_schedule'].get('result', '')
@result.setter
def result(self, value):
self.job_doc['job_schedule']['result'] = value
def can_be_removed(self):
return self.job_doc_status == Job.REMOVED_STATUS
@staticmethod
def save_action_to_file(action, f):
parser = ConfigParser()
parser.add_section('action')
for action_k, action_v in action.items():
parser.set('action', action_k, action_v)
parser.write(f)
f.seek(0)
@property
def schedule_date(self):
return self.job_doc['job_schedule'].get('schedule_date', '')
@property
def schedule_interval(self):
return self.job_doc['job_schedule'].get('schedule_interval', '')
@property
def schedule_cron_fields(self):
cron_fields = ['year', 'month', 'day', 'week', 'day_of_week',
'hour', 'minute', 'second']
return {key: value
for key, value in self.job_doc['job_schedule'].items()
if key in cron_fields}
@property
def scheduled(self):
return self.scheduler.is_scheduled(self.id)
def get_schedule_args(self):
if self.schedule_date:
return {'trigger': 'date',
'run_date': self.schedule_date}
elif self.schedule_interval:
kwargs = {'trigger': 'interval'}
if self.schedule_interval == 'continuous':
kwargs.update({'seconds': 1})
else:
val, unit = self.schedule_interval.split(' ')
kwargs.update({unit: int(val)})
return kwargs
else:
cron_fields = self.schedule_cron_fields
if cron_fields:
return {'trigger': 'cron'}.update(cron_fields)
# no scheduling information, schedule to start within a few seconds
return {'trigger': 'date',
'run_date': datetime.datetime.now() +
datetime.timedelta(0, 2, 0)}
def process_event(self, job_doc):
with self.scheduler.lock:
next_event = job_doc['job_schedule'].get('event', '')
while next_event:
if next_event == Job.STOP_EVENT:
logging.info('JOB {0} event: STOP'.format(self.id))
next_event = self.state.stop(self, job_doc)
elif next_event == Job.START_EVENT:
logging.info('JOB {0} event: START'.format(self.id))
next_event = self.state.start(self, job_doc)
elif next_event == Job.ABORT_EVENT:
logging.info('JOB {0} event: ABORT'.format(self.id))
next_event = self.state.abort(self, job_doc)
def upload_metadata(self, metadata_string):
try:
metadata = json.loads(metadata_string)
if metadata:
self.scheduler.upload_metadata(metadata)
except Exception as e:
logging.error('[*] metrics upload error: {0}'.format(e))
logging.info("[*] Job {0}, freezer action metadata uploaded")
def execute_job_action(self, job_action):
max_retries = job_action.get('max_retries', 1)
tries = max_retries
freezer_action = job_action.get('freezer_action', {})
max_retries_interval = job_action.get('max_retries_interval', 60)
action_name = freezer_action.get('action', '')
while tries:
with tempfile.NamedTemporaryFile() as config_file:
self.save_action_to_file(freezer_action, config_file)
freezer_command = 'python {0} --metadata-out - --config {1}'.\
format(self.executable, config_file.name)
self.process = subprocess.Popen(freezer_command.split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output, error = self.process.communicate()
if error:
logging.error("[*] Freezer client error: {0}".format(error))
elif output:
self.upload_metadata(output)
if self.process.returncode:
# ERROR
tries -= 1
if tries:
logging.warning('[*] Job {0} failed {1} action,'
' retrying in {2} seconds'
.format(self.id, action_name,
max_retries_interval))
# sleeping with the bloody lock, but we don't want other
# actions to mess with our stuff like fs snapshots, do we ?
time.sleep(max_retries_interval)
else:
# SUCCESS
logging.info('[*] Job {0} action {1}'
' returned success exit code'.
format(self.id, action_name))
return Job.SUCCESS_RESULT
logging.error('[*] Job {0} action {1} failed after {2} tries'
.format(self.id, action_name, max_retries))
return Job.FAIL_RESULT
def execute(self):
result = Job.SUCCESS_RESULT
with self.scheduler.execution_lock:
with self.scheduler.lock:
logging.info('job {0} running'.format(self.id))
self.state = RunningState
self.job_doc_status = Job.RUNNING_STATUS
self.scheduler.update_job_status(self.id, self.job_doc_status)
self.start_session()
for job_action in self.job_doc.get('job_actions', []):
if job_action.get('mandatory', False) or\
(result == Job.SUCCESS_RESULT):
result = self.execute_job_action(job_action)
else:
freezer_action = job_action.get('freezer_action', {})
action_name = freezer_action.get('action', '')
logging.warning("[*]skipping {0} action".
format(action_name))
self.result = result
self.finish()
def finish(self):
self.end_session(self.result)
with self.scheduler.lock:
if self.event == Job.REMOVE_EVENT:
self.unschedule()
self.job_doc_status = Job.REMOVED_STATUS
return
if not self.scheduled:
self.job_doc_status = Job.COMPLETED_STATUS
self.state = StopState
self.scheduler.update_job(self.id, self.job_doc)
return
if self.event in [Job.STOP_EVENT, Job.ABORT_EVENT]:
self.unschedule()
self.job_doc_status = Job.COMPLETED_STATUS
self.scheduler.update_job(self.id, self.job_doc)
else:
self.job_doc_status = Job.SCHEDULED_STATUS
self.state = ScheduledState
self.scheduler.update_job_status(self.id, self.job_doc_status)
def start_session(self):
if not self.session_id:
return
retry = 5
while retry:
try:
resp = self.scheduler.start_session(self.session_id,
self.id,
self.session_tag)
if resp['result'] == 'success':
self.session_tag = resp['session_tag']
return
except Exception as e:
logging.error('[*]Error while starting session {0}. {1}'.
format(self.session_id, e))
logging.warning('[*]Retrying to start session {0}'.
format(self.session_id))
retry -= 1
logging.error('[*]Unable to start session {0}'.format(self.session_id))
def end_session(self, result):
if not self.session_id:
return
retry = 5
while retry:
try:
resp = self.scheduler.end_session(self.session_id,
self.id,
self.session_tag,
result)
if resp['result'] == 'success':
return
except Exception as e:
logging.error('[*]Error while ending session {0}. {1}'.
format(self.session_id, e))
logging.warning('[*]Retrying to end session {0}'.
format(self.session_id))
retry -= 1
logging.error('[*]Unable to end session {0}'.format(self.session_id))
def schedule(self):
try:
kwargs = self.get_schedule_args()
self.scheduler.add_job(self.execute, id=self.id, **kwargs)
except Exception as e:
logging.error("[*] Unable to schedule job {0}: {1}".
format(self.id, e))
if self.scheduled:
self.job_doc_status = Job.SCHEDULED_STATUS
self.state = ScheduledState
else:
# job not scheduled or already started and waiting for lock
self.job_doc_status = Job.COMPLETED_STATUS
self.state = StopState
def unschedule(self):
try:
# already executing job are not present in the apscheduler list
self.scheduler.remove_job(job_id=self.id)
except:
pass
self.event = Job.NO_EVENT
self.job_doc_status = Job.STOP_STATUS
self.state = StopState
def terminate(self):
if self.process:
self.process.terminate()
def kill(self):
if self.process:
self.process.kill()