Added ability to create security groups automatically

Added the ability to ask Sahara to create a security group for a node group. The
feature is only implemented in the vanilla plugin for Hadoop 1.2.1 so far.

Partially implements: blueprint cluster-secgroups

Change-Id: I21d0196396bb966fe3d88f5445e98aebe90ad94b
Author: Andrew Lazarev  2014-07-24 12:09:11 -07:00
Committed by: Sergey Reshetnyak
Parent: 7fac58c51e
Commit: bf303ba683
23 changed files with 300 additions and 12 deletions
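For context, a minimal sketch of how the new flag is requested when creating a node group template; "auto_security_group" is the field added by this change (see the JSON schema hunk below), while every other value is purely illustrative:

# Hypothetical node group template payload, e.g. sent to Sahara's
# node-group-templates API. Only "auto_security_group" is new here.
node_group_template = {
    "name": "worker",
    "plugin_name": "vanilla",
    "hadoop_version": "1.2.1",
    "flavor_id": "2",
    "node_processes": ["datanode", "tasktracker"],
    # Sahara creates a dedicated security group for this node group and opens
    # SSH plus whatever ports the plugin reports via get_open_ports().
    "auto_security_group": True,
}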


@ -39,6 +39,7 @@ NODE_GROUP_DEFAULTS = {
"volume_mount_prefix": "/volumes/disk",
"floating_ip_pool": None,
"security_groups": None,
"auto_security_group": False,
}
INSTANCE_DEFAULTS = {


@ -75,8 +75,13 @@ class NodeGroup(object):
volumes_size
volume_mount_prefix
floating_ip_pool - Floating IP Pool name used to assign Floating IPs to
instances in this Node Group
security_groups - List of security groups for instances in this Node Group
auto_security_group - indicates whether Sahara should create an additional
security group for the Node Group
open_ports - List of ports that will be opened if auto_security_group is
True
count
instances - list of Instance objects
node_group_template_id
@ -170,6 +175,7 @@ class NodeGroupTemplate(object):
volume_mount_prefix
floating_ip_pool
security_groups
auto_security_group
"""


@ -187,7 +187,7 @@ class NodeGroupResource(Resource, objects.NodeGroup):
}
_filter_fields = ['id', 'tenant_id', 'cluster_id', 'cluster_template_id',
'image_username']
'image_username', 'open_ports']
class ClusterTemplateResource(Resource, objects.ClusterTemplate):


@ -0,0 +1,49 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""auto_security_groups
Revision ID: 010
Revises: 009
Create Date: 2014-07-21 14:31:49.685689
"""
# revision identifiers, used by Alembic.
revision = '010'
down_revision = '009'
from alembic import op
import sqlalchemy as sa
from sahara.db.sqlalchemy import types as st
def upgrade():
op.add_column('node_group_templates',
sa.Column('auto_security_group', sa.Boolean()))
op.add_column('node_groups',
sa.Column('auto_security_group', sa.Boolean()))
op.add_column('templates_relations',
sa.Column('auto_security_group', sa.Boolean()))
op.add_column('node_groups',
sa.Column('open_ports', st.JsonEncoded()))
def downgrade():
op.drop_column('node_group_templates', 'auto_security_group')
op.drop_column('node_groups', 'auto_security_group')
op.drop_column('templates_relations', 'auto_security_group')
op.drop_column('node_groups', 'open_ports')


@ -111,6 +111,8 @@ class NodeGroup(mb.SaharaBase):
backref="node_groups", lazy='joined')
floating_ip_pool = sa.Column(sa.String(36))
security_groups = sa.Column(st.JsonListType())
auto_security_group = sa.Column(sa.Boolean())
open_ports = sa.Column(st.JsonListType())
def to_dict(self):
d = super(NodeGroup, self).to_dict()
@ -193,6 +195,7 @@ class NodeGroupTemplate(mb.SaharaBase):
volume_mount_prefix = sa.Column(sa.String(80))
floating_ip_pool = sa.Column(sa.String(36))
security_groups = sa.Column(st.JsonListType())
auto_security_group = sa.Column(sa.Boolean())
class TemplatesRelation(mb.SaharaBase):
@ -224,6 +227,7 @@ class TemplatesRelation(mb.SaharaBase):
lazy='joined')
floating_ip_pool = sa.Column(sa.String(36))
security_groups = sa.Column(st.JsonListType())
auto_security_group = sa.Column(sa.Boolean())
# EDP objects: DataSource, Job, Job Execution, JobBinary


@ -81,6 +81,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_edp_engine(self, cluster, job_type):
pass
@plugins_base.required_with_default
def get_open_ports(self, node_group):
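"""Return the ports that should be opened on this node group's instances when auto_security_group is enabled."""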
return []
@plugins_base.optional
def get_resource_manager_uri(self, cluster):
pass


@ -64,3 +64,7 @@ class AbstractVersionHandler():
@abc.abstractmethod
def get_edp_engine(self, cluster, job_type):
return
@abc.abstractmethod
def get_open_ports(self, node_group):
return


@ -92,3 +92,7 @@ class VanillaProvider(p.ProvisioningPluginBase):
def get_edp_engine(self, cluster, job_type):
return self._get_version_handler(
cluster.hadoop_version).get_edp_engine(cluster, job_type)
def get_open_ports(self, node_group):
return self._get_version_handler(
node_group.cluster.hadoop_version).get_open_ports(node_group)


@ -492,3 +492,43 @@ class VersionHandler(avm.AbstractVersionHandler):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
return None
def get_open_ports(self, node_group):
cluster = node_group.cluster
ports = []
if "namenode" in node_group.node_processes:
ports.append(c_helper.get_port_from_config(
'HDFS', 'dfs.http.address', cluster))
ports.append(8020)
if "datanode" in node_group.node_processes:
ports.append(c_helper.get_port_from_config(
'HDFS', 'dfs.datanode.http.address', cluster))
ports.append(c_helper.get_port_from_config(
'HDFS', 'dfs.datanode.address', cluster))
ports.append(c_helper.get_port_from_config(
'HDFS', 'dfs.datanode.ipc.address', cluster))
if "jobtracker" in node_group.node_processes:
ports.append(c_helper.get_port_from_config(
'MapReduce', 'mapred.job.tracker.http.address', cluster))
ports.append(8021)
if "tasktracker" in node_group.node_processes:
ports.append(c_helper.get_port_from_config(
'MapReduce', 'mapred.task.tracker.http.address', cluster))
if "secondarynamenode" in node_group.node_processes:
ports.append(c_helper.get_port_from_config(
'HDFS', 'dfs.secondary.http.address', cluster))
if "oozie" in node_group.node_processes:
ports.append(11000)
if "hive" in node_group.node_processes:
ports.append(9999)
ports.append(10000)
return ports
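# Illustration (assuming the stock Hadoop 1.2.1 defaults, e.g.
# dfs.http.address on 50070 and mapred.job.tracker.http.address on 50030,
# are not overridden): a node group running ["namenode", "jobtracker"]
# yields [50070, 8020, 50030, 8021]; SSH is added separately by the engine.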


@ -147,3 +147,6 @@ class VersionHandler(avm.AbstractVersionHandler):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
return None
def get_open_ports(self, node_group):
return []


@ -145,3 +145,6 @@ class VersionHandler(avm.AbstractVersionHandler):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
return None
def get_open_ports(self, node_group):
return []


@ -0,0 +1,8 @@
"%(security_group_name)s": {
"Type": "OS::Neutron::SecurityGroup",
"Properties": {
"description": "%(security_group_description)s",
"name": "%(security_group_name)s",
"rules": %(rules)s
}
}


@ -35,6 +35,8 @@ conductor = c.API
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
SSH_PORT = 22
class DirectEngine(e.Engine):
def create_cluster(self, cluster):
@ -149,6 +151,8 @@ class DirectEngine(e.Engine):
def _create_instances(self, cluster):
ctx = context.ctx()
cluster = self._create_auto_security_groups(cluster)
aa_groups = {}
for node_group in cluster.node_groups:
@ -157,11 +161,20 @@ class DirectEngine(e.Engine):
for idx in six.moves.xrange(1, count + 1):
self._run_instance(cluster, node_group, idx, aa_groups)
def _create_auto_security_groups(self, cluster):
ctx = context.ctx()
for node_group in cluster.node_groups:
if node_group.auto_security_group:
self._create_auto_security_group(node_group)
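# Re-read the cluster so it reflects the security groups just added to its node groups.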
return conductor.cluster_get(ctx, cluster)
def _scale_cluster_instances(self, cluster, node_group_id_map):
ctx = context.ctx()
aa_groups = self._generate_anti_affinity_groups(cluster)
instances_to_delete = []
node_groups_to_enlarge = []
node_groups_to_delete = []
for node_group in cluster.node_groups:
new_count = node_group_id_map[node_group.id]
@ -169,8 +182,12 @@ class DirectEngine(e.Engine):
if new_count < node_group.count:
instances_to_delete += node_group.instances[new_count:
node_group.count]
if new_count == 0:
node_groups_to_delete.append(node_group)
elif new_count > node_group.count:
node_groups_to_enlarge.append(node_group)
if node_group.count == 0 and node_group.auto_security_group:
self._create_auto_security_group(node_group)
if instances_to_delete:
cluster = g.change_cluster_status(cluster, "Deleting Instances")
@ -178,6 +195,10 @@ class DirectEngine(e.Engine):
for instance in instances_to_delete:
self._shutdown_instance(instance)
self._await_deleted(cluster, instances_to_delete)
for ng in node_groups_to_delete:
self._delete_auto_security_group(ng)
cluster = conductor.cluster_get(ctx, cluster)
instances_to_add = []
@ -243,6 +264,30 @@ class DirectEngine(e.Engine):
return instance_id
def _create_auto_security_group(self, node_group):
cluster = node_group.cluster
name = g.generate_auto_security_group_name(
cluster.name, node_group.name)
nova_client = nova.client()
security_group = nova_client.security_groups.create(
name, "Auto security group created by Sahara for Node Group '%s' "
"of cluster '%s'." % (node_group.name, cluster.name))
# The ssh remote needs the SSH port open; agents are not implemented yet
nova_client.security_group_rules.create(
security_group.id, 'tcp', SSH_PORT, SSH_PORT, "0.0.0.0/0")
# enable ports returned by plugin
for port in node_group.open_ports:
nova_client.security_group_rules.create(
security_group.id, 'tcp', port, port, "0.0.0.0/0")
security_groups = list(node_group.security_groups or [])
security_groups.append(security_group.id)
conductor.node_group_update(context.ctx(), node_group,
{"security_groups": security_groups})
return security_groups
def _assign_floating_ips(self, instances):
for instance in instances:
node_group = instance.node_group
@ -257,7 +302,7 @@ class DirectEngine(e.Engine):
active_ids = set()
while len(active_ids) != len(instances):
if not g.check_cluster_exists(instances[0].node_group.cluster):
if not g.check_cluster_exists(cluster):
return
for instance in instances:
if instance.id not in active_ids:
@ -268,14 +313,39 @@ class DirectEngine(e.Engine):
LOG.info(_LI("Cluster '%s': all instances are active"), cluster.id)
def _check_if_active(self, instance):
def _await_deleted(self, cluster, instances):
"""Await all instances are deleted."""
if not instances:
return
deleted_ids = set()
while len(deleted_ids) != len(instances):
if not g.check_cluster_exists(cluster):
return
for instance in instances:
if instance.id not in deleted_ids:
if self._check_if_deleted(instance):
LOG.debug("Instance '%s' is deleted" %
instance.instance_name)
deleted_ids.add(instance.id)
context.sleep(1)
def _check_if_active(self, instance):
server = nova.get_instance_info(instance)
if server.status == 'ERROR':
raise exc.SystemError(_("Node %s has error status") % server.name)
return server.status == 'ACTIVE'
def _check_if_deleted(self, instance):
try:
nova.get_instance_info(instance)
except nova_exceptions.NotFound:
return True
return False
def _rollback_cluster_creation(self, cluster, ex):
"""Shutdown all instances and update cluster status."""
LOG.info(_LI("Cluster '%(name)s' creation rollback "
@ -301,6 +371,20 @@ class DirectEngine(e.Engine):
for instance in node_group.instances:
self._shutdown_instance(instance)
self._await_deleted(cluster, node_group.instances)
self._delete_auto_security_group(node_group)
def _delete_auto_security_group(self, node_group):
if not node_group.auto_security_group:
return
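# The auto-created group is appended last to security_groups by _create_auto_security_group.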
name = node_group.security_groups[-1]
try:
nova.client().security_groups.delete(name)
except Exception:
LOG.exception("Failed to delete security group %s", name)
def _shutdown_instance(self, instance):
ctx = context.ctx()


@ -154,9 +154,12 @@ def _prepare_provisioning(cluster_id):
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
for nodegroup in cluster.node_groups:
conductor.node_group_update(
ctx, nodegroup,
{"image_username": INFRA.get_node_group_image_username(nodegroup)})
update_dict = {}
update_dict["image_username"] = INFRA.get_node_group_image_username(
nodegroup)
if nodegroup.auto_security_group:
update_dict["open_ports"] = plugin.get_open_ports(nodegroup)
conductor.node_group_update(ctx, nodegroup, update_dict)
cluster = conductor.cluster_get(ctx, cluster_id)


@ -95,6 +95,7 @@ def check_all_configurations(data):
if data.get('node_groups'):
check_duplicates_node_groups_names(data['node_groups'])
for ng in data['node_groups']:
check_auto_security_group(data['name'], ng)
check_node_group_basic_fields(data['plugin_name'],
data['hadoop_version'],
ng, pl_confs)
@ -188,6 +189,16 @@ def check_duplicates_node_groups_names(node_groups):
_("Duplicates in node group names are detected"))
def check_auto_security_group(cluster_name, nodegroup):
if nodegroup.get('auto_security_group'):
name = g.generate_auto_security_group_name(
cluster_name, nodegroup['name'])
if name in [security_group.name for security_group in
nova.client().security_groups.list()]:
raise ex.NameAlreadyExistsException(
_("Security group with name '%s' already exists") % name)
# Cluster creation related checks
def check_cluster_unique_name(name):
@ -261,6 +272,7 @@ def check_node_groups_in_cluster_templates(cluster_name, plugin_name,
n_groups = c_t.to_wrapped_dict()['cluster_template']['node_groups']
check_network_config(n_groups)
for node_group in n_groups:
check_auto_security_group(cluster_name, node_group)
check_node_group_basic_fields(plugin_name, hadoop_version, node_group)
check_cluster_hostnames_lengths(cluster_name, n_groups)
@ -330,6 +342,7 @@ def check_add_node_groups(cluster, add_node_groups):
check_node_group_basic_fields(cluster.plugin_name,
cluster.hadoop_version, ng, pl_confs)
check_auto_security_group(cluster.name, ng)
# Cinder


@ -75,6 +75,9 @@ NODE_GROUP_TEMPLATE_SCHEMA = {
"type": "string",
},
},
"auto_security_group": {
"type": "boolean"
},
},
"additionalProperties": False,
"required": [


@ -116,6 +116,8 @@ class ClusterTest(test_base.ConductorManagerTestCase):
ng.pop("volumes_per_node")
ng.pop("floating_ip_pool")
ng.pop("image_username")
ng.pop("open_ports")
ng.pop("auto_security_group")
ng.pop("tenant_id")
self.assertEqual(SAMPLE_CLUSTER["node_groups"],


@ -186,6 +186,7 @@ class ClusterTemplates(test_base.ConductorManagerTestCase):
ng.pop("volume_mount_prefix")
ng.pop("volumes_size")
ng.pop("volumes_per_node")
ng.pop("auto_security_group")
self.assertEqual(SAMPLE_CLT["node_groups"],
clt_db_obj["node_groups"])


@ -389,3 +389,11 @@ class TestMigrations(base.BaseWalkMigrationTestCase, base.CommonTestsMixIn):
def _check_009(self, engine, date):
self.assertColumnExists(engine, 'clusters', 'rollback_info')
def _check_010(self, engine, date):
self.assertColumnExists(engine, 'node_group_templates',
'auto_security_group')
self.assertColumnExists(engine, 'node_groups', 'auto_security_group')
self.assertColumnExists(engine, 'templates_relations',
'auto_security_group')
self.assertColumnExists(engine, 'node_groups', 'open_ports')


@ -57,9 +57,11 @@ class AbstractInstanceTest(base.SaharaWithDbTestCase):
class TestClusterRollBack(AbstractInstanceTest):
@mock.patch('sahara.service.direct_engine.DirectEngine._check_if_deleted')
@mock.patch('sahara.service.ops._prepare_provisioning')
@mock.patch('sahara.service.ops.INFRA')
def test_cluster_creation_with_errors(self, infra, prepare):
def test_cluster_creation_with_errors(self, infra, prepare,
deleted_checker):
infra.create_cluster.side_effect = self.engine.create_cluster
infra.rollback_cluster.side_effect = self.engine.rollback_cluster
@ -75,6 +77,8 @@ class TestClusterRollBack(AbstractInstanceTest):
self.nova.servers.list.return_value = [_mock_instance(1)]
deleted_checker.return_value = True
ops._provision_cluster(cluster.id)
ctx = context.ctx()
@ -249,7 +253,9 @@ class IpManagementTest(AbstractInstanceTest):
class ShutdownClusterTest(AbstractInstanceTest):
def test_delete_floating_ips(self):
@mock.patch('sahara.service.direct_engine.DirectEngine._check_if_deleted')
def test_delete_floating_ips(self, deleted_checker):
node_groups = [_make_ng_dict("test_group_1", "test_flavor",
["data node", "test tracker"], 2, 'pool')]
@ -262,6 +268,8 @@ class ShutdownClusterTest(AbstractInstanceTest):
self.engine._assign_floating_ips(instances_list)
deleted_checker.return_value = True
self.engine._shutdown_instances(cluster)
self.assertEqual(self.nova.floating_ips.delete.call_count, 2,
"Not expected floating IPs number found in delete")


@ -29,7 +29,8 @@ def make_ng_dict(name, flavor, processes, count, instances=None, **kwargs):
instances = instances or []
dct = {'name': name, 'flavor_id': flavor, 'node_processes': processes,
'count': count, 'instances': instances, 'node_configs': {},
'security_groups': None}
'security_groups': None, 'auto_security_group': False,
'open_ports': []}
dct.update(kwargs)
return dct


@ -113,3 +113,7 @@ def generate_etc_hosts(cluster):
def generate_instance_name(cluster_name, node_group_name, index):
return ("%s-%s-%03d" % (cluster_name, node_group_name, index)).lower()
def generate_auto_security_group_name(cluster_name, node_group_name):
return ("%s-%s" % (cluster_name, node_group_name)).lower()


@ -30,6 +30,8 @@ from sahara.utils.openstack import base
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
SSH_PORT = 22
def client():
ctx = context.current()
@ -143,11 +145,35 @@ class ClusterTemplate(object):
aa_groups = {}
for ng in self.cluster.node_groups:
if ng.auto_security_group:
resources.extend(self._serialize_auto_security_group(ng))
for idx in range(0, self.node_groups_extra[ng.id]['node_count']):
resources.extend(self._serialize_instance(ng, idx, aa_groups))
return ',\n'.join(resources)
def _serialize_auto_security_group(self, ng):
fields = {
'security_group_name': g.generate_auto_security_group_name(
ng.cluster.name, ng.name),
'security_group_description':
"Auto security group created by Sahara for Node Group "
"'%s' of cluster '%s'." % (ng.name, ng.cluster.name),
'rules': self._serialize_auto_security_group_rules(ng)}
yield _load_template('security_group.heat', fields)
def _serialize_auto_security_group_rules(self, ng):
rules = []
for port in ng.open_ports:
rules.append({"remote_ip_prefix": "0.0.0.0/0", "protocol": "tcp",
"port_range_min": port, "port_range_max": port})
rules.append({"remote_ip_prefix": "0.0.0.0/0", "protocol": "tcp",
"port_range_min": SSH_PORT, "port_range_max": SSH_PORT})
return json.dumps(rules)
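# For example, with ng.open_ports == [50070] this returns the JSON string for:
# [{"remote_ip_prefix": "0.0.0.0/0", "protocol": "tcp",
#   "port_range_min": 50070, "port_range_max": 50070},
#  {"remote_ip_prefix": "0.0.0.0/0", "protocol": "tcp",
#   "port_range_min": 22, "port_range_max": 22}]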
def _serialize_instance(self, ng, idx, aa_groups):
inst_name = _get_inst_name(self.cluster.name, ng.name, idx)
@ -157,7 +183,7 @@ class ClusterTemplate(object):
port_name = _get_port_name(inst_name)
yield self._serialize_port(port_name,
self.cluster.neutron_management_network,
ng.security_groups)
self._get_security_groups(ng))
nets = '"networks" : [{ "port" : { "Ref" : "%s" }}],' % port_name
@ -171,7 +197,8 @@ class ClusterTemplate(object):
if ng.security_groups:
security_groups = (
'"security_groups": %s,' % json.dumps(ng.security_groups))
'"security_groups": %s,' % json.dumps(
self._get_security_groups(ng)))
aa_names = []
for node_process in ng.node_processes:
@ -242,6 +269,14 @@ class ClusterTemplate(object):
return _load_template('volume.heat', fields)
def _get_security_groups(self, node_group):
if not node_group.auto_security_group:
return node_group.security_groups
return (list(node_group.security_groups or []) +
[{"Ref": g.generate_auto_security_group_name(
node_group.cluster.name, node_group.name)}])
class ClusterStack(object):
def __init__(self, tmpl, heat_stack):