Functional: Split python client functional testing case

This patch splits test_magnum_python_client.py to python_client_base.py
, test_magnum_python_client.py and test_k8s_python_client.py.

python_client_base.py: base class for python client testing.
test_magnum_python_client.py: common python clients testcases.
test_k8s_python_client.py.: k8s specified functional testing.

Futher, there would be swarm/mesos functional testing cases.

Partially implements: blueprint swarm-functional-testing
Change-Id: I1e3e89fd07f7dfe28baeefdc0558f986fd1376bf
This commit is contained in:
Eli Qiao 2015-09-16 11:16:23 +08:00
parent 59accc5e37
commit a89a5a32ad
5 changed files with 319 additions and 266 deletions

View File

@ -14,3 +14,4 @@ pass = secrete
image_id = fedora-21-atomic-5
nic_id = public
keypair_id = default
flavor_id = m1.magnum

View File

@ -84,6 +84,7 @@ region_name = $OS_REGION_NAME
image_id = $IMAGE_ID
nic_id = $NIC_ID
keypair_id = default
flavor_id = m1.magnum
EOF
# Create a keypair for use in the functional tests.

View File

@ -0,0 +1,187 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
test_magnum
----------------------------------
Tests for `magnum` module.
"""
import ConfigParser
import os
import time
import fixtures
from magnum.tests import base
from magnumclient.openstack.common.apiclient import exceptions
from magnumclient.openstack.common import cliutils
from magnumclient.v1 import client as v1client
class BaseMagnumClient(base.TestCase):
@classmethod
def setUpClass(cls):
# Collecting of credentials:
#
# Support the existence of a functional_creds.conf for
# testing. This makes it possible to use a config file.
user = cliutils.env('OS_USERNAME')
passwd = cliutils.env('OS_PASSWORD')
tenant = cliutils.env('OS_TENANT_NAME')
tenant_id = cliutils.env('OS_TENANT_ID')
auth_url = cliutils.env('OS_AUTH_URL')
region_name = cliutils.env('OS_REGION_NAME')
magnum_url = cliutils.env('BYPASS_URL')
image_id = cliutils.env('IMAGE_ID')
nic_id = cliutils.env('NIC_ID')
flavor_id = cliutils.env('FLAVOR_ID')
keypair_id = cliutils.env('KEYPAIR_ID')
config = ConfigParser.RawConfigParser()
if config.read('functional_creds.conf'):
# the OR pattern means the environment is preferred for
# override
user = user or config.get('admin', 'user')
passwd = passwd or config.get('admin', 'pass')
tenant = tenant or config.get('admin', 'tenant')
auth_url = auth_url or config.get('auth', 'auth_url')
magnum_url = magnum_url or config.get('auth', 'magnum_url')
image_id = image_id or config.get('magnum', 'image_id')
nic_id = nic_id or config.get('magnum', 'nic_id')
flavor_id = flavor_id or config.get('magnum', 'flavor_id')
keypair_id = keypair_id or config.get('magnum', 'keypair_id')
cls.image_id = image_id
cls.nic_id = nic_id
cls.flavor_id = flavor_id
cls.keypair_id = keypair_id
cls.cs = v1client.Client(username=user,
api_key=passwd,
project_id=tenant_id,
project_name=tenant,
auth_url=auth_url,
service_type='container',
region_name=region_name,
magnum_url=magnum_url)
@classmethod
def _wait_on_status(cls, bay, wait_status, finish_status):
# Check status every 60 seconds for a total of 100 minutes
for i in range(100):
# sleep 1s to wait bay status changes, this will be usefull for
# the first time we wait for the status, to avoid another 59s
time.sleep(1)
status = cls.cs.bays.get(bay.uuid).status
if status in wait_status:
time.sleep(59)
elif status in finish_status:
break
else:
raise Exception("Unknown Status : %s" % status)
@classmethod
def _create_baymodel(cls, name, coe='kubernetes'):
baymodel = cls.cs.baymodels.create(
name=name,
keypair_id=cls.keypair_id,
external_network_id=cls.nic_id,
image_id=cls.image_id,
flavor_id=cls.flavor_id,
docker_volume_size=1,
network_driver='flannel',
coe=coe,
)
return baymodel
@classmethod
def _create_bay(cls, name, baymodel_uuid, wait=True):
bay = cls.cs.bays.create(
name=name,
baymodel_id=baymodel_uuid,
node_count=None,
)
if wait:
cls._wait_on_status(bay,
[None, "CREATE_IN_PROGRESS"],
["CREATE_FAILED",
"CREATE_COMPLETE"])
return bay
@classmethod
def _delete_baymodel(cls, baymodel_uuid):
cls.cs.baymodels.delete(baymodel_uuid)
@classmethod
def _delete_bay(cls, bay_uuid):
cls.cs.bays.delete(bay_uuid)
class BayTest(BaseMagnumClient):
# NOTE (eliqiao) coe should be specified in subclasses
coe = None
def setUp(self):
super(BayTest, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
def _test_baymodel_create_and_delete(self, delete=True):
baymodel = self._create_baymodel('testbay', coe=self.coe)
list = [item.uuid for item in self.cs.baymodels.list()]
self.assertTrue(baymodel.uuid in list)
if not delete:
return baymodel
else:
self.cs.baymodels.delete(baymodel.uuid)
list = [item.uuid for item in self.cs.baymodels.list()]
self.assertTrue(baymodel.uuid not in list)
def _test_bay_create_and_delete(self):
baymodel = self._test_baymodel_create_and_delete(delete=False)
bay = self._create_bay('testbay', baymodel.uuid)
list = [item.uuid for item in self.cs.bays.list()]
self.assertTrue(bay.uuid in list)
try:
self.assertIn(self.cs.bays.get(bay.uuid).status,
["CREATED", "CREATE_COMPLETE"])
finally:
# Ensure we delete whether the assert above is true or false
self.cs.bays.delete(bay.uuid)
try:
self._wait_on_status(bay,
["CREATE_COMPLETE",
"DELETE_IN_PROGRESS", "CREATE_FAILED"],
["DELETE_FAILED",
"DELETE_COMPLETE"])
except exceptions.NotFound:
# if bay/get fails, the bay has been deleted already
pass
try:
self.cs.baymodels.delete(baymodel.uuid)
except exceptions.BadRequest:
pass

View File

@ -0,0 +1,129 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from magnum.common.pythonk8sclient.swagger_client import api_client
from magnum.common.pythonk8sclient.swagger_client.apis import apiv_api
from magnum.tests.functional.python_client_base import BaseMagnumClient
from magnum.tests.functional.python_client_base import BayTest
from magnumclient.openstack.common.apiclient import exceptions
class TestBayModelResource(BayTest):
coe = 'kubernetes'
def test_baymodel_create_and_delete(self):
self._test_baymodel_create_and_delete()
class TestBayResource(BayTest):
coe = 'kubernetes'
def test_bay_create_and_delete(self):
self._test_bay_create_and_delete()
class TestKubernetesAPIs(BaseMagnumClient):
@classmethod
def setUpClass(cls):
super(TestKubernetesAPIs, cls).setUpClass()
cls.baymodel = cls._create_baymodel('testk8sAPI')
cls.bay = cls._create_bay('testk8sAPI', cls.baymodel.uuid)
kube_api_address = cls.cs.bays.get(cls.bay.uuid).api_address
kube_api_url = 'http://%s' % kube_api_address
k8s_client = api_client.ApiClient(kube_api_url)
cls.k8s_api = apiv_api.ApivApi(k8s_client)
@classmethod
def tearDownClass(cls):
cls._delete_bay(cls.bay.uuid)
try:
cls._wait_on_status(cls.bay,
["CREATE_COMPLETE",
"DELETE_IN_PROGRESS", "CREATE_FAILED"],
["DELETE_FAILED", "DELETE_COMPLETE"])
except exceptions.NotFound:
pass
cls._delete_baymodel(cls.baymodel.uuid)
def test_pod_apis(self):
pod_manifest = {'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'color': 'blue', 'name': 'test'},
'spec': {'containers': [{'image': 'dockerfile/redis',
'name': 'redis'}]}}
resp = self.k8s_api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(resp.metadata.name, 'test')
self.assertTrue(resp.status.phase)
resp = self.k8s_api.read_namespaced_pod(name='test',
namespace='default')
self.assertEqual(resp.metadata.name, 'test')
self.assertTrue(resp.status.phase)
resp = self.k8s_api.delete_namespaced_pod(name='test', body={},
namespace='default')
def test_service_apis(self):
service_manifest = {'apiVersion': 'v1',
'kind': 'Service',
'metadata': {'labels': {'name': 'frontend'},
'name': 'frontend',
'resourceversion': 'v1'},
'spec': {'ports': [{'port': 80,
'protocol': 'TCP',
'targetPort': 80}],
'selector': {'name': 'frontend'}}}
resp = self.k8s_api.create_namespaced_service(body=service_manifest,
namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertTrue(resp.status)
resp = self.k8s_api.read_namespaced_service(name='frontend',
namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertTrue(resp.status)
resp = self.k8s_api.delete_namespaced_service(name='frontend',
namespace='default')
def test_replication_controller_apis(self):
rc_manifest = {
'apiVersion': 'v1',
'kind': 'ReplicationController',
'metadata': {'labels': {'name': 'frontend'},
'name': 'frontend'},
'spec': {'replicas': 2,
'selector': {'name': 'frontend'},
'template': {'metadata': {
'labels': {'name': 'frontend'}},
'spec': {'containers': [{
'image': 'nginx',
'name': 'nginx',
'ports': [{'containerPort': 80,
'protocol': 'TCP'}]}]}}}}
resp = self.k8s_api.create_namespaced_replication_controller(
body=rc_manifest, namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertEqual(resp.spec.replicas, 2)
resp = self.k8s_api.read_namespaced_replication_controller(
name='frontend', namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertEqual(resp.spec.replicas, 2)
resp = self.k8s_api.delete_namespaced_replication_controller(
name='frontend', body={}, namespace='default')

View File

@ -10,119 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
test_magnum
----------------------------------
Tests for `magnum` module.
"""
import ConfigParser
import os
import time
import fixtures
from magnum.common.pythonk8sclient.swagger_client import api_client
from magnum.common.pythonk8sclient.swagger_client.apis import apiv_api
from magnum.tests import base
from magnumclient.openstack.common.apiclient import exceptions
from magnumclient.openstack.common import cliutils
from magnumclient.v1 import client as v1client
class BaseMagnumClient(base.TestCase):
@classmethod
def setUpClass(cls):
# Collecting of credentials:
#
# Support the existence of a functional_creds.conf for
# testing. This makes it possible to use a config file.
user = cliutils.env('OS_USERNAME')
passwd = cliutils.env('OS_PASSWORD')
tenant = cliutils.env('OS_TENANT_NAME')
tenant_id = cliutils.env('OS_TENANT_ID')
auth_url = cliutils.env('OS_AUTH_URL')
region_name = cliutils.env('OS_REGION_NAME')
magnum_url = cliutils.env('BYPASS_URL')
image_id = cliutils.env('IMAGE_ID')
nic_id = cliutils.env('NIC_ID')
config = ConfigParser.RawConfigParser()
if config.read('functional_creds.conf'):
# the OR pattern means the environment is preferred for
# override
user = user or config.get('admin', 'user')
passwd = passwd or config.get('admin', 'pass')
tenant = tenant or config.get('admin', 'tenant')
auth_url = auth_url or config.get('auth', 'auth_url')
magnum_url = magnum_url or config.get('auth', 'magnum_url')
image_id = image_id or config.get('magnum', 'image_id')
nic_id = nic_id or config.get('magnum', 'nic_id')
cls.image_id = image_id
cls.nic_id = nic_id
cls.cs = v1client.Client(username=user,
api_key=passwd,
project_id=tenant_id,
project_name=tenant,
auth_url=auth_url,
service_type='container',
region_name=region_name,
magnum_url=magnum_url)
@classmethod
def _wait_on_status(cls, bay, wait_status, finish_status):
# Check status every 60 seconds for a total of 100 minutes
for i in range(100):
# sleep 1s to wait bay status changes, this will be usefull for
# the first time we wait for the status, to avoid another 59s
time.sleep(1)
status = cls.cs.bays.get(bay.uuid).status
if status in wait_status:
time.sleep(59)
elif status in finish_status:
break
else:
raise Exception("Unknown Status : %s" % status)
@classmethod
def _create_baymodel(cls, name):
baymodel = cls.cs.baymodels.create(
name=name,
keypair_id='default',
external_network_id=cls.nic_id,
image_id=cls.image_id,
flavor_id='m1.magnum',
docker_volume_size=1,
coe='kubernetes',
network_driver='flannel',
)
return baymodel
@classmethod
def _create_bay(cls, name, baymodel_uuid, wait=True):
bay = cls.cs.bays.create(
name=name,
baymodel_id=baymodel_uuid,
node_count=None,
)
if wait:
cls._wait_on_status(bay,
[None, "CREATE_IN_PROGRESS"],
["CREATE_FAILED",
"CREATE_COMPLETE"])
return bay
@classmethod
def _delete_baymodel(cls, baymodel_uuid):
cls.cs.baymodels.delete(baymodel_uuid)
@classmethod
def _delete_bay(cls, bay_uuid):
cls.cs.bays.delete(bay_uuid)
from magnum.tests.functional.python_client_base import BaseMagnumClient
class TestListResources(BaseMagnumClient):
@ -146,157 +35,3 @@ class TestListResources(BaseMagnumClient):
def test_services_list(self):
self.assertTrue(self.cs.services.list() is not None)
class TestBayModelResource(BaseMagnumClient):
def test_bay_model_create_and_delete(self):
baymodel = self._create_baymodel('testbaymodel')
list = [item.uuid for item in self.cs.baymodels.list()]
self.assertTrue(baymodel.uuid in list)
self.cs.baymodels.delete(baymodel.uuid)
list = [item.uuid for item in self.cs.baymodels.list()]
self.assertTrue(baymodel.uuid not in list)
class TestBayResource(BaseMagnumClient):
def setUp(self):
super(TestBayResource, self).setUp()
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
self.baymodel = self._create_baymodel('testbay')
def delete_baymodel():
try:
self.cs.baymodels.delete(self.baymodel.uuid)
except exceptions.BadRequest:
pass
self.addCleanup(delete_baymodel)
def test_bay_create_and_delete(self):
bay = self._create_bay('testbay', self.baymodel.uuid)
list = [item.uuid for item in self.cs.bays.list()]
self.assertTrue(bay.uuid in list)
try:
self.assertIn(self.cs.bays.get(bay.uuid).status,
["CREATED", "CREATE_COMPLETE"])
finally:
# Ensure we delete whether the assert above is true or false
self.cs.bays.delete(bay.uuid)
try:
self._wait_on_status(bay,
["CREATE_COMPLETE",
"DELETE_IN_PROGRESS", "CREATE_FAILED"],
["DELETE_FAILED",
"DELETE_COMPLETE"])
except exceptions.NotFound:
# if bay/get fails, the bay has been deleted already
pass
class TestKubernetesAPIs(BaseMagnumClient):
@classmethod
def setUpClass(cls):
super(TestKubernetesAPIs, cls).setUpClass()
cls.baymodel = cls._create_baymodel('testk8sAPI')
cls.bay = cls._create_bay('testk8sAPI', cls.baymodel.uuid)
kube_api_address = cls.cs.bays.get(cls.bay.uuid).api_address
kube_api_url = 'http://%s' % kube_api_address
k8s_client = api_client.ApiClient(kube_api_url)
cls.k8s_api = apiv_api.ApivApi(k8s_client)
@classmethod
def tearDownClass(cls):
cls._delete_bay(cls.bay.uuid)
try:
cls._wait_on_status(cls.bay,
["CREATE_COMPLETE",
"DELETE_IN_PROGRESS", "CREATE_FAILED"],
["DELETE_FAILED", "DELETE_COMPLETE"])
except exceptions.NotFound:
pass
cls._delete_baymodel(cls.baymodel.uuid)
def test_pod_apis(self):
pod_manifest = {'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {'color': 'blue', 'name': 'test'},
'spec': {'containers': [{'image': 'dockerfile/redis',
'name': 'redis'}]}}
resp = self.k8s_api.create_namespaced_pod(body=pod_manifest,
namespace='default')
self.assertEqual(resp.metadata.name, 'test')
self.assertTrue(resp.status.phase)
resp = self.k8s_api.read_namespaced_pod(name='test',
namespace='default')
self.assertEqual(resp.metadata.name, 'test')
self.assertTrue(resp.status.phase)
resp = self.k8s_api.delete_namespaced_pod(name='test', body={},
namespace='default')
def test_service_apis(self):
service_manifest = {'apiVersion': 'v1',
'kind': 'Service',
'metadata': {'labels': {'name': 'frontend'},
'name': 'frontend',
'resourceversion': 'v1'},
'spec': {'ports': [{'port': 80,
'protocol': 'TCP',
'targetPort': 80}],
'selector': {'name': 'frontend'}}}
resp = self.k8s_api.create_namespaced_service(body=service_manifest,
namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertTrue(resp.status)
resp = self.k8s_api.read_namespaced_service(name='frontend',
namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertTrue(resp.status)
resp = self.k8s_api.delete_namespaced_service(name='frontend',
namespace='default')
def test_replication_controller_apis(self):
rc_manifest = {
'apiVersion': 'v1',
'kind': 'ReplicationController',
'metadata': {'labels': {'name': 'frontend'},
'name': 'frontend'},
'spec': {'replicas': 2,
'selector': {'name': 'frontend'},
'template': {'metadata': {
'labels': {'name': 'frontend'}},
'spec': {'containers': [{
'image': 'nginx',
'name': 'nginx',
'ports': [{'containerPort': 80,
'protocol': 'TCP'}]}]}}}}
resp = self.k8s_api.create_namespaced_replication_controller(
body=rc_manifest, namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertEqual(resp.spec.replicas, 2)
resp = self.k8s_api.read_namespaced_replication_controller(
name='frontend', namespace='default')
self.assertEqual(resp.metadata.name, 'frontend')
self.assertEqual(resp.spec.replicas, 2)
resp = self.k8s_api.delete_namespaced_replication_controller(
name='frontend', body={}, namespace='default')