ChangeIndexer: Add method to reindex a change if it is stale
At this point we have enough wiring to write a larger test in AbstractQueryChangesTest for StalenessChecker. Change-Id: Ia17351761fb4ac3ed56e360f28fd2f0a9e3c3ed0
This commit is contained in:
@@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
package com.google.gerrit.server.index.change;
|
package com.google.gerrit.server.index.change;
|
||||||
|
|
||||||
|
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
|
||||||
import static com.google.gerrit.server.extensions.events.EventUtil.logEventListenerError;
|
import static com.google.gerrit.server.extensions.events.EventUtil.logEventListenerError;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
@@ -29,6 +30,7 @@ import com.google.gerrit.reviewdb.client.Project;
|
|||||||
import com.google.gerrit.reviewdb.server.ReviewDb;
|
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||||
import com.google.gerrit.server.CurrentUser;
|
import com.google.gerrit.server.CurrentUser;
|
||||||
import com.google.gerrit.server.index.Index;
|
import com.google.gerrit.server.index.Index;
|
||||||
|
import com.google.gerrit.server.index.IndexExecutor;
|
||||||
import com.google.gerrit.server.notedb.ChangeNotes;
|
import com.google.gerrit.server.notedb.ChangeNotes;
|
||||||
import com.google.gerrit.server.notedb.NotesMigration;
|
import com.google.gerrit.server.notedb.NotesMigration;
|
||||||
import com.google.gerrit.server.query.change.ChangeData;
|
import com.google.gerrit.server.query.change.ChangeData;
|
||||||
@@ -102,8 +104,10 @@ public class ChangeIndexer {
|
|||||||
private final ChangeNotes.Factory changeNotesFactory;
|
private final ChangeNotes.Factory changeNotesFactory;
|
||||||
private final ChangeData.Factory changeDataFactory;
|
private final ChangeData.Factory changeDataFactory;
|
||||||
private final ThreadLocalRequestContext context;
|
private final ThreadLocalRequestContext context;
|
||||||
|
private final ListeningExecutorService batchExecutor;
|
||||||
private final ListeningExecutorService executor;
|
private final ListeningExecutorService executor;
|
||||||
private final DynamicSet<ChangeIndexedListener> indexedListeners;
|
private final DynamicSet<ChangeIndexedListener> indexedListeners;
|
||||||
|
private final StalenessChecker stalenessChecker;
|
||||||
|
|
||||||
@AssistedInject
|
@AssistedInject
|
||||||
ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
||||||
@@ -112,6 +116,8 @@ public class ChangeIndexer {
|
|||||||
ChangeData.Factory changeDataFactory,
|
ChangeData.Factory changeDataFactory,
|
||||||
ThreadLocalRequestContext context,
|
ThreadLocalRequestContext context,
|
||||||
DynamicSet<ChangeIndexedListener> indexedListeners,
|
DynamicSet<ChangeIndexedListener> indexedListeners,
|
||||||
|
StalenessChecker stalenessChecker,
|
||||||
|
@IndexExecutor(BATCH) ListeningExecutorService batchExecutor,
|
||||||
@Assisted ListeningExecutorService executor,
|
@Assisted ListeningExecutorService executor,
|
||||||
@Assisted ChangeIndex index) {
|
@Assisted ChangeIndex index) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
@@ -121,6 +127,8 @@ public class ChangeIndexer {
|
|||||||
this.changeDataFactory = changeDataFactory;
|
this.changeDataFactory = changeDataFactory;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.indexedListeners = indexedListeners;
|
this.indexedListeners = indexedListeners;
|
||||||
|
this.stalenessChecker = stalenessChecker;
|
||||||
|
this.batchExecutor = batchExecutor;
|
||||||
this.index = index;
|
this.index = index;
|
||||||
this.indexes = null;
|
this.indexes = null;
|
||||||
}
|
}
|
||||||
@@ -132,6 +140,8 @@ public class ChangeIndexer {
|
|||||||
ChangeData.Factory changeDataFactory,
|
ChangeData.Factory changeDataFactory,
|
||||||
ThreadLocalRequestContext context,
|
ThreadLocalRequestContext context,
|
||||||
DynamicSet<ChangeIndexedListener> indexedListeners,
|
DynamicSet<ChangeIndexedListener> indexedListeners,
|
||||||
|
StalenessChecker stalenessChecker,
|
||||||
|
@IndexExecutor(BATCH) ListeningExecutorService batchExecutor,
|
||||||
@Assisted ListeningExecutorService executor,
|
@Assisted ListeningExecutorService executor,
|
||||||
@Assisted ChangeIndexCollection indexes) {
|
@Assisted ChangeIndexCollection indexes) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
@@ -141,6 +151,8 @@ public class ChangeIndexer {
|
|||||||
this.changeDataFactory = changeDataFactory;
|
this.changeDataFactory = changeDataFactory;
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.indexedListeners = indexedListeners;
|
this.indexedListeners = indexedListeners;
|
||||||
|
this.stalenessChecker = stalenessChecker;
|
||||||
|
this.batchExecutor = batchExecutor;
|
||||||
this.index = null;
|
this.index = null;
|
||||||
this.indexes = indexes;
|
this.indexes = indexes;
|
||||||
}
|
}
|
||||||
@@ -245,28 +257,54 @@ public class ChangeIndexer {
|
|||||||
new DeleteTask(id).call();
|
new DeleteTask(id).call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asynchronously check if a change is stale, and reindex if it is.
|
||||||
|
* <p>
|
||||||
|
* Always run on the batch executor, even if this indexer instance is
|
||||||
|
* configured to use a different executor.
|
||||||
|
*
|
||||||
|
* @param project the project to which the change belongs.
|
||||||
|
* @param id ID of the change to index.
|
||||||
|
* @return future for reindexing the change; returns true if the change was
|
||||||
|
* stale.
|
||||||
|
*/
|
||||||
|
public CheckedFuture<Boolean, IOException> reindexIfStale(
|
||||||
|
Project.NameKey project, Change.Id id) {
|
||||||
|
return submit(new ReindexIfStaleTask(project, id), batchExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
private Collection<ChangeIndex> getWriteIndexes() {
|
private Collection<ChangeIndex> getWriteIndexes() {
|
||||||
return indexes != null
|
return indexes != null
|
||||||
? indexes.getWriteIndexes()
|
? indexes.getWriteIndexes()
|
||||||
: Collections.singleton(index);
|
: Collections.singleton(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
private CheckedFuture<?, IOException> submit(Callable<?> task) {
|
private <T> CheckedFuture<T, IOException> submit(Callable<T> task) {
|
||||||
|
return submit(task, executor);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> CheckedFuture<T, IOException> submit(Callable<T> task,
|
||||||
|
ListeningExecutorService executor) {
|
||||||
return Futures.makeChecked(
|
return Futures.makeChecked(
|
||||||
Futures.nonCancellationPropagating(executor.submit(task)), MAPPER);
|
Futures.nonCancellationPropagating(executor.submit(task)), MAPPER);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class IndexTask implements Callable<Void> {
|
private abstract class AbstractIndexTask<T> implements Callable<T> {
|
||||||
private final Project.NameKey project;
|
protected final Project.NameKey project;
|
||||||
private final Change.Id id;
|
protected final Change.Id id;
|
||||||
|
|
||||||
private IndexTask(Project.NameKey project, Change.Id id) {
|
protected AbstractIndexTask(Project.NameKey project, Change.Id id) {
|
||||||
this.project = project;
|
this.project = project;
|
||||||
this.id = id;
|
this.id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected abstract T callImpl(Provider<ReviewDb> db) throws Exception;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public abstract String toString();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final T call() throws Exception {
|
||||||
try {
|
try {
|
||||||
final AtomicReference<Provider<ReviewDb>> dbRef =
|
final AtomicReference<Provider<ReviewDb>> dbRef =
|
||||||
Atomics.newReference();
|
Atomics.newReference();
|
||||||
@@ -295,10 +333,7 @@ public class ChangeIndexer {
|
|||||||
};
|
};
|
||||||
RequestContext oldCtx = context.setContext(newCtx);
|
RequestContext oldCtx = context.setContext(newCtx);
|
||||||
try {
|
try {
|
||||||
ChangeData cd = newChangeData(
|
return callImpl(newCtx.getReviewDbProvider());
|
||||||
newCtx.getReviewDbProvider().get(), project, id);
|
|
||||||
index(cd);
|
|
||||||
return null;
|
|
||||||
} finally {
|
} finally {
|
||||||
context.setContext(oldCtx);
|
context.setContext(oldCtx);
|
||||||
Provider<ReviewDb> db = dbRef.get();
|
Provider<ReviewDb> db = dbRef.get();
|
||||||
@@ -307,17 +342,31 @@ public class ChangeIndexer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error(String.format("Failed to index change %d", id.get()), e);
|
log.error("Failed to execute " + this, e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class IndexTask extends AbstractIndexTask<Void> {
|
||||||
|
private IndexTask(Project.NameKey project, Change.Id id) {
|
||||||
|
super(project, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void callImpl(Provider<ReviewDb> db) throws Exception {
|
||||||
|
ChangeData cd = newChangeData(db.get(), project, id);
|
||||||
|
index(cd);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "index-change-" + id.get();
|
return "index-change-" + id;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Not AbstractIndexTask as it doesn't need ReviewDb.
|
||||||
private class DeleteTask implements Callable<Void> {
|
private class DeleteTask implements Callable<Void> {
|
||||||
private final Change.Id id;
|
private final Change.Id id;
|
||||||
|
|
||||||
@@ -339,6 +388,26 @@ public class ChangeIndexer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ReindexIfStaleTask extends AbstractIndexTask<Boolean> {
|
||||||
|
private ReindexIfStaleTask(Project.NameKey project, Change.Id id) {
|
||||||
|
super(project, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean callImpl(Provider<ReviewDb> db) throws Exception {
|
||||||
|
if (!stalenessChecker.isStale(id)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
index(newChangeData(db.get(), project, id));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "reindex-if-stale-change-" + id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Avoid auto-rebuilding when reindexing if reading is disabled. This just
|
// Avoid auto-rebuilding when reindexing if reading is disabled. This just
|
||||||
// increases contention on the meta ref from a background indexing thread
|
// increases contention on the meta ref from a background indexing thread
|
||||||
// with little benefit. The next actual write to the entity may still incur a
|
// with little benefit. The next actual write to the entity may still incur a
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ import com.google.gerrit.reviewdb.client.Change;
|
|||||||
import com.google.gerrit.reviewdb.client.Patch;
|
import com.google.gerrit.reviewdb.client.Patch;
|
||||||
import com.google.gerrit.reviewdb.client.PatchSet;
|
import com.google.gerrit.reviewdb.client.PatchSet;
|
||||||
import com.google.gerrit.reviewdb.client.Project;
|
import com.google.gerrit.reviewdb.client.Project;
|
||||||
|
import com.google.gerrit.reviewdb.client.RefNames;
|
||||||
import com.google.gerrit.reviewdb.server.ReviewDb;
|
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||||
import com.google.gerrit.server.CurrentUser;
|
import com.google.gerrit.server.CurrentUser;
|
||||||
import com.google.gerrit.server.IdentifiedUser;
|
import com.google.gerrit.server.IdentifiedUser;
|
||||||
@@ -1611,6 +1612,32 @@ public abstract class AbstractQueryChangesTest extends GerritServerTests {
|
|||||||
cd.currentApprovals();
|
cd.currentApprovals();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void reindexIfStale() throws Exception {
|
||||||
|
Account.Id user = createAccount("user");
|
||||||
|
Project.NameKey project = new Project.NameKey("repo");
|
||||||
|
TestRepository<Repo> repo = createProject(project.get());
|
||||||
|
Change change = insert(repo, newChange(repo));
|
||||||
|
PatchSet ps = db.patchSets().get(change.currentPatchSetId());
|
||||||
|
|
||||||
|
requestContext.setContext(newRequestContext(user));
|
||||||
|
assertThat(changeEditModifier.createEdit(change, ps))
|
||||||
|
.isEqualTo(RefUpdate.Result.NEW);
|
||||||
|
assertQuery("has:edit", change);
|
||||||
|
assertThat(indexer.reindexIfStale(project, change.getId()).get()).isFalse();
|
||||||
|
|
||||||
|
// Delete edit ref behind index's back.
|
||||||
|
RefUpdate ru = repo.getRepository().updateRef(
|
||||||
|
RefNames.refsEdit(user, change.getId(), ps.getId()));
|
||||||
|
ru.setForceUpdate(true);
|
||||||
|
assertThat(ru.delete()).isEqualTo(RefUpdate.Result.FORCED);
|
||||||
|
|
||||||
|
// Index is stale.
|
||||||
|
assertQuery("has:edit", change);
|
||||||
|
assertThat(indexer.reindexIfStale(project, change.getId()).get()).isTrue();
|
||||||
|
assertQuery("has:edit");
|
||||||
|
}
|
||||||
|
|
||||||
protected ChangeInserter newChange(TestRepository<Repo> repo)
|
protected ChangeInserter newChange(TestRepository<Repo> repo)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
return newChange(repo, null, null, null, null);
|
return newChange(repo, null, null, null, null);
|
||||||
|
|||||||
Reference in New Issue
Block a user