Migrate to testtools for functional tests

The standard in OpenStack is to use testtools not unittest. Use the base
test class which is based on testtools. This class also sets up loggers
appropriately.

unittest has an assertEqual method that is more appropriate.
Replace use of equal comparisons to None with assertIsNone.
Replace use of setUpClass and tearDownClass with setUp and tearDown.

Change-Id: Ie0f47dc7c0905164030953ba2d2633bd095782ca
This commit is contained in:
Monty Taylor 2017-11-10 16:41:01 +11:00
parent 240f5911d2
commit a25f8f3518
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
51 changed files with 876 additions and 975 deletions

View File

@ -227,7 +227,6 @@
- openstacksdk-ansible-devel-functional-devstack
- openstacksdk-ansible-functional-devstack
- openstacksdk-functional-devstack
- openstacksdk-functional-devstack-legacy
- openstacksdk-functional-devstack-magnum
- openstacksdk-functional-devstack-python3
gate:

View File

@ -13,9 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
# TODO(shade) Remove all use of setUpClass and tearDownClass. setUp and
# addCleanup should be used instead.
import os
import fixtures
@ -69,7 +66,7 @@ class TestCase(testtools.TestCase):
formatter = logging.Formatter('%(asctime)s %(name)-32s %(message)s')
handler.setFormatter(formatter)
logger = logging.getLogger('shade')
logger = logging.getLogger('openstack')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import testtools
from examples.compute import create
from examples.compute import delete
@ -21,16 +21,16 @@ from examples.network import find as network_find
from examples.network import list as network_list
class TestCompute(unittest.TestCase):
class TestCompute(testtools.TestCase):
"""Test the compute examples
The purpose of these tests is to ensure the examples run without erring
out.
"""
@classmethod
def setUpClass(cls):
cls.conn = connect.create_connection_from_config()
def setUp(self):
super(TestCompute, self).setUp()
self.conn = connect.create_connection_from_config()
def test_compute(self):
compute_list.list_servers(self.conn)

View File

@ -10,22 +10,22 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import testtools
from examples import connect
from examples.identity import list as identity_list
class TestIdentity(unittest.TestCase):
class TestIdentity(testtools.TestCase):
"""Test the identity examples
The purpose of these tests is to ensure the examples run without erring
out.
"""
@classmethod
def setUpClass(cls):
cls.conn = connect.create_connection_from_config()
def setUp(self):
super(TestIdentity, self).setUp()
self.conn = connect.create_connection_from_config()
def test_identity(self):
identity_list.list_users(self.conn)

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import testtools
from examples import connect
from examples.image import create as image_create
@ -18,16 +18,16 @@ from examples.image import delete as image_delete
from examples.image import list as image_list
class TestImage(unittest.TestCase):
class TestImage(testtools.TestCase):
"""Test the image examples
The purpose of these tests is to ensure the examples run without erring
out.
"""
@classmethod
def setUpClass(cls):
cls.conn = connect.create_connection_from_config()
def setUp(self):
super(TestImage, self).setUp()
self.conn = connect.create_connection_from_config()
def test_image(self):
image_list.list_images(self.conn)

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import testtools
from examples import connect
from examples.network import create as network_create
@ -19,16 +19,16 @@ from examples.network import find as network_find
from examples.network import list as network_list
class TestNetwork(unittest.TestCase):
class TestNetwork(testtools.TestCase):
"""Test the network examples
The purpose of these tests is to ensure the examples run without erring
out.
"""
@classmethod
def setUpClass(cls):
cls.conn = connect.create_connection_from_config()
def setUp(self):
super(TestNetwork, self).setUp()
self.conn = connect.create_connection_from_config()
def test_network(self):
network_list.list_networks(self.conn)

View File

@ -12,11 +12,10 @@
import os
import openstack.config
import time
import unittest
from keystoneauth1 import exceptions as _exceptions
from openstack import connection
from openstack.tests import base
#: Defines the OpenStack Client Config (OCC) cloud key in your OCC config
@ -46,38 +45,34 @@ IMAGE_NAME = _get_resource_value('image_name', 'cirros-0.3.5-x86_64-disk')
FLAVOR_NAME = _get_resource_value('flavor_name', 'm1.small')
def service_exists(**kwargs):
"""Decorator function to check whether a service exists
class BaseFunctionalTest(base.TestCase):
Usage:
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestMeter(base.BaseFunctionalTest):
...
def setUp(self):
super(BaseFunctionalTest, self).setUp()
self.conn = connection.from_config(cloud_name=TEST_CLOUD)
:param kwargs: The kwargs needed to filter an endpoint.
:returns: True if the service exists, otherwise False.
"""
try:
conn = connection.from_config(cloud_name=TEST_CLOUD)
conn.session.get_endpoint(**kwargs)
def addEmptyCleanup(self, func, *args, **kwargs):
def cleanup():
result = func(*args, **kwargs)
self.assertIsNone(result)
self.addCleanup(cleanup)
return True
except _exceptions.EndpointNotFound:
return False
# TODO(shade) Replace this with call to conn.has_service when we've merged
# the shade methods into Connection.
def require_service(self, service_type, **kwargs):
"""Method to check whether a service exists
Usage:
class TestMeter(base.BaseFunctionalTest):
...
def setUp(self):
super(TestMeter, self).setUp()
self.require_service('metering')
class BaseFunctionalTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.conn = connection.from_config(cloud_name=TEST_CLOUD)
@classmethod
def assertIs(cls, expected, actual):
if expected != actual:
raise Exception(expected + ' != ' + actual)
@classmethod
def linger_for_delete(cls):
time.sleep(40)
:returns: True if the service exists, otherwise False.
"""
try:
self.conn.session.get_endpoint(service_type=service_type, **kwargs)
except _exceptions.EndpointNotFound:
self.skipTest('Service {service_type} not found in cloud'.format(
service_type=service_type))

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.block_store.v2 import snapshot as _snapshot
from openstack.block_store.v2 import volume as _volume
@ -19,49 +18,50 @@ from openstack.tests.functional import base
class TestSnapshot(base.BaseFunctionalTest):
SNAPSHOT_NAME = uuid.uuid4().hex
SNAPSHOT_ID = None
VOLUME_NAME = uuid.uuid4().hex
VOLUME_ID = None
def setUp(self):
super(TestSnapshot, self).setUp()
@classmethod
def setUpClass(cls):
super(TestSnapshot, cls).setUpClass()
volume = cls.conn.block_store.create_volume(
name=cls.VOLUME_NAME,
self.SNAPSHOT_NAME = self.getUniqueString()
self.SNAPSHOT_ID = None
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
volume = self.conn.block_store.create_volume(
name=self.VOLUME_NAME,
size=1)
cls.conn.block_store.wait_for_status(volume,
status='available',
failures=['error'],
interval=2,
wait=120)
self.conn.block_store.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=120)
assert isinstance(volume, _volume.Volume)
cls.assertIs(cls.VOLUME_NAME, volume.name)
cls.VOLUME_ID = volume.id
snapshot = cls.conn.block_store.create_snapshot(
name=cls.SNAPSHOT_NAME,
volume_id=cls.VOLUME_ID)
cls.conn.block_store.wait_for_status(snapshot,
status='available',
failures=['error'],
interval=2,
wait=120)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
snapshot = self.conn.block_store.create_snapshot(
name=self.SNAPSHOT_NAME,
volume_id=self.VOLUME_ID)
self.conn.block_store.wait_for_status(
snapshot,
status='available',
failures=['error'],
interval=2,
wait=120)
assert isinstance(snapshot, _snapshot.Snapshot)
cls.assertIs(cls.SNAPSHOT_NAME, snapshot.name)
cls.SNAPSHOT_ID = snapshot.id
self.assertEqual(self.SNAPSHOT_NAME, snapshot.name)
self.SNAPSHOT_ID = snapshot.id
@classmethod
def tearDownClass(cls):
snapshot = cls.conn.block_store.get_snapshot(cls.SNAPSHOT_ID)
sot = cls.conn.block_store.delete_snapshot(snapshot,
ignore_missing=False)
cls.conn.block_store.wait_for_delete(snapshot,
interval=2,
wait=120)
cls.assertIs(None, sot)
sot = cls.conn.block_store.delete_volume(cls.VOLUME_ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
snapshot = self.conn.block_store.get_snapshot(self.SNAPSHOT_ID)
sot = self.conn.block_store.delete_snapshot(
snapshot, ignore_missing=False)
self.conn.block_store.wait_for_delete(
snapshot, interval=2, wait=120)
self.assertIsNone(sot)
sot = self.conn.block_store.delete_volume(
self.VOLUME_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestSnapshot, self).tearDown()
def test_get(self):
sot = self.conn.block_store.get_snapshot(self.SNAPSHOT_ID)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.block_store.v2 import type as _type
from openstack.tests.functional import base
@ -18,22 +17,22 @@ from openstack.tests.functional import base
class TestType(base.BaseFunctionalTest):
TYPE_NAME = uuid.uuid4().hex
TYPE_ID = None
def setUp(self):
super(TestType, self).setUp()
@classmethod
def setUpClass(cls):
super(TestType, cls).setUpClass()
sot = cls.conn.block_store.create_type(name=cls.TYPE_NAME)
self.TYPE_NAME = self.getUniqueString()
self.TYPE_ID = None
sot = self.conn.block_store.create_type(name=self.TYPE_NAME)
assert isinstance(sot, _type.Type)
cls.assertIs(cls.TYPE_NAME, sot.name)
cls.TYPE_ID = sot.id
self.assertEqual(self.TYPE_NAME, sot.name)
self.TYPE_ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.block_store.delete_type(cls.TYPE_ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.block_store.delete_type(
self.TYPE_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestType, self).tearDown()
def test_get(self):
sot = self.conn.block_store.get_type(self.TYPE_ID)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.block_store.v2 import volume as _volume
from openstack.tests.functional import base
@ -18,29 +17,31 @@ from openstack.tests.functional import base
class TestVolume(base.BaseFunctionalTest):
VOLUME_NAME = uuid.uuid4().hex
VOLUME_ID = None
def setUp(self):
super(TestVolume, self).setUp()
@classmethod
def setUpClass(cls):
super(TestVolume, cls).setUpClass()
volume = cls.conn.block_store.create_volume(
name=cls.VOLUME_NAME,
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
volume = self.conn.block_store.create_volume(
name=self.VOLUME_NAME,
size=1)
cls.conn.block_store.wait_for_status(volume,
status='available',
failures=['error'],
interval=2,
wait=120)
self.conn.block_store.wait_for_status(
volume,
status='available',
failures=['error'],
interval=2,
wait=120)
assert isinstance(volume, _volume.Volume)
cls.assertIs(cls.VOLUME_NAME, volume.name)
cls.VOLUME_ID = volume.id
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.block_store.delete_volume(cls.VOLUME_ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.block_store.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestVolume, self).tearDown()
def test_get(self):
sot = self.conn.block_store.get_volume(self.VOLUME_ID)

View File

@ -18,11 +18,10 @@ from openstack.tests.functional import base
class TestFlavor(base.BaseFunctionalTest):
@classmethod
def setUpClass(cls):
super(TestFlavor, cls).setUpClass()
def setUp(self):
super(TestFlavor, self).setUp()
cls.one_flavor = list(cls.conn.compute.flavors())[0]
self.one_flavor = list(self.conn.compute.flavors())[0]
def test_flavors(self):
flavors = list(self.conn.compute.flavors())

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.compute.v2 import keypair
from openstack.tests.functional import base
@ -18,22 +17,23 @@ from openstack.tests.functional import base
class TestKeypair(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
ID = None
def setUp(self):
super(TestKeypair, self).setUp()
@classmethod
def setUpClass(cls):
super(TestKeypair, cls).setUpClass()
sot = cls.conn.compute.create_keypair(name=cls.NAME)
# Keypairs can't have .'s in the name. Because why?
self.NAME = self.getUniqueString().split('.')[-1]
self.ID = None
sot = self.conn.compute.create_keypair(name=self.NAME)
assert isinstance(sot, keypair.Keypair)
cls.assertIs(cls.NAME, sot.name)
cls._keypair = sot
cls.ID = sot.id
self.assertEqual(self.NAME, sot.name)
self._keypair = sot
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.compute.delete_keypair(cls._keypair)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.compute.delete_keypair(self._keypair)
self.assertIsNone(sot)
super(TestKeypair, self).tearDown()
def test_find(self):
sot = self.conn.compute.find_keypair(self.NAME)

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import time
from openstack.compute.v2 import server
from openstack.tests.functional import base
@ -19,44 +19,41 @@ from openstack.tests.functional.network.v2 import test_network
class TestServer(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
server = None
network = None
subnet = None
cidr = '10.99.99.0/16'
def setUp(self):
super(TestServer, self).setUp()
self.NAME = self.getUniqueString()
self.server = None
self.network = None
self.subnet = None
self.cidr = '10.99.99.0/16'
@classmethod
def setUpClass(cls):
super(TestServer, cls).setUpClass()
flavor = cls.conn.compute.find_flavor(base.FLAVOR_NAME,
ignore_missing=False)
image = cls.conn.compute.find_image(base.IMAGE_NAME,
ignore_missing=False)
cls.network, cls.subnet = test_network.create_network(cls.conn,
cls.NAME,
cls.cidr)
if not cls.network:
# We can't call TestCase.fail from within the setUpClass
# classmethod, but we need to raise some exception in order
# to get this setup to fail and thusly fail the entire class.
raise Exception("Unable to create network for TestServer")
flavor = self.conn.compute.find_flavor(base.FLAVOR_NAME,
ignore_missing=False)
image = self.conn.compute.find_image(base.IMAGE_NAME,
ignore_missing=False)
self.network, self.subnet = test_network.create_network(
self.conn,
self.NAME,
self.cidr)
self.assertIsNotNone(self.network)
sot = cls.conn.compute.create_server(
name=cls.NAME, flavor_id=flavor.id, image_id=image.id,
networks=[{"uuid": cls.network.id}])
cls.conn.compute.wait_for_server(sot)
sot = self.conn.compute.create_server(
name=self.NAME, flavor_id=flavor.id, image_id=image.id,
networks=[{"uuid": self.network.id}])
self.conn.compute.wait_for_server(sot)
assert isinstance(sot, server.Server)
cls.assertIs(cls.NAME, sot.name)
cls.server = sot
self.assertEqual(self.NAME, sot.name)
self.server = sot
@classmethod
def tearDownClass(cls):
sot = cls.conn.compute.delete_server(cls.server.id)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.compute.delete_server(self.server.id)
self.assertIsNone(sot)
# Need to wait for the stack to go away before network delete
cls.conn.compute.wait_for_delete(cls.server)
cls.linger_for_delete()
test_network.delete_network(cls.conn, cls.network, cls.subnet)
self.conn.compute.wait_for_delete(self.server)
# TODO(shade) sleeping in tests is bad mmkay?
time.sleep(40)
test_network.delete_network(self.conn, self.network, self.subnet)
super(TestServer, self).tearDown()
def test_find(self):
sot = self.conn.compute.find_server(self.NAME)

View File

@ -22,23 +22,20 @@ class TestImage(base.BaseFunctionalTest):
def __init__(self):
self.image_api_version = '2'
@classmethod
def setUpClass(cls):
opts = cls.ImageOpts()
cls.conn = connection.from_config(cloud_name=base.TEST_CLOUD,
options=opts)
def setUp(self):
super(TestImage, self).setUp()
opts = self.ImageOpts()
self.conn = connection.from_config(
cloud_name=base.TEST_CLOUD, options=opts)
cls.img = cls.conn.image.upload_image(
self.img = self.conn.image.upload_image(
name=TEST_IMAGE_NAME,
disk_format='raw',
container_format='bare',
properties='{"description": "This is not an image"}',
data=open('CONTRIBUTING.rst', 'r')
)
@classmethod
def tearDownClass(cls):
cls.conn.image.delete_image(cls.img)
self.addCleanup(self.conn.image.delete_image, self.img)
def test_get_image(self):
img2 = self.conn.image.get_image(self.img)

View File

@ -19,12 +19,7 @@ from openstack.tests.functional import base
class BaseLBFunctionalTest(base.BaseFunctionalTest):
@classmethod
def setUpClass(cls):
super(BaseLBFunctionalTest, cls).setUpClass()
@classmethod
def lb_wait_for_status(cls, lb, status, failures, interval=1, wait=120):
def lb_wait_for_status(self, lb, status, failures, interval=1, wait=120):
"""Wait for load balancer to be in a particular provisioning status.
:param lb: The load balancer to wait on to reach the status.
@ -49,7 +44,7 @@ class BaseLBFunctionalTest(base.BaseFunctionalTest):
failures = []
while total_sleep < wait:
lb = cls.conn.load_balancer.get_load_balancer(lb.id)
lb = self.conn.load_balancer.get_load_balancer(lb.id)
if lb.provisioning_status == status:
return None
if lb.provisioning_status in failures:

View File

@ -10,9 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import uuid
from openstack.load_balancer.v2 import health_monitor
from openstack.load_balancer.v2 import l7_policy
from openstack.load_balancer.v2 import l7_rule
@ -20,21 +17,11 @@ from openstack.load_balancer.v2 import listener
from openstack.load_balancer.v2 import load_balancer
from openstack.load_balancer.v2 import member
from openstack.load_balancer.v2 import pool
from openstack.tests.functional import base
from openstack.tests.functional.load_balancer import base as lb_base
@unittest.skipUnless(base.service_exists(service_type='load-balancer'),
'Load-balancer service does not exist')
class TestLoadBalancer(lb_base.BaseLBFunctionalTest):
HM_NAME = uuid.uuid4().hex
L7POLICY_NAME = uuid.uuid4().hex
LB_NAME = uuid.uuid4().hex
LISTENER_NAME = uuid.uuid4().hex
MEMBER_NAME = uuid.uuid4().hex
POOL_NAME = uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
HM_ID = None
L7POLICY_ID = None
LB_ID = None
@ -58,110 +45,121 @@ class TestLoadBalancer(lb_base.BaseLBFunctionalTest):
L7RULE_TYPE = 'HOST_NAME'
L7RULE_VALUE = 'example'
# Note: Creating load balancers can be slow on some hosts due to nova
# instance boot times (up to ten minutes) so we are consolidating
# all of our functional tests here to reduce test runtime.
@classmethod
def setUpClass(cls):
super(TestLoadBalancer, cls).setUpClass()
subnets = list(cls.conn.network.subnets())
cls.VIP_SUBNET_ID = subnets[0].id
cls.PROJECT_ID = cls.conn.session.get_project_id()
test_lb = cls.conn.load_balancer.create_load_balancer(
name=cls.LB_NAME, vip_subnet_id=cls.VIP_SUBNET_ID,
project_id=cls.PROJECT_ID)
# TODO(shade): Creating load balancers can be slow on some hosts due to
# nova instance boot times (up to ten minutes). This used to
# use setUpClass, but that's a whole other pile of bad, so
# we may need to engineer something pleasing here.
def setUp(self):
super(TestLoadBalancer, self).setUp()
self.require_service('load-balancer')
self.HM_NAME = self.getUniqueString()
self.L7POLICY_NAME = self.getUniqueString()
self.LB_NAME = self.getUniqueString()
self.LISTENER_NAME = self.getUniqueString()
self.MEMBER_NAME = self.getUniqueString()
self.POOL_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
subnets = list(self.conn.network.subnets())
self.VIP_SUBNET_ID = subnets[0].id
self.PROJECT_ID = self.conn.session.get_project_id()
test_lb = self.conn.load_balancer.create_load_balancer(
name=self.LB_NAME, vip_subnet_id=self.VIP_SUBNET_ID,
project_id=self.PROJECT_ID)
assert isinstance(test_lb, load_balancer.LoadBalancer)
cls.assertIs(cls.LB_NAME, test_lb.name)
self.assertEqual(self.LB_NAME, test_lb.name)
# Wait for the LB to go ACTIVE. On non-virtualization enabled hosts
# it can take nova up to ten minutes to boot a VM.
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'], interval=1, wait=600)
cls.LB_ID = test_lb.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'], interval=1, wait=600)
self.LB_ID = test_lb.id
test_listener = cls.conn.load_balancer.create_listener(
name=cls.LISTENER_NAME, protocol=cls.PROTOCOL,
protocol_port=cls.PROTOCOL_PORT, loadbalancer_id=cls.LB_ID)
test_listener = self.conn.load_balancer.create_listener(
name=self.LISTENER_NAME, protocol=self.PROTOCOL,
protocol_port=self.PROTOCOL_PORT, loadbalancer_id=self.LB_ID)
assert isinstance(test_listener, listener.Listener)
cls.assertIs(cls.LISTENER_NAME, test_listener.name)
cls.LISTENER_ID = test_listener.id
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
self.assertEqual(self.LISTENER_NAME, test_listener.name)
self.LISTENER_ID = test_listener.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
test_pool = cls.conn.load_balancer.create_pool(
name=cls.POOL_NAME, protocol=cls.PROTOCOL,
lb_algorithm=cls.LB_ALGORITHM, listener_id=cls.LISTENER_ID)
test_pool = self.conn.load_balancer.create_pool(
name=self.POOL_NAME, protocol=self.PROTOCOL,
lb_algorithm=self.LB_ALGORITHM, listener_id=self.LISTENER_ID)
assert isinstance(test_pool, pool.Pool)
cls.assertIs(cls.POOL_NAME, test_pool.name)
cls.POOL_ID = test_pool.id
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
self.assertEqual(self.POOL_NAME, test_pool.name)
self.POOL_ID = test_pool.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
test_member = cls.conn.load_balancer.create_member(
pool=cls.POOL_ID, name=cls.MEMBER_NAME, address=cls.MEMBER_ADDRESS,
protocol_port=cls.PROTOCOL_PORT, weight=cls.WEIGHT)
test_member = self.conn.load_balancer.create_member(
pool=self.POOL_ID, name=self.MEMBER_NAME,
address=self.MEMBER_ADDRESS,
protocol_port=self.PROTOCOL_PORT, weight=self.WEIGHT)
assert isinstance(test_member, member.Member)
cls.assertIs(cls.MEMBER_NAME, test_member.name)
cls.MEMBER_ID = test_member.id
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
self.assertEqual(self.MEMBER_NAME, test_member.name)
self.MEMBER_ID = test_member.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
test_hm = cls.conn.load_balancer.create_health_monitor(
pool_id=cls.POOL_ID, name=cls.HM_NAME, delay=cls.DELAY,
timeout=cls.TIMEOUT, max_retries=cls.MAX_RETRY, type=cls.HM_TYPE)
test_hm = self.conn.load_balancer.create_health_monitor(
pool_id=self.POOL_ID, name=self.HM_NAME, delay=self.DELAY,
timeout=self.TIMEOUT, max_retries=self.MAX_RETRY,
type=self.HM_TYPE)
assert isinstance(test_hm, health_monitor.HealthMonitor)
cls.assertIs(cls.HM_NAME, test_hm.name)
cls.HM_ID = test_hm.id
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
self.assertEqual(self.HM_NAME, test_hm.name)
self.HM_ID = test_hm.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
test_l7policy = cls.conn.load_balancer.create_l7_policy(
listener_id=cls.LISTENER_ID, name=cls.L7POLICY_NAME,
action=cls.ACTION, redirect_url=cls.REDIRECT_URL)
test_l7policy = self.conn.load_balancer.create_l7_policy(
listener_id=self.LISTENER_ID, name=self.L7POLICY_NAME,
action=self.ACTION, redirect_url=self.REDIRECT_URL)
assert isinstance(test_l7policy, l7_policy.L7Policy)
cls.assertIs(cls.L7POLICY_NAME, test_l7policy.name)
cls.L7POLICY_ID = test_l7policy.id
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
self.assertEqual(self.L7POLICY_NAME, test_l7policy.name)
self.L7POLICY_ID = test_l7policy.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
test_l7rule = cls.conn.load_balancer.create_l7_rule(
l7_policy=cls.L7POLICY_ID, compare_type=cls.COMPARE_TYPE,
type=cls.L7RULE_TYPE, value=cls.L7RULE_VALUE)
test_l7rule = self.conn.load_balancer.create_l7_rule(
l7_policy=self.L7POLICY_ID, compare_type=self.COMPARE_TYPE,
type=self.L7RULE_TYPE, value=self.L7RULE_VALUE)
assert isinstance(test_l7rule, l7_rule.L7Rule)
cls.assertIs(cls.COMPARE_TYPE, test_l7rule.compare_type)
cls.L7RULE_ID = test_l7rule.id
cls.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
self.assertEqual(self.COMPARE_TYPE, test_l7rule.compare_type)
self.L7RULE_ID = test_l7rule.id
self.lb_wait_for_status(test_lb, status='ACTIVE',
failures=['ERROR'])
@classmethod
def tearDownClass(cls):
test_lb = cls.conn.load_balancer.get_load_balancer(cls.LB_ID)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
def tearDown(self):
test_lb = self.conn.load_balancer.get_load_balancer(self.LB_ID)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_l7_rule(
cls.L7RULE_ID, l7_policy=cls.L7POLICY_ID, ignore_missing=False)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
self.conn.load_balancer.delete_l7_rule(
self.L7RULE_ID, l7_policy=self.L7POLICY_ID, ignore_missing=False)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_l7_policy(
cls.L7POLICY_ID, ignore_missing=False)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
self.conn.load_balancer.delete_l7_policy(
self.L7POLICY_ID, ignore_missing=False)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_health_monitor(
cls.HM_ID, ignore_missing=False)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
self.conn.load_balancer.delete_health_monitor(
self.HM_ID, ignore_missing=False)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_member(
cls.MEMBER_ID, cls.POOL_ID, ignore_missing=False)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
self.conn.load_balancer.delete_member(
self.MEMBER_ID, self.POOL_ID, ignore_missing=False)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_pool(cls.POOL_ID, ignore_missing=False)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
self.conn.load_balancer.delete_pool(self.POOL_ID, ignore_missing=False)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_listener(cls.LISTENER_ID,
ignore_missing=False)
cls.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
self.conn.load_balancer.delete_listener(self.LISTENER_ID,
ignore_missing=False)
self.lb_wait_for_status(test_lb, status='ACTIVE', failures=['ERROR'])
cls.conn.load_balancer.delete_load_balancer(
cls.LB_ID, ignore_missing=False)
self.conn.load_balancer.delete_load_balancer(
self.LB_ID, ignore_missing=False)
super(TestLoadBalancer, self).tearDown()
def test_lb_find(self):
test_lb = self.conn.load_balancer.find_load_balancer(self.LB_NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import address_scope as _address_scope
from openstack.tests.functional import base
@ -19,27 +18,26 @@ from openstack.tests.functional import base
class TestAddressScope(base.BaseFunctionalTest):
ADDRESS_SCOPE_ID = None
ADDRESS_SCOPE_NAME = uuid.uuid4().hex
ADDRESS_SCOPE_NAME_UPDATED = uuid.uuid4().hex
IS_SHARED = False
IP_VERSION = 4
@classmethod
def setUpClass(cls):
super(TestAddressScope, cls).setUpClass()
address_scope = cls.conn.network.create_address_scope(
ip_version=cls.IP_VERSION,
name=cls.ADDRESS_SCOPE_NAME,
shared=cls.IS_SHARED,
def setUp(self):
super(TestAddressScope, self).setUp()
self.ADDRESS_SCOPE_NAME = self.getUniqueString()
self.ADDRESS_SCOPE_NAME_UPDATED = self.getUniqueString()
address_scope = self.conn.network.create_address_scope(
ip_version=self.IP_VERSION,
name=self.ADDRESS_SCOPE_NAME,
shared=self.IS_SHARED,
)
assert isinstance(address_scope, _address_scope.AddressScope)
cls.assertIs(cls.ADDRESS_SCOPE_NAME, address_scope.name)
cls.ADDRESS_SCOPE_ID = address_scope.id
self.assertEqual(self.ADDRESS_SCOPE_NAME, address_scope.name)
self.ADDRESS_SCOPE_ID = address_scope.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_address_scope(cls.ADDRESS_SCOPE_ID)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_address_scope(self.ADDRESS_SCOPE_ID)
self.assertIsNone(sot)
super(TestAddressScope, self).tearDown()
def test_find(self):
sot = self.conn.network.find_address_scope(self.ADDRESS_SCOPE_NAME)

View File

@ -28,12 +28,11 @@ class TestAgent(base.BaseFunctionalTest):
return False
return True
@classmethod
def setUpClass(cls):
super(TestAgent, cls).setUpClass()
agent_list = list(cls.conn.network.agents())
cls.AGENT = agent_list[0]
assert isinstance(cls.AGENT, agent.Agent)
def setUp(self):
super(TestAgent, self).setUp()
agent_list = list(self.conn.network.agents())
self.AGENT = agent_list[0]
assert isinstance(self.AGENT, agent.Agent)
def test_list(self):
agent_list = list(self.conn.network.agents())

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.tests.functional import base
@ -18,34 +17,29 @@ from openstack.tests.functional import base
class TestAgentNetworks(base.BaseFunctionalTest):
NETWORK_NAME = 'network-' + uuid.uuid4().hex
NETWORK_ID = None
AGENT = None
AGENT_ID = None
@classmethod
def setUpClass(cls):
super(TestAgentNetworks, cls).setUpClass()
def setUp(self):
super(TestAgentNetworks, self).setUp()
net = cls.conn.network.create_network(name=cls.NETWORK_NAME)
self.NETWORK_NAME = self.getUniqueString('network')
net = self.conn.network.create_network(name=self.NETWORK_NAME)
self.addCleanup(self.conn.network.delete_network, net.id)
assert isinstance(net, network.Network)
cls.NETWORK_ID = net.id
agent_list = list(cls.conn.network.agents())
self.NETWORK_ID = net.id
agent_list = list(self.conn.network.agents())
agents = [agent for agent in agent_list
if agent.agent_type == 'DHCP agent']
cls.AGENT = agents[0]
cls.AGENT_ID = cls.AGENT.id
self.AGENT = agents[0]
self.AGENT_ID = self.AGENT.id
@classmethod
def tearDownClass(cls):
cls.conn.network.delete_network(cls.NETWORK_ID)
def test_add_agent_to_network(self):
def test_add_remove_agent(self):
net = self.AGENT.add_agent_to_network(self.conn.session,
network_id=self.NETWORK_ID)
self._verify_add(net)
def test_remove_agent_from_network(self):
net = self.AGENT.remove_agent_from_network(self.conn.session,
network_id=self.NETWORK_ID)
self._verify_remove(net)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import router
from openstack.tests.functional import base
@ -18,24 +17,20 @@ from openstack.tests.functional import base
class TestAgentRouters(base.BaseFunctionalTest):
ROUTER_NAME = 'router-name-' + uuid.uuid4().hex
ROUTER = None
AGENT = None
@classmethod
def setUpClass(cls):
super(TestAgentRouters, cls).setUpClass()
def setUp(self):
super(TestAgentRouters, self).setUp()
cls.ROUTER = cls.conn.network.create_router(name=cls.ROUTER_NAME)
assert isinstance(cls.ROUTER, router.Router)
agent_list = list(cls.conn.network.agents())
self.ROUTER_NAME = 'router-name-' + self.getUniqueString('router-name')
self.ROUTER = self.conn.network.create_router(name=self.ROUTER_NAME)
self.addCleanup(self.conn.network.delete_router, self.ROUTER)
assert isinstance(self.ROUTER, router.Router)
agent_list = list(self.conn.network.agents())
agents = [agent for agent in agent_list
if agent.agent_type == 'L3 agent']
cls.AGENT = agents[0]
@classmethod
def tearDownClass(cls):
cls.conn.network.delete_router(cls.ROUTER)
self.AGENT = agents[0]
def test_add_router_to_agent(self):
self.conn.network.add_router_to_agent(self.AGENT, self.ROUTER)

View File

@ -19,16 +19,15 @@ class TestAutoAllocatedTopology(base.BaseFunctionalTest):
NETWORK_ID = None
PROJECT_ID = None
@classmethod
def setUpClass(cls):
super(TestAutoAllocatedTopology, cls).setUpClass()
projects = [o.project_id for o in cls.conn.network.networks()]
cls.PROJECT_ID = projects[0]
def setUp(self):
super(TestAutoAllocatedTopology, self).setUp()
projects = [o.project_id for o in self.conn.network.networks()]
self.PROJECT_ID = projects[0]
@classmethod
def tearDownClass(cls):
res = cls.conn.network.delete_auto_allocated_topology(cls.PROJECT_ID)
cls.assertIs(None, res)
def tearDown(self):
res = self.conn.network.delete_auto_allocated_topology(self.PROJECT_ID)
self.assertIsNone(res)
super(TestAutoAllocatedTopology, self).tearDown()
def test_dry_run_option_pass(self):
# Dry run will only pass if there is a public network

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import router
from openstack.tests.functional import base
@ -18,22 +17,21 @@ from openstack.tests.functional import base
class TestDVRRouter(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
ID = None
@classmethod
def setUpClass(cls):
super(TestDVRRouter, cls).setUpClass()
sot = cls.conn.network.create_router(name=cls.NAME, distributed=True)
def setUp(self):
super(TestDVRRouter, self).setUp()
self.NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
sot = self.conn.network.create_router(name=self.NAME, distributed=True)
assert isinstance(sot, router.Router)
cls.assertIs(cls.NAME, sot.name)
cls.ID = sot.id
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_router(cls.ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_router(self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestDVRRouter, self).tearDown()
def test_find(self):
sot = self.conn.network.find_router(self.NAME)

View File

@ -10,15 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import flavor
from openstack.tests.functional import base
class TestFlavor(base.BaseFunctionalTest):
FLAVOR_NAME = uuid.uuid4().hex
UPDATE_NAME = "UPDATED-NAME"
SERVICE_TYPE = "FLAVORS"
ID = None
@ -26,29 +23,30 @@ class TestFlavor(base.BaseFunctionalTest):
SERVICE_PROFILE_DESCRIPTION = "DESCRIPTION"
METAINFO = "FlAVOR_PROFILE_METAINFO"
@classmethod
def setUpClass(cls):
super(TestFlavor, cls).setUpClass()
flavors = cls.conn.network.create_flavor(name=cls.FLAVOR_NAME,
service_type=cls.SERVICE_TYPE)
def setUp(self):
super(TestFlavor, self).setUp()
self.FLAVOR_NAME = self.getUniqueString('flavor')
flavors = self.conn.network.create_flavor(
name=self.FLAVOR_NAME,
service_type=self.SERVICE_TYPE)
assert isinstance(flavors, flavor.Flavor)
cls.assertIs(cls.FLAVOR_NAME, flavors.name)
cls.assertIs(cls.SERVICE_TYPE, flavors.service_type)
self.assertEqual(self.FLAVOR_NAME, flavors.name)
self.assertEqual(self.SERVICE_TYPE, flavors.service_type)
cls.ID = flavors.id
self.ID = flavors.id
cls.service_profiles = cls.conn.network.create_service_profile(
description=cls.SERVICE_PROFILE_DESCRIPTION,
metainfo=cls.METAINFO,)
self.service_profiles = self.conn.network.create_service_profile(
description=self.SERVICE_PROFILE_DESCRIPTION,
metainfo=self.METAINFO,)
@classmethod
def tearDownClass(cls):
flavors = cls.conn.network.delete_flavor(cls.ID, ignore_missing=True)
cls.assertIs(None, flavors)
def tearDown(self):
flavors = self.conn.network.delete_flavor(self.ID, ignore_missing=True)
self.assertIsNone(flavors)
service_profiles = cls.conn.network.delete_service_profile(
cls.ID, ignore_missing=True)
cls.assertIs(None, service_profiles)
service_profiles = self.conn.network.delete_service_profile(
self.ID, ignore_missing=True)
self.assertIsNone(service_profiles)
super(TestFlavor, self).tearDown()
def test_find(self):
flavors = self.conn.network.find_flavor(self.FLAVOR_NAME)
@ -68,13 +66,12 @@ class TestFlavor(base.BaseFunctionalTest):
name=self.UPDATE_NAME)
self.assertEqual(self.UPDATE_NAME, flavor.name)
def test_associate_flavor_with_service_profile(self):
def test_associate_disassociate_flavor_with_service_profile(self):
response = \
self.conn.network.associate_flavor_with_service_profile(
self.ID, self.service_profiles.id)
self.assertIsNotNone(response)
def test_disassociate_flavor_from_service_profile(self):
response = \
self.conn.network.disassociate_flavor_from_service_profile(
self.ID, self.service_profiles.id)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import floating_ip
from openstack.network.v2 import network
@ -22,11 +21,6 @@ from openstack.tests.functional import base
class TestFloatingIP(base.BaseFunctionalTest):
ROT_NAME = uuid.uuid4().hex
EXT_NET_NAME = uuid.uuid4().hex
EXT_SUB_NAME = uuid.uuid4().hex
INT_NET_NAME = uuid.uuid4().hex
INT_SUB_NAME = uuid.uuid4().hex
IPV4 = 4
EXT_CIDR = "10.100.0.0/24"
INT_CIDR = "10.101.0.0/24"
@ -38,84 +32,89 @@ class TestFloatingIP(base.BaseFunctionalTest):
PORT_ID = None
FIP = None
@classmethod
def setUpClass(cls):
super(TestFloatingIP, cls).setUpClass()
def setUp(self):
super(TestFloatingIP, self).setUp()
self.ROT_NAME = self.getUniqueString()
self.EXT_NET_NAME = self.getUniqueString()
self.EXT_SUB_NAME = self.getUniqueString()
self.INT_NET_NAME = self.getUniqueString()
self.INT_SUB_NAME = self.getUniqueString()
# Create Exeternal Network
args = {'router:external': True}
net = cls._create_network(cls.EXT_NET_NAME, **args)
cls.EXT_NET_ID = net.id
sub = cls._create_subnet(cls.EXT_SUB_NAME, cls.EXT_NET_ID,
cls.EXT_CIDR)
cls.EXT_SUB_ID = sub.id
net = self._create_network(self.EXT_NET_NAME, **args)
self.EXT_NET_ID = net.id
sub = self._create_subnet(
self.EXT_SUB_NAME, self.EXT_NET_ID, self.EXT_CIDR)
self.EXT_SUB_ID = sub.id
# Create Internal Network
net = cls._create_network(cls.INT_NET_NAME)
cls.INT_NET_ID = net.id
sub = cls._create_subnet(cls.INT_SUB_NAME, cls.INT_NET_ID,
cls.INT_CIDR)
cls.INT_SUB_ID = sub.id
net = self._create_network(self.INT_NET_NAME)
self.INT_NET_ID = net.id
sub = self._create_subnet(
self.INT_SUB_NAME, self.INT_NET_ID, self.INT_CIDR)
self.INT_SUB_ID = sub.id
# Create Router
args = {'external_gateway_info': {'network_id': cls.EXT_NET_ID}}
sot = cls.conn.network.create_router(name=cls.ROT_NAME, **args)
args = {'external_gateway_info': {'network_id': self.EXT_NET_ID}}
sot = self.conn.network.create_router(name=self.ROT_NAME, **args)
assert isinstance(sot, router.Router)
cls.assertIs(cls.ROT_NAME, sot.name)
cls.ROT_ID = sot.id
cls.ROT = sot
self.assertEqual(self.ROT_NAME, sot.name)
self.ROT_ID = sot.id
self.ROT = sot
# Add Router's Interface to Internal Network
sot = cls.ROT.add_interface(cls.conn.session, subnet_id=cls.INT_SUB_ID)
cls.assertIs(sot['subnet_id'], cls.INT_SUB_ID)
sot = self.ROT.add_interface(
self.conn.session, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
# Create Port in Internal Network
prt = cls.conn.network.create_port(network_id=cls.INT_NET_ID)
prt = self.conn.network.create_port(network_id=self.INT_NET_ID)
assert isinstance(prt, port.Port)
cls.PORT_ID = prt.id
self.PORT_ID = prt.id
# Create Floating IP.
fip = cls.conn.network.create_ip(floating_network_id=cls.EXT_NET_ID)
fip = self.conn.network.create_ip(floating_network_id=self.EXT_NET_ID)
assert isinstance(fip, floating_ip.FloatingIP)
cls.FIP = fip
self.FIP = fip
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_ip(cls.FIP.id, ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_port(cls.PORT_ID, ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.ROT.remove_interface(cls.conn.session,
subnet_id=cls.INT_SUB_ID)
cls.assertIs(sot['subnet_id'], cls.INT_SUB_ID)
sot = cls.conn.network.delete_router(cls.ROT_ID, ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_subnet(cls.EXT_SUB_ID,
ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.EXT_NET_ID,
ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_subnet(cls.INT_SUB_ID,
ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.INT_NET_ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_ip(self.FIP.id, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_port(self.PORT_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.ROT.remove_interface(
self.conn.session, subnet_id=self.INT_SUB_ID)
self.assertEqual(sot['subnet_id'], self.INT_SUB_ID)
sot = self.conn.network.delete_router(
self.ROT_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.EXT_SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.EXT_NET_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.INT_SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.INT_NET_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestFloatingIP, self).tearDown()
@classmethod
def _create_network(cls, name, **args):
cls.name = name
net = cls.conn.network.create_network(name=name, **args)
def _create_network(self, name, **args):
self.name = name
net = self.conn.network.create_network(name=name, **args)
assert isinstance(net, network.Network)
cls.assertIs(cls.name, net.name)
self.assertEqual(self.name, net.name)
return net
@classmethod
def _create_subnet(cls, name, net_id, cidr):
cls.name = name
cls.net_id = net_id
cls.cidr = cidr
sub = cls.conn.network.create_subnet(name=cls.name,
ip_version=cls.IPV4,
network_id=cls.net_id,
cidr=cls.cidr)
def _create_subnet(self, name, net_id, cidr):
self.name = name
self.net_id = net_id
self.cidr = cidr
sub = self.conn.network.create_subnet(
name=self.name,
ip_version=self.IPV4,
network_id=self.net_id,
cidr=self.cidr)
assert isinstance(sub, subnet.Subnet)
cls.assertIs(cls.name, sub.name)
self.assertEqual(self.name, sub.name)
return sub
def test_find_by_id(self):

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.tests.functional import base
@ -40,21 +39,20 @@ def delete_network(conn, network, subnet):
class TestNetwork(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
ID = None
@classmethod
def setUpClass(cls):
super(TestNetwork, cls).setUpClass()
sot = cls.conn.network.create_network(name=cls.NAME)
def setUp(self):
super(TestNetwork, self).setUp()
self.NAME = self.getUniqueString()
sot = self.conn.network.create_network(name=self.NAME)
assert isinstance(sot, network.Network)
cls.assertIs(cls.NAME, sot.name)
cls.ID = sot.id
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_network(cls.ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_network(self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestNetwork, self).tearDown()
def test_find(self):
sot = self.conn.network.find_network(self.NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.network.v2 import port
@ -20,44 +19,45 @@ from openstack.tests.functional import base
class TestNetworkIPAvailability(base.BaseFunctionalTest):
NET_NAME = uuid.uuid4().hex
SUB_NAME = uuid.uuid4().hex
PORT_NAME = uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
IPV4 = 4
CIDR = "10.100.0.0/24"
NET_ID = None
SUB_ID = None
PORT_ID = None
@classmethod
def setUpClass(cls):
super(TestNetworkIPAvailability, cls).setUpClass()
net = cls.conn.network.create_network(name=cls.NET_NAME)
def setUp(self):
super(TestNetworkIPAvailability, self).setUp()
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
self.PORT_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
cls.assertIs(cls.NET_NAME, net.name)
cls.NET_ID = net.id
sub = cls.conn.network.create_subnet(name=cls.SUB_NAME,
ip_version=cls.IPV4,
network_id=cls.NET_ID,
cidr=cls.CIDR)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
cidr=self.CIDR)
assert isinstance(sub, subnet.Subnet)
cls.assertIs(cls.SUB_NAME, sub.name)
cls.SUB_ID = sub.id
prt = cls.conn.network.create_port(name=cls.PORT_NAME,
network_id=cls.NET_ID)
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
prt = self.conn.network.create_port(
name=self.PORT_NAME,
network_id=self.NET_ID)
assert isinstance(prt, port.Port)
cls.assertIs(cls.PORT_NAME, prt.name)
cls.PORT_ID = prt.id
self.assertEqual(self.PORT_NAME, prt.name)
self.PORT_ID = prt.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_port(cls.PORT_ID)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_subnet(cls.SUB_ID)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.NET_ID)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_port(self.PORT_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(self.SUB_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(self.NET_ID)
self.assertIsNone(sot)
super(TestNetworkIPAvailability, self).tearDown()
def test_find(self):
sot = self.conn.network.find_network_ip_availability(self.NET_ID)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.network.v2 import port
@ -20,44 +19,48 @@ from openstack.tests.functional import base
class TestPort(base.BaseFunctionalTest):
NET_NAME = uuid.uuid4().hex
SUB_NAME = uuid.uuid4().hex
PORT_NAME = uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
IPV4 = 4
CIDR = "10.100.0.0/24"
NET_ID = None
SUB_ID = None
PORT_ID = None
@classmethod
def setUpClass(cls):
super(TestPort, cls).setUpClass()
net = cls.conn.network.create_network(name=cls.NET_NAME)
def setUp(self):
super(TestPort, self).setUp()
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
self.PORT_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
cls.assertIs(cls.NET_NAME, net.name)
cls.NET_ID = net.id
sub = cls.conn.network.create_subnet(name=cls.SUB_NAME,
ip_version=cls.IPV4,
network_id=cls.NET_ID,
cidr=cls.CIDR)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
cidr=self.CIDR)
assert isinstance(sub, subnet.Subnet)
cls.assertIs(cls.SUB_NAME, sub.name)
cls.SUB_ID = sub.id
prt = cls.conn.network.create_port(name=cls.PORT_NAME,
network_id=cls.NET_ID)
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
prt = self.conn.network.create_port(
name=self.PORT_NAME,
network_id=self.NET_ID)
assert isinstance(prt, port.Port)
cls.assertIs(cls.PORT_NAME, prt.name)
cls.PORT_ID = prt.id
self.assertEqual(self.PORT_NAME, prt.name)
self.PORT_ID = prt.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_port(cls.PORT_ID, ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_subnet(cls.SUB_ID, ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.NET_ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_port(
self.PORT_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestPort, self).tearDown()
def test_find(self):
sot = self.conn.network.find_port(self.PORT_NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import (qos_bandwidth_limit_rule as
_qos_bandwidth_limit_rule)
@ -20,10 +19,8 @@ from openstack.tests.functional import base
class TestQoSBandwidthLimitRule(base.BaseFunctionalTest):
QOS_POLICY_ID = None
QOS_POLICY_NAME = uuid.uuid4().hex
QOS_IS_SHARED = False
QOS_POLICY_DESCRIPTION = "QoS policy description"
RULE_ID = uuid.uuid4().hex
RULE_MAX_KBPS = 1500
RULE_MAX_KBPS_NEW = 1800
RULE_MAX_BURST_KBPS = 1100
@ -31,35 +28,36 @@ class TestQoSBandwidthLimitRule(base.BaseFunctionalTest):
RULE_DIRECTION = 'egress'
RULE_DIRECTION_NEW = 'ingress'
@classmethod
def setUpClass(cls):
super(TestQoSBandwidthLimitRule, cls).setUpClass()
qos_policy = cls.conn.network.create_qos_policy(
description=cls.QOS_POLICY_DESCRIPTION,
name=cls.QOS_POLICY_NAME,
shared=cls.QOS_IS_SHARED,
def setUp(self):
super(TestQoSBandwidthLimitRule, self).setUp()
self.QOS_POLICY_NAME = self.getUniqueString()
self.RULE_ID = self.getUniqueString()
qos_policy = self.conn.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.QOS_IS_SHARED,
)
cls.QOS_POLICY_ID = qos_policy.id
qos_rule = cls.conn.network.create_qos_bandwidth_limit_rule(
cls.QOS_POLICY_ID, max_kbps=cls.RULE_MAX_KBPS,
max_burst_kbps=cls.RULE_MAX_BURST_KBPS,
direction=cls.RULE_DIRECTION,
self.QOS_POLICY_ID = qos_policy.id
qos_rule = self.conn.network.create_qos_bandwidth_limit_rule(
self.QOS_POLICY_ID, max_kbps=self.RULE_MAX_KBPS,
max_burst_kbps=self.RULE_MAX_BURST_KBPS,
direction=self.RULE_DIRECTION,
)
assert isinstance(qos_rule,
_qos_bandwidth_limit_rule.QoSBandwidthLimitRule)
cls.assertIs(cls.RULE_MAX_KBPS, qos_rule.max_kbps)
cls.assertIs(cls.RULE_MAX_BURST_KBPS, qos_rule.max_burst_kbps)
cls.assertIs(cls.RULE_DIRECTION, qos_rule.direction)
cls.RULE_ID = qos_rule.id
self.assertEqual(self.RULE_MAX_KBPS, qos_rule.max_kbps)
self.assertEqual(self.RULE_MAX_BURST_KBPS, qos_rule.max_burst_kbps)
self.assertEqual(self.RULE_DIRECTION, qos_rule.direction)
self.RULE_ID = qos_rule.id
@classmethod
def tearDownClass(cls):
rule = cls.conn.network.delete_qos_minimum_bandwidth_rule(
cls.RULE_ID,
cls.QOS_POLICY_ID)
qos_policy = cls.conn.network.delete_qos_policy(cls.QOS_POLICY_ID)
cls.assertIs(None, rule)
cls.assertIs(None, qos_policy)
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSBandwidthLimitRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_bandwidth_limit_rule(

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import (qos_dscp_marking_rule as
_qos_dscp_marking_rule)
@ -20,37 +19,36 @@ from openstack.tests.functional import base
class TestQoSDSCPMarkingRule(base.BaseFunctionalTest):
QOS_POLICY_ID = None
QOS_POLICY_NAME = uuid.uuid4().hex
QOS_IS_SHARED = False
QOS_POLICY_DESCRIPTION = "QoS policy description"
RULE_ID = uuid.uuid4().hex
RULE_DSCP_MARK = 36
RULE_DSCP_MARK_NEW = 40
@classmethod
def setUpClass(cls):
super(TestQoSDSCPMarkingRule, cls).setUpClass()
qos_policy = cls.conn.network.create_qos_policy(
description=cls.QOS_POLICY_DESCRIPTION,
name=cls.QOS_POLICY_NAME,
shared=cls.QOS_IS_SHARED,
def setUp(self):
super(TestQoSDSCPMarkingRule, self).setUp()
self.QOS_POLICY_NAME = self.getUniqueString()
self.RULE_ID = self.getUniqueString()
qos_policy = self.conn.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.QOS_IS_SHARED,
)
cls.QOS_POLICY_ID = qos_policy.id
qos_rule = cls.conn.network.create_qos_dscp_marking_rule(
cls.QOS_POLICY_ID, dscp_mark=cls.RULE_DSCP_MARK,
self.QOS_POLICY_ID = qos_policy.id
qos_rule = self.conn.network.create_qos_dscp_marking_rule(
self.QOS_POLICY_ID, dscp_mark=self.RULE_DSCP_MARK,
)
assert isinstance(qos_rule, _qos_dscp_marking_rule.QoSDSCPMarkingRule)
cls.assertIs(cls.RULE_DSCP_MARK, qos_rule.dscp_mark)
cls.RULE_ID = qos_rule.id
self.assertEqual(self.RULE_DSCP_MARK, qos_rule.dscp_mark)
self.RULE_ID = qos_rule.id
@classmethod
def tearDownClass(cls):
rule = cls.conn.network.delete_qos_minimum_bandwidth_rule(
cls.RULE_ID,
cls.QOS_POLICY_ID)
qos_policy = cls.conn.network.delete_qos_policy(cls.QOS_POLICY_ID)
cls.assertIs(None, rule)
cls.assertIs(None, qos_policy)
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSDSCPMarkingRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_dscp_marking_rule(

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import (qos_minimum_bandwidth_rule as
_qos_minimum_bandwidth_rule)
@ -20,7 +19,6 @@ from openstack.tests.functional import base
class TestQoSMinimumBandwidthRule(base.BaseFunctionalTest):
QOS_POLICY_ID = None
QOS_POLICY_NAME = uuid.uuid4().hex
QOS_IS_SHARED = False
QOS_POLICY_DESCRIPTION = "QoS policy description"
RULE_ID = None
@ -28,33 +26,33 @@ class TestQoSMinimumBandwidthRule(base.BaseFunctionalTest):
RULE_MIN_KBPS_NEW = 1800
RULE_DIRECTION = 'egress'
@classmethod
def setUpClass(cls):
super(TestQoSMinimumBandwidthRule, cls).setUpClass()
qos_policy = cls.conn.network.create_qos_policy(
description=cls.QOS_POLICY_DESCRIPTION,
name=cls.QOS_POLICY_NAME,
shared=cls.QOS_IS_SHARED,
def setUp(self):
super(TestQoSMinimumBandwidthRule, self).setUp()
self.QOS_POLICY_NAME = self.getUniqueString()
qos_policy = self.conn.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.QOS_IS_SHARED,
)
cls.QOS_POLICY_ID = qos_policy.id
qos_min_bw_rule = cls.conn.network.create_qos_minimum_bandwidth_rule(
cls.QOS_POLICY_ID, direction=cls.RULE_DIRECTION,
min_kbps=cls.RULE_MIN_KBPS,
self.QOS_POLICY_ID = qos_policy.id
qos_min_bw_rule = self.conn.network.create_qos_minimum_bandwidth_rule(
self.QOS_POLICY_ID, direction=self.RULE_DIRECTION,
min_kbps=self.RULE_MIN_KBPS,
)
assert isinstance(qos_min_bw_rule,
_qos_minimum_bandwidth_rule.QoSMinimumBandwidthRule)
cls.assertIs(cls.RULE_MIN_KBPS, qos_min_bw_rule.min_kbps)
cls.assertIs(cls.RULE_DIRECTION, qos_min_bw_rule.direction)
cls.RULE_ID = qos_min_bw_rule.id
self.assertEqual(self.RULE_MIN_KBPS, qos_min_bw_rule.min_kbps)
self.assertEqual(self.RULE_DIRECTION, qos_min_bw_rule.direction)
self.RULE_ID = qos_min_bw_rule.id
@classmethod
def tearDownClass(cls):
rule = cls.conn.network.delete_qos_minimum_bandwidth_rule(
cls.RULE_ID,
cls.QOS_POLICY_ID)
qos_policy = cls.conn.network.delete_qos_policy(cls.QOS_POLICY_ID)
cls.assertIs(None, rule)
cls.assertIs(None, qos_policy)
def tearDown(self):
rule = self.conn.network.delete_qos_minimum_bandwidth_rule(
self.RULE_ID,
self.QOS_POLICY_ID)
qos_policy = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(rule)
self.assertIsNone(qos_policy)
super(TestQoSMinimumBandwidthRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_minimum_bandwidth_rule(

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import qos_policy as _qos_policy
from openstack.tests.functional import base
@ -19,30 +18,29 @@ from openstack.tests.functional import base
class TestQoSPolicy(base.BaseFunctionalTest):
QOS_POLICY_ID = None
QOS_POLICY_NAME = uuid.uuid4().hex
QOS_POLICY_NAME_UPDATED = uuid.uuid4().hex
IS_SHARED = False
IS_DEFAULT = False
RULES = []
QOS_POLICY_DESCRIPTION = "QoS policy description"
@classmethod
def setUpClass(cls):
super(TestQoSPolicy, cls).setUpClass()
qos = cls.conn.network.create_qos_policy(
description=cls.QOS_POLICY_DESCRIPTION,
name=cls.QOS_POLICY_NAME,
shared=cls.IS_SHARED,
is_default=cls.IS_DEFAULT,
def setUp(self):
super(TestQoSPolicy, self).setUp()
self.QOS_POLICY_NAME = self.getUniqueString()
self.QOS_POLICY_NAME_UPDATED = self.getUniqueString()
qos = self.conn.network.create_qos_policy(
description=self.QOS_POLICY_DESCRIPTION,
name=self.QOS_POLICY_NAME,
shared=self.IS_SHARED,
is_default=self.IS_DEFAULT,
)
assert isinstance(qos, _qos_policy.QoSPolicy)
cls.assertIs(cls.QOS_POLICY_NAME, qos.name)
cls.QOS_POLICY_ID = qos.id
self.assertEqual(self.QOS_POLICY_NAME, qos.name)
self.QOS_POLICY_ID = qos.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_qos_policy(cls.QOS_POLICY_ID)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_qos_policy(self.QOS_POLICY_ID)
self.assertIsNone(sot)
super(TestQoSPolicy, self).tearDown()
def test_find(self):
sot = self.conn.network.find_qos_policy(self.QOS_POLICY_NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.network.v2 import rbac_policy
@ -19,37 +18,38 @@ from openstack.tests.functional import base
class TestRBACPolicy(base.BaseFunctionalTest):
NET_NAME = 'net-' + uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
ACTION = 'access_as_shared'
OBJ_TYPE = 'network'
TARGET_TENANT_ID = '*'
NET_ID = None
ID = None
@classmethod
def setUpClass(cls):
super(TestRBACPolicy, cls).setUpClass()
net = cls.conn.network.create_network(name=cls.NET_NAME)
def setUp(self):
super(TestRBACPolicy, self).setUp()
self.NET_NAME = self.getUniqueString('net')
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
cls.NET_ID = net.id
self.NET_ID = net.id
sot = cls.conn.network.\
create_rbac_policy(action=cls.ACTION,
object_type=cls.OBJ_TYPE,
target_tenant=cls.TARGET_TENANT_ID,
object_id=cls.NET_ID)
sot = self.conn.network.create_rbac_policy(
action=self.ACTION,
object_type=self.OBJ_TYPE,
target_tenant=self.TARGET_TENANT_ID,
object_id=self.NET_ID)
assert isinstance(sot, rbac_policy.RBACPolicy)
cls.ID = sot.id
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_rbac_policy(cls.ID,
ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.NET_ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_rbac_policy(
self.ID,
ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestRBACPolicy, self).tearDown()
def test_find(self):
sot = self.conn.network.find_rbac_policy(self.ID)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import router
from openstack.tests.functional import base
@ -18,22 +17,21 @@ from openstack.tests.functional import base
class TestRouter(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
ID = None
@classmethod
def setUpClass(cls):
super(TestRouter, cls).setUpClass()
sot = cls.conn.network.create_router(name=cls.NAME)
def setUp(self):
super(TestRouter, self).setUp()
self.NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
sot = self.conn.network.create_router(name=self.NAME)
assert isinstance(sot, router.Router)
cls.assertIs(cls.NAME, sot.name)
cls.ID = sot.id
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_router(cls.ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_router(self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestRouter, self).tearDown()
def test_find(self):
sot = self.conn.network.find_router(self.NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.network.v2 import router
@ -20,9 +19,6 @@ from openstack.tests.functional import base
class TestRouterInterface(base.BaseFunctionalTest):
ROUTER_NAME = uuid.uuid4().hex
NET_NAME = uuid.uuid4().hex
SUB_NAME = uuid.uuid4().hex
CIDR = "10.100.0.0/16"
IPV4 = 4
ROUTER_ID = None
@ -30,42 +26,45 @@ class TestRouterInterface(base.BaseFunctionalTest):
SUB_ID = None
ROT = None
@classmethod
def setUpClass(cls):
super(TestRouterInterface, cls).setUpClass()
sot = cls.conn.network.create_router(name=cls.ROUTER_NAME)
def setUp(self):
super(TestRouterInterface, self).setUp()
self.ROUTER_NAME = self.getUniqueString()
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
sot = self.conn.network.create_router(name=self.ROUTER_NAME)
assert isinstance(sot, router.Router)
cls.assertIs(cls.ROUTER_NAME, sot.name)
net = cls.conn.network.create_network(name=cls.NET_NAME)
self.assertEqual(self.ROUTER_NAME, sot.name)
net = self.conn.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
cls.assertIs(cls.NET_NAME, net.name)
sub = cls.conn.network.create_subnet(name=cls.SUB_NAME,
ip_version=cls.IPV4,
network_id=net.id,
cidr=cls.CIDR)
self.assertEqual(self.NET_NAME, net.name)
sub = self.conn.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=net.id,
cidr=self.CIDR)
assert isinstance(sub, subnet.Subnet)
cls.assertIs(cls.SUB_NAME, sub.name)
cls.ROUTER_ID = sot.id
cls.ROT = sot
cls.NET_ID = net.id
cls.SUB_ID = sub.id
self.assertEqual(self.SUB_NAME, sub.name)
self.ROUTER_ID = sot.id
self.ROT = sot
self.NET_ID = net.id
self.SUB_ID = sub.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_router(cls.ROUTER_ID,
ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_subnet(cls.SUB_ID, ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.NET_ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_router(
self.ROUTER_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_subnet(
self.SUB_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestRouterInterface, self).tearDown()
def test_router_add_interface(self):
def test_router_add_remove_interface(self):
iface = self.ROT.add_interface(self.conn.session,
subnet_id=self.SUB_ID)
self._verification(iface)
def test_router_remove_interface(self):
iface = self.ROT.remove_interface(self.conn.session,
subnet_id=self.SUB_ID)
self._verification(iface)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import security_group
from openstack.tests.functional import base
@ -18,22 +17,21 @@ from openstack.tests.functional import base
class TestSecurityGroup(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
ID = None
@classmethod
def setUpClass(cls):
super(TestSecurityGroup, cls).setUpClass()
sot = cls.conn.network.create_security_group(name=cls.NAME)
def setUp(self):
super(TestSecurityGroup, self).setUp()
self.NAME = self.getUniqueString()
sot = self.conn.network.create_security_group(name=self.NAME)
assert isinstance(sot, security_group.SecurityGroup)
cls.assertIs(cls.NAME, sot.name)
cls.ID = sot.id
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_security_group(cls.ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_security_group(
self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestSecurityGroup, self).tearDown()
def test_find(self):
sot = self.conn.network.find_security_group(self.NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import security_group
from openstack.network.v2 import security_group_rule
@ -19,7 +18,6 @@ from openstack.tests.functional import base
class TestSecurityGroupRule(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
IPV4 = 'IPv4'
PROTO = 'tcp'
PORT = 22
@ -27,29 +25,29 @@ class TestSecurityGroupRule(base.BaseFunctionalTest):
ID = None
RULE_ID = None
@classmethod
def setUpClass(cls):
super(TestSecurityGroupRule, cls).setUpClass()
sot = cls.conn.network.create_security_group(name=cls.NAME)
def setUp(self):
super(TestSecurityGroupRule, self).setUp()
self.NAME = self.getUniqueString()
sot = self.conn.network.create_security_group(name=self.NAME)
assert isinstance(sot, security_group.SecurityGroup)
cls.assertIs(cls.NAME, sot.name)
cls.ID = sot.id
rul = cls.conn.network.create_security_group_rule(
direction=cls.DIR, ethertype=cls.IPV4,
port_range_max=cls.PORT, port_range_min=cls.PORT,
protocol=cls.PROTO, security_group_id=cls.ID)
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
rul = self.conn.network.create_security_group_rule(
direction=self.DIR, ethertype=self.IPV4,
port_range_max=self.PORT, port_range_min=self.PORT,
protocol=self.PROTO, security_group_id=self.ID)
assert isinstance(rul, security_group_rule.SecurityGroupRule)
cls.assertIs(cls.ID, rul.security_group_id)
cls.RULE_ID = rul.id
self.assertEqual(self.ID, rul.security_group_id)
self.RULE_ID = rul.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_security_group_rule(cls.RULE_ID,
ignore_missing=False)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_security_group(cls.ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_security_group_rule(
self.RULE_ID, ignore_missing=False)
self.assertIsNone(sot)
sot = self.conn.network.delete_security_group(
self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestSecurityGroupRule, self).tearDown()
def test_find(self):
sot = self.conn.network.find_security_group_rule(self.RULE_ID)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.network.v2 import segment
@ -19,7 +18,6 @@ from openstack.tests.functional import base
class TestSegment(base.BaseFunctionalTest):
NETWORK_NAME = uuid.uuid4().hex
NETWORK_TYPE = None
PHYSICAL_NETWORK = None
SEGMENTATION_ID = None
@ -27,88 +25,75 @@ class TestSegment(base.BaseFunctionalTest):
SEGMENT_ID = None
SEGMENT_EXTENSION = None
@classmethod
def setUpClass(cls):
super(TestSegment, cls).setUpClass()
def setUp(self):
super(TestSegment, self).setUp()
self.NETWORK_NAME = self.getUniqueString()
# NOTE(rtheis): The segment extension is not yet enabled by default.
# Skip the tests if not enabled.
cls.SEGMENT_EXTENSION = cls.conn.network.find_extension('segment')
if not self.conn.network.find_extension('segment'):
self.skipTest('Segment extension disabled')
# Create a network to hold the segment.
net = cls.conn.network.create_network(name=cls.NETWORK_NAME)
net = self.conn.network.create_network(name=self.NETWORK_NAME)
assert isinstance(net, network.Network)
cls.assertIs(cls.NETWORK_NAME, net.name)
cls.NETWORK_ID = net.id
self.assertEqual(self.NETWORK_NAME, net.name)
self.NETWORK_ID = net.id
if cls.SEGMENT_EXTENSION:
if self.SEGMENT_EXTENSION:
# Get the segment for the network.
for seg in cls.conn.network.segments():
for seg in self.conn.network.segments():
assert isinstance(seg, segment.Segment)
if cls.NETWORK_ID == seg.network_id:
cls.NETWORK_TYPE = seg.network_type
cls.PHYSICAL_NETWORK = seg.physical_network
cls.SEGMENTATION_ID = seg.segmentation_id
cls.SEGMENT_ID = seg.id
if self.NETWORK_ID == seg.network_id:
self.NETWORK_TYPE = seg.network_type
self.PHYSICAL_NETWORK = seg.physical_network
self.SEGMENTATION_ID = seg.segmentation_id
self.SEGMENT_ID = seg.id
break
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_network(cls.NETWORK_ID,
ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_network(
self.NETWORK_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestSegment, self).tearDown()
def test_create_delete(self):
if self.SEGMENT_EXTENSION:
sot = self.conn.network.create_segment(
description='test description',
name='test name',
network_id=self.NETWORK_ID,
network_type='geneve',
segmentation_id=2055,
)
self.assertIsInstance(sot, segment.Segment)
del_sot = self.conn.network.delete_segment(sot.id)
self.assertEqual('test description', sot.description)
self.assertEqual('test name', sot.name)
self.assertEqual(self.NETWORK_ID, sot.network_id)
self.assertEqual('geneve', sot.network_type)
self.assertIsNone(sot.physical_network)
self.assertEqual(2055, sot.segmentation_id)
self.assertIsNone(del_sot)
else:
self.skipTest('Segment extension disabled')
sot = self.conn.network.create_segment(
description='test description',
name='test name',
network_id=self.NETWORK_ID,
network_type='geneve',
segmentation_id=2055,
)
self.assertIsInstance(sot, segment.Segment)
del_sot = self.conn.network.delete_segment(sot.id)
self.assertEqual('test description', sot.description)
self.assertEqual('test name', sot.name)
self.assertEqual(self.NETWORK_ID, sot.network_id)
self.assertEqual('geneve', sot.network_type)
self.assertIsNone(sot.physical_network)
self.assertEqual(2055, sot.segmentation_id)
self.assertIsNone(del_sot)
def test_find(self):
if self.SEGMENT_EXTENSION:
sot = self.conn.network.find_segment(self.SEGMENT_ID)
self.assertEqual(self.SEGMENT_ID, sot.id)
else:
self.skipTest('Segment extension disabled')
sot = self.conn.network.find_segment(self.SEGMENT_ID)
self.assertEqual(self.SEGMENT_ID, sot.id)
def test_get(self):
if self.SEGMENT_EXTENSION:
sot = self.conn.network.get_segment(self.SEGMENT_ID)
self.assertEqual(self.SEGMENT_ID, sot.id)
self.assertIsNone(sot.name)
self.assertEqual(self.NETWORK_ID, sot.network_id)
self.assertEqual(self.NETWORK_TYPE, sot.network_type)
self.assertEqual(self.PHYSICAL_NETWORK, sot.physical_network)
self.assertEqual(self.SEGMENTATION_ID, sot.segmentation_id)
else:
self.skipTest('Segment extension disabled')
sot = self.conn.network.get_segment(self.SEGMENT_ID)
self.assertEqual(self.SEGMENT_ID, sot.id)
self.assertIsNone(sot.name)
self.assertEqual(self.NETWORK_ID, sot.network_id)
self.assertEqual(self.NETWORK_TYPE, sot.network_type)
self.assertEqual(self.PHYSICAL_NETWORK, sot.physical_network)
self.assertEqual(self.SEGMENTATION_ID, sot.segmentation_id)
def test_list(self):
if self.SEGMENT_EXTENSION:
ids = [o.id for o in self.conn.network.segments(name=None)]
self.assertIn(self.SEGMENT_ID, ids)
else:
self.skipTest('Segment extension disabled')
ids = [o.id for o in self.conn.network.segments(name=None)]
self.assertIn(self.SEGMENT_ID, ids)
def test_update(self):
if self.SEGMENT_EXTENSION:
sot = self.conn.network.update_segment(self.SEGMENT_ID,
description='update')
self.assertEqual('update', sot.description)
else:
self.skipTest('Segment extension disabled')
sot = self.conn.network.update_segment(self.SEGMENT_ID,
description='update')
self.assertEqual('update', sot.description)

View File

@ -21,25 +21,25 @@ class TestServiceProfile(base.BaseFunctionalTest):
METAINFO = "FlAVOR_PROFILE_METAINFO"
ID = None
@classmethod
def setUpClass(cls):
super(TestServiceProfile, cls).setUpClass()
service_profiles = cls.conn.network.create_service_profile(
description=cls.SERVICE_PROFILE_DESCRIPTION,
metainfo=cls.METAINFO,)
def setUp(self):
super(TestServiceProfile, self).setUp()
service_profiles = self.conn.network.create_service_profile(
description=self.SERVICE_PROFILE_DESCRIPTION,
metainfo=self.METAINFO,)
assert isinstance(service_profiles, _service_profile.ServiceProfile)
cls.assertIs(cls.SERVICE_PROFILE_DESCRIPTION,
service_profiles.description)
cls.assertIs(cls.METAINFO, service_profiles.meta_info)
self.assertEqual(
self.SERVICE_PROFILE_DESCRIPTION,
service_profiles.description)
self.assertEqual(self.METAINFO, service_profiles.meta_info)
cls.ID = service_profiles.id
self.ID = service_profiles.id
@classmethod
def tearDownClass(cls):
service_profiles = cls.conn.network.delete_service_profile(
cls.ID,
def tearDown(self):
service_profiles = self.conn.network.delete_service_profile(
self.ID,
ignore_missing=True)
cls.assertIs(None, service_profiles)
self.assertIsNone(service_profiles)
super(TestServiceProfile, self).tearDown()
def test_find(self):
service_profiles = self.conn.network.find_service_profile(

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import network
from openstack.network.v2 import subnet
@ -19,9 +18,6 @@ from openstack.tests.functional import base
class TestSubnet(base.BaseFunctionalTest):
NET_NAME = uuid.uuid4().hex
SUB_NAME = uuid.uuid4().hex
UPDATE_NAME = uuid.uuid4().hex
IPV4 = 4
CIDR = "10.100.0.0/24"
DNS_SERVERS = ["8.8.4.4", "8.8.8.8"]
@ -30,30 +26,34 @@ class TestSubnet(base.BaseFunctionalTest):
NET_ID = None
SUB_ID = None
@classmethod
def setUpClass(cls):
super(TestSubnet, cls).setUpClass()
net = cls.conn.network.create_network(name=cls.NET_NAME)
def setUp(self):
super(TestSubnet, self).setUp()
self.NET_NAME = self.getUniqueString()
self.SUB_NAME = self.getUniqueString()
self.UPDATE_NAME = self.getUniqueString()
net = self.conn.network.create_network(name=self.NET_NAME)
assert isinstance(net, network.Network)
cls.assertIs(cls.NET_NAME, net.name)
cls.NET_ID = net.id
sub = cls.conn.network.create_subnet(name=cls.SUB_NAME,
ip_version=cls.IPV4,
network_id=cls.NET_ID,
cidr=cls.CIDR,
dns_nameservers=cls.DNS_SERVERS,
allocation_pools=cls.POOL,
host_routes=cls.ROUTES)
self.assertEqual(self.NET_NAME, net.name)
self.NET_ID = net.id
sub = self.conn.network.create_subnet(
name=self.SUB_NAME,
ip_version=self.IPV4,
network_id=self.NET_ID,
cidr=self.CIDR,
dns_nameservers=self.DNS_SERVERS,
allocation_pools=self.POOL,
host_routes=self.ROUTES)
assert isinstance(sub, subnet.Subnet)
cls.assertIs(cls.SUB_NAME, sub.name)
cls.SUB_ID = sub.id
self.assertEqual(self.SUB_NAME, sub.name)
self.SUB_ID = sub.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_subnet(cls.SUB_ID)
cls.assertIs(None, sot)
sot = cls.conn.network.delete_network(cls.NET_ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_subnet(self.SUB_ID)
self.assertIsNone(sot)
sot = self.conn.network.delete_network(
self.NET_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestSubnet, self).tearDown()
def test_find(self):
sot = self.conn.network.find_subnet(self.SUB_NAME)

View File

@ -10,7 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstack.network.v2 import subnet_pool as _subnet_pool
from openstack.tests.functional import base
@ -18,8 +17,6 @@ from openstack.tests.functional import base
class TestSubnetPool(base.BaseFunctionalTest):
SUBNET_POOL_NAME = uuid.uuid4().hex
SUBNET_POOL_NAME_UPDATED = uuid.uuid4().hex
SUBNET_POOL_ID = None
MINIMUM_PREFIX_LENGTH = 8
DEFAULT_PREFIX_LENGTH = 24
@ -29,25 +26,26 @@ class TestSubnetPool(base.BaseFunctionalTest):
IP_VERSION = 4
PREFIXES = ['10.100.0.0/24', '10.101.0.0/24']
@classmethod
def setUpClass(cls):
super(TestSubnetPool, cls).setUpClass()
subnet_pool = cls.conn.network.create_subnet_pool(
name=cls.SUBNET_POOL_NAME,
min_prefixlen=cls.MINIMUM_PREFIX_LENGTH,
default_prefixlen=cls.DEFAULT_PREFIX_LENGTH,
max_prefixlen=cls.MAXIMUM_PREFIX_LENGTH,
default_quota=cls.DEFAULT_QUOTA,
shared=cls.IS_SHARED,
prefixes=cls.PREFIXES)
def setUp(self):
super(TestSubnetPool, self).setUp()
self.SUBNET_POOL_NAME = self.getUniqueString()
self.SUBNET_POOL_NAME_UPDATED = self.getUniqueString()
subnet_pool = self.conn.network.create_subnet_pool(
name=self.SUBNET_POOL_NAME,
min_prefixlen=self.MINIMUM_PREFIX_LENGTH,
default_prefixlen=self.DEFAULT_PREFIX_LENGTH,
max_prefixlen=self.MAXIMUM_PREFIX_LENGTH,
default_quota=self.DEFAULT_QUOTA,
shared=self.IS_SHARED,
prefixes=self.PREFIXES)
assert isinstance(subnet_pool, _subnet_pool.SubnetPool)
cls.assertIs(cls.SUBNET_POOL_NAME, subnet_pool.name)
cls.SUBNET_POOL_ID = subnet_pool.id
self.assertEqual(self.SUBNET_POOL_NAME, subnet_pool.name)
self.SUBNET_POOL_ID = subnet_pool.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.network.delete_subnet_pool(cls.SUBNET_POOL_ID)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.network.delete_subnet_pool(self.SUBNET_POOL_ID)
self.assertIsNone(sot)
super(TestSubnetPool, self).tearDown()
def test_find(self):
sot = self.conn.network.find_subnet_pool(self.SUBNET_POOL_NAME)

View File

@ -10,20 +10,19 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type='object-store'),
'Object Storage service does not exist')
class TestAccount(base.BaseFunctionalTest):
@classmethod
def tearDownClass(cls):
super(TestAccount, cls).tearDownClass()
account = cls.conn.object_store.get_account_metadata()
cls.conn.object_store.delete_account_metadata(account.metadata.keys())
def setUp(self):
super(TestAccount, self).setUp()
self.require_service('object-store')
def tearDown(self):
account = self.conn.object_store.get_account_metadata()
self.conn.object_store.delete_account_metadata(account.metadata.keys())
super(TestAccount, self).tearDown()
def test_system_metadata(self):
account = self.conn.object_store.get_account_metadata()

View File

@ -10,31 +10,23 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import uuid
from openstack.object_store.v1 import container as _container
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type='object-store'),
'Object Storage service does not exist')
class TestContainer(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
def setUp(self):
super(TestContainer, self).setUp()
self.require_service('object-store')
@classmethod
def setUpClass(cls):
super(TestContainer, cls).setUpClass()
container = cls.conn.object_store.create_container(name=cls.NAME)
self.NAME = self.getUniqueString()
container = self.conn.object_store.create_container(name=self.NAME)
self.addEmptyCleanup(
self.conn.object_store.delete_container,
self.NAME, ignore_missing=False)
assert isinstance(container, _container.Container)
cls.assertIs(cls.NAME, container.name)
@classmethod
def tearDownClass(cls):
result = cls.conn.object_store.delete_container(cls.NAME,
ignore_missing=False)
cls.assertIs(None, result)
self.assertEqual(self.NAME, container.name)
def test_list(self):
names = [o.name for o in self.conn.object_store.containers()]

View File

@ -10,32 +10,26 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import uuid
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type='object-store'),
'Object Storage service does not exist')
class TestObject(base.BaseFunctionalTest):
FOLDER = uuid.uuid4().hex
FILE = uuid.uuid4().hex
DATA = b'abc'
@classmethod
def setUpClass(cls):
super(TestObject, cls).setUpClass()
cls.conn.object_store.create_container(name=cls.FOLDER)
cls.sot = cls.conn.object_store.upload_object(
container=cls.FOLDER, name=cls.FILE, data=cls.DATA)
def setUp(self):
super(TestObject, self).setUp()
self.require_service('object-store')
@classmethod
def tearDownClass(cls):
super(TestObject, cls).tearDownClass()
cls.conn.object_store.delete_object(cls.sot, ignore_missing=False)
cls.conn.object_store.delete_container(cls.FOLDER)
self.FOLDER = self.getUniqueString()
self.FILE = self.getUniqueString()
self.conn.object_store.create_container(name=self.FOLDER)
self.addCleanup(self.conn.object_store.delete_container, self.FOLDER)
self.sot = self.conn.object_store.upload_object(
container=self.FOLDER, name=self.FILE, data=self.DATA)
self.addEmptyCleanup(
self.conn.object_store.delete_object, self.sot,
ignore_missing=False)
def test_list(self):
names = [o.name for o

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import time
import unittest
from openstack import exceptions
@ -19,8 +20,6 @@ from openstack.tests.functional.network.v2 import test_network
@unittest.skip("bug/1525005")
@unittest.skipUnless(base.service_exists(service_type='orchestration'),
'Orchestration service does not exist')
class TestStack(base.BaseFunctionalTest):
NAME = 'test_stack'
@ -29,48 +28,50 @@ class TestStack(base.BaseFunctionalTest):
subnet = None
cidr = '10.99.99.0/16'
@classmethod
def setUpClass(cls):
super(TestStack, cls).setUpClass()
if cls.conn.compute.find_keypair(cls.NAME) is None:
cls.conn.compute.create_keypair(name=cls.NAME)
image = next(cls.conn.image.images())
def setUp(self):
super(TestStack, self).setUp()
self.require_service('orchestration')
if self.conn.compute.find_keypair(self.NAME) is None:
self.conn.compute.create_keypair(name=self.NAME)
image = next(self.conn.image.images())
tname = "openstack/tests/functional/orchestration/v1/hello_world.yaml"
with open(tname) as f:
template = f.read()
cls.network, cls.subnet = test_network.create_network(cls.conn,
cls.NAME,
cls.cidr)
self.network, self.subnet = test_network.create_network(
self.conn,
self.NAME,
self.cidr)
parameters = {
'image': image.id,
'key_name': cls.NAME,
'network': cls.network.id,
'key_name': self.NAME,
'network': self.network.id,
}
sot = cls.conn.orchestration.create_stack(
name=cls.NAME,
sot = self.conn.orchestration.create_stack(
name=self.NAME,
parameters=parameters,
template=template,
)
assert isinstance(sot, stack.Stack)
cls.assertIs(True, (sot.id is not None))
cls.stack = sot
cls.assertIs(cls.NAME, sot.name)
cls.conn.orchestration.wait_for_status(
self.assertEqual(True, (sot.id is not None))
self.stack = sot
self.assertEqual(self.NAME, sot.name)
self.conn.orchestration.wait_for_status(
sot, status='CREATE_COMPLETE', failures=['CREATE_FAILED'])
@classmethod
def tearDownClass(cls):
super(TestStack, cls).tearDownClass()
cls.conn.orchestration.delete_stack(cls.stack, ignore_missing=False)
cls.conn.compute.delete_keypair(cls.NAME)
def tearDown(self):
self.conn.orchestration.delete_stack(self.stack, ignore_missing=False)
self.conn.compute.delete_keypair(self.NAME)
# Need to wait for the stack to go away before network delete
try:
cls.conn.orchestration.wait_for_status(
cls.stack, 'DELETE_COMPLETE')
self.conn.orchestration.wait_for_status(
self.stack, 'DELETE_COMPLETE')
except exceptions.NotFoundException:
pass
cls.linger_for_delete()
test_network.delete_network(cls.conn, cls.network, cls.subnet)
# TODO(shade) sleeping in tests is bad mmkay?
time.sleep(40)
test_network.delete_network(self.conn, self.network, self.subnet)
super(TestStack, self).tearDown()
def test_list(self):
names = [o.name for o in self.conn.orchestration.stacks()]

View File

@ -11,28 +11,25 @@
# under the License.
import unittest
import uuid
from openstack.telemetry.alarm.v2 import alarm
from openstack.tests.functional import base
@unittest.skip("bug/1524468")
@unittest.skipUnless(base.service_exists(service_type="alarming"),
"Alarming service does not exist")
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestAlarm(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
ID = None
@classmethod
def setUpClass(cls):
super(TestAlarm, cls).setUpClass()
meter = next(cls.conn.telemetry.meters())
sot = cls.conn.alarm.create_alarm(
name=cls.NAME,
def setUp(self):
super(TestAlarm, self).setUp()
self.require_service('alarming')
self.require_service('metering')
self.NAME = self.getUniqueString()
meter = next(self.conn.telemetry.meters())
sot = self.conn.alarm.create_alarm(
name=self.NAME,
type='threshold',
threshold_rule={
'meter_name': meter.name,
@ -40,13 +37,13 @@ class TestAlarm(base.BaseFunctionalTest):
},
)
assert isinstance(sot, alarm.Alarm)
cls.assertIs(cls.NAME, sot.name)
cls.ID = sot.id
self.assertEqual(self.NAME, sot.name)
self.ID = sot.id
@classmethod
def tearDownClass(cls):
sot = cls.conn.alarm.delete_alarm(cls.ID, ignore_missing=False)
cls.assertIs(None, sot)
def tearDown(self):
sot = self.conn.alarm.delete_alarm(self.ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestAlarm, self).tearDown()
def test_get(self):
sot = self.conn.alarm.get_alarm(self.ID)

View File

@ -11,38 +11,32 @@
# under the License.
import unittest
import uuid
from openstack.tests.functional import base
@unittest.skip("bug/1524468")
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
@unittest.skipUnless(base.service_exists(service_type="alarming"),
"Alarming service does not exist")
class TestAlarmChange(base.BaseFunctionalTest):
NAME = uuid.uuid4().hex
alarm = None
@classmethod
def setUpClass(cls):
super(TestAlarmChange, cls).setUpClass()
meter = next(cls.conn.telemetry.meters())
alarm = cls.conn.alarm.create_alarm(
name=cls.NAME,
def setUp(self):
super(TestAlarmChange, self).setUp()
self.require_service('alarming')
self.require_service('metering')
self.NAME = self.getUniqueString()
meter = next(self.conn.telemetry.meters())
self.alarm = self.conn.alarm.create_alarm(
name=self.NAME,
type='threshold',
threshold_rule={
'meter_name': meter.name,
'threshold': 1.1,
},
)
cls.alarm = alarm
@classmethod
def tearDownClass(cls):
cls.conn.alarm.delete_alarm(cls.alarm, ignore_missing=False)
self.addCleanup(
self.conn.alarm.delete_alarm, self.alarm, ignore_missing=False)
def test_list(self):
change = next(self.conn.alarm.alarm_changes(self.alarm))

View File

@ -10,15 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestCapability(base.BaseFunctionalTest):
def setUp(self):
super(TestCapability, self).setUp()
self.require_service('metering')
def test_list(self):
ids = [o.id for o in self.conn.telemetry.capabilities()]
self.assertIn('resources:query:simple', ids)

View File

@ -10,21 +10,20 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import uuid
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestMeter(base.BaseFunctionalTest):
def setUp(self):
super(TestMeter, self).setUp()
self.require_service('metering')
def test_list(self):
# TODO(thowe): Remove this in favor of create_meter call.
# Since we do not have a create meter method at the moment
# make sure there is some data in there
name = uuid.uuid4().hex
name = self.getUniqueString()
tainer = self.conn.object_store.create_container(name=name)
self.conn.object_store.delete_container(tainer)

View File

@ -10,15 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestResource(base.BaseFunctionalTest):
def setUp(self):
super(TestResource, self).setUp()
self.require_service('metering')
def test_list(self):
ids = [o.resource_id for o in self.conn.telemetry.resources()]
self.assertNotEqual(0, len(ids))

View File

@ -10,16 +10,16 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from openstack.telemetry.v2 import sample
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestSample(base.BaseFunctionalTest):
def setUp(self):
super(TestSample, self).setUp()
self.require_service('metering')
def test_list(self):
for meter in self.conn.telemetry.meters():
for sot in self.conn.telemetry.samples(meter):

View File

@ -10,15 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from openstack.tests.functional import base
@unittest.skipUnless(base.service_exists(service_type="metering"),
"Metering service does not exist")
class TestStatistics(base.BaseFunctionalTest):
def setUp(self):
super(TestStatistics, self).setUp()
self.require_service('metering')
def test_list(self):
for met in self.conn.telemetry.meters():
for stat in self.conn.telemetry.statistics(met):