Fixed detecting changes on new nodes
The cluster state will be used on new nodes as previous state in case if the cluster already has successful deployment. Closes-Bug: 1581002 Closes-Bug: 1573602 Change-Id: Ie8a81193dc6002d4d7dec56b3b73e186b835d5fc
This commit is contained in:
parent
0af74e41be
commit
eff4f2f522
|
@ -29,4 +29,14 @@ class TransactionContext(object):
|
|||
return self.new[node_id]
|
||||
|
||||
def get_old_data(self, node_id, task_id):
|
||||
return self.old.get(task_id, {}).get(node_id)
|
||||
try:
|
||||
task_data = self.old[task_id]
|
||||
except KeyError:
|
||||
return {}
|
||||
|
||||
try:
|
||||
return task_data[node_id]
|
||||
except KeyError:
|
||||
# only if info for node does not present
|
||||
# use the default state
|
||||
return task_data.get(None) or {}
|
||||
|
|
|
@ -420,6 +420,23 @@ class ClusterTransaction(DeploymentTask):
|
|||
|
||||
return node.status in cls.node_statuses_for_redeploy
|
||||
|
||||
@classmethod
|
||||
def get_cluster_state(cls, deployment_info):
|
||||
"""Extracts cluster state from deployment info.
|
||||
|
||||
:param deployment_info: the deployment info
|
||||
:return: the cluster state
|
||||
"""
|
||||
# the cluster state can be produced from master node state
|
||||
if not deployment_info:
|
||||
return {}
|
||||
|
||||
master_state = deployment_info[consts.MASTER_NODE_UID]
|
||||
cluster_state = master_state.copy()
|
||||
cluster_state.pop('roles')
|
||||
cluster_state.pop('uid')
|
||||
return cluster_state
|
||||
|
||||
@classmethod
|
||||
def get_current_state(cls, cluster, nodes, tasks):
|
||||
"""Current state for deployment.
|
||||
|
@ -432,30 +449,47 @@ class ClusterTransaction(DeploymentTask):
|
|||
"""
|
||||
nodes = {n.uid: n for n in nodes}
|
||||
nodes[consts.MASTER_NODE_UID] = None
|
||||
tasks_names = [t['id'] for t in tasks
|
||||
if t['type'] not in cls.ignored_types]
|
||||
tasks_names = {
|
||||
t['id'] for t in tasks if t['type'] not in cls.ignored_types
|
||||
}
|
||||
|
||||
transactions = list(
|
||||
objects.TransactionCollection.get_successful_transactions_per_task(
|
||||
cluster.id, tasks_names, nodes)
|
||||
)
|
||||
|
||||
# sort by transaction.id
|
||||
transactions.sort(key=lambda x: x[0].id)
|
||||
# sort by transaction.id and task_name
|
||||
transactions.sort(key=lambda x: (x[0].id, x[2]))
|
||||
|
||||
state = {}
|
||||
for transaction, data in groupby(transactions, lambda x: x[0]):
|
||||
deployment_info = objects.Transaction.get_deployment_info(
|
||||
transaction)
|
||||
|
||||
for _, node_uid, task_name in data:
|
||||
transaction
|
||||
)
|
||||
cluster_state = cls.get_cluster_state(deployment_info)
|
||||
for task_name, rows in groupby(data, lambda x: x[2]):
|
||||
# exclude existed task
|
||||
tasks_names.discard(task_name)
|
||||
task_state = state.setdefault(task_name, {})
|
||||
task_state.setdefault(node_uid, {})
|
||||
task_state.setdefault(None, cluster_state)
|
||||
for _, node_uid, _ in rows:
|
||||
# we use cluster state for nodes that is for redeploy
|
||||
if not cls.is_node_for_redeploy(nodes[node_uid]):
|
||||
try:
|
||||
task_state[node_uid] = deployment_info[node_uid]
|
||||
except KeyError:
|
||||
# we do not add deployment info for node
|
||||
# if it does not exist
|
||||
pass
|
||||
|
||||
if cls.is_node_for_redeploy(nodes.get(node_uid)):
|
||||
task_state[node_uid] = {}
|
||||
else:
|
||||
task_state[node_uid] = deployment_info.get(node_uid, {})
|
||||
cluster_state = cls.get_cluster_state(
|
||||
objects.Transaction.get_deployment_info(
|
||||
objects.TransactionCollection.get_last_succeed_run(cluster)
|
||||
)
|
||||
)
|
||||
# attach the cluster state from last deployment info to all left tasks
|
||||
for task_name in tasks_names:
|
||||
state[task_name] = {None: cluster_state}
|
||||
|
||||
return state
|
||||
|
||||
|
|
|
@ -33,6 +33,25 @@ class TestTaskSerializerContext(BaseUnitTest):
|
|||
'openstack_version': 'liberty-9.0',
|
||||
'public_ssl': {'hostname': 'localhost'},
|
||||
'attribute': '1'
|
||||
},
|
||||
'2': {'cluster': {'id': 1}}
|
||||
},
|
||||
{
|
||||
'task1': {
|
||||
'1': {
|
||||
'cluster': {'id': 1},
|
||||
'attribute': '2'
|
||||
},
|
||||
None: {
|
||||
'cluster': {'id': 1},
|
||||
'attribute': '3'
|
||||
}
|
||||
},
|
||||
'task2': {
|
||||
None: {
|
||||
'cluster': {'id': 1},
|
||||
'attribute': '4'
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
|
@ -98,6 +117,22 @@ class TestTaskSerializerContext(BaseUnitTest):
|
|||
self.context.get_formatter_context('1')
|
||||
)
|
||||
|
||||
def test_get_yaql_interpreter(self):
|
||||
cases = [
|
||||
{'expected': '2', 'node_id': '1', 'task_id': 'task1'},
|
||||
{'expected': '3', 'node_id': '2', 'task_id': 'task1'},
|
||||
{'expected': '4', 'node_id': '1', 'task_id': 'task2'}
|
||||
]
|
||||
for case in cases:
|
||||
interpreter = self.context.get_yaql_interpreter(
|
||||
case['node_id'], case['task_id']
|
||||
)
|
||||
self.assertEqual(
|
||||
case['expected'], interpreter('old($.attribute)')
|
||||
)
|
||||
interpreter = self.context.get_yaql_interpreter('1', 'task3')
|
||||
self.assertTrue(interpreter('old($.attribute).isUndef()'))
|
||||
|
||||
|
||||
class TestDefaultTaskSerializer(BaseUnitTest):
|
||||
@classmethod
|
||||
|
|
|
@ -15,11 +15,15 @@
|
|||
# under the License.
|
||||
|
||||
import datetime
|
||||
import mock
|
||||
|
||||
from nailgun import consts
|
||||
from nailgun.db import db
|
||||
from nailgun.db.sqlalchemy import models
|
||||
from nailgun import errors
|
||||
from nailgun.task.manager import DeploymentCheckMixin
|
||||
from nailgun.task.task import ClusterTransaction
|
||||
|
||||
from nailgun.test.base import BaseTestCase
|
||||
|
||||
|
||||
|
@ -56,3 +60,117 @@ class TestDeploymentCheckMixin(BaseTestCase):
|
|||
errors.DeploymentAlreadyStarted,
|
||||
DeploymentCheckMixin.check_no_running_deployment,
|
||||
self.cluster)
|
||||
|
||||
|
||||
class TestClusterTransaction(BaseTestCase):
|
||||
def test_get_cluster_state(self):
|
||||
deployments_info = {
|
||||
consts.MASTER_NODE_UID: {
|
||||
'uid': consts.MASTER_NODE_UID,
|
||||
'roles': [consts.TASK_ROLES.master],
|
||||
'key': 'value'
|
||||
}
|
||||
}
|
||||
self.assertEqual(
|
||||
{'key': 'value'},
|
||||
ClusterTransaction.get_cluster_state(deployments_info)
|
||||
)
|
||||
|
||||
self.assertEqual({}, ClusterTransaction.get_cluster_state(None))
|
||||
self.assertEqual({}, ClusterTransaction.get_cluster_state({}))
|
||||
|
||||
def test_is_node_for_redeploy(self):
|
||||
self.assertFalse(ClusterTransaction.is_node_for_redeploy(None))
|
||||
|
||||
self.assertFalse(ClusterTransaction.is_node_for_redeploy(
|
||||
mock.MagicMock(status=consts.NODE_STATUSES.ready)
|
||||
))
|
||||
self.assertTrue(ClusterTransaction.is_node_for_redeploy(
|
||||
mock.MagicMock(status=consts.NODE_STATUSES.provisioned)
|
||||
))
|
||||
self.assertTrue(ClusterTransaction.is_node_for_redeploy(
|
||||
mock.MagicMock(status=consts.NODE_STATUSES.stopped)
|
||||
))
|
||||
self.assertTrue(ClusterTransaction.is_node_for_redeploy(
|
||||
mock.MagicMock(status=consts.NODE_STATUSES.discover)
|
||||
))
|
||||
|
||||
@mock.patch('nailgun.objects.TransactionCollection')
|
||||
def test_get_current_state(self, trans_cls_mock):
|
||||
cluster = self.env.create(
|
||||
nodes_kwargs=[
|
||||
{"pending_addition": True,
|
||||
'status': consts.NODE_STATUSES.ready},
|
||||
{"pending_addition": True,
|
||||
'status': consts.NODE_STATUSES.ready},
|
||||
{"pending_addition": True,
|
||||
'status': consts.NODE_STATUSES.provisioned},
|
||||
],
|
||||
release_kwargs={
|
||||
'operating_system': consts.RELEASE_OS.ubuntu,
|
||||
'version': 'mitaka-9.0'
|
||||
},
|
||||
)
|
||||
|
||||
nodes_ids = [n.uid for n in cluster.nodes]
|
||||
nodes_ids_with_master = nodes_ids + [consts.MASTER_NODE_UID]
|
||||
|
||||
deployments_info = [
|
||||
{
|
||||
uid: {'uid': uid, 'version': version, 'roles': []}
|
||||
for uid in nodes_ids_with_master
|
||||
}
|
||||
for version in range(3)
|
||||
]
|
||||
|
||||
# delete info about node_ids[1] from deployment_info[1]
|
||||
# to check case when deployment_info for node does not found
|
||||
del deployments_info[1][nodes_ids[1]]
|
||||
|
||||
transactions = [
|
||||
mock.MagicMock(deployment_info=x) for x in deployments_info
|
||||
]
|
||||
tasks = [
|
||||
{'id': 'task1', 'type': consts.ORCHESTRATOR_TASK_TYPES.puppet},
|
||||
{'id': 'group1', 'type': consts.ORCHESTRATOR_TASK_TYPES.group},
|
||||
{'id': 'skipped1', 'type': consts.ORCHESTRATOR_TASK_TYPES.skipped},
|
||||
{'id': 'task2', 'type': consts.ORCHESTRATOR_TASK_TYPES.shell},
|
||||
{'id': 'task3', 'type': consts.ORCHESTRATOR_TASK_TYPES.reboot},
|
||||
]
|
||||
|
||||
trans_cls_mock.get_last_succeed_run.return_value = transactions[0]
|
||||
|
||||
trans_cls_mock.get_successful_transactions_per_task.return_value = [
|
||||
(transactions[1], nodes_ids[0], tasks[0]['id']),
|
||||
(transactions[2], nodes_ids[2], tasks[3]['id']),
|
||||
(transactions[1], nodes_ids[1], tasks[0]['id']),
|
||||
]
|
||||
|
||||
state = ClusterTransaction.get_current_state(
|
||||
cluster, cluster.nodes, tasks
|
||||
)
|
||||
|
||||
expected_state = {
|
||||
# cluster state from transaction[0]
|
||||
# it does not have info for node[1], see comment above
|
||||
tasks[0]['id']: {
|
||||
None: ClusterTransaction.get_cluster_state(
|
||||
transactions[1].deployment_info
|
||||
),
|
||||
nodes_ids[0]: transactions[1].deployment_info[nodes_ids[0]]
|
||||
},
|
||||
# cluster state from transaction[1]
|
||||
# there is no state for node 2, because it is provisioned
|
||||
tasks[3]['id']: {
|
||||
None: ClusterTransaction.get_cluster_state(
|
||||
transactions[2].deployment_info
|
||||
)
|
||||
},
|
||||
# contains only default state
|
||||
tasks[4]['id']: {
|
||||
None: ClusterTransaction.get_cluster_state(
|
||||
transactions[0].deployment_info
|
||||
),
|
||||
},
|
||||
}
|
||||
self.assertEqual(expected_state, state)
|
||||
|
|
Loading…
Reference in New Issue