make enqueue_time passable to addChange

When doing a doPromote we should keep the enqueue times of items.
However, the teardown and rebuild of the queues means that items
are fully destroyed and created a new.

Change the addChange api call so that it takes an optional enqueue
time to set on the item, which it can do internally once the new
item has gotten created.

This should address the issue where enqueue_times get lost when
we do a promote of the queue.

Now with unit tests! (Also ensured that if I removed the scheduler
change these tests failed, so the test is testing a correct thing)

Change-Id: I4dce23d00128280c7add922ca9ef5018b57d1cf3
This commit is contained in:
Sean Dague 2014-01-10 21:34:35 -05:00
parent dda083b9fb
commit f39b9ca80c
2 changed files with 18 additions and 2 deletions

View File

@ -3153,11 +3153,22 @@ class TestScheduler(testtools.TestCase):
self.waitUntilSettled()
items = self.sched.layout.pipelines['gate'].getAllItems()
enqueue_times = {}
for item in items:
enqueue_times[str(item.change)] = item.enqueue_time
client = zuul.rpcclient.RPCClient('127.0.0.1',
self.gearman_server.port)
r = client.promote(pipeline='gate',
change_ids=['2,1', '3,1'])
# ensure that enqueue times are durable
items = self.sched.layout.pipelines['gate'].getAllItems()
for item in items:
self.assertEqual(
enqueue_times[str(item.change)], item.enqueue_time)
self.worker.release('.*-merge')
self.waitUntilSettled()
self.worker.release('.*-merge')

View File

@ -611,7 +611,10 @@ class Scheduler(threading.Thread):
pipeline.manager.cancelJobs(item)
pipeline.manager.dequeueItem(item)
for item in items_to_enqueue:
pipeline.manager.addChange(item.change, quiet=True)
pipeline.manager.addChange(
item.change,
enqueue_time=item.enqueue_time,
quiet=True)
while pipeline.manager.processQueue():
pass
@ -973,7 +976,7 @@ class BasePipelineManager(object):
item.change.project)
return False
def addChange(self, change, quiet=False):
def addChange(self, change, quiet=False, enqueue_time=None):
self.log.debug("Considering adding change %s" % change)
if self.isChangeAlreadyInQueue(change):
self.log.debug("Change %s is already in queue, ignoring" % change)
@ -1000,6 +1003,8 @@ class BasePipelineManager(object):
if len(self.pipeline.start_actions) > 0:
self.reportStart(change)
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
self.reportStats(item)
self.enqueueChangesBehind(change, quiet)
else: