Ensure Win32 API calls do not block
Win32 API functions will be invoked, by default, using tpool.execute when called from within a greenthread, ensuring that other greenthreads are not blocked. This workflow will not be applied to Win32 API functions that are known to always return quickly, in which case this wouldn't bring any benefit. Closes-Bug: #1624364 Change-Id: I81a5a87558330b975bb0f40333689bdc7a5ce3e7
This commit is contained in:
@@ -20,6 +20,8 @@ import socket
|
||||
import time
|
||||
import types
|
||||
|
||||
import eventlet
|
||||
from eventlet import tpool
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_log import log as logging
|
||||
@@ -187,3 +189,24 @@ def get_ips(addr):
|
||||
# Returns IPv4 and IPv6 addresses, ordered by protocol family
|
||||
addr_info.sort()
|
||||
return [a[4][0] for a in addr_info]
|
||||
|
||||
|
||||
def avoid_blocking_call(f, *args, **kwargs):
|
||||
"""Ensures that the invoked method will not block other greenthreads.
|
||||
|
||||
Performs the call in a different thread using tpool.execute when called
|
||||
from a greenthread.
|
||||
"""
|
||||
# Note that eventlet.getcurrent will always return a greenlet object.
|
||||
# In case of a greenthread, the parent greenlet will always be the hub
|
||||
# loop greenlet.
|
||||
if eventlet.getcurrent().parent:
|
||||
return tpool.execute(f, *args, **kwargs)
|
||||
else:
|
||||
return f(*args, **kwargs)
|
||||
|
||||
|
||||
def avoid_blocking_call_decorator(f):
|
||||
def wrapper(*args, **kwargs):
|
||||
return avoid_blocking_call(f, *args, **kwargs)
|
||||
return wrapper
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
Unit tests for the os_win._utils module.
|
||||
"""
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
from oslotest import base
|
||||
|
||||
@@ -25,6 +26,7 @@ from os_win import _utils
|
||||
from os_win import exceptions
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class UtilsTestCase(base.BaseTestCase):
|
||||
|
||||
@mock.patch('oslo_concurrency.processutils.execute')
|
||||
@@ -196,3 +198,31 @@ class UtilsTestCase(base.BaseTestCase):
|
||||
|
||||
mock_getaddrinfo.assert_called_once_with(
|
||||
mock.sentinel.addr, None, 0, 0, 0)
|
||||
|
||||
@mock.patch('eventlet.tpool.execute')
|
||||
@mock.patch('eventlet.getcurrent')
|
||||
@ddt.data(mock.Mock(), None)
|
||||
def test_avoid_blocking_call(self, gt_parent, mock_get_current_gt,
|
||||
mock_execute):
|
||||
mock_get_current_gt.return_value.parent = gt_parent
|
||||
mock_execute.return_value = mock.sentinel.ret_val
|
||||
|
||||
def fake_blocking_func(*args, **kwargs):
|
||||
self.assertEqual((mock.sentinel.arg, ), args)
|
||||
self.assertDictEqual(dict(kwarg=mock.sentinel.kwarg),
|
||||
kwargs)
|
||||
return mock.sentinel.ret_val
|
||||
|
||||
fake_blocking_func_decorated = (
|
||||
_utils.avoid_blocking_call_decorator(fake_blocking_func))
|
||||
|
||||
ret_val = fake_blocking_func_decorated(mock.sentinel.arg,
|
||||
kwarg=mock.sentinel.kwarg)
|
||||
|
||||
self.assertEqual(mock.sentinel.ret_val, ret_val)
|
||||
if gt_parent:
|
||||
mock_execute.assert_called_once_with(fake_blocking_func,
|
||||
mock.sentinel.arg,
|
||||
kwarg=mock.sentinel.kwarg)
|
||||
else:
|
||||
self.assertFalse(mock_execute.called)
|
||||
|
||||
@@ -74,7 +74,8 @@ class IOUtilsTestCase(base.BaseTestCase):
|
||||
|
||||
self._mock_run = self._ioutils._win32_utils.run_and_check_output
|
||||
self._run_args = dict(kernel32_lib_func=True,
|
||||
failure_exc=exceptions.Win32IOException)
|
||||
failure_exc=exceptions.Win32IOException,
|
||||
eventlet_nonblocking_mode=False)
|
||||
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
import mock
|
||||
from oslotest import base
|
||||
|
||||
from os_win import _utils
|
||||
from os_win import exceptions
|
||||
from os_win.utils import win32utils
|
||||
|
||||
@@ -45,14 +46,21 @@ class Win32UtilsTestCase(base.BaseTestCase):
|
||||
wintypes=mock.DEFAULT,
|
||||
create=True).start()
|
||||
|
||||
@mock.patch.object(_utils, 'avoid_blocking_call')
|
||||
@mock.patch.object(win32utils.Win32Utils, 'get_error_message')
|
||||
@mock.patch.object(win32utils.Win32Utils, 'get_last_error')
|
||||
def _test_run_and_check_output(self, mock_get_last_err, mock_get_err_msg,
|
||||
ret_val=None, expected_exc=None,
|
||||
mock_avoid_blocking_call,
|
||||
ret_val=0, expected_exc=None,
|
||||
**kwargs):
|
||||
self._ctypes_patcher.stop()
|
||||
|
||||
mock_func = mock.Mock()
|
||||
mock_func.return_value = ret_val
|
||||
self._ctypes_patcher.stop()
|
||||
mock_avoid_blocking_call.return_value = ret_val
|
||||
|
||||
eventlet_nonblocking_mode = kwargs.get(
|
||||
'eventlet_nonblocking_mode', True)
|
||||
|
||||
if expected_exc:
|
||||
self.assertRaises(expected_exc,
|
||||
@@ -69,13 +77,20 @@ class Win32UtilsTestCase(base.BaseTestCase):
|
||||
**kwargs)
|
||||
self.assertEqual(ret_val, actual_ret_val)
|
||||
|
||||
mock_func.assert_called_once_with(mock.sentinel.arg,
|
||||
kwarg=mock.sentinel.kwarg)
|
||||
if eventlet_nonblocking_mode:
|
||||
mock_avoid_blocking_call.assert_called_once_with(
|
||||
mock_func, mock.sentinel.arg, kwarg=mock.sentinel.kwarg)
|
||||
else:
|
||||
mock_func.assert_called_once_with(mock.sentinel.arg,
|
||||
kwarg=mock.sentinel.kwarg)
|
||||
|
||||
return mock_get_last_err, mock_get_err_msg
|
||||
|
||||
def test_run_and_check_output(self):
|
||||
self._test_run_and_check_output(ret_val=0)
|
||||
self._test_run_and_check_output()
|
||||
|
||||
def test_run_and_check_output_nonblocking_mode_disabled(self):
|
||||
self._test_run_and_check_output(eventlet_nonblocking_mode=False)
|
||||
|
||||
def test_run_and_check_output_fail_on_nonzero_ret_val(self):
|
||||
ret_val = 1
|
||||
|
||||
@@ -145,8 +145,10 @@ class IOUtils(object):
|
||||
self._win32_utils = win32utils.Win32Utils()
|
||||
|
||||
def _run_and_check_output(self, *args, **kwargs):
|
||||
eventlet_blocking_mode = kwargs.get('eventlet_nonblocking_mode', False)
|
||||
kwargs.update(kernel32_lib_func=True,
|
||||
failure_exc=exceptions.Win32IOException)
|
||||
failure_exc=exceptions.Win32IOException,
|
||||
eventlet_nonblocking_mode=eventlet_blocking_mode)
|
||||
return self._win32_utils.run_and_check_output(*args, **kwargs)
|
||||
|
||||
def wait_named_pipe(self, pipe_name, timeout=WAIT_PIPE_DEFAULT_TIMEOUT):
|
||||
|
||||
@@ -22,6 +22,7 @@ import textwrap
|
||||
from oslo_log import log as logging
|
||||
|
||||
from os_win._i18n import _, _LW
|
||||
from os_win import _utils
|
||||
from os_win import exceptions
|
||||
from os_win.utils.storage.initiator import fc_structures as fc_struct
|
||||
from os_win.utils import win32utils
|
||||
@@ -172,5 +173,6 @@ class FCUtils(object):
|
||||
mappings.append(mapping)
|
||||
return mappings
|
||||
|
||||
@_utils.avoid_blocking_call_decorator
|
||||
def refresh_hba_configuration(self):
|
||||
hbaapi.HBA_RefreshAdapterConfiguration()
|
||||
|
||||
@@ -19,6 +19,7 @@ import sys
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from os_win import _utils
|
||||
from os_win import exceptions
|
||||
|
||||
if sys.platform == 'win32':
|
||||
@@ -62,7 +63,13 @@ class Win32Utils(object):
|
||||
# message table.
|
||||
error_msg_src = kwargs.pop('error_msg_src', {})
|
||||
|
||||
ret_val = func(*args, **kwargs)
|
||||
eventlet_nonblocking_mode = kwargs.pop(
|
||||
'eventlet_nonblocking_mode', True)
|
||||
|
||||
if eventlet_nonblocking_mode:
|
||||
ret_val = _utils.avoid_blocking_call(func, *args, **kwargs)
|
||||
else:
|
||||
ret_val = func(*args, **kwargs)
|
||||
|
||||
func_failed = (error_on_nonzero_ret_val and ret_val) or (
|
||||
ret_val in error_ret_vals)
|
||||
|
||||
Reference in New Issue
Block a user