Send notification to controller about HA router state change

The L3 agent gets keepalived state change notifications via
a Unix domain socket. These events are now batched and
sent out as a single RPC to the server. In case the same
router got updated multiple times during the batch period,
only the latest state is sent.

Partially-Implements: blueprint report-ha-router-master
Change-Id: I36834ad3d9e8a49a702f01acc29c7c38f2d48833
This commit is contained in:
Assaf Muller 2014-09-28 14:26:42 +03:00
parent ff30a3a724
commit 79f64ab041
7 changed files with 139 additions and 8 deletions

View File

@ -73,6 +73,9 @@ class L3PluginApi(object):
- get_agent_gateway_port
Needed by the agent when operating in DVR/DVR_SNAT mode
1.3 - Get the list of activated services
1.4 - Added L3 HA update_router_state. This method was reworked
into update_ha_routers_states
1.5 - Added update_ha_routers_states
"""
@ -120,6 +123,12 @@ class L3PluginApi(object):
cctxt = self.client.prepare(version='1.3')
return cctxt.call(context, 'get_service_plugin_list')
def update_ha_routers_states(self, context, states):
    """Report the HA states of routers to the server.

    :param context: RPC request context.
    :param states: mapping of router ID to its translated HA state
        ('active' or 'standby').
    :returns: the server's response to the RPC call.
    """
    # update_ha_routers_states was introduced in RPC API version 1.5.
    prepared = self.client.prepare(version='1.5')
    return prepared.call(
        context, 'update_ha_routers_states',
        host=self.host, states=states)
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,
ha.AgentMixin,

View File

@ -24,6 +24,7 @@ from neutron.agent.linux import keepalived
from neutron.agent.linux import utils as agent_utils
from neutron.common import constants as l3_constants
from neutron.i18n import _LE, _LI
from neutron.notifiers import batch_notifier
LOG = logging.getLogger(__name__)
@ -91,6 +92,8 @@ class AgentMixin(object):
def __init__(self, host):
    """Initialize HA support for the L3 agent running on *host*."""
    self._init_ha_conf_path()
    super(AgentMixin, self).__init__(host)
    # Batch keepalived transitions so a burst of events caused by a
    # single failure is reported to the server in one RPC.
    batch_duration = self._calculate_batch_duration()
    self.state_change_notifier = batch_notifier.BatchNotifier(
        batch_duration, self.notify_server)
    eventlet.spawn(self._start_keepalived_notifications_server)
def _start_keepalived_notifications_server(self):
@ -98,11 +101,22 @@ class AgentMixin(object):
L3AgentKeepalivedStateChangeServer(self, self.conf))
state_change_server.run()
def _calculate_batch_duration(self):
# Slave becomes the master after not hearing from it 3 times
detection_time = self.conf.ha_vrrp_advert_int * 3
# Keepalived takes a couple of seconds to configure the VIPs
configuration_time = 2
# Give it enough slack to batch all events due to the same failure
return (detection_time + configuration_time) * 2
def enqueue_state_change(self, router_id, state):
    """Handle a keepalived transition for *router_id* to *state*.

    The metadata proxy is adjusted immediately, while the server is
    informed via the batching notifier.
    """
    LOG.info(_LI('Router %(router_id)s transitioned to %(state)s'),
             {'router_id': router_id, 'state': state})
    self._update_metadata_proxy(router_id, state)
    self.state_change_notifier.queue_event((router_id, state))
def _update_metadata_proxy(self, router_id, state):
try:
@ -122,6 +136,17 @@ class AgentMixin(object):
self.metadata_driver.destroy_monitored_metadata_proxy(
self.process_monitor, ri.router_id, ri.ns_name, self.conf)
def notify_server(self, batched_events):
    """Report batched HA state transitions to the server.

    :param batched_events: iterable of (router_id, keepalived state)
        tuples; a later entry for the same router overrides an
        earlier one, so only the latest state is reported.
    """
    # Translate keepalived's vocabulary into the server's
    # active/standby one; 'fault' is treated as standby.
    translation_map = {'master': 'active',
                       'backup': 'standby',
                       'fault': 'standby'}
    # Dict comprehension (py2.7+) instead of dict(generator) — same
    # semantics, clearer and marginally faster.
    translated_states = {router_id: translation_map[state]
                         for router_id, state in batched_events}
    LOG.debug('Updating server with HA routers states %s',
              translated_states)
    self.plugin_rpc.update_ha_routers_states(
        self.context, translated_states)
def _init_ha_conf_path(self):
    """Ensure the keepalived configuration directory exists."""
    # Wrapping the configured path in slashes and taking dirname
    # normalizes it (drops any trailing slash) before creation.
    conf_dir = os.path.dirname("/%s/" % self.conf.ha_confs_path)
    agent_utils.ensure_dir(conf_dir)

View File

@ -35,13 +35,14 @@ LOG = logging.getLogger(__name__)
class L3RpcCallback(object):
"""L3 agent RPC callback in plugin implementations."""
# 1.0 L3PluginApi BASE_RPC_API_VERSION
# 1.1 Support update_floatingip_statuses
# 1.0 L3PluginApi BASE_RPC_API_VERSION
# 1.1 Support update_floatingip_statuses
# 1.2 Added methods for DVR support
# 1.3 Added a method that returns the list of activated services
# 1.4 Added L3 HA update_router_state. This method was later removed,
# since it was unused. The RPC version was not changed.
target = oslo_messaging.Target(version='1.4')
# since it was unused. The RPC version was not changed
# 1.5 Added update_ha_routers_states
target = oslo_messaging.Target(version='1.5')
@property
def plugin(self):
@ -209,3 +210,15 @@ class L3RpcCallback(object):
'host %(host)s', {'agent_port': agent_port,
'host': host})
return agent_port
def update_ha_routers_states(self, context, **kwargs):
    """Update the HA states reported by an L3 agent.

    Expects 'host' (the reporting agent's host) and 'states' (a map
    of router ID to 'active'/'standby') in kwargs, and persists them
    through the L3 plugin.
    """
    host = kwargs.get('host')
    states = kwargs.get('states')
    LOG.debug('Updating HA routers states on host %s: %s', host, states)
    self.l3plugin.update_routers_states(context, states, host)

View File

@ -462,3 +462,20 @@ class L3_HA_NAT_db_mixin(l3_dvr_db.L3_NAT_with_dvr_db_mixin):
router_ids,
active)
return self._process_sync_ha_data(context, sync_data, host)
@classmethod
def _set_router_states(cls, context, bindings, states):
    """Persist the reported HA state on each router port binding.

    Each binding is updated in its own subtransaction; bindings whose
    router was deleted concurrently are silently skipped.
    """
    for port_binding in bindings:
        try:
            with context.session.begin(subtransactions=True):
                port_binding.state = states[port_binding.router_id]
        except (orm.exc.StaleDataError, orm.exc.ObjectDeletedError):
            # Take concurrently deleted routers into account.
            pass
def update_routers_states(self, context, states, host):
    """Persist the HA states reported by an L3 agent.

    :param states: dict mapping router ID to 'active'/'standby'.
    :param host: the agent host that reported the states.
    """
    # Restrict the update to bindings of the reporting host.
    host_bindings = self.get_ha_router_port_bindings(
        context, router_ids=states.keys(), host=host)
    self._set_router_states(context, host_bindings, states)

View File

@ -271,6 +271,12 @@ class L3AgentTestFramework(base.BaseOVSLinuxTestCase):
external_port['mac_address'],
namespace=router.ns_name) for fip in floating_ips)
def fail_ha_router(self, router):
    """Force an HA failover by bringing the router's HA device down."""
    ha_port_id = router.router[l3_constants.HA_INTERFACE_KEY]['id']
    ha_device = ip_lib.IPDevice(
        router.get_ha_device_name(ha_port_id), router.ns_name)
    ha_device.link.set_down()
