Fix race between adding job and registering

Gearman plugin had a race between adding jobs to the functionList and
registering jobs. When registering jobs the functionMap is cleared, when
adding a job the plugin checks if the job is in the function Map before
running it. If we happen to trigger registration of jobs when we get a
response from gearman with a job assignment then the functionMap can be
empty making us send a work fail instead of running the job.

To make things worse this jenkins worker would not send a subsequent
GET JOB and would live lock never doing any useful work.

Correct this by making the processing for gearman events synchronous in
the work loop. This ensures that we never try to clear the function map
and check against it at the same time via different threads. To make
this happen the handleSessionEvent() method puts all events on a thread
safe queue for synchronous processing. This has allowed us to simplify
the work() loop and basically do the following:

  while running:
    init()
    register()
    process one event
    run function if processed
    drive IO

This is much easier to reason about as we essentially only have
bookkeeping and the code for one thing at a time.

Change-Id: Id537710f6c8276a528ad78afd72c5a7c8e8a16ac
This commit is contained in:
Clark Boylan 2015-05-04 18:09:31 -07:00
parent 6de3cdd29b
commit 4f2f53f38a

View File

@ -36,8 +36,8 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.gearman.common.Constants;
import org.gearman.common.GearmanException;
@ -70,7 +70,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
IDLE, RUNNING, SHUTTINGDOWN
}
private static final String DESCRIPION_PREFIX = "GearmanWorker";
private Queue<GearmanFunction> functionList = null;
private ConcurrentLinkedQueue<GearmanSessionEvent> eventList = null;
private Selector ioAvailable = null;
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
Constants.GEARMAN_WORKER_LOGGER_NAME);
@ -78,7 +78,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
private Map<String, FunctionDefinition> functionMap;
private State state;
private ExecutorService executorService;
private Map<GearmanJobServerSession, GearmanTask> taskMap = null;
private GearmanJobServerSession session = null;
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
private volatile boolean jobUniqueIdRequired = false;
@ -183,12 +182,11 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
public MyGearmanWorkerImpl(ExecutorService executorService,
AvailabilityMonitor availability) {
this.availability = availability;
functionList = new LinkedList<GearmanFunction>();
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
functionMap = new HashMap<String, FunctionDefinition>();
state = State.IDLE;
this.executorService = executorService;
taskMap = new HashMap<GearmanJobServerSession, GearmanTask>();
functionRegistry = new FunctionRegistry();
try {
@ -251,18 +249,23 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
}
enqueueNoopEvent();
}
public void enqueueNoopEvent() {
// Simulate a NOOP packet which will kick off a GRAB_JOB cycle
// if we're sleeping. If we get a real NOOP in the mean time,
// it should be fine because GearmanJobServerSession ignores a
// NOOP if PRE_SLEEP is not on the stack.
// If we get a real NOOP in the mean time, it should be fine
// because GearmanJobServerSession ignores a NOOP if PRE_SLEEP
// is not on the stack.
GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES,
GearmanPacketType.NOOP, new byte[0]);
GearmanSessionEvent event = new GearmanSessionEvent(p, session);
session.handleSessionEvent(event);
enqueueEvent(event);
}
public void work() {
GearmanSessionEvent event = null;
GearmanFunction function = null;
LOG.info("---- Worker " + this + " starting work");
if (!state.equals(State.IDLE)) {
@ -271,7 +274,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
state = State.RUNNING;
boolean grabJobSent = false;
while (isRunning()) {
LOG.debug("---- Worker " + this + " top of run loop");
@ -279,12 +281,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " run loop reconnect");
reconnect();
grabJobSent = false;
}
// if still disconnected, skip
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " run loop not initialized");
enqueueNoopEvent();
// Restart loop to check we connected.
continue;
}
@ -298,55 +296,41 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
continue;
}
if (!isRunning()) continue;
if (!isRunning() || !session.isInitialized()) continue;
if (functionList.isEmpty()) {
LOG.debug("---- Worker " + this + " run loop function list is empty while" +
" checking for initial grab job");
if (!grabJobSent) {
// send the initial GRAB_JOB on reconnection.
LOG.info("---- Worker " + this + " sending initial grab job");
try {
sendGrabJob(session);
} catch (InterruptedException e) {
LOG.warn("---- Worker " + this +
" interrupted while waiting for okay to send grab job", e);
continue;
}
grabJobSent = true;
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" sending initial grab job", io);
session.closeSession();
continue;
}
}
}
LOG.debug("---- Worker " + this + " run loop finished initial grab job");
event = eventList.poll();
function = processSessionEvent(event);
if (!isRunning()) continue;
if (!isRunning() || !session.isInitialized()) continue;
if (functionList.isEmpty()) {
LOG.debug("---- Worker " + this + " function list empty; selecting");
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
interestOps |= SelectionKey.OP_WRITE;
}
session.getSelectionKey().interestOps(interestOps);
try {
ioAvailable.select();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" selecting for IO", io);
session.closeSession();
continue;
}
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
if (function != null) {
LOG.info("---- Worker " + this + " executing function");
submitFunction(function);
// Send another grab_job on the next loop
enqueueNoopEvent();
}
if (!isRunning() || !session.isInitialized()) continue;
// Run IO, select waiting for ability to read and/or write
// then read and/or write.
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
interestOps |= SelectionKey.OP_WRITE;
}
session.getSelectionKey().interestOps(interestOps);
try {
ioAvailable.select();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" selecting for IO", io);
session.closeSession();
continue;
}
LOG.debug("---- Worker " + this + " run loop finished selecting");
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) {
@ -363,19 +347,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
}
LOG.debug("---- Worker " + this + " run loop finished driving session io");
if (!isRunning()) continue;
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
if (!functionList.isEmpty()) {
LOG.info("---- Worker " + this + " executing function");
GearmanFunction fun = functionList.remove();
submitFunction(fun);
// Send another grab_job on the next loop
grabJobSent = false;
}
LOG.debug("---- Worker " + this + " bottom of run loop");
}
shutDownWorker(true);
@ -391,63 +362,71 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
getGrabJobPacketType(), new byte[0]));
taskMap.put(s, grabJobTask);
s.submitTask(grabJobTask);
}
public void handleSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException {
GearmanPacket p = event.getPacket();
GearmanJobServerSession s = event.getSession();
GearmanPacketType t = p.getPacketType();
LOG.debug("---- Worker " + this + " handling session event" +
" ( Session = " + s + " Event = " + t + " )");
switch (t) {
case JOB_ASSIGN:
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
LOG.info("---- Worker " + this + " received job assignment");
taskMap.remove(s);
addNewJob(event);
break;
case JOB_ASSIGN_UNIQ:
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
LOG.info("---- Worker " + this + " received unique job assignment");
taskMap.remove(s);
addNewJob(event);
break;
case NOOP:
taskMap.remove(s);
LOG.debug("---- Worker " + this + " sending grab job after wakeup");
try {
sendGrabJob(s);
} catch (InterruptedException e) {
LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " +
"grab job", e);
}
break;
case NO_JOB:
// We didn't get a job, so allow other workers or
// Jenkins to schedule on this node.
availability.unlock(this);
LOG.debug("---- Worker " + this + " sending pre sleep after no_job");
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.PRE_SLEEP, new byte[0]));
taskMap.put(s, preSleepTask);
s.submitTask(preSleepTask);
break;
case ECHO_RES:
break;
case OPTION_RES:
break;
case ERROR:
s.closeSession();
break;
default:
LOG.warn("---- Worker " + this + " received unknown packet type " + t +
" from session " + s + "; closing connection");
s.closeSession();
enqueueEvent(event);
}
public void enqueueEvent(GearmanSessionEvent event) {
// Enqueue in a thread safe manner. Events will
// be pulled off and processed serially in this workers
// main thread.
eventList.add(event);
}
private GearmanFunction processSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException {
if (event != null) {
GearmanPacket p = event.getPacket();
GearmanJobServerSession s = event.getSession();
GearmanPacketType t = p.getPacketType();
LOG.debug("---- Worker " + this + " handling session event" +
" ( Session = " + s + " Event = " + t + " )");
switch (t) {
case JOB_ASSIGN:
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
LOG.info("---- Worker " + this + " received job assignment");
return addNewJob(event);
case JOB_ASSIGN_UNIQ:
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
LOG.info("---- Worker " + this + " received unique job assignment");
return addNewJob(event);
case NOOP:
LOG.debug("---- Worker " + this + " sending grab job after wakeup");
try {
sendGrabJob(s);
} catch (InterruptedException e) {
LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " +
"grab job", e);
}
break;
case NO_JOB:
// We didn't get a job, so allow other workers or
// Jenkins to schedule on this node.
availability.unlock(this);
LOG.debug("---- Worker " + this + " sending pre sleep after no_job");
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.PRE_SLEEP, new byte[0]));
s.submitTask(preSleepTask);
break;
case ECHO_RES:
break;
case OPTION_RES:
break;
case ERROR:
s.closeSession();
break;
default:
LOG.warn("---- Worker " + this + " received unknown packet type " + t +
" from session " + s + "; closing connection");
s.closeSession();
}
}
return null;
}
public boolean addServer(String host, int port) {
@ -554,7 +533,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return exceptions;
}
private void addNewJob(GearmanSessionEvent event) {
private GearmanFunction addNewJob(GearmanSessionEvent event) {
byte[] handle, data, functionNameBytes, unique;
GearmanPacket p = event.getPacket();
String functionName;
@ -572,6 +551,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.WORK_FAIL, handle));
session.submitTask(gsr);
enqueueNoopEvent();
} else {
GearmanFunction function = def.getFactory().getFunction();
function.setData(data);
@ -580,8 +560,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (unique != null && unique.length > 0) {
function.setUniqueId(unique);
}
functionList.add(function);
return function;
}
return null;
}
private void submitFunction(GearmanFunction fun) {