#!/usr/bin/env python2 """ Copyright 2015 Hewlett-Packard Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import argparse import ConfigParser import json import os import re import sys import requests from freezer_api.common import db_mappings DEFAULT_CONF_PATH = '/etc/freezer-api.conf' DEFAULT_ES_SERVER_PORT = 9200 DEFAULT_INDEX = 'freezer' DEFAULT_REPLICAS = 2 class MergeMappingException(Exception): pass class NumberOfReplicasException(Exception): pass class ElastichsearchEngine(object): def __init__(self, es_url, es_index, args): self.es_url = es_url self.es_index = es_index self.args = args self.exit_code = os.EX_OK def verbose_print(self, message, level=1): if self.args.verbose >= level: print(message) def set_number_of_replicas(self, n): if self.number_of_replicas_match(n): print ('Number of replicas matches. ' 'Current value is {0}'.format(n)) else: self.askput_number_of_replicas(n) def number_of_replicas_match(self, n): url = '{0}/{1}/_settings'.format(self.es_url, self.es_index) self.verbose_print('GET {0}\n'.format(url)) r = requests.get(url) if r.status_code != requests.codes.OK: raise Exception("ERROR {0}: {1}".format(r.status_code, r.text)) self.verbose_print("response: {0}".format(r)) settings_dict = r.json() current_n = int(settings_dict[self.es_index]['settings'] ['index']['number_of_replicas']) self.verbose_print("Current replica number: {0}".format(current_n)) return current_n == int(n) def askput_number_of_replicas(self, n): if self.args.test_only: print "Number of replicas don't match" self.exit_code = os.EX_DATAERR return prompt_message = ('Number of replicas needs to be ' 'updated to {0}. ' 'Proceed ? (y/n)' .format(n)) if not self.proceed(prompt_message, self.args.yes): return url = '{0}/{1}/_settings'.format(self.es_url, self.es_index) body_dict = {"number_of_replicas": int(self.args.replicas)} self.verbose_print('PUT {0}\n{1}'.format(url, body_dict)) r = requests.put(url, data=json.dumps(body_dict)) self.verbose_print("response: {0}".format(r)) if r.status_code == requests.codes.OK: print "Replica number set to {0}".format(self.args.replicas) else: raise NumberOfReplicasException('Error setting the replica ' 'number, {0}: {1}' .format(r.status_code, r.text)) def put_mappings(self, mappings): self.check_index_exists() for es_type, mapping in mappings.iteritems(): if self.mapping_match(es_type, mapping): print '{0}/{1} MATCHES'.format(self.es_index, es_type) else: self.askput_mapping(es_type, mapping) return self.exit_code def check_index_exists(self): url = '{0}/{1}'.format(self.es_url, self.es_index) r = requests.post(url) if r.status_code not in [requests.codes.OK, requests.codes.BAD_REQUEST]: raise Exception('Unable to check/create index {0}. ' 'ERROR {1}'.format(url, r.status_code)) def mapping_match(self, es_type, mapping): url = '{0}/{1}/_mapping/{2}'.format(self.es_url, self.es_index, es_type) self.verbose_print("Getting mappings: http GET {0}".format(url)) r = requests.get(url) self.verbose_print("response: {0}".format(r)) if r.status_code == requests.codes.NOT_FOUND: return False if r.status_code != requests.codes.OK: raise Exception("ERROR {0}: {1}".format(r.status_code, r.text)) current_mappings = r.json().get(self.es_index, {}).get('mappings', {}) return mapping == current_mappings.get(es_type, {}) def askput_mapping(self, es_type, mapping): if self.args.test_only: print '{0}/{1} DOES NOT MATCH'.format(self.es_index, es_type) self.exit_code = os.EX_DATAERR return prompt_message = ('{0}/{1}/{2} needs to be updated. ' 'Proceed ? (y/n)' .format(self.es_url, self.es_index, es_type)) if not self.proceed(prompt_message, self.args.yes): return self.verbose_print('Trying to upload mappings ...') try: self.put_mapping(es_type, mapping) except MergeMappingException as e: self.verbose_print('Unable to merge mappings.') self.verbose_print(e, 2) else: print "Mappings updated" return if self.args.yes and not self.args.erase: # explicit consent to update without explicit consent to erase: # do not erase type and return error code self.exit_code = os.EX_DATAERR print ('{0}/{1} DOES NOT MATCH. ' 'Need explicit consent to erase types' .format(self.es_index, es_type)) return prompt_message = ('Type {0}/{1}/{2} needs to be deleted. ' 'Proceed (y/n) ? '.format(self.es_url, self.es_index, es_type)) if not self.proceed(prompt_message, self.args.erase): return self.verbose_print('Deleting type {0}'.format(es_type)) self.delete_type(es_type) self.verbose_print('Uploading mappings ...') self.put_mapping(es_type, mapping) def delete_type(self, es_type): url = '{0}/{1}/{2}'.format(self.es_url, self.es_index, es_type) self.verbose_print("DELETE {0}".format(url)) r = requests.delete(url) self.verbose_print("response: {0}".format(r)) if r.status_code not in [requests.codes.OK, requests.codes.NOT_FOUND]: raise Exception('Type removal error {0}: ' '{1}'.format(r.status_code, r.text)) def put_mapping(self, es_type, mapping): url = '{0}/{1}/_mapping/{2}'.format(self.es_url, self.es_index, es_type) self.verbose_print('PUT {0}'.format(url)) r = requests.put(url, data=json.dumps(mapping)) self.verbose_print("response: {0}".format(r)) if r.status_code == requests.codes.OK: print "Type {0} mapping created".format(url) else: raise MergeMappingException('Type mapping creation error {0}: ' '{1}'.format(r.status_code, r.text)) def proceed(self, message, assume_yes=False): if assume_yes: return True while True: selection = raw_input(message) if selection.upper() == 'Y': return True elif selection.upper() == 'N': return False def get_args(mapping_choices): arg_parser = argparse.ArgumentParser() arg_parser.add_argument( 'host', action='store', default='', nargs='?', help='The DB host address[:port], default "localhost"') arg_parser.add_argument( '-p', '--port', action='store', type=int, help=('The DB server port ' '(default: {0})'.format(DEFAULT_ES_SERVER_PORT)), dest='port', default=0) arg_parser.add_argument( '-m', '--mapping', action='store', help=('Specific mapping to upload. Valid choices: {0}' .format(','.join(mapping_choices))), choices=mapping_choices, dest='select_mapping', default='') arg_parser.add_argument( '-i', '--index', action='store', help='The DB index (default "{0}")'.format(DEFAULT_INDEX), dest='index') arg_parser.add_argument( '-y', '--yes', action='store_true', help="Automatic confirmation to update mappings and " "number-of-replicas", dest='yes', default=False) arg_parser.add_argument( '-e', '--erase', action='store_true', help=("Enable index deletion in case mapping update " "fails due to incompatible changes"), dest='erase', default=False) arg_parser.add_argument( '-v', '--verbose', action='count', help="Verbose", dest='verbose', default=False) arg_parser.add_argument( '-t', '--test-only', action='store_true', help="Test the validity of the mappings, but take no action", dest='test_only', default=False) arg_parser.add_argument( '-c', '--config-file', action='store', help='Config file with the db information', dest='config_file', default='') arg_parser.add_argument( '-r', '--replicas', action='store', help='Set the value for the number replicas in the DB index ' '(default {0} when not specified here nor in config file)' .format(DEFAULT_REPLICAS), dest='replicas', default=False) return arg_parser.parse_args() def find_config_file(): cwd_config = os.path.join(os.getcwd(), 'freezer-api.conf') for config_file_path in [cwd_config, DEFAULT_CONF_PATH]: if os.path.isfile(config_file_path): return config_file_path def parse_config_file(fname): """ Read host URL from config-file :param fname: config-file path :return: (host, port, db_index, number_of_replicas) """ if not fname: return None, 0, None, 0 host, port, index, number_of_replicas = None, 0, None, 0 config = ConfigParser.ConfigParser() config.read(fname) try: if config.has_option('storage', 'endpoint'): endpoint = config.get('storage', 'endpoint') elif config.has_option('storage', 'hosts'): endpoint = config.get('storage', 'hosts') else: endpoint = '' match = re.search(r'^http://([^:]+):([\d]+)', endpoint) if match: host = match.group(1) port = int(match.group(2)) except: pass try: index = config.get('storage', 'index') except: pass try: number_of_replicas = int(config.get('storage', 'number_of_replicas')) except: pass return host, port, index, number_of_replicas def get_db_params(args): """ Extracts the db configuration parameters either from the provided command line arguments or searching in the default freezer-api config file /etc/freezer-api.conf :param args: argparsed command line arguments :return: (elasticsearch_url, elastichsearch_index, number_of_replicas) """ conf_fname = args.config_file or find_config_file() if args.verbose: print "using config file: {0}".format(conf_fname) conf_host, conf_port, conf_db_index, number_of_replicas = \ parse_config_file(conf_fname) # host lookup # 1) host arg (before ':') # 2) config file provided # 3) string 'localhost' host = args.host or conf_host or 'localhost' host = host.split(':')[0] # port lookup # 1) port arg # 2) host arg (after ':') # 3) config file provided # 4) DEFAULT_ES_SERVER_PORT match_port = None match = re.search(r':(\d+)$', args.host) if match: match_port = match.groups()[0] port = args.port or match_port or conf_port or DEFAULT_ES_SERVER_PORT elasticsearch_url = 'http://{0}:{1}'.format(host, port) # index lookup # 1) index args # 2) config file # 3) string DEFAULT_INDEX elasticsearch_index = args.index or conf_db_index or DEFAULT_INDEX return elasticsearch_url, elasticsearch_index, number_of_replicas def main(): mappings = db_mappings.get_mappings() args = get_args(mapping_choices=mappings.keys()) elasticsearch_url, elasticsearch_index, elasticsearch_replicas = \ get_db_params(args) number_of_replicas = int(args.replicas or elasticsearch_replicas or DEFAULT_REPLICAS) es_manager = ElastichsearchEngine(es_url=elasticsearch_url, es_index=elasticsearch_index, args=args) if args.verbose: print " db url: {0}".format(elasticsearch_url) print "db index: {0}".format(elasticsearch_index) if args.select_mapping: mappings = {args.select_mapping: mappings[args.select_mapping]} try: es_manager.put_mappings(mappings) es_manager.set_number_of_replicas(number_of_replicas) except Exception as e: print "ERROR {0}".format(e) return os.EX_DATAERR return es_manager.exit_code if __name__ == '__main__': sys.exit(main())