[zmq] Cleanup changes to zmq-specific f-tests

Change-Id: Icce92106a0a0a07a4f2d19fe8bcd7c2a6fa530c8
This commit is contained in:
Gevorg Davoian 2016-10-27 16:46:23 +03:00
parent dd99f7b1e6
commit ac58423860
5 changed files with 41 additions and 37 deletions

View File

@ -127,7 +127,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
use_fanout_ctrl=False, endpoint=None): use_fanout_ctrl=False, endpoint=None):
self.conf = conf self.conf = conf
self.url = url self.url = url
# NOTE(sileht): topic and servier_name must be uniq # NOTE(sileht): topic and server_name must be unique
# to be able to run all tests in parallel # to be able to run all tests in parallel
self.topic = topic or str(uuid.uuid4()) self.topic = topic or str(uuid.uuid4())
self.names = names or ["server_%i_%s" % (i, str(uuid.uuid4())[:8]) self.names = names or ["server_%i_%s" % (i, str(uuid.uuid4())[:8])
@ -162,7 +162,7 @@ class RpcServerGroupFixture(fixtures.Fixture):
else: else:
if server == 'all': if server == 'all':
target = self._target(fanout=True) target = self._target(fanout=True)
elif server >= 0 and server < len(self.targets): elif 0 <= server < len(self.targets):
target = self.targets[server] target = self.targets[server]
else: else:
raise ValueError("Invalid value for server: %r" % server) raise ValueError("Invalid value for server: %r" % server)
@ -177,12 +177,11 @@ class RpcServerGroupFixture(fixtures.Fixture):
if server is None: if server is None:
for i in range(len(self.servers)): for i in range(len(self.servers)):
self.client(i).ping() self.client(i).ping()
time.sleep(0.3)
else: else:
if server == 'all': if server == 'all':
for s in self.servers: for s in self.servers:
s.syncq.get(timeout=5) s.syncq.get(timeout=5)
elif server >= 0 and server < len(self.targets): elif 0 <= server < len(self.targets):
self.servers[server].syncq.get(timeout=5) self.servers[server].syncq.get(timeout=5)
else: else:
raise ValueError("Invalid value for server: %r" % server) raise ValueError("Invalid value for server: %r" % server)

View File

@ -27,11 +27,10 @@ import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests.functional import utils from oslo_messaging.tests.functional import utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq() zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class QueueHandler(logging.Handler): class QueueHandler(logging.Handler):
"""This is a logging handler which sends events to a multiprocessing queue. """This is a logging handler which sends events to a multiprocessing queue.
@ -158,6 +157,7 @@ class Server(object):
LOG.debug("Waiting for the stop signal ...") LOG.debug("Waiting for the stop signal ...")
time.sleep(1) time.sleep(1)
self.rpc_server.stop() self.rpc_server.stop()
self.rpc_server.wait()
LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid()) LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid())
def cleanup(self): def cleanup(self):
@ -180,12 +180,12 @@ class Server(object):
pass pass
class MutliprocTestCase(utils.SkipIfNoTransportURL): class MultiprocTestCase(utils.SkipIfNoTransportURL):
def setUp(self): def setUp(self):
super(MutliprocTestCase, self).setUp(conf=cfg.ConfigOpts()) super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts())
if not self.url.startswith("zmq:"): if not self.url.startswith("zmq"):
self.skipTest("ZeroMQ specific skipped...") self.skipTest("ZeroMQ specific skipped...")
self.transport = oslo_messaging.get_transport(self.conf, url=self.url) self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
@ -204,30 +204,29 @@ class MutliprocTestCase(utils.SkipIfNoTransportURL):
self.conf.project = "test_project" self.conf.project = "test_project"
def tearDown(self): def tearDown(self):
super(MutliprocTestCase, self).tearDown()
for process in self.spawned: for process in self.spawned:
process.cleanup() process.cleanup()
super(MultiprocTestCase, self).tearDown()
def get_client(self, topic): def get_client(self, topic):
return Client(self.transport, topic) return Client(self.transport, topic)
def spawn_server(self, name, wait_for_server=False, topic=None): def spawn_server(self, wait_for_server=False, topic=None):
srv = Server(self.conf, self.log_queue, self.url, name, topic) name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8])
LOG.debug("[SPAWN] %s (starting)...", srv.name) server = Server(self.conf, self.log_queue, self.url, name, topic)
srv.start() LOG.debug("[SPAWN] %s (starting)...", server.name)
server.start()
if wait_for_server: if wait_for_server:
while not srv.ready.value: while not server.ready.value:
LOG.debug("[SPAWN] %s (waiting for server ready)...", LOG.debug("[SPAWN] %s (waiting for server ready)...",
srv.name) server.name)
time.sleep(1) time.sleep(1)
LOG.debug("[SPAWN] Server %s:%d started.", srv.name, srv.process.pid) LOG.debug("[SPAWN] Server %s:%d started.",
self.spawned.append(srv) server.name, server.process.pid)
return srv
def spawn_servers(self, number, wait_for_server=False, random_topic=True):
common_topic = str(uuid.uuid4()) if random_topic else None
names = ["server_%i_%s" % (i, str(uuid.uuid4())[:8])
for i in range(number)]
for name in names:
server = self.spawn_server(name, wait_for_server, common_topic)
self.spawned.append(server) self.spawned.append(server)
return server
def spawn_servers(self, number, wait_for_server=False, common_topic=True):
topic = str(uuid.uuid4()) if common_topic else None
for _ in range(number):
self.spawn_server(wait_for_server, topic)

