The Jenkins Gearman plugin
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MyGearmanWorkerImpl.java 22KB


  1. /*
  2. *
  3. * Copyright 2013 OpenStack Foundation
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. /*
  19. * This is adapted from gearman-java with the following license
  20. *
  21. * Copyright (C) 2013 by Eric Lambert <eric.d.lambert@gmail.com>
  22. * Use and distribution licensed under the BSD license. See
  23. * the COPYING file in the parent directory for full text.
  24. */
  25. package hudson.plugins.gearman;
  26. import java.io.IOException;
  27. import java.nio.channels.SelectionKey;
  28. import java.nio.channels.Selector;
  29. import java.util.ArrayList;
  30. import java.util.HashMap;
  31. import java.util.HashSet;
  32. import java.util.Iterator;
  33. import java.util.LinkedList;
  34. import java.util.List;
  35. import java.util.Map;
  36. import java.util.Queue;
  37. import java.util.Set;
  38. import java.util.concurrent.ConcurrentLinkedQueue;
  39. import java.util.concurrent.ExecutorService;
  40. import org.gearman.common.Constants;
  41. import org.gearman.common.GearmanException;
  42. import org.gearman.common.GearmanJobServerConnection;
  43. import org.gearman.common.GearmanJobServerIpConnectionFactory;
  44. import org.gearman.common.GearmanJobServerSession;
  45. import org.gearman.common.GearmanNIOJobServerConnectionFactory;
  46. import org.gearman.common.GearmanPacket;
  47. import org.gearman.common.GearmanPacket.DataComponentName;
  48. import org.gearman.common.GearmanPacketImpl;
  49. import org.gearman.common.GearmanPacketMagic;
  50. import org.gearman.common.GearmanPacketType;
  51. import org.gearman.common.GearmanServerResponseHandler;
  52. import org.gearman.common.GearmanSessionEvent;
  53. import org.gearman.common.GearmanSessionEventHandler;
  54. import org.gearman.common.GearmanTask;
  55. import org.gearman.worker.DefaultGearmanFunctionFactory;
  56. import org.gearman.worker.GearmanFunction;
  57. import org.gearman.worker.GearmanFunctionFactory;
  58. import org.gearman.worker.GearmanWorker;
  59. import org.gearman.util.ByteUtils;
  60. import org.slf4j.LoggerFactory;
  61. public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
  /**
   * Lifecycle phase of a worker. A worker is created {@code IDLE}, becomes
   * {@code RUNNING} when {@code work()} starts, is moved to {@code SHUTTINGDOWN}
   * by {@code stop()}, and returns to {@code IDLE} once shutdown has finished.
   */
  public enum State {
    IDLE,
    RUNNING,
    SHUTTINGDOWN
  }
  65. private static final String DESCRIPION_PREFIX = "GearmanWorker";
  66. private ConcurrentLinkedQueue<GearmanSessionEvent> eventList = null;
  67. private Selector ioAvailable = null;
  68. private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(
  69. Constants.GEARMAN_WORKER_LOGGER_NAME);
  70. private String id;
  71. private Map<String, FunctionDefinition> functionMap;
  72. private State state;
  73. private ExecutorService executorService;
  74. private GearmanJobServerSession session = null;
  75. private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory();
  76. private volatile boolean jobUniqueIdRequired = false;
  77. private FunctionRegistry functionRegistry;
  78. private AvailabilityMonitor availability;
  79. class GrabJobEventHandler implements GearmanServerResponseHandler {
  80. private final GearmanJobServerSession session;
  81. private boolean isDone = false;
  82. GrabJobEventHandler(GearmanJobServerSession session) {
  83. super();
  84. this.session = session;
  85. }
  86. public void handleEvent(GearmanPacket event) throws GearmanException {
  87. handleSessionEvent(new GearmanSessionEvent(event, session));
  88. isDone = true;
  89. }
  90. public boolean isDone() {
  91. return isDone;
  92. }
  93. }
  94. static class FunctionDefinition {
  95. private final long timeout;
  96. private final GearmanFunctionFactory factory;
  97. FunctionDefinition(long timeout, GearmanFunctionFactory factory) {
  98. this.timeout = timeout;
  99. this.factory = factory;
  100. }
  101. long getTimeout() {
  102. return timeout;
  103. }
  104. GearmanFunctionFactory getFactory() {
  105. return factory;
  106. }
  107. }
  108. class FunctionRegistry {
  109. private Set<GearmanFunctionFactory> functions;
  110. private boolean updated = false;
  111. FunctionRegistry() {
  112. functions = new HashSet<GearmanFunctionFactory>();
  113. }
  114. public synchronized Set<GearmanFunctionFactory> getFunctions(){
  115. if (updated) {
  116. updated = false;
  117. return functions;
  118. } else {
  119. return null;
  120. }
  121. }
  122. public synchronized void setFunctions(Set<GearmanFunctionFactory> functions){
  123. this.functions = functions;
  124. this.updated = true;
  125. }
  126. public synchronized void setUpdated(boolean updated) {
  127. this.updated = updated;
  128. }
  129. }
  130. public void reconnect() {
  131. LOG.info("---- Worker " + this + " starting reconnect for " + session.toString());
  132. // In case we held the availability lock earlier, release it.
  133. availability.unlock(this);
  134. try {
  135. session.initSession(ioAvailable, this);
  136. if (id != null) {
  137. sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
  138. GearmanPacketType.SET_CLIENT_ID,
  139. ByteUtils.toUTF8Bytes(id)));
  140. }
  141. // Reset events so that we don't process events from the old
  142. // connection.
  143. eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
  144. // this will cause a grab-job event
  145. functionRegistry.setUpdated(true);
  146. } catch (IOException e) {
  147. try {
  148. Thread.sleep(2000);
  149. } catch (InterruptedException e1) {
  150. LOG.warn("---- Worker " + this + " interrupted while reconnecting", e);
  151. return;
  152. }
  153. }
  154. LOG.info("---- Worker " + this + " ending reconnect for " + session.toString());
  155. }
  156. public MyGearmanWorkerImpl(AvailabilityMonitor availability) {
  157. this (null, availability);
  158. }
  159. public MyGearmanWorkerImpl(ExecutorService executorService,
  160. AvailabilityMonitor availability) {
  161. this.availability = availability;
  162. eventList = new ConcurrentLinkedQueue<GearmanSessionEvent>();
  163. id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId();
  164. functionMap = new HashMap<String, FunctionDefinition>();
  165. state = State.IDLE;
  166. this.executorService = executorService;
  167. functionRegistry = new FunctionRegistry();
  168. try {
  169. ioAvailable = Selector.open();
  170. } catch (IOException ioe) {
  171. LOG.warn("---- Worker " + this + " failed to open IO selector", ioe);
  172. }
  173. }
  174. @Override
  175. public String toString() {
  176. return id;
  177. }
  178. public void setFunctions(Set<GearmanFunctionFactory> functions) {
  179. LOG.info("---- Worker " + this + " registering " + functions.size() + " functions");
  180. functionRegistry.setFunctions(functions);
  181. ioAvailable.wakeup();
  182. }
  183. /**
  184. * This is a small lie -- it only returns the functions it has been
  185. * instructed to register, not the ones it has actually gotton around
  186. * to registering. This is mostly here for tests.
  187. **/
  188. public Set getRegisteredFunctions() {
  189. Set<String> ret = new HashSet<String>();
  190. Set<GearmanFunctionFactory> functions = functionRegistry.getFunctions();
  191. if (functions == null) {
  192. return ret;
  193. }
  194. for (GearmanFunctionFactory factory: functions) {
  195. ret.add(factory.getFunctionName());
  196. }
  197. return ret;
  198. }
  199. private void registerFunctions() throws IOException {
  200. Set<GearmanFunctionFactory> functions = functionRegistry.getFunctions();
  201. if (functions == null) {
  202. return;
  203. }
  204. functionMap.clear();
  205. sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
  206. GearmanPacketType.RESET_ABILITIES, new byte[0]));
  207. session.driveSessionIO();
  208. if (!isRunning()) return;
  209. for (GearmanFunctionFactory factory: functions) {
  210. FunctionDefinition def = new FunctionDefinition(0, factory);
  211. functionMap.put(factory.getFunctionName(), def);
  212. sendToAll(generateCanDoPacket(def));
  213. session.driveSessionIO();
  214. if (!isRunning()) return;
  215. LOG.debug("---- Worker " + this + " registered function " +
  216. factory.getFunctionName());
  217. }
  218. GearmanSessionEvent nextEvent = eventList.peek();
  219. if (nextEvent == null ||
  220. nextEvent.getPacket().getPacketType() != GearmanPacketType.NOOP) {
  221. // Simulate a NOOP packet which will kick off a GRAB_JOB cycle
  222. // if we're sleeping. If we get a real NOOP in the mean time,
  223. // it should be fine because GearmanJobServerSession ignores a
  224. // NOOP if PRE_SLEEP is not on the stack.
  225. GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES,
  226. GearmanPacketType.NOOP, new byte[0]);
  227. GearmanSessionEvent event = new GearmanSessionEvent(p, session);
  228. session.handleSessionEvent(event);
  229. }
  230. }
  231. public void enqueueNoopEvent() {
  232. // Simulate a NOOP packet which will kick off a GRAB_JOB cycle.
  233. // This unconditionally enqueues the NOOP which will send a GRAB_JOB
  234. // and should only be used when you know you need to send a GRAB_JOB.
  235. // Cases like worker start, post function run, post failure.
  236. GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES,
  237. GearmanPacketType.NOOP, new byte[0]);
  238. GearmanSessionEvent event = new GearmanSessionEvent(p, session);
  239. enqueueEvent(event);
  240. }
  241. public void work() {
  242. GearmanSessionEvent event = null;
  243. GearmanFunction function = null;
  244. LOG.info("---- Worker " + this + " starting work");
  245. if (!state.equals(State.IDLE)) {
  246. throw new IllegalStateException("Can not call work while worker " +
  247. "is running or shutting down");
  248. }
  249. state = State.RUNNING;
  250. // When we first start working we will already be initialized so must
  251. // enqueue a Noop event to trigger GRAB_JOB here.
  252. enqueueNoopEvent();
  253. while (isRunning()) {
  254. LOG.debug("---- Worker " + this + " top of run loop");
  255. if (!session.isInitialized()) {
  256. LOG.debug("---- Worker " + this + " run loop reconnect");
  257. reconnect();
  258. enqueueNoopEvent();
  259. // Restart loop to check we connected.
  260. continue;
  261. }
  262. try {
  263. LOG.debug("---- Worker " + this + " run loop register functions");
  264. registerFunctions();
  265. } catch (IOException io) {
  266. LOG.warn("---- Worker " + this + " receieved IOException while" +
  267. " registering functions", io);
  268. session.closeSession();
  269. continue;
  270. }
  271. if (!isRunning() || !session.isInitialized()) continue;
  272. event = eventList.poll();
  273. function = processSessionEvent(event);
  274. if (!isRunning() || !session.isInitialized()) continue;
  275. // For the time being we will execute the jobs synchronously
  276. // in the future, I expect to change this.
  277. if (function != null) {
  278. LOG.info("---- Worker " + this + " executing function");
  279. submitFunction(function);
  280. // Send another grab_job on the next loop
  281. enqueueNoopEvent();
  282. // Skip IO as submitFunction drives the IO for function
  283. // running.
  284. continue;
  285. }
  286. if (!isRunning() || !session.isInitialized()) continue;
  287. // Run IO, select waiting for ability to read and/or write
  288. // then read and/or write.
  289. int interestOps = SelectionKey.OP_READ;
  290. if (session.sessionHasDataToWrite()) {
  291. interestOps |= SelectionKey.OP_WRITE;
  292. }
  293. session.getSelectionKey().interestOps(interestOps);
  294. try {
  295. ioAvailable.select();
  296. } catch (IOException io) {
  297. LOG.warn("---- Worker " + this + " receieved IOException while" +
  298. " selecting for IO", io);
  299. session.closeSession();
  300. continue;
  301. }
  302. if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) {
  303. LOG.debug("---- Worker " + this + " received input in run loop");
  304. if (!session.isInitialized()) {
  305. LOG.debug("---- Worker " + this + " session is no longer initialized");
  306. continue;
  307. }
  308. try {
  309. session.driveSessionIO();
  310. } catch (IOException io) {
  311. LOG.warn("---- Worker " + this + " received IOException while driving" +
  312. " IO on session " + session, io);
  313. session.closeSession();
  314. continue;
  315. }
  316. }
  317. LOG.debug("---- Worker " + this + " run loop finished driving session io");
  318. }
  319. shutDownWorker(true);
  320. }
  321. private void sendGrabJob(GearmanJobServerSession s) throws InterruptedException {
  322. // If we can get the lock, this will prevent other workers and
  323. // Jenkins itself from scheduling builds on this node. If we
  324. // can not get the lock, this will wait for it.
  325. availability.lock(this);
  326. GearmanTask grabJobTask = new GearmanTask(
  327. new GrabJobEventHandler(s),
  328. new GearmanPacketImpl(GearmanPacketMagic.REQ,
  329. getGrabJobPacketType(), new byte[0]));
  330. s.submitTask(grabJobTask);
  331. }
  332. public void handleSessionEvent(GearmanSessionEvent event)
  333. throws IllegalArgumentException, IllegalStateException {
  334. enqueueEvent(event);
  335. }
  336. public void enqueueEvent(GearmanSessionEvent event) {
  337. // Enqueue in a thread safe manner. Events will
  338. // be pulled off and processed serially in this workers
  339. // main thread.
  340. eventList.add(event);
  341. }
  342. private GearmanFunction processSessionEvent(GearmanSessionEvent event)
  343. throws IllegalArgumentException, IllegalStateException {
  344. if (event != null) {
  345. GearmanPacket p = event.getPacket();
  346. GearmanJobServerSession s = event.getSession();
  347. GearmanPacketType t = p.getPacketType();
  348. LOG.debug("---- Worker " + this + " handling session event" +
  349. " ( Session = " + s + " Event = " + t + " )");
  350. switch (t) {
  351. case JOB_ASSIGN:
  352. //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true
  353. LOG.info("---- Worker " + this + " received job assignment");
  354. return addNewJob(event);
  355. case JOB_ASSIGN_UNIQ:
  356. //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false
  357. LOG.info("---- Worker " + this + " received unique job assignment");
  358. return addNewJob(event);
  359. case NOOP:
  360. LOG.debug("---- Worker " + this + " sending grab job after wakeup");
  361. try {
  362. sendGrabJob(s);
  363. } catch (InterruptedException e) {
  364. LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " +
  365. "grab job", e);
  366. }
  367. break;
  368. case NO_JOB:
  369. // We didn't get a job, so allow other workers or
  370. // Jenkins to schedule on this node.
  371. availability.unlock(this);
  372. LOG.debug("---- Worker " + this + " sending pre sleep after no_job");
  373. GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s),
  374. new GearmanPacketImpl(GearmanPacketMagic.REQ,
  375. GearmanPacketType.PRE_SLEEP, new byte[0]));
  376. s.submitTask(preSleepTask);
  377. break;
  378. case ECHO_RES:
  379. break;
  380. case OPTION_RES:
  381. break;
  382. case ERROR:
  383. s.closeSession();
  384. break;
  385. default:
  386. LOG.warn("---- Worker " + this + " received unknown packet type " + t +
  387. " from session " + s + "; closing connection");
  388. s.closeSession();
  389. }
  390. }
  391. return null;
  392. }
  393. public boolean addServer(String host, int port) {
  394. return addServer(connFactory.createConnection(host, port));
  395. }
  396. public boolean addServer(GearmanJobServerConnection conn)
  397. throws IllegalArgumentException, IllegalStateException {
  398. if (conn == null) {
  399. throw new IllegalArgumentException("Connection can not be null");
  400. }
  401. if (session != null) {
  402. return true;
  403. }
  404. session = new GearmanJobServerSession(conn);
  405. reconnect();
  406. LOG.debug("---- Worker " + this + " added server " + conn);
  407. return true;
  408. }
  409. public void setWorkerID(String id) throws IllegalArgumentException {
  410. if (id == null) {
  411. throw new IllegalArgumentException("Worker ID may not be null");
  412. }
  413. this.id = id;
  414. if (session.isInitialized()) {
  415. sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
  416. GearmanPacketType.SET_CLIENT_ID,
  417. ByteUtils.toUTF8Bytes(id)));
  418. }
  419. }
  420. public String getWorkerID() {
  421. return id;
  422. }
  423. public void stop() {
  424. state = State.SHUTTINGDOWN;
  425. }
  426. public List<Exception> shutdown() {
  427. return shutDownWorker(false);
  428. }
  429. public boolean isRunning() {
  430. return state.equals(State.RUNNING);
  431. }
  432. public void setJobUniqueIdRequired(boolean requiresJobUUID) {
  433. jobUniqueIdRequired = requiresJobUUID;
  434. }
  435. public boolean isJobUniqueIdRequired() {
  436. return jobUniqueIdRequired;
  437. }
  438. private GearmanPacket generateCanDoPacket(FunctionDefinition def) {
  439. GearmanPacketType pt = GearmanPacketType.CAN_DO;
  440. byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
  441. return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
  442. }
  443. private void sendToAll(GearmanPacket p) {
  444. sendToAll(null, p);
  445. }
  446. private void sendToAll(GearmanServerResponseHandler handler, GearmanPacket p) {
  447. GearmanTask gsr = new GearmanTask(handler, p);
  448. session.submitTask(gsr);
  449. }
  450. /*
  451. * For the time being this will always return an empty list of
  452. * exceptions because closeSession does not throw an exception
  453. */
  454. private List<Exception> shutDownWorker(boolean completeTasks) {
  455. LOG.info("---- Worker " + this + " commencing shutdown");
  456. ArrayList<Exception> exceptions = new ArrayList<Exception>();
  457. // This gives any jobs in flight a chance to complete
  458. if (executorService != null) {
  459. if (completeTasks) {
  460. executorService.shutdown();
  461. } else {
  462. executorService.shutdownNow();
  463. }
  464. }
  465. session.closeSession();
  466. try {
  467. ioAvailable.close();
  468. } catch (IOException ioe) {
  469. LOG.warn("---- Worker " + this + " encountered IOException while closing selector: ", ioe);
  470. }
  471. state = State.IDLE;
  472. LOG.info("---- Worker " + this + " completed shutdown");
  473. return exceptions;
  474. }
  475. private GearmanFunction addNewJob(GearmanSessionEvent event) {
  476. byte[] handle, data, functionNameBytes, unique;
  477. GearmanPacket p = event.getPacket();
  478. String functionName;
  479. handle = p.getDataComponentValue(
  480. GearmanPacket.DataComponentName.JOB_HANDLE);
  481. functionNameBytes = p.getDataComponentValue(
  482. GearmanPacket.DataComponentName.FUNCTION_NAME);
  483. data = p.getDataComponentValue(
  484. GearmanPacket.DataComponentName.DATA);
  485. unique = p.getDataComponentValue(DataComponentName.UNIQUE_ID);
  486. functionName = ByteUtils.fromUTF8Bytes(functionNameBytes);
  487. FunctionDefinition def = functionMap.get(functionName);
  488. if (def == null) {
  489. GearmanTask gsr = new GearmanTask(
  490. new GearmanPacketImpl(GearmanPacketMagic.REQ,
  491. GearmanPacketType.WORK_FAIL, handle));
  492. session.submitTask(gsr);
  493. availability.unlock(this);
  494. enqueueNoopEvent();
  495. } else {
  496. GearmanFunction function = def.getFactory().getFunction();
  497. function.setData(data);
  498. function.setJobHandle(handle);
  499. function.registerEventListener(session);
  500. if (unique != null && unique.length > 0) {
  501. function.setUniqueId(unique);
  502. }
  503. return function;
  504. }
  505. return null;
  506. }
  507. private void submitFunction(GearmanFunction fun) {
  508. try {
  509. if (executorService == null) {
  510. fun.call();
  511. } else {
  512. executorService.submit(fun);
  513. }
  514. // We should have submitted either a WORK_EXCEPTION, COMPLETE,
  515. // or FAIL; make sure it gets sent.
  516. session.driveSessionIO();
  517. } catch (IOException io) {
  518. LOG.warn("---- Worker " + this + " receieved IOException while" +
  519. " running function",io);
  520. session.closeSession();
  521. } catch (Exception e) {
  522. LOG.warn("---- Worker " + this + " exception while executing function " + fun.getName(), e);
  523. }
  524. // Unlock the monitor for this worker
  525. availability.unlock(this);
  526. }
  527. private GearmanPacketType getGrabJobPacketType() {
  528. if (jobUniqueIdRequired) {
  529. return GearmanPacketType.GRAB_JOB_UNIQ;
  530. }
  531. return GearmanPacketType.GRAB_JOB;
  532. }
  533. }