Handle websocket client hangups better

Currently, if a websocket client hangs up, we won't notice until
the next time we try to send data.  But ws4py has a callback for
informing us of a hangup sooner.  Use that.

Change-Id: Ifbd28161f7fc77cd1dc01883b132366a801b76ef
This commit is contained in:
James E. Blair
2018-06-06 16:05:51 -07:00
parent 8bd7c02750
commit 457cc75be7
2 changed files with 83 additions and 0 deletions
+74
View File
@@ -23,6 +23,7 @@ import socket
import tempfile
import testtools
import threading
import time
import zuul.web
import zuul.lib.log_streamer
@@ -388,6 +389,79 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
self.log.debug("\n\nStreamed: %s\n\n", client2.results)
self.assertEqual(file_contents, client2.results)
def test_websocket_hangup(self):
# Verify that the web server de-registers a log streamer once the
# websocket client closes its side of the connection: a streamer must
# be registered while the client is streaming, and the stream
# manager's streamer table must become empty after the client hangs up.
# Start the web server
web = self.useFixture(
ZuulWebFixture(self.gearman_server.port,
self.config))
# Start the finger streamer daemon
# (port 0 is passed here and the actual port is read back from the
# bound socket below)
streamer = zuul.lib.log_streamer.LogStreamer(
self.host, 0, self.executor_server.jobdir_root)
self.addCleanup(streamer.stop)
# Need to set the streaming port before submitting the job
finger_port = streamer.server.socket.getsockname()[1]
self.executor_server.log_streaming_port = finger_port
# Submit a change so that a job starts running
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
# We don't have any real synchronization for the ansible jobs, so
# just wait until we get our running build.
# NOTE(review): iterate_timeout(30, ...) presumably fails the test if
# the condition is not met within 30 seconds -- confirm in its
# definition.
for x in iterate_timeout(30, "build"):
if len(self.builds):
break
build = self.builds[0]
self.assertEqual(build.name, 'python27')
# The build's working directory lives under the executor's jobdir
# root and is named after the build UUID; wait for it to appear.
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
for x in iterate_timeout(30, "build dir"):
if os.path.exists(build_dir):
break
# Need to wait to make sure that jobdir gets set
for x in iterate_timeout(30, "jobdir"):
if build.jobdir is not None:
break
build = self.builds[0]
# Wait for the job to begin running and create the ansible log file.
# The job waits to complete until the flag file exists, so we can
# safely access the log here.
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
for x in iterate_timeout(30, "ansible log"):
if os.path.exists(ansible_log):
break
# Start a thread with the websocket client
client1 = self.runWSClient(web.port, build.uuid)
# Block until the client signals its event (presumably set once the
# client is connected and streaming -- confirm in runWSClient).
client1.event.wait()
# Wait until we've streamed everything so far
for x in iterate_timeout(30, "streamer is caught up"):
with open(ansible_log, 'r') as logfile:
if client1.results == logfile.read():
break
# This is intensive, give it some time
time.sleep(1)
# Precondition: a streamer must be registered while the client is
# connected, otherwise the cleanup check below would pass trivially.
self.assertNotEqual(len(web.web.stream_manager.streamers.keys()), 0)
# Hangup the client side
# (1000 is the WebSocket "normal closure" status code)
client1.close(1000, 'test close')
# Wait for the client thread to exit before polling for cleanup.
client1.thread.join()
# The client should be de-registered shortly
for x in iterate_timeout(30, "client cleanup"):
if len(web.web.stream_manager.streamers.keys()) == 0:
break
# Allow the job to complete
# (creating the empty flag file is what the job is waiting for)
flag_file = os.path.join(build_dir, 'test_wait')
open(flag_file, 'w').close()
self.waitUntilSettled()
def test_finger_gateway(self):
# Start the finger streamer daemon
streamer = zuul.lib.log_streamer.LogStreamer(
+9
View File
@@ -99,6 +99,15 @@ class LogStreamHandler(WebSocket):
self.log.exception("Error processing websocket message:")
raise
def closed(self, code, reason):
    """React to the websocket having been closed.

    Logs the close code and reason and, if a streamer is attached to
    this handler, asks the stream manager to unregister it.  Any
    failure during unregistration is logged and swallowed so that it
    does not propagate out of the close notification.

    :param code: close status code reported for the connection.
    :param reason: close reason reported for the connection.
    """
    self.log.debug("Websocket closed: %s %s", code, reason)

    # Nothing to clean up when no streamer is attached.
    if not self.streamer:
        return

    try:
        manager = self.streamer.zuulweb.stream_manager
        manager.unregisterStreamer(self.streamer)
    except Exception:
        self.log.exception("Error on remote websocket close:")
def logClose(self, code, msg):
self.log.debug("Websocket close: %s %s", code, msg)
try: