Avoid holding locks on gewtHandles.

Some of the operations we were doing on worker threads while
iterating over the list of worker threads were fairly deep and
themselves required locks.  Instead, try to hold the lock on
worker thread lists as short as possible.  Snapshot them and
iterate over the snapshot when doing complicated tasks.

Change-Id: I4c257e83b2a2f985bf571f4f64e2db44d2d17e3d
This commit is contained in:
James E. Blair 2013-06-15 10:15:43 -07:00
parent 2ba8b61b3a
commit 9bacb54664
2 changed files with 39 additions and 15 deletions

View File

@ -124,11 +124,16 @@ public abstract class AbstractWorkerThread implements Runnable {
public void stop() {
running = false;
logger.info("=== " + getName() + " Request to stop AWT: " + this);
logger.info("=== " + getName() + " Thread: " + thread + " name: " + thread.getName());
logger.info("=== " + getName() + " Worker: " + worker);
worker.stop();
logger.info("=== " + getName() + " Interrupting worker");
// Interrupt the thread so it unblocks any blocking call
thread.interrupt();
logger.info("=== " + getName() + " Joining thread");
// Wait until the thread exits
try {
thread.join();
@ -136,6 +141,7 @@ public abstract class AbstractWorkerThread implements Runnable {
// Unexpected interruption
logger.error("Exception while waiting for thread to join", e);
}
logger.info("=== " + getName() + " Stop request done");
}
/*

View File

@ -198,26 +198,33 @@ public class GearmanProxy {
*/
public void stopAll() {
// stop gearman executors
synchronized(gewtHandles) {
for (AbstractWorkerThread gewtHandle : gewtHandles) { // stop executors
gewtHandle.stop();
}
List<AbstractWorkerThread> stopHandles;
synchronized (gewtHandles) {
stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
gewtHandles.clear();
}
for (AbstractWorkerThread wt : stopHandles) { // stop executors
wt.stop();
}
synchronized (availabilityMonitors) {
// They will be recreated if/when the
// ExecutorWorkerThreads are recreated.
availabilityMonitors.clear();
}
synchronized(gmwtHandles) {
for (AbstractWorkerThread gmwtHandle : gmwtHandles) { // stop executors
gmwtHandle.stop();
}
stopHandles = new ArrayList<AbstractWorkerThread>();
synchronized (gmwtHandles) {
stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
gmwtHandles.clear();
}
for (AbstractWorkerThread wt : stopHandles) { // stop executors
wt.stop();
}
logger.info("---- Num of executors running = " + getNumExecutors());
}
@ -232,17 +239,22 @@ public class GearmanProxy {
*/
public void stop(Computer computer) {
Node node = computer.getNode();
AbstractWorkerThread workerThread = null;
// find the computer in the executor workers list and stop it
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
AbstractWorkerThread t = it.next();
if (t.name.contains(computer.getName())) {
t.stop();
workerThread = t;
it.remove();
break;
}
}
}
if (workerThread != null) {
workerThread.stop();
}
removeAvailabilityMonitor(node);
logger.info("---- Num of executors running = " + getNumExecutors());
@ -321,18 +333,24 @@ public class GearmanProxy {
Queue.BuildableItem item) {
// Ask the AvailabilityMonitor for this node if it's okay to
// run this build.
ExecutorWorkerThread workerThread = null;
synchronized(gewtHandles) {
for (Iterator<AbstractWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
ExecutorWorkerThread t = ((ExecutorWorkerThread)it.next());
if (t.getNode() == node) {
if (t.getAvailability().canTake(item)) {
return null;
} else {
return new CauseOfBlockage.BecauseNodeIsBusy(node);
}
workerThread = t;
break;
}
}
}
if (workerThread != null) {
if (workerThread.getAvailability().canTake(item)) {
return null;
} else {
return new CauseOfBlockage.BecauseNodeIsBusy(node);
}
}
return null;
}
}