Enable tasks REST API for async worker

- add REST API layer for async worker(tasks) to
support tasks operations.
- update to include sparse task for tasks.index
- add domain proxies for handling  authorization,
policy enforce, notification while processing
task requests.
- add Task domain entity and TaskFactory to create
new Task domain entity objects.
- add integration test for testing tasks api

Co-authored-by: Fei Long Wang <flwang@cn.ibm.com>

Partially implement blueprint async-glance-workers

Change-Id: I072cbf351c06f59a7702733b652bfa63e5abbaa6
This commit is contained in:
Venkatesh Sampath 2013-09-12 21:57:13 +08:00 committed by Nikhil Komawar
parent d43284c110
commit be23b19905
15 changed files with 1850 additions and 70 deletions

View File

@ -328,7 +328,7 @@ class ImmutableTaskProxy(object):
updated_at = _immutable_attr('base', 'updated_at')
def run(self, executor):
raise NotImplementedError()
self.base.run(executor)
def begin_processing(self):
message = _("You are not permitted to set status on this task.")
@ -377,16 +377,14 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
proxy_kwargs=None
)
def new_task(self, task_type, task_input, owner):
def new_task(self, **kwargs):
owner = kwargs.get('owner', self.context.owner)
#NOTE(nikhil): Unlike Images, Tasks are expected to have owner.
# We currently do not allow even admins to set the owner to None.
if owner is not None and (owner == self.context.owner
or self.context.is_admin):
return super(TaskFactoryProxy, self).new_task(
task_type,
task_input,
owner
)
return super(TaskFactoryProxy, self).new_task(**kwargs)
else:
message = _("You are not permitted to create this task with "
"owner as: %s")

View File

@ -20,6 +20,7 @@ from glance.api.v2 import image_members
from glance.api.v2 import image_tags
from glance.api.v2 import images
from glance.api.v2 import schemas
from glance.api.v2 import tasks
from glance.common import wsgi
@ -47,6 +48,14 @@ class API(wsgi.Router):
controller=schemas_resource,
action='members',
conditions={'method': ['GET']})
mapper.connect('/schemas/task',
controller=schemas_resource,
action='task',
conditions={'method': ['GET']})
mapper.connect('/schemas/tasks',
controller=schemas_resource,
action='tasks',
conditions={'method': ['GET']})
images_resource = images.create_resource(custom_image_properties)
mapper.connect('/images',
@ -112,4 +121,18 @@ class API(wsgi.Router):
action='delete',
conditions={'method': ['DELETE']})
tasks_resource = tasks.create_resource()
mapper.connect('/tasks',
controller=tasks_resource,
action='create',
conditions={'method': ['POST']})
mapper.connect('/tasks',
controller=tasks_resource,
action='index',
conditions={'method': ['GET']})
mapper.connect('/tasks/{task_id}',
controller=tasks_resource,
action='get',
conditions={'method': ['GET']})
super(API, self).__init__(mapper)

View File

@ -15,6 +15,7 @@
from glance.api.v2 import images
from glance.api.v2 import image_members
from glance.api.v2 import tasks
from glance.common import wsgi
@ -25,6 +26,8 @@ class Controller(object):
custom_image_properties)
self.member_schema = image_members.get_schema()
self.member_collection_schema = image_members.get_collection_schema()
self.task_schema = tasks.get_task_schema()
self.task_collection_schema = tasks.get_collection_schema()
def image(self, req):
return self.image_schema.raw()
@ -38,6 +41,12 @@ class Controller(object):
def members(self, req):
return self.member_collection_schema.minimal()
def task(self, req):
return self.task_schema.minimal()
def tasks(self, req):
return self.task_collection_schema.minimal()
def create_resource(custom_image_properties=None):
controller = Controller(custom_image_properties)

345
glance/api/v2/tasks.py Normal file
View File

