Remove vendor driver: vyatta from community repo

Vendor drivers are being removed from the community repo and
they can continue to be hosted in respective vendor repos. This
has been discussed and communicated during the Mitaka release
and time given until the Newton release.

Change-Id: I9a64db228bcd9313c04d238c39ae1c53be89e339
This commit is contained in:
Sridar Kandaswamy 2016-08-26 08:55:05 -07:00
parent cf63fe658c
commit f6aed8b66a
13 changed files with 7 additions and 852 deletions

View File

@ -1,58 +0,0 @@
# Copyright 2015 Brocade Communications System, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron import context
from oslo_log import log as logging
from neutron_fwaas.services.firewall.agents.vyatta import vyatta_utils
LOG = logging.getLogger(__name__)
class VyattaFirewallService(object):
# TODO(vishwanathj): Code to be revised in Liberty release to use
# the base class firewall_service.FirewallService for registrations
def __init__(self, l3_agent):
self.conf = l3_agent.conf
registry.subscribe(
sync_firewall_zones, resources.ROUTER, events.AFTER_CREATE)
registry.subscribe(
sync_firewall_zones, resources.ROUTER, events.AFTER_DELETE)
registry.subscribe(
sync_firewall_zones, resources.ROUTER, events.AFTER_UPDATE)
def sync_firewall_zones(resource, event, l3_agent, **kwargs):
LOG.debug('VyattaFirewallService:: sync_firewall_zones() called')
ri = kwargs['router']
ctx = context.Context(None, ri.router['tenant_id'])
client = l3_agent._vyatta_clients_pool.get_by_db_lookup(
ri.router['id'], ctx)
fw_list = l3_agent.fwplugin_rpc.get_firewalls_for_tenant(ctx)
if fw_list:
zone_cmds = []
for fw in fw_list:
if ri.router['id'] in fw['router_ids']:
fw_name = vyatta_utils.get_firewall_name(ri, fw)
zone_cmds.extend(vyatta_utils.get_zone_cmds(client, ri,
fw_name))
client.exec_cmd_batch(zone_cmds)

View File

@ -1,39 +0,0 @@
# Copyright 2015 Brocade Communications System, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from networking_brocade.vyatta.common import l3_agent as vyatta_l3
from neutron.agent import l3_agent
from neutron_fwaas.services.firewall.agents.vyatta import firewall_service
class VyattaFirewallAgent(vyatta_l3.L3AgentMiddleware):
"""Brocade Neutron Firewall agent for Vyatta vRouter.
The base class FWaaSL3AgentRpcCallback of the VyattaFirewallAgent creates
the reference FirewallService object that loads the VyattaFirewallDriver
class.The VyattaFirewallService class registers callbacks and subscribes
to router events.
"""
def __init__(self, host, conf=None):
super(VyattaFirewallAgent, self).__init__(host, conf)
self.service = firewall_service.VyattaFirewallService(self)
def main():
l3_agent.main(
manager='neutron_fwaas.services.firewall.agents.vyatta.'
'fwaas_agent.VyattaFirewallAgent')

View File

