Add stream-events command

The stream-events command outputs patchset added, comment added,
change merged, and change abandoned events in JSON format over SSH.
Streaming ends when the client disconnects or aborts the command.

Change-Id: Iba3ff56c23ab91cf3b855d5dde6c40d9735b643f
This commit is contained in:
Kenny Root
2010-02-24 00:29:20 -08:00
committed by Shawn O. Pearce
parent f458bf6337
commit 15ac1b83f5
10 changed files with 577 additions and 17 deletions

View File

@@ -66,6 +66,9 @@ link:cmd-approve.html[gerrit approve]::
link:cmd-ls-projects.html[gerrit ls-projects]:: link:cmd-ls-projects.html[gerrit ls-projects]::
List projects visible to the caller. List projects visible to the caller.
link:cmd-stream-events.html[gerrit stream-events]::
Monitor events occuring in real time.
gerrit receive-pack:: gerrit receive-pack::
Legacy alias for `git receive-pack`. Legacy alias for `git receive-pack`.

View File

@@ -0,0 +1,48 @@
gerrit stream-events
====================
NAME
----
gerrit stream-events - Monitor events occuring in real time
SYNOPSIS
--------
[verse]
'ssh' -p <port> <host> 'gerrit stream-events'
DESCRIPTION
-----------
Provides a portal into the major events occuring on the server,
outputing activity data in real-time to the client. Events are
filtered by the caller's access permissions, ensuring the caller
only receives events for changes they can view on the web, or in
the project repository.
Event output is in JSON, one event per line.
ACCESS
------
Any user who has configured an SSH key.
SCRIPTING
---------
This command is intended to be used in scripts.
EXAMPLES
--------
-----
$ ssh -p 29418 review.example.com gerrit stream-events
{"type":"comment-added","project":"tools/gerrit", ...}
{"type":"comment-added","project":"tools/gerrit", ...}
-----
SEE ALSO
--------
* link:access-control.html[Access Controls]
GERRIT
------
Part of link:index.html[Gerrit Code Review]

View File

@@ -1347,6 +1347,14 @@ are queued and serviced in a first-come-first-serve order.
+ +
By default, 1.5x the number of CPUs available to the JVM. By default, 1.5x the number of CPUs available to the JVM.
[[sshd.streamThreads]]sshd.streamThreads::
+
Number of threads to use when formatting events to asynchronous
streaming clients. Event formatting is multiplexed onto this thread
pool by a simple FIFO scheduling system.
+
By default, 1 plus the number of CPUs available to the JVM.
[[sshd.cipher]]sshd.cipher:: [[sshd.cipher]]sshd.cipher::
+ +
Available ciphers. To permit multiple ciphers, specify multiple Available ciphers. To permit multiple ciphers, specify multiple

View File

