diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index c199345d8f..20ad4ec3b0 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -459,6 +459,7 @@ class FakeJenkinsJob(threading.Thread): self.jenkins.fakeAddHistory(name=self.name, number=self.number, result=result) + self.jenkins.lock.acquire() self.callback.jenkins_endpoint(FakeJenkinsEvent(self.name, self.number, self.parameters, @@ -470,6 +471,7 @@ class FakeJenkinsJob(threading.Thread): 'FINISHED', result)) self.jenkins.all_jobs.remove(self) + self.jenkins.lock.release() class FakeJenkins(object): @@ -485,6 +487,7 @@ class FakeJenkins(object): self.hold_jobs_in_build = False self.fail_tests = {} self.nonexistent_jobs = [] + self.lock = threading.Lock() def fakeEnqueue(self, job): self.queue.append(job) @@ -691,17 +694,22 @@ class testScheduler(unittest.TestCase): print self.sched.result_event_queue.empty(), print self.fake_gerrit.event_queue.empty(), raise Exception("Timeout waiting for Zuul to settle") + # Make sure our fake jenkins doesn't end any jobs + # (and therefore, emit events) while we're checking + self.fake_jenkins.lock.acquire() + # Join ensures that the queue is empty _and_ events have been + # processed self.fake_gerrit.event_queue.join() - self.sched.queue_lock.acquire() + self.sched.trigger_event_queue.join() + self.sched.result_event_queue.join() if (self.sched.trigger_event_queue.empty() and self.sched.result_event_queue.empty() and self.fake_gerrit.event_queue.empty() and - len(self.jenkins.lost_threads) == 0 and self.fake_jenkins.fakeAllWaiting()): - self.sched.queue_lock.release() + self.fake_jenkins.lock.release() self.log.debug("...settled.") return - self.sched.queue_lock.release() + self.fake_jenkins.lock.release() self.sched.wake_event.wait(0.1) def countJobResults(self, jobs, result): diff --git a/zuul/launcher/jenkins.py b/zuul/launcher/jenkins.py index ee73f8ae2b..0a79e827b6 100644 --- a/zuul/launcher/jenkins.py +++ b/zuul/launcher/jenkins.py @@ -219,9 +219,6 @@ class Jenkins(object): self.callback_thread.start() self.cleanup_thread = JenkinsCleanup(self) self.cleanup_thread.start() - # Keep track of threads that will report a lost build in the future, - # in aid of testing - self.lost_threads = [] def stop(self): self.cleanup_thread.stop() @@ -327,18 +324,9 @@ class Jenkins(object): "declaring lost" % build) # To keep the queue moving, declare this as a lost build # so that the change will get dropped. - t = threading.Thread(target=self.declareBuildLost, - args=(build,)) - self.lost_threads.append(t) - t.start() + self.onBuildCompleted(build.uuid, 'LOST', None, None) return build - def declareBuildLost(self, build): - # Call this from a new thread to invoke onBuildCompleted from - # a thread that has the queue lock. - self.onBuildCompleted(build.uuid, 'LOST', None, None) - self.lost_threads.remove(threading.currentThread()) - def findBuildInQueue(self, build): for item in self.jenkins.get_queue_info(): if 'actions' not in item: diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 8ed369d137..e18035591e 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -37,7 +37,6 @@ class Scheduler(threading.Thread): threading.Thread.__init__(self) self.wake_event = threading.Event() self.reconfigure_complete_event = threading.Event() - self.queue_lock = threading.Lock() self._pause = False self._reconfigure = False self._exit = False @@ -223,18 +222,14 @@ class Scheduler(threading.Thread): statsd.incr('gerrit.event.%s' % event.type) except: self.log.exception("Exception reporting event stats") - self.queue_lock.acquire() self.trigger_event_queue.put(event) - self.queue_lock.release() self.wake_event.set() self.log.debug("Done adding trigger event: %s" % event) def onBuildStarted(self, build): self.log.debug("Adding start event for build: %s" % build) build.start_time = time.time() - self.queue_lock.acquire() self.result_event_queue.put(('started', build)) - self.queue_lock.release() self.wake_event.set() self.log.debug("Done adding start event for build: %s" % build) @@ -250,9 +245,7 @@ class Scheduler(threading.Thread): statsd.incr(key) except: self.log.exception("Exception reporting runtime stats") - self.queue_lock.acquire() self.result_event_queue.put(('completed', build)) - self.queue_lock.release() self.wake_event.set() self.log.debug("Done adding complete event for build: %s" % build) @@ -360,7 +353,6 @@ class Scheduler(threading.Thread): if self._stopped: return self.log.debug("Run handler awake") - self.queue_lock.acquire() try: if not self._pause: if not self.trigger_event_queue.empty(): @@ -381,7 +373,6 @@ class Scheduler(threading.Thread): self.wake_event.set() except: self.log.exception("Exception in run handler:") - self.queue_lock.release() def process_event_queue(self): self.log.debug("Fetching trigger event") @@ -390,6 +381,7 @@ class Scheduler(threading.Thread): project = self.projects.get(event.project_name) if not project: self.log.warning("Project %s not found" % event.project_name) + self.trigger_event_queue.task_done() return for pipeline in self.pipelines.values(): @@ -400,6 +392,7 @@ class Scheduler(threading.Thread): self.log.info("Adding %s, %s to %s" % (project, change, pipeline)) pipeline.manager.addChange(change) + self.trigger_event_queue.task_done() def process_result_queue(self): self.log.debug("Fetching result event") @@ -408,11 +401,14 @@ class Scheduler(threading.Thread): for pipeline in self.pipelines.values(): if event_type == 'started': if pipeline.manager.onBuildStarted(build): + self.result_event_queue.task_done() return elif event_type == 'completed': if pipeline.manager.onBuildCompleted(build): + self.result_event_queue.task_done() return self.log.warning("Build %s not found by any queue manager" % (build)) + self.result_event_queue.task_done() def formatStatusHTML(self): ret = '
'