decouple gearman from the gearman configuration

This change is to create a new object to store Gearman
objects and state info.

src/main/java/hudson/plugins/gearman/GearmanProxy.java
  created to keep Gearman state info.

src/main/java/hudson/plugins/gearman/GearmanPluginConfig.java
  simplied this class by removing the core gearman stuff out to
  a GearmanProxy.java class

src/main/java/hudson/plugins/gearman/Constants.java
  Use one logger instead of two.  updated logger reference in all
  of the other files in this checkin

src/main/java/hudson/plugins/gearman/ProjectListener.java
src/main/java/hudson/plugins/gearman/StartJobWorker.java
src/main/java/hudson/plugins/gearman/StopJobWorker.java
src/main/java/hudson/plugins/gearman/ComputerListenerImpl.java
  update references to changed class and methods

Change-Id: I879cdb8839c8b5437bccf6d7e1602c33eff434a6
This commit is contained in:
zaro 2013-03-02 08:02:55 -08:00
parent 5823f98b5c
commit 0e81e014da
9 changed files with 150 additions and 251 deletions

View File

@ -37,17 +37,17 @@ public class ComputerListenerImpl extends ComputerListener {
// on creation of slave
int currNumNodes = Jenkins.getInstance().getNodes().size();
if (GearmanPluginConfig.numExecutorNodes < currNumNodes) {
if (GearmanProxy.numExecutorNodes < currNumNodes) {
Node node = c.getNode();
int slaveExecutors = c.getExecutors().size();
for (int i=0; i<slaveExecutors; i++) {
AbstractWorkerThread gwt = new ExecutorWorkerThread("15.185.117.66", 4730,
node.getNodeName()+"-exec"+Integer.toString(i), node);
gwt.start();
GearmanPluginConfig.gewtHandles.add(gwt);
GearmanProxy.gewtHandles.add(gwt);
}
GearmanPluginConfig.numExecutorNodes = currNumNodes;
logger.info("---- numExecutorNodes = "+GearmanPluginConfig.numExecutorNodes);
GearmanProxy.numExecutorNodes = currNumNodes;
logger.info("---- numExecutorNodes = "+GearmanProxy.numExecutorNodes);
}
}
@ -68,7 +68,7 @@ public class ComputerListenerImpl extends ComputerListener {
//TODO: adjust for an update to executors. Method does not provide the
// computer to know which thread to remove or add
int gearmanWorkers = GearmanPluginConfig.gewtHandles.size();
int gearmanWorkers = GearmanProxy.gewtHandles.size();
int currNumExecutors = Jenkins.getInstance().getNumExecutors();
if (gearmanWorkers < currNumExecutors) { //executor added
// spawn a thread for executor
@ -77,8 +77,8 @@ public class ComputerListenerImpl extends ComputerListener {
}
// if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
// for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
// if (!GearmanProxy.gewtHandles.isEmpty()) {
// for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
// awt.registerJobs();
// }
// }
@ -96,26 +96,26 @@ public class ComputerListenerImpl extends ComputerListener {
// on deletion of slave
int currNumNodes = Jenkins.getInstance().getNodes().size();
if (GearmanPluginConfig.numExecutorNodes > currNumNodes) {
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
GearmanPluginConfig.numExecutorNodes--;
logger.info("---- numExecutorNodes = "+GearmanPluginConfig.numExecutorNodes);
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (GearmanProxy.numExecutorNodes > currNumNodes) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
GearmanProxy.numExecutorNodes--;
logger.info("---- numExecutorNodes = "+GearmanProxy.numExecutorNodes);
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
if (awt.name.contains(c.getName())) {
try {
awt.stop();
}catch (Exception e){
e.printStackTrace();
}
GearmanPluginConfig.gewtHandles.remove(awt);
GearmanProxy.gewtHandles.remove(awt);
}
}
}
}
// on disconnect of node
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -132,8 +132,8 @@ public class ComputerListenerImpl extends ComputerListener {
}
// on re-connection of node
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -149,8 +149,8 @@ public class ComputerListenerImpl extends ComputerListener {
return;
}
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -166,8 +166,8 @@ public class ComputerListenerImpl extends ComputerListener {
return;
}
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}

View File

@ -29,5 +29,4 @@ public interface Constants {
public static final int GEARMAN_DEFAULT_TCP_PORT = 4730;
public static final String PLUGIN_LOGGER_NAME = "hudson.plugins.gearman.logger";
public static final String PLUGIN_EXECTUOR_LOGGER_NAME = "hudson.plugins.gearman.executor.logger";
}

View File

@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
*/
public class ExecutorWorkerThread extends AbstractWorkerThread{
// private static final Logger logger = LoggerFactory
// .getLogger(AbstractWorkerThread.class);
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);

View File

@ -18,21 +18,14 @@
package hudson.plugins.gearman;
import hudson.Extension;
import hudson.model.Computer;
import hudson.model.Descriptor;
import hudson.model.Node;
import hudson.util.FormValidation;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Stack;
import javax.servlet.ServletException;
import jenkins.model.GlobalConfiguration;
import jenkins.model.Jenkins;
import net.sf.json.JSONObject;
import org.kohsuke.stapler.QueryParameter;
@ -54,12 +47,8 @@ public class GearmanPluginConfig extends GlobalConfiguration {
public static boolean launchWorker; // launchWorker state (from UI checkbox)
private String host; // gearman server host
private int port; // gearman server port
GearmanProxy gearmanProxy;
// handles to gearman workers
public static List<AbstractWorkerThread> gewtHandles;
public static List<AbstractWorkerThread> gmwtHandles;
public static int numExecutorNodes;
/**
* Constructor.
@ -67,10 +56,7 @@ public class GearmanPluginConfig extends GlobalConfiguration {
public GearmanPluginConfig() {
logger.info("--- GearmanPluginConfig Constructor ---");
gewtHandles = new Stack<AbstractWorkerThread>();
gmwtHandles = new Stack<AbstractWorkerThread>();
numExecutorNodes = 0;
gearmanProxy = new GearmanProxy();
load();
/*
@ -78,53 +64,10 @@ public class GearmanPluginConfig extends GlobalConfiguration {
* initialize the launch worker flag to disabled state at jenkins
* startup so we are always at a known state
*/
this.launchWorker = Constants.GEARMAN_DEFAULT_LAUNCH_WORKER;
GearmanPluginConfig.launchWorker = Constants.GEARMAN_DEFAULT_LAUNCH_WORKER;
save();
}
/*
* This method checks whether a connection can be made to a host:port
*
* @param host
* the host name
*
* @param port
* the host port
*
* @param timeout
* the timeout (milliseconds) to try the connection
*
* @return
* true if a socket connection can be established otherwise false
*/
public boolean connectionIsAvailable(String host, int port, int timeout) {
InetSocketAddress endPoint = new InetSocketAddress(host, port);
Socket socket = new Socket();
if (endPoint.isUnresolved()) {
System.out.println("Failure " + endPoint);
} else {
try {
socket.connect(endPoint, timeout);
logger.info("Connection Success: "+endPoint);
return true;
} catch (Exception e) {
logger.info("Connection Failure: "+endPoint+" message: "
+e.getClass().getSimpleName()+" - "
+e.getMessage());
} finally {
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
}
}
return false;
}
/*
* This method runs when user clicks Test Connection button.
@ -137,7 +80,7 @@ public class GearmanPluginConfig extends GlobalConfiguration {
@QueryParameter("port") final int port) throws IOException,
ServletException {
if (connectionIsAvailable(host, port, 5000)) {
if (GearmanPluginUtil.connectionIsAvailable(host, port, 5000)) {
return FormValidation.ok("Success");
} else {
return FormValidation.error("Failed: Unable to Connect");
@ -153,102 +96,22 @@ public class GearmanPluginConfig extends GlobalConfiguration {
host = json.getString("host");
port = json.getInt("port");
/*
* Purpose here is to create a 1:1 mapping of 'gearman worker':'jenkins
* executor' then use the gearman worker to execute builds on that
* jenkins nodes
*/
if (launchWorker && gmwtHandles.isEmpty() && gewtHandles.isEmpty()) {
if (launchWorker) {
// check for a valid connection to gearman server
logger.info("--- Check connection to Gearman Server " + getHost() + ":"
+ getPort());
if (!connectionIsAvailable(host, port, 5000)) {
this.launchWorker = false;
throw new RuntimeException(
"Could not get connection to Gearman Server " + getHost()
+ ":" + getPort());
logger.info("--- Check connection to Gearman Server " + host + ":"
+ port);
if (!GearmanPluginUtil.connectionIsAvailable(host, port, 5000)) {
GearmanPluginConfig.launchWorker = false;
throw new RuntimeException("Unable to connect to Gearman Server");
}
/*
* Spawn management executor worker. This worker does not need any
* executors. It only needs to work with gearman.
*/
AbstractWorkerThread gwt = null;
gwt = new ManagementWorkerThread(host, port, host);
gwt.registerJobs();
gwt.start();
gmwtHandles.add(gwt);
gearmanProxy.init_worker(host, port);
/*
* Spawn executors for the jenkins master Need to treat the master
* differently than slaves because the master is not the same as a
* slave
*/
// first make sure master is enabled (or has executors)
Node masterNode = null;
try {
masterNode = Computer.currentComputer().getNode();
} catch (NullPointerException npe) {
logger.info("--- Master is offline");
} catch (Exception e) {
logger.info("--- Can't get Master");
e.printStackTrace();
}
if (masterNode != null) {
Computer computer = masterNode.toComputer();
int executors = computer.getExecutors().size();
for (int i = 0; i < executors; i++) {
// create a gearman worker for every executor on the master
gwt = new ExecutorWorkerThread(host, port, "master-exec"
+ Integer.toString(i), masterNode);
gwt.registerJobs();
gwt.start();
gewtHandles.add(gwt);
}
numExecutorNodes++;
}
/*
* Spawn executors for the jenkins slaves
*/
List<Node> nodes = Jenkins.getInstance().getNodes();
if (!nodes.isEmpty()) {
for (Node node : nodes) {
Computer computer = node.toComputer();
// create a gearman worker for every executor on the slave
int slaveExecutors = computer.getExecutors().size();
for (int i = 0; i < slaveExecutors; i++) {
gwt = new ExecutorWorkerThread(host, port,
node.getNodeName() + "-exec"
+ Integer.toString(i), node);
gwt.registerJobs();
gwt.start();
gewtHandles.add(gwt);
}
numExecutorNodes++;
}
}
} else {
gearmanProxy.stop_all();
}
// stop gearman workers
if (!launchWorker) {
for (AbstractWorkerThread gewtHandle : gewtHandles) { // stop executors
gewtHandle.stop();
}
gewtHandles.clear();
for (AbstractWorkerThread gmwtHandle : gmwtHandles) { // stop executors
gmwtHandle.stop();
}
gmwtHandles.clear();
numExecutorNodes = 0;
}
int runningExecutors = gmwtHandles.size() + gewtHandles.size();
logger.info("--- Num of executors running = " + runningExecutors);
req.bindJSON(this, json);
save();
return true;

View File

@ -0,0 +1,60 @@
package hudson.plugins.gearman;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class GearmanPluginUtil {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
/*
* This method checks whether a connection can be made to a host:port
*
* @param host
* the host name
*
* @param port
* the host port
*
* @param timeout
* the timeout (milliseconds) to try the connection
*
* @return
* true if a socket connection can be established otherwise false
*/
public static boolean connectionIsAvailable(String host, int port, int timeout) {
InetSocketAddress endPoint = new InetSocketAddress(host, port);
Socket socket = new Socket();
if (endPoint.isUnresolved()) {
System.out.println("Failure " + endPoint);
} else {
try {
socket.connect(endPoint, timeout);
logger.info("Connection Success: "+endPoint);
return true;
} catch (Exception e) {
logger.info("Connection Failure: "+endPoint+" message: "
+e.getClass().getSimpleName()+" - "
+e.getMessage());
} finally {
if (socket != null) {
try {
socket.close();
} catch (Exception e) {
logger.info(e.getMessage());
}
}
}
}
return false;
}
}

View File

@ -32,55 +32,49 @@ import org.slf4j.LoggerFactory;
public class GearmanProxy {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_LOGGER_NAME);
// handles to gearman workers
// public static List<AbstractWorkerThread> gewtHandles;
// public static List<AbstractWorkerThread> gmwtHandles;
public static Stack<AbstractWorkerThread> gewtHandles;
public static Stack<AbstractWorkerThread> gmwtHandles;
public static List<AbstractWorkerThread> gewtHandles;
public static List<AbstractWorkerThread> gmwtHandles;
public static int numExecutorNodes;
public GearmanProxy() {
// TODO Auto-generated constructor stub
logger.info("--- GearmanProxy Constructor ---");
// gewtHandles = new ArrayList<AbstractWorkerThread>();
// gmwtHandles = new ArrayList<AbstractWorkerThread>();
gewtHandles = new Stack<AbstractWorkerThread>();
gmwtHandles = new Stack<AbstractWorkerThread>();
numExecutorNodes = 0;
}
public void init_worker(String host, int port) {
public void init_worker(String host, int port) throws RuntimeException{
/*
* Purpose here is to create a 1:1 mapping of 'gearman
* worker':'jenkins executor' then use the gearman worker to execute
* builds on that jenkins nodes
* Purpose here is to create a 1:1 mapping of 'gearman worker':'jenkins
* executor' then use the gearman worker to execute builds on that
* jenkins nodes
*/
if (gmwtHandles.isEmpty() && gewtHandles.isEmpty()) {
if (getNumExecutors() == 0) {
/*
* Spawn management executor. This worker does not need any
* executors. It only needs to work with gearman.
* Spawn management executor worker. This worker does not need any
* executors. It only needs to work with gearman.
*/
AbstractWorkerThread gwt = null;
gwt = new ManagementWorkerThread(host, port, host);
AbstractWorkerThread gwt = new ManagementWorkerThread(host, port, host);
gwt.registerJobs();
gwt.start();
gmwtHandles.add(gwt);
/*
* Spawn executors for the jenkins master
* Need to treat the master differently than slaves because
* the master is not the same as a slave
* Spawn executors for the jenkins master Need to treat the master
* differently than slaves because the master is not the same as a
* slave
*/
// make sure master is enabled (or has executors)
// first make sure master is enabled (or has executors)
Node masterNode = null;
try {
masterNode = Computer.currentComputer().getNode();
@ -94,14 +88,15 @@ public class GearmanProxy {
if (masterNode != null) {
Computer computer = masterNode.toComputer();
int executors = computer.getExecutors().size();
for (int i=0; i<executors; i++) {
// create a gearman executor for every jenkins executor
gwt = new ExecutorWorkerThread(host, port,
"master-exec"+Integer.toString(i), masterNode);
for (int i = 0; i < executors; i++) {
// create a gearman worker for every executor on the master
gwt = new ExecutorWorkerThread(host, port, "master-exec"
+ Integer.toString(i), masterNode);
gwt.registerJobs();
gwt.start();
gewtHandles.add(gwt);
}
numExecutorNodes++;
}
/*
@ -111,57 +106,41 @@ public class GearmanProxy {
if (!nodes.isEmpty()) {
for (Node node : nodes) {
Computer computer = node.toComputer();
// if (computer.isOnline()) {
// create a gearman executor for every jenkins executor
int slaveExecutors = computer.getExecutors().size();
for (int i=0; i<slaveExecutors; i++) {
gwt = new ExecutorWorkerThread(host, port,
node.getNodeName()+"-exec"+Integer.toString(i), node);
gwt.registerJobs();
gwt.start();
gewtHandles.add(gwt);
}
// }
// create a gearman worker for every executor on the slave
int slaveExecutors = computer.getExecutors().size();
for (int i = 0; i < slaveExecutors; i++) {
gwt = new ExecutorWorkerThread(host, port,
node.getNodeName() + "-exec"
+ Integer.toString(i), node);
gwt.registerJobs();
gwt.start();
gewtHandles.add(gwt);
}
numExecutorNodes++;
}
}
}
logger.info("--- Num of executors running = "+getRunningExecutors());
logger.info("--- Num of executors running = "+getNumExecutors());
}
public void stop_all() {
//stop gearman executors
// for (int i=0; i<gewtHandles.size(); i++) {
// gewtHandles.get(i).stop();
// gewtHandles.remove(i);
// }
// for (AbstractWorkerThread gewtHandle : gewtHandles) {
// gewtHandle.stop();
// }
// gewtHandles.clear();
// for (int i=0; i<gmwtHandles.size(); i++) {
// gmwtHandles.get(i).stop();
// gmwtHandles.remove(i);
// }
// for (AbstractWorkerThread gmwtHandle : gmwtHandles) {
// gmwtHandle.stop();
// }
// gmwtHandles.clear();
for (AbstractWorkerThread gewtHandle : gewtHandles) {
gewtHandles.pop().stop();
for (AbstractWorkerThread gewtHandle : gewtHandles) { // stop executors
gewtHandle.stop();
}
for (AbstractWorkerThread gmwtHandle : gmwtHandles) {
gmwtHandles.pop().stop();
}
gewtHandles.clear();
logger.info("--- Num of executors running = "+getRunningExecutors());
for (AbstractWorkerThread gmwtHandle : gmwtHandles) { // stop executors
gmwtHandle.stop();
}
gmwtHandles.clear();
numExecutorNodes = 0;
logger.info("--- Num of executors running = "+getNumExecutors());
}
public int getRunningExecutors() {
public int getNumExecutors() {
return gmwtHandles.size()+gewtHandles.size();

View File

@ -28,8 +28,8 @@ public class ProjectListener extends ItemListener
}
// update gearman worker functions on existing threads
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -45,8 +45,8 @@ public class ProjectListener extends ItemListener
}
// update gearman worker functions on existing threads
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -62,8 +62,8 @@ public class ProjectListener extends ItemListener
}
// update gearman worker functions on existing threads
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -79,8 +79,8 @@ public class ProjectListener extends ItemListener
}
// update gearman worker functions on existing threads
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}
@ -96,8 +96,8 @@ public class ProjectListener extends ItemListener
}
// update gearman worker functions on existing threads
if (!GearmanPluginConfig.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanPluginConfig.gewtHandles) {
if (!GearmanProxy.gewtHandles.isEmpty()) {
for (AbstractWorkerThread awt: GearmanProxy.gewtHandles) {
awt.registerJobs();
}
}

View File

@ -59,7 +59,7 @@ import com.google.gson.reflect.TypeToken;
public class StartJobWorker extends AbstractGearmanFunction {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_EXECTUOR_LOGGER_NAME);
.getLogger(Constants.PLUGIN_LOGGER_NAME);
Node node;
Project<?, ?> project;

View File

@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
public class StopJobWorker extends AbstractGearmanFunction {
private static final Logger logger = LoggerFactory
.getLogger(Constants.PLUGIN_EXECTUOR_LOGGER_NAME);
.getLogger(Constants.PLUGIN_LOGGER_NAME);
/*