Delete LockableZKObject lock paths

We currently leak the lock path for LockableZKObjects (which are only
used for zuul-launcher).  To correct this, delete the lock path whenever
we delete the associated object.

This approach attempts to minimize additional ZK calls.

The main consideration when deleting lock paths is whether other actors
may be contending for the lock at the time of deletion.  Deleting the
lock path only after deleting the object it protects should reduce
contention, since other actors should no longer be interested in
locking a path for an object that does not exist.  However, due to
system latency and caches, contention may still occur.
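
As a concrete example of that ordering, the node cleanup path in the
launcher (see the _cleanupNode hunk below) deletes the node first and
only then releases the lock, which now also removes the lock path:

    with self.createZKContext(node._lock, self.log) as ctx:
        node.delete(ctx)
        # delete() marks the object as deleted, so releasing the lock
        # afterwards also removes the lock path.
        node.releaseLock(ctx)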

If we are unable to delete the lock path because another actor has
created a lock contender during our recursive delete, we will retry our
delete for up to 30 seconds.  This should give us ample opportunity to
complete the deletion.
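
The retry loop, condensed from the _deleteLockPath method in the diff
below:

    # Give other actors 30 seconds to notice the object is gone and
    # stop creating lock contenders under its lock path.
    for _ in range(30):
        kazoo_retry = KazooRetry(max_tries=-1,
                                 delay=self._retry_interval,
                                 backoff=0)
        try:
            # Handles connection-related retries; a conflicting
            # contender shows up as NotEmptyError below.
            return kazoo_retry(client.delete, path, recursive=True)
        except NotEmptyError:
            time.sleep(1)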

If, after we successfully delete the lock path, another actor attempts
to acquire the lock, the normal Kazoo Lock recipe would fully re-create
the lock path.  To avoid this, we adjust our subclass of Lock so that
it does not create the path when acquiring the lock.  Instead, we
create the lock path when we originally create the LockableZKObject
itself.  Therefore, any lock attempt that fails with a NoNodeError can
be interpreted as indicating that the object no longer exists (and is
not lockable).
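
The corresponding acquire logic (from acquireLock in the diff below)
passes ensure_path=False and treats NoNodeError as "object is gone":

    try:
        # The lock path was created along with the object, so do not
        # re-create it here; doing so would resurrect the path for a
        # deleted object.
        lock = SessionAwareLock(context.client, path, ensure_path=False)
        have_lock = lock.acquire(blocking, timeout)
    except NoNodeError:
        # The object (and its lock path) no longer exists.
        have_lock = False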

We do not currently create any LockableZKObject in an already-locked
state, but that does seem like something we may wish to do in the
future.  To accommodate that, we create the lock path immediately
before creating the object itself.  If, in the future, we want to lock
the object on creation, we can insert that step between the two.
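
In LockableZKObject.new() (see the diff below) that simply means
creating the lock path right before saving the object:

    obj._createLockPath(context)
    # In the future, the lock could be acquired here to create the
    # object in an already-locked state.
    data = obj._trySerialize(context)
    obj._save(context, data, create=True)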

Because acquiring and releasing the lock can produce errors
interesting enough to log, those methods are updated to accept a
ZKContext, principally so that we have access to both the ZK client
and a logger.  We don't currently check the lock supplied with the
context, but we might wish to do so in the acquisition method in the
future.  We will probably never want to check the lock or session in
the release method, but we still pass a context there for consistency.
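
A caller-side example, adapted from the launcher hunks below; both
methods now receive a ZKContext, which carries the ZK client and the
logger:

    with self.createZKContext(None, log) as ctx:
        if not request.acquireLock(ctx, blocking=False):
            log.debug("Failed to lock matching request %s", request)

    # ...and later, once the request reaches a final state:
    with self.createZKContext(None, log) as ctx:
        request.releaseLock(ctx)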

Change-Id: Ic87e92a5e553e753f6fa43bb0674bbe40e7c2597
Author: James E. Blair
Date: 2024-11-25 13:24:26 -08:00
Parent: ab30d104ba
Commit: 50e1338c32
7 changed files with 165 additions and 87 deletions


@@ -773,7 +773,7 @@ class TestMinReadyLauncher(LauncherBaseTestCase):
node.acquireLock(ctx)
node.updateAttributes(ctx, state_time=0)
finally:
node.releaseLock()
node.releaseLock(ctx)
for _ in iterate_timeout(60, "node to be cleaned up"):
nodes = self.launcher.api.nodes_cache.getItems()


@@ -2659,7 +2659,7 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
self.assertIsNotNone(request._zstat)
self.assertIsNotNone(request.getPath())
self.assertIsNotNone(request.acquireLock(self.zk_client))
self.assertIsNotNone(request.acquireLock(context))
self.assertTrue(request.hasLock())
for _ in iterate_timeout(10, "request to be locked"):
if request.is_locked:
@@ -2687,7 +2687,7 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
# "Fulfill" requested provider nodes
for node in self.api.getMatchingProviderNodes():
self.assertIsNotNone(node.acquireLock(self.zk_client))
self.assertIsNotNone(node.acquireLock(context))
self.assertTrue(node.hasLock())
for _ in iterate_timeout(10, "node to be locked"):
if node.is_locked:
@@ -2696,7 +2696,7 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
context,
state=model.ProviderNode.State.READY
)
node.releaseLock()
node.releaseLock(context)
self.assertFalse(node.hasLock())
for _ in iterate_timeout(10, "node to be unlocked"):
if not node.is_locked:
@@ -2717,7 +2717,7 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
context,
state=model.NodesetRequest.State.FULFILLED,
)
request.releaseLock()
request.releaseLock(context)
self.assertFalse(request.hasLock())
for _ in iterate_timeout(10, "request to be unlocked"):
if not request.is_locked:
@@ -2742,7 +2742,7 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
# Mark nodes as used
for node in provider_nodes:
self.assertIsNotNone(node.acquireLock(self.zk_client))
self.assertIsNotNone(node.acquireLock(context))
for _ in iterate_timeout(10, "wait for lock to show up"):
if node.is_locked:
break
@@ -2750,7 +2750,7 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
context,
state=model.ProviderNode.State.USED,
)
node.releaseLock()
node.releaseLock(context)
# Cleanup used nodes and wait for them to be removed from the cache
for _ in iterate_timeout(10, "nodes to be removed"):


@@ -134,19 +134,19 @@ class LauncherClient:
provider_node = getattr(node, "_provider_node", None)
if not provider_node:
continue
try:
node.state = STATE_USED
with self.createZKContext(provider_node._lock, log) as ctx:
with self.createZKContext(provider_node._lock, log) as ctx:
try:
node.state = STATE_USED
provider_node.updateAttributes(
ctx, state=ProviderNode.State.USED)
log.debug("Released %s", provider_node)
except Exception:
log.exception("Unable to return node %s", provider_node)
finally:
try:
provider_node.releaseLock()
log.debug("Released %s", provider_node)
except Exception:
log.exception("Error unlocking node %s", provider_node)
log.exception("Unable to return node %s", provider_node)
finally:
try:
provider_node.releaseLock(ctx)
except Exception:
log.exception("Error unlocking node %s", provider_node)
def createZKContext(self, lock, log):
return ZKContext(self.zk_client, lock, self.stop_event, log)


@@ -103,48 +103,50 @@ class UploadJob:
# and skip download
acquired = []
path = None
try:
with self.launcher.createZKContext(None, self.log) as ctx:
try:
with self.image_build_artifact.locked(
self.launcher.zk_client, blocking=False):
for upload in self.uploads:
if upload.acquireLock(
self.launcher.zk_client, blocking=False):
acquired.append(upload)
self.log.debug("Acquired upload lock for %s",
try:
with self.image_build_artifact.locked(ctx, blocking=False):
for upload in self.uploads:
if upload.acquireLock(ctx, blocking=False):
acquired.append(upload)
self.log.debug("Acquired upload lock for %s",
upload)
except LockException:
return
if not acquired:
return
path = self.launcher.downloadArtifact(
self.image_build_artifact)
futures = []
for upload in acquired:
future = self.launcher.endpoint_upload_executor.submit(
EndpointUploadJob(
self.launcher, self.image_build_artifact,
upload, path).run)
futures.append((upload, future))
for upload, future in futures:
try:
future.result()
self.log.info("Finished upload %s", upload)
except Exception:
self.log.exception("Unable to upload image %s", upload)
finally:
for upload in acquired:
try:
upload.releaseLock(ctx)
self.log.debug("Released upload lock for %s", upload)
except Exception:
self.log.exception("Unable to release lock for %s",
upload)
except LockException:
return
if not acquired:
return
path = self.launcher.downloadArtifact(self.image_build_artifact)
futures = []
for upload in acquired:
future = self.launcher.endpoint_upload_executor.submit(
EndpointUploadJob(self.launcher, self.image_build_artifact,
upload, path).run)
futures.append((upload, future))
for upload, future in futures:
try:
future.result()
self.log.info("Finished upload %s", upload)
except Exception:
self.log.exception("Unable to upload image %s", upload)
finally:
for upload in acquired:
try:
upload.releaseLock()
self.log.debug("Released upload lock for %s", upload)
except Exception:
self.log.exception("Unable to release lock for %s", upload)
if path:
try:
os.unlink(path)
self.log.info("Deleted %s", path)
except Exception:
self.log.exception("Unable to delete %s", path)
if path:
try:
os.unlink(path)
self.log.info("Deleted %s", path)
except Exception:
self.log.exception("Unable to delete %s", path)
class EndpointUploadJob:
@@ -324,9 +326,11 @@ class Launcher:
# Nothing to do here
continue
log.debug("Got request %s", request)
if not request.acquireLock(self.zk_client, blocking=False):
log.debug("Failed to lock matching request %s", request)
continue
with self.createZKContext(None, log) as ctx:
if not request.acquireLock(ctx, blocking=False):
log.debug("Failed to lock matching request %s",
request)
continue
if not self._cachesReadyForRequest(request):
self.log.debug("Caches are not up-to-date for %s", request)
@@ -350,7 +354,8 @@ class Launcher:
except Exception:
log.exception("Error processing request %s", request)
if request.state in request.FINAL_STATES:
request.releaseLock()
with self.createZKContext(None, log) as ctx:
request.releaseLock(ctx)
def _cachesReadyForRequest(self, request):
# Make sure we have all associated provider nodes in the cache
@@ -384,7 +389,7 @@
else:
continue
if not node.acquireLock(self.zk_client, blocking=False):
if not node.acquireLock(ctx, blocking=False):
log.debug("Failed to lock matching ready node %s",
node)
continue
@@ -406,7 +411,7 @@
log.exception("Faild to assign ready node %s", node)
continue
finally:
node.releaseLock()
node.releaseLock(ctx)
else:
node = self._requestNode(
label, request, provider, log, ctx)
@@ -525,9 +530,10 @@
if not self._isNodeActionable(node):
continue
if not node.acquireLock(self.zk_client, blocking=False):
log.debug("Failed to lock matching node %s", node)
continue
with self.createZKContext(None, log) as ctx:
if not node.acquireLock(ctx, blocking=False):
log.debug("Failed to lock matching node %s", node)
continue
request = self.api.getNodesetRequest(node.request_id)
if ((request or node.request_id is None)
@@ -583,7 +589,8 @@
self.wake_event.set()
if node.state == model.ProviderNode.State.READY:
node.releaseLock()
with self.createZKContext(None, self.log) as ctx:
node.releaseLock(ctx)
def _isNodeActionable(self, node):
if node.is_locked:
@@ -634,7 +641,7 @@
node.setState(node.State.READY)
self.wake_event.set()
log.debug("Marking node %s as %s", node, node.state)
node.releaseLock()
node.releaseLock(ctx)
def _cleanupNode(self, node, log):
with self.createZKContext(node._lock, self.log) as ctx:
@@ -671,7 +678,7 @@
if not self.api.getNodesetRequest(node.request_id):
log.debug("Removing provider node %s", node)
node.delete(ctx)
node.releaseLock()
node.releaseLock(ctx)
def _processMinReady(self):
if not self.api.nodes_cache.waitForSync(


@@ -245,12 +245,13 @@ class LauncherApi:
continue
if self.getNodesetRequest(node.request_id):
continue
if lock := node.acquireLock(self.zk_client):
try:
with self.createZKContext(None) as outer_ctx:
if lock := node.acquireLock(outer_ctx):
with self.createZKContext(lock) as ctx:
node.delete(ctx)
except NoNodeError:
# Node is already deleted
pass
finally:
node.releaseLock()
try:
node.delete(ctx)
except NoNodeError:
# Node is already deleted
pass
finally:
node.releaseLock(ctx)


@@ -28,15 +28,22 @@ CONNECTION_LOCK_ROOT = f"{LOCK_ROOT}/connection"
class SessionAwareMixin:
def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
def __init__(self, client, path, identifier=None, extra_lock_patterns=(),
ensure_path=True):
self._zuul_ephemeral = None
self._zuul_session_expired = False
self._zuul_watching_session = False
self._zuul_seen_contenders = set()
self._zuul_seen_contender_names = set()
self._zuul_contender_watch = None
self._zuul_ensure_path = ensure_path
super().__init__(client, path, identifier, extra_lock_patterns)
def _ensure_path(self):
# Override
if self._zuul_ensure_path:
return super()._ensure_path()
def acquire(self, blocking=True, timeout=None, ephemeral=True):
ret = super().acquire(blocking, timeout, ephemeral)
self._zuul_session_expired = False


@@ -19,11 +19,17 @@ import contextlib
import json
import logging
import sys
import time
import types
import zlib
import collections
from kazoo.exceptions import LockTimeout, NodeExistsError, NoNodeError
from kazoo.exceptions import (
LockTimeout,
NoNodeError,
NodeExistsError,
NotEmptyError,
)
from kazoo.retry import KazooRetry
from zuul.zk import sharding
@@ -141,6 +147,7 @@ class ZKObject:
_retry_interval = 5
_zkobject_compressed_size = 0
_zkobject_uncompressed_size = 0
_deleted = False
io_reader_class = sharding.RawZKIO
io_writer_class = sharding.RawZKIO
truncate_on_create = False
@@ -293,6 +300,7 @@ class ZKObject:
context.log.error(
"Exception deleting ZKObject %s at %s", self, path)
raise
self._set(_deleted=True)
def estimateDataSize(self, seen=None):
"""Attempt to find all ZKObjects below this one and sum their
@@ -513,32 +521,80 @@ class LockableZKObject(ZKObject):
"""
raise NotImplementedError()
@classmethod
def new(klass, context, **kw):
"""Create a new instance and save it in ZooKeeper"""
obj = klass()
obj._set(**kw)
# Create the lock path first. In the future, if we want to
# support creating locked objects, we can acquire it here.
obj._createLockPath(context)
data = obj._trySerialize(context)
obj._save(context, data, create=True)
return obj
def _createLockPath(self, context):
if isinstance(context, LocalZKContext):
return
path = self.getLockPath()
if context.sessionIsInvalid():
raise Exception("ZooKeeper session or lock not valid")
try:
self._retry(context, context.client.ensure_path, path)
except Exception:
context.log.error(
"Exception creating ZKObject %s lock path at %s", self, path)
raise
def _deleteLockPath(self, client):
path = self.getLockPath()
# Give other actors 30 seconds to realize this object
# doesn't exist anymore and they should stop trying to
# lock it.
for x in range(30):
# This handles connection-related retries, but not
# conflicts. We ignore the session here because we're
# trying to delete a lock; we want to do that even if we
# lose whatever lock the context might have.
kazoo_retry = KazooRetry(max_tries=-1,
delay=self._retry_interval,
backoff=0)
try:
return kazoo_retry(client.delete, path, recursive=True)
except NotEmptyError:
time.sleep(1)
@contextmanager
def locked(self, zk_client, blocking=True, timeout=None):
if not (lock := self.acquireLock(zk_client, blocking=blocking,
def locked(self, context, blocking=True, timeout=None):
if not (lock := self.acquireLock(context, blocking=blocking,
timeout=timeout)):
raise LockException(f"Failed to acquire lock on {self}")
try:
yield lock
finally:
try:
self.releaseLock()
self.releaseLock(context)
except Exception:
self.log.exception("Failed to release lock on %s", self)
context.log.exception("Failed to release lock on %s", self)
def acquireLock(self, zk_client, blocking=True, timeout=None):
def acquireLock(self, context, blocking=True, timeout=None):
have_lock = False
lock = None
path = self.getLockPath()
try:
lock = SessionAwareLock(zk_client.client, path)
# We create the lock path when we create the object in ZK,
# so there is no need to ensure the path on lock. This
# lets us avoid re-creating the lock if the object was
# deleted behind our back.
lock = SessionAwareLock(context.client, path,
ensure_path=False)
have_lock = lock.acquire(blocking, timeout)
except NoNodeError:
# Request disappeared
have_lock = False
except LockTimeout:
have_lock = False
self.log.error("Timeout trying to acquire lock: %s", path)
context.log.error("Timeout trying to acquire lock: %s", path)
# If we aren't blocking, it's possible we didn't get the lock
# because someone else has it.
@@ -548,11 +604,18 @@ class LockableZKObject(ZKObject):
self._set(_lock=lock)
return lock
def releaseLock(self):
def releaseLock(self, context):
if self._lock is None:
return
self._lock.release()
self._set(_lock=None)
if self._deleted:
# If we are releasing the lock after deleting the object,
# also cleanup the lock path.
try:
self._deleteLockPath(context.client)
except Exception:
context.log.error("Unable to delete lock path for %s", self)
def hasLock(self):
if self._lock is None: