Merge changes from topic 'reindex-if-stale'
* changes: ChangeIndexer: Reindex if stale after every index update ChangeIndexer: Add method to reindex a change if it is stale Implement staleness checker for index changes Store in index the state of all refs that contribute to a change Add method on Index to look up a single document by key
This commit is contained in:
@@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkArgument;
|
||||
|
||||
import com.google.common.base.CharMatcher;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.gerrit.server.config.AllUsersName;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.config.TrackingFooters;
|
||||
import com.google.gwtorm.server.OrmException;
|
||||
@@ -65,14 +66,17 @@ public abstract class FieldDef<I, T> {
|
||||
public static class FillArgs {
|
||||
public final TrackingFooters trackingFooters;
|
||||
public final boolean allowsDrafts;
|
||||
public final AllUsersName allUsers;
|
||||
|
||||
@Inject
|
||||
FillArgs(TrackingFooters trackingFooters,
|
||||
@GerritServerConfig Config cfg) {
|
||||
@GerritServerConfig Config cfg,
|
||||
AllUsersName allUsers) {
|
||||
this.trackingFooters = trackingFooters;
|
||||
this.allowsDrafts = cfg == null
|
||||
? true
|
||||
: cfg.getBoolean("change", "allowDrafts", true);
|
||||
this.allUsers = allUsers;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,8 +17,11 @@ package com.google.gerrit.server.index;
|
||||
import com.google.gerrit.server.query.DataSource;
|
||||
import com.google.gerrit.server.query.Predicate;
|
||||
import com.google.gerrit.server.query.QueryParseException;
|
||||
import com.google.gwtorm.server.OrmException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Secondary index implementation for arbitrary documents.
|
||||
@@ -90,6 +93,44 @@ public interface Index<K, V> {
|
||||
DataSource<V> getSource(Predicate<V> p, QueryOptions opts)
|
||||
throws QueryParseException;
|
||||
|
||||
/**
|
||||
* Get a single document from the index.
|
||||
*
|
||||
* @param key document key.
|
||||
* @param opts query options. Options that do not make sense in the context of
|
||||
* a single document, such as start, will be ignored.
|
||||
* @return a single document if present.
|
||||
* @throws IOException
|
||||
*/
|
||||
default Optional<V> get(K key, QueryOptions opts) throws IOException {
|
||||
opts = opts.withStart(0).withLimit(2);
|
||||
List<V> results;
|
||||
try {
|
||||
results = getSource(keyPredicate(key), opts).read().toList();
|
||||
} catch (QueryParseException e) {
|
||||
throw new IOException("Unexpected QueryParseException during get()", e);
|
||||
} catch (OrmException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
switch (results.size()) {
|
||||
case 0:
|
||||
return Optional.empty();
|
||||
case 1:
|
||||
return Optional.of(results.get(0));
|
||||
default:
|
||||
throw new IOException("Multiple results found in index for key "
|
||||
+ key + ": " + results);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a predicate that looks up a single document by key.
|
||||
*
|
||||
* @param key document key.
|
||||
* @return a single predicate.
|
||||
*/
|
||||
Predicate<V> keyPredicate(K key);
|
||||
|
||||
/**
|
||||
* Mark whether this index is up-to-date and ready to serve reads.
|
||||
*
|
||||
|
||||
@@ -18,9 +18,16 @@ import com.google.gerrit.reviewdb.client.Account;
|
||||
import com.google.gerrit.server.account.AccountState;
|
||||
import com.google.gerrit.server.index.Index;
|
||||
import com.google.gerrit.server.index.IndexDefinition;
|
||||
import com.google.gerrit.server.query.Predicate;
|
||||
import com.google.gerrit.server.query.account.AccountPredicates;
|
||||
|
||||
public interface AccountIndex extends Index<Account.Id, AccountState> {
|
||||
public interface Factory extends
|
||||
IndexDefinition.IndexFactory<Account.Id, AccountState, AccountIndex> {
|
||||
}
|
||||
|
||||
@Override
|
||||
default Predicate<AccountState> keyPredicate(Account.Id id) {
|
||||
return AccountPredicates.id(id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,13 +36,20 @@ import com.google.gerrit.reviewdb.client.ChangeMessage;
|
||||
import com.google.gerrit.reviewdb.client.Comment;
|
||||
import com.google.gerrit.reviewdb.client.PatchSet;
|
||||
import com.google.gerrit.reviewdb.client.PatchSetApproval;
|
||||
import com.google.gerrit.reviewdb.client.Project;
|
||||
import com.google.gerrit.reviewdb.client.RefNames;
|
||||
import com.google.gerrit.server.OutputFormat;
|
||||
import com.google.gerrit.server.ReviewerSet;
|
||||
import com.google.gerrit.server.StarredChangesUtil;
|
||||
import com.google.gerrit.server.index.FieldDef;
|
||||
import com.google.gerrit.server.index.FieldType;
|
||||
import com.google.gerrit.server.index.SchemaUtil;
|
||||
import com.google.gerrit.server.index.change.StalenessChecker.RefState;
|
||||
import com.google.gerrit.server.index.change.StalenessChecker.RefStatePattern;
|
||||
import com.google.gerrit.server.notedb.ChangeNotes;
|
||||
import com.google.gerrit.server.notedb.NoteDbChangeState.PrimaryStorage;
|
||||
import com.google.gerrit.server.notedb.ReviewerStateInternal;
|
||||
import com.google.gerrit.server.notedb.RobotCommentNotes;
|
||||
import com.google.gerrit.server.project.SubmitRuleOptions;
|
||||
import com.google.gerrit.server.query.change.ChangeData;
|
||||
import com.google.gerrit.server.query.change.ChangeQueryBuilder;
|
||||
@@ -945,6 +952,74 @@ public class ChangeField {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* All values of all refs that were used in the course of indexing this
|
||||
* document.
|
||||
* <p>
|
||||
* Emitted as UTF-8 encoded strings of the form
|
||||
* {@code project:ref/name:[hex sha]}.
|
||||
*/
|
||||
public static final FieldDef<ChangeData, Iterable<byte[]>> REF_STATE =
|
||||
new FieldDef.Repeatable<ChangeData, byte[]>(
|
||||
"ref_state", FieldType.STORED_ONLY, true) {
|
||||
@Override
|
||||
public Iterable<byte[]> get(ChangeData input, FillArgs args)
|
||||
throws OrmException {
|
||||
List<byte[]> result = new ArrayList<>();
|
||||
Project.NameKey project = input.change().getProject();
|
||||
|
||||
input.editRefs().values().forEach(
|
||||
r -> result.add(RefState.of(r).toByteArray(project)));
|
||||
input.starRefs().values().forEach(
|
||||
r -> result.add(RefState.of(r.ref()).toByteArray(project)));
|
||||
|
||||
if (PrimaryStorage.of(input.change()) == PrimaryStorage.NOTE_DB) {
|
||||
ChangeNotes notes = input.notes();
|
||||
result.add(RefState.create(notes.getRefName(), notes.getMetaId())
|
||||
.toByteArray(project));
|
||||
notes.getRobotComments(); // Force loading robot comments.
|
||||
RobotCommentNotes robotNotes = notes.getRobotCommentNotes();
|
||||
result.add(
|
||||
RefState.create(robotNotes.getRefName(), robotNotes.getMetaId())
|
||||
.toByteArray(project));
|
||||
input.draftRefs().values().forEach(
|
||||
r -> result.add(RefState.of(r).toByteArray(args.allUsers)));
|
||||
}
|
||||
|
||||
return result;
|
||||
} };
|
||||
|
||||
/**
|
||||
* All ref wildcard patterns that were used in the course of indexing this
|
||||
* document.
|
||||
* <p>
|
||||
* Emitted as UTF-8 encoded strings of the form {@code project:ref/name/*}.
|
||||
* See {@link RefStatePattern} for the pattern format.
|
||||
*/
|
||||
public static final FieldDef<ChangeData, Iterable<byte[]>>
|
||||
REF_STATE_PATTERN = new FieldDef.Repeatable<ChangeData, byte[]>(
|
||||
"ref_state_pattern", FieldType.STORED_ONLY, true) {
|
||||
@Override
|
||||
public Iterable<byte[]> get(ChangeData input, FillArgs args)
|
||||
throws OrmException {
|
||||
Change.Id id = input.getId();
|
||||
Project.NameKey project = input.change().getProject();
|
||||
List<byte[]> result = new ArrayList<>(3);
|
||||
result.add(RefStatePattern.create(
|
||||
RefNames.REFS_USERS + "*/" + RefNames.EDIT_PREFIX + id + "/*")
|
||||
.toByteArray(project));
|
||||
if (PrimaryStorage.of(input.change()) == PrimaryStorage.NOTE_DB) {
|
||||
result.add(
|
||||
RefStatePattern.create(RefNames.refsStarredChangesPrefix(id) + "*")
|
||||
.toByteArray(args.allUsers));
|
||||
result.add(
|
||||
RefStatePattern.create(RefNames.refsDraftCommentsPrefix(id) + "*")
|
||||
.toByteArray(args.allUsers));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
public static final Integer NOT_REVIEWED = -1;
|
||||
|
||||
private static String getTopic(ChangeData input) throws OrmException {
|
||||
|
||||
@@ -17,10 +17,17 @@ package com.google.gerrit.server.index.change;
|
||||
import com.google.gerrit.reviewdb.client.Change;
|
||||
import com.google.gerrit.server.index.Index;
|
||||
import com.google.gerrit.server.index.IndexDefinition;
|
||||
import com.google.gerrit.server.query.Predicate;
|
||||
import com.google.gerrit.server.query.change.ChangeData;
|
||||
import com.google.gerrit.server.query.change.LegacyChangeIdPredicate;
|
||||
|
||||
public interface ChangeIndex extends Index<Change.Id, ChangeData> {
|
||||
public interface Factory extends
|
||||
IndexDefinition.IndexFactory<Change.Id, ChangeData, ChangeIndex> {
|
||||
}
|
||||
|
||||
@Override
|
||||
default Predicate<ChangeData> keyPredicate(Change.Id id) {
|
||||
return new LegacyChangeIdPredicate(id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package com.google.gerrit.server.index.change;
|
||||
|
||||
import static com.google.gerrit.server.extensions.events.EventUtil.logEventListenerError;
|
||||
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Atomics;
|
||||
@@ -28,7 +29,9 @@ import com.google.gerrit.reviewdb.client.Change;
|
||||
import com.google.gerrit.reviewdb.client.Project;
|
||||
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||
import com.google.gerrit.server.CurrentUser;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.index.Index;
|
||||
import com.google.gerrit.server.index.IndexExecutor;
|
||||
import com.google.gerrit.server.notedb.ChangeNotes;
|
||||
import com.google.gerrit.server.notedb.NotesMigration;
|
||||
import com.google.gerrit.server.query.change.ChangeData;
|
||||
@@ -43,6 +46,7 @@ import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.AssistedInject;
|
||||
import com.google.inject.util.Providers;
|
||||
|
||||
import org.eclipse.jgit.lib.Config;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -102,16 +106,23 @@ public class ChangeIndexer {
|
||||
private final ChangeNotes.Factory changeNotesFactory;
|
||||
private final ChangeData.Factory changeDataFactory;
|
||||
private final ThreadLocalRequestContext context;
|
||||
private final ListeningExecutorService batchExecutor;
|
||||
private final ListeningExecutorService executor;
|
||||
private final DynamicSet<ChangeIndexedListener> indexedListeners;
|
||||
private final StalenessChecker stalenessChecker;
|
||||
private final boolean reindexAfterIndexUpdate;
|
||||
|
||||
@AssistedInject
|
||||
ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
||||
ChangeIndexer(
|
||||
@GerritServerConfig Config cfg,
|
||||
SchemaFactory<ReviewDb> schemaFactory,
|
||||
NotesMigration notesMigration,
|
||||
ChangeNotes.Factory changeNotesFactory,
|
||||
ChangeData.Factory changeDataFactory,
|
||||
ThreadLocalRequestContext context,
|
||||
DynamicSet<ChangeIndexedListener> indexedListeners,
|
||||
StalenessChecker stalenessChecker,
|
||||
@IndexExecutor(BATCH) ListeningExecutorService batchExecutor,
|
||||
@Assisted ListeningExecutorService executor,
|
||||
@Assisted ChangeIndex index) {
|
||||
this.executor = executor;
|
||||
@@ -121,17 +132,23 @@ public class ChangeIndexer {
|
||||
this.changeDataFactory = changeDataFactory;
|
||||
this.context = context;
|
||||
this.indexedListeners = indexedListeners;
|
||||
this.stalenessChecker = stalenessChecker;
|
||||
this.batchExecutor = batchExecutor;
|
||||
this.reindexAfterIndexUpdate = reindexAfterIndexUpdate(cfg);
|
||||
this.index = index;
|
||||
this.indexes = null;
|
||||
}
|
||||
|
||||
@AssistedInject
|
||||
ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
||||
@GerritServerConfig Config cfg,
|
||||
NotesMigration notesMigration,
|
||||
ChangeNotes.Factory changeNotesFactory,
|
||||
ChangeData.Factory changeDataFactory,
|
||||
ThreadLocalRequestContext context,
|
||||
DynamicSet<ChangeIndexedListener> indexedListeners,
|
||||
StalenessChecker stalenessChecker,
|
||||
@IndexExecutor(BATCH) ListeningExecutorService batchExecutor,
|
||||
@Assisted ListeningExecutorService executor,
|
||||
@Assisted ChangeIndexCollection indexes) {
|
||||
this.executor = executor;
|
||||
@@ -141,10 +158,17 @@ public class ChangeIndexer {
|
||||
this.changeDataFactory = changeDataFactory;
|
||||
this.context = context;
|
||||
this.indexedListeners = indexedListeners;
|
||||
this.stalenessChecker = stalenessChecker;
|
||||
this.batchExecutor = batchExecutor;
|
||||
this.reindexAfterIndexUpdate = reindexAfterIndexUpdate(cfg);
|
||||
this.index = null;
|
||||
this.indexes = indexes;
|
||||
}
|
||||
|
||||
private static boolean reindexAfterIndexUpdate(Config cfg) {
|
||||
return cfg.getBoolean("index", null, "testReindexAfterUpdate", true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start indexing a change.
|
||||
*
|
||||
@@ -181,6 +205,26 @@ public class ChangeIndexer {
|
||||
i.replace(cd);
|
||||
}
|
||||
fireChangeIndexedEvent(cd.getId().get());
|
||||
|
||||
// Always double-check whether the change might be stale immediately after
|
||||
// interactively indexing it. This fixes up the case where two writers write
|
||||
// to the primary storage in one order, and the corresponding index writes
|
||||
// happen in the opposite order:
|
||||
// 1. Writer A writes to primary storage.
|
||||
// 2. Writer B writes to primary storage.
|
||||
// 3. Writer B updates index.
|
||||
// 4. Writer A updates index.
|
||||
//
|
||||
// Without the extra reindexIfStale step, A has no way of knowing that it's
|
||||
// about to overwrite the index document with stale data. It doesn't work to
|
||||
// have A check for staleness before attempting its index update, because
|
||||
// B's index update might not have happened when it does the check.
|
||||
//
|
||||
// With the extra reindexIfStale step after (3)/(4), we are able to detect
|
||||
// and fix the staleness. It doesn't matter which order the two
|
||||
// reindexIfStale calls actually execute in; we are guaranteed that at least
|
||||
// one of them will execute after the second index write, (4).
|
||||
reindexAfterIndexUpdate(cd);
|
||||
}
|
||||
|
||||
private void fireChangeIndexedEvent(int id) {
|
||||
@@ -212,6 +256,8 @@ public class ChangeIndexer {
|
||||
public void index(ReviewDb db, Change change)
|
||||
throws IOException, OrmException {
|
||||
index(newChangeData(db, change));
|
||||
// See comment in #index(ChangeData).
|
||||
reindexAfterIndexUpdate(change.getProject(), change.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -223,7 +269,10 @@ public class ChangeIndexer {
|
||||
*/
|
||||
public void index(ReviewDb db, Project.NameKey project, Change.Id changeId)
|
||||
throws IOException, OrmException {
|
||||
index(newChangeData(db, project, changeId));
|
||||
ChangeData cd = newChangeData(db, project, changeId);
|
||||
index(cd);
|
||||
// See comment in #index(ChangeData).
|
||||
reindexAfterIndexUpdate(cd);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -245,28 +294,68 @@ public class ChangeIndexer {
|
||||
new DeleteTask(id).call();
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously check if a change is stale, and reindex if it is.
|
||||
* <p>
|
||||
* Always run on the batch executor, even if this indexer instance is
|
||||
* configured to use a different executor.
|
||||
*
|
||||
* @param project the project to which the change belongs.
|
||||
* @param id ID of the change to index.
|
||||
* @return future for reindexing the change; returns true if the change was
|
||||
* stale.
|
||||
*/
|
||||
public CheckedFuture<Boolean, IOException> reindexIfStale(
|
||||
Project.NameKey project, Change.Id id) {
|
||||
return submit(new ReindexIfStaleTask(project, id), batchExecutor);
|
||||
}
|
||||
|
||||
private void reindexAfterIndexUpdate(ChangeData cd) throws IOException {
|
||||
try {
|
||||
reindexAfterIndexUpdate(cd.project(), cd.getId());
|
||||
} catch (OrmException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void reindexAfterIndexUpdate(Project.NameKey project, Change.Id id) {
|
||||
if (reindexAfterIndexUpdate) {
|
||||
reindexIfStale(project, id);
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<ChangeIndex> getWriteIndexes() {
|
||||
return indexes != null
|
||||
? indexes.getWriteIndexes()
|
||||
: Collections.singleton(index);
|
||||
}
|
||||
|
||||
private CheckedFuture<?, IOException> submit(Callable<?> task) {
|
||||
private <T> CheckedFuture<T, IOException> submit(Callable<T> task) {
|
||||
return submit(task, executor);
|
||||
}
|
||||
|
||||
private static <T> CheckedFuture<T, IOException> submit(Callable<T> task,
|
||||
ListeningExecutorService executor) {
|
||||
return Futures.makeChecked(
|
||||
Futures.nonCancellationPropagating(executor.submit(task)), MAPPER);
|
||||
}
|
||||
|
||||
private class IndexTask implements Callable<Void> {
|
||||
private final Project.NameKey project;
|
||||
private final Change.Id id;
|
||||
private abstract class AbstractIndexTask<T> implements Callable<T> {
|
||||
protected final Project.NameKey project;
|
||||
protected final Change.Id id;
|
||||
|
||||
private IndexTask(Project.NameKey project, Change.Id id) {
|
||||
protected AbstractIndexTask(Project.NameKey project, Change.Id id) {
|
||||
this.project = project;
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
protected abstract T callImpl(Provider<ReviewDb> db) throws Exception;
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
public abstract String toString();
|
||||
|
||||
@Override
|
||||
public final T call() throws Exception {
|
||||
try {
|
||||
final AtomicReference<Provider<ReviewDb>> dbRef =
|
||||
Atomics.newReference();
|
||||
@@ -295,10 +384,7 @@ public class ChangeIndexer {
|
||||
};
|
||||
RequestContext oldCtx = context.setContext(newCtx);
|
||||
try {
|
||||
ChangeData cd = newChangeData(
|
||||
newCtx.getReviewDbProvider().get(), project, id);
|
||||
index(cd);
|
||||
return null;
|
||||
return callImpl(newCtx.getReviewDbProvider());
|
||||
} finally {
|
||||
context.setContext(oldCtx);
|
||||
Provider<ReviewDb> db = dbRef.get();
|
||||
@@ -307,17 +393,31 @@ public class ChangeIndexer {
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error(String.format("Failed to index change %d", id.get()), e);
|
||||
log.error("Failed to execute " + this, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class IndexTask extends AbstractIndexTask<Void> {
|
||||
private IndexTask(Project.NameKey project, Change.Id id) {
|
||||
super(project, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void callImpl(Provider<ReviewDb> db) throws Exception {
|
||||
ChangeData cd = newChangeData(db.get(), project, id);
|
||||
index(cd);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "index-change-" + id.get();
|
||||
return "index-change-" + id;
|
||||
}
|
||||
}
|
||||
|
||||
// Not AbstractIndexTask as it doesn't need ReviewDb.
|
||||
private class DeleteTask implements Callable<Void> {
|
||||
private final Change.Id id;
|
||||
|
||||
@@ -339,6 +439,26 @@ public class ChangeIndexer {
|
||||
}
|
||||
}
|
||||
|
||||
private class ReindexIfStaleTask extends AbstractIndexTask<Boolean> {
|
||||
private ReindexIfStaleTask(Project.NameKey project, Change.Id id) {
|
||||
super(project, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean callImpl(Provider<ReviewDb> db) throws Exception {
|
||||
if (!stalenessChecker.isStale(id)) {
|
||||
return false;
|
||||
}
|
||||
index(newChangeData(db.get(), project, id));
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "reindex-if-stale-change-" + id;
|
||||
}
|
||||
}
|
||||
|
||||
// Avoid auto-rebuilding when reindexing if reading is disabled. This just
|
||||
// increases contention on the meta ref from a background indexing thread
|
||||
// with little benefit. The next actual write to the entity may still incur a
|
||||
|
||||
@@ -73,12 +73,18 @@ public class ChangeSchemaDefinitions extends SchemaDefinitions<ChangeData> {
|
||||
.add(ChangeField.LABEL2)
|
||||
.build();
|
||||
|
||||
@Deprecated
|
||||
static final Schema<ChangeData> V35 =
|
||||
schema(V34,
|
||||
ChangeField.SUBMIT_RECORD,
|
||||
ChangeField.STORED_SUBMIT_RECORD_LENIENT,
|
||||
ChangeField.STORED_SUBMIT_RECORD_STRICT);
|
||||
|
||||
static final Schema<ChangeData> V36 =
|
||||
schema(V35,
|
||||
ChangeField.REF_STATE,
|
||||
ChangeField.REF_STATE_PATTERN);
|
||||
|
||||
public static final String NAME = "changes";
|
||||
public static final ChangeSchemaDefinitions INSTANCE =
|
||||
new ChangeSchemaDefinitions();
|
||||
|
||||
@@ -0,0 +1,308 @@
|
||||
// Copyright (C) 2016 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.index.change;
|
||||
|
||||
import static com.google.common.base.MoreObjects.firstNonNull;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static java.util.stream.Collectors.joining;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Splitter;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.SetMultimap;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gerrit.common.Nullable;
|
||||
import com.google.gerrit.reviewdb.client.Change;
|
||||
import com.google.gerrit.reviewdb.client.Project;
|
||||
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||
import com.google.gerrit.server.git.GitRepositoryManager;
|
||||
import com.google.gerrit.server.index.IndexConfig;
|
||||
import com.google.gerrit.server.notedb.ChangeNotes;
|
||||
import com.google.gerrit.server.notedb.NoteDbChangeState.PrimaryStorage;
|
||||
import com.google.gerrit.server.query.change.ChangeData;
|
||||
import com.google.gwtorm.server.OrmException;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import org.eclipse.jgit.lib.Constants;
|
||||
import org.eclipse.jgit.lib.ObjectId;
|
||||
import org.eclipse.jgit.lib.Ref;
|
||||
import org.eclipse.jgit.lib.Repository;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
@Singleton
|
||||
public class StalenessChecker {
|
||||
private static final Logger log =
|
||||
LoggerFactory.getLogger(StalenessChecker.class);
|
||||
|
||||
private final ImmutableSet<String> FIELDS = ImmutableSet.of(
|
||||
ChangeField.CHANGE.getName(),
|
||||
ChangeField.REF_STATE.getName(),
|
||||
ChangeField.REF_STATE_PATTERN.getName());
|
||||
|
||||
private final ChangeIndexCollection indexes;
|
||||
private final GitRepositoryManager repoManager;
|
||||
private final IndexConfig indexConfig;
|
||||
private final Provider<ReviewDb> db;
|
||||
|
||||
@Inject
|
||||
StalenessChecker(
|
||||
ChangeIndexCollection indexes,
|
||||
GitRepositoryManager repoManager,
|
||||
IndexConfig indexConfig,
|
||||
Provider<ReviewDb> db) {
|
||||
this.indexes = indexes;
|
||||
this.repoManager = repoManager;
|
||||
this.indexConfig = indexConfig;
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
boolean isStale(Change.Id id) throws IOException, OrmException {
|
||||
ChangeIndex i = indexes.getSearchIndex();
|
||||
if (i == null) {
|
||||
return false; // No index; caller couldn't do anything if it is stale.
|
||||
}
|
||||
if (!i.getSchema().hasField(ChangeField.REF_STATE)
|
||||
|| !i.getSchema().hasField(ChangeField.REF_STATE_PATTERN)) {
|
||||
return false; // Index version not new enough for this check.
|
||||
}
|
||||
|
||||
Optional<ChangeData> result = i.get(
|
||||
id, IndexedChangeQuery.createOptions(indexConfig, 0, 1, FIELDS));
|
||||
if (!result.isPresent()) {
|
||||
return true; // Not in index, but caller wants it to be.
|
||||
}
|
||||
ChangeData cd = result.get();
|
||||
if (reviewDbChangeIsStale(
|
||||
cd.change(),
|
||||
ChangeNotes.readOneReviewDbChange(db.get(), cd.getId()))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return isStale(repoManager, id, parseStates(cd), parsePatterns(cd));
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isStale(GitRepositoryManager repoManager,
|
||||
Change.Id id,
|
||||
SetMultimap<Project.NameKey, RefState> states,
|
||||
Multimap<Project.NameKey, RefStatePattern> patterns) {
|
||||
Set<Project.NameKey> projects =
|
||||
Sets.union(states.keySet(), patterns.keySet());
|
||||
|
||||
for (Project.NameKey p : projects) {
|
||||
if (isStale(repoManager, id, p, states, patterns)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean reviewDbChangeIsStale(
|
||||
Change indexChange, @Nullable Change reviewDbChange) {
|
||||
if (reviewDbChange == null) {
|
||||
return false; // Nothing the caller can do.
|
||||
}
|
||||
checkArgument(indexChange.getId().equals(reviewDbChange.getId()),
|
||||
"mismatched change ID: %s != %s",
|
||||
indexChange.getId(), reviewDbChange.getId());
|
||||
if (PrimaryStorage.of(reviewDbChange) != PrimaryStorage.REVIEW_DB) {
|
||||
return false; // Not a ReviewDb change, don't check rowVersion.
|
||||
}
|
||||
return reviewDbChange.getRowVersion() != indexChange.getRowVersion();
|
||||
}
|
||||
|
||||
private SetMultimap<Project.NameKey, RefState> parseStates(ChangeData cd) {
|
||||
return parseStates(cd.getRefStates());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static SetMultimap<Project.NameKey, RefState> parseStates(
|
||||
Iterable<byte[]> states) {
|
||||
RefState.check(states != null, null);
|
||||
SetMultimap<Project.NameKey, RefState> result = HashMultimap.create();
|
||||
for (byte[] b : states) {
|
||||
RefState.check(b != null, null);
|
||||
String s = new String(b, UTF_8);
|
||||
List<String> parts = Splitter.on(':').splitToList(s);
|
||||
RefState.check(
|
||||
parts.size() == 3
|
||||
&& !parts.get(0).isEmpty()
|
||||
&& !parts.get(1).isEmpty(),
|
||||
s);
|
||||
result.put(
|
||||
new Project.NameKey(parts.get(0)),
|
||||
RefState.create(parts.get(1), parts.get(2)));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private Multimap<Project.NameKey, RefStatePattern> parsePatterns(
|
||||
ChangeData cd) {
|
||||
return parsePatterns(cd.getRefStatePatterns());
|
||||
}
|
||||
|
||||
public static ListMultimap<Project.NameKey, RefStatePattern> parsePatterns(
|
||||
Iterable<byte[]> patterns) {
|
||||
RefStatePattern.check(patterns != null, null);
|
||||
ListMultimap<Project.NameKey, RefStatePattern> result =
|
||||
ArrayListMultimap.create();
|
||||
for (byte[] b : patterns) {
|
||||
RefStatePattern.check(b != null, null);
|
||||
String s = new String(b, UTF_8);
|
||||
List<String> parts = Splitter.on(':').splitToList(s);
|
||||
RefStatePattern.check(parts.size() == 2, s);
|
||||
result.put(
|
||||
new Project.NameKey(parts.get(0)),
|
||||
RefStatePattern.create(parts.get(1)));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static boolean isStale(GitRepositoryManager repoManager,
|
||||
Change.Id id, Project.NameKey project,
|
||||
SetMultimap<Project.NameKey, RefState> allStates,
|
||||
Multimap<Project.NameKey, RefStatePattern> allPatterns) {
|
||||
try (Repository repo = repoManager.openRepository(project)) {
|
||||
Set<RefState> states = allStates.get(project);
|
||||
for (RefState state : states) {
|
||||
if (!state.match(repo)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for (RefStatePattern pattern : allPatterns.get(project)) {
|
||||
if (!pattern.match(repo, states)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch (IOException e) {
|
||||
log.warn(
|
||||
String.format("error checking staleness of %s in %s", id, project),
|
||||
e);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@AutoValue
|
||||
public abstract static class RefState {
|
||||
static RefState create(String ref, String sha) {
|
||||
return new AutoValue_StalenessChecker_RefState(
|
||||
ref, ObjectId.fromString(sha));
|
||||
}
|
||||
|
||||
static RefState create(String ref, @Nullable ObjectId id) {
|
||||
return new AutoValue_StalenessChecker_RefState(
|
||||
ref, firstNonNull(id, ObjectId.zeroId()));
|
||||
}
|
||||
|
||||
static RefState of(Ref ref) {
|
||||
return new AutoValue_StalenessChecker_RefState(
|
||||
ref.getName(), ref.getObjectId());
|
||||
}
|
||||
|
||||
byte[] toByteArray(Project.NameKey project) {
|
||||
byte[] a = (project.toString() + ':' + ref() + ':').getBytes(UTF_8);
|
||||
byte[] b = new byte[a.length + Constants.OBJECT_ID_STRING_LENGTH];
|
||||
System.arraycopy(a, 0, b, 0, a.length);
|
||||
id().copyTo(b, a.length);
|
||||
return b;
|
||||
}
|
||||
|
||||
private static void check(boolean condition, String str) {
|
||||
checkArgument(condition, "invalid RefState: %s", str);
|
||||
}
|
||||
|
||||
abstract String ref();
|
||||
abstract ObjectId id();
|
||||
|
||||
private boolean match(Repository repo) throws IOException {
|
||||
Ref ref = repo.exactRef(ref());
|
||||
ObjectId expected = ref != null ? ref.getObjectId() : ObjectId.zeroId();
|
||||
return id().equals(expected);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Pattern for matching refs.
|
||||
* <p>
|
||||
* Similar to '*' syntax for native Git refspecs, but slightly more powerful:
|
||||
* the pattern may contain arbitrarily many asterisks. There must be at least
|
||||
* one '*' and the first one must immediately follow a '/'.
|
||||
*/
|
||||
@AutoValue
|
||||
public abstract static class RefStatePattern {
|
||||
static RefStatePattern create(String pattern) {
|
||||
int star = pattern.indexOf('*');
|
||||
check(star > 0 && pattern.charAt(star - 1) == '/', pattern);
|
||||
String prefix = pattern.substring(0, star);
|
||||
check(Repository.isValidRefName(pattern.replace('*', 'x')), pattern);
|
||||
|
||||
// Quote everything except the '*'s, which become ".*".
|
||||
String regex =
|
||||
StreamSupport.stream(Splitter.on('*').split(pattern).spliterator(), false)
|
||||
.map(Pattern::quote)
|
||||
.collect(joining(".*", "^", "$"));
|
||||
return new AutoValue_StalenessChecker_RefStatePattern(
|
||||
pattern, prefix, Pattern.compile(regex));
|
||||
}
|
||||
|
||||
byte[] toByteArray(Project.NameKey project) {
|
||||
return (project.toString() + ':' + pattern()).getBytes(UTF_8);
|
||||
}
|
||||
|
||||
private static void check(boolean condition, String str) {
|
||||
checkArgument(condition, "invalid RefStatePattern: %s", str);
|
||||
}
|
||||
|
||||
abstract String pattern();
|
||||
abstract String prefix();
|
||||
abstract Pattern regex();
|
||||
|
||||
boolean match(String refName) {
|
||||
return regex().matcher(refName).find();
|
||||
}
|
||||
|
||||
private boolean match(Repository repo, Set<RefState> expected)
|
||||
throws IOException {
|
||||
for (Ref r : repo.getRefDatabase().getRefs(prefix()).values()) {
|
||||
if (!match(r.getName())) {
|
||||
continue;
|
||||
}
|
||||
if (!expected.contains(RefState.of(r))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user