# Copyright 2017 Catalyst IT Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import datetime from datetime import timedelta import json import requests from tempest.lib import exceptions from qinling_tempest_plugin.services import base as client_base class QinlingClient(client_base.QinlingClientBase): """Tempest REST client for Qinling.""" def delete_resource(self, res, id, ignore_notfound=False): try: resp, _ = self.delete_obj(res, id) return resp except exceptions.NotFound: if ignore_notfound: pass else: raise def get_resource(self, res, id): resp, body = self.get_obj(res, id) return resp, body def get_resources(self, res, params=None): resp, body = self.get_list_objs(res, params=params) return resp, body def create_runtime(self, image, name=None, is_public=True): req_body = {"image": image, "is_public": is_public} if name: req_body.update({'name': name}) resp, body = self.post_json('runtimes', req_body) return resp, body def create_function(self, code, runtime_id, name='', package_data=None, entry=None): """Create function. Tempest rest client doesn't support multipart upload, so use requests lib instead. As a result, we can not use self.assertRaises function for negative tests. """ headers = {'X-Auth-Token': self.auth_provider.get_token()} req_body = { 'name': name, 'runtime_id': runtime_id, 'code': json.dumps(code) } if entry: req_body['entry'] = entry req_kwargs = { 'headers': headers, 'data': req_body } if package_data: req_kwargs.update({'files': {'package': package_data}}) url_path = '%s/v1/functions' % (self.base_url) resp = requests.post(url_path, **req_kwargs) return resp, json.loads(resp.text) def update_function(self, function_id, package_data=None, code=None, entry=None): headers = {'X-Auth-Token': self.auth_provider.get_token()} req_body = {} if code: req_body['code'] = json.dumps(code) if entry: req_body['entry'] = entry req_kwargs = { 'headers': headers, 'data': req_body } if package_data: req_kwargs.update({'files': {'package': package_data}}) url_path = '%s/v1/functions/%s' % (self.base_url, function_id) resp = requests.put(url_path, **req_kwargs) return resp, json.loads(resp.text) def get_function(self, function_id): resp, body = self.get( '/v1/functions/{id}'.format(id=function_id), ) return resp, json.loads(body) def download_function(self, function_id): return self.get('/v1/functions/%s?download=true' % function_id, headers={}) def detach_function(self, function_id, version=0): if version == 0: url = '/v1/functions/%s/detach' % function_id else: url = '/v1/functions/%s/versions/%s/detach' % \ (function_id, version) return self.post(url, None, headers={}) def create_execution(self, function_id, input=None, sync=True, version=0): req_body = { 'function_id': function_id, 'function_version': version, 'sync': sync, 'input': input } resp, body = self.post_json('executions', req_body) return resp, body def get_execution_log(self, execution_id): return self.get('/v1/executions/%s/log' % execution_id, headers={'Accept': 'text/plain'}) def get_function_workers(self, function_id, version=0): q_params = None if version > 0: q_params = "/?function_version=%s" % version url = 'functions/%s/workers' % function_id if q_params: url += q_params return self.get_resources(url) def create_webhook(self, function_id, version=0): req_body = {"function_id": function_id, "function_version": version} resp, body = self.post_json('webhooks', req_body) return resp, body def create_job(self, function_id, version=0, first_execution_time=None): req_body = {"function_id": function_id, "function_version": version} if not first_execution_time: first_execution_time = str( datetime.utcnow() + timedelta(hours=1) ) req_body.update({'first_execution_time': first_execution_time}) resp, body = self.post_json('jobs', req_body) return resp, body def create_function_version(self, function_id, description=None): req_body = {} if description is not None: req_body['description'] = description resp, body = self.post_json( 'functions/%s/versions' % function_id, req_body ) return resp, body def delete_function_version(self, function_id, version, ignore_notfound=False): try: resp, _ = self.delete( '/v1/functions/{id}/versions/{version}'.format( id=function_id, version=version) ) return resp except exceptions.NotFound: if ignore_notfound: pass else: raise def get_function_version(self, function_id, version): resp, body = self.get( '/v1/functions/%s/versions/%s' % (function_id, version), ) return resp, json.loads(body) def get_function_versions(self, function_id): resp, body = self.get( '/v1/functions/%s/versions' % (function_id), ) return resp, json.loads(body)