@ -1,87 +0,0 @@
# Copyright 2015 Brocade Communications System, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from networking_brocade.vyatta.vrouter import client as vyatta_client
from neutron_lib import constants as l3_constants
from six.moves.urllib import parse
TRUST_ZONE = 'Internal_Trust'
UNTRUST_ZONE = 'External_Untrust'
ZONE_INTERFACE_CMD = 'zone-policy/zone/{0}/interface/{1}'
ZONE_FIREWALL_CMD = 'zone-policy/zone/{0}/from/{1}/firewall/name/{2}'
def get_firewall_name(ri, fw):
"""Make firewall name for Vyatta vRouter
Vyatta vRouter REST API allows firewall name length
up to 28 characters.
"""
return fw['id'].replace('-', '')[:28]
def get_trusted_zone_name(ri):
return TRUST_ZONE
def get_untrusted_zone_name(ri):
return UNTRUST_ZONE
def get_zone_cmds(rest_api, ri, fw_name):
"""Return zone update commands for Vyatta vRouter.
Commands chain drops all zone-policy zones and create new zones
based on internal interfaces and external gateway.
"""
cmd_list = []
# Delete the zone policies
cmd_list.append(vyatta_client.DeleteCmd("zone-policy"))
# Configure trusted zone
trusted_zone_name = None
# Add internal ports to trusted zone
if l3_constants.INTERFACE_KEY in ri.router:
trusted_zone_name = parse.quote_plus(get_trusted_zone_name(ri))
for port in ri.router[l3_constants.INTERFACE_KEY]:
eth_if_id = rest_api.get_ethernet_if_id(port['mac_address'])
cmd_list.append(vyatta_client.SetCmd(
ZONE_INTERFACE_CMD.format(trusted_zone_name, eth_if_id)))
# Configure untrusted zone
untrusted_zone_name = get_untrusted_zone_name(ri)
if untrusted_zone_name is not None:
# Add external ports to untrusted zone
if 'gw_port' in ri.router:
gw_port = ri.router['gw_port']
eth_if_id = rest_api.get_ethernet_if_id(gw_port['mac_address'])
cmd_list.append(vyatta_client.SetCmd(
ZONE_INTERFACE_CMD.format(untrusted_zone_name, eth_if_id)))
if trusted_zone_name is not None:
# Associate firewall to zone
cmd_list.append(vyatta_client.SetCmd(
ZONE_FIREWALL_CMD.format(
trusted_zone_name, untrusted_zone_name,
parse.quote_plus(fw_name))))
cmd_list.append(vyatta_client.SetCmd(
ZONE_FIREWALL_CMD.format(
untrusted_zone_name, trusted_zone_name,
parse.quote_plus(fw_name))))
return cmd_list

View File

@ -1,11 +0,0 @@
Brocade Firewall as a Service Driver
* For more information, refer to:
https://wiki.openstack.org/wiki/Brocade_Vyatta_Firewall_driver
* For information on Brocade Vyatta CI, refer to:
https://wiki.openstack.org/wiki/ThirdPartySystems/Brocade_Vyatta_CI
* Brocade Vyatta CI contact:
- DL-GRP-VYATTA-OSS@Brocade.com
- vjayara@Brocade.com

View File

