Use RecordingLaunchServer to run all tests

Instead of having an entirely fake launch server and an entirely
real one, use the real launch server for everything, but add an
option to not actually execute ansible.  This will exercise most
of the code in the launcher, remove unnecessary fakes, and still
maintain the speed benefit of not running ansible for every test
of scheduler behavior.

Some tests are still run with the launcher actually running ansible,
and that facility will continue to be available as we create tests
that validate actual ansible behavior.

Change-Id: Ie0fbba2b786a5aeb1c603597af30fcd728a8cec8
This commit is contained in:
James E. Blair 2016-08-02 10:00:27 -07:00
parent 6cdf2f2047
commit e1767bc263
2 changed files with 79 additions and 103 deletions

View File

@ -529,13 +529,12 @@ class FakeStatsd(threading.Thread):
os.write(self.wake_write, '1\n')
class FakeBuild(threading.Thread):
class FakeBuild(object):
log = logging.getLogger("zuul.test")
def __init__(self, worker, job, number, node):
threading.Thread.__init__(self)
def __init__(self, launch_server, job, number, node):
self.daemon = True
self.worker = worker
self.launch_server = launch_server
self.job = job
self.number = number
self.node = node
@ -547,6 +546,9 @@ class FakeBuild(threading.Thread):
self.aborted = False
self.created = time.time()
self.run_error = False
self.changes = None
if 'ZUUL_CHANGE_IDS' in self.parameters:
self.changes = self.parameters['ZUUL_CHANGE_IDS']
def release(self):
self.wait_condition.acquire()
@ -576,7 +578,7 @@ class FakeBuild(threading.Thread):
'url': 'https://server/job/%s/%s/' % (self.name, self.number),
'name': self.name,
'number': self.number,
'manager': self.worker.worker_id,
'manager': self.launch_server.worker.worker_id,
'worker_name': 'My Worker',
'worker_hostname': 'localhost',
'worker_ips': ['127.0.0.1', '192.168.1.1'],
@ -592,81 +594,80 @@ class FakeBuild(threading.Thread):
self.log.debug('Sent WorkData packet with %s' % json.dumps(data))
self.job.sendWorkStatus(0, 100)
if self.worker.hold_jobs_in_build:
if self.launch_server.hold_jobs_in_build:
self.log.debug('Holding build %s' % self.unique)
self._wait()
self.log.debug("Build %s continuing" % self.unique)
self.worker.lock.acquire()
result = 'SUCCESS'
if (('ZUUL_REF' in self.parameters) and
self.worker.shouldFailTest(self.name,
self.parameters['ZUUL_REF'])):
self.launch_server.shouldFailTest(self.name,
self.parameters['ZUUL_REF'])):
result = 'FAILURE'
if self.aborted:
result = 'ABORTED'
if self.run_error:
work_fail = True
result = 'RUN_ERROR'
else:
data['result'] = result
data['node_labels'] = ['bare-necessities']
data['node_name'] = 'foo'
work_fail = False
changes = None
if 'ZUUL_CHANGE_IDS' in self.parameters:
changes = self.parameters['ZUUL_CHANGE_IDS']
self.worker.build_history.append(
BuildHistory(name=self.name, number=self.number,
result=result, changes=changes, node=self.node,
uuid=self.unique, parameters=self.parameters,
pipeline=self.parameters['ZUUL_PIPELINE'])
)
self.job.sendWorkData(json.dumps(data))
if work_fail:
self.job.sendWorkFail()
else:
self.job.sendWorkComplete(json.dumps(data))
del self.worker.gearman_jobs[self.job.unique]
self.worker.running_builds.remove(self)
self.worker.lock.release()
return result
class RecordingLaunchServer(zuul.launcher.server.LaunchServer):
def __init__(self, *args, **kw):
self._run_ansible = kw.pop('_run_ansible', False)
super(RecordingLaunchServer, self).__init__(*args, **kw)
self.job_history = []
self.hold_jobs_in_build = False
self.lock = threading.Lock()
self.running_builds = []
self.build_history = []
self._build_counter_lock = threading.Lock()
self.build_counter = 0
self.fail_tests = {}
def launch(self, job):
self.job_history.append(job)
job.data = []
def addFailTest(self, name, change):
l = self.fail_tests.get(name, [])
l.append(change)
self.fail_tests[name] = l
def sendWorkComplete(data=b''):
job.data.append(data)
params = json.loads(job.arguments)
result = json.loads(job.data[-1])
build = BuildHistory(job=job,
uuid=job.unique,
name=params['job'],
parameters=params,
result=result['result'])
self.build_history.append(build)
gear.WorkerJob.sendWorkComplete(job, data)
def shouldFailTest(self, name, ref):
l = self.fail_tests.get(name, [])
for change in l:
if self.test.ref_has_change(ref, change):
return True
return False
job.sendWorkComplete = sendWorkComplete
super(RecordingLaunchServer, self).launch(job)
def runAnsible(self, jobdir, job):
with self._build_counter_lock:
self.build_counter += 1
build_counter = self.build_counter
node = None
build = FakeBuild(self, job, build_counter, node)
job.build = build
self.running_builds.append(build)
if self._run_ansible:
result = super(RecordingLaunchServer, self).runAnsible(jobdir, job)
else:
result = build.run()
self.lock.acquire()
self.build_history.append(
BuildHistory(name=build.name, number=build.number,
result=result, changes=build.changes, node=build.node,
uuid=build.unique, parameters=build.parameters,
pipeline=build.parameters['ZUUL_PIPELINE'])
)
if build:
self.running_builds.remove(build)
self.lock.release()
return result
class FakeWorker(gear.Worker):
def __init__(self, worker_id, test):
super(FakeWorker, self).__init__(worker_id)
self.gearman_jobs = {}
self.build_history = []
self.running_builds = []
self.build_counter = 0
@ -693,7 +694,6 @@ class FakeWorker(gear.Worker):
node = None
build = FakeBuild(self, job, self.build_counter, node)
job.build = build
self.gearman_jobs[job.unique] = job
self.build_counter += 1
self.running_builds.append(build)
@ -894,22 +894,7 @@ class BaseTestCase(testtools.TestCase):
class ZuulTestCase(BaseTestCase):
config_file = 'zuul.conf'
def _startWorker(self):
self.worker = FakeWorker('fake_worker', self)
self.worker.addServer('127.0.0.1', self.gearman_server.port)
self.gearman_server.worker = self.worker
self.builds = self.worker.running_builds
self.history = self.worker.build_history
def _stopWorker(self):
self.worker.shutdown()
def _lockWorker(self):
self.worker.lock.acquire()
def _unlockWorker(self):
self.worker.lock.release()
run_ansible = False
def _startMerger(self):
self.merge_server = zuul.merger.server.MergeServer(self.config,
@ -1003,15 +988,20 @@ class ZuulTestCase(BaseTestCase):
urllib.request.urlopen = URLOpenerFactory
self._startMerger()
self._startWorker()
self.launcher = zuul.launcher.client.LaunchClient(
self.launch_server = RecordingLaunchServer(
self.config, self.connections, _run_ansible=self.run_ansible)
self.launch_server.start()
self.history = self.launch_server.build_history
self.builds = self.launch_server.running_builds
self.launch_client = zuul.launcher.client.LaunchClient(
self.config, self.sched, self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
self.nodepool = zuul.nodepool.Nodepool(self.sched)
self.sched.setLauncher(self.launcher)
self.sched.setLauncher(self.launch_client)
self.sched.setMerger(self.merge_client)
self.sched.setNodepool(self.nodepool)
@ -1024,7 +1014,7 @@ class ZuulTestCase(BaseTestCase):
self.sched.resume()
self.webapp.start()
self.rpc.start()
self.launcher.gearman.waitForServer()
self.launch_client.gearman.waitForServer()
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
@ -1153,11 +1143,11 @@ class ZuulTestCase(BaseTestCase):
def shutdown(self):
self.log.debug("Shutting down after tests")
self.launcher.stop()
self.launch_client.stop()
self.merge_server.stop()
self.merge_server.join()
self.merge_client.stop()
self._stopWorker()
self.launch_server.stop()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@ -1279,7 +1269,7 @@ class ZuulTestCase(BaseTestCase):
return parameters[name]
def resetGearmanServer(self):
self.worker.setFunctions([])
self.launch_server.worker.setFunctions([])
while True:
done = True
for connection in self.gearman_server.active_connections:
@ -1295,29 +1285,29 @@ class ZuulTestCase(BaseTestCase):
def haveAllBuildsReported(self):
# See if Zuul is waiting on a meta job to complete
if self.launcher.meta_jobs:
if self.launch_client.meta_jobs:
return False
# Find out if every build that the worker has completed has been
# reported back to Zuul. If it hasn't then that means a Gearman
# event is still in transit and the system is not stable.
for build in self.history:
zbuild = self.launcher.builds.get(build.uuid)
zbuild = self.launch_client.builds.get(build.uuid)
if not zbuild:
# It has already been reported
continue
# It hasn't been reported yet.
return False
# Make sure that none of the worker connections are in GRAB_WAIT
for connection in self.worker.active_connections:
for connection in self.launch_server.worker.active_connections:
if connection.state == 'GRAB_WAIT':
return False
return True
def areAllBuildsWaiting(self):
builds = self.launcher.builds.values()
builds = self.launch_client.builds.values()
for build in builds:
client_job = None
for conn in self.launcher.gearman.active_connections:
for conn in self.launch_client.gearman.active_connections:
for j in conn.related_jobs.values():
if j.unique == build.uuid:
client_job = j
@ -1372,7 +1362,7 @@ class ZuulTestCase(BaseTestCase):
raise Exception("Timeout waiting for Zuul to settle")
# Make sure no new events show up while we're checking
self._lockWorker()
self.launch_server.lock.acquire()
# have all build states propagated to zuul?
if self.haveAllBuildsReported():
# Join ensures that the queue is empty _and_ events have been
@ -1384,11 +1374,11 @@ class ZuulTestCase(BaseTestCase):
self.haveAllBuildsReported() and
self.areAllBuildsWaiting()):
self.sched.run_handler_lock.release()
self._unlockWorker()
self.launch_server.lock.release()
self.log.debug("...settled.")
return
self.sched.run_handler_lock.release()
self._unlockWorker()
self.launch_server.lock.release()
self.sched.wake_event.wait(0.1)
def countJobResults(self, jobs, result):
@ -1472,19 +1462,4 @@ tenants:
class AnsibleZuulTestCase(ZuulTestCase):
"""ZuulTestCase but with an actual ansible launcher running"""
def _startWorker(self):
self.ansible_server = RecordingLaunchServer(
self.config, self.connections)
self.ansible_server.start()
self.history = self.ansible_server.build_history
self.worker = self.ansible_server.worker
def _stopWorker(self):
self.ansible_server.stop()
def _lockWorker(self):
pass
def _unlockWorker(self):
pass
run_ansible = True

View File

@ -243,7 +243,7 @@ class LaunchServer(object):
# TODOv3: Ansible the ansible thing here.
self.prepareAnsibleFiles(jobdir, args)
result = self.runAnsible(jobdir)
result = self.runAnsible(jobdir, job)
data = {
'url': 'https://server/job',
@ -277,7 +277,8 @@ class LaunchServer(object):
config.write('[defaults]\n')
config.write('hostfile = %s\n' % jobdir.inventory)
def runAnsible(self, jobdir):
def runAnsible(self, jobdir, job):
# Job is included here for the benefit of the test framework.
proc = subprocess.Popen(
['ansible-playbook', jobdir.playbook],
cwd=jobdir.ansible_root,