Stream API for sharded Zookeeper data
Provide 'RawShardIO' and a buffered reader/writer that allow us to treat sharded content in Zookeeper as a simple byte stream. Change-Id: Ifa0ea33cfda325367b0c222ae1100074401028dc
This commit is contained in:
parent
3bb8684e93
commit
3061107fdc
|
@ -12,18 +12,25 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
import testtools
|
||||
|
||||
from zuul import model
|
||||
import zuul.zk.exceptions
|
||||
|
||||
from tests.base import BaseTestCase
|
||||
|
||||
from zuul import model
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
from zuul.zk.sharding import (
|
||||
RawShardIO,
|
||||
BufferedShardReader,
|
||||
BufferedShardWriter,
|
||||
NODE_BYTE_SIZE_LIMIT,
|
||||
)
|
||||
|
||||
|
||||
class TestZK(BaseTestCase):
|
||||
class ZooKeeperBaseTestCase(BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
|
@ -35,10 +42,16 @@ class TestZK(BaseTestCase):
|
|||
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
|
||||
tls_key=self.zk_chroot_fixture.zookeeper_key,
|
||||
tls_ca=self.zk_chroot_fixture.zookeeper_ca)
|
||||
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
|
||||
self.addCleanup(self.zk_client.disconnect)
|
||||
self.zk_client.connect()
|
||||
|
||||
|
||||
class TestNodepool(ZooKeeperBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
|
||||
|
||||
def _createRequest(self):
|
||||
req = model.HoldRequest()
|
||||
req.count = 1
|
||||
|
@ -72,8 +85,7 @@ class TestZK(BaseTestCase):
|
|||
# Test lock operations
|
||||
self.zk_nodepool.lockHoldRequest(req2, blocking=False)
|
||||
with testtools.ExpectedException(
|
||||
zuul.zk.exceptions.LockException,
|
||||
"Timeout trying to acquire lock .*"
|
||||
LockException, "Timeout trying to acquire lock .*"
|
||||
):
|
||||
self.zk_nodepool.lockHoldRequest(req2, blocking=True, timeout=2)
|
||||
self.zk_nodepool.unlockHoldRequest(req2)
|
||||
|
@ -82,3 +94,62 @@ class TestZK(BaseTestCase):
|
|||
# Test deleting the request
|
||||
self.zk_nodepool.deleteHoldRequest(req1)
|
||||
self.assertEqual([], self.zk_nodepool.getHoldRequests())
|
||||
|
||||
|
||||
class TestSharding(ZooKeeperBaseTestCase):
    """Exercise RawShardIO and the buffered shard reader/writer streams."""

    def test_reader(self):
        """Raw writes become visible through the reader interface."""
        shard_io = RawShardIO(self.zk_client.client, "/test/shards")
        # No shard znodes exist until something is written.
        self.assertEqual(len(shard_io._shards), 0)

        with BufferedShardReader(
            self.zk_client.client, "/test/shards"
        ) as shard_reader:
            # Reading a non-existent path yields empty bytes, not an error.
            self.assertEqual(shard_reader.read(), b"")
            shard_io.write(b"foobar")
            # Data below the node size limit fits into a single shard.
            self.assertEqual(len(shard_io._shards), 1)
            self.assertEqual(shard_io.read(), b"foobar")

    def test_writer(self):
        """Data written via the buffered writer is stored as shards."""
        shard_io = RawShardIO(self.zk_client.client, "/test/shards")
        self.assertEqual(len(shard_io._shards), 0)

        with BufferedShardWriter(
            self.zk_client.client, "/test/shards"
        ) as shard_writer:
            shard_writer.write(b"foobar")

        # Leaving the context manager flushes the buffer to one shard.
        self.assertEqual(len(shard_io._shards), 1)
        self.assertEqual(shard_io.read(), b"foobar")

    def test_truncate(self):
        """Truncating to zero removes all shard znodes."""
        shard_io = RawShardIO(self.zk_client.client, "/test/shards")
        shard_io.write(b"foobar")
        self.assertEqual(len(shard_io._shards), 1)

        with BufferedShardWriter(
            self.zk_client.client, "/test/shards"
        ) as shard_writer:
            shard_writer.truncate(0)

        self.assertEqual(len(shard_io._shards), 0)

    def test_shard_bytes_limit(self):
        """Writes larger than one node's limit are split across shards."""
        with BufferedShardWriter(
            self.zk_client.client, "/test/shards"
        ) as shard_writer:
            shard_writer.write(b"x" * (NODE_BYTE_SIZE_LIMIT + 1))
            shard_writer.flush()
            # One byte over the limit must spill into a second shard.
            self.assertEqual(len(shard_writer.raw._shards), 2)

    def test_json(self):
        """Round-trip a JSON document through the buffered streams."""
        data = {"key": "value"}
        with BufferedShardWriter(
            self.zk_client.client, "/test/shards"
        ) as shard_io:
            shard_io.write(json.dumps(data).encode("utf8"))

        with BufferedShardReader(
            self.zk_client.client, "/test/shards"
        ) as shard_io:
            self.assertDictEqual(json.load(shard_io), data)
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
# Copyright 2020 BMW Group
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import io
|
||||
from contextlib import suppress
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
|
||||
# The default size limit for a node in Zookeeper is ~1MiB (1048576 bytes).
# However, as this also includes the size of the node's key (its path) we
# can not use all of it for data.  Capping shard payloads at 1,000,000
# bytes leaves ~47 KiB of headroom for the key.
NODE_BYTE_SIZE_LIMIT = 1000000
|
||||
|
||||
|
||||
class RawShardIO(io.RawIOBase):
|
||||
def __init__(self, client, path):
|
||||
self.client = client
|
||||
self.shard_base = path
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
def writable(self):
|
||||
return True
|
||||
|
||||
def truncate(self, size=None):
|
||||
if size != 0:
|
||||
raise ValueError("Can only truncate to 0")
|
||||
with suppress(NoNodeError):
|
||||
self.client.delete(self.shard_base, recursive=True)
|
||||
|
||||
@property
|
||||
def _shards(self):
|
||||
try:
|
||||
return self.client.get_children(self.shard_base)
|
||||
except NoNodeError:
|
||||
return []
|
||||
|
||||
def _getData(self, path):
|
||||
data, _ = self.client.get(path)
|
||||
return data
|
||||
|
||||
def readall(self):
|
||||
read_buffer = io.BytesIO()
|
||||
for shard_name in sorted(self._shards):
|
||||
shard_path = "/".join((self.shard_base, shard_name))
|
||||
read_buffer.write(self._getData(shard_path))
|
||||
return read_buffer.getvalue()
|
||||
|
||||
def write(self, shard_data):
|
||||
byte_count = len(shard_data)
|
||||
# Only write one key at a time and defer writing the rest to the caller
|
||||
shard_bytes = bytes(shard_data[0:NODE_BYTE_SIZE_LIMIT])
|
||||
self.client.create(
|
||||
"{}/".format(self.shard_base),
|
||||
shard_bytes,
|
||||
sequence=True,
|
||||
makepath=True,
|
||||
)
|
||||
return min(byte_count, NODE_BYTE_SIZE_LIMIT)
|
||||
|
||||
|
||||
class BufferedShardWriter(io.BufferedWriter):
    """Writable byte stream that transparently shards data into ZK nodes.

    Buffers writes so that each flush hands the raw layer at most
    NODE_BYTE_SIZE_LIMIT bytes at a time.
    """

    def __init__(self, client, path):
        raw = RawShardIO(client, path)
        super().__init__(raw, NODE_BYTE_SIZE_LIMIT)
|
||||
|
||||
|
||||
class BufferedShardReader(io.BufferedReader):
    """Readable byte stream that reassembles sharded ZK content.

    Buffers reads from the raw layer in chunks of up to
    NODE_BYTE_SIZE_LIMIT bytes.
    """

    def __init__(self, client, path):
        raw = RawShardIO(client, path)
        super().__init__(raw, NODE_BYTE_SIZE_LIMIT)
|
Loading…
Reference in New Issue