Shard event queue data

This extracts the event data from the queue entries and stores
it in a separate location, sharded and compressed.  This can
reduce ZK traffic and avoid disconnects with very large event
and result data.

We only shard when necessary, as sharding all events has a
significant performance impact.

Change-Id: I121399cd40f7bc5eeed899064198927efda2e485
This commit is contained in:
James E. Blair 2021-07-19 16:43:39 -07:00
parent fc78352b18
commit 4428cf73a3
3 changed files with 147 additions and 23 deletions

View File

@ -19,7 +19,7 @@ import testtools
from zuul import model from zuul import model
from zuul.driver import Driver, TriggerInterface from zuul.driver import Driver, TriggerInterface
from zuul.lib.connections import ConnectionRegistry from zuul.lib.connections import ConnectionRegistry
from zuul.zk import ZooKeeperClient, event_queues from zuul.zk import ZooKeeperClient, event_queues, sharding
from tests.base import BaseTestCase, iterate_timeout from tests.base import BaseTestCase, iterate_timeout
@ -128,6 +128,44 @@ class TestTriggerEventQueue(EventQueueBaseTestCase):
self.driver = DummyDriver() self.driver = DummyDriver()
self.connections.registerDriver(self.driver) self.connections.registerDriver(self.driver)
def test_sharded_tenant_trigger_events(self):
# Test enqueue/dequeue of the tenant trigger event queue.
queue = event_queues.TenantTriggerEventQueue(
self.zk_client, self.connections, "tenant"
)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
event = DummyTriggerEvent()
data = {'test': "x" * (sharding.NODE_BYTE_SIZE_LIMIT + 1)}
event.data = data
queue.put(self.driver.driver_name, event)
queue.put(self.driver.driver_name, event)
self.assertEqual(len(queue), 2)
self.assertTrue(queue.hasEvents())
processed = 0
for event in queue:
self.assertIsInstance(event, DummyTriggerEvent)
processed += 1
self.assertEqual(processed, 2)
self.assertEqual(len(queue), 2)
self.assertTrue(queue.hasEvents())
acked = 0
for event in queue:
queue.ack(event)
self.assertEqual(event.data, data)
acked += 1
self.assertEqual(acked, 2)
self.assertEqual(len(queue), 0)
self.assertFalse(queue.hasEvents())
def test_tenant_trigger_events(self): def test_tenant_trigger_events(self):
# Test enqueue/dequeue of the tenant trigger event queue. # Test enqueue/dequeue of the tenant trigger event queue.
queue = event_queues.TenantTriggerEventQueue( queue = event_queues.TenantTriggerEventQueue(

View File

@ -194,6 +194,7 @@ class ZooKeeperClient(object):
for res in results: for res in results:
if isinstance(res, Exception): if isinstance(res, Exception):
raise res raise res
return results
def getCurrentLtime(self): def getCurrentLtime(self):
"""Get the logical timestamp as seen by the Zookeeper cluster.""" """Get the logical timestamp as seen by the Zookeeper cluster."""

View File

@ -29,7 +29,7 @@ from kazoo.recipe.election import Election
from zuul import model from zuul import model
from zuul.lib.collections import DefaultKeyDict from zuul.lib.collections import DefaultKeyDict
from zuul.zk import ZooKeeperSimpleBase from zuul.zk import ZooKeeperSimpleBase, sharding
RESULT_EVENT_TYPE_MAP = { RESULT_EVENT_TYPE_MAP = {
"BuildCompletedEvent": model.BuildCompletedEvent, "BuildCompletedEvent": model.BuildCompletedEvent,
@ -69,6 +69,7 @@ PIPELINE_MANAGEMENT_ROOT = PIPELINE_NAME_ROOT + "/management"
PIPELINE_TRIGGER_ROOT = PIPELINE_NAME_ROOT + "/trigger" PIPELINE_TRIGGER_ROOT = PIPELINE_NAME_ROOT + "/trigger"
PIPELINE_RESULT_ROOT = PIPELINE_NAME_ROOT + "/result" PIPELINE_RESULT_ROOT = PIPELINE_NAME_ROOT + "/result"
EVENT_DATA_ROOT = "/zuul/events/data"
CONNECTION_ROOT = "/zuul/events/connection" CONNECTION_ROOT = "/zuul/events/connection"
# This is the path to the serialized form of the event in ZK (along
@ -185,6 +186,7 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
super().__init__(client) super().__init__(client)
self.event_root = event_root self.event_root = event_root
self.kazoo_client.ensure_path(self.event_root) self.kazoo_client.ensure_path(self.event_root)
self.kazoo_client.ensure_path(EVENT_DATA_ROOT)
    def _listEvents(self):
        # Return the child node names (event sequence nodes) currently
        # queued under the event root.
        return self.kazoo_client.get_children(self.event_root)
@ -204,26 +206,63 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
# event object when it was de-serialized. # event object when it was de-serialized.
if not event.ack_ref: if not event.ack_ref:
raise RuntimeError("Cannot ack event %s without reference", event) raise RuntimeError("Cannot ack event %s without reference", event)
try: if not self._remove(event.ack_ref.path, event.ack_ref.version):
self.kazoo_client.delete(
event.ack_ref.path,
version=event.ack_ref.version,
recursive=True,
)
except NoNodeError:
self.log.warning("Event %s was already acknowledged", event) self.log.warning("Event %s was already acknowledged", event)
@property
def _event_create_path(self):
return f"{self.event_root}/"
def _put(self, data): def _put(self, data):
return self.kazoo_client.create( # Within a transaction, we need something after the / in order
self._event_create_path, # to cause the sequence node to be created under the node
json.dumps(data).encode("utf-8"), # before the /. So we use "seq" for that. This will produce
sequence=True, # paths like event_root/seq-0000.
makepath=True, event_path = f"{self.event_root}/seq"
)
# Event data can be large, so we want to shard it. But events
# also need to be atomic (we don't want an event listener to
# start processing a half-stored event). A natural solution
# is to use a ZK transaction to write the sharded data along
# with the event. However, our events are sequence nodes in
# order to maintain ordering, and we can't use our sharding
# helper to write shards underneath a sequence node inside the
# transaction because we don't know the path of the sequence
# node within the transaction. To resolve this, we store the
# event data in two places: the event itself and associated
# metadata are in the event queue as a single sequence node.
# The event data are stored in a separate tree under a uuid.
# The event metadata includes the UUID of the data. We call
# the separated data "side channel data" to indicate it's
# stored outside the main event queue.
#
# To make the API simpler to work with, we assume "event_data"
# contains the bulk of the data. We extract it here into the
# side channel data, then in _iterEvents we re-constitute it
# into the dictionary.
side_channel_data = None
encoded_data = json.dumps(data).encode("utf-8")
if (len(encoded_data) > sharding.NODE_BYTE_SIZE_LIMIT
and 'event_data' in data):
# Get a unique data node
data_id = str(uuid.uuid4())
data_root = f'{EVENT_DATA_ROOT}/{data_id}'
data_path = f'{data_root}/seq'
side_channel_data = json.dumps(data['event_data']).encode("utf-8")
data = data.copy()
del data['event_data']
data['event_data_path'] = data_root
encoded_data = json.dumps(data).encode("utf-8")
tr = self.kazoo_client.transaction()
tr.create(data_root)
with sharding.BufferedShardWriter(tr, data_path) as stream:
stream.write(side_channel_data)
tr.create(event_path, encoded_data, sequence=True)
resp = self.client.commitTransaction(tr)
return resp[-1]
else:
return self.kazoo_client.create(
event_path, encoded_data, sequence=True)
def _iterEvents(self): def _iterEvents(self):
try: try:
@ -235,7 +274,8 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
for event_id in events: for event_id in events:
path = "/".join((self.event_root, event_id)) path = "/".join((self.event_root, event_id))
# TODO: implement sharding of large events
# Load the event metadata
data, zstat = self.kazoo_client.get(path) data, zstat = self.kazoo_client.get(path)
try: try:
event = json.loads(data) event = json.loads(data)
@ -243,11 +283,54 @@ class ZooKeeperEventQueue(ZooKeeperSimpleBase, Iterable):
self.log.exception("Malformed event data in %s", path) self.log.exception("Malformed event data in %s", path)
self._remove(path) self._remove(path)
continue continue
# Load the event data (if present); if that fails, the
# event is corrupt; delete and continue.
side_channel_path = event.get('event_data_path')
if side_channel_path:
try:
with sharding.BufferedShardReader(
self.kazoo_client, side_channel_path) as stream:
side_channel_data = stream.read()
except NoNodeError:
self.log.exception("Side channel data for %s "
"not found at %s",
path, side_channel_path)
self._remove(path)
continue
try:
event_data = json.loads(side_channel_data)
except json.JSONDecodeError:
self.log.exception("Malformed side channel "
"event data in %s",
side_channel_path)
self._remove(path)
continue
event['event_data'] = event_data
yield event, EventAckRef(path, zstat.version), zstat yield event, EventAckRef(path, zstat.version), zstat
def _remove(self, path, version=UNKNOWN_ZVERSION): def _remove(self, path, version=UNKNOWN_ZVERSION):
with suppress(NoNodeError): try:
# Find the side channel path
side_channel_path = None
data, zstat = self.kazoo_client.get(path)
try:
event = json.loads(data)
side_channel_path = event.get('event_data_path')
except json.JSONDecodeError:
pass
if side_channel_path:
with suppress(NoNodeError):
self.kazoo_client.delete(side_channel_path, recursive=True)
self.kazoo_client.delete(path, version=version, recursive=True) self.kazoo_client.delete(path, version=version, recursive=True)
return True
except NoNodeError:
return False
class ManagementEventResultFuture(ZooKeeperSimpleBase): class ManagementEventResultFuture(ZooKeeperSimpleBase):
@ -608,7 +691,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
    def put(self, data):
        # Wrap the payload under "event_data" so that _put can offload
        # it to sharded side channel storage when it is too large for a
        # single ZK node.
        self.log.debug("Submitting connection event to queue %s: %s",
                       self.event_root, data)
        self._put({'event_data': data})
def __iter__(self): def __iter__(self):
for data, ack_ref, _ in self._iterEvents(): for data, ack_ref, _ in self._iterEvents():
@ -616,7 +699,9 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
self.log.warning("Malformed event found: %s", data) self.log.warning("Malformed event found: %s", data)
self._remove(ack_ref.path) self._remove(ack_ref.path)
continue continue
event = model.ConnectionEvent.fromDict(data) # TODO: We can assume event_data exists after 4.6.1 is released
event = model.ConnectionEvent.fromDict(
data.get('event_data', data))
event.ack_ref = ack_ref event.ack_ref = ack_ref
yield event yield event