Track timestamps for start and end of transactions

In order to enable building a history of deployment transactions
for a cluster it is necessary to keep track of the start and end times
for them. This patch adds timestamp_start and timestamp_end attributes
to transactions, makes changes to the API serializers and introduces
appropriate database fields to store that information

I've added times only for the 'deployment' task, which is the only
one needed by the new UI feature. At this point there is no single
place where a task switches to the error or ready state, so I'd have
to set it in many places.

When the everything-is-a-graph approach is fully implemented
this will be very easy. So let's not add unnecessary code now.

Co-authored-by: Roman Prykhodchenko <me@romcheg.me>
Co-authored-by: Dmitry Guryanov <dguryanov@mirantis.com>

DocImpact
Closes-bug: #1593753

Change-Id: Ib206d75a8d2215f8fd6b3f89e209c6e86ed20d0f
This commit is contained in:
Roman Prykhodchenko 2016-09-01 19:32:50 +03:00 committed by Bulat Gaifullin
parent f56a3aa809
commit dd2413fd38
9 changed files with 136 additions and 28 deletions

View File

@ -58,9 +58,11 @@ def upgrade():
upgrade_orchestrator_task_types()
upgrade_node_error_type()
upgrade_deployment_history_summary()
upgrade_add_task_start_end_time()
def downgrade():
downgrade_add_task_start_end_time()
downgrade_cluster_attributes()
downgrade_deployment_history_summary()
downgrade_node_error_type()
@ -302,3 +304,28 @@ def downgrade_cluster_attributes():
id=cluster_id,
info=jsonutils.dumps([]),
)
def upgrade_add_task_start_end_time():
    """Add nullable TIMESTAMP columns time_start/time_end to 'tasks'.

    Both columns are nullable because existing rows (and tasks that
    never run) have no timing information.
    """
    for column_name in ('time_start', 'time_end'):
        op.add_column(
            'tasks',
            sa.Column(column_name, sa.TIMESTAMP(), nullable=True),
        )
def downgrade_add_task_start_end_time():
    """Drop the timing columns added by upgrade_add_task_start_end_time."""
    for column_name in ('time_start', 'time_end'):
        op.drop_column('tasks', column_name)

View File

@ -26,6 +26,7 @@ from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy import TIMESTAMP
from sqlalchemy.orm import relationship, backref, deferred
from nailgun import consts
@ -94,6 +95,9 @@ class Task(Base):
deployment_history = relationship(
"DeploymentHistory", backref="task", cascade="all,delete")
time_start = Column(TIMESTAMP(), nullable=True)
time_end = Column(TIMESTAMP(), nullable=True)
def __repr__(self):
return "<Task '{0}' {1} ({2}) {3}>".format(
self.name,

View File

@ -31,4 +31,6 @@ class TaskSerializer(BasicSerializer):
"parent_id",
"dry_run",
"graph_type",
"time_start",
"time_end"
)

View File

@ -30,4 +30,6 @@ class TransactionSerializer(BasicSerializer):
"progress",
"dry_run",
"graph_type",
"time_start",
"time_end"
)

View File

@ -99,14 +99,18 @@ class Task(NailgunObject):
messages.append(msg)
statuses.append(status)
if any(st == 'error' for st in statuses):
instance.status = 'error'
status = consts.TASK_STATUSES.error
else:
instance.status = status or instance.status
status = status or previous_status
instance.progress = progress or instance.progress
instance.result = result or instance.result
# join messages if not None or ""
instance.message = '\n'.join([m for m in messages if m])
if previous_status != instance.status and instance.cluster_id:
cls.update_status(instance, status)
if previous_status != status and instance.cluster_id:
logger.debug("Updating cluster status: "
"cluster_id: %s status: %s",
instance.cluster_id, status)
@ -123,6 +127,7 @@ class Task(NailgunObject):
data['status'] = consts.TASK_STATUSES.ready
data['progress'] = 100
data['time_end'] = datetime.utcnow()
data['message'] = u'\n'.join(map(
lambda s: s.message, filter(
lambda s: s.message is not None, subtasks)))
@ -137,10 +142,12 @@ class Task(NailgunObject):
consts.TASK_STATUSES.ready):
subtask.status = consts.TASK_STATUSES.error
subtask.progress = 100
subtasks.time_end = datetime.utcnow()
subtask.message = "Task aborted"
data['status'] = consts.TASK_STATUSES.error
data['progress'] = 100
data['time_end'] = datetime.utcnow()
data['message'] = u'\n'.join(list(set(map(
lambda s: (s.message or ""), filter(
lambda s: (
@ -156,6 +163,7 @@ class Task(NailgunObject):
map(lambda s: s.status in (consts.TASK_STATUSES.running,
consts.TASK_STATUSES.ready),
subtasks)):
instance.time_start = datetime.utcnow()
instance.status = consts.TASK_STATUSES.running
else:
@ -293,19 +301,21 @@ class Task(NailgunObject):
def update(cls, instance, data):
logger.debug("Updating task: %s", instance.uuid)
clean_data = cls._clean_data(data)
status = clean_data.pop('status', None)
super(Task, cls).update(instance, clean_data)
db().flush()
if status:
cls.update_status(instance, status)
# update cluster only if task status was updated
if instance.cluster_id and 'status' in clean_data:
logger.debug("Updating cluster status: %s "
"cluster_id: %s status: %s",
instance.uuid, instance.cluster_id,
data.get('status'))
if instance.cluster_id and status:
logger.debug(
"Updating cluster status: %s cluster_id: %s status: %s",
instance.uuid, instance.cluster_id, status)
cls._update_cluster_data(instance)
if instance.parent and \
{'status', 'message', 'progress'}.intersection(clean_data):
message = clean_data.get('message')
progress = clean_data.get('progress')
if instance.parent and (status or message or progress):
logger.debug("Updating parent task: %s.", instance.parent.uuid)
cls._update_parent_instance(instance.parent)
@ -327,6 +337,31 @@ class Task(NailgunObject):
.update({'deleted_at': datetime.utcnow()},
synchronize_session='fetch')
@classmethod
def update_status(cls, instance, new_status):
    """Transition the task to ``new_status``, stamping timing fields.

    No-op when the status is unchanged.  ``time_start`` is recorded
    when leaving the pending state; ``time_end`` is recorded when the
    new status is terminal (ready or error).
    """
    if instance.status == new_status:
        return

    data = {'status': new_status}
    # leaving "pending" means the task actually started now
    if instance.status == consts.TASK_STATUSES.pending:
        data['time_start'] = datetime.utcnow()
    # entering a terminal state means the task just finished
    if new_status in (consts.TASK_STATUSES.ready,
                      consts.TASK_STATUSES.error):
        data['time_end'] = datetime.utcnow()
    super(Task, cls).update(instance, data)
@classmethod
def on_start(cls, instance):
    """Record the moment the task began running."""
    now = datetime.utcnow()
    instance.time_start = now
@classmethod
def on_finish(cls, instance):
    """Record the moment the task reached a terminal state."""
    now = datetime.utcnow()
    instance.time_end = now
class TaskCollection(NailgunCollection):

View File

@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from nailgun import consts
from nailgun.db import db
from nailgun.db.sqlalchemy import models
@ -99,6 +101,26 @@ class Transaction(NailgunObject):
if instance is not None:
return instance.tasks_snapshot
@classmethod
def on_start(cls, instance):
    """Mark the transaction as running and record its start time."""
    data = {
        'status': consts.TASK_STATUSES.running,
        'time_start': datetime.utcnow(),
    }
    cls.update(instance, data)
@classmethod
def on_finish(cls, instance, status, message=None):
    """Mark the transaction as finished with the given terminal status.

    :param instance: the transaction being completed
    :param status: terminal status to set (ready/error)
    :param message: optional message to attach to the transaction
    """
    now = datetime.utcnow()
    data = {
        'progress': 100,
        'status': status,
        'time_end': now,
    }
    if message is not None:
        data['message'] = message
    # Bug fix: the original comment promised to backfill time_start
    # when it was never set (a transaction may reach a terminal state
    # without ever passing through on_start), but no code did it.
    # Set it to the end time so time_start <= time_end always holds.
    if instance.time_start is None:
        data['time_start'] = now
    cls.update(instance, data)
class TransactionCollection(NailgunCollection):

View File

@ -53,6 +53,11 @@ class TestTaskManagers(BaseIntegrationTest):
self.assertEqual(task_.cluster_id, None)
self.assertNotEqual(task_.deleted_at, None)
def _check_timing(self, task):
self.assertIsNotNone(task.time_start)
self.assertIsNotNone(task.time_end)
self.assertLessEqual(task.time_start, task.time_end)
def set_history_ready(self):
    """Force every deployment-history record into the ready state."""
    ready_state = {'status': consts.HISTORY_TASK_STATUSES.ready}
    objects.DeploymentHistoryCollection.all().update(ready_state)
@ -74,6 +79,7 @@ class TestTaskManagers(BaseIntegrationTest):
self.env.refresh_nodes()
self.assertEqual(supertask.name, TASK_NAMES.deploy)
self.assertEqual(supertask.status, consts.TASK_STATUSES.ready)
self._check_timing(supertask)
# we have three subtasks here
# deletion
# provision
@ -84,11 +90,13 @@ class TestTaskManagers(BaseIntegrationTest):
t for t in supertask.subtasks
if t.name == consts.TASK_NAMES.provision
)
self._check_timing(provision_task)
self.assertEqual(provision_task.weight, 0.4)
deployment_task = next(
t for t in supertask.subtasks
if t.name == consts.TASK_NAMES.deployment
)
self._check_timing(deployment_task)
self.assertEqual(
consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE, deployment_task.graph_type
)

View File

@ -81,6 +81,11 @@ class TestTransactionManager(base.BaseIntegrationTest):
progress=100,
status=consts.TASK_STATUSES.error)
def _check_timing(self, task):
self.assertIsNotNone(task.time_start)
self.assertIsNotNone(task.time_end)
self.assertLessEqual(task.time_start, task.time_end)
@mock.patch('nailgun.transactions.manager.rpc')
def test_execute_graph(self, rpc_mock):
task = self.manager.execute(graphs=[{"type": "test_graph"}])
@ -114,6 +119,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
self._success(task.subtasks[0].uuid)
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
self._check_timing(task)
self.assertEqual(
consts.CLUSTER_STATUSES.operational, self.cluster.status
)
@ -206,7 +212,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
# Consider we've got success from Astute.
self._success(task.subtasks[1].uuid)
self._check_timing(task.subtasks[1])
# Ensure the top level transaction is ready.
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
@ -270,6 +276,8 @@ class TestTransactionManager(base.BaseIntegrationTest):
self.assertEqual(rpc_mock.cast.call_count, 1)
self.assertEqual(task.status, consts.TASK_STATUSES.error)
self._check_timing(task.subtasks[0])
self._check_timing(task.subtasks[1])
self.assertEqual(
consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status
)

View File

@ -170,6 +170,7 @@ class TransactionsManager(object):
'status': consts.TASK_STATUSES.running,
'dry_run': dry_run or noop_run,
})
objects.Transaction.on_start(transaction)
helpers.TaskHelper.create_action_log(transaction)
for graph in graphs:
@ -185,7 +186,7 @@ class TransactionsManager(object):
cache['dry_run'] = dry_run
cache['debug'] = debug
sub_transaction = transaction.create_subtask(
transaction.create_subtask(
self.task_name,
status=consts.TASK_STATUSES.pending,
dry_run=dry_run or noop_run,
@ -196,7 +197,6 @@ class TransactionsManager(object):
# FIXME: Consider to use a separate set of columns.
cache=cache,
)
helpers.TaskHelper.create_action_log(sub_transaction)
# We need to commit transaction because asynchronous call below might
# be executed in separate process or thread.
@ -267,6 +267,7 @@ class TransactionsManager(object):
_update_transaction(transaction, status, progress, error)
if status in (consts.TASK_STATUSES.error, consts.TASK_STATUSES.ready):
objects.Transaction.on_finish(transaction, status)
helpers.TaskHelper.update_action_log(transaction)
if transaction.parent:
# if transaction is completed successfully,
@ -277,10 +278,8 @@ class TransactionsManager(object):
self.fail(transaction.parent, error)
def success(self, transaction):
objects.Transaction.update(
transaction,
{'status': consts.TASK_STATUSES.ready, 'progress': 100}
)
objects.Transaction.on_finish(transaction, consts.TASK_STATUSES.ready)
helpers.TaskHelper.update_action_log(transaction)
_update_cluster_status(transaction)
notifier.notify(
consts.NOTIFICATION_TOPICS.done,
@ -292,19 +291,18 @@ class TransactionsManager(object):
)
def fail(self, transaction, reason):
data = {
'status': consts.TASK_STATUSES.error,
'message': reason,
'progress': 100
}
objects.Transaction.update(transaction, data)
objects.Transaction.on_finish(
transaction, consts.TASK_STATUSES.error, message=reason
)
helpers.TaskHelper.update_action_log(transaction)
data['message'] = 'Aborted'
for sub_transaction in transaction.subtasks:
if sub_transaction.status == consts.TASK_STATUSES.pending:
objects.Transaction.update(sub_transaction, data)
helpers.TaskHelper.update_action_log(sub_transaction)
# on_start and on_finish called to properly handle
# status transition
objects.Transaction.on_start(sub_transaction)
objects.Transaction.on_finish(
sub_transaction, consts.TASK_STATUSES.error, "Aborted"
)
_update_cluster_status(transaction)
notifier.notify(
@ -367,6 +365,8 @@ class TransactionsManager(object):
message = make_astute_message(
sub_transaction, context, graph, resolver
)
objects.Transaction.on_start(sub_transaction)
helpers.TaskHelper.create_action_log(sub_transaction)
# Once rpc.cast() is called, the message is sent to Astute. By
# that moment all transaction instanced must exist in database,