Merge pull request #6 from platform9/private/sarun87/add-security-groups-stability-fixes

Adding security group functionality and various stability fixes
This commit is contained in:
Pushkar Acharya 2017-01-04 15:53:33 -08:00 committed by GitHub
commit 2ea33cb3bb
4 changed files with 383 additions and 25 deletions

View File

@ -13,12 +13,14 @@
# under the License.
from ConfigParser import ConfigParser
import boto3
from novaclient.v2 import client as novaclient
from oslo_log import log as logging
from neutron.common import exceptions
import botocore
from novaclient.v2 import client as novaclient
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
import boto3
import botocore
import time
aws_group = cfg.OptGroup(name='AWS', title='Options to connect to an AWS environment')
aws_opts = [
@ -46,10 +48,14 @@ def _process_exception(e, dry_run):
error_message = e.response['Error']['Message']
raise exceptions.AwsException(error_code=error_code,
message=error_message)
elif isinstance(e, exceptions.AwsException):
# If the exception is already an AwsException, do not nest it.
# Instead just propagate it up.
raise e
else:
# TODO: This might display all Exceptions to the user which
# might be irrelevant, keeping it until it becomes stable
error_message = e.message
error_message = e.msg
raise exceptions.AwsException(error_code="NeutronError",
message=error_message)
@ -73,6 +79,7 @@ class AwsUtils:
'aws_access_key_id': cfg.CONF.AWS.access_key,
'region_name': cfg.CONF.AWS.region_name
}
self._wait_time_sec = 60 * cfg.CONF.AWS.wait_time_min
def get_nova_client(self):
if self._nova_client is None:
@ -237,6 +244,10 @@ class AwsUtils:
DryRun=dry_run,
CidrBlock=cidr)['Vpc']['VpcId']
vpc = self._get_ec2_resource().Vpc(vpc_id)
waiter = self._get_ec2_client().get_waiter('vpc_available')
waiter.wait(
DryRun=dry_run,
VpcIds = [ vpc_id ])
vpc.create_tags(Tags=tags_list)
return vpc_id
@ -258,8 +269,13 @@ class AwsUtils:
def create_subnet_and_tags(self, vpc_id, cidr, tags_list, dry_run=False):
vpc = self._get_ec2_resource().Vpc(vpc_id)
subnet = vpc.create_subnet(
AvailabilityZone=cfg.CONF.AWS.az,
DryRun=dry_run,
CidrBlock=cidr)
waiter = self._get_ec2_client().get_waiter('subnet_available')
waiter.wait(
DryRun=dry_run,
SubnetIds = [ subnet.id ])
subnet.create_tags(Tags=tags_list)
@aws_exception
@ -332,3 +348,185 @@ class AwsUtils:
LOG.warning("Ignoring failure in creating default route to IG: %s" % e)
if not ignore_errors:
_process_exception(e, dry_run)
# Has ignore_errors special case so can't use decorator
def delete_default_route_to_ig(self, route_table_id, dry_run=False, ignore_errors=False):
try:
self._get_ec2_client().delete_route(
DryRun=dry_run,
RouteTableId=route_table_id,
DestinationCidrBlock='0.0.0.0/0'
)
except Exception as e:
if not ignore_errors:
_process_exception(e, dry_run)
else:
LOG.warning("Ignoring failure in deleting default route to IG: %s" % e)
# Security group
def _create_sec_grp_tags(self, secgrp, tags):
def _wait_for_state(start_time):
current_time = time.time()
if current_time - start_time > self._wait_time_sec:
raise loopingcall.LoopingCallDone(False)
try:
secgrp.reload()
secgrp.create_tags(Tags=tags)
except Exception as ex:
LOG.exception('Exception when adding tags to security groups.'
' Retrying.')
return
raise loopingcall.LoopingCallDone(True)
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_state, time.time())
return timer.start(interval=5).wait()
def _convert_openstack_rules_to_vpc(self, rules):
ingress_rules = []
egress_rules = []
for rule in rules:
rule_dict = {}
if rule['protocol'] is None:
rule_dict['IpProtocol'] = '-1'
rule_dict['FromPort'] = -1
rule_dict['ToPort'] = -1
elif rule['protocol'].lower() == 'icmp':
rule_dict['IpProtocol'] = '1'
rule_dict['ToPort'] = '-1'
# AWS allows only 1 type of ICMP traffic in 1 rule
# we choose the smaller of the port_min and port_max values
icmp_rule = rule.get('port_range_min', '-1')
if not icmp_rule:
# allow all ICMP traffic rule
icmp_rule = '-1'
rule_dict['FromPort'] = icmp_rule
else:
rule_dict['IpProtocol'] = rule['protocol']
if rule['port_range_min'] is None:
rule_dict['FromPort'] = 0
else:
rule_dict['FromPort'] = rule['port_range_min']
if rule['port_range_max'] is None:
rule_dict['ToPort'] = 65535
else:
rule_dict['ToPort'] = rule['port_range_max']
rule_dict['IpRanges'] = []
if rule['remote_group_id'] is not None:
rule_dict['IpRanges'].append({
'CidrIp': rule['remote_group_id']
})
elif rule['remote_ip_prefix'] is not None:
rule_dict['IpRanges'].append({
'CidrIp': rule['remote_ip_prefix']
})
else:
if rule['direction'] == 'egress':
# OpenStack does not populate allow all egress rule
# with remote_group_id or remote_ip_prefix keys.
rule_dict['IpRanges'].append({
'CidrIp': '0.0.0.0/0'
})
if rule['direction'] == 'egress':
egress_rules.append(rule_dict)
else:
ingress_rules.append(rule_dict)
return ingress_rules, egress_rules
def _refresh_sec_grp_rules(self, secgrp, ingress, egress):
old_ingress = secgrp.ip_permissions
old_egress = secgrp.ip_permissions_egress
if old_ingress:
secgrp.revoke_ingress(IpPermissions=old_ingress)
if old_egress:
secgrp.revoke_egress(IpPermissions=old_egress)
secgrp.authorize_ingress(IpPermissions=ingress)
secgrp.authorize_egress(IpPermissions=egress)
def _create_sec_grp_rules(self, secgrp, rules):
ingress, egress = self._convert_openstack_rules_to_vpc(rules)
def _wait_for_state(start_time):
current_time = time.time()
if current_time - start_time > self._wait_time_sec:
raise loopingcall.LoopingCallDone(False)
try:
self._refresh_sec_grp_rules(secgrp, ingress, egress)
except Exception as ex:
LOG.exception('Error creating security group rules. Retrying.')
return
raise loopingcall.LoopingCallDone(True)
timer = loopingcall.FixedIntervalLoopingCall(_wait_for_state,
time.time())
return timer.start(interval=5).wait()
def create_security_group_rules(self, ec2_secgrp, rules):
if self._create_sec_grp_rules(ec2_secgrp, rules) is False:
exceptions.AwsException(
message='Timed out creating security groups',
error_code='Time Out')
def create_security_group(self, name, description, vpc_id, os_secgrp_id,
tags):
if not description:
description = 'Created by Platform9 OpenStack'
secgrp = self._get_ec2_resource().create_security_group(
GroupName=name, Description=description, VpcId=vpc_id)
if self._create_sec_grp_tags(secgrp, tags) is False:
delete_sec_grp(secgrp.id)
raise exceptions.AwsException(
message='Timed out creating tags on security group',
error_code='Time Out')
return secgrp
@aws_exception
def get_sec_group_by_id(self, secgrp_id, vpc_id = None, dry_run=False):
filters = [{
'Name': 'tag-value',
'Values': [secgrp_id]
}]
if vpc_id:
filters.append({
'Name': 'vpc-id',
'Values': [vpc_id]
})
response = self._get_ec2_client().describe_security_groups(
DryRun=dry_run, Filters=filters)
if 'SecurityGroups' in response:
return response['SecurityGroups']
return []
@aws_exception
def delete_security_group(self, openstack_id):
aws_secgroups = self.get_sec_group_by_id(openstack_id)
for secgrp in aws_secgroups:
group_id = secgrp['GroupId']
self.delete_security_group_by_id(group_id)
@aws_exception
def delete_security_group_by_id(self, group_id):
ec2client = self._get_ec2_client()
ec2client.delete_security_group(GroupId=group_id)
@aws_exception
def _update_sec_group(self, ec2_id, old_ingress, old_egress, new_ingress,
new_egress):
sg = self._get_ec2_resource().SecurityGroup(ec2_id)
sg.revoke_ingress(IpPermissions=old_ingress)
time.sleep(1)
sg.revoke_egress(IpPermissions=old_egress)
time.sleep(1)
sg.authorize_ingress(IpPermissions=new_ingress)
time.sleep(1)
sg.authorize_egress(IpPermissions=new_egress)
@aws_exception
def update_sec_group(self, openstack_id, rules):
ingress, egress = self._convert_openstack_rules_to_vpc(rules)
aws_secgrps = self.get_sec_group_by_id(openstack_id)
for aws_secgrp in aws_secgrps:
old_ingress = aws_secgrp['IpPermissions']
old_egress = aws_secgrp['IpPermissionsEgress']
ec2_sg_id = aws_secgrp['GroupId']
self._update_sec_group(ec2_sg_id, old_ingress, old_egress, ingress,
egress)

