# Copyright (c) 2016 Cisco Systems, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from abc import ABC, abstractmethod
from collections import namedtuple
import etcd
import eventlet
import eventlet.event
import os
from oslo_config import cfg
from oslo_log import log as logging
import re
import time

from networking_vpp import config_opts
from networking_vpp import constants as nvpp_const
from networking_vpp.db import db
from networking_vpp import etcdutils
from networking_vpp.ext_manager import ExtensionManager
from networking_vpp.extension import MechDriverExtensionBase

from neutron_lib.api.definitions import portbindings
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
import neutron_lib.constants as n_const
from neutron_lib import context as n_context
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api

import neutron.conf.agent.securitygroups_rpc

from neutron.db import provisioning_blocks


LOG = logging.getLogger(__name__)


class VPPMechanismDriver(api.MechanismDriver):
    supported_vnic_types = [portbindings.VNIC_NORMAL]

    MECH_NAME = 'vpp'

    def initialize(self):
        config_opts.register_vpp_opts(cfg.CONF)
        neutron.conf.agent.securitygroups_rpc.register_securitygroups_opts(
            cfg.CONF)
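
        # An illustrative (example-only) ml2_vpp section of ml2_conf.ini;
        # the authoritative option definitions live in
        # networking_vpp.config_opts:
        #
        #   [ml2_vpp]
        #   network_types = vlan,flat
        #   etcd_host = 192.0.2.10
        #   etcd_port = 2379
        #   vhost_user_dir = /tmp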

        self.allowed_network_types = cfg.CONF.ml2_vpp.network_types

        self.communicator = EtcdAgentCommunicator(self.port_bind_complete)

        names = cfg.CONF.ml2_vpp.driver_extensions
        if names != '':
            self.mgr = ExtensionManager(
                'networking_vpp.driver.extensions',
                names,
                MechDriverExtensionBase)
            self.mgr.call_all('run', self.communicator)

    def get_vif_type(self, port_context):
        """Determine the type of the vif to be bound from port context"""

        # Default vif type
        vif_type = 'vhostuser'

        # We have to explicitly avoid binding agent ports - DHCP,
        # L3 etc. - as vhostuser. The interface code below takes
        # care of those.
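        # For example (illustrative): a DHCP port, whose device_owner is
        # 'network:dhcp', matches a DEVICE_OWNER_PREFIXES entry and is
        # bound as 'tap', while an ordinary instance port ('compute:nova')
        # does not match and keeps the default 'vhostuser'.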

        owner = port_context.current['device_owner']
        for f in n_const.DEVICE_OWNER_PREFIXES:
            if owner.startswith(f):
                vif_type = 'tap'
        LOG.debug("vif_type to be bound is: %s", vif_type)
        return vif_type

    def bind_port(self, port_context):
        """Attempt to bind a port.

        :param port_context: PortContext instance describing the port

        This method is called outside any transaction to attempt to
        establish a port binding using this mechanism driver. Bindings
        may be created at each of multiple levels of a hierarchical
        network, and are established from the top level downward. At
        each level, the mechanism driver determines whether it can
        bind to any of the network segments in the
        port_context.segments_to_bind property, based on the value of the
        port_context.host property, any relevant port or network
        attributes, and its own knowledge of the network topology. At
        the top level, port_context.segments_to_bind contains the static
        segments of the port's network. At each lower level of
        binding, it contains static or dynamic segments supplied by
        the driver that bound at the level above. If the driver is
        able to complete the binding of the port to any segment in
        port_context.segments_to_bind, it must call port_context.set_binding
        with the binding details. If it can partially bind the port,
        it must call port_context.continue_binding with the network
        segments to be used to bind at the next lower level.

        If the binding results are committed after bind_port returns,
        they will be seen by all mechanism drivers as
        update_port_precommit and update_port_postcommit calls. But if
        some other thread or process concurrently binds or updates the
        port, these binding results will not be committed, and
        update_port_precommit and update_port_postcommit will not be
        called on the mechanism drivers with these results. Because
        binding results can be discarded rather than committed,
        drivers should avoid making persistent state changes in
        bind_port, or else must ensure that such state changes are
        eventually cleaned up.

        Implementing this method explicitly declares the mechanism
        driver as having the intention to bind ports. This is inspected
        by the QoS service to identify the available QoS rules you
        can use with ports.
        """
        LOG.debug("Attempting to bind port %(port)s on "
                  "network %(network)s",
                  {'port': port_context.current['id'],
                   'network': port_context.network.current['id']})
        vnic_type = port_context.current.get(portbindings.VNIC_TYPE,
                                             portbindings.VNIC_NORMAL)
        if vnic_type not in self.supported_vnic_types:
            LOG.debug("Refusing to bind due to unsupported "
                      "vnic_type: %s",
                      vnic_type)
            return

        for segment in port_context.segments_to_bind:
            if self.check_segment(segment, port_context.host):
                vif_details = {}
                # TODO(ijw) should be in a library that the agent uses
                vif_type = self.get_vif_type(port_context)
                if vif_type == 'vhostuser':
                    vif_details['vhostuser_socket'] = \
                        os.path.join(cfg.CONF.ml2_vpp.vhost_user_dir,
                                     port_context.current['id'])
                    vif_details['vhostuser_mode'] = 'server'
                LOG.debug('Setting details: %s', vif_details)
                port_context.set_binding(segment[api.ID],
                                         vif_type,
                                         vif_details)
                LOG.debug("Bind selected using segment: %s", segment)
                return

    def check_segment(self, segment, host):
        """Check if segment can be bound.

        :param segment: segment dictionary describing segment to bind
        :param host: host on which the segment must be bound to a port
        :returns: True iff segment can be bound for host
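
        An illustrative segment dictionary (values hypothetical):

            {'id': '<segment-uuid>', 'network_type': 'vlan',
             'physical_network': 'physnet1', 'segmentation_id': 100}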

        """

        # TODO(ijw): naive - doesn't check host, or configured
        # physnets on the host.  Should work out if the binding
        # can't be achieved before accepting it

        network_type = segment[api.NETWORK_TYPE]
        if network_type not in self.allowed_network_types:
            LOG.debug(
                'Network %(network_id)s is %(network_type)s, '
                'but this driver only supports types '
                '%(allowed_network_types)s. '
                'The type must be supported if binding is to succeed.',
                {'network_id': segment['id'],
                 'network_type': network_type,
                 'allowed_network_types':
                 ', '.join(self.allowed_network_types)}
            )
            return False

        if network_type in [n_const.TYPE_FLAT,
                            n_const.TYPE_VLAN]:
            physnet = segment[api.PHYSICAL_NETWORK]
            if not self.physnet_known(host, physnet):
                LOG.debug(
                    'Network %(network_id)s is on physical '
                    'network %(physnet)s, but the physical network '
                    'is not one that the host %(host)s has attached.',
                    {'network_id': segment['id'],
                     'physnet': physnet,
                     'host': host}
                )
                return False

        return True

    def physnet_known(self, host, physnet):
        return self.communicator.find_physnet(host, physnet)

    def check_vlan_transparency(self, port_context):
        """Check if the network supports vlan transparency.

        :param port_context: NetworkContext instance describing the network.

        In general we do not support VLAN transparency (yet).
        """
        return False

    def update_port_precommit(self, port_context):
        """Work to do, during DB commit, when updating a port

        If we are partially responsible for binding this port, we will
        have to tell our agents they have work to do.  This is an
        operation within a distributed system and can therefore take
        time to complete, or potentially even fail.  Instead, we log
        the requirement to a journal.
        """
        # TODO(ijw): optimisation: the update port may leave the
        # binding state the same as before if someone updated
        # something other than the binding on the port, but this
        # way we always send it out and it's the far end's job to
        # ignore it.  Doing less work is nevertheless good, so we
        # should in future avoid the send.

        # unbind port from old host, if already bound
        if port_context.original_binding_levels is not None:
            prev_bind = port_context.original_binding_levels[-1]

            if (prev_bind is not None and
                    prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME and
                    port_context.host != port_context.original_host):

                # Note that we skip this step if the change happens while
                # 'unbinding' and rebinding to the same host - it's probably
                # an update of extraneous detail and not really a request
                # that requires binding.

                self.communicator.unbind(port_context._plugin_context.session,
                                         port_context.original,
                                         port_context.original_host,
                                         prev_bind[api.BOUND_SEGMENT]
                                         )

        # (Re)bind port to the new host, if it needs to be bound
        if port_context.binding_levels is not None:
            current_bind = port_context.binding_levels[-1]

            if (current_bind is not None and
                    current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):

                binding_type = self.get_vif_type(port_context)
                # Remove port membership from any previously associated
                # security groups for updating remote_security_group_id ACLs
                self.communicator.remove_port_from_remote_groups(
                    port_context._plugin_context.session,
                    port_context.original,
                    port_context.current)

                self.communicator.bind(port_context._plugin_context.session,
                                       port_context.current,
                                       current_bind[api.BOUND_SEGMENT],
                                       port_context.host,
                                       binding_type)

                # TODO(ijw): The agent driver checks for a change of
                # host, but we're oddly seeing that the orig_host is
                # always set.  Should confirm if this is a problem or
                # not.
                self._insert_provisioning_block(port_context)

                # If this is part of a successful live migration where the
                # target compute's nova has invoked update_port() binding the
                # port to the target compute and removing the 'migrating_to'
                # key from the binding:profile attribute then this is our
                # chance to remove etcd entries of the old port binding
                if (port_context.original is not None and
                    port_context.original_host is not None and
                        port_context.original_host != port_context.host and
                    'migrating_to' not in
                        port_context.current['binding:profile'] and
                    'migrating_to' in
                        port_context.original['binding:profile'] and
                    port_context.original['binding:profile']['migrating_to']
                        == port_context.host):
                    self.communicator.unbind(
                        port_context._plugin_context.session,
                        port_context.original,
                        port_context.original_host,
                        current_bind[api.BOUND_SEGMENT]
                    )

    def port_bind_complete(self, port_id, host):
        """Tell things that the port is truly bound.

        This is a callback called by the etcd communicator.

        """
        LOG.debug('bind complete on %s', port_id)
        self._release_provisioning_block(host, port_id)

    def _insert_provisioning_block(self, context):

        # we insert a status barrier to prevent the port from transitioning
        # to active until the agent reports back that the wiring is done
        port = context.current
        if port['status'] == n_const.PORT_STATUS_ACTIVE:
            # no point in putting in a block if the status is already ACTIVE
            return

        provisioning_blocks.add_provisioning_component(
            context._plugin_context, port['id'], resources.PORT,
            provisioning_blocks.L2_AGENT_ENTITY)

    def _release_provisioning_block(self, host, port_id):
        context = n_context.get_admin_context()

        provisioning_blocks.provisioning_complete(
            context, port_id, resources.PORT,
            provisioning_blocks.L2_AGENT_ENTITY)

    def update_port_postcommit(self, port_context):
        """Work to do, post-DB commit, when updating a port

        After accepting an update_port, determine if we logged any
        work to do on the networking devices, and - if so - kick the
        update thread that tells our minions.

        """
        # TODO(ijw): optimisation: the update port may leave the
        # binding state the same as before if someone updated
        # something other than the binding on the port, but this
        # way we always send it out and it's the far end's job to
        # ignore it.  Doing less work is nevertheless good, so we
        # should in future avoid the send.

        if port_context.binding_levels is not None:
            current_bind = port_context.binding_levels[-1]
            if port_context.original_binding_levels is None:
                prev_bind = None
            else:
                prev_bind = port_context.original_binding_levels[-1]

            if (current_bind is not None and
               current_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
                self.communicator.kick()
            elif (prev_bind is not None and
                  prev_bind.get(api.BOUND_DRIVER) == self.MECH_NAME):
                self.communicator.kick()

    def delete_port_precommit(self, port_context):
        port = port_context.current
        host = port_context.host
        # NB: Host is typically '' if the port is not bound
        # A port can be in an invalid state with a host context
        if host and port_context.binding_levels:
            segment = port_context.binding_levels[-1][api.BOUND_SEGMENT]
            self.communicator.unbind(port_context._plugin_context.session,
                                     port, host, segment)

    def delete_port_postcommit(self, port_context):
        self.communicator.kick()


class AgentCommunicator(ABC):

    @abstractmethod
    def bind(self, session, port, segment, host, binding_type):
        pass

    @abstractmethod
    def unbind(self, session, port, host, segment):
        pass


# Our prefix for etcd keys, in case others are using etcd.
LEADIN = nvpp_const.LEADIN   # TODO(ijw): make configurable?
# Model for representing a security group
SecurityGroup = namedtuple(
    'SecurityGroup', ['id', 'ingress_rules', 'egress_rules']
    )
# Model for a VPP security group rule
SecurityGroupRule = namedtuple(
    'SecurityGroupRule', ['is_ipv6', 'remote_ip_addr',
                          'ip_prefix_len', 'remote_group_id',
                          'protocol', 'port_min', 'port_max']
    )


class JournalManager(object):
    """Perform journal management tasks.

    Handles the post-journal part of the work that clears out the table and
    updates etcd.  Also kicks whichever forward worker thread is active to
    tell it that it has work to do.
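
    In outline (illustrative), the flow through the journal is:

        db.journal_write(session, key, value)  # during the Neutron DB commit
        kick()                                 # after the commit returns
        # ... an elected _forward_worker then drains the journal:
        #     db.journal_read(session, work) -> work(k, v) writes k/v to etcd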
    """
    def __init__(self):

        self.client_factory = etcdutils.EtcdClientFactory(cfg.CONF.ml2_vpp)

        self.election_key_space = LEADIN + '/election'
        self.journal_kick_key = self.election_key_space + '/kick-journal'
        LOG.debug('Journal manager init complete')

    def kick(self):
        # A thread in one Neutron process - possibly not this one -
        # is waiting to send updates from the DB to etcd.  Wake it.
        try:
            # TODO(ijw): got to be a more efficient way to create
            # a client for each thread
            # From a Neutron thread, we need to tell whichever of the
            # forwarder threads that is active that it has work to do.
            self.client_factory.client().write(self.journal_kick_key, '')
        except etcd.EtcdException:
            # A failed wake is not the end of the world.
            pass

    ######################################################################
    # The post-journal part of the work that clears out the table and
    # updates etcd.

    def make_forward_worker(self):
        # Assign a UUID to each worker thread to enable thread election
        return eventlet.spawn(self._forward_worker)

    def do_etcd_update(self, etcd_writer, k, v):
        with eventlet.Timeout(cfg.CONF.ml2_vpp.etcd_write_time, False):
            if v is None:
                etcd_writer.delete(k)
            else:
                etcd_writer.write(k, v)

    def _forward_worker(self):
        LOG.debug('forward worker begun')
        etcd_client = self.client_factory.client()
        etcd_writer = etcdutils.json_writer(etcd_client)
        lease_time = cfg.CONF.ml2_vpp.forward_worker_master_lease_time
        recovery_time = cfg.CONF.ml2_vpp.forward_worker_recovery_time

        etcd_election = etcdutils.EtcdElection(etcd_client, 'forward_worker',
                                               self.election_key_space,
                                               work_time=lease_time,
                                               recovery_time=recovery_time)
        while True:
            # Try indefinitely to gain mastership of this thread pool.
            # Most threads will be sitting here.
            etcd_election.wait_until_elected()
            try:
                # Master loop - as long as we are master and can
                # maintain it, process incoming events.

                # Every long running section is preceded by extending
                # mastership of the thread pool and followed by
                # confirmation that we still have mastership (usually
                # by a further extension).

                def work(k, v):
                    self.do_etcd_update(etcd_writer, k, v)

                # We will try to empty the pending rows in the DB
                while True:
                    etcd_election.extend_election(
                        cfg.CONF.ml2_vpp.db_query_time)
                    session = n_context.get_admin_context().session
                    maybe_more = db.journal_read(session, work)
                    if not maybe_more:
                        LOG.debug('forward worker has emptied journal')
                        etcd_election.extend_election(lease_time)
                        break

                # work queue is now empty.

                # Wait to be kicked, or (in case of emergency) run
                # every few seconds in case another thread or process
                # dumped work and failed to get notification to us to
                # process it.
                with eventlet.Timeout(lease_time + 1, False):
                    etcd_election.extend_election(lease_time)
                    try:
                        etcd_client.watch(self.journal_kick_key,
                                          timeout=lease_time)
                    except etcd.EtcdException:
                        # Check the DB queue now, anyway
                        pass
            except etcdutils.EtcdElectionLost:
                # We are no longer master
                pass
            except Exception as e:
                # TODO(ijw): log exception properly
                LOG.warning("problems in forward worker - Error name is %s. "
                            "proceeding without quiting", type(e).__name__)
                LOG.warning("Exception in forward_worker: %s", e)
                # something went bad; breathe, in case we end
                # up in a tight loop
                time.sleep(1)
                # never quit
                pass

    ######################################################################


class EtcdAgentCommunicator(AgentCommunicator, JournalManager):
    """Comms unit for etcd

    This will talk to etcd and tell it what is going on
    with the Neutron database.  etcd can be run as a
    cluster and so shouldn't die, but can be unreachable,
    so this class's job is to ensure that all updates are
    forwarded to etcd in order even when things are not
    quite going as planned.

    In etcd, the layout is:
    # Port Space
    LEADIN/nodes - subdirs are compute nodes
    LEADIN/nodes/X/ports - entries are JSON-ness containing
    all information on each bound port on the compute node.
    (Unbound ports are homeless, so the act of unbinding is
    the deletion of this entry.)
    # Global Space
    LEADIN/global/secgroups/<security-group-id> - entries are security-group
    keys whose values contain all the rule data
    LEADIN/global/networks/gpe/<vni>/<hostname>/<mac>/<ip-address> - Entries
    contain GPE data such as the VNI, hostname, instance's mac & IP address
    and key value is the underlay IP address of the compute node
    LEADIN/global/remote_group/<group-id>/<port-id> contain the IP addresses
    of ports in a security-group
    # State Space
    LEADIN/state/X - return state of the VPP
    LEADIN/state/X/alive - heartbeat back
    LEADIN/state/X/ports - port information.  A key here (regardless of
    value) indicates the port has been bound and is receiving traffic.
    LEADIN/state/X/physnets - physnets on node
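
    For example (host names and UUIDs purely illustrative), a port bound on
    compute host 'compute0' is written by this driver under:
        LEADIN/nodes/compute0/ports/<port-uuid>
    and the agent acknowledges the binding by writing:
        LEADIN/state/compute0/ports/<port-uuid>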
    """

    def __init__(self, notify_bound):
        super(EtcdAgentCommunicator, self).__init__()
        LOG.debug("Using etcd host:%s port:%s user:%s",
                  cfg.CONF.ml2_vpp.etcd_host,
                  cfg.CONF.ml2_vpp.etcd_port,
                  cfg.CONF.ml2_vpp.etcd_user)

        # This is a function that is called when a port has been
        # notified from the agent via etcd as completely attached.

        # We call this when we're certain that the VPP on the far end
        # has definitely bound the port, and has dropped a vhost-user
        # socket where it can be found.

        # This is more important than it seems, because libvirt will
        # hang (qemu ignores its monitor port while it is waiting for a
        # partner to connect on its vhost-user interfaces).  qemu can't
        # start the VM - that requires information from its partner that
        # it can't guess at - but it shouldn't hang the monitor;
        # nevertheless it does.  So we notify only when the port is there
        # and ready, and Nova never puts qemu into this state.
        self.notify_bound = notify_bound

        # We need certain directories to exist
        self.state_key_space = LEADIN + '/state'
        self.port_key_space = LEADIN + '/nodes'
        self.secgroup_key_space = LEADIN + '/global/secgroups'
        self.remote_group_key_space = LEADIN + '/global/remote_group'
        self.gpe_key_space = LEADIN + '/global/networks/gpe'

        etcd_client = self.client_factory.client()
        etcd_helper = etcdutils.EtcdHelper(etcd_client)
        etcd_helper.ensure_dir(self.state_key_space)
        etcd_helper.ensure_dir(self.port_key_space)
        etcd_helper.ensure_dir(self.secgroup_key_space)
        etcd_helper.ensure_dir(self.election_key_space)
        etcd_helper.ensure_dir(self.remote_group_key_space)

        self.secgroup_enabled = cfg.CONF.SECURITYGROUP.enable_security_group
        if self.secgroup_enabled:
            self.register_secgroup_event_handler()

        # TODO(ijw): .../state/<host> lists all known hosts, and they
        # heartbeat when they're functioning

        # From this point on, there are multiple threads: ensure that
        # we don't re-use the etcd_client from multiple threads
        # simultaneously
        etcd_helper = None
        etcd_client = None

        registry.subscribe(self.start_threads,
                           resources.PROCESS,
                           events.AFTER_SPAWN)

    def start_threads(self, resource, event, trigger, payload=None):
        LOG.debug('Starting background threads for Neutron worker')
        self.return_thread = self.make_return_worker()
        self.forward_thread = self.make_forward_worker()

    def find_physnet(self, host, physnet):
        """Identify if an agent can connect to the physical network

        This can fail if the agent hasn't been configured for that
        physnet, or if the agent isn't alive.
        """

        # TODO(ijw): we use an on-the-fly created client so that we
        # don't have threading problems from the caller.
        try:
            etcd_client = self.client_factory.client()

            etcd_client.read('%s/state/%s/alive' % (LEADIN, host))
            etcd_client.read('%s/state/%s/physnets/%s' %
                             (LEADIN, host, physnet))
        except etcd.EtcdKeyNotFound:
            return False
        return True

    def register_secgroup_event_handler(self):
        """Subscribe a handler to process secgroup change notifications

        We're interested in PRECOMMIT_xxx (where we store the changes
        to etcd in the journal table) and AFTER_xxx (where we
        remind the worker thread it may have work to do).
        """

        LOG.info("Security groups feature is enabled")

        # NB security group rules cannot be updated, and security
        # groups themselves have no forwarder state in them, so we
        # don't need the update events

        # register pre-commit events
        # security group precommit events
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP,
                           events.PRECOMMIT_CREATE)
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP,
                           events.PRECOMMIT_DELETE)
        # security group rule precommit events
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP_RULE,
                           events.PRECOMMIT_CREATE)
        registry.subscribe(self.process_secgroup_commit,
                           resources.SECURITY_GROUP_RULE,
                           events.PRECOMMIT_DELETE)

        # register post-commit events
        # security group post commit events
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP,
                           events.AFTER_CREATE)
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP,
                           events.AFTER_DELETE)
        # security group rule post commit events
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP_RULE,
                           events.AFTER_CREATE)
        registry.subscribe(self.process_secgroup_after,
                           resources.SECURITY_GROUP_RULE,
                           events.AFTER_DELETE)

    def process_secgroup_after(self, resource, event, trigger, **kwargs):
        """Callback for handling security group/rule commit-complete events

        This is when we should tell other things that a change has
        happened and has been recorded permanently in the DB.
        """

        # Whatever the object that caused this, we've put something
        # in the journal and now need to nudge the communicator
        self.kick()

    def process_secgroup_commit(self, resource, event, trigger, **kwargs):
        """Callback for handling security group/rule  commit events

        This is the time at which we should be committing any of our
        own auxiliary changes to the DB.
        """
        LOG.debug("Received event %s notification for resource"
                  " %s with kwargs %s", event, resource, kwargs)

        context = kwargs['context']

        sgid = None
        deleted_rule_id = None
        added_rule = {}

        if resource == resources.SECURITY_GROUP:
            if event == events.PRECOMMIT_DELETE:
                self.delete_secgroup_from_etcd(context.session,
                                               kwargs['security_group_id'])
                # Job done.
                return

            # All other event types drop out to the update-the-key
            # code at the bottom of the function.

            elif event == events.PRECOMMIT_CREATE:
                # Also, the SG passed to us is what comes in from the user.  We
                # require what went into the DB (where we added a UUID to it),
                # so we have to get that from the record of changed and as yet
                # uncommitted objects.
                sgid = kwargs[resource]['id']

            # The default rules are supplied directly in the context
            rules = kwargs[resource]['security_group_rules']

        elif resource == resources.SECURITY_GROUP_RULE:
            # We store security groups with a composite of all their
            # rules.  So in this case we track down the affected
            # rule and update its entire data.
            # NB: rules are never updated.

            if event == events.PRECOMMIT_DELETE:
                deleted_rule_id = kwargs['security_group_rule_id']
                sgid = kwargs['security_group_id']

            elif event == events.PRECOMMIT_CREATE:
                # Groups don't have the same UUID problem - we're not
                # using their UUID, we're using their SG's, which must
                # be present.
                added_rule = kwargs['security_group_rule']
                sgid = added_rule['security_group_id']

            # Start by getting the rules the DB knows about
            rules = self.get_rules_for_secgroup(sgid, context)

        # Adding a group or changing rules results in the same behaviour:
        # rewrite the key in etcd.

        # If we're in the precommit part, we may have deleted rules in this
        # list and we should exclude them.  We also, cautiously, exclude any
        # added rule as well, though it should not have made it into the DB.
        # If either is unset it is None, and no rule's id ever equals None,
        # so nothing is wrongly excluded in that case.
        rules = [r for r in rules
                 if r['id'] != deleted_rule_id
                 and r['id'] != added_rule.get('id')]

        if added_rule:
            rules.append(added_rule)

        # Get the full details of the secgroup in etcd-exchange format
        secgroup = self.get_secgroup_from_rules(sgid, rules)

        # Write security group data to etcd
        self.send_secgroup_to_agents(context.session, secgroup)

    def get_rules_for_secgroup(self, sgid, context):
        plugin = directory.get_plugin()
        rules = plugin.get_security_group_rules(
            context, filters={'security_group_id': [sgid]}
        )

        return rules

    def get_secgroup_rule(self, rule_id, context):
        """Fetch and return a security group rule from Neutron DB"""
        plugin = directory.get_plugin()
        return plugin.get_security_group_rule(context, rule_id)

    def get_secgroup_from_rules(self, sgid, rules):
        """Build and return a security group namedtuple object.

        This object is the format with which we exchange data with
        the agents, and can be written in this form to etcd.

        Arguments:
        sgid - ID of the security group
        rules - A list of security group rules as returned from the DB

        1. Filter rules using the input param: sgid to ensure that rules
        belong to that group
        2. Split rules based on direction.
        3. Construct and return the SecurityGroup namedtuple.
        """
        # A generator object of security group rules for sgid
        sg_rules = (r for r in rules if r['security_group_id'] == sgid)

        # A list of ingress and egress namedtuple rule objects
        ingress_rules = []
        egress_rules = []
        for r in sg_rules:
            if r['direction'] == 'ingress':
                ingress_rules.append(self._neutron_rule_to_vpp_acl(r))
            else:
                egress_rules.append(self._neutron_rule_to_vpp_acl(r))
        return SecurityGroup(sgid, ingress_rules, egress_rules)

    # Neutron supports the following IP protocols by name
    protocols = {'tcp': 6, 'udp': 17, 'icmp': 1, 'icmpv6': 58,
                 'ah': 51, 'dccp': 33, 'egp': 8, 'esp': 50, 'gre': 47,
                 'igmp': 2, 'ipv6-encap': 41, 'ipv6-frag': 44,
                 'ipv6-icmp': 58, 'ipv6-nonxt': 59, 'ipv6-opts': 60,
                 'ipv6-route': 43, 'ospf': 89, 'pgm': 113, 'rsvp': 46,
                 'sctp': 132, 'udplite': 136,
                 'vrrp': 112}

    def _neutron_rule_to_vpp_acl(self, rule):
        """Convert a neutron rule to vpp_acl rule.

        Arguments:
        1. rule -- represents a neutron rule

        - Convert the neutron rule to a vpp_acl rule model
        - Return the SecurityGroupRule namedtuple.
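
        For example (illustrative), an IPv4 ingress rule allowing TCP
        port 22 from 10.0.0.0/24 converts to:

            SecurityGroupRule(is_ipv6=0, remote_ip_addr='10.0.0.0',
                              ip_prefix_len=24, remote_group_id=None,
                              protocol=6, port_min=22, port_max=22)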
        """
        is_ipv6 = 0 if rule['ethertype'] == 'IPv4' else 1

        if rule['protocol'] is None:
            # Neutron uses None to represent any protocol
            # We use 0 to represent any protocol
            protocol = 0
        elif rule['protocol'] in self.protocols:
            # VPP rules require IANA protocol numbers
            # Convert input accordingly.
            protocol = self.protocols[rule['protocol']]
        else:
            # Convert incoming string value to an integer
            protocol = int(rule['protocol'])

        if is_ipv6 and protocol == self.protocols['icmp']:
            protocol = self.protocols['icmpv6']

        # Neutron represents any ip address by setting
        # both the remote_ip_prefix and remote_group_id fields to None
        # VPP uses all zeros to represent any IPv4/IPv6 address
        # In a neutron security group rule, you can either set the
        # remote_ip_prefix or remote_group_id but not both.
        # When a remote_ip_prefix value is set, the remote_group_id
        # is ignored and vice versa. If both the attributes are unset,
        # any remote_ip_address is permitted.
        if rule['remote_ip_prefix']:
            remote_ip_addr, ip_prefix_len = \
                rule['remote_ip_prefix'].split('/')
            ip_prefix_len = int(ip_prefix_len)
            # Set the required attribute, referenced by the SecurityGroupRule
            # tuple, remote_group_id to None.
            remote_group_id = None
        elif rule['remote_group_id']:
            remote_group_id = rule['remote_group_id']
            # Set remote_ip_addr and ip_prefix_len to empty values
            # as it is a required attribute referenced by the
            # SecurityGroupRule tuple. When the remote_ip_addr value is set
            # to None, the vpp-agent ignores it and looks at the
            # remote-group-id. One of these attributes must be set to a valid
            # value.
            remote_ip_addr, ip_prefix_len = None, 0
        else:
            # In neutron, when both the remote_ip_prefix and remote_group_id
            # are set to None, it implies permit any. But we need to set a
            # valid value for the remote_ip_addr attribute to convey this to
            # the vpp-agent.
            remote_ip_addr = '0.0.0.0' if not is_ipv6 else '::'
            ip_prefix_len = 0
            # Set the required attribute in the SecurityGroupRule tuple
            # remote_group_id to None.
            remote_group_id = None
        # Neutron uses -1, None or 0 to represent all ports.
        # VPP uses 0-65535 for all TCP/UDP ports and -1 to represent all
        # ICMP types and codes.
        if rule['port_range_min'] == -1 or not rule['port_range_min']:
            # Valid TCP/UDP port ranges for TCP(6), UDP(17) and UDPLite(136)
            if protocol in [6, 17, 136]:
                port_min, port_max = (0, 65535)
            # A Value of -1 represents all ICMP/ICMPv6 types & code ranges
            elif protocol in [1, 58]:
                port_min, port_max = (-1, -1)
            # Ignore port_min and port_max fields as other protocols don't
            # use them
            else:
                port_min, port_max = (0, 0)
        else:
            port_min, port_max = (rule['port_range_min'],
                                  rule['port_range_max'])

        # Handle a couple of special ICMP cases
        if protocol in [1, 58]:
            # All ICMP types and codes
            if rule['port_range_min'] is None:
                port_min, port_max = (-1, -1)
            # All codes for a specific type
            elif rule['port_range_max'] is None:
                port_min, port_max = (rule['port_range_min'], -1)

        sg_rule = SecurityGroupRule(is_ipv6, remote_ip_addr,
                                    ip_prefix_len,
                                    remote_group_id, protocol, port_min,
                                    port_max)
        return sg_rule

    def send_secgroup_to_agents(self, session, secgroup):
        """Writes a secgroup to the etcd secgroup space

        Does this via the journal as part of the commit, so
        that the write is atomic with the DB commit to the
        Neutron tables.

        Arguments:
        session  -- the DB session with an open transaction
        secgroup -- Named tuple representing a SecurityGroup
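
        The value written under the secgroup key is a dict of rule lists
        built below, e.g. (illustrative):

            {'ingress_rules': [<SecurityGroupRule._asdict()>, ...],
             'egress_rules': [<SecurityGroupRule._asdict()>, ...]}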
        """
        secgroup_path = self._secgroup_path(secgroup.id)
        # sg is a dict of ingress and egress rule lists
        sg = {}
        ingress_rules = []
        egress_rules = []
        for ingress_rule in secgroup.ingress_rules:
            ingress_rules.append(ingress_rule._asdict())
        for egress_rule in secgroup.egress_rules:
            egress_rules.append(egress_rule._asdict())
        sg['ingress_rules'] = ingress_rules
        sg['egress_rules'] = egress_rules
        db.journal_write(session, secgroup_path, sg)

    def delete_secgroup_from_etcd(self, session, secgroup_id):
        """Deletes the secgroup key from etcd

        Arguments:
        secgroup_id -- The id of the security group that we want to delete
        """
        secgroup_path = self._secgroup_path(secgroup_id)
        # Delete the security-group from remote-groups
        remote_group_path = self._remote_group_path(secgroup_id, '')
        db.journal_write(session, secgroup_path, None)
        db.journal_write(session, remote_group_path, None)

    def _secgroup_path(self, secgroup_id):
        return self.secgroup_key_space + "/" + secgroup_id

    def _port_path(self, host, port):
        return self.port_key_space + "/" + host + "/ports/" + port['id']

    # TODO(najoy): Move all security groups related code to a dedicated
    # module
    def _remote_group_path(self, secgroup_id, port_id):
        return self.remote_group_key_space + "/" + secgroup_id + "/" + port_id

    def _gpe_remote_path(self, host, port, segmentation_id):
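        # Illustrative (cf. the class docstring): for a port with one fixed
        # IP this returns the per-IP GPE key plus its parent directory, i.e.
        #   .../gpe/<vni>/<host>/<mac>/<ip-address> and
        #   .../gpe/<vni>/<host>/<mac>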
        ip_addrs = port.get('fixed_ips', [])
        gpe_dir = self.gpe_key_space + "/" + str(segmentation_id) + "/" + \
            host + "/" + port['mac_address']
        if ip_addrs and segmentation_id:
            # Paths of all GPE keys for this port, plus the (then empty)
            # GPE directory itself, so unbind can delete them from etcd
            return [gpe_dir + "/" + ip_address['ip_address']
                    for ip_address in ip_addrs] + [gpe_dir]
        else:
            return []

    # A remote_group_path is qualified with a port ID because the agent uses
    # the port ID to keep track of the dynamic set of ports associated with
    # the remote_group_id. The value of this key is the
    # list of IP addresses allocated to that port by neutron.
    # Using the above two pieces of information, the vpp-agent
    # computes the complete set of IP addresses belonging to a
    # remote_group_id and uses it to expand the rule when a remote_group_id
    # attribute is specified. The expansion is performed by computing a
    # product using all the IP addresses of the ports in the remote_group
    # and the remaining attributes of the rule.
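    # Illustrative (UUIDs hypothetical): a port in security group <sg-uuid>
    # with one fixed IP produces the journal/etcd entry
    #   LEADIN/global/remote_group/<sg-uuid>/<port-uuid> -> ['10.0.0.5']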
    def _remote_group_paths(self, port):
        security_groups = port.get('security_groups', [])
        return [self._remote_group_path(secgroup_id, port['id'])
                for secgroup_id in security_groups]

    ######################################################################
    # These functions use a DB journal to log messages before
    # they're put into etcd, as that can take time and may not always
    # succeed (so is a bad candidate for calling within or after a
    # transaction).

    def bind(self, session, port, segment, host, binding_type):
        # NB segmentation_id is not optional in the wireline protocol,
        # we just pass 0 for unsegmented network types
        data = {
            'mac_address': port['mac_address'],
            'mtu': 1500,  # not this, but what?: port['mtu'],
            'physnet': segment[api.PHYSICAL_NETWORK],
            'network_type': segment[api.NETWORK_TYPE],
            'segmentation_id': segment.get(api.SEGMENTATION_ID, 0),
            'binding_type': binding_type,
            'security_groups': port.get('security_groups', []),
            'allowed_address_pairs': port.get('allowed_address_pairs', []),
            'fixed_ips': port.get('fixed_ips', []),
            'port_security_enabled': port.get('port_security_enabled', True),
            # Non-essential, but useful for monitoring and debug:
            'device_id': port.get('device_id', None)
        }
        LOG.debug("Queueing bind request for port:%s, "
                  "segment:%s, host:%s, type:%s",
                  port, data['segmentation_id'],
                  host, data['binding_type'])

        db.journal_write(session, self._port_path(host, port), data)
        # For tracking ports in a remote_group, create a journal entry in
        # the remote-group key-space.
        # This will result in the creation of an etcd key with the port ID
        # and its security-group-id. The value is the list of IP addresses.
        # For details on how the agent thread handles the remote-group
        # watch events refer to the doc under RemoteGroupWatcher in the
        # server module.
        for remote_group_path in self._remote_group_paths(port):
            db.journal_write(session, remote_group_path,
                             [item['ip_address'] for item in
                              data['fixed_ips']])
        self.kick()

    def unbind(self, session, port, host, segment):
        # GPE requires segmentation ID for removing its etcd keys
        segmentation_id = segment.get(api.SEGMENTATION_ID, 0)
        LOG.debug("Queueing unbind request for port:%s, host:%s, segment:%s",
                  port, host, segmentation_id)
        # When a port is unbound, this journal entry will delete the
        # port key (and hence its IP address value) from etcd. The behavior
        # is like removing the port IP address(es) from the
        # remote-group. The agent will receive a watch notification and
        # update the ACL rules to remove the IP(s) from the rule.
        # Other port IP addresses associated with the remote-group-id will
        # remain in the rule (as they should).
        # A hypothetical alternate implementation: if we made the remote key
        # of a secgroup a list of IPs, we could refresh the content of
        # every secgroup containing this port.  That would make the sender's
        # work harder and the receiver's easier - the sender would have to
        # track IPs across port binds, unbinds, security-group associations
        # and updates.  However, algorithmically the alternative would be no
        # better than what we have now, i.e. O(n).
        for remote_group_path in self._remote_group_paths(port):
            db.journal_write(session, remote_group_path, None)
        db.journal_write(session, self._port_path(host, port), None)
        # Remove all GPE remote keys from etcd, for this port
        for gpe_remote_path in self._gpe_remote_path(host, port,
                                                     segmentation_id):
            db.journal_write(session,
                             gpe_remote_path,
                             None)
        self.kick()

    def remove_port_from_remote_groups(self, session, original_port,
                                       current_port):
        """Remove ports from remote groups when port security is updated."""
        removed_sec_groups = set(original_port['security_groups']) - set(
            current_port['security_groups'])
        for secgroup_id in removed_sec_groups:
            db.journal_write(session,
                             self._remote_group_path(secgroup_id,
                                                     current_port['id']),
                             None)
        self.kick()

    def make_return_worker(self):
        """The thread that manages data returned from agents via etcd."""

        # TODO(ijw): agents and physnets should be checked before a bind
        # is accepted

        # Note that the initial load is done before spawning the background
        # watcher - this means that we're prepared with the information
        # to accept bind requests.

        class ReturnWatcher(etcdutils.EtcdChangeWatcher):

            def __init__(self, etcd_client, name, watch_path,
                         election_path=None, data=None):
                super(ReturnWatcher, self).__init__(etcd_client,
                                                    name, watch_path,
                                                    election_path,
                                                    wait_until_elected=True,
                                                    data=data)

            # Every key changes on a restart, which has the
            # useful effect of resending all Nova notifications
            # for 'port bound' events based on existing state.
            def added(self, key, value):
                # Matches a port key, gets host and uuid
                m = re.match('^([^/]+)/ports/([^/]+)$', key)

                if m:
                    host = m.group(1)
                    port = m.group(2)

                    self.data.notify_bound(port, host)
                else:
                    # Matches an agent, gets a liveness notification
                    m = re.match('^([^/]+)/alive$', key)

                    if m:
                        # TODO(ijw): this should be fed into the agents
                        # table.
                        host = m.group(1)

                        LOG.info('host %s is alive', host)

            def removed(self, key):
                # Nova doesn't much care when ports go away.

                # Matches an agent, gets a liveness notification
                m = re.match('^([^/]+)/alive$', key)

                if m:
                    # TODO(ijw): this should be fed into the agents
                    # table.
                    host = m.group(1)

                    LOG.info('host %s has died', host)

        # Assign a UUID to each worker thread to enable thread election
        return eventlet.spawn(
            ReturnWatcher(self.client_factory.client(), 'return_worker',
                          self.state_key_space, self.election_key_space,
                          data=self).watch_forever)