@ -1,191 +0,0 @@
# Copyright 2015 Brocade Communications System, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from networking_brocade.vyatta.common import config as vyatta_config
from networking_brocade.vyatta.vrouter import client as vyatta_client
from neutron import context as neutron_context
from novaclient import client as nova_client
from oslo_log import log as logging
from six.moves.urllib import parse
from neutron_fwaas._i18n import _LW
from neutron_fwaas.services.firewall.agents.vyatta import vyatta_utils
from neutron_fwaas.services.firewall.drivers import fwaas_base
LOG = logging.getLogger(__name__)
FW_NAME = 'firewall/name/{0}'
FW_DESCRIPTION = 'firewall/name/{0}/description/{1}'
FW_ESTABLISHED_ACCEPT = 'firewall/state-policy/established/action/accept'
FW_RELATED_ACCEPT = 'firewall/state-policy/related/action/accept'
FW_RULE_DESCRIPTION = 'firewall/name/{0}/rule/{1}/description/{2}'
FW_RULE_PROTOCOL = 'firewall/name/{0}/rule/{1}/protocol/{2}'
FW_RULE_SRC_PORT = 'firewall/name/{0}/rule/{1}/source/port/{2}'
FW_RULE_DEST_PORT = 'firewall/name/{0}/rule/{1}/destination/port/{2}'
FW_RULE_SRC_ADDR = 'firewall/name/{0}/rule/{1}/source/address/{2}'
FW_RULE_DEST_ADDR = 'firewall/name/{0}/rule/{1}/destination/address/{2}'
FW_RULE_ACTION = 'firewall/name/{0}/rule/{1}/action/{2}'
NOVACLIENT_VERSION = '2'
class VyattaFirewallDriver(fwaas_base.FwaasDriverBase):
def __init__(self):
LOG.debug("Vyatta vRouter Fwaas:: Initializing fwaas driver")
compute_client = nova_client.Client(
NOVACLIENT_VERSION,
vyatta_config.VROUTER.tenant_admin_name,
vyatta_config.VROUTER.tenant_admin_password,
auth_url=vyatta_config.CONF.nova_admin_auth_url,
service_type="compute",
tenant_id=vyatta_config.VROUTER.tenant_id)
self._vyatta_clients_pool = vyatta_client.ClientsPool(compute_client)
def create_firewall(self, agent_mode, apply_list, firewall):
LOG.debug('Vyatta vRouter Fwaas::Create_firewall (%s)', firewall)
return self.update_firewall(agent_mode, apply_list, firewall)
def update_firewall(self, agent_mode, apply_list, firewall):
LOG.debug('Vyatta vRouter Fwaas::Update_firewall (%s)', firewall)
if firewall['admin_state_up']:
return self._update_firewall(apply_list, firewall)
else:
return self.apply_default_policy(agent_mode, apply_list, firewall)
def delete_firewall(self, agent_mode, apply_list, firewall):
LOG.debug('Vyatta vRouter Fwaas::Delete_firewall (%s)', firewall)
return self.apply_default_policy(agent_mode, apply_list, firewall)
def apply_default_policy(self, agent_mode, apply_list, firewall):
LOG.debug('Vyatta vRouter Fwaas::apply_default_policy (%s)',
firewall)
for ri in apply_list:
self._delete_firewall(ri, firewall)
return True
def _update_firewall(self, apply_list, firewall):
LOG.debug("Updating firewall (%s)", firewall['id'])
for ri in apply_list:
self._delete_firewall(ri, firewall)
self._setup_firewall(ri, firewall)
return True
def _setup_firewall(self, ri, fw):
client = self._get_vyatta_client(ri.router)
fw_cmd_list = []
# Create firewall
fw_name = vyatta_utils.get_firewall_name(ri, fw)
fw_cmd_list.append(
vyatta_client.SetCmd(
FW_NAME.format(parse.quote_plus(fw_name))))
if fw.get('description'):
fw_cmd_list.append(vyatta_client.SetCmd(
FW_DESCRIPTION.format(
parse.quote_plus(fw_name),
parse.quote_plus(fw['description']))))
# Set firewall state policy
fw_cmd_list.append(vyatta_client.SetCmd(FW_ESTABLISHED_ACCEPT))
fw_cmd_list.append(vyatta_client.SetCmd(FW_RELATED_ACCEPT))
# Create firewall rules
rule_num = 0
for rule in fw['firewall_rule_list']:
if not rule['enabled']:
continue
if rule['ip_version'] == 4:
rule_num += 1
fw_cmd_list += self._set_firewall_rule(fw_name, rule_num, rule)
else:
LOG.warning(_LW("IPv6 rules are not supported."))
# Configure router zones
zone_cmd_list = vyatta_utils.get_zone_cmds(client, ri, fw_name)
client.exec_cmd_batch(fw_cmd_list + zone_cmd_list)
def _delete_firewall(self, ri, fw):
client = self._get_vyatta_client(ri.router)
cmd_list = []
# Delete zones
cmd_list.append(vyatta_client.DeleteCmd("zone-policy"))
# Delete firewall
fw_name = vyatta_utils.get_firewall_name(ri, fw)
cmd_list.append(vyatta_client.DeleteCmd(
FW_NAME.format(parse.quote_plus(fw_name))))
# Delete firewall state policy
cmd_list.append(vyatta_client.DeleteCmd("firewall/state-policy"))
client.exec_cmd_batch(cmd_list)
def _set_firewall_rule(self, fw_name, rule_num, rule):
cmd_list = []
if 'description' in rule and len(rule['description']) > 0:
cmd_list.append(vyatta_client.SetCmd(
FW_RULE_DESCRIPTION.format(
parse.quote_plus(fw_name), rule_num,
parse.quote_plus(rule['description']))))
rules = [
('protocol', FW_RULE_PROTOCOL),
('source_port', FW_RULE_SRC_PORT),
('destination_port', FW_RULE_DEST_PORT),
('source_ip_address', FW_RULE_SRC_ADDR),
('destination_ip_address', FW_RULE_DEST_ADDR),
]
for key, url in rules:
field = rule.get(key)
if field is None:
continue
# For safety and extensibility we need to use quote_plus
# for all data retrieved from external sources.
cmd_list.append(vyatta_client.SetCmd(
url.format(
parse.quote_plus(fw_name), rule_num,
parse.quote_plus(field))))
if 'action' in rule:
if rule['action'] == 'allow':
action = 'accept'
else:
action = 'drop'
cmd_list.append(vyatta_client.SetCmd(
FW_RULE_ACTION.format(
parse.quote_plus(fw_name), rule_num,
action)))
return cmd_list
def _get_vyatta_client(self, router):
ctx = neutron_context.Context(None, router['tenant_id'])
return self._vyatta_clients_pool.get_by_db_lookup(router['id'], ctx)

