diff --git a/searchlight/cmd/manage.py b/searchlight/cmd/manage.py index 69cae377..1e93d198 100644 --- a/searchlight/cmd/manage.py +++ b/searchlight/cmd/manage.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import six import sys @@ -99,12 +100,12 @@ class IndexCommands(object): # when "group" is empty. resource_groups = [] plugin_objs = {} - plugins_to_index = [] + plugins_list = [] for res_type, ext in six.iteritems(utils.get_search_plugins()): plugin_obj = ext.obj group_set.discard(plugin_obj.resource_group_name) if (not group) or (plugin_obj.resource_group_name in group): - plugins_to_index.append((res_type, ext)) + plugins_list.append((res_type, ext)) plugin_objs[plugin_obj.resource_group_name] = plugin_obj if not (plugin_obj.resource_group_name, plugin_obj.alias_name_search, @@ -128,7 +129,7 @@ class IndexCommands(object): # get_index_display_name(). Therefore any child plugins in the # display list, will be listed twice. display_plugins = [] - for res, ext in plugins_to_index: + for res, ext in plugins_list: if not ext.obj.parent_plugin: display_plugins.append((res, ext)) @@ -165,7 +166,7 @@ class IndexCommands(object): try: for group, search, listen in resource_groups: index_names[group] = es_utils.create_new_index(group) - for resource_type, ext in plugins_to_index: + for resource_type, ext in plugins_list: plugin_obj = ext.obj group_name = plugin_obj.resource_group_name plugin_obj.prepare_index(index_name=index_names[group_name]) @@ -189,27 +190,53 @@ class IndexCommands(object): es_utils.alias_error_cleanup(index_names) raise - # Step #3: Re-index all specified resource types. - # NB: The aliases remain unchanged for this step. - # NBB: There is an optimization possible here. In the future when - # we have enable multiple resource_type_group entries, we - # will want to look at only deleting the failing index. 
We do - # not need to delete the indexes that successfully re-indexed. - # This can get tricky since we will still need to perform - # steps 4 & 5 for this aliases. - for resource_type, ext in plugins_to_index: - plugin_obj = ext.obj - group_name = plugin_obj.resource_group_name - try: - plugin_obj.initial_indexing(index_name=index_names[group_name]) - except exceptions.EndpointNotFound: - LOG.warning(_LW("Service is not available for plugin: " - "%(ext)s") % {"ext": ext.name}) - except Exception as e: - LOG.error(_LE("Failed to setup index extension " - "%(ext)s: %(e)s") % {'ext': ext.name, 'e': e}) - es_utils.alias_error_cleanup(index_names) - raise + # Step #3: Re-index all resource types in this Resource Type Group. + # As an optimization, if any types are explicitly requested, we + # will index them from their service APIs. The rest will be + # indexed from an existing ES index, if one exists. + # NB: The "search" and "listener" aliases remain unchanged for this + # step. + es_reindex = [] + plugins_to_index = copy.copy(plugins_list) + if _type: + for resource_type, ext in plugins_list: + doc_type = ext.obj.get_document_type() + if doc_type not in _type: + es_reindex.append(doc_type) + plugins_to_index.remove((resource_type, ext)) + + # Call plugin API as needed. + if plugins_to_index: + for res, ext in plugins_to_index: + plugin_obj = ext.obj + gname = plugin_obj.resource_group_name + try: + plugin_obj.initial_indexing(index_name=index_names[gname]) + except exceptions.EndpointNotFound: + LOG.warning(_LW("Service is not available for plugin: " + "%(ext)s") % {"ext": ext.name}) + except Exception as e: + LOG.error(_LE("Failed to setup index extension " + "%(ex)s: %(e)s") % {'ex': ext.name, 'e': e}) + es_utils.alias_error_cleanup(index_names) + raise + + # Call ElasticSearch for the rest, if needed. 
+ if es_reindex: + for group in index_names.keys(): + # Grab the correct tuple as a list, convert list to a single + # tuple, extract second member (the search alias) of tuple. + alias_search = \ + [a for a in resource_groups if a[0] == group][0][1] + try: + es_utils.reindex(src_index=alias_search, + dst_index=index_names[group], + type_list=es_reindex) + except Exception as e: + LOG.error(_LE("Failed to setup index extension " + "%(ex)s: %(e)s") % {'ex': ext.name, 'e': e}) + es_utils.alias_error_cleanup(index_names) + raise # Step #4: Update the "search" alias. # All re-indexing has occurred. The index/alias is the same for diff --git a/searchlight/elasticsearch/plugins/utils.py b/searchlight/elasticsearch/plugins/utils.py index 756c26ce..6888fc91 100644 --- a/searchlight/elasticsearch/plugins/utils.py +++ b/searchlight/elasticsearch/plugins/utils.py @@ -64,6 +64,94 @@ def timestamp_to_isotime(timestamp): return oslo_utils.timeutils.isotime(parsed_time) +def helper_reindex(client, source_index, target_index, query=None, + target_client=None, chunk_size=500, scroll='5m', + scan_kwargs={}, bulk_kwargs={}): + """We have lovingly copied the entire helpers.reindex function here: + lib/python2.7/site-packages/elasticsearch/helpers/__init__.py. + We need our own version (haha) of reindex to handle external versioning + within a document. The current implementation of helpers.reindex does + not provide this support. Since there is no way to tell helpers.bulk() + that an external version is being used, we will need to modify each + document in the generator. For future maintainability, modifications + to the original method will be preceded with a comment "CHANGED:". + Minor tweaks made for PEP8 conformance excepted. + """ + + """ + Reindex all documents from one index that satisfy a given query + to another, potentially (if `target_client` is specified) on a different + cluster. If you don't specify the query you will reindex all the documents. + + .. 
note:: + + This helper doesn't transfer mappings, just the data. + + :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use (for + read if `target_client` is specified as well) + :arg source_index: index (or list of indices) to read documents from + :arg target_index: name of the index in the target cluster to populate + :arg query: body for the :meth:`~elasticsearch.Elasticsearch.search` api + :arg target_client: optional, is specified will be used for writing (thus + enabling reindex between clusters) + :arg chunk_size: number of docs in one chunk sent to es (default: 500) + :arg scroll: Specify how long a consistent view of the index should be + maintained for scrolled search + :arg scan_kwargs: additional kwargs to be passed to + :func:`~elasticsearch.helpers.scan` + :arg bulk_kwargs: additional kwargs to be passed to + :func:`~elasticsearch.helpers.bulk` + """ + target_client = client if target_client is None else target_client + + docs = helpers.scan(client, query=query, index=source_index, + scroll=scroll, + fields=('_source', '_parent', '_routing', + '_timestamp'), **scan_kwargs) + + def _change_doc_index(hits, index): + for h in hits: + h['_index'] = index + # CHANGED: Allow for external versions to be indexed. + h['_version_type'] = "external" + if 'fields' in h: + h.update(h.pop('fields')) + yield h + + kwargs = { + 'stats_only': True, + } + kwargs.update(bulk_kwargs) + return helpers.bulk(target_client, _change_doc_index(docs, target_index), + chunk_size=chunk_size, **kwargs) + + +def reindex(src_index, dst_index, type_list, chunk_size=None, time=None): + """Reindex a set of indexes internally within ElasticSearch. All of the + documents under the types that live in "type_list" under the index + "src_index" will be copied into the documents under the same types + in the index "dst_index". In other words, a perfect re-index! 
+ Instead of using the plugin API and consuming bandwidth to perform + the re-index we will allow ElasticSearch to do some heavy lifting for + us. Under the covers we are combining scan/scroll with bulk operations + to do this re-indexing as efficiently as possible. + """ + es_engine = searchlight.elasticsearch.get_api() + + # Create a Query DSL string to access all documents within the specified + # document types. We will filter on the "_type" field in this index. Since + # there are multiple document types, we will need to use the "terms" filter. + # All of the document types will be added to the list for "_type". We need + # to enable version to allow the search to return the version field. This + # will be used by the reindexer. + body = {"version": "true", + "query": {"filtered": {"filter": {"terms": {"_type": type_list}}}}} + # Debug: Show all documents that ES will re-index. + # LOG.debug(es_engine.search(index=src_index, body=body, size=500)) + helper_reindex(client=es_engine, source_index=src_index, + target_index=dst_index, query=body) + + def create_new_index(group): """Create a new index for a specific Resource Type Group. Upon exit of this method, the index is still not ready to be used.