Implement /NetworkDriver.Join

This patch implements /NetworkDriver.Join, which goes through the
following steps:

1. Find the Neutron port associated with the given EndpointID
2. Create the veth pair based on the info of the retrieved port
3. Bind the port to the veth endpoint
4. Construct the response with the port and return it

In the process 3, Kuryr executes the binding script specified in the
binding:vif_type attribute of the retrieved Neutron port. Although
Neutron plugin can put arbitrary types in that attribute, if the
attribute was not specified it defaults to "unbound" and Kuryr invokes
"unbound" executable, which always fails with the status code 1.

This patch also includes the unit tests cover the successful case and
the failures.

Change-Id: Id3a8288199975d86812c7c1d210c7e11ae58d7b8
Signed-off-by: Taku Fukushima <f.tac.mac@gmail.com>
This commit is contained in:
Taku Fukushima 2015-09-26 02:10:37 +09:00
parent 8231541b01
commit 61a0350695
11 changed files with 396 additions and 12 deletions

124
kuryr/binding.py Normal file
View File

@ -0,0 +1,124 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import netaddr
from oslo_concurrency import processutils
from oslo_utils import excutils
import pyroute2
from kuryr.common import config
from kuryr.common import exceptions
CONTAINER_VETH_POSTFIX = '_c'
BINDING_SUBCOMMAND = 'bind'
DOWN = 'DOWN'
FALLBACK_VIF_TYPE = 'unbound'
FIXED_IP_KEY = 'fixed_ips'
IFF_UP = 0x1 # The last bit represents if the interface is up
IP_ADDRESS_KEY = 'ip_address'
KIND_VETH = 'veth'
MAC_ADDRESS_KEY = 'mac_address'
SUBNET_ID_KEY = 'subnet_id'
VETH_POSTFIX = '-veth'
VIF_TYPE_KEY = 'binding:vif_type'
def _is_up(interface):
flags = interface['flags']
if not flags:
return False
return (flags & IFF_UP) == 1
def cleanup_veth(ifname):
"""Cleans the veth passed as an argument up.
:param ifname: the name of the veth endpoint
:returns: the index of the interface which name is the given ifname if it
exists, otherwise None
:raises: pyroute2.netlink.NetlinkError
"""
ipr = pyroute2.IPRoute()
veths = ipr.link_lookup(ifname=ifname)
if veths:
host_veth_index = veths[0]
ipr.link_remove(host_veth_index)
return host_veth_index
else:
return None
def port_bind(endpoint_id, neutron_port, neutron_subnets):
"""Binds the Neutorn port to the network interface on the host.
:param endpoint_id: the ID of the endpoint as string
:param neutron_port: a port dictionary returned from
python-neutronclient
:param neutron_subnets: a list of all subnets under network to which this
endpoint is trying to join
:returns: the tuple of the names of the veth pair and the tuple of stdout
and stderr returned by processutils.execute invoked with the
executable script for binding
:raises: kuryr.common.exceptions.VethCreationFailure,
processutils.ProcessExecutionError
"""
# NOTE(tfukushima): pyroute2.ipdb requires Linux to be imported. So I don't
# import it in the module scope but here.
import pyroute2.ipdb
ifname = endpoint_id[:8] + VETH_POSTFIX
peer_name = ifname + CONTAINER_VETH_POSTFIX
subnets_dict = {subnet['id']: subnet for subnet in neutron_subnets}
ip = pyroute2.IPDB()
try:
with ip.create(ifname=ifname, kind=KIND_VETH,
reuse=True, peer=peer_name) as host_veth:
if not _is_up(host_veth):
host_veth.up()
with ip.interfaces[peer_name] as peer_veth:
fixed_ips = neutron_port.get(FIXED_IP_KEY, [])
if not fixed_ips and (IP_ADDRESS_KEY in neutron_port):
peer_veth.add_ip(neutron_port[IP_ADDRESS_KEY])
for fixed_ip in fixed_ips:
if IP_ADDRESS_KEY in fixed_ip and (SUBNET_ID_KEY in fixed_ip):
subnet_id = fixed_ip[SUBNET_ID_KEY]
subnet = subnets_dict[subnet_id]
cidr = netaddr.IPNetwork(subnet['cidr'])
peer_veth.add_ip(fixed_ip[IP_ADDRESS_KEY], cidr.prefixlen)
peer_veth.address = neutron_port[MAC_ADDRESS_KEY].lower()
if not _is_up(peer_veth):
peer_veth.up()
except pyroute2.ipdb.common.CreateException:
raise exceptions.VethCreationFailure(
'Creating the veth pair was failed.')
except pyroute2.ipdb.common.CommitException:
raise exceptions.VethCreationFailure(
'Could not configure the veth endpoint for the container.')
finally:
ip.release()
vif_type = neutron_port.get(VIF_TYPE_KEY, FALLBACK_VIF_TYPE)
binding_exec_path = os.path.join(config.CONF.bindir, vif_type)
port_id = neutron_port['id']
try:
stdout, stderr = processutils.execute(
binding_exec_path, BINDING_SUBCOMMAND, port_id, ifname,
run_as_root=True)
except processutils.ProcessExecutionError:
with excutils.save_and_reraise_exception():
cleanup_veth(ifname)
return (ifname, peer_name, (stdout, stderr))

View File

@ -67,3 +67,11 @@ CONF = cfg.CONF
CONF.register_opts(core_opts)
CONF.register_opts(neutron_opts, group='neutron_client')
CONF.register_opts(keystone_opts, group='keystone_client')
binding_opts = [
cfg.StrOpt('veth_dst_prefix',
default='eth',
help=('The name prefix of the veth endpoint put inside the '
'container.'))
]
CONF.register_opts(binding_opts, 'binding')

View File

@ -39,7 +39,7 @@ class NoResourceException(KuryrException):
"""
class VethCreatationFailure(KuryrException):
class VethCreationFailure(KuryrException):
"""Exception represents the veth pair creation is failed.
This exception is thrown when the veth pair is not created appropriately

View File

@ -16,9 +16,13 @@ import flask
import jsonschema
import netaddr
from neutronclient.common import exceptions as n_exceptions
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_utils import excutils
from kuryr import app
from kuryr import binding
from kuryr.common import config
from kuryr.common import constants
from kuryr.common import exceptions
from kuryr import schemata
@ -551,7 +555,70 @@ def network_driver_join():
.format(json_data))
jsonschema.validate(json_data, schemata.JOIN_SCHEMA)
return flask.jsonify(constants.SCHEMA['JOIN'])
neutron_network_name = json_data['NetworkID']
endpoint_id = json_data['EndpointID']
filtered_networks = _get_networks_by_attrs(name=neutron_network_name)
if not filtered_networks:
return flask.jsonify({
'Err': "Neutron network associated with ID {0} doesn't exit."
.format(neutron_network_name)
})
else:
neutron_network_id = filtered_networks[0]['id']
neutron_port_name = utils.get_neutron_port_name(endpoint_id)
filtered_ports = _get_ports_by_attrs(name=neutron_port_name)
if not filtered_ports:
raise exceptions.NoResourceException(
"The port doesn't exist for the name {0}"
.format(neutron_port_name))
neutron_port = filtered_ports[0]
all_subnets = _get_subnets_by_attrs(network_id=neutron_network_id)
try:
ifname, peer_name, (stdout, stderr) = binding.port_bind(
endpoint_id, neutron_port, all_subnets)
app.logger.debug(stdout)
if stderr:
app.logger.error(stderr)
except exceptions.VethCreationFailure as ex:
with excutils.save_and_reraise_exception():
app.logger.error('Preparing the veth pair was failed: {0}.'
.format(ex))
except processutils.ProcessExecutionError:
with excutils.save_and_reraise_exception():
app.logger.error(
'Could not bind the Neutron port to the veth endpoint.')
join_response = {
"InterfaceName": {
"SrcName": peer_name,
"DstPrefix": config.CONF.binding.veth_dst_prefix
},
"StaticRoutes": []
}
for subnet in all_subnets:
if subnet['ip_version'] == 4:
join_response['Gateway'] = subnet.get('gateway_ip', '')
else:
join_response['GatewayIPv6'] = subnet.get('gateway_ip', '')
host_routes = subnet.get('host_routes', [])
for host_route in host_routes:
static_route = {
'Destination': host_route['destination']
}
if host_route.get('nexthop', None):
static_route['RouteType'] = constants.TYPES['NEXTHOP']
static_route['NextHop'] = host_route['nexthop']
else:
static_route['RouteType'] = constants.TYPES['CONNECTED']
join_response['StaticRoutes'].append(static_route)
return flask.jsonify(join_response)
@app.route('/NetworkDriver.Leave', methods=['POST'])

View File

@ -13,6 +13,7 @@
from neutronclient.tests.unit import test_cli20
from kuryr import app
from kuryr import binding
class TestCase(test_cli20.CLITestV20Base):
@ -37,6 +38,15 @@ class TestKuryrBase(TestCase):
if hasattr(app, 'DEFAULT_POOL_IDS'):
del app.DEFAULT_POOL_IDS
def _mock_out_binding(self, endpoint_id, neutron_port, neutron_subnets):
self.mox.StubOutWithMock(binding, 'port_bind')
fake_binding_response = (
'fake-veth', 'fake-veth_c', ('fake stdout', ''))
binding.port_bind(endpoint_id, neutron_port,
neutron_subnets).AndReturn(fake_binding_response)
self.mox.ReplayAll()
return fake_binding_response
def _mock_out_network(self, neutron_network_id, docker_network_id):
fake_list_response = {
"networks": [{

View File

@ -12,22 +12,30 @@
import hashlib
import random
import uuid
import ddt
from oslo_concurrency import processutils
from oslo_serialization import jsonutils
from werkzeug import exceptions as w_exceptions
from kuryr import app
from kuryr import binding
from kuryr.common import exceptions
from kuryr.tests import base
from kuryr import utils
@ddt.ddt
class TestKuryrJoinFailures(base.TestKuryrFailures):
"""Unit tests for the failures for binding a Neutron port to an interface.
"""
def _invoke_join_request(self, docker_network_id,
docker_endpoint_id, sandbox_key):
docker_endpoint_id, container_id):
data = {
'NetworkID': docker_network_id,
'EndpointID': docker_endpoint_id,
'SandboxKey': sandbox_key,
'SandboxKey': utils.get_sandbox_key(container_id),
'Options': {},
}
response = self.app.post('/NetworkDriver.Join',
@ -36,6 +44,76 @@ class TestKuryrJoinFailures(base.TestKuryrFailures):
return response
def _port_bind_with_exeption(self, docker_endpiont_id, neutron_port,
neutron_subnets, ex):
fake_ifname = 'fake-veth'
fake_binding_response = (
fake_ifname,
fake_ifname + binding.CONTAINER_VETH_POSTFIX,
('fake stdout', '')
)
self.mox.StubOutWithMock(binding, 'port_bind')
if ex:
binding.port_bind(
docker_endpiont_id, neutron_port, neutron_subnets).AndRaise(ex)
else:
binding.port_bind(
docker_endpiont_id, neutron_port, neutron_subnets).AndReturn(
fake_binding_response)
self.mox.ReplayAll()
return fake_binding_response
@ddt.data(exceptions.VethCreationFailure,
processutils.ProcessExecutionError)
def test_join_veth_failures(self, GivenException):
fake_docker_network_id = hashlib.sha256(
str(random.getrandbits(256))).hexdigest()
fake_docker_endpoint_id = hashlib.sha256(
str(random.getrandbits(256))).hexdigest()
fake_container_id = hashlib.sha256(
str(random.getrandbits(256))).hexdigest()
fake_neutron_network_id = str(uuid.uuid4())
self._mock_out_network(fake_neutron_network_id, fake_docker_network_id)
fake_neutron_port_id = str(uuid.uuid4())
self.mox.StubOutWithMock(app.neutron, 'list_ports')
neutron_port_name = utils.get_neutron_port_name(
fake_docker_endpoint_id)
fake_neutron_v4_subnet_id = str(uuid.uuid4())
fake_neutron_v6_subnet_id = str(uuid.uuid4())
fake_neutron_ports_response = self._get_fake_ports(
fake_docker_endpoint_id, fake_neutron_network_id,
fake_neutron_port_id,
fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)
app.neutron.list_ports(name=neutron_port_name).AndReturn(
fake_neutron_ports_response)
self.mox.StubOutWithMock(app.neutron, 'list_subnets')
fake_neutron_subnets_response = self._get_fake_subnets(
fake_docker_endpoint_id, fake_neutron_network_id,
fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)
app.neutron.list_subnets(network_id=fake_neutron_network_id).AndReturn(
fake_neutron_subnets_response)
fake_neutron_port = fake_neutron_ports_response['ports'][0]
fake_neutron_subnets = fake_neutron_subnets_response['subnets']
fake_message = "fake message"
fake_exception = GivenException(fake_message)
self._port_bind_with_exeption(
fake_docker_endpoint_id, fake_neutron_port,
fake_neutron_subnets, fake_exception)
self.mox.ReplayAll()
response = self._invoke_join_request(
fake_docker_network_id, fake_docker_endpoint_id, fake_container_id)
self.assertEqual(
w_exceptions.InternalServerError.code, response.status_code)
decoded_json = jsonutils.loads(response.data)
self.assertTrue('Err' in decoded_json)
self.assertTrue(fake_message in decoded_json['Err'])
def test_join_bad_request(self):
fake_docker_network_id = hashlib.sha256(
str(random.getrandbits(256))).hexdigest()
@ -45,9 +123,10 @@ class TestKuryrJoinFailures(base.TestKuryrFailures):
response = self._invoke_join_request(
fake_docker_network_id, invalid_docker_endpoint_id,
utils.get_sandbox_key(fake_container_id))
fake_container_id)
self.assertEqual(400, response.status_code)
self.assertEqual(
w_exceptions.BadRequest.code, response.status_code)
decoded_json = jsonutils.loads(response.data)
self.assertTrue('Err' in decoded_json)
# TODO(tfukushima): Add the better error message validation.

View File

@ -18,6 +18,7 @@ import ddt
from oslo_serialization import jsonutils
from kuryr import app
from kuryr.common import config
from kuryr.common import constants
from kuryr.tests import base
from kuryr import utils
@ -482,7 +483,37 @@ class TestKuryr(base.TestKuryrBase):
fake_container_id = hashlib.sha256(
str(random.getrandbits(256))).hexdigest()
data = {
fake_neutron_network_id = str(uuid.uuid4())
self._mock_out_network(fake_neutron_network_id, fake_docker_network_id)
fake_neutron_port_id = str(uuid.uuid4())
self.mox.StubOutWithMock(app.neutron, 'list_ports')
neutron_port_name = utils.get_neutron_port_name(
fake_docker_endpoint_id)
fake_neutron_v4_subnet_id = str(uuid.uuid4())
fake_neutron_v6_subnet_id = str(uuid.uuid4())
fake_neutron_ports_response = self._get_fake_ports(
fake_docker_endpoint_id, fake_neutron_network_id,
fake_neutron_port_id,
fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)
app.neutron.list_ports(name=neutron_port_name).AndReturn(
fake_neutron_ports_response)
self.mox.StubOutWithMock(app.neutron, 'list_subnets')
fake_neutron_subnets_response = self._get_fake_subnets(
fake_docker_endpoint_id, fake_neutron_network_id,
fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)
app.neutron.list_subnets(network_id=fake_neutron_network_id).AndReturn(
fake_neutron_subnets_response)
fake_neutron_port = fake_neutron_ports_response['ports'][0]
fake_neutron_subnets = fake_neutron_subnets_response['subnets']
_, fake_peer_name, _ = self._mock_out_binding(
fake_docker_endpoint_id, fake_neutron_port, fake_neutron_subnets)
self.mox.ReplayAll()
fake_subnets_dict_by_id = {subnet['id']: subnet
for subnet in fake_neutron_subnets}
join_request = {
'NetworkID': fake_docker_network_id,
'EndpointID': fake_docker_endpoint_id,
'SandboxKey': utils.get_sandbox_key(fake_container_id),
@ -490,9 +521,22 @@ class TestKuryr(base.TestKuryrBase):
}
response = self.app.post('/NetworkDriver.Join',
content_type='application/json',
data=jsonutils.dumps(data))
data=jsonutils.dumps(join_request))
self.assertEqual(200, response.status_code)
decoded_json = jsonutils.loads(response.data)
app.logger.info(decoded_json)
self.assertEqual(constants.SCHEMA['JOIN'], decoded_json)
fake_neutron_v4_subnet = fake_subnets_dict_by_id[
fake_neutron_v4_subnet_id]
fake_neutron_v6_subnet = fake_subnets_dict_by_id[
fake_neutron_v6_subnet_id]
expected_response = {
'Gateway': fake_neutron_v4_subnet['gateway_ip'],
'GatewayIPv6': fake_neutron_v6_subnet['gateway_ip'],
'InterfaceName': {
'DstPrefix': config.CONF.binding.veth_dst_prefix,
'SrcName': fake_peer_name,
},
'StaticRoutes': []
}
self.assertEqual(expected_response, decoded_json)

View File

@ -19,8 +19,10 @@ import jsonschema
from neutronclient.common import exceptions as n_exceptions
from neutronclient.neutron import client
from neutronclient.v2_0 import client as client_v2
from oslo_concurrency import processutils
from werkzeug import exceptions as w_exceptions
from kuryr.common import exceptions
DOCKER_NETNS_BASE = '/var/run/docker/netns'
PORT_POSTFIX = 'port'
@ -61,19 +63,21 @@ def make_json_app(import_name, **kwargs):
"""
app = flask.Flask(import_name, **kwargs)
@app.errorhandler(exceptions.KuryrException)
@app.errorhandler(n_exceptions.NeutronClientException)
@app.errorhandler(jsonschema.ValidationError)
@app.errorhandler(processutils.ProcessExecutionError)
def make_json_error(ex):
app.logger.error("Unexpected error happened: {0}".format(ex))
traceback.print_exc(file=sys.stderr)
response = flask.jsonify({"Err": str(ex)})
response.status_code = 500
response.status_code = w_exceptions.InternalServerError.code
if isinstance(ex, w_exceptions.HTTPException):
response.status_code = ex.code
elif isinstance(ex, n_exceptions.NeutronClientException):
response.status_code = ex.status_code
elif isinstance(ex, jsonschema.ValidationError):
response.status_code = 400
response.status_code = w_exceptions.BadRequest.code
content_type = 'application/vnd.docker.plugins.v1+json; charset=utf-8'
response.headers['Content-Type'] = content_type
return response

View File

@ -7,5 +7,8 @@ Babel>=1.3
Flask>=0.10,<1.0
jsonschema!=2.5.0,<3.0.0,>=2.0.0
netaddr>=0.7.12
oslo.concurrency>=2.3.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0
oslo.utils>=2.0.0 # Apache-2.0
python-neutronclient>=2.3.11,<3
pyroute2>=0.3.10 # Apache-2.0 (+ dual licensed GPL2)

29
usr/libexec/kuryr/midonet Executable file
View File

@ -0,0 +1,29 @@
#!/bin/bash
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
bind_port() {
echo "Binding Neutron port $1 to the veth $2..."
mm-ctl --bind-port $1 $2
}
case $1 in
"bind")
shift
bind_port "$@"
exit 0
;;
*)
echo >&2 "$0: Invalid command $1."
exit 1
;;
esac

16
usr/libexec/kuryr/unbound Normal file
View File

@ -0,0 +1,16 @@
#!/bin/bash
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
echo "binding:vif_type is invalid."
exit 1