diff --git a/tests/base.py b/tests/base.py index 8017c51349..e1e568f582 100755 --- a/tests/base.py +++ b/tests/base.py @@ -529,13 +529,12 @@ class FakeStatsd(threading.Thread): os.write(self.wake_write, '1\n') -class FakeBuild(threading.Thread): +class FakeBuild(object): log = logging.getLogger("zuul.test") - def __init__(self, worker, job, number, node): - threading.Thread.__init__(self) + def __init__(self, launch_server, job, number, node): self.daemon = True - self.worker = worker + self.launch_server = launch_server self.job = job self.number = number self.node = node @@ -547,6 +546,9 @@ class FakeBuild(threading.Thread): self.aborted = False self.created = time.time() self.run_error = False + self.changes = None + if 'ZUUL_CHANGE_IDS' in self.parameters: + self.changes = self.parameters['ZUUL_CHANGE_IDS'] def release(self): self.wait_condition.acquire() @@ -576,7 +578,7 @@ class FakeBuild(threading.Thread): 'url': 'https://server/job/%s/%s/' % (self.name, self.number), 'name': self.name, 'number': self.number, - 'manager': self.worker.worker_id, + 'manager': self.launch_server.worker.worker_id, 'worker_name': 'My Worker', 'worker_hostname': 'localhost', 'worker_ips': ['127.0.0.1', '192.168.1.1'], @@ -592,81 +594,80 @@ class FakeBuild(threading.Thread): self.log.debug('Sent WorkData packet with %s' % json.dumps(data)) self.job.sendWorkStatus(0, 100) - if self.worker.hold_jobs_in_build: + if self.launch_server.hold_jobs_in_build: self.log.debug('Holding build %s' % self.unique) self._wait() self.log.debug("Build %s continuing" % self.unique) - self.worker.lock.acquire() - result = 'SUCCESS' if (('ZUUL_REF' in self.parameters) and - self.worker.shouldFailTest(self.name, - self.parameters['ZUUL_REF'])): + self.launch_server.shouldFailTest(self.name, + self.parameters['ZUUL_REF'])): result = 'FAILURE' if self.aborted: result = 'ABORTED' if self.run_error: - work_fail = True result = 'RUN_ERROR' - else: - data['result'] = result - data['node_labels'] = ['bare-necessities'] - data['node_name'] = 'foo' - work_fail = False - changes = None - if 'ZUUL_CHANGE_IDS' in self.parameters: - changes = self.parameters['ZUUL_CHANGE_IDS'] - - self.worker.build_history.append( - BuildHistory(name=self.name, number=self.number, - result=result, changes=changes, node=self.node, - uuid=self.unique, parameters=self.parameters, - pipeline=self.parameters['ZUUL_PIPELINE']) - ) - - self.job.sendWorkData(json.dumps(data)) - if work_fail: - self.job.sendWorkFail() - else: - self.job.sendWorkComplete(json.dumps(data)) - del self.worker.gearman_jobs[self.job.unique] - self.worker.running_builds.remove(self) - self.worker.lock.release() + return result class RecordingLaunchServer(zuul.launcher.server.LaunchServer): def __init__(self, *args, **kw): + self._run_ansible = kw.pop('_run_ansible', False) super(RecordingLaunchServer, self).__init__(*args, **kw) - self.job_history = [] + self.hold_jobs_in_build = False + self.lock = threading.Lock() + self.running_builds = [] self.build_history = [] + self._build_counter_lock = threading.Lock() + self.build_counter = 0 + self.fail_tests = {} - def launch(self, job): - self.job_history.append(job) - job.data = [] + def addFailTest(self, name, change): + l = self.fail_tests.get(name, []) + l.append(change) + self.fail_tests[name] = l - def sendWorkComplete(data=b''): - job.data.append(data) - params = json.loads(job.arguments) - result = json.loads(job.data[-1]) - build = BuildHistory(job=job, - uuid=job.unique, - name=params['job'], - parameters=params, - result=result['result']) - self.build_history.append(build) - gear.WorkerJob.sendWorkComplete(job, data) + def shouldFailTest(self, name, ref): + l = self.fail_tests.get(name, []) + for change in l: + if self.test.ref_has_change(ref, change): + return True + return False - job.sendWorkComplete = sendWorkComplete - super(RecordingLaunchServer, self).launch(job) + def runAnsible(self, jobdir, job): + with self._build_counter_lock: + self.build_counter += 1 + build_counter = self.build_counter + node = None + build = FakeBuild(self, job, build_counter, node) + job.build = build + + self.running_builds.append(build) + + if self._run_ansible: + result = super(RecordingLaunchServer, self).runAnsible(jobdir, job) + else: + result = build.run() + + self.lock.acquire() + self.build_history.append( + BuildHistory(name=build.name, number=build.number, + result=result, changes=build.changes, node=build.node, + uuid=build.unique, parameters=build.parameters, + pipeline=build.parameters['ZUUL_PIPELINE']) + ) + if build: + self.running_builds.remove(build) + self.lock.release() + return result class FakeWorker(gear.Worker): def __init__(self, worker_id, test): super(FakeWorker, self).__init__(worker_id) - self.gearman_jobs = {} self.build_history = [] self.running_builds = [] self.build_counter = 0 @@ -693,7 +694,6 @@ class FakeWorker(gear.Worker): node = None build = FakeBuild(self, job, self.build_counter, node) job.build = build - self.gearman_jobs[job.unique] = job self.build_counter += 1 self.running_builds.append(build) @@ -894,22 +894,7 @@ class BaseTestCase(testtools.TestCase): class ZuulTestCase(BaseTestCase): config_file = 'zuul.conf' - - def _startWorker(self): - self.worker = FakeWorker('fake_worker', self) - self.worker.addServer('127.0.0.1', self.gearman_server.port) - self.gearman_server.worker = self.worker - self.builds = self.worker.running_builds - self.history = self.worker.build_history - - def _stopWorker(self): - self.worker.shutdown() - - def _lockWorker(self): - self.worker.lock.acquire() - - def _unlockWorker(self): - self.worker.lock.release() + run_ansible = False def _startMerger(self): self.merge_server = zuul.merger.server.MergeServer(self.config, @@ -1003,15 +988,20 @@ class ZuulTestCase(BaseTestCase): urllib.request.urlopen = URLOpenerFactory self._startMerger() - self._startWorker() - self.launcher = zuul.launcher.client.LaunchClient( + self.launch_server = RecordingLaunchServer( + self.config, self.connections, _run_ansible=self.run_ansible) + self.launch_server.start() + self.history = self.launch_server.build_history + self.builds = self.launch_server.running_builds + + self.launch_client = zuul.launcher.client.LaunchClient( self.config, self.sched, self.swift) self.merge_client = zuul.merger.client.MergeClient( self.config, self.sched) self.nodepool = zuul.nodepool.Nodepool(self.sched) - self.sched.setLauncher(self.launcher) + self.sched.setLauncher(self.launch_client) self.sched.setMerger(self.merge_client) self.sched.setNodepool(self.nodepool) @@ -1024,7 +1014,7 @@ class ZuulTestCase(BaseTestCase): self.sched.resume() self.webapp.start() self.rpc.start() - self.launcher.gearman.waitForServer() + self.launch_client.gearman.waitForServer() self.addCleanup(self.assertFinalState) self.addCleanup(self.shutdown) @@ -1153,11 +1143,11 @@ class ZuulTestCase(BaseTestCase): def shutdown(self): self.log.debug("Shutting down after tests") - self.launcher.stop() + self.launch_client.stop() self.merge_server.stop() self.merge_server.join() self.merge_client.stop() - self._stopWorker() + self.launch_server.stop() self.sched.stop() self.sched.join() self.statsd.stop() @@ -1279,7 +1269,7 @@ class ZuulTestCase(BaseTestCase): return parameters[name] def resetGearmanServer(self): - self.worker.setFunctions([]) + self.launch_server.worker.setFunctions([]) while True: done = True for connection in self.gearman_server.active_connections: @@ -1295,29 +1285,29 @@ class ZuulTestCase(BaseTestCase): def haveAllBuildsReported(self): # See if Zuul is waiting on a meta job to complete - if self.launcher.meta_jobs: + if self.launch_client.meta_jobs: return False # Find out if every build that the worker has completed has been # reported back to Zuul. If it hasn't then that means a Gearman # event is still in transit and the system is not stable. for build in self.history: - zbuild = self.launcher.builds.get(build.uuid) + zbuild = self.launch_client.builds.get(build.uuid) if not zbuild: # It has already been reported continue # It hasn't been reported yet. return False # Make sure that none of the worker connections are in GRAB_WAIT - for connection in self.worker.active_connections: + for connection in self.launch_server.worker.active_connections: if connection.state == 'GRAB_WAIT': return False return True def areAllBuildsWaiting(self): - builds = self.launcher.builds.values() + builds = self.launch_client.builds.values() for build in builds: client_job = None - for conn in self.launcher.gearman.active_connections: + for conn in self.launch_client.gearman.active_connections: for j in conn.related_jobs.values(): if j.unique == build.uuid: client_job = j @@ -1372,7 +1362,7 @@ class ZuulTestCase(BaseTestCase): raise Exception("Timeout waiting for Zuul to settle") # Make sure no new events show up while we're checking - self._lockWorker() + self.launch_server.lock.acquire() # have all build states propogated to zuul? if self.haveAllBuildsReported(): # Join ensures that the queue is empty _and_ events have been @@ -1384,11 +1374,11 @@ class ZuulTestCase(BaseTestCase): self.haveAllBuildsReported() and self.areAllBuildsWaiting()): self.sched.run_handler_lock.release() - self._unlockWorker() + self.launch_server.lock.release() self.log.debug("...settled.") return self.sched.run_handler_lock.release() - self._unlockWorker() + self.launch_server.lock.release() self.sched.wake_event.wait(0.1) def countJobResults(self, jobs, result): @@ -1472,19 +1462,4 @@ tenants: class AnsibleZuulTestCase(ZuulTestCase): """ZuulTestCase but with an actual ansible launcher running""" - - def _startWorker(self): - self.ansible_server = RecordingLaunchServer( - self.config, self.connections) - self.ansible_server.start() - self.history = self.ansible_server.build_history - self.worker = self.ansible_server.worker - - def _stopWorker(self): - self.ansible_server.stop() - - def _lockWorker(self): - pass - - def _unlockWorker(self): - pass + run_ansible = True diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index 7cac5f7a51..45c42dd92e 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -243,7 +243,7 @@ class LaunchServer(object): # TODOv3: Ansible the ansible thing here. self.prepareAnsibleFiles(jobdir, args) - result = self.runAnsible(jobdir) + result = self.runAnsible(jobdir, job) data = { 'url': 'https://server/job', @@ -277,7 +277,8 @@ class LaunchServer(object): config.write('[defaults]\n') config.write('hostfile = %s\n' % jobdir.inventory) - def runAnsible(self, jobdir): + def runAnsible(self, jobdir, job): + # Job is included here for the benefit of the test framework. proc = subprocess.Popen( ['ansible-playbook', jobdir.playbook], cwd=jobdir.ansible_root,