Merge "Delay HA router transition from "backup" to "master""
This commit is contained in:
commit
dd3baf12f3
@ -540,6 +540,15 @@ class L3NATAgent(ha.AgentMixin,
|
||||
return True
|
||||
|
||||
def _router_removed(self, ri, router_id):
|
||||
"""Delete the router and stop the auxiliary processes
|
||||
|
||||
This stops the auxiliary processes (keepalived, keepvalived-state-
|
||||
change, radvd, etc) and deletes the router ports and the namespace.
|
||||
The "router_info" cache is updated too at the beginning of the process,
|
||||
to avoid any other concurrent process to handle the router being
|
||||
deleted. If an exception is raised, the "router_info" cache is
|
||||
restored.
|
||||
"""
|
||||
if ri is None:
|
||||
LOG.warning("Info for router %s was not found. "
|
||||
"Performing router cleanup", router_id)
|
||||
@ -551,8 +560,12 @@ class L3NATAgent(ha.AgentMixin,
|
||||
self.context, states=(ri,),
|
||||
resource_id=router_id))
|
||||
|
||||
ri.delete()
|
||||
del self.router_info[router_id]
|
||||
try:
|
||||
ri.delete()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.router_info[router_id] = ri
|
||||
|
||||
registry.notify(resources.ROUTER, events.AFTER_DELETE, self, router=ri)
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import threading
|
||||
|
||||
import eventlet
|
||||
from neutron_lib import constants
|
||||
@ -83,6 +84,8 @@ class AgentMixin(object):
|
||||
self.state_change_notifier = batch_notifier.BatchNotifier(
|
||||
self._calculate_batch_duration(), self.notify_server)
|
||||
eventlet.spawn(self._start_keepalived_notifications_server)
|
||||
self._transition_states = {}
|
||||
self._transition_state_mutex = threading.Lock()
|
||||
|
||||
def _get_router_info(self, router_id):
|
||||
try:
|
||||
@ -112,7 +115,44 @@ class AgentMixin(object):
|
||||
# default 2 seconds.
|
||||
return self.conf.ha_vrrp_advert_int
|
||||
|
||||
def _update_transition_state(self, router_id, new_state=None):
|
||||
with self._transition_state_mutex:
|
||||
transition_state = self._transition_states.get(router_id)
|
||||
if new_state:
|
||||
self._transition_states[router_id] = new_state
|
||||
else:
|
||||
self._transition_states.pop(router_id, None)
|
||||
return transition_state
|
||||
|
||||
def enqueue_state_change(self, router_id, state):
|
||||
"""Inform the server about the new router state
|
||||
|
||||
This function will also update the metadata proxy, the radvd daemon,
|
||||
process the prefix delegation and inform to the L3 extensions. If the
|
||||
HA router changes to "master", this transition will be delayed for at
|
||||
least "ha_vrrp_advert_int" seconds. When the "master" router
|
||||
transitions to "backup", "keepalived" will set the rest of HA routers
|
||||
to "master" until it decides which one should be the only "master".
|
||||
The transition from "backup" to "master" and then to "backup" again,
|
||||
should not be registered in the Neutron server.
|
||||
|
||||
:param router_id: router ID
|
||||
:param state: ['master', 'backup']
|
||||
"""
|
||||
if not self._update_transition_state(router_id, state):
|
||||
eventlet.spawn_n(self._enqueue_state_change, router_id, state)
|
||||
eventlet.sleep(0)
|
||||
|
||||
def _enqueue_state_change(self, router_id, state):
|
||||
# NOTE(ralonsoh): move 'master' and 'backup' constants to n-lib
|
||||
if state == 'master':
|
||||
eventlet.sleep(self.conf.ha_vrrp_advert_int)
|
||||
if self._update_transition_state(router_id) != state:
|
||||
# If the current "transition state" is not the initial "state" sent
|
||||
# to update the router, that means the actual router state is the
|
||||
# same as the "transition state" (e.g.: backup-->master-->backup).
|
||||
return
|
||||
|
||||
ri = self._get_router_info(router_id)
|
||||
if ri is None:
|
||||
return
|
||||
@ -127,6 +167,7 @@ class AgentMixin(object):
|
||||
# configuration to keepalived-state-change in order to remove the
|
||||
# dependency that currently exists on l3-agent running for the IPv6
|
||||
# failover.
|
||||
ri.ha_state = state
|
||||
self._configure_ipv6_params(ri, state)
|
||||
if self.conf.enable_metadata_proxy:
|
||||
self._update_metadata_proxy(ri, router_id, state)
|
||||
|
@ -71,12 +71,21 @@ class HaRouter(router.RouterInfo):
|
||||
self.ha_port = None
|
||||
self.keepalived_manager = None
|
||||
self.state_change_callback = state_change_callback
|
||||
self._ha_state = None
|
||||
self._ha_state_path = None
|
||||
|
||||
def create_router_namespace_object(
|
||||
self, router_id, agent_conf, iface_driver, use_ipv6):
|
||||
return HaRouterNamespace(
|
||||
router_id, agent_conf, iface_driver, use_ipv6)
|
||||
|
||||
@property
|
||||
def ha_state_path(self):
|
||||
if not self._ha_state_path and self.keepalived_manager:
|
||||
self._ha_state_path = (self.keepalived_manager.
|
||||
get_full_config_file_path('state'))
|
||||
return self._ha_state_path
|
||||
|
||||
@property
|
||||
def ha_priority(self):
|
||||
return self.router.get('priority', keepalived.HA_DEFAULT_PRIORITY)
|
||||
@ -87,22 +96,20 @@ class HaRouter(router.RouterInfo):
|
||||
|
||||
@property
|
||||
def ha_state(self):
|
||||
state = None
|
||||
ha_state_path = self.keepalived_manager.get_full_config_file_path(
|
||||
'state')
|
||||
if self._ha_state:
|
||||
return self._ha_state
|
||||
try:
|
||||
with open(ha_state_path, 'r') as f:
|
||||
state = f.read()
|
||||
with open(self.ha_state_path, 'r') as f:
|
||||
self._ha_state = f.read()
|
||||
except (OSError, IOError):
|
||||
LOG.debug('Error while reading HA state for %s', self.router_id)
|
||||
return state or 'unknown'
|
||||
return self._ha_state or 'unknown'
|
||||
|
||||
@ha_state.setter
|
||||
def ha_state(self, new_state):
|
||||
ha_state_path = self.keepalived_manager.get_full_config_file_path(
|
||||
'state')
|
||||
self._ha_state = new_state
|
||||
try:
|
||||
with open(ha_state_path, 'w') as f:
|
||||
with open(self.ha_state_path, 'w') as f:
|
||||
f.write(new_state)
|
||||
except (OSError, IOError):
|
||||
LOG.error('Error while writing HA state for %s',
|
||||
|
@ -132,6 +132,12 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
|
||||
enable_pf_floating_ip),
|
||||
qos_policy_id=qos_policy_id)
|
||||
|
||||
def change_router_state(self, router_id, state):
|
||||
ri = self.agent.router_info.get(router_id)
|
||||
if not ri:
|
||||
self.fail('Router %s is not present in the L3 agent' % router_id)
|
||||
ri.ha_state = state
|
||||
|
||||
def _test_conntrack_disassociate_fip(self, ha):
|
||||
'''Test that conntrack immediately drops stateful connection
|
||||
that uses floating IP once it's disassociated.
|
||||
@ -493,7 +499,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
|
||||
# so there's no need to check that explicitly.
|
||||
self.assertFalse(self._namespace_exists(router.ns_name))
|
||||
common_utils.wait_until_true(
|
||||
lambda: not self._metadata_proxy_exists(self.agent.conf, router))
|
||||
lambda: not self._metadata_proxy_exists(self.agent.conf, router),
|
||||
timeout=10)
|
||||
|
||||
def _assert_snat_chains(self, router):
|
||||
self.assertFalse(router.iptables_manager.is_chain_empty(
|
||||
|
@ -37,7 +37,8 @@ class L3HATestCase(framework.L3AgentTestFramework):
|
||||
|
||||
def test_keepalived_state_change_notification(self):
|
||||
enqueue_mock = mock.patch.object(
|
||||
self.agent, 'enqueue_state_change').start()
|
||||
self.agent, 'enqueue_state_change',
|
||||
side_effect=self.change_router_state).start()
|
||||
router_info = self.generate_router_info(enable_ha=True)
|
||||
router = self.manage_router(self.agent, router_info)
|
||||
common_utils.wait_until_true(lambda: router.ha_state == 'master')
|
||||
|
@ -231,17 +231,52 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
# Make sure the exceptional code path has coverage
|
||||
agent.enqueue_state_change(non_existent_router, 'master')
|
||||
|
||||
def _enqueue_state_change_transitions(self, transitions, num_called):
|
||||
self.conf.set_override('ha_vrrp_advert_int', 1)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
agent._update_transition_state('router_id')
|
||||
with mock.patch.object(agent, '_get_router_info', return_value=None) \
|
||||
as mock_get_router_info:
|
||||
for state in transitions:
|
||||
agent.enqueue_state_change('router_id', state)
|
||||
eventlet.sleep(0.2)
|
||||
# NOTE(ralonsoh): the wait process should be done inside the mock
|
||||
# context, to allow the spawned thread to call the mocked function
|
||||
# before the context ends.
|
||||
eventlet.sleep(self.conf.ha_vrrp_advert_int + 2)
|
||||
|
||||
if num_called:
|
||||
mock_get_router_info.assert_has_calls(
|
||||
[mock.call('router_id') for _ in range(num_called)])
|
||||
else:
|
||||
mock_get_router_info.assert_not_called()
|
||||
|
||||
def test_enqueue_state_change_from_none_to_master(self):
|
||||
self._enqueue_state_change_transitions(['master'], 1)
|
||||
|
||||
def test_enqueue_state_change_from_none_to_backup(self):
|
||||
self._enqueue_state_change_transitions(['backup'], 1)
|
||||
|
||||
def test_enqueue_state_change_from_none_to_master_to_backup(self):
|
||||
self._enqueue_state_change_transitions(['master', 'backup'], 0)
|
||||
|
||||
def test_enqueue_state_change_from_none_to_backup_to_master(self):
|
||||
self._enqueue_state_change_transitions(['backup', 'master'], 2)
|
||||
|
||||
def test_enqueue_state_change_metadata_disable(self):
|
||||
self.conf.set_override('enable_metadata_proxy', False)
|
||||
self.conf.set_override('ha_vrrp_advert_int', 1)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = mock.Mock()
|
||||
router_info = mock.MagicMock()
|
||||
agent.router_info[router.id] = router_info
|
||||
agent._update_metadata_proxy = mock.Mock()
|
||||
agent.enqueue_state_change(router.id, 'master')
|
||||
eventlet.sleep(self.conf.ha_vrrp_advert_int + 2)
|
||||
self.assertFalse(agent._update_metadata_proxy.call_count)
|
||||
|
||||
def test_enqueue_state_change_l3_extension(self):
|
||||
self.conf.set_override('ha_vrrp_advert_int', 1)
|
||||
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
router = mock.Mock()
|
||||
router_info = mock.MagicMock()
|
||||
@ -249,6 +284,7 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
|
||||
agent.router_info[router.id] = router_info
|
||||
agent.l3_ext_manager.ha_state_change = mock.Mock()
|
||||
agent.enqueue_state_change(router.id, 'master')
|
||||
eventlet.sleep(self.conf.ha_vrrp_advert_int + 2)
|
||||
agent.l3_ext_manager.ha_state_change.assert_called_once_with(
|
||||
agent.context,
|
||||
{'router_id': router.id, 'state': 'master',
|
||||
|
Loading…
x
Reference in New Issue
Block a user