Add import endpoint to initiate image import

Adds endpoint and needed logic to initiate the task that takes
care of the actual image import work after the image has been
created and data provided.

Change-Id: I5327f2d80c87f8afa02d6aa653a1a3bdc18de5bc
This commit is contained in:
Erno Kuvaja 2017-01-05 16:59:47 +00:00
parent bc78de2dd2
commit 09312ab2c7
8 changed files with 190 additions and 9 deletions

View File

@ -25,6 +25,7 @@ from six.moves import http_client as http
import six.moves.urllib.parse as urlparse
import webob.exc
from glance.api import common
from glance.api import policy
from glance.common import exception
from glance.common import location_strategy
@ -87,6 +88,28 @@ class ImagesController(object):
return image
@utils.mutating
def import_image(self, req, image):
task_factory = self.gateway.get_task_factory(req.context)
executor_factory = self.gateway.get_task_executor_factory(req.context)
task_repo = self.gateway.get_task_repo(req.context)
task_input = {}
try:
import_task = task_factory.new_task(task_type='api_image_import',
owner=req.context.owner,
task_input=task_input)
task_repo.add(import_task)
task_executor = executor_factory.new_task_executor(req.context)
pool = common.get_thread_pool("tasks_eventlet_pool")
pool.spawn_n(import_task.run, task_executor)
except exception.Forbidden as e:
LOG.debug("User not permitted to create image import task.")
raise webob.exc.HTTPForbidden(explanation=e.msg)
return image
def index(self, req, marker=None, limit=None, sort_key=None,
sort_dir=None, filters=None, member_status='accepted'):
sort_key = ['created_at'] if not sort_key else sort_key

View File

