Implement a multi-sub-task progress monitor for ReceiveCommits

Because there are a lot of file and database accesses, having a
progress monitor is useful. Moreover, we no longer rely on JGit for
ref updates, so we can't depend on its progress meter.

This is a special progress meter implementation that multiplexes
output from the various sub-tasks that ReceiveCommits performs. We
need this multiplexing because we don't know ahead of time how much of
most kinds of work must be performed. For example, a single ref update
can close an arbitrary number of changes.

The progress meter includes a "spinner" animation that updates the
output every 500ms regardless of activity. Additionally, the whole
progress meter updates whenever one sub-task increases by 1%. Because
of this, the MultiProgressMonitor must run in the main thread to avoid
I/O stalls, delegating work to a background worker.

Change-Id: If7f355830387a1a36a4e3b7ca01693239ce2d956
This commit is contained in:
Dave Borowitz
2012-02-29 11:39:00 -08:00
parent 3643ac6ec5
commit 06cb1d2526
3 changed files with 364 additions and 27 deletions

View File

@@ -15,9 +15,10 @@
package com.google.gerrit.server.git;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.git.ReceiveCommits.MessageSender;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.gerrit.server.project.ProjectControl;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.Inject;
@@ -30,19 +31,15 @@ import org.eclipse.jgit.transport.ReceivePack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/** Hook that delegates to {@link ReceiveCommits} in a worker thread. */
public class AsyncReceiveCommits implements PreReceiveHook {
private static final Logger log =
LoggerFactory.getLogger(AsyncReceiveCommits.class);
private final ReceiveCommits rc;
private final Executor executor;
private final RequestScopePropagator scopePropagator;
public interface Factory {
AsyncReceiveCommits create(ProjectControl projectControl,
Repository repository);
@@ -70,7 +67,7 @@ public class AsyncReceiveCommits implements PreReceiveHook {
@Override
public void run() {
rc.processCommands(commands);
rc.processCommands(commands, progress);
}
@Override
@@ -94,6 +91,35 @@ public class AsyncReceiveCommits implements PreReceiveHook {
}
}
private class MessageSenderOutputStream extends OutputStream {
private final MessageSender messageSender = rc.getMessageSender();
@Override
public void write(int b) {
messageSender.sendBytes(new byte[]{(byte)b});
}
@Override
public void write(byte[] what, int off, int len) {
messageSender.sendBytes(what, off, len);
}
@Override
public void write(byte[] what) {
messageSender.sendBytes(what);
}
@Override
public void flush() {
messageSender.flush();
}
}
private final ReceiveCommits rc;
private final Executor executor;
private final RequestScopePropagator scopePropagator;
private final MultiProgressMonitor progress;
@Inject
AsyncReceiveCommits(final ReceiveCommits.Factory factory,
@ReceiveCommitsExecutor final Executor executor,
@@ -104,29 +130,25 @@ public class AsyncReceiveCommits implements PreReceiveHook {
this.scopePropagator = scopePropagator;
rc = factory.create(projectControl, repo);
rc.getReceivePack().setPreReceiveHook(this);
progress = new MultiProgressMonitor(
new MessageSenderOutputStream(), "Updating changes");
}
@Override
public void onPreReceive(final ReceivePack rp,
final Collection<ReceiveCommand> commands) {
Future<?> workerFuture = executor.submit(
scopePropagator.wrap(new Worker(commands)));
Exception err = null;
try {
workerFuture.get();
progress.waitFor(executor.submit(
scopePropagator.wrap(new Worker(commands))));
} catch (ExecutionException e) {
err = e;
} catch (InterruptedException e) {
err = e;
}
if (err != null) {
log.warn("Error in ReceiveCommits", err);
log.warn("Error in ReceiveCommits", e);
rc.getMessageSender().sendError("internal error while processing changes");
// ReceiveCommits has tried its best to catch errors, so anything at this
// point is very bad.
for (final ReceiveCommand c : commands) {
if (c.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) {
ReceiveCommits.reject(c, "internal error");
rc.reject(c, "internal error");
}
}
}

View File

@@ -0,0 +1,284 @@
// Copyright (C) 2012 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import org.eclipse.jgit.lib.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.List;
/**
* Progress reporting interface that multiplexes multiple sub-tasks.
* <p>
* Output is of the format:
* <pre>
* Task: subA: 1, subB: 75% (3/4) (-)\r
* Task: subA: 2, subB: 75% (3/4), subC: 1 (\)\r
* Task: subA: 2, subB: 100% (4/4), subC: 1 (|)\r
* Task: subA: 4, subB: 100% (4/4), subC: 4, done \n
* </pre>
* <p>
* Callers should try to keep task and sub-task descriptions short, since the
* output should fit on one terminal line. (Note that git clients do not accept
* terminal control characters, so true multi-line progress messages would be
* impossible.)
*/
public class MultiProgressMonitor {
private static final Logger log =
LoggerFactory.getLogger(MultiProgressMonitor.class);
/** Constant indicating the total work units cannot be predicted. */
public static final int UNKNOWN = 0;
private static final char[] SPINNER_STATES = new char[]{'-', '\\', '|', '/'};
private static final char NO_SPINNER = ' ';
/** Handle for a sub-task. */
public class Task {
private final String name;
private final int total;
private volatile int count;
private int lastPercent;
Task(final String subTaskName, final int totalWork) {
this.name = subTaskName;
this.total = totalWork;
}
/**
* Indicate that work has been completed on this sub-task.
* <p>
* Must be called from the worker thread.
*
* @param completed number of work units completed.
*/
public void update(final int completed) {
count += completed;
if (total != UNKNOWN) {
int percent = count * 100 / total;
if (percent > lastPercent) {
lastPercent = percent;
wakeUp();
}
}
}
/**
* Indicate that this sub-task is finished.
* <p>
* Must be called from the worker thread.
*/
public void end() {
if (total == UNKNOWN && count > 0) {
wakeUp();
}
}
}
private final OutputStream out;
private final String taskName;
private final List<Task> tasks = new CopyOnWriteArrayList<Task>();
private int spinnerIndex;
private char spinnerState = NO_SPINNER;
private boolean done;
private boolean write = true;
private final long maxIntervalNanos;
/**
* Create a new progress monitor for multiple sub-tasks.
*
* @param out stream for writing progress messages.
* @param taskName name of the overall task.
*/
public MultiProgressMonitor(final OutputStream out, final String taskName) {
this(out, taskName, 500, TimeUnit.MILLISECONDS);
}
/**
* Create a new progress monitor for multiple sub-tasks.
*
* @param out stream for writing progress messages.
* @param taskName name of the overall task.
* @param maxIntervalTime maximum interval between progress messages.
* @param maxIntervalUnit time unit for progress interval.
*/
public MultiProgressMonitor(final OutputStream out, final String taskName,
long maxIntervalTime, TimeUnit maxIntervalUnit) {
this.out = out;
this.taskName = taskName;
maxIntervalNanos = NANOSECONDS.convert(maxIntervalTime, maxIntervalUnit);
}
/**
* Wait for a task managed by a {@link Future}.
* <p>
* Must be called from the main thread, <em>not</em> the worker thread. Once
* the worker thread calls {@link #end()}, the future has an additional
* <code>maxInterval</code> to finish before it is forcefully cancelled and
* {@link ExecutionException} is thrown.
*
* @param workerFuture a future that returns when the worker thread is
* finished.
*
* @throws ExecutionException if this thread or the worker thread was
* interrupted, the worker was cancelled, or the worker timed out.
*/
public void waitFor(Future<?> workerFuture) throws ExecutionException {
synchronized (this) {
long left = maxIntervalNanos;
while (!done) {
long start = System.nanoTime();
try {
NANOSECONDS.timedWait(this, left);
} catch (InterruptedException e) {
throw new ExecutionException(e);
}
// Send an update on every wakeup (manual or spurious), but only move
// the spinner every maxInterval.
left -= System.nanoTime() - start;
if (left <= 0) {
moveSpinner();
left = maxIntervalNanos;
}
sendUpdate();
if (!done && workerFuture.isDone()) {
// The worker may not have called end() explicitly, which is likely a
// programming error.
log.warn("MultiProgressMonitor worker did not call end()"
+ " before returning");
end();
}
}
sendDone();
}
// The loop exits as soon as the worker calls end(), but we give it another
// maxInterval to finish up and return.
try {
workerFuture.get(maxIntervalNanos, NANOSECONDS);
} catch (InterruptedException e) {
throw new ExecutionException(e);
} catch (CancellationException e) {
throw new ExecutionException(e);
} catch (TimeoutException e) {
workerFuture.cancel(true);
throw new ExecutionException(e);
}
}
private synchronized void wakeUp() {
notifyAll();
}
/**
* Begin a sub-task.
*
* @param subTask sub-task name.
* @param subTaskWork total work units in sub-task, or {@link #UNKNOWN}.
* @return sub-task handle.
*/
public Task beginSubTask(final String subTask, final int subTaskWork) {
Task task = new Task(subTask, subTaskWork);
tasks.add(task);
return task;
}
/**
* End the overall task.
* <p>
* Must be called from the worker thread.
*/
public synchronized void end() {
done = true;
wakeUp();
}
private void sendDone() {
spinnerState = NO_SPINNER;
StringBuilder s = format();
s.append(", done \n");
send(s);
}
private void moveSpinner() {
spinnerIndex = (spinnerIndex + 1) % SPINNER_STATES.length;
spinnerState = SPINNER_STATES[spinnerIndex];
}
private void sendUpdate() {
send(format());
}
private StringBuilder format() {
StringBuilder s = new StringBuilder().append("\r").append(taskName)
.append(':');
if (!tasks.isEmpty()) {
boolean first = true;
for (Task t : tasks) {
int count = t.count;
if (count == 0) {
continue;
}
if (!first) {
s.append(',');
} else {
first = false;
}
s.append(' ').append(t.name).append(": ");
if (t.total == UNKNOWN) {
s.append(count);
} else {
s.append(String.format("%d%% (%d/%d)",
count * 100 / t.total,
count, t.total));
}
}
}
if (spinnerState != NO_SPINNER) {
// Don't output a spinner until the alarm fires for the first time.
s.append(" (").append(spinnerState).append(')');
}
return s;
}
private void send(StringBuilder s) {
if (write) {
try {
out.write(Constants.encode(s.toString()));
out.flush();
} catch (IOException e) {
write = false;
}
}
}
}

View File

@@ -14,6 +14,8 @@
package com.google.gerrit.server.git;
import static com.google.gerrit.server.git.MultiProgressMonitor.UNKNOWN;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.PageLinks;
import com.google.gerrit.common.data.ApprovalType;
@@ -40,6 +42,7 @@ import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.account.AccountResolver;
import com.google.gerrit.server.config.CanonicalWebUrl;
import com.google.gerrit.server.config.TrackingFooters;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.mail.CreateChangeSender;
import com.google.gerrit.server.mail.EmailException;
import com.google.gerrit.server.mail.MergedSender;
@@ -119,6 +122,7 @@ public class ReceiveCommits {
void sendError(String what);
void sendBytes(byte[] what);
void sendBytes(byte[] what, int off, int len);
void flush();
}
private class ReceivePackMessageSender implements MessageSender {
@@ -134,11 +138,7 @@ public class ReceiveCommits {
@Override
public void sendBytes(byte[] what) {
try {
rp.getMessageOutputStream().write(what);
} catch (IOException e) {
// Ignore write failures (matching JGit behavior).
}
sendBytes(what, 0, what.length);
}
@Override
@@ -149,6 +149,15 @@ public class ReceiveCommits {
// Ignore write failures (matching JGit behavior).
}
}
@Override
public void flush() {
try {
rp.getMessageOutputStream().flush();
} catch (IOException e) {
// Ignore write failures (matching JGit behavior).
}
}
}
private static class Message {
@@ -206,6 +215,10 @@ public class ReceiveCommits {
private final SubmoduleOp.Factory subOpFactory;
private final List<Message> messages = new ArrayList<Message>();
private Task newProgress;
private Task replaceProgress;
private Task closeProgress;
private Task commandProgress;
private MessageSender messageSender;
@Inject
@@ -406,14 +419,23 @@ public class ReceiveCommits {
}
}
void processCommands(final Collection<ReceiveCommand> commands) {
void processCommands(final Collection<ReceiveCommand> commands,
final MultiProgressMonitor progress) {
try {
newProgress = progress.beginSubTask("new", UNKNOWN);
replaceProgress = progress.beginSubTask("updated", UNKNOWN);
closeProgress = progress.beginSubTask("closed", UNKNOWN);
commandProgress = progress.beginSubTask("refs", commands.size());
parseCommands(commands);
if (newChange != null
&& newChange.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) {
createNewChanges();
}
newProgress.end();
doReplaces();
replaceProgress.end();
for (final ReceiveCommand c : commands) {
if (c.getResult() == Result.OK) {
@@ -456,8 +478,12 @@ public class ReceiveCommits {
Branch.NameKey destBranch = new Branch.NameKey(project.getNameKey(), c.getRefName());
hooks.doRefUpdatedHook(destBranch, c.getOldId(), c.getNewId(), currentUser.getAccount());
}
commandProgress.update(1);
}
}
closeProgress.end();
commandProgress.end();
progress.end();
if (!allNewChanges.isEmpty() && canonicalWebUrl != null) {
final String url = canonicalWebUrl;
@@ -983,6 +1009,7 @@ public class ReceiveCommits {
reject(newChange, "database error");
return;
}
newProgress.update(1);
}
newChange.setResult(ReceiveCommand.Result.OK);
}
@@ -1124,6 +1151,7 @@ public class ReceiveCommits {
for (final ReplaceRequest request : replaceByChange.values()) {
try {
doReplace(request);
replaceProgress.update(1);
} catch (IOException err) {
log.error("Error computing replacement patch for change "
+ request.ontoChange + ", commit " + request.newCommit.name(), err);
@@ -1841,6 +1869,7 @@ public class ReceiveCommits {
if (ref != null) {
rw.parseBody(c);
closeChange(cmd, PatchSet.Id.fromRef(ref.getName()), c);
closeProgress.update(1);
continue;
}
@@ -1858,6 +1887,7 @@ public class ReceiveCommits {
final PatchSet.Id psi = doReplace(req);
if (psi != null) {
closeChange(req.cmd, psi, req.newCommit);
closeProgress.update(1);
}
}
@@ -2026,12 +2056,13 @@ public class ReceiveCommits {
return new RevId(src.getId().name());
}
private static void reject(final ReceiveCommand cmd) {
private void reject(final ReceiveCommand cmd) {
reject(cmd, "prohibited by Gerrit");
}
static void reject(final ReceiveCommand cmd, final String why) {
void reject(final ReceiveCommand cmd, final String why) {
cmd.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON, why);
commandProgress.update(1);
}
private static boolean isHead(final Ref ref) {