Merge "Refactor AsyncReceiveCommits and ReceiveCommits"

This commit is contained in:
Patrick Hiesel
2020-04-24 15:05:43 +00:00
committed by Gerrit Code Review
7 changed files with 341 additions and 283 deletions

View File

@@ -18,6 +18,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.base.Strings;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@@ -164,8 +165,12 @@ public class MultiProgressMonitor {
*
* @see #waitFor(Future, long, TimeUnit)
*/
public void waitFor(Future<?> workerFuture) throws ExecutionException {
waitFor(workerFuture, 0, null);
public <T> T waitFor(Future<T> workerFuture) {
try {
return waitFor(workerFuture, 0, null);
} catch (TimeoutException e) {
throw new IllegalStateException("timout exception without setting a timeout", e);
}
}
/**
@@ -182,11 +187,10 @@ public class MultiProgressMonitor {
* @throws ExecutionException if this thread or a worker thread was interrupted, the worker was
* cancelled, or timed out waiting for a worker to call {@link #end()}.
*/
public void waitFor(Future<?> workerFuture, long timeoutTime, TimeUnit timeoutUnit)
throws ExecutionException {
public <T> T waitFor(Future<T> workerFuture, long timeoutTime, TimeUnit timeoutUnit)
throws TimeoutException {
long overallStart = System.nanoTime();
long deadline;
String detailMessage = "";
if (timeoutTime > 0) {
deadline = overallStart + NANOSECONDS.convert(timeoutTime, timeoutUnit);
} else {
@@ -200,7 +204,7 @@ public class MultiProgressMonitor {
try {
NANOSECONDS.timedWait(this, left);
} catch (InterruptedException e) {
throw new ExecutionException(e);
throw new UncheckedExecutionException(e);
}
// Send an update on every wakeup (manual or spurious), but only move
@@ -210,13 +214,10 @@ public class MultiProgressMonitor {
if (deadline > 0 && now > deadline) {
workerFuture.cancel(true);
if (workerFuture.isCancelled()) {
detailMessage =
String.format(
"(timeout %sms, cancelled)",
TimeUnit.MILLISECONDS.convert(now - deadline, NANOSECONDS));
logger.atWarning().log(
"MultiProgressMonitor worker killed after %sms: %s",
TimeUnit.MILLISECONDS.convert(now - overallStart, NANOSECONDS), detailMessage);
"MultiProgressMonitor worker killed after %sms: (timeout %sms, cancelled)",
TimeUnit.MILLISECONDS.convert(now - overallStart, NANOSECONDS),
TimeUnit.MILLISECONDS.convert(now - deadline, NANOSECONDS));
}
break;
}
@@ -240,14 +241,15 @@ public class MultiProgressMonitor {
// The loop exits as soon as the worker calls end(), but we give it another
// maxInterval to finish up and return.
try {
workerFuture.get(maxIntervalNanos, NANOSECONDS);
} catch (InterruptedException e) {
throw new ExecutionException(e);
} catch (CancellationException e) {
throw new ExecutionException(detailMessage, e);
return workerFuture.get(maxIntervalNanos, NANOSECONDS);
} catch (InterruptedException | CancellationException e) {
logger.atWarning().withCause(e).log("unable to finish processing");
throw new UncheckedExecutionException(e);
} catch (TimeoutException e) {
workerFuture.cancel(true);
throw new ExecutionException(e);
throw e;
} catch (ExecutionException e) {
throw new UncheckedExecutionException(e);
}
}

View File

@@ -13,13 +13,70 @@
// limitations under the License.
package com.google.gerrit.server.git;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.Project;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/** Used to retrieve the project name from an operation * */
public interface ProjectRunnable extends Runnable {
Project.NameKey getProjectNameKey();
@Nullable
String getRemoteName();
boolean hasCustomizedPrint();
/**
* Wraps the callable as a {@link FutureTask} and makes it comply with the {@link ProjectRunnable}
* interface.
*/
static <T> FutureTask<T> fromCallable(
Callable<T> callable,
Project.NameKey projectName,
String operationName,
@Nullable String remoteHostname,
boolean hasCustomPrint) {
return new FromCallable<>(callable, projectName, operationName, remoteHostname, hasCustomPrint);
}
class FromCallable<T> extends FutureTask<T> implements ProjectRunnable {
private final Project.NameKey project;
private final String operationName;
private final String remoteHostname;
private final boolean hasCustomPrint;
FromCallable(
Callable<T> callable,
Project.NameKey project,
String operationName,
@Nullable String remoteHostname,
boolean hasCustomPrint) {
super(callable);
this.project = project;
this.operationName = operationName;
this.remoteHostname = remoteHostname;
this.hasCustomPrint = hasCustomPrint;
}
@Override
public Project.NameKey getProjectNameKey() {
return project;
}
@Override
public String getRemoteName() {
return remoteHostname;
}
@Override
public boolean hasCustomizedPrint() {
return hasCustomPrint;
}
@Override
public String toString() {
return operationName;
}
}
}

View File

@@ -14,10 +14,12 @@
package com.google.gerrit.server.git.receive;
import static com.google.common.base.Preconditions.checkState;
import static com.google.gerrit.server.quota.QuotaGroupDefinitions.REPOSITORY_SIZE_GROUP;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.flogger.FluentLogger;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.common.UsedAt;
import com.google.gerrit.common.data.Capable;
@@ -66,10 +68,13 @@ import com.google.inject.name.Named;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.PreReceiveHook;
@@ -84,7 +89,7 @@ import org.eclipse.jgit.transport.ReceivePack;
* of time, it runs in the background so it can be monitored for timeouts and cancelled, and have
* stalls reported to the user from the main thread.
*/
public class AsyncReceiveCommits implements PreReceiveHook {
public class AsyncReceiveCommits {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String TIMEOUT_NAME = "ReceiveCommitsOverallTimeout";
@@ -119,74 +124,30 @@ public class AsyncReceiveCommits implements PreReceiveHook {
}
}
private class Worker implements ProjectRunnable {
final MultiProgressMonitor progress;
final String name;
private static MultiProgressMonitor newMultiProgressMonitor(MessageSender messageSender) {
return new MultiProgressMonitor(
new OutputStream() {
@Override
public void write(int b) {
messageSender.sendBytes(new byte[] {(byte) b});
}
private final Collection<ReceiveCommand> commands;
@Override
public void write(byte[] what, int off, int len) {
messageSender.sendBytes(what, off, len);
}
private Worker(Collection<ReceiveCommand> commands, String name) {
this.commands = commands;
this.name = name;
progress = new MultiProgressMonitor(new MessageSenderOutputStream(), "Processing changes");
}
@Override
public void write(byte[] what) {
messageSender.sendBytes(what);
}
@Override
public void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(oldName + "-for-" + name);
try {
receiveCommits.processCommands(commands, progress);
} finally {
Thread.currentThread().setName(oldName);
}
}
@Override
public Project.NameKey getProjectNameKey() {
return receiveCommits.getProject().getNameKey();
}
@Override
public String getRemoteName() {
return null;
}
@Override
public boolean hasCustomizedPrint() {
return true;
}
@Override
public String toString() {
return "receive-commits";
}
void sendMessages() {
receiveCommits.sendMessages();
}
private class MessageSenderOutputStream extends OutputStream {
@Override
public void write(int b) {
receiveCommits.getMessageSender().sendBytes(new byte[] {(byte) b});
}
@Override
public void write(byte[] what, int off, int len) {
receiveCommits.getMessageSender().sendBytes(what, off, len);
}
@Override
public void write(byte[] what) {
receiveCommits.getMessageSender().sendBytes(what);
}
@Override
public void flush() {
receiveCommits.getMessageSender().flush();
}
}
@Override
public void flush() {
messageSender.flush();
}
},
"Processing changes");
}
private enum PushType {
@@ -245,7 +206,6 @@ public class AsyncReceiveCommits implements PreReceiveHook {
private final Metrics metrics;
private final ReceiveCommits receiveCommits;
private final ResultChangeIds resultChangeIds;
private final PermissionBackend.ForProject perm;
private final ReceivePack receivePack;
private final ExecutorService executor;
@@ -303,7 +263,7 @@ public class AsyncReceiveCommits implements PreReceiveHook {
receivePack.setCheckReceivedObjects(projectState.getConfig().getCheckReceivedObjects());
receivePack.setRefFilter(new ReceiveRefFilter());
receivePack.setAllowPushOptions(true);
receivePack.setPreReceiveHook(this);
receivePack.setPreReceiveHook(asHook());
receivePack.setPostReceiveHook(lazyPostReceive.create(user, projectName));
try {
@@ -323,10 +283,8 @@ public class AsyncReceiveCommits implements PreReceiveHook {
queryProvider,
projectName,
user.getAccountId()));
resultChangeIds = new ResultChangeIds();
receiveCommits =
factory.create(
projectState, user, receivePack, repo, allRefsWatcher, messageSender, resultChangeIds);
factory.create(projectState, user, receivePack, repo, allRefsWatcher, messageSender);
receiveCommits.init();
QuotaResponse.Aggregated availableTokens =
quotaBackend.user(user).project(projectName).availableTokens(REPOSITORY_SIZE_GROUP);
@@ -361,48 +319,90 @@ public class AsyncReceiveCommits implements PreReceiveHook {
return Capable.OK;
}
@Override
public void onPreReceive(ReceivePack rp, Collection<ReceiveCommand> commands) {
/**
* Returns a {@link PreReceiveHook} implementation that can be used directly by JGit when
* processing a push.
*/
public PreReceiveHook asHook() {
return (rp, commands) -> {
checkState(receivePack == rp, "can't perform PreReceive for a different receive pack");
long startNanos = System.nanoTime();
ReceiveCommitsResult result;
try {
result = preReceive(commands);
} catch (TimeoutException e) {
metrics.timeouts.increment();
logger.atWarning().withCause(e).log(
"Timeout in ReceiveCommits while processing changes for project %s",
projectState.getName());
receivePack.sendError("timeout while processing changes");
rejectCommandsNotAttempted(commands);
return;
} catch (Exception e) {
logger.atSevere().withCause(e.getCause()).log("error while processing push");
receivePack.sendError("internal error");
rejectCommandsNotAttempted(commands);
return;
} finally {
// Flush the messages queued up until now (if any).
receiveCommits.sendMessages();
}
reportMetrics(result, System.nanoTime() - startNanos);
};
}
/** Processes {@code commands}, applies them to Git storage and communicates back on the wire. */
@UsedAt(UsedAt.Project.GOOGLE)
public ReceiveCommitsResult preReceive(Collection<ReceiveCommand> commands)
throws TimeoutException, UncheckedExecutionException {
if (commands.stream().anyMatch(c -> c.getResult() != Result.NOT_ATTEMPTED)) {
// Stop processing when command was already processed by previously invoked
// pre-receive hooks
return;
return ReceiveCommitsResult.empty();
}
long startNanos = System.nanoTime();
Worker w = new Worker(commands, Thread.currentThread().getName());
MultiProgressMonitor monitor = newMultiProgressMonitor(receiveCommits.getMessageSender());
Callable<ReceiveCommitsResult> callable =
() -> {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(oldName + "-for-" + Thread.currentThread().getName());
try {
return receiveCommits.processCommands(commands, monitor);
} finally {
Thread.currentThread().setName(oldName);
}
};
try {
w.progress.waitFor(
executor.submit(scopePropagator.wrap(w)), timeoutMillis, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
metrics.timeouts.increment();
logger.atWarning().withCause(e).log(
"Error in ReceiveCommits while processing changes for project %s",
projectState.getName());
rp.sendError("internal error while processing changes");
// ReceiveCommits has tried its best to catch errors, so anything at this
// point is very bad.
for (ReceiveCommand c : commands) {
if (c.getResult() == Result.NOT_ATTEMPTED) {
c.setResult(Result.REJECTED_OTHER_REASON, "internal error");
}
// WorkQueue does not support Callable<T>, so we have to covert it here.
FutureTask<ReceiveCommitsResult> runnable =
ProjectRunnable.fromCallable(
callable, receiveCommits.getProject().getNameKey(), "receive-commits", null, false);
monitor.waitFor(
executor.submit(scopePropagator.wrap(runnable)), timeoutMillis, TimeUnit.MILLISECONDS);
if (!runnable.isDone()) {
// At this point we are either done or have thrown a TimeoutException and bailed out.
throw new IllegalStateException("unable to get receive commits result");
}
} finally {
w.sendMessages();
return runnable.get();
} catch (InterruptedException | ExecutionException e) {
throw new UncheckedExecutionException(e);
}
}
long deltaNanos = System.nanoTime() - startNanos;
int totalChanges = 0;
@UsedAt(UsedAt.Project.GOOGLE)
public void reportMetrics(ReceiveCommitsResult result, long deltaNanos) {
PushType pushType;
if (resultChangeIds.isMagicPush()) {
int totalChanges = 0;
if (result.magicPush()) {
pushType = PushType.CREATE_REPLACE;
List<Change.Id> created = resultChangeIds.get(ResultChangeIds.Key.CREATED);
List<Change.Id> replaced = resultChangeIds.get(ResultChangeIds.Key.REPLACED);
Set<Change.Id> created = result.changes().get(ReceiveCommitsResult.ChangeStatus.CREATED);
Set<Change.Id> replaced = result.changes().get(ReceiveCommitsResult.ChangeStatus.REPLACED);
metrics.changes.record(pushType, created.size() + replaced.size());
totalChanges = replaced.size() + created.size();
} else {
List<Change.Id> autoclosed = resultChangeIds.get(ResultChangeIds.Key.AUTOCLOSED);
Set<Change.Id> autoclosed =
result.changes().get(ReceiveCommitsResult.ChangeStatus.AUTOCLOSED);
if (!autoclosed.isEmpty()) {
pushType = PushType.AUTOCLOSE;
metrics.changes.record(pushType, autoclosed.size());
@@ -411,21 +411,25 @@ public class AsyncReceiveCommits implements PreReceiveHook {
pushType = PushType.NORMAL;
}
}
if (totalChanges > 0) {
metrics.latencyPerChange.record(pushType, deltaNanos / totalChanges, NANOSECONDS);
}
metrics.latencyPerPush.record(pushType, deltaNanos, NANOSECONDS);
}
/** Returns the Change.Ids that were processed in onPreReceive */
@UsedAt(UsedAt.Project.GOOGLE)
public ResultChangeIds getResultChangeIds() {
return resultChangeIds;
}
public ReceivePack getReceivePack() {
return receivePack;
}
/**
* Marks all commands that were not processed yet as {@link Result#REJECTED_OTHER_REASON}.
* Intended to be used to finish up remaining commands when errors occur during processing.
*/
private static void rejectCommandsNotAttempted(Collection<ReceiveCommand> commands) {
for (ReceiveCommand c : commands) {
if (c.getResult() == Result.NOT_ATTEMPTED) {
c.setResult(Result.REJECTED_OTHER_REASON, "internal error");
}
}
}
}

View File

@@ -226,7 +226,6 @@ import org.eclipse.jgit.revwalk.RevSort;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.revwalk.filter.RevFilter;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.eclipse.jgit.transport.ReceiveCommand.Result;
import org.eclipse.jgit.transport.ReceivePack;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.Option;
@@ -255,8 +254,7 @@ class ReceiveCommits {
ReceivePack receivePack,
Repository repository,
AllRefsWatcher allRefsWatcher,
MessageSender messageSender,
ResultChangeIds resultChangeIds);
MessageSender messageSender);
}
private class ReceivePackMessageSender implements MessageSender {
@@ -379,9 +377,12 @@ class ReceiveCommits {
private Optional<String> tracePushOption;
private MessageSender messageSender;
private ResultChangeIds resultChangeIds;
private ReceiveCommitsResult.Builder result;
private ImmutableMap<String, String> loggingTags;
/** This object is for single use only. */
private boolean used;
@Inject
ReceiveCommits(
AccountResolver accountResolver,
@@ -428,8 +429,7 @@ class ReceiveCommits {
@Assisted ReceivePack rp,
@Assisted Repository repository,
@Assisted AllRefsWatcher allRefsWatcher,
@Nullable @Assisted MessageSender messageSender,
@Assisted ResultChangeIds resultChangeIds)
@Nullable @Assisted MessageSender messageSender)
throws IOException {
// Injected fields.
this.accountResolver = accountResolver;
@@ -493,6 +493,8 @@ class ReceiveCommits {
replaceByChange = new LinkedHashMap<>();
updateGroups = new ArrayList<>();
used = false;
this.allowProjectOwnersToChangeParent =
config.getBoolean("receive", "allowProjectOwnersToChangeParent", false);
@@ -502,7 +504,7 @@ class ReceiveCommits {
// Handles for outputting back over the wire to the end user.
this.messageSender = messageSender != null ? messageSender : new ReceivePackMessageSender();
this.resultChangeIds = resultChangeIds;
this.result = ReceiveCommitsResult.builder();
this.loggingTags = ImmutableMap.of();
// TODO(hiesel): Make this decision implicit once vetted
@@ -565,7 +567,9 @@ class ReceiveCommits {
}
}
void processCommands(Collection<ReceiveCommand> commands, MultiProgressMonitor progress) {
ReceiveCommitsResult processCommands(
Collection<ReceiveCommand> commands, MultiProgressMonitor progress) throws StorageException {
checkState(!used, "Tried to re-use a ReceiveCommits objects that is single-use only");
parsePushOptions();
int commandCount = commands.size();
try (TraceContext traceContext =
@@ -604,6 +608,7 @@ class ReceiveCommits {
loggingTags = traceContext.getTags();
logger.atFine().log("Processing commands done.");
}
return result.build();
}
// Process as many commands as possible, but may leave some commands in state NOT_ATTEMPTED.
@@ -666,9 +671,7 @@ class ReceiveCommits {
try {
newChanges = selectNewAndReplacedChangesFromMagicBranch(newProgress);
} catch (IOException e) {
logger.atSevere().withCause(e).log(
"Failed to select new changes in %s", project.getName());
return;
throw new StorageException("Failed to select new changes in " + project.getName(), e);
}
}
@@ -704,7 +707,7 @@ class ReceiveCommits {
throws PermissionBackendException, IOException, NoSuchProjectException {
try (TraceTimer traceTimer =
newTimer("handleRegularCommands", Metadata.builder().resourceCount(cmds.size()))) {
resultChangeIds.setMagicPush(false);
result.magicPush(false);
for (ReceiveCommand cmd : cmds) {
parseRegularCommand(cmd);
}
@@ -728,8 +731,7 @@ class ReceiveCommits {
logger.atFine().log("Added %d additional ref updates", added);
bu.execute();
} catch (UpdateException | RestApiException e) {
rejectRemaining(cmds, INTERNAL_SERVER_ERROR);
logger.atFine().withCause(e).log("update failed:");
throw new StorageException(e);
}
Set<BranchNameKey> branches = new HashSet<>();
@@ -948,9 +950,12 @@ class ReceiveCommits {
}
replaceByChange.values().stream()
.forEach(req -> resultChangeIds.add(ResultChangeIds.Key.REPLACED, req.ontoChange));
.forEach(
req ->
result.addChange(ReceiveCommitsResult.ChangeStatus.REPLACED, req.ontoChange));
newChanges.stream()
.forEach(req -> resultChangeIds.add(ResultChangeIds.Key.CREATED, req.changeId));
.forEach(
req -> result.addChange(ReceiveCommitsResult.ChangeStatus.CREATED, req.changeId));
if (magicBranchCmd != null) {
magicBranchCmd.setResult(OK);
@@ -975,9 +980,7 @@ class ReceiveCommits {
logger.atFine().withCause(e).log("Rejecting due to client error");
reject(magicBranchCmd, e.getMessage());
} catch (RestApiException | IOException e) {
logger.atSevere().withCause(e).log(
"Can't insert change/patch set for %s", project.getName());
reject(magicBranchCmd, String.format("%s: %s", INTERNAL_SERVER_ERROR, e.getMessage()));
throw new StorageException("Can't insert change/patch set for " + project.getName(), e);
}
if (magicBranch != null && magicBranch.submit) {
@@ -1291,11 +1294,11 @@ class ReceiveCommits {
RevObject obj;
try {
obj = receivePack.getRevWalk().parseAny(cmd.getNewId());
} catch (IOException err) {
logger.atSevere().withCause(err).log(
"Invalid object %s for %s creation", cmd.getNewId().name(), cmd.getRefName());
reject(cmd, "invalid object");
return;
} catch (IOException e) {
throw new StorageException(
String.format(
"Invalid object %s for %s creation", cmd.getNewId().name(), cmd.getRefName()),
e);
}
logger.atFine().log("Creating %s", cmd);
@@ -1347,11 +1350,11 @@ class ReceiveCommits {
RevObject obj;
try {
obj = receivePack.getRevWalk().parseAny(cmd.getNewId());
} catch (IOException err) {
logger.atSevere().withCause(err).log(
"Invalid object %s for %s", cmd.getNewId().name(), cmd.getRefName());
reject(cmd, "invalid object");
return false;
} catch (IOException e) {
throw new StorageException(
String.format(
"Invalid object %s for %s creation", cmd.getNewId().name(), cmd.getRefName()),
e);
}
if (obj instanceof RevCommit) {
@@ -1385,11 +1388,11 @@ class ReceiveCommits {
try (TraceTimer traceTimer = newTimer("parseRewind")) {
try {
receivePack.getRevWalk().parseCommit(cmd.getNewId());
} catch (IOException err) {
logger.atSevere().withCause(err).log(
"Invalid object %s for %s forced update", cmd.getNewId().name(), cmd.getRefName());
reject(cmd, "invalid object");
return;
} catch (IOException e) {
throw new StorageException(
String.format(
"Invalid object %s for %s creation", cmd.getNewId().name(), cmd.getRefName()),
e);
}
logger.atFine().log("Rewinding %s", cmd);
@@ -1903,10 +1906,8 @@ class ReceiveCommits {
reject(cmd, "base not found");
return;
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"Project %s cannot read %s", project.getName(), id.name());
reject(cmd, INTERNAL_SERVER_ERROR);
return;
throw new StorageException(
String.format("Project %s cannot read %s", project.getName(), id.name()), e);
}
}
} else if (newChangeForAllNotInTarget) {
@@ -1929,16 +1930,14 @@ class ReceiveCommits {
}
}
}
} catch (IOException ex) {
logger.atWarning().withCause(ex).log(
"Error walking to %s in project %s", destBranch, project.getName());
reject(cmd, INTERNAL_SERVER_ERROR);
return;
} catch (IOException e) {
throw new StorageException(
String.format("Error walking to %s in project %s", destBranch, project.getName()), e);
}
if (validateConnected(magicBranch.cmd, magicBranch.dest, tip)) {
this.magicBranch = magicBranch;
this.resultChangeIds.setMagicPush(true);
this.result.magicPush(true);
}
}
}
@@ -1994,8 +1993,7 @@ class ReceiveCommits {
logger.atFine().log("HEAD = %s", head);
return head;
} catch (IOException e) {
logger.atSevere().withCause(e).log("Cannot read HEAD symref");
return null;
throw new StorageException("Cannot read HEAD symref", e);
}
}
@@ -2061,7 +2059,7 @@ class ReceiveCommits {
try {
receivePack.getRevWalk().parseBody(create.commit);
} catch (IOException e) {
continue;
throw new StorageException("Can't parse commit", e);
}
List<String> idList = create.commit.getFooterLines(FooterConstants.CHANGE_ID);
@@ -2306,14 +2304,7 @@ class ReceiveCommits {
} catch (IOException e) {
// Should never happen, the core receive process would have
// identified the missing object earlier before we got control.
//
magicBranch.cmd.setResult(REJECTED_MISSING_OBJECT);
logger.atSevere().withCause(e).log("Invalid pack upload; one or more objects weren't sent");
return Collections.emptyList();
} catch (StorageException e) {
logger.atSevere().withCause(e).log("Cannot query database to locate prior changes");
reject(magicBranch.cmd, "database error");
return Collections.emptyList();
throw new StorageException("Invalid pack upload; one or more objects weren't sent", e);
}
if (newChanges.isEmpty() && replaceByChange.isEmpty()) {
@@ -2325,25 +2316,20 @@ class ReceiveCommits {
return newChanges;
}
try {
SortedSetMultimap<ObjectId, String> groups = groupCollector.getGroups();
List<Integer> newIds = seq.nextChangeIds(newChanges.size());
for (int i = 0; i < newChanges.size(); i++) {
CreateRequest create = newChanges.get(i);
create.setChangeId(newIds.get(i));
create.groups = ImmutableList.copyOf(groups.get(create.commit));
}
for (ReplaceRequest replace : replaceByChange.values()) {
replace.groups = ImmutableList.copyOf(groups.get(replace.newCommitId));
}
for (UpdateGroupsRequest update : updateGroups) {
update.groups = ImmutableList.copyOf((groups.get(update.commit)));
}
logger.atFine().log("Finished updating groups from GroupCollector");
} catch (StorageException e) {
logger.atSevere().withCause(e).log("Error collecting groups for changes");
reject(magicBranch.cmd, INTERNAL_SERVER_ERROR);
SortedSetMultimap<ObjectId, String> groups = groupCollector.getGroups();
List<Integer> newIds = seq.nextChangeIds(newChanges.size());
for (int i = 0; i < newChanges.size(); i++) {
CreateRequest create = newChanges.get(i);
create.setChangeId(newIds.get(i));
create.groups = ImmutableList.copyOf(groups.get(create.commit));
}
for (ReplaceRequest replace : replaceByChange.values()) {
replace.groups = ImmutableList.copyOf(groups.get(replace.newCommitId));
}
for (UpdateGroupsRequest update : updateGroups) {
update.groups = ImmutableList.copyOf((groups.get(update.commit)));
}
logger.atFine().log("Finished updating groups from GroupCollector");
return newChanges;
}
}
@@ -2656,14 +2642,9 @@ class ReceiveCommits {
req.validateNewPatchSet();
}
}
} catch (StorageException err) {
logger.atSevere().withCause(err).log(
"Cannot read database before replacement for project %s", project.getName());
rejectRemainingRequests(replaceByChange.values(), INTERNAL_SERVER_ERROR);
} catch (IOException | PermissionBackendException err) {
logger.atSevere().withCause(err).log(
"Cannot read repository before replacement for project %s", project.getName());
rejectRemainingRequests(replaceByChange.values(), INTERNAL_SERVER_ERROR);
} catch (IOException | PermissionBackendException e) {
throw new StorageException(
"Cannot read repository before replacement for project " + project.getName(), e);
}
logger.atFine().log("Read %d changes to replace", replaceByChange.size());
@@ -2671,11 +2652,11 @@ class ReceiveCommits {
// Cancel creations tied to refs/for/ command.
for (ReplaceRequest req : replaceByChange.values()) {
if (req.inputCommand == magicBranch.cmd && req.cmd != null) {
req.cmd.setResult(Result.REJECTED_OTHER_REASON, "aborted");
req.cmd.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON, "aborted");
}
}
for (CreateRequest req : newChanges) {
req.cmd.setResult(Result.REJECTED_OTHER_REASON, "aborted");
req.cmd.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON, "aborted");
}
}
}
@@ -3097,13 +3078,13 @@ class ReceiveCommits {
logger.atFine().log("Updating project description");
repo.setGitwebDescription(ps.getProject().getDescription());
} catch (IOException e) {
logger.atWarning().withCause(e).log("cannot update description of %s", project.getName());
throw new StorageException("cannot update description of " + project.getName(), e);
}
if (allProjectsName.equals(project.getNameKey())) {
try {
createGroupPermissionSyncer.syncIfNeeded();
} catch (IOException | ConfigInvalidException e) {
logger.atSevere().withCause(e).log("Can't sync create group permissions");
throw new StorageException("cannot update description of " + project.getName(), e);
}
}
}
@@ -3358,8 +3339,7 @@ class ReceiveCommits {
existingPatchSets, newPatchSets);
bu.execute();
} catch (IOException | StorageException | PermissionBackendException e) {
logger.atSevere().withCause(e).log("Failed to auto-close changes");
return null;
throw new StorageException("Failed to auto-close changes", e);
}
// If we are here, we didn't throw UpdateException. Record the result.
@@ -3367,7 +3347,8 @@ class ReceiveCommits {
// doesn't
// fit into TreeSet.
ids.stream()
.forEach(id -> resultChangeIds.add(ResultChangeIds.Key.AUTOCLOSED, id));
.forEach(
id -> result.addChange(ReceiveCommitsResult.ChangeStatus.AUTOCLOSED, id));
return null;
})

View File

@@ -0,0 +1,81 @@
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git.receive;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gerrit.entities.Change;
import java.util.Arrays;
import java.util.EnumMap;
/** Keeps track of the change IDs thus far updated by {@link ReceiveCommits}. */
@AutoValue
public abstract class ReceiveCommitsResult {
/** Status of a change. Used to aggregate metrics. */
public enum ChangeStatus {
CREATED,
REPLACED,
AUTOCLOSED,
}
/**
* Returns change IDs of the given type for which the BatchUpdate succeeded, or empty list if
* there are none.
*/
public abstract ImmutableMap<ChangeStatus, ImmutableSet<Change.Id>> changes();
/** Indicate that the ReceiveCommits call involved a magic branch, such as {@code refs/for/}. */
public abstract boolean magicPush();
public static Builder builder() {
return new AutoValue_ReceiveCommitsResult.Builder().magicPush(false);
}
public static ReceiveCommitsResult empty() {
return builder().build();
}
@AutoValue.Builder
public abstract static class Builder {
private EnumMap<ChangeStatus, ImmutableSet.Builder<Change.Id>> changes;
Builder() {
changes = Maps.newEnumMap(ChangeStatus.class);
Arrays.stream(ChangeStatus.values()).forEach(k -> changes.put(k, ImmutableSet.builder()));
}
/** Record a change ID update as having completed. */
public Builder addChange(ChangeStatus key, Change.Id id) {
changes.get(key).add(id);
return this;
}
public abstract Builder magicPush(boolean isMagicPush);
public ReceiveCommitsResult build() {
ImmutableMap.Builder<ChangeStatus, ImmutableSet<Change.Id>> changesBuilder =
ImmutableMap.builder();
changes.entrySet().forEach(e -> changesBuilder.put(e.getKey(), e.getValue().build()));
changes(changesBuilder.build());
return autoBuild();
}
protected abstract Builder changes(ImmutableMap<ChangeStatus, ImmutableSet<Change.Id>> changes);
protected abstract ReceiveCommitsResult autoBuild();
}
}

View File

@@ -1,67 +0,0 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git.receive;
import com.google.common.collect.ImmutableList;
import com.google.gerrit.entities.Change;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
/**
* Keeps track of the change IDs thus far updated by ReceiveCommit.
*
* <p>This class is thread-safe.
*/
public class ResultChangeIds {
public enum Key {
CREATED,
REPLACED,
AUTOCLOSED,
}
private boolean isMagicPush;
private final Map<Key, List<Change.Id>> ids;
ResultChangeIds() {
ids = new EnumMap<>(Key.class);
for (Key k : Key.values()) {
ids.put(k, new ArrayList<>());
}
}
/** Record a change ID update as having completed. Thread-safe. */
public synchronized void add(Key key, Change.Id id) {
ids.get(key).add(id);
}
/** Indicate that the ReceiveCommits call involved a magic branch. */
public synchronized void setMagicPush(boolean magic) {
isMagicPush = magic;
}
public synchronized boolean isMagicPush() {
return isMagicPush;
}
/**
* Returns change IDs of the given type for which the BatchUpdate succeeded, or empty list if
* there are none. Thread-safe.
*/
public synchronized List<Change.Id> get(Key key) {
return ImmutableList.copyOf(ids.get(key));
}
}

View File

@@ -26,6 +26,7 @@ import com.google.common.flogger.FluentLogger;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.entities.RefNames;
@@ -47,7 +48,6 @@ import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
@@ -175,7 +175,7 @@ public class AllChangesIndexer extends SiteIndexer<Change.Id, ChangeData, Change
return null;
},
directExecutor()));
} catch (ExecutionException e) {
} catch (UncheckedExecutionException e) {
logger.atSevere().withCause(e).log("Error in batch indexer");
ok.set(false);
}