FWaaS V2 Horizon Dashboard

Co-Authored-By: Yushiro FURUKAWA <y.furukawa_2@jp.fujitsu.com>
Change-Id: I4a650cb0aff29adfdf9fdac2db994ee0866cc54c
This commit is contained in:
Sarath Chandra Mekala 2017-06-20 20:39:37 +05:30 committed by Yushiro FURUKAWA
parent a59786a61c
commit 1cf1fa573f
46 changed files with 4163 additions and 30 deletions

View File

@ -5,16 +5,34 @@ DevStack plugin for neutron-fwaas-dashboard
This is setup as a DevStack plugin.
For more information on DevStack plugins,
see the `DevStack Plugins documentation
<https://docs.openstack.org/devstack/latest/plugins.html>`__.
<https://docs.openstack.org/developer/devstack/plugins.html>`__.
Common to FWaaS v1 and v2 dashboard
-----------------------------------
If neutron-fwaas-dashboard DevStack plugin is enabled,
Neutron FWaaS dashboard is automatically enabled and
the appropriate version of FWaaS panel is displayed based on
the FWaaS version enabled in your neutron server.
You do not need to specify FWaaS API version in the DevStack plugin
configuration.
How to enable FWaaS v2 dsashboard
---------------------------------
Add the following to the localrc section of your local.conf.
.. code-block:: none
[[local|localrc]]
enable_plugin neutron-fwaas https://git.openstack.org/openstack/neutron-fwaas master
enable_service q-fwaas-v2
enable_plugin neutron-fwaas-dashboard https://git.openstack.org/openstack/neutron-fwaas-dashboard master
How to enable FWaaS v1 dsashboard
---------------------------------
Add the following to the localrc section of your local.conf.
You need to configure FWaaS v1 DevStack plugin as well.
If ``q-fwaas-v1`` (or ``q-fwaas``) is enabled,
FWaaS v1 dashboard ``neutron-fwaas-v1-dashboard`` is automatically enabled.
.. code-block:: none
@ -22,12 +40,3 @@ FWaaS v1 dashboard ``neutron-fwaas-v1-dashboard`` is automatically enabled.
enable_plugin neutron-fwaas https://git.openstack.org/openstack/neutron-fwaas master
enable_service q-fwaas-v1
enable_plugin neutron-fwaas-dashboard https://git.openstack.org/openstack/neutron-fwaas-dashboard master
If you run horizon in a separate server from neutron server and
neutron-fwaas is not configured,
``neutron-fwaas-v1-dashboard`` is enabled by default.
How to enable FWaaS v2 dsashboard
---------------------------------
Coming soon.

View File

