Windows: fix exec calls

At some point, we've switched to an alternative process launcher
that uses named pipes to communicate with the child processes. This
implementation has some issues, truncating the process output in some
situations.

This change switches back to subprocess.Popen, which is a much easier
and convenient way to perform exec calls. We're also ensuring that the
os module is not patched (which would cause subprocess.Popen to fail
on Windows due to an eventlet limitation, the reason why the above
mentioned implementation was first introduced).

We're also ensuring that such calls do not block other greenthreads
by leveraging eventlet.tpool.

Side note: I had to store subprocess.Popen in a variable in order
to avoid having OpenStack bandit complaining, even though we're
explicitly passing "shell=False":
http://paste.openstack.org/raw/658319/

Closes-Bug: #1709931

Change-Id: Ib58e12030e69ea10862452c2f141a7a5f2527621
(cherry picked from commit 82d468ba7fd6dce91fec015d39126e71c1434fb1)
This commit is contained in:
Lucian Petrut 2017-08-09 13:50:09 +03:00 committed by Slawek Kaplonski
parent 8dc15cc053
commit a4f91358a0
3 changed files with 43 additions and 610 deletions
neutron

@ -15,15 +15,22 @@
import os
import eventlet
from eventlet import tpool
from neutron_lib.utils import helpers
from oslo_log import log as logging
from oslo_utils import encodeutils
from neutron._i18n import _
from neutron.agent.windows import winutils
LOG = logging.getLogger(__name__)
# subprocess.Popen will spawn two threads consuming stdout/stderr when passing
# data through stdin. We need to make sure that *native* threads will be used
# as pipes are blocking on Windows.
subprocess = eventlet.patcher.original('subprocess')
subprocess.threading = eventlet.patcher.original('threading')
def create_process(cmd, addl_env=None):
cmd = list(map(str, cmd))
@ -33,7 +40,14 @@ def create_process(cmd, addl_env=None):
if addl_env:
env.update(addl_env)
obj = winutils.ProcessWithNamedPipes(cmd, env)
popen = subprocess.Popen
obj = popen(cmd, shell=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=None,
close_fds=False)
return obj, cmd
@ -47,7 +61,7 @@ def execute(cmd, process_input=None, addl_env=None,
else:
_process_input = None
obj, cmd = create_process(cmd, addl_env=addl_env)
_stdout, _stderr = obj.communicate(_process_input)
_stdout, _stderr = avoid_blocking_call(obj.communicate, _process_input)
obj.stdin.close()
_stdout = helpers.safe_decode_utf8(_stdout)
_stderr = helpers.safe_decode_utf8(_stderr)
@ -74,3 +88,22 @@ def execute(cmd, process_input=None, addl_env=None,
raise RuntimeError(m)
return (_stdout, _stderr) if return_stderr else _stdout
def avoid_blocking_call(f, *args, **kwargs):
"""Ensure that the method "f" will not block other greenthreads.
Performs the call to the function "f" received as parameter in a
different thread using tpool.execute when called from a greenthread.
This will ensure that the function "f" will not block other greenthreads.
If not called from a greenthread, it will invoke the function "f" directly.
The function "f" will receive as parameters the arguments "args" and
keyword arguments "kwargs".
"""
# Note that eventlet.getcurrent will always return a greenlet object.
# In case of a greenthread, the parent greenlet will always be the hub
# loop greenlet.
if eventlet.getcurrent().parent:
return tpool.execute(f, *args, **kwargs)
else:
return f(*args, **kwargs)

@ -1,606 +0,0 @@
# Copyright 2017 Cloudbase Solutions.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import time
import eventlet
from eventlet import tpool
from ovs import winutils as ovs_winutils
import win32con
import win32event
import win32process
import win32security
def avoid_blocking_call(f, *args, **kwargs):
"""Ensure that the method "f" will not block other greenthreads.
Performs the call to the function "f" received as parameter in a
different thread using tpool.execute when called from a greenthread.
This will ensure that the function "f" will not block other greenthreads.
If not called from a greenthread, it will invoke the function "f" directly.
The function "f" will receive as parameters the arguments "args" and
keyword arguments "kwargs".
"""
# Note that eventlet.getcurrent will always return a greenlet object.
# In case of a greenthread, the parent greenlet will always be the hub
# loop greenlet.
if eventlet.getcurrent().parent:
return tpool.execute(f, *args, **kwargs)
else:
return f(*args, **kwargs)
class WindowsException(Exception):
"""Base Windows Exception
This class is inherited by all the other exceptions that are used in
this file. The 'error_message' property should be defined in the class
that inherits from this with a particular message if needed.
"""
error_message = None
def __init__(self, message):
super(WindowsException, self).__init__()
# The error message which will be printed for this exception
self.error_message = message
def __str__(self):
return self.error_message
class NamedPipeException(WindowsException):
"""Exception raised when there is an error with the named pipes.
If there is an error code associated with this exception, it can be
retrieved by accessing the 'code' property of this class.
"""
def __init__(self, message, error_code=None):
super(NamedPipeException, self).__init__(message)
# The error code associated with this exception. This property should
# be different than 'None' if there is an existing error code.
self.code = error_code
if self.code:
# Appending the error code to the message
self.error_message += " Error code: '%s'." % self.code
def __str__(self):
return self._error_string
class ProcessException(WindowsException):
"""Exception raised when there is an error with the child process.
This class inherits the implementation from the super class, it does not
have anything particular. It is intentionally left blank.
"""
pass
class NamedPipe(object):
def __init__(self, pipe_name=None, sec_attributes=-1):
"""Create a named pipe with the given name.
:param pipe_name(Optional): string representing the name of the pipe
which should be used to create the named pipe
:param sec_attributes(Optional): type win32security.SECURITY_ATTRIBUTES
The default value is -1 which uses the default security attributes.
This means that the named pipe handle is inherited when a new
process is created.
"""
# For reading from the named pipe, we will use an overlapped structure
# for non-blocking I/O
self._read = ovs_winutils.pywintypes.OVERLAPPED()
# Create a new event which will be used by the overlapped structure
self._read.hEvent = ovs_winutils.get_new_event()
# This property tells if there is a pending reading operation on
# the named pipe or not.
self._read_pending = False
if pipe_name is None:
# Generate a random name for the named pipe if the name was not
# passed explicitly as parameter.
pipe_name = ("NamedPipe_%d_%s" %
(time.time(), str(random.random()).split(".")[1]))
# Creating the name for a local named pipe. The property "name" will
# have "\\.\pipe\" appended at the start of pipe_name
self.name = ovs_winutils.get_pipe_name(pipe_name)
# This property will contain the handle of the named pipe which can
# be accessed later on.
self.namedpipe = ovs_winutils.create_named_pipe(self.name,
saAttr=sec_attributes)
# This property should be initialised explicitly later on by calling
# the method create_file of this class.
self._npipe_file = None
if not self.namedpipe:
# If there was an error when creating the named pipe, the property
# "namedpipe" should be None. We raise an exception in this case
raise NamedPipeException("Failed to create named pipe.")
@property
def read_overlapped_event(self):
"""Return the event used by the overlapped structure for reading.
This is the handle(event) on which we should wait if we want to be
notified when there is something to read from the named pipe.
"""
return self._read.hEvent
def _wait_event(self, event, timeout=win32event.INFINITE):
"""Wait until the event is signaled or the timeout has passed."""
# If greenthreads are used, we need to wrap the call to
# win32event.WaitForMultipleObjects using avoid_blocking_call to make
# sure the function will not block the other greenthreads.
avoid_blocking_call(win32event.WaitForMultipleObjects,
[event],
False,
timeout)
def wait_for_read(self, timeout=win32event.INFINITE):
"""Wait until there is something to read from the named pipe or the
timeout passed as parameter has passed.
:param timeout: int representing the timeout in milliseconds
"""
if self._read_pending:
self._wait_event(self._read.hEvent, timeout)
def create_file(self, sec_attributes=-1):
"""Create the file for the named pipe and store it in the '_npipe_file'
property of the class.
:param sec_attributes: type win32security.SECURITY_ATTRIBUTES
The default value is -1 which uses the default security attributes.
This means that the file handle will NOT be inherited when
a new process is created.
"""
try:
# Create the file using the name of the named pipe with the given
# security attributes
self._npipe_file = ovs_winutils.create_file(
self.name, attributes=sec_attributes)
try:
ovs_winutils.set_pipe_mode(
self._npipe_file,
ovs_winutils.win32pipe.PIPE_READMODE_BYTE)
except ovs_winutils.pywintypes.error as e:
raise NamedPipeException(
"Could not set pipe read mode to byte. "
"Error: %s." % e.strerror, e.winerror)
except ovs_winutils.pywintypes.error as e:
raise NamedPipeException("Could not create file for named pipe. "
"Error: %s." % e.strerror, e.winerror)
def blocking_write(self, buf, to_namedpipe=True):
"""Write to the named pipe handle or the file handle.
This function will wait until the write operation has completed.
:param buf: string representing the buffer which will be written
:param to_namedpipe: boolean representing where to write the buffer
True = the buffer 'buf' will be written to the named pipe handle
False = the buffer 'buf' will be written to the file handle
"""
if not to_namedpipe and self._npipe_file is None:
# If the method tries to write to the file handle but the
# property '_npipe_file' does not contain the file handle then
# we raise an exception
raise NamedPipeException("create_file must be called first.")
# Represents the handle where we should write the buffer
handle_to_write = self.namedpipe if to_namedpipe else self._npipe_file
# encoded_buf will contain the buffer 'buf' represented in binary type
encoded_buf = ovs_winutils.get_encoded_buffer(buf)
# If greenthreads are used, we need to wrap the call to
# ovs_winutils.write_file using avoid_blocking_call to make
# sure the function will not block the other greenthreads.
(errCode, _nBytesWritten) = avoid_blocking_call(
ovs_winutils.write_file,
handle_to_write,
encoded_buf,
None)
if errCode:
# errCode should be 0 if the operation completed successfully.
# If we reach here it means there was an error during the write
# operation and we should raise an exception
raise NamedPipeException("Could not write to named pipe.", errCode)
def nonblocking_read(self, bytes_to_read, from_namedpipe=True):
"""Read from the named pipe handle or the file handle.
This function returns imediatly and does not wait for the read
operation to complete. In case the read operation is not complete,
the property '_read_pending' will be set to True and the method
get_read_result should be called to retrieve the result. Otherwise,
the output of the read operation is returned.
:param bytes_to_read: int representing the maximum number of bytes
to be read.
:param from_namedpipe: boolean representing from where to read
True = the function reads from the named pipe handle
False = he function reads from the file handle
"""
if self._read_pending:
# If there is a pending read operation, the method
# 'get_read_result' should be called to retrieve the result.
return
# Represents the handle from where we should read.
handle_to_read = self.namedpipe if from_namedpipe else self._npipe_file
# The read operation is non-blocking because the read overlapped
# structure is passed. It will return immediately.
(errCode, self._read_buffer) = ovs_winutils.read_file(
handle_to_read,
bytes_to_read,
self._read)
if errCode:
# The error code should be 0 if the operation executed with success
if errCode == ovs_winutils.winerror.ERROR_IO_PENDING:
# This is returned when the overlapped structure is passed
# to the read operation (which is our case) and the operation
# has not finished yet. We mark this as a pending read
# operation and we will use the method 'get_read_result'
# later on to retrieve the result.
self._read_pending = True
else:
# In this case we received an unexpected error code, raise
# an exception.
raise NamedPipeException(
"Could not read from named pipe.", errCode)
return None
# If we can not retrieve the output from the overlapped result,
# it means that the pipe was disconnected so we have no output.
# The returned value should be an empty string.
output = ""
try:
# Try to retrieve the result from the overlapped structure.
# This call should succeed or otherwise will raise an exception,
# but it will not block.
nBytesRead = ovs_winutils.get_overlapped_result(
handle_to_read,
self._read,
False)
# Mark the read operation as complete
self._read_pending = False
# Retrieve the result and put the decoded result inside the
# 'output' variable.
output = ovs_winutils.get_decoded_buffer(
self._read_buffer[:nBytesRead])
# We need to manually signal the event to make sure the call to
# wait for the event will not block.
win32event.SetEvent(self._read.hEvent)
except NamedPipeException as e:
# If the pipe was disconnected, it means no output, we will return
# an empty string in this case. Otherwise raise an exception.
if e.code not in ovs_winutils.pipe_disconnected_errors:
raise e
return output
def get_read_result(self, from_namedpipe=True):
"""Return the result from the overlapped structure.
If there is no pending read operation, this function will return
immediately. This call will return False if the reading operation
has not completed yet and the read operation is still in progress.
Otherwise, it will return the result.
:param from_namedpipe: boolean representing from where to read
True = the function reads from the named pipe handle
False = he function reads from the file handle
"""
if not self._read_pending:
# There is no pending read operation, we should return here
return
# Represents the handle from where we should read.
handle_to_read = self.namedpipe if from_namedpipe else self._npipe_file
try:
# Try to retrieve the result from the overlapped structure.
# This will raise an ERROR_IO_INCOMPLETE exception if the
# read operation has not completed yet.
nBytesRead = ovs_winutils.get_overlapped_result(handle_to_read,
self._read,
False)
# Mark the read operation as complete
self._read_pending = False
# Decode the result and return it
return ovs_winutils.get_decoded_buffer(
self._read_buffer[:nBytesRead])
except ovs_winutils.pywintypes.error as e:
if e.winerror == ovs_winutils.winerror.ERROR_IO_INCOMPLETE:
# In this case we should call again this function to try to
# retrieve the result.
self._read_pending = True
# Return False to mark that the read operation has not
# completed yet.
return False
else:
# If we reach here it means that an unexpected error was
# received. We should raise an exception in this case.
raise NamedPipeException(
"Could not get the overlapped result. "
"Error: '%s'" % e.strerror, e.winerror)
def close_filehandle(self):
"""Close the file handle."""
ovs_winutils.close_handle(self._npipe_file)
def get_file_handle(self):
"""Returns the file handle."""
return self._npipe_file
def close_all_handles(self):
"""Close all the handles used by this class."""
if hasattr(self, "namedpipe") and self.namedpipe:
ovs_winutils.close_handle(self.namedpipe)
if hasattr(self, "_read") and self._read.hEvent:
ovs_winutils.close_handle(self._read.hEvent)
if hasattr(self, "_npipe_file") and self._npipe_file:
ovs_winutils.close_handle(self._npipe_file)
def __del__(self):
"""Make sure all the handles are closed."""
self.close_all_handles()
class ProcessWithNamedPipes(object):
class HandleClass(object):
"""This class is used only to provide a 'close' method for the stdin,
stdout and stderr of the new process. This ensures compatibility with
the subprocess.Popen returned object.
"""
def __init__(self, namedpipe):
self.namedpipe = namedpipe
def close(self):
# Close all the handles used
if self.namedpipe:
self.namedpipe.close_all_handles()
self.namedpipe = None
# The maximum number of bytes to be read
_BUFSIZE = 16384
def __init__(self, cmd, env):
"""Create a new process executing 'cmd' and with environment 'env'.
:param cmd: string representing the command line to be executed
:param env: instance representing the environment which should be used
for the new process. Look at 'os.environ' for an example.
"""
# The startupinfo structure used to spawn the new process
self._si = win32process.STARTUPINFO()
# Attributes defined to ensure compatibility with the subprocess.Popen
# returned object.
self.returncode = None
self.stdin = None
self.stdout = None
self.stderr = None
self.pid = None
# Convert the command to be a single string
cmd = " ".join(cmd)
# Initialize the named pipes used for stdin, stdout and stderr
self._initialize_named_pipes_for_std()
# Create the child process
self._start_process(cmd, env)
def _initialize_named_pipes_for_std(self):
"""Initialize the named pipes used for communication with the child
process.
"""
# used in generating the name for the pipe
pid = os.getpid()
# Security attributes for the named pipes, should not be inherited
# by the child process. Those are used by the parent process to
# communicate with the child process.
_saAttr_pipe = win32security.SECURITY_ATTRIBUTES()
_saAttr_pipe.bInheritHandle = 0
# Security attributes for the file handles, they should be inherited
# by the child process which will use them as stdin, stdout and stderr.
# The parent process will close those handles after the child process
# is created.
_saAttr_file = win32security.SECURITY_ATTRIBUTES()
_saAttr_file.bInheritHandle = 1
def create_namedpipe_and_file(prefix, saAttr_pipe=_saAttr_pipe,
saAttr_file=_saAttr_file):
"""Create the named pipe and the file for it.
:param prefix: string representing the prefix which will be
appended to the start of the name for the pipe
:param saAttr_pipe: security attributes used to create
the named pipe
:param saAttr_file: security attributes used to create the file
"""
pipename = ("%s_NamedPipe_%d_%d_%s" % (
prefix, pid, time.time(), str(random.random()).split(".")[1]))
# Create the named pipe
pipe = NamedPipe(pipe_name=pipename,
sec_attributes=saAttr_pipe)
# Create the file for the previously created named pipe
pipe.create_file(sec_attributes=saAttr_file)
return pipe
# Create the named pipes and the files used for parent - child process
# communication.
_pipe_stdin = create_namedpipe_and_file("Stdin")
self._pipe_stdout = create_namedpipe_and_file("Stdout")
self._pipe_stderr = create_namedpipe_and_file("Stderr")
# Set the file handles from the named pipes as stdin, stdout and stderr
# in startupinfo structure for the child process.
self._si.hStdInput = _pipe_stdin.get_file_handle()
self._si.hStdOutput = self._pipe_stdout.get_file_handle()
self._si.hStdError = self._pipe_stderr.get_file_handle()
self._si.dwFlags |= win32con.STARTF_USESTDHANDLES
# Wrapping around stdin in order to be able to call self.stdin.close()
# to close the stdin.
self.stdin = ProcessWithNamedPipes.HandleClass(_pipe_stdin)
_pipe_stdin = None
def _get_result_namedpipe(self, namedpipe):
"""Retrieve the result from the named pipe given as parameter.
This function will return False if the read operation has not
completed yet and we should call this method again to try to retrieve
the result.
:param namedpipe: represents the NamedPipe object from where to
retrieve the result
"""
# The default returned value will be empty string. This is returned
# in case the pipe was disconnected.
output = ""
try:
output = namedpipe.get_read_result()
except NamedPipeException as e:
# If the pipe was disconnected the error is ignored, otherwise
# we raise an exception
if e.code not in ovs_winutils.pipe_disconnected_errors:
raise e
return output
def communicate(self, input=None):
"""Return stdout and stderr of the child process.
Interact with process: Send the 'input' argument to stdin.
The function waits until the process terminates and reads from
stdout and stderr.
:param input: string representing the input which should be sent
to the child process. If this value is None, then nothing is passed
as stdin for the child process.
"""
if input:
# If we received any input, write it to stdin then close the handle
# to send EOF on stdin to the child process
self._stdin_write(input)
self.stdin.close()
# Try to retrieve the output for stdout and stderr. If the read
# operation has not completed yet, then None will be returned and
# we will try to retrieve the result again after the process is
# terminated.
stdout = self._pipe_stdout.nonblocking_read(self._BUFSIZE)
stderr = self._pipe_stderr.nonblocking_read(self._BUFSIZE)
# Wait for the process to terminate
self.wait()
if stdout is None:
# Wait until the read operation for stdout has completed and
# then retrieve the result.
self._pipe_stdout.wait_for_read()
stdout = self._get_result_namedpipe(self._pipe_stdout)
if stderr is None:
# Wait until the read operation for stdout has completed and
# then retrieve the result.
self._pipe_stderr.wait_for_read()
stderr = self._get_result_namedpipe(self._pipe_stderr)
# Close all the handles since the child process is terminated
# at this point.
self._pipe_stdout.close_all_handles()
self._pipe_stdout = None
self._pipe_stderr.close_all_handles()
self._pipe_stderr = None
# Return a tuple containing stdout and stderr to ensure compatibility
# with the subprocess module.
return (stdout, stderr)
def _stdin_write(self, input):
"""Send input to stdin for the child process."""
if input:
encoded_buf = ovs_winutils.get_encoded_buffer(input)
self.stdin.namedpipe.blocking_write(encoded_buf)
def _start_process(self, cmd_line, env):
"""Create a process with the command line 'cmd_line' and environment
'env'. Stores the pid of the child process in the 'pid' attribute.
"""
app_name = None
# The command line to be executed.
command_line = cmd_line
process_attributes = None
thread_attributes = None
# Each inheritable handle in the calling process is
# inherited by the new process.
inherit_handles = 1
# The new process has a new console, instead of inheriting
# its parent's console
creation_flags = win32process.CREATE_NO_WINDOW
# Environment used for the new process.
new_environment = env
current_directory = None
proc_args = (app_name,
command_line,
process_attributes,
thread_attributes,
inherit_handles,
creation_flags,
new_environment,
current_directory,
self._si)
proc_handles = win32process.CreateProcess(*proc_args)
# Close the handles that the parent is not going to use
self._pipe_stdout.close_filehandle()
self._pipe_stderr.close_filehandle()
self._hProcess, self._hThread, self.pid, self._tid = proc_handles
def wait(self, timeout=None):
"""Wait for the process to terminate or until timeout expires.
Returns returncode attribute. If timeout is None, then the method
will wait until the process terminates.
:param timeout: int or float representing the timeout in seconds
"""
if timeout is None:
timeout_millis = win32event.INFINITE
else:
timeout_millis = int(timeout * 1000)
if self.returncode is None:
# If the 'returncode' attribute is not set, it means that we
# have to wait for the child process to terminate and to return the
# exit code of it.
result = avoid_blocking_call(win32event.WaitForSingleObject,
self._hProcess,
timeout_millis)
if result == win32event.WAIT_TIMEOUT:
raise ProcessException("Timeout Exception.")
self.returncode = win32process.GetExitCodeProcess(self._hProcess)
# Return the exit code of the child process
return self.returncode

@ -26,7 +26,13 @@ def monkey_patch():
# https://github.com/eventlet/eventlet/commit/b756447bab51046dfc6f1e0e299cc997ab343701
# For details please check https://bugs.launchpad.net/neutron/+bug/1745013
eventlet.hubs.get_hub()
eventlet.monkey_patch()
if os.name != 'nt':
eventlet.monkey_patch()
p_c_e = importutils.import_module('pyroute2.config.asyncio')
p_c_e.asyncio_config()
else:
# eventlet monkey patching the os module causes subprocess.Popen to
# fail on Windows when using pipes due to missing non-blocking IO
# support.
eventlet.monkey_patch(os=False)