Merge "Add a multi-threaded test"

This commit is contained in:
Zuul 2024-10-22 18:37:26 +00:00 committed by Gerrit Code Review
commit 43c26b9dc2

View File

@ -22,11 +22,16 @@
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
try:
import queue
except ImportError: # py2
import Queue as queue
import random
import resource
import string
import sys
import tempfile
import threading
import unittest
from itertools import combinations
@ -179,6 +184,26 @@ class TestPyECLibDriver(unittest.TestCase):
pass
self.assertEqual(available_ec_types, VALID_EC_TYPES)
def test_create_in_threads(self):
def create_backend(kwargs, q):
q.put(ECDriver(**kwargs))
for backend in VALID_EC_TYPES:
q = queue.Queue()
threads = [
threading.Thread(
target=create_backend,
args=({'k': 10, 'm': 5, 'ec_type': backend}, q))
for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
results = []
while not q.empty():
results.append(q.get())
self.assertEqual(len(results), 5)
def test_valid_algo(self):
print("")
for _type in ALL_EC_TYPES: