Merge "L3 agent prefers RPC messages over full sync"

This commit is contained in:
Jenkins 2014-07-15 21:03:20 +00:00 committed by Gerrit Code Review
commit 6321ab820b
2 changed files with 324 additions and 62 deletions

View File

@ -15,11 +15,13 @@
import sys
import datetime
import eventlet
eventlet.monkey_patch()
import netaddr
from oslo.config import cfg
import Queue
from neutron.agent.common import config
from neutron.agent.linux import external_process
@ -37,12 +39,12 @@ from neutron import context
from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import lockutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.openstack.common import processutils
from neutron.openstack.common import service
from neutron.openstack.common import timeutils
from neutron import service as neutron_service
from neutron.services.firewall.agents.l3reference import firewall_l3_agent
@ -52,6 +54,10 @@ INTERNAL_DEV_PREFIX = 'qr-'
EXTERNAL_DEV_PREFIX = 'qg-'
RPC_LOOP_INTERVAL = 1
FLOATING_IP_CIDR_SUFFIX = '/32'
# Lower value is higher priority
PRIORITY_RPC = 0
PRIORITY_SYNC_ROUTERS_TASK = 1
DELETE_ROUTER = 1
class L3PluginApi(n_rpc.RpcProxy):
@ -146,6 +152,144 @@ class RouterInfo(object):
self._snat_action = None
class RouterUpdate(object):
"""Encapsulates a router update
An instance of this object carries the information necessary to prioritize
and process a request to update a router.
"""
def __init__(self, router_id, priority,
action=None, router=None, timestamp=None):
self.priority = priority
self.timestamp = timestamp
if not timestamp:
self.timestamp = timeutils.utcnow()
self.id = router_id
self.action = action
self.router = router
def __lt__(self, other):
"""Implements priority among updates
Lower numerical priority always gets precedence. When comparing two
updates of the same priority then the one with the earlier timestamp
gets procedence. In the unlikely event that the timestamps are also
equal it falls back to a simple comparison of ids meaning the
precedence is essentially random.
"""
if self.priority != other.priority:
return self.priority < other.priority
if self.timestamp != other.timestamp:
return self.timestamp < other.timestamp
return self.id < other.id
class ExclusiveRouterProcessor(object):
"""Manager for access to a router for processing
This class controls access to a router in a non-blocking way. The first
instance to be created for a given router_id is granted exclusive access to
the router.
Other instances may be created for the same router_id while the first
instance has exclusive access. If that happens then it doesn't block and
wait for access. Instead, it signals to the master instance that an update
came in with the timestamp.
This way, a thread will not block to wait for access to a router. Instead
it effectively signals to the thread that is working on the router that
something has changed since it started working on it. That thread will
simply finish its current iteration and then repeat.
This class keeps track of the last time that a router data was fetched and
processed. The timestamp that it keeps must be before when the data used
to process the router last was fetched from the database. But, as close as
possible. The timestamp should not be recorded, however, until the router
has been processed using the fetch data.
"""
_masters = {}
_router_timestamps = {}
def __init__(self, router_id):
self._router_id = router_id
if router_id not in self._masters:
self._masters[router_id] = self
self._queue = []
self._master = self._masters[router_id]
def _i_am_master(self):
return self == self._master
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if self._i_am_master():
del self._masters[self._router_id]
def _get_router_data_timestamp(self):
return self._router_timestamps.get(self._router_id,
datetime.datetime.min)
def fetched_and_processed(self, timestamp):
"""Records the data timestamp after it is used to update the router"""
new_timestamp = max(timestamp, self._get_router_data_timestamp())
self._router_timestamps[self._router_id] = new_timestamp
def queue_update(self, update):
"""Queues an update from a worker
This is the queue used to keep new updates that come in while a router
is being processed. These updates have already bubbled to the front of
the RouterProcessingQueue.
"""
self._master._queue.append(update)
def updates(self):
"""Processes the router until updates stop coming
Only the master instance will process the router. However, updates may
come in from other workers while it is in progress. This method loops
until they stop coming.
"""
if self._i_am_master():
while self._queue:
# Remove the update from the queue even if it is old.
update = self._queue.pop(0)
# Process the update only if it is fresh.
if self._get_router_data_timestamp() < update.timestamp:
yield update
class RouterProcessingQueue(object):
"""Manager of the queue of routers to process."""
def __init__(self):
self._queue = Queue.PriorityQueue()
def add(self, update):
self._queue.put(update)
def each_update_to_next_router(self):
"""Grabs the next router from the queue and processes
This method uses a for loop to process the router repeatedly until
updates stop bubbling to the front of the queue.
"""
next_update = self._queue.get()
with ExclusiveRouterProcessor(next_update.id) as rp:
# Queue the update whether this worker is the master or not.
rp.queue_update(next_update)
# Here, if the current worker is not the master, the call to
# rp.updates() will not yield and so this will essentially be a
# noop.
for update in rp.updates():
yield (rp, update)
class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
"""Manager for L3NatAgent
@ -221,9 +365,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self._clean_stale_namespaces = self.conf.use_namespaces
self.rpc_loop = loopingcall.FixedIntervalLoopingCall(
self._rpc_loop)
self.rpc_loop.start(interval=RPC_LOOP_INTERVAL)
self._queue = RouterProcessingQueue()
super(L3NATAgent, self).__init__(conf=self.conf)
self.target_ex_net_id = None
@ -244,10 +386,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
LOG.error(msg)
raise SystemExit(1)
def _cleanup_namespaces(self, routers):
"""Destroy stale router namespaces on host when L3 agent restarts
This routine is called when self._clean_stale_namespaces is True.
def _list_namespaces(self):
"""Get a set of all router namespaces on host
The argument routers is the list of routers that are recorded in
the database as being hosted on this node.
@ -256,15 +396,24 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
root_ip = ip_lib.IPWrapper(self.root_helper)
host_namespaces = root_ip.get_namespaces(self.root_helper)
router_namespaces = set(ns for ns in host_namespaces
if ns.startswith(NS_PREFIX))
ns_to_ignore = set(NS_PREFIX + r['id'] for r in routers)
ns_to_destroy = router_namespaces - ns_to_ignore
return set(ns for ns in host_namespaces
if ns.startswith(NS_PREFIX))
except RuntimeError:
LOG.exception(_('RuntimeError in obtaining router list '
'for namespace cleanup.'))
else:
self._destroy_stale_router_namespaces(ns_to_destroy)
return set()
def _cleanup_namespaces(self, router_namespaces, router_ids):
"""Destroy stale router namespaces on host when L3 agent restarts
This routine is called when self._clean_stale_namespaces is True.
The argument router_namespaces is the list of all routers namespaces
The argument router_ids is the list of ids for known routers.
"""
ns_to_ignore = set(NS_PREFIX + id for id in router_ids)
ns_to_destroy = router_namespaces - ns_to_ignore
self._destroy_stale_router_namespaces(ns_to_destroy)
def _destroy_stale_router_namespaces(self, router_namespaces):
"""Destroys the stale router namespaces
@ -735,7 +884,8 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
def router_deleted(self, context, router_id):
"""Deal with router deletion RPC message."""
LOG.debug(_('Got router deleted notification for %s'), router_id)
self.removed_routers.add(router_id)
update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
self._queue.add(update)
def routers_updated(self, context, routers):
"""Deal with routers modification and creation RPC message."""
@ -744,11 +894,15 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
# This is needed for backward compatibility
if isinstance(routers[0], dict):
routers = [router['id'] for router in routers]
self.updated_routers.update(routers)
for id in routers:
update = RouterUpdate(id, PRIORITY_RPC)
self._queue.add(update)
def router_removed_from_agent(self, context, payload):
LOG.debug(_('Got router removed from agent :%r'), payload)
self.removed_routers.add(payload['router_id'])
router_id = payload['router_id']
update = RouterUpdate(router_id, PRIORITY_RPC, action=DELETE_ROUTER)
self._queue.add(update)
def router_added_to_agent(self, context, payload):
LOG.debug(_('Got router added to agent :%r'), payload)
@ -800,36 +954,37 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
pool.spawn_n(self._router_removed, router_id)
pool.waitall()
@lockutils.synchronized('l3-agent', 'neutron-')
def _rpc_loop(self):
# _rpc_loop and _sync_routers_task will not be
# executed in the same time because of lock.
# so we can clear the value of updated_routers
# and removed_routers, but they can be updated by
# updated_routers and removed_routers rpc call
try:
LOG.debug(_("Starting RPC loop for %d updated routers"),
len(self.updated_routers))
if self.updated_routers:
# We're capturing and clearing the list, and will
# process the "captured" updates in this loop,
# and any updates that happen due to a context switch
# will be picked up on the next pass.
updated_routers = set(self.updated_routers)
self.updated_routers.clear()
router_ids = list(updated_routers)
routers = self.plugin_rpc.get_routers(
self.context, router_ids)
# routers with admin_state_up=false will not be in the fetched
fetched = set([r['id'] for r in routers])
self.removed_routers.update(updated_routers - fetched)
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():
LOG.debug("Starting router update for %s", update.id)
router = update.router
if update.action != DELETE_ROUTER and not router:
try:
update.timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(self.context,
[update.id])
except Exception:
msg = _("Failed to fetch router information for '%s'")
LOG.exception(msg, update.id)
self.fullsync = True
continue
self._process_routers(routers)
self._process_router_delete()
LOG.debug(_("RPC loop successfully completed"))
except Exception:
LOG.exception(_("Failed synchronizing routers"))
self.fullsync = True
if routers:
router = routers[0]
if not router:
self._router_removed(update.id)
continue
self._process_routers([router])
LOG.debug("Finished a router update for %s", update.id)
rp.fetched_and_processed(update.timestamp)
def _process_routers_loop(self):
LOG.debug("Starting _process_routers_loop")
pool = eventlet.GreenPool(size=8)
while True:
pool.spawn_n(self._process_router_update)
def _process_router_delete(self):
current_removed_routers = list(self.removed_routers)
@ -842,7 +997,6 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
return [self.conf.router_id]
@periodic_task.periodic_task
@lockutils.synchronized('l3-agent', 'neutron-')
def periodic_sync_routers_task(self, context):
self._sync_routers_task(context)
@ -853,15 +1007,29 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self.fullsync)
if not self.fullsync:
return
# Capture a picture of namespaces *before* fetching the full list from
# the database. This is important to correctly identify stale ones.
namespaces = set()
if self._clean_stale_namespaces:
namespaces = self._list_namespaces()
prev_router_ids = set(self.router_info)
try:
router_ids = self._router_ids()
self.updated_routers.clear()
self.removed_routers.clear()
timestamp = timeutils.utcnow()
routers = self.plugin_rpc.get_routers(
context, router_ids)
LOG.debug(_('Processing :%r'), routers)
self._process_routers(routers, all_routers=True)
for r in routers:
update = RouterUpdate(r['id'],
PRIORITY_SYNC_ROUTERS_TASK,
router=r,
timestamp=timestamp)
self._queue.add(update)
self.fullsync = False
LOG.debug(_("_sync_routers_task successfully completed"))
except n_rpc.RPCException:
@ -872,10 +1040,25 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
self.fullsync = True
else:
# Resync is not necessary for the cleanup of stale namespaces
curr_router_ids = set([r['id'] for r in routers])
# Two kinds of stale routers: Routers for which info is cached in
# self.router_info and the others. First, handle the former.
for router_id in prev_router_ids - curr_router_ids:
update = RouterUpdate(router_id,
PRIORITY_SYNC_ROUTERS_TASK,
timestamp=timestamp,
action=DELETE_ROUTER)
self._queue.add(update)
# Next, one effort to clean out namespaces for which we don't have
# a record. (i.e. _clean_stale_namespaces=False after one pass)
if self._clean_stale_namespaces:
self._cleanup_namespaces(routers)
ids_to_keep = curr_router_ids | prev_router_ids
self._cleanup_namespaces(namespaces, ids_to_keep)
def after_start(self):
eventlet.spawn_n(self._process_routers_loop)
LOG.info(_("L3 agent started"))
def _update_routing_table(self, ri, operation, route):

