From ac58423860a42137ae0cf239693a81c08d079fc7 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Thu, 27 Oct 2016 16:46:23 +0300 Subject: [PATCH] [zmq] Cleanup changes to zmq-specific f-tests Change-Id: Icce92106a0a0a07a4f2d19fe8bcd7c2a6fa530c8 --- oslo_messaging/tests/functional/utils.py | 7 ++- .../tests/functional/zmq/multiproc_utils.py | 45 +++++++++---------- .../tests/functional/zmq/test_startup.py | 15 +++---- setup-test-env-zmq-pub-sub.sh | 7 +++ tox.ini | 4 +- 5 files changed, 41 insertions(+), 37 deletions(-) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 7df5bb849..7956cb10e 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -127,7 +127,7 @@ class RpcServerGroupFixture(fixtures.Fixture): use_fanout_ctrl=False, endpoint=None): self.conf = conf self.url = url - # NOTE(sileht): topic and servier_name must be uniq + # NOTE(sileht): topic and server_name must be unique # to be able to run all tests in parallel self.topic = topic or str(uuid.uuid4()) self.names = names or ["server_%i_%s" % (i, str(uuid.uuid4())[:8]) @@ -162,7 +162,7 @@ class RpcServerGroupFixture(fixtures.Fixture): else: if server == 'all': target = self._target(fanout=True) - elif server >= 0 and server < len(self.targets): + elif 0 <= server < len(self.targets): target = self.targets[server] else: raise ValueError("Invalid value for server: %r" % server) @@ -177,12 +177,11 @@ class RpcServerGroupFixture(fixtures.Fixture): if server is None: for i in range(len(self.servers)): self.client(i).ping() - time.sleep(0.3) else: if server == 'all': for s in self.servers: s.syncq.get(timeout=5) - elif server >= 0 and server < len(self.targets): + elif 0 <= server < len(self.targets): self.servers[server].syncq.get(timeout=5) else: raise ValueError("Invalid value for server: %r" % server) diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py index 4a1498a13..cf3b6e3a5 100644 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py @@ -27,11 +27,10 @@ import oslo_messaging from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging.tests.functional import utils +LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -LOG = logging.getLogger(__name__) - class QueueHandler(logging.Handler): """This is a logging handler which sends events to a multiprocessing queue. @@ -158,6 +157,7 @@ class Server(object): LOG.debug("Waiting for the stop signal ...") time.sleep(1) self.rpc_server.stop() + self.rpc_server.wait() LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid()) def cleanup(self): @@ -180,13 +180,13 @@ class Server(object): pass -class MutliprocTestCase(utils.SkipIfNoTransportURL): +class MultiprocTestCase(utils.SkipIfNoTransportURL): def setUp(self): - super(MutliprocTestCase, self).setUp(conf=cfg.ConfigOpts()) + super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts()) - if not self.url.startswith("zmq:"): - self.skipTest("ZeroMQ specific skipped ...") + if not self.url.startswith("zmq"): + self.skipTest("ZeroMQ specific skipped...") self.transport = oslo_messaging.get_transport(self.conf, url=self.url) @@ -204,30 +204,29 @@ class MutliprocTestCase(utils.SkipIfNoTransportURL): self.conf.project = "test_project" def tearDown(self): - super(MutliprocTestCase, self).tearDown() for process in self.spawned: process.cleanup() + super(MultiprocTestCase, self).tearDown() def get_client(self, topic): return Client(self.transport, topic) - def spawn_server(self, name, wait_for_server=False, topic=None): - srv = Server(self.conf, self.log_queue, self.url, name, topic) - LOG.debug("[SPAWN] %s (starting)...", srv.name) - srv.start() + def spawn_server(self, wait_for_server=False, topic=None): + name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8]) + server = Server(self.conf, self.log_queue, self.url, name, topic) + LOG.debug("[SPAWN] %s (starting)...", server.name) + server.start() if wait_for_server: - while not srv.ready.value: + while not server.ready.value: LOG.debug("[SPAWN] %s (waiting for server ready)...", - srv.name) + server.name) time.sleep(1) - LOG.debug("[SPAWN] Server %s:%d started.", srv.name, srv.process.pid) - self.spawned.append(srv) - return srv + LOG.debug("[SPAWN] Server %s:%d started.", + server.name, server.process.pid) + self.spawned.append(server) + return server - def spawn_servers(self, number, wait_for_server=False, random_topic=True): - common_topic = str(uuid.uuid4()) if random_topic else None - names = ["server_%i_%s" % (i, str(uuid.uuid4())[:8]) - for i in range(number)] - for name in names: - server = self.spawn_server(name, wait_for_server, common_topic) - self.spawned.append(server) + def spawn_servers(self, number, wait_for_server=False, common_topic=True): + topic = str(uuid.uuid4()) if common_topic else None + for _ in range(number): + self.spawn_server(wait_for_server, topic) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py index f1b89b06b..9aa4aa309 100644 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ b/oslo_messaging/tests/functional/zmq/test_startup.py @@ -18,11 +18,10 @@ import sys from oslo_messaging.tests.functional.zmq import multiproc_utils - LOG = logging.getLogger(__name__) -class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): +class StartupOrderTestCase(multiproc_utils.MultiprocTestCase): def setUp(self): super(StartupOrderTestCase, self).setUp() @@ -30,26 +29,26 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): self.conf.prog = "test_prog" self.conf.project = "test_project" - self.config(rpc_response_timeout=30) + self.config(rpc_response_timeout=10) log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir, str(os.getpid()) + ".log") sys.stdout = open(log_path, "w", buffering=0) - def test_call_server_before_client(self): - self.spawn_servers(3, wait_for_server=True, random_topic=False) + def test_call_client_wait_for_server(self): + self.spawn_servers(3, wait_for_server=True, common_topic=True) servers = self.spawned client = self.get_client(servers[0].topic) - for i in range(3): + for _ in range(3): reply = client.call_a() self.assertIsNotNone(reply) self.assertEqual(3, len(client.replies)) def test_call_client_dont_wait_for_server(self): - self.spawn_servers(3, wait_for_server=False, random_topic=False) + self.spawn_servers(3, wait_for_server=False, common_topic=True) servers = self.spawned client = self.get_client(servers[0].topic) - for i in range(3): + for _ in range(3): reply = client.call_a() self.assertIsNotNone(reply) self.assertEqual(3, len(client.replies)) diff --git a/setup-test-env-zmq-pub-sub.sh b/setup-test-env-zmq-pub-sub.sh index 5551be5f9..0d9bea8c1 100755 --- a/setup-test-env-zmq-pub-sub.sh +++ b/setup-test-env-zmq-pub-sub.sh @@ -12,6 +12,9 @@ export ZMQ_REDIS_PORT=65123 export ZMQ_IPC_DIR=${DATADIR} export ZMQ_USE_PUB_SUB=true export ZMQ_USE_ROUTER_PROXY=true +export ZMQ_USE_ACKS=false + +export ZMQ_PROXY_HOST=127.0.0.1 cat > ${DATADIR}/zmq.conf <