diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java index c991cb08db..1650043af9 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java @@ -15,9 +15,10 @@ package com.google.gerrit.server.git; import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.server.git.ReceiveCommits.MessageSender; +import com.google.gerrit.server.git.WorkQueue.Executor; import com.google.gerrit.server.project.ProjectControl; import com.google.gerrit.server.util.RequestScopePropagator; -import com.google.gerrit.server.git.WorkQueue.Executor; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.FactoryModuleBuilder; import com.google.inject.Inject; @@ -30,19 +31,15 @@ import org.eclipse.jgit.transport.ReceivePack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.OutputStream; import java.util.Collection; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** Hook that delegates to {@link ReceiveCommits} in a worker thread. */ public class AsyncReceiveCommits implements PreReceiveHook { private static final Logger log = LoggerFactory.getLogger(AsyncReceiveCommits.class); - private final ReceiveCommits rc; - private final Executor executor; - private final RequestScopePropagator scopePropagator; - public interface Factory { AsyncReceiveCommits create(ProjectControl projectControl, Repository repository); @@ -70,7 +67,7 @@ public class AsyncReceiveCommits implements PreReceiveHook { @Override public void run() { - rc.processCommands(commands); + rc.processCommands(commands, progress); } @Override @@ -94,6 +91,35 @@ public class AsyncReceiveCommits implements PreReceiveHook { } } + private class MessageSenderOutputStream extends OutputStream { + private final MessageSender messageSender = rc.getMessageSender(); + + @Override + public void write(int b) { + messageSender.sendBytes(new byte[]{(byte)b}); + } + + @Override + public void write(byte[] what, int off, int len) { + messageSender.sendBytes(what, off, len); + } + + @Override + public void write(byte[] what) { + messageSender.sendBytes(what); + } + + @Override + public void flush() { + messageSender.flush(); + } + } + + private final ReceiveCommits rc; + private final Executor executor; + private final RequestScopePropagator scopePropagator; + private final MultiProgressMonitor progress; + @Inject AsyncReceiveCommits(final ReceiveCommits.Factory factory, @ReceiveCommitsExecutor final Executor executor, @@ -104,29 +130,25 @@ public class AsyncReceiveCommits implements PreReceiveHook { this.scopePropagator = scopePropagator; rc = factory.create(projectControl, repo); rc.getReceivePack().setPreReceiveHook(this); + + progress = new MultiProgressMonitor( + new MessageSenderOutputStream(), "Updating changes"); } @Override public void onPreReceive(final ReceivePack rp, final Collection commands) { - Future workerFuture = executor.submit( - scopePropagator.wrap(new Worker(commands))); - Exception err = null; try { - workerFuture.get(); + progress.waitFor(executor.submit( + scopePropagator.wrap(new Worker(commands)))); } catch (ExecutionException e) { - err = e; - } catch (InterruptedException e) { - err = e; - } - if (err != null) { - log.warn("Error in ReceiveCommits", err); + log.warn("Error in ReceiveCommits", e); rc.getMessageSender().sendError("internal error while processing changes"); // ReceiveCommits has tried its best to catch errors, so anything at this // point is very bad. for (final ReceiveCommand c : commands) { if (c.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) { - ReceiveCommits.reject(c, "internal error"); + rc.reject(c, "internal error"); } } } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/MultiProgressMonitor.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/MultiProgressMonitor.java new file mode 100644 index 0000000000..d6dd38ac9e --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/MultiProgressMonitor.java @@ -0,0 +1,284 @@ +// Copyright (C) 2012 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.git; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import org.eclipse.jgit.lib.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.List; + +/** + * Progress reporting interface that multiplexes multiple sub-tasks. + *

+ * Output is of the format: + *

+ *   Task: subA: 1, subB: 75% (3/4) (-)\r
+ *   Task: subA: 2, subB: 75% (3/4), subC: 1 (\)\r
+ *   Task: subA: 2, subB: 100% (4/4), subC: 1 (|)\r
+ *   Task: subA: 4, subB: 100% (4/4), subC: 4, done    \n
+ * 
+ *

+ * Callers should try to keep task and sub-task descriptions short, since the + * output should fit on one terminal line. (Note that git clients do not accept + * terminal control characters, so true multi-line progress messages would be + * impossible.) + */ +public class MultiProgressMonitor { + private static final Logger log = + LoggerFactory.getLogger(MultiProgressMonitor.class); + + /** Constant indicating the total work units cannot be predicted. */ + public static final int UNKNOWN = 0; + + private static final char[] SPINNER_STATES = new char[]{'-', '\\', '|', '/'}; + private static final char NO_SPINNER = ' '; + + /** Handle for a sub-task. */ + public class Task { + private final String name; + private final int total; + private volatile int count; + private int lastPercent; + + Task(final String subTaskName, final int totalWork) { + this.name = subTaskName; + this.total = totalWork; + } + + /** + * Indicate that work has been completed on this sub-task. + *

+ * Must be called from the worker thread. + * + * @param completed number of work units completed. + */ + public void update(final int completed) { + count += completed; + if (total != UNKNOWN) { + int percent = count * 100 / total; + if (percent > lastPercent) { + lastPercent = percent; + wakeUp(); + } + } + } + + /** + * Indicate that this sub-task is finished. + *

+ * Must be called from the worker thread. + */ + public void end() { + if (total == UNKNOWN && count > 0) { + wakeUp(); + } + } + } + + private final OutputStream out; + private final String taskName; + private final List tasks = new CopyOnWriteArrayList(); + private int spinnerIndex; + private char spinnerState = NO_SPINNER; + private boolean done; + private boolean write = true; + + private final long maxIntervalNanos; + + /** + * Create a new progress monitor for multiple sub-tasks. + * + * @param out stream for writing progress messages. + * @param taskName name of the overall task. + */ + public MultiProgressMonitor(final OutputStream out, final String taskName) { + this(out, taskName, 500, TimeUnit.MILLISECONDS); + } + + /** + * Create a new progress monitor for multiple sub-tasks. + * + * @param out stream for writing progress messages. + * @param taskName name of the overall task. + * @param maxIntervalTime maximum interval between progress messages. + * @param maxIntervalUnit time unit for progress interval. + */ + public MultiProgressMonitor(final OutputStream out, final String taskName, + long maxIntervalTime, TimeUnit maxIntervalUnit) { + this.out = out; + this.taskName = taskName; + maxIntervalNanos = NANOSECONDS.convert(maxIntervalTime, maxIntervalUnit); + } + + /** + * Wait for a task managed by a {@link Future}. + *

+ * Must be called from the main thread, not the worker thread. Once + * the worker thread calls {@link #end()}, the future has an additional + * maxInterval to finish before it is forcefully cancelled and + * {@link ExecutionException} is thrown. + * + * @param workerFuture a future that returns when the worker thread is + * finished. + * + * @throws ExecutionException if this thread or the worker thread was + * interrupted, the worker was cancelled, or the worker timed out. + */ + public void waitFor(Future workerFuture) throws ExecutionException { + synchronized (this) { + long left = maxIntervalNanos; + while (!done) { + long start = System.nanoTime(); + try { + NANOSECONDS.timedWait(this, left); + } catch (InterruptedException e) { + throw new ExecutionException(e); + } + + // Send an update on every wakeup (manual or spurious), but only move + // the spinner every maxInterval. + left -= System.nanoTime() - start; + if (left <= 0) { + moveSpinner(); + left = maxIntervalNanos; + } + sendUpdate(); + if (!done && workerFuture.isDone()) { + // The worker may not have called end() explicitly, which is likely a + // programming error. + log.warn("MultiProgressMonitor worker did not call end()" + + " before returning"); + end(); + } + } + sendDone(); + } + + // The loop exits as soon as the worker calls end(), but we give it another + // maxInterval to finish up and return. + try { + workerFuture.get(maxIntervalNanos, NANOSECONDS); + } catch (InterruptedException e) { + throw new ExecutionException(e); + } catch (CancellationException e) { + throw new ExecutionException(e); + } catch (TimeoutException e) { + workerFuture.cancel(true); + throw new ExecutionException(e); + } + } + + private synchronized void wakeUp() { + notifyAll(); + } + + /** + * Begin a sub-task. + * + * @param subTask sub-task name. + * @param subTaskWork total work units in sub-task, or {@link #UNKNOWN}. + * @return sub-task handle. + */ + public Task beginSubTask(final String subTask, final int subTaskWork) { + Task task = new Task(subTask, subTaskWork); + tasks.add(task); + return task; + } + + /** + * End the overall task. + *

+ * Must be called from the worker thread. + */ + public synchronized void end() { + done = true; + wakeUp(); + } + + private void sendDone() { + spinnerState = NO_SPINNER; + StringBuilder s = format(); + s.append(", done \n"); + send(s); + } + + private void moveSpinner() { + spinnerIndex = (spinnerIndex + 1) % SPINNER_STATES.length; + spinnerState = SPINNER_STATES[spinnerIndex]; + } + + private void sendUpdate() { + send(format()); + } + + private StringBuilder format() { + StringBuilder s = new StringBuilder().append("\r").append(taskName) + .append(':'); + + if (!tasks.isEmpty()) { + boolean first = true; + for (Task t : tasks) { + int count = t.count; + if (count == 0) { + continue; + } + + if (!first) { + s.append(','); + } else { + first = false; + } + + s.append(' ').append(t.name).append(": "); + if (t.total == UNKNOWN) { + s.append(count); + } else { + s.append(String.format("%d%% (%d/%d)", + count * 100 / t.total, + count, t.total)); + } + } + } + + if (spinnerState != NO_SPINNER) { + // Don't output a spinner until the alarm fires for the first time. + s.append(" (").append(spinnerState).append(')'); + } + return s; + } + + private void send(StringBuilder s) { + if (write) { + try { + out.write(Constants.encode(s.toString())); + out.flush(); + } catch (IOException e) { + write = false; + } + } + } +} diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java index aa19c1e264..544673d0f0 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java @@ -14,6 +14,8 @@ package com.google.gerrit.server.git; +import static com.google.gerrit.server.git.MultiProgressMonitor.UNKNOWN; + import com.google.gerrit.common.ChangeHooks; import com.google.gerrit.common.PageLinks; import com.google.gerrit.common.data.ApprovalType; @@ -40,6 +42,7 @@ import com.google.gerrit.server.IdentifiedUser; import com.google.gerrit.server.account.AccountResolver; import com.google.gerrit.server.config.CanonicalWebUrl; import com.google.gerrit.server.config.TrackingFooters; +import com.google.gerrit.server.git.MultiProgressMonitor.Task; import com.google.gerrit.server.mail.CreateChangeSender; import com.google.gerrit.server.mail.EmailException; import com.google.gerrit.server.mail.MergedSender; @@ -119,6 +122,7 @@ public class ReceiveCommits { void sendError(String what); void sendBytes(byte[] what); void sendBytes(byte[] what, int off, int len); + void flush(); } private class ReceivePackMessageSender implements MessageSender { @@ -134,11 +138,7 @@ public class ReceiveCommits { @Override public void sendBytes(byte[] what) { - try { - rp.getMessageOutputStream().write(what); - } catch (IOException e) { - // Ignore write failures (matching JGit behavior). - } + sendBytes(what, 0, what.length); } @Override @@ -149,6 +149,15 @@ public class ReceiveCommits { // Ignore write failures (matching JGit behavior). } } + + @Override + public void flush() { + try { + rp.getMessageOutputStream().flush(); + } catch (IOException e) { + // Ignore write failures (matching JGit behavior). + } + } } private static class Message { @@ -206,6 +215,10 @@ public class ReceiveCommits { private final SubmoduleOp.Factory subOpFactory; private final List messages = new ArrayList(); + private Task newProgress; + private Task replaceProgress; + private Task closeProgress; + private Task commandProgress; private MessageSender messageSender; @Inject @@ -406,14 +419,23 @@ public class ReceiveCommits { } } - void processCommands(final Collection commands) { + void processCommands(final Collection commands, + final MultiProgressMonitor progress) { try { + newProgress = progress.beginSubTask("new", UNKNOWN); + replaceProgress = progress.beginSubTask("updated", UNKNOWN); + closeProgress = progress.beginSubTask("closed", UNKNOWN); + commandProgress = progress.beginSubTask("refs", commands.size()); + parseCommands(commands); if (newChange != null && newChange.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) { createNewChanges(); } + newProgress.end(); + doReplaces(); + replaceProgress.end(); for (final ReceiveCommand c : commands) { if (c.getResult() == Result.OK) { @@ -456,8 +478,12 @@ public class ReceiveCommits { Branch.NameKey destBranch = new Branch.NameKey(project.getNameKey(), c.getRefName()); hooks.doRefUpdatedHook(destBranch, c.getOldId(), c.getNewId(), currentUser.getAccount()); } + commandProgress.update(1); } } + closeProgress.end(); + commandProgress.end(); + progress.end(); if (!allNewChanges.isEmpty() && canonicalWebUrl != null) { final String url = canonicalWebUrl; @@ -538,7 +564,7 @@ public class ReceiveCommits { continue; } - if (cmd.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED){ + if (cmd.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) { continue; } @@ -983,6 +1009,7 @@ public class ReceiveCommits { reject(newChange, "database error"); return; } + newProgress.update(1); } newChange.setResult(ReceiveCommand.Result.OK); } @@ -1124,6 +1151,7 @@ public class ReceiveCommits { for (final ReplaceRequest request : replaceByChange.values()) { try { doReplace(request); + replaceProgress.update(1); } catch (IOException err) { log.error("Error computing replacement patch for change " + request.ontoChange + ", commit " + request.newCommit.name(), err); @@ -1841,6 +1869,7 @@ public class ReceiveCommits { if (ref != null) { rw.parseBody(c); closeChange(cmd, PatchSet.Id.fromRef(ref.getName()), c); + closeProgress.update(1); continue; } @@ -1858,6 +1887,7 @@ public class ReceiveCommits { final PatchSet.Id psi = doReplace(req); if (psi != null) { closeChange(req.cmd, psi, req.newCommit); + closeProgress.update(1); } } @@ -2026,12 +2056,13 @@ public class ReceiveCommits { return new RevId(src.getId().name()); } - private static void reject(final ReceiveCommand cmd) { + private void reject(final ReceiveCommand cmd) { reject(cmd, "prohibited by Gerrit"); } - static void reject(final ReceiveCommand cmd, final String why) { + void reject(final ReceiveCommand cmd, final String why) { cmd.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON, why); + commandProgress.update(1); } private static boolean isHead(final Ref ref) {