Files
senlin-tempest-plugin/senlin_tempest_plugin/common/messaging_client.py
Duc Truong ebc5a6198c Add tests for lifecycle hooks
* add API test for complete lifecycle
* change existing tests to use deletion-1.1 policy
* add functional test for deletion policy with hooks
* add integration test for deletion policy with hooks

Depends-On: I888a01c4f26959649121d6f82430017858a4c481

Change-Id: I5e2d4b0c073b515d9697cf91cd89ea0e6a3c81b8
2018-01-30 20:43:34 +00:00

72 lines
2.4 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from tempest import config
from tempest.lib.common import rest_client
# Module-level alias for tempest's shared configuration object.
CONF = config.CONF
class V2MessagingClient(rest_client.RestClient):
    """REST client for the ``v2`` queue and message endpoints.

    Every request is sent with a ``Client-ID`` header whose value is a UUID
    generated once per client instance.  Each public request method returns
    the dictionary produced by :meth:`get_resp`.
    """

    def __init__(self, auth_provider, service, region, **kwargs):
        """Initialise the base REST client and the per-instance headers.

        :param auth_provider: passed through to ``RestClient.__init__``.
        :param service: passed through to ``RestClient.__init__``.
        :param region: passed through to ``RestClient.__init__``.
        :param kwargs: extra keyword arguments, passed through unchanged.
        """
        super(V2MessagingClient, self).__init__(
            auth_provider, service, region, **kwargs)

        # All URIs built by this client are relative to this API version.
        self.uri_prefix = 'v2'
        # One client id per instance, reused for every request it makes.
        self.headers = {'Client-ID': uuidutils.generate_uuid()}

    def _queue_uri(self, queue_name, suffix=''):
        """Return the URI of ``queue_name`` with ``suffix`` appended."""
        return '{0}/queues/{1}{2}'.format(
            self.uri_prefix, queue_name, suffix)

    def get_resp(self, resp, body):
        """Flatten a response/body pair into a single dictionary.

        :param resp: mapping of response fields; must contain ``'status'``
            and may contain ``'location'``.  It is not modified.
        :param body: raw response body, handed to ``self._parse_resp``
            (not defined in this class; presumably inherited from
            ``RestClient`` -- confirm).
        :returns: dict with ``'status'`` converted to ``int``,
            ``'location'`` (``None`` when absent), every other key of
            ``resp`` and the parsed body under ``'body'``.
        :raises KeyError: if ``resp`` has no ``'status'`` key.
        """
        # Work on a shallow copy so the caller's mapping is left intact.
        fields = dict(resp)

        res = {
            'status': int(fields.pop('status')),
            'location': fields.pop('location', None),
        }
        # Whatever is left are the remaining response keys.
        res.update(fields)
        res['body'] = self._parse_resp(body)
        return res

    def create_queue(self, queue_name):
        """PUT ``<prefix>/queues/<queue_name>`` with an empty body.

        :param queue_name: name of the queue, used verbatim in the URI.
        :returns: the dictionary built by :meth:`get_resp`.
        """
        resp, body = self.put(self._queue_uri(queue_name), '',
                              extra_headers=True,
                              headers=self.headers)
        return self.get_resp(resp, body)

    def delete_queue(self, queue_name):
        """DELETE ``<prefix>/queues/<queue_name>``.

        :param queue_name: name of the queue, used verbatim in the URI.
        :returns: the dictionary built by :meth:`get_resp`.
        """
        resp, body = self.delete(self._queue_uri(queue_name),
                                 extra_headers=True,
                                 headers=self.headers)
        return self.get_resp(resp, body)

    def list_messages(self, queue_name):
        """GET ``<prefix>/queues/<queue_name>/messages``.

        :param queue_name: name of the queue, used verbatim in the URI.
        :returns: the dictionary built by :meth:`get_resp`.
        """
        resp, body = self.get(self._queue_uri(queue_name, '/messages'),
                              extra_headers=True,
                              headers=self.headers)
        return self.get_resp(resp, body)

    def post_messages(self, queue_name, messages):
        """POST ``messages`` to ``<prefix>/queues/<queue_name>/messages``.

        :param queue_name: name of the queue, used verbatim in the URI.
        :param messages: object serialised with ``jsonutils.dumps`` and
            sent as the request body.
        :returns: the dictionary built by :meth:`get_resp`.
        """
        resp, body = self.post(self._queue_uri(queue_name, '/messages'),
                               body=jsonutils.dumps(messages),
                               extra_headers=True,
                               headers=self.headers)
        return self.get_resp(resp, body)