View File

@ -1,100 +0,0 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import uuid
import mock
from neutron.tests import base
class FakeL3AgentMidleware(object):
def __init__(self, host, conf=None):
self._vyatta_clients_pool = mock.Mock()
self.fwplugin_rpc = mock.Mock()
self.conf = conf
# Mocking imports of 3rd party vyatta library in unit tests and all modules
# that depends on this library. Import will fail if not mocked and 3rd party
# vyatta library is not installed.
with mock.patch.dict(sys.modules, {
'networking_brocade': mock.Mock(),
'networking_brocade.vyatta': mock.Mock(),
'networking_brocade.vyatta.common': mock.Mock(),
'networking_brocade.vyatta.vrouter': mock.Mock(),
}):
from networking_brocade.vyatta.common import l3_agent
l3_agent.L3AgentMiddleware = FakeL3AgentMidleware
from neutron_fwaas.services.firewall.agents.vyatta import firewall_service
from neutron_fwaas.services.firewall.agents.vyatta import fwaas_agent
from neutron_fwaas.services.firewall.agents.vyatta import vyatta_utils
def fake_cmd(*args, **kwargs):
return (args, kwargs)
class TestVyattaFirewallService(base.BaseTestCase):
def test_sync_firewall_zones(self):
agent = self._make_agent()
fake_client = mock.Mock()
agent._vyatta_clients_pool.get_by_db_lookup.return_value = fake_client
router_id = str(uuid.uuid4())
fake_fw_record = {
'id': str(uuid.uuid4()),
'name': 'fake-fw-record0',
'router_ids': [router_id]
}
agent.fwplugin_rpc.get_firewalls_for_tenant.return_value = [
fake_fw_record
]
router = {
'id': router_id,
'tenant_id': str(uuid.uuid4())
}
router_info = mock.NonCallableMock()
router_info.router = router
cmd_list = [
fake_cmd("zone-policy"),
fake_cmd(vyatta_utils.ZONE_INTERFACE_CMD.format(
'fake-zone', 'eth0')),
fake_cmd(vyatta_utils.ZONE_FIREWALL_CMD.format(
vyatta_utils.UNTRUST_ZONE, vyatta_utils.TRUST_ZONE,
'fake-fw-name'))
]
with mock.patch.object(
vyatta_utils, 'get_zone_cmds') as get_zone_mock:
get_zone_mock.return_value = cmd_list
firewall_service.sync_firewall_zones(
None, None, agent, router=router_info)
agent._vyatta_clients_pool.get_by_db_lookup.assert_called_once_with(
router_info.router['id'], mock.ANY)
fake_client.exec_cmd_batch.assert_called_once_with(cmd_list)
def _make_agent(self):
agent = fwaas_agent.VyattaFirewallAgent('fake-host')
agent.router_info = dict()
return agent

View File

