896 lines
46 KiB
Python
896 lines
46 KiB
Python
# Copyright 2017 VMware Inc
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
import time
|
|
|
|
from tempest import config
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
|
|
from tempest.lib import decorators
|
|
|
|
from vmware_nsx_tempest_plugin.common import constants
|
|
from vmware_nsx_tempest_plugin.lib import feature_manager
|
|
from vmware_nsx_tempest_plugin.services import nsxp_client
|
|
from vmware_nsx_tempest_plugin.services import nsxv3_client
|
|
|
|
CONF = config.CONF
|
|
LOG = constants.log.getLogger(__name__)
|
|
|
|
|
|
class TestVerifyFwNatOrder(feature_manager.FeatureManager):
|
|
|
|
"""Test New Cases Scenario
|
|
|
|
"""
|
|
@classmethod
|
|
def setup_clients(cls):
|
|
super(TestVerifyFwNatOrder, cls).setup_clients()
|
|
cls.cmgr_adm = cls.get_client_manager('admin')
|
|
cls.cmgr_alt = cls.get_client_manager('alt')
|
|
cls.cmgr_adm = cls.get_client_manager('admin')
|
|
cls.routers_client = cls.cmgr_adm.routers_client
|
|
cls.networks_client = cls.cmgr_adm.networks_client
|
|
cls.subnets_client = cls.cmgr_adm.subnets_client
|
|
cls.sec_rule_client = cls.cmgr_adm.security_group_rules_client
|
|
cls.sec_client = cls.cmgr_adm.security_groups_client
|
|
|
|
def setUp(self):
|
|
super(TestVerifyFwNatOrder, self).setUp()
|
|
CONF.validation.ssh_shell_prologue = ''
|
|
self.vip_ip_address = ''
|
|
self.namestart = 'lbaas-ops'
|
|
self.poke_counters = 12
|
|
self.hm_delay = 4
|
|
self.hm_max_retries = 3
|
|
self.hm_timeout = 10
|
|
self.server_names = []
|
|
self.loadbalancer = None
|
|
self.vip_fip = None
|
|
self.web_service_start_delay = 2.5
|
|
|
|
@classmethod
|
|
def resource_setup(cls):
|
|
super(TestVerifyFwNatOrder, cls).resource_setup()
|
|
cls.nsx = nsxv3_client.NSXV3Client(CONF.nsxv3.nsx_manager,
|
|
CONF.nsxv3.nsx_user,
|
|
CONF.nsxv3.nsx_password)
|
|
cls.nsxp = nsxp_client.NSXPClient(CONF.nsxv3.nsx_manager,
|
|
CONF.nsxv3.nsx_user,
|
|
CONF.nsxv3.nsx_password)
|
|
|
|
def _test_ping_from_external_network(self, fip_ip):
|
|
out = os.popen('ping -c 5 %s' % fip_ip).read().strip()
|
|
return out
|
|
|
|
@decorators.idempotent_id('2317449c-03ac-9106-c317-09956daa46c3')
|
|
def test_verfiy_nat_fw_order_external_fw(self):
|
|
"""
|
|
Create NAT and Firewall rules on router.
|
|
Verify order of NAT and Firewall.
|
|
"""
|
|
rtr_name = data_utils.rand_name(name='tempest-router')
|
|
network_name = data_utils.rand_name(name='tempest-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-subnet')
|
|
router_state = self.create_topology_router(
|
|
rtr_name, set_gateway=True,
|
|
routers_client=self.cmgr_adm.routers_client)
|
|
network_state = self.create_topology_network(
|
|
network_name, networks_client=self.cmgr_adm.networks_client)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state,
|
|
subnets_client=self.cmgr_adm.subnets_client)
|
|
interface = self.cmgr_adm.routers_client.add_router_interface(
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.cmgr_adm.routers_client.remove_router_interface,
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
security_group = self._create_security_group(
|
|
security_group_rules_client=self.cmgr_adm.
|
|
security_group_rules_client,
|
|
security_groups_client=self.cmgr_adm.security_groups_client)
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
security_groups = [{'name': security_group['name']}]
|
|
server1 = self.create_topology_instance(
|
|
"state_vm_1", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
server2 = self.create_topology_instance(
|
|
"state_vm_2", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
ext_network = self.cmgr_adm.networks_client.show_network(
|
|
CONF.network.public_network_id)['network']
|
|
ext_subnet = self.cmgr_adm.subnets_client.show_subnet(
|
|
ext_network['subnets'][0])['subnet']
|
|
fw_rules = self.create_firewall_rule(
|
|
name='test_rule', protocol='icmp',
|
|
action="allow", destination_ip_address=ext_subnet["cidr"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_rule,
|
|
fw_rules['firewall_rule']['id'])
|
|
rules = []
|
|
# Check firewall rule
|
|
rules.append(fw_rules['firewall_rule']['id'])
|
|
policy_name = data_utils.rand_name('fw-policy-')
|
|
# Create firewall policy
|
|
fw_policy = self.create_firewall_policy(
|
|
name=policy_name, firewall_rules=rules,
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_policy,
|
|
fw_policy['firewall_policy']['id'])
|
|
show_policy = self.show_firewall_policy(
|
|
fw_policy['firewall_policy']['id'])
|
|
# Check firewall policy
|
|
self.assertEqual(
|
|
show_policy.get('firewall_policy')['name'],
|
|
policy_name)
|
|
self.assertEqual(show_policy.get('firewall_policy')
|
|
['firewall_rules'], rules)
|
|
policy_id = fw_policy['firewall_policy']['id']
|
|
group_name = data_utils.rand_name('fw-group-')
|
|
# Create firewall group
|
|
fw_group = self.create_firewall_group(
|
|
name=group_name,
|
|
ingress_firewall_policy_id=policy_id,
|
|
egress_firewall_policy_id=policy_id,
|
|
ports=[interface['port_id']],
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.update_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"])
|
|
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
|
|
# Verify traffic to vm
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
# Update destination ip address in firewall rule
|
|
fw_rules = self.update_firewall_rule(
|
|
fw_rules['firewall_rule']['id'],
|
|
destination_ip_address=subnet_state['cidr'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
# Verify traffic to vm
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
self.fwaas_v2_client.update_firewall_v2_group(
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
|
|
@decorators.idempotent_id('2317449c-14bd-0317-c317-09956daa46c3')
|
|
def test_verfiy_nat_fw_order_internal_fw(self):
|
|
"""
|
|
Create NAT and Firewall rules on router.
|
|
Verify order of NAT and Firewall.
|
|
"""
|
|
rtr_name = data_utils.rand_name(name='tempest-router')
|
|
network_name = data_utils.rand_name(name='tempest-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-subnet')
|
|
router_state = self.create_topology_router(
|
|
rtr_name, set_gateway=True,
|
|
routers_client=self.cmgr_adm.routers_client)
|
|
network_state = self.create_topology_network(
|
|
network_name, networks_client=self.cmgr_adm.networks_client)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state,
|
|
subnets_client=self.cmgr_adm.subnets_client)
|
|
interface = self.cmgr_adm.routers_client.add_router_interface(
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.cmgr_adm.routers_client.remove_router_interface,
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
security_group = self._create_security_group(
|
|
security_group_rules_client=self.cmgr_adm.
|
|
security_group_rules_client,
|
|
security_groups_client=self.cmgr_adm.security_groups_client)
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
security_groups = [{'name': security_group['name']}]
|
|
server1 = self.create_topology_instance(
|
|
"state_vm_1", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
server2 = self.create_topology_instance(
|
|
"state_vm_2", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
ext_network = self.cmgr_adm.networks_client.show_network(
|
|
CONF.network.public_network_id)['network']
|
|
ext_subnet = self.cmgr_adm.subnets_client.show_subnet(
|
|
ext_network['subnets'][0])['subnet']
|
|
fw_rules = self.create_firewall_rule(
|
|
name='test_rule', protocol='icmp',
|
|
action="allow",
|
|
destination_ip_address=subnet_state['cidr'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_rule,
|
|
fw_rules['firewall_rule']['id'])
|
|
rules = []
|
|
# Check firewall rule
|
|
rules.append(fw_rules['firewall_rule']['id'])
|
|
policy_name = data_utils.rand_name('fw-policy-')
|
|
# Create firewall policy
|
|
fw_policy = self.create_firewall_policy(
|
|
name=policy_name, firewall_rules=rules,
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_policy,
|
|
fw_policy['firewall_policy']['id'])
|
|
show_policy = self.show_firewall_policy(
|
|
fw_policy['firewall_policy']['id'])
|
|
# Check firewall policy
|
|
self.assertEqual(
|
|
show_policy.get('firewall_policy')['name'],
|
|
policy_name)
|
|
self.assertEqual(show_policy.get('firewall_policy')
|
|
['firewall_rules'], rules)
|
|
policy_id = fw_policy['firewall_policy']['id']
|
|
group_name = data_utils.rand_name('fw-group-')
|
|
# Create firewall group
|
|
fw_group = self.create_firewall_group(
|
|
name=group_name,
|
|
ingress_firewall_policy_id=policy_id,
|
|
egress_firewall_policy_id=policy_id,
|
|
ports=[interface['port_id']],
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.update_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"])
|
|
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
|
|
# Verify traffic to vm
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
# Update destination ip address in firewall rule
|
|
fw_rules = self.update_firewall_rule(
|
|
fw_rules['firewall_rule']['id'],
|
|
destination_ip_address=ext_subnet['cidr'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
# Verify traffic to vm
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
self.fwaas_v2_client.update_firewall_v2_group(
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
|
|
@decorators.idempotent_id('2317449c-14ca-1428-a428-09956daa46c3')
|
|
def test_verfiy_nat_fw_order_external_fw_with_octavia_lb(self):
|
|
"""
|
|
Create NAT and Firewall rules on router.
|
|
Verify order of NAT and Firewall.
|
|
"""
|
|
kwargs = {"enable_snat": True}
|
|
network_name = data_utils.rand_name(name='tempest-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-subnet')
|
|
router_state = self.create_topology_router(
|
|
set_gateway=True,
|
|
routers_client=self.cmgr_adm.routers_client, **kwargs)
|
|
network_state = self.create_topology_network(
|
|
network_name, networks_client=self.cmgr_adm.networks_client)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state,
|
|
subnets_client=self.cmgr_adm.subnets_client)
|
|
interface = self.cmgr_adm.routers_client.add_router_interface(
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.cmgr_adm.routers_client.remove_router_interface,
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
sec_rule_client = self.cmgr_adm.security_group_rules_client
|
|
sec_client = self.cmgr_adm.security_groups_client
|
|
kwargs = dict(tenant_id=network_state['tenant_id'],
|
|
security_group_rules_client=sec_rule_client,
|
|
security_groups_client=sec_client)
|
|
self.sg = self._create_security_group(
|
|
security_group_rules_client=self.cmgr_adm.
|
|
security_group_rules_client,
|
|
security_groups_client=self.cmgr_adm.security_groups_client)
|
|
lbaas_rules = [dict(direction='ingress', protocol='tcp',
|
|
port_range_min=constants.HTTP_PORT,
|
|
port_range_max=constants.HTTP_PORT, ),
|
|
dict(direction='ingress', protocol='tcp',
|
|
port_range_min=443, port_range_max=443, )]
|
|
for rule in lbaas_rules:
|
|
self.add_security_group_rule(
|
|
self.sg,
|
|
rule,
|
|
ruleclient=self.cmgr_adm.security_group_rules_client,
|
|
secclient=self.cmgr_adm.security_groups_client,
|
|
tenant_id=network_state['tenant_id'])
|
|
ext_network = self.cmgr_adm.networks_client.show_network(
|
|
CONF.network.public_network_id)['network']
|
|
ext_subnet = self.cmgr_adm.subnets_client.show_subnet(
|
|
ext_network['subnets'][0])['subnet']
|
|
security_groups = [{'name': self.sg['name']}]
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
self.create_topology_instance(
|
|
"state_vm_1", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
self.create_topology_instance(
|
|
"state_vm_2", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
self.start_web_servers(constants.HTTP_PORT)
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
lb_cist = self.create_project_octavia(
|
|
protocol_type="HTTP", protocol_port="80",
|
|
lb_algorithm="LEAST_CONNECTIONS",
|
|
vip_net_id=network_state['id'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
fw_rules = self.create_firewall_rule(
|
|
name='test_rule', protocol='tcp',
|
|
action="allow", destination_ip_address=ext_subnet["cidr"],
|
|
destination_port='80')
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_rule,
|
|
fw_rules['firewall_rule']['id'])
|
|
rules = []
|
|
# Check firewall rule
|
|
rules.append(fw_rules['firewall_rule']['id'])
|
|
policy_name = data_utils.rand_name('fw-policy-')
|
|
# Create firewall policy
|
|
fw_policy = self.create_firewall_policy(
|
|
name=policy_name, firewall_rules=rules,
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_policy,
|
|
fw_policy['firewall_policy']['id'])
|
|
show_policy = self.show_firewall_policy(
|
|
fw_policy['firewall_policy']['id'])
|
|
# Check firewall policy
|
|
self.assertEqual(
|
|
show_policy.get('firewall_policy')['name'],
|
|
policy_name)
|
|
self.assertEqual(show_policy.get('firewall_policy')
|
|
['firewall_rules'], rules)
|
|
policy_id = fw_policy['firewall_policy']['id']
|
|
group_name = data_utils.rand_name('fw-group-')
|
|
# Create firewall group
|
|
fw_group = self.create_firewall_group(
|
|
name=group_name,
|
|
ingress_firewall_policy_id=policy_id,
|
|
egress_firewall_policy_id=policy_id,
|
|
ports=[interface['port_id']],
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.update_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"])
|
|
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
|
|
self.check_project_lbaas()
|
|
fw_rules = self.update_firewall_rule(
|
|
fw_rules['firewall_rule']['id'],
|
|
destination_ip_address=subnet_state['cidr'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
self.check_project_lbaas()
|
|
self.delete_octavia_lb_resources(lb_cist['lb_id'])
|
|
self.fwaas_v2_client.update_firewall_v2_group(
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
|
|
@decorators.idempotent_id('2317320c-44bd-9106-c317-09956daa46c3')
|
|
def test_verfiy_nat_fw_order_internal_fw_with_octa(self):
|
|
"""
|
|
Create NAT and Firewall rules on router.
|
|
Verify order of NAT and Firewall.
|
|
"""
|
|
kwargs = {"enable_snat": True}
|
|
network_name = data_utils.rand_name(name='tempest-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-subnet')
|
|
router_state = self.create_topology_router(
|
|
set_gateway=True,
|
|
routers_client=self.cmgr_adm.routers_client, **kwargs)
|
|
network_state = self.create_topology_network(
|
|
network_name, networks_client=self.cmgr_adm.networks_client)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state,
|
|
subnets_client=self.cmgr_adm.subnets_client)
|
|
interface = self.cmgr_adm.routers_client.add_router_interface(
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.cmgr_adm.routers_client.remove_router_interface,
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
sec_rule_client = self.cmgr_adm.security_group_rules_client
|
|
sec_client = self.cmgr_adm.security_groups_client
|
|
kwargs = dict(tenant_id=network_state['tenant_id'],
|
|
security_group_rules_client=sec_rule_client,
|
|
security_groups_client=sec_client)
|
|
self.sg = self._create_security_group(
|
|
security_group_rules_client=self.cmgr_adm.
|
|
security_group_rules_client,
|
|
security_groups_client=self.cmgr_adm.security_groups_client)
|
|
lbaas_rules = [dict(direction='ingress', protocol='tcp',
|
|
port_range_min=constants.HTTP_PORT,
|
|
port_range_max=constants.HTTP_PORT, ),
|
|
dict(direction='ingress', protocol='tcp',
|
|
port_range_min=443, port_range_max=443, )]
|
|
for rule in lbaas_rules:
|
|
self.add_security_group_rule(
|
|
self.sg,
|
|
rule,
|
|
ruleclient=self.cmgr_adm.security_group_rules_client,
|
|
secclient=self.cmgr_adm.security_groups_client,
|
|
tenant_id=network_state['tenant_id'])
|
|
ext_network = self.cmgr_adm.networks_client.show_network(
|
|
CONF.network.public_network_id)['network']
|
|
ext_subnet = self.cmgr_adm.subnets_client.show_subnet(
|
|
ext_network['subnets'][0])['subnet']
|
|
security_groups = [{'name': self.sg['name']}]
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
self.create_topology_instance(
|
|
"state_vm_1", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
self.create_topology_instance(
|
|
"state_vm_2", [network_state],
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups)
|
|
self.start_web_servers(constants.HTTP_PORT)
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
lb_cist = self.create_project_octavia(
|
|
protocol_type="HTTP", protocol_port="80",
|
|
lb_algorithm="LEAST_CONNECTIONS",
|
|
vip_net_id=network_state['id'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
fw_rules = self.create_firewall_rule(
|
|
name='test_rule', protocol='tcp',
|
|
action="allow", destination_ip_address=subnet_state['cidr'],
|
|
destination_port='80')
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_rule,
|
|
fw_rules['firewall_rule']['id'])
|
|
rules = []
|
|
# Check firewall rule
|
|
rules.append(fw_rules['firewall_rule']['id'])
|
|
policy_name = data_utils.rand_name('fw-policy-')
|
|
# Create firewall policy
|
|
fw_policy = self.create_firewall_policy(
|
|
name=policy_name, firewall_rules=rules,
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_policy,
|
|
fw_policy['firewall_policy']['id'])
|
|
show_policy = self.show_firewall_policy(
|
|
fw_policy['firewall_policy']['id'])
|
|
# Check firewall policy
|
|
self.assertEqual(
|
|
show_policy.get('firewall_policy')['name'],
|
|
policy_name)
|
|
self.assertEqual(show_policy.get('firewall_policy')
|
|
['firewall_rules'], rules)
|
|
policy_id = fw_policy['firewall_policy']['id']
|
|
group_name = data_utils.rand_name('fw-group-')
|
|
# Create firewall group
|
|
fw_group = self.create_firewall_group(
|
|
name=group_name,
|
|
ingress_firewall_policy_id=policy_id,
|
|
egress_firewall_policy_id=policy_id,
|
|
ports=[interface['port_id']],
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.update_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"])
|
|
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
|
|
self.check_project_lbaas()
|
|
fw_rules = self.update_firewall_rule(
|
|
fw_rules['firewall_rule']['id'],
|
|
destination_ip_address=ext_subnet['cidr'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
self.check_project_lbaas()
|
|
self.delete_octavia_lb_resources(lb_cist['lb_id'])
|
|
self.fwaas_v2_client.update_firewall_v2_group(
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
|
|
@decorators.idempotent_id('2317449c-14ca-1428-a428-10047daa46c3')
|
|
def test_verify_nat_fw_order_when_vm_booted_with_port_internal_fw(self):
|
|
"""
|
|
Create NAT and Firewall rules on router.
|
|
Verify order of NAT and Firewall.
|
|
"""
|
|
rtr_name = data_utils.rand_name(name='tempest-router')
|
|
network_name = data_utils.rand_name(name='tempest-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-subnet')
|
|
router_state = self.create_topology_router(
|
|
rtr_name, set_gateway=True,
|
|
routers_client=self.cmgr_adm.routers_client)
|
|
network_state = self.create_topology_network(
|
|
network_name, networks_client=self.cmgr_adm.networks_client)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state,
|
|
subnets_client=self.cmgr_adm.subnets_client)
|
|
interface = self.cmgr_adm.routers_client.add_router_interface(
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.cmgr_adm.routers_client.remove_router_interface,
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
security_group = self._create_security_group(
|
|
security_group_rules_client=self.cmgr_adm.
|
|
security_group_rules_client,
|
|
security_groups_client=self.cmgr_adm.security_groups_client)
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
security_groups = [{'name': security_group['name']}]
|
|
port1 = self.create_topology_port(
|
|
network_state, ports_client=self.cmgr_adm.ports_client)['port']
|
|
port2 = self.create_topology_port(
|
|
network_state, ports_client=self.cmgr_adm.ports_client)['port']
|
|
kwargs = {'security_groups': [security_group['id']]}
|
|
port1 = self.cmgr_adm.ports_client.update_port(
|
|
port1['id'], **kwargs)['port']
|
|
port2 = self.cmgr_adm.ports_client.update_port(
|
|
port2['id'], **kwargs)['port']
|
|
server1 = self.create_topology_instance(
|
|
"state_vm_1",
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups, port=port1)
|
|
server2 = self.create_topology_instance(
|
|
"state_vm_2",
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups, port=port2)
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_INTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
ext_network = self.cmgr_adm.networks_client.show_network(
|
|
CONF.network.public_network_id)['network']
|
|
ext_subnet = self.cmgr_adm.subnets_client.show_subnet(
|
|
ext_network['subnets'][0])['subnet']
|
|
fw_rules = self.create_firewall_rule(
|
|
name='test_rule', protocol='icmp',
|
|
action="allow",
|
|
destination_ip_address=subnet_state['cidr'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_rule,
|
|
fw_rules['firewall_rule']['id'])
|
|
rules = []
|
|
# Check firewall rule
|
|
rules.append(fw_rules['firewall_rule']['id'])
|
|
policy_name = data_utils.rand_name('fw-policy-')
|
|
# Create firewall policy
|
|
fw_policy = self.create_firewall_policy(
|
|
name=policy_name, firewall_rules=rules,
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_policy,
|
|
fw_policy['firewall_policy']['id'])
|
|
show_policy = self.show_firewall_policy(
|
|
fw_policy['firewall_policy']['id'])
|
|
# Check firewall policy
|
|
self.assertEqual(
|
|
show_policy.get('firewall_policy')['name'],
|
|
policy_name)
|
|
self.assertEqual(show_policy.get('firewall_policy')
|
|
['firewall_rules'], rules)
|
|
policy_id = fw_policy['firewall_policy']['id']
|
|
group_name = data_utils.rand_name('fw-group-')
|
|
# Create firewall group
|
|
fw_group = self.create_firewall_group(
|
|
name=group_name,
|
|
ingress_firewall_policy_id=policy_id,
|
|
egress_firewall_policy_id=policy_id,
|
|
ports=[interface['port_id']],
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.update_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"])
|
|
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
|
|
# Verify traffic to vm
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
# Update destination ip address in firewall rule
|
|
fw_rules = self.update_firewall_rule(
|
|
fw_rules['firewall_rule']['id'],
|
|
destination_ip_address=ext_subnet['cidr'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
# Verify traffic to vm
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
self.fwaas_v2_client.update_firewall_v2_group(
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
|
|
@decorators.idempotent_id('2317449c-14ca-1428-b530-10047daa46c3')
|
|
def test_verify_nat_fw_order_when_vm_booted_with_port_external_fw(self):
|
|
"""
|
|
Create NAT and Firewall rules on router.
|
|
Verify order of NAT and Firewall.
|
|
"""
|
|
rtr_name = data_utils.rand_name(name='tempest-router')
|
|
network_name = data_utils.rand_name(name='tempest-net')
|
|
subnet_name = data_utils.rand_name(name='tempest-subnet')
|
|
router_state = self.create_topology_router(
|
|
rtr_name, set_gateway=True,
|
|
routers_client=self.cmgr_adm.routers_client)
|
|
network_state = self.create_topology_network(
|
|
network_name, networks_client=self.cmgr_adm.networks_client)
|
|
subnet_state = self.create_topology_subnet(
|
|
subnet_name, network_state,
|
|
subnets_client=self.cmgr_adm.subnets_client)
|
|
interface = self.cmgr_adm.routers_client.add_router_interface(
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.cmgr_adm.routers_client.remove_router_interface,
|
|
router_state['id'], subnet_id=subnet_state["id"])
|
|
security_group = self._create_security_group(
|
|
security_group_rules_client=self.cmgr_adm.
|
|
security_group_rules_client,
|
|
security_groups_client=self.cmgr_adm.security_groups_client)
|
|
image_id = self.get_glance_image_id(["cirros", "esx"])
|
|
security_groups = [{'name': security_group['name']}]
|
|
port1 = self.create_topology_port(
|
|
network_state, ports_client=self.cmgr_adm.ports_client)['port']
|
|
port2 = self.create_topology_port(
|
|
network_state, ports_client=self.cmgr_adm.ports_client)['port']
|
|
kwargs = {'security_groups': [security_group['id']]}
|
|
port1 = self.cmgr_adm.ports_client.update_port(
|
|
port1['id'], **kwargs)['port']
|
|
port2 = self.cmgr_adm.ports_client.update_port(
|
|
port2['id'], **kwargs)['port']
|
|
server1 = self.create_topology_instance(
|
|
"state_vm_1",
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups, port=port1)
|
|
server2 = self.create_topology_instance(
|
|
"state_vm_2",
|
|
create_floating_ip=True, image_id=image_id, clients=self.cmgr_adm,
|
|
security_groups=security_groups, port=port2)
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
nsx_router = self.nsxp.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsxp.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
nsx_router = self.nsx.get_logical_router(router_state['name'],
|
|
router_state['id'])
|
|
nat_rules = self.nsx.get_logical_router_nat_rules(nsx_router)
|
|
for nat_rule in nat_rules:
|
|
if nat_rule['firewall_match'] == 'BYPASS':
|
|
continue
|
|
self.assertEqual('MATCH_EXTERNAL_ADDRESS',
|
|
nat_rule['firewall_match'])
|
|
ext_network = self.cmgr_adm.networks_client.show_network(
|
|
CONF.network.public_network_id)['network']
|
|
ext_subnet = self.cmgr_adm.subnets_client.show_subnet(
|
|
ext_network['subnets'][0])['subnet']
|
|
fw_rules = self.create_firewall_rule(
|
|
name='test_rule', protocol='icmp',
|
|
action="allow", destination_ip_address=ext_subnet["cidr"])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_rule,
|
|
fw_rules['firewall_rule']['id'])
|
|
rules = []
|
|
# Check firewall rule
|
|
rules.append(fw_rules['firewall_rule']['id'])
|
|
policy_name = data_utils.rand_name('fw-policy-')
|
|
# Create firewall policy
|
|
fw_policy = self.create_firewall_policy(
|
|
name=policy_name, firewall_rules=rules,
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_policy,
|
|
fw_policy['firewall_policy']['id'])
|
|
show_policy = self.show_firewall_policy(
|
|
fw_policy['firewall_policy']['id'])
|
|
# Check firewall policy
|
|
self.assertEqual(
|
|
show_policy.get('firewall_policy')['name'],
|
|
policy_name)
|
|
self.assertEqual(show_policy.get('firewall_policy')
|
|
['firewall_rules'], rules)
|
|
policy_id = fw_policy['firewall_policy']['id']
|
|
group_name = data_utils.rand_name('fw-group-')
|
|
# Create firewall group
|
|
fw_group = self.create_firewall_group(
|
|
name=group_name,
|
|
ingress_firewall_policy_id=policy_id,
|
|
egress_firewall_policy_id=policy_id,
|
|
ports=[interface['port_id']],
|
|
project_id=router_state['project_id'])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.update_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"], ports=[])
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
self.fwaas_v2_client.delete_firewall_v2_group,
|
|
fw_group["firewall_group"]["id"])
|
|
self._wait_firewall_ready(fw_group["firewall_group"]["id"])
|
|
# Verify traffic to vm
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertIn("64 bytes from ", str(out))
|
|
# Update destination ip address in firewall rule
|
|
fw_rules = self.update_firewall_rule(
|
|
fw_rules['firewall_rule']['id'],
|
|
destination_ip_address=subnet_state['cidr'])
|
|
if CONF.network.backend == 'nsxp':
|
|
time.sleep(constants.NSXP_BACKEND_SMALL_TIME_INTERVAL)
|
|
# Verify traffic to vm
|
|
fip = server1["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
fip = server2["floating_ips"][0]["floating_ip_address"]
|
|
out = self._test_ping_from_external_network(fip)
|
|
self.assertNotIn("64 bytes from ", str(out))
|
|
self.fwaas_v2_client.update_firewall_v2_group(
|
|
fw_group["firewall_group"]["id"], ports=[])
|