Spread pacemaker remote resources across cluster.

Use location directives to spread pacemaker remote resources across
cluster. This is to prevent multiple resources being taken down in
the event of a single node failure. This would usually not be a
problem but if the node is being queried by masakari host
monitors at the time the node goes down then the query can hang.

Change-Id: Ib8a667d0d82ef3dcd4da27e62460b4f0ce32ee43
Partial-Bug: #1889094
Depends-On: Ic45dbdd9d8581f25549580c7e98a8d6e0bf8c3e7
This commit is contained in:
Liam Young 2020-07-30 06:25:36 +00:00
parent b40a6754b0
commit 527fd2c704
5 changed files with 199 additions and 18 deletions

View File

@ -283,23 +283,40 @@ def crm_version():
return StrictVersion(matched.group(1)) return StrictVersion(matched.group(1))
def crm_update_resource(res_name, res_type, res_params=None, force=False): def _crm_update_object(update_template, update_ctxt, hash_keys, unitdata_key,
"""Update a resource using `crm configure load update` res_params=None, force=False):
"""Update a object using `crm configure load update`
:param res_name: resource name :param update_template: Format string to create object when update_ctxt is
:param res_type: resource type (e.g. IPaddr2) applied.
:param res_params: resource's parameters (e.g. "params ip=10.5.250.250") :type update_template: str
:param update_ctxt: Context to apply to update_template to generate object
creation directive.
:type update_ctxt: dict
:param hash_keys: List of keys to use from update_ctxt when generating
objects hash.
:type hash_keys: List[str]
:param unitdata_key: Key to use when storing objects hash in in unitdata
kv.
:type unitdata_key: str
:param res_params: Resource's additional parameters
(e.g. "params ip=10.5.250.250")
:type res_params: str or None
:param force: Whether to force the update irrespective of whats currently
configured.
:type force: bool
:returns: Return code (0 => success)
:rtype: int
""" """
db = unitdata.kv() db = unitdata.kv()
res_hash = resource_checksum(res_name, res_type, res_params) res_hash = generate_checksum(update_ctxt[k] for k in hash_keys)
if not force and db.get(unitdata_key) == res_hash:
if not force and db.get('{}-{}'.format(res_name, res_type)) == res_hash:
log("Resource {} already defined and parameters haven't changed" log("Resource {} already defined and parameters haven't changed"
.format(res_name)) .format(update_ctxt['object_name']))
return 0 return 0
with tempfile.NamedTemporaryFile() as f: with tempfile.NamedTemporaryFile() as f:
f.write('primitive {} {}'.format(res_name, res_type).encode('ascii')) f.write(update_template.format(**update_ctxt).encode('ascii'))
if res_params: if res_params:
f.write(' \\\n\t{}'.format(res_params).encode('ascii')) f.write(' \\\n\t{}'.format(res_params).encode('ascii'))
@ -308,7 +325,9 @@ def crm_update_resource(res_name, res_type, res_params=None, force=False):
f.flush() f.flush()
f.seek(0) f.seek(0)
log('Updating resource {}'.format(res_name), level=INFO) log(
'Updating resource {}'.format(update_ctxt['object_name']),
level=INFO)
log('File content:\n{}'.format(f.read()), level=DEBUG) log('File content:\n{}'.format(f.read()), level=DEBUG)
cmd = "crm configure load update {}".format(f.name) cmd = "crm configure load update {}".format(f.name)
log('Update command: {}'.format(cmd)) log('Update command: {}'.format(cmd))
@ -321,11 +340,86 @@ def crm_update_resource(res_name, res_type, res_params=None, force=False):
log('crm command exit code: {}'.format(retcode), level=level) log('crm command exit code: {}'.format(retcode), level=level)
if retcode == 0: if retcode == 0:
db.set('{}-{}'.format(res_name, res_type), res_hash) db.set(unitdata_key, res_hash)
db.flush()
return retcode return retcode
def crm_update_resource(res_name, res_type, res_params=None, force=False):
"""Update a resource using `crm configure load update`
:param res_name: resource name
:type res_name: str
:param res_type: resource type (e.g. IPaddr2)
:type res_type: str
:param res_params: resource's parameters (e.g. "params ip=10.5.250.250")
:type res_params: str or None
:param force: Whether to force the update irrespective of whats currently
configured.
:type force: bool
:returns: Return code (0 => success)
:rtype: int
"""
hash_keys = ['resource_type']
if res_params:
hash_keys.append('resource_params')
return _crm_update_object(
'primitive {object_name} {resource_type}',
{
'object_name': res_name,
'resource_params': res_params,
'resource_type': res_type},
hash_keys,
'{}-{}'.format(res_name, res_type),
res_params=res_params,
force=force)
def crm_update_location(location_name, resource_name, score, node,
force=False):
"""Update a location rule.
:param location_name: Name of location rule.
:type location_name: str
:param resource_name: Resource name location rule governs.
:type resource_name: str
:param score: The score for the resource running on node.
:type score: int
:param node: Name of the node this rule applies to.
:type node: str
:param force: Whether to force the update irrespective of whats currently
configured.
:type force: bool
:returns: Return code (0 => success)
:rtype: int
"""
return _crm_update_object(
'location {object_name} {resource_name} {score}: {node}',
{
'object_name': location_name,
'resource_name': resource_name,
'score': str(score),
'node': node},
['resource_name', 'score', 'node'],
'{}-{}'.format(location_name, resource_name),
force=force)
def generate_checksum(check_strings):
"""Create a md5 checksum using each string in the list.
:param check_strings: resource name
:type check_strings: List[str]
:returns: Hash generated from strings.
:rtype: str
"""
m = hashlib.md5()
for entry in check_strings:
m.update(entry.encode('utf-8'))
return m.hexdigest()
def resource_checksum(res_name, res_type, res_params=None): def resource_checksum(res_name, res_type, res_params=None):
"""Create a md5 checksum of the resource parameters. """Create a md5 checksum of the resource parameters.
@ -333,9 +427,7 @@ def resource_checksum(res_name, res_type, res_params=None):
:param res_type: resource type (e.g. IPaddr2) :param res_type: resource type (e.g. IPaddr2)
:param res_params: resource's parameters (e.g. "params ip=10.5.250.250") :param res_params: resource's parameters (e.g. "params ip=10.5.250.250")
""" """
data = [res_type]
m = hashlib.md5()
m.update(res_type.encode('utf-8'))
if res_params is not None: if res_params is not None:
m.update(res_params.encode('utf-8')) data.append(res_params)
return m.hexdigest() return generate_checksum(data)

View File

@ -26,6 +26,7 @@ import fcntl
import struct import struct
import time import time
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
import itertools
from base64 import b64decode from base64 import b64decode
@ -763,6 +764,24 @@ def set_cluster_symmetry():
pcmk.commit(cmd, failure_is_fatal=True) pcmk.commit(cmd, failure_is_fatal=True)
def add_score_location_rule(res_name, node, location_score):
"""Add or update a location rule that uses a score.
:param res_name: Resource that this location rule controls.
:type res_name: str
:param node: Node that this location rule relates to.
:type node: str
:param location_score: The score to give this location.
:type location_score: int
"""
loc_constraint_name = 'loc-{}-{}'.format(res_name, node)
pcmk.crm_update_location(
loc_constraint_name,
res_name,
location_score,
node)
def add_location_rules_for_local_nodes(res_name): def add_location_rules_for_local_nodes(res_name):
"""Add location rules for running resource on local nodes. """Add location rules for running resource on local nodes.
@ -783,6 +802,29 @@ def add_location_rules_for_local_nodes(res_name):
log('%s' % cmd, level=DEBUG) log('%s' % cmd, level=DEBUG)
def add_location_rules_for_pacemaker_remotes(res_names):
"""Add location rules for pacemaker remote resources on local nodes.
Add location rules allowing the pacemaker remote resource to run on a local
node. Use location score rules to spread resources out.
:param res_names: Pacemaker remote resource names.
:type res_names: List[str]
"""
res_names = sorted(res_names)
nodes = sorted(pcmk.list_nodes())
prefered_nodes = list(zip(res_names, itertools.cycle(nodes)))
for res_name in res_names:
for node in nodes:
location_score = 0
if (res_name, node) in prefered_nodes:
location_score = 200
add_score_location_rule(
res_name,
node,
location_score)
def configure_pacemaker_remote(remote_hostname, remote_ip): def configure_pacemaker_remote(remote_hostname, remote_ip):
"""Create a resource corresponding to the pacemaker remote node. """Create a resource corresponding to the pacemaker remote node.
@ -912,9 +954,14 @@ def configure_resources_on_remotes(resources=None, clones=None, groups=None):
'location constraints') 'location constraints')
log(msg, level=WARNING) log(msg, level=WARNING)
return return
pacemaker_remotes = []
for res_name, res_type in resources.items(): for res_name, res_type in resources.items():
if res_name not in list(clones.values()) + list(groups.values()): if res_name not in list(clones.values()) + list(groups.values()):
add_location_rules_for_local_nodes(res_name) if res_type == 'ocf:pacemaker:remote':
pacemaker_remotes.append(res_name)
else:
add_location_rules_for_local_nodes(res_name)
add_location_rules_for_pacemaker_remotes(pacemaker_remotes)
for cl_name in clones: for cl_name in clones:
add_location_rules_for_local_nodes(cl_name) add_location_rules_for_local_nodes(cl_name)
# Limit clone resources to only running on X number of nodes where X # Limit clone resources to only running on X number of nodes where X

View File

@ -34,6 +34,7 @@ class TestCorosyncConf(unittest.TestCase):
def setUp(self): def setUp(self):
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
self.tmpfile = tempfile.NamedTemporaryFile(delete=False) self.tmpfile = tempfile.NamedTemporaryFile(delete=False)
os.environ['UNIT_STATE_DB'] = ':memory:'
def tearDown(self): def tearDown(self):
shutil.rmtree(self.tmpdir) shutil.rmtree(self.tmpdir)

View File

@ -704,6 +704,16 @@ class UtilsTestCase(unittest.TestCase):
utils.set_cluster_symmetry() utils.set_cluster_symmetry()
self.assertFalse(commit.called) self.assertFalse(commit.called)
@mock.patch('pcmk.crm_update_location')
def test_add_score_location_rule(self, crm_update_location):
# Check no update required
utils.add_score_location_rule('res1', 'juju-lxd-0', 0)
crm_update_location.assert_called_once_with(
'loc-res1-juju-lxd-0',
'res1',
0,
'juju-lxd-0')
@mock.patch('pcmk.commit') @mock.patch('pcmk.commit')
@mock.patch('pcmk.crm_opt_exists') @mock.patch('pcmk.crm_opt_exists')
@mock.patch('pcmk.list_nodes') @mock.patch('pcmk.list_nodes')
@ -717,6 +727,35 @@ class UtilsTestCase(unittest.TestCase):
'crm -w -F configure location loc-res1-node2 res1 0: node2', 'crm -w -F configure location loc-res1-node2 res1 0: node2',
failure_is_fatal=True) failure_is_fatal=True)
@mock.patch.object(utils, 'add_score_location_rule')
@mock.patch('pcmk.list_nodes')
def test_add_location_rules_for_pacemaker_remotes(self, list_nodes,
add_score_location_rule):
list_nodes.return_value = ['node1', 'node2', 'node3']
utils.add_location_rules_for_pacemaker_remotes([
'res1',
'res2',
'res3',
'res4',
'res5'])
expect = [
mock.call('res1', 'node1', 200),
mock.call('res1', 'node2', 0),
mock.call('res1', 'node3', 0),
mock.call('res2', 'node1', 0),
mock.call('res2', 'node2', 200),
mock.call('res2', 'node3', 0),
mock.call('res3', 'node1', 0),
mock.call('res3', 'node2', 0),
mock.call('res3', 'node3', 200),
mock.call('res4', 'node1', 200),
mock.call('res4', 'node2', 0),
mock.call('res4', 'node3', 0),
mock.call('res5', 'node1', 0),
mock.call('res5', 'node2', 200),
mock.call('res5', 'node3', 0)]
add_score_location_rule.assert_has_calls(expect)
@mock.patch('pcmk.is_resource_present') @mock.patch('pcmk.is_resource_present')
@mock.patch('pcmk.commit') @mock.patch('pcmk.commit')
def test_configure_pacemaker_remote(self, commit, is_resource_present): def test_configure_pacemaker_remote(self, commit, is_resource_present):

View File

@ -231,6 +231,8 @@ class TestPcmk(unittest.TestCase):
@mock.patch('subprocess.call') @mock.patch('subprocess.call')
def test_crm_update_resource(self, mock_call): def test_crm_update_resource(self, mock_call):
db = unitdata.kv()
db.set('res_test-IPaddr2', '')
mock_call.return_value = 0 mock_call.return_value = 0
with mock.patch.object(tempfile, "NamedTemporaryFile", with mock.patch.object(tempfile, "NamedTemporaryFile",