Fixed actions on transaction comletion
- update cluster status - register notifications - recursivly update all sub-transaction statuses on error Change-Id: I4783a7b0ef81eaec8c06bb7c736082570a7096d2 Blueprint: graph-concept-extension
This commit is contained in:
parent
4b9a1b84e5
commit
d20f65be7b
@ -289,60 +289,6 @@ class Task(NailgunObject):
|
||||
result.pop('status', None)
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def update_recursively(cls, instance, data):
|
||||
logger.debug("Updating task: %s", instance.uuid)
|
||||
clean_data = cls._clean_data(data)
|
||||
super(Task, cls).update(instance, data)
|
||||
if instance.parent:
|
||||
parent = instance.parent
|
||||
siblings = parent.subtasks
|
||||
status = clean_data.get('status')
|
||||
if status == consts.TASK_STATUSES.ready:
|
||||
clean_data['progress'] = 100
|
||||
instance.progress = 100
|
||||
ready_siblings_count = sum(
|
||||
x.status == consts.TASK_STATUSES.ready for x in siblings
|
||||
)
|
||||
if ready_siblings_count == len(siblings):
|
||||
parent.status = consts.TASK_STATUSES.ready
|
||||
elif status == consts.TASK_STATUSES.error:
|
||||
parent.status = consts.TASK_STATUSES.error
|
||||
for s in siblings:
|
||||
if s.status != consts.TASK_STATUSES.ready:
|
||||
s.status = consts.TASK_STATUSES.error
|
||||
s.progress = 100
|
||||
s.message = "Task aborted"
|
||||
clean_data['progress'] = 100
|
||||
instance.progress = 100
|
||||
TaskHelper.update_action_log(parent)
|
||||
elif status == consts.TASK_STATUSES.running:
|
||||
parent.status = consts.TASK_STATUSES.running
|
||||
|
||||
if 'progress' in clean_data:
|
||||
total_progress = sum(x.progress for x in siblings)
|
||||
parent.progress = total_progress // len(siblings)
|
||||
|
||||
task_status = parent.status
|
||||
else:
|
||||
task_status = instance.status
|
||||
|
||||
if not instance.dry_run:
|
||||
if task_status == consts.TASK_STATUSES.ready:
|
||||
cls._update_cluster_status(
|
||||
instance.cluster,
|
||||
consts.CLUSTER_STATUSES.operational,
|
||||
consts.NODE_STATUSES.ready
|
||||
)
|
||||
elif task_status == consts.TASK_STATUSES.error:
|
||||
cls._update_cluster_status(
|
||||
instance.cluster,
|
||||
consts.CLUSTER_STATUSES.error,
|
||||
None
|
||||
)
|
||||
|
||||
db().flush()
|
||||
|
||||
@classmethod
|
||||
def update(cls, instance, data):
|
||||
logger.debug("Updating task: %s", instance.uuid)
|
||||
|
@ -241,7 +241,6 @@ class NailgunReceiver(object):
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True,
|
||||
)
|
||||
|
||||
manager = transactions.TransactionsManager(transaction.cluster.id)
|
||||
manager.process(transaction, kwargs)
|
||||
|
||||
@ -495,7 +494,13 @@ class NailgunReceiver(object):
|
||||
elif status == consts.TASK_STATUSES.ready:
|
||||
data = cls._success_action(task, status, progress, nodes)
|
||||
else:
|
||||
data = {'status': status, 'progress': progress, 'message': message}
|
||||
data = {}
|
||||
if status:
|
||||
data['status'] = status
|
||||
if progress:
|
||||
data['progress'] = progress
|
||||
if message:
|
||||
data['message'] = message
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
|
@ -112,6 +112,9 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
||||
|
||||
self._success(task.subtasks[0].uuid)
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
|
||||
self.assertEqual(
|
||||
consts.CLUSTER_STATUSES.operational, self.cluster.status
|
||||
)
|
||||
|
||||
@mock.patch('nailgun.transactions.manager.rpc')
|
||||
def test_execute_few_graphs(self, rpc_mock):
|
||||
@ -258,6 +261,9 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
||||
|
||||
self.assertEqual(rpc_mock.cast.call_count, 1)
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.error)
|
||||
self.assertEqual(
|
||||
consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status
|
||||
)
|
||||
|
||||
@mock.patch('nailgun.transactions.manager.rpc')
|
||||
def test_execute_w_task(self, rpc_mock):
|
||||
@ -347,6 +353,10 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
||||
|
||||
@mock.patch('nailgun.transactions.manager.rpc')
|
||||
def test_execute_dry_run(self, rpc_mock):
|
||||
node = self.cluster.nodes[0]
|
||||
node.pending_roles = ['compute']
|
||||
self.cluster.status = consts.CLUSTER_STATUSES.new
|
||||
|
||||
task = self.manager.execute(
|
||||
graphs=[{"type": "test_graph"}], dry_run=True)
|
||||
|
||||
@ -377,6 +387,31 @@ class TestTransactionManager(base.BaseIntegrationTest):
|
||||
|
||||
self._success(task.subtasks[0].uuid)
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
|
||||
self.assertEqual(['compute'], node.pending_roles)
|
||||
self.assertEqual(consts.CLUSTER_STATUSES.new, self.cluster.status)
|
||||
|
||||
@mock.patch('nailgun.transactions.manager.rpc')
|
||||
def test_execute_graph_fails_on_some_nodes(self, rpc_mock):
|
||||
task = self.manager.execute(graphs=[{"type": "test_graph"}])
|
||||
self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
|
||||
self.assertEqual(1, rpc_mock.cast.call_count)
|
||||
|
||||
self.receiver.transaction_resp(
|
||||
task_uuid=task.uuid,
|
||||
nodes=[
|
||||
{'uid': n.uid, 'status': consts.NODE_STATUSES.error}
|
||||
for n in self.cluster.nodes[:1]
|
||||
] + [
|
||||
{'uid': n.uid, 'status': consts.NODE_STATUSES.ready}
|
||||
for n in self.cluster.nodes[1:]
|
||||
],
|
||||
progress=100,
|
||||
status=consts.TASK_STATUSES.ready)
|
||||
self._success(task.subtasks[0].uuid)
|
||||
self.assertEqual(task.status, consts.TASK_STATUSES.ready)
|
||||
self.assertEqual(
|
||||
consts.CLUSTER_STATUSES.partially_deployed, self.cluster.status
|
||||
)
|
||||
|
||||
@mock.patch('nailgun.transactions.manager.rpc')
|
||||
def test_execute_on_one_node(self, rpc_mock):
|
||||
|
@ -933,159 +933,6 @@ class TestTaskObject(BaseIntegrationTest):
|
||||
self.assertEquals(consts.TASK_STATUSES.ready, task_obj.status)
|
||||
|
||||
|
||||
class TestTransactionObject(BaseIntegrationTest):
|
||||
def setUp(self):
|
||||
super(TestTransactionObject, self).setUp()
|
||||
self.cluster = self.env.create(
|
||||
nodes_kwargs=[
|
||||
{'roles': ['controller']},
|
||||
{'roles': ['compute']},
|
||||
{'roles': ['cinder']}])
|
||||
|
||||
def test_get_last_success_run(self):
|
||||
objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.pending
|
||||
})
|
||||
objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.error
|
||||
})
|
||||
transaction = objects.TransactionCollection.get_last_succeed_run(
|
||||
self.cluster
|
||||
)
|
||||
self.assertIsNone(transaction)
|
||||
objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
finished2 = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
transaction = objects.TransactionCollection.get_last_succeed_run(
|
||||
self.cluster
|
||||
)
|
||||
self.assertEqual(finished2.id, transaction.id)
|
||||
|
||||
def test_get_deployment_info(self):
|
||||
transaction = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
self.assertEquals(
|
||||
objects.Transaction.get_deployment_info(transaction),
|
||||
{}
|
||||
)
|
||||
info = {'test': {'test': 'test'}}
|
||||
objects.Transaction.attach_deployment_info(transaction, info)
|
||||
self.assertEqual(
|
||||
info, objects.Transaction.get_deployment_info(transaction)
|
||||
)
|
||||
self.assertEqual(objects.Transaction.get_deployment_info(None), {})
|
||||
|
||||
def test_get_cluster_settings(self):
|
||||
transaction = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
self.assertIsNone(
|
||||
objects.Transaction.get_cluster_settings(transaction)
|
||||
)
|
||||
info = {'test': 'test'}
|
||||
objects.Transaction.attach_cluster_settings(transaction, info)
|
||||
self.assertEqual(
|
||||
info, objects.Transaction.get_cluster_settings(transaction)
|
||||
)
|
||||
self.assertIsNone(objects.Transaction.get_cluster_settings(None))
|
||||
|
||||
def test_get_network_settings(self):
|
||||
transaction = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
self.assertIsNone(
|
||||
objects.Transaction.get_network_settings(transaction)
|
||||
)
|
||||
info = {'test': 'test'}
|
||||
objects.Transaction.attach_network_settings(transaction, info)
|
||||
self.assertEqual(
|
||||
info, objects.Transaction.get_network_settings(transaction)
|
||||
)
|
||||
self.assertIsNone(objects.Transaction.get_network_settings(None))
|
||||
|
||||
def test_get_successful_transactions_per_task(self):
|
||||
history_collection = objects.DeploymentHistoryCollection
|
||||
get_succeed = (
|
||||
objects.TransactionCollection.get_successful_transactions_per_task
|
||||
)
|
||||
uid1 = '1'
|
||||
uid2 = '2'
|
||||
|
||||
tasks_graph = {
|
||||
None: [
|
||||
{'id': 'post_deployment_start'},
|
||||
{'id': 'post_deployment_end'}
|
||||
],
|
||||
uid1: [{'id': 'dns-client'}]
|
||||
}
|
||||
|
||||
def make_task_with_history(task_status, graph):
|
||||
task = self.env.create_task(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=task_status,
|
||||
cluster_id=self.cluster.id)
|
||||
|
||||
history_collection.create(task, graph)
|
||||
|
||||
history_collection.all().update(
|
||||
{'status': consts.HISTORY_TASK_STATUSES.ready})
|
||||
return task
|
||||
|
||||
# create some tasks in history
|
||||
task1 = make_task_with_history('ready', tasks_graph)
|
||||
transactions = get_succeed(self.cluster.id, ['dns-client']).all()
|
||||
self.assertEqual(transactions, [(task1, uid1, 'dns-client')])
|
||||
|
||||
# remove 'dns-client' and add 'test' to graph for two nodes
|
||||
tasks_graph[uid1] = tasks_graph[uid2] = [{'id': 'test'}]
|
||||
task2 = make_task_with_history('ready', tasks_graph)
|
||||
transactions = get_succeed(self.cluster.id, ['test']).all()
|
||||
self.assertEqual(transactions, [(task2, uid1, 'test'),
|
||||
(task2, uid2, 'test')])
|
||||
|
||||
# remove 'test' and add 'dns-client' to graph, leave node2 as previous
|
||||
tasks_graph[uid1] = [{'id': 'dns-client'}]
|
||||
task3 = make_task_with_history('ready', tasks_graph)
|
||||
transactions = get_succeed(self.cluster.id,
|
||||
['dns-client', 'test']).all()
|
||||
|
||||
# now we should find both `test` and `dns-client` transactions
|
||||
# on node 1 and onle `test` on node 2
|
||||
self.assertEqual(
|
||||
transactions,
|
||||
[(task3, uid1, 'dns-client'),
|
||||
(task2, uid1, 'test'),
|
||||
(task3, uid2, 'test')]
|
||||
)
|
||||
|
||||
# filter out node 2
|
||||
transactions = get_succeed(self.cluster.id,
|
||||
['dns-client', 'test'], [uid1]).all()
|
||||
self.assertEqual(
|
||||
transactions,
|
||||
[(task3, uid1, 'dns-client'),
|
||||
(task2, uid1, 'test')]
|
||||
)
|
||||
|
||||
|
||||
class TestActionLogObject(BaseIntegrationTest):
|
||||
|
||||
def _create_log_entry(self, object_data):
|
||||
@ -2198,126 +2045,6 @@ class TestOpenstackConfigCollection(BaseTestCase):
|
||||
self.assertEqual(configs[0].config_type,
|
||||
consts.OPENSTACK_CONFIG_TYPES.node)
|
||||
|
||||
def test_task_update_recursively(self):
|
||||
parent = self.env.create_task(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=consts.TASK_STATUSES.pending,
|
||||
cluster_id=self.cluster.id
|
||||
)
|
||||
child1 = parent.create_subtask(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=consts.TASK_STATUSES.pending
|
||||
)
|
||||
child2 = parent.create_subtask(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=consts.TASK_STATUSES.pending
|
||||
)
|
||||
# update progress for child1
|
||||
objects.Task.update_recursively(
|
||||
child1, {'status': consts.TASK_STATUSES.running, 'progress': 50}
|
||||
)
|
||||
self.assertEqual(50, child1.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.running, child1.status)
|
||||
self.assertEqual(25, parent.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.running, parent.status)
|
||||
# finish child 1
|
||||
objects.Task.update_recursively(
|
||||
child1, {'status': consts.TASK_STATUSES.ready}
|
||||
)
|
||||
self.assertEqual(100, child1.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.ready, child1.status)
|
||||
self.assertEqual(50, parent.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.running, parent.status)
|
||||
# finish child 2
|
||||
objects.Task.update_recursively(
|
||||
child2, {'status': consts.TASK_STATUSES.ready}
|
||||
)
|
||||
self.assertEqual(100, parent.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.ready, parent.status)
|
||||
# fail child 2 when child1 is ready
|
||||
objects.Task.update_recursively(
|
||||
child2, {'status': consts.TASK_STATUSES.error}
|
||||
)
|
||||
self.assertEqual(100, parent.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.error, parent.status)
|
||||
self.assertEqual(consts.TASK_STATUSES.ready, child1.status)
|
||||
child1.status = consts.TASK_STATUSES.running
|
||||
objects.Task.update_recursively(
|
||||
child2, {'status': consts.TASK_STATUSES.error}
|
||||
)
|
||||
self.assertEqual(100, parent.progress)
|
||||
self.assertEqual(consts.TASK_STATUSES.error, parent.status)
|
||||
self.assertEqual(consts.TASK_STATUSES.error, child1.status)
|
||||
|
||||
def test_update_cluster_status_on_updating_task_status(self):
|
||||
with mock.patch.object(objects.Task, '_update_cluster_status') as m:
|
||||
task = self.env.create_task(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=consts.TASK_STATUSES.running,
|
||||
cluster_id=self.cluster.id,
|
||||
dry_run=True
|
||||
)
|
||||
child1 = task.create_subtask(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=consts.TASK_STATUSES.pending,
|
||||
dry_run=True
|
||||
)
|
||||
child2 = task.create_subtask(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=consts.TASK_STATUSES.pending,
|
||||
dry_run=True
|
||||
)
|
||||
objects.Task.update_recursively(
|
||||
task, {'status': consts.TASK_STATUSES.error}
|
||||
)
|
||||
objects.Task.update_recursively(
|
||||
task, {'status': consts.TASK_STATUSES.ready}
|
||||
)
|
||||
|
||||
task.dry_run = False
|
||||
child1.dry_run = False
|
||||
child2.dry_run = False
|
||||
task.status = consts.TASK_STATUSES.running
|
||||
objects.Task.update_recursively(
|
||||
child1, {'status': consts.TASK_STATUSES.ready}
|
||||
)
|
||||
self.assertEqual(0, m.call_count)
|
||||
|
||||
objects.Task.update_recursively(
|
||||
child2, {'status': consts.TASK_STATUSES.ready}
|
||||
)
|
||||
m.assert_called_with(
|
||||
task.cluster,
|
||||
consts.CLUSTER_STATUSES.operational,
|
||||
consts.NODE_STATUSES.ready
|
||||
)
|
||||
objects.Task.update_recursively(
|
||||
child2, {'status': consts.TASK_STATUSES.error}
|
||||
)
|
||||
|
||||
m.assert_called_with(
|
||||
task.cluster,
|
||||
consts.CLUSTER_STATUSES.error,
|
||||
None
|
||||
)
|
||||
|
||||
objects.Task.update_recursively(
|
||||
task, {'status': consts.TASK_STATUSES.ready}
|
||||
)
|
||||
m.assert_called_with(
|
||||
task.cluster,
|
||||
consts.CLUSTER_STATUSES.operational,
|
||||
consts.NODE_STATUSES.ready
|
||||
)
|
||||
objects.Task.update_recursively(
|
||||
task, {'status': consts.TASK_STATUSES.error}
|
||||
)
|
||||
m.assert_called_with(
|
||||
task.cluster,
|
||||
consts.CLUSTER_STATUSES.error,
|
||||
None
|
||||
)
|
||||
|
||||
|
||||
class TestNodeStatus(BaseTestCase):
|
||||
def setUp(self):
|
||||
|
173
nailgun/nailgun/test/unit/test_transaction_object.py
Normal file
173
nailgun/nailgun/test/unit/test_transaction_object.py
Normal file
@ -0,0 +1,173 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright 2014 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from nailgun.test.base import BaseTestCase
|
||||
|
||||
from nailgun import consts
|
||||
from nailgun import objects
|
||||
|
||||
|
||||
class TestTransactionObject(BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestTransactionObject, self).setUp()
|
||||
self.cluster = self.env.create(
|
||||
nodes_kwargs=[
|
||||
{'roles': ['controller']},
|
||||
{'roles': ['compute']},
|
||||
{'roles': ['cinder']}])
|
||||
|
||||
def test_get_last_success_run(self):
|
||||
objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.pending
|
||||
})
|
||||
objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.error
|
||||
})
|
||||
transaction = objects.TransactionCollection.get_last_succeed_run(
|
||||
self.cluster
|
||||
)
|
||||
self.assertIsNone(transaction)
|
||||
objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
finished2 = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
transaction = objects.TransactionCollection.get_last_succeed_run(
|
||||
self.cluster
|
||||
)
|
||||
self.assertEqual(finished2.id, transaction.id)
|
||||
|
||||
def test_get_deployment_info(self):
|
||||
transaction = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
self.assertEquals(
|
||||
objects.Transaction.get_deployment_info(transaction),
|
||||
{}
|
||||
)
|
||||
info = {'test': {'test': 'test'}}
|
||||
objects.Transaction.attach_deployment_info(transaction, info)
|
||||
self.assertEqual(
|
||||
info, objects.Transaction.get_deployment_info(transaction)
|
||||
)
|
||||
self.assertEqual(objects.Transaction.get_deployment_info(None), {})
|
||||
|
||||
def test_get_cluster_settings(self):
|
||||
transaction = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
self.assertIsNone(
|
||||
objects.Transaction.get_cluster_settings(transaction)
|
||||
)
|
||||
info = {'test': 'test'}
|
||||
objects.Transaction.attach_cluster_settings(transaction, info)
|
||||
self.assertEqual(
|
||||
info, objects.Transaction.get_cluster_settings(transaction)
|
||||
)
|
||||
self.assertIsNone(objects.Transaction.get_cluster_settings(None))
|
||||
|
||||
def test_get_network_settings(self):
|
||||
transaction = objects.Transaction.create({
|
||||
'cluster_id': self.cluster.id,
|
||||
'name': consts.TASK_NAMES.deployment,
|
||||
'status': consts.TASK_STATUSES.ready
|
||||
})
|
||||
self.assertIsNone(
|
||||
objects.Transaction.get_network_settings(transaction)
|
||||
)
|
||||
info = {'test': 'test'}
|
||||
objects.Transaction.attach_network_settings(transaction, info)
|
||||
self.assertEqual(
|
||||
info, objects.Transaction.get_network_settings(transaction)
|
||||
)
|
||||
self.assertIsNone(objects.Transaction.get_network_settings(None))
|
||||
|
||||
def test_get_successful_transactions_per_task(self):
|
||||
history_collection = objects.DeploymentHistoryCollection
|
||||
get_succeed = (
|
||||
objects.TransactionCollection.get_successful_transactions_per_task
|
||||
)
|
||||
uid1 = '1'
|
||||
uid2 = '2'
|
||||
|
||||
tasks_graph = {
|
||||
None: [
|
||||
{'id': 'post_deployment_start'},
|
||||
{'id': 'post_deployment_end'}
|
||||
],
|
||||
uid1: [{'id': 'dns-client'}]
|
||||
}
|
||||
|
||||
def make_task_with_history(task_status, graph):
|
||||
task = self.env.create_task(
|
||||
name=consts.TASK_NAMES.deployment,
|
||||
status=task_status,
|
||||
cluster_id=self.cluster.id)
|
||||
|
||||
history_collection.create(task, graph)
|
||||
|
||||
history_collection.all().update(
|
||||
{'status': consts.HISTORY_TASK_STATUSES.ready})
|
||||
return task
|
||||
|
||||
# create some tasks in history
|
||||
task1 = make_task_with_history('ready', tasks_graph)
|
||||
transactions = get_succeed(self.cluster.id, ['dns-client']).all()
|
||||
self.assertEqual(transactions, [(task1, uid1, 'dns-client')])
|
||||
|
||||
# remove 'dns-client' and add 'test' to graph for two nodes
|
||||
tasks_graph[uid1] = tasks_graph[uid2] = [{'id': 'test'}]
|
||||
task2 = make_task_with_history('ready', tasks_graph)
|
||||
transactions = get_succeed(self.cluster.id, ['test']).all()
|
||||
self.assertEqual(transactions, [(task2, uid1, 'test'),
|
||||
(task2, uid2, 'test')])
|
||||
|
||||
# remove 'test' and add 'dns-client' to graph, leave node2 as previous
|
||||
tasks_graph[uid1] = [{'id': 'dns-client'}]
|
||||
task3 = make_task_with_history('ready', tasks_graph)
|
||||
transactions = get_succeed(self.cluster.id,
|
||||
['dns-client', 'test']).all()
|
||||
|
||||
# now we should find both `test` and `dns-client` transactions
|
||||
# on node 1 and onle `test` on node 2
|
||||
self.assertEqual(
|
||||
transactions,
|
||||
[(task3, uid1, 'dns-client'),
|
||||
(task2, uid1, 'test'),
|
||||
(task3, uid2, 'test')]
|
||||
)
|
||||
|
||||
# filter out node 2
|
||||
transactions = get_succeed(self.cluster.id,
|
||||
['dns-client', 'test'], [uid1]).all()
|
||||
self.assertEqual(
|
||||
transactions,
|
||||
[(task3, uid1, 'dns-client'),
|
||||
(task2, uid1, 'test')]
|
||||
)
|
@ -86,16 +86,14 @@ class try_transaction(object):
|
||||
* create an action log record on start/finish;
|
||||
|
||||
:param transaction: a transaction instance to be wrapped
|
||||
:param suppress: do not propagate exception if True
|
||||
"""
|
||||
|
||||
def __init__(self, transaction, suppress=False):
|
||||
def __init__(self, transaction, on_error):
|
||||
self._transaction = transaction
|
||||
self._suppress = suppress
|
||||
self._on_error = on_error
|
||||
|
||||
def __enter__(self):
|
||||
logger.debug("Transaction %s starts assembling.", self._transaction.id)
|
||||
self._logitem = helpers.TaskHelper.create_action_log(self._transaction)
|
||||
return self._transaction
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
@ -104,19 +102,12 @@ class try_transaction(object):
|
||||
"Transaction %s failed.",
|
||||
self._transaction.id, exc_info=(exc_type, exc_val, exc_tb)
|
||||
)
|
||||
objects.Task.update(self._transaction, {
|
||||
'status': consts.TASK_STATUSES.error,
|
||||
'progress': 100,
|
||||
'message': six.text_type(exc_val),
|
||||
})
|
||||
helpers.TaskHelper.update_action_log(
|
||||
self._transaction, self._logitem
|
||||
)
|
||||
return self._on_error(self._transaction, six.text_type(exc_val))
|
||||
else:
|
||||
logger.debug(
|
||||
"Transaction %s finish assembling.", self._transaction.id
|
||||
)
|
||||
return self._suppress
|
||||
return False
|
||||
|
||||
|
||||
class TransactionsManager(object):
|
||||
@ -173,6 +164,7 @@ class TransactionsManager(object):
|
||||
'status': consts.TASK_STATUSES.pending,
|
||||
'dry_run': dry_run,
|
||||
})
|
||||
helpers.TaskHelper.create_action_log(transaction)
|
||||
|
||||
for graph in graphs:
|
||||
# 'dry_run' flag is a part of transaction, so we can restore its
|
||||
@ -184,7 +176,7 @@ class TransactionsManager(object):
|
||||
cache = graph.copy()
|
||||
cache['force'] = force
|
||||
|
||||
transaction.create_subtask(
|
||||
sub_transaction = transaction.create_subtask(
|
||||
self.task_name,
|
||||
status=consts.TASK_STATUSES.pending,
|
||||
dry_run=dry_run,
|
||||
@ -195,6 +187,7 @@ class TransactionsManager(object):
|
||||
# FIXME: Consider to use a separate set of columns.
|
||||
cache=cache,
|
||||
)
|
||||
helpers.TaskHelper.create_action_log(sub_transaction)
|
||||
|
||||
# We need to commit transaction because asynchronous call below might
|
||||
# be executed in separate process or thread.
|
||||
@ -211,18 +204,30 @@ class TransactionsManager(object):
|
||||
transaction and send it to execution.
|
||||
|
||||
:param transaction: a top-level transaction to continue
|
||||
:return: True if sub transaction will be started, otherwise False
|
||||
"""
|
||||
with try_transaction(transaction, suppress=True):
|
||||
sub_transaction = next((
|
||||
sub_transaction
|
||||
for sub_transaction in transaction.subtasks
|
||||
if sub_transaction.status == consts.TASK_STATUSES.pending), None)
|
||||
|
||||
if sub_transaction is None:
|
||||
# there is no sub-transaction, so we can close this transaction
|
||||
self.success(transaction)
|
||||
return False
|
||||
|
||||
with try_transaction(transaction, self.fail):
|
||||
# uWSGI mule is a separate process, and that means it won't share
|
||||
# our DB session. Hence, we can't pass fetched DB instances to the
|
||||
# function we want to be executed in mule, so let's proceed with
|
||||
# unique identifiers.
|
||||
mule.call_task_manager_async(
|
||||
self.__class__,
|
||||
'_continue_async',
|
||||
'_execute_async',
|
||||
self.cluster_id,
|
||||
transaction.id,
|
||||
sub_transaction.id,
|
||||
)
|
||||
return True
|
||||
|
||||
def process(self, transaction, report):
|
||||
"""Process feedback from executor (Astute).
|
||||
@ -250,42 +255,71 @@ class TransactionsManager(object):
|
||||
|
||||
_update_nodes(transaction, nodes_instances, nodes_params)
|
||||
_update_history(transaction, nodes)
|
||||
_update_transaction(transaction, status, progress, error)
|
||||
|
||||
if status:
|
||||
# FIXME: resolve circular dependencies by moving assemble task
|
||||
# updates from receiver to objects layer.
|
||||
from nailgun.rpc.receiver import NailgunReceiver
|
||||
objects.Task.update_recursively(
|
||||
transaction,
|
||||
NailgunReceiver._assemble_task_update(
|
||||
transaction, status, progress, error, nodes_instances
|
||||
)
|
||||
)
|
||||
if status in (consts.TASK_STATUSES.error, consts.TASK_STATUSES.ready):
|
||||
helpers.TaskHelper.update_action_log(transaction)
|
||||
if transaction.parent:
|
||||
# if transaction is completed successfully,
|
||||
# we've got to initiate the next one in the chain
|
||||
if status == consts.TASK_STATUSES.ready:
|
||||
self.continue_(transaction.parent)
|
||||
else:
|
||||
self.fail(transaction.parent, error)
|
||||
|
||||
# if transaction is completed successfully, we've got to initiate
|
||||
# the next one in the chain
|
||||
if transaction.parent and status == consts.TASK_STATUSES.ready:
|
||||
self.continue_(transaction.parent)
|
||||
def success(self, transaction):
|
||||
objects.Transaction.update(
|
||||
transaction,
|
||||
{'status': consts.TASK_STATUSES.ready, 'progress': 100}
|
||||
)
|
||||
_update_cluster_status(transaction)
|
||||
notifier.notify(
|
||||
consts.NOTIFICATION_TOPICS.done,
|
||||
"Graph execution has been successfully completed."
|
||||
"You can check deployment history for detailed information.",
|
||||
transaction.cluster_id,
|
||||
None,
|
||||
task_uuid=transaction.uuid
|
||||
)
|
||||
|
||||
def _continue_async(self, transaction_id):
|
||||
transaction = objects.Transaction.get_by_uid(transaction_id)
|
||||
def fail(self, transaction, reason):
|
||||
data = {
|
||||
'status': consts.TASK_STATUSES.error,
|
||||
'message': reason,
|
||||
'progress': 100
|
||||
}
|
||||
objects.Transaction.update(transaction, data)
|
||||
helpers.TaskHelper.update_action_log(transaction)
|
||||
|
||||
with try_transaction(transaction, suppress=True):
|
||||
self._continue_sync(transaction)
|
||||
data['message'] = 'Aborted'
|
||||
for sub_transaction in transaction.subtasks:
|
||||
if sub_transaction.status == consts.TASK_STATUSES.pending:
|
||||
objects.Transaction.update(sub_transaction, data)
|
||||
helpers.TaskHelper.update_action_log(sub_transaction)
|
||||
|
||||
_update_cluster_status(transaction)
|
||||
notifier.notify(
|
||||
consts.NOTIFICATION_TOPICS.error,
|
||||
"Graph execution failed with error: '{0}'."
|
||||
"Please check deployment history for more details."
|
||||
.format(reason),
|
||||
transaction.cluster_id,
|
||||
None,
|
||||
task_uuid=transaction.uuid
|
||||
)
|
||||
return True
|
||||
|
||||
def _execute_async(self, sub_transaction_id):
|
||||
sub_transaction = objects.Transaction.get_by_uid(sub_transaction_id)
|
||||
|
||||
with try_transaction(sub_transaction.parent, self.fail):
|
||||
self._execute_sync(sub_transaction)
|
||||
|
||||
# Since the whole function is executed in separate process, we must
|
||||
# commit all changes in order to do not lost them.
|
||||
db().commit()
|
||||
|
||||
def _continue_sync(self, transaction):
|
||||
sub_transaction = next((
|
||||
sub_transaction
|
||||
for sub_transaction in transaction.subtasks
|
||||
if sub_transaction.status == consts.TASK_STATUSES.pending), None)
|
||||
|
||||
if sub_transaction is None:
|
||||
return False
|
||||
|
||||
def _execute_sync(self, sub_transaction):
|
||||
cluster = sub_transaction.cluster
|
||||
graph = objects.Cluster.get_deployment_graph(
|
||||
cluster, sub_transaction.graph_type
|
||||
@ -296,10 +330,11 @@ class TransactionsManager(object):
|
||||
sub_transaction.cache.get('nodes')
|
||||
)
|
||||
for node in nodes:
|
||||
node.roles = list(set(node.roles + node.pending_roles))
|
||||
node.pending_roles = []
|
||||
node.error_type = None
|
||||
# set progress to show that node is in progress state
|
||||
node.progress = 1
|
||||
if not sub_transaction.dry_run:
|
||||
node.error_type = None
|
||||
node.error_msg = None
|
||||
|
||||
resolver = role_resolver.RoleResolver(nodes)
|
||||
_adjust_graph_tasks(
|
||||
@ -320,16 +355,16 @@ class TransactionsManager(object):
|
||||
# top of this.
|
||||
_dump_expected_state(sub_transaction, context.new, graph['tasks'])
|
||||
|
||||
with try_transaction(sub_transaction):
|
||||
message = make_astute_message(
|
||||
sub_transaction, context, graph, resolver)
|
||||
message = make_astute_message(
|
||||
sub_transaction, context, graph, resolver
|
||||
)
|
||||
|
||||
# Once rpc.cast() is called, the message is sent to Astute. By
|
||||
# that moment all transaction instanced must exist in database,
|
||||
# otherwise we may get wrong result due to RPC receiver won't
|
||||
# found entry to update.
|
||||
db().commit()
|
||||
rpc.cast('naily', [message])
|
||||
# Once rpc.cast() is called, the message is sent to Astute. By
|
||||
# that moment all transaction instanced must exist in database,
|
||||
# otherwise we may get wrong result due to RPC receiver won't
|
||||
# found entry to update.
|
||||
db().commit()
|
||||
rpc.cast('naily', [message])
|
||||
|
||||
def _acquire_cluster(self):
|
||||
cluster = objects.Cluster.get_by_uid(
|
||||
@ -558,6 +593,12 @@ def _update_nodes(transaction, nodes_instances, nodes_params):
|
||||
node_id=node.uid,
|
||||
task_uuid=transaction.uuid
|
||||
)
|
||||
elif new_status == 'ready':
|
||||
# TODO(bgaifullin) need to remove pengind roles concept
|
||||
node.roles = list(set(node.roles + node.pending_roles))
|
||||
node.pending_roles = []
|
||||
node.progress = 100
|
||||
node.status = new_status
|
||||
else:
|
||||
node.status = new_status
|
||||
else:
|
||||
@ -581,3 +622,44 @@ def _update_history(transaction, nodes):
|
||||
node['task_status'],
|
||||
node.get('custom'),
|
||||
)
|
||||
|
||||
|
||||
def _update_transaction(transaction, status, progress, message):
|
||||
data = {}
|
||||
if status:
|
||||
data['status'] = status
|
||||
if progress:
|
||||
data['progress'] = progress
|
||||
if message:
|
||||
data['message'] = message
|
||||
if data:
|
||||
objects.Transaction.update(transaction, data)
|
||||
|
||||
if transaction.parent and progress:
|
||||
siblings = transaction.parent.subtasks
|
||||
total_progress = sum(x.progress for x in siblings)
|
||||
objects.Transaction.update(transaction.parent, {
|
||||
'progress': total_progress // len(siblings)
|
||||
})
|
||||
|
||||
|
||||
def _update_cluster_status(transaction):
|
||||
if transaction.dry_run:
|
||||
return
|
||||
|
||||
nodes = objects.NodeCollection.filter_by(
|
||||
None, cluster_id=transaction.cluster_id
|
||||
)
|
||||
failed_nodes = objects.NodeCollection.filter_by_not(nodes, error_type=None)
|
||||
not_ready_nodes = objects.NodeCollection.filter_by_not(
|
||||
nodes, status=consts.NODE_STATUSES.ready
|
||||
)
|
||||
# if all nodes are ready - cluster has operational status
|
||||
# otherwise cluster has partially deployed status
|
||||
if (objects.NodeCollection.count(failed_nodes) or
|
||||
objects.NodeCollection.count(not_ready_nodes)):
|
||||
status = consts.CLUSTER_STATUSES.partially_deployed
|
||||
else:
|
||||
status = consts.CLUSTER_STATUSES.operational
|
||||
|
||||
objects.Cluster.update(transaction.cluster, {'status': status})
|
||||
|
Loading…
Reference in New Issue
Block a user