Factor out batch indexing logic into a ChangeBatchIndexer

Change-Id: Ief166fadc61f76a5f4ffdb879322e9883040bdd6
This commit is contained in:
Dave Borowitz 2013-06-27 14:41:21 -06:00
parent d103b2b61c
commit 2361cefe17
11 changed files with 436 additions and 297 deletions

View File

@ -245,11 +245,10 @@ public class LuceneChangeIndex implements ChangeIndex {
}
@Override
public void markReady() throws IOException {
public void markReady(boolean ready) throws IOException {
try {
FileBasedConfig cfg = LuceneVersionManager.loadGerritIndexConfig(sitePaths);
cfg.setBoolean("index", Integer.toString(schema.getVersion()), "ready",
true);
LuceneVersionManager.setReady(cfg, schema.getVersion(), ready);
cfg.save();
} catch (ConfigInvalidException e) {
throw new IOException(e);

View File

@ -78,12 +78,12 @@ class LuceneVersionManager implements LifecycleListener {
return cfg;
}
private static boolean getReady(Config cfg, int version) {
return cfg.getBoolean("index", Integer.toString(version), "ready", false);
static void setReady(Config cfg, int version, boolean ready) {
cfg.setBoolean("index", Integer.toString(version), "ready", ready);
}
private static void setReady(Config cfg, int version, boolean ready) {
cfg.setBoolean("index", Integer.toString(version), "ready", ready);
private static boolean getReady(Config cfg, int version) {
return cfg.getBoolean("index", Integer.toString(version), "ready", false);
}
private final SitePaths sitePaths;

View File

@ -16,17 +16,8 @@ package com.google.gerrit.pgm;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleManager;
@ -38,20 +29,14 @@ import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.index.ChangeBatchIndexer;
import com.google.gerrit.server.index.ChangeIndex;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.index.ChangeSchemas;
import com.google.gerrit.server.index.IndexCollection;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.IndexModule.IndexType;
import com.google.gerrit.server.index.NoIndexModule;
import com.google.gerrit.server.patch.PatchListCacheImpl;
import com.google.gerrit.server.patch.PatchListLoader;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.solr.SolrIndexModule;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
@ -63,39 +48,17 @@ import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import org.eclipse.jgit.diff.DiffEntry;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.eclipse.jgit.util.io.NullOutputStream;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Reindex extends SiteProgram {
private static final Logger log = LoggerFactory.getLogger(Reindex.class);
@Option(name = "--threads", usage = "Number of threads to use for indexing")
private int threads = Runtime.getRuntime().availableProcessors();
@ -136,9 +99,10 @@ public class Reindex extends SiteProgram {
sysManager.start();
index = sysInjector.getInstance(IndexCollection.class).getSearchIndex();
index.markReady(false);
index.deleteAll();
int result = indexAll();
index.markReady();
index.markReady(true);
sysManager.stop();
dbManager.stop();
@ -218,9 +182,6 @@ public class Reindex extends SiteProgram {
private int indexAll() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
ListeningScheduledExecutorService executor = sysInjector.getInstance(
Key.get(ListeningScheduledExecutorService.class, IndexExecutor.class));
ProgressMonitor pm = new TextProgressMonitor();
pm.start(1);
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
@ -238,234 +199,14 @@ public class Reindex extends SiteProgram {
}
pm.endTask();
final MultiProgressMonitor mpm =
new MultiProgressMonitor(System.err, "Reindexing changes");
final Task projTask = mpm.beginSubTask("projects", projects.size());
final Task doneTask = mpm.beginSubTask(null, changeCount);
final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
Stopwatch sw = new Stopwatch().start();
final List<ListenableFuture<?>> futures =
Lists.newArrayListWithCapacity(projects.size());
final AtomicBoolean ok = new AtomicBoolean(true);
for (final Project.NameKey project : projects) {
final ListenableFuture<?> future = executor.submit(
new ReindexProject(project, doneTask, failedTask));
futures.add(future);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (InterruptedException e) {
fail(project, e);
} catch (ExecutionException e) {
ok.set(false); // Logged by indexer.
} catch (RuntimeException e) {
failAndThrow(project, e);
} catch (Error e) {
failAndThrow(project, e);
} finally {
projTask.update(1);
}
}
private void fail(Project.NameKey project, Throwable t) {
log.error("Failed to index project " + project, t);
ok.set(false);
}
private void failAndThrow(Project.NameKey project, RuntimeException e) {
fail(project, e);
throw e;
}
private void failAndThrow(Project.NameKey project, Error e) {
fail(project, e);
throw e;
}
}, MoreExecutors.sameThreadExecutor());
}
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public ListenableFuture<Void> apply(List<?> input) throws Exception {
mpm.end();
return Futures.immediateFuture(null);
}
}));
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
int n = doneTask.getCount() + failedTask.getCount();
System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n",
n, elapsed, n/elapsed);
return ok.get() ? 0 : 1;
}
private class ReindexProject implements Callable<Void> {
private final ChangeIndexer indexer;
private final Project.NameKey project;
private final ListMultimap<ObjectId, ChangeData> byId;
private final Task done;
private final Task failed;
private Repository repo;
private RevWalk walk;
private ReindexProject(Project.NameKey project, Task done, Task failed) {
this.indexer = sysInjector.getInstance(ChangeIndexer.class);
this.project = project;
this.byId = ArrayListMultimap.create();
this.done = done;
this.failed = failed;
}
@Override
public Void call() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
GitRepositoryManager mgr = sysInjector.getInstance(GitRepositoryManager.class);
repo = mgr.openRepository(project);
try {
Map<String, Ref> refs = repo.getAllRefs();
for (Change c : db.changes().byProject(project)) {
String refName = c.currentPatchSetId().toRefName();
Ref r = refs.get(refName);
if (r != null) {
byId.put(r.getObjectId(), new ChangeData(c));
} else {
fail("Failed to index change " + c.getId()
+ " (" + refName + " not found)", true, null);
}
}
walk();
} finally {
repo.close();
RepositoryCache.close(repo); // Only used once per Reindex call.
}
return null;
}
private void walk() throws Exception {
walk = new RevWalk(repo);
try {
// Walk only refs first to cover as many changes as we can without having
// to mark every single change.
for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) {
RevObject o = walk.parseAny(ref.getObjectId());
if (o instanceof RevCommit) {
walk.markStart((RevCommit) o);
}
}
RevCommit bCommit;
while ((bCommit = walk.next()) != null && !byId.isEmpty()) {
if (byId.containsKey(bCommit)) {
getPathsAndIndex(bCommit);
byId.removeAll(bCommit);
}
}
for (ObjectId id : byId.keySet()) {
getPathsAndIndex(walk.parseCommit(id));
}
} finally {
walk.release();
}
}
private void getPathsAndIndex(RevCommit bCommit) throws Exception {
RevTree bTree = bCommit.getTree();
List<ChangeData> cds = Lists.newArrayList(byId.get(bCommit));
try {
RevTree aTree = aFor(bCommit, walk);
DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE);
try {
df.setRepository(repo);
if (!cds.isEmpty()) {
List<String> paths = (aTree != null)
? getPaths(df.scan(aTree, bTree))
: Collections.<String>emptyList();
Iterator<ChangeData> cdit = cds.iterator();
for (ChangeData cd ; cdit.hasNext(); cdit.remove()) {
cd = cdit.next();
try {
cd.setCurrentFilePaths(paths);
indexer.indexTask(cd).call();
done.update(1);
if (verbose) {
System.out.println("Reindexed change " + cd.getId());
}
} catch (Exception e) {
fail("Failed to index change " + cd.getId(), true, e);
}
}
}
} finally {
df.release();
}
} catch (Exception e) {
fail("Failed to index commit " + bCommit.name(), false, e);
for (ChangeData cd : cds) {
fail("Failed to index change " + cd.getId(), true, null);
}
}
}
private List<String> getPaths(List<DiffEntry> filenames) {
Set<String> paths = Sets.newTreeSet();
for (DiffEntry e : filenames) {
if (e.getOldPath() != null) {
paths.add(e.getOldPath());
}
if (e.getNewPath() != null) {
paths.add(e.getNewPath());
}
}
return ImmutableList.copyOf(paths);
}
private RevTree aFor(RevCommit b, RevWalk walk) throws IOException {
switch (b.getParentCount()) {
case 0:
return walk.parseTree(emptyTree());
case 1:
RevCommit a = b.getParent(0);
walk.parseBody(a);
return walk.parseTree(a.getTree());
case 2:
return PatchListLoader.automerge(repo, walk, b);
default:
return null;
}
}
private ObjectId emptyTree() throws IOException {
ObjectInserter oi = repo.newObjectInserter();
try {
ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {});
oi.flush();
return id;
} finally {
oi.release();
}
}
private void fail(String error, boolean failed, Exception e) {
if (failed) {
this.failed.update(1);
}
if (e != null) {
log.warn(error, e);
} else {
log.warn(error);
}
if (verbose) {
System.out.println(error);
}
}
ChangeBatchIndexer batchIndexer =
sysInjector.getInstance(ChangeBatchIndexer.class);
ChangeBatchIndexer.Result result = batchIndexer.indexAll(
index, projects, projects.size(), changeCount, System.err,
verbose ? System.out : NullOutputStream.INSTANCE);
int n = result.doneCount() + result.failedCount();
double t = result.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n", n, t, n/t);
return result.success() ? 0 : 1;
}
}