@ -424,6 +424,14 @@ class API(wsgi.Router):
controller=reject_method_resource,
action='reject',
allowed_methods='GET, PATCH, DELETE')
mapper.connect('/images/{image_id}/import',
controller=images_resource,
action='import_image',
conditions={'method': ['POST']})
mapper.connect('/images/{image_id}/import',
controller=reject_method_resource,
action='reject',
allowed_methods='POST')
image_actions_resource = image_actions.create_resource()
mapper.connect('/images/{image_id}/actions/deactivate',

View File

@ -112,11 +112,8 @@ class TaskExecutor(glance.async.TaskExecutor):
def _get_flow(self, task):
try:
task_input = script_utils.unpack_task_input(task)
uri = script_utils.validate_location_uri(
task_input.get('import_from'))
kwds = {
'uri': uri,
'task_id': task.task_id,
'task_type': task.type,
'context': self.context,
@ -125,6 +122,10 @@ class TaskExecutor(glance.async.TaskExecutor):
'image_factory': self.image_factory
}
if task.type == "import":
uri = script_utils.validate_location_uri(
task_input.get('import_from'))
kwds['uri'] = uri
return driver.DriverManager('glance.flows', task.type,
invoke_on_load=True,
invoke_kwds=kwds).driver

View File

@ -15,6 +15,7 @@
from oslo_log import log as logging
from glance.common.scripts.api_image_import import main as api_image_import
from glance.common.scripts.image_import import main as image_import
from glance.i18n import _LE, _LI
@ -34,6 +35,13 @@ def run_task(task_id, task_type, context,
image_import.run(task_id, context, task_repo,
image_repo, image_factory)
elif task_type == 'api_image_import':
api_image_import.run(task_id,
context,
task_repo,
image_repo,
image_factory)
else:
msg = _LE("This task type %(task_type)s is not supported by the "
"current deployment of Glance. Please refer the "

View File

@ -0,0 +1,136 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'run',
]
from oslo_concurrency import lockutils
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import excutils
import six
from glance.api.v2 import images as v2_api
from glance.common import exception
from glance.common.scripts import utils as script_utils
from glance.common import store_utils
from glance.i18n import _
LOG = logging.getLogger(__name__)
def run(t_id, context, task_repo, image_repo, image_factory):
LOG.info('Task %(task_id)s beginning image import '
'execution.', {'task_id': t_id})
_execute(t_id, task_repo, image_repo, image_factory)
# NOTE(nikhil): This lock prevents more than N number of threads to be spawn
# simultaneously. The number N represents the number of threads in the
# executor pool. The value is set to 10 in the eventlet executor.
@lockutils.synchronized("glance_image_import")
def _execute(t_id, task_repo, image_repo, image_factory):
task = script_utils.get_task(task_repo, t_id)
if task is None:
# NOTE: This happens if task is not found in the database. In
# such cases, there is no way to update the task status so,
# it's ignored here.
return
try:
task_input = script_utils.unpack_task_input(task)
image_id = task_input.get('image_id')
task.succeed({'image_id': image_id})
except Exception as e:
# Note: The message string contains Error in it to indicate
# in the task.message that it's a error message for the user.
# TODO(nikhil): need to bring back save_and_reraise_exception when
# necessary
err_msg = ("Error: " + six.text_type(type(e)) + ': ' +
encodeutils.exception_to_unicode(e))
log_msg = err_msg + ("Task ID %s" % task.task_id)
LOG.exception(log_msg)
task.fail(_(err_msg)) # noqa
finally:
task_repo.save(task)
def import_image(image_repo, image_factory, task_input, task_id, uri):
original_image = v2_api.create_image(image_repo,
image_factory,
task_input.get('image_properties'),
task_id)
# NOTE: set image status to saving just before setting data
original_image.status = 'saving'
image_repo.save(original_image)
image_id = original_image.image_id
# NOTE: Retrieving image from the database because the Image object
# returned from create_image method does not have appropriate factories
# wrapped around it.
new_image = image_repo.get(image_id)
set_image_data(new_image, uri, task_id)
try:
# NOTE: Check if the Image is not deleted after setting the data
# before saving the active image. Here if image status is
# saving, then new_image is saved as it contains updated location,
# size, virtual_size and checksum information and the status of
# new_image is already set to active in set_image_data() call.
image = image_repo.get(image_id)
if image.status == 'saving':
image_repo.save(new_image)
return image_id
else:
msg = _("The Image %(image_id)s object being created by this task "
"%(task_id)s, is no longer in valid status for further "
"processing.") % {"image_id": image_id,
"task_id": task_id}
raise exception.Conflict(msg)
except (exception.Conflict, exception.NotFound,
exception.NotAuthenticated):
with excutils.save_and_reraise_exception():
if new_image.locations:
for location in new_image.locations:
store_utils.delete_image_location_from_backend(
new_image.context,
image_id,
location)
def set_image_data(image, uri, task_id):
data_iter = None
try:
LOG.info("Task %(task_id)s: Got image data uri %(data_uri)s to be "
"imported", {"data_uri": uri, "task_id": task_id})
data_iter = script_utils.get_image_data_iter(uri)
image.set_data(data_iter)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.warn("Task %(task_id)s failed with exception %(error)s" %
{"error": encodeutils.exception_to_unicode(e),
"task_id": task_id})
LOG.info("Task %(task_id)s: Could not import image file"
" %(image_data)s", {"image_data": uri,
"task_id": task_id})
finally:
if hasattr(data_iter, 'close'):
data_iter.close()

View File

@ -52,14 +52,19 @@ def unpack_task_input(task):
:param task: Task domain object
"""
task_type = task.type
task_input = task.task_input
# NOTE: until we support multiple task types, we just check for
# input fields related to 'import task'.
for key in ["import_from", "import_from_format", "image_properties"]:
if key not in task_input:
msg = _("Input does not contain '%(key)s' field") % {"key": key}
if task_type == 'api_image_import':
if 'import_method' not in task_input:
msg = _("Input does not contain 'import_method'")
raise exception.Invalid(msg)
else:
for key in ["import_from", "import_from_format", "image_properties"]:
if key not in task_input:
msg = (_("Input does not contain '%(key)s' field") %
{"key": key})
raise exception.Invalid(msg)
return task_input

View File

@ -343,7 +343,7 @@ class ImageMemberFactory(object):
class Task(object):
_supported_task_type = ('import',)
_supported_task_type = ('import', 'api_image_import')
_supported_task_status = ('pending', 'processing', 'success', 'failure')