Copy images_client from tempest

Now the gate test of ceilometer is broken, because the ceilometer test
is using nonstable tempest interfaces and these interfaces are changed.
QA team defines interfaces which are not under tempest.lib as unstable
clearly according to

https://github.com/openstack/tempest/blob/master/doc/source/library.rst#stability

So current test way is against the QA policy, so here is a workaround.
Copy&paste is not good, almost wrong. However, if still continuing the
tests, it would be a nice option to copy&paste and pass the gate test.
After QA team defines stable interfaces, that is a nice time to switch
using the stable interfaces with removing this copy&paste code.

Closes-Bug: #1589426

Depends-On: I3abd9049560ee507b3610ab482c697a239f13a3b
Change-Id: I30266f1b690ea105511d9b1162755bcc85e2e69a
This commit is contained in:
Ken'ichi Ohmichi 2016-06-06 14:35:28 +09:00 committed by Julien Danjou
parent c28336d400
commit 6c0b97b1e6
8 changed files with 1037 additions and 3 deletions

View File

@ -0,0 +1,169 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
class TempestException(Exception):
"""Base Tempest Exception
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = "An unknown exception occurred"
def __init__(self, *args, **kwargs):
super(TempestException, self).__init__()
try:
self._error_string = self.message % kwargs
except Exception:
# at least get the core message out if something happened
self._error_string = self.message
if len(args) > 0:
# If there is a non-kwarg parameter, assume it's the error
# message or reason description and tack it on to the end
# of the exception message
# Convert all arguments into their string representations...
args = ["%s" % arg for arg in args]
self._error_string = (self._error_string +
"\nDetails: %s" % '\n'.join(args))
def __str__(self):
return self._error_string
class RestClientException(TempestException,
testtools.TestCase.failureException):
pass
class InvalidConfiguration(TempestException):
message = "Invalid Configuration"
class InvalidCredentials(TempestException):
message = "Invalid Credentials"
class InvalidServiceTag(TempestException):
message = "Invalid service tag"
class InvalidIdentityVersion(TempestException):
message = "Invalid version %(identity_version)s of the identity service"
class TimeoutException(TempestException):
message = "Request timed out"
class BuildErrorException(TempestException):
message = "Server %(server_id)s failed to build and is in ERROR status"
class ImageKilledException(TempestException):
message = "Image %(image_id)s 'killed' while waiting for '%(status)s'"
class AddImageException(TempestException):
message = "Image %(image_id)s failed to become ACTIVE in the allotted time"
class VolumeBuildErrorException(TempestException):
message = "Volume %(volume_id)s failed to build and is in ERROR status"
class VolumeRestoreErrorException(TempestException):
message = "Volume %(volume_id)s failed to restore and is in ERROR status"
class SnapshotBuildErrorException(TempestException):
message = "Snapshot %(snapshot_id)s failed to build and is in ERROR status"
class VolumeBackupException(TempestException):
message = "Volume backup %(backup_id)s failed and is in ERROR status"
class StackBuildErrorException(TempestException):
message = ("Stack %(stack_identifier)s is in %(stack_status)s status "
"due to '%(stack_status_reason)s'")
class EndpointNotFound(TempestException):
message = "Endpoint not found"
class IdentityError(TempestException):
message = "Got identity error"
class ServerUnreachable(TempestException):
message = "The server is not reachable via the configured network"
# NOTE(andreaf) This exception is added here to facilitate the migration
# of get_network_from_name and preprov_creds to tempest.lib, and it should
# be migrated along with them
class InvalidTestResource(TempestException):
message = "%(name) is not a valid %(type), or the name is ambiguous"
class RFCViolation(RestClientException):
message = "RFC Violation"
class InvalidHttpSuccessCode(RestClientException):
message = "The success code is different than the expected one"
class BadRequest(RestClientException):
message = "Bad request"
class ResponseWithNonEmptyBody(RFCViolation):
message = ("RFC Violation! Response with %(status)d HTTP Status Code "
"MUST NOT have a body")
class ResponseWithEntity(RFCViolation):
message = ("RFC Violation! Response with 205 HTTP Status Code "
"MUST NOT have an entity")
class InvalidHTTPResponseHeader(RestClientException):
message = "HTTP response header is invalid"
class InvalidStructure(TempestException):
message = "Invalid structure of table with details"
class CommandFailed(Exception):
def __init__(self, returncode, cmd, output, stderr):
super(CommandFailed, self).__init__()
self.returncode = returncode
self.cmd = cmd
self.stdout = output
self.stderr = stderr
def __str__(self):
return ("Command '%s' returned non-zero exit status %d.\n"
"stdout:\n%s\n"
"stderr:\n%s" % (self.cmd,
self.returncode,
self.stdout,
self.stderr))

View File

@ -23,12 +23,14 @@ from tempest.lib.services.compute.floating_ips_client import FloatingIPsClient
from tempest.lib.services.compute.networks_client import NetworksClient from tempest.lib.services.compute.networks_client import NetworksClient
from tempest.lib.services.compute.servers_client import ServersClient from tempest.lib.services.compute.servers_client import ServersClient
from tempest import manager from tempest import manager
from tempest.services.image.v1.json.images_client import ImagesClient
from tempest.services.image.v2.json.images_client import \
ImagesClient as ImagesClientV2
from tempest.services.object_storage.container_client import ContainerClient from tempest.services.object_storage.container_client import ContainerClient
from tempest.services.object_storage.object_client import ObjectClient from tempest.services.object_storage.object_client import ObjectClient
from ceilometer.tests.tempest.service.images.v1.images_client import \
ImagesClient
from ceilometer.tests.tempest.service.images.v2.images_client import \
ImagesClient as ImagesClientV2
CONF = config.CONF CONF = config.CONF

View File

@ -0,0 +1,361 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Originally copied from python-glanceclient
import copy
import hashlib
import posixpath
import re
import socket
import struct
import OpenSSL
from oslo_log import log as logging
import six
from six import moves
from six.moves import http_client as httplib
from six.moves.urllib import parse as urlparse
from ceilometer.tests.tempest import exceptions as exc
LOG = logging.getLogger(__name__)
USER_AGENT = 'tempest'
CHUNKSIZE = 1024 * 64 # 64kB
TOKEN_CHARS_RE = re.compile('^[-A-Za-z0-9+/=]*$')
class HTTPClient(object):
def __init__(self, auth_provider, filters, **kwargs):
self.auth_provider = auth_provider
self.filters = filters
self.endpoint = auth_provider.base_url(filters)
endpoint_parts = urlparse.urlparse(self.endpoint)
self.endpoint_scheme = endpoint_parts.scheme
self.endpoint_hostname = endpoint_parts.hostname
self.endpoint_port = endpoint_parts.port
self.connection_class = self._get_connection_class(
self.endpoint_scheme)
self.connection_kwargs = self._get_connection_kwargs(
self.endpoint_scheme, **kwargs)
@staticmethod
def _get_connection_class(scheme):
if scheme == 'https':
return VerifiedHTTPSConnection
else:
return httplib.HTTPConnection
@staticmethod
def _get_connection_kwargs(scheme, **kwargs):
_kwargs = {'timeout': float(kwargs.get('timeout', 600))}
if scheme == 'https':
_kwargs['ca_certs'] = kwargs.get('ca_certs', None)
_kwargs['cert_file'] = kwargs.get('cert_file', None)
_kwargs['key_file'] = kwargs.get('key_file', None)
_kwargs['insecure'] = kwargs.get('insecure', False)
_kwargs['ssl_compression'] = kwargs.get('ssl_compression', True)
return _kwargs
def _get_connection(self):
_class = self.connection_class
try:
return _class(self.endpoint_hostname, self.endpoint_port,
**self.connection_kwargs)
except httplib.InvalidURL:
raise exc.EndpointNotFound
def _http_request(self, url, method, **kwargs):
"""Send an http request with the specified characteristics.
Wrapper around httplib.HTTP(S)Connection.request to handle tasks such
as setting headers and error handling.
"""
# Copy the kwargs so we can reuse the original in case of redirects
kwargs['headers'] = copy.deepcopy(kwargs.get('headers', {}))
kwargs['headers'].setdefault('User-Agent', USER_AGENT)
self._log_request(method, url, kwargs['headers'])
conn = self._get_connection()
try:
url_parts = urlparse.urlparse(url)
conn_url = posixpath.normpath(url_parts.path)
LOG.debug('Actual Path: {path}'.format(path=conn_url))
if kwargs['headers'].get('Transfer-Encoding') == 'chunked':
conn.putrequest(method, conn_url)
for header, value in kwargs['headers'].items():
conn.putheader(header, value)
conn.endheaders()
chunk = kwargs['body'].read(CHUNKSIZE)
# Chunk it, baby...
while chunk:
conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
chunk = kwargs['body'].read(CHUNKSIZE)
conn.send('0\r\n\r\n')
else:
conn.request(method, conn_url, **kwargs)
resp = conn.getresponse()
except socket.gaierror as e:
message = ("Error finding address for %(url)s: %(e)s" %
{'url': url, 'e': e})
raise exc.EndpointNotFound(message)
except (socket.error, socket.timeout) as e:
message = ("Error communicating with %(endpoint)s %(e)s" %
{'endpoint': self.endpoint, 'e': e})
raise exc.TimeoutException(message)
body_iter = ResponseBodyIterator(resp)
# Read body into string if it isn't obviously image data
if resp.getheader('content-type', None) != 'application/octet-stream':
body_str = ''.join([body_chunk for body_chunk in body_iter])
body_iter = six.StringIO(body_str)
self._log_response(resp, None)
else:
self._log_response(resp, body_iter)
return resp, body_iter
def _log_request(self, method, url, headers):
LOG.info('Request: ' + method + ' ' + url)
if headers:
headers_out = headers
if 'X-Auth-Token' in headers and headers['X-Auth-Token']:
token = headers['X-Auth-Token']
if len(token) > 64 and TOKEN_CHARS_RE.match(token):
headers_out = headers.copy()
headers_out['X-Auth-Token'] = "<Token omitted>"
LOG.info('Request Headers: ' + str(headers_out))
def _log_response(self, resp, body):
status = str(resp.status)
LOG.info("Response Status: " + status)
if resp.getheaders():
LOG.info('Response Headers: ' + str(resp.getheaders()))
if body:
str_body = str(body)
length = len(body)
LOG.info('Response Body: ' + str_body[:2048])
if length >= 2048:
self.LOG.debug("Large body (%d) md5 summary: %s", length,
hashlib.md5(str_body).hexdigest())
def raw_request(self, method, url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type',
'application/octet-stream')
if 'body' in kwargs:
if (hasattr(kwargs['body'], 'read')
and method.lower() in ('post', 'put')):
# We use 'Transfer-Encoding: chunked' because
# body size may not always be known in advance.
kwargs['headers']['Transfer-Encoding'] = 'chunked'
# Decorate the request with auth
req_url, kwargs['headers'], kwargs['body'] = \
self.auth_provider.auth_request(
method=method, url=url, headers=kwargs['headers'],
body=kwargs.get('body', None), filters=self.filters)
return self._http_request(req_url, method, **kwargs)
class OpenSSLConnectionDelegator(object):
"""An OpenSSL.SSL.Connection delegator.
Supplies an additional 'makefile' method which httplib requires
and is not present in OpenSSL.SSL.Connection.
Note: Since it is not possible to inherit from OpenSSL.SSL.Connection
a delegator must be used.
"""
def __init__(self, *args, **kwargs):
self.connection = OpenSSL.SSL.Connection(*args, **kwargs)
def __getattr__(self, name):
return getattr(self.connection, name)
def makefile(self, *args, **kwargs):
# Ensure the socket is closed when this file is closed
kwargs['close'] = True
return socket._fileobject(self.connection, *args, **kwargs)
class VerifiedHTTPSConnection(httplib.HTTPSConnection):
"""Extended HTTPSConnection which uses OpenSSL library for enhanced SSL
Note: Much of this functionality can eventually be replaced
with native Python 3.3 code.
"""
def __init__(self, host, port=None, key_file=None, cert_file=None,
ca_certs=None, timeout=None, insecure=False,
ssl_compression=True):
httplib.HTTPSConnection.__init__(self, host, port,
key_file=key_file,
cert_file=cert_file)
self.key_file = key_file
self.cert_file = cert_file
self.timeout = timeout
self.insecure = insecure
self.ssl_compression = ssl_compression
self.ca_certs = ca_certs
self.setcontext()
@staticmethod
def host_matches_cert(host, x509):
"""Verify that the x509 certificate we have received from 'host'
Identifies the server we are connecting to, ie that the certificate's
Common Name or a Subject Alternative Name matches 'host'.
"""
# First see if we can match the CN
if x509.get_subject().commonName == host:
return True
# Also try Subject Alternative Names for a match
san_list = None
for i in moves.xrange(x509.get_extension_count()):
ext = x509.get_extension(i)
if ext.get_short_name() == 'subjectAltName':
san_list = str(ext)
for san in ''.join(san_list.split()).split(','):
if san == "DNS:%s" % host:
return True
# Server certificate does not match host
msg = ('Host "%s" does not match x509 certificate contents: '
'CommonName "%s"' % (host, x509.get_subject().commonName))
if san_list is not None:
msg = msg + ', subjectAltName "%s"' % san_list
raise exc.SSLCertificateError(msg)
def verify_callback(self, connection, x509, errnum,
depth, preverify_ok):
if x509.has_expired():
msg = "SSL Certificate expired on '%s'" % x509.get_notAfter()
raise exc.SSLCertificateError(msg)
if depth == 0 and preverify_ok is True:
# We verify that the host matches against the last
# certificate in the chain
return self.host_matches_cert(self.host, x509)
else:
# Pass through OpenSSL's default result
return preverify_ok
def setcontext(self):
"""Set up the OpenSSL context."""
self.context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
if self.ssl_compression is False:
self.context.set_options(0x20000) # SSL_OP_NO_COMPRESSION
if self.insecure is not True:
self.context.set_verify(OpenSSL.SSL.VERIFY_PEER,
self.verify_callback)
else:
self.context.set_verify(OpenSSL.SSL.VERIFY_NONE,
self.verify_callback)
if self.cert_file:
try:
self.context.use_certificate_file(self.cert_file)
except Exception as e:
msg = 'Unable to load cert from "%s" %s' % (self.cert_file, e)
raise exc.SSLConfigurationError(msg)
if self.key_file is None:
# We support having key and cert in same file
try:
self.context.use_privatekey_file(self.cert_file)
except Exception as e:
msg = ('No key file specified and unable to load key '
'from "%s" %s' % (self.cert_file, e))
raise exc.SSLConfigurationError(msg)
if self.key_file:
try:
self.context.use_privatekey_file(self.key_file)
except Exception as e:
msg = 'Unable to load key from "%s" %s' % (self.key_file, e)
raise exc.SSLConfigurationError(msg)
if self.ca_certs:
try:
self.context.load_verify_locations(self.ca_certs)
except Exception as e:
msg = 'Unable to load CA from "%s" %s' % (self.ca_certs, e)
raise exc.SSLConfigurationError(msg)
else:
self.context.set_default_verify_paths()
def connect(self):
"""Connect to SSL port and apply per-connection parameters."""
try:
addresses = socket.getaddrinfo(self.host,
self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)
except OSError as msg:
raise exc.RestClientException(msg)
for res in addresses:
af, socktype, proto, canonname, sa = res
sock = socket.socket(af, socket.SOCK_STREAM)
if self.timeout is not None:
# '0' microseconds
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO,
struct.pack('LL', self.timeout, 0))
self.sock = OpenSSLConnectionDelegator(self.context, sock)
try:
self.sock.connect(sa)
except OSError as msg:
if self.sock:
self.sock = None
continue
break
if self.sock is None:
# Happen only when all results have failed.
raise exc.RestClientException('Cannot connect to %s' % self.host)
def close(self):
if self.sock:
# Remove the reference to the socket but don't close it yet.
# Response close will close both socket and associated
# file. Closing socket too soon will cause response
# reads to fail with socket IO error 'Bad file descriptor'.
self.sock = None
httplib.HTTPSConnection.close(self)
class ResponseBodyIterator(object):
"""A class that acts as an iterator over an HTTP response."""
def __init__(self, resp):
self.resp = resp
def __iter__(self):
while True:
yield self.next()
def next(self):
chunk = self.resp.read(CHUNKSIZE)
if chunk:
return chunk
else:
raise StopIteration()

View File

@ -0,0 +1,257 @@
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import errno
import os
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
import six
from six.moves.urllib import parse as urllib
from tempest.lib.common import rest_client
from tempest.lib import exceptions as lib_exc
from ceilometer.tests.tempest.service.images import glance_http
LOG = logging.getLogger(__name__)
class ImagesClient(rest_client.RestClient):
def __init__(self, auth_provider, catalog_type, region, **kwargs):
super(ImagesClient, self).__init__(
auth_provider, catalog_type, region, **kwargs)
self._http = None
self.dscv = kwargs.get("disable_ssl_certificate_validation")
self.ca_certs = kwargs.get("ca_certs")
def _image_meta_from_headers(self, headers):
meta = {'properties': {}}
for key, value in six.iteritems(headers):
if key.startswith('x-image-meta-property-'):
_key = key[22:]
meta['properties'][_key] = value
elif key.startswith('x-image-meta-'):
_key = key[13:]
meta[_key] = value
for key in ['is_public', 'protected', 'deleted']:
if key in meta:
meta[key] = meta[key].strip().lower() in ('t', 'true', 'yes',
'1')
for key in ['size', 'min_ram', 'min_disk']:
if key in meta:
try:
meta[key] = int(meta[key])
except ValueError:
pass
return meta
def _image_meta_to_headers(self, fields):
headers = {}
fields_copy = copy.deepcopy(fields)
copy_from = fields_copy.pop('copy_from', None)
if copy_from is not None:
headers['x-glance-api-copy-from'] = copy_from
for key, value in six.iteritems(fields_copy.pop('properties', {})):
headers['x-image-meta-property-%s' % key] = str(value)
for key, value in six.iteritems(fields_copy.pop('api', {})):
headers['x-glance-api-property-%s' % key] = str(value)
for key, value in six.iteritems(fields_copy):
headers['x-image-meta-%s' % key] = str(value)
return headers
def _get_file_size(self, obj):
"""Analyze file-like object and attempt to determine its size.
:param obj: file-like object, typically redirected from stdin.
:retval The file's size or None if it cannot be determined.
"""
# For large images, we need to supply the size of the
# image file. See LP Bugs #827660 and #845788.
if hasattr(obj, 'seek') and hasattr(obj, 'tell'):
try:
obj.seek(0, os.SEEK_END)
obj_size = obj.tell()
obj.seek(0)
return obj_size
except IOError as e:
if e.errno == errno.ESPIPE:
# Illegal seek. This means the user is trying
# to pipe image data to the client, e.g.
# echo testdata | bin/glance add blah..., or
# that stdin is empty, or that a file-like
# object which doesn't support 'seek/tell' has
# been supplied.
return None
else:
raise
else:
# Cannot determine size of input image
return None
def _get_http(self):
return glance_http.HTTPClient(auth_provider=self.auth_provider,
filters=self.filters,
insecure=self.dscv,
ca_certs=self.ca_certs)
def _create_with_data(self, headers, data):
resp, body_iter = self.http.raw_request('POST', '/v1/images',
headers=headers, body=data)
self._error_checker('POST', '/v1/images', headers, data, resp,
body_iter)
body = json.loads(''.join([c for c in body_iter]))
return rest_client.ResponseBody(resp, body)
def _update_with_data(self, image_id, headers, data):
url = '/v1/images/%s' % image_id
resp, body_iter = self.http.raw_request('PUT', url, headers=headers,
body=data)
self._error_checker('PUT', url, headers, data,
resp, body_iter)
body = json.loads(''.join([c for c in body_iter]))
return rest_client.ResponseBody(resp, body)
@property
def http(self):
if self._http is None:
self._http = self._get_http()
return self._http
def create_image(self, **kwargs):
headers = {}
data = kwargs.pop('data', None)
headers.update(self._image_meta_to_headers(kwargs))
if data is not None:
return self._create_with_data(headers, data)
resp, body = self.post('v1/images', None, headers)
self.expected_success(201, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def update_image(self, image_id, **kwargs):
headers = {}
data = kwargs.pop('data', None)
headers.update(self._image_meta_to_headers(kwargs))
if data is not None:
return self._update_with_data(image_id, headers, data)
url = 'v1/images/%s' % image_id
resp, body = self.put(url, None, headers)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_image(self, image_id):
url = 'v1/images/%s' % image_id
resp, body = self.delete(url)
self.expected_success(200, resp.status)
return rest_client.ResponseBody(resp, body)
def list_images(self, detail=False, **kwargs):
"""Return a list of all images filtered by input parameters.
Available params: see http://developer.openstack.org/
api-ref-image-v1.html#listImage-v1
Most parameters except the following are passed to the API without
any changes.
:param changes_since: The name is changed to changes-since
"""
url = 'v1/images'
if detail:
url += '/detail'
properties = kwargs.pop('properties', {})
for key, value in six.iteritems(properties):
kwargs['property-%s' % key] = value
if kwargs.get('changes_since'):
kwargs['changes-since'] = kwargs.pop('changes_since')
if len(kwargs) > 0:
url += '?%s' % urllib.urlencode(kwargs)
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def check_image(self, image_id):
"""Check image metadata."""
url = 'v1/images/%s' % image_id
resp, __ = self.head(url)
self.expected_success(200, resp.status)
body = self._image_meta_from_headers(resp)
return rest_client.ResponseBody(resp, body)
def show_image(self, image_id):
"""Get image details plus the image itself."""
url = 'v1/images/%s' % image_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
return rest_client.ResponseBodyData(resp, body)
def is_resource_deleted(self, id):
try:
if self.check_image(id)['status'] == 'deleted':
return True
except lib_exc.NotFound:
return True
return False
@property
def resource_type(self):
"""Returns the primary type of resource this client works with."""
return 'image_meta'
def list_image_members(self, image_id):
url = 'v1/images/%s/members' % image_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def list_shared_images(self, tenant_id):
"""List shared images with the specified tenant"""
url = 'v1/shared-images/%s' % tenant_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def add_member(self, member_id, image_id, **kwargs):
"""Add a member to an image.
Available params: see http://developer.openstack.org/
api-ref-image-v1.html#addMember-v1
"""
url = 'v1/images/%s/members/%s' % (image_id, member_id)
body = json.dumps({'member': kwargs})
resp, __ = self.put(url, body)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp)
def delete_member(self, member_id, image_id):
url = 'v1/images/%s/members/%s' % (image_id, member_id)
resp, __ = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp)

View File

@ -0,0 +1,245 @@
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils as json
from six.moves.urllib import parse as urllib
from tempest.lib.common import rest_client
from tempest.lib import exceptions as lib_exc
from ceilometer.tests.tempest.service.images import glance_http
class ImagesClient(rest_client.RestClient):
def __init__(self, auth_provider, catalog_type, region, **kwargs):
super(ImagesClient, self).__init__(
auth_provider, catalog_type, region, **kwargs)
self._http = None
self.dscv = kwargs.get("disable_ssl_certificate_validation")
self.ca_certs = kwargs.get("ca_certs")
def _get_http(self):
return glance_http.HTTPClient(auth_provider=self.auth_provider,
filters=self.filters,
insecure=self.dscv,
ca_certs=self.ca_certs)
@property
def http(self):
if self._http is None:
self._http = self._get_http()
return self._http
def update_image(self, image_id, patch):
"""Update an image.
Available params: see http://developer.openstack.org/
api-ref-image-v2.html#updateImage-v2
"""
data = json.dumps(patch)
headers = {"Content-Type": "application/openstack-images-v2.0"
"-json-patch"}
resp, body = self.patch('v2/images/%s' % image_id, data, headers)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def create_image(self, **kwargs):
"""Create an image.
Available params: see http://developer.openstack.org/
api-ref-image-v2.html#createImage-v2
"""
data = json.dumps(kwargs)
resp, body = self.post('v2/images', data)
self.expected_success(201, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def deactivate_image(self, image_id):
url = 'v2/images/%s/actions/deactivate' % image_id
resp, body = self.post(url, None)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def reactivate_image(self, image_id):
url = 'v2/images/%s/actions/reactivate' % image_id
resp, body = self.post(url, None)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def delete_image(self, image_id):
url = 'v2/images/%s' % image_id
resp, _ = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp)
def list_images(self, params=None):
url = 'v2/images'
if params:
url += '?%s' % urllib.urlencode(params)
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_image(self, image_id):
url = 'v2/images/%s' % image_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def is_resource_deleted(self, id):
try:
self.show_image(id)
except lib_exc.NotFound:
return True
return False
@property
def resource_type(self):
"""Returns the primary type of resource this client works with."""
return 'image'
def store_image_file(self, image_id, data):
url = 'v2/images/%s/file' % image_id
headers = {'Content-Type': 'application/octet-stream'}
resp, body = self.http.raw_request('PUT', url, headers=headers,
body=data)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def show_image_file(self, image_id):
url = 'v2/images/%s/file' % image_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
return rest_client.ResponseBodyData(resp, body)
def add_image_tag(self, image_id, tag):
url = 'v2/images/%s/tags/%s' % (image_id, tag)
resp, body = self.put(url, body=None)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp, body)
def delete_image_tag(self, image_id, tag):
url = 'v2/images/%s/tags/%s' % (image_id, tag)
resp, _ = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp)
def list_image_members(self, image_id):
url = 'v2/images/%s/members' % image_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def create_image_member(self, image_id, **kwargs):
"""Create an image member.
Available params: see http://developer.openstack.org/
api-ref-image-v2.html#createImageMember-v2
"""
url = 'v2/images/%s/members' % image_id
data = json.dumps(kwargs)
resp, body = self.post(url, data)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def update_image_member(self, image_id, member_id, **kwargs):
"""Update an image member.
Available params: see http://developer.openstack.org/
api-ref-image-v2.html#updateImageMember-v2
"""
url = 'v2/images/%s/members/%s' % (image_id, member_id)
data = json.dumps(kwargs)
resp, body = self.put(url, data)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_image_member(self, image_id, member_id):
url = 'v2/images/%s/members/%s' % (image_id, member_id)
resp, body = self.get(url)
self.expected_success(200, resp.status)
return rest_client.ResponseBody(resp, json.loads(body))
def delete_image_member(self, image_id, member_id):
url = 'v2/images/%s/members/%s' % (image_id, member_id)
resp, _ = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp)
def show_schema(self, schema):
url = 'v2/schemas/%s' % schema
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def list_resource_types(self):
url = '/v2/metadefs/resource_types'
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def create_namespace(self, **kwargs):
"""Create a namespace.
Available params: see http://developer.openstack.org/
api-ref-image-v2.html#createNamespace-v2
"""
data = json.dumps(kwargs)
resp, body = self.post('/v2/metadefs/namespaces', data)
self.expected_success(201, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_namespace(self, namespace):
url = '/v2/metadefs/namespaces/%s' % namespace
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def update_namespace(self, namespace, **kwargs):
"""Update a namespace.
Available params: see http://developer.openstack.org/
api-ref-image-v2.html#updateNamespace-v2
"""
# NOTE: On Glance API, we need to pass namespace on both URI
# and a request body.
params = {'namespace': namespace}
params.update(kwargs)
data = json.dumps(params)
url = '/v2/metadefs/namespaces/%s' % namespace
resp, body = self.put(url, body=data)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def delete_namespace(self, namespace):
url = '/v2/metadefs/namespaces/%s' % namespace
resp, _ = self.delete(url)
self.expected_success(204, resp.status)
return rest_client.ResponseBody(resp)