diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index bf638a6625..cfa8c9a115 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -6,7 +6,7 @@ increases here. When making a model change: -* Increment the value of ``MODEL_API`` in ``model.py``. +* Increment the value of ``MODEL_API`` in ``model_api.py``. * Update code to use the new API by default and add backwards-compatibility handling for older versions. This makes it easier to clean up backwards-compatibility handling in the future. @@ -54,3 +54,12 @@ Version 4 :Description: Adds QueueItem.dequeued_missing_requirements and sets it to True if a change no longer meets merge requirements in dependent pipelines. This only affects schedulers. + +Version 5 +--------- + +:Prior Zuul version: 5.1.0 +:Description: Changes the result data attributes on Build from + ResultData to JobData instances and uses the + inline/offloading paradigm from FrozenJob. This affects + schedulers and executors. diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index eae37ff089..81faac4da0 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -100,6 +100,47 @@ class TestModelUpgrade(ZuulTestCase): dict(name='test-job', result='SUCCESS', changes='1,1'), ], ordered=False) + @model_version(4) + def test_model_4_5(self): + # Changes share a queue, but with only one job, the first + # merges before the second starts. 
+ self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + fake_data = [ + {'name': 'image', + 'url': 'http://example.com/image', + 'metadata': { + 'type': 'container_image' + }}, + ] + self.executor_server.returnData( + 'project-merge', A, + {'zuul': {'artifacts': fake_data}} + ) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 1) + + # Upgrade our component + self.model_test_component_info.model_api = 5 + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project1-project2-integration', + result='SUCCESS', changes='1,1'), + ], ordered=False) + # Verify that the child job got the data from the parent + test1 = self.getJobFromHistory('project-test1') + self.assertEqual(fake_data[0]['url'], + test1.parameters['zuul']['artifacts'][0]['url']) + class TestSemaphoreModelUpgrade(ZuulTestCase): tenant_config_file = 'config/semaphore/main.yaml' diff --git a/zuul/model.py b/zuul/model.py index df210e353f..5d8525cb69 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -47,10 +47,7 @@ from zuul.lib.capabilities import capabilities_registry from zuul.lib.jsonutil import json_dumps from zuul.zk import zkobject from zuul.zk.change_cache import ChangeKey - -# When making ZK schema changes, increment this and add a record to -# docs/developer/model-changelog.rst -MODEL_API = 4 +from zuul.zk.components import COMPONENT_REGISTRY MERGER_MERGE = 1 # "git merge" MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve" @@ -3351,26 +3348,6 @@ class BuildRequest(JobRequest): ) -class ResultData(zkobject.ShardedZKObject): - # If the node exists already, it is probably a half-written 
state - # from a crash; truncate it and continue. - truncate_on_create = True - - def __init__(self): - super().__init__() - self._set(data={}) - - def getPath(self): - return self._path - - def serialize(self, context): - data = { - "data": self.data, - "_path": self._path, - } - return json.dumps(data, sort_keys=True).encode("utf8") - - class Build(zkobject.ZKObject): """A Build is an instance of a single execution of a Job. @@ -3379,8 +3356,16 @@ class Build(zkobject.ZKObject): Job (related builds are grouped together in a BuildSet). """ + # If data/variables are more than 10k, we offload them to another + # object, otherwise we store them on this one. + MAX_DATA_LEN = 10 * 1024 + log = logging.getLogger("zuul.Build") + job_data_attributes = ('result_data', + 'secret_result_data', + ) + def __init__(self): super().__init__() self._set( @@ -3409,10 +3394,6 @@ class Build(zkobject.ZKObject): "uuid": self.uuid, "url": self.url, "result": self.result, - "_result_data": (self._result_data.getPath() - if self._result_data else None), - "_secret_result_data": (self._secret_result_data.getPath() - if self._secret_result_data else None), "error_detail": self.error_detail, "execute_time": self.execute_time, "start_time": self.start_time, @@ -3425,20 +3406,63 @@ class Build(zkobject.ZKObject): "zuul_event_id": self.zuul_event_id, "build_request_ref": self.build_request_ref, } + if COMPONENT_REGISTRY.model_api < 5: + data["_result_data"] = (self._result_data.getPath() + if self._result_data else None) + data["_secret_result_data"] = ( + self._secret_result_data.getPath() + if self._secret_result_data else None) + else: + for k in self.job_data_attributes: + v = getattr(self, '_' + k) + if isinstance(v, JobData): + v = {'storage': 'offload', 'path': v.getPath(), + 'hash': v.hash} + else: + v = {'storage': 'local', 'data': v} + data[k] = v + return json.dumps(data, sort_keys=True).encode("utf8") def deserialize(self, raw, context): data = super().deserialize(raw, context) # 
Result data can change (between a pause and build - # completion) so de-serialize it every time. + # completion). + + # MODEL_API < 5 for k in ('_result_data', '_secret_result_data'): try: - if data[k]: - data[k] = ResultData.fromZK(context, data[k]) + if data.get(k): + data[k] = JobData.fromZK(context, data[k]) + # This used to be a ResultData object, which is + # the same as a JobData but without a hash, so + # generate one. + data[k]._set(hash=JobData.getHash(data[k].data)) except Exception: self.log.exception("Failed to restore result data") data[k] = None + + # MODEL_API >= 5; override with this if present. + for job_data_key in self.job_data_attributes: + job_data = data.pop(job_data_key, None) + if job_data: + # This is a dict which tells us where the actual data is. + if job_data['storage'] == 'local': + # The data are stored locally in this dict + data['_' + job_data_key] = job_data['data'] + elif job_data['storage'] == 'offload': + existing_job_data = getattr(self, '_' + job_data_key, None) + if (getattr(existing_job_data, 'hash', None) == + job_data['hash']): + # Re-use the existing object since it's the same + data['_' + job_data_key] = existing_job_data + else: + # Load the object from ZK + data['_' + job_data_key] = JobData.fromZK( + context, job_data['path']) + else: + data['_' + job_data_key] = None return data def getPath(self): @@ -3448,27 +3472,29 @@ class Build(zkobject.ZKObject): return ('' % (self.uuid, self.job.name, self.job.voting)) + def _getJobData(self, name): + val = getattr(self, name) + if isinstance(val, JobData): + return val.data + return val + @property def result_data(self): - if self._result_data: - return self._result_data.data - return {} + return self._getJobData('_result_data') or {} @property def secret_result_data(self): - if self._secret_result_data: - return self._secret_result_data.data - return {} + return self._getJobData('_secret_result_data') or {} def setResultData(self, result_data, secret_result_data): if not
self._active_context: raise Exception( "setResultData must be used with a context manager") - self._result_data = ResultData.new( + self._result_data = JobData.new( self._active_context, data=result_data, _path=self.getPath() + '/result_data') - self._secret_result_data = ResultData.new( + self._secret_result_data = JobData.new( self._active_context, data=secret_result_data, _path=self.getPath() + '/secret_result_data') diff --git a/zuul/model_api.py b/zuul/model_api.py new file mode 100644 index 0000000000..92b7826210 --- /dev/null +++ b/zuul/model_api.py @@ -0,0 +1,17 @@ +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +# When making ZK schema changes, increment this and add a record to +# doc/source/developer/model-changelog.rst +MODEL_API = 5 diff --git a/zuul/zk/components.py b/zuul/zk/components.py index 6554297e15..f538ed43cb 100644 --- a/zuul/zk/components.py +++ b/zuul/zk/components.py @@ -21,7 +21,7 @@ from kazoo.exceptions import NoNodeError from kazoo.protocol.states import EventType from zuul.zk import ZooKeeperBase -from zuul import model +from zuul.model_api import MODEL_API COMPONENTS_ROOT = "/zuul/components" @@ -123,7 +123,7 @@ class BaseComponent(ZooKeeperBase): except NoNodeError: self.log.error("Could not update %s in ZooKeeper", self) - def register(self, model_api=model.MODEL_API): + def register(self, model_api=MODEL_API): self.content['model_api'] = model_api with self.register_lock: path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname]) @@ -346,7 +346,7 @@ class ComponentRegistry(ZooKeeperBase): # Start with our own version in case we're the only component # and we haven't registered. 
- version = model.MODEL_API + version = MODEL_API for kind, components in self.all(): for component in components: version = min(version, component.model_api) @@ -358,28 +358,28 @@ class ComponentRegistry(ZooKeeperBase): version = self.getMinimumModelApi() if version != self.model_api: self.log.info(f"System minimum data model version {version}; " - f"this component {model.MODEL_API}") + f"this component {MODEL_API}") if self.model_api is None: - if version < model.MODEL_API: + if version < MODEL_API: self.log.info("The data model version of this component is " "newer than the rest of the system; this " "component will operate in compatability mode " "until the system is upgraded") - elif version > model.MODEL_API: + elif version > MODEL_API: self.log.error("The data model version of this component is " "older than the rest of the system; " "exiting to prevent data corruption") sys.exit(1) else: if version > self.model_api: - if version > model.MODEL_API: + if version > MODEL_API: self.log.info("The data model version of this component " "is older than other components in the " "system, so other components will operate " "in a compability mode; upgrade this " "component as soon as possible to complete " "the system upgrade") - elif version == model.MODEL_API: + elif version == MODEL_API: self.log.info("The rest of the system has been upgraded " "to the data model version of this " "component")