From a32cb61b9d20883798ef2fdc889da730fc54050c Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Wed, 20 Nov 2013 15:13:42 +0400 Subject: [PATCH] Modify use case example * Yaml config: actions, transports, tasks * Find only one node and subgraph connected with graph root and this node Change-Id: I29935f66152869e169bbe1f0fba7b52cf228b439 --- concepts/execute_config.json | 3 ++ concepts/use_case_example.json | 66 ------------------------- concepts/use_case_example.py | 88 +++++++++++++++++++++++++++------- concepts/use_case_example.yaml | 42 ++++++++++++++++ requirements.txt | 1 + 5 files changed, 116 insertions(+), 84 deletions(-) create mode 100644 concepts/execute_config.json delete mode 100644 concepts/use_case_example.json create mode 100644 concepts/use_case_example.yaml diff --git a/concepts/execute_config.json b/concepts/execute_config.json new file mode 100644 index 000000000..822cae31d --- /dev/null +++ b/concepts/execute_config.json @@ -0,0 +1,3 @@ +{ + "executeTask": "format_volumes" +} \ No newline at end of file diff --git a/concepts/use_case_example.json b/concepts/use_case_example.json deleted file mode 100644 index 56126a82a..000000000 --- a/concepts/use_case_example.json +++ /dev/null @@ -1,66 +0,0 @@ -{ - "config": { - "tasks": [ - { - "name": "Create Environment", - "provides": ["env"] - }, - { - "name": "Build Image", - "provides": ["image"], - "requires": ["env"] - }, - { - "name": "CreateVM1", - "provides": ["vm1"], - "requires": ["image"] - }, - { - "name": "CreateVM2", - "provides": ["vm2"], - "requires": ["image"] - }, - { - "name": "Install Server", - "provides": ["server"], - "requires": ["vm2"] - }, - { - "name": "Install Agent", - "provides": ["agent"], - "requires": ["vm1"] - }, - { - "name": "Install Dependencies for Agent", - "provides": ["agentDeps"], - "requires": ["vm1"] - }, - { - "name": "Install MySQL", - "provides": ["sql"], - "requires": ["vm2"] - }, - { - "name": "Configure Agent", - "provides": ["configuredAgent"], - "requires": ["agent"] - }, - { - "name": "Start Server", - "provides": ["serverStart"], - "requires": ["server", "sql"] - }, - { - "name": "Start Agent", - "provides": ["agentStart"], - "requires": ["agentDeps", "configuredAgent"] - }, - { - "name": "Show Agent's log", - "provides": ["log"], - "requires": ["serverStart", "agentStart"] - } - ] - }, - "flowName": "Service" -} \ No newline at end of file diff --git a/concepts/use_case_example.py b/concepts/use_case_example.py index e20488985..b50bea195 100644 --- a/concepts/use_case_example.py +++ b/concepts/use_case_example.py @@ -14,7 +14,10 @@ # limitations under the License. import json +import networkx as nx +import yaml from time import sleep +from yaml import composer from taskflow import task from taskflow import engines @@ -23,38 +26,87 @@ from taskflow.patterns import graph_flow class BaseServiceTask(task.Task): def __init__(self, provides=None, requires=None, - execute=None, name=None): + execute=None, name=None, config=None): super(BaseServiceTask, self).__init__(provides=provides, requires=requires, name=name) - self.func = execute + self.config = config def revert(self, *args, **kwargs): print ("Task '%s' is REVERTING" % self.name) + #TODO (nmakhotkin) here should be a really action + def do_action(self, *args, **kwargs): + pass + def execute(self, *args, **kwargs): - print (self.name) - sleep(2) - return self.name + return self.do_action(args, kwargs) -def get_task(task_config): - return BaseServiceTask( - provides=task_config.get("provides", []), - requires=task_config.get("requires", []), - name=task_config["name"] +class ServiceTask(BaseServiceTask): + def do_action(self, *args, **kwargs): + action = self.config["actions"][self.name] + transport_name = action.get("transport", None) + print("Action executing: " + self.name + ",") + print("Doing " + str(action)) + if transport_name: + transport = self.config["transports"][transport_name] + print("transport: " + str(transport)) + print("") + sleep(0.5) + + +def get_task(task_name, task_data, config): + return ServiceTask( + provides=task_name, + requires=task_data.get("requires", []), + name=task_name, + config=config ) -def load_flow(config_path): - config = json.loads(open(config_path).read()) - tasks = config["config"]["tasks"] - flow = graph_flow.Flow(config["flowName"]) - for task in tasks: - flow.add(get_task(task)) +def get_stream(file_name): + return open(file_name).read() + + +def load_flow(cfg_stream): + try: + config = yaml.load(cfg_stream) + except composer.ComposerError: + config = json.loads(cfg_stream) + except ValueError: + raise RuntimeError("Config could not be parsed.") + tasks = config["tasks"] + name = tasks.items()[-1][0] + flow = graph_flow.Flow(name) + for name, data in tasks.items(): + flow.add(get_task(name, data, config)) return flow +def get_by_name(graph, name): + for node in graph: + if node.name == name: + return node + return None + + +def get_root(graph): + for node in graph: + if len(graph.predecessors(node)) == 0: + return node + + if __name__ == "__main__": - flow = load_flow("use_case_example.json") - engines.run(flow, engine_conf="parallel") + flow = load_flow(get_stream("concepts/use_case_example.yaml")) + graph = nx.DiGraph(flow._graph.copy()) + ex_cfg = json.load(open("concepts/execute_config.json")) + all_paths = nx.all_simple_paths(graph, + get_root(graph), + get_by_name(graph, + ex_cfg["executeTask"])) + nodes_set = set([node for path in all_paths for node in path]) + sub_graph = graph.subgraph(nodes_set) + our_flow = graph_flow.Flow(name=flow.name) + our_flow._swap(sub_graph) + engines.run(our_flow, engine_conf="parallel") diff --git a/concepts/use_case_example.yaml b/concepts/use_case_example.yaml new file mode 100644 index 000000000..f5d1df2a6 --- /dev/null +++ b/concepts/use_case_example.yaml @@ -0,0 +1,42 @@ +actions: + create_vm: + transport: my_amqp + + attach_volumes: + transport: my_amqp + + format_volumes: + transport: my_amqp + +transports: + my_amqp: + type: amqp + host: my_host + port: 5672 + exchange: amqp_direct + routing-key: my_queue + user: guest + password: guest + content-type: application/json + +tasks: + create_vm: + action: create_vm + args: + - (my_image_id, my_flavor_id) + + attach_volumes: + requires: [create_vm] + action: attach_volume + args: + - (ctx.vm_id, 10GB, /mnt/my_vol1) + - (ctx.vm_id, 7GB, /mnt/my_vol2) + - (ctx.vm_id, 15GB, /mnt/my_vol3) + + format_volumes: + requires: [attach_volumes] + action: format_volume + args: + - (ctx.vm_id, /mnt/my_vol1) + - (ctx.vm_id, /mnt/my_vol2) + - (ctx.vm_id, /mnt/my_vol3) diff --git a/requirements.txt b/requirements.txt index 0b2d1618b..6207a8af7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pbr>=0.5.21,<1.0 taskflow +pyyaml