diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 2cc722f97..c8ab93a66 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -21,7 +21,6 @@ import queue import sys import threading import time -import urllib.parse import urllib3 import cotyledon @@ -31,27 +30,23 @@ from pyroute2.ipdb import transactional from werkzeug import serving import os_vif -from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils from kuryr_kubernetes import clients -from kuryr_kubernetes.cni import handlers as h_cni +from kuryr_kubernetes.cni.daemon import watcher_service from kuryr_kubernetes.cni import health from kuryr_kubernetes.cni.plugins import k8s_cni_registry from kuryr_kubernetes.cni import prometheus_exporter from kuryr_kubernetes.cni import utils as cni_utils from kuryr_kubernetes import config -from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import exceptions from kuryr_kubernetes import objects -from kuryr_kubernetes import utils -from kuryr_kubernetes import watcher as k_watcher LOG = logging.getLogger(__name__) CONF = cfg.CONF -HEALTH_CHECKER_DELAY = 5 +ErrContainerUnknown = 3 ErrInvalidEnvironmentVariables = 4 ErrTryAgainLater = 11 ErrInternal = 999 @@ -107,6 +102,10 @@ class DaemonServer(object): try: vif = self.plugin.add(params) data = jsonutils.dumps(vif.obj_to_primitive()) + except (exceptions.CNIPodGone, exceptions.CNIPodUidMismatch) as e: + LOG.warning('Pod deleted while processing ADD request') + error = self._error(ErrContainerUnknown, str(e)) + return error, httplib.GONE, self.headers except exceptions.CNITimeout as e: LOG.exception('Timeout on ADD request') error = self._error(ErrTryAgainLater, f"{e}. Try Again Later.") @@ -247,89 +246,6 @@ class CNIDaemonServerService(cotyledon.Service): self.server.stop() -class CNIDaemonWatcherService(cotyledon.Service): - name = "watcher" - - def __init__(self, worker_id, registry, healthy): - super(CNIDaemonWatcherService, self).__init__(worker_id) - self.pipeline = None - self.watcher = None - self.health_thread = None - self.registry = registry - self.healthy = healthy - - def run(self): - self.pipeline = h_cni.CNIPipeline() - self.pipeline.register(h_cni.CallbackHandler(self.on_done, - self.on_deleted)) - self.watcher = k_watcher.Watcher(self.pipeline) - query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}=' - f'{utils.get_nodename()}') - - self.watcher.add(f'{k_const.K8S_API_CRD_KURYRPORTS}' - f'?labelSelector={query_label}') - - self.is_running = True - self.health_thread = threading.Thread( - target=self._start_watcher_health_checker) - self.health_thread.start() - self.watcher.start() - - def _start_watcher_health_checker(self): - while self.is_running: - if not self.watcher.is_alive(): - LOG.debug("Reporting watcher not healthy.") - with self.healthy.get_lock(): - self.healthy.value = False - time.sleep(HEALTH_CHECKER_DELAY) - - def on_done(self, kuryrport, vifs): - kp_name = utils.get_res_unique_name(kuryrport) - with lockutils.lock(kp_name, external=True): - if (kp_name not in self.registry or - self.registry[kp_name]['kp']['metadata']['uid'] - != kuryrport['metadata']['uid']): - self.registry[kp_name] = {'kp': kuryrport, - 'vifs': vifs, - 'containerid': None, - 'vif_unplugged': False, - 'del_received': False} - else: - old_vifs = self.registry[kp_name]['vifs'] - for iface in vifs: - if old_vifs[iface].active != vifs[iface].active: - kp_dict = self.registry[kp_name] - kp_dict['vifs'] = vifs - self.registry[kp_name] = kp_dict - - def on_deleted(self, kp): - kp_name = utils.get_res_unique_name(kp) - try: - if kp_name in self.registry: - # NOTE(ndesh): We need to lock here to avoid race condition - # with the deletion code for CNI DEL so that - # we delete the registry entry exactly once - with lockutils.lock(kp_name, external=True): - if self.registry[kp_name]['vif_unplugged']: - del self.registry[kp_name] - else: - kp_dict = self.registry[kp_name] - kp_dict['del_received'] = True - self.registry[kp_name] = kp_dict - except KeyError: - # This means someone else removed it. It's odd but safe to ignore. - LOG.debug('KuryrPort %s entry already removed from registry while ' - 'handling DELETED event. Ignoring.', kp_name) - pass - - def terminate(self): - self.is_running = False - if self.health_thread: - self.health_thread.join() - if self.watcher: - self.watcher.stop() - - class CNIDaemonHealthServerService(cotyledon.Service): name = "health" @@ -393,7 +309,10 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager): registry = self.manager.dict() # For Watcher->Server communication. healthy = multiprocessing.Value(c_bool, True) metrics = self.manager.Queue() - self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,)) + self.add(watcher_service.KuryrPortWatcherService, workers=1, + args=(registry, healthy,)) + self.add(watcher_service.PodWatcherService, workers=1, + args=(registry, healthy,)) self._server_service = self.add(CNIDaemonServerService, workers=1, args=(registry, healthy, metrics,)) self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,)) diff --git a/kuryr_kubernetes/cni/daemon/watcher_service.py b/kuryr_kubernetes/cni/daemon/watcher_service.py new file mode 100644 index 000000000..cb6987a36 --- /dev/null +++ b/kuryr_kubernetes/cni/daemon/watcher_service.py @@ -0,0 +1,94 @@ +# Copyright 2022 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import threading +import time +import urllib.parse + +import cotyledon +from oslo_config import cfg +from oslo_log import log as logging + +from kuryr_kubernetes.cni import handlers +from kuryr_kubernetes import constants as k_const +from kuryr_kubernetes import utils +from kuryr_kubernetes import watcher as k_watcher + + +HEALTH_CHECKER_DELAY = 5 +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class BaseCNIDaemonWatcherService(cotyledon.Service): + name = "watcher" + + def __init__(self, worker_id, handler, path, registry, healthy): + super().__init__(worker_id) + self.pipeline = None + self.watcher = None + self.health_thread = None + self.handler = handler + self.registry = registry + self.healthy = healthy + self.path = path + self.is_running = False + + def run(self): + self.pipeline = handlers.CNIPipeline() + self.pipeline.register(self.handler) + self.watcher = k_watcher.Watcher(self.pipeline) + self.watcher.add(self.path) + + self.is_running = True + + self.health_thread = threading.Thread( + target=self._start_watcher_health_checker) + self.health_thread.start() + + self.watcher.start() + + def _start_watcher_health_checker(self): + while self.is_running: + if not self.watcher.is_alive(): + LOG.warning(f"Reporting watcher {self.__class__.__name__} is " + f"not healthy because it's not running anymore.") + with self.healthy.get_lock(): + self.healthy.value = False + time.sleep(HEALTH_CHECKER_DELAY) + + def terminate(self): + self.is_running = False + if self.health_thread: + self.health_thread.join() + if self.watcher: + self.watcher.stop() + + +class KuryrPortWatcherService(BaseCNIDaemonWatcherService): + def __init__(self, worker_id, registry, healthy): + query_label = urllib.parse.quote_plus(f'{k_const.KURYRPORT_LABEL}=' + f'{utils.get_nodename()}') + path = f'{k_const.K8S_API_CRD_KURYRPORTS}?labelSelector={query_label}' + handler = handlers.CNIKuryrPortHandler(registry) + super().__init__(worker_id, handler, path, registry, healthy) + + +class PodWatcherService(BaseCNIDaemonWatcherService): + def __init__(self, worker_id, registry, healthy): + query_label = urllib.parse.quote_plus(f'spec.nodeName=' + f'{utils.get_nodename()}') + path = f'{k_const.K8S_API_PODS}?fieldSelector={query_label}' + handler = handlers.CNIPodHandler(registry) + super().__init__(worker_id, handler, path, registry, healthy) diff --git a/kuryr_kubernetes/cni/handlers.py b/kuryr_kubernetes/cni/handlers.py index 9abad6904..5e90f0dd6 100644 --- a/kuryr_kubernetes/cni/handlers.py +++ b/kuryr_kubernetes/cni/handlers.py @@ -13,105 +13,98 @@ # License for the specific language governing permissions and limitations # under the License. -import abc - from os_vif import objects as obj_vif +from oslo_concurrency import lockutils from oslo_log import log as logging -from kuryr_kubernetes import clients from kuryr_kubernetes import constants as k_const -from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes.handlers import dispatch as k_dis from kuryr_kubernetes.handlers import k8s_base +from kuryr_kubernetes import utils LOG = logging.getLogger(__name__) -class CNIHandlerBase(k8s_base.ResourceEventHandler, metaclass=abc.ABCMeta): +class CNIKuryrPortHandler(k8s_base.ResourceEventHandler): OBJECT_KIND = k_const.K8S_OBJ_KURYRPORT - def __init__(self, cni, on_done): - self._cni = cni - self._callback = on_done - self._vifs = {} + def __init__(self, registry): + super().__init__() + self.registry = registry - def on_present(self, pod, *args, **kwargs): - vifs = self._get_vifs(pod) + def on_vif(self, kuryrport, vifs): + kp_name = utils.get_res_unique_name(kuryrport) + with lockutils.lock(kp_name, external=True): + if (kp_name not in self.registry or + self.registry[kp_name] == k_const.CNI_DELETED_POD_SENTINEL + or self.registry[kp_name]['kp']['metadata']['uid'] != + kuryrport['metadata']['uid']): + self.registry[kp_name] = {'kp': kuryrport, + 'vifs': vifs, + 'containerid': None, + 'vif_unplugged': False, + 'del_received': False} + else: + old_vifs = self.registry[kp_name]['vifs'] + for iface in vifs: + if old_vifs[iface].active != vifs[iface].active: + kp_dict = self.registry[kp_name] + kp_dict['vifs'] = vifs + self.registry[kp_name] = kp_dict - if self.should_callback(pod, vifs): - self.callback() - - @abc.abstractmethod - def should_callback(self, pod, vifs): - """Called after all vifs have been processed - - Should determine if the CNI is ready to call the callback - - :param pod: dict containing Kubernetes Pod object - :param vifs: dict containing os_vif VIF objects and ifnames - :returns True/False - """ - raise NotImplementedError() - - @abc.abstractmethod - def callback(self): - """Called if should_callback returns True""" - raise NotImplementedError() - - def _get_vifs(self, pod): - k8s = clients.get_kubernetes_client() + def on_deleted(self, kuryrport, *args, **kwargs): + kp_name = utils.get_res_unique_name(kuryrport) try: - kuryrport_crd = k8s.get(f'{k_const.K8S_API_CRD_NAMESPACES}/' - f'{pod["metadata"]["namespace"]}/' - f'kuryrports/{pod["metadata"]["name"]}') - LOG.debug("Got CRD: %r", kuryrport_crd) - except k_exc.K8sClientException: - return {} + if (kp_name in self.registry and self.registry[kp_name] + != k_const.CNI_DELETED_POD_SENTINEL): + # NOTE(ndesh): We need to lock here to avoid race condition + # with the deletion code for CNI DEL so that + # we delete the registry entry exactly once + with lockutils.lock(kp_name, external=True): + if self.registry[kp_name]['vif_unplugged']: + del self.registry[kp_name] + else: + kp_dict = self.registry[kp_name] + kp_dict['del_received'] = True + self.registry[kp_name] = kp_dict + except KeyError: + # This means someone else removed it. It's odd but safe to ignore. + LOG.debug('KuryrPort %s entry already removed from registry while ' + 'handling DELETED event. Ignoring.', kp_name) + pass - vifs_dict = {k: obj_vif.base.VersionedObject - .obj_from_primitive(v['vif']) - for k, v in kuryrport_crd['status']['vifs'].items()} + def on_present(self, kuryrport, *args, **kwargs): + LOG.debug('MODIFIED event for KuryrPort %s', + utils.get_res_unique_name(kuryrport)) + vifs = self._get_vifs(kuryrport) + if vifs: + self.on_vif(kuryrport, vifs) + + def _get_vifs(self, kuryrport): + vifs_dict = { + k: obj_vif.base.VersionedObject.obj_from_primitive(v['vif']) + for k, v in kuryrport['status']['vifs'].items()} LOG.debug("Got vifs: %r", vifs_dict) return vifs_dict - def _get_inst(self, pod): - return obj_vif.instance_info.InstanceInfo( - uuid=pod['metadata']['uid'], name=pod['metadata']['name']) +class CNIPodHandler(k8s_base.ResourceEventHandler): + OBJECT_KIND = k_const.K8S_OBJ_POD -class CallbackHandler(CNIHandlerBase): + def __init__(self, registry): + super().__init__() + self.registry = registry - def __init__(self, on_vif, on_del=None): - super(CallbackHandler, self).__init__(None, on_vif) - self._del_callback = on_del - self._kuryrport = None - self._callback_vifs = None - - def should_callback(self, kuryrport, vifs): - """Called after all vifs have been processed - - Calls callback if there was at least one vif in the CRD - - :param kuryrport: dict containing Kubernetes KuryrPort CRD object - :param vifs: dict containing os_vif VIF objects and ifnames - :returns True/False - """ - self._kuryrport = kuryrport - self._callback_vifs = vifs - if vifs: - return True - return False - - def callback(self): - self._callback(self._kuryrport, self._callback_vifs) - - def on_deleted(self, kuryrport, *args, **kwargs): - LOG.debug("Got kuryrport %s deletion event.", - kuryrport['metadata']['name']) - if self._del_callback: - self._del_callback(kuryrport) + def on_finalize(self, pod, *args, **kwargs): + # TODO(dulek): Verify if this is the handler for such case. + kp_name = utils.get_res_unique_name(pod) + with lockutils.lock(kp_name, external=True): + # If there was no KP and Pod got deleted, we need inform the + # thread waiting for it about that. We'll insert sentinel value. + if kp_name not in self.registry: + self.registry[kp_name] = k_const.CNI_DELETED_POD_SENTINEL class CNIPipeline(k_dis.EventPipeline): diff --git a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py index 42441c762..88b810021 100644 --- a/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py +++ b/kuryr_kubernetes/cni/plugins/k8s_cni_registry.py @@ -31,15 +31,6 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF RETRY_DELAY = 1000 # 1 second in milliseconds -# TODO(dulek, gryf): Another corner case is (and was) when pod is deleted -# before it's corresponding CRD was created and populated by vifs by -# controller or even noticed by any watcher. Kubelet will try to delete such -# vif, but we will have no data about it. This is currently worked around by -# returning successfully in case of timing out in delete. To solve this -# properly we need to watch for pod deletes as well, or perhaps create -# finalizer for the pod as soon, as we know, that kuryrport CRD will be -# created. - class K8sCNIRegistryPlugin(base_cni.CNIPlugin): def __init__(self, registry, healthy): @@ -57,6 +48,8 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): try: return self.k8s.get( f'{k_const.K8S_API_NAMESPACES}/{namespace}/pods/{name}') + except exceptions.K8sResourceNotFound: + return None except exceptions.K8sClientException: uniq_name = self._get_obj_name(params) LOG.exception('Error when getting Pod %s', uniq_name) @@ -72,6 +65,8 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): if 'K8S_POD_UID' not in params.args: # CRI doesn't pass K8S_POD_UID, get it from the API. pod = self._get_pod(params) + if not pod: + raise exceptions.CNIPodGone(kp_name) params.args.K8S_POD_UID = pod['metadata']['uid'] vifs = self._do_work(params, b_base.connect, timeout) @@ -117,6 +112,14 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): def delete(self, params): kp_name = self._get_obj_name(params) try: + with lockutils.lock(kp_name, external=True): + kp = self.registry[kp_name] + if kp == k_const.CNI_DELETED_POD_SENTINEL: + LOG.warning( + 'Received DEL request for deleted Pod %s without a' + 'KuryrPort. Ignoring.', kp_name) + del self.registry[kp_name] + return reg_ci = self.registry[kp_name]['containerid'] LOG.debug('Read containerid = %s for KuryrPort %s', reg_ci, kp_name) @@ -179,6 +182,10 @@ class K8sCNIRegistryPlugin(base_cni.CNIPlugin): e, (KeyError, exceptions.CNIPodUidMismatch))) def find(): d = self.registry[kp_name] + if d == k_const.CNI_DELETED_POD_SENTINEL: + # Pod got deleted meanwhile + raise exceptions.CNIPodGone(kp_name) + static = d['kp']['spec'].get('podStatic', None) uid = d['kp']['spec']['podUid'] # FIXME(dulek): This is weirdly structured for upgrades support. diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index 29a195c06..1b1a33d24 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -16,6 +16,7 @@ KURYR_FQDN = 'kuryr.openstack.org' K8S_API_BASE = '/api/v1' +K8S_API_PODS = K8S_API_BASE + '/pods' K8S_API_NAMESPACES = K8S_API_BASE + '/namespaces' K8S_API_CRD_VERSION = 'openstack.org/v1' K8S_API_CRD = '/apis/' + K8S_API_CRD_VERSION @@ -91,6 +92,7 @@ K8S_OS_VIF_NOOP_PLUGIN = "noop" CNI_EXCEPTION_CODE = 100 CNI_TIMEOUT_CODE = 200 +CNI_DELETED_POD_SENTINEL = None KURYR_PORT_NAME = 'kuryr-pool-port' KURYR_VIF_TYPE_SRIOV = 'sriov' diff --git a/kuryr_kubernetes/exceptions.py b/kuryr_kubernetes/exceptions.py index 283ff6cfb..0da90b495 100644 --- a/kuryr_kubernetes/exceptions.py +++ b/kuryr_kubernetes/exceptions.py @@ -170,7 +170,7 @@ class CNIBindingFailure(Exception): super(CNIBindingFailure, self).__init__(message) -class CNIPodUidMismatch(CNITimeout): +class CNIPodUidMismatch(Exception): """Excepton raised on a mismatch of CNI request's pod UID and KuryrPort""" def __init__(self, name, expected, observed): super().__init__( @@ -179,6 +179,13 @@ class CNIPodUidMismatch(CNITimeout): f' race conditions.') +class CNIPodGone(Exception): + """Excepton raised when Pod got deleted while processing a CNI request""" + def __init__(self, name): + super().__init__( + f'Pod {name} got deleted while processing the CNI ADD request.') + + class UnreachableOctavia(Exception): """Exception indicates Octavia API failure and can not be reached diff --git a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py index 307d0083a..678b70ad6 100644 --- a/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py +++ b/kuryr_kubernetes/tests/unit/cni/plugins/test_k8s_cni_registry.py @@ -197,8 +197,9 @@ class TestK8sCNIRegistryPlugin(base.TestCase): is_default_gateway=False, container_id='cont_id') + @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('kuryr_kubernetes.cni.binding.base.disconnect') - def test_del_wrong_container_id(self, m_disconnect): + def test_del_wrong_container_id(self, m_disconnect, m_lock): registry = {'default/foo': {'kp': self.kp, 'vifs': self.vifs, 'containerid': 'different'}} healthy = mock.Mock() @@ -206,6 +207,7 @@ class TestK8sCNIRegistryPlugin(base.TestCase): self.plugin.delete(self.params) m_disconnect.assert_not_called() + m_lock.assert_called_with('default/foo', external=True) @mock.patch('oslo_concurrency.lockutils.lock') @mock.patch('time.sleep', mock.Mock()) diff --git a/kuryr_kubernetes/tests/unit/cni/test_handlers.py b/kuryr_kubernetes/tests/unit/cni/test_handlers.py new file mode 100644 index 000000000..75dff35b6 --- /dev/null +++ b/kuryr_kubernetes/tests/unit/cni/test_handlers.py @@ -0,0 +1,68 @@ +# Copyright 2021 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +from kuryr_kubernetes.cni import handlers +from kuryr_kubernetes.tests import base + + +class TestCNIDaemonHandlers(base.TestCase): + def setUp(self): + super().setUp() + self.registry = {} + self.pod = {'metadata': {'namespace': 'testing', + 'name': 'default'}, + 'vif_unplugged': False, + 'del_receieved': False} + self.healthy = mock.Mock() + self.port_handler = handlers.CNIKuryrPortHandler(self.registry) + self.pod_handler = handlers.CNIPodHandler(self.registry) + + @mock.patch('oslo_concurrency.lockutils.lock') + def test_kp_on_deleted(self, m_lock): + pod = self.pod + pod['vif_unplugged'] = True + pod_name = 'testing/default' + self.registry[pod_name] = pod + self.port_handler.on_deleted(pod) + self.assertNotIn(pod_name, self.registry) + + @mock.patch('oslo_concurrency.lockutils.lock') + def test_kp_on_deleted_false(self, m_lock): + pod = self.pod + pod_name = 'testing/default' + self.registry[pod_name] = pod + self.port_handler.on_deleted(pod) + self.assertIn(pod_name, self.registry) + self.assertIs(True, pod['del_received']) + + @mock.patch('oslo_concurrency.lockutils.lock') + def test_pod_on_finalize(self, m_lock): + pod = self.pod + pod_name = 'testing/default' + self.pod_handler.on_finalize(pod) + self.assertIn(pod_name, self.registry) + self.assertIsNone(self.registry[pod_name]) + m_lock.assert_called_once_with(pod_name, external=True) + + @mock.patch('oslo_concurrency.lockutils.lock') + def test_pod_on_finalize_exists(self, m_lock): + pod = self.pod + pod_name = 'testing/default' + self.registry[pod_name] = pod + self.pod_handler.on_finalize(pod) + self.assertIn(pod_name, self.registry) + self.assertIsNotNone(self.registry[pod_name]) + m_lock.assert_called_once_with(pod_name, external=True) diff --git a/kuryr_kubernetes/tests/unit/cni/test_service.py b/kuryr_kubernetes/tests/unit/cni/test_service.py index af1038c38..ef1ece7fe 100644 --- a/kuryr_kubernetes/tests/unit/cni/test_service.py +++ b/kuryr_kubernetes/tests/unit/cni/test_service.py @@ -107,34 +107,3 @@ class TestDaemonServer(base.TestCase): m_delete.assert_called_once_with(mock.ANY) self.assertEqual(500, resp.status_code) - - -class TestCNIDaemonWatcherService(base.TestCase): - def setUp(self): - super(TestCNIDaemonWatcherService, self).setUp() - self.registry = {} - self.pod = {'metadata': {'namespace': 'testing', - 'name': 'default'}, - 'vif_unplugged': False, - 'del_receieved': False} - self.healthy = mock.Mock() - self.watcher = service.CNIDaemonWatcherService( - 0, self.registry, self.healthy) - - @mock.patch('oslo_concurrency.lockutils.lock') - def test_on_deleted(self, m_lock): - pod = self.pod - pod['vif_unplugged'] = True - pod_name = 'testing/default' - self.registry[pod_name] = pod - self.watcher.on_deleted(pod) - self.assertNotIn(pod_name, self.registry) - - @mock.patch('oslo_concurrency.lockutils.lock') - def test_on_deleted_false(self, m_lock): - pod = self.pod - pod_name = 'testing/default' - self.registry[pod_name] = pod - self.watcher.on_deleted(pod) - self.assertIn(pod_name, self.registry) - self.assertIs(True, pod['del_received'])