@ -0,0 +1,345 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import urllib
import webob.exc
from oslo.config import cfg
from glance.openstack.common import uuidutils
import glance.db
import glance.gateway
import glance.notifier
import glance.schema
import glance.store
from glance.api import policy
from glance.common import wsgi
from glance.common import exception
import glance.openstack.common.jsonutils as json
from glance.openstack.common import timeutils
CONF = cfg.CONF
class TasksController(object):
"""Manages operations on tasks."""
def __init__(self, db_api=None, policy_enforcer=None, notifier=None,
store_api=None):
self.db_api = db_api or glance.db.get_api()
self.db_api.setup_db_env()
self.policy = policy_enforcer or policy.Enforcer()
self.notifier = notifier or glance.notifier.Notifier()
self.store_api = store_api or glance.store
self.gateway = glance.gateway.Gateway(self.db_api, self.store_api,
self.notifier, self.policy)
def create(self, req, task):
task_factory = self.gateway.get_task_factory(req.context)
task_repo = self.gateway.get_task_repo(req.context)
try:
new_task = task_factory.new_task(task_type=task['type'],
task_input=task['input'],
owner=req.context.owner)
task_repo.add(new_task)
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=unicode(e))
return new_task
def index(self, req, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters=None):
result = {}
if filters is None:
filters = {}
filters['deleted'] = False
if limit is None:
limit = CONF.limit_param_default
limit = min(CONF.api_limit_max, limit)
task_repo = self.gateway.get_task_repo(req.context)
try:
tasks = task_repo.list(marker, limit, sort_key, sort_dir, filters)
if len(tasks) != 0 and len(tasks) == limit:
result['next_marker'] = tasks[-1].task_id
except (exception.NotFound, exception.InvalidSortKey,
exception.InvalidFilterRangeValue) as e:
raise webob.exc.HTTPBadRequest(explanation=unicode(e))
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=unicode(e))
result['tasks'] = tasks
return result
def get(self, req, task_id):
try:
task_repo = self.gateway.get_task_repo(req.context)
task = task_repo.get(task_id)
except exception.NotFound as e:
raise webob.exc.HTTPNotFound(explanation=unicode(e))
except exception.Forbidden as e:
raise webob.exc.HTTPForbidden(explanation=unicode(e))
return task
class RequestDeserializer(wsgi.JSONRequestDeserializer):
_required_properties = ['type', 'input']
def _get_request_body(self, request):
output = super(RequestDeserializer, self).default(request)
if 'body' not in output:
msg = _('Body expected in request')
raise webob.exc.HTTPBadRequest(explanation=msg)
return output['body']
def _validate_sort_dir(self, sort_dir):
if sort_dir not in ['asc', 'desc']:
msg = _('Invalid sort direction: %s') % sort_dir
raise webob.exc.HTTPBadRequest(explanation=msg)
return sort_dir
def _get_filters(self, filters):
status = filters.get('status', None)
if status:
if status not in ['pending', 'processing', 'success', 'failure']:
msg = _('Invalid status value: %s') % status
raise webob.exc.HTTPBadRequest(explanation=msg)
type = filters.get('type', None)
if type:
if type not in ['import']:
msg = _('Invalid type value: %s') % type
raise webob.exc.HTTPBadRequest(explanation=msg)
return filters
def _validate_marker(self, marker):
if marker and not uuidutils.is_uuid_like(marker):
msg = _('Invalid marker format')
raise webob.exc.HTTPBadRequest(explanation=msg)
return marker
def _validate_limit(self, limit):
try:
limit = int(limit)
except ValueError:
msg = _("limit param must be an integer")
raise webob.exc.HTTPBadRequest(explanation=msg)
if limit < 0:
msg = _("limit param must be positive")
raise webob.exc.HTTPBadRequest(explanation=msg)
return limit
def _validate_create_body(self, body):
"""Validate the body of task creating request"""
for param in self._required_properties:
if param not in body:
msg = _("Task '%s' is required") % param
raise webob.exc.HTTPBadRequest(explanation=unicode(msg))
#NOTE(venkatesh): this import type validation needs to be
# moved from here
task_type = body['type']
if task_type == 'import':
for key in ['import_from', 'import_from_format',
'image_properties']:
if key not in body['input']:
msg = _("Input does not contain '%s' field") % key
raise webob.exc.HTTPBadRequest(explanation=unicode(msg))
def __init__(self, schema=None):
super(RequestDeserializer, self).__init__()
self.schema = schema or get_task_schema()
def create(self, request):
body = self._get_request_body(request)
self._validate_create_body(body)
try:
self.schema.validate(body)
except exception.InvalidObject as e:
raise webob.exc.HTTPBadRequest(explanation=unicode(e))
task = {}
properties = body
for key in self._required_properties:
try:
task[key] = properties.pop(key)
except KeyError:
pass
return dict(task=task)
def index(self, request):
params = request.params.copy()
limit = params.pop('limit', None)
marker = params.pop('marker', None)
sort_dir = params.pop('sort_dir', 'desc')
query_params = {
'sort_key': params.pop('sort_key', 'created_at'),
'sort_dir': self._validate_sort_dir(sort_dir),
'filters': self._get_filters(params)
}
if marker is not None:
query_params['marker'] = self._validate_marker(marker)
if limit is not None:
query_params['limit'] = self._validate_limit(limit)
return query_params
class ResponseSerializer(wsgi.JSONResponseSerializer):
def __init__(self, task_schema=None, partial_task_schema=None):
super(ResponseSerializer, self).__init__()
self.task_schema = task_schema or get_task_schema()
self.partial_task_schema = partial_task_schema \
or _get_partial_task_schema()
def _format_task(self, task, schema):
task_view = {}
attributes = ['type', 'status', 'input', 'result', 'owner', 'message']
for key in attributes:
task_view[key] = getattr(task, key)
task_view['id'] = task.task_id
task_view['expires_at'] = timeutils.isotime(task.expires_at)
task_view['created_at'] = timeutils.isotime(task.created_at)
task_view['updated_at'] = timeutils.isotime(task.updated_at)
task_view['self'] = '/v2/tasks/%s' % task.task_id
task_view['schema'] = '/v2/schemas/task'
task_view = schema.filter(task_view) # domain
return task_view
def create(self, response, task):
response.status_int = 201
self.get(response, task)
def get(self, response, task):
task_view = self._format_task(task, self.task_schema)
body = json.dumps(task_view, ensure_ascii=False)
response.unicode_body = unicode(body)
response.content_type = 'application/json'
def index(self, response, result):
params = dict(response.request.params)
params.pop('marker', None)
query = urllib.urlencode(params)
body = {
'tasks': [self._format_task(i, self.partial_task_schema)
for i in result['tasks']],
'first': '/v2/tasks',
'schema': '/v2/schemas/tasks',
}
if query:
body['first'] = '%s?%s' % (body['first'], query)
if 'next_marker' in result:
params['marker'] = result['next_marker']
next_query = urllib.urlencode(params)
body['next'] = '/v2/tasks?%s' % next_query
response.unicode_body = unicode(json.dumps(body, ensure_ascii=False))
response.content_type = 'application/json'
_TASK_SCHEMA = {
"id": {
"description": _("An identifier for the task"),
"pattern": _('^([0-9a-fA-F]){8}-([0-9a-fA-F]){4}-([0-9a-fA-F]){4}'
'-([0-9a-fA-F]){4}-([0-9a-fA-F]){12}$'),
"type": "string"
},
"type": {
"description": _("The type of task represented by this content"),
"enum": [
"import",
],
"type": "string"
},
"status": {
"description": _("The current status of this task"),
"enum": [
"pending",
"processing",
"success",
"failure"
],
"type": "string"
},
"input": {
"description": _("The parameters required by task, JSON blob"),
"type": "object"
},
"result": {
"description": _("The result of current task, JSON blob"),
"type": "object",
},
"owner": {
"description": _("An identifier for the owner of this task"),
"type": "string"
},
"message": {
"description": _("Human-readable informative message only included"
" when appropriate (usually on failure)"),
"type": "string",
},
"expires_at": {
"description": _("Datetime when this resource would be"
" subject to removal"),
"type": "string"
},
"created_at": {
"description": _("Datetime when this resource was created"),
"type": "string"
},
"updated_at": {
"description": _("Datetime when this resource was updated"),
"type": "string"
},
'self': {'type': 'string'},
'schema': {'type': 'string'}
}
def get_task_schema():
properties = copy.deepcopy(_TASK_SCHEMA)
schema = glance.schema.Schema('task', properties)
return schema
def _get_partial_task_schema():
properties = copy.deepcopy(_TASK_SCHEMA)
hide_properties = ['input', 'result', 'message']
for key in hide_properties:
del properties[key]
schema = glance.schema.Schema('task', properties)
return schema
def get_collection_schema():
task_schema = _get_partial_task_schema()
return glance.schema.CollectionSchema('tasks', task_schema)
def create_resource():
"""Task resource factory method"""
task_schema = get_task_schema()
partial_task_schema = _get_partial_task_schema()
deserializer = RequestDeserializer(task_schema)
serializer = ResponseSerializer(task_schema, partial_task_schema)
controller = TasksController()
return wsgi.Resource(controller, deserializer, serializer)

View File

