Add test for online index schema migration

Requires some refactoring of AbstractVersionManager and OnlineReindexer
to allow the test to provide a listener that is injected into Daemon.
Use a simple OnlineUpgradeListener interface, which may end up being
useful for things other than tests.

In addition, factor out a separate LifecycleListener for starting the
online upgrade process. This is not immediately necessary, but will be
used in the near future for the NoteDb migration to hook into. In fact,
this change started life as this minor refactoring, at which point I
realized we probably need tests to make sure I don't break it.

Change-Id: Ifcbcac689cf14137784a250f025df149c90f22ef
This commit is contained in:
Dave Borowitz 2017-06-27 09:18:21 -04:00
parent d782a4f896
commit a9c6832afd
14 changed files with 521 additions and 68 deletions

View File

@ -233,7 +233,7 @@ public class GerritServer implements AutoCloseable {
if (!desc.memory()) {
init(desc, baseConfig, site);
}
return start(desc, baseConfig, site);
return start(desc, baseConfig, site, null);
}
/**
@ -244,10 +244,12 @@ public class GerritServer implements AutoCloseable {
* @param site existing temporary directory for site. Required, but may be empty, for in-memory
* servers. For on-disk servers, assumes that {@link #init} was previously called to
* initialize this directory.
* @param testSysModule optional additional module to add to the system injector.
* @return started server.
* @throws Exception
*/
public static GerritServer start(Description desc, Config baseConfig, Path site)
public static GerritServer start(
Description desc, Config baseConfig, Path site, @Nullable Module testSysModule)
throws Exception {
checkArgument(site != null, "site is required (even for in-memory server");
desc.checkValidAnnotations();
@ -264,6 +266,7 @@ public class GerritServer implements AutoCloseable {
},
site);
daemon.setEmailModuleForTesting(new FakeEmailSender.Module());
daemon.setAdditionalSysModuleForTesting(testSysModule);
daemon.setEnableSshd(desc.useSsh());
if (desc.memory()) {

View File

@ -19,6 +19,7 @@ import static java.util.stream.Collectors.joining;
import static org.junit.Assert.fail;
import com.google.common.collect.Streams;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.launcher.GerritLauncher;
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.server.ReviewDb;
@ -29,6 +30,7 @@ import com.google.gerrit.server.util.OneOffRequestContext;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.testutil.ConfigSuite;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.Provider;
import java.util.Arrays;
import org.eclipse.jgit.lib.Config;
@ -113,19 +115,23 @@ public abstract class StandaloneSiteTest {
}
protected ServerContext startServer() throws Exception {
return new ServerContext(startImpl());
return startServer(null);
}
protected ServerContext startServer(@Nullable Module testSysModule) throws Exception {
return new ServerContext(startImpl(testSysModule));
}
protected void assertServerStartupFails() throws Exception {
try (GerritServer server = startImpl()) {
try (GerritServer server = startImpl(null)) {
fail("expected server startup to fail");
} catch (GerritServer.StartupException e) {
// Expected.
}
}
private GerritServer startImpl() throws Exception {
return GerritServer.start(serverDesc, baseConfig, sitePaths.site_path);
private GerritServer startImpl(@Nullable Module testSysModule) throws Exception {
return GerritServer.start(serverDesc, baseConfig, sitePaths.site_path, testSysModule);
}
protected static void runGerrit(String... args) throws Exception {

View File

@ -14,38 +14,59 @@
package com.google.gerrit.acceptance.pgm;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assert_;
import static com.google.common.truth.Truth8.assertThat;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import com.google.gerrit.acceptance.NoHttpd;
import com.google.gerrit.acceptance.StandaloneSiteTest;
import com.google.gerrit.extensions.api.GerritApi;
import com.google.gerrit.extensions.common.ChangeInput;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.index.GerritIndexStatus;
import com.google.gerrit.server.index.OnlineUpgradeListener;
import com.google.gerrit.server.index.change.ChangeIndexCollection;
import com.google.gerrit.server.index.change.ChangeSchemaDefinitions;
import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.google.inject.Provider;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
import org.junit.Test;
@NoHttpd
public class ReindexIT extends StandaloneSiteTest {
private static final String CHANGES = ChangeSchemaDefinitions.NAME;
private Project.NameKey project;
private String changeId;
@Test
public void reindexFromScratch() throws Exception {
Project.NameKey project = new Project.NameKey("project");
String changeId;
try (ServerContext ctx = startServer()) {
GerritApi gApi = ctx.getInjector().getInstance(GerritApi.class);
gApi.projects().create("project");
ChangeInput in = new ChangeInput(project.get(), "master", "Test change");
in.newBranch = true;
changeId = gApi.changes().create(in).info().changeId;
}
setUpChange();
MoreFiles.deleteRecursively(sitePaths.index_dir, RecursiveDeleteOption.ALLOW_INSECURE);
Files.createDirectory(sitePaths.index_dir);
assertServerStartupFails();
runGerrit("reindex", "-d", sitePaths.site_path.toString(), "--show-stack-trace");
assertReady(ChangeSchemaDefinitions.INSTANCE.getLatest().getVersion());
try (ServerContext ctx = startServer()) {
GerritApi gApi = ctx.getInjector().getInstance(GerritApi.class);
@ -53,4 +74,212 @@ public class ReindexIT extends StandaloneSiteTest {
.containsExactly(changeId);
}
}
@Test
public void onlineUpgradeChanges() throws Exception {
int prevVersion = ChangeSchemaDefinitions.INSTANCE.getPrevious().getVersion();
int currVersion = ChangeSchemaDefinitions.INSTANCE.getLatest().getVersion();
// Before storing any changes, switch back to the previous version.
GerritIndexStatus status = new GerritIndexStatus(sitePaths);
status.setReady(CHANGES, currVersion, false);
status.setReady(CHANGES, prevVersion, true);
status.save();
assertReady(prevVersion);
setOnlineUpgradeConfig(false);
setUpChange();
setOnlineUpgradeConfig(true);
UpgradeController u = new UpgradeController(1);
try (ServerContext ctx = startServer(u.module())) {
assertSearchVersion(ctx, prevVersion);
assertWriteVersions(ctx, prevVersion, currVersion);
// Updating and searching old schema version works.
Provider<InternalChangeQuery> queryProvider =
ctx.getInjector().getProvider(InternalChangeQuery.class);
assertThat(queryProvider.get().byKey(new Change.Key(changeId))).hasSize(1);
assertThat(queryProvider.get().byTopicOpen("topic1")).isEmpty();
GerritApi gApi = ctx.getInjector().getInstance(GerritApi.class);
gApi.changes().id(changeId).topic("topic1");
assertThat(queryProvider.get().byTopicOpen("topic1")).hasSize(1);
u.runUpgrades();
assertThat(u.getStartedAttempts())
.containsExactly(UpgradeAttempt.create(CHANGES, prevVersion, currVersion));
assertThat(u.getSucceededAttempts())
.containsExactly(UpgradeAttempt.create(CHANGES, prevVersion, currVersion));
assertThat(u.getFailedAttempts()).isEmpty();
assertReady(currVersion);
assertSearchVersion(ctx, currVersion);
assertWriteVersions(ctx, currVersion);
// Updating and searching new schema version works.
assertThat(queryProvider.get().byTopicOpen("topic1")).hasSize(1);
assertThat(queryProvider.get().byTopicOpen("topic2")).isEmpty();
gApi.changes().id(changeId).topic("topic2");
assertThat(queryProvider.get().byTopicOpen("topic1")).isEmpty();
assertThat(queryProvider.get().byTopicOpen("topic2")).hasSize(1);
}
}
private void setUpChange() throws Exception {
project = new Project.NameKey("project");
try (ServerContext ctx = startServer()) {
GerritApi gApi = ctx.getInjector().getInstance(GerritApi.class);
gApi.projects().create(project.get());
ChangeInput in = new ChangeInput(project.get(), "master", "Test change");
in.newBranch = true;
changeId = gApi.changes().create(in).info().changeId;
}
}
private void setOnlineUpgradeConfig(boolean enable) throws Exception {
FileBasedConfig cfg = new FileBasedConfig(sitePaths.gerrit_config.toFile(), FS.detect());
cfg.load();
cfg.setBoolean("index", null, "onlineUpgrade", enable);
cfg.save();
}
private void assertSearchVersion(ServerContext ctx, int expected) {
assertThat(
ctx.getInjector()
.getInstance(ChangeIndexCollection.class)
.getSearchIndex()
.getSchema()
.getVersion())
.named("search version")
.isEqualTo(expected);
}
private void assertWriteVersions(ServerContext ctx, Integer... expected) {
assertThat(
ctx.getInjector()
.getInstance(ChangeIndexCollection.class)
.getWriteIndexes()
.stream()
.map(i -> i.getSchema().getVersion()))
.named("write versions")
.containsExactlyElementsIn(ImmutableSet.copyOf(expected));
}
private void assertReady(int expectedReady) throws Exception {
Set<Integer> allVersions = ChangeSchemaDefinitions.INSTANCE.getSchemas().keySet();
GerritIndexStatus status = new GerritIndexStatus(sitePaths);
assertThat(
allVersions.stream().collect(toImmutableMap(v -> v, v -> status.getReady(CHANGES, v))))
.named("ready state for index versions")
.isEqualTo(allVersions.stream().collect(toImmutableMap(v -> v, v -> v == expectedReady)));
}
@AutoValue
abstract static class UpgradeAttempt {
static UpgradeAttempt create(String name, int oldVersion, int newVersion) {
return new AutoValue_ReindexIT_UpgradeAttempt(name, oldVersion, newVersion);
}
abstract String name();
abstract int oldVersion();
abstract int newVersion();
}
private static class UpgradeController implements OnlineUpgradeListener {
private final int numExpected;
private final CountDownLatch readyToStart;
private final CountDownLatch started;
private final CountDownLatch finished;
private final List<UpgradeAttempt> startedAttempts;
private final List<UpgradeAttempt> succeededAttempts;
private final List<UpgradeAttempt> failedAttempts;
UpgradeController(int numExpected) {
this.numExpected = numExpected;
readyToStart = new CountDownLatch(1);
started = new CountDownLatch(numExpected);
finished = new CountDownLatch(numExpected);
startedAttempts = new ArrayList<>();
succeededAttempts = new ArrayList<>();
failedAttempts = new ArrayList<>();
}
Module module() {
return new AbstractModule() {
@Override
public void configure() {
DynamicSet.bind(binder(), OnlineUpgradeListener.class).toInstance(UpgradeController.this);
}
};
}
@Override
public synchronized void onStart(String name, int oldVersion, int newVersion) {
UpgradeAttempt a = UpgradeAttempt.create(name, oldVersion, newVersion);
try {
readyToStart.await();
} catch (InterruptedException e) {
throw new AssertionError("interrupted waiting to start " + a, e);
}
checkState(
started.getCount() > 0, "already started %s upgrades, can't start %s", numExpected, a);
startedAttempts.add(a);
started.countDown();
}
@Override
public synchronized void onSuccess(String name, int oldVersion, int newVersion) {
finish(UpgradeAttempt.create(name, oldVersion, newVersion), succeededAttempts);
}
@Override
public synchronized void onFailure(String name, int oldVersion, int newVersion) {
finish(UpgradeAttempt.create(name, oldVersion, newVersion), failedAttempts);
}
private synchronized void finish(UpgradeAttempt a, List<UpgradeAttempt> out) {
checkState(readyToStart.getCount() == 0, "shouldn't be finishing upgrade before starting");
checkState(
finished.getCount() > 0, "already finished %s upgrades, can't finish %s", numExpected, a);
out.add(a);
finished.countDown();
}
void runUpgrades() throws Exception {
readyToStart.countDown();
// Wait with a timeout. Startup should happen quickly, but bugs preventing upgrading from
// starting might not be that uncommon, so we don't want to have to wait forever to discover
// them.
int timeoutSec = 60;
if (!started.await(timeoutSec, TimeUnit.SECONDS)) {
assert_()
.fail(
"%s/%s online upgrades started after %ss",
numExpected - started.getCount(), numExpected, timeoutSec);
}
// Wait with no timeout. Reindexing might be slow, and given that upgrading started
// successfully, it's unlikely there is a bug preventing it from tripping the finished latch
// eventually, even if it takes longer than we might guess.
finished.await();
}
synchronized ImmutableList<UpgradeAttempt> getStartedAttempts() {
return ImmutableList.copyOf(startedAttempts);
}
synchronized ImmutableList<UpgradeAttempt> getSucceededAttempts() {
return ImmutableList.copyOf(succeededAttempts);
}
synchronized ImmutableList<UpgradeAttempt> getFailedAttempts() {
return ImmutableList.copyOf(failedAttempts);
}
}
}

View File

@ -14,36 +14,51 @@
package com.google.gerrit.elasticsearch;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.index.IndexConfig;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.OnlineUpgrader;
import com.google.gerrit.server.index.SingleVersionModule;
import com.google.gerrit.server.index.VersionManager;
import com.google.gerrit.server.index.account.AccountIndex;
import com.google.gerrit.server.index.change.ChangeIndex;
import com.google.gerrit.server.index.group.GroupIndex;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import java.util.Map;
import org.eclipse.jgit.lib.Config;
public class ElasticIndexModule extends LifecycleModule {
private final int threads;
private final Map<String, Integer> singleVersions;
public class ElasticIndexModule extends AbstractModule {
public static ElasticIndexModule singleVersionWithExplicitVersions(
Map<String, Integer> versions, int threads) {
return new ElasticIndexModule(versions, threads);
return new ElasticIndexModule(versions, threads, false);
}
public static ElasticIndexModule latestVersionWithOnlineUpgrade() {
return new ElasticIndexModule(null, 0);
return new ElasticIndexModule(null, 0, true);
}
private ElasticIndexModule(Map<String, Integer> singleVersions, int threads) {
public static ElasticIndexModule latestVersionWithoutOnlineUpgrade() {
return new ElasticIndexModule(null, 0, false);
}
private final Map<String, Integer> singleVersions;
private final int threads;
private final boolean onlineUpgrade;
private ElasticIndexModule(
Map<String, Integer> singleVersions, int threads, boolean onlineUpgrade) {
if (singleVersions != null) {
checkArgument(!onlineUpgrade, "online upgrade is incompatible with single version map");
}
this.singleVersions = singleVersions;
this.threads = threads;
this.onlineUpgrade = onlineUpgrade;
}
@Override
@ -63,7 +78,7 @@ public class ElasticIndexModule extends LifecycleModule {
install(new IndexModule(threads));
if (singleVersions == null) {
listener().to(ElasticVersionManager.class);
install(new MultiVersionModule());
} else {
install(new SingleVersionModule(singleVersions));
}
@ -74,4 +89,15 @@ public class ElasticIndexModule extends LifecycleModule {
IndexConfig getIndexConfig(@GerritServerConfig Config cfg) {
return IndexConfig.fromConfig(cfg).separateChangeSubIndexes(true).build();
}
private class MultiVersionModule extends LifecycleModule {
@Override
public void configure() {
bind(VersionManager.class).to(ElasticVersionManager.class);
listener().to(ElasticVersionManager.class);
if (onlineUpgrade) {
listener().to(OnlineUpgrader.class);
}
}
}
}

View File

@ -16,14 +16,15 @@ package com.google.gerrit.elasticsearch;
import com.google.common.base.MoreObjects;
import com.google.common.primitives.Ints;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.AbstractVersionManager;
import com.google.gerrit.server.index.GerritIndexStatus;
import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexDefinition;
import com.google.gerrit.server.index.OnlineUpgradeListener;
import com.google.gerrit.server.index.Schema;
import com.google.gerrit.server.index.VersionManager;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
@ -34,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
public class ElasticVersionManager extends AbstractVersionManager implements LifecycleListener {
public class ElasticVersionManager extends VersionManager {
private static final Logger log = LoggerFactory.getLogger(ElasticVersionManager.class);
private final String prefix;
@ -44,9 +45,10 @@ public class ElasticVersionManager extends AbstractVersionManager implements Lif
ElasticVersionManager(
@GerritServerConfig Config cfg,
SitePaths sitePaths,
DynamicSet<OnlineUpgradeListener> listeners,
Collection<IndexDefinition<?, ?, ?>> defs,
ElasticIndexVersionDiscovery versionDiscovery) {
super(cfg, sitePaths, defs);
super(sitePaths, listeners, defs, VersionManager.getOnlineUpgrade(cfg));
this.versionDiscovery = versionDiscovery;
prefix = MoreObjects.firstNonNull(cfg.getString("index", null, "prefix"), "gerrit");
}

View File

@ -14,15 +14,20 @@
package com.google.gerrit.lucene;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableMap;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.index.IndexConfig;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.OnlineUpgrader;
import com.google.gerrit.server.index.SingleVersionModule;
import com.google.gerrit.server.index.VersionManager;
import com.google.gerrit.server.index.account.AccountIndex;
import com.google.gerrit.server.index.change.ChangeIndex;
import com.google.gerrit.server.index.group.GroupIndex;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
@ -30,30 +35,40 @@ import java.util.Map;
import org.apache.lucene.search.BooleanQuery;
import org.eclipse.jgit.lib.Config;
public class LuceneIndexModule extends LifecycleModule {
public class LuceneIndexModule extends AbstractModule {
public static LuceneIndexModule singleVersionAllLatest(int threads) {
return new LuceneIndexModule(ImmutableMap.<String, Integer>of(), threads);
return new LuceneIndexModule(ImmutableMap.<String, Integer>of(), threads, false);
}
public static LuceneIndexModule singleVersionWithExplicitVersions(
Map<String, Integer> versions, int threads) {
return new LuceneIndexModule(versions, threads);
return new LuceneIndexModule(versions, threads, false);
}
public static LuceneIndexModule latestVersionWithOnlineUpgrade() {
return new LuceneIndexModule(null, 0);
return new LuceneIndexModule(null, 0, true);
}
public static LuceneIndexModule latestVersionWithoutOnlineUpgrade() {
return new LuceneIndexModule(null, 0, false);
}
static boolean isInMemoryTest(Config cfg) {
return cfg.getBoolean("index", "lucene", "testInmemory", false);
}
private final int threads;
private final Map<String, Integer> singleVersions;
private final int threads;
private final boolean onlineUpgrade;
private LuceneIndexModule(Map<String, Integer> singleVersions, int threads) {
private LuceneIndexModule(
Map<String, Integer> singleVersions, int threads, boolean onlineUpgrade) {
if (singleVersions != null) {
checkArgument(!onlineUpgrade, "online upgrade is incompatible with single version map");
}
this.singleVersions = singleVersions;
this.threads = threads;
this.onlineUpgrade = onlineUpgrade;
}
@Override
@ -87,10 +102,14 @@ public class LuceneIndexModule extends LifecycleModule {
return IndexConfig.fromConfig(cfg).separateChangeSubIndexes(true).build();
}
private static class MultiVersionModule extends LifecycleModule {
private class MultiVersionModule extends LifecycleModule {
@Override
public void configure() {
bind(VersionManager.class).to(LuceneVersionManager.class);
listener().to(LuceneVersionManager.class);
if (onlineUpgrade) {
listener().to(OnlineUpgrader.class);
}
}
}
}

View File

@ -15,14 +15,15 @@
package com.google.gerrit.lucene;
import com.google.common.primitives.Ints;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.AbstractVersionManager;
import com.google.gerrit.server.index.GerritIndexStatus;
import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexDefinition;
import com.google.gerrit.server.index.OnlineUpgradeListener;
import com.google.gerrit.server.index.Schema;
import com.google.gerrit.server.index.VersionManager;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
@ -36,10 +37,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
public class LuceneVersionManager extends AbstractVersionManager implements LifecycleListener {
public class LuceneVersionManager extends VersionManager {
private static final Logger log = LoggerFactory.getLogger(LuceneVersionManager.class);
private static class Version<V> extends AbstractVersionManager.Version<V> {
private static class Version<V> extends VersionManager.Version<V> {
private final boolean exists;
private Version(Schema<V> schema, int version, boolean exists, boolean ready) {
@ -56,22 +57,22 @@ public class LuceneVersionManager extends AbstractVersionManager implements Life
LuceneVersionManager(
@GerritServerConfig Config cfg,
SitePaths sitePaths,
DynamicSet<OnlineUpgradeListener> listeners,
Collection<IndexDefinition<?, ?, ?>> defs) {
super(cfg, sitePaths, defs);
super(sitePaths, listeners, defs, VersionManager.getOnlineUpgrade(cfg));
}
@Override
protected <V> boolean isDirty(
Collection<com.google.gerrit.server.index.AbstractVersionManager.Version<V>> inUse,
com.google.gerrit.server.index.AbstractVersionManager.Version<V> v) {
Collection<com.google.gerrit.server.index.VersionManager.Version<V>> inUse,
com.google.gerrit.server.index.VersionManager.Version<V> v) {
return !inUse.contains(v) && ((Version<V>) v).exists;
}
@Override
protected <K, V, I extends Index<K, V>>
TreeMap<Integer, AbstractVersionManager.Version<V>> scanVersions(
IndexDefinition<K, V, I> def, GerritIndexStatus cfg) {
TreeMap<Integer, AbstractVersionManager.Version<V>> versions = new TreeMap<>();
protected <K, V, I extends Index<K, V>> TreeMap<Integer, VersionManager.Version<V>> scanVersions(
IndexDefinition<K, V, I> def, GerritIndexStatus cfg) {
TreeMap<Integer, VersionManager.Version<V>> versions = new TreeMap<>();
for (Schema<V> schema : def.getSchemas().values()) {
// This part is Lucene-specific.
Path p = getDir(sitePaths, def.getName(), schema);

View File

@ -9,6 +9,7 @@ RSRCS = "src/main/resources/com/google/gerrit/pgm/"
INIT_API_SRCS = glob([SRCS + "init/api/*.java"])
BASE_JETTY_DEPS = [
"//gerrit-common:annotations",
"//gerrit-common:server",
"//gerrit-extension-api:api",
"//gerrit-gwtexpui:linker_server",
@ -35,7 +36,7 @@ java_library(
name = "init-api",
srcs = INIT_API_SRCS,
visibility = ["//visibility:public"],
deps = DEPS + ["//gerrit-common:annotations"],
deps = DEPS,
)
java_library(
@ -46,7 +47,6 @@ java_library(
deps = DEPS + [
":init-api",
":util",
"//gerrit-common:annotations",
"//gerrit-elasticsearch:elasticsearch",
"//gerrit-launcher:launcher", # We want this dep to be provided_deps
"//gerrit-lucene:lucene",

View File

@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.gerrit.common.EventBroker;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.elasticsearch.ElasticIndexModule;
import com.google.gerrit.extensions.client.AuthType;
import com.google.gerrit.gpg.GpgModule;
@ -69,6 +70,7 @@ import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.index.DummyIndexModule;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.IndexModule.IndexType;
import com.google.gerrit.server.index.VersionManager;
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
import com.google.gerrit.server.mail.receive.MailReceiver;
import com.google.gerrit.server.mail.send.SmtpEmailSender;
@ -174,6 +176,7 @@ public class Daemon extends SiteProgram {
private boolean test;
private AbstractModule luceneModule;
private Module emailModule;
private Module testSysModule;
private Runnable serverStarted;
private IndexType indexType;
@ -296,6 +299,11 @@ public class Daemon extends SiteProgram {
test = true;
}
@VisibleForTesting
public void setAdditionalSysModuleForTesting(@Nullable Module m) {
testSysModule = m;
}
@VisibleForTesting
public void start() throws IOException {
if (dbInjector == null) {
@ -442,6 +450,9 @@ public class Daemon extends SiteProgram {
modules.add(new ChangeCleanupRunner.Module());
}
modules.addAll(LibModuleLoader.loadModules(cfgInjector));
if (testSysModule != null) {
modules.add(testSysModule);
}
return cfgInjector.createChildInjector(modules);
}
@ -452,11 +463,16 @@ public class Daemon extends SiteProgram {
if (luceneModule != null) {
return luceneModule;
}
boolean onlineUpgrade = VersionManager.getOnlineUpgrade(config);
switch (indexType) {
case LUCENE:
return LuceneIndexModule.latestVersionWithOnlineUpgrade();
return onlineUpgrade
? LuceneIndexModule.latestVersionWithOnlineUpgrade()
: LuceneIndexModule.latestVersionWithoutOnlineUpgrade();
case ELASTICSEARCH:
return ElasticIndexModule.latestVersionWithOnlineUpgrade();
return onlineUpgrade
? ElasticIndexModule.latestVersionWithOnlineUpgrade()
: ElasticIndexModule.latestVersionWithoutOnlineUpgrade();
default:
throw new IllegalStateException("unsupported index.type = " + indexType);
}

View File

@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
@ -108,6 +109,8 @@ public class IndexModule extends LifecycleModule {
bind(GroupIndexCollection.class);
listener().to(GroupIndexCollection.class);
factory(GroupIndexerImpl.Factory.class);
DynamicSet.setOf(binder(), OnlineUpgradeListener.class);
}
@Provides

View File

@ -17,6 +17,7 @@ package com.google.gerrit.server.index;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import com.google.gerrit.extensions.registration.DynamicSet;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -26,16 +27,26 @@ import org.slf4j.LoggerFactory;
public class OnlineReindexer<K, V, I extends Index<K, V>> {
private static final Logger log = LoggerFactory.getLogger(OnlineReindexer.class);
private final String name;
private final IndexCollection<K, V, I> indexes;
private final SiteIndexer<K, V, I> batchIndexer;
private final int version;
private final int oldVersion;
private final int newVersion;
private final DynamicSet<OnlineUpgradeListener> listeners;
private I index;
private final AtomicBoolean running = new AtomicBoolean();
public OnlineReindexer(IndexDefinition<K, V, I> def, int version) {
public OnlineReindexer(
IndexDefinition<K, V, I> def,
int oldVersion,
int newVersion,
DynamicSet<OnlineUpgradeListener> listeners) {
this.name = def.getName();
this.indexes = def.getIndexCollection();
this.batchIndexer = def.getSiteIndexer();
this.version = version;
this.oldVersion = oldVersion;
this.newVersion = newVersion;
this.listeners = listeners;
}
public void start() {
@ -44,14 +55,21 @@ public class OnlineReindexer<K, V, I extends Index<K, V>> {
new Thread() {
@Override
public void run() {
boolean ok = false;
try {
reindex();
ok = true;
} finally {
running.set(false);
if (!ok) {
for (OnlineUpgradeListener listener : listeners) {
listener.onFailure(name, oldVersion, newVersion);
}
}
}
}
};
t.setName(String.format("Reindex v%d-v%d", version(indexes.getSearchIndex()), version));
t.setName(String.format("Reindex v%d-v%d", version(indexes.getSearchIndex()), newVersion));
t.start();
}
}
@ -61,7 +79,7 @@ public class OnlineReindexer<K, V, I extends Index<K, V>> {
}
public int getVersion() {
return version;
return newVersion;
}
private static int version(Index<?, ?> i) {
@ -69,9 +87,14 @@ public class OnlineReindexer<K, V, I extends Index<K, V>> {
}
private void reindex() {
for (OnlineUpgradeListener listener : listeners) {
listener.onStart(name, oldVersion, newVersion);
}
index =
checkNotNull(
indexes.getWriteIndex(version), "not an active write schema version: %s", version);
indexes.getWriteIndex(newVersion),
"not an active write schema version: %s",
newVersion);
log.info(
"Starting online reindex from schema version {} to {}",
version(indexes.getSearchIndex()),
@ -88,6 +111,9 @@ public class OnlineReindexer<K, V, I extends Index<K, V>> {
}
log.info("Reindex to version {} complete", version(index));
activateIndex();
for (OnlineUpgradeListener listener : listeners) {
listener.onSuccess(name, oldVersion, newVersion);
}
}
public void activateIndex() {

View File

@ -0,0 +1,45 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.index;
/** Listener for online schema upgrade events. */
public interface OnlineUpgradeListener {
/**
* Called before starting upgrading a single index.
*
* @param name index definition name.
* @param oldVersion old schema version.
* @param newVersion new schema version.
*/
void onStart(String name, int oldVersion, int newVersion);
/**
* Called after successfully upgrading a single index.
*
* @param name index definition name.
* @param oldVersion old schema version.
* @param newVersion new schema version.
*/
void onSuccess(String name, int oldVersion, int newVersion);
/**
* Called after failing to upgrade a single index.
*
* @param name index definition name.
* @param oldVersion old schema version.
* @param newVersion new schema version.
*/
void onFailure(String name, int oldVersion, int newVersion);
}

View File

@ -0,0 +1,39 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.index;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.inject.Inject;
/** Listener to handle upgrading index schema versions at startup. */
public class OnlineUpgrader implements LifecycleListener {
private final VersionManager versionManager;
@Inject
OnlineUpgrader(VersionManager versionManager) {
this.versionManager = versionManager;
}
@Override
public void start() {
versionManager.startOnlineUpgrade();
}
@Override
public void stop() {
// Do nothing; reindexing threadpools are shut down in another listener, and indexes are closed
// on demand by IndexCollection.
}
}

View File

@ -15,11 +15,13 @@
package com.google.gerrit.server.index;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.IndexDefinition.IndexFactory;
import com.google.inject.ProvisionException;
@ -31,7 +33,11 @@ import java.util.TreeMap;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Config;
public abstract class AbstractVersionManager implements LifecycleListener {
public abstract class VersionManager implements LifecycleListener {
public static boolean getOnlineUpgrade(Config cfg) {
return cfg.getBoolean("index", null, "onlineUpgrade", true);
}
public static class Version<V> {
public final Schema<V> schema;
public final int version;
@ -48,22 +54,28 @@ public abstract class AbstractVersionManager implements LifecycleListener {
protected final boolean onlineUpgrade;
protected final String runReindexMsg;
protected final SitePaths sitePaths;
private final DynamicSet<OnlineUpgradeListener> listeners;
// The following fields must be accessed synchronized on this.
protected final Map<String, IndexDefinition<?, ?, ?>> defs;
protected final Map<String, OnlineReindexer<?, ?, ?>> reindexers;
protected AbstractVersionManager(
@GerritServerConfig Config cfg,
protected VersionManager(
SitePaths sitePaths,
Collection<IndexDefinition<?, ?, ?>> defs) {
DynamicSet<OnlineUpgradeListener> listeners,
Collection<IndexDefinition<?, ?, ?>> defs,
boolean onlineUpgrade) {
this.sitePaths = sitePaths;
this.listeners = listeners;
this.defs = Maps.newHashMapWithExpectedSize(defs.size());
for (IndexDefinition<?, ?, ?> def : defs) {
this.defs.put(def.getName(), def);
}
reindexers = Maps.newHashMapWithExpectedSize(defs.size());
onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true);
runReindexMsg =
this.reindexers = Maps.newHashMapWithExpectedSize(defs.size());
this.onlineUpgrade = onlineUpgrade;
this.runReindexMsg =
"No index versions for index '%s' ready; run java -jar "
+ sitePaths.gerrit_war.toAbsolutePath()
+ " reindex";
@ -162,11 +174,37 @@ public abstract class AbstractVersionManager implements LifecycleListener {
synchronized (this) {
if (!reindexers.containsKey(def.getName())) {
int latest = write.get(0).version;
OnlineReindexer<K, V, I> reindexer = new OnlineReindexer<>(def, latest);
OnlineReindexer<K, V, I> reindexer =
new OnlineReindexer<>(def, search.version, latest, listeners);
reindexers.put(def.getName(), reindexer);
if (onlineUpgrade && latest != search.version) {
reindexer.start();
}
}
}
}
synchronized void startOnlineUpgrade() {
checkState(onlineUpgrade, "online upgrade not enabled");
for (IndexDefinition<?, ?, ?> def : defs.values()) {
String name = def.getName();
IndexCollection<?, ?, ?> indexes = def.getIndexCollection();
Index<?, ?> search = indexes.getSearchIndex();
checkState(
search != null, "no search index ready for %s; should have failed at startup", name);
int searchVersion = search.getSchema().getVersion();
List<Index<?, ?>> write = ImmutableList.copyOf(indexes.getWriteIndexes());
checkState(
!write.isEmpty(),
"no write indexes set for %s; should have been initialized at startup",
name);
int latestWriteVersion = write.get(0).getSchema().getVersion();
if (latestWriteVersion != searchVersion) {
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
checkState(
reindexer != null,
"no reindexer found for %s; should have been initialized at startup",
name);
reindexer.start();
}
}
}