From 09312ab2c723d2cfb233ea0e417a297f38b67f40 Mon Sep 17 00:00:00 2001 From: Erno Kuvaja Date: Thu, 5 Jan 2017 16:59:47 +0000 Subject: [PATCH] Add import endpoint to initiate image import Adds endpoint and needed logic to initiate the task that takes care of the actual image import work after the image has been created and data provided. Change-Id: I5327f2d80c87f8afa02d6aa653a1a3bdc18de5bc --- glance/api/v2/images.py | 23 +++ glance/api/v2/router.py | 8 ++ glance/async/taskflow_executor.py | 7 +- glance/common/scripts/__init__.py | 8 ++ .../scripts/api_image_import/__init__.py | 0 .../common/scripts/api_image_import/main.py | 136 ++++++++++++++++++ glance/common/scripts/utils.py | 15 +- glance/domain/__init__.py | 2 +- 8 files changed, 190 insertions(+), 9 deletions(-) create mode 100644 glance/common/scripts/api_image_import/__init__.py create mode 100644 glance/common/scripts/api_image_import/main.py diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index 000826f5bf..21010a7f5f 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -25,6 +25,7 @@ from six.moves import http_client as http import six.moves.urllib.parse as urlparse import webob.exc +from glance.api import common from glance.api import policy from glance.common import exception from glance.common import location_strategy @@ -87,6 +88,28 @@ class ImagesController(object): return image + @utils.mutating + def import_image(self, req, image): + task_factory = self.gateway.get_task_factory(req.context) + executor_factory = self.gateway.get_task_executor_factory(req.context) + task_repo = self.gateway.get_task_repo(req.context) + + task_input = {} + + try: + import_task = task_factory.new_task(task_type='api_image_import', + owner=req.context.owner, + task_input=task_input) + task_repo.add(import_task) + task_executor = executor_factory.new_task_executor(req.context) + pool = common.get_thread_pool("tasks_eventlet_pool") + pool.spawn_n(import_task.run, task_executor) + except exception.Forbidden as e: + LOG.debug("User not permitted to create image import task.") + raise webob.exc.HTTPForbidden(explanation=e.msg) + + return image + def index(self, req, marker=None, limit=None, sort_key=None, sort_dir=None, filters=None, member_status='accepted'): sort_key = ['created_at'] if not sort_key else sort_key diff --git a/glance/api/v2/router.py b/glance/api/v2/router.py index ffe1db2b5e..3334f6a5a2 100644 --- a/glance/api/v2/router.py +++ b/glance/api/v2/router.py @@ -424,6 +424,14 @@ class API(wsgi.Router): controller=reject_method_resource, action='reject', allowed_methods='GET, PATCH, DELETE') + mapper.connect('/images/{image_id}/import', + controller=images_resource, + action='import_image', + conditions={'method': ['POST']}) + mapper.connect('/images/{image_id}/import', + controller=reject_method_resource, + action='reject', + allowed_methods='POST') image_actions_resource = image_actions.create_resource() mapper.connect('/images/{image_id}/actions/deactivate', diff --git a/glance/async/taskflow_executor.py b/glance/async/taskflow_executor.py index c485987364..ab252fbd4c 100644 --- a/glance/async/taskflow_executor.py +++ b/glance/async/taskflow_executor.py @@ -112,11 +112,8 @@ class TaskExecutor(glance.async.TaskExecutor): def _get_flow(self, task): try: task_input = script_utils.unpack_task_input(task) - uri = script_utils.validate_location_uri( - task_input.get('import_from')) kwds = { - 'uri': uri, 'task_id': task.task_id, 'task_type': task.type, 'context': self.context, @@ -125,6 +122,10 @@ class TaskExecutor(glance.async.TaskExecutor): 'image_factory': self.image_factory } + if task.type == "import": + uri = script_utils.validate_location_uri( + task_input.get('import_from')) + kwds['uri'] = uri return driver.DriverManager('glance.flows', task.type, invoke_on_load=True, invoke_kwds=kwds).driver diff --git a/glance/common/scripts/__init__.py b/glance/common/scripts/__init__.py index 49542346c1..37a53a2f40 100644 --- a/glance/common/scripts/__init__.py +++ b/glance/common/scripts/__init__.py @@ -15,6 +15,7 @@ from oslo_log import log as logging +from glance.common.scripts.api_image_import import main as api_image_import from glance.common.scripts.image_import import main as image_import from glance.i18n import _LE, _LI @@ -34,6 +35,13 @@ def run_task(task_id, task_type, context, image_import.run(task_id, context, task_repo, image_repo, image_factory) + elif task_type == 'api_image_import': + api_image_import.run(task_id, + context, + task_repo, + image_repo, + image_factory) + else: msg = _LE("This task type %(task_type)s is not supported by the " "current deployment of Glance. Please refer the " diff --git a/glance/common/scripts/api_image_import/__init__.py b/glance/common/scripts/api_image_import/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/glance/common/scripts/api_image_import/main.py b/glance/common/scripts/api_image_import/main.py new file mode 100644 index 0000000000..4c5bee1278 --- /dev/null +++ b/glance/common/scripts/api_image_import/main.py @@ -0,0 +1,136 @@ +# Copyright 2014 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +__all__ = [ + 'run', +] + +from oslo_concurrency import lockutils +from oslo_log import log as logging +from oslo_utils import encodeutils +from oslo_utils import excutils +import six + +from glance.api.v2 import images as v2_api +from glance.common import exception +from glance.common.scripts import utils as script_utils +from glance.common import store_utils +from glance.i18n import _ + +LOG = logging.getLogger(__name__) + + +def run(t_id, context, task_repo, image_repo, image_factory): + LOG.info('Task %(task_id)s beginning image import ' + 'execution.', {'task_id': t_id}) + _execute(t_id, task_repo, image_repo, image_factory) + + +# NOTE(nikhil): This lock prevents more than N number of threads to be spawn +# simultaneously. The number N represents the number of threads in the +# executor pool. The value is set to 10 in the eventlet executor. +@lockutils.synchronized("glance_image_import") +def _execute(t_id, task_repo, image_repo, image_factory): + task = script_utils.get_task(task_repo, t_id) + + if task is None: + # NOTE: This happens if task is not found in the database. In + # such cases, there is no way to update the task status so, + # it's ignored here. + return + + try: + task_input = script_utils.unpack_task_input(task) + + image_id = task_input.get('image_id') + + task.succeed({'image_id': image_id}) + except Exception as e: + # Note: The message string contains Error in it to indicate + # in the task.message that it's a error message for the user. + + # TODO(nikhil): need to bring back save_and_reraise_exception when + # necessary + err_msg = ("Error: " + six.text_type(type(e)) + ': ' + + encodeutils.exception_to_unicode(e)) + log_msg = err_msg + ("Task ID %s" % task.task_id) + LOG.exception(log_msg) + + task.fail(_(err_msg)) # noqa + finally: + task_repo.save(task) + + +def import_image(image_repo, image_factory, task_input, task_id, uri): + original_image = v2_api.create_image(image_repo, + image_factory, + task_input.get('image_properties'), + task_id) + # NOTE: set image status to saving just before setting data + original_image.status = 'saving' + image_repo.save(original_image) + image_id = original_image.image_id + + # NOTE: Retrieving image from the database because the Image object + # returned from create_image method does not have appropriate factories + # wrapped around it. + new_image = image_repo.get(image_id) + set_image_data(new_image, uri, task_id) + + try: + # NOTE: Check if the Image is not deleted after setting the data + # before saving the active image. Here if image status is + # saving, then new_image is saved as it contains updated location, + # size, virtual_size and checksum information and the status of + # new_image is already set to active in set_image_data() call. + image = image_repo.get(image_id) + if image.status == 'saving': + image_repo.save(new_image) + return image_id + else: + msg = _("The Image %(image_id)s object being created by this task " + "%(task_id)s, is no longer in valid status for further " + "processing.") % {"image_id": image_id, + "task_id": task_id} + raise exception.Conflict(msg) + except (exception.Conflict, exception.NotFound, + exception.NotAuthenticated): + with excutils.save_and_reraise_exception(): + if new_image.locations: + for location in new_image.locations: + store_utils.delete_image_location_from_backend( + new_image.context, + image_id, + location) + + +def set_image_data(image, uri, task_id): + data_iter = None + try: + LOG.info("Task %(task_id)s: Got image data uri %(data_uri)s to be " + "imported", {"data_uri": uri, "task_id": task_id}) + data_iter = script_utils.get_image_data_iter(uri) + image.set_data(data_iter) + except Exception as e: + with excutils.save_and_reraise_exception(): + LOG.warn("Task %(task_id)s failed with exception %(error)s" % + {"error": encodeutils.exception_to_unicode(e), + "task_id": task_id}) + LOG.info("Task %(task_id)s: Could not import image file" + " %(image_data)s", {"image_data": uri, + "task_id": task_id}) + finally: + if hasattr(data_iter, 'close'): + data_iter.close() diff --git a/glance/common/scripts/utils.py b/glance/common/scripts/utils.py index 894f82bf6b..f88d21015a 100644 --- a/glance/common/scripts/utils.py +++ b/glance/common/scripts/utils.py @@ -52,14 +52,19 @@ def unpack_task_input(task): :param task: Task domain object """ + task_type = task.type task_input = task.task_input - # NOTE: until we support multiple task types, we just check for - # input fields related to 'import task'. - for key in ["import_from", "import_from_format", "image_properties"]: - if key not in task_input: - msg = _("Input does not contain '%(key)s' field") % {"key": key} + if task_type == 'api_image_import': + if 'import_method' not in task_input: + msg = _("Input does not contain 'import_method'") raise exception.Invalid(msg) + else: + for key in ["import_from", "import_from_format", "image_properties"]: + if key not in task_input: + msg = (_("Input does not contain '%(key)s' field") % + {"key": key}) + raise exception.Invalid(msg) return task_input diff --git a/glance/domain/__init__.py b/glance/domain/__init__.py index 7f97a4fdcd..c6d5ded207 100644 --- a/glance/domain/__init__.py +++ b/glance/domain/__init__.py @@ -343,7 +343,7 @@ class ImageMemberFactory(object): class Task(object): - _supported_task_type = ('import',) + _supported_task_type = ('import', 'api_image_import') _supported_task_status = ('pending', 'processing', 'success', 'failure')