Merge "Fix canceling builds in starting phase"

Zuul 2020-01-24 16:06:26 +00:00 committed by Gerrit Code Review
commit 83175b3fe0
4 changed files with 91 additions and 18 deletions
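Taken together, the four diffs below do three things: the executor now sends initial work data (worker name, hostname, and log-streaming port) as soon as it takes a job; the client decides whether a build has reached a worker by checking a private __gearman_worker attribute instead of build.url; and the job's preparation path gains self.aborted checks so that a build canceled while still in its starting phase answers with an ABORTED result instead of running to completion.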

tests/base.py

@@ -2210,6 +2210,12 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
             merger, items, repo_state)
         if not commit:  # merge conflict
             self.recordResult('MERGER_FAILURE')
+
+        for _ in iterate_timeout(60, 'wait for merge'):
+            if not self.executor_server.hold_jobs_in_start:
+                break
+            time.sleep(1)
+
         return commit

     def recordResult(self, result):
@@ -2284,6 +2290,10 @@ class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
         build.paused = False
         super().resume()

+    def _send_aborted(self):
+        self.recordResult('ABORTED')
+        super()._send_aborted()
+

 class RecordingMergeClient(zuul.merger.client.MergeClient):
@@ -2322,6 +2332,7 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
         self._ansible_manager_class = FakeAnsibleManager
         super(RecordingExecutorServer, self).__init__(*args, **kw)
         self.hold_jobs_in_build = False
+        self.hold_jobs_in_start = False
         self.lock = threading.Lock()
         self.running_builds = []
         self.build_history = []
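Both the fixture above and the new scheduler test below poll with iterate_timeout, the deadline helper defined in tests/base.py. A minimal sketch of its contract, assuming the usual implementation (yield an increasing counter until the deadline, then fail loudly):

import time

def iterate_timeout(max_seconds, purpose):
    # Yield an increasing counter until the deadline passes, then raise
    # so a stuck test reports what it was waiting for.
    start = time.time()
    count = 0
    while time.time() < start + max_seconds:
        count += 1
        yield count
    raise Exception("Timeout waiting for %s" % purpose)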

tests/unit/test_scheduler.py

@@ -2540,6 +2540,33 @@ class TestScheduler(ZuulTestCase):
         self.assertEqual(A.reported, 0, "Abandoned change should not report")
         self.assertEqual(B.reported, 1, "Change should report")

+    def test_cancel_starting_build(self):
+        "Test that a canceled build that is not processed yet is removed"
+
+        self.executor_server.hold_jobs_in_start = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+
+        for _ in iterate_timeout(30, 'Wait for build to be in starting phase'):
+            if self.executor_server.job_workers:
+                break
+
+        # Abandon change to cancel build
+        self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
+
+        for _ in iterate_timeout(30, 'Wait for executor:stop request'):
+            stop_jobs = [x for x in self.gearman_server.jobs_history
+                         if b'executor:stop' in x.name]
+            if stop_jobs:
+                break
+
+        self.executor_server.hold_jobs_in_start = False
+        self.waitUntilSettled()
+
+        self.assertHistory([
+            dict(name='project-merge', result='ABORTED')
+        ])
+
     def test_abandoned_not_timer(self):
         "Test that an abandoned change does not cancel timer jobs"
         # This test can not use simple_layout because it must start

zuul/executor/client.py

@ -389,8 +389,7 @@ class ExecutorClient(object):
log.debug("Build has no associated gearman job") log.debug("Build has no associated gearman job")
return False return False
# TODOv3(jeblair): make a nicer way of recording build start. if build.__gearman_worker is not None:
if build.url is not None:
log.debug("Build has already started") log.debug("Build has already started")
self.cancelRunningBuild(build) self.cancelRunningBuild(build)
log.debug("Canceled running build") log.debug("Canceled running build")
@@ -406,12 +405,12 @@ class ExecutorClient(object):
            time.sleep(1)

        log.debug("Still unable to find build to cancel")
-        if build.url:
+        if build.__gearman_worker is not None:
            log.debug("Build has just started")
            self.cancelRunningBuild(build)
            log.debug("Canceled running build")
            return True
-        log.debug("Unable to cancel build")
+        log.error("Unable to cancel build")

    def onBuildCompleted(self, job, result=None):
        if job.unique in self.meta_jobs:
@@ -487,6 +486,7 @@ class ExecutorClient(object):
            build.url = data.get('url', build.url)
            # Update information about worker
            build.worker.updateFromData(data)
+            build.__gearman_worker = build.worker.name

            if 'paused' in data and build.paused != data['paused']:
                build.paused = data['paused']
@@ -496,7 +496,6 @@ class ExecutorClient(object):
            if not started:
                self.log.info("Build %s started" % job)
-                build.__gearman_worker = data.get('worker_name')
                self.sched.onBuildStarted(build)
        else:
            self.log.error("Unable to find build %s" % job.unique)

zuul/executor/server.py

@@ -829,6 +829,10 @@ class AnsibleJob(object):
    def execute(self):
        try:
            self.time_starting_build = time.monotonic()
+
+            # report that job has been taken
+            self.job.sendWorkData(json.dumps(self._base_job_data()))
+
            self.ssh_agent.start()
            self.ssh_agent.add(self.private_key_file)
            for key in self.arguments.get('ssh_keys', []):
@ -862,6 +866,22 @@ class AnsibleJob(object):
except Exception: except Exception:
self.log.exception("Error finalizing job thread:") self.log.exception("Error finalizing job thread:")
def _base_job_data(self):
return {
# TODO(mordred) worker_name is needed as a unique name for the
# client to use for cancelling jobs on an executor. It's
# defaulting to the hostname for now, but in the future we
# should allow setting a per-executor override so that one can
# run more than one executor on a host.
'worker_name': self.executor_server.hostname,
'worker_hostname': self.executor_server.hostname,
'worker_log_port': self.executor_server.log_streaming_port,
}
def _send_aborted(self):
result = dict(result='ABORTED')
self.job.sendWorkComplete(json.dumps(result))
def _execute(self): def _execute(self):
args = self.arguments args = self.arguments
self.log.info( self.log.info(
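The pattern these helpers support is cooperative cancellation: an aborted flag is set on the job (presumably from the executor's stop handling, which is not part of this diff), _execute polls that flag between expensive preparation phases, and _send_aborted answers the gearman job with an ABORTED work-complete packet instead of running Ansible. A minimal sketch of the pattern, with every name other than aborted and _send_aborted hypothetical:

import json

class StartingJob:
    """Sketch of cooperative cancellation during a job's starting phase."""

    def __init__(self, send_work_complete):
        self.aborted = False
        self._send_work_complete = send_work_complete

    def abort(self):
        # Called from another thread; the worker notices at the next check.
        self.aborted = True

    def _send_aborted(self):
        self._send_work_complete(json.dumps(dict(result='ABORTED')))

    def execute(self, phases):
        for phase in phases:
            # Check between expensive phases so a cancel lands promptly
            # instead of waiting for the whole preparation to finish.
            if self.aborted:
                self._send_aborted()
                return
            phase()

job = StartingJob(print)
job.abort()
job.execute([lambda: None])   # prints {"result": "ABORTED"}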
@@ -914,6 +934,11 @@ class AnsibleJob(object):
                    'branches': task.branches,
                }

+        # Early abort if abort requested
+        if self.aborted:
+            self._send_aborted()
+            return
+
        self.log.debug("Git updates complete")
        merger = self.executor_server._getMerger(
            self.jobdir.src_root,
@@ -939,10 +964,20 @@ class AnsibleJob(object):
            # a work complete result, don't run any jobs
            return

+        # Early abort if abort requested
+        if self.aborted:
+            self._send_aborted()
+            return
+
        state_items = [i for i in args['items'] if not i.get('number')]
        if state_items:
            merger.setRepoState(state_items, repo_state)

+        # Early abort if abort requested
+        if self.aborted:
+            self._send_aborted()
+            return
+
        for project in args['projects']:
            repo = repos[project['canonical_name']]
            # If this project is the Zuul project and this is a ref
@@ -979,30 +1014,31 @@ class AnsibleJob(object):
        for repo in repos.values():
            repo.setRemoteUrl('file:///dev/null')

+        # Early abort if abort requested
+        if self.aborted:
+            self._send_aborted()
+            return
+
        # This prepares each playbook and the roles needed for each.
        self.preparePlaybooks(args)

        self.prepareAnsibleFiles(args)
        self.writeLoggingConfig()

-        data = {
-            # TODO(mordred) worker_name is needed as a unique name for the
-            # client to use for cancelling jobs on an executor. It's defaulting
-            # to the hostname for now, but in the future we should allow
-            # setting a per-executor override so that one can run more than
-            # one executor on a host.
-            'worker_name': self.executor_server.hostname,
-            'worker_hostname': self.executor_server.hostname,
-            'worker_log_port': self.executor_server.log_streaming_port
-        }
+        # Early abort if abort requested
+        if self.aborted:
+            self._send_aborted()
+            return
+
+        data = self._base_job_data()
+
        if self.executor_server.log_streaming_port != DEFAULT_FINGER_PORT:
            data['url'] = "finger://{hostname}:{port}/{uuid}".format(
-                hostname=data['worker_hostname'],
-                port=data['worker_log_port'],
+                hostname=self.executor_server.hostname,
+                port=self.executor_server.log_streaming_port,
                uuid=self.job.unique)
        else:
            data['url'] = 'finger://{hostname}/{uuid}'.format(
-                hostname=data['worker_hostname'],
+                hostname=self.executor_server.hostname,
                uuid=self.job.unique)

        self.job.sendWorkData(json.dumps(data))
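Two details worth noting in this last hunk. First, _execute now builds its packet from _base_job_data(), so the worker fields are defined in one place and stay consistent with the early packet sent from execute(); that early packet is what lets the client learn the worker name before the build fully starts, which is the crux of the fix. Second, the URL branch embeds the streaming port only when it differs from DEFAULT_FINGER_PORT, since finger clients fall back to the protocol's default port when none is given. Assuming a hypothetical host executor01.example.com and a non-default streaming port of 7900, the two branches produce URLs of the form:

    finger://executor01.example.com:7900/<build uuid>
    finger://executor01.example.com/<build uuid>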