# Copyright 2015 OpenStack Foundation # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import json import logging import os import glance_store as store_api from glance_store import backend from oslo_concurrency import processutils as putils from oslo_config import cfg from oslo_utils import encodeutils from oslo_utils import excutils import six from stevedore import named from taskflow.patterns import linear_flow as lf from taskflow import retry from taskflow import task from taskflow.types import failure from glance.common import exception from glance.common.scripts.image_import import main as image_import from glance.common.scripts import utils as script_utils from glance.i18n import _, _LE, _LI LOG = logging.getLogger(__name__) CONF = cfg.CONF class _CreateImage(task.Task): default_provides = 'image_id' def __init__(self, task_id, task_type, task_repo, image_repo, image_factory): self.task_id = task_id self.task_type = task_type self.task_repo = task_repo self.image_repo = image_repo self.image_factory = image_factory super(_CreateImage, self).__init__( name='%s-CreateImage-%s' % (task_type, task_id)) def execute(self): task = script_utils.get_task(self.task_repo, self.task_id) if task is None: return task_input = script_utils.unpack_task_input(task) image = image_import.create_image( self.image_repo, self.image_factory, task_input.get('image_properties'), self.task_id) LOG.debug("Task %(task_id)s created image %(image_id)s", {'task_id': task.task_id, 'image_id': image.image_id}) return image.image_id def revert(self, *args, **kwargs): # TODO(flaper87): Define the revert rules for images on failures. # Deleting the image may not be what we want since users could upload # the image data in a separate step. However, it really depends on # when the failure happened. I guess we should check if data has been # written, although at that point failures are (should be) unexpected, # at least image-workflow wise. pass class _ImportToFS(task.Task): default_provides = 'file_path' def __init__(self, task_id, task_type, task_repo, uri): self.task_id = task_id self.task_type = task_type self.task_repo = task_repo self.uri = uri super(_ImportToFS, self).__init__( name='%s-ImportToFS-%s' % (task_type, task_id)) if CONF.task.work_dir is None: msg = (_("%(task_id)s of %(task_type)s not configured " "properly. Missing work dir: %(work_dir)s") % {'task_id': self.task_id, 'task_type': self.task_type, 'work_dir': CONF.task.work_dir}) raise exception.BadTaskConfiguration(msg) self.store = self._build_store() def _build_store(self): # NOTE(flaper87): Due to the nice glance_store api (#sarcasm), we're # forced to build our own config object, register the required options # (and by required I mean *ALL* of them, even the ones we don't want), # and create our own store instance by calling a private function. # This is certainly unfortunate but it's the best we can do until the # glance_store refactor is done. A good thing is that glance_store is # under our team's management and it gates on Glance so changes to # this API will (should?) break task's tests. conf = cfg.ConfigOpts() backend.register_opts(conf) conf.set_override('filesystem_store_datadir', CONF.task.work_dir, group='glance_store', enforce_type=True) # NOTE(flaper87): Do not even try to judge me for this... :( # With the glance_store refactor, this code will change, until # that happens, we don't have a better option and this is the # least worst one, IMHO. store = backend._load_store(conf, 'file') if store is None: msg = (_("%(task_id)s of %(task_type)s not configured " "properly. Could not load the filesystem store") % {'task_id': self.task_id, 'task_type': self.task_type}) raise exception.BadTaskConfiguration(msg) store.configure() return store def execute(self, image_id): """Create temp file into store and return path to it :param image_id: Glance Image ID """ # NOTE(flaper87): We've decided to use a separate `work_dir` for # this task - and tasks coming after this one - as a way to expect # users to configure a local store for pre-import works on the image # to happen. # # While using any path should be "technically" fine, it's not what # we recommend as the best solution. For more details on this, please # refer to the comment in the `_ImportToStore.execute` method. data = script_utils.get_image_data_iter(self.uri) path = self.store.add(image_id, data, 0, context=None)[0] try: # NOTE(flaper87): Consider moving this code to a common # place that other tasks can consume as well. stdout, stderr = putils.trycmd('qemu-img', 'info', '--output=json', path, log_errors=putils.LOG_ALL_ERRORS) except OSError as exc: with excutils.save_and_reraise_exception(): msg = (_LE('Failed to execute security checks on the image ' '%(task_id)s: %(exc)s') % {'task_id': self.task_id, 'exc': exc.message}) LOG.error(msg) metadata = json.loads(stdout) backing_file = metadata.get('backing-filename') if backing_file is not None: msg = _("File %(path)s has invalid backing file " "%(bfile)s, aborting.") % {'path': path, 'bfile': backing_file} raise RuntimeError(msg) return path def revert(self, image_id, result, **kwargs): if isinstance(result, failure.Failure): LOG.exception(_LE('Task: %(task_id)s failed to import image ' '%(image_id)s to the filesystem.') % {'task_id': self.task_id, 'image_id': image_id}) return if os.path.exists(result.split("file://")[-1]): store_api.delete_from_backend(result) class _DeleteFromFS(task.Task): def __init__(self, task_id, task_type): self.task_id = task_id self.task_type = task_type super(_DeleteFromFS, self).__init__( name='%s-DeleteFromFS-%s' % (task_type, task_id)) def execute(self, file_path): """Remove file from the backend :param file_path: path to the file being deleted """ store_api.delete_from_backend(file_path) class _ImportToStore(task.Task): def __init__(self, task_id, task_type, image_repo, uri): self.task_id = task_id self.task_type = task_type self.image_repo = image_repo self.uri = uri super(_ImportToStore, self).__init__( name='%s-ImportToStore-%s' % (task_type, task_id)) def execute(self, image_id, file_path=None): """Bringing the introspected image to back end store :param image_id: Glance Image ID :param file_path: path to the image file """ # NOTE(flaper87): There are a couple of interesting bits in the # interaction between this task and the `_ImportToFS` one. I'll try # to cover them in this comment. # # NOTE(flaper87): # `_ImportToFS` downloads the image to a dedicated `work_dir` which # needs to be configured in advance (please refer to the config option # docs for more info). The motivation behind this is also explained in # the `_ImportToFS.execute` method. # # Due to the fact that we have an `_ImportToFS` task which downloads # the image data already, we need to be as smart as we can in this task # to avoid downloading the data several times and reducing the copy or # write times. There are several scenarios where the interaction # between this task and `_ImportToFS` could be improved. All these # scenarios assume the `_ImportToFS` task has been executed before # and/or in a more abstract scenario, that `file_path` is being # provided. # # Scenario 1: FS Store is Remote, introspection enabled, # conversion disabled # # In this scenario, the user would benefit from having the scratch path # being the same path as the fs store. Only one write would happen and # an extra read will happen in order to introspect the image. Note that # this read is just for the image headers and not the entire file. # # Scenario 2: FS Store is remote, introspection enabled, # conversion enabled # # In this scenario, the user would benefit from having a *local* store # into which the image can be converted. This will require downloading # the image locally, converting it and then copying the converted image # to the remote store. # # Scenario 3: FS Store is local, introspection enabled, # conversion disabled # Scenario 4: FS Store is local, introspection enabled, # conversion enabled # # In both these scenarios the user shouldn't care if the FS # store path and the work dir are the same, therefore probably # benefit, about the scratch path and the FS store being the # same from a performance perspective. Space wise, regardless # of the scenario, the user will have to account for it in # advance. # # Lets get to it and identify the different scenarios in the # implementation image = self.image_repo.get(image_id) image.status = 'saving' self.image_repo.save(image) # NOTE(flaper87): Let's dance... and fall # # Unfortunatelly, because of the way our domain layers work and # the checks done in the FS store, we can't simply rename the file # and set the location. To do that, we'd have to duplicate the logic # of every and each of the domain factories (quota, location, etc) # and we'd also need to hack the FS store to prevent it from raising # a "duplication path" error. I'd rather have this task copying the # image bits one more time than duplicating all that logic. # # Since I don't think this should be the definitive solution, I'm # leaving the code below as a reference for what should happen here # once the FS store and domain code will be able to handle this case. # # if file_path is None: # image_import.set_image_data(image, self.uri, None) # return # NOTE(flaper87): Don't assume the image was stored in the # work_dir. Think in the case this path was provided by another task. # Also, lets try to neither assume things nor create "logic" # dependencies between this task and `_ImportToFS` # # base_path = os.path.dirname(file_path.split("file://")[-1]) # NOTE(flaper87): Hopefully just scenarios #3 and #4. I say # hopefully because nothing prevents the user to use the same # FS store path as a work dir # # image_path = os.path.join(base_path, image_id) # # if (base_path == CONF.glance_store.filesystem_store_datadir or # base_path in CONF.glance_store.filesystem_store_datadirs): # os.rename(file_path, image_path) # # image_import.set_image_data(image, image_path, None) image_import.set_image_data(image, file_path or self.uri, self.task_id) # NOTE(flaper87): We need to save the image again after the locations # have been set in the image. self.image_repo.save(image) class _SaveImage(task.Task): def __init__(self, task_id, task_type, image_repo): self.task_id = task_id self.task_type = task_type self.image_repo = image_repo super(_SaveImage, self).__init__( name='%s-SaveImage-%s' % (task_type, task_id)) def execute(self, image_id): """Transition image status to active :param image_id: Glance Image ID """ new_image = self.image_repo.get(image_id) if new_image.status == 'saving': # NOTE(flaper87): THIS IS WRONG! # we should be doing atomic updates to avoid # race conditions. This happens in other places # too. new_image.status = 'active' self.image_repo.save(new_image) class _CompleteTask(task.Task): def __init__(self, task_id, task_type, task_repo): self.task_id = task_id self.task_type = task_type self.task_repo = task_repo super(_CompleteTask, self).__init__( name='%s-CompleteTask-%s' % (task_type, task_id)) def execute(self, image_id): """Finishing the task flow :param image_id: Glance Image ID """ task = script_utils.get_task(self.task_repo, self.task_id) if task is None: return try: task.succeed({'image_id': image_id}) except Exception as e: # Note: The message string contains Error in it to indicate # in the task.message that it's a error message for the user. # TODO(nikhil): need to bring back save_and_reraise_exception when # necessary err_msg = ("Error: " + six.text_type(type(e)) + ': ' + encodeutils.exception_to_unicode(e)) log_msg = err_msg + _LE("Task ID %s") % task.task_id LOG.exception(log_msg) task.fail(err_msg) finally: self.task_repo.save(task) LOG.info(_LI("%(task_id)s of %(task_type)s completed"), {'task_id': self.task_id, 'task_type': self.task_type}) def _get_import_flows(**kwargs): # NOTE(flaper87): Until we have a better infrastructure to enable # and disable tasks plugins, hard-code the tasks we know exist, # instead of loading everything from the namespace. This guarantees # both, the load order of these plugins and the fact that no random # plugins will be added/loaded until we feel comfortable with this. # Future patches will keep using NamedExtensionManager but they'll # rely on a config option to control this process. extensions = named.NamedExtensionManager('glance.flows.import', names=['convert', 'introspect'], name_order=True, invoke_on_load=True, invoke_kwds=kwargs) for ext in extensions.extensions: yield ext.obj def get_flow(**kwargs): """Return task flow :param task_id: Task ID :param task_type: Type of the task :param task_repo: Task repo :param image_repo: Image repository used :param image_factory: Glance Image Factory :param uri: uri for the image file """ task_id = kwargs.get('task_id') task_type = kwargs.get('task_type') task_repo = kwargs.get('task_repo') image_repo = kwargs.get('image_repo') image_factory = kwargs.get('image_factory') uri = kwargs.get('uri') flow = lf.Flow(task_type, retry=retry.AlwaysRevert()).add( _CreateImage(task_id, task_type, task_repo, image_repo, image_factory)) import_to_store = _ImportToStore(task_id, task_type, image_repo, uri) try: # NOTE(flaper87): ImportToLocal and DeleteFromLocal shouldn't be here. # Ideally, we should have the different import flows doing this for us # and this function should clean up duplicated tasks. For example, say # 2 flows need to have a local copy of the image - ImportToLocal - in # order to be able to complete the task - i.e Introspect-. In that # case, the introspect.get_flow call should add both, ImportToLocal and # DeleteFromLocal, to the flow and this function will reduce the # duplicated calls to those tasks by creating a linear flow that # ensures those are called before the other tasks. For now, I'm # keeping them here, though. limbo = lf.Flow(task_type).add(_ImportToFS(task_id, task_type, task_repo, uri)) for subflow in _get_import_flows(**kwargs): limbo.add(subflow) # NOTE(flaper87): We have hard-coded 2 tasks, # if there aren't more than 2, it means that # no subtask has been registered. if len(limbo) > 1: flow.add(limbo) # NOTE(flaper87): Until this implementation gets smarter, # make sure ImportToStore is called *after* the imported # flow stages. If not, the image will be set to saving state # invalidating tasks like Introspection or Convert. flow.add(import_to_store) # NOTE(flaper87): Since this is an "optional" task but required # when `limbo` is executed, we're adding it in its own subflow # to isolate it from the rest of the flow. delete_flow = lf.Flow(task_type).add(_DeleteFromFS(task_id, task_type)) flow.add(delete_flow) else: flow.add(import_to_store) except exception.BadTaskConfiguration: # NOTE(flaper87): If something goes wrong with the load of # import tasks, make sure we go on. flow.add(import_to_store) flow.add( _SaveImage(task_id, task_type, image_repo), _CompleteTask(task_id, task_type, task_repo) ) return flow