View File

@ -15,6 +15,7 @@
import contextlib
import copy
import datetime
import mock
import netaddr
@ -35,6 +36,85 @@ from neutron.tests import base
_uuid = uuidutils.generate_uuid
HOSTNAME = 'myhost'
FAKE_ID = _uuid()
FAKE_ID_2 = _uuid()
class TestExclusiveRouterProcessor(base.BaseTestCase):
def setUp(self):
super(TestExclusiveRouterProcessor, self).setUp()
def test_i_am_master(self):
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
self.assertTrue(master._i_am_master())
self.assertFalse(not_master._i_am_master())
self.assertTrue(master_2._i_am_master())
self.assertFalse(not_master_2._i_am_master())
master.__exit__(None, None, None)
master_2.__exit__(None, None, None)
def test_master(self):
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
not_master_2 = l3_agent.ExclusiveRouterProcessor(FAKE_ID_2)
self.assertEqual(master._master, master)
self.assertEqual(not_master._master, master)
self.assertEqual(master_2._master, master_2)
self.assertEqual(not_master_2._master, master_2)
master.__exit__(None, None, None)
master_2.__exit__(None, None, None)
def test__enter__(self):
self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
master.__enter__()
self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
master.__exit__(None, None, None)
def test__exit__(self):
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
master.__enter__()
self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
not_master.__enter__()
not_master.__exit__(None, None, None)
self.assertTrue(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
master.__exit__(None, None, None)
self.assertFalse(FAKE_ID in l3_agent.ExclusiveRouterProcessor._masters)
def test_data_fetched_since(self):
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
self.assertEqual(master._get_router_data_timestamp(),
datetime.datetime.min)
ts1 = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
ts2 = datetime.datetime.utcnow()
master.fetched_and_processed(ts2)
self.assertEqual(master._get_router_data_timestamp(), ts2)
master.fetched_and_processed(ts1)
self.assertEqual(master._get_router_data_timestamp(), ts2)
master.__exit__(None, None, None)
def test_updates(self):
master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
not_master = l3_agent.ExclusiveRouterProcessor(FAKE_ID)
master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
not_master.queue_update(l3_agent.RouterUpdate(FAKE_ID, 0))
for update in not_master.updates():
raise Exception("Only the master should process a router")
self.assertEqual(2, len([i for i in master.updates()]))
class TestBasicRouterOperations(base.BaseTestCase):
@ -100,12 +180,10 @@ class TestBasicRouterOperations(base.BaseTestCase):
def test__sync_routers_task_call_clean_stale_namespaces(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.plugin_api.get_routers.return_value = mock.ANY
self.plugin_api.get_routers.return_value = []
with mock.patch.object(agent, '_cleanup_namespaces') as f:
with mock.patch.object(agent, '_process_routers') as g:
agent._sync_routers_task(agent.context)
agent._sync_routers_task(agent.context)
self.assertTrue(f.called)
g.assert_called_with(mock.ANY, all_routers=True)
def test_router_info_create(self):
id = _uuid()
@ -1024,27 +1102,27 @@ class TestBasicRouterOperations(base.BaseTestCase):
def test_router_deleted(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._queue = mock.Mock()
agent.router_deleted(None, FAKE_ID)
# verify that will set fullsync
self.assertIn(FAKE_ID, agent.removed_routers)
agent._queue.add.assert_called_once()
def test_routers_updated(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._queue = mock.Mock()
agent.routers_updated(None, [FAKE_ID])
# verify that will set fullsync
self.assertIn(FAKE_ID, agent.updated_routers)
agent._queue.add.assert_called_once()
def test_removed_from_agent(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._queue = mock.Mock()
agent.router_removed_from_agent(None, {'router_id': FAKE_ID})
# verify that will set fullsync
self.assertIn(FAKE_ID, agent.removed_routers)
agent._queue.add.assert_called_once()
def test_added_to_agent(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
agent._queue = mock.Mock()
agent.router_added_to_agent(None, [FAKE_ID])
# verify that will set fullsync
self.assertIn(FAKE_ID, agent.updated_routers)
agent._queue.add.assert_called_once()
def test_process_router_delete(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@ -1277,7 +1355,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
pm.reset_mock()
agent._destroy_router_namespace = mock.MagicMock()
agent._cleanup_namespaces(router_list)
ns_list = agent._list_namespaces()
agent._cleanup_namespaces(ns_list, [r['id'] for r in router_list])
self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
self.assertEqual(agent._destroy_router_namespace.call_count,