diff --git a/kuryr/binding.py b/kuryr/binding.py new file mode 100644 index 00000000..06bece29 --- /dev/null +++ b/kuryr/binding.py @@ -0,0 +1,124 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import netaddr +from oslo_concurrency import processutils +from oslo_utils import excutils +import pyroute2 + +from kuryr.common import config +from kuryr.common import exceptions + + +CONTAINER_VETH_POSTFIX = '_c' +BINDING_SUBCOMMAND = 'bind' +DOWN = 'DOWN' +FALLBACK_VIF_TYPE = 'unbound' +FIXED_IP_KEY = 'fixed_ips' +IFF_UP = 0x1 # The last bit represents if the interface is up +IP_ADDRESS_KEY = 'ip_address' +KIND_VETH = 'veth' +MAC_ADDRESS_KEY = 'mac_address' +SUBNET_ID_KEY = 'subnet_id' +VETH_POSTFIX = '-veth' +VIF_TYPE_KEY = 'binding:vif_type' + + +def _is_up(interface): + flags = interface['flags'] + if not flags: + return False + return (flags & IFF_UP) == 1 + + +def cleanup_veth(ifname): + """Cleans the veth passed as an argument up. + + :param ifname: the name of the veth endpoint + :returns: the index of the interface which name is the given ifname if it + exists, otherwise None + :raises: pyroute2.netlink.NetlinkError + """ + ipr = pyroute2.IPRoute() + veths = ipr.link_lookup(ifname=ifname) + if veths: + host_veth_index = veths[0] + ipr.link_remove(host_veth_index) + return host_veth_index + else: + return None + + +def port_bind(endpoint_id, neutron_port, neutron_subnets): + """Binds the Neutorn port to the network interface on the host. + + :param endpoint_id: the ID of the endpoint as string + :param neutron_port: a port dictionary returned from + python-neutronclient + :param neutron_subnets: a list of all subnets under network to which this + endpoint is trying to join + :returns: the tuple of the names of the veth pair and the tuple of stdout + and stderr returned by processutils.execute invoked with the + executable script for binding + :raises: kuryr.common.exceptions.VethCreationFailure, + processutils.ProcessExecutionError + """ + # NOTE(tfukushima): pyroute2.ipdb requires Linux to be imported. So I don't + # import it in the module scope but here. + import pyroute2.ipdb + + ifname = endpoint_id[:8] + VETH_POSTFIX + peer_name = ifname + CONTAINER_VETH_POSTFIX + subnets_dict = {subnet['id']: subnet for subnet in neutron_subnets} + + ip = pyroute2.IPDB() + try: + with ip.create(ifname=ifname, kind=KIND_VETH, + reuse=True, peer=peer_name) as host_veth: + if not _is_up(host_veth): + host_veth.up() + with ip.interfaces[peer_name] as peer_veth: + fixed_ips = neutron_port.get(FIXED_IP_KEY, []) + if not fixed_ips and (IP_ADDRESS_KEY in neutron_port): + peer_veth.add_ip(neutron_port[IP_ADDRESS_KEY]) + for fixed_ip in fixed_ips: + if IP_ADDRESS_KEY in fixed_ip and (SUBNET_ID_KEY in fixed_ip): + subnet_id = fixed_ip[SUBNET_ID_KEY] + subnet = subnets_dict[subnet_id] + cidr = netaddr.IPNetwork(subnet['cidr']) + peer_veth.add_ip(fixed_ip[IP_ADDRESS_KEY], cidr.prefixlen) + peer_veth.address = neutron_port[MAC_ADDRESS_KEY].lower() + if not _is_up(peer_veth): + peer_veth.up() + except pyroute2.ipdb.common.CreateException: + raise exceptions.VethCreationFailure( + 'Creating the veth pair was failed.') + except pyroute2.ipdb.common.CommitException: + raise exceptions.VethCreationFailure( + 'Could not configure the veth endpoint for the container.') + finally: + ip.release() + + vif_type = neutron_port.get(VIF_TYPE_KEY, FALLBACK_VIF_TYPE) + binding_exec_path = os.path.join(config.CONF.bindir, vif_type) + port_id = neutron_port['id'] + try: + stdout, stderr = processutils.execute( + binding_exec_path, BINDING_SUBCOMMAND, port_id, ifname, + run_as_root=True) + except processutils.ProcessExecutionError: + with excutils.save_and_reraise_exception(): + cleanup_veth(ifname) + + return (ifname, peer_name, (stdout, stderr)) diff --git a/kuryr/common/config.py b/kuryr/common/config.py index 819981af..0c609e8c 100644 --- a/kuryr/common/config.py +++ b/kuryr/common/config.py @@ -67,3 +67,11 @@ CONF = cfg.CONF CONF.register_opts(core_opts) CONF.register_opts(neutron_opts, group='neutron_client') CONF.register_opts(keystone_opts, group='keystone_client') + +binding_opts = [ + cfg.StrOpt('veth_dst_prefix', + default='eth', + help=('The name prefix of the veth endpoint put inside the ' + 'container.')) +] +CONF.register_opts(binding_opts, 'binding') diff --git a/kuryr/common/exceptions.py b/kuryr/common/exceptions.py index 4ec31955..c142cbb0 100644 --- a/kuryr/common/exceptions.py +++ b/kuryr/common/exceptions.py @@ -39,7 +39,7 @@ class NoResourceException(KuryrException): """ -class VethCreatationFailure(KuryrException): +class VethCreationFailure(KuryrException): """Exception represents the veth pair creation is failed. This exception is thrown when the veth pair is not created appropriately diff --git a/kuryr/controllers.py b/kuryr/controllers.py index 2b5b716f..c7b5c92e 100644 --- a/kuryr/controllers.py +++ b/kuryr/controllers.py @@ -16,9 +16,13 @@ import flask import jsonschema import netaddr from neutronclient.common import exceptions as n_exceptions +from oslo_concurrency import processutils from oslo_config import cfg +from oslo_utils import excutils from kuryr import app +from kuryr import binding +from kuryr.common import config from kuryr.common import constants from kuryr.common import exceptions from kuryr import schemata @@ -551,7 +555,70 @@ def network_driver_join(): .format(json_data)) jsonschema.validate(json_data, schemata.JOIN_SCHEMA) - return flask.jsonify(constants.SCHEMA['JOIN']) + neutron_network_name = json_data['NetworkID'] + endpoint_id = json_data['EndpointID'] + + filtered_networks = _get_networks_by_attrs(name=neutron_network_name) + + if not filtered_networks: + return flask.jsonify({ + 'Err': "Neutron network associated with ID {0} doesn't exit." + .format(neutron_network_name) + }) + else: + neutron_network_id = filtered_networks[0]['id'] + + neutron_port_name = utils.get_neutron_port_name(endpoint_id) + filtered_ports = _get_ports_by_attrs(name=neutron_port_name) + if not filtered_ports: + raise exceptions.NoResourceException( + "The port doesn't exist for the name {0}" + .format(neutron_port_name)) + neutron_port = filtered_ports[0] + all_subnets = _get_subnets_by_attrs(network_id=neutron_network_id) + + try: + ifname, peer_name, (stdout, stderr) = binding.port_bind( + endpoint_id, neutron_port, all_subnets) + app.logger.debug(stdout) + if stderr: + app.logger.error(stderr) + except exceptions.VethCreationFailure as ex: + with excutils.save_and_reraise_exception(): + app.logger.error('Preparing the veth pair was failed: {0}.' + .format(ex)) + except processutils.ProcessExecutionError: + with excutils.save_and_reraise_exception(): + app.logger.error( + 'Could not bind the Neutron port to the veth endpoint.') + + join_response = { + "InterfaceName": { + "SrcName": peer_name, + "DstPrefix": config.CONF.binding.veth_dst_prefix + }, + "StaticRoutes": [] + } + + for subnet in all_subnets: + if subnet['ip_version'] == 4: + join_response['Gateway'] = subnet.get('gateway_ip', '') + else: + join_response['GatewayIPv6'] = subnet.get('gateway_ip', '') + host_routes = subnet.get('host_routes', []) + + for host_route in host_routes: + static_route = { + 'Destination': host_route['destination'] + } + if host_route.get('nexthop', None): + static_route['RouteType'] = constants.TYPES['NEXTHOP'] + static_route['NextHop'] = host_route['nexthop'] + else: + static_route['RouteType'] = constants.TYPES['CONNECTED'] + join_response['StaticRoutes'].append(static_route) + + return flask.jsonify(join_response) @app.route('/NetworkDriver.Leave', methods=['POST']) diff --git a/kuryr/tests/base.py b/kuryr/tests/base.py index 37ab90ae..05730b8d 100644 --- a/kuryr/tests/base.py +++ b/kuryr/tests/base.py @@ -13,6 +13,7 @@ from neutronclient.tests.unit import test_cli20 from kuryr import app +from kuryr import binding class TestCase(test_cli20.CLITestV20Base): @@ -37,6 +38,15 @@ class TestKuryrBase(TestCase): if hasattr(app, 'DEFAULT_POOL_IDS'): del app.DEFAULT_POOL_IDS + def _mock_out_binding(self, endpoint_id, neutron_port, neutron_subnets): + self.mox.StubOutWithMock(binding, 'port_bind') + fake_binding_response = ( + 'fake-veth', 'fake-veth_c', ('fake stdout', '')) + binding.port_bind(endpoint_id, neutron_port, + neutron_subnets).AndReturn(fake_binding_response) + self.mox.ReplayAll() + return fake_binding_response + def _mock_out_network(self, neutron_network_id, docker_network_id): fake_list_response = { "networks": [{ diff --git a/kuryr/tests/test_join.py b/kuryr/tests/test_join.py index 0ef09c2a..a4dadead 100644 --- a/kuryr/tests/test_join.py +++ b/kuryr/tests/test_join.py @@ -12,22 +12,30 @@ import hashlib import random +import uuid +import ddt +from oslo_concurrency import processutils from oslo_serialization import jsonutils +from werkzeug import exceptions as w_exceptions +from kuryr import app +from kuryr import binding +from kuryr.common import exceptions from kuryr.tests import base from kuryr import utils +@ddt.ddt class TestKuryrJoinFailures(base.TestKuryrFailures): """Unit tests for the failures for binding a Neutron port to an interface. """ def _invoke_join_request(self, docker_network_id, - docker_endpoint_id, sandbox_key): + docker_endpoint_id, container_id): data = { 'NetworkID': docker_network_id, 'EndpointID': docker_endpoint_id, - 'SandboxKey': sandbox_key, + 'SandboxKey': utils.get_sandbox_key(container_id), 'Options': {}, } response = self.app.post('/NetworkDriver.Join', @@ -36,6 +44,76 @@ class TestKuryrJoinFailures(base.TestKuryrFailures): return response + def _port_bind_with_exeption(self, docker_endpiont_id, neutron_port, + neutron_subnets, ex): + fake_ifname = 'fake-veth' + fake_binding_response = ( + fake_ifname, + fake_ifname + binding.CONTAINER_VETH_POSTFIX, + ('fake stdout', '') + ) + self.mox.StubOutWithMock(binding, 'port_bind') + if ex: + binding.port_bind( + docker_endpiont_id, neutron_port, neutron_subnets).AndRaise(ex) + else: + binding.port_bind( + docker_endpiont_id, neutron_port, neutron_subnets).AndReturn( + fake_binding_response) + self.mox.ReplayAll() + + return fake_binding_response + + @ddt.data(exceptions.VethCreationFailure, + processutils.ProcessExecutionError) + def test_join_veth_failures(self, GivenException): + fake_docker_network_id = hashlib.sha256( + str(random.getrandbits(256))).hexdigest() + fake_docker_endpoint_id = hashlib.sha256( + str(random.getrandbits(256))).hexdigest() + fake_container_id = hashlib.sha256( + str(random.getrandbits(256))).hexdigest() + + fake_neutron_network_id = str(uuid.uuid4()) + self._mock_out_network(fake_neutron_network_id, fake_docker_network_id) + fake_neutron_port_id = str(uuid.uuid4()) + self.mox.StubOutWithMock(app.neutron, 'list_ports') + neutron_port_name = utils.get_neutron_port_name( + fake_docker_endpoint_id) + fake_neutron_v4_subnet_id = str(uuid.uuid4()) + fake_neutron_v6_subnet_id = str(uuid.uuid4()) + fake_neutron_ports_response = self._get_fake_ports( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_port_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_ports(name=neutron_port_name).AndReturn( + fake_neutron_ports_response) + + self.mox.StubOutWithMock(app.neutron, 'list_subnets') + fake_neutron_subnets_response = self._get_fake_subnets( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_subnets(network_id=fake_neutron_network_id).AndReturn( + fake_neutron_subnets_response) + fake_neutron_port = fake_neutron_ports_response['ports'][0] + fake_neutron_subnets = fake_neutron_subnets_response['subnets'] + + fake_message = "fake message" + fake_exception = GivenException(fake_message) + self._port_bind_with_exeption( + fake_docker_endpoint_id, fake_neutron_port, + fake_neutron_subnets, fake_exception) + self.mox.ReplayAll() + + response = self._invoke_join_request( + fake_docker_network_id, fake_docker_endpoint_id, fake_container_id) + + self.assertEqual( + w_exceptions.InternalServerError.code, response.status_code) + decoded_json = jsonutils.loads(response.data) + self.assertTrue('Err' in decoded_json) + self.assertTrue(fake_message in decoded_json['Err']) + def test_join_bad_request(self): fake_docker_network_id = hashlib.sha256( str(random.getrandbits(256))).hexdigest() @@ -45,9 +123,10 @@ class TestKuryrJoinFailures(base.TestKuryrFailures): response = self._invoke_join_request( fake_docker_network_id, invalid_docker_endpoint_id, - utils.get_sandbox_key(fake_container_id)) + fake_container_id) - self.assertEqual(400, response.status_code) + self.assertEqual( + w_exceptions.BadRequest.code, response.status_code) decoded_json = jsonutils.loads(response.data) self.assertTrue('Err' in decoded_json) # TODO(tfukushima): Add the better error message validation. diff --git a/kuryr/tests/test_kuryr.py b/kuryr/tests/test_kuryr.py index bb3e179f..864126fa 100644 --- a/kuryr/tests/test_kuryr.py +++ b/kuryr/tests/test_kuryr.py @@ -18,6 +18,7 @@ import ddt from oslo_serialization import jsonutils from kuryr import app +from kuryr.common import config from kuryr.common import constants from kuryr.tests import base from kuryr import utils @@ -482,7 +483,37 @@ class TestKuryr(base.TestKuryrBase): fake_container_id = hashlib.sha256( str(random.getrandbits(256))).hexdigest() - data = { + fake_neutron_network_id = str(uuid.uuid4()) + self._mock_out_network(fake_neutron_network_id, fake_docker_network_id) + fake_neutron_port_id = str(uuid.uuid4()) + self.mox.StubOutWithMock(app.neutron, 'list_ports') + neutron_port_name = utils.get_neutron_port_name( + fake_docker_endpoint_id) + fake_neutron_v4_subnet_id = str(uuid.uuid4()) + fake_neutron_v6_subnet_id = str(uuid.uuid4()) + fake_neutron_ports_response = self._get_fake_ports( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_port_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_ports(name=neutron_port_name).AndReturn( + fake_neutron_ports_response) + + self.mox.StubOutWithMock(app.neutron, 'list_subnets') + fake_neutron_subnets_response = self._get_fake_subnets( + fake_docker_endpoint_id, fake_neutron_network_id, + fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id) + app.neutron.list_subnets(network_id=fake_neutron_network_id).AndReturn( + fake_neutron_subnets_response) + fake_neutron_port = fake_neutron_ports_response['ports'][0] + fake_neutron_subnets = fake_neutron_subnets_response['subnets'] + _, fake_peer_name, _ = self._mock_out_binding( + fake_docker_endpoint_id, fake_neutron_port, fake_neutron_subnets) + self.mox.ReplayAll() + + fake_subnets_dict_by_id = {subnet['id']: subnet + for subnet in fake_neutron_subnets} + + join_request = { 'NetworkID': fake_docker_network_id, 'EndpointID': fake_docker_endpoint_id, 'SandboxKey': utils.get_sandbox_key(fake_container_id), @@ -490,9 +521,22 @@ class TestKuryr(base.TestKuryrBase): } response = self.app.post('/NetworkDriver.Join', content_type='application/json', - data=jsonutils.dumps(data)) + data=jsonutils.dumps(join_request)) self.assertEqual(200, response.status_code) + decoded_json = jsonutils.loads(response.data) - app.logger.info(decoded_json) - self.assertEqual(constants.SCHEMA['JOIN'], decoded_json) + fake_neutron_v4_subnet = fake_subnets_dict_by_id[ + fake_neutron_v4_subnet_id] + fake_neutron_v6_subnet = fake_subnets_dict_by_id[ + fake_neutron_v6_subnet_id] + expected_response = { + 'Gateway': fake_neutron_v4_subnet['gateway_ip'], + 'GatewayIPv6': fake_neutron_v6_subnet['gateway_ip'], + 'InterfaceName': { + 'DstPrefix': config.CONF.binding.veth_dst_prefix, + 'SrcName': fake_peer_name, + }, + 'StaticRoutes': [] + } + self.assertEqual(expected_response, decoded_json) diff --git a/kuryr/utils.py b/kuryr/utils.py index 11256948..2d8ffd8f 100644 --- a/kuryr/utils.py +++ b/kuryr/utils.py @@ -19,8 +19,10 @@ import jsonschema from neutronclient.common import exceptions as n_exceptions from neutronclient.neutron import client from neutronclient.v2_0 import client as client_v2 +from oslo_concurrency import processutils from werkzeug import exceptions as w_exceptions +from kuryr.common import exceptions DOCKER_NETNS_BASE = '/var/run/docker/netns' PORT_POSTFIX = 'port' @@ -61,19 +63,21 @@ def make_json_app(import_name, **kwargs): """ app = flask.Flask(import_name, **kwargs) + @app.errorhandler(exceptions.KuryrException) @app.errorhandler(n_exceptions.NeutronClientException) @app.errorhandler(jsonschema.ValidationError) + @app.errorhandler(processutils.ProcessExecutionError) def make_json_error(ex): app.logger.error("Unexpected error happened: {0}".format(ex)) traceback.print_exc(file=sys.stderr) response = flask.jsonify({"Err": str(ex)}) - response.status_code = 500 + response.status_code = w_exceptions.InternalServerError.code if isinstance(ex, w_exceptions.HTTPException): response.status_code = ex.code elif isinstance(ex, n_exceptions.NeutronClientException): response.status_code = ex.status_code elif isinstance(ex, jsonschema.ValidationError): - response.status_code = 400 + response.status_code = w_exceptions.BadRequest.code content_type = 'application/vnd.docker.plugins.v1+json; charset=utf-8' response.headers['Content-Type'] = content_type return response diff --git a/requirements.txt b/requirements.txt index c027621a..7d926268 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,5 +7,8 @@ Babel>=1.3 Flask>=0.10,<1.0 jsonschema!=2.5.0,<3.0.0,>=2.0.0 netaddr>=0.7.12 +oslo.concurrency>=2.3.0 # Apache-2.0 oslo.serialization>=1.4.0 # Apache-2.0 +oslo.utils>=2.0.0 # Apache-2.0 python-neutronclient>=2.3.11,<3 +pyroute2>=0.3.10 # Apache-2.0 (+ dual licensed GPL2) diff --git a/usr/libexec/kuryr/midonet b/usr/libexec/kuryr/midonet new file mode 100755 index 00000000..cef2eda7 --- /dev/null +++ b/usr/libexec/kuryr/midonet @@ -0,0 +1,29 @@ +#!/bin/bash +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +bind_port() { + echo "Binding Neutron port $1 to the veth $2..." + mm-ctl --bind-port $1 $2 +} + +case $1 in + "bind") + shift + bind_port "$@" + exit 0 + ;; + *) + echo >&2 "$0: Invalid command $1." + exit 1 + ;; +esac diff --git a/usr/libexec/kuryr/unbound b/usr/libexec/kuryr/unbound new file mode 100644 index 00000000..e88745c8 --- /dev/null +++ b/usr/libexec/kuryr/unbound @@ -0,0 +1,16 @@ +#!/bin/bash +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +echo "binding:vif_type is invalid." +exit 1