task: update dcos task exec ATTACH_CONTAINER_INPUT semantics. (#856)
With the new semantics, an ATTACH_CONTAINER_INPUT call will be attempted once before committing to a persistent connection where the actual input is streamed to the agent. This change is required to get around a limitation of the python client libraries where an HTTP response is not read until after the entire request is sent. IN our case, we *never* complete the request (until a timeout), so we don't see the proper error unless we do things as updated in this commit.
This commit is contained in:
@@ -1053,6 +1053,12 @@ class TaskIO(object):
|
||||
self.attach_input_event = threading.Event()
|
||||
self.attach_input_event.clear()
|
||||
|
||||
# Set up an event to block printing the output
|
||||
# until an attach input event has successfully
|
||||
# been established.
|
||||
self.print_output_event = threading.Event()
|
||||
self.print_output_event.clear()
|
||||
|
||||
# Set up an event to block the main thread
|
||||
# from exiting until signaled to do so.
|
||||
self.exit_event = threading.Event()
|
||||
@@ -1233,6 +1239,12 @@ class TaskIO(object):
|
||||
# stream to be attached by setting an event.
|
||||
self.attach_input_event.set()
|
||||
|
||||
# If we are running in interactive mode, wait to make sure that
|
||||
# our input connection succeeds before pushing any output to the
|
||||
# output queue.
|
||||
if self.interactive:
|
||||
self.print_output_event.wait()
|
||||
|
||||
try:
|
||||
for chunk in response.iter_content(chunk_size=None):
|
||||
records = self.decoder.decode(chunk)
|
||||
@@ -1251,12 +1263,13 @@ class TaskIO(object):
|
||||
"""Streams all input data (e.g. STDIN) from the client to the agent
|
||||
"""
|
||||
|
||||
def _input_streamer():
|
||||
"""Generator function yielding ATTACH_CONTAINER_INPUT
|
||||
messages for streaming. It creates the initial
|
||||
ATTACH_CONTAINER_INPUT message to establish the
|
||||
connection, then yields a message from the input_queue on
|
||||
each subsequent call.
|
||||
def _initial_input_streamer():
|
||||
"""Generator function yielding the initial ATTACH_CONTAINER_INPUT
|
||||
message for streaming. We have a separate generator for this so
|
||||
that we can attempt the connection once before committing to a
|
||||
persistent connection where we stream the rest of the input.
|
||||
|
||||
:returns: A RecordIO encoded message
|
||||
"""
|
||||
|
||||
message = {
|
||||
@@ -1271,6 +1284,17 @@ class TaskIO(object):
|
||||
|
||||
yield self.encoder.encode(message)
|
||||
|
||||
def _input_streamer():
|
||||
"""Generator function yielding ATTACH_CONTAINER_INPUT
|
||||
messages for streaming. It yields the _intitial_input_streamer()
|
||||
message, followed by messages from the input_queue on each
|
||||
subsequent call.
|
||||
|
||||
:returns: A RecordIO encoded message
|
||||
"""
|
||||
|
||||
yield next(_initial_input_streamer())
|
||||
|
||||
while True:
|
||||
record = self.input_queue.get()
|
||||
if not record:
|
||||
@@ -1291,6 +1315,19 @@ class TaskIO(object):
|
||||
# `_process_output_stream` function signals us that it's ready.
|
||||
self.attach_input_event.wait()
|
||||
|
||||
# Send an intial "Test" message to ensure that we can establish a
|
||||
# connection with the agent at all. If we can't we will throw an
|
||||
# exception and break out of this thread.
|
||||
http.post(
|
||||
self.agent_url,
|
||||
data=_initial_input_streamer(),
|
||||
**req_extra_args)
|
||||
|
||||
# If we succeeded with that connection, unblock process_output_stream()
|
||||
# from sending output data to the output thread.
|
||||
self.print_output_event.set()
|
||||
|
||||
# Begin streaming the the input.
|
||||
http.post(
|
||||
self.agent_url,
|
||||
data=_input_streamer(),
|
||||
|
||||
Reference in New Issue
Block a user