remove zmq tests

At the Rocky PTG the Oslo team decided to once again deprecate and
remove the ZMQ driver. This is the first part of doing that, which
removes the unit tests and extra functional test jobs, which are
failing.

Change-Id: Ia02adf122d2d4ff281e7c9fd3dff8894da241925
Signed-off-by: Doug Hellmann <doug@doughellmann.com>
This commit is contained in:
Doug Hellmann 2018-03-26 11:32:49 -04:00
parent 222a939361
commit fca263c43c
17 changed files with 0 additions and 1683 deletions

View File

@ -18,27 +18,6 @@
tox_envlist: py27-func-rabbit
bindep_profile: rabbit
- job:
name: oslo.messaging-tox-py27-func-zmq
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-zmq
bindep_profile: zmq
- job:
name: oslo.messaging-tox-py27-func-zmq-proxy
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-zmq-proxy
bindep_profile: zmq
- job:
name: oslo.messaging-tox-py27-func-zmq-pubsub
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-zmq-pubsub
bindep_profile: zmq
- job:
name: oslo.messaging-tox-py35-func-amqp1
parent: openstack-tox-py35
@ -52,13 +31,6 @@
tox_envlist: py35-func-rabbit
bindep_profile: rabbit
- job:
name: oslo.messaging-tox-py35-func-zmq
parent: openstack-tox-py35
vars:
tox_envlist: py35-func-zmq
bindep_profile: zmq
- job:
name: oslo.messaging-src-dsvm-full-rabbit-default
parent: legacy-dsvm-base
@ -115,17 +87,6 @@
- openstack/devstack-plugin-kafka
- openstack/oslo.messaging
- job:
name: oslo.messaging-src-dsvm-full-zmq-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/run.yaml
post-run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/post.yaml
timeout: 10800
required-projects:
- openstack-infra/devstack-gate
- openstack/devstack-plugin-zmq
- openstack/oslo.messaging
- job:
name: oslo.messaging-src-grenade-dsvm
parent: legacy-dsvm-base
@ -193,24 +154,6 @@
- openstack/dib-utils
- openstack/diskimage-builder
- job:
name: oslo.messaging-telemetry-dsvm-integration-zmq
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml
timeout: 4200
required-projects:
- openstack-infra/devstack-gate
- openstack/aodh
- openstack/ceilometer
- openstack/devstack-plugin-zmq
- openstack/oslo.messaging
- openstack/panko
# following are required when DEVSTACK_GATE_HEAT, which this
# job turns on
- openstack/dib-utils
- openstack/diskimage-builder
- job:
name: oslo.messaging-telemetry-dsvm-integration-rabbit
parent: legacy-dsvm-base
@ -268,19 +211,6 @@
- openstack/oslo.messaging
- openstack/tempest
- job:
name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml
post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/post.yaml
timeout: 7800
required-projects:
- openstack-infra/devstack-gate
- openstack/devstack-plugin-zmq
- openstack/neutron
- openstack/oslo.messaging
- openstack/tempest
- project:
check:
@ -290,18 +220,10 @@
- oslo.messaging-tox-py27-func-kafka:
voting: false
- oslo.messaging-tox-py27-func-rabbit
- oslo.messaging-tox-py27-func-zmq-proxy:
voting: false
- oslo.messaging-tox-py27-func-zmq-pubsub:
voting: false
- oslo.messaging-tox-py27-func-zmq:
voting: false
- oslo.messaging-tox-py35-func-amqp1:
voting: false
- oslo.messaging-tox-py35-func-rabbit:
voting: false
- oslo.messaging-tox-py35-func-zmq:
voting: false
- oslo.messaging-src-dsvm-full-rabbit-default
- oslo.messaging-src-dsvm-full-amqp1-hybrid:
@ -314,8 +236,6 @@
voting: false
- oslo.messaging-src-dsvm-full-kafka-default:
voting: false
- oslo.messaging-src-dsvm-full-zmq-default:
voting: false
- oslo.messaging-src-grenade-dsvm:
voting: false
@ -327,8 +247,6 @@
voting: false
- oslo.messaging-telemetry-dsvm-integration-kafka:
voting: false
- oslo.messaging-telemetry-dsvm-integration-zmq:
voting: false
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default
- oslo.messaging-tempest-neutron-dsvm-src-amqp1-hybrid:
@ -336,8 +254,6 @@
branches: ^(?!stable/ocata).*$
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
voting: false
- oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
voting: false
gate:
jobs:

