@ -27,6 +27,7 @@ from taskflow.types import failure
import glance.async.flows.base_import as import_flow
from glance.async import taskflow_executor
from glance.async import utils as async_utils
from glance.common.scripts.image_import import main as image_import
from glance.common.scripts import utils as script_utils
from glance.common import utils
@ -86,6 +87,14 @@ class TestImportTask(test_utils.BaseTestCase):
task_time_to_live = task_ttl ,
task_input = task_input )
def _assert_qemu_process_limits ( self , exec_mock ) :
# NOTE(hemanthm): Assert that process limits are being applied
# on "qemu-img info" calls. See bug #1449062 for more details.
kw_args = exec_mock . call_args [ 1 ]
self . assertIn ( ' prlimit ' , kw_args )
self . assertEqual ( async_utils . QEMU_IMG_PROC_LIMITS ,
kw_args . get ( ' prlimit ' ) )
def test_import_flow ( self ) :
self . config ( engine_mode = ' serial ' ,
group = ' taskflow_executor ' )
@ -127,6 +136,8 @@ class TestImportTask(test_utils.BaseTestCase):
self . image . image_id ) ,
self . image . locations [ 0 ] [ ' url ' ] )
self . _assert_qemu_process_limits ( tmock )
def test_import_flow_missing_work_dir ( self ) :
self . config ( engine_mode = ' serial ' , group = ' taskflow_executor ' )
self . config ( work_dir = None , group = ' task ' )
@ -235,6 +246,7 @@ class TestImportTask(test_utils.BaseTestCase):
self . assertTrue ( rmock . called )
self . assertIsInstance ( rmock . call_args [ 1 ] [ ' result ' ] ,
failure . Failure )
self . _assert_qemu_process_limits ( tmock )
image_path = os . path . join ( self . test_dir ,
self . image . image_id )
@ -283,6 +295,8 @@ class TestImportTask(test_utils.BaseTestCase):
executor . begin_processing ,
self . task . task_id )
self . _assert_qemu_process_limits ( tmock )
image_path = os . path . join ( self . test_dir ,
self . image . image_id )
tmp_image_path = os . path . join ( self . work_dir ,
@ -393,6 +407,7 @@ class TestImportTask(test_utils.BaseTestCase):
image_path = os . path . join ( self . work_dir , image_id )
tmp_image_path = os . path . join ( self . work_dir , image_path )
self . assertTrue ( os . path . exists ( tmp_image_path ) )
self . _assert_qemu_process_limits ( tmock )
def test_delete_from_fs ( self ) :
delete_fs = import_flow . _DeleteFromFS ( self . task . task_id ,