Merge "Glance Image Introspection"
This commit is contained in:
commit
07712bbf3f
@ -460,18 +460,35 @@ revocation_cache_time = 10
|
||||
# The default value for task_executor is taskflow.
|
||||
# task_executor = taskflow
|
||||
|
||||
# Work dir for asynchronous task operations. The directory set here
|
||||
# will be used to operate over images - normally before they are
|
||||
# imported in the destination store. When providing work dir, make sure
|
||||
# enough space is provided for concurrent tasks to run efficiently
|
||||
# without running out of space. A rough estimation can be done by
|
||||
# multiplying the number of `max_workers` - or the N of workers running
|
||||
# - by an average image size (e.g 500MB). The image size estimation
|
||||
# should be done based on the average size in your deployment. Note that
|
||||
# depending on the tasks running you may need to multiply this number by
|
||||
# some factor depending on what the task does. For example, you may want
|
||||
# to double the available size if image conversion is enabled. All this
|
||||
# being said, remember these are just estimations and you should do them
|
||||
# based on the worst case scenario and be prepared to act in case they
|
||||
# were wrong.
|
||||
# work_dir=None
|
||||
|
||||
# Specifies the maximum number of eventlet threads which can be spun up by
|
||||
# the eventlet based task executor to perform execution of Glance tasks.
|
||||
# DEPRECATED: Use [taskflow_executor]/max_workers instead.
|
||||
# eventlet_executor_pool_size = 1000
|
||||
|
||||
[taskflow_executor]
|
||||
# The mode in which the engine will run. Can be 'serial' or 'parallel'.
|
||||
# The mode in which the engine will run. Can bedefault', 'serial',
|
||||
# 'parallel' or 'worker-based'
|
||||
#engine_mode = serial
|
||||
|
||||
# The number of parallel activities executed at the same time by
|
||||
# the engine. The value can be greater than one when the engine mode is
|
||||
# 'parallel', otherwise this value will be ignored.
|
||||
# 'parallel' or 'worker-based', otherwise this value will be ignored.
|
||||
#max_workers = 10
|
||||
|
||||
[glance_store]
|
||||
|
0
glance/async/flows/__init__.py
Normal file
0
glance/async/flows/__init__.py
Normal file
431
glance/async/flows/base_import.py
Normal file
431
glance/async/flows/base_import.py
Normal file
@ -0,0 +1,431 @@
|
||||
# Copyright 2015 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import glance_store as store_api
|
||||
from glance_store import backend
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
from stevedore import extension
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow import retry
|
||||
from taskflow import task
|
||||
|
||||
from glance.common import exception
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
from glance.common.scripts import utils as script_utils
|
||||
from glance.common import utils as common_utils
|
||||
from glance import i18n
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_ = i18n._
|
||||
_LE = i18n._LE
|
||||
_LI = i18n._LI
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class _CreateImage(task.Task):
|
||||
|
||||
default_provides = 'image_id'
|
||||
|
||||
def __init__(self, task_id, task_type, task_repo, image_repo,
|
||||
image_factory):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.task_repo = task_repo
|
||||
self.image_repo = image_repo
|
||||
self.image_factory = image_factory
|
||||
super(_CreateImage, self).__init__(
|
||||
name='%s-CreateImage-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self):
|
||||
task = script_utils.get_task(self.task_repo, self.task_id)
|
||||
if task is None:
|
||||
return
|
||||
task_input = script_utils.unpack_task_input(task)
|
||||
image = image_import.create_image(
|
||||
self.image_repo, self.image_factory,
|
||||
task_input.get('image_properties'), self.task_id)
|
||||
|
||||
LOG.debug("Task %(task_id)s created image %(image_id)s" %
|
||||
{'task_id': task.task_id, 'image_id': image.image_id})
|
||||
return image.image_id
|
||||
|
||||
def revert(self, *args, **kwargs):
|
||||
# TODO(flaper87): Define the revert rules for images on failures.
|
||||
# Deleting the image may not be what we want since users could upload
|
||||
# the image data in a separate step. However, it really depends on
|
||||
# when the failure happened. I guess we should check if data has been
|
||||
# written, although at that point failures are (should be) unexpected,
|
||||
# at least image-workflow wise.
|
||||
pass
|
||||
|
||||
|
||||
class _ImportToFS(task.Task):
|
||||
|
||||
default_provides = 'file_path'
|
||||
|
||||
def __init__(self, task_id, task_type, task_repo, uri):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.task_repo = task_repo
|
||||
self.uri = uri
|
||||
super(_ImportToFS, self).__init__(
|
||||
name='%s-ImportToFS-%s' % (task_type, task_id))
|
||||
|
||||
if CONF.task.work_dir is None:
|
||||
msg = (_("%(task_id)s of %(task_type)s not configured "
|
||||
"properly. Missing work dir: %(work_dir)s") %
|
||||
{'task_id': self.task_id,
|
||||
'task_type': self.task_type,
|
||||
'work_dir': CONF.task.work_dir})
|
||||
raise exception.BadTaskConfiguration(msg)
|
||||
|
||||
self.store = self._build_store()
|
||||
|
||||
def _build_store(self):
|
||||
# NOTE(flaper87): Due to the nice glance_store api (#sarcasm), we're
|
||||
# forced to build our own config object, register the required options
|
||||
# (and by required I mean *ALL* of them, even the ones we don't want),
|
||||
# and create our own store instance by calling a private function.
|
||||
# This is certainly unfortunate but it's the best we can do until the
|
||||
# glance_store refactor is done. A good thing is that glance_store is
|
||||
# under our team's management and it gates on Glance so changes to
|
||||
# this API will (should?) break task's tests.
|
||||
conf = cfg.ConfigOpts()
|
||||
backend.register_opts(conf)
|
||||
conf.set_override('filesystem_store_datadir',
|
||||
CONF.task.work_dir,
|
||||
group='glance_store')
|
||||
|
||||
# NOTE(flaper87): Do not even try to judge me for this... :(
|
||||
# With the glance_store refactor, this code will change, until
|
||||
# that happens, we don't have a better option and this is the
|
||||
# least worst one, IMHO.
|
||||
store = backend._load_store(conf, 'file')
|
||||
|
||||
if store is None:
|
||||
msg = (_("%(task_id)s of %(task_type)s not configured "
|
||||
"properly. Could not load the filesystem store") %
|
||||
{'task_id': self.task_id, 'task_type': self.task_type})
|
||||
raise exception.BadTaskConfiguration(msg)
|
||||
|
||||
store.configure()
|
||||
return store
|
||||
|
||||
def execute(self, image_id):
|
||||
"""Create temp file into store and return path to it
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
"""
|
||||
# NOTE(flaper87): We've decided to use a separate `work_dir` for
|
||||
# this task - and tasks coming after this one - as a way to expect
|
||||
# users to configure a local store for pre-import works on the image
|
||||
# to happen.
|
||||
#
|
||||
# While using any path should be "technically" fine, it's not what
|
||||
# we recommend as the best solution. For more details on this, please
|
||||
# refer to the comment in the `_ImportToStore.execute` method.
|
||||
data = script_utils.get_image_data_iter(self.uri)
|
||||
|
||||
# NOTE(jokke): Using .tasks_import to ease debugging. The file name
|
||||
# is specific so we know exactly where it's coming from.
|
||||
tmp_id = "%s.tasks_import" % image_id
|
||||
path = self.store.add(tmp_id, data, 0, context=None)[0]
|
||||
return path
|
||||
|
||||
def revert(self, image_id, result=None, **kwargs):
|
||||
# NOTE(flaper87): If result is None, it probably
|
||||
# means this task failed. Otherwise, we would have
|
||||
# a result from its execution.
|
||||
if result is None:
|
||||
return
|
||||
|
||||
if os.path.exists(result.split("file://")[-1]):
|
||||
store_api.delete_from_backend(result)
|
||||
|
||||
|
||||
class _DeleteFromFS(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
super(_DeleteFromFS, self).__init__(
|
||||
name='%s-DeleteFromFS-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, file_path):
|
||||
"""Remove file from the backend
|
||||
|
||||
:param file_path: path to the file being deleted
|
||||
"""
|
||||
store_api.delete_from_backend(file_path)
|
||||
|
||||
|
||||
class _ImportToStore(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo, uri):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
self.uri = uri
|
||||
super(_ImportToStore, self).__init__(
|
||||
name='%s-ImportToStore-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id, file_path=None):
|
||||
"""Bringing the introspected image to back end store
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
:param file_path: path to the image file
|
||||
"""
|
||||
# NOTE(flaper87): There are a couple of interesting bits in the
|
||||
# interaction between this task and the `_ImportToFS` one. I'll try
|
||||
# to cover them in this comment.
|
||||
#
|
||||
# NOTE(flaper87):
|
||||
# `_ImportToFS` downloads the image to a dedicated `work_dir` which
|
||||
# needs to be configured in advance (please refer to the config option
|
||||
# docs for more info). The motivation behind this is also explained in
|
||||
# the `_ImportToFS.execute` method.
|
||||
#
|
||||
# Due to the fact that we have an `_ImportToFS` task which downloads
|
||||
# the image data already, we need to be as smart as we can in this task
|
||||
# to avoid downloading the data several times and reducing the copy or
|
||||
# write times. There are several scenarios where the interaction
|
||||
# between this task and `_ImportToFS` could be improved. All these
|
||||
# scenarios assume the `_ImportToFS` task has been executed before
|
||||
# and/or in a more abstract scenario, that `file_path` is being
|
||||
# provided.
|
||||
#
|
||||
# Scenario 1: FS Store is Remote, introspection enabled,
|
||||
# conversion disabled
|
||||
#
|
||||
# In this scenario, the user would benefit from having the scratch path
|
||||
# being the same path as the fs store. Only one write would happen and
|
||||
# an extra read will happen in order to introspect the image. Note that
|
||||
# this read is just for the image headers and not the entire file.
|
||||
#
|
||||
# Scenario 2: FS Store is remote, introspection enabled,
|
||||
# conversion enabled
|
||||
#
|
||||
# In this scenario, the user would benefit from having a *local* store
|
||||
# into which the image can be converted. This will require downloading
|
||||
# the image locally, converting it and then copying the converted image
|
||||
# to the remote store.
|
||||
#
|
||||
# Scenario 3: FS Store is local, introspection enabled,
|
||||
# conversion disabled
|
||||
# Scenario 4: FS Store is local, introspection enabled,
|
||||
# conversion enabled
|
||||
#
|
||||
# In both these scenarios the user shouldn't care if the FS
|
||||
# store path and the work dir are the same, therefore probably
|
||||
# benefit, about the scratch path and the FS store being the
|
||||
# same from a performance perspective. Space wise, regardless
|
||||
# of the scenario, the user will have to account for it in
|
||||
# advance.
|
||||
#
|
||||
# Lets get to it and identify the different scenarios in the
|
||||
# implementation
|
||||
image = self.image_repo.get(image_id)
|
||||
image.status = 'saving'
|
||||
self.image_repo.save(image)
|
||||
|
||||
# NOTE(flaper87): Let's dance... and fall
|
||||
#
|
||||
# Unfortunatelly, because of the way our domain layers work and
|
||||
# the checks done in the FS store, we can't simply rename the file
|
||||
# and set the location. To do that, we'd have to duplicate the logic
|
||||
# of every and each of the domain factories (quota, location, etc)
|
||||
# and we'd also need to hack the FS store to prevent it from raising
|
||||
# a "duplication path" error. I'd rather have this task copying the
|
||||
# image bits one more time than duplicating all that logic.
|
||||
#
|
||||
# Since I don't think this should be the definitive solution, I'm
|
||||
# leaving the code below as a reference for what should happen here
|
||||
# once the FS store and domain code will be able to handle this case.
|
||||
#
|
||||
# if file_path is None:
|
||||
# image_import.set_image_data(image, self.uri, None)
|
||||
# return
|
||||
|
||||
# NOTE(flaper87): Don't assume the image was stored in the
|
||||
# work_dir. Think in the case this path was provided by another task.
|
||||
# Also, lets try to neither assume things nor create "logic"
|
||||
# dependencies between this task and `_ImportToFS`
|
||||
#
|
||||
# base_path = os.path.dirname(file_path.split("file://")[-1])
|
||||
|
||||
# NOTE(flaper87): Hopefully just scenarios #3 and #4. I say
|
||||
# hopefully because nothing prevents the user to use the same
|
||||
# FS store path as a work dir
|
||||
#
|
||||
# image_path = os.path.join(base_path, image_id)
|
||||
#
|
||||
# if (base_path == CONF.glance_store.filesystem_store_datadir or
|
||||
# base_path in CONF.glance_store.filesystem_store_datadirs):
|
||||
# os.rename(file_path, image_path)
|
||||
#
|
||||
# image_import.set_image_data(image, image_path, None)
|
||||
|
||||
image_import.set_image_data(image, file_path or self.uri, None)
|
||||
|
||||
|
||||
class _SaveImage(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
super(_SaveImage, self).__init__(
|
||||
name='%s-SaveImage-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id):
|
||||
"""Transition image status to active
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
"""
|
||||
new_image = self.image_repo.get(image_id)
|
||||
if new_image.status == 'saving':
|
||||
# NOTE(flaper87): THIS IS WRONG!
|
||||
# we should be doing atomic updates to avoid
|
||||
# race conditions. This happens in other places
|
||||
# too.
|
||||
new_image.status = 'active'
|
||||
self.image_repo.save(new_image)
|
||||
|
||||
|
||||
class _CompleteTask(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, task_repo):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.task_repo = task_repo
|
||||
super(_CompleteTask, self).__init__(
|
||||
name='%s-CompleteTask-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id):
|
||||
"""Finishing the task flow
|
||||
|
||||
:param image_id: Glance Image ID
|
||||
"""
|
||||
task = script_utils.get_task(self.task_repo, self.task_id)
|
||||
if task is None:
|
||||
return
|
||||
try:
|
||||
task.succeed({'image_id': image_id})
|
||||
except Exception as e:
|
||||
# Note: The message string contains Error in it to indicate
|
||||
# in the task.message that it's a error message for the user.
|
||||
|
||||
# TODO(nikhil): need to bring back save_and_reraise_exception when
|
||||
# necessary
|
||||
err_msg = ("Error: " + six.text_type(type(e)) + ': ' +
|
||||
common_utils.exception_to_str(e))
|
||||
log_msg = err_msg + _LE("Task ID %s") % task.task_id
|
||||
LOG.exception(log_msg)
|
||||
|
||||
task.fail(err_msg)
|
||||
finally:
|
||||
self.task_repo.save(task)
|
||||
|
||||
LOG.info(_LI("%(task_id)s of %(task_type)s completed") %
|
||||
{'task_id': self.task_id, 'task_type': self.task_type})
|
||||
|
||||
|
||||
def _get_import_flows(**kwargs):
|
||||
extensions = extension.ExtensionManager('glance.flows.import',
|
||||
invoke_on_load=True,
|
||||
invoke_kwds=kwargs)
|
||||
|
||||
for ext in extensions.extensions:
|
||||
yield ext.obj
|
||||
|
||||
|
||||
def get_flow(**kwargs):
|
||||
"""Return task flow
|
||||
|
||||
:param task_id: Task ID
|
||||
:param task_type: Type of the task
|
||||
:param task_repo: Task repo
|
||||
:param image_repo: Image repository used
|
||||
:param image_factory: Glance Image Factory
|
||||
:param uri: uri for the image file
|
||||
"""
|
||||
task_id = kwargs.get('task_id')
|
||||
task_type = kwargs.get('task_type')
|
||||
task_repo = kwargs.get('task_repo')
|
||||
image_repo = kwargs.get('image_repo')
|
||||
image_factory = kwargs.get('image_factory')
|
||||
uri = kwargs.get('uri')
|
||||
|
||||
flow = lf.Flow(task_type, retry=retry.AlwaysRevert()).add(
|
||||
_CreateImage(task_id, task_type, task_repo, image_repo, image_factory))
|
||||
|
||||
import_to_store = _ImportToStore(task_id, task_type, image_repo, uri)
|
||||
|
||||
try:
|
||||
# NOTE(flaper87): ImportToLocal and DeleteFromLocal shouldn't be here.
|
||||
# Ideally, we should have the different import flows doing this for us
|
||||
# and this function should clean up duplicated tasks. For example, say
|
||||
# 2 flows need to have a local copy of the image - ImportToLocal - in
|
||||
# order to be able to complete the task - i.e Introspect-. In that
|
||||
# case, the introspect.get_flow call should add both, ImportToLocal and
|
||||
# DeleteFromLocal, to the flow and this function will reduce the
|
||||
# duplicated calls to those tasks by creating a linear flow that
|
||||
# ensures those are called before the other tasks. For now, I'm
|
||||
# keeping them here, though.
|
||||
limbo = lf.Flow(task_type).add(_ImportToFS(task_id,
|
||||
task_type,
|
||||
task_repo,
|
||||
uri))
|
||||
|
||||
for subflow in _get_import_flows(**kwargs):
|
||||
limbo.add(subflow)
|
||||
|
||||
# NOTE(flaper87): We have hard-coded 2 tasks,
|
||||
# if there aren't more than 2, it means that
|
||||
# no subtask has been registered.
|
||||
if len(limbo) > 1:
|
||||
flow.add(limbo)
|
||||
|
||||
# NOTE(flaper87): Until this implementation gets smarter,
|
||||
# make sure ImportToStore is called *after* the imported
|
||||
# flow stages. If not, the image will be set to saving state
|
||||
# invalidating tasks like Introspection or Convert.
|
||||
flow.add(import_to_store)
|
||||
|
||||
# NOTE(flaper87): Since this is an "optional" task but required
|
||||
# when `limbo` is executed, we're adding it in its own subflow
|
||||
# to isolat it from the rest of the flow.
|
||||
delete_flow = lf.Flow(task_type).add(_DeleteFromFS(task_id,
|
||||
task_type))
|
||||
flow.add(delete_flow)
|
||||
else:
|
||||
flow.add(import_to_store)
|
||||
except exception.BadTaskConfiguration:
|
||||
# NOTE(flaper87): If something goes wrong with the load of
|
||||
# import tasks, make sure we go on.
|
||||
flow.add(import_to_store)
|
||||
|
||||
flow.add(
|
||||
_SaveImage(task_id, task_type, image_repo),
|
||||
_CompleteTask(task_id, task_type, task_repo)
|
||||
)
|
||||
return flow
|
89
glance/async/flows/introspect.py
Normal file
89
glance/async/flows/introspect.py
Normal file
@ -0,0 +1,89 @@
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from oslo_concurrency import processutils as putils
|
||||
from oslo_utils import excutils
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
|
||||
from glance.async import utils
|
||||
from glance import i18n
|
||||
|
||||
|
||||
_LE = i18n._LE
|
||||
_LI = i18n._LI
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _Introspect(utils.OptionalTask):
|
||||
"""Taskflow to pull the embedded metadata out of image file"""
|
||||
|
||||
def __init__(self, task_id, task_type, image_repo):
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.image_repo = image_repo
|
||||
super(_Introspect, self).__init__(
|
||||
name='%s-Introspect-%s' % (task_type, task_id))
|
||||
|
||||
def execute(self, image_id, file_path):
|
||||
"""Does the actual introspection
|
||||
|
||||
:param image_id: Glance image ID
|
||||
:param file_path: Path to the file being introspected
|
||||
"""
|
||||
|
||||
try:
|
||||
stdout, stderr = putils.trycmd('qemu-img', 'info',
|
||||
'--output=json', file_path,
|
||||
log_errors=putils.LOG_ALL_ERRORS)
|
||||
except OSError as exc:
|
||||
# NOTE(flaper87): errno == 2 means the executable file
|
||||
# was not found. For now, log an error and move forward
|
||||
# until we have a better way to enable/disable optional
|
||||
# tasks.
|
||||
if exc.errno != 2:
|
||||
with excutils.save_and_reraise_exception():
|
||||
msg = (_LE('Failed to execute introspection '
|
||||
'%(task_id)s: %(exc)s') %
|
||||
{'task_id': self.task_id, 'exc': exc.message})
|
||||
LOG.error(msg)
|
||||
return
|
||||
|
||||
if stderr:
|
||||
raise RuntimeError(stderr)
|
||||
|
||||
metadata = json.loads(stdout)
|
||||
new_image = self.image_repo.get(image_id)
|
||||
new_image.virtual_size = metadata.get('virtual-size', 0)
|
||||
new_image.disk_format = metadata.get('format')
|
||||
self.image_repo.save(new_image)
|
||||
LOG.debug("%(task_id)s: Introspection successful: %(file)s" %
|
||||
{'task_id': self.task_id, 'file': file_path})
|
||||
return new_image
|
||||
|
||||
|
||||
def get_flow(**kwargs):
|
||||
task_id = kwargs.get('task_id')
|
||||
task_type = kwargs.get('task_type')
|
||||
image_repo = kwargs.get('image_repo')
|
||||
|
||||
LOG.debug("Flow: %(task_type)s with ID %(id)s on %(repo)s" %
|
||||
{'task_type': task_type, 'id': task_id, 'repo': image_repo})
|
||||
|
||||
return lf.Flow(task_type).add(
|
||||
_Introspect(task_id, task_type, image_repo),
|
||||
)
|
@ -17,20 +17,18 @@ import contextlib
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import excutils
|
||||
from stevedore import driver
|
||||
from taskflow import engines
|
||||
from taskflow.listeners import logging as llistener
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow import task
|
||||
from taskflow.types import futures
|
||||
from taskflow.utils import eventlet_utils
|
||||
|
||||
import glance.async
|
||||
import glance.common.scripts as scripts
|
||||
from glance.common.scripts import utils as script_utils
|
||||
from glance import i18n
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
_ = i18n._
|
||||
_LI = i18n._LI
|
||||
_LE = i18n._LE
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -56,23 +54,6 @@ CONF = cfg.CONF
|
||||
CONF.register_opts(taskflow_executor_opts, group='taskflow_executor')
|
||||
|
||||
|
||||
class _Task(task.Task):
|
||||
|
||||
def __init__(self, task_id, task_type, context, task_repo,
|
||||
image_repo, image_factory):
|
||||
super(_Task, self).__init__(name='%s-%s' % (task_type, task_id))
|
||||
self.task_id = task_id
|
||||
self.task_type = task_type
|
||||
self.context = context
|
||||
self.task_repo = task_repo
|
||||
self.image_repo = image_repo
|
||||
self.image_factory = image_factory
|
||||
|
||||
def execute(self):
|
||||
scripts.run_task(self.task_id, self.task_type, self.context,
|
||||
self.task_repo, self.image_repo, self.image_factory)
|
||||
|
||||
|
||||
class TaskExecutor(glance.async.TaskExecutor):
|
||||
|
||||
def __init__(self, context, task_repo, image_repo, image_factory):
|
||||
@ -101,15 +82,43 @@ class TaskExecutor(glance.async.TaskExecutor):
|
||||
else:
|
||||
yield futures.ThreadPoolExecutor(max_workers=max_workers)
|
||||
|
||||
def _get_flow(self, task):
|
||||
try:
|
||||
task_input = script_utils.unpack_task_input(task)
|
||||
uri = script_utils.validate_location_uri(
|
||||
task_input.get('import_from'))
|
||||
|
||||
kwds = {
|
||||
'uri': uri,
|
||||
'task_id': task.task_id,
|
||||
'task_type': task.type,
|
||||
'context': self.context,
|
||||
'task_repo': self.task_repo,
|
||||
'image_repo': self.image_repo,
|
||||
'image_factory': self.image_factory
|
||||
}
|
||||
|
||||
return driver.DriverManager('glance.flows', task.type,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds=kwds).driver
|
||||
except RuntimeError:
|
||||
raise NotImplementedError()
|
||||
|
||||
def _run(self, task_id, task_type):
|
||||
LOG.info(_LI('Taskflow executor picked up the execution of task ID '
|
||||
'%(task_id)s of task type '
|
||||
'%(task_type)s') % {'task_id': task_id,
|
||||
'task_type': task_type})
|
||||
flow = lf.Flow(task_type).add(
|
||||
_Task(task_id, task_type, self.context, self.task_repo,
|
||||
self.image_repo, self.image_factory)
|
||||
)
|
||||
LOG.debug('Taskflow executor picked up the execution of task ID '
|
||||
'%(task_id)s of task type '
|
||||
'%(task_type)s' % {'task_id': task_id,
|
||||
'task_type': task_type})
|
||||
|
||||
task = script_utils.get_task(self.task_repo, task_id)
|
||||
if task is None:
|
||||
# NOTE: This happens if task is not found in the database. In
|
||||
# such cases, there is no way to update the task status so,
|
||||
# it's ignored here.
|
||||
return
|
||||
|
||||
flow = self._get_flow(task)
|
||||
|
||||
try:
|
||||
with self._executor() as executor:
|
||||
engine = engines.load(flow, self.engine_conf,
|
||||
|
66
glance/async/utils.py
Normal file
66
glance/async/utils.py
Normal file
@ -0,0 +1,66 @@
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from taskflow import task
|
||||
|
||||
from glance import i18n
|
||||
import glance.openstack.common.log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
_LW = i18n._LW
|
||||
|
||||
|
||||
class OptionalTask(task.Task):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(OptionalTask, self).__init__(*args, **kwargs)
|
||||
self.execute = self._catch_all(self.execute)
|
||||
|
||||
def _catch_all(self, func):
|
||||
# NOTE(flaper87): Read this comment before calling the MI6
|
||||
# Here's the thing, there's no nice way to define "optional"
|
||||
# tasks. That is, tasks whose failure shouldn't affect the execution
|
||||
# of the flow. The only current "sane" way to do this, is by catching
|
||||
# everything and logging. This seems harmless from a taskflow
|
||||
# perspective but it is not. There are some issues related to this
|
||||
# "workaround":
|
||||
#
|
||||
# - Task's states will shamelessly lie to us saying the task succeeded.
|
||||
#
|
||||
# - No revert procedure will be triggered, which means optional tasks,
|
||||
# for now, mustn't cause any side-effects because they won't be able to
|
||||
# clean them up. If these tasks depend on other task that do cause side
|
||||
# effects, a task that cleans those side effects most be registered as
|
||||
# well. For example, _ImportToFS, _MyDumbTask, _DeleteFromFS.
|
||||
#
|
||||
# - Ideally, optional tasks shouldn't `provide` new values unless they
|
||||
# are part of an optional flow. Due to the decoration of the execute
|
||||
# method, these tasks will need to define the provided methods at
|
||||
# class level using `default_provides`.
|
||||
#
|
||||
#
|
||||
# The taskflow team is working on improving this and on something that
|
||||
# will provide the ability of defining optional tasks. For now, to lie
|
||||
# ourselves we must.
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as exc:
|
||||
msg = (_LW("An optional task has failed, "
|
||||
"the failure was: %s") %
|
||||
exc.message)
|
||||
LOG.warn(msg)
|
||||
return wrapper
|
@ -70,6 +70,26 @@ task_opts = [
|
||||
default='taskflow',
|
||||
help=_("Specifies which task executor to be used to run the "
|
||||
"task scripts.")),
|
||||
cfg.StrOpt('work_dir',
|
||||
default=None,
|
||||
help=_('Work dir for asynchronous task operations. '
|
||||
'The directory set here will be used to operate over '
|
||||
'images - normally before they are imported in the '
|
||||
'destination store. When providing work dir, make sure '
|
||||
'enough space is provided for concurrent tasks to run '
|
||||
'efficiently without running out of space. A rough '
|
||||
'estimation can be done by multiplying the number of '
|
||||
'`max_workers` - or the N of workers running - by an '
|
||||
'average image size (e.g 500MB). The image size '
|
||||
'estimation should be done based on the average size in '
|
||||
'your deployment. Note that depending on the tasks '
|
||||
'running you may need to multiply this number by some '
|
||||
'factor depending on what the task does. For example, '
|
||||
'you may want to double the available size if image '
|
||||
'conversion is enabled. All this being said, remember '
|
||||
'these are just estimations and you should do them '
|
||||
'based on the worst case scenario and be prepared to '
|
||||
'act in case they were wrong.')),
|
||||
]
|
||||
manage_opts = [
|
||||
cfg.BoolOpt('db_enforce_mysql_charset',
|
||||
|
@ -324,6 +324,10 @@ class TaskException(GlanceException):
|
||||
message = _("An unknown task exception occurred")
|
||||
|
||||
|
||||
class BadTaskConfiguration(GlanceException):
|
||||
message = _("Task was not configured properly")
|
||||
|
||||
|
||||
class TaskNotFound(TaskException, NotFound):
|
||||
message = _("Task with the given id %(task_id)s was not found")
|
||||
|
||||
|
@ -111,10 +111,26 @@ def validate_location_uri(location):
|
||||
|
||||
|
||||
def get_image_data_iter(uri):
|
||||
"""The scripts are expected to support only over non-local locations of
|
||||
data. Note the absence of file:// for security reasons, see LP bug #942118.
|
||||
If the above constraint is violated, task should fail.
|
||||
"""Returns iterable object either for local file or uri
|
||||
|
||||
:param uri: uri (remote or local) to the datasource we want to iterate
|
||||
|
||||
Validation/sanitization of the uri is expected to happen before we get
|
||||
here.
|
||||
"""
|
||||
# NOTE: Current script supports http location. Other locations
|
||||
# types are to be supported as the script evolve.
|
||||
# NOTE(flaper87): This is safe because the input uri is already
|
||||
# verified before the task is created.
|
||||
if uri.startswith("file://"):
|
||||
uri = uri.split("file://")[-1]
|
||||
# NOTE(flaper87): The caller of this function expects to have
|
||||
# an iterable object. FileObjects in python are iterable, therefore
|
||||
# we are returning it as is.
|
||||
# The file descriptor will be eventually cleaned up by the garbage
|
||||
# collector once its ref-count is dropped to 0. That is, when there
|
||||
# wont be any references pointing to this file.
|
||||
#
|
||||
# We're not using StringIO or other tools to avoid reading everything
|
||||
# into memory. Some images may be quite heavy.
|
||||
return open(uri, "r")
|
||||
|
||||
return urllib2.urlopen(uri)
|
||||
|
@ -30,6 +30,7 @@ import glance.openstack.common.log as logging
|
||||
|
||||
_ = i18n._
|
||||
_LE = i18n._LE
|
||||
_LW = i18n._LW
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('task_executor', 'glance.common.config', group='task')
|
||||
@ -453,6 +454,7 @@ class TaskFactory(object):
|
||||
|
||||
|
||||
class TaskExecutorFactory(object):
|
||||
eventlet_deprecation_warned = False
|
||||
|
||||
def __init__(self, task_repo, image_repo, image_factory):
|
||||
self.task_repo = task_repo
|
||||
@ -467,6 +469,13 @@ class TaskExecutorFactory(object):
|
||||
# executor.
|
||||
task_executor = CONF.task.task_executor
|
||||
if task_executor == 'eventlet':
|
||||
# NOTE(jokke): Making sure we do not log the deprecation
|
||||
# warning 1000 times or anything crazy like that.
|
||||
if not TaskExecutorFactory.eventlet_deprecation_warned:
|
||||
msg = _LW("The `eventlet` executor has been deprecated. "
|
||||
"Use `taskflow` instead.")
|
||||
LOG.warn(msg)
|
||||
TaskExecutorFactory.eventlet_deprecation_warned = True
|
||||
task_executor = 'taskflow'
|
||||
|
||||
executor_cls = ('glance.async.%s_executor.'
|
||||
|
0
glance/tests/unit/async/flows/__init__.py
Normal file
0
glance/tests/unit/async/flows/__init__.py
Normal file
308
glance/tests/unit/async/flows/test_import.py
Normal file
308
glance/tests/unit/async/flows/test_import.py
Normal file
@ -0,0 +1,308 @@
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import os
|
||||
import urllib2
|
||||
|
||||
import glance_store
|
||||
from oslo_config import cfg
|
||||
from six.moves import cStringIO
|
||||
from taskflow import task
|
||||
|
||||
import glance.async.flows.base_import as import_flow
|
||||
from glance.async import taskflow_executor
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
from glance.common.scripts import utils as script_utils
|
||||
from glance.common import utils
|
||||
from glance import domain
|
||||
from glance import gateway
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
|
||||
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
|
||||
|
||||
|
||||
class _ErrorTask(task.Task):
|
||||
|
||||
def execute(self):
|
||||
raise RuntimeError()
|
||||
|
||||
|
||||
class TestImportTask(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImportTask, self).setUp()
|
||||
|
||||
glance_store.register_opts(CONF)
|
||||
self.config(default_store='file',
|
||||
stores=['file', 'http'],
|
||||
filesystem_store_datadir=self.test_dir,
|
||||
group="glance_store")
|
||||
glance_store.create_stores(CONF)
|
||||
|
||||
self.work_dir = os.path.join(self.test_dir, 'work_dir')
|
||||
utils.safe_mkdirs(self.work_dir)
|
||||
self.config(work_dir=self.work_dir, group='task')
|
||||
|
||||
self.context = mock.MagicMock()
|
||||
self.img_repo = mock.MagicMock()
|
||||
self.task_repo = mock.MagicMock()
|
||||
|
||||
self.gateway = gateway.Gateway()
|
||||
self.task_factory = domain.TaskFactory()
|
||||
self.img_factory = self.gateway.get_image_factory(self.context)
|
||||
self.image = self.img_factory.new_image(image_id=UUID1,
|
||||
disk_format='qcow2',
|
||||
container_format='bare')
|
||||
|
||||
task_input = {
|
||||
"import_from": "http://cloud.foo/image.qcow2",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {'disk_format': 'qcow2',
|
||||
'container_format': 'bare'}
|
||||
}
|
||||
task_ttl = CONF.task.task_time_to_live
|
||||
|
||||
self.task_type = 'import'
|
||||
self.task = self.task_factory.new_task(self.task_type, TENANT1,
|
||||
task_time_to_live=task_ttl,
|
||||
task_input=task_input)
|
||||
|
||||
def test_import_flow(self):
|
||||
self.config(engine_mode='serial',
|
||||
group='taskflow_executor')
|
||||
|
||||
img_factory = mock.MagicMock()
|
||||
|
||||
executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
|
||||
def create_image(*args, **kwargs):
|
||||
kwargs['image_id'] = UUID1
|
||||
return self.img_factory.new_image(*args, **kwargs)
|
||||
|
||||
self.img_repo.get.return_value = self.image
|
||||
img_factory.new_image.side_effect = create_image
|
||||
|
||||
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
|
||||
dmock.return_value = cStringIO("TEST_IMAGE")
|
||||
executor.begin_processing(self.task.task_id)
|
||||
image_path = os.path.join(self.test_dir, self.image.image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
self.assertTrue(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_missing_work_dir(self):
|
||||
self.config(engine_mode='serial', group='taskflow_executor')
|
||||
self.config(work_dir=None, group='task')
|
||||
|
||||
img_factory = mock.MagicMock()
|
||||
|
||||
executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
|
||||
def create_image(*args, **kwargs):
|
||||
kwargs['image_id'] = UUID1
|
||||
return self.img_factory.new_image(*args, **kwargs)
|
||||
|
||||
self.img_repo.get.return_value = self.image
|
||||
img_factory.new_image.side_effect = create_image
|
||||
|
||||
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
|
||||
dmock.return_value = cStringIO("TEST_IMAGE")
|
||||
|
||||
with mock.patch.object(import_flow._ImportToFS, 'execute') as emk:
|
||||
executor.begin_processing(self.task.task_id)
|
||||
self.assertFalse(emk.called)
|
||||
|
||||
image_path = os.path.join(self.test_dir, self.image.image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
self.assertTrue(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_revert(self):
|
||||
self.config(engine_mode='serial',
|
||||
group='taskflow_executor')
|
||||
|
||||
img_factory = mock.MagicMock()
|
||||
|
||||
executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
|
||||
def create_image(*args, **kwargs):
|
||||
kwargs['image_id'] = UUID1
|
||||
return self.img_factory.new_image(*args, **kwargs)
|
||||
|
||||
self.img_repo.get.return_value = self.image
|
||||
img_factory.new_image.side_effect = create_image
|
||||
|
||||
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
|
||||
dmock.return_value = cStringIO("TEST_IMAGE")
|
||||
|
||||
with mock.patch.object(import_flow, "_get_import_flows") as imock:
|
||||
imock.return_value = (x for x in [_ErrorTask()])
|
||||
self.assertRaises(RuntimeError,
|
||||
executor.begin_processing, self.task.task_id)
|
||||
image_path = os.path.join(self.test_dir, self.image.image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
|
||||
# NOTE(flaper87): Eventually, we want this to be assertTrue.
|
||||
# The current issue is there's no way to tell taskflow to
|
||||
# continue on failures. That is, revert the subflow but keep
|
||||
# executing the parent flow. Under discussion/development.
|
||||
self.assertFalse(os.path.exists(image_path))
|
||||
|
||||
def test_import_flow_no_import_flows(self):
|
||||
self.config(engine_mode='serial',
|
||||
group='taskflow_executor')
|
||||
|
||||
img_factory = mock.MagicMock()
|
||||
|
||||
executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
|
||||
def create_image(*args, **kwargs):
|
||||
kwargs['image_id'] = UUID1
|
||||
return self.img_factory.new_image(*args, **kwargs)
|
||||
|
||||
self.img_repo.get.return_value = self.image
|
||||
img_factory.new_image.side_effect = create_image
|
||||
|
||||
with mock.patch.object(urllib2, 'urlopen') as umock:
|
||||
content = "TEST_IMAGE"
|
||||
umock.return_value = cStringIO(content)
|
||||
|
||||
with mock.patch.object(import_flow, "_get_import_flows") as imock:
|
||||
imock.return_value = (x for x in [])
|
||||
executor.begin_processing(self.task.task_id)
|
||||
image_path = os.path.join(self.test_dir, self.image.image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertFalse(os.path.exists(tmp_image_path))
|
||||
self.assertTrue(os.path.exists(image_path))
|
||||
umock.assert_called_once()
|
||||
|
||||
with open(image_path) as ifile:
|
||||
self.assertEqual(content, ifile.read())
|
||||
|
||||
def test_create_image(self):
|
||||
image_create = import_flow._CreateImage(self.task.task_id,
|
||||
self.task_type,
|
||||
self.task_repo,
|
||||
self.img_repo,
|
||||
self.img_factory)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
with mock.patch.object(image_import, 'create_image') as ci_mock:
|
||||
ci_mock.return_value = mock.Mock()
|
||||
image_create.execute()
|
||||
|
||||
ci_mock.assert_called_once_with(self.img_repo,
|
||||
self.img_factory,
|
||||
{'container_format': 'bare',
|
||||
'disk_format': 'qcow2'},
|
||||
self.task.task_id)
|
||||
|
||||
def test_save_image(self):
|
||||
save_image = import_flow._SaveImage(self.task.task_id,
|
||||
self.task_type,
|
||||
self.img_repo)
|
||||
|
||||
with mock.patch.object(self.img_repo, 'get') as get_mock:
|
||||
image_id = mock.sentinel.image_id
|
||||
image = mock.MagicMock(image_id=image_id, status='saving')
|
||||
get_mock.return_value = image
|
||||
|
||||
with mock.patch.object(self.img_repo, 'save') as save_mock:
|
||||
save_image.execute(image.image_id)
|
||||
get_mock.assert_called_once_with(image_id)
|
||||
save_mock.assert_called_once_with(image)
|
||||
self.assertEqual('active', image.status)
|
||||
|
||||
def test_import_to_fs(self):
|
||||
import_fs = import_flow._ImportToFS(self.task.task_id,
|
||||
self.task_type,
|
||||
self.task_repo,
|
||||
'http://example.com/image.qcow2')
|
||||
|
||||
with mock.patch.object(script_utils, 'get_image_data_iter') as dmock:
|
||||
dmock.return_value = "test"
|
||||
|
||||
image_id = UUID1
|
||||
path = import_fs.execute(image_id)
|
||||
reader, size = glance_store.get_from_backend(path)
|
||||
self.assertEqual(4, size)
|
||||
self.assertEqual(dmock.return_value, "".join(reader))
|
||||
|
||||
image_path = os.path.join(self.work_dir, image_id)
|
||||
tmp_image_path = os.path.join(self.work_dir,
|
||||
"%s.tasks_import" % image_path)
|
||||
self.assertTrue(os.path.exists(tmp_image_path))
|
||||
|
||||
def test_delete_from_fs(self):
|
||||
delete_fs = import_flow._DeleteFromFS(self.task.task_id,
|
||||
self.task_type)
|
||||
|
||||
data = "test"
|
||||
|
||||
store = glance_store.get_store_from_scheme('file')
|
||||
path = glance_store.store_add_to_backend(mock.sentinel.image_id, data,
|
||||
mock.sentinel.image_size,
|
||||
store, context=None)[0]
|
||||
|
||||
path_wo_scheme = path.split("file://")[1]
|
||||
self.assertTrue(os.path.exists(path_wo_scheme))
|
||||
delete_fs.execute(path)
|
||||
self.assertFalse(os.path.exists(path_wo_scheme))
|
||||
|
||||
def test_complete_task(self):
|
||||
complete_task = import_flow._CompleteTask(self.task.task_id,
|
||||
self.task_type,
|
||||
self.task_repo)
|
||||
|
||||
image_id = mock.sentinel.image_id
|
||||
image = mock.MagicMock(image_id=image_id)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
with mock.patch.object(self.task, 'succeed') as succeed:
|
||||
complete_task.execute(image.image_id)
|
||||
succeed.assert_called_once_with({'image_id': image_id})
|
111
glance/tests/unit/async/flows/test_introspect.py
Normal file
111
glance/tests/unit/async/flows/test_introspect.py
Normal file
@ -0,0 +1,111 @@
|
||||
# Copyright 2015 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import mock
|
||||
|
||||
import glance_store
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
|
||||
from glance.async.flows import introspect
|
||||
from glance import domain
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
|
||||
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
|
||||
|
||||
|
||||
class TestImportTask(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImportTask, self).setUp()
|
||||
self.task_factory = domain.TaskFactory()
|
||||
task_input = {
|
||||
"import_from": "http://cloud.foo/image.qcow2",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": mock.sentinel.image_properties
|
||||
}
|
||||
task_ttl = CONF.task.task_time_to_live
|
||||
|
||||
self.task_type = 'import'
|
||||
self.task = self.task_factory.new_task(self.task_type, TENANT1,
|
||||
task_time_to_live=task_ttl,
|
||||
task_input=task_input)
|
||||
|
||||
self.context = mock.Mock()
|
||||
self.img_repo = mock.Mock()
|
||||
self.task_repo = mock.Mock()
|
||||
self.img_factory = mock.Mock()
|
||||
|
||||
glance_store.register_opts(CONF)
|
||||
self.config(default_store='file',
|
||||
stores=['file', 'http'],
|
||||
filesystem_store_datadir=self.test_dir,
|
||||
group="glance_store")
|
||||
glance_store.create_stores(CONF)
|
||||
|
||||
def test_introspect_success(self):
|
||||
image_create = introspect._Introspect(self.task.task_id,
|
||||
self.task_type,
|
||||
self.img_repo)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
image_id = mock.sentinel.image_id
|
||||
image = mock.MagicMock(image_id=image_id)
|
||||
self.img_repo.get.return_value = image
|
||||
|
||||
with mock.patch.object(processutils, 'execute') as exc_mock:
|
||||
result = json.dumps({
|
||||
"virtual-size": 10737418240,
|
||||
"filename": "/tmp/image.qcow2",
|
||||
"cluster-size": 65536,
|
||||
"format": "qcow2",
|
||||
"actual-size": 373030912,
|
||||
"format-specific": {
|
||||
"type": "qcow2",
|
||||
"data": {
|
||||
"compat": "0.10"
|
||||
}
|
||||
},
|
||||
"dirty-flag": False
|
||||
})
|
||||
|
||||
exc_mock.return_value = (result, None)
|
||||
image_create.execute(image, '/test/path.qcow2')
|
||||
self.assertEqual(10737418240, image.virtual_size)
|
||||
|
||||
def test_introspect_no_image(self):
|
||||
image_create = introspect._Introspect(self.task.task_id,
|
||||
self.task_type,
|
||||
self.img_repo)
|
||||
|
||||
self.task_repo.get.return_value = self.task
|
||||
image_id = mock.sentinel.image_id
|
||||
image = mock.MagicMock(image_id=image_id, virtual_size=None)
|
||||
self.img_repo.get.return_value = image
|
||||
|
||||
# NOTE(flaper87): Don't mock, test the error.
|
||||
with mock.patch.object(processutils, 'execute') as exc_mock:
|
||||
exc_mock.return_value = (None, "some error")
|
||||
# NOTE(flaper87): Pls, read the `OptionalTask._catch_all`
|
||||
# docs to know why this is commented.
|
||||
# self.assertRaises(RuntimeError,
|
||||
# image_create.execute,
|
||||
# image, '/test/path.qcow2')
|
||||
image_create.execute(image, '/test/path.qcow2')
|
||||
self.assertIsNone(image.virtual_size)
|
@ -15,18 +15,53 @@
|
||||
|
||||
import mock
|
||||
|
||||
import glance_store
|
||||
from oslo.config import cfg
|
||||
from taskflow import engines
|
||||
|
||||
from glance.async import taskflow_executor
|
||||
from glance import domain
|
||||
import glance.tests.utils as test_utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
|
||||
|
||||
|
||||
class TestTaskExecutor(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestTaskExecutor, self).setUp()
|
||||
|
||||
glance_store.register_opts(CONF)
|
||||
self.config(default_store='file',
|
||||
stores=['file', 'http'],
|
||||
filesystem_store_datadir=self.test_dir,
|
||||
group="glance_store")
|
||||
glance_store.create_stores(CONF)
|
||||
|
||||
self.config(engine_mode='serial',
|
||||
group='taskflow_executor')
|
||||
|
||||
self.context = mock.Mock()
|
||||
self.task_repo = mock.Mock()
|
||||
self.image_repo = mock.Mock()
|
||||
self.image_factory = mock.Mock()
|
||||
|
||||
task_input = {
|
||||
"import_from": "http://cloud.foo/image.qcow2",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {'disk_format': 'qcow2',
|
||||
'container_format': 'bare'}
|
||||
}
|
||||
task_ttl = CONF.task.task_time_to_live
|
||||
|
||||
self.task_type = 'import'
|
||||
self.task_factory = domain.TaskFactory()
|
||||
self.task = self.task_factory.new_task(self.task_type, TENANT1,
|
||||
task_time_to_live=task_ttl,
|
||||
task_input=task_input)
|
||||
|
||||
self.executor = taskflow_executor.TaskExecutor(
|
||||
self.context,
|
||||
self.task_repo,
|
||||
@ -34,14 +69,12 @@ class TestTaskExecutor(test_utils.BaseTestCase):
|
||||
self.image_factory)
|
||||
|
||||
def test_begin_processing(self):
|
||||
task_id = mock.ANY
|
||||
task = mock.Mock()
|
||||
task.type = mock.ANY
|
||||
|
||||
with mock.patch.object(taskflow_executor.TaskExecutor,
|
||||
'_run') as mock_run:
|
||||
self.task_repo.get.return_value = task
|
||||
self.executor.begin_processing(task_id)
|
||||
with mock.patch.object(engines, 'load') as load_mock:
|
||||
engine = mock.Mock()
|
||||
load_mock.return_value = engine
|
||||
self.task_repo.get.return_value = self.task
|
||||
self.executor.begin_processing(self.task.task_id)
|
||||
|
||||
# assert the call
|
||||
mock_run.assert_called_once_with(task_id, task.type)
|
||||
load_mock.assert_called_once()
|
||||
engine.assert_called_once()
|
||||
|
@ -584,7 +584,7 @@ class TestTaskRepo(test_utils.BaseTestCase):
|
||||
self.task_factory = glance.domain.TaskFactory()
|
||||
self.fake_task_input = ('{"import_from": '
|
||||
'"swift://cloud.foo/account/mycontainer/path"'
|
||||
',"image_from_format": "qcow2"}')
|
||||
',"import_from_format": "qcow2"}')
|
||||
self._create_tasks()
|
||||
|
||||
def _create_tasks(self):
|
||||
|
@ -140,6 +140,7 @@ class OptsTestCase(utils.BaseTestCase):
|
||||
'disk_formats',
|
||||
'task_time_to_live',
|
||||
'task_executor',
|
||||
'work_dir',
|
||||
'store_type_preference',
|
||||
'flavor',
|
||||
'config_file',
|
||||
|
@ -297,7 +297,8 @@ class TestTasksController(test_utils.BaseTestCase):
|
||||
"type": "import",
|
||||
"input": {
|
||||
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
|
||||
"image_from_format": "qcow2"
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {}
|
||||
}
|
||||
}
|
||||
new_task = mock.Mock()
|
||||
@ -316,17 +317,21 @@ class TestTasksController(test_utils.BaseTestCase):
|
||||
mock_get_task_executor_factory.new_task_exector.assert_called_once()
|
||||
mock_get_task_factory.new_task.run.assert_called_once()
|
||||
|
||||
def test_notifications_on_create(self):
|
||||
@mock.patch.object(glance.gateway.Gateway, 'get_task_factory')
|
||||
def test_notifications_on_create(self, mock_get_task_factory):
|
||||
request = unit_test_utils.get_fake_request()
|
||||
|
||||
new_task = mock.MagicMock(type='import')
|
||||
mock_get_task_factory.new_task.return_value = new_task
|
||||
new_task.run.return_value = mock.ANY
|
||||
|
||||
task = {"type": "import", "input": {
|
||||
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
|
||||
"image_from_format": "qcow2"}
|
||||
"import_from": "http://cloud.foo/myaccount/mycontainer/path",
|
||||
"import_from_format": "qcow2",
|
||||
"image_properties": {}
|
||||
}
|
||||
}
|
||||
task = self.controller.create(request, task=task)
|
||||
self.assertEqual('import', task.type)
|
||||
self.assertEqual({
|
||||
"import_from": "swift://cloud.foo/myaccount/mycontainer/path",
|
||||
"image_from_format": "qcow2"}, task.task_input)
|
||||
output_logs = [nlog for nlog in self.notifier.get_logs()
|
||||
if nlog['event_type'] == 'task.create']
|
||||
self.assertEqual(1, len(output_logs))
|
||||
|
@ -47,6 +47,12 @@ glance.database.migration_backend =
|
||||
glance.database.metadata_backend =
|
||||
sqlalchemy = glance.db.sqlalchemy.metadata
|
||||
|
||||
glance.flows =
|
||||
import = glance.async.flows.base_import:get_flow
|
||||
|
||||
glance.flows.import =
|
||||
introspect = glance.async.flows.introspect:get_flow
|
||||
|
||||
[build_sphinx]
|
||||
all_files = 1
|
||||
build-dir = doc/build
|
||||
|
Loading…
x
Reference in New Issue
Block a user