diff --git a/tests/base.py b/tests/base.py
index 4dcafe0037..c3ff97cd43 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2953,7 +2953,7 @@ class ZuulTestCase(BaseTestCase):
         self.merge_client = RecordingMergeClient(self.config, self.sched)
         self.merge_server = None
         self.nodepool = zuul.nodepool.Nodepool(self.sched)
-        self.zk = zuul.zk.ZooKeeper()
+        self.zk = zuul.zk.ZooKeeper(enable_cache=True)
         self.zk.connect(self.zk_config, timeout=30.0)
 
         self.fake_nodepool = FakeNodepool(
@@ -3371,8 +3371,10 @@ class ZuulTestCase(BaseTestCase):
                      'socketserver_Thread',
                      'GerritWebServer',
                      ]
+        # Ignore Kazoo TreeCache threads that start with "Thread-"
         threads = [t for t in threading.enumerate()
-                   if t.name not in whitelist]
+                   if t.name not in whitelist
+                   and not t.name.startswith("Thread-")]
         if len(threads) > 1:
             log_str = ""
             for thread_id, stack_frame in sys._current_frames().items():
diff --git a/tests/nodepool/test_nodepool_integration.py b/tests/nodepool/test_nodepool_integration.py
index b608bba786..bb6c8abaa0 100644
--- a/tests/nodepool/test_nodepool_integration.py
+++ b/tests/nodepool/test_nodepool_integration.py
@@ -31,7 +31,7 @@ class TestNodepoolIntegration(BaseTestCase):
         super(TestNodepoolIntegration, self).setUp()
 
         self.statsd = None
-        self.zk = zuul.zk.ZooKeeper()
+        self.zk = zuul.zk.ZooKeeper(enable_cache=True)
         self.addCleanup(self.zk.disconnect)
         self.zk.connect('localhost:2181')
         self.hostname = socket.gethostname()
diff --git a/tests/unit/test_nodepool.py b/tests/unit/test_nodepool.py
index e822a10241..2326b1b1bb 100644
--- a/tests/unit/test_nodepool.py
+++ b/tests/unit/test_nodepool.py
@@ -37,7 +37,7 @@ class TestNodepool(BaseTestCase):
             self.zk_chroot_fixture.zookeeper_port,
             self.zk_chroot_fixture.zookeeper_chroot)
 
-        self.zk = zuul.zk.ZooKeeper()
+        self.zk = zuul.zk.ZooKeeper(enable_cache=True)
         self.addCleanup(self.zk.disconnect)
         self.zk.connect(self.zk_config)
         self.hostname = 'nodepool-test-hostname'
diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py
index 8a485457f2..d9942a90a2 100644
--- a/tests/unit/test_zk.py
+++ b/tests/unit/test_zk.py
@@ -33,7 +33,7 @@ class TestZK(BaseTestCase):
             self.zk_chroot_fixture.zookeeper_port,
             self.zk_chroot_fixture.zookeeper_chroot)
 
-        self.zk = zuul.zk.ZooKeeper()
+        self.zk = zuul.zk.ZooKeeper(enable_cache=True)
         self.addCleanup(self.zk.disconnect)
         self.zk.connect(self.zk_config)
 
diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py
index 67484bf871..1eefbe713d 100755
--- a/zuul/cmd/scheduler.py
+++ b/zuul/cmd/scheduler.py
@@ -136,7 +136,7 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
         merger = zuul.merger.client.MergeClient(self.config, self.sched)
         nodepool = zuul.nodepool.Nodepool(self.sched)
 
-        zookeeper = zuul.zk.ZooKeeper()
+        zookeeper = zuul.zk.ZooKeeper(enable_cache=True)
         zookeeper_hosts = get_default(self.config, 'zookeeper', 'hosts', None)
         if not zookeeper_hosts:
             raise Exception("The zookeeper hosts config value is required")
diff --git a/zuul/model.py b/zuul/model.py
index 9219fc7df5..0191d2c0e4 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -4647,6 +4647,7 @@ class WebInfo(object):
 class HoldRequest(object):
     def __init__(self):
         self.lock = None
+        self.stat = None
         self.id = None
         self.tenant = None
         self.project = None
@@ -4694,6 +4695,19 @@
             d['node_expiration'] = self.node_expiration
         return d
 
+    def updateFromDict(self, d):
+        '''
+        Update current object with data from the given dictionary.
+        '''
+        self.tenant = d.get('tenant')
+        self.project = d.get('project')
+        self.job = d.get('job')
+        self.ref_filter = d.get('ref_filter')
+        self.max_count = d.get('max_count', 1)
+        self.current_count = d.get('current_count', 0)
+        self.reason = d.get('reason')
+        self.node_expiration = d.get('node_expiration')
+
     def serialize(self):
         '''
         Return a representation of the object as a string.
diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py
index 331df1e7a5..4c5447365c 100755
--- a/zuul/web/__init__.py
+++ b/zuul/web/__init__.py
@@ -984,7 +984,7 @@
         # instanciate handlers
         self.rpc = zuul.rpcclient.RPCClient(gear_server, gear_port,
                                             ssl_key, ssl_cert, ssl_ca)
-        self.zk = zuul.zk.ZooKeeper()
+        self.zk = zuul.zk.ZooKeeper(enable_cache=True)
        if zk_hosts:
             self.zk.connect(hosts=zk_hosts, read_only=True)
         self.connections = connections
diff --git a/zuul/zk.py b/zuul/zk.py
index 4a1aa09dcb..e3691ce953 100644
--- a/zuul/zk.py
+++ b/zuul/zk.py
@@ -17,6 +17,7 @@ import time
 from kazoo.client import KazooClient, KazooState
 from kazoo import exceptions as kze
 from kazoo.handlers.threading import KazooTimeoutError
+from kazoo.recipe.cache import TreeCache, TreeEvent
 from kazoo.recipe.lock import Lock
 
 import zuul.model
@@ -48,13 +49,26 @@ class ZooKeeper(object):
     # Log zookeeper retry every 10 seconds
     retry_log_rate = 10
 
-    def __init__(self):
+    def __init__(self, enable_cache=True):
         '''
         Initialize the ZooKeeper object.
+
+        :param bool enable_cache: When True, enables caching of ZooKeeper
+            objects (e.g., HoldRequests).
         '''
         self.client = None
         self._became_lost = False
         self._last_retry_log = 0
+        self.enable_cache = enable_cache
+
+        # The caching model we use is designed around handing out model
+        # data as objects. To do this, we use two caches: one is a TreeCache
+        # which contains raw znode data (among other details), and one for
+        # storing that data serialized as objects. This allows us to return
+        # objects from the APIs, and avoids calling the methods to serialize
+        # the data into objects more than once.
+        self._hold_request_tree = None
+        self._cached_hold_requests = {}
 
     def _dictToStr(self, data):
         return json.dumps(data).encode('utf8')
@@ -126,6 +140,67 @@ class ZooKeeper(object):
             except KazooTimeoutError:
                 self.logConnectionRetryEvent()
 
+        if self.enable_cache:
+            self._hold_request_tree = TreeCache(self.client,
+                                                self.HOLD_REQUEST_ROOT)
+            self._hold_request_tree.listen_fault(self.cacheFaultListener)
+            self._hold_request_tree.listen(self.holdRequestCacheListener)
+            self._hold_request_tree.start()
+
+    def cacheFaultListener(self, e):
+        self.log.exception(e)
+
+    def holdRequestCacheListener(self, event):
+        '''
+        Keep the hold request object cache in sync with the TreeCache.
+        '''
+        try:
+            self._holdRequestCacheListener(event)
+        except Exception:
+            self.log.exception(
+                "Exception in hold request cache update for event: %s", event)
+
+    def _holdRequestCacheListener(self, event):
+        if hasattr(event.event_data, 'path'):
+            # Ignore root node
+            path = event.event_data.path
+            if path == self.HOLD_REQUEST_ROOT:
+                return
+
+        if event.event_type not in (TreeEvent.NODE_ADDED,
+                                    TreeEvent.NODE_UPDATED,
+                                    TreeEvent.NODE_REMOVED):
+            return
+
+        path = event.event_data.path
+        request_id = path.rsplit('/', 1)[1]
+
+        if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
+            # Requests with no data are invalid
+            if not event.event_data.data:
+                return
+
+            # Perform an in-place update of the already cached request
+            d = self._bytesToDict(event.event_data.data)
+            old_request = self._cached_hold_requests.get(request_id)
+            if old_request:
+                if event.event_data.stat.version <= old_request.stat.version:
+                    # Don't update to older data
+                    return
+                old_request.updateFromDict(d)
+                old_request.stat = event.event_data.stat
+            else:
+                request = zuul.model.HoldRequest.fromDict(d)
+                request.id = request_id
+                request.stat = event.event_data.stat
+                self._cached_hold_requests[request_id] = request
+
+        elif event.event_type == TreeEvent.NODE_REMOVED:
+            try:
+                del self._cached_hold_requests[request_id]
+            except KeyError:
+                pass
+
     def disconnect(self):
         '''
         Close the ZooKeeper cluster connection.
@@ -133,6 +208,10 @@
         You should call this method if you used connect() to establish a
         cluster connection.
         '''
+        if self._hold_request_tree is not None:
+            self._hold_request_tree.close()
+            self._hold_request_tree = None
+
         if self.client is not None and self.client.connected:
             self.client.stop()
             self.client.close()
@@ -480,6 +559,7 @@
 
         obj = zuul.model.HoldRequest.fromDict(self._strToDict(data))
         obj.id = hold_request_id
+        obj.stat = stat
         return obj
 
     def storeHoldRequest(self, hold_request):
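
As a quick illustration of what the new cache enables (a sketch, not part of the change itself): once a cache-enabled client connects, the TreeCache listener keeps _cached_hold_requests populated with zuul.model.HoldRequest objects, including their ZooKeeper stat, so hold requests can be read without a round trip. The host string below is a placeholder, the sleep is only a crude wait for the cache to populate, and reading the private dict directly is for demonstration; this change does not add a public accessor for it.

import time

import zuul.zk

# Connect a cache-enabled client the same way zuul-web now does.
# 'zk01.example.com:2181' is a placeholder host string.
zk = zuul.zk.ZooKeeper(enable_cache=True)
zk.connect(hosts='zk01.example.com:2181', read_only=True)

# Give the TreeCache a moment to populate after connecting
# (illustration only; real consumers react to the listener instead).
time.sleep(1)

# Each cached value is a HoldRequest kept in sync by
# holdRequestCacheListener(), with .stat used to discard stale updates.
for request_id, request in zk._cached_hold_requests.items():
    print(request_id, request.tenant, request.job, request.stat.version)

zk.disconnect()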