openstacksdk/shade/tests/unit/test_clustering.py
Bailey Miller 4f92423f87
Adds Senlin support to openstacksdk
Adds Senlin API to openstacksdk, along with unit and functional tests
for each Senlin function. Allows openstacksdk to create, list, get,
update and delete clusters, cluster policies, cluster profiles, and
cluster receivers. Also allows for attaching and detaching policies
to clusters, updating policies on clusters, and listing policies on clusters.

Change-Id: I7e80e8ba74bdb415c2359f5c9672aa900f441fba
2018-06-21 07:24:00 -05:00

659 lines
24 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import testtools
import shade
from shade.tests.unit import base
CLUSTERING_DICT = {
'name': 'fake-name',
'profile_id': '1',
'desired_capacity': 1,
'config': 'fake-config',
'max_size': 1,
'min_size': 1,
'timeout': 100,
'metadata': {}
}
PROFILE_DICT = {
'name': 'fake-profile-name',
'spec': {},
'metadata': {}
}
POLICY_DICT = {
'name': 'fake-profile-name',
'spec': {},
}
RECEIVER_DICT = {
'action': 'FAKE_CLUSTER_ACTION',
'cluster_id': 'fake-cluster-id',
'name': 'fake-receiver-name',
'params': {},
'type': 'webhook'
}
NEW_CLUSTERING_DICT = copy.copy(CLUSTERING_DICT)
NEW_CLUSTERING_DICT['id'] = '1'
NEW_PROFILE_DICT = copy.copy(PROFILE_DICT)
NEW_PROFILE_DICT['id'] = '1'
NEW_POLICY_DICT = copy.copy(POLICY_DICT)
NEW_POLICY_DICT['id'] = '1'
NEW_RECEIVER_DICT = copy.copy(RECEIVER_DICT)
NEW_RECEIVER_DICT['id'] = '1'
class TestClustering(base.RequestsMockTestCase):
def assertAreInstances(self, elements, elem_type):
for e in elements:
self.assertIsInstance(e, elem_type)
def setUp(self):
super(TestClustering, self).setUp()
self.use_senlin()
def test_create_cluster(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json=NEW_CLUSTERING_DICT)
])
profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id'])
c = self.cloud.create_cluster(
name=CLUSTERING_DICT['name'],
desired_capacity=CLUSTERING_DICT['desired_capacity'],
profile=profile,
config=CLUSTERING_DICT['config'],
max_size=CLUSTERING_DICT['max_size'],
min_size=CLUSTERING_DICT['min_size'],
metadata=CLUSTERING_DICT['metadata'],
timeout=CLUSTERING_DICT['timeout'])
self.assertEqual(NEW_CLUSTERING_DICT, c)
self.assert_calls()
def test_create_cluster_exception(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json={
"profiles": [NEW_PROFILE_DICT]}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
status_code=500)
])
profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id'])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating cluster fake-name.*"):
self.cloud.create_cluster(name='fake-name', profile=profile)
self.assert_calls()
def test_get_cluster_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT})
])
cluster = self.cloud.get_cluster_by_id('1')
self.assertEqual(cluster['id'], '1')
self.assert_calls()
def test_get_cluster_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters',
'no-cluster']),
status_code=404)
])
c = self.cloud.get_cluster_by_id('no-cluster')
self.assertFalse(c)
self.assert_calls()
def test_update_cluster(self):
new_max_size = 5
updated_cluster = copy.copy(NEW_CLUSTERING_DICT)
updated_cluster['max_size'] = new_max_size
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json=updated_cluster,
)
])
cluster = self.cloud.get_cluster_by_id('1')
c = self.cloud.update_cluster(cluster, new_max_size)
self.assertEqual(updated_cluster, c)
self.assert_calls()
def test_delete_cluster(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies']),
json={"cluster_policies": []}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json={"receivers": []}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json=NEW_CLUSTERING_DICT)
])
self.assertTrue(self.cloud.delete_cluster('1'))
self.assert_calls()
def test_list_clusters(self):
clusters = {'clusters': [NEW_CLUSTERING_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json=clusters)
])
c = self.cloud.list_clusters()
self.assertIsInstance(c, list)
self.assertAreInstances(c, dict)
self.assert_calls()
def test_attach_policy_to_cluster(self):
policy = {
'policy_id': '1',
'enabled': 'true'
}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'actions']),
json={'policy_attach': policy})
])
cluster = self.cloud.get_cluster_by_id('1')
policy = self.cloud.get_cluster_policy_by_id('1')
p = self.cloud.attach_policy_to_cluster(cluster, policy, 'true')
self.assertTrue(p)
self.assert_calls()
def test_detach_policy_from_cluster(self):
updated_cluster = copy.copy(NEW_CLUSTERING_DICT)
updated_cluster['policies'] = ['1']
detached_cluster = copy.copy(NEW_CLUSTERING_DICT)
detached_cluster['policies'] = []
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'actions']),
json={'policy_detach': {'policy_id': '1'}}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": updated_cluster}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": detached_cluster}),
])
cluster = self.cloud.get_cluster_by_id('1')
policy = self.cloud.get_cluster_policy_by_id('1')
p = self.cloud.detach_policy_from_cluster(cluster, policy, wait=True)
self.assertTrue(p)
self.assert_calls()
def test_get_policy_on_cluster_by_id(self):
cluster_policy = {
"cluster_id": "1",
"cluster_name": "cluster1",
"enabled": True,
"id": "1",
"policy_id": "1",
"policy_name": "policy1",
"policy_type": "senlin.policy.deletion-1.0"
}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies', '1']),
json={
"cluster_policy": cluster_policy})
])
policy = self.cloud.get_policy_on_cluster('1', '1')
self.assertEqual(policy['cluster_id'], '1')
self.assert_calls()
def test_get_policy_on_cluster_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies', 'no-policy']),
status_code=404)
])
p = self.cloud.get_policy_on_cluster('1', 'no-policy')
self.assertFalse(p)
self.assert_calls()
def test_update_policy_on_cluster(self):
policy = {
'policy_id': '1',
'enabled': 'true'
}
updated_cluster = copy.copy(NEW_CLUSTERING_DICT)
updated_cluster['policies'] = policy
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1']),
json={
"cluster": NEW_CLUSTERING_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies',
'1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'actions']),
json={'policies': []})
])
cluster = self.cloud.get_cluster_by_id('1')
policy = self.cloud.get_cluster_policy_by_id('1')
p = self.cloud.update_policy_on_cluster(cluster, policy, True)
self.assertTrue(p)
self.assert_calls()
def test_get_policy_on_cluster(self):
cluster_policy = {
'cluster_id': '1',
'cluster_name': 'cluster1',
'enabled': 'true',
'id': '1',
'policy_id': '1',
'policy_name': 'policy1',
'policy_type': 'type'
}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters', '1',
'policies', '1']),
json={
"cluster_policy": cluster_policy})
])
get_policy = self.cloud.get_policy_on_cluster('1', '1')
self.assertEqual(get_policy, cluster_policy)
self.assert_calls()
def test_create_cluster_profile(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json={'profile': NEW_PROFILE_DICT})
])
p = self.cloud.create_cluster_profile('fake-profile-name', {})
self.assertEqual(NEW_PROFILE_DICT, p)
self.assert_calls()
def test_create_cluster_profile_exception(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
status_code=500)
])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating profile fake-profile-name.*"):
self.cloud.create_cluster_profile('fake-profile-name', {})
self.assert_calls()
def test_list_cluster_profiles(self):
profiles = {'profiles': [NEW_PROFILE_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles']),
json=profiles)
])
p = self.cloud.list_cluster_profiles()
self.assertIsInstance(p, list)
self.assertAreInstances(p, dict)
self.assert_calls()
def test_get_cluster_profile_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profile": NEW_PROFILE_DICT})
])
p = self.cloud.get_cluster_profile_by_id('1')
self.assertEqual(p['id'], '1')
self.assert_calls()
def test_get_cluster_profile_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles',
'no-profile']),
status_code=404)
])
p = self.cloud.get_cluster_profile_by_id('no-profile')
self.assertFalse(p)
self.assert_calls()
def test_update_cluster_profile(self):
new_name = "new-name"
updated_profile = copy.copy(NEW_PROFILE_DICT)
updated_profile['name'] = new_name
self.register_uris([
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json=updated_profile,
)
])
p = self.cloud.update_cluster_profile('1', new_name=new_name)
self.assertEqual(updated_profile, p)
self.assert_calls()
def test_delete_cluster_profile(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json={
"profile": NEW_PROFILE_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json={}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'profiles', '1']),
json=NEW_PROFILE_DICT)
])
profile = self.cloud.get_cluster_profile_by_id('1')
self.assertTrue(self.cloud.delete_cluster_profile(profile))
self.assert_calls()
def test_create_cluster_policy(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies']),
json={'policy': NEW_POLICY_DICT})
])
p = self.cloud.create_cluster_policy('fake-policy-name', {})
self.assertEqual(NEW_POLICY_DICT, p)
self.assert_calls()
def test_create_cluster_policy_exception(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies']),
status_code=500)
])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating policy fake-policy-name.*"):
self.cloud.create_cluster_policy('fake-policy-name', {})
self.assert_calls()
def test_list_cluster_policies(self):
policies = {'policies': [NEW_POLICY_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies']),
json=policies)
])
p = self.cloud.list_cluster_policies()
self.assertIsInstance(p, list)
self.assertAreInstances(p, dict)
self.assert_calls()
def test_get_cluster_policy_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT})
])
p = self.cloud.get_cluster_policy_by_id('1')
self.assertEqual(p['id'], '1')
self.assert_calls()
def test_get_cluster_policy_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies',
'no-policy']),
status_code=404)
])
p = self.cloud.get_cluster_policy_by_id('no-policy')
self.assertFalse(p)
self.assert_calls()
def test_update_cluster_policy(self):
new_name = "new-name"
updated_policy = copy.copy(NEW_POLICY_DICT)
updated_policy['name'] = new_name
self.register_uris([
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json=updated_policy,
)
])
p = self.cloud.update_cluster_policy('1', new_name=new_name)
self.assertEqual(updated_policy, p)
self.assert_calls()
def test_delete_cluster_policy(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json={
"policy": NEW_POLICY_DICT}),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'clusters']),
json={}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'policies', '1']),
json=NEW_POLICY_DICT)
])
self.assertTrue(self.cloud.delete_cluster_policy('1'))
self.assert_calls()
def test_create_cluster_receiver(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json={'receiver': NEW_RECEIVER_DICT})
])
r = self.cloud.create_cluster_receiver('fake-receiver-name', {})
self.assertEqual(NEW_RECEIVER_DICT, r)
self.assert_calls()
def test_create_cluster_receiver_exception(self):
self.register_uris([
dict(method='POST',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
status_code=500)
])
with testtools.ExpectedException(
shade.exc.OpenStackCloudHTTPError,
"Error creating receiver fake-receiver-name.*"):
self.cloud.create_cluster_receiver('fake-receiver-name', {})
self.assert_calls()
def test_list_cluster_receivers(self):
receivers = {'receivers': [NEW_RECEIVER_DICT]}
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json=receivers)
])
r = self.cloud.list_cluster_receivers()
self.assertIsInstance(r, list)
self.assertAreInstances(r, dict)
self.assert_calls()
def test_get_cluster_receiver_by_id(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json={
"receiver": NEW_RECEIVER_DICT})
])
r = self.cloud.get_cluster_receiver_by_id('1')
self.assertEqual(r['id'], '1')
self.assert_calls()
def test_get_cluster_receiver_not_found_returns_false(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers',
'no-receiver']),
json={'receivers': []})
])
p = self.cloud.get_cluster_receiver_by_id('no-receiver')
self.assertFalse(p)
self.assert_calls()
def test_update_cluster_receiver(self):
new_name = "new-name"
updated_receiver = copy.copy(NEW_RECEIVER_DICT)
updated_receiver['name'] = new_name
self.register_uris([
dict(method='PATCH',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json=updated_receiver,
)
])
r = self.cloud.update_cluster_receiver('1', new_name=new_name)
self.assertEqual(updated_receiver, r)
self.assert_calls()
def test_delete_cluster_receiver(self):
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers']),
json={
"receivers": [NEW_RECEIVER_DICT]}),
dict(method='DELETE',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json=NEW_RECEIVER_DICT),
dict(method='GET',
uri=self.get_mock_url(
'clustering', 'public', append=['v1', 'receivers', '1']),
json={}),
])
self.assertTrue(self.cloud.delete_cluster_receiver('1', wait=True))
self.assert_calls()