Disallow change index task duplication

Do not allow change index and reindex tasks to be queued multiple times
for a given change.

Make sure to remove the index or reindex task from the corresponding
queue when it gets executed.

Change-Id: Ia68c1e541a3ed36873d3aa3b13548e93393b78ac
This commit is contained in:
Mihaly Petrenyi
2019-06-14 13:14:44 +02:00
committed by Marco Miller
parent da6d10f7d7
commit 3ea4b005a2
2 changed files with 97 additions and 5 deletions

View File

@@ -18,6 +18,7 @@ import static com.google.gerrit.server.extensions.events.EventUtil.logEventListe
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.Atomics;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -50,7 +51,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
@@ -111,6 +114,11 @@ public class ChangeIndexer {
private final StalenessChecker stalenessChecker;
private final boolean autoReindexIfStale;
private final Set<IndexTask> queuedIndexTasks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<ReindexIfStaleTask> queuedReindexIfStaleTasks =
Collections.newSetFromMap(new ConcurrentHashMap<>());
@AssistedInject
ChangeIndexer(
@GerritServerConfig Config cfg,
@@ -178,7 +186,11 @@ public class ChangeIndexer {
@SuppressWarnings("deprecation")
public com.google.common.util.concurrent.CheckedFuture<?, IOException> indexAsync(
Project.NameKey project, Change.Id id) {
return submit(new IndexTask(project, id));
IndexTask task = new IndexTask(project, id);
if (queuedIndexTasks.add(task)) {
return submit(task);
}
return Futures.immediateCheckedFuture(null);
}
/**
@@ -309,7 +321,11 @@ public class ChangeIndexer {
@SuppressWarnings("deprecation")
public com.google.common.util.concurrent.CheckedFuture<Boolean, IOException> reindexIfStale(
Project.NameKey project, Change.Id id) {
return submit(new ReindexIfStaleTask(project, id), batchExecutor);
ReindexIfStaleTask task = new ReindexIfStaleTask(project, id);
if (queuedReindexIfStaleTasks.add(task)) {
return submit(task, batchExecutor);
}
return Futures.immediateCheckedFuture(false);
}
private void autoReindexIfStale(ChangeData cd) {
@@ -351,6 +367,8 @@ public class ChangeIndexer {
protected abstract T callImpl(Provider<ReviewDb> db) throws Exception;
protected abstract void remove();
@Override
public abstract String toString();
@@ -405,15 +423,35 @@ public class ChangeIndexer {
@Override
public Void callImpl(Provider<ReviewDb> db) throws Exception {
remove();
ChangeData cd = newChangeData(db.get(), project, id);
index(cd);
return null;
}
@Override
public int hashCode() {
return Objects.hashCode(IndexTask.class, id.get());
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof IndexTask)) {
return false;
}
IndexTask other = (IndexTask) obj;
return id.get() == other.id.get();
}
@Override
public String toString() {
return "index-change-" + id;
}
@Override
protected void remove() {
queuedIndexTasks.remove(this);
}
}
// Not AbstractIndexTask as it doesn't need ReviewDb.
@@ -445,6 +483,7 @@ public class ChangeIndexer {
@Override
public Boolean callImpl(Provider<ReviewDb> db) throws Exception {
remove();
try {
if (stalenessChecker.isStale(id)) {
index(newChangeData(db.get(), project, id));
@@ -464,10 +503,29 @@ public class ChangeIndexer {
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(ReindexIfStaleTask.class, id.get());
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ReindexIfStaleTask)) {
return false;
}
ReindexIfStaleTask other = (ReindexIfStaleTask) obj;
return id.get() == other.id.get();
}
@Override
public String toString() {
return "reindex-if-stale-change-" + id;
}
@Override
protected void remove() {
queuedReindexIfStaleTasks.remove(this);
}
}
private boolean isCausedByRepositoryNotFoundException(Throwable throwable) {

View File

@@ -17,6 +17,7 @@ package com.google.gerrit.server.index.change;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.gerrit.server.query.change.ChangeData.asChanges;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -42,8 +43,11 @@ import com.google.gwtorm.server.OrmException;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
@@ -62,6 +66,8 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener {
private final ListeningExecutorService executor;
private final boolean enabled;
private final Set<Index> queuedIndexTasks = Collections.newSetFromMap(new ConcurrentHashMap<>());
@Inject
ReindexAfterRefUpdate(
@GerritServerConfig Config cfg,
@@ -109,9 +115,12 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener {
@Override
public void onSuccess(List<Change> changes) {
for (Change c : changes) {
// Don't retry indefinitely; if this fails changes may be stale.
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = executor.submit(new Index(event, c.getId()));
Index task = new Index(event, c.getId());
if (queuedIndexTasks.add(task)) {
// Don't retry indefinitely; if this fails changes may be stale.
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = executor.submit(task);
}
}
}
@@ -141,6 +150,8 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener {
}
protected abstract V impl(RequestContext ctx) throws Exception;
protected abstract void remove();
}
private class GetChanges extends Task<List<Change>> {
@@ -165,6 +176,9 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener {
+ " update of project "
+ event.getProjectName();
}
@Override
protected void remove() {}
}
private class Index extends Task<Void> {
@@ -179,6 +193,7 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener {
protected Void impl(RequestContext ctx) throws OrmException, IOException {
// Reload change, as some time may have passed since GetChanges.
ReviewDb db = ctx.getReviewDbProvider().get();
remove();
try {
Change c =
notesFactory
@@ -191,9 +206,28 @@ public class ReindexAfterRefUpdate implements GitReferenceUpdatedListener {
return null;
}
@Override
public int hashCode() {
return Objects.hashCode(Index.class, id.get());
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Index)) {
return false;
}
Index other = (Index) obj;
return id.get() == other.id.get();
}
@Override
public String toString() {
return "Index change " + id.get() + " of project " + event.getProjectName();
}
@Override
protected void remove() {
queuedIndexTasks.remove(this);
}
}
}