Adds TaskStub class

Partial Task patch, Id I4fbadc9a97e3147128c7c733384c7bb50918806f
removed the result and message vars of the Tasks class. However,
the succeed and fail methods were using those to update them
respectively. This patch introduces a new TaskStub class which
enables the PartialTask functionality without looking result
and message attrs of the Task class.

Fixes bug 1284975

Change-Id: I0b406e4662cfd3cb496b71be77ad10a90c178baa
This commit is contained in:
Nikhil Komawar 2014-02-25 23:28:04 -06:00 committed by Nikhil Komawar
parent dc2672ab4c
commit 0118103eec
13 changed files with 227 additions and 64 deletions

View File

@ -429,8 +429,8 @@ class TaskRepoProxy(glance.domain.proxy.TaskRepo):
self.context = context
super(TaskRepoProxy, self).__init__(task_repo)
def get_task_and_details(self, task_id):
task, task_details = self.task_repo.get_task_and_details(task_id)
def get_task_stub_and_details(self, task_id):
task, task_details = self.task_repo.get_task_stub_and_details(task_id)
return proxy_task(self.context, task), proxy_task_details(self.context,
task,
task_details)

View File

@ -380,9 +380,9 @@ class TaskRepoProxy(glance.domain.proxy.TaskRepo):
task_details_proxy_class=TaskDetailsProxy,
task_details_proxy_kwargs=proxy_kwargs)
def get_task_and_details(self, task_id):
def get_task_stub_and_details(self, task_id):
self.policy.enforce(self.context, 'get_task', {})
return super(TaskRepoProxy, self).get_task_and_details(task_id)
return super(TaskRepoProxy, self).get_task_stub_and_details(task_id)
def list_tasks(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_tasks', {})

View File

@ -56,9 +56,9 @@ class TasksController(object):
task_repo = self.gateway.get_task_repo(req.context)
live_time = CONF.task.task_time_to_live
try:
new_task = task_factory.new_task(task_type=task['type'],
owner=req.context.owner,
task_time_to_live=live_time)
new_task = task_factory.new_task_stub(task_type=task['type'],
owner=req.context.owner,
task_time_to_live=live_time)
new_task_details = task_factory.new_task_details(new_task.task_id,
task['input'])
task_repo.add(new_task, new_task_details)
@ -104,7 +104,7 @@ class TasksController(object):
def get(self, req, task_id):
try:
task_repo = self.gateway.get_task_repo(req.context)
task, task_details = task_repo.get_task_and_details(task_id)
task, task_details = task_repo.get_task_stub_and_details(task_id)
except exception.NotFound as e:
msg = (_("Failed to find task %(task_id)s. Reason: %(reason)s") %
{'task_id': task_id, 'reason': unicode(e)})

View File

@ -283,8 +283,12 @@ class ImageMemberRepo(object):
class TaskRepo(object):
def _format_task_from_db(self, db_task):
return glance.domain.Task(
def __init__(self, context, db_api):
self.context = context
self.db_api = db_api
def _format_task_stub_from_db(self, db_task):
return glance.domain.TaskStub(
task_id=db_task['id'],
task_type=db_task['type'],
status=db_task['status'],
@ -302,7 +306,7 @@ class TaskRepo(object):
message=db_task['message'],
)
def _format_task_to_db(self, task, task_details=None):
def _format_task_stub_and_details_to_db(self, task, task_details=None):
task = {'id': task.task_id,
'type': task.type,
'status': task.status,
@ -323,17 +327,13 @@ class TaskRepo(object):
return task
def __init__(self, context, db_api):
self.context = context
self.db_api = db_api
def get_task_and_details(self, task_id):
def get_task_stub_and_details(self, task_id):
try:
db_api_task = self.db_api.task_get(self.context, task_id)
except (exception.NotFound, exception.Forbidden):
msg = _('Could not find task %s') % task_id
raise exception.NotFound(msg)
return (self._format_task_from_db(db_api_task),
return (self._format_task_stub_from_db(db_api_task),
self._format_task_details_from_db(db_api_task))
def list_tasks(self,
@ -348,10 +348,11 @@ class TaskRepo(object):
limit=limit,
sort_key=sort_key,
sort_dir=sort_dir)
return [self._format_task_from_db(task) for task in db_api_tasks]
return [self._format_task_stub_from_db(task) for task in db_api_tasks]
def save(self, task, task_details=None):
task_values = self._format_task_to_db(task, task_details)
task_values = self._format_task_stub_and_details_to_db(task,
task_details)
try:
updated_values = self.db_api.task_update(self.context,
task.task_id,
@ -362,13 +363,14 @@ class TaskRepo(object):
task.updated_at = updated_values['updated_at']
def add(self, task, task_details=None):
task_values = self._format_task_to_db(task, task_details)
task_values = self._format_task_stub_and_details_to_db(task,
task_details)
updated_values = self.db_api.task_create(self.context, task_values)
task.created_at = updated_values['created_at']
task.updated_at = updated_values['updated_at']
def remove(self, task):
task_values = self._format_task_to_db(task)
task_values = self._format_task_stub_and_details_to_db(task)
try:
self.db_api.task_update(self.context, task.task_id, task_values)
updated_values = self.db_api.task_delete(self.context,

View File

@ -311,7 +311,8 @@ class Task(object):
_supported_task_status = ('pending', 'processing', 'success', 'failure')
def __init__(self, task_id, task_type, status, owner,
expires_at, created_at, updated_at, task_time_to_live=48):
expires_at, created_at, updated_at,
task_input, result, message, task_time_to_live=48):
if task_type not in self._supported_task_type:
raise exception.InvalidTaskType(task_type)
@ -329,14 +330,14 @@ class Task(object):
self._time_to_live = datetime.timedelta(hours=task_time_to_live)
self.created_at = created_at
self.updated_at = updated_at
self.task_input = task_input
self.result = result
self.message = message
@property
def status(self):
return self._status
def run(self, executor):
pass
def _validate_task_status_transition(self, cur_status, new_status):
valid_transitions = {
'pending': ['processing', 'failure'],
@ -384,6 +385,27 @@ class Task(object):
self.expires_at = timeutils.utcnow() + self._time_to_live
class TaskStub(object):
def __init__(self, task_id, task_type, status, owner,
expires_at, created_at, updated_at, task_time_to_live=48):
self.task_id = task_id
self._status = status
self.type = task_type
self.owner = owner
self.expires_at = expires_at
self._time_to_live = datetime.timedelta(hours=task_time_to_live)
self.created_at = created_at
self.updated_at = updated_at
@property
def status(self):
return self._status
def run(self, executor):
pass
class TaskDetails(object):
def __init__(self, task_id, task_input, message, result):
@ -407,6 +429,28 @@ class TaskFactory(object):
created_at = timeutils.utcnow()
updated_at = created_at
return Task(
task_id,
task_type,
status,
owner,
expires_at,
created_at,
updated_at,
None, # input
None, # result
None, # message
task_time_to_live
)
def new_task_stub(self, task_type, owner, task_time_to_live=48):
task_id = str(uuid.uuid4())
status = 'pending'
# Note(nikhil): expires_at would be set on the task, only when it
# succeeds or fails.
expires_at = None
created_at = timeutils.utcnow()
updated_at = created_at
return TaskStub(
task_id,
task_type,
status,

View File

@ -55,8 +55,8 @@ class TaskRepo(object):
self.task_details_proxy_helper = Helper(task_details_proxy_class,
task_details_proxy_kwargs)
def get_task_and_details(self, task_id):
task, task_details = self.base.get_task_and_details(task_id)
def get_task_stub_and_details(self, task_id):
task, task_details = self.base.get_task_stub_and_details(task_id)
return (self.task_proxy_helper.proxy(task),
self.task_details_proxy_helper.proxy(task_details))
@ -178,9 +178,9 @@ class Task(object):
expires_at = _proxy('base', 'expires_at')
created_at = _proxy('base', 'created_at')
updated_at = _proxy('base', 'updated_at')
def run(self, executor):
self.base.run(executor)
task_input = _proxy('base', 'task_input')
result = _proxy('base', 'result')
message = _proxy('base', 'message')
def begin_processing(self):
self.base.begin_processing()
@ -192,6 +192,22 @@ class Task(object):
self.base.fail(message)
class TaskStub(object):
def __init__(self, base):
self.base = base
task_id = _proxy('base', 'task_id')
type = _proxy('base', 'type')
status = _proxy('base', 'status')
owner = _proxy('base', 'owner')
expires_at = _proxy('base', 'expires_at')
created_at = _proxy('base', 'created_at')
updated_at = _proxy('base', 'updated_at')
def run(self, executor):
self.base.run(executor)
class TaskDetails(object):
def __init__(self, base):
self.base = base
@ -218,6 +234,10 @@ class TaskFactory(object):
t = self.base.new_task(**kwargs)
return self.task_helper.proxy(t)
def new_task_stub(self, **kwargs):
t = self.base.new_task_stub(**kwargs)
return self.task_helper.proxy(t)
def new_task_details(self, task_id, task_input, message=None, result=None):
td = self.base.new_task_details(task_id, task_input, message, result)
return self.task_details_helper.proxy(td)

View File

@ -336,11 +336,6 @@ class TaskProxy(glance.domain.proxy.Task):
self.notifier = notifier
super(TaskProxy, self).__init__(task)
def run(self, executor):
self.notifier.info('task.run',
format_task_notification(self.task))
return super(TaskProxy, self).run(executor)
def begin_processing(self):
self.notifier.info(
'task.processing',
@ -359,6 +354,20 @@ class TaskProxy(glance.domain.proxy.Task):
return super(TaskProxy, self).fail(message)
class TaskStubProxy(glance.domain.proxy.TaskStub):
def __init__(self, task, context, notifier):
self.task = task
self.context = context
self.notifier = notifier
super(TaskStubProxy, self).__init__(task)
def run(self, executor):
self.notifier.info('task.run',
format_task_notification(self.task))
return super(TaskStubProxy, self).run(executor)
class TaskDetailsProxy(glance.domain.proxy.TaskDetails):
def __init__(self, task_details, context, notifier):

View File

@ -977,7 +977,7 @@ class TestTaskRepoProxy(utils.BaseTestCase):
def __init__(self, fixtures):
self.fixtures = fixtures
def get_task_and_details(self, task_id):
def get_task_stub_and_details(self, task_id):
for f in self.fixtures:
if f.task_id == task_id:
return f, None
@ -1005,12 +1005,13 @@ class TestTaskRepoProxy(utils.BaseTestCase):
)
def test_get_mutable_task(self):
task, _ = self.task_repo.get_task_and_details(self.fixtures[0].task_id)
task, _ = self.task_repo.get_task_stub_and_details(
self.fixtures[0].task_id)
self.assertEqual(task.task_id, self.fixtures[0].task_id)
def test_get_immutable_task(self):
task_id = self.fixtures[1].task_id
task, task_details = self.task_repo.get_task_and_details(task_id)
task, task_details = self.task_repo.get_task_stub_and_details(task_id)
self.assertRaises(exception.Forbidden,
setattr,
task_details,

View File

@ -562,7 +562,7 @@ class TestTaskRepo(test_utils.BaseTestCase):
[self.db.task_create(None, task) for task in self.tasks]
def test_get(self):
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
self.assertEqual(task.task_id, UUID1)
self.assertEqual(task.type, 'import')
self.assertEqual(task.status, 'pending')
@ -573,12 +573,12 @@ class TestTaskRepo(test_utils.BaseTestCase):
def test_get_not_found(self):
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
self.task_repo.get_task_stub_and_details,
str(uuid.uuid4()))
def test_get_forbidden(self):
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
self.task_repo.get_task_stub_and_details,
UUID4)
def test_list(self):
@ -635,24 +635,24 @@ class TestTaskRepo(test_utils.BaseTestCase):
self.task_repo.add(task, task_details)
retrieved_task, retrieved_task_details = \
self.task_repo.get_task_and_details(task.task_id)
self.task_repo.get_task_stub_and_details(task.task_id)
self.assertEqual(retrieved_task.updated_at, task.updated_at)
self.assertEqual(retrieved_task_details.task_id,
retrieved_task.task_id)
self.assertEqual(retrieved_task_details.input, task_details.input)
def test_save_task(self):
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
original_update_time = task.updated_at
self.task_repo.save(task)
current_update_time = task.updated_at
self.assertTrue(current_update_time > original_update_time)
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
self.assertEqual(task.updated_at, current_update_time)
def test_remove_task(self):
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
self.task_repo.remove(task)
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
self.task_repo.get_task_stub_and_details,
task.task_id)

View File

@ -23,7 +23,6 @@ from oslo.config import cfg
from glance.common import exception
from glance import domain
from glance.openstack.common import timeutils
import glance.tests.unit.utils as unittest_utils
import glance.tests.utils as test_utils
@ -308,11 +307,28 @@ class TestTaskFactory(test_utils.BaseTestCase):
task_type = 'import'
owner = TENANT1
task = self.task_factory.new_task(task_type, owner)
self.assertTrue(task.task_id is not None)
self.assertTrue(task.created_at is not None)
self.assertIsNotNone(task.task_id)
self.assertEqual('pending', task.status)
self.assertEqual(task_type, task.type)
self.assertEqual(owner, task.owner)
self.assertIsNone(task.expires_at)
self.assertIsNotNone(task.created_at)
self.assertEqual(task.created_at, task.updated_at)
self.assertIsNone(task.task_input)
self.assertIsNone(task.result)
self.assertIsNone(task.message)
def test_new_task_stub(self):
task_type = 'import'
owner = TENANT1
task = self.task_factory.new_task_stub(task_type, owner)
self.assertIsNotNone(task.task_id)
self.assertEqual('pending', task.status)
self.assertEqual(task_type, task.type)
self.assertEqual(owner, task.owner)
self.assertIsNone(task.expires_at)
self.assertIsNotNone(task.created_at)
self.assertEqual(task.created_at, task.updated_at)
self.assertEqual(task.status, 'pending')
self.assertEqual(task.owner, TENANT1)
def test_new_task_invalid_type(self):
task_type = 'blah'
@ -347,7 +363,6 @@ class TestTask(test_utils.BaseTestCase):
task_type = 'import'
owner = TENANT1
task_ttl = CONF.task.task_time_to_live
self.gateway = unittest_utils.FakeGateway()
self.task = self.task_factory.new_task(task_type,
owner,
task_time_to_live=task_ttl)
@ -364,7 +379,10 @@ class TestTask(test_utils.BaseTestCase):
owner=None,
expires_at=None,
created_at=timeutils.utcnow(),
updated_at=timeutils.utcnow()
updated_at=timeutils.utcnow(),
task_input=None,
result=None,
message=None
)
def test_validate_status_transition_from_pending(self):
@ -429,6 +447,8 @@ class TestTask(test_utils.BaseTestCase):
self.task.begin_processing()
self.task.succeed('{"location": "file://home"}')
self.assertEqual(self.task.status, 'success')
self.assertEqual(self.task.result, '{"location": "file://home"}')
self.assertEqual(self.task.message, None)
expected = (timeutils.utcnow() +
datetime.timedelta(hours=CONF.task.task_time_to_live))
self.assertEqual(
@ -442,6 +462,8 @@ class TestTask(test_utils.BaseTestCase):
self.task.begin_processing()
self.task.fail('{"message": "connection failed"}')
self.assertEqual(self.task.status, 'failure')
self.assertEqual(self.task.message, '{"message": "connection failed"}')
self.assertEqual(self.task.result, None)
expected = (timeutils.utcnow() +
datetime.timedelta(hours=CONF.task.task_time_to_live))
self.assertEqual(
@ -450,6 +472,49 @@ class TestTask(test_utils.BaseTestCase):
)
class TestTaskStub(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskStub, self).setUp()
self.task_id = str(uuid.uuid4())
self.task_type = 'import'
self.owner = TENANT1
self.task_ttl = CONF.task.task_time_to_live
def test_task_stub_init(self):
self.task_factory = domain.TaskFactory()
task = domain.TaskStub(
self.task_id,
self.task_type,
'status',
self.owner,
'expires_at',
'created_at',
'updated_at',
task_time_to_live=self.task_ttl
)
self.assertEqual(self.task_id, task.task_id)
self.assertEqual(self.task_type, task.type)
self.assertEqual(self.owner, task.owner)
self.assertEqual('status', task.status)
self.assertEqual('expires_at', task.expires_at)
self.assertEqual('created_at', task.created_at)
self.assertEqual('updated_at', task.updated_at)
def test_task_stub_get_status(self):
status = 'pending'
task = domain.TaskStub(
self.task_id,
self.task_type,
status,
self.owner,
'expires_at',
'created_at',
'updated_at',
task_time_to_live=self.task_ttl
)
self.assertEqual(status, task.status)
class TestTaskDetails(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskDetails, self).setUp()

View File

@ -63,10 +63,12 @@ class ImageRepoStub(object):
return ['images_from_list']
class TaskStub(glance.domain.Task):
class TaskStub(glance.domain.TaskStub):
def run(self, executor):
pass
class Task(glance.domain.Task):
def succeed(self, result):
pass
@ -401,17 +403,31 @@ class TestTaskNotifications(utils.BaseTestCase):
def setUp(self):
super(TestTaskNotifications, self).setUp()
self.task = TaskStub(
task_input = {"loc": "fake"}
self.task_stub = TaskStub(
task_id='aaa',
task_type='import',
status='pending',
owner=TENANT2,
expires_at=None,
created_at=DATETIME,
updated_at=DATETIME
updated_at=DATETIME,
)
self.task = Task(
task_id='aaa',
task_type='import',
status='pending',
owner=TENANT2,
expires_at=None,
created_at=DATETIME,
updated_at=DATETIME,
task_input=task_input,
result='res',
message='blah'
)
self.task_details = domain.TaskDetails(task_id=self.task.task_id,
task_input={"loc": "fake"},
task_input=task_input,
result='',
message='')
self.context = glance.context.RequestContext(
@ -430,6 +446,11 @@ class TestTaskNotifications(utils.BaseTestCase):
self.context,
self.notifier
)
self.task_stub_proxy = glance.notifier.TaskStubProxy(
self.task_stub,
self.context,
self.notifier
)
self.task_details_proxy = notifier.TaskDetailsProxy(self.task_details,
self.context,
self.notifier)
@ -442,7 +463,8 @@ class TestTaskNotifications(utils.BaseTestCase):
self.patcher.stop()
def test_task_create_notification(self):
self.task_repo_proxy.add(self.task_proxy, self.task_details_proxy)
self.task_repo_proxy.add(self.task_stub_proxy,
self.task_details_proxy)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]
@ -462,7 +484,7 @@ class TestTaskNotifications(utils.BaseTestCase):
def test_task_delete_notification(self):
now = timeutils.isotime()
self.task_repo_proxy.remove(self.task_proxy)
self.task_repo_proxy.remove(self.task_stub_proxy)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]
@ -485,7 +507,7 @@ class TestTaskNotifications(utils.BaseTestCase):
self.fail('Notification contained location field.')
def test_task_run_notification(self):
self.task_proxy.run(executor=None)
self.task_stub_proxy.run(executor=None)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]

View File

@ -85,7 +85,7 @@ class ImageMembershipStub(object):
class TaskRepoStub(object):
def get_task_and_details(self, *args, **kwargs):
def get_task_stub_and_details(self, *args, **kwargs):
return 'task_from_get', 'task_details_from_get'
def add(self, *args, **kwargs):
@ -386,7 +386,7 @@ class TestTaskPolicy(test_utils.BaseTestCase):
self.policy
)
self.assertRaises(exception.Forbidden,
task_repo.get_task_and_details,
task_repo.get_task_stub_and_details,
UUID1)
def test_get_task_allowed(self):
@ -397,7 +397,7 @@ class TestTaskPolicy(test_utils.BaseTestCase):
{},
self.policy
)
task, task_details = task_repo.get_task_and_details(UUID1)
task, task_details = task_repo.get_task_stub_and_details(UUID1)
self.assertIsInstance(task, glance.api.policy.TaskProxy)
self.assertEqual(task.task, 'task_from_get')

View File

@ -72,7 +72,7 @@ def _domain_fixture(task_id, **kwargs):
'created_at': kwargs.get('created_at', default_datetime),
'updated_at': kwargs.get('updated_at', default_datetime),
}
task = glance.domain.Task(**task_properties)
task = glance.domain.TaskStub(**task_properties)
task_details = glance.domain.TaskDetails(task_id,
kwargs.get('input', {}),
kwargs.get('message', None),