Add support for elasticsearch number of replicas

It is now possible to specify the number of replicas of the freezer
index in the elasticsearch database.

The value is set by the freezer-db-init script and can be specified
either as a command line parameter or defined in the freezer-api config
file.

When not specified it is set to the default value 2.

Adds the following command line parameter to the freeze-db-init script:

    -r  --replicas <N>

Adds the following parameter to the [storage] group of the
freezer-api.conf file:

    number_of_replicas = N

Change-Id: I210dce8bf634727f52c87152c3d036cc4d7065b5
This commit is contained in:
Fabrizio Vanni 2015-10-28 09:09:53 +00:00
parent 41c344c9e2
commit 4faa13b8e7
3 changed files with 73 additions and 21 deletions

View File

@ -33,8 +33,11 @@ delay_auth_decision = False
[storage] [storage]
# supported db engine. currently elasticsearch only
db=elasticsearch db=elasticsearch
hosts='http://elasticsearch_host:9200' hosts='http://elasticsearch_host:9200'
# freezer-db-init uses the following parameter to set the number of replicas
number_of_replicas=2
#use_ssl=False #use_ssl=False
#ca_certs='' #ca_certs=''

View File

@ -35,6 +35,7 @@ from freezer_api.common import db_mappings
DEFAULT_CONF_PATH = '/etc/freezer-api.conf' DEFAULT_CONF_PATH = '/etc/freezer-api.conf'
DEFAULT_ES_SERVER_PORT = 9200 DEFAULT_ES_SERVER_PORT = 9200
DEFAULT_INDEX = 'freezer' DEFAULT_INDEX = 'freezer'
DEFAULT_REPLICAS = 2
class MergeMappingException(Exception): class MergeMappingException(Exception):
@ -52,6 +53,20 @@ class ElastichsearchEngine(object):
if self.args.verbose >= level: if self.args.verbose >= level:
print(message) print(message)
def set_number_of_replicas(self):
url = '{0}/{1}/_settings'.format(self.es_url,
self.es_index)
body_dict = {"number_of_replicas": int(self.args.replicas)}
self.verbose_print('PUT {0}\n{1}'.format(url, body_dict))
r = requests.put(url, data=json.dumps(body_dict))
self.verbose_print("response: {0}".format(r))
if r.status_code == requests.codes.OK:
print "Replica number set to {0}".format(self.args.replicas)
else:
raise MergeMappingException('Error setting the replica number,'
' {0}: {1}'
.format(r.status_code, r.text))
def put_mappings(self, mappings): def put_mappings(self, mappings):
self.check_index_exists() self.check_index_exists()
for es_type, mapping in mappings.iteritems(): for es_type, mapping in mappings.iteritems():
@ -59,6 +74,7 @@ class ElastichsearchEngine(object):
print '{0}/{1} MATCHES'.format(self.es_index, es_type) print '{0}/{1} MATCHES'.format(self.es_index, es_type)
else: else:
self.askput_mapping(es_type, mapping) self.askput_mapping(es_type, mapping)
self.set_number_of_replicas()
return self.exit_code return self.exit_code
def check_index_exists(self): def check_index_exists(self):
@ -200,7 +216,12 @@ def get_args(mapping_choices):
'-c', '--config-file', action='store', '-c', '--config-file', action='store',
help='Config file with the db information', help='Config file with the db information',
dest='config_file', default='') dest='config_file', default='')
arg_parser.add_argument(
'-r', '--replicas', action='store',
help='Set the value for the number replicas in the DB index '
'(default {0} when not specified here nor in config file)'
.format(DEFAULT_REPLICAS),
dest='replicas', default=False)
return arg_parser.parse_args() return arg_parser.parse_args()
@ -216,18 +237,23 @@ def parse_config_file(fname):
Read host URL from config-file Read host URL from config-file
:param fname: config-file path :param fname: config-file path
:return: (host, port, db_index) :return: (host, port, db_index, number_of_replicas)
""" """
if not fname: if not fname:
return None, 0, None return None, 0, None
host, port, index = None, 0, None host, port, index, number_of_replicas = None, 0, None, 0
config = ConfigParser.ConfigParser() config = ConfigParser.ConfigParser()
config.read(fname) config.read(fname)
try: try:
endpoint = config.get('storage', 'endpoint') if config.has_option('storage', 'endpoint'):
match = re.search(r'^http://([^:]+):([\d]+)$', endpoint) endpoint = config.get('storage', 'endpoint')
elif config.has_option('storage', 'hosts'):
endpoint = config.get('storage', 'hosts')
else:
endpoint = ''
match = re.search(r'^http://([^:]+):([\d]+)', endpoint)
if match: if match:
host = match.group(1) host = match.group(1)
port = int(match.group(2)) port = int(match.group(2))
@ -237,7 +263,11 @@ def parse_config_file(fname):
index = config.get('storage', 'index') index = config.get('storage', 'index')
except: except:
pass pass
return host, int(port), index try:
number_of_replicas = config.get('storage', 'number_of_replicas')
except:
pass
return host, int(port), index, int(number_of_replicas)
def get_db_params(args): def get_db_params(args):
@ -247,14 +277,15 @@ def get_db_params(args):
file /etc/freezer-api.conf file /etc/freezer-api.conf
:param args: argparsed command line arguments :param args: argparsed command line arguments
:return: (elasticsearch_url, elastichsearch_index) :return: (elasticsearch_url, elastichsearch_index, number_of_replicas)
""" """
conf_fname = args.config_file or find_config_file() conf_fname = args.config_file or find_config_file()
if args.verbose: if args.verbose:
print "using config file: {0}".format(conf_fname) print "using config file: {0}".format(conf_fname)
conf_host, conf_port, conf_db_index = parse_config_file(conf_fname) conf_host, conf_port, conf_db_index, number_of_replicas = \
parse_config_file(conf_fname)
# host lookup # host lookup
# 1) host arg (before ':') # 1) host arg (before ':')
@ -283,7 +314,7 @@ def get_db_params(args):
# 3) string DEFAULT_INDEX # 3) string DEFAULT_INDEX
elasticsearch_index = args.index or conf_db_index or DEFAULT_INDEX elasticsearch_index = args.index or conf_db_index or DEFAULT_INDEX
return elasticsearch_url, elasticsearch_index return elasticsearch_url, elasticsearch_index, number_of_replicas
def main(): def main():
@ -291,7 +322,10 @@ def main():
args = get_args(mapping_choices=mappings.keys()) args = get_args(mapping_choices=mappings.keys())
elasticsearch_url, elasticsearch_index = get_db_params(args) elasticsearch_url, elasticsearch_index, elasticsearch_replicas = \
get_db_params(args)
args.replicas = args.replicas or elasticsearch_replicas or DEFAULT_REPLICAS
es_manager = ElastichsearchEngine(es_url=elasticsearch_url, es_manager = ElastichsearchEngine(es_url=elasticsearch_url,
es_index=elasticsearch_index, es_index=elasticsearch_index,

View File

@ -52,6 +52,7 @@ class TestElasticsearchEngine(unittest.TestCase):
self.mock_args.verbose = 1 self.mock_args.verbose = 1
self.mock_args.select_mapping = '' self.mock_args.select_mapping = ''
self.mock_args.erase = False self.mock_args.erase = False
self.mock_args.replicas = 0
self.es_manager = ElastichsearchEngine(es_url='http://test:9333', self.es_manager = ElastichsearchEngine(es_url='http://test:9333',
es_index='freezerindex', es_index='freezerindex',
args=self.mock_args) args=self.mock_args)
@ -62,14 +63,24 @@ class TestElasticsearchEngine(unittest.TestCase):
@patch.object(ElastichsearchEngine, 'check_index_exists') @patch.object(ElastichsearchEngine, 'check_index_exists')
@patch.object(ElastichsearchEngine, 'mapping_match') @patch.object(ElastichsearchEngine, 'mapping_match')
@patch.object(ElastichsearchEngine, 'askput_mapping') @patch.object(ElastichsearchEngine, 'askput_mapping')
def test_put_mappings_does_nothing_when_mappings_match(self, mock_askput_mapping, mock_mapping_match, mock_check_index_exists): @patch.object(ElastichsearchEngine, 'set_number_of_replicas')
def test_put_mappings_does_nothing_when_mappings_match(self,
mock_set_number_of_replicas,
mock_askput_mapping,
mock_mapping_match,
mock_check_index_exists):
self.es_manager.put_mappings(self.test_mappings) self.es_manager.put_mappings(self.test_mappings)
self.assertEquals(mock_askput_mapping.call_count, 0) self.assertEquals(mock_askput_mapping.call_count, 0)
@patch.object(ElastichsearchEngine, 'check_index_exists') @patch.object(ElastichsearchEngine, 'check_index_exists')
@patch.object(ElastichsearchEngine, 'mapping_match') @patch.object(ElastichsearchEngine, 'mapping_match')
@patch.object(ElastichsearchEngine, 'askput_mapping') @patch.object(ElastichsearchEngine, 'askput_mapping')
def test_put_mappings_calls_askput_when_mappings_match_not(self, mock_askput_mapping, mock_mapping_match, mock_check_index_exists): @patch.object(ElastichsearchEngine, 'set_number_of_replicas')
def test_put_mappings_calls_askput_when_mappings_match_not(self,
mock_set_number_of_replicas,
mock_askput_mapping,
mock_mapping_match,
mock_check_index_exists):
mock_mapping_match.return_value = False mock_mapping_match.return_value = False
self.es_manager.put_mappings(self.test_mappings) self.es_manager.put_mappings(self.test_mappings)
self.assertEquals(mock_askput_mapping.call_count, 3) self.assertEquals(mock_askput_mapping.call_count, 3)
@ -77,10 +88,12 @@ class TestElasticsearchEngine(unittest.TestCase):
@patch.object(ElastichsearchEngine, 'proceed') @patch.object(ElastichsearchEngine, 'proceed')
@patch.object(ElastichsearchEngine, 'delete_type') @patch.object(ElastichsearchEngine, 'delete_type')
@patch.object(ElastichsearchEngine, 'put_mapping') @patch.object(ElastichsearchEngine, 'put_mapping')
@patch.object(ElastichsearchEngine, 'set_number_of_replicas')
def test_askput_calls_delete_and_put_mappings_when_always_yes_and_erase(self, def test_askput_calls_delete_and_put_mappings_when_always_yes_and_erase(self,
mock_put_mapping, mock_set_number_of_replicas,
mock_delete_type, mock_put_mapping,
mock_proceed): mock_delete_type,
mock_proceed):
self.mock_args.yes = True self.mock_args.yes = True
self.mock_args.erase = True self.mock_args.erase = True
mock_put_mapping.side_effect = [MergeMappingException('regular test failure'), 0] mock_put_mapping.side_effect = [MergeMappingException('regular test failure'), 0]
@ -254,20 +267,22 @@ class TestDbInit(unittest.TestCase):
mock_config = Mock() mock_config = Mock()
mock_ConfigParser.return_value = mock_config mock_ConfigParser.return_value = mock_config
mock_config.get.side_effect = lambda *x: {('storage', 'endpoint'): 'http://iperuranio:1999', mock_config.get.side_effect = lambda *x: {('storage', 'endpoint'): 'http://iperuranio:1999',
('storage', 'index'): 'ohyes'}[x] ('storage', 'index'): 'ohyes',
host, port, index = parse_config_file('dontcare') ('storage', 'number_of_replicas'): '10'}[x]
host, port, index, replicas = parse_config_file('dontcare')
self.assertEquals(host, 'iperuranio') self.assertEquals(host, 'iperuranio')
self.assertEquals(port, 1999) self.assertEquals(port, 1999)
self.assertEquals(index, 'ohyes') self.assertEquals(index, 'ohyes')
self.assertEquals(replicas, 10)
@patch('freezer_api.cmd.db_init.parse_config_file') @patch('freezer_api.cmd.db_init.parse_config_file')
def test_get_db_params_returns_args_parameters(self, mock_parse_config_file): def test_get_db_params_returns_args_parameters(self, mock_parse_config_file):
mock_parse_config_file.return_value = (None, None, None) mock_parse_config_file.return_value = (None, None, None, None )
mock_args = Mock() mock_args = Mock()
mock_args.host = 'pumpkin' mock_args.host = 'pumpkin'
mock_args.port = 12345 mock_args.port = 12345
mock_args.index = 'ciccio' mock_args.index = 'ciccio'
elasticsearch_url, elasticsearch_index = get_db_params(mock_args) elasticsearch_url, elasticsearch_index, elasticsearch_replicas = get_db_params(mock_args)
self.assertEquals(elasticsearch_url, 'http://pumpkin:12345') self.assertEquals(elasticsearch_url, 'http://pumpkin:12345')
self.assertEquals(elasticsearch_index, 'ciccio') self.assertEquals(elasticsearch_index, 'ciccio')
@ -277,7 +292,7 @@ class TestDbInit(unittest.TestCase):
def test_main_calls_esmanager_put_mappings_with_mappings(self, mock_get_args, mock_get_db_params, def test_main_calls_esmanager_put_mappings_with_mappings(self, mock_get_args, mock_get_db_params,
mock_ElastichsearchEngine): mock_ElastichsearchEngine):
mock_get_args.return_value = self.mock_args mock_get_args.return_value = self.mock_args
mock_get_db_params.return_value = Mock(), Mock() mock_get_db_params.return_value = 'url', 'index', 0
mock_es_manager = Mock() mock_es_manager = Mock()
mock_es_manager.put_mappings.return_value = os.EX_OK mock_es_manager.put_mappings.return_value = os.EX_OK
@ -294,7 +309,7 @@ class TestDbInit(unittest.TestCase):
def test_main_return_EX_DATAERR_exitcode_on_error(self, mock_get_args, mock_get_db_params, def test_main_return_EX_DATAERR_exitcode_on_error(self, mock_get_args, mock_get_db_params,
mock_ElastichsearchEngine): mock_ElastichsearchEngine):
mock_get_args.return_value = self.mock_args mock_get_args.return_value = self.mock_args
mock_get_db_params.return_value = Mock(), Mock() mock_get_db_params.return_value = 'url', 'index', 0
mock_es_manager = Mock() mock_es_manager = Mock()
mock_ElastichsearchEngine.return_value = mock_es_manager mock_ElastichsearchEngine.return_value = mock_es_manager