# oslo.log/oslo_log/tests/unit/test_pipe_mutex.py
#
# File listing metadata: 210 lines, 6.4 KiB, Python

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import unittest
from unittest import mock
import eventlet
from eventlet import debug as eventlet_debug
from eventlet import greenpool
from oslo_log import pipe_mutex
@contextlib.contextmanager
def quiet_eventlet_exceptions():
    """Run the ``with`` body with ``hub_exceptions`` switched to False.

    The value of ``greenpool.DEBUG`` observed on entry is passed back to
    ``eventlet_debug.hub_exceptions`` on exit, whether the body finished
    normally or raised.
    """
    previous_setting = greenpool.DEBUG
    eventlet_debug.hub_exceptions(False)
    try:
        yield
    finally:
        # Always hand the saved value back so the change does not outlive
        # the ``with`` block.
        eventlet_debug.hub_exceptions(previous_setting)
class TestPipeMutex(unittest.TestCase):
    """From Swift's test/unit/common/test_utils.py"""

    # Each test runs against a fresh PipeMutex stored on ``self.mutex``;
    # tearDown closes it again.

    def setUp(self):
        self.mutex = pipe_mutex.PipeMutex()

    def tearDown(self):
        self.mutex.close()

    def test_nonblocking(self):
        # A non-blocking acquire must return False while another
        # greenthread holds the mutex and True once it has let go.
        holder_has_lock = eventlet.event.Event()
        release_requested = eventlet.event.Event()
        holder_released = eventlet.event.Event()

        def hold_until_asked():
            self.mutex.acquire()
            holder_has_lock.send('got the lock')
            release_requested.wait()
            self.mutex.release()
            holder_released.send('released the lock')

        eventlet.spawn(hold_until_asked)

        holder_has_lock.wait()  # Now, the other greenthread has the lock.
        got_it = self.mutex.acquire(blocking=False)
        self.assertFalse(got_it)

        release_requested.send('please release the lock')
        holder_released.wait()  # The other greenthread has released the lock.
        got_it = self.mutex.acquire(blocking=False)
        self.assertTrue(got_it)

    def test_recursive(self):
        # The current greenthread acquires twice in a row; a different
        # greenthread is refused until release() has been called twice.
        for _ in range(2):
            self.assertTrue(self.mutex.acquire(blocking=False))

        def attempt_from_elsewhere():
            return self.mutex.acquire(blocking=False)

        for _ in range(2):
            outcome = eventlet.spawn(attempt_from_elsewhere).wait()
            self.assertFalse(outcome)
            self.mutex.release()

        outcome = eventlet.spawn(attempt_from_elsewhere).wait()
        self.assertTrue(outcome)

    def test_release_without_acquire(self):
        # Releasing a mutex that was never acquired raises RuntimeError.
        with self.assertRaises(RuntimeError):
            self.mutex.release()

    def test_too_many_releases(self):
        # One acquire allows exactly one release; the second one raises.
        self.mutex.acquire()
        self.mutex.release()
        with self.assertRaises(RuntimeError):
            self.mutex.release()

    def test_wrong_releaser(self):
        # A release attempted from a different greenthread than the one
        # that acquired raises RuntimeError when that greenthread is waited
        # on.
        self.mutex.acquire()
        with quiet_eventlet_exceptions():
            intruder = eventlet.spawn(self.mutex.release)
            with self.assertRaises(RuntimeError):
                intruder.wait()

    def test_blocking(self):
        # Two greenthreads take the mutex one after the other; the recorded
        # steps must show the first one releasing before the second one
        # acquires.
        start_second = eventlet.event.Event()
        steps = []

        def first():
            eventlet.sleep(0)  # let coro2 go
            self.mutex.acquire()
            steps.append('coro1 acquire')
            start_second.send('go')
            self.mutex.release()
            steps.append('coro1 release')

        def second():
            start_second.wait()  # wait for coro1 to start us
            self.mutex.acquire()
            steps.append('coro2 acquire')
            self.mutex.release()
            steps.append('coro2 release')

        first_gt = eventlet.spawn(first)
        second_gt = eventlet.spawn(second)

        first_gt.wait()
        second_gt.wait()

        expected_steps = [
            'coro1 acquire',
            'coro1 release',
            'coro2 acquire',
            'coro2 release',
        ]
        self.assertEqual(steps, expected_steps)

    def test_blocking_tpool(self):
        # Note: this test's success isn't a guarantee that the mutex is
        # working. However, this test's failure means that the mutex is
        # definitely broken.
        #
        # Two greenthreads and two real threads each run the same worker;
        # every "<" written under the mutex must be followed directly by
        # its matching ">".
        markers = []

        def worker():
            for _ in range(10):
                self.mutex.acquire()
                markers.append("<")
                eventlet.sleep(0.0001)
                markers.append(">")
                self.mutex.release()

        green_workers = [eventlet.spawn(worker), eventlet.spawn(worker)]

        # Unpatched threading module, so these are real OS-level threads.
        real_threading = eventlet.patcher.original('threading')
        real_workers = []
        for _ in range(2):
            thread = real_threading.Thread(target=worker)
            thread.start()
            real_workers.append(thread)

        for green in green_workers:
            green.wait()
        for thread in real_workers:
            thread.join()

        self.assertEqual(''.join(markers), "<>" * 40)

    def test_blocking_preserves_ownership(self):
        # Two real threads hand the mutex over while the first thread's
        # release() is still held up inside the patched os.write. Each
        # thread records the id of its current greenthread and the value of
        # ``mutex.owner`` it sees while holding the lock; the two lists
        # must match, and the owner must be None after both have released.
        real_threading = eventlet.patcher.original('threading')
        write_may_return = real_threading.Event()
        first_has_lock = real_threading.Event()
        first_done_releasing = real_threading.Event()
        seen_ids = []
        seen_owners = []

        def first_thread():
            seen_ids.append(id(eventlet.greenthread.getcurrent()))
            self.mutex.acquire()
            seen_owners.append(self.mutex.owner)
            first_has_lock.set()
            real_write = pipe_mutex.os.write

            def stalled_write(*args, **kwargs):
                # Perform the real write, then hold its return until the
                # second thread reports that it has acquired the mutex.
                try:
                    return real_write(*args, **kwargs)
                finally:
                    write_may_return.wait()

            with mock.patch.object(pipe_mutex.os, 'write', stalled_write):
                self.mutex.release()
            first_done_releasing.set()

        def second_thread():
            first_has_lock.wait()  # ensure pthread1 acquires lock first
            seen_ids.append(id(eventlet.greenthread.getcurrent()))
            self.mutex.acquire()
            write_may_return.set()
            # Only read the owner after the first thread has completely
            # returned from release().
            first_done_releasing.wait()
            seen_owners.append(self.mutex.owner)
            self.mutex.release()

        runners = []
        for target in (first_thread, second_thread):
            thread = real_threading.Thread(target=target)
            thread.start()
            runners.append(thread)
        for thread in runners:
            thread.join()

        self.assertEqual(seen_ids, seen_owners)
        self.assertIsNone(self.mutex.owner)

    @classmethod
    def tearDownClass(cls):
        # PipeMutex turns this off when you instantiate one
        eventlet.debug.hub_prevent_multiple_readers(True)