Merge "Revert "Revert "Fix RPC scale issue using cast instead of fanout v1"""

This commit is contained in:
Jenkins 2017-03-21 00:49:07 +00:00 committed by Gerrit Code Review
commit d2bf76beee
3 changed files with 73 additions and 25 deletions

View File

@ -17,6 +17,7 @@ from neutron_lib import context as neutron_context
from neutron_lib.plugins import directory
from neutron.common import rpc as n_rpc
from neutron.common import utils as n_utils
from oslo_config import cfg
from oslo_log import log as logging
@ -119,18 +120,28 @@ class FirewallAgentApi(object):
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def create_firewall(self, context, firewall):
cctxt = self.client.prepare(fanout=True)
def _prepare_rpc_client(self, host=None):
if host:
return self.client.prepare(server=host)
else:
# historical behaviour (RPC broadcast)
return self.client.prepare(fanout=True)
def create_firewall(self, context, firewall, host=None):
cctxt = self._prepare_rpc_client(host)
# TODO(blallau) host param is not used on agent side (to be removed)
cctxt.cast(context, 'create_firewall', firewall=firewall,
host=self.host)
def update_firewall(self, context, firewall):
cctxt = self.client.prepare(fanout=True)
def update_firewall(self, context, firewall, host=None):
cctxt = self._prepare_rpc_client(host)
# TODO(blallau) host param is not used on agent side (to be removed)
cctxt.cast(context, 'update_firewall', firewall=firewall,
host=self.host)
def delete_firewall(self, context, firewall):
cctxt = self.client.prepare(fanout=True)
def delete_firewall(self, context, firewall, host=None):
cctxt = self._prepare_rpc_client(host)
# TODO(blallau) host param is not used on agent side (to be removed)
cctxt.cast(context, 'delete_firewall', firewall=firewall,
host=self.host)
@ -166,6 +177,21 @@ class FirewallPlugin(
f_const.FIREWALL_PLUGIN, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
def _get_hosts_to_notify(self, context, router_ids):
"""Returns all hosts to send notification about firewall update"""
l3_plugin = directory.get_plugin(nl_constants.L3)
no_broadcast = (
n_utils.is_extension_supported(
l3_plugin, nl_constants.L3_AGENT_SCHEDULER_EXT_ALIAS) and
getattr(l3_plugin, 'get_l3_agents_hosting_routers', False))
if no_broadcast:
agents = l3_plugin.get_l3_agents_hosting_routers(
context, router_ids, admin_state_up=True, active=True)
return [a.host for a in agents]
# NOTE(blallau): default: FirewallAgentAPI performs RPC broadcast
return [None]
def _rpc_update_firewall(self, context, firewall_id):
status_update = {"firewall": {"status": nl_constants.PENDING_UPDATE}}
super(FirewallPlugin, self).update_firewall(context, firewall_id,
@ -174,10 +200,14 @@ class FirewallPlugin(
firewall_id)
# this is triggered on an update to fw rule or policy, no
# change in associated routers.
fw_with_rules['add-router-ids'] = self.get_firewall_routers(
context, firewall_id)
fw_update_rtrs = self.get_firewall_routers(context, firewall_id)
fw_with_rules['add-router-ids'] = fw_update_rtrs
fw_with_rules['del-router-ids'] = []
self.agent_rpc.update_firewall(context, fw_with_rules)
hosts = self._get_hosts_to_notify(context, fw_update_rtrs)
for host in hosts:
self.agent_rpc.update_firewall(context, fw_with_rules,
host=host)
def _rpc_update_firewall_policy(self, context, firewall_policy_id):
firewall_policy = self.get_firewall_policy(context, firewall_policy_id)
@ -260,8 +290,10 @@ class FirewallPlugin(
fw_with_rules['add-router-ids'] = fw_new_rtrs
fw_with_rules['del-router-ids'] = []
self.agent_rpc.create_firewall(context, fw_with_rules)
hosts = self._get_hosts_to_notify(context, fw_new_rtrs)
for host in hosts:
self.agent_rpc.create_firewall(context, fw_with_rules,
host=host)
return fw
def update_firewall(self, context, id, firewall):
@ -317,8 +349,11 @@ class FirewallPlugin(
fw_with_rules['add-router-ids'],
fw_with_rules['del-router-ids'])
self.agent_rpc.update_firewall(context, fw_with_rules)
hosts = self._get_hosts_to_notify(context, list(
set(fw_new_rtrs).union(set(fw_current_rtrs))))
for host in hosts:
self.agent_rpc.update_firewall(context, fw_with_rules,
host=host)
return fw
def delete_db_firewall_object(self, context, id):
@ -328,8 +363,8 @@ class FirewallPlugin(
LOG.debug("delete_firewall() called on firewall %s", id)
fw_with_rules = (
self._make_firewall_dict_with_rules(context, id))
fw_with_rules['del-router-ids'] = self.get_firewall_routers(
context, id)
fw_delete_rtrs = self.get_firewall_routers(context, id)
fw_with_rules['del-router-ids'] = fw_delete_rtrs
fw_with_rules['add-router-ids'] = []
if not fw_with_rules['del-router-ids']:
# no routers to delete on the agent side
@ -339,7 +374,15 @@ class FirewallPlugin(
super(FirewallPlugin, self).update_firewall(context, id, status)
# Reflect state change in fw_with_rules
fw_with_rules['status'] = status['firewall']['status']
self.agent_rpc.delete_firewall(context, fw_with_rules)
hosts = self._get_hosts_to_notify(context, fw_delete_rtrs)
if hosts:
for host in hosts:
self.agent_rpc.delete_firewall(context, fw_with_rules,
host=host)
else:
# NOTE(blallau): we directly delete the firewall
# if router is not associated to an agent
self.delete_db_firewall_object(context, id)
def update_firewall_policy(self, context, id, firewall_policy):
LOG.debug("update_firewall_policy() called")

View File

@ -248,14 +248,13 @@ class FWaaSExtensionTestJSON(base.BaseFWaaSTest):
@decorators.idempotent_id('1355cf5c-77d4-4bb9-87d7-e50c194d08b5')
def test_firewall_insertion_mode_add_remove_router(self):
# Create routers
# Create legacy routers
router1 = self.create_router(
data_utils.rand_name('router-'),
admin_state_up=True)
router2 = self.create_router(
data_utils.rand_name('router-'),
admin_state_up=True)
# Create firewall on a router1
body = self.firewalls_client.create_firewall(
name=data_utils.rand_name("firewall"),
@ -267,6 +266,12 @@ class FWaaSExtensionTestJSON(base.BaseFWaaSTest):
self.assertEqual([router1['id']], created_firewall['router_ids'])
# Legacy routers are scheduled on L3 agents on network plug events
# Hence firewall resource will not became ready at this stage
network = self.create_network()
subnet = self.create_subnet(network)
self.routers_client.add_router_interface(router1['id'],
subnet_id=subnet['id'])
# Wait for the firewall resource to become ready
self._wait_until_ready(firewall_id)

View File

@ -74,7 +74,7 @@ class TestFirewallRouterInsertionBase(
plugin = 'neutron.tests.unit.extensions.test_l3.TestNoL3NatPlugin'
# the L3 service plugin
l3_plugin = ('neutron.tests.unit.extensions.test_l3.'
'TestL3NatServicePlugin')
'TestL3NatAgentSchedulingServicePlugin')
cfg.CONF.set_override('api_extensions_path', extensions_path)
self.saved_attr_map = {}
@ -316,26 +316,26 @@ class TestFirewallAgentApi(base.BaseTestCase):
self.assertEqual('topic', self.api.client.target.topic)
self.assertEqual('host', self.api.host)
def _call_test_helper(self, method_name):
def _call_test_helper(self, method_name, host):
with mock.patch.object(self.api.client, 'cast') as rpc_mock, \
mock.patch.object(self.api.client, 'prepare') as prepare_mock:
prepare_mock.return_value = self.api.client
getattr(self.api, method_name)(mock.sentinel.context, 'test')
getattr(self.api, method_name)(mock.sentinel.context, 'test', host)
prepare_args = {'fanout': True}
prepare_args = {'server': host}
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(mock.sentinel.context, method_name,
firewall='test', host='host')
def test_create_firewall(self):
self._call_test_helper('create_firewall')
self._call_test_helper('create_firewall', 'host')
def test_update_firewall(self):
self._call_test_helper('update_firewall')
self._call_test_helper('update_firewall', 'host')
def test_delete_firewall(self):
self._call_test_helper('delete_firewall')
self._call_test_helper('delete_firewall', 'host')
class TestFirewallPluginBase(TestFirewallRouterInsertionBase,