From 3695f376b2b95a22726257256a482aecb396acbd Mon Sep 17 00:00:00 2001 From: David Shrewsbury Date: Fri, 6 Apr 2018 09:08:50 -0400 Subject: [PATCH] Fix streaming decoding boundaries Use an IncrementalDecoder that understands character boundaries. Change-Id: I032f15eef15a26287d0d912d6ef105cf5beae200 --- tests/unit/test_streaming.py | 107 +++++++++++++++++++++++++++++++++++ zuul/web/__init__.py | 12 +++- 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_streaming.py b/tests/unit/test_streaming.py index bba77e6761..7b9d86ae27 100644 --- a/tests/unit/test_streaming.py +++ b/tests/unit/test_streaming.py @@ -16,6 +16,7 @@ import aiohttp import asyncio +import io import logging import json import os @@ -217,6 +218,112 @@ class TestStreaming(tests.base.AnsibleZuulTestCase): self.streaming_data += data.decode('utf-8') s.shutdown(socket.SHUT_RDWR) + def test_decode_boundaries(self): + ''' + Test multi-byte characters crossing read buffer boundaries. + + The finger client used by ZuulWeb reads in increments of 1024 bytes. + If the last byte read is only part of a multi-byte character, we end + up with an error similar to: + + 'utf-8' codec can't decode byte 0xe2 in position 1023: \ + unexpected end of data + + By making the 1024th character in the log file a multi-byte character + (here, the Euro character), we can test this. + ''' + # Start the finger streamer daemon + streamer = zuul.lib.log_streamer.LogStreamer( + self.host, 0, self.executor_server.jobdir_root) + self.addCleanup(streamer.stop) + + # Need to set the streaming port before submitting the job + finger_port = streamer.server.socket.getsockname()[1] + self.executor_server.log_streaming_port = finger_port + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + # We don't have any real synchronization for the ansible jobs, so + # just wait until we get our running build. 
+ while not len(self.builds): + time.sleep(0.1) + build = self.builds[0] + self.assertEqual(build.name, 'python27') + + build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid) + while not os.path.exists(build_dir): + time.sleep(0.1) + + # Need to wait to make sure that jobdir gets set + while build.jobdir is None: + time.sleep(0.1) + build = self.builds[0] + + # Wait for the job to begin running and create the ansible log file. + # The job waits to complete until the flag file exists, so we can + # safely access the log here. We only open it (to force a file handle + # to be kept open for it after the job finishes) but wait to read the + # contents until the job is done. + ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt') + while not os.path.exists(ansible_log): + time.sleep(0.1) + + # Replace log file contents with the 1024th character being a + # multi-byte character. + with io.open(ansible_log, 'w', encoding='utf8') as f: + f.write("a" * 1023) + f.write(u"\u20AC") + + logfile = open(ansible_log, 'r') + self.addCleanup(logfile.close) + + # Start the web server + web_server = zuul.web.ZuulWeb( + listen_address='::', listen_port=9000, + gear_server='127.0.0.1', gear_port=self.gearman_server.port, + static_path=tempfile.gettempdir()) + loop = asyncio.new_event_loop() + loop.set_debug(True) + ws_thread = threading.Thread(target=web_server.run, args=(loop,)) + ws_thread.start() + self.addCleanup(loop.close) + self.addCleanup(ws_thread.join) + self.addCleanup(web_server.stop) + + # Wait until web server is started + while True: + try: + with socket.create_connection((self.host, 9000)): + break + except ConnectionRefusedError: + time.sleep(0.1) + + # Start a thread with the websocket client + ws_client_event = threading.Event() + self.ws_client_results = '' + ws_client_thread = threading.Thread( + target=self.runWSClient, args=(build.uuid, ws_client_event) + ) + ws_client_thread.start() + ws_client_event.wait() + + # Allow the job to 
complete + flag_file = os.path.join(build_dir, 'test_wait') + open(flag_file, 'w').close() + + # Wait for the websocket client to complete, which it should when + # it's received the full log. + ws_client_thread.join() + + self.waitUntilSettled() + + file_contents = logfile.read() + logfile.close() + self.log.debug("\n\nFile contents: %s\n\n", file_contents) + self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results) + self.assertEqual(file_contents, self.ws_client_results) + def test_websocket_streaming(self): # Start the finger streamer daemon streamer = zuul.lib.log_streamer.LogStreamer( diff --git a/zuul/web/__init__.py b/zuul/web/__init__.py index c380bff361..5437635743 100755 --- a/zuul/web/__init__.py +++ b/zuul/web/__init__.py @@ -16,6 +16,7 @@ import asyncio +import codecs import copy import json import logging @@ -61,11 +62,20 @@ class LogStreamingHandler(object): writer.write(msg.encode('utf8')) await writer.drain() + Decoder = codecs.getincrementaldecoder('utf8') + decoder = Decoder() + while True: data = await reader.read(1024) if data: - await ws.send_str(data.decode('utf8')) + data = decoder.decode(data) + if data: + await ws.send_str(data) else: + # Make sure we flush anything left in the decoder + data = decoder.decode(b'', final=True) + if data: + await ws.send_str(data) writer.close() return