From 4f2f53f38a857e6cae0061cb885270b41e44e7ca Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Mon, 4 May 2015 18:09:31 -0700 Subject: [PATCH] Fix race between adding job and registering Gearman plugin had a race between adding jobs to the functionList and registering jobs. When registering jobs the functionMap is cleared, when adding a job the plugin checks if the job is in the function Map before running it. If we happen to trigger registration of jobs when we get a response from gearman with a job assignment then the functionMap can be empty making us send a work fail instead of running the job. To make things worse this jenkins worker would not send a subsequent GET JOB and would live lock never doing any useful work. Correct this by making the processing for gearman events synchronous in the work loop. This ensures that we never try to clear the function map and check against it at the same time via different threads. To make this happen the handleSessionEvent() method puts all events on a thread safe queue for synchronous processing. This has allowed us to simplify the work() loop and basically do the following: while running: init() register() process one event run function if processed drive IO This is much easier to reason about as we essentially only have bookkeeping and the code for one thing at a time. Change-Id: Id537710f6c8276a528ad78afd72c5a7c8e8a16ac --- .../plugins/gearman/MyGearmanWorkerImpl.java | 233 ++++++++---------- 1 file changed, 107 insertions(+), 126 deletions(-) diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java index d261927..ec914e1 100644 --- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java +++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java @@ -36,8 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ConcurrentLinkedQueue; import org.gearman.common.Constants; import org.gearman.common.GearmanException; @@ -70,7 +70,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { IDLE, RUNNING, SHUTTINGDOWN } private static final String DESCRIPION_PREFIX = "GearmanWorker"; - private Queue functionList = null; + private ConcurrentLinkedQueue eventList = null; private Selector ioAvailable = null; private static final org.slf4j.Logger LOG = LoggerFactory.getLogger( Constants.GEARMAN_WORKER_LOGGER_NAME); @@ -78,7 +78,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { private Map functionMap; private State state; private ExecutorService executorService; - private Map taskMap = null; private GearmanJobServerSession session = null; private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory(); private volatile boolean jobUniqueIdRequired = false; @@ -183,12 +182,11 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { public MyGearmanWorkerImpl(ExecutorService executorService, AvailabilityMonitor availability) { this.availability = availability; - functionList = new LinkedList(); + eventList = new ConcurrentLinkedQueue(); id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId(); functionMap = new HashMap(); state = State.IDLE; this.executorService = executorService; - taskMap = new HashMap(); functionRegistry = new FunctionRegistry(); try { @@ -251,18 +249,23 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { LOG.debug("---- Worker " + this + " registered function " + factory.getFunctionName()); } + enqueueNoopEvent(); + } + public void enqueueNoopEvent() { // Simulate a NOOP packet which will kick off a GRAB_JOB cycle - // if we're sleeping. If we get a real NOOP in the mean time, - // it should be fine because GearmanJobServerSession ignores a - // NOOP if PRE_SLEEP is not on the stack. + // If we get a real NOOP in the mean time, it should be fine + // because GearmanJobServerSession ignores a NOOP if PRE_SLEEP + // is not on the stack. GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES, GearmanPacketType.NOOP, new byte[0]); GearmanSessionEvent event = new GearmanSessionEvent(p, session); - session.handleSessionEvent(event); + enqueueEvent(event); } public void work() { + GearmanSessionEvent event = null; + GearmanFunction function = null; LOG.info("---- Worker " + this + " starting work"); if (!state.equals(State.IDLE)) { @@ -271,7 +274,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { } state = State.RUNNING; - boolean grabJobSent = false; while (isRunning()) { LOG.debug("---- Worker " + this + " top of run loop"); @@ -279,12 +281,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { if (!session.isInitialized()) { LOG.debug("---- Worker " + this + " run loop reconnect"); reconnect(); - grabJobSent = false; - } - - // if still disconnected, skip - if (!session.isInitialized()) { - LOG.debug("---- Worker " + this + " run loop not initialized"); + enqueueNoopEvent(); + // Restart loop to check we connected. continue; } @@ -298,55 +296,41 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { continue; } - if (!isRunning()) continue; + if (!isRunning() || !session.isInitialized()) continue; - if (functionList.isEmpty()) { - LOG.debug("---- Worker " + this + " run loop function list is empty while" + - " checking for initial grab job"); - if (!grabJobSent) { - // send the initial GRAB_JOB on reconnection. - LOG.info("---- Worker " + this + " sending initial grab job"); - try { - sendGrabJob(session); - } catch (InterruptedException e) { - LOG.warn("---- Worker " + this + - " interrupted while waiting for okay to send grab job", e); - continue; - } - grabJobSent = true; - try { - session.driveSessionIO(); - } catch (IOException io) { - LOG.warn("---- Worker " + this + " receieved IOException while" + - " sending initial grab job", io); - session.closeSession(); - continue; - } - } - } - LOG.debug("---- Worker " + this + " run loop finished initial grab job"); + event = eventList.poll(); + function = processSessionEvent(event); - if (!isRunning()) continue; + if (!isRunning() || !session.isInitialized()) continue; - if (functionList.isEmpty()) { - LOG.debug("---- Worker " + this + " function list empty; selecting"); - int interestOps = SelectionKey.OP_READ; - if (session.sessionHasDataToWrite()) { - interestOps |= SelectionKey.OP_WRITE; - } - session.getSelectionKey().interestOps(interestOps); - - try { - ioAvailable.select(); - } catch (IOException io) { - LOG.warn("---- Worker " + this + " receieved IOException while" + - " selecting for IO", io); - session.closeSession(); - continue; - } + //For the time being we will execute the jobs synchronously + //in the future, I expect to change this. + if (function != null) { + LOG.info("---- Worker " + this + " executing function"); + submitFunction(function); + // Send another grab_job on the next loop + enqueueNoopEvent(); + } + + if (!isRunning() || !session.isInitialized()) continue; + + // Run IO, select waiting for ability to read and/or write + // then read and/or write. + int interestOps = SelectionKey.OP_READ; + if (session.sessionHasDataToWrite()) { + interestOps |= SelectionKey.OP_WRITE; + } + session.getSelectionKey().interestOps(interestOps); + + try { + ioAvailable.select(); + } catch (IOException io) { + LOG.warn("---- Worker " + this + " receieved IOException while" + + " selecting for IO", io); + session.closeSession(); + continue; } - LOG.debug("---- Worker " + this + " run loop finished selecting"); if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { LOG.debug("---- Worker " + this + " received input in run loop"); if (!session.isInitialized()) { @@ -363,19 +347,6 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { } } LOG.debug("---- Worker " + this + " run loop finished driving session io"); - - if (!isRunning()) continue; - - //For the time being we will execute the jobs synchronously - //in the future, I expect to change this. - if (!functionList.isEmpty()) { - LOG.info("---- Worker " + this + " executing function"); - GearmanFunction fun = functionList.remove(); - submitFunction(fun); - // Send another grab_job on the next loop - grabJobSent = false; - } - LOG.debug("---- Worker " + this + " bottom of run loop"); } shutDownWorker(true); @@ -391,63 +362,71 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { new GrabJobEventHandler(s), new GearmanPacketImpl(GearmanPacketMagic.REQ, getGrabJobPacketType(), new byte[0])); - taskMap.put(s, grabJobTask); s.submitTask(grabJobTask); } public void handleSessionEvent(GearmanSessionEvent event) throws IllegalArgumentException, IllegalStateException { - GearmanPacket p = event.getPacket(); - GearmanJobServerSession s = event.getSession(); - GearmanPacketType t = p.getPacketType(); - LOG.debug("---- Worker " + this + " handling session event" + - " ( Session = " + s + " Event = " + t + " )"); - switch (t) { - case JOB_ASSIGN: - //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true - LOG.info("---- Worker " + this + " received job assignment"); - taskMap.remove(s); - addNewJob(event); - break; - case JOB_ASSIGN_UNIQ: - //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false - LOG.info("---- Worker " + this + " received unique job assignment"); - taskMap.remove(s); - addNewJob(event); - break; - case NOOP: - taskMap.remove(s); - LOG.debug("---- Worker " + this + " sending grab job after wakeup"); - try { - sendGrabJob(s); - } catch (InterruptedException e) { - LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " + - "grab job", e); - } - break; - case NO_JOB: - // We didn't get a job, so allow other workers or - // Jenkins to schedule on this node. - availability.unlock(this); - LOG.debug("---- Worker " + this + " sending pre sleep after no_job"); - GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), - new GearmanPacketImpl(GearmanPacketMagic.REQ, - GearmanPacketType.PRE_SLEEP, new byte[0])); - taskMap.put(s, preSleepTask); - s.submitTask(preSleepTask); - break; - case ECHO_RES: - break; - case OPTION_RES: - break; - case ERROR: - s.closeSession(); - break; - default: - LOG.warn("---- Worker " + this + " received unknown packet type " + t + - " from session " + s + "; closing connection"); - s.closeSession(); + enqueueEvent(event); + } + + public void enqueueEvent(GearmanSessionEvent event) { + // Enqueue in a thread safe manner. Events will + // be pulled off and processed serially in this workers + // main thread. + eventList.add(event); + } + + private GearmanFunction processSessionEvent(GearmanSessionEvent event) + throws IllegalArgumentException, IllegalStateException { + if (event != null) { + GearmanPacket p = event.getPacket(); + GearmanJobServerSession s = event.getSession(); + GearmanPacketType t = p.getPacketType(); + LOG.debug("---- Worker " + this + " handling session event" + + " ( Session = " + s + " Event = " + t + " )"); + switch (t) { + case JOB_ASSIGN: + //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true + LOG.info("---- Worker " + this + " received job assignment"); + return addNewJob(event); + case JOB_ASSIGN_UNIQ: + //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false + LOG.info("---- Worker " + this + " received unique job assignment"); + return addNewJob(event); + case NOOP: + LOG.debug("---- Worker " + this + " sending grab job after wakeup"); + try { + sendGrabJob(s); + } catch (InterruptedException e) { + LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " + + "grab job", e); + } + break; + case NO_JOB: + // We didn't get a job, so allow other workers or + // Jenkins to schedule on this node. + availability.unlock(this); + LOG.debug("---- Worker " + this + " sending pre sleep after no_job"); + GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), + new GearmanPacketImpl(GearmanPacketMagic.REQ, + GearmanPacketType.PRE_SLEEP, new byte[0])); + s.submitTask(preSleepTask); + break; + case ECHO_RES: + break; + case OPTION_RES: + break; + case ERROR: + s.closeSession(); + break; + default: + LOG.warn("---- Worker " + this + " received unknown packet type " + t + + " from session " + s + "; closing connection"); + s.closeSession(); + } } + return null; } public boolean addServer(String host, int port) { @@ -554,7 +533,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { return exceptions; } - private void addNewJob(GearmanSessionEvent event) { + private GearmanFunction addNewJob(GearmanSessionEvent event) { byte[] handle, data, functionNameBytes, unique; GearmanPacket p = event.getPacket(); String functionName; @@ -572,6 +551,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.WORK_FAIL, handle)); session.submitTask(gsr); + enqueueNoopEvent(); } else { GearmanFunction function = def.getFactory().getFunction(); function.setData(data); @@ -580,8 +560,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { if (unique != null && unique.length > 0) { function.setUniqueId(unique); } - functionList.add(function); + return function; } + return null; } private void submitFunction(GearmanFunction fun) {