@ -262,14 +262,7 @@ class Task(object):
return self._status
def run(self, executor):
# NOTE(flwang): The task status won't be set here but handled by the
# executor.
# NOTE(nikhil): Ideally, a task should always be instantiated with an
# executor. However, we need to make that a part of the framework
# and we are planning to add such logic when Controller would
# be introduced.
if executor:
executor.run(self.task_id)
pass
def _validate_task_status_transition(self, cur_status, new_status):
valid_transitions = {
@ -319,6 +312,7 @@ class Task(object):
class TaskFactory(object):
def new_task(self, task_type, task_input, owner):
task_id = uuidutils.generate_uuid()
status = 'pending'

View File

@ -149,7 +149,7 @@ class Task(object):
updated_at = _proxy('base', 'updated_at')
def run(self, executor):
raise NotImplementedError()
self.base.run(executor)
def begin_processing(self):
self.base.begin_processing()
@ -166,6 +166,6 @@ class TaskFactory(object):
self.helper = Helper(proxy_class, proxy_kwargs)
self.base = base
def new_task(self, task_type, task_input, owner):
t = self.base.new_task(task_type, task_input, owner)
def new_task(self, **kwargs):
t = self.base.new_task(**kwargs)
return self.helper.proxy(t)

View File

@ -0,0 +1,131 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import fixtures
import requests
from glance.openstack.common import uuidutils
from glance.tests import functional
TENANT1 = uuidutils.generate_uuid()
TENANT2 = uuidutils.generate_uuid()
TENANT3 = uuidutils.generate_uuid()
TENANT4 = uuidutils.generate_uuid()
class TestTasks(functional.FunctionalTest):
def setUp(self):
super(TestTasks, self).setUp()
self.cleanup()
self.file_path = self._stash_file()
self.api_server.deployment_flavor = 'noauth'
self.start_servers(**self.__dict__.copy())
def _url(self, path):
return 'http://127.0.0.1:%d%s' % (self.api_port, path)
def _headers(self, custom_headers=None):
base_headers = {
'X-Identity-Status': 'Confirmed',
'X-Auth-Token': '932c5c84-02ac-4fe5-a9ba-620af0e2bb96',
'X-User-Id': 'f9a41d13-0c13-47e9-bee2-ce4e8bfe958e',
'X-Tenant-Id': TENANT1,
'X-Roles': 'member',
}
base_headers.update(custom_headers or {})
return base_headers
def _stash_file(self):
self.tmp_dir = self.useFixture(fixtures.TempDir()).path
self.store_dir = os.path.join(self.tmp_dir, 'images')
os.mkdir(self.store_dir)
file_path = os.path.join(self.store_dir, 'foo')
with open(file_path, 'w') as f:
f.write('blah')
return 'file://%s' % file_path
def test_task_lifecycle(self):
# Task list should be empty
path = self._url('/v2/tasks')
response = requests.get(path, headers=self._headers())
self.assertEqual(200, response.status_code)
tasks = json.loads(response.text)['tasks']
self.assertEqual(0, len(tasks))
# Create a task
path = self._url('/v2/tasks')
headers = self._headers({'content-type': 'application/json'})
data = json.dumps({
"type": "import",
"input": {
"import_from": self.file_path,
"import_from_format": "qcow2",
"image_properties": {
'disk_format': 'vhd',
'container_format': 'ovf'
}
}
})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned task entity should have a generated id and status
task = json.loads(response.text)
task_id = task['id']
checked_keys = set([u'created_at',
u'expires_at',
u'id',
u'input',
u'owner',
u'schema',
u'self',
u'status',
u'type',
u'updated_at'])
self.assertEqual(set(task.keys()), checked_keys)
expected_task = {
'status': 'pending',
'type': 'import',
'input': {
"import_from": self.file_path,
"import_from_format": "qcow2",
"image_properties": {
'disk_format': 'vhd',
'container_format': 'ovf'
}},
'schema': '/v2/schemas/task',
}
for key, value in expected_task.items():
self.assertEqual(task[key], value, key)
# Tasks list should now have one entry
path = self._url('/v2/tasks')
response = requests.get(path, headers=self._headers())
self.assertEqual(200, response.status_code)
tasks = json.loads(response.text)['tasks']
self.assertEqual(1, len(tasks))
self.assertEqual(tasks[0]['id'], task_id)
self.stop_servers()

View File

View File

@ -0,0 +1,216 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import os.path
import tempfile
import fixtures
from oslo.config import cfg
from glance import tests as glance_tests
import glance.common.client
from glance.common import config
import glance.db.sqlalchemy.api
import glance.db.sqlalchemy.migration
import glance.registry.client.v1.client
import glance.store
from glance.tests import utils as test_utils
TESTING_API_PASTE_CONF = """
[pipeline:glance-api]
pipeline = versionnegotiation gzip unauthenticated-context rootapp
[pipeline:glance-api-caching]
pipeline = versionnegotiation gzip unauthenticated-context cache rootapp
[pipeline:glance-api-cachemanagement]
pipeline =
versionnegotiation
gzip
unauthenticated-context
cache
cache_manage
rootapp
[pipeline:glance-api-fakeauth]
pipeline = versionnegotiation gzip fakeauth context rootapp
[pipeline:glance-api-noauth]
pipeline = versionnegotiation gzip context rootapp
[composite:rootapp]
paste.composite_factory = glance.api:root_app_factory
/: apiversions
/v1: apiv1app
/v2: apiv2app
[app:apiversions]
paste.app_factory = glance.api.versions:create_resource
[app:apiv1app]
paste.app_factory = glance.api.v1.router:API.factory
[app:apiv2app]
paste.app_factory = glance.api.v2.router:API.factory
[filter:versionnegotiation]
paste.filter_factory =
glance.api.middleware.version_negotiation:VersionNegotiationFilter.factory
[filter:gzip]
paste.filter_factory = glance.api.middleware.gzip:GzipMiddleware.factory
[filter:cache]
paste.filter_factory = glance.api.middleware.cache:CacheFilter.factory
[filter:cache_manage]
paste.filter_factory =
glance.api.middleware.cache_manage:CacheManageFilter.factory
[filter:context]
paste.filter_factory = glance.api.middleware.context:ContextMiddleware.factory
[filter:unauthenticated-context]
paste.filter_factory =
glance.api.middleware.context:UnauthenticatedContextMiddleware.factory
[filter:fakeauth]
paste.filter_factory = glance.tests.utils:FakeAuthMiddleware.factory
"""
TESTING_REGISTRY_PASTE_CONF = """
[pipeline:glance-registry]
pipeline = unauthenticated-context registryapp
[pipeline:glance-registry-fakeauth]
pipeline = fakeauth context registryapp
[app:registryapp]
paste.app_factory = glance.registry.api.v1:API.factory
[filter:context]
paste.filter_factory = glance.api.middleware.context:ContextMiddleware.factory
[filter:unauthenticated-context]
paste.filter_factory =
glance.api.middleware.context:UnauthenticatedContextMiddleware.factory
[filter:fakeauth]
paste.filter_factory = glance.tests.utils:FakeAuthMiddleware.factory
"""
CONF = cfg.CONF
CONF.import_opt('filesystem_store_datadir', 'glance.store.filesystem')
class ApiTest(test_utils.BaseTestCase):
def setUp(self):
super(ApiTest, self).setUp()
self.test_dir = self.useFixture(fixtures.TempDir()).path
self._configure_logging()
self._setup_database()
self._setup_stores()
self._setup_property_protection()
self.glance_registry_app = self._load_paste_app(
'glance-registry',
flavor=getattr(self, 'registry_flavor', ''),
conf=getattr(self, 'registry_paste_conf',
TESTING_REGISTRY_PASTE_CONF),
)
self._connect_registry_client()
self.glance_api_app = self._load_paste_app(
'glance-api',
flavor=getattr(self, 'api_flavor', ''),
conf=getattr(self, 'api_paste_conf', TESTING_API_PASTE_CONF),
)
self.http = test_utils.Httplib2WsgiAdapter(self.glance_api_app)
def _setup_property_protection(self):
self._copy_data_file('property-protections.conf', self.test_dir)
self.property_file = os.path.join(self.test_dir,
'property-protections.conf')
def _configure_logging(self):
self.config(default_log_levels=[
'amqplib=WARN',
'sqlalchemy=WARN',
'boto=WARN',
'suds=INFO',
'keystone=INFO',
'eventlet.wsgi.server=DEBUG'
])
def _setup_database(self):
sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
self.config(sql_connection=sql_connection)
glance.db.sqlalchemy.api.clear_db_env()
glance_db_env = 'GLANCE_DB_TEST_SQLITE_FILE'
if glance_db_env in os.environ:
# use the empty db created and cached as a tempfile
# instead of spending the time creating a new one
db_location = os.environ[glance_db_env]
test_utils.execute('cp %s %s/tests.sqlite'
% (db_location, self.test_dir))
else:
glance.db.sqlalchemy.migration.db_sync()
# copy the clean db to a temp location so that it
# can be reused for future tests
(osf, db_location) = tempfile.mkstemp()
os.close(osf)
test_utils.execute('cp %s/tests.sqlite %s'
% (self.test_dir, db_location))
os.environ[glance_db_env] = db_location
# cleanup the temp file when the test suite is
# complete
def _delete_cached_db():
try:
os.remove(os.environ[glance_db_env])
except Exception:
glance_tests.logger.exception(
"Error cleaning up the file %s" %
os.environ[glance_db_env])
atexit.register(_delete_cached_db)
def _setup_stores(self):
image_dir = os.path.join(self.test_dir, "images")
self.config(filesystem_store_datadir=image_dir)
glance.store.create_stores()
def _load_paste_app(self, name, flavor, conf):
conf_file_path = os.path.join(self.test_dir, '%s-paste.ini' % name)
with open(conf_file_path, 'wb') as conf_file:
conf_file.write(conf)
conf_file.flush()
return config.load_paste_app(name, flavor=flavor,
conf_file=conf_file_path)
def _connect_registry_client(self):
def get_connection_type(self2):
def wrapped(*args, **kwargs):
return test_utils.HttplibWsgiAdapter(self.glance_registry_app)
return wrapped
self.stubs.Set(glance.common.client.BaseClient,
'get_connection_type', get_connection_type)
def tearDown(self):
glance.db.sqlalchemy.api.clear_db_env()
super(ApiTest, self).tearDown()

View File

@ -0,0 +1,510 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Rackspace Hosting
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from glance.api.v2 import tasks
import glance.openstack.common.jsonutils as json
from glance.tests.integration.v2 import base
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
TENANT3 = '5a3e60e8-cfa9-4a9e-a90a-62b42cea92b8'
TENANT4 = 'c6c87f25-8a94-47ed-8c83-053c25f42df4'
def minimal_task_headers(owner='tenant1'):
headers = {
'X-Auth-Token': 'user1:%s:admin' % owner,
'Content-Type': 'application/json',
}
return headers
def _new_task_fixture(**kwargs):
task_data = {
"type": "import",
"input": {
"import_from": "/some/file/path",
"import_from_format": "qcow2",
"image_properties": {
'disk_format': 'vhd',
'container_format': 'ovf'
}
}
}
task_data.update(kwargs)
return task_data
class TestTasksApi(base.ApiTest):
def __init__(self, *args, **kwargs):
super(TestTasksApi, self).__init__(*args, **kwargs)
self.api_flavor = 'fakeauth'
self.registry_flavor = 'fakeauth'
def _post_new_task(self, **kwargs):
task_owner = kwargs['owner']
headers = minimal_task_headers(task_owner)
task_data = _new_task_fixture()
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=headers,
body=body_content)
self.assertEqual(response.status, 201)
task = json.loads(content)
return task
def test_all_task_api(self):
# 0. GET /tasks
# Verify no tasks
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
content_dict = json.loads(content)
self.assertEqual(response.status, 200)
self.assertFalse(content_dict['tasks'])
# 1. GET /tasks/{task_id}
# Verify non-existent task
task_id = 'NON_EXISTENT_TASK'
path = "/v2/tasks/%s" % task_id
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 404)
# 2. POST /tasks
# Create a new task
task_data = _new_task_fixture()
task_owner = 'tenant1'
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=
minimal_task_headers(task_owner),
body=body_content)
self.assertEqual(response.status, 201)
data = json.loads(content)
task_id = data['id']
self.assertIsNotNone(task_id)
self.assertEqual(task_owner, data['owner'])
self.assertEqual(task_data['type'], data['type'])
self.assertEqual(task_data['input'], data['input'])
# 3. GET /tasks/{task_id}
# Get an existing task
path = "/v2/tasks/%s" % task_id
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
# 4. GET /tasks/{task_id}
# Get all tasks (not deleted)
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
self.assertIsNotNone(content)
data = json.loads(content)
self.assertIsNotNone(data)
self.assertEqual(1, len(data['tasks']))
#NOTE(venkatesh) find a way to get expected_keys from tasks controller
expected_keys = set(['id', 'type', 'owner', 'status',
'created_at', 'updated_at', 'expires_at',
'self', 'schema'])
task = data['tasks'][0]
self.assertEqual(expected_keys, set(task.keys()))
self.assertEqual(task_data['type'], task['type'])
self.assertEqual(task_owner, task['owner'])
self.assertEqual('pending', task['status'])
self.assertIsNotNone(task['created_at'])
self.assertIsNotNone(task['updated_at'])
self.assertIsNotNone(task['expires_at'])
def test_task_schema_api(self):
# 0. GET /schemas/task
# Verify schema for task
path = "/v2/schemas/task"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
schema = tasks.get_task_schema()
expected_schema = schema.minimal()
data = json.loads(content)
self.assertIsNotNone(data)
self.assertEqual(expected_schema, data)
# 1. GET /schemas/tasks
# Verify schema for tasks
path = "/v2/schemas/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
schema = tasks.get_collection_schema()
expected_schema = schema.minimal()
data = json.loads(content)
self.assertIsNotNone(data)
self.assertEqual(expected_schema, data)
def test_create_new_task(self):
# 0. POST /tasks
# Create a new task with valid input and type
task_data = _new_task_fixture()
task_owner = 'tenant1'
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=
minimal_task_headers(task_owner),
body=body_content)
self.assertEqual(response.status, 201)
data = json.loads(content)
task_id = data['id']
self.assertIsNotNone(task_id)
self.assertEqual(task_owner, data['owner'])
self.assertEqual(task_data['type'], data['type'])
self.assertEqual(task_data['input'], data['input'])
# 1. POST /tasks
# Create a new task with invalid type
# Expect BadRequest(400) Error as response
task_data = _new_task_fixture(type='invalid')
task_owner = 'tenant1'
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=
minimal_task_headers(task_owner),
body=body_content)
self.assertEqual(response.status, 400)
# 1. POST /tasks
# Create a new task with invalid input for type 'import'
# Expect BadRequest(400) Error as response
task_data = _new_task_fixture(input='{something: invalid}')
task_owner = 'tenant1'
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=
minimal_task_headers(task_owner),
body=body_content)
self.assertEqual(response.status, 400)
def test_tasks_with_filter(self):
# 0. GET /v2/tasks
# Verify no tasks
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertFalse(content_dict['tasks'])
task_ids = []
# 1. POST /tasks with two tasks with status 'pending' and 'processing'
# with various attributes
task_owner = TENANT1
headers = minimal_task_headers(task_owner)
task_data = _new_task_fixture()
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=headers,
body=body_content)
self.assertEqual(response.status, 201)
data = json.loads(content)
task_ids.append(data['id'])
task_owner = TENANT2
headers = minimal_task_headers(task_owner)
task_data = _new_task_fixture()
body_content = json.dumps(task_data)
path = "/v2/tasks"
response, content = self.http.request(path, 'POST',
headers=headers,
body=body_content)
self.assertEqual(response.status, 201)
data = json.loads(content)
task_ids.append(data['id'])
# 2. GET /tasks
# Verify two import tasks
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertEqual(2, len(content_dict['tasks']))
# 3. GET /tasks with owner filter
# Verify correct task returned with owner
params = "owner=%s" % TENANT1
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertEqual(1, len(content_dict['tasks']))
self.assertEqual(TENANT1, content_dict['tasks'][0]['owner'])
# Check the same for different owner.
params = "owner=%s" % TENANT2
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertEqual(1, len(content_dict['tasks']))
self.assertEqual(TENANT2, content_dict['tasks'][0]['owner'])
# 4. GET /tasks with type filter
# Verify correct task returned with type
params = "type=import"
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertEqual(2, len(content_dict['tasks']))
actual_task_ids = [task['id'] for task in content_dict['tasks']]
self.assertEqual(set(task_ids), set(actual_task_ids))
# 5. GET /tasks with status filter
# Verify correct tasks are returned for status 'pending'
params = "status=pending"
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertEqual(2, len(content_dict['tasks']))
actual_task_ids = [task['id'] for task in content_dict['tasks']]
self.assertEqual(set(task_ids), set(actual_task_ids))
# 6. GET /tasks with status filter
# Verify no task are returned for status which is not 'pending'
params = "status=success"
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
content_dict = json.loads(content)
self.assertEqual(0, len(content_dict['tasks']))
def test_limited_tasks(self):
"""
Ensure marker and limit query params work
"""
# 0. GET /tasks
# Verify no tasks
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
tasks = json.loads(content)
self.assertFalse(tasks['tasks'])
task_ids = []
# 1. POST /tasks with three tasks with various attributes
task = self._post_new_task(owner=TENANT1)
task_ids.append(task['id'])
task = self._post_new_task(owner=TENANT2)
task_ids.append(task['id'])
task = self._post_new_task(owner=TENANT3)
task_ids.append(task['id'])
# 2. GET /tasks
# Verify 3 tasks are returned
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
tasks = json.loads(content)['tasks']
self.assertEqual(3, len(tasks))
# 3. GET /tasks with limit of 2
# Verify only two tasks were returned
params = "limit=2"
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
actual_tasks = json.loads(content)['tasks']
self.assertEqual(2, len(actual_tasks))
self.assertEqual(tasks[0]['id'], actual_tasks[0]['id'])
self.assertEqual(tasks[1]['id'], actual_tasks[1]['id'])
# 4. GET /tasks with marker
# Verify only two tasks were returned
params = "marker=%s" % tasks[0]['id']
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
actual_tasks = json.loads(content)['tasks']
self.assertEqual(2, len(actual_tasks))
self.assertEqual(tasks[1]['id'], actual_tasks[0]['id'])
self.assertEqual(tasks[2]['id'], actual_tasks[1]['id'])
# 5. GET /tasks with marker and limit
# Verify only one task was returned with the correct id
params = "limit=1&marker=%s" % tasks[1]['id']
path = "/v2/tasks?%s" % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
actual_tasks = json.loads(content)['tasks']
self.assertEqual(1, len(actual_tasks))
self.assertEqual(tasks[2]['id'], actual_tasks[0]['id'])
def test_ordered_tasks(self):
# 0. GET /tasks
# Verify no tasks
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
tasks = json.loads(content)
self.assertFalse(tasks['tasks'])
task_ids = []
# 1. POST /tasks with three tasks with various attributes
task = self._post_new_task(owner=TENANT1)
task_ids.append(task['id'])
task = self._post_new_task(owner=TENANT2)
task_ids.append(task['id'])
task = self._post_new_task(owner=TENANT3)
task_ids.append(task['id'])
# 2. GET /tasks with no query params
# Verify three tasks sorted by created_at desc
# 2. GET /tasks
# Verify 3 tasks are returned
path = "/v2/tasks"
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
actual_tasks = json.loads(content)['tasks']
self.assertEqual(3, len(actual_tasks))
self.assertEqual(task_ids[2], actual_tasks[0]['id'])
self.assertEqual(task_ids[1], actual_tasks[1]['id'])
self.assertEqual(task_ids[0], actual_tasks[2]['id'])
# 3. GET /tasks sorted by owner asc
params = 'sort_key=owner&sort_dir=asc'
path = '/v2/tasks?%s' % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
expected_task_owners = [TENANT1, TENANT2, TENANT3]
expected_task_owners.sort()
actual_tasks = json.loads(content)['tasks']
self.assertEqual(3, len(actual_tasks))
self.assertEqual(expected_task_owners,
[t['owner'] for t in actual_tasks])
# 4. GET /tasks sorted by owner desc with a marker
params = 'sort_key=owner&sort_dir=desc&marker=%s' % task_ids[0]
path = '/v2/tasks?%s' % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
actual_tasks = json.loads(content)['tasks']
self.assertEqual(2, len(actual_tasks))
self.assertEqual(task_ids[2], actual_tasks[0]['id'])
self.assertEqual(task_ids[1], actual_tasks[1]['id'])
self.assertEqual(TENANT3, actual_tasks[0]['owner'])
self.assertEqual(TENANT2, actual_tasks[1]['owner'])
# 5. GET /tasks sorted by owner asc with a marker
params = 'sort_key=owner&sort_dir=asc&marker=%s' % task_ids[0]
path = '/v2/tasks?%s' % params
response, content = self.http.request(path, 'GET',
headers=minimal_task_headers())
self.assertEqual(response.status, 200)
actual_tasks = json.loads(content)['tasks']
self.assertEqual(0, len(actual_tasks))

