/* * * Copyright 2013 OpenStack Foundation * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /* * This is adapted from gearman-java with the following license * * Copyright (C) 2013 by Eric Lambert * Use and distribution licensed under the BSD license. See * the COPYING file in the parent directory for full text. */ package hudson.plugins.gearman; import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import org.gearman.common.Constants; import org.gearman.common.GearmanException; import org.gearman.common.GearmanJobServerConnection; import org.gearman.common.GearmanJobServerIpConnectionFactory; import org.gearman.common.GearmanJobServerSession; import org.gearman.common.GearmanNIOJobServerConnectionFactory; import org.gearman.common.GearmanPacket; import org.gearman.common.GearmanPacket.DataComponentName; import org.gearman.common.GearmanPacketImpl; import org.gearman.common.GearmanPacketMagic; import org.gearman.common.GearmanPacketType; import org.gearman.common.GearmanServerResponseHandler; import org.gearman.common.GearmanSessionEvent; import org.gearman.common.GearmanSessionEventHandler; import org.gearman.common.GearmanTask; import org.gearman.worker.DefaultGearmanFunctionFactory; import org.gearman.worker.GearmanFunction; import org.gearman.worker.GearmanFunctionFactory; import org.gearman.worker.GearmanWorker; import org.gearman.util.ByteUtils; import org.slf4j.LoggerFactory; public class MyGearmanWorkerImpl implements GearmanSessionEventHandler { static public enum State { IDLE, RUNNING, SHUTTINGDOWN } private static final String DESCRIPION_PREFIX = "GearmanWorker"; private ConcurrentLinkedQueue eventList = null; private Selector ioAvailable = null; private static final org.slf4j.Logger LOG = LoggerFactory.getLogger( Constants.GEARMAN_WORKER_LOGGER_NAME); private String id; private Map functionMap; private State state; private ExecutorService executorService; private GearmanJobServerSession session = null; private final GearmanJobServerIpConnectionFactory connFactory = new GearmanNIOJobServerConnectionFactory(); private volatile boolean jobUniqueIdRequired = false; private FunctionRegistry functionRegistry; private AvailabilityMonitor availability; class GrabJobEventHandler implements GearmanServerResponseHandler { private final GearmanJobServerSession session; private boolean isDone = false; GrabJobEventHandler(GearmanJobServerSession session) { super(); this.session = session; } public void handleEvent(GearmanPacket event) throws GearmanException { handleSessionEvent(new GearmanSessionEvent(event, session)); isDone = true; } public boolean isDone() { return isDone; } } static class FunctionDefinition { private final long timeout; private final GearmanFunctionFactory factory; FunctionDefinition(long timeout, GearmanFunctionFactory factory) { this.timeout = timeout; this.factory = factory; } long getTimeout() { return timeout; } GearmanFunctionFactory getFactory() { return factory; } } class FunctionRegistry { private Set functions; private boolean updated = false; FunctionRegistry() { functions = new HashSet(); } public synchronized Set getFunctions(){ if (updated) { updated = false; return functions; } else { return null; } } public synchronized void setFunctions(Set functions){ this.functions = functions; this.updated = true; } public synchronized void setUpdated(boolean updated) { this.updated = updated; } } public void reconnect() { LOG.info("---- Worker " + this + " starting reconnect for " + session.toString()); // In case we held the availability lock earlier, release it. availability.unlock(this); try { session.initSession(ioAvailable, this); if (id != null) { sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.SET_CLIENT_ID, ByteUtils.toUTF8Bytes(id))); } // Reset events so that we don't process events from the old // connection. eventList = new ConcurrentLinkedQueue(); // this will cause a grab-job event functionRegistry.setUpdated(true); // Make sure we reset the function list functionMap.clear(); } catch (IOException e) { try { Thread.sleep(2000); } catch (InterruptedException e1) { LOG.warn("---- Worker " + this + " interrupted while reconnecting", e); return; } } LOG.info("---- Worker " + this + " ending reconnect for " + session.toString()); } public MyGearmanWorkerImpl(AvailabilityMonitor availability) { this (null, availability); } public MyGearmanWorkerImpl(ExecutorService executorService, AvailabilityMonitor availability) { this.availability = availability; eventList = new ConcurrentLinkedQueue(); id = DESCRIPION_PREFIX + ":" + Thread.currentThread().getId(); functionMap = new HashMap(); state = State.IDLE; this.executorService = executorService; functionRegistry = new FunctionRegistry(); try { ioAvailable = Selector.open(); } catch (IOException ioe) { LOG.warn("---- Worker " + this + " failed to open IO selector", ioe); } } @Override public String toString() { return id; } public void setFunctions(Set functions) { LOG.info("---- Worker " + this + " registering " + functions.size() + " functions"); functionRegistry.setFunctions(functions); ioAvailable.wakeup(); } /** * This is a small lie -- it only returns the functions it has been * instructed to register, not the ones it has actually gotton around * to registering. This is mostly here for tests. **/ public Set getRegisteredFunctions() { Set ret = new HashSet(); Set functions = functionRegistry.getFunctions(); if (functions == null) { return ret; } for (GearmanFunctionFactory factory: functions) { ret.add(factory.getFunctionName()); } return ret; } private void registerFunctions() throws IOException { Set functions = functionRegistry.getFunctions(); if (functions == null) { return; } HashMap newFunctionMap = new HashMap(); // If we have no previous data then reset abilities to be sure the // gearman server has no stale data that we don't know about. // Or if we have no functions anymore just reset everything, we don't // need a CANT_DO per lost function. if (functions.isEmpty() || functionMap.isEmpty()) { sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.RESET_ABILITIES, new byte[0])); session.driveSessionIO(); LOG.debug("---- Worker " + this + " reset functions"); if (!isRunning()) { // Ensure we start from scratch on reconnection. functionMap.clear(); return; } } // Now only update if we have data to update. if (!functions.isEmpty()) { for (GearmanFunctionFactory factory: functions) { FunctionDefinition def = new FunctionDefinition(0, factory); newFunctionMap.put(factory.getFunctionName(), def); if (!functionMap.containsKey(factory.getFunctionName())) { sendToAll(generateCanDoPacket(def)); session.driveSessionIO(); if (!isRunning()) { // Ensure we start from scratch on reconnection. functionMap.clear(); return; } LOG.debug("---- Worker " + this + " registered function " + factory.getFunctionName()); } functionMap.remove(factory.getFunctionName()); } for (FunctionDefinition def: functionMap.values()) { sendToAll(generateCantDoPacket(def)); session.driveSessionIO(); if (!isRunning()) { // Ensure we start from scratch on reconnection. functionMap.clear(); return; } LOG.debug("---- Worker " + this + " unregistered function " + def.getFactory().getFunctionName()); } } functionMap = newFunctionMap; GearmanSessionEvent nextEvent = eventList.peek(); if (nextEvent == null || nextEvent.getPacket().getPacketType() != GearmanPacketType.NOOP) { // Simulate a NOOP packet which will kick off a GRAB_JOB cycle // if we're sleeping. If we get a real NOOP in the mean time, // it should be fine because GearmanJobServerSession ignores a // NOOP if PRE_SLEEP is not on the stack. GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES, GearmanPacketType.NOOP, new byte[0]); GearmanSessionEvent event = new GearmanSessionEvent(p, session); session.handleSessionEvent(event); } } public void enqueueNoopEvent() { // Simulate a NOOP packet which will kick off a GRAB_JOB cycle. // This unconditionally enqueues the NOOP which will send a GRAB_JOB // and should only be used when you know you need to send a GRAB_JOB. // Cases like worker start, post function run, post failure. GearmanPacket p = new GearmanPacketImpl(GearmanPacketMagic.RES, GearmanPacketType.NOOP, new byte[0]); GearmanSessionEvent event = new GearmanSessionEvent(p, session); enqueueEvent(event); } public void work() { GearmanSessionEvent event = null; GearmanFunction function = null; LOG.info("---- Worker " + this + " starting work"); if (!state.equals(State.IDLE)) { throw new IllegalStateException("Can not call work while worker " + "is running or shutting down"); } state = State.RUNNING; // When we first start working we will already be initialized so must // enqueue a Noop event to trigger GRAB_JOB here. enqueueNoopEvent(); while (isRunning()) { LOG.debug("---- Worker " + this + " top of run loop"); if (!session.isInitialized()) { LOG.debug("---- Worker " + this + " run loop reconnect"); reconnect(); enqueueNoopEvent(); // Restart loop to check we connected. continue; } try { LOG.debug("---- Worker " + this + " run loop register functions"); registerFunctions(); } catch (IOException io) { LOG.warn("---- Worker " + this + " receieved IOException while" + " registering functions", io); session.closeSession(); continue; } if (!isRunning() || !session.isInitialized()) continue; event = eventList.poll(); function = processSessionEvent(event); if (!isRunning() || !session.isInitialized()) continue; // For the time being we will execute the jobs synchronously // in the future, I expect to change this. if (function != null) { LOG.info("---- Worker " + this + " executing function"); submitFunction(function); // Send another grab_job on the next loop enqueueNoopEvent(); // Skip IO as submitFunction drives the IO for function // running. continue; } if (!isRunning() || !session.isInitialized()) continue; // Run IO, select waiting for ability to read and/or write // then read and/or write. int interestOps = SelectionKey.OP_READ; if (session.sessionHasDataToWrite()) { interestOps |= SelectionKey.OP_WRITE; } session.getSelectionKey().interestOps(interestOps); try { ioAvailable.select(); } catch (IOException io) { LOG.warn("---- Worker " + this + " receieved IOException while" + " selecting for IO", io); session.closeSession(); continue; } if (ioAvailable.selectedKeys().contains(session.getSelectionKey())) { LOG.debug("---- Worker " + this + " received input in run loop"); if (!session.isInitialized()) { LOG.debug("---- Worker " + this + " session is no longer initialized"); continue; } try { session.driveSessionIO(); } catch (IOException io) { LOG.warn("---- Worker " + this + " received IOException while driving" + " IO on session " + session, io); session.closeSession(); continue; } } LOG.debug("---- Worker " + this + " run loop finished driving session io"); } shutDownWorker(true); } private void sendGrabJob(GearmanJobServerSession s) throws InterruptedException { // If we can get the lock, this will prevent other workers and // Jenkins itself from scheduling builds on this node. If we // can not get the lock, this will wait for it. availability.lock(this); GearmanTask grabJobTask = new GearmanTask( new GrabJobEventHandler(s), new GearmanPacketImpl(GearmanPacketMagic.REQ, getGrabJobPacketType(), new byte[0])); s.submitTask(grabJobTask); } public void handleSessionEvent(GearmanSessionEvent event) throws IllegalArgumentException, IllegalStateException { enqueueEvent(event); } public void enqueueEvent(GearmanSessionEvent event) { // Enqueue in a thread safe manner. Events will // be pulled off and processed serially in this workers // main thread. eventList.add(event); } private GearmanFunction processSessionEvent(GearmanSessionEvent event) throws IllegalArgumentException, IllegalStateException { if (event != null) { GearmanPacket p = event.getPacket(); GearmanJobServerSession s = event.getSession(); GearmanPacketType t = p.getPacketType(); LOG.debug("---- Worker " + this + " handling session event" + " ( Session = " + s + " Event = " + t + " )"); switch (t) { case JOB_ASSIGN: //TODO Figure out what the right behavior is if JobUUIDRequired was false when we submitted but is now true LOG.info("---- Worker " + this + " received job assignment"); return addNewJob(event); case JOB_ASSIGN_UNIQ: //TODO Figure out what the right behavior is if JobUUIDRequired was true when we submitted but is now false LOG.info("---- Worker " + this + " received unique job assignment"); return addNewJob(event); case NOOP: LOG.debug("---- Worker " + this + " sending grab job after wakeup"); try { sendGrabJob(s); } catch (InterruptedException e) { LOG.warn("---- Worker " + this + " interrupted while waiting for okay to send " + "grab job", e); } break; case NO_JOB: // We didn't get a job, so allow other workers or // Jenkins to schedule on this node. availability.unlock(this); LOG.debug("---- Worker " + this + " sending pre sleep after no_job"); GearmanTask preSleepTask = new GearmanTask(new GrabJobEventHandler(s), new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.PRE_SLEEP, new byte[0])); s.submitTask(preSleepTask); break; case ECHO_RES: break; case OPTION_RES: break; case ERROR: s.closeSession(); break; default: LOG.warn("---- Worker " + this + " received unknown packet type " + t + " from session " + s + "; closing connection"); s.closeSession(); } } return null; } public boolean addServer(String host, int port) { return addServer(connFactory.createConnection(host, port)); } public boolean addServer(GearmanJobServerConnection conn) throws IllegalArgumentException, IllegalStateException { if (conn == null) { throw new IllegalArgumentException("Connection can not be null"); } if (session != null) { return true; } session = new GearmanJobServerSession(conn); reconnect(); LOG.debug("---- Worker " + this + " added server " + conn); return true; } public void setWorkerID(String id) throws IllegalArgumentException { if (id == null) { throw new IllegalArgumentException("Worker ID may not be null"); } this.id = id; if (session.isInitialized()) { sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.SET_CLIENT_ID, ByteUtils.toUTF8Bytes(id))); } } public String getWorkerID() { return id; } public void stop() { state = State.SHUTTINGDOWN; } public List shutdown() { return shutDownWorker(false); } public boolean isRunning() { return state.equals(State.RUNNING); } public void setJobUniqueIdRequired(boolean requiresJobUUID) { jobUniqueIdRequired = requiresJobUUID; } public boolean isJobUniqueIdRequired() { return jobUniqueIdRequired; } private GearmanPacket generateCanDoPacket(FunctionDefinition def) { GearmanPacketType pt = GearmanPacketType.CAN_DO; byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName()); return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data); } private GearmanPacket generateCantDoPacket(FunctionDefinition def) { GearmanPacketType pt = GearmanPacketType.CANT_DO; byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName()); return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data); } private void sendToAll(GearmanPacket p) { sendToAll(null, p); } private void sendToAll(GearmanServerResponseHandler handler, GearmanPacket p) { GearmanTask gsr = new GearmanTask(handler, p); session.submitTask(gsr); } /* * For the time being this will always return an empty list of * exceptions because closeSession does not throw an exception */ private List shutDownWorker(boolean completeTasks) { LOG.info("---- Worker " + this + " commencing shutdown"); ArrayList exceptions = new ArrayList(); // This gives any jobs in flight a chance to complete if (executorService != null) { if (completeTasks) { executorService.shutdown(); } else { executorService.shutdownNow(); } } session.closeSession(); try { ioAvailable.close(); } catch (IOException ioe) { LOG.warn("---- Worker " + this + " encountered IOException while closing selector: ", ioe); } state = State.IDLE; LOG.info("---- Worker " + this + " completed shutdown"); return exceptions; } private GearmanFunction addNewJob(GearmanSessionEvent event) { byte[] handle, data, functionNameBytes, unique; GearmanPacket p = event.getPacket(); String functionName; handle = p.getDataComponentValue( GearmanPacket.DataComponentName.JOB_HANDLE); functionNameBytes = p.getDataComponentValue( GearmanPacket.DataComponentName.FUNCTION_NAME); data = p.getDataComponentValue( GearmanPacket.DataComponentName.DATA); unique = p.getDataComponentValue(DataComponentName.UNIQUE_ID); functionName = ByteUtils.fromUTF8Bytes(functionNameBytes); FunctionDefinition def = functionMap.get(functionName); if (def == null) { GearmanTask gsr = new GearmanTask( new GearmanPacketImpl(GearmanPacketMagic.REQ, GearmanPacketType.WORK_FAIL, handle)); session.submitTask(gsr); availability.unlock(this); enqueueNoopEvent(); } else { GearmanFunction function = def.getFactory().getFunction(); function.setData(data); function.setJobHandle(handle); function.registerEventListener(session); if (unique != null && unique.length > 0) { function.setUniqueId(unique); } return function; } return null; } private void submitFunction(GearmanFunction fun) { try { if (executorService == null) { fun.call(); } else { executorService.submit(fun); } // We should have submitted either a WORK_EXCEPTION, COMPLETE, // or FAIL; make sure it gets sent. session.driveSessionIO(); } catch (IOException io) { LOG.warn("---- Worker " + this + " receieved IOException while" + " running function",io); session.closeSession(); } catch (Exception e) { LOG.warn("---- Worker " + this + " exception while executing function " + fun.getName(), e); } // Unlock the monitor for this worker availability.unlock(this); } private GearmanPacketType getGrabJobPacketType() { if (jobUniqueIdRequired) { return GearmanPacketType.GRAB_JOB_UNIQ; } return GearmanPacketType.GRAB_JOB; } }