zuul/tests/unit/test_zk.py

2120 lines
75 KiB
Python

# Copyright 2019 Red Hat, Inc.
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
import json
import queue
import threading
import uuid
from unittest import mock
import testtools
from zuul import model
from zuul.lib import yamlutil as yaml
from zuul.model import BuildRequest, HoldRequest, MergeRequest
from zuul.zk import ZooKeeperClient
from zuul.zk.blob_store import BlobStore
from zuul.zk.branch_cache import BranchCache
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
from zuul.zk.exceptions import LockException
from zuul.zk.executor import ExecutorApi
from zuul.zk.job_request_queue import JobRequestEvent
from zuul.zk.merger import MergerApi
from zuul.zk.layout import LayoutStateStore, LayoutState
from zuul.zk.locks import locked
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.zk.sharding import (
RawShardIO,
BufferedShardReader,
BufferedShardWriter,
NODE_BYTE_SIZE_LIMIT,
)
from zuul.zk.components import (
BaseComponent, ComponentRegistry, ExecutorComponent, COMPONENT_REGISTRY
)
from tests.base import (
BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
iterate_timeout
)
from zuul.zk.zkobject import (
ShardedZKObject, ZKObject, ZKContext
)
from zuul.zk.locks import tenant_write_lock
from kazoo.exceptions import ZookeeperError, OperationTimeoutError, NoNodeError
class ZooKeeperBaseTestCase(BaseTestCase):
def setUp(self):
super().setUp()
self.setupZK()
self.zk_client = ZooKeeperClient(
self.zk_chroot_fixture.zk_hosts,
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
tls_key=self.zk_chroot_fixture.zookeeper_key,
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect()
self.component_registry = ComponentRegistry(self.zk_client)
# We don't have any other component to initialize the global
# registry in these tests, so we do it ourselves.
COMPONENT_REGISTRY.create(self.zk_client)
class TestZookeeperClient(ZooKeeperBaseTestCase):
def test_ltime(self):
ltime = self.zk_client.getCurrentLtime()
self.assertGreaterEqual(ltime, 0)
self.assertIsInstance(ltime, int)
self.assertGreater(self.zk_client.getCurrentLtime(), ltime)
class TestNodepool(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
def _createRequest(self):
req = HoldRequest()
req.count = 1
req.reason = 'some reason'
req.expiration = 1
return req
def test_hold_requests_api(self):
# Test no requests returns empty list
self.assertEqual([], self.zk_nodepool.getHoldRequests())
# Test get on non-existent request is None
self.assertIsNone(self.zk_nodepool.getHoldRequest('anything'))
# Test creating a new request
req1 = self._createRequest()
self.zk_nodepool.storeHoldRequest(req1)
self.assertIsNotNone(req1.id)
self.assertEqual(1, len(self.zk_nodepool.getHoldRequests()))
# Test getting the request
req2 = self.zk_nodepool.getHoldRequest(req1.id)
self.assertEqual(req1.toDict(), req2.toDict())
# Test updating the request
req2.reason = 'a new reason'
self.zk_nodepool.storeHoldRequest(req2)
req2 = self.zk_nodepool.getHoldRequest(req2.id)
self.assertNotEqual(req1.reason, req2.reason)
# Test lock operations
self.zk_nodepool.lockHoldRequest(req2, blocking=False)
with testtools.ExpectedException(
LockException, "Timeout trying to acquire lock .*"
):
self.zk_nodepool.lockHoldRequest(req2, blocking=True, timeout=2)
self.zk_nodepool.unlockHoldRequest(req2)
self.assertIsNone(req2.lock)
# Test deleting the request
self.zk_nodepool.deleteHoldRequest(req1)
self.assertEqual([], self.zk_nodepool.getHoldRequests())
class TestSharding(ZooKeeperBaseTestCase):
def test_reader(self):
shard_io = RawShardIO(self.zk_client.client, "/test/shards")
self.assertEqual(len(shard_io._shards), 0)
with BufferedShardReader(
self.zk_client.client, "/test/shards"
) as shard_reader:
self.assertEqual(shard_reader.read(), b"")
shard_io.write(b"foobar")
self.assertEqual(len(shard_io._shards), 1)
self.assertEqual(shard_io.read(), b"foobar")
def test_writer(self):
shard_io = RawShardIO(self.zk_client.client, "/test/shards")
self.assertEqual(len(shard_io._shards), 0)
with BufferedShardWriter(
self.zk_client.client, "/test/shards"
) as shard_writer:
shard_writer.write(b"foobar")
self.assertEqual(len(shard_io._shards), 1)
self.assertEqual(shard_io.read(), b"foobar")
def test_truncate(self):
shard_io = RawShardIO(self.zk_client.client, "/test/shards")
shard_io.write(b"foobar")
self.assertEqual(len(shard_io._shards), 1)
with BufferedShardWriter(
self.zk_client.client, "/test/shards"
) as shard_writer:
shard_writer.truncate(0)
self.assertEqual(len(shard_io._shards), 0)
def test_shard_bytes_limit(self):
with BufferedShardWriter(
self.zk_client.client, "/test/shards"
) as shard_writer:
shard_writer.write(b"x" * (NODE_BYTE_SIZE_LIMIT + 1))
shard_writer.flush()
self.assertEqual(len(shard_writer.raw._shards), 2)
def test_json(self):
data = {"key": "value"}
with BufferedShardWriter(
self.zk_client.client, "/test/shards"
) as shard_io:
shard_io.write(json.dumps(data).encode("utf8"))
with BufferedShardReader(
self.zk_client.client, "/test/shards"
) as shard_io:
self.assertDictEqual(json.load(shard_io), data)
class TestUnparsedConfigCache(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.config_cache = UnparsedConfigCache(self.zk_client)
def test_files_cache(self):
master_files = self.config_cache.getFilesCache("project", "master")
with self.config_cache.readLock("project"):
self.assertEqual(len(master_files), 0)
with self.config_cache.writeLock("project"):
master_files["/path/to/file"] = "content"
with self.config_cache.readLock("project"):
self.assertEqual(master_files["/path/to/file"], "content")
self.assertEqual(len(master_files), 1)
with self.config_cache.writeLock("project"):
master_files.clear()
self.assertEqual(len(master_files), 0)
def test_valid_for(self):
tpc = model.TenantProjectConfig("project")
tpc.extra_config_files = {"foo.yaml", "bar.yaml"}
tpc.extra_config_dirs = {"foo.d/", "bar.d/"}
master_files = self.config_cache.getFilesCache("project", "master")
self.assertFalse(master_files.isValidFor(tpc, min_ltime=-1))
master_files.setValidFor(tpc.extra_config_files, tpc.extra_config_dirs,
ltime=1)
self.assertTrue(master_files.isValidFor(tpc, min_ltime=-1))
tpc.extra_config_files = set()
tpc.extra_config_dirs = set()
self.assertTrue(master_files.isValidFor(tpc, min_ltime=-1))
self.assertFalse(master_files.isValidFor(tpc, min_ltime=2))
tpc.extra_config_files = {"bar.yaml"}
tpc.extra_config_dirs = {"bar.d/"}
# Valid for subset
self.assertTrue(master_files.isValidFor(tpc, min_ltime=-1))
tpc.extra_config_files = {"foo.yaml", "bar.yaml"}
tpc.extra_config_dirs = {"foo.d/", "bar.d/", "other.d/"}
# Invalid for additional dirs
self.assertFalse(master_files.isValidFor(tpc, min_ltime=-1))
self.assertFalse(master_files.isValidFor(tpc, min_ltime=2))
tpc.extra_config_files = {"foo.yaml", "bar.yaml", "other.yaml"}
tpc.extra_config_dirs = {"foo.d/", "bar.d/"}
# Invalid for additional files
self.assertFalse(master_files.isValidFor(tpc, min_ltime=-1))
self.assertFalse(master_files.isValidFor(tpc, min_ltime=2))
def test_cache_ltime(self):
cache = self.config_cache.getFilesCache("project", "master")
self.assertEqual(cache.ltime, -1)
cache.setValidFor(set(), set(), ltime=1)
self.assertEqual(cache.ltime, 1)
def test_branch_cleanup(self):
master_files = self.config_cache.getFilesCache("project", "master")
release_files = self.config_cache.getFilesCache("project", "release")
master_files["/path/to/file"] = "content"
release_files["/path/to/file"] = "content"
self.config_cache.clearCache("project", "master")
self.assertEqual(len(master_files), 0)
self.assertEqual(len(release_files), 1)
def test_project_cleanup(self):
master_files = self.config_cache.getFilesCache("project", "master")
stable_files = self.config_cache.getFilesCache("project", "stable")
other_files = self.config_cache.getFilesCache("other", "master")
self.assertEqual(len(master_files), 0)
self.assertEqual(len(stable_files), 0)
master_files["/path/to/file"] = "content"
stable_files["/path/to/file"] = "content"
other_files["/path/to/file"] = "content"
self.assertEqual(len(master_files), 1)
self.assertEqual(len(stable_files), 1)
self.assertEqual(len(other_files), 1)
self.config_cache.clearCache("project")
self.assertEqual(len(master_files), 0)
self.assertEqual(len(stable_files), 0)
self.assertEqual(len(other_files), 1)
class TestComponentRegistry(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.second_zk_client = ZooKeeperClient(
self.zk_chroot_fixture.zk_hosts,
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
tls_key=self.zk_chroot_fixture.zookeeper_key,
tls_ca=self.zk_chroot_fixture.zookeeper_ca,
)
self.addCleanup(self.second_zk_client.disconnect)
self.second_zk_client.connect()
self.second_component_registry = ComponentRegistry(
self.second_zk_client)
def assertComponentAttr(self, component_name, attr_name,
attr_value, timeout=25):
for _ in iterate_timeout(
timeout,
f"{component_name} in cache has {attr_name} set to {attr_value}",
):
components = list(self.second_component_registry.all(
component_name))
if (
len(components) > 0 and
getattr(components[0], attr_name) == attr_value
):
break
def assertComponentState(self, component_name, state, timeout=25):
return self.assertComponentAttr(
component_name, "state", state, timeout
)
def assertComponentStopped(self, component_name, timeout=25):
for _ in iterate_timeout(
timeout, f"{component_name} in cache is stopped"
):
components = list(self.second_component_registry.all(
component_name))
if len(components) == 0:
break
def test_component_registry(self):
self.component_info = ExecutorComponent(self.zk_client, 'test')
self.component_info.register()
self.assertComponentState("executor", BaseComponent.STOPPED)
self.zk_client.client.stop()
self.assertComponentStopped("executor")
self.zk_client.client.start()
self.assertComponentState("executor", BaseComponent.STOPPED)
self.component_info.state = self.component_info.RUNNING
self.assertComponentState("executor", BaseComponent.RUNNING)
self.log.debug("DISCONNECT")
self.second_zk_client.client.stop()
self.second_zk_client.client.start()
self.log.debug("RECONNECT")
self.component_info.state = self.component_info.PAUSED
self.assertComponentState("executor", BaseComponent.PAUSED)
# Make sure the registry didn't create any read/write
# component objects that re-registered themselves.
components = list(self.second_component_registry.all('executor'))
self.assertEqual(len(components), 1)
self.component_info.state = self.component_info.RUNNING
self.assertComponentState("executor", BaseComponent.RUNNING)
class TestExecutorApi(ZooKeeperBaseTestCase):
def test_build_request(self):
# Test the lifecycle of a build request
request_queue = queue.Queue()
event_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# and the event queue
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = ExecutorApi(self.zk_client)
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
# Scheduler submits request
request = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request, {'job': 'test'})
request_queue.get(timeout=30)
# Executor receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
params = client.getParams(a)
self.assertEqual(params, {'job': 'test'})
client.clearParams(a)
params = client.getParams(a)
self.assertIsNone(params)
# Executor locks request
self.assertTrue(server.lock(a, blocking=False))
a.state = BuildRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, BuildRequest.RUNNING)
# Executor should see no pending requests
reqs = list(server.next())
self.assertEqual(len(reqs), 0)
# Executor pauses build
a.state = BuildRequest.PAUSED
server.update(a)
self.assertEqual(client.get(a.path).state, BuildRequest.PAUSED)
# Scheduler resumes build
self.assertTrue(event_queue.empty())
sched_a = client.get(a.path)
client.requestResume(sched_a)
(build_request, event) = event_queue.get(timeout=30)
self.assertEqual(build_request, a)
self.assertEqual(event, JobRequestEvent.RESUMED)
# Executor resumes build
a.state = BuildRequest.RUNNING
server.update(a)
server.fulfillResume(a)
self.assertEqual(client.get(a.path).state, BuildRequest.RUNNING)
# Scheduler cancels build
self.assertTrue(event_queue.empty())
sched_a = client.get(a.path)
client.requestCancel(sched_a)
(build_request, event) = event_queue.get(timeout=30)
self.assertEqual(build_request, a)
self.assertEqual(event, JobRequestEvent.CANCELED)
# Executor aborts build
a.state = BuildRequest.COMPLETED
server.update(a)
server.fulfillCancel(a)
server.unlock(a)
self.assertEqual(client.get(a.path).state, BuildRequest.COMPLETED)
# Scheduler removes build request on completion
client.remove(sched_a)
self.assertEqual(set(self.getZKPaths('/zuul/executor')),
set(['/zuul/executor/unzoned',
'/zuul/executor/unzoned/locks',
'/zuul/executor/unzoned/params',
'/zuul/executor/unzoned/requests',
'/zuul/executor/unzoned/result-data',
'/zuul/executor/unzoned/results',
'/zuul/executor/unzoned/waiters']))
self.assertEqual(self.getZKWatches(), {})
def test_build_request_remove(self):
# Test the scheduler forcibly removing a request (perhaps the
# tenant is being deleted, so there will be no result queue).
request_queue = queue.Queue()
event_queue = queue.Queue()
def rq_put():
request_queue.put(None)
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = ExecutorApi(self.zk_client)
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
# Scheduler submits request
request = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request, {})
request_queue.get(timeout=30)
# Executor receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
# Executor locks request
self.assertTrue(server.lock(a, blocking=False))
a.state = BuildRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, BuildRequest.RUNNING)
# Executor should see no pending requests
reqs = list(server.next())
self.assertEqual(len(reqs), 0)
self.assertTrue(event_queue.empty())
# Scheduler rudely removes build request
sched_a = client.get(a.path)
client.remove(sched_a)
# Make sure it shows up as deleted
(build_request, event) = event_queue.get(timeout=30)
self.assertEqual(build_request, a)
self.assertEqual(event, JobRequestEvent.DELETED)
# Executor should not write anything else since the request
# was deleted.
def test_build_request_hold(self):
# Test that we can hold a build request in "queue"
request_queue = queue.Queue()
event_queue = queue.Queue()
def rq_put():
request_queue.put(None)
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = HoldableExecutorApi(self.zk_client)
client.hold_in_queue = True
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
# Scheduler submits request
request = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request, {})
request_queue.get(timeout=30)
# Executor receives nothing
reqs = list(server.next())
self.assertEqual(len(reqs), 0)
# Test releases hold
a = client.get(request.path)
self.assertEqual(a.uuid, 'A')
a.state = BuildRequest.REQUESTED
client.update(a)
# Executor receives request
request_queue.get(timeout=30)
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
# The rest is redundant.
def test_nonexistent_lock(self):
request_queue = queue.Queue()
event_queue = queue.Queue()
def rq_put():
request_queue.put(None)
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = ExecutorApi(self.zk_client)
# Scheduler submits request
request = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request, {})
sched_a = client.get(request.path)
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
exec_a = server.get(request.path)
client.remove(sched_a)
# Try to lock a request that was just removed
self.assertFalse(server.lock(exec_a))
def test_efficient_removal(self):
# Test that we don't try to lock a removed request
request_queue = queue.Queue()
event_queue = queue.Queue()
def rq_put():
request_queue.put(None)
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = ExecutorApi(self.zk_client)
# Scheduler submits two requests
request_a = BuildRequest(
"A", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
client.submit(request_a, {})
request_b = BuildRequest(
"B", None, None, "job", "job_uuid", "tenant", "pipeline", '2')
client.submit(request_b, {})
sched_b = client.get(request_b.path)
request_c = BuildRequest(
"C", None, None, "job", "job_uuid", "tenant", "pipeline", '3')
client.submit(request_c, {})
sched_c = client.get(request_c.path)
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
count = 0
for exec_request in server.next():
count += 1
if count == 1:
# Someone starts the second request and client removes
# the third request all while we're processing the first.
sched_b.state = sched_b.RUNNING
client.update(sched_b)
client.remove(sched_c)
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (len(server.zone_queues[None]._cached_requests) == 2):
break
# Make sure we only got the first request
self.assertEqual(count, 1)
def test_lost_build_requests(self):
# Test that lostBuildRequests() returns unlocked running build
# requests
executor_api = ExecutorApi(self.zk_client)
br = BuildRequest(
"A", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
br = BuildRequest(
"B", None, None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_b = br.path
br = BuildRequest(
"C", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_c = br.path
br = BuildRequest(
"D", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_d = br.path
br = BuildRequest(
"E", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
path_e = br.path
b = executor_api.get(path_b)
c = executor_api.get(path_c)
d = executor_api.get(path_d)
e = executor_api.get(path_e)
# Make sure the get() method used the correct zone keys
self.assertEqual(set(executor_api.zone_queues.keys()), {"zone", None})
b.state = BuildRequest.RUNNING
executor_api.update(b)
c.state = BuildRequest.RUNNING
executor_api.lock(c)
executor_api.update(c)
d.state = BuildRequest.COMPLETED
executor_api.update(d)
e.state = BuildRequest.PAUSED
executor_api.update(e)
# Wait until the latest state transition is reflected in the Executor
# APIs cache. Using a DataWatch for this purpose could lead to race
# conditions depending on which DataWatch is executed first. The
# DataWatch might be triggered for the correct event, but the cache
# might still be outdated as the DataWatch that updates the cache
# itself wasn't triggered yet.
b_cache = executor_api.zone_queues[None]._cached_requests
e_cache = executor_api.zone_queues['zone']._cached_requests
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (b_cache[path_b].state == BuildRequest.RUNNING and
e_cache[path_e].state == BuildRequest.PAUSED):
break
# The lost_builds method should only return builds which are running or
# paused, but not locked by any executor, in this case build b and e.
lost_build_requests = list(executor_api.lostRequests())
self.assertEqual(2, len(lost_build_requests))
self.assertEqual(b.path, lost_build_requests[0].path)
def test_lost_build_request_params(self):
# Test cleaning up orphaned request parameters
executor_api = ExecutorApi(self.zk_client)
br = BuildRequest(
"A", "zone", None, "job", "job_uuid", "tenant", "pipeline", '1')
executor_api.submit(br, {})
params_root = executor_api.zone_queues['zone'].PARAM_ROOT
self.assertEqual(len(executor_api._getAllRequestIds()), 1)
self.assertEqual(len(
self.zk_client.client.get_children(params_root)), 1)
# Delete the request but not the params
self.zk_client.client.delete(br.path)
self.assertEqual(len(executor_api._getAllRequestIds()), 0)
self.assertEqual(len(
self.zk_client.client.get_children(params_root)), 1)
# Clean up leaked params
executor_api.cleanup(0)
self.assertEqual(len(
self.zk_client.client.get_children(params_root)), 0)
def test_existing_build_request(self):
# Test that an executor sees an existing build request when
# coming online
# Test the lifecycle of a build request
request_queue = queue.Queue()
event_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# and the event queue
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = ExecutorApi(self.zk_client)
client.submit(
BuildRequest(
"A", None, None, "job", "job_uuid",
"tenant", "pipeline", '1'), {})
# Simulate the server side
server = ExecutorApi(self.zk_client,
build_request_callback=rq_put,
build_event_callback=eq_put)
# Scheduler submits request
request_queue.get(timeout=30)
# Executor receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
class TestMergerApi(ZooKeeperBaseTestCase):
def _assertEmptyRoots(self, client):
self.assertEqual(self.getZKPaths(client.REQUEST_ROOT), [])
self.assertEqual(self.getZKPaths(client.PARAM_ROOT), [])
self.assertEqual(self.getZKPaths(client.RESULT_ROOT), [])
self.assertEqual(self.getZKPaths(client.RESULT_DATA_ROOT), [])
self.assertEqual(self.getZKPaths(client.WAITER_ROOT), [])
self.assertEqual(self.getZKPaths(client.LOCK_ROOT), [])
self.assertEqual(self.getZKWatches(), {})
def test_merge_request(self):
# Test the lifecycle of a merge request
request_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# Simulate the client side
client = MergerApi(self.zk_client)
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
# Scheduler submits request
payload = {'merge': 'test'}
request = MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
)
client.submit(request, payload)
request_queue.get(timeout=30)
# Merger receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
params = client.getParams(a)
self.assertEqual(params, payload)
client.clearParams(a)
params = client.getParams(a)
self.assertIsNone(params)
# Merger locks request
self.assertTrue(server.lock(a, blocking=False))
a.state = MergeRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, MergeRequest.RUNNING)
# Merger should see no pending requests
reqs = list(server.next())
self.assertEqual(len(reqs), 0)
# Merger removes and unlocks merge request on completion
server.remove(a)
server.unlock(a)
self._assertEmptyRoots(client)
def test_merge_request_hold(self):
# Test that we can hold a merge request in "queue"
request_queue = queue.Queue()
def rq_put():
request_queue.put(None)
# Simulate the client side
client = HoldableMergerApi(self.zk_client)
client.hold_in_queue = True
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
# Scheduler submits request
payload = {'merge': 'test'}
client.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
request_queue.get(timeout=30)
# Merger receives nothing
reqs = list(server.next())
self.assertEqual(len(reqs), 0)
# Test releases hold
# We have to get a new merge_request object to update it.
a = client.get(f"{client.REQUEST_ROOT}/A")
self.assertEqual(a.uuid, 'A')
a.state = MergeRequest.REQUESTED
client.update(a)
# Merger receives request
request_queue.get(timeout=30)
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
server.remove(a)
# The rest is redundant.
self._assertEmptyRoots(client)
def test_merge_request_result(self):
# Test the lifecycle of a merge request
request_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# Simulate the client side
client = MergerApi(self.zk_client)
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
# Scheduler submits request
payload = {'merge': 'test'}
future = client.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload, needs_result=True)
request_queue.get(timeout=30)
# Merger receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
# Merger locks request
self.assertTrue(server.lock(a, blocking=False))
a.state = MergeRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, MergeRequest.RUNNING)
# Merger reports result
result_data = {'result': 'ok'}
server.reportResult(a, result_data)
self.assertEqual(set(self.getZKPaths(client.RESULT_ROOT)),
set(['/zuul/merger/results/A']))
self.assertEqual(set(self.getZKPaths(client.RESULT_DATA_ROOT)),
set(['/zuul/merger/result-data/A',
'/zuul/merger/result-data/A/0000000000']))
self.assertEqual(self.getZKPaths(client.WAITER_ROOT),
['/zuul/merger/waiters/A'])
# Merger removes and unlocks merge request on completion
server.remove(a)
server.unlock(a)
# Scheduler awaits result
self.assertTrue(future.wait())
self.assertEqual(future.data, result_data)
self._assertEmptyRoots(client)
def test_lost_merge_request_params(self):
# Test cleaning up orphaned request parameters
merger_api = MergerApi(self.zk_client)
# Scheduler submits request
payload = {'merge': 'test'}
merger_api.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
path_a = '/'.join([merger_api.REQUEST_ROOT, 'A'])
params_root = merger_api.PARAM_ROOT
self.assertEqual(len(merger_api._getAllRequestIds()), 1)
self.assertEqual(len(
self.zk_client.client.get_children(params_root)), 1)
# Delete the request but not the params
self.zk_client.client.delete(path_a)
self.assertEqual(len(merger_api._getAllRequestIds()), 0)
self.assertEqual(len(
self.zk_client.client.get_children(params_root)), 1)
# Clean up leaked params
merger_api.cleanup(0)
self.assertEqual(len(
self.zk_client.client.get_children(params_root)), 0)
self._assertEmptyRoots(merger_api)
def test_lost_merge_request_result(self):
# Test that we can clean up orphaned merge results
request_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# Simulate the client side
client = MergerApi(self.zk_client)
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
# Scheduler submits request
payload = {'merge': 'test'}
future = client.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload, needs_result=True)
request_queue.get(timeout=30)
# Merger receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
# Merger locks request
self.assertTrue(server.lock(a, blocking=False))
a.state = MergeRequest.RUNNING
server.update(a)
self.assertEqual(client.get(a.path).state, MergeRequest.RUNNING)
# Merger reports result
result_data = {'result': 'ok'}
server.reportResult(a, result_data)
# Merger removes and unlocks merge request on completion
server.remove(a)
server.unlock(a)
self.assertEqual(set(self.getZKPaths(client.RESULT_ROOT)),
set(['/zuul/merger/results/A']))
self.assertEqual(set(self.getZKPaths(client.RESULT_DATA_ROOT)),
set(['/zuul/merger/result-data/A',
'/zuul/merger/result-data/A/0000000000']))
self.assertEqual(self.getZKPaths(client.WAITER_ROOT),
['/zuul/merger/waiters/A'])
# Scheduler "disconnects"
self.zk_client.client.delete(future._waiter_path)
# Find orphaned results
client.cleanup(age=0)
self._assertEmptyRoots(client)
def test_nonexistent_lock(self):
request_queue = queue.Queue()
def rq_put():
request_queue.put(None)
# Simulate the client side
client = MergerApi(self.zk_client)
# Scheduler submits request
payload = {'merge': 'test'}
client.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
client_a = client.get(f"{client.REQUEST_ROOT}/A")
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
server_a = list(server.next())[0]
client.remove(client_a)
# Try to lock a request that was just removed
self.assertFalse(server.lock(server_a))
self._assertEmptyRoots(client)
def test_efficient_removal(self):
# Test that we don't try to lock a removed request
request_queue = queue.Queue()
event_queue = queue.Queue()
def rq_put():
request_queue.put(None)
def eq_put(br, e):
event_queue.put((br, e))
# Simulate the client side
client = MergerApi(self.zk_client)
# Scheduler submits three requests
payload = {'merge': 'test'}
client.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
client.submit(MergeRequest(
uuid='B',
job_type=MergeRequest.MERGE,
build_set_uuid='BB',
tenant_name='tenant',
pipeline_name='check',
event_id='2',
), payload)
client_b = client.get(f"{client.REQUEST_ROOT}/B")
client.submit(MergeRequest(
uuid='C',
job_type=MergeRequest.MERGE,
build_set_uuid='CC',
tenant_name='tenant',
pipeline_name='check',
event_id='2',
), payload)
client_c = client.get(f"{client.REQUEST_ROOT}/C")
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
count = 0
for merge_request in server.next():
count += 1
if count == 1:
# Someone starts the second request and client removes
# the third request all while we're processing the first.
client_b.state = client_b.RUNNING
client.update(client_b)
client.remove(client_c)
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (len(server._cached_requests) == 2):
break
# Make sure we only got the first request
self.assertEqual(count, 1)
def test_leaked_lock(self):
client = MergerApi(self.zk_client)
# Manually create a lock with no underlying request
self.zk_client.client.create(f"{client.LOCK_ROOT}/A", b'')
client.cleanup(0)
self._assertEmptyRoots(client)
def test_lost_merge_requests(self):
# Test that lostMergeRequests() returns unlocked running merge
# requests
merger_api = MergerApi(self.zk_client)
payload = {'merge': 'test'}
merger_api.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
merger_api.submit(MergeRequest(
uuid='B',
job_type=MergeRequest.MERGE,
build_set_uuid='BB',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
merger_api.submit(MergeRequest(
uuid='C',
job_type=MergeRequest.MERGE,
build_set_uuid='CC',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
merger_api.submit(MergeRequest(
uuid='D',
job_type=MergeRequest.MERGE,
build_set_uuid='DD',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
b = merger_api.get(f"{merger_api.REQUEST_ROOT}/B")
c = merger_api.get(f"{merger_api.REQUEST_ROOT}/C")
d = merger_api.get(f"{merger_api.REQUEST_ROOT}/D")
b.state = MergeRequest.RUNNING
merger_api.update(b)
merger_api.lock(c)
c.state = MergeRequest.RUNNING
merger_api.update(c)
d.state = MergeRequest.COMPLETED
merger_api.update(d)
# Wait until the latest state transition is reflected in the Merger
# APIs cache. Using a DataWatch for this purpose could lead to race
# conditions depending on which DataWatch is executed first. The
# DataWatch might be triggered for the correct event, but the cache
# might still be outdated as the DataWatch that updates the cache
# itself wasn't triggered yet.
cache = merger_api._cached_requests
for _ in iterate_timeout(30, "cache to be up-to-date"):
if (cache[b.path].state == MergeRequest.RUNNING and
cache[c.path].state == MergeRequest.RUNNING):
break
# The lost_merges method should only return merges which are running
# but not locked by any merger, in this case merge b
lost_merge_requests = list(merger_api.lostRequests())
self.assertEqual(1, len(lost_merge_requests))
self.assertEqual(b.path, lost_merge_requests[0].path)
# This test does not clean them up, so we can't assert empty roots
def test_existing_merge_request(self):
# Test that a merger sees an existing merge request when
# coming online
# Test the lifecycle of a merge request
request_queue = queue.Queue()
# A callback closure for the request queue
def rq_put():
request_queue.put(None)
# Simulate the client side
client = MergerApi(self.zk_client)
payload = {'merge': 'test'}
client.submit(MergeRequest(
uuid='A',
job_type=MergeRequest.MERGE,
build_set_uuid='AA',
tenant_name='tenant',
pipeline_name='check',
event_id='1',
), payload)
# Simulate the server side
server = MergerApi(self.zk_client,
merge_request_callback=rq_put)
# Scheduler submits request
request_queue.get(timeout=30)
# Merger receives request
reqs = list(server.next())
self.assertEqual(len(reqs), 1)
a = reqs[0]
self.assertEqual(a.uuid, 'A')
client.remove(a)
self._assertEmptyRoots(client)
class TestLocks(ZooKeeperBaseTestCase):
def test_locking_ctx(self):
lock = self.zk_client.client.Lock("/lock")
with locked(lock) as ctx_lock:
self.assertIs(lock, ctx_lock)
self.assertTrue(lock.is_acquired)
self.assertFalse(lock.is_acquired)
def test_already_locked_ctx(self):
lock = self.zk_client.client.Lock("/lock")
other_lock = self.zk_client.client.Lock("/lock")
other_lock.acquire()
with testtools.ExpectedException(
LockException, "Failed to acquire lock .*"
):
with locked(lock, blocking=False):
pass
self.assertFalse(lock.is_acquired)
def test_unlock_exception(self):
lock = self.zk_client.client.Lock("/lock")
with testtools.ExpectedException(RuntimeError):
with locked(lock):
self.assertTrue(lock.is_acquired)
raise RuntimeError
self.assertFalse(lock.is_acquired)
class TestLayoutStore(ZooKeeperBaseTestCase):
def test_layout_state(self):
store = LayoutStateStore(self.zk_client, lambda: None)
layout_uuid = uuid.uuid4().hex
branch_cache_min_ltimes = {
"gerrit": 123,
"github": 456,
}
state = LayoutState("tenant", "hostname", 0, layout_uuid,
branch_cache_min_ltimes, -1)
store["tenant"] = state
self.assertEqual(state, store["tenant"])
self.assertNotEqual(state.ltime, -1)
self.assertNotEqual(store["tenant"].ltime, -1)
self.assertEqual(store["tenant"].branch_cache_min_ltimes,
branch_cache_min_ltimes)
def test_ordering(self):
layout_uuid = uuid.uuid4().hex
state_one = LayoutState("tenant", "hostname", 1, layout_uuid,
{}, -1, ltime=1)
state_two = LayoutState("tenant", "hostname", 2, layout_uuid,
{}, -1, ltime=2)
self.assertGreater(state_two, state_one)
def test_cleanup(self):
store = LayoutStateStore(self.zk_client, lambda: None)
min_ltimes = defaultdict(lambda x: -1)
min_ltimes['foo'] = 1
state_one = LayoutState("tenant", "hostname", 1, uuid.uuid4().hex,
{}, -1, ltime=1)
state_two = LayoutState("tenant", "hostname", 2, uuid.uuid4().hex,
{}, -1, ltime=2)
store.setMinLtimes(state_one, min_ltimes)
store.setMinLtimes(state_two, min_ltimes)
store['tenant'] = state_one
# Run with the default delay of 5 minutes; nothing should be deleted.
store.cleanup()
self.assertEqual(store.get('tenant'), state_one)
self.assertIsNotNone(
self.zk_client.client.exists(
f'/zuul/layout-data/{state_one.uuid}'))
self.assertIsNotNone(
self.zk_client.client.exists(
f'/zuul/layout-data/{state_two.uuid}'))
# Run again with immediate deletion
store.cleanup(delay=0)
self.assertEqual(store.get('tenant'), state_one)
self.assertIsNotNone(
self.zk_client.client.exists(
f'/zuul/layout-data/{state_one.uuid}'))
self.assertIsNone(
self.zk_client.client.exists(
f'/zuul/layout-data/{state_two.uuid}'))
class TestSystemConfigCache(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.config_cache = SystemConfigCache(self.zk_client, lambda: None)
def test_set_get(self):
uac = model.UnparsedAbideConfig()
uac.tenants = {"foo": "bar"}
uac.authz_rules = ["bar", "foo"]
attrs = model.SystemAttributes.fromDict({
"use_relative_priority": True,
"max_hold_expiration": 7200,
"default_hold_expiration": 3600,
"default_ansible_version": "X",
"web_root": "/web/root",
"web_status_url": "/web/status",
"websocket_url": "/web/socket",
})
self.config_cache.set(uac, attrs)
uac_cached, cached_attrs = self.config_cache.get()
self.assertEqual(uac.uuid, uac_cached.uuid)
self.assertEqual(uac.tenants, uac_cached.tenants)
self.assertEqual(uac.authz_rules, uac_cached.authz_rules)
self.assertEqual(attrs, cached_attrs)
def test_cache_empty(self):
with testtools.ExpectedException(RuntimeError):
self.config_cache.get()
def test_ltime(self):
uac = model.UnparsedAbideConfig()
attrs = model.SystemAttributes()
self.assertEqual(self.config_cache.ltime, -1)
self.config_cache.set(uac, attrs)
self.assertGreater(self.config_cache.ltime, -1)
self.assertEqual(uac.ltime, self.config_cache.ltime)
old_ltime = self.config_cache.ltime
self.config_cache.set(uac, attrs)
self.assertGreater(self.config_cache.ltime, old_ltime)
self.assertEqual(uac.ltime, self.config_cache.ltime)
cache_uac, _ = self.config_cache.get()
self.assertEqual(uac.ltime, cache_uac.ltime)
def test_valid(self):
uac = model.UnparsedAbideConfig()
attrs = model.SystemAttributes()
self.assertFalse(self.config_cache.is_valid)
self.config_cache.set(uac, attrs)
self.assertTrue(self.config_cache.is_valid)
class DummyChange:
def __init__(self, project, data=None):
self.uid = uuid.uuid4().hex
self.project = project
self.cache_stat = None
if data is not None:
self.deserialize(data)
@property
def cache_version(self):
return -1 if self.cache_stat is None else self.cache_stat.version
def serialize(self):
d = self.__dict__.copy()
d.pop('cache_stat')
return d
def deserialize(self, data):
self.__dict__.update(data)
def getRelatedChanges(self, sched, relevant):
return
class DummyChangeCache(AbstractChangeCache):
CHANGE_TYPE_MAP = {
"DummyChange": DummyChange,
}
class DummySource:
def getProject(self, project_name):
return project_name
def getChange(self, change_key):
return DummyChange('project')
class DummyConnections:
def getSource(self, name):
return DummySource()
class DummyScheduler:
def __init__(self):
self.connections = DummyConnections()
class DummyConnection:
def __init__(self):
self.connection_name = "DummyConnection"
self.source = DummySource()
self.sched = DummyScheduler()
class TestChangeCache(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.cache = DummyChangeCache(self.zk_client, DummyConnection())
def test_insert(self):
change_foo = DummyChange("project", {"foo": "bar"})
change_bar = DummyChange("project", {"bar": "foo"})
key_foo = ChangeKey('conn', 'project', 'change', 'foo', '1')
key_bar = ChangeKey('conn', 'project', 'change', 'bar', '1')
self.cache.set(key_foo, change_foo)
self.cache.set(key_bar, change_bar)
self.assertEqual(self.cache.get(key_foo), change_foo)
self.assertEqual(self.cache.get(key_bar), change_bar)
compressed_size, uncompressed_size = self.cache.estimateDataSize()
self.assertTrue(compressed_size != uncompressed_size != 0)
def test_update(self):
change = DummyChange("project", {"foo": "bar"})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
change.number = 123
self.cache.set(key, change, change.cache_version)
# The change instance must stay the same
updated_change = self.cache.get(key)
self.assertIs(change, updated_change)
self.assertEqual(change.number, 123)
def test_delete(self):
change = DummyChange("project", {"foo": "bar"})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
self.cache.delete(key)
self.assertIsNone(self.cache.get(key))
# Deleting an non-existent key should not raise an exception
invalid_key = ChangeKey('conn', 'project', 'change', 'invalid', '1')
self.cache.delete(invalid_key)
def test_concurrent_delete(self):
change = DummyChange("project", {"foo": "bar"})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
old_version = change.cache_version
# Simulate someone updating the cache after we decided to
# delete the entry
self.cache.set(key, change, old_version)
self.assertNotEqual(old_version, change.cache_version)
self.cache.delete(key, old_version)
# The change should still be in the cache
self.assertIsNotNone(self.cache.get(key))
def test_prune(self):
change1 = DummyChange("project", {"foo": "bar"})
change2 = DummyChange("project", {"foo": "baz"})
key1 = ChangeKey('conn', 'project', 'change', 'foo', '1')
key2 = ChangeKey('conn', 'project', 'change', 'foo', '2')
self.cache.set(key1, change1)
self.cache.set(key2, change2)
self.cache.prune([key1], max_age=0)
self.assertIsNotNone(self.cache.get(key1))
self.assertIsNone(self.cache.get(key2))
def test_concurrent_update(self):
change = DummyChange("project", {"foo": "bar"})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
# Attempt to update with the old change stat
with testtools.ExpectedException(ConcurrentUpdateError):
self.cache.set(key, change, change.cache_version - 1)
def test_change_update_retry(self):
change = DummyChange("project", {"foobar": 0})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
# Update the change so we have a new cache stat.
change.foobar = 1
self.cache.set(key, change, change.cache_version)
self.assertEqual(self.cache.get(key).foobar, 1)
def updater(c):
c.foobar += 1
# Change the cache stat so the change is considered outdated and we
# need to retry because of a concurrent update error.
change.cache_stat = model.CacheStat(change.cache_stat.key,
uuid.uuid4().hex,
change.cache_version - 1,
change.cache_stat.mzxid - 1,
0, 0, 0)
updated_change = self.cache.updateChangeWithRetry(
key, change, updater)
self.assertEqual(updated_change.foobar, 2)
def test_cache_sync(self):
other_cache = DummyChangeCache(self.zk_client, DummyConnection())
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
change = DummyChange("project", {"foo": "bar"})
self.cache.set(key, change)
self.assertIsNotNone(other_cache.get(key))
change_other = other_cache.get(key)
change_other.number = 123
other_cache.set(key, change_other, change_other.cache_version)
for _ in iterate_timeout(10, "update to propagate"):
if getattr(change, "number", None) == 123:
break
other_cache.delete(key)
self.assertIsNone(self.cache.get(key))
def test_cache_sync_on_start(self):
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
change = DummyChange("project", {"foo": "bar"})
self.cache.set(key, change)
change.number = 123
self.cache.set(key, change, change.cache_version)
other_cache = DummyChangeCache(self.zk_client, DummyConnection())
other_cache.cleanup()
other_cache.cleanup()
self.assertIsNotNone(other_cache.get(key))
def test_cleanup(self):
change = DummyChange("project", {"foo": "bar"})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
self.cache.cleanup()
self.assertEqual(len(self.cache._data_cleanup_candidates), 0)
self.assertEqual(
len(self.zk_client.client.get_children(self.cache.data_root)), 1)
change.number = 123
self.cache.set(key, change, change.cache_version)
self.cache.cleanup()
self.assertEqual(len(self.cache._data_cleanup_candidates), 1)
self.assertEqual(
len(self.zk_client.client.get_children(self.cache.data_root)), 2)
self.cache.cleanup()
self.assertEqual(len(self.cache._data_cleanup_candidates), 0)
self.assertEqual(
len(self.zk_client.client.get_children(self.cache.data_root)), 1)
def test_watch_cleanup(self):
change = DummyChange("project", {"foo": "bar"})
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
for _ in iterate_timeout(10, "watch to be registered"):
if change.cache_stat.key._hash in self.cache._watched_keys:
break
self.cache.delete(key)
self.assertIsNone(self.cache.get(key))
for _ in iterate_timeout(10, "watch to be removed"):
if change.cache_stat.key._hash not in self.cache._watched_keys:
break
class DummyZKObjectMixin:
_retry_interval = 0.1
def getPath(self):
return f'/zuul/pipeline/{self.name}'
def serialize(self, context):
d = {'name': self.name,
'foo': self.foo}
return json.dumps(d).encode('utf-8')
class DummyZKObject(DummyZKObjectMixin, ZKObject):
pass
class DummyShardedZKObject(DummyZKObjectMixin, ShardedZKObject):
pass
class TestZKObject(ZooKeeperBaseTestCase):
def _test_zk_object(self, zkobject_class):
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
# Create a new object
tenant_name = 'fake_tenant'
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='bar')
self.assertEqual(pipeline1.foo, 'bar')
compressed_size, uncompressed_size = pipeline1.estimateDataSize()
self.assertTrue(compressed_size != uncompressed_size != 0)
# Load an object from ZK (that we don't already have)
with tenant_write_lock(self.zk_client, tenant_name) as lock:
context = ZKContext(self.zk_client, lock, stop_event, self.log)
pipeline2 = zkobject_class.fromZK(context,
'/zuul/pipeline/fake_tenant')
self.assertEqual(pipeline2.foo, 'bar')
compressed_size, uncompressed_size = pipeline2.estimateDataSize()
self.assertTrue(compressed_size != uncompressed_size != 0)
# Test that nested ZKObject sizes are summed up correctly
p1_compressed, p1_uncompressed = pipeline1.estimateDataSize()
p2_compressed, p2_uncompressed = pipeline2.estimateDataSize()
pipeline2._set(other=pipeline1)
compressed_size, uncompressed_size = pipeline2.estimateDataSize()
self.assertEqual(compressed_size, p1_compressed + p2_compressed)
self.assertEqual(uncompressed_size, p1_uncompressed + p2_uncompressed)
def get_ltime(obj):
zstat = self.zk_client.client.exists(obj.getPath())
return zstat.last_modified_transaction_id
# Update an object
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
ZKContext(
self.zk_client, lock, stop_event, self.log) as context):
ltime1 = get_ltime(pipeline1)
pipeline1.updateAttributes(context, foo='qux')
self.assertEqual(pipeline1.foo, 'qux')
ltime2 = get_ltime(pipeline1)
self.assertNotEqual(ltime1, ltime2)
# This should not produce an unnecessary write
pipeline1.updateAttributes(context, foo='qux')
ltime3 = get_ltime(pipeline1)
self.assertEqual(ltime2, ltime3)
# Update an object using an active context
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
ZKContext(
self.zk_client, lock, stop_event, self.log) as context):
ltime1 = get_ltime(pipeline1)
with pipeline1.activeContext(context):
pipeline1.foo = 'baz'
self.assertEqual(pipeline1.foo, 'baz')
ltime2 = get_ltime(pipeline1)
self.assertNotEqual(ltime1, ltime2)
# This should not produce an unnecessary write
with pipeline1.activeContext(context):
pipeline1.foo = 'baz'
ltime3 = get_ltime(pipeline1)
self.assertEqual(ltime2, ltime3)
# Update of object w/o active context should not work
with testtools.ExpectedException(Exception):
pipeline1.foo = 'nope'
self.assertEqual(pipeline1.foo, 'baz')
# Refresh an existing object
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
ZKContext(
self.zk_client, lock, stop_event, self.log) as context):
pipeline2.refresh(context)
self.assertEqual(pipeline2.foo, 'baz')
# Delete an object
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
ZKContext(self.zk_client, lock, stop_event, self.log) as context):
self.assertIsNotNone(self.zk_client.client.exists(
'/zuul/pipeline/fake_tenant'))
pipeline2.delete(context)
self.assertIsNone(self.zk_client.client.exists(
'/zuul/pipeline/fake_tenant'))
def _test_zk_object_exception(self, zkobject_class):
# Exercise the exception handling in the _save method
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
# Create a new object
tenant_name = 'fake_tenant'
class ZKFailsOnUpdate:
def delete(self, *args, **kw):
raise ZookeeperError()
def set(self, *args, **kw):
raise ZookeeperError()
class FailsOnce:
def __init__(self, real_client):
self.count = 0
self._real_client = real_client
def create(self, *args, **kw):
return self._real_client.create(*args, **kw)
def delete(self, *args, **kw):
self.count += 1
if self.count < 2:
raise OperationTimeoutError()
return self._real_client.delete(*args, **kw)
def set(self, *args, **kw):
self.count += 1
if self.count < 2:
raise OperationTimeoutError()
return self._real_client.set(*args, **kw)
# Fail an update
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
ZKContext(
self.zk_client, lock, stop_event, self.log) as context):
pipeline1 = zkobject_class.new(context,
name=tenant_name,
foo='one')
self.assertEqual(pipeline1.foo, 'one')
# Simulate a fatal ZK exception
context.client = ZKFailsOnUpdate()
with testtools.ExpectedException(ZookeeperError):
pipeline1.updateAttributes(context, foo='two')
# We should still have the old attribute
self.assertEqual(pipeline1.foo, 'one')
# Any other error is retryable
context.client = FailsOnce(self.zk_client.client)
pipeline1.updateAttributes(context, foo='three')
# This time it should be updated
self.assertEqual(pipeline1.foo, 'three')
# Repeat test using an active context
context.client = ZKFailsOnUpdate()
with testtools.ExpectedException(ZookeeperError):
with pipeline1.activeContext(context):
pipeline1.foo = 'four'
self.assertEqual(pipeline1.foo, 'three')
context.client = FailsOnce(self.zk_client.client)
with pipeline1.activeContext(context):
pipeline1.foo = 'five'
self.assertEqual(pipeline1.foo, 'five')
def test_zk_object(self):
self._test_zk_object(DummyZKObject)
def test_sharded_zk_object(self):
self._test_zk_object(DummyShardedZKObject)
def test_zk_object_exception(self):
self._test_zk_object_exception(DummyZKObject)
def test_sharded_zk_object_exception(self):
self._test_zk_object_exception(DummyShardedZKObject)
class TestBranchCache(ZooKeeperBaseTestCase):
def test_branch_cache_protected_then_all(self):
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn, self.component_registry)
test_data = {
'project1': {
'all': ['protected1', 'protected2',
'unprotected1', 'unprotected2'],
'protected': ['protected1', 'protected2'],
},
}
# Test a protected-only query followed by all
cache.setProjectBranches('project1', True,
test_data['project1']['protected'])
self.assertEqual(
sorted(cache.getProjectBranches('project1', True)),
test_data['project1']['protected']
)
self.assertRaises(
LookupError,
lambda: cache.getProjectBranches('project1', False)
)
cache.setProjectBranches('project1', False,
test_data['project1']['all'])
self.assertEqual(
sorted(cache.getProjectBranches('project1', True)),
test_data['project1']['protected']
)
self.assertEqual(
sorted(cache.getProjectBranches('project1', False)),
test_data['project1']['all']
)
def test_branch_cache_all_then_protected(self):
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn, self.component_registry)
test_data = {
'project1': {
'all': ['protected1', 'protected2',
'unprotected1', 'unprotected2'],
'protected': ['protected1', 'protected2'],
},
}
self.assertRaises(
LookupError,
lambda: cache.getProjectBranches('project1', True)
)
self.assertRaises(
LookupError,
lambda: cache.getProjectBranches('project1', False)
)
# Test the other order; all followed by protected-only
cache.setProjectBranches('project1', False,
test_data['project1']['all'])
self.assertRaises(
LookupError,
lambda: cache.getProjectBranches('project1', True)
)
self.assertEqual(
sorted(cache.getProjectBranches('project1', False)),
test_data['project1']['all']
)
cache.setProjectBranches('project1', True,
test_data['project1']['protected'])
self.assertEqual(
sorted(cache.getProjectBranches('project1', True)),
test_data['project1']['protected']
)
self.assertEqual(
sorted(cache.getProjectBranches('project1', False)),
test_data['project1']['all']
)
def test_branch_cache_change_protected(self):
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn, self.component_registry)
data1 = {
'project1': {
'all': ['newbranch', 'protected'],
'protected': ['protected'],
},
}
data2 = {
'project1': {
'all': ['newbranch', 'protected'],
'protected': ['newbranch', 'protected'],
},
}
# Create a new unprotected branch
cache.setProjectBranches('project1', False,
data1['project1']['all'])
cache.setProjectBranches('project1', True,
data1['project1']['protected'])
self.assertEqual(
cache.getProjectBranches('project1', True),
data1['project1']['protected']
)
self.assertEqual(
sorted(cache.getProjectBranches('project1', False)),
data1['project1']['all']
)
# Change it to protected
cache.setProtected('project1', 'newbranch', True)
self.assertEqual(
sorted(cache.getProjectBranches('project1', True)),
data2['project1']['protected']
)
self.assertEqual(
sorted(cache.getProjectBranches('project1', False)),
data2['project1']['all']
)
# Change it back
cache.setProtected('project1', 'newbranch', False)
self.assertEqual(
sorted(cache.getProjectBranches('project1', True)),
data1['project1']['protected']
)
self.assertEqual(
sorted(cache.getProjectBranches('project1', False)),
data1['project1']['all']
)
def test_branch_cache_lookup_error(self):
# Test that a missing branch cache entry results in a LookupError
conn = DummyConnection()
cache = BranchCache(self.zk_client, conn, self.component_registry)
self.assertRaises(
LookupError,
lambda: cache.getProjectBranches('project1', True)
)
self.assertIsNone(
cache.getProjectBranches('project1', True, default=None)
)
class TestConfigurationErrorList(ZooKeeperBaseTestCase):
def test_config_error_list(self):
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
source_context = model.SourceContext(
'cname', 'project', 'connection', 'branch', 'test', True)
m1 = yaml.Mark('name', 0, 0, 0, '', 0)
m2 = yaml.Mark('name', 1, 0, 0, '', 0)
start_mark = model.ZuulMark(m1, m2, 'hello')
# Create a new object
with (tenant_write_lock(self.zk_client, 'test') as lock,
ZKContext(
self.zk_client, lock, stop_event, self.log) as context):
pipeline = DummyZKObject.new(context, name="test", foo="bar")
e1 = model.ConfigurationError(
source_context, start_mark, "Test error1")
e2 = model.ConfigurationError(
source_context, start_mark, "Test error2")
with pipeline.activeContext(context):
path = '/zuul/pipeline/test/errors'
el1 = model.ConfigurationErrorList.new(
context, errors=[e1, e2], _path=path)
el2 = model.ConfigurationErrorList.fromZK(
context, path, _path=path)
self.assertEqual(el1.errors, el2.errors)
self.assertFalse(el1 is el2)
self.assertEqual(el1.errors[0], el2.errors[0])
self.assertEqual(el1.errors[0], e1)
self.assertNotEqual(e1, e2)
self.assertEqual([e1, e2], [e1, e2])
class TestBlobStore(ZooKeeperBaseTestCase):
def test_blob_store(self):
stop_event = threading.Event()
self.zk_client.client.create('/zuul/pipeline', makepath=True)
# Create a new object
tenant_name = 'fake_tenant'
start_ltime = self.zk_client.getCurrentLtime()
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
ZKContext(
self.zk_client, lock, stop_event, self.log) as context):
bs = BlobStore(context)
with testtools.ExpectedException(KeyError):
bs.get('nope')
path = bs.put(b'something')
self.assertEqual(bs.get(path), b'something')
self.assertEqual([x for x in bs], [path])
self.assertEqual(len(bs), 1)
self.assertTrue(path in bs)
self.assertFalse('nope' in bs)
self.assertTrue(bs._checkKey(path))
self.assertFalse(bs._checkKey('nope'))
cur_ltime = self.zk_client.getCurrentLtime()
self.assertEqual(bs.getKeysLastUsedBefore(cur_ltime), {path})
self.assertEqual(bs.getKeysLastUsedBefore(start_ltime), set())
bs.delete(path, cur_ltime)
with testtools.ExpectedException(KeyError):
bs.get(path)
class TestPipelineInit(ZooKeeperBaseTestCase):
# Test the initialize-on-refresh code paths of various pipeline objects
def test_pipeline_state_new_object(self):
# Test the initialize-on-refresh code path with no existing object
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.state = model.PipelineState.create(
pipeline, pipeline.state)
context = ZKContext(self.zk_client, None, None, self.log)
pipeline.state.refresh(context)
self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
def test_pipeline_state_existing_object(self):
# Test the initialize-on-refresh code path with a pre-existing object
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.manager = mock.Mock()
pipeline.state = model.PipelineState.create(
pipeline, pipeline.state)
pipeline.change_list = model.PipelineChangeList.create(
pipeline)
context = ZKContext(self.zk_client, None, None, self.log)
# We refresh the change list here purely for the side effect
# of creating the pipeline state object with no data (the list
# is a subpath of the state object).
pipeline.change_list.refresh(context)
pipeline.state.refresh(context)
self.assertTrue(
self.zk_client.client.exists(pipeline.change_list.getPath()))
self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
def test_pipeline_change_list_new_object(self):
# Test the initialize-on-refresh code path with no existing object
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.state = model.PipelineState.create(
pipeline, pipeline.state)
pipeline.change_list = model.PipelineChangeList.create(
pipeline)
context = ZKContext(self.zk_client, None, None, self.log)
pipeline.change_list.refresh(context)
self.assertTrue(
self.zk_client.client.exists(pipeline.change_list.getPath()))
pipeline.manager = mock.Mock()
pipeline.state.refresh(context)
self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
def test_pipeline_change_list_new_object_without_lock(self):
# Test the initialize-on-refresh code path if we don't have
# the lock. This should fail.
tenant = model.Tenant('tenant')
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.state = model.PipelineState.create(
pipeline, pipeline.state)
pipeline.change_list = model.PipelineChangeList.create(
pipeline)
context = ZKContext(self.zk_client, None, None, self.log)
with testtools.ExpectedException(NoNodeError):
pipeline.change_list.refresh(context, allow_init=False)
self.assertIsNone(
self.zk_client.client.exists(pipeline.change_list.getPath()))
pipeline.manager = mock.Mock()
pipeline.state.refresh(context)
self.assertEqual(pipeline.state.layout_uuid, layout.uuid)