Introduce CNI network driver for Docker

This is the first step toward switching the Docker network plugin to CNI.

Implement: blueprint migrate-docker-to-cni
Change-Id: I974daa2b2f33a86f4bd15b0a9de5c54b3f988114
Hongbin Lu 2020-07-06 04:06:50 +00:00
parent 49ab5efe69
commit 25ee5972ec
12 changed files with 344 additions and 92 deletions


@ -70,6 +70,7 @@ zun.image.driver =
zun.network.driver =
kuryr = zun.network.kuryr_network:KuryrNetwork
cni = zun.network.cni_network:ZunCNI
zun.volume.driver =
cinder = zun.volume.driver:Cinder
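The cni entry above registers ZunCNI in the same zun.network.driver entrypoint namespace as the existing Kuryr driver, so it can be selected by name and loaded like any other network driver. A minimal sketch of how such an entrypoint is typically resolved with stevedore (the helper function here is illustrative, not part of this change):

from stevedore import driver as stevedore_driver

def load_network_driver(name, context, docker_api):
    # Resolve the entrypoint registered under zun.network.driver,
    # e.g. name='kuryr' or the new name='cni' added above.
    mgr = stevedore_driver.DriverManager(
        namespace='zun.network.driver',
        name=name,
        invoke_on_load=True)
    network_driver = mgr.driver
    network_driver.init(context, docker_api)  # e.g. ZunCNI.init(), defined below
    return network_driver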


@ -30,7 +30,7 @@ CONF = cfg.CONF
class BaseBindingDriver(object, metaclass=abc.ABCMeta):
"""Interface to attach ports to capsules."""
"""Interface to attach ports to capsules/containers."""
@abc.abstractmethod
def connect(self, vif, ifname, netns, container_id):
@ -95,9 +95,9 @@ def _configure_l3(vif_dict, ifname, netns, is_default_gateway):
except pyroute2.NetlinkError as ex:
if ex.code != errno.EEXIST:
raise
LOG.debug("Default route already exists in capsule for "
"vif=%s. Did not overwrite with requested "
"gateway=%s",
LOG.debug("Default route already exists in "
"capsule/container for vif=%s. Did not "
"overwrite with requested gateway=%s",
vif_dict, subnet['gateway'])


@ -99,7 +99,7 @@ class VIFOpenVSwitchDriver(BaseBridgeDriver):
def connect(self, vif, ifname, netns, container_id):
super(VIFOpenVSwitchDriver, self).connect(vif, ifname, netns,
container_id)
# FIXME(irenab) use capsule_uuid (neutron port device_id)
# FIXME(irenab) use container_uuid (neutron port device_id)
instance_id = 'zun'
net_utils.create_ovs_vif_port(vif.bridge_name, vif.vif_name,
vif.port_profile.interface_id,


@ -86,12 +86,13 @@ class DaemonServer(object):
self.plugin.delete(params)
except exception.ResourceNotReady:
# NOTE(dulek): It's better to ignore this error - most of the time
# it will happen when capsule is long gone and runtime
# overzealously tries to delete it from the network.
# We cannot really do anything without VIF metadata,
# so let's just tell runtime to move along.
# it will happen when capsule/container is long gone and runtime
# overzealously tries to delete it from the network. We cannot
# really do anything without VIF metadata, so let's just tell
# runtime to move along.
LOG.warning('Error when processing delNetwork request. '
'Ignoring this error, capsule is most likely gone')
'Ignoring this error, capsule/container is most '
'likely gone')
return '', httplib.NO_CONTENT, self.headers
except Exception:
LOG.exception('Error when processing delNetwork request. CNI '
@ -144,43 +145,45 @@ class CNIDaemonWatcherService(cotyledon.Service):
max_workers=1))
def run(self):
self.periodic.add(self.sync_capsules)
self.periodic.add(self.sync_containers)
self.periodic.add(self.poll_vif_status)
self.periodic.start()
@periodics.periodic(spacing=60, run_immediately=True)
def sync_capsules(self):
LOG.debug('Start syncing capsule states.')
def sync_containers(self):
LOG.debug('Start syncing capsule/container states.')
capsules = objects.Capsule.list_by_host(self.context, self.host)
capsule_in_db = set()
for capsule in capsules:
capsule_in_db.add(capsule.uuid)
capsule_in_registry = self.registry.keys()
# process capsules that are deleted
for uuid in capsule_in_registry:
if uuid not in capsule_in_db:
self._on_capsule_deleted(uuid)
containers = objects.Container.list_by_host(self.context, self.host)
container_in_db = set()
for container in (capsules + containers):
container_in_db.add(container.uuid)
container_in_registry = self.registry.keys()
# process capsules/containers that are deleted
for uuid in container_in_registry:
if uuid not in container_in_db:
self._on_container_deleted(uuid)
def _on_capsule_deleted(self, capsule_uuid):
def _on_container_deleted(self, container_uuid):
try:
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code for CNI DEL so that
# we delete the registry entry exactly once
with lockutils.lock(capsule_uuid, external=True):
if self.registry[capsule_uuid]['vif_unplugged']:
LOG.debug("Remove capsule %(capsule)s from registry",
{'capsule': capsule_uuid})
del self.registry[capsule_uuid]
with lockutils.lock(container_uuid, external=True):
if self.registry[container_uuid]['vif_unplugged']:
LOG.debug("Remove capsule/container %(container)s from "
"registry", {'container': container_uuid})
del self.registry[container_uuid]
else:
LOG.debug("Received delete for capsule %(capsule)s",
{'capsule': capsule_uuid})
capsule_dict = self.registry[capsule_uuid]
capsule_dict['del_received'] = True
self.registry[capsule_uuid] = capsule_dict
LOG.debug("Received delete for capsule/container "
"%(container)s", {'container': container_uuid})
container_dict = self.registry[container_uuid]
container_dict['del_received'] = True
self.registry[container_uuid] = container_dict
except KeyError:
# This means someone else removed it. It's odd but safe to ignore.
LOG.debug('Capsule %s entry already removed from registry while '
'handling DELETED event. Ignoring.', capsule_uuid)
LOG.debug('Capsule/Container %s entry already removed from '
'registry while handling DELETED event. Ignoring.',
container_uuid)
pass
@periodics.periodic(spacing=1)
@ -188,9 +191,9 @@ class CNIDaemonWatcherService(cotyledon.Service):
# get a copy of registry data stored in manager process
registry_dict = self.registry.copy()
inactive_vifs = {}
for capsule_uuid in registry_dict:
for ifname in registry_dict[capsule_uuid]['vifs']:
vif_dict = registry_dict[capsule_uuid]['vifs'][ifname]
for container_uuid in registry_dict:
for ifname in registry_dict[container_uuid]['vifs']:
vif_dict = registry_dict[container_uuid]['vifs'][ifname]
if not vif_dict['active']:
inactive_vifs[vif_dict['id']] = ifname
if not inactive_vifs:
@ -200,21 +203,21 @@ class CNIDaemonWatcherService(cotyledon.Service):
# TODO(hongbin): search ports by device_owner as well
search_opts = {'binding:host_id': self.host}
ports = self.neutron_api.list_ports(**search_opts)['ports']
for capsule_uuid in registry_dict:
for container_uuid in registry_dict:
for port in ports:
port_id = port['id']
if port_id in inactive_vifs and utils.is_port_active(port):
ifname = inactive_vifs[port_id]
LOG.debug('sync status of port: %s', port_id)
self._update_vif_status(capsule_uuid, ifname)
self._update_vif_status(container_uuid, ifname)
def _update_vif_status(self, capsule_uuid, ifname):
with lockutils.lock(capsule_uuid, external=True):
capsule_dict = self.registry.get(capsule_uuid)
if capsule_dict:
capsule_dict = self.registry[capsule_uuid]
capsule_dict['vifs'][ifname]['active'] = True
self.registry[capsule_uuid] = capsule_dict
def _update_vif_status(self, container_uuid, ifname):
with lockutils.lock(container_uuid, external=True):
container_dict = self.registry.get(container_uuid)
if container_dict:
container_dict = self.registry[container_uuid]
container_dict['vifs'][ifname]['active'] = True
self.registry[container_uuid] = container_dict
def terminate(self):
if self.periodic:
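For reference, each entry the watcher iterates over has the shape built by ZunCNIRegistryPlugin.add() in the registry plugin (next file); the values below are a minimal illustrative sketch, not output of the code:

# One registry entry, keyed by capsule/container UUID (illustrative values).
# The watcher and the registry plugin coordinate teardown through the two
# flags: an entry is removed only once the CNI DEL has been handled
# (vif_unplugged) and the object is gone from the DB (del_received).
registry = {}
registry['<container-uuid>'] = {
    'containerid': '<CNI_CONTAINERID from the runtime>',
    'vif_unplugged': False,   # set by the registry plugin when CNI DEL arrives
    'del_received': False,    # set by the watcher when the DB object is gone
    'vifs': {
        'eth0': {'active': False, 'id': '<neutron port uuid>'},
    },
}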


@ -29,6 +29,8 @@ from zun import objects
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
RETRY_DELAY = 1000 # 1 second in milliseconds
TYPE_CAPSULE = 'CAPSULE'
TYPE_CONTAINER = 'CONTAINER'
class ZunCNIRegistryPlugin(object):
@ -38,29 +40,33 @@ class ZunCNIRegistryPlugin(object):
self.context = zun_context.get_admin_context(all_projects=True)
self.neutron_api = neutron.NeutronAPI(self.context)
def _get_capsule_uuid(self, params):
# NOTE(hongbin): The runtime should set K8S_POD_NAME as capsule uuid
def _get_container_uuid(self, params):
# NOTE(hongbin): The runtime should set K8S_POD_NAME as
# capsule/container uuid
return params.args.K8S_POD_NAME
def _get_container_type(self, params):
return getattr(params.args, 'ZUN_CONTAINER_TYPE', TYPE_CAPSULE)
def add(self, params):
vifs = self._do_work(params, b_base.connect)
capsule_uuid = self._get_capsule_uuid(params)
container_uuid = self._get_container_uuid(params)
# NOTE(dulek): Saving containerid to be able to distinguish old DEL
# requests that we should ignore. We need a lock to
# prevent race conditions and replace whole object in the
# dict for multiprocessing.Manager to notice that.
with lockutils.lock(capsule_uuid, external=True):
self.registry[capsule_uuid] = {
with lockutils.lock(container_uuid, external=True):
self.registry[container_uuid] = {
'containerid': params.CNI_CONTAINERID,
'vif_unplugged': False,
'del_received': False,
'vifs': {ifname: {'active': vif.active, 'id': vif.id}
for ifname, vif in vifs.items()},
}
LOG.debug('Saved containerid = %s for capsule %s',
params.CNI_CONTAINERID, capsule_uuid)
LOG.debug('Saved containerid = %s for capsule/container %s',
params.CNI_CONTAINERID, container_uuid)
# Wait for VIFs to become active.
timeout = CONF.cni_daemon.vif_active_timeout
@ -73,27 +79,27 @@ class ZunCNIRegistryPlugin(object):
# vif is not active.
@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=RETRY_DELAY,
retry_on_result=any_vif_inactive)
def wait_for_active(capsule_uuid):
return self.registry[capsule_uuid]['vifs']
def wait_for_active(container_uuid):
return self.registry[container_uuid]['vifs']
result = wait_for_active(capsule_uuid)
result = wait_for_active(container_uuid)
for vif in result.values():
if not vif['active']:
LOG.error("Timed out waiting for vifs to become active")
raise exception.ResourceNotReady(resource=capsule_uuid)
raise exception.ResourceNotReady(resource=container_uuid)
return vifs[consts.DEFAULT_IFNAME]
def delete(self, params):
capsule_uuid = self._get_capsule_uuid(params)
container_uuid = self._get_container_uuid(params)
try:
reg_ci = self.registry[capsule_uuid]['containerid']
LOG.debug('Read containerid = %s for capsule %s',
reg_ci, capsule_uuid)
reg_ci = self.registry[container_uuid]['containerid']
LOG.debug('Read containerid = %s for capsule/container %s',
reg_ci, container_uuid)
if reg_ci and reg_ci != params.CNI_CONTAINERID:
# NOTE(dulek): This is a DEL request for some older (probably
# failed) ADD call. We should ignore it or we'll
# unplug a running capsule.
# unplug a running capsule/container.
LOG.warning('Received DEL request for unknown ADD call. '
'Ignoring.')
return
@ -103,36 +109,46 @@ class ZunCNIRegistryPlugin(object):
try:
self._do_work(params, b_base.disconnect)
except exception.ContainerNotFound:
LOG.warning('Capsule is not found in DB. Ignoring.')
LOG.warning('Capsule/Container is not found in DB. Ignoring.')
pass
# NOTE(ndesh): We need to lock here to avoid race condition
# with the deletion code in the watcher to ensure that
# we delete the registry entry exactly once
try:
with lockutils.lock(capsule_uuid, external=True):
if self.registry[capsule_uuid]['del_received']:
LOG.debug("Remove capsule %(capsule)s from registry",
{'capsule': capsule_uuid})
del self.registry[capsule_uuid]
with lockutils.lock(container_uuid, external=True):
if self.registry[container_uuid]['del_received']:
LOG.debug("Remove capsule/container %(container)s from "
"registry", {'container': container_uuid})
del self.registry[container_uuid]
else:
LOG.debug("unplug vif for capsule %(capsule)s",
{'capsule': capsule_uuid})
capsule_dict = self.registry[capsule_uuid]
capsule_dict['vif_unplugged'] = True
self.registry[capsule_uuid] = capsule_dict
LOG.debug("unplug vif for capsule/container %(container)s",
{'container': container_uuid})
container_dict = self.registry[container_uuid]
container_dict['vif_unplugged'] = True
self.registry[container_uuid] = container_dict
except KeyError:
# This means the capsule was removed before vif was unplugged. This
# shouldn't happen, but we can't do anything about it now
LOG.debug('Capsule %s not found while handling DEL request. '
'Ignoring.', capsule_uuid)
# This means the capsule/container was removed before vif was
# unplugged. This shouldn't happen, but we can't do anything
# about it now
LOG.debug('Capsule/Container %s not found while handling DEL '
'request. Ignoring.', container_uuid)
pass
def _do_work(self, params, fn):
capsule_uuid = self._get_capsule_uuid(params)
container_uuid = self._get_container_uuid(params)
container_type = self._get_container_type(params)
capsule = objects.Capsule.get_by_uuid(self.context, capsule_uuid)
vifs = cni_utils.get_vifs(capsule)
if container_type == TYPE_CAPSULE:
container = objects.Capsule.get_by_uuid(self.context,
container_uuid)
elif container_type == TYPE_CONTAINER:
container = objects.Container.get_by_uuid(self.context,
container_uuid)
else:
raise exception.CNIError('Unexpected type: %s', container_type)
vifs = cni_utils.get_vifs(container)
for ifname, vif in vifs.items():
is_default_gateway = (ifname == consts.DEFAULT_IFNAME)
@ -141,11 +157,11 @@ class ZunCNIRegistryPlugin(object):
# use the ifname supplied in the CNI ADD request
ifname = params.CNI_IFNAME
fn(vif, self._get_inst(capsule), ifname, params.CNI_NETNS,
is_default_gateway=is_default_gateway,
container_id=params.CNI_CONTAINERID)
fn(vif, self._get_inst(container), ifname, params.CNI_NETNS,
is_default_gateway=is_default_gateway,
container_id=params.CNI_CONTAINERID)
return vifs
def _get_inst(self, capsule):
def _get_inst(self, container):
return obj_vif.instance_info.InstanceInfo(
uuid=capsule.uuid, name=capsule.name)
uuid=container.uuid, name=container.name)
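To make the capsule/container split concrete: the Docker driver (cni_network.py below) invokes the zun-cni plugin with CNI_ARGS carrying the UUID plus the new ZUN_CONTAINER_TYPE hint. A minimal sketch of that environment, with placeholder values:

# CNI environment handed to the plugin by the runtime/driver (illustrative).
env = {
    'CNI_COMMAND': 'ADD',
    'CNI_CONTAINERID': '<docker container id>',
    'CNI_NETNS': '/proc/<pid>/ns/net',
    'CNI_IFNAME': 'eth0',
    'CNI_ARGS': 'K8S_POD_NAME=<container uuid>;ZUN_CONTAINER_TYPE=CONTAINER',
}
# With these args _get_container_type() returns TYPE_CONTAINER and _do_work()
# loads objects.Container; without ZUN_CONTAINER_TYPE it falls back to
# TYPE_CAPSULE, preserving the existing capsule path.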


@ -21,9 +21,9 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def get_vifs(capsule):
def get_vifs(container):
try:
cni_metadata = capsule.cni_metadata
cni_metadata = container.cni_metadata
vif_state = cni_metadata[consts.CNI_METADATA_VIF]
except KeyError:
return {}


@ -84,4 +84,5 @@ CNI_EXCEPTION_CODE = 100
CNI_TIMEOUT_CODE = 200
DEFAULT_IFNAME = 'eth0'
CNI_METADATA_VIF = 'vif'
CNI_METADATA_PID = 'pid'
USERSPACE_DRIVERS = ['vfio-pci', 'uio', 'uio_pci_generic', 'igb_uio']


@ -68,6 +68,9 @@ daemon_opts = [
"names. Expected that device of VIF related to "
"exact physnet should be binded on specified driver."),
default={}),
cfg.StrOpt('zun_cni_config_file',
help=_("Path to the Zun CNI config file."),
default='/etc/cni/net.d/10-zun-cni.conf'),
]
ALL_OPTS = (daemon_opts)
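The new zun_cni_config_file option points at a standard CNI network configuration; cni_network.py below reads it verbatim into ZUN_CNI_CONF and feeds it to the zun-cni binary on stdin. A minimal sketch of plausible contents, expressed as the JSON string the driver would read (field values are assumptions, not part of this change):

import json

# Assumed minimal CNI config; cniVersion/name/type are the fields the CNI
# spec requires, and 'type' must match the plugin binary name (ZUN_CNI_BIN).
ZUN_CNI_CONF = json.dumps({
    'cniVersion': '0.3.1',
    'name': 'zun',
    'type': 'zun-cni',
})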


@ -448,7 +448,7 @@ class DockerDriver(driver.BaseDriver, driver.ContainerDriver,
return
for neutron_net in container.addresses:
network_driver.disconnect_container_from_network(
container, neutron_network_id=neutron_net)
container, neutron_net)
def _cleanup_exposed_ports(self, neutron_api, container):
exposed_ports = {}
@ -758,6 +758,9 @@ class DockerDriver(driver.BaseDriver, driver.ContainerDriver,
docker.restart(container.container_id)
container.status = consts.RUNNING
container.status_reason = None
network_driver = zun_network.driver(context=context,
docker_api=docker)
network_driver.on_container_started(container)
return container
@check_container_id
@ -771,6 +774,13 @@ class DockerDriver(driver.BaseDriver, driver.ContainerDriver,
docker.stop(container.container_id)
container.status = consts.STOPPED
container.status_reason = None
network_driver = zun_network.driver(context=context,
docker_api=docker)
try:
network_driver.on_container_stopped(container)
except Exception as e:
LOG.error('network driver failed on stopping container: %s',
str(e))
return container
@check_container_id
@ -780,6 +790,22 @@ class DockerDriver(driver.BaseDriver, driver.ContainerDriver,
docker.start(container.container_id)
container.status = consts.RUNNING
container.status_reason = None
network_driver = zun_network.driver(context=context,
docker_api=docker)
try:
network_driver.on_container_started(container)
except Exception as e:
LOG.error('network driver failed on starting container: %s',
str(e))
try:
docker.stop(container.container_id, timeout=5)
LOG.debug('Stop container successfully')
network_driver.on_container_stopped(container)
LOG.debug('Network driver clean up successfully')
except Exception:
pass
container.status = consts.STOPPED
container.status_reason = _("failed to configure network")
return container
@check_container_id

zun/network/cni_network.py (new file, 192 lines)

@ -0,0 +1,192 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shlex
from oslo_log import log as logging
from zun.common import consts
from zun.common import context as zun_context
from zun.common import exception
from zun.common import utils
import zun.conf
from zun.network import network
from zun.network import neutron
from zun.network import os_vif_util
from zun import objects
CONF = zun.conf.CONF
LOG = logging.getLogger(__name__)
DEVICE_OWNER = 'compute:zun'
ZUN_INIT_NETWORK_READY_PATH = '/zun-init-network-ready'
ZUN_INIT = ('while ! [ -f %(file)s ] ; '
'do sleep 1 ; done ; rm -f %(file)s ; exec '
) % {'file': ZUN_INIT_NETWORK_READY_PATH}
ZUN_CNI_CONF = ""
with open(CONF.cni_daemon.zun_cni_config_file, 'r') as f:
ZUN_CNI_CONF = f.read()
PATH = os.environ['PATH']
if "CNI_PATH" in os.environ:
PATH = os.environ['CNI_PATH'] + ':' + PATH
ZUN_CNI_BIN = "zun-cni"
ZUN_CNI_ADD_CMD = "ADD"
ZUN_CNI_DEL_CMD = "DEL"
class ZunCNI(network.Network):
def init(self, context, docker_api):
self.docker = docker_api
self.neutron_api = neutron.NeutronAPI(context)
self.context = context
def get_or_create_network(self, *args, **kwargs):
pass
def create_network(self, name, neutron_net_id):
pass
def remove_network(self, network):
pass
def connect_container_to_network(self, container, requested_network,
security_groups=None):
# TODO(hongbin): implement this in zun-cni-daemon
pass
def disconnect_container_from_network(self, container, neutron_network_id):
addrs_list = []
if container.addresses and neutron_network_id:
addrs_list = container.addresses.get(neutron_network_id, [])
neutron_ports = set()
all_ports = set()
for addr in addrs_list:
all_ports.add(addr['port'])
if not addr['preserve_on_delete']:
port_id = addr['port']
neutron_ports.add(port_id)
self.neutron_api.delete_or_unbind_ports(all_ports, neutron_ports)
def process_networking_config(self, container, requested_network,
host_config, container_kwargs, docker,
security_group_ids=None):
network_id = requested_network['network']
addresses, port = self.neutron_api.create_or_update_port(
container, network_id, requested_network, consts.DEVICE_OWNER_ZUN,
security_group_ids, set_binding_host=True)
container.addresses = {network_id: addresses}
admin_neutron_api = neutron.NeutronAPI(zun_context.get_admin_context())
network = admin_neutron_api.show_network(port['network_id'])['network']
subnets = {}
for fixed_ip in port['fixed_ips']:
subnet_id = fixed_ip['subnet_id']
subnets[subnet_id] = \
admin_neutron_api.show_subnet(subnet_id)['subnet']
vif_plugin = port.get('binding:vif_type')
vif_obj = os_vif_util.neutron_to_osvif_vif(vif_plugin, port, network,
subnets)
state = objects.vif.VIFState(default_vif=vif_obj)
state_dict = state.obj_to_primitive()
container.cni_metadata = {consts.CNI_METADATA_VIF: state_dict}
container.save(self.context)
host_config['network_mode'] = 'none'
container_kwargs['mac_address'] = port['mac_address']
# We manipulate entrypoint and command parameters in here.
token = (container.entrypoint or []) + (container.command or [])
new_command = ZUN_INIT + ' '.join(shlex.quote(t) for t in token)
new_entrypoint = ['/bin/sh', '-c']
container_kwargs['entrypoint'] = new_entrypoint
container_kwargs['command'] = [new_command]
def _get_env_variables(self, container, pid, cmd):
return {
'PATH': PATH,
'CNI_COMMAND': cmd,
'CNI_CONTAINERID': str(container.container_id),
'CNI_NETNS': '/proc/%s/ns/net' % pid,
'CNI_ARGS': "K8S_POD_NAME=%s;ZUN_CONTAINER_TYPE=CONTAINER" %
container.uuid,
'CNI_IFNAME': 'eth0',
}
def _add(self, container, pid):
env_variables = self._get_env_variables(container, pid,
ZUN_CNI_ADD_CMD)
utils.execute(ZUN_CNI_BIN,
process_input=ZUN_CNI_CONF,
env_variables=env_variables)
def _delete(self, container, pid):
env_variables = self._get_env_variables(container, pid,
ZUN_CNI_DEL_CMD)
utils.execute(ZUN_CNI_BIN,
process_input=ZUN_CNI_CONF,
env_variables=env_variables)
def _exec_command_in_container(self, container, cmd):
exec_id = self.docker.exec_create(container.container_id, cmd)['Id']
output = self.docker.exec_start(exec_id)
inspect_res = self.docker.exec_inspect(exec_id)
return inspect_res['ExitCode'], output
def on_container_started(self, container):
response = self.docker.inspect_container(container.container_id)
state = response.get('State')
if type(state) is dict and state.get('Pid'):
pid = state['Pid']
else:
pid = None
# no change
cni_metadata = container.cni_metadata
if cni_metadata.get(consts.CNI_METADATA_PID) == pid:
return
# remove container from old network
old_pid = cni_metadata.pop(consts.CNI_METADATA_PID, None)
if old_pid:
self._delete(container, old_pid)
container.cni_metadata = cni_metadata
container.save(self.context)
# add container to network
self._add(container, pid)
cni_metadata[consts.CNI_METADATA_PID] = pid
container.cni_metadata = cni_metadata
container.save(self.context)
# notify the container that network is setup
cmd = ['touch', ZUN_INIT_NETWORK_READY_PATH]
exit_code, output = self._exec_command_in_container(container, cmd)
if exit_code != 0:
raise exception.ZunException('Execute command %(cmd)s failed, '
'output is: %(output)s'
% {'cmd': ' '.join(cmd),
'output': output})
def on_container_stopped(self, container):
cni_metadata = container.cni_metadata
pid = cni_metadata.pop(consts.CNI_METADATA_PID, None)
if pid:
self._delete(container, pid)
container.cni_metadata = cni_metadata
container.save(self.context)
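A worked example of the entrypoint rewrite done in process_networking_config(): for an illustrative container with entrypoint ['nginx'] and command ['-g', 'daemon off;'], the container is created with network_mode 'none' and the wrapped command below. The shell loop blocks until on_container_started() has wired up the VIF and touched /zun-init-network-ready, then exec's the original process:

# Resulting docker create arguments for the illustrative nginx container.
container_kwargs = {}
container_kwargs['entrypoint'] = ['/bin/sh', '-c']
container_kwargs['command'] = [
    "while ! [ -f /zun-init-network-ready ] ; do sleep 1 ; done ; "
    "rm -f /zun-init-network-ready ; exec nginx -g 'daemon off;'"
]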


@ -379,3 +379,9 @@ class KuryrNetwork(network.Network):
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception("Neutron Error:")
def on_container_started(self, container):
pass
def on_container_stopped(self, container):
pass


@ -52,14 +52,18 @@ class Network(object, metaclass=abc.ABCMeta):
def connect_container_to_network(self, container, network_name, **kwargs):
raise NotImplementedError()
def disconnect_container_from_network(self, container, network_name,
**kwargs):
def disconnect_container_from_network(self, *args, **kwargs):
raise NotImplementedError()
def add_security_groups_to_ports(self, container, security_group_ids,
**kwargs):
def add_security_groups_to_ports(self, *args, **kwargs):
raise NotImplementedError()
def remove_security_groups_from_ports(self, container, security_group_ids,
**kwargs):
raise NotImplementedError()
def on_container_started(self, container):
raise NotImplementedError()
def on_container_stopped(self, container):
raise NotImplementedError()