class L3AgentTestCase(L3AgentTestFramework):
def test_observer_notifications_legacy_router(self):
@ -286,10 +292,7 @@ class L3AgentTestCase(L3AgentTestFramework):
router = self.manage_router(self.agent, router_info)
utils.wait_until_true(lambda: router.ha_state == 'master')
device_name = router.get_ha_device_name(
router.router[l3_constants.HA_INTERFACE_KEY]['id'])
ha_device = ip_lib.IPDevice(device_name, router.ns_name)
ha_device.link.set_down()
self.fail_ha_router(router)
utils.wait_until_true(lambda: router.ha_state == 'backup')
utils.wait_until_true(lambda: enqueue_mock.call_count == 3)
@ -298,6 +301,31 @@ class L3AgentTestCase(L3AgentTestFramework):
self.assertEqual((router.router_id, 'master'), calls[1])
self.assertEqual((router.router_id, 'backup'), calls[2])
def _expected_rpc_report(self, expected):
calls = (args[0][1] for args in
self.agent.plugin_rpc.update_ha_routers_states.call_args_list)
# Get the last state reported for each router
actual_router_states = {}
for call in calls:
for router_id, state in call.iteritems():
actual_router_states[router_id] = state
return actual_router_states == expected
def test_keepalived_state_change_bulk_rpc(self):
    """Transitions of several routers are reported as one batched RPC."""
    first_info = self.generate_router_info(enable_ha=True)
    router1 = self.manage_router(self.agent, first_info)
    self.fail_ha_router(router1)
    second_info = self.generate_router_info(enable_ha=True)
    router2 = self.manage_router(self.agent, second_info)
    utils.wait_until_true(lambda: router1.ha_state == 'backup')
    utils.wait_until_true(lambda: router2.ha_state == 'master')
    # The batched report must carry the final state of both routers.
    utils.wait_until_true(
        lambda: self._expected_rpc_report(
            {router1.router_id: 'standby', router2.router_id: 'active'}))
