Re-indexing optimization for doc_type

When the user calls the "index sync" command and specifies a
document type (e.g. OS::Nova::Server), we currently go to all
plugins and have them re-index. Instead, only the specified doc
type(s) should re-index through their plugins. The other doc
types should be transferred using internal ElasticSearch
functionality, such as the helpers.reindex method. If no type is
specified, we will re-index through the plugins.

Change-Id: Ida85002c306a52ccb6b5ec73a3dbe021bca333bc
Closes-Bug: #1558618
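
In short, the sync path now splits the registered plugins into two
buckets: plugins whose document type was explicitly requested are
re-indexed through their service APIs, while every other document
type is copied index-to-index inside ElasticSearch. A minimal sketch
of that selection, using stand-in plugin objects (illustration only,
not part of the change):

    # Stand-ins for the plugin extensions Searchlight loads.
    class FakePlugin(object):
        def __init__(self, doc_type):
            self.doc_type = doc_type

        def get_document_type(self):
            return self.doc_type

    plugins = [FakePlugin(t) for t in ("OS::Nova::Server",
                                       "OS::Glance::Image",
                                       "OS::Neutron::Net")]
    requested = {"OS::Nova::Server"}  # type(s) given to "index sync"

    # Requested types re-index through their plugin APIs; the rest are
    # handed to the internal ElasticSearch re-index.
    api_plugins = [p for p in plugins
                   if not requested or p.get_document_type() in requested]
    es_types = [p.get_document_type() for p in plugins
                if p not in api_plugins]

    print([p.get_document_type() for p in api_plugins])  # OS::Nova::Server
    print(es_types)  # ['OS::Glance::Image', 'OS::Neutron::Net']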
Rick Aulino 2016-03-21 15:37:22 -06:00
parent 7634f97dc0
commit cb2137bd23
2 changed files with 140 additions and 25 deletions


@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import six
import sys
@@ -99,12 +100,12 @@ class IndexCommands(object):
# when "group" is empty.
resource_groups = []
plugin_objs = {}
plugins_to_index = []
plugins_list = []
for res_type, ext in six.iteritems(utils.get_search_plugins()):
plugin_obj = ext.obj
group_set.discard(plugin_obj.resource_group_name)
if (not group) or (plugin_obj.resource_group_name in group):
plugins_to_index.append((res_type, ext))
plugins_list.append((res_type, ext))
plugin_objs[plugin_obj.resource_group_name] = plugin_obj
if not (plugin_obj.resource_group_name,
plugin_obj.alias_name_search,
@@ -128,7 +129,7 @@ class IndexCommands(object):
# get_index_display_name(). Therefore any child plugins in the
# display list will be listed twice.
display_plugins = []
for res, ext in plugins_to_index:
for res, ext in plugins_list:
if not ext.obj.parent_plugin:
display_plugins.append((res, ext))
@@ -165,7 +166,7 @@ class IndexCommands(object):
try:
for group, search, listen in resource_groups:
index_names[group] = es_utils.create_new_index(group)
for resource_type, ext in plugins_to_index:
for resource_type, ext in plugins_list:
plugin_obj = ext.obj
group_name = plugin_obj.resource_group_name
plugin_obj.prepare_index(index_name=index_names[group_name])
@@ -189,27 +190,53 @@ class IndexCommands(object):
es_utils.alias_error_cleanup(index_names)
raise
# Step #3: Re-index all specified resource types.
# NB: The aliases remain unchanged for this step.
# NBB: There is an optimization possible here. In the future, when
# we have enabled multiple resource_type_group entries, we
# will want to look at only deleting the failing index. We do
# not need to delete the indexes that successfully re-indexed.
# This can get tricky since we will still need to perform
# steps 4 & 5 for these aliases.
for resource_type, ext in plugins_to_index:
plugin_obj = ext.obj
group_name = plugin_obj.resource_group_name
try:
plugin_obj.initial_indexing(index_name=index_names[group_name])
except exceptions.EndpointNotFound:
LOG.warning(_LW("Service is not available for plugin: "
"%(ext)s") % {"ext": ext.name})
except Exception as e:
LOG.error(_LE("Failed to setup index extension "
"%(ext)s: %(e)s") % {'ext': ext.name, 'e': e})
es_utils.alias_error_cleanup(index_names)
raise
# Step #3: Re-index all resource types in this Resource Type Group.
# As an optimization, if any types are explicitly requested, we
# will index them from their service APIs. The rest will be
# indexed from an existing ES index, if one exists.
# NB: The "search" and "listener" aliases remain unchanged for this
# step.
es_reindex = []
plugins_to_index = copy.copy(plugins_list)
if _type:
for resource_type, ext in plugins_list:
doc_type = ext.obj.get_document_type()
if doc_type not in _type:
es_reindex.append(doc_type)
plugins_to_index.remove((resource_type, ext))
# Call plugin API as needed.
if plugins_to_index:
for res, ext in plugins_to_index:
plugin_obj = ext.obj
gname = plugin_obj.resource_group_name
try:
plugin_obj.initial_indexing(index_name=index_names[gname])
except exceptions.EndpointNotFound:
LOG.warning(_LW("Service is not available for plugin: "
"%(ext)s") % {"ext": ext.name})
except Exception as e:
LOG.error(_LE("Failed to setup index extension "
"%(ex)s: %(e)s") % {'ex': ext.name, 'e': e})
es_utils.alias_error_cleanup(index_names)
raise
# Call ElasticSearch for the rest, if needed.
if es_reindex:
for group in index_names.keys():
# Grab the correct tuple as a list, convert list to a single
# tuple, extract second member (the search alias) of tuple.
alias_search = \
[a for a in resource_groups if a[0] == group][0][1]
try:
es_utils.reindex(src_index=alias_search,
dst_index=index_names[group],
type_list=es_reindex)
except Exception as e:
LOG.error(_LE("Failed to setup index extension "
"%(ex)s: %(e)s") % {'ex': ext.name, 'e': e})
es_utils.alias_error_cleanup(index_names)
raise
# Step #4: Update the "search" alias.
# All re-indexing has occurred. The index/alias is the same for
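
Illustration, not part of the change: the ElasticSearch path above
looks up each resource group's (group, search_alias, listener_alias)
tuple in resource_groups and uses the search alias as the source of
the internal re-index. With made-up group, alias and index names:

    # Hypothetical values; the real ones come from the loaded plugins
    # and from es_utils.create_new_index().
    resource_groups = [
        ("searchlight", "searchlight-search", "searchlight-listener"),
    ]
    index_names = {"searchlight": "searchlight-2016-03-21"}

    for group in index_names.keys():
        # Pick this group's tuple and take its second member, the
        # "search" alias that the internal re-index reads from.
        alias_search = [a for a in resource_groups if a[0] == group][0][1]
        print(group, alias_search)  # searchlight searchlight-search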


@@ -64,6 +64,94 @@ def timestamp_to_isotime(timestamp):
return oslo_utils.timeutils.isotime(parsed_time)
def helper_reindex(client, source_index, target_index, query=None,
target_client=None, chunk_size=500, scroll='5m',
scan_kwargs={}, bulk_kwargs={}):
"""We have lovingly copied the entire helpers.reindex function here:
lib/python2.7/site-packages/elasticsearch/helpers/__init__.py.
We need our own version (haha) of reindex to handle external versioning
within a document. The current implementation of helpers.reindex does
not provide this support. Since there is no way to tell helpers.bulk()
that an external version is being used, we will need to modify each
document in the generator. For future maintainability, modifications
to the original method will be preceded with a comment "CHANGED:".
Minor tweaks made for PEP8 conformance excepted.
"""
"""
Reindex all documents from one index that satisfy a given query
to another, potentially (if `target_client` is specified) on a different
cluster. If you don't specify the query you will reindex all the documents.
.. note::
This helper doesn't transfer mappings, just the data.
:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for
read if `target_client` is specified as well)
:arg source_index: index (or list of indices) to read documents from
:arg target_index: name of the index in the target cluster to populate
:arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api
:arg target_client: optional, if specified will be used for writing (thus
enabling reindex between clusters)
:arg chunk_size: number of docs in one chunk sent to es (default: 500)
:arg scroll: Specify how long a consistent view of the index should be
maintained for scrolled search
:arg scan_kwargs: additional kwargs to be passed to
:func:`~elasticsearch.helpers.scan`
:arg bulk_kwargs: additional kwargs to be passed to
:func:`~elasticsearch.helpers.bulk`
"""
target_client = client if target_client is None else target_client
docs = helpers.scan(client, query=query, index=source_index,
scroll=scroll,
fields=('_source', '_parent', '_routing',
'_timestamp'), **scan_kwargs)
def _change_doc_index(hits, index):
for h in hits:
h['_index'] = index
# CHANGED: Allow for external versions to be indexed.
h['_version_type'] = "external"
if 'fields' in h:
h.update(h.pop('fields'))
yield h
kwargs = {
'stats_only': True,
}
kwargs.update(bulk_kwargs)
return helpers.bulk(target_client, _change_doc_index(docs, target_index),
chunk_size=chunk_size, **kwargs)
def reindex(src_index, dst_index, type_list, chunk_size=None, time=None):
"""Reindex a set of indexes internally within ElasticSearch. All of the
documents under the types that live in "type_list" under the index
"src_index" will be copied into the documents under the same types
in the index "dst_index". In other words, a perfect re-index!
Instead of using the plugin API and consuming bandwidth to perform
the re-index we will allow ElasticSearch to do some heavy lifting for
us. Under the covers we are combining scan/scroll with bulk operations
to do this re-indexing as efficiently as possible.
"""
es_engine = searchlight.elasticsearch.get_api()
# Create a Query DSL string to access all documents within the specified
# document types. We will filter on the "_type" field in this index. Since
# there are multiple document types, we will need to use the "terms" filter.
# All of the document types will be added to the list for "_type". We need
# to enable version to allow the search to return the version field. This
# will be used by the reindexer.
body = {"version": "true",
"query": {"filtered": {"filter": {"terms": {"_type": type_list}}}}}
# Debug: Show all documents that ES will re-index.
# LOG.debug(es_engine.search(index=src_index, body=body, size=500))
helper_reindex(client=es_engine, source_index=src_index,
target_index=dst_index, query=body)
def create_new_index(group):
"""Create a new index for a specific Resource Type Group. Upon
exit of this method, the index is still not ready to be used.
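
For reference, a hedged usage sketch of the reindex() helper added
above; the alias and index names are hypothetical, and the call is
left commented out because it needs a reachable ElasticSearch cluster:

    # Only the listed document types are copied; internally reindex()
    # builds a filtered "terms" query on _type and streams the matching
    # documents with scan/scroll plus bulk, preserving their external
    # versions.
    type_list = ["OS::Glance::Image", "OS::Neutron::Net"]

    # Hypothetical names; real values come from resource_groups and
    # es_utils.create_new_index().
    # reindex(src_index="searchlight-search",
    #         dst_index="searchlight-2016-03-21",
    #         type_list=type_list)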