diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index 9ede59dd53..584ba1a981 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -222,3 +222,8 @@ Version 30 :Prior Zuul version: 10.2.0 :Description: Store playbook nesting_level and cleanup on frozen job. Affects schedulers and executors. + +Version 31 +---------- +:Prior Zuul version: 11.0.1 +:Description: Upgrade sharded zkobject format. diff --git a/tests/base.py b/tests/base.py index 72ec87f9b0..914486c888 100644 --- a/tests/base.py +++ b/tests/base.py @@ -156,6 +156,21 @@ def iterate_timeout(max_seconds, purpose): raise Exception("Timeout waiting for %s" % purpose) +def model_version(version): + """Specify a model version for a model upgrade test + + This creates a dummy scheduler component with the specified model + API version. The component is created before any other, so it + will appear to Zuul that it is joining an existing cluster with + data at the old version. + """ + + def decorator(test): + test.__model_version__ = version + return test + return decorator + + def simple_layout(path, driver='gerrit', enable_nodepool=False): """Specify a layout file for use by a test method. @@ -1894,6 +1909,17 @@ class BaseTestCase(testtools.TestCase): data = compressed_data return data + def setupModelPin(self): + # Add a fake scheduler to the system that is on the old model + # version. + test_name = self.id().split('.')[-1] + test = getattr(self, test_name) + if hasattr(test, '__model_version__'): + version = getattr(test, '__model_version__') + self.model_test_component_info = SchedulerComponent( + self.zk_client, 'test_component') + self.model_test_component_info.register(version) + class SymLink(object): def __init__(self, target): @@ -2164,17 +2190,6 @@ class ZuulTestCase(BaseTestCase): ) self.merge_server.start() - def _setupModelPin(self): - # Add a fake scheduler to the system that is on the old model - # version. - test_name = self.id().split('.')[-1] - test = getattr(self, test_name) - if hasattr(test, '__model_version__'): - version = getattr(test, '__model_version__') - self.model_test_component_info = SchedulerComponent( - self.zk_client, 'test_component') - self.model_test_component_info.register(version) - def setUp(self): super(ZuulTestCase, self).setUp() @@ -2277,7 +2292,7 @@ class ZuulTestCase(BaseTestCase): self.zk_client = ZooKeeperClient.fromConfig(self.config) self.zk_client.connect() - self._setupModelPin() + self.setupModelPin() self._context_lock = SessionAwareLock( self.zk_client.client, f"/test/{uuid.uuid4().hex}") diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index 2ec2136fd8..49009d0081 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -24,6 +24,7 @@ from tests.base import ( ZuulTestCase, simple_layout, iterate_timeout, + model_version, ) from zuul.zk import ZooKeeperClient from zuul.zk.branch_cache import BranchCache, BranchFlag @@ -31,21 +32,6 @@ from zuul.zk.zkobject import ZKContext from tests.unit.test_zk import DummyConnection -def model_version(version): - """Specify a model version for a model upgrade test - - This creates a dummy scheduler component with the specified model - API version. The component is created before any other, so it - will appear to Zuul that it is joining an existing cluster with - data at the old version. 
- """ - - def decorator(test): - test.__model_version__ = version - return test - return decorator - - class TestModelUpgrade(ZuulTestCase): tenant_config_file = "config/single-tenant/main-model-upgrade.yaml" scheduler_count = 1 diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 2c23239275..3eab7961f8 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -13,12 +13,15 @@ # License for the specific language governing permissions and limitations # under the License. +import binascii from collections import defaultdict import json +import math import queue import threading import time import uuid +import zlib from unittest import mock import testtools @@ -57,8 +60,11 @@ from zuul.zk.components import ( COMPONENT_REGISTRY ) from tests.base import ( - BaseTestCase, HoldableExecutorApi, HoldableMergerApi, - iterate_timeout + BaseTestCase, + HoldableExecutorApi, + HoldableMergerApi, + iterate_timeout, + model_version, ) from zuul.zk.zkobject import ( ShardedZKObject, PolymorphicZKObjectMixin, ZKObject, ZKContext @@ -82,6 +88,7 @@ class ZooKeeperBaseTestCase(BaseTestCase): tls_ca=self.zk_chroot_fixture.zookeeper_ca) self.addCleanup(self.zk_client.disconnect) self.zk_client.connect() + self.setupModelPin() self.component_registry = ComponentRegistry(self.zk_client) # We don't have any other component to initialize the global # registry in these tests, so we do it ourselves. @@ -209,6 +216,116 @@ class TestSharding(ZooKeeperBaseTestCase): ) as shard_io: self.assertDictEqual(json.load(shard_io), data) + def _test_write_old_read_new(self, shard_count): + # Write shards in the old format where each shard is + # compressed individually + data = b'{"key": "value"}' + data_shards = [] + shard_len = math.ceil(len(data) / shard_count) + for start in range(0, len(data), shard_len): + data_shards.append(data[start:start + shard_len]) + # Make sure we split them correctly + self.assertEqual(data, b''.join(data_shards)) + self.assertEqual(shard_count, len(data_shards)) + for shard in data_shards: + shard = zlib.compress(shard) + self.log.debug(f"{binascii.hexlify(shard)=}") + self.zk_client.client.create("/test/shards/", shard, + sequence=True, makepath=True) + + # Read shards, expecting the new format + with BufferedShardReader( + self.zk_client.client, "/test/shards" + ) as shard_io: + read_data = shard_io.read() + decompressed_data = zlib.decompress(read_data) + self.log.debug(f"{binascii.hexlify(read_data)=}") + self.log.debug(f"{decompressed_data=}") + self.assertEqual(decompressed_data, data) + + def test_write_old_read_new_1(self): + self._test_write_old_read_new(1) + + def test_write_old_read_new_2(self): + self._test_write_old_read_new(2) + + def test_write_old_read_new_3(self): + self._test_write_old_read_new(3) + + def _test_write_new_read_new(self, shard_count): + # Write shards in the new format + data = b'{"key": "value"}' + compressed_data = zlib.compress(data) + + data_shards = [] + shard_len = math.ceil(len(compressed_data) / shard_count) + for start in range(0, len(compressed_data), shard_len): + data_shards.append(compressed_data[start:start + shard_len]) + # Make sure we split them correctly + self.assertEqual(compressed_data, b''.join(data_shards)) + self.assertEqual(shard_count, len(data_shards)) + for shard in data_shards: + self.log.debug(f"{binascii.hexlify(shard)=}") + self.zk_client.client.create("/test/shards/", shard, + sequence=True, makepath=True) + + # Read shards, expecting the new format + with BufferedShardReader( + self.zk_client.client, 
"/test/shards" + ) as shard_io: + read_data = shard_io.read() + decompressed_data = zlib.decompress(read_data) + self.log.debug(f"{binascii.hexlify(read_data)=}") + self.log.debug(f"{decompressed_data=}") + self.assertEqual(decompressed_data, data) + + def test_write_new_read_new_1(self): + self._test_write_new_read_new(1) + + def test_write_new_read_new_2(self): + self._test_write_new_read_new(2) + + def test_write_new_read_new_3(self): + self._test_write_new_read_new(3) + + def _test_write_old_read_old(self, shard_count): + # Test that the writer can write in the old format + data = b'{"key": "value is longer"}' + compressed_data = zlib.compress(data) + + shard_len = math.ceil(len(compressed_data) / shard_count) + # We subtract 1024 from the size limit when writing the old + # format + size_limit = shard_len + 1024 + with mock.patch("zuul.zk.sharding.NODE_BYTE_SIZE_LIMIT", size_limit): + with BufferedShardWriter( + self.zk_client.client, "/test/shards" + ) as shard_writer: + shard_writer.write(compressed_data) + + read_shards = [] + for shard in sorted( + self.zk_client.client.get_children("/test/shards")): + self.log.debug(f"{shard=}") + read_data, _ = self.zk_client.client.get(f"/test/shards/{shard}") + self.log.debug(f"{binascii.hexlify(read_data)=}") + self.log.debug(f"{len(read_data)=}") + read_shards.append(zlib.decompress(read_data)) + self.assertEqual(b"".join(read_shards), data) + self.assertEqual(shard_count, len(read_shards)) + + @model_version(30) + def test_write_old_read_old_1(self): + self._test_write_old_read_old(1) + + @model_version(30) + def test_write_old_read_old_2(self): + self._test_write_old_read_old(2) + + @model_version(30) + def test_write_old_read_old_3(self): + self._test_write_old_read_old(3) + class TestUnparsedConfigCache(ZooKeeperBaseTestCase): diff --git a/zuul/model_api.py b/zuul/model_api.py index 79b25ca866..952a05f21f 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -14,4 +14,4 @@ # When making ZK schema changes, increment this and add a record to # doc/source/developer/model-changelog.rst -MODEL_API = 30 +MODEL_API = 31 diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py index 563f8af6a5..9f2bf70661 100644 --- a/zuul/zk/sharding.py +++ b/zuul/zk/sharding.py @@ -1,4 +1,5 @@ # Copyright 2020 BMW Group +# Copyright 2024 Acme Gating, LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,6 +16,9 @@ import io from contextlib import suppress import time +import zlib + +from zuul.zk.components import COMPONENT_REGISTRY from kazoo.exceptions import NoNodeError @@ -77,6 +81,11 @@ class RawZKIO(io.RawIOBase): class RawShardIO(RawZKIO): + def __init__(self, *args, old_format=False, **kw): + # MODEL_API < 31 + self.old_format = old_format + super().__init__(*args, **kw) + def truncate(self, size=None): if size != 0: raise ValueError("Can only truncate to 0") @@ -91,17 +100,62 @@ class RawShardIO(RawZKIO): self.cumulative_read_time += time.perf_counter() - start return ret + def readall_old(self, shard0, shard1): + # Decompress each shard individually and then recompress them + # as a unit. 
+        read_buffer = io.BytesIO()
+        read_buffer.write(zlib.decompress(shard0))
+        read_buffer.write(zlib.decompress(shard1))
+        for shard_count, shard_name in enumerate(sorted(self._shards)):
+            if shard_count < 2:
+                continue
+            shard_path = "/".join((self.path, shard_name))
+            data = self._getData(shard_path)[0]
+            read_buffer.write(zlib.decompress(data))
+        self.zstat = self.client.exists(self.path)
+        return zlib.compress(read_buffer.getvalue())
+
     def readall(self):
         read_buffer = io.BytesIO()
-        for shard_name in sorted(self._shards):
+        for shard_count, shard_name in enumerate(sorted(self._shards)):
             shard_path = "/".join((self.path, shard_name))
+            data = self._getData(shard_path)[0]
+            if shard_count == 1 and data[:2] == b'\x78\x9c':
+                # If this is the second shard, and it starts with a
+                # zlib header, we're probably reading the old format.
+                # Double check that we can decompress it, and if so,
+                # switch to reading the old format.
+                try:
+                    zlib.decompress(data)
+                    return self.readall_old(
+                        read_buffer.getvalue(),
+                        data,
+                    )
+                except zlib.error:
+                    # Perhaps we were wrong about the header
+                    pass
-            read_buffer.write(self._getData(shard_path)[0])
+            read_buffer.write(data)
         self.zstat = self.client.exists(self.path)
         return read_buffer.getvalue()
 
     def write(self, data):
-        # Only write one key at a time and defer writing the rest to the caller
+        # Only write one znode at a time and defer writing the rest to
+        # the caller.
         data_bytes = bytes(data[0:NODE_BYTE_SIZE_LIMIT])
+        read_len = len(data_bytes)
+        # MODEL_API < 31
+        if self.old_format:
+            # Compression adds a header and trailer of several bytes,
+            # so we need to reduce the size somewhat; and because the
+            # old format compressed after chunking, it ended up with
+            # considerable headroom anyway, so let's go ahead and
+            # reserve 1k of space.
+            new_limit = NODE_BYTE_SIZE_LIMIT - 1024
+            data_bytes = data_bytes[0:new_limit]
+            # Update our return value to indicate how many bytes we
+            # actually consumed from the input.
+            read_len = len(data_bytes)
+            data_bytes = zlib.compress(data_bytes)
         if not (len(data_bytes) <= NODE_BYTE_SIZE_LIMIT):
             raise RuntimeError("Shard too large")
         start = time.perf_counter()
@@ -116,7 +170,7 @@ class RawShardIO(RawZKIO):
         self.znodes_written += 1
         if self.zstat is None:
             self.zstat = self.client.exists(self.path)
-        return len(data_bytes)
+        return read_len
 
 
 class BufferedZKWriter(io.BufferedWriter):
@@ -165,7 +219,9 @@ class BufferedZKReader(io.BufferedReader):
 
 class BufferedShardWriter(io.BufferedWriter):
     def __init__(self, client, path, create=False, version=-1):
-        self.__raw = RawShardIO(client, path, create=create, version=version)
+        self.__old_format = COMPONENT_REGISTRY.model_api < 31
+        self.__raw = RawShardIO(client, path, create=create, version=version,
+                                old_format=self.__old_format)
         super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
 
     @property
@@ -184,6 +240,12 @@
     def zstat(self):
        return self.__raw.zstat
 
+    def write(self, data):
+        # MODEL_API < 31
+        if self.__old_format and data[:2] == b'\x78\x9c':
+            data = zlib.decompress(data)
+        return super().write(data)
+
 
 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
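
Usage note: a minimal sketch of the new-format round trip, for anyone
trying the change locally. It assumes "client" is a connected
kazoo.client.KazooClient (the tests above use self.zk_client.client),
that the cluster is already at model API >= 31, and reuses the
"/test/shards" path from the tests:

    import json
    import zlib

    from zuul.zk.sharding import BufferedShardReader, BufferedShardWriter

    payload = json.dumps({"key": "value"}).encode("utf8")

    # Callers compress the whole payload once; the writer only chunks
    # the compressed stream into znodes under the given path.
    with BufferedShardWriter(client, "/test/shards") as writer:
        writer.write(zlib.compress(payload))

    # The reader concatenates the shards back into a single compressed
    # stream (transparently recompressing old-format shards), so the
    # caller decompresses exactly once.
    with BufferedShardReader(client, "/test/shards") as reader:
        assert zlib.decompress(reader.read()) == payload

On clusters still below model API 31, BufferedShardWriter.write()
detects the zlib header, decompresses, and falls back to the old
per-shard compression, so the same calling convention works during a
rolling upgrade.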