Remove the scheduler queue lock.
Instead, use queue task_done calls to indicate that the scheduler has finished processing events. This lets the tests know when the queues are both empty and all requests have been handled. Add a lock around reporting complete events in fake jenkins jobs so that waitUntilSettled can be assured that no new events will arrive. Directly report LOST builds when a job doesn't exist, rather than spawning a new thread (which was only done to work around the lock). Change-Id: I32ad46648c82d7458fb5be779c62ac5b57857674 Reviewed-on: https://review.openstack.org/19330 Reviewed-by: Clark Boylan <clark.boylan@gmail.com> Approved: James E. Blair <corvus@inaugust.com> Tested-by: Jenkins
This commit is contained in:
parent
a7c5aa3fa6
commit
ff79197eba
@ -459,6 +459,7 @@ class FakeJenkinsJob(threading.Thread):
|
||||
|
||||
self.jenkins.fakeAddHistory(name=self.name, number=self.number,
|
||||
result=result)
|
||||
self.jenkins.lock.acquire()
|
||||
self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name,
|
||||
self.number,
|
||||
self.parameters,
|
||||
@ -470,6 +471,7 @@ class FakeJenkinsJob(threading.Thread):
|
||||
'FINISHED',
|
||||
result))
|
||||
self.jenkins.all_jobs.remove(self)
|
||||
self.jenkins.lock.release()
|
||||
|
||||
|
||||
class FakeJenkins(object):
|
||||
@ -485,6 +487,7 @@ class FakeJenkins(object):
|
||||
self.hold_jobs_in_build = False
|
||||
self.fail_tests = {}
|
||||
self.nonexistent_jobs = []
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def fakeEnqueue(self, job):
|
||||
self.queue.append(job)
|
||||
@ -691,17 +694,22 @@ class testScheduler(unittest.TestCase):
|
||||
print self.sched.result_event_queue.empty(),
|
||||
print self.fake_gerrit.event_queue.empty(),
|
||||
raise Exception("Timeout waiting for Zuul to settle")
|
||||
# Make sure our fake jenkins doesn't end any jobs
|
||||
# (and therefore, emit events) while we're checking
|
||||
self.fake_jenkins.lock.acquire()
|
||||
# Join ensures that the queue is empty _and_ events have been
|
||||
# processed
|
||||
self.fake_gerrit.event_queue.join()
|
||||
self.sched.queue_lock.acquire()
|
||||
self.sched.trigger_event_queue.join()
|
||||
self.sched.result_event_queue.join()
|
||||
if (self.sched.trigger_event_queue.empty() and
|
||||
self.sched.result_event_queue.empty() and
|
||||
self.fake_gerrit.event_queue.empty() and
|
||||
len(self.jenkins.lost_threads) == 0 and
|
||||
self.fake_jenkins.fakeAllWaiting()):
|
||||
self.sched.queue_lock.release()
|
||||
self.fake_jenkins.lock.release()
|
||||
self.log.debug("...settled.")
|
||||
return
|
||||
self.sched.queue_lock.release()
|
||||
self.fake_jenkins.lock.release()
|
||||
self.sched.wake_event.wait(0.1)
|
||||
|
||||
def countJobResults(self, jobs, result):
|
||||
|
@ -219,9 +219,6 @@ class Jenkins(object):
|
||||
self.callback_thread.start()
|
||||
self.cleanup_thread = JenkinsCleanup(self)
|
||||
self.cleanup_thread.start()
|
||||
# Keep track of threads that will report a lost build in the future,
|
||||
# in aid of testing
|
||||
self.lost_threads = []
|
||||
|
||||
def stop(self):
|
||||
self.cleanup_thread.stop()
|
||||
@ -327,18 +324,9 @@ class Jenkins(object):
|
||||
"declaring lost" % build)
|
||||
# To keep the queue moving, declare this as a lost build
|
||||
# so that the change will get dropped.
|
||||
t = threading.Thread(target=self.declareBuildLost,
|
||||
args=(build,))
|
||||
self.lost_threads.append(t)
|
||||
t.start()
|
||||
self.onBuildCompleted(build.uuid, 'LOST', None, None)
|
||||
return build
|
||||
|
||||
def declareBuildLost(self, build):
|
||||
# Call this from a new thread to invoke onBuildCompleted from
|
||||
# a thread that has the queue lock.
|
||||
self.onBuildCompleted(build.uuid, 'LOST', None, None)
|
||||
self.lost_threads.remove(threading.currentThread())
|
||||
|
||||
def findBuildInQueue(self, build):
|
||||
for item in self.jenkins.get_queue_info():
|
||||
if 'actions' not in item:
|
||||
|
@ -37,7 +37,6 @@ class Scheduler(threading.Thread):
|
||||
threading.Thread.__init__(self)
|
||||
self.wake_event = threading.Event()
|
||||
self.reconfigure_complete_event = threading.Event()
|
||||
self.queue_lock = threading.Lock()
|
||||
self._pause = False
|
||||
self._reconfigure = False
|
||||
self._exit = False
|
||||
@ -223,18 +222,14 @@ class Scheduler(threading.Thread):
|
||||
statsd.incr('gerrit.event.%s' % event.type)
|
||||
except:
|
||||
self.log.exception("Exception reporting event stats")
|
||||
self.queue_lock.acquire()
|
||||
self.trigger_event_queue.put(event)
|
||||
self.queue_lock.release()
|
||||
self.wake_event.set()
|
||||
self.log.debug("Done adding trigger event: %s" % event)
|
||||
|
||||
def onBuildStarted(self, build):
|
||||
self.log.debug("Adding start event for build: %s" % build)
|
||||
build.start_time = time.time()
|
||||
self.queue_lock.acquire()
|
||||
self.result_event_queue.put(('started', build))
|
||||
self.queue_lock.release()
|
||||
self.wake_event.set()
|
||||
self.log.debug("Done adding start event for build: %s" % build)
|
||||
|
||||
@ -250,9 +245,7 @@ class Scheduler(threading.Thread):
|
||||
statsd.incr(key)
|
||||
except:
|
||||
self.log.exception("Exception reporting runtime stats")
|
||||
self.queue_lock.acquire()
|
||||
self.result_event_queue.put(('completed', build))
|
||||
self.queue_lock.release()
|
||||
self.wake_event.set()
|
||||
self.log.debug("Done adding complete event for build: %s" % build)
|
||||
|
||||
@ -360,7 +353,6 @@ class Scheduler(threading.Thread):
|
||||
if self._stopped:
|
||||
return
|
||||
self.log.debug("Run handler awake")
|
||||
self.queue_lock.acquire()
|
||||
try:
|
||||
if not self._pause:
|
||||
if not self.trigger_event_queue.empty():
|
||||
@ -381,7 +373,6 @@ class Scheduler(threading.Thread):
|
||||
self.wake_event.set()
|
||||
except:
|
||||
self.log.exception("Exception in run handler:")
|
||||
self.queue_lock.release()
|
||||
|
||||
def process_event_queue(self):
|
||||
self.log.debug("Fetching trigger event")
|
||||
@ -390,6 +381,7 @@ class Scheduler(threading.Thread):
|
||||
project = self.projects.get(event.project_name)
|
||||
if not project:
|
||||
self.log.warning("Project %s not found" % event.project_name)
|
||||
self.trigger_event_queue.task_done()
|
||||
return
|
||||
|
||||
for pipeline in self.pipelines.values():
|
||||
@ -400,6 +392,7 @@ class Scheduler(threading.Thread):
|
||||
self.log.info("Adding %s, %s to %s" %
|
||||
(project, change, pipeline))
|
||||
pipeline.manager.addChange(change)
|
||||
self.trigger_event_queue.task_done()
|
||||
|
||||
def process_result_queue(self):
|
||||
self.log.debug("Fetching result event")
|
||||
@ -408,11 +401,14 @@ class Scheduler(threading.Thread):
|
||||
for pipeline in self.pipelines.values():
|
||||
if event_type == 'started':
|
||||
if pipeline.manager.onBuildStarted(build):
|
||||
self.result_event_queue.task_done()
|
||||
return
|
||||
elif event_type == 'completed':
|
||||
if pipeline.manager.onBuildCompleted(build):
|
||||
self.result_event_queue.task_done()
|
||||
return
|
||||
self.log.warning("Build %s not found by any queue manager" % (build))
|
||||
self.result_event_queue.task_done()
|
||||
|
||||
def formatStatusHTML(self):
|
||||
ret = '<html><pre>'
|
||||
|
Loading…
Reference in New Issue
Block a user