Fix getting previous state for deployment

Now the last successful transaction is fetched not only for every task
but for every node in the cluster. The previous version of the
state-collecting function could pick a false-positive previous
deployment transaction and break smart YAQL-based redeployment.

Change-Id: If9e22924acb465487a2fa97534fe50f03a1590a1
Closes-Bug: #1581015
Closes-Bug: #1582269
commit 5ba7416156
parent d3a00f2a67
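In short, the collection method now keys its result on (node, task) instead of task alone. A minimal sketch, with made-up transaction ids and node uids (not taken from the commit), of why the per-task result was ambiguous:

# Hypothetical data for illustration only.
old_result = [
    # (transaction_id, task_name) -- one row per task for the whole cluster
    (7, 'netconfig'),
]

new_result = [
    # (transaction_id, node_uid, task_name) -- one row per task per node
    (7, '1', 'netconfig'),   # node 1 last ran netconfig in transaction 7
    (5, '2', 'netconfig'),   # node 2 last ran it earlier, in transaction 5
]

With the old shape, node 2's previous state had to be read from transaction 7, where it may never have been deployed at all.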
@@ -88,31 +88,44 @@ class TransactionCollection(NailgunCollection):
         ).order_by('-id').limit(1).first()
 
     @classmethod
-    def get_successful_transactions_per_task(cls, cluster_id, task_names=None):
+    def get_successful_transactions_per_task(cls, cluster_id,
+                                             task_names=None,
+                                             nodes_uids=None):
         """Get last successful transaction for every task name.
 
         :param cluster_id: db id of cluster object
         :param task_names: list with task names
-        :returns: [(Transaction, task_name), ...]
+        :param nodes_uids: db Node uids, which state you need
+        :returns: [(Transaction, node_id, task_name), ...]
         """
         history = models.DeploymentHistory
         model = cls.single.model
 
         transactions = db().query(
-            model, history.deployment_graph_task_name).join(history).filter(
+            model,
+            history.node_id,
+            history.deployment_graph_task_name,
+        ).join(history).filter(
             model.cluster_id == cluster_id,
             model.name == consts.TASK_NAMES.deployment,
             history.status == consts.HISTORY_TASK_STATUSES.ready,
         )
 
+        if nodes_uids is not None:
+            transactions = transactions.filter(
+                history.node_id.in_(nodes_uids),
+            )
+
         if task_names is not None:
             transactions = transactions.filter(
                 history.deployment_graph_task_name.in_(task_names),
             )
 
         transactions = transactions.order_by(
-            history.deployment_graph_task_name, history.task_id.desc(),
+            history.deployment_graph_task_name,
+            history.node_id,
+            history.task_id.desc(),
         ).distinct(
-            history.deployment_graph_task_name
+            history.deployment_graph_task_name, history.node_id
         )
         return transactions
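A note on the query above: passing columns to .distinct() relies on PostgreSQL's DISTINCT ON, which keeps the first row of each (task name, node id) group, so ordering by task_id descending inside each group selects the most recent successful run per task per node. A rough pure-Python equivalent of that selection, over made-up history rows:

# (task_id, node_id, task_name) rows are hypothetical, for illustration only.
from itertools import groupby

history_rows = [
    (5, '1', 'netconfig'), (7, '1', 'netconfig'),
    (5, '2', 'netconfig'), (6, '2', 'netconfig'),
]

# ORDER BY task_name, node_id, task_id DESC
ordered = sorted(history_rows, key=lambda r: (r[2], r[1], -r[0]))

# DISTINCT ON (task_name, node_id): keep only the first row of each group,
# i.e. the latest successful run of that task on that node.
latest = [next(rows) for _, rows in groupby(ordered, key=lambda r: (r[2], r[1]))]

print(latest)  # [(7, '1', 'netconfig'), (6, '2', 'netconfig')]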
@@ -16,6 +16,7 @@
 
 import collections
 from copy import deepcopy
+from itertools import groupby
 import os
 
 import netaddr
@@ -370,6 +371,13 @@ class ClusterTransaction(DeploymentTask):
         consts.ORCHESTRATOR_TASK_TYPES.stage,
     }
 
+    node_statuses_for_redeploy = {
+        consts.NODE_STATUSES.discover,
+        consts.NODE_STATUSES.error,
+        consts.NODE_STATUSES.provisioned,
+        consts.NODE_STATUSES.stopped,
+    }
+
     @classmethod
     def get_deployment_methods(cls, cluster):
         return ['task_deploy']
@@ -391,6 +399,57 @@ class ClusterTransaction(DeploymentTask):
 
             yield task
 
+    @classmethod
+    def is_node_for_redeploy(cls, node):
+        """Should node's previous state be cleared.
+
+        :param node: db Node object or None
+        :returns: Bool
+        """
+        if node is None:
+            return False
+
+        return node.status in cls.node_statuses_for_redeploy
+
+    @classmethod
+    def get_current_state(cls, cluster, nodes, tasks):
+        """Current state for deployment.
+
+        :param cluster: Cluster db object
+        :param nodes: iterable of Node db objects
+        :param tasks: list of tasks which state needed
+        :returns: current state {task_name: {node_uid: <astute.yaml>, ...},}
+
+        """
+        nodes = {n.uid: n for n in nodes}
+        nodes[consts.MASTER_NODE_UID] = None
+        tasks_names = [t['id'] for t in tasks
+                       if t['type'] not in cls.ignored_types]
+
+        transactions = list(
+            objects.TransactionCollection.get_successful_transactions_per_task(
+                cluster.id, tasks_names, nodes)
+        )
+
+        # sort by transaction.id
+        transactions.sort(key=lambda x: x[0].id)
+
+        state = {}
+        for transaction, data in groupby(transactions, lambda x: x[0]):
+            deployment_info = objects.Transaction.get_deployment_info(
+                transaction)
+
+            for _, node_uid, task_name in data:
+                task_state = state.setdefault(task_name, {})
+                task_state.setdefault(node_uid, {})
+
+                if cls.is_node_for_redeploy(nodes.get(node_uid)):
+                    task_state[node_uid] = {}
+                else:
+                    task_state[node_uid] = deployment_info.get(node_uid, {})
+
+        return state
+
     @classmethod
     def task_deploy(cls, transaction, tasks, nodes, force=False,
                     selected_task_ids=None, **kwargs):
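The new get_current_state groups the per-(node, task) rows by transaction so each transaction's deployment info is loaded only once; since itertools.groupby merges only adjacent items, the explicit sort by transaction id is required. A small illustration of the resulting {task_name: {node_uid: <info>}} shape, with hypothetical rows and deployment info:

from itertools import groupby

# (transaction_id, node_uid, task_name) -- hypothetical rows
rows = [
    (5, '2', 'netconfig'),
    (7, '1', 'netconfig'),
    (7, '1', 'globals'),
]
rows.sort(key=lambda r: r[0])  # mirrors transactions.sort(...)

state = {}
for tx_id, group in groupby(rows, key=lambda r: r[0]):
    # in the real code this comes from objects.Transaction.get_deployment_info()
    deployment_info = {'1': {'uid': '1'}, '2': {'uid': '2'}}
    for _, node_uid, task_name in group:
        state.setdefault(task_name, {})[node_uid] = deployment_info.get(node_uid, {})

print(state)
# {'netconfig': {'2': {'uid': '2'}, '1': {'uid': '1'}}, 'globals': {'1': {'uid': '1'}}}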
@@ -406,27 +465,11 @@ class ClusterTransaction(DeploymentTask):
         if selected_task_ids:
             tasks = list(cls.mark_skipped(tasks, selected_task_ids))
 
-        current_state = {}
-        if not force:
-
-            tasks_names = [t['id'] for t in tasks
-                           if t['type'] not in cls.ignored_types]
-            transaction_collection = objects.TransactionCollection
-            transactions = (
-                transaction_collection.get_successful_transactions_per_task(
-                    transaction.cluster.id, tasks_names)
-            )
-            current_state = {
-                task_id: objects.Transaction.get_deployment_info(tr)
-                for tr, task_id in transactions
-            }
-
-        # FIXME: https://bugs.launchpad.net/fuel/+bug/1582269
-        for n in nodes:
-            if n.status == consts.NODE_STATUSES.provisioned:
-                for task_deployment_info in current_state.values():
-                    if task_deployment_info.get(n.uid):
-                        task_deployment_info[n.uid] = {}
+        if force:
+            current_state = {}
+        else:
+            current_state = cls.get_current_state(
+                transaction.cluster, nodes, tasks)
 
         expected_state = cls._save_deployment_info(
             transaction, deployment_info
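The force branch simply leaves current_state empty, so every task appears changed and nothing is skipped; otherwise the per-node state from get_current_state lets unchanged tasks be skipped. A rough sketch of that effect (the should_skip helper is illustrative only, not the real serializer API):

def should_skip(task_name, node_uid, expected_state, current_state):
    # a task may be skipped only when its previous state equals the new one
    return (current_state.get(task_name, {}).get(node_uid)
            == expected_state.get(task_name, {}).get(node_uid))

expected = {'netconfig': {'1': {'mtu': 1500}}}
print(should_skip('netconfig', '1', expected, {}))        # False -> redeploy
print(should_skip('netconfig', '1', expected, expected))  # True  -> may be skipped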
@@ -1410,11 +1410,11 @@ class TestTaskManagers(BaseIntegrationTest):
     @mock.patch('nailgun.objects.Cluster.get_deployment_tasks')
     @mock.patch('nailgun.objects.TransactionCollection'
                 '.get_successful_transactions_per_task')
-    def check_correct_state_calculation(self, provision, state_mock,
-                                        tasks_mock, rpc_mock):
+    def check_correct_state_calculation(self, node_status, is_skip_expected,
+                                        state_mock, tasks_mock, rpc_mock):
         cluster = self.env.create(
             nodes_kwargs=[{'roles': ['controller'],
-                           'status': consts.NODE_STATUSES.provisioned}],
+                           'status': consts.NODE_STATUSES.ready}],
             release_kwargs={
                 'operating_system': consts.RELEASE_OS.ubuntu,
                 'version': 'mitaka-9.0'
@@ -1439,21 +1439,20 @@ class TestTaskManagers(BaseIntegrationTest):
 
         self.set_history_ready()
 
-        if provision:
-            node.status = consts.NODE_STATUSES.provisioned
-        state_mock.return_value = [(supertask, 'test1')]
+        node.status = node_status
+
+        state_mock.return_value = [(supertask, node.uid, 'test1')]
         task = self.env.launch_deployment_selected([node.uid], cluster.id)
         self.assertNotEqual(consts.TASK_STATUSES.error, task.status)
         tasks_graph = rpc_mock.call_args[0][1]['args']['tasks_graph']
 
         # chek that test1 task skipped by condition and test2 was not
         for task in tasks_graph[node.uid]:
             if task['id'] == 'test1':
-                if provision:
-                    self.assertNotEqual(
+                if is_skip_expected:
+                    self.assertEqual(
                         task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
                 else:
-                    self.assertEqual(
+                    self.assertNotEqual(
                         task['type'], consts.ORCHESTRATOR_TASK_TYPES.skipped)
             elif task['id'] == 'test2':
                 self.assertNotEqual(
@@ -1462,10 +1461,20 @@ class TestTaskManagers(BaseIntegrationTest):
             self.fail('Unexpected task in graph')
 
     def test_correct_state_calculation(self):
-        self.check_correct_state_calculation(False)
+        self.check_correct_state_calculation(
+            consts.NODE_STATUSES.ready, True)
 
     def test_state_calculation_after_provision(self):
-        self.check_correct_state_calculation(True)
+        self.check_correct_state_calculation(
+            consts.NODE_STATUSES.provisioned, False)
+
+    def test_state_calculation_after_stop(self):
+        self.check_correct_state_calculation(
+            consts.NODE_STATUSES.stopped, False)
+
+    def test_state_calculation_after_rediscover(self):
+        self.check_correct_state_calculation(
+            consts.NODE_STATUSES.discover, False)
 
 
 class TestUpdateDnsmasqTaskManagers(BaseIntegrationTest):
@@ -995,12 +995,15 @@ class TestTransactionObject(BaseIntegrationTest):
         get_succeed = (
             objects.TransactionCollection.get_successful_transactions_per_task
         )
+        uid1 = '1'
+        uid2 = '2'
+
         tasks_graph = {
             None: [
                 {'id': 'post_deployment_start'},
                 {'id': 'post_deployment_end'}
             ],
-            '1': [{'id': 'dns-client'}]
+            uid1: [{'id': 'dns-client'}]
         }
 
         def make_task_with_history(task_status, graph):
@@ -1018,23 +1021,38 @@ class TestTransactionObject(BaseIntegrationTest):
         # create some tasks in history
         task1 = make_task_with_history('ready', tasks_graph)
         transactions = get_succeed(self.cluster.id, ['dns-client']).all()
-        self.assertEqual(transactions, [(task1, 'dns-client')])
+        self.assertEqual(transactions, [(task1, uid1, 'dns-client')])
 
-        # remove 'dns-client' and add 'test' to graph
-        tasks_graph['1'] = [{'id': 'test'}]
+        # remove 'dns-client' and add 'test' to graph for two nodes
+        tasks_graph[uid1] = tasks_graph[uid2] = [{'id': 'test'}]
         task2 = make_task_with_history('ready', tasks_graph)
         transactions = get_succeed(self.cluster.id, ['test']).all()
-        self.assertEqual(transactions, [(task2, 'test')])
+        self.assertEqual(transactions, [(task2, uid1, 'test'),
+                                        (task2, uid2, 'test')])
 
-        # remove 'test' and add 'dns-client' to graph
-        tasks_graph['1'] = [{'id': 'dns-client'}]
+        # remove 'test' and add 'dns-client' to graph, leave node2 as previous
+        tasks_graph[uid1] = [{'id': 'dns-client'}]
         task3 = make_task_with_history('ready', tasks_graph)
         transactions = get_succeed(self.cluster.id,
                                    ['dns-client', 'test']).all()
 
         # now we should find both `test` and `dns-client` transactions
-        self.assertEqual(transactions,
-                         [(task3, 'dns-client'), (task2, 'test')])
+        # on node 1 and only `test` on node 2
+        self.assertEqual(
+            transactions,
+            [(task3, uid1, 'dns-client'),
+             (task2, uid1, 'test'),
+             (task3, uid2, 'test')]
+        )
+
+        # filter out node 2
+        transactions = get_succeed(self.cluster.id,
+                                   ['dns-client', 'test'], [uid1]).all()
+        self.assertEqual(
+            transactions,
+            [(task3, uid1, 'dns-client'),
+             (task2, uid1, 'test')]
+        )
 
 
 class TestActionLogObject(BaseIntegrationTest):