Support firewall service for SDK

FWaaS v2 API, add SDK API for firewall group, firewall policy and firewall rule.

Implements blueprint: firewall-v2-support
Change-Id: I2abdedb0e810cbd2278012c8a7cf56d14c7b705d
Co-Authored-By: LIU Yulong <i@liuyulong.me>
Co-Authored-By: Matthias Lisin <lisin@strato.de>
This commit is contained in:
wangweijia 2018-08-10 11:24:26 +08:00 committed by Matthias Lisin
parent 6befbbe76c
commit 5a998529cc
14 changed files with 1115 additions and 0 deletions

View File

@ -114,6 +114,7 @@
networking services enabled.
required-projects:
- openstack/designate
- openstack/neutron-fwaas
- openstack/octavia
vars:
devstack_local_conf:
@ -127,9 +128,18 @@
network_driver: network_noop_driver
certificates:
cert_manager: local_cert_manager
$NEUTRON_CONF:
fwaas:
agent_version: v2
driver: iptables_v2
enabled: true
firewall_l2_driver: ovs
devstack_localrc:
Q_SERVICE_PLUGIN_CLASSES: qos,trunk,firewall_v2
devstack_plugins:
designate: https://git.openstack.org/openstack/designate
octavia: https://git.openstack.org/openstack/octavia
neutron-fwaas: https://git.openstack.org/openstack/neutron-fwaas
devstack_services:
designate: true
octavia: true
@ -138,6 +148,7 @@
o-hm: true
o-hk: true
neutron-dns: true
neutron-fwaas-v2: true
s-account: false
s-container: false
s-object: false

View File

@ -17,6 +17,9 @@ from openstack.network.v2 import auto_allocated_topology as \
_auto_allocated_topology
from openstack.network.v2 import availability_zone
from openstack.network.v2 import extension
from openstack.network.v2 import firewall_group as _firewall_group
from openstack.network.v2 import firewall_policy as _firewall_policy
from openstack.network.v2 import firewall_rule as _firewall_rule
from openstack.network.v2 import flavor as _flavor
from openstack.network.v2 import floating_ip as _floating_ip
from openstack.network.v2 import health_monitor as _health_monitor
@ -2434,6 +2437,346 @@ class Proxy(proxy.Proxy):
router = self._get_resource(_router.Router, router)
return agent.remove_router_from_agent(self, router.id)
def create_firewall_group(self, **attrs):
"""Create a new firewall group from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.network.v2.firewall_group.FirewallGroup`,
comprised of the properties on the FirewallGroup class.
:returns: The results of firewall group creation
:rtype: :class:`~openstack.network.v2.firewall_group.FirewallGroup`
"""
return self._create(_firewall_group.FirewallGroup, **attrs)
def delete_firewall_group(self, firewall_group, ignore_missing=True):
"""Delete a firewall group
:param firewall_group:
The value can be either the ID of a firewall group or a
:class:`~openstack.network.v2.firewall_group.FirewallGroup`
instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the firewall group does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent firewall group.
:returns: ``None``
"""
self._delete(_firewall_group.FirewallGroup, firewall_group,
ignore_missing=ignore_missing)
def find_firewall_group(self, name_or_id, ignore_missing=True, **args):
"""Find a single firewall group
:param name_or_id: The name or ID of a firewall group.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the resource does not exist.
When set to ``True``, None will be returned when
attempting to find a nonexistent resource.
:param dict args: Any additional parameters to be passed into
underlying methods. such as query filters.
:returns: One :class:`~openstack.network.v2.firewall_group.
FirewallGroup` or None
"""
return self._find(_firewall_group.FirewallGroup,
name_or_id, ignore_missing=ignore_missing, **args)
def get_firewall_group(self, firewall_group):
"""Get a single firewall group
:param firewall_group: The value can be the ID of a firewall group or a
:class:`~openstack.network.v2.firewall_group.FirewallGroup`
instance.
:returns: One
:class:`~openstack.network.v2.firewall_group.FirewallGroup`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_firewall_group.FirewallGroup, firewall_group)
def firewall_groups(self, **query):
"""Return a generator of firewall_groups
:param dict query: Optional query parameters to be sent to limit
the resources being returned. Valid parameters are:
* ``description``: Firewall group description
* ``egress_policy_id``: The ID of egress firewall policy
* ``ingress_policy_id``: The ID of ingress firewall policy
* ``name``: The name of a firewall group
* ``shared``: Indicates whether this firewall group is shared
across all projects.
* ``status``: The status of the firewall group. Valid values are
ACTIVE, INACTIVE, ERROR, PENDING_UPDATE, or
PENDING_DELETE.
* ``ports``: A list of the IDs of the ports associated with the
firewall group.
* ``project_id``: The ID of the project this firewall group is
associated with.
:returns: A generator of firewall group objects
"""
return self._list(_firewall_group.FirewallGroup,
paginated=False, **query)
def update_firewall_group(self, firewall_group, **attrs):
"""Update a firewall group
:param firewall_group: Either the id of a firewall group or a
:class:`~openstack.network.v2.firewall_group.FirewallGroup`
instance.
:param dict attrs: The attributes to update on the firewall group
represented by ``firewall_group``.
:returns: The updated firewall group
:rtype: :class:`~openstack.network.v2.firewall_group.FirewallGroup`
"""
return self._update(_firewall_group.FirewallGroup, firewall_group,
**attrs)
def create_firewall_policy(self, **attrs):
"""Create a new firewall policy from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`,
comprised of the properties on the FirewallPolicy class.
:returns: The results of firewall policy creation
:rtype: :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
"""
return self._create(_firewall_policy.FirewallPolicy, **attrs)
def delete_firewall_policy(self, firewall_policy, ignore_missing=True):
"""Delete a firewall policy
:param firewall_policy:
The value can be either the ID of a firewall policy or a
:class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the firewall policy does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent firewall policy.
:returns: ``None``
"""
self._delete(_firewall_policy.FirewallPolicy, firewall_policy,
ignore_missing=ignore_missing)
def find_firewall_policy(self, name_or_id, ignore_missing=True, **args):
"""Find a single firewall policy
:param name_or_id: The name or ID of a firewall policy.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the resource does not exist.
When set to ``True``, None will be returned when
attempting to find a nonexistent resource.
:param dict args: Any additional parameters to be passed into
underlying methods. such as query filters.
:returns: One :class:`~openstack.network.v2.firewall_policy.
FirewallPolicy` or None
"""
return self._find(_firewall_policy.FirewallPolicy,
name_or_id, ignore_missing=ignore_missing, **args)
def get_firewall_policy(self, firewall_policy):
"""Get a single firewall policy
:param firewall_policy: The value can be the ID of a firewall policy
or a
:class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
instance.
:returns: One
:class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_firewall_policy.FirewallPolicy, firewall_policy)
def firewall_policies(self, **query):
"""Return a generator of firewall_policies
:param dict query: Optional query parameters to be sent to limit
the resources being returned. Valid parameters are:
* ``description``: Firewall policy description
* ``firewall_rule``: A list of the IDs of the firewall rules
associated with the firewall policy.
* ``name``: The name of a firewall policy
* ``shared``: Indicates whether this firewall policy is shared
across all projects.
* ``project_id``: The ID of the project that owns the resource.
:returns: A generator of firewall policy objects
"""
return self._list(_firewall_policy.FirewallPolicy,
paginated=False, **query)
def update_firewall_policy(self, firewall_policy, **attrs):
"""Update a firewall policy
:param firewall_policy: Either the id of a firewall policy or a
:class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
instance.
:param dict attrs: The attributes to update on the firewall policy
represented by ``firewall_policy``.
:returns: The updated firewall policy
:rtype: :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
"""
return self._update(_firewall_policy.FirewallPolicy, firewall_policy,
**attrs)
def insert_rule_into_policy(self, firewall_policy_id, firewall_rule_id,
insert_after=None, insert_before=None):
"""Insert a firewall_rule into a firewall_policy in order
:param firewall_policy_id: The ID of the firewall policy.
:param firewall_rule_id: The ID of the firewall rule.
:param insert_after: The ID of the firewall rule to insert the new
rule after. It will be worked only when
insert_before is none.
:param insert_before: The ID of the firewall rule to insert the new
rule before.
:returns: The updated firewall policy
:rtype: :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
"""
body = {'firewall_rule_id': firewall_rule_id,
'insert_after': insert_after,
'insert_before': insert_before}
policy = self._get_resource(_firewall_policy.FirewallPolicy,
firewall_policy_id)
return policy.insert_rule(self, **body)
def remove_rule_from_policy(self, firewall_policy_id, firewall_rule_id):
"""Remove a firewall_rule from a firewall_policy.
:param firewall_policy_id: The ID of the firewall policy.
:param firewall_rule_id: The ID of the firewall rule.
:returns: The updated firewall policy
:rtype: :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
"""
body = {'firewall_rule_id': firewall_rule_id}
policy = self._get_resource(_firewall_policy.FirewallPolicy,
firewall_policy_id)
return policy.remove_rule(self, **body)
def create_firewall_rule(self, **attrs):
"""Create a new firewall rule from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.network.v2.firewall_rule.FirewallRule`,
comprised of the properties on the FirewallRule class.
:returns: The results of firewall rule creation
:rtype: :class:`~openstack.network.v2.firewall_rule.FirewallRule`
"""
return self._create(_firewall_rule.FirewallRule, **attrs)
def delete_firewall_rule(self, firewall_rule, ignore_missing=True):
"""Delete a firewall rule
:param firewall_rule:
The value can be either the ID of a firewall rule or a
:class:`~openstack.network.v2.firewall_rule.FirewallRule`
instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the firewall rule does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent firewall rule.
:returns: ``None``
"""
self._delete(_firewall_rule.FirewallRule, firewall_rule,
ignore_missing=ignore_missing)
def find_firewall_rule(self, name_or_id, ignore_missing=True, **args):
"""Find a single firewall rule
:param name_or_id: The name or ID of a firewall rule.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the resource does not exist.
When set to ``True``, None will be returned when
attempting to find a nonexistent resource.
:param dict args: Any additional parameters to be passed into
underlying methods. such as query filters.
:returns: One :class:`~openstack.network.v2.firewall_rule.
FirewallRule` or None
"""
return self._find(_firewall_rule.FirewallRule,
name_or_id, ignore_missing=ignore_missing, **args)
def get_firewall_rule(self, firewall_rule):
"""Get a single firewall rule
:param firewall_rule: The value can be the ID of a firewall rule or a
:class:`~openstack.network.v2.firewall_rule.FirewallRule`
instance.
:returns: One
:class:`~openstack.network.v2.firewall_rule.FirewallRule`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_firewall_rule.FirewallRule, firewall_rule)
def firewall_rules(self, **query):
"""Return a generator of firewall_rules
:param dict query: Optional query parameters to be sent to limit
the resources being returned. Valid parameters are:
* ``action``: The action that the API performs on traffic that
matches the firewall rule.
* ``description``: Firewall rule description
* ``name``: The name of a firewall group
* ``destination_ip_address``: The destination IPv4 or IPv6 address
or CIDR for the firewall rule.
* ``destination_port``: The destination port or port range for
the firewall rule.
* ``enabled``: Facilitates selectively turning off rules.
* ``shared``: Indicates whether this firewall group is shared
across all projects.
* ``ip_version``: The IP protocol version for the firewall rule.
* ``protocol``: The IP protocol for the firewall rule.
* ``source_ip_address``: The source IPv4 or IPv6 address or CIDR
for the firewall rule.
* ``source_port``: The source port or port range for the firewall
rule.
* ``project_id``: The ID of the project this firewall group is
associated with.
:returns: A generator of firewall rule objects
"""
return self._list(_firewall_rule.FirewallRule,
paginated=False, **query)
def update_firewall_rule(self, firewall_rule, **attrs):
"""Update a firewall rule
:param firewall_rule: Either the id of a firewall rule or a
:class:`~openstack.network.v2.firewall_rule.FirewallRule`
instance.
:param dict attrs: The attributes to update on the firewall rule
represented by ``firewall_rule``.
:returns: The updated firewall rule
:rtype: :class:`~openstack.network.v2.firewall_rule.FirewallRule`
"""
return self._update(_firewall_rule.FirewallRule, firewall_rule,
**attrs)
def create_security_group(self, **attrs):
"""Create a new security group from attributes

View File

@ -0,0 +1,60 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network import network_service
from openstack import resource
class FirewallGroup(resource.Resource):
resource_key = 'firewall_group'
resources_key = 'firewall_groups'
base_path = '/fwaas/firewall_groups'
service = network_service.NetworkService()
# capabilities
allow_create = True
allow_fetch = True
allow_commit = True
allow_delete = True
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'egress_firewall_policy_id',
'ingress_firewall_policy_id', 'name', 'shared', 'status', 'ports',
'project_id')
# Properties
#: The administrative state of the firewall group, which is up (true) or
#: down (false). Default is true.
admin_state_up = resource.Body('admin_state_up')
#: The firewall group rule description.
description = resource.Body('description')
#: The ID of the egress firewall policy for the firewall group.
egress_firewall_policy_id = resource.Body('egress_firewall_policy_id')
#: The ID of the ingress firewall policy for the firewall group.
ingress_firewall_policy_id = resource.Body('ingress_firewall_policy_id')
#: The ID of the firewall group.
id = resource.Body('id')
#: The name of a firewall group
name = resource.Body('name')
#: A list of the IDs of the ports associated with the firewall group.
ports = resource.Body('ports')
#: The ID of the project that owns the resource.
project_id = resource.Body('project_id')
#: Indicates whether this firewall group is shared across all projects.
shared = resource.Body('shared')
#: The status of the firewall group. Valid values are ACTIVE, INACTIVE,
#: ERROR, PENDING_UPDATE, or PENDING_DELETE.
status = resource.Body('status')

View File

@ -0,0 +1,97 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.exceptions import HttpException
from openstack.network import network_service
from openstack import resource
from openstack import utils
class FirewallPolicy(resource.Resource):
resource_key = 'firewall_policy'
resources_key = 'firewall_policies'
base_path = '/fwaas/firewall_policies'
service = network_service.NetworkService()
# capabilities
allow_create = True
allow_fetch = True
allow_commit = True
allow_delete = True
allow_list = True
_query_mapping = resource.QueryParameters(
'description', 'firewall_rules', 'name', 'project_id', 'shared')
# Properties
#: Each time that the firewall policy or its associated rules are changed,
#: the API sets this attribute to false. To audit the policy,
#: explicitly set this attribute to true.
audited = resource.Body('audited')
#: The firewall group rule description.
description = resource.Body('description')
#: The ID of the firewall policy.
id = resource.Body('id')
#: A list of the IDs of the firewall rules associated with the
#: firewall policy.
firewall_rules = resource.Body('firewall_rules')
#: The name of a firewall policy
name = resource.Body('name')
#: The ID of the project that owns the resource.
project_id = resource.Body('project_id')
#: Set to true to make this firewall policy visible to other projects.
shared = resource.Body('shared')
def insert_rule(self, session, **body):
"""Insert a firewall_rule into a firewall_policy in order.
:param session: The session to communicate through.
:type session: :class:`~openstack.session.Session`
:param dict body: The body requested to be updated on the router
:returns: The updated firewall policy
:rtype: :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
:raises: :class:`~openstack.exceptions.HttpException` on error.
"""
url = utils.urljoin(self.base_path, self.id, 'insert_rule')
return self._put_request(session, url, body)
def remove_rule(self, session, **body):
"""Remove a firewall_rule from a firewall_policy.
:param session: The session to communicate through.
:type session: :class:`~openstack.session.Session`
:param dict body: The body requested to be updated on the router
:returns: The updated firewall policy
:rtype: :class:`~openstack.network.v2.firewall_policy.FirewallPolicy`
:raises: :class:`~openstack.exceptions.HttpException` on error.
"""
url = utils.urljoin(self.base_path, self.id, 'remove_rule')
return self._put_request(session, url, body)
def _put_request(self, session, url, json_data):
resp = session.put(url, json=json_data)
data = resp.json()
if not resp.ok:
message = None
if 'NeutronError' in data:
message = data['NeutronError']['message']
raise HttpException(message=message, response=resp)
self._body.attributes.update(data)
return self

View File

@ -0,0 +1,68 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network import network_service
from openstack import resource
class FirewallRule(resource.Resource):
resource_key = 'firewall_rule'
resources_key = 'firewall_rules'
base_path = '/fwaas/firewall_rules'
service = network_service.NetworkService()
# capabilities
allow_create = True
allow_fetch = True
allow_commit = True
allow_delete = True
allow_list = True
_query_mapping = resource.QueryParameters(
'action', 'description', 'destination_ip_address', 'name',
'destination_port', 'enabled', 'ip_version', 'project_id', 'protocol',
'shared', 'source_ip_address', 'source_port', 'firewall_policy_id')
# Properties
#: The action that the API performs on traffic that matches the firewall
#: rule. Valid values are allow or deny. Default is deny.
action = resource.Body('action')
#: The description of the firewall rule
description = resource.Body('description')
#: The destination IPv4 or IPv6 address or CIDR for the firewall rule.
destination_ip_address = resource.Body('destination_ip_address')
#: The destination port or port range for the firewall rule.
destination_port = resource.Body('destination_port')
#: Facilitates selectively turning off rules without having to disassociate
#: the rule from the firewall policy
enabled = resource.Body('enabled')
#: The IP protocol version for the firewall rule. Valid values are 4 or 6.
ip_version = resource.Body('ip_version')
#: The name of the firewall rule.
name = resource.Body('name')
#: The ID of the project that owns the resource.
project_id = resource.Body('project_id')
#: The IP protocol for the firewall rule.
protocol = resource.Body('protocol')
#: Indicates whether this firewall rule is shared across all projects.
shared = resource.Body('shared')
#: The source IPv4 or IPv6 address or CIDR for the firewall rule.
source_ip_address = resource.Body('source_ip_address')
#: The source port or port range for the firewall rule.
source_port = resource.Body('source_port')
#: The ID of the firewall policy.
firewall_policy_id = resource.Body('firewall_policy_id')
#: The ID of the firewall rule.
id = resource.Body('id')

View File

@ -0,0 +1,52 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import firewall_group
from openstack.tests.functional import base
class TestFirewallGroup(base.BaseFunctionalTest):
ID = None
def setUp(self):
super(TestFirewallGroup, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
self.NAME = self.getUniqueString()
sot = self.conn.network.create_firewall_group(name=self.NAME)
assert isinstance(sot, firewall_group.FirewallGroup)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_firewall_group(self.ID,
ignore_missing=False)
self.assertIs(None, sot)
super(TestFirewallGroup, self).tearDown()
def test_find(self):
sot = self.conn.network.find_firewall_group(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_firewall_group(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
def test_list(self):
names = [o.name for o in self.conn.network.firewall_groups()]
self.assertIn(self.NAME, names)

View File

@ -0,0 +1,52 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import firewall_policy
from openstack.tests.functional import base
class TestFirewallPolicy(base.BaseFunctionalTest):
ID = None
def setUp(self):
super(TestFirewallPolicy, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
self.NAME = self.getUniqueString()
sot = self.conn.network.create_firewall_policy(name=self.NAME)
assert isinstance(sot, firewall_policy.FirewallPolicy)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_firewall_policy(self.ID,
ignore_missing=False)
self.assertIs(None, sot)
super(TestFirewallPolicy, self).tearDown()
def test_find(self):
sot = self.conn.network.find_firewall_policy(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_firewall_policy(self.ID)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ID, sot.id)
def test_list(self):
names = [o.name for o in self.conn.network.firewall_policies()]
self.assertIn(self.NAME, names)

View File

@ -0,0 +1,69 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import firewall_rule
from openstack.tests.functional import base
class TestFirewallRule(base.BaseFunctionalTest):
ACTION = 'allow'
DEST_IP = '10.0.0.0/24'
DEST_PORT = '80'
IP_VERSION = 4
PROTOCOL = 'tcp'
SOUR_IP = '10.0.1.0/24'
SOUR_PORT = '8000'
ID = None
def setUp(self):
super(TestFirewallRule, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
self.NAME = self.getUniqueString()
sot = self.conn.network.create_firewall_rule(
name=self.NAME, action=self.ACTION, source_port=self.SOUR_PORT,
destination_port=self.DEST_PORT, source_ip_address=self.SOUR_IP,
destination_ip_address=self.DEST_IP, ip_version=self.IP_VERSION,
protocol=self.PROTOCOL)
assert isinstance(sot, firewall_rule.FirewallRule)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
def tearDown(self):
sot = self.conn.network.delete_firewall_rule(self.ID,
ignore_missing=False)
self.assertIs(None, sot)
super(TestFirewallRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_firewall_rule(self.NAME)
self.assertEqual(self.ID, sot.id)
def test_get(self):
sot = self.conn.network.get_firewall_rule(self.ID)
self.assertEqual(self.ID, sot.id)
self.assertEqual(self.NAME, sot.name)
self.assertEqual(self.ACTION, sot.action)
self.assertEqual(self.DEST_IP, sot.destination_ip_address)
self.assertEqual(self.DEST_PORT, sot.destination_port)
self.assertEqual(self.IP_VERSION, sot.ip_version)
self.assertEqual(self.SOUR_IP, sot.source_ip_address)
self.assertEqual(self.SOUR_PORT, sot.source_port)
def test_list(self):
ids = [o.id for o in self.conn.network.firewall_rules()]
self.assertIn(self.ID, ids)

View File

@ -0,0 +1,91 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import firewall_policy
from openstack.network.v2 import firewall_rule
from openstack.tests.functional import base
class TestFirewallPolicyRuleAssociations(base.BaseFunctionalTest):
POLICY_NAME = uuid.uuid4().hex
RULE1_NAME = uuid.uuid4().hex
RULE2_NAME = uuid.uuid4().hex
POLICY_ID = None
RULE1_ID = None
RULE2_ID = None
def setUp(self):
super(TestFirewallPolicyRuleAssociations, self).setUp()
if not self.conn._has_neutron_extension('fwaas_v2'):
self.skipTest('fwaas_v2 service not supported by cloud')
rul1 = self.conn.network.create_firewall_rule(name=self.RULE1_NAME)
assert isinstance(rul1, firewall_rule.FirewallRule)
self.assertEqual(self.RULE1_NAME, rul1.name)
rul2 = self.conn.network.create_firewall_rule(name=self.RULE2_NAME)
assert isinstance(rul2, firewall_rule.FirewallRule)
self.assertEqual(self.RULE2_NAME, rul2.name)
pol = self.conn.network.create_firewall_policy(name=self.POLICY_NAME)
assert isinstance(pol, firewall_policy.FirewallPolicy)
self.assertEqual(self.POLICY_NAME, pol.name)
self.RULE1_ID = rul1.id
self.RULE2_ID = rul2.id
self.POLICY_ID = pol.id
def tearDown(self):
sot = self.conn.network.delete_firewall_policy(self.POLICY_ID,
ignore_missing=False)
self.assertIs(None, sot)
sot = self.conn.network.delete_firewall_rule(self.RULE1_ID,
ignore_missing=False)
self.assertIs(None, sot)
sot = self.conn.network.delete_firewall_rule(self.RULE2_ID,
ignore_missing=False)
self.assertIs(None, sot)
super(TestFirewallPolicyRuleAssociations, self).tearDown()
def test_insert_rule_into_policy(self):
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE1_ID)
self.assertIn(self.RULE1_ID, policy['firewall_rules'])
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE2_ID,
insert_before=self.RULE1_ID)
self.assertEqual(self.RULE1_ID, policy['firewall_rules'][1])
self.assertEqual(self.RULE2_ID, policy['firewall_rules'][0])
def test_remove_rule_from_policy(self):
# insert rules into policy before we remove it again
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE1_ID)
self.assertIn(self.RULE1_ID, policy['firewall_rules'])
policy = self.conn.network.insert_rule_into_policy(
self.POLICY_ID, firewall_rule_id=self.RULE2_ID)
self.assertIn(self.RULE2_ID, policy['firewall_rules'])
policy = self.conn.network.remove_rule_from_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE1_ID)
self.assertNotIn(self.RULE1_ID, policy['firewall_rules'])
policy = self.conn.network.remove_rule_from_policy(
self.POLICY_ID,
firewall_rule_id=self.RULE2_ID)
self.assertNotIn(self.RULE2_ID, policy['firewall_rules'])

View File

@ -0,0 +1,60 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from openstack.network.v2 import firewall_group
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'description': '1',
'name': '2',
'egress_firewall_policy_id': '3',
'ingress_firewall_policy_id': '4',
'shared': True,
'status': 'ACTIVE',
'ports': ['5', '6'],
'project_id': '7',
}
class TestFirewallGroup(testtools.TestCase):
def test_basic(self):
sot = firewall_group.FirewallGroup()
self.assertEqual('firewall_group', sot.resource_key)
self.assertEqual('firewall_groups', sot.resources_key)
self.assertEqual('/fwaas/firewall_groups', sot.base_path)
self.assertEqual('network', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = firewall_group.FirewallGroup(**EXAMPLE)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['egress_firewall_policy_id'],
sot.egress_firewall_policy_id)
self.assertEqual(EXAMPLE['ingress_firewall_policy_id'],
sot.ingress_firewall_policy_id)
self.assertEqual(EXAMPLE['shared'], sot.shared)
self.assertEqual(EXAMPLE['status'], sot.status)
self.assertEqual(list, type(sot.ports))
self.assertEqual(EXAMPLE['ports'], sot.ports)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)

View File

@ -0,0 +1,52 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from openstack.network.v2 import firewall_policy
EXAMPLE = {
'description': '1',
'name': '2',
'firewall_rules': ['a30b0ec2-a468-4b1c-8dbf-928ded2a57a8',
'8d562e98-24f3-46e1-bbf3-d9347c0a67ee'],
'shared': True,
'project_id': '4',
}
class TestFirewallPolicy(testtools.TestCase):
def test_basic(self):
sot = firewall_policy.FirewallPolicy()
self.assertEqual('firewall_policy', sot.resource_key)
self.assertEqual('firewall_policies', sot.resources_key)
self.assertEqual('/fwaas/firewall_policies', sot.base_path)
self.assertEqual('network', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = firewall_policy.FirewallPolicy(**EXAMPLE)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['firewall_rules'], sot.firewall_rules)
self.assertEqual(EXAMPLE['shared'], sot.shared)
self.assertEqual(list, type(sot.firewall_rules))
self.assertEqual(EXAMPLE['project_id'], sot.project_id)

View File

@ -0,0 +1,65 @@
# Copyright (c) 2018 China Telecom Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from openstack.network.v2 import firewall_rule
EXAMPLE = {
'action': 'allow',
'description': '1',
'destination_ip_address': '10.0.0.2/24',
'destination_port': '2',
'name': '3',
'enabled': True,
'ip_version': 4,
'protocol': 'tcp',
'shared': True,
'source_ip_address': '10.0.1.2/24',
'source_port': '5',
'project_id': '6',
}
class TestFirewallRule(testtools.TestCase):
def test_basic(self):
sot = firewall_rule.FirewallRule()
self.assertEqual('firewall_rule', sot.resource_key)
self.assertEqual('firewall_rules', sot.resources_key)
self.assertEqual('/fwaas/firewall_rules', sot.base_path)
self.assertEqual('network', sot.service.service_type)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertTrue(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
def test_make_it(self):
sot = firewall_rule.FirewallRule(**EXAMPLE)
self.assertEqual(EXAMPLE['action'], sot.action)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['destination_ip_address'],
sot.destination_ip_address)
self.assertEqual(EXAMPLE['destination_port'], sot.destination_port)
self.assertEqual(EXAMPLE['name'], sot.name)
self.assertEqual(EXAMPLE['enabled'], sot.enabled)
self.assertEqual(EXAMPLE['ip_version'], sot.ip_version)
self.assertEqual(EXAMPLE['protocol'], sot.protocol)
self.assertEqual(EXAMPLE['shared'], sot.shared)
self.assertEqual(EXAMPLE['source_ip_address'],
sot.source_ip_address)
self.assertEqual(EXAMPLE['source_port'], sot.source_port)
self.assertEqual(EXAMPLE['project_id'], sot.project_id)

View File

@ -21,6 +21,9 @@ from openstack.network.v2 import agent
from openstack.network.v2 import auto_allocated_topology
from openstack.network.v2 import availability_zone
from openstack.network.v2 import extension
from openstack.network.v2 import firewall_group
from openstack.network.v2 import firewall_policy
from openstack.network.v2 import firewall_rule
from openstack.network.v2 import flavor
from openstack.network.v2 import floating_ip
from openstack.network.v2 import health_monitor
@ -873,6 +876,93 @@ class TestNetworkProxy(test_proxy_base.TestProxyBase):
expected_kwargs={'agent_id': AGENT_ID},
)
def test_firewall_group_create_attrs(self):
self.verify_create(self.proxy.create_firewall_group,
firewall_group.FirewallGroup)
def test_firewall_group_delete(self):
self.verify_delete(self.proxy.delete_firewall_group,
firewall_group.FirewallGroup, False)
def test_firewall_group_delete_ignore(self):
self.verify_delete(self.proxy.delete_firewall_group,
firewall_group.FirewallGroup, True)
def test_firewall_group_find(self):
self.verify_find(self.proxy.find_firewall_group,
firewall_group.FirewallGroup)
def test_firewall_group_get(self):
self.verify_get(self.proxy.get_firewall_group,
firewall_group.FirewallGroup)
def test_firewall_groups(self):
self.verify_list(self.proxy.firewall_groups,
firewall_group.FirewallGroup,
paginated=False)
def test_firewall_group_update(self):
self.verify_update(self.proxy.update_firewall_group,
firewall_group.FirewallGroup)
def test_firewall_policy_create_attrs(self):
self.verify_create(self.proxy.create_firewall_policy,
firewall_policy.FirewallPolicy)
def test_firewall_policy_delete(self):
self.verify_delete(self.proxy.delete_firewall_policy,
firewall_policy.FirewallPolicy, False)
def test_firewall_policy_delete_ignore(self):
self.verify_delete(self.proxy.delete_firewall_policy,
firewall_policy.FirewallPolicy, True)
def test_firewall_policy_find(self):
self.verify_find(self.proxy.find_firewall_policy,
firewall_policy.FirewallPolicy)
def test_firewall_policy_get(self):
self.verify_get(self.proxy.get_firewall_policy,
firewall_policy.FirewallPolicy)
def test_firewall_policies(self):
self.verify_list(self.proxy.firewall_policies,
firewall_policy.FirewallPolicy,
paginated=False)
def test_firewall_policy_update(self):
self.verify_update(self.proxy.update_firewall_policy,
firewall_policy.FirewallPolicy)
def test_firewall_rule_create_attrs(self):
self.verify_create(self.proxy.create_firewall_rule,
firewall_rule.FirewallRule)
def test_firewall_rule_delete(self):
self.verify_delete(self.proxy.delete_firewall_rule,
firewall_rule.FirewallRule, False)
def test_firewall_rule_delete_ignore(self):
self.verify_delete(self.proxy.delete_firewall_rule,
firewall_rule.FirewallRule, True)
def test_firewall_rule_find(self):
self.verify_find(self.proxy.find_firewall_rule,
firewall_rule.FirewallRule)
def test_firewall_rule_get(self):
self.verify_get(self.proxy.get_firewall_rule,
firewall_rule.FirewallRule)
def test_firewall_rules(self):
self.verify_list(self.proxy.firewall_rules,
firewall_rule.FirewallRule,
paginated=False)
def test_firewall_rule_update(self):
self.verify_update(self.proxy.update_firewall_rule,
firewall_rule.FirewallRule)
def test_security_group_create_attrs(self):
self.verify_create(self.proxy.create_security_group,
security_group.SecurityGroup)

View File

@ -0,0 +1,5 @@
---
features:
- |
Implement fwaas v2 resources for managing firewall groups, rules
and policies.