Use single thread to perform all zmq sends.

jeromq does not appear to be thread safe. Use a single thread to call
send on the ZMQ socket to avoid contention for those resources. Have the
RunListener pass events to the ZMQ sender thread with a BlockingQueue.

Do not block when offering events to that queue to avoid
starvation/deadlock in the Jenkins job runners. Events may potentially
be lost if ZMQ cannot keep up.
This commit is contained in:
Clark Boylan 2013-01-31 10:59:53 -08:00
parent 7d54898d4e
commit 135d273963
3 changed files with 144 additions and 77 deletions

7
README
View File

@ -12,8 +12,11 @@ history and you will find the old versions of this plugin that
depended on jzmq.
TODO:
Avoid reading in the global config for each event if possible.
Cleanup config.jelly for the non global Job config.
- Avoid reading in the global config for each event if possible.
- Need to allow ZMQRunnable thread to die if something truly
unexpected happens. The RunListener should then start a new
DaemonThread to handle further events.
- Cleanup config.jelly for the non global Job config.
This plugin borrows heavily from the Jenkins Notification Plugin
https://github.com/jenkinsci/notification-plugin. That plugin

View File

@ -18,90 +18,44 @@
package org.jenkinsci.plugins.ZMQEventPublisher;
import hudson.Extension;
import hudson.EnvVars;
import hudson.model.Hudson;
import hudson.model.Result;
import hudson.model.Run;
import hudson.model.TaskListener;
import hudson.model.listeners.RunListener;
import hudson.util.DaemonThreadFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jeromq.ZMQ;
import org.jeromq.ZMQException;
/*
* Listener to publish Jenkins build events through ZMQ
*/
@Extension
public class RunListenerImpl extends RunListener<Run> {
public static final Logger LOGGER = Logger.getLogger(RunListenerImpl.class.getName());
private int port;
private String bind_addr;
private ZMQ.Context context;
private ZMQ.Socket publisher;
public static final Logger LOGGER =
Logger.getLogger(RunListenerImpl.class.getName());
private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>(queueLength);
// ZMQ has a high water mark of 1000 events.
private static final int queueLength = 1024;
private static final DaemonThreadFactory threadFactory =
new DaemonThreadFactory();
private ZMQRunnable ZMQRunner;
private Thread thread;
public RunListenerImpl() {
super(Run.class);
context = ZMQ.context(1);
}
private int getPort(Run build) {
Hudson hudson = Hudson.getInstance();
HudsonNotificationProperty.HudsonNotificationPropertyDescriptor globalProperty =
(HudsonNotificationProperty.HudsonNotificationPropertyDescriptor)
hudson.getDescriptor(HudsonNotificationProperty.class);
if (globalProperty != null) {
return globalProperty.getPort();
}
return 8888;
}
private ZMQ.Socket bindSocket(Run build) {
int tmpPort = getPort(build);
if (publisher == null) {
port = tmpPort;
LOGGER.log(Level.INFO,
String.format("Binding ZMQ PUB to port %d", port));
publisher = bindSocket(port);
}
else if (tmpPort != port) {
LOGGER.log(Level.INFO,
String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort));
try {
publisher.close();
} catch (ZMQException e) {
/* Let the garbage collector sort out cleanup */
LOGGER.log(Level.INFO,
"Unable to close ZMQ PUB socket. " + e.toString(), e);
}
port = tmpPort;
publisher = bindSocket(port);
}
return publisher;
}
private ZMQ.Socket bindSocket(int port) {
ZMQ.Socket socket;
try {
socket = context.socket(ZMQ.PUB);
bind_addr = String.format("tcp://*:%d", port);
socket.bind(bind_addr);
} catch (ZMQException e) {
LOGGER.log(Level.SEVERE,
"Unable to bind ZMQ PUB socket. " + e.toString(), e);
socket = null;
}
return socket;
ZMQRunner = new ZMQRunnable(queue);
thread = threadFactory.newThread(ZMQRunner);
thread.start();
}
@Override
public void onCompleted(Run build, TaskListener listener) {
String event = "onCompleted";
String json = Phase.COMPLETED.handlePhase(build, getStatus(build), listener);
sendEvent(build, event, json);
sendEvent(event, json);
}
/* Currently not emitting onDeleted events. This should be fixed.
@ -117,28 +71,25 @@ public class RunListenerImpl extends RunListener<Run> {
public void onFinalized(Run build) {
String event = "onFinalized";
String json = Phase.FINISHED.handlePhase(build, getStatus(build), TaskListener.NULL);
sendEvent(build, event, json);
sendEvent(event, json);
}
@Override
public void onStarted(Run build, TaskListener listener) {
String event = "onStarted";
String json = Phase.STARTED.handlePhase(build, getStatus(build), listener);
sendEvent(build, event, json);
sendEvent(event, json);
}
private void sendEvent(Run build, String event, String json) {
ZMQ.Socket socket;
private void sendEvent(String event, String json) {
if (json != null) {
socket = bindSocket(build);
if (socket != null) {
event = event + " " + json;
try {
socket.send(event.getBytes(), 0);
} catch (ZMQException e) {
LOGGER.log(Level.INFO,
"Unable to send event. " + e.toString(), e);
}
event = event + " " + json;
// Offer the event. If the queue is full this will not block.
// We may drop events but this should prevent starvation in
// the calling Jenkins threads.
if (!queue.offer(event)) {
LOGGER.log(Level.INFO,
"Unable to add event to ZMQ queue.");
}
}
}

View File

@ -0,0 +1,113 @@
/*
* Copyright 2013 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jenkinsci.plugins.ZMQEventPublisher;
import hudson.model.Hudson;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jeromq.ZMQ;
import org.jeromq.ZMQException;
public class ZMQRunnable implements Runnable {
public static final Logger LOGGER = Logger.getLogger(ZMQRunnable.class.getName());
private static final String bind_addr = "tcp://*:%d";
private int port;
private final LinkedBlockingQueue<String> queue;
private final ZMQ.Context context;
private ZMQ.Socket publisher;
public ZMQRunnable(LinkedBlockingQueue<String> queue) {
this.queue = queue;
context = ZMQ.context(1);
bindSocket();
}
private int getPort() {
Hudson hudson = Hudson.getInstance();
HudsonNotificationProperty.HudsonNotificationPropertyDescriptor globalProperty =
(HudsonNotificationProperty.HudsonNotificationPropertyDescriptor)
hudson.getDescriptor(HudsonNotificationProperty.class);
if (globalProperty != null) {
return globalProperty.getPort();
}
return 8888;
}
private void bindSocket() {
int tmpPort = getPort();
if (publisher == null) {
port = tmpPort;
LOGGER.log(Level.INFO,
String.format("Binding ZMQ PUB to port %d", port));
publisher = bindSocket(port);
}
else if (tmpPort != port) {
LOGGER.log(Level.INFO,
String.format("Changing ZMQ PUB port from %d to %d", port, tmpPort));
try {
publisher.close();
} catch (ZMQException e) {
/* Let the garbage collector sort out cleanup */
LOGGER.log(Level.INFO,
"Unable to close ZMQ PUB socket. " + e.toString(), e);
}
port = tmpPort;
publisher = bindSocket(port);
}
}
private ZMQ.Socket bindSocket(int port) {
ZMQ.Socket socket;
try {
socket = context.socket(ZMQ.PUB);
socket.bind(String.format(bind_addr, port));
} catch (ZMQException e) {
LOGGER.log(Level.SEVERE,
"Unable to bind ZMQ PUB socket. " + e.toString(), e);
socket = null;
}
return socket;
}
public void run() {
String event;
while(true) {
try {
event = queue.take();;
bindSocket();
if (publisher != null) {
try {
publisher.send(event.getBytes(), 0);
} catch (ZMQException e) {
LOGGER.log(Level.INFO,
"Unable to send event. " + e.toString(), e);
}
}
}
// Catch all exceptions so that this thread does not die.
catch (Exception e) {
LOGGER.log(Level.SEVERE,
"Unhandled exception publishing ZMQ events " + e.toString(), e);
}
}
}
}