Use a ThreadPoolExecutor for kazoo callbacks

The kazoo client can spawn a number of background threads to perform
async tasks without blocking foreground operation.  In particular,
this allows its main thread to keep up with ZK traffic which may
trigger callbacks, and then if those callbacks need to perform
operations which take time (or cause ZK traffic themselves) they
will spawn threads for these operations.

Watches set session callbacks which refresh their data.

If a ZK session is reset, then kazoo will call all of these callbacks,
each of which may spawn a thread to refresh its data.  This can easily
lead to spawning too many threads and/or running out of memory.

To alleviate this, use a threadpool executor in the kazoo client when
spawning background operations.  We allow 10 workers for this.  The
watches (which we vendor) are updated so that their throwaway operations
use the thread pool executor, but the long-running background threads
are top-level threads as usual.

Change-Id: Ie90f904342f6261859a875e75b1ea14e2238f895
This commit is contained in:
James E. Blair 2021-10-22 09:24:35 -07:00
parent 220534c0f7
commit 5a8e4d84e2
3 changed files with 41 additions and 3 deletions

View File

@ -23,6 +23,7 @@ from kazoo.protocol.states import KazooState
from zuul.lib.config import get_default
from zuul.zk.exceptions import NoClientException
from zuul.zk.handler import PoolSequentialThreadingHandler
class ZooKeeperClient(object):
@ -126,6 +127,7 @@ class ZooKeeperClient(object):
hosts=self.hosts,
read_only=self.read_only,
timeout=self.timeout,
handler=PoolSequentialThreadingHandler(),
)
if self.tls_key:
args['use_ssl'] = True

36
zuul/zk/handler.py Normal file
View File

@ -0,0 +1,36 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor
from kazoo.handlers.threading import SequentialThreadingHandler
class PoolSequentialThreadingHandler(SequentialThreadingHandler):
    """Sequential threading handler with a bounded pool for short tasks.

    In addition to whatever the base handler provides, this handler owns
    a ``ThreadPoolExecutor`` that exists between ``start()`` and
    ``stop()``.  Work submitted through ``short_spawn()`` runs on that
    pool, so the number of threads used for such work is capped at
    ``max_workers`` no matter how many tasks are queued at once.
    """

    def __init__(self, max_workers=10):
        """Create the handler; the pool itself is created by ``start()``.

        :param max_workers: Maximum number of pool threads used to run
            ``short_spawn()`` tasks.  Defaults to 10.
        """
        super().__init__()
        self._max_workers = max_workers
        self._pool_executor = None

    def start(self):
        """Create the thread pool (if needed) and start the base handler."""
        # Only create a pool when we do not already have one; blindly
        # replacing it on a repeated start() would orphan the previous
        # executor and its worker threads.
        if self._pool_executor is None:
            self._pool_executor = ThreadPoolExecutor(
                max_workers=self._max_workers)
        super().start()

    def stop(self):
        """Stop the base handler, then shut down the thread pool.

        The pool is shut down even if stopping the base handler raises,
        so its worker threads are not leaked.
        """
        try:
            super().stop()
        finally:
            # Detach the pool before shutting it down so that a
            # concurrent short_spawn() sees either a usable pool or None.
            pool, self._pool_executor = self._pool_executor, None
            if pool is not None:
                pool.shutdown()

    def short_spawn(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` on the bounded thread pool.

        :param func: Callable to execute.
        :returns: The ``concurrent.futures.Future`` for the submitted
            call, which callers may use to wait for it or to inspect
            its result or exception.
        :raises RuntimeError: If the handler has not been started or has
            already been stopped.
        """
        # Snapshot the attribute: stop() may clear it from another thread
        # between the check and the submit.
        pool = self._pool_executor
        if pool is None:
            raise RuntimeError(
                "short_spawn() called while the handler is not running")
        return pool.submit(func, *args, **kwargs)

View File

@ -206,7 +206,7 @@ class DataWatch(object):
stat = self._retry(self._client.exists, self._path,
self._watcher)
if stat:
self._client.handler.spawn(self._get_data)
self._client.handler.short_spawn(self._get_data)
return
# No node data, clear out version
@ -229,7 +229,7 @@ class DataWatch(object):
def _session_watcher(self, state):
if state == KazooState.CONNECTED:
self._client.handler.spawn(self._get_data)
self._client.handler.short_spawn(self._get_data)
class ChildrenWatch(object):
@ -371,7 +371,7 @@ class ChildrenWatch(object):
self._watch_established = False
elif (state == KazooState.CONNECTED and
not self._watch_established and not self._stopped):
self._client.handler.spawn(self._get_children)
self._client.handler.short_spawn(self._get_children)
class PatientChildrenWatch(object):