View File

@ -1,140 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from stevedore import driver
import testscenarios
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
redis = importutils.try_import('redis')
def redis_available():
'''Helper to see if local redis server is running'''
if not redis:
return False
try:
redis.StrictRedis(socket_timeout=1).ping()
return True
except redis.exceptions.ConnectionError:
return False
load_tests = testscenarios.load_tests_apply_scenarios
class TestImplMatchmaker(test_utils.BaseTestCase):
scenarios = [
("dummy", {"rpc_zmq_matchmaker": "dummy"}),
("redis", {"rpc_zmq_matchmaker": "redis"}),
]
def setUp(self):
super(TestImplMatchmaker, self).setUp()
if self.rpc_zmq_matchmaker == "redis":
if not redis_available():
self.skipTest("redis unavailable")
self.test_matcher = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.rpc_zmq_matchmaker,
).driver(self.conf)
if self.rpc_zmq_matchmaker == "redis":
for redis_instance in self.test_matcher._redis_instances:
self.addCleanup(redis_instance.flushdb)
self.target = oslo_messaging.Target(topic="test_topic")
self.host1 = b"test_host1"
self.host2 = b"test_host2"
def test_register(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertEqual([self.host1],
self.test_matcher.get_hosts(self.target, "test"))
def test_register_two_hosts(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host2,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2])
def test_register_unregister(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host2,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.unregister(self.target, self.host2, "test")
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
def test_register_two_same_hosts(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertEqual([self.host1],
self.test_matcher.get_hosts(self.target, "test"))
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual([], self.test_matcher.get_hosts(target, "test"))
def test_handle_redis_package_error(self):
if self.rpc_zmq_matchmaker == "redis":
# move 'redis' variable to prevent this case affect others
module = inspect.getmodule(self.test_matcher)
redis_package = module.redis
# 'redis' variable is set to None, when package importing is failed
module.redis = None
self.assertRaises(ImportError, self.test_matcher.__init__,
self.conf)
# retrieve 'redis' variable which is set originally
module.redis = redis_package

View File

@ -1,129 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging.tests.drivers.zmq import zmq_common
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqTestPortsRange, self).setUp()
# Set config values
kwargs = {'rpc_zmq_min_port': 5555,
'rpc_zmq_max_port': 5560}
self.config(group='oslo_messaging_zmq', **kwargs)
def test_ports_range(self):
listeners = []
for i in range(10):
try:
target = oslo_messaging.Target(topic='testtopic_' + str(i))
new_listener = self.driver.listen(target, None, None)
listeners.append(new_listener)
except zmq_socket.ZmqPortBusy:
pass
self.assertLessEqual(len(listeners), 5)
for l in listeners:
l.cleanup()
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class TestZmqBasics(zmq_common.ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
self.target = oslo_messaging.Target(topic='topic')
self.ctxt = {'key': 'value'}
self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}}
def test_send_call_without_method_failure(self):
self.message.pop('method')
self.listener.listen(self.target)
self.assertRaises(KeyError, self.driver.send,
self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
def _check_listener_received(self):
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.ctxt, self.listener.message.ctxt)
self.assertEqual(self.message, self.listener.message.message)
def test_send_call_success(self):
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result)
self._check_listener_received()
def test_send_call_direct_success(self):
self.target.server = 'server'
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result)
self._check_listener_received()
def test_send_cast_direct_success(self):
self.target.server = 'server'
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()
def test_send_fanout_success(self):
self.target.fanout = True
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()
def test_send_notify_success(self):
self.listener.listen_notifications([(self.target, 'info')])
self.target.topic += '.info'
result = self.driver.send_notification(self.target, self.ctxt,
self.message, '3.0')
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()

