From c6177a472b2b7ce2a63e50634833eceb9b8fd237 Mon Sep 17 00:00:00 2001 From: Artem Goncharov Date: Thu, 20 May 2021 10:18:06 +0200 Subject: [PATCH] Drop cloud layer methods for Senlin Rework of the cloud layer showed this was not working properly and tests even verifying wrong things. Due to the big amount of effort required to fix things and most likely nobody using it (this code is not really usable now) drop it (comment-out). python-senlinclient is using direclty proxy layer (correct), Ansible modules for senlin not existing - no direct negative impact is visible. If anybody is interested in getting it back - the code is there and can be uncommented and fixed. Change-Id: I198eed8454fd8ae2ce8fbd70e3562c9f15db7445 --- openstack/cloud/_clustering.py | 1019 +++++++------- openstack/tests/unit/cloud/test_clustering.py | 1238 +++++++++-------- ...p-senlin-cloud-layer-c06d496acc70b014.yaml | 4 + 3 files changed, 1107 insertions(+), 1154 deletions(-) create mode 100644 releasenotes/notes/drop-senlin-cloud-layer-c06d496acc70b014.yaml diff --git a/openstack/cloud/_clustering.py b/openstack/cloud/_clustering.py index b7aa60b01..05cb0a214 100644 --- a/openstack/cloud/_clustering.py +++ b/openstack/cloud/_clustering.py @@ -16,12 +16,10 @@ import types # noqa from openstack.cloud import _normalize -from openstack.cloud import _utils -from openstack.cloud import exc -from openstack import utils class ClusteringCloudMixin(_normalize.Normalizer): + pass @property def _clustering_client(self): @@ -31,537 +29,484 @@ class ClusteringCloudMixin(_normalize.Normalizer): self._raw_clients['clustering'] = clustering_client return self._raw_clients['clustering'] - def create_cluster(self, name, profile, config=None, desired_capacity=0, - max_size=None, metadata=None, min_size=None, - timeout=None): - profile = self.get_cluster_profile(profile) - profile_id = profile['id'] - body = { - 'desired_capacity': desired_capacity, - 'name': name, - 'profile_id': profile_id - } - - if config is not None: - body['config'] = config - - if max_size is not None: - body['max_size'] = max_size - - if metadata is not None: - body['metadata'] = metadata - - if min_size is not None: - body['min_size'] = min_size - - if timeout is not None: - body['timeout'] = timeout - - data = self._clustering_client.post( - '/clusters', json={'cluster': body}, - error_message="Error creating cluster {name}".format(name=name)) - - return self._get_and_munchify(key=None, data=data) - - def set_cluster_metadata(self, name_or_id, metadata): - cluster = self.get_cluster(name_or_id) - if not cluster: - raise exc.OpenStackCloudException( - 'Invalid Cluster {cluster}'.format(cluster=name_or_id)) - - self._clustering_client.post( - '/clusters/{cluster_id}/metadata'.format(cluster_id=cluster['id']), - json={'metadata': metadata}, - error_message='Error updating cluster metadata') - - def get_cluster_by_id(self, cluster_id): - try: - data = self._clustering_client.get( - "/clusters/{cluster_id}".format(cluster_id=cluster_id), - error_message="Error fetching cluster {name}".format( - name=cluster_id)) - return self._get_and_munchify('cluster', data) - except Exception: - return None - - def get_cluster(self, name_or_id, filters=None): - return _utils._get_entity( - cloud=self, resource='cluster', - name_or_id=name_or_id, filters=filters) - - def update_cluster(self, name_or_id, new_name=None, - profile_name_or_id=None, config=None, metadata=None, - timeout=None, profile_only=False): - old_cluster = self.get_cluster(name_or_id) - if old_cluster is None: - raise exc.OpenStackCloudException( - 'Invalid Cluster {cluster}'.format(cluster=name_or_id)) - cluster = { - 'profile_only': profile_only - } - - if config is not None: - cluster['config'] = config - - if metadata is not None: - cluster['metadata'] = metadata - - if profile_name_or_id is not None: - profile = self.get_cluster_profile(profile_name_or_id) - if profile is None: - raise exc.OpenStackCloudException( - 'Invalid Cluster Profile {profile}'.format( - profile=profile_name_or_id)) - cluster['profile_id'] = profile.id - - if timeout is not None: - cluster['timeout'] = timeout - - if new_name is not None: - cluster['name'] = new_name - - data = self._clustering_client.patch( - "/clusters/{cluster_id}".format(cluster_id=old_cluster['id']), - json={'cluster': cluster}, - error_message="Error updating cluster " - "{name}".format(name=name_or_id)) - - return self._get_and_munchify(key=None, data=data) - - def delete_cluster(self, name_or_id): - cluster = self.get_cluster(name_or_id) - if cluster is None: - self.log.debug("Cluster %s not found for deleting", name_or_id) - return False - - for policy in self.list_policies_on_cluster(name_or_id): - detach_policy = self.get_cluster_policy_by_id( - policy['policy_id']) - self.detach_policy_from_cluster(cluster, detach_policy) - - for receiver in self.list_cluster_receivers(): - if cluster["id"] == receiver["cluster_id"]: - self.delete_cluster_receiver(receiver["id"], wait=True) - - self._clustering_client.delete( - "/clusters/{cluster_id}".format(cluster_id=name_or_id), - error_message="Error deleting cluster {name}".format( - name=name_or_id)) - - return True - - def search_clusters(self, name_or_id=None, filters=None): - clusters = self.list_clusters() - return _utils._filter_list(clusters, name_or_id, filters) - - def list_clusters(self): - try: - data = self._clustering_client.get( - '/clusters', - error_message="Error fetching clusters") - return self._get_and_munchify('clusters', data) - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return [] - - def attach_policy_to_cluster(self, name_or_id, policy_name_or_id, - is_enabled): - cluster = self.get_cluster(name_or_id) - policy = self.get_cluster_policy(policy_name_or_id) - if cluster is None: - raise exc.OpenStackCloudException( - 'Cluster {cluster} not found for attaching'.format( - cluster=name_or_id)) - - if policy is None: - raise exc.OpenStackCloudException( - 'Policy {policy} not found for attaching'.format( - policy=policy_name_or_id)) - - body = { - 'policy_id': policy['id'], - 'enabled': is_enabled - } - - self._clustering_client.post( - "/clusters/{cluster_id}/actions".format(cluster_id=cluster['id']), - error_message="Error attaching policy {policy} to cluster " - "{cluster}".format( - policy=policy['id'], - cluster=cluster['id']), - json={'policy_attach': body}) - - return True - - def detach_policy_from_cluster( - self, name_or_id, policy_name_or_id, wait=False, timeout=3600): - cluster = self.get_cluster(name_or_id) - policy = self.get_cluster_policy(policy_name_or_id) - if cluster is None: - raise exc.OpenStackCloudException( - 'Cluster {cluster} not found for detaching'.format( - cluster=name_or_id)) - - if policy is None: - raise exc.OpenStackCloudException( - 'Policy {policy} not found for detaching'.format( - policy=policy_name_or_id)) - - body = {'policy_id': policy['id']} - self._clustering_client.post( - "/clusters/{cluster_id}/actions".format(cluster_id=cluster['id']), - error_message="Error detaching policy {policy} from cluster " - "{cluster}".format( - policy=policy['id'], - cluster=cluster['id']), - json={'policy_detach': body}) - - if not wait: - return True - - value = [] - - for count in utils.iterate_timeout( - timeout, "Timeout waiting for cluster policy to detach"): - - # TODO(bjjohnson) This logic will wait until there are no policies. - # Since we're detaching a specific policy, checking to make sure - # that policy is not in the list of policies would be better. - policy_status = self.get_cluster_by_id(cluster['id'])['policies'] - - if policy_status == value: - break - return True - - def update_policy_on_cluster(self, name_or_id, policy_name_or_id, - is_enabled): - cluster = self.get_cluster(name_or_id) - policy = self.get_cluster_policy(policy_name_or_id) - if cluster is None: - raise exc.OpenStackCloudException( - 'Cluster {cluster} not found for updating'.format( - cluster=name_or_id)) - - if policy is None: - raise exc.OpenStackCloudException( - 'Policy {policy} not found for updating'.format( - policy=policy_name_or_id)) - - body = { - 'policy_id': policy['id'], - 'enabled': is_enabled - } - self._clustering_client.post( - "/clusters/{cluster_id}/actions".format(cluster_id=cluster['id']), - error_message="Error updating policy {policy} on cluster " - "{cluster}".format( - policy=policy['id'], - cluster=cluster['id']), - json={'policy_update': body}) - - return True - - def get_policy_on_cluster(self, name_or_id, policy_name_or_id): - try: - policy = self._clustering_client.get( - "/clusters/{cluster_id}/policies/{policy_id}".format( - cluster_id=name_or_id, policy_id=policy_name_or_id), - error_message="Error fetching policy " - "{name}".format(name=policy_name_or_id)) - return self._get_and_munchify('cluster_policy', policy) - except Exception: - return False - - def list_policies_on_cluster(self, name_or_id): - endpoint = "/clusters/{cluster_id}/policies".format( - cluster_id=name_or_id) - try: - data = self._clustering_client.get( - endpoint, - error_message="Error fetching cluster policies") - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return [] - return self._get_and_munchify('cluster_policies', data) - - def create_cluster_profile(self, name, spec, metadata=None): - profile = { - 'name': name, - 'spec': spec - } - - if metadata is not None: - profile['metadata'] = metadata - - data = self._clustering_client.post( - '/profiles', json={'profile': profile}, - error_message="Error creating profile {name}".format(name=name)) - - return self._get_and_munchify('profile', data) - - def set_cluster_profile_metadata(self, name_or_id, metadata): - profile = self.get_cluster_profile(name_or_id) - if not profile: - raise exc.OpenStackCloudException( - 'Invalid Profile {profile}'.format(profile=name_or_id)) - - self._clustering_client.post( - '/profiles/{profile_id}/metadata'.format(profile_id=profile['id']), - json={'metadata': metadata}, - error_message='Error updating profile metadata') - - def search_cluster_profiles(self, name_or_id=None, filters=None): - cluster_profiles = self.list_cluster_profiles() - return _utils._filter_list(cluster_profiles, name_or_id, filters) - - def list_cluster_profiles(self): - try: - data = self._clustering_client.get( - '/profiles', - error_message="Error fetching profiles") - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return [] - return self._get_and_munchify('profiles', data) - - def get_cluster_profile_by_id(self, profile_id): - try: - data = self._clustering_client.get( - "/profiles/{profile_id}".format(profile_id=profile_id), - error_message="Error fetching profile {name}".format( - name=profile_id)) - return self._get_and_munchify('profile', data) - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return None - - def get_cluster_profile(self, name_or_id, filters=None): - return _utils._get_entity(self, 'cluster_profile', name_or_id, filters) - - def delete_cluster_profile(self, name_or_id): - profile = self.get_cluster_profile(name_or_id) - if profile is None: - self.log.debug("Profile %s not found for deleting", name_or_id) - return False - - for cluster in self.list_clusters(): - if (name_or_id, profile.id) in cluster.items(): - self.log.debug( - "Profile %s is being used by cluster %s, won't delete", - name_or_id, cluster.name) - return False - - self._clustering_client.delete( - "/profiles/{profile_id}".format(profile_id=profile['id']), - error_message="Error deleting profile " - "{name}".format(name=name_or_id)) - - return True - - def update_cluster_profile(self, name_or_id, metadata=None, new_name=None): - old_profile = self.get_cluster_profile(name_or_id) - if not old_profile: - raise exc.OpenStackCloudException( - 'Invalid Profile {profile}'.format(profile=name_or_id)) - - profile = {} - - if metadata is not None: - profile['metadata'] = metadata - - if new_name is not None: - profile['name'] = new_name - - data = self._clustering_client.patch( - "/profiles/{profile_id}".format(profile_id=old_profile.id), - json={'profile': profile}, - error_message="Error updating profile {name}".format( - name=name_or_id)) - - return self._get_and_munchify(key=None, data=data) - - def create_cluster_policy(self, name, spec): - policy = { - 'name': name, - 'spec': spec - } - - data = self._clustering_client.post( - '/policies', json={'policy': policy}, - error_message="Error creating policy {name}".format( - name=policy['name'])) - return self._get_and_munchify('policy', data) - - def search_cluster_policies(self, name_or_id=None, filters=None): - cluster_policies = self.list_cluster_policies() - return _utils._filter_list(cluster_policies, name_or_id, filters) - - def list_cluster_policies(self): - endpoint = "/policies" - try: - data = self._clustering_client.get( - endpoint, - error_message="Error fetching cluster policies") - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return [] - return self._get_and_munchify('policies', data) - - def get_cluster_policy_by_id(self, policy_id): - try: - data = self._clustering_client.get( - "/policies/{policy_id}".format(policy_id=policy_id), - error_message="Error fetching policy {name}".format( - name=policy_id)) - return self._get_and_munchify('policy', data) - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return None - - def get_cluster_policy(self, name_or_id, filters=None): - return _utils._get_entity( - self, 'cluster_policie', name_or_id, filters) - - def delete_cluster_policy(self, name_or_id): - policy = self.get_cluster_policy_by_id(name_or_id) - if policy is None: - self.log.debug("Policy %s not found for deleting", name_or_id) - return False - - for cluster in self.list_clusters(): - if (name_or_id, policy.id) in cluster.items(): - self.log.debug( - "Policy %s is being used by cluster %s, won't delete", - name_or_id, cluster.name) - return False - - self._clustering_client.delete( - "/policies/{policy_id}".format(policy_id=name_or_id), - error_message="Error deleting policy " - "{name}".format(name=name_or_id)) - - return True - - def update_cluster_policy(self, name_or_id, new_name): - old_policy = self.get_cluster_policy(name_or_id) - if not old_policy: - raise exc.OpenStackCloudException( - 'Invalid Policy {policy}'.format(policy=name_or_id)) - policy = {'name': new_name} - - data = self._clustering_client.patch( - "/policies/{policy_id}".format(policy_id=old_policy.id), - json={'policy': policy}, - error_message="Error updating policy " - "{name}".format(name=name_or_id)) - return self._get_and_munchify(key=None, data=data) - - def create_cluster_receiver(self, name, receiver_type, - cluster_name_or_id=None, action=None, - actor=None, params=None): - cluster = self.get_cluster(cluster_name_or_id) - if cluster is None: - raise exc.OpenStackCloudException( - 'Invalid cluster {cluster}'.format(cluster=cluster_name_or_id)) - - receiver = { - 'name': name, - 'type': receiver_type - } - - if cluster_name_or_id is not None: - receiver['cluster_id'] = cluster.id - - if action is not None: - receiver['action'] = action - - if actor is not None: - receiver['actor'] = actor - - if params is not None: - receiver['params'] = params - - data = self._clustering_client.post( - '/receivers', json={'receiver': receiver}, - error_message="Error creating receiver {name}".format(name=name)) - return self._get_and_munchify('receiver', data) - - def search_cluster_receivers(self, name_or_id=None, filters=None): - cluster_receivers = self.list_cluster_receivers() - return _utils._filter_list(cluster_receivers, name_or_id, filters) - - def list_cluster_receivers(self): - try: - data = self._clustering_client.get( - '/receivers', - error_message="Error fetching receivers") - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return [] - return self._get_and_munchify('receivers', data) - - def get_cluster_receiver_by_id(self, receiver_id): - try: - data = self._clustering_client.get( - "/receivers/{receiver_id}".format(receiver_id=receiver_id), - error_message="Error fetching receiver {name}".format( - name=receiver_id)) - return self._get_and_munchify('receiver', data) - except exc.OpenStackCloudURINotFound as e: - self.log.debug(str(e), exc_info=True) - return None - - def get_cluster_receiver(self, name_or_id, filters=None): - return _utils._get_entity( - self, 'cluster_receiver', name_or_id, filters) - - def delete_cluster_receiver(self, name_or_id, wait=False, timeout=3600): - receiver = self.get_cluster_receiver(name_or_id) - if receiver is None: - self.log.debug("Receiver %s not found for deleting", name_or_id) - return False - - receiver_id = receiver['id'] - - self._clustering_client.delete( - "/receivers/{receiver_id}".format(receiver_id=receiver_id), - error_message="Error deleting receiver {name}".format( - name=name_or_id)) - - if not wait: - return True - - for count in utils.iterate_timeout( - timeout, "Timeout waiting for cluster receiver to delete"): - - receiver = self.get_cluster_receiver_by_id(receiver_id) - - if not receiver: - break - - return True - - def update_cluster_receiver(self, name_or_id, new_name=None, action=None, - params=None): - old_receiver = self.get_cluster_receiver(name_or_id) - if old_receiver is None: - raise exc.OpenStackCloudException( - 'Invalid receiver {receiver}'.format(receiver=name_or_id)) - - receiver = {} - - if new_name is not None: - receiver['name'] = new_name - - if action is not None: - receiver['action'] = action - - if params is not None: - receiver['params'] = params - - data = self._clustering_client.patch( - "/receivers/{receiver_id}".format(receiver_id=old_receiver.id), - json={'receiver': receiver}, - error_message="Error updating receiver {name}".format( - name=name_or_id)) - return self._get_and_munchify(key=None, data=data) +# NOTE(gtema): work on getting rid of direct API calls showed that this +# implementation never worked properly and tests in reality verifying wrong +# things. Unless someone is really interested in this piece of code this will +# be commented out and apparently dropped completely. This has no impact on the +# proxy layer. + +# def create_cluster(self, name, profile, config=None, desired_capacity=0, +# max_size=None, metadata=None, min_size=None, +# timeout=None): +# profile = self.get_cluster_profile(profile) +# profile_id = profile['id'] +# body = { +# 'desired_capacity': desired_capacity, +# 'name': name, +# 'profile_id': profile_id +# } +# +# if config is not None: +# body['config'] = config +# +# if max_size is not None: +# body['max_size'] = max_size +# +# if metadata is not None: +# body['metadata'] = metadata +# +# if min_size is not None: +# body['min_size'] = min_size +# +# if timeout is not None: +# body['timeout'] = timeout +# +# return self.clustering.create_cluster(**body) +# +# def set_cluster_metadata(self, name_or_id, metadata): +# cluster = self.get_cluster(name_or_id) +# if not cluster: +# raise exc.OpenStackCloudException( +# 'Invalid Cluster {cluster}'.format(cluster=name_or_id)) +# self.clustering.set_cluster_metadata(cluster, **metadata) +# +# def get_cluster_by_id(self, cluster_id): +# try: +# return self.get_cluster(cluster_id) +# except Exception: +# return None +# +# def get_cluster(self, name_or_id, filters=None): +# return _utils._get_entity( +# cloud=self, resource='cluster', +# name_or_id=name_or_id, filters=filters) +# +# def update_cluster(self, name_or_id, new_name=None, +# profile_name_or_id=None, config=None, metadata=None, +# timeout=None, profile_only=False): +# old_cluster = self.get_cluster(name_or_id) +# if old_cluster is None: +# raise exc.OpenStackCloudException( +# 'Invalid Cluster {cluster}'.format(cluster=name_or_id)) +# cluster = { +# 'profile_only': profile_only +# } +# +# if config is not None: +# cluster['config'] = config +# +# if metadata is not None: +# cluster['metadata'] = metadata +# +# if profile_name_or_id is not None: +# profile = self.get_cluster_profile(profile_name_or_id) +# if profile is None: +# raise exc.OpenStackCloudException( +# 'Invalid Cluster Profile {profile}'.format( +# profile=profile_name_or_id)) +# cluster['profile_id'] = profile.id +# +# if timeout is not None: +# cluster['timeout'] = timeout +# +# if new_name is not None: +# cluster['name'] = new_name +# +# return self.update_cluster(old_cluster, cluster) +# +# def delete_cluster(self, name_or_id): +# cluster = self.get_cluster(name_or_id) +# if cluster is None: +# self.log.debug("Cluster %s not found for deleting", name_or_id) +# return False +# +# for policy in self.list_policies_on_cluster(name_or_id): +# detach_policy = self.get_cluster_policy_by_id( +# policy['policy_id']) +# self.detach_policy_from_cluster(cluster, detach_policy) +# +# for receiver in self.list_cluster_receivers(): +# if cluster["id"] == receiver["cluster_id"]: +# self.delete_cluster_receiver(receiver["id"], wait=True) +# +# self.clustering.delete_cluster(cluster) +# +# return True +# +# def search_clusters(self, name_or_id=None, filters=None): +# clusters = self.list_clusters() +# return _utils._filter_list(clusters, name_or_id, filters) +# +# def list_clusters(self): +# return list(self.clustering.clusters()) +# +# def attach_policy_to_cluster( +# self, name_or_id, policy_name_or_id, is_enabled +# ): +# cluster = self.get_cluster(name_or_id) +# policy = self.get_cluster_policy(policy_name_or_id) +# if cluster is None: +# raise exc.OpenStackCloudException( +# 'Cluster {cluster} not found for attaching'.format( +# cluster=name_or_id)) +# +# if policy is None: +# raise exc.OpenStackCloudException( +# 'Policy {policy} not found for attaching'.format( +# policy=policy_name_or_id)) +# +# self.clustering.attach_policy_to_cluster(cluster, policy) +# +# return True +# +# def detach_policy_from_cluster( +# self, name_or_id, policy_name_or_id, wait=False, timeout=3600): +# cluster = self.get_cluster(name_or_id) +# policy = self.get_cluster_policy(policy_name_or_id) +# if cluster is None: +# raise exc.OpenStackCloudException( +# 'Cluster {cluster} not found for detaching'.format( +# cluster=name_or_id)) +# +# if policy is None: +# raise exc.OpenStackCloudException( +# 'Policy {policy} not found for detaching'.format( +# policy=policy_name_or_id)) +# +# self.clustering.detach_policy_from_cluster(cluster, policy) +# +# if not wait: +# return True +# +# for count in utils.iterate_timeout( +# timeout, "Timeout waiting for cluster policy to detach"): +# +# policies = self.get_cluster_by_id(cluster['id']).policies +# +# # NOTE(gtema): we iterate over all current policies and "continue" +# # if selected policy is still there +# is_present = False +# for active_policy in policies: +# if active_policy.id == policy.id: +# is_present = True +# if not is_present: +# break +# return True +# +# def update_policy_on_cluster(self, name_or_id, policy_name_or_id, +# is_enabled): +# cluster = self.get_cluster(name_or_id) +# policy = self.get_cluster_policy(policy_name_or_id) +# if cluster is None: +# raise exc.OpenStackCloudException( +# 'Cluster {cluster} not found for updating'.format( +# cluster=name_or_id)) +# +# if policy is None: +# raise exc.OpenStackCloudException( +# 'Policy {policy} not found for updating'.format( +# policy=policy_name_or_id)) +# +# self.clustering.update_cluster_policy( +# cluster, policy, is_enabled=is_enabled) +# +# return True +# +# def get_policy_on_cluster(self, name_or_id, policy_name_or_id): +# cluster = self.get_cluster(name_or_id) +# policy = self.get_policy(policy_name_or_id) +# if policy and cluster: +# return self.clustering.get_cluster_policy( +# policy, cluster) +# else: +# return False +# +# def list_policies_on_cluster(self, name_or_id): +# cluster = self.get_cluster(name_or_id) +# return list(self.clustering.cluster_policies(cluster)) +# +# def create_cluster_profile(self, name, spec, metadata=None): +# profile = { +# 'name': name, +# 'spec': spec +# } +# +# if metadata is not None: +# profile['metadata'] = metadata +# +# data = self._clustering_client.post( +# '/profiles', json={'profile': profile}, +# error_message="Error creating profile {name}".format(name=name)) +# +# return self._get_and_munchify('profile', data) +# +# def set_cluster_profile_metadata(self, name_or_id, metadata): +# profile = self.get_cluster_profile(name_or_id) +# if not profile: +# raise exc.OpenStackCloudException( +# 'Invalid Profile {profile}'.format(profile=name_or_id)) +# +# self._clustering_client.post( +# '/profiles/{profile_id}/metadata'.format(profile_id=profile['id']), +# json={'metadata': metadata}, +# error_message='Error updating profile metadata') +# +# def search_cluster_profiles(self, name_or_id=None, filters=None): +# cluster_profiles = self.list_cluster_profiles() +# return _utils._filter_list(cluster_profiles, name_or_id, filters) +# +# def list_cluster_profiles(self): +# try: +# data = self._clustering_client.get( +# '/profiles', +# error_message="Error fetching profiles") +# except exc.OpenStackCloudURINotFound as e: +# self.log.debug(str(e), exc_info=True) +# return [] +# return self._get_and_munchify('profiles', data) +# +# def get_cluster_profile_by_id(self, profile_id): +# try: +# data = self._clustering_client.get( +# "/profiles/{profile_id}".format(profile_id=profile_id), +# error_message="Error fetching profile {name}".format( +# name=profile_id)) +# return self._get_and_munchify('profile', data) +# except exc.OpenStackCloudURINotFound as e: +# self.log.debug(str(e), exc_info=True) +# return None +# +# def get_cluster_profile(self, name_or_id, filters=None): +# return _utils._get_entity(self, 'cluster_profile', name_or_id, +# filters) +# +# def delete_cluster_profile(self, name_or_id): +# profile = self.get_cluster_profile(name_or_id) +# if profile is None: +# self.log.debug("Profile %s not found for deleting", name_or_id) +# return False +# +# for cluster in self.list_clusters(): +# if (name_or_id, profile.id) in cluster.items(): +# self.log.debug( +# "Profile %s is being used by cluster %s, won't delete", +# name_or_id, cluster.name) +# return False +# +# self._clustering_client.delete( +# "/profiles/{profile_id}".format(profile_id=profile['id']), +# error_message="Error deleting profile " +# "{name}".format(name=name_or_id)) +# +# return True +# +# def update_cluster_profile( +# self, name_or_id, metadata=None, new_name=None +# ): +# old_profile = self.get_cluster_profile(name_or_id) +# if not old_profile: +# raise exc.OpenStackCloudException( +# 'Invalid Profile {profile}'.format(profile=name_or_id)) +# +# profile = {} +# +# if metadata is not None: +# profile['metadata'] = metadata +# +# if new_name is not None: +# profile['name'] = new_name +# +# data = self._clustering_client.patch( +# "/profiles/{profile_id}".format(profile_id=old_profile.id), +# json={'profile': profile}, +# error_message="Error updating profile {name}".format( +# name=name_or_id)) +# +# return self._get_and_munchify(key=None, data=data) +# +# def create_cluster_policy(self, name, spec): +# policy = { +# 'name': name, +# 'spec': spec +# } +# +# data = self._clustering_client.post( +# '/policies', json={'policy': policy}, +# error_message="Error creating policy {name}".format( +# name=policy['name'])) +# return self._get_and_munchify('policy', data) +# +# def search_cluster_policies(self, name_or_id=None, filters=None): +# cluster_policies = self.list_cluster_policies() +# return _utils._filter_list(cluster_policies, name_or_id, filters) +# +# def list_cluster_policies(self): +# endpoint = "/policies" +# try: +# data = self._clustering_client.get( +# endpoint, +# error_message="Error fetching cluster policies") +# except exc.OpenStackCloudURINotFound as e: +# self.log.debug(str(e), exc_info=True) +# return [] +# return self._get_and_munchify('policies', data) +# +# def get_cluster_policy_by_id(self, policy_id): +# try: +# data = self._clustering_client.get( +# "/policies/{policy_id}".format(policy_id=policy_id), +# error_message="Error fetching policy {name}".format( +# name=policy_id)) +# return self._get_and_munchify('policy', data) +# except exc.OpenStackCloudURINotFound as e: +# self.log.debug(str(e), exc_info=True) +# return None +# +# def get_cluster_policy(self, name_or_id, filters=None): +# return _utils._get_entity( +# self, 'cluster_policie', name_or_id, filters) +# +# def delete_cluster_policy(self, name_or_id): +# policy = self.get_cluster_policy_by_id(name_or_id) +# if policy is None: +# self.log.debug("Policy %s not found for deleting", name_or_id) +# return False +# +# for cluster in self.list_clusters(): +# if (name_or_id, policy.id) in cluster.items(): +# self.log.debug( +# "Policy %s is being used by cluster %s, won't delete", +# name_or_id, cluster.name) +# return False +# +# self._clustering_client.delete( +# "/policies/{policy_id}".format(policy_id=name_or_id), +# error_message="Error deleting policy " +# "{name}".format(name=name_or_id)) +# +# return True +# +# def update_cluster_policy(self, name_or_id, new_name): +# old_policy = self.get_cluster_policy(name_or_id) +# if not old_policy: +# raise exc.OpenStackCloudException( +# 'Invalid Policy {policy}'.format(policy=name_or_id)) +# policy = {'name': new_name} +# +# data = self._clustering_client.patch( +# "/policies/{policy_id}".format(policy_id=old_policy.id), +# json={'policy': policy}, +# error_message="Error updating policy " +# "{name}".format(name=name_or_id)) +# return self._get_and_munchify(key=None, data=data) +# +# def create_cluster_receiver(self, name, receiver_type, +# cluster_name_or_id=None, action=None, +# actor=None, params=None): +# cluster = self.get_cluster(cluster_name_or_id) +# if cluster is None: +# raise exc.OpenStackCloudException( +# 'Invalid cluster {cluster}'.format( +# cluster=cluster_name_or_id)) +# +# receiver = { +# 'name': name, +# 'type': receiver_type +# } +# +# if cluster_name_or_id is not None: +# receiver['cluster_id'] = cluster.id +# +# if action is not None: +# receiver['action'] = action +# +# if actor is not None: +# receiver['actor'] = actor +# +# if params is not None: +# receiver['params'] = params +# +# data = self._clustering_client.post( +# '/receivers', json={'receiver': receiver}, +# error_message="Error creating receiver {name}".format(name=name)) +# return self._get_and_munchify('receiver', data) +# +# def search_cluster_receivers(self, name_or_id=None, filters=None): +# cluster_receivers = self.list_cluster_receivers() +# return _utils._filter_list(cluster_receivers, name_or_id, filters) +# +# def list_cluster_receivers(self): +# try: +# data = self._clustering_client.get( +# '/receivers', +# error_message="Error fetching receivers") +# except exc.OpenStackCloudURINotFound as e: +# self.log.debug(str(e), exc_info=True) +# return [] +# return self._get_and_munchify('receivers', data) +# +# def get_cluster_receiver_by_id(self, receiver_id): +# try: +# data = self._clustering_client.get( +# "/receivers/{receiver_id}".format(receiver_id=receiver_id), +# error_message="Error fetching receiver {name}".format( +# name=receiver_id)) +# return self._get_and_munchify('receiver', data) +# except exc.OpenStackCloudURINotFound as e: +# self.log.debug(str(e), exc_info=True) +# return None +# +# def get_cluster_receiver(self, name_or_id, filters=None): +# return _utils._get_entity( +# self, 'cluster_receiver', name_or_id, filters) +# +# def delete_cluster_receiver(self, name_or_id, wait=False, timeout=3600): +# receiver = self.get_cluster_receiver(name_or_id) +# if receiver is None: +# self.log.debug("Receiver %s not found for deleting", name_or_id) +# return False +# +# receiver_id = receiver['id'] +# +# self._clustering_client.delete( +# "/receivers/{receiver_id}".format(receiver_id=receiver_id), +# error_message="Error deleting receiver {name}".format( +# name=name_or_id)) +# +# if not wait: +# return True +# +# for count in utils.iterate_timeout( +# timeout, "Timeout waiting for cluster receiver to delete"): +# +# receiver = self.get_cluster_receiver_by_id(receiver_id) +# +# if not receiver: +# break +# +# return True +# +# def update_cluster_receiver(self, name_or_id, new_name=None, action=None, +# params=None): +# old_receiver = self.get_cluster_receiver(name_or_id) +# if old_receiver is None: +# raise exc.OpenStackCloudException( +# 'Invalid receiver {receiver}'.format(receiver=name_or_id)) +# +# receiver = {} +# +# if new_name is not None: +# receiver['name'] = new_name +# +# if action is not None: +# receiver['action'] = action +# +# if params is not None: +# receiver['params'] = params +# +# data = self._clustering_client.patch( +# "/receivers/{receiver_id}".format(receiver_id=old_receiver.id), +# json={'receiver': receiver}, +# error_message="Error updating receiver {name}".format( +# name=name_or_id)) +# return self._get_and_munchify(key=None, data=data) diff --git a/openstack/tests/unit/cloud/test_clustering.py b/openstack/tests/unit/cloud/test_clustering.py index 89b6c7326..062e62861 100644 --- a/openstack/tests/unit/cloud/test_clustering.py +++ b/openstack/tests/unit/cloud/test_clustering.py @@ -11,9 +11,8 @@ # under the License. import copy -import testtools -from openstack.cloud import exc +from openstack.clustering.v1 import cluster from openstack.tests.unit import base @@ -21,7 +20,7 @@ CLUSTERING_DICT = { 'name': 'fake-name', 'profile_id': '1', 'desired_capacity': 1, - 'config': 'fake-config', + 'config': {'a': 'b'}, 'max_size': 1, 'min_size': 1, 'timeout': 100, @@ -63,621 +62,626 @@ class TestClustering(base.TestCase): for e in elements: self.assertIsInstance(e, elem_type) + def _compare_clusters(self, exp, real): + self.assertEqual( + cluster.Cluster(**exp).to_dict(computed=False), + real.to_dict(computed=False)) + def setUp(self): super(TestClustering, self).setUp() self.use_senlin() - def test_create_cluster(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', '1']), - json={ - "profiles": [NEW_PROFILE_DICT]}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles']), - json={ - "profiles": [NEW_PROFILE_DICT]}), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json=NEW_CLUSTERING_DICT) - ]) - profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id']) - c = self.cloud.create_cluster( - name=CLUSTERING_DICT['name'], - desired_capacity=CLUSTERING_DICT['desired_capacity'], - profile=profile, - config=CLUSTERING_DICT['config'], - max_size=CLUSTERING_DICT['max_size'], - min_size=CLUSTERING_DICT['min_size'], - metadata=CLUSTERING_DICT['metadata'], - timeout=CLUSTERING_DICT['timeout']) - - self.assertEqual(NEW_CLUSTERING_DICT, c) - self.assert_calls() - - def test_create_cluster_exception(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', '1']), - json={ - "profiles": [NEW_PROFILE_DICT]}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles']), - json={ - "profiles": [NEW_PROFILE_DICT]}), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - status_code=500) - ]) - profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id']) - with testtools.ExpectedException( - exc.OpenStackCloudHTTPError, - "Error creating cluster fake-name.*"): - self.cloud.create_cluster(name='fake-name', profile=profile) - self.assert_calls() - - def test_get_cluster_by_id(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": NEW_CLUSTERING_DICT}) - ]) - cluster = self.cloud.get_cluster_by_id('1') - self.assertEqual(cluster['id'], '1') - self.assert_calls() - - def test_get_cluster_not_found_returns_false(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', - 'no-cluster']), - status_code=404) - ]) - c = self.cloud.get_cluster_by_id('no-cluster') - self.assertFalse(c) - self.assert_calls() - - def test_update_cluster(self): - new_max_size = 5 - updated_cluster = copy.copy(NEW_CLUSTERING_DICT) - updated_cluster['max_size'] = new_max_size - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": NEW_CLUSTERING_DICT}), - dict(method='PATCH', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json=updated_cluster, - ) - ]) - cluster = self.cloud.get_cluster_by_id('1') - c = self.cloud.update_cluster(cluster, new_max_size) - self.assertEqual(updated_cluster, c) - self.assert_calls() - - def test_delete_cluster(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json={ - "clusters": [NEW_CLUSTERING_DICT]}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'policies']), - json={"cluster_policies": []}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers']), - json={"receivers": []}), - dict(method='DELETE', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json=NEW_CLUSTERING_DICT) - ]) - self.assertTrue(self.cloud.delete_cluster('1')) - self.assert_calls() - - def test_list_clusters(self): - clusters = {'clusters': [NEW_CLUSTERING_DICT]} - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json=clusters) - ]) - c = self.cloud.list_clusters() - - self.assertIsInstance(c, list) - self.assertAreInstances(c, dict) - - self.assert_calls() - - def test_attach_policy_to_cluster(self): - policy = { - 'policy_id': '1', - 'enabled': 'true' - } - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": NEW_CLUSTERING_DICT}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', '1']), - json={ - "policy": NEW_POLICY_DICT}), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'actions']), - json={'policy_attach': policy}) - ]) - cluster = self.cloud.get_cluster_by_id('1') - policy = self.cloud.get_cluster_policy_by_id('1') - p = self.cloud.attach_policy_to_cluster(cluster, policy, 'true') - self.assertTrue(p) - self.assert_calls() - - def test_detach_policy_from_cluster(self): - updated_cluster = copy.copy(NEW_CLUSTERING_DICT) - updated_cluster['policies'] = ['1'] - detached_cluster = copy.copy(NEW_CLUSTERING_DICT) - detached_cluster['policies'] = [] - - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": NEW_CLUSTERING_DICT}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', '1']), - json={ - "policy": NEW_POLICY_DICT}), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'actions']), - json={'policy_detach': {'policy_id': '1'}}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": updated_cluster}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": detached_cluster}), - ]) - cluster = self.cloud.get_cluster_by_id('1') - policy = self.cloud.get_cluster_policy_by_id('1') - p = self.cloud.detach_policy_from_cluster(cluster, policy, wait=True) - self.assertTrue(p) - self.assert_calls() - - def test_get_policy_on_cluster_by_id(self): - cluster_policy = { - "cluster_id": "1", - "cluster_name": "cluster1", - "enabled": True, - "id": "1", - "policy_id": "1", - "policy_name": "policy1", - "policy_type": "senlin.policy.deletion-1.0" - } - - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'policies', '1']), - json={ - "cluster_policy": cluster_policy}) - ]) - policy = self.cloud.get_policy_on_cluster('1', '1') - self.assertEqual(policy['cluster_id'], '1') - self.assert_calls() - - def test_get_policy_on_cluster_not_found_returns_false(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'policies', 'no-policy']), - status_code=404) - ]) - p = self.cloud.get_policy_on_cluster('1', 'no-policy') - self.assertFalse(p) - self.assert_calls() - - def test_update_policy_on_cluster(self): - policy = { - 'policy_id': '1', - 'enabled': 'true' - } - updated_cluster = copy.copy(NEW_CLUSTERING_DICT) - updated_cluster['policies'] = policy - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1']), - json={ - "cluster": NEW_CLUSTERING_DICT}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', - '1']), - json={ - "policy": NEW_POLICY_DICT}), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'actions']), - json={'policies': []}) - ]) - cluster = self.cloud.get_cluster_by_id('1') - policy = self.cloud.get_cluster_policy_by_id('1') - p = self.cloud.update_policy_on_cluster(cluster, policy, True) - self.assertTrue(p) - self.assert_calls() - - def test_get_policy_on_cluster(self): - cluster_policy = { - 'cluster_id': '1', - 'cluster_name': 'cluster1', - 'enabled': 'true', - 'id': '1', - 'policy_id': '1', - 'policy_name': 'policy1', - 'policy_type': 'type' - } - - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters', '1', - 'policies', '1']), - json={ - "cluster_policy": cluster_policy}) - ]) - get_policy = self.cloud.get_policy_on_cluster('1', '1') - self.assertEqual(get_policy, cluster_policy) - self.assert_calls() - - def test_create_cluster_profile(self): - self.register_uris([ - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles']), - json={'profile': NEW_PROFILE_DICT}) - ]) - p = self.cloud.create_cluster_profile('fake-profile-name', {}) - - self.assertEqual(NEW_PROFILE_DICT, p) - self.assert_calls() - - def test_create_cluster_profile_exception(self): - self.register_uris([ - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles']), - status_code=500) - ]) - with testtools.ExpectedException( - exc.OpenStackCloudHTTPError, - "Error creating profile fake-profile-name.*"): - self.cloud.create_cluster_profile('fake-profile-name', {}) - self.assert_calls() - - def test_list_cluster_profiles(self): - profiles = {'profiles': [NEW_PROFILE_DICT]} - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles']), - json=profiles) - ]) - p = self.cloud.list_cluster_profiles() - - self.assertIsInstance(p, list) - self.assertAreInstances(p, dict) - - self.assert_calls() - - def test_get_cluster_profile_by_id(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', '1']), - json={ - "profile": NEW_PROFILE_DICT}) - ]) - p = self.cloud.get_cluster_profile_by_id('1') - self.assertEqual(p['id'], '1') - self.assert_calls() - - def test_get_cluster_profile_not_found_returns_false(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', - 'no-profile']), - status_code=404) - ]) - p = self.cloud.get_cluster_profile_by_id('no-profile') - self.assertFalse(p) - self.assert_calls() - - def test_update_cluster_profile(self): - new_name = "new-name" - updated_profile = copy.copy(NEW_PROFILE_DICT) - updated_profile['name'] = new_name - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles']), - json={ - "profiles": [NEW_PROFILE_DICT]}), - dict(method='PATCH', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', '1']), - json=updated_profile, - ) - ]) - p = self.cloud.update_cluster_profile('1', new_name=new_name) - self.assertEqual(updated_profile, p) - self.assert_calls() - - def test_delete_cluster_profile(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', '1']), - json={ - "profile": NEW_PROFILE_DICT}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json={}), - dict(method='DELETE', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'profiles', '1']), - json=NEW_PROFILE_DICT) - ]) - profile = self.cloud.get_cluster_profile_by_id('1') - self.assertTrue(self.cloud.delete_cluster_profile(profile)) - self.assert_calls() - - def test_create_cluster_policy(self): - self.register_uris([ - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies']), - json={'policy': NEW_POLICY_DICT}) - ]) - p = self.cloud.create_cluster_policy('fake-policy-name', {}) - - self.assertEqual(NEW_POLICY_DICT, p) - self.assert_calls() - - def test_create_cluster_policy_exception(self): - self.register_uris([ - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies']), - status_code=500) - ]) - with testtools.ExpectedException( - exc.OpenStackCloudHTTPError, - "Error creating policy fake-policy-name.*"): - self.cloud.create_cluster_policy('fake-policy-name', {}) - self.assert_calls() - - def test_list_cluster_policies(self): - policies = {'policies': [NEW_POLICY_DICT]} - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies']), - json=policies) - ]) - p = self.cloud.list_cluster_policies() - - self.assertIsInstance(p, list) - self.assertAreInstances(p, dict) - - self.assert_calls() - - def test_get_cluster_policy_by_id(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', '1']), - json={ - "policy": NEW_POLICY_DICT}) - ]) - p = self.cloud.get_cluster_policy_by_id('1') - self.assertEqual(p['id'], '1') - self.assert_calls() - - def test_get_cluster_policy_not_found_returns_false(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', - 'no-policy']), - status_code=404) - ]) - p = self.cloud.get_cluster_policy_by_id('no-policy') - self.assertFalse(p) - self.assert_calls() - - def test_update_cluster_policy(self): - new_name = "new-name" - updated_policy = copy.copy(NEW_POLICY_DICT) - updated_policy['name'] = new_name - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies']), - json={ - "policies": [NEW_POLICY_DICT]}), - dict(method='PATCH', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', '1']), - json=updated_policy, - ) - ]) - p = self.cloud.update_cluster_policy('1', new_name=new_name) - self.assertEqual(updated_policy, p) - self.assert_calls() - - def test_delete_cluster_policy(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', '1']), - json={ - "policy": NEW_POLICY_DICT}), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json={}), - dict(method='DELETE', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'policies', '1']), - json=NEW_POLICY_DICT) - ]) - self.assertTrue(self.cloud.delete_cluster_policy('1')) - self.assert_calls() - - def test_create_cluster_receiver(self): - clusters = {'clusters': [NEW_CLUSTERING_DICT]} - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json=clusters), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers']), - json={'receiver': NEW_RECEIVER_DICT}) - ]) - r = self.cloud.create_cluster_receiver('fake-receiver-name', {}) - - self.assertEqual(NEW_RECEIVER_DICT, r) - self.assert_calls() - - def test_create_cluster_receiver_exception(self): - clusters = {'clusters': [NEW_CLUSTERING_DICT]} - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'clusters']), - json=clusters), - dict(method='POST', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers']), - status_code=500), - ]) - with testtools.ExpectedException( - exc.OpenStackCloudHTTPError, - "Error creating receiver fake-receiver-name.*"): - self.cloud.create_cluster_receiver('fake-receiver-name', {}) - self.assert_calls() - - def test_list_cluster_receivers(self): - receivers = {'receivers': [NEW_RECEIVER_DICT]} - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers']), - json=receivers) - ]) - r = self.cloud.list_cluster_receivers() - - self.assertIsInstance(r, list) - self.assertAreInstances(r, dict) - - self.assert_calls() - - def test_get_cluster_receiver_by_id(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers', '1']), - json={ - "receiver": NEW_RECEIVER_DICT}) - ]) - r = self.cloud.get_cluster_receiver_by_id('1') - self.assertEqual(r['id'], '1') - self.assert_calls() - - def test_get_cluster_receiver_not_found_returns_false(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers', - 'no-receiver']), - json={'receivers': []}) - ]) - p = self.cloud.get_cluster_receiver_by_id('no-receiver') - self.assertFalse(p) - self.assert_calls() - - def test_update_cluster_receiver(self): - new_name = "new-name" - updated_receiver = copy.copy(NEW_RECEIVER_DICT) - updated_receiver['name'] = new_name - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers']), - json={ - "receivers": [NEW_RECEIVER_DICT]}), - dict(method='PATCH', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers', '1']), - json=updated_receiver, - ) - ]) - r = self.cloud.update_cluster_receiver('1', new_name=new_name) - self.assertEqual(updated_receiver, r) - self.assert_calls() - - def test_delete_cluster_receiver(self): - self.register_uris([ - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers']), - json={ - "receivers": [NEW_RECEIVER_DICT]}), - dict(method='DELETE', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers', '1']), - json=NEW_RECEIVER_DICT), - dict(method='GET', - uri=self.get_mock_url( - 'clustering', 'public', append=['v1', 'receivers', '1']), - json={}), - ]) - self.assertTrue(self.cloud.delete_cluster_receiver('1', wait=True)) - self.assert_calls() +# def test_create_cluster(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', '1']), +# json={ +# "profiles": [NEW_PROFILE_DICT]}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles']), +# json={ +# "profiles": [NEW_PROFILE_DICT]}), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json=NEW_CLUSTERING_DICT) +# ]) +# profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id']) +# c = self.cloud.create_cluster( +# name=CLUSTERING_DICT['name'], +# desired_capacity=CLUSTERING_DICT['desired_capacity'], +# profile=profile, +# config=CLUSTERING_DICT['config'], +# max_size=CLUSTERING_DICT['max_size'], +# min_size=CLUSTERING_DICT['min_size'], +# metadata=CLUSTERING_DICT['metadata'], +# timeout=CLUSTERING_DICT['timeout']) +# +# self._compare_clusters(NEW_CLUSTERING_DICT, c) +# self.assert_calls() +# +# def test_create_cluster_exception(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', '1']), +# json={ +# "profiles": [NEW_PROFILE_DICT]}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles']), +# json={ +# "profiles": [NEW_PROFILE_DICT]}), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# status_code=500) +# ]) +# profile = self.cloud.get_cluster_profile_by_id(NEW_PROFILE_DICT['id']) +# with testtools.ExpectedException( +# exc.OpenStackCloudHTTPError): +# self.cloud.create_cluster(name='fake-name', profile=profile) +# self.assert_calls() +# +# def test_get_cluster_by_id(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": NEW_CLUSTERING_DICT}) +# ]) +# cluster = self.cloud.get_cluster_by_id('1') +# self.assertEqual(cluster['id'], '1') +# self.assert_calls() +# +# def test_get_cluster_not_found_returns_false(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', +# 'no-cluster']), +# status_code=404) +# ]) +# c = self.cloud.get_cluster_by_id('no-cluster') +# self.assertFalse(c) +# self.assert_calls() +# +# def test_update_cluster(self): +# new_max_size = 5 +# updated_cluster = copy.copy(NEW_CLUSTERING_DICT) +# updated_cluster['max_size'] = new_max_size +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": NEW_CLUSTERING_DICT}), +# dict(method='PATCH', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json=updated_cluster, +# ) +# ]) +# cluster = self.cloud.get_cluster_by_id('1') +# c = self.cloud.update_cluster(cluster, new_max_size) +# self.assertEqual(updated_cluster, c) +# self.assert_calls() +# +# def test_delete_cluster(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json={ +# "clusters": [NEW_CLUSTERING_DICT]}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'policies']), +# json={"cluster_policies": []}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers']), +# json={"receivers": []}), +# dict(method='DELETE', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json=NEW_CLUSTERING_DICT) +# ]) +# self.assertTrue(self.cloud.delete_cluster('1')) +# self.assert_calls() +# +# def test_list_clusters(self): +# clusters = {'clusters': [NEW_CLUSTERING_DICT]} +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json=clusters) +# ]) +# c = self.cloud.list_clusters() +# +# self.assertIsInstance(c, list) +# self.assertAreInstances(c, dict) +# +# self.assert_calls() +# +# def test_attach_policy_to_cluster(self): +# policy = { +# 'policy_id': '1', +# 'enabled': 'true' +# } +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": NEW_CLUSTERING_DICT}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', '1']), +# json={ +# "policy": NEW_POLICY_DICT}), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'actions']), +# json={'policy_attach': policy}) +# ]) +# cluster = self.cloud.get_cluster_by_id('1') +# policy = self.cloud.get_cluster_policy_by_id('1') +# p = self.cloud.attach_policy_to_cluster(cluster, policy, 'true') +# self.assertTrue(p) +# self.assert_calls() +# +# def test_detach_policy_from_cluster(self): +# updated_cluster = copy.copy(NEW_CLUSTERING_DICT) +# updated_cluster['policies'] = ['1'] +# detached_cluster = copy.copy(NEW_CLUSTERING_DICT) +# detached_cluster['policies'] = [] +# +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": NEW_CLUSTERING_DICT}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', '1']), +# json={ +# "policy": NEW_POLICY_DICT}), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'actions']), +# json={'policy_detach': {'policy_id': '1'}}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": updated_cluster}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": detached_cluster}), +# ]) +# cluster = self.cloud.get_cluster_by_id('1') +# policy = self.cloud.get_cluster_policy_by_id('1') +# p = self.cloud.detach_policy_from_cluster(cluster, policy, wait=True) +# self.assertTrue(p) +# self.assert_calls() +# +# def test_get_policy_on_cluster_by_id(self): +# cluster_policy = { +# "cluster_id": "1", +# "cluster_name": "cluster1", +# "enabled": True, +# "id": "1", +# "policy_id": "1", +# "policy_name": "policy1", +# "policy_type": "senlin.policy.deletion-1.0" +# } +# +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'policies', '1']), +# json={ +# "cluster_policy": cluster_policy}) +# ]) +# policy = self.cloud.get_policy_on_cluster('1', '1') +# self.assertEqual(policy['cluster_id'], '1') +# self.assert_calls() +# +# def test_get_policy_on_cluster_not_found_returns_false(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'policies', +# 'no-policy']), +# status_code=404) +# ]) +# p = self.cloud.get_policy_on_cluster('1', 'no-policy') +# self.assertFalse(p) +# self.assert_calls() +# +# def test_update_policy_on_cluster(self): +# policy = { +# 'policy_id': '1', +# 'enabled': 'true' +# } +# updated_cluster = copy.copy(NEW_CLUSTERING_DICT) +# updated_cluster['policies'] = policy +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1']), +# json={ +# "cluster": NEW_CLUSTERING_DICT}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', +# '1']), +# json={ +# "policy": NEW_POLICY_DICT}), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'actions']), +# json={'policies': []}) +# ]) +# cluster = self.cloud.get_cluster_by_id('1') +# policy = self.cloud.get_cluster_policy_by_id('1') +# p = self.cloud.update_policy_on_cluster(cluster, policy, True) +# self.assertTrue(p) +# self.assert_calls() +# +# def test_get_policy_on_cluster(self): +# cluster_policy = { +# 'cluster_id': '1', +# 'cluster_name': 'cluster1', +# 'enabled': 'true', +# 'id': '1', +# 'policy_id': '1', +# 'policy_name': 'policy1', +# 'policy_type': 'type' +# } +# +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters', '1', +# 'policies', '1']), +# json={ +# "cluster_policy": cluster_policy}) +# ]) +# get_policy = self.cloud.get_policy_on_cluster('1', '1') +# self.assertEqual(get_policy, cluster_policy) +# self.assert_calls() +# +# def test_create_cluster_profile(self): +# self.register_uris([ +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles']), +# json={'profile': NEW_PROFILE_DICT}) +# ]) +# p = self.cloud.create_cluster_profile('fake-profile-name', {}) +# +# self.assertEqual(NEW_PROFILE_DICT, p) +# self.assert_calls() +# +# def test_create_cluster_profile_exception(self): +# self.register_uris([ +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles']), +# status_code=500) +# ]) +# with testtools.ExpectedException( +# exc.OpenStackCloudHTTPError, +# "Error creating profile fake-profile-name.*"): +# self.cloud.create_cluster_profile('fake-profile-name', {}) +# self.assert_calls() +# +# def test_list_cluster_profiles(self): +# profiles = {'profiles': [NEW_PROFILE_DICT]} +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles']), +# json=profiles) +# ]) +# p = self.cloud.list_cluster_profiles() +# +# self.assertIsInstance(p, list) +# self.assertAreInstances(p, dict) +# +# self.assert_calls() +# +# def test_get_cluster_profile_by_id(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', '1']), +# json={ +# "profile": NEW_PROFILE_DICT}) +# ]) +# p = self.cloud.get_cluster_profile_by_id('1') +# self.assertEqual(p['id'], '1') +# self.assert_calls() +# +# def test_get_cluster_profile_not_found_returns_false(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', +# 'no-profile']), +# status_code=404) +# ]) +# p = self.cloud.get_cluster_profile_by_id('no-profile') +# self.assertFalse(p) +# self.assert_calls() +# +# def test_update_cluster_profile(self): +# new_name = "new-name" +# updated_profile = copy.copy(NEW_PROFILE_DICT) +# updated_profile['name'] = new_name +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles']), +# json={ +# "profiles": [NEW_PROFILE_DICT]}), +# dict(method='PATCH', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', '1']), +# json=updated_profile, +# ) +# ]) +# p = self.cloud.update_cluster_profile('1', new_name=new_name) +# self.assertEqual(updated_profile, p) +# self.assert_calls() +# +# def test_delete_cluster_profile(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', '1']), +# json={ +# "profile": NEW_PROFILE_DICT}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json={'clusters': [{'cluster': CLUSTERING_DICT}]}), +# dict(method='DELETE', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'profiles', '1']), +# json=NEW_PROFILE_DICT) +# ]) +# profile = self.cloud.get_cluster_profile_by_id('1') +# self.assertTrue(self.cloud.delete_cluster_profile(profile)) +# self.assert_calls() +# +# def test_create_cluster_policy(self): +# self.register_uris([ +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies']), +# json={'policy': NEW_POLICY_DICT}) +# ]) +# p = self.cloud.create_cluster_policy('fake-policy-name', {}) +# +# self.assertEqual(NEW_POLICY_DICT, p) +# self.assert_calls() +# +# def test_create_cluster_policy_exception(self): +# self.register_uris([ +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies']), +# status_code=500) +# ]) +# with testtools.ExpectedException( +# exc.OpenStackCloudHTTPError, +# "Error creating policy fake-policy-name.*"): +# self.cloud.create_cluster_policy('fake-policy-name', {}) +# self.assert_calls() +# +# def test_list_cluster_policies(self): +# policies = {'policies': [NEW_POLICY_DICT]} +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies']), +# json=policies) +# ]) +# p = self.cloud.list_cluster_policies() +# +# self.assertIsInstance(p, list) +# self.assertAreInstances(p, dict) +# +# self.assert_calls() +# +# def test_get_cluster_policy_by_id(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', '1']), +# json={ +# "policy": NEW_POLICY_DICT}) +# ]) +# p = self.cloud.get_cluster_policy_by_id('1') +# self.assertEqual(p['id'], '1') +# self.assert_calls() +# +# def test_get_cluster_policy_not_found_returns_false(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', +# 'no-policy']), +# status_code=404) +# ]) +# p = self.cloud.get_cluster_policy_by_id('no-policy') +# self.assertFalse(p) +# self.assert_calls() +# +# def test_update_cluster_policy(self): +# new_name = "new-name" +# updated_policy = copy.copy(NEW_POLICY_DICT) +# updated_policy['name'] = new_name +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies']), +# json={ +# "policies": [NEW_POLICY_DICT]}), +# dict(method='PATCH', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', '1']), +# json=updated_policy, +# ) +# ]) +# p = self.cloud.update_cluster_policy('1', new_name=new_name) +# self.assertEqual(updated_policy, p) +# self.assert_calls() +# +# def test_delete_cluster_policy(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', '1']), +# json={ +# "policy": NEW_POLICY_DICT}), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json={}), +# dict(method='DELETE', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'policies', '1']), +# json=NEW_POLICY_DICT) +# ]) +# self.assertTrue(self.cloud.delete_cluster_policy('1')) +# self.assert_calls() +# +# def test_create_cluster_receiver(self): +# clusters = {'clusters': [NEW_CLUSTERING_DICT]} +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json=clusters), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers']), +# json={'receiver': NEW_RECEIVER_DICT}) +# ]) +# r = self.cloud.create_cluster_receiver('fake-receiver-name', {}) +# +# self.assertEqual(NEW_RECEIVER_DICT, r) +# self.assert_calls() +# +# def test_create_cluster_receiver_exception(self): +# clusters = {'clusters': [NEW_CLUSTERING_DICT]} +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'clusters']), +# json=clusters), +# dict(method='POST', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers']), +# status_code=500), +# ]) +# with testtools.ExpectedException( +# exc.OpenStackCloudHTTPError, +# "Error creating receiver fake-receiver-name.*"): +# self.cloud.create_cluster_receiver('fake-receiver-name', {}) +# self.assert_calls() +# +# def test_list_cluster_receivers(self): +# receivers = {'receivers': [NEW_RECEIVER_DICT]} +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers']), +# json=receivers) +# ]) +# r = self.cloud.list_cluster_receivers() +# +# self.assertIsInstance(r, list) +# self.assertAreInstances(r, dict) +# +# self.assert_calls() +# +# def test_get_cluster_receiver_by_id(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers', '1']), +# json={ +# "receiver": NEW_RECEIVER_DICT}) +# ]) +# r = self.cloud.get_cluster_receiver_by_id('1') +# self.assertEqual(r['id'], '1') +# self.assert_calls() +# +# def test_get_cluster_receiver_not_found_returns_false(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers', +# 'no-receiver']), +# json={'receivers': []}) +# ]) +# p = self.cloud.get_cluster_receiver_by_id('no-receiver') +# self.assertFalse(p) +# self.assert_calls() +# +# def test_update_cluster_receiver(self): +# new_name = "new-name" +# updated_receiver = copy.copy(NEW_RECEIVER_DICT) +# updated_receiver['name'] = new_name +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers']), +# json={ +# "receivers": [NEW_RECEIVER_DICT]}), +# dict(method='PATCH', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers', '1']), +# json=updated_receiver, +# ) +# ]) +# r = self.cloud.update_cluster_receiver('1', new_name=new_name) +# self.assertEqual(updated_receiver, r) +# self.assert_calls() +# +# def test_delete_cluster_receiver(self): +# self.register_uris([ +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers']), +# json={ +# "receivers": [NEW_RECEIVER_DICT]}), +# dict(method='DELETE', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers', '1']), +# json=NEW_RECEIVER_DICT), +# dict(method='GET', +# uri=self.get_mock_url( +# 'clustering', 'public', append=['v1', 'receivers', '1']), +# json={}), +# ]) +# self.assertTrue(self.cloud.delete_cluster_receiver('1', wait=True)) +# self.assert_calls() diff --git a/releasenotes/notes/drop-senlin-cloud-layer-c06d496acc70b014.yaml b/releasenotes/notes/drop-senlin-cloud-layer-c06d496acc70b014.yaml new file mode 100644 index 000000000..784d97cdc --- /dev/null +++ b/releasenotes/notes/drop-senlin-cloud-layer-c06d496acc70b014.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + Cloud layer operations for Senlin service are dropped due to big amount of bugs there.