diff --git a/glance/async/flows/base_import.py b/glance/async/flows/base_import.py index 610ef873ad..3607799dcc 100644 --- a/glance/async/flows/base_import.py +++ b/glance/async/flows/base_import.py @@ -20,7 +20,7 @@ import glance_store as store_api from glance_store import backend from oslo_config import cfg import six -from stevedore import extension +from stevedore import named from taskflow.patterns import linear_flow as lf from taskflow import retry from taskflow import task @@ -350,9 +350,19 @@ class _CompleteTask(task.Task): def _get_import_flows(**kwargs): - extensions = extension.ExtensionManager('glance.flows.import', - invoke_on_load=True, - invoke_kwds=kwargs) + # NOTE(flaper87): Until we have a better infrastructure to enable + # and disable tasks plugins, hard-code the tasks we know exist, + # instead of loading everything from the namespace. This guarantees + # both, the load order of these plugins and the fact that no random + # plugins will be added/loaded until we feel comfortable with this. + # Future patches will keep using NamedExtensionManager but they'll + # rely on a config option to control this process. + extensions = named.NamedExtensionManager('glance.flows.import', + names=['convert', + 'introspect'], + name_order=True, + invoke_on_load=True, + invoke_kwds=kwargs) for ext in extensions.extensions: yield ext.obj diff --git a/glance/async/flows/convert.py b/glance/async/flows/convert.py new file mode 100644 index 0000000000..1281c95a2e --- /dev/null +++ b/glance/async/flows/convert.py @@ -0,0 +1,94 @@ +# Copyright 2015 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_concurrency import processutils as putils +from oslo_config import cfg +from taskflow.patterns import linear_flow as lf +from taskflow import task + +from glance import i18n + +_ = i18n._ +_LI = i18n._LI +_LE = i18n._LE +_LW = i18n._LW +LOG = logging.getLogger(__name__) + +convert_task_opts = [ + cfg.StrOpt('conversion_format', + default=None, + choices=('qcow2', 'raw'), + help=_("The format to which images will be automatically " + "converted. " "Can be 'qcow2' or 'raw'.")), +] + +CONF = cfg.CONF + +# NOTE(flaper87): Registering under the taskflow_executor section +# for now. It seems a waste to have a whole section dedidcated to a +# single task with a single option. +CONF.register_opts(convert_task_opts, group='taskflow_executor') + + +class _Convert(task.Task): + + conversion_missing_warned = False + + def __init__(self, task_id, task_type, image_repo): + self.task_id = task_id + self.task_type = task_type + self.image_repo = image_repo + super(_Convert, self).__init__( + name='%s-Convert-%s' % (task_type, task_id)) + + def execute(self, image_id, file_path): + + # NOTE(flaper87): A format must be explicitly + # specified. There's no "sane" default for this + # because the dest format may work differently depending + # on the environment OpenStack is running in. + conversion_format = CONF.taskflow_executor.conversion_format + if conversion_format is None: + if not _Convert.conversion_missing_warned: + msg = (_LW('The conversion format is None, please add a value ' + 'for it in the config file for this task to ' + 'work: %s') % + self.task_id) + LOG.warn(msg) + _Convert.conversion_missing_warned = True + return + + # TODO(flaper87): Check whether the image is in the desired + # format already. Probably using `qemu-img` just like the + # `Introspection` task. + dest_path = "%s.converted" + stdout, stderr = putils.trycmd('qemu-img', 'convert', '-O', + conversion_format, file_path, dest_path, + log_errors=putils.LOG_ALL_ERRORS) + + if stderr: + raise RuntimeError(stderr) + + +def get_flow(**kwargs): + task_id = kwargs.get('task_id') + task_type = kwargs.get('task_type') + image_repo = kwargs.get('image_repo') + + return lf.Flow(task_type).add( + _Convert(task_id, task_type, image_repo), + ) diff --git a/glance/tests/unit/async/flows/test_convert.py b/glance/tests/unit/async/flows/test_convert.py new file mode 100644 index 0000000000..03ebe23a0d --- /dev/null +++ b/glance/tests/unit/async/flows/test_convert.py @@ -0,0 +1,152 @@ +# Copyright 2015 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import mock +import os +import StringIO + +import glance_store +from oslo_concurrency import processutils +from oslo_config import cfg + +from glance.async.flows import convert +from glance.async import taskflow_executor +from glance.common.scripts import utils as script_utils +from glance.common import utils +from glance import domain +from glance import gateway +import glance.tests.utils as test_utils + +CONF = cfg.CONF + +UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d' +TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df' + + +class TestImportTask(test_utils.BaseTestCase): + + def setUp(self): + super(TestImportTask, self).setUp() + self.work_dir = os.path.join(self.test_dir, 'work_dir') + utils.safe_mkdirs(self.work_dir) + self.config(work_dir=self.work_dir, group='task') + + self.context = mock.MagicMock() + self.img_repo = mock.MagicMock() + self.task_repo = mock.MagicMock() + + self.gateway = gateway.Gateway() + self.task_factory = domain.TaskFactory() + self.img_factory = self.gateway.get_image_factory(self.context) + self.image = self.img_factory.new_image(image_id=UUID1, + disk_format='raw', + container_format='bare') + + task_input = { + "import_from": "http://cloud.foo/image.raw", + "import_from_format": "raw", + "image_properties": {'disk_format': 'qcow2', + 'container_format': 'bare'} + } + task_ttl = CONF.task.task_time_to_live + + self.task_type = 'import' + self.task = self.task_factory.new_task(self.task_type, TENANT1, + task_time_to_live=task_ttl, + task_input=task_input) + + glance_store.register_opts(CONF) + self.config(default_store='file', + stores=['file', 'http'], + filesystem_store_datadir=self.test_dir, + group="glance_store") + + self.config(conversion_format='qcow2', + group='taskflow_executor') + glance_store.create_stores(CONF) + + def test_convert_success(self): + image_convert = convert._Convert(self.task.task_id, + self.task_type, + self.img_repo) + + self.task_repo.get.return_value = self.task + image_id = mock.sentinel.image_id + image = mock.MagicMock(image_id=image_id, virtual_size=None) + self.img_repo.get.return_value = image + + with mock.patch.object(processutils, 'execute') as exc_mock: + exc_mock.return_value = ("", None) + image_convert.execute(image, '/test/path.raw') + + def test_import_flow_with_convert_and_introspect(self): + self.config(engine_mode='serial', + group='taskflow_executor') + + image = self.img_factory.new_image(image_id=UUID1, + disk_format='raw', + container_format='bare') + + img_factory = mock.MagicMock() + + executor = taskflow_executor.TaskExecutor( + self.context, + self.task_repo, + self.img_repo, + img_factory) + + self.task_repo.get.return_value = self.task + + def create_image(*args, **kwargs): + kwargs['image_id'] = UUID1 + return self.img_factory.new_image(*args, **kwargs) + + self.img_repo.get.return_value = image + img_factory.new_image.side_effect = create_image + + with mock.patch.object(script_utils, 'get_image_data_iter') as dmock: + dmock.return_value = StringIO.StringIO("TEST_IMAGE") + + with mock.patch.object(processutils, 'execute') as exc_mock: + result = json.dumps({ + "virtual-size": 10737418240, + "filename": "/tmp/image.qcow2", + "cluster-size": 65536, + "format": "qcow2", + "actual-size": 373030912, + "format-specific": { + "type": "qcow2", + "data": { + "compat": "0.10" + } + }, + "dirty-flag": False + }) + + # NOTE(flaper87): First result for the conversion step and + # the second one for the introspection one. The later *must* + # come after the former. If not, the current builtin flow + # process will be unsound. + # Follow-up work will fix this by having a better way to handle + # task's dependencies and activation. + exc_mock.side_effect = [("", None), (result, None)] + executor.begin_processing(self.task.task_id) + image_path = os.path.join(self.test_dir, image.image_id) + tmp_image_path = "%s.tasks_import" % image_path + self.assertFalse(os.path.exists(tmp_image_path)) + self.assertTrue(os.path.exists(image_path)) + self.assertEqual('qcow2', image.disk_format) + self.assertEqual(10737418240, image.virtual_size) diff --git a/setup.cfg b/setup.cfg index f84c97e868..af165d7101 100644 --- a/setup.cfg +++ b/setup.cfg @@ -51,6 +51,7 @@ glance.flows = import = glance.async.flows.base_import:get_flow glance.flows.import = + convert = glance.async.flows.convert:get_flow introspect = glance.async.flows.introspect:get_flow [build_sphinx]