The jenkins gearman plugin
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

GearmanProxy.java 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. /*
  2. *
  3. * Copyright 2013 Hewlett-Packard Development Company, L.P.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package hudson.plugins.gearman;
  19. import hudson.model.Computer;
  20. import hudson.model.Node;
  21. import hudson.model.Run;
  22. import hudson.model.Queue;
  23. import hudson.model.queue.CauseOfBlockage;
  24. import java.net.UnknownHostException;
  25. import java.util.ArrayList;
  26. import java.util.Collections;
  27. import java.util.Iterator;
  28. import java.util.List;
  29. import jenkins.model.Jenkins;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. /**
  33. * This class is used to startup and shutdown the gearman workers.
  34. * It is also used to keep gearman plugin state info.
  35. *
  36. * @author Khai Do
  37. */
  38. public class GearmanProxy {
  39. private static GearmanProxy gearmanProxy;
  40. private static final Logger logger = LoggerFactory
  41. .getLogger(Constants.PLUGIN_LOGGER_NAME);
  42. // handles to gearman workers
  43. private final List<ExecutorWorkerThread> gewtHandles;
  44. private final List<ManagementWorkerThread> gmwtHandles;
  45. private final String masterName;
  46. // Singleton instance
  47. public static synchronized GearmanProxy getInstance() {
  48. if (gearmanProxy == null) {
  49. gearmanProxy = new GearmanProxy();
  50. }
  51. return gearmanProxy;
  52. }
  53. // constructor
  54. private GearmanProxy() {
  55. gewtHandles = Collections.synchronizedList(new ArrayList<ExecutorWorkerThread>());
  56. gmwtHandles = Collections.synchronizedList(new ArrayList<ManagementWorkerThread>());
  57. Computer master = null;
  58. String hostname = Constants.GEARMAN_DEFAULT_EXECUTOR_NAME;
  59. // query Jenkins for master's name
  60. try {
  61. master = Jenkins.getActiveInstance().getComputer("");
  62. hostname = master.getHostName();
  63. } catch (Exception e) {
  64. logger.warn("Exception while getting hostname", e);
  65. }
  66. // master node may not be enabled so get masterName from system
  67. if (master == null) {
  68. try {
  69. hostname = java.net.InetAddress.getLocalHost().getHostName();
  70. } catch (UnknownHostException e) {
  71. logger.warn("Exception while getting hostname", e);
  72. }
  73. }
  74. masterName = hostname;
  75. }
  76. /*
  77. * This method is for unit tests only.
  78. */
  79. protected void testResetHandles() {
  80. gmwtHandles.clear();
  81. gewtHandles.clear();
  82. }
  83. /*
  84. * This method initializes the gearman workers.
  85. */
  86. public void initWorkers() {
  87. /*
  88. * Purpose here is to create a 1:1 mapping of 'gearman worker':'jenkins
  89. * executor' then use the gearman worker to execute builds on that
  90. * jenkins nodes
  91. */
  92. /*
  93. * Spawn management executor worker. This worker does not need any
  94. * executors. It only needs to work with gearman.
  95. */
  96. createManagementWorker();
  97. /*
  98. * Spawn executors for the jenkins master Need to treat the master
  99. * differently than slaves because the master is not the same as a
  100. * slave
  101. */
  102. // first make sure master is enabled (or has executors)
  103. Node masterNode = null;
  104. try {
  105. masterNode = Jenkins.getActiveInstance().getComputer("").getNode();
  106. } catch (NullPointerException npe) {
  107. logger.info("---- Master is offline");
  108. } catch (Exception e) {
  109. logger.error("Exception while finding master", e);
  110. }
  111. if (masterNode != null) {
  112. Computer computer = masterNode.toComputer();
  113. if (computer != null) {
  114. createExecutorWorkersOnNode(computer);
  115. }
  116. }
  117. /*
  118. * Spawn executors for the jenkins slaves
  119. */
  120. List<Node> nodes = Jenkins.getActiveInstance().getNodes();
  121. if (!nodes.isEmpty()) {
  122. for (Node node : nodes) {
  123. Computer computer = node.toComputer();
  124. if (computer != null) {
  125. // create a gearman worker for every executor on the slave
  126. createExecutorWorkersOnNode(computer);
  127. }
  128. }
  129. }
  130. logger.info("---- Num of executors running = " + getNumExecutors());
  131. }
  132. /*
  133. * Spawn management executor workers. This worker does not need any
  134. * executors. It only needs to connect to gearman.
  135. */
  136. public void createManagementWorker() {
  137. ManagementWorkerThread gwt;
  138. synchronized (gmwtHandles) {
  139. if (!gmwtHandles.isEmpty()) {
  140. return;
  141. }
  142. gwt = new ManagementWorkerThread(
  143. GearmanPluginConfig.get().getHost(),
  144. GearmanPluginConfig.get().getPort(),
  145. masterName + "_manager",
  146. masterName, new NoopAvailabilityMonitor());
  147. gmwtHandles.add(gwt);
  148. gwt.start();
  149. }
  150. logger.info("---- Num of executors running = " + getNumExecutors());
  151. }
  152. /*
  153. * Spawn workers for each executor on a node.
  154. */
  155. public void createExecutorWorkersOnNode(Computer computer) {
  156. // find the computer in the executor workers list
  157. synchronized(gewtHandles) {
  158. for (ExecutorWorkerThread t : gewtHandles) {
  159. if (t.getComputer() == computer) {
  160. logger.info("---- Executor thread already running for " + computer.getName());
  161. return;
  162. }
  163. }
  164. AvailabilityMonitor availability = new NodeAvailabilityMonitor(computer);
  165. int executors = computer.getExecutors().size();
  166. for (int i = 0; i < executors; i++) {
  167. String nodeName = null;
  168. nodeName = GearmanPluginUtil.getRealName(computer);
  169. if (nodeName == "master") {
  170. nodeName = masterName;
  171. }
  172. ExecutorWorkerThread ewt = new ExecutorWorkerThread(
  173. GearmanPluginConfig.get().getHost(),
  174. GearmanPluginConfig.get().getPort(),
  175. nodeName+"_exec-"+Integer.toString(i),
  176. computer, masterName, availability);
  177. ewt.start();
  178. gewtHandles.add(ewt);
  179. }
  180. }
  181. logger.info("---- Num of executors running = " + getNumExecutors());
  182. }
  183. /*
  184. * This method stops all gearman workers
  185. */
  186. public void stopAll() {
  187. // stop gearman executors
  188. List<AbstractWorkerThread> stopHandles;
  189. synchronized(gewtHandles) {
  190. stopHandles = new ArrayList<AbstractWorkerThread>(gewtHandles);
  191. gewtHandles.clear();
  192. }
  193. for (AbstractWorkerThread wt : stopHandles) { // stop executors
  194. wt.stop();
  195. }
  196. synchronized(gmwtHandles) {
  197. stopHandles = new ArrayList<AbstractWorkerThread>(gmwtHandles);
  198. gmwtHandles.clear();
  199. }
  200. for (AbstractWorkerThread wt : stopHandles) { // stop executors
  201. wt.stop();
  202. }
  203. logger.info("---- Num of executors running = " + getNumExecutors());
  204. }
  205. /*
  206. * This method stops all threads on the gewtHandles list that
  207. * is used to service the jenkins slave/computer
  208. *
  209. *
  210. * @param Node
  211. * The Computer to stop
  212. *
  213. */
  214. public void stop(Computer computer) {
  215. logger.info("---- Stop computer " + computer);
  216. List<ExecutorWorkerThread> workers = new ArrayList<ExecutorWorkerThread>();
  217. synchronized(gewtHandles) {
  218. // find the computer in the executor workers list and stop it
  219. for (Iterator<ExecutorWorkerThread> it = gewtHandles.iterator(); it.hasNext(); ) {
  220. ExecutorWorkerThread t = it.next();
  221. if (t.getComputer() == computer) {
  222. workers.add(t);
  223. it.remove();
  224. }
  225. }
  226. }
  227. for (ExecutorWorkerThread t : workers) {
  228. t.stop();
  229. }
  230. logger.info("---- Num of executors running = " + getNumExecutors());
  231. }
  232. /*
  233. * This method returns the total number of gearman executor threads
  234. */
  235. public int getNumExecutors() {
  236. return gmwtHandles.size() + gewtHandles.size();
  237. }
  238. public void onBuildFinalized(Run r) {
  239. Computer computer = r.getExecutor().getOwner();
  240. // A build just finished, so let the AvailabilityMonitor
  241. // associated with its node wake up any workers who may be
  242. // waiting for the lock.
  243. AvailabilityMonitor availability = null;
  244. synchronized(gewtHandles) {
  245. for (ExecutorWorkerThread t : gewtHandles) {
  246. if (t.getComputer() == computer) {
  247. availability = t.getAvailability();
  248. }
  249. }
  250. }
  251. if (availability != null) {
  252. availability.wake();
  253. }
  254. }
  255. public AvailabilityMonitor getAvailabilityMonitor(Computer computer) {
  256. synchronized (gewtHandles) {
  257. for (ExecutorWorkerThread t : gewtHandles) {
  258. if (t.getComputer() == computer) {
  259. return t.getAvailability();
  260. }
  261. }
  262. }
  263. return null;
  264. }
  265. public CauseOfBlockage canTake(Node node,
  266. Queue.BuildableItem item) {
  267. // Ask the AvailabilityMonitor for this node if it's okay to
  268. // run this build.
  269. ExecutorWorkerThread workerThread = null;
  270. synchronized(gewtHandles) {
  271. Computer computer = node.toComputer();
  272. for (ExecutorWorkerThread t : gewtHandles) {
  273. if (t.getComputer() == computer) {
  274. workerThread = t;
  275. break;
  276. }
  277. }
  278. }
  279. if (workerThread != null) {
  280. if (workerThread.getAvailability().canTake(item)) {
  281. return null;
  282. } else {
  283. return new CauseOfBlockage.BecauseNodeIsBusy(node);
  284. }
  285. }
  286. return null;
  287. }
  288. public void registerJobs() {
  289. synchronized(gewtHandles) {
  290. for (ExecutorWorkerThread worker : gewtHandles) {
  291. worker.registerJobs();
  292. }
  293. }
  294. }
  295. }