Don't grab jobs when shutting down.

This rearranges a bit of the previous change to move the WaitBool
functionality into an AvailabilityChecker class.

It implements the new feature of not grabbing jobs while not in the
quiet mode for shutdown by using a busy wait.  There doesn't seem to
be an event framework for that, and well, it doesn't happen very
often, so a slow busy wait probably isn't terrible.

This only applies to Executor workers, not Management workers
(so jobs can still be stopped and descriptions set).

Removed the default-name constructor of AbstractWorkerThread because
it is not used anywhere now (removed its test as well).

Change-Id: I6d5e1cd3cb47c8876ceb909d205cb66445388992
This commit is contained in:
James E. Blair 2013-06-11 15:58:34 -07:00
parent 858fb155fe
commit 76cb343b8c
7 changed files with 82 additions and 60 deletions

View File

@ -44,22 +44,21 @@ public abstract class AbstractWorkerThread implements Runnable {
protected String name;
protected MyGearmanWorkerImpl worker;
protected GearmanNIOJobServerConnection conn;
protected AvailabilityChecker availability;
private Thread thread;
private boolean running = false;
public AbstractWorkerThread(String host, int port) {
this(host, port, Constants.GEARMAN_DEFAULT_EXECUTOR_NAME);
}
public AbstractWorkerThread(String host, int port, String name) {
public AbstractWorkerThread(String host, int port, String name,
AvailabilityChecker availability) {
setHost(host);
setPort(port);
setName(name);
setAvailability(availability);
initWorker();
}
protected void initWorker() {
worker = new MyGearmanWorkerImpl();
worker = new MyGearmanWorkerImpl(getAvailability());
conn = new GearmanNIOJobServerConnection(host, port);
}
@ -87,6 +86,14 @@ public abstract class AbstractWorkerThread implements Runnable {
this.name = name;
}
public AvailabilityChecker getAvailability() {
return availability;
}
public void setAvailability(AvailabilityChecker availability) {
this.availability = availability;
}
/*
* Register jobs with the gearman worker.
* This method should be overriden.

View File

@ -0,0 +1,51 @@
/*
*
* Copyright 2013 OpenStack Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package hudson.plugins.gearman;
import jenkins.model.Jenkins;
public class AvailabilityChecker {
private boolean okayToGrabJob = true;
private final boolean checkQuietingDown;
AvailabilityChecker(boolean checkQuietingDown)
{
this.checkQuietingDown = checkQuietingDown;
}
public synchronized void setOkayToGrabJob(boolean value) {
this.okayToGrabJob = value;
this.notifyAll();
}
public void waitUntilOkayToGrabJob()
throws InterruptedException
{
synchronized(this) {
while (!okayToGrabJob) {
this.wait();
}
}
if (checkQuietingDown) {
while (Jenkins.getInstance().isQuietingDown()) {
Thread.sleep(5000);
}
}
}
}

View File

@ -55,9 +55,9 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
private HashMap<String,GearmanFunctionFactory> functionMap;
// constructor
public ExecutorWorkerThread(String host, int port, String name, Node node,
String masterName) {
super(host, port, name);
public ExecutorWorkerThread(String host, int port, String name,
Node node, String masterName) {
super(host, port, name, new AvailabilityChecker(true));
this.node = node;
this.masterName = masterName;
}
@ -204,7 +204,7 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
// a build has started on this computer
Computer computer = node.toComputer();
if (computer.countIdle() == 0) {
worker.setOkayToGrabJob(false);
getAvailability().setOkayToGrabJob(false);
}
// TODO: There is a race condition here -- a worker may have
@ -218,7 +218,7 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
// a build has completed on this executor
Computer computer = node.toComputer();
worker.setOkayToGrabJob(true);
getAvailability().setOkayToGrabJob(true);
// TODO: There could still be jobs in the queue that may or
// may not be assigned to this computer. If there are, we

View File

@ -42,7 +42,7 @@ public class ManagementWorkerThread extends AbstractWorkerThread{
private final String masterName;
public ManagementWorkerThread(String host, int port, String name, String masterName){
super(host, port, name);
super(host, port, name, new AvailabilityChecker(false));
this.masterName = masterName;
}

View File

@ -83,31 +83,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
private volatile boolean jobUniqueIdRequired = false;
private FunctionRegistry functionRegistry;
private WaitBool okayToGrabJob = new WaitBool(true);
class WaitBool {
private boolean value;
WaitBool(boolean value) {
this.value = value;
}
public synchronized void set(boolean value) {
this.value = value;
this.notifyAll();
}
public synchronized void waitUntil(boolean value)
throws InterruptedException
{
if (this.value == value)
return;
while (this.value != value) {
this.wait();
}
}
}
private AvailabilityChecker availability;
class GrabJobEventHandler implements GearmanServerResponseHandler {
@ -196,11 +172,13 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
LOG.info("Ending reconnect for " + session.toString());
}
public MyGearmanWorkerImpl() {
this (null);
public MyGearmanWorkerImpl(AvailabilityChecker availability) {
this (null, availability);
}
public MyGearmanWorkerImpl(ExecutorService executorService) {
public MyGearmanWorkerImpl(ExecutorService executorService,
AvailabilityChecker availability) {
this.availability = availability;
functionList = new LinkedList<GearmanFunction>();
id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
functionMap = new HashMap<String, FunctionDefinition>();
@ -375,12 +353,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
shutDownWorker(true);
}
public void setOkayToGrabJob(boolean value) {
okayToGrabJob.set(value);
}
private void sendGrabJob(GearmanJobServerSession s) throws InterruptedException {
okayToGrabJob.waitUntil(true);
availability.waitUntilOkayToGrabJob();
GearmanTask grabJobTask = new GearmanTask(
new GrabJobEventHandler(s),

View File

@ -54,21 +54,15 @@ public class AbstractWorkerThreadTest {
public void tearDown() throws Exception {
}
@Test
public void testAnonymousThread() {
AbstractWorkerThread fakeWorker = new FakeWorkerThread("GearmanServer", 4730);
assertEquals("anonymous", fakeWorker.getName());
}
@Test
public void testNamedThread() {
AbstractWorkerThread fakeWorker = new FakeWorkerThread("GearmanServer", 4730, "faker");
AbstractWorkerThread fakeWorker = new FakeWorkerThread("GearmanServer", 4730, "faker", null);
assertEquals("faker", fakeWorker.getName());
}
@Test
public void testStartStopThread() {
AbstractWorkerThread fakeWorker = new FakeWorkerThread("GearmanServer", 4730);
AbstractWorkerThread fakeWorker = new FakeWorkerThread("GearmanServer", 4730, "faker", null);
fakeWorker.start();
assertTrue(fakeWorker.isAlive());
fakeWorker.stop();

View File

@ -33,13 +33,9 @@ public class FakeWorkerThread extends AbstractWorkerThread{
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
// constructor
public FakeWorkerThread(String host, int port) {
super(host, port);
}
public FakeWorkerThread(String host, int port, String name) {
super(host, port, name);
public FakeWorkerThread(String host, int port, String name,
AvailabilityChecker availability) {
super(host, port, name, availability);
}
/**