Fix timeout function and add capturing of messages on failure

Ensure process terminate is only called after checking whether the
process is still running so as not to accidentally set an exitcode.
Additionally include improved message exchange and capturing to allow
for easier debug should the expected exceptions not appear on socket
timeouts.

Change-Id: Ic51745ffa67570e9a3ca4574d2bfc54d0cd6724b
This commit is contained in:
Darragh Bailey 2015-08-11 23:58:00 +01:00 committed by Darragh Bailey
parent 1647facd43
commit a8655eaccc
3 changed files with 61 additions and 14 deletions

View File

@ -6,3 +6,4 @@ unittest2
python-subunit
sphinx>=1.1.2,<1.2
testrepository
testtools

View File

@ -1,4 +1,7 @@
import functools
from multiprocessing import Process
from multiprocessing import Queue
import traceback
from six.moves import socketserver
@ -7,15 +10,54 @@ class TestsTimeoutException(Exception):
pass
def time_limit(seconds, func, *args, **kwargs):
def time_limit(seconds, fp, func, *args, **kwargs):
if fp:
if not hasattr(fp, 'write'):
raise TypeError("Expected 'file-like' object, got '%s'" % fp)
else:
def record(msg):
fp.write(msg)
else:
def record(msg):
return
def capture_results(msg_queue, func, *args, **kwargs):
try:
result = func(*args, **kwargs)
except Exception as e:
msg_queue.put(
"Running function '%s' resulted in exception '%s' with "
"message: '%s'\n" % (func.__name__, e.__class__.__name__, e))
# no point re-raising an exception from the subprocess, instead
# return False
return False
else:
msg_queue.put(
"Running function '%s' finished with result '%s', and"
"stack:\n%s\n" % (func.__name__, result,
traceback.format_stack()))
return result
messages = Queue()
# although creating a separate process is expensive it's the only way to
# ensure cross platform that we can cleanly terminate after timeout
p = Process(target=func, args=args, kwargs=kwargs)
p = Process(target=functools.partial(capture_results, messages, func),
args=args, kwargs=kwargs)
p.start()
p.join(seconds)
p.terminate()
if p.exitcode is None:
if p.is_alive():
p.terminate()
while not messages.empty():
record(messages.get())
record("Running function '%s' did not finish\n" % func.__name__)
raise TestsTimeoutException
else:
while not messages.empty():
record(messages.get())
record("Running function '%s' finished with exit code '%s'\n"
% (func.__name__, p.exitcode))
class NullServer(socketserver.TCPServer):

View File

@ -1,21 +1,24 @@
import sys
from six.moves import StringIO
import testtools
from testtools.content import text_content
import jenkins
from tests.helper import NullServer
from tests.helper import TestsTimeoutException
from tests.helper import time_limit
if sys.version_info < (2, 7):
import unittest2 as unittest
else:
import unittest
class JenkinsRequestTimeoutTests(unittest.TestCase):
class JenkinsRequestTimeoutTests(testtools.TestCase):
def setUp(self):
super(JenkinsRequestTimeoutTests, self).setUp()
self.server = NullServer(("127.0.0.1", 0))
self.messages = StringIO()
self.addOnException(self._get_messages)
def _get_messages(self, exc_info):
self.addDetail('timeout-tests-messages',
text_content(self.messages.getvalue()))
def test_jenkins_open_timeout(self):
j = jenkins.Jenkins("http://%s:%s" % self.server.server_address,
@ -24,7 +27,7 @@ class JenkinsRequestTimeoutTests(unittest.TestCase):
self.server.server_address)
# assert our request times out when no response
with self.assertRaises(jenkins.TimeoutException):
with testtools.ExpectedException(jenkins.TimeoutException):
j.jenkins_open(request, add_crumb=False)
def test_jenkins_open_no_timeout(self):
@ -35,5 +38,6 @@ class JenkinsRequestTimeoutTests(unittest.TestCase):
# assert we don't timeout quickly like previous test when
# no timeout defined.
with self.assertRaises(TestsTimeoutException):
time_limit(0.5, j.jenkins_open, request, add_crumb=False)
with testtools.ExpectedException(TestsTimeoutException):
time_limit(0.5, self.messages,
j.jenkins_open, request, add_crumb=False)