Merge "gracefully exit daemonserver before registry exit"

This commit is contained in:
Zuul 2021-07-12 16:29:22 +00:00 committed by Gerrit Code Review
commit aae39f5011

View File

@ -28,6 +28,7 @@ import cotyledon
import flask import flask
import pyroute2 import pyroute2
from pyroute2.ipdb import transactional from pyroute2.ipdb import transactional
from werkzeug import serving
import os_vif import os_vif
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
@ -68,6 +69,7 @@ class DaemonServer(object):
'/delNetwork', methods=['POST'], view_func=self.delete) '/delNetwork', methods=['POST'], view_func=self.delete)
self.headers = {'ContentType': 'application/json', self.headers = {'ContentType': 'application/json',
'Connection': 'close'} 'Connection': 'close'}
self._server = None
def _prepare_request(self): def _prepare_request(self):
params = cni_utils.CNIParameters(flask.request.get_json()) params = cni_utils.CNIParameters(flask.request.get_json())
@ -184,12 +186,21 @@ class DaemonServer(object):
raise raise
try: try:
self.application.run(address, port, threaded=False, self._server = serving.make_server(
processes=CONF.cni_daemon.worker_num) address, port, self.application, threaded=False,
processes=CONF.cni_daemon.worker_num)
self._server.serve_forever()
except Exception: except Exception:
LOG.exception('Failed to start kuryr-daemon.') LOG.exception('Failed to start kuryr-daemon.')
raise raise
def stop(self):
LOG.info("Waiting for DaemonServer worker processes to exit...")
self._server._block_on_close = True
self._server.shutdown()
self._server.server_close()
LOG.info("All DaemonServer workers finished gracefully.")
def _check_failure(self): def _check_failure(self):
with self.failure_count.get_lock(): with self.failure_count.get_lock():
if self.failure_count.value < CONF.cni_daemon.cni_failures_count: if self.failure_count.value < CONF.cni_daemon.cni_failures_count:
@ -221,6 +232,9 @@ class CNIDaemonServerService(cotyledon.Service):
# Run HTTP server # Run HTTP server
self.server.run() self.server.run()
def terminate(self):
self.server.stop()
class CNIDaemonWatcherService(cotyledon.Service): class CNIDaemonWatcherService(cotyledon.Service):
name = "watcher" name = "watcher"
@ -328,6 +342,8 @@ class CNIDaemonHealthServerService(cotyledon.Service):
class CNIDaemonServiceManager(cotyledon.ServiceManager): class CNIDaemonServiceManager(cotyledon.ServiceManager):
def __init__(self): def __init__(self):
# NOTE(mdulko): Default shutdown timeout is 60 seconds and K8s won't
# wait more by default anyway.
super(CNIDaemonServiceManager, self).__init__() super(CNIDaemonServiceManager, self).__init__()
# TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload. # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload.
@ -344,7 +360,8 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager):
registry = self.manager.dict() # For Watcher->Server communication. registry = self.manager.dict() # For Watcher->Server communication.
healthy = multiprocessing.Value(c_bool, True) healthy = multiprocessing.Value(c_bool, True)
self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,)) self.add(CNIDaemonWatcherService, workers=1, args=(registry, healthy,))
self.add(CNIDaemonServerService, workers=1, args=(registry, healthy,)) self._server_service = self.add(
CNIDaemonServerService, workers=1, args=(registry, healthy))
self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,)) self.add(CNIDaemonHealthServerService, workers=1, args=(healthy,))
self.register_hooks(on_terminate=self.terminate) self.register_hooks(on_terminate=self.terminate)
@ -379,7 +396,15 @@ class CNIDaemonServiceManager(cotyledon.ServiceManager):
def terminate(self): def terminate(self):
self._terminate_called.set() self._terminate_called.set()
LOG.info("Gracefully stopping DaemonServer service..")
self.reconfigure(self._server_service, 0)
for worker in self._running_services[self._server_service]:
worker.terminate()
for worker in self._running_services[self._server_service]:
worker.join()
LOG.info("Stopping registry manager...")
self.manager.shutdown() self.manager.shutdown()
LOG.info("Continuing with shutdown")
def start(): def start():