Introduces eventlet executor for Glance Tasks
A sample import script to successfully import image from http or https location is introduced. This should work on a devstack installtion. Also, the following changes are introduced:- 1. An interface for implementing any other type of executor. 2. Provides namespace for keeping Tasks scripts. 3. A config for choosing deployer specific executor. 4. An interface for writing Tasks scripts. 5. A module for common script related methods. 6. Logic for limiting number of simultaneous tasks execution on the Glance API server. partially implements bp async-glance-workers DocImpact Change-Id: I382472fffd0fdad43573e72b2e78a9a6ed1e1f1a
This commit is contained in:
parent
2e7de07c5a
commit
186991bb9d
@ -1257,10 +1257,6 @@ Configuring Glance Tasks
|
||||
|
||||
Glance Tasks are implemented only for version 2 of the OpenStack Images API.
|
||||
|
||||
``Please be aware that Glance tasks are currently a work in progress
|
||||
feature.`` Although, the API is available, the execution part of it
|
||||
is being worked on.
|
||||
|
||||
The config value ``task_time_to_live`` is used to determine how long a task
|
||||
would be visible to the user after transitioning to either the ``success`` or
|
||||
the ``failure`` state.
|
||||
@ -1269,6 +1265,22 @@ the ``failure`` state.
|
||||
|
||||
Optional. Default: ``48``
|
||||
|
||||
The config value ``task_executor`` is used to determine which executor
|
||||
should be used by the Glance service to process the task.
|
||||
|
||||
* ``task_executor=<executor_type>``
|
||||
|
||||
Optional. Default: ``eventlet``
|
||||
|
||||
The config value ``eventlet_executor_pool_size`` is used to configure the
|
||||
eventlet task executor. It sets the maximum on the number of threads which can
|
||||
be spun up at any given point of time, that are used for the execution of
|
||||
Glance Tasks.
|
||||
|
||||
* ``eventlet_executor_pool_size=<Size_of_pool_in_int>``
|
||||
|
||||
Optional. Default: ``1000``
|
||||
|
||||
Configuring Glance performance profiling
|
||||
----------------------------------------
|
||||
|
||||
|
@ -118,15 +118,25 @@ backlog = 4096
|
||||
# The default value for property_protection_rule_format is 'roles'.
|
||||
#property_protection_rule_format = roles
|
||||
|
||||
# This value sets what strategy will be used to determine the image location
|
||||
# order. Currently two strategies are packaged with Glance 'location_order'
|
||||
# and 'store_type'.
|
||||
#location_strategy = location_order
|
||||
|
||||
# ================= Glance Tasks Options ============================
|
||||
|
||||
# Specifies how long (in hours) a task is supposed to live in the tasks DB
|
||||
# after succeeding or failing before getting soft-deleted.
|
||||
# The default value for task_time_to_live is 48 hours.
|
||||
# task_time_to_live = 48
|
||||
|
||||
# This value sets what strategy will be used to determine the image location
|
||||
# order. Currently two strategies are packaged with Glance 'location_order'
|
||||
# and 'store_type'.
|
||||
#location_strategy = location_order
|
||||
# Specifies which task executor to be used to run the task scripts.
|
||||
# The default value for task_executor is eventlet.
|
||||
# task_executor = eventlet
|
||||
|
||||
# Specifies the maximum number of eventlet threads which can be spun up by
|
||||
# the eventlet based task executor to perform execution of Glance tasks.
|
||||
# eventlet_executor_pool_size = 1000
|
||||
|
||||
# ================= Syslog Options ============================
|
||||
|
||||
|
@ -416,8 +416,8 @@ class RequestDeserializer(wsgi.JSONRequestDeserializer):
|
||||
partial_image = None
|
||||
if len(change['path']) == 1:
|
||||
partial_image = {path_root: change['value']}
|
||||
elif ((path_root in _get_base_properties().keys()) and
|
||||
(_get_base_properties()[path_root].get('type', '') == 'array')):
|
||||
elif ((path_root in get_base_properties().keys()) and
|
||||
(get_base_properties()[path_root].get('type', '') == 'array')):
|
||||
# NOTE(zhiyan): cient can use PATCH API to adding element to
|
||||
# the image's existing set property directly.
|
||||
# Such as: 1. using '/locations/N' path to adding a location
|
||||
@ -667,7 +667,7 @@ class ResponseSerializer(wsgi.JSONResponseSerializer):
|
||||
response.status_int = 204
|
||||
|
||||
|
||||
def _get_base_properties():
|
||||
def get_base_properties():
|
||||
return {
|
||||
'id': {
|
||||
'type': 'string',
|
||||
@ -801,7 +801,7 @@ def _get_base_links():
|
||||
|
||||
|
||||
def get_schema(custom_properties=None):
|
||||
properties = _get_base_properties()
|
||||
properties = get_base_properties()
|
||||
links = _get_base_links()
|
||||
if CONF.allow_additional_image_properties:
|
||||
schema = glance.schema.PermissiveSchema('image', properties, links)
|
||||
|
@ -56,6 +56,7 @@ class TasksController(object):
|
||||
|
||||
def create(self, req, task):
|
||||
task_factory = self.gateway.get_task_factory(req.context)
|
||||
executor_factory = self.gateway.get_task_executor_factory(req.context)
|
||||
task_repo = self.gateway.get_task_repo(req.context)
|
||||
live_time = CONF.task.task_time_to_live
|
||||
try:
|
||||
@ -64,6 +65,8 @@ class TasksController(object):
|
||||
task_time_to_live=live_time,
|
||||
task_input=task['input'])
|
||||
task_repo.add(new_task)
|
||||
task_executor = executor_factory.new_task_executor(req.context)
|
||||
new_task.run(task_executor)
|
||||
except exception.Forbidden as e:
|
||||
msg = (_LI("Forbidden to create task. Reason: %(reason)s")
|
||||
% {'reason': utils.exception_to_str(e)})
|
||||
|
72
glance/async/__init__.py
Normal file
72
glance/async/__init__.py
Normal file
@ -0,0 +1,72 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from glance import i18n
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_LE = i18n._LE
|
||||
|
||||
|
||||
class TaskExecutor(object):
|
||||
"""Base class for Asynchronous task executors. It does not support the
|
||||
execution mechanism.
|
||||
|
||||
Provisions the extensible classes with necessary variables to utilize
|
||||
important Glance modules like, context, task_repo, image_repo,
|
||||
image_factory.
|
||||
|
||||
Note:
|
||||
It also gives abstraction for the standard pre-processing and
|
||||
post-processing operations to be executed by a task. These may include
|
||||
validation checks, security checks, introspection, error handling etc.
|
||||
The aim is to give developers an abstract sense of the execution
|
||||
pipeline logic.
|
||||
|
||||
Args:
|
||||
context: glance.context.RequestContext object for AuthZ and AuthN
|
||||
checks
|
||||
task_repo: glance.db.TaskRepo object which acts as a translator for
|
||||
glance.domain.Task and glance.domain.TaskStub objects
|
||||
into ORM semantics
|
||||
image_repo: glance.db.ImageRepo object which acts as a translator for
|
||||
glance.domain.Image object into ORM semantics
|
||||
image_factory: glance.domain.ImageFactory object to be used for
|
||||
creating new images for certain types of tasks viz. import, cloning
|
||||
"""
|
||||
|
||||
def __init__(self, context, task_repo, image_repo, image_factory):
|
||||
self.context = context
|
||||
self.task_repo = task_repo
|
||||
self.image_repo = image_repo
|
||||
self.image_factory = image_factory
|
||||
|
||||
def begin_processing(self, task_id):
|
||||
task = self.task_repo.get(task_id)
|
||||
task.begin_processing()
|
||||
self.task_repo.save(task)
|
||||
|
||||
# start running
|
||||
self._run(task_id, task.type)
|
||||
|
||||
def _run(self, task_id, task_type):
|
||||
task = self.task_repo.get(task_id)
|
||||
msg = _LE("This execution of Tasks is not setup. Please consult the "
|
||||
"project documentation for more information on the "
|
||||
"executors available.")
|
||||
LOG.error(msg)
|
||||
task.fail(_LE("Internal error occurred while trying to process task."))
|
||||
self.task_repo.save(task)
|
61
glance/async/eventlet_executor.py
Normal file
61
glance/async/eventlet_executor.py
Normal file
@ -0,0 +1,61 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
from oslo.config import cfg
|
||||
|
||||
import glance.async
|
||||
import glance.common.scripts as scripts
|
||||
from glance import i18n
|
||||
from glance.openstack.common import lockutils
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
_LI = i18n._LI
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('eventlet_executor_pool_size', 'glance.common.config',
|
||||
group='task')
|
||||
|
||||
_MAX_EXECUTOR_THREADS = CONF.task.eventlet_executor_pool_size
|
||||
_THREAD_POOL = None
|
||||
|
||||
|
||||
class TaskExecutor(glance.async.TaskExecutor):
|
||||
def __init__(self, context, task_repo, image_repo, image_factory):
|
||||
super(TaskExecutor, self).__init__(context, task_repo, image_repo,
|
||||
image_factory)
|
||||
if _THREAD_POOL is None:
|
||||
self._set_gobal_threadpool_if_none()
|
||||
|
||||
@lockutils.synchronized("tasks_eventlet_pool")
|
||||
def _set_gobal_threadpool_if_none(self):
|
||||
global _THREAD_POOL
|
||||
if _THREAD_POOL is None:
|
||||
_THREAD_POOL = eventlet.GreenPool(size=_MAX_EXECUTOR_THREADS)
|
||||
|
||||
def _run(self, task_id, task_type):
|
||||
LOG.info(_LI('Eventlet executor picked up the execution of task ID '
|
||||
'%(task_id)s of task type '
|
||||
'%(task_type)s') % {'task_id': task_id,
|
||||
'task_type': task_type})
|
||||
|
||||
_THREAD_POOL.spawn_n(scripts.run_task,
|
||||
task_id,
|
||||
task_type,
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
@ -29,7 +29,7 @@ from glance.common import utils
|
||||
|
||||
# Monkey patch socket, time, select, threads
|
||||
eventlet.patcher.monkey_patch(all=False, socket=True, time=True,
|
||||
select=True, thread=True)
|
||||
select=True, thread=True, os=True)
|
||||
|
||||
# If ../glance/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
|
@ -61,6 +61,15 @@ task_opts = [
|
||||
"succeeding or failing"),
|
||||
deprecated_opts=[cfg.DeprecatedOpt('task_time_to_live',
|
||||
group='DEFAULT')]),
|
||||
cfg.StrOpt('task_executor',
|
||||
default='eventlet',
|
||||
help=_("Specifies which task executor to be used to run the "
|
||||
"task scripts.")),
|
||||
cfg.IntOpt('eventlet_executor_pool_size',
|
||||
default=1000,
|
||||
help=_("Specifies the maximum number of eventlet threads which "
|
||||
"can be spun up by the eventlet based task executor to "
|
||||
"perform execution of Glance tasks.")),
|
||||
]
|
||||
common_opts = [
|
||||
cfg.BoolOpt('allow_additional_image_properties', default=True,
|
||||
|
52
glance/common/scripts/__init__.py
Normal file
52
glance/common/scripts/__init__.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
from glance import i18n
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
_LI = i18n._LI
|
||||
_LE = i18n._LE
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_task(task_id, task_type, context,
|
||||
task_repo=None, image_repo=None, image_factory=None):
|
||||
#TODO(nikhil): if task_repo is None get new task repo
|
||||
#TODO(nikhil): if image_repo is None get new image repo
|
||||
#TODO(nikhil): if image_factory is None get new image factory
|
||||
LOG.info(_LI("Loading known task scripts for task_id %(task_id)s "
|
||||
"of type %(task_type)s"), {'task_id': task_id,
|
||||
'task_type': task_type})
|
||||
if task_type == 'import':
|
||||
image_import.run(task_id, context, task_repo,
|
||||
image_repo, image_factory)
|
||||
|
||||
else:
|
||||
msg = _LE("This task type %(task_type)s is not supported by the "
|
||||
"current deployment of Glance. Please refer the "
|
||||
"documentation provided by OpenStack or your operator "
|
||||
"for more information.") % {'task_type': task_type}
|
||||
LOG.error(msg)
|
||||
task = task_repo.get(task_id)
|
||||
task.fail(msg)
|
||||
if task_repo:
|
||||
task_repo.save(task)
|
||||
else:
|
||||
LOG.error(_LE("Failed to save task %(task_id)s in DB as task_repo "
|
||||
"is %(task_repo)s"), {"task_id": task_id,
|
||||
"task_repo": task_repo})
|
0
glance/common/scripts/image_import/__init__.py
Normal file
0
glance/common/scripts/image_import/__init__.py
Normal file
155
glance/common/scripts/image_import/main.py
Normal file
155
glance/common/scripts/image_import/main.py
Normal file
@ -0,0 +1,155 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
__all__ = [
|
||||
'run',
|
||||
]
|
||||
|
||||
import six
|
||||
|
||||
from glance.api.v2 import images as v2_api
|
||||
from glance.common import exception
|
||||
from glance.common.scripts import utils as script_utils
|
||||
from glance.common import utils as common_utils
|
||||
from glance import i18n
|
||||
from glance.openstack.common import excutils
|
||||
from glance.openstack.common import lockutils
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
_LE = i18n._LE
|
||||
_LI = i18n._LI
|
||||
_LW = i18n._LW
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run(t_id, context, task_repo, image_repo, image_factory):
|
||||
LOG.info(_LI('Task %(task_id)s beginning import '
|
||||
'execution.') % {'task_id': t_id})
|
||||
_execute(t_id, task_repo, image_repo, image_factory)
|
||||
|
||||
|
||||
# NOTE(nikhil): This lock prevents more than N number of threads to be spawn
|
||||
# simultaneously. The number N represents the number of threads in the
|
||||
# executor pool. The value is set to 10 in the eventlet executor.
|
||||
@lockutils.synchronized("glance_import")
|
||||
def _execute(t_id, task_repo, image_repo, image_factory):
|
||||
task = script_utils.get_task(task_repo, t_id)
|
||||
|
||||
if task is None:
|
||||
# NOTE: This happens if task is not found in the database. In
|
||||
# such cases, there is no way to update the task status so,
|
||||
# it's ignored here.
|
||||
return
|
||||
|
||||
try:
|
||||
task_input = script_utils.unpack_task_input(task)
|
||||
|
||||
uri = script_utils.validate_location_uri(task_input.get('import_from'))
|
||||
image_id = import_image(image_repo, image_factory, task_input, t_id,
|
||||
uri)
|
||||
|
||||
task.succeed({'image_id': image_id})
|
||||
except Exception as e:
|
||||
# Note: The message string contains Error in it to indicate
|
||||
# in the task.message that it's a error message for the user.
|
||||
|
||||
#TODO(nikhil): need to bring back save_and_reraise_exception when
|
||||
# necessary
|
||||
err_msg = ("Error: " + six.text_type(type(e)) + ': ' +
|
||||
common_utils.exception_to_str(e))
|
||||
log_msg = _LE(err_msg + ("Task ID %s" % task.task_id))
|
||||
LOG.exception(log_msg)
|
||||
|
||||
task.fail(_LE(err_msg))
|
||||
finally:
|
||||
task_repo.save(task)
|
||||
|
||||
|
||||
def import_image(image_repo, image_factory, task_input, task_id, uri):
|
||||
original_image = create_image(image_repo, image_factory,
|
||||
task_input.get('image_properties'), task_id)
|
||||
# NOTE: set image status to saving just before setting data
|
||||
original_image.status = 'saving'
|
||||
image_repo.save(original_image)
|
||||
set_image_data(original_image, uri, None)
|
||||
|
||||
# NOTE: Check if the Image is not deleted after setting the data
|
||||
# before setting it's status to active. We need to set the status
|
||||
# explicitly here using the Image object returned from image_repo .The
|
||||
# Image object returned from create_image method does not have appropriate
|
||||
# factories wrapped around it.
|
||||
image_id = original_image.image_id
|
||||
new_image = image_repo.get(image_id)
|
||||
if new_image.status in ['saving']:
|
||||
new_image.status = 'active'
|
||||
new_image.size = original_image.size
|
||||
new_image.virtual_size = original_image.virtual_size
|
||||
new_image.checksum = original_image.checksum
|
||||
else:
|
||||
msg = _LE("The Image %(image_id)s object being created by this task "
|
||||
"%(task_id)s, is no longer in valid status for further "
|
||||
"processing." % {"image_id": new_image.image_id,
|
||||
"task_id": task_id})
|
||||
raise exception.Conflict(msg)
|
||||
image_repo.save(new_image)
|
||||
|
||||
return image_id
|
||||
|
||||
|
||||
def create_image(image_repo, image_factory, image_properties, task_id):
|
||||
_base_properties = []
|
||||
for k, v in v2_api.get_base_properties().items():
|
||||
_base_properties.append(k)
|
||||
|
||||
properties = {}
|
||||
# NOTE: get the base properties
|
||||
for key in _base_properties:
|
||||
try:
|
||||
properties[key] = image_properties.pop(key)
|
||||
except KeyError:
|
||||
msg = _("Task ID %(task_id)s: Ignoring property %(k)s for setting "
|
||||
"base properties while creating "
|
||||
"Image.") % {'task_id': task_id, 'k': key}
|
||||
LOG.debug(msg)
|
||||
|
||||
# NOTE: get the rest of the properties and pass them as
|
||||
# extra_properties for Image to be created with them.
|
||||
properties['extra_properties'] = image_properties
|
||||
script_utils.set_base_image_properties(properties=properties)
|
||||
|
||||
image = image_factory.new_image(**properties)
|
||||
image_repo.add(image)
|
||||
return image
|
||||
|
||||
|
||||
def set_image_data(image, uri, task_id):
|
||||
data_iter = None
|
||||
try:
|
||||
LOG.info(_LI("Task %(task_id)s: Got image data uri %(data_uri)s to be "
|
||||
"imported") % {"data_uri": uri, "task_id": task_id})
|
||||
data_iter = script_utils.get_image_data_iter(uri)
|
||||
image.set_data(data_iter)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.warn(_LW("Task %(task_id)s failed with exception %(error)s") %
|
||||
{"error": common_utils.exception_to_str(e),
|
||||
"task_id": task_id})
|
||||
LOG.info(_LI("Task %(task_id)s: Could not import image file"
|
||||
" %(image_data)s") % {"image_data": uri,
|
||||
"task_id": task_id})
|
||||
finally:
|
||||
if isinstance(data_iter, file):
|
||||
data_iter.close()
|
120
glance/common/scripts/utils.py
Normal file
120
glance/common/scripts/utils.py
Normal file
@ -0,0 +1,120 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
__all__ = [
|
||||
'get_task',
|
||||
'unpack_task_input',
|
||||
'set_base_image_properties',
|
||||
'validate_location_uri',
|
||||
'get_image_data_iter',
|
||||
]
|
||||
|
||||
|
||||
import urllib2
|
||||
|
||||
from glance.common import exception
|
||||
from glance import i18n
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
_LE = i18n._LE
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_task(task_repo, task_id):
|
||||
"""Gets a TaskProxy object.
|
||||
|
||||
:param task_repo: TaskRepo object used to perform DB operations
|
||||
:param task_id: ID of the Task
|
||||
"""
|
||||
task = None
|
||||
try:
|
||||
task = task_repo.get(task_id)
|
||||
except exception.NotFound:
|
||||
msg = _LE('Task not found for task_id %s') % task_id
|
||||
LOG.exception(msg)
|
||||
|
||||
return task
|
||||
|
||||
|
||||
def unpack_task_input(task):
|
||||
"""Verifies and returns valid task input dictionary.
|
||||
|
||||
:param task: Task domain object
|
||||
"""
|
||||
task_input = task.task_input
|
||||
|
||||
# NOTE: until we support multiple task types, we just check for
|
||||
# input fields related to 'import task'.
|
||||
for key in ["import_from", "import_from_format", "image_properties"]:
|
||||
if key not in task_input:
|
||||
msg = _("Input does not contain '%(key)s' field") % {"key": key}
|
||||
raise exception.Invalid(msg)
|
||||
|
||||
return task_input
|
||||
|
||||
|
||||
def set_base_image_properties(properties=None):
|
||||
"""Sets optional base properties for creating Image.
|
||||
|
||||
:param properties: Input dict to set some base properties
|
||||
"""
|
||||
if properties is not None:
|
||||
# TODO(nikhil): We can make these properties configurable while
|
||||
# implementing the pipeline logic for the scripts. The below shown
|
||||
# are placeholders to show that the scripts work on 'devstack'
|
||||
# environment.
|
||||
properties['disk_format'] = 'qcow2'
|
||||
properties['container_format'] = 'ovf'
|
||||
|
||||
|
||||
def validate_location_uri(location):
|
||||
"""Validate location uri into acceptable format.
|
||||
|
||||
:param location: Location uri to be validated
|
||||
"""
|
||||
if not location:
|
||||
raise exception.BadStoreUri(_('Invalid location: %s') % location)
|
||||
|
||||
elif location.startswith('http://') or location.startswith("https://"):
|
||||
return location
|
||||
|
||||
# NOTE: file type uri is being avoided for security reasons,
|
||||
# see LP bug #942118.
|
||||
elif location.startswith("file:///"):
|
||||
msg = ("File based imports are not allowed. Please use a non-local "
|
||||
"source of image data.")
|
||||
# NOTE: raise Exception and let the encompassing block save
|
||||
# the error msg in the task.message.
|
||||
raise StandardError(msg)
|
||||
|
||||
else:
|
||||
#TODO(nikhil): add other supported uris
|
||||
supported = ['http', ]
|
||||
msg = ("The given uri is not valid. Please specify a "
|
||||
"valid uri from the following list of supported uri "
|
||||
"%(supported)s" % {'supported': supported})
|
||||
raise urllib2.URLError(msg)
|
||||
|
||||
|
||||
def get_image_data_iter(uri):
|
||||
"""
|
||||
The scripts are expected to support only over non-local locations of data.
|
||||
Note the absence of file:// for security reasons, see LP bug #942118.
|
||||
If the above constraint is violated, task should fail.
|
||||
"""
|
||||
# NOTE: Current script supports http location. Other locations
|
||||
# types are to be supported as the script evolve.
|
||||
return urllib2.urlopen(uri)
|
@ -238,6 +238,15 @@ def user_get_storage_usage(client, owner_id, image_id=None, session=None):
|
||||
return client.user_get_storage_usage(owner_id=owner_id, image_id=image_id)
|
||||
|
||||
|
||||
@_get_client
|
||||
def task_get(client, task_id, session=None, force_show_deleted=False):
|
||||
"""Get a single task object
|
||||
:return: task dictionary
|
||||
"""
|
||||
return client.task_get(task_id=task_id, session=session,
|
||||
force_show_deleted=force_show_deleted)
|
||||
|
||||
|
||||
@_get_client
|
||||
def task_get_all(client, filters=None, marker=None, limit=None,
|
||||
sort_key='created_at', sort_dir='desc', admin_as_user=False):
|
||||
@ -261,7 +270,18 @@ def task_get_all(client, filters=None, marker=None, limit=None,
|
||||
@_get_client
|
||||
def task_create(client, values, session=None):
|
||||
"""Create a task object"""
|
||||
return client.task_create(values=values)
|
||||
return client.task_create(values=values, session=session)
|
||||
|
||||
|
||||
@_get_client
|
||||
def task_delete(client, task_id, session=None):
|
||||
"""Delete a task object"""
|
||||
return client.task_delete(task_id=task_id, session=session)
|
||||
|
||||
|
||||
@_get_client
|
||||
def task_update(client, task_id, values, session=None):
|
||||
return client.task_update(task_id=task_id, values=values, session=session)
|
||||
|
||||
|
||||
# Metadef
|
||||
|
@ -22,12 +22,16 @@ from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from glance.common import exception
|
||||
from glance import i18n
|
||||
from glance.openstack.common import excutils
|
||||
from glance.openstack.common import importutils
|
||||
import glance.openstack.common.log as logging
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
_LE = i18n._LE
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('task_executor', 'glance.common.config', group='task')
|
||||
|
||||
|
||||
_delayed_delete_imported = False
|
||||
@ -371,6 +375,7 @@ class Task(object):
|
||||
{'task_id': self.task_id, 'cur_status': self.status,
|
||||
'new_status': new_status})
|
||||
LOG.info(log_msg)
|
||||
self._status = new_status
|
||||
else:
|
||||
log_msg = (_("Task [%(task_id)s] status failed to change from "
|
||||
"%(cur_status)s to %(new_status)s") %
|
||||
@ -399,7 +404,7 @@ class Task(object):
|
||||
self.expires_at = timeutils.utcnow() + self._time_to_live
|
||||
|
||||
def run(self, executor):
|
||||
pass
|
||||
executor.begin_processing(self.task_id)
|
||||
|
||||
|
||||
class TaskStub(object):
|
||||
@ -445,6 +450,29 @@ class TaskFactory(object):
|
||||
)
|
||||
|
||||
|
||||
class TaskExecutorFactory(object):
|
||||
|
||||
def __init__(self, task_repo, image_repo, image_factory):
|
||||
self.task_repo = task_repo
|
||||
self.image_repo = image_repo
|
||||
self.image_factory = image_factory
|
||||
|
||||
def new_task_executor(self, context):
|
||||
try:
|
||||
executor_cls = ('glance.async.%s_executor.'
|
||||
'TaskExecutor' % CONF.task.task_executor)
|
||||
LOG.debug("Loading %s executor" % CONF.task.task_executor)
|
||||
executor = importutils.import_class(executor_cls)
|
||||
return executor(context,
|
||||
self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
except ImportError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE("Failed to load the %s executor provided "
|
||||
"in the config.") % CONF.task.task_executor)
|
||||
|
||||
|
||||
class MetadefNamespace(object):
|
||||
|
||||
def __init__(self, namespace_id, namespace, display_name, description,
|
||||
|
@ -121,6 +121,14 @@ class Gateway(object):
|
||||
notifier_task_stub_repo, context)
|
||||
return authorized_task_stub_repo
|
||||
|
||||
def get_task_executor_factory(self, context):
|
||||
task_repo = self.get_task_repo(context)
|
||||
image_repo = self.get_repo(context)
|
||||
image_factory = self.get_image_factory(context)
|
||||
return glance.domain.TaskExecutorFactory(task_repo,
|
||||
image_repo,
|
||||
image_factory)
|
||||
|
||||
def get_metadef_namespace_factory(self, context):
|
||||
ns_factory = glance.domain.MetadefNamespaceFactory()
|
||||
policy_ns_factory = policy.MetadefNamespaceFactoryProxy(
|
||||
|
@ -14,10 +14,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import requests
|
||||
|
||||
from glance.openstack.common import jsonutils
|
||||
@ -35,7 +33,6 @@ class TestTasks(functional.FunctionalTest):
|
||||
def setUp(self):
|
||||
super(TestTasks, self).setUp()
|
||||
self.cleanup()
|
||||
self.file_path = self._stash_file()
|
||||
self.api_server.deployment_flavor = 'noauth'
|
||||
|
||||
def _url(self, path):
|
||||
@ -52,16 +49,6 @@ class TestTasks(functional.FunctionalTest):
|
||||
base_headers.update(custom_headers or {})
|
||||
return base_headers
|
||||
|
||||
def _stash_file(self):
|
||||
self.tmp_dir = self.useFixture(fixtures.TempDir()).path
|
||||
self.store_dir = os.path.join(self.tmp_dir, 'images')
|
||||
os.mkdir(self.store_dir)
|
||||
|
||||
file_path = os.path.join(self.store_dir, 'foo')
|
||||
with open(file_path, 'w') as f:
|
||||
f.write('blah')
|
||||
return 'file://%s' % file_path
|
||||
|
||||
def test_task_lifecycle(self):
|
||||
self.start_servers(**self.__dict__.copy())
|
||||
# Task list should be empty
|
||||
@ -78,7 +65,7 @@ class TestTasks(functional.FunctionalTest):
|
||||
data = jsonutils.dumps({
|
||||
"type": "import",
|
||||
"input": {
|
||||
"import_from": self.file_path,
|
||||
"import_from": "http://example.com",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {
|
||||
'disk_format': 'vhd',
|
||||
@ -111,7 +98,7 @@ class TestTasks(functional.FunctionalTest):
|
||||
'status': 'pending',
|
||||
'type': 'import',
|
||||
'input': {
|
||||
"import_from": self.file_path,
|
||||
"import_from": "http://example.com",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {
|
||||
'disk_format': 'vhd',
|
||||
@ -125,15 +112,15 @@ class TestTasks(functional.FunctionalTest):
|
||||
# Tasks list should now have one entry
|
||||
path = self._url('/v2/tasks')
|
||||
response = requests.get(path, headers=self._headers())
|
||||
self.assertEqual(200, response.status_code)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
tasks = jsonutils.loads(response.text)['tasks']
|
||||
self.assertEqual(1, len(tasks))
|
||||
self.assertEqual(len(tasks), 1)
|
||||
self.assertEqual(tasks[0]['id'], task_id)
|
||||
|
||||
# Attempt to delete a task
|
||||
path = self._url('/v2/tasks/%s' % tasks[0]['id'])
|
||||
response = requests.delete(path, headers=self._headers())
|
||||
self.assertEqual(405, response.status_code)
|
||||
self.assertEqual(response.status_code, 405)
|
||||
self.assertIsNotNone(response.headers.get('Allow'))
|
||||
self.assertEqual('GET', response.headers.get('Allow'))
|
||||
|
||||
|
@ -13,8 +13,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
|
||||
from glance.api.v2 import tasks
|
||||
import glance.openstack.common.jsonutils as json
|
||||
from glance.openstack.common import timeutils
|
||||
from glance.tests.integration.v2 import base
|
||||
|
||||
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
|
||||
@ -35,7 +38,7 @@ def _new_task_fixture(**kwargs):
|
||||
task_data = {
|
||||
"type": "import",
|
||||
"input": {
|
||||
"import_from": "/some/file/path",
|
||||
"import_from": "http://example.com",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {
|
||||
'disk_format': 'vhd',
|
||||
@ -54,10 +57,42 @@ class TestTasksApi(base.ApiTest):
|
||||
self.api_flavor = 'fakeauth'
|
||||
self.registry_flavor = 'fakeauth'
|
||||
|
||||
def _wait_on_task_execution(self):
|
||||
"""Wait until all the tasks have finished execution and are in
|
||||
state of success or failure.
|
||||
"""
|
||||
|
||||
start = timeutils.utcnow()
|
||||
|
||||
# wait for maximum of 5 seconds
|
||||
while timeutils.delta_seconds(start, timeutils.utcnow()) < 5:
|
||||
wait = False
|
||||
# Verify that no task is in status of pending or processing
|
||||
path = "/v2/tasks"
|
||||
res, content = self.http.request(path, 'GET',
|
||||
headers=minimal_task_headers())
|
||||
content_dict = json.loads(content)
|
||||
|
||||
self.assertEqual(res.status, 200)
|
||||
res_tasks = content_dict['tasks']
|
||||
if len(res_tasks) != 0:
|
||||
for task in res_tasks:
|
||||
if task['status'] in ('pending', 'processing'):
|
||||
wait = True
|
||||
break
|
||||
|
||||
if wait:
|
||||
time.sleep(0.05)
|
||||
continue
|
||||
else:
|
||||
break
|
||||
|
||||
def _post_new_task(self, **kwargs):
|
||||
task_owner = kwargs['owner']
|
||||
task_owner = kwargs.get('owner')
|
||||
headers = minimal_task_headers(task_owner)
|
||||
|
||||
task_data = _new_task_fixture()
|
||||
task_data['input']['import_from'] = "http://example.com"
|
||||
body_content = json.dumps(task_data)
|
||||
|
||||
path = "/v2/tasks"
|
||||
@ -68,7 +103,14 @@ class TestTasksApi(base.ApiTest):
|
||||
self.assertEqual(response.status, 201)
|
||||
|
||||
task = json.loads(content)
|
||||
return task
|
||||
task_id = task['id']
|
||||
|
||||
self.assertIsNotNone(task_id)
|
||||
self.assertEqual(task_owner, task['owner'])
|
||||
self.assertEqual(task_data['type'], task['type'])
|
||||
self.assertEqual(task_data['input'], task['input'])
|
||||
|
||||
return task, task_data
|
||||
|
||||
def test_all_task_api(self):
|
||||
# 0. GET /tasks
|
||||
@ -92,32 +134,18 @@ class TestTasksApi(base.ApiTest):
|
||||
|
||||
# 2. POST /tasks
|
||||
# Create a new task
|
||||
task_data = _new_task_fixture()
|
||||
task_owner = 'tenant1'
|
||||
body_content = json.dumps(task_data)
|
||||
|
||||
path = "/v2/tasks"
|
||||
response, content = self.http.request(
|
||||
path, 'POST', headers=minimal_task_headers(task_owner),
|
||||
body=body_content)
|
||||
self.assertEqual(response.status, 201)
|
||||
|
||||
data = json.loads(content)
|
||||
task_id = data['id']
|
||||
|
||||
self.assertIsNotNone(task_id)
|
||||
self.assertEqual(task_owner, data['owner'])
|
||||
self.assertEqual(task_data['type'], data['type'])
|
||||
self.assertEqual(task_data['input'], data['input'])
|
||||
data, req_input = self._post_new_task(owner=task_owner)
|
||||
|
||||
# 3. GET /tasks/{task_id}
|
||||
# Get an existing task
|
||||
task_id = data['id']
|
||||
path = "/v2/tasks/%s" % task_id
|
||||
response, content = self.http.request(path, 'GET',
|
||||
headers=minimal_task_headers())
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
# 4. GET /tasks/{task_id}
|
||||
# 4. GET /tasks
|
||||
# Get all tasks (not deleted)
|
||||
path = "/v2/tasks"
|
||||
response, content = self.http.request(path, 'GET',
|
||||
@ -134,12 +162,16 @@ class TestTasksApi(base.ApiTest):
|
||||
'created_at', 'updated_at', 'self', 'schema'])
|
||||
task = data['tasks'][0]
|
||||
self.assertEqual(expected_keys, set(task.keys()))
|
||||
self.assertEqual(task_data['type'], task['type'])
|
||||
self.assertEqual(req_input['type'], task['type'])
|
||||
self.assertEqual(task_owner, task['owner'])
|
||||
self.assertEqual('pending', task['status'])
|
||||
self.assertEqual('processing', task['status'])
|
||||
self.assertIsNotNone(task['created_at'])
|
||||
self.assertIsNotNone(task['updated_at'])
|
||||
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
||||
def test_task_schema_api(self):
|
||||
# 0. GET /schemas/task
|
||||
# Verify schema for task
|
||||
@ -167,6 +199,10 @@ class TestTasksApi(base.ApiTest):
|
||||
self.assertIsNotNone(data)
|
||||
self.assertEqual(expected_schema, data)
|
||||
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
||||
def test_create_new_task(self):
|
||||
# 0. POST /tasks
|
||||
# Create a new task with valid input and type
|
||||
@ -214,6 +250,10 @@ class TestTasksApi(base.ApiTest):
|
||||
body=body_content)
|
||||
self.assertEqual(response.status, 400)
|
||||
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
||||
def test_tasks_with_filter(self):
|
||||
|
||||
# 0. GET /v2/tasks
|
||||
@ -229,30 +269,13 @@ class TestTasksApi(base.ApiTest):
|
||||
|
||||
task_ids = []
|
||||
|
||||
# 1. POST /tasks with two tasks with status 'pending' and 'processing'
|
||||
# with various attributes
|
||||
# 1. Make 2 POST requests on /tasks with various attributes
|
||||
task_owner = TENANT1
|
||||
headers = minimal_task_headers(task_owner)
|
||||
task_data = _new_task_fixture()
|
||||
body_content = json.dumps(task_data)
|
||||
path = "/v2/tasks"
|
||||
response, content = self.http.request(path, 'POST',
|
||||
headers=headers,
|
||||
body=body_content)
|
||||
self.assertEqual(response.status, 201)
|
||||
data = json.loads(content)
|
||||
data, req_input1 = self._post_new_task(owner=task_owner)
|
||||
task_ids.append(data['id'])
|
||||
|
||||
task_owner = TENANT2
|
||||
headers = minimal_task_headers(task_owner)
|
||||
task_data = _new_task_fixture()
|
||||
body_content = json.dumps(task_data)
|
||||
path = "/v2/tasks"
|
||||
response, content = self.http.request(path, 'POST',
|
||||
headers=headers,
|
||||
body=body_content)
|
||||
self.assertEqual(response.status, 201)
|
||||
data = json.loads(content)
|
||||
data, req_input2 = self._post_new_task(owner=task_owner)
|
||||
task_ids.append(data['id'])
|
||||
|
||||
# 2. GET /tasks
|
||||
@ -308,32 +331,9 @@ class TestTasksApi(base.ApiTest):
|
||||
actual_task_ids = [task['id'] for task in content_dict['tasks']]
|
||||
self.assertEqual(set(task_ids), set(actual_task_ids))
|
||||
|
||||
# 5. GET /tasks with status filter
|
||||
# Verify correct tasks are returned for status 'pending'
|
||||
params = "status=pending"
|
||||
path = "/v2/tasks?%s" % params
|
||||
|
||||
response, content = self.http.request(path, 'GET',
|
||||
headers=minimal_task_headers())
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
content_dict = json.loads(content)
|
||||
self.assertEqual(2, len(content_dict['tasks']))
|
||||
|
||||
actual_task_ids = [task['id'] for task in content_dict['tasks']]
|
||||
self.assertEqual(set(task_ids), set(actual_task_ids))
|
||||
|
||||
# 6. GET /tasks with status filter
|
||||
# Verify no task are returned for status which is not 'pending'
|
||||
params = "status=success"
|
||||
path = "/v2/tasks?%s" % params
|
||||
|
||||
response, content = self.http.request(path, 'GET',
|
||||
headers=minimal_task_headers())
|
||||
self.assertEqual(response.status, 200)
|
||||
|
||||
content_dict = json.loads(content)
|
||||
self.assertEqual(0, len(content_dict['tasks']))
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
||||
def test_limited_tasks(self):
|
||||
"""
|
||||
@ -353,13 +353,13 @@ class TestTasksApi(base.ApiTest):
|
||||
|
||||
# 1. POST /tasks with three tasks with various attributes
|
||||
|
||||
task = self._post_new_task(owner=TENANT1)
|
||||
task, _ = self._post_new_task(owner=TENANT1)
|
||||
task_ids.append(task['id'])
|
||||
|
||||
task = self._post_new_task(owner=TENANT2)
|
||||
task, _ = self._post_new_task(owner=TENANT2)
|
||||
task_ids.append(task['id'])
|
||||
|
||||
task = self._post_new_task(owner=TENANT3)
|
||||
task, _ = self._post_new_task(owner=TENANT3)
|
||||
task_ids.append(task['id'])
|
||||
|
||||
# 2. GET /tasks
|
||||
@ -418,6 +418,10 @@ class TestTasksApi(base.ApiTest):
|
||||
self.assertEqual(1, len(actual_tasks))
|
||||
self.assertEqual(tasks[2]['id'], actual_tasks[0]['id'])
|
||||
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
||||
def test_ordered_tasks(self):
|
||||
# 0. GET /tasks
|
||||
# Verify no tasks
|
||||
@ -431,13 +435,13 @@ class TestTasksApi(base.ApiTest):
|
||||
task_ids = []
|
||||
|
||||
# 1. POST /tasks with three tasks with various attributes
|
||||
task = self._post_new_task(owner=TENANT1)
|
||||
task, _ = self._post_new_task(owner=TENANT1)
|
||||
task_ids.append(task['id'])
|
||||
|
||||
task = self._post_new_task(owner=TENANT2)
|
||||
task, _ = self._post_new_task(owner=TENANT2)
|
||||
task_ids.append(task['id'])
|
||||
|
||||
task = self._post_new_task(owner=TENANT3)
|
||||
task, _ = self._post_new_task(owner=TENANT3)
|
||||
task_ids.append(task['id'])
|
||||
|
||||
# 2. GET /tasks with no query params
|
||||
@ -501,6 +505,10 @@ class TestTasksApi(base.ApiTest):
|
||||
|
||||
self.assertEqual(0, len(actual_tasks))
|
||||
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
||||
def test_delete_task(self):
|
||||
# 0. POST /tasks
|
||||
# Create a new task with valid input and type
|
||||
@ -536,3 +544,7 @@ class TestTasksApi(base.ApiTest):
|
||||
headers=minimal_task_headers())
|
||||
self.assertEqual(response.status, 200)
|
||||
self.assertIsNotNone(content)
|
||||
|
||||
# NOTE(nikhil): wait for all task executions to finish before exiting
|
||||
# else there is a risk of running into deadlock
|
||||
self._wait_on_task_execution()
|
||||
|
0
glance/tests/unit/async/__init__.py
Normal file
0
glance/tests/unit/async/__init__.py
Normal file
49
glance/tests/unit/async/test_async.py
Normal file
49
glance/tests/unit/async/test_async.py
Normal file
@ -0,0 +1,49 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import mock
|
||||
|
||||
import glance.async
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
|
||||
class TestTaskExecutor(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestTaskExecutor, self).setUp()
|
||||
self.context = mock.Mock()
|
||||
self.task_repo = mock.Mock()
|
||||
self.image_repo = mock.Mock()
|
||||
self.image_factory = mock.Mock()
|
||||
self.executor = glance.async.TaskExecutor(self.context,
|
||||
self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
|
||||
def test_begin_processing(self):
|
||||
# setup
|
||||
task_id = mock.ANY
|
||||
task_type = mock.ANY
|
||||
task = mock.Mock()
|
||||
|
||||
with mock.patch.object(
|
||||
glance.async.TaskExecutor,
|
||||
'_run') as mock_run:
|
||||
self.task_repo.get.return_value = task
|
||||
self.executor.begin_processing(task_id)
|
||||
|
||||
# assert the call
|
||||
mock_run.assert_called_once_with(task_id, task_type)
|
47
glance/tests/unit/async/test_eventlet_executor.py
Normal file
47
glance/tests/unit/async/test_eventlet_executor.py
Normal file
@ -0,0 +1,47 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from glance.async import eventlet_executor
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
|
||||
class TestTaskExecutor(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestTaskExecutor, self).setUp()
|
||||
self.context = mock.Mock()
|
||||
self.task_repo = mock.Mock()
|
||||
self.image_repo = mock.Mock()
|
||||
self.image_factory = mock.Mock()
|
||||
self.executor = eventlet_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
|
||||
def test_begin_processing(self):
|
||||
task_id = mock.ANY
|
||||
task = mock.Mock()
|
||||
task.type = mock.ANY
|
||||
|
||||
with mock.patch.object(eventlet_executor.TaskExecutor,
|
||||
'_run') as mock_run:
|
||||
self.task_repo.get.return_value = task
|
||||
self.executor.begin_processing(task_id)
|
||||
|
||||
# assert the call
|
||||
mock_run.assert_called_once_with(task_id, task.type)
|
0
glance/tests/unit/common/scripts/__init__.py
Normal file
0
glance/tests/unit/common/scripts/__init__.py
Normal file
92
glance/tests/unit/common/scripts/image_import/test_main.py
Normal file
92
glance/tests/unit/common/scripts/image_import/test_main.py
Normal file
@ -0,0 +1,92 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import urllib2
|
||||
|
||||
from glance.common.scripts.image_import import main as image_import_script
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
|
||||
class TestImageImport(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImageImport, self).setUp()
|
||||
|
||||
def test_run(self):
|
||||
with mock.patch.object(image_import_script,
|
||||
'_execute') as mock_execute:
|
||||
task_id = mock.ANY
|
||||
context = mock.ANY
|
||||
task_repo = mock.ANY
|
||||
image_repo = mock.ANY
|
||||
image_factory = mock.ANY
|
||||
image_import_script.run(task_id, context, task_repo, image_repo,
|
||||
image_factory)
|
||||
|
||||
mock_execute.assert_called_once_with(task_id, task_repo, image_repo,
|
||||
image_factory)
|
||||
|
||||
def test_import_image(self):
|
||||
image_id = mock.ANY
|
||||
image = mock.Mock(image_id=image_id)
|
||||
image_repo = mock.Mock()
|
||||
image_repo.get.return_value = image
|
||||
image_factory = mock.ANY
|
||||
task_input = mock.Mock(image_properties=mock.ANY)
|
||||
uri = mock.ANY
|
||||
with mock.patch.object(image_import_script,
|
||||
'create_image') as mock_create_image:
|
||||
with mock.patch.object(image_import_script,
|
||||
'set_image_data') as mock_set_img_data:
|
||||
mock_create_image.return_value = image
|
||||
self.assertEqual(
|
||||
image_id,
|
||||
image_import_script.import_image(image_repo, image_factory,
|
||||
task_input, None, uri))
|
||||
self.assertEqual('active', image.status)
|
||||
self.assertTrue(image_repo.save.called)
|
||||
mock_set_img_data.assert_called_once_with(image, uri, None)
|
||||
self.assertTrue(image_repo.get.called)
|
||||
self.assertTrue(image_repo.save.called)
|
||||
|
||||
def test_create_image(self):
|
||||
image = mock.ANY
|
||||
image_repo = mock.Mock()
|
||||
image_factory = mock.Mock()
|
||||
image_factory.new_image.return_value = image
|
||||
|
||||
# Note: include some base properties to ensure no error while
|
||||
# attempting to verify them
|
||||
image_properties = {'disk_format': 'foo',
|
||||
'id': 'bar'}
|
||||
|
||||
self.assertEqual(image,
|
||||
image_import_script.create_image(image_repo,
|
||||
image_factory,
|
||||
image_properties,
|
||||
None))
|
||||
|
||||
def test_set_image_data_http(self):
|
||||
uri = 'http://www.example.com'
|
||||
image = mock.Mock()
|
||||
self.assertEqual(None,
|
||||
image_import_script.set_image_data(image, uri, None))
|
||||
|
||||
def test_set_image_data_http_error(self):
|
||||
uri = 'blahhttp://www.example.com'
|
||||
image = mock.Mock()
|
||||
self.assertRaises(urllib2.URLError,
|
||||
image_import_script.set_image_data, image, uri, None)
|
139
glance/tests/unit/common/scripts/test_scripts_utils.py
Normal file
139
glance/tests/unit/common/scripts/test_scripts_utils.py
Normal file
@ -0,0 +1,139 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import urllib2
|
||||
|
||||
from glance.common import exception
|
||||
from glance.common.scripts import utils as script_utils
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
|
||||
class TestScriptsUtils(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestScriptsUtils, self).setUp()
|
||||
|
||||
def test_get_task(self):
|
||||
task = mock.ANY
|
||||
task_repo = mock.Mock(return_value=task)
|
||||
task_id = mock.ANY
|
||||
self.assertEqual(task, script_utils.get_task(task_repo, task_id))
|
||||
|
||||
def test_unpack_task_input(self):
|
||||
task_input = {"import_from": "foo",
|
||||
"import_from_format": "bar",
|
||||
"image_properties": "baz"}
|
||||
task = mock.Mock(task_input=task_input)
|
||||
self.assertEqual(task_input,
|
||||
script_utils.unpack_task_input(task))
|
||||
|
||||
def test_unpack_task_input_error(self):
|
||||
task_input1 = {"import_from_format": "bar", "image_properties": "baz"}
|
||||
task_input2 = {"import_from": "foo", "image_properties": "baz"}
|
||||
task_input3 = {"import_from": "foo", "import_from_format": "bar"}
|
||||
task1 = mock.Mock(task_input=task_input1)
|
||||
task2 = mock.Mock(task_input=task_input2)
|
||||
task3 = mock.Mock(task_input=task_input3)
|
||||
self.assertRaises(exception.Invalid,
|
||||
script_utils.unpack_task_input, task1)
|
||||
self.assertRaises(exception.Invalid,
|
||||
script_utils.unpack_task_input, task2)
|
||||
self.assertRaises(exception.Invalid,
|
||||
script_utils.unpack_task_input, task3)
|
||||
|
||||
def test_set_base_image_properties(self):
|
||||
properties = {}
|
||||
script_utils.set_base_image_properties(properties)
|
||||
self.assertIn('disk_format', properties)
|
||||
self.assertIn('container_format', properties)
|
||||
|
||||
def test_validate_location_http(self):
|
||||
location = 'http://example.com'
|
||||
self.assertEqual(location,
|
||||
script_utils.validate_location_uri(location))
|
||||
|
||||
def test_validate_location_https(self):
|
||||
location = 'https://example.com'
|
||||
self.assertEqual(location,
|
||||
script_utils.validate_location_uri(location))
|
||||
|
||||
def test_validate_location_none_error(self):
|
||||
self.assertRaises(exception.BadStoreUri,
|
||||
script_utils.validate_location_uri, '')
|
||||
|
||||
def test_validate_location_file_location_error(self):
|
||||
self.assertRaises(StandardError, script_utils.validate_location_uri,
|
||||
"file:///tmp")
|
||||
|
||||
def test_validate_location_unsupported_error(self):
|
||||
location = 'swift'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'swift+http'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'swift+https'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'swift+config'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'vsphere'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'sheepdog://'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 's3+https://'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'rbd://'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'gridfs://'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
location = 'cinder://'
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.validate_location_uri, location)
|
||||
|
||||
def test_get_image_data_http(self):
|
||||
uri = "http://example.com"
|
||||
response = urllib2.urlopen(uri)
|
||||
expected = response.read()
|
||||
self.assertEqual(expected,
|
||||
script_utils.get_image_data_iter(uri).read())
|
||||
|
||||
def test_get_image_data_https(self):
|
||||
uri = "https://example.com"
|
||||
response = urllib2.urlopen(uri)
|
||||
expected = response.read()
|
||||
self.assertEqual(expected,
|
||||
script_utils.get_image_data_iter(uri).read())
|
||||
|
||||
def test_get_image_data_http_error(self):
|
||||
uri = "http:/example.com"
|
||||
self.assertRaises(urllib2.URLError,
|
||||
script_utils.get_image_data_iter,
|
||||
uri)
|
42
glance/tests/unit/common/test_scripts.py
Normal file
42
glance/tests/unit/common/test_scripts.py
Normal file
@ -0,0 +1,42 @@
|
||||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import mock
|
||||
|
||||
import glance.common.scripts as scripts
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
|
||||
class TestScripts(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestScripts, self).setUp()
|
||||
|
||||
def test_run_task(self):
|
||||
task_id = mock.ANY
|
||||
task_type = 'import'
|
||||
context = mock.ANY
|
||||
task_repo = mock.ANY
|
||||
image_repo = mock.ANY
|
||||
image_factory = mock.ANY
|
||||
|
||||
with mock.patch.object(image_import, 'run') as mock_run:
|
||||
scripts.run_task(task_id, task_type, context, task_repo,
|
||||
image_repo, image_factory)
|
||||
|
||||
mock_run.assert_called_once_with(task_id, context, task_repo,
|
||||
image_repo, image_factory)
|
@ -20,6 +20,7 @@ import uuid
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
|
||||
import glance.async
|
||||
from glance.common import exception
|
||||
from glance import domain
|
||||
from glance.openstack.common import timeutils
|
||||
@ -447,6 +448,16 @@ class TestTask(test_utils.BaseTestCase):
|
||||
expected
|
||||
)
|
||||
|
||||
@mock.patch.object(glance.async.TaskExecutor, 'begin_processing')
|
||||
def test_run(self, mock_begin_processing):
|
||||
executor = glance.async.TaskExecutor(context=mock.ANY,
|
||||
task_repo=mock.ANY,
|
||||
image_repo=mock.ANY,
|
||||
image_factory=mock.ANY)
|
||||
self.task.run(executor)
|
||||
|
||||
mock_begin_processing.assert_called_once_with(self.task.task_id)
|
||||
|
||||
|
||||
class TestTaskStub(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
@ -487,3 +498,46 @@ class TestTaskStub(test_utils.BaseTestCase):
|
||||
'updated_at'
|
||||
)
|
||||
self.assertEqual(status, task.status)
|
||||
|
||||
|
||||
class TestTaskExecutorFactory(test_utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestTaskExecutorFactory, self).setUp()
|
||||
self.task_repo = mock.Mock()
|
||||
self.image_repo = mock.Mock()
|
||||
self.image_factory = mock.Mock()
|
||||
|
||||
def test_init(self):
|
||||
task_executor_factory = domain.TaskExecutorFactory(self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
self.assertEqual(self.task_repo, task_executor_factory.task_repo)
|
||||
|
||||
def test_new_task_executor(self):
|
||||
task_executor_factory = domain.TaskExecutorFactory(self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
context = mock.Mock()
|
||||
with mock.patch.object(glance.openstack.common.importutils,
|
||||
'import_class') as mock_import_class:
|
||||
mock_executor = mock.Mock()
|
||||
mock_import_class.return_value = mock_executor
|
||||
task_executor_factory.new_task_executor(context)
|
||||
|
||||
mock_executor.assert_called_once_with(context,
|
||||
self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
|
||||
def test_new_task_executor_error(self):
|
||||
task_executor_factory = domain.TaskExecutorFactory(self.task_repo,
|
||||
self.image_repo,
|
||||
self.image_factory)
|
||||
context = mock.Mock()
|
||||
with mock.patch.object(glance.openstack.common.importutils,
|
||||
'import_class') as mock_import_class:
|
||||
mock_import_class.side_effect = ImportError
|
||||
|
||||
self.assertRaises(ImportError,
|
||||
task_executor_factory.new_task_executor,
|
||||
context)
|
||||
|
@ -21,6 +21,7 @@ from oslo.config import cfg
|
||||
from oslo import messaging
|
||||
import webob
|
||||
|
||||
import glance.async
|
||||
from glance.common import exception
|
||||
import glance.context
|
||||
from glance import notifier
|
||||
@ -87,7 +88,7 @@ class TaskRepoStub(object):
|
||||
def add(self, *args, **kwargs):
|
||||
return 'task_from_add'
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
def get_task(self, *args, **kwargs):
|
||||
return 'task_from_get'
|
||||
|
||||
def list(self, *args, **kwargs):
|
||||
@ -486,7 +487,10 @@ class TestTaskNotifications(utils.BaseTestCase):
|
||||
self.fail('Notification contained location field.')
|
||||
|
||||
def test_task_run_notification(self):
|
||||
self.task_proxy.run(executor=None)
|
||||
with mock.patch('glance.async.TaskExecutor') as mock_executor:
|
||||
executor = mock_executor.return_value
|
||||
executor._run.return_value = mock.Mock()
|
||||
self.task_proxy.run(executor=mock_executor)
|
||||
output_logs = self.notifier.get_logs()
|
||||
self.assertEqual(len(output_logs), 1)
|
||||
output_log = output_logs[0]
|
||||
|
@ -17,10 +17,12 @@
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import mock
|
||||
import webob
|
||||
|
||||
import glance.api.v2.tasks
|
||||
import glance.domain
|
||||
import glance.gateway
|
||||
from glance.openstack.common import jsonutils
|
||||
from glance.openstack.common import timeutils
|
||||
from glance.tests.unit import base
|
||||
@ -282,7 +284,37 @@ class TestTasksController(test_utils.BaseTestCase):
|
||||
self.assertRaises(webob.exc.HTTPNotFound,
|
||||
self.controller.get, request, UUID4)
|
||||
|
||||
def test_create(self):
|
||||
@mock.patch.object(glance.gateway.Gateway, 'get_task_factory')
|
||||
@mock.patch.object(glance.gateway.Gateway, 'get_task_executor_factory')
|
||||
@mock.patch.object(glance.gateway.Gateway, 'get_task_repo')
|
||||
def test_create(self, mock_get_task_repo, mock_get_task_executor_factory,
|
||||
mock_get_task_factory):
|
||||
# setup
|
||||
request = unit_test_utils.get_fake_request()
|
||||
task = {
|
||||
"type": "import",
|
||||
"input": {
|
||||
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
|
||||
"image_from_format": "qcow2"
|
||||
}
|
||||
}
|
||||
new_task = mock.Mock()
|
||||
mock_get_task_factory.new_task.return_value = new_task
|
||||
mock_get_task_factory.new_task.run.return_value = mock.ANY
|
||||
mock_get_task_executor_factory.new_task_exector.return_value = \
|
||||
mock.Mock()
|
||||
mock_get_task_repo.add.return_value = mock.Mock()
|
||||
|
||||
# call
|
||||
self.controller.create(request, task=task)
|
||||
|
||||
# assert
|
||||
mock_get_task_factory.new_task.assert_called_once()
|
||||
mock_get_task_repo.add.assert_called_once()
|
||||
mock_get_task_executor_factory.new_task_exector.assert_called_once()
|
||||
mock_get_task_factory.new_task.run.assert_called_once()
|
||||
|
||||
def test_notifications_on_create(self):
|
||||
request = unit_test_utils.get_fake_request()
|
||||
task = {"type": "import", "input": {
|
||||
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
|
||||
|
Loading…
Reference in New Issue
Block a user