View File

@ -1,150 +0,0 @@
# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
import msgpack
import six
import testscenarios
from oslo_config import cfg
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.proxy.central \
import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging.tests.drivers.zmq import zmq_common
load_tests = testscenarios.load_tests_apply_scenarios
zmq = zmq_async.import_zmq()
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
title='ZeroMQ proxy options')
cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
class TestPubSub(zmq_common.ZmqBaseTestCase):
LISTENERS_COUNT = 3
scenarios = [
('json', {'serialization': 'json',
'dumps': lambda obj: six.b(json.dumps(obj))}),
('msgpack', {'serialization': 'msgpack',
'dumps': msgpack.dumps})
]
def setUp(self):
super(TestPubSub, self).setUp()
kwargs = {'use_pub_sub': True,
'rpc_zmq_serialization': self.serialization}
self.config(group='oslo_messaging_zmq', **kwargs)
self.config(host="127.0.0.1", group="zmq_proxy_opts")
self.config(publisher_port=0, group="zmq_proxy_opts")
self.publisher = zmq_publisher_proxy.PublisherProxy(
self.conf, self.driver.matchmaker)
self.driver.matchmaker.register_publisher(
(self.publisher.host, ''),
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.listeners = []
for _ in range(self.LISTENERS_COUNT):
self.listeners.append(zmq_common.TestServerListener(self.driver))
def tearDown(self):
super(TestPubSub, self).tearDown()
self.publisher.cleanup()
for listener in self.listeners:
listener.stop()
def _send_request(self, target):
# Needed only in test env to give listener a chance to connect
# before request fires
time.sleep(1)
context = {}
message = {'method': 'hello-world'}
self.publisher.send_request(
[b"reply_id",
b'',
six.b(zmq_version.MESSAGE_VERSION),
six.b(str(zmq_names.CAST_FANOUT_TYPE)),
zmq_address.target_to_subscribe_filter(target),
b"message_id",
self.dumps([context, message])]
)
def _check_listener(self, listener):
listener._received.wait(timeout=5)
self.assertTrue(listener._received.isSet())
method = listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def _check_listener_negative(self, listener):
listener._received.wait(timeout=1)
self.assertFalse(listener._received.isSet())
def test_single_listener(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
self._send_request(target)
self._check_listener(self.listener)
def test_all_listeners(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
for listener in self.listeners:
listener.listen(target)
self._send_request(target)
for listener in self.listeners:
self._check_listener(listener)
def test_filtered(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
target_wrong = oslo_messaging.Target(topic='wrong', fanout=True)
self.listeners[0].listen(target)
self.listeners[1].listen(target)
self.listeners[2].listen(target_wrong)
self._send_request(target)
self._check_listener(self.listeners[0])
self._check_listener(self.listeners[1])
self._check_listener_negative(self.listeners[2])
def test_topic_part_matching(self):
target = oslo_messaging.Target(topic='testtopic', server='server')
target_part = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listeners[0].listen(target)
self.listeners[1].listen(target)
self._send_request(target_part)
self._check_listener(self.listeners[0])
self._check_listener(self.listeners[1])

View File

@ -1,80 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestRoutingTable(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingTable, self).setUp()
def test_get_next_while_origin_changed(self):
table = zmq_routing_table.RoutingTable(self.conf)
table.register("topic1.server1", "1")
table.register("topic1.server1", "2")
table.register("topic1.server1", "3")
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for i in range(3):
result.append(next(rr_gen))
self.assertEqual(3, len(result))
self.assertIn("1", result)
self.assertIn("2", result)
self.assertIn("3", result)
table.register("topic1.server1", "4")
table.register("topic1.server1", "5")
table.register("topic1.server1", "6")
result = []
for i in range(6):
result.append(next(rr_gen))
self.assertEqual(6, len(result))
self.assertIn("1", result)
self.assertIn("2", result)
self.assertIn("3", result)
self.assertIn("4", result)
self.assertIn("5", result)
self.assertIn("6", result)
def test_no_targets(self):
table = zmq_routing_table.RoutingTable(self.conf)
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for t in rr_gen:
result.append(t)
self.assertEqual(0, len(result))
def test_target_unchanged(self):
table = zmq_routing_table.RoutingTable(self.conf)
table.register("topic1.server1", "1")
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for i in range(3):
result.append(next(rr_gen))
self.assertEqual(["1", "1", "1"], result)

View File

@ -1,226 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import mock
import testtools
import time
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \
import DealerConsumerWithAcks
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging.tests.drivers.zmq import zmq_common
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestZmqAckManager(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqAckManager, self).setUp()
# register and set necessary config opts
self.messaging_conf.transport_driver = 'zmq'
zmq_options.register_opts(self.conf, mock.MagicMock())
kwargs = {'rpc_zmq_matchmaker': 'dummy',
'use_pub_sub': False,
'use_router_proxy': True,
'rpc_thread_pool_size': 1,
'rpc_use_acks': True,
'rpc_ack_timeout_base': 5,
'rpc_ack_timeout_multiplier': 1,
'rpc_retry_attempts': 2}
self.config(group='oslo_messaging_zmq', **kwargs)
self.conf.register_opts(zmq_proxy.zmq_proxy_opts,
group='zmq_proxy_opts')
# mock set_result method of futures
self.set_result_patcher = mock.patch.object(
zmq_receivers.futurist.Future, 'set_result',
side_effect=zmq_receivers.futurist.Future.set_result, autospec=True
)
self.set_result = self.set_result_patcher.start()
# mock send method of senders
self.send_patcher = mock.patch.object(
zmq_senders.RequestSenderProxy, 'send',
side_effect=zmq_senders.RequestSenderProxy.send, autospec=True
)
self.send = self.send_patcher.start()
# get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# prepare and launch proxy
self.proxy = zmq_proxy.ZmqProxy(self.conf)
vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
self.executor = zmq_async.get_executor(self.proxy.run)
self.executor.execute()
# create listener
self.listener = zmq_common.TestServerListener(self.driver)
# create target and message
self.target = oslo_messaging.Target(topic='topic', server='server')
self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
# start listening to target
self.listener.listen(self.target)
# get ack manager
self.ack_manager = self.driver.client.get().publishers['default']
self.addCleanup(
zmq_common.StopRpc(
self, [('listener', 'stop'), ('executor', 'stop'),
('proxy', 'close'), ('driver', 'cleanup'),
('send_patcher', 'stop'),
('set_result_patcher', 'stop')]
)
)
# wait for all connections to be established
# and all parties to be ready for messaging
time.sleep(1)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True)
def test_cast_success_without_retries(self, received_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
def test_cast_success_with_one_retry(self):
with mock.patch.object(DealerConsumerWithAcks,
'_acknowledge') as lost_ack_mock:
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
self.listener._received.clear()
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True) as received_ack_mock:
self.ack_manager.pool.shutdown(wait=True)
self.assertFalse(self.listener._received.isSet())
self.assertEqual(2, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
def test_cast_success_with_two_retries(self):
with mock.patch.object(DealerConsumerWithAcks,
'_acknowledge') as lost_ack_mock:
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
self.listener._received.clear()
self.listener._received.wait(7.5)
self.assertFalse(self.listener._received.isSet())
self.assertEqual(2, self.send.call_count)
self.assertEqual(2, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True) as received_ack_mock:
self.ack_manager.pool.shutdown(wait=True)
self.assertFalse(self.listener._received.isSet())
self.assertEqual(3, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
def test_cast_failure_exhausted_retries(self, lost_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(3, self.send.call_count)
self.assertEqual(3, lost_ack_mock.call_count)
self.assertEqual(1, self.set_result.call_count)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True)
@mock.patch.object(DealerConsumerWithAcks, '_reply',
side_effect=DealerConsumerWithAcks._reply,
autospec=True)
@mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache',
side_effect=DealerConsumerWithAcks._reply_from_cache,
autospec=True)
def test_call_success_without_retries(self, unused_reply_from_cache_mock,
received_reply_mock,
received_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=True, timeout=10
)
self.assertIsNotNone(result)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(3, self.set_result.call_count)
received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY,
reply=True, failure=None)
self.assertEqual(0, unused_reply_from_cache_mock.call_count)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
@mock.patch.object(DealerConsumerWithAcks, '_reply')
@mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache')
def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock,
lost_reply_mock, lost_ack_mock):
self.assertRaises(oslo_messaging.MessagingTimeout,
self.driver.send,
self.target, {}, self.message,
wait_for_reply=True, timeout=20)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(3, self.send.call_count)
self.assertEqual(3, lost_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
lost_reply_mock.assert_called_once_with(mock.ANY,
reply=True, failure=None)
self.assertEqual(2, lost_reply_from_cache_mock.call_count)

View File

@ -1,67 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
load_tests = testscenarios.load_tests_apply_scenarios
class TestZmqAddress(test_utils.BaseTestCase):
scenarios = [
('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}),
('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)})
]
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_only(self):
target = oslo_messaging.Target(topic='topic')
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_server_round_robin(self):
target = oslo_messaging.Target(topic='topic', server='server')
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic/server', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_fanout(self):
target = oslo_messaging.Target(topic='topic', fanout=True)
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_server_fanout(self):
target = oslo_messaging.Target(topic='topic', server='server',
fanout=True)
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_server_fanout_no_prefix(self):
target = oslo_messaging.Target(topic='topic', server='server',
fanout=True)
key = zmq_address.target_to_key(target)
self.assertEqual('topic', key)

View File

@ -1,93 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import mock
import testtools
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestImportZmq(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestImportZmq, self).setUp()
def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
def test_when_evetlet_is_unavailable_then_load_zmq(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('zmq', default=None)
class TestGetPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetPoller, self).setUp()
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
poller = zmq_async.get_poller()
self.assertIsInstance(poller, green_poller.GreenPoller)
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
poller = zmq_async.get_poller()
self.assertIsInstance(poller, threading_poller.ThreadingPoller)
class TestGetExecutor(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetExecutor, self).setUp()
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
executor = zmq_async.get_executor('any method')
self.assertIsInstance(executor, green_poller.GreenExecutor)
self.assertEqual('any method', executor._method)
def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
executor = zmq_async.get_executor('any method')
self.assertIsInstance(executor,
threading_poller.ThreadingExecutor)
self.assertEqual('any method', executor._method)

View File

@ -1,127 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import mock
import testtools
import oslo_messaging
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \
import MatchmakerDummy
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
redis = zmq_matchmaker_redis.redis
sentinel = zmq_matchmaker_redis.redis_sentinel
class TestZmqTransportUrl(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqTransportUrl, self).setUp()
def setup_url(self, url):
transport = oslo_messaging.get_transport(self.conf, url)
self.addCleanup(transport.cleanup)
driver = transport._driver
return driver, url
def mock_redis(self):
if redis is None:
self.skipTest("redis not available")
else:
redis_patcher = mock.patch.object(redis, 'StrictRedis')
self.addCleanup(redis_patcher.stop)
return redis_patcher.start()
def mock_sentinel(self):
if sentinel is None:
self.skipTest("sentinel not available")
else:
sentinel_patcher = mock.patch.object(sentinel, 'Sentinel')
self.addCleanup(sentinel_patcher.stop)
return sentinel_patcher.start()
def test_empty_url(self):
self.mock_redis()
driver, url = self.setup_url("zmq:///")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq', driver.matchmaker.url.transport)
def test_error_url(self):
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
def test_dummy_url(self):
driver, url = self.setup_url("zmq+dummy:///")
self.assertIs(MatchmakerDummy,
driver.matchmaker.__class__)
self.assertEqual('zmq+dummy', driver.matchmaker.url.transport)
def test_redis_url(self):
self.mock_redis()
driver, url = self.setup_url("zmq+redis:///")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
def test_sentinel_url(self):
self.mock_sentinel()
driver, url = self.setup_url("zmq+sentinel:///")
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
driver.matchmaker.__class__)
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
def test_host_with_credentials_url(self):
self.mock_redis()
driver, url = self.setup_url("zmq://:password@host:60000/")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq', driver.matchmaker.url.transport)
self.assertEqual(
[{"host": "host", "port": 60000, "password": "password"}],
driver.matchmaker._redis_hosts
)
def test_redis_multiple_hosts_url(self):
self.mock_redis()
driver, url = self.setup_url(
"zmq+redis://host1:60001,host2:60002,host3:60003/"
)
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
self.assertEqual(
[{"host": "host1", "port": 60001, "password": None},
{"host": "host2", "port": 60002, "password": None},
{"host": "host3", "port": 60003, "password": None}],
driver.matchmaker._redis_hosts
)
def test_sentinel_multiple_hosts_url(self):
self.mock_sentinel()
driver, url = self.setup_url(
"zmq+sentinel://host1:20001,host2:20002,host3:20003/"
)
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
driver.matchmaker.__class__)
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
self.assertEqual(
[("host1", 20001), ("host2", 20002), ("host3", 20003)],
driver.matchmaker._sentinel_hosts
)

View File

@ -1,132 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
from oslo_messaging.tests import utils as test_utils
class TestZmqTTLCache(test_utils.BaseTestCase):
def setUp(self):
super(TestZmqTTLCache, self).setUp()
def call_count_decorator(unbound_method):
def wrapper(self, *args, **kwargs):
wrapper.call_count += 1
return unbound_method(self, *args, **kwargs)
wrapper.call_count = 0
return wrapper
zmq_ttl_cache.TTLCache._update_cache = \
call_count_decorator(zmq_ttl_cache.TTLCache._update_cache)
self.cache = zmq_ttl_cache.TTLCache(ttl=1)
self.addCleanup(lambda: self.cache.cleanup())
def _test_add_get(self):
self.cache.add('x', 'a')
self.assertEqual(self.cache.get('x'), 'a')
self.assertEqual(self.cache.get('x', 'b'), 'a')
self.assertIsNone(self.cache.get('y'))
self.assertEqual(self.cache.get('y', 'b'), 'b')
time.sleep(1)
self.assertIsNone(self.cache.get('x'))
self.assertEqual(self.cache.get('x', 'b'), 'b')
def test_add_get_with_executor(self):
self._test_add_get()
def test_add_get_without_executor(self):
self.cache._executor.stop()
self._test_add_get()
def _test_in_operator(self):
self.cache.add(1)
self.assertIn(1, self.cache)
time.sleep(0.5)
self.cache.add(2)
self.assertIn(1, self.cache)
self.assertIn(2, self.cache)
time.sleep(0.75)
self.cache.add(3)
self.assertNotIn(1, self.cache)
self.assertIn(2, self.cache)
self.assertIn(3, self.cache)
time.sleep(0.5)
self.assertNotIn(2, self.cache)
self.assertIn(3, self.cache)
def test_in_operator_with_executor(self):
self._test_in_operator()
def test_in_operator_without_executor(self):
self.cache._executor.stop()
self._test_in_operator()
def _is_expired(self, key):
with self.cache._lock:
_, expiration_time = self.cache._cache[key]
return self.cache._is_expired(expiration_time, time.time())
def test_executor(self):
self.cache.add(1)
self.assertEqual([1], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(1))
time.sleep(0.75)
self.assertEqual(1, self.cache._update_cache.call_count)
self.cache.add(2)
self.assertEqual([1, 2], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(1))
self.assertFalse(self._is_expired(2))
time.sleep(0.75)
self.assertEqual(2, self.cache._update_cache.call_count)
self.cache.add(3)
if 1 in self.cache:
self.assertEqual([1, 2, 3], sorted(self.cache._cache.keys()))
self.assertTrue(self._is_expired(1))
else:
self.assertEqual([2, 3], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(2))
self.assertFalse(self._is_expired(3))
time.sleep(0.75)
self.assertEqual(3, self.cache._update_cache.call_count)
self.assertEqual([3], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(3))

View File

@ -1,63 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging.tests import utils as test_utils
class Doer(object):
def __init__(self):
self.x = 1
self.y = 2
self.z = 3
def _sudo(self):
pass
def do(self):
pass
def _do_v_1_1(self):
pass
def _do_v_2_2(self):
pass
def _do_v_3_3(self):
pass
class TestZmqVersion(test_utils.BaseTestCase):
def setUp(self):
super(TestZmqVersion, self).setUp()
self.doer = Doer()
def test_get_unknown_attr_versions(self):
self.assertRaises(AssertionError, zmq_version.get_method_versions,
self.doer, 'qwerty')
def test_get_non_method_attr_versions(self):
for attr_name in vars(self.doer):
self.assertRaises(AssertionError, zmq_version.get_method_versions,
self.doer, attr_name)
def test_get_private_method_versions(self):
self.assertRaises(AssertionError, zmq_version.get_method_versions,
self.doer, '_sudo')
def test_get_public_method_versions(self):
do_versions = zmq_version.get_method_versions(self.doer, 'do')
self.assertEqual(['1.1', '2.2', '3.3'], sorted(do_versions.keys()))

View File

@ -1,111 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import fixtures
from six.moves import mock
import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
from oslo_messaging.tests import utils as test_utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestServerListener(object):
def __init__(self, driver):
self.driver = driver
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.listener = self.driver.listen(target, None,
None)._poll_style_listener
self.executor.execute()
def listen_notifications(self, targets_and_priorities):
self.listener = self.driver.listen_for_notifications(
targets_and_priorities, None, None, None)._poll_style_listener
self.executor.execute()
def _run(self):
try:
messages = self.listener.poll()
if messages:
message = messages[0]
message.acknowledge()
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_LE("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
zmq_options.register_opts(self.conf, mock.MagicMock())
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'use_router_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(group='oslo_messaging_zmq', **kwargs)
self.config(rpc_response_timeout=5)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestServerListener(self.driver)
self.addCleanup(
StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')])
)
class StopRpc(object):
def __init__(self, obj, attrs_and_stops):
self.obj = obj
self.attrs_and_stops = attrs_and_stops
def __call__(self):
for attr, stop in self.attrs_and_stops:
if hasattr(self.obj, attr):
obj_attr = getattr(self.obj, attr)
if hasattr(obj_attr, stop):
obj_attr_stop = getattr(obj_attr, stop)
obj_attr_stop()

View File

@ -1,232 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import logging.handlers
import multiprocessing
import os
import sys
import threading
import time
import uuid
from oslo_config import cfg
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests.functional import utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class QueueHandler(logging.Handler):
"""This is a logging handler which sends events to a multiprocessing queue.
The plan is to add it to Python 3.2, but this can be copy pasted into
user code for use with earlier Python versions.
"""
def __init__(self, queue):
"""Initialise an instance, using the passed queue."""
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
"""Emit a record.
Writes the LogRecord to the queue.
"""
try:
ei = record.exc_info
if ei:
# just to get traceback text into record.exc_text
dummy = self.format(record) # noqa
record.exc_info = None # not needed any more
self.queue.put_nowait(record)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)
def listener_configurer(conf):
root = logging.getLogger()
h = logging.StreamHandler(sys.stdout)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s '
'%(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
"/" + "zmq_multiproc.log"
file_handler = logging.StreamHandler(open(log_path, 'w'))
file_handler.setFormatter(f)
root.addHandler(file_handler)
def server_configurer(queue):
h = QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.DEBUG)
def listener_thread(queue, configurer, conf):
configurer(conf)
while True:
time.sleep(0.3)
try:
record = queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except (KeyboardInterrupt, SystemExit):
raise
class Client(oslo_messaging.RPCClient):
def __init__(self, transport, topic):
super(Client, self).__init__(
transport=transport, target=oslo_messaging.Target(topic=topic))
self.replies = []
def call_a(self):
LOG.warning("call_a - client side")
rep = self.call({}, 'call_a')
LOG.warning("after call_a - client side")
self.replies.append(rep)
return rep
class ReplyServerEndpoint(object):
def call_a(self, *args, **kwargs):
LOG.warning("call_a - Server endpoint reached!")
return "OK"
class Server(object):
def __init__(self, conf, log_queue, transport_url, name, topic=None):
self.conf = conf
self.log_queue = log_queue
self.transport_url = transport_url
self.name = name
self.topic = topic or str(uuid.uuid4())
self.ready = multiprocessing.Value('b', False)
self._stop = multiprocessing.Event()
def start(self):
self.process = multiprocessing.Process(target=self._run_server,
name=self.name,
args=(self.conf,
self.transport_url,
self.log_queue,
self.ready))
self.process.start()
LOG.debug("Server process started: pid: %d", self.process.pid)
def _run_server(self, conf, url, log_queue, ready):
server_configurer(log_queue)
LOG.debug("Starting RPC server")
transport = oslo_messaging.get_transport(conf, url=url)
target = oslo_messaging.Target(topic=self.topic, server=self.name)
self.rpc_server = oslo_messaging.get_rpc_server(
transport=transport, target=target,
endpoints=[ReplyServerEndpoint()],
executor='eventlet')
self.rpc_server.start()
ready.value = True
LOG.debug("RPC server being started")
while not self._stop.is_set():
LOG.debug("Waiting for the stop signal ...")
time.sleep(1)
self.rpc_server.stop()
self.rpc_server.wait()
LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid())
def cleanup(self):
LOG.debug("Stopping server")
self.shutdown()
def shutdown(self):
self._stop.set()
def restart(self, time_for_restart=1):
pass
def hang(self):
pass
def crash(self):
pass
def ping(self):
pass
class MultiprocTestCase(utils.SkipIfNoTransportURL):
def setUp(self):
super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts())
if not self.url.startswith("zmq"):
self.skipTest("ZeroMQ specific skipped...")
self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
LOG.debug("Start log queue")
self.log_queue = multiprocessing.Queue()
self.log_listener = threading.Thread(target=listener_thread,
args=(self.log_queue,
listener_configurer,
self.conf))
self.log_listener.start()
self.spawned = []
self.conf.prog = "test_prog"
self.conf.project = "test_project"
def tearDown(self):
for process in self.spawned:
process.cleanup()
super(MultiprocTestCase, self).tearDown()
def get_client(self, topic):
return Client(self.transport, topic)
def spawn_server(self, wait_for_server=False, topic=None):
name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8])
server = Server(self.conf, self.log_queue, self.url, name, topic)
LOG.debug("[SPAWN] %s (starting)...", server.name)
server.start()
if wait_for_server:
while not server.ready.value:
LOG.debug("[SPAWN] %s (waiting for server ready)...",
server.name)
time.sleep(1)
LOG.debug("[SPAWN] Server %s:%d started.",
server.name, server.process.pid)
self.spawned.append(server)
return server
def spawn_servers(self, number, wait_for_server=False, common_topic=True):
topic = str(uuid.uuid4()) if common_topic else None
for _ in range(number):
self.spawn_server(wait_for_server, topic)

View File

@ -1,49 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
from oslo_messaging.tests.functional.zmq import multiproc_utils
class StartupOrderTestCase(multiproc_utils.MultiprocTestCase):
def setUp(self):
super(StartupOrderTestCase, self).setUp()
self.conf.prog = "test_prog"
self.conf.project = "test_project"
self.config(rpc_response_timeout=10)
log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
str(os.getpid()) + ".log")
sys.stdout = open(log_path, "wb", buffering=0)
def test_call_client_wait_for_server(self):
server = self.spawn_server(wait_for_server=True)
client = self.get_client(server.topic)
for _ in range(3):
reply = client.call_a()
self.assertIsNotNone(reply)
self.assertEqual(3, len(client.replies))
def test_call_client_dont_wait_for_server(self):
server = self.spawn_server(wait_for_server=False)
client = self.get_client(server.topic)
for _ in range(3):
reply = client.call_a()
self.assertIsNotNone(reply)
self.assertEqual(3, len(client.replies))