Merge "Fix race between adding job and registering"

This commit is contained in:
Jenkins 2015-05-06 20:08:59 +00:00 committed by Gerrit Code Review
commit 0314ab1ea2
1 changed files with 129 additions and 127 deletions

View File

@ -36,7 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.gearman.common.Constants;
@ -70,7 +70,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
IDLE, RUNNING, SHUTTINGDOWN
}
private static final String DESCRIPION_PREFIX = "GearmanWorker";
private Queue<GearmanFunction> functionList = null;
private ConcurrentLinkedQueue<GearmanSessionEvent> eventList = null;
private Selector ioAvailable = null;
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
Constants.GEARMAN_WORKER_LOGGER_NAME);
@ -78,7 +78,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
private Map<String, FunctionDefinition> functionMap;
private State state;
private ExecutorService executorService;
private Map<GearmanJobServerSession, GearmanTask> taskMap = null;
private GearmanJobServerSession session = null;
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
private volatile boolean jobUniqueIdRequired = false;
@ -163,6 +162,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
GearmanPacketType.SET_CLIENT_ID,
ByteUtils.toUTF8Bytes(id)));
}
// Reset events so that we don't process events from the old
// connection.
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
// this will cause a grab-job event
functionRegistry.setUpdated(true);
} catch (IOException e) {
@ -183,12 +185,11 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
public MyGearmanWorkerImpl(ExecutorService executorService,
AvailabilityMonitor availability) {
this.availability = availability;
functionList = new LinkedList<GearmanFunction>();
eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
functionMap = new HashMap<String, FunctionDefinition>();
state = State.IDLE;
this.executorService = executorService;
taskMap = new HashMap<GearmanJobServerSession, GearmanTask>();
functionRegistry = new FunctionRegistry();
try {
@ -252,17 +253,34 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
factory.getFunctionName());
}
// Simulate a NOOP packet which will kick off a GRAB_JOB cycle
// if we're sleeping. If we get a real NOOP in the mean time,
// it should be fine because GearmanJobServerSession ignores a
// NOOP if PRE_SLEEP is not on the stack.
GearmanSessionEvent nextEvent = eventList.peek();
if (nextEvent == null ||
nextEvent.getPacket().getPacketType() != GearmanPacketType.NOOP) {
// Simulate a NOOP packet which will kick off a GRAB_JOB cycle
// if we're sleeping. If we get a real NOOP in the mean time,
// it should be fine because GearmanJobServerSession ignores a
// NOOP if PRE_SLEEP is not on the stack.
GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES,
GearmanPacketType.NOOP, new byte[0]);
GearmanSessionEvent event = new GearmanSessionEvent(p, session);
session.handleSessionEvent(event);
}
}
public void enqueueNoopEvent() {
// Simulate a NOOP packet which will kick off a GRAB_JOB cycle.
// This unconditionally enqueues the NOOP which will send a GRAB_JOB
// and should only be used when you know you need to send a GRAB_JOB.
// Cases like worker start, post function run, post failure.
GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES,
GearmanPacketType.NOOP, new byte[0]);
GearmanSessionEvent event = new GearmanSessionEvent(p, session);
session.handleSessionEvent(event);
enqueueEvent(event);
}
public void work() {
GearmanSessionEvent event = null;
GearmanFunction function = null;
LOG.info("---- Worker " + this + " starting work");
if (!state.equals(State.IDLE)) {
@ -271,7 +289,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
state = State.RUNNING;
boolean grabJobSent = false;
// When we first start working we will already be initialized so must
// enqueue a Noop event to trigger GRAB_JOB here.
enqueueNoopEvent();
while (isRunning()) {
LOG.debug("---- Worker " + this + " top of run loop");
@ -279,12 +299,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " run loop reconnect");
reconnect();
grabJobSent = false;
}
// if still disconnected, skip
if (!session.isInitialized()) {
LOG.debug("---- Worker " + this + " run loop not initialized");
enqueueNoopEvent();
// Restart loop to check we connected.
continue;
}
@ -298,55 +314,44 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
continue;
}
if (!isRunning()) continue;
if (!isRunning() || !session.isInitialized()) continue;
if (functionList.isEmpty()) {
LOG.debug("---- Worker " + this + " run loop function list is empty while" +
" checking for initial grab job");
if (!grabJobSent) {
// send the initial GRAB_JOB on reconnection.
LOG.info("---- Worker " + this + " sending initial grab job");
try {
sendGrabJob(session);
} catch (InterruptedException e) {
LOG.warn("---- Worker " + this +
" interrupted while waiting for okay to send grab job", e);
continue;
}
grabJobSent = true;
try {
session.driveSessionIO();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" sending initial grab job", io);
session.closeSession();
continue;
}
}
}
LOG.debug("---- Worker " + this + " run loop finished initial grab job");
event = eventList.poll();
function = processSessionEvent(event);
if (!isRunning()) continue;
if (!isRunning() || !session.isInitialized()) continue;
if (functionList.isEmpty()) {
LOG.debug("---- Worker " + this + " function list empty; selecting");
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
interestOps |= SelectionKey.OP_WRITE;
}
session.getSelectionKey().interestOps(interestOps);
try {
ioAvailable.select();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" selecting for IO", io);
session.closeSession();
continue;
}
// For the time being we will execute the jobs synchronously
// in the future, I expect to change this.
if (function != null) {
LOG.info("---- Worker " + this + " executing function");
submitFunction(function);
// Send another grab_job on the next loop
enqueueNoopEvent();
// Skip IO as submitFunction drives the IO for function
// running.
continue;
}
if (!isRunning() || !session.isInitialized()) continue;
// Run IO, select waiting for ability to read and/or write
// then read and/or write.
int interestOps = SelectionKey.OP_READ;
if (session.sessionHasDataToWrite()) {
interestOps |= SelectionKey.OP_WRITE;
}
session.getSelectionKey().interestOps(interestOps);
try {
ioAvailable.select();
} catch (IOException io) {
LOG.warn("---- Worker " + this + " receieved IOException while" +
" selecting for IO", io);
session.closeSession();
continue;
}
LOG.debug("---- Worker " + this + " run loop finished selecting");
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
LOG.debug("---- Worker " + this + " received input in run loop");
if (!session.isInitialized()) {
@ -363,19 +368,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
}
}
LOG.debug("---- Worker " + this + " run loop finished driving session io");
if (!isRunning()) continue;
//For the time being we will execute the jobs synchronously
//in the future, I expect to change this.
if (!functionList.isEmpty()) {
LOG.info("---- Worker " + this + " executing function");
GearmanFunction fun = functionList.remove();
submitFunction(fun);
// Send another grab_job on the next loop
grabJobSent = false;
}
LOG.debug("---- Worker " + this + " bottom of run loop");
}
shutDownWorker(true);
@ -391,63 +383,71 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
getGrabJobPacketType(), new byte[0]));
taskMap.put(s, grabJobTask);
s.submitTask(grabJobTask);
}
public void handleSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException {
GearmanPacket p = event.getPacket();
GearmanJobServerSession s = event.getSession();
GearmanPacketType t = p.getPacketType();
LOG.debug("---- Worker " + this + " handling session event" +
" ( Session = " + s + " Event = " + t + " )");
switch (t) {
case JOB_ASSIGN:
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
LOG.info("---- Worker " + this + " received job assignment");
taskMap.remove(s);
addNewJob(event);
break;
case JOB_ASSIGN_UNIQ:
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
LOG.info("---- Worker " + this + " received unique job assignment");
taskMap.remove(s);
addNewJob(event);
break;
case NOOP:
taskMap.remove(s);
LOG.debug("---- Worker " + this + " sending grab job after wakeup");
try {
sendGrabJob(s);
} catch (InterruptedException e) {
LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " +
"grab job", e);
}
break;
case NO_JOB:
// We didn't get a job, so allow other workers or
// Jenkins to schedule on this node.
availability.unlock(this);
LOG.debug("---- Worker " + this + " sending pre sleep after no_job");
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.PRE_SLEEP, new byte[0]));
taskMap.put(s, preSleepTask);
s.submitTask(preSleepTask);
break;
case ECHO_RES:
break;
case OPTION_RES:
break;
case ERROR:
s.closeSession();
break;
default:
LOG.warn("---- Worker " + this + " received unknown packet type " + t +
" from session " + s + "; closing connection");
s.closeSession();
enqueueEvent(event);
}
public void enqueueEvent(GearmanSessionEvent event) {
// Enqueue in a thread safe manner. Events will
// be pulled off and processed serially in this workers
// main thread.
eventList.add(event);
}
private GearmanFunction processSessionEvent(GearmanSessionEvent event)
throws IllegalArgumentException, IllegalStateException {
if (event != null) {
GearmanPacket p = event.getPacket();
GearmanJobServerSession s = event.getSession();
GearmanPacketType t = p.getPacketType();
LOG.debug("---- Worker " + this + " handling session event" +
" ( Session = " + s + " Event = " + t + " )");
switch (t) {
case JOB_ASSIGN:
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
LOG.info("---- Worker " + this + " received job assignment");
return addNewJob(event);
case JOB_ASSIGN_UNIQ:
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
LOG.info("---- Worker " + this + " received unique job assignment");
return addNewJob(event);
case NOOP:
LOG.debug("---- Worker " + this + " sending grab job after wakeup");
try {
sendGrabJob(s);
} catch (InterruptedException e) {
LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " +
"grab job", e);
}
break;
case NO_JOB:
// We didn't get a job, so allow other workers or
// Jenkins to schedule on this node.
availability.unlock(this);
LOG.debug("---- Worker " + this + " sending pre sleep after no_job");
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.PRE_SLEEP, new byte[0]));
s.submitTask(preSleepTask);
break;
case ECHO_RES:
break;
case OPTION_RES:
break;
case ERROR:
s.closeSession();
break;
default:
LOG.warn("---- Worker " + this + " received unknown packet type " + t +
" from session " + s + "; closing connection");
s.closeSession();
}
}
return null;
}
public boolean addServer(String host, int port) {
@ -554,7 +554,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return exceptions;
}
private void addNewJob(GearmanSessionEvent event) {
private GearmanFunction addNewJob(GearmanSessionEvent event) {
byte[] handle, data, functionNameBytes, unique;
GearmanPacket p = event.getPacket();
String functionName;
@ -572,6 +572,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.WORK_FAIL, handle));
session.submitTask(gsr);
enqueueNoopEvent();
} else {
GearmanFunction function = def.getFactory().getFunction();
function.setData(data);
@ -580,8 +581,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
if (unique != null && unique.length > 0) {
function.setUniqueId(unique);
}
functionList.add(function);
return function;
}
return null;
}
private void submitFunction(GearmanFunction fun) {