From 3332d4f1be47d5a679312d31377489ba0115e5a2 Mon Sep 17 00:00:00 2001 From: armando-migliaccio Date: Fri, 21 Nov 2014 14:41:05 -0800 Subject: [PATCH] Fix AttributeError during startup of ovs agent in DVR mode Make sure the agent starts processing incoming requests only after the entire initialization is complete. This is done by making explicit when the rpc loop is supposed to start, i.e. right at the end of the init process. This fix was necessary because the agent starts processing rpc messages even though it has not completed the entire initialization of bridges and data structures; this is usually okay, but in case of DVR, this leads to a situation where during the the first run, the agent asks the server to be assigned a MAC address; this in turn leads the server to fanout the generated MAC to the running agents, the requesting one included; because of the incomplete setup, the above mentioned error occurs. During subsequent restarts, the problem no longer appears. Closes-bug: #1395196 Conflicts: neutron/plugins/openvswitch/agent/ovs_neutron_agent.py (cherry picked from commit aa728c00bab3bb6b3e00866e44ca054b1e848bb6) Change-Id: I792697b94fef39971693cf8aff715c270601cecb --- neutron/agent/rpc.py | 6 ++-- .../openvswitch/agent/ovs_neutron_agent.py | 6 +++- neutron/tests/unit/test_agent_rpc.py | 30 +++++++++++++++---- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 7777a6673dd..25589ed38da 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -26,7 +26,7 @@ from neutron.openstack.common import timeutils LOG = logging.getLogger(__name__) -def create_consumers(endpoints, prefix, topic_details): +def create_consumers(endpoints, prefix, topic_details, start_listening=True): """Create agent RPC consumers. :param endpoints: The list of endpoints to process the incoming messages. @@ -34,6 +34,7 @@ def create_consumers(endpoints, prefix, topic_details): :param topic_details: A list of topics. Each topic has a name, an operation, and an optional host param keying the subscription to topic.host for plugin calls. + :param start_listening: if True, it starts the processing loop :returns: A common Connection. """ @@ -50,7 +51,8 @@ def create_consumers(endpoints, prefix, topic_details): connection.create_consumer(node_topic_name, endpoints, fanout=False) - connection.consume_in_threads() + if start_listening: + connection.consume_in_threads() return connection diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 8901cdeb9b2..68325a3ca5b 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -250,6 +250,9 @@ class OVSNeutronAgent(n_rpc.RpcCallback, self.iter_num = 0 self.run_daemon_loop = True + # The initialization is complete; we can start receiving messages + self.connection.consume_in_threads() + def _report_state(self): # How many devices are likely used by a VM self.agent_state.get('configurations')['devices'] = ( @@ -284,7 +287,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback, topics.UPDATE, cfg.CONF.host]) self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, - consumers) + consumers, + start_listening=False) report_interval = cfg.CONF.AGENT.report_interval if report_interval: heartbeat = loopingcall.FixedIntervalLoopingCall( diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index c0f7e0ddd70..1ca27ae9c57 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -94,7 +94,16 @@ class AgentPluginReportState(base.BaseTestCase): class AgentRPCMethods(base.BaseTestCase): - def test_create_consumers(self): + + def _test_create_consumers( + self, endpoints, method, expected, topics, listen): + call_to_patch = 'neutron.common.rpc.create_connection' + with mock.patch(call_to_patch) as create_connection: + rpc.create_consumers( + endpoints, method, topics, start_listening=listen) + create_connection.assert_has_calls(expected) + + def test_create_consumers_start_listening(self): endpoints = [mock.Mock()] expected = [ mock.call(new=True), @@ -102,11 +111,22 @@ class AgentRPCMethods(base.BaseTestCase): fanout=True), mock.call().consume_in_threads() ] + method = 'foo' + topics = [('topic', 'op')] + self._test_create_consumers( + endpoints, method, expected, topics, True) - call_to_patch = 'neutron.common.rpc.create_connection' - with mock.patch(call_to_patch) as create_connection: - rpc.create_consumers(endpoints, 'foo', [('topic', 'op')]) - create_connection.assert_has_calls(expected) + def test_create_consumers_do_not_listen(self): + endpoints = [mock.Mock()] + expected = [ + mock.call(new=True), + mock.call().create_consumer('foo-topic-op', endpoints, + fanout=True), + ] + method = 'foo' + topics = [('topic', 'op')] + self._test_create_consumers( + endpoints, method, expected, topics, False) def test_create_consumers_with_node_name(self): endpoints = [mock.Mock()]