diff --git a/neutron/agent/windows/utils.py b/neutron/agent/windows/utils.py index 64f0cbf9165..f4511fcf905 100644 --- a/neutron/agent/windows/utils.py +++ b/neutron/agent/windows/utils.py @@ -15,15 +15,22 @@ import os +import eventlet +from eventlet import tpool from neutron_lib.utils import helpers from oslo_log import log as logging from oslo_utils import encodeutils from neutron._i18n import _ -from neutron.agent.windows import winutils LOG = logging.getLogger(__name__) +# subprocess.Popen will spawn two threads consuming stdout/stderr when passing +# data through stdin. We need to make sure that *native* threads will be used +# as pipes are blocking on Windows. +subprocess = eventlet.patcher.original('subprocess') +subprocess.threading = eventlet.patcher.original('threading') + def create_process(cmd, addl_env=None): cmd = list(map(str, cmd)) @@ -33,7 +40,14 @@ def create_process(cmd, addl_env=None): if addl_env: env.update(addl_env) - obj = winutils.ProcessWithNamedPipes(cmd, env) + popen = subprocess.Popen + obj = popen(cmd, shell=False, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + preexec_fn=None, + close_fds=False) return obj, cmd @@ -47,7 +61,7 @@ def execute(cmd, process_input=None, addl_env=None, else: _process_input = None obj, cmd = create_process(cmd, addl_env=addl_env) - _stdout, _stderr = obj.communicate(_process_input) + _stdout, _stderr = avoid_blocking_call(obj.communicate, _process_input) obj.stdin.close() _stdout = helpers.safe_decode_utf8(_stdout) _stderr = helpers.safe_decode_utf8(_stderr) @@ -74,3 +88,22 @@ def execute(cmd, process_input=None, addl_env=None, raise RuntimeError(m) return (_stdout, _stderr) if return_stderr else _stdout + + +def avoid_blocking_call(f, *args, **kwargs): + """Ensure that the method "f" will not block other greenthreads. + + Performs the call to the function "f" received as parameter in a + different thread using tpool.execute when called from a greenthread. + This will ensure that the function "f" will not block other greenthreads. + If not called from a greenthread, it will invoke the function "f" directly. + The function "f" will receive as parameters the arguments "args" and + keyword arguments "kwargs". + """ + # Note that eventlet.getcurrent will always return a greenlet object. + # In case of a greenthread, the parent greenlet will always be the hub + # loop greenlet. + if eventlet.getcurrent().parent: + return tpool.execute(f, *args, **kwargs) + else: + return f(*args, **kwargs) diff --git a/neutron/agent/windows/winutils.py b/neutron/agent/windows/winutils.py deleted file mode 100644 index 6f9fff7d5bd..00000000000 --- a/neutron/agent/windows/winutils.py +++ /dev/null @@ -1,606 +0,0 @@ -# Copyright 2017 Cloudbase Solutions. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import random -import time - -import eventlet -from eventlet import tpool -from ovs import winutils as ovs_winutils - -import win32con -import win32event -import win32process -import win32security - - -def avoid_blocking_call(f, *args, **kwargs): - """Ensure that the method "f" will not block other greenthreads. - - Performs the call to the function "f" received as parameter in a - different thread using tpool.execute when called from a greenthread. - This will ensure that the function "f" will not block other greenthreads. - If not called from a greenthread, it will invoke the function "f" directly. - The function "f" will receive as parameters the arguments "args" and - keyword arguments "kwargs". - """ - # Note that eventlet.getcurrent will always return a greenlet object. - # In case of a greenthread, the parent greenlet will always be the hub - # loop greenlet. - if eventlet.getcurrent().parent: - return tpool.execute(f, *args, **kwargs) - else: - return f(*args, **kwargs) - - -class WindowsException(Exception): - """Base Windows Exception - - This class is inherited by all the other exceptions that are used in - this file. The 'error_message' property should be defined in the class - that inherits from this with a particular message if needed. - """ - error_message = None - - def __init__(self, message): - super(WindowsException, self).__init__() - # The error message which will be printed for this exception - self.error_message = message - - def __str__(self): - return self.error_message - - -class NamedPipeException(WindowsException): - """Exception raised when there is an error with the named pipes. - - If there is an error code associated with this exception, it can be - retrieved by accessing the 'code' property of this class. - """ - def __init__(self, message, error_code=None): - super(NamedPipeException, self).__init__(message) - # The error code associated with this exception. This property should - # be different than 'None' if there is an existing error code. - self.code = error_code - if self.code: - # Appending the error code to the message - self.error_message += " Error code: '%s'." % self.code - - def __str__(self): - return self._error_string - - -class ProcessException(WindowsException): - """Exception raised when there is an error with the child process. - - This class inherits the implementation from the super class, it does not - have anything particular. It is intentionally left blank. - """ - pass - - -class NamedPipe(object): - def __init__(self, pipe_name=None, sec_attributes=-1): - """Create a named pipe with the given name. - - :param pipe_name(Optional): string representing the name of the pipe - which should be used to create the named pipe - :param sec_attributes(Optional): type win32security.SECURITY_ATTRIBUTES - The default value is -1 which uses the default security attributes. - This means that the named pipe handle is inherited when a new - process is created. - """ - # For reading from the named pipe, we will use an overlapped structure - # for non-blocking I/O - self._read = ovs_winutils.pywintypes.OVERLAPPED() - # Create a new event which will be used by the overlapped structure - self._read.hEvent = ovs_winutils.get_new_event() - # This property tells if there is a pending reading operation on - # the named pipe or not. - self._read_pending = False - - if pipe_name is None: - # Generate a random name for the named pipe if the name was not - # passed explicitly as parameter. - pipe_name = ("NamedPipe_%d_%s" % - (time.time(), str(random.random()).split(".")[1])) - - # Creating the name for a local named pipe. The property "name" will - # have "\\.\pipe\" appended at the start of pipe_name - self.name = ovs_winutils.get_pipe_name(pipe_name) - # This property will contain the handle of the named pipe which can - # be accessed later on. - self.namedpipe = ovs_winutils.create_named_pipe(self.name, - saAttr=sec_attributes) - # This property should be initialised explicitly later on by calling - # the method create_file of this class. - self._npipe_file = None - - if not self.namedpipe: - # If there was an error when creating the named pipe, the property - # "namedpipe" should be None. We raise an exception in this case - raise NamedPipeException("Failed to create named pipe.") - - @property - def read_overlapped_event(self): - """Return the event used by the overlapped structure for reading. - - This is the handle(event) on which we should wait if we want to be - notified when there is something to read from the named pipe. - """ - return self._read.hEvent - - def _wait_event(self, event, timeout=win32event.INFINITE): - """Wait until the event is signaled or the timeout has passed.""" - # If greenthreads are used, we need to wrap the call to - # win32event.WaitForMultipleObjects using avoid_blocking_call to make - # sure the function will not block the other greenthreads. - avoid_blocking_call(win32event.WaitForMultipleObjects, - [event], - False, - timeout) - - def wait_for_read(self, timeout=win32event.INFINITE): - """Wait until there is something to read from the named pipe or the - - timeout passed as parameter has passed. - - :param timeout: int representing the timeout in milliseconds - """ - if self._read_pending: - self._wait_event(self._read.hEvent, timeout) - - def create_file(self, sec_attributes=-1): - """Create the file for the named pipe and store it in the '_npipe_file' - - property of the class. - - :param sec_attributes: type win32security.SECURITY_ATTRIBUTES - The default value is -1 which uses the default security attributes. - This means that the file handle will NOT be inherited when - a new process is created. - """ - try: - # Create the file using the name of the named pipe with the given - # security attributes - self._npipe_file = ovs_winutils.create_file( - self.name, attributes=sec_attributes) - try: - ovs_winutils.set_pipe_mode( - self._npipe_file, - ovs_winutils.win32pipe.PIPE_READMODE_BYTE) - except ovs_winutils.pywintypes.error as e: - raise NamedPipeException( - "Could not set pipe read mode to byte. " - "Error: %s." % e.strerror, e.winerror) - except ovs_winutils.pywintypes.error as e: - raise NamedPipeException("Could not create file for named pipe. " - "Error: %s." % e.strerror, e.winerror) - - def blocking_write(self, buf, to_namedpipe=True): - """Write to the named pipe handle or the file handle. - - This function will wait until the write operation has completed. - - :param buf: string representing the buffer which will be written - :param to_namedpipe: boolean representing where to write the buffer - True = the buffer 'buf' will be written to the named pipe handle - False = the buffer 'buf' will be written to the file handle - """ - if not to_namedpipe and self._npipe_file is None: - # If the method tries to write to the file handle but the - # property '_npipe_file' does not contain the file handle then - # we raise an exception - raise NamedPipeException("create_file must be called first.") - # Represents the handle where we should write the buffer - handle_to_write = self.namedpipe if to_namedpipe else self._npipe_file - # encoded_buf will contain the buffer 'buf' represented in binary type - encoded_buf = ovs_winutils.get_encoded_buffer(buf) - - # If greenthreads are used, we need to wrap the call to - # ovs_winutils.write_file using avoid_blocking_call to make - # sure the function will not block the other greenthreads. - (errCode, _nBytesWritten) = avoid_blocking_call( - ovs_winutils.write_file, - handle_to_write, - encoded_buf, - None) - if errCode: - # errCode should be 0 if the operation completed successfully. - # If we reach here it means there was an error during the write - # operation and we should raise an exception - raise NamedPipeException("Could not write to named pipe.", errCode) - - def nonblocking_read(self, bytes_to_read, from_namedpipe=True): - """Read from the named pipe handle or the file handle. - - This function returns imediatly and does not wait for the read - operation to complete. In case the read operation is not complete, - the property '_read_pending' will be set to True and the method - get_read_result should be called to retrieve the result. Otherwise, - the output of the read operation is returned. - - :param bytes_to_read: int representing the maximum number of bytes - to be read. - :param from_namedpipe: boolean representing from where to read - True = the function reads from the named pipe handle - False = he function reads from the file handle - """ - if self._read_pending: - # If there is a pending read operation, the method - # 'get_read_result' should be called to retrieve the result. - return - - # Represents the handle from where we should read. - handle_to_read = self.namedpipe if from_namedpipe else self._npipe_file - - # The read operation is non-blocking because the read overlapped - # structure is passed. It will return immediately. - (errCode, self._read_buffer) = ovs_winutils.read_file( - handle_to_read, - bytes_to_read, - self._read) - - if errCode: - # The error code should be 0 if the operation executed with success - if errCode == ovs_winutils.winerror.ERROR_IO_PENDING: - # This is returned when the overlapped structure is passed - # to the read operation (which is our case) and the operation - # has not finished yet. We mark this as a pending read - # operation and we will use the method 'get_read_result' - # later on to retrieve the result. - self._read_pending = True - else: - # In this case we received an unexpected error code, raise - # an exception. - raise NamedPipeException( - "Could not read from named pipe.", errCode) - return None - - # If we can not retrieve the output from the overlapped result, - # it means that the pipe was disconnected so we have no output. - # The returned value should be an empty string. - output = "" - try: - # Try to retrieve the result from the overlapped structure. - # This call should succeed or otherwise will raise an exception, - # but it will not block. - nBytesRead = ovs_winutils.get_overlapped_result( - handle_to_read, - self._read, - False) - # Mark the read operation as complete - self._read_pending = False - # Retrieve the result and put the decoded result inside the - # 'output' variable. - output = ovs_winutils.get_decoded_buffer( - self._read_buffer[:nBytesRead]) - # We need to manually signal the event to make sure the call to - # wait for the event will not block. - win32event.SetEvent(self._read.hEvent) - except NamedPipeException as e: - # If the pipe was disconnected, it means no output, we will return - # an empty string in this case. Otherwise raise an exception. - if e.code not in ovs_winutils.pipe_disconnected_errors: - raise e - return output - - def get_read_result(self, from_namedpipe=True): - """Return the result from the overlapped structure. - - If there is no pending read operation, this function will return - immediately. This call will return False if the reading operation - has not completed yet and the read operation is still in progress. - Otherwise, it will return the result. - - :param from_namedpipe: boolean representing from where to read - True = the function reads from the named pipe handle - False = he function reads from the file handle - """ - if not self._read_pending: - # There is no pending read operation, we should return here - return - - # Represents the handle from where we should read. - handle_to_read = self.namedpipe if from_namedpipe else self._npipe_file - try: - # Try to retrieve the result from the overlapped structure. - # This will raise an ERROR_IO_INCOMPLETE exception if the - # read operation has not completed yet. - nBytesRead = ovs_winutils.get_overlapped_result(handle_to_read, - self._read, - False) - # Mark the read operation as complete - self._read_pending = False - # Decode the result and return it - return ovs_winutils.get_decoded_buffer( - self._read_buffer[:nBytesRead]) - except ovs_winutils.pywintypes.error as e: - if e.winerror == ovs_winutils.winerror.ERROR_IO_INCOMPLETE: - # In this case we should call again this function to try to - # retrieve the result. - self._read_pending = True - # Return False to mark that the read operation has not - # completed yet. - return False - else: - # If we reach here it means that an unexpected error was - # received. We should raise an exception in this case. - raise NamedPipeException( - "Could not get the overlapped result. " - "Error: '%s'" % e.strerror, e.winerror) - - def close_filehandle(self): - """Close the file handle.""" - ovs_winutils.close_handle(self._npipe_file) - - def get_file_handle(self): - """Returns the file handle.""" - return self._npipe_file - - def close_all_handles(self): - """Close all the handles used by this class.""" - if hasattr(self, "namedpipe") and self.namedpipe: - ovs_winutils.close_handle(self.namedpipe) - if hasattr(self, "_read") and self._read.hEvent: - ovs_winutils.close_handle(self._read.hEvent) - if hasattr(self, "_npipe_file") and self._npipe_file: - ovs_winutils.close_handle(self._npipe_file) - - def __del__(self): - """Make sure all the handles are closed.""" - self.close_all_handles() - - -class ProcessWithNamedPipes(object): - class HandleClass(object): - """This class is used only to provide a 'close' method for the stdin, - stdout and stderr of the new process. This ensures compatibility with - the subprocess.Popen returned object. - """ - def __init__(self, namedpipe): - self.namedpipe = namedpipe - - def close(self): - # Close all the handles used - if self.namedpipe: - self.namedpipe.close_all_handles() - self.namedpipe = None - - # The maximum number of bytes to be read - _BUFSIZE = 16384 - - def __init__(self, cmd, env): - """Create a new process executing 'cmd' and with environment 'env'. - - :param cmd: string representing the command line to be executed - :param env: instance representing the environment which should be used - for the new process. Look at 'os.environ' for an example. - """ - # The startupinfo structure used to spawn the new process - self._si = win32process.STARTUPINFO() - - # Attributes defined to ensure compatibility with the subprocess.Popen - # returned object. - self.returncode = None - self.stdin = None - self.stdout = None - self.stderr = None - self.pid = None - - # Convert the command to be a single string - cmd = " ".join(cmd) - # Initialize the named pipes used for stdin, stdout and stderr - self._initialize_named_pipes_for_std() - # Create the child process - self._start_process(cmd, env) - - def _initialize_named_pipes_for_std(self): - """Initialize the named pipes used for communication with the child - process. - """ - - # used in generating the name for the pipe - pid = os.getpid() - - # Security attributes for the named pipes, should not be inherited - # by the child process. Those are used by the parent process to - # communicate with the child process. - _saAttr_pipe = win32security.SECURITY_ATTRIBUTES() - _saAttr_pipe.bInheritHandle = 0 - # Security attributes for the file handles, they should be inherited - # by the child process which will use them as stdin, stdout and stderr. - # The parent process will close those handles after the child process - # is created. - _saAttr_file = win32security.SECURITY_ATTRIBUTES() - _saAttr_file.bInheritHandle = 1 - - def create_namedpipe_and_file(prefix, saAttr_pipe=_saAttr_pipe, - saAttr_file=_saAttr_file): - """Create the named pipe and the file for it. - - :param prefix: string representing the prefix which will be - appended to the start of the name for the pipe - :param saAttr_pipe: security attributes used to create - the named pipe - :param saAttr_file: security attributes used to create the file - """ - pipename = ("%s_NamedPipe_%d_%d_%s" % ( - prefix, pid, time.time(), str(random.random()).split(".")[1])) - # Create the named pipe - pipe = NamedPipe(pipe_name=pipename, - sec_attributes=saAttr_pipe) - # Create the file for the previously created named pipe - pipe.create_file(sec_attributes=saAttr_file) - return pipe - - # Create the named pipes and the files used for parent - child process - # communication. - _pipe_stdin = create_namedpipe_and_file("Stdin") - self._pipe_stdout = create_namedpipe_and_file("Stdout") - self._pipe_stderr = create_namedpipe_and_file("Stderr") - - # Set the file handles from the named pipes as stdin, stdout and stderr - # in startupinfo structure for the child process. - self._si.hStdInput = _pipe_stdin.get_file_handle() - self._si.hStdOutput = self._pipe_stdout.get_file_handle() - self._si.hStdError = self._pipe_stderr.get_file_handle() - self._si.dwFlags |= win32con.STARTF_USESTDHANDLES - - # Wrapping around stdin in order to be able to call self.stdin.close() - # to close the stdin. - self.stdin = ProcessWithNamedPipes.HandleClass(_pipe_stdin) - _pipe_stdin = None - - def _get_result_namedpipe(self, namedpipe): - """Retrieve the result from the named pipe given as parameter. - - This function will return False if the read operation has not - completed yet and we should call this method again to try to retrieve - the result. - - :param namedpipe: represents the NamedPipe object from where to - retrieve the result - """ - # The default returned value will be empty string. This is returned - # in case the pipe was disconnected. - output = "" - try: - output = namedpipe.get_read_result() - except NamedPipeException as e: - # If the pipe was disconnected the error is ignored, otherwise - # we raise an exception - if e.code not in ovs_winutils.pipe_disconnected_errors: - raise e - return output - - def communicate(self, input=None): - """Return stdout and stderr of the child process. - - Interact with process: Send the 'input' argument to stdin. - The function waits until the process terminates and reads from - stdout and stderr. - - :param input: string representing the input which should be sent - to the child process. If this value is None, then nothing is passed - as stdin for the child process. - """ - if input: - # If we received any input, write it to stdin then close the handle - # to send EOF on stdin to the child process - self._stdin_write(input) - self.stdin.close() - - # Try to retrieve the output for stdout and stderr. If the read - # operation has not completed yet, then None will be returned and - # we will try to retrieve the result again after the process is - # terminated. - stdout = self._pipe_stdout.nonblocking_read(self._BUFSIZE) - stderr = self._pipe_stderr.nonblocking_read(self._BUFSIZE) - - # Wait for the process to terminate - self.wait() - - if stdout is None: - # Wait until the read operation for stdout has completed and - # then retrieve the result. - self._pipe_stdout.wait_for_read() - stdout = self._get_result_namedpipe(self._pipe_stdout) - - if stderr is None: - # Wait until the read operation for stdout has completed and - # then retrieve the result. - self._pipe_stderr.wait_for_read() - stderr = self._get_result_namedpipe(self._pipe_stderr) - - # Close all the handles since the child process is terminated - # at this point. - self._pipe_stdout.close_all_handles() - self._pipe_stdout = None - self._pipe_stderr.close_all_handles() - self._pipe_stderr = None - - # Return a tuple containing stdout and stderr to ensure compatibility - # with the subprocess module. - return (stdout, stderr) - - def _stdin_write(self, input): - """Send input to stdin for the child process.""" - if input: - encoded_buf = ovs_winutils.get_encoded_buffer(input) - self.stdin.namedpipe.blocking_write(encoded_buf) - - def _start_process(self, cmd_line, env): - """Create a process with the command line 'cmd_line' and environment - 'env'. Stores the pid of the child process in the 'pid' attribute. - """ - app_name = None - # The command line to be executed. - command_line = cmd_line - process_attributes = None - thread_attributes = None - # Each inheritable handle in the calling process is - # inherited by the new process. - inherit_handles = 1 - # The new process has a new console, instead of inheriting - # its parent's console - creation_flags = win32process.CREATE_NO_WINDOW - # Environment used for the new process. - new_environment = env - current_directory = None - - proc_args = (app_name, - command_line, - process_attributes, - thread_attributes, - inherit_handles, - creation_flags, - new_environment, - current_directory, - self._si) - proc_handles = win32process.CreateProcess(*proc_args) - - # Close the handles that the parent is not going to use - self._pipe_stdout.close_filehandle() - self._pipe_stderr.close_filehandle() - - self._hProcess, self._hThread, self.pid, self._tid = proc_handles - - def wait(self, timeout=None): - """Wait for the process to terminate or until timeout expires. - - Returns returncode attribute. If timeout is None, then the method - will wait until the process terminates. - - :param timeout: int or float representing the timeout in seconds - """ - if timeout is None: - timeout_millis = win32event.INFINITE - else: - timeout_millis = int(timeout * 1000) - - if self.returncode is None: - # If the 'returncode' attribute is not set, it means that we - # have to wait for the child process to terminate and to return the - # exit code of it. - result = avoid_blocking_call(win32event.WaitForSingleObject, - self._hProcess, - timeout_millis) - if result == win32event.WAIT_TIMEOUT: - raise ProcessException("Timeout Exception.") - self.returncode = win32process.GetExitCodeProcess(self._hProcess) - # Return the exit code of the child process - return self.returncode diff --git a/neutron/common/eventlet_utils.py b/neutron/common/eventlet_utils.py index 9af9ae66779..5e82ab68cc1 100644 --- a/neutron/common/eventlet_utils.py +++ b/neutron/common/eventlet_utils.py @@ -26,7 +26,13 @@ def monkey_patch(): # https://github.com/eventlet/eventlet/commit/b756447bab51046dfc6f1e0e299cc997ab343701 # For details please check https://bugs.launchpad.net/neutron/+bug/1745013 eventlet.hubs.get_hub() - eventlet.monkey_patch() if os.name != 'nt': + eventlet.monkey_patch() + p_c_e = importutils.import_module('pyroute2.config.asyncio') p_c_e.asyncio_config() + else: + # eventlet monkey patching the os module causes subprocess.Popen to + # fail on Windows when using pipes due to missing non-blocking IO + # support. + eventlet.monkey_patch(os=False)