From 0c0ade435e08213da5999464ab7df8164a1aefbd Mon Sep 17 00:00:00 2001 From: Yash Gupta Date: Wed, 11 Dec 2019 15:29:15 +0900 Subject: [PATCH] gracefully exit daemonserver before registry exit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If the registry manager is stopped while daemon server is still processing requests, the pending requests fail and result in resource leaks in ovs. With this change, we wait for DaemonServer to finish processing before shutting down the registry manager. Closes-bug: 1856109 Change-Id: I37498ae7bf6a8b4b14afe53d1af3fb8823d589cf Signed-off-by: Yash Gupta Co-Authored-By: Michał Dulko --- kuryr_kubernetes/cni/daemon/service.py | 31 +++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/kuryr_kubernetes/cni/daemon/service.py b/kuryr_kubernetes/cni/daemon/service.py index 56156ae72..449a58530 100644 --- a/kuryr_kubernetes/cni/daemon/service.py +++ b/kuryr_kubernetes/cni/daemon/service.py @@ -28,6 +28,7 @@ import cotyledon import flask import pyroute2 from pyroute2.ipdb import transactional +from werkzeug import serving import os_vif from oslo_concurrency import lockutils @@ -68,6 +69,7 @@ class DaemonServer(object): '/delNetwork', methods=['POST'], view_func=self.delete) self.headers = {'ContentType': 'application/json', 'Connection': 'close'} + self._server = None def _prepare_request(self): params = cni_utils.CNIParameters(flask.request.get_json()) @@ -184,12 +186,21 @@ class DaemonServer(object): raise try: - self.application.run(address, port, threaded=False, - processes=CONF.cni_daemon.worker_num) + self._server = serving.make_server( + address, port, self.application, threaded=False, + processes=CONF.cni_daemon.worker_num) + self._server.serve_forever() except Exception: LOG.exception('Failed to start kuryr-daemon.') raise + def stop(self): + LOG.info("Waiting for DaemonServer worker processes to exit...") + self._server._block_on_close = True 
self._server.shutdown() + self._server.server_close() + LOG.info("All DaemonServer workers finished gracefully.") + def _check_failure(self): with self.failure_count.get_lock(): if self.failure_count.value < CONF.cni_daemon.cni_failures_count: @@ -221,6 +232,9 @@ class CNIDaemonServerService(cotyledon.Service): # Run HTTP server self.server.run() + def terminate(self): + self.server.stop() + class CNIDaemonWatcherService(cotyledon.Service): name = "watcher" @@ -328,6 +342,8 @@ class CNIDaemonHealthServerService(cotyledon.Service): class CNIDaemonServiceManager(cotyledon.ServiceManager): def __init__(self): + # NOTE(mdulko): Default shutdown timeout is 60 seconds and K8s won't + # wait more by default anyway. super(CNIDaemonServiceManager, self).__init__() # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload. @@ -344,7 +360,8 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager): registry = self.manager.dict() # For Watcher->Server communication. healthy = multiprocessing.Value(c_bool, True) self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,)) - self.add(CNIDaemonServerService, workers=1, args=(registry, healthy,)) + self._server_service = self.add( + CNIDaemonServerService, workers=1, args=(registry, healthy)) self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,)) self.register_hooks(on_terminate=self.terminate) @@ -379,7 +396,15 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager): def terminate(self): self._terminate_called.set() + LOG.info("Gracefully stopping DaemonServer service..") + self.reconfigure(self._server_service, 0) + for worker in self._running_services[self._server_service]: + worker.terminate() + for worker in self._running_services[self._server_service]: + worker.join() + LOG.info("Stopping registry manager...") self.manager.shutdown() + LOG.info("Continuing with shutdown") def start():