Fix 'is_locked' flag for items in ZKObject cache
This change fixes a problem with the `is_locked` indicator on items in the LockableZKObjectCache. The pre-cache hook did not account for multiple lock contender nodes and set the `is_locked` flag to false whenever any single contender was deleted, even while another contender still held the lock. To fix this, each cached item now keeps a set of lock contenders and is considered locked as long as that set is non-empty.

Change-Id: I4341498b09b34ada39fe19e6ceab2f66187b7361
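For background on the failure mode: Zuul's ZooKeeper locks are built on kazoo, where each would-be holder creates its own contender znode (named `<uuid>__lock__<sequence>`, the same shape the test below fabricates) under the lock path; the lowest sequence number holds the lock and the others wait. Deleting a waiter's contender node therefore says nothing about whether the lock is still held. A minimal standalone sketch of two contenders, assuming a local ZooKeeper on 127.0.0.1:2181; the lock path and identifiers here are illustrative, not Zuul's:

    import threading
    import time

    from kazoo.client import KazooClient

    client = KazooClient(hosts="127.0.0.1:2181")
    client.start()

    lock_path = "/test/dummy/locks/some-uuid"  # illustrative path
    holder = client.Lock(lock_path, "holder")
    holder.acquire()

    # A second contender blocks in a background thread; while waiting
    # it keeps its own contender znode under the lock path.
    waiter = client.Lock(lock_path, "waiter")
    thread = threading.Thread(target=lambda: waiter.acquire(timeout=10))
    thread.start()
    time.sleep(1)  # crude: give the waiter time to create its znode

    # Two contender znodes named "<uuid>__lock__<sequence>"; only the
    # lowest sequence number holds the lock, the other merely waits.
    print(client.get_children(lock_path))

    holder.release()  # hand the lock over to the waiter
    thread.join()
    waiter.release()
    client.stop()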
@@ -50,7 +50,7 @@ from zuul.zk.exceptions import LockException
 from zuul.zk.executor import ExecutorApi
 from zuul.zk.job_request_queue import JobRequestEvent
 from zuul.zk.merger import MergerApi
-from zuul.zk.launcher import LauncherApi
+from zuul.zk.launcher import LauncherApi, LockableZKObjectCache
 from zuul.zk.layout import LayoutStateStore, LayoutState
 from zuul.zk.locks import locked
 from zuul.zk.nodepool import ZooKeeperNodepool
@@ -76,7 +76,11 @@ from tests.base import (
     ZOOKEEPER_SESSION_TIMEOUT,
 )
 from zuul.zk.zkobject import (
-    ShardedZKObject, PolymorphicZKObjectMixin, ZKObject, ZKContext
+    LockableZKObject,
+    PolymorphicZKObjectMixin,
+    ShardedZKObject,
+    ZKContext,
+    ZKObject,
 )
 from zuul.zk.locks import tenant_write_lock

@@ -2968,3 +2972,78 @@ class TestLauncherApi(ZooKeeperBaseTestCase):
             used = self.api.nodes_cache.getQuota(provider)
             if used.quota.get('instances') == 0:
                 break
+
+
+class DummyLockable(LockableZKObject):
+    ROOT = "/test/dummy"
+    DUMMY_PATH = "dummy"
+    LOCKS_PATH = "locks"
+
+    def __init__(self):
+        super().__init__()
+        self._set(
+            uuid=uuid.uuid4().hex,
+            is_locked=False,
+        )
+
+    def serialize(self, context):
+        return json.dumps({
+            "uuid": self.uuid,
+        }).encode("utf8")
+
+    def getPath(self):
+        return f"{self.ROOT}/{self.DUMMY_PATH}/{self.uuid}"
+
+    def getLockPath(self):
+        return f"{self.ROOT}/{self.LOCKS_PATH}/{self.uuid}"
+
+
+class TestLockableZKObjectCache(ZooKeeperBaseTestCase):
+
+    def test_is_locked_contenders(self):
+        cache = LockableZKObjectCache(
+            self.zk_client,
+            None,
+            root=DummyLockable.ROOT,
+            items_path=DummyLockable.DUMMY_PATH,
+            locks_path=DummyLockable.LOCKS_PATH,
+            zkobject_class=DummyLockable)
+
+        ctx = ZKContext(self.zk_client, None, None, self.log)
+        dummy = DummyLockable.new(ctx)
+
+        for _ in iterate_timeout(10, "cache to sync"):
+            if cache.getItems():
+                break
+
+        # Acquire lock for all items
+        for item in cache.getItems():
+            item.acquireLock(ctx)
+
+        for _ in iterate_timeout(10, "cache to sync"):
+            if all(d.is_locked for d in cache.getItems()):
+                break
+
+        # Create a dummy lock contender
+        contender_path = f"{dummy.getLockPath()}/deadbeef__lock__0000000001"
+        self.zk_client.client.create(contender_path)
+
+        # Make sure items are still considered locked
+        for _ in iterate_timeout(10, "cache to sync"):
+            if all(d.is_locked for d in cache.getItems()):
+                break
+
+        # Discard the pending lock and make sure items are still
+        # considered locked
+        self.zk_client.client.delete(contender_path)
+        for _ in iterate_timeout(10, "cache to sync"):
+            if all(d.is_locked for d in cache.getItems()):
+                break
+
+        # Release the lock and confirm items are no longer considered locked
+        for item in cache.getItems():
+            item.releaseLock(ctx)
+
+        for _ in iterate_timeout(10, "cache to sync"):
+            if not any(d.is_locked for d in cache.getItems()):
+                break
@@ -78,7 +78,7 @@ class LockableZKObjectCache(ZuulTreeCache):
         if len(parts) != 3:
             return

-        object_type, request_uuid, *_ = parts
+        object_type, request_uuid, contender, *_ = parts
         if object_type != self.locks_path:
             return

@@ -88,8 +88,14 @@ class LockableZKObjectCache(ZuulTreeCache):
         if not request:
             return

-        if request.is_locked != exists:
-            request._set(is_locked=exists)
+        if exists:
+            request._lock_contenders.add(contender)
+        else:
+            request._lock_contenders.discard(contender)
+
+        is_locked = bool(request._lock_contenders)
+        if request.is_locked != is_locked:
+            request._set(is_locked=is_locked)
             if self.updated_event:
                 self.updated_event()

@@ -518,6 +518,11 @@ class ShardedZKObject(ZKObject):
 class LockableZKObject(ZKObject):
     _lock = None

+    def __new__(klass, *args, **kwargs):
+        zko = super().__new__(klass)
+        zko._set(_lock_contenders=set())
+        return zko
+
     def getLockPath(self):
         """Return the path for the lock of this object in ZK

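Restated outside the tree-cache machinery, the fix derives `is_locked` from whether the contender set is empty rather than from the last create/delete event seen. A minimal sketch of that bookkeeping; the class and method names here are illustrative, not Zuul's API:

    class LockStateTracker:
        """Track contender znodes and derive a locked flag from them."""

        def __init__(self):
            self._lock_contenders = set()
            self.is_locked = False

        def on_contender_event(self, contender, exists):
            # Mirrors the pre-cache hook: called once per contender
            # child created (exists=True) or deleted (exists=False).
            if exists:
                self._lock_contenders.add(contender)
            else:
                self._lock_contenders.discard(contender)
            # Locked as long as any contender remains; deleting a
            # waiting contender no longer clears the flag.
            self.is_locked = bool(self._lock_contenders)


    tracker = LockStateTracker()
    tracker.on_contender_event("aaaa__lock__0000000000", True)   # holder
    tracker.on_contender_event("dead__lock__0000000001", True)   # waiter
    tracker.on_contender_event("dead__lock__0000000001", False)  # waiter leaves
    assert tracker.is_locked  # the holder's contender still exists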