Merge "Use a ThreadPoolExecutor for kazoo callbacks"

This commit is contained in:
Zuul
2021-10-22 20:11:34 +00:00
committed by Gerrit Code Review
3 changed files with 41 additions and 3 deletions

View File

@@ -23,6 +23,7 @@ from kazoo.protocol.states import KazooState
from zuul.lib.config import get_default
from zuul.zk.exceptions import NoClientException
from zuul.zk.handler import PoolSequentialThreadingHandler
class ZooKeeperClient(object):
@@ -126,6 +127,7 @@ class ZooKeeperClient(object):
hosts=self.hosts,
read_only=self.read_only,
timeout=self.timeout,
handler=PoolSequentialThreadingHandler(),
)
if self.tls_key:
args['use_ssl'] = True

36
zuul/zk/handler.py Normal file
View File

@@ -0,0 +1,36 @@
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from concurrent.futures import ThreadPoolExecutor
from kazoo.handlers.threading import SequentialThreadingHandler
class PoolSequentialThreadingHandler(SequentialThreadingHandler):
def __init__(self):
super().__init__()
self._pool_executor = None
def start(self):
self._pool_executor = ThreadPoolExecutor(max_workers=10)
super().start()
def stop(self):
super().stop()
if self._pool_executor:
self._pool_executor.shutdown()
self._pool_executor = None
def short_spawn(self, func, *args, **kwargs):
self._pool_executor.submit(func, *args, **kwargs)

View File

@@ -206,7 +206,7 @@ class DataWatch(object):
stat = self._retry(self._client.exists, self._path,
self._watcher)
if stat:
self._client.handler.spawn(self._get_data)
self._client.handler.short_spawn(self._get_data)
return
# No node data, clear out version
@@ -229,7 +229,7 @@ class DataWatch(object):
def _session_watcher(self, state):
if state == KazooState.CONNECTED:
self._client.handler.spawn(self._get_data)
self._client.handler.short_spawn(self._get_data)
class ChildrenWatch(object):
@@ -371,7 +371,7 @@ class ChildrenWatch(object):
self._watch_established = False
elif (state == KazooState.CONNECTED and
not self._watch_established and not self._stopped):
self._client.handler.spawn(self._get_children)
self._client.handler.short_spawn(self._get_children)
class PatientChildrenWatch(object):