Merge branch 'stable-2.14' into stable-2.15

* stable-2.14:
  ElasticIndexVersionDiscovery: Convert to Java stream API
  ElasticVersionManager: Use correct configuration value for prefix
  Elasticsearch: Refactor generation of bulk requests
  Rename AbstractReindexIT to AbstractReindexTests
  ElasticReindexIT: Use @ConfigSuite.Default instead of @ConfigSuite.Config

UpdateRequest, added in I7156bc1e3 ("Elasticsearch: Refactor generation of
bulk requests"), is adapted to the removal of FillArgs in I0785c7e2c.

Change-Id: I30c8e259416e3acd6b61b1623e3a79fff3465edd
This commit is contained in:
David Pursehouse
2018-05-30 15:24:33 +09:00
15 changed files with 265 additions and 104 deletions

View File

@@ -16,19 +16,15 @@ package com.google.gerrit.elasticsearch;
import static com.google.gson.FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.codec.binary.Base64.decodeBase64;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.common.io.CharStreams;
import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder;
import com.google.gerrit.elasticsearch.builders.XContentBuilder;
import com.google.gerrit.elasticsearch.bulk.DeleteRequest;
import com.google.gerrit.index.Index;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.Schema.Values;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.IndexUtils;
@@ -50,10 +46,6 @@ import java.util.List;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.eclipse.jgit.lib.Config;
@@ -61,11 +53,8 @@ import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
protected static final String BULK = "_bulk";
protected static final String DELETE = "delete";
protected static final String IGNORE_UNMAPPED = "ignore_unmapped";
protected static final String INDEX = "index";
protected static final String ORDER = "order";
protected static final String SEARCH = "_search";
@@ -143,7 +132,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
@Override
public void delete(K c) throws IOException {
String uri = getURI(indexNameRaw, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, addActions(c), uri, getRefreshParam());
Response response = postRequest(addActions(c), uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -154,10 +143,10 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
@Override
public void deleteAll() throws IOException {
// Delete the index, if it exists.
Response response = client.performRequest(HttpHead.METHOD_NAME, indexName);
Response response = client.performRequest("HEAD", indexName);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
response = client.performRequest(HttpDelete.METHOD_NAME, indexName);
response = client.performRequest("DELETE", indexName);
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -166,8 +155,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
}
// Recreate the index.
response =
performRequest(HttpPut.METHOD_NAME, getMappings(), indexName, Collections.emptyMap());
response = performRequest("PUT", getMappings(), indexName, Collections.emptyMap());
statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
String error = String.format("Failed to create index %s: %s", indexName, statusCode);
@@ -183,44 +171,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
protected String delete(String type, K c) {
String id = c.toString();
return toAction(type, id, DELETE);
}
private static boolean shouldAddElement(Object element) {
return !(element instanceof String) || !((String) element).isEmpty();
}
protected String toDoc(V v) throws IOException {
try (XContentBuilder closeable = new XContentBuilder()) {
XContentBuilder builder = closeable.startObject();
for (Values<V> values : schema.buildFields(v)) {
String name = values.getField().getName();
if (values.getField().isRepeatable()) {
builder.field(
name,
Streams.stream(values.getValues())
.filter(e -> shouldAddElement(e))
.collect(toList()));
} else {
Object element = Iterables.getOnlyElement(values.getValues(), "");
if (shouldAddElement(element)) {
builder.field(name, element);
}
}
}
return builder.endObject().string() + System.lineSeparator();
}
}
protected String toAction(String type, String id, String action) {
JsonObject properties = new JsonObject();
properties.addProperty("_id", id);
properties.addProperty("_index", indexName);
properties.addProperty("_type", type);
JsonObject jsonAction = new JsonObject();
jsonAction.add(action, properties);
return jsonAction.toString() + System.lineSeparator();
return new DeleteRequest(id, indexNameRaw, type).toString();
}
protected void addNamedElement(String name, JsonObject element, JsonArray array) {
@@ -257,9 +208,15 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
return encodedIndexName + "/" + encodedType + "/" + request;
}
protected Response performRequest(
String method, String payload, String uri, Map<String, String> params) throws IOException {
HttpEntity entity = new NStringEntity(payload, ContentType.APPLICATION_JSON);
protected Response postRequest(Object payload, String uri, Map<String, String> params)
throws IOException {
return performRequest("POST", payload, uri, params);
}
private Response performRequest(
String method, Object payload, String uri, Map<String, String> params) throws IOException {
String payloadStr = payload instanceof String ? (String) payload : payload.toString();
HttpEntity entity = new NStringEntity(payloadStr, ContentType.APPLICATION_JSON);
return client.performRequest(method, uri, params, entity);
}
}

View File

@@ -21,6 +21,9 @@ import com.google.common.collect.Lists;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.builders.QueryBuilder;
import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.DataSource;
@@ -50,7 +53,6 @@ import java.util.List;
import java.util.Set;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
import org.slf4j.Logger;
@@ -72,6 +74,7 @@ public class ElasticAccountIndex extends AbstractElasticIndex<Account.Id, Accoun
private final AccountMapping mapping;
private final Provider<AccountCache> accountCache;
private final Schema<AccountState> schema;
@AssistedInject
ElasticAccountIndex(
@@ -83,15 +86,16 @@ public class ElasticAccountIndex extends AbstractElasticIndex<Account.Id, Accoun
super(cfg, sitePaths, schema, clientBuilder, ACCOUNTS);
this.accountCache = accountCache;
this.mapping = new AccountMapping(schema);
this.schema = schema;
}
@Override
public void replace(AccountState as) throws IOException {
String bulk = toAction(ACCOUNTS, getId(as), INDEX);
bulk += toDoc(as);
BulkRequest bulk =
new IndexRequest(getId(as), indexName, ACCOUNTS).add(new UpdateRequest<>(schema, as));
String uri = getURI(ACCOUNTS, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -151,8 +155,7 @@ public class ElasticAccountIndex extends AbstractElasticIndex<Account.Id, Accoun
try {
List<AccountState> results = Collections.emptyList();
String uri = getURI(ACCOUNTS, SEARCH);
Response response =
performRequest(HttpPost.METHOD_NAME, search, uri, Collections.emptyMap());
Response response = postRequest(search, uri, Collections.emptyMap());
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
String content = getContent(response);

View File

@@ -33,6 +33,10 @@ import com.google.common.collect.Sets;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.builders.QueryBuilder;
import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.DeleteRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.Predicate;
@@ -71,7 +75,6 @@ import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
import org.slf4j.Logger;
@@ -100,6 +103,7 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
private final ChangeMapping mapping;
private final Provider<ReviewDb> db;
private final ChangeData.Factory changeDataFactory;
private final Schema<ChangeData> schema;
@Inject
ElasticChangeIndex(
@@ -112,6 +116,7 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
super(cfg, sitePaths, schema, clientBuilder, CHANGES);
this.db = db;
this.changeDataFactory = changeDataFactory;
this.schema = schema;
mapping = new ChangeMapping(schema);
}
@@ -132,12 +137,13 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
throw new IOException(e);
}
String bulk = toAction(insertIndex, getId(cd), INDEX);
bulk += toDoc(cd);
bulk += toAction(deleteIndex, cd.getId().toString(), DELETE);
BulkRequest bulk =
new IndexRequest(getId(cd), indexName, insertIndex)
.add(new UpdateRequest<>(schema, cd))
.add(new DeleteRequest(cd.getId().toString(), indexName, deleteIndex));
String uri = getURI(CHANGES, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -205,8 +211,7 @@ class ElasticChangeIndex extends AbstractElasticIndex<Change.Id, ChangeData>
try {
List<ChangeData> results = Collections.emptyList();
String uri = getURI(types);
Response response =
performRequest(HttpPost.METHOD_NAME, search, uri, Collections.emptyMap());
Response response = postRequest(search, uri, Collections.emptyMap());
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
String content = getContent(response);

View File

@@ -19,6 +19,9 @@ import com.google.common.collect.Lists;
import com.google.gerrit.elasticsearch.ElasticMapping.MappingProperties;
import com.google.gerrit.elasticsearch.builders.QueryBuilder;
import com.google.gerrit.elasticsearch.builders.SearchSourceBuilder;
import com.google.gerrit.elasticsearch.bulk.BulkRequest;
import com.google.gerrit.elasticsearch.bulk.IndexRequest;
import com.google.gerrit.elasticsearch.bulk.UpdateRequest;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.DataSource;
@@ -49,7 +52,6 @@ import java.util.Optional;
import java.util.Set;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.HttpPost;
import org.eclipse.jgit.lib.Config;
import org.elasticsearch.client.Response;
import org.slf4j.Logger;
@@ -71,6 +73,7 @@ public class ElasticGroupIndex extends AbstractElasticIndex<AccountGroup.UUID, I
private final GroupMapping mapping;
private final Provider<GroupCache> groupCache;
private final Schema<InternalGroup> schema;
@AssistedInject
ElasticGroupIndex(
@@ -82,15 +85,16 @@ public class ElasticGroupIndex extends AbstractElasticIndex<AccountGroup.UUID, I
super(cfg, sitePaths, schema, clientBuilder, GROUPS);
this.groupCache = groupCache;
this.mapping = new GroupMapping(schema);
this.schema = schema;
}
@Override
public void replace(InternalGroup group) throws IOException {
String bulk = toAction(GROUPS, getId(group), INDEX);
bulk += toDoc(group);
BulkRequest bulk =
new IndexRequest(getId(group), indexName, GROUPS).add(new UpdateRequest<>(schema, group));
String uri = getURI(GROUPS, BULK);
Response response = performRequest(HttpPost.METHOD_NAME, bulk, uri, getRefreshParam());
Response response = postRequest(bulk, uri, getRefreshParam());
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new IOException(
@@ -150,8 +154,7 @@ public class ElasticGroupIndex extends AbstractElasticIndex<AccountGroup.UUID, I
try {
List<InternalGroup> results = Collections.emptyList();
String uri = getURI(GROUPS, SEARCH);
Response response =
performRequest(HttpPost.METHOD_NAME, search, uri, Collections.emptyMap());
Response response = postRequest(search, uri, Collections.emptyMap());
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
String content = getContent(response);

View File

@@ -14,16 +14,14 @@
package com.google.gerrit.elasticsearch;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import static java.util.stream.Collectors.toList;
import com.google.gson.JsonParser;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.client.Response;
@@ -42,16 +40,14 @@ class ElasticIndexVersionDiscovery {
String name = prefix + indexName + "_";
Response response = client.performRequest(HttpGet.METHOD_NAME, name + "*/_aliases");
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
String content = AbstractElasticIndex.getContent(response);
JsonObject object = new JsonParser().parse(content).getAsJsonObject();
List<String> versions = new ArrayList<>(object.size());
for (Entry<String, JsonElement> entry : object.entrySet()) {
versions.add(entry.getKey().replace(name, ""));
}
return versions;
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
return new JsonParser()
.parse(AbstractElasticIndex.getContent(response))
.getAsJsonObject()
.entrySet()
.stream()
.map(e -> e.getKey().replace(name, ""))
.collect(toList());
}
return Collections.emptyList();
}

View File

@@ -14,7 +14,7 @@
package com.google.gerrit.elasticsearch;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.primitives.Ints;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.index.Index;
@@ -50,7 +50,7 @@ public class ElasticVersionManager extends VersionManager {
ElasticIndexVersionDiscovery versionDiscovery) {
super(sitePaths, listeners, defs, VersionManager.getOnlineUpgrade(cfg));
this.versionDiscovery = versionDiscovery;
prefix = MoreObjects.firstNonNull(cfg.getString("index", null, "prefix"), "gerrit");
prefix = Strings.nullToEmpty(cfg.getString("elasticsearch", null, "prefix"));
}
@Override

View File

@@ -0,0 +1,44 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
import com.google.gson.JsonObject;
abstract class ActionRequest extends BulkRequest {
private final String action;
private final String id;
private final String index;
private final String type;
protected ActionRequest(String action, String id, String index, String type) {
this.action = action;
this.id = id;
this.index = index;
this.type = type;
}
@Override
protected String getRequest() {
JsonObject properties = new JsonObject();
properties.addProperty("_id", id);
properties.addProperty("_index", index);
properties.addProperty("_type", type);
JsonObject jsonAction = new JsonObject();
jsonAction.add(action, properties);
return jsonAction.toString() + System.lineSeparator();
}
}

View File

@@ -0,0 +1,43 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
import java.util.ArrayList;
import java.util.List;
public abstract class BulkRequest {
private final List<BulkRequest> requests = new ArrayList<>();
protected BulkRequest() {
add(this);
}
public BulkRequest add(BulkRequest request) {
requests.add(request);
return this;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
for (BulkRequest request : requests) {
builder.append(request.getRequest());
}
return builder.toString();
}
protected abstract String getRequest();
}

View File

@@ -0,0 +1,22 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
public class DeleteRequest extends ActionRequest {
public DeleteRequest(String id, String index, String type) {
super("delete", id, index, type);
}
}

View File

@@ -0,0 +1,22 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
public class IndexRequest extends ActionRequest {
public IndexRequest(String id, String index, String type) {
super("index", id, index, type);
}
}

View File

@@ -0,0 +1,64 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch.bulk;
import static java.util.stream.Collectors.toList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.google.gerrit.elasticsearch.builders.XContentBuilder;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.Schema.Values;
import java.io.IOException;
public class UpdateRequest<V> extends BulkRequest {
private final Schema<V> schema;
private final V v;
public UpdateRequest(Schema<V> schema, V v) {
this.schema = schema;
this.v = v;
}
@Override
protected String getRequest() {
try (XContentBuilder closeable = new XContentBuilder()) {
XContentBuilder builder = closeable.startObject();
for (Values<V> values : schema.buildFields(v)) {
String name = values.getField().getName();
if (values.getField().isRepeatable()) {
builder.field(
name,
Streams.stream(values.getValues())
.filter(e -> shouldAddElement(e))
.collect(toList()));
} else {
Object element = Iterables.getOnlyElement(values.getValues(), "");
if (shouldAddElement(element)) {
builder.field(name, element);
}
}
}
return builder.endObject().string() + System.lineSeparator();
} catch (IOException e) {
return e.toString();
}
}
private boolean shouldAddElement(Object element) {
return !(element instanceof String) || !((String) element).isEmpty();
}
}