diff --git a/neutron/agent/l3_agent.py b/neutron/agent/l3_agent.py index 7b504e90a..1f5105d1a 100644 --- a/neutron/agent/l3_agent.py +++ b/neutron/agent/l3_agent.py @@ -18,7 +18,6 @@ # import eventlet -from eventlet import semaphore import netaddr from oslo.config import cfg @@ -36,6 +35,7 @@ from neutron.common import utils as common_utils from neutron import context from neutron import manager from neutron.openstack.common import importutils +from neutron.openstack.common import lockutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall from neutron.openstack.common import periodic_task @@ -49,6 +49,7 @@ LOG = logging.getLogger(__name__) NS_PREFIX = 'qrouter-' INTERNAL_DEV_PREFIX = 'qr-' EXTERNAL_DEV_PREFIX = 'qg-' +RPC_LOOP_INTERVAL = 1 class L3PluginApi(proxy.RpcProxy): @@ -66,9 +67,8 @@ class L3PluginApi(proxy.RpcProxy): topic=topic, default_version=self.BASE_RPC_API_VERSION) self.host = host - def get_routers(self, context, fullsync=True, router_id=None): + def get_routers(self, context, fullsync=True, router_ids=None): """Make a remote process call to retrieve the sync data for routers.""" - router_ids = [router_id] if router_id else None return self.call(context, self.make_msg('sync_routers', host=self.host, fullsync=fullsync, @@ -140,6 +140,17 @@ class RouterInfo(object): class L3NATAgent(manager.Manager): + """Manager for L3NatAgent + + API version history: + 1.0 initial Version + 1.1 changed the type of the routers parameter + to the routers_updated method. + It was previously a list of routers in dict format. + It is now a list of router IDs only. + Per rpc versioning rules, it is backwards compatible. + """ + RPC_API_VERSION = '1.1' OPTS = [ cfg.StrOpt('external_network_bridge', default='br-ex', @@ -196,9 +207,15 @@ class L3NATAgent(manager.Manager): self.context = context.get_admin_context_without_session() self.plugin_rpc = L3PluginApi(topics.PLUGIN, host) self.fullsync = True - self.sync_sem = semaphore.Semaphore(1) + self.updated_routers = set() + self.removed_routers = set() + self.sync_progress = False if self.conf.use_namespaces: self._destroy_router_namespaces(self.conf.router_id) + + self.rpc_loop = loopingcall.FixedIntervalLoopingCall( + self._rpc_loop) + self.rpc_loop.start(interval=RPC_LOOP_INTERVAL) super(L3NATAgent, self).__init__(host=self.conf.host) def _destroy_router_namespaces(self, only_router_id=None): @@ -323,6 +340,7 @@ class L3NATAgent(manager.Manager): port['ip_cidr'] = "%s/%s" % (ips[0]['ip_address'], prefixlen) def process_router(self, ri): + ri.iptables_manager.defer_apply_on() ex_gw_port = self._get_ex_gw_port(ri) internal_ports = ri.router.get(l3_constants.INTERFACE_KEY, []) existing_port_ids = set([p['id'] for p in ri.internal_ports]) @@ -371,6 +389,7 @@ class L3NATAgent(manager.Manager): ri.ex_gw_port = ex_gw_port ri.enable_snat = ri.router.get('enable_snat') self.routes_updated(ri) + ri.iptables_manager.defer_apply_off() def _handle_router_snat_rules(self, ri, ex_gw_port, internal_cidrs, interface_name, action): @@ -586,35 +605,28 @@ class L3NATAgent(manager.Manager): def router_deleted(self, context, router_id): """Deal with router deletion RPC message.""" - with self.sync_sem: - if router_id in self.router_info: - try: - self._router_removed(router_id) - except Exception: - msg = _("Failed dealing with router " - "'%s' deletion RPC message") - LOG.debug(msg, router_id) - self.fullsync = True + LOG.debug(_('Got router deleted notification for %s'), router_id) + self.removed_routers.add(router_id) def routers_updated(self, context, routers): """Deal with routers modification and creation RPC message.""" - if not routers: - return - with self.sync_sem: - try: - self._process_routers(routers) - except Exception: - msg = _("Failed dealing with routers update RPC message") - LOG.debug(msg) - self.fullsync = True + LOG.debug(_('Got routers updated notification :%s'), routers) + if routers: + # This is needed for backward compatiblity + if isinstance(routers[0], dict): + routers = [router['id'] for router in routers] + self.updated_routers.update(routers) def router_removed_from_agent(self, context, payload): - self.router_deleted(context, payload['router_id']) + LOG.debug(_('Got router removed from agent :%r'), payload) + self.removed_routers.add(payload['router_id']) def router_added_to_agent(self, context, payload): + LOG.debug(_('Got router added to agent :%r'), payload) self.routers_updated(context, payload) def _process_routers(self, routers, all_routers=False): + pool = eventlet.GreenPool() if (self.conf.external_network_bridge and not ip_lib.device_exists(self.conf.external_network_bridge)): LOG.error(_("The external network bridge '%s' does not exist"), @@ -652,28 +664,58 @@ class L3NATAgent(manager.Manager): self._router_added(r['id'], r) ri = self.router_info[r['id']] ri.router = r - self.process_router(ri) + pool.spawn_n(self.process_router, ri) # identify and remove routers that no longer exist for router_id in prev_router_ids - cur_router_ids: - self._router_removed(router_id) + pool.spawn_n(self._router_removed, router_id) + pool.waitall() + + @lockutils.synchronized('l3-agent', 'neutron-') + def _rpc_loop(self): + # _rpc_loop and _sync_routers_task will not be + # executed in the same time because of lock. + # so we can clear the value of updated_routers + # and removed_routers + try: + if self.updated_routers: + router_ids = list(self.updated_routers) + self.updated_routers.clear() + routers = self.plugin_rpc.get_routers( + self.context, router_ids) + self._process_routers(routers) + self._process_router_delete() + except Exception: + LOG.exception(_("Failed synchronizing routers")) + self.fullsync = True + + def _process_router_delete(self): + current_removed_routers = list(self.removed_routers) + for router_id in current_removed_routers: + self._router_removed(context, router_id) + self.removed_routers.remove(router_id) + + def _router_ids(self): + if not self.conf.use_namespaces: + return [self.conf.router_id] @periodic_task.periodic_task + @lockutils.synchronized('l3-agent', 'neutron-') def _sync_routers_task(self, context): - # we need to sync with router deletion RPC message - with self.sync_sem: - if self.fullsync: - try: - if not self.conf.use_namespaces: - router_id = self.conf.router_id - else: - router_id = None - routers = self.plugin_rpc.get_routers( - context, router_id) - self._process_routers(routers, all_routers=True) - self.fullsync = False - except Exception: - LOG.exception(_("Failed synchronizing routers")) - self.fullsync = True + if not self.fullsync: + return + try: + router_ids = self._router_ids() + self.updated_routers.clear() + self.removed_routers.clear() + routers = self.plugin_rpc.get_routers( + context, router_ids) + + LOG.debug(_('Processing :%r'), routers) + self._process_routers(routers, all_routers=True) + self.fullsync = False + except Exception: + LOG.exception(_("Failed synchronizing routers")) + self.fullsync = True def after_start(self): LOG.info(_("L3 agent started")) diff --git a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py index 28fea4c24..518440620 100644 --- a/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py +++ b/neutron/api/rpc/agentnotifiers/l3_rpc_agent_api.py @@ -42,14 +42,14 @@ class L3AgentNotifyAPI(proxy.RpcProxy): payload=payload), topic='%s.%s' % (topics.L3_AGENT, host)) - def _agent_notification(self, context, method, routers, + def _agent_notification(self, context, method, router_ids, operation, data): """Notify changed routers to hosting l3 agents.""" adminContext = context.is_admin and context or context.elevated() plugin = manager.NeutronManager.get_plugin() - for router in routers: + for router_id in router_ids: l3_agents = plugin.get_l3_agents_hosting_routers( - adminContext, [router['id']], + adminContext, [router_id], admin_state_up=True, active=True) for l3_agent in l3_agents: @@ -60,23 +60,24 @@ class L3AgentNotifyAPI(proxy.RpcProxy): 'method': method}) self.cast( context, self.make_msg(method, - routers=[router]), - topic='%s.%s' % (l3_agent.topic, l3_agent.host)) + routers=[router_id]), + topic='%s.%s' % (l3_agent.topic, l3_agent.host), + version='1.1') - def _notification(self, context, method, routers, operation, data): + def _notification(self, context, method, router_ids, operation, data): """Notify all the agents that are hosting the routers.""" plugin = manager.NeutronManager.get_plugin() if utils.is_extension_supported( plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS): adminContext = (context.is_admin and context or context.elevated()) - plugin.schedule_routers(adminContext, routers) + plugin.schedule_routers(adminContext, router_ids) self._agent_notification( - context, method, routers, operation, data) + context, method, router_ids, operation, data) else: self.fanout_cast( context, self.make_msg(method, - routers=routers), + routers=router_ids), topic=topics.L3_AGENT) def _notification_fanout(self, context, method, router_id): @@ -99,17 +100,17 @@ class L3AgentNotifyAPI(proxy.RpcProxy): def router_deleted(self, context, router_id): self._notification_fanout(context, 'router_deleted', router_id) - def routers_updated(self, context, routers, operation=None, data=None): - if routers: - self._notification(context, 'routers_updated', routers, + def routers_updated(self, context, router_ids, operation=None, data=None): + if router_ids: + self._notification(context, 'routers_updated', router_ids, operation, data) def router_removed_from_agent(self, context, router_id, host): self._notification_host(context, 'router_removed_from_agent', {'router_id': router_id}, host) - def router_added_to_agent(self, context, routers, host): + def router_added_to_agent(self, context, router_ids, host): self._notification_host(context, 'router_added_to_agent', - routers, host) + router_ids, host) L3AgentNotify = L3AgentNotifyAPI() diff --git a/neutron/db/agentschedulers_db.py b/neutron/db/agentschedulers_db.py index 964b1d20a..c9ea7b118 100644 --- a/neutron/db/agentschedulers_db.py +++ b/neutron/db/agentschedulers_db.py @@ -129,9 +129,8 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase, router_id=router_id, agent_id=id) if self.l3_agent_notifier: - routers = self.get_sync_data(context, [router_id]) self.l3_agent_notifier.router_added_to_agent( - context, routers, agent_db.host) + context, [router_id], agent_db.host) def remove_router_from_l3_agent(self, context, id, router_id): """Remove the router from l3 agent. diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 50db1eadd..5e84c2cee 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -183,9 +183,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): # Ensure we actually have something to update if r.keys(): router_db.update(r) - routers = self.get_sync_data(context.elevated(), - [router_db['id']]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, [router_db['id']]) return self._make_router_dict(router_db) def _create_router_gw_port(self, context, router, network_id): @@ -336,27 +335,29 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): raise q_exc.BadRequest(resource='router', msg=msg) if 'port_id' in interface_info: - if 'subnet_id' in interface_info: - msg = _("Cannot specify both subnet-id and port-id") - raise q_exc.BadRequest(resource='router', msg=msg) + # make sure port update is committed + with context.session.begin(subtransactions=True): + if 'subnet_id' in interface_info: + msg = _("Cannot specify both subnet-id and port-id") + raise q_exc.BadRequest(resource='router', msg=msg) - port = self._get_port(context, interface_info['port_id']) - if port['device_id']: - raise q_exc.PortInUse(net_id=port['network_id'], - port_id=port['id'], - device_id=port['device_id']) - fixed_ips = [ip for ip in port['fixed_ips']] - if len(fixed_ips) != 1: - msg = _('Router port must have exactly one fixed IP') - raise q_exc.BadRequest(resource='router', msg=msg) - subnet_id = fixed_ips[0]['subnet_id'] - subnet = self._get_subnet(context, subnet_id) - self._check_for_dup_router_subnet(context, router_id, - port['network_id'], - subnet['id'], - subnet['cidr']) - port.update({'device_id': router_id, - 'device_owner': DEVICE_OWNER_ROUTER_INTF}) + port = self._get_port(context, interface_info['port_id']) + if port['device_id']: + raise q_exc.PortInUse(net_id=port['network_id'], + port_id=port['id'], + device_id=port['device_id']) + fixed_ips = [ip for ip in port['fixed_ips']] + if len(fixed_ips) != 1: + msg = _('Router port must have exactly one fixed IP') + raise q_exc.BadRequest(resource='router', msg=msg) + subnet_id = fixed_ips[0]['subnet_id'] + subnet = self._get_subnet(context, subnet_id) + self._check_for_dup_router_subnet(context, router_id, + port['network_id'], + subnet['id'], + subnet['cidr']) + port.update({'device_id': router_id, + 'device_owner': DEVICE_OWNER_ROUTER_INTF}) elif 'subnet_id' in interface_info: subnet_id = interface_info['subnet_id'] subnet = self._get_subnet(context, subnet_id) @@ -381,11 +382,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'device_owner': DEVICE_OWNER_ROUTER_INTF, 'name': ''}}) - routers = self.get_sync_data(context.elevated(), [router_id]) l3_rpc_agent_api.L3AgentNotify.routers_updated( - context, routers, 'add_router_interface', - {'network_id': port['network_id'], - 'subnet_id': subnet_id}) + context, [router_id], 'add_router_interface') info = {'id': router_id, 'tenant_id': subnet['tenant_id'], 'port_id': port['id'], @@ -428,7 +426,6 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): subnet = self._get_subnet(context, subnet_id) self._confirm_router_interface_not_in_use( context, router_id, subnet_id) - _network_id = port_db['network_id'] self.delete_port(context, port_db['id'], l3_port_check=False) elif 'subnet_id' in interface_info: subnet_id = interface_info['subnet_id'] @@ -448,7 +445,6 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): for p in ports: if p['fixed_ips'][0]['subnet_id'] == subnet_id: port_id = p['id'] - _network_id = p['network_id'] self.delete_port(context, p['id'], l3_port_check=False) found = True break @@ -458,11 +454,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): if not found: raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id, subnet_id=subnet_id) - routers = self.get_sync_data(context.elevated(), [router_id]) l3_rpc_agent_api.L3AgentNotify.routers_updated( - context, routers, 'remove_router_interface', - {'network_id': _network_id, - 'subnet_id': subnet_id}) + context, [router_id], 'remove_router_interface') info = {'id': router_id, 'tenant_id': subnet['tenant_id'], 'port_id': port_id, @@ -674,9 +667,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): router_id = floatingip_db['router_id'] if router_id: - routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, - 'create_floatingip') + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, [router_id], + 'create_floatingip') return self._make_floatingip_dict(floatingip_db) def update_floatingip(self, context, id, floatingip): @@ -697,8 +690,7 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): if router_id and router_id != before_router_id: router_ids.append(router_id) if router_ids: - routers = self.get_sync_data(context.elevated(), router_ids) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, + l3_rpc_agent_api.L3AgentNotify.routers_updated(context, router_ids, 'update_floatingip') return self._make_floatingip_dict(floatingip_db) @@ -711,9 +703,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): floatingip['floating_port_id'], l3_port_check=False) if router_id: - routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers, - 'delete_floatingip') + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, [router_id], + 'delete_floatingip') def get_floatingip(self, context, id, fields=None): floatingip = self._get_floatingip(context, id) @@ -782,8 +774,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): raise Exception(_('Multiple floating IPs found for port %s') % port_id) if router_id: - routers = self.get_sync_data(context.elevated(), [router_id]) - l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers) + l3_rpc_agent_api.L3AgentNotify.routers_updated( + context, [router_id]) def _network_is_external(self, context, net_id): try: diff --git a/neutron/scheduler/l3_agent_scheduler.py b/neutron/scheduler/l3_agent_scheduler.py index 027ee5b05..e90dfa268 100644 --- a/neutron/scheduler/l3_agent_scheduler.py +++ b/neutron/scheduler/l3_agent_scheduler.py @@ -109,7 +109,7 @@ class ChanceScheduler(object): context.session.add(binding) return True - def schedule(self, plugin, context, sync_router): + def schedule(self, plugin, context, router_id): """Schedule the router to an active L3 agent if there is no enable L3 agent hosting it. """ @@ -119,14 +119,15 @@ class ChanceScheduler(object): # timing problem. Non-active l3 agent can return to # active any time l3_agents = plugin.get_l3_agents_hosting_routers( - context, [sync_router['id']], admin_state_up=True) + context, [router_id], admin_state_up=True) if l3_agents: LOG.debug(_('Router %(router_id)s has already been hosted' ' by L3 agent %(agent_id)s'), - {'router_id': sync_router['id'], + {'router_id': router_id, 'agent_id': l3_agents[0]['id']}) return + sync_router = plugin.get_router(context, router_id) active_l3_agents = plugin.get_l3_agents(context, active=True) if not active_l3_agents: LOG.warn(_('No active L3 agents')) diff --git a/neutron/tests/unit/openvswitch/test_agent_scheduler.py b/neutron/tests/unit/openvswitch/test_agent_scheduler.py index c4c7ac3cb..804e2e7d8 100644 --- a/neutron/tests/unit/openvswitch/test_agent_scheduler.py +++ b/neutron/tests/unit/openvswitch/test_agent_scheduler.py @@ -1070,8 +1070,7 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin, L3_HOSTA) self._add_router_to_l3_agent(hosta_id, router1['router']['id']) - routers = plugin.get_sync_data(self.adminContext, - [router1['router']['id']]) + routers = [router1['router']['id']] mock_l3.assert_called_with( mock.ANY, plugin.l3_agent_notifier.make_msg( diff --git a/neutron/tests/unit/test_l3_agent.py b/neutron/tests/unit/test_l3_agent.py index 9ca9d7c74..dc76926e4 100644 --- a/neutron/tests/unit/test_l3_agent.py +++ b/neutron/tests/unit/test_l3_agent.py @@ -31,6 +31,7 @@ from neutron.tests import base _uuid = uuidutils.generate_uuid HOSTNAME = 'myhost' +FAKE_ID = _uuid() class TestBasicRouterOperations(base.BaseTestCase): @@ -508,22 +509,29 @@ class TestBasicRouterOperations(base.BaseTestCase): agent._process_routers(routers) self.assertNotIn(routers[0]['id'], agent.router_info) - def testSingleLoopRouterRemoval(self): + def test_router_deleted(self): agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) - self.plugin_api.get_external_network_id.return_value = None - routers = [ - {'id': _uuid(), - 'admin_state_up': True, - 'routes': [], - 'external_gateway_info': {}}] - agent._process_routers(routers) + agent.router_deleted(None, FAKE_ID) + # verify that will set fullsync + self.assertTrue(FAKE_ID in agent.removed_routers) - agent.router_deleted(None, routers[0]['id']) - # verify that remove is called - self.assertEqual(self.mock_ip.get_devices.call_count, 1) + def test_routers_updated(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + agent.routers_updated(None, [FAKE_ID]) + # verify that will set fullsync + self.assertTrue(FAKE_ID in agent.updated_routers) - self.device_exists.assert_has_calls( - [mock.call(self.conf.external_network_bridge)]) + def test_removed_from_agent(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + agent.router_removed_from_agent(None, {'router_id': FAKE_ID}) + # verify that will set fullsync + self.assertTrue(FAKE_ID in agent.removed_routers) + + def test_added_to_agent(self): + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + agent.router_added_to_agent(None, [FAKE_ID]) + # verify that will set fullsync + self.assertTrue(FAKE_ID in agent.updated_routers) def testDestroyNamespace(self):