 5c6a1d4081
			
		
	
	5c6a1d4081
	
	
	
		
			
			Wrap the loading of the engine into a not found exception to match the usage in other entrypoints. This makes the usage of the various entrypoint loading functions more consistent. Change-Id: Id86f7b716c9b3dd76aba411529d2e647ad93a120
		
			
				
	
	
		
			133 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			133 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # -*- coding: utf-8 -*-
 | |
| 
 | |
| #    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
 | |
| #
 | |
| #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | |
| #    not use this file except in compliance with the License. You may obtain
 | |
| #    a copy of the License at
 | |
| #
 | |
| #         http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| #    Unless required by applicable law or agreed to in writing, software
 | |
| #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | |
| #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | |
| #    License for the specific language governing permissions and limitations
 | |
| #    under the License.
 | |
| 
 | |
| import mock
 | |
| 
 | |
| from taskflow import exceptions as exc
 | |
| from taskflow.patterns import linear_flow
 | |
| from taskflow import test
 | |
| from taskflow.tests import utils as test_utils
 | |
| from taskflow.utils import persistence_utils as p_utils
 | |
| 
 | |
| import taskflow.engines
 | |
| 
 | |
| 
 | |
| class EngineLoadingTestCase(test.TestCase):
 | |
|     def test_default_load(self):
 | |
|         f = linear_flow.Flow('test')
 | |
|         f.add(test_utils.TaskOneReturn("run-1"))
 | |
|         e = taskflow.engines.load(f)
 | |
|         self.assertIsNotNone(e)
 | |
| 
 | |
|     def test_unknown_load(self):
 | |
|         f = linear_flow.Flow('test')
 | |
|         f.add(test_utils.TaskOneReturn("run-1"))
 | |
|         self.assertRaises(exc.NotFound, taskflow.engines.load, f,
 | |
|                           engine_conf='not_really_any_engine')
 | |
| 
 | |
| 
 | |
| class FlowFromDetailTestCase(test.TestCase):
 | |
|     def test_no_meta(self):
 | |
|         _lb, flow_detail = p_utils.temporary_flow_detail()
 | |
|         self.assertEqual({}, flow_detail.meta)
 | |
|         self.assertRaisesRegexp(ValueError,
 | |
|                                 '^Cannot .* no factory information saved.$',
 | |
|                                 taskflow.engines.flow_from_detail,
 | |
|                                 flow_detail)
 | |
| 
 | |
|     def test_no_factory_in_meta(self):
 | |
|         _lb, flow_detail = p_utils.temporary_flow_detail()
 | |
|         self.assertRaisesRegexp(ValueError,
 | |
|                                 '^Cannot .* no factory information saved.$',
 | |
|                                 taskflow.engines.flow_from_detail,
 | |
|                                 flow_detail)
 | |
| 
 | |
|     def test_no_importable_function(self):
 | |
|         _lb, flow_detail = p_utils.temporary_flow_detail()
 | |
|         flow_detail.meta = dict(factory=dict(
 | |
|             name='you can not import me, i contain spaces'
 | |
|         ))
 | |
|         self.assertRaisesRegexp(ImportError,
 | |
|                                 '^Could not import factory',
 | |
|                                 taskflow.engines.flow_from_detail,
 | |
|                                 flow_detail)
 | |
| 
 | |
|     def test_no_arg_factory(self):
 | |
|         name = 'some.test.factory'
 | |
|         _lb, flow_detail = p_utils.temporary_flow_detail()
 | |
|         flow_detail.meta = dict(factory=dict(name=name))
 | |
| 
 | |
|         with mock.patch('taskflow.openstack.common.importutils.import_class',
 | |
|                         return_value=lambda: 'RESULT') as mock_import:
 | |
|             result = taskflow.engines.flow_from_detail(flow_detail)
 | |
|             mock_import.assert_called_onec_with(name)
 | |
|         self.assertEqual(result, 'RESULT')
 | |
| 
 | |
|     def test_factory_with_arg(self):
 | |
|         name = 'some.test.factory'
 | |
|         _lb, flow_detail = p_utils.temporary_flow_detail()
 | |
|         flow_detail.meta = dict(factory=dict(name=name, args=['foo']))
 | |
| 
 | |
|         with mock.patch('taskflow.openstack.common.importutils.import_class',
 | |
|                         return_value=lambda x: 'RESULT %s' % x) as mock_import:
 | |
|             result = taskflow.engines.flow_from_detail(flow_detail)
 | |
|             mock_import.assert_called_onec_with(name)
 | |
|         self.assertEqual(result, 'RESULT foo')
 | |
| 
 | |
| 
 | |
| def my_flow_factory(task_name):
 | |
|     return test_utils.DummyTask(name=task_name)
 | |
| 
 | |
| 
 | |
| class LoadFromFactoryTestCase(test.TestCase):
 | |
| 
 | |
|     def test_non_reimportable(self):
 | |
| 
 | |
|         def factory():
 | |
|             pass
 | |
| 
 | |
|         self.assertRaisesRegexp(ValueError,
 | |
|                                 'Flow factory .* is not reimportable',
 | |
|                                 taskflow.engines.load_from_factory,
 | |
|                                 factory)
 | |
| 
 | |
|     def test_it_works(self):
 | |
|         engine = taskflow.engines.load_from_factory(
 | |
|             my_flow_factory, factory_kwargs={'task_name': 'test1'})
 | |
|         self.assertIsInstance(engine._flow, test_utils.DummyTask)
 | |
| 
 | |
|         fd = engine.storage._flowdetail
 | |
|         self.assertEqual(fd.name, 'test1')
 | |
|         self.assertEqual(fd.meta.get('factory'), {
 | |
|             'name': '%s.my_flow_factory' % __name__,
 | |
|             'args': [],
 | |
|             'kwargs': {'task_name': 'test1'},
 | |
|         })
 | |
| 
 | |
|     def test_it_works_by_name(self):
 | |
|         factory_name = '%s.my_flow_factory' % __name__
 | |
|         engine = taskflow.engines.load_from_factory(
 | |
|             factory_name, factory_kwargs={'task_name': 'test1'})
 | |
|         self.assertIsInstance(engine._flow, test_utils.DummyTask)
 | |
| 
 | |
|         fd = engine.storage._flowdetail
 | |
|         self.assertEqual(fd.name, 'test1')
 | |
|         self.assertEqual(fd.meta.get('factory'), {
 | |
|             'name': factory_name,
 | |
|             'args': [],
 | |
|             'kwargs': {'task_name': 'test1'},
 | |
|         })
 |