From fb4c6402a46f8f96ba09f681bfd5356329d416a9 Mon Sep 17 00:00:00 2001 From: Tobias Henkel Date: Tue, 4 Sep 2018 13:52:33 +0200 Subject: [PATCH] Use gearman client keepalive If the gearman server vanishes (e.g. due to a VM crash) some clients like the merger may not notice that it is gone. They just wait forever for data to be received on an inactive connection. In our case the VM containing the zuul-scheduler crashed and after the restart of the scheduler all mergers were waiting for data on the stale connection which blocked a successful scheduler restart. Using tcp keepalive we can detect that situation and let broken inactive connections be killed by the kernel. Depends-On: I8589cd45450245a25539c051355b38d16ee9f4b9 Change-Id: I30049d59d873d64f3b69c5587c775827e3545854 --- requirements.txt | 2 +- zuul/driver/github/githubconnection.py | 4 +++- zuul/executor/client.py | 4 +++- zuul/executor/server.py | 8 ++++++-- zuul/merger/client.py | 4 +++- zuul/merger/server.py | 4 +++- zuul/rpcclient.py | 4 +++- zuul/rpclistener.py | 4 +++- 8 files changed, 25 insertions(+), 9 deletions(-) diff --git a/requirements.txt b/requirements.txt index 4e6611f5ec..a7a7779daa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ python-daemon>=2.0.4,<2.1.0 extras statsd>=3.0 voluptuous>=0.10.2 -gear>=0.9.0,<1.0.0 +gear>=0.13.0,<1.0.0 apscheduler>=3.0 PrettyTable>=0.6,<0.8 babel>=1.0 diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 81a35560b4..cc74c90346 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -151,7 +151,9 @@ class GithubGearmanWorker(object): ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.gearman = gear.TextWorker('Zuul Github Connector') self.log.debug("Connect to gearman") - self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.log.debug("Waiting for server") self.gearman.waitForServer() self.log.debug("Registering") diff --git a/zuul/executor/client.py b/zuul/executor/client.py index e135bb1c7b..241781b11e 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -121,7 +121,9 @@ class ExecutorClient(object): ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.gearman = ZuulGearmanClient(self) - self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.cleanup_thread = GearmanCleanup(self) self.cleanup_thread.start() diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 75176054ae..d224990dc4 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -2090,10 +2090,14 @@ class ExecutorServer(object): ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.merger_worker = ExecutorMergeWorker(self, 'Zuul Executor Merger') - self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.merger_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.executor_worker = ExecutorExecuteWorker( self, 'Zuul Executor Server') - self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.executor_worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.log.debug("Waiting for server") self.merger_worker.waitForServer() self.executor_worker.waitForServer() diff --git a/zuul/merger/client.py b/zuul/merger/client.py index c89a6fba8a..78c17fa303 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -82,7 +82,9 @@ class MergeClient(object): ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.log.debug("Connecting to gearman at %s:%s" % (server, port)) self.gearman = MergeGearmanClient(self) - self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.log.debug("Waiting for gearman") self.gearman.waitForServer() self.jobs = set() diff --git a/zuul/merger/server.py b/zuul/merger/server.py index b7d0fd42e3..e68a939e59 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -60,7 +60,9 @@ class MergeServer(object): ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.worker = gear.TextWorker('Zuul Merger') - self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.log.debug("Waiting for server") self.worker.waitForServer() self.log.debug("Registering") diff --git a/zuul/rpcclient.py b/zuul/rpcclient.py index d4a7bc9686..ef12b96a2a 100644 --- a/zuul/rpcclient.py +++ b/zuul/rpcclient.py @@ -29,7 +29,9 @@ class RPCClient(object): def __init__(self, server, port, ssl_key=None, ssl_cert=None, ssl_ca=None): self.log.debug("Connecting to gearman at %s:%s" % (server, port)) self.gearman = gear.Client() - self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.gearman.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.log.debug("Waiting for gearman") self.gearman.waitForServer() diff --git a/zuul/rpclistener.py b/zuul/rpclistener.py index 37f34e61f0..0d7d01a2f2 100644 --- a/zuul/rpclistener.py +++ b/zuul/rpclistener.py @@ -48,7 +48,9 @@ class RPCListener(object): ssl_cert = get_default(self.config, 'gearman', 'ssl_cert') ssl_ca = get_default(self.config, 'gearman', 'ssl_ca') self.worker = gear.TextWorker('Zuul RPC Listener') - self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca) + self.worker.addServer(server, port, ssl_key, ssl_cert, ssl_ca, + keepalive=True, tcp_keepidle=60, + tcp_keepintvl=30, tcp_keepcnt=5) self.log.debug("Waiting for server") self.worker.waitForServer() self.log.debug("Registering")