Merge "Stop using eventlet in sqlite_lock"
This commit is contained in:
commit
f74f45903d
@ -12,17 +12,17 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from eventlet import semaphore
|
||||
import threading
|
||||
|
||||
|
||||
_mutex = semaphore.Semaphore()
|
||||
_mutex = threading.Semaphore()
|
||||
_locks = {}
|
||||
|
||||
|
||||
def acquire_lock(obj_id, session):
|
||||
with _mutex:
|
||||
if obj_id not in _locks:
|
||||
_locks[obj_id] = (session, semaphore.BoundedSemaphore(1))
|
||||
_locks[obj_id] = (session, threading.BoundedSemaphore(1))
|
||||
|
||||
tup = _locks.get(obj_id)
|
||||
|
||||
|
@ -14,10 +14,11 @@
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
import random
|
||||
import testtools
|
||||
import threading
|
||||
import time
|
||||
|
||||
from mistral import context as auth_context
|
||||
from mistral.db.sqlalchemy import sqlite_lock
|
||||
@ -57,7 +58,7 @@ class SQLiteLocksTest(test_base.DbTestCase):
|
||||
)
|
||||
|
||||
def _random_sleep(self):
|
||||
eventlet.sleep(random.Random().randint(0, 10) * 0.001)
|
||||
time.sleep(random.Random().randint(0, 10) * 0.001)
|
||||
|
||||
def _run_acquire_release_sqlite_lock(self, obj_id, session):
|
||||
self._random_sleep()
|
||||
@ -69,19 +70,17 @@ class SQLiteLocksTest(test_base.DbTestCase):
|
||||
sqlite_lock.release_locks(session)
|
||||
|
||||
def test_acquire_release_sqlite_lock(self):
|
||||
"""Acquire and release locks from threads"""
|
||||
threads = []
|
||||
|
||||
id = "object_id"
|
||||
|
||||
number = 500
|
||||
|
||||
for i in range(1, number):
|
||||
threads.append(
|
||||
eventlet.spawn(self._run_acquire_release_sqlite_lock, id, i)
|
||||
)
|
||||
|
||||
[t.wait() for t in threads]
|
||||
[t.kill() for t in threads]
|
||||
t = threading.Thread(target=self._run_acquire_release_sqlite_lock,
|
||||
args=[id, i])
|
||||
t.start()
|
||||
threads.append(t)
|
||||
[t.join(timeout=10) for t in threads]
|
||||
|
||||
self.assertEqual(1, len(sqlite_lock.get_locks()))
|
||||
|
||||
@ -107,22 +106,22 @@ class SQLiteLocksTest(test_base.DbTestCase):
|
||||
return wf_ex.name
|
||||
|
||||
def test_correct_locking(self):
|
||||
"""Verify overlap between threads"""
|
||||
wf_ex = db_api.create_workflow_execution(WF_EXEC)
|
||||
|
||||
threads = []
|
||||
|
||||
number = 500
|
||||
|
||||
for i in range(1, number):
|
||||
threads.append(
|
||||
eventlet.spawn(self._run_correct_locking, wf_ex)
|
||||
)
|
||||
|
||||
[t.wait() for t in threads]
|
||||
[t.kill() for t in threads]
|
||||
t = threading.Thread(target=self._run_correct_locking,
|
||||
args=[wf_ex])
|
||||
t.start()
|
||||
threads.append(t)
|
||||
[t.join(timeout=10) for t in threads]
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
print("Correct locking test gave object name: %s" % wf_ex.name)
|
||||
|
||||
self.assertEqual(str(number), wf_ex.name)
|
||||
self.assertEqual(1, len(sqlite_lock.get_locks()))
|
||||
sqlite_lock.cleanup()
|
||||
|
Loading…
x
Reference in New Issue
Block a user