Drop FWaaS v1 dashboard

FWaaS v1 support has been deprecated and neutron-fwaas
dropped it in Stein cycle.

Change-Id: Ib14c9edf4dbd66ab7fc898f497f740dcf7aa81b3
This commit is contained in:
Akihiro Motoki 2018-12-25 02:57:30 +09:00
parent 2a4c00d63d
commit cf86037560
44 changed files with 9 additions and 4211 deletions

View File

@ -7,9 +7,6 @@ For more information on DevStack plugins,
see the `DevStack Plugins documentation
<https://docs.openstack.org/developer/devstack/plugins.html>`__.
Common to FWaaS v1 and v2 dashboard
-----------------------------------
If neutron-fwaas-dashboard DevStack plugin is enabled,
Neutron FWaaS dashboard is automatically enabled and
the appropriate version of FWaaS panel is displayed based on
@ -17,10 +14,8 @@ the FWaaS version enabled in your neutron server.
You do not need to specify FWaaS API version in the DevStack plugin
configuration.
How to enable FWaaS v2 dashboard
--------------------------------
Add the following to the localrc section of your local.conf.
To enable FWaaS dashboard, add the following to the localrc section
of your local.conf.
.. code-block:: none
@ -28,15 +23,3 @@ Add the following to the localrc section of your local.conf.
enable_plugin neutron-fwaas https://git.openstack.org/openstack/neutron-fwaas master
enable_service q-fwaas-v2
enable_plugin neutron-fwaas-dashboard https://git.openstack.org/openstack/neutron-fwaas-dashboard master
How to enable FWaaS v1 dashboard
--------------------------------
Add the following to the localrc section of your local.conf.
.. code-block:: none
[[local|localrc]]
enable_plugin neutron-fwaas https://git.openstack.org/openstack/neutron-fwaas master
enable_service q-fwaas-v1
enable_plugin neutron-fwaas-dashboard https://git.openstack.org/openstack/neutron-fwaas-dashboard master

View File

@ -1,343 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from collections import OrderedDict
from horizon.utils import memoized
from openstack_dashboard.api import neutron
from openstack_dashboard.contrib.developer.profiler import api as profiler
neutronclient = neutron.neutronclient
class Rule(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron firewall rule."""
class Policy(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron firewall policy."""
class Firewall(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron firewall."""
def rule_create(request, **kwargs):
"""Create a firewall rule
:param request: request context
:param name: name for rule
:param description: description for rule
:param protocol: protocol for rule
:param action: action for rule
:param source_ip_address: source IP address or subnet
:param source_port: integer in [1, 65535] or range in a:b
:param destination_ip_address: destination IP address or subnet
:param destination_port: integer in [1, 65535] or range in a:b
:param shared: boolean (default false)
:param enabled: boolean (default true)
:return: Rule object
"""
body = {'firewall_rule': kwargs}
rule = neutronclient(request).create_firewall_rule(
body).get('firewall_rule')
return Rule(rule)
@profiler.trace
def rule_list(request, **kwargs):
return _rule_list(request, expand_policy=True, **kwargs)
@profiler.trace
def rule_list_for_tenant(request, tenant_id, **kwargs):
"""Return a rule list available for the tenant.
The list contains rules owned by the tenant and shared rules.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
rules = rule_list(request, tenant_id=tenant_id, shared=False, **kwargs)
shared_rules = rule_list(request, shared=True, **kwargs)
return rules + shared_rules
def _rule_list(request, expand_policy, **kwargs):
rules = neutronclient(request).list_firewall_rules(
**kwargs).get('firewall_rules')
if expand_policy and rules:
policies = _policy_list(request, expand_rule=False)
policy_dict = OrderedDict((p.id, p) for p in policies)
for rule in rules:
rule['policy'] = policy_dict.get(rule['firewall_policy_id'])
return [Rule(r) for r in rules]
@profiler.trace
def rule_get(request, rule_id):
return _rule_get(request, rule_id, expand_policy=True)
def _rule_get(request, rule_id, expand_policy):
rule = neutronclient(request).show_firewall_rule(
rule_id).get('firewall_rule')
if expand_policy:
if rule['firewall_policy_id']:
rule['policy'] = _policy_get(request, rule['firewall_policy_id'],
expand_rule=False)
else:
rule['policy'] = None
return Rule(rule)
@profiler.trace
def rule_delete(request, rule_id):
neutronclient(request).delete_firewall_rule(rule_id)
@profiler.trace
def rule_update(request, rule_id, **kwargs):
body = {'firewall_rule': kwargs}
rule = neutronclient(request).update_firewall_rule(
rule_id, body).get('firewall_rule')
return Rule(rule)
@profiler.trace
def policy_create(request, **kwargs):
"""Create a firewall policy
:param request: request context
:param name: name for policy
:param description: description for policy
:param firewall_rules: ordered list of rules in policy
:param shared: boolean (default false)
:param audited: boolean (default false)
:return: Policy object
"""
body = {'firewall_policy': kwargs}
policy = neutronclient(request).create_firewall_policy(
body).get('firewall_policy')
return Policy(policy)
@profiler.trace
def policy_list(request, **kwargs):
return _policy_list(request, expand_rule=True, **kwargs)
@profiler.trace
def policy_list_for_tenant(request, tenant_id, **kwargs):
"""Return a policy list available for the tenant.
The list contains policies owned by the tenant and shared policies.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
policies = policy_list(request, tenant_id=tenant_id,
shared=False, **kwargs)
shared_policies = policy_list(request, shared=True, **kwargs)
return policies + shared_policies
def _policy_list(request, expand_rule, **kwargs):
policies = neutronclient(request).list_firewall_policies(
**kwargs).get('firewall_policies')
if expand_rule and policies:
rules = _rule_list(request, expand_policy=False)
rule_dict = OrderedDict((rule.id, rule) for rule in rules)
for p in policies:
p['rules'] = [rule_dict.get(rule) for rule in p['firewall_rules']]
return [Policy(p) for p in policies]
@profiler.trace
def policy_get(request, policy_id):
return _policy_get(request, policy_id, expand_rule=True)
def _policy_get(request, policy_id, expand_rule):
policy = neutronclient(request).show_firewall_policy(
policy_id).get('firewall_policy')
if expand_rule:
policy_rules = policy['firewall_rules']
if policy_rules:
rules = _rule_list(request, expand_policy=False,
firewall_policy_id=policy_id)
rule_dict = OrderedDict((rule.id, rule) for rule in rules)
policy['rules'] = [rule_dict.get(rule) for rule in policy_rules]
else:
policy['rules'] = []
return Policy(policy)
@profiler.trace
def policy_delete(request, policy_id):
neutronclient(request).delete_firewall_policy(policy_id)
@profiler.trace
def policy_update(request, policy_id, **kwargs):
body = {'firewall_policy': kwargs}
policy = neutronclient(request).update_firewall_policy(
policy_id, body).get('firewall_policy')
return Policy(policy)
@profiler.trace
def policy_insert_rule(request, policy_id, **kwargs):
policy = neutronclient(request).firewall_policy_insert_rule(
policy_id, kwargs)
return Policy(policy)
@profiler.trace
def policy_remove_rule(request, policy_id, **kwargs):
policy = neutronclient(request).firewall_policy_remove_rule(
policy_id, kwargs)
return Policy(policy)
@profiler.trace
def firewall_create(request, **kwargs):
"""Create a firewall for specified policy
:param request: request context
:param name: name for firewall
:param description: description for firewall
:param firewall_policy_id: policy id used by firewall
:param shared: boolean (default false)
:param admin_state_up: boolean (default true)
:return: Firewall object
"""
body = {'firewall': kwargs}
firewall = neutronclient(request).create_firewall(body).get('firewall')
return Firewall(firewall)
@profiler.trace
def firewall_list(request, **kwargs):
return _firewall_list(request, expand_policy=True, expand_router=True,
**kwargs)
@profiler.trace
def firewall_list_for_tenant(request, tenant_id, **kwargs):
"""Return a firewall list available for the tenant.
The list contains firewalls owned by the tenant and shared firewalls.
This is required because Neutron returns all resources including
all tenants if a user has admin role.
"""
# NOTE(amotoki): At now 'shared' attribute is not visible in Neutron
# and there is no way to query shared firewalls explicitly.
# Thus this method returns the same as when tenant_id is specified,
# but I would like to have this method for symmetry to firewall
# rules and policies to avoid unnecessary confusion.
return firewall_list(request, tenant_id=tenant_id, **kwargs)
def _firewall_list(request, expand_policy, expand_router, **kwargs):
firewalls = neutronclient(request).list_firewalls(
**kwargs).get('firewalls')
if expand_policy and firewalls:
policies = _policy_list(request, expand_rule=False)
policy_dict = OrderedDict((p.id, p) for p in policies)
for fw in firewalls:
fw['policy'] = policy_dict.get(fw['firewall_policy_id'])
if expand_router and firewalls:
if neutron.is_extension_supported(request, 'fwaasrouterinsertion'):
filter_params = {}
if 'tenant_id' in kwargs:
filter_params['tenant_id'] = kwargs['tenant_id']
routers = neutron.router_list(request, **filter_params)
router_dict = dict((r.id, r) for r in routers)
def _get_router(router_id):
try:
return router_dict[router_id]
except KeyError:
return neutron.Router({'id': router_id, 'name': ''})
for fw in firewalls:
fw['routers'] = [_get_router(router_id)
for router_id in fw['router_ids']]
else:
for fw in firewalls:
fw['routers'] = fw['router_ids']
return [Firewall(f) for f in firewalls]
@profiler.trace
def firewall_get(request, firewall_id):
return _firewall_get(request, firewall_id)
def _firewall_get(request, firewall_id,
expand_policy=True, expand_router=True):
firewall = neutronclient(request).show_firewall(
firewall_id).get('firewall')
if expand_policy:
policy_id = firewall['firewall_policy_id']
if policy_id:
firewall['policy'] = _policy_get(request, policy_id,
expand_rule=False)
else:
firewall['policy'] = None
if expand_router:
if neutron.is_extension_supported(request, 'fwaasrouterinsertion'):
router_ids = firewall['router_ids']
if router_ids:
firewall['routers'] = neutron.router_list(request,
id=router_ids)
else:
firewall['routers'] = []
else:
firewall['routers'] = []
return Firewall(firewall)
@profiler.trace
def firewall_delete(request, firewall_id):
neutronclient(request).delete_firewall(firewall_id)
@profiler.trace
def firewall_update(request, firewall_id, **kwargs):
body = {'firewall': kwargs}
firewall = neutronclient(request).update_firewall(
firewall_id, body).get('firewall')
return Firewall(firewall)
@profiler.trace
@memoized.memoized
def firewall_unassociated_routers_list(request, tenant_id):
all_routers = neutron.router_list(request, tenant_id=tenant_id)
tenant_firewalls = firewall_list_for_tenant(request, tenant_id=tenant_id)
firewall_router_ids = [rid
for fw in tenant_firewalls
for rid in getattr(fw, 'router_ids', [])]
available_routers = [r for r in all_routers
if r.id not in firewall_router_ids]
available_routers = sorted(available_routers,
key=lambda router: router.name_or_id)
return available_routers

View File

@ -1,445 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.urls import reverse
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import forms
from horizon import messages
from horizon.utils import validators
from openstack_dashboard import api
from openstack_dashboard import policy
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
port_validator = validators.validate_port_or_colon_separated_port_range
LOG = logging.getLogger(__name__)
class UpdateRule(forms.SelfHandlingForm):
name = forms.CharField(max_length=80, label=_("Name"), required=False)
description = forms.CharField(
required=False,
max_length=80, label=_("Description"))
protocol = forms.ThemableChoiceField(
label=_("Protocol"), required=False,
choices=[('tcp', _('TCP')), ('udp', _('UDP')), ('icmp', _('ICMP')),
('any', _('ANY'))],
help_text=_('Protocol for the firewall rule'))
action = forms.ThemableChoiceField(
label=_("Action"), required=False,
choices=[('allow', _('ALLOW')), ('deny', _('DENY')),
('reject', _('REJECT'))],
help_text=_('Action for the firewall rule'))
source_ip_address = forms.IPField(
label=_("Source IP Address/Subnet"),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True,
help_text=_('Source IP address or subnet'))
destination_ip_address = forms.IPField(
label=_('Destination IP Address/Subnet'),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True,
help_text=_('Destination IP address or subnet'))
source_port = forms.CharField(
max_length=80,
label=_("Source Port/Port Range"),
required=False,
validators=[port_validator],
help_text=_('Source port (integer in [1, 65535] or range in a:b)'))
destination_port = forms.CharField(
max_length=80,
label=_("Destination Port/Port Range"),
required=False,
validators=[port_validator],
help_text=_('Destination port (integer in [1, 65535] or range'
' in a:b)'))
ip_version = forms.ThemableChoiceField(
label=_("IP Version"), required=False,
choices=[('4', '4'), ('6', '6')],
help_text=_('IP Version for Firewall Rule'))
shared = forms.BooleanField(label=_("Shared"), required=False)
enabled = forms.BooleanField(label=_("Enabled"), required=False)
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(UpdateRule, self).__init__(request, *args, **kwargs)
# Only admin user can update the 'shared' attribute
self.ignore_shared = False
if not policy.check((("neutron-fwaas",
"update_firewall_rule:shared"),),
request):
self.fields['shared'].widget = forms.CheckboxInput(
attrs={'readonly': 'readonly', 'disabled': 'disabled'})
self.fields['shared'].help_text = _(
'Non admin users are not allowed to set the shared property '
'of the rule.')
self.ignore_shared = True
def handle(self, request, context):
rule_id = self.initial['id']
name_or_id = context.get('name') or rule_id
if context['protocol'] == 'any':
context['protocol'] = None
for f in ['source_ip_address', 'destination_ip_address',
'source_port', 'destination_port']:
if not context[f]:
context[f] = None
# Remove 'shared' from the context if the user is not allowed to
# change this field
if self.ignore_shared and 'shared' in context:
del context['shared']
try:
rule = api_fwaas.rule_update(request, rule_id, **context)
msg = _('Rule %s was successfully updated.') % name_or_id
messages.success(request, msg)
return rule
except Exception as e:
LOG.error('Failed to update rule %(id)s: %(reason)s',
{'id': rule_id, 'reason': e})
msg = (_('Failed to update rule %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class UpdatePolicy(forms.SelfHandlingForm):
name = forms.CharField(max_length=80, label=_("Name"), required=False)
description = forms.CharField(required=False,
max_length=80, label=_("Description"))
shared = forms.BooleanField(label=_("Shared"), required=False)
audited = forms.BooleanField(label=_("Audited"), required=False)
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(UpdatePolicy, self).__init__(request, *args, **kwargs)
# Only admin user can update the 'shared' attribute
self.ignore_shared = False
if not policy.check((("neutron-fwaas",
"update_firewall_policy:shared"),),
request):
self.fields['shared'].widget = forms.CheckboxInput(
attrs={'readonly': 'readonly', 'disabled': 'disabled'})
self.fields['shared'].help_text = _(
'Non admin users are not allowed to set the shared property '
'of the policy.')
self.ignore_shared = True
def handle(self, request, context):
policy_id = self.initial['id']
name_or_id = context.get('name') or policy_id
# Remove 'shared' from the context if the user is not allowed to
# change this field
if self.ignore_shared and 'shared' in context:
del context['shared']
try:
policy = api_fwaas.policy_update(request, policy_id, **context)
msg = _('Policy %s was successfully updated.') % name_or_id
messages.success(request, msg)
return policy
except Exception as e:
LOG.error('Failed to update policy %(id)s: %(reason)s',
{'id': policy_id, 'reason': e})
msg = (_('Failed to update policy %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class UpdateFirewall(forms.SelfHandlingForm):
name = forms.CharField(max_length=80,
label=_("Name"),
required=False)
description = forms.CharField(max_length=80,
label=_("Description"),
required=False)
firewall_policy_id = forms.ThemableChoiceField(label=_("Policy"))
admin_state_up = forms.BooleanField(label=_("Enable Admin State"),
required=False)
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(UpdateFirewall, self).__init__(request, *args, **kwargs)
try:
tenant_id = self.request.user.tenant_id
policies = api_fwaas.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=lambda policy: policy.name)
except Exception:
exceptions.handle(request,
_('Unable to retrieve policy list.'))
policies = []
policy_id = kwargs['initial']['firewall_policy_id']
policy_name = [p.name for p in policies if p.id == policy_id][0]
firewall_policy_id_choices = [(policy_id, policy_name)]
for p in policies:
if p.id != policy_id:
firewall_policy_id_choices.append((p.id, p.name_or_id))
self.fields['firewall_policy_id'].choices = firewall_policy_id_choices
def handle(self, request, context):
firewall_id = self.initial['id']
name_or_id = context.get('name') or firewall_id
try:
firewall = api_fwaas.firewall_update(request, firewall_id,
**context)
msg = _('Firewall %s was successfully updated.') % name_or_id
messages.success(request, msg)
return firewall
except Exception as e:
LOG.error('Failed to update firewall %(id)s: %(reason)s',
{'id': firewall_id, 'reason': e})
msg = (_('Failed to update firewall %(name)s: %(reason)s') %
{'name': name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class InsertRuleToPolicy(forms.SelfHandlingForm):
firewall_rule_id = forms.ThemableChoiceField(label=_("Insert Rule"))
insert_before = forms.ThemableChoiceField(label=_("Before"),
required=False)
insert_after = forms.ThemableChoiceField(label=_("After"),
required=False)
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(InsertRuleToPolicy, self).__init__(request, *args, **kwargs)
try:
tenant_id = self.request.user.tenant_id
all_rules = api_fwaas.rule_list_for_tenant(request, tenant_id)
all_rules = sorted(all_rules, key=lambda rule: rule.name_or_id)
available_rules = [r for r in all_rules
if not r.firewall_policy_id]
current_rules = []
for r in kwargs['initial']['firewall_rules']:
r_obj = [rule for rule in all_rules if r == rule.id][0]
current_rules.append(r_obj)
available_choices = [(r.id, r.name_or_id) for r in available_rules]
current_choices = [(r.id, r.name_or_id) for r in current_rules]
except Exception as e:
LOG.error('Failed to retrieve available rules: %s', e)
msg = _('Failed to retrieve available rules: %s') % e
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
self.fields['firewall_rule_id'].choices = available_choices
self.fields['insert_before'].choices = [('', '')] + current_choices
self.fields['insert_after'].choices = [('', '')] + current_choices
def handle(self, request, context):
policy_id = self.initial['id']
policy_name_or_id = self.initial['name'] or policy_id
try:
insert_rule_id = context['firewall_rule_id']
insert_rule = api_fwaas.rule_get(request, insert_rule_id)
body = {'firewall_rule_id': insert_rule_id,
'insert_before': context['insert_before'],
'insert_after': context['insert_after']}
policy = api_fwaas.policy_insert_rule(request, policy_id, **body)
msg = _('Rule %(rule)s was successfully inserted to policy '
'%(policy)s.') % {
'rule': insert_rule.name or insert_rule.id,
'policy': policy_name_or_id}
messages.success(request, msg)
return policy
except Exception as e:
LOG.error('Failed to insert rule to policy %(id)s: %(reason)s',
{'id': policy_id, 'reason': e})
msg = (_('Failed to insert rule to policy %(name)s: %(reason)s') %
{'name': policy_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class RemoveRuleFromPolicy(forms.SelfHandlingForm):
firewall_rule_id = forms.ThemableChoiceField(label=_("Remove Rule"))
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(RemoveRuleFromPolicy, self).__init__(request, *args, **kwargs)
try:
tenant_id = request.user.tenant_id
all_rules = api_fwaas.rule_list_for_tenant(request, tenant_id)
current_rules = []
for r in kwargs['initial']['firewall_rules']:
r_obj = [rule for rule in all_rules if r == rule.id][0]
current_rules.append(r_obj)
current_choices = [(r.id, r.name_or_id) for r in current_rules]
except Exception as e:
LOG.error('Failed to retrieve current rules in policy %(id)s: '
'%(reason)s',
{'id': self.initial['policy_id'], 'reason': e})
msg = (_('Failed to retrieve current rules in policy %(name)s: '
'%(reason)s') %
{'name': self.initial['name'], 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
self.fields['firewall_rule_id'].choices = current_choices
def handle(self, request, context):
policy_id = self.initial['id']
policy_name_or_id = self.initial['name'] or policy_id
try:
remove_rule_id = context['firewall_rule_id']
remove_rule = api_fwaas.rule_get(request, remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
policy = api_fwaas.policy_remove_rule(request, policy_id, **body)
msg = _('Rule %(rule)s was successfully removed from policy '
'%(policy)s.') % {
'rule': remove_rule.name or remove_rule.id,
'policy': policy_name_or_id}
messages.success(request, msg)
return policy
except Exception as e:
LOG.error('Failed to remove rule from policy %(id)s: %(reason)s',
{'id': policy_id, 'reason': e})
msg = (_('Failed to remove rule from policy %(name)s: %(reason)s')
% {'name': self.initial['name'], 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class AddRouterToFirewall(forms.SelfHandlingForm):
router_ids = forms.MultipleChoiceField(
label=_("Add Routers"),
required=False,
widget=forms.ThemableCheckboxSelectMultiple(),
help_text=_("Add selected router(s) to the firewall."))
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(AddRouterToFirewall, self).__init__(request, *args, **kwargs)
try:
router_choices = self.get_router_choices(request, kwargs)
self.fields['router_ids'].choices = router_choices
except Exception as e:
LOG.error('Failed to retrieve available routers: %s', e)
msg = _('Failed to retrieve available routers: %s') % e
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
def get_router_choices(self, request, kwargs):
tenant_id = self.request.user.tenant_id
routers_list = api_fwaas.firewall_unassociated_routers_list(
request, tenant_id)
return [(r.id, r.name_or_id) for r in routers_list]
def get_new_router_ids(self, context):
existing_router_ids = self.initial['router_ids']
add_router_ids = context['router_ids']
return add_router_ids + existing_router_ids
def handle(self, request, context):
firewall_id = self.initial['id']
firewall_name_or_id = self.initial['name'] or firewall_id
try:
body = {'router_ids': self.get_new_router_ids(context)}
firewall = api_fwaas.firewall_update(request, firewall_id, **body)
msg = (_('Router(s) was/were successfully added to firewall '
'%(firewall)s.') %
{'firewall': firewall_name_or_id})
messages.success(request, msg)
return firewall
except Exception as e:
LOG.error('Failed to add router(s) to firewall %(id)s: %(reason)s',
{'id': firewall_id, 'reason': e})
msg = (_('Failed to add router(s) to firewall %(name)s: '
'%(reason)s') %
{'name': firewall_name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
class RemoveRouterFromFirewall(forms.SelfHandlingForm):
router_ids = forms.MultipleChoiceField(
label=_("Associated Routers"),
required=False,
widget=forms.ThemableCheckboxSelectMultiple(),
help_text=_("Unselect the router(s) to be removed from firewall."))
failure_url = 'horizon:project:firewalls:index'
def __init__(self, request, *args, **kwargs):
super(RemoveRouterFromFirewall, self).__init__(request,
*args, **kwargs)
try:
router_choices = self.get_router_choices(request, kwargs)
self.fields['router_ids'].choices = router_choices
except Exception as e:
LOG.error('Failed to retrieve current routers in firewall %(id)s: '
'%(reason)s',
{'id': self.initial['firewall_id'], 'reason': e})
msg = (_('Failed to retrieve current routers in firewall '
'%(name)s: %(reason)s') %
{'name': self.initial['name'], 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)
def get_router_choices(self, request, kwargs):
tenant_id = self.request.user.tenant_id
all_routers = api.neutron.router_list(request, tenant_id=tenant_id)
current_routers = [r for r in all_routers
if r['id'] in kwargs['initial']['router_ids']]
return [(r.id, r.name_or_id) for r in current_routers]
def get_new_router_ids(self, context):
# context[router_ids] is router IDs to be kept.
return context['router_ids']
def handle(self, request, context):
firewall_id = self.initial['id']
firewall_name_or_id = self.initial['name'] or firewall_id
try:
body = {'router_ids': self.get_new_router_ids(context)}
firewall = api_fwaas.firewall_update(request, firewall_id, **body)
msg = (_('Router(s) was successfully removed from firewall '
'%(firewall)s.') %
{'firewall': firewall_name_or_id})
messages.success(request, msg)
return firewall
except Exception as e:
LOG.error('Failed to remove router(s) from firewall %(id)s: '
'%(reason)s', {'id': firewall_id, 'reason': e})
msg = (_('Failed to remove router(s) from firewall %(name)s: '
'%(reason)s') %
{'name': firewall_name_or_id, 'reason': e})
redirect = reverse(self.failure_url)
exceptions.handle(request, msg, redirect=redirect)

View File

@ -1,43 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.utils.translation import ugettext_lazy as _
import horizon
from openstack_dashboard.api import neutron
LOG = logging.getLogger(__name__)
class Firewall(horizon.Panel):
name = _("Firewalls")
slug = "firewalls"
permissions = ('openstack.services.network',)
def allowed(self, context):
request = context['request']
if not request.user.has_perms(self.permissions):
return False
try:
if not neutron.is_extension_supported(request, 'fwaas'):
return False
except Exception:
LOG.error("Call to list enabled services failed. This is likely "
"due to a problem communicating with the Neutron "
"endpoint. Firewalls panel will not be displayed.")
return False
if not super(Firewall, self).allowed(context):
return False
return True

View File

@ -1,425 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from django.template import defaultfilters as filters
from django.urls import reverse
from django.utils.translation import pgettext_lazy
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import ungettext_lazy
from horizon import exceptions
from horizon import tables
from openstack_dashboard import api
from openstack_dashboard import policy
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
LOG = logging.getLogger(__name__)
class AddRuleLink(tables.LinkAction):
name = "addrule"
verbose_name = _("Add Rule")
url = "horizon:project:firewalls:addrule"
classes = ("ajax-modal",)
icon = "plus"
policy_rules = (("neutron-fwaas", "create_firewall_rule"),)
class AddPolicyLink(tables.LinkAction):
name = "addpolicy"
verbose_name = _("Add Policy")
url = "horizon:project:firewalls:addpolicy"
classes = ("ajax-modal", "btn-addpolicy",)
icon = "plus"
policy_rules = (("neutron-fwaas", "create_firewall_policy"),)
class AddFirewallLink(tables.LinkAction):
name = "addfirewall"
verbose_name = _("Create Firewall")
url = "horizon:project:firewalls:addfirewall"
classes = ("ajax-modal",)
icon = "plus"
policy_rules = (("neutron-fwaas", "create_firewall"),)
class DeleteRuleLink(policy.PolicyTargetMixin, tables.DeleteAction):
name = "deleterule"
policy_rules = (("neutron-fwaas", "delete_firewall_rule"),)
@staticmethod
def action_present(count):
return ungettext_lazy(
u"Delete Rule",
u"Delete Rules",
count
)
@staticmethod
def action_past(count):
return ungettext_lazy(
u"Scheduled deletion of rule",
u"Scheduled deletion of rules",
count
)
def allowed(self, request, datum=None):
if datum and datum.policy:
return False
return True
def delete(self, request, obj_id):
try:
api_fwaas.rule_delete(request, obj_id)
except Exception as e:
exceptions.handle(request, _('Unable to delete rule. %s') % e)
class DeletePolicyLink(policy.PolicyTargetMixin, tables.DeleteAction):
name = "deletepolicy"
policy_rules = (("neutron-fwaas", "delete_firewall_policy"),)
@staticmethod
def action_present(count):
return ungettext_lazy(
u"Delete Policy",
u"Delete Policies",
count
)
@staticmethod
def action_past(count):
return ungettext_lazy(
u"Scheduled deletion of policy",
u"Scheduled deletion of policies",
count
)
def delete(self, request, obj_id):
try:
api_fwaas.policy_delete(request, obj_id)
except Exception as e:
exceptions.handle(request, _('Unable to delete policy. %s') % e)
class DeleteFirewallLink(policy.PolicyTargetMixin,
tables.DeleteAction):
name = "deletefirewall"
policy_rules = (("neutron-fwaas", "delete_firewall"),)
@staticmethod
def action_present(count):
return ungettext_lazy(
u"Delete Firewall",
u"Delete Firewalls",
count
)
@staticmethod
def action_past(count):
return ungettext_lazy(
u"Scheduled deletion of firewall",
u"Scheduled deletion of firewalls",
count
)
def delete(self, request, obj_id):
try:
api_fwaas.firewall_delete(request, obj_id)
except Exception as e:
exceptions.handle(request, _('Unable to delete firewall. %s') % e)
class UpdateRuleLink(policy.PolicyTargetMixin, tables.LinkAction):
name = "updaterule"
verbose_name = _("Edit Rule")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_firewall_rule"),)
def get_link_url(self, rule):
base_url = reverse("horizon:project:firewalls:updaterule",
kwargs={'rule_id': rule.id})
return base_url
class UpdatePolicyLink(policy.PolicyTargetMixin, tables.LinkAction):
name = "updatepolicy"
verbose_name = _("Edit Policy")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_firewall_policy"),)
def get_link_url(self, policy):
base_url = reverse("horizon:project:firewalls:updatepolicy",
kwargs={'policy_id': policy.id})
return base_url
class UpdateFirewallLink(policy.PolicyTargetMixin, tables.LinkAction):
name = "updatefirewall"
verbose_name = _("Edit Firewall")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_firewall"),)
def get_link_url(self, firewall):
base_url = reverse("horizon:project:firewalls:updatefirewall",
kwargs={'firewall_id': firewall.id})
return base_url
def allowed(self, request, firewall):
if firewall.status in ("PENDING_CREATE",
"PENDING_UPDATE",
"PENDING_DELETE"):
return False
return True
class InsertRuleToPolicyLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "insertrule"
verbose_name = _("Insert Rule")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "get_firewall_policy"),
("neutron-fwaas", "insert_rule"),)
def get_link_url(self, policy):
base_url = reverse("horizon:project:firewalls:insertrule",
kwargs={'policy_id': policy.id})
return base_url
class RemoveRuleFromPolicyLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "removerule"
verbose_name = _("Remove Rule")
classes = ("ajax-modal",)
policy_rules = (("neutron-fwaas", "get_firewall_policy"),
("neutron-fwaas", "remove_rule"),)
action_type = "danger"
def get_link_url(self, policy):
base_url = reverse("horizon:project:firewalls:removerule",
kwargs={'policy_id': policy.id})
return base_url
def allowed(self, request, policy):
if len(policy.rules) > 0:
return True
return False
class AddRouterToFirewallLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "addrouter"
verbose_name = _("Add Router")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_firewall"),)
def get_link_url(self, firewall):
base_url = reverse("horizon:project:firewalls:addrouter",
kwargs={'firewall_id': firewall.id})
return base_url
def allowed(self, request, firewall):
if not api.neutron.is_extension_supported(request,
'fwaasrouterinsertion'):
return False
tenant_id = firewall['tenant_id']
available_routers = api_fwaas.firewall_unassociated_routers_list(
request, tenant_id)
return bool(available_routers)
class RemoveRouterFromFirewallLink(policy.PolicyTargetMixin,
tables.LinkAction):
name = "removerouter"
verbose_name = _("Remove Router")
classes = ("ajax-modal", "btn-update",)
policy_rules = (("neutron-fwaas", "update_firewall"),)
def get_link_url(self, firewall):
base_url = reverse("horizon:project:firewalls:removerouter",
kwargs={'firewall_id': firewall.id})
return base_url
def allowed(self, request, firewall):
if not api.neutron.is_extension_supported(request,
'fwaasrouterinsertion'):
return False
return bool(firewall['router_ids'])
def get_rules_name(datum):
return ', '.join([rule.name or rule.id[:13]
for rule in datum.rules])
def get_routers_name(firewall):
if firewall.routers:
return ', '.join(router.name_or_id for router in firewall.routers)
def get_policy_name(datum):
if datum.policy:
return datum.policy.name or datum.policy.id
def get_policy_link(datum):
if datum.policy:
return reverse('horizon:project:firewalls:policydetails',
kwargs={'policy_id': datum.policy.id})
class RulesTable(tables.DataTable):
ACTION_DISPLAY_CHOICES = (
("Allow", pgettext_lazy("Action name of a firewall rule", u"ALLOW")),
("Deny", pgettext_lazy("Action name of a firewall rule", u"DENY")),
("Reject", pgettext_lazy("Action name of a firewall rule", u"REJECT")),
)
name = tables.Column("name_or_id",
verbose_name=_("Name"),
link="horizon:project:firewalls:ruledetails")
description = tables.Column('description', verbose_name=_('Description'))
protocol = tables.Column("protocol",
filters=(lambda v: filters.default(v, _("ANY")),
filters.upper,),
verbose_name=_("Protocol"))
ip_version = tables.Column("ip_version",
verbose_name=_("IP Version"))
source_ip_address = tables.Column("source_ip_address",
verbose_name=_("Source IP"))
source_port = tables.Column("source_port",
verbose_name=_("Source Port"))
destination_ip_address = tables.Column("destination_ip_address",
verbose_name=_("Destination IP"))
destination_port = tables.Column("destination_port",
verbose_name=_("Destination Port"))
action = tables.Column("action",
display_choices=ACTION_DISPLAY_CHOICES,
verbose_name=_("Action"))
shared = tables.Column("shared",
verbose_name=_("Shared"),
filters=(filters.yesno, filters.capfirst))
enabled = tables.Column("enabled",
verbose_name=_("Enabled"),
filters=(filters.yesno, filters.capfirst))
firewall_policy_id = tables.Column(get_policy_name,
link=get_policy_link,
verbose_name=_("In Policy"))
def get_object_display(self, rule):
return rule.name_or_id
class Meta(object):
name = "rulestable"
verbose_name = _("Rules")
table_actions = (AddRuleLink,
DeleteRuleLink,
tables.NameFilterAction)
row_actions = (UpdateRuleLink, DeleteRuleLink)
class PoliciesTable(tables.DataTable):
name = tables.Column("name_or_id",
verbose_name=_("Name"),
link="horizon:project:firewalls:policydetails")
description = tables.Column('description', verbose_name=_('Description'))
firewall_rules = tables.Column(get_rules_name,
verbose_name=_("Rules"))
shared = tables.Column("shared",
verbose_name=_("Shared"),
filters=(filters.yesno, filters.capfirst))
audited = tables.Column("audited",
verbose_name=_("Audited"),
filters=(filters.yesno, filters.capfirst))
def get_object_display(self, policy):
return policy.name_or_id
class Meta(object):
name = "policiestable"
verbose_name = _("Policies")
table_actions = (AddPolicyLink,
DeletePolicyLink,
tables.NameFilterAction)
row_actions = (UpdatePolicyLink, InsertRuleToPolicyLink,
RemoveRuleFromPolicyLink, DeletePolicyLink)
class FirewallsTable(tables.DataTable):
STATUS_DISPLAY_CHOICES = (
("Active", pgettext_lazy("Current status of a firewall",
u"Active")),
("Down", pgettext_lazy("Current status of a firewall",
u"Down")),
("Error", pgettext_lazy("Current status of a firewall",
u"Error")),
("Created", pgettext_lazy("Current status of a firewall",
u"Created")),
("Pending_Create", pgettext_lazy("Current status of a firewall",
u"Pending Create")),
("Pending_Update", pgettext_lazy("Current status of a firewall",
u"Pending Update")),
("Pending_Delete", pgettext_lazy("Current status of a firewall",
u"Pending Delete")),
("Inactive", pgettext_lazy("Current status of a firewall",
u"Inactive")),
)
ADMIN_STATE_DISPLAY_CHOICES = (
("UP", pgettext_lazy("Admin state of a firewall", u"UP")),
("DOWN", pgettext_lazy("Admin state of a firewall", u"DOWN")),
)
name = tables.Column("name_or_id",
verbose_name=_("Name"),
link="horizon:project:firewalls:firewalldetails")
description = tables.Column('description', verbose_name=_('Description'))
firewall_policy_id = tables.Column(get_policy_name,
link=get_policy_link,
verbose_name=_("Policy"))
router_ids = tables.Column(get_routers_name,
verbose_name=_("Associated Routers"))
status = tables.Column("status",
verbose_name=_("Status"),
display_choices=STATUS_DISPLAY_CHOICES)
admin_state = tables.Column("admin_state",
verbose_name=_("Admin State"),
display_choices=ADMIN_STATE_DISPLAY_CHOICES)
def get_object_display(self, firewall):
return firewall.name_or_id
class Meta(object):
name = "firewallstable"
verbose_name = _("Firewalls")
table_actions = (AddFirewallLink,
DeleteFirewallLink,
tables.NameFilterAction)
row_actions = (UpdateFirewallLink, DeleteFirewallLink,
AddRouterToFirewallLink, RemoveRouterFromFirewallLink)
def __init__(self, request, data=None, needs_form_wrapper=None, **kwargs):
super(FirewallsTable, self).__init__(
request, data=data,
needs_form_wrapper=needs_form_wrapper, **kwargs)
try:
if not api.neutron.is_extension_supported(request,
'fwaasrouterinsertion'):
del self.columns['router_ids']
except Exception as e:
LOG.error('Failed to verify extension support %s', e)
msg = _('Failed to verify extension support %s') % e
exceptions.handle(request, msg)

View File

@ -1,129 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import tabs
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
from neutron_fwaas_dashboard.dashboards.project.firewalls import tables
FirewallsTable = tables.FirewallsTable
PoliciesTable = tables.PoliciesTable
RulesTable = tables.RulesTable
class RulesTab(tabs.TableTab):
table_classes = (RulesTable,)
name = _("Firewall Rules")
slug = "rules"
template_name = "horizon/common/_detail_table.html"
def get_rulestable_data(self):
try:
tenant_id = self.request.user.tenant_id
request = self.tab_group.request
rules = api_fwaas.rule_list_for_tenant(request, tenant_id)
except Exception:
rules = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve rules list.'))
return rules
class PoliciesTab(tabs.TableTab):
table_classes = (PoliciesTable,)
name = _("Firewall Policies")
slug = "policies"
template_name = "horizon/common/_detail_table.html"
def get_policiestable_data(self):
try:
tenant_id = self.request.user.tenant_id
request = self.tab_group.request
policies = api_fwaas.policy_list_for_tenant(request, tenant_id)
except Exception:
policies = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve policies list.'))
return policies
class FirewallsTab(tabs.TableTab):
table_classes = (FirewallsTable,)
name = _("Firewalls")
slug = "firewalls"
template_name = "horizon/common/_detail_table.html"
def get_firewallstable_data(self):
try:
tenant_id = self.request.user.tenant_id
request = self.tab_group.request
firewalls = api_fwaas.firewall_list_for_tenant(request, tenant_id)
except Exception:
firewalls = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve firewall list.'))
return firewalls
class RuleDetailsTab(tabs.Tab):
name = _("Rule")
slug = "ruledetails"
template_name = "project/firewalls/_rule_details.html"
def get_context_data(self, request):
return {"rule": self.tab_group.kwargs['rule']}
class PolicyDetailsTab(tabs.Tab):
name = _("Policy")
slug = "policydetails"
template_name = "project/firewalls/_policy_details.html"
def get_context_data(self, request):
return {"policy": self.tab_group.kwargs['policy']}
class FirewallDetailsTab(tabs.Tab):
name = _("Firewall")
slug = "firewalldetails"
template_name = "project/firewalls/_firewall_details.html"
def get_context_data(self, request):
return {"firewall": self.tab_group.kwargs['firewall']}
class FirewallTabs(tabs.TabGroup):
slug = "fwtabs"
tabs = (FirewallsTab, PoliciesTab, RulesTab)
sticky = True
class RuleDetailsTabs(tabs.TabGroup):
slug = "ruletabs"
tabs = (RuleDetailsTab,)
class PolicyDetailsTabs(tabs.TabGroup):
slug = "policytabs"
tabs = (PolicyDetailsTab,)
class FirewallDetailsTabs(tabs.TabGroup):
slug = "firewalltabs"
tabs = (FirewallDetailsTab,)

View File

@ -1,8 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "Choose the router(s) you want to add." %}</p>
{% endblock %}

View File

@ -1,41 +0,0 @@
{% load i18n sizeformat parse_date %}
<div class="detail">
<dl class="dl-horizontal">
<dt>{% trans "Name" %}</dt>
<dd data-display="{{ firewall.name_or_id }}">{{ firewall.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ firewall.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ firewall.id }} </dd>
<dt>{% trans "Project ID" %}</dt>
<dd>{{ firewall.tenant_id }}</dd>
<dt>{% trans "Policy" %}</dt>
<dd>
{% url 'horizon:project:firewalls:policydetails' firewall.firewall_policy_id as policy_url %}
<a href="{{ policy_url }}">{{ firewall.policy.name|default:firewall.policy.id }}</a>
</dd>
<dt>{% trans "Status" %}</dt>
<dd>{{ firewall.status }}</dd>
<dt>{% trans "Admin State Up" %}</dt>
<dd>{{ firewall.admin_state_up|yesno|capfirst }}</dd>
<dt>{% trans "Routers" %}</dt>
<dd>
{% if firewall.routers %}
{% for router in firewall.routers %}
{% url 'horizon:project:routers:detail' router.id as router_url %}
<a href="{{ router_url }}">{{ router.name|default:router.id}}</a><br>
{% endfor %}
{% else %}
{% trans "-" %}
{% endif %}
</dd>
</dl>
</div>

View File

@ -1,7 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "Choose the rule you want to insert. Specify either the rule you want to insert immediately before, or the rule to insert immediately after. If both are specified, the prior takes precedence." %}</p>
{% endblock %}

View File

@ -1,35 +0,0 @@
{% load i18n sizeformat parse_date %}
<div class="detail">
<dl class="dl-horizontal">
<dt>{% trans "Name" %}</dt>
<dd data-display="{{ policy.name_or_id }}">{{ policy.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ policy.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ policy.id }}</dd>
<dt>{% trans "Project ID" %}</dt>
<dd>{{ policy.tenant_id }}</dd>
<dt>{% trans "Rules" %}</dt>
<dd>
{% if policy.rules %}
{% for rule in policy.rules %}
{% url 'horizon:project:firewalls:ruledetails' rule.id as rule_url %}
{{ rule.position }} : <a href="{{ rule_url }}">{{ rule.name|default:rule.id }}</a><br>
{% endfor %}
{% else %}
{% trans "-" %}
{% endif %}
</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ policy.shared|yesno|capfirst }}</dd>
<dt>{% trans "Audited" %}</dt>
<dd>{{ policy.audited|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -1,7 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "Unselect the routers you want to disassociate from the firewall." %}</p>
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "Choose the rule you want to remove." %}</p>
{% endblock %}

View File

@ -1,57 +0,0 @@
{% load i18n sizeformat parse_date %}
<div class="detail">
<dl class="dl-horizontal">
<dt>{% trans "Name" %}</dt>
<dd data-display="{{ rule.name_or_id }}">{{ rule.name|default:_("-") }}</dd>
<dt>{% trans "Description" %}</dt>
<dd>{{ rule.description|default:_("-") }}</dd>
<dt>{% trans "ID" %}</dt>
<dd>{{ rule.id }}</dd>
<dt>{% trans "Project ID" %}</dt>
<dd>{{ rule.tenant_id }}</dd>
<dt>{% trans "Action" %}</dt>
<dd>{{ rule.action|upper }}</dd>
<dt>{% trans "Protocol" %}</dt>
<dd>{{ rule.protocol|default:_("ANY")|upper }}</dd>
<dt>{% trans "Source IP Address" %}</dt>
<dd>{{ rule.source_ip_address|default:_("ANY") }}</dd>
<dt>{% trans "Source Port" %}</dt>
<dd>{{ rule.source_port|default:_("ANY") }}</dd>
<dt>{% trans "Destination IP Address" %}</dt>
<dd>{{ rule.destination_ip_address|default:_("ANY") }}</dd>
<dt>{% trans "Destination Port"%}</dt>
<dd>{{ rule.destination_port|default:_("ANY") }}</dd>
<dt>{% trans "IP Version" %}</dt>
<dd>{{ rule.ip_version }}</dd>
<dt>{% trans "Used in Policy" %}</dt>
<dd>
{% if rule.policy %}
{% url 'horizon:project:firewalls:policydetails' rule.policy.id as policy_url %}
<a href="{{ policy_url }}">{{ rule.policy.name|default:rule.policy.id }}</a>
{% else %}
{% trans "-" %}
{% endif %}
</dd>
<dt>{% trans "Position in Policy" %}</dt>
<dd>{{ rule.position|default:_("-") }}</dd>
<dt>{% trans "Shared" %}</dt>
<dd>{{ rule.shared|yesno|capfirst }}</dd>
<dt>{% trans "Enabled" %}</dt>
<dd>{{ rule.enabled|yesno|capfirst }}</dd>
</dl>
</div>

View File

@ -1,3 +0,0 @@
{% load i18n %}
<p>{% blocktrans %}Choose router(s) from Available Routers to Selected Routers by push button or drag and drop. {% endblocktrans %}</p>

View File

@ -1,35 +0,0 @@
{% load i18n %}
<noscript><h3>{{ step }}</h3></noscript>
<div id="routerListSortContainer" class="sort-container">
<div class="col-sm-6">
<h4 id="selected_router_label">{% trans "Selected Routers" %}</h4>
<ul id="selected_router" class="routerlist box-list"></ul>
<h4>{% trans "Available Routers" %}</h4>
<ul id="available_router" class="routerlist box-list"></ul>
</div>
<div class="col-sm-6">
{% include "project/firewalls/_update_router_help.html" %}
</div>
</div>
<div id="routerListIdContainer">
<div class="actions">
<div id="routerListId">
{% include "horizon/common/_form_fields.html" %}
</div>
</div>
<div class="help_text">
{{ step.get_help_text }}
</div>
</div>
<script>
if (typeof $ !== 'undefined') {
horizon.firewalls.workflow_init($(".workflow"));
} else {
addHorizonLoadEvent(function() {
horizon.firewalls.workflow_init($(".workflow"));
});
}
</script>

View File

@ -1,6 +0,0 @@
{% load i18n horizon %}
<p>{% blocktrans trimmed %}
Choose rule(s) from Available Rules to Selected Rule by push button or
drag and drop, you may change their order by drag and drop as well.
{% endblocktrans %}</p>

View File

@ -1,35 +0,0 @@
{% load i18n %}
<noscript><h3>{{ step }}</h3></noscript>
<div id="ruleListSortContainer" class="sort-container">
<div class="col-sm-6">
<h4 id="selected_rule_label">{% trans "Selected Rules" %}</h4>
<ul id="selected_rule" class="rulelist box-list"></ul>
<h4>{% trans "Available Rules" %}</h4>
<ul id="available_rule" class="rulelist box-list"></ul>
</div>
<div class="col-sm-6">
{% include "project/firewalls/_update_rule_help.html" %}
</div>
</div>
<div id="ruleListIdContainer">
<div class="actions">
<div id="ruleListId">
{% include "horizon/common/_form_fields.html" %}
</div>
</div>
<div class="help_text">
{{ step.get_help_text }}
</div>
</div>
<script>
if (typeof $ !== 'undefined') {
horizon.firewalls.workflow_init($(".workflow"));
} else {
addHorizonLoadEvent(function() {
horizon.firewalls.workflow_init($(".workflow"));
});
}
</script>

View File

@ -1,7 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may update firewall details here." %}</p>
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may update policy details here. Use 'Insert Rule' or 'Remove Rule' links instead to insert or remove a rule" %}</p>
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends "horizon/common/_modal_form.html" %}
{% load i18n %}
{% block modal-body-right %}
<h3>{% trans "Description:" %}</h3>
<p>{% trans "You may update rule details here." %}</p>
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add Router to Firewall" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_add_router_to_firewall.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add New Firewall" %}{% endblock %}
{% block main %}
{% include 'horizon/common/_workflow.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add New Policy" %}{% endblock %}
{% block main %}
{% include 'horizon/common/_workflow.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Add New Rule" %}{% endblock %}
{% block main %}
{% include 'horizon/common/_workflow.html' %}
{% endblock %}

View File

@ -1,11 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Firewalls" %}{% endblock %}
{% block main %}
<div class="row">
<div class="col-sm-12">
{{ tab_group.render }}
</div>
</div>
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Insert Rule to Policy" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_insert_rule_to_policy.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Remove Router from Firewall" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_remove_router_from_firewall.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Remove Rule from Policy" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_remove_rule_from_policy.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Edit Firewall" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_updatefirewall.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Edit Policy" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_updatepolicy.html' %}
{% endblock %}

View File

@ -1,7 +0,0 @@
{% extends 'base.html' %}
{% load i18n %}
{% block title %}{% trans "Edit Rule" %}{% endblock %}
{% block main %}
{% include 'project/firewalls/_updaterule.html' %}
{% endblock %}

View File

@ -1,867 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from django.urls import reverse
from openstack_dashboard import api
from openstack_dashboard.test import helpers
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
from neutron_fwaas_dashboard.test import helpers as test
class FirewallTests(test.TestCase):
DASHBOARD = 'project'
INDEX_URL = reverse('horizon:%s:firewalls:index' % DASHBOARD)
ADDRULE_PATH = 'horizon:%s:firewalls:addrule' % DASHBOARD
ADDPOLICY_PATH = 'horizon:%s:firewalls:addpolicy' % DASHBOARD
ADDFIREWALL_PATH = 'horizon:%s:firewalls:addfirewall' % DASHBOARD
RULE_DETAIL_PATH = 'horizon:%s:firewalls:ruledetails' % DASHBOARD
POLICY_DETAIL_PATH = 'horizon:%s:firewalls:policydetails' % DASHBOARD
FIREWALL_DETAIL_PATH = 'horizon:%s:firewalls:firewalldetails' % DASHBOARD
UPDATERULE_PATH = 'horizon:%s:firewalls:updaterule' % DASHBOARD
UPDATEPOLICY_PATH = 'horizon:%s:firewalls:updatepolicy' % DASHBOARD
UPDATEFIREWALL_PATH = 'horizon:%s:firewalls:updatefirewall' % DASHBOARD
INSERTRULE_PATH = 'horizon:%s:firewalls:insertrule' % DASHBOARD
REMOVERULE_PATH = 'horizon:%s:firewalls:removerule' % DASHBOARD
ADDROUTER_PATH = 'horizon:%s:firewalls:addrouter' % DASHBOARD
REMOVEROUTER_PATH = 'horizon:%s:firewalls:removerouter' % DASHBOARD
def setup_mocks(self, fwaas_router_extension=True):
policies = self.fw_policies.list()
firewalls = self.firewalls.list()
routers = self.routers.list()
self.mock_is_extension_supported.return_value = fwaas_router_extension
self.mock_rule_list_for_tenant.return_value = self.fw_rules.list()
self.mock_policy_list_for_tenant.return_value = policies
self.mock_firewall_list_for_tenant.return_value = firewalls
self.mock_firewall_unassociated_routers_list.return_value = routers
def check_mocks(self, fwaas_router_extension=True):
tenant_id = self.tenant.id
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_is_extension_supported, 5,
mock.call(helpers.IsHttpRequest(), 'fwaasrouterinsertion'))
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.assert_mock_multiple_calls_with_same_arguments(
self.mock_firewall_unassociated_routers_list, 2,
mock.call(helpers.IsHttpRequest(), tenant_id))
def setup_mocks_with_exception(self):
self.mock_is_extension_supported.return_value = True
self.mock_rule_list_for_tenant.side_effect = self.exceptions.neutron
self.mock_policy_list_for_tenant.side_effect = self.exceptions.neutron
self.mock_firewall_list_for_tenant.side_effect = \
self.exceptions.neutron
def check_mocks_with_exception(self):
tenant_id = self.tenant.id
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'firewall_unassociated_routers_list',),
api.neutron: ('is_extension_supported',), })
def test_index_firewalls(self):
self.setup_mocks()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL, tenant_id=tenant_id)
self.assertTemplateUsed(res, '%s/firewalls/details_tabs.html'
% self.DASHBOARD)
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data),
len(self.firewalls.list()))
self.check_mocks()
# TODO(absubram): Change test_index_firewalls for with and without
# router extensions.
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'firewall_unassociated_routers_list',),
api.neutron: ('is_extension_supported',), })
def test_index_policies(self):
self.setup_mocks()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__policies',
tenant_id=tenant_id)
self.assertTemplateUsed(res, '%s/firewalls/details_tabs.html'
% self.DASHBOARD)
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data),
len(self.fw_policies.list()))
self.check_mocks()
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant',
'firewall_unassociated_routers_list',),
api.neutron: ('is_extension_supported',), })
def test_index_rules(self):
self.setup_mocks()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__rules',
tenant_id=tenant_id)
self.assertTemplateUsed(res, '%s/firewalls/details_tabs.html'
% self.DASHBOARD)
self.assertTemplateUsed(res, 'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['rulestable_table'].data),
len(self.fw_rules.list()))
self.check_mocks()
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant'),
api.neutron: ('is_extension_supported',), })
def test_index_exception_firewalls(self):
self.setup_mocks_with_exception()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL, tenant_id=tenant_id)
self.assertTemplateUsed(res,
'%s/firewalls/details_tabs.html'
% self.DASHBOARD)
self.assertTemplateUsed(res,
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['table'].data), 0)
self.check_mocks_with_exception()
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant'),
api.neutron: ('is_extension_supported',), })
def test_index_exception_policies(self):
self.setup_mocks_with_exception()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__policies',
tenant_id=tenant_id)
self.assertTemplateUsed(res,
'%s/firewalls/details_tabs.html'
% self.DASHBOARD)
self.assertTemplateUsed(res,
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['policiestable_table'].data), 0)
self.check_mocks_with_exception()
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'policy_list_for_tenant',
'rule_list_for_tenant'),
api.neutron: ('is_extension_supported',), })
def test_index_exception_rules(self):
self.setup_mocks_with_exception()
tenant_id = self.tenant.id
res = self.client.get(self.INDEX_URL + '?tab=fwtabs__rules',
tenant_id=tenant_id)
self.assertTemplateUsed(res,
'%s/firewalls/details_tabs.html'
% self.DASHBOARD)
self.assertTemplateUsed(res,
'horizon/common/_detail_table.html')
self.assertEqual(len(res.context['rulestable_table'].data), 0)
self.check_mocks_with_exception()
@helpers.create_mocks({api_fwaas: ('rule_create',), })
def test_add_rule_post(self):
rule1 = self.fw_rules.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': rule1.ip_version
}
self.mock_rule_create.return_value = rule1
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_create.assert_called_once_with(
helpers.IsHttpRequest(), **form_data)
@helpers.create_mocks({api_fwaas: ('rule_create',), })
def test_add_rule_post_src_None(self):
rule1 = self.fw_rules.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': rule1.ip_version
}
self.mock_rule_create.return_value = rule1
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
data = form_data.copy()
data['source_ip_address'] = None
data['source_port'] = None
self.mock_rule_create.assert_called_once_with(
helpers.IsHttpRequest(), **data)
@helpers.create_mocks({api_fwaas: ('rule_create',), })
def test_add_rule_post_dest_None(self):
rule1 = self.fw_rules.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': rule1.ip_version
}
self.mock_rule_create.return_value = rule1
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
data = form_data.copy()
data['destination_ip_address'] = None
data['destination_port'] = None
self.mock_rule_create.assert_called_once_with(
helpers.IsHttpRequest(), **data)
def test_add_rule_post_with_error(self):
rule1 = self.fw_rules.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': 'abc',
'action': 'pass',
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled,
'ip_version': 6
}
res = self.client.post(reverse(self.ADDRULE_PATH), form_data)
self.assertFormErrors(res, 3)
@helpers.create_mocks({api_fwaas: ('policy_create',
'rule_list_for_tenant'), })
def test_add_policy_post(self):
policy = self.fw_policies.first()
rules = self.fw_rules.list()
tenant_id = self.tenant.id
form_data = {'name': policy.name,
'description': policy.description,
'firewall_rules': policy.firewall_rules,
'shared': policy.shared,
'audited': policy.audited
}
post_data = {'name': policy.name,
'description': policy.description,
'rule': policy.firewall_rules,
'shared': policy.shared,
'audited': policy.audited
}
# NOTE: SelectRulesAction.populate_rule_choices() lists rule not
# associated with any policy. We need to ensure that rules specified
# in policy.firewall_rules in post_data (above) are not associated
# with any policy. Test data in neutron_data is data in a stable state,
# so we need to modify here.
for rule in rules:
if rule.id in policy.firewall_rules:
rule.firewall_policy_id = rule.policy = None
self.mock_rule_list_for_tenant.return_value = rules
self.mock_policy_create.return_value = policy
res = self.client.post(reverse(self.ADDPOLICY_PATH), post_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_policy_create.assert_called_once_with(
helpers.IsHttpRequest(), **form_data)
@helpers.create_mocks({api_fwaas: ('rule_list_for_tenant',), })
def test_add_policy_post_with_error(self):
policy = self.fw_policies.first()
rules = self.fw_rules.list()
tenant_id = self.tenant.id
form_data = {'description': policy.description,
'firewall_rules': None,
'shared': policy.shared,
'audited': policy.audited
}
self.mock_rule_list_for_tenant.return_value = rules
res = self.client.post(reverse(self.ADDPOLICY_PATH), form_data)
self.assertFormErrors(res, 1)
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
@helpers.create_mocks({api_fwaas: ('firewall_create',
'policy_list_for_tenant',
'firewall_list_for_tenant',),
api.neutron: ('is_extension_supported',
'router_list'), })
def _test_add_firewall_post(self, router_extension=False):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
tenant_id = self.tenant.id
if router_extension:
routers = self.routers.list()
firewalls = self.firewalls.list()
form_data = {'name': firewall.name,
'description': firewall.description,
'firewall_policy_id': firewall.firewall_policy_id,
'admin_state_up': firewall.admin_state_up
}
data = form_data.copy()
if router_extension:
# Lookup for unassociated router(s)
associated = []
for fw in firewalls:
associated += fw.router_ids
unassociated = [r.id for r in routers if r.id not in associated]
form_data['router'] = unassociated
data['router_ids'] = unassociated
self.mock_router_list.return_value = routers
self.mock_firewall_list_for_tenant.return_value = firewalls
self.mock_is_extension_supported.return_value = router_extension
self.mock_policy_list_for_tenant.return_value = policies
self.mock_firewall_create.return_value = firewall
res = self.client.post(reverse(self.ADDFIREWALL_PATH), form_data)
self.assertNoFormErrors(res)
# self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
if router_extension:
self.mock_router_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=tenant_id)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=tenant_id)
else:
self.mock_router_list.assert_not_called()
self.mock_firewall_list_for_tenant.assert_not_called()
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_create.assert_called_once_with(
helpers.IsHttpRequest(), **data)
def test_add_firewall_post(self):
self._test_add_firewall_post()
def test_add_firewall_post_with_router_extension(self):
self._test_add_firewall_post(router_extension=True)
@helpers.create_mocks({api_fwaas: ('policy_list_for_tenant',),
api.neutron: ('is_extension_supported',), })
def test_add_firewall_post_with_error(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
tenant_id = self.tenant.id
form_data = {'name': firewall.name,
'description': firewall.description,
'firewall_policy_id': None,
'admin_state_up': firewall.admin_state_up
}
self.mock_is_extension_supported.return_value = False
self.mock_policy_list_for_tenant.return_value = policies
res = self.client.post(reverse(self.ADDFIREWALL_PATH), form_data)
self.assertFormErrors(res, 1)
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
@helpers.create_mocks({api_fwaas: ('rule_get',)})
def test_update_rule_get(self):
rule = self.fw_rules.first()
self.mock_rule_get.return_value = rule
res = self.client.get(reverse(self.UPDATERULE_PATH, args=(rule.id,)))
self.assertTemplateUsed(res, 'project/firewalls/updaterule.html')
self.mock_rule_get.assert_called_once_with(helpers.IsHttpRequest(),
rule.id)
@helpers.create_mocks({api_fwaas: ('rule_get', 'rule_update')})
def test_update_rule_post(self):
rule = self.fw_rules.first()
self.mock_rule_get.return_value = rule
data = {'name': 'new name',
'description': 'new desc',
'protocol': 'icmp',
'action': 'allow',
'shared': False,
'enabled': True,
'ip_version': rule.ip_version,
'source_ip_address': rule.source_ip_address,
'destination_ip_address': None,
'source_port': None,
'destination_port': rule.destination_port,
}
self.mock_rule_update.return_value = rule
form_data = data.copy()
form_data['destination_ip_address'] = ''
form_data['source_port'] = ''
res = self.client.post(
reverse(self.UPDATERULE_PATH, args=(rule.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
self.mock_rule_update.assert_called_once_with(
helpers.IsHttpRequest(), rule.id, **data)
@helpers.create_mocks({api_fwaas: ('rule_get', 'rule_update')})
def test_update_protocol_any_rule_post(self):
# protocol any means protocol == None in neutron context.
rule = self.fw_rules.get(protocol=None)
self.mock_rule_get.return_value = rule
self.mock_rule_update.return_value = rule
data = {'name': 'new name',
'description': 'new desc',
'protocol': 'icmp',
'action': 'allow',
'shared': False,
'enabled': True,
'ip_version': rule.ip_version,
'source_ip_address': rule.source_ip_address,
'destination_ip_address': None,
'source_port': None,
'destination_port': rule.destination_port,
}
form_data = data.copy()
form_data['destination_ip_address'] = ''
form_data['source_port'] = ''
res = self.client.post(
reverse(self.UPDATERULE_PATH, args=(rule.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
self.mock_rule_update.assert_called_once_with(
helpers.IsHttpRequest(), rule.id, **data)
@helpers.create_mocks({api_fwaas: ('rule_get', 'rule_update')})
def test_update_rule_protocol_to_any_post(self):
rule = self.fw_rules.first()
self.mock_rule_get.return_value = rule
self.mock_rule_update.return_value = rule
data = {'name': 'new name',
'description': 'new desc',
'protocol': None,
'action': 'allow',
'shared': False,
'enabled': True,
'ip_version': rule.ip_version,
'source_ip_address': rule.source_ip_address,
'destination_ip_address': None,
'source_port': None,
'destination_port': rule.destination_port,
}
form_data = data.copy()
form_data['destination_ip_address'] = ''
form_data['source_port'] = ''
form_data['protocol'] = 'any'
res = self.client.post(
reverse(self.UPDATERULE_PATH, args=(rule.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
self.mock_rule_update.assert_called_once_with(
helpers.IsHttpRequest(), rule.id, **data)
@helpers.create_mocks({api_fwaas: ('policy_get',)})
def test_update_policy_get(self):
policy = self.fw_policies.first()
self.mock_policy_get.return_value = policy
res = self.client.get(
reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)))
self.assertTemplateUsed(res, 'project/firewalls/updatepolicy.html')
self.mock_policy_get.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
@helpers.create_mocks({api_fwaas: ('policy_get', 'policy_update')})
def test_update_policy_post(self):
policy = self.fw_policies.first()
self.mock_policy_get.return_value = policy
self.mock_policy_update.return_value = policy
data = {'name': 'new name',
'description': 'new desc',
'shared': True,
'audited': False
}
res = self.client.post(
reverse(self.UPDATEPOLICY_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_policy_get.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
self.mock_policy_update.assert_called_once_with(
helpers.IsHttpRequest(), policy.id, **data)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'policy_list_for_tenant')})
def test_update_firewall_get(self):
firewall = self.firewalls.first()
policies = self.fw_policies.list()
tenant_id = self.tenant.id
self.mock_policy_list_for_tenant.return_value = policies
self.mock_firewall_get.return_value = firewall
res = self.client.get(
reverse(self.UPDATEFIREWALL_PATH, args=(firewall.id,)))
self.assertTemplateUsed(res, 'project/firewalls/updatefirewall.html')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_get.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'policy_list_for_tenant',
'firewall_update')})
def test_update_firewall_post(self):
firewall = self.firewalls.first()
tenant_id = self.tenant.id
policies = self.fw_policies.list()
self.mock_firewall_get.return_value = firewall
self.mock_policy_list_for_tenant.return_value = policies
self.mock_firewall_update.return_value = firewall
data = {'name': 'new name',
'description': 'new desc',
'firewall_policy_id': firewall.firewall_policy_id,
'admin_state_up': False
}
res = self.client.post(
reverse(self.UPDATEFIREWALL_PATH, args=(firewall.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_firewall_get.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id)
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_update.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id, **data)
@helpers.create_mocks({api_fwaas: ('policy_get',
'policy_insert_rule',
'rule_list_for_tenant',
'rule_get')})
def test_policy_insert_rule(self):
policy = self.fw_policies.first()
tenant_id = self.tenant.id
rules = self.fw_rules.list()
new_rule_id = rules[2].id
data = {'firewall_rule_id': new_rule_id,
'insert_before': rules[1].id,
'insert_after': rules[0].id}
policy.firewall_rules = [rules[0].id,
new_rule_id,
rules[1].id]
self.mock_policy_get.return_value = policy
self.mock_rule_list_for_tenant.return_value = rules
self.mock_rule_get.return_value = rules[2]
self.mock_policy_insert_rule.return_value = policy
res = self.client.post(
reverse(self.INSERTRULE_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_policy_get.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), new_rule_id)
self.mock_policy_insert_rule.assert_called_once_with(
helpers.IsHttpRequest(), policy.id, **data)
@helpers.create_mocks({api_fwaas: ('policy_get',
'policy_remove_rule',
'rule_list_for_tenant',
'rule_get')})
def test_policy_remove_rule(self):
policy = self.fw_policies.first()
tenant_id = self.tenant.id
rules = self.fw_rules.list()
remove_rule_id = policy.firewall_rules[0]
left_rule_id = policy.firewall_rules[1]
data = {'firewall_rule_id': remove_rule_id}
after_remove_policy_dict = {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'tenant_id': '1',
'name': 'policy1',
'description': 'policy description',
'firewall_rules': [left_rule_id],
'audited': True,
'shared': True}
after_remove_policy = api_fwaas.Policy(after_remove_policy_dict)
self.mock_policy_get.return_value = policy
self.mock_rule_list_for_tenant.return_value = rules
self.mock_rule_get.return_value = rules[0]
self.mock_policy_remove_rule.return_value = after_remove_policy
res = self.client.post(
reverse(self.REMOVERULE_PATH, args=(policy.id,)), data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_policy_get.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_rule_get.assert_called_once_with(
helpers.IsHttpRequest(), remove_rule_id)
self.mock_policy_remove_rule.assert_called_once_with(
helpers.IsHttpRequest(), policy.id, **data)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'firewall_list_for_tenant',
'firewall_update',
'firewall_unassociated_routers_list')})
def test_firewall_add_router(self):
tenant_id = self.tenant.id
firewall = self.firewalls.first()
routers = self.routers.list()
existing_router_ids = firewall.router_ids
add_router_ids = [routers[1].id]
form_data = {'router_ids': add_router_ids}
post_data = {'router_ids': add_router_ids + existing_router_ids}
firewall.router_ids = [add_router_ids, existing_router_ids]
self.mock_firewall_get.return_value = firewall
self.mock_firewall_unassociated_routers_list.return_value = routers
self.mock_firewall_update.return_value = firewall
res = self.client.post(
reverse(self.ADDROUTER_PATH, args=(firewall.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_firewall_get.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id)
self.mock_firewall_unassociated_routers_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id)
self.mock_firewall_update.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id, **post_data)
@helpers.create_mocks({api_fwaas: ('firewall_get',
'firewall_update'),
api.neutron: ('router_list',), })
def test_firewall_remove_router(self):
firewall = self.firewalls.first()
tenant_id = self.tenant.id
routers = self.routers.list()
existing_router_ids = firewall.router_ids
form_data = {'router_ids': existing_router_ids}
firewall.router_ids = []
self.mock_firewall_get.return_value = firewall
self.mock_router_list.return_value = routers
self.mock_firewall_update.return_value = firewall
res = self.client.post(
reverse(self.REMOVEROUTER_PATH, args=(firewall.id,)), form_data)
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
self.mock_firewall_get.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id)
api.neutron.router_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=tenant_id)
self.mock_firewall_update.assert_called_once_with(
helpers.IsHttpRequest(), firewall.id, **form_data)
@helpers.create_mocks({api_fwaas: ('rule_list_for_tenant',
'rule_delete'),
api.neutron: ('is_extension_supported',)})
def test_delete_rule(self):
self.mock_is_extension_supported.return_value = True
self.mock_rule_list_for_tenant.return_value = self.fw_rules.list()
self.mock_rule_delete.return_value = None
rule = self.fw_rules.list()[2]
form_data = {"action": "rulestable__deleterule__%s" % rule.id}
res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res)
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_rule_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), self.tenant.id)
self.mock_rule_delete.assert_called_once_with(
helpers.IsHttpRequest(), rule.id)
@helpers.create_mocks({api_fwaas: ('policy_list_for_tenant',
'policy_delete'),
api.neutron: ('is_extension_supported',)})
def test_delete_policy(self):
self.mock_is_extension_supported.return_value = True
self.mock_policy_list_for_tenant.return_value = self.fw_policies.list()
self.mock_policy_delete.return_value = None
policy = self.fw_policies.first()
form_data = {"action": "policiestable__deletepolicy__%s" % policy.id}
res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res)
api.neutron.is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_policy_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), self.tenant.id)
self.mock_policy_delete.assert_called_once_with(
helpers.IsHttpRequest(), policy.id)
@helpers.create_mocks({api_fwaas: ('firewall_list_for_tenant',
'firewall_delete'),
api.neutron: ('is_extension_supported',)})
def test_delete_firewall(self):
fwl = self.firewalls.first()
self.mock_firewall_list_for_tenant.return_value = [fwl]
self.mock_firewall_delete.return_value = None
self.mock_is_extension_supported.return_value = False
form_data = {"action": "firewallstable__deletefirewall__%s" % fwl.id}
res = self.client.post(self.INDEX_URL, form_data)
self.assertNoFormErrors(res)
self.mock_firewall_list_for_tenant.assert_called_once_with(
helpers.IsHttpRequest(), self.tenant.id)
self.mock_firewall_delete.assert_called_once_with(
helpers.IsHttpRequest(), fwl.id)
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')

View File

@ -1,51 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.conf.urls import url
from neutron_fwaas_dashboard.dashboards.project.firewalls import views
urlpatterns = [
url(r'^$', views.IndexView.as_view(), name='index'),
url(r'^\?tab=fwtabs__firewalls$',
views.IndexView.as_view(), name='firewalls'),
url(r'^\?tab=fwtabs__rules$', views.IndexView.as_view(), name='rules'),
url(r'^\?tab=fwtabs__policies$',
views.IndexView.as_view(), name='policies'),
url(r'^addrule$', views.AddRuleView.as_view(), name='addrule'),
url(r'^addpolicy$', views.AddPolicyView.as_view(), name='addpolicy'),
url(r'^addfirewall/(?P<policy_id>[^/]+)/$',
views.AddFirewallView.as_view(), name='addfirewall'),
url(r'^addfirewall$', views.AddFirewallView.as_view(), name='addfirewall'),
url(r'^insertrule/(?P<policy_id>[^/]+)/$',
views.InsertRuleToPolicyView.as_view(), name='insertrule'),
url(r'^removerule/(?P<policy_id>[^/]+)/$',
views.RemoveRuleFromPolicyView.as_view(), name='removerule'),
url(r'^updaterule/(?P<rule_id>[^/]+)/$',
views.UpdateRuleView.as_view(), name='updaterule'),
url(r'^updatepolicy/(?P<policy_id>[^/]+)/$',
views.UpdatePolicyView.as_view(), name='updatepolicy'),
url(r'^updatefirewall/(?P<firewall_id>[^/]+)/$',
views.UpdateFirewallView.as_view(), name='updatefirewall'),
url(r'^rule/(?P<rule_id>[^/]+)/$',
views.RuleDetailsView.as_view(), name='ruledetails'),
url(r'^policy/(?P<policy_id>[^/]+)/$',
views.PolicyDetailsView.as_view(), name='policydetails'),
url(r'^addrouter/(?P<firewall_id>[^/]+)/$',
views.AddRouterToFirewallView.as_view(), name='addrouter'),
url(r'^removerouter/(?P<firewall_id>[^/]+)/$',
views.RemoveRouterFromFirewallView.as_view(), name='removerouter'),
url(r'^firewall/(?P<firewall_id>[^/]+)/$',
views.FirewallDetailsView.as_view(), name='firewalldetails'),
]

View File

@ -1,416 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.urls import reverse
from django.urls import reverse_lazy
from django.utils.translation import ugettext_lazy as _
from horizon import exceptions
from horizon import forms as horizon_forms
from horizon import tabs as horizon_tabs
from horizon.utils import memoized
from horizon import workflows as horizon_workflows
from openstack_dashboard import api
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
from neutron_fwaas_dashboard.dashboards.project.firewalls import forms
from neutron_fwaas_dashboard.dashboards.project.firewalls import tabs
from neutron_fwaas_dashboard.dashboards.project.firewalls import workflows
AddRouterToFirewall = forms.AddRouterToFirewall
InsertRuleToPolicy = forms.InsertRuleToPolicy
RemoveRouterFromFirewall = forms.RemoveRouterFromFirewall
RemoveRuleFromPolicy = forms.RemoveRuleFromPolicy
UpdateFirewall = forms.UpdateFirewall
UpdatePolicy = forms.UpdatePolicy
UpdateRule = forms.UpdateRule
FirewallDetailsTabs = tabs.FirewallDetailsTabs
FirewallTabs = tabs.FirewallTabs
PolicyDetailsTabs = tabs.PolicyDetailsTabs
RuleDetailsTabs = tabs.RuleDetailsTabs
AddFirewall = workflows.AddFirewall
AddPolicy = workflows.AddPolicy
AddRule = workflows.AddRule
class IndexView(horizon_tabs.TabbedTableView):
tab_group_class = FirewallTabs
template_name = 'project/firewalls/details_tabs.html'
page_title = _("Firewalls")
class AddRuleView(horizon_workflows.WorkflowView):
workflow_class = AddRule
template_name = "project/firewalls/addrule.html"
page_title = _("Add New Rule")
class AddPolicyView(horizon_workflows.WorkflowView):
workflow_class = AddPolicy
template_name = "project/firewalls/addpolicy.html"
page_title = _("Add New Policy")
class AddFirewallView(horizon_workflows.WorkflowView):
workflow_class = AddFirewall
template_name = "project/firewalls/addfirewall.html"
page_title = _("Add New Firewall")
def get_workflow(self):
if api.neutron.is_extension_supported(self.request,
'fwaasrouterinsertion'):
AddFirewall.register(workflows.SelectRoutersStep)
workflow = super(AddFirewallView, self).get_workflow()
return workflow
class RuleDetailsView(horizon_tabs.TabView):
tab_group_class = (RuleDetailsTabs)
template_name = 'horizon/common/_detail.html'
page_title = "{{ rule.name|default:rule.id }}"
failure_url = reverse_lazy('horizon:project:firewalls:index')
def get_context_data(self, **kwargs):
context = super(RuleDetailsView, self).get_context_data(**kwargs)
rule = self.get_data()
table = tabs.RulesTable(self.request)
breadcrumb = [
(_("Rules"), reverse_lazy('horizon:project:firewalls:rules'))]
context["custom_breadcrumb"] = breadcrumb
context["rule"] = rule
context["url"] = self.failure_url
context["actions"] = table.render_row_actions(rule)
return context
@memoized.memoized_method
def get_data(self):
try:
rule_id = self.kwargs['rule_id']
rule = api_fwaas.rule_get(self.request, rule_id)
except Exception:
exceptions.handle(self.request,
_('Unable to retrieve rule details.'),
redirect=self.failure_url)
return rule
def get_tabs(self, request, *args, **kwargs):
rule = self.get_data()
return self.tab_group_class(request, rule=rule, **kwargs)
class PolicyDetailsView(horizon_tabs.TabView):
tab_group_class = (PolicyDetailsTabs)
template_name = 'horizon/common/_detail.html'
page_title = "{{ policy.name|default:policy.id }}"
failure_url = reverse_lazy('horizon:project:firewalls:index')
def get_context_data(self, **kwargs):
context = super(PolicyDetailsView, self).get_context_data(**kwargs)
policy = self.get_data()
table = tabs.PoliciesTable(self.request)
breadcrumb = [
(_("Policies"),
reverse_lazy('horizon:project:firewalls:policies'))]
context["custom_breadcrumb"] = breadcrumb
context["policy"] = policy
context["url"] = self.failure_url
context["actions"] = table.render_row_actions(policy)
return context
@memoized.memoized_method
def get_data(self):
try:
policy_id = self.kwargs['policy_id']
policy = api_fwaas.policy_get(self.request, policy_id)
except Exception:
exceptions.handle(self.request,
_('Unable to retrieve policy details.'),
redirect=self.failure_url)
return policy
def get_tabs(self, request, *args, **kwargs):
policy = self.get_data()
return self.tab_group_class(request, policy=policy, **kwargs)
class FirewallDetailsView(horizon_tabs.TabView):
tab_group_class = (FirewallDetailsTabs)
template_name = 'horizon/common/_detail.html'
page_title = "{{ firewall.name|default:firewall.id }}"
failure_url = reverse_lazy('horizon:project:firewalls:index')
def get_context_data(self, **kwargs):
context = super(FirewallDetailsView, self).get_context_data(**kwargs)
firewall = self.get_data()
table = tabs.FirewallsTable(self.request)
context["firewall"] = firewall
context["url"] = self.failure_url
context["actions"] = table.render_row_actions(firewall)
return context
@memoized.memoized_method
def get_data(self):
try:
firewall_id = self.kwargs['firewall_id']
firewall = api_fwaas.firewall_get(self.request, firewall_id)
except Exception:
exceptions.handle(self.request,
_('Unable to retrieve firewall details.'),
redirect=self.failure_url)
return firewall
def get_tabs(self, request, *args, **kwargs):
firewall = self.get_data()
return self.tab_group_class(request, firewall=firewall, **kwargs)
class UpdateRuleView(horizon_forms.ModalFormView):
form_class = UpdateRule
form_id = "update_rule_form"
template_name = "project/firewalls/updaterule.html"
context_object_name = 'rule'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls:updaterule"
success_url = reverse_lazy("horizon:project:firewalls:index")
page_title = _("Edit Rule {{ name }}")
def get_context_data(self, **kwargs):
context = super(UpdateRuleView, self).get_context_data(**kwargs)
context['rule_id'] = self.kwargs['rule_id']
args = (self.kwargs['rule_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
rule_id = self.kwargs['rule_id']
try:
rule = api_fwaas.rule_get(self.request, rule_id)
return rule
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve rule details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
rule = self._get_object()
initial = rule.to_dict()
if not initial['protocol']:
initial['protocol'] = 'any'
return initial
class UpdatePolicyView(horizon_forms.ModalFormView):
form_class = UpdatePolicy
form_id = "update_policy_form"
template_name = "project/firewalls/updatepolicy.html"
context_object_name = 'policy'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls:updatepolicy"
success_url = reverse_lazy("horizon:project:firewalls:index")
page_title = _("Edit Policy {{ name }}")
def get_context_data(self, **kwargs):
context = super(UpdatePolicyView, self).get_context_data(**kwargs)
context["policy_id"] = self.kwargs['policy_id']
args = (self.kwargs['policy_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
policy_id = self.kwargs['policy_id']
try:
policy = api_fwaas.policy_get(self.request, policy_id)
return policy
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve policy details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
policy = self._get_object()
initial = policy.to_dict()
return initial
class UpdateFirewallView(horizon_forms.ModalFormView):
form_class = UpdateFirewall
form_id = "update_firewall_form"
template_name = "project/firewalls/updatefirewall.html"
context_object_name = 'firewall'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls:updatefirewall"
success_url = reverse_lazy("horizon:project:firewalls:index")
page_title = _("Edit Firewall {{ name }}")
def get_context_data(self, **kwargs):
context = super(UpdateFirewallView, self).get_context_data(**kwargs)
context["firewall_id"] = self.kwargs['firewall_id']
args = (self.kwargs['firewall_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
firewall_id = self.kwargs['firewall_id']
try:
firewall = api_fwaas.firewall_get(self.request,
firewall_id)
return firewall
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve firewall details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
firewall = self._get_object()
initial = firewall.to_dict()
return initial
class InsertRuleToPolicyView(horizon_forms.ModalFormView):
form_class = InsertRuleToPolicy
form_id = "update_policy_form"
template_name = "project/firewalls/insert_rule_to_policy.html"
context_object_name = 'policy'
submit_url = "horizon:project:firewalls:insertrule"
submit_label = _("Save Changes")
success_url = reverse_lazy("horizon:project:firewalls:index")
page_title = _("Insert Rule to Policy")
def get_context_data(self, **kwargs):
context = super(InsertRuleToPolicyView,
self).get_context_data(**kwargs)
context["policy_id"] = self.kwargs['policy_id']
args = (self.kwargs['policy_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
policy_id = self.kwargs['policy_id']
try:
policy = api_fwaas.policy_get(self.request, policy_id)
return policy
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve policy details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
policy = self._get_object()
initial = policy.to_dict()
return initial
class RemoveRuleFromPolicyView(horizon_forms.ModalFormView):
form_class = RemoveRuleFromPolicy
form_id = "update_policy_form"
template_name = "project/firewalls/remove_rule_from_policy.html"
context_object_name = 'policy'
submit_label = _("Save Changes")
submit_url = "horizon:project:firewalls:removerule"
success_url = reverse_lazy("horizon:project:firewalls:index")
page_title = _("Remove Rule from Policy")
def get_context_data(self, **kwargs):
context = super(RemoveRuleFromPolicyView,
self).get_context_data(**kwargs)
context["policy_id"] = self.kwargs['policy_id']
args = (self.kwargs['policy_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
policy_id = self.kwargs['policy_id']
try:
policy = api_fwaas.policy_get(self.request, policy_id)
return policy
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve policy details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
policy = self._get_object()
initial = policy.to_dict()
return initial
class RouterCommonView(horizon_forms.ModalFormView):
form_id = "update_firewall_form"
context_object_name = 'firewall'
submit_label = _("Save Changes")
success_url = reverse_lazy("horizon:project:firewalls:index")
def get_context_data(self, **kwargs):
context = super(RouterCommonView,
self).get_context_data(**kwargs)
context["firewall_id"] = self.kwargs['firewall_id']
args = (self.kwargs['firewall_id'],)
context['submit_url'] = reverse(self.submit_url, args=args)
obj = self._get_object()
if obj:
context['name'] = obj.name_or_id
return context
@memoized.memoized_method
def _get_object(self, *args, **kwargs):
firewall_id = self.kwargs['firewall_id']
try:
firewall = api_fwaas.firewall_get(self.request, firewall_id)
return firewall
except Exception:
redirect = self.success_url
msg = _('Unable to retrieve firewall details.')
exceptions.handle(self.request, msg, redirect=redirect)
def get_initial(self):
firewall = self._get_object()
initial = firewall.to_dict()
return initial
class AddRouterToFirewallView(RouterCommonView):
form_class = AddRouterToFirewall
template_name = "project/firewalls/add_router_to_firewall.html"
submit_url = "horizon:project:firewalls:addrouter"
page_title = _("Add Router to Firewall")
class RemoveRouterFromFirewallView(RouterCommonView):
form_class = RemoveRouterFromFirewall
template_name = "project/firewalls/remove_router_from_firewall.html"
submit_url = "horizon:project:firewalls:removerouter"
page_title = _("Remove Router from Firewall")

View File

@ -1,433 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django.utils.translation import ugettext_lazy as _
import netaddr
from horizon import exceptions
from horizon import forms
from horizon.utils import validators
from horizon import workflows
from openstack_dashboard import policy
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
port_validator = validators.validate_port_or_colon_separated_port_range
class AddRuleAction(workflows.Action):
name = forms.CharField(
max_length=80,
label=_("Name"),
required=False)
description = forms.CharField(
max_length=80,
label=_("Description"),
required=False)
protocol = forms.ThemableChoiceField(
label=_("Protocol"),
choices=[('tcp', _('TCP')),
('udp', _('UDP')),
('icmp', _('ICMP')),
('any', _('ANY'))],
widget=forms.ThemableSelectWidget(attrs={
'class': 'switchable',
'data-slug': 'protocol',
}))
action = forms.ThemableChoiceField(
label=_("Action"),
choices=[('allow', _('ALLOW')),
('deny', _('DENY')),
('reject', _('REJECT'))],)
source_ip_address = forms.IPField(
label=_("Source IP Address/Subnet"),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True)
destination_ip_address = forms.IPField(
label=_("Destination IP Address/Subnet"),
version=forms.IPv4 | forms.IPv6,
required=False, mask=True)
source_port = forms.CharField(
max_length=80,
label=_("Source Port/Port Range"),
widget=forms.TextInput(attrs={
'class': 'switched',
'data-switch-on': 'protocol',
'data-protocol-tcp': _("Source Port/Port Range"),
'data-protocol-udp': _("Source Port/Port Range"),
}),
required=False,
validators=[port_validator])
destination_port = forms.CharField(
max_length=80,
label=_("Destination Port/Port Range"),
widget=forms.TextInput(attrs={
'class': 'switched',
'data-switch-on': 'protocol',
'data-protocol-tcp': _("Destination Port/Port Range"),
'data-protocol-udp': _("Destination Port/Port Range"),
}),
required=False,
validators=[port_validator])
ip_version = forms.ThemableChoiceField(
label=_("IP Version"), required=False,
choices=[('4', '4'), ('6', '6')])
shared = forms.BooleanField(
label=_("Shared"), initial=False, required=False)
enabled = forms.BooleanField(
label=_("Enabled"), initial=True, required=False)
def __init__(self, request, *args, **kwargs):
super(AddRuleAction, self).__init__(request, *args, **kwargs)
# Only admin user can update the 'shared' attribute
self.ignore_shared = False
if not policy.check((("neutron-fwaas",
"create_firewall_rule:shared"),),
request):
self.fields['shared'].widget = forms.CheckboxInput(
attrs={'readonly': 'readonly', 'disabled': 'disabled'})
self.fields['shared'].help_text = _(
'Non admin users are not allowed to set the shared property '
'of the rule.')
self.ignore_shared = True
def _check_ip_addr_and_ip_version(self, cleaned_data):
ip_version = int(str(cleaned_data.get('ip_version')))
src_ip = cleaned_data.get('source_ip_address')
dst_ip = cleaned_data.get('destination_ip_address')
msg = _('Source/Destination Network Address and IP version '
'are inconsistent. Please make them consistent.')
if (src_ip and
netaddr.IPNetwork(src_ip).version != ip_version):
self._errors['ip_version'] = self.error_class([msg])
elif (dst_ip and
netaddr.IPNetwork(dst_ip).version != ip_version):
self._errors['ip_version'] = self.error_class([msg])
def clean(self):
cleaned_data = super(AddRuleAction, self).clean()
self._check_ip_addr_and_ip_version(cleaned_data)
class Meta(object):
name = _("Rule")
permissions = ('openstack.services.network',)
help_text = _("Create a firewall rule.\n\n"
"A firewall rule is an association of the following "
"attributes:\n\n"
"<li>IP Addresses: The addresses from/to which the "
"traffic filtration needs to be applied.</li>"
"<li>IP Version: The type of IP packets (IP V4/V6) "
"that needs to be filtered.</li>"
"<li>Protocol: Type of packets (UDP, ICMP, TCP, Any) "
"that needs to be checked.</li>"
"<li>Action: Action is the type of filtration "
"required, it can be Reject/Deny/Allow data "
"packets.</li>\n"
"The protocol and action fields are required, all "
"others are optional.")
class AddRuleStep(workflows.Step):
action_class = AddRuleAction
contributes = ("name", "description", "protocol", "action",
"source_ip_address", "source_port",
"destination_ip_address", "destination_port",
"enabled", "shared", "ip_version")
def contribute(self, data, context):
context = super(AddRuleStep, self).contribute(data, context)
if data:
if context['protocol'] == 'any':
del context['protocol']
for field in ['source_port',
'destination_port',
'source_ip_address',
'destination_ip_address']:
if not context[field]:
del context[field]
return context
class AddRule(workflows.Workflow):
slug = "addrule"
name = _("Add Rule")
finalize_button_name = _("Add")
success_message = _('Added rule "%s".')
failure_message = _('Unable to add rule "%s".')
success_url = "horizon:project:firewalls:index"
# fwaas is designed to support a wide range of vendor
# firewalls. Considering the multitude of vendor firewall
# features in place today, firewall_rule definition can
# involve more complex configuration over time. Hence,
# a workflow instead of a single form is used for
# firewall_rule add to be ready for future extension.
default_steps = (AddRuleStep,)
def format_status_message(self, message):
return message % self.context.get('name')
def handle(self, request, context):
try:
api_fwaas.rule_create(request, **context)
return True
except Exception as e:
msg = self.format_status_message(self.failure_message) + str(e)
exceptions.handle(request, msg)
return False
class SelectRulesAction(workflows.Action):
rule = forms.MultipleChoiceField(
label=_("Rules"),
required=False,
widget=forms.ThemableCheckboxSelectMultiple(),
help_text=_("Create a policy with selected rules."))
class Meta(object):
name = _("Rules")
permissions = ('openstack.services.network',)
help_text = _("Select rules for your policy.")
def populate_rule_choices(self, request, context):
try:
tenant_id = self.request.user.tenant_id
rules = api_fwaas.rule_list_for_tenant(request, tenant_id)
rules = sorted(rules,
key=lambda rule: rule.name_or_id)
rule_list = [(rule.id, rule.name_or_id) for rule in rules
if not rule.firewall_policy_id]
except Exception as e:
rule_list = []
exceptions.handle(request,
_('Unable to retrieve rules (%(error)s).') % {
'error': str(e)})
return rule_list
class SelectRulesStep(workflows.Step):
action_class = SelectRulesAction
template_name = "project/firewalls/_update_rules.html"
contributes = ("firewall_rules",)
def contribute(self, data, context):
if data:
rules = self.workflow.request.POST.getlist("rule")
if rules:
rules = [r for r in rules if r != '']
context['firewall_rules'] = rules
return context
class SelectRoutersAction(workflows.Action):
router = forms.MultipleChoiceField(
label=_("Routers"),
required=False,
widget=forms.ThemableCheckboxSelectMultiple(),
help_text=_("Create a firewall with selected routers."))
class Meta(object):
name = _("Routers")
permissions = ('openstack.services.network',)
help_text = _("Select routers for your firewall.")
def populate_router_choices(self, request, context):
try:
tenant_id = self.request.user.tenant_id
routers_list = api_fwaas.firewall_unassociated_routers_list(
request, tenant_id)
except Exception as e:
routers_list = []
exceptions.handle(request,
_('Unable to retrieve routers (%(error)s).') % {
'error': str(e)})
routers_list = [(router.id, router.name_or_id)
for router in routers_list]
return routers_list
class SelectRoutersStep(workflows.Step):
action_class = SelectRoutersAction
template_name = "project/firewalls/_update_routers.html"
contributes = ("router_ids",)
def contribute(self, data, context):
if data:
routers = self.workflow.request.POST.getlist("router")
if routers:
routers = [r for r in routers if r != '']
context['router_ids'] = routers
else:
context['router_ids'] = []
return context
class AddPolicyAction(workflows.Action):
name = forms.CharField(max_length=80,
label=_("Name"))
description = forms.CharField(max_length=80,
label=_("Description"),
required=False)
shared = forms.BooleanField(label=_("Shared"),
initial=False,
required=False)
audited = forms.BooleanField(label=_("Audited"),
initial=False,
required=False)
def __init__(self, request, *args, **kwargs):
super(AddPolicyAction, self).__init__(request, *args, **kwargs)
# Only admin user can update the 'shared' attribute
self.ignore_shared = False
if not policy.check((("neutron-fwaas",
"create_firewall_policy:shared"),),
request):
self.fields['shared'].widget = forms.CheckboxInput(
attrs={'readonly': 'readonly', 'disabled': 'disabled'})
self.fields['shared'].help_text = _(
'Non admin users are not allowed to set the shared property '
'of the policy.')
self.ignore_shared = True
class Meta(object):
name = _("Policy")
permissions = ('openstack.services.network',)
help_text = _("Create a firewall policy with an ordered list "
"of firewall rules.\n\n"
"A firewall policy is an ordered collection of firewall "
"rules. So if the traffic matches the first rule, the "
"other rules are not executed. If the traffic does not "
"match the current rule, then the next rule is "
"executed. A firewall policy has the following "
"attributes:\n\n"
"<li>Shared: A firewall policy can be shared across "
"tenants. Thus it can also be made part of an audit "
"workflow wherein the firewall policy can be audited "
"by the relevant entity that is authorized.</li>"
"<li>Audited: When audited is set to True, it indicates "
"that the firewall policy has been audited. "
"Each time the firewall policy or the associated "
"firewall rules are changed, this attribute will be "
"set to False and will have to be explicitly set to "
"True through an update operation.</li>\n"
"The name field is required, all others are optional.")
class AddPolicyStep(workflows.Step):
action_class = AddPolicyAction
contributes = ("name", "description", "shared", "audited")
def contribute(self, data, context):
context = super(AddPolicyStep, self).contribute(data, context)
if data:
return context
class AddPolicy(workflows.Workflow):
slug = "addpolicy"
name = _("Add Policy")
finalize_button_name = _("Add")
success_message = _('Added policy "%s".')
failure_message = _('Unable to add policy "%s".')
success_url = "horizon:project:firewalls:index"
default_steps = (AddPolicyStep, SelectRulesStep)
def format_status_message(self, message):
return message % self.context.get('name')
def handle(self, request, context):
try:
api_fwaas.policy_create(request, **context)
return True
except Exception as e:
msg = self.format_status_message(self.failure_message) + str(e)
exceptions.handle(request, msg)
return False
class AddFirewallAction(workflows.Action):
name = forms.CharField(max_length=80,
label=_("Name"),
required=False)
description = forms.CharField(max_length=80,
label=_("Description"),
required=False)
firewall_policy_id = forms.ThemableChoiceField(label=_("Policy"))
admin_state_up = forms.BooleanField(label=_("Enable Admin State"),
initial=True,
required=False)
def __init__(self, request, *args, **kwargs):
super(AddFirewallAction, self).__init__(request, *args, **kwargs)
firewall_policy_id_choices = [('', _("Select a policy"))]
try:
tenant_id = self.request.user.tenant_id
policies = api_fwaas.policy_list_for_tenant(request, tenant_id)
policies = sorted(policies, key=lambda policy: policy.name)
except Exception as e:
exceptions.handle(
request,
_('Unable to retrieve policy list (%(error)s).') % {
'error': str(e)})
policies = []
for p in policies:
firewall_policy_id_choices.append((p.id, p.name_or_id))
self.fields['firewall_policy_id'].choices = firewall_policy_id_choices
class Meta(object):
name = _("Firewall")
permissions = ('openstack.services.network',)
help_text = _("Create a firewall based on a policy.\n\n"
"A firewall represents a logical firewall resource that "
"a tenant can instantiate and manage. A firewall must "
"be associated with one policy, all other fields are "
"optional.")
class AddFirewallStep(workflows.Step):
action_class = AddFirewallAction
contributes = ("name", "firewall_policy_id", "description",
"admin_state_up")
class AddFirewall(workflows.Workflow):
slug = "addfirewall"
name = _("Add Firewall")
finalize_button_name = _("Add")
success_message = _('Added firewall "%s".')
failure_message = _('Unable to add firewall "%s".')
success_url = "horizon:project:firewalls:index"
# fwaas is designed to support a wide range of vendor
# firewalls. Considering the multitude of vendor firewall
# features in place today, firewall definition can
# involve more complex configuration over time. Hence,
# a workflow instead of a single form is used for
# firewall_rule add to be ready for future extension.
default_steps = (AddFirewallStep, )
def format_status_message(self, message):
return message % self.context.get('name')
def handle(self, request, context):
try:
api_fwaas.firewall_create(request, **context)
return True
except Exception as e:
msg = self.format_status_message(self.failure_message) + str(e)
exceptions.handle(request, msg)
return False

View File

@ -1,22 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# The slug of the panel to be added to HORIZON_CONFIG. Required.
PANEL = 'firewalls'
# The slug of the dashboard the PANEL associated with. Required.
PANEL_DASHBOARD = 'project'
# The slug of the panel group the PANEL is associated with.
PANEL_GROUP = 'network'
# Python panel class of the PANEL to be added.
ADD_PANEL = ('neutron_fwaas_dashboard.dashboards.'
'project.firewalls.panel.Firewall')

View File

@ -1,21 +0,0 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
horizon.firewalls = {
workflow_init: function() {
// Initialise the drag and drop rule list
horizon.lists.generate_html("rule");
horizon.lists.generate_html("router");
}
};

View File

@ -1,7 +1,6 @@
@import "/dashboard/scss/components/resource_topology";
.sort-container {
/* FWaaS v2 and v1 */
#selected_port {
@include common_box_list_selected("rule");
}
@ -9,8 +8,4 @@
#selected_port {
@include common_box_list_selected("port");
}
/* FWaaS v1 */
#selected_port {
@include common_box_list_selected("router");
}
}

View File

@ -1,478 +0,0 @@
# Copyright 2013, Big Switch Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutronclient.v2_0.client import Client as neutronclient
from openstack_dashboard.api import neutron as api_neutron
from openstack_dashboard.test import helpers
from neutron_fwaas_dashboard.api import fwaas as api_fwaas
from neutron_fwaas_dashboard.test import helpers as test
class FwaasApiTests(test.APITestCase):
@helpers.create_mocks({neutronclient: ('create_firewall_rule',)})
def test_rule_create(self):
rule1 = self.fw_rules.first()
rule1_dict = self.api_fw_rules.first()
form_data = {'name': rule1.name,
'description': rule1.description,
'protocol': rule1.protocol,
'action': rule1.action,
'source_ip_address': rule1.source_ip_address,
'source_port': rule1.source_port,
'destination_ip_address': rule1.destination_ip_address,
'destination_port': rule1.destination_port,
'shared': rule1.shared,
'enabled': rule1.enabled
}
form_dict = {'firewall_rule': form_data}
ret_dict = {'firewall_rule': rule1_dict}
self.mock_create_firewall_rule.return_value = ret_dict
ret_val = api_fwaas.rule_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas.Rule)
self.assertEqual(rule1.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_create_firewall_rule.assert_called_once_with(form_dict)
def _assert_rule_return_value(self, ret_val, exp_rule):
self.assertIsInstance(ret_val, api_fwaas.Rule)
self.assertEqual(exp_rule.name, ret_val.name)
self.assertTrue(ret_val.id)
if exp_rule.policy:
self.assertEqual(exp_rule.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_rule.policy.name, ret_val.policy.name)
else:
self.assertIsNone(ret_val.policy)
@helpers.create_mocks({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rule_list(self):
exp_rules = self.fw_rules.list()
api_rules = {'firewall_rules': self.api_fw_rules.list()}
api_policies = {'firewall_policies': self.api_fw_policies.list()}
self.mock_list_firewall_rules.return_value = api_rules
self.mock_list_firewall_policies.return_value = api_policies
ret_val = api_fwaas.rule_list(self.request)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
self.mock_list_firewall_rules.assert_called_once_with()
self.mock_list_firewall_policies.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('list_firewall_rules',
'list_firewall_policies')})
def test_rule_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_rules = self.fw_rules.list()
api_rules = {'firewall_rules': self.api_fw_rules.list()}
api_policies = {'firewall_policies': self.api_fw_policies.list()}
self.mock_list_firewall_rules.side_effect = [
{'firewall_rules': []},
api_rules,
]
self.mock_list_firewall_policies.return_value = api_policies
ret_val = api_fwaas.rule_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_rules):
self._assert_rule_return_value(v, d)
self.assertEqual(2, self.mock_list_firewall_rules.call_count)
self.mock_list_firewall_rules.assert_has_calls([
mock.call(tenant_id=tenant_id, shared=False),
mock.call(shared=True),
])
self.mock_list_firewall_policies.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('show_firewall_rule',
'show_firewall_policy')})
def test_rule_get(self):
exp_rule = self.fw_rules.first()
ret_dict = {'firewall_rule': self.api_fw_rules.first()}
policy_dict = {'firewall_policy': self.api_fw_policies.first()}
self.mock_show_firewall_rule.return_value = ret_dict
self.mock_show_firewall_policy.return_value = policy_dict
ret_val = api_fwaas.rule_get(self.request, exp_rule.id)
self._assert_rule_return_value(ret_val, exp_rule)
self.mock_show_firewall_rule.assert_called_once_with(exp_rule.id)
self.mock_show_firewall_policy.assert_called_once_with(
exp_rule.firewall_policy_id)
@helpers.create_mocks({neutronclient: ('update_firewall_rule',)})
def test_rule_update(self):
rule = self.fw_rules.first()
rule_dict = self.api_fw_rules.first()
rule.name = 'new name'
rule.description = 'new desc'
rule.protocol = 'icmp'
rule.action = 'deny'
rule.shared = True
rule.enabled = False
rule_dict['name'] = 'new name'
rule_dict['description'] = 'new desc'
rule_dict['protocol'] = 'icmp'
rule_dict['action'] = 'deny'
rule_dict['shared'] = True
rule_dict['enabled'] = False
form_data = {'name': rule.name,
'description': rule.description,
'protocol': rule.protocol,
'action': rule.action,
'shared': rule.shared,
'enabled': rule.enabled
}
form_dict = {'firewall_rule': form_data}
ret_dict = {'firewall_rule': rule_dict}
self.mock_update_firewall_rule.return_value = ret_dict
ret_val = api_fwaas.rule_update(self.request,
rule.id, **form_data)
self.assertIsInstance(ret_val, api_fwaas.Rule)
self.assertEqual(rule.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_update_firewall_rule.assert_called_once_with(rule.id,
form_dict)
@helpers.create_mocks({neutronclient: ('create_firewall_policy', )})
def test_policy_create(self):
policy1 = self.fw_policies.first()
policy1_dict = self.api_fw_policies.first()
form_data = {'name': policy1.name,
'description': policy1.description,
'firewall_rules': policy1.firewall_rules,
'shared': policy1.shared,
'audited': policy1.audited
}
form_dict = {'firewall_policy': form_data}
ret_dict = {'firewall_policy': policy1_dict}
self.mock_create_firewall_policy.return_value = ret_dict
ret_val = api_fwaas.policy_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas.Policy)
self.assertEqual(policy1.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_create_firewall_policy.assert_called_once_with(form_dict)
def _assert_policy_return_value(self, ret_val, exp_policy):
self.assertIsInstance(ret_val, api_fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(len(exp_policy.firewall_rules), len(ret_val.rules))
self.assertEqual(len(exp_policy.firewall_rules),
len(ret_val.firewall_rules))
for (r, exp_r) in zip(ret_val.rules, exp_policy.rules):
self.assertEqual(exp_r.id, r.id)
@helpers.create_mocks({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policy_list(self):
exp_policies = self.fw_policies.list()
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
rules_dict = {'firewall_rules': self.api_fw_rules.list()}
self.mock_list_firewall_policies.return_value = policies_dict
self.mock_list_firewall_rules.return_value = rules_dict
ret_val = api_fwaas.policy_list(self.request)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
self.mock_list_firewall_policies.assert_called_once_with()
self.mock_list_firewall_rules.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('list_firewall_policies',
'list_firewall_rules')})
def test_policy_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_policies = self.fw_policies.list()
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
rules_dict = {'firewall_rules': self.api_fw_rules.list()}
self.mock_list_firewall_policies.side_effect = [
{'firewall_policies': []},
policies_dict,
]
self.mock_list_firewall_rules.return_value = rules_dict
ret_val = api_fwaas.policy_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_policies):
self._assert_policy_return_value(v, d)
self.assertEqual(2, self.mock_list_firewall_policies.call_count)
self.mock_list_firewall_policies.assert_has_calls([
mock.call(tenant_id=tenant_id, shared=False),
mock.call(shared=True),
])
self.mock_list_firewall_rules.assert_called_once_with()
@helpers.create_mocks({neutronclient: ('show_firewall_policy',
'list_firewall_rules')})
def test_policy_get(self):
exp_policy = self.fw_policies.first()
policy_dict = self.api_fw_policies.first()
# The first two rules are associated with the first policy.
api_rules = self.api_fw_rules.list()[:2]
ret_dict = {'firewall_policy': policy_dict}
self.mock_show_firewall_policy.return_value = ret_dict
filters = {'firewall_policy_id': exp_policy.id}
ret_dict = {'firewall_rules': api_rules}
self.mock_list_firewall_rules.return_value = ret_dict
ret_val = api_fwaas.policy_get(self.request, exp_policy.id)
self._assert_policy_return_value(ret_val, exp_policy)
self.mock_show_firewall_policy.assert_called_once_with(exp_policy.id)
self.mock_list_firewall_rules.assert_called_once_with(**filters)
@helpers.create_mocks({neutronclient: ('show_firewall_policy',)})
def test_policy_get_no_rule(self):
# 2nd policy is not associated with any rules.
exp_policy = self.fw_policies.list()[1]
policy_dict = self.api_fw_policies.list()[1]
ret_dict = {'firewall_policy': policy_dict}
self.mock_show_firewall_policy.return_value = ret_dict
ret_val = api_fwaas.policy_get(self.request, exp_policy.id)
self.assertIsInstance(ret_val, api_fwaas.Policy)
self.assertEqual(exp_policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertFalse(len(ret_val.rules))
self.mock_show_firewall_policy.assert_called_once_with(exp_policy.id)
@helpers.create_mocks({neutronclient: ('update_firewall_policy',)})
def test_policy_update(self):
policy = self.fw_policies.first()
policy_dict = self.api_fw_policies.first()
policy.name = 'new name'
policy.description = 'new desc'
policy.shared = True
policy.audited = False
policy_dict['name'] = 'new name'
policy_dict['description'] = 'new desc'
policy_dict['shared'] = True
policy_dict['audited'] = False
form_data = {'name': policy.name,
'description': policy.description,
'shared': policy.shared,
'audited': policy.audited
}
form_dict = {'firewall_policy': form_data}
ret_dict = {'firewall_policy': policy_dict}
self.mock_update_firewall_policy.return_value = ret_dict
ret_val = api_fwaas.policy_update(self.request,
policy.id, **form_data)
self.assertIsInstance(ret_val, api_fwaas.Policy)
self.assertEqual(policy.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_update_firewall_policy.assert_called_once_with(
policy.id, form_dict)
@helpers.create_mocks({neutronclient: ('firewall_policy_insert_rule',)})
def test_policy_insert_rule(self):
policy = self.fw_policies.first()
policy_dict = self.api_fw_policies.first()
new_rule_id = 'h0881d38-c3eb-4fee-9763-12de3338041d'
policy.firewall_rules.append(new_rule_id)
policy_dict['firewall_rules'].append(new_rule_id)
body = {'firewall_rule_id': new_rule_id,
'insert_before': policy.firewall_rules[1],
'insert_after': policy.firewall_rules[0]}
self.mock_firewall_policy_insert_rule.return_value = policy_dict
ret_val = api_fwaas.policy_insert_rule(self.request,
policy.id, **body)
self.assertIn(new_rule_id, ret_val.firewall_rules)
self.mock_firewall_policy_insert_rule.assert_called_once_with(
policy.id, body)
@helpers.create_mocks({neutronclient: ('firewall_policy_remove_rule',)})
def test_policy_remove_rule(self):
policy = self.fw_policies.first()
policy_dict = self.api_fw_policies.first()
remove_rule_id = policy.firewall_rules[0]
policy_dict['firewall_rules'].remove(remove_rule_id)
body = {'firewall_rule_id': remove_rule_id}
self.mock_firewall_policy_remove_rule.return_value = policy_dict
ret_val = api_fwaas.policy_remove_rule(self.request,
policy.id, **body)
self.assertNotIn(remove_rule_id, ret_val.firewall_rules)
self.mock_firewall_policy_remove_rule.assert_called_once_with(
policy.id, body)
@helpers.create_mocks({neutronclient: ('create_firewall', )})
def test_firewall_create(self):
firewall = self.firewalls.first()
firewall_dict = self.api_firewalls.first()
form_data = {'name': firewall.name,
'description': firewall.description,
'firewall_policy_id': firewall.firewall_policy_id,
'admin_state_up': firewall.admin_state_up
}
form_dict = {'firewall': form_data}
ret_dict = {'firewall': firewall_dict}
self.mock_create_firewall.return_value = ret_dict
ret_val = api_fwaas.firewall_create(self.request, **form_data)
self.assertIsInstance(ret_val, api_fwaas.Firewall)
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_create_firewall.assert_called_once_with(form_dict)
def _assert_firewall_return_value(self, ret_val, exp_firewall):
self.assertIsInstance(ret_val, api_fwaas.Firewall)
self.assertEqual(exp_firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.assertEqual(exp_firewall.firewall_policy_id, ret_val.policy.id)
self.assertEqual(exp_firewall.policy.name, ret_val.policy.name)
# TODO(absubram) : Add API tests for firewall_create with routers,
# add router to firewall and remove router from fw.
@helpers.create_mocks({neutronclient: ('list_firewalls',
'list_firewall_policies'),
api_neutron: ('is_extension_supported',
'router_list')})
def test_firewall_list(self):
exp_firewalls = self.firewalls.list()
firewalls_dict = {'firewalls': self.api_firewalls.list()}
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
self.mock_list_firewalls.return_value = firewalls_dict
self.mock_list_firewall_policies.return_value = policies_dict
self.mock_is_extension_supported.return_value = True
self.mock_router_list.return_value = self.routers.list()
ret_val = api_fwaas.firewall_list(self.request)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d)
self.mock_list_firewalls.assert_called_once_with()
self.mock_list_firewall_policies.assert_called_once_with()
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_router_list.assert_called_once_with(
helpers.IsHttpRequest())
@helpers.create_mocks({neutronclient: ('list_firewalls',
'list_firewall_policies'),
api_neutron: ('is_extension_supported',
'router_list')})
def test_firewall_list_for_tenant(self):
tenant_id = self.request.user.project_id
exp_firewalls = self.firewalls.list()
firewalls_dict = {'firewalls': self.api_firewalls.list()}
policies_dict = {'firewall_policies': self.api_fw_policies.list()}
self.mock_list_firewalls.return_value = firewalls_dict
self.mock_list_firewall_policies.return_value = policies_dict
self.mock_is_extension_supported.return_value = True
self.mock_router_list.return_value = self.routers.list()
ret_val = api_fwaas.firewall_list_for_tenant(self.request, tenant_id)
for (v, d) in zip(ret_val, exp_firewalls):
self._assert_firewall_return_value(v, d)
self.mock_list_firewalls.assert_called_once_with(tenant_id=tenant_id)
self.mock_list_firewall_policies.assert_called_once_with()
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_router_list.assert_called_once_with(
helpers.IsHttpRequest(), tenant_id=self.request.user.project_id)
@helpers.create_mocks({neutronclient: ('show_firewall',
'show_firewall_policy'),
api_neutron: ('is_extension_supported',
'router_list')})
def test_firewall_get(self):
exp_firewall = self.firewalls.first()
ret_dict = {'firewall': self.api_firewalls.first()}
policy_dict = {'firewall_policy': self.api_fw_policies.first()}
self.mock_show_firewall.return_value = ret_dict
self.mock_show_firewall_policy.return_value = policy_dict
self.mock_is_extension_supported.return_value = True
self.mock_router_list.return_value = exp_firewall.routers
ret_val = api_fwaas.firewall_get(self.request, exp_firewall.id)
self._assert_firewall_return_value(ret_val, exp_firewall)
self.assertEqual(exp_firewall.router_ids, ret_val.router_ids)
self.assertEqual(exp_firewall.router_ids,
[r.id for r in ret_val.routers])
self.assertEqual([r.name for r in exp_firewall.routers],
[r.name for r in ret_val.routers])
self.mock_show_firewall.assert_called_once_with(exp_firewall.id)
self.mock_show_firewall_policy.assert_called_once_with(
exp_firewall.firewall_policy_id)
self.mock_is_extension_supported.assert_called_once_with(
helpers.IsHttpRequest(), 'fwaasrouterinsertion')
self.mock_router_list.assert_called_once_with(
helpers.IsHttpRequest(), id=exp_firewall.router_ids)
@helpers.create_mocks({neutronclient: ('update_firewall',)})
def test_firewall_update(self):
firewall = self.firewalls.first()
firewall_dict = self.api_firewalls.first()
firewall.name = 'new name'
firewall.description = 'new desc'
firewall.admin_state_up = False
firewall_dict['name'] = 'new name'
firewall_dict['description'] = 'new desc'
firewall_dict['admin_state_up'] = False
form_data = {'name': firewall.name,
'description': firewall.description,
'admin_state_up': firewall.admin_state_up
}
form_dict = {'firewall': form_data}
ret_dict = {'firewall': firewall_dict}
self.mock_update_firewall.return_value = ret_dict
ret_val = api_fwaas.firewall_update(self.request,
firewall.id, **form_data)
self.assertIsInstance(ret_val, api_fwaas.Firewall)
self.assertEqual(firewall.name, ret_val.name)
self.assertTrue(ret_val.id)
self.mock_update_firewall.assert_called_once_with(firewall.id,
form_dict)

View File

@ -44,11 +44,6 @@ INSTALLED_APPS = list(set(INSTALLED_APPS))
# mocked globally for unit tests and Selenium tests.
# 'method' is required. 'return_value' and 'side_effect'
# are optional and passed to mock.patch().
TEST_GLOBAL_MOCKS_ON_PANELS['firewalls'] = {
'method': ('neutron_fwaas_dashboard.dashboards.project.firewalls.panel.'
'Firewall.can_access'),
'return_value': True,
}
TEST_GLOBAL_MOCKS_ON_PANELS['firewalls_v2'] = {
'method': ('neutron_fwaas_dashboard.dashboards.project.firewalls_v2.panel.'
'Firewall_V2.can_access'),

View File

@ -1,164 +0,0 @@
# Copyright 2012 Nebula, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from openstack_dashboard.test.test_data import utils
from neutron_fwaas_dashboard.api import fwaas
def data(TEST):
# Data returned by openstack_dashboard.api.neutron wrapper.
TEST.firewalls = utils.TestDataContainer()
TEST.fw_policies = utils.TestDataContainer()
TEST.fw_rules = utils.TestDataContainer()
# Data return by neutronclient.
TEST.api_firewalls = utils.TestDataContainer()
TEST.api_fw_policies = utils.TestDataContainer()
TEST.api_fw_rules = utils.TestDataContainer()
# 1st rule (used by 1st policy)
rule1_dict = {'id': 'f0881d38-c3eb-4fee-9763-12de3338041d',
'tenant_id': '1',
'name': 'rule1',
'description': 'rule1 description',
'protocol': 'tcp',
'action': 'allow',
'source_ip_address': '1.2.3.0/24',
'source_port': '80',
'destination_ip_address': '4.5.6.7/32',
'destination_port': '1:65535',
'firewall_policy_id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'position': 1,
'shared': True,
'enabled': True,
'ip_version': '4'}
TEST.api_fw_rules.add(rule1_dict)
rule1 = fwaas.Rule(copy.deepcopy(rule1_dict))
# NOTE: rule1['policy'] is set below
TEST.fw_rules.add(rule1)
# 2nd rule (used by 2nd policy; no name)
rule2_dict = {'id': 'c6298a93-850f-4f64-b78a-959fd4f1e5df',
'tenant_id': '1',
'name': '',
'description': '',
'protocol': 'udp',
'action': 'deny',
'source_ip_address': '1.2.3.0/24',
'source_port': '80',
'destination_ip_address': '4.5.6.7/32',
'destination_port': '1:65535',
'firewall_policy_id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'position': 2,
'shared': True,
'enabled': True,
'ip_version': '6'}
TEST.api_fw_rules.add(rule2_dict)
rule2 = fwaas.Rule(copy.deepcopy(rule2_dict))
# NOTE: rule2['policy'] is set below
TEST.fw_rules.add(rule2)
# 3rd rule (not used by any policy)
rule3_dict = {'id': 'h0881d38-c3eb-4fee-9763-12de3338041d',
'tenant_id': '1',
'name': 'rule3',
'description': 'rule3 description',
'protocol': None,
'action': 'allow',
'source_ip_address': '1.2.3.0/24',
'source_port': '80',
'destination_ip_address': '4.5.6.7/32',
'destination_port': '1:65535',
'firewall_policy_id': None,
'position': None,
'shared': True,
'enabled': True,
'ip_version': '4'}
TEST.api_fw_rules.add(rule3_dict)
rule3 = fwaas.Rule(copy.deepcopy(rule3_dict))
# rule3 is not associated with any rules
rule3._apidict['policy'] = None
TEST.fw_rules.add(rule3)
# 1st policy (associated with 2 rules)
policy1_dict = {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'tenant_id': '1',
'name': 'policy1',
'description': 'policy with two rules',
'firewall_rules': [rule1_dict['id'], rule2_dict['id']],
'audited': True,
'shared': True}
TEST.api_fw_policies.add(policy1_dict)
policy1 = fwaas.Policy(copy.deepcopy(policy1_dict))
policy1._apidict['rules'] = [rule1, rule2]
TEST.fw_policies.add(policy1)
# Reverse relations (rule -> policy)
rule1._apidict['policy'] = policy1
rule2._apidict['policy'] = policy1
# 2nd policy (associated with no rules; no name)
policy2_dict = {'id': 'cf50b331-787a-4623-825e-da794c918d6a',
'tenant_id': '1',
'name': '',
'description': '',
'firewall_rules': [],
'audited': False,
'shared': False}
TEST.api_fw_policies.add(policy2_dict)
policy2 = fwaas.Policy(copy.deepcopy(policy2_dict))
policy2._apidict['rules'] = []
TEST.fw_policies.add(policy2)
# 1st firewall
fw1_dict = {'id': '8913dde8-4915-4b90-8d3e-b95eeedb0d49',
'tenant_id': '1',
'firewall_policy_id':
'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'firewall1',
'router_ids': [TEST.routers.first().id],
'description': 'firewall description',
'status': 'PENDING_CREATE',
'admin_state_up': True}
TEST.api_firewalls.add(fw1_dict)
fw1 = fwaas.Firewall(copy.deepcopy(fw1_dict))
fw1._apidict['policy'] = policy1
fw1._apidict['routers'] = [TEST.routers.first()]
TEST.firewalls.add(fw1)
# 2nd firewall (no name)
fw2_dict = {'id': '1aa75150-415f-458e-bae5-5a362a4fb1f7',
'tenant_id': '1',
'firewall_policy_id':
'abcdef-c3eb-4fee-9763-12de3338041e',
'name': '',
'router_ids': [],
'description': '',
'status': 'PENDING_CREATE',
'admin_state_up': True}
TEST.api_firewalls.add(fw2_dict)
fw2 = fwaas.Firewall(copy.deepcopy(fw2_dict))
fw2._apidict['policy'] = policy1
fw2._apidict['routers'] = []
TEST.firewalls.add(fw2)

View File

@ -14,12 +14,10 @@ from openstack_dashboard.test.test_data import utils
def load_data(load_onto=None):
from neutron_fwaas_dashboard.test.test_data import fwaas_data
from neutron_fwaas_dashboard.test.test_data import fwaas_v2_data
# The order of these loaders matters, some depend on others.
loaders = (
fwaas_data.data,
fwaas_v2_data.data,
)
if load_onto:

View File

@ -0,0 +1,7 @@
---
upgrade:
- |
FWaaS v1 support was dropped.
FWaaS v1 has been deprecated in neutron-fwaas and was dropped in Stein
release. Along with neutron-fwaas, neutron-fwaas-dashboard dropped
its support.