From 61a035069514608fbf4c59ad33fdc84ef00b4b13 Mon Sep 17 00:00:00 2001 From: Taku Fukushima Date: Sat, 26 Sep 2015 02:10:37 +0900 Subject: [PATCH] Implement /NetworkDriver.Join This patch implements /NetworkDriver.Join, which goes through the following steps: 1. Find the Neutron port associated with the given EndpointID 2. Create the veth pair based on the info of the retrieved port 3. Bind the port to the veth endpoint 4. Construct the response with the port and return it In the process 3, Kuryr executes the binding script specified in the binding:vif_type attribute of the retrieved Neutron port. Although Neutron plugin can put arbitrary types in that attribute, if the attribute was not specified it defaults to "unbound" and Kuryr invokes "unbound" executable, which always fails with the status code 1. This patch also includes the unit tests cover the successful case and the failures. Change-Id: Id3a8288199975d86812c7c1d210c7e11ae58d7b8 Signed-off-by: Taku Fukushima --- kuryr/binding.py | 124 +++++++++++++++++++++++++++++++++++++ kuryr/common/config.py | 8 +++ kuryr/common/exceptions.py | 2 +- kuryr/controllers.py | 69 ++++++++++++++++++++- kuryr/tests/base.py | 10 +++ kuryr/tests/test_join.py | 87 ++++++++++++++++++++++++-- kuryr/tests/test_kuryr.py | 52 ++++++++++++++-- kuryr/utils.py | 8 ++- requirements.txt | 3 + usr/libexec/kuryr/midonet | 29 +++++++++ usr/libexec/kuryr/unbound | 16 +++++ 11 files changed, 396 insertions(+), 12 deletions(-) create mode 100644 kuryr/binding.py create mode 100755 usr/libexec/kuryr/midonet create mode 100644 usr/libexec/kuryr/unbound diff --git a/kuryr/binding.py b/kuryr/binding.py new file mode 100644 index 00000000..06bece29 --- /dev/null +++ b/kuryr/binding.py @@ -0,0 +1,124 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import netaddr +from oslo_concurrency import processutils +from oslo_utils import excutils +import pyroute2 + +from kuryr.common import config +from kuryr.common import exceptions + + +CONTAINER_VETH_POSTFIX = '_c' +BINDING_SUBCOMMAND = 'bind' +DOWN = 'DOWN' +FALLBACK_VIF_TYPE = 'unbound' +FIXED_IP_KEY = 'fixed_ips' +IFF_UP = 0x1 # The last bit represents if the interface is up +IP_ADDRESS_KEY = 'ip_address' +KIND_VETH = 'veth' +MAC_ADDRESS_KEY = 'mac_address' +SUBNET_ID_KEY = 'subnet_id' +VETH_POSTFIX = '-veth' +VIF_TYPE_KEY = 'binding:vif_type' + + +def _is_up(interface): + flags = interface['flags'] + if not flags: + return False + return (flags & IFF_UP) == 1 + + +def cleanup_veth(ifname): + """Cleans the veth passed as an argument up. + + :param ifname: the name of the veth endpoint + :returns: the index of the interface which name is the given ifname if it + exists, otherwise None + :raises: pyroute2.netlink.NetlinkError + """ + ipr = pyroute2.IPRoute() + veths = ipr.link_lookup(ifname=ifname) + if veths: + host_veth_index = veths[0] + ipr.link_remove(host_veth_index) + return host_veth_index + else: + return None + + +def port_bind(endpoint_id, neutron_port, neutron_subnets): + """Binds the Neutorn port to the network interface on the host. + + :param endpoint_id: the ID of the endpoint as string + :param neutron_port: a port dictionary returned from + python-neutronclient + :param neutron_subnets: a list of all subnets under network to which this + endpoint is trying to join + :returns: the tuple of the names of the veth pair and the tuple of stdout + and stderr returned by processutils.execute invoked with the + executable script for binding + :raises: kuryr.common.exceptions.VethCreationFailure, + processutils.ProcessExecutionError + """ + # NOTE(tfukushima): pyroute2.ipdb requires Linux to be imported. So I don't + # import it in the module scope but here. + import pyroute2.ipdb + + ifname = endpoint_id[:8] + VETH_POSTFIX + peer_name = ifname + CONTAINER_VETH_POSTFIX + subnets_dict = {subnet['id']: subnet for subnet in neutron_subnets} + + ip = pyroute2.IPDB() + try: + with ip.create(ifname=ifname, kind=KIND_VETH, + reuse=True, peer=peer_name) as host_veth: + if not _is_up(host_veth): + host_veth.up() + with ip.interfaces[peer_name] as peer_veth: + fixed_ips = neutron_port.get(FIXED_IP_KEY, []) + if not fixed_ips and (IP_ADDRESS_KEY in neutron_port): + peer_veth.add_ip(neutron_port[IP_ADDRESS_KEY]) + for fixed_ip in fixed_ips: + if IP_ADDRESS_KEY in fixed_ip and (SUBNET_ID_KEY in fixed_ip): + subnet_id = fixed_ip[SUBNET_ID_KEY] + subnet = subnets_dict[subnet_id] + cidr = netaddr.IPNetwork(subnet['cidr']) + peer_veth.add_ip(fixed_ip[IP_ADDRESS_KEY], cidr.prefixlen) + peer_veth.address = neutron_port[MAC_ADDRESS_KEY].lower() + if not _is_up(peer_veth): + peer_veth.up() + except pyroute2.ipdb.common.CreateException: + raise exceptions.VethCreationFailure( + 'Creating the veth pair was failed.') + except pyroute2.ipdb.common.CommitException: + raise exceptions.VethCreationFailure( + 'Could not configure the veth endpoint for the container.') + finally: + ip.release() + + vif_type = neutron_port.get(VIF_TYPE_KEY, FALLBACK_VIF_TYPE) + binding_exec_path = os.path.join(config.CONF.bindir, vif_type) + port_id = neutron_port['id'] + try: + stdout, stderr = processutils.execute( + binding_exec_path, BINDING_SUBCOMMAND, port_id, ifname, + run_as_root=True) + except processutils.ProcessExecutionError: + with excutils.save_and_reraise_exception(): + cleanup_veth(ifname) + + return (ifname, peer_name, (stdout, stderr)) diff --git a/kuryr/common/config.py b/kuryr/common/config.py index 819981af..0c609e8c 100644 --- a/kuryr/common/config.py +++ b/kuryr/common/config.py @@ -67,3 +67,11 @@ CONF = cfg.CONF CONF.register_opts(core_opts) CONF.register_opts(neutron_opts, group='neutron_client') CONF.register_opts(keystone_opts, group='keystone_client') + +binding_opts = [ + cfg.StrOpt('veth_dst_prefix', + default='eth', + help=('The name prefix of the veth endpoint put inside the ' + 'container.')) +] +CONF.register_opts(binding_opts, 'binding') diff --git a/kuryr/common/exceptions.py b/kuryr/common/exceptions.py index 4ec31955..c142cbb0 100644 --- a/kuryr/common/exceptions.py +++ b/kuryr/common/exceptions.py @@ -39,7 +39,7 @@ class NoResourceException(KuryrException): """ -class VethCreatationFailure(KuryrException): +class VethCreationFailure(KuryrException): """Exception represents the veth pair creation is failed. This exception is thrown when the veth pair is not created appropriately diff --git a/kuryr/controllers.py b/kuryr/controllers.py index 2b5b716f..c7b5c92e 100644 --- a/kuryr/controllers.py +++ b/kuryr/controllers.py @@ -16,9 +16,13 @@ import flask import jsonschema import netaddr from neutronclient.common import exceptions as n_exceptions +from oslo_concurrency import processutils from oslo_config import cfg +from oslo_utils import excutils from kuryr import app +from kuryr import binding +from kuryr.common import config from kuryr.common import constants from kuryr.common import exceptions from kuryr import schemata @@ -551,7 +555,70 @@ def network_driver_join(): .format(json_data)) jsonschema.validate(json_data, schemata.JOIN_SCHEMA) - return flask.jsonify(constants.SCHEMA['JOIN']) + neutron_network_name = json_data['NetworkID'] + endpoint_id = json_data['EndpointID'] + + filtered_networks = _get_networks_by_attrs(name=neutron_network_name) + + if not filtered_networks: + return flask.jsonify({ + 'Err': "Neutron network associated with ID {0} doesn't exit." + .format(neutron_network_name) + }) + else: + neutron_network_id = filtered_networks[0]['id'] + + neutron_port_name = utils.get_neutron_port_name(endpoint_id) + filtered_ports = _get_ports_by_attrs(name=neutron_port_name) + if not filtered_ports: + raise exceptions.NoResourceException( + "The port doesn't exist for the name {0}" + .format(neutron_port_name)) + neutron_port = filtered_ports[0] + all_subnets = _get_subnets_by_attrs(network_id=neutron_network_id) + + try: + ifname, peer_name, (stdout, stderr) = binding.port_bind( + endpoint_id, neutron_port, all_subnets) + app.logger.debug(stdout) + if stderr: + app.logger.error(stderr) + except exceptions.VethCreationFailure as ex: + with excutils.save_and_reraise_exception(): + app.logger.error('Preparing the veth pair was failed: {0}.' + .format(ex)) + except processutils.ProcessExecutionError: + with excutils.save_and_reraise_exception(): + app.logger.error( + 'Could not bind the Neutron port to the veth endpoint.') + + join_response = { + "InterfaceName": { + "SrcName": peer_name, + "DstPrefix": config.CONF.binding.veth_dst_prefix + }, + "StaticRoutes": [] + } + + for subnet in all_subnets: + if subnet['ip_version'] == 4: + join_response['Gateway'] = subnet.get('gateway_ip', '') + else: + join_response['GatewayIPv6'] = subnet.get('gateway_ip', '') + host_routes = subnet.get('host_routes', []) + + for host_route in host_routes: + static_route = { + 'Destination': host_route['destination'] + } + if host_route.get('nexthop', None): + static_route['RouteType'] = constants.TYPES['NEXTHOP'] + static_route['NextHop'] = host_route['nexthop'] + else: + static_route['RouteType'] = constants.TYPES['CONNECTED'] + join_response['StaticRoutes'].append(static_route) + + return flask.jsonify(join_response) @app.route('/NetworkDriver.Leave', methods=['POST']) diff --git a/kuryr/tests/base.py b/kuryr/tests/base.py index 37ab90ae..05730b8d 100644 --- a/kuryr/tests/base.py +++ b/kuryr/tests/base.py @@ -13,6 +13,7 @@ from neutronclient.tests.unit import test_cli20 from kuryr import app +from kuryr import binding class TestCase(test_cli20.CLITestV20Base): @@ -37,6 +38,15 @@ class TestKuryrBase(TestCase): if hasattr(app, 'DEFAULT_POOL_IDS'): del app.DEFAULT_POOL_IDS + def _mock_out_binding(self, endpoint_id, neutron_port, neutron_subnets): + self.mox.StubOutWithMock(binding, 'port_bind') + fake_binding_response = ( + 'fake-veth', 'fake-veth_c', ('fake stdout', '')) + binding.port_bind(endpoint_id, neutron_port, + neutron_subnets).AndReturn(fake_binding_response) + self.mox.ReplayAll() + return fake_binding_response + def _mock_out_network(self, neutron_network_id, docker_network_id): fake_list_response = { "networks": [{ diff --git a/kuryr/tests/test_join.py b/kuryr/tests/test_join.py index 0ef09c2a..a4dadead 100644 --- a/kuryr/tests/test_join.py +++ b/kuryr/tests/test_join.py @@ -12,22 +12,30 @@ import hashlib import random +import uuid +import ddt +from oslo_concurrency import processutils from oslo_serialization import jsonutils +from werkzeug import exceptions as w_exceptions +from kuryr import app +from kuryr import binding +from kuryr.common import exceptions from kuryr.tests import base from kuryr import utils +@ddt.ddt class TestKuryrJoinFailures(base.TestKuryrFailures): """Unit tests for the failures for binding a Neutron port to an interface. """ def _invoke_join_request(self, docker_network_id, - docker_endpoint_id, sandbox_key): + docker_endpoint_id, container_id): data = { 'NetworkID': docker_network_id, 'EndpointID': docker_endpoint_id, - 'SandboxKey': sandbox_key, + 'SandboxKey': utils.get_sandbox_key(container_id), 'Options': {}, } response = self.app.post('/NetworkDriver.Join', @@ -36,6 +44,76 @@ class TestKuryrJoinFailures(base.TestKuryrFailures): return response + def _port_bind_with_exeption(self, docker_endpiont_id, neutron_port, + neutron_subnets, ex): + fake_ifname = 'fake-veth' + fake_binding_response = ( + fake_ifname, + fake_ifname + binding.CONTAINER_VETH_POSTFIX, + ('fake stdout', '') + ) + self.mox.StubOutWithMock(binding, 'port_bind') + if ex: + binding.port_bind( + docker_endpiont_id, neutron_port, neutron_subnets).AndRaise(ex) + else: + binding.port_bind( + docker_endpiont_id, neutron_port, neutron_subnets).AndReturn( + fake_binding_response) + self.mox.ReplayAll() + + return fake_binding_response + + @ddt.data(exceptions.VethCreationFailure, + processutils.ProcessExecutionError) + def test_join_veth_failures(self, GivenException): + fake_docker_network_id = hashlib.sha256( + str(random.getrandbits(256))).hexdigest() + fake_docker_endpoint_id = hashlib.sha256( + str(random.getrandbits(256))).hexdigest() + fake_container_id = hashlib.sha256( + str(random.getrandbits(256))).hexdigest() + + fake_neutron_network_id = str(uuid.uuid4()) + self._mock_out_network(fake_neutron_network_id, fake_docker_network_id) + fake_neutron_port_id = str(uuid.uuid4()) + self.mox.StubOutWithMock(app.neutron, 'list_ports') + neutron_port_name = utils.get_neutron_port_name( + fake_docker_endpoint_id) + fake_neutron_v4_subnet_id = str(uuid.uuid4()) + fake_neutron_v6_subnet_id = str(uuid.uuid4()) + fake_neutron_ports_response = self._get_fake_ports( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_port_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_ports(name=neutron_port_name).AndReturn( + fake_neutron_ports_response) + + self.mox.StubOutWithMock(app.neutron, 'list_subnets') + fake_neutron_subnets_response = self._get_fake_subnets( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_subnets(network_id=fake_neutron_network_id).AndReturn( + fake_neutron_subnets_response) + fake_neutron_port = fake_neutron_ports_response['ports'][0] + fake_neutron_subnets = fake_neutron_subnets_response['subnets'] + + fake_message = "fake message" + fake_exception = GivenException(fake_message) + self._port_bind_with_exeption( + fake_docker_endpoint_id, fake_neutron_port, + fake_neutron_subnets, fake_exception) + self.mox.ReplayAll() + + response = self._invoke_join_request( + fake_docker_network_id, fake_docker_endpoint_id, fake_container_id) + + self.assertEqual( + w_exceptions.InternalServerError.code, response.status_code) + decoded_json = jsonutils.loads(response.data) + self.assertTrue('Err' in decoded_json) + self.assertTrue(fake_message in decoded_json['Err']) + def test_join_bad_request(self): fake_docker_network_id = hashlib.sha256( str(random.getrandbits(256))).hexdigest() @@ -45,9 +123,10 @@ class TestKuryrJoinFailures(base.TestKuryrFailures): response = self._invoke_join_request( fake_docker_network_id, invalid_docker_endpoint_id, - utils.get_sandbox_key(fake_container_id)) + fake_container_id) - self.assertEqual(400, response.status_code) + self.assertEqual( + w_exceptions.BadRequest.code, response.status_code) decoded_json = jsonutils.loads(response.data) self.assertTrue('Err' in decoded_json) # TODO(tfukushima): Add the better error message validation. diff --git a/kuryr/tests/test_kuryr.py b/kuryr/tests/test_kuryr.py index bb3e179f..864126fa 100644 --- a/kuryr/tests/test_kuryr.py +++ b/kuryr/tests/test_kuryr.py @@ -18,6 +18,7 @@ import ddt from oslo_serialization import jsonutils from kuryr import app +from kuryr.common import config from kuryr.common import constants from kuryr.tests import base from kuryr import utils @@ -482,7 +483,37 @@ class TestKuryr(base.TestKuryrBase): fake_container_id = hashlib.sha256( str(random.getrandbits(256))).hexdigest() - data = { + fake_neutron_network_id = str(uuid.uuid4()) + self._mock_out_network(fake_neutron_network_id, fake_docker_network_id) + fake_neutron_port_id = str(uuid.uuid4()) + self.mox.StubOutWithMock(app.neutron, 'list_ports') + neutron_port_name = utils.get_neutron_port_name( + fake_docker_endpoint_id) + fake_neutron_v4_subnet_id = str(uuid.uuid4()) + fake_neutron_v6_subnet_id = str(uuid.uuid4()) + fake_neutron_ports_response = self._get_fake_ports( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_port_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_ports(name=neutron_port_name).AndReturn( + fake_neutron_ports_response) + + self.mox.StubOutWithMock(app.neutron, 'list_subnets') + fake_neutron_subnets_response = self._get_fake_subnets( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_subnets(network_id=fake_neutron_network_id).AndReturn( + fake_neutron_subnets_response) + fake_neutron_port = fake_neutron_ports_response['ports'][0] + fake_neutron_subnets = fake_neutron_subnets_response['subnets'] + _, fake_peer_name, _ = self._mock_out_binding( + fake_docker_endpoint_id, fake_neutron_port, fake_neutron_subnets) + self.mox.ReplayAll() + + fake_subnets_dict_by_id = {subnet['id']: subnet + for subnet in fake_neutron_subnets} + + join_request = { 'NetworkID': fake_docker_network_id, 'EndpointID': fake_docker_endpoint_id, 'SandboxKey': utils.get_sandbox_key(fake_container_id), @@ -490,9 +521,22 @@ class TestKuryr(base.TestKuryrBase): } response = self.app.post('/NetworkDriver.Join', content_type='application/json', - data=jsonutils.dumps(data)) + data=jsonutils.dumps(join_request)) self.assertEqual(200, response.status_code) + decoded_json = jsonutils.loads(response.data) - app.logger.info(decoded_json) - self.assertEqual(constants.SCHEMA['JOIN'], decoded_json) + fake_neutron_v4_subnet = fake_subnets_dict_by_id[ + fake_neutron_v4_subnet_id] + fake_neutron_v6_subnet = fake_subnets_dict_by_id[ + fake_neutron_v6_subnet_id] + expected_response = { + 'Gateway': fake_neutron_v4_subnet['gateway_ip'], + 'GatewayIPv6': fake_neutron_v6_subnet['gateway_ip'], + 'InterfaceName': { + 'DstPrefix': config.CONF.binding.veth_dst_prefix, + 'SrcName': fake_peer_name, + }, + 'StaticRoutes': [] + } + self.assertEqual(expected_response, decoded_json) diff --git a/kuryr/utils.py b/kuryr/utils.py index 11256948..2d8ffd8f 100644 --- a/kuryr/utils.py +++ b/kuryr/utils.py @@ -19,8 +19,10 @@ import jsonschema from neutronclient.common import exceptions as n_exceptions from neutronclient.neutron import client from neutronclient.v2_0 import client as client_v2 +from oslo_concurrency import processutils from werkzeug import exceptions as w_exceptions +from kuryr.common import exceptions DOCKER_NETNS_BASE = '/var/run/docker/netns' PORT_POSTFIX = 'port' @@ -61,19 +63,21 @@ def make_json_app(import_name, **kwargs): """ app = flask.Flask(import_name, **kwargs) + @app.errorhandler(exceptions.KuryrException) @app.errorhandler(n_exceptions.NeutronClientException) @app.errorhandler(jsonschema.ValidationError) + @app.errorhandler(processutils.ProcessExecutionError) def make_json_error(ex): app.logger.error("Unexpected error happened: {0}".format(ex)) traceback.print_exc(file=sys.stderr) response = flask.jsonify({"Err": str(ex)}) - response.status_code = 500 + response.status_code = w_exceptions.InternalServerError.code if isinstance(ex, w_exceptions.HTTPException): response.status_code = ex.code elif isinstance(ex, n_exceptions.NeutronClientException): response.status_code = ex.status_code elif isinstance(ex, jsonschema.ValidationError): - response.status_code = 400 + response.status_code = w_exceptions.BadRequest.code content_type = 'application/vnd.docker.plugins.v1+json; charset=utf-8' response.headers['Content-Type'] = content_type return response diff --git a/requirements.txt b/requirements.txt index c027621a..7d926268 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,5 +7,8 @@ Babel>=1.3 Flask>=0.10,<1.0 jsonschema!=2.5.0,<3.0.0,>=2.0.0 netaddr>=0.7.12 +oslo.concurrency>=2.3.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 +oslo.utils>=2.0.0 # Apache-2.0 python-neutronclient>=2.3.11,<3 +pyroute2>=0.3.10 # Apache-2.0 (+ dual licensed GPL2) diff --git a/usr/libexec/kuryr/midonet b/usr/libexec/kuryr/midonet new file mode 100755 index 00000000..cef2eda7 --- /dev/null +++ b/usr/libexec/kuryr/midonet @@ -0,0 +1,29 @@ +#!/bin/bash +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +bind_port() { + echo "Binding Neutron port $1 to the veth $2..." + mm-ctl --bind-port $1 $2 +} + +case $1 in + "bind") + shift + bind_port "$@" + exit 0 + ;; + *) + echo >&2 "$0: Invalid command $1." + exit 1 + ;; +esac diff --git a/usr/libexec/kuryr/unbound b/usr/libexec/kuryr/unbound new file mode 100644 index 00000000..e88745c8 --- /dev/null +++ b/usr/libexec/kuryr/unbound @@ -0,0 +1,16 @@ +#!/bin/bash +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +echo "binding:vif_type is invalid." +exit 1