View File

@ -0,0 +1,359 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package com.google.gerrit.server.git;
package com.google.gerrit.server.index;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.patch.PatchListLoader;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.eclipse.jgit.diff.DiffEntry;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.eclipse.jgit.util.io.NullOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ChangeBatchIndexer {
private static final Logger log =
LoggerFactory.getLogger(ChangeBatchIndexer.class);
public static class Result {
private final long elapsedNanos;
private final boolean success;
private final int done;
private final int failed;
private Result(Stopwatch sw, boolean success, int done, int failed) {
this.elapsedNanos = sw.elapsed(TimeUnit.NANOSECONDS);
this.success = success;
this.done = done;
this.failed = failed;
}
public boolean success() {
return success;
}
public int doneCount() {
return done;
}
public int failedCount() {
return failed;
}
public long elapsed(TimeUnit timeUnit) {
return timeUnit.convert(elapsedNanos, TimeUnit.NANOSECONDS);
}
}
private final Provider<ReviewDb> dbProvider;
private final GitRepositoryManager repoManager;
private final ListeningScheduledExecutorService executor;
private final ChangeIndexer.Factory indexerFactory;
@Inject
ChangeBatchIndexer(Provider<ReviewDb> dbProvider,
GitRepositoryManager repoManager,
@IndexExecutor ListeningScheduledExecutorService executor,
ChangeIndexer.Factory indexerFactory) {
this.dbProvider = dbProvider;
this.repoManager = repoManager;
this.executor = executor;
this.indexerFactory = indexerFactory;
}
public Result indexAll(ChangeIndex index, Iterable<Project.NameKey> projects,
int numProjects, int numChanges, OutputStream progressOut,
OutputStream verboseOut) {
if (progressOut == null) {
progressOut = NullOutputStream.INSTANCE;
}
PrintWriter verboseWriter = verboseOut != null ? new PrintWriter(verboseOut)
: null;
Stopwatch sw = new Stopwatch().start();
final MultiProgressMonitor mpm =
new MultiProgressMonitor(progressOut, "Reindexing changes");
final Task projTask = mpm.beginSubTask("projects",
numProjects >= 0 ? numProjects : MultiProgressMonitor.UNKNOWN);
final Task doneTask = mpm.beginSubTask(null,
numChanges >= 0 ? numChanges : MultiProgressMonitor.UNKNOWN);
final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
final List<ListenableFuture<?>> futures = Lists.newArrayList();
final AtomicBoolean ok = new AtomicBoolean(true);
for (final Project.NameKey project : projects) {
final ListenableFuture<?> future = executor.submit(new ReindexProject(
indexerFactory.create(index), project, doneTask, failedTask,
verboseWriter));
futures.add(future);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (InterruptedException e) {
fail(project, e);
} catch (ExecutionException e) {
ok.set(false); // Logged by indexer.
} catch (RuntimeException e) {
failAndThrow(project, e);
} catch (Error e) {
failAndThrow(project, e);
} finally {
projTask.update(1);
}
}
private void fail(Project.NameKey project, Throwable t) {
log.error("Failed to index project " + project, t);
ok.set(false);
}
private void failAndThrow(Project.NameKey project, RuntimeException e) {
fail(project, e);
throw e;
}
private void failAndThrow(Project.NameKey project, Error e) {
fail(project, e);
throw e;
}
}, MoreExecutors.sameThreadExecutor());
}
try {
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public ListenableFuture<Void> apply(List<?> input) {
mpm.end();
return Futures.immediateFuture(null);
}
}));
} catch (ExecutionException e) {
log.error("Error in batch indexer", e);
ok.set(false);
}
return new Result(sw, ok.get(), doneTask.getCount(), failedTask.getCount());
}
private class ReindexProject implements Callable<Void> {
private final ChangeIndexer indexer;
private final Project.NameKey project;
private final ListMultimap<ObjectId, ChangeData> byId;
private final Task done;
private final Task failed;
private final PrintWriter verboseWriter;
private Repository repo;
private RevWalk walk;
private ReindexProject(ChangeIndexer indexer, Project.NameKey project,
Task done, Task failed, PrintWriter verboseWriter) {
this.indexer = indexer;
this.project = project;
this.byId = ArrayListMultimap.create();
this.done = done;
this.verboseWriter = verboseWriter;
this.failed = failed;
}
@Override
public Void call() throws Exception {
ReviewDb db = dbProvider.get();
repo = repoManager.openRepository(project);
try {
Map<String, Ref> refs = repo.getAllRefs();
for (Change c : db.changes().byProject(project)) {
Ref r = refs.get(c.currentPatchSetId().toRefName());
if (r != null) {
byId.put(r.getObjectId(), new ChangeData(c));
}
}
walk();
} finally {
repo.close();
// TODO(dborowitz): Opening all repositories in a live server may be
// wasteful; see if we can determine which ones it is safe to close with
// RepositoryCache.close(repo).
}
return null;
}
private void walk() throws Exception {
walk = new RevWalk(repo);
try {
// Walk only refs first to cover as many changes as we can without having
// to mark every single change.
for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) {
RevObject o = walk.parseAny(ref.getObjectId());
if (o instanceof RevCommit) {
walk.markStart((RevCommit) o);
}
}
RevCommit bCommit;
while ((bCommit = walk.next()) != null && !byId.isEmpty()) {
if (byId.containsKey(bCommit)) {
getPathsAndIndex(bCommit);
byId.removeAll(bCommit);
}
}
for (ObjectId id : byId.keySet()) {
getPathsAndIndex(walk.parseCommit(id));
}
} finally {
walk.release();
}
}
private void getPathsAndIndex(RevCommit bCommit) throws Exception {
RevTree bTree = bCommit.getTree();
List<ChangeData> cds = Lists.newArrayList(byId.get(bCommit));
try {
RevTree aTree = aFor(bCommit, walk);
DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE);
try {
df.setRepository(repo);
if (!cds.isEmpty()) {
List<String> paths = (aTree != null)
? getPaths(df.scan(aTree, bTree))
: Collections.<String>emptyList();
Iterator<ChangeData> cdit = cds.iterator();
for (ChangeData cd ; cdit.hasNext(); cdit.remove()) {
cd = cdit.next();
try {
cd.setCurrentFilePaths(paths);
indexer.indexTask(cd).call();
done.update(1);
if (verboseWriter != null) {
verboseWriter.println("Reindexed change " + cd.getId());
}
} catch (Exception e) {
fail("Failed to index change " + cd.getId(), true, e);
}
}
}
} finally {
df.release();
}
} catch (Exception e) {
fail("Failed to index commit " + bCommit.name(), false, e);
for (ChangeData cd : cds) {
fail("Failed to index change " + cd.getId(), true, null);
}
}
}
private List<String> getPaths(List<DiffEntry> filenames) {
Set<String> paths = Sets.newTreeSet();
for (DiffEntry e : filenames) {
if (e.getOldPath() != null) {
paths.add(e.getOldPath());
}
if (e.getNewPath() != null) {
paths.add(e.getNewPath());
}
}
return ImmutableList.copyOf(paths);
}
private RevTree aFor(RevCommit b, RevWalk walk) throws IOException {
switch (b.getParentCount()) {
case 0:
return walk.parseTree(emptyTree());
case 1:
RevCommit a = b.getParent(0);
walk.parseBody(a);
return walk.parseTree(a.getTree());
case 2:
return PatchListLoader.automerge(repo, walk, b);
default:
return null;
}
}
private ObjectId emptyTree() throws IOException {
ObjectInserter oi = repo.newObjectInserter();
try {
ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {});
oi.flush();
return id;
} finally {
oi.release();
}
}
private void fail(String error, boolean failed, Exception e) {
if (failed) {
this.failed.update(1);
}
if (e != null) {
log.warn(error, e);
} else {
log.warn(error);
}
if (verboseWriter != null) {
verboseWriter.println(error);
}
}
}
}

