Merge "Remove advanced services tests"

This commit is contained in:
Jenkins 2015-06-26 01:41:01 +00:00 committed by Gerrit Code Review
commit d6b9baeb47
11 changed files with 1 additions and 2047 deletions

@ -1,74 +0,0 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib.common.utils import data_utils
from tempest.api.network import base
from tempest import test
class LBaaSAgentSchedulerTestJSON(base.BaseAdminNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
List pools the given LBaaS agent is hosting.
Show a LBaaS agent hosting the given pool.
v2.0 of the Neutron API is assumed. It is also assumed that the following
options are defined in the [networki-feature-enabled] section of
etc/tempest.conf:
api_extensions
"""
@classmethod
def skip_checks(cls):
super(LBaaSAgentSchedulerTestJSON, cls).skip_checks()
if not test.is_extension_enabled('lbaas_agent_scheduler', 'network'):
msg = "LBaaS Agent Scheduler Extension not enabled."
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(LBaaSAgentSchedulerTestJSON, cls).resource_setup()
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
pool_name = data_utils.rand_name('pool-')
cls.pool = cls.create_pool(pool_name, "ROUND_ROBIN",
"HTTP", cls.subnet)
@test.idempotent_id('e5ea8b15-4f44-4350-963c-e0fcb533ee79')
def test_list_pools_on_lbaas_agent(self):
found = False
body = self.admin_client.list_agents(
agent_type="Loadbalancer agent")
agents = body['agents']
for a in agents:
msg = 'Load Balancer agent expected'
self.assertEqual(a['agent_type'], 'Loadbalancer agent', msg)
body = (
self.admin_client.list_pools_hosted_by_one_lbaas_agent(
a['id']))
pools = body['pools']
if self.pool['id'] in [p['id'] for p in pools]:
found = True
msg = 'Unable to find Load Balancer agent hosting pool'
self.assertTrue(found, msg)
@test.idempotent_id('e2745593-fd79-4b98-a262-575fd7865796')
def test_show_lbaas_agent_hosting_pool(self):
body = self.admin_client.show_lbaas_agent_hosting_pool(
self.pool['id'])
self.assertEqual('Loadbalancer agent', body['agent']['agent_type'])

@ -1,118 +0,0 @@
# Copyright 2014 Mirantis.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib.common.utils import data_utils
from tempest.api.network import base
from tempest import test
class LoadBalancerAdminTestJSON(base.BaseAdminNetworkTest):
"""
Test admin actions for load balancer.
Create VIP for another tenant
Create health monitor for another tenant
"""
force_tenant_isolation = True
@classmethod
def skip_checks(cls):
super(LoadBalancerAdminTestJSON, cls).skip_checks()
if not test.is_extension_enabled('lbaas', 'network'):
msg = "lbaas extension not enabled."
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(LoadBalancerAdminTestJSON, cls).setup_clients()
cls.client = cls.manager.network_client
@classmethod
def resource_setup(cls):
super(LoadBalancerAdminTestJSON, cls).resource_setup()
cls.tenant_id = cls.os.credentials.tenant_id
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.pool = cls.create_pool(data_utils.rand_name('pool-'),
"ROUND_ROBIN", "HTTP", cls.subnet)
@test.idempotent_id('6b0a20d8-4fcd-455e-b54f-ec4db5199518')
def test_create_vip_as_admin_for_another_tenant(self):
name = data_utils.rand_name('vip-')
body = self.admin_client.create_pool(
name=data_utils.rand_name('pool-'),
lb_method="ROUND_ROBIN",
protocol="HTTP",
subnet_id=self.subnet['id'],
tenant_id=self.tenant_id)
pool = body['pool']
self.addCleanup(self.admin_client.delete_pool, pool['id'])
body = self.admin_client.create_vip(name=name,
protocol="HTTP",
protocol_port=80,
subnet_id=self.subnet['id'],
pool_id=pool['id'],
tenant_id=self.tenant_id)
vip = body['vip']
self.addCleanup(self.admin_client.delete_vip, vip['id'])
self.assertIsNotNone(vip['id'])
self.assertEqual(self.tenant_id, vip['tenant_id'])
body = self.client.show_vip(vip['id'])
show_vip = body['vip']
self.assertEqual(vip['id'], show_vip['id'])
self.assertEqual(vip['name'], show_vip['name'])
@test.idempotent_id('74552cfc-ab78-4fb6-825b-f67bca379921')
def test_create_health_monitor_as_admin_for_another_tenant(self):
body = (
self.admin_client.create_health_monitor(delay=4,
max_retries=3,
type="TCP",
timeout=1,
tenant_id=self.tenant_id))
health_monitor = body['health_monitor']
self.addCleanup(self.admin_client.delete_health_monitor,
health_monitor['id'])
self.assertIsNotNone(health_monitor['id'])
self.assertEqual(self.tenant_id, health_monitor['tenant_id'])
body = self.client.show_health_monitor(health_monitor['id'])
show_health_monitor = body['health_monitor']
self.assertEqual(health_monitor['id'], show_health_monitor['id'])
@test.idempotent_id('266a192d-3c22-46c4-a8fb-802450301e82')
def test_create_pool_from_admin_user_other_tenant(self):
body = self.admin_client.create_pool(
name=data_utils.rand_name('pool-'),
lb_method="ROUND_ROBIN",
protocol="HTTP",
subnet_id=self.subnet['id'],
tenant_id=self.tenant_id)
pool = body['pool']
self.addCleanup(self.admin_client.delete_pool, pool['id'])
self.assertIsNotNone(pool['id'])
self.assertEqual(self.tenant_id, pool['tenant_id'])
@test.idempotent_id('158bb272-b9ed-4cfc-803c-661dac46f783')
def test_create_member_from_admin_user_other_tenant(self):
body = self.admin_client.create_member(address="10.0.9.47",
protocol_port=80,
pool_id=self.pool['id'],
tenant_id=self.tenant_id)
member = body['member']
self.addCleanup(self.admin_client.delete_member, member['id'])
self.assertIsNotNone(member['id'])
self.assertEqual(self.tenant_id, member['tenant_id'])

@ -90,10 +90,3 @@ class QuotasTest(base.BaseAdminNetworkTest):
def test_quotas(self):
new_quotas = {'network': 0, 'security_group': 0}
self._check_quotas(new_quotas)
@test.idempotent_id('a7add2b1-691e-44d6-875f-697d9685f091')
@test.requires_ext(extension='lbaas', service='network')
def test_lbaas_quotas(self):
new_quotas = {'vip': 1, 'pool': 2,
'member': 3, 'health_monitor': 4}
self._check_quotas(new_quotas)

@ -82,64 +82,19 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
cls.subnets = []
cls.ports = []
cls.routers = []
cls.pools = []
cls.vips = []
cls.members = []
cls.health_monitors = []
cls.vpnservices = []
cls.ikepolicies = []
cls.floating_ips = []
cls.metering_labels = []
cls.metering_label_rules = []
cls.fw_rules = []
cls.fw_policies = []
cls.ipsecpolicies = []
cls.ethertype = "IPv" + str(cls._ip_version)
@classmethod
def resource_cleanup(cls):
if CONF.service_available.neutron:
# Clean up ipsec policies
for ipsecpolicy in cls.ipsecpolicies:
cls._try_delete_resource(cls.client.delete_ipsecpolicy,
ipsecpolicy['id'])
# Clean up firewall policies
for fw_policy in cls.fw_policies:
cls._try_delete_resource(cls.client.delete_firewall_policy,
fw_policy['id'])
# Clean up firewall rules
for fw_rule in cls.fw_rules:
cls._try_delete_resource(cls.client.delete_firewall_rule,
fw_rule['id'])
# Clean up ike policies
for ikepolicy in cls.ikepolicies:
cls._try_delete_resource(cls.client.delete_ikepolicy,
ikepolicy['id'])
# Clean up vpn services
for vpnservice in cls.vpnservices:
cls._try_delete_resource(cls.client.delete_vpnservice,
vpnservice['id'])
# Clean up floating IPs
for floating_ip in cls.floating_ips:
cls._try_delete_resource(cls.client.delete_floatingip,
floating_ip['id'])
# Clean up health monitors
for health_monitor in cls.health_monitors:
cls._try_delete_resource(cls.client.delete_health_monitor,
health_monitor['id'])
# Clean up members
for member in cls.members:
cls._try_delete_resource(cls.client.delete_member,
member['id'])
# Clean up vips
for vip in cls.vips:
cls._try_delete_resource(cls.client.delete_vip,
vip['id'])
# Clean up pools
for pool in cls.pools:
cls._try_delete_resource(cls.client.delete_pool,
pool['id'])
# Clean up metering label rules
for metering_label_rule in cls.metering_label_rules:
cls._try_delete_resource(
@ -283,78 +238,6 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
cls.floating_ips.append(fip)
return fip
@classmethod
def create_pool(cls, name, lb_method, protocol, subnet):
"""Wrapper utility that returns a test pool."""
body = cls.client.create_pool(
name=name,
lb_method=lb_method,
protocol=protocol,
subnet_id=subnet['id'])
pool = body['pool']
cls.pools.append(pool)
return pool
@classmethod
def update_pool(cls, name):
"""Wrapper utility that returns a test pool."""
body = cls.client.update_pool(name=name)
pool = body['pool']
return pool
@classmethod
def create_vip(cls, name, protocol, protocol_port, subnet, pool):
"""Wrapper utility that returns a test vip."""
body = cls.client.create_vip(name=name,
protocol=protocol,
protocol_port=protocol_port,
subnet_id=subnet['id'],
pool_id=pool['id'])
vip = body['vip']
cls.vips.append(vip)
return vip
@classmethod
def update_vip(cls, name):
body = cls.client.update_vip(name=name)
vip = body['vip']
return vip
@classmethod
def create_member(cls, protocol_port, pool, ip_version=None):
"""Wrapper utility that returns a test member."""
ip_version = ip_version if ip_version is not None else cls._ip_version
member_address = "fd00::abcd" if ip_version == 6 else "10.0.9.46"
body = cls.client.create_member(address=member_address,
protocol_port=protocol_port,
pool_id=pool['id'])
member = body['member']
cls.members.append(member)
return member
@classmethod
def update_member(cls, admin_state_up):
body = cls.client.update_member(admin_state_up=admin_state_up)
member = body['member']
return member
@classmethod
def create_health_monitor(cls, delay, max_retries, Type, timeout):
"""Wrapper utility that returns a test health monitor."""
body = cls.client.create_health_monitor(delay=delay,
max_retries=max_retries,
type=Type,
timeout=timeout)
health_monitor = body['health_monitor']
cls.health_monitors.append(health_monitor)
return health_monitor
@classmethod
def update_health_monitor(cls, admin_state_up):
body = cls.client.update_vip(admin_state_up=admin_state_up)
health_monitor = body['health_monitor']
return health_monitor
@classmethod
def create_router_interface(cls, router_id, subnet_id):
"""Wrapper utility that returns a router interface."""
@ -362,44 +245,6 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
router_id, subnet_id)
return interface
@classmethod
def create_vpnservice(cls, subnet_id, router_id):
"""Wrapper utility that returns a test vpn service."""
body = cls.client.create_vpnservice(
subnet_id=subnet_id, router_id=router_id, admin_state_up=True,
name=data_utils.rand_name("vpnservice-"))
vpnservice = body['vpnservice']
cls.vpnservices.append(vpnservice)
return vpnservice
@classmethod
def create_ikepolicy(cls, name):
"""Wrapper utility that returns a test ike policy."""
body = cls.client.create_ikepolicy(name=name)
ikepolicy = body['ikepolicy']
cls.ikepolicies.append(ikepolicy)
return ikepolicy
@classmethod
def create_firewall_rule(cls, action, protocol):
"""Wrapper utility that returns a test firewall rule."""
body = cls.client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action=action,
protocol=protocol)
fw_rule = body['firewall_rule']
cls.fw_rules.append(fw_rule)
return fw_rule
@classmethod
def create_firewall_policy(cls):
"""Wrapper utility that returns a test firewall policy."""
body = cls.client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy = body['firewall_policy']
cls.fw_policies.append(fw_policy)
return fw_policy
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])
@ -412,14 +257,6 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
pass
cls.client.delete_router(router['id'])
@classmethod
def create_ipsecpolicy(cls, name):
"""Wrapper utility that returns a test ipsec policy."""
body = cls.client.create_ipsecpolicy(name=name)
ipsecpolicy = body['ipsecpolicy']
cls.ipsecpolicies.append(ipsecpolicy)
return ipsecpolicy
class BaseAdminNetworkTest(BaseNetworkTest):

@ -1,333 +0,0 @@
# Copyright 2014 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
from tempest.api.network import base
from tempest import config
from tempest import exceptions
from tempest import test
CONF = config.CONF
class FWaaSExtensionTestJSON(base.BaseNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
List firewall rules
Create firewall rule
Update firewall rule
Delete firewall rule
Show firewall rule
List firewall policies
Create firewall policy
Update firewall policy
Insert firewall rule to policy
Remove firewall rule from policy
Insert firewall rule after/before rule in policy
Update firewall policy audited attribute
Delete firewall policy
Show firewall policy
List firewall
Create firewall
Update firewall
Delete firewall
Show firewall
"""
@classmethod
def skip_checks(cls):
super(FWaaSExtensionTestJSON, cls).skip_checks()
if not test.is_extension_enabled('fwaas', 'network'):
msg = "FWaaS Extension not enabled."
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(FWaaSExtensionTestJSON, cls).resource_setup()
cls.fw_rule = cls.create_firewall_rule("allow", "tcp")
cls.fw_policy = cls.create_firewall_policy()
def _try_delete_policy(self, policy_id):
# delete policy, if it exists
try:
self.client.delete_firewall_policy(policy_id)
# if policy is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
def _try_delete_rule(self, rule_id):
# delete rule, if it exists
try:
self.client.delete_firewall_rule(rule_id)
# if rule is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
def _try_delete_firewall(self, fw_id):
# delete firewall, if it exists
try:
self.client.delete_firewall(fw_id)
# if firewall is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
self.client.wait_for_resource_deletion('firewall', fw_id)
def _wait_until_ready(self, fw_id):
target_states = ('ACTIVE', 'CREATED')
def _wait():
firewall = self.client.show_firewall(fw_id)
firewall = firewall['firewall']
return firewall['status'] in target_states
if not test.call_until_true(_wait, CONF.network.build_timeout,
CONF.network.build_interval):
status = self.client.show_firewall(fw_id)['firewall']['status']
m = ("Timed out waiting for firewall %s to reach %s state(s) "
"after %ss, currently in %s state." %
(fw_id,
target_states,
CONF.network.build_interval,
status))
raise exceptions.TimeoutException(m)
@test.idempotent_id('1b84cf01-9c09-4ce7-bc72-b15e39076468')
def test_list_firewall_rules(self):
# List firewall rules
fw_rules = self.client.list_firewall_rules()
fw_rules = fw_rules['firewall_rules']
self.assertIn((self.fw_rule['id'],
self.fw_rule['name'],
self.fw_rule['action'],
self.fw_rule['protocol'],
self.fw_rule['ip_version'],
self.fw_rule['enabled']),
[(m['id'],
m['name'],
m['action'],
m['protocol'],
m['ip_version'],
m['enabled']) for m in fw_rules])
@test.idempotent_id('563564f7-7077-4f5e-8cdc-51f37ae5a2b9')
def test_create_update_delete_firewall_rule(self):
# Create firewall rule
body = self.client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol="tcp")
fw_rule_id = body['firewall_rule']['id']
# Update firewall rule
body = self.client.update_firewall_rule(fw_rule_id,
shared=True)
self.assertTrue(body["firewall_rule"]['shared'])
# Delete firewall rule
self.client.delete_firewall_rule(fw_rule_id)
# Confirm deletion
fw_rules = self.client.list_firewall_rules()
self.assertNotIn(fw_rule_id,
[m['id'] for m in fw_rules['firewall_rules']])
@test.idempotent_id('3ff8c08e-26ff-4034-ae48-810ed213a998')
def test_show_firewall_rule(self):
# show a created firewall rule
fw_rule = self.client.show_firewall_rule(self.fw_rule['id'])
for key, value in six.iteritems(fw_rule['firewall_rule']):
self.assertEqual(self.fw_rule[key], value)
@test.idempotent_id('1086dd93-a4c0-4bbb-a1bd-6d4bc62c199f')
def test_list_firewall_policies(self):
fw_policies = self.client.list_firewall_policies()
fw_policies = fw_policies['firewall_policies']
self.assertIn((self.fw_policy['id'],
self.fw_policy['name'],
self.fw_policy['firewall_rules']),
[(m['id'],
m['name'],
m['firewall_rules']) for m in fw_policies])
@test.idempotent_id('bbf37b6c-498c-421e-9c95-45897d3ed775')
def test_create_update_delete_firewall_policy(self):
# Create firewall policy
body = self.client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
# Update firewall policy
body = self.client.update_firewall_policy(fw_policy_id,
shared=True,
name="updated_policy")
updated_fw_policy = body["firewall_policy"]
self.assertTrue(updated_fw_policy['shared'])
self.assertEqual("updated_policy", updated_fw_policy['name'])
# Delete firewall policy
self.client.delete_firewall_policy(fw_policy_id)
# Confirm deletion
fw_policies = self.client.list_firewall_policies()
fw_policies = fw_policies['firewall_policies']
self.assertNotIn(fw_policy_id, [m['id'] for m in fw_policies])
@test.idempotent_id('1df59b3a-517e-41d4-96f6-fc31cf4ecff2')
def test_show_firewall_policy(self):
# show a created firewall policy
fw_policy = self.client.show_firewall_policy(self.fw_policy['id'])
fw_policy = fw_policy['firewall_policy']
for key, value in six.iteritems(fw_policy):
self.assertEqual(self.fw_policy[key], value)
@test.idempotent_id('02082a03-3cdd-4789-986a-1327dd80bfb7')
def test_create_show_delete_firewall(self):
# Create tenant network resources required for an ACTIVE firewall
network = self.create_network()
subnet = self.create_subnet(network)
router = self.create_router(
data_utils.rand_name('router-'),
admin_state_up=True)
self.client.add_router_interface_with_subnet_id(
router['id'], subnet['id'])
# Create firewall
body = self.client.create_firewall(
name=data_utils.rand_name("firewall"),
firewall_policy_id=self.fw_policy['id'])
created_firewall = body['firewall']
firewall_id = created_firewall['id']
self.addCleanup(self._try_delete_firewall, firewall_id)
# Wait for the firewall resource to become ready
self._wait_until_ready(firewall_id)
# show a created firewall
firewall = self.client.show_firewall(firewall_id)
firewall = firewall['firewall']
for key, value in six.iteritems(firewall):
if key == 'status':
continue
self.assertEqual(created_firewall[key], value)
# list firewall
firewalls = self.client.list_firewalls()
firewalls = firewalls['firewalls']
self.assertIn((created_firewall['id'],
created_firewall['name'],
created_firewall['firewall_policy_id']),
[(m['id'],
m['name'],
m['firewall_policy_id']) for m in firewalls])
# Delete firewall
self.client.delete_firewall(firewall_id)
@test.idempotent_id('53305b4b-9897-4e01-87c0-2ae386083180')
def test_firewall_rule_insertion_position_removal_rule_from_policy(self):
# Create firewall rule
body = self.client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol="tcp")
fw_rule_id1 = body['firewall_rule']['id']
self.addCleanup(self._try_delete_rule, fw_rule_id1)
# Create firewall policy
body = self.client.create_firewall_policy(
name=data_utils.rand_name("fw-policy"))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
# Insert rule to firewall policy
self.client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id1, '', '')
# Verify insertion of rule in policy
self.assertIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id))
# Create another firewall rule
body = self.client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol="icmp")
fw_rule_id2 = body['firewall_rule']['id']
self.addCleanup(self._try_delete_rule, fw_rule_id2)
# Insert rule to firewall policy after the first rule
self.client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id2, fw_rule_id1, '')
# Verify the posiition of rule after insertion
fw_rule = self.client.show_firewall_rule(
fw_rule_id2)
self.assertEqual(int(fw_rule['firewall_rule']['position']), 2)
# Remove rule from the firewall policy
self.client.remove_firewall_rule_from_policy(
fw_policy_id, fw_rule_id2)
# Insert rule to firewall policy before the first rule
self.client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id2, '', fw_rule_id1)
# Verify the posiition of rule after insertion
fw_rule = self.client.show_firewall_rule(
fw_rule_id2)
self.assertEqual(int(fw_rule['firewall_rule']['position']), 1)
# Remove rule from the firewall policy
self.client.remove_firewall_rule_from_policy(
fw_policy_id, fw_rule_id2)
# Verify removal of rule from firewall policy
self.assertNotIn(fw_rule_id2, self._get_list_fw_rule_ids(fw_policy_id))
# Remove rule from the firewall policy
self.client.remove_firewall_rule_from_policy(
fw_policy_id, fw_rule_id1)
# Verify removal of rule from firewall policy
self.assertNotIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id))
def _get_list_fw_rule_ids(self, fw_policy_id):
fw_policy = self.client.show_firewall_policy(
fw_policy_id)
return [ruleid for ruleid in fw_policy['firewall_policy']
['firewall_rules']]
@test.idempotent_id('8515ca8a-0d2f-4298-b5ff-6f924e4587ca')
def test_update_firewall_policy_audited_attribute(self):
# Create firewall rule
body = self.client.create_firewall_rule(
name=data_utils.rand_name("fw-rule"),
action="allow",
protocol="icmp")
fw_rule_id = body['firewall_rule']['id']
self.addCleanup(self._try_delete_rule, fw_rule_id)
# Create firewall policy
body = self.client.create_firewall_policy(
name=data_utils.rand_name('fw-policy'))
fw_policy_id = body['firewall_policy']['id']
self.addCleanup(self._try_delete_policy, fw_policy_id)
self.assertFalse(body['firewall_policy']['audited'])
# Update firewall policy audited attribute to ture
self.client.update_firewall_policy(fw_policy_id,
audited=True)
# Insert Firewall rule to firewall policy
self.client.insert_firewall_rule_in_policy(
fw_policy_id, fw_rule_id, '', '')
body = self.client.show_firewall_policy(
fw_policy_id)
self.assertFalse(body['firewall_policy']['audited'])

