Remove most model_api backwards-compat handling

Since we require deleting the state (except config cache) for the
circular dependency upgrade, we can now remove any zookeeper
backwards compatability handling not in the config cache.  This
change does so.  It leaves upgrade handling for two attributes
in the branch cache (even though they are old) because it is
plausible that even systems that have been upgrading regularly
may have non-upgraded data in the branch cache for infrequently
used branches, and the cost for retaining the upgrade handling
is small.

Change-Id: I881578159b06c8c7b38a9fa0980aa0626dddad31
This commit is contained in:
James E. Blair 2024-02-07 10:11:34 -08:00
parent ca83980bb7
commit fa274fcf25
9 changed files with 41 additions and 944 deletions

View File

@ -14,18 +14,13 @@
import json
from zuul import change_matcher
from zuul import model
from zuul.lib.re2util import ZuulRegex
from zuul.zk.components import ComponentRegistry
from tests.base import (
AnsibleZuulTestCase,
ZuulTestCase,
simple_layout,
iterate_timeout,
)
from tests.base import ZuulWebFixture
def model_version(version):
@ -72,737 +67,3 @@ class TestModelUpgrade(ZuulTestCase):
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 1:
break
@model_version(2)
@simple_layout('layouts/pipeline-supercedes.yaml')
def test_supercedes(self):
"""
Test that pipeline supsercedes still work with model API 2,
which uses deqeueue events.
"""
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'test-job')
A.addApproval('Code-Review', 2)
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'test-job')
self.assertEqual(self.builds[0].pipeline, 'gate')
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
self.assertEqual(A.reported, 2)
self.assertEqual(A.data['status'], 'MERGED')
self.assertHistory([
dict(name='test-job', result='ABORTED', changes='1,1'),
dict(name='test-job', result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(4)
def test_model_4(self):
# Test that Zuul return values are correctly passed to child
# jobs in version 4 compatibility mode.
A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
fake_data = [
{'name': 'image',
'url': 'http://example.com/image',
'metadata': {
'type': 'container_image'
}},
]
self.executor_server.returnData(
'project-merge', A,
{'zuul': {'artifacts': fake_data}}
)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
# Verify that the child jobs got the data from the parent
test1 = self.getJobFromHistory('project-test1')
self.assertEqual(fake_data[0]['url'],
test1.parameters['zuul']['artifacts'][0]['url'])
integration = self.getJobFromHistory('project1-project2-integration')
self.assertEqual(fake_data[0]['url'],
integration.parameters['zuul']['artifacts'][0]['url'])
@model_version(4)
def test_model_4_5(self):
# Changes share a queue, but with only one job, the first
# merges before the second starts.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
fake_data = [
{'name': 'image',
'url': 'http://example.com/image',
'metadata': {
'type': 'container_image'
}},
]
self.executor_server.returnData(
'project-merge', A,
{'zuul': {'artifacts': fake_data}}
)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 5
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
# Verify that the child job got the data from the parent
test1 = self.getJobFromHistory('project-test1')
self.assertEqual(fake_data[0]['url'],
test1.parameters['zuul']['artifacts'][0]['url'])
@model_version(5)
def test_model_5_6(self):
# This exercises the min_ltimes=None case in configloader on
# layout updates.
first = self.scheds.first
second = self.createScheduler()
second.start()
self.assertEqual(len(self.scheds), 2)
for _ in iterate_timeout(10, "until priming is complete"):
state_one = first.sched.local_layout_state.get("tenant-one")
if state_one:
break
for _ in iterate_timeout(
10, "all schedulers to have the same layout state"):
if (second.sched.local_layout_state.get(
"tenant-one") == state_one):
break
with second.sched.layout_update_lock, second.sched.run_handler_lock:
file_dict = {'zuul.d/test.yaml': ''}
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
files=file_dict)
A.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
self.waitUntilSettled(matcher=[first])
# Delete the layout data to simulate the first scheduler
# being on model api 5 (we write the data regardless of
# the cluster version since it's a new znode).
self.scheds.first.sched.zk_client.client.delete(
'/zuul/layout-data', recursive=True)
self.waitUntilSettled()
self.assertEqual(first.sched.local_layout_state.get("tenant-one"),
second.sched.local_layout_state.get("tenant-one"))
# No test for model version 7 (secrets in blob store): old and new
# code paths are exercised in existing tests since small secrets
# don't use the blob store.
@model_version(8)
def test_model_8_9(self):
# This excercises the upgrade to nodeset_alternates
first = self.scheds.first
second = self.createScheduler()
second.start()
self.assertEqual(len(self.scheds), 2)
for _ in iterate_timeout(10, "until priming is complete"):
state_one = first.sched.local_layout_state.get("tenant-one")
if state_one:
break
for _ in iterate_timeout(
10, "all schedulers to have the same layout state"):
if (second.sched.local_layout_state.get(
"tenant-one") == state_one):
break
self.fake_nodepool.pause()
with second.sched.layout_update_lock, second.sched.run_handler_lock:
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled(matcher=[first])
self.model_test_component_info.model_api = 9
with first.sched.layout_update_lock, first.sched.run_handler_lock:
self.fake_nodepool.unpause()
self.waitUntilSettled(matcher=[second])
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(11)
def test_model_11_12(self):
# This excercises the upgrade to store build/job versions
first = self.scheds.first
second = self.createScheduler()
second.start()
self.assertEqual(len(self.scheds), 2)
for _ in iterate_timeout(10, "until priming is complete"):
state_one = first.sched.local_layout_state.get("tenant-one")
if state_one:
break
for _ in iterate_timeout(
10, "all schedulers to have the same layout state"):
if (second.sched.local_layout_state.get(
"tenant-one") == state_one):
break
self.executor_server.hold_jobs_in_build = True
with second.sched.layout_update_lock, second.sched.run_handler_lock:
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled(matcher=[first])
self.model_test_component_info.model_api = 12
with first.sched.layout_update_lock, first.sched.run_handler_lock:
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled(matcher=[second])
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(12)
def test_model_12_13(self):
# Initially queue items will still have the full trigger event
# stored in Zookeeper. The trigger event will be converted to
# an event info object after the model API update.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 13
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(16)
def test_model_16_17(self):
matcher = change_matcher.BranchMatcher(ZuulRegex('foo'))
ser = matcher.serialize()
self.assertEqual(ser, {'regex': 'foo', 'implied': False})
matcher2 = change_matcher.BranchMatcher.deserialize(ser)
self.assertEqual(matcher, matcher2)
# Upgrade our component
self.model_test_component_info.model_api = 17
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 17:
break
matcher = change_matcher.BranchMatcher(ZuulRegex('foo'))
ser = matcher.serialize()
self.assertEqual(ser, {
'regex': {
'negate': False,
'pattern': 'foo',
},
'implied': False
})
matcher2 = change_matcher.BranchMatcher.deserialize(ser)
self.assertEqual(matcher, matcher2)
@model_version(18)
def test_model_18(self):
# Test backward compatibility with old name-based job storage
# when not all components have updated yet.
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(18)
def test_model_18_19(self):
# Test backward compatibility with existing job graphs that
# still use the name-based job storage.
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 19
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 19:
break
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
@model_version(20)
def test_model_20_21(self):
# Test backwards compat for job graph dependency freezing.
# Note that these jobs have a dependency on project-merge.
# First test the entire lifecycle under the old api.
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
], ordered=False)
# Then repeat the test with a mid-cycle upgrade.
self.executor_server.hold_jobs_in_build = True
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = 21
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 21:
break
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='project-merge', result='SUCCESS', changes='1,1'),
dict(name='project-test1', result='SUCCESS', changes='1,1'),
dict(name='project-test2', result='SUCCESS', changes='1,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='1,1'),
dict(name='project-merge', result='SUCCESS', changes='2,1'),
dict(name='project-test1', result='SUCCESS', changes='2,1'),
dict(name='project-test2', result='SUCCESS', changes='2,1'),
dict(name='project1-project2-integration',
result='SUCCESS', changes='2,1'),
], ordered=False)
@model_version(20)
@simple_layout('layouts/soft-dependencies.yaml')
def test_20_soft_dependencies(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='deploy', result='SUCCESS', changes='1,1'),
], ordered=False)
class TestGithubModelUpgrade(ZuulTestCase):
config_file = 'zuul-github-driver.conf'
scheduler_count = 1
@model_version(10)
@simple_layout('layouts/github-merge-mode.yaml', driver='github')
def test_merge_method_syntax_check(self):
"""
Tests that the merge mode gets forwarded to the reporter and the
PR was rebased.
"""
webfixture = self.useFixture(
ZuulWebFixture(self.changes, self.config,
self.additional_event_queues, self.upstream_root,
self.poller_events,
self.git_url_with_auth, self.addCleanup,
self.test_root))
sched = self.scheds.first.sched
web = webfixture.web
github = self.fake_github.getGithubClient()
repo = github.repo_from_project('org/project')
repo._repodata['allow_rebase_merge'] = False
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
# Verify that there are no errors with model version 9 (we
# should be using the defaultdict that indicates all merge
# modes are supported).
tenant = sched.abide.tenants.get('tenant-one')
self.assertEquals(len(tenant.layout.loading_errors), 0)
# Upgrade our component
self.model_test_component_info.model_api = 11
# Perform a smart reconfiguration which should not clear the
# cache; we should continue to see no errors because we should
# still be using the defaultdict.
self.scheds.first.smartReconfigure()
tenant = sched.abide.tenants.get('tenant-one')
self.assertEquals(len(tenant.layout.loading_errors), 0)
# Wait for web to have the same config
for _ in iterate_timeout(10, "config is synced"):
if (web.tenant_layout_state.get('tenant-one') ==
web.local_layout_state.get('tenant-one')):
break
# Repeat the check
tenant = web.abide.tenants.get('tenant-one')
self.assertEquals(len(tenant.layout.loading_errors), 0)
# Perform a full reconfiguration which should cause us to
# actually query, update the branch cache, and report an
# error.
self.scheds.first.fullReconfigure()
self.waitUntilSettled()
tenant = sched.abide.tenants.get('tenant-one')
loading_errors = tenant.layout.loading_errors
self.assertEquals(
len(tenant.layout.loading_errors), 1,
"An error should have been stored in sched")
self.assertIn(
"rebase not supported",
str(loading_errors[0].error))
# Wait for web to have the same config
for _ in iterate_timeout(10, "config is synced"):
if (web.tenant_layout_state.get('tenant-one') ==
web.local_layout_state.get('tenant-one')):
break
# Repoat the check for web
tenant = web.abide.tenants.get('tenant-one')
loading_errors = tenant.layout.loading_errors
self.assertEquals(
len(tenant.layout.loading_errors), 1,
"An error should have been stored in web")
self.assertIn(
"rebase not supported",
str(loading_errors[0].error))
@model_version(17)
@simple_layout('layouts/github-merge-mode.yaml', driver='github')
def test_default_merge_mode(self):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
md = tenant.layout.getProjectMetadata('github.com/org/project1')
self.assertEqual(model.MERGER_MERGE, md.merge_mode)
tpc = tenant.project_configs['github.com/org/project1']
# Force a re-fetch of the project branches, which also updates
# the supported merge modes.
gh = tpc.project.source.connection
gh.updateProjectBranches(tpc.project)
merge_modes = tpc.project.source.getProjectMergeModes(
tpc.project, tenant)
# Branch cache shouldn't contain new merge modes for model API < 18
self.assertEqual([1, 2, 4, 5], merge_modes)
# Upgrade our component
self.model_test_component_info.model_api = 18
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == 18:
break
merge_modes = tpc.project.source.getProjectMergeModes(
tpc.project, tenant)
# Branch cache still contains only the old merge modes
self.assertEqual([1, 2, 4, 5], merge_modes)
# Test that we can still process changes with the project branch
# cache not containing the new default merge mode.
A = self.fake_github.openFakePullRequest('org/project1', 'master', 'A',
files={"zuul.yaml": ""})
self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.history), 1)
# Re-fetch the project branches once more.
gh.updateProjectBranches(tpc.project)
merge_modes = tpc.project.source.getProjectMergeModes(
tpc.project, tenant)
# The cache should now contain the new merge modes, but not the TPC.
self.assertEqual([1, 2, 6, 7, 4, 5], merge_modes)
self.assertEqual([1, 2, 4, 5], tpc.merge_modes)
# Test that we can still process changes with the TPC not using the
# new supported merge modes.
self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
self.waitUntilSettled()
self.assertEqual(len(self.history), 2)
# Perform a full reconfiguration which should cause us to
# re-fetch the merge modes.
self.scheds.first.fullReconfigure()
self.waitUntilSettled()
# Now the TPC should also contain the new merge modes.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
tpc = tenant.project_configs['github.com/org/project1']
self.assertEqual([1, 2, 6, 7, 4, 5], tpc.merge_modes)
layout = self.scheds.first.sched.abide.tenants.get('tenant-one').layout
md = layout.getProjectMetadata('github.com/org/project1')
self.assertEqual(model.MERGER_MERGE_ORT, md.merge_mode)
class TestDefaultBranchUpgrade(ZuulTestCase):
config_file = "zuul-gerrit-github.conf"
scheduler_count = 1
@model_version(15)
@simple_layout('layouts/default-branch.yaml', driver='github')
def test_default_branch(self):
self.waitUntilSettled()
github = self.fake_github.getGithubClient()
repo = github.repo_from_project('org/project-default')
repo._repodata['default_branch'] = 'foobar'
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
# Verify we use the default from the defaultdict.
layout = self.scheds.first.sched.abide.tenants.get('tenant-one').layout
md = layout.getProjectMetadata(
'github.com/org/project-default')
self.assertEqual('master', md.default_branch)
# Upgrade our component
self.model_test_component_info.model_api = 16
# Perform a smart reconfiguration which should not clear the
# cache; we should continue to see no change because we should
# still be using the defaultdict.
self.scheds.first.smartReconfigure()
layout = self.scheds.first.sched.abide.tenants.get('tenant-one').layout
md = layout.getProjectMetadata(
'github.com/org/project-default')
self.assertEqual('master', md.default_branch)
# Perform a full reconfiguration which should cause us to
# actually query and update the branch cache.
self.scheds.first.fullReconfigure()
self.waitUntilSettled()
layout = self.scheds.first.sched.abide.tenants.get('tenant-one').layout
md = layout.getProjectMetadata(
'github.com/org/project-default')
self.assertEqual('foobar', md.default_branch)
class TestDataReturn(AnsibleZuulTestCase):
tenant_config_file = 'config/data-return/main.yaml'
@model_version(19)
def test_data_return(self):
# Test backward compatibility handling
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='data-return', result='SUCCESS', changes='1,1'),
dict(name='data-return-relative', result='SUCCESS', changes='1,1'),
dict(name='child', result='SUCCESS', changes='1,1'),
], ordered=False)
self.assertIn('- data-return https://zuul.example.com/',
A.messages[-1])
self.assertIn('- data-return-relative https://zuul.example.com',
A.messages[-1])
# To 21
@model_version(18)
def test_model_18_21(self):
self._test_circ_dep_refactor(21)
@model_version(20)
def test_model_20_21(self):
self._test_circ_dep_refactor(21)
# To 22
@model_version(18)
def test_model_18_22(self):
self._test_circ_dep_refactor(22)
@model_version(20)
def test_model_20_22(self):
self._test_circ_dep_refactor(22)
@model_version(21)
def test_model_21_22(self):
self._test_circ_dep_refactor(22)
# To 23
@model_version(18)
def test_model_18_23(self):
self._test_circ_dep_refactor(23)
@model_version(20)
def test_model_20_23(self):
self._test_circ_dep_refactor(23)
@model_version(21)
def test_model_21_23(self):
self._test_circ_dep_refactor(23)
@model_version(22)
def test_model_22_23(self):
self._test_circ_dep_refactor(23)
# To 24
@model_version(18)
def test_model_18_24(self):
self._test_circ_dep_refactor(24)
@model_version(20)
def test_model_20_24(self):
self._test_circ_dep_refactor(24)
@model_version(21)
def test_model_21_24(self):
self._test_circ_dep_refactor(24)
@model_version(22)
def test_model_22_24(self):
self._test_circ_dep_refactor(24)
@model_version(23)
def test_model_23_24(self):
self._test_circ_dep_refactor(24)
# To 25
@model_version(18)
def test_model_18_25(self):
self._test_circ_dep_refactor(25)
@model_version(20)
def test_model_20_25(self):
self._test_circ_dep_refactor(25)
@model_version(21)
def test_model_21_25(self):
self._test_circ_dep_refactor(25)
@model_version(22)
def test_model_22_25(self):
self._test_circ_dep_refactor(25)
@model_version(23)
def test_model_23_25(self):
self._test_circ_dep_refactor(25)
@model_version(24)
def test_model_24_25(self):
self._test_circ_dep_refactor(25)
def _test_circ_dep_refactor(self, final_model_api):
# Test backwards compat for job graph dependency freezing.
# First test the entire lifecycle under the old api.
A = self.fake_gerrit.addFakeChange('org/project6', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='data-return', result='SUCCESS', changes='1,1'),
dict(name='paused-data-return-child-jobs',
result='SUCCESS', changes='1,1'),
], ordered=True)
self.assertEqual(set(['data-return', 'child']),
set(self.history[1].parameters['zuul']['child_jobs']))
# Then repeat the test with a mid-cycle upgrade.
self.executor_server.hold_jobs_in_build = True
B = self.fake_gerrit.addFakeChange('org/project6', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
# Upgrade our component
self.model_test_component_info.model_api = final_model_api
component_registry = ComponentRegistry(self.zk_client)
for _ in iterate_timeout(30, "model api to update"):
if component_registry.model_api == final_model_api:
break
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='data-return', result='SUCCESS', changes='1,1'),
dict(name='paused-data-return-child-jobs',
result='SUCCESS', changes='1,1'),
dict(name='data-return', result='SUCCESS', changes='2,1'),
dict(name='paused-data-return-child-jobs',
result='SUCCESS', changes='2,1'),
], ordered=True)
self.assertEqual(set(['data-return', 'child']),
set(self.history[1].parameters['zuul']['child_jobs']))
self.assertEqual(set(['data-return', 'child']),
set(self.history[3].parameters['zuul']['child_jobs']))

