Fixed a problem with canceling a job execution while it is still pending

Change-Id: Icbe3cd39fa28d6561607e679e3e7cd5b2a64751a
Closes-bug: #1369979
This commit is contained in:
Andrey Pavlov 2014-09-18 16:26:00 +04:00
parent ea4e38a198
commit 89fbce96f1
10 changed files with 90 additions and 16 deletions

View File

@ -204,6 +204,11 @@
# stored or retrieved in a single operation. (integer value)
#job_binary_max_KB=5120
# Timeout for canceling job execution (in seconds). Sahara
# will try to cancel job execution during this time. (integer
# value)
#job_canceling_timeout=300
# If set to True, Sahara will use floating IPs to communicate
# with instances. To make sure that all instances have
# floating IPs assigned in Nova Network set

View File

@ -35,7 +35,12 @@ edp_opts = [
cfg.IntOpt('job_binary_max_KB',
default=5120,
help='Maximum length of job binary data in kilobytes that '
'may be stored or retrieved in a single operation.')
'may be stored or retrieved in a single operation.'),
cfg.IntOpt('job_canceling_timeout',
default=300,
help='Timeout for canceling job execution (in seconds). '
'Sahara will try to cancel job execution during '
'this time.')
]
networking_opts = [

View File

@ -155,6 +155,15 @@ class CreationFailed(SaharaException):
self.message = message
class CancelingFailed(SaharaException):
    """Raised when a cancel operation on a job execution did not succeed."""

    message = _("Operation was not canceled")
    code = "CANCELING_FAILED"

    def __init__(self, message=None):
        # Only a non-empty message overrides the class-level default.
        if message:
            self.message = message
class DeletionFailed(SaharaException):
message = _("Object was not deleted")
code = "DELETION_FAILED"

View File

@ -89,7 +89,10 @@ def get_job_execution(id):
def cancel_job_execution(id):
    """Request asynchronous cancellation of a job execution.

    Returns the job execution's state as of the moment the cancel was
    requested; the actual cancellation is performed in the background
    by the ops layer.
    """
    ctx = context.ctx()
    job_execution = conductor.job_execution_get(ctx, id)
    OPS.cancel_job_execution(id)
    return job_execution
def delete_job_execution(id):

View File

@ -16,12 +16,14 @@
import datetime
from oslo.config import cfg
from oslo.utils import timeutils
from sahara import conductor as c
from sahara import context
from sahara import exceptions as e
from sahara.i18n import _
from sahara.i18n import _LE
from sahara.i18n import _LI
from sahara.openstack.common import log
from sahara.service.edp import job_utils
from sahara.service.edp.oozie import engine as oozie_engine
@ -131,20 +133,45 @@ def run_job(job_execution_id):
def cancel_job(job_execution_id):
    # Cancel a job execution, polling until the engine reports a terminal
    # status or CONF.job_canceling_timeout seconds elapse.
    # Raises e.CancelingFailed if the timeout expires first.
    ctx = context.ctx()
    job_execution = conductor.job_execution_get(ctx, job_execution_id)
    if job_execution.info['status'] in edp.JOB_STATUSES_TERMINATED:
        # Already in a terminal state: nothing to cancel.
        return job_execution
    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster is None:
        # The cluster is gone, so there is no engine to talk to.
        return job_execution
    engine = _get_job_engine(cluster, job_execution)
    # NOTE(review): if no engine is found, the function falls through and
    # implicitly returns None rather than the job execution — confirm callers
    # tolerate this.
    if engine is not None:
        # Flag the execution TOBEKILLED up front so a concurrently starting
        # run (still pending) can observe the flag and abort itself.
        job_execution = conductor.job_execution_update(
            ctx, job_execution_id,
            {'info': {'status': edp.JOB_STATUS_TOBEKILLED}})
        timeout = CONF.job_canceling_timeout
        s_time = timeutils.utcnow()
        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
            if job_execution.info['status'] not in edp.JOB_STATUSES_TERMINATED:
                try:
                    job_info = engine.cancel_job(job_execution)
                except Exception as ex:
                    # Best-effort: log the engine failure and retry below.
                    job_info = None
                    LOG.exception(
                        _LE("Error during cancel of job execution %(job)s: "
                            "%(error)s"), {'job': job_execution.id,
                                           'error': ex})
                if job_info is not None:
                    job_execution = _write_job_status(job_execution, job_info)
                    LOG.info(_LI("Job execution %s was canceled successfully"),
                             job_execution.id)
                    return job_execution
                # Engine could not cancel yet; back off, then re-read status.
                context.sleep(3)
                job_execution = conductor.job_execution_get(
                    ctx, job_execution_id)
            else:
                # Terminated by some other path while we were polling.
                LOG.info(_LI("Job execution status %(job)s: %(status)s"),
                         {'job': job_execution.id,
                          'status': job_execution.info['status']})
                return job_execution
        else:
            # while/else: every in-loop exit is a return, so reaching here
            # means the loop timed out without a successful cancel.
            raise e.CancelingFailed(_('Job execution %s was not canceled')
                                    % job_execution.id)
def get_job_status(job_execution_id):

View File

@ -111,6 +111,9 @@ class OozieJobEngine(base_engine.JobEngine):
client = self._get_client()
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
job_execution)
job_execution = conductor.job_execution_get(ctx, job_execution.id)
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
return (None, edp.JOB_STATUS_KILLED, None)
client.run_job(job_execution, oozie_job_id)
try:
status = client.get_job_status(job_execution,

View File

@ -166,6 +166,10 @@ class SparkJobEngine(base_engine.JobEngine):
port,
args)
job_execution = conductor.job_execution_get(ctx, job_execution.id)
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
return (None, edp.JOB_STATUS_KILLED, None)
# If an exception is raised here, the job_manager will mark
# the job failed and log the exception
with remote.get_remote(master) as r:

