Fix streaming decoding boundaries

Use an IncrementalDecoder that understands character boundaries.

Change-Id: I032f15eef15a26287d0d912d6ef105cf5beae200
This commit is contained in:
David Shrewsbury 2018-04-06 09:08:50 -04:00
parent c56edfbc74
commit 3695f376b2
2 changed files with 118 additions and 1 deletions

View File

@ -16,6 +16,7 @@
import aiohttp
import asyncio
import io
import logging
import json
import os
@ -217,6 +218,112 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
self.streaming_data += data.decode('utf-8')
s.shutdown(socket.SHUT_RDWR)
def test_decode_boundaries(self):
'''
Test multi-byte characters crossing read buffer boundaries.
The finger client used by ZuulWeb reads in increments of 1024 bytes.
If the last byte is a multi-byte character, we end up with an error
similar to:
'utf-8' codec can't decode byte 0xe2 in position 1023: \
unexpected end of data
By making the 1024th character in the log file a multi-byte character
(here, the Euro character), we can test this.
'''
# Start the finger streamer daemon
streamer = zuul.lib.log_streamer.LogStreamer(
self.host, 0, self.executor_server.jobdir_root)
self.addCleanup(streamer.stop)
# Need to set the streaming port before submitting the job
finger_port = streamer.server.socket.getsockname()[1]
self.executor_server.log_streaming_port = finger_port
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
# We don't have any real synchronization for the ansible jobs, so
# just wait until we get our running build.
while not len(self.builds):
time.sleep(0.1)
build = self.builds[0]
self.assertEqual(build.name, 'python27')
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
while not os.path.exists(build_dir):
time.sleep(0.1)
# Need to wait to make sure that jobdir gets set
while build.jobdir is None:
time.sleep(0.1)
build = self.builds[0]
# Wait for the job to begin running and create the ansible log file.
# The job waits to complete until the flag file exists, so we can
# safely access the log here. We only open it (to force a file handle
# to be kept open for it after the job finishes) but wait to read the
# contents until the job is done.
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
while not os.path.exists(ansible_log):
time.sleep(0.1)
# Replace log file contents with the 1024th character being a
# multi-byte character.
with io.open(ansible_log, 'w', encoding='utf8') as f:
f.write("a" * 1023)
f.write(u"\u20AC")
logfile = open(ansible_log, 'r')
self.addCleanup(logfile.close)
# Start the web server
web_server = zuul.web.ZuulWeb(
listen_address='::', listen_port=9000,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
static_path=tempfile.gettempdir())
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
self.addCleanup(web_server.stop)
# Wait until web server is started
while True:
try:
with socket.create_connection((self.host, 9000)):
break
except ConnectionRefusedError:
time.sleep(0.1)
# Start a thread with the websocket client
ws_client_event = threading.Event()
self.ws_client_results = ''
ws_client_thread = threading.Thread(
target=self.runWSClient, args=(build.uuid, ws_client_event)
)
ws_client_thread.start()
ws_client_event.wait()
# Allow the job to complete
flag_file = os.path.join(build_dir, 'test_wait')
open(flag_file, 'w').close()
# Wait for the websocket client to complete, which it should when
# it's received the full log.
ws_client_thread.join()
self.waitUntilSettled()
file_contents = logfile.read()
logfile.close()
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
self.assertEqual(file_contents, self.ws_client_results)
def test_websocket_streaming(self):
# Start the finger streamer daemon
streamer = zuul.lib.log_streamer.LogStreamer(

View File

@ -16,6 +16,7 @@
import asyncio
import codecs
import copy
import json
import logging
@ -61,11 +62,20 @@ class LogStreamingHandler(object):
writer.write(msg.encode('utf8'))
await writer.drain()
Decoder = codecs.getincrementaldecoder('utf8')
decoder = Decoder()
while True:
data = await reader.read(1024)
if data:
await ws.send_str(data.decode('utf8'))
data = decoder.decode(data)
if data:
await ws.send_str(data)
else:
# Make sure we flush anything left in the decoder
data = decoder.decode(b'', final=True)
if data:
await ws.send_str(data)
writer.close()
return