Merge "Add non-intrusive ctrl-c handling"

This commit is contained in:
Jenkins 2016-06-10 03:14:05 +00:00 committed by Gerrit Code Review
commit 319fd3e794

View File

@ -14,6 +14,7 @@
from __future__ import print_function from __future__ import print_function
import contextlib
import datetime import datetime
import errno import errno
import graphviz import graphviz
@ -24,7 +25,6 @@ import pprint
import re import re
import requests import requests
import shutil import shutil
import signal
import sys import sys
import tarfile import tarfile
import tempfile import tempfile
@ -56,14 +56,6 @@ LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO) LOG.setLevel(logging.INFO)
def handle_ctrlc(single, frame):
kollaobj = frame.f_locals['kolla']
kollaobj.cleanup()
sys.exit(1)
signal.signal(signal.SIGINT, handle_ctrlc)
class KollaDirNotFoundException(Exception): class KollaDirNotFoundException(Exception):
pass pass
@ -125,6 +117,15 @@ class Recorder(object):
return u"\n".join(self._lines) return u"\n".join(self._lines)
@contextlib.contextmanager
def join_many(threads):
try:
yield
finally:
for t in threads:
t.join()
def docker_client(): def docker_client():
try: try:
docker_kwargs = docker.utils.kwargs_from_env() docker_kwargs = docker.utils.kwargs_from_env()
@ -436,9 +437,10 @@ class WorkerThread(threading.Thread):
super(WorkerThread, self).__init__() super(WorkerThread, self).__init__()
self.queue = queue self.queue = queue
self.conf = conf self.conf = conf
self.should_stop = False
def run(self): def run(self):
while True: while not self.should_stop:
task = self.queue.get() task = self.queue.get()
if task is self.tombstone: if task is self.tombstone:
# Ensure any other threads also get the tombstone. # Ensure any other threads also get the tombstone.
@ -446,6 +448,8 @@ class WorkerThread(threading.Thread):
break break
try: try:
for attempt in six.moves.range(self.conf.retries + 1): for attempt in six.moves.range(self.conf.retries + 1):
if self.should_stop:
break
if attempt > 0: if attempt > 0:
LOG.debug("Attempting to run task %s for the %s time", LOG.debug("Attempting to run task %s for the %s time",
task.name, attempt + 1) task.name, attempt + 1)
@ -461,7 +465,7 @@ class WorkerThread(threading.Thread):
task.name) task.name)
# try again... # try again...
task.reset() task.reset()
if task.success: if task.success and not self.should_stop:
for next_task in task.followups: for next_task in task.followups:
LOG.debug('Added next task %s to queue', LOG.debug('Added next task %s to queue',
next_task.name) next_task.name)
@ -897,28 +901,34 @@ def run_build():
queue = kolla.build_queue(push_queue) queue = kolla.build_queue(push_queue)
workers = [] workers = []
for x in six.moves.range(conf.threads): with join_many(workers):
worker = WorkerThread(conf, queue) try:
worker.setDaemon(True) for x in six.moves.range(conf.threads):
worker.start() worker = WorkerThread(conf, queue)
workers.append(worker) worker.setDaemon(True)
worker.start()
workers.append(worker)
for x in six.moves.range(conf.push_threads): for x in six.moves.range(conf.push_threads):
worker = WorkerThread(conf, push_queue) worker = WorkerThread(conf, push_queue)
worker.start() worker.start()
workers.append(worker) workers.append(worker)
# sleep until queue is empty # sleep until queue is empty
while queue.unfinished_tasks or push_queue.unfinished_tasks: while queue.unfinished_tasks or push_queue.unfinished_tasks:
time.sleep(3) time.sleep(3)
# ensure all threads exited happily
push_queue.put(WorkerThread.tombstone)
queue.put(WorkerThread.tombstone)
except KeyboardInterrupt:
for w in workers:
w.should_stop = True
push_queue.put(WorkerThread.tombstone)
queue.put(WorkerThread.tombstone)
raise
kolla.summary() kolla.summary()
kolla.cleanup() kolla.cleanup()
# ensure all threads exited happily
queue.put(WorkerThread.tombstone)
push_queue.put(WorkerThread.tombstone)
for w in workers:
w.join()
return kolla.get_image_statuses() return kolla.get_image_statuses()