Big Switch Networks code split

This commit removes all of the Big Switch Networks
code from the tree and replaces it with the import
statements required to pull in the entry points
from the new 'bsnstacklib' module.

All patches for BSN code should now be submitted
to the stackforge/networking-bigswitch project.

Partial-implements: blueprint core-vendor-decomposition
Change-Id: I4d83c9fd6a2953c329c247c78425f8e3280e8a42
changes/60/160360/5
Kevin Benton 8 years ago committed by Kevin Benton
parent aeae583fa5
commit b950be4391

@ -1,14 +0,0 @@
# Neuron REST Proxy Plug-in for Big Switch and FloodLight Controllers
This module provides a generic neutron plugin 'NeutronRestProxy' that
translates neutron function calls to authenticated REST requests (JSON supported)
to a set of redundant external network controllers.
It also keeps a local persistent store of neutron state that has been
setup using that API.
Currently the FloodLight Openflow Controller or the Big Switch Networks Controller
can be configured as external network controllers for this plugin.
For more details on this plugin, please refer to the following link:
http://www.openflowhub.org/display/floodlightcontroller/Neutron+REST+Proxy+Plugin

@ -15,152 +15,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import time
import eventlet
eventlet.monkey_patch()
from oslo_config import cfg
import oslo_messaging
from oslo_utils import excutils
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config
from neutron.common import topics
from neutron import context as q_context
from neutron.extensions import securitygroup as ext_sg
from neutron.i18n import _LE
from neutron.openstack.common import log
from neutron.plugins.bigswitch import config as pl_config
LOG = log.getLogger(__name__)
class IVSBridge(ovs_lib.OVSBridge):
'''
This class does not provide parity with OVS using IVS.
It's only the bare minimum necessary to use IVS with this agent.
'''
def run_vsctl(self, args, check_error=False):
full_args = ["ivs-ctl"] + args
try:
return utils.execute(full_args, run_as_root=True)
except Exception as e:
with excutils.save_and_reraise_exception() as ctxt:
LOG.error(_LE("Unable to execute %(cmd)s. "
"Exception: %(exception)s"),
{'cmd': full_args, 'exception': e})
if not check_error:
ctxt.reraise = False
def get_vif_port_set(self):
port_names = self.get_port_name_list()
edge_ports = set(port_names)
return edge_ports
def get_vif_port_by_id(self, port_id):
# IVS in nova uses hybrid method with last 14 chars of UUID
name = 'qvo%s' % port_id[:14]
if name in self.get_vif_port_set():
return name
return False
class RestProxyAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin):
target = oslo_messaging.Target(version='1.1')
def __init__(self, integ_br, polling_interval, vs='ovs'):
super(RestProxyAgent, self).__init__()
self.polling_interval = polling_interval
self._setup_rpc()
self.sg_agent = sg_rpc.SecurityGroupAgentRpc(self.context,
self.sg_plugin_rpc)
if vs == 'ivs':
self.int_br = IVSBridge(integ_br)
else:
self.int_br = ovs_lib.OVSBridge(integ_br)
def _setup_rpc(self):
self.topic = topics.AGENT
self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
self.sg_plugin_rpc = sg_rpc.SecurityGroupServerRpcApi(topics.PLUGIN)
self.context = q_context.get_admin_context_without_session()
self.endpoints = [self]
consumers = [[topics.PORT, topics.UPDATE],
[topics.SECURITY_GROUP, topics.UPDATE]]
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
def port_update(self, context, **kwargs):
LOG.debug("Port update received")
port = kwargs.get('port')
vif_port = self.int_br.get_vif_port_by_id(port['id'])
if not vif_port:
LOG.debug("Port %s is not present on this host.", port['id'])
return
LOG.debug("Port %s found. Refreshing firewall.", port['id'])
if ext_sg.SECURITYGROUPS in port:
self.sg_agent.refresh_firewall()
def _update_ports(self, registered_ports):
ports = self.int_br.get_vif_port_set()
if ports == registered_ports:
return
added = ports - registered_ports
removed = registered_ports - ports
return {'current': ports,
'added': added,
'removed': removed}
def _process_devices_filter(self, port_info):
if 'added' in port_info:
self.sg_agent.prepare_devices_filter(port_info['added'])
if 'removed' in port_info:
self.sg_agent.remove_devices_filter(port_info['removed'])
def daemon_loop(self):
ports = set()
while True:
start = time.time()
try:
port_info = self._update_ports(ports)
if port_info:
LOG.debug("Agent loop has new device")
self._process_devices_filter(port_info)
ports = port_info['current']
except Exception:
LOG.exception(_LE("Error in agent event loop"))
elapsed = max(time.time() - start, 0)
if (elapsed < self.polling_interval):
time.sleep(self.polling_interval - elapsed)
else:
LOG.debug("Loop iteration exceeded interval "
"(%(polling_interval)s vs. %(elapsed)s)!",
{'polling_interval': self.polling_interval,
'elapsed': elapsed})
def main():
config.init(sys.argv[1:])
config.setup_logging()
pl_config.register_config()
from bsnstacklib.plugins.bigswitch.agent import restproxy_agent
integ_br = cfg.CONF.RESTPROXYAGENT.integration_bridge
polling_interval = cfg.CONF.RESTPROXYAGENT.polling_interval
bsnagent = RestProxyAgent(integ_br, polling_interval,
cfg.CONF.RESTPROXYAGENT.virtual_switch_type)
bsnagent.daemon_loop()
sys.exit(0)
if __name__ == "__main__":
main()
restproxy_agent.main()

@ -1,116 +0,0 @@
# Copyright 2014 Big Switch Networks, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
This module manages configuration options
"""
from oslo_config import cfg
from neutron.common import utils
from neutron.extensions import portbindings
restproxy_opts = [
cfg.ListOpt('servers', default=['localhost:8800'],
help=_("A comma separated list of Big Switch or Floodlight "
"servers and port numbers. The plugin proxies the "
"requests to the Big Switch/Floodlight server, "
"which performs the networking configuration. Only one"
"server is needed per deployment, but you may wish to"
"deploy multiple servers to support failover.")),
cfg.StrOpt('server_auth', secret=True,
help=_("The username and password for authenticating against "
" the Big Switch or Floodlight controller.")),
cfg.BoolOpt('server_ssl', default=True,
help=_("If True, Use SSL when connecting to the Big Switch or "
"Floodlight controller.")),
cfg.BoolOpt('ssl_sticky', default=True,
help=_("Trust and store the first certificate received for "
"each controller address and use it to validate future "
"connections to that address.")),
cfg.BoolOpt('no_ssl_validation', default=False,
help=_("Disables SSL certificate validation for controllers")),
cfg.BoolOpt('cache_connections', default=True,
help=_("Re-use HTTP/HTTPS connections to the controller.")),
cfg.StrOpt('ssl_cert_directory',
default='/etc/neutron/plugins/bigswitch/ssl',
help=_("Directory containing ca_certs and host_certs "
"certificate directories.")),
cfg.BoolOpt('sync_data', default=False,
help=_("Sync data on connect")),
cfg.BoolOpt('auto_sync_on_failure', default=True,
help=_("If neutron fails to create a resource because "
"the backend controller doesn't know of a dependency, "
"the plugin automatically triggers a full data "
"synchronization to the controller.")),
cfg.IntOpt('consistency_interval', default=60,
help=_("Time between verifications that the backend controller "
"database is consistent with Neutron. (0 to disable)")),
cfg.IntOpt('server_timeout', default=10,
help=_("Maximum number of seconds to wait for proxy request "
"to connect and complete.")),
cfg.IntOpt('thread_pool_size', default=4,
help=_("Maximum number of threads to spawn to handle large "
"volumes of port creations.")),
cfg.StrOpt('neutron_id', default='neutron-' + utils.get_hostname(),
deprecated_name='quantum_id',
help=_("User defined identifier for this Neutron deployment")),
cfg.BoolOpt('add_meta_server_route', default=True,
help=_("Flag to decide if a route to the metadata server "
"should be injected into the VM")),
]
router_opts = [
cfg.MultiStrOpt('tenant_default_router_rule', default=['*:any:any:permit'],
help=_("The default router rules installed in new tenant "
"routers. Repeat the config option for each rule. "
"Format is <tenant>:<source>:<destination>:<action>"
" Use an * to specify default for all tenants.")),
cfg.IntOpt('max_router_rules', default=200,
help=_("Maximum number of router rules")),
]
nova_opts = [
cfg.StrOpt('vif_type', default='ovs',
help=_("Virtual interface type to configure on "
"Nova compute nodes")),
]
# Each VIF Type can have a list of nova host IDs that are fixed to that type
for i in portbindings.VIF_TYPES:
opt = cfg.ListOpt('node_override_vif_' + i, default=[],
help=_("Nova compute nodes to manually set VIF "
"type to %s") % i)
nova_opts.append(opt)
# Add the vif types for reference later
nova_opts.append(cfg.ListOpt('vif_types',
default=portbindings.VIF_TYPES,
help=_('List of allowed vif_type values.')))
agent_opts = [
cfg.StrOpt('integration_bridge', default='br-int',
help=_('Name of integration bridge on compute '
'nodes used for security group insertion.')),
cfg.IntOpt('polling_interval', default=5,
help=_('Seconds between agent checks for port changes')),
cfg.StrOpt('virtual_switch_type', default='ovs',
help=_('Virtual switch type.'))
]
def register_config():
cfg.CONF.register_opts(restproxy_opts, "RESTPROXY")
cfg.CONF.register_opts(router_opts, "ROUTER")
cfg.CONF.register_opts(nova_opts, "NOVA")
cfg.CONF.register_opts(agent_opts, "RESTPROXYAGENT")

@ -12,42 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import re
import string
import time
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import session
import sqlalchemy as sa
from neutron.db import model_base
from neutron.i18n import _LI, _LW
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# Maximum time in seconds to wait for a single record lock to be released
# NOTE: The total time waiting may exceed this if there are multiple servers
# waiting for the same lock
MAX_LOCK_WAIT_TIME = 15
def setup_db():
'''Helper to register models for unit tests'''
if HashHandler._FACADE is None:
HashHandler._FACADE = session.EngineFacade.from_config(
cfg.CONF, sqlite_fk=True)
ConsistencyHash.metadata.create_all(
HashHandler._FACADE.get_engine())
def clear_db():
'''Helper to unregister models and clear engine in unit tests'''
if not HashHandler._FACADE:
return
ConsistencyHash.metadata.drop_all(HashHandler._FACADE.get_engine())
HashHandler._FACADE = None
class ConsistencyHash(model_base.BASEV2):
@ -61,165 +28,3 @@ class ConsistencyHash(model_base.BASEV2):
hash_id = sa.Column(sa.String(255),
primary_key=True)
hash = sa.Column(sa.String(255), nullable=False)
class HashHandler(object):
'''
A wrapper object to keep track of the session between the read
and the update operations.
This class needs an SQL engine completely independent of the main
neutron connection so rollbacks from consistency hash operations don't
affect the parent sessions.
'''
_FACADE = None
def __init__(self, hash_id='1'):
if HashHandler._FACADE is None:
HashHandler._FACADE = session.EngineFacade.from_config(
cfg.CONF, sqlite_fk=True)
self.hash_id = hash_id
self.session = HashHandler._FACADE.get_session(autocommit=True,
expire_on_commit=False)
self.random_lock_id = ''.join(random.choice(string.ascii_uppercase
+ string.digits)
for _ in range(10))
self.lock_marker = 'LOCKED_BY[%s]' % self.random_lock_id
def _get_current_record(self):
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if res:
self.session.refresh(res) # make sure latest is loaded from db
return res
def _insert_empty_hash_with_lock(self):
# try to insert a new hash, return False on conflict
try:
with self.session.begin(subtransactions=True):
res = ConsistencyHash(hash_id=self.hash_id,
hash=self.lock_marker)
self.session.add(res)
return True
except db_exc.DBDuplicateEntry:
# another server created a new record at the same time
return False
def _optimistic_update_hash_record(self, old_record, new_hash):
# Optimistic update strategy. Returns True if successful, else False.
query = sa.update(ConsistencyHash.__table__).values(hash=new_hash)
query = query.where(ConsistencyHash.hash_id == old_record.hash_id)
query = query.where(ConsistencyHash.hash == old_record.hash)
with self._FACADE.get_engine().begin() as conn:
result = conn.execute(query)
# We need to check update row count in case another server is
# doing this at the same time. Only one will succeed, the other will
# not update any rows.
return result.rowcount != 0
def _get_lock_owner(self, record):
matches = re.findall(r"^LOCKED_BY\[(\w+)\]", record)
if not matches:
return None
return matches[0]
def read_for_update(self):
# An optimistic locking strategy with a timeout to avoid using a
# consistency hash while another server is using it. This will
# not return until a lock is acquired either normally or by stealing
# it after an individual ID holds it for greater than
# MAX_LOCK_WAIT_TIME.
lock_wait_start = None
last_lock_owner = None
while True:
res = self._get_current_record()
if not res:
# no current entry. try to insert to grab lock
if not self._insert_empty_hash_with_lock():
# A failed insert after missing current record means
# a concurrent insert occurred. Start process over to
# find the new record.
LOG.debug("Concurrent record inserted. Retrying.")
time.sleep(0.25)
continue
# The empty hash was successfully inserted with our lock
return ''
current_lock_owner = self._get_lock_owner(res.hash)
if not current_lock_owner:
# no current lock. attempt to lock
new = self.lock_marker + res.hash
if not self._optimistic_update_hash_record(res, new):
# someone else beat us to it. restart process to wait
# for new lock ID to be removed
LOG.debug(
"Failed to acquire lock. Restarting lock wait. "
"Previous hash: %(prev)s. Attempted update: %(new)s",
{'prev': res.hash, 'new': new})
time.sleep(0.25)
continue
# successfully got the lock
return res.hash
LOG.debug("This request's lock ID is %(this)s. "
"DB lock held by %(that)s",
{'this': self.random_lock_id,
'that': current_lock_owner})
if current_lock_owner == self.random_lock_id:
# no change needed, we already have the table lock due to
# previous read_for_update call.
# return hash with lock tag stripped off for use in a header
return res.hash.replace(self.lock_marker, '')
if current_lock_owner != last_lock_owner:
# The owner changed since the last iteration, but it
# wasn't to us. Reset the counter. Log if not
# first iteration.
if lock_wait_start:
LOG.debug("Lock owner changed from %(old)s to %(new)s "
"while waiting to acquire it.",
{'old': last_lock_owner,
'new': current_lock_owner})
lock_wait_start = time.time()
last_lock_owner = current_lock_owner
if time.time() - lock_wait_start > MAX_LOCK_WAIT_TIME:
# the lock has been held too long, steal it
LOG.warning(_LW("Gave up waiting for consistency DB "
"lock, trying to take it. "
"Current hash is: %s"), res.hash)
new_db_value = res.hash.replace(current_lock_owner,
self.random_lock_id)
if self._optimistic_update_hash_record(res, new_db_value):
return res.hash.replace(new_db_value, '')
LOG.info(_LI("Failed to take lock. Another process updated "
"the DB first."))
def clear_lock(self):
LOG.debug("Clearing hash record lock of id %s", self.random_lock_id)
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if not res:
LOG.warning(_LW("Hash record already gone, no lock to clear."))
return
if not res.hash.startswith(self.lock_marker):
# if these are frequent the server is too slow
LOG.warning(_LW("Another server already removed the lock. %s"),
res.hash)
return
res.hash = res.hash.replace(self.lock_marker, '')
def put_hash(self, hash):
hash = hash or ''
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if res:
res.hash = hash
else:
conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
self.session.merge(conhash)
LOG.debug("Consistency hash for group %(hash_id)s updated "
"to %(hash)s", {'hash_id': self.hash_id, 'hash': hash})

@ -1,52 +0,0 @@
# Copyright 2013, Big Switch Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes
from neutron.i18n import _LW
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_port_hostid(context, port_id):
# REVISIT(kevinbenton): this is a workaround to avoid portbindings_db
# relational table generation until one of the functions is called.
from neutron.db import portbindings_db
with context.session.begin(subtransactions=True):
query = context.session.query(portbindings_db.PortBindingPort)
res = query.filter_by(port_id=port_id).first()
if not res:
return False
return res.host
def put_port_hostid(context, port_id, host):
# REVISIT(kevinbenton): this is a workaround to avoid portbindings_db
# relational table generation until one of the functions is called.
from neutron.db import portbindings_db
if not attributes.is_attr_set(host):
LOG.warning(_LW("No host_id in port request to track port location."))
return
if port_id == '':
LOG.warning(_LW("Received an empty port ID for host_id '%s'"), host)
return
if host == '':
LOG.debug("Received an empty host_id for port '%s'", port_id)
return
LOG.debug("Logging port %(port)s on host_id %(host)s",
{'port': port_id, 'host': host})
with context.session.begin(subtransactions=True):
location = portbindings_db.PortBindingPort(port_id=port_id, host=host)
context.session.merge(location)

@ -1,140 +0,0 @@
# Copyright 2013 Big Switch Networks, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as nexception
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# Router Rules Exceptions
class InvalidRouterRules(nexception.InvalidInput):
message = _("Invalid format for router rules: %(rule)s, %(reason)s")
class RulesExhausted(nexception.BadRequest):
message = _("Unable to complete rules update for %(router_id)s. "
"The number of rules exceeds the maximum %(quota)s.")
def convert_to_valid_router_rules(data):
"""
Validates and converts router rules to the appropriate data structure
Example argument = [{'source': 'any', 'destination': 'any',
'action':'deny'},
{'source': '1.1.1.1/32', 'destination': 'external',
'action':'permit',
'nexthops': ['1.1.1.254', '1.1.1.253']}
]
"""
V4ANY = '0.0.0.0/0'
CIDRALL = ['any', 'external']
if not isinstance(data, list):
emsg = _("Invalid data format for router rule: '%s'") % data
LOG.debug(emsg)
raise nexception.InvalidInput(error_message=emsg)
_validate_uniquerules(data)
rules = []
expected_keys = ['source', 'destination', 'action']
for rule in data:
rule['nexthops'] = rule.get('nexthops', [])
if not isinstance(rule['nexthops'], list):
rule['nexthops'] = rule['nexthops'].split('+')
src = V4ANY if rule['source'] in CIDRALL else rule['source']
dst = V4ANY if rule['destination'] in CIDRALL else rule['destination']
errors = [attr._verify_dict_keys(expected_keys, rule, False),
attr._validate_subnet(dst),
attr._validate_subnet(src),
_validate_nexthops(rule['nexthops']),
_validate_action(rule['action'])]
errors = [m for m in errors if m]
if errors:
LOG.debug(errors)
raise nexception.InvalidInput(error_message=errors)
rules.append(rule)
return rules
def _validate_nexthops(nexthops):
seen = []
for ip in nexthops:
msg = attr._validate_ip_address(ip)
if ip in seen:
msg = _("Duplicate nexthop in rule '%s'") % ip
seen.append(ip)
if msg:
return msg
def _validate_action(action):
if action not in ['permit', 'deny']:
return _("Action must be either permit or deny."
" '%s' was provided") % action
def _validate_uniquerules(rules):
pairs = []
for r in rules:
if 'source' not in r or 'destination' not in r:
continue
pairs.append((r['source'], r['destination']))
if len(set(pairs)) != len(pairs):
error = _("Duplicate router rules (src,dst) found '%s'") % pairs
LOG.debug(error)
raise nexception.InvalidInput(error_message=error)
class Routerrule(object):
@classmethod
def get_name(cls):
return "Neutron Router Rule"
@classmethod
def get_alias(cls):
return "router_rules"
@classmethod
def get_description(cls):
return "Router rule configuration for L3 router"
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/neutron/routerrules/api/v1.0"
@classmethod
def get_updated(cls):
return "2013-05-23T10:00:00-00:00"
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}
# Attribute Map
EXTENDED_ATTRIBUTES_2_0 = {
'routers': {
'router_rules': {'allow_post': False, 'allow_put': True,
'convert_to': convert_to_valid_router_rules,
'is_visible': True,
'default': attr.ATTR_NOT_SPECIFIED},
}
}

@ -22,285 +22,7 @@ It is intended to be used in conjunction with the Big Switch ML2 driver or the
Big Switch core plugin.
"""
from oslo_config import cfg
from oslo_utils import excutils
from bsnstacklib.plugins.bigswitch import l3_router_plugin
from neutron.api import extensions as neutron_extensions
from neutron.common import exceptions
from neutron.common import log
from neutron.db import l3_db
from neutron.extensions import l3
from neutron.i18n import _LE
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch import extensions
from neutron.plugins.bigswitch import plugin as cplugin
from neutron.plugins.bigswitch import routerrule_db
from neutron.plugins.bigswitch import servermanager
from neutron.plugins.common import constants
# number of fields in a router rule string
ROUTER_RULE_COMPONENT_COUNT = 5
LOG = logging.getLogger(__name__)
put_context_in_serverpool = cplugin.put_context_in_serverpool
class L3RestProxy(cplugin.NeutronRestProxyV2Base,
routerrule_db.RouterRule_db_mixin):
supported_extension_aliases = ["router", "router_rules"]
@staticmethod
def get_plugin_type():
return constants.L3_ROUTER_NAT
@staticmethod
def get_plugin_description():
return _("L3 Router Service Plugin for Big Switch fabric")
def __init__(self):
# Include the Big Switch Extensions path in the api_extensions
neutron_extensions.append_api_extensions_path(extensions.__path__)
super(L3RestProxy, self).__init__()
self.servers = servermanager.ServerPool.get_instance()
@put_context_in_serverpool
@log.log
def create_router(self, context, router):
self._warn_on_state_status(router['router'])
tenant_id = self._get_tenant_id_for_create(context, router["router"])
# set default router rules
rules = self._get_tenant_default_router_rules(tenant_id)
router['router']['router_rules'] = rules
with context.session.begin(subtransactions=True):
# create router in DB
new_router = super(L3RestProxy, self).create_router(context,
router)
mapped_router = self._map_state_and_status(new_router)
self.servers.rest_create_router(tenant_id, mapped_router)
# return created router
return new_router
@put_context_in_serverpool
@log.log
def update_router(self, context, router_id, router):
self._warn_on_state_status(router['router'])
orig_router = super(L3RestProxy, self).get_router(context, router_id)
tenant_id = orig_router["tenant_id"]
with context.session.begin(subtransactions=True):
new_router = super(L3RestProxy,
self).update_router(context, router_id, router)
router = self._map_state_and_status(new_router)
# look up the network on this side to save an expensive query on
# the backend controller.
if router and router.get('external_gateway_info'):
router['external_gateway_info']['network'] = self.get_network(
context.elevated(),
router['external_gateway_info']['network_id'])
# update router on network controller
self.servers.rest_update_router(tenant_id, router, router_id)
# return updated router
return new_router
@put_context_in_serverpool
@log.log
def delete_router(self, context, router_id):
with context.session.begin(subtransactions=True):
orig_router = self._get_router(context, router_id)
tenant_id = orig_router["tenant_id"]
# Ensure that the router is not used
router_filter = {'router_id': [router_id]}
fips = self.get_floatingips_count(context.elevated(),
filters=router_filter)
if fips:
raise l3.RouterInUse(router_id=router_id)
device_owner = l3_db.DEVICE_OWNER_ROUTER_INTF
device_filter = {'device_id': [router_id],
'device_owner': [device_owner]}
ports = self.get_ports_count(context.elevated(),
filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=router_id)
super(L3RestProxy, self).delete_router(context, router_id)
# delete from network controller
self.servers.rest_delete_router(tenant_id, router_id)
@put_context_in_serverpool
@log.log
def add_router_interface(self, context, router_id, interface_info):
# Validate args
router = self._get_router(context, router_id)
tenant_id = router['tenant_id']
with context.session.begin(subtransactions=True):
# create interface in DB
new_intf_info = super(L3RestProxy,
self).add_router_interface(context,
router_id,
interface_info)
port = self._get_port(context, new_intf_info['port_id'])
net_id = port['network_id']
subnet_id = new_intf_info['subnet_id']
# we will use the port's network id as interface's id
interface_id = net_id
intf_details = self._get_router_intf_details(context,
interface_id,
subnet_id)
# create interface on the network controller
self.servers.rest_add_router_interface(tenant_id, router_id,
intf_details)
return new_intf_info
@put_context_in_serverpool
@log.log
def remove_router_interface(self, context, router_id, interface_info):
# Validate args
router = self._get_router(context, router_id)
tenant_id = router['tenant_id']
# we will first get the interface identifier before deleting in the DB
if not interface_info:
msg = _("Either subnet_id or port_id must be specified")
raise exceptions.BadRequest(resource='router', msg=msg)
if 'port_id' in interface_info:
port = self._get_port(context, interface_info['port_id'])
interface_id = port['network_id']
elif 'subnet_id' in interface_info:
subnet = self._get_subnet(context, interface_info['subnet_id'])
interface_id = subnet['network_id']
else:
msg = _("Either subnet_id or port_id must be specified")
raise exceptions.BadRequest(resource='router', msg=msg)
with context.session.begin(subtransactions=True):
# remove router in DB
del_ret = super(L3RestProxy,
self).remove_router_interface(context,
router_id,
interface_info)
# create router on the network controller
self.servers.rest_remove_router_interface(tenant_id, router_id,
interface_id)
return del_ret
@put_context_in_serverpool
@log.log
def create_floatingip(self, context, floatingip):
with context.session.begin(subtransactions=True):
# create floatingip in DB
new_fl_ip = super(L3RestProxy,
self).create_floatingip(context, floatingip)
# create floatingip on the network controller
try:
if 'floatingip' in self.servers.get_capabilities():
self.servers.rest_create_floatingip(
new_fl_ip['tenant_id'], new_fl_ip)
else:
self._send_floatingip_update(context)
except servermanager.RemoteRestError as e:
with excutils.save_and_reraise_exception():
LOG.error(
_LE("NeutronRestProxyV2: Unable to create remote "
"floating IP: %s"), e)
# return created floating IP
return new_fl_ip
@put_context_in_serverpool
@log.log
def update_floatingip(self, context, id, floatingip):
with context.session.begin(subtransactions=True):
# update floatingip in DB
new_fl_ip = super(L3RestProxy,
self).update_floatingip(context, id, floatingip)
# update network on network controller
if 'floatingip' in self.servers.get_capabilities():
self.servers.rest_update_floatingip(new_fl_ip['tenant_id'],
new_fl_ip, id)
else:
self._send_floatingip_update(context)
return new_fl_ip
@put_context_in_serverpool
@log.log
def delete_floatingip(self, context, id):
with context.session.begin(subtransactions=True):
# delete floating IP in DB
old_fip = super(L3RestProxy, self).get_floatingip(context, id)
super(L3RestProxy, self).delete_floatingip(context, id)
# update network on network controller
if 'floatingip' in self.servers.get_capabilities():
self.servers.rest_delete_floatingip(old_fip['tenant_id'], id)
else:
self._send_floatingip_update(context)
@put_context_in_serverpool
@log.log
def disassociate_floatingips(self, context, port_id, do_notify=True):
router_ids = super(L3RestProxy, self).disassociate_floatingips(
context, port_id, do_notify=do_notify)
self._send_floatingip_update(context)
return router_ids
# overriding method from l3_db as original method calls
# self.delete_floatingip() which in turn calls self.delete_port() which
# is locked with 'bsn-port-barrier'
@put_context_in_serverpool
def delete_disassociated_floatingips(self, context, network_id):
query = self._model_query(context, l3_db.FloatingIP)
query = query.filter_by(floating_network_id=network_id,
fixed_port_id=None,
router_id=None)
for fip in query:
context.session.delete(fip)
self._delete_port(context.elevated(), fip['floating_port_id'])
def _send_floatingip_update(self, context):
try:
ext_net_id = self.get_external_network_id(context)
if ext_net_id:
# Use the elevated state of the context for the ext_net query
admin_context = context.elevated()
ext_net = super(L3RestProxy,
self).get_network(admin_context, ext_net_id)
# update external network on network controller
self._send_update_network(ext_net, admin_context)
except exceptions.TooManyExternalNetworks:
# get_external_network can raise errors when multiple external
# networks are detected, which isn't supported by the Plugin
LOG.error(_LE("NeutronRestProxyV2: too many external networks"))
def _get_tenant_default_router_rules(self, tenant):
rules = cfg.CONF.ROUTER.tenant_default_router_rule
default_set = []
tenant_set = []
for rule in rules:
items = rule.split(':')
# put an empty string on the end if nexthops wasn't specified
if len(items) < ROUTER_RULE_COMPONENT_COUNT:
items.append('')
try:
(tenant_id, source, destination, action, nexthops) = items
except ValueError:
continue
parsed_rule = {'source': source,
'destination': destination, 'action': action,
'nexthops': [hop for hop in nexthops.split(',')
if hop]}
if tenant_id == '*':
default_set.append(parsed_rule)
if tenant_id == tenant:
tenant_set.append(parsed_rule)
return tenant_set if tenant_set else default_set
L3RestProxy = l3_router_plugin.L3RestProxy