@ -1,115 +0,0 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import mock
from neutron.tests import base
from neutron_lib import constants as l3_constants
from six.moves.urllib import parse
# Mocking imports of 3rd party vyatta library in unit tests and all modules
# that depends on this library. Import will fail if not mocked and 3rd party
# vyatta library is not installed.
with mock.patch.dict(sys.modules, {
'networking_brocade': mock.Mock(),
'networking_brocade.vyatta': mock.Mock(),
'networking_brocade.vyatta.vrouter': mock.Mock(),
}):
from networking_brocade.vyatta.vrouter import client as vyatta_client
from neutron_fwaas.services.firewall.agents.vyatta import vyatta_utils
def fake_cmd(*args, **kwargs):
return (args, kwargs)
class TestVyattaUtils(base.BaseTestCase):
def setUp(self):
super(TestVyattaUtils, self).setUp()
mock.patch.object(vyatta_client, 'SetCmd', fake_cmd).start()
mock.patch.object(vyatta_client, 'DeleteCmd', fake_cmd).start()
def test_get_firewall_name(self):
fake_firewall = {
'id': '74bc106d-fff0-4f92-ac1a-60d4b6b44fe1',
}
fw_name = vyatta_utils.get_firewall_name(
None, fake_firewall)
self.assertEqual('74bc106dfff04f92ac1a60d4b6b4', fw_name)
def test_get_trusted_zone_name(self):
fake_apply_list = object()
self.assertEqual(
'Internal_Trust', vyatta_utils.get_trusted_zone_name(
fake_apply_list))
def test_get_untrusted_zone_name(self):
fake_apply_list = object()
self.assertEqual(
'External_Untrust', vyatta_utils.get_untrusted_zone_name(
fake_apply_list))
def test_get_zone_cmds(self):
firewall_name = 'fake_firewall0'
eth_iface = 'eth0'
fake_api = mock.NonCallableMock()
fake_api.get_ethernet_if_id.return_value = eth_iface
mac_address = '00:00:00:00:00:00'
fake_apply_rule = mock.NonCallableMock()
fake_apply_rule.router = {
'gw_port': {
'mac_address': mac_address},
l3_constants.INTERFACE_KEY: [{
'mac_address': mac_address}]
}
trusted_zone_name = vyatta_utils.get_trusted_zone_name(
fake_apply_rule)
untrusted_zone_name = vyatta_utils.get_untrusted_zone_name(
fake_apply_rule)
cmds_actual = vyatta_utils.get_zone_cmds(
fake_api, fake_apply_rule, firewall_name)
cmds_expect = [
vyatta_client.DeleteCmd('zone-policy'),
vyatta_client.SetCmd(
vyatta_utils.ZONE_INTERFACE_CMD.format(
trusted_zone_name, eth_iface)),
vyatta_client.SetCmd(
vyatta_utils.ZONE_INTERFACE_CMD.format(
untrusted_zone_name, eth_iface)),
vyatta_client.SetCmd(
vyatta_utils.ZONE_FIREWALL_CMD.format(
trusted_zone_name, untrusted_zone_name,
parse.quote_plus(firewall_name))),
vyatta_client.SetCmd(
vyatta_utils.ZONE_FIREWALL_CMD.format(
untrusted_zone_name, trusted_zone_name,
parse.quote_plus(firewall_name))),
]
self.assertEqual(cmds_expect, cmds_actual)
fake_api.get_ethernet_if_id.assert_has_calls([
mock.call(mac_address),
mock.call(mac_address),
])

View File