@ -1,432 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from tempest_lib.common.utils import data_utils
from tempest_lib import decorators
from tempest.api.network import base
from tempest import test
class LoadBalancerTestJSON(base.BaseNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
create vIP, and Pool
show vIP
list vIP
update vIP
delete vIP
update pool
delete pool
show pool
list pool
health monitoring operations
"""
@classmethod
def skip_checks(cls):
super(LoadBalancerTestJSON, cls).skip_checks()
if not test.is_extension_enabled('lbaas', 'network'):
msg = "lbaas extension not enabled."
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(LoadBalancerTestJSON, cls).resource_setup()
cls.network = cls.create_network()
cls.name = cls.network['name']
cls.subnet = cls.create_subnet(cls.network)
pool_name = data_utils.rand_name('pool-')
vip_name = data_utils.rand_name('vip-')
cls.pool = cls.create_pool(pool_name, "ROUND_ROBIN",
"HTTP", cls.subnet)
cls.vip = cls.create_vip(name=vip_name,
protocol="HTTP",
protocol_port=80,
subnet=cls.subnet,
pool=cls.pool)
cls.member = cls.create_member(80, cls.pool, cls._ip_version)
cls.member_address = ("10.0.9.47" if cls._ip_version == 4
else "2015::beef")
cls.health_monitor = cls.create_health_monitor(delay=4,
max_retries=3,
Type="TCP",
timeout=1)
def _check_list_with_filter(self, obj_name, attr_exceptions, **kwargs):
create_obj = getattr(self.client, 'create_' + obj_name)
delete_obj = getattr(self.client, 'delete_' + obj_name)
list_objs = getattr(self.client, 'list_' + obj_name + 's')
body = create_obj(**kwargs)
obj = body[obj_name]
self.addCleanup(delete_obj, obj['id'])
for key, value in six.iteritems(obj):
# It is not relevant to filter by all arguments. That is why
# there is a list of attr to except
if key not in attr_exceptions:
body = list_objs(**{key: value})
objs = [v[key] for v in body[obj_name + 's']]
self.assertIn(value, objs)
@test.idempotent_id('c96dbfab-4a80-4e74-a535-e950b5bedd47')
def test_list_vips(self):
# Verify the vIP exists in the list of all vIPs
body = self.client.list_vips()
vips = body['vips']
self.assertIn(self.vip['id'], [v['id'] for v in vips])
@test.idempotent_id('b8853f65-5089-4e69-befd-041a143427ff')
def test_list_vips_with_filter(self):
name = data_utils.rand_name('vip-')
body = self.client.create_pool(name=data_utils.rand_name("pool-"),
lb_method="ROUND_ROBIN",
protocol="HTTPS",
subnet_id=self.subnet['id'])
pool = body['pool']
self.addCleanup(self.client.delete_pool, pool['id'])
attr_exceptions = ['status', 'session_persistence',
'status_description']
self._check_list_with_filter(
'vip', attr_exceptions, name=name, protocol="HTTPS",
protocol_port=81, subnet_id=self.subnet['id'], pool_id=pool['id'],
description=data_utils.rand_name('description-'),
admin_state_up=False)
@test.idempotent_id('27f56083-9af9-4a48-abe9-ca1bcc6c9035')
def test_create_update_delete_pool_vip(self):
# Creates a vip
name = data_utils.rand_name('vip-')
address = self.subnet['allocation_pools'][0]['end']
body = self.client.create_pool(
name=data_utils.rand_name("pool-"),
lb_method='ROUND_ROBIN',
protocol='HTTP',
subnet_id=self.subnet['id'])
pool = body['pool']
body = self.client.create_vip(name=name,
protocol="HTTP",
protocol_port=80,
subnet_id=self.subnet['id'],
pool_id=pool['id'],
address=address)
vip = body['vip']
vip_id = vip['id']
# Confirm VIP's address correctness with a show
body = self.client.show_vip(vip_id)
vip = body['vip']
self.assertEqual(address, vip['address'])
# Verification of vip update
new_name = "New_vip"
new_description = "New description"
persistence_type = "HTTP_COOKIE"
update_data = {"session_persistence": {
"type": persistence_type}}
body = self.client.update_vip(vip_id,
name=new_name,
description=new_description,
connection_limit=10,
admin_state_up=False,
**update_data)
updated_vip = body['vip']
self.assertEqual(new_name, updated_vip['name'])
self.assertEqual(new_description, updated_vip['description'])
self.assertEqual(10, updated_vip['connection_limit'])
self.assertFalse(updated_vip['admin_state_up'])
self.assertEqual(persistence_type,
updated_vip['session_persistence']['type'])
self.client.delete_vip(vip['id'])
self.client.wait_for_resource_deletion('vip', vip['id'])
# Verification of pool update
new_name = "New_pool"
body = self.client.update_pool(pool['id'],
name=new_name,
description="new_description",
lb_method='LEAST_CONNECTIONS')
updated_pool = body['pool']
self.assertEqual(new_name, updated_pool['name'])
self.assertEqual('new_description', updated_pool['description'])
self.assertEqual('LEAST_CONNECTIONS', updated_pool['lb_method'])
self.client.delete_pool(pool['id'])
@test.idempotent_id('0435a95e-1d19-4d90-9e9f-3b979e9ad089')
def test_show_vip(self):
# Verifies the details of a vip
body = self.client.show_vip(self.vip['id'])
vip = body['vip']
for key, value in six.iteritems(vip):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(self.vip[key], value)
@test.idempotent_id('6e7a7d31-8451-456d-b24a-e50479ce42a7')
def test_show_pool(self):
# Here we need to new pool without any dependence with vips
body = self.client.create_pool(name=data_utils.rand_name("pool-"),
lb_method='ROUND_ROBIN',
protocol='HTTP',
subnet_id=self.subnet['id'])
pool = body['pool']
self.addCleanup(self.client.delete_pool, pool['id'])
# Verifies the details of a pool
body = self.client.show_pool(pool['id'])
shown_pool = body['pool']
for key, value in six.iteritems(pool):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(value, shown_pool[key])
@test.idempotent_id('d1ab1ffa-e06a-487f-911f-56418cb27727')
def test_list_pools(self):
# Verify the pool exists in the list of all pools
body = self.client.list_pools()
pools = body['pools']
self.assertIn(self.pool['id'], [p['id'] for p in pools])
@test.idempotent_id('27cc4c1a-caac-4273-b983-2acb4afaad4f')
def test_list_pools_with_filters(self):
attr_exceptions = ['status', 'vip_id', 'members', 'provider',
'status_description']
self._check_list_with_filter(
'pool', attr_exceptions, name=data_utils.rand_name("pool-"),
lb_method="ROUND_ROBIN", protocol="HTTPS",
subnet_id=self.subnet['id'],
description=data_utils.rand_name('description-'),
admin_state_up=False)
@test.idempotent_id('282d0dfd-5c3a-4c9b-b39c-c99782f39193')
def test_list_members(self):
# Verify the member exists in the list of all members
body = self.client.list_members()
members = body['members']
self.assertIn(self.member['id'], [m['id'] for m in members])
@test.idempotent_id('243b5126-24c6-4879-953e-7c7e32d8a57f')
def test_list_members_with_filters(self):
attr_exceptions = ['status', 'status_description']
self._check_list_with_filter('member', attr_exceptions,
address=self.member_address,
protocol_port=80,
pool_id=self.pool['id'])
@test.idempotent_id('fb833ee8-9e69-489f-b540-a409762b78b2')
def test_create_update_delete_member(self):
# Creates a member
body = self.client.create_member(address=self.member_address,
protocol_port=80,
pool_id=self.pool['id'])
member = body['member']
# Verification of member update
body = self.client.update_member(member['id'],
admin_state_up=False)
updated_member = body['member']
self.assertFalse(updated_member['admin_state_up'])
# Verification of member delete
self.client.delete_member(member['id'])
@test.idempotent_id('893cd71f-a7dd-4485-b162-f6ab9a534914')
def test_show_member(self):
# Verifies the details of a member
body = self.client.show_member(self.member['id'])
member = body['member']
for key, value in six.iteritems(member):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(self.member[key], value)
@test.idempotent_id('8e5822c5-68a4-4224-8d6c-a617741ebc2d')
def test_list_health_monitors(self):
# Verify the health monitor exists in the list of all health monitors
body = self.client.list_health_monitors()
health_monitors = body['health_monitors']
self.assertIn(self.health_monitor['id'],
[h['id'] for h in health_monitors])
@test.idempotent_id('49bac58a-511c-4875-b794-366698211d25')
def test_list_health_monitors_with_filters(self):
attr_exceptions = ['status', 'status_description', 'pools']
self._check_list_with_filter('health_monitor', attr_exceptions,
delay=5, max_retries=4, type="TCP",
timeout=2)
@test.idempotent_id('e8ce05c4-d554-4d1e-a257-ad32ce134bb5')
def test_create_update_delete_health_monitor(self):
# Creates a health_monitor
body = self.client.create_health_monitor(delay=4,
max_retries=3,
type="TCP",
timeout=1)
health_monitor = body['health_monitor']
# Verification of health_monitor update
body = (self.client.update_health_monitor
(health_monitor['id'],
admin_state_up=False))
updated_health_monitor = body['health_monitor']
self.assertFalse(updated_health_monitor['admin_state_up'])
# Verification of health_monitor delete
body = self.client.delete_health_monitor(health_monitor['id'])
@test.idempotent_id('d3e1aebc-06c2-49b3-9816-942af54012eb')
def test_create_health_monitor_http_type(self):
hm_type = "HTTP"
body = self.client.create_health_monitor(delay=4,
max_retries=3,
type=hm_type,
timeout=1)
health_monitor = body['health_monitor']
self.addCleanup(self.client.delete_health_monitor,
health_monitor['id'])
self.assertEqual(hm_type, health_monitor['type'])
@test.idempotent_id('0eff9f67-90fb-4bb1-b4ed-c5fda99fff0c')
def test_update_health_monitor_http_method(self):
body = self.client.create_health_monitor(delay=4,
max_retries=3,
type="HTTP",
timeout=1)
health_monitor = body['health_monitor']
self.addCleanup(self.client.delete_health_monitor,
health_monitor['id'])
body = (self.client.update_health_monitor
(health_monitor['id'],
http_method="POST",
url_path="/home/user",
expected_codes="290"))
updated_health_monitor = body['health_monitor']
self.assertEqual("POST", updated_health_monitor['http_method'])
self.assertEqual("/home/user", updated_health_monitor['url_path'])
self.assertEqual("290", updated_health_monitor['expected_codes'])
@test.idempotent_id('08e126ab-1407-483f-a22e-b11cc032ca7c')
def test_show_health_monitor(self):
# Verifies the details of a health_monitor
body = self.client.show_health_monitor(self.health_monitor['id'])
health_monitor = body['health_monitor']
for key, value in six.iteritems(health_monitor):
# 'status' should not be confirmed in api tests
if key != 'status':
self.assertEqual(self.health_monitor[key], value)
@test.idempotent_id('87f7628e-8918-493d-af50-0602845dbb5b')
def test_associate_disassociate_health_monitor_with_pool(self):
# Verify that a health monitor can be associated with a pool
self.client.associate_health_monitor_with_pool(
self.health_monitor['id'], self.pool['id'])
body = self.client.show_health_monitor(
self.health_monitor['id'])
health_monitor = body['health_monitor']
body = self.client.show_pool(self.pool['id'])
pool = body['pool']
self.assertIn(pool['id'],
[p['pool_id'] for p in health_monitor['pools']])
self.assertIn(health_monitor['id'], pool['health_monitors'])
# Verify that a health monitor can be disassociated from a pool
(self.client.disassociate_health_monitor_with_pool
(self.health_monitor['id'], self.pool['id']))
body = self.client.show_pool(self.pool['id'])
pool = body['pool']
body = self.client.show_health_monitor(
self.health_monitor['id'])
health_monitor = body['health_monitor']
self.assertNotIn(health_monitor['id'], pool['health_monitors'])
self.assertNotIn(pool['id'],
[p['pool_id'] for p in health_monitor['pools']])
@test.idempotent_id('525fc7dc-be24-408d-938d-822e9783e027')
def test_get_lb_pool_stats(self):
# Verify the details of pool stats
body = self.client.list_lb_pool_stats(self.pool['id'])
stats = body['stats']
self.assertIn("bytes_in", stats)
self.assertIn("total_connections", stats)
self.assertIn("active_connections", stats)
self.assertIn("bytes_out", stats)
@test.idempotent_id('66236be2-5121-4047-8cde-db4b83b110a5')
def test_update_list_of_health_monitors_associated_with_pool(self):
(self.client.associate_health_monitor_with_pool
(self.health_monitor['id'], self.pool['id']))
self.client.update_health_monitor(
self.health_monitor['id'], admin_state_up=False)
body = self.client.show_pool(self.pool['id'])
health_monitors = body['pool']['health_monitors']
for health_monitor_id in health_monitors:
body = self.client.show_health_monitor(health_monitor_id)
self.assertFalse(body['health_monitor']['admin_state_up'])
(self.client.disassociate_health_monitor_with_pool
(self.health_monitor['id'], self.pool['id']))
@test.idempotent_id('44ec9b40-b501-41e2-951f-4fc673b15ac0')
def test_update_admin_state_up_of_pool(self):
self.client.update_pool(self.pool['id'],
admin_state_up=False)
body = self.client.show_pool(self.pool['id'])
pool = body['pool']
self.assertFalse(pool['admin_state_up'])
@test.idempotent_id('466a9d4c-37c6-4ea2-b807-133437beb48c')
def test_show_vip_associated_with_pool(self):
body = self.client.show_pool(self.pool['id'])
pool = body['pool']
body = self.client.show_vip(pool['vip_id'])
vip = body['vip']
self.assertEqual(self.vip['name'], vip['name'])
self.assertEqual(self.vip['id'], vip['id'])
@test.idempotent_id('7b97694e-69d0-4151-b265-e1052a465aa8')
def test_show_members_associated_with_pool(self):
body = self.client.show_pool(self.pool['id'])
members = body['pool']['members']
for member_id in members:
body = self.client.show_member(member_id)
self.assertIsNotNone(body['member']['status'])
self.assertEqual(member_id, body['member']['id'])
self.assertIsNotNone(body['member']['admin_state_up'])
@test.idempotent_id('73ed6f27-595b-4b2c-969c-dbdda6b8ab34')
def test_update_pool_related_to_member(self):
# Create new pool
body = self.client.create_pool(name=data_utils.rand_name("pool-"),
lb_method='ROUND_ROBIN',
protocol='HTTP',
subnet_id=self.subnet['id'])
new_pool = body['pool']
self.addCleanup(self.client.delete_pool, new_pool['id'])
# Update member with new pool's id
body = self.client.update_member(self.member['id'],
pool_id=new_pool['id'])
# Confirm with show that pool_id change
body = self.client.show_member(self.member['id'])
member = body['member']
self.assertEqual(member['pool_id'], new_pool['id'])
# Update member with old pool id, this is needed for clean up
body = self.client.update_member(self.member['id'],
pool_id=self.pool['id'])
@test.idempotent_id('cf63f071-bbe3-40ba-97a0-a33e11923162')
def test_update_member_weight(self):
self.client.update_member(self.member['id'],
weight=2)
body = self.client.show_member(self.member['id'])
member = body['member']
self.assertEqual(2, member['weight'])
@decorators.skip_because(bug="1402007")
class LoadBalancerIpV6TestJSON(LoadBalancerTestJSON):
_ip_version = 6

@ -1,319 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
from tempest.api.network import base
from tempest import config
from tempest import test
CONF = config.CONF
class VPNaaSTestJSON(base.BaseAdminNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
List, Show, Create, Delete, and Update VPN Service
List, Show, Create, Delete, and Update IKE policy
List, Show, Create, Delete, and Update IPSec policy
"""
@classmethod
def skip_checks(cls):
super(VPNaaSTestJSON, cls).skip_checks()
if not test.is_extension_enabled('vpnaas', 'network'):
msg = "vpnaas extension not enabled."
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(VPNaaSTestJSON, cls).resource_setup()
cls.ext_net_id = CONF.network.public_network_id
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.router = cls.create_router(
data_utils.rand_name("router"),
external_network_id=CONF.network.public_network_id)
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.vpnservice = cls.create_vpnservice(cls.subnet['id'],
cls.router['id'])
cls.ikepolicy = cls.create_ikepolicy(
data_utils.rand_name("ike-policy-"))
cls.ipsecpolicy = cls.create_ipsecpolicy(
data_utils.rand_name("ipsec-policy-"))
def _delete_ike_policy(self, ike_policy_id):
# Deletes a ike policy and verifies if it is deleted or not
ike_list = list()
all_ike = self.client.list_ikepolicies()
for ike in all_ike['ikepolicies']:
ike_list.append(ike['id'])
if ike_policy_id in ike_list:
self.client.delete_ikepolicy(ike_policy_id)
# Asserting that the policy is not found in list after deletion
ikepolicies = self.client.list_ikepolicies()
ike_id_list = list()
for i in ikepolicies['ikepolicies']:
ike_id_list.append(i['id'])
self.assertNotIn(ike_policy_id, ike_id_list)
def _delete_ipsec_policy(self, ipsec_policy_id):
# Deletes an ike policy if it exists
try:
self.client.delete_ipsecpolicy(ipsec_policy_id)
except lib_exc.NotFound:
pass
def _assertExpected(self, expected, actual):
# Check if not expected keys/values exists in actual response body
for key, value in six.iteritems(expected):
self.assertIn(key, actual)
self.assertEqual(value, actual[key])
def _delete_vpn_service(self, vpn_service_id):
self.client.delete_vpnservice(vpn_service_id)
# Asserting if vpn service is found in the list after deletion
body = self.client.list_vpnservices()
vpn_services = [vs['id'] for vs in body['vpnservices']]
self.assertNotIn(vpn_service_id, vpn_services)
def _get_tenant_id(self):
"""
Returns the tenant_id of the client current user
"""
# TODO(jroovers) This is a temporary workaround to get the tenant_id
# of the the current client. Replace this once tenant_isolation for
# neutron is fixed.
body = self.client.show_network(self.network['id'])
return body['network']['tenant_id']
@test.idempotent_id('14311574-0737-4e53-ac05-f7ae27742eed')
def test_admin_create_ipsec_policy_for_tenant(self):
tenant_id = self._get_tenant_id()
# Create IPSec policy for the newly created tenant
name = data_utils.rand_name('ipsec-policy')
body = (self.admin_client.
create_ipsecpolicy(name=name, tenant_id=tenant_id))
ipsecpolicy = body['ipsecpolicy']
self.assertIsNotNone(ipsecpolicy['id'])
self.addCleanup(self.admin_client.delete_ipsecpolicy,
ipsecpolicy['id'])
# Assert that created ipsec policy is found in API list call
body = self.client.list_ipsecpolicies()
ipsecpolicies = [policy['id'] for policy in body['ipsecpolicies']]
self.assertIn(ipsecpolicy['id'], ipsecpolicies)
@test.idempotent_id('b62acdc6-0c53-4d84-84aa-859b22b79799')
def test_admin_create_vpn_service_for_tenant(self):
tenant_id = self._get_tenant_id()
# Create vpn service for the newly created tenant
network2 = self.create_network()
subnet2 = self.create_subnet(network2)
router2 = self.create_router(data_utils.rand_name('router-'),
external_network_id=self.ext_net_id)
self.create_router_interface(router2['id'], subnet2['id'])
name = data_utils.rand_name('vpn-service')
body = self.admin_client.create_vpnservice(
subnet_id=subnet2['id'],
router_id=router2['id'],
name=name,
admin_state_up=True,
tenant_id=tenant_id)
vpnservice = body['vpnservice']
self.assertIsNotNone(vpnservice['id'])
self.addCleanup(self.admin_client.delete_vpnservice, vpnservice['id'])
# Assert that created vpnservice is found in API list call
body = self.client.list_vpnservices()
vpn_services = [vs['id'] for vs in body['vpnservices']]
self.assertIn(vpnservice['id'], vpn_services)
@test.idempotent_id('58cc4a1c-443b-4f39-8fb6-c19d39f343ab')
def test_admin_create_ike_policy_for_tenant(self):
tenant_id = self._get_tenant_id()
# Create IKE policy for the newly created tenant
name = data_utils.rand_name('ike-policy')
body = (self.admin_client.
create_ikepolicy(name=name, ike_version="v1",
encryption_algorithm="aes-128",
auth_algorithm="sha1",
tenant_id=tenant_id))
ikepolicy = body['ikepolicy']
self.assertIsNotNone(ikepolicy['id'])
self.addCleanup(self.admin_client.delete_ikepolicy, ikepolicy['id'])
# Assert that created ike policy is found in API list call
body = self.client.list_ikepolicies()
ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
self.assertIn(ikepolicy['id'], ikepolicies)
@test.idempotent_id('de5bb04c-3a1f-46b1-b329-7a8abba5c7f1')
def test_list_vpn_services(self):
# Verify the VPN service exists in the list of all VPN services
body = self.client.list_vpnservices()
vpnservices = body['vpnservices']
self.assertIn(self.vpnservice['id'], [v['id'] for v in vpnservices])
@test.idempotent_id('aacb13b1-fdc7-41fd-bab2-32621aee1878')
def test_create_update_delete_vpn_service(self):
# Creates a VPN service and sets up deletion
network1 = self.create_network()
subnet1 = self.create_subnet(network1)
router1 = self.create_router(data_utils.rand_name('router-'),
external_network_id=self.ext_net_id)
self.create_router_interface(router1['id'], subnet1['id'])
name = data_utils.rand_name('vpn-service1')
body = self.client.create_vpnservice(subnet_id=subnet1['id'],
router_id=router1['id'],
name=name,
admin_state_up=True)
vpnservice = body['vpnservice']
self.addCleanup(self._delete_vpn_service, vpnservice['id'])
# Assert if created vpnservices are not found in vpnservices list
body = self.client.list_vpnservices()
vpn_services = [vs['id'] for vs in body['vpnservices']]
self.assertIsNotNone(vpnservice['id'])
self.assertIn(vpnservice['id'], vpn_services)
# TODO(raies): implement logic to update vpnservice
# VPNaaS client function to update is implemented.
# But precondition is that current state of vpnservice
# should be "ACTIVE" not "PENDING*"
@test.idempotent_id('0dedfc1d-f8ee-4e2a-bfd4-7997b9dc17ff')
def test_show_vpn_service(self):
# Verifies the details of a vpn service
body = self.client.show_vpnservice(self.vpnservice['id'])
vpnservice = body['vpnservice']
self.assertEqual(self.vpnservice['id'], vpnservice['id'])
self.assertEqual(self.vpnservice['name'], vpnservice['name'])
self.assertEqual(self.vpnservice['description'],
vpnservice['description'])
self.assertEqual(self.vpnservice['router_id'], vpnservice['router_id'])
self.assertEqual(self.vpnservice['subnet_id'], vpnservice['subnet_id'])
self.assertEqual(self.vpnservice['tenant_id'], vpnservice['tenant_id'])
valid_status = ["ACTIVE", "DOWN", "BUILD", "ERROR", "PENDING_CREATE",
"PENDING_UPDATE", "PENDING_DELETE"]
self.assertIn(vpnservice['status'], valid_status)
@test.idempotent_id('e0fb6200-da3d-4869-8340-a8c1956ca618')
def test_list_ike_policies(self):
# Verify the ike policy exists in the list of all IKE policies
body = self.client.list_ikepolicies()
ikepolicies = body['ikepolicies']
self.assertIn(self.ikepolicy['id'], [i['id'] for i in ikepolicies])
@test.idempotent_id('d61f29a5-160c-487d-bc0d-22e32e731b44')
def test_create_update_delete_ike_policy(self):
# Creates a IKE policy
name = data_utils.rand_name('ike-policy')
body = (self.client.create_ikepolicy(
name=name,
ike_version="v1",
encryption_algorithm="aes-128",
auth_algorithm="sha1"))
ikepolicy = body['ikepolicy']
self.assertIsNotNone(ikepolicy['id'])
self.addCleanup(self._delete_ike_policy, ikepolicy['id'])
# Update IKE Policy
new_ike = {'name': data_utils.rand_name("New-IKE"),
'description': "Updated ike policy",
'encryption_algorithm': "aes-256",
'ike_version': "v2",
'pfs': "group14",
'lifetime': {'units': "seconds", 'value': 2000}}
self.client.update_ikepolicy(ikepolicy['id'], **new_ike)
# Confirm that update was successful by verifying using 'show'
body = self.client.show_ikepolicy(ikepolicy['id'])
ike_policy = body['ikepolicy']
for key, value in six.iteritems(new_ike):
self.assertIn(key, ike_policy)
self.assertEqual(value, ike_policy[key])
# Verification of ike policy delete
self.client.delete_ikepolicy(ikepolicy['id'])
body = self.client.list_ikepolicies()
ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
self.assertNotIn(ike_policy['id'], ikepolicies)
@test.idempotent_id('b5fcf3a3-9407-452d-b8a8-e7c6c32baea8')
def test_show_ike_policy(self):
# Verifies the details of a ike policy
body = self.client.show_ikepolicy(self.ikepolicy['id'])
ikepolicy = body['ikepolicy']
self.assertEqual(self.ikepolicy['id'], ikepolicy['id'])
self.assertEqual(self.ikepolicy['name'], ikepolicy['name'])
self.assertEqual(self.ikepolicy['description'],
ikepolicy['description'])
self.assertEqual(self.ikepolicy['encryption_algorithm'],
ikepolicy['encryption_algorithm'])
self.assertEqual(self.ikepolicy['auth_algorithm'],
ikepolicy['auth_algorithm'])
self.assertEqual(self.ikepolicy['tenant_id'],
ikepolicy['tenant_id'])
self.assertEqual(self.ikepolicy['pfs'],
ikepolicy['pfs'])
self.assertEqual(self.ikepolicy['phase1_negotiation_mode'],
ikepolicy['phase1_negotiation_mode'])
self.assertEqual(self.ikepolicy['ike_version'],
ikepolicy['ike_version'])
@test.idempotent_id('19ea0a2f-add9-44be-b732-ffd8a7b42f37')
def test_list_ipsec_policies(self):
# Verify the ipsec policy exists in the list of all ipsec policies
body = self.client.list_ipsecpolicies()
ipsecpolicies = body['ipsecpolicies']
self.assertIn(self.ipsecpolicy['id'], [i['id'] for i in ipsecpolicies])
@test.idempotent_id('9c1701c9-329a-4e5d-930a-1ead1b3f86ad')
def test_create_update_delete_ipsec_policy(self):
# Creates an ipsec policy
ipsec_policy_body = {'name': data_utils.rand_name('ipsec-policy'),
'pfs': 'group5',
'encryption_algorithm': "aes-128",
'auth_algorithm': 'sha1'}
resp_body = self.client.create_ipsecpolicy(**ipsec_policy_body)
ipsecpolicy = resp_body['ipsecpolicy']
self.addCleanup(self._delete_ipsec_policy, ipsecpolicy['id'])
self._assertExpected(ipsec_policy_body, ipsecpolicy)
# Verification of ipsec policy update
new_ipsec = {'description': 'Updated ipsec policy',
'pfs': 'group2',
'name': data_utils.rand_name("New-IPSec"),
'encryption_algorithm': "aes-256",
'lifetime': {'units': "seconds", 'value': '2000'}}
body = self.client.update_ipsecpolicy(ipsecpolicy['id'],
**new_ipsec)
updated_ipsec_policy = body['ipsecpolicy']
self._assertExpected(new_ipsec, updated_ipsec_policy)
# Verification of ipsec policy delete
self.client.delete_ipsecpolicy(ipsecpolicy['id'])
self.assertRaises(lib_exc.NotFound,
self.client.delete_ipsecpolicy, ipsecpolicy['id'])
@test.idempotent_id('601f8a05-9d3c-4539-a400-1c4b3a21b03b')
def test_show_ipsec_policy(self):
# Verifies the details of an ipsec policy
body = self.client.show_ipsecpolicy(self.ipsecpolicy['id'])
ipsecpolicy = body['ipsecpolicy']
self._assertExpected(self.ipsecpolicy, ipsecpolicy)

@ -420,131 +420,6 @@ class NetworkService(BaseService):
self.data['networks'] = networks
class NetworkIpSecPolicyService(NetworkService):
def list(self):
client = self.client
ipsecpols = client.list_ipsecpolicies()
ipsecpols = ipsecpols['ipsecpolicies']
ipsecpols = self._filter_by_tenant_id(ipsecpols)
LOG.debug("List count, %s IP Security Policies" % len(ipsecpols))
return ipsecpols
def delete(self):
client = self.client
ipsecpols = self.list()
for ipsecpol in ipsecpols:
try:
client.delete_ipsecpolicy(ipsecpol['id'])
except Exception as e:
LOG.exception("Delete IP Securty Policy exception: %s" % e)
pass
def dry_run(self):
ipsecpols = self.list()
self.data['ip_security_policies'] = ipsecpols
class NetworkFwPolicyService(NetworkService):
def list(self):
client = self.client
fwpols = client.list_firewall_policies()
fwpols = fwpols['firewall_policies']
fwpols = self._filter_by_tenant_id(fwpols)
LOG.debug("List count, %s Firewall Policies" % len(fwpols))
return fwpols
def delete(self):
client = self.client
fwpols = self.list()
for fwpol in fwpols:
try:
client.delete_firewall_policy(fwpol['id'])
except Exception as e:
LOG.exception("Delete Firewall Policy exception: %s" % e)
pass
def dry_run(self):
fwpols = self.list()
self.data['firewall_policies'] = fwpols
class NetworkFwRulesService(NetworkService):
def list(self):
client = self.client
fwrules = client.list_firewall_rules()
fwrules = fwrules['firewall_rules']
fwrules = self._filter_by_tenant_id(fwrules)
LOG.debug("List count, %s Firewall Rules" % len(fwrules))
return fwrules
def delete(self):
client = self.client
fwrules = self.list()
for fwrule in fwrules:
try:
client.delete_firewall_rule(fwrule['id'])
except Exception as e:
LOG.exception("Delete Firewall Rule exception: %s" % e)
pass
def dry_run(self):
fwrules = self.list()
self.data['firewall_rules'] = fwrules
class NetworkIkePolicyService(NetworkService):
def list(self):
client = self.client
ikepols = client.list_ikepolicies()
ikepols = ikepols['ikepolicies']
ikepols = self._filter_by_tenant_id(ikepols)
LOG.debug("List count, %s IKE Policies" % len(ikepols))
return ikepols
def delete(self):
client = self.client
ikepols = self.list()
for ikepol in ikepols:
try:
client.delete_firewall_rule(ikepol['id'])
except Exception as e:
LOG.exception("Delete IKE Policy exception: %s" % e)
pass
def dry_run(self):
ikepols = self.list()
self.data['ike_policies'] = ikepols
class NetworkVpnServiceService(NetworkService):
def list(self):
client = self.client
vpnsrvs = client.list_vpnservices()
vpnsrvs = vpnsrvs['vpnservices']
vpnsrvs = self._filter_by_tenant_id(vpnsrvs)
LOG.debug("List count, %s VPN Services" % len(vpnsrvs))
return vpnsrvs
def delete(self):
client = self.client
vpnsrvs = self.list()
for vpnsrv in vpnsrvs:
try:
client.delete_vpnservice(vpnsrv['id'])
except Exception as e:
LOG.exception("Delete VPN Service exception: %s" % e)
pass
def dry_run(self):
vpnsrvs = self.list()
self.data['vpn_services'] = vpnsrvs
class NetworkFloatingIpService(NetworkService):
def list(self):
@ -1094,18 +969,6 @@ def get_tenant_cleanup_services():
if IS_HEAT:
tenant_services.append(StackService)
if IS_NEUTRON:
if test.is_extension_enabled('vpnaas', 'network'):
tenant_services.append(NetworkIpSecPolicyService)
tenant_services.append(NetworkIkePolicyService)
tenant_services.append(NetworkVpnServiceService)
if test.is_extension_enabled('fwaas', 'network'):
tenant_services.append(NetworkFwPolicyService)
tenant_services.append(NetworkFwRulesService)
if test.is_extension_enabled('lbaas', 'network'):
tenant_services.append(NetworkHealthMonitorService)
tenant_services.append(NetworkMemberService)
tenant_services.append(NetworkVipService)
tenant_services.append(NetworkPoolService)
if test.is_extension_enabled('metering', 'network'):
tenant_services.append(NetworkMeteringLabelRuleService)
tenant_services.append(NetworkMeteringLabelService)

@ -934,41 +934,6 @@ class NetworkScenarioTest(ScenarioTest):
return rules
def _create_pool(self, lb_method, protocol, subnet_id):
"""Wrapper utility that returns a test pool."""
client = self.network_client
name = data_utils.rand_name('pool')
resp_pool = client.create_pool(protocol=protocol, name=name,
subnet_id=subnet_id,
lb_method=lb_method)
pool = net_resources.DeletablePool(client=client, **resp_pool['pool'])
self.assertEqual(pool['name'], name)
self.addCleanup(self.delete_wrapper, pool.delete)
return pool
def _create_member(self, address, protocol_port, pool_id):
"""Wrapper utility that returns a test member."""
client = self.network_client
resp_member = client.create_member(protocol_port=protocol_port,
pool_id=pool_id,
address=address)
member = net_resources.DeletableMember(client=client,
**resp_member['member'])
self.addCleanup(self.delete_wrapper, member.delete)
return member
def _create_vip(self, protocol, protocol_port, subnet_id, pool_id):
"""Wrapper utility that returns a test vip."""
client = self.network_client
name = data_utils.rand_name('vip')
resp_vip = client.create_vip(protocol=protocol, name=name,
subnet_id=subnet_id, pool_id=pool_id,
protocol_port=protocol_port)
vip = net_resources.DeletableVip(client=client, **resp_vip['vip'])
self.assertEqual(vip['name'], name)
self.addCleanup(self.delete_wrapper, vip.delete)
return vip
def _ssh_to_server(self, server, private_key):
ssh_login = CONF.compute.image_ssh_user
return self.get_remote_client(server,

@ -1,321 +0,0 @@
# Copyright 2014 Mirantis.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
import time
import six
from six.moves.urllib import request as urllib2
from tempest.common import commands
from tempest import config
from tempest import exceptions
from tempest.scenario import manager
from tempest.services.network import resources as net_resources
from tempest import test
config = config.CONF
class TestLoadBalancerBasic(manager.NetworkScenarioTest):
"""
This test checks basic load balancing.
The following is the scenario outline:
1. Create an instance
2. SSH to the instance and start two servers
3. Create a load balancer with two members and with ROUND_ROBIN algorithm
associate the VIP with a floating ip
4. Send NUM requests to the floating ip and check that they are shared
between the two servers.
"""
@classmethod
def skip_checks(cls):
super(TestLoadBalancerBasic, cls).skip_checks()
cfg = config.network
if not test.is_extension_enabled('lbaas', 'network'):
msg = 'LBaaS Extension is not enabled'
raise cls.skipException(msg)
if not (cfg.tenant_networks_reachable or cfg.public_network_id):
msg = ('Either tenant_networks_reachable must be "true", or '
'public_network_id must be defined.')
raise cls.skipException(msg)
@classmethod
def resource_setup(cls):
super(TestLoadBalancerBasic, cls).resource_setup()
cls.servers_keypairs = {}
cls.members = []
cls.floating_ips = {}
cls.server_ips = {}
cls.port1 = 80
cls.port2 = 88
cls.num = 50
def setUp(self):
super(TestLoadBalancerBasic, self).setUp()
self.server_ips = {}
self.server_fixed_ips = {}
self._create_security_group_for_test()
self._set_net_and_subnet()
def _set_net_and_subnet(self):
"""
Query and set appropriate network and subnet attributes to be used
for the test. Existing tenant networks are used if they are found.
The configured private network and associated subnet is used as a
fallback in absence of tenant networking.
"""
try:
tenant_net = self._list_networks(tenant_id=self.tenant_id)[0]
except IndexError:
tenant_net = None
if tenant_net:
tenant_subnet = self._list_subnets(tenant_id=self.tenant_id)[0]
self.subnet = net_resources.DeletableSubnet(
client=self.network_client,
**tenant_subnet)
self.network = tenant_net
else:
self.network = self._get_network_by_name(
config.compute.fixed_network_name)
# TODO(adam_g): We are assuming that the first subnet associated
# with the fixed network is the one we want. In the future, we
# should instead pull a subnet id from config, which is set by
# devstack/admin/etc.
subnet = self._list_subnets(network_id=self.network['id'])[0]
self.subnet = net_resources.AttributeDict(subnet)
def _create_security_group_for_test(self):
self.security_group = self._create_security_group(
tenant_id=self.tenant_id)
self._create_security_group_rules_for_port(self.port1)
self._create_security_group_rules_for_port(self.port2)
def _create_security_group_rules_for_port(self, port):
rule = {
'direction': 'ingress',
'protocol': 'tcp',
'port_range_min': port,
'port_range_max': port,
}
self._create_security_group_rule(
secgroup=self.security_group,
tenant_id=self.tenant_id,
**rule)
def _create_server(self, name):
keypair = self.create_keypair()
security_groups = [{'name': self.security_group['name']}]
create_kwargs = {
'networks': [
{'uuid': self.network['id']},
],
'key_name': keypair['name'],
'security_groups': security_groups,
}
net_name = self.network['name']
server = self.create_server(name=name, create_kwargs=create_kwargs)
self.servers_keypairs[server['id']] = keypair
if (config.network.public_network_id and not
config.network.tenant_networks_reachable):
public_network_id = config.network.public_network_id
floating_ip = self.create_floating_ip(
server, public_network_id)
self.floating_ips[floating_ip] = server
self.server_ips[server['id']] = floating_ip.floating_ip_address
else:
self.server_ips[server['id']] =\
server['addresses'][net_name][0]['addr']
self.server_fixed_ips[server['id']] =\
server['addresses'][net_name][0]['addr']
self.assertTrue(self.servers_keypairs)
return server
def _create_servers(self):
for count in range(2):
self._create_server(name=("server%s" % (count + 1)))
self.assertEqual(len(self.servers_keypairs), 2)
def _start_servers(self):
"""
Start two backends
1. SSH to the instance
2. Start two http backends listening on ports 80 and 88 respectively
"""
for server_id, ip in six.iteritems(self.server_ips):
private_key = self.servers_keypairs[server_id]['private_key']
server_name = self.servers_client.get_server(server_id)['name']
username = config.scenario.ssh_user
ssh_client = self.get_remote_client(
server_or_ip=ip,
private_key=private_key)
# Write a backend's response into a file
resp = ('echo -ne "HTTP/1.1 200 OK\r\nContent-Length: 7\r\n'
'Connection: close\r\nContent-Type: text/html; '
'charset=UTF-8\r\n\r\n%s"; cat >/dev/null')
with tempfile.NamedTemporaryFile() as script:
script.write(resp % server_name)
script.flush()
with tempfile.NamedTemporaryFile() as key:
key.write(private_key)
key.flush()
commands.copy_file_to_host(script.name,
"/tmp/script1",
ip,
username, key.name)
# Start netcat
start_server = ('while true; do '
'sudo nc -ll -p %(port)s -e sh /tmp/%(script)s; '
'done > /dev/null &')
cmd = start_server % {'port': self.port1,
'script': 'script1'}
ssh_client.exec_command(cmd)
if len(self.server_ips) == 1:
with tempfile.NamedTemporaryFile() as script:
script.write(resp % 'server2')
script.flush()
with tempfile.NamedTemporaryFile() as key:
key.write(private_key)
key.flush()
commands.copy_file_to_host(script.name,
"/tmp/script2", ip,
username, key.name)
cmd = start_server % {'port': self.port2,
'script': 'script2'}
ssh_client.exec_command(cmd)
def _check_connection(self, check_ip, port=80):
def try_connect(ip, port):
try:
resp = urllib2.urlopen("http://{0}:{1}/".format(ip, port))
if resp.getcode() == 200:
return True
return False
except IOError:
return False
except urllib2.HTTPError:
return False
timeout = config.compute.ping_timeout
start = time.time()
while not try_connect(check_ip, port):
if (time.time() - start) > timeout:
message = "Timed out trying to connect to %s" % check_ip
raise exceptions.TimeoutException(message)
def _create_pool(self):
"""Create a pool with ROUND_ROBIN algorithm."""
self.pool = super(TestLoadBalancerBasic, self)._create_pool(
lb_method='ROUND_ROBIN',
protocol='HTTP',
subnet_id=self.subnet.id)
self.assertTrue(self.pool)
def _create_members(self):
"""
Create two members.
In case there is only one server, create both members with the same ip
but with different ports to listen on.
"""
for server_id, ip in six.iteritems(self.server_fixed_ips):
if len(self.server_fixed_ips) == 1:
member1 = self._create_member(address=ip,
protocol_port=self.port1,
pool_id=self.pool.id)
member2 = self._create_member(address=ip,
protocol_port=self.port2,
pool_id=self.pool.id)
self.members.extend([member1, member2])
else:
member = self._create_member(address=ip,
protocol_port=self.port1,
pool_id=self.pool.id)
self.members.append(member)
self.assertTrue(self.members)
def _assign_floating_ip_to_vip(self, vip):
public_network_id = config.network.public_network_id
port_id = vip.port_id
floating_ip = self.create_floating_ip(vip, public_network_id,
port_id=port_id)
self.floating_ips.setdefault(vip.id, [])
self.floating_ips[vip.id].append(floating_ip)
# Check for floating ip status before you check load-balancer
self.check_floating_ip_status(floating_ip, "ACTIVE")
def _create_load_balancer(self):
self._create_pool()
self._create_members()
self.vip = self._create_vip(protocol='HTTP',
protocol_port=80,
subnet_id=self.subnet.id,
pool_id=self.pool.id)
self.vip.wait_for_status('ACTIVE')
if (config.network.public_network_id and not
config.network.tenant_networks_reachable):
self._assign_floating_ip_to_vip(self.vip)
self.vip_ip = self.floating_ips[
self.vip.id][0]['floating_ip_address']
else:
self.vip_ip = self.vip.address
# Currently the ovs-agent is not enforcing security groups on the
# vip port - see https://bugs.launchpad.net/neutron/+bug/1163569
# However the linuxbridge-agent does, and it is necessary to add a
# security group with a rule that allows tcp port 80 to the vip port.
self.network_client.update_port(
self.vip.port_id, security_groups=[self.security_group.id])
def _check_load_balancing(self):
"""
1. Send NUM requests on the floating ip associated with the VIP
2. Check that the requests are shared between the two servers
"""
self._check_connection(self.vip_ip)
self._send_requests(self.vip_ip, ["server1", "server2"])
def _send_requests(self, vip_ip, servers):
counters = dict.fromkeys(servers, 0)
for i in range(self.num):
try:
server = urllib2.urlopen("http://{0}/".format(vip_ip)).read()
counters[server] += 1
# HTTP exception means fail of server, so don't increase counter
# of success and continue connection tries
except urllib2.HTTPError:
continue
# Assert that each member of the pool gets balanced at least once
for member, counter in six.iteritems(counters):
self.assertGreater(counter, 0, 'Member %s never balanced' % member)
@test.idempotent_id('c0c6f1ca-603b-4509-9c0f-2c63f0d838ee')
@test.services('compute', 'network')
def test_load_balancer_basic(self):
self._create_server('server1')
self._start_servers()
self._create_load_balancer()
self._check_load_balancing()

@ -42,34 +42,18 @@ class NetworkClientJSON(service_client.ServiceClient):
def get_uri(self, plural_name):
# get service prefix from resource name
# The following list represents resource names that do not require
# changing underscore to a hyphen
hyphen_exceptions = ["health_monitors", "firewall_rules",
"firewall_policies"]
# the following map is used to construct proper URI
# for the given neutron resource
service_resource_prefix_map = {
'networks': '',
'subnets': '',
'ports': '',
'pools': 'lb',
'vips': 'lb',
'health_monitors': 'lb',
'members': 'lb',
'ipsecpolicies': 'vpn',
'vpnservices': 'vpn',
'ikepolicies': 'vpn',
'ipsec-site-connections': 'vpn',
'metering_labels': 'metering',
'metering_label_rules': 'metering',
'firewall_rules': 'fw',
'firewall_policies': 'fw',
'firewalls': 'fw'
}
service_prefix = service_resource_prefix_map.get(
plural_name)
if plural_name not in hyphen_exceptions:
plural_name = plural_name.replace("_", "-")
plural_name = plural_name.replace("_", "-")
if service_prefix:
uri = '%s/%s/%s' % (self.uri_prefix, service_prefix,
plural_name)
@ -85,11 +69,7 @@ class NetworkClientJSON(service_client.ServiceClient):
resource_plural_map = {
'security_groups': 'security_groups',
'security_group_rules': 'security_group_rules',
'ipsecpolicy': 'ipsecpolicies',
'ikepolicy': 'ikepolicies',
'ipsec_site_connection': 'ipsec-site-connections',
'quotas': 'quotas',
'firewall_policy': 'firewall_policies'
}
return resource_plural_map.get(resource_name, resource_name + 's')
@ -385,29 +365,6 @@ class NetworkClientJSON(service_client.ServiceClient):
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def associate_health_monitor_with_pool(self, health_monitor_id,
pool_id):
post_body = {
"health_monitor": {
"id": health_monitor_id,
}
}
body = json.dumps(post_body)
uri = '%s/lb/pools/%s/health_monitors' % (self.uri_prefix,
pool_id)
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def disassociate_health_monitor_with_pool(self, health_monitor_id,
pool_id):
uri = '%s/lb/pools/%s/health_monitors/%s' % (self.uri_prefix, pool_id,
health_monitor_id)
resp, body = self.delete(uri)
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def list_router_interfaces(self, uuid):
uri = '%s/ports?device_id=%s' % (self.uri_prefix, uuid)
resp, body = self.get(uri)
@ -428,21 +385,6 @@ class NetworkClientJSON(service_client.ServiceClient):
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_pools_hosted_by_one_lbaas_agent(self, agent_id):
uri = '%s/agents/%s/loadbalancer-pools' % (self.uri_prefix, agent_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def show_lbaas_agent_hosting_pool(self, pool_id):
uri = ('%s/lb/pools/%s/loadbalancer-agent' %
(self.uri_prefix, pool_id))
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_routers_on_l3_agent(self, agent_id):
uri = '%s/agents/%s/l3-routers' % (self.uri_prefix, agent_id)
resp, body = self.get(uri)
@ -494,21 +436,6 @@ class NetworkClientJSON(service_client.ServiceClient):
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
def create_ikepolicy(self, name, **kwargs):
post_body = {
"ikepolicy": {
"name": name,
}
}
for key, val in kwargs.items():
post_body['ikepolicy'][key] = val
body = json.dumps(post_body)
uri = '%s/vpn/ikepolicies' % (self.uri_prefix)
resp, body = self.post(uri, body)
self.expected_success(201, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def update_extra_routes(self, router_id, nexthop, destination):
uri = '%s/routers/%s' % (self.uri_prefix, router_id)
put_body = {
@ -537,13 +464,6 @@ class NetworkClientJSON(service_client.ServiceClient):
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def list_lb_pool_stats(self, pool_id):
uri = '%s/lb/pools/%s/stats' % (self.uri_prefix, pool_id)
resp, body = self.get(uri)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def add_dhcp_agent_to_network(self, agent_id, network_id):
post_body = {'network_id': network_id}
body = json.dumps(post_body)
@ -552,30 +472,3 @@ class NetworkClientJSON(service_client.ServiceClient):
self.expected_success(201, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def insert_firewall_rule_in_policy(self, firewall_policy_id,
firewall_rule_id, insert_after="",
insert_before=""):
uri = '%s/fw/firewall_policies/%s/insert_rule' % (self.uri_prefix,
firewall_policy_id)
body = {
"firewall_rule_id": firewall_rule_id,
"insert_after": insert_after,
"insert_before": insert_before
}
body = json.dumps(body)
resp, body = self.put(uri, body)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)
def remove_firewall_rule_from_policy(self, firewall_policy_id,
firewall_rule_id):
uri = '%s/fw/firewall_policies/%s/remove_rule' % (self.uri_prefix,
firewall_policy_id)
update_body = {"firewall_rule_id": firewall_rule_id}
update_body = json.dumps(update_body)
resp, body = self.put(uri, update_body)
self.expected_success(200, resp.status)
body = json.loads(body)
return service_client.ResponseBody(resp, body)