Merge "Make logging more consistent"
This commit is contained in:
commit
88aa9a4995
|
@ -160,14 +160,14 @@ public abstract class AbstractWorkerThread implements Runnable {
|
||||||
registerJobs();
|
registerJobs();
|
||||||
worker.work();
|
worker.work();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Exception while running worker", e);
|
logger.error("---- Exception while running worker " + getName(), e);
|
||||||
if (!running) continue;
|
if (!running) continue;
|
||||||
worker.shutdown();
|
worker.shutdown();
|
||||||
if (!running) continue;
|
if (!running) continue;
|
||||||
try {
|
try {
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
} catch (InterruptedException e2) {
|
} catch (InterruptedException e2) {
|
||||||
logger.error("Exception while waiting to restart worker", e2);
|
logger.error("---- Exception while waiting to restart worker " + getName(), e2);
|
||||||
}
|
}
|
||||||
if (!running) continue;
|
if (!running) continue;
|
||||||
initWorker();
|
initWorker();
|
||||||
|
|
|
@ -153,7 +153,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void reconnect() {
|
public void reconnect() {
|
||||||
LOG.info("Starting reconnect for " + session.toString());
|
LOG.info("---- Worker " + this + " starting reconnect for " + session.toString());
|
||||||
// In case we held the availability lock earlier, release it.
|
// In case we held the availability lock earlier, release it.
|
||||||
availability.unlock(this);
|
availability.unlock(this);
|
||||||
try {
|
try {
|
||||||
|
@ -169,11 +169,11 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
} catch (InterruptedException e1) {
|
} catch (InterruptedException e1) {
|
||||||
LOG.warn("Interrupted while reconnecting", e);
|
LOG.warn("---- Worker " + this + " interrupted while reconnecting", e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Ending reconnect for " + session.toString());
|
LOG.info("---- Worker " + this + " ending reconnect for " + session.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
public MyGearmanWorkerImpl(AvailabilityMonitor availability) {
|
public MyGearmanWorkerImpl(AvailabilityMonitor availability) {
|
||||||
|
@ -194,7 +194,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
try {
|
try {
|
||||||
ioAvailable = Selector.open();
|
ioAvailable = Selector.open();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Failed to open IO selector.", ioe);
|
LOG.warn("---- Worker " + this + " failed to open IO selector", ioe);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,6 +204,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setFunctions(Set<GearmanFunctionFactory> functions) {
|
public void setFunctions(Set<GearmanFunctionFactory> functions) {
|
||||||
|
LOG.info("---- Worker " + this + " registering " + functions.size() + " functions");
|
||||||
functionRegistry.setFunctions(functions);
|
functionRegistry.setFunctions(functions);
|
||||||
ioAvailable.wakeup();
|
ioAvailable.wakeup();
|
||||||
}
|
}
|
||||||
|
@ -247,7 +248,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
session.driveSessionIO();
|
session.driveSessionIO();
|
||||||
if (!isRunning()) return;
|
if (!isRunning()) return;
|
||||||
|
|
||||||
LOG.debug("Worker " + this + " has registered function " +
|
LOG.debug("---- Worker " + this + " registered function " +
|
||||||
factory.getFunctionName());
|
factory.getFunctionName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,7 +263,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void work() {
|
public void work() {
|
||||||
LOG.info("Starting work");
|
LOG.info("---- Worker " + this + " starting work");
|
||||||
|
|
||||||
if (!state.equals(State.IDLE)) {
|
if (!state.equals(State.IDLE)) {
|
||||||
throw new IllegalStateException("Can not call work while worker " +
|
throw new IllegalStateException("Can not call work while worker " +
|
||||||
|
@ -273,21 +274,26 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
boolean grabJobSent = false;
|
boolean grabJobSent = false;
|
||||||
|
|
||||||
while (isRunning()) {
|
while (isRunning()) {
|
||||||
|
LOG.debug("---- Worker " + this + " top of run loop");
|
||||||
|
|
||||||
if (!session.isInitialized()) {
|
if (!session.isInitialized()) {
|
||||||
|
LOG.debug("---- Worker " + this + " run loop reconnect");
|
||||||
reconnect();
|
reconnect();
|
||||||
grabJobSent = false;
|
grabJobSent = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if still disconnected, skip
|
// if still disconnected, skip
|
||||||
if (!session.isInitialized()) {
|
if (!session.isInitialized()) {
|
||||||
|
LOG.debug("---- Worker " + this + " run loop not initialized");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
LOG.debug("---- Worker " + this + " run loop register functions");
|
||||||
registerFunctions();
|
registerFunctions();
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
LOG.warn("Receieved IOException while" +
|
LOG.warn("---- Worker " + this + " receieved IOException while" +
|
||||||
" registering function", io);
|
" registering functions", io);
|
||||||
session.closeSession();
|
session.closeSession();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -295,31 +301,35 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
if (!isRunning()) continue;
|
if (!isRunning()) continue;
|
||||||
|
|
||||||
if (functionList.isEmpty()) {
|
if (functionList.isEmpty()) {
|
||||||
|
LOG.debug("---- Worker " + this + " run loop function list is empty while" +
|
||||||
|
" checking for initial grab job");
|
||||||
if (!grabJobSent) {
|
if (!grabJobSent) {
|
||||||
// send the initial GRAB_JOB on reconnection.
|
// send the initial GRAB_JOB on reconnection.
|
||||||
LOG.info("Worker " + this + " sending initial grab job");
|
LOG.info("---- Worker " + this + " sending initial grab job");
|
||||||
try {
|
try {
|
||||||
sendGrabJob(session);
|
sendGrabJob(session);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.warn("Interrupted while waiting for okay to send " +
|
LOG.warn("---- Worker " + this +
|
||||||
"grab job", e);
|
" interrupted while waiting for okay to send grab job", e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
grabJobSent = true;
|
grabJobSent = true;
|
||||||
try {
|
try {
|
||||||
session.driveSessionIO();
|
session.driveSessionIO();
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
LOG.warn("Receieved IOException while" +
|
LOG.warn("---- Worker " + this + " receieved IOException while" +
|
||||||
" sending initial grab job", io);
|
" sending initial grab job", io);
|
||||||
session.closeSession();
|
session.closeSession();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.debug("---- Worker " + this + " run loop finished initial grab job");
|
||||||
|
|
||||||
if (!isRunning()) continue;
|
if (!isRunning()) continue;
|
||||||
|
|
||||||
if (functionList.isEmpty()) {
|
if (functionList.isEmpty()) {
|
||||||
|
LOG.debug("---- Worker " + this + " function list empty; selecting");
|
||||||
int interestOps = SelectionKey.OP_READ;
|
int interestOps = SelectionKey.OP_READ;
|
||||||
if (session.sessionHasDataToWrite()) {
|
if (session.sessionHasDataToWrite()) {
|
||||||
interestOps |= SelectionKey.OP_WRITE;
|
interestOps |= SelectionKey.OP_WRITE;
|
||||||
|
@ -329,37 +339,43 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
try {
|
try {
|
||||||
ioAvailable.select();
|
ioAvailable.select();
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
LOG.warn("Receieved IOException while" +
|
LOG.warn("---- Worker " + this + " receieved IOException while" +
|
||||||
" selecting for IO", io);
|
" selecting for IO", io);
|
||||||
session.closeSession();
|
session.closeSession();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.debug("---- Worker " + this + " run loop finished selecting");
|
||||||
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
|
if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
|
||||||
|
LOG.debug("---- Worker " + this + " received input in run loop");
|
||||||
if (!session.isInitialized()) {
|
if (!session.isInitialized()) {
|
||||||
|
LOG.debug("---- Worker " + this + " session is no longer initialized");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
session.driveSessionIO();
|
session.driveSessionIO();
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
LOG.warn("Received IOException while driving" +
|
LOG.warn("---- Worker " + this + " received IOException while driving" +
|
||||||
" IO on session " + session, io);
|
" IO on session " + session, io);
|
||||||
session.closeSession();
|
session.closeSession();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LOG.debug("---- Worker " + this + " run loop finished driving session io");
|
||||||
|
|
||||||
if (!isRunning()) continue;
|
if (!isRunning()) continue;
|
||||||
|
|
||||||
//For the time being we will execute the jobs synchronously
|
//For the time being we will execute the jobs synchronously
|
||||||
//in the future, I expect to change this.
|
//in the future, I expect to change this.
|
||||||
if (!functionList.isEmpty()) {
|
if (!functionList.isEmpty()) {
|
||||||
|
LOG.info("---- Worker " + this + " executing function");
|
||||||
GearmanFunction fun = functionList.remove();
|
GearmanFunction fun = functionList.remove();
|
||||||
submitFunction(fun);
|
submitFunction(fun);
|
||||||
// Send another grab_job on the next loop
|
// Send another grab_job on the next loop
|
||||||
grabJobSent = false;
|
grabJobSent = false;
|
||||||
}
|
}
|
||||||
|
LOG.debug("---- Worker " + this + " bottom of run loop");
|
||||||
}
|
}
|
||||||
|
|
||||||
shutDownWorker(true);
|
shutDownWorker(true);
|
||||||
|
@ -384,26 +400,28 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
GearmanPacket p = event.getPacket();
|
GearmanPacket p = event.getPacket();
|
||||||
GearmanJobServerSession s = event.getSession();
|
GearmanJobServerSession s = event.getSession();
|
||||||
GearmanPacketType t = p.getPacketType();
|
GearmanPacketType t = p.getPacketType();
|
||||||
LOG.debug("Worker " + this + " handling session event" +
|
LOG.debug("---- Worker " + this + " handling session event" +
|
||||||
" ( Session = " + s + " Event = " + t + " )");
|
" ( Session = " + s + " Event = " + t + " )");
|
||||||
switch (t) {
|
switch (t) {
|
||||||
case JOB_ASSIGN:
|
case JOB_ASSIGN:
|
||||||
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
|
//TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
|
||||||
|
LOG.info("---- Worker " + this + " received job assignment");
|
||||||
taskMap.remove(s);
|
taskMap.remove(s);
|
||||||
addNewJob(event);
|
addNewJob(event);
|
||||||
break;
|
break;
|
||||||
case JOB_ASSIGN_UNIQ:
|
case JOB_ASSIGN_UNIQ:
|
||||||
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
|
//TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
|
||||||
|
LOG.info("---- Worker " + this + " received unique job assignment");
|
||||||
taskMap.remove(s);
|
taskMap.remove(s);
|
||||||
addNewJob(event);
|
addNewJob(event);
|
||||||
break;
|
break;
|
||||||
case NOOP:
|
case NOOP:
|
||||||
taskMap.remove(s);
|
taskMap.remove(s);
|
||||||
LOG.debug("Worker " + this + " sending grab job after wakeup");
|
LOG.debug("---- Worker " + this + " sending grab job after wakeup");
|
||||||
try {
|
try {
|
||||||
sendGrabJob(s);
|
sendGrabJob(s);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.warn("Interrupted while waiting for okay to send " +
|
LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " +
|
||||||
"grab job", e);
|
"grab job", e);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -411,7 +429,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
// We didn't get a job, so allow other workers or
|
// We didn't get a job, so allow other workers or
|
||||||
// Jenkins to schedule on this node.
|
// Jenkins to schedule on this node.
|
||||||
availability.unlock(this);
|
availability.unlock(this);
|
||||||
LOG.debug("Worker " + this + " sending pre sleep after no_job");
|
LOG.debug("---- Worker " + this + " sending pre sleep after no_job");
|
||||||
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
|
GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
|
||||||
new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
new GearmanPacketImpl(GearmanPacketMagic.REQ,
|
||||||
GearmanPacketType.PRE_SLEEP, new byte[0]));
|
GearmanPacketType.PRE_SLEEP, new byte[0]));
|
||||||
|
@ -426,8 +444,8 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
s.closeSession();
|
s.closeSession();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.warn("Received unknown packet type " + t +
|
LOG.warn("---- Worker " + this + " received unknown packet type " + t +
|
||||||
" from session " + s + ". Closing connection.");
|
" from session " + s + "; closing connection");
|
||||||
s.closeSession();
|
s.closeSession();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -451,7 +469,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
|
|
||||||
reconnect();
|
reconnect();
|
||||||
|
|
||||||
LOG.debug("Added server " + conn + " to worker " + this);
|
LOG.debug("---- Worker " + this + " added server " + conn);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -511,7 +529,7 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
* exceptions because closeSession does not throw an exception
|
* exceptions because closeSession does not throw an exception
|
||||||
*/
|
*/
|
||||||
private List<Exception> shutDownWorker(boolean completeTasks) {
|
private List<Exception> shutDownWorker(boolean completeTasks) {
|
||||||
LOG.info("Commencing shutdowm of worker " + this);
|
LOG.info("---- Worker " + this + " commencing shutdown");
|
||||||
|
|
||||||
ArrayList<Exception> exceptions = new ArrayList<Exception>();
|
ArrayList<Exception> exceptions = new ArrayList<Exception>();
|
||||||
|
|
||||||
|
@ -528,10 +546,10 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
try {
|
try {
|
||||||
ioAvailable.close();
|
ioAvailable.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.warn("Encountered IOException while closing selector for worker: ", ioe);
|
LOG.warn("---- Worker " + this + " encountered IOException while closing selector: ", ioe);
|
||||||
}
|
}
|
||||||
state = State.IDLE;
|
state = State.IDLE;
|
||||||
LOG.info("Completed shutdowm of worker " + this);
|
LOG.info("---- Worker " + this + " completed shutdown");
|
||||||
|
|
||||||
return exceptions;
|
return exceptions;
|
||||||
}
|
}
|
||||||
|
@ -578,12 +596,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
|
||||||
// or FAIL; make sure it gets sent.
|
// or FAIL; make sure it gets sent.
|
||||||
session.driveSessionIO();
|
session.driveSessionIO();
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
LOG.warn("Receieved IOException while" +
|
LOG.warn("---- Worker " + this + " receieved IOException while" +
|
||||||
" running function",io);
|
" running function",io);
|
||||||
session.closeSession();
|
session.closeSession();
|
||||||
// The reconnect will unlock the monitor if needed.
|
// The reconnect will unlock the monitor if needed.
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.warn("Exception while executing function " + fun.getName(), e);
|
LOG.warn("---- Worker " + this + " exception while executing function " + fun.getName(), e);
|
||||||
// Unlock the monitor for this worker in case we didn't
|
// Unlock the monitor for this worker in case we didn't
|
||||||
// make it as far as the schedule job unlock.
|
// make it as far as the schedule job unlock.
|
||||||
availability.unlock(this);
|
availability.unlock(this);
|
||||||
|
|
|
@ -50,9 +50,6 @@ public class SaveableListenerImpl extends SaveableListener {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void onChange(Saveable o, XmlFile file) {
|
public void onChange(Saveable o, XmlFile file) {
|
||||||
logger.info("---- " + SaveableListenerImpl.class.getName() + ":"
|
|
||||||
+ " onChange");
|
|
||||||
|
|
||||||
// update functions only when gearman-plugin is enabled
|
// update functions only when gearman-plugin is enabled
|
||||||
if (!GearmanPluginConfig.get().enablePlugin()) {
|
if (!GearmanPluginConfig.get().enablePlugin()) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -123,8 +123,6 @@ public class StartJobWorker extends AbstractGearmanFunction {
|
||||||
private GearmanJobResult safeExecuteFunction()
|
private GearmanJobResult safeExecuteFunction()
|
||||||
throws Exception
|
throws Exception
|
||||||
{
|
{
|
||||||
logger.info("---- Running executeFunction in " + name + " ----");
|
|
||||||
|
|
||||||
// decode the uniqueId from the client
|
// decode the uniqueId from the client
|
||||||
String decodedUniqueId = null;
|
String decodedUniqueId = null;
|
||||||
if (this.uniqueId != null) {
|
if (this.uniqueId != null) {
|
||||||
|
@ -165,9 +163,10 @@ public class StartJobWorker extends AbstractGearmanFunction {
|
||||||
availability.expectUUID(decodedUniqueId);
|
availability.expectUUID(decodedUniqueId);
|
||||||
|
|
||||||
// schedule jenkins to build project
|
// schedule jenkins to build project
|
||||||
logger.info("---- Scheduling "+project.getName()+" build #" +
|
logger.info("---- Worker " + this.worker + " scheduling " +
|
||||||
project.getNextBuildNumber()+" on " + runNodeName
|
project.getName()+" build #" +
|
||||||
+ " with UUID " + decodedUniqueId + " and build params " + buildParams);
|
project.getNextBuildNumber()+" on " + runNodeName
|
||||||
|
+ " with UUID " + decodedUniqueId + " and build params " + buildParams);
|
||||||
QueueTaskFuture<?> future = project.scheduleBuild2(0, new Cause.UserIdCause(), actions);
|
QueueTaskFuture<?> future = project.scheduleBuild2(0, new Cause.UserIdCause(), actions);
|
||||||
|
|
||||||
// check build and pass results back to client
|
// check build and pass results back to client
|
||||||
|
|
Loading…
Reference in New Issue