@ -16,880 +16,9 @@
"""
Neutron REST Proxy Plug-in for Big Switch and FloodLight Controllers.
NeutronRestProxy provides a generic neutron plugin that translates all plugin
function calls to equivalent authenticated REST calls to a set of redundant
external network controllers. It also keeps persistent store for all neutron
state to allow for re-sync of the external controller(s), if required.
The local state on the plugin also allows for local response and fast-fail
semantics where it can be determined based on the local persistent store.
Network controller specific code is decoupled from this plugin and expected
to reside on the controller itself (via the REST interface).
This allows for:
- independent authentication and redundancy schemes between neutron and the
network controller
- independent upgrade/development cycles between neutron and the controller
as it limits the proxy code upgrade requirement to neutron release cycle
and the controller specific code upgrade requirement to controller code
- ability to sync the controller with neutron for independent recovery/reset
External REST API used by proxy is the same API as defined for neutron (JSON
subset) with some additional parameters (gateway on network-create and macaddr
on port-attach) on an additional PUT to do a bulk dump of all persistent data.
See http://github.com/stackforge/networking-bigswitch for more information
"""
from bsnstacklib.plugins.bigswitch import plugin
import copy
import functools
import httplib
import re
import eventlet
from oslo_config import cfg
import oslo_messaging
from oslo_utils import importutils
from sqlalchemy.orm import exc as sqlexc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
from neutron.common import constants as const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import context as qcontext
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import external_net_db
from neutron.db import extradhcpopt_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import securitygroups_db as sg_db
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import external_net
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import portbindings
from neutron import manager
from neutron.i18n import _LE, _LI, _LW
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch import config as pl_config
from neutron.plugins.bigswitch.db import porttracker_db
from neutron.plugins.bigswitch import extensions
from neutron.plugins.bigswitch import servermanager
from neutron.plugins.bigswitch import version
from neutron.plugins.common import constants as pconst
LOG = logging.getLogger(__name__)
SYNTAX_ERROR_MESSAGE = _('Syntax error in server config file, aborting plugin')
METADATA_SERVER_IP = '169.254.169.254'
class AgentNotifierApi(sg_rpc.SecurityGroupAgentRpcApiMixin):
def __init__(self, topic):
self.topic = topic
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def port_update(self, context, port):
topic_port_update = topics.get_topic_name(self.client.target.topic,
topics.PORT, topics.UPDATE)
cctxt = self.client.prepare(fanout=True, topic=topic_port_update)
cctxt.cast(context, 'port_update', port=port)
class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
def get_port_from_device(self, device):
port_id = re.sub(r"^%s" % const.TAP_DEVICE_PREFIX, "", device)
port = self.get_port_and_sgs(port_id)
if port:
port['device'] = device
return port
def get_port_and_sgs(self, port_id):
"""Get port from database with security group info."""
LOG.debug("get_port_and_sgs() called for port_id %s", port_id)
session = db.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
with session.begin(subtransactions=True):
query = session.query(
models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id
)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id.startswith(port_id))
port_and_sgs = query.all()
if not port_and_sgs:
return
port = port_and_sgs[0][0]
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict['security_groups'] = [
sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict
class NeutronRestProxyV2Base(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin):
supported_extension_aliases = ["binding"]
servers = None
@property
def l3_plugin(self):
return manager.NeutronManager.get_service_plugins().get(
pconst.L3_ROUTER_NAT)
def _get_all_data(self, get_ports=True, get_floating_ips=True,
get_routers=True):
admin_context = qcontext.get_admin_context()
networks = []
# this method is used by the ML2 driver so it can't directly invoke
# the self.get_(ports|networks) methods
plugin = manager.NeutronManager.get_plugin()
all_networks = plugin.get_networks(admin_context) or []
for net in all_networks:
mapped_network = self._get_mapped_network_with_subnets(net)
flips_n_ports = mapped_network
if get_floating_ips:
flips_n_ports = self._get_network_with_floatingips(
mapped_network)
if get_ports:
ports = []
net_filter = {'network_id': [net.get('id')]}
net_ports = plugin.get_ports(admin_context,
filters=net_filter) or []
for port in net_ports:
mapped_port = self._map_state_and_status(port)
mapped_port['attachment'] = {
'id': port.get('device_id'),
'mac': port.get('mac_address'),
}
mapped_port = self._extend_port_dict_binding(admin_context,
mapped_port)
ports.append(mapped_port)
flips_n_ports['ports'] = ports
if flips_n_ports:
networks.append(flips_n_ports)
data = {'networks': networks}
if get_routers and self.l3_plugin:
routers = []
all_routers = self.l3_plugin.get_routers(admin_context) or []
for router in all_routers:
interfaces = []
mapped_router = self._map_state_and_status(router)
router_filter = {
'device_owner': [const.DEVICE_OWNER_ROUTER_INTF],
'device_id': [router.get('id')]
}
router_ports = self.get_ports(admin_context,
filters=router_filter) or []
for port in router_ports:
net_id = port.get('network_id')
subnet_id = port['fixed_ips'][0]['subnet_id']
intf_details = self._get_router_intf_details(admin_context,
net_id,
subnet_id)
interfaces.append(intf_details)
mapped_router['interfaces'] = interfaces
routers.append(mapped_router)
data.update({'routers': routers})
return data
def _send_all_data(self, send_ports=True, send_floating_ips=True,
send_routers=True, timeout=None,
triggered_by_tenant=None):
"""Pushes all data to network ctrl (networks/ports, ports/attachments).
This gives the controller an option to re-sync it's persistent store
with neutron's current view of that data.
"""
data = self._get_all_data(send_ports, send_floating_ips, send_routers)
data['triggered_by_tenant'] = triggered_by_tenant
errstr = _("Unable to update remote topology: %s")
return self.servers.rest_action('PUT', servermanager.TOPOLOGY_PATH,
data, errstr, timeout=timeout)
def _get_network_with_floatingips(self, network, context=None):
if context is None:
context = qcontext.get_admin_context()
net_id = network['id']
net_filter = {'floating_network_id': [net_id]}
if self.l3_plugin:
fl_ips = self.l3_plugin.get_floatingips(context,
filters=net_filter) or []
network['floatingips'] = fl_ips
return network
def _get_all_subnets_json_for_network(self, net_id, context=None):
if context is None:
context = qcontext.get_admin_context()
# start a sub-transaction to avoid breaking parent transactions
with context.session.begin(subtransactions=True):
subnets = self._get_subnets_by_network(context,
net_id)
subnets_details = []
if subnets:
for subnet in subnets:
subnet_dict = self._make_subnet_dict(subnet)
mapped_subnet = self._map_state_and_status(subnet_dict)
subnets_details.append(mapped_subnet)
return subnets_details
def _get_mapped_network_with_subnets(self, network, context=None):
# if context is not provided, admin context is used
if context is None:
context = qcontext.get_admin_context()
network = self._map_state_and_status(network)
subnets = self._get_all_subnets_json_for_network(network['id'],
context)
network['subnets'] = subnets
for subnet in (subnets or []):
if subnet['gateway_ip']:
# FIX: For backward compatibility with wire protocol
network['gateway'] = subnet['gateway_ip']
break
else:
network['gateway'] = ''
network[external_net.EXTERNAL] = self._network_is_external(
context, network['id'])
# include ML2 segmentation types
network['segmentation_types'] = getattr(self, "segmentation_types", "")
return network
def _send_create_network(self, network, context=None):
tenant_id = network['tenant_id']
mapped_network = self._get_mapped_network_with_subnets(network,
context)
self.servers.rest_create_network(tenant_id, mapped_network)
def _send_update_network(self, network, context=None):
net_id = network['id']
tenant_id = network['tenant_id']
mapped_network = self._get_mapped_network_with_subnets(network,
context)
net_fl_ips = self._get_network_with_floatingips(mapped_network,
context)
self.servers.rest_update_network(tenant_id, net_id, net_fl_ips)
def _send_delete_network(self, network, context=None):
net_id = network['id']
tenant_id = network['tenant_id']
self.servers.rest_delete_network(tenant_id, net_id)
def _map_state_and_status(self, resource):
resource = copy.copy(resource)
resource['state'] = ('UP' if resource.pop('admin_state_up',
True) else 'DOWN')
resource.pop('status', None)
return resource
def _warn_on_state_status(self, resource):
if resource.get('admin_state_up', True) is False:
LOG.warning(_LW("Setting admin_state_up=False is not supported "
"in this plugin version. Ignoring setting for "
"resource: %s"), resource)
if 'status' in resource:
if resource['status'] != const.NET_STATUS_ACTIVE:
LOG.warning(_LW("Operational status is internally set by the "
"plugin. Ignoring setting status=%s."),
resource['status'])
def _get_router_intf_details(self, context, intf_id, subnet_id):
# we will use the network id as interface's id
net_id = intf_id
network = self.get_network(context, net_id)
subnet = self.get_subnet(context, subnet_id)
mapped_network = self._get_mapped_network_with_subnets(network)
mapped_subnet = self._map_state_and_status(subnet)
data = {
'id': intf_id,
"network": mapped_network,
"subnet": mapped_subnet
}
return data
def _extend_port_dict_binding(self, context, port):
cfg_vif_type = cfg.CONF.NOVA.vif_type.lower()
if cfg_vif_type not in (portbindings.VIF_TYPE_OVS,
portbindings.VIF_TYPE_IVS):
LOG.warning(_LW("Unrecognized vif_type in configuration "
"[%s]. Defaulting to ovs."),
cfg_vif_type)
cfg_vif_type = portbindings.VIF_TYPE_OVS
# In ML2, the host_id is already populated
if portbindings.HOST_ID in port:
hostid = port[portbindings.HOST_ID]
elif 'id' in port:
hostid = porttracker_db.get_port_hostid(context, port['id'])
else:
hostid = None
if hostid:
port[portbindings.HOST_ID] = hostid
override = self._check_hostvif_override(hostid)
if override:
cfg_vif_type = override
port[portbindings.VIF_TYPE] = cfg_vif_type
sg_enabled = sg_rpc.is_firewall_enabled()
port[portbindings.VIF_DETAILS] = {
# TODO(rkukura): Replace with new VIF security details
portbindings.CAP_PORT_FILTER:
'security-group' in self.supported_extension_aliases,
portbindings.OVS_HYBRID_PLUG: sg_enabled
}
return port
def _check_hostvif_override(self, hostid):
for v in cfg.CONF.NOVA.vif_types:
if hostid in getattr(cfg.CONF.NOVA, "node_override_vif_" + v, []):
return v
return False
def _get_port_net_tenantid(self, context, port):
net = super(NeutronRestProxyV2Base,
self).get_network(context, port["network_id"])
return net['tenant_id']
def async_port_create(self, tenant_id, net_id, port):
try:
self.servers.rest_create_port(tenant_id, net_id, port)
except servermanager.RemoteRestError as e:
# 404 should never be received on a port create unless
# there are inconsistencies between the data in neutron
# and the data in the backend.
# Run a sync to get it consistent.
if (cfg.CONF.RESTPROXY.auto_sync_on_failure and
e.status == httplib.NOT_FOUND and
servermanager.NXNETWORK in e.reason):
LOG.error(_LE("Iconsistency with backend controller "
"triggering full synchronization."))
# args depend on if we are operating in ML2 driver
# or as the full plugin
topoargs = self.servers.get_topo_function_args
self._send_all_data(
send_ports=topoargs['get_ports'],
send_floating_ips=topoargs['get_floating_ips'],
send_routers=topoargs['get_routers'],
triggered_by_tenant=tenant_id
)
# If the full sync worked, the port will be created
# on the controller so it can be safely marked as active
else:
# Any errors that don't result in a successful auto-sync
# require that the port be placed into the error state.
LOG.error(
_LE("NeutronRestProxyV2: Unable to create port: %s"), e)
try:
self._set_port_status(port['id'], const.PORT_STATUS_ERROR)
except exceptions.PortNotFound:
# If port is already gone from DB and there was an error
# creating on the backend, everything is already consistent
pass
return
new_status = (const.PORT_STATUS_ACTIVE if port['state'] == 'UP'
else const.PORT_STATUS_DOWN)
try:
self._set_port_status(port['id'], new_status)
except exceptions.PortNotFound:
# This port was deleted before the create made it to the controller
# so it now needs to be deleted since the normal delete request
# would have deleted an non-existent port.
self.servers.rest_delete_port(tenant_id, net_id, port['id'])
# NOTE(kevinbenton): workaround for eventlet/mysql deadlock
@utils.synchronized('bsn-port-barrier')
def _set_port_status(self, port_id, status):
session = db.get_session()
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
port['status'] = status
session.flush()
except sqlexc.NoResultFound:
raise exceptions.PortNotFound(port_id=port_id)
def put_context_in_serverpool(f):
@functools.wraps(f)
def wrapper(self, context, *args, **kwargs):
# core plugin: context is top level object
# ml2: keeps context in _plugin_context
self.servers.set_context(getattr(context, '_plugin_context', context))
return f(self, context, *args, **kwargs)
return wrapper
class NeutronRestProxyV2(NeutronRestProxyV2Base,
addr_pair_db.AllowedAddressPairsMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
SecurityGroupServerRpcMixin):
_supported_extension_aliases = ["external-net", "binding",
"extra_dhcp_opt", "quotas",
"dhcp_agent_scheduler", "agent",