Report exceptions while running the job to the client.
Don't catch any exceptions while running the job; instead, report them back to the client (via a catch-all exception handler in StartJobWorker). If the worker raises an exception, unlock the node monitor, in case the worker didn't get to the point where it would be unlocked. This change has the side effect that if the gearman server disconnects while the job is running, the worker should return from watching the job run (as soon as it notices, currently up to 5 seconds). This is helpful in that it will be available to register with gearman again, including sending CAN_DO packets. But the node monitor will still prevent it from scheduling a new job while the one it started earlier is still running. Change-Id: Ie01ef0f9e706d81452b189099e36242ab9967950
This commit is contained in:
parent
6041401766
commit
4556818799
|
@ -563,8 +563,16 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
|||
} else {
|
||||
executorService.submit(fun);
|
||||
}
|
||||
} catch (IOException io) {
|
||||
LOG.warn("Receieved IOException while" +
|
||||
" running function",io);
|
||||
session.closeSession();
|
||||
// The reconnect will unlock the monitor if needed.
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception while executing function " + fun.getName(), e);
|
||||
// Unlock the monitor for this worker in case we didn't
|
||||
// make it as far as the schedule job unlock.
|
||||
availability.unlock(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -105,22 +105,30 @@ public class StartJobWorker extends AbstractGearmanFunction {
|
|||
return gson.toJson(data);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* The Gearman Function
|
||||
* @see org.gearman.worker.AbstractGearmanFunction#executeFunction()
|
||||
*/
|
||||
@Override
|
||||
public GearmanJobResult executeFunction() {
|
||||
try {
|
||||
return safeExecuteFunction();
|
||||
} catch (Exception inner) {
|
||||
RuntimeException outer = new RuntimeException(inner);
|
||||
throw outer;
|
||||
}
|
||||
}
|
||||
|
||||
private GearmanJobResult safeExecuteFunction()
|
||||
throws Exception
|
||||
{
|
||||
logger.info("---- Running executeFunction in " + name + " ----");
|
||||
|
||||
// decode the uniqueId from the client
|
||||
String decodedUniqueId = null;
|
||||
if (this.uniqueId != null) {
|
||||
try {
|
||||
decodedUniqueId = new String(this.uniqueId, "UTF-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
decodedUniqueId = new String(this.uniqueId, "UTF-8");
|
||||
}
|
||||
|
||||
// create new parameter objects to pass to jenkins build
|
||||
|
@ -128,11 +136,7 @@ public class StartJobWorker extends AbstractGearmanFunction {
|
|||
String decodedData = null;
|
||||
if (this.data != null) {
|
||||
// decode the data from the client
|
||||
try {
|
||||
decodedData = new String((byte[]) this.data, "UTF-8");
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
decodedData = new String((byte[]) this.data, "UTF-8");
|
||||
// convert parameters passed in from client to hash map
|
||||
Gson gson = new Gson();
|
||||
Map<String, String> inParams = gson.fromJson(decodedData,
|
||||
|
@ -184,89 +188,56 @@ public class StartJobWorker extends AbstractGearmanFunction {
|
|||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// wait for start of build
|
||||
Queue.Executable exec = future.getStartCondition().get();
|
||||
AbstractBuild<?, ?> currBuild = (AbstractBuild<?, ?>) exec;
|
||||
// wait for start of build
|
||||
Queue.Executable exec = future.getStartCondition().get();
|
||||
AbstractBuild<?, ?> currBuild = (AbstractBuild<?, ?>) exec;
|
||||
|
||||
availability.unlock(worker);
|
||||
availability.unlock(worker);
|
||||
|
||||
long now = new Date().getTime();
|
||||
int duration = (int) (now - currBuild.getStartTimeInMillis());
|
||||
int estimatedDuration = (int) currBuild.getEstimatedDuration();
|
||||
jobData = buildStatusData(currBuild);
|
||||
long now = new Date().getTime();
|
||||
int duration = (int) (now - currBuild.getStartTimeInMillis());
|
||||
int estimatedDuration = (int) currBuild.getEstimatedDuration();
|
||||
jobData = buildStatusData(currBuild);
|
||||
|
||||
// If we found a session object in the hacky bit above,
|
||||
// use it to send a WORK_STATUS packet indicating the
|
||||
// start of the build.
|
||||
if (sess != null) {
|
||||
try {
|
||||
sendData(jobData.getBytes());
|
||||
sess.driveSessionIO();
|
||||
sendData(jobData.getBytes());
|
||||
sess.driveSessionIO();
|
||||
sendStatus(estimatedDuration, duration);
|
||||
sess.driveSessionIO();
|
||||
|
||||
while (!future.isDone()) {
|
||||
// wait for jenkins build to complete
|
||||
try {
|
||||
future.get(10, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
now = new Date().getTime();
|
||||
duration = (int) (now - currBuild.getStartTimeInMillis());
|
||||
estimatedDuration = (int) currBuild.getEstimatedDuration();
|
||||
if (sess != null) {
|
||||
sendStatus(estimatedDuration, duration);
|
||||
sess.driveSessionIO();
|
||||
} catch (IOException e) {
|
||||
sess = null;
|
||||
logger.warn("IO Exception when driving session IO");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while (!future.isDone()) {
|
||||
// wait for jenkins build to complete
|
||||
try {
|
||||
future.get(10, TimeUnit.SECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
now = new Date().getTime();
|
||||
duration = (int) (now - currBuild.getStartTimeInMillis());
|
||||
estimatedDuration = (int) currBuild.getEstimatedDuration();
|
||||
if (sess != null) {
|
||||
try {
|
||||
sendStatus(estimatedDuration, duration);
|
||||
sess.driveSessionIO();
|
||||
} catch (IOException e2) {
|
||||
sess = null;
|
||||
logger.warn("IO Exception when driving session IO");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
exec = future.get();
|
||||
|
||||
exec = future.get();
|
||||
jobData = buildStatusData(currBuild);
|
||||
if (sess != null) {
|
||||
sendData(jobData.getBytes());
|
||||
sess.driveSessionIO();
|
||||
}
|
||||
|
||||
jobData = buildStatusData(currBuild);
|
||||
if (sess != null) {
|
||||
try {
|
||||
sendData(jobData.getBytes());
|
||||
sess.driveSessionIO();
|
||||
} catch (IOException e) {
|
||||
sess = null;
|
||||
logger.warn("IO Exception when driving session IO");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
// check Jenkins build results
|
||||
Result result = currBuild.getResult();
|
||||
if (result == Result.SUCCESS) {
|
||||
jobResult = true;
|
||||
} else {
|
||||
jobResult = false;
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
availability.unlock(worker);
|
||||
jobFailureMsg = e.getMessage();
|
||||
jobResult = false;
|
||||
} catch (ExecutionException e) {
|
||||
availability.unlock(worker);
|
||||
jobFailureMsg = e.getMessage();
|
||||
// check Jenkins build results
|
||||
Result result = currBuild.getResult();
|
||||
if (result == Result.SUCCESS) {
|
||||
jobResult = true;
|
||||
} else {
|
||||
jobResult = false;
|
||||
}
|
||||
|
||||
// return result to client
|
||||
GearmanJobResult gjr = new GearmanJobResultImpl(this.jobHandle, jobResult,
|
||||
GearmanJobResult gjr = new GearmanJobResultImpl(
|
||||
this.jobHandle, jobResult,
|
||||
"".getBytes(), "".getBytes(),
|
||||
jobFailureMsg.getBytes(), 0, 0);
|
||||
return gjr;
|
||||
|
|
Loading…
Reference in New Issue