View File

@ -914,13 +914,6 @@ class TestImmutableTask(utils.BaseTestCase):
def test_change_updated_at(self):
self._test_change('updated_at', 'fake')
def test_run(self):
self.assertRaises(
NotImplementedError,
self.task.run,
'executor'
)
def test_begin_processing(self):
self.assertRaises(
exception.Forbidden,
@ -961,40 +954,32 @@ class TestTaskFactoryProxy(utils.BaseTestCase):
def test_task_create_default_owner(self):
owner = self.request1.context.owner
task = self.task_factory.new_task(
self.task_type,
self.task_input,
owner
)
task = self.task_factory.new_task(task_type=self.task_type,
task_input=self.task_input,
owner=owner)
self.assertEqual(task.owner, TENANT1)
def test_task_create_wrong_owner(self):
self.assertRaises(
exception.Forbidden,
self.task_factory.new_task,
self.task_type,
self.task_input,
self.owner
)
self.assertRaises(exception.Forbidden,
self.task_factory.new_task,
task_type=self.task_type,
task_input=self.task_input,
owner=self.owner)
def test_task_create_owner_as_None(self):
self.assertRaises(
exception.Forbidden,
self.task_factory.new_task,
self.task_type,
self.task_input,
None
)
self.assertRaises(exception.Forbidden,
self.task_factory.new_task,
task_type=self.task_type,
task_input=self.task_input,
owner=None)
def test_task_create_admin_context_owner_as_None(self):
self.context.is_admin = True
self.assertRaises(
exception.Forbidden,
self.task_factory.new_task,
self.task_type,
self.task_input,
None
)
self.assertRaises(exception.Forbidden,
self.task_factory.new_task,
task_type=self.task_type,
task_input=self.task_input,
owner=None)
class TestTaskRepoProxy(utils.BaseTestCase):

View File

@ -296,15 +296,15 @@ class TestTaskFactory(test_utils.BaseTestCase):
proxy_factory = proxy.TaskFactory(self.factory)
proxy_factory.new_task(
self.fake_type,
self.fake_input,
self.fake_owner
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
self.factory.new_task.assert_called_once_with(
self.fake_type,
self.fake_input,
self.fake_owner
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
def test_proxy_wrapping(self):
@ -316,15 +316,15 @@ class TestTaskFactory(test_utils.BaseTestCase):
self.factory.new_task.return_value = 'fake_task'
task = proxy_factory.new_task(
self.fake_type,
self.fake_input,
self.fake_owner
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
self.factory.new_task.assert_called_once_with(
self.fake_type,
self.fake_input,
self.fake_owner
type=self.fake_type,
input=self.fake_input,
owner=self.fake_owner
)
self.assertTrue(isinstance(task, FakeProxy))
self.assertEqual(task.base, 'fake_task')

View File

@ -64,7 +64,7 @@ class ImageRepoStub(object):
class TaskStub(glance.domain.Task):
def run(self):
def run(self, executor):
pass
def succeed(self, result):
@ -453,11 +453,7 @@ class TestTaskNotifications(utils.BaseTestCase):
self.fail('Notification contained location field.')
def test_task_run_notification(self):
self.assertRaises(
NotImplementedError,
self.task_proxy.run,
executor=None
)
self.task_proxy.run(executor=None)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]

View File

@ -95,7 +95,7 @@ class TaskStub(object):
self.task_id = task_id
self.status = 'pending'
def run(self):
def run(self, executor):
self.status = 'processing'

View File

@ -0,0 +1,573 @@
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import json
import webob
import glance.api.v2.tasks
import glance.domain
from glance.openstack.common import timeutils
from glance.openstack.common import uuidutils
from glance.tests.unit import base
import glance.tests.unit.utils as unit_test_utils
import glance.tests.utils as test_utils
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
UUID2 = 'a85abd86-55b3-4d5b-b0b4-5d0a6e6042fc'
UUID3 = '971ec09a-8067-4bc8-a91f-ae3557f1c4c7'
UUID4 = '6bbe7cc2-eae7-4c0f-b50d-a7160b0c6a86'
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
TENANT3 = '5a3e60e8-cfa9-4a9e-a90a-62b42cea92b8'
TENANT4 = 'c6c87f25-8a94-47ed-8c83-053c25f42df4'
DATETIME = datetime.datetime(2013, 9, 28, 15, 27, 36, 325355)
ISOTIME = '2013-09-28T15:27:36Z'
def _db_fixture(id, **kwargs):
default_datetime = timeutils.utcnow()
obj = {
'id': id,
'status': 'pending',
'type': 'import',
'input': {},
'result': None,
'owner': None,
'message': None,
'expires_at': None,
'created_at': default_datetime,
'updated_at': default_datetime,
'deleted_at': None,
'deleted': False
}
obj.update(kwargs)
return obj
def _domain_fixture(id, **kwargs):
default_datetime = timeutils.utcnow()
properties = {
'task_id': id,
'status': 'pending',
'type': 'import',
'input': {},
'result': None,
'owner': None,
'message': None,
'expires_at': None,
'created_at': default_datetime,
'updated_at': default_datetime,
}
properties.update(kwargs)
return glance.domain.Task(**properties)
class TestTasksController(test_utils.BaseTestCase):
def setUp(self):
super(TestTasksController, self).setUp()
self.db = unit_test_utils.FakeDB()
self.policy = unit_test_utils.FakePolicyEnforcer()
self.notifier = unit_test_utils.FakeNotifier()
self.store = unit_test_utils.FakeStoreAPI()
self._create_tasks()
self.controller = glance.api.v2.tasks.TasksController(self.db,
self.policy,
self.notifier,
self.store)
def _create_tasks(self):
self.db.reset()
self.tasks = [
_db_fixture(UUID1, owner=TENANT1),
# FIXME(venkatesh): change the type to include clone and export
# once they are included as a valid types under Task domain model.
_db_fixture(UUID2, owner=TENANT2, type='import'),
_db_fixture(UUID3, owner=TENANT3, type='import'),
_db_fixture(UUID4, owner=TENANT4, type='import')
]
[self.db.task_create(None, task) for task in self.tasks]
def test_index(self):
self.config(limit_param_default=1, api_limit_max=3)
request = unit_test_utils.get_fake_request()
output = self.controller.index(request)
self.assertEqual(1, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([UUID1])
self.assertEqual(actual, expected)
def test_index_admin(self):
request = unit_test_utils.get_fake_request(is_admin=True)
output = self.controller.index(request)
self.assertEqual(4, len(output['tasks']))
def test_index_return_parameters(self):
self.config(limit_param_default=1, api_limit_max=4)
request = unit_test_utils.get_fake_request(is_admin=True)
output = self.controller.index(request, marker=UUID3, limit=1,
sort_key='created_at', sort_dir='desc')
self.assertEqual(1, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([UUID2])
self.assertEqual(expected, actual)
self.assertEqual(UUID2, output['next_marker'])
def test_index_next_marker(self):
self.config(limit_param_default=1, api_limit_max=3)
request = unit_test_utils.get_fake_request(is_admin=True)
output = self.controller.index(request, marker=UUID3, limit=2)
self.assertEqual(2, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([UUID2, UUID1])
self.assertEqual(actual, expected)
self.assertEqual(UUID1, output['next_marker'])
def test_index_no_next_marker(self):
self.config(limit_param_default=1, api_limit_max=3)
request = unit_test_utils.get_fake_request(is_admin=True)
output = self.controller.index(request, marker=UUID1, limit=2)
self.assertEqual(0, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([])
self.assertEqual(actual, expected)
self.assertTrue('next_marker' not in output)
def test_index_with_id_filter(self):
request = unit_test_utils.get_fake_request('/tasks?id=%s' % UUID1)
output = self.controller.index(request, filters={'id': UUID1})
self.assertEqual(1, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([UUID1])
self.assertEqual(actual, expected)
def test_index_with_filters_return_many(self):
path = '/tasks?status=pending'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, filters={'status': 'pending'})
self.assertEqual(4, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([UUID1, UUID2, UUID3, UUID4])
self.assertEqual(sorted(actual), sorted(expected))
def test_index_with_many_filters(self):
url = '/tasks?status=pending&type=import'
request = unit_test_utils.get_fake_request(url, is_admin=True)
output = self.controller.index(request,
filters={
'status': 'pending',
'type': 'import',
'owner': TENANT1,
})
self.assertEqual(1, len(output['tasks']))
actual = set([task.task_id for task in output['tasks']])
expected = set([UUID1])
self.assertEqual(actual, expected)
def test_index_with_marker(self):
self.config(limit_param_default=1, api_limit_max=3)
path = '/tasks'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, marker=UUID3)
actual = set([task.task_id for task in output['tasks']])
self.assertEquals(1, len(actual))
self.assertTrue(UUID2 in actual)
def test_index_with_limit(self):
path = '/tasks'
limit = 2
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, limit=limit)
actual = set([task.task_id for task in output['tasks']])
self.assertEquals(limit, len(actual))
def test_index_greater_than_limit_max(self):
self.config(limit_param_default=1, api_limit_max=3)
path = '/tasks'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, limit=4)
actual = set([task.task_id for task in output['tasks']])
self.assertEquals(3, len(actual))
self.assertTrue(output['next_marker'] not in output)
def test_index_default_limit(self):
self.config(limit_param_default=1, api_limit_max=3)
path = '/tasks'
request = unit_test_utils.get_fake_request(path)
output = self.controller.index(request)
actual = set([task.task_id for task in output['tasks']])
self.assertEquals(1, len(actual))
def test_index_with_sort_dir(self):
path = '/tasks'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, sort_dir='asc', limit=3)
actual = [task.task_id for task in output['tasks']]
self.assertEquals(3, len(actual))
self.assertEqual(sorted(set(actual)),
sorted(set([UUID1, UUID2, UUID3])))
def test_index_with_sort_key(self):
path = '/tasks'
request = unit_test_utils.get_fake_request(path, is_admin=True)
output = self.controller.index(request, sort_key='created_at', limit=3)
actual = [task.task_id for task in output['tasks']]
self.assertEquals(3, len(actual))
self.assertEquals(UUID4, actual[0])
self.assertEquals(UUID3, actual[1])
self.assertEquals(UUID2, actual[2])
def test_index_with_marker_not_found(self):
fake_uuid = uuidutils.generate_uuid()
path = '/tasks'
request = unit_test_utils.get_fake_request(path)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, request, marker=fake_uuid)
def test_index_with_marker_is_not_like_uuid(self):
marker = 'INVALID_UUID'
path = '/tasks'
request = unit_test_utils.get_fake_request(path)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, request, marker=marker)
def test_index_invalid_sort_key(self):
path = '/tasks'
request = unit_test_utils.get_fake_request(path)
self.assertRaises(webob.exc.HTTPBadRequest,
self.controller.index, request, sort_key='foo')
def test_index_zero_tasks(self):
self.db.reset()
request = unit_test_utils.get_fake_request()
output = self.controller.index(request)
self.assertEqual([], output['tasks'])
def test_get(self):
request = unit_test_utils.get_fake_request()
output = self.controller.get(request, task_id=UUID1)
self.assertEqual(UUID1, output.task_id)
self.assertEqual('import', output.type)
def test_get_non_existent(self):
request = unit_test_utils.get_fake_request()
task_id = uuidutils.generate_uuid()
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.get, request, task_id)
def test_get_not_allowed(self):
request = unit_test_utils.get_fake_request()
self.assertEquals(request.context.tenant, TENANT1)
self.assertRaises(webob.exc.HTTPNotFound,
self.controller.get, request, UUID4)
def test_create(self):
request = unit_test_utils.get_fake_request()
task = {"type": "import", "input": {
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
"image_from_format": "qcow2"}
}
output = self.controller.create(request, task=task)
self.assertEqual('import', output.type)
self.assertEqual({
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
"image_from_format": "qcow2"}, output.input)
output_logs = [nlog for nlog in self.notifier.get_logs()
if nlog['event_type'] == 'task.create']
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]
self.assertEqual(output_log['notification_type'], 'INFO')
self.assertEqual(output_log['event_type'], 'task.create')
class TestTasksControllerPolicies(base.IsolatedUnitTest):
def setUp(self):
super(TestTasksControllerPolicies, self).setUp()
self.db = unit_test_utils.FakeDB()
self.policy = unit_test_utils.FakePolicyEnforcer()
self.controller = glance.api.v2.tasks.TasksController(self.db,
self.policy)
def test_index_unauthorized(self):
rules = {"get_tasks": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.index,
request)
def test_get_unauthorized(self):
rules = {"get_task": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPForbidden, self.controller.get,
request, task_id=UUID2)
def test_create_task_unauthorized(self):
rules = {"add_task": False}
self.policy.set_rules(rules)
request = unit_test_utils.get_fake_request()
task = {'type': 'import', 'input': {"import_from": "fake"}}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
request, task)
class TestTasksDeserializer(test_utils.BaseTestCase):
def setUp(self):
super(TestTasksDeserializer, self).setUp()
self.deserializer = glance.api.v2.tasks.RequestDeserializer()
def test_create_no_body(self):
request = unit_test_utils.get_fake_request()
self.assertRaises(webob.exc.HTTPBadRequest,
self.deserializer.create, request)
def test_create(self):
request = unit_test_utils.get_fake_request()
request.body = json.dumps({
'type': 'import',
'input': {'import_from':
'swift://cloud.foo/myaccount/mycontainer/path',
'import_from_format': 'qcow2',
'image_properties': {'name': 'fake1'}},
})
output = self.deserializer.create(request)
properties = {
'type': 'import',
'input': {'import_from':
'swift://cloud.foo/myaccount/mycontainer/path',
'import_from_format': 'qcow2',
'image_properties': {'name': 'fake1'}},
}
self.maxDiff = None
expected = {'task': properties}
self.assertEqual(expected, output)
def test_index(self):
marker = uuidutils.generate_uuid()
path = '/tasks?limit=1&marker=%s' % marker
request = unit_test_utils.get_fake_request(path)
expected = {'limit': 1,
'marker': marker,
'sort_key': 'created_at',
'sort_dir': 'desc',
'filters': {}}
output = self.deserializer.index(request)
self.assertEqual(output, expected)
def test_index_strip_params_from_filters(self):
type = 'import'
path = '/tasks?type=%s' % type
request = unit_test_utils.get_fake_request(path)
output = self.deserializer.index(request)
self.assertEqual(output['filters']['type'], type)
def test_index_with_many_filter(self):
status = 'success'
type = 'import'
path = '/tasks?status=%(status)s&type=%(type)s' % locals()
request = unit_test_utils.get_fake_request(path)
output = self.deserializer.index(request)
self.assertEqual(output['filters']['status'], status)
self.assertEqual(output['filters']['type'], type)
def test_index_with_filter_and_limit(self):
status = 'success'
path = '/tasks?status=%s&limit=1' % status
request = unit_test_utils.get_fake_request(path)
output = self.deserializer.index(request)
self.assertEqual(output['filters']['status'], status)
self.assertEqual(output['limit'], 1)
def test_index_non_integer_limit(self):
request = unit_test_utils.get_fake_request('/tasks?limit=blah')
self.assertRaises(webob.exc.HTTPBadRequest,
self.deserializer.index, request)
def test_index_zero_limit(self):
request = unit_test_utils.get_fake_request('/tasks?limit=0')
expected = {'limit': 0,
'sort_key': 'created_at',
'sort_dir': 'desc',
'filters': {}}
output = self.deserializer.index(request)
self.assertEqual(expected, output)
def test_index_negative_limit(self):
path = '/tasks?limit=-1'
request = unit_test_utils.get_fake_request(path)
self.assertRaises(webob.exc.HTTPBadRequest,
self.deserializer.index, request)
def test_index_fraction(self):
request = unit_test_utils.get_fake_request('/tasks?limit=1.1')
self.assertRaises(webob.exc.HTTPBadRequest,
self.deserializer.index, request)
def test_index_invalid_status(self):
path = '/tasks?status=blah'
request = unit_test_utils.get_fake_request(path)
self.assertRaises(webob.exc.HTTPBadRequest,
self.deserializer.index, request)
def test_index_marker(self):
marker = uuidutils.generate_uuid()
path = '/tasks?marker=%s' % marker
request = unit_test_utils.get_fake_request(path)
output = self.deserializer.index(request)
self.assertEqual(output.get('marker'), marker)
def test_index_marker_not_specified(self):
request = unit_test_utils.get_fake_request('/tasks')
output = self.deserializer.index(request)
self.assertFalse('marker' in output)
def test_index_limit_not_specified(self):
request = unit_test_utils.get_fake_request('/tasks')
output = self.deserializer.index(request)
self.assertFalse('limit' in output)
def test_index_sort_key_id(self):
request = unit_test_utils.get_fake_request('/tasks?sort_key=id')
output = self.deserializer.index(request)
expected = {
'sort_key': 'id',
'sort_dir': 'desc',
'filters': {}
}
self.assertEqual(output, expected)
def test_index_sort_dir_asc(self):
request = unit_test_utils.get_fake_request('/tasks?sort_dir=asc')
output = self.deserializer.index(request)
expected = {
'sort_key': 'created_at',
'sort_dir': 'asc',
'filters': {}}
self.assertEqual(output, expected)
def test_index_sort_dir_bad_value(self):
request = unit_test_utils.get_fake_request('/tasks?sort_dir=invalid')
self.assertRaises(webob.exc.HTTPBadRequest, self.deserializer.index,
request)
class TestTasksSerializer(test_utils.BaseTestCase):
def setUp(self):
super(TestTasksSerializer, self).setUp()
self.serializer = glance.api.v2.tasks.ResponseSerializer()
self.fixtures = [
_domain_fixture(UUID1, type='import', status='pending',
input={'loc': 'fake'}, result={}, owner=TENANT1,
message='', created_at=DATETIME,
updated_at=DATETIME, expires_at=DATETIME),
_domain_fixture(UUID2, type='import', status='processing',
input={'loc': 'foo'}, owner=TENANT2, message='',
created_at=DATETIME, updated_at=DATETIME,
result={}, expires_at=DATETIME),
]
def test_index(self):
expected = {
'tasks': [
{
'id': UUID1,
'type': 'import',
'status': 'pending',
'owner': TENANT1,
'created_at': ISOTIME,
'updated_at': ISOTIME,
'expires_at': ISOTIME,
'self': '/v2/tasks/%s' % UUID1,
'schema': '/v2/schemas/task',
},
{
'id': UUID2,
'type': 'import',
'status': 'processing',
'owner': TENANT2,
'expires_at': ISOTIME,
'created_at': ISOTIME,
'updated_at': ISOTIME,
'self': '/v2/tasks/%s' % UUID2,
'schema': '/v2/schemas/task',
},
],
'first': '/v2/tasks',
'schema': '/v2/schemas/tasks',
}
request = webob.Request.blank('/v2/tasks')
response = webob.Response(request=request)
result = {'tasks': self.fixtures}
self.serializer.index(response, result)
actual = json.loads(response.body)
self.assertEqual(expected, actual)
self.assertEqual('application/json', response.content_type)
def test_index_next_marker(self):
request = webob.Request.blank('/v2/tasks')
response = webob.Response(request=request)
result = {'tasks': self.fixtures, 'next_marker': UUID2}
self.serializer.index(response, result)
output = json.loads(response.body)
self.assertEqual('/v2/tasks?marker=%s' % UUID2, output['next'])
def test_index_carries_query_parameters(self):
url = '/v2/tasks?limit=10&sort_key=id&sort_dir=asc'
request = webob.Request.blank(url)
response = webob.Response(request=request)
result = {'tasks': self.fixtures, 'next_marker': UUID2}
self.serializer.index(response, result)
output = json.loads(response.body)
self.assertEqual('/v2/tasks?sort_key=id&sort_dir=asc&limit=10',
output['first'])
expect_next = '/v2/tasks?sort_key=id&sort_dir=asc&limit=10&marker=%s'
self.assertEqual(expect_next % UUID2, output['next'])
def test_get(self):
expected = {
'id': UUID1,
'type': 'import',
'status': 'pending',
'input': {'loc': 'fake'},
'result': {},
'owner': TENANT1,
'message': '',
'created_at': ISOTIME,
'updated_at': ISOTIME,
'expires_at': ISOTIME,
'self': '/v2/tasks/%s' % UUID1,
'schema': '/v2/schemas/task',
}
response = webob.Response()
self.serializer.get(response, self.fixtures[0])
actual = json.loads(response.body)
self.assertEqual(expected, actual)
self.assertEqual('application/json', response.content_type)
def test_create(self):
response = webob.Response()
self.serializer.create(response, self.fixtures[0])
self.assertEqual(response.status_int, 201)
self.assertEqual(self.fixtures[0].task_id,
json.loads(response.body)['id'])
self.assertEqual('application/json', response.content_type)