Merge "Fix streaming decoding boundaries"
This commit is contained in:
commit
03e52eab3b
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import io
|
||||||
import logging
|
import logging
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
@ -217,6 +218,112 @@ class TestStreaming(tests.base.AnsibleZuulTestCase):
|
||||||
self.streaming_data += data.decode('utf-8')
|
self.streaming_data += data.decode('utf-8')
|
||||||
s.shutdown(socket.SHUT_RDWR)
|
s.shutdown(socket.SHUT_RDWR)
|
||||||
|
|
||||||
|
def test_decode_boundaries(self):
|
||||||
|
'''
|
||||||
|
Test multi-byte characters crossing read buffer boundaries.
|
||||||
|
|
||||||
|
The finger client used by ZuulWeb reads in increments of 1024 bytes.
|
||||||
|
If the last byte is a multi-byte character, we end up with an error
|
||||||
|
similar to:
|
||||||
|
|
||||||
|
'utf-8' codec can't decode byte 0xe2 in position 1023: \
|
||||||
|
unexpected end of data
|
||||||
|
|
||||||
|
By making the 1024th character in the log file a multi-byte character
|
||||||
|
(here, the Euro character), we can test this.
|
||||||
|
'''
|
||||||
|
# Start the finger streamer daemon
|
||||||
|
streamer = zuul.lib.log_streamer.LogStreamer(
|
||||||
|
self.host, 0, self.executor_server.jobdir_root)
|
||||||
|
self.addCleanup(streamer.stop)
|
||||||
|
|
||||||
|
# Need to set the streaming port before submitting the job
|
||||||
|
finger_port = streamer.server.socket.getsockname()[1]
|
||||||
|
self.executor_server.log_streaming_port = finger_port
|
||||||
|
|
||||||
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
|
|
||||||
|
# We don't have any real synchronization for the ansible jobs, so
|
||||||
|
# just wait until we get our running build.
|
||||||
|
while not len(self.builds):
|
||||||
|
time.sleep(0.1)
|
||||||
|
build = self.builds[0]
|
||||||
|
self.assertEqual(build.name, 'python27')
|
||||||
|
|
||||||
|
build_dir = os.path.join(self.executor_server.jobdir_root, build.uuid)
|
||||||
|
while not os.path.exists(build_dir):
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# Need to wait to make sure that jobdir gets set
|
||||||
|
while build.jobdir is None:
|
||||||
|
time.sleep(0.1)
|
||||||
|
build = self.builds[0]
|
||||||
|
|
||||||
|
# Wait for the job to begin running and create the ansible log file.
|
||||||
|
# The job waits to complete until the flag file exists, so we can
|
||||||
|
# safely access the log here. We only open it (to force a file handle
|
||||||
|
# to be kept open for it after the job finishes) but wait to read the
|
||||||
|
# contents until the job is done.
|
||||||
|
ansible_log = os.path.join(build.jobdir.log_root, 'job-output.txt')
|
||||||
|
while not os.path.exists(ansible_log):
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# Replace log file contents with the 1024th character being a
|
||||||
|
# multi-byte character.
|
||||||
|
with io.open(ansible_log, 'w', encoding='utf8') as f:
|
||||||
|
f.write("a" * 1023)
|
||||||
|
f.write(u"\u20AC")
|
||||||
|
|
||||||
|
logfile = open(ansible_log, 'r')
|
||||||
|
self.addCleanup(logfile.close)
|
||||||
|
|
||||||
|
# Start the web server
|
||||||
|
web_server = zuul.web.ZuulWeb(
|
||||||
|
listen_address='::', listen_port=9000,
|
||||||
|
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
|
||||||
|
static_path=tempfile.gettempdir())
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
loop.set_debug(True)
|
||||||
|
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
|
||||||
|
ws_thread.start()
|
||||||
|
self.addCleanup(loop.close)
|
||||||
|
self.addCleanup(ws_thread.join)
|
||||||
|
self.addCleanup(web_server.stop)
|
||||||
|
|
||||||
|
# Wait until web server is started
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
with socket.create_connection((self.host, 9000)):
|
||||||
|
break
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# Start a thread with the websocket client
|
||||||
|
ws_client_event = threading.Event()
|
||||||
|
self.ws_client_results = ''
|
||||||
|
ws_client_thread = threading.Thread(
|
||||||
|
target=self.runWSClient, args=(build.uuid, ws_client_event)
|
||||||
|
)
|
||||||
|
ws_client_thread.start()
|
||||||
|
ws_client_event.wait()
|
||||||
|
|
||||||
|
# Allow the job to complete
|
||||||
|
flag_file = os.path.join(build_dir, 'test_wait')
|
||||||
|
open(flag_file, 'w').close()
|
||||||
|
|
||||||
|
# Wait for the websocket client to complete, which it should when
|
||||||
|
# it's received the full log.
|
||||||
|
ws_client_thread.join()
|
||||||
|
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
file_contents = logfile.read()
|
||||||
|
logfile.close()
|
||||||
|
self.log.debug("\n\nFile contents: %s\n\n", file_contents)
|
||||||
|
self.log.debug("\n\nStreamed: %s\n\n", self.ws_client_results)
|
||||||
|
self.assertEqual(file_contents, self.ws_client_results)
|
||||||
|
|
||||||
def test_websocket_streaming(self):
|
def test_websocket_streaming(self):
|
||||||
# Start the finger streamer daemon
|
# Start the finger streamer daemon
|
||||||
streamer = zuul.lib.log_streamer.LogStreamer(
|
streamer = zuul.lib.log_streamer.LogStreamer(
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import codecs
|
||||||
import copy
|
import copy
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
@ -61,11 +62,20 @@ class LogStreamingHandler(object):
|
||||||
writer.write(msg.encode('utf8'))
|
writer.write(msg.encode('utf8'))
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
|
|
||||||
|
Decoder = codecs.getincrementaldecoder('utf8')
|
||||||
|
decoder = Decoder()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
data = await reader.read(1024)
|
data = await reader.read(1024)
|
||||||
if data:
|
if data:
|
||||||
await ws.send_str(data.decode('utf8'))
|
data = decoder.decode(data)
|
||||||
|
if data:
|
||||||
|
await ws.send_str(data)
|
||||||
else:
|
else:
|
||||||
|
# Make sure we flush anything left in the decoder
|
||||||
|
data = decoder.decode(b'', final=True)
|
||||||
|
if data:
|
||||||
|
await ws.send_str(data)
|
||||||
writer.close()
|
writer.close()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue