Implement Registry's Client V2

The patch implements Registry's v2 Client.

Some notes:
* The implementation uses __getattr__ to map method's calls to remote
  RPC methods.
* The client supports both bulk and single calls.
* Client is based on the base HTTPClient

About Exceptions:
* If raise_exc == True, the client will try to re-raise the remote
  exception. If the server raised an unknown exception, it will be
  re-raised as RPCException in the client side.

Implements blueprint registry-api-v2

Change-Id: I98757c2272c68301373c1a3af96c4d06cb63cc97
This commit is contained in:
Flaper Fesp 2013-05-07 10:42:27 +02:00
parent 252ef063e4
commit 34135e88d5
9 changed files with 1093 additions and 10 deletions

View File

@ -18,12 +18,15 @@
"""
RPC Controller
"""
import json
import traceback
from oslo.config import cfg
from webob import exc
from glance.common import client
from glance.common import exception
import glance.openstack.common.importutils as imp
import glance.openstack.common.log as logging
@ -166,3 +169,72 @@ class Controller(object):
result = {"_error": {"cls": cls_path, "val": val}}
results.append(result)
return results
class RPCClient(client.BaseClient):
def __init__(self, *args, **kwargs):
self.raise_exc = kwargs.pop("raise_exc", True)
self.base_path = kwargs.pop("base_path", '/rpc')
super(RPCClient, self).__init__(*args, **kwargs)
@client.handle_unauthenticated
def bulk_request(self, commands):
"""
Execute multiple commands in a single request.
:params commands: List of commands to send. Commands
must respect the following form:
{
'command': 'method_name',
'kwargs': method_kwargs
}
"""
body = json.dumps(commands)
response = super(RPCClient, self).do_request('POST',
self.base_path,
body)
return json.loads(response.read())
def do_request(self, method, **kwargs):
"""
Simple do_request override. This method serializes
the outgoing body and builds the command that will
be sent.
:params method: The remote python method to call
:params kwargs: Dynamic parameters that will be
passed to the remote method.
"""
content = self.bulk_request([{'command': method,
'kwargs': kwargs}])
# NOTE(flaper87): Return the first result if
# a single command was executed.
content = content[0]
if self.raise_exc and (content and '_error' in content):
error = content['_error']
try:
exc_cls = imp.import_class(error['cls'])
raise exc_cls(error['val'])
except ImportError:
# NOTE(flaper87): The exception
# class couldn't be imported, using
# a generic exception.
raise exception.RPCError(**error)
return content
def __getattr__(self, item):
"""
This method returns a method_proxy that
will execute the rpc call in the registry
service.
"""
if item.startswith('_'):
raise AttributeError(item)
def method_proxy(**kw):
return self.do_request(item, **kw)
return method_proxy

View File

@ -0,0 +1,14 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,133 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Registry's Client V2
"""
import os
from oslo.config import cfg
from glance.common import exception
import glance.openstack.common.log as logging
from glance.registry.client.v2 import client
LOG = logging.getLogger(__name__)
registry_client_opts = [
cfg.StrOpt('registry_client_protocol', default='http',
help=_('The protocol to use for communication with the '
'registry server. Either http or https.')),
cfg.StrOpt('registry_client_key_file',
help=_('The path to the key file to use in SSL connections '
'to the registry server.')),
cfg.StrOpt('registry_client_cert_file',
help=_('The path to the cert file to use in SSL connections '
'to the registry server.')),
cfg.StrOpt('registry_client_ca_file',
help=_('The path to the certifying authority cert file to '
'use in SSL connections to the registry server.')),
cfg.BoolOpt('registry_client_insecure', default=False,
help=_('When using SSL in connections to the registry server, '
'do not require validation via a certifying '
'authority.')),
cfg.IntOpt('registry_client_timeout', default=600,
help=_('The period of time, in seconds, that the API server '
'will wait for a registry request to complete. A '
'value of 0 implies no timeout.')),
]
registry_client_ctx_opts = [
cfg.StrOpt('admin_user', secret=True,
help=_('The administrators user name.')),
cfg.StrOpt('admin_password', secret=True,
help=_('The administrators password.')),
cfg.StrOpt('admin_tenant_name', secret=True,
help=_('The tenant name of the adminstrative user.')),
cfg.StrOpt('auth_url',
help=_('The URL to the keystone service.')),
cfg.StrOpt('auth_strategy', default='noauth',
help=_('The strategy to use for authentication.')),
cfg.StrOpt('auth_region',
help=_('The region for the authentication service.')),
]
CONF = cfg.CONF
CONF.register_opts(registry_client_opts)
CONF.register_opts(registry_client_ctx_opts)
_CLIENT_CREDS = None
_CLIENT_HOST = None
_CLIENT_PORT = None
_CLIENT_KWARGS = {}
def configure_registry_client():
"""
Sets up a registry client for use in registry lookups
"""
global _CLIENT_KWARGS, _CLIENT_HOST, _CLIENT_PORT
try:
host, port = CONF.registry_host, CONF.registry_port
except cfg.ConfigFileValueError:
msg = _("Configuration option was not valid")
LOG.error(msg)
raise exception.BadRegistryConnectionConfiguration(msg)
except IndexError:
msg = _("Could not find required configuration option")
LOG.error(msg)
raise exception.BadRegistryConnectionConfiguration(msg)
_CLIENT_HOST = host
_CLIENT_PORT = port
_CLIENT_KWARGS = {
'use_ssl': CONF.registry_client_protocol.lower() == 'https',
'key_file': CONF.registry_client_key_file,
'cert_file': CONF.registry_client_cert_file,
'ca_file': CONF.registry_client_ca_file,
'insecure': CONF.registry_client_insecure,
'timeout': CONF.registry_client_timeout,
}
def configure_registry_admin_creds():
global _CLIENT_CREDS
if CONF.auth_url or os.getenv('OS_AUTH_URL'):
strategy = 'keystone'
else:
strategy = CONF.auth_strategy
_CLIENT_CREDS = {
'user': CONF.admin_user,
'password': CONF.admin_password,
'username': CONF.admin_user,
'tenant': CONF.admin_tenant_name,
'auth_url': CONF.auth_url,
'strategy': strategy,
'region': CONF.auth_region,
}
def get_registry_client(cxt):
global _CLIENT_CREDS, _CLIENT_KWARGS, _CLIENT_HOST, _CLIENT_PORT
kwargs = _CLIENT_KWARGS.copy()
kwargs['auth_tok'] = cxt.auth_tok
if _CLIENT_CREDS:
kwargs['creds'] = _CLIENT_CREDS
return client.RegistryClient(_CLIENT_HOST, _CLIENT_PORT, **kwargs)

View File

@ -0,0 +1,32 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Simple client class to speak with any RESTful service that implements
the Glance Registry API
"""
from glance.common import rpc
import glance.openstack.common.log as logging
LOG = logging.getLogger(__name__)
class RegistryClient(rpc.RPCClient):
"""Registry's V2 Client."""
DEFAULT_PORT = 9191

View File

@ -41,8 +41,13 @@ DEBUG = False
class FakeRegistryConnection(object):
def __init__(self, *args, **kwargs):
pass
def __init__(self, registry=None):
self.registry = registry or rserver
def __call__(self, *args, **kwargs):
# NOTE(flaper87): This method takes
# __init__'s place in the chain.
return self
def connect(self):
return True
@ -60,7 +65,7 @@ class FakeRegistryConnection(object):
def getresponse(self):
mapper = routes.Mapper()
server = rserver.API(mapper)
server = self.registry.API(mapper)
# NOTE(markwash): we need to pass through context auth information if
# we have it.
if 'X-Auth-Token' in self.req.headers:
@ -74,7 +79,7 @@ class FakeRegistryConnection(object):
data=webob_res.body)
def stub_out_registry_and_store_server(stubs, base_dir):
def stub_out_registry_and_store_server(stubs, base_dir, **kwargs):
"""
Mocks calls to 127.0.0.1 on 9191 and 9292 for testing so
that a real Glance server does not need to be up and
@ -171,7 +176,8 @@ def stub_out_registry_and_store_server(stubs, base_dir):
return FakeGlanceConnection
elif (client.port == DEFAULT_REGISTRY_PORT and
client.host == '0.0.0.0'):
return FakeRegistryConnection
rserver = kwargs.get("registry", None)
return FakeRegistryConnection(registry=rserver)
def fake_image_iter(self):
for i in self.source.app_iter:
@ -208,7 +214,8 @@ def stub_out_registry_server(stubs, **kwargs):
if (client.port == DEFAULT_REGISTRY_PORT and
client.host == '0.0.0.0'):
return FakeRegistryConnection
rserver = kwargs.pop("registry", None)
return FakeRegistryConnection(registry=rserver)
def fake_image_iter(self):
for i in self.response.app_iter:

View File

@ -48,6 +48,7 @@ class IsolatedUnitTest(StoreClearingUnitTest):
Unit test case that establishes a mock environment within
a testing directory (in isolation)
"""
registry = None
def setUp(self):
super(IsolatedUnitTest, self).setUp()
@ -60,7 +61,9 @@ class IsolatedUnitTest(StoreClearingUnitTest):
default_store='filesystem',
filesystem_store_datadir=os.path.join(self.test_dir),
policy_file=policy_file)
stubs.stub_out_registry_and_store_server(self.stubs, self.test_dir)
stubs.stub_out_registry_and_store_server(self.stubs,
self.test_dir,
registry=self.registry)
self.addCleanup(self.stubs.UnsetAll)
def _copy_data_file(self, file_name, dst_dir):

View File

@ -24,6 +24,7 @@ import webob
from glance.common import rpc
from glance.common import wsgi
from glance.common import exception
from glance.tests.unit import base
from glance.tests import utils as test_utils
@ -197,3 +198,49 @@ class TestRPCController(base.IsolatedUnitTest):
returned = json.loads(res.body)[0]
self.assertEquals(returned['_error']['cls'],
'glance.common.exception.RPCError')
class TestRPCClient(base.IsolatedUnitTest):
def setUp(self):
super(TestRPCClient, self).setUp()
self.api = create_api()
self.client = rpc.RPCClient(host="http://127.0.0.1:9191")
self.client._do_request = self.fake_request
def fake_request(self, method, url, body, headers):
req = webob.Request.blank(url.path)
req.body = body
req.method = method
webob_res = req.get_response(self.api)
return test_utils.FakeHTTPResponse(status=webob_res.status_int,
headers=webob_res.headers,
data=webob_res.body)
def test_method_proxy(self):
proxy = self.client.some_method
self.assertIn("method_proxy", str(proxy))
def test_bulk_request(self):
commands = [{"command": "get_images", 'kwargs': {'keyword': True}},
{"command": "get_all_images"}]
res = self.client.bulk_request(commands)
self.assertEquals(len(res), 2)
self.assertTrue(res[0])
self.assertFalse(res[1])
def test_exception_raise(self):
try:
self.client.raise_value_error()
self.fail("Exception not raised")
except ValueError as exc:
self.assertEquals(str(exc), "Yep, Just like that!")
def test_rpc_exception(self):
try:
self.client.raise_weird_error()
self.fail("Exception not raised")
except exception.RPCError:
pass

View File

