Make worker threads more resilient.

If an unhandled exception happens during the work method, shut
down the worker and start again.

Also, name worker threads for easier debugging.

Change-Id: Icc8e4c060bca4ab624bffc2e1bef18655d50d6b2
This commit is contained in:
James E. Blair 2013-06-07 14:25:12 -07:00
parent 2f579179bb
commit b4e3ed8e3a
2 changed files with 30 additions and 14 deletions

View File

@ -44,8 +44,9 @@ public abstract class AbstractWorkerThread implements Runnable {
protected int port;
protected String name;
protected MyGearmanWorkerImpl worker;
private final GearmanNIOJobServerConnection conn;
protected GearmanNIOJobServerConnection conn;
private Thread thread;
private boolean running = false;
public AbstractWorkerThread(String host, int port) {
this(host, port, DEFAULT_EXECUTOR_NAME);
@ -55,6 +56,10 @@ public abstract class AbstractWorkerThread implements Runnable {
setHost(host);
setPort(port);
setName(name);
initWorker();
}
protected void initWorker() {
worker = new MyGearmanWorkerImpl();
conn = new GearmanNIOJobServerConnection(host, port);
}
@ -102,7 +107,8 @@ public abstract class AbstractWorkerThread implements Runnable {
* Start the thread
*/
public void start() {
thread = new Thread(this);
running = true;
thread = new Thread(this, "Gearman worker " + name);
thread.start();
}
@ -110,30 +116,26 @@ public abstract class AbstractWorkerThread implements Runnable {
* Stop the thread
*/
public void stop() {
running = false;
// Interrupt the thread so it unblocks any blocking call
if (worker.isRunning()) {
try {
worker.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
thread.interrupt();
// Wait until the thread exits
try {
thread.join();
} catch (InterruptedException ex) {
// Unexpected interruption
ex.printStackTrace();
}
}
/*
@ -143,13 +145,23 @@ public abstract class AbstractWorkerThread implements Runnable {
@Override
public void run() {
if (!worker.isRunning()) {
logger.info("---- Starting Worker "+ getName() +" ("+new Date().toString()+")");
worker.addServer(conn);
worker.setWorkerID(name);
worker.setJobUniqueIdRequired(true);
registerJobs();
worker.work();
while (running) {
try {
logger.info("---- Starting Worker "+ getName() +" ("+new Date().toString()+")");
worker.addServer(conn);
worker.setWorkerID(name);
worker.setJobUniqueIdRequired(true);
registerJobs();
worker.work();
} catch (Exception ex) {
logger.error("Exception while running worker", ex);
worker.shutdown();
try {
Thread.sleep(2000);
} catch (InterruptedException e2) {
}
initWorker();
}
}
// Thread exits

View File

@ -59,6 +59,10 @@ public class ExecutorWorkerThread extends AbstractWorkerThread{
super(host, port, name);
this.node = node;
this.masterName = masterName;
}
protected void initWorker() {
super.initWorker();
this.functionMap = new HashMap<String,GearmanFunctionFactory>();
}