diff --git a/dcos/mesos.py b/dcos/mesos.py index c1923e7..06203ff 100644 --- a/dcos/mesos.py +++ b/dcos/mesos.py @@ -1053,6 +1053,12 @@ class TaskIO(object): self.attach_input_event = threading.Event() self.attach_input_event.clear() + # Set up an event to block printing the output + # until an attach input event has successfully + # been established. + self.print_output_event = threading.Event() + self.print_output_event.clear() + # Set up an event to block the main thread # from exiting until signaled to do so. self.exit_event = threading.Event() @@ -1233,6 +1239,12 @@ class TaskIO(object): # stream to be attached by setting an event. self.attach_input_event.set() + # If we are running in interactive mode, wait to make sure that + # our input connection succeeds before pushing any output to the + # output queue. + if self.interactive: + self.print_output_event.wait() + try: for chunk in response.iter_content(chunk_size=None): records = self.decoder.decode(chunk) @@ -1251,12 +1263,13 @@ class TaskIO(object): """Streams all input data (e.g. STDIN) from the client to the agent """ - def _input_streamer(): - """Generator function yielding ATTACH_CONTAINER_INPUT - messages for streaming. It creates the initial - ATTACH_CONTAINER_INPUT message to establish the - connection, then yields a message from the input_queue on - each subsequent call. + def _initial_input_streamer(): + """Generator function yielding the initial ATTACH_CONTAINER_INPUT + message for streaming. We have a separate generator for this so + that we can attempt the connection once before committing to a + persistent connection where we stream the rest of the input. + + :returns: A RecordIO encoded message """ message = { @@ -1271,6 +1284,17 @@ class TaskIO(object): yield self.encoder.encode(message) + def _input_streamer(): + """Generator function yielding ATTACH_CONTAINER_INPUT + messages for streaming. It yields the _intitial_input_streamer() + message, followed by messages from the input_queue on each + subsequent call. + + :returns: A RecordIO encoded message + """ + + yield next(_initial_input_streamer()) + while True: record = self.input_queue.get() if not record: @@ -1291,6 +1315,19 @@ class TaskIO(object): # `_process_output_stream` function signals us that it's ready. self.attach_input_event.wait() + # Send an intial "Test" message to ensure that we can establish a + # connection with the agent at all. If we can't we will throw an + # exception and break out of this thread. + http.post( + self.agent_url, + data=_initial_input_streamer(), + **req_extra_args) + + # If we succeeded with that connection, unblock process_output_stream() + # from sending output data to the output thread. + self.print_output_event.set() + + # Begin streaming the the input. http.post( self.agent_url, data=_input_streamer(),