View File

@ -21,7 +21,6 @@ configuration.
import re
from zuul.lib.re2util import ZuulRegex
from zuul.zk.components import COMPONENT_REGISTRY
class AbstractChangeMatcher(object):
@ -94,24 +93,14 @@ class BranchMatcher(AbstractChangeMatcher):
return False
def serialize(self):
if (COMPONENT_REGISTRY.model_api < 17):
return {
"implied": self.exactmatch,
"regex": self.regex.pattern,
}
else:
return {
"implied": self.exactmatch,
"regex": self.regex.serialize(),
}
return {
"implied": self.exactmatch,
"regex": self.regex.serialize(),
}
@classmethod
def deserialize(cls, data):
if isinstance(data['regex'], dict):
regex = ZuulRegex.deserialize(data['regex'])
else:
# MODEL_API >= 17
regex = ZuulRegex(data['regex'])
regex = ZuulRegex.deserialize(data['regex'])
o = cls(regex)
return o

View File

@ -15,7 +15,6 @@
import os
from zuul.lib import strings
from zuul.zk.components import COMPONENT_REGISTRY
def construct_build_params(uuid, connections, job, item, pipeline,
@ -90,14 +89,13 @@ def construct_build_params(uuid, connections, job, item, pipeline,
]
params = dict()
if COMPONENT_REGISTRY.model_api >= 19:
(
params["parent_data"],
params["secret_parent_data"],
artifact_data
) = item.getJobParentData(job)
if artifact_data:
zuul_params['artifacts'] = artifact_data
(
params["parent_data"],
params["secret_parent_data"],
artifact_data
) = item.getJobParentData(job)
if artifact_data:
zuul_params['artifacts'] = artifact_data
params['job_ref'] = job.getPath()
params['items'] = merger_items

View File

@ -982,9 +982,7 @@ class AnsibleJob(object):
with executor_server.zk_context as ctx:
self.job = FrozenJob.fromZK(ctx, arguments["job_ref"])
job_zuul_params = zuul_params_from_job(self.job)
# MODEL_API < 20
job_zuul_params["artifacts"] = self.arguments["zuul"].get(
"artifacts", job_zuul_params.get("artifacts"))
job_zuul_params["artifacts"] = self.arguments["zuul"].get("artifacts")
if job_zuul_params["artifacts"] is None:
del job_zuul_params["artifacts"]
self.arguments["zuul"].update(job_zuul_params)
@ -1079,18 +1077,10 @@ class AnsibleJob(object):
max_attempts = self.arguments["max_attempts"]
self.retry_limit = self.arguments["zuul"]["attempts"] >= max_attempts
try:
parent_data = self.arguments["parent_data"]
except KeyError:
# MODEL_API < 20
parent_data = self.job.parent_data or {}
parent_data = self.arguments["parent_data"]
self.normal_vars = Job._deepUpdate(parent_data.copy(),
self.job.variables)
try:
self.secret_vars = self.arguments["secret_parent_data"]
except KeyError:
# MODEL_API < 20
self.secret_vars = self.job.secret_parent_data or {}
self.secret_vars = self.arguments["secret_parent_data"]
def run(self):
self.running = True

View File

@ -1906,8 +1906,7 @@ class PipelineManager(metaclass=ABCMeta):
item.pipeline.tenant.semaphore_handler.release(
event_queue, item, build.job)
# MODEL_API < 25
if item.getJob(build.job.uuid or build.job.name) is None:
if item.getJob(build.job.uuid) is None:
log.info("Build %s no longer in job graph for item %s",
build, item)
return

View File

@ -52,7 +52,6 @@ from zuul.lib import tracing
from zuul.zk import zkobject
from zuul.zk.blob_store import BlobStore
from zuul.zk.change_cache import ChangeKey
from zuul.zk.components import COMPONENT_REGISTRY
MERGER_MERGE = 1 # "git merge"
MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
@ -308,9 +307,8 @@ class ConfigurationError(object):
@classmethod
def deserialize(cls, data):
data["key"] = ConfigurationErrorKey.deserialize(data["key"])
# These attributes were added in MODEL_API 13
data['severity'] = data.get('severity', SEVERITY_ERROR)
data['name'] = data.get('name', 'Unknown')
data['severity'] = data['severity']
data['name'] = data['name']
o = cls.__new__(cls)
o.__dict__.update(data)
return o
@ -1192,9 +1190,6 @@ class ChangeQueue(zkobject.ZKObject):
"queue": list(items_by_path.values()),
"project_branches": [tuple(pb) for pb in data["project_branches"]],
})
# MODEL_API < 17
if 'window_ceiling' not in data:
data['window_ceiling'] = math.inf
return data
def getPath(self):
@ -2367,10 +2362,7 @@ class FrozenJob(zkobject.ZKObject):
@classmethod
def jobPath(cls, job_id, parent_path):
# MODEL_API < 19
# Job name is used as path component and needs to be quoted.
safe_id = urllib.parse.quote_plus(job_id)
return f"{parent_path}/job/{safe_id}"
return f"{parent_path}/job/{job_id}"
def getPath(self):
return self.jobPath(self.uuid, self.buildset.getPath())
@ -2416,9 +2408,6 @@ class FrozenJob(zkobject.ZKObject):
v = {'storage': 'local', 'data': v}
data[k] = v
if (COMPONENT_REGISTRY.model_api < 9):
data['nodeset'] = data['nodeset_alternatives'][0]
data['ref'] = self.ref
data['other_refs'] = self.other_refs
@ -2431,27 +2420,6 @@ class FrozenJob(zkobject.ZKObject):
# of whether they have been deserialized.
data = super().deserialize(raw, context)
# MODEL_API < 8
if 'deduplicate' not in data:
data['deduplicate'] = 'auto'
# MODEL_API < 9
if data.get('nodeset'):
data['nodeset_alternatives'] = [data['nodeset']]
data['nodeset_index'] = 0
del data['nodeset']
# MODEL_API < 15
if 'ansible_split_streams' not in data:
data['ansible_split_streams'] = None
# MODEL_API < 15
if 'failure_output' not in data:
data['failure_output'] = []
# MODEL_API < 19
data.setdefault("uuid", None)
if hasattr(self, 'nodeset_alternatives'):
alts = self.nodeset_alternatives
else:
@ -2899,8 +2867,7 @@ class Job(ConfigObject):
for secret_key, secret_value in list(playbook['secrets'].items()):
secret_serialized = json_dumps(
secret_value, sort_keys=True).encode("utf8")
if (COMPONENT_REGISTRY.model_api >= 6 and
len(secret_serialized) > self.SECRET_BLOB_SIZE):
if (len(secret_serialized) > self.SECRET_BLOB_SIZE):
# If the secret is large, store it in the blob store
# and store the key in the playbook secrets dict.
blob_key = blobstore.put(secret_serialized)
@ -4068,21 +4035,14 @@ class Build(zkobject.ZKObject):
"span_info": self.span_info,
"events": [e.toDict() for e in self.events],
}
if COMPONENT_REGISTRY.model_api < 5:
data["_result_data"] = (self._result_data.getPath()
if self._result_data else None)
data["_secret_result_data"] = (
self._secret_result_data.getPath()
if self._secret_result_data else None)
else:
for k in self.job_data_attributes:
v = getattr(self, '_' + k)
if isinstance(v, JobData):
v = {'storage': 'offload', 'path': v.getPath(),
'hash': v.hash}
else:
v = {'storage': 'local', 'data': v}
data[k] = v
for k in self.job_data_attributes:
v = getattr(self, '_' + k)
if isinstance(v, JobData):
v = {'storage': 'offload', 'path': v.getPath(),
'hash': v.hash}
else:
v = {'storage': 'local', 'data': v}
data[k] = v
return json.dumps(data, sort_keys=True).encode("utf8")
@ -4097,20 +4057,6 @@ class Build(zkobject.ZKObject):
# Result data can change (between a pause and build
# completion).
# MODEL_API < 5
for k in ('_result_data', '_secret_result_data'):
try:
if data.get(k):
data[k] = JobData.fromZK(context, data[k])
# This used to be a ResultData object, which is
# the same as a JobData but without a hash, so
# generate one.
data[k]._set(hash=JobData.getHash(data[k].data))
except Exception:
self.log.exception("Failed to restore result data")
data[k] = None
# MODEL_API >= 5; override with this if present.
for job_data_key in self.job_data_attributes:
job_data = data.pop(job_data_key, None)
if job_data:
@ -4133,10 +4079,6 @@ class Build(zkobject.ZKObject):
data['_' + job_data_key] = JobData.fromZK(
context, job_data['path'])
# MODEL_API < 14
if 'pre_fail' not in data:
data['pre_fail'] = False
return data
def getPath(self):
@ -4631,12 +4573,6 @@ class BuildSet(zkobject.ZKObject):
return data
def updateBuildVersion(self, context, build):
# It's tempting to update versions regardless of the model
# API, but if we start writing versions before all components
# are upgraded we could get out of sync.
if (COMPONENT_REGISTRY.model_api < 12):
return True
# It is common for a lot of builds/jobs to be added at once,
# so to avoid writing this buildset object repeatedly during
# that time, we only update the version after the initial
@ -4648,26 +4584,17 @@ class BuildSet(zkobject.ZKObject):
self.updateAttributes(context, build_versions=self.build_versions)
def updateJobVersion(self, context, job):
if (COMPONENT_REGISTRY.model_api < 12):
return True
version = job.getZKVersion()
if version is not None:
self.job_versions[job.uuid] = version + 1
self.updateAttributes(context, job_versions=self.job_versions)
def shouldRefreshBuild(self, build, build_versions):
# Unless all schedulers are updating versions, we can't trust
# the data.
if (COMPONENT_REGISTRY.model_api < 12):
return True
current = build.getZKVersion()
expected = build_versions.get(build.uuid, 0)
return expected != current
def shouldRefreshJob(self, job, job_versions):
if (COMPONENT_REGISTRY.model_api < 12):
return True
current = job.getZKVersion()
expected = job_versions.get(job.uuid, 0)
return expected != current
@ -4925,8 +4852,7 @@ class QueueItem(zkobject.ZKObject):
def new(klass, context, **kw):
obj = klass()
obj._set(**kw)
if COMPONENT_REGISTRY.model_api >= 13:
obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
obj._set(event=obj.event and EventInfo.fromEvent(obj.event))
data = obj._trySerialize(context)
obj._save(context, data, create=True)
@ -4962,18 +4888,12 @@ class QueueItem(zkobject.ZKObject):
return (tenant, pipeline, uuid)
def serialize(self, context):
if COMPONENT_REGISTRY.model_api < 13:
if isinstance(self.event, TriggerEvent):
event_type = "TriggerEvent"
else:
event_type = self.event.__class__.__name__
else:
event_type = "EventInfo"
if not isinstance(self.event, EventInfo):
# Convert our local trigger event to a trigger info
# object. This will only happen on the transition to
# model API version 13.
self._set(event=EventInfo.fromEvent(self.event))
event_type = "EventInfo"
if not isinstance(self.event, EventInfo):
# Convert our local trigger event to a trigger info
# object. This will only happen on the transition to
# model API version 13.
self._set(event=EventInfo.fromEvent(self.event))
data = {
"uuid": self.uuid,
@ -5013,24 +4933,7 @@ class QueueItem(zkobject.ZKObject):
# child objects.
self._set(uuid=data["uuid"])
if COMPONENT_REGISTRY.model_api < 13:
event_type = data["event"]["type"]
if event_type == "TriggerEvent":
event_class = (
self.pipeline.manager.sched.connections
.getTriggerEventClass(
data["event"]["data"]["driver_name"])
)
else:
event_class = EventTypeIndex.event_type_mapping.get(event_type)
else:
event_class = EventInfo
if event_class is None:
raise NotImplementedError(
f"Event type {event_type} not deserializable")
event = event_class.fromDict(data["event"]["data"])
event = EventInfo.fromDict(data["event"]["data"])
changes = self.pipeline.manager.resolveChangeReferences(
data["changes"])
build_set = self.current_build_set
@ -5433,8 +5336,6 @@ class QueueItem(zkobject.ZKObject):
return True
try:
data = None
if COMPONENT_REGISTRY.model_api < 20:
data = []
ret = self.item_ahead.providesRequirements(job, data=data)
if data:
data.reverse()
@ -5496,33 +5397,6 @@ class QueueItem(zkobject.ZKObject):
break
if all_parent_jobs_successful:
# Iterate over all jobs of the graph (which is
# in sorted config order) and apply parent data of the jobs we
# already found.
if (parent_builds_with_data
and COMPONENT_REGISTRY.model_api < 20):
# We have all of the parent data here, so we can
# start from scratch each time.
new_parent_data = {}
new_secret_parent_data = {}
# We may have artifact data from
# jobRequirementsReady, so we preserve it.
# updateParentData de-duplicates it.
new_artifact_data = job.artifact_data or []
for parent_job in job_graph.getJobs():
parent_build = parent_builds_with_data.get(
parent_job.uuid)
if parent_build:
(new_parent_data,
new_secret_parent_data,
new_artifact_data) = FrozenJob.updateParentData(
new_parent_data,
new_secret_parent_data,
new_artifact_data,
parent_build)
job.setParentData(new_parent_data,
new_secret_parent_data,
new_artifact_data)
job._set(_ready_to_run=True)
def getArtifactData(self, job):
@ -7629,10 +7503,7 @@ class UnparsedAbideConfig(object):
"semaphores": self.semaphores,
"api_roots": self.api_roots,
}
if (COMPONENT_REGISTRY.model_api < 10):
d["admin_rules"] = self.authz_rules
else:
d["authz_rules"] = self.authz_rules
d["authz_rules"] = self.authz_rules
return d
@classmethod

