Correct exit routine in web, merger
Change I216b76d6aaf7ebd01fa8cca843f03fd7a3eea16d unified the service stop sequence but omitted changes to zuul-web. Update zuul-web to match and make its sequence more robust. Also remove unecessary sys.exit calls from the merger. Change-Id: Ifdebc17878aa44d57996e4bdd46e49e6144b406b
This commit is contained in:
parent
b7c25e0f51
commit
1eda9ccf96
@ -16,7 +16,6 @@
|
||||
# under the License.
|
||||
|
||||
import signal
|
||||
import sys
|
||||
|
||||
import zuul.cmd
|
||||
import zuul.merger.server
|
||||
@ -34,7 +33,6 @@ class Merger(zuul.cmd.ZuulDaemonApp):
|
||||
def exit_handler(self, signum, frame):
|
||||
self.merger.stop()
|
||||
self.merger.join()
|
||||
sys.exit(0)
|
||||
|
||||
def run(self):
|
||||
self.handleCommands()
|
||||
|
@ -76,7 +76,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
def exit_handler(self, signum, frame):
|
||||
self.sched.stop()
|
||||
self.sched.join()
|
||||
sys.exit(0)
|
||||
|
||||
def run(self):
|
||||
self.handleCommands()
|
||||
|
@ -67,7 +67,13 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
|
||||
self.log.info('Zuul Web Server starting')
|
||||
self.web.start()
|
||||
|
||||
self.web.join()
|
||||
try:
|
||||
self.web.join()
|
||||
except KeyboardInterrupt:
|
||||
print("Ctrl + C: asking process to exit nicely...\n")
|
||||
self.exit_handler(signal.SIGINT, None)
|
||||
self.web.join()
|
||||
|
||||
self.log.info("Zuul Web Server stopped")
|
||||
|
||||
def configure_authenticators(self):
|
||||
|
@ -1690,6 +1690,7 @@ class StreamManager(object):
|
||||
log = logging.getLogger("zuul.web")
|
||||
|
||||
def __init__(self, statsd, metrics):
|
||||
self.thread = None
|
||||
self.statsd = statsd
|
||||
self.metrics = metrics
|
||||
self.hostname = normalize_statsd_name(socket.getfqdn())
|
||||
@ -1709,9 +1710,10 @@ class StreamManager(object):
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
os.write(self.wake_write, b'\n')
|
||||
self.thread.join()
|
||||
if self.thread:
|
||||
self._stopped = True
|
||||
os.write(self.wake_write, b'\n')
|
||||
self.thread.join()
|
||||
|
||||
def run(self):
|
||||
while not self._stopped:
|
||||
@ -1789,11 +1791,13 @@ class ZuulWeb(object):
|
||||
connections,
|
||||
authenticators: AuthenticatorRegistry,
|
||||
info: WebInfo = None):
|
||||
self._running = False
|
||||
self.start_time = time.time()
|
||||
self.config = config
|
||||
self.tracing = tracing.Tracing(self.config)
|
||||
self.metrics = WebMetrics()
|
||||
self.statsd = get_statsd(config)
|
||||
self.wsplugin = None
|
||||
|
||||
self.listen_address = get_default(self.config,
|
||||
'web', 'listen_address',
|
||||
@ -1824,6 +1828,7 @@ class ZuulWeb(object):
|
||||
|
||||
self.component_registry = COMPONENT_REGISTRY.create(self.zk_client)
|
||||
|
||||
self.system_config_thread = None
|
||||
self.system_config_cache_wake_event = threading.Event()
|
||||
self.system_config_cache = SystemConfigCache(
|
||||
self.zk_client,
|
||||
@ -2023,38 +2028,51 @@ class ZuulWeb(object):
|
||||
return cherrypy.server.bound_addr[1]
|
||||
|
||||
def start(self):
|
||||
self.log.debug("ZuulWeb starting")
|
||||
self.log.info("ZuulWeb starting")
|
||||
|
||||
self._running = True
|
||||
self.component_info.state = self.component_info.INITIALIZING
|
||||
# Wait for system config and layouts to be loaded
|
||||
while not self.system_config_cache.is_valid:
|
||||
self.system_config_cache_wake_event.wait()
|
||||
|
||||
# Initialize the system config
|
||||
self.updateSystemConfig()
|
||||
|
||||
# Wait until all layouts/tenants are loaded
|
||||
while True:
|
||||
self.system_config_cache_wake_event.clear()
|
||||
self.updateLayout()
|
||||
if (set(self.unparsed_abide.tenants.keys())
|
||||
!= set(self.abide.tenants.keys())):
|
||||
self.system_config_cache_wake_event.wait()
|
||||
else:
|
||||
break
|
||||
|
||||
self.stream_manager.start()
|
||||
self.wsplugin = WebSocketPlugin(cherrypy.engine)
|
||||
self.wsplugin.subscribe()
|
||||
cherrypy.engine.start()
|
||||
|
||||
self.log.debug("Starting command processor")
|
||||
self.log.info("Starting command processor")
|
||||
self._command_running = True
|
||||
self.command_socket.start()
|
||||
self.command_thread = threading.Thread(target=self.runCommand,
|
||||
name='command')
|
||||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
|
||||
# Wait for system config and layouts to be loaded
|
||||
self.log.info("Waiting for system config from scheduler")
|
||||
while not self.system_config_cache.is_valid:
|
||||
self.system_config_cache_wake_event.wait(1)
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
# Initialize the system config
|
||||
self.updateSystemConfig()
|
||||
|
||||
# Wait until all layouts/tenants are loaded
|
||||
self.log.info("Waiting for all tenants to load")
|
||||
while True:
|
||||
self.system_config_cache_wake_event.clear()
|
||||
self.updateLayout()
|
||||
if (set(self.unparsed_abide.tenants.keys())
|
||||
!= set(self.abide.tenants.keys())):
|
||||
while True:
|
||||
self.system_config_cache_wake_event.wait(1)
|
||||
if not self._running:
|
||||
return
|
||||
if self.system_config_cache_wake_event.is_set():
|
||||
break
|
||||
else:
|
||||
break
|
||||
|
||||
self.log.info("Starting HTTP listeners")
|
||||
self.stream_manager.start()
|
||||
self.wsplugin = WebSocketPlugin(cherrypy.engine)
|
||||
self.wsplugin.subscribe()
|
||||
cherrypy.engine.start()
|
||||
|
||||
self.component_info.state = self.component_info.RUNNING
|
||||
|
||||
self.system_config_thread = threading.Thread(
|
||||
@ -2065,24 +2083,27 @@ class ZuulWeb(object):
|
||||
self.system_config_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("ZuulWeb stopping")
|
||||
self.log.info("ZuulWeb stopping")
|
||||
self._running = False
|
||||
self.component_info.state = self.component_info.STOPPED
|
||||
cherrypy.engine.exit()
|
||||
# Not strictly necessary, but without this, if the server is
|
||||
# started again (e.g., in the unit tests) it will reuse the
|
||||
# same host/port settings.
|
||||
cherrypy.server.httpserver = None
|
||||
self.wsplugin.unsubscribe()
|
||||
if self.wsplugin:
|
||||
self.wsplugin.unsubscribe()
|
||||
self.stream_manager.stop()
|
||||
self._system_config_running = False
|
||||
self.system_config_cache_wake_event.set()
|
||||
self.system_config_thread.join()
|
||||
self.zk_client.disconnect()
|
||||
if self.system_config_thread:
|
||||
self.system_config_thread.join()
|
||||
self.stopRepl()
|
||||
self._command_running = False
|
||||
self.command_socket.stop()
|
||||
self.monitoring_server.stop()
|
||||
self.tracing.stop()
|
||||
self.zk_client.disconnect()
|
||||
|
||||
def join(self):
|
||||
self.command_thread.join()
|
||||
|
Loading…
Reference in New Issue
Block a user