@ -38,7 +38,7 @@ UUID2 = _gen_uuid()
config.parse_args(args=[])
class TestRegistryClient(base.IsolatedUnitTest):
class TestRegistryV1Client(base.IsolatedUnitTest):
"""
Test proper actions made for both valid and invalid requests
@ -47,7 +47,7 @@ class TestRegistryClient(base.IsolatedUnitTest):
def setUp(self):
"""Establish a clean test environment"""
super(TestRegistryClient, self).setUp()
super(TestRegistryV1Client, self).setUp()
db_api.setup_db_env()
db_api.get_engine()
self.context = context.RequestContext(is_admin=True)
@ -86,7 +86,7 @@ class TestRegistryClient(base.IsolatedUnitTest):
def tearDown(self):
"""Clear the test environment"""
super(TestRegistryClient, self).tearDown()
super(TestRegistryV1Client, self).tearDown()
self.destroy_fixtures()
def create_fixtures(self):

View File

@ -0,0 +1,775 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for Glance Registry's client.
This tests are temporary and will be removed once
the registry's driver tests will be added.
"""
import copy
import datetime
from glance.common import config
from glance.common import exception
from glance import context
from glance.db.sqlalchemy import api as db_api
from glance.db.sqlalchemy import models as db_models
from glance.openstack.common import timeutils
from glance.openstack.common import uuidutils
from glance.registry.client.v2.api import client as rclient
from glance.registry.api import v2 as rserver
from glance.tests.unit import base
_gen_uuid = uuidutils.generate_uuid
UUID1 = _gen_uuid()
UUID2 = _gen_uuid()
#NOTE(bcwaldon): needed to init config_dir cli opt
config.parse_args(args=[])
class TestRegistryV2Client(base.IsolatedUnitTest):
"""
Test proper actions made for both valid and invalid requests
against a Registry service
"""
# Registry server to user
# in the stub.
registry = rserver
def setUp(self):
"""Establish a clean test environment"""
super(TestRegistryV2Client, self).setUp()
db_api.setup_db_env()
db_api.get_engine()
self.context = context.RequestContext(is_admin=True)
self.FIXTURES = [
{'id': UUID1,
'name': 'fake image #1',
'status': 'active',
'disk_format': 'ami',
'container_format': 'ami',
'is_public': False,
'created_at': timeutils.utcnow(),
'updated_at': timeutils.utcnow(),
'deleted_at': None,
'deleted': False,
'checksum': None,
'size': 13,
'location': "swift://user:passwd@acct/container/obj.tar.0",
'properties': {'type': 'kernel'}},
{'id': UUID2,
'name': 'fake image #2',
'status': 'active',
'disk_format': 'vhd',
'container_format': 'ovf',
'is_public': True,
'created_at': timeutils.utcnow(),
'updated_at': timeutils.utcnow(),
'deleted_at': None,
'deleted': False,
'checksum': None,
'size': 19,
'location': "file:///tmp/glance-tests/2",
'properties': {}}]
self.destroy_fixtures()
self.create_fixtures()
self.client = rclient.RegistryClient("0.0.0.0")
def tearDown(self):
"""Clear the test environment"""
super(TestRegistryV2Client, self).tearDown()
self.destroy_fixtures()
def create_fixtures(self):
for fixture in self.FIXTURES:
db_api.image_create(self.context, fixture)
def destroy_fixtures(self):
# Easiest to just drop the models and re-create them...
db_models.unregister_models(db_api._ENGINE)
db_models.register_models(db_api._ENGINE)
def test_image_get_index(self):
"""Test correct set of public image returned"""
images = self.client.image_get_all()
self.assertEquals(len(images), 2)
def test_create_image_with_null_min_disk_min_ram(self):
UUID3 = _gen_uuid()
extra_fixture = {
'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
'size': 19,
'checksum': None,
'min_disk': None,
'min_ram': None,
}
db_api.image_create(self.context, extra_fixture)
image = self.client.image_get(image_id=UUID3)
self.assertEqual(0, image["min_ram"])
self.assertEqual(0, image["min_disk"])
def test_get_index_sort_name_asc(self):
"""
Tests that the registry API returns list of
public images sorted alphabetically by name in
ascending order.
"""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'xyz',
'size': 20,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='name', sort_dir='asc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID3)
self.assertEquals(images[1]['id'], UUID1)
self.assertEquals(images[2]['id'], UUID2)
self.assertEquals(images[3]['id'], UUID4)
def test_get_index_sort_status_desc(self):
"""
Tests that the registry API returns list of
public images sorted alphabetically by status in
descending order.
"""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'queued',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'asdf',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'xyz',
'size': 20,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='status', sort_dir='desc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID3)
self.assertEquals(images[1]['id'], UUID4)
self.assertEquals(images[2]['id'], UUID2)
self.assertEquals(images[3]['id'], UUID1)
def test_get_index_sort_disk_format_asc(self):
"""
Tests that the registry API returns list of
public images sorted alphabetically by disk_format in
ascending order.
"""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'ami',
'container_format': 'ami',
'name': 'asdf',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'vdi',
'container_format': 'ovf',
'name': 'xyz',
'size': 20,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='disk_format',
sort_dir='asc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID1)
self.assertEquals(images[1]['id'], UUID3)
self.assertEquals(images[2]['id'], UUID4)
self.assertEquals(images[3]['id'], UUID2)
def test_get_index_sort_container_format_desc(self):
"""
Tests that the registry API returns list of
public images sorted alphabetically by container_format in
descending order.
"""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'ami',
'container_format': 'ami',
'name': 'asdf',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'iso',
'container_format': 'bare',
'name': 'xyz',
'size': 20,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='container_format',
sort_dir='desc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID2)
self.assertEquals(images[1]['id'], UUID4)
self.assertEquals(images[2]['id'], UUID3)
self.assertEquals(images[3]['id'], UUID1)
def test_get_index_sort_size_asc(self):
"""
Tests that the registry API returns list of
public images sorted by size in ascending order.
"""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'ami',
'container_format': 'ami',
'name': 'asdf',
'size': 100,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'iso',
'container_format': 'bare',
'name': 'xyz',
'size': 2,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='size', sort_dir='asc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID4)
self.assertEquals(images[1]['id'], UUID1)
self.assertEquals(images[2]['id'], UUID2)
self.assertEquals(images[3]['id'], UUID3)
def test_get_index_sort_created_at_asc(self):
"""
Tests that the registry API returns list of
public images sorted by created_at in ascending order.
"""
now = timeutils.utcnow()
time1 = now + datetime.timedelta(seconds=5)
time2 = now
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None,
'created_at': time1}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 20,
'checksum': None,
'created_at': time2}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='created_at',
sort_dir='asc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID1)
self.assertEquals(images[1]['id'], UUID2)
self.assertEquals(images[2]['id'], UUID4)
self.assertEquals(images[3]['id'], UUID3)
def test_get_index_sort_updated_at_desc(self):
"""
Tests that the registry API returns list of
public images sorted by updated_at in descending order.
"""
now = timeutils.utcnow()
time1 = now + datetime.timedelta(seconds=5)
time2 = now
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None,
'created_at': None,
'updated_at': time1}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 20,
'checksum': None,
'created_at': None,
'updated_at': time2}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(sort_key='updated_at',
sort_dir='desc')
self.assertEquals(len(images), 4)
self.assertEquals(images[0]['id'], UUID3)
self.assertEquals(images[1]['id'], UUID4)
self.assertEquals(images[2]['id'], UUID2)
self.assertEquals(images[3]['id'], UUID1)
def test_image_get_index_marker(self):
"""Test correct set of images returned with marker param."""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #125',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(marker=UUID4)
self.assertEquals(len(images), 3)
self.assertEquals(images[0]['id'], UUID3)
self.assertEquals(images[1]['id'], UUID2)
self.assertEquals(images[2]['id'], UUID1)
def test_image_get_index_limit(self):
"""Test correct number of images returned with limit param."""
extra_fixture = {'id': _gen_uuid(),
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
extra_fixture = {'id': _gen_uuid(),
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #125',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(limit=2)
self.assertEquals(len(images), 2)
def test_image_get_index_marker_limit(self):
"""Test correct set of images returned with marker/limit params."""
UUID3 = _gen_uuid()
extra_fixture = {'id': UUID3,
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
UUID4 = _gen_uuid()
extra_fixture = {'id': UUID4,
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #125',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(marker=UUID3, limit=1)
self.assertEquals(len(images), 1)
self.assertEquals(images[0]['id'], UUID2)
def test_image_get_index_limit_None(self):
"""Test correct set of images returned with limit param == None."""
extra_fixture = {'id': _gen_uuid(),
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
extra_fixture = {'id': _gen_uuid(),
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #125',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(limit=None)
self.assertEquals(len(images), 4)
def test_image_get_index_by_name(self):
"""
Test correct set of public, name-filtered image returned. This
is just a sanity check, we test the details call more in-depth.
"""
extra_fixture = {'id': _gen_uuid(),
'status': 'active',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None}
db_api.image_create(self.context, extra_fixture)
images = self.client.image_get_all(filters={'name': 'new name! #123'})
self.assertEquals(len(images), 1)
for image in images:
self.assertEquals('new name! #123', image['name'])
def test_image_get_is_public_v2(self):
"""Tests that a detailed call can be filtered by a property"""
extra_fixture = {'id': _gen_uuid(),
'status': 'saving',
'is_public': True,
'disk_format': 'vhd',
'container_format': 'ovf',
'name': 'new name! #123',
'size': 19,
'checksum': None,
'properties': {'is_public': 'avalue'}}
context = copy.copy(self.context)
db_api.image_create(context, extra_fixture)
filters = {'is_public': 'avalue'}
images = self.client.image_get_all(filters=filters)
self.assertEquals(len(images), 1)
for image in images:
self.assertEquals('avalue', image['properties'][0]['value'])
def test_image_get(self):
"""Tests that the detailed info about an image returned"""
fixture = {'id': UUID1,
'name': 'fake image #1',
'is_public': False,
'disk_format': 'ami',
'container_format': 'ami',
'status': 'active',
'size': 13}
data = self.client.image_get(image_id=UUID1)
for k, v in fixture.items():
el = data[k]
self.assertEquals(v, data[k],
"Failed v != data[k] where v = %(v)s and "
"k = %(k)s and data[k] = %(el)s" %
dict(v=v, k=k, el=el))
def test_image_get_non_existing(self):
"""Tests that NotFound is raised when getting a non-existing image"""
self.assertRaises(exception.NotFound,
self.client.image_get,
image_id=_gen_uuid())
def test_image_create_basic(self):
"""Tests that we can add image metadata and returns the new id"""
fixture = {
'name': 'fake public image',
'is_public': True,
'disk_format': 'vmdk',
'container_format': 'ovf',
'size': 19,
'status': 'active',
}
new_image = self.client.image_create(values=fixture)
# Test all other attributes set
data = self.client.image_get(image_id=new_image['id'])
for k, v in fixture.items():
self.assertEquals(v, data[k])
# Test status was updated properly
self.assertTrue('status' in data.keys())
self.assertEquals('active', data['status'])
def test_image_create_with_properties(self):
"""Tests that we can add image metadata with properties"""
fixture = {'name': 'fake public image',
'is_public': True,
'disk_format': 'vmdk',
'container_format': 'ovf',
'size': 19,
'status': 'active',
'location': "file:///tmp/glance-tests/2",
'properties': {'distro': 'Ubuntu 10.04 LTS'}}
new_image = self.client.image_create(values=fixture)
self.assertIn('properties', new_image)
self.assertEquals(new_image['properties'][0]['value'],
fixture['properties']['distro'])
del fixture['location']
del fixture['properties']
for k, v in fixture.items():
self.assertEquals(v, new_image[k])
# Test status was updated properly
self.assertTrue('status' in new_image.keys())
self.assertEquals('active', new_image['status'])
def test_image_create_already_exists(self):
"""Tests proper exception is raised if image with ID already exists"""
fixture = {
'id': UUID2,
'name': 'fake public image',
'is_public': True,
'disk_format': 'vmdk',
'container_format': 'ovf',
'size': 19,
'status': 'active',
'location': "file:///tmp/glance-tests/2",
}
self.assertRaises(exception.Duplicate,
self.client.image_create,
values=fixture)
def test_image_create_with_bad_status(self):
"""Tests proper exception is raised if a bad status is set"""
fixture = {
'name': 'fake public image',
'is_public': True,
'disk_format': 'vmdk',
'container_format': 'ovf',
'status': 'bad status',
'size': 19,
'location': "file:///tmp/glance-tests/2",
}
self.assertRaises(exception.Invalid,
self.client.image_create,
values=fixture)
def test_image_update(self):
"""Tests that the registry API updates the image"""
fixture = {'name': 'fake public image #2',
'disk_format': 'vmdk'}
self.assertTrue(self.client.image_update(image_id=UUID2,
values=fixture))
# Test all other attributes set
data = self.client.image_get(image_id=UUID2)
for k, v in fixture.items():
self.assertEquals(v, data[k])
def test_image_update_not_existing(self):
"""Tests non existing image update doesn't work"""
fixture = {
'name': 'fake public image',
'is_public': True,
'disk_format': 'vmdk',
'container_format': 'ovf',
'status': 'bad status',
}
self.assertRaises(exception.NotFound,
self.client.image_update,
image_id=_gen_uuid(),
values=fixture)
def test_image_destroy(self):
"""Tests that image metadata is deleted properly"""
# Grab the original number of images
orig_num_images = len(self.client.image_get_all())
# Delete image #2
image = self.FIXTURES[1]
deleted_image = self.client.image_destroy(image_id=image['id'])
self.assertTrue(deleted_image)
self.assertEquals(image['id'], deleted_image['id'])
self.assertTrue(deleted_image['deleted'])
self.assertTrue(deleted_image['deleted_at'])
# Verify one less image
filters = {'deleted': False}
new_num_images = len(self.client.image_get_all(filters=filters))
self.assertEquals(new_num_images, orig_num_images - 1)
def test_image_destroy_not_existing(self):
"""Tests cannot delete non-existing image"""
self.assertRaises(exception.NotFound,
self.client.image_destroy,
image_id=_gen_uuid())
def test_image_get_members(self):
"""Tests getting image members"""
memb_list = self.client.image_member_find(image_id=UUID2)
num_members = len(memb_list)
self.assertEquals(num_members, 0)
def test_image_get_members_not_existing(self):
"""Tests getting non-existent image members"""
self.assertRaises(exception.NotFound,
self.client.image_get_members,
image_id=_gen_uuid())
def test_image_member_find(self):
"""Tests getting member images"""
memb_list = self.client.image_member_find(member='pattieblack')
num_members = len(memb_list)
self.assertEquals(num_members, 0)
def test_add_update_members(self):
"""Tests updating image members"""
values = dict(image_id=UUID2, member='pattieblack')
member = self.client.image_member_create(values=values)
self.assertTrue(member)
values['member'] = 'pattieblack2'
self.assertTrue(self.client.image_member_update(memb_id=member['id'],
values=values))
def test_add_delete_member(self):
"""Tests deleting image members"""
values = dict(image_id=UUID2, member='pattieblack')
member = self.client.image_member_create(values=values)
self.client.image_member_delete(memb_id=member['id'])
memb_list = self.client.image_member_find(member='pattieblack')
self.assertEquals(len(memb_list), 0)