View File

@ -2855,9 +2855,8 @@ class Scheduler(threading.Thread):
args = {}
if 'url' in event.data:
args['url'] = event.data['url']
if (COMPONENT_REGISTRY.model_api >= 14):
if 'pre_fail' in event.data:
args['pre_fail'] = event.data['pre_fail']
if 'pre_fail' in event.data:
args['pre_fail'] = event.data['pre_fail']
build.updateAttributes(pipeline.manager.current_context,
**args)

View File

@ -20,7 +20,6 @@ import json
from zuul.zk.zkobject import ZKContext, ShardedZKObject
from zuul.zk.locks import SessionAwareReadLock, SessionAwareWriteLock, locked
from zuul.zk.components import COMPONENT_REGISTRY
from zuul import model
from kazoo.exceptions import NoNodeError
@ -75,15 +74,9 @@ class BranchCacheZKObject(ShardedZKObject):
data = {
"protected": self.protected,
"remainder": self.remainder,
"merge_modes": self.merge_modes,
"default_branch": self.default_branch,
}
# This is mostly here to enable unit tests of upgrades, it's
# safe to move into the dict above at any time.
if (COMPONENT_REGISTRY.model_api >= 11):
data["merge_modes"] = self.merge_modes
# This is mostly here to enable unit tests of upgrades, it's
# safe to move into the dict above at any time.
if (COMPONENT_REGISTRY.model_api >= 16):
data["default_branch"] = self.default_branch
return json.dumps(data, sort_keys=True).encode("utf8")
def deserialize(self, raw, context):

View File

@ -22,7 +22,6 @@ import time
from kazoo.exceptions import NoNodeError
from zuul.zk.components import COMPONENT_REGISTRY
from zuul.zk import sharding, ZooKeeperBase
@ -169,8 +168,6 @@ class LayoutStateStore(ZooKeeperBase, MutableMapping):
return zstat.children_count
def getMinLtimes(self, layout_state):
if COMPONENT_REGISTRY.model_api < 6:
return None
try:
path = f"{self.layout_data_root}/{layout_state.uuid}"
with sharding.BufferedShardReader(