Support fwaasrouterinsertion extension

fwaasrouterinsertion extension allows to specify which routers implement
a firewall (on create/update). This changes adds its support by allowing
to:
* set routers with --router option in firewall-create/update commands,
* unset routers with --no-routers option in firewall-update command.

Change-Id: I654c1ddd4140a60b8a09237f7142ad211e951d92
Closes-Bug: #1435264
This commit is contained in:
Cedric Brandily 2015-04-02 11:32:31 +02:00
parent 9b5d39789f
commit 6588c42430
2 changed files with 71 additions and 2 deletions
neutronclient
neutron/v2_0/fw
tests/unit/fw

@ -63,15 +63,27 @@ class CreateFirewall(neutronv20.CreateCommand):
dest='admin_state',
action='store_false',
help=_('Set admin state up to false.'))
parser.add_argument(
'--router',
dest='routers',
metavar='ROUTER',
action='append',
help=_('Firewall associated router names or IDs (requires FWaaS '
'router insertion extension, this option can be repeated)'))
def args2body(self, parsed_args):
client = self.get_client()
_policy_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_policy',
client, 'firewall_policy',
parsed_args.firewall_policy_id)
body = {
self.resource: {
'firewall_policy_id': _policy_id,
'admin_state_up': parsed_args.admin_state, }, }
if parsed_args.routers:
body[self.resource]['router_ids'] = [
neutronv20.find_resourceid_by_name_or_id(client, 'router', r)
for r in parsed_args.routers]
neutronv20.update_dict(parsed_args, body[self.resource],
['name', 'description', 'shared',
'tenant_id'])
@ -87,14 +99,34 @@ class UpdateFirewall(neutronv20.UpdateCommand):
parser.add_argument(
'--policy', metavar='POLICY',
help=_('Firewall policy name or ID.'))
router_sg = parser.add_mutually_exclusive_group()
router_sg.add_argument(
'--router',
dest='routers',
metavar='ROUTER',
action='append',
help=_('Firewall associated router names or IDs (requires FWaaS '
'router insertion extension, this option can be repeated)'))
router_sg.add_argument(
'--no-routers',
action='store_true',
help=_('Associate no routers with the firewall (requires FWaaS '
'router insertion extension)'))
def args2body(self, parsed_args):
data = {}
client = self.get_client()
if parsed_args.policy:
_policy_id = neutronv20.find_resourceid_by_name_or_id(
self.get_client(), 'firewall_policy',
client, 'firewall_policy',
parsed_args.policy)
data['firewall_policy_id'] = _policy_id
if parsed_args.routers:
data['router_ids'] = [
neutronv20.find_resourceid_by_name_or_id(client, 'router', r)
for r in parsed_args.routers]
elif parsed_args.no_routers:
data['router_ids'] = []
return {self.resource: data}

@ -61,6 +61,19 @@ class CLITestV20FirewallJSON(test_cli20.CLITestV20Base):
shared=True, admin_state_up=False,
tenant_id=tenant_id)
def test_create_firewall_with_routers(self):
resource = 'firewall'
cmd = firewall.CreateFirewall(test_cli20.MyApp(sys.stdout), None)
name = 'my-name'
policy_id = 'my-policy-id'
my_id = 'my-id'
args = ['--router', 'fake-id', '--router', 'fake-name', policy_id]
router_ids = ['fake-id', 'fake-name']
position_names = ['firewall_policy_id', 'router_ids']
position_values = [policy_id, router_ids]
self._test_create_resource(resource, cmd, name, my_id, args,
position_names, position_values)
def test_list_firewalls(self):
"""firewall-list."""
resources = "firewalls"
@ -120,6 +133,30 @@ class CLITestV20FirewallJSON(test_cli20.CLITestV20Base):
['myid', '--policy', 'newpolicy'],
{'firewall_policy_id': 'newpolicy'})
def test_update_firewall_with_routers(self):
resource = 'firewall'
cmd = firewall.UpdateFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(
resource, cmd, 'myid',
['myid', '--router', 'fake-id', '--router', 'fake-name'],
{'router_ids': ['fake-id', 'fake-name']})
def test_update_firewall_with_no_routers(self):
resource = 'firewall'
cmd = firewall.UpdateFirewall(test_cli20.MyApp(sys.stdout), None)
self._test_update_resource(
resource, cmd, 'myid',
['myid', '--no-routers'], {'router_ids': []})
def test_update_firewall_with_bad_router_options(self):
resource = 'firewall'
cmd = firewall.UpdateFirewall(test_cli20.MyApp(sys.stdout), None)
self.assertRaises(
SystemExit,
self._test_update_resource,
resource, cmd, 'myid',
['myid', '--no-routers', '--router', 'fake-id'], {})
def test_delete_firewall(self):
"""firewall-delete my-id."""
resource = 'firewall'