Fix job contamination by unmerged change

We had cases where Zuul used the unmerged job description of a change
to a trusted parent job (change A) when running unrelated downstream
jobs (change B) that contain no zuul.yaml changes. This happened when
the trusted parent job is not defined in the same config repo as the
pipeline. E.g. if change A adds a new post playbook, an unrelated
change B fails with 'post playbook not found'. This is caused by the
scheduler using the wrong, unmerged job definition of change A, while
the final workspace contains the correct state without change A.

In the case of change B there is no dynamic layout, so the current
active layout should be used. However, getLayout takes it directly
from the pipeline object (item.queue.pipeline.layout), which does not
reliably reference the correct layout, while the layout referenced by
the tenant object is correct.
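
For orientation, this is a condensed view of the fallback in
PipelineManager.getLayout after this change (abridged from the full
hunk further down; the elided branches handle changes that do update
the config):

def getLayout(self, item):
    if not self._queueUpdatesConfig(item):
        # No config updates anywhere in the queue (the change B case):
        # use the tenant's active layout rather than
        # item.queue.pipeline.layout, which may have been rebound to a
        # throwaway dynamic layout (see below).
        return item.queue.pipeline.tenant.layout
    # ... dynamic-layout handling for changes that touch the config ...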

Because the pipeline definition is in a different repository than the
proposed config repo change, when the dynamic layout is created for
the config repo change, the previously cached Pipeline objects are used
to build the layout.  These objects are the actual live pipelines, and
when they are added to the layout, they have their Pipeline.layout
attributes set to the dynamic layout.  This dynamic layout is then not
used further (it is only created for syntax validation), but the pipelines
remain altered.
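
A minimal, self-contained sketch of that aliasing (toy classes, not
Zuul's real ones): the live Pipeline object is reused to build the
throwaway dynamic layout, adding it rebinds its back reference, and
later lookups through the pipeline see the wrong layout while lookups
through the tenant stay correct.

class Layout:
    def __init__(self, name):
        self.name = name
        self.pipelines = {}

    def addPipeline(self, pipeline):
        self.pipelines[pipeline.name] = pipeline
        pipeline.layout = self   # the back reference this commit removes

class Pipeline:
    def __init__(self, name):
        self.name = name
        self.layout = None

class Tenant:
    def __init__(self):
        self.layout = None

tenant = Tenant()
active = Layout('active')
check = Pipeline('check')
active.addPipeline(check)
tenant.layout = active

# Dynamic layout built only for syntax validation of the config repo
# change, constructed from the cached live Pipeline objects:
dynamic = Layout('dynamic')
dynamic.addPipeline(check)

print(check.layout.name)    # 'dynamic' -- the stale reference getLayout used
print(tenant.layout.name)   # 'active'  -- the layout that should be used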

We could go ahead and just change that to
item.queue.pipeline.layout.tenant.layout, but this feels awkward and
would leave room for similar bugs that are hard to find and debug.
Furthermore, pipeline.layout is used almost everywhere just to get at
the tenant, not the layout. So this fix goes further and rips the
layout out of the Pipeline object entirely, replacing it with the
tenant. Because the tenant object is never expected to change during
the lifetime of the pipeline object, holding a reference to the
tenant, rather than the layout, is safe.
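
Continuing the toy example above, this is roughly what the fixed model
looks like: the pipeline carries a tenant reference that is attached
once (by the TenantParser in the real code) and never rebound, and
Layout.addPipeline no longer touches the pipeline, so a throwaway
dynamic layout can no longer contaminate later lookups.

class Layout:
    def __init__(self, name):
        self.name = name
        self.pipelines = {}

    def addPipeline(self, pipeline):
        self.pipelines[pipeline.name] = pipeline   # no back reference any more

class Tenant:
    def __init__(self, name):
        self.name = name
        self.layout = None

class Pipeline:
    def __init__(self, name):
        self.name = name
        self.tenant = None        # was: self.layout = None

tenant = Tenant('tenant-one')
active = Layout('active')
check = Pipeline('check')
check.tenant = tenant             # attached once when the tenant layout is built
active.addPipeline(check)
tenant.layout = active

dynamic = Layout('dynamic')
dynamic.addPipeline(check)        # harmless now

print(check.tenant.layout.name)   # 'active'     -- e.g. item.queue.pipeline.tenant.layout
print(check.tenant.name)          # 'tenant-one' -- e.g. item.pipeline.tenant.name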

Change-Id: I1e663f624db5e30a8f51b56134c37cc6e8217029
Tobias Henkel 2018-07-09 08:32:11 +02:00
parent 3cf229c2d4
commit 3b0c37ba66
GPG Key ID: 03750DEC158E5FA2
17 changed files with 148 additions and 30 deletions


@ -0,0 +1,16 @@
- pipeline:
name: check
manager: independent
trigger:
github:
- event: pull_request
action:
- opened
success:
github:
comment: true
status: success
failure:
github:
comment: true
status: failure


@ -0,0 +1,5 @@
- job:
name: base
parent: null
vars:
basevar: basejob


@ -0,0 +1,4 @@
- project:
check:
jobs:
- noop


@ -0,0 +1 @@
test


@ -0,0 +1,6 @@
- hosts: all
tasks:
- name: Test basevar
assert:
that:
- basevar == 'basejob'


@ -0,0 +1,12 @@
- job:
name: python27
nodeset:
nodes:
- name: controller
label: ubuntu-trusty
run: playbooks/python27.yaml
- project:
check:
jobs:
- python27


@ -0,0 +1,10 @@
- tenant:
name: tenant-one
source:
github:
config-projects:
- org/global-config
- org/common-config
untrusted-projects:
- org/project1


@ -406,7 +406,7 @@ class TestTimeDataBase(BaseTestCase):
self.db = model.TimeDataBase(self.tmp_root)
def test_timedatabase(self):
pipeline = Dummy(layout=Dummy(tenant=Dummy(name='test-tenant')))
pipeline = Dummy(tenant=Dummy(name='test-tenant'))
change = Dummy(project=Dummy(canonical_name='git.example.com/foo/bar'))
job = Dummy(name='job-name')
item = Dummy(pipeline=pipeline,


@ -1986,6 +1986,70 @@ class TestInRepoConfig(ZuulTestCase):
A.messages[0], "A should have debug info")
class TestJobContamination(AnsibleZuulTestCase):
config_file = 'zuul-connections-gerrit-and-github.conf'
tenant_config_file = 'config/zuul-job-contamination/main.yaml'
def test_job_contamination_playbooks(self):
conf = textwrap.dedent(
"""
- job:
name: base
post-run:
- playbooks/something-new.yaml
parent: null
vars:
basevar: basejob
""")
file_dict = {'zuul.d/jobs.yaml': conf}
A = self.fake_github.openFakePullRequest(
'org/global-config', 'master', 'A', files=file_dict)
self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
self.waitUntilSettled()
B = self.fake_github.openFakePullRequest('org/project1', 'master', 'A')
self.fake_github.emitEvent(B.getPullRequestOpenedEvent())
self.waitUntilSettled()
statuses_b = self.fake_github.getCommitStatuses(
'org/project1', B.head_sha)
self.assertEqual(len(statuses_b), 1)
# B should not be affected by the A PR
self.assertEqual('success', statuses_b[0]['state'])
def test_job_contamination_vars(self):
conf = textwrap.dedent(
"""
- job:
name: base
parent: null
vars:
basevar: basejob-modified
""")
file_dict = {'zuul.d/jobs.yaml': conf}
A = self.fake_github.openFakePullRequest(
'org/global-config', 'master', 'A', files=file_dict)
self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
self.waitUntilSettled()
B = self.fake_github.openFakePullRequest('org/project1', 'master', 'A')
self.fake_github.emitEvent(B.getPullRequestOpenedEvent())
self.waitUntilSettled()
statuses_b = self.fake_github.getCommitStatuses(
'org/project1', B.head_sha)
self.assertEqual(len(statuses_b), 1)
# B should not be affected by the A PR
self.assertEqual('success', statuses_b[0]['state'])
class TestInRepoJoin(ZuulTestCase):
# In this config, org/project is not a member of any pipelines, so
# that we may test the changes that cause it to join them.


@ -1722,6 +1722,8 @@ class TenantParser(object):
# reference_exceptions has it; add tests if needed.
if not skip_pipelines:
for pipeline in parsed_config.pipelines:
if not pipeline.tenant:
pipeline.tenant = tenant
layout.addPipeline(pipeline)
for nodeset in parsed_config.nodesets:


@ -29,7 +29,7 @@ class MQTTReporter(BaseReporter):
(item.change, self.config))
message = {
'action': self._action,
'tenant': item.pipeline.layout.tenant.name,
'tenant': item.pipeline.tenant.name,
'zuul_ref': item.current_build_set.ref,
'pipeline': item.pipeline.name,
'project': item.change.project.name,
@ -65,7 +65,7 @@ class MQTTReporter(BaseReporter):
topic = None
try:
topic = self.config['topic'].format(
tenant=item.pipeline.layout.tenant.name,
tenant=item.pipeline.tenant.name,
pipeline=item.pipeline.name,
project=item.change.project.name,
branch=getattr(item.change, 'branch', None),


@ -52,7 +52,7 @@ class SQLReporter(BaseReporter):
result=item.current_build_set.result,
message=self._formatItemReport(
item, with_jobs=False),
tenant=item.pipeline.layout.tenant.name,
tenant=item.pipeline.tenant.name,
branch=branch,
)
buildset_ins_result = conn.execute(buildset_ins)


@ -135,7 +135,7 @@ class ExecutorClient(object):
def execute(self, job, item, pipeline, dependent_changes=[],
merger_items=[]):
tenant = pipeline.layout.tenant
tenant = pipeline.tenant
uuid = str(uuid4().hex)
nodeset = item.current_build_set.getJobNodeSet(job.name)
self.log.info(


@ -278,7 +278,7 @@ class PipelineManager(object):
self.enqueueChangesBehind(change, quiet, ignore_requirements,
change_queue)
zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.layout.tenant
tenant = self.pipeline.tenant
zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline)
return True
@ -353,7 +353,7 @@ class PipelineManager(object):
# If we hit an exception we don't have a build in the
# current item so a potentially aquired semaphore must be
# released as it won't be released on dequeue of the item.
tenant = item.pipeline.layout.tenant
tenant = item.pipeline.tenant
tenant.semaphore_handler.release(item, job)
except Exception:
self.log.exception("Exception while releasing semaphore")
@ -365,7 +365,7 @@ class PipelineManager(object):
return False
jobs = item.findJobsToRun(
item.pipeline.layout.tenant.semaphore_handler)
item.pipeline.tenant.semaphore_handler)
if jobs:
self._executeJobs(item, jobs)
@ -390,7 +390,7 @@ class PipelineManager(object):
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
finally:
tenant = old_build_set.item.pipeline.layout.tenant
tenant = old_build_set.item.pipeline.tenant
tenant.semaphore_handler.release(
old_build_set.item, build.job)
@ -430,7 +430,7 @@ class PipelineManager(object):
if trusted_updates:
self.log.debug("Loading dynamic layout (phase 1)")
layout = loader.createDynamicLayout(
item.pipeline.layout.tenant,
item.pipeline.tenant,
build_set.files,
include_config_projects=True)
if not len(layout.loading_errors):
@ -441,7 +441,7 @@ class PipelineManager(object):
if untrusted_updates:
self.log.debug("Loading dynamic layout (phase 2)")
layout = loader.createDynamicLayout(
item.pipeline.layout.tenant,
item.pipeline.tenant,
build_set.files,
include_config_projects=False)
else:
@ -449,7 +449,7 @@ class PipelineManager(object):
# config items ahead), so just use the current pipeline
# layout.
if not len(layout.loading_errors):
return item.queue.pipeline.layout
return item.queue.pipeline.tenant.layout
if len(layout.loading_errors):
self.log.info("Configuration syntax error in dynamic layout")
if trusted_layout_verified:
@ -498,7 +498,7 @@ class PipelineManager(object):
def getLayout(self, item):
if not self._queueUpdatesConfig(item):
# No config updates in queue. Use existing pipeline layout
return item.queue.pipeline.layout
return item.queue.pipeline.tenant.layout
elif (not item.change.updatesConfig() and
item.item_ahead and item.item_ahead.live):
# Current change does not update layout, use its parent if parent
@ -710,7 +710,7 @@ class PipelineManager(object):
self.log.debug("Build %s of %s completed" % (build, item.change))
item.setResult(build)
item.pipeline.layout.tenant.semaphore_handler.release(item, build.job)
item.pipeline.tenant.semaphore_handler.release(item, build.job)
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))
@ -776,7 +776,7 @@ class PipelineManager(object):
(change_queue, change_queue.window))
zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.layout.tenant
tenant = self.pipeline.tenant
zuul_driver.onChangeMerged(tenant, item.change, source)
def _reportItem(self, item):
@ -789,7 +789,7 @@ class PipelineManager(object):
# pipeline, use the dynamic layout if available, otherwise,
# fall back to the current static layout as a best
# approximation.
layout = (item.layout or self.pipeline.layout)
layout = (item.layout or self.pipeline.tenant.layout)
project_in_pipeline = True
if not layout.getProjectPipelineConfig(item):
@ -850,7 +850,7 @@ class PipelineManager(object):
dt = None
items = len(self.pipeline.getAllItems())
tenant = self.pipeline.layout.tenant
tenant = self.pipeline.tenant
basekey = 'zuul.tenant.%s' % tenant.name
key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
# stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time


@ -30,14 +30,13 @@ class DependentPipelineManager(PipelineManager):
def _postConfig(self, layout):
super(DependentPipelineManager, self)._postConfig(layout)
self.buildChangeQueues()
self.buildChangeQueues(layout)
def buildChangeQueues(self):
def buildChangeQueues(self, layout):
self.log.debug("Building shared change queues")
change_queues = {}
layout = self.pipeline.layout
tenant = self.pipeline.tenant
layout_project_configs = layout.project_configs
tenant = layout.tenant
for project_name, project_configs in layout_project_configs.items():
(trusted, project) = tenant.getProject(project_name)
@ -119,7 +118,7 @@ class DependentPipelineManager(PipelineManager):
self.log.debug(" Checking source: %s", source)
for c in source.getChangesDependingOn(change,
change_queue.projects,
self.pipeline.layout.tenant):
self.pipeline.tenant):
if c not in seen:
seen.add(c)
needed_by_changes.append(c)


@ -184,7 +184,7 @@ class Pipeline(object):
self.name = name
self.tenant_name = tenant_name
self.source_context = None
self.layout = None
self.tenant = None
self.description = None
self.failure_message = None
self.merge_failure_message = None
@ -1670,7 +1670,7 @@ class BuildSet(object):
break
item = item.item_ahead
if not layout:
layout = self.item.pipeline.layout
layout = self.item.pipeline.tenant.layout
if layout:
project = self.item.change.project
project_metadata = layout.getProjectMetadata(
@ -1838,7 +1838,7 @@ class QueueItem(object):
def includesConfigUpdates(self):
includes_trusted = False
includes_untrusted = False
tenant = self.pipeline.layout.tenant
tenant = self.pipeline.tenant
item = self
while item:
if item.change.updatesConfig():
@ -2013,7 +2013,7 @@ class QueueItem(object):
# secrets, etc.
safe_change = self.change.getSafeAttributes()
safe_pipeline = self.pipeline.getSafeAttributes()
safe_tenant = self.pipeline.layout.tenant.getSafeAttributes()
safe_tenant = self.pipeline.tenant.getSafeAttributes()
safe_buildset = self.current_build_set.getSafeAttributes()
safe_job = job.getSafeAttributes() if job else {}
safe_build = build.getSafeAttributes() if build else {}
@ -2990,7 +2990,6 @@ class Layout(object):
def addPipeline(self, pipeline):
self.pipelines[pipeline.name] = pipeline
pipeline.layout = self
def addProjectTemplate(self, project_template):
template_list = self.project_templates.get(project_template.name)
@ -3626,7 +3625,7 @@ class TimeDataBase(object):
dir_path = os.path.join(
self.root,
build.build_set.item.pipeline.layout.tenant.name,
build.build_set.item.pipeline.tenant.name,
build.build_set.item.change.project.canonical_name,
branch)
if not os.path.exists(dir_path):


@ -378,7 +378,7 @@ class Scheduler(threading.Thread):
build.result = result
try:
if self.statsd and build.pipeline:
tenant = build.pipeline.layout.tenant
tenant = build.pipeline.tenant
jobname = build.job.name.replace('.', '_').replace('/', '_')
hostname = (build.build_set.item.change.project.
canonical_hostname.replace('.', '_'))
@ -1028,7 +1028,7 @@ class Scheduler(threading.Thread):
def _getAutoholdRequestKey(self, build):
change = build.build_set.item.change
autohold_key_base = (build.pipeline.layout.tenant.name,
autohold_key_base = (build.pipeline.tenant.name,
change.project.canonical_name,
build.job.name)