Moved task parameters from the graph to a separate dict

Since the task parameters are the same for all nodes, the parameters were
excluded from the graph. These parameters are passed to the orchestrator in a
separate argument; the graph contains only the links between tasks.
This reduces the size of the graph by more than 3 times.

Change-Id: Icc8a43df83b01a2da53056178c6dd26089e50d7b
Implements: blueprint reduce-size-of-serialized-tasks
Depends-On: I7e46946c68614789e6c59cc0a0b6ac01fbe6547b
This commit is contained in:
Bulat Gaifullin 2016-02-04 15:29:47 +03:00
parent b0cba9a677
commit 2209a31811
4 changed files with 150 additions and 112 deletions

View File

@ -15,7 +15,6 @@
# under the License.
import collections
import copy
from distutils.version import StrictVersion
import itertools
@ -356,7 +355,7 @@ class TaskProcessor(object):
requires_ex = current.setdefault('requires_ex', [])
for node_id in previous.get('uids', ()):
requires_ex.append(
{'name': previous['id'], 'node_id': node_id}
(previous['id'], node_id)
)
@staticmethod
@ -387,7 +386,8 @@ class TasksSerializer(object):
self.role_resolver = RoleResolver(nodes)
self.task_serializer = DeployTaskSerializer()
self.task_processor = TaskProcessor()
self.tasks_per_node = collections.defaultdict(dict)
self.tasks_connections = collections.defaultdict(dict)
self.tasks_dictionary = dict()
self.task_filter = self.make_task_filter(task_ids)
@classmethod
@ -399,15 +399,18 @@ class TasksSerializer(object):
:param tasks: the list of tasks
:param task_ids: Only specified tasks will be executed,
If None, all tasks will be executed
:return: the list of serialized task per node
:return: the tasks dictionary, the tasks connections
"""
serializer = cls(cluster, nodes, task_ids)
serializer.resolve_nodes(add_plugin_deployment_hooks(tasks), nodes)
serializer.resolve_dependencies()
return dict(
(k, list(six.itervalues(v)))
for k, v in six.iteritems(serializer.tasks_per_node)
)
tasks_dictionary = serializer.tasks_dictionary
tasks_connections = serializer.tasks_connections
for node_id in tasks_connections:
tasks_connections[node_id] = list(
six.itervalues(tasks_connections[node_id])
)
return tasks_dictionary, tasks_connections
def resolve_nodes(self, tasks, nodes):
"""Resolves node roles in tasks.
@ -432,7 +435,7 @@ class TasksSerializer(object):
self.expand_task_groups(groups, tasks_mapping)
# make sure that null node is present
self.tasks_per_node.setdefault(None, dict())
self.tasks_connections.setdefault(None, dict())
def process_task(self, task, nodes, resolver_factory, skip=False):
"""Processes one task on the nodes of the cluster.
@ -452,50 +455,65 @@ class TasksSerializer(object):
# do not pass skipped attribute to astute
skipped = skip or task.pop('skipped', False) or \
not task_serializer.should_execute()
for astute_task in self.task_processor.process_tasks(
for serialized in self.task_processor.process_tasks(
task, task_serializer.serialize()):
# all skipped tasks shall have the type 'skipped';
# do not exclude them from the graph to keep connections between nodes
if skipped:
astute_task['type'] = \
consts.ORCHESTRATOR_TASK_TYPES.skipped
for node_id in astute_task.pop('uids', ()):
node_tasks = self.tasks_per_node[node_id]
if skipped:
task_type = consts.ORCHESTRATOR_TASK_TYPES.skipped
else:
task_type = serialized['type']
task_relations = {
'id': serialized['id'],
'type': task_type,
'requires': serialized.pop('requires', []),
'required_for': serialized.pop('required_for', []),
'cross-depends': serialized.pop('cross-depends', []),
'cross-depended-by': serialized.pop('cross-depended-by', []),
}
node_ids = serialized.pop('uids', ())
self.tasks_dictionary[serialized['id']] = serialized
for node_id in node_ids:
node_tasks = self.tasks_connections[node_id]
# de-duplicate the tasks on the node:
# since a task can be added again after group expansion, an existing
# task must be overwritten if it is skipped and the new one is not.
if self.need_update_task(node_tasks, astute_task):
node_tasks[astute_task['id']] = copy.deepcopy(astute_task)
if self.need_update_task(node_tasks, serialized):
node_tasks[serialized['id']] = task_relations.copy()
def resolve_dependencies(self):
"""Resolves tasks dependencies."""
for node_id, tasks in six.iteritems(self.tasks_per_node):
for node_id, tasks in six.iteritems(self.tasks_connections):
for task in six.itervalues(tasks):
task['requires'] = list(
self.expand_dependencies(
node_id, task.get('requires'), False
)
)
task['required_for'] = list(
self.expand_dependencies(
node_id, task.get('required_for'), True
)
)
task['requires'].extend(
self.expand_cross_dependencies(
node_id, task.pop('cross-depends', None), False
)
)
requires = set(self.expand_dependencies(
node_id, task.pop('requires'), False
))
requires.update(self.expand_cross_dependencies(
node_id, task.pop('cross-depends', None), False
))
requires.update(task.pop('requires_ex', ()))
task['required_for'].extend(
self.expand_cross_dependencies(
node_id, task.pop('cross-depended-by', None), True
)
)
task['requires'].extend(task.pop('requires_ex', ()))
task['required_for'].extend(task.pop('required_for_ex', ()))
required_for = set(self.expand_dependencies(
node_id, task.pop('required_for'), True
))
required_for.update(self.expand_cross_dependencies(
node_id, task.pop('cross-depended-by', None), True
))
required_for.update(task.pop('required_for_ex', ()))
# render
if requires:
task['requires'] = [
dict(six.moves.zip(('name', 'node_id'), r))
for r in requires
]
if required_for:
task['required_for'] = [
dict(six.moves.zip(('name', 'node_id'), r))
for r in required_for
]
def expand_task_groups(self, groups, task_mapping):
"""Expand group of tasks.
@ -573,11 +591,11 @@ class TasksSerializer(object):
match_policy = NameMatchingPolicy.create(name)
for node_id in node_ids:
applied_tasks = set()
for task_name in self.tasks_per_node[node_id]:
for task_name in self.tasks_connections[node_id]:
if task_name == name:
# the simple case when the name of the current task
# is an exact match for the name of the task being searched
yield {"name": task_name, "node_id": node_id}
yield task_name, node_id
continue
# at first get the original task name, actual
@ -595,7 +613,7 @@ class TasksSerializer(object):
task_name_gen = self.task_processor.get_last_task_id
task_name = task_name_gen(original_task)
yield {"name": task_name, "node_id": node_id}
yield task_name, node_id
@classmethod
def need_update_task(cls, tasks, task):

