Delay Github fileschanges workaround to pipeline processing
Github's pull request files API only returns at most the first 300 changed files of a PR, in alphabetical order. Change I10a593e26ac85b8c12ca9c82051cad809382f50a introduced a workaround that queries the file list from the mergers within the github event loop. While this was a minimally invasive approach, it can cause multi-minute delays in the github event queue. This can be fixed by making this query asynchronous and deferring it to pipeline processing. The query is now handled the same way as merge requests. Change-Id: I9c77b35f0da4d892efc420370c04bcc7070c7676 Depends-On: https://review.openstack.org/625596
This commit is contained in:
parent
c933eac2ed
commit
8bfc0cd409
|
@ -1001,7 +1001,6 @@ class FakeGithubPullRequest(object):
|
|||
msg = self.subject + '-' + str(self.number_of_commits)
|
||||
for fn, content in self.files.items():
|
||||
fn = os.path.join(repo.working_dir, fn)
|
||||
f = open(fn, 'w')
|
||||
with open(fn, 'w') as f:
|
||||
f.write(content)
|
||||
repo.index.add([fn])
|
||||
|
|
|
@ -119,6 +119,42 @@ class TestGithubDriver(ZuulTestCase):
|
|||
self.waitUntilSettled()
|
||||
self.assertEqual(1, len(self.history))
|
||||
|
||||
@simple_layout('layouts/files-github.yaml', driver='github')
|
||||
def test_pull_changed_files_length_mismatch_reenqueue(self):
|
||||
# Hold jobs so we can trigger a reconfiguration while the item is in
|
||||
# the pipeline
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
files = {'{:03d}.txt'.format(n): 'test' for n in range(300)}
|
||||
# File 301 which is not included in the list of files of the PR,
|
||||
# since Github only returns max. 300 files in alphabetical order
|
||||
files["foobar-requires"] = "test"
|
||||
A = self.fake_github.openFakePullRequest(
|
||||
'org/project', 'master', 'A', files=files)
|
||||
|
||||
self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Comment on the pull request to trigger updateChange
|
||||
self.fake_github.emitEvent(A.getCommentAddedEvent('casual comment'))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Trigger reconfig to enforce a reenqueue of the item
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Now we can release all jobs
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
# There must be exactly one successful job in the history. If there is
|
||||
# an aborted job in the history the reenqueue failed.
|
||||
self.assertHistory([
|
||||
dict(name='project-test1', result='SUCCESS',
|
||||
changes="%s,%s" % (A.number, A.head_sha)),
|
||||
])
|
||||
|
||||
@simple_layout('layouts/basic-github.yaml', driver='github')
|
||||
def test_pull_github_files_error(self):
|
||||
A = self.fake_github.openFakePullRequest(
|
||||
|
|
|
@ -904,24 +904,17 @@ class GithubConnection(BaseConnection):
|
|||
|
||||
return changes
|
||||
|
||||
def getFilesChanges(self, project_name, head, base):
|
||||
job = self.sched.merger.getFilesChanges(self.connection_name,
|
||||
project_name,
|
||||
head, base)
|
||||
self.log.debug("Waiting for fileschanges job %s", job)
|
||||
job.wait()
|
||||
if not job.updated:
|
||||
raise Exception("Fileschanges job {} failed".format(job))
|
||||
self.log.debug("Fileschanges job %s got changes on files %s",
|
||||
job, job.files)
|
||||
return job.files
|
||||
|
||||
def _updateChange(self, change):
|
||||
self.log.info("Updating %s" % (change,))
|
||||
change.pr = self.getPull(change.project.name, change.number)
|
||||
change.ref = "refs/pull/%s/head" % change.number
|
||||
change.branch = change.pr.get('base').get('ref')
|
||||
change.files = change.pr.get('files')
|
||||
|
||||
# Don't overwrite the files list. The change object is bound to a
|
||||
# specific revision and thus the changed files won't change. This is
|
||||
# important if we got the files later because of the 300 files limit.
|
||||
if not change.files:
|
||||
change.files = change.pr.get('files')
|
||||
# Github's pull requests files API only returns at max
|
||||
# the first 300 changed files of a PR in alphabetical order.
|
||||
# https://developer.github.com/v3/pulls/#list-pull-requests-files
|
||||
|
@ -929,10 +922,11 @@ class GithubConnection(BaseConnection):
|
|||
self.log.warning("Got only %s files but PR has %s files.",
|
||||
len(change.files),
|
||||
change.pr.get('changed_files', 0))
|
||||
change.files = self.getFilesChanges(
|
||||
change.project.name,
|
||||
change.ref,
|
||||
change.branch)
|
||||
# In this case explicitly set change.files to None to signalize
|
||||
# that we need to ask the mergers later in pipeline processing.
|
||||
# We cannot query the files here using the mergers because this
|
||||
# can slow down the github event queue considerably.
|
||||
change.files = None
|
||||
change.title = change.pr.get('title')
|
||||
change.open = change.pr.get('state') == 'open'
|
||||
change.is_merged = change.pr.get('merged')
|
||||
|
|
|
@ -578,8 +578,6 @@ class PipelineManager(object):
|
|||
return self._loadDynamicLayout(item)
|
||||
|
||||
def scheduleMerge(self, item, files=None, dirs=None):
|
||||
build_set = item.current_build_set
|
||||
|
||||
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
|
||||
(item, files, dirs))
|
||||
build_set = item.current_build_set
|
||||
|
@ -594,23 +592,38 @@ class PipelineManager(object):
|
|||
precedence=self.pipeline.precedence)
|
||||
return False
|
||||
|
||||
def scheduleFilesChanges(self, item):
|
||||
self.log.debug("Scheduling fileschanged for item %s", item)
|
||||
build_set = item.current_build_set
|
||||
build_set.files_state = build_set.PENDING
|
||||
|
||||
self.sched.merger.getFilesChanges(
|
||||
item.change.project.connection_name, item.change.project.name,
|
||||
item.change.ref, item.change.branch, build_set=build_set)
|
||||
return False
|
||||
|
||||
def prepareItem(self, item):
|
||||
# This runs on every iteration of _processOneItem
|
||||
# Returns True if the item is ready, false otherwise
|
||||
ready = True
|
||||
build_set = item.current_build_set
|
||||
if not build_set.ref:
|
||||
build_set.setConfiguration()
|
||||
if build_set.merge_state == build_set.NEW:
|
||||
return self.scheduleMerge(item,
|
||||
files=['zuul.yaml', '.zuul.yaml'],
|
||||
dirs=['zuul.d', '.zuul.d'])
|
||||
ready = self.scheduleMerge(item,
|
||||
files=['zuul.yaml', '.zuul.yaml'],
|
||||
dirs=['zuul.d', '.zuul.d'])
|
||||
if build_set.files_state == build_set.NEW:
|
||||
ready = self.scheduleFilesChanges(item)
|
||||
if build_set.files_state == build_set.PENDING:
|
||||
ready = False
|
||||
if build_set.merge_state == build_set.PENDING:
|
||||
return False
|
||||
ready = False
|
||||
if build_set.unable_to_merge:
|
||||
return False
|
||||
ready = False
|
||||
if build_set.config_errors:
|
||||
return False
|
||||
return True
|
||||
ready = False
|
||||
return ready
|
||||
|
||||
def prepareJobs(self, item):
|
||||
# This only runs once the item is in the pipeline's action window
|
||||
|
@ -820,6 +833,12 @@ class PipelineManager(object):
|
|||
self._resumeBuilds(build.build_set)
|
||||
return True
|
||||
|
||||
def onFilesChangesCompleted(self, event):
|
||||
build_set = event.build_set
|
||||
item = build_set.item
|
||||
item.change.files = event.files
|
||||
build_set.files_state = build_set.COMPLETE
|
||||
|
||||
def onMergeCompleted(self, event):
|
||||
build_set = event.build_set
|
||||
item = build_set.item
|
||||
|
|
|
@ -132,12 +132,14 @@ class MergeClient(object):
|
|||
return job
|
||||
|
||||
def getFilesChanges(self, connection_name, project_name, branch,
|
||||
tosha=None, precedence=zuul.model.PRECEDENCE_HIGH):
|
||||
tosha=None, precedence=zuul.model.PRECEDENCE_HIGH,
|
||||
build_set=None):
|
||||
data = dict(connection=connection_name,
|
||||
project=project_name,
|
||||
branch=branch,
|
||||
tosha=tosha)
|
||||
job = self.submitJob('merger:fileschanges', data, None, precedence)
|
||||
job = self.submitJob('merger:fileschanges', data, build_set,
|
||||
precedence)
|
||||
return job
|
||||
|
||||
def onBuildCompleted(self, job):
|
||||
|
@ -153,9 +155,13 @@ class MergeClient(object):
|
|||
(job, merged, job.updated, commit))
|
||||
job.setComplete()
|
||||
if job.build_set:
|
||||
self.sched.onMergeCompleted(job.build_set,
|
||||
merged, job.updated, commit, files,
|
||||
repo_state)
|
||||
if job.name == 'merger:fileschanges':
|
||||
self.sched.onFilesChangesCompleted(job.build_set, files)
|
||||
else:
|
||||
self.sched.onMergeCompleted(job.build_set,
|
||||
merged, job.updated, commit, files,
|
||||
repo_state)
|
||||
|
||||
# The test suite expects the job to be removed from the
|
||||
# internal account after the wake flag is set.
|
||||
self.jobs.remove(job)
|
||||
|
|
|
@ -1784,6 +1784,10 @@ class BuildSet(object):
|
|||
self.files = RepoFiles()
|
||||
self.repo_state = {}
|
||||
self.tries = {}
|
||||
if item.change.files is not None:
|
||||
self.files_state = self.COMPLETE
|
||||
else:
|
||||
self.files_state = self.NEW
|
||||
|
||||
@property
|
||||
def ref(self):
|
||||
|
@ -2582,6 +2586,11 @@ class Ref(object):
|
|||
return set()
|
||||
|
||||
def updatesConfig(self):
|
||||
if self.files is None:
|
||||
# If self.files is None we don't know if this change updates the
|
||||
# config so assume it does as this is a safe default if we don't
|
||||
# know.
|
||||
return True
|
||||
if 'zuul.yaml' in self.files or '.zuul.yaml' in self.files or \
|
||||
[True for fn in self.files if fn.startswith("zuul.d/") or
|
||||
fn.startswith(".zuul.d/")]:
|
||||
|
|
|
@ -215,6 +215,18 @@ class MergeCompletedEvent(ResultEvent):
|
|||
self.repo_state = repo_state
|
||||
|
||||
|
||||
class FilesChangesCompletedEvent(ResultEvent):
|
||||
"""A remote fileschanges operation has completed
|
||||
|
||||
:arg BuildSet build_set: The build_set which is ready.
|
||||
:arg list files: List of files changed.
|
||||
"""
|
||||
|
||||
def __init__(self, build_set, files):
|
||||
self.build_set = build_set
|
||||
self.files = files
|
||||
|
||||
|
||||
class NodesProvisionedEvent(ResultEvent):
|
||||
"""Nodes have been provisioned for a build_set
|
||||
|
||||
|
@ -475,6 +487,11 @@ class Scheduler(threading.Thread):
|
|||
self.result_event_queue.put(event)
|
||||
self.wake_event.set()
|
||||
|
||||
def onFilesChangesCompleted(self, build_set, files):
|
||||
event = FilesChangesCompletedEvent(build_set, files)
|
||||
self.result_event_queue.put(event)
|
||||
self.wake_event.set()
|
||||
|
||||
def onNodesProvisioned(self, req):
|
||||
event = NodesProvisionedEvent(req)
|
||||
self.result_event_queue.put(event)
|
||||
|
@ -1107,6 +1124,8 @@ class Scheduler(threading.Thread):
|
|||
self._doBuildCompletedEvent(event)
|
||||
elif isinstance(event, MergeCompletedEvent):
|
||||
self._doMergeCompletedEvent(event)
|
||||
elif isinstance(event, FilesChangesCompletedEvent):
|
||||
self._doFilesChangesCompletedEvent(event)
|
||||
elif isinstance(event, NodesProvisionedEvent):
|
||||
self._doNodesProvisionedEvent(event)
|
||||
else:
|
||||
|
@ -1264,6 +1283,18 @@ class Scheduler(threading.Thread):
|
|||
return
|
||||
pipeline.manager.onMergeCompleted(event)
|
||||
|
||||
def _doFilesChangesCompletedEvent(self, event):
|
||||
build_set = event.build_set
|
||||
if build_set is not build_set.item.current_build_set:
|
||||
self.log.warning("Build set %s is not current", build_set)
|
||||
return
|
||||
pipeline = build_set.item.pipeline
|
||||
if not pipeline:
|
||||
self.log.warning("Build set %s is not associated with a pipeline",
|
||||
build_set)
|
||||
return
|
||||
pipeline.manager.onFilesChangesCompleted(event)
|
||||
|
||||
def _doNodesProvisionedEvent(self, event):
|
||||
request = event.request
|
||||
request_id = event.request_id
|
||||
|
|
Loading…
Reference in New Issue