# Copyright (c) 2016 Cisco Systems, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from abc import ABC, abstractmethod from collections import namedtuple import etcd import eventlet import eventlet.event import os from oslo_config import cfg from oslo_log import log as logging import re import time from networking_vpp import config_opts from networking_vpp import constants as nvpp_const from networking_vpp.db import db from networking_vpp import etcdutils from networking_vpp.ext_manager import ExtensionManager from networking_vpp.extension import MechDriverExtensionBase from neutron_lib.api.definitions import portbindings from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources import neutron_lib.constants as n_const from neutron_lib import context as n_context from neutron_lib.plugins import directory from neutron_lib.plugins.ml2 import api import neutron.conf.agent.securitygroups_rpc from neutron.db import provisioning_blocks LOG = logging.getLogger(__name__) class VPPMechanismDriver(api.MechanismDriver): supported_vnic_types = [portbindings.VNIC_NORMAL] MECH_NAME = 'vpp' def initialize(self): config_opts.register_vpp_opts(cfg.CONF) neutron.conf.agent.securitygroups_rpc.register_securitygroups_opts( cfg.CONF) self.allowed_network_types = cfg.CONF.ml2_vpp.network_types self.communicator = EtcdAgentCommunicator(self.port_bind_complete) names = names = cfg.CONF.ml2_vpp.driver_extensions if names != '': self.mgr = ExtensionManager( 'networking_vpp.driver.extensions', names, MechDriverExtensionBase) self.mgr.call_all('run', self.communicator) def get_vif_type(self, port_context): """Determine the type of the vif to be bound from port context""" # Default vif type vif_type = 'vhostuser' # We have to explicitly avoid binding agent ports - DHCP, # L3 etc. - as vhostuser. The interface code below takes # care of those. owner = port_context.current['device_owner'] for f in n_const.DEVICE_OWNER_PREFIXES: if owner.startswith(f): vif_type = 'tap' LOG.debug("vif_type to be bound is: %s", vif_type) return vif_type def bind_port(self, port_context): """Attempt to bind a port. :param port_context: PortContext instance describing the port This method is called outside any transaction to attempt to establish a port binding using this mechanism driver. Bindings may be created at each of multiple levels of a hierarchical network, and are established from the top level downward. At each level, the mechanism driver determines whether it can bind to any of the network segments in the port_context.segments_to_bind property, based on the value of the port_context.host property, any relevant port or network attributes, and its own knowledge of the network topology. At the top level, port_context.segments_to_bind contains the static segments of the port's network. At each lower level of binding, it contains static or dynamic segments supplied by the driver that bound at the level above. If the driver is able to complete the binding of the port to any segment in port_context.segments_to_bind, it must call port_context.set_binding with the binding details. If it can partially bind the port, it must call port_context.continue_binding with the network segments to be used to bind at the next lower level. If the binding results are committed after bind_port returns, they will be seen by all mechanism drivers as update_port_precommit and update_port_postcommit calls. But if some other thread or process concurrently binds or updates the port, these binding results will not be committed, and update_port_precommit and update_port_postcommit will not be called on the mechanism drivers with these results. Because binding results can be discarded rather than committed, drivers should avoid making persistent state changes in bind_port, or else must ensure that such state changes are eventually cleaned up. Implementing this method explicitly declares the mechanism driver as having the intention to bind ports. This is inspected by the QoS service to identify the available QoS rules you can use with ports. """ LOG.debug("Attempting to bind port %(port)s on " "network %(network)s", {'port': port_context.current['id'], 'network': port_context.network.current['id']}) vnic_type = port_context.current.get(portbindings.VNIC_TYPE, portbindings.VNIC_NORMAL) if vnic_type not in self.supported_vnic_types: LOG.debug("Refusing to bind due to unsupported " "vnic_type: %s", vnic_type) return for segment in port_context.segments_to_bind: if self.check_segment(segment, port_context.host): vif_details = {} # TODO(ijw) should be in a library that the agent uses vif_type = self.get_vif_type(port_context) if vif_type == 'vhostuser': vif_details['vhostuser_socket'] = \ os.path.join(cfg.CONF.ml2_vpp.vhost_user_dir, port_context.current['id']) vif_details['vhostuser_mode'] = 'server' LOG.debug('Setting details: %s', vif_details) port_context.set_binding(segment[api.ID], vif_type, vif_details) LOG.debug("Bind selected using segment: %s", segment) return def check_segment(self, segment, host): """Check if segment can be bound. :param segment: segment dictionary describing segment to bind :param host: host on which the segment must be bound to a port :returns: True iff segment can be bound for host """ # TODO(ijw): naive - doesn't check host, or configured # physnets on the host. Should work out if the binding # can't be achieved before accepting it network_type = segment[api.NETWORK_TYPE] if network_type not in self.allowed_network_types: LOG.debug( 'Network %(network_id)s is %(network_type)s, ' 'but this driver only supports types ' '%(allowed_network_types)s. ' 'The type must be supported if binding is to succeed.', {'network_id': segment['id'], 'network_type': network_type, 'allowed_network_types': ', '.join(self.allowed_network_types)} ) return False if network_type in [n_const.TYPE_FLAT, n_const.TYPE_VLAN]: physnet = segment[api.PHYSICAL_NETWORK] if not self.physnet_known(host, physnet): LOG.debug( 'Network %(network_id)s is on physical ' 'network %(physnet)s, but the physical network ' 'is not one the host %(host)s has attached.', {'network_id': segment['id'], 'physnet': physnet, 'host': host} ) return False return True def physnet_known(self, host, physnet): return self.communicator.find_physnet(host, physnet) def check_vlan_transparency(self, port_context): """Check if the network supports vlan transparency. :param port_context: NetworkContext instance describing the network. In general we do not support VLAN transparency (yet). """ return False def update_port_precommit(self, port_context): """Work to do, during DB commit, when updating a port If we are partially responsible for binding this port, we will have to tell our agents they have work to do. This is an operation within a distributed system and can therefore take time to complete, or potentially even fail. Instead, we log the requirement to a journal. """ # TODO(ijw): optimisation: the update port may leave the # binding state the same as before if someone updated # something other than the binding on the port, but this # way we always send it out and it's the far end's job to # ignore it. Doing less work is nevertheless good, so we # should in future avoid the send. # unbind port from old host, if already bound if port_context.original_binding_levels is not None: prev_bind = port_context.original_binding_levels[-1] if (prev_bind is not None and prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME and port_context.host != port_context.original_host): # Note that we skip this step if the change happens while # 'unbinding' and rebinding to the same host - it's probably # an update of extraneous detail and not really a request # that requires binding. self.communicator.unbind(port_context._plugin_context.session, port_context.original, port_context.original_host, prev_bind[api.BOUND_SEGMENT] ) # (Re)bind port to the new host, if it needs to be bound if port_context.binding_levels is not None: current_bind = port_context.binding_levels[-1] if (current_bind is not None and current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME): binding_type = self.get_vif_type(port_context) # Remove port membership from any previously associated # security groups for updating remote_security_group_id ACLs self.communicator.remove_port_from_remote_groups( port_context._plugin_context.session, port_context.original, port_context.current) self.communicator.bind(port_context._plugin_context.session, port_context.current, current_bind[api.BOUND_SEGMENT], port_context.host, binding_type) # TODO(ijW): The agent driver checks for a change of # host, but we're oddly seeing that the orig_host is # always set. Should confirm if this is a problem or # not. self._insert_provisioning_block(port_context) # If this is part of a successful live migration where the # target compute's nova has invoked update_port() binding the # port to the target compute and removing the 'migrating_to' # key from the binding:profile attribute then this is our # chance to remove etcd entries of the old port binding if (port_context.original is not None and port_context.original_host is not None and port_context.original_host != port_context.host and 'migrating_to' not in port_context.current['binding:profile'] and 'migrating_to' in port_context.original['binding:profile'] and port_context.original['binding:profile']['migrating_to'] == port_context.host): self.communicator.unbind( port_context._plugin_context.session, port_context.original, port_context.original_host, current_bind[api.BOUND_SEGMENT] ) def port_bind_complete(self, port_id, host): """Tell things that the port is truly bound. This is a callback called by the etcd communicator. """ LOG.debug('bind complete on %s', port_id) self._release_provisioning_block(host, port_id) def _insert_provisioning_block(self, context): # we insert a status barrier to prevent the port from transitioning # to active until the agent reports back that the wiring is done port = context.current if port['status'] == n_const.PORT_STATUS_ACTIVE: # no point in putting in a block if the status is already ACTIVE return provisioning_blocks.add_provisioning_component( context._plugin_context, port['id'], resources.PORT, provisioning_blocks.L2_AGENT_ENTITY) def _release_provisioning_block(self, host, port_id): context = n_context.get_admin_context() provisioning_blocks.provisioning_complete( context, port_id, resources.PORT, provisioning_blocks.L2_AGENT_ENTITY) def update_port_postcommit(self, port_context): """Work to do, post-DB commit, when updating a port After accepting an update_port, determine if we logged any work to do on the networking devices, and - if so - kick the update thread that tells our minions. """ # TODO(ijw): optimisation: the update port may leave the # binding state the same as before if someone updated # something other than the binding on the port, but this # way we always send it out and it's the far end's job to # ignore it. Doing less work is nevertheless good, so we # should in future avoid the send. if port_context.binding_levels is not None: current_bind = port_context.binding_levels[-1] if port_context.original_binding_levels is None: prev_bind = None else: prev_bind = port_context.original_binding_levels[-1] if (current_bind is not None and current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME): self.communicator.kick() elif (prev_bind is not None and prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME): self.communicator.kick() def delete_port_precommit(self, port_context): port = port_context.current host = port_context.host # NB: Host is typically '' if the port is not bound # A port can be in an invalid state with a host context if host and port_context.binding_levels: segment = port_context.binding_levels[-1][api.BOUND_SEGMENT] self.communicator.unbind(port_context._plugin_context.session, port, host, segment) def delete_port_postcommit(self, port_context): self.communicator.kick() class AgentCommunicator(ABC): @abstractmethod def bind(self, port, segment, host, binding_type): pass @abstractmethod def unbind(self, port, host): pass # Our prefix for etcd keys, in case others are using etcd. LEADIN = nvpp_const.LEADIN # TODO(ijw): make configurable? # Model for representing a security group SecurityGroup = namedtuple( 'SecurityGroup', ['id', 'ingress_rules', 'egress_rules'] ) # Model for a VPP security group rule SecurityGroupRule = namedtuple( 'SecurityGroupRule', ['is_ipv6', 'remote_ip_addr', 'ip_prefix_len', 'remote_group_id', 'protocol', 'port_min', 'port_max'] ) class JournalManager(object): """Perform journal management tasks. Handles the post-journal part of the work that clears out the table and updates etcd. Also kicks the forward worker threads that is active and tells it that it has work to do. """ def __init__(self): self.client_factory = etcdutils.EtcdClientFactory(cfg.CONF.ml2_vpp) self.election_key_space = LEADIN + '/election' self.journal_kick_key = self.election_key_space + '/kick-journal' LOG.debug('Journal manager init complete') def kick(self): # A thread in one Neutron process - possibly not this one - # is waiting to send updates from the DB to etcd. Wake it. try: # TODO(ijw): got to be a more efficient way to create # a client for each thread # From a Neutron thread, we need to tell whichever of the # forwarder threads that is active that it has work to do. self.client_factory.client().write(self.journal_kick_key, '') except etcd.EtcdException: # A failed wake is not the end of the world. pass ###################################################################### # The post-journal part of the work that clears out the table and # updates etcd. def make_forward_worker(self): # Assign a UUID to each worker thread to enable thread election return eventlet.spawn(self._forward_worker) def do_etcd_update(self, etcd_writer, k, v): with eventlet.Timeout(cfg.CONF.ml2_vpp.etcd_write_time, False): if v is None: etcd_writer.delete(k) else: etcd_writer.write(k, v) def _forward_worker(self): LOG.debug('forward worker begun') etcd_client = self.client_factory.client() etcd_writer = etcdutils.json_writer(etcd_client) lease_time = cfg.CONF.ml2_vpp.forward_worker_master_lease_time recovery_time = cfg.CONF.ml2_vpp.forward_worker_recovery_time etcd_election = etcdutils.EtcdElection(etcd_client, 'forward_worker', self.election_key_space, work_time=lease_time, recovery_time=recovery_time) while True: # Try indefinitely to regain the mastery of this thread pool. Most # threads will be sitting here etcd_election.wait_until_elected() try: # Master loop - as long as we are master and can # maintain it, process incoming events. # Every long running section is preceded by extending # mastership of the thread pool and followed by # confirmation that we still have mastership (usually # by a further extension). def work(k, v): self.do_etcd_update(etcd_writer, k, v) # We will try to empty the pending rows in the DB while True: etcd_election.extend_election( cfg.CONF.ml2_vpp.db_query_time) session = n_context.get_admin_context().session maybe_more = db.journal_read(session, work) if not maybe_more: LOG.debug('forward worker has emptied journal') etcd_election.extend_election(lease_time) break # work queue is now empty. # Wait to be kicked, or (in case of emergency) run # every few seconds in case another thread or process # dumped work and failed to get notification to us to # process it. with eventlet.Timeout(lease_time + 1, False): etcd_election.extend_election(lease_time) try: etcd_client.watch(self.journal_kick_key, timeout=lease_time) except etcd.EtcdException: # Check the DB queue now, anyway pass except etcdutils.EtcdElectionLost: # We are no longer master pass except Exception as e: # TODO(ijw): log exception properly LOG.warning("problems in forward worker - Error name is %s. " "proceeding without quiting", type(e).__name__) LOG.warning("Exception in forward_worker: %s", e) # something went bad; breathe, in case we end # up in a tight loop time.sleep(1) # never quit pass ###################################################################### class EtcdAgentCommunicator(AgentCommunicator, JournalManager): """Comms unit for etcd This will talk to etcd and tell it what is going on with the Neutron database. etcd can be run as a cluster and so shouldn't die, but can be unreachable, so this class's job is to ensure that all updates are forwarded to etcd in order even when things are not quite going as planned. In etcd, the layout is: # Port Space LEADIN/nodes - subdirs are compute nodes LEADIN/nodes/X/ports - entries are JSON-ness containing all information on each bound port on the compute node. (Unbound ports are homeless, so the act of unbinding is the deletion of this entry.) # Global Space LEADIN/global/secgroups/<security-group-id> - entries are security-group keys whose values contain all the rule data LEADIN/global/networks/gpe/<vni>/<hostname>/<mac>/<ip-address> - Entries contain GPE data such as the VNI, hostname, instance's mac & IP address and key value is the underlay IP address of the compute node LEADIN/global/remote_group/<group-id>/<port-id> contain the IP addresses of ports in a security-group # State Space LEADIN/state/X - return state of the VPP LEADIN/state/X/alive - heartbeat back LEADIN/state/X/ports - port information. LEADIN/state/X/physnets - physnets on node Specifically a key here (regardless of value) indicates the port has been bound and is receiving traffic. """ def __init__(self, notify_bound): super(EtcdAgentCommunicator, self).__init__() LOG.debug("Using etcd host:%s port:%s user:%s", cfg.CONF.ml2_vpp.etcd_host, cfg.CONF.ml2_vpp.etcd_port, cfg.CONF.ml2_vpp.etcd_user) # This is a function that is called when a port has been # notified from the agent via etcd as completely attached. # We call this when we're certain that the VPP on the far end # has definitely bound the port, and has dropped a vhost-user # socket where it can be found. # This is more important than it seems, becaus libvirt will # hang, because qemu ignores its monitor port, when qemu is # waiting for a partner to connect with on its vhost-user # interfaces. It can't start the VM - that requires # information from its partner it can't guess at - but it # shouldn't hang the monitor - nevertheless... So we notify # when the port is there and ready, and qemu is never put into # this state by Nova. self.notify_bound = notify_bound # We need certain directories to exist self.state_key_space = LEADIN + '/state' self.port_key_space = LEADIN + '/nodes' self.secgroup_key_space = LEADIN + '/global/secgroups' self.remote_group_key_space = LEADIN + '/global/remote_group' self.gpe_key_space = LEADIN + '/global/networks/gpe' etcd_client = self.client_factory.client() etcd_helper = etcdutils.EtcdHelper(etcd_client) etcd_helper.ensure_dir(self.state_key_space) etcd_helper.ensure_dir(self.port_key_space) etcd_helper.ensure_dir(self.secgroup_key_space) etcd_helper.ensure_dir(self.election_key_space) etcd_helper.ensure_dir(self.remote_group_key_space) self.secgroup_enabled = cfg.CONF.SECURITYGROUP.enable_security_group if self.secgroup_enabled: self.register_secgroup_event_handler() # TODO(ijw): .../state/<host> lists all known hosts, and they # heartbeat when they're functioning # From this point on, there are multiple threads: ensure that # we don't re-use the etcd_client from multiple threads # simultaneously etcd_helper = None etcd_client = None registry.subscribe(self.start_threads, resources.PROCESS, events.AFTER_SPAWN) def start_threads(self, resource, event, trigger, payload=None): LOG.debug('Starting background threads for Neutron worker') self.return_thread = self.make_return_worker() self.forward_thread = self.make_forward_worker() def find_physnet(self, host, physnet): """Identify if an agent can connect to the physical network This can fail if the agent hasn't been configured for that physnet, or if the agent isn't alive. """ # TODO(ijw): we use an on-the-fly created client so that we # don't have threading problems from the caller. try: etcd_client = self.client_factory.client() etcd_client.read('%s/state/%s/alive' % (LEADIN, host)) etcd_client.read('%s/state/%s/physnets/%s' % (LEADIN, host, physnet)) except etcd.EtcdKeyNotFound: return False return True def register_secgroup_event_handler(self): """Subscribe a handler to process secgroup change notifications We're interested in PRECOMMIT_xxx (where we store the changes to etcd in the journal table) and AFTER_xxx (where we remind the worker thread it may have work to do). """ LOG.info("Security groups feature is enabled") # NB security group rules cannot be updated, and security # groups themselves have no forwarder state in them, so we # don't need the update events # register pre-commit events # security group precommit events registry.subscribe(self.process_secgroup_commit, resources.SECURITY_GROUP, events.PRECOMMIT_CREATE) registry.subscribe(self.process_secgroup_commit, resources.SECURITY_GROUP, events.PRECOMMIT_DELETE) # security group rule precommit events registry.subscribe(self.process_secgroup_commit, resources.SECURITY_GROUP_RULE, events.PRECOMMIT_CREATE) registry.subscribe(self.process_secgroup_commit, resources.SECURITY_GROUP_RULE, events.PRECOMMIT_DELETE) # register post-commit events # security group post commit events registry.subscribe(self.process_secgroup_after, resources.SECURITY_GROUP, events.AFTER_CREATE) registry.subscribe(self.process_secgroup_after, resources.SECURITY_GROUP, events.AFTER_DELETE) # security group rule post commit events registry.subscribe(self.process_secgroup_after, resources.SECURITY_GROUP_RULE, events.AFTER_CREATE) registry.subscribe(self.process_secgroup_after, resources.SECURITY_GROUP_RULE, events.AFTER_DELETE) def process_secgroup_after(self, resource, event, trigger, **kwargs): """Callback for handling security group/rule commit-complete events This is when we should tell other things that a change has happened and has been recorded permanently in the DB. """ # Whatever the object that caused this, we've put something # in the journal and now need to nudge the communicator self.kick() def process_secgroup_commit(self, resource, event, trigger, **kwargs): """Callback for handling security group/rule commit events This is the time at which we should be committing any of our own auxiliary changes to the DB. """ LOG.debug("Received event %s notification for resource" " %s with kwargs %s", event, resource, kwargs) context = kwargs['context'] sgid = None deleted_rule_id = None added_rule = {} if resource == resources.SECURITY_GROUP: if event == events.PRECOMMIT_DELETE: self.delete_secgroup_from_etcd(context.session, kwargs['security_group_id']) # Job done. return # All other event types drop out to the update-the-key # code at the bottom of the function. elif event == events.PRECOMMIT_CREATE: # Also, the SG passed to us is what comes in from the user. We # require what went into the DB (where we added a UUID to it), # so we have to get that from the record of changed and as yet # uncommitted objects. sgid = kwargs[resource]['id'] # The default rules are supplied directly in the context rules = kwargs[resource]['security_group_rules'] elif resource == resources.SECURITY_GROUP_RULE: # We store security groups with a composite of all their # rules. So in this case we track down the affected # rule and update its entire data. # NB: rules are never updated. if event == events.PRECOMMIT_DELETE: deleted_rule_id = kwargs['security_group_rule_id'] sgid = kwargs['security_group_id'] elif event == events.PRECOMMIT_CREATE: # Groups don't have the same UUID problem - we're not # using their UUID, we're using their SG's, which must # be present. added_rule = kwargs['security_group_rule'] sgid = added_rule['security_group_id'] # Start by getting the rules the DB knows about rules = self.get_rules_for_secgroup(sgid, context) # Adding a group or changing rules results in the same behaviour: # rewrite the key in etcd. # If we're in the precommit part, we may have deleted rules in this # list and we should exclude them. We also, cautiously, exclude any # added rule as well, though it should not have made it into the DB. # If either is unset it's None, and != None is not a test that will # pass. rules = [r for r in rules if r['id'] != deleted_rule_id and r['id'] != added_rule.get('id')] if added_rule: rules.append(added_rule) # Get the full details of the secgroup in etcd-exchange format secgroup = self.get_secgroup_from_rules(sgid, rules) # Write security group data to etcd self.send_secgroup_to_agents(context.session, secgroup) def get_rules_for_secgroup(self, sgid, context): plugin = directory.get_plugin() rules = plugin.get_security_group_rules( context, filters={'security_group_id': [sgid]} ) return rules def get_secgroup_rule(self, rule_id, context): """Fetch and return a security group rule from Neutron DB""" plugin = directory.get_plugin() return plugin.get_security_group_rule(context, rule_id) def get_secgroup_from_rules(self, sgid, rules): """Build and return a security group namedtuple object. This object is the format with which we exchange data with the agents, and can be written in this form to etcd. Arguments: sgid - ID of the security group rules - A list of security group rules as returned from the DB 1. Filter rules using the input param: sgid to ensure that rules belong to that group 2. Split rules based on direction. 3. Construct and return the SecurityGroup namedtuple. """ # A generator object of security group rules for sgid sg_rules = (r for r in rules if r['security_group_id'] == sgid) # A list of ingress and egress namedtuple rule objects ingress_rules = [] egress_rules = [] for r in sg_rules: if r['direction'] == 'ingress': ingress_rules.append(self._neutron_rule_to_vpp_acl(r)) else: egress_rules.append(self._neutron_rule_to_vpp_acl(r)) return SecurityGroup(sgid, ingress_rules, egress_rules) # Neutron supports the following IP protocols by name protocols = {'tcp': 6, 'udp': 17, 'icmp': 1, 'icmpv6': 58, 'ah': 51, 'dccp': 33, 'egp': 8, 'esp': 50, 'gre': 47, 'igmp': 2, 'ipv6-encap': 41, 'ipv6-frag': 44, 'ipv6-icmp': 58, 'ipv6-nonxt': 59, 'ipv6-opts': 60, 'ipv6-route': 43, 'ospf': 89, 'pgm': 113, 'rsvp': 46, 'sctp': 132, 'udplite': 136, 'vrrp': 112} def _neutron_rule_to_vpp_acl(self, rule): """Convert a neutron rule to vpp_acl rule. Arguments: 1. rule -- represents a neutron rule - Convert the neutron rule to a vpp_acl rule model - Return the SecurityGroupRule namedtuple. """ is_ipv6 = 0 if rule['ethertype'] == 'IPv4' else 1 if rule['protocol'] is None: # Neutron uses None to represent any protocol # We use 0 to represent any protocol protocol = 0 elif rule['protocol'] in self.protocols: # VPP rules require IANA protocol numbers # Convert input accordingly. protocol = self.protocols[rule['protocol']] else: # Convert incoming string value to an integer protocol = int(rule['protocol']) if is_ipv6 and protocol == self.protocols['icmp']: protocol = self.protocols['icmpv6'] # Neutron represents any ip address by setting # both the remote_ip_prefix and remote_group_id fields to None # VPP uses all zeros to represent any Ipv4/IpV6 address # In a neutron security group rule, you can either set the # remote_ip_prefix or remote_group_id but not both. # When a remote_ip_prefix value is set, the remote_group_id # is ignored and vice versa. If both the attributes are unset, # any remote_ip_address is permitted. if rule['remote_ip_prefix']: remote_ip_addr, ip_prefix_len = rule['remote_ip_prefix' ].split('/') ip_prefix_len = int(ip_prefix_len) # Set the required attribute, referenced by the SecurityGroupRule # tuple, remote_group_id to None. remote_group_id = None elif rule['remote_group_id']: remote_group_id = rule['remote_group_id'] # Set remote_ip_addr and ip_prefix_len to empty values # as it is a required attribute referenced by the # SecurityGroupRule tuple. When the remote_ip_addr value is set # to None, the vpp-agent ignores it and looks at the # remote-group-id. One of these attributes must be set to a valid # value. remote_ip_addr, ip_prefix_len = None, 0 else: # In neutron, when both the remote_ip_prefix and remote-group-id # are set to None, it implies permit any. But we need to set a # valid value for the remote_ip_address attribute to tell the # vpp-agent. remote_ip_addr = '0.0.0.0' if not is_ipv6 else '::' ip_prefix_len = 0 # Set the required attribute in the SecurityGroupRule tuple # remote_group_id to None. remote_group_id = None # Neutron uses -1 or None or 0 to represent all ports # VPP uses 0-65535 for all tcp/udp ports, Use -1 to represent all # ranges for ICMP types and codes if rule['port_range_min'] == -1 or not rule['port_range_min']: # Valid TCP/UDP port ranges for TCP(6), UDP(17) and UDPLite(136) if protocol in [6, 17, 136]: port_min, port_max = (0, 65535) # A Value of -1 represents all ICMP/ICMPv6 types & code ranges elif protocol in [1, 58]: port_min, port_max = (-1, -1) # Ignore port_min and port_max fields as other protocols don't # use them else: port_min, port_max = (0, 0) else: port_min, port_max = (rule['port_range_min'], rule['port_range_max']) # Handle a couple of special ICMP cases if protocol in [1, 58]: # All ICMP types and codes if rule['port_range_min'] is None: port_min, port_max = (-1, -1) # All codes for a specific type elif rule['port_range_max'] is None: port_min, port_max = (rule['port_range_min'], -1) sg_rule = SecurityGroupRule(is_ipv6, remote_ip_addr, ip_prefix_len, remote_group_id, protocol, port_min, port_max) return sg_rule def send_secgroup_to_agents(self, session, secgroup): """Writes a secgroup to the etcd secgroup space Does this via the journal as part of the commit, so that the write is atomic with the DB commit to the Neutron tables. Arguments: session -- the DB session with an open transaction secgroup -- Named tuple representing a SecurityGroup """ secgroup_path = self._secgroup_path(secgroup.id) # sg is a dict of of ingress and egress rule lists sg = {} ingress_rules = [] egress_rules = [] for ingress_rule in secgroup.ingress_rules: ingress_rules.append(ingress_rule._asdict()) for egress_rule in secgroup.egress_rules: egress_rules.append(egress_rule._asdict()) sg['ingress_rules'] = ingress_rules sg['egress_rules'] = egress_rules db.journal_write(session, secgroup_path, sg) def delete_secgroup_from_etcd(self, session, secgroup_id): """Deletes the secgroup key from etcd Arguments: secgroup_id -- The id of the security group that we want to delete """ secgroup_path = self._secgroup_path(secgroup_id) # Delete the security-group from remote-groups remote_group_path = self._remote_group_path(secgroup_id, '') db.journal_write(session, secgroup_path, None) db.journal_write(session, remote_group_path, None) def _secgroup_path(self, secgroup_id): return self.secgroup_key_space + "/" + secgroup_id def _port_path(self, host, port): return self.port_key_space + "/" + host + "/ports/" + port['id'] # TODO(najoy): Move all security groups related code to a dedicated # module def _remote_group_path(self, secgroup_id, port_id): return self.remote_group_key_space + "/" + secgroup_id + "/" + port_id def _gpe_remote_path(self, host, port, segmentation_id): ip_addrs = port.get('fixed_ips', []) gpe_dir = self.gpe_key_space + "/" + str(segmentation_id) + "/" + \ host + "/" + port['mac_address'] if ip_addrs and segmentation_id: # Delete all GPE keys and the empty GPE directory itself in etcd return [gpe_dir + "/" + ip_address['ip_address'] for ip_address in ip_addrs] + [gpe_dir] else: return [] # A remote_group_path is qualified with a port ID because the agent uses # the port ID to keep track of the dynamic set of ports associated with # the remote_group_id. The value of this key is the # list of IP addresses allocated to that port by neutron. # Using the above two pieces of information, the vpp-agent # computes the complete set of IP addresses belonging to a # remote_group_id and uses it to expand the rule when a remote_group_id # attribute is specified. The expansion is performed by computing a # product using all the IP addresses of the ports in the remote_group # and the remaining attributes of the rule. def _remote_group_paths(self, port): security_groups = port.get('security_groups', []) return [self._remote_group_path(secgroup_id, port['id']) for secgroup_id in security_groups] ###################################################################### # These functions use a DB journal to log messages before # they're put into etcd, as that can take time and may not always # succeed (so is a bad candidate for calling within or after a # transaction). def bind(self, session, port, segment, host, binding_type): # NB segmentation_id is not optional in the wireline protocol, # we just pass 0 for unsegmented network types data = { 'mac_address': port['mac_address'], 'mtu': 1500, # not this, but what?: port['mtu'], 'physnet': segment[api.PHYSICAL_NETWORK], 'network_type': segment[api.NETWORK_TYPE], 'segmentation_id': segment.get(api.SEGMENTATION_ID, 0), 'binding_type': binding_type, 'security_groups': port.get('security_groups', []), 'allowed_address_pairs': port.get('allowed_address_pairs', []), 'fixed_ips': port.get('fixed_ips', []), 'port_security_enabled': port.get('port_security_enabled', True), # Non-essential, but useful for monitoring and debug: 'device_id': port.get('device_id', None) } LOG.debug("Queueing bind request for port:%s, " "segment:%s, host:%s, type:%s", port, data['segmentation_id'], host, data['binding_type']) db.journal_write(session, self._port_path(host, port), data) # For tracking ports in a remote_group, create a journal entry in # the remote-group key-space. # This will result in the creation of an etcd key with the port ID # and it's security-group-id. The value is the list of IP addresses. # For details on how the agent thread handles the remote-group # watch events refer to the doc under RemoteGroupWatcher in the # server module. for remote_group_path in self._remote_group_paths(port): db.journal_write(session, remote_group_path, [item['ip_address'] for item in data['fixed_ips']]) self.kick() def unbind(self, session, port, host, segment): # GPE requires segmentation ID for removing its etcd keys segmentation_id = segment.get(api.SEGMENTATION_ID, 0) LOG.debug("Queueing unbind request for port:%s, host:%s, segment:%s", port, host, segmentation_id) # When a port is unbound, this journal entry will delete the # port key (and hence it's ip address value) from etcd. The behavior # is like removing the port IP address(es) from the # remote-group. The agent will receive a watch notification and # update the ACL rules to remove the IP(s) from the rule. # Other port IP addresses associated with the remote-group-id will # remain in the rule (as it should be). # A hypothetical alternate implementation - If we made the remote key # of a secgroup, a list of IPs, we could refresh the content of # every secgroup containing this port. # It makes the sender's work harder, and receiver's work easier # In the sender, we need keep track of IPs during port binds, unbinds, # security-group associations and updates. However, algorithmically # this alternate impl. won't be better what we have now i.e. O(n). for remote_group_path in self._remote_group_paths(port): db.journal_write(session, remote_group_path, None) db.journal_write(session, self._port_path(host, port), None) # Remove all GPE remote keys from etcd, for this port for gpe_remote_path in self._gpe_remote_path(host, port, segmentation_id): db.journal_write(session, gpe_remote_path, None) self.kick() def remove_port_from_remote_groups(self, session, original_port, current_port): """Remove ports from remote groups when port security is updated.""" removed_sec_groups = set(original_port['security_groups']) - set( current_port['security_groups']) for secgroup_id in removed_sec_groups: db.journal_write(session, self._remote_group_path(secgroup_id, current_port['id']), None) self.kick() def make_return_worker(self): """The thread that manages data returned from agents via etcd.""" # TODO(ijw): agents and physnets should be checked before a bind # is accepted # Note that the initial load is done before spawning the background # watcher - this means that we're prepared with the information # to accept bind requests. class ReturnWatcher(etcdutils.EtcdChangeWatcher): def __init__(self, etcd_client, name, watch_path, election_path=None, data=None): super(ReturnWatcher, self).__init__(etcd_client, name, watch_path, election_path, wait_until_elected=True, data=data) # Every key changes on a restart, which has the # useful effect of resending all Nova notifications # for 'port bound' events based on existing state. def added(self, key, value): # Matches a port key, gets host and uuid m = re.match('^([^/]+)/ports/([^/]+)$', key) if m: host = m.group(1) port = m.group(2) self.data.notify_bound(port, host) else: # Matches an agent, gets a liveness notification m = re.match(self.data.state_key_space + '^([^/]+)/alive$', key) if m: # TODO(ijw): this should be fed into the agents # table. host = m.group(1) LOG.info('host %s is alive', host) def removed(self, key): # Nova doesn't much care when ports go away. # Matches an agent, gets a liveness notification m = re.match(self.data.state_key_space + '^([^/]+)/alive$', key) if m: # TODO(ijw): this should be fed into the agents # table. host = m.group(1) LOG.info('host %s has died', host) # Assign a UUID to each worker thread to enable thread election return eventlet.spawn( ReturnWatcher(self.client_factory.client(), 'return_worker', self.state_key_space, self.election_key_space, data=self).watch_forever)