From e75df856eae749ece7bdc7cbc9ee77d7cd848297 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Tue, 29 Nov 2011 09:01:16 -0800 Subject: [PATCH] Fix RPC responses to allow None response correctly. Fixes bug 897155 Also adds a new fake rpc implementation that tests use by default. This speeds up the test run by ~10% on my system. We can decide to ditch fake_rabbit at some point later.. Change-Id: I8877fad3d41ae055c15b1adff99e535c34e9ce92 --- nova/rpc/impl_carrot.py | 84 +++++----- nova/rpc/impl_fake.py | 146 ++++++++++++++++++ nova/rpc/impl_kombu.py | 23 ++- nova/test.py | 1 - nova/tests/fake_flags.py | 1 + nova/tests/rpc/__init__.py | 19 +++ .../{test_rpc_common.py => rpc/common.py} | 18 +++ .../test_carrot.py} | 4 +- nova/tests/rpc/test_fake.py | 36 +++++ .../{test_rpc_kombu.py => rpc/test_kombu.py} | 4 +- nova/tests/{ => rpc}/test_rpc.py | 8 +- nova/tests/test_adminapi.py | 11 +- nova/tests/xenapi/stubs.py | 10 -- run_tests.py | 2 + 14 files changed, 294 insertions(+), 73 deletions(-) create mode 100644 nova/rpc/impl_fake.py create mode 100644 nova/tests/rpc/__init__.py rename nova/tests/{test_rpc_common.py => rpc/common.py} (92%) rename nova/tests/{test_rpc_carrot.py => rpc/test_carrot.py} (93%) create mode 100644 nova/tests/rpc/test_fake.py rename nova/tests/{test_rpc_kombu.py => rpc/test_kombu.py} (97%) rename nova/tests/{ => rpc}/test_rpc.py (85%) diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index 2a518d7d7..57fd074f0 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -266,14 +266,13 @@ class AdapterConsumer(Consumer): # we just log the message and send an error string # back to the caller LOG.warn(_('no method for message: %s') % message_data) - if msg_id: - msg_reply(msg_id, - _('No method for message: %s') % message_data) + ctxt.reply(msg_id, + _('No method for message: %s') % message_data) return - self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) + self.pool.spawn_n(self._process_data, ctxt, method, args) @exception.wrap_exception() - def _process_data(self, msg_id, ctxt, method, args): + def _process_data(self, ctxt, method, args): """Thread that magically looks for a method on the proxy object and calls it. """ @@ -283,23 +282,18 @@ class AdapterConsumer(Consumer): # NOTE(vish): magic is fun! try: rval = node_func(context=ctxt, **node_args) - if msg_id: - # Check if the result was a generator - if isinstance(rval, types.GeneratorType): - for x in rval: - msg_reply(msg_id, x, None) - else: - msg_reply(msg_id, rval, None) + # Check if the result was a generator + if isinstance(rval, types.GeneratorType): + for x in rval: + ctxt.reply(x, None) + else: + ctxt.reply(rval, None) - # This final None tells multicall that it is done. - msg_reply(msg_id, None, None) - elif isinstance(rval, types.GeneratorType): - # NOTE(vish): this iterates through the generator - list(rval) + # This final None tells multicall that it is done. + ctxt.reply(ending=True) except Exception as e: LOG.exception('Exception during message handling') - if msg_id: - msg_reply(msg_id, None, sys.exc_info()) + ctxt.reply(None, sys.exc_info()) return @@ -447,7 +441,7 @@ class DirectPublisher(Publisher): super(DirectPublisher, self).__init__(connection=connection) -def msg_reply(msg_id, reply=None, failure=None): +def msg_reply(msg_id, reply=None, failure=None, ending=False): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. @@ -463,12 +457,17 @@ def msg_reply(msg_id, reply=None, failure=None): with ConnectionPool.item() as conn: publisher = DirectPublisher(connection=conn, msg_id=msg_id) try: - publisher.send({'result': reply, 'failure': failure}) + msg = {'result': reply, 'failure': failure} + if ending: + msg['ending'] = True + publisher.send(msg) except TypeError: - publisher.send( - {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure}) + msg = {'result': dict((k, repr(v)) + for k, v in reply.__dict__.iteritems()), + 'failure': failure} + if ending: + msg['ending'] = True + publisher.send(msg) publisher.close() @@ -508,8 +507,11 @@ class RpcContext(context.RequestContext): self.msg_id = msg_id super(RpcContext, self).__init__(*args, **kwargs) - def reply(self, *args, **kwargs): - msg_reply(self.msg_id, *args, **kwargs) + def reply(self, reply=None, failure=None, ending=False): + if self.msg_id: + msg_reply(self.msg_id, reply, failure, ending) + if ending: + self.msg_id = None def multicall(context, topic, msg): @@ -537,8 +539,11 @@ class MulticallWaiter(object): self._consumer = consumer self._results = queue.Queue() self._closed = False + self._got_ending = False def close(self): + if self._closed: + return self._closed = True self._consumer.close() ConnectionPool.put(self._consumer.connection) @@ -548,6 +553,8 @@ class MulticallWaiter(object): message.ack() if data['failure']: self._results.put(RemoteError(*data['failure'])) + elif data.get('ending', False): + self._got_ending = True else: self._results.put(data['result']) @@ -555,23 +562,22 @@ class MulticallWaiter(object): return self.wait() def wait(self): - while True: - rv = None - while rv is None and not self._closed: - try: - rv = self._consumer.fetch(enable_callbacks=True) - except Exception: - self.close() - raise + while not self._closed: + try: + rv = self._consumer.fetch(enable_callbacks=True) + except Exception: + self.close() + raise + if rv is None: time.sleep(0.01) - + continue + if self._got_ending: + self.close() + raise StopIteration result = self._results.get() if isinstance(result, Exception): self.close() raise result - if result == None: - self.close() - raise StopIteration yield result diff --git a/nova/rpc/impl_fake.py b/nova/rpc/impl_fake.py new file mode 100644 index 000000000..9a94be07e --- /dev/null +++ b/nova/rpc/impl_fake.py @@ -0,0 +1,146 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Fake RPC implementation which calls proxy methods directly with no +queues. Casts will block, but this is very useful for tests. +""" + +import sys +import traceback +import types + +from nova import context +from nova.rpc import common as rpc_common + +CONSUMERS = {} + + +class RpcContext(context.RequestContext): + def __init__(self, *args, **kwargs): + super(RpcContext, self).__init__(*args, **kwargs) + self._response = [] + self._done = False + + def reply(self, reply=None, failure=None, ending=False): + if ending: + self._done = True + if not self._done: + self._response.append((reply, failure)) + + +class Consumer(object): + def __init__(self, topic, proxy): + self.topic = topic + self.proxy = proxy + + def call(self, context, method, args): + node_func = getattr(self.proxy, method) + node_args = dict((str(k), v) for k, v in args.iteritems()) + + ctxt = RpcContext.from_dict(context.to_dict()) + try: + rval = node_func(context=ctxt, **node_args) + # Caller might have called ctxt.reply() manually + for (reply, failure) in ctxt._response: + if failure: + raise failure[0], failure[1], failure[2] + yield reply + # if ending not 'sent'...we might have more data to + # return from the function itself + if not ctxt._done: + if isinstance(rval, types.GeneratorType): + for val in rval: + yield val + else: + yield rval + except Exception: + exc_info = sys.exc_info() + raise rpc_common.RemoteError(exc_info[0].__name__, + str(exc_info[1]), + traceback.format_exception(*exc_info)) + + +class Connection(object): + """Connection object.""" + + def __init__(self): + self.consumers = [] + + def create_consumer(self, topic, proxy, fanout=False): + consumer = Consumer(topic, proxy) + self.consumers.append(consumer) + if topic not in CONSUMERS: + CONSUMERS[topic] = [] + CONSUMERS[topic].append(consumer) + + def close(self): + for consumer in self.consumers: + CONSUMERS[consumer.topic].remove(consumer) + self.consumers = [] + + def consume_in_thread(self): + pass + + +def create_connection(new=True): + """Create a connection""" + return Connection() + + +def multicall(context, topic, msg): + """Make a call that returns multiple times.""" + + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + + try: + consumer = CONSUMERS[topic][0] + except (KeyError, IndexError): + return iter([None]) + else: + return consumer.call(context, method, args) + + +def call(context, topic, msg): + """Sends a message on a topic and wait for a response.""" + rv = multicall(context, topic, msg) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(context, topic, msg): + try: + call(context, topic, msg) + except rpc_common.RemoteError: + pass + + +def fanout_cast(context, topic, msg): + """Cast to all consumers of a topic""" + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + + for consumer in CONSUMERS.get(topic, []): + try: + consumer.call(context, method, args) + except rpc_common.RemoteError: + pass diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 1b80fce04..757e7636a 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -625,7 +625,7 @@ class ProxyCallback(object): else: ctxt.reply(rval, None) # This final None tells multicall that it is done. - ctxt.reply(None, None) + ctxt.reply(ending=True) except Exception as e: LOG.exception('Exception during message handling') ctxt.reply(None, sys.exc_info()) @@ -668,9 +668,11 @@ class RpcContext(context.RequestContext): self.msg_id = msg_id super(RpcContext, self).__init__(*args, **kwargs) - def reply(self, *args, **kwargs): + def reply(self, reply=None, failure=None, ending=False): if self.msg_id: - msg_reply(self.msg_id, *args, **kwargs) + msg_reply(self.msg_id, reply, failure, ending) + if ending: + self.msg_id = None class MulticallWaiter(object): @@ -679,8 +681,11 @@ class MulticallWaiter(object): self._iterator = connection.iterconsume() self._result = None self._done = False + self._got_ending = False def done(self): + if self._done: + return self._done = True self._iterator.close() self._iterator = None @@ -690,6 +695,8 @@ class MulticallWaiter(object): """The consume() callback will call this. Store the result.""" if data['failure']: self._result = RemoteError(*data['failure']) + elif data.get('ending', False): + self._got_ending = True else: self._result = data['result'] @@ -699,13 +706,13 @@ class MulticallWaiter(object): raise StopIteration while True: self._iterator.next() + if self._got_ending: + self.done() + raise StopIteration result = self._result if isinstance(result, Exception): self.done() raise result - if result == None: - self.done() - raise StopIteration yield result @@ -759,7 +766,7 @@ def fanout_cast(context, topic, msg): conn.fanout_send(topic, msg) -def msg_reply(msg_id, reply=None, failure=None): +def msg_reply(msg_id, reply=None, failure=None, ending=False): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. @@ -779,4 +786,6 @@ def msg_reply(msg_id, reply=None, failure=None): msg = {'result': dict((k, repr(v)) for k, v in reply.__dict__.iteritems()), 'failure': failure} + if ending: + msg['ending'] = True conn.direct_send(msg_id, msg) diff --git a/nova/test.py b/nova/test.py index abd1294d4..6c565f53d 100644 --- a/nova/test.py +++ b/nova/test.py @@ -34,7 +34,6 @@ import nose.plugins.skip import nova.image.fake import shutil import stubout -from eventlet import greenthread from nova import fakerabbit from nova import flags diff --git a/nova/tests/fake_flags.py b/nova/tests/fake_flags.py index 13fb6c6ca..fc7bb059a 100644 --- a/nova/tests/fake_flags.py +++ b/nova/tests/fake_flags.py @@ -24,6 +24,7 @@ flags.DECLARE('volume_driver', 'nova.volume.manager') FLAGS['volume_driver'].SetDefault('nova.volume.driver.FakeISCSIDriver') FLAGS['connection_type'].SetDefault('fake') FLAGS['fake_rabbit'].SetDefault(True) +FLAGS['rpc_backend'].SetDefault('nova.rpc.impl_fake') flags.DECLARE('auth_driver', 'nova.auth.manager') FLAGS['auth_driver'].SetDefault('nova.auth.dbdriver.DbDriver') flags.DECLARE('network_size', 'nova.network.manager') diff --git a/nova/tests/rpc/__init__.py b/nova/tests/rpc/__init__.py new file mode 100644 index 000000000..6dab802f2 --- /dev/null +++ b/nova/tests/rpc/__init__.py @@ -0,0 +1,19 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Openstack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work +from nova.tests import * diff --git a/nova/tests/test_rpc_common.py b/nova/tests/rpc/common.py similarity index 92% rename from nova/tests/test_rpc_common.py rename to nova/tests/rpc/common.py index 4ab4e8a0e..dc8aafcfe 100644 --- a/nova/tests/test_rpc_common.py +++ b/nova/tests/rpc/common.py @@ -81,6 +81,17 @@ class _BaseRpcTestCase(test.TestCase): for i, x in enumerate(result): self.assertEqual(value + i, x) + def test_multicall_three_nones(self): + value = 42 + result = self.rpc.multicall(self.context, + 'test', + {"method": "multicall_three_nones", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(x, None) + # i should have been 0, 1, and finally 2: + self.assertEqual(i, 2) + def test_multicall_succeed_three_times_yield(self): value = 42 result = self.rpc.multicall(self.context, @@ -176,6 +187,13 @@ class TestReceiver(object): context.reply(value) context.reply(value + 1) context.reply(value + 2) + context.reply(ending=True) + + @staticmethod + def multicall_three_nones(context, value): + yield None + yield None + yield None @staticmethod def echo_three_times_yield(context, value): diff --git a/nova/tests/test_rpc_carrot.py b/nova/tests/rpc/test_carrot.py similarity index 93% rename from nova/tests/test_rpc_carrot.py rename to nova/tests/rpc/test_carrot.py index 57cdebf4f..fa9f73961 100644 --- a/nova/tests/test_rpc_carrot.py +++ b/nova/tests/rpc/test_carrot.py @@ -22,13 +22,13 @@ Unit Tests for remote procedure calls using carrot from nova import context from nova import log as logging from nova.rpc import impl_carrot -from nova.tests import test_rpc_common +from nova.tests.rpc import common LOG = logging.getLogger('nova.tests.rpc') -class RpcCarrotTestCase(test_rpc_common._BaseRpcTestCase): +class RpcCarrotTestCase(common._BaseRpcTestCase): def setUp(self): self.rpc = impl_carrot super(RpcCarrotTestCase, self).setUp() diff --git a/nova/tests/rpc/test_fake.py b/nova/tests/rpc/test_fake.py new file mode 100644 index 000000000..344c44628 --- /dev/null +++ b/nova/tests/rpc/test_fake.py @@ -0,0 +1,36 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Unit Tests for remote procedure calls using fake_impl +""" + +from nova import log as logging +from nova.rpc import impl_fake +from nova.tests.rpc import common + + +LOG = logging.getLogger('nova.tests.rpc') + + +class RpcFakeTestCase(common._BaseRpcTestCase): + def setUp(self): + self.rpc = impl_fake + super(RpcFakeTestCase, self).setUp() + + def tearDown(self): + super(RpcFakeTestCase, self).tearDown() diff --git a/nova/tests/test_rpc_kombu.py b/nova/tests/rpc/test_kombu.py similarity index 97% rename from nova/tests/test_rpc_kombu.py rename to nova/tests/rpc/test_kombu.py index 101ed14af..01b00f33d 100644 --- a/nova/tests/test_rpc_kombu.py +++ b/nova/tests/rpc/test_kombu.py @@ -23,13 +23,13 @@ from nova import context from nova import log as logging from nova import test from nova.rpc import impl_kombu -from nova.tests import test_rpc_common +from nova.tests.rpc import common LOG = logging.getLogger('nova.tests.rpc') -class RpcKombuTestCase(test_rpc_common._BaseRpcTestCase): +class RpcKombuTestCase(common._BaseRpcTestCase): def setUp(self): self.rpc = impl_kombu super(RpcKombuTestCase, self).setUp() diff --git a/nova/tests/test_rpc.py b/nova/tests/rpc/test_rpc.py similarity index 85% rename from nova/tests/test_rpc.py rename to nova/tests/rpc/test_rpc.py index 6b4454747..4524391f8 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/rpc/test_rpc.py @@ -16,20 +16,20 @@ # License for the specific language governing permissions and limitations # under the License. """ -Unit Tests for remote procedure calls using queue +Unit Tests for remote procedure call interfaces """ -from nova import context from nova import log as logging from nova import rpc -from nova.tests import test_rpc_common +from nova.tests.rpc import common LOG = logging.getLogger('nova.tests.rpc') -class RpcTestCase(test_rpc_common._BaseRpcTestCase): +class RpcTestCase(common._BaseRpcTestCase): def setUp(self): + self.flags(rpc_backend='nova.tests.rpc.fake') self.rpc = rpc super(RpcTestCase, self).setUp() diff --git a/nova/tests/test_adminapi.py b/nova/tests/test_adminapi.py index aaa633adc..08c8f707a 100644 --- a/nova/tests/test_adminapi.py +++ b/nova/tests/test_adminapi.py @@ -61,14 +61,9 @@ class AdminApiTestCase(test.TestCase): self.stubs.Set(fake._FakeImageService, 'show', fake_show) self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show) - # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish - rpc_cast = rpc.cast - - def finish_cast(*args, **kwargs): - rpc_cast(*args, **kwargs) - greenthread.sleep(0.2) - - self.stubs.Set(rpc, 'cast', finish_cast) + # NOTE(comstud): Make 'cast' behave like a 'call' which will + # ensure that operations complete + self.stubs.Set(rpc, 'cast', rpc.call) def test_block_external_ips(self): """Make sure provider firewall rules are created.""" diff --git a/nova/tests/xenapi/stubs.py b/nova/tests/xenapi/stubs.py index 7c5634c5a..96c4d9cbe 100644 --- a/nova/tests/xenapi/stubs.py +++ b/nova/tests/xenapi/stubs.py @@ -132,16 +132,6 @@ def stubout_loopingcall_start(stubs): stubs.Set(utils.LoopingCall, 'start', fake_start) -def stubout_loopingcall_delay(stubs): - def fake_start(self, interval, now=True): - self._running = True - eventlet.sleep(1) - self.f(*self.args, **self.kw) - # This would fail before parallel xenapi calls were fixed - assert self._running == False - stubs.Set(utils.LoopingCall, 'start', fake_start) - - def _make_fake_vdi(): sr_ref = fake.get_all('SR')[0] vdi_ref = fake.create_vdi('', False, sr_ref, False) diff --git a/run_tests.py b/run_tests.py index fd836967e..17547b8e0 100644 --- a/run_tests.py +++ b/run_tests.py @@ -64,6 +64,7 @@ import time gettext.install('nova', unicode=1) +import eventlet from nose import config from nose import core from nose import result @@ -336,6 +337,7 @@ class NovaTestRunner(core.TextTestRunner): if __name__ == '__main__': + eventlet.monkey_patch() logging.setup() # If any argument looks like a test name but doesn't have "nova.tests" in # front of it, automatically add that so we don't have to type as much