Fix AttributeError during startup of ovs agent in DVR mode
Make sure the agent starts processing incoming requests only after the entire
initialization is complete. This is done by making explicit when the rpc loop
is supposed to start, i.e. right at the end of the init process.
This fix was necessary because the agent starts processing rpc messages even
though it has not completed the entire initialization of bridges and data
structures; this is usually okay, but in case of DVR, this leads to a
situation where during the the first run, the agent asks the server to be
assigned a MAC address; this in turn leads the server to fanout the generated
MAC to the running agents, the requesting one included; because of the
incomplete setup, the above mentioned error occurs. During subsequent
restarts, the problem no longer appears.
Closes-bug: #1395196
Conflicts:
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
(cherry picked from commit aa728c00ba
)
Change-Id: I792697b94fef39971693cf8aff715c270601cecb
This commit is contained in:
parent
c0671644e4
commit
3332d4f1be
@ -26,7 +26,7 @@ from neutron.openstack.common import timeutils
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def create_consumers(endpoints, prefix, topic_details):
|
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
|
||||||
"""Create agent RPC consumers.
|
"""Create agent RPC consumers.
|
||||||
|
|
||||||
:param endpoints: The list of endpoints to process the incoming messages.
|
:param endpoints: The list of endpoints to process the incoming messages.
|
||||||
@ -34,6 +34,7 @@ def create_consumers(endpoints, prefix, topic_details):
|
|||||||
:param topic_details: A list of topics. Each topic has a name, an
|
:param topic_details: A list of topics. Each topic has a name, an
|
||||||
operation, and an optional host param keying the
|
operation, and an optional host param keying the
|
||||||
subscription to topic.host for plugin calls.
|
subscription to topic.host for plugin calls.
|
||||||
|
:param start_listening: if True, it starts the processing loop
|
||||||
|
|
||||||
:returns: A common Connection.
|
:returns: A common Connection.
|
||||||
"""
|
"""
|
||||||
@ -50,6 +51,7 @@ def create_consumers(endpoints, prefix, topic_details):
|
|||||||
connection.create_consumer(node_topic_name,
|
connection.create_consumer(node_topic_name,
|
||||||
endpoints,
|
endpoints,
|
||||||
fanout=False)
|
fanout=False)
|
||||||
|
if start_listening:
|
||||||
connection.consume_in_threads()
|
connection.consume_in_threads()
|
||||||
return connection
|
return connection
|
||||||
|
|
||||||
|
@ -250,6 +250,9 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
self.iter_num = 0
|
self.iter_num = 0
|
||||||
self.run_daemon_loop = True
|
self.run_daemon_loop = True
|
||||||
|
|
||||||
|
# The initialization is complete; we can start receiving messages
|
||||||
|
self.connection.consume_in_threads()
|
||||||
|
|
||||||
def _report_state(self):
|
def _report_state(self):
|
||||||
# How many devices are likely used by a VM
|
# How many devices are likely used by a VM
|
||||||
self.agent_state.get('configurations')['devices'] = (
|
self.agent_state.get('configurations')['devices'] = (
|
||||||
@ -284,7 +287,8 @@ class OVSNeutronAgent(n_rpc.RpcCallback,
|
|||||||
topics.UPDATE, cfg.CONF.host])
|
topics.UPDATE, cfg.CONF.host])
|
||||||
self.connection = agent_rpc.create_consumers(self.endpoints,
|
self.connection = agent_rpc.create_consumers(self.endpoints,
|
||||||
self.topic,
|
self.topic,
|
||||||
consumers)
|
consumers,
|
||||||
|
start_listening=False)
|
||||||
report_interval = cfg.CONF.AGENT.report_interval
|
report_interval = cfg.CONF.AGENT.report_interval
|
||||||
if report_interval:
|
if report_interval:
|
||||||
heartbeat = loopingcall.FixedIntervalLoopingCall(
|
heartbeat = loopingcall.FixedIntervalLoopingCall(
|
||||||
|
@ -94,7 +94,16 @@ class AgentPluginReportState(base.BaseTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class AgentRPCMethods(base.BaseTestCase):
|
class AgentRPCMethods(base.BaseTestCase):
|
||||||
def test_create_consumers(self):
|
|
||||||
|
def _test_create_consumers(
|
||||||
|
self, endpoints, method, expected, topics, listen):
|
||||||
|
call_to_patch = 'neutron.common.rpc.create_connection'
|
||||||
|
with mock.patch(call_to_patch) as create_connection:
|
||||||
|
rpc.create_consumers(
|
||||||
|
endpoints, method, topics, start_listening=listen)
|
||||||
|
create_connection.assert_has_calls(expected)
|
||||||
|
|
||||||
|
def test_create_consumers_start_listening(self):
|
||||||
endpoints = [mock.Mock()]
|
endpoints = [mock.Mock()]
|
||||||
expected = [
|
expected = [
|
||||||
mock.call(new=True),
|
mock.call(new=True),
|
||||||
@ -102,11 +111,22 @@ class AgentRPCMethods(base.BaseTestCase):
|
|||||||
fanout=True),
|
fanout=True),
|
||||||
mock.call().consume_in_threads()
|
mock.call().consume_in_threads()
|
||||||
]
|
]
|
||||||
|
method = 'foo'
|
||||||
|
topics = [('topic', 'op')]
|
||||||
|
self._test_create_consumers(
|
||||||
|
endpoints, method, expected, topics, True)
|
||||||
|
|
||||||
call_to_patch = 'neutron.common.rpc.create_connection'
|
def test_create_consumers_do_not_listen(self):
|
||||||
with mock.patch(call_to_patch) as create_connection:
|
endpoints = [mock.Mock()]
|
||||||
rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
|
expected = [
|
||||||
create_connection.assert_has_calls(expected)
|
mock.call(new=True),
|
||||||
|
mock.call().create_consumer('foo-topic-op', endpoints,
|
||||||
|
fanout=True),
|
||||||
|
]
|
||||||
|
method = 'foo'
|
||||||
|
topics = [('topic', 'op')]
|
||||||
|
self._test_create_consumers(
|
||||||
|
endpoints, method, expected, topics, False)
|
||||||
|
|
||||||
def test_create_consumers_with_node_name(self):
|
def test_create_consumers_with_node_name(self):
|
||||||
endpoints = [mock.Mock()]
|
endpoints = [mock.Mock()]
|
||||||
|
Loading…
Reference in New Issue
Block a user