Register the diff of functions

Instead of registering all functions for every node each time the
functions change register the delta of the functions each time. This
should cut down on the amount of CAN_DO updates we were doing in the
past.

Note that we handle the loss of all functions with RESET_ABILITIES
rather than sending a CANT_DO for each function that is no longer
available. Also, starting a new connection will always begin with
RESET_ABILITIES to clear any potentially stale state from the gearman
server.

Change-Id: I2b16117fce30ddb3e11b338043204cf726c7f1d4
This commit is contained in:
Clark Boylan 2015-05-05 18:05:59 -07:00
parent 0314ab1ea2
commit d4c65db195
1 changed files with 53 additions and 14 deletions

View File

@ -236,22 +236,55 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return;
}
functionMap.clear();
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
functionMap.put(factory.getFunctionName(), def);
sendToAll(generateCanDoPacket(def));
HashMap<String, FunctionDefinition> newFunctionMap = new HashMap<String, FunctionDefinition>();
// If we have no previous data then reset abilities to be sure the
// gearman server has no stale data that we don't know about.
// Or if we have no functions anymore just reset everything, we don't
// need a CANT_DO per lost function.
if (functions.isEmpty() || functionMap.isEmpty()) {
sendToAll(new GearmanPacketImpl(GearmanPacketMagic.REQ,
GearmanPacketType.RESET_ABILITIES, new byte[0]));
session.driveSessionIO();
if (!isRunning()) return;
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
LOG.debug("---- Worker " + this + " reset functions");
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
}
// Now only update if we have data to update.
if (!functions.isEmpty()) {
for (GearmanFunctionFactory factory: functions) {
FunctionDefinition def = new FunctionDefinition(0, factory);
newFunctionMap.put(factory.getFunctionName(), def);
if (!functionMap.containsKey(factory.getFunctionName())) {
sendToAll(generateCanDoPacket(def));
session.driveSessionIO();
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
LOG.debug("---- Worker " + this + " registered function " +
factory.getFunctionName());
}
functionMap.remove(factory.getFunctionName());
}
for (FunctionDefinition def: functionMap.values()) {
sendToAll(generateCantDoPacket(def));
session.driveSessionIO();
if (!isRunning()) {
// Ensure we start from scratch on reconnection.
functionMap.clear();
return;
}
LOG.debug("---- Worker " + this + " unregistered function " +
def.getFactory().getFunctionName());
}
}
functionMap = newFunctionMap;
GearmanSessionEvent nextEvent = eventList.peek();
if (nextEvent == null ||
@ -515,6 +548,12 @@ public class MyGearmanWorkerImpl implements GearmanSessionEventHandler {
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
}
private GearmanPacket generateCantDoPacket(FunctionDefinition def) {
GearmanPacketType pt = GearmanPacketType.CANT_DO;
byte[] data = ByteUtils.toUTF8Bytes(def.getFactory().getFunctionName());
return new GearmanPacketImpl(GearmanPacketMagic.REQ, pt, data);
}
private void sendToAll(GearmanPacket p) {
sendToAll(null, p);
}