Add mutex class
This will expose Windows mutexes. We usually rely on file locks when synchronizing multiple processes, mostly for portability reasons. Some times, file locks may not be an option, in which case a named mutex can be useful. We intend to use this in Glance when ensuring that only one scrubber can run at a time. Change-Id: I4a2a51e2164c6f5816f2af2641d41d1db7870600
This commit is contained in:
parent
0857a0eb81
commit
5c3f7cd15e
|
@ -0,0 +1,75 @@
|
|||
# Copyright 2019 Cloudbase Solutions Srl
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
import uuid
|
||||
|
||||
from os_win import exceptions
|
||||
from os_win.tests.functional import test_base
|
||||
from os_win.utils import processutils
|
||||
|
||||
|
||||
class MutexTestCase(test_base.OsWinBaseFunctionalTestCase):
|
||||
def setUp(self):
|
||||
super(MutexTestCase, self).setUp()
|
||||
|
||||
mutex_name = str(uuid.uuid4())
|
||||
self._mutex = processutils.Mutex(name=mutex_name)
|
||||
|
||||
self.addCleanup(self._mutex.close)
|
||||
|
||||
def acquire_mutex_in_separate_thread(self, mutex):
|
||||
# We'll wait for a signal before releasing the mutex.
|
||||
stop_event = threading.Event()
|
||||
|
||||
def target():
|
||||
mutex.acquire()
|
||||
|
||||
stop_event.wait()
|
||||
|
||||
mutex.release()
|
||||
|
||||
thread = threading.Thread(target=target)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
return thread, stop_event
|
||||
|
||||
def test_already_acquired_mutex(self):
|
||||
thread, stop_event = self.acquire_mutex_in_separate_thread(
|
||||
self._mutex)
|
||||
|
||||
# We shouldn't be able to acquire a mutex held by a
|
||||
# different thread.
|
||||
self.assertFalse(self._mutex.acquire(timeout_ms=0))
|
||||
|
||||
stop_event.set()
|
||||
|
||||
# We should now be able to acquire the mutex.
|
||||
# We're using a timeout, giving the other thread some
|
||||
# time to release it.
|
||||
self.assertTrue(self._mutex.acquire(timeout_ms=2000))
|
||||
|
||||
def test_release_unacquired_mutex(self):
|
||||
self.assertRaises(exceptions.Win32Exception,
|
||||
self._mutex.release)
|
||||
|
||||
def test_multiple_acquire(self):
|
||||
# The mutex owner should be able to acquire it multiple times.
|
||||
self._mutex.acquire(timeout_ms=0)
|
||||
self._mutex.acquire(timeout_ms=0)
|
||||
|
||||
self._mutex.release()
|
||||
self._mutex.release()
|
|
@ -196,3 +196,24 @@ class ProcessUtilsTestCase(test_base.OsWinBaseTestCase):
|
|||
mock_wait.assert_called_once_with(phandles,
|
||||
mock.sentinel.wait_all,
|
||||
mock.sentinel.milliseconds)
|
||||
|
||||
def test_create_mutex(self):
|
||||
handle = self._procutils.create_mutex(
|
||||
mock.sentinel.name, mock.sentinel.owner,
|
||||
mock.sentinel.sec_attr)
|
||||
|
||||
self.assertEqual(self._mock_run.return_value, handle)
|
||||
self._mock_run.assert_called_once_with(
|
||||
self._mock_kernel32.CreateMutexW,
|
||||
self._ctypes.byref(mock.sentinel.sec_attr),
|
||||
mock.sentinel.owner,
|
||||
mock.sentinel.name,
|
||||
kernel32_lib_func=True)
|
||||
|
||||
def test_release_mutex(self):
|
||||
self._procutils.release_mutex(mock.sentinel.handle)
|
||||
|
||||
self._mock_run.assert_called_once_with(
|
||||
self._mock_kernel32.ReleaseMutex,
|
||||
mock.sentinel.handle,
|
||||
kernel32_lib_func=True)
|
||||
|
|
|
@ -241,3 +241,26 @@ class Win32UtilsTestCase(test_base.BaseTestCase):
|
|||
self._win32_utils.wait_for_multiple_objects,
|
||||
fake_handles, mock.sentinel.wait_all,
|
||||
mock.sentinel.milliseconds)
|
||||
|
||||
@mock.patch.object(win32utils.Win32Utils, 'run_and_check_output')
|
||||
def test_wait_for_single_object(self, mock_helper):
|
||||
ret_val = self._win32_utils.wait_for_single_object(
|
||||
mock.sentinel.handle, mock.sentinel.milliseconds)
|
||||
|
||||
mock_helper.assert_called_once_with(
|
||||
win32utils.kernel32.WaitForSingleObject,
|
||||
mock.sentinel.handle,
|
||||
mock.sentinel.milliseconds,
|
||||
kernel32_lib_func=True,
|
||||
error_ret_vals=[w_const.WAIT_FAILED])
|
||||
self.assertEqual(mock_helper.return_value, ret_val)
|
||||
|
||||
@mock.patch.object(win32utils.Win32Utils, 'run_and_check_output')
|
||||
def test_wait_for_single_object_timeout(self, mock_helper):
|
||||
mock_helper.return_value = w_const.ERROR_WAIT_TIMEOUT
|
||||
|
||||
self.assertRaises(
|
||||
exceptions.Timeout,
|
||||
self._win32_utils.wait_for_single_object,
|
||||
mock.sentinel.timeout,
|
||||
mock.sentinel.milliseconds)
|
||||
|
|
|
@ -18,6 +18,7 @@ import ctypes
|
|||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from os_win import exceptions
|
||||
from os_win.utils import win32utils
|
||||
from os_win.utils.winapi import constants as w_const
|
||||
from os_win.utils.winapi import libs as w_lib
|
||||
|
@ -128,3 +129,58 @@ class ProcessUtils(object):
|
|||
finally:
|
||||
for handle in handles:
|
||||
self._win32_utils.close_handle(handle)
|
||||
|
||||
def create_mutex(self, name=None, initial_owner=False,
|
||||
security_attributes=None):
|
||||
sec_attr_ref = (ctypes.byref(security_attributes)
|
||||
if security_attributes else None)
|
||||
return self._run_and_check_output(
|
||||
kernel32.CreateMutexW,
|
||||
sec_attr_ref,
|
||||
initial_owner,
|
||||
name)
|
||||
|
||||
def release_mutex(self, handle):
|
||||
return self._run_and_check_output(
|
||||
kernel32.ReleaseMutex,
|
||||
handle)
|
||||
|
||||
|
||||
class Mutex(object):
|
||||
def __init__(self, name=None):
|
||||
self.name = name
|
||||
|
||||
self._processutils = ProcessUtils()
|
||||
self._win32_utils = win32utils.Win32Utils()
|
||||
|
||||
# This is supposed to be a simple interface.
|
||||
# We're not exposing the "initial_owner" flag,
|
||||
# nor are we informing the caller if the mutex
|
||||
# already exists.
|
||||
self._handle = self._processutils.create_mutex(
|
||||
self.name)
|
||||
|
||||
def acquire(self, timeout_ms=w_const.INFINITE):
|
||||
try:
|
||||
self._win32_utils.wait_for_single_object(
|
||||
self._handle, timeout_ms)
|
||||
return True
|
||||
except exceptions.Timeout:
|
||||
return False
|
||||
|
||||
def release(self):
|
||||
self._processutils.release_mutex(self._handle)
|
||||
|
||||
def close(self):
|
||||
if self._handle:
|
||||
self._win32_utils.close_handle(self._handle)
|
||||
self._handle = None
|
||||
|
||||
__del__ = close
|
||||
|
||||
def __enter__(self):
|
||||
self.acquire()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.release()
|
||||
|
|
|
@ -145,3 +145,16 @@ class Win32Utils(object):
|
|||
raise exceptions.Timeout()
|
||||
|
||||
return ret_val
|
||||
|
||||
def wait_for_single_object(self, handle,
|
||||
milliseconds=w_const.INFINITE):
|
||||
ret_val = self.run_and_check_output(
|
||||
kernel32.WaitForSingleObject,
|
||||
handle,
|
||||
milliseconds,
|
||||
kernel32_lib_func=True,
|
||||
error_ret_vals=[w_const.WAIT_FAILED])
|
||||
if ret_val == w_const.ERROR_WAIT_TIMEOUT:
|
||||
raise exceptions.Timeout()
|
||||
|
||||
return ret_val
|
||||
|
|
|
@ -95,6 +95,12 @@ def register():
|
|||
]
|
||||
lib_handle.CreateFileW.restype = wintypes.HANDLE
|
||||
|
||||
lib_handle.CreateMutexW.argtypes = [
|
||||
wintypes.LPCVOID,
|
||||
wintypes.BOOL,
|
||||
wintypes.LPCWSTR]
|
||||
lib_handle.CreateMutexW.restype = wintypes.HANDLE
|
||||
|
||||
lib_handle.CreatePipe.argtypes = [
|
||||
wintypes.PHANDLE,
|
||||
wintypes.PHANDLE,
|
||||
|
@ -162,6 +168,9 @@ def register():
|
|||
]
|
||||
lib_handle.ReadFileEx.restype = wintypes.BOOL
|
||||
|
||||
lib_handle.ReleaseMutex.argtypes = [wintypes.HANDLE]
|
||||
lib_handle.ReleaseMutex.restype = wintypes.BOOL
|
||||
|
||||
lib_handle.ResetEvent.argtypes = [wintypes.HANDLE]
|
||||
lib_handle.ResetEvent.restype = wintypes.BOOL
|
||||
|
||||
|
@ -171,6 +180,12 @@ def register():
|
|||
lib_handle.SetLastError.argtypes = [wintypes.DWORD]
|
||||
lib_handle.SetLastError.restype = None
|
||||
|
||||
lib_handle.WaitForSingleObject.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
wintypes.DWORD
|
||||
]
|
||||
lib_handle.WaitForSingleObject.restype = wintypes.DWORD
|
||||
|
||||
lib_handle.WaitForSingleObjectEx.argtypes = [
|
||||
wintypes.HANDLE,
|
||||
wintypes.DWORD,
|
||||
|
|
|
@ -19,6 +19,7 @@ from os_win._i18n import _ # noqa
|
|||
from os_win import exceptions
|
||||
from os_win.utils import hostutils
|
||||
from os_win.utils.io import namedpipe
|
||||
from os_win.utils import processutils
|
||||
|
||||
utils = hostutils.HostUtils()
|
||||
|
||||
|
@ -201,3 +202,7 @@ def get_processutils():
|
|||
|
||||
def get_ioutils():
|
||||
return _get_class(class_type='ioutils')
|
||||
|
||||
|
||||
def get_mutex(*args, **kwargs):
|
||||
return processutils.Mutex(*args, **kwargs)
|
||||
|
|
Loading…
Reference in New Issue