Fix a cause of livelock in worker work() method

There were cases where the gearman plugin would submit gearman tasks in
response to some event without handling the IO driving that task. If
this happened it was possible for the state machine to go sideways and
no longer send new get job requests.

Fix this by driving IO as long as the functions to run list is empty and
we have any outstanding submitted tasks.

Change-Id: I2a1a4cf76dab5e1dc3ad5109aa0290a2ecc97fd9
This commit is contained in:
Clark Boylan 2015-04-29 11:48:04 -07:00
parent 6de3cdd29b
commit 0ae453fa58
1 changed files with 39 additions and 20 deletions

View File

@ -150,6 +150,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
this.updated = updated;
}
public boolean getUpdated() {
return updated;
}
}
public void reconnect() {
@ -273,6 +276,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
state = State.RUNNING;
boolean grabJobSent = false;
running:
while (isRunning()) {
LOG.debug("---- Worker " + this + " top of run loop");
@ -328,7 +332,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (!isRunning()) continue;
if (functionList.isEmpty()) {
// Loop as long as we don't have function to run and there are
// tasks in flight.
while (functionList.isEmpty() && taskMap.get(session) != null) {
LOG.debug("---- Worker " + this + " function list empty; selecting");
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
@ -337,34 +343,46 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
session.getSelectionKey().interestOps(interestOps);
try {
ioAvailable.select();
// Wait up to 2 seconds before rechecking the functionList
ioAvailable.select(2000);
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" selecting for IO", io);
session.closeSession();
continue;
continue running;
}
}
LOG.debug("---- Worker " + this + " run loop finished selecting");
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " session is no longer initialized");
continue;
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " session is no longer initialized");
continue running;
}
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " received IOException while driving" +
" IO on session " + session, io);
session.closeSession();
continue running;
}
}
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " received IOException while driving" +
" IO on session " + session, io);
session.closeSession();
continue;
GearmanTask currentTask = taskMap.get(session);
GearmanPacket packet = currentTask.getRequestPacket();
if (packet.getPacketType() == GearmanPacketType.WORK_FAIL) {
// Ensure we send a grab job after a failed attempt.
grabJobSent = false;
continue running;
}
// If we are have attempted to sleep but need to update
// functions start work again to register.
else if (functionRegistry.getUpdated() &&
packet.getPacketType() == GearmanPacketType.PRE_SLEEP) {
continue running;
}
else if (!isRunning()) continue running;
}
LOG.debug("---- Worker " + this + " run loop finished driving session io");
if (!isRunning()) continue;
LOG.debug("---- Worker " + this + " run loop found at least one function");
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
@ -571,6 +589,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
GearmanTask gsr = new GearmanTask(
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.WORK_FAIL, handle));
taskMap.put(session, gsr);
session.submitTask(gsr);
} else {
GearmanFunction function = def.getFactory().getFunction();