View File

@ -18,11 +18,10 @@ import sys
from oslo_messaging.tests.functional.zmq import multiproc_utils from oslo_messaging.tests.functional.zmq import multiproc_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): class StartupOrderTestCase(multiproc_utils.MultiprocTestCase):
def setUp(self): def setUp(self):
super(StartupOrderTestCase, self).setUp() super(StartupOrderTestCase, self).setUp()
@ -30,26 +29,26 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase):
self.conf.prog = "test_prog" self.conf.prog = "test_prog"
self.conf.project = "test_project" self.conf.project = "test_project"
self.config(rpc_response_timeout=30) self.config(rpc_response_timeout=10)
log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir, log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
str(os.getpid()) + ".log") str(os.getpid()) + ".log")
sys.stdout = open(log_path, "w", buffering=0) sys.stdout = open(log_path, "w", buffering=0)
def test_call_server_before_client(self): def test_call_client_wait_for_server(self):
self.spawn_servers(3, wait_for_server=True, random_topic=False) self.spawn_servers(3, wait_for_server=True, common_topic=True)
servers = self.spawned servers = self.spawned
client = self.get_client(servers[0].topic) client = self.get_client(servers[0].topic)
for i in range(3): for _ in range(3):
reply = client.call_a() reply = client.call_a()
self.assertIsNotNone(reply) self.assertIsNotNone(reply)
self.assertEqual(3, len(client.replies)) self.assertEqual(3, len(client.replies))
def test_call_client_dont_wait_for_server(self): def test_call_client_dont_wait_for_server(self):
self.spawn_servers(3, wait_for_server=False, random_topic=False) self.spawn_servers(3, wait_for_server=False, common_topic=True)
servers = self.spawned servers = self.spawned
client = self.get_client(servers[0].topic) client = self.get_client(servers[0].topic)
for i in range(3): for _ in range(3):
reply = client.call_a() reply = client.call_a()
self.assertIsNotNone(reply) self.assertIsNotNone(reply)
self.assertEqual(3, len(client.replies)) self.assertEqual(3, len(client.replies))

View File

@ -12,6 +12,9 @@ export ZMQ_REDIS_PORT=65123
export ZMQ_IPC_DIR=${DATADIR} export ZMQ_IPC_DIR=${DATADIR}
export ZMQ_USE_PUB_SUB=true export ZMQ_USE_PUB_SUB=true
export ZMQ_USE_ROUTER_PROXY=true export ZMQ_USE_ROUTER_PROXY=true
export ZMQ_USE_ACKS=false
export ZMQ_PROXY_HOST=127.0.0.1
cat > ${DATADIR}/zmq.conf <<EOF cat > ${DATADIR}/zmq.conf <<EOF
[DEFAULT] [DEFAULT]
@ -21,8 +24,12 @@ rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR} rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
use_pub_sub=${ZMQ_USE_PUB_SUB} use_pub_sub=${ZMQ_USE_PUB_SUB}
use_router_proxy=${ZMQ_USE_ROUTER_PROXY} use_router_proxy=${ZMQ_USE_ROUTER_PROXY}
rpc_use_acks=${ZMQ_USE_ACKS}
[matchmaker_redis] [matchmaker_redis]
port=${ZMQ_REDIS_PORT} port=${ZMQ_REDIS_PORT}
[zmq_proxy_opts]
host=${ZMQ_PROXY_HOST}
EOF EOF
redis-server --port $ZMQ_REDIS_PORT & redis-server --port $ZMQ_REDIS_PORT &

View File

@ -79,10 +79,10 @@ basepython = python3.4
commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-zeromq-proxy] [testenv:py27-func-zeromq-proxy]
commands = {toxinidir}/setup-test-env-zmq-proxy.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' commands = {toxinidir}/setup-test-env-zmq-proxy.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-zeromq-pub-sub] [testenv:py27-func-zeromq-pub-sub]
commands = {toxinidir}/setup-test-env-zmq-pub-sub.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' commands = {toxinidir}/setup-test-env-zmq-pub-sub.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:bandit] [testenv:bandit]
deps = -r{toxinidir}/test-requirements.txt deps = -r{toxinidir}/test-requirements.txt