def _test_observer_notifications(self, enable_ha):
"""Test create, update, delete of router and notifications."""
with mock.patch.object(

View File

@ -22,6 +22,7 @@ from oslo_config import cfg
from neutron.agent.common import config as agent_config
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3 import config as l3_config
from neutron.agent.l3 import ha as l3_ha_agent
from neutron.agent.metadata import driver as metadata_driver
from neutron.openstack.common import uuidutils
from neutron.tests import base
@ -74,6 +75,7 @@ class TestMetadataDriverProcess(base.BaseTestCase):
'._init_ha_conf_path').start()
cfg.CONF.register_opts(l3_config.OPTS)
cfg.CONF.register_opts(l3_ha_agent.OPTS)
cfg.CONF.register_opts(metadata_driver.MetadataDriver.OPTS)
def _test_spawn_metadata_proxy(self, expected_user, expected_group,

View File

@ -387,6 +387,43 @@ class L3HATestCase(L3HATestFramework):
routers_after = self.plugin.get_routers(self.admin_ctx)
self.assertEqual(routers_before, routers_after)
def test_update_routers_states(self):
    """States pushed via update_routers_states show up in sync data."""
    host = self.agent1['host']
    router1 = self._create_router()
    self._bind_router(router1['id'])
    router2 = self._create_router()
    self._bind_router(router2['id'])
    synced = self.plugin.get_ha_sync_data_for_host(self.admin_ctx, host)
    # Freshly bound routers all start out standby.
    for router in synced:
        self.assertEqual('standby', router[constants.HA_ROUTER_STATE_KEY])
    states = {router1['id']: 'active',
              router2['id']: 'standby'}
    self.plugin.update_routers_states(self.admin_ctx, states, host)
    synced = self.plugin.get_ha_sync_data_for_host(self.admin_ctx, host)
    for router in synced:
        self.assertEqual(states[router['id']],
                         router[constants.HA_ROUTER_STATE_KEY])
def test_set_router_states_handles_concurrently_deleted_router(self):
    """_set_router_states skips bindings whose router was deleted."""
    router1 = self._create_router()
    self._bind_router(router1['id'])
    router2 = self._create_router()
    self._bind_router(router2['id'])
    bindings = self.plugin.get_ha_router_port_bindings(
        self.admin_ctx, [router1['id'], router2['id']])
    # Simulate a concurrent delete of the first router.
    self.plugin.delete_router(self.admin_ctx, router1['id'])
    new_states = {router1['id']: 'active',
                  router2['id']: 'active'}
    self.plugin._set_router_states(self.admin_ctx, bindings, new_states)
    synced = self.plugin.get_ha_sync_data_for_host(self.admin_ctx,
                                                   self.agent1['host'])
    # The surviving router still gets its state updated.
    self.assertEqual('active', synced[0][constants.HA_ROUTER_STATE_KEY])
def test_exclude_dvr_agents_for_ha_candidates(self):
"""Test dvr agents are not counted in the ha candidates.