Add support for sharded ZKObjects

Some data from the pipeline state needs to be sharded, e.g. the repo
state and the repo files from the build set.

Change-Id: I14432da0664d0f899287c23e7316aa007247565e
This commit is contained in:
Simon Westphahl
2021-10-05 11:04:27 +02:00
committed by James E. Blair
parent 4ee56af65c
commit 36782e35fd
3 changed files with 104 additions and 13 deletions

View File

@@ -48,7 +48,7 @@ from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
from zuul.zk.zkobject import ZKObject, ZKContext
from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
from zuul.zk.locks import tenant_write_lock
from kazoo.exceptions import KazooException, ZookeeperError
@@ -1434,7 +1434,7 @@ class TestChangeCache(ZooKeeperBaseTestCase):
break
class DummyZKObject(ZKObject):
class DummyZKObjectMixin:
_retry_interval = 0.1
def getPath(self):
@@ -1446,24 +1446,32 @@ class DummyZKObject(ZKObject):
return json.dumps(d).encode('utf-8')
class DummyZKObject(DummyZKObjectMixin, ZKObject):
pass
class DummyShardedZKObject(DummyZKObjectMixin, ShardedZKObject):
pass
class TestZKObject(ZooKeeperBaseTestCase):
def test_zk_object(self):
def _test_zk_object(self, zkobject_class):
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
# Create a new object
tenant_name = 'fake_tenant'
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
pipeline1 = DummyZKObject.new(context,
name=tenant_name,
foo='bar')
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='bar')
self.assertEqual(pipeline1.foo, 'bar')
# Load an object from ZK (that we don't already have)
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
pipeline2 = DummyZKObject.fromZK(context,
'/zuul/pipeline/fake_tenant')
pipeline2 = zkobject_class.fromZK(context,
'/zuul/pipeline/fake_tenant')
self.assertEqual(pipeline2.foo, 'bar')
# Update an object
@@ -1499,7 +1507,7 @@ class TestZKObject(ZooKeeperBaseTestCase):
self.assertIsNone(self.zk_client.client.exists(
'/zuul/pipeline/fake_tenant'))
def test_zk_object_exception(self):
def _test_zk_object_exception(self, zkobject_class):
# Exercise the exception handling in the _save method
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
@@ -1507,6 +1515,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
tenant_name = 'fake_tenant'
class ZKFailsOnUpdate:
def delete(self, *args, **kw):
raise ZookeeperError()
def set(self, *args, **kw):
raise ZookeeperError()
@@ -1515,6 +1526,15 @@ class TestZKObject(ZooKeeperBaseTestCase):
self.count = 0
self._real_client = real_client
def create(self, *args, **kw):
return self._real_client.create(*args, **kw)
def delete(self, *args, **kw):
self.count += 1
if self.count < 2:
raise KazooException()
return self._real_client.delete(*args, **kw)
def set(self, *args, **kw):
self.count += 1
if self.count < 2:
@@ -1524,9 +1544,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
# Fail an update
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
pipeline1 = DummyZKObject.new(context,
name=tenant_name,
foo='one')
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='one')
self.assertEqual(pipeline1.foo, 'one')
# Simulate a fatal ZK exception
@@ -1555,3 +1575,15 @@ class TestZKObject(ZooKeeperBaseTestCase):
with pipeline1.activeContext(context):
pipeline1.foo = 'five'
self.assertEqual(pipeline1.foo, 'five')
def test_zk_object(self):
self._test_zk_object(DummyZKObject)
def test_sharded_zk_object(self):
self._test_zk_object(DummyShardedZKObject)
def test_zk_object_exception(self):
self._test_zk_object_exception(DummyZKObject)
def test_sharded_zk_object_exception(self):
self._test_zk_object_exception(DummyShardedZKObject)