diff --git a/doc/source/userdoc/features.rst b/doc/source/userdoc/features.rst index bef8f25b2b..3c1fb5d630 100644 --- a/doc/source/userdoc/features.rst +++ b/doc/source/userdoc/features.rst @@ -93,19 +93,10 @@ is not reliable because all replicas may turn up on one physical machine. Anti-affinity feature provides an ability to explicitly tell Sahara to run specified processes on different compute nodes. This is especially useful for Hadoop datanode process to make HDFS replicas reliable. -.. _`enable-anti-affinity`: - -The Anti-Affinity feature requires certain scheduler filters to be enabled on Nova. -Edit your ``/etc/nova/nova.conf`` in the following way: - -.. sourcecode:: cfg - - [DEFAULT] - - ... - - scheduler_driver=nova.scheduler.filter_scheduler.FilterScheduler - scheduler_default_filters=DifferentHostFilter,SameHostFilter +Starting with the Juno release, Sahara creates server groups with the +``anti-affinity`` policy to enable the anti-affinity feature. Sahara creates +one server group per cluster and assigns all instances with affected processes +to this server group. Refer to the Nova documentation on how server groups work. This feature is supported by all plugins out of the box. diff --git a/doc/source/userdoc/installation.guide.rst b/doc/source/userdoc/installation.guide.rst index 0c7abbbe26..138d44ce21 100644 --- a/doc/source/userdoc/installation.guide.rst +++ b/doc/source/userdoc/installation.guide.rst @@ -156,9 +156,6 @@ To install into a virtual environment Notes: ------ -One of the :doc:`Sahara Features `, Anti-Affinity, requires a Nova adjustment. -See :ref:`Enabling Anti-Affinity ` for details. But that is purely optional. - Make sure that your operating system is not blocking Sahara port (default: 8386). You may need to configure iptables in CentOS and some other operating systems. diff --git a/doc/source/userdoc/upgrade.guide.rst b/doc/source/userdoc/upgrade.guide.rst index e8c6af99ef..d5cca6d6e9 100644 --- a/doc/source/userdoc/upgrade.guide.rst +++ b/doc/source/userdoc/upgrade.guide.rst @@ -50,3 +50,21 @@ in the documentation. Note, this change breaks Sahara backward compatibility for clusters created using HEAT infrastructure engine before the change. Clusters will continue to operate, but it is not recommended to perform scale operation over them. + +Anti-affinity implementation changed +++++++++++++++++++++++++++++++++++++ + +Starting with the Juno release, the anti-affinity feature is implemented using +server groups. From the user's perspective Sahara behaves much as before, but +there are internal changes: + +1) A server group object is created if the anti-affinity feature is enabled. +2) The new implementation does not allow several affected instances on the same + host even if they have no processes in common. For example, if anti-affinity + is enabled for the 'datanode' and 'tasktracker' processes, the previous + implementation allowed an instance with the 'datanode' process and another + instance with the 'tasktracker' process to share a host; the new + implementation guarantees that such instances end up on different hosts. + +Note that the new implementation applies to new clusters only. The old +implementation is still used when a cluster created in Icehouse is scaled.
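Not part of the patch — a minimal python-novaclient sketch of the flow the documentation above describes and that ``DirectEngine`` implements below (one server group per cluster, ``group`` scheduler hint per affected instance, group cleanup on shutdown). The credentials, cluster and instance names, and image/flavor IDs are assumptions for illustration only; the calls themselves (``server_groups.create``, ``servers.create`` with ``scheduler_hints``, ``server_groups.findall``/``delete``) are the ones used in ``direct_engine.py``.

.. sourcecode:: python

    # Sketch of the server-group flow described above; credentials, names and
    # IDs are illustrative assumptions, not values from the patch.
    from novaclient import client as nova_client

    nova = nova_client.Client('2', 'admin', 'secret', 'demo-tenant',
                              auth_url='http://127.0.0.1:5000/v2.0')

    # One server group per cluster; generate_aa_group_name() produces
    # "<cluster-name>-aa-group".
    group = nova.server_groups.create(name='demo-cluster-aa-group',
                                      policies=['anti-affinity'])

    # Instances that run a process listed in cluster.anti_affinity are booted
    # with the 'group' scheduler hint, so Nova keeps them on different hosts.
    nova.servers.create('demo-cluster-worker-001', 'image-id', 'flavor-id',
                        scheduler_hints={'group': group.id})

    # On cluster shutdown the group is looked up by name and removed.
    for sg in nova.server_groups.findall(name='demo-cluster-aa-group'):
        nova.server_groups.delete(sg.id)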
diff --git a/sahara/resources/aa_server_group.heat b/sahara/resources/aa_server_group.heat new file mode 100644 index 0000000000..b0ab9a9a5a --- /dev/null +++ b/sahara/resources/aa_server_group.heat @@ -0,0 +1,7 @@ + "%(server_group_name)s" : { + "Type" : "OS::Nova::ServerGroup", + "Properties" : { + "name" : "%(server_group_name)s", + "policies": ["anti-affinity"] + } + } \ No newline at end of file diff --git a/sahara/service/direct_engine.py b/sahara/service/direct_engine.py index f992ef5e42..045cab1561 100644 --- a/sahara/service/direct_engine.py +++ b/sahara/service/direct_engine.py @@ -134,6 +134,7 @@ class DirectEngine(e.Engine): context.ctx(), cluster, {'rollback_info': rollback_info}) return cluster + # TODO(alazarev) remove when we fully switch to server groups def _generate_anti_affinity_groups(self, cluster): aa_groups = {} @@ -153,13 +154,52 @@ class DirectEngine(e.Engine): cluster = self._create_auto_security_groups(cluster) - aa_groups = {} + aa_group = None + if cluster.anti_affinity: + aa_group = self._create_aa_server_group(cluster) for node_group in cluster.node_groups: count = node_group.count conductor.node_group_update(ctx, node_group, {'count': 0}) for idx in six.moves.xrange(1, count + 1): - self._run_instance(cluster, node_group, idx, aa_groups) + self._run_instance(cluster, node_group, idx, aa_group=aa_group) + + def _create_aa_server_group(self, cluster): + server_group_name = g.generate_aa_group_name(cluster.name) + client = nova.client().server_groups + + if client.findall(name=server_group_name): + raise exc.InvalidDataException( + _("Server group with name %s already exists") + % server_group_name) + + server_group = client.create(name=server_group_name, + policies=['anti-affinity']) + return server_group.id + + def _delete_aa_server_group(self, cluster): + if cluster.anti_affinity: + server_group_name = g.generate_aa_group_name(cluster.name) + client = nova.client().server_groups + + server_groups = client.findall(name=server_group_name) + if len(server_groups) == 1: + client.delete(server_groups[0].id) + + def _find_aa_server_group(self, cluster): + server_group_name = g.generate_aa_group_name(cluster.name) + server_groups = nova.client().server_groups.findall( + name=server_group_name) + + if len(server_groups) > 1: + raise exc.IncorrectStateError( + _("Several server groups with name %s found") + % server_group_name) + + if len(server_groups) == 1: + return server_groups[0].id + + return None def _create_auto_security_groups(self, cluster): ctx = context.ctx() @@ -171,7 +211,14 @@ class DirectEngine(e.Engine): def _scale_cluster_instances(self, cluster, node_group_id_map): ctx = context.ctx() - aa_groups = self._generate_anti_affinity_groups(cluster) + + aa_group = None + old_aa_groups = None + if cluster.anti_affinity: + aa_group = self._find_aa_server_group(cluster) + if not aa_group: + old_aa_groups = self._generate_anti_affinity_groups(cluster) + instances_to_delete = [] node_groups_to_enlarge = [] node_groups_to_delete = [] @@ -207,8 +254,9 @@ class DirectEngine(e.Engine): for node_group in node_groups_to_enlarge: count = node_group_id_map[node_group.id] for idx in six.moves.xrange(node_group.count + 1, count + 1): - instance_id = self._run_instance(cluster, node_group, idx, - aa_groups) + instance_id = self._run_instance( + cluster, node_group, idx, + aa_group=aa_group, old_aa_groups=old_aa_groups) instances_to_add.append(instance_id) return instances_to_add @@ -220,21 +268,26 @@ class DirectEngine(e.Engine): return None - def _run_instance(self,
cluster, node_group, idx, aa_groups): + def _run_instance(self, cluster, node_group, idx, aa_group=None, + old_aa_groups=None): """Create instance using nova client and persist them into DB.""" ctx = context.ctx() name = g.generate_instance_name(cluster.name, node_group.name, idx) userdata = self._generate_user_data_script(node_group, name) - # aa_groups: node process -> instance ids - aa_ids = [] - for node_process in node_group.node_processes: - aa_ids += aa_groups.get(node_process) or [] + if old_aa_groups: + # aa_groups: node process -> instance ids + aa_ids = [] + for node_process in node_group.node_processes: + aa_ids += old_aa_groups.get(node_process) or [] - # create instances only at hosts w/ no instances - # w/ aa-enabled processes - hints = {'different_host': sorted(set(aa_ids))} if aa_ids else None + # create instances only at hosts w/ no instances + # w/ aa-enabled processes + hints = {'different_host': sorted(set(aa_ids))} if aa_ids else None + else: + hints = {'group': aa_group} if ( + aa_group and self._need_aa_server_group(node_group)) else None if CONF.use_neutron: net_id = cluster.neutron_management_network @@ -255,12 +308,14 @@ class DirectEngine(e.Engine): instance_id = conductor.instance_add(ctx, node_group, {"instance_id": nova_instance.id, "instance_name": name}) - # save instance id to aa_groups to support aa feature - for node_process in node_group.node_processes: - if node_process in cluster.anti_affinity: - aa_group_ids = aa_groups.get(node_process, []) - aa_group_ids.append(nova_instance.id) - aa_groups[node_process] = aa_group_ids + + if old_aa_groups: + # save instance id to aa_groups to support aa feature + for node_process in node_group.node_processes: + if node_process in cluster.anti_affinity: + aa_group_ids = old_aa_groups.get(node_process, []) + aa_group_ids.append(nova_instance.id) + old_aa_groups[node_process] = aa_group_ids return instance_id @@ -288,6 +343,12 @@ class DirectEngine(e.Engine): {"security_groups": security_groups}) return security_groups + def _need_aa_server_group(self, node_group): + for node_process in node_group.node_processes: + if node_process in node_group.cluster.anti_affinity: + return True + return False + def _assign_floating_ips(self, instances): for instance in instances: node_group = instance.node_group @@ -415,3 +476,4 @@ class DirectEngine(e.Engine): """Shutdown specified cluster and all related resources.""" self._shutdown_instances(cluster) self._clean_job_executions(cluster) + self._delete_aa_server_group(cluster) diff --git a/sahara/tests/unit/resources/test_serialize_resources_aa.heat b/sahara/tests/unit/resources/test_serialize_resources_aa.heat index 942ca7794c..df34957c03 100644 --- a/sahara/tests/unit/resources/test_serialize_resources_aa.heat +++ b/sahara/tests/unit/resources/test_serialize_resources_aa.heat @@ -3,6 +3,13 @@ "Description" : "Hadoop Cluster by Sahara", "Resources" : { + "cluster-aa-group" : { + "Type" : "OS::Nova::ServerGroup", + "Properties" : { + "name" : "cluster-aa-group", + "policies": ["anti-affinity"] + } + }, "cluster-worker-001-port" : { "Type" : "OS::Neutron::Port", "Properties" : { @@ -25,6 +32,7 @@ "admin_user": "root", "networks" : [{ "port" : { "Ref" : "cluster-worker-001-port" }}], "key_name" : "user_key", + "scheduler_hints" : {"group": {"Ref": "cluster-aa-group"}}, "user_data": { "Fn::Join" : ["\n", ["line2", "line3"]] } @@ -52,7 +60,7 @@ "admin_user": "root", "networks" : [{ "port" : { "Ref" : "cluster-worker-002-port" }}], "key_name" : "user_key", - "scheduler_hints" : 
{"different_host": [{"Ref": "cluster-worker-001"}]}, + "scheduler_hints" : {"group": {"Ref": "cluster-aa-group"}}, "user_data": { "Fn::Join" : ["\n", ["line2", "line3"]] } diff --git a/sahara/tests/unit/service/test_instances.py b/sahara/tests/unit/service/test_instances.py index d769344d9b..2a92b69a04 100644 --- a/sahara/tests/unit/service/test_instances.py +++ b/sahara/tests/unit/service/test_instances.py @@ -15,7 +15,6 @@ import mock from novaclient import exceptions as nova_exceptions -import six from sahara import conductor as cond from sahara import context @@ -42,6 +41,7 @@ class AbstractInstanceTest(base.SaharaWithDbTestCase): self.novaclient_patcher = mock.patch( 'sahara.utils.openstack.nova.client') self.nova = _create_nova_mock(self.novaclient_patcher.start()) + self.nova.server_groups.findall.return_value = [] self.get_userdata_patcher = mock.patch( 'sahara.utils.remote.get_userdata_template') @@ -89,24 +89,25 @@ class TestClusterRollBack(AbstractInstanceTest): class NodePlacementTest(AbstractInstanceTest): def test_one_node_groups_and_one_affinity_group(self): + self.nova.server_groups.create.return_value = mock.Mock(id='123') + node_groups = [_make_ng_dict('test_group', 'test_flavor', ['data node'], 2)] cluster = _create_cluster_mock(node_groups, ["data node"]) self.engine._create_instances(cluster) userdata = _generate_user_data_script(cluster) - self.nova.servers.create.assert_has_calls( [mock.call("test_cluster-test_group-001", "initial", "test_flavor", - scheduler_hints=None, + scheduler_hints={'group': "123"}, userdata=userdata, key_name='user_keypair', security_groups=None), mock.call("test_cluster-test_group-002", "initial", "test_flavor", - scheduler_hints={'different_host': ["1"]}, + scheduler_hints={'group': "123"}, userdata=userdata, key_name='user_keypair', security_groups=None)], @@ -117,10 +118,13 @@ class NodePlacementTest(AbstractInstanceTest): self.assertEqual(len(cluster_obj.node_groups[0].instances), 2) def test_one_node_groups_and_no_affinity_group(self): + self.nova.server_groups.create.return_value = mock.Mock(id='123') + node_groups = [_make_ng_dict('test_group', 'test_flavor', ['data node', 'task tracker'], 2)] cluster = _create_cluster_mock(node_groups, []) + self.engine._create_instances(cluster) userdata = _generate_user_data_script(cluster) @@ -146,6 +150,8 @@ class NodePlacementTest(AbstractInstanceTest): self.assertEqual(len(cluster_obj.node_groups[0].instances), 2) def test_two_node_groups_and_one_affinity_group(self): + self.nova.server_groups.create.return_value = mock.Mock(id='123') + node_groups = [_make_ng_dict("test_group_1", "test_flavor", ["data node", "test tracker"], 2), _make_ng_dict("test_group_2", "test_flavor", @@ -155,61 +161,25 @@ class NodePlacementTest(AbstractInstanceTest): self.engine._create_instances(cluster) userdata = _generate_user_data_script(cluster) - def _find_created_at(idx): - """Find the #N instance creation call. - - To determine which instance was created first, we should check - scheduler hints For example we should find call with scheduler - hint different_hosts = [1, 2] and it's the third call of instance - create. 
- """ - different_hosts = [] - for instance_id in six.moves.xrange(1, idx): - different_hosts.append(str(instance_id)) - scheduler_hints = ({'different_host': different_hosts} - if different_hosts else None) - - for call in self.nova.servers.create.mock_calls: - if call[2]['scheduler_hints'] == scheduler_hints: - return call[1][0] - - self.fail("Couldn't find call with scheduler_hints='%s'" - % scheduler_hints) - - # find instance names in instance create calls - instance_names = [] - for idx in six.moves.xrange(1, 4): - instance_name = _find_created_at(idx) - if instance_name in instance_names: - self.fail("Create instance was called twice with the same " - "instance name='%s'" % instance_name) - instance_names.append(instance_name) - - self.assertEqual(3, len(instance_names)) - self.assertEqual(set(['test_cluster-test_group_1-001', - 'test_cluster-test_group_1-002', - 'test_cluster-test_group_2-001']), - set(instance_names)) - self.nova.servers.create.assert_has_calls( - [mock.call(instance_names[0], + [mock.call('test_cluster-test_group_1-001', "initial", "test_flavor", - scheduler_hints=None, + scheduler_hints={'group': "123"}, userdata=userdata, key_name='user_keypair', security_groups=None), - mock.call(instance_names[1], + mock.call('test_cluster-test_group_1-002', "initial", "test_flavor", - scheduler_hints={'different_host': ["1"]}, + scheduler_hints={'group': "123"}, userdata=userdata, key_name='user_keypair', security_groups=None), - mock.call(instance_names[2], + mock.call('test_cluster-test_group_2-001', "initial", "test_flavor", - scheduler_hints={'different_host': ["1", "2"]}, + scheduler_hints={'group': "123"}, userdata=userdata, key_name='user_keypair', security_groups=None)], diff --git a/sahara/tests/unit/utils/test_heat.py b/sahara/tests/unit/utils/test_heat.py index b3b5a87c8b..d0dd0f7322 100644 --- a/sahara/tests/unit/utils/test_heat.py +++ b/sahara/tests/unit/utils/test_heat.py @@ -45,23 +45,6 @@ class TestHeat(testtools.TestCase): userdata = "line1\nline2" self.assertEqual(h._prepare_userdata(userdata), '"line1",\n"line2"') - def test_get_anti_affinity_scheduler_hints(self): - inst_names = ['i1', 'i2'] - expected = ('"scheduler_hints" : {"different_host": ' - '[{"Ref": "i1"}, {"Ref": "i2"}]},') - actual = h._get_anti_affinity_scheduler_hints(inst_names) - self.assertEqual(expected, actual) - - inst_names = ['i1', 'i1'] - expected = '"scheduler_hints" : {"different_host": [{"Ref": "i1"}]},' - actual = h._get_anti_affinity_scheduler_hints(inst_names) - self.assertEqual(expected, actual) - - inst_names = [] - expected = '' - actual = h._get_anti_affinity_scheduler_hints(inst_names) - self.assertEqual(expected, actual) - class TestClusterTemplate(base.SaharaWithDbTestCase): """Checks valid structure of Resources section in generated Heat templates. 
@@ -86,13 +69,13 @@ class TestClusterTemplate(base.SaharaWithDbTestCase): image_username='root') return ng1, ng2 - def _make_cluster(self, mng_network, ng1, ng2): + def _make_cluster(self, mng_network, ng1, ng2, anti_affinity=[]): return tu.create_cluster("cluster", "tenant1", "general", "1.2.1", [ng1, ng2], user_keypair_id='user_key', neutron_management_network=mng_network, - default_image_id='1', anti_affinity=[], - image_id=None) + default_image_id='1', image_id=None, + anti_affinity=anti_affinity) def _make_heat_template(self, cluster, ng1, ng2): heat_template = h.ClusterTemplate(cluster) @@ -102,6 +85,24 @@ class TestClusterTemplate(base.SaharaWithDbTestCase): get_ud_generator('line2\nline3')) return heat_template + def test_get_anti_affinity_scheduler_hints(self): + ng1, ng2 = self._make_node_groups('floating') + cluster = self._make_cluster('private_net', ng1, ng2, + anti_affinity=["datanode"]) + heat_template = self._make_heat_template(cluster, ng1, ng2) + + ng1 = [ng for ng in cluster.node_groups if ng.name == "master"][0] + ng2 = [ng for ng in cluster.node_groups if ng.name == "worker"][0] + + expected = ('"scheduler_hints" : ' + '{"group": {"Ref": "cluster-aa-group"}},') + actual = heat_template._get_anti_affinity_scheduler_hints(ng2) + self.assertEqual(expected, actual) + + expected = '' + actual = heat_template._get_anti_affinity_scheduler_hints(ng1) + self.assertEqual(expected, actual) + def test_load_template_use_neutron(self): """Test for Heat cluster template with Neutron enabled. diff --git a/sahara/utils/general.py b/sahara/utils/general.py index 513f8e785f..e5c5d883f5 100644 --- a/sahara/utils/general.py +++ b/sahara/utils/general.py @@ -123,3 +123,7 @@ def generate_instance_name(cluster_name, node_group_name, index): def generate_auto_security_group_name(cluster_name, node_group_name): return ("%s-%s" % (cluster_name, node_group_name)).lower() + + +def generate_aa_group_name(cluster_name): + return ("%s-aa-group" % cluster_name).lower() diff --git a/sahara/utils/openstack/heat.py b/sahara/utils/openstack/heat.py index 7a0f351a2d..8cd5040fd4 100644 --- a/sahara/utils/openstack/heat.py +++ b/sahara/utils/openstack/heat.py @@ -61,6 +61,10 @@ def _get_inst_name(cluster_name, ng_name, index): return g.generate_instance_name(cluster_name, ng_name, index + 1) +def _get_aa_group_name(cluster_name): + return g.generate_aa_group_name(cluster_name) + + def _get_port_name(inst_name): return '%s-port' % inst_name @@ -81,16 +85,6 @@ def _get_volume_attach_name(inst_name, volume_idx): return '%s-volume-attachment-%i' % (inst_name, volume_idx) -def _get_anti_affinity_scheduler_hints(instances_names): - if not instances_names: - return '' - - aa_list = [] - for instances_name in sorted(set(instances_names)): - aa_list.append({"Ref": instances_name}) - return '"scheduler_hints" : %s,' % json.dumps({"different_host": aa_list}) - - def _load_template(template_name, fields): template_file = f.get_file_text('resources/%s' % template_name) return template_file.rstrip() % fields @@ -121,6 +115,7 @@ class ClusterTemplate(object): def instantiate(self, update_existing, disable_rollback=True): main_tmpl = _load_template('main.heat', {'resources': self._serialize_resources()}) + heat = client() kwargs = { @@ -140,15 +135,31 @@ class ClusterTemplate(object): return ClusterStack(self, get_stack(self.cluster.name)) + def _need_aa_server_group(self, node_group): + for node_process in node_group.node_processes: + if node_process in self.cluster.anti_affinity: + return True + return False + + def 
_get_anti_affinity_scheduler_hints(self, node_group): + if not self._need_aa_server_group(node_group): + return '' + + return ('"scheduler_hints" : %s,' % + json.dumps({"group": {"Ref": _get_aa_group_name( + self.cluster.name)}})) + def _serialize_resources(self): resources = [] - aa_groups = {} + + if self.cluster.anti_affinity: + resources.extend(self._serialize_aa_server_group()) for ng in self.cluster.node_groups: if ng.auto_security_group: resources.extend(self._serialize_auto_security_group(ng)) for idx in range(0, self.node_groups_extra[ng.id]['node_count']): - resources.extend(self._serialize_instance(ng, idx, aa_groups)) + resources.extend(self._serialize_instance(ng, idx)) return ',\n'.join(resources) @@ -174,7 +185,7 @@ class ClusterTemplate(object): return json.dumps(rules) - def _serialize_instance(self, ng, idx, aa_groups): + def _serialize_instance(self, ng, idx): inst_name = _get_inst_name(self.cluster.name, ng.name, idx) nets = '' @@ -200,10 +211,6 @@ class ClusterTemplate(object): '"security_groups": %s,' % json.dumps( self._get_security_groups(ng))) - aa_names = [] - for node_process in ng.node_processes: - aa_names += aa_groups.get(node_process) or [] - # Check if cluster contains user key-pair and include it to template. key_name = '' if self.cluster.user_keypair_id: @@ -219,16 +226,10 @@ class ClusterTemplate(object): 'network_interfaces': nets, 'key_name': key_name, 'userdata': _prepare_userdata(userdata), - 'scheduler_hints': _get_anti_affinity_scheduler_hints( - aa_names), + 'scheduler_hints': + self._get_anti_affinity_scheduler_hints(ng), 'security_groups': security_groups} - for node_process in ng.node_processes: - if node_process in self.cluster.anti_affinity: - aa_group_names = aa_groups.get(node_process, []) - aa_group_names.append(inst_name) - aa_groups[node_process] = aa_group_names - yield _load_template('instance.heat', fields) for idx in range(0, ng.volumes_per_node): @@ -277,6 +278,11 @@ class ClusterTemplate(object): [{"Ref": g.generate_auto_security_group_name( node_group.cluster.name, node_group.name)}]) + def _serialize_aa_server_group(self): + fields = {'server_group_name': _get_aa_group_name(self.cluster.name)} + + yield _load_template('aa_server_group.heat', fields) + class ClusterStack(object): def __init__(self, tmpl, heat_stack):
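Not part of the patch — a small stand-alone sketch of what the Heat path above renders for a hypothetical cluster named ``demo`` with anti-affinity enabled: the per-cluster ``OS::Nova::ServerGroup`` resource built from ``aa_server_group.heat`` by ``_serialize_aa_server_group()``, and the ``scheduler_hints`` fragment returned by ``_get_anti_affinity_scheduler_hints()`` for node groups that run an affected process. The cluster name and the exact template indentation are assumptions.

.. sourcecode:: python

    import json

    # Body of sahara/resources/aa_server_group.heat (indentation approximate).
    AA_SERVER_GROUP = '''    "%(server_group_name)s" : {
      "Type" : "OS::Nova::ServerGroup",
      "Properties" : {
        "name" : "%(server_group_name)s",
        "policies": ["anti-affinity"]
      }
    }'''

    cluster_name = 'demo'                                # assumed cluster name
    group_name = ('%s-aa-group' % cluster_name).lower()  # generate_aa_group_name()

    # Serialized once per cluster when cluster.anti_affinity is non-empty.
    print(AA_SERVER_GROUP % {'server_group_name': group_name})

    # Added to every instance whose node group runs an affected process,
    # i.e. when _need_aa_server_group(node_group) is true.
    print('"scheduler_hints" : %s,'
          % json.dumps({"group": {"Ref": group_name}}))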