View File

@ -67,6 +67,10 @@ class LocalOps(object):
context.spawn("Starting Job Execution %s" % job_execution_id,
_run_edp_job, job_execution_id)
def cancel_job_execution(self, job_execution_id):
    """Spawn a background thread that cancels the given job execution."""
    description = "Canceling Job Execution %s" % job_execution_id
    context.spawn(description, _cancel_job_execution, job_execution_id)
class RemoteOps(rpc_utils.RPCClient):
def __init__(self):
@ -86,6 +90,10 @@ class RemoteOps(rpc_utils.RPCClient):
def run_edp_job(self, job_execution_id):
self.cast('run_edp_job', job_execution_id=job_execution_id)
def cancel_job_execution(self, job_execution_id):
    """Fire-and-forget RPC cast asking the ops server to cancel the job."""
    self.cast('cancel_job_execution', job_execution_id=job_execution_id)
class OpsServer(rpc_utils.RPCServer):
def __init__(self):
@ -105,6 +113,9 @@ class OpsServer(rpc_utils.RPCServer):
def run_edp_job(self, job_execution_id):
_run_edp_job(job_execution_id)
def cancel_job_execution(self, job_execution_id):
    # RPC server endpoint: performs the cancel synchronously in this process.
    _cancel_job_execution(job_execution_id)
def ops_error_handler(f):
@functools.wraps(f)
@ -263,3 +274,7 @@ def terminate_cluster(cluster_id):
def _run_edp_job(job_execution_id):
job_manager.run_job(job_execution_id)
def _cancel_job_execution(job_execution_id):
    # Shared entry point used by both the local-spawn and RPC-server paths.
    job_manager.cancel_job(job_execution_id)

View File

@ -307,6 +307,7 @@ class TestSpark(base.SaharaTestCase):
# check that we have nothing new to report ...
self.assertEqual(status, None)
@mock.patch('sahara.conductor.API.job_execution_get')
@mock.patch('sahara.utils.remote.get_remote')
@mock.patch('sahara.plugins.spark.config_helper.get_config_value')
@mock.patch('sahara.service.edp.job_utils.upload_job_files',
@ -319,7 +320,8 @@ class TestSpark(base.SaharaTestCase):
@mock.patch('sahara.conductor.API.job_get')
@mock.patch('sahara.context.ctx', return_value="ctx")
def test_run_job(self, ctx, job_get, get_instance, create_workflow_dir,
upload_job_files, get_config_value, get_remote):
upload_job_files, get_config_value, get_remote,
job_exec_get):
def fix_get(field, default=None):
if field == "args":

View File

@ -20,6 +20,7 @@ JOB_STATUS_KILLED = 'KILLED'
JOB_STATUS_PENDING = 'PENDING'
JOB_STATUS_RUNNING = 'RUNNING'
JOB_STATUS_SUCCEEDED = 'SUCCEEDED'
JOB_STATUS_TOBEKILLED = 'TOBEKILLED'
# statuses for terminated jobs
JOB_STATUSES_TERMINATED = [
JOB_STATUS_DONEWITHERROR,