Merge "Track timestamps for start and end of transactions" into stable/mitaka
This commit is contained in:
commit
f45e49a11a
|
@ -59,9 +59,11 @@ def upgrade():
|
|||
upgrade_deployment_graphs_attributes()
|
||||
upgrade_deployment_history_summary()
|
||||
upgrade_node_deployment_info()
|
||||
upgrade_add_task_start_end_time()
|
||||
|
||||
|
||||
def downgrade():
|
||||
downgrade_add_task_start_end_time()
|
||||
downgrade_node_deployment_info()
|
||||
downgrade_deployment_history_summary()
|
||||
downgrade_deployment_graphs_attributes()
|
||||
|
@ -443,3 +445,28 @@ def upgrade_node_deployment_info():
|
|||
|
||||
def downgrade_node_deployment_info():
|
||||
op.drop_table('node_deployment_info')
|
||||
|
||||
|
||||
def upgrade_add_task_start_end_time():
|
||||
op.add_column(
|
||||
'tasks',
|
||||
sa.Column(
|
||||
'time_start',
|
||||
sa.TIMESTAMP(),
|
||||
nullable=True,
|
||||
)
|
||||
)
|
||||
|
||||
op.add_column(
|
||||
'tasks',
|
||||
sa.Column(
|
||||
'time_end',
|
||||
sa.TIMESTAMP(),
|
||||
nullable=True,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def downgrade_add_task_start_end_time():
|
||||
op.drop_column('tasks', 'time_start')
|
||||
op.drop_column('tasks', 'time_end')
|
||||
|
|
|
@ -26,6 +26,7 @@ from sqlalchemy import Index
|
|||
from sqlalchemy import Integer
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import Text
|
||||
from sqlalchemy import TIMESTAMP
|
||||
from sqlalchemy.orm import relationship, backref, deferred
|
||||
|
||||
from nailgun import consts
|
||||
|
@ -94,6 +95,9 @@ class Task(Base):
|
|||
deployment_history = relationship(
|
||||
"DeploymentHistory", backref="task", cascade="all,delete")
|
||||
|
||||
time_start = Column(TIMESTAMP(), nullable=True)
|
||||
time_end = Column(TIMESTAMP(), nullable=True)
|
||||
|
||||
def __repr__(self):
|
||||
return "<Task '{0}' {1} ({2}) {3}>".format(
|
||||
self.name,
|
||||
|
|
|
@ -31,4 +31,6 @@ class TaskSerializer(BasicSerializer):
|
|||
"parent_id",
|
||||
"dry_run",
|
||||
"graph_type",
|
||||
"time_start",
|
||||
"time_end"
|
||||
)
|
||||
|
|
|
@ -30,4 +30,6 @@ class TransactionSerializer(BasicSerializer):
|
|||
"progress",
|
||||
"dry_run",
|
||||
"graph_type",
|
||||
"time_start",
|
||||
"time_end"
|
||||
)
|
||||
|
|
|
@ -99,14 +99,18 @@ class Task(NailgunObject):
|
|||
messages.append(msg)
|
||||
statuses.append(status)
|
||||
if any(st == 'error' for st in statuses):
|
||||
instance.status = 'error'
|
||||
status = consts.TASK_STATUSES.error
|
||||
else:
|
||||
instance.status = status or instance.status
|
||||
status = status or previous_status
|
||||
|
||||
instance.progress = progress or instance.progress
|
||||
instance.result = result or instance.result
|
||||
# join messages if not None or ""
|
||||
instance.message = '\n'.join([m for m in messages if m])
|
||||
if previous_status != instance.status and instance.cluster_id:
|
||||
|
||||
cls.update_status(instance, status)
|
||||
|
||||
if previous_status != status and instance.cluster_id:
|
||||
logger.debug("Updating cluster status: "
|
||||
"cluster_id: %s status: %s",
|
||||
instance.cluster_id, status)
|
||||
|
@ -123,6 +127,7 @@ class Task(NailgunObject):
|
|||
|
||||
data['status'] = consts.TASK_STATUSES.ready
|
||||
data['progress'] = 100
|
||||
data['time_end'] = datetime.utcnow()
|
||||
data['message'] = u'\n'.join(map(
|
||||
lambda s: s.message, filter(
|
||||
lambda s: s.message is not None, subtasks)))
|
||||
|
@ -137,10 +142,12 @@ class Task(NailgunObject):
|
|||
consts.TASK_STATUSES.ready):
|
||||
subtask.status = consts.TASK_STATUSES.error
|
||||
subtask.progress = 100
|
||||
subtasks.time_end = datetime.utcnow()
|
||||
subtask.message = "Task aborted"
|
||||
|
||||
data['status'] = consts.TASK_STATUSES.error
|
||||
data['progress'] = 100
|
||||
data['time_end'] = datetime.utcnow()
|
||||
data['message'] = u'\n'.join(list(set(map(
|
||||
lambda s: (s.message or ""), filter(
|
||||
lambda s: (
|
||||
|
@ -156,6 +163,7 @@ class Task(NailgunObject):
|
|||
map(lambda s: s.status in (consts.TASK_STATUSES.running,
|
||||
consts.TASK_STATUSES.ready),
|
||||
subtasks)):
|
||||
instance.time_start = datetime.utcnow()
|
||||
instance.status = consts.TASK_STATUSES.running
|
||||
|
||||
else:
|
||||
|
@ -293,19 +301,21 @@ class Task(NailgunObject):
|
|||
def update(cls, instance, data):
|
||||
logger.debug("Updating task: %s", instance.uuid)
|
||||
clean_data = cls._clean_data(data)
|
||||
status = clean_data.pop('status', None)
|
||||
super(Task, cls).update(instance, clean_data)
|
||||
db().flush()
|
||||
if status:
|
||||
cls.update_status(instance, status)
|
||||
|
||||
# update cluster only if task status was updated
|
||||
if instance.cluster_id and 'status' in clean_data:
|
||||
logger.debug("Updating cluster status: %s "
|
||||
"cluster_id: %s status: %s",
|
||||
instance.uuid, instance.cluster_id,
|
||||
data.get('status'))
|
||||
if instance.cluster_id and status:
|
||||
logger.debug(
|
||||
"Updating cluster status: %s cluster_id: %s status: %s",
|
||||
instance.uuid, instance.cluster_id, status)
|
||||
cls._update_cluster_data(instance)
|
||||
|
||||
if instance.parent and \
|
||||
{'status', 'message', 'progress'}.intersection(clean_data):
|
||||
message = clean_data.get('message')
|
||||
progress = clean_data.get('progress')
|
||||
if instance.parent and (status or message or progress):
|
||||
logger.debug("Updating parent task: %s.", instance.parent.uuid)
|
||||
cls._update_parent_instance(instance.parent)
|
||||
|
||||
|
@ -327,6 +337,31 @@ class Task(NailgunObject):
|
|||
.update({'deleted_at': datetime.utcnow()},
|
||||
synchronize_session='fetch')
|
||||
|
||||
@classmethod
|
||||
def update_status(cls, instance, new_status):
|
||||
if instance.status == new_status:
|
||||
return
|
||||
|
||||
finish_status = (
|
||||
consts.TASK_STATUSES.ready, consts.TASK_STATUSES.error
|
||||
)
|
||||
data = {'status': new_status}
|
||||
if instance.status == consts.TASK_STATUSES.pending:
|
||||
data['time_start'] = datetime.utcnow()
|
||||
|
||||
if new_status in finish_status:
|
||||
data['time_end'] = datetime.utcnow()
|
||||
|
||||
super(Task, cls).update(instance, data)
|
||||
|
||||
@classmethod
|
||||
def on_start(cls, instance):
|
||||
instance.time_start = datetime.utcnow()
|
||||
|
||||
@classmethod
|
||||
def on_finish(cls, instance):
|
||||
instance.time_end = datetime.utcnow()
|
||||
|
||||
|
||||
class TaskCollection(NailgunCollection):
|
||||
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from nailgun import consts
|
||||
from nailgun.db import db
|
||||
from nailgun.db.sqlalchemy import models
|
||||
|
@ -94,6 +96,26 @@ class Transaction(NailgunObject):
|
|||
if instance is not None:
|
||||
return instance.tasks_snapshot
|
||||
|
||||
@classmethod
|
||||
def on_start(cls, instance):
|
||||
cls.update(instance, {
|
||||
'time_start': datetime.utcnow(),
|
||||
'status': consts.TASK_STATUSES.running
|
||||
})
|
||||
|
||||
@classmethod
|
||||
def on_finish(cls, instance, status, message=None):
|
||||
data = {
|
||||
'progress': 100,
|
||||
'status': status,
|
||||
'time_end': datetime.utcnow(),
|
||||
}
|
||||
if message is not None:
|
||||
data['message'] = message
|
||||
|
||||
# set time start the same time of there is no time start
|
||||
cls.update(instance, data)
|
||||
|
||||
|
||||
class TransactionCollection(NailgunCollection):
|
||||
|
||||
|
|
|
@ -56,6 +56,11 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
self.assertEqual(task_.cluster_id, None)
|
||||
self.assertNotEqual(task_.deleted_at, None)
|
||||
|
||||
def _check_timing(self, task):
|
||||
self.assertIsNotNone(task.time_start)
|
||||
self.assertIsNotNone(task.time_end)
|
||||
self.assertLessEqual(task.time_start, task.time_end)
|
||||
|
||||
def set_history_ready(self):
|
||||
objects.DeploymentHistoryCollection.all().update(
|
||||
{'status': consts.HISTORY_TASK_STATUSES.ready})
|
||||
|
@ -77,6 +82,7 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
self.env.refresh_nodes()
|
||||
self.assertEqual(supertask.name, TASK_NAMES.deploy)
|
||||
self.assertEqual(supertask.status, consts.TASK_STATUSES.ready)
|
||||
self._check_timing(supertask)
|
||||
# we have three subtasks here
|
||||
# deletion
|
||||
# provision
|
||||
|
@ -87,11 +93,13 @@ class TestTaskManagers(BaseIntegrationTest):
|
|||
t for t in supertask.subtasks
|
||||
if t.name == consts.TASK_NAMES.provision
|
||||
)
|
||||
self._check_timing(provision_task)
|
||||
self.assertEqual(provision_task.weight, 0.4)
|
||||
deployment_task = next(
|
||||
t for t in supertask.subtasks
|
||||
if t.name == consts.TASK_NAMES.deployment
|
||||
)
|
||||
self._check_timing(deployment_task)
|
||||
self.assertEqual(
|
||||
consts.DEFAULT_DEPLOYMENT_GRAPH_TYPE, deployment_task.graph_type
|
||||
)
|
||||
|
|
|
@ -81,6 +81,11 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
|||
progress=100,
|
||||
status=consts.TASK_STATUSES.error)
|
||||
|
||||
def _check_timing(self, task):
|
||||
self.assertIsNotNone(task.time_start)
|
||||
self.assertIsNotNone(task.time_end)
|
||||
self.assertLessEqual(task.time_start, task.time_end)
|
||||
|
||||
@mock.patch('nailgun.transactions.manager.rpc')
|
||||
def test_execute_graph(self, rpc_mock):
|
||||
task = self.manager.execute(graphs=[{"type": "test_graph"}])
|
||||
|
@ -114,6 +119,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
|||
|
||||
self._success(task.subtasks[0].uuid)
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
|
||||
self._check_timing(task)
|
||||
self.assertEqual(
|
||||
consts.CLUSTER_STATUSES.operational, self.cluster.status
|
||||
)
|
||||
|
@ -206,7 +212,7 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
|||
|
||||
# Consider we've got success from Astute.
|
||||
self._success(task.subtasks[1].uuid)
|
||||
|
||||
self._check_timing(task.subtasks[1])
|
||||
# Ensure the top leve transaction is ready.
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
|
||||
|
||||
|
@ -270,6 +276,8 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
|||
|
||||
self.assertEqual(rpc_mock.cast.call_count, 1)
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.error)
|
||||
self._check_timing(task.subtasks[0])
|
||||
self._check_timing(task.subtasks[1])
|
||||
self.assertEqual(
|
||||
consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status
|
||||
)
|
||||
|
|
|
@ -170,6 +170,7 @@ class TransactionsManager(object):
|
|||
'status': consts.TASK_STATUSES.running,
|
||||
'dry_run': dry_run or noop_run,
|
||||
})
|
||||
objects.Transaction.on_start(transaction)
|
||||
helpers.TaskHelper.create_action_log(transaction)
|
||||
|
||||
for graph in graphs:
|
||||
|
@ -185,7 +186,7 @@ class TransactionsManager(object):
|
|||
cache['dry_run'] = dry_run
|
||||
cache['debug'] = debug
|
||||
|
||||
sub_transaction = transaction.create_subtask(
|
||||
transaction.create_subtask(
|
||||
self.task_name,
|
||||
status=consts.TASK_STATUSES.pending,
|
||||
dry_run=dry_run or noop_run,
|
||||
|
@ -196,7 +197,6 @@ class TransactionsManager(object):
|
|||
# FIXME: Consider to use a separate set of columns.
|
||||
cache=cache,
|
||||
)
|
||||
helpers.TaskHelper.create_action_log(sub_transaction)
|
||||
|
||||
# We need to commit transaction because asynchronous call below might
|
||||
# be executed in separate process or thread.
|
||||
|
@ -267,6 +267,7 @@ class TransactionsManager(object):
|
|||
_update_transaction(transaction, status, progress, error)
|
||||
|
||||
if status in (consts.TASK_STATUSES.error, consts.TASK_STATUSES.ready):
|
||||
objects.Transaction.on_finish(transaction, status)
|
||||
helpers.TaskHelper.update_action_log(transaction)
|
||||
if transaction.parent:
|
||||
# if transaction is completed successfully,
|
||||
|
@ -277,10 +278,8 @@ class TransactionsManager(object):
|
|||
self.fail(transaction.parent, error)
|
||||
|
||||
def success(self, transaction):
|
||||
objects.Transaction.update(
|
||||
transaction,
|
||||
{'status': consts.TASK_STATUSES.ready, 'progress': 100}
|
||||
)
|
||||
objects.Transaction.on_finish(transaction, consts.TASK_STATUSES.ready)
|
||||
helpers.TaskHelper.update_action_log(transaction)
|
||||
_update_cluster_status(transaction)
|
||||
notifier.notify(
|
||||
consts.NOTIFICATION_TOPICS.done,
|
||||
|
@ -292,19 +291,18 @@ class TransactionsManager(object):
|
|||
)
|
||||
|
||||
def fail(self, transaction, reason):
|
||||
data = {
|
||||
'status': consts.TASK_STATUSES.error,
|
||||
'message': reason,
|
||||
'progress': 100
|
||||
}
|
||||
objects.Transaction.update(transaction, data)
|
||||
objects.Transaction.on_finish(
|
||||
transaction, consts.TASK_STATUSES.error, message=reason
|
||||
)
|
||||
helpers.TaskHelper.update_action_log(transaction)
|
||||
|
||||
data['message'] = 'Aborted'
|
||||
for sub_transaction in transaction.subtasks:
|
||||
if sub_transaction.status == consts.TASK_STATUSES.pending:
|
||||
objects.Transaction.update(sub_transaction, data)
|
||||
helpers.TaskHelper.update_action_log(sub_transaction)
|
||||
# on_start and on_finish called to properly handle
|
||||
# status transition
|
||||
objects.Transaction.on_start(sub_transaction)
|
||||
objects.Transaction.on_finish(
|
||||
sub_transaction, consts.TASK_STATUSES.error, "Aborted"
|
||||
)
|
||||
|
||||
_update_cluster_status(transaction)
|
||||
notifier.notify(
|
||||
|
@ -367,6 +365,8 @@ class TransactionsManager(object):
|
|||
message = make_astute_message(
|
||||
sub_transaction, context, graph, resolver
|
||||
)
|
||||
objects.Transaction.on_start(sub_transaction)
|
||||
helpers.TaskHelper.create_action_log(sub_transaction)
|
||||
|
||||
# Once rpc.cast() is called, the message is sent to Astute. By
|
||||
# that moment all transaction instanced must exist in database,
|
||||
|
|
Loading…
Reference in New Issue