Ensure that the report state is not a blocking call
Fixes bug 1191768
For the dhcp and l3 agents the first state report will be done
via a call. If this succeeds then subsequent calls will be done via
the cast method.
Change-Id: I82a1d92fc84983b7bb46758db0aee3e3eca1d3be
(cherry picked from commit 0407b59c25
)
This commit is contained in:
parent
0d59115cfa
commit
06f679df5d
@ -689,6 +689,7 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
|||||||
'start_flag': True,
|
'start_flag': True,
|
||||||
'agent_type': constants.AGENT_TYPE_DHCP}
|
'agent_type': constants.AGENT_TYPE_DHCP}
|
||||||
report_interval = cfg.CONF.AGENT.report_interval
|
report_interval = cfg.CONF.AGENT.report_interval
|
||||||
|
self.use_call = True
|
||||||
if report_interval:
|
if report_interval:
|
||||||
self.heartbeat = loopingcall.LoopingCall(self._report_state)
|
self.heartbeat = loopingcall.LoopingCall(self._report_state)
|
||||||
self.heartbeat.start(interval=report_interval)
|
self.heartbeat.start(interval=report_interval)
|
||||||
@ -698,8 +699,8 @@ class DhcpAgentWithStateReport(DhcpAgent):
|
|||||||
self.agent_state.get('configurations').update(
|
self.agent_state.get('configurations').update(
|
||||||
self.cache.get_state())
|
self.cache.get_state())
|
||||||
ctx = context.get_admin_context_without_session()
|
ctx = context.get_admin_context_without_session()
|
||||||
self.state_rpc.report_state(ctx,
|
self.state_rpc.report_state(ctx, self.agent_state, self.use_call)
|
||||||
self.agent_state)
|
self.use_call = False
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# This means the server does not support report_state
|
# This means the server does not support report_state
|
||||||
LOG.warn(_("Quantum server does not support state report."
|
LOG.warn(_("Quantum server does not support state report."
|
||||||
|
@ -695,6 +695,7 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
|||||||
'start_flag': True,
|
'start_flag': True,
|
||||||
'agent_type': l3_constants.AGENT_TYPE_L3}
|
'agent_type': l3_constants.AGENT_TYPE_L3}
|
||||||
report_interval = cfg.CONF.AGENT.report_interval
|
report_interval = cfg.CONF.AGENT.report_interval
|
||||||
|
self.use_call = True
|
||||||
if report_interval:
|
if report_interval:
|
||||||
self.heartbeat = loopingcall.LoopingCall(self._report_state)
|
self.heartbeat = loopingcall.LoopingCall(self._report_state)
|
||||||
self.heartbeat.start(interval=report_interval)
|
self.heartbeat.start(interval=report_interval)
|
||||||
@ -719,9 +720,10 @@ class L3NATAgentWithStateReport(L3NATAgent):
|
|||||||
configurations['interfaces'] = num_interfaces
|
configurations['interfaces'] = num_interfaces
|
||||||
configurations['floating_ips'] = num_floating_ips
|
configurations['floating_ips'] = num_floating_ips
|
||||||
try:
|
try:
|
||||||
self.state_rpc.report_state(self.context,
|
self.state_rpc.report_state(self.context, self.agent_state,
|
||||||
self.agent_state)
|
self.use_call)
|
||||||
self.agent_state.pop('start_flag', None)
|
self.agent_state.pop('start_flag', None)
|
||||||
|
self.use_call = False
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# This means the server does not support report_state
|
# This means the server does not support report_state
|
||||||
LOG.warn(_("Quantum server does not support state report."
|
LOG.warn(_("Quantum server does not support state report."
|
||||||
|
@ -57,13 +57,15 @@ class PluginReportStateAPI(proxy.RpcProxy):
|
|||||||
super(PluginReportStateAPI, self).__init__(
|
super(PluginReportStateAPI, self).__init__(
|
||||||
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
topic=topic, default_version=self.BASE_RPC_API_VERSION)
|
||||||
|
|
||||||
def report_state(self, context, agent_state):
|
def report_state(self, context, agent_state, use_call=False):
|
||||||
return self.call(context,
|
msg = self.make_msg('report_state',
|
||||||
self.make_msg('report_state',
|
|
||||||
agent_state={'agent_state':
|
agent_state={'agent_state':
|
||||||
agent_state},
|
agent_state},
|
||||||
time=timeutils.strtime()),
|
time=timeutils.strtime())
|
||||||
topic=self.topic)
|
if use_call:
|
||||||
|
return self.call(context, msg, topic=self.topic)
|
||||||
|
else:
|
||||||
|
return self.cast(context, msg, topic=self.topic)
|
||||||
|
|
||||||
|
|
||||||
class PluginApi(proxy.RpcProxy):
|
class PluginApi(proxy.RpcProxy):
|
||||||
|
@ -48,13 +48,14 @@ class AgentRPCPluginApi(base.BaseTestCase):
|
|||||||
|
|
||||||
|
|
||||||
class AgentPluginReportState(base.BaseTestCase):
|
class AgentPluginReportState(base.BaseTestCase):
|
||||||
def test_plugin_report_state(self):
|
def test_plugin_report_state_use_call(self):
|
||||||
topic = 'test'
|
topic = 'test'
|
||||||
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||||
expected_agent_state = {'agent': 'test'}
|
expected_agent_state = {'agent': 'test'}
|
||||||
with mock.patch.object(reportStateAPI, 'call') as call:
|
with mock.patch.object(reportStateAPI, 'call') as call:
|
||||||
ctxt = context.RequestContext('fake_user', 'fake_project')
|
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||||
reportStateAPI.report_state(ctxt, expected_agent_state)
|
reportStateAPI.report_state(ctxt, expected_agent_state,
|
||||||
|
use_call=True)
|
||||||
self.assertEqual(call.call_args[0][0], ctxt)
|
self.assertEqual(call.call_args[0][0], ctxt)
|
||||||
self.assertEqual(call.call_args[0][1]['method'],
|
self.assertEqual(call.call_args[0][1]['method'],
|
||||||
'report_state')
|
'report_state')
|
||||||
@ -64,6 +65,22 @@ class AgentPluginReportState(base.BaseTestCase):
|
|||||||
str)
|
str)
|
||||||
self.assertEqual(call.call_args[1]['topic'], topic)
|
self.assertEqual(call.call_args[1]['topic'], topic)
|
||||||
|
|
||||||
|
def test_plugin_report_state_cast(self):
|
||||||
|
topic = 'test'
|
||||||
|
reportStateAPI = rpc.PluginReportStateAPI(topic)
|
||||||
|
expected_agent_state = {'agent': 'test'}
|
||||||
|
with mock.patch.object(reportStateAPI, 'cast') as cast:
|
||||||
|
ctxt = context.RequestContext('fake_user', 'fake_project')
|
||||||
|
reportStateAPI.report_state(ctxt, expected_agent_state)
|
||||||
|
self.assertEqual(cast.call_args[0][0], ctxt)
|
||||||
|
self.assertEqual(cast.call_args[0][1]['method'],
|
||||||
|
'report_state')
|
||||||
|
self.assertEqual(cast.call_args[0][1]['args']['agent_state'],
|
||||||
|
{'agent_state': expected_agent_state})
|
||||||
|
self.assertIsInstance(cast.call_args[0][1]['args']['time'],
|
||||||
|
str)
|
||||||
|
self.assertEqual(cast.call_args[1]['topic'], topic)
|
||||||
|
|
||||||
|
|
||||||
class AgentRPCMethods(base.BaseTestCase):
|
class AgentRPCMethods(base.BaseTestCase):
|
||||||
def test_create_consumers(self):
|
def test_create_consumers(self):
|
||||||
|
@ -163,7 +163,8 @@ class TestDhcpAgent(base.BaseTestCase):
|
|||||||
agent_mgr)
|
agent_mgr)
|
||||||
state_rpc.assert_has_calls(
|
state_rpc.assert_has_calls(
|
||||||
[mock.call(mock.ANY),
|
[mock.call(mock.ANY),
|
||||||
mock.call().report_state(mock.ANY, mock.ANY)])
|
mock.call().report_state(mock.ANY, mock.ANY,
|
||||||
|
mock.ANY)])
|
||||||
mock_lease_relay.assert_has_calls(
|
mock_lease_relay.assert_has_calls(
|
||||||
[mock.call(mock.ANY),
|
[mock.call(mock.ANY),
|
||||||
mock.call().start()])
|
mock.call().start()])
|
||||||
|
Loading…
Reference in New Issue
Block a user