Merge "Remove use of apic-ml2-driver library"

This commit is contained in:
Jenkins
2017-08-29 16:43:31 +00:00
committed by Gerrit Code Review
23 changed files with 1142 additions and 23 deletions

View File

@@ -1,7 +1,6 @@
function install_apic_aim {
echo_summary "Installing apic_aim"
install_apic_ml2
install_aim
install_opflex
install_apicapi

View File

@@ -24,8 +24,6 @@ NEUTRON_CONF=$NEUTRON_CONF_DIR/neutron.conf
GBP_CONF_DIR=/etc/gbp
AIM_REPO=http://github.com/noironetworks/aci-integration-module.git
AIM_DIR=$DEST/aim
APICML2_REPO=http://github.com/noironetworks/apic-ml2-driver.git
APICML2_DIR=$DEST/apic_ml2
OPFLEX_REPO=http://github.com/noironetworks/python-opflex-agent.git
OPFLEX_DIR=$DEST/opflexagent
APICAPI_REPO=http://github.com/noironetworks/apicapi.git
@@ -87,13 +85,6 @@ function install_gbpui {
mv $GBPUI_DIR/_test-requirements.txt $GBPUI_DIR/test-requirements.txt
}
function install_apic_ml2 {
git_clone $APICML2_REPO $APICML2_DIR $APICML2_BRANCH
mv $APICML2_DIR/test-requirements.txt $APICML2_DIR/_test-requirements.txt
setup_develop $APICML2_DIR
mv $APICML2_DIR/_test-requirements.txt $APICML2_DIR/test-requirements.txt
}
function install_apicapi {
git_clone $APICAPI_REPO $APICAPI_DIR $APICAPI_BRANCH
mv $APICAPI_DIR/test-requirements.txt $APICAPI_DIR/_test-requirements.txt

View File

@@ -125,7 +125,6 @@ if is_service_enabled group-policy; then
fi
fi
# REVISIT move installs to install phase?
# install_apic_ml2
install_gbpclient
install_gbpservice
[[ $ENABLE_NFP = True ]] && install_nfpgbpservice

View File

View File

@@ -0,0 +1,29 @@
# Copyright (c) 2017 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron.agent.linux import dhcp
class ApicDnsmasq(dhcp.Dnsmasq):
@classmethod
def get_isolated_subnets(cls, network):
"""Returns a dict indicating whether or not a subnet is isolated
A subnet is always considered isolated for APIC.
"""
isolated_subnets = collections.defaultdict(lambda: True)
return isolated_subnets

View File

@@ -0,0 +1,229 @@
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import httplib2
from oslo_config import cfg
from oslo_log import log as logging
import six.moves.urllib.parse as urlparse
import webob
from neutron._i18n import _LE
from neutron.agent.linux import daemon
from neutron.agent.linux import utils as agent_utils
from neutron.common import config
from neutron.common import utils
from neutron import wsgi
from oslo_serialization import jsonutils
LOG = logging.getLogger(__name__)
class NetworkMetadataProxyHandler(object):
"""Proxy AF_INET metadata request through Unix Domain socket.
The Unix domain socket allows the proxy access resource that are not
accessible within the isolated tenant context.
"""
def __init__(self, network_id=None, router_id=None, domain_id=None):
self.network_id = network_id
self.router_id = router_id
self.domain_id = domain_id
if network_id is None and router_id is None and domain_id is None:
msg = _('network_id, router_id, and domain_id are None. '
'One of them must be provided.')
raise ValueError(msg)
@webob.dec.wsgify(RequestClass=webob.Request)
def __call__(self, req):
LOG.debug("Request: %s", req)
try:
return self._proxy_request(req.remote_addr,
req.method,
req.path_info,
req.query_string,
req.body)
except Exception:
LOG.exception(_LE("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')
return webob.exc.HTTPInternalServerError(explanation=unicode(msg))
def get_network_id(self, domain_id, remote_address):
filedir = '/var/lib/neutron/opflex_agent'
filename = 'instance_networks.state'
fqfn = '%s/%s' % (filedir, filename)
nets = None
try:
with open(fqfn, "r") as f:
nets = jsonutils.load(f)
except Exception as e:
LOG.warning("Exception in reading file: %s" % str(e))
if nets:
if domain_id in nets:
if remote_address in nets[domain_id]:
return nets[domain_id][remote_address]
LOG.warning("IP address not found: domain=%s, addr=%s" % (
domain_id, remote_address))
return None
def _proxy_request(self, remote_address, method, path_info,
query_string, body):
headers = {
'X-Forwarded-For': remote_address,
}
if self.domain_id:
network_id = self.get_network_id(self.domain_id, remote_address)
if network_id:
headers['X-Neutron-Network-ID'] = network_id
else:
return webob.exc.HTTPNotFound()
elif self.router_id:
headers['X-Neutron-Router-ID'] = self.router_id
else:
headers['X-Neutron-Network-ID'] = self.network_id
url = urlparse.urlunsplit((
'http',
'169.254.169.254', # a dummy value to make the request proper
path_info,
query_string,
''))
h = httplib2.Http()
resp, content = h.request(
url,
method=method,
headers=headers,
body=body,
connection_type=agent_utils.UnixDomainHTTPConnection)
if resp.status == 200:
LOG.debug(resp)
LOG.debug(content)
response = webob.Response()
response.status = resp.status
response.headers['Content-Type'] = resp['content-type']
response.body = content
return response
elif resp.status == 400:
return webob.exc.HTTPBadRequest()
elif resp.status == 404:
return webob.exc.HTTPNotFound()
elif resp.status == 409:
return webob.exc.HTTPConflict()
elif resp.status == 500:
msg = _(
'Remote metadata server experienced an internal server error.'
)
LOG.debug(msg)
return webob.exc.HTTPInternalServerError(explanation=unicode(msg))
else:
raise Exception(_('Unexpected response code: %s') % resp.status)
class ProxyDaemon(daemon.Daemon):
def __init__(self, pidfile, port, network_id=None, router_id=None,
domain_id=None,
user=None, group=None, watch_log=True, host="0.0.0.0"):
uuid = domain_id or network_id or router_id
super(ProxyDaemon, self).__init__(pidfile, uuid=uuid, user=user,
group=group, watch_log=watch_log)
self.network_id = network_id
self.router_id = router_id
self.domain_id = domain_id
self.port = port
self.host = host
def run(self):
handler = NetworkMetadataProxyHandler(
self.network_id,
self.router_id,
self.domain_id)
proxy = wsgi.Server('opflex-network-metadata-proxy')
proxy.start(handler, self.port, host=self.host)
# Drop privileges after port bind
super(ProxyDaemon, self).run()
proxy.wait()
def main():
opts = [
cfg.StrOpt('network_id',
help=_('Network that will have instance metadata '
'proxied.')),
cfg.StrOpt('router_id',
help=_('Router that will have connected instances\' '
'metadata proxied.')),
cfg.StrOpt('domain_id',
help=_('L3 domain that will have connected instances\' '
'metadata proxied.')),
cfg.StrOpt('pid_file',
help=_('Location of pid file of this process.')),
cfg.BoolOpt('daemonize',
default=False,
help=_('Run as daemon.')),
cfg.StrOpt('metadata_host',
default="0.0.0.0",
help=_("IP address to listen for metadata server "
"requests.")),
cfg.IntOpt('metadata_port',
default=9697,
help=_("TCP Port to listen for metadata server "
"requests.")),
cfg.StrOpt('metadata_proxy_socket',
default='$state_path/metadata_proxy',
help=_('Location of Metadata Proxy UNIX domain '
'socket')),
cfg.StrOpt('metadata_proxy_user',
default=None,
help=_("User (uid or name) running metadata proxy after "
"its initialization")),
cfg.StrOpt('metadata_proxy_group',
default=None,
help=_("Group (gid or name) running metadata proxy after "
"its initialization")),
cfg.BoolOpt('metadata_proxy_watch_log',
default=True,
help=_("Watch file log. Log watch should be disabled when "
"metadata_proxy_user/group has no read/write "
"permissions on metadata proxy log file.")),
]
cfg.CONF.register_cli_opts(opts)
# Don't get the default configuration file
cfg.CONF(project='neutron', default_config_files=[])
config.setup_logging()
utils.log_opt_values(LOG)
proxy = ProxyDaemon(cfg.CONF.pid_file,
cfg.CONF.metadata_port,
network_id=cfg.CONF.network_id,
router_id=cfg.CONF.router_id,
domain_id=cfg.CONF.domain_id,
user=cfg.CONF.metadata_proxy_user,
group=cfg.CONF.metadata_proxy_group,
watch_log=cfg.CONF.metadata_proxy_watch_log,
host=cfg.CONF.metadata_host)
if cfg.CONF.daemonize:
proxy.start()
else:
proxy.run()

View File

@@ -0,0 +1,290 @@
# Copyright (c) 2014 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import sys
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
from oslo_service import service as svc
from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import config as common_cfg
from neutron.common import utils as neutron_utils
from neutron import manager
from neutron import service
from gbpservice._i18n import _LE
from gbpservice._i18n import _LI
from gbpservice.neutron.agent.topology import rpc as arpc
ACI_CHASSIS_DESCR_FORMAT = 'topology/pod-1/node-(\d+)'
ACI_PORT_DESCR_FORMATS = [
'topology/pod-1/node-(\d+)/sys/conng/path-\[eth(\d+)/(\d+(\/\d+)*)\]',
'topology/pod-1/paths-(\d+)/pathep-\[eth(\d+)/(\d+(\/\d+)*)\]',
]
ACI_PORT_LOCAL_FORMAT = 'Eth(\d+)/(\d+(\/\d+)*)'
ACI_VPCPORT_DESCR_FORMAT = ('topology/pod-1/protpaths-(\d+)-(\d+)/pathep-'
'\[(.*)\]')
AGENT_FORCE_UPDATE_COUNT = 5
BINARY_APIC_HOST_AGENT = 'neutron-cisco-apic-host-agent'
TYPE_APIC_HOST_AGENT = 'cisco-apic-host-agent'
VPCMODULE_NAME = 'vpc-%s-%s'
LOG = logging.getLogger(__name__)
apic_opts = [
cfg.ListOpt('apic_host_uplink_ports',
default=[],
help=_('The uplink ports to check for ACI connectivity')),
cfg.FloatOpt('apic_agent_poll_interval',
default=60,
help=_('Interval between agent poll for topology (in sec)')),
cfg.FloatOpt('apic_agent_report_interval',
default=60,
help=_('Interval between agent status updates (in sec)')),
]
cfg.CONF.register_opts(apic_opts, "ml2_cisco_apic")
class ApicTopologyAgent(manager.Manager):
def __init__(self, host=None):
if host is None:
host = neutron_utils.get_hostname()
super(ApicTopologyAgent, self).__init__(host=host)
self.conf = cfg.CONF.ml2_cisco_apic
self.count_current = 0
self.count_force_send = AGENT_FORCE_UPDATE_COUNT
self.interfaces = {}
self.lldpcmd = None
self.peers = {}
self.port_desc_re = map(re.compile, ACI_PORT_DESCR_FORMATS)
self.port_local_re = re.compile(ACI_PORT_LOCAL_FORMAT)
self.vpcport_desc_re = re.compile(ACI_VPCPORT_DESCR_FORMAT)
self.chassis_desc_re = re.compile(ACI_CHASSIS_DESCR_FORMAT)
self.service_agent = arpc.ApicTopologyServiceNotifierApi()
self.state = None
self.state_agent = None
self.topic = arpc.TOPIC_APIC_SERVICE
self.uplink_ports = []
self.invalid_peers = []
def init_host(self):
LOG.info(_LI("APIC host agent: agent starting on %s"), self.host)
self.state = {
'binary': BINARY_APIC_HOST_AGENT,
'host': self.host,
'topic': self.topic,
'configurations': {},
'start_flag': True,
'agent_type': TYPE_APIC_HOST_AGENT,
}
self.uplink_ports = []
for inf in self.conf.apic_host_uplink_ports:
if ip_lib.device_exists(inf):
self.uplink_ports.append(inf)
else:
# ignore unknown interfaces
LOG.error(_LE("No such interface (ignored): %s"), inf)
self.lldpcmd = ['lldpctl', '-f', 'keyvalue'] + self.uplink_ports
def after_start(self):
LOG.info(_LI("APIC host agent: started on %s"), self.host)
@periodic_task.periodic_task(
spacing=cfg.CONF.ml2_cisco_apic.apic_agent_poll_interval,
run_immediately=True)
def _check_for_new_peers(self, context):
LOG.debug("APIC host agent: _check_for_new_peers")
if not self.lldpcmd:
return
try:
# Check if we must send update even if there is no change
force_send = False
self.count_current += 1
if self.count_current >= self.count_force_send:
force_send = True
self.count_current = 0
# Check for new peers
new_peers = self._get_peers()
new_peers = self._valid_peers(new_peers)
# Make a copy of current interfaces
curr_peers = {}
for interface in self.peers:
curr_peers[interface] = self.peers[interface]
# Based curr -> new updates, add the new interfaces
self.peers = {}
for interface in new_peers:
peer = new_peers[interface]
self.peers[interface] = peer
if (interface in curr_peers and
curr_peers[interface] != peer):
LOG.debug('reporting peer removal: %s', peer)
self.service_agent.update_link(
context, peer[0], peer[1], None, 0, 0, 0, '')
if (interface not in curr_peers or
curr_peers[interface] != peer or
force_send):
LOG.debug('reporting new peer: %s', peer)
self.service_agent.update_link(context, *peer)
if interface in curr_peers:
curr_peers.pop(interface)
# Any interface still in curr_peers need to be deleted
for peer in curr_peers.values():
LOG.debug('reporting peer removal: %s', peer)
self.service_agent.update_link(
context, peer[0], peer[1], None, 0, 0, 0, '')
except Exception:
LOG.exception(_LE("APIC service agent: exception in LLDP parsing"))
def _get_peers(self):
interfaces = {}
peers = {}
lldpkeys = utils.execute(self.lldpcmd, run_as_root=True)
for line in lldpkeys.splitlines():
if '=' not in line:
continue
fqkey, value = line.split('=', 1)
lldp, interface, key = fqkey.split('.', 2)
if lldp == 'lldp':
if interface not in interfaces:
interfaces[interface] = {}
interfaces[interface][key] = value
for interface in interfaces:
if 'port.descr' in interfaces[interface]:
value = interfaces[interface]['port.descr']
port_desc = value
for regexp in self.port_desc_re:
match = regexp.match(value)
if match:
mac = self._get_mac(interface)
switch, module, port = match.group(1, 2, 3)
peer = (self.host, interface, mac,
switch, module, port, port_desc)
if interface not in peers:
peers[interface] = []
peers[interface].append(peer)
match = self.vpcport_desc_re.match(value)
if match:
mac = self._get_mac(interface)
switch1, switch2, bundle = match.group(1, 2, 3)
switch, module, port = None, None, None
if (bundle is not None and
'chassis.descr' in interfaces[interface]):
value = interfaces[interface]['chassis.descr']
match = self.chassis_desc_re.match(value)
if match:
switch = match.group(1)
if (switch is not None and
'port.local' in interfaces[interface]):
value = interfaces[interface]['port.local']
match = self.port_local_re.match(value)
if match:
module, port = match.group(1, 2)
if module is not None and port is not None:
vpcmodule = VPCMODULE_NAME % (module, port)
peer = (self.host, interface, mac,
switch, vpcmodule, bundle, port_desc)
if interface not in peers:
peers[interface] = []
peers[interface].append(peer)
return peers
def _valid_peers(self, peers):
# Reduce the peers array to one valid peer per interface
# NOTE:
# There is a bug in lldpd daemon that it keeps reporting
# old peers even after their updates have stopped
# we keep track of that report remove them from peers
valid_peers = {}
invalid_peers = []
for interface in peers:
curr_peer = None
for peer in peers[interface]:
if peer in self.invalid_peers or curr_peer:
invalid_peers.append(peer)
else:
curr_peer = peer
if curr_peer is not None:
valid_peers[interface] = curr_peer
self.invalid_peers = invalid_peers
return valid_peers
def _get_mac(self, interface):
if interface in self.interfaces:
return self.interfaces[interface]
try:
mac = ip_lib.IPDevice(interface).link.address
self.interfaces[interface] = mac
return mac
except Exception:
# we can safely ignore it, it is only needed for debugging
LOG.exception(
_LE("APIC service agent: can not get MACaddr for %s"),
interface)
def report_send(self, context):
if not self.state_agent:
return
LOG.debug("APIC host agent: sending report state")
try:
self.state_agent.report_state(context, self.state)
self.state.pop('start_flag', None)
except AttributeError:
# This means the server does not support report_state
# ignore it
return
except Exception:
LOG.exception(_LE("APIC host agent: failed in reporting state"))
def launch(binary, manager, topic=None):
cfg.CONF(project='neutron')
common_cfg.init(sys.argv[1:])
config.setup_logging()
report_period = cfg.CONF.ml2_cisco_apic.apic_agent_report_interval
poll_period = cfg.CONF.ml2_cisco_apic.apic_agent_poll_interval
server = service.Service.create(
binary=binary, manager=manager, topic=topic,
report_interval=report_period, periodic_interval=poll_period)
svc.launch(cfg.CONF, server).wait()
def agent_main():
launch(
BINARY_APIC_HOST_AGENT,
'apic_ml2.neutron.plugins.ml2.drivers.' +
'cisco.apic.apic_topology.ApicTopologyAgent')

View File

@@ -0,0 +1,39 @@
# Copyright (c) 2017 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging
from neutron.common import rpc
TOPIC_APIC_SERVICE = 'apic-service'
class ApicTopologyServiceNotifierApi(object):
def __init__(self):
target = oslo_messaging.Target(topic=TOPIC_APIC_SERVICE, version='1.2')
self.client = rpc.get_client(target)
def update_link(self, context, host, interface, mac, switch, module, port,
port_description=''):
cctxt = self.client.prepare(version='1.2', fanout=True)
cctxt.cast(context, 'update_link', host=host, interface=interface,
mac=mac, switch=switch, module=module, port=port,
port_description=port_description)
def delete_link(self, context, host, interface):
cctxt = self.client.prepare(version='1.2', fanout=True)
cctxt.cast(context, 'delete_link', host=host, interface=interface,
mac=None, switch=0, module=0, port=0)

View File

@@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""HA IP address to Port ID association
Revision ID: 4c0c1e2c0160
Revises: 27b724002081
Create Date: 2015-10-19 02:08:54.252877
"""
# revision identifiers, used by Alembic.
revision = '4c0c1e2c0160'
down_revision = None
from alembic import op
import sqlalchemy as sa
from neutron.db import migration
def upgrade():
if not migration.schema_has_table('apic_ml2_ha_ipaddress_to_port_owner'):
op.create_table(
'apic_ml2_ha_ipaddress_to_port_owner',
sa.Column('ha_ip_address', sa.String(length=64), nullable=False),
sa.Column('port_id', sa.String(length=64), nullable=False),
sa.ForeignKeyConstraint(
['port_id'], ['ports.id'], ondelete='CASCADE',
name='apic_ml2_ha_ipaddress_to_port_owner_fk_port_id'),
sa.PrimaryKeyConstraint('ha_ip_address', 'port_id'))

View File

@@ -1 +1 @@
27b724002081
4c0c1e2c0160

View File

@@ -15,11 +15,6 @@
from oslo_config import cfg
# Register apic_system_id
# REVISIT(ivar): would be nice to remove dependency from apic_ml2 in GBP, and
# register option directly here.
from apic_ml2.neutron.plugins.ml2.drivers.cisco.apic import config # noqa
apic_opts = [
cfg.BoolOpt('enable_optimized_dhcp', default=True),
@@ -45,3 +40,15 @@ apic_opts = [
cfg.CONF.register_opts(apic_opts, "ml2_apic_aim")
# oslo_config limits ${var} expansion to global variables
# That is why apic_system_id as a global variable
global_opts = [
cfg.StrOpt('apic_system_id',
default='openstack',
help=_("Prefix for APIC domain/names/profiles created")),
]
cfg.CONF.register_opts(global_opts)

View File

@@ -48,12 +48,11 @@ from oslo_db import exception as db_exc
from oslo_log import log
import oslo_messaging
from apic_ml2.neutron.plugins.ml2.drivers.cisco.apic import (rpc as
apic_topo_rpc)
from gbpservice._i18n import _LE
from gbpservice._i18n import _LI
from gbpservice._i18n import _LW
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.agent.topology import rpc as arpc
from gbpservice.neutron.extensions import cisco_apic
from gbpservice.neutron.extensions import cisco_apic_l3 as a_l3
from gbpservice.neutron.plugins.ml2plus import driver_api as api_plus
@@ -181,7 +180,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.sg_enabled = securitygroups_rpc.is_firewall_enabled()
# setup APIC topology RPC handler
self.topology_conn = n_rpc.create_connection()
self.topology_conn.create_consumer(apic_topo_rpc.TOPIC_APIC_SERVICE,
self.topology_conn.create_consumer(arpc.TOPIC_APIC_SERVICE,
[self.TopologyRpcEndpoint(self)],
fanout=False)
self.topology_conn.consume_in_threads()

View File

@@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from apic_ml2.neutron.db import port_ha_ipaddress_binding as ha_ip_db
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.db import api as db_api
@@ -23,6 +21,9 @@ from gbpservice._i18n import _LE
from gbpservice._i18n import _LW
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
nova_client as nclient)
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
port_ha_ipaddress_binding as ha_ip_db)
LOG = log.getLogger(__name__)

View File

@@ -0,0 +1,140 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy import orm
from neutron import context as nctx
from neutron.db import api as db_api
from neutron.db import models_v2
from oslo_db import exception as db_exc
from oslo_log import log as logging
from neutron_lib.db import model_base
from neutron_lib.plugins import directory
LOG = logging.getLogger(__name__)
class HAIPAddressToPortAssocation(model_base.BASEV2):
"""Port Owner for HA IP Address.
This table is used to store the mapping between the HA IP Address
and the Port ID of the Neutron Port which currently owns this
IP Address.
"""
__tablename__ = 'apic_ml2_ha_ipaddress_to_port_owner'
ha_ip_address = sa.Column(sa.String(64), nullable=False,
primary_key=True)
port_id = sa.Column(sa.String(64), sa.ForeignKey('ports.id',
ondelete='CASCADE'),
nullable=False, primary_key=True)
class PortForHAIPAddress(object):
def __init__(self):
self.session = db_api.get_session()
def _get_ha_ipaddress(self, port_id, ipaddress):
return self.session.query(HAIPAddressToPortAssocation).filter_by(
port_id=port_id, ha_ip_address=ipaddress).first()
def get_port_for_ha_ipaddress(self, ipaddress, network_id):
"""Returns the Neutron Port ID for the HA IP Addresss."""
port_ha_ip = self.session.query(HAIPAddressToPortAssocation).join(
models_v2.Port).filter(
HAIPAddressToPortAssocation.ha_ip_address == ipaddress).filter(
models_v2.Port.network_id == network_id).first()
return port_ha_ip
def get_ha_ipaddresses_for_port(self, port_id):
"""Returns the HA IP Addressses associated with a Port."""
objs = self.session.query(HAIPAddressToPortAssocation).filter_by(
port_id=port_id).all()
return sorted([x['ha_ip_address'] for x in objs])
def set_port_id_for_ha_ipaddress(self, port_id, ipaddress):
"""Stores a Neutron Port Id as owner of HA IP Addr (idempotent API)."""
self.session.expunge_all()
try:
with self.session.begin(subtransactions=True):
obj = self._get_ha_ipaddress(port_id, ipaddress)
if obj:
return obj
else:
obj = HAIPAddressToPortAssocation(port_id=port_id,
ha_ip_address=ipaddress)
self.session.add(obj)
return obj
except db_exc.DBDuplicateEntry:
LOG.debug('Duplicate IP ownership entry for tuple %s',
(port_id, ipaddress))
def delete_port_id_for_ha_ipaddress(self, port_id, ipaddress):
with self.session.begin(subtransactions=True):
try:
return self.session.query(
HAIPAddressToPortAssocation).filter_by(
port_id=port_id,
ha_ip_address=ipaddress).delete()
except orm.exc.NoResultFound:
return
def get_ha_port_associations(self):
return self.session.query(HAIPAddressToPortAssocation).all()
class HAIPOwnerDbMixin(object):
def __init__(self):
self.ha_ip_handler = PortForHAIPAddress()
def _get_plugin(self):
return directory.get_plugin()
def update_ip_owner(self, ip_owner_info):
ports_to_update = set()
port_id = ip_owner_info.get('port')
ipv4 = ip_owner_info.get('ip_address_v4')
ipv6 = ip_owner_info.get('ip_address_v6')
network_id = ip_owner_info.get('network_id')
if not port_id or (not ipv4 and not ipv6):
return ports_to_update
LOG.debug("Got IP owner update: %s", ip_owner_info)
core_plugin = self._get_plugin()
port = core_plugin.get_port(nctx.get_admin_context(), port_id)
if not port:
LOG.debug("Ignoring update for non-existent port: %s", port_id)
return ports_to_update
ports_to_update.add(port_id)
for ipa in [ipv4, ipv6]:
if not ipa:
continue
try:
old_owner = self.ha_ip_handler.get_port_for_ha_ipaddress(
ipa, network_id or port['network_id'])
with self.ha_ip_handler.session.begin(subtransactions=True):
self.ha_ip_handler.set_port_id_for_ha_ipaddress(port_id,
ipa)
if old_owner and old_owner['port_id'] != port_id:
self.ha_ip_handler.delete_port_id_for_ha_ipaddress(
old_owner['port_id'], ipa)
ports_to_update.add(old_owner['port_id'])
except db_exc.DBReferenceError as dbe:
LOG.debug("Ignoring FK error for port %s: %s", port_id, dbe)
return ports_to_update

View File

@@ -0,0 +1,138 @@
# Copyright (c) 2017 Cisco Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.tests import base
from gbpservice.neutron.agent.topology import apic_topology
PERIODIC_TASK = 'oslo_service.periodic_task'
DEV_EXISTS = 'neutron.agent.linux.ip_lib.device_exists'
IP_DEVICE = 'neutron.agent.linux.ip_lib.IPDevice'
EXECUTE = 'neutron.agent.linux.utils.execute'
LLDP_CMD = ['lldpctl', '-f', 'keyvalue']
APIC_EXT_SWITCH = '203'
APIC_EXT_MODULE = '1'
APIC_EXT_PORT = '34'
APIC_UPLINK_PORTS = ['uplink_port']
SERVICE_HOST = 'host1'
SERVICE_HOST_IFACE = 'eth0'
SERVICE_HOST_MAC = 'aa:ee:ii:oo:uu:yy'
SERVICE_PEER_CHASSIS_NAME = 'leaf4'
SERVICE_PEER_CHASSIS = 'topology/pod-1/node-' + APIC_EXT_SWITCH
SERVICE_PEER_PORT_LOCAL = 'Eth%s/%s' % (APIC_EXT_MODULE, APIC_EXT_PORT)
SERVICE_PEER_PORT_DESC = ('topology/pod-1/paths-%s/pathep-[%s]' %
(APIC_EXT_SWITCH, SERVICE_PEER_PORT_LOCAL.lower()))
ETH0 = SERVICE_HOST_IFACE
LLDPCTL_RES = (
'lldp.' + ETH0 + '.via=LLDP\n'
'lldp.' + ETH0 + '.rid=1\n'
'lldp.' + ETH0 + '.age=0 day, 20:55:54\n'
'lldp.' + ETH0 + '.chassis.mac=' + SERVICE_HOST_MAC + '\n'
'lldp.' + ETH0 + '.chassis.name=' + SERVICE_PEER_CHASSIS_NAME + '\n'
'lldp.' + ETH0 + '.chassis.descr=' + SERVICE_PEER_CHASSIS + '\n'
'lldp.' + ETH0 + '.chassis.Bridge.enabled=on\n'
'lldp.' + ETH0 + '.chassis.Router.enabled=on\n'
'lldp.' + ETH0 + '.port.local=' + SERVICE_PEER_PORT_LOCAL + '\n'
'lldp.' + ETH0 + '.port.descr=' + SERVICE_PEER_PORT_DESC)
class TestCiscoApicTopologyAgent(base.BaseTestCase):
def setUp(self):
super(TestCiscoApicTopologyAgent, self).setUp()
# Configure the Cisco APIC mechanism driver
cfg.CONF.set_override('apic_host_uplink_ports',
APIC_UPLINK_PORTS, 'ml2_cisco_apic')
# Patch device_exists
self.dev_exists = mock.patch(DEV_EXISTS).start()
# Patch IPDevice
ipdev_c = mock.patch(IP_DEVICE).start()
self.ipdev = mock.Mock()
ipdev_c.return_value = self.ipdev
self.ipdev.link.address = SERVICE_HOST_MAC
# Patch execute
self.execute = mock.patch(EXECUTE).start()
self.execute.return_value = LLDPCTL_RES
# Patch tasks
self.periodic_task = mock.patch(PERIODIC_TASK).start()
self.agent = apic_topology.ApicTopologyAgent()
self.agent.host = SERVICE_HOST
self.agent.service_agent = mock.Mock()
self.agent.lldpcmd = LLDP_CMD
def test_init_host_device_exists(self):
self.agent.lldpcmd = None
self.dev_exists.return_value = True
self.agent.init_host()
self.assertEqual(LLDP_CMD + APIC_UPLINK_PORTS,
self.agent.lldpcmd)
def test_init_host_device_not_exist(self):
self.agent.lldpcmd = None
self.dev_exists.return_value = False
self.agent.init_host()
self.assertEqual(LLDP_CMD, self.agent.lldpcmd)
def test_get_peers(self):
self.agent.peers = {}
peers = self.agent._get_peers()
expected = [(SERVICE_HOST, SERVICE_HOST_IFACE,
SERVICE_HOST_MAC, APIC_EXT_SWITCH,
APIC_EXT_MODULE, APIC_EXT_PORT,
SERVICE_PEER_PORT_DESC)]
self.assertEqual(expected,
peers[SERVICE_HOST_IFACE])
def test_check_for_new_peers_no_peers(self):
self.agent.peers = {}
expected = (SERVICE_HOST, SERVICE_HOST_IFACE,
SERVICE_HOST_MAC, APIC_EXT_SWITCH,
APIC_EXT_MODULE, APIC_EXT_PORT,
SERVICE_PEER_PORT_DESC)
peers = {SERVICE_HOST_IFACE: [expected]}
context = mock.Mock()
with mock.patch.object(self.agent, '_get_peers',
return_value=peers):
self.agent._check_for_new_peers(context)
self.assertEqual(expected,
self.agent.peers[SERVICE_HOST_IFACE])
self.agent.service_agent.update_link.assert_called_once_with(
context, *expected)
def test_check_for_new_peers_with_peers(self):
expected = (SERVICE_HOST, SERVICE_HOST_IFACE,
SERVICE_HOST_MAC, APIC_EXT_SWITCH,
APIC_EXT_MODULE, APIC_EXT_PORT,
SERVICE_PEER_PORT_DESC)
peers = {SERVICE_HOST_IFACE: [expected]}
self.agent.peers = {SERVICE_HOST_IFACE:
[tuple(x + '1' for x in expected)]}
context = mock.Mock()
with mock.patch.object(self.agent, '_get_peers',
return_value=peers):
self.agent._check_for_new_peers(context)
self.agent.service_agent.update_link.assert_called_with(
context, *expected)

View File

@@ -0,0 +1,217 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron import context
from neutron.tests.unit import testlib_api
from oslo_db import exception as exc
from oslo_utils import importutils
from gbpservice.neutron.services.grouppolicy.drivers.cisco.apic import (
port_ha_ipaddress_binding as ha)
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class PortToHAIPAddressBindingTestCase(testlib_api.SqlTestCase):
def setUp(self):
super(PortToHAIPAddressBindingTestCase, self).setUp()
self.plugin = importutils.import_object(DB_PLUGIN_KLASS)
self.context = context.get_admin_context()
self.net1_data = {'network': {'id': 'fake-net1-id',
'name': 'net1',
'admin_state_up': True,
'tenant_id': 'test-tenant',
'shared': False}}
self.net2_data = {'network': {'id': 'fake-net2-id',
'name': 'net2',
'admin_state_up': True,
'tenant_id': 'test-tenant',
'shared': False}}
self.port1_data = {'port': {'id': 'fake-port1-id',
'name': 'port1',
'network_id': 'fake-net1-id',
'tenant_id': 'test-tenant',
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [],
'mac_address': 'fake-mac',
'admin_state_up': True}}
# Port that is in the same network as port_1
self.port1_2_data = {'port': {'id': 'fake-port1-2-id',
'name': 'port1',
'network_id': 'fake-net1-id',
'tenant_id': 'test-tenant',
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [],
'mac_address': 'fake-mac-2',
'admin_state_up': True}}
self.port2_data = {'port': {'id': 'fake-port2-id',
'name': 'port2',
'network_id': 'fake-net2-id',
'tenant_id': 'test-tenant',
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [],
'mac_address': 'fake-mac',
'admin_state_up': True}}
self.ha_ip1 = "ha-ip-1"
self.ha_ip2 = "ha-ip-2"
self.plugin.create_network(self.context, self.net1_data)
self.plugin.create_network(self.context, self.net2_data)
self.port1 = self.plugin.create_port(self.context, self.port1_data)
self.port1_2 = self.plugin.create_port(self.context, self.port1_2_data)
self.port2 = self.plugin.create_port(self.context, self.port2_data)
self.port_haip = ha.PortForHAIPAddress()
def test_set_and_get_port_to_ha_ip_binding(self):
# Test new HA IP address to port binding can be created
obj = self.port_haip.set_port_id_for_ha_ipaddress(
self.port1['id'], self.ha_ip1)
self.assertEqual(self.port1['id'], obj['port_id'])
self.assertEqual(self.ha_ip1, obj['ha_ip_address'])
# In this test case we also test that same HA IP address can be set/get
# for two different ports in different networks
obj = self.port_haip.set_port_id_for_ha_ipaddress(
self.port2['id'], self.ha_ip1)
self.assertEqual(self.port2['id'], obj['port_id'])
self.assertEqual(self.ha_ip1, obj['ha_ip_address'])
# Test get
obj = self.port_haip.get_port_for_ha_ipaddress(
self.ha_ip1, self.port1['network_id'])
self.assertEqual(self.port1['id'], obj['port_id'])
obj = self.port_haip.get_port_for_ha_ipaddress(
self.ha_ip1, self.port2['network_id'])
self.assertEqual(self.port2['id'], obj['port_id'])
def test_port_to_multiple_ha_ip_binding(self):
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip1)
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip2)
obj = self.port_haip.get_port_for_ha_ipaddress(
self.ha_ip1, self.port1['network_id'])
self.assertEqual(self.port1['id'], obj['port_id'])
obj = self.port_haip.get_port_for_ha_ipaddress(
self.ha_ip2, self.port1['network_id'])
self.assertEqual(self.port1['id'], obj['port_id'])
def test_delete_port_for_ha_ip_binding(self):
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip1)
result = self.port_haip.delete_port_id_for_ha_ipaddress(
self.port1['id'], self.ha_ip1)
self.assertEqual(1, result)
obj = self.port_haip.get_port_for_ha_ipaddress(
self.ha_ip1, self.port2['network_id'])
self.assertIsNone(obj)
def test_get_ha_ip_addresses_for_port(self):
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip1)
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip2)
ha_ips = self.port_haip.get_ha_ipaddresses_for_port(self.port1['id'])
self.assertEqual(sorted([self.ha_ip1, self.ha_ip2]), ha_ips)
def test_idempotent(self):
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip1)
obj = self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip1)
self.assertEqual(self.port1['id'], obj['port_id'])
self.assertEqual(self.ha_ip1, obj['ha_ip_address'])
def test_set_non_existing_port(self):
self.assertRaises(exc.DBReferenceError,
self.port_haip.set_port_id_for_ha_ipaddress,
"fake", self.ha_ip1)
def test_delete_non_existing_entry(self):
self.port_haip.set_port_id_for_ha_ipaddress(self.port1['id'],
self.ha_ip1)
result = self.port_haip.delete_port_id_for_ha_ipaddress(
self.port1['id'], "fake")
self.assertEqual(0, result)
result = self.port_haip.delete_port_id_for_ha_ipaddress("fake",
self.ha_ip1)
self.assertEqual(0, result)
def test_ip_owner_update(self):
mixin = ha.HAIPOwnerDbMixin()
mixin._get_plugin = mock.Mock(return_value=self.plugin)
ip_owner_info = {'port': self.port1['id'],
'ip_address_v4': self.ha_ip1}
# set new owner
ports = mixin.update_ip_owner(ip_owner_info)
obj = mixin.ha_ip_handler.get_port_for_ha_ipaddress(
self.ha_ip1, self.port1['network_id'])
self.assertEqual(self.port1['id'], obj['port_id'])
self.assertTrue(self.port1['id'] in ports)
# update owner
self.port2_data['port']['id'] = 'fake-port3-id'
self.port2_data['port']['network_id'] = self.port1['network_id']
self.port2_data['port']['mac_address'] = 'fake-mac-3'
port3 = self.plugin.create_port(self.context, self.port2_data)
ip_owner_info['port'] = port3['id']
ports = mixin.update_ip_owner(ip_owner_info)
obj = mixin.ha_ip_handler.get_port_for_ha_ipaddress(
self.ha_ip1, port3['network_id'])
self.assertEqual(port3['id'], obj['port_id'])
def test_ip_replaced(self):
mixin = ha.HAIPOwnerDbMixin()
mixin._get_plugin = mock.Mock(return_value=self.plugin)
ip_owner_info = {'port': self.port1['id'],
'ip_address_v4': self.ha_ip1}
mixin.update_ip_owner(ip_owner_info)
# Verify only one entry is there
dump = mixin.ha_ip_handler.get_ha_port_associations()
self.assertEqual(1, len(dump))
self.assertEqual(self.port1['id'], dump[0].port_id)
self.assertEqual(self.ha_ip1, dump[0].ha_ip_address)
# Now override with port1_2
ip_owner_info['port'] = self.port1_2['id']
mixin.update_ip_owner(ip_owner_info)
# Verify still one entry exists
dump = mixin.ha_ip_handler.get_ha_port_associations()
self.assertEqual(1, len(dump))
self.assertEqual(self.port1_2['id'], dump[0].port_id)
self.assertEqual(self.ha_ip1, dump[0].ha_ip_address)
# Override again, but with a different net_id to keep both records
ip_owner_info['port'] = self.port1['id']
ip_owner_info['network_id'] = 'new_net_id'
mixin.update_ip_owner(ip_owner_info)
# Verify still one entry exists
dump = mixin.ha_ip_handler.get_ha_port_associations()
self.assertEqual(2, len(dump))
def test_duplicate_entry_handled_gracefully(self):
self.port_haip.set_port_id_for_ha_ipaddress(
self.port1['id'], self.ha_ip1)
# Set this twice, without hijacking the query
obj = self.port_haip.set_port_id_for_ha_ipaddress(
self.port1['id'], self.ha_ip1)
self.assertEqual(obj.port_id, self.port1['id'])
self.assertEqual(obj.ha_ip_address, self.ha_ip1)
# Now simulate null return from query
self.port_haip._get_ha_ipaddress = mock.Mock(return_value=None)
obj = self.port_haip.set_port_id_for_ha_ipaddress(
self.port1['id'], self.ha_ip1)
self.assertIsNone(obj)

View File

@@ -40,6 +40,8 @@ scripts =
[entry_points]
console_scripts=
gbp-db-manage = gbpservice.neutron.db.migration.cli:main
neutron-cisco-apic-host-agent = gbpservice.neutron.agent.topology.apic_topology:agent_main
opflex-ns-proxy = apic_ml2.neutron.agent.metadata.namespace_proxy:main
neutron.core_plugins =
ml2plus = gbpservice.neutron.plugins.ml2plus.plugin:Ml2PlusPlugin
neutron.service_plugins =

View File

@@ -7,7 +7,6 @@
-e git+https://github.com/noironetworks/apicapi.git@master#egg=apicapi
-e git+https://github.com/noironetworks/python-opflex-agent.git@master#egg=python-opflexagent-agent
-e git+https://github.com/noironetworks/apic-ml2-driver.git@sumit/ocata#egg=apic_ml2
-e git+https://github.com/openstack/vmware-nsx.git@stable/ocata#egg=vmware_nsx
-e git+https://github.com/openstack/vmware-nsxlib.git@master#egg=vmware_nsxlib