Elastic Search: Split reusable code to utility classes

As a preparatory step before adding the implementation of the account
index in Elastic Search, split out code that can be reused into utility
classes.

Change-Id: I46e0fe4e4e2ef969565cd2f481ab45c46397016e
Signed-off-by: Dariusz Luksza <dluksza@collab.net>
Signed-off-by: David Pursehouse <dpursehouse@collab.net>
This commit is contained in:
Dariusz Luksza
2016-09-22 14:44:40 +02:00
committed by David Pursehouse
parent 91ff74efc8
commit 1d71d309a4
8 changed files with 233 additions and 152 deletions

View File

@@ -30,22 +30,34 @@ java_library(
load('//tools/bzl:junit.bzl', 'junit_tests')
junit_tests(
name = 'elasticsearch_tests',
tags = ['elastic', 'flaky'],
srcs = glob(['src/test/java/**/*.java']),
size = "large",
java_library(
name = 'elasticsearch_test_utils',
srcs = glob(['src/test/java/**/ElasticTestUtils.java']),
deps = [
':elasticsearch',
'//gerrit-extension-api:api',
'//gerrit-server:server',
'//gerrit-server:testutil',
'//gerrit-server:query_tests_code',
'//lib:gson',
'//lib:guava',
'//lib:junit',
'//lib:truth',
'//lib/elasticsearch:elasticsearch',
'//lib/jgit/org.eclipse.jgit:jgit',
'//lib/jgit/org.eclipse.jgit.junit:junit',
],
testonly = 1,
)
junit_tests(
name = 'elasticsearch_tests',
tags = ['elastic', 'flaky'],
srcs = glob(['src/test/java/**/*Test.java']),
size = "large",
deps = [
':elasticsearch_test_utils',
':elasticsearch',
'//gerrit-server:query_tests_code',
'//gerrit-server:server',
'//gerrit-server:testutil',
'//lib/guice:guice',
'//lib/jgit/org.eclipse.jgit:jgit',
'//lib/jgit/org.eclipse.jgit.junit:junit',

View File

@@ -15,9 +15,11 @@
package com.google.gerrit.elasticsearch;
import static com.google.common.base.Preconditions.checkState;
import static org.apache.commons.codec.binary.Base64.decodeBase64;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
@@ -26,6 +28,9 @@ import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexUtils;
import com.google.gerrit.server.index.Schema;
import com.google.gerrit.server.index.Schema.Values;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gwtorm.protobuf.ProtobufCodec;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -33,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.concurrent.TimeUnit;
import io.searchbox.client.JestClientFactory;
@@ -46,6 +52,16 @@ import io.searchbox.indices.DeleteIndex;
import io.searchbox.indices.IndicesExists;
abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
protected static <T> List<T> decodeProtos(JsonObject doc, String fieldName,
ProtobufCodec<T> codec) {
JsonArray field = doc.getAsJsonArray(fieldName);
if (field == null) {
return null;
}
return FluentIterable.from(field)
.transform(i -> codec.decode(decodeBase64(i.toString())))
.toList();
}
private final Schema<V> schema;
private final FillArgs fillArgs;
@@ -55,7 +71,6 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
protected final String indexName;
protected final JestHttpClient client;
AbstractElasticIndex(@GerritServerConfig Config cfg,
FillArgs fillArgs,
SitePaths sitePaths,

View File

@@ -34,9 +34,7 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ReviewerSet;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.FieldDef;
import com.google.gerrit.server.index.FieldDef.FillArgs;
import com.google.gerrit.server.index.FieldType;
import com.google.gerrit.server.index.IndexUtils;
import com.google.gerrit.server.index.QueryOptions;
import com.google.gerrit.server.index.Schema;
@@ -56,7 +54,6 @@ import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gwtorm.protobuf.ProtobufCodec;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet;
import com.google.inject.Provider;
@@ -94,30 +91,9 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
MappingProperties closedChanges;
ChangeMapping(Schema<ChangeData> schema) {
ElasticMapping.Builder mappingBuilder = new ElasticMapping.Builder();
for (FieldDef<?, ?> field : schema.getFields().values()) {
String name = field.getName();
FieldType<?> fieldType = field.getType();
if (fieldType == FieldType.EXACT) {
mappingBuilder.addExactField(name);
} else if (fieldType == FieldType.TIMESTAMP) {
mappingBuilder.addTimestamp(name);
} else if (fieldType == FieldType.INTEGER
|| fieldType == FieldType.INTEGER_RANGE
|| fieldType == FieldType.LONG) {
mappingBuilder.addNumber(name);
} else if (fieldType == FieldType.PREFIX
|| fieldType == FieldType.FULL_TEXT
|| fieldType == FieldType.STORED_ONLY) {
mappingBuilder.addString(name);
} else {
throw new IllegalArgumentException(
"Unsupported field type " + fieldType.getName());
}
}
MappingProperties mapping = mappingBuilder.build();
openChanges = mapping;
closedChanges = mapping;
MappingProperties mapping = ElasticMapping.createMapping(schema);
this.openChanges = mapping;
this.closedChanges = mapping;
}
}
@@ -149,17 +125,6 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
.setFieldNamingPolicy(LOWER_CASE_WITH_UNDERSCORES).create();
}
private static <T> List<T> decodeProtos(JsonObject doc, String fieldName,
ProtobufCodec<T> codec) {
JsonArray field = doc.getAsJsonArray(fieldName);
if (field == null) {
return null;
}
return FluentIterable.from(field)
.transform(i -> codec.decode(decodeBase64(i.toString())))
.toList();
}
@Override
public void replace(ChangeData cd) throws IOException {
String deleteIndex;
@@ -236,7 +201,7 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
sort.setIgnoreUnmapped();
}
QueryBuilder qb = queryBuilder.toQueryBuilder(p);
fields = IndexUtils.fields(opts);
fields = IndexUtils.changeFields(opts);
SearchSourceBuilder searchSource = new SearchSourceBuilder()
.query(qb)
.from(opts.start())

View File

@@ -15,10 +15,38 @@
package com.google.gerrit.elasticsearch;
import com.google.common.collect.ImmutableMap;
import com.google.gerrit.server.index.FieldDef;
import com.google.gerrit.server.index.FieldType;
import com.google.gerrit.server.index.Schema;
import java.util.Map;
class ElasticMapping {
static MappingProperties createMapping(Schema<?> schema) {
ElasticMapping.Builder mapping = new ElasticMapping.Builder();
for (FieldDef<?, ?> field : schema.getFields().values()) {
String name = field.getName();
FieldType<?> fieldType = field.getType();
if (fieldType == FieldType.EXACT) {
mapping.addExactField(name);
} else if (fieldType == FieldType.TIMESTAMP) {
mapping.addTimestamp(name);
} else if (fieldType == FieldType.INTEGER
|| fieldType == FieldType.INTEGER_RANGE
|| fieldType == FieldType.LONG) {
mapping.addNumber(name);
} else if (fieldType == FieldType.PREFIX
|| fieldType == FieldType.FULL_TEXT
|| fieldType == FieldType.STORED_ONLY) {
mapping.addString(name);
} else {
throw new IllegalStateException(
"Unsupported field type: " + fieldType.getName());
}
}
return mapping.build();
}
static class Builder {
private final ImmutableMap.Builder<String, FieldProperties> fields =
new ImmutableMap.Builder<>();

View File

@@ -14,123 +14,60 @@
package com.google.gerrit.elasticsearch;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static com.google.gerrit.elasticsearch.ElasticChangeIndex.CHANGES_PREFIX;
import static com.google.gerrit.elasticsearch.ElasticChangeIndex.CLOSED_CHANGES;
import static com.google.gerrit.elasticsearch.ElasticChangeIndex.OPEN_CHANGES;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.google.gerrit.elasticsearch.ElasticChangeIndex.ChangeMapping;
import com.google.gerrit.server.index.IndexModule.IndexType;
import com.google.gerrit.elasticsearch.ElasticTestUtils.ElasticNodeInfo;
import com.google.gerrit.server.index.change.ChangeSchemaDefinitions;
import com.google.gerrit.server.query.change.AbstractQueryChangesTest;
import com.google.gerrit.testutil.InMemoryModule;
import com.google.gerrit.testutil.InMemoryRepositoryManager.Repo;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class ElasticQueryChangesTest extends AbstractQueryChangesTest {
private static final Gson gson = new GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
.create();
private static final String INDEX_NAME =
String.format("%s%04d", CHANGES_PREFIX,
ChangeSchemaDefinitions.INSTANCE.getLatest().getVersion());
private static Node node;
private static String port;
private static File elasticDir;
static class NodeInfo {
String httpAddress;
}
static class Info {
Map<String, NodeInfo> nodes;
}
private static ElasticNodeInfo nodeInfo;
@BeforeClass
public static void startIndexService()
throws InterruptedException, ExecutionException {
if (node != null) {
if (nodeInfo != null) {
// do not start Elasticsearch twice
return;
}
elasticDir = Files.createTempDir();
Path elasticDirPath = elasticDir.toPath();
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "gerrit")
.put("node.name", "Gerrit Elasticsearch Test Node")
.put("node.local", true)
.put("discovery.zen.ping.multicast.enabled", false)
.put("index.store.fs.memory.enabled", true)
.put("index.gateway.type", "none")
.put("index.max_result_window", Integer.MAX_VALUE)
.put("gateway.type", "default")
.put("http.port", 0)
.put("discovery.zen.ping.unicast.hosts", "[\"localhost\"]")
.put("path.home", elasticDirPath.toAbsolutePath())
.put("path.data", elasticDirPath.resolve("data").toAbsolutePath())
.put("path.work", elasticDirPath.resolve("work").toAbsolutePath())
.put("path.logs", elasticDirPath.resolve("logs").toAbsolutePath())
.put("transport.tcp.connect_timeout", "60s")
.build();
// Start the node
node = NodeBuilder.nodeBuilder()
.settings(settings)
.node();
// Wait for it to be ready
node.client()
.admin()
.cluster()
.prepareHealth()
.setWaitForYellowStatus()
.execute()
.actionGet();
nodeInfo = ElasticTestUtils.startElasticsearchNode();
createIndexes();
assertThat(node.isClosed()).isFalse();
port = getHttpPort();
}
@After
public void cleanupIndex() {
node.client().admin().indices().prepareDelete(INDEX_NAME).execute();
if (nodeInfo != null) {
ElasticTestUtils.deleteIndexes(nodeInfo.node, INDEX_NAME);
createIndexes();
}
}
@AfterClass
public static void stopElasticsearchServer() {
if (node != null) {
node.close();
node = null;
}
if (elasticDir != null && elasticDir.delete()) {
elasticDir = null;
if (nodeInfo != null) {
nodeInfo.node.close();
nodeInfo.elasticDir.delete();
nodeInfo = null;
}
}
@@ -138,11 +75,7 @@ public class ElasticQueryChangesTest extends AbstractQueryChangesTest {
protected Injector createInjector() {
Config elasticsearchConfig = new Config(config);
InMemoryModule.setDefaults(elasticsearchConfig);
elasticsearchConfig.setEnum("index", null, "type", IndexType.ELASTICSEARCH);
elasticsearchConfig.setString("index", null, "protocol", "http");
elasticsearchConfig.setString("index", null, "hostname", "localhost");
elasticsearchConfig.setString("index", null, "port", port);
elasticsearchConfig.setBoolean("index", "elasticsearch", "test", true);
ElasticTestUtils.configure(elasticsearchConfig, nodeInfo.port);
return Guice.createInjector(
new InMemoryModule(elasticsearchConfig, notesMigration));
}
@@ -154,35 +87,19 @@ public class ElasticQueryChangesTest extends AbstractQueryChangesTest {
new ChangeMapping(ChangeSchemaDefinitions.INSTANCE.getLatest());
openChangesMapping.closedChanges = null;
closedChangesMapping.openChanges = null;
node.client()
nodeInfo.node
.client()
.admin()
.indices()
.prepareCreate(INDEX_NAME)
.addMapping(OPEN_CHANGES, gson.toJson(openChangesMapping))
.addMapping(CLOSED_CHANGES, gson.toJson(closedChangesMapping))
.addMapping(OPEN_CHANGES,
ElasticTestUtils.gson.toJson(openChangesMapping))
.addMapping(CLOSED_CHANGES,
ElasticTestUtils.gson.toJson(closedChangesMapping))
.execute()
.actionGet();
}
private static String getHttpPort()
throws InterruptedException, ExecutionException {
String nodes = node.client().admin().cluster()
.nodesInfo(new NodesInfoRequest("*")).get().toString();
Gson gson = new GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
.create();
Info info = gson.fromJson(nodes, Info.class);
checkState(info.nodes != null && info.nodes.size() == 1);
Iterator<NodeInfo> values = info.nodes.values().iterator();
String httpAddress = values.next().httpAddress;
checkState(
!Strings.isNullOrEmpty(httpAddress) && httpAddress.indexOf(':') > 0);
return httpAddress.substring(httpAddress.indexOf(':') + 1,
httpAddress.length());
}
@Test
public void byOwnerInvalidQuery() throws Exception {
TestRepository<Repo> repo = createProject("repo");

View File

@@ -0,0 +1,144 @@
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import com.google.gerrit.server.index.IndexModule.IndexType;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import java.io.File;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
final class ElasticTestUtils {
static final Gson gson = new GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
.create();
static class ElasticNodeInfo {
final Node node;
final String port;
final File elasticDir;
private ElasticNodeInfo(Node node, File rootDir, String port) {
this.node = node;
this.port = port;
this.elasticDir = rootDir;
}
}
static void configure(Config config, String port) {
config.setEnum("index", null, "type", IndexType.ELASTICSEARCH);
config.setString("index", null, "protocol", "http");
config.setString("index", null, "hostname", "localhost");
config.setString("index", null, "port", port);
config.setBoolean("index", "elasticsearch", "test", true);
}
static ElasticNodeInfo startElasticsearchNode()
throws InterruptedException, ExecutionException {
File elasticDir = Files.createTempDir();
Path elasticDirPath = elasticDir.toPath();
Settings settings = Settings.settingsBuilder()
.put("cluster.name", "gerrit")
.put("node.name", "Gerrit Elasticsearch Test Node")
.put("node.local", true)
.put("discovery.zen.ping.multicast.enabled", false)
.put("index.store.fs.memory.enabled", true)
.put("index.gateway.type", "none")
.put("index.max_result_window", Integer.MAX_VALUE)
.put("gateway.type", "default")
.put("http.port", 0)
.put("discovery.zen.ping.unicast.hosts", "[\"localhost\"]")
.put("path.home", elasticDirPath.toAbsolutePath())
.put("path.data", elasticDirPath.resolve("data").toAbsolutePath())
.put("path.work", elasticDirPath.resolve("work").toAbsolutePath())
.put("path.logs", elasticDirPath.resolve("logs").toAbsolutePath())
.put("transport.tcp.connect_timeout", "60s")
.build();
// Start the node
Node node = NodeBuilder.nodeBuilder()
.settings(settings)
.node();
// Wait for it to be ready
node.client()
.admin()
.cluster()
.prepareHealth()
.setWaitForYellowStatus()
.execute()
.actionGet();
assertThat(node.isClosed()).isFalse();
return new ElasticNodeInfo(node, elasticDir, getHttpPort(node));
}
static void deleteIndexes(Node node, String index) {
node.client().admin().indices().prepareDelete(index).execute();
}
static class NodeInfo {
String httpAddress;
}
static class Info {
Map<String, NodeInfo> nodes;
}
private static String getHttpPort(Node node)
throws InterruptedException, ExecutionException {
String nodes = node.client().admin().cluster()
.nodesInfo(new NodesInfoRequest("*")).get().toString();
Gson gson = new GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
.create();
Info info = gson.fromJson(nodes, Info.class);
if (info.nodes == null || info.nodes.size() != 1) {
throw new RuntimeException(
"Cannot extract local Elasticsearch http port");
}
Iterator<NodeInfo> values = info.nodes.values().iterator();
String httpAddress = values.next().httpAddress;
if (Strings.isNullOrEmpty(httpAddress)) {
throw new RuntimeException(
"Cannot extract local Elasticsearch http port");
}
if (httpAddress.indexOf(':') < 0) {
throw new RuntimeException(
"Seems that port is not included in Elasticsearch http_address");
}
return httpAddress.substring(httpAddress.indexOf(':') + 1,
httpAddress.length());
}
private ElasticTestUtils() {
// hide default constructor
}
}

View File

@@ -319,7 +319,7 @@ public class LuceneChangeIndex implements ChangeIndex {
throw new OrmException("interrupted");
}
final Set<String> fields = IndexUtils.fields(opts);
final Set<String> fields = IndexUtils.changeFields(opts);
return new ChangeDataResults(
executor.submit(new Callable<List<Document>>() {
@Override

View File

@@ -44,7 +44,7 @@ public final class IndexUtils {
}
}
public static Set<String> fields(QueryOptions opts) {
public static Set<String> changeFields(QueryOptions opts) {
// Ensure we request enough fields to construct a ChangeData. We need both
// change ID and project, which can either come via the Change field or
// separate fields.