View File

@ -0,0 +1,34 @@
# Copyright 2016 Platform9 Systems Inc.(http://www.platform9.com)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.callbacks import events
from neutron.callbacks import exceptions
from neutron.callbacks import registry
from neutron.callbacks import resources
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
def subscribe(mech_driver):
registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP,
events.BEFORE_DELETE)
registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP,
events.BEFORE_UPDATE)
registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP_RULE,
events.BEFORE_DELETE)
registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP_RULE,
events.BEFORE_UPDATE)
registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP_RULE,
events.BEFORE_CREATE)

View File

@ -13,9 +13,13 @@
# under the License.
from oslo_log import log
from neutron import manager
from neutron.callbacks import resources
from neutron.callbacks import events
from neutron.common.aws_utils import AwsUtils
from neutron.common.exceptions import AwsException
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers.aws import callbacks
import json
import random
@ -25,9 +29,11 @@ class AwsMechanismDriver(api.MechanismDriver):
"""Ml2 Mechanism driver for AWS"""
def __init__(self):
self.aws_utils = None
super(AwsMechanismDriver, self).__init__()
def initialize(self):
self.aws_utils = AwsUtils()
callbacks.subscribe(self)
# NETWORK
def create_network_precommit(self, context):
@ -65,10 +71,13 @@ class AwsMechanismDriver(api.MechanismDriver):
def create_subnet_precommit(self, context):
LOG.info("Create subnet for network %s" % context.network.current['id'])
# External Network doesn't exist on AWS, so no operations permitted
if 'provider:physical_network' in context.network.current and context.network.current['provider:physical_network'] == "external":
# Do not create subnets for external & provider networks. Only allow tenant network
# subnet creation at the moment.
return
if 'provider:physical_network' in context.network.current and \
context.network.current['provider:physical_network'] == "external":
# Do not create subnets for external & provider networks. Only
# allow tenant network subnet creation at the moment.
LOG.info('Creating external network %s' %
context.network.current['id'])
return
if context.current['ip_version'] == 6:
raise AwsException(error_code="IPv6Error", message="Cannot create subnets with IPv6")
@ -84,14 +93,16 @@ class AwsMechanismDriver(api.MechanismDriver):
vpc_cidr = context.current['cidr'][:-2] + '16'
tags = [
{'Key': 'Name', 'Value': neutron_network['name']},
{'Key': 'openstack_network_id', 'Value': neutron_network['id']}
{'Key': 'openstack_network_id', 'Value': neutron_network['id']},
{'Key': 'openstack_tenant_id', 'Value': context.current['tenant_id']}
]
associated_vpc_id = self.aws_utils.create_vpc_and_tags(cidr=vpc_cidr,
tags_list=tags)
# Create Subnet in AWS
tags = [
{'Key': 'Name', 'Value': context.current['name']},
{'Key': 'openstack_subnet_id', 'Value': context.current['id']}
{'Key': 'openstack_subnet_id', 'Value': context.current['id']},
{'Key': 'openstack_tenant_id', 'Value': context.current['tenant_id']}
]
self.aws_utils.create_subnet_and_tags(vpc_id=associated_vpc_id,
cidr=context.current['cidr'],
@ -132,8 +143,11 @@ class AwsMechanismDriver(api.MechanismDriver):
def delete_subnet_postcommit(self, context):
neutron_network = context.network.current
if 'provider:physical_network' in context.network.current and context.network.current[
'provider:physical_network'] == "external":
if 'provider:physical_network' in context.network.current and \
context.network.current['provider:physical_network'] == \
"external":
LOG.info('Deleting %s external network' %
context.network.current['id'])
return
try:
subnets = neutron_network['subnets']
@ -171,7 +185,11 @@ class AwsMechanismDriver(api.MechanismDriver):
if 'fixed_ips' in context.current:
if len(context.current['fixed_ips']) > 0:
fixed_ip_dict = context.current['fixed_ips'][0]
fixed_ip_dict['subnet_id'] = self.aws_utils.get_subnet_from_neutron_subnet_id(fixed_ip_dict['subnet_id'])
fixed_ip_dict['subnet_id'] = \
self.aws_utils.get_subnet_from_neutron_subnet_id(
fixed_ip_dict['subnet_id'])
secgroup_ids = context.current['security_groups']
self.create_security_groups_if_needed(context, secgroup_ids)
segment_id = random.choice(context.network.network_segments)[api.ID]
context.set_binding(segment_id,
@ -179,3 +197,82 @@ class AwsMechanismDriver(api.MechanismDriver):
json.dumps(fixed_ip_dict),
status='ACTIVE')
return True
def create_security_groups_if_needed(self, context, secgrp_ids):
core_plugin = manager.NeutronManager.get_plugin()
vpc_id = self.aws_utils.get_vpc_from_neutron_network_id(
context.current['network_id'])
for secgrp_id in secgrp_ids:
tags = [
{'Key': 'openstack_id', 'Value': secgrp_id},
{'Key': 'openstack_network_id',
'Value': context.current['network_id']}
]
secgrp = core_plugin.get_security_group(context._plugin_context,
secgrp_id)
aws_secgrp = self.aws_utils.get_sec_group_by_id(secgrp_id,
vpc_id=vpc_id)
if not aws_secgrp and secgrp['name'] != 'default':
grp_name = secgrp['name']
desc = secgrp['description']
rules = secgrp['security_group_rules']
ec2_secgrp = self.aws_utils.create_security_group(
grp_name, desc, vpc_id, secgrp_id, tags)
self.aws_utils.create_security_group_rules(ec2_secgrp, rules)
def delete_security_group(self, security_group_id):
self.aws_utils.delete_security_group(security_group_id)
def remove_security_group_rule(self, context, rule_id):
core_plugin = manager.NeutronManager.get_plugin()
rule = core_plugin.get_security_group_rule(context, rule_id)
secgrp_id = rule['security_group_id']
secgrp = core_plugin.get_security_group(context, secgrp_id)
old_rules = secgrp['security_group_rules']
for idx in range(len(old_rules)-1, -1, -1):
if old_rules[idx]['id'] == rule_id:
old_rules.pop(idx)
self.aws_utils.update_sec_group(secgrp_id, old_rules)
def add_security_group_rule(self, context, rule):
core_plugin = manager.NeutronManager.get_plugin()
secgrp_id = rule['security_group_id']
secgrp = core_plugin.get_security_group(context, secgrp_id)
old_rules = secgrp['security_group_rules']
old_rules.append(rule)
self.aws_utils.update_sec_group(secgrp_id, old_rules)
def update_security_group_rules(self, context, rule_id):
core_plugin = manager.NeutronManager.get_plugin()
rule = core_plugin.get_security_group_rule(context, rule_id)
secgrp_id = rule['security_group_id']
secgrp = core_plugin.get_security_group(context, secgrp_id)
old_rules = secgrp['security_group_rules']
for idx in range(len(old_rules)-1, -1, -1):
if old_rules[idx]['id'] == rule_id:
old_rules.pop(idx)
break
old_rules.append(rule)
self.aws_utils.update_sec_group(secgrp_id, old_rules)
def secgroup_callback(self, resource, event, trigger, **kwargs):
if resource == resources.SECURITY_GROUP:
if event == events.BEFORE_DELETE:
security_group_id = kwargs.get('security_group_id')
if security_group_id:
self.delete_security_group(security_group_id)
else:
LOG.warn('Security group ID not found in delete request')
elif resource == resources.SECURITY_GROUP_RULE:
context = kwargs['context']
if event == events.BEFORE_CREATE:
rule = kwargs['security_group_rule']
self.add_security_group_rule(context, rule)
elif event == events.BEFORE_DELETE:
rule_id = kwargs['security_group_rule_id']
self.remove_security_group_rule(context, rule_id)
elif event == events.BEFORE_UPDATE:
rule_id = kwargs['security_group_rule_id']
self.update_security_group_rules(context, rule_id)

View File

@ -24,10 +24,11 @@ from neutron.plugins.common import constants
from neutron.quota import resource_registry
from neutron.services import service_base
from oslo_log import log as logging
from neutron.common.aws_utils import AwsUtils
from neutron.common import exceptions
from neutron.db import securitygroups_db
from neutron.common.aws_utils import AwsUtils
LOG = logging.getLogger(__name__)
class AwsRouterPlugin(service_base.ServicePluginBase,
@ -82,13 +83,22 @@ class AwsRouterPlugin(service_base.ServicePluginBase,
port_id = floatingip['floatingip']['port_id']
self._associate_floatingip_to_port(context, public_ip_allocated, port_id)
except Exception as e:
LOG.error("Error in Allocating EIP: %s " % e)
LOG.error("Error in Creation/Allocating EIP")
if public_ip_allocated:
LOG.error("Deleting Elastic IP: %s" % public_ip_allocated)
self.aws_utils.delete_elastic_ip(public_ip_allocated)
raise e
return super(AwsRouterPlugin, self).create_floatingip(
context, floatingip,
initial_status=n_const.FLOATINGIP_STATUS_DOWN)
try:
res = super(AwsRouterPlugin, self).create_floatingip(
context, floatingip,
initial_status=n_const.FLOATINGIP_STATUS_DOWN)
except Exception as e:
LOG.error("Error when adding floating ip in openstack. Deleting Elastic IP: %s" % public_ip_allocated)
self.aws_utils.delete_elastic_ip(public_ip_allocated)
raise e
return res
def _associate_floatingip_to_port(self, context, floating_ip_address, port_id):
port = self._core_plugin.get_port(context, port_id)
ec2_id = None
@ -123,15 +133,30 @@ class AwsRouterPlugin(service_base.ServicePluginBase,
self._associate_floatingip_to_port(context,
floating_ip_dict['floating_ip_address'], port_id)
else:
# Port Disassociate
self.aws_utils.disassociate_elastic_ip_from_ec2_instance(floating_ip_dict['floating_ip_address'])
try:
# Port Disassociate
self.aws_utils.disassociate_elastic_ip_from_ec2_instance(floating_ip_dict['floating_ip_address'])
except exceptions.AwsException as e:
if 'Association ID not found' in e.msg:
# Since its already disassociated on EC2, we continue and remove the association here.
LOG.warn("Association for Elastic IP not found. Probable out of band change on EC2.")
elif 'InvalidAddress.NotFound' in e.msg:
LOG.warn("Elastic IP cannot be found in EC2. Probably removed out of band on EC2.")
else:
raise e
return super(AwsRouterPlugin, self).update_floatingip(context, id, floatingip)
def delete_floatingip(self, context, id):
floating_ip = super(AwsRouterPlugin, self).get_floatingip(context, id)
floating_ip_address = floating_ip['floating_ip_address']
LOG.info("Deleting elastic IP %s" % floating_ip_address)
self.aws_utils.delete_elastic_ip(floating_ip_address)
try:
self.aws_utils.delete_elastic_ip(floating_ip_address)
except exceptions.AwsException as e:
if 'InvalidAddress.NotFound' in e.msg:
LOG.warn("Elastic IP not found on AWS. Cleaning up neutron db")
else:
raise e
return super(AwsRouterPlugin, self).delete_floatingip(context, id)
##### ROUTERS #####
@ -211,6 +236,10 @@ class AwsRouterPlugin(service_base.ServicePluginBase,
return super(AwsRouterPlugin, self).add_router_interface(context, router_id, interface_info)
def remove_router_interface(self, context, router_id, interface_info):
LOG.info("Deleting subnet %s from router %s" % (interface_info['subnet_id'], router_id))
# TODO: Need to delete the route entry in the Route Table of AWS
LOG.info("Deleting port %s from router %s" % (interface_info['port_id'], router_id))
self.aws_utils.detach_internet_gateway_by_router_id(router_id)
route_tables = self.aws_utils.get_route_table_by_router_id(router_id)
if route_tables:
route_table_id = route_tables[0]['RouteTableId']
self.aws_utils.delete_default_route_to_ig(route_table_id)
return super(AwsRouterPlugin, self).remove_router_interface(context, router_id, interface_info)