Add upgrade handling for new zkobject sharding
Change I513976a703f035545eaf5bd05a381dbc0c509cda updated the format
for zkobjects.  The old format chunks the data and then compresses
each chunk with zlib; the new format compresses the entire data and
then chunks the compressed data.  The change did not take into
account the leading and trailing zlib metadata on each chunk.

To correct this, we add a model API revision so that we know whether
to write the old or the new format.  For reading, we always start by
assuming the new format, but on the second shard we check for a zlib
header; if we find one, we switch to reading the old format.  This
allows us to keep reading old data for some time after the upgrade.

For any systems that have already upgraded between the previous
commit and this one: if they haven't encountered problems, they are
likely able to store their data without sharding.  The old and new
data formats for a single shard are identical, so some flapping
between the old and new versions on the "write" side during the model
upgrade is unlikely to cause long-term problems.

Change-Id: Ic44d340685bbaf79c43d4ca4848536bd33b26ed3
Co-Authored-By: Simon Westphahl <simon.westphahl@bmw.de>
parent d6b185fa95
commit ffb548f367
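To make the format difference concrete, here is a small standalone sketch (illustrative only, not the Zuul implementation; the tiny shard size and the helper names are made up for this example):

import zlib

SHARD_SIZE = 8  # illustrative; the real limit is the znode size limit


def write_old(data):
    # Old format: chunk the raw data, then compress each chunk.
    chunks = [data[i:i + SHARD_SIZE] for i in range(0, len(data), SHARD_SIZE)]
    return [zlib.compress(c) for c in chunks]


def write_new(data):
    # New format: compress the whole payload, then chunk the result.
    compressed = zlib.compress(data)
    return [compressed[i:i + SHARD_SIZE]
            for i in range(0, len(compressed), SHARD_SIZE)]


data = b'{"key": "a somewhat longer value"}'

# New format: concatenating the shards yields one valid zlib stream.
assert zlib.decompress(b"".join(write_new(data))) == data

# Old format: every shard is its own zlib stream (each carries the zlib
# header and trailer), so the shards must be decompressed individually.
old_shards = write_old(data)
assert all(s[:2] == b"\x78\x9c" for s in old_shards)
assert b"".join(zlib.decompress(s) for s in old_shards) == data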
@@ -222,3 +222,8 @@ Version 30
 :Prior Zuul version: 10.2.0
 :Description: Store playbook nesting_level and cleanup on frozen job.
               Affects schedulers and executors.
+
+Version 31
+----------
+:Prior Zuul version: 11.0.1
+:Description: Upgrade sharded zkobject format.
@@ -156,6 +156,21 @@ def iterate_timeout(max_seconds, purpose):
     raise Exception("Timeout waiting for %s" % purpose)
 
 
+def model_version(version):
+    """Specify a model version for a model upgrade test
+
+    This creates a dummy scheduler component with the specified model
+    API version. The component is created before any other, so it
+    will appear to Zuul that it is joining an existing cluster with
+    data at the old version.
+    """
+
+    def decorator(test):
+        test.__model_version__ = version
+        return test
+    return decorator
+
+
 def simple_layout(path, driver='gerrit', enable_nodepool=False):
     """Specify a layout file for use by a test method.
 
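As a quick illustration of the mechanism (a standalone sketch, not part of the change): the decorator only tags the test function, and setupModelPin() below reads that tag and registers a dummy scheduler component at the requested version.

def model_version(version):
    def decorator(test):
        test.__model_version__ = version
        return test
    return decorator


class FakeTests:
    @model_version(30)
    def test_something(self):
        pass


# setupModelPin() looks for this attribute on the test method.
assert FakeTests.test_something.__model_version__ == 30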
@@ -1894,6 +1909,17 @@ class BaseTestCase(testtools.TestCase):
             data = compressed_data
         return data
 
+    def setupModelPin(self):
+        # Add a fake scheduler to the system that is on the old model
+        # version.
+        test_name = self.id().split('.')[-1]
+        test = getattr(self, test_name)
+        if hasattr(test, '__model_version__'):
+            version = getattr(test, '__model_version__')
+            self.model_test_component_info = SchedulerComponent(
+                self.zk_client, 'test_component')
+            self.model_test_component_info.register(version)
+
 
 class SymLink(object):
     def __init__(self, target):
@@ -2164,17 +2190,6 @@ class ZuulTestCase(BaseTestCase):
         )
         self.merge_server.start()
 
-    def _setupModelPin(self):
-        # Add a fake scheduler to the system that is on the old model
-        # version.
-        test_name = self.id().split('.')[-1]
-        test = getattr(self, test_name)
-        if hasattr(test, '__model_version__'):
-            version = getattr(test, '__model_version__')
-            self.model_test_component_info = SchedulerComponent(
-                self.zk_client, 'test_component')
-            self.model_test_component_info.register(version)
-
     def setUp(self):
         super(ZuulTestCase, self).setUp()
 
@@ -2277,7 +2292,7 @@ class ZuulTestCase(BaseTestCase):
         self.zk_client = ZooKeeperClient.fromConfig(self.config)
         self.zk_client.connect()
 
-        self._setupModelPin()
+        self.setupModelPin()
 
         self._context_lock = SessionAwareLock(
             self.zk_client.client, f"/test/{uuid.uuid4().hex}")
@@ -24,6 +24,7 @@ from tests.base import (
     ZuulTestCase,
     simple_layout,
     iterate_timeout,
+    model_version,
 )
 from zuul.zk import ZooKeeperClient
 from zuul.zk.branch_cache import BranchCache, BranchFlag
@@ -31,21 +32,6 @@ from zuul.zk.zkobject import ZKContext
 from tests.unit.test_zk import DummyConnection
 
 
-def model_version(version):
-    """Specify a model version for a model upgrade test
-
-    This creates a dummy scheduler component with the specified model
-    API version. The component is created before any other, so it
-    will appear to Zuul that it is joining an existing cluster with
-    data at the old version.
-    """
-
-    def decorator(test):
-        test.__model_version__ = version
-        return test
-    return decorator
-
-
 class TestModelUpgrade(ZuulTestCase):
     tenant_config_file = "config/single-tenant/main-model-upgrade.yaml"
     scheduler_count = 1
@@ -13,12 +13,15 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import binascii
 from collections import defaultdict
 import json
+import math
 import queue
 import threading
 import time
 import uuid
+import zlib
 from unittest import mock
 
 import testtools
@@ -57,8 +60,11 @@ from zuul.zk.components import (
     COMPONENT_REGISTRY
 )
 from tests.base import (
-    BaseTestCase, HoldableExecutorApi, HoldableMergerApi,
-    iterate_timeout
+    BaseTestCase,
+    HoldableExecutorApi,
+    HoldableMergerApi,
+    iterate_timeout,
+    model_version,
 )
 from zuul.zk.zkobject import (
     ShardedZKObject, PolymorphicZKObjectMixin, ZKObject, ZKContext
@@ -82,6 +88,7 @@ class ZooKeeperBaseTestCase(BaseTestCase):
             tls_ca=self.zk_chroot_fixture.zookeeper_ca)
         self.addCleanup(self.zk_client.disconnect)
         self.zk_client.connect()
+        self.setupModelPin()
         self.component_registry = ComponentRegistry(self.zk_client)
         # We don't have any other component to initialize the global
         # registry in these tests, so we do it ourselves.
@@ -209,6 +216,116 @@ class TestSharding(ZooKeeperBaseTestCase):
         ) as shard_io:
             self.assertDictEqual(json.load(shard_io), data)
 
+    def _test_write_old_read_new(self, shard_count):
+        # Write shards in the old format where each shard is
+        # compressed individually
+        data = b'{"key": "value"}'
+        data_shards = []
+        shard_len = math.ceil(len(data) / shard_count)
+        for start in range(0, len(data), shard_len):
+            data_shards.append(data[start:start + shard_len])
+        # Make sure we split them correctly
+        self.assertEqual(data, b''.join(data_shards))
+        self.assertEqual(shard_count, len(data_shards))
+        for shard in data_shards:
+            shard = zlib.compress(shard)
+            self.log.debug(f"{binascii.hexlify(shard)=}")
+            self.zk_client.client.create("/test/shards/", shard,
+                                         sequence=True, makepath=True)
+
+        # Read shards, expecting the new format
+        with BufferedShardReader(
+                self.zk_client.client, "/test/shards"
+        ) as shard_io:
+            read_data = shard_io.read()
+            decompressed_data = zlib.decompress(read_data)
+            self.log.debug(f"{binascii.hexlify(read_data)=}")
+            self.log.debug(f"{decompressed_data=}")
+            self.assertEqual(decompressed_data, data)
+
+    def test_write_old_read_new_1(self):
+        self._test_write_old_read_new(1)
+
+    def test_write_old_read_new_2(self):
+        self._test_write_old_read_new(2)
+
+    def test_write_old_read_new_3(self):
+        self._test_write_old_read_new(3)
+
+    def _test_write_new_read_new(self, shard_count):
+        # Write shards in the new format
+        data = b'{"key": "value"}'
+        compressed_data = zlib.compress(data)
+
+        data_shards = []
+        shard_len = math.ceil(len(compressed_data) / shard_count)
+        for start in range(0, len(compressed_data), shard_len):
+            data_shards.append(compressed_data[start:start + shard_len])
+        # Make sure we split them correctly
+        self.assertEqual(compressed_data, b''.join(data_shards))
+        self.assertEqual(shard_count, len(data_shards))
+        for shard in data_shards:
+            self.log.debug(f"{binascii.hexlify(shard)=}")
+            self.zk_client.client.create("/test/shards/", shard,
+                                         sequence=True, makepath=True)
+
+        # Read shards, expecting the new format
+        with BufferedShardReader(
+                self.zk_client.client, "/test/shards"
+        ) as shard_io:
+            read_data = shard_io.read()
+            decompressed_data = zlib.decompress(read_data)
+            self.log.debug(f"{binascii.hexlify(read_data)=}")
+            self.log.debug(f"{decompressed_data=}")
+            self.assertEqual(decompressed_data, data)
+
+    def test_write_new_read_new_1(self):
+        self._test_write_new_read_new(1)
+
+    def test_write_new_read_new_2(self):
+        self._test_write_new_read_new(2)
+
+    def test_write_new_read_new_3(self):
+        self._test_write_new_read_new(3)
+
+    def _test_write_old_read_old(self, shard_count):
+        # Test that the writer can write in the old format
+        data = b'{"key": "value is longer"}'
+        compressed_data = zlib.compress(data)
+
+        shard_len = math.ceil(len(compressed_data) / shard_count)
+        # We subtract 1024 from the size limit when writing the old
+        # format
+        size_limit = shard_len + 1024
+        with mock.patch("zuul.zk.sharding.NODE_BYTE_SIZE_LIMIT", size_limit):
+            with BufferedShardWriter(
+                    self.zk_client.client, "/test/shards"
+            ) as shard_writer:
+                shard_writer.write(compressed_data)
+
+        read_shards = []
+        for shard in sorted(
+                self.zk_client.client.get_children("/test/shards")):
+            self.log.debug(f"{shard=}")
+            read_data, _ = self.zk_client.client.get(f"/test/shards/{shard}")
+            self.log.debug(f"{binascii.hexlify(read_data)=}")
+            self.log.debug(f"{len(read_data)=}")
+            read_shards.append(zlib.decompress(read_data))
+        self.assertEqual(b"".join(read_shards), data)
+        self.assertEqual(shard_count, len(read_shards))
+
+    @model_version(30)
+    def test_write_old_read_old_1(self):
+        self._test_write_old_read_old(1)
+
+    @model_version(30)
+    def test_write_old_read_old_2(self):
+        self._test_write_old_read_old(2)
+
+    @model_version(30)
+    def test_write_old_read_old_3(self):
+        self._test_write_old_read_old(3)
+
 
 class TestUnparsedConfigCache(ZooKeeperBaseTestCase):
 
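The single-shard variants of these tests pass for both formats because, when the payload fits in one znode, chunk-then-compress and compress-then-chunk produce byte-identical data, which is also why brief flapping between writer versions during the upgrade is harmless for unsharded objects. A standalone sketch of that equivalence (illustrative only, not the test suite):

import zlib


def shards_old(data, size):
    # Old format: chunk first, then compress each chunk.
    return [zlib.compress(data[i:i + size]) for i in range(0, len(data), size)]


def shards_new(data, size):
    # New format: compress first, then chunk the compressed blob.
    blob = zlib.compress(data)
    return [blob[i:i + size] for i in range(0, len(blob), size)]


data = b'{"key": "value"}'
size = 1024  # large enough that everything fits in a single shard
assert shards_old(data, size) == shards_new(data, size) == [zlib.compress(data)]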
@@ -14,4 +14,4 @@
 
 # When making ZK schema changes, increment this and add a record to
 # doc/source/developer/model-changelog.rst
-MODEL_API = 30
+MODEL_API = 31
@@ -1,4 +1,5 @@
 # Copyright 2020 BMW Group
+# Copyright 2024 Acme Gating, LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may
 # not use this file except in compliance with the License. You may obtain
@@ -15,6 +16,9 @@
 import io
 from contextlib import suppress
 import time
+import zlib
 
+from zuul.zk.components import COMPONENT_REGISTRY
+
 from kazoo.exceptions import NoNodeError
 
@@ -77,6 +81,11 @@ class RawZKIO(io.RawIOBase):
 
 
 class RawShardIO(RawZKIO):
+    def __init__(self, *args, old_format=False, **kw):
+        # MODEL_API < 31
+        self.old_format = old_format
+        super().__init__(*args, **kw)
+
     def truncate(self, size=None):
         if size != 0:
             raise ValueError("Can only truncate to 0")
@@ -91,17 +100,62 @@ class RawShardIO(RawZKIO):
         self.cumulative_read_time += time.perf_counter() - start
         return ret
 
+    def readall_old(self, shard0, shard1):
+        # Decompress each shard individually and then recompress them
+        # as a unit.
+        read_buffer = io.BytesIO()
+        read_buffer.write(zlib.decompress(shard0))
+        read_buffer.write(zlib.decompress(shard1))
+        for shard_count, shard_name in enumerate(sorted(self._shards)):
+            if shard_count < 2:
+                continue
+            shard_path = "/".join((self.path, shard_name))
+            data = self._getData(shard_path)[0]
+            read_buffer.write(zlib.decompress(data))
+        self.zstat = self.client.exists(self.path)
+        return zlib.compress(read_buffer.getvalue())
+
     def readall(self):
         read_buffer = io.BytesIO()
-        for shard_name in sorted(self._shards):
+        for shard_count, shard_name in enumerate(sorted(self._shards)):
             shard_path = "/".join((self.path, shard_name))
+            data = self._getData(shard_path)[0]
+            if shard_count == 1 and data[:2] == b'\x78\x9c':
+                # If this is the second shard, and it starts with a
+                # zlib header, we're probably reading the old format.
+                # Double check that we can decompress it, and if so,
+                # switch to reading the old format.
+                try:
+                    zlib.decompress(data)
+                    return self.readall_old(
+                        read_buffer.getvalue(),
+                        data,
+                    )
+                except zlib.error:
+                    # Perhaps we were wrong about the header
+                    pass
             read_buffer.write(self._getData(shard_path)[0])
         self.zstat = self.client.exists(self.path)
         return read_buffer.getvalue()
 
     def write(self, data):
-        # Only write one key at a time and defer writing the rest to the caller
+        # Only write one znode at a time and defer writing the rest to
+        # the caller
         data_bytes = bytes(data[0:NODE_BYTE_SIZE_LIMIT])
+        read_len = len(data_bytes)
+        # MODEL_API < 31
+        if self.old_format:
+            # We're going to add a header and footer that is several
+            # bytes, so we definitely need to reduce the size a little
+            # bit, but the old format would end up with a considerable
+            # amount of headroom due to compressing after chunking, so
+            # lets go ahead and reserve 1k of space.
+            new_limit = NODE_BYTE_SIZE_LIMIT - 1024
+            data_bytes = data_bytes[0:new_limit]
+            # Update our return value to indicate how many bytes we
+            # actually read from input.
+            read_len = len(data_bytes)
+            data_bytes = zlib.compress(data_bytes)
         if not (len(data_bytes) <= NODE_BYTE_SIZE_LIMIT):
             raise RuntimeError("Shard too large")
         start = time.perf_counter()
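The detection above relies on the fact that a zlib stream written with the default settings starts with the bytes 0x78 0x9c, while the second shard of a new-format object is a slice from the middle of one compressed stream and therefore almost never carries that header; even a false positive is caught by the trial decompression. A standalone illustration (assumed payload and sizes, not Zuul code):

import zlib

payload = b'{"key": "' + b'x' * 4000 + b'"}'

# Old format: the second shard is an independent zlib stream, so it has
# the header and decompresses on its own.
old_second_shard = zlib.compress(payload[2000:])
assert old_second_shard[:2] == b'\x78\x9c'
assert zlib.decompress(old_second_shard) == payload[2000:]

# New format: the second shard is the middle of a single zlib stream.
blob = zlib.compress(payload)
new_second_shard = blob[len(blob) // 2:]
print(new_second_shard[:2] == b'\x78\x9c')  # expected: False
try:
    zlib.decompress(new_second_shard)
    print("unexpectedly decompressed")
except zlib.error:
    print("not a standalone zlib stream; keep the new-format path")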
@@ -116,7 +170,7 @@ class RawShardIO(RawZKIO):
             self.znodes_written += 1
         if self.zstat is None:
             self.zstat = self.client.exists(self.path)
-        return len(data_bytes)
+        return read_len
 
 
 class BufferedZKWriter(io.BufferedWriter):
@@ -165,7 +219,9 @@ class BufferedZKReader(io.BufferedReader):
 
 class BufferedShardWriter(io.BufferedWriter):
     def __init__(self, client, path, create=False, version=-1):
-        self.__raw = RawShardIO(client, path, create=create, version=version)
+        self.__old_format = COMPONENT_REGISTRY.model_api < 31
+        self.__raw = RawShardIO(client, path, create=create, version=version,
+                                old_format=self.__old_format)
         super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
 
     @property
@@ -184,6 +240,12 @@ class BufferedShardWriter(io.BufferedWriter):
     def zstat(self):
        return self.__raw.zstat
 
+    def write(self, data):
+        # MODEL_API < 31
+        if self.__old_format and data[:2] == b'\x78\x9c':
+            data = zlib.decompress(data)
+        return super().write(data)
+
 
 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
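Putting the write side together: under the old model API the buffered writer undoes the caller's whole-object compression and the raw layer re-chunks and re-compresses each piece, reserving 1024 bytes of headroom, while under the new API the already-compressed blob is simply split across znodes. A standalone sketch of that decision (illustrative only; the constant and the function are assumptions, not the Zuul implementation):

import zlib

NODE_BYTE_SIZE_LIMIT = 1024 * 1024  # illustrative; the real limit lives in zuul.zk.sharding


def shard_for_write(data, model_api):
    # 'data' is the already-compressed zkobject blob handed to the writer.
    shards = []
    if model_api < 31 and data[:2] == b'\x78\x9c':
        # Old format: recover the raw payload, then chunk and compress
        # each piece, reserving room for the per-shard zlib framing.
        raw = zlib.decompress(data)
        limit = NODE_BYTE_SIZE_LIMIT - 1024
        for start in range(0, len(raw), limit):
            shards.append(zlib.compress(raw[start:start + limit]))
    else:
        # New format: the blob is already compressed; just split it.
        for start in range(0, len(data), NODE_BYTE_SIZE_LIMIT):
            shards.append(data[start:start + NODE_BYTE_SIZE_LIMIT])
    return shards


payload = zlib.compress(b'{"key": "value"}')

new_shards = shard_for_write(payload, model_api=31)
assert zlib.decompress(b"".join(new_shards)) == b'{"key": "value"}'

old_shards = shard_for_write(payload, model_api=30)
assert b"".join(zlib.decompress(s) for s in old_shards) == b'{"key": "value"}'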