View File

@ -72,7 +72,7 @@ public interface ChangeIndex {
}
@Override
public void markReady() {
public void markReady(boolean ready) {
throw new UnsupportedOperationException();
}
};
@ -142,13 +142,10 @@ public interface ChangeIndex {
throws QueryParseException;
/**
* Mark this index as up-to-date and ready to serve reads.
* <p>
* Should only be called immediately after a reindex, either during an online
* schema upgrade while actively writing to this index, or during an offline
* reindex.
* Mark whether this index is up-to-date and ready to serve reads.
*
* @param ready whether the index is ready
* @throws IOException
*/
public void markReady() throws IOException;
public void markReady(boolean ready) throws IOException;
}

View File

@ -29,6 +29,11 @@ import java.util.concurrent.Callable;
* compute some of the fields and/or update the index.
*/
public abstract class ChangeIndexer {
public interface Factory {
ChangeIndexer create(ChangeIndex index);
ChangeIndexer create(IndexCollection indexes);
}
/** Instance indicating secondary index is disabled. */
public static final ChangeIndexer DISABLED = new ChangeIndexer(null) {
@Override

View File

@ -21,9 +21,10 @@ import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.OutOfScopeException;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.util.Providers;
import org.slf4j.Logger;
@ -42,18 +43,32 @@ public class ChangeIndexerImpl extends ChangeIndexer {
LoggerFactory.getLogger(ChangeIndexerImpl.class);
private final IndexCollection indexes;
private final ChangeIndex index;
private final SchemaFactory<ReviewDb> schemaFactory;
private final ThreadLocalRequestContext context;
@Inject
@AssistedInject
ChangeIndexerImpl(@IndexExecutor ListeningScheduledExecutorService executor,
IndexCollection indexes,
SchemaFactory<ReviewDb> schemaFactory,
ThreadLocalRequestContext context) {
ThreadLocalRequestContext context,
@Assisted ChangeIndex index) {
super(executor);
this.indexes = indexes;
this.schemaFactory = schemaFactory;
this.context = context;
this.index = index;
this.indexes = null;
}
@AssistedInject
ChangeIndexerImpl(@IndexExecutor ListeningScheduledExecutorService executor,
SchemaFactory<ReviewDb> schemaFactory,
ThreadLocalRequestContext context,
@Assisted IndexCollection indexes) {
super(executor);
this.schemaFactory = schemaFactory;
this.context = context;
this.index = null;
this.indexes = indexes;
}
@Override
@ -84,8 +99,12 @@ public class ChangeIndexerImpl extends ChangeIndexer {
throw new OutOfScopeException("No user during ChangeIndexer");
}
});
for (ChangeIndex index : indexes.getWriteIndexes()) {
index.replace(cd); // TODO(dborowitz): Parallelize these
if (indexes != null) {
for (ChangeIndex i : indexes.getWriteIndexes()) {
i.replace(cd); // TODO(dborowitz): Parallelize these
}
} else {
index.replace(cd);
}
return null;
} finally {

View File

@ -68,6 +68,14 @@ public class IndexCollection implements LifecycleListener {
writeIndexes.add(index);
}
public void removeWriteIndex(int version) {
writeIndexes.remove(version);
}
public ChangeIndex getWriteIndex(int version) {
return writeIndexes.get(version);
}
@Override
public void start() {
}

View File

@ -25,6 +25,7 @@ import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.eclipse.jgit.lib.Config;
@ -54,11 +55,13 @@ public class IndexModule extends LifecycleModule {
@Override
protected void configure() {
bind(ChangeIndexer.class).to(ChangeIndexerImpl.class);
bind(ChangeQueryRewriter.class).to(IndexRewriteImpl.class);
bind(IndexRewriteImpl.BasicRewritesImpl.class);
bind(IndexCollection.class);
listener().to(IndexCollection.class);
install(new FactoryModuleBuilder()
.implement(ChangeIndexer.class, ChangeIndexerImpl.class)
.build(ChangeIndexer.Factory.class));
}
@Provides
@ -79,4 +82,11 @@ public class IndexModule extends LifecycleModule {
}
return MoreExecutors.listeningDecorator(executor);
}
@Provides
ChangeIndexer getChangeIndexer(
ChangeIndexer.Factory factory,
IndexCollection indexes) {
return factory.create(indexes);
}
}

View File

@ -94,7 +94,7 @@ public class IndexRewriteTest extends TestCase {
}
@Override
public void markReady() {
public void markReady(boolean ready) {
throw new UnsupportedOperationException();
}
}

View File

@ -325,13 +325,14 @@ class SolrChangeIndex implements ChangeIndex, LifecycleListener {
}
@Override
public void markReady() throws IOException {
public void markReady(boolean ready) throws IOException {
// TODO Move the schema version information to a special meta-document
FileBasedConfig cfg = new FileBasedConfig(
solrIndexConfig(sitePaths),
FS.detect());
for (Map.Entry<String, Integer> e : SCHEMA_VERSIONS.entrySet()) {
cfg.setInt("index", e.getKey(), "schemaVersion", e.getValue());
cfg.setInt("index", e.getKey(), "schemaVersion",
ready ? e.getValue() : -1);
}
cfg.save();
}