Merge "Windows: fix exec calls" into stable/queens
This commit is contained in:
commit
f58fa5c021
@ -15,15 +15,22 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
from eventlet import tpool
|
||||||
from neutron_lib.utils import helpers
|
from neutron_lib.utils import helpers
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import encodeutils
|
from oslo_utils import encodeutils
|
||||||
|
|
||||||
from neutron._i18n import _
|
from neutron._i18n import _
|
||||||
from neutron.agent.windows import winutils
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# subprocess.Popen will spawn two threads consuming stdout/stderr when passing
|
||||||
|
# data through stdin. We need to make sure that *native* threads will be used
|
||||||
|
# as pipes are blocking on Windows.
|
||||||
|
subprocess = eventlet.patcher.original('subprocess')
|
||||||
|
subprocess.threading = eventlet.patcher.original('threading')
|
||||||
|
|
||||||
|
|
||||||
def create_process(cmd, addl_env=None):
|
def create_process(cmd, addl_env=None):
|
||||||
cmd = list(map(str, cmd))
|
cmd = list(map(str, cmd))
|
||||||
@ -33,7 +40,14 @@ def create_process(cmd, addl_env=None):
|
|||||||
if addl_env:
|
if addl_env:
|
||||||
env.update(addl_env)
|
env.update(addl_env)
|
||||||
|
|
||||||
obj = winutils.ProcessWithNamedPipes(cmd, env)
|
popen = subprocess.Popen
|
||||||
|
obj = popen(cmd, shell=False,
|
||||||
|
stdin=subprocess.PIPE,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
env=env,
|
||||||
|
preexec_fn=None,
|
||||||
|
close_fds=False)
|
||||||
|
|
||||||
return obj, cmd
|
return obj, cmd
|
||||||
|
|
||||||
@ -47,7 +61,7 @@ def execute(cmd, process_input=None, addl_env=None,
|
|||||||
else:
|
else:
|
||||||
_process_input = None
|
_process_input = None
|
||||||
obj, cmd = create_process(cmd, addl_env=addl_env)
|
obj, cmd = create_process(cmd, addl_env=addl_env)
|
||||||
_stdout, _stderr = obj.communicate(_process_input)
|
_stdout, _stderr = avoid_blocking_call(obj.communicate, _process_input)
|
||||||
obj.stdin.close()
|
obj.stdin.close()
|
||||||
_stdout = helpers.safe_decode_utf8(_stdout)
|
_stdout = helpers.safe_decode_utf8(_stdout)
|
||||||
_stderr = helpers.safe_decode_utf8(_stderr)
|
_stderr = helpers.safe_decode_utf8(_stderr)
|
||||||
@ -74,3 +88,22 @@ def execute(cmd, process_input=None, addl_env=None,
|
|||||||
raise RuntimeError(m)
|
raise RuntimeError(m)
|
||||||
|
|
||||||
return (_stdout, _stderr) if return_stderr else _stdout
|
return (_stdout, _stderr) if return_stderr else _stdout
|
||||||
|
|
||||||
|
|
||||||
|
def avoid_blocking_call(f, *args, **kwargs):
|
||||||
|
"""Ensure that the method "f" will not block other greenthreads.
|
||||||
|
|
||||||
|
Performs the call to the function "f" received as parameter in a
|
||||||
|
different thread using tpool.execute when called from a greenthread.
|
||||||
|
This will ensure that the function "f" will not block other greenthreads.
|
||||||
|
If not called from a greenthread, it will invoke the function "f" directly.
|
||||||
|
The function "f" will receive as parameters the arguments "args" and
|
||||||
|
keyword arguments "kwargs".
|
||||||
|
"""
|
||||||
|
# Note that eventlet.getcurrent will always return a greenlet object.
|
||||||
|
# In case of a greenthread, the parent greenlet will always be the hub
|
||||||
|
# loop greenlet.
|
||||||
|
if eventlet.getcurrent().parent:
|
||||||
|
return tpool.execute(f, *args, **kwargs)
|
||||||
|
else:
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
@ -1,606 +0,0 @@
|
|||||||
# Copyright 2017 Cloudbase Solutions.
|
|
||||||
# All Rights Reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
import os
|
|
||||||
import random
|
|
||||||
import time
|
|
||||||
|
|
||||||
import eventlet
|
|
||||||
from eventlet import tpool
|
|
||||||
from ovs import winutils as ovs_winutils
|
|
||||||
|
|
||||||
import win32con
|
|
||||||
import win32event
|
|
||||||
import win32process
|
|
||||||
import win32security
|
|
||||||
|
|
||||||
|
|
||||||
def avoid_blocking_call(f, *args, **kwargs):
|
|
||||||
"""Ensure that the method "f" will not block other greenthreads.
|
|
||||||
|
|
||||||
Performs the call to the function "f" received as parameter in a
|
|
||||||
different thread using tpool.execute when called from a greenthread.
|
|
||||||
This will ensure that the function "f" will not block other greenthreads.
|
|
||||||
If not called from a greenthread, it will invoke the function "f" directly.
|
|
||||||
The function "f" will receive as parameters the arguments "args" and
|
|
||||||
keyword arguments "kwargs".
|
|
||||||
"""
|
|
||||||
# Note that eventlet.getcurrent will always return a greenlet object.
|
|
||||||
# In case of a greenthread, the parent greenlet will always be the hub
|
|
||||||
# loop greenlet.
|
|
||||||
if eventlet.getcurrent().parent:
|
|
||||||
return tpool.execute(f, *args, **kwargs)
|
|
||||||
else:
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class WindowsException(Exception):
|
|
||||||
"""Base Windows Exception
|
|
||||||
|
|
||||||
This class is inherited by all the other exceptions that are used in
|
|
||||||
this file. The 'error_message' property should be defined in the class
|
|
||||||
that inherits from this with a particular message if needed.
|
|
||||||
"""
|
|
||||||
error_message = None
|
|
||||||
|
|
||||||
def __init__(self, message):
|
|
||||||
super(WindowsException, self).__init__()
|
|
||||||
# The error message which will be printed for this exception
|
|
||||||
self.error_message = message
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self.error_message
|
|
||||||
|
|
||||||
|
|
||||||
class NamedPipeException(WindowsException):
|
|
||||||
"""Exception raised when there is an error with the named pipes.
|
|
||||||
|
|
||||||
If there is an error code associated with this exception, it can be
|
|
||||||
retrieved by accessing the 'code' property of this class.
|
|
||||||
"""
|
|
||||||
def __init__(self, message, error_code=None):
|
|
||||||
super(NamedPipeException, self).__init__(message)
|
|
||||||
# The error code associated with this exception. This property should
|
|
||||||
# be different than 'None' if there is an existing error code.
|
|
||||||
self.code = error_code
|
|
||||||
if self.code:
|
|
||||||
# Appending the error code to the message
|
|
||||||
self.error_message += " Error code: '%s'." % self.code
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return self._error_string
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessException(WindowsException):
|
|
||||||
"""Exception raised when there is an error with the child process.
|
|
||||||
|
|
||||||
This class inherits the implementation from the super class, it does not
|
|
||||||
have anything particular. It is intentionally left blank.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class NamedPipe(object):
|
|
||||||
def __init__(self, pipe_name=None, sec_attributes=-1):
|
|
||||||
"""Create a named pipe with the given name.
|
|
||||||
|
|
||||||
:param pipe_name(Optional): string representing the name of the pipe
|
|
||||||
which should be used to create the named pipe
|
|
||||||
:param sec_attributes(Optional): type win32security.SECURITY_ATTRIBUTES
|
|
||||||
The default value is -1 which uses the default security attributes.
|
|
||||||
This means that the named pipe handle is inherited when a new
|
|
||||||
process is created.
|
|
||||||
"""
|
|
||||||
# For reading from the named pipe, we will use an overlapped structure
|
|
||||||
# for non-blocking I/O
|
|
||||||
self._read = ovs_winutils.pywintypes.OVERLAPPED()
|
|
||||||
# Create a new event which will be used by the overlapped structure
|
|
||||||
self._read.hEvent = ovs_winutils.get_new_event()
|
|
||||||
# This property tells if there is a pending reading operation on
|
|
||||||
# the named pipe or not.
|
|
||||||
self._read_pending = False
|
|
||||||
|
|
||||||
if pipe_name is None:
|
|
||||||
# Generate a random name for the named pipe if the name was not
|
|
||||||
# passed explicitly as parameter.
|
|
||||||
pipe_name = ("NamedPipe_%d_%s" %
|
|
||||||
(time.time(), str(random.random()).split(".")[1]))
|
|
||||||
|
|
||||||
# Creating the name for a local named pipe. The property "name" will
|
|
||||||
# have "\\.\pipe\" appended at the start of pipe_name
|
|
||||||
self.name = ovs_winutils.get_pipe_name(pipe_name)
|
|
||||||
# This property will contain the handle of the named pipe which can
|
|
||||||
# be accessed later on.
|
|
||||||
self.namedpipe = ovs_winutils.create_named_pipe(self.name,
|
|
||||||
saAttr=sec_attributes)
|
|
||||||
# This property should be initialised explicitly later on by calling
|
|
||||||
# the method create_file of this class.
|
|
||||||
self._npipe_file = None
|
|
||||||
|
|
||||||
if not self.namedpipe:
|
|
||||||
# If there was an error when creating the named pipe, the property
|
|
||||||
# "namedpipe" should be None. We raise an exception in this case
|
|
||||||
raise NamedPipeException("Failed to create named pipe.")
|
|
||||||
|
|
||||||
@property
|
|
||||||
def read_overlapped_event(self):
|
|
||||||
"""Return the event used by the overlapped structure for reading.
|
|
||||||
|
|
||||||
This is the handle(event) on which we should wait if we want to be
|
|
||||||
notified when there is something to read from the named pipe.
|
|
||||||
"""
|
|
||||||
return self._read.hEvent
|
|
||||||
|
|
||||||
def _wait_event(self, event, timeout=win32event.INFINITE):
|
|
||||||
"""Wait until the event is signaled or the timeout has passed."""
|
|
||||||
# If greenthreads are used, we need to wrap the call to
|
|
||||||
# win32event.WaitForMultipleObjects using avoid_blocking_call to make
|
|
||||||
# sure the function will not block the other greenthreads.
|
|
||||||
avoid_blocking_call(win32event.WaitForMultipleObjects,
|
|
||||||
[event],
|
|
||||||
False,
|
|
||||||
timeout)
|
|
||||||
|
|
||||||
def wait_for_read(self, timeout=win32event.INFINITE):
|
|
||||||
"""Wait until there is something to read from the named pipe or the
|
|
||||||
|
|
||||||
timeout passed as parameter has passed.
|
|
||||||
|
|
||||||
:param timeout: int representing the timeout in milliseconds
|
|
||||||
"""
|
|
||||||
if self._read_pending:
|
|
||||||
self._wait_event(self._read.hEvent, timeout)
|
|
||||||
|
|
||||||
def create_file(self, sec_attributes=-1):
|
|
||||||
"""Create the file for the named pipe and store it in the '_npipe_file'
|
|
||||||
|
|
||||||
property of the class.
|
|
||||||
|
|
||||||
:param sec_attributes: type win32security.SECURITY_ATTRIBUTES
|
|
||||||
The default value is -1 which uses the default security attributes.
|
|
||||||
This means that the file handle will NOT be inherited when
|
|
||||||
a new process is created.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
# Create the file using the name of the named pipe with the given
|
|
||||||
# security attributes
|
|
||||||
self._npipe_file = ovs_winutils.create_file(
|
|
||||||
self.name, attributes=sec_attributes)
|
|
||||||
try:
|
|
||||||
ovs_winutils.set_pipe_mode(
|
|
||||||
self._npipe_file,
|
|
||||||
ovs_winutils.win32pipe.PIPE_READMODE_BYTE)
|
|
||||||
except ovs_winutils.pywintypes.error as e:
|
|
||||||
raise NamedPipeException(
|
|
||||||
"Could not set pipe read mode to byte. "
|
|
||||||
"Error: %s." % e.strerror, e.winerror)
|
|
||||||
except ovs_winutils.pywintypes.error as e:
|
|
||||||
raise NamedPipeException("Could not create file for named pipe. "
|
|
||||||
"Error: %s." % e.strerror, e.winerror)
|
|
||||||
|
|
||||||
def blocking_write(self, buf, to_namedpipe=True):
|
|
||||||
"""Write to the named pipe handle or the file handle.
|
|
||||||
|
|
||||||
This function will wait until the write operation has completed.
|
|
||||||
|
|
||||||
:param buf: string representing the buffer which will be written
|
|
||||||
:param to_namedpipe: boolean representing where to write the buffer
|
|
||||||
True = the buffer 'buf' will be written to the named pipe handle
|
|
||||||
False = the buffer 'buf' will be written to the file handle
|
|
||||||
"""
|
|
||||||
if not to_namedpipe and self._npipe_file is None:
|
|
||||||
# If the method tries to write to the file handle but the
|
|
||||||
# property '_npipe_file' does not contain the file handle then
|
|
||||||
# we raise an exception
|
|
||||||
raise NamedPipeException("create_file must be called first.")
|
|
||||||
# Represents the handle where we should write the buffer
|
|
||||||
handle_to_write = self.namedpipe if to_namedpipe else self._npipe_file
|
|
||||||
# encoded_buf will contain the buffer 'buf' represented in binary type
|
|
||||||
encoded_buf = ovs_winutils.get_encoded_buffer(buf)
|
|
||||||
|
|
||||||
# If greenthreads are used, we need to wrap the call to
|
|
||||||
# ovs_winutils.write_file using avoid_blocking_call to make
|
|
||||||
# sure the function will not block the other greenthreads.
|
|
||||||
(errCode, _nBytesWritten) = avoid_blocking_call(
|
|
||||||
ovs_winutils.write_file,
|
|
||||||
handle_to_write,
|
|
||||||
encoded_buf,
|
|
||||||
None)
|
|
||||||
if errCode:
|
|
||||||
# errCode should be 0 if the operation completed successfully.
|
|
||||||
# If we reach here it means there was an error during the write
|
|
||||||
# operation and we should raise an exception
|
|
||||||
raise NamedPipeException("Could not write to named pipe.", errCode)
|
|
||||||
|
|
||||||
def nonblocking_read(self, bytes_to_read, from_namedpipe=True):
|
|
||||||
"""Read from the named pipe handle or the file handle.
|
|
||||||
|
|
||||||
This function returns imediatly and does not wait for the read
|
|
||||||
operation to complete. In case the read operation is not complete,
|
|
||||||
the property '_read_pending' will be set to True and the method
|
|
||||||
get_read_result should be called to retrieve the result. Otherwise,
|
|
||||||
the output of the read operation is returned.
|
|
||||||
|
|
||||||
:param bytes_to_read: int representing the maximum number of bytes
|
|
||||||
to be read.
|
|
||||||
:param from_namedpipe: boolean representing from where to read
|
|
||||||
True = the function reads from the named pipe handle
|
|
||||||
False = he function reads from the file handle
|
|
||||||
"""
|
|
||||||
if self._read_pending:
|
|
||||||
# If there is a pending read operation, the method
|
|
||||||
# 'get_read_result' should be called to retrieve the result.
|
|
||||||
return
|
|
||||||
|
|
||||||
# Represents the handle from where we should read.
|
|
||||||
handle_to_read = self.namedpipe if from_namedpipe else self._npipe_file
|
|
||||||
|
|
||||||
# The read operation is non-blocking because the read overlapped
|
|
||||||
# structure is passed. It will return immediately.
|
|
||||||
(errCode, self._read_buffer) = ovs_winutils.read_file(
|
|
||||||
handle_to_read,
|
|
||||||
bytes_to_read,
|
|
||||||
self._read)
|
|
||||||
|
|
||||||
if errCode:
|
|
||||||
# The error code should be 0 if the operation executed with success
|
|
||||||
if errCode == ovs_winutils.winerror.ERROR_IO_PENDING:
|
|
||||||
# This is returned when the overlapped structure is passed
|
|
||||||
# to the read operation (which is our case) and the operation
|
|
||||||
# has not finished yet. We mark this as a pending read
|
|
||||||
# operation and we will use the method 'get_read_result'
|
|
||||||
# later on to retrieve the result.
|
|
||||||
self._read_pending = True
|
|
||||||
else:
|
|
||||||
# In this case we received an unexpected error code, raise
|
|
||||||
# an exception.
|
|
||||||
raise NamedPipeException(
|
|
||||||
"Could not read from named pipe.", errCode)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# If we can not retrieve the output from the overlapped result,
|
|
||||||
# it means that the pipe was disconnected so we have no output.
|
|
||||||
# The returned value should be an empty string.
|
|
||||||
output = ""
|
|
||||||
try:
|
|
||||||
# Try to retrieve the result from the overlapped structure.
|
|
||||||
# This call should succeed or otherwise will raise an exception,
|
|
||||||
# but it will not block.
|
|
||||||
nBytesRead = ovs_winutils.get_overlapped_result(
|
|
||||||
handle_to_read,
|
|
||||||
self._read,
|
|
||||||
False)
|
|
||||||
# Mark the read operation as complete
|
|
||||||
self._read_pending = False
|
|
||||||
# Retrieve the result and put the decoded result inside the
|
|
||||||
# 'output' variable.
|
|
||||||
output = ovs_winutils.get_decoded_buffer(
|
|
||||||
self._read_buffer[:nBytesRead])
|
|
||||||
# We need to manually signal the event to make sure the call to
|
|
||||||
# wait for the event will not block.
|
|
||||||
win32event.SetEvent(self._read.hEvent)
|
|
||||||
except NamedPipeException as e:
|
|
||||||
# If the pipe was disconnected, it means no output, we will return
|
|
||||||
# an empty string in this case. Otherwise raise an exception.
|
|
||||||
if e.code not in ovs_winutils.pipe_disconnected_errors:
|
|
||||||
raise e
|
|
||||||
return output
|
|
||||||
|
|
||||||
def get_read_result(self, from_namedpipe=True):
|
|
||||||
"""Return the result from the overlapped structure.
|
|
||||||
|
|
||||||
If there is no pending read operation, this function will return
|
|
||||||
immediately. This call will return False if the reading operation
|
|
||||||
has not completed yet and the read operation is still in progress.
|
|
||||||
Otherwise, it will return the result.
|
|
||||||
|
|
||||||
:param from_namedpipe: boolean representing from where to read
|
|
||||||
True = the function reads from the named pipe handle
|
|
||||||
False = he function reads from the file handle
|
|
||||||
"""
|
|
||||||
if not self._read_pending:
|
|
||||||
# There is no pending read operation, we should return here
|
|
||||||
return
|
|
||||||
|
|
||||||
# Represents the handle from where we should read.
|
|
||||||
handle_to_read = self.namedpipe if from_namedpipe else self._npipe_file
|
|
||||||
try:
|
|
||||||
# Try to retrieve the result from the overlapped structure.
|
|
||||||
# This will raise an ERROR_IO_INCOMPLETE exception if the
|
|
||||||
# read operation has not completed yet.
|
|
||||||
nBytesRead = ovs_winutils.get_overlapped_result(handle_to_read,
|
|
||||||
self._read,
|
|
||||||
False)
|
|
||||||
# Mark the read operation as complete
|
|
||||||
self._read_pending = False
|
|
||||||
# Decode the result and return it
|
|
||||||
return ovs_winutils.get_decoded_buffer(
|
|
||||||
self._read_buffer[:nBytesRead])
|
|
||||||
except ovs_winutils.pywintypes.error as e:
|
|
||||||
if e.winerror == ovs_winutils.winerror.ERROR_IO_INCOMPLETE:
|
|
||||||
# In this case we should call again this function to try to
|
|
||||||
# retrieve the result.
|
|
||||||
self._read_pending = True
|
|
||||||
# Return False to mark that the read operation has not
|
|
||||||
# completed yet.
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
# If we reach here it means that an unexpected error was
|
|
||||||
# received. We should raise an exception in this case.
|
|
||||||
raise NamedPipeException(
|
|
||||||
"Could not get the overlapped result. "
|
|
||||||
"Error: '%s'" % e.strerror, e.winerror)
|
|
||||||
|
|
||||||
def close_filehandle(self):
|
|
||||||
"""Close the file handle."""
|
|
||||||
ovs_winutils.close_handle(self._npipe_file)
|
|
||||||
|
|
||||||
def get_file_handle(self):
|
|
||||||
"""Returns the file handle."""
|
|
||||||
return self._npipe_file
|
|
||||||
|
|
||||||
def close_all_handles(self):
|
|
||||||
"""Close all the handles used by this class."""
|
|
||||||
if hasattr(self, "namedpipe") and self.namedpipe:
|
|
||||||
ovs_winutils.close_handle(self.namedpipe)
|
|
||||||
if hasattr(self, "_read") and self._read.hEvent:
|
|
||||||
ovs_winutils.close_handle(self._read.hEvent)
|
|
||||||
if hasattr(self, "_npipe_file") and self._npipe_file:
|
|
||||||
ovs_winutils.close_handle(self._npipe_file)
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
"""Make sure all the handles are closed."""
|
|
||||||
self.close_all_handles()
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessWithNamedPipes(object):
|
|
||||||
class HandleClass(object):
|
|
||||||
"""This class is used only to provide a 'close' method for the stdin,
|
|
||||||
stdout and stderr of the new process. This ensures compatibility with
|
|
||||||
the subprocess.Popen returned object.
|
|
||||||
"""
|
|
||||||
def __init__(self, namedpipe):
|
|
||||||
self.namedpipe = namedpipe
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
# Close all the handles used
|
|
||||||
if self.namedpipe:
|
|
||||||
self.namedpipe.close_all_handles()
|
|
||||||
self.namedpipe = None
|
|
||||||
|
|
||||||
# The maximum number of bytes to be read
|
|
||||||
_BUFSIZE = 16384
|
|
||||||
|
|
||||||
def __init__(self, cmd, env):
|
|
||||||
"""Create a new process executing 'cmd' and with environment 'env'.
|
|
||||||
|
|
||||||
:param cmd: string representing the command line to be executed
|
|
||||||
:param env: instance representing the environment which should be used
|
|
||||||
for the new process. Look at 'os.environ' for an example.
|
|
||||||
"""
|
|
||||||
# The startupinfo structure used to spawn the new process
|
|
||||||
self._si = win32process.STARTUPINFO()
|
|
||||||
|
|
||||||
# Attributes defined to ensure compatibility with the subprocess.Popen
|
|
||||||
# returned object.
|
|
||||||
self.returncode = None
|
|
||||||
self.stdin = None
|
|
||||||
self.stdout = None
|
|
||||||
self.stderr = None
|
|
||||||
self.pid = None
|
|
||||||
|
|
||||||
# Convert the command to be a single string
|
|
||||||
cmd = " ".join(cmd)
|
|
||||||
# Initialize the named pipes used for stdin, stdout and stderr
|
|
||||||
self._initialize_named_pipes_for_std()
|
|
||||||
# Create the child process
|
|
||||||
self._start_process(cmd, env)
|
|
||||||
|
|
||||||
def _initialize_named_pipes_for_std(self):
|
|
||||||
"""Initialize the named pipes used for communication with the child
|
|
||||||
process.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# used in generating the name for the pipe
|
|
||||||
pid = os.getpid()
|
|
||||||
|
|
||||||
# Security attributes for the named pipes, should not be inherited
|
|
||||||
# by the child process. Those are used by the parent process to
|
|
||||||
# communicate with the child process.
|
|
||||||
_saAttr_pipe = win32security.SECURITY_ATTRIBUTES()
|
|
||||||
_saAttr_pipe.bInheritHandle = 0
|
|
||||||
# Security attributes for the file handles, they should be inherited
|
|
||||||
# by the child process which will use them as stdin, stdout and stderr.
|
|
||||||
# The parent process will close those handles after the child process
|
|
||||||
# is created.
|
|
||||||
_saAttr_file = win32security.SECURITY_ATTRIBUTES()
|
|
||||||
_saAttr_file.bInheritHandle = 1
|
|
||||||
|
|
||||||
def create_namedpipe_and_file(prefix, saAttr_pipe=_saAttr_pipe,
|
|
||||||
saAttr_file=_saAttr_file):
|
|
||||||
"""Create the named pipe and the file for it.
|
|
||||||
|
|
||||||
:param prefix: string representing the prefix which will be
|
|
||||||
appended to the start of the name for the pipe
|
|
||||||
:param saAttr_pipe: security attributes used to create
|
|
||||||
the named pipe
|
|
||||||
:param saAttr_file: security attributes used to create the file
|
|
||||||
"""
|
|
||||||
pipename = ("%s_NamedPipe_%d_%d_%s" % (
|
|
||||||
prefix, pid, time.time(), str(random.random()).split(".")[1]))
|
|
||||||
# Create the named pipe
|
|
||||||
pipe = NamedPipe(pipe_name=pipename,
|
|
||||||
sec_attributes=saAttr_pipe)
|
|
||||||
# Create the file for the previously created named pipe
|
|
||||||
pipe.create_file(sec_attributes=saAttr_file)
|
|
||||||
return pipe
|
|
||||||
|
|
||||||
# Create the named pipes and the files used for parent - child process
|
|
||||||
# communication.
|
|
||||||
_pipe_stdin = create_namedpipe_and_file("Stdin")
|
|
||||||
self._pipe_stdout = create_namedpipe_and_file("Stdout")
|
|
||||||
self._pipe_stderr = create_namedpipe_and_file("Stderr")
|
|
||||||
|
|
||||||
# Set the file handles from the named pipes as stdin, stdout and stderr
|
|
||||||
# in startupinfo structure for the child process.
|
|
||||||
self._si.hStdInput = _pipe_stdin.get_file_handle()
|
|
||||||
self._si.hStdOutput = self._pipe_stdout.get_file_handle()
|
|
||||||
self._si.hStdError = self._pipe_stderr.get_file_handle()
|
|
||||||
self._si.dwFlags |= win32con.STARTF_USESTDHANDLES
|
|
||||||
|
|
||||||
# Wrapping around stdin in order to be able to call self.stdin.close()
|
|
||||||
# to close the stdin.
|
|
||||||
self.stdin = ProcessWithNamedPipes.HandleClass(_pipe_stdin)
|
|
||||||
_pipe_stdin = None
|
|
||||||
|
|
||||||
def _get_result_namedpipe(self, namedpipe):
|
|
||||||
"""Retrieve the result from the named pipe given as parameter.
|
|
||||||
|
|
||||||
This function will return False if the read operation has not
|
|
||||||
completed yet and we should call this method again to try to retrieve
|
|
||||||
the result.
|
|
||||||
|
|
||||||
:param namedpipe: represents the NamedPipe object from where to
|
|
||||||
retrieve the result
|
|
||||||
"""
|
|
||||||
# The default returned value will be empty string. This is returned
|
|
||||||
# in case the pipe was disconnected.
|
|
||||||
output = ""
|
|
||||||
try:
|
|
||||||
output = namedpipe.get_read_result()
|
|
||||||
except NamedPipeException as e:
|
|
||||||
# If the pipe was disconnected the error is ignored, otherwise
|
|
||||||
# we raise an exception
|
|
||||||
if e.code not in ovs_winutils.pipe_disconnected_errors:
|
|
||||||
raise e
|
|
||||||
return output
|
|
||||||
|
|
||||||
def communicate(self, input=None):
|
|
||||||
"""Return stdout and stderr of the child process.
|
|
||||||
|
|
||||||
Interact with process: Send the 'input' argument to stdin.
|
|
||||||
The function waits until the process terminates and reads from
|
|
||||||
stdout and stderr.
|
|
||||||
|
|
||||||
:param input: string representing the input which should be sent
|
|
||||||
to the child process. If this value is None, then nothing is passed
|
|
||||||
as stdin for the child process.
|
|
||||||
"""
|
|
||||||
if input:
|
|
||||||
# If we received any input, write it to stdin then close the handle
|
|
||||||
# to send EOF on stdin to the child process
|
|
||||||
self._stdin_write(input)
|
|
||||||
self.stdin.close()
|
|
||||||
|
|
||||||
# Try to retrieve the output for stdout and stderr. If the read
|
|
||||||
# operation has not completed yet, then None will be returned and
|
|
||||||
# we will try to retrieve the result again after the process is
|
|
||||||
# terminated.
|
|
||||||
stdout = self._pipe_stdout.nonblocking_read(self._BUFSIZE)
|
|
||||||
stderr = self._pipe_stderr.nonblocking_read(self._BUFSIZE)
|
|
||||||
|
|
||||||
# Wait for the process to terminate
|
|
||||||
self.wait()
|
|
||||||
|
|
||||||
if stdout is None:
|
|
||||||
# Wait until the read operation for stdout has completed and
|
|
||||||
# then retrieve the result.
|
|
||||||
self._pipe_stdout.wait_for_read()
|
|
||||||
stdout = self._get_result_namedpipe(self._pipe_stdout)
|
|
||||||
|
|
||||||
if stderr is None:
|
|
||||||
# Wait until the read operation for stdout has completed and
|
|
||||||
# then retrieve the result.
|
|
||||||
self._pipe_stderr.wait_for_read()
|
|
||||||
stderr = self._get_result_namedpipe(self._pipe_stderr)
|
|
||||||
|
|
||||||
# Close all the handles since the child process is terminated
|
|
||||||
# at this point.
|
|
||||||
self._pipe_stdout.close_all_handles()
|
|
||||||
self._pipe_stdout = None
|
|
||||||
self._pipe_stderr.close_all_handles()
|
|
||||||
self._pipe_stderr = None
|
|
||||||
|
|
||||||
# Return a tuple containing stdout and stderr to ensure compatibility
|
|
||||||
# with the subprocess module.
|
|
||||||
return (stdout, stderr)
|
|
||||||
|
|
||||||
def _stdin_write(self, input):
|
|
||||||
"""Send input to stdin for the child process."""
|
|
||||||
if input:
|
|
||||||
encoded_buf = ovs_winutils.get_encoded_buffer(input)
|
|
||||||
self.stdin.namedpipe.blocking_write(encoded_buf)
|
|
||||||
|
|
||||||
def _start_process(self, cmd_line, env):
|
|
||||||
"""Create a process with the command line 'cmd_line' and environment
|
|
||||||
'env'. Stores the pid of the child process in the 'pid' attribute.
|
|
||||||
"""
|
|
||||||
app_name = None
|
|
||||||
# The command line to be executed.
|
|
||||||
command_line = cmd_line
|
|
||||||
process_attributes = None
|
|
||||||
thread_attributes = None
|
|
||||||
# Each inheritable handle in the calling process is
|
|
||||||
# inherited by the new process.
|
|
||||||
inherit_handles = 1
|
|
||||||
# The new process has a new console, instead of inheriting
|
|
||||||
# its parent's console
|
|
||||||
creation_flags = win32process.CREATE_NO_WINDOW
|
|
||||||
# Environment used for the new process.
|
|
||||||
new_environment = env
|
|
||||||
current_directory = None
|
|
||||||
|
|
||||||
proc_args = (app_name,
|
|
||||||
command_line,
|
|
||||||
process_attributes,
|
|
||||||
thread_attributes,
|
|
||||||
inherit_handles,
|
|
||||||
creation_flags,
|
|
||||||
new_environment,
|
|
||||||
current_directory,
|
|
||||||
self._si)
|
|
||||||
proc_handles = win32process.CreateProcess(*proc_args)
|
|
||||||
|
|
||||||
# Close the handles that the parent is not going to use
|
|
||||||
self._pipe_stdout.close_filehandle()
|
|
||||||
self._pipe_stderr.close_filehandle()
|
|
||||||
|
|
||||||
self._hProcess, self._hThread, self.pid, self._tid = proc_handles
|
|
||||||
|
|
||||||
def wait(self, timeout=None):
|
|
||||||
"""Wait for the process to terminate or until timeout expires.
|
|
||||||
|
|
||||||
Returns returncode attribute. If timeout is None, then the method
|
|
||||||
will wait until the process terminates.
|
|
||||||
|
|
||||||
:param timeout: int or float representing the timeout in seconds
|
|
||||||
"""
|
|
||||||
if timeout is None:
|
|
||||||
timeout_millis = win32event.INFINITE
|
|
||||||
else:
|
|
||||||
timeout_millis = int(timeout * 1000)
|
|
||||||
|
|
||||||
if self.returncode is None:
|
|
||||||
# If the 'returncode' attribute is not set, it means that we
|
|
||||||
# have to wait for the child process to terminate and to return the
|
|
||||||
# exit code of it.
|
|
||||||
result = avoid_blocking_call(win32event.WaitForSingleObject,
|
|
||||||
self._hProcess,
|
|
||||||
timeout_millis)
|
|
||||||
if result == win32event.WAIT_TIMEOUT:
|
|
||||||
raise ProcessException("Timeout Exception.")
|
|
||||||
self.returncode = win32process.GetExitCodeProcess(self._hProcess)
|
|
||||||
# Return the exit code of the child process
|
|
||||||
return self.returncode
|
|
@ -26,7 +26,13 @@ def monkey_patch():
|
|||||||
# https://github.com/eventlet/eventlet/commit/b756447bab51046dfc6f1e0e299cc997ab343701
|
# https://github.com/eventlet/eventlet/commit/b756447bab51046dfc6f1e0e299cc997ab343701
|
||||||
# For details please check https://bugs.launchpad.net/neutron/+bug/1745013
|
# For details please check https://bugs.launchpad.net/neutron/+bug/1745013
|
||||||
eventlet.hubs.get_hub()
|
eventlet.hubs.get_hub()
|
||||||
eventlet.monkey_patch()
|
|
||||||
if os.name != 'nt':
|
if os.name != 'nt':
|
||||||
|
eventlet.monkey_patch()
|
||||||
|
|
||||||
p_c_e = importutils.import_module('pyroute2.config.asyncio')
|
p_c_e = importutils.import_module('pyroute2.config.asyncio')
|
||||||
p_c_e.asyncio_config()
|
p_c_e.asyncio_config()
|
||||||
|
else:
|
||||||
|
# eventlet monkey patching the os module causes subprocess.Popen to
|
||||||
|
# fail on Windows when using pipes due to missing non-blocking IO
|
||||||
|
# support.
|
||||||
|
eventlet.monkey_patch(os=False)
|
||||||
|
Loading…
Reference in New Issue
Block a user