From dc60cad76904c7b95d2153e1f0e9ca88a5e1b478 Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Wed, 29 Apr 2015 11:48:04 -0700 Subject: [PATCH] Fix a cause of livelock in worker work() method There were cases where the gearman plugin would submit gearman tasks in response to some event without handling the IO driving that task. If this happened it was possible for the state machine to go sideways and no longer send new get job requests. Fix this by driving IO as long as the functions to run list is empty and we have any outstanding submitted tasks. Logs from jenkins slave that failed to complete its grab job request when a job update interrupted it. Apr 28, 2015 9:41:29 PM hudson.slaves.SlaveComputer tryReconnect INFO: Attempting to reconnect bare-trusty-hpcloud-b2-2353037 -- Apr 28, 2015 9:41:38 PM hudson.plugins.gearman.AbstractWorkerThread run INFO: ---- Starting Worker bare-trusty-hpcloud-b2-2353037_exec-0 (Tue Apr 28 21:41:38 UTC 2015) -- Apr 28, 2015 9:41:38 PM hudson.plugins.gearman.MyGearmanWorkerImpl setFunctions INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 registering 7420 functions Apr 28, 2015 9:41:38 PM hudson.plugins.gearman.MyGearmanWorkerImpl work INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 starting work -- Apr 28, 2015 9:43:52 PM hudson.plugins.gearman.MyGearmanWorkerImpl work INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 sending initial grab job -- Apr 28, 2015 9:44:01 PM hudson.plugins.gearman.MyGearmanWorkerImpl setFunctions INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 registering 7422 functions -- Apr 28, 2015 9:44:37 PM hudson.plugins.gearman.MyGearmanWorkerImpl handleSessionEvent INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 received unique job assignment -- Apr 29, 2015 11:48:58 AM hudson.plugins.gearman.MyGearmanWorkerImpl setFunctions INFO: ---- Worker bare-trusty-hpcloud-b2-2353037_exec-0 registering 7424 functions Change-Id: I2a1a4cf76dab5e1dc3ad5109aa0290a2ecc97fd9 --- .../plugins/gearman/MyGearmanWorkerImpl.java | 59 ++++++++++++------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java index d261927..2d56b3c 100644 --- a/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java +++ b/src/main/java/hudson/plugins/gearman/MyGearmanWorkerImpl.java @@ -150,6 +150,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { this.updated = updated; } + public boolean getUpdated() { + return updated; + } } public void reconnect() { @@ -273,6 +276,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { state = State.RUNNING; boolean grabJobSent = false; + running: while (isRunning()) { LOG.debug("---- Worker " + this + " top of run loop"); @@ -328,7 +332,9 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { if (!isRunning()) continue; - if (functionList.isEmpty()) { + // Loop as long as we don't have function to run and there are + // tasks in flight. + while (functionList.isEmpty() && taskMap.get(session) != null) { LOG.debug("---- Worker " + this + " function list empty; selecting"); int interestOps = SelectionKey.OP_READ; if (session.sessionHasDataToWrite()) { @@ -337,34 +343,46 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { session.getSelectionKey().interestOps(interestOps); try { - ioAvailable.select(); + // Wait up to 2 seconds before rechecking the functionList + ioAvailable.select(2000); } catch (IOException io) { LOG.warn("---- Worker " + this + " receieved IOException while" + " selecting for IO", io); session.closeSession(); - continue; + continue running; } - } - LOG.debug("---- Worker " + this + " run loop finished selecting"); - if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { - LOG.debug("---- Worker " + this + " received input in run loop"); - if (!session.isInitialized()) { - LOG.debug("---- Worker " + this + " session is no longer initialized"); - continue; + if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { + LOG.debug("---- Worker " + this + " received input in run loop"); + if (!session.isInitialized()) { + LOG.debug("---- Worker " + this + " session is no longer initialized"); + continue running; + } + try { + session.driveSessionIO(); + } catch (IOException io) { + LOG.warn("---- Worker " + this + " received IOException while driving" + + " IO on session " + session, io); + session.closeSession(); + continue running; + } } - try { - session.driveSessionIO(); - } catch (IOException io) { - LOG.warn("---- Worker " + this + " received IOException while driving" + - " IO on session " + session, io); - session.closeSession(); - continue; + GearmanTask currentTask = taskMap.get(session); + GearmanPacket packet = currentTask.getRequestPacket(); + if (packet.getPacketType() == GearmanPacketType.WORK_FAIL) { + // Ensure we send a grab job after a failed attempt. + grabJobSent = false; + continue running; } + // If we are have attempted to sleep but need to update + // functions start work again to register. + else if (functionRegistry.getUpdated() && + packet.getPacketType() == GearmanPacketType.PRE_SLEEP) { + continue running; + } + else if (!isRunning()) continue running; } - LOG.debug("---- Worker " + this + " run loop finished driving session io"); - - if (!isRunning()) continue; + LOG.debug("---- Worker " + this + " run loop found at least one function"); //For the time being we will execute the jobs synchronously //in the future, I expect to change this. @@ -571,6 +589,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { GearmanTask gsr = new GearmanTask( new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.WORK_FAIL, handle)); + taskMap.put(session, gsr); session.submitTask(gsr); } else { GearmanFunction function = def.getFactory().getFunction();