Attach event to queue item

As a preparation to further attach the event id to the logs we need to
add the trigger event to the queue items so we can use it later.

Change-Id: I9cc87e5be9a391a53a0dcfd92a990ca848616ca0
This commit is contained in:
Tobias Henkel 2019-05-12 10:11:23 +02:00
parent 2e08cd812e
commit 8b455adca9
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
7 changed files with 26 additions and 24 deletions

View File

@ -200,7 +200,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
change.branch = 'master'
item = queue.enqueueChange(change)
item = queue.enqueueChange(change, None)
item.layout = self.layout
self.assertTrue(base.changeMatchesBranch(change))
@ -214,7 +214,7 @@ class TestJob(BaseTestCase):
self.assertEqual(job.timeout, 70)
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
item = queue.enqueueChange(change, None)
item.layout = self.layout
self.assertTrue(base.changeMatchesBranch(change))
@ -263,7 +263,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
change.branch = 'master'
change.files = ['/COMMIT_MSG', 'ignored-file']
item = queue.enqueueChange(change)
item = queue.enqueueChange(change, None)
item.layout = self.layout
self.assertTrue(base.changeMatchesFiles(change))
@ -332,7 +332,7 @@ class TestJob(BaseTestCase):
change = model.Change(self.project)
# Test master
change.branch = 'master'
item = self.queue.enqueueChange(change)
item = self.queue.enqueueChange(change, None)
item.layout = self.layout
with testtools.ExpectedException(
Exception,

View File

@ -179,11 +179,11 @@ class PipelineManager(object):
def isChangeReadyToBeEnqueued(self, change):
return True
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None):
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
def enqueueChangesBehind(self, change, event, quiet, ignore_requirements,
change_queue):
return True
@ -269,7 +269,7 @@ class PipelineManager(object):
item.change.project)
return False
def addChange(self, change, quiet=False, enqueue_time=None,
def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
change_queue=None, history=None):
self.log.debug("Considering adding change %s" % change)
@ -307,7 +307,8 @@ class PipelineManager(object):
(change, change.project))
return False
if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
if not self.enqueueChangesAhead(change, event, quiet,
ignore_requirements,
change_queue, history=history):
self.log.debug("Failed to enqueue changes "
"ahead of %s" % change)
@ -320,14 +321,14 @@ class PipelineManager(object):
self.log.info("Adding change %s to queue %s in %s" %
(change, change_queue, self.pipeline))
item = change_queue.enqueueChange(change)
item = change_queue.enqueueChange(change, event)
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
self.reportStats(item)
item.quiet = quiet
self.enqueueChangesBehind(change, quiet, ignore_requirements,
change_queue)
self.enqueueChangesBehind(change, event, quiet,
ignore_requirements, change_queue)
zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.tenant
zuul_driver.onChangeEnqueued(tenant, item.change, self.pipeline)

View File

@ -101,7 +101,7 @@ class DependentPipelineManager(PipelineManager):
return False
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
def enqueueChangesBehind(self, change, event, quiet, ignore_requirements,
change_queue):
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
@ -145,11 +145,11 @@ class DependentPipelineManager(PipelineManager):
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change, quiet=quiet,
self.addChange(other_change, event, quiet=quiet,
ignore_requirements=ignore_requirements,
change_queue=change_queue)
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None):
if history and change in history:
# detected dependency cycle
@ -168,7 +168,7 @@ class DependentPipelineManager(PipelineManager):
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
for needed_change in ret:
r = self.addChange(needed_change, quiet=quiet,
r = self.addChange(needed_change, event, quiet=quiet,
ignore_requirements=ignore_requirements,
change_queue=change_queue, history=history)
if not r:

View File

@ -33,7 +33,7 @@ class IndependentPipelineManager(PipelineManager):
self.log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None):
if history and change in history:
# detected dependency cycle
@ -57,7 +57,7 @@ class IndependentPipelineManager(PipelineManager):
# have jobs run. Also, pipeline requirements are always
# ignored (which is safe because the changes are not
# live).
r = self.addChange(needed_change, quiet=True,
r = self.addChange(needed_change, event, quiet=True,
ignore_requirements=True,
live=False, change_queue=change_queue,
history=history)

View File

@ -409,8 +409,8 @@ class ChangeQueue(object):
if not self.name:
self.name = project.name
def enqueueChange(self, change):
item = QueueItem(self, change)
def enqueueChange(self, change, event):
item = QueueItem(self, change, event)
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
@ -2072,7 +2072,7 @@ class QueueItem(object):
"""
log = logging.getLogger("zuul.QueueItem")
def __init__(self, queue, change):
def __init__(self, queue, change, event):
self.pipeline = queue.pipeline
self.queue = queue
self.change = change # a ref
@ -2091,6 +2091,7 @@ class QueueItem(object):
self.project_pipeline_config = None
self.job_graph = None
self._cached_sql_results = {}
self.event = event # The trigger event that lead to this queue item
def __repr__(self):
if self.pipeline:

View File

@ -466,7 +466,7 @@ class RPCListener(object):
change = model.Branch(project)
change.branch = args.get("branch", "master")
queue = model.ChangeQueue(pipeline)
item = model.QueueItem(queue, change)
item = model.QueueItem(queue, change, None)
item.layout = tenant.layout
item.freezeJobGraph(skip_file_matcher=True)

View File

@ -929,7 +929,7 @@ class Scheduler(threading.Thread):
pipeline.manager.dequeueItem(item)
for item in items_to_enqueue:
pipeline.manager.addChange(
item.change,
item.change, event,
enqueue_time=item.enqueue_time,
quiet=True,
ignore_requirements=True)
@ -969,7 +969,7 @@ class Scheduler(threading.Thread):
change = project.source.getChange(event, project)
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
pipeline.manager.addChange(change, ignore_requirements=True)
pipeline.manager.addChange(change, event, ignore_requirements=True)
def _areAllBuildsComplete(self):
self.log.debug("Checking if all builds are complete")
@ -1110,7 +1110,7 @@ class Scheduler(threading.Thread):
elif event.isChangeAbandoned():
pipeline.manager.removeAbandonedChange(change)
if pipeline.manager.eventMatches(event, change):
pipeline.manager.addChange(change)
pipeline.manager.addChange(change, event)
finally:
self.trigger_event_queue.task_done()