Freescale FWaaS Plugin code final decomposition.
This removes all the artifacts specific to the Freescale FWaaS Plugin code from the neutron-fwaas tree. The code is maintained in a GitHub repository. The setup option "fsl_firewall" for this plugin is removed in change: I194a4da49058724766b7fde7343f85d19a75fe8c UpgradeImpact Change-Id: I5d419671cf3ce3dc10020949bc14bb9d8031da3f Closes-Bug: #1519223
parent
1cbc6f47b4
commit
69bc97741f
@ -1,11 +0,0 @@
|
||||
Freescale Firewall as a Service Plugin
|
||||
|
||||
* For more information, refer to:
|
||||
https://wiki.openstack.org/wiki/Freescale_Firewall_as_a_Service_Plugin
|
||||
|
||||
* For Information on Freescale CI, refer to:
|
||||
https://wiki.openstack.org/wiki/ThirdPartySystems/Freescale_CI
|
||||
|
||||
* Freescale CI contact:
|
||||
- fslosci@freescale.com
|
||||
- trinath.somanchi@freescale.com
|
@ -1,272 +0,0 @@
|
||||
# Copyright 2015 Freescale, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from neutron.common import rpc
|
||||
from neutron.common import topics
|
||||
from neutron.i18n import _LE
|
||||
from neutron.plugins.common import constants as const
|
||||
from neutron.plugins.ml2.drivers.freescale import config
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
from neutron_fwaas.db.firewall import firewall_db
|
||||
from neutron_fwaas.services.firewall import fwaas_plugin
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FirewallCallbacks(fwaas_plugin.FirewallCallbacks):
|
||||
|
||||
"""Callbacks to handle CRD notifications to amqp."""
|
||||
|
||||
RPC_API_VERSION = '1.0'
|
||||
|
||||
def __init__(self, plugin):
|
||||
self.plugin = plugin
|
||||
self._client = self.plugin._client
|
||||
|
||||
def get_firewalls_for_tenant(self, context, **kwargs):
|
||||
"""Get all Firewalls and rules for a tenant from CRD.
|
||||
|
||||
For all the firewalls created, check CRD for config_mode.
|
||||
If it is Network Node, prepare the list.
|
||||
Other config modes are handled by CRD internally.
|
||||
"""
|
||||
|
||||
fw_list = []
|
||||
for fw in self.plugin.get_firewalls(context):
|
||||
fw_id = fw['id']
|
||||
# get the firewall details from CRD service.
|
||||
crd_fw_details = self._client.show_firewall(fw_id)
|
||||
config_mode = crd_fw_details['firewall']['config_mode']
|
||||
# get those FWs with config mode NetworkNode (NN) or None
|
||||
if config_mode in ('NN', None):
|
||||
fw_list.append(self.plugin._make_firewall_dict_with_rules(
|
||||
context, fw_id))
|
||||
return fw_list
|
||||
|
||||
|
||||
class FirewallPlugin(firewall_db.Firewall_db_mixin):
|
||||
|
||||
"""Implementation of the Freescale Firewall Service Plugin.
|
||||
|
||||
This class manages the workflow of FWaaS request/response.
|
||||
Existing Firewall database is used.
|
||||
"""
|
||||
supported_extension_aliases = ["fwaas"]
|
||||
|
||||
def __init__(self):
|
||||
"""Do the initialization for the firewall service plugin here."""
|
||||
|
||||
self._client = config.get_crdclient()
|
||||
self.endpoints = [FirewallCallbacks(self)]
|
||||
|
||||
self.conn = rpc.create_connection()
|
||||
self.conn.create_consumer(
|
||||
topics.FIREWALL_PLUGIN, self.endpoints, fanout=False)
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _update_firewall_status(self, context, firewall_id):
|
||||
status_update = {"firewall": {"status": const.PENDING_UPDATE}}
|
||||
super(FirewallPlugin, self).update_firewall(context, firewall_id,
|
||||
status_update)
|
||||
try:
|
||||
self._client.update_firewall(firewall_id, status_update)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE("Failed to update firewall status (%s)."),
|
||||
firewall_id)
|
||||
|
||||
def _update_firewall_policy(self, context, firewall_policy_id):
|
||||
firewall_policy = self.get_firewall_policy(context, firewall_policy_id)
|
||||
if firewall_policy:
|
||||
for firewall_id in firewall_policy['firewall_list']:
|
||||
self._update_firewall_status(context, firewall_id)
|
||||
|
||||
# Firewall Management
|
||||
def create_firewall(self, context, firewall):
|
||||
"""Create Firewall.
|
||||
|
||||
'PENDING' status updates are handled by CRD by posting messages
|
||||
to AMQP (topics.FIREWALL_PLUGIN) that Firewall consumes to
|
||||
update its status.
|
||||
"""
|
||||
firewall['firewall']['status'] = const.PENDING_CREATE
|
||||
fw = super(FirewallPlugin, self).create_firewall(context, firewall)
|
||||
try:
|
||||
crd_firewall = {'firewall': fw}
|
||||
self._client.create_firewall(crd_firewall)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
fw_id = fw['firewall']['id']
|
||||
LOG.error(_LE("Failed to create firewall (%s)."),
|
||||
fw_id)
|
||||
super(FirewallPlugin, self).delete_firewall(context, fw_id)
|
||||
return fw
|
||||
|
||||
def update_firewall(self, context, fw_id, firewall=None):
|
||||
firewall['firewall']['status'] = const.PENDING_UPDATE
|
||||
fw = super(FirewallPlugin,
|
||||
self).update_firewall(context, fw_id, firewall)
|
||||
try:
|
||||
crd_firewall = {'firewall': fw}
|
||||
self._client.update_firewall(fw_id, crd_firewall)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to update firewall (%s)."), fw_id)
|
||||
# TODO(trinaths):do rollback on error
|
||||
return fw
|
||||
|
||||
def delete_db_firewall_object(self, context, fw_id):
|
||||
firewall = self.get_firewall(context, fw_id)
|
||||
if firewall['status'] in [const.PENDING_DELETE]:
|
||||
try:
|
||||
super(FirewallPlugin, self).delete_firewall(context, fw_id)
|
||||
except exc.NoResultFound:
|
||||
LOG.error(_LE("Delete Firewall (%s) DB object failed."),
|
||||
fw_id)
|
||||
|
||||
def delete_firewall(self, context, fw_id):
|
||||
status_update = {"firewall": {"status": const.PENDING_DELETE}}
|
||||
super(FirewallPlugin, self).update_firewall(context, fw_id,
|
||||
status_update)
|
||||
try:
|
||||
self._client.delete_firewall(fw_id)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to delete firewall (%s)."), fw_id)
|
||||
# TODO(trinaths):do rollback on error
|
||||
|
||||
# Firewall Policy Management
|
||||
def create_firewall_policy(self, context, firewall_policy):
|
||||
fw_policy = super(FirewallPlugin, self).create_firewall_policy(
|
||||
context,
|
||||
firewall_policy)
|
||||
fw_policy.pop('firewall_list')
|
||||
try:
|
||||
crd_firewall_policy = {'firewall_policy': fw_policy}
|
||||
self._client.create_firewall_policy(crd_firewall_policy)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
fwp_id = fw_policy['firewall_policy']['id']
|
||||
LOG.error(_LE("Failed to create firewall policy (%s)."),
|
||||
fwp_id)
|
||||
super(FirewallPlugin, self).delete_firewall_policy(context,
|
||||
fwp_id)
|
||||
return fw_policy
|
||||
|
||||
def update_firewall_policy(self, context, fp_id, firewall_policy):
|
||||
fw_policy = super(FirewallPlugin,
|
||||
self).update_firewall_policy(context, fp_id,
|
||||
firewall_policy)
|
||||
self._update_firewall_policy(context, fp_id)
|
||||
fw_policy.pop('firewall_list')
|
||||
try:
|
||||
crd_firewall_policy = {'firewall_policy': fw_policy}
|
||||
self._client.update_firewall_policy(fp_id, crd_firewall_policy)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Update firewall policy failed (%s)."), fp_id)
|
||||
# TODO(trinaths):do rollback on error
|
||||
return fw_policy
|
||||
|
||||
def delete_firewall_policy(self, context, fp_id):
|
||||
super(FirewallPlugin, self).delete_firewall_policy(context, fp_id)
|
||||
try:
|
||||
self._client.delete_firewall_policy(fp_id)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Delete Firewall Policy (%s) failed."),
|
||||
fp_id)
|
||||
# TODO(trinaths):do rollback on error
|
||||
|
||||
# Firewall Rule management
|
||||
def create_firewall_rule(self, context, firewall_rule):
|
||||
fw_rule = super(FirewallPlugin,
|
||||
self).create_firewall_rule(context, firewall_rule)
|
||||
try:
|
||||
crd_firewall_rule = {'firewall_rule': fw_rule}
|
||||
self._client.create_firewall_rule(crd_firewall_rule)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
fwr_id = fw_rule['firewall_rule']['id']
|
||||
LOG.error(_LE("Failed to create firewall rule (%s)."),
|
||||
fwr_id)
|
||||
super(FirewallPlugin, self).delete_firewall_rule(context,
|
||||
fwr_id)
|
||||
return fw_rule
|
||||
|
||||
def update_firewall_rule(self, context, fr_id, firewall_rule):
|
||||
fw_rule = super(FirewallPlugin,
|
||||
self).update_firewall_rule(context, fr_id,
|
||||
firewall_rule)
|
||||
if fw_rule['firewall_policy_id']:
|
||||
self._update_firewall_policy(
|
||||
context,
|
||||
fw_rule['firewall_policy_id'])
|
||||
try:
|
||||
crd_firewall_rule = {'firewall_rule': fw_rule}
|
||||
self._client.update_firewall_rule(fr_id, crd_firewall_rule)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to update firewall rule (%s)."), fr_id)
|
||||
# TODO(trinaths):do rollback on error
|
||||
return fw_rule
|
||||
|
||||
def delete_firewall_rule(self, context, fr_id):
|
||||
fw_rule = self.get_firewall_rule(context, fr_id)
|
||||
super(FirewallPlugin, self).delete_firewall_rule(context, fr_id)
|
||||
if fw_rule['firewall_policy_id']:
|
||||
self._update_firewall_policy(context,
|
||||
fw_rule['firewall_policy_id'])
|
||||
try:
|
||||
self._client.delete_firewall_rule(fr_id)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to delete firewall rule (%s)."),
|
||||
fr_id)
|
||||
# TODO(trinaths):do rollback on error
|
||||
|
||||
def insert_rule(self, context, rid, rule_info):
|
||||
rule = super(FirewallPlugin,
|
||||
self).insert_rule(context, rid, rule_info)
|
||||
try:
|
||||
self._client.firewall_policy_insert_rule(rid, rule_info)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to insert rule %(rule)s into "
|
||||
"firewall policy %(fwpid)s."),
|
||||
{'rule': rule_info,
|
||||
'fwpid': rid})
|
||||
super(FirewallPlugin, self).remove_rule(context, rid,
|
||||
rule_info)
|
||||
self._update_firewall_policy(context, rid)
|
||||
return rule
|
||||
|
||||
def remove_rule(self, context, rid, rule_info):
|
||||
rule = super(FirewallPlugin,
|
||||
self).remove_rule(context, rid, rule_info)
|
||||
try:
|
||||
self._client.firewall_policy_remove_rule(rid, rule_info)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_LE("Failed to remove rule %(rule)s from "
|
||||
"firewall policy %(fwpid)s."),
|
||||
{'rule': rule_info,
|
||||
'fwpid': rid})
|
||||
self._update_firewall_policy(context, rid)
|
||||
return rule
|
@ -1,338 +0,0 @@
|
||||
# Copyright 2015 Freescale, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from neutron import context
|
||||
from neutron import manager
|
||||
from webob import exc
|
||||
|
||||
from neutron.plugins.common import constants as const
|
||||
from neutron_fwaas.tests.unit.db.firewall import (
|
||||
test_firewall_db as test_db_firewall)
|
||||
|
||||
"""Unit testing for Freescale FWaaS Plugin."""
|
||||
|
||||
PLUGIN = ("neutron_fwaas.services.firewall.freescale"
|
||||
".fwaas_plugin.FirewallPlugin")
|
||||
|
||||
|
||||
class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
|
||||
|
||||
def setUp(self):
|
||||
mock.patch('neutronclient.v2_0.client.Client').start()
|
||||
super(TestFirewallCallbacks, self).setUp(fw_plugin=PLUGIN)
|
||||
n_mgr = manager.NeutronManager
|
||||
self.plugin = n_mgr.get_service_plugins()[const.FIREWALL]
|
||||
self.callbacks = self.plugin.endpoints[0]
|
||||
self.ctx = context.get_admin_context()
|
||||
|
||||
def test_get_firewalls_for_tenant(self):
|
||||
tenant_id = 'test-tenant'
|
||||
with self.firewall_rule(name='fwr1', tenant_id=tenant_id,
|
||||
do_delete=False) as fr:
|
||||
with self.firewall_policy(tenant_id=tenant_id,
|
||||
do_delete=False) as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
fw_id = fr['firewall_rule']['id']
|
||||
data = {'firewall_policy':
|
||||
{'firewall_rules': [fw_id]}}
|
||||
self.plugin.update_firewall_policy(self.ctx, fwp_id, data)
|
||||
admin_state = test_db_firewall.ADMIN_STATE_UP
|
||||
with self.firewall(firewall_policy_id=fwp_id,
|
||||
tenant_id=tenant_id,
|
||||
do_delete=False,
|
||||
admin_state_up=admin_state) as fw:
|
||||
self.callbacks.get_firewalls_for_tenant(self.ctx,
|
||||
host='dummy')
|
||||
fw_id = fw['firewall']['id']
|
||||
fw['firewall']['config_mode'] = "NN"
|
||||
self.plugin._client.show_firewall.assert_called_once_with(
|
||||
fw_id)
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
self.plugin.delete_firewall_policy(self.ctx, fwp_id)
|
||||
self.plugin.delete_firewall_rule(self.ctx, fr['firewall_rule']['id'])
|
||||
|
||||
|
||||
class TestFreescaleFirewallPlugin(test_db_firewall.TestFirewallDBPlugin):
|
||||
|
||||
def setUp(self):
|
||||
mock.patch('neutronclient.v2_0.client.Client').start()
|
||||
super(TestFreescaleFirewallPlugin, self).setUp(fw_plugin=PLUGIN)
|
||||
self.plugin = manager.NeutronManager.get_service_plugins()['FIREWALL']
|
||||
self.callbacks = self.plugin.endpoints[0]
|
||||
self.clnt = self.plugin._client
|
||||
self.ctx = context.get_admin_context()
|
||||
|
||||
def test_create_firewall_with_admin_and_fwp_is_shared(self):
|
||||
fw_name = "fw_with_shared_fwp"
|
||||
with self.firewall_policy(do_delete=False, tenant_id="tenantX") as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
ctx = context.get_admin_context()
|
||||
target_tenant = 'tenant1'
|
||||
with self.firewall(name=fw_name,
|
||||
firewall_policy_id=fwp_id,
|
||||
tenant_id=target_tenant,
|
||||
context=ctx,
|
||||
do_delete=False,
|
||||
admin_state_up=True) as fw:
|
||||
self.assertEqual(target_tenant, fw['firewall']['tenant_id'])
|
||||
fw_id = fw['firewall']['id']
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_create_update_delete_firewall_rule(self):
|
||||
"""Testing create, update and delete firewall rule."""
|
||||
ctx = context.get_admin_context()
|
||||
clnt = self.plugin._client
|
||||
with self.firewall_rule(do_delete=False) as fwr:
|
||||
fwr_id = fwr['firewall_rule']['id']
|
||||
# Create Firewall Rule
|
||||
crd_rule = {'firewall_rule': fwr}
|
||||
clnt.create_firewall_rule.assert_called_once_with(fwr)
|
||||
# Update Firewall Rule
|
||||
data = {'firewall_rule': {'name': 'new_rule_name',
|
||||
'source_port': '10:20',
|
||||
'destination_port': '30:40'}}
|
||||
fw_rule = self.plugin.update_firewall_rule(ctx, fwr_id, data)
|
||||
crd_rule = {'firewall_rule': fw_rule}
|
||||
clnt.update_firewall_rule.assert_called_once_with(fwr_id, crd_rule)
|
||||
# Delete Firewall Rule
|
||||
self.plugin.delete_firewall_rule(ctx, fwr_id)
|
||||
clnt.delete_firewall_rule.assert_called_once_with(fwr_id)
|
||||
|
||||
def test_create_update_delete_firewall_policy(self):
|
||||
"""Testing create, update and delete firewall policy."""
|
||||
with self.firewall_policy(do_delete=False) as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
# Create Firewall Policy
|
||||
crd_policy = {'firewall_policy': fwp}
|
||||
self.clnt.create_firewall_policy.assert_called_once_with(fwp)
|
||||
# Update Firewall Policy
|
||||
data = {'firewall_policy': {'name': 'updated-name'}}
|
||||
fwp = self.plugin.update_firewall_policy(self.ctx, fwp_id, data)
|
||||
crd_policy = {'firewall_policy': fwp}
|
||||
self.clnt.update_firewall_policy.assert_called_once_with(
|
||||
fwp_id,
|
||||
crd_policy)
|
||||
# Delete Firewall Policy
|
||||
self.plugin.delete_firewall_policy(self.ctx, fwp_id)
|
||||
self.clnt.delete_firewall_policy.assert_called_once_with(fwp_id)
|
||||
|
||||
def test_create_firewall(self):
|
||||
name = "firewall-fake"
|
||||
expected_attrs = self._get_test_firewall_attrs(name)
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
expected_attrs['firewall_policy_id'] = fwp_id
|
||||
with self.firewall(name=name,
|
||||
firewall_policy_id=fwp_id,
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as actual_firewall:
|
||||
fw_id = actual_firewall['firewall']['id']
|
||||
self.assertDictSupersetOf(expected_attrs,
|
||||
actual_firewall['firewall'])
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_show_firewall(self):
|
||||
name = "firewall1"
|
||||
expected_attrs = self._get_test_firewall_attrs(name)
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
expected_attrs['firewall_policy_id'] = fwp_id
|
||||
with self.firewall(name=name,
|
||||
firewall_policy_id=fwp_id,
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as actual_firewall:
|
||||
fw_id = actual_firewall['firewall']['id']
|
||||
req = self.new_show_request('firewalls', fw_id,
|
||||
fmt=self.fmt)
|
||||
actual_fw = self.deserialize(self.fmt,
|
||||
req.get_response(self.ext_api))
|
||||
self.assertDictSupersetOf(expected_attrs,
|
||||
actual_fw['firewall'])
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_update_firewall(self):
|
||||
name = "new_firewall1"
|
||||
expected_attrs = self._get_test_firewall_attrs(name)
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
expected_attrs['firewall_policy_id'] = fwp_id
|
||||
with self.firewall(firewall_policy_id=fwp_id,
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as firewall:
|
||||
fw_id = firewall['firewall']['id']
|
||||
self.callbacks.set_firewall_status(self.ctx, fw_id,
|
||||
const.ACTIVE)
|
||||
data = {'firewall': {'name': name}}
|
||||
req = self.new_update_request('firewalls', data, fw_id)
|
||||
actual_fw = self.deserialize(self.fmt,
|
||||
req.get_response(self.ext_api))
|
||||
expected_attrs = self._replace_firewall_status(expected_attrs,
|
||||
const.PENDING_CREATE,
|
||||
const.PENDING_UPDATE)
|
||||
self.assertDictSupersetOf(expected_attrs,
|
||||
actual_fw['firewall'])
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_update_firewall_with_fwp(self):
|
||||
with self.firewall_policy() as fwp1, \
|
||||
self.firewall_policy(shared=False, do_delete=False) as fwp2, \
|
||||
self.firewall(firewall_policy_id=fwp1['firewall_policy']['id'],
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as firewall:
|
||||
fw_id = firewall['firewall']['id']
|
||||
fwp2_id = fwp2['firewall_policy']['id']
|
||||
self.callbacks.set_firewall_status(self.ctx, fw_id, const.ACTIVE)
|
||||
data = {'firewall': {'firewall_policy_id': fwp2_id}}
|
||||
req = self.new_update_request('firewalls', data, fw_id)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(200, res.status_int)
|
||||
|
||||
def test_update_firewall_with_shared_fwp(self):
|
||||
with self.firewall_policy() as fwp1, \
|
||||
self.firewall_policy(tenant_id='tenant2',
|
||||
do_delete=False) as fwp2, \
|
||||
self.firewall(firewall_policy_id=fwp1['firewall_policy']['id'],
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as firewall:
|
||||
fw_id = firewall['firewall']['id']
|
||||
fwp2_id = fwp2['firewall_policy']['id']
|
||||
self.callbacks.set_firewall_status(self.ctx, fw_id, const.ACTIVE)
|
||||
data = {'firewall': {'firewall_policy_id': fwp2_id}}
|
||||
req = self.new_update_request('firewalls', data, fw_id)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(200, res.status_int)
|
||||
|
||||
def test_update_firewall_with_admin_and_fwp_different_tenant(self):
|
||||
with self.firewall_policy(do_delete=False) as fwp1, \
|
||||
self.firewall_policy(tenant_id='tenant2', shared=False,
|
||||
do_delete=False) as fwp2, \
|
||||
self.firewall(firewall_policy_id=fwp1['firewall_policy']['id'],
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as firewall:
|
||||
fw_id = firewall['firewall']['id']
|
||||
fwp2_id = fwp2['firewall_policy']['id']
|
||||
self.callbacks.set_firewall_status(self.ctx, fw_id, const.ACTIVE)
|
||||
data = {'firewall': {'firewall_policy_id': fwp2_id}}
|
||||
req = self.new_update_request('firewalls', data, fw_id)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(409, res.status_int)
|
||||
|
||||
def test_list_firewalls(self):
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
with self.firewall(name='fw1', firewall_policy_id=fwp_id,
|
||||
description='fw') as fw1, \
|
||||
self.firewall(name='fw2', firewall_policy_id=fwp_id,
|
||||
description='fw') as fw2, \
|
||||
self.firewall(name='fw3', firewall_policy_id=fwp_id,
|
||||
description='fw') as fw3:
|
||||
|
||||
fwalls = [fw1, fw2, fw3]
|
||||
self._test_list_resources('firewall', fwalls,
|
||||
query_params='description=fw')
|
||||
for fw in fwalls:
|
||||
fw_id = fw['firewall']['id']
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_delete_firewall_policy_with_firewall_association(self):
|
||||
attrs = self._get_test_firewall_attrs()
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
attrs['firewall_policy_id'] = fwp_id
|
||||
with self.firewall(firewall_policy_id=fwp_id,
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False)as fw:
|
||||
fw_id = fw['firewall']['id']
|
||||
req = self.new_delete_request('firewall_policies', fwp_id)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(exc.HTTPConflict.code, res.status_int)
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_update_firewall_policy_assoc_with_other_tenant_firewall(self):
|
||||
with self.firewall_policy(shared=True, tenant_id='tenant1') as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
with self.firewall(firewall_policy_id=fwp_id,
|
||||
do_delete=False) as fw:
|
||||
fw_id = fw['firewall']['id']
|
||||
data = {'firewall_policy': {'shared': False}}
|
||||
req = self.new_update_request('firewall_policies', data,
|
||||
fwp['firewall_policy']['id'])
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(exc.HTTPConflict.code, res.status_int)
|
||||
self.plugin.delete_firewall(self.ctx, fw_id)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.callbacks.firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_delete_firewall(self):
|
||||
attrs = self._get_test_firewall_attrs()
|
||||
with self.firewall_policy() as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
attrs['firewall_policy_id'] = fwp_id
|
||||
with self.firewall(firewall_policy_id=fwp_id,
|
||||
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
|
||||
do_delete=False) as firewall:
|
||||
fw_id = firewall['firewall']['id']
|
||||
attrs = self._replace_firewall_status(attrs,
|
||||
const.PENDING_CREATE,
|
||||
const.PENDING_DELETE)
|
||||
req = self.new_delete_request('firewalls', fw_id)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
|
||||
self.clnt.delete_firewall.assert_called_once_with(fw_id)
|
||||
self.plugin.endpoints[0].firewall_deleted(self.ctx, fw_id)
|
||||
|
||||
def test_insert_remove_rule(self):
|
||||
"""Testing Insert and Remove rule operations."""
|
||||
status_update = {"firewall": {"status": 'PENDING_UPDATE'}}
|
||||
with self.firewall_rule(name='fake_rule',
|
||||
do_delete=False) as fr1:
|
||||
fr_id = fr1['firewall_rule']['id']
|
||||
with self.firewall_policy(do_delete=False) as fwp:
|
||||
fwp_id = fwp['firewall_policy']['id']
|
||||
with self.firewall(firewall_policy_id=fwp_id,
|
||||
do_delete=False) as fw:
|
||||
fw_id = fw['firewall']['id']
|
||||
# Insert Rule
|
||||
rule_info = {'firewall_rule_id': fr_id}
|
||||
self.plugin.insert_rule(self.ctx, fwp_id, rule_info)
|
||||
fp_insert_rule = self.clnt.firewall_policy_insert_rule
|
||||
fp_insert_rule.assert_called_once_with(fwp_id, rule_info)
|
||||
self.clnt.update_firewall.assert_called_once_with(
|
||||
fw_id,
|
||||
status_update)
|
||||
# Remove Rule
|
||||
rule_info = {'firewall_rule_id': fr_id}
|
||||
self.plugin.remove_rule(self.ctx, fwp_id, rule_info)
|
||||
fp_remove_rule = self.clnt.firewall_policy_remove_rule
|
||||
fp_remove_rule.assert_called_once_with(fwp_id, rule_info)
|
||||
self.clnt.update_firewall.assert_called_with(fw_id,
|
||||
status_update)
|
||||
|
||||
def test_create_firewall_with_dvr(self):
|
||||
"""Skip DVR Testing."""
|
||||
self.skipTest("DVR not supported")
|