@ -1,251 +0,0 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import mock
from neutron.tests import base
from oslo_utils import uuidutils
from six.moves.urllib import parse
# Mocking imports of 3rd party vyatta library in unit tests and all modules
# that depends on this library. Import will fail if not mocked and 3rd party
# vyatta library is not installed.
with mock.patch.dict(sys.modules, {
'networking_brocade': mock.Mock(),
'networking_brocade.vyatta': mock.Mock(),
'networking_brocade.vyatta.common': mock.Mock(),
'networking_brocade.vyatta.vrouter': mock.Mock(),
}):
from networking_brocade.vyatta.vrouter import client as vyatta_client
from neutron_fwaas.services.firewall.agents.vyatta import vyatta_utils
from neutron_fwaas.services.firewall.drivers.vyatta import vyatta_fwaas
_uuid = uuidutils.generate_uuid
FAKE_FW_UUID = _uuid()
def fake_cmd(*args, **kwargs):
return (args, kwargs)
class VyattaFwaasTestCase(base.BaseTestCase):
def setUp(self):
super(VyattaFwaasTestCase, self).setUp()
mock.patch.object(vyatta_client, 'SetCmd', fake_cmd).start()
mock.patch.object(vyatta_client, 'DeleteCmd', fake_cmd).start()
self.fwaas_driver = vyatta_fwaas.VyattaFirewallDriver()
self.fake_rules = [self._make_fake_fw_rule()]
self.fake_firewall = self._make_fake_firewall(self.fake_rules)
self.fake_firewall_name = vyatta_utils.get_firewall_name(
None, self.fake_firewall)
self.fake_apply_list = [self._make_fake_router_info()]
self.fake_agent_mode = None
def test_create_firewall(self):
with mock.patch.object(
self.fwaas_driver, 'update_firewall') as fw_update:
self.fwaas_driver.create_firewall(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
fw_update.assert_called_once_with(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
def test_update_firewall(self):
with mock.patch.object(
self.fwaas_driver, '_update_firewall') as fw_update:
self.fake_firewall['admin_state_up'] = True
self.fwaas_driver.create_firewall(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
fw_update.assert_called_once_with(
self.fake_apply_list, self.fake_firewall)
with mock.patch.object(
self.fwaas_driver, 'apply_default_policy') as fw_apply_policy:
self.fake_firewall['admin_state_up'] = False
self.fwaas_driver.create_firewall(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
fw_apply_policy.assert_called_once_with(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
def test_delete_firewall(self):
with mock.patch.object(
self.fwaas_driver, 'apply_default_policy') as fw_apply_policy:
self.fwaas_driver.delete_firewall(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
fw_apply_policy.assert_called_once_with(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
def test_apply_default_policy(self):
with mock.patch.object(
self.fwaas_driver, '_delete_firewall') as fw_delete:
self.fwaas_driver.apply_default_policy(
self.fake_agent_mode, self.fake_apply_list, self.fake_firewall)
calls = [mock.call(x, self.fake_firewall)
for x in self.fake_apply_list]
fw_delete.assert_has_calls(calls)
def test_update_firewall_internal(self):
with mock.patch.object(
self.fwaas_driver, '_delete_firewall'
) as fw_delete, mock.patch.object(
self.fwaas_driver, '_setup_firewall') as fw_setup:
self.fwaas_driver._update_firewall(
self.fake_apply_list, self.fake_firewall)
calls = [mock.call(x, self.fake_firewall)
for x in self.fake_apply_list]
fw_delete.assert_has_calls(calls)
fw_setup.assert_has_calls(calls)
def test_setup_firewall_internal(self):
fake_rule = self._make_fake_fw_rule()
fake_router_info = self._make_fake_router_info()
fake_rule_cmd = 'fake-fw-rule0'
fake_zone_configure_rules = ['fake-config-rule0']
mock_api = mock.Mock()
mock_api_gen = mock.Mock(return_value=mock_api)
mock_get_firewall_rule = mock.Mock(return_value=[fake_rule_cmd])
mock_get_zone_cmds = mock.Mock(return_value=fake_zone_configure_rules)
with mock.patch.object(self.fwaas_driver, '_get_vyatta_client',
mock_api_gen), \
mock.patch.object(vyatta_fwaas.vyatta_utils, 'get_zone_cmds',
mock_get_zone_cmds), \
mock.patch.object(self.fwaas_driver, '_set_firewall_rule',
mock_get_firewall_rule):
self.fwaas_driver._setup_firewall(
fake_router_info, self.fake_firewall)
mock_api_gen.assert_called_once_with(
fake_router_info.router)
mock_get_firewall_rule.assert_called_once_with(
self.fake_firewall_name, 1, fake_rule)
mock_get_zone_cmds.assert_called_once_with(
mock_api, fake_router_info, self.fake_firewall_name)
cmds = [
vyatta_client.SetCmd(
vyatta_fwaas.FW_NAME.format(
self.fake_firewall_name)),
vyatta_client.SetCmd(
vyatta_fwaas.FW_DESCRIPTION.format(
self.fake_firewall_name,
parse.quote_plus(self.fake_firewall['description']))),
vyatta_client.SetCmd(
vyatta_fwaas.FW_ESTABLISHED_ACCEPT),
vyatta_client.SetCmd(
vyatta_fwaas.FW_RELATED_ACCEPT),
fake_rule_cmd,
] + fake_zone_configure_rules
mock_api.exec_cmd_batch.assert_called_once_with(cmds)
def test_delete_firewall_internal(self):
fake_router_info = self._make_fake_router_info()
with mock.patch.object(
self.fwaas_driver,
'_get_vyatta_client') as mock_client_factory:
mock_api = mock_client_factory.return_value
self.fwaas_driver._delete_firewall(
fake_router_info, self.fake_firewall)
cmds = [
vyatta_client.DeleteCmd("zone-policy"),
vyatta_client.DeleteCmd(vyatta_fwaas.FW_NAME.format(
self.fake_firewall_name)),
vyatta_client.DeleteCmd("firewall/state-policy"),
]
mock_api.exec_cmd_batch.assert_called_once_with(cmds)
def test_set_firewall_rule_internal(self):
fake_rule = self._make_fake_fw_rule()
fake_firewall_name = 'fake-fw-name'
fake_rule.update({
'description': 'rule description',
'source_port': '2080',
'destination_ip_address': '172.16.1.1'
})
action_map = {
'allow': 'accept',
}
cmds_actual = self.fwaas_driver._set_firewall_rule(
fake_firewall_name, 1, fake_rule)
cmds_expect = [
vyatta_client.SetCmd(
vyatta_fwaas.FW_RULE_DESCRIPTION.format(
parse.quote_plus(fake_firewall_name), 1,
parse.quote_plus(fake_rule['description'])))
]
rules = [
('protocol', vyatta_fwaas.FW_RULE_PROTOCOL),
('source_port', vyatta_fwaas.FW_RULE_SRC_PORT),
('destination_port', vyatta_fwaas.FW_RULE_DEST_PORT),
('source_ip_address', vyatta_fwaas.FW_RULE_SRC_ADDR),
('destination_ip_address', vyatta_fwaas.FW_RULE_DEST_ADDR),
]
for key, url in rules:
cmds_expect.append(vyatta_client.SetCmd(
url.format(
parse.quote_plus(fake_firewall_name), 1,
parse.quote_plus(fake_rule[key]))))
cmds_expect.append(vyatta_client.SetCmd(
vyatta_fwaas.FW_RULE_ACTION.format(
parse.quote_plus(fake_firewall_name), 1,
action_map.get(fake_rule['action'], 'drop'))))
self.assertEqual(cmds_expect, cmds_actual)
def _make_fake_router_info(self):
info = mock.Mock()
info.router = {
'id': 'fake-router-id',
'tenant_id': 'tenant-uuid',
}
return info
def _make_fake_fw_rule(self):
return {
'enabled': True,
'action': 'allow',
'ip_version': 4,
'protocol': 'tcp',
'destination_port': '80',
'source_ip_address': '10.24.4.2'}
def _make_fake_firewall(self, rules):
return {'id': FAKE_FW_UUID,
'admin_state_up': True,
'name': 'test-firewall',
'tenant_id': 'tenant-uuid',
'description': 'Fake firewall',
'firewall_rule_list': rules}

View File

@ -0,0 +1,7 @@
---
prelude: >
- The vyatta Firewall Driver is being removed from the FwaaS repo,
as per decision to remove vendor drivers from the community repo.
upgrade:
- The vyatta Firewall Driver will not be available for use in the
Newton release from the community repo.