diff --git a/neutron/neutron/common/aws_utils.py b/neutron/neutron/common/aws_utils.py index ea16258..fd4e07d 100644 --- a/neutron/neutron/common/aws_utils.py +++ b/neutron/neutron/common/aws_utils.py @@ -13,12 +13,14 @@ # under the License. from ConfigParser import ConfigParser -import boto3 -from novaclient.v2 import client as novaclient -from oslo_log import log as logging from neutron.common import exceptions -import botocore +from novaclient.v2 import client as novaclient from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import loopingcall +import boto3 +import botocore +import time aws_group = cfg.OptGroup(name='AWS', title='Options to connect to an AWS environment') aws_opts = [ @@ -46,10 +48,14 @@ def _process_exception(e, dry_run): error_message = e.response['Error']['Message'] raise exceptions.AwsException(error_code=error_code, message=error_message) + elif isinstance(e, exceptions.AwsException): + # If the exception is already an AwsException, do not nest it. + # Instead just propagate it up. + raise e else: # TODO: This might display all Exceptions to the user which # might be irrelevant, keeping it until it becomes stable - error_message = e.message + error_message = e.msg raise exceptions.AwsException(error_code="NeutronError", message=error_message) @@ -73,6 +79,7 @@ class AwsUtils: 'aws_access_key_id': cfg.CONF.AWS.access_key, 'region_name': cfg.CONF.AWS.region_name } + self._wait_time_sec = 60 * cfg.CONF.AWS.wait_time_min def get_nova_client(self): if self._nova_client is None: @@ -237,6 +244,10 @@ class AwsUtils: DryRun=dry_run, CidrBlock=cidr)['Vpc']['VpcId'] vpc = self._get_ec2_resource().Vpc(vpc_id) + waiter = self._get_ec2_client().get_waiter('vpc_available') + waiter.wait( + DryRun=dry_run, + VpcIds = [ vpc_id ]) vpc.create_tags(Tags=tags_list) return vpc_id @@ -258,8 +269,13 @@ class AwsUtils: def create_subnet_and_tags(self, vpc_id, cidr, tags_list, dry_run=False): vpc = self._get_ec2_resource().Vpc(vpc_id) subnet = vpc.create_subnet( + AvailabilityZone=cfg.CONF.AWS.az, DryRun=dry_run, CidrBlock=cidr) + waiter = self._get_ec2_client().get_waiter('subnet_available') + waiter.wait( + DryRun=dry_run, + SubnetIds = [ subnet.id ]) subnet.create_tags(Tags=tags_list) @aws_exception @@ -332,3 +348,185 @@ class AwsUtils: LOG.warning("Ignoring failure in creating default route to IG: %s" % e) if not ignore_errors: _process_exception(e, dry_run) + + # Has ignore_errors special case so can't use decorator + def delete_default_route_to_ig(self, route_table_id, dry_run=False, ignore_errors=False): + try: + self._get_ec2_client().delete_route( + DryRun=dry_run, + RouteTableId=route_table_id, + DestinationCidrBlock='0.0.0.0/0' + ) + except Exception as e: + if not ignore_errors: + _process_exception(e, dry_run) + else: + LOG.warning("Ignoring failure in deleting default route to IG: %s" % e) + + # Security group + def _create_sec_grp_tags(self, secgrp, tags): + def _wait_for_state(start_time): + current_time = time.time() + if current_time - start_time > self._wait_time_sec: + raise loopingcall.LoopingCallDone(False) + try: + secgrp.reload() + secgrp.create_tags(Tags=tags) + except Exception as ex: + LOG.exception('Exception when adding tags to security groups.' + ' Retrying.') + return + raise loopingcall.LoopingCallDone(True) + timer = loopingcall.FixedIntervalLoopingCall(_wait_for_state, time.time()) + return timer.start(interval=5).wait() + + def _convert_openstack_rules_to_vpc(self, rules): + ingress_rules = [] + egress_rules = [] + for rule in rules: + rule_dict = {} + if rule['protocol'] is None: + rule_dict['IpProtocol'] = '-1' + rule_dict['FromPort'] = -1 + rule_dict['ToPort'] = -1 + elif rule['protocol'].lower() == 'icmp': + rule_dict['IpProtocol'] = '1' + rule_dict['ToPort'] = '-1' + # AWS allows only 1 type of ICMP traffic in 1 rule + # we choose the smaller of the port_min and port_max values + icmp_rule = rule.get('port_range_min', '-1') + if not icmp_rule: + # allow all ICMP traffic rule + icmp_rule = '-1' + rule_dict['FromPort'] = icmp_rule + else: + rule_dict['IpProtocol'] = rule['protocol'] + if rule['port_range_min'] is None: + rule_dict['FromPort'] = 0 + else: + rule_dict['FromPort'] = rule['port_range_min'] + if rule['port_range_max'] is None: + rule_dict['ToPort'] = 65535 + else: + rule_dict['ToPort'] = rule['port_range_max'] + rule_dict['IpRanges'] = [] + if rule['remote_group_id'] is not None: + rule_dict['IpRanges'].append({ + 'CidrIp': rule['remote_group_id'] + }) + elif rule['remote_ip_prefix'] is not None: + rule_dict['IpRanges'].append({ + 'CidrIp': rule['remote_ip_prefix'] + }) + else: + if rule['direction'] == 'egress': + # OpenStack does not populate allow all egress rule + # with remote_group_id or remote_ip_prefix keys. + rule_dict['IpRanges'].append({ + 'CidrIp': '0.0.0.0/0' + }) + if rule['direction'] == 'egress': + egress_rules.append(rule_dict) + else: + ingress_rules.append(rule_dict) + return ingress_rules, egress_rules + + def _refresh_sec_grp_rules(self, secgrp, ingress, egress): + old_ingress = secgrp.ip_permissions + old_egress = secgrp.ip_permissions_egress + if old_ingress: + secgrp.revoke_ingress(IpPermissions=old_ingress) + if old_egress: + secgrp.revoke_egress(IpPermissions=old_egress) + secgrp.authorize_ingress(IpPermissions=ingress) + secgrp.authorize_egress(IpPermissions=egress) + + def _create_sec_grp_rules(self, secgrp, rules): + ingress, egress = self._convert_openstack_rules_to_vpc(rules) + def _wait_for_state(start_time): + current_time = time.time() + + if current_time - start_time > self._wait_time_sec: + raise loopingcall.LoopingCallDone(False) + try: + self._refresh_sec_grp_rules(secgrp, ingress, egress) + except Exception as ex: + LOG.exception('Error creating security group rules. Retrying.') + return + raise loopingcall.LoopingCallDone(True) + timer = loopingcall.FixedIntervalLoopingCall(_wait_for_state, + time.time()) + return timer.start(interval=5).wait() + + def create_security_group_rules(self, ec2_secgrp, rules): + if self._create_sec_grp_rules(ec2_secgrp, rules) is False: + exceptions.AwsException( + message='Timed out creating security groups', + error_code='Time Out') + + def create_security_group(self, name, description, vpc_id, os_secgrp_id, + tags): + if not description: + description = 'Created by Platform9 OpenStack' + secgrp = self._get_ec2_resource().create_security_group( + GroupName=name, Description=description, VpcId=vpc_id) + if self._create_sec_grp_tags(secgrp, tags) is False: + delete_sec_grp(secgrp.id) + raise exceptions.AwsException( + message='Timed out creating tags on security group', + error_code='Time Out') + return secgrp + + @aws_exception + def get_sec_group_by_id(self, secgrp_id, vpc_id = None, dry_run=False): + filters = [{ + 'Name': 'tag-value', + 'Values': [secgrp_id] + }] + if vpc_id: + filters.append({ + 'Name': 'vpc-id', + 'Values': [vpc_id] + }) + + response = self._get_ec2_client().describe_security_groups( + DryRun=dry_run, Filters=filters) + if 'SecurityGroups' in response: + return response['SecurityGroups'] + return [] + + @aws_exception + def delete_security_group(self, openstack_id): + aws_secgroups = self.get_sec_group_by_id(openstack_id) + for secgrp in aws_secgroups: + group_id = secgrp['GroupId'] + self.delete_security_group_by_id(group_id) + + @aws_exception + def delete_security_group_by_id(self, group_id): + ec2client = self._get_ec2_client() + ec2client.delete_security_group(GroupId=group_id) + + @aws_exception + def _update_sec_group(self, ec2_id, old_ingress, old_egress, new_ingress, + new_egress): + sg = self._get_ec2_resource().SecurityGroup(ec2_id) + sg.revoke_ingress(IpPermissions=old_ingress) + time.sleep(1) + sg.revoke_egress(IpPermissions=old_egress) + time.sleep(1) + sg.authorize_ingress(IpPermissions=new_ingress) + time.sleep(1) + sg.authorize_egress(IpPermissions=new_egress) + + @aws_exception + def update_sec_group(self, openstack_id, rules): + ingress, egress = self._convert_openstack_rules_to_vpc(rules) + aws_secgrps = self.get_sec_group_by_id(openstack_id) + for aws_secgrp in aws_secgrps: + old_ingress = aws_secgrp['IpPermissions'] + old_egress = aws_secgrp['IpPermissionsEgress'] + ec2_sg_id = aws_secgrp['GroupId'] + self._update_sec_group(ec2_sg_id, old_ingress, old_egress, ingress, + egress) + diff --git a/neutron/neutron/plugins/ml2/drivers/aws/callbacks.py b/neutron/neutron/plugins/ml2/drivers/aws/callbacks.py new file mode 100644 index 0000000..6a13e44 --- /dev/null +++ b/neutron/neutron/plugins/ml2/drivers/aws/callbacks.py @@ -0,0 +1,34 @@ +# Copyright 2016 Platform9 Systems Inc.(http://www.platform9.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.callbacks import events +from neutron.callbacks import exceptions +from neutron.callbacks import registry +from neutron.callbacks import resources +from oslo_log import log as logging + +LOG = logging.getLogger(__name__) + + +def subscribe(mech_driver): + registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP, + events.BEFORE_DELETE) + registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP, + events.BEFORE_UPDATE) + registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP_RULE, + events.BEFORE_DELETE) + registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP_RULE, + events.BEFORE_UPDATE) + registry.subscribe(mech_driver.secgroup_callback, resources.SECURITY_GROUP_RULE, + events.BEFORE_CREATE) diff --git a/neutron/neutron/plugins/ml2/drivers/aws/mechanism_aws.py b/neutron/neutron/plugins/ml2/drivers/aws/mechanism_aws.py index 80635ce..798bac9 100644 --- a/neutron/neutron/plugins/ml2/drivers/aws/mechanism_aws.py +++ b/neutron/neutron/plugins/ml2/drivers/aws/mechanism_aws.py @@ -13,9 +13,13 @@ # under the License. from oslo_log import log +from neutron import manager +from neutron.callbacks import resources +from neutron.callbacks import events from neutron.common.aws_utils import AwsUtils from neutron.common.exceptions import AwsException from neutron.plugins.ml2 import driver_api as api +from neutron.plugins.ml2.drivers.aws import callbacks import json import random @@ -25,9 +29,11 @@ class AwsMechanismDriver(api.MechanismDriver): """Ml2 Mechanism driver for AWS""" def __init__(self): self.aws_utils = None + super(AwsMechanismDriver, self).__init__() def initialize(self): self.aws_utils = AwsUtils() + callbacks.subscribe(self) # NETWORK def create_network_precommit(self, context): @@ -65,10 +71,13 @@ class AwsMechanismDriver(api.MechanismDriver): def create_subnet_precommit(self, context): LOG.info("Create subnet for network %s" % context.network.current['id']) # External Network doesn't exist on AWS, so no operations permitted - if 'provider:physical_network' in context.network.current and context.network.current['provider:physical_network'] == "external": - # Do not create subnets for external & provider networks. Only allow tenant network - # subnet creation at the moment. - return + if 'provider:physical_network' in context.network.current and \ + context.network.current['provider:physical_network'] == "external": + # Do not create subnets for external & provider networks. Only + # allow tenant network subnet creation at the moment. + LOG.info('Creating external network %s' % + context.network.current['id']) + return if context.current['ip_version'] == 6: raise AwsException(error_code="IPv6Error", message="Cannot create subnets with IPv6") @@ -84,14 +93,16 @@ class AwsMechanismDriver(api.MechanismDriver): vpc_cidr = context.current['cidr'][:-2] + '16' tags = [ {'Key': 'Name', 'Value': neutron_network['name']}, - {'Key': 'openstack_network_id', 'Value': neutron_network['id']} + {'Key': 'openstack_network_id', 'Value': neutron_network['id']}, + {'Key': 'openstack_tenant_id', 'Value': context.current['tenant_id']} ] associated_vpc_id = self.aws_utils.create_vpc_and_tags(cidr=vpc_cidr, tags_list=tags) # Create Subnet in AWS tags = [ {'Key': 'Name', 'Value': context.current['name']}, - {'Key': 'openstack_subnet_id', 'Value': context.current['id']} + {'Key': 'openstack_subnet_id', 'Value': context.current['id']}, + {'Key': 'openstack_tenant_id', 'Value': context.current['tenant_id']} ] self.aws_utils.create_subnet_and_tags(vpc_id=associated_vpc_id, cidr=context.current['cidr'], @@ -132,8 +143,11 @@ class AwsMechanismDriver(api.MechanismDriver): def delete_subnet_postcommit(self, context): neutron_network = context.network.current - if 'provider:physical_network' in context.network.current and context.network.current[ - 'provider:physical_network'] == "external": + if 'provider:physical_network' in context.network.current and \ + context.network.current['provider:physical_network'] == \ + "external": + LOG.info('Deleting %s external network' % + context.network.current['id']) return try: subnets = neutron_network['subnets'] @@ -171,7 +185,11 @@ class AwsMechanismDriver(api.MechanismDriver): if 'fixed_ips' in context.current: if len(context.current['fixed_ips']) > 0: fixed_ip_dict = context.current['fixed_ips'][0] - fixed_ip_dict['subnet_id'] = self.aws_utils.get_subnet_from_neutron_subnet_id(fixed_ip_dict['subnet_id']) + fixed_ip_dict['subnet_id'] = \ + self.aws_utils.get_subnet_from_neutron_subnet_id( + fixed_ip_dict['subnet_id']) + secgroup_ids = context.current['security_groups'] + self.create_security_groups_if_needed(context, secgroup_ids) segment_id = random.choice(context.network.network_segments)[api.ID] context.set_binding(segment_id, @@ -179,3 +197,82 @@ class AwsMechanismDriver(api.MechanismDriver): json.dumps(fixed_ip_dict), status='ACTIVE') return True + + def create_security_groups_if_needed(self, context, secgrp_ids): + core_plugin = manager.NeutronManager.get_plugin() + vpc_id = self.aws_utils.get_vpc_from_neutron_network_id( + context.current['network_id']) + for secgrp_id in secgrp_ids: + tags = [ + {'Key': 'openstack_id', 'Value': secgrp_id}, + {'Key': 'openstack_network_id', + 'Value': context.current['network_id']} + ] + secgrp = core_plugin.get_security_group(context._plugin_context, + secgrp_id) + aws_secgrp = self.aws_utils.get_sec_group_by_id(secgrp_id, + vpc_id=vpc_id) + if not aws_secgrp and secgrp['name'] != 'default': + grp_name = secgrp['name'] + desc = secgrp['description'] + rules = secgrp['security_group_rules'] + ec2_secgrp = self.aws_utils.create_security_group( + grp_name, desc, vpc_id, secgrp_id, tags) + self.aws_utils.create_security_group_rules(ec2_secgrp, rules) + + def delete_security_group(self, security_group_id): + self.aws_utils.delete_security_group(security_group_id) + + def remove_security_group_rule(self, context, rule_id): + core_plugin = manager.NeutronManager.get_plugin() + rule = core_plugin.get_security_group_rule(context, rule_id) + secgrp_id = rule['security_group_id'] + secgrp = core_plugin.get_security_group(context, secgrp_id) + old_rules = secgrp['security_group_rules'] + for idx in range(len(old_rules)-1, -1, -1): + if old_rules[idx]['id'] == rule_id: + old_rules.pop(idx) + self.aws_utils.update_sec_group(secgrp_id, old_rules) + + def add_security_group_rule(self, context, rule): + core_plugin = manager.NeutronManager.get_plugin() + secgrp_id = rule['security_group_id'] + secgrp = core_plugin.get_security_group(context, secgrp_id) + old_rules = secgrp['security_group_rules'] + old_rules.append(rule) + self.aws_utils.update_sec_group(secgrp_id, old_rules) + + def update_security_group_rules(self, context, rule_id): + core_plugin = manager.NeutronManager.get_plugin() + rule = core_plugin.get_security_group_rule(context, rule_id) + secgrp_id = rule['security_group_id'] + secgrp = core_plugin.get_security_group(context, secgrp_id) + old_rules = secgrp['security_group_rules'] + for idx in range(len(old_rules)-1, -1, -1): + if old_rules[idx]['id'] == rule_id: + old_rules.pop(idx) + break + old_rules.append(rule) + self.aws_utils.update_sec_group(secgrp_id, old_rules) + + def secgroup_callback(self, resource, event, trigger, **kwargs): + if resource == resources.SECURITY_GROUP: + if event == events.BEFORE_DELETE: + security_group_id = kwargs.get('security_group_id') + if security_group_id: + self.delete_security_group(security_group_id) + else: + LOG.warn('Security group ID not found in delete request') + elif resource == resources.SECURITY_GROUP_RULE: + context = kwargs['context'] + if event == events.BEFORE_CREATE: + rule = kwargs['security_group_rule'] + self.add_security_group_rule(context, rule) + elif event == events.BEFORE_DELETE: + rule_id = kwargs['security_group_rule_id'] + self.remove_security_group_rule(context, rule_id) + elif event == events.BEFORE_UPDATE: + rule_id = kwargs['security_group_rule_id'] + self.update_security_group_rules(context, rule_id) + + diff --git a/neutron/neutron/services/l3_router/aws_router_plugin.py b/neutron/neutron/services/l3_router/aws_router_plugin.py index 5ce4b61..df7d173 100644 --- a/neutron/neutron/services/l3_router/aws_router_plugin.py +++ b/neutron/neutron/services/l3_router/aws_router_plugin.py @@ -24,10 +24,11 @@ from neutron.plugins.common import constants from neutron.quota import resource_registry from neutron.services import service_base from oslo_log import log as logging -from neutron.common.aws_utils import AwsUtils from neutron.common import exceptions from neutron.db import securitygroups_db +from neutron.common.aws_utils import AwsUtils + LOG = logging.getLogger(__name__) class AwsRouterPlugin(service_base.ServicePluginBase, @@ -82,13 +83,22 @@ class AwsRouterPlugin(service_base.ServicePluginBase, port_id = floatingip['floatingip']['port_id'] self._associate_floatingip_to_port(context, public_ip_allocated, port_id) except Exception as e: - LOG.error("Error in Allocating EIP: %s " % e) + LOG.error("Error in Creation/Allocating EIP") + if public_ip_allocated: + LOG.error("Deleting Elastic IP: %s" % public_ip_allocated) + self.aws_utils.delete_elastic_ip(public_ip_allocated) raise e - return super(AwsRouterPlugin, self).create_floatingip( - context, floatingip, - initial_status=n_const.FLOATINGIP_STATUS_DOWN) - + try: + res = super(AwsRouterPlugin, self).create_floatingip( + context, floatingip, + initial_status=n_const.FLOATINGIP_STATUS_DOWN) + except Exception as e: + LOG.error("Error when adding floating ip in openstack. Deleting Elastic IP: %s" % public_ip_allocated) + self.aws_utils.delete_elastic_ip(public_ip_allocated) + raise e + return res + def _associate_floatingip_to_port(self, context, floating_ip_address, port_id): port = self._core_plugin.get_port(context, port_id) ec2_id = None @@ -123,15 +133,30 @@ class AwsRouterPlugin(service_base.ServicePluginBase, self._associate_floatingip_to_port(context, floating_ip_dict['floating_ip_address'], port_id) else: - # Port Disassociate - self.aws_utils.disassociate_elastic_ip_from_ec2_instance(floating_ip_dict['floating_ip_address']) + try: + # Port Disassociate + self.aws_utils.disassociate_elastic_ip_from_ec2_instance(floating_ip_dict['floating_ip_address']) + except exceptions.AwsException as e: + if 'Association ID not found' in e.msg: + # Since its already disassociated on EC2, we continue and remove the association here. + LOG.warn("Association for Elastic IP not found. Probable out of band change on EC2.") + elif 'InvalidAddress.NotFound' in e.msg: + LOG.warn("Elastic IP cannot be found in EC2. Probably removed out of band on EC2.") + else: + raise e return super(AwsRouterPlugin, self).update_floatingip(context, id, floatingip) def delete_floatingip(self, context, id): floating_ip = super(AwsRouterPlugin, self).get_floatingip(context, id) floating_ip_address = floating_ip['floating_ip_address'] LOG.info("Deleting elastic IP %s" % floating_ip_address) - self.aws_utils.delete_elastic_ip(floating_ip_address) + try: + self.aws_utils.delete_elastic_ip(floating_ip_address) + except exceptions.AwsException as e: + if 'InvalidAddress.NotFound' in e.msg: + LOG.warn("Elastic IP not found on AWS. Cleaning up neutron db") + else: + raise e return super(AwsRouterPlugin, self).delete_floatingip(context, id) ##### ROUTERS ##### @@ -211,6 +236,10 @@ class AwsRouterPlugin(service_base.ServicePluginBase, return super(AwsRouterPlugin, self).add_router_interface(context, router_id, interface_info) def remove_router_interface(self, context, router_id, interface_info): - LOG.info("Deleting subnet %s from router %s" % (interface_info['subnet_id'], router_id)) - # TODO: Need to delete the route entry in the Route Table of AWS + LOG.info("Deleting port %s from router %s" % (interface_info['port_id'], router_id)) + self.aws_utils.detach_internet_gateway_by_router_id(router_id) + route_tables = self.aws_utils.get_route_table_by_router_id(router_id) + if route_tables: + route_table_id = route_tables[0]['RouteTableId'] + self.aws_utils.delete_default_route_to_ig(route_table_id) return super(AwsRouterPlugin, self).remove_router_interface(context, router_id, interface_info)