Changing image commands to be async functions

Instead of having separate classes for each async command, they now are
functions inside StandbyMode which are run in separate threads. I also
cleaned up the function signatures for StandbyMode.
This commit is contained in:
Josh Gachnang 2014-02-05 15:41:48 -08:00
parent cedfae2e48
commit 313a01e5fa
4 changed files with 91 additions and 62 deletions

@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import abc
import collections
import threading
import uuid
@ -71,11 +70,13 @@ class SyncCommandResult(BaseCommandResult):
class AsyncCommandResult(BaseCommandResult):
"""A command that executes asynchronously in the background. Subclasses
should override `execute` to implement actual command execution.
"""A command that executes asynchronously in the background.
:param execute_method: a callable to be executed asynchronously
"""
def __init__(self, command_name, command_params):
def __init__(self, command_name, command_params, execute_method):
super(AsyncCommandResult, self).__init__(command_name, command_params)
self.execute_method = execute_method
self.command_state_lock = threading.Lock()
thread_name = 'agent-command-{}'.format(self.id)
@ -100,7 +101,7 @@ class AsyncCommandResult(BaseCommandResult):
def run(self):
try:
result = self.execute()
result = self.execute_method(**self.command_params)
with self.command_state_lock:
self.command_result = result
self.command_status = AgentCommandStatus.SUCCEEDED
@ -113,10 +114,6 @@ class AsyncCommandResult(BaseCommandResult):
self.command_error = e
self.command_status = AgentCommandStatus.FAILED
@abc.abstractmethod
def execute(self):
pass
class BaseAgentMode(object):
def __init__(self, name):

@ -143,37 +143,6 @@ def _run_image():
raise errors.SystemRebootError(exit_code)
class CacheImageCommand(base.AsyncCommandResult):
def execute(self):
image_info = self.command_params['image_info']
device = hardware.get_manager().get_os_install_device()
_download_image(image_info)
_write_image(image_info, device)
class PrepareImageCommand(base.AsyncCommandResult):
"""Downloads and writes an image and configdrive to a device."""
def execute(self):
image_info = self.command_params['image_info']
location = _configdrive_location()
metadata = self.command_params['metadata']
files = self.command_params['files']
device = hardware.get_manager().get_os_install_device()
_download_image(image_info)
_write_image(image_info, device)
log.debug('Writing configdrive', location=location)
configdrive.write_configdrive(location, metadata, files)
_copy_configdrive_to_disk(location, device)
class RunImageCommand(base.AsyncCommandResult):
def execute(self):
_run_image()
class StandbyMode(base.BaseAgentMode):
def __init__(self):
super(StandbyMode, self).__init__('STANDBY')
@ -196,16 +165,51 @@ class StandbyMode(base.BaseAgentMode):
'Image \'hashes\' must be a dictionary with at least one '
'element.')
def cache_image(self, command_name, image_info):
self._validate_image_info(image_info)
return CacheImageCommand(command_name, image_info).start()
def prepare_image(self, command_name, **command_params):
self._validate_image_info(command_params['image_info'])
return PrepareImageCommand(command_name, command_params).start()
def run_image(self, command_name, image_info):
def cache_image(self, image_info):
self._validate_image_info(image_info)
return RunImageCommand(command_name, image_info).start()
command_params = {
"image_info": image_info
}
return base.AsyncCommandResult("cache_image",
command_params,
self._thread_cache_image).start()
def prepare_image(self, image_info, metadata, files):
self._validate_image_info(image_info)
command_params = {
"image_info": image_info,
"metadata": metadata,
"files": files
}
return base.AsyncCommandResult("prepare_image",
command_params,
self._thread_prepare_image).start()
def run_image(self, image_info):
self._validate_image_info(image_info)
command_params = {
"image_info": image_info
}
return base.AsyncCommandResult("run_image",
command_params,
_run_image).start()
def _thread_cache_image(self, image_info):
device = hardware.get_manager().get_os_install_device()
_download_image(image_info)
_write_image(image_info, device)
def _thread_prepare_image(self, image_info, metadata, files):
location = _configdrive_location()
device = hardware.get_manager().get_os_install_device()
_download_image(image_info)
_write_image(image_info, device)
log.debug('Writing configdrive', location=location)
configdrive.write_configdrive(location, metadata, files)
_copy_configdrive_to_disk(location, device)

@ -31,12 +31,11 @@ from teeth_agent import errors
EXPECTED_ERROR = RuntimeError('command execution failed')
class FooTeethAgentCommandResult(base.AsyncCommandResult):
def execute(self):
if self.command_params['fail']:
raise EXPECTED_ERROR
else:
return 'command execution succeeded'
def foo_execute(**kwargs):
if kwargs['fail']:
raise EXPECTED_ERROR
else:
return 'command execution succeeded'
class FakeMode(base.BaseAgentMode):
@ -171,7 +170,8 @@ class TestBaseAgent(unittest.TestCase):
self.agent.heartbeater.start.assert_called_once_with()
def test_async_command_success(self):
result = FooTeethAgentCommandResult('foo_command', {'fail': False})
result = base.AsyncCommandResult('foo_command', {'fail': False},
foo_execute)
expected_result = {
'id': result.id,
'command_name': 'foo_command',
@ -193,7 +193,8 @@ class TestBaseAgent(unittest.TestCase):
self.assertEqualEncoded(result, expected_result)
def test_async_command_failure(self):
result = FooTeethAgentCommandResult('foo_command', {'fail': True})
result = base.AsyncCommandResult('foo_command', {'fail': True},
foo_execute)
expected_result = {
'id': result.id,
'command_name': 'foo_command',

@ -20,6 +20,8 @@ import unittest
from teeth_agent import errors
from teeth_agent import standby
import time
class TestStandbyMode(unittest.TestCase):
def setUp(self):
@ -84,14 +86,12 @@ class TestStandbyMode(unittest.TestCase):
invalid_info)
def test_cache_image_success(self):
result = self.agent_mode.cache_image('cache_image',
self._build_fake_image_info())
result = self.agent_mode.cache_image(self._build_fake_image_info())
result.join()
def test_cache_image_invalid_image_list(self):
self.assertRaises(errors.InvalidCommandParamsError,
self.agent_mode.cache_image,
'cache_image',
{'foo': 'bar'})
def test_image_location(self):
@ -233,3 +233,30 @@ class TestStandbyMode(unittest.TestCase):
standby._run_image)
call_mock.assert_called_once_with(command)
def test_cache_image_async(self):
image_info = self._build_fake_image_info()
self.agent_mode._thread_cache_image = mock.Mock(return_value="test")
async_result = self.agent_mode.cache_image(image_info)
while not async_result.is_done():
time.sleep(0.1)
self.assertEqual("SUCCEEDED", async_result.command_status)
self.assertEqual("test", async_result.command_result)
def test_prepare_image_async(self):
image_info = self._build_fake_image_info()
self.agent_mode._thread_prepare_image = mock.Mock(return_value="test")
async_result = self.agent_mode.prepare_image(image_info, {}, [])
while not async_result.is_done():
time.sleep(0.1)
self.assertEqual("SUCCEEDED", async_result.command_status)
self.assertEqual("test", async_result.command_result)
def test_run_image_async(self):
image_info = self._build_fake_image_info()
standby._run_image = mock.Mock(return_value="test")
async_result = self.agent_mode.run_image(image_info)
while not async_result.is_done():
time.sleep(0.1)
self.assertEqual("SUCCEEDED", async_result.command_status)
self.assertEqual("test", async_result.command_result)