Add ZKObject-based launcher API

This change adds two new ZKObjects, NodesetRequest and ProviderNode. A
launcher API that operates on cached data provides the interface that
will be used by the launcher server later on.

This change only adds the API and exercises it in a unit test.
Follow-up changes will use it as part of the launcher client/server.

Change-Id: I38528ca75002abca2bdcc7aee2173d0cdcf3129e
Simon Westphahl, 2024-06-14 16:25:00 +02:00 (committed by James E. Blair)
parent baf90c4a48 | commit 5fb3451347
6 changed files with 943 additions and 12 deletions
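
In short, a client creates a NodesetRequest and then watches the cache
until a launcher fulfills it. A condensed client-side sketch, assembled
from the unit test below (the fulfillment-polling loop is illustrative,
not part of this change):

    api = LauncherApi(zk_client, component_registry, component_info,
                      lambda: None)
    context = ZKContext(zk_client, None, None, log)
    request = model.NodesetRequest.new(
        context,
        tenant_name="tenant",
        pipeline_name="check",
        buildset_uuid=uuid.uuid4().hex,
        job_uuid=uuid.uuid4().hex,
        job_name="foobar",
        labels=["foo", "bar"],
        priority=100,
        request_time=time.time(),
        zuul_event_id=None,
        span_info=None,
    )
    # Poll the tree cache until the launcher marks the request fulfilled.
    for _ in iterate_timeout(10, "request to be fulfilled"):
        req = api.getNodesetRequest(request.uuid)
        if req and req.state == model.NodesetRequest.State.FULFILLED:
            break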


@@ -56,3 +56,4 @@ google-api-python-client
ibm-vpc
ibm-platform-services
ibm-cos-sdk>=2.11.0
mmh3


@@ -17,6 +17,7 @@ from collections import defaultdict
import json
import queue
import threading
import time
import uuid
from unittest import mock
@@ -38,6 +39,7 @@ from zuul.zk.exceptions import LockException
from zuul.zk.executor import ExecutorApi
from zuul.zk.job_request_queue import JobRequestEvent
from zuul.zk.merger import MergerApi
from zuul.zk.launcher import LauncherApi
from zuul.zk.layout import LayoutStateStore, LayoutState
from zuul.zk.locks import locked
from zuul.zk.nodepool import ZooKeeperNodepool
@@ -48,7 +50,11 @@ from zuul.zk.sharding import (
NODE_BYTE_SIZE_LIMIT,
)
 from zuul.zk.components import (
-    BaseComponent, ComponentRegistry, ExecutorComponent, COMPONENT_REGISTRY
+    BaseComponent,
+    ComponentRegistry,
+    ExecutorComponent,
+    LauncherComponent,
+    COMPONENT_REGISTRY
 )
from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
@@ -2178,3 +2184,146 @@ class TestPipelineInit(ZooKeeperBaseTestCase):
pipeline.manager = mock.Mock()
pipeline.state.refresh(context)
self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
class TestLauncherApi(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.component_info = LauncherComponent(self.zk_client, "test")
self.component_info.state = self.component_info.RUNNING
self.component_info.register()
self.api = LauncherApi(
self.zk_client, self.component_registry, self.component_info,
lambda: None)
self.addCleanup(self.api.stop)
def test_launcher(self):
labels = ["foo", "bar"]
context = ZKContext(self.zk_client, None, None, self.log)
request = model.NodesetRequest.new(
context,
tenant_name="tenant",
pipeline_name="check",
buildset_uuid=uuid.uuid4().hex,
job_uuid=uuid.uuid4().hex,
job_name="foobar",
labels=labels,
priority=100,
request_time=time.time(),
zuul_event_id=None,
span_info=None,
)
# Wait for request to show up in the cache
for _ in iterate_timeout(10, "request to show up"):
request_list = self.api.getNodesetRequests()
if len(request_list):
break
self.assertEqual(len(request_list), 1)
for req in request_list:
request = self.api.getNodesetRequest(req.uuid)
self.assertIs(request, req)
self.assertIsNotNone(request)
self.assertEqual(labels, request.labels)
self.assertIsNotNone(request._zstat)
self.assertIsNotNone(request.getPath())
self.assertIsNotNone(request.acquireLock(self.zk_client))
self.assertTrue(request.hasLock())
for _ in iterate_timeout(10, "request to be locked"):
if request.is_locked:
break
# Create provider nodes for the requested labels
for i, label in enumerate(request.labels):
node = model.ProviderNode.new(
context, request_id=request.uuid, uuid=uuid.uuid4().hex,
label=label)
# Wait for the nodes to show up in the cache
for _ in iterate_timeout(10, "nodes to show up"):
provider_nodes = self.api.getProviderNodes()
if len(provider_nodes) == 2:
break
# Accept and update the nodeset request
request.updateAttributes(
context,
state=model.NodesetRequest.State.ACCEPTED,
provider_nodes=[n.uuid for n in provider_nodes])
# "Fulfill" requested provider nodes
for node in self.api.getMatchingProviderNodes():
self.assertIsNotNone(node.acquireLock(self.zk_client))
self.assertTrue(node.hasLock())
for _ in iterate_timeout(10, "node to be locked"):
if node.is_locked:
break
node.updateAttributes(
context,
state=model.ProviderNode.State.READY
)
node.releaseLock()
self.assertFalse(node.hasLock())
for _ in iterate_timeout(10, "node to be unlocked"):
if not node.is_locked:
break
# Wait for the nodes to be ready and unlocked
for _ in iterate_timeout(10, "nodes to be ready"):
requested_nodes = [self.api.getProviderNode(ni)
for ni in request.provider_nodes]
if len(requested_nodes) != 2:
continue
if all(n.state == model.ProviderNode.State.READY
and not n.is_locked for n in requested_nodes):
break
# Mark the request as fulfilled + unlock
request.updateAttributes(
context,
state=model.NodesetRequest.State.FULFILLED,
)
request.releaseLock()
self.assertFalse(request.hasLock())
for _ in iterate_timeout(10, "request to be unlocked"):
if not request.is_locked:
break
# Should be a no-op
self.api.cleanupNodes()
# Remove request and wait for it to be removed from the cache
request.delete(context)
for _ in iterate_timeout(10, "request to be removed"):
request_list = self.api.getNodesetRequests()
if not len(request_list):
break
not_request = self.api.getNodesetRequest(request.uuid)
self.assertIsNone(not_request)
# Make sure we still have the provider nodes
provider_nodes = self.api.getProviderNodes()
self.assertEqual(len(provider_nodes), 2)
# Mark nodes as used
for node in provider_nodes:
self.assertIsNotNone(node.acquireLock(self.zk_client))
for _ in iterate_timeout(10, "wait for lock to show up"):
if node.is_locked:
break
node.updateAttributes(
context,
state=model.ProviderNode.State.USED,
)
node.releaseLock()
# Cleanup used nodes and wait for them to be removed from the cache
for _ in iterate_timeout(10, "nodes to be removed"):
self.api.cleanupNodes()
provider_nodes = self.api.getProviderNodes()
if not provider_nodes:
break


@@ -26,6 +26,7 @@ import textwrap
import types
import urllib.parse
from collections import OrderedDict, defaultdict, namedtuple, UserDict
from enum import StrEnum
from functools import partial, total_ordering
from uuid import uuid4
@@ -2143,6 +2144,129 @@ class NodeRequest(object):
return request
class NodesetRequest(zkobject.LockableZKObject):
class State(StrEnum):
REQUESTED = "requested"
ACCEPTED = "accepted"
FULFILLED = "fulfilled"
FAILED = "failed"
FINAL_STATES = (
State.FULFILLED,
State.FAILED,
)
ROOT = "/zuul/nodeset"
REQUESTS_PATH = "requests"
LOCKS_PATH = "locks"
def __init__(self):
super().__init__()
self._set(
uuid=uuid4().hex,
state=self.State.REQUESTED,
tenant_name="",
pipeline_name="",
buildset_uuid="",
job_uuid="",
job_name="",
labels=[],
priority=0,
request_time=time.time(),
zuul_event_id="",
span_info=None,
provider_nodes=[],
# Attributes that are not serialized
lock=None,
is_locked=False,
# Attributes set by the launcher
_lscores=None,
)
def getPath(self):
return f"{self.ROOT}/{self.REQUESTS_PATH}/{self.uuid}"
def getLockPath(self):
return f"{self.ROOT}/{self.LOCKS_PATH}/{self.uuid}"
def serialize(self, context):
data = dict(
uuid=self.uuid,
state=self.state,
tenant_name=self.tenant_name,
pipeline_name=self.pipeline_name,
buildset_uuid=self.buildset_uuid,
job_uuid=self.job_uuid,
job_name=self.job_name,
labels=self.labels,
priority=self.priority,
request_time=self.request_time,
zuul_event_id=self.zuul_event_id,
span_info=self.span_info,
provider_nodes=self.provider_nodes,
)
return json.dumps(data, sort_keys=True).encode("utf-8")
def __repr__(self):
return (f"<NodesetRequest uuid={self.uuid}, state={self.state},"
f" labels={self.labels}, path={self.getPath()}>")
class ProviderNode(zkobject.LockableZKObject):
class State(StrEnum):
REQUESTED = "requested"
BUILDING = "building"
READY = "ready"
FAILED = "failed"
USED = "used"
HOLD = "hold"
FINAL_STATES = (
State.READY,
State.FAILED,
)
ROOT = "/zuul/nodes"
NODES_PATH = "nodes"
LOCKS_PATH = "locks"
def __init__(self):
super().__init__()
self._set(
uuid=uuid4().hex,
request_id="",
state=self.State.REQUESTED,
label="",
connection_name="",
# Attributes that are not serialized
is_locked=False,
# Attributes set by the launcher
_lscores=None,
)
def __repr__(self):
return (f"<ProviderNode uuid={self.uuid}, state={self.state},"
f" path={self.getPath()}>")
def getPath(self):
return f"{self.ROOT}/{self.NODES_PATH}/{self.uuid}"
def getLockPath(self):
return f"{self.ROOT}/{self.LOCKS_PATH}/{self.uuid}"
def serialize(self, context):
data = dict(
uuid=self.uuid,
request_id=self.request_id,
state=self.state,
label=self.label,
connection_name=self.connection_name,
)
return json.dumps(data, sort_keys=True).encode("utf-8")
class Secret(ConfigObject):
"""A collection of private data.

zuul/zk/cache.py (new file, 355 lines)

@@ -0,0 +1,355 @@
# Copyright 2023 Acme Gating, LLC
# Copyright 2024 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import json
import logging
import queue
import threading
import time
from kazoo import exceptions as kze
from kazoo.protocol.states import (
EventType,
WatchedEvent,
KazooState,
)
from zuul.zk.vendor.states import AddWatchMode
class ZuulTreeCache(abc.ABC):
'''
Use watchers to keep a cache of local objects up to date.
'''
log = logging.getLogger("zuul.zk.ZooKeeper")
event_log = logging.getLogger("zuul.zk.cache.event")
qsize_warning_threshold = 1024
def __init__(self, zk_client, root):
self.client = zk_client.client
self.root = root
self._last_event_warning = time.monotonic()
self._last_playback_warning = time.monotonic()
self._cached_objects = {}
self._cached_paths = set()
self._ready = threading.Event()
self._init_lock = threading.Lock()
self._stopped = False
self._stop_workers = False
self._event_queue = queue.Queue()
self._playback_queue = queue.Queue()
self._event_worker = None
self._playback_worker = None
self.client.add_listener(self._sessionListener)
self._start()
def _bytesToDict(self, data):
return json.loads(data.decode('utf8'))
def _sessionListener(self, state):
if state == KazooState.LOST:
self._ready.clear()
self._stop_workers = True
self._event_queue.put(None)
self._playback_queue.put(None)
elif state == KazooState.CONNECTED and not self._stopped:
self.client.handler.short_spawn(self._start)
def _cacheListener(self, event):
self._event_queue.put(event)
def _start(self):
with self._init_lock:
self.log.debug("Initialize cache at %s", self.root)
self._ready.clear()
self._stop_workers = True
self._event_queue.put(None)
self._playback_queue.put(None)
# If we have an event worker (this is a re-init), then wait
# for it to finish stopping.
if self._event_worker:
self._event_worker.join()
# Replace the queue since any events from the previous
# session aren't valid.
self._event_queue = queue.Queue()
# Prepare (but don't start) the new worker.
self._event_worker = threading.Thread(
target=self._eventWorker)
self._event_worker.daemon = True
if self._playback_worker:
self._playback_worker.join()
self._playback_queue = queue.Queue()
self._playback_worker = threading.Thread(
target=self._playbackWorker)
self._playback_worker.daemon = True
# Clear the stop flag and start the workers now that we
# are sure that both have stopped and we have cleared the
# queues.
self._stop_workers = False
self._event_worker.start()
self._playback_worker.start()
try:
self.client.add_watch(
self.root, self._cacheListener,
AddWatchMode.PERSISTENT_RECURSIVE)
self.client.ensure_path(self.root)
self._walkTree()
self._ready.set()
self.log.debug("Cache at %s is ready", self.root)
except Exception:
self.log.exception("Error initializing cache at %s", self.root)
self.client.handler.short_spawn(self._start)
def stop(self):
self._stopped = True
self._event_queue.put(None)
self._playback_queue.put(None)
def _walkTree(self, root=None, seen_paths=None):
# Recursively walk the tree and emit fake changed events for
# every item in zk and fake deleted events for every item in
# the cache that is not in zk
exists = True
am_root = False
if root is None:
am_root = True
root = self.root
seen_paths = set()
if not self.client.exists(root):
exists = False
if exists:
seen_paths.add(root)
event = WatchedEvent(EventType.NONE,
self.client._state,
root)
self._cacheListener(event)
try:
for child in self.client.get_children(root):
safe_root = root
if safe_root == '/':
safe_root = ''
new_path = '/'.join([safe_root, child])
self._walkTree(new_path, seen_paths)
except kze.NoNodeError:
self.log.debug("Can't sync non-existent node %s", root)
if am_root:
for path in self._cached_paths.copy():
if path not in seen_paths:
event = WatchedEvent(
EventType.NONE,
self.client._state,
path)
self._cacheListener(event)
def _eventWorker(self):
while not (self._stopped or self._stop_workers):
event = self._event_queue.get()
if event is None:
self._event_queue.task_done()
continue
qsize = self._event_queue.qsize()
if qsize > self.qsize_warning_threshold:
now = time.monotonic()
if now - self._last_event_warning > 60:
self.log.warning("Event queue size for cache at %s is %s",
self.root, qsize)
self._last_event_warning = now
try:
self._handleCacheEvent(event)
except Exception:
self.log.exception("Error handling event %s:", event)
self._event_queue.task_done()
def _handleCacheEvent(self, event):
# Ignore root node since we don't maintain a cached object for
# it (all cached objects are under the root in our tree
# caches).
if event.path == self.root:
return
# Start by assuming we need to fetch data for the event.
fetch = True
if event.type == EventType.NONE:
if event.path is None:
# We're probably being told of a connection change; ignore.
return
elif (event.type == EventType.DELETED):
# If this is a normal deleted event, we don't need to
# fetch anything.
fetch = False
key = self.parsePath(event.path)
if key is None and event.type != EventType.NONE:
# The cache doesn't care about this path, so we don't need
# to fetch (unless the type is none (re-initialization) in
# which case we always need to fetch in order to determine
# existence).
fetch = False
if fetch:
future = self.client.get_async(event.path)
else:
future = None
self._playback_queue.put((event, future, key))
def _playbackWorker(self):
while not (self._stopped or self._stop_workers):
item = self._playback_queue.get()
if item is None:
self._playback_queue.task_done()
continue
qsize = self._playback_queue.qsize()
if qsize > self.qsize_warning_threshold:
now = time.monotonic()
if now - self._last_playback_warning > 60:
self.log.warning(
"Playback queue size for cache at %s is %s",
self.root, qsize)
self._last_playback_warning = now
event, future, key = item
try:
self._handlePlayback(event, future, key)
except Exception:
self.log.exception("Error playing back event %s:", event)
self._playback_queue.task_done()
def _handlePlayback(self, event, future, key):
self.event_log.debug("Cache playback event %s", event)
exists = None
data, stat = None, None
if future:
try:
data, stat = future.get()
exists = True
except kze.NoNodeError:
exists = False
# We set "exists" above in case of cache re-initialization,
# which happens out of sequence with the normal watch events.
# and we can't be sure whether the node still exists or not by
# the time we process it. Later cache watch events may
# supercede this (for example, we may process a NONE event
# here which we interpret as a delete which may be followed by
# a normal delete event. That case, and any other variations
# should be anticipated.
# If the event tells us whether the node exists, prefer that
# value, otherwise fallback to what we determined above.
if (event.type in (EventType.CREATED, EventType.CHANGED)):
exists = True
elif (event.type == EventType.DELETED):
exists = False
# Keep the cached paths up to date
if exists:
self._cached_paths.add(event.path)
else:
self._cached_paths.discard(event.path)
# Some caches have special handling for certain sub-objects
self.preCacheHook(event, exists)
# If we don't actually cache this kind of object, return now
if key is None:
return
if data:
# Perform an in-place update of the cached object if possible
old_obj = self._cached_objects.get(key)
if old_obj:
if stat.mzxid <= old_obj._zstat.mzxid:
# Don't update to older data
return
if getattr(old_obj, 'lock', None):
# Don't update a locked object
return
old_obj._updateFromRaw(data, stat)
else:
obj = self.objectFromDict(data, stat)
self._cached_objects[key] = obj
else:
try:
del self._cached_objects[key]
except KeyError:
# If it's already gone, don't care
pass
self.postCacheHook(event, data, stat)
def ensureReady(self):
self._ready.wait()
# Methods for subclasses:
def preCacheHook(self, event, exists):
"""Called before the cache is updated
This is called for any add/update/remove event under the root,
even for paths that are ignored, so users must test the
relevance of the path in this method.
The ``exists`` argument is provided in all cases. In the case
of EventType.NONE events, it indicates whether the cache has
seen the node in ZK immediately before calling this method.
Otherwise, it indicates whether or not the EventType would
cause the node to exist in ZK.
:param EventType event: The event.
:param bool exists: Whether the object exists in ZK.
"""
return None
def postCacheHook(self, event, data, stat):
"""Called after the cache has been updated"""
return None
@abc.abstractmethod
def parsePath(self, path):
"""Parse the path and return a cache key
The cache key is an opaque object ignored by the cache, but
must be hashable.
A convention is to use a tuple of relevant path components as
the key.
Return None to indicate the path is not relevant to the cache.
"""
return None
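# For example, a subclass caching items at <root>/requests/<uuid>
# could map "/zuul/nodeset/requests/1234" to the key ("1234",), as
# LockableZKObjectCache in zuul/zk/launcher.py does below.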
@abc.abstractmethod
def objectFromDict(self, d, key):
"""Construct an object from ZooKeeper data
Given a dictionary of data from ZK and cache key, construct
and return an object to insert into the cache.
:param dict d: The dictionary.
:param object key: The key as returned by parsePath.
"""
pass

zuul/zk/launcher.py (new file, 252 lines)

@@ -0,0 +1,252 @@
# Copyright 2024 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
from operator import attrgetter
import mmh3
from kazoo.exceptions import NoNodeError
from zuul.model import NodesetRequest, ProviderNode
from zuul.zk.cache import ZuulTreeCache
from zuul.zk.zkobject import ZKContext
def _dictToBytes(data):
return json.dumps(data).encode("utf-8")
def _bytesToDict(raw_data):
return json.loads(raw_data.decode("utf-8"))
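# Rendezvous-style hashing: every launcher computes the same score for a
# given (launcher, item) pair, so all launchers independently agree on
# which one of them should handle a particular request or node.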
def launcher_score(name, item):
return mmh3.hash(f"{name}-{item.uuid}", signed=False)
class LockableZKObjectCache(ZuulTreeCache):
def __init__(self, zk_client, updated_event, root, items_path,
locks_path, zkobject_class):
self.updated_event = updated_event
self.items_path = items_path
self.locks_path = locks_path
self.zkobject_class = zkobject_class
super().__init__(zk_client, root)
def _parsePath(self, path):
if not path.startswith(self.root):
return None
path = path[len(self.root) + 1:]
parts = path.split('/')
# We are interested in paths with parts that look like:
# ([<self.items_path>, <self.locks_path>], <uuid>, ...)
if len(parts) < 2:
return None
return parts
def parsePath(self, path):
parts = self._parsePath(path)
if parts is None:
return None
if len(parts) != 2:
return None
if parts[0] != self.items_path:
return None
item_uuid = parts[-1]
return (item_uuid,)
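# Lock znodes live in a separate subtree (<root>/<locks_path>/<uuid>/...),
# so they are never cached as objects themselves; lock events instead
# toggle the is_locked flag on the corresponding cached item in
# preCacheHook below.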
def preCacheHook(self, event, exists):
parts = self._parsePath(event.path)
if parts is None:
return
# Expecting (<self.locks_path>, <uuid>, <lock>,)
if len(parts) != 3:
return
object_type, request_uuid, *_ = parts
if object_type != self.locks_path:
return
key = (request_uuid,)
request = self._cached_objects.get(key)
if not request:
return
self.updated_event()
request._set(is_locked=exists)
def postCacheHook(self, event, data, stat):
self.updated_event()
def objectFromDict(self, d, zstat):
return self.zkobject_class._fromRaw(d, zstat)
def getItem(self, item_id):
self.ensureReady()
return self._cached_objects.get((item_id,))
def getItems(self):
# Get a copy of the values view to avoid runtime errors in the event
# the _cached_objects dict gets updated while iterating.
self.ensureReady()
return list(self._cached_objects.values())
class LauncherApi:
log = logging.getLogger("zuul.LauncherApi")
def __init__(self, zk_client, component_registry, component_info,
event_callback, connection_filter=None):
self.zk_client = zk_client
self.component_registry = component_registry
self.component_info = component_info
self.event_callback = event_callback
self.requests_cache = LockableZKObjectCache(
self.zk_client,
self.event_callback,
root=NodesetRequest.ROOT,
items_path=NodesetRequest.REQUESTS_PATH,
locks_path=NodesetRequest.LOCKS_PATH,
zkobject_class=NodesetRequest)
self.nodes_cache = LockableZKObjectCache(
self.zk_client,
self.event_callback,
root=ProviderNode.ROOT,
items_path=ProviderNode.NODES_PATH,
locks_path=ProviderNode.LOCKS_PATH,
zkobject_class=ProviderNode)
self.connection_filter = connection_filter
self.stop_event = threading.Event()
def stop(self):
self.stop_event.set()
self.requests_cache.stop()
self.nodes_cache.stop()
def getMatchingRequests(self):
candidate_launchers = {
c.hostname: c for c in self.component_registry.all("launcher")}
candidate_names = set(candidate_launchers.keys())
for request in sorted(self.requests_cache.getItems(),
key=attrgetter("priority")):
if request.hasLock():
# We are holding a lock, so short-circuit here.
yield request
if request.is_locked:
# Request is locked by someone else
continue
score_launchers = (
set(request._lscores.values()) if request._lscores else set())
missing_scores = candidate_names - score_launchers
if missing_scores or request._lscores is None:
# (Re-)compute launcher scores
request._set(_lscores={launcher_score(n, request): n
for n in candidate_names})
launcher_scores = sorted(request._lscores.items())
# self.log.debug("Launcher scores: %s", launcher_scores)
for score, launcher_name in launcher_scores:
launcher = candidate_launchers.get(launcher_name)
if not launcher:
continue
if launcher.state != launcher.RUNNING:
continue
if launcher.hostname == self.component_info.hostname:
yield request
break
def getNodesetRequest(self, request_id):
return self.requests_cache.getItem(request_id)
def getNodesetRequests(self):
return self.requests_cache.getItems()
def getMatchingProviderNodes(self):
all_launchers = {
c.hostname: c for c in self.component_registry.all("launcher")}
for node in self.nodes_cache.getItems():
if node.hasLock():
# We are holding a lock, so short-circuit here.
yield node
if node.is_locked:
# Node is locked by someone else
continue
candidate_launchers = {
n: c for n, c in all_launchers.items()
if not c.connection_filter
or node.connection_name in c.connection_filter}
candidate_names = set(candidate_launchers)
if node._lscores is None:
missing_scores = candidate_names
else:
score_launchers = set(node._lscores.values())
missing_scores = candidate_names - score_launchers
if missing_scores or node._lscores is None:
# (Re-)compute launcher scores
node._set(_lscores={launcher_score(n, node): n
for n in candidate_names})
launcher_scores = sorted(node._lscores.items())
for score, launcher_name in launcher_scores:
launcher = candidate_launchers.get(launcher_name)
if not launcher:
# Launcher is no longer online
continue
if launcher.state != launcher.RUNNING:
continue
if launcher.hostname == self.component_info.hostname:
yield node
break
def getProviderNode(self, node_id):
return self.nodes_cache.getItem(node_id)
def getProviderNodes(self):
return self.nodes_cache.getItems()
def createZKContext(self, lock=None, log=None):
return ZKContext(
self.zk_client, lock, self.stop_event, log or self.log)
def cleanupNodes(self):
# TODO: This method currently just performs some basic cleanup and
# might need to be extended in the future.
for node in self.getProviderNodes():
if node.state != ProviderNode.State.USED:
continue
# FIXME: check if the node request still exists
if node.is_locked:
continue
if self.getNodesetRequest(node.request_id):
continue
if lock := node.acquireLock(self.zk_client):
try:
with self.createZKContext(lock) as ctx:
node.delete(ctx)
except NoNodeError:
# Node is already deleted
pass
finally:
node.releaseLock()
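
For context, a launcher server built on this API would roughly follow
the pattern below; this is a sketch assembled from the unit test and
the methods above (the loop itself is an assumption, not part of this
change):

    for request in api.getMatchingRequests():
        lock = request.acquireLock(zk_client)
        if lock is None:
            continue  # another launcher won the race for this request
        try:
            with api.createZKContext(lock) as ctx:
                nodes = [ProviderNode.new(ctx, request_id=request.uuid,
                                          label=label)
                         for label in request.labels]
                request.updateAttributes(
                    ctx, state=NodesetRequest.State.ACCEPTED,
                    provider_nodes=[n.uuid for n in nodes])
                # ...drive the nodes to READY, then...
                request.updateAttributes(
                    ctx, state=NodesetRequest.State.FULFILLED)
        finally:
            request.releaseLock()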


@@ -22,11 +22,12 @@ import types
import zlib
import collections
-from kazoo.exceptions import NodeExistsError, NoNodeError
+from kazoo.exceptions import LockTimeout, NodeExistsError, NoNodeError
 from kazoo.retry import KazooRetry
 from zuul.zk import sharding
 from zuul.zk import ZooKeeperClient
+from zuul.zk.locks import SessionAwareLock
class BaseZKContext:
@@ -358,7 +359,7 @@ class ZKObject:
context.cumulative_read_bytes += len(compressed_data)
return compressed_data, zstat
-    def _load(self, context, path=None, deserialize=True):
+    def _load(self, context, path=None):
if path is None:
path = self.getPath()
if context.sessionIsInvalid():
@@ -371,21 +372,26 @@ class ZKObject:
             context.log.error(
                 "Exception loading ZKObject %s at %s", self, path)
             raise
-        if deserialize:
-            self._set(_zkobject_hash=None)
+        self._updateFromRaw(compressed_data, zstat, context)
+
+    @classmethod
+    def _fromRaw(cls, raw_data, zstat):
+        obj = cls()
+        obj._updateFromRaw(raw_data, zstat)
+        return obj
+
+    def _updateFromRaw(self, raw_data, zstat, context=None):
+        self._set(_zkobject_hash=None)
         try:
-            data = zlib.decompress(compressed_data)
+            data = zlib.decompress(raw_data)
         except zlib.error:
             # Fallback for old, uncompressed data
-            data = compressed_data
-        if not deserialize:
-            return data
+            data = raw_data
         self._set(**self.deserialize(data, context))
         self._set(_zstat=zstat,
                   _zkobject_hash=hash(data),
-                  _zkobject_compressed_size=len(compressed_data),
-                  _zkobject_uncompressed_size=len(data),
-                  )
+                  _zkobject_compressed_size=len(raw_data),
+                  _zkobject_uncompressed_size=len(data))
@staticmethod
def _retryableSave(context, create, path, compressed_data, version):
@@ -524,3 +530,47 @@ class ShardedZKObject(ZKObject):
context.log.error(
"Exception saving ZKObject %s at %s", self, path)
raise
class LockableZKObject(ZKObject):
_lock = None
def getLockPath(self):
"""Return the path for the lock of this object in ZK
:returns: A string representation of the Znode path
"""
raise NotImplementedError()
def acquireLock(self, zk_client, blocking=True, timeout=None):
have_lock = False
lock = None
path = self.getLockPath()
try:
lock = SessionAwareLock(zk_client.client, path)
have_lock = lock.acquire(blocking, timeout)
except NoNodeError:
# Request disappeared
have_lock = False
except LockTimeout:
have_lock = False
self.log.error("Timeout trying to acquire lock: %s", path)
# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
if not have_lock:
return None
self._set(_lock=lock)
return lock
def releaseLock(self):
if self._lock is None:
return
self._lock.release()
self._set(_lock=None)
def hasLock(self):
if self._lock is None:
return False
return self._lock.is_still_valid()
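
Note the two lock predicates introduced here: hasLock() reports whether
this process still holds a valid lock, while is_locked (set by
LockableZKObjectCache when it observes lock znodes) reports whether any
component holds one. Condensed from the test above:

    lock = request.acquireLock(zk_client)
    assert request.hasLock()    # we hold a valid lock locally
    # ...after the cache plays back the lock event...
    assert request.is_locked    # the lock is visible to all components
    request.releaseLock()
    assert not request.hasLock()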