@@ -19,10 +19,14 @@ import com.google.gerrit.reviewdb.ApprovalCategory;
import com.google.gerrit.reviewdb.ApprovalCategoryValue; import com.google.gerrit.reviewdb.ApprovalCategoryValue;
import com.google.gerrit.reviewdb.Change; import com.google.gerrit.reviewdb.Change;
import com.google.gerrit.reviewdb.PatchSet; import com.google.gerrit.reviewdb.PatchSet;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectControl;
import com.google.gerrit.server.project.ProjectState;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@@ -38,6 +42,7 @@ import java.io.InputStreamReader;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* This class implements hooks for certain gerrit events. * This class implements hooks for certain gerrit events.
@@ -47,6 +52,72 @@ public class ChangeHookRunner {
/** A logger for this class. */ /** A logger for this class. */
private static final Logger log = LoggerFactory.getLogger(ChangeHookRunner.class); private static final Logger log = LoggerFactory.getLogger(ChangeHookRunner.class);
public static abstract class ChangeEvent {
}
public static class ApprovalAttribute {
public String type;
public String value;
}
public static class AuthorAttribute {
public String name;
public String email;
}
public static class CommentAddedEvent extends ChangeEvent {
public final String type = "comment-added";
public String project;
public String branch;
public String change;
public String revision;
public AuthorAttribute author;
public ApprovalAttribute[] approvals;
public String comment;
}
public static class ChangeMergedEvent extends ChangeEvent {
public final String type = "change-merged";
public String project;
public String branch;
public String change;
public String patchSet;
public AuthorAttribute submitter;
public String description;
}
public static class ChangeAbandonedEvent extends ChangeEvent {
public final String type = "change-abandoned";
public String project;
public String branch;
public String change;
public AuthorAttribute author;
public String reason;
}
public static class PatchSetCreatedEvent extends ChangeEvent {
public final String type = "patchset-created";
public String project;
public String branch;
public String change;
public String commit;
public String patchSet;
}
private static class ChangeListenerHolder {
final ChangeListener listener;
final IdentifiedUser user;
ChangeListenerHolder(ChangeListener l, IdentifiedUser u) {
listener = l;
user = u;
}
}
/** Listeners to receive changes as they happen. */
private final Map<ChangeListener, ChangeListenerHolder> listeners =
new ConcurrentHashMap<ChangeListener, ChangeListenerHolder>();
/** Filename of the new patchset hook. */ /** Filename of the new patchset hook. */
private final File patchsetCreatedHook; private final File patchsetCreatedHook;
@@ -65,6 +136,8 @@ public class ChangeHookRunner {
/** Queue of hooks that need to run. */ /** Queue of hooks that need to run. */
private final WorkQueue.Executor hookQueue; private final WorkQueue.Executor hookQueue;
private final ProjectCache projectCache;
/** /**
* Create a new ChangeHookRunner. * Create a new ChangeHookRunner.
* *
@@ -72,11 +145,16 @@ public class ChangeHookRunner {
* @param repoManager The repository manager. * @param repoManager The repository manager.
* @param config Config file to use. * @param config Config file to use.
* @param sitePath The sitepath of this gerrit install. * @param sitePath The sitepath of this gerrit install.
* @param projectCache the project cache instance for the server.
*/ */
@Inject @Inject
public ChangeHookRunner(final WorkQueue queue, final GitRepositoryManager repoManager, @GerritServerConfig final Config config, final SitePaths sitePath) { public ChangeHookRunner(final WorkQueue queue,
final GitRepositoryManager repoManager,
@GerritServerConfig final Config config, final SitePaths sitePath,
final ProjectCache projectCache) {
this.repoManager = repoManager; this.repoManager = repoManager;
this.hookQueue = queue.createQueue(1, "hook"); this.hookQueue = queue.createQueue(1, "hook");
this.projectCache = projectCache;
final File hooksPath = sitePath.resolve(getValue(config, "hooks", "path", sitePath.hooks_dir.getAbsolutePath())); final File hooksPath = sitePath.resolve(getValue(config, "hooks", "path", sitePath.hooks_dir.getAbsolutePath()));
@@ -86,6 +164,14 @@ public class ChangeHookRunner {
changeAbandonedHook = sitePath.resolve(new File(hooksPath, getValue(config, "hooks", "changeAbandonedHook", "change-abandoned")).getPath()); changeAbandonedHook = sitePath.resolve(new File(hooksPath, getValue(config, "hooks", "changeAbandonedHook", "change-abandoned")).getPath());
} }
public void addChangeListener(ChangeListener listener, IdentifiedUser user) {
listeners.put(listener, new ChangeListenerHolder(listener, user));
}
public void removeChangeListener(ChangeListener listener) {
listeners.remove(listener);
}
/** /**
* Helper Method for getting values from the config. * Helper Method for getting values from the config.
* *
@@ -121,19 +207,28 @@ public class ChangeHookRunner {
* @param patchSet The Patchset that was created. * @param patchSet The Patchset that was created.
*/ */
public void doPatchsetCreatedHook(final Change change, final PatchSet patchSet) { public void doPatchsetCreatedHook(final Change change, final PatchSet patchSet) {
final PatchSetCreatedEvent event = new PatchSetCreatedEvent();
event.project = change.getProject().get();
event.branch = change.getDest().getShortName();
event.change = change.getKey().get();
event.commit = patchSet.getRevision().get();
event.patchSet = Integer.toString(patchSet.getPatchSetId());
fireEvent(change, event);
final List<String> args = new ArrayList<String>(); final List<String> args = new ArrayList<String>();
args.add(patchsetCreatedHook.getAbsolutePath()); args.add(patchsetCreatedHook.getAbsolutePath());
args.add("--change"); args.add("--change");
args.add(change.getKey().get()); args.add(event.change);
args.add("--project"); args.add("--project");
args.add(change.getProject().get()); args.add(event.project);
args.add("--branch"); args.add("--branch");
args.add(change.getDest().getShortName()); args.add(event.branch);
args.add("--commit"); args.add("--commit");
args.add(patchSet.getRevision().get()); args.add(event.commit);
args.add("--patchset"); args.add("--patchset");
args.add(Integer.toString(patchSet.getPatchSetId())); args.add(event.patchSet);
runHook(getRepo(change), args); runHook(getRepo(change), args);
} }
@@ -148,19 +243,41 @@ public class ChangeHookRunner {
* @param approvals Map of Approval Categories and Scores * @param approvals Map of Approval Categories and Scores
*/ */
public void doCommentAddedHook(final Change change, final Account account, final PatchSet patchSet, final String comment, final Map<ApprovalCategory.Id, ApprovalCategoryValue.Id> approvals) { public void doCommentAddedHook(final Change change, final Account account, final PatchSet patchSet, final String comment, final Map<ApprovalCategory.Id, ApprovalCategoryValue.Id> approvals) {
final CommentAddedEvent event = new CommentAddedEvent();
event.project = change.getProject().get();
event.branch = change.getDest().getShortName();
event.change = change.getKey().get();
event.author = getAuthorAttribute(account);
event.revision = patchSet.getRevision().get();
event.comment = comment;
if (approvals.size() > 0) {
event.approvals = new ApprovalAttribute[approvals.size()];
int i = 0;
for (Map.Entry<ApprovalCategory.Id, ApprovalCategoryValue.Id> approval : approvals.entrySet()) {
ApprovalAttribute a = new ApprovalAttribute();
a.type = approval.getKey().get();
a.value = Short.toString(approval.getValue().get());
event.approvals[i++] = a;
}
}
fireEvent(change, event);
final List<String> args = new ArrayList<String>(); final List<String> args = new ArrayList<String>();
args.add(commentAddedHook.getAbsolutePath()); args.add(commentAddedHook.getAbsolutePath());
args.add("--change"); args.add("--change");
args.add(change.getKey().get()); args.add(event.change);
args.add("--project"); args.add("--project");
args.add(change.getProject().get()); args.add(event.project);
args.add("--branch"); args.add("--branch");
args.add(change.getDest().getShortName()); args.add(event.branch);
args.add("--author"); args.add("--author");
args.add(getDisplayName(account)); args.add(getDisplayName(account));
args.add("--commit"); args.add("--commit");
args.add(patchSet.getRevision().get()); args.add(event.revision);
args.add("--comment"); args.add("--comment");
args.add(comment == null ? "" : comment); args.add(comment == null ? "" : comment);
for (Map.Entry<ApprovalCategory.Id, ApprovalCategoryValue.Id> approval : approvals.entrySet()) { for (Map.Entry<ApprovalCategory.Id, ApprovalCategoryValue.Id> approval : approvals.entrySet()) {
@@ -179,19 +296,29 @@ public class ChangeHookRunner {
* @param patchSet The patchset that was merged. * @param patchSet The patchset that was merged.
*/ */
public void doChangeMergedHook(final Change change, final Account account, final PatchSet patchSet) { public void doChangeMergedHook(final Change change, final Account account, final PatchSet patchSet) {
final ChangeMergedEvent event = new ChangeMergedEvent();
event.project = change.getProject().get();
event.branch = change.getDest().getShortName();
event.change = change.getKey().get();
event.submitter = getAuthorAttribute(account);
event.patchSet = patchSet.getRevision().get();
event.description = change.getSubject();
fireEvent(change, event);
final List<String> args = new ArrayList<String>(); final List<String> args = new ArrayList<String>();
args.add(changeMergedHook.getAbsolutePath()); args.add(changeMergedHook.getAbsolutePath());
args.add("--change"); args.add("--change");
args.add(change.getKey().get()); args.add(event.change);
args.add("--project"); args.add("--project");
args.add(change.getProject().get()); args.add(event.project);
args.add("--branch"); args.add("--branch");
args.add(change.getDest().getShortName()); args.add(event.branch);
args.add("--submitter"); args.add("--submitter");
args.add(getDisplayName(account)); args.add(getDisplayName(account));
args.add("--commit"); args.add("--commit");
args.add(patchSet.getRevision().get()); args.add(event.patchSet);
runHook(getRepo(change), args); runHook(getRepo(change), args);
} }
@@ -204,15 +331,24 @@ public class ChangeHookRunner {
* @param reason Reason for abandoning the change. * @param reason Reason for abandoning the change.
*/ */
public void doChangeAbandonedHook(final Change change, final Account account, final String reason) { public void doChangeAbandonedHook(final Change change, final Account account, final String reason) {
final ChangeAbandonedEvent event = new ChangeAbandonedEvent();
event.project = change.getProject().get();
event.branch = change.getDest().getShortName();
event.change = change.getKey().get();
event.author = getAuthorAttribute(account);
event.reason = reason;
fireEvent(change, event);
final List<String> args = new ArrayList<String>(); final List<String> args = new ArrayList<String>();
args.add(changeAbandonedHook.getAbsolutePath()); args.add(changeAbandonedHook.getAbsolutePath());
args.add("--change"); args.add("--change");
args.add(change.getKey().get()); args.add(event.change);
args.add("--project"); args.add("--project");
args.add(change.getProject().get()); args.add(event.project);
args.add("--branch"); args.add("--branch");
args.add(change.getDest().getShortName()); args.add(event.branch);
args.add("--abandoner"); args.add("--abandoner");
args.add(getDisplayName(account)); args.add(getDisplayName(account));
args.add("--reason"); args.add("--reason");
@@ -221,6 +357,36 @@ public class ChangeHookRunner {
runHook(getRepo(change), args); runHook(getRepo(change), args);
} }
private void fireEvent(final Change change, final ChangeEvent event) {
for (ChangeListenerHolder holder : listeners.values()) {
if (isVisibleTo(change, holder.user)) {
holder.listener.onChangeEvent(event);
}
}
}
private boolean isVisibleTo(Change change, IdentifiedUser user) {
final ProjectState pe = projectCache.get(change.getProject());
if (pe == null) {
return false;
}
final ProjectControl pc = pe.controlFor(user);
return pc.controlFor(change).isVisible();
}
/**
* Create an AuthorAttribute for the given account suitable for serialization to JSON.
*
* @param account
* @return object suitable for serialization to JSON
*/
private AuthorAttribute getAuthorAttribute(final Account account) {
AuthorAttribute author = new AuthorAttribute();
author.name = account.getFullName();
author.email = account.getPreferredEmail();
return author;
}
/** /**
* Get the display name for the given account. * Get the display name for the given account.
* *

View File

@@ -0,0 +1,21 @@
// Copyright (C) 2010 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.common;
import com.google.gerrit.common.ChangeHookRunner.ChangeEvent;
public interface ChangeListener {
public void onChangeEvent(ChangeEvent event);
}

View File

@@ -75,6 +75,8 @@ public class SshModule extends FactoryModule {
bind(WorkQueue.Executor.class).annotatedWith(CommandExecutor.class) bind(WorkQueue.Executor.class).annotatedWith(CommandExecutor.class)
.toProvider(CommandExecutorProvider.class).in(SINGLETON); .toProvider(CommandExecutorProvider.class).in(SINGLETON);
bind(WorkQueue.Executor.class).annotatedWith(StreamCommandExecutor.class)
.toProvider(StreamCommandExecutorProvider.class).in(SINGLETON);
bind(PublickeyAuthenticator.class).to(DatabasePubKeyAuth.class); bind(PublickeyAuthenticator.class).to(DatabasePubKeyAuth.class);
bind(PasswordAuthenticator.class).to(DatabasePasswordAuth.class); bind(PasswordAuthenticator.class).to(DatabasePasswordAuth.class);

View File

@@ -0,0 +1,28 @@
// Copyright (C) 2009 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.sshd;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
/** Marker on {@link Executor} used by delayed event streaming. */
@Retention(RUNTIME)
@BindingAnnotation
public @interface StreamCommandExecutor {
}

View File

@@ -0,0 +1,55 @@
// Copyright (C) 2010 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.sshd;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.eclipse.jgit.lib.Config;
import java.util.concurrent.ThreadFactory;
class StreamCommandExecutorProvider implements Provider<WorkQueue.Executor> {
private final int poolSize;
private final WorkQueue queues;
@Inject
StreamCommandExecutorProvider(@GerritServerConfig final Config config,
final WorkQueue wq) {
final int cores = Runtime.getRuntime().availableProcessors();
poolSize = config.getInt("sshd", "streamThreads", cores + 1);
queues = wq;
}
@Override
public WorkQueue.Executor get() {
final WorkQueue.Executor executor;
executor = queues.createQueue(poolSize, "SSH-Stream-Worker");
final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(final Runnable task) {
final Thread t = parent.newThread(task);
t.setPriority(Thread.MIN_PRIORITY);
return t;
}
});
return executor;
}
}

View File

@@ -39,6 +39,7 @@ public class DefaultCommandModule extends CommandModule {
command(gerrit, "show-caches").to(AdminShowCaches.class); command(gerrit, "show-caches").to(AdminShowCaches.class);
command(gerrit, "show-connections").to(AdminShowConnections.class); command(gerrit, "show-connections").to(AdminShowConnections.class);
command(gerrit, "show-queue").to(AdminShowQueue.class); command(gerrit, "show-queue").to(AdminShowQueue.class);
command(gerrit, "stream-events").to(StreamEvents.class);
command(git).toProvider(new DispatchCommandProvider(git)); command(git).toProvider(new DispatchCommandProvider(git));
command(git, "receive-pack").to(Commands.key(gerrit, "receive-pack")); command(git, "receive-pack").to(Commands.key(gerrit, "receive-pack"));

View File

@@ -0,0 +1,228 @@
// Copyright (C) 2010 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.sshd.commands;
import com.google.gerrit.common.ChangeHookRunner;
import com.google.gerrit.common.ChangeListener;
import com.google.gerrit.common.ChangeHookRunner.ChangeEvent;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.sshd.BaseCommand;
import com.google.gerrit.sshd.StreamCommandExecutor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import org.apache.sshd.server.Environment;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
final class StreamEvents extends BaseCommand {
/** Maximum number of events that may be queued up for each connection. */
private static final int MAX_EVENTS = 128;
/** Number of events to write before yielding off the thread. */
private static final int BATCH_SIZE = 32;
@Inject
private IdentifiedUser currentUser;
@Inject
private ChangeHookRunner hooks;
@Inject
@StreamCommandExecutor
private WorkQueue.Executor pool;
/** Queue of events to stream to the connected user. */
private final LinkedBlockingQueue<ChangeEvent> queue =
new LinkedBlockingQueue<ChangeEvent>(MAX_EVENTS);
private final Gson gson = new Gson();
/** Special event to notify clients they missed other events. */
private final Object droppedOutputEvent = new Object() {
@SuppressWarnings("unused")
final String type = "dropped-output";
};
private final ChangeListener listener = new ChangeListener() {
@Override
public void onChangeEvent(final ChangeEvent event) {
offer(event);
}
};
private final CancelableRunnable writer = new CancelableRunnable() {
@Override
public void run() {
writeEvents();
}
@Override
public void cancel() {
onExit(0);
}
};
/** True if {@link #droppedOutputEvent} needs to be sent. */
private volatile boolean dropped;
/** Lock to protect {@link #queue}, {@link #task}, {@link #done}. */
private final Object taskLock = new Object();
/** True if no more messages should be sent to the output. */
private boolean done;
/**
* Currently scheduled task to spin out {@link #queue}.
* <p>
* This field is usually {@code null}, unless there is at least one object
* present inside of {@link #queue} ready for delivery. Tasks are only started
* when there are events to be sent.
*/
private Future<?> task;
private PrintWriter stdout;
@Override
public void start(final Environment env) throws IOException {
try {
parseCommandLine();
} catch (UnloggedFailure e) {
String msg = e.getMessage();
if (!msg.endsWith("\n")) {
msg += "\n";
}
err.write(msg.getBytes("UTF-8"));
err.flush();
onExit(1);
return;
}
stdout = toPrintWriter(out);
hooks.addChangeListener(listener, currentUser);
}
@Override
protected void onExit(final int rc) {
hooks.removeChangeListener(listener);
synchronized (taskLock) {
done = true;
}
super.onExit(rc);
}
@Override
public void destroy() {
hooks.removeChangeListener(listener);
final boolean exit;
synchronized (taskLock) {
if (task != null) {
task.cancel(true);
exit = false; // onExit will be invoked by the task cancellation.
} else {
exit = !done;
}
done = true;
}
if (exit) {
onExit(0);
}
}
private void offer(final ChangeEvent event) {
synchronized (taskLock) {
if (!queue.offer(event)) {
dropped = true;
}
if (task == null && !done) {
task = pool.submit(writer);
}
}
}
private ChangeEvent poll() {
synchronized (taskLock) {
ChangeEvent event = queue.poll();
if (event == null) {
task = null;
}
return event;
}
}
private void writeEvents() {
int processed = 0;
while (processed < BATCH_SIZE) {
if (Thread.interrupted() || stdout.checkError()) {
// The other side either requested a shutdown by calling our
// destroy() above, or it closed the stream and is no longer
// accepting output. Either way terminate this instance.
//
hooks.removeChangeListener(listener);
flush();
onExit(0);
return;
}
if (dropped) {
write(droppedOutputEvent);
dropped = false;
}
final ChangeEvent event = poll();
if (event == null) {
break;
}
write(event);
processed++;
}
flush();
if (BATCH_SIZE <= processed) {
// We processed the limit, but more might remain in the queue.
// Schedule the write task again so we will come back here and
// can process more events.
//
synchronized (taskLock) {
task = pool.submit(writer);
}
}
}
private void write(final Object message) {
final String msg = gson.toJson(message) + "\n";
synchronized (stdout) {
stdout.print(msg);
}
}
private void flush() {
synchronized (stdout) {
stdout.flush();
}
}
}