Fix a cause of livelock in worker work() method

There were cases where the gearman plugin would submit gearman tasks in
response to some event without handling the IO driving that task. If
this happened it was possible for the state machine to go sideways and
no longer send new get job requests.

Fix this by driving IO as long as the functions to run list is empty and
we have any outstanding submitted tasks.

Logs from jenkins slave that failed to complete its grab job request
when a job update interrupted it.
Apr 28, 2015 9:41:29 PM hudson.slaves.SlaveComputer tryReconnect
INFO: Attempting to reconnect bare-trusty-hpcloud-b2-2353037
--
Apr 28, 2015 9:41:38 PM hudson.plugins.gearman.AbstractWorkerThread run
INFO: ---- Starting Worker bare-trusty-hpcloud-b2-2353037_exec-0 (Tue Apr 28 21:41:38 UTC 2015)
--
Apr 28, 2015 9:41:38 PM hudson.plugins.gearman.MyGearmanWorkerImpl setFunctions
INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 registering 7420 functions
Apr 28, 2015 9:41:38 PM hudson.plugins.gearman.MyGearmanWorkerImpl work
INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 starting work
--
Apr 28, 2015 9:43:52 PM hudson.plugins.gearman.MyGearmanWorkerImpl work
INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 sending initial grab job
--
Apr 28, 2015 9:44:01 PM hudson.plugins.gearman.MyGearmanWorkerImpl setFunctions
INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 registering 7422 functions
--
Apr 28, 2015 9:44:37 PM hudson.plugins.gearman.MyGearmanWorkerImpl handleSessionEvent
INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 received unique job assignment
--
Apr 29, 2015 11:48:58 AM hudson.plugins.gearman.MyGearmanWorkerImpl setFunctions
INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 registering 7424 functions

Change-Id: I2a1a4cf76dab5e1dc3ad5109aa0290a2ecc97fd9
This commit is contained in:
Clark Boylan 2015-04-29 11:48:04 -07:00
parent 6de3cdd29b
commit dc60cad769

View File

@ -150,6 +150,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
this.updated = updated;
}
public boolean getUpdated() {
return updated;
}
}
public void reconnect() {
@ -273,6 +276,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
state = State.RUNNING;
boolean grabJobSent = false;
running:
while (isRunning()) {
LOG.debug("---- Worker " + this + " top of run loop");
@ -328,7 +332,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (!isRunning()) continue;
if (functionList.isEmpty()) {
// Loop as long as we don't have function to run and there are
// tasks in flight.
while (functionList.isEmpty() && taskMap.get(session) != null) {
LOG.debug("---- Worker " + this + " function list empty; selecting");
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
@ -337,34 +343,46 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
session.getSelectionKey().interestOps(interestOps);
try {
ioAvailable.select();
// Wait up to 2 seconds before rechecking the functionList
ioAvailable.select(2000);
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" selecting for IO", io);
session.closeSession();
continue;
continue running;
}
}
LOG.debug("---- Worker " + this + " run loop finished selecting");
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " session is no longer initialized");
continue;
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " session is no longer initialized");
continue running;
}
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " received IOException while driving" +
" IO on session " + session, io);
session.closeSession();
continue running;
}
}
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " received IOException while driving" +
" IO on session " + session, io);
session.closeSession();
continue;
GearmanTask currentTask = taskMap.get(session);
GearmanPacket packet = currentTask.getRequestPacket();
if (packet.getPacketType() == GearmanPacketType.WORK_FAIL) {
// Ensure we send a grab job after a failed attempt.
grabJobSent = false;
continue running;
}
// If we are have attempted to sleep but need to update
// functions start work again to register.
else if (functionRegistry.getUpdated() &&
packet.getPacketType() == GearmanPacketType.PRE_SLEEP) {
continue running;
}
else if (!isRunning()) continue running;
}
LOG.debug("---- Worker " + this + " run loop finished driving session io");
if (!isRunning()) continue;
LOG.debug("---- Worker " + this + " run loop found at least one function");
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
@ -571,6 +589,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
GearmanTask gsr = new GearmanTask(
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.WORK_FAIL, handle));
taskMap.put(session, gsr);
session.submitTask(gsr);
} else {
GearmanFunction function = def.getFactory().getFunction();