Use JobData for Build result data

The result data from a job may be quite large, which is why we
offload it to a sharded ResultData object.  It may also be
nonexistent, in which case we simply store null and skip the
offloading.

The JobData class that we use for FrozenJobs has two extra
optimizations: if the data size is small, we store it inline
instead of creating a small single-node shard; and if we do
shard, we store a hash inline so we only re-read the shard when
the data actually change.

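As a rough sketch of that inline/offload decision (the helper name,
the SHA-256 choice, and the path handling here are illustrative, not
the actual implementation):

import hashlib
import json

MAX_DATA_LEN = 10 * 1024  # payloads above this size are offloaded

def store_job_data(data, parent_path):
    # Illustrative: decide whether result data live inline on the
    # parent object or in a sharded child node.
    serialized = json.dumps(data, sort_keys=True).encode('utf8')
    if len(serialized) <= MAX_DATA_LEN:
        # Small data are stored inline; no extra ZK node is created.
        return {'storage': 'local', 'data': data}
    # Large data are offloaded; the inline hash lets readers skip
    # re-fetching the shard when the contents have not changed.
    return {'storage': 'offload',
            'path': parent_path + '/result_data',
            'hash': hashlib.sha256(serialized).hexdigest()}
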
Update the Build object to use JobData instances instead of
ResultData for the build result data so that we can benefit
from the additional optimizations.

This is, surprisingly, the first time we have had to check the
cluster model API within model.py, so this change also moves the
MODEL_API constant into a new zuul/model_api.py module to avoid
circular imports.
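
A simplified sketch of the resulting import structure (only the
relevant lines of each module are shown):

# zuul/model_api.py -- new module; imports nothing from zuul, so it
# can be imported from anywhere without creating a cycle.
MODEL_API = 5

# zuul/zk/components.py -- reads the constant from the new module
# rather than from zuul.model.
from zuul.model_api import MODEL_API

# zuul/model.py -- can now import the component registry to check the
# cluster model API; previously this would have completed a cycle,
# since components.py imported zuul.model to get MODEL_API.
from zuul.zk.components import COMPONENT_REGISTRY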

Change-Id: I2053bd0327dfcad1d4246b158098033e188e2ed4
Author: James E. Blair
Date:   2022-03-03 15:26:52 -08:00
Commit: 208b936530 (parent 082ff730fe)
5 changed files with 141 additions and 48 deletions

docs/developer/model-changelog.rst

@@ -6,7 +6,7 @@ increases here.
When making a model change:
* Increment the value of ``MODEL_API`` in ``model.py``.
* Increment the value of ``MODEL_API`` in ``model_api.py``.
* Update code to use the new API by default and add
backwards-compatibility handling for older versions. This makes it
easier to clean up backwards-compatibility handling in the future.
@@ -54,3 +54,12 @@ Version 4
:Description: Adds QueueItem.dequeued_missing_requirements and sets it to True
if a change no longer meets merge requirements in dependent
pipelines. This only affects schedulers.
Version 5
---------
:Prior Zuul version: 5.1.0
:Description: Changes the result data attributes on Build from
ResultData to JobData instances and uses the
inline/offloading paradigm from FrozenJob. This affects
schedulers and executors.
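
In practice, the compatibility handling for Version 5 is the gate
added to Build.serialize() further down; a simplified sketch (the
function and the result_data_path attribute are illustrative):

import json

from zuul.zk.components import COMPONENT_REGISTRY

def serialize_build(build):
    # Write the old schema while any component in the system still
    # reports model API < 5; emit the new inline/offload schema once
    # every component has been upgraded.
    data = {'uuid': build.uuid, 'result': build.result}
    if COMPONENT_REGISTRY.model_api < 5:
        data['_result_data'] = build.result_data_path  # old-style path
    else:
        data['result_data'] = {'storage': 'local',
                               'data': build.result_data}
    return json.dumps(data, sort_keys=True).encode('utf8')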

tests/unit/test_model_upgrade.py

@@ -100,6 +100,47 @@ class TestModelUpgrade(ZuulTestCase):
dict(name='test-job', result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(4)
def test_model_4_5(self):
# Hold the builds so we can return artifact data from the parent
# job, upgrade the model mid-run, and then verify that the child
# jobs still receive the parent's result data.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
fake_data = [
{'name': 'image',
'url': 'http://example.com/image',
'metadata': {
'type': 'container_image'
}},
]
self.executor_server.returnData(
'project-merge', A,
{'zuul': {'artifacts': fake_data}}
)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 5
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
# Verify that the child job got the data from the parent
test1 = self.getJobFromHistory('project-test1')
self.assertEqual(fake_data[0]['url'],
test1.parameters['zuul']['artifacts'][0]['url'])
class TestSemaphoreModelUpgrade(ZuulTestCase):
tenant_config_file = 'config/semaphore/main.yaml'

zuul/model.py

@@ -47,10 +47,7 @@ from zuul.lib.capabilities import capabilities_registry
from zuul.lib.jsonutil import json_dumps
from zuul.zk import zkobject
from zuul.zk.change_cache import ChangeKey
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
MODEL_API = 4
from zuul.zk.components import COMPONENT_REGISTRY
MERGER_MERGE = 1 # "git merge"
MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
@@ -3351,26 +3348,6 @@ class BuildRequest(JobRequest):
)
class ResultData(zkobject.ShardedZKObject):
# If the node exists already, it is probably a half-written state
# from a crash; truncate it and continue.
truncate_on_create = True
def __init__(self):
super().__init__()
self._set(data={})
def getPath(self):
return self._path
def serialize(self, context):
data = {
"data": self.data,
"_path": self._path,
}
return json.dumps(data, sort_keys=True).encode("utf8")
class Build(zkobject.ZKObject):
"""A Build is an instance of a single execution of a Job.
@@ -3379,8 +3356,16 @@ class Build(zkobject.ZKObject):
Job (related builds are grouped together in a BuildSet).
"""
# If data/variables are more than 10k, we offload them to another
# object, otherwise we store them on this one.
MAX_DATA_LEN = 10 * 1024
log = logging.getLogger("zuul.Build")
job_data_attributes = ('result_data',
'secret_result_data',
)
def __init__(self):
super().__init__()
self._set(
@@ -3409,10 +3394,6 @@
"uuid": self.uuid,
"url": self.url,
"result": self.result,
"_result_data": (self._result_data.getPath()
if self._result_data else None),
"_secret_result_data": (self._secret_result_data.getPath()
if self._secret_result_data else None),
"error_detail": self.error_detail,
"execute_time": self.execute_time,
"start_time": self.start_time,
@@ -3425,20 +3406,63 @@
"zuul_event_id": self.zuul_event_id,
"build_request_ref": self.build_request_ref,
}
if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath()
if self._result_data else None)
data["_secret_result_data"] = (
self._secret_result_data.getPath()
if self._secret_result_data else None)
else:
for k in self.job_data_attributes:
v = getattr(self, '_' + k)
if isinstance(v, JobData):
v = {'storage': 'offload', 'path': v.getPath(),
'hash': v.hash}
else:
v = {'storage': 'local', 'data': v}
data[k] = v
return json.dumps(data, sort_keys=True).encode("utf8")
def deserialize(self, raw, context):
data = super().deserialize(raw, context)
# Result data can change (between a pause and build
# completion) so de-serialize it every time.
# completion).
# MODEL_API < 5
for k in ('_result_data', '_secret_result_data'):
try:
if data[k]:
data[k] = ResultData.fromZK(context, data[k])
if data.get(k):
data[k] = JobData.fromZK(context, data[k])
# This used to be a ResultData object, which is
# the same as a JobData but without a hash, so
# generate one.
data[k]._set(hash=JobData.getHash(data[k].data))
except Exception:
self.log.exception("Failed to restore result data")
data[k] = None
# MODEL_API >= 5; override with this if present.
for job_data_key in self.job_data_attributes:
job_data = data.pop(job_data_key, None)
if job_data:
# This is a dict which tells us where the actual data is.
if job_data['storage'] == 'local':
# The data are stored locally in this dict
data['_' + job_data_key] = job_data['data']
elif job_data['storage'] == 'offload':
existing_job_data = getattr(self, job_data_key, None)
if (getattr(existing_job_data, 'hash', None) ==
job_data['hash']):
# Re-use the existing object since it's the same
data['_' + job_data_key] = existing_job_data
else:
# Load the object from ZK
data['_' + job_data_key] = JobData.fromZK(
context, job_data['path'])
else:
data['_' + job_data_key] = None
return data
def getPath(self):
@@ -3448,27 +3472,29 @@
return ('<Build %s of %s voting:%s>' %
(self.uuid, self.job.name, self.job.voting))
def _getJobData(self, name):
val = getattr(self, name)
if isinstance(val, JobData):
return val.data
return val
@property
def result_data(self):
if self._result_data:
return self._result_data.data
return {}
return self._getJobData('_result_data') or {}
@property
def secret_result_data(self):
if self._secret_result_data:
return self._secret_result_data.data
return {}
return self._getJobData('_secret_result_data') or {}
def setResultData(self, result_data, secret_result_data):
if not self._active_context:
raise Exception(
"setResultData must be used with a context manager")
self._result_data = ResultData.new(
self._result_data = JobData.new(
self._active_context,
data=result_data,
_path=self.getPath() + '/result_data')
self._secret_result_data = ResultData.new(
self._secret_result_data = JobData.new(
self._active_context,
data=secret_result_data,
_path=self.getPath() + '/secret_result_data')
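
For reference, the two shapes that serialize() now emits for each
result-data attribute look roughly like this (the values are invented
for illustration):

# Small payload: kept inline in the Build's own JSON.
{"result_data": {"storage": "local",
                 "data": {"zuul": {"artifacts": []}}}}

# Large payload: offloaded to a sharded child znode; the hash lets
# deserialize() re-use the already-loaded object when nothing changed.
{"result_data": {"storage": "offload",
                 "path": "<build path>/result_data",
                 "hash": "<sha256 of the serialized data>"}}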

zuul/model_api.py (new file)

@@ -0,0 +1,17 @@
# Copyright 2022 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# When making ZK schema changes, increment this and add a record to
# docs/developer/model-changelog.rst
MODEL_API = 5

zuul/zk/components.py

@@ -21,7 +21,7 @@ from kazoo.exceptions import NoNodeError
from kazoo.protocol.states import EventType
from zuul.zk import ZooKeeperBase
from zuul import model
from zuul.model_api import MODEL_API
COMPONENTS_ROOT = "/zuul/components"
@@ -123,7 +123,7 @@ class BaseComponent(ZooKeeperBase):
except NoNodeError:
self.log.error("Could not update %s in ZooKeeper", self)
def register(self, model_api=model.MODEL_API):
def register(self, model_api=MODEL_API):
self.content['model_api'] = model_api
with self.register_lock:
path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname])
@@ -346,7 +346,7 @@ class ComponentRegistry(ZooKeeperBase):
# Start with our own version in case we're the only component
# and we haven't registered.
version = model.MODEL_API
version = MODEL_API
for kind, components in self.all():
for component in components:
version = min(version, component.model_api)
@@ -358,28 +358,28 @@
version = self.getMinimumModelApi()
if version != self.model_api:
self.log.info(f"System minimum data model version {version}; "
f"this component {model.MODEL_API}")
f"this component {MODEL_API}")
if self.model_api is None:
if version < model.MODEL_API:
if version < MODEL_API:
self.log.info("The data model version of this component is "
"newer than the rest of the system; this "
"component will operate in compatability mode "
"until the system is upgraded")
elif version > model.MODEL_API:
elif version > MODEL_API:
self.log.error("The data model version of this component is "
"older than the rest of the system; "
"exiting to prevent data corruption")
sys.exit(1)
else:
if version > self.model_api:
if version > model.MODEL_API:
if version > MODEL_API:
self.log.info("The data model version of this component "
"is older than other components in the "
"system, so other components will operate "
"in a compability mode; upgrade this "
"component as soon as possible to complete "
"the system upgrade")
elif version == model.MODEL_API:
elif version == MODEL_API:
self.log.info("The rest of the system has been upgraded "
"to the data model version of this "
"component")