Fix race between adding job and registering

The Gearman plugin had a race between adding jobs to the functionList
and registering jobs. When registering jobs the functionMap is cleared;
when adding a job the plugin checks whether the job is in the
functionMap before running it. If we happen to trigger registration of
jobs when we get a response from gearman with a job assignment, then the
functionMap can be empty, making us send a work fail instead of running
the job.

To make things worse, this Jenkins worker would not send a subsequent
GRAB_JOB and would livelock, never doing any useful work.

Correct this by making the processing of gearman events synchronous in
the work loop. This ensures that we never try to clear the function map
and check against it at the same time via different threads. To make
this happen, the handleSessionEvent() method puts all events on a
thread-safe queue for synchronous processing. This has allowed us to
simplify the work() loop so that it basically does the following:

  while running:
    init()
    register()
    process one event
    run function if processed
    drive IO

This is much easier to reason about as we essentially only have
bookkeeping and the code for one thing at a time.

Change-Id: Id537710f6c8276a528ad78afd72c5a7c8e8a16ac
This commit is contained in:
Clark Boylan 2015-05-04 18:09:31 -07:00
parent 6de3cdd29b
commit d6ff943ae0

View File

@ -36,7 +36,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.gearman.common.Constants; import org.gearman.common.Constants;
@ -70,7 +70,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
IDLE, RUNNING, SHUTTINGDOWN IDLE, RUNNING, SHUTTINGDOWN
} }
private static final String DESCRIPION_PREFIX = "GearmanWorker"; private static final String DESCRIPION_PREFIX = "GearmanWorker";
private Queue<GearmanFunction> functionList = null; private ConcurrentLinkedQueue<GearmanSessionEvent> eventList = null;
private Selector ioAvailable = null; private Selector ioAvailable = null;
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger( private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
Constants.GEARMAN_WORKER_LOGGER_NAME); Constants.GEARMAN_WORKER_LOGGER_NAME);
@ -78,7 +78,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
private Map<String, FunctionDefinition> functionMap; private Map<String, FunctionDefinition> functionMap;
private State state; private State state;
private ExecutorService executorService; private ExecutorService executorService;
private Map<GearmanJobServerSession, GearmanTask> taskMap = null;
private GearmanJobServerSession session = null; private GearmanJobServerSession session = null;
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory(); private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
private volatile boolean jobUniqueIdRequired = false; private volatile boolean jobUniqueIdRequired = false;
@ -163,6 +162,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
GearmanPacketType.SET_CLIENT_ID, GearmanPacketType.SET_CLIENT_ID,
ByteUtils.toUTF8Bytes(id))); ByteUtils.toUTF8Bytes(id)));
} }
// Reset events so that we don't process events from the old
// connection.
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
// this will cause a grab-job event // this will cause a grab-job event
functionRegistry.setUpdated(true); functionRegistry.setUpdated(true);
} catch (IOException e) { } catch (IOException e) {
@ -183,12 +185,11 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
public MyGearmanWorkerImpl(ExecutorService executorService, public MyGearmanWorkerImpl(ExecutorService executorService,
AvailabilityMonitor availability) { AvailabilityMonitor availability) {
this.availability = availability; this.availability = availability;
functionList = new LinkedList<GearmanFunction>(); eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId(); id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
functionMap = new HashMap<String, FunctionDefinition>(); functionMap = new HashMap<String, FunctionDefinition>();
state = State.IDLE; state = State.IDLE;
this.executorService = executorService; this.executorService = executorService;
taskMap = new HashMap<GearmanJobServerSession, GearmanTask>();
functionRegistry = new FunctionRegistry(); functionRegistry = new FunctionRegistry();
try { try {
@ -252,6 +253,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
factory.getFunctionName()); factory.getFunctionName());
} }
GearmanSessionEvent nextEvent = eventList.peek();
if (nextEvent != null &&
nextEvent.getPacket().getPacketType() != GearmanPacketType.NOOP) {
// Simulate a NOOP packet which will kick off a GRAB_JOB cycle // Simulate a NOOP packet which will kick off a GRAB_JOB cycle
// if we're sleeping. If we get a real NOOP in the mean time, // if we're sleeping. If we get a real NOOP in the mean time,
// it should be fine because GearmanJobServerSession ignores a // it should be fine because GearmanJobServerSession ignores a
@ -261,8 +265,22 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
GearmanSessionEvent event = new GearmanSessionEvent(p, session); GearmanSessionEvent event = new GearmanSessionEvent(p, session);
session.handleSessionEvent(event); session.handleSessionEvent(event);
} }
}
public void enqueueNoopEvent() {
// Simulate a NOOP packet which will kick off a GRAB_JOB cycle.
// This unconditionally enqueues the NOOP which will send a GRAB_JOB
// and should only be used when you know you need to send a GRAB_JOB.
// Cases like worker start, post function run, post failure.
GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES,
GearmanPacketType.NOOP, new byte[0]);
GearmanSessionEvent event = new GearmanSessionEvent(p, session);
enqueueEvent(event);
}
public void work() { public void work() {
GearmanSessionEvent event = null;
GearmanFunction function = null;
LOG.info("---- Worker " + this + " starting work"); LOG.info("---- Worker " + this + " starting work");
if (!state.equals(State.IDLE)) { if (!state.equals(State.IDLE)) {
@ -271,7 +289,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
} }
state = State.RUNNING; state = State.RUNNING;
boolean grabJobSent = false;
while (isRunning()) { while (isRunning()) {
LOG.debug("---- Worker " + this + " top of run loop"); LOG.debug("---- Worker " + this + " top of run loop");
@ -279,12 +296,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (!session.isInitialized()) { if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " run loop reconnect"); LOG.debug("---- Worker " + this + " run loop reconnect");
reconnect(); reconnect();
grabJobSent = false; enqueueNoopEvent();
} // Restart loop to check we connected.
// if still disconnected, skip
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " run loop not initialized");
continue; continue;
} }
@ -298,38 +311,29 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
continue; continue;
} }
if (!isRunning()) continue; if (!isRunning() || !session.isInitialized()) continue;
if (functionList.isEmpty()) { event = eventList.poll();
LOG.debug("---- Worker " + this + " run loop function list is empty while" + function = processSessionEvent(event);
" checking for initial grab job");
if (!grabJobSent) { if (!isRunning() || !session.isInitialized()) continue;
// send the initial GRAB_JOB on reconnection.
LOG.info("---- Worker " + this + " sending initial grab job"); // For the time being we will execute the jobs synchronously
try { // in the future, I expect to change this.
sendGrabJob(session); if (function != null) {
} catch (InterruptedException e) { LOG.info("---- Worker " + this + " executing function");
LOG.warn("---- Worker " + this + submitFunction(function);
" interrupted while waiting for okay to send grab job", e); // Send another grab_job on the next loop
enqueueNoopEvent();
// Skip IO as submitFunction drives the IO for function
// running.
continue; continue;
} }
grabJobSent = true;
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" sending initial grab job", io);
session.closeSession();
continue;
}
}
}
LOG.debug("---- Worker " + this + " run loop finished initial grab job");
if (!isRunning()) continue; if (!isRunning() || !session.isInitialized()) continue;
if (functionList.isEmpty()) { // Run IO, select waiting for ability to read and/or write
LOG.debug("---- Worker " + this + " function list empty; selecting"); // then read and/or write.
int interestOps = SelectionKey.OP_READ; int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) { if (session.sessionHasDataToWrite()) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
@ -344,9 +348,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
session.closeSession(); session.closeSession();
continue; continue;
} }
}
LOG.debug("---- Worker " + this + " run loop finished selecting");
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop"); LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) { if (!session.isInitialized()) {
@ -363,19 +365,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
} }
} }
LOG.debug("---- Worker " + this + " run loop finished driving session io"); LOG.debug("---- Worker " + this + " run loop finished driving session io");
if (!isRunning()) continue;
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
if (!functionList.isEmpty()) {
LOG.info("---- Worker " + this + " executing function");
GearmanFunction fun = functionList.remove();
submitFunction(fun);
// Send another grab_job on the next loop
grabJobSent = false;
}
LOG.debug("---- Worker " + this + " bottom of run loop");
} }
shutDownWorker(true); shutDownWorker(true);
@ -391,12 +380,24 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
new GrabJobEventHandler(s), new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ, new GearmanPacketImpl(GearmanPacketMagic.REQ,
getGrabJobPacketType(), new byte[0])); getGrabJobPacketType(), new byte[0]));
taskMap.put(s, grabJobTask);
s.submitTask(grabJobTask); s.submitTask(grabJobTask);
} }
public void handleSessionEvent(GearmanSessionEvent event) public void handleSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException { throws IllegalArgumentException, IllegalStateException {
enqueueEvent(event);
}
public void enqueueEvent(GearmanSessionEvent event) {
// Enqueue in a thread safe manner. Events will
// be pulled off and processed serially in this workers
// main thread.
eventList.add(event);
}
private GearmanFunction processSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException {
if (event != null) {
GearmanPacket p = event.getPacket(); GearmanPacket p = event.getPacket();
GearmanJobServerSession s = event.getSession(); GearmanJobServerSession s = event.getSession();
GearmanPacketType t = p.getPacketType(); GearmanPacketType t = p.getPacketType();
@ -406,17 +407,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
case JOB_ASSIGN: case JOB_ASSIGN:
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
LOG.info("---- Worker " + this + " received job assignment"); LOG.info("---- Worker " + this + " received job assignment");
taskMap.remove(s); return addNewJob(event);
addNewJob(event);
break;
case JOB_ASSIGN_UNIQ: case JOB_ASSIGN_UNIQ:
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
LOG.info("---- Worker " + this + " received unique job assignment"); LOG.info("---- Worker " + this + " received unique job assignment");
taskMap.remove(s); return addNewJob(event);
addNewJob(event);
break;
case NOOP: case NOOP:
taskMap.remove(s);
LOG.debug("---- Worker " + this + " sending grab job after wakeup"); LOG.debug("---- Worker " + this + " sending grab job after wakeup");
try { try {
sendGrabJob(s); sendGrabJob(s);
@ -433,7 +429,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ, new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.PRE_SLEEP, new byte[0])); GearmanPacketType.PRE_SLEEP, new byte[0]));
taskMap.put(s, preSleepTask);
s.submitTask(preSleepTask); s.submitTask(preSleepTask);
break; break;
case ECHO_RES: case ECHO_RES:
@ -449,6 +444,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
s.closeSession(); s.closeSession();
} }
} }
return null;
}
public boolean addServer(String host, int port) { public boolean addServer(String host, int port) {
return addServer(connFactory.createConnection(host, port)); return addServer(connFactory.createConnection(host, port));
@ -554,7 +551,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return exceptions; return exceptions;
} }
private void addNewJob(GearmanSessionEvent event) { private GearmanFunction addNewJob(GearmanSessionEvent event) {
byte[] handle, data, functionNameBytes, unique; byte[] handle, data, functionNameBytes, unique;
GearmanPacket p = event.getPacket(); GearmanPacket p = event.getPacket();
String functionName; String functionName;
@ -572,6 +569,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
new GearmanPacketImpl(GearmanPacketMagic.REQ, new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.WORK_FAIL, handle)); GearmanPacketType.WORK_FAIL, handle));
session.submitTask(gsr); session.submitTask(gsr);
enqueueNoopEvent();
} else { } else {
GearmanFunction function = def.getFactory().getFunction(); GearmanFunction function = def.getFactory().getFunction();
function.setData(data); function.setData(data);
@ -580,8 +578,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (unique != null && unique.length > 0) { if (unique != null && unique.length > 0) {
function.setUniqueId(unique); function.setUniqueId(unique);
} }
functionList.add(function); return function;
} }
return null;
} }
private void submitFunction(GearmanFunction fun) { private void submitFunction(GearmanFunction fun) {