View File

@ -230,15 +230,21 @@ class DeploymentTask(object):
@classmethod
def task_deploy(cls, task, nodes, task_ids, reexecutable_filter):
deployment_tasks = objects.Cluster.get_deployment_tasks(task.cluster)
logger.debug("start cluster serialization.")
serialized_cluster = deployment_serializers.serialize(
None, task.cluster, nodes
)
serialized_tasks = task_based_deployment.TasksSerializer.serialize(
logger.debug("finish cluster serialization.")
logger.debug("start tasks serialization.")
directory, graph = task_based_deployment.TasksSerializer.serialize(
task.cluster, nodes, deployment_tasks, task_ids
)
logger.debug("finish tasks serialization.")
return {
"deployment_info": serialized_cluster,
"deployment_tasks": serialized_tasks
"tasks_directory": directory,
"tasks_graph": graph
}

View File

@ -82,7 +82,8 @@ class TestTaskDeploy(BaseIntegrationTest):
message = self.get_deploy_message()
self.assertEqual("task_deploy", message["method"])
self.assertItemsEqual(
["task_uuid", "deployment_info", "deployment_tasks"],
["task_uuid", "deployment_info",
"tasks_directory", "tasks_graph"],
message["args"]
)
@ -122,7 +123,7 @@ class TestTaskDeploy(BaseIntegrationTest):
(x.uid for x in self.env.nodes if 'compute' in x.roles), None
)
self.assertIsNotNone(compute_uid)
compute_tasks = message['args']['deployment_tasks'][compute_uid]
compute_tasks = message['args']['tasks_graph'][compute_uid]
expected_tasks = {
consts.PLUGIN_PRE_DEPLOYMENT_HOOK + "_start",
@ -167,10 +168,10 @@ class TestTaskDeploy(BaseIntegrationTest):
).status
)
deploy_tasks = rpc_cast.call_args[0][1]['args']['deployment_tasks']
links = rpc_cast.call_args[0][1]['args']['tasks_graph']
self.assertItemsEqual(
["deploy_legacy"],
(task["id"] for task in deploy_tasks[compute.uid]
(task["id"] for task in links[compute.uid]
if task['type'] != consts.ORCHESTRATOR_TASK_TYPES.skipped)
)
@ -187,7 +188,7 @@ class TestTaskDeploy(BaseIntegrationTest):
self.db.flush()
message = self.get_deploy_message()
deploy_tasks = message['args']['deployment_tasks']
deploy_tasks = message['args']['tasks_graph']
self.assertIn(
"netconfig",
{task["id"] for task in deploy_tasks[compute.uid]

View File

@ -38,18 +38,28 @@ class TestTaskSerializers(BaseTestCase):
self.env.clusters[-1], self.env.nodes
)
def test_serialize(self):
def test_serialize_result(self):
tasks = [
{
"id": "test1", "role": ["controller"],
"type": "stage", "version": "2.0.0"
"id": "task1", "role": ["controller"],
"type": "shell", "version": "2.0.0",
"parameters": {"cmd": "bash -c 'echo 1'"},
"cross-depends": [{"name": "task3", "role": ["compute"]}]
},
{
"id": "test2", "role": ["compute"],
"type": "stage", "version": "2.0.0"
"id": "task2", "role": ["controller"],
"type": "shell", "version": "2.0.0",
"parameters": {"cmd": "bash -c 'echo 1'"},
"requires": ["task1"]
},
{
"id": "task3", "role": ["compute"],
"type": "shell", "version": "2.0.0",
"parameters": {"cmd": "bash -c 'echo 2'"},
"cross-depended-by": [{"name": "task2", "role": "*"}]
},
]
serialized = self.serializer.serialize(
tasks, connections = self.serializer.serialize(
self.env.clusters[-1], self.env.nodes, tasks
)
controllers = [
@ -62,12 +72,27 @@ class TestTaskSerializers(BaseTestCase):
self.assertEqual(1, len(controllers))
self.assertEqual(1, len(computes))
self.assertItemsEqual(
["test1"],
(x["id"] for x in serialized[controllers[0]])
[
{
"id": "task1",
"type": "shell",
"requires": [{"name": "task3", "node_id": computes[0]}]
},
{
"id": "task2",
"type": "shell",
"requires": [{"name": "task1", "node_id": controllers[0]}],
},
],
connections[controllers[0]]
)
self.assertItemsEqual(
["test2"],
(x["id"] for x in serialized[computes[0]])
[{
"id": "task3",
"type": "shell",
"required_for": [{"name": "task2", "node_id": controllers[0]}]
}],
connections[computes[0]]
)
def test_serialize_fail_if_all_task_have_not_version_2(self):
@ -104,7 +129,7 @@ class TestTaskSerializers(BaseTestCase):
]
serialized = self.serializer.serialize(
self.env.clusters[-1], self.env.nodes, tasks, ids
)
)[1]
controllers = [
n.uid for n in self.env.nodes if "controller" in n.roles
]
@ -165,16 +190,16 @@ class TestTaskSerializers(BaseTestCase):
self.serializer.process_task(
task, ["1"], task_based_deployment.NullResolver
)
self.assertItemsEqual(["1"], self.serializer.tasks_per_node)
self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"])
self.assertItemsEqual(["1"], self.serializer.tasks_connections)
self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"])
self.assertEqual(
"test", self.serializer.tasks_per_node["1"]["test"]["id"]
"test", self.serializer.tasks_connections["1"]["test"]["id"]
)
self.assertEqual(
"puppet", self.serializer.tasks_per_node["1"]["test"]["type"]
"puppet", self.serializer.tasks_connections["1"]["test"]["type"]
)
self.assertNotIn(
"skipped", self.serializer.tasks_per_node["1"]["test"]["type"]
"skipped", self.serializer.tasks_connections["1"]["test"]["type"]
)
def test_process_skipped_task(self):
@ -185,16 +210,16 @@ class TestTaskSerializers(BaseTestCase):
self.serializer.process_task(
task, ["1"], task_based_deployment.NullResolver
)
self.assertItemsEqual(["1"], self.serializer.tasks_per_node)
self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"])
self.assertItemsEqual(["1"], self.serializer.tasks_connections)
self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"])
self.assertEqual(
"test", self.serializer.tasks_per_node["1"]["test"]["id"]
"test", self.serializer.tasks_connections["1"]["test"]["id"]
)
self.assertEqual(
"skipped", self.serializer.tasks_per_node["1"]["test"]["type"]
"skipped", self.serializer.tasks_connections["1"]["test"]["type"]
)
self.assertNotIn(
"skipped", self.serializer.tasks_per_node["1"]["test"]
"skipped", self.serializer.tasks_connections["1"]["test"]
)
def test_process_noop_task(self):
@ -202,16 +227,16 @@ class TestTaskSerializers(BaseTestCase):
self.serializer.process_task(
task, ["1"], task_based_deployment.NullResolver
)
self.assertItemsEqual(["1"], self.serializer.tasks_per_node)
self.assertItemsEqual(["test"], self.serializer.tasks_per_node["1"])
self.assertItemsEqual(["1"], self.serializer.tasks_connections)
self.assertItemsEqual(["test"], self.serializer.tasks_connections["1"])
self.assertEqual(
"test", self.serializer.tasks_per_node["1"]["test"]["id"]
"test", self.serializer.tasks_connections["1"]["test"]["id"]
)
self.assertEqual(
"skipped", self.serializer.tasks_per_node["1"]["test"]["type"]
"skipped", self.serializer.tasks_connections["1"]["test"]["type"]
)
self.assertNotIn(
"skipped", self.serializer.tasks_per_node["1"]["test"]
"skipped", self.serializer.tasks_connections["1"]["test"]
)
def test_expand_task_groups(self):
@ -230,30 +255,30 @@ class TestTaskSerializers(BaseTestCase):
'type': 'skipped', 'role': '*'}
}
)
self.assertIn('1', self.serializer.tasks_per_node)
self.assertIn('2', self.serializer.tasks_per_node)
self.assertIn('1', self.serializer.tasks_connections)
self.assertIn('2', self.serializer.tasks_connections)
self.assertItemsEqual(
['task1', 'task2'],
self.serializer.tasks_per_node['1']
self.serializer.tasks_connections['1']
)
self.assertItemsEqual(
self.serializer.tasks_per_node['1'],
self.serializer.tasks_per_node['2']
self.serializer.tasks_connections['1'],
self.serializer.tasks_connections['2']
)
def test_expand_dependencies_on_same_node(self):
node_ids = ['1', '2']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
self.serializer.tasks_per_node[None] = ['sync_point']
self.serializer.tasks_connections[None] = ['sync_point']
self.assertItemsEqual(
[{'name': 'sync_point', 'node_id': None}],
[('sync_point', None)],
self.serializer.expand_dependencies('1', ['sync_point'], False)
)
self.assertItemsEqual(
[{'name': 'task_1', 'node_id': '1'}],
[('task_1', '1')],
self.serializer.expand_dependencies('1', ['/task/'], False)
)
@ -271,7 +296,7 @@ class TestTaskSerializers(BaseTestCase):
def test_expand_cross_dependencies(self):
node_ids = ['1', '2', '3']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
@ -279,7 +304,7 @@ class TestTaskSerializers(BaseTestCase):
m_resolve.resolve.return_value = node_ids
# the default role and policy
self.assertItemsEqual(
[{'name': 'task_1', 'node_id': '1'}],
[('task_1', '1')],
self.serializer.expand_cross_dependencies(
'2', [{'name': 'task_1'}], True
)
@ -289,7 +314,7 @@ class TestTaskSerializers(BaseTestCase):
)
# concrete role and policy
self.assertItemsEqual(
[{'name': 'task_2', 'node_id': '2'}],
[('task_2', '2')],
self.serializer.expand_cross_dependencies(
'2',
[{'name': 'task_2', 'role': ['role'], 'policy': 'any'}],
@ -302,7 +327,7 @@ class TestTaskSerializers(BaseTestCase):
m_resolve.resolve.reset_mock()
# use self as role
self.assertItemsEqual(
[{'name': 'task_1', 'node_id': '1'}],
[('task_1', '1')],
self.serializer.expand_cross_dependencies(
'1',
[{'name': 'task_1', 'role': 'self'}],
@ -313,45 +338,39 @@ class TestTaskSerializers(BaseTestCase):
def test_resolve_relation_when_no_chains(self):
node_ids = ['1', '2', '3']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
self.assertItemsEqual(
[{'node_id': '1', 'name': 'task_1'}],
[('task_1', '1')],
self.serializer.resolve_relation('task_1', node_ids, True)
)
self.assertItemsEqual(
({'node_id': i, 'name': 'task_{0}'.format(i)} for i in node_ids),
(('task_{0}'.format(i), i) for i in node_ids),
self.serializer.resolve_relation('/task/', node_ids, True)
)
def test_resolve_relation_in_chain(self):
node_ids = ['1', '2', '3']
self.serializer.tasks_per_node = dict(
self.serializer.tasks_connections = dict(
(node_id, ['task_{0}'.format(node_id)])
for node_id in node_ids
)
self.serializer.task_processor.origin_task_ids = {
'task_1': 'task', 'task_2': 'task', 'task_3': 'task2'
}
self.serializer.tasks_per_node['1'].append('task_2')
self.serializer.tasks_connections['1'].append('task_2')
self.assertItemsEqual(
[
{'node_id': '1', 'name': 'task_start'},
{'node_id': '2', 'name': 'task_start'},
],
[('task_start', '1'), ('task_start', '2')],
self.serializer.resolve_relation('task', node_ids, True)
)
self.assertItemsEqual(
[
{'node_id': '1', 'name': 'task_end'},
{'node_id': '2', 'name': 'task_end'}
],
[('task_end', '1'), ('task_end', '2')],
self.serializer.resolve_relation('task', node_ids, False)
)
self.assertItemsEqual(
[{'node_id': '1', 'name': 'task_1'}],
[('task_1', '1')],
self.serializer.resolve_relation('task_1', node_ids, False)
)
@ -507,19 +526,13 @@ class TestTaskProcessor(BaseTestCase):
}
self.processor._link_tasks(previous, current)
self.assertItemsEqual(
(
{'name': 'test_task_start', 'node_id': n}
for n in previous['uids']
),
(('test_task_start', n) for n in previous['uids']),
current['requires_ex']
)
current['requires_ex'] = [{'name': 'test_task_start', 'node_id': '0'}]
current['requires_ex'] = [('test_task_start', '0')]
self.processor._link_tasks(previous, current)
self.assertItemsEqual(
(
{'name': 'test_task_start', 'node_id': n}
for n in ['0'] + previous['uids']
),
(('test_task_start', n) for n in ['0'] + previous['uids']),
current['requires_ex']
)