Merge "Add import endpoint to initiate image import"
This commit is contained in:
commit
61046c8f24
|
@ -25,6 +25,7 @@ from six.moves import http_client as http
|
|||
import six.moves.urllib.parse as urlparse
|
||||
import webob.exc
|
||||
|
||||
from glance.api import common
|
||||
from glance.api import policy
|
||||
from glance.common import exception
|
||||
from glance.common import location_strategy
|
||||
|
@ -87,6 +88,28 @@ class ImagesController(object):
|
|||
|
||||
return image
|
||||
|
||||
@utils.mutating
|
||||
def import_image(self, req, image):
|
||||
task_factory = self.gateway.get_task_factory(req.context)
|
||||
executor_factory = self.gateway.get_task_executor_factory(req.context)
|
||||
task_repo = self.gateway.get_task_repo(req.context)
|
||||
|
||||
task_input = {}
|
||||
|
||||
try:
|
||||
import_task = task_factory.new_task(task_type='api_image_import',
|
||||
owner=req.context.owner,
|
||||
task_input=task_input)
|
||||
task_repo.add(import_task)
|
||||
task_executor = executor_factory.new_task_executor(req.context)
|
||||
pool = common.get_thread_pool("tasks_eventlet_pool")
|
||||
pool.spawn_n(import_task.run, task_executor)
|
||||
except exception.Forbidden as e:
|
||||
LOG.debug("User not permitted to create image import task.")
|
||||
raise webob.exc.HTTPForbidden(explanation=e.msg)
|
||||
|
||||
return image
|
||||
|
||||
def index(self, req, marker=None, limit=None, sort_key=None,
|
||||
sort_dir=None, filters=None, member_status='accepted'):
|
||||
sort_key = ['created_at'] if not sort_key else sort_key
|
||||
|
|
|
@ -424,6 +424,14 @@ class API(wsgi.Router):
|
|||
controller=reject_method_resource,
|
||||
action='reject',
|
||||
allowed_methods='GET, PATCH, DELETE')
|
||||
mapper.connect('/images/{image_id}/import',
|
||||
controller=images_resource,
|
||||
action='import_image',
|
||||
conditions={'method': ['POST']})
|
||||
mapper.connect('/images/{image_id}/import',
|
||||
controller=reject_method_resource,
|
||||
action='reject',
|
||||
allowed_methods='POST')
|
||||
|
||||
image_actions_resource = image_actions.create_resource()
|
||||
mapper.connect('/images/{image_id}/actions/deactivate',
|
||||
|
|
|
@ -112,11 +112,8 @@ class TaskExecutor(glance.async.TaskExecutor):
|
|||
def _get_flow(self, task):
|
||||
try:
|
||||
task_input = script_utils.unpack_task_input(task)
|
||||
uri = script_utils.validate_location_uri(
|
||||
task_input.get('import_from'))
|
||||
|
||||
kwds = {
|
||||
'uri': uri,
|
||||
'task_id': task.task_id,
|
||||
'task_type': task.type,
|
||||
'context': self.context,
|
||||
|
@ -125,6 +122,10 @@ class TaskExecutor(glance.async.TaskExecutor):
|
|||
'image_factory': self.image_factory
|
||||
}
|
||||
|
||||
if task.type == "import":
|
||||
uri = script_utils.validate_location_uri(
|
||||
task_input.get('import_from'))
|
||||
kwds['uri'] = uri
|
||||
return driver.DriverManager('glance.flows', task.type,
|
||||
invoke_on_load=True,
|
||||
invoke_kwds=kwds).driver
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from glance.common.scripts.api_image_import import main as api_image_import
|
||||
from glance.common.scripts.image_import import main as image_import
|
||||
from glance.i18n import _LE, _LI
|
||||
|
||||
|
@ -34,6 +35,13 @@ def run_task(task_id, task_type, context,
|
|||
image_import.run(task_id, context, task_repo,
|
||||
image_repo, image_factory)
|
||||
|
||||
elif task_type == 'api_image_import':
|
||||
api_image_import.run(task_id,
|
||||
context,
|
||||
task_repo,
|
||||
image_repo,
|
||||
image_factory)
|
||||
|
||||
else:
|
||||
msg = _LE("This task type %(task_type)s is not supported by the "
|
||||
"current deployment of Glance. Please refer the "
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
# Copyright 2014 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
__all__ = [
|
||||
'run',
|
||||
]
|
||||
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import encodeutils
|
||||
from oslo_utils import excutils
|
||||
import six
|
||||
|
||||
from glance.api.v2 import images as v2_api
|
||||
from glance.common import exception
|
||||
from glance.common.scripts import utils as script_utils
|
||||
from glance.common import store_utils
|
||||
from glance.i18n import _
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run(t_id, context, task_repo, image_repo, image_factory):
|
||||
LOG.info('Task %(task_id)s beginning image import '
|
||||
'execution.', {'task_id': t_id})
|
||||
_execute(t_id, task_repo, image_repo, image_factory)
|
||||
|
||||
|
||||
# NOTE(nikhil): This lock prevents more than N number of threads to be spawn
|
||||
# simultaneously. The number N represents the number of threads in the
|
||||
# executor pool. The value is set to 10 in the eventlet executor.
|
||||
@lockutils.synchronized("glance_image_import")
|
||||
def _execute(t_id, task_repo, image_repo, image_factory):
|
||||
task = script_utils.get_task(task_repo, t_id)
|
||||
|
||||
if task is None:
|
||||
# NOTE: This happens if task is not found in the database. In
|
||||
# such cases, there is no way to update the task status so,
|
||||
# it's ignored here.
|
||||
return
|
||||
|
||||
try:
|
||||
task_input = script_utils.unpack_task_input(task)
|
||||
|
||||
image_id = task_input.get('image_id')
|
||||
|
||||
task.succeed({'image_id': image_id})
|
||||
except Exception as e:
|
||||
# Note: The message string contains Error in it to indicate
|
||||
# in the task.message that it's a error message for the user.
|
||||
|
||||
# TODO(nikhil): need to bring back save_and_reraise_exception when
|
||||
# necessary
|
||||
err_msg = ("Error: " + six.text_type(type(e)) + ': ' +
|
||||
encodeutils.exception_to_unicode(e))
|
||||
log_msg = err_msg + ("Task ID %s" % task.task_id)
|
||||
LOG.exception(log_msg)
|
||||
|
||||
task.fail(_(err_msg)) # noqa
|
||||
finally:
|
||||
task_repo.save(task)
|
||||
|
||||
|
||||
def import_image(image_repo, image_factory, task_input, task_id, uri):
|
||||
original_image = v2_api.create_image(image_repo,
|
||||
image_factory,
|
||||
task_input.get('image_properties'),
|
||||
task_id)
|
||||
# NOTE: set image status to saving just before setting data
|
||||
original_image.status = 'saving'
|
||||
image_repo.save(original_image)
|
||||
image_id = original_image.image_id
|
||||
|
||||
# NOTE: Retrieving image from the database because the Image object
|
||||
# returned from create_image method does not have appropriate factories
|
||||
# wrapped around it.
|
||||
new_image = image_repo.get(image_id)
|
||||
set_image_data(new_image, uri, task_id)
|
||||
|
||||
try:
|
||||
# NOTE: Check if the Image is not deleted after setting the data
|
||||
# before saving the active image. Here if image status is
|
||||
# saving, then new_image is saved as it contains updated location,
|
||||
# size, virtual_size and checksum information and the status of
|
||||
# new_image is already set to active in set_image_data() call.
|
||||
image = image_repo.get(image_id)
|
||||
if image.status == 'saving':
|
||||
image_repo.save(new_image)
|
||||
return image_id
|
||||
else:
|
||||
msg = _("The Image %(image_id)s object being created by this task "
|
||||
"%(task_id)s, is no longer in valid status for further "
|
||||
"processing.") % {"image_id": image_id,
|
||||
"task_id": task_id}
|
||||
raise exception.Conflict(msg)
|
||||
except (exception.Conflict, exception.NotFound,
|
||||
exception.NotAuthenticated):
|
||||
with excutils.save_and_reraise_exception():
|
||||
if new_image.locations:
|
||||
for location in new_image.locations:
|
||||
store_utils.delete_image_location_from_backend(
|
||||
new_image.context,
|
||||
image_id,
|
||||
location)
|
||||
|
||||
|
||||
def set_image_data(image, uri, task_id):
|
||||
data_iter = None
|
||||
try:
|
||||
LOG.info("Task %(task_id)s: Got image data uri %(data_uri)s to be "
|
||||
"imported", {"data_uri": uri, "task_id": task_id})
|
||||
data_iter = script_utils.get_image_data_iter(uri)
|
||||
image.set_data(data_iter)
|
||||
except Exception as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.warn("Task %(task_id)s failed with exception %(error)s" %
|
||||
{"error": encodeutils.exception_to_unicode(e),
|
||||
"task_id": task_id})
|
||||
LOG.info("Task %(task_id)s: Could not import image file"
|
||||
" %(image_data)s", {"image_data": uri,
|
||||
"task_id": task_id})
|
||||
finally:
|
||||
if hasattr(data_iter, 'close'):
|
||||
data_iter.close()
|
|
@ -52,14 +52,19 @@ def unpack_task_input(task):
|
|||
|
||||
:param task: Task domain object
|
||||
"""
|
||||
task_type = task.type
|
||||
task_input = task.task_input
|
||||
|
||||
# NOTE: until we support multiple task types, we just check for
|
||||
# input fields related to 'import task'.
|
||||
for key in ["import_from", "import_from_format", "image_properties"]:
|
||||
if key not in task_input:
|
||||
msg = _("Input does not contain '%(key)s' field") % {"key": key}
|
||||
if task_type == 'api_image_import':
|
||||
if 'import_method' not in task_input:
|
||||
msg = _("Input does not contain 'import_method'")
|
||||
raise exception.Invalid(msg)
|
||||
else:
|
||||
for key in ["import_from", "import_from_format", "image_properties"]:
|
||||
if key not in task_input:
|
||||
msg = (_("Input does not contain '%(key)s' field") %
|
||||
{"key": key})
|
||||
raise exception.Invalid(msg)
|
||||
|
||||
return task_input
|
||||
|
||||
|
|
|
@ -343,7 +343,7 @@ class ImageMemberFactory(object):
|
|||
|
||||
|
||||
class Task(object):
|
||||
_supported_task_type = ('import',)
|
||||
_supported_task_type = ('import', 'api_image_import')
|
||||
|
||||
_supported_task_status = ('pending', 'processing', 'success', 'failure')
|
||||
|
||||
|
|
Loading…
Reference in New Issue