Re-launch a job if the worker fails to run it

If a job is complete with no build result, it has failed to
run to completion.  In this case, discard the previous build
and launch a replacement (in the next run of the queue processor).

Change-Id: Ib8fc245a5becb1e7deb13f1ea0721fdb6ceb9f6f
This commit is contained in:
James E. Blair 2013-08-23 15:17:33 -07:00
parent 0aac487cf7
commit 4a28a88381
3 changed files with 39 additions and 4 deletions

View File

@ -443,6 +443,7 @@ class FakeBuild(threading.Thread):
self.aborted = False
self.created = time.time()
self.description = ''
self.run_error = False
def release(self):
self.wait_condition.acquire()
@ -492,7 +493,13 @@ class FakeBuild(threading.Thread):
if self.aborted:
result = 'ABORTED'
data = {'result': result}
if self.run_error:
work_fail = True
result = 'RUN_ERROR'
else:
data['result'] = result
work_fail = False
changes = None
if 'ZUUL_CHANGE_IDS' in self.parameters:
changes = self.parameters['ZUUL_CHANGE_IDS']
@ -504,7 +511,11 @@ class FakeBuild(threading.Thread):
pipeline=self.parameters['ZUUL_PIPELINE'])
)
self.job.sendWorkComplete(json.dumps(data))
self.job.sendWorkData(json.dumps(data))
if work_fail:
self.job.sendWorkFail()
else:
self.job.sendWorkComplete(json.dumps(data))
del self.worker.gearman_jobs[self.job.unique]
self.worker.running_builds.remove(self)
self.worker.lock.release()
@ -2381,6 +2392,21 @@ class TestScheduler(testtools.TestCase):
self.assertEqual(D.data['status'], 'MERGED')
self.assertEqual(D.reported, 2)
def test_rerun_on_error(self):
"Test that if a worker fails to run a job, it is run again"
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.builds[0].run_error = True
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
self.assertEqual(self.countJobResults(self.history, 'RUN_ERROR'), 1)
self.assertEqual(self.countJobResults(self.history, 'SUCCESS'), 3)
def test_statsd(self):
"Test each of the statsd methods used in the scheduler"
import extras

View File

@ -359,7 +359,7 @@ class Gearman(object):
data = getJobData(job)
result = data.get('result')
if result is None:
result = 'LOST'
build.retry = True
self.log.info("Build %s complete, result %s" %
(job, result))
build.result = result

View File

@ -162,7 +162,9 @@ class Pipeline(object):
return self.isHoldingFollowingChanges(item.item_ahead)
def setResult(self, item, build):
if build.result != 'SUCCESS':
if build.retry:
item.removeBuild(build)
elif build.result != 'SUCCESS':
# Get a JobTree from a Job so we can find only its dependent jobs
root = self.getJobTree(item.change.project)
tree = root.getJobTreeForJob(build.job)
@ -537,6 +539,7 @@ class Build(object):
self.estimated_time = None
self.pipeline = None
self.canceled = False
self.retry = False
self.parameters = {}
def __repr__(self):
@ -572,6 +575,9 @@ class BuildSet(object):
self.builds[build.job.name] = build
build.build_set = self
def removeBuild(self, build):
del self.builds[build.job.name]
def getBuild(self, job_name):
return self.builds.get(job_name)
@ -609,6 +615,9 @@ class QueueItem(object):
self.current_build_set.addBuild(build)
build.pipeline = self.pipeline
def removeBuild(self, build):
self.current_build_set.removeBuild(build)
def setReportedResult(self, result):
self.current_build_set.result = result