@ -9,9 +9,7 @@ function install_neutron_fwaas_dashboard {
}
function configure_neutron_fwaas_dashboard {
if is_service_enabled neutron-fwaas-v1-dashboard; then
cp -a $FWAAS_ENABLED_DIR/_7010_project_firewalls_panel.py $HORIZON_ENABLED_DIR
fi
cp -a $FWAAS_ENABLED_DIR/_[0-9]*.py $HORIZON_ENABLED_DIR
# NOTE: If locale directory does not exist, compilemessages will fail,
# so check for an existence of locale directory is required.
if [ -d $FWAAS_DASHBOARD_DIR/neutron_fwaas_dashboard/locale ]; then
@ -53,7 +51,7 @@ function _set_policy_file {
}
# check for service enabled
if is_service_enabled neutron-fwaas-v1-dashboard neutron-fwaas-v2-dashboard; then
if is_service_enabled neutron-fwaas-dashboard; then
if [[ "$1" == "stack" && "$2" == "pre-install" ]]; then
# Set up system services

View File

@ -1,8 +1,2 @@
# settings file for neutron-fwaas-dashboard plugin
if is_service_enabled q-fwaas q-fwaas-v1; then
enable_service neutron-fwaas-v1-dashboard
elif is_service_enabled q-fwaas-v2; then
enable_service neutron-fwaas-v2-dashboard
else
enable_service neutron-fwaas-v1-dashboard
fi
enable_service neutron-fwaas-dashboard

View File

@ -38,8 +38,8 @@ Enable the horizon plugin.
.. code-block:: console
$ cp neutron_fwaas_dashboard/enabled/_7010_project_firewalls_panel.py \
/opt/stack/horizon/openstack_dashboard/local/enabled/_7010_project_firewalls_panel.py
$ cp neutron_fwaas_dashboard/enabled/_70*_*.py \
/opt/stack/horizon/openstack_dashboard/local/enabled/
.. note::

View File

@ -0,0 +1,325 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from openstack_dashboard.api import neutron
from openstack_dashboard.contrib.developer.profiler import api as profiler
neutronclient = neutron.neutronclient
class Port(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron port."""
def get_dict(self):
port_dict = self._apidict
port_dict['port_id'] = port_dict['id']
return port_dict
class Rule(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron firewall rule."""
def get_dict(self):
rule_dict = self._apidict
rule_dict['rule_id'] = rule_dict['id']
return rule_dict
class Policy(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron firewall policy."""
def get_dict(self):
policy_dict = self._apidict
policy_dict['policy_id'] = policy_dict['id']
return policy_dict
class FirewallGroup(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron firewall group."""
def __init__(self, apiresource):
super(FirewallGroup, self).__init__(apiresource)
def get_dict(self):
firewallgroup_dict = self._apidict
firewallgroup_dict['firewallgroup_id'] = firewallgroup_dict['id']
return firewallgroup_dict
def rule_create(request, **kwargs):
"""Create a firewall rule
:param request: request context
:param name: name for rule
:param description: description for rule
:param protocol: protocol for rule
:param action: action for rule
:param source_ip_address: source IP address or subnet
:param source_port: integer in [1, 65535] or range in a:b
:param destination_ip_address: destination IP address or subnet
:param destination_port: integer in [1, 65535] or range in a:b
:param shared: boolean (default false)
:param enabled: boolean (default true)
:return: Rule object
"""
body = {'firewall_rule': kwargs}
rule = neutronclient(request).create_fwaas_firewall_rule(
body).get('firewall_rule')
return Rule(rule)
@profiler.trace
def rule_list(request, **kwargs):
return _rule_list(request, **kwargs)
@profiler.trace
def fwg_port_list_for_tenant(request, tenant_id, **kwargs):
kwargs['tenant_id'] = tenant_id
ports = neutronclient(request).list_ports(**kwargs).get('ports')
# TODO(SarathMekala): Remove ports which are already associated with a FWG
fwgs = neutronclient(request).list_fwaas_firewall_groups(
**kwargs).get('firewall_groups')
fwg_ports = []
for fwg in fwgs:
if not fwg['ports']:
continue
fwg_ports += fwg['ports']
return [Port(p) for p in ports
if _is_target(p) and p['id'] not in fwg_ports]
def _is_target(port):
return (port['device_owner'].startswith('compute:') or
port['device_owner'].startswith('network:router_interface'))
@profiler.trace
def rule_list_for_tenant(request, tenant_id, **kwargs):
"""Return a rule list available for the tenant.
The list contains rules owned by the tenant and shared rules.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
rules = rule_list(request, tenant_id=tenant_id, shared=False, **kwargs)
shared_rules = rule_list(request, shared=True, **kwargs)
return rules + shared_rules
def _rule_list(request, **kwargs):
rules = neutronclient(request).list_fwaas_firewall_rules(
**kwargs).get('firewall_rules')
return [Rule(r) for r in rules]
@profiler.trace
def rule_get(request, rule_id):
return _rule_get(request, rule_id)
def _rule_get(request, rule_id):
rule = neutronclient(request).show_fwaas_firewall_rule(
rule_id).get('firewall_rule')
return Rule(rule)
@profiler.trace
def rule_delete(request, rule_id):
neutronclient(request).delete_fwaas_firewall_rule(rule_id)
@profiler.trace
def rule_update(request, rule_id, **kwargs):
body = {'firewall_rule': kwargs}
rule = neutronclient(request).update_fwaas_firewall_rule(
rule_id, body).get('firewall_rule')
return Rule(rule)
@profiler.trace
def policy_create(request, **kwargs):
"""Create a firewall policy
:param request: request context
:param name: name for policy
:param description: description for policy
:param firewall_rules: ordered list of rules in policy
:param shared: boolean (default false)
:param audited: boolean (default false)
:return: Policy object
"""
body = {'firewall_policy': kwargs}
policy = neutronclient(request).create_fwaas_firewall_policy(
body).get('firewall_policy')
return Policy(policy)
@profiler.trace
def policy_list(request, **kwargs):
return _policy_list(request, expand_rule=True, **kwargs)
@profiler.trace
def policy_list_for_tenant(request, tenant_id, **kwargs):
"""Return a policy list available for the tenant.
The list contains policies owned by the tenant and shared policies.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
policies = policy_list(request, tenant_id=tenant_id,
shared=False, **kwargs)
shared_policies = policy_list(request, shared=True, **kwargs)
return policies + shared_policies
def _policy_list(request, expand_rule, **kwargs):
policies = neutronclient(request).list_fwaas_firewall_policies(
**kwargs).get('firewall_policies')
if expand_rule and policies:
rules = _rule_list(request)
rule_dict = collections.OrderedDict((rule.id, rule) for rule in rules)
for p in policies:
p['rules'] = [rule_dict.get(rule) for rule in p['firewall_rules']]
return [Policy(p) for p in policies]
@profiler.trace
def policy_get(request, policy_id):
return _policy_get(request, policy_id, expand_rule=True)
def _policy_get(request, policy_id, expand_rule):
policy = neutronclient(request).show_fwaas_firewall_policy(
policy_id).get('firewall_policy')
if expand_rule:
policy_rules = policy['firewall_rules']
if policy_rules:
rules = _rule_list(request, firewall_policy_id=policy_id)
rule_dict = collections.OrderedDict((rule.id, rule)
for rule in rules)
policy['rules'] = [rule_dict.get(rule) for rule in policy_rules]
else:
policy['rules'] = []
return Policy(policy)
@profiler.trace
def policy_delete(request, policy_id):
neutronclient(request).delete_fwaas_firewall_policy(policy_id)
@profiler.trace
def policy_update(request, policy_id, **kwargs):
body = {'firewall_policy': kwargs}
policy = neutronclient(request).update_fwaas_firewall_policy(
policy_id, body).get('firewall_policy')
return Policy(policy)
@profiler.trace
def policy_insert_rule(request, policy_id, **kwargs):
policy = neutronclient(request).insert_rule_fwaas_firewall_policy(
policy_id, kwargs)
return Policy(policy)
@profiler.trace
def policy_remove_rule(request, policy_id, **kwargs):
policy = neutronclient(request).remove_rule_fwaas_firewall_policy(
policy_id, kwargs)
return Policy(policy)
@profiler.trace
def firewall_group_create(request, **kwargs):
"""Create a firewall for specified policy
:param request: request context
:param name: name for firewall
:param description: description for firewall
:param firewall_policy_id: policy id used by firewall
:param shared: boolean (default false)
:param admin_state_up: boolean (default true)
:return: Firewall object
"""
body = {'firewall_group': kwargs}
firewall_group = neutronclient(request).create_fwaas_firewall_group(body)
return FirewallGroup(firewall_group['firewall_group'])
@profiler.trace
def firewall_list(request, **kwargs):
return _firewall_list(request, **kwargs)
@profiler.trace
def firewall_list_for_tenant(request, tenant_id, **kwargs):
"""Return a firewall list available for the tenant.
The list contains firewalls owned by the tenant and shared firewalls.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
fwg = firewall_list(request, tenant_id=tenant_id,
shared=False, **kwargs)
shared_fwg = firewall_list(request, shared=True, **kwargs)
return fwg + shared_fwg
# TODO(SarathMekala): Support expand_policy for _firewall_list
def _firewall_list(request, **kwargs):
firewall_groups = neutronclient(request).list_fwaas_firewall_groups(
**kwargs).get('firewall_groups')
return [FirewallGroup(f) for f in firewall_groups]
@profiler.trace
def firewall_get(request, firewall_id):
return _firewall_get(request, firewall_id, expand_policy=True)
def _firewall_get(request, firewallgroup_id, expand_policy):
firewall_group = neutronclient(request).show_fwaas_firewall_group(
firewallgroup_id).get('firewall_group')
if expand_policy:
ingress_policy_id = firewall_group['ingress_firewall_policy_id']
if ingress_policy_id:
firewall_group['ingress_policy'] = _policy_get(
request, ingress_policy_id, expand_rule=False)
else:
firewall_group['ingress_policy'] = None
egress_policy_id = firewall_group['egress_firewall_policy_id']
if egress_policy_id:
firewall_group['egress_policy'] = _policy_get(
request, egress_policy_id, expand_rule=False)
else:
firewall_group['egress_policy'] = None
return FirewallGroup(firewall_group)
@profiler.trace
def firewall_delete(request, firewallgroup_id):
neutronclient(request).delete_fwaas_firewall_group(firewallgroup_id)
@profiler.trace
def firewall_update(request, firewallgroup_id, **kwargs):
body = {'firewall_group': kwargs}
firewall_group = neutronclient(request).update_fwaas_firewall_group(
firewallgroup_id, body).get('firewall_group')
return FirewallGroup(firewall_group)

View File

@ -0,0 +1,408 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from operator import attrgetter
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import forms
from horizon import messages
from horizon.utils import validators
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
port_validator = validators.validate_port_or_colon_separated_port_range
LOG = logging.getLogger(__name__)
class UpdateRule(forms.SelfHandlingForm):
name = forms.CharField(max_length=80, label=_("Name"), required=False)
description = forms.CharField(
required=False,
max_length=80, label=_("Description"))
protocol = forms.ThemableChoiceField(
label=_("Protocol"),
choices=[('tcp', _('TCP')), ('udp', _('UDP')), ('icmp', _('ICMP')),
('any', _('ANY'))],
help_text=_('Protocol for the firewall rule'))
action = forms.ThemableChoiceField(
label=_("Action"),
choices=[('allow', _('ALLOW')), ('deny', _('DENY')),
('reject', _('REJECT'))],
help_text=_('Action for the firewall rule'))
source_ip_address = forms.IPField(
label=_("Source IP Address/Subnet"),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True,
help_text=_('Source IP address or subnet'))
destination_ip_address = forms.IPField(
label=_('Destination IP Address/Subnet'),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True,
help_text=_('Destination IP address or subnet'))
source_port = forms.CharField(
max_length=80,
label=_("Source Port/Port Range"),
required=False,
validators=[port_validator],
help_text=_('Source port (integer in [1, 65535] or range in a:b)'))
destination_port = forms.CharField(
max_length=80,
label=_("Destination Port/Port Range"),
required=False,
validators=[port_validator],
help_text=_('Destination port (integer in [1, 65535] or range'
' in a:b)'))
ip_version = forms.ThemableChoiceField(
label=_("IP Version"),
choices=[('4', '4'), ('6', '6')],
help_text=_('IP Version for Firewall Rule'))
shared = forms.BooleanField(label=_("Shared"), required=False)
enabled = forms.BooleanField(label=_("Enabled"), required=False)
failure_url = 'horizon:project:firewalls_v2:index'
def _convert_req_body(self, body):
for key in ['source_port', 'source_ip_address',
'destination_port', 'destination_ip_address']:
if key in body and not body[key]:
body[key] = None
if body.get('protocol') == 'any':
body['protocol'] = None
return body
def handle(self, request, context):
rule_id = self.initial['rule_id']
name_or_id = context.get('name') or rule_id
body = self._convert_req_body(_get_request_body(context, self.initial))
try:
rule = api_fwaas_v2.rule_update(request, rule_id, **body)
msg = _('Rule %s was successfully updated.') % name_or_id
messages.success(request, msg)
return rule
except Exception as e:
msg = (_('Failed to update rule %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class UpdatePolicy(forms.SelfHandlingForm):
name = forms.CharField(max_length=80, label=_("Name"), required=False)
description = forms.CharField(required=False,
max_length=80, label=_("Description"))
shared = forms.BooleanField(label=_("Shared"), required=False)
audited = forms.BooleanField(label=_("Audited"), required=False)
failure_url = 'horizon:project:firewalls_v2:index'
def handle(self, request, context):
policy_id = self.initial['policy_id']
name_or_id = context.get('name') or policy_id
body = _get_request_body(context, self.initial)
try:
policy = api_fwaas_v2.policy_update(request, policy_id, **body)
msg = _('Policy %s was successfully updated.') % name_or_id
messages.success(request, msg)
return policy
except Exception as e:
msg = (_('Failed to update policy %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class UpdateFirewall(forms.SelfHandlingForm):
name = forms.CharField(max_length=80,
label=_("Name"),
required=False)
description = forms.CharField(max_length=80,
label=_("Description"),
required=False)
ingress_firewall_policy_id = forms.ThemableChoiceField(
label=_("Ingress Policy"), required=False)
egress_firewall_policy_id = forms.ThemableChoiceField(
label=_("Egress Policy"), required=False)
admin_state_up = forms.BooleanField(label=_("Admin State"),
required=False)
shared = forms.BooleanField(label=_("Shared"), required=False)
failure_url = 'horizon:project:firewalls_v2:index'
def __init__(self, request, *args, **kwargs):
super(UpdateFirewall, self).__init__(request, *args, **kwargs)
try:
tenant_id = self.request.user.tenant_id
policies = api_fwaas_v2.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=attrgetter('name'))
except Exception:
exceptions.handle(request, _('Unable to retrieve policy list.'))
policies = []
egress_policy_id_choices = []
ingress_policy_id_choices = []
ingress_policy_id = kwargs['initial']['ingress_firewall_policy_id']
if ingress_policy_id:
ingress_policy_name = [
p.name for p in policies if p.id == ingress_policy_id][0]
ingress_policy_id_choices.append(
(ingress_policy_id, ingress_policy_name))
egress_policy_id = kwargs['initial']['egress_firewall_policy_id']
if egress_policy_id:
egress_policy_name = [
p.name for p in policies if p.id == egress_policy_id][0]
egress_policy_id_choices.append((egress_policy_id,
egress_policy_name))
ingress_policy_id_choices.append(('', _('None')))
egress_policy_id_choices.append(('', _('None')))
for p in policies:
if p.id != ingress_policy_id:
ingress_policy_id_choices.append((p.id, p.name_or_id))
if p.id != egress_policy_id:
egress_policy_id_choices.append((p.id, p.name_or_id))
self.fields['ingress_firewall_policy_id'].choices = \
ingress_policy_id_choices
self.fields['egress_firewall_policy_id'].choices = \
egress_policy_id_choices
def _convert_req_body(self, body):
for key in ['ingress_firewall_policy_id', 'egress_firewall_policy_id']:
if key in body and not body[key]:
body[key] = None
return body
def handle(self, request, context):
firewallgroup_id = self.initial['firewallgroup_id']
name_or_id = context.get('name') or firewallgroup_id
body = self._convert_req_body(_get_request_body(context, self.initial))
try:
firewall = api_fwaas_v2.firewall_update(request, firewallgroup_id,
**body)
msg = _('Firewall %s was successfully updated.') % name_or_id
messages.success(request, msg)
return firewall
except Exception as e:
msg = (_('Failed to update firewall %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class AddPort(forms.SelfHandlingForm):
failure_url = 'horizon:project:firewalls_v2:index'
port_id = forms.ThemableChoiceField(
label=_("Ports"), required=False)
def __init__(self, request, *args, **kwargs):
super(AddPort, self).__init__(request, *args, **kwargs)
try:
tenant_id = self.request.user.tenant_id
ports = api_fwaas_v2.fwg_port_list_for_tenant(request, tenant_id)
initial_ports = self.initial['ports']
filtered_ports = [port for port in ports
if port.id not in initial_ports]
filtered_ports = sorted(filtered_ports, key=attrgetter('name'))
except Exception:
exceptions.handle(request, _('Unable to retrieve port list.'))
ports = []
current_choices = [(p.id, p.name_or_id) for p in filtered_ports]
self.fields['port_id'].choices = current_choices
def handle(self, request, context):
firewallgroup_id = self.initial['firewallgroup_id']
name_or_id = context.get('name') or firewallgroup_id
body = _get_request_body(context, self.initial)
add_port = context['port_id']
if add_port:
ports = self.initial['ports']
ports.append(add_port)
body['ports'] = ports
try:
firewallgroup = api_fwaas_v2.firewall_update(
request, firewallgroup_id, **body)
msg = _('FirewallGroup %s was successfully updated.') % name_or_id
messages.success(request, msg)
return firewallgroup
except Exception as e:
msg = (_('Failed to update firewallgroup %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class RemovePort(forms.SelfHandlingForm):
failure_url = 'horizon:project:firewalls_v2:index'
port_id = forms.ThemableChoiceField(
label=_("Ports"), required=False)
def __init__(self, request, *args, **kwargs):
super(RemovePort, self).__init__(request, *args, **kwargs)
try:
ports = self.initial['ports']
except Exception:
exceptions.handle(request, _('Unable to retrieve port list.'))
ports = []
current_choices = [(p, p) for p in ports]
self.fields['port_id'].choices = current_choices
def handle(self, request, context):
firewallgroup_id = self.initial['firewallgroup_id']
name_or_id = context.get('name') or firewallgroup_id
body = _get_request_body(context, self.initial)
remove_port = context['port_id']
if remove_port:
ports = self.initial['ports']
ports.remove(remove_port)
body['ports'] = ports
try:
firewallgroup = api_fwaas_v2.firewall_update(
request, firewallgroup_id, **body)
msg = _('FirewallGroup %s was successfully updated.') % name_or_id
messages.success(request, msg)
return firewallgroup
except Exception as e:
msg = (_('Failed to update firewallgroup %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class InsertRuleToPolicy(forms.SelfHandlingForm):
firewall_rule_id = forms.ThemableChoiceField(label=_("Insert Rule"))
insert_before = forms.ThemableChoiceField(label=_("Before"),
required=False)
insert_after = forms.ThemableChoiceField(label=_("After"),
required=False)
failure_url = 'horizon:project:firewalls_v2:index'
def __init__(self, request, *args, **kwargs):
super(InsertRuleToPolicy, self).__init__(request, *args, **kwargs)
try:
tenant_id = self.request.user.tenant_id
all_rules = api_fwaas_v2.rule_list_for_tenant(request, tenant_id)
all_rules = sorted(all_rules, key=attrgetter('name_or_id'))
available_rules = [r for r in all_rules]
current_rules = []
for x in kwargs['initial']['firewall_rules']:
r_obj = [rule for rule in all_rules if x == rule.id][0]
current_rules.append(r_obj)
available_choices = [(r.id, r.name_or_id) for r in available_rules]
current_choices = [(r.id, r.name_or_id) for r in current_rules]
except Exception as e:
msg = _('Failed to retrieve available rules: %s') % e
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
self.fields['firewall_rule_id'].choices = available_choices
self.fields['insert_before'].choices = [('', _('-'))] + current_choices
self.fields['insert_after'].choices = [('', _('-'))] + current_choices
def handle(self, request, context):
policy_id = self.initial['policy_id']
policy_name_or_id = self.initial['name'] or policy_id
try:
insert_rule_id = context['firewall_rule_id']
insert_rule = api_fwaas_v2.rule_get(request, insert_rule_id)
body = {'firewall_rule_id': insert_rule_id,
'insert_before': context['insert_before'],
'insert_after': context['insert_after']}
policy = api_fwaas_v2.policy_insert_rule(request, policy_id,
**body)
msg = (_('Rule %(rule)s was successfully inserted to policy '
'%(policy)s.') %
{'rule': insert_rule.name or insert_rule.id,
'policy': policy_name_or_id})
messages.success(request, msg)
return policy
except Exception as e:
msg = (_('Failed to insert rule to policy %(name)s: %(reason)s') %
{'name': policy_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class RemoveRuleFromPolicy(forms.SelfHandlingForm):
firewall_rule_id = forms.ThemableChoiceField(label=_("Remove Rule"))
failure_url = 'horizon:project:firewalls_v2:index'
def __init__(self, request, *args, **kwargs):
super(RemoveRuleFromPolicy, self).__init__(request, *args, **kwargs)
try:
tenant_id = request.user.tenant_id
all_rules = api_fwaas_v2.rule_list_for_tenant(request, tenant_id)
current_rules = []
for r in kwargs['initial']['firewall_rules']:
r_obj = [rule for rule in all_rules if r == rule.id][0]
current_rules.append(r_obj)
current_choices = [(r.id, r.name_or_id) for r in current_rules]
except Exception as e:
msg = (_('Failed to retrieve current rules in policy %(name)s: '
'%(reason)s') %
{'name': self.initial['name'], 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
self.fields['firewall_rule_id'].choices = current_choices
def handle(self, request, context):
policy_id = self.initial['policy_id']
policy_name_or_id = self.initial['name'] or policy_id
try:
remove_rule_id = context['firewall_rule_id']
remove_rule = api_fwaas_v2.rule_get(request, remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
policy = api_fwaas_v2.policy_remove_rule(request, policy_id,
**body)
msg = (_('Rule %(rule)s was successfully removed from policy '
'%(policy)s.') %
{'rule': remove_rule.name or remove_rule.id,
'policy': policy_name_or_id})
messages.success(request, msg)
return policy
except Exception as e:
msg = (_('Failed to remove rule from policy %(name)s: %(reason)s')
% {'name': self.initial['name'], 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
def _get_request_body(context, initial_values):
body = {}
for key, value in context.items():
# TODO(yushiro): Refactor after Q-2.
if key == 'port_id':
continue
if value != initial_values[key]:
body[key] = value
return body

View File

@ -0,0 +1,43 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.utils.translation import ugettext_lazy as _
import horizon
from openstack_dashboard.api import neutron
LOG = logging.getLogger(__name__)
class Firewall_V2(horizon.Panel):
name = _("Firewall Groups")
slug = "firewalls_v2"
permissions = ('openstack.services.network',)
def allowed(self, context):
request = context['request']
if not request.user.has_perms(self.permissions):
return False
try:
if not neutron.is_extension_supported(request, 'fwaas_v2'):
return False
except Exception:
LOG.error("Call to list enabled services failed. This is likely "
"due to a problem communicating with the Neutron "
"endpoint. Firewalls panel will not be displayed.")
return False
if not super(Firewall_V2, self).allowed(context):
return False
return True

View File

@ -0,0 +1,479 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.core.urlresolvers import reverse
from django.template import defaultfilters as filters
from django.utils.translation import pgettext_lazy
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext_lazy
from horizon import exceptions
from horizon import tables
from openstack_dashboard import policy
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
LOG = logging.getLogger(__name__)
class AddRuleLink(tables.LinkAction):
name = "addrule"
verbose_name = _("Add Rule")
url = "horizon:project:firewalls_v2:addrule"
classes = ("ajax-modal",)
icon = "plus"
policy_rules = (("neutron-fwaas", "create_fwaas_firewall_rule"),)
class AddPolicyLink(tables.LinkAction):
name = "addpolicy"
verbose_name = _("Add Policy")
url = "horizon:project:firewalls_v2:addpolicy"
classes = ("ajax-modal", "btn-addpolicy",)
icon = "plus"
policy_rules = (("neutron-fwaas", "create_fwaas_firewall_policy"),)
class AddFirewallGroupLink(tables.LinkAction):
name = "addfirewallgroup"
verbose_name = _("Create Firewall Group")
url = "horizon:project:firewalls_v2:addfirewallgroup"
classes = ("ajax-modal",)
icon = "plus"
policy_rules = (("neutron-fwaas", "create_fwaas_firewall_group"),)
class DeleteRuleLink(policy.PolicyTargetMixin, tables.DeleteAction):
name = "deleterule"
policy_rules = (("neutron-fwaas", "delete_fwaas_firewall_rule"),)
@staticmethod
def action_present(count):
return ungettext_lazy(
u"Delete Rule",
u"Delete Rules",
count
)
@staticmethod
def action_past(count):
return ungettext_lazy(
u"Scheduled deletion of Rule",
u"Scheduled deletion of Rules",
count
)
def allowed(self, request, datum=None):
# TODO(Sarath Mekala): If the rule is associated with a policy then
# return false.
return True
def delete(self, request, obj_id):
try:
api_fwaas_v2.rule_delete(request, obj_id)
except Exception as e:
exceptions.handle(request, _('Unable to delete rule. %s') % e)
class DeletePolicyLink(policy.PolicyTargetMixin, tables.DeleteAction):
name = "deletepolicy"
policy_rules = (("neutron-fwaas", "delete_fwaas_firewall_policy"),)
@staticmethod
def action_present(count):
return ungettext_lazy(
u"Delete Policy",
u"Delete Policies",
count
)
@staticmethod
def action_past(count):
return ungettext_lazy(
u"Scheduled deletion of Policy",
u"Scheduled deletion of Policies",
count
)
def delete(self, request, obj_id):
try:
api_fwaas_v2.policy_delete(request, obj_id)
except Exception as e:
exceptions.handle(request, _('Unable to delete policy. %s') % e)
class DeleteFirewallGroupLink(policy.PolicyTargetMixin,
tables.DeleteAction):
name = "deletefirewallgroup"
policy_rules = (("neutron-fwaas", "delete_fwaas_firewall_group"),)
@staticmethod
def action_present(count):
return ungettext_lazy(
u"Delete Firewall Group",
u"Delete Firewall Groups",
count
)
@staticmethod
def action_past(count):
return ungettext_lazy(
u"Scheduled deletion of Firewall Group",
u"Scheduled deletion of Firewall Groups",
count
)
def delete(self, request, obj_id):
try:
api_fwaas_v2.firewall_delete(request, obj_id)
except Exception as e:
exceptions.handle(request, _('Unable to delete firewall. %s') % e)
class UpdateRuleLink(policy.PolicyTargetMixin, tables.LinkAction):
name = "updaterule"
verbose_name = _("Edit Rule")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_fwaas_firewall_rule"),)
def get_link_url(self, rule):
return reverse("horizon:project:firewalls_v2:updaterule",
kwargs={'rule_id': rule.id})
class UpdatePolicyLink(policy.PolicyTargetMixin, tables.LinkAction):
name = "updatepolicy"
verbose_name = _("Edit Policy")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_fwaas_firewall_policy"),)
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:updatepolicy",
kwargs={'policy_id': policy.id})
class UpdateFirewallGroupLink(policy.PolicyTargetMixin, tables.LinkAction):
name = "updatefirewall"
verbose_name = _("Edit Firewall Group")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_firewall"),)
def get_link_url(self, firewallgroup):
return reverse("horizon:project:firewalls_v2:updatefirewall",
kwargs={'firewall_id': firewallgroup.id})
def allowed(self, request, firewallgroup):
return firewallgroup.status not in (
"PENDING_CREATE",
"PENDING_UPDATE",
"PENDING_DELETE")
class InsertRuleToPolicyLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "insertrule"
verbose_name = _("Insert Rule")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "show_fwaas_firewall_policy"),
("neutron-fwaas", "insert_rule_fwaas_firewall_policy"),)
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:insertrule",
kwargs={'policy_id': policy.id})
class RemoveRuleFromPolicyLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "removerule"
verbose_name = _("Remove Rule")
classes = ("ajax-modal",)
policy_rules = (("neutron-fwaas", "show_fwaas_firewall_policy"),
("neutron-fwaas", "firewall_policy_remove_rule"),)
action_type = "danger"
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:removerule",
kwargs={'policy_id': policy.id})
def allowed(self, request, policy):
return bool(policy.rules)
class AddPortToFirewallGroupLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "addport"
verbose_name = _("Add Port")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_fwaas_firewall_group"),)
def get_link_url(self, firewallgroup):
return reverse("horizon:project:firewalls_v2:addport",
kwargs={'firewallgroup_id': firewallgroup.id})
def allowed(self, request, firewallgroup):
return firewallgroup.status not in (
"PENDING_CREATE",
"PENDING_UPDATE",
"PENDING_DELETE")
class RemovePortFromFirewallGroupLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "removeport"
verbose_name = _("Remove Port")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_fwaas_firewall_group"),)
def get_link_url(self, firewallgroup):
return reverse("horizon:project:firewalls_v2:removeport",
kwargs={'firewallgroup_id': firewallgroup.id})
def allowed(self, request, firewallgroup):
return firewallgroup.status not in (
"PENDING_CREATE",
"PENDING_UPDATE",
"PENDING_DELETE")
class AddEgressPolicyToFirewallLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "addegresspolicy"
verbose_name = _("Add Egress Policy")
classes = ("ajax-modal",)
policy_rules = (("neutron-fwaas", "show_fwaas_firewall_policy"),
("neutron-fwaas", "firewall_policy_remove_rule"),)
action_type = "danger"
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:removerule",
kwargs={'policy_id': policy.id})
class RemoveEgressPolicyToFirewallLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "removeegresspolicy"
verbose_name = _("Remove Egress Policy")
classes = ("ajax-modal",)
policy_rules = (("neutron-fwaas", "show_fwaas_firewall_policy"),
("neutron-fwaas", "firewall_policy_remove_rule"),)
action_type = "danger"
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:removerule",
kwargs={'policy_id': policy.id})
class AddIngressPolicyToFirewallLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "addingresspolicy"
verbose_name = _("Add Ingress Policy")
classes = ("ajax-modal",)
policy_rules = (("neutron-fwaas", "show_fwaas_firewall_policy"),
("neutron-fwaas", "firewall_policy_remove_rule"),)
action_type = "danger"
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:removerule",
kwargs={'policy_id': policy.id})
class RemoveIngressPolicyToFirewallLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "removeingresspolicy"
verbose_name = _("Remove Ingress Policy")
classes = ("ajax-modal",)
policy_rules = (("neutron-fwaas", "show_fwaas_firewall_policy"),
("neutron-fwaas", "firewall_policy_remove_rule"),)
action_type = "danger"
def get_link_url(self, policy):
return reverse("horizon:project:firewalls_v2:removerule",
kwargs={'policy_id': policy.id})
def get_rules_name(datum):
return ', '.join([rule.name or rule.id[:13]
for rule in datum.rules])
def get_ports_name(datum):
return ', '.join([port[:13]
for port in datum.ports])
def get_policy_name(datum):
if datum.policy:
return datum.policy.name or datum.policy.id
def get_policy_link(datum):
if datum.policy:
return reverse('horizon:project:firewalls_v2:policydetails',
kwargs={'policy_id': datum.policy.id})
def get_ingress_policy_link(datum):
if datum.ingress_firewall_policy_id:
return reverse('horizon:project:firewalls_v2:policydetails',
kwargs={'policy_id': datum.ingress_firewall_policy_id})
def get_egress_policy_link(datum):
if datum.egress_firewall_policy_id:
return reverse('horizon:project:firewalls_v2:policydetails',
kwargs={'policy_id': datum.egress_firewall_policy_id})
def get_ingress_policy_name(datum):
if datum.ingress_firewall_policy_id:
return datum.ingress_policy.name
def get_egress_policy_name(datum):
if datum.egress_firewall_policy_id:
return datum.egress_policy.name
class RulesTable(tables.DataTable):
ACTION_DISPLAY_CHOICES = (
("Allow", pgettext_lazy("Action Name of a Firewall Rule", u"ALLOW")),
("Deny", pgettext_lazy("Action Name of a Firewall Rule", u"DENY")),
("Reject", pgettext_lazy("Action Name of a Firewall Rule", u"REJECT")),
)
name = tables.Column("name_or_id",
verbose_name=_("Name"),
link="horizon:project:firewalls_v2:ruledetails")
description = tables.Column('description', verbose_name=_('Description'))
protocol = tables.Column("protocol",
filters=(lambda v: filters.default(v, _("ANY")),
filters.upper,),
verbose_name=_("Protocol"))
source_ip_address = tables.Column("source_ip_address",
verbose_name=_("Source IP"))
source_port = tables.Column("source_port",
verbose_name=_("Source Port"))
destination_ip_address = tables.Column("destination_ip_address",
verbose_name=_("Destination IP"))
destination_port = tables.Column("destination_port",
verbose_name=_("Destination Port"))
action = tables.Column("action",
display_choices=ACTION_DISPLAY_CHOICES,
verbose_name=_("Action"))
shared = tables.Column("shared",
verbose_name=_("Shared"),
filters=(filters.yesno, filters.capfirst))
enabled = tables.Column("enabled",
verbose_name=_("Enabled"),
filters=(filters.yesno, filters.capfirst))
class Meta(object):
name = "rulestable"
verbose_name = _("Rules")
table_actions = (AddRuleLink,
DeleteRuleLink,
tables.NameFilterAction)
row_actions = (UpdateRuleLink, DeleteRuleLink)
class PoliciesTable(tables.DataTable):
name = tables.Column("name_or_id",
verbose_name=_("Name"),
link="horizon:project:firewalls_v2:policydetails")
description = tables.Column('description', verbose_name=_('Description'))
firewall_rules = tables.Column(get_rules_name,
verbose_name=_("Rules"))
shared = tables.Column("shared",
verbose_name=_("Shared"),
filters=(filters.yesno, filters.capfirst))
audited = tables.Column("audited",
verbose_name=_("Audited"),
filters=(filters.yesno, filters.capfirst))
class Meta(object):
name = "policiestable"
verbose_name = _("Policies")
table_actions = (AddPolicyLink,
DeletePolicyLink,
tables.NameFilterAction)
row_actions = (UpdatePolicyLink, InsertRuleToPolicyLink,
RemoveRuleFromPolicyLink, DeletePolicyLink)
class FirewallGroupsTable(tables.DataTable):
STATUS_DISPLAY_CHOICES = (
("Active", pgettext_lazy("Current status of a Firewall Group",
u"Active")),
("Down", pgettext_lazy("Current status of a Firewall Group",
u"Down")),
("Error", pgettext_lazy("Current status of a Firewall Group",
u"Error")),
("Created", pgettext_lazy("Current status of a Firewall Group",
u"Created")),
("Pending_Create", pgettext_lazy("Current status of a Firewall Group",
u"Pending Create")),
("Pending_Update", pgettext_lazy("Current status of a Firewall Group",
u"Pending Update")),
("Pending_Delete", pgettext_lazy("Current status of a Firewall Group",
u"Pending Delete")),
("Inactive", pgettext_lazy("Current status of a Firewall Group",
u"Inactive")),
)
ADMIN_STATE_DISPLAY_CHOICES = (
("UP", pgettext_lazy("Admin state of a Firewall Group", u"UP")),
("DOWN", pgettext_lazy("Admin state of a Firewall Group", u"DOWN")),
)
name = tables.Column(
"name_or_id",
verbose_name=_("Name"),
link="horizon:project:firewalls_v2:firewallgroupdetails")
description = tables.Column('description', verbose_name=_('Description'))
ingress_firewall_policy_id = tables.Column(
get_ingress_policy_name,
link=get_ingress_policy_link,
verbose_name=_("Ingress Policy"))
egress_firewall_policy_id = tables.Column(get_egress_policy_name,
link=get_egress_policy_link,
verbose_name=_("Egress Policy"))
ports = tables.Column(get_ports_name,
verbose_name=_("Ports"))
status = tables.Column("status",
verbose_name=_("Status"),
display_choices=STATUS_DISPLAY_CHOICES)
admin_state_up = tables.Column("admin_state_up",
verbose_name=_("Admin State"))
shared = tables.Column("shared",
verbose_name=_("Shared"),
filters=(filters.yesno, filters.capfirst))
class Meta(object):
name = "FirewallGroupsTable"
verbose_name = _("Firewall Groups")
table_actions = (AddFirewallGroupLink,
DeleteFirewallGroupLink,
tables.NameFilterAction)
row_actions = (
UpdateFirewallGroupLink,
DeleteFirewallGroupLink,
AddPortToFirewallGroupLink,
RemovePortFromFirewallGroupLink)
def __init__(self, request, data=None, needs_form_wrapper=None, **kwargs):
super(FirewallGroupsTable, self).__init__(
request, data=data,
needs_form_wrapper=needs_form_wrapper, **kwargs)

View File

@ -0,0 +1,144 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import tabs
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
from neutron_fwaas_dashboard.dashboards.project.firewalls_v2 import tables
FirewallGroupsTable = tables.FirewallGroupsTable
PoliciesTable = tables.PoliciesTable
RulesTable = tables.RulesTable
class RulesTab(tabs.TableTab):
table_classes = (RulesTable,)
name = _("Firewall Rules")
slug = "rules"
template_name = "horizon/common/_detail_table.html"
def get_rulestable_data(self):
try:
tenant_id = self.request.user.tenant_id
request = self.tab_group.request
rules = api_fwaas_v2.rule_list_for_tenant(request, tenant_id)
except Exception:
rules = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve rules list.'))
return rules
class PoliciesTab(tabs.TableTab):
table_classes = (PoliciesTable,)
name = _("Firewall Policies")
slug = "policies"
template_name = "horizon/common/_detail_table.html"
def get_policiestable_data(self):
try:
tenant_id = self.request.user.tenant_id
request = self.tab_group.request
policies = api_fwaas_v2.policy_list_for_tenant(request, tenant_id)
except Exception:
policies = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve policies list.'))
return policies
class FirewallGroupsTab(tabs.TableTab):
table_classes = (FirewallGroupsTable,)
name = _("Firewall Groups")
slug = "firewallgroups"
template_name = "horizon/common/_detail_table.html"
def get_policy_dict(self, policies):
return dict((policy.id, policy) for policy in policies)
def get_FirewallGroupsTable_data(self):
try:
tenant_id = self.request.user.tenant_id
request = self.tab_group.request
fw_groups = api_fwaas_v2.firewall_list_for_tenant(request,
tenant_id)
tenant_policies = api_fwaas_v2.policy_list_for_tenant(
request, tenant_id)
policy_dict = self.get_policy_dict(policies=tenant_policies)
for fw_group in fw_groups:
if fw_group['ingress_firewall_policy_id'] in policy_dict:
fw_group.ingress_policy = \
policy_dict[fw_group['ingress_firewall_policy_id']]
if fw_group['egress_firewall_policy_id'] in policy_dict:
fw_group.egress_policy = \
policy_dict[fw_group['egress_firewall_policy_id']]
except Exception:
fw_groups = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve firewall list.'))
return fw_groups
class RuleDetailsTab(tabs.Tab):
name = _("Rule")
slug = "ruledetails"
template_name = "project/firewalls_v2/_rule_details.html"
def get_context_data(self, request):
return {"rule": self.tab_group.kwargs['rule']}
class PolicyDetailsTab(tabs.Tab):
name = _("Policy")
slug = "policydetails"
template_name = "project/firewalls_v2/_policy_details.html"
def get_context_data(self, request):
return {"policy": self.tab_group.kwargs['policy']}
class FirewallGroupDetailsTab(tabs.Tab):
name = _("FirewallGroup")
slug = "firewallgroupdetails"
template_name = "project/firewalls_v2/_firewallgroup_details.html"
def get_context_data(self, request):
return {"firewall_group": self.tab_group.kwargs['firewallgroup']}
class FirewallGroupTabs(tabs.TabGroup):
slug = "fwtabs"
tabs = (FirewallGroupsTab, PoliciesTab, RulesTab)
sticky = True
class RuleDetailsTabs(tabs.TabGroup):
slug = "ruletabs"
tabs = (RuleDetailsTab,)
class PolicyDetailsTabs(tabs.TabGroup):
slug = "policytabs"
tabs = (PolicyDetailsTab,)
class FirewallGroupDetailsTabs(tabs.TabGroup):
slug = "firewallgrouptabs"
tabs = (FirewallGroupDetailsTab,)

View File

@ -0,0 +1,7 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may add ports to firewall group here." %}</p>
{% endblock %}

View File

@ -0,0 +1,37 @@
{% load i18n sizeformat parse_date %}
<div class="detail">
<dl class="dl-horizontal">
<dt>{% trans "Name" %}</dt>
<dd>{{ firewall_group.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ firewall_group.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ firewall_group.id }} </dd>
<dt>{% trans "Project ID" %}</dt>
<dd>{{ firewall_group.tenant_id }}</dd>
<dt>{% trans "Ingress Policy ID" %}</dt>
<dd>
{% url 'horizon:project:firewalls_v2:policydetails' firewall_group.ingress_firewall_policy_id as policy_url %}
<a href="{{ policy_url }}">{{firewall_group.ingress_policy.name|default:firewall_group.ingress_firewall_policy_id }}</a>
</dd>
<dt>{% trans "Egress Policy ID" %}</dt>
<dd>
{% url 'horizon:project:firewalls_v2:policydetails' firewall_group.egress_firewall_policy_id as policy_url %}
<a href="{{ policy_url }}">{{firewall_group.egress_policy.name|default:firewall_group.egress_firewall_policy_id }}</a>
</dd>
<dt>{% trans "Status" %}</dt>
<dd>{{ firewall_group.status }}</dd>
<dt>{% trans "Admin State Up" %}</dt>
<dd>{{ firewall_group.admin_state_up}}</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ firewall_group.shared|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -0,0 +1,12 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% blocktrans trimmed %}
Choose the rule you want to insert.
Specify either the rule you want to insert immediately before,
or the rule to insert immediately after.
If both are specified, the prior takes precedence.
{% endblocktrans %}</p>
{% endblock %}

View File

@ -0,0 +1,35 @@
{% load i18n sizeformat parse_date %}
<div class="detail">
<dl class="dl-horizontal">
<dt>{% trans "Name" %}</dt>
<dd>{{ policy.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ policy.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ policy.id }}</dd>
<dt>{% trans "Project ID" %}</dt>
<dd>{{ policy.tenant_id }}</dd>
<dt>{% trans "Rules" %}</dt>
<dd>
{% if policy.rules %}
{% for rule in policy.rules %}
{% url 'horizon:project:firewalls_v2:ruledetails' rule.id as rule_url %}
{{ rule.position }} : <a href="{{ rule_url }}">{{ rule.name|default:rule.id }}</a><br>
{% endfor %}
{% else %}
{% trans "-" %}
{% endif %}
</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ policy.shared|yesno|capfirst }}</dd>
<dt>{% trans "Audited" %}</dt>
<dd>{{ policy.audited|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -0,0 +1,7 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "Choose the rule you want to remove." %}</p>
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may remove ports from firewall group here." %}</p>
{% endblock %}

View File

@ -0,0 +1,41 @@
{% load i18n sizeformat parse_date %}
<div class="detail">
<dl class="dl-horizontal">
<dt>{% trans "Name" %}</dt>
<dd>{{ rule.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ rule.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ rule.id }}</dd>
<dt>{% trans "Project ID" %}</dt>
<dd>{{ rule.tenant_id }}</dd>
<dt>{% trans "Action" %}</dt>
<dd>{{ rule.action|upper }}</dd>
<dt>{% trans "Protocol" %}</dt>
<dd>{{ rule.protocol|default:_("ANY")|upper }}</dd>
<dt>{% trans "Source IP Address" %}</dt>
<dd>{{ rule.source_ip_address|default:_("ANY") }}</dd>
<dt>{% trans "Source Port" %}</dt>
<dd>{{ rule.source_port|default:_("ANY") }}</dd>
<dt>{% trans "Destination IP Address" %}</dt>
<dd>{{ rule.destination_ip_address|default:_("ANY") }}</dd>
<dt>{% trans "Destination Port"%}</dt>
<dd>{{ rule.destination_port|default:_("ANY") }}</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ rule.shared|yesno|capfirst }}</dd>
<dt>{% trans "Enabled" %}</dt>
<dd>{{ rule.enabled|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -0,0 +1,3 @@
{% load i18n %}
<p>{% blocktrans %}Choose port(s) from Available Ports. {% endblocktrans %}</p>

View File

@ -0,0 +1,23 @@
{% load i18n %}
<noscript><h3>{{ step }}</h3></noscript>
<div id="portListSortContainer">
<div class="col-sm-6">
<h4>{% trans "Available Ports" %}</h4>
<ul id="available_port" class="portlist"></ul>
</div>
<div class="col-sm-6">
{% include "project/firewalls_v2/_update_port_help.html" %}
</div>
</div>
<div id="portListIdContainer">
<div class="actions">
<div id="portListId">
{% include "horizon/common/_form_fields.html" %}
</div>
</div>
<div class="help_text">
{{ step.get_help_text }}
</div>
</div>

View File

@ -0,0 +1,6 @@
{% load i18n horizon %}
<p>{% blocktrans trimmed %}
Choose rule(s) from Available Rules to Selected Rule by push button or
drag and drop, you may change their order by drag and drop as well.
{% endblocktrans %}</p>

View File

@ -0,0 +1,25 @@
{% load i18n %}
<noscript><h3>{{ step }}</h3></noscript>
<div id="ruleListSortContainer">
<div class="col-sm-6">
<h4 id="selected_rule_label">{% trans "Selected Rules" %}</h4>
<ul id="selected_rule" class="rulelist"></ul>
<h4>{% trans "Available Rules" %}</h4>
<ul id="available_rule" class="rulelist"></ul>
</div>
<div class="col-sm-6">
{% include "project/firewalls_v2/_update_rule_help.html" %}
</div>
</div>
<div id="ruleListIdContainer">
<div class="actions">
<div id="ruleListId">
{% include "horizon/common/_form_fields.html" %}
</div>
</div>
<div class="help_text">
{{ step.get_help_text }}
</div>
</div>

View File

@ -0,0 +1,7 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may update firewall group details here." %}</p>
{% endblock %}

View File

@ -0,0 +1,10 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% blocktrans trimmed %}
You may update policy details here.
Use 'Insert Rule' or 'Remove Rule' links instead to insert or remove a rule.
{% endblocktrans %}</p>
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may update rule details here." %}</p>
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add New Firewall Group" %}{% endblock %}
{% block main %}
{% include 'horizon/common/_workflow.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add New Policy" %}{% endblock %}
{% block main %}
{% include 'horizon/common/_workflow.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add Port to Firewall Group" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_addport.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add New Rule" %}{% endblock %}
{% block main %}
{% include 'horizon/common/_workflow.html' %}
{% endblock %}

View File

@ -0,0 +1,11 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Firewall Groups" %}{% endblock %}
{% block main %}
<div class="row">
<div class="col-sm-12">
{{ tab_group.render }}
</div>
</div>
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Insert Rule to Policy" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_insert_rule_to_policy.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Remove Rule from Policy" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_remove_rule_from_policy.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Remove Port from Firewall Group" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_removeport.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Edit Firewall Group" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_updatefirewall.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Edit Policy" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_updatepolicy.html' %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Edit Rule" %}{% endblock %}
{% block main %}
{% include 'project/firewalls_v2/_updaterule.html' %}
{% endblock %}

View File

@ -0,0 +1,740 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.core.urlresolvers import reverse
from django import http
from mox3.mox import IsA
from openstack_dashboard.api import neutron as api_neutron
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
from neutron_fwaas_dashboard.test import helpers as test
class FirewallTests(test.TestCase):
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
INDEX_URL = reverse('horizon:project:firewalls_v2:index')
ADDRULE_PATH = 'horizon:project:firewalls_v2:addrule'
ADDPOLICY_PATH = 'horizon:project:firewalls_v2:addpolicy'
ADDFIREWALLGROUP_PATH = 'horizon:project:firewalls_v2:addfirewallgroup'
RULE_DETAIL_PATH = 'horizon:project:firewalls_v2:ruledetails'
POLICY_DETAIL_PATH = 'horizon:project:firewalls_v2:policydetails'
FIREWALLGROUP_DETAIL_PATH = \
'horizon:project:firewalls_v2:firewallgroupdetails'
UPDATERULE_PATH = 'horizon:project:firewalls_v2:updaterule'
UPDATEPOLICY_PATH = 'horizon:project:firewalls_v2:updatepolicy'
UPDATEFIREWALLGROUP_PATH = 'horizon:project:firewalls_v2:updatefirewall'
INSERTRULE_PATH = 'horizon:project:firewalls_v2:insertrule'
REMOVERULE_PATH = 'horizon:project:firewalls_v2:removerule'
ADDPORT_PATH = 'horizon:project:firewalls_v2:addport'
REMOVEPORT_PATH = 'horizon:project:firewalls_v2:removeport'
def set_up_expect(self):
tenant_id = self.tenant.id
# retrieves firewallgroups
firewallgroups = self.firewall_groups_v2.list()
api_fwaas_v2.firewall_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(firewallgroups)
# retrieves policies
# TODO(amotoki): get_firewallgroupstable_data() also calls
# policy_list_for_tenant(). This needs to be clean up.
policies = self.fw_policies_v2.list()
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
# retrieve rules
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id).AndReturn(self.fw_rules_v2.list())
def set_up_expect_with_exception(self):
tenant_id = self.tenant.id
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest),
tenant_id).AndRaise(self.exceptions.neutron)
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest),
tenant_id).AndRaise(self.exceptions.neutron)
api_fwaas_v2.firewall_list_for_tenant(
IsA(http.HttpRequest),
tenant_id).AndRaise(self.exceptions.neutron)
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',),
api_neutron: ('is_extension_supported',), })
def test_index_firewallgroups(self):
self.set_up_expect()
self.mox.ReplayAll()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL, tenant_id=tenant_id)
self.assertTemplateUsed(res, 'project/firewalls_v2/details_tabs.html')
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data),
len(self.firewall_groups_v2.list()))
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',),
api_neutron: ('is_extension_supported',), })
def test_index_policies(self):
self.set_up_expect()
self.mox.ReplayAll()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__policies',
tenant_id=tenant_id)
self.assertTemplateUsed(res, 'project/firewalls_v2/details_tabs.html')
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data),
len(self.fw_policies_v2.list()))
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',),
api_neutron: ('is_extension_supported',), })
def test_index_rules(self):
self.set_up_expect()
self.mox.ReplayAll()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__rules',
tenant_id=tenant_id)
self.assertTemplateUsed(res, 'project/firewalls_v2/details_tabs.html')
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['rulestable_table'].data),
len(self.fw_rules_v2.list()))
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant'),
api_neutron: ('is_extension_supported',), })
def test_index_exception_firewallgroups(self):
self.set_up_expect_with_exception()
self.mox.ReplayAll()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL, tenant_id=tenant_id)
self.assertTemplateUsed(res, 'project/firewalls_v2/details_tabs.html')
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data), 0)
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant'),
api_neutron: ('is_extension_supported',), })
def test_index_exception_policies(self):
self.set_up_expect_with_exception()
self.mox.ReplayAll()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__policies',
tenant_id=tenant_id)
self.assertTemplateUsed(res, 'project/firewalls_v2/details_tabs.html')
self.assertTemplateUsed(res,
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data), 0)
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant'),
api_neutron: ('is_extension_supported',), })
def test_index_exception_rules(self):
self.set_up_expect_with_exception()
self.mox.ReplayAll()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__rules',
tenant_id=tenant_id)
self.assertTemplateUsed(res, 'project/firewalls_v2/details_tabs.html')
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['rulestable_table'].data), 0)
@test.create_stubs({api_fwaas_v2: ('rule_create',), })
def test_add_rule_post(self):
rule1 = self.fw_rules_v2.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': rule1.ip_version
}
api_fwaas_v2.rule_create(
IsA(http.HttpRequest), **form_data).AndReturn(rule1)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('rule_create',), })
def test_add_rule_post_src_None(self):
rule1 = self.fw_rules_v2.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': rule1.ip_version
}
api_fwaas_v2.rule_create(
IsA(http.HttpRequest), **form_data).AndReturn(rule1)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('rule_create',), })
def test_add_rule_post_dest_None(self):
rule1 = self.fw_rules_v2.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': rule1.ip_version
}
api_fwaas_v2.rule_create(
IsA(http.HttpRequest), **form_data).AndReturn(rule1)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
def test_add_rule_post_with_error(self):
rule1 = self.fw_rules_v2.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': 'abc',
'action': 'pass',
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': 6
}
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertFormErrors(res, 3)
@test.create_stubs({api_fwaas_v2: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post(self):
policy = self.fw_policies_v2.first()
rules = self.fw_rules_v2.list()
tenant_id = self.tenant.id
form_data = {'name': policy.name,
'description': policy.description,
'firewall_rules': policy.firewall_rules,
'shared': policy.shared,
'audited': policy.audited
}
post_data = {'name': policy.name,
'description': policy.description,
'rule': policy.firewall_rules,
'shared': policy.shared,
'audited': policy.audited
}
# NOTE: SelectRulesAction.populate_rule_choices() lists rule not
# associated with any policy. We need to ensure that rules specified
# in policy.firewall_rules in post_data (above) are not associated
# with any policy. Test data in neutron_data is data in a stable state,
# so we need to modify here.
for rule in rules:
if rule.id in policy.firewall_rules:
rule.firewall_policy_id = rule.policy = None
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api_fwaas_v2.policy_create(
IsA(http.HttpRequest), **form_data).AndReturn(policy)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDPOLICY_PATH), post_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post_with_error(self):
policy = self.fw_policies_v2.first()
rules = self.fw_rules_v2.list()
tenant_id = self.tenant.id
form_data = {'description': policy.description,
'firewall_rules': None,
'shared': policy.shared,
'audited': policy.audited
}
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDPOLICY_PATH), form_data)
self.assertFormErrors(res, 1)
@test.create_stubs({api_fwaas_v2: ('firewall_group_create',
'policy_list_for_tenant',
'fwg_port_list_for_tenant',),
api_neutron: ('is_extension_supported',), })
def test_add_firewall_group_post(self):
firewall_group = self.firewall_groups_v2.first()
policies = self.fw_policies_v2.list()
tenant_id = self.tenant.id
form_data = {'name': firewall_group.name,
'description': firewall_group.description,
'ingress_firewall_policy_id':
firewall_group.ingress_firewall_policy_id,
'egress_firewall_policy_id':
firewall_group.egress_firewall_policy_id,
'admin_state_up': firewall_group.admin_state_up
}
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api_fwaas_v2.fwg_port_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn([])
api_fwaas_v2.firewall_group_create(
IsA(http.HttpRequest), **form_data).AndReturn(firewall_group)
self.mox.ReplayAll()
res = self.client.post(reverse(self.ADDFIREWALLGROUP_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
# TODO(SarathMekala) : Fix this test.
# @test.create_stubs({api_fwaas_v2: ('firewall_group_create',
# 'policy_list_for_tenant',
# 'fwg_port_list_for_tenant',),
# api_neutron: ('is_extension_supported',), })
# def test_add_firewall_post_with_error(self):
# firewall_groups = self.firewall_groups_v2.first()
# policies = self.fw_policies_v2.list()
# tenant_id = self.tenant.id
# form_data = {'name': firewall_groups.name,
# 'description': firewall_groups.description,
# 'admin_state_up': False
# }
# api_fwaas_v2.policy_list_for_tenant(
# IsA(http.HttpRequest), tenant_id).AndReturn(policies)
#
# self.mox.ReplayAll()
# res = self.client.post(reverse(self.ADDFIREWALLGROUP_PATH), form_data)
#
# self.assertFormErrors(res, 1)
@test.create_stubs({api_fwaas_v2: ('rule_get',)})
def test_update_rule_get(self):
rule = self.fw_rules_v2.first()
api_fwaas_v2.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule)
self.mox.ReplayAll()
res = self.client.get(reverse(self.UPDATERULE_PATH, args=(rule.id,)))
self.assertTemplateUsed(res, 'project/firewalls_v2/updaterule.html')
@test.create_stubs({api_fwaas_v2: ('rule_get', 'rule_update')})
def test_update_rule_post(self):
rule = self.fw_rules_v2.first()
api_fwaas_v2.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule)
data = {'name': 'new name',
'description': 'new desc',
'protocol': 'icmp',
'action': 'allow',
'shared': False,
'enabled': True,
'ip_version': rule.ip_version,
'source_ip_address': rule.source_ip_address,
'destination_ip_address': None,
'source_port': None,
'destination_port': rule.destination_port,
}
api_fwaas_v2.rule_update(IsA(http.HttpRequest), rule.id, **data)\
.AndReturn(rule)
self.mox.ReplayAll()
form_data = data.copy()
form_data['destination_ip_address'] = ''
form_data['source_port'] = ''
res = self.client.post(
reverse(self.UPDATERULE_PATH, args=(rule.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('rule_get', 'rule_update')})
def test_update_protocol_any_rule_post(self):
# protocol any means protocol == None in neutron context.
rule = self.fw_rules_v2.get(protocol=None)
api_fwaas_v2.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule)
data = {'name': 'new name',
'description': 'new desc',
'protocol': 'icmp',
'action': 'allow',
'shared': False,
'enabled': True,
'ip_version': rule.ip_version,
'source_ip_address': rule.source_ip_address,
'destination_ip_address': None,
'source_port': None,
'destination_port': rule.destination_port,
}
api_fwaas_v2.rule_update(IsA(http.HttpRequest), rule.id, **data)\
.AndReturn(rule)
self.mox.ReplayAll()
form_data = data.copy()
form_data['destination_ip_address'] = ''
form_data['source_port'] = ''
res = self.client.post(
reverse(self.UPDATERULE_PATH, args=(rule.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('rule_get', 'rule_update')})
def test_update_rule_protocol_to_ANY_post(self):
rule = self.fw_rules_v2.first()
api_fwaas_v2.rule_get(IsA(http.HttpRequest), rule.id).AndReturn(rule)
data = {'name': 'new name',
'description': 'new desc',
'protocol': None,
'action': 'allow',
'shared': False,
'enabled': True,
'ip_version': rule.ip_version,
'source_ip_address': rule.source_ip_address,
'destination_ip_address': None,
'source_port': None,
'destination_port': rule.destination_port,
}
api_fwaas_v2.rule_update(IsA(http.HttpRequest), rule.id, **data)\
.AndReturn(rule)
self.mox.ReplayAll()
form_data = data.copy()
form_data['destination_ip_address'] = ''
form_data['source_port'] = ''
form_data['protocol'] = 'any'
res = self.client.post(
reverse(self.UPDATERULE_PATH, args=(rule.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('policy_get',)})
def test_update_policy_get(self):
policy = self.fw_policies_v2.first()
api_fwaas_v2.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
self.mox.ReplayAll()
res = self.client.get(
reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)))
self.assertTemplateUsed(res, 'project/firewalls_v2/updatepolicy.html')
@test.create_stubs({api_fwaas_v2: ('policy_get', 'policy_update',
'rule_list_for_tenant')})
def test_update_policy_post(self):
policy = self.fw_policies_v2.first()
api_fwaas_v2.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
data = {'name': 'new name',
'description': 'new desc',
'shared': True,
'audited': False
}
api_fwaas_v2.policy_update(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(policy)
self.mox.ReplayAll()
res = self.client.post(
reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('firewall_get',
'policy_list_for_tenant')})
def test_update_firewall_group_get(self):
firewall_group = self.firewall_groups_v2.first()
policies = self.fw_policies_v2.list()
tenant_id = self.tenant.id
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api_fwaas_v2.firewall_get(IsA(http.HttpRequest),
firewall_group.id).AndReturn(firewall_group)
self.mox.ReplayAll()
res = self.client.get(
reverse(self.UPDATEFIREWALLGROUP_PATH, args=(firewall_group.id,)))
self.assertTemplateUsed(res,
'project/firewalls_v2/updatefirewall.html')
@test.create_stubs({api_fwaas_v2: ('firewall_get',
'policy_list_for_tenant',
'firewall_update')})
def test_update_firewall_post(self):
firewall_group = self.firewall_groups_v2.first()
tenant_id = self.tenant.id
api_fwaas_v2.firewall_get(IsA(http.HttpRequest),
firewall_group.id).AndReturn(firewall_group)
data = {'name': 'new name',
'description': 'new desc',
'ingress_firewall_policy_id':
firewall_group.ingress_firewall_policy_id,
'admin_state_up': False
}
policies = self.fw_policies_v2.list()
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(policies)
api_fwaas_v2.firewall_update(
IsA(http.HttpRequest), firewall_group.id, **data)\
.AndReturn(firewall_group)
self.mox.ReplayAll()
res = self.client.post(
reverse(
self.UPDATEFIREWALLGROUP_PATH,
args=(
firewall_group.id,
)),
data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('policy_get', 'policy_insert_rule',
'rule_list_for_tenant', 'rule_get')})
def test_policy_insert_rule(self):
policy = self.fw_policies_v2.first()
tenant_id = self.tenant.id
rules = self.fw_rules_v2.list()
new_rule_id = rules[2].id
data = {'firewall_rule_id': new_rule_id,
'insert_before': rules[1].id,
'insert_after': rules[0].id}
api_fwaas_v2.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
policy.firewall_rules = [rules[0].id,
new_rule_id,
rules[1].id]
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api_fwaas_v2.rule_get(
IsA(http.HttpRequest), new_rule_id).AndReturn(rules[2])
api_fwaas_v2.policy_insert_rule(
IsA(http.HttpRequest), policy.id, **data) .AndReturn(policy)
self.mox.ReplayAll()
res = self.client.post(
reverse(self.INSERTRULE_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('policy_get', 'policy_remove_rule',
'rule_list_for_tenant', 'rule_get')})
def test_policy_remove_rule(self):
policy = self.fw_policies_v2.first()
tenant_id = self.tenant.id
rules = self.fw_rules_v2.list()
remove_rule_id = policy.firewall_rules[0]
left_rule_id = policy.firewall_rules[1]
data = {'firewall_rule_id': remove_rule_id}
after_remove_policy_dict = {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'tenant_id': '1',
'name': 'policy1',
'description': 'policy description',
'firewall_rules': [left_rule_id],
'audited': True,
'shared': True}
after_remove_policy = api_fwaas_v2.Policy(after_remove_policy_dict)
api_fwaas_v2.policy_get(IsA(http.HttpRequest),
policy.id).AndReturn(policy)
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest), tenant_id).AndReturn(rules)
api_fwaas_v2.rule_get(
IsA(http.HttpRequest), remove_rule_id).AndReturn(rules[0])
api_fwaas_v2.policy_remove_rule(IsA(http.HttpRequest), policy.id, **data)\
.AndReturn(after_remove_policy)
self.mox.ReplayAll()
res = self.client.post(
reverse(self.REMOVERULE_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
@test.create_stubs({api_fwaas_v2: ('rule_list_for_tenant',
'rule_delete'),
api_neutron: ('is_extension_supported',)})
def test_delete_rule(self):
rule = self.fw_rules_v2.list()[2]
api_fwaas_v2.rule_list_for_tenant(
IsA(http.HttpRequest),
self.tenant.id).AndReturn(self.fw_rules_v2.list())
api_fwaas_v2.rule_delete(IsA(http.HttpRequest), rule.id)
self.mox.ReplayAll()
form_data = {"action": "rulestable__deleterule__%s" % rule.id}
res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res)
@test.create_stubs({api_fwaas_v2: ('policy_list_for_tenant',
'policy_delete'),
api_neutron: ('is_extension_supported',)})
def test_delete_policy(self):
policy = self.fw_policies_v2.first()
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest),
self.tenant.id).AndReturn(self.fw_policies_v2.list())
api_fwaas_v2.policy_delete(IsA(http.HttpRequest), policy.id)
self.mox.ReplayAll()
form_data = {"action": "policiestable__deletepolicy__%s" % policy.id}
res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res)
@test.create_stubs({api_fwaas_v2: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'firewall_delete',),
api_neutron: ('is_extension_supported',)})
def test_delete_firewall_group(self):
fwl = self.firewall_groups_v2.first()
api_fwaas_v2.firewall_list_for_tenant(
IsA(http.HttpRequest), self.tenant.id).AndReturn([fwl])
api_fwaas_v2.policy_list_for_tenant(
IsA(http.HttpRequest),
self.tenant.id).AndReturn(self.fw_policies_v2.list())
api_fwaas_v2.firewall_delete(IsA(http.HttpRequest), fwl.id)
self.mox.ReplayAll()
form_data = {
"action": "FirewallGroupsTable__deletefirewallgroup__%s" %
fwl.id}
res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res)

View File

@ -0,0 +1,53 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.conf.urls import url
from neutron_fwaas_dashboard.dashboards.project.firewalls_v2 import views
# TODO(Sarath Mekala) : Fix 'firewall' --> 'firewallgroup' in URLs as
# well as in other places.
urlpatterns = [
url(r'^$', views.IndexView.as_view(), name='index'),
url(r'^\?tab=fwtabs__firewalls$',
views.IndexView.as_view(), name='firewalls'),
url(r'^\?tab=fwtabs__rules$', views.IndexView.as_view(), name='rules'),
url(r'^\?tab=fwtabs__policies$',
views.IndexView.as_view(), name='policies'),
url(r'^addrule$', views.AddRuleView.as_view(), name='addrule'),
url(r'^addpolicy$', views.AddPolicyView.as_view(), name='addpolicy'),
url(r'^addfirewallgroup$',
views.AddFirewallGroupView.as_view(),
name='addfirewallgroup'),
url(r'^insertrule/(?P<policy_id>[^/]+)/$',
views.InsertRuleToPolicyView.as_view(), name='insertrule'),
url(r'^removerule/(?P<policy_id>[^/]+)/$',
views.RemoveRuleFromPolicyView.as_view(), name='removerule'),
url(r'^updaterule/(?P<rule_id>[^/]+)/$',
views.UpdateRuleView.as_view(), name='updaterule'),
url(r'^updatepolicy/(?P<policy_id>[^/]+)/$',
views.UpdatePolicyView.as_view(), name='updatepolicy'),
url(r'^updatefirewall/(?P<firewall_id>[^/]+)/$',
views.UpdateFirewallView.as_view(), name='updatefirewall'),
url(r'^addport/(?P<firewallgroup_id>[^/]+)/$',
views.AddPortView.as_view(), name='addport'),
url(r'^removeport/(?P<firewallgroup_id>[^/]+)/$',
views.RemovePortView.as_view(), name='removeport'),
url(r'^rule/(?P<rule_id>[^/]+)/$',
views.RuleDetailsView.as_view(), name='ruledetails'),
url(r'^policy/(?P<policy_id>[^/]+)/$',
views.PolicyDetailsView.as_view(), name='policydetails'),
url(r'^firewallgroup/(?P<firewallgroup_id>[^/]+)/$',
views.FirewallGroupDetailsView.as_view(), name='firewallgroupdetails'),
]

View File

@ -0,0 +1,479 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.core.urlresolvers import reverse
from django.core.urlresolvers import reverse_lazy
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import forms
from horizon import tabs
from horizon.utils import memoized
from horizon import workflows
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
from neutron_fwaas_dashboard.dashboards.project.firewalls_v2 \
import forms as fw_forms
from neutron_fwaas_dashboard.dashboards.project.firewalls_v2 \
import tabs as fw_tabs
from neutron_fwaas_dashboard.dashboards.project.firewalls_v2 \
import workflows as fw_workflows
InsertRuleToPolicy = fw_forms.InsertRuleToPolicy
RemoveRuleFromPolicy = fw_forms.RemoveRuleFromPolicy
UpdateFirewall = fw_forms.UpdateFirewall
UpdatePolicy = fw_forms.UpdatePolicy
UpdateRule = fw_forms.UpdateRule
AddPort = fw_forms.AddPort
RemovePort = fw_forms.RemovePort
FirewallGroupDetailsTabs = fw_tabs.FirewallGroupDetailsTabs
FirewallGroupTabs = fw_tabs.FirewallGroupTabs
PolicyDetailsTabs = fw_tabs.PolicyDetailsTabs
RuleDetailsTabs = fw_tabs.RuleDetailsTabs
AddFirewallGroup = fw_workflows.AddFirewallGroup
AddPolicy = fw_workflows.AddPolicy
AddRule = fw_workflows.AddRule
class IndexView(tabs.TabbedTableView):
tab_group_class = FirewallGroupTabs
template_name = 'project/firewalls_v2/details_tabs.html'
page_title = _("Firewall Groups")
class AddRuleView(workflows.WorkflowView):
workflow_class = AddRule
template_name = "project/firewalls_v2/addrule.html"
page_title = _("Add New Rule")
class AddPolicyView(workflows.WorkflowView):
workflow_class = AddPolicy
template_name = "project/firewalls_v2/addpolicy.html"
page_title = _("Add New Policy")
class AddFirewallGroupView(workflows.WorkflowView):
workflow_class = AddFirewallGroup
template_name = "project/firewalls_v2/addfirewallgroup.html"
page_title = _("Add New Firewall Group")
class RuleDetailsView(tabs.TabView):
tab_group_class = (RuleDetailsTabs)
template_name = 'horizon/common/_detail.html'
page_title = "{{ rule.name|default:rule.id }}"
failure_url = reverse_lazy('horizon:project:firewalls_v2:index')
def get_context_data(self, **kwargs):
context = super(RuleDetailsView, self).get_context_data(**kwargs)
rule = self.get_data()
table = fw_tabs.RulesTable(self.request)
breadcrumb = [
(_("Rules"), reverse_lazy('horizon:project:firewalls_v2:rules'))]
context["custom_breadcrumb"] = breadcrumb
context["rule"] = rule
context["url"] = self.failure_url
context["actions"] = table.render_row_actions(rule)
return context
@memoized.memoized_method
def get_data(self):
try:
rule_id = self.kwargs['rule_id']
rule = api_fwaas_v2.rule_get(self.request, rule_id)
except Exception:
exceptions.handle(self.request,
_('Unable to retrieve rule details.'),
redirect=self.failure_url)
return rule
def get_tabs(self, request, *args, **kwargs):
rule = self.get_data()
return self.tab_group_class(request, rule=rule, **kwargs)
class PolicyDetailsView(tabs.TabView):
tab_group_class = (PolicyDetailsTabs)
template_name = 'horizon/common/_detail.html'
page_title = "{{ policy.name|default:policy.id }}"
failure_url = reverse_lazy('horizon:project:firewalls_v2:index')
def get_context_data(self, **kwargs):
context = super(PolicyDetailsView, self).get_context_data(**kwargs)
policy = self.get_data()
table = fw_tabs.PoliciesTable(self.request)
breadcrumb = [
(_("Policies"),
reverse_lazy('horizon:project:firewalls_v2:policies'))]
context["custom_breadcrumb"] = breadcrumb
context["policy"] = policy
context["url"] = self.failure_url
context["actions"] = table.render_row_actions(policy)
return context
@memoized.memoized_method
def get_data(self):
try:
policy_id = self.kwargs['policy_id']
policy = api_fwaas_v2.policy_get(self.request, policy_id)
except Exception:
exceptions.handle(self.request,
_('Unable to retrieve policy details.'),
redirect=self.failure_url)
return policy
def get_tabs(self, request, *args, **kwargs):
policy = self.get_data()
return self.tab_group_class(request, policy=policy, **kwargs)
class FirewallGroupDetailsView(tabs.TabView):
tab_group_class = (FirewallGroupDetailsTabs)
template_name = 'horizon/common/_detail.html'
page_title = "{{ firewall_group.name|default:firewall_group.id }}"
failure_url = reverse_lazy('horizon:project:firewalls_v2:index')
def get_context_data(self, **kwargs):
context = super(FirewallGroupDetailsView, self) \
.get_context_data(**kwargs)
firewall_group = self.get_data()
table = fw_tabs.FirewallGroupsTable(self.request)
context["firewall_group"] = firewall_group
context["url"] = self.failure_url
context["actions"] = table.render_row_actions(firewall_group)
return context
@memoized.memoized_method
def get_data(self):
try:
firewallgroup_id = self.kwargs['firewallgroup_id']
firewall_group = api_fwaas_v2.firewall_get(self.request,
firewallgroup_id)
except Exception:
exceptions.handle(self.request,
_('Unable to retrieve firewall details.'),
redirect=self.failure_url)
return firewall_group
def get_tabs(self, request, *args, **kwargs):
firewall_group = self.get_data()
return self.tab_group_class(request, firewallgroup=firewall_group,
**kwargs)
class UpdateRuleView(forms.ModalFormView):
form_class = UpdateRule
form_id = "update_rule_form"
template_name = "project/firewalls_v2/updaterule.html"
context_object_name = 'rule'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls_v2:updaterule"
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Edit Rule {{ name }}")
def get_context_data(self, **kwargs):
context = super(UpdateRuleView, self).get_context_data(**kwargs)
context['rule_id'] = self.kwargs['rule_id']
args = (self.kwargs['rule_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
rule_id = self.kwargs['rule_id']
try:
rule = api_fwaas_v2.rule_get(self.request, rule_id)
return rule
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve rule details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
rule = self._get_object()
initial = rule.get_dict()
protocol = initial['protocol']
initial['protocol'] = protocol.upper() if protocol else 'ANY'
initial['action'] = initial['action'].upper()
return initial
class UpdatePolicyView(forms.ModalFormView):
form_class = UpdatePolicy
form_id = "update_policy_form"
template_name = "project/firewalls_v2/updatepolicy.html"
context_object_name = 'policy'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls_v2:updatepolicy"
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Edit Policy {{ name }}")
def get_context_data(self, **kwargs):
context = super(UpdatePolicyView, self).get_context_data(**kwargs)
context["policy_id"] = self.kwargs['policy_id']
args = (self.kwargs['policy_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
policy_id = self.kwargs['policy_id']
try:
policy = api_fwaas_v2.policy_get(self.request, policy_id)
return policy
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve policy details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
policy = self._get_object()
initial = policy.get_dict()
return initial
class UpdateFirewallView(forms.ModalFormView):
form_class = UpdateFirewall
form_id = "update_firewall_form"
template_name = "project/firewalls_v2/updatefirewall.html"
context_object_name = 'firewall'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls_v2:updatefirewall"
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Edit FirewallGroup {{ name }}")
def get_context_data(self, **kwargs):
context = super(UpdateFirewallView, self).get_context_data(**kwargs)
context["firewall_id"] = self.kwargs['firewall_id']
args = (self.kwargs['firewall_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
firewall_id = self.kwargs['firewall_id']
try:
firewall = api_fwaas_v2.firewall_get(self.request,
firewall_id)
return firewall
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve firewall details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
firewall = self._get_object()
initial = firewall.get_dict()
return initial
class AddPortView(forms.ModalFormView):
form_class = AddPort
form_id = "update_firewall_port_form"
template_name = "project/firewalls_v2/addport.html"
context_object_name = 'firewallgroup'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls_v2:addport"
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Add port to FirewallGroup {{ name }}")
def get_context_data(self, **kwargs):
context = super(AddPortView, self).get_context_data(**kwargs)
context["firewallgroup_id"] = self.kwargs['firewallgroup_id']
args = (self.kwargs['firewallgroup_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
firewallgroup_id = self.kwargs['firewallgroup_id']
try:
firewallgroup = api_fwaas_v2.firewall_get(self.request,
firewallgroup_id)
return firewallgroup
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve firewallgroup details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
firewallgroup = self._get_object()
initial = firewallgroup.get_dict()
return initial
class RemovePortView(forms.ModalFormView):
form_class = RemovePort
form_id = "update_firewall_port_form"
template_name = "project/firewalls_v2/removeport.html"
context_object_name = 'firewallgroup'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls_v2:removeport"
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Remove port from FirewallGroup {{ name }}")
def get_context_data(self, **kwargs):
context = super(RemovePortView, self).get_context_data(**kwargs)
context["firewallgroup_id"] = self.kwargs['firewallgroup_id']
args = (self.kwargs['firewallgroup_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
firewallgroup_id = self.kwargs['firewallgroup_id']
try:
firewallgroup = api_fwaas_v2.firewall_get(self.request,
firewallgroup_id)
return firewallgroup
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve firewallgroup details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
firewallgroup = self._get_object()
initial = firewallgroup.get_dict()
return initial
class InsertRuleToPolicyView(forms.ModalFormView):
form_class = InsertRuleToPolicy
form_id = "update_policy_form"
template_name = "project/firewalls_v2/insert_rule_to_policy.html"
context_object_name = 'policy'
submit_url = "horizon:project:firewalls_v2:insertrule"
submit_label = _("Save Changes")
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Insert Rule to Policy")
def get_context_data(self, **kwargs):
context = super(InsertRuleToPolicyView,
self).get_context_data(**kwargs)
context["policy_id"] = self.kwargs['policy_id']
args = (self.kwargs['policy_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
policy_id = self.kwargs['policy_id']
try:
policy = api_fwaas_v2.policy_get(self.request, policy_id)
return policy
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve policy details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
policy = self._get_object()
initial = policy.get_dict()
initial['policy_id'] = initial['id']
return initial
class RemoveRuleFromPolicyView(forms.ModalFormView):
form_class = RemoveRuleFromPolicy
form_id = "update_policy_form"
template_name = "project/firewalls_v2/remove_rule_from_policy.html"
context_object_name = 'policy'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls_v2:removerule"
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
page_title = _("Remove Rule from Policy")
def get_context_data(self, **kwargs):
context = super(RemoveRuleFromPolicyView,
self).get_context_data(**kwargs)
context["policy_id"] = self.kwargs['policy_id']
args = (self.kwargs['policy_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
policy_id = self.kwargs['policy_id']
try:
policy = api_fwaas_v2.policy_get(self.request, policy_id)
return policy
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve policy details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
policy = self._get_object()
initial = policy.get_dict()
initial['policy_id'] = initial['id']
return initial
class RouterCommonView(forms.ModalFormView):
form_id = "update_firewall_form"
context_object_name = 'firewall'
submit_label = _("Save Changes")
success_url = reverse_lazy("horizon:project:firewalls_v2:index")
def get_context_data(self, **kwargs):
context = super(RouterCommonView,
self).get_context_data(**kwargs)
context["firewall_id"] = self.kwargs['firewall_id']
args = (self.kwargs['firewall_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
firewall_id = self.kwargs['firewall_id']
try:
firewall = api_fwaas_v2.firewall_get(self.request, firewall_id)
return firewall
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve firewall details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
firewall = self._get_object()
initial = firewall.get_dict()
return initial

View File

@ -0,0 +1,387 @@
# Copyright 2017, Juniper Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from operator import attrgetter
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import forms
from horizon.utils import validators
from horizon import workflows
import netaddr
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
port_validator = validators.validate_port_or_colon_separated_port_range
class AddRuleAction(workflows.Action):
name = forms.CharField(
max_length=80,
label=_("Name"),
required=False)
description = forms.CharField(
max_length=80,
label=_("Description"),
required=False)
protocol = forms.ThemableChoiceField(
label=_("Protocol"),
choices=[('tcp', _('TCP')),
('udp', _('UDP')),
('icmp', _('ICMP')),
('any', _('ANY'))],)
action = forms.ThemableChoiceField(
label=_("Action"),
choices=[('allow', _('ALLOW')),
('deny', _('DENY')),
('reject', _('REJECT'))],)
source_ip_address = forms.IPField(
label=_("Source IP Address/Subnet"),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True)
destination_ip_address = forms.IPField(
label=_("Destination IP Address/Subnet"),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True)
source_port = forms.CharField(
max_length=80,
label=_("Source Port/Port Range"),
required=False,
validators=[port_validator])
destination_port = forms.CharField(
max_length=80,
label=_("Destination Port/Port Range"),
required=False,
validators=[port_validator])
ip_version = forms.ThemableChoiceField(
label=_("IP Version"), required=False,
choices=[('4', '4'), ('6', '6')])
shared = forms.BooleanField(
label=_("Shared"), initial=False, required=False)
enabled = forms.BooleanField(
label=_("Enabled"), initial=True, required=False)
def _check_ip_addr_and_ip_version(self, cleaned_data):
ip_version = int(str(cleaned_data.get('ip_version')))
src_ip = cleaned_data.get('source_ip_address')
dst_ip = cleaned_data.get('destination_ip_address')
msg = _('Source/Destination Network Address and IP version '
'are inconsistent. Please make them consistent.')
if (src_ip and
netaddr.IPNetwork(src_ip).version != ip_version):
self._errors['ip_version'] = self.error_class([msg])
elif (dst_ip and
netaddr.IPNetwork(dst_ip).version != ip_version):
self._errors['ip_version'] = self.error_class([msg])
def clean(self):
cleaned_data = super(AddRuleAction, self).clean()
self._check_ip_addr_and_ip_version(cleaned_data)
class Meta(object):
name = _("Rule")
permissions = ('openstack.services.network',)
help_text = _("Create a firewall rule.\n\n"
"A Firewall rule is an association of the following "
"attributes:\n\n"
"<li>IP Addresses: The addresses from/to which the "
"traffic filtration needs to be applied.</li>"
"<li>IP Version: The type of IP packets (IP V4/V6) "
"that needs to be filtered.</li>"
"<li>Protocol: Type of packets (UDP, ICMP, TCP, Any) "
"that needs to be checked.</li>"
"<li>Action: Action is the type of filtration "
"required, it can be Reject/Deny/Allow data "
"packets.</li>\n"
"The protocol and action fields are required, all "
"others are optional.")
class AddRuleStep(workflows.Step):
action_class = AddRuleAction
contributes = ("name", "description", "protocol", "action",
"source_ip_address", "source_port",
"destination_ip_address", "destination_port",
"enabled", "shared", "ip_version")
def contribute(self, data, context):
context = super(AddRuleStep, self).contribute(data, context)
if data:
if context['protocol'] == 'any':
del context['protocol']
for field in ['source_port',
'destination_port',
'source_ip_address',
'destination_ip_address']:
if not context[field]:
del context[field]
return context
class AddRule(workflows.Workflow):
slug = "addrule"
name = _("Add Rule")
finalize_button_name = _("Add")
success_message = _('Added Rule "%s".')
failure_message = _('Unable to add Rule "%s".')
success_url = "horizon:project:firewalls_v2:index"
default_steps = (AddRuleStep,)
def format_status_message(self, message):
return message % self.context.get('name')
def handle(self, request, context):
try:
api_fwaas_v2.rule_create(request, **context)
return True
except Exception as e:
msg = self.format_status_message(self.failure_message) + str(e)
exceptions.handle(request, msg)
return False
class SelectRulesAction(workflows.Action):
rule = forms.MultipleChoiceField(
label=_("Rules"),
required=False,
widget=forms.ThemableCheckboxSelectMultiple(),
help_text=_("Create a policy with selected rules."))
class Meta(object):
name = _("Rules")
permissions = ('openstack.services.network',)
help_text = _("Select rules for your policy.")
def populate_rule_choices(self, request, context):
try:
tenant_id = self.request.user.tenant_id
rules = api_fwaas_v2.rule_list_for_tenant(request, tenant_id)
rules = sorted(rules,
key=attrgetter('name_or_id'))
rule_list = [(rule.id, rule.name_or_id) for rule in rules]
except Exception as e:
rule_list = []
exceptions.handle(request, _('Unable to retrieve rules (%s).') % e)
return rule_list
class SelectRulesStep(workflows.Step):
action_class = SelectRulesAction
template_name = "project/firewalls_v2/_update_rules.html"
contributes = ("firewall_rules",)
def contribute(self, data, context):
if data:
rules = self.workflow.request.POST.getlist("rule")
if rules:
rules = [r for r in rules if r]
context['firewall_rules'] = rules
return context
class AddPolicyAction(workflows.Action):
name = forms.CharField(max_length=80,
label=_("Name"))
description = forms.CharField(max_length=80,
label=_("Description"),
required=False)
shared = forms.BooleanField(label=_("Shared"),
initial=False,
required=False)
audited = forms.BooleanField(label=_("Audited"),
initial=False,
required=False)
class Meta(object):
name = _("Policy")
permissions = ('openstack.services.network',)
help_text = _("Create a firewall policy with an ordered list "
"of firewall rules.\n\n"
"A firewall policy is an ordered collection of firewall "
"rules. So if the traffic matches the first rule, the "
"other rules are not executed. If the traffic does not "
"match the current rule, then the next rule is "
"executed. A firewall policy has the following "
"attributes:\n\n"
"<li>Shared: A firewall policy can be shared across "
"tenants. Thus it can also be made part of an audit "
"workflow wherein the firewall policy can be audited "
"by the relevant entity that is authorized.</li>"
"<li>Audited: When audited is set to True, it indicates "
"that the firewall policy has been audited. "
"Each time the firewall policy or the associated "
"firewall rules are changed, this attribute will be "
"set to False and will have to be explicitly set to "
"True through an update operation.</li>\n"
"The name field is required, all others are optional.")
class AddPolicyStep(workflows.Step):
action_class = AddPolicyAction
contributes = ("name", "description", "shared", "audited")
def contribute(self, data, context):
context = super(AddPolicyStep, self).contribute(data, context)
if data:
return context
class AddPolicy(workflows.Workflow):
slug = "addpolicy"
name = _("Add Policy")
finalize_button_name = _("Add")
success_message = _('Added Policy "%s".')
failure_message = _('Unable to add Policy "%s".')
success_url = "horizon:project:firewalls_v2:index"
default_steps = (AddPolicyStep, SelectRulesStep)
def format_status_message(self, message):
return message % self.context.get('name')
def handle(self, request, context):
try:
api_fwaas_v2.policy_create(request, **context)
return True
except Exception as e:
msg = self.format_status_message(self.failure_message) + str(e)
exceptions.handle(request, msg)
return False
class AddFWGPortsAction(workflows.Action):
port = forms.MultipleChoiceField(
label=_("Ports"),
required=False,
widget=forms.ThemableCheckboxSelectMultiple(),
help_text=_("Create a Firewall Group with selected ports."))
class Meta(object):
name = _("Ports")
permissions = ('openstack.services.network',)
help_text = _("Select ports for your firewall group.")
def populate_port_choices(self, request, context):
try:
tenant_id = self.request.user.tenant_id
ports = api_fwaas_v2.fwg_port_list_for_tenant(request, tenant_id)
ports = sorted(ports,
key=attrgetter('name_or_id'))
port_list = [(port.id, port.name_or_id) for port in ports]
except Exception as e:
port_list = []
exceptions.handle(request, _('Unable to retrieve ports (%s).') % e)
return port_list
class AddFWGPortsStep(workflows.Step):
action_class = AddFWGPortsAction
template_name = "project/firewalls_v2/_update_ports.html"
contributes = ("ports")
def contribute(self, data, context):
if data:
ports = self.workflow.request.POST.getlist("port")
if ports:
ports = [r for r in ports if r != '']
context['ports'] = ports
else:
context['ports'] = []
return context
class AddFirewallGroupAction(workflows.Action):
name = forms.CharField(max_length=80,
label=_("Name"),
required=False)
description = forms.CharField(max_length=80,
label=_("Description"),
required=False)
ingress_firewall_policy_id = forms.ThemableChoiceField(
label=_("Ingress Policy"),
required=False)
egress_firewall_policy_id = forms.ThemableChoiceField(
label=_("Egress Policy"),
required=False)
admin_state_up = forms.BooleanField(label=_("Admin State"), required=False)
shared = forms.BooleanField(
label=_("Shared"), initial=False, required=False)
def __init__(self, request, *args, **kwargs):
super(AddFirewallGroupAction, self).__init__(request, *args, **kwargs)
firewall_policy_id_choices = [('', _("Select a Policy"))]
try:
tenant_id = self.request.user.tenant_id
policies = api_fwaas_v2.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=attrgetter('name'))
except Exception as e:
exceptions.handle(request,
_('Unable to retrieve policy list (%s).') % e)
policies = []
for p in policies:
firewall_policy_id_choices.append((p.id, p.name_or_id))
self.fields['ingress_firewall_policy_id'].choices = \
firewall_policy_id_choices
self.fields['egress_firewall_policy_id'].choices = \
firewall_policy_id_choices
def clean(self):
cleaned_data = super(AddFirewallGroupAction, self).clean()
for field in ('ingress_firewall_policy_id',
'egress_firewall_policy_id'):
if not cleaned_data[field]:
cleaned_data[field] = None
return cleaned_data
class Meta(object):
name = _("FirewallGroup")
permissions = ('openstack.services.network',)
help_text = _("Create a firewall group based on a policy.\n\n"
"A firewall represents a logical firewall resource that "
"a tenant can instantiate and manage. A firewall must "
"be associated with one policy, all other fields are "
"optional.")
class AddFirewallGroupStep(workflows.Step):
action_class = AddFirewallGroupAction
contributes = ("name", "description", "admin_state_up", "shared",
"ingress_firewall_policy_id",
"egress_firewall_policy_id")
def contribute(self, data, context):
context = super(AddFirewallGroupStep, self).contribute(data, context)
return context
class AddFirewallGroup(workflows.Workflow):
slug = "addfirewallgroup"
name = _("Add Firewall Group")
finalize_button_name = _("Add")
success_message = _('Added Firewall Group"%s".')
failure_message = _('Unable to add Firewall Group "%s".')
success_url = "horizon:project:firewalls_v2:index"
default_steps = (AddFirewallGroupStep, AddFWGPortsStep)
def format_status_message(self, message):
return message % self.context.get('name')
def handle(self, request, context):
try:
api_fwaas_v2.firewall_group_create(request, **context)
return True
except Exception as e:
msg = self.format_status_message(self.failure_message) + str(e)
exceptions.handle(request, msg)
return False

View File

@ -0,0 +1,19 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# FEATURE just declares this enabled file needs to be loaded.
# This is required for ADD_INSTALLED_APPS to be processed.
# The value itself has no meaning in the current horizon.
FEATURE = 'neutron-fwaas-dashboard'
ADD_INSTALLED_APPS = ['neutron_fwaas_dashboard']
AUTO_DISCOVER_STATIC_FILES = True

View File

@ -20,6 +20,3 @@ PANEL_GROUP = 'network'
# Python panel class of the PANEL to be added.
ADD_PANEL = ('neutron_fwaas_dashboard.dashboards.'
'project.firewalls.panel.Firewall')
ADD_INSTALLED_APPS = ['neutron_fwaas_dashboard']
AUTO_DISCOVER_STATIC_FILES = True

View File

@ -0,0 +1,22 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# The slug of the panel to be added to HORIZON_CONFIG. Required.
PANEL = 'firewalls_v2'
# The slug of the dashboard the PANEL associated with. Required.
PANEL_DASHBOARD = 'project'
# The slug of the panel group the PANEL is associated with.
PANEL_GROUP = 'network'
# Python panel class of the PANEL to be added.
ADD_PANEL = ('neutron_fwaas_dashboard.dashboards.project.'
'firewalls_v2.panel.Firewall_V2')

View File

@ -0,0 +1,554 @@
# Copyright 2017, Juniper Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutronclient.v2_0.client import Client as neutronclient
from neutron_fwaas_dashboard.api import fwaas_v2 as api_fwaas_v2
from neutron_fwaas_dashboard.test import helpers as test
class FwaasV2ApiTests(test.APITestCase):
@test.create_stubs({neutronclient: ('create_fwaas_firewall_rule',)})
def test_rule_create(self):
rule1 = self.fw_rules_v2.first()
rule1_dict = self.api_fw_rules_v2.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled
}
form_dict = {'firewall_rule': form_data}
ret_dict = {'firewall_rule': rule1_dict}
neutronclient.create_fwaas_firewall_rule(form_dict).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.rule_create(self.request, **form_data)
self._assert_rule_return_value(ret_val, rule1)
def _assert_rule_return_value(self, ret_val, exp_rule):
self.assertIsInstance(ret_val, api_fwaas_v2.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
@test.create_stubs({neutronclient: ('list_fwaas_firewall_rules',)})
def test_rule_list(self):
exp_rules = self.fw_rules_v2.list()
api_rules = {'firewall_rules': self.api_fw_rules_v2.list()}
neutronclient.list_fwaas_firewall_rules().AndReturn(api_rules)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.rule_list(self.request)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
@test.create_stubs({neutronclient: ('list_fwaas_firewall_rules',)})
def test_rule_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_rules = self.fw_rules_v2.list()
api_rules = {'firewall_rules': self.api_fw_rules_v2.list()}
neutronclient.list_fwaas_firewall_rules(
tenant_id=tenant_id,
shared=False).AndReturn({'firewall_rules': []})
neutronclient.list_fwaas_firewall_rules(shared=True) \
.AndReturn(api_rules)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.rule_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
@test.create_stubs({neutronclient: ('show_fwaas_firewall_rule',)})
def test_rule_get(self):
exp_rule = self.fw_rules_v2.first()
ret_dict = {'firewall_rule': self.api_fw_rules_v2.first()}
neutronclient.show_fwaas_firewall_rule(exp_rule.id).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.rule_get(self.request, exp_rule.id)
self._assert_rule_return_value(ret_val, exp_rule)
@test.create_stubs({neutronclient: ('update_fwaas_firewall_rule',)})
def test_rule_update(self):
rule = self.fw_rules_v2.first()
rule_dict = self.api_fw_rules_v2.first()
rule.name = 'new name'
rule.description = 'new desc'
rule.protocol = 'icmp'
rule.action = 'deny'
rule.shared = True
rule.enabled = False
rule_dict['name'] = 'new name'
rule_dict['description'] = 'new desc'
rule_dict['protocol'] = 'icmp'
rule_dict['action'] = 'deny'
rule_dict['shared'] = True
rule_dict['enabled'] = False
form_data = {'name': rule.name,
'description': rule.description,
'protocol': rule.protocol,
'action': rule.action,
'shared': rule.shared,
'enabled': rule.enabled
}
form_dict = {'firewall_rule': form_data}
ret_dict = {'firewall_rule': rule_dict}
neutronclient.update_fwaas_firewall_rule(
rule.id, form_dict).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.rule_update(self.request,
rule.id, **form_data)
self._assert_rule_return_value(ret_val, rule)
@test.create_stubs({neutronclient: ('create_fwaas_firewall_policy', )})
def test_policy_create(self):
policy1 = self.fw_policies_v2.first()
policy1_dict = self.api_fw_policies_v2.first()
form_data = {'name': policy1.name,
'description': policy1.description,
'firewall_rules': policy1.firewall_rules,
'shared': policy1.shared,
'audited': policy1.audited
}
form_dict = {'firewall_policy': form_data}
ret_dict = {'firewall_policy': policy1_dict}
neutronclient.create_fwaas_firewall_policy(form_dict).\
AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
self.assertEqual(policy1.name, ret_val.name)
self.assertTrue(ret_val.id)
def _assert_policy_return_value(self, ret_val, exp_policy):
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.firewall_rules), len(ret_val.rules))
self.assertEqual(len(exp_policy.firewall_rules),
len(ret_val.firewall_rules))
for (r, exp_r) in zip(ret_val.rules, exp_policy.rules):
self.assertEqual(exp_r.id, r.id)
@test.create_stubs({neutronclient: ('list_fwaas_firewall_policies',
'list_fwaas_firewall_rules')})
def test_policy_list(self):
exp_policies = self.fw_policies_v2.list()
policies_dict = {'firewall_policies': self.api_fw_policies_v2.list()}
rules_dict = {'firewall_rules': self.api_fw_rules_v2.list()}
neutronclient.list_fwaas_firewall_policies().AndReturn(policies_dict)
neutronclient.list_fwaas_firewall_rules().AndReturn(rules_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_list(self.request)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
@test.create_stubs({neutronclient: ('list_fwaas_firewall_policies',
'list_fwaas_firewall_rules')})
def test_policy_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_policies = self.fw_policies_v2.list()
policies_dict = {'firewall_policies': self.api_fw_policies_v2.list()}
rules_dict = {'firewall_rules': self.api_fw_rules_v2.list()}
neutronclient.list_fwaas_firewall_policies(
tenant_id=tenant_id,
shared=False).AndReturn({'firewall_policies': []})
neutronclient.list_fwaas_firewall_policies(
shared=True).AndReturn(policies_dict)
neutronclient.list_fwaas_firewall_rules().AndReturn(rules_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
@test.create_stubs({neutronclient: ('show_fwaas_firewall_policy',
'list_fwaas_firewall_rules')})
def test_policy_get(self):
exp_policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
# The first two rules are associated with the first policy.
api_rules = self.api_fw_rules_v2.list()[:2]
ret_dict = {'firewall_policy': policy_dict}
neutronclient.show_fwaas_firewall_policy(
exp_policy.id).AndReturn(ret_dict)
filters = {'firewall_policy_id': exp_policy.id}
ret_dict = {'firewall_rules': api_rules}
neutronclient.list_fwaas_firewall_rules(**filters).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_get(self.request, exp_policy.id)
self._assert_policy_return_value(ret_val, exp_policy)
@test.create_stubs({neutronclient: ('show_fwaas_firewall_policy',)})
def test_policy_get_no_rule(self):
# 2nd policy is not associated with any rules.
exp_policy = self.fw_policies_v2.list()[1]
policy_dict = self.api_fw_policies_v2.list()[1]
ret_dict = {'firewall_policy': policy_dict}
neutronclient.show_fwaas_firewall_policy(
exp_policy.id).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertFalse(len(ret_val.rules))
@test.create_stubs({neutronclient: ('update_fwaas_firewall_policy',)})
def test_policy_update(self):
policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
policy.name = 'new name'
policy.description = 'new desc'
policy.shared = True
policy.audited = False
policy_dict['name'] = 'new name'
policy_dict['description'] = 'new desc'
policy_dict['shared'] = True
policy_dict['audited'] = False
form_data = {'name': policy.name,
'description': policy.description,
'shared': policy.shared,
'audited': policy.audited
}
form_dict = {'firewall_policy': form_data}
ret_dict = {'firewall_policy': policy_dict}
neutronclient.update_fwaas_firewall_policy(
policy.id, form_dict).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_update(self.request,
policy.id, **form_data)
self.assertIsInstance(ret_val, api_fwaas_v2.Policy)
self.assertEqual(policy.name, ret_val.name)
self.assertTrue(ret_val.id)
@test.create_stubs({neutronclient: ('insert_rule_fwaas_firewall_policy',)})
def test_policy_insert_rule(self):
policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
new_rule_id = 'h0881d38-c3eb-4fee-9763-12de3338041d'
policy.firewall_rules.append(new_rule_id)
policy_dict['firewall_rules'].append(new_rule_id)
body = {'firewall_rule_id': new_rule_id,
'insert_before': policy.firewall_rules[1],
'insert_after': policy.firewall_rules[0]}
neutronclient.insert_rule_fwaas_firewall_policy(
policy.id, body).AndReturn(policy_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_insert_rule(self.request,
policy.id, **body)
self.assertIn(new_rule_id, ret_val.firewall_rules)
@test.create_stubs({neutronclient: ('remove_rule_fwaas_firewall_policy',)})
def test_policy_remove_rule(self):
policy = self.fw_policies_v2.first()
policy_dict = self.api_fw_policies_v2.first()
remove_rule_id = policy.firewall_rules[0]
policy_dict['firewall_rules'].remove(remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
neutronclient.remove_rule_fwaas_firewall_policy(
policy.id, body).AndReturn(policy_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.policy_remove_rule(self.request,
policy.id, **body)
self.assertNotIn(remove_rule_id, ret_val.firewall_rules)
@test.create_stubs({neutronclient: ('create_fwaas_firewall_group', )})
def test_firewall_group_create(self):
firewall_group = self.firewall_groups_v2.first()
firewall_group_dict = self.api_firewall_groups_v2.first()
form_data = {
'name': firewall_group.name,
'description': firewall_group.description,
'ingress_firewall_policy_id':
firewall_group.ingress_firewall_policy_id,
'egress_firewall_policy_id':
firewall_group.egress_firewall_policy_id,
'admin_state_up': firewall_group.admin_state_up
}
form_dict = {'firewall_group': form_data}
ret_dict = {'firewall_group': firewall_group_dict}
neutronclient.create_fwaas_firewall_group(
form_dict).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.firewall_group_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas_v2.FirewallGroup)
self.assertEqual(firewall_group.name, ret_val.name)
self.assertEqual(firewall_group.id, ret_val.id)
def _assert_firewall_return_value(self, ret_val, exp_firewall,
expand_policy=True):
self.assertIsInstance(ret_val, api_fwaas_v2.FirewallGroup)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_firewall.ingress_firewall_policy_id,
ret_val.ingress_firewall_policy_id)
if expand_policy:
if exp_firewall.ingress_firewall_policy_id:
self.assertEqual(exp_firewall.ingress_firewall_policy_id,
ret_val.ingress_policy.id)
self.assertEqual(exp_firewall.ingress_policy.name,
ret_val.ingress_policy.name)
else:
self.assertIsNone(ret_val.ingress_policy)
if exp_firewall.egress_firewall_policy_id:
self.assertEqual(exp_firewall.egress_firewall_policy_id,
ret_val.egress_policy.id)
self.assertEqual(exp_firewall.egress_policy.name,
ret_val.egress_policy.name)
else:
self.assertIsNone(ret_val.egress_policy)
# TODO(Sarath Mekala) : Add API tests for firewall_group_create with ports,
# add port to firewall and remove port from fw.
@test.create_stubs({neutronclient: ('list_fwaas_firewall_groups',
'list_fwaas_firewall_policies')})
def test_firewall_list(self):
exp_firewalls = self.firewall_groups_v2.list()
firewalls_dict = {
'firewall_groups': self.api_firewall_groups_v2.list()}
neutronclient.list_fwaas_firewall_groups().AndReturn(firewalls_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.firewall_list(self.request)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d, expand_policy=False)
@test.create_stubs({neutronclient: ('list_fwaas_firewall_groups',
'list_fwaas_firewall_policies')})
def test_firewall_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_firewalls = self.firewall_groups_v2.list()
firewalls_dict = {
'firewall_groups': self.api_firewall_groups_v2.list()}
neutronclient.list_fwaas_firewall_groups(shared=False, tenant_id=tenant_id) \
.AndReturn(firewalls_dict)
neutronclient.list_fwaas_firewall_groups(shared=True) \
.AndReturn(firewalls_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.firewall_list_for_tenant(
self.request, tenant_id)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d, expand_policy=False)
@test.create_stubs({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
def test_fwg_port_list_for_tenant(self):
tenant_id = self.request.user.project_id
router_port = {
'id': 'id-1',
'name': 'port-1',
'device_owner': 'network:router_interface'
}
vm_port1 = {
'id': 'id-vm_port-1',
'name': 'port-2',
'device_owner': 'compute:nova'
}
vm_port2 = {
'id': 'id-vm_port-2',
'name': 'port-2',
'device_owner': 'compute:nova'
}
gateway_port = {
'id': 'id-3',
'name': 'port-3',
'device_owner': 'network:router_gateway'
}
dhcp_port = {
'id': 'id-4',
'name': 'port-4',
'device_owner': 'network:dhcp'
}
dummy_ports = {'ports': [
router_port,
vm_port1,
vm_port2,
gateway_port,
dhcp_port,
]}
neutronclient.list_ports(tenant_id=tenant_id).AndReturn(dummy_ports)
neutronclient.list_fwaas_firewall_groups(
tenant_id=tenant_id).AndReturn({'firewall_groups': []})
self.mox.ReplayAll()
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual(router_port['id'], ports[0]['id'])
self.assertEqual(vm_port1['id'], ports[1]['id'])
self.assertEqual(vm_port2['id'], ports[2]['id'])
@test.create_stubs({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
def test_fwg_port_list_for_tenant_with_used_port(self):
tenant_id = self.request.user.project_id
router_port = {
'id': 'id-1',
'name': 'port-1',
'device_owner': 'network:router_interface'
}
vm_port1 = {
'id': 'id-vm_port-1',
'name': 'port-2',
'device_owner': 'compute:nova'
}
gateway_port = {
'id': 'id-3',
'name': 'port-3',
'device_owner': 'network:router_gateway'
}
dhcp_port = {
'id': 'id-4',
'name': 'port-4',
'device_owner': 'network:dhcp'
}
dummy_ports = {'ports': [
router_port,
vm_port1,
gateway_port,
dhcp_port,
]}
used_ports = {'firewall_groups': [{'ports': [router_port['id']]}]}
neutronclient.list_ports(tenant_id=tenant_id).AndReturn(dummy_ports)
neutronclient.list_fwaas_firewall_groups(
tenant_id=tenant_id).AndReturn(used_ports)
self.mox.ReplayAll()
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual(vm_port1['id'], ports[0]['id'])
@test.create_stubs({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
def test_fwg_port_list_for_tenant_no_match(self):
tenant_id = self.request.user.project_id
dummy_ports = {'ports': [
{'name': 'port-3', 'device_owner': 'network:router_gateway'},
{'name': 'port-4', 'device_owner': 'network:dhcp'},
]}
neutronclient.list_ports(tenant_id=tenant_id).AndReturn(dummy_ports)
neutronclient.list_fwaas_firewall_groups(
tenant_id=tenant_id).AndReturn({'firewall_groups': []})
self.mox.ReplayAll()
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual([], ports)
@test.create_stubs({neutronclient: ('list_ports',
'list_fwaas_firewall_groups')})
def test_fwg_port_list_for_tenant_no_ports(self):
tenant_id = self.request.user.project_id
neutronclient.list_ports(tenant_id=tenant_id).AndReturn({'ports': []})
neutronclient.list_fwaas_firewall_groups(
tenant_id=tenant_id).AndReturn({'firewall_groups': []})
self.mox.ReplayAll()
ports = api_fwaas_v2.fwg_port_list_for_tenant(self.request, tenant_id)
self.assertEqual([], ports)
@test.create_stubs({neutronclient: ('show_fwaas_firewall_group',
'show_fwaas_firewall_policy')})
def test_firewall_get(self):
exp_firewall = self.firewall_groups_v2.first()
ret_dict = {'firewall_group': self.api_firewall_groups_v2.first()}
ingress_policy_id = exp_firewall.ingress_firewall_policy_id
ingress_policy = [p for p in self.api_fw_policies_v2.list()
if p['id'] == ingress_policy_id][0]
egress_policy_id = exp_firewall.egress_firewall_policy_id
egress_policy = [p for p in self.api_fw_policies_v2.list()
if p['id'] == egress_policy_id][0]
neutronclient.show_fwaas_firewall_group(
exp_firewall.id).AndReturn(ret_dict)
neutronclient.show_fwaas_firewall_policy(ingress_policy_id)\
.AndReturn({'firewall_policy': ingress_policy})
neutronclient.show_fwaas_firewall_policy(egress_policy_id)\
.AndReturn({'firewall_policy': egress_policy})
self.mox.ReplayAll()
ret_val = api_fwaas_v2.firewall_get(self.request, exp_firewall.id)
self._assert_firewall_return_value(ret_val, exp_firewall)
@test.create_stubs({neutronclient: ('update_fwaas_firewall_group',)})
def test_firewall_update(self):
firewall = self.firewall_groups_v2.first()
firewall_dict = self.api_firewall_groups_v2.first()
firewall.name = 'new name'
firewall.description = 'new desc'
firewall.admin_state_up = False
firewall_dict['name'] = 'new name'
firewall_dict['description'] = 'new desc'
firewall_dict['admin_state_up'] = False
form_data = {'name': firewall.name,
'description': firewall.description,
'admin_state_up': firewall.admin_state_up
}
form_dict = {'firewall_group': form_data}
ret_dict = {'firewall_group': firewall_dict}
neutronclient.update_fwaas_firewall_group(
firewall.id, form_dict).AndReturn(ret_dict)
self.mox.ReplayAll()
ret_val = api_fwaas_v2.firewall_update(self.request,
firewall.id, **form_data)
self.assertIsInstance(ret_val, api_fwaas_v2.FirewallGroup)
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)

View File

@ -49,3 +49,8 @@ TEST_GLOBAL_MOCKS_ON_PANELS['firewalls'] = {
'Firewall.can_access'),
'return_value': True,
}
TEST_GLOBAL_MOCKS_ON_PANELS['firewalls_v2'] = {
'method': ('neutron_fwaas_dashboard.dashboards.project.firewalls_v2.panel.'
'Firewall_V2.can_access'),
'return_value': True,
}

View File

@ -0,0 +1,167 @@
# Copyright 2017 Juniper Networks
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from openstack_dashboard.test.test_data import utils
from neutron_fwaas_dashboard.api import fwaas_v2 as fwaas
def data(TEST):
# Data returned by openstack_dashboard.api.neutron wrapper.
TEST.firewall_groups_v2 = utils.TestDataContainer()
TEST.fw_policies_v2 = utils.TestDataContainer()
TEST.fw_rules_v2 = utils.TestDataContainer()
# Data return by neutronclient.
TEST.api_firewall_groups_v2 = utils.TestDataContainer()
TEST.api_fw_policies_v2 = utils.TestDataContainer()
TEST.api_fw_rules_v2 = utils.TestDataContainer()
# 1st rule (used by 1st policy)
rule1_dict = {
'action': 'allow',
'description': 'rule1 description',
'destination_ip_address': '4.5.6.7/32',
'destination_port': '1:65535',
'enabled': True,
'id': 'f0881d38-c3eb-4fee-9763-12de3338041d',
'ip_version': '4',
'name': 'rule1',
'protocol': 'tcp',
'shared': True,
'source_ip_address': '1.2.3.0/24',
'source_port': '80',
'tenant_id': '1',
}
TEST.api_fw_rules_v2.add(rule1_dict)
rule1 = fwaas.Rule(copy.deepcopy(rule1_dict))
TEST.fw_rules_v2.add(rule1)
# 2nd rule (used by 2nd policy; no name)
rule2_dict = {
'action': 'deny',
'description': '',
'destination_ip_address': None,
'destination_port': '1:65535',
'enabled': True,
'id': 'c6298a93-850f-4f64-b78a-959fd4f1e5df',
'ip_version': '6',
'name': '',
'protocol': 'udp',
'shared': False,
'source_ip_address': '2001:db8::/32',
'source_port': '80',
'tenant_id': '1',
}
TEST.api_fw_rules_v2.add(rule2_dict)
rule2 = fwaas.Rule(copy.deepcopy(rule2_dict))
TEST.fw_rules_v2.add(rule2)
# 3rd rule (not used by any policy)
rule3_dict = {
'action': 'allow',
'description': 'rule3 description',
'destination_ip_address': '4.5.6.7/32',
'destination_port': '1:65535',
'enabled': True,
'id': 'h0881d38-c3eb-4fee-9763-12de3338041d',
'ip_version': '4',
'name': 'rule3',
'protocol': None,
'shared': True,
'source_ip_address': '1.2.3.0/24',
'source_port': '80',
'tenant_id': '1',
}
TEST.api_fw_rules_v2.add(rule3_dict)
rule3 = fwaas.Rule(copy.deepcopy(rule3_dict))
TEST.fw_rules_v2.add(rule3)
# 1st policy (associated with 2 rules)
policy1_dict = {
'audited': True,
'description': 'policy with two rules',
'firewall_rules': [rule1_dict['id'], rule2_dict['id']],
'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'policy1',
'shared': True,
'tenant_id': '1',
}
TEST.api_fw_policies_v2.add(policy1_dict)
policy1 = fwaas.Policy(copy.deepcopy(policy1_dict))
policy1._apidict['rules'] = [rule1, rule2]
TEST.fw_policies_v2.add(policy1)
# 2nd policy (associated with no rules; no name)
policy2_dict = {
'audited': False,
'description': '',
'firewall_rules': [],
'id': 'cf50b331-787a-4623-825e-da794c918d6a',
'name': '',
'shared': False,
'tenant_id': '1',
}
TEST.api_fw_policies_v2.add(policy2_dict)
policy2 = fwaas.Policy(copy.deepcopy(policy2_dict))
policy2._apidict['rules'] = []
TEST.fw_policies_v2.add(policy2)
# 1st firewall group
fwg1_dict = {
'admin_state_up': True,
'description': 'firewall description',
'egress_firewall_policy_id': 'cf50b331-787a-4623-825e-da794c918d6a',
'id': '8913dde8-4915-4b90-8d3e-b95eeedb0d49',
'ingress_firewall_policy_id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'firewallgroup1',
'ports': [],
'shared': False,
'status': 'PENDING_CREATE',
'tenant_id': '1',
}
TEST.api_firewall_groups_v2.add(fwg1_dict)
fwg1 = fwaas.FirewallGroup(copy.deepcopy(fwg1_dict))
fwg1._apidict['ingress_policy'] = policy1
fwg1._apidict['egress_policy'] = policy2
fwg1._apidict['port_ids'] = []
TEST.firewall_groups_v2.add(fwg1)
# 2nd firewall group (no name)
fwg2_dict = {
'admin_state_up': True,
'description': '',
'egress_firewall_policy_id': None,
'id': '1aa75150-415f-458e-bae5-5a362a4fb1f7',
'ingress_firewall_policy_id': None,
'name': '',
'ports': [],
'shared': False,
'status': 'INACTIVE',
'tenant_id': '1',
}
TEST.api_firewall_groups_v2.add(fwg2_dict)
fwg2 = fwaas.FirewallGroup(copy.deepcopy(fwg2_dict))
fwg2._apidict['ingress_policy'] = None
fwg2._apidict['egress_policy'] = None
TEST.firewall_groups_v2.add(fwg2)

View File

@ -15,10 +15,12 @@ from openstack_dashboard.test.test_data import utils
def load_data(load_onto=None):
from neutron_fwaas_dashboard.test.test_data import fwaas_data
from neutron_fwaas_dashboard.test.test_data import fwaas_v2_data
# The order of these loaders matters, some depend on others.
loaders = (
fwaas_data.data,
fwaas_v2_data.data,
)
if load_onto:
for data_func in loaders: