From 5fb345134760a0dd5525cd33d98c84f9faa15b9e Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Fri, 14 Jun 2024 16:25:00 +0200 Subject: [PATCH] Add ZKObject-based launcher API This change adds two new ZKObjects, namely the NodesetRequest and a ProviderNode. A launcher API that operates on cached data provides an interface that will be used by the launcher server later on. This change only adds the API and exercises it in a unittest. Following changes will use it as part of the launcher client/server. Change-Id: I38528ca75002abca2bdcc7aee2173d0cdcf3129e --- requirements.txt | 1 + tests/unit/test_zk.py | 151 +++++++++++++++++- zuul/model.py | 124 +++++++++++++++ zuul/zk/cache.py | 355 ++++++++++++++++++++++++++++++++++++++++++ zuul/zk/launcher.py | 252 ++++++++++++++++++++++++++++++ zuul/zk/zkobject.py | 72 +++++++-- 6 files changed, 943 insertions(+), 12 deletions(-) create mode 100644 zuul/zk/cache.py create mode 100644 zuul/zk/launcher.py diff --git a/requirements.txt b/requirements.txt index ae31a0051f..109e8c7df3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,3 +56,4 @@ google-api-python-client ibm-vpc ibm-platform-services ibm-cos-sdk>=2.11.0 +mmh3 diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 550bb8e56c..8205f622d8 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -17,6 +17,7 @@ from collections import defaultdict import json import queue import threading +import time import uuid from unittest import mock @@ -38,6 +39,7 @@ from zuul.zk.exceptions import LockException from zuul.zk.executor import ExecutorApi from zuul.zk.job_request_queue import JobRequestEvent from zuul.zk.merger import MergerApi +from zuul.zk.launcher import LauncherApi from zuul.zk.layout import LayoutStateStore, LayoutState from zuul.zk.locks import locked from zuul.zk.nodepool import ZooKeeperNodepool @@ -48,7 +50,11 @@ from zuul.zk.sharding import ( NODE_BYTE_SIZE_LIMIT, ) from zuul.zk.components import ( - BaseComponent, ComponentRegistry, ExecutorComponent, COMPONENT_REGISTRY + BaseComponent, + ComponentRegistry, + ExecutorComponent, + LauncherComponent, + COMPONENT_REGISTRY ) from tests.base import ( BaseTestCase, HoldableExecutorApi, HoldableMergerApi, @@ -2178,3 +2184,146 @@ class TestPipelineInit(ZooKeeperBaseTestCase): pipeline.manager = mock.Mock() pipeline.state.refresh(context) self.assertEqual(pipeline.state.layout_uuid, layout.uuid) + + +class TestLauncherApi(ZooKeeperBaseTestCase): + + def setUp(self): + super().setUp() + self.component_info = LauncherComponent(self.zk_client, "test") + self.component_info.state = self.component_info.RUNNING + self.component_info.register() + self.api = LauncherApi( + self.zk_client, self.component_registry, self.component_info, + lambda: None) + self.addCleanup(self.api.stop) + + def test_launcher(self): + labels = ["foo", "bar"] + context = ZKContext(self.zk_client, None, None, self.log) + request = model.NodesetRequest.new( + context, + tenant_name="tenant", + pipeline_name="check", + buildset_uuid=uuid.uuid4().hex, + job_uuid=uuid.uuid4().hex, + job_name="foobar", + labels=labels, + priority=100, + request_time=time.time(), + zuul_event_id=None, + span_info=None, + ) + + # Wait for request to show up in the cache + for _ in iterate_timeout(10, "request to show up"): + request_list = self.api.getNodesetRequests() + if len(request_list): + break + + self.assertEqual(len(request_list), 1) + for req in request_list: + request = self.api.getNodesetRequest(req.uuid) + self.assertIs(request, req) + 
self.assertIsNotNone(request) + self.assertEqual(labels, request.labels) + self.assertIsNotNone(request._zstat) + self.assertIsNotNone(request.getPath()) + + self.assertIsNotNone(request.acquireLock(self.zk_client)) + self.assertTrue(request.hasLock()) + for _ in iterate_timeout(10, "request to be locked"): + if request.is_locked: + break + + # Create provider nodes for the requested labels + for i, label in enumerate(request.labels): + node = model.ProviderNode.new( + context, request_id=request.uuid, uuid=uuid.uuid4().hex, + label=label) + + # Wait for the nodes to show up in the cache + for _ in iterate_timeout(10, "nodes to show up"): + provider_nodes = self.api.getProviderNodes() + if len(provider_nodes) == 2: + break + + # Accept and update the nodeset request + request.updateAttributes( + context, + state=model.NodesetRequest.State.ACCEPTED, + provider_nodes=[n.uuid for n in provider_nodes]) + + # "Fulfill" requested provider nodes + for node in self.api.getMatchingProviderNodes(): + self.assertIsNotNone(node.acquireLock(self.zk_client)) + self.assertTrue(node.hasLock()) + for _ in iterate_timeout(10, "node to be locked"): + if node.is_locked: + break + node.updateAttributes( + context, + state=model.ProviderNode.State.READY + ) + node.releaseLock() + self.assertFalse(node.hasLock()) + for _ in iterate_timeout(10, "node to be unlocked"): + if not node.is_locked: + break + + # Wait for nodes to show up be ready and unlocked + for _ in iterate_timeout(10, "nodes to be ready"): + requested_nodes = [self.api.getProviderNode(ni) + for ni in request.provider_nodes] + if len(requested_nodes) != 2: + continue + if all(n.state == model.ProviderNode.State.READY + and not n.is_locked for n in requested_nodes): + break + + # Mark the request as fulfilled + unlock + request.updateAttributes( + context, + state=model.NodesetRequest.State.FULFILLED, + ) + request.releaseLock() + self.assertFalse(request.hasLock()) + for _ in iterate_timeout(10, "request to be unlocked"): + if not request.is_locked: + break + + # Should be a no-op + self.api.cleanupNodes() + + # Remove request and wait for it to be removed from the cache + request.delete(context) + for _ in iterate_timeout(10, "request to be removed"): + request_list = self.api.getNodesetRequests() + if not len(request_list): + break + + not_request = self.api.getNodesetRequest(request.uuid) + self.assertIsNone(not_request) + + # Make sure we still have the provider nodes + provider_nodes = self.api.getProviderNodes() + self.assertEqual(len(provider_nodes), 2) + + # Mark nodes as used + for node in provider_nodes: + self.assertIsNotNone(node.acquireLock(self.zk_client)) + for _ in iterate_timeout(10, "wait for lock to show up"): + if node.is_locked: + break + node.updateAttributes( + context, + state=model.ProviderNode.State.USED, + ) + node.releaseLock() + + # Cleanup used nodes and wait for them to be removed from the cache + for _ in iterate_timeout(10, "nodes to be removed"): + self.api.cleanupNodes() + provider_nodes = self.api.getProviderNodes() + if not provider_nodes: + break diff --git a/zuul/model.py b/zuul/model.py index 9980acdda5..934c32a59a 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -26,6 +26,7 @@ import textwrap import types import urllib.parse from collections import OrderedDict, defaultdict, namedtuple, UserDict +from enum import StrEnum from functools import partial, total_ordering from uuid import uuid4 @@ -2143,6 +2144,129 @@ class NodeRequest(object): return request +class NodesetRequest(zkobject.LockableZKObject): + + 
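+    """A ZooKeeper-backed request for a set of nodes.
+
+    Requests are stored under /zuul/nodeset/requests and are processed
+    by a launcher, which is expected to fulfill them by creating one
+    ProviderNode per requested label (see the unit test above).
+    """
+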
+    class State(StrEnum):
+        REQUESTED = "requested"
+        ACCEPTED = "accepted"
+        FULFILLED = "fulfilled"
+        FAILED = "failed"
+
+    FINAL_STATES = (
+        State.FULFILLED,
+        State.FAILED,
+    )
+
+    ROOT = "/zuul/nodeset"
+    REQUESTS_PATH = "requests"
+    LOCKS_PATH = "locks"
+
+    def __init__(self):
+        super().__init__()
+        self._set(
+            uuid=uuid4().hex,
+            state=self.State.REQUESTED,
+            tenant_name="",
+            pipeline_name="",
+            buildset_uuid="",
+            job_uuid="",
+            job_name="",
+            labels=[],
+            priority=0,
+            request_time=time.time(),
+            zuul_event_id="",
+            span_info=None,
+            provider_nodes=[],
+            # Attributes that are not serialized
+            lock=None,
+            is_locked=False,
+            # Attributes set by the launcher
+            _lscores=None,
+        )
+
+    def getPath(self):
+        return f"{self.ROOT}/{self.REQUESTS_PATH}/{self.uuid}"
+
+    def getLockPath(self):
+        return f"{self.ROOT}/{self.LOCKS_PATH}/{self.uuid}"
+
+    def serialize(self, context):
+        data = dict(
+            uuid=self.uuid,
+            state=self.state,
+            tenant_name=self.tenant_name,
+            pipeline_name=self.pipeline_name,
+            buildset_uuid=self.buildset_uuid,
+            job_uuid=self.job_uuid,
+            job_name=self.job_name,
+            labels=self.labels,
+            priority=self.priority,
+            request_time=self.request_time,
+            zuul_event_id=self.zuul_event_id,
+            span_info=self.span_info,
+            provider_nodes=self.provider_nodes,
+        )
+        return json.dumps(data, sort_keys=True).encode("utf-8")
+
+    def __repr__(self):
+        return (f"<NodesetRequest uuid={self.uuid}, state={self.state}, "
+                f"labels={self.labels}>")
+
+
+class ProviderNode(zkobject.LockableZKObject):
+
+    class State(StrEnum):
+        REQUESTED = "requested"
+        BUILDING = "building"
+        READY = "ready"
+        FAILED = "failed"
+        USED = "used"
+        HOLD = "hold"
+
+    FINAL_STATES = (
+        State.READY,
+        State.FAILED,
+    )
+
+    ROOT = "/zuul/nodes"
+    NODES_PATH = "nodes"
+    LOCKS_PATH = "locks"
+
+    def __init__(self):
+        super().__init__()
+        self._set(
+            uuid=uuid4().hex,
+            request_id="",
+            state=self.State.REQUESTED,
+            label="",
+            connection_name="",
+            # Attributes that are not serialized
+            is_locked=False,
+            # Attributes set by the launcher
+            _lscores=None,
+        )
+
+    def __repr__(self):
+        return (f"<ProviderNode uuid={self.uuid}, state={self.state}, "
+                f"label={self.label}>")
+
+    def getPath(self):
+        return f"{self.ROOT}/{self.NODES_PATH}/{self.uuid}"
+
+    def getLockPath(self):
+        return f"{self.ROOT}/{self.LOCKS_PATH}/{self.uuid}"
+
+    def serialize(self, context):
+        data = dict(
+            uuid=self.uuid,
+            request_id=self.request_id,
+            state=self.state,
+            label=self.label,
+            connection_name=self.connection_name,
+        )
+        return json.dumps(data, sort_keys=True).encode("utf-8")
+
+
 class Secret(ConfigObject):
     """A collection of private data.
diff --git a/zuul/zk/cache.py b/zuul/zk/cache.py
new file mode 100644
index 0000000000..8ed4e73e19
--- /dev/null
+++ b/zuul/zk/cache.py
@@ -0,0 +1,355 @@
+# Copyright 2023 Acme Gating, LLC
+# Copyright 2024 BMW Group
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
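+
+# Implementation note: ZuulTreeCache mirrors a ZooKeeper subtree into local
+# objects using a persistent recursive watch.  Watch events are queued for
+# an event worker, which issues asynchronous reads; a separate playback
+# worker then applies the results to the cached objects in order.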
+ +import abc +import json +import logging +import queue +import threading +import time + +from kazoo import exceptions as kze +from kazoo.protocol.states import ( + EventType, + WatchedEvent, + KazooState, +) + +from zuul.zk.vendor.states import AddWatchMode + + +class ZuulTreeCache(abc.ABC): + ''' + Use watchers to keep a cache of local objects up to date. + ''' + + log = logging.getLogger("zuul.zk.ZooKeeper") + event_log = logging.getLogger("zuul.zk.cache.event") + qsize_warning_threshold = 1024 + + def __init__(self, zk_client, root): + self.client = zk_client.client + self.root = root + self._last_event_warning = time.monotonic() + self._last_playback_warning = time.monotonic() + self._cached_objects = {} + self._cached_paths = set() + self._ready = threading.Event() + self._init_lock = threading.Lock() + self._stopped = False + self._stop_workers = False + self._event_queue = queue.Queue() + self._playback_queue = queue.Queue() + self._event_worker = None + self._playback_worker = None + + self.client.add_listener(self._sessionListener) + self._start() + + def _bytesToDict(self, data): + return json.loads(data.decode('utf8')) + + def _sessionListener(self, state): + if state == KazooState.LOST: + self._ready.clear() + self._stop_workers = True + self._event_queue.put(None) + self._playback_queue.put(None) + elif state == KazooState.CONNECTED and not self._stopped: + self.client.handler.short_spawn(self._start) + + def _cacheListener(self, event): + self._event_queue.put(event) + + def _start(self): + with self._init_lock: + self.log.debug("Initialize cache at %s", self.root) + + self._ready.clear() + self._stop_workers = True + self._event_queue.put(None) + self._playback_queue.put(None) + + # If we have an event worker (this is a re-init), then wait + # for it to finish stopping. + if self._event_worker: + self._event_worker.join() + # Replace the queue since any events from the previous + # session aren't valid. + self._event_queue = queue.Queue() + # Prepare (but don't start) the new worker. + self._event_worker = threading.Thread( + target=self._eventWorker) + self._event_worker.daemon = True + + if self._playback_worker: + self._playback_worker.join() + self._playback_queue = queue.Queue() + self._playback_worker = threading.Thread( + target=self._playbackWorker) + self._playback_worker.daemon = True + + # Clear the stop flag and start the workers now that we + # are sure that both have stopped and we have cleared the + # queues. 
+ self._stop_workers = False + self._event_worker.start() + self._playback_worker.start() + + try: + self.client.add_watch( + self.root, self._cacheListener, + AddWatchMode.PERSISTENT_RECURSIVE) + self.client.ensure_path(self.root) + self._walkTree() + self._ready.set() + self.log.debug("Cache at %s is ready", self.root) + except Exception: + self.log.exception("Error initializing cache at %s", self.root) + self.client.handler.short_spawn(self._start) + + def stop(self): + self._stopped = True + self._event_queue.put(None) + self._playback_queue.put(None) + + def _walkTree(self, root=None, seen_paths=None): + # Recursively walk the tree and emit fake changed events for + # every item in zk and fake deleted events for every item in + # the cache that is not in zk + exists = True + am_root = False + if root is None: + am_root = True + root = self.root + seen_paths = set() + if not self.client.exists(root): + exists = False + if exists: + seen_paths.add(root) + event = WatchedEvent(EventType.NONE, + self.client._state, + root) + self._cacheListener(event) + try: + for child in self.client.get_children(root): + safe_root = root + if safe_root == '/': + safe_root = '' + new_path = '/'.join([safe_root, child]) + self._walkTree(new_path, seen_paths) + except kze.NoNodeError: + self.log.debug("Can't sync non-existent node %s", root) + if am_root: + for path in self._cached_paths.copy(): + if path not in seen_paths: + event = WatchedEvent( + EventType.NONE, + self.client._state, + path) + self._cacheListener(event) + + def _eventWorker(self): + while not (self._stopped or self._stop_workers): + event = self._event_queue.get() + if event is None: + self._event_queue.task_done() + continue + + qsize = self._event_queue.qsize() + if qsize > self.qsize_warning_threshold: + now = time.monotonic() + if now - self._last_event_warning > 60: + self.log.warning("Event queue size for cache at %s is %s", + self.root, qsize) + self._last_event_warning = now + + try: + self._handleCacheEvent(event) + except Exception: + self.log.exception("Error handling event %s:", event) + self._event_queue.task_done() + + def _handleCacheEvent(self, event): + # Ignore root node since we don't maintain a cached object for + # it (all cached objects are under the root in our tree + # caches). + if event.path == self.root: + return + + # Start by assuming we need to fetch data for the event. + fetch = True + if event.type == EventType.NONE: + if event.path is None: + # We're probably being told of a connection change; ignore. + return + elif (event.type == EventType.DELETED): + # If this is a normal deleted event, we don't need to + # fetch anything. + fetch = False + + key = self.parsePath(event.path) + if key is None and event.type != EventType.NONE: + # The cache doesn't care about this path, so we don't need + # to fetch (unless the type is none (re-initialization) in + # which case we always need to fetch in order to determine + # existence). 
+ fetch = False + + if fetch: + future = self.client.get_async(event.path) + else: + future = None + self._playback_queue.put((event, future, key)) + + def _playbackWorker(self): + while not (self._stopped or self._stop_workers): + item = self._playback_queue.get() + if item is None: + self._playback_queue.task_done() + continue + + qsize = self._playback_queue.qsize() + if qsize > self.qsize_warning_threshold: + now = time.monotonic() + if now - self._last_playback_warning > 60: + self.log.warning( + "Playback queue size for cache at %s is %s", + self.root, qsize) + self._last_playback_warning = now + + event, future, key = item + try: + self._handlePlayback(event, future, key) + except Exception: + self.log.exception("Error playing back event %s:", event) + self._playback_queue.task_done() + + def _handlePlayback(self, event, future, key): + self.event_log.debug("Cache playback event %s", event) + exists = None + data, stat = None, None + + if future: + try: + data, stat = future.get() + exists = True + except kze.NoNodeError: + exists = False + + # We set "exists" above in case of cache re-initialization, + # which happens out of sequence with the normal watch events. + # and we can't be sure whether the node still exists or not by + # the time we process it. Later cache watch events may + # supercede this (for example, we may process a NONE event + # here which we interpret as a delete which may be followed by + # a normal delete event. That case, and any other variations + # should be anticipated. + + # If the event tells us whether the node exists, prefer that + # value, otherwise fallback to what we determined above. + if (event.type in (EventType.CREATED, EventType.CHANGED)): + exists = True + elif (event.type == EventType.DELETED): + exists = False + + # Keep the cached paths up to date + if exists: + self._cached_paths.add(event.path) + else: + self._cached_paths.discard(event.path) + + # Some caches have special handling for certain sub-objects + self.preCacheHook(event, exists) + + # If we don't actually cache this kind of object, return now + if key is None: + return + + if data: + # Perform an in-place update of the cached object if possible + old_obj = self._cached_objects.get(key) + if old_obj: + if stat.mzxid <= old_obj._zstat.mzxid: + # Don't update to older data + return + if getattr(old_obj, 'lock', None): + # Don't update a locked object + return + old_obj._updateFromRaw(data, stat) + else: + obj = self.objectFromDict(data, stat) + self._cached_objects[key] = obj + else: + try: + del self._cached_objects[key] + except KeyError: + # If it's already gone, don't care + pass + self.postCacheHook(event, data, stat) + + def ensureReady(self): + self._ready.wait() + + # Methods for subclasses: + def preCacheHook(self, event, exists): + """Called before the cache is updated + + This is called for any add/update/remove event under the root, + even for paths that are ignored, so users much test the + relevance of the path in this method. + + The ``exists`` argument is provided in all cases. In the case + of EventType.NONE events, it indicates whether the cache has + seen the node in ZK immediately before calling this method. + Otherwise, it indicates whether or not the EventType would + cause the node to exist in ZK. + + :param EventType event: The event. + :param bool exists: Whether the object exists in ZK. 
+ + """ + return None + + def postCacheHook(self, event, data, stat): + """Called after the cache has been updated""" + return None + + @abc.abstractmethod + def parsePath(self, path): + """Parse the path and return a cache key + + The cache key is an opaque object ignored by the cache, but + must be hashable. + + A convention is to use a tuple of relevant path components as + the key. + + Return None to indicate the path is not relevant to the cache. + + """ + return None + + @abc.abstractmethod + def objectFromDict(self, d, key): + """Construct an object from ZooKeeper data + + Given a dictionary of data from ZK and cache key, construct + and return an object to insert into the cache. + + :param dict d: The dictionary. + :param object key: The key as returned by parsePath. + """ + pass diff --git a/zuul/zk/launcher.py b/zuul/zk/launcher.py new file mode 100644 index 0000000000..ba2ee6b9b4 --- /dev/null +++ b/zuul/zk/launcher.py @@ -0,0 +1,252 @@ +# Copyright 2024 BMW Group +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +import threading +from operator import attrgetter + +import mmh3 +from kazoo.exceptions import NoNodeError + +from zuul.model import NodesetRequest, ProviderNode +from zuul.zk.cache import ZuulTreeCache +from zuul.zk.zkobject import ZKContext + + +def _dictToBytes(data): + return json.dumps(data).encode("utf-8") + + +def _bytesToDict(raw_data): + return json.loads(raw_data.decode("utf-8")) + + +def launcher_score(name, item): + return mmh3.hash(f"{name}-{item.uuid}", signed=False) + + +class LockableZKObjectCache(ZuulTreeCache): + + def __init__(self, zk_client, updated_event, root, items_path, + locks_path, zkobject_class): + self.updated_event = updated_event + self.items_path = items_path + self.locks_path = locks_path + self.zkobject_class = zkobject_class + super().__init__(zk_client, root) + + def _parsePath(self, path): + if not path.startswith(self.root): + return None + path = path[len(self.root) + 1:] + parts = path.split('/') + # We are interested in requests with a parts that look like: + # ([, ], , ...) 
+ if len(parts) < 2: + return None + return parts + + def parsePath(self, path): + parts = self._parsePath(path) + if parts is None: + return None + if len(parts) != 2: + return None + if parts[0] != self.items_path: + return None + item_uuid = parts[-1] + return (item_uuid,) + + def preCacheHook(self, event, exists): + parts = self._parsePath(event.path) + if parts is None: + return + + # Expecting (, , ,) + if len(parts) != 3: + return + + object_type, request_uuid, *_ = parts + if object_type != self.locks_path: + return + + key = (request_uuid,) + request = self._cached_objects.get(key) + + if not request: + return + + self.updated_event() + request._set(is_locked=exists) + + def postCacheHook(self, event, data, stat): + self.updated_event() + + def objectFromDict(self, d, zstat): + return self.zkobject_class._fromRaw(d, zstat) + + def getItem(self, item_id): + self.ensureReady() + return self._cached_objects.get((item_id,)) + + def getItems(self): + # get a copy of the values view to avoid runtime errors in the event + # the _cached_nodes dict gets updated while iterating + self.ensureReady() + return list(self._cached_objects.values()) + + +class LauncherApi: + log = logging.getLogger("zuul.LauncherApi") + + def __init__(self, zk_client, component_registry, component_info, + event_callback, connection_filter=None): + self.zk_client = zk_client + self.component_registry = component_registry + self.component_info = component_info + self.event_callback = event_callback + self.requests_cache = LockableZKObjectCache( + self.zk_client, + self.event_callback, + root=NodesetRequest.ROOT, + items_path=NodesetRequest.REQUESTS_PATH, + locks_path=NodesetRequest.LOCKS_PATH, + zkobject_class=NodesetRequest) + self.nodes_cache = LockableZKObjectCache( + self.zk_client, + self.event_callback, + root=ProviderNode.ROOT, + items_path=ProviderNode.NODES_PATH, + locks_path=ProviderNode.LOCKS_PATH, + zkobject_class=ProviderNode) + self.connection_filter = connection_filter + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() + self.requests_cache.stop() + self.nodes_cache.stop() + + def getMatchingRequests(self): + candidate_launchers = { + c.hostname: c for c in self.component_registry.all("launcher")} + candidate_names = set(candidate_launchers.keys()) + + for request in sorted(self.requests_cache.getItems(), + key=attrgetter("priority")): + if request.hasLock(): + # We are holding a lock, so short-circuit here. 
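+                # Our own lock also shows up as "locked" in the cache, so
+                # the is_locked check below normally skips the scoring
+                # logic for requests we already hold.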
+ yield request + if request.is_locked: + # Request is locked by someone else + continue + + score_launchers = ( + set(request._lscores.keys()) if request._lscores else set()) + missing_scores = candidate_names - score_launchers + if missing_scores or request._lscores is None: + # (Re-)compute launcher scores + request._set(_lscores={launcher_score(n, request): n + for n in candidate_names}) + + launcher_scores = sorted(request._lscores.items()) + # self.log.debug("Launcher scores: %s", launcher_scores) + for score, launcher_name in launcher_scores: + launcher = candidate_launchers.get(launcher_name) + if not launcher: + continue + if launcher.state != launcher.RUNNING: + continue + if launcher.hostname == self.component_info.hostname: + yield request + break + + def getNodesetRequest(self, request_id): + return self.requests_cache.getItem(request_id) + + def getNodesetRequests(self): + return self.requests_cache.getItems() + + def getMatchingProviderNodes(self): + all_launchers = { + c.hostname: c for c in self.component_registry.all("launcher")} + + for node in self.nodes_cache.getItems(): + if node.hasLock(): + # We are holding a lock, so short-circuit here. + yield node + if node.is_locked: + # Node is locked by someone else + continue + + candidate_launchers = { + n: c for n, c in all_launchers.items() + if not c.connection_filter + or node.connection_name in c.connection_filter} + candidate_names = set(candidate_launchers) + if node._lscores is None: + missing_scores = candidate_names + else: + score_launchers = set(node._lscores.keys()) + missing_scores = candidate_names - score_launchers + + if missing_scores or node._lscores is None: + # (Re-)compute launcher scores + node._set(_lscores={launcher_score(n, node): n + for n in candidate_names}) + + launcher_scores = sorted(node._lscores.items()) + + for score, launcher_name in launcher_scores: + launcher = candidate_launchers.get(launcher_name) + if not launcher: + # Launcher is no longer online + continue + if launcher.state != launcher.RUNNING: + continue + if launcher.hostname == self.component_info.hostname: + yield node + break + + def getProviderNode(self, node_id): + return self.nodes_cache.getItem(node_id) + + def getProviderNodes(self): + return self.nodes_cache.getItems() + + def createZKContext(self, lock=None, log=None): + return ZKContext( + self.zk_client, lock, self.stop_event, log or self.log) + + def cleanupNodes(self): + # TODO: This method currently just performs some basic cleanup and + # might need to be extended in the future. 
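+        # A node is currently considered eligible for cleanup when it is
+        # in the USED state, is not locked, and its originating nodeset
+        # request no longer exists.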
+ for node in self.getProviderNodes(): + if node.state != ProviderNode.State.USED: + continue + # FIXME: check if the node request still exists + if node.is_locked: + continue + if self.getNodesetRequest(node.request_id): + continue + if lock := node.acquireLock(self.zk_client): + try: + with self.createZKContext(lock) as ctx: + node.delete(ctx) + except NoNodeError: + # Node is already deleted + pass + finally: + node.releaseLock() diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 87d76bca67..6260bf39c5 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -22,11 +22,12 @@ import types import zlib import collections -from kazoo.exceptions import NodeExistsError, NoNodeError +from kazoo.exceptions import LockTimeout, NodeExistsError, NoNodeError from kazoo.retry import KazooRetry from zuul.zk import sharding from zuul.zk import ZooKeeperClient +from zuul.zk.locks import SessionAwareLock class BaseZKContext: @@ -358,7 +359,7 @@ class ZKObject: context.cumulative_read_bytes += len(compressed_data) return compressed_data, zstat - def _load(self, context, path=None, deserialize=True): + def _load(self, context, path=None): if path is None: path = self.getPath() if context.sessionIsInvalid(): @@ -371,21 +372,26 @@ class ZKObject: context.log.error( "Exception loading ZKObject %s at %s", self, path) raise - if deserialize: - self._set(_zkobject_hash=None) + self._updateFromRaw(compressed_data, zstat, context) + + @classmethod + def _fromRaw(cls, raw_data, zstat): + obj = cls() + obj._updateFromRaw(raw_data, zstat) + return obj + + def _updateFromRaw(self, raw_data, zstat, context=None): + self._set(_zkobject_hash=None) try: - data = zlib.decompress(compressed_data) + data = zlib.decompress(raw_data) except zlib.error: # Fallback for old, uncompressed data - data = compressed_data - if not deserialize: - return data + data = raw_data self._set(**self.deserialize(data, context)) self._set(_zstat=zstat, _zkobject_hash=hash(data), - _zkobject_compressed_size=len(compressed_data), - _zkobject_uncompressed_size=len(data), - ) + _zkobject_compressed_size=len(raw_data), + _zkobject_uncompressed_size=len(data)) @staticmethod def _retryableSave(context, create, path, compressed_data, version): @@ -524,3 +530,47 @@ class ShardedZKObject(ZKObject): context.log.error( "Exception saving ZKObject %s at %s", self, path) raise + + +class LockableZKObject(ZKObject): + _lock = None + + def getLockPath(self): + """Return the path for the lock of this object in ZK + + :returns: A string representation of the Znode path + """ + raise NotImplementedError() + + def acquireLock(self, zk_client, blocking=True, timeout=None): + have_lock = False + lock = None + path = self.getLockPath() + try: + lock = SessionAwareLock(zk_client.client, path) + have_lock = lock.acquire(blocking, timeout) + except NoNodeError: + # Request disappeared + have_lock = False + except LockTimeout: + have_lock = False + self.log.error("Timeout trying to acquire lock: %s", path) + + # If we aren't blocking, it's possible we didn't get the lock + # because someone else has it. + if not have_lock: + return None + + self._set(_lock=lock) + return lock + + def releaseLock(self): + if self._lock is None: + return + self._lock.release() + self._set(_lock=None) + + def hasLock(self): + if self._lock is None: + return False + return self._lock.is_still_valid()
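For illustration, the following is a minimal sketch of how a launcher might consume this API; it mirrors the flow exercised by the unit test above rather than the real server loop, which later changes add. The zk_client, registry and component objects are assumptions here (a ZooKeeperClient, ComponentRegistry and LauncherComponent created by the surrounding service), and node building/fulfillment is omitted.

# Illustrative only -- zk_client, registry and component are assumed to
# already exist; this is not part of the change itself.
import time
import uuid

from zuul import model
from zuul.zk.launcher import LauncherApi

api = LauncherApi(zk_client, registry, component, lambda: None)
try:
    while True:
        for request in api.getMatchingRequests():
            lock = request.acquireLock(zk_client, blocking=False)
            if not lock:
                # Another launcher got there first
                continue
            with api.createZKContext(lock) as ctx:
                # One ProviderNode per requested label, as in the test
                nodes = [
                    model.ProviderNode.new(
                        ctx, request_id=request.uuid,
                        uuid=uuid.uuid4().hex, label=label)
                    for label in request.labels]
                request.updateAttributes(
                    ctx,
                    state=model.NodesetRequest.State.ACCEPTED,
                    provider_nodes=[n.uuid for n in nodes])
            request.releaseLock()
            # (Building the nodes, marking them READY and fulfilling the
            # request is omitted here.)
        api.cleanupNodes()
        time.sleep(1)
finally:
    api.stop()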