diff --git a/zuul/zk/__init__.py b/zuul/zk/__init__.py index 85934e5c23..195197c5ff 100644 --- a/zuul/zk/__init__.py +++ b/zuul/zk/__init__.py @@ -23,6 +23,7 @@ from kazoo.protocol.states import KazooState from zuul.lib.config import get_default from zuul.zk.exceptions import NoClientException +from zuul.zk.handler import PoolSequentialThreadingHandler class ZooKeeperClient(object): @@ -126,6 +127,7 @@ class ZooKeeperClient(object): hosts=self.hosts, read_only=self.read_only, timeout=self.timeout, + handler=PoolSequentialThreadingHandler(), ) if self.tls_key: args['use_ssl'] = True diff --git a/zuul/zk/handler.py b/zuul/zk/handler.py new file mode 100644 index 0000000000..3cbce0b0e4 --- /dev/null +++ b/zuul/zk/handler.py @@ -0,0 +1,36 @@ +# Copyright 2021 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from concurrent.futures import ThreadPoolExecutor + +from kazoo.handlers.threading import SequentialThreadingHandler + + +class PoolSequentialThreadingHandler(SequentialThreadingHandler): + def __init__(self): + super().__init__() + self._pool_executor = None + + def start(self): + self._pool_executor = ThreadPoolExecutor(max_workers=10) + super().start() + + def stop(self): + super().stop() + if self._pool_executor: + self._pool_executor.shutdown() + self._pool_executor = None + + def short_spawn(self, func, *args, **kwargs): + self._pool_executor.submit(func, *args, **kwargs) diff --git a/zuul/zk/vendor/watchers.py b/zuul/zk/vendor/watchers.py index bfb9bc1612..8eac05ca64 100644 --- a/zuul/zk/vendor/watchers.py +++ b/zuul/zk/vendor/watchers.py @@ -206,7 +206,7 @@ class DataWatch(object): stat = self._retry(self._client.exists, self._path, self._watcher) if stat: - self._client.handler.spawn(self._get_data) + self._client.handler.short_spawn(self._get_data) return # No node data, clear out version @@ -229,7 +229,7 @@ class DataWatch(object): def _session_watcher(self, state): if state == KazooState.CONNECTED: - self._client.handler.spawn(self._get_data) + self._client.handler.short_spawn(self._get_data) class ChildrenWatch(object): @@ -371,7 +371,7 @@ class ChildrenWatch(object): self._watch_established = False elif (state == KazooState.CONNECTED and not self._watch_established and not self._stopped): - self._client.handler.spawn(self._get_children) + self._client.handler.short_spawn(self._get_children) class PatientChildrenWatch(object):