diff --git a/cli.py b/cli.py
deleted file mode 100755
index 06d3b8f8..00000000
--- a/cli.py
+++ /dev/null
@@ -1,125 +0,0 @@
-#!/usr/bin/python
-
-
-import click
-
-from solar.orchestration import graph
-from solar.orchestration import tasks
-
-import networkx as nx
-import subprocess
-
-@click.group()
-def orchestration():
-    """
-    \b
-    ./cli.py create orch/examples/multi.yaml
-
-    ./cli.py execute
-    ./cli.py report
-    ->
-    ./cli.py restart --reset
-    """
-
-
-@click.command()
-@click.argument('plan', type=click.File('rb'))
-def create(plan):
-    click.echo(graph.create_plan(plan.read()))
-
-
-@click.command()
-@click.argument('uid')
-@click.argument('plan', type=click.File('rb'))
-def update(uid, plan):
-    graph.update_plan(uid, plan.read())
-
-@click.command()
-@click.argument('uid')
-def report(uid):
-    colors = {
-        'PENDING': 'blue',
-        'ERROR': 'red',
-        'SUCCESS': 'green',
-        'INPROGRESS': 'yellow'}
-
-    report = graph.report_topo(uid)
-    for item in report:
-        msg = '{} -> {}'.format(item[0], item[1])
-        if item[2]:
-            msg += ' :: {}'.format(item[2])
-        click.echo(click.style(msg, fg=colors[item[1]]))
-
-
-@click.command()
-@click.argument('uid')
-@click.option('--start', default=None)
-@click.option('--end', default=None)
-def execute(uid, start, end):
-    tasks.schedule_start.apply_async(
-        args=[uid],
-        kwargs={'start': start, 'end': end},
-        queue='scheduler')
-
-
-@click.command()
-@click.argument('uid')
-def restart(uid):
-    graph.reset(uid)
-    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
-
-
-@click.command()
-@click.argument('uid')
-def reset(uid):
-    graph.reset(uid)
-
-
-@click.command()
-@click.argument('uid')
-def stop(uid):
-    # TODO(dshulyak) how to do "hard" stop?
-    # using revoke(terminate=True) will lead to inability to restart execution
-    # research possibility of customizations of
-    # app.control and Panel.register in celery
-    graph.soft_stop(uid)
-
-
-@click.command()
-@click.argument('uid')
-def retry(uid):
-    graph.reset(uid, ['ERROR'])
-    tasks.schedule_start.apply_async(args=[uid], queue='scheduler')
-
-
-@click.command()
-@click.argument('uid')
-def dg(uid):
-    plan = graph.get_graph(uid)
-
-    colors = {
-        'PENDING': 'blue',
-        'ERROR': 'red',
-        'SUCCESS': 'green',
-        'INPROGRESS': 'yellow'}
-
-    for n in plan:
-        color = colors[plan.node[n]['status']]
-        plan.node[n]['color'] = color
-    nx.write_dot(plan, 'graph.dot')
-    subprocess.call(['dot', '-Tpng', 'graph.dot', '-o', 'graph.png'])
-
-
-orchestration.add_command(create)
-orchestration.add_command(update)
-orchestration.add_command(report)
-orchestration.add_command(execute)
-orchestration.add_command(restart)
-orchestration.add_command(reset)
-orchestration.add_command(stop)
-orchestration.add_command(retry)
-orchestration.add_command(dg)
-
-
-if __name__ == '__main__':
-    orchestration()
diff --git a/solar/solar/orchestration/examples/example_py.yml b/examples/orch/example_py.yml
similarity index 100%
rename from solar/solar/orchestration/examples/example_py.yml
rename to examples/orch/example_py.yml
diff --git a/solar/solar/orchestration/examples/for_stop.yaml b/examples/orch/for_stop.yaml
similarity index 100%
rename from solar/solar/orchestration/examples/for_stop.yaml
rename to examples/orch/for_stop.yaml
diff --git a/solar/solar/orchestration/examples/multi.yaml b/examples/orch/multi.yaml
similarity index 100%
rename from solar/solar/orchestration/examples/multi.yaml
rename to examples/orch/multi.yaml
diff --git a/solar/solar/orchestration/examples/simple.yaml b/examples/orch/simple.yaml
similarity index 100%
rename from solar/solar/orchestration/examples/simple.yaml
rename to examples/orch/simple.yaml
diff --git a/solar/solar/orchestration/examples/test_errors.yml b/examples/orch/test_errors.yml
similarity index 100%
rename from solar/solar/orchestration/examples/test_errors.yml
rename to examples/orch/test_errors.yml
diff --git a/solar/solar/orchestration/examples/upd_test_errors.yml b/examples/orch/upd_test_errors.yml
similarity index 100%
rename from solar/solar/orchestration/examples/upd_test_errors.yml
rename to examples/orch/upd_test_errors.yml
diff --git a/solar/solar/cli/orch.py b/solar/solar/cli/orch.py
index 85b89d82..05dd8be0 100644
--- a/solar/solar/cli/orch.py
+++ b/solar/solar/cli/orch.py
@@ -1,14 +1,13 @@
 #!/usr/bin/python
 
+import subprocess
 
 import click
+import networkx as nx
 
 from solar.orchestration import graph
 from solar.orchestration import tasks
 
-import networkx as nx
-import subprocess
-
 
 @click.group(name='orch')
 def orchestration():
diff --git a/solar/solar/orchestration/TODO b/solar/solar/orchestration/TODO
index 9b02fba0..ad7ca136 100644
--- a/solar/solar/orchestration/TODO
+++ b/solar/solar/orchestration/TODO
@@ -15,7 +15,9 @@
 deployment is not exposed, it is stored only in rabbitmq resources template,
 but it exposes several *anchors* - amqp_cluster_ready and amqp_one_node_ready
 3. Granular testing
-3.1. How to integrate pre/post verifications
+3.1. How to integrate pre/post verifications with graph execution
+
+4. Add back timeout support
 
 Orchestration features
 -------------------------
diff --git a/solar/solar/orchestration/graph.py b/solar/solar/orchestration/graph.py
index f5658ac7..d7342e23 100644
--- a/solar/solar/orchestration/graph.py
+++ b/solar/solar/orchestration/graph.py
@@ -1,14 +1,12 @@
+import json
+import uuid
+
 import networkx as nx
-
 import redis
-import json
-
 import yaml
-import uuid
-
 
 
 r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)
 
 
 
diff --git a/solar/solar/orchestration/tasks.py b/solar/solar/orchestration/tasks.py
index d86a6e9c..1927845d 100644
--- a/solar/solar/orchestration/tasks.py
+++ b/solar/solar/orchestration/tasks.py
@@ -1,20 +1,18 @@
+
+
+from functools import partial, wraps
+from itertools import islice
+import subprocess
+import time
+
 from celery import Celery
 from celery.app import task
 from celery import group
 from celery.exceptions import Ignore
-
-from functools import partial, wraps
-from itertools import islice
-
-import subprocess
-import time
-
-from solar.orchestration import graph
-
 import redis
-
+from solar.orchestration import graph
 from solar.core import actions
 from solar.core import resource
 
@@ -39,28 +37,16 @@
             queue='scheduler')
 
 
-solar_task = partial(app.task, base=ReportTask, bind=True)
+report_task = partial(app.task, base=ReportTask, bind=True)
 
 
-def maybe_ignore(func):
-    """used to ignore tasks when they are in queue, but should be discarded
-    """
-
-    @wraps(func)
-    def wrapper(ctxt, *args, **kwargs):
-        if r.sismember('tasks.ignore', ctxt.request.id):
-            raise Ignore()
-        return func(ctxt, *args, **kwargs)
-    return wrapper
-
-
-@solar_task
+@report_task
 def solar_resource(ctxt, resource_name, action):
     res = resource.load(resource_name)
     return actions.resource_action(res, action)
 
 
-@solar_task
+@report_task
 def cmd(ctxt, cmd):
     popen = subprocess.Popen(
         cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
@@ -71,17 +57,17 @@
     return popen.returncode, out, err
 
 
-@solar_task
+@report_task
 def sleep(ctxt, seconds):
     time.sleep(seconds)
 
 
-@solar_task
+@report_task
 def error(ctxt, message):
     raise Exception('message')
 
 
-@solar_task
+@report_task
 def fault_tolerance(ctxt, percent):
     task_id = ctxt.request.id
     plan_uid, task_name = task_id.rsplit(':', 1)
@@ -101,29 +87,20 @@
                 succes_percent, percent))
 
 
-@solar_task
+@report_task
 def echo(ctxt, message):
     return message
 
 
-@solar_task
+@report_task
 def anchor(ctxt, *args):
-    # it should be configurable to wait for atleast 1 / 3 resources
+    # such tasks should be walked when at least 1/3/an exact number of resources have been visited
     dg = graph.get_graph('current')
     for s in dg.predecessors(ctxt.request.id):
         if dg.node[s]['status'] != 'SUCCESS':
             raise Exception('One of the tasks erred, cant proceeed')
 
 
-@app.task
-def fire_timeout(task_id):
-    result = app.AsyncResult(task_id)
-    if result.state in ['ERROR', 'SUCCESS']:
-        return
-    r.sadd('tasks.ignore', task_id)
-    schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master')
-
-
 def schedule(plan_uid, dg):
     if not dg.graph.get('stop'):
         next_tasks = list(traverse(dg))
@@ -162,6 +139,12 @@ control_tasks = [fault_tolerance, anchor]
 
 
 def traverse(dg):
+    """
+    1. A node should be visited only when all of its predecessors have
+       already been visited.
+    2. Visited nodes are in any state except PENDING or INPROGRESS; for now
+       that means SUCCESS or ERROR, but the set can be extended.
+    3. A node that is INPROGRESS must not be visited again.
+    """
     visited = set()
     for node in dg:
         data = dg.node[node]
@@ -181,7 +164,7 @@
         if predecessors <= visited:
 
             task_id = '{}:{}'.format(dg.graph['uid'], node)
-            task_name = 'orch.tasks.{0}'.format(data['type'])
+            task_name = '{}.{}'.format(__name__, data['type'])
             task = app.tasks[task_name]
 
             if all_success(dg, predecessors) or task in control_tasks:
@@ -200,15 +183,8 @@
     if data.get('target', None):
         subtask.set(queue=data['target'])
 
-    timeout = data.get('timeout')
-
     yield subtask
 
-    if timeout:
-        timeout_task = fire_timeout.subtask([task_id], countdown=timeout)
-        timeout_task.set(queue='scheduler')
-        yield timeout_task
-
 
 def all_success(dg, nodes):
     return all((n for n in nodes if dg.node[n]['status'] == 'SUCCESS'))
diff --git a/solar/solar/orchestration/test_examples.py b/solar/solar/orchestration/test_examples.py
deleted file mode 100644
index 888b9672..00000000
--- a/solar/solar/orchestration/test_examples.py
+++ /dev/null
@@ -1,158 +0,0 @@
-
-
-import networkx as nx
-
-from solar.orchestration.tasks import *
-from solar.orchestratoin.graph import *
-
-from pytest import fixture
-
-import time
-
-
-@fixture(autouse=True)
-def clean_ignored():
-    r.delete('tasks.ignore')
-
-
-def ex1():
-    dg = nx.DiGraph()
-
-    dg.add_node('rabbitmq_cluster1.create', type='cmd', args=['echo "installing cluster"'], status='PENDING')
-    dg.add_node('rabbitmq_cluster2.join', type='cmd', args=['echo "joining"'], status='PENDING')
-    dg.add_node('rabbitmq_cluster3.join', type='cmd', args=['echo "joining"'], status='PENDING')
-    dg.add_node('rabbitmq_cluster.ready', type='anchor', args=[], status='PENDING')
-
-    dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster2.join')
-    dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster3.join')
-    dg.add_edge('rabbitmq_cluster1.create', 'rabbitmq_cluster.ready')
-    dg.add_edge('rabbitmq_cluster2.join', 'rabbitmq_cluster.ready')
-    dg.add_edge('rabbitmq_cluster3.join', 'rabbitmq_cluster.ready')
-
-    dg.add_node('compute1', type='cmd', args=['echo "compute1"'], status='PENDING')
-    dg.add_node('compute2', type='cmd', args=['echo "compute2"'], status='PENDING')
-    dg.add_node('compute3', type='cmd', args=['echo "compute3"'], status='PENDING')
-    dg.add_node('compute4', type='error', args=['echo "compute4"'], status='PENDING')
-    dg.add_node('compute5', type='error', args=['echo "compute5"'], status='PENDING')
-    dg.add_node('compute_ready', type='fault_tolerance', args=[60], status='PENDING')
-
-    dg.add_edge('rabbitmq_cluster.ready', 'compute1')
-    dg.add_edge('rabbitmq_cluster.ready', 'compute2')
-    dg.add_edge('rabbitmq_cluster.ready', 'compute3')
-    dg.add_edge('rabbitmq_cluster.ready', 'compute4')
-    dg.add_edge('rabbitmq_cluster.ready', 'compute5')
-
-    dg.add_edge('compute1', 'compute_ready')
-    dg.add_edge('compute2', 'compute_ready')
-    dg.add_edge('compute3', 'compute_ready')
-    dg.add_edge('compute4', 'compute_ready')
-    dg.add_edge('compute5', 'compute_ready')
-
-    return dg
-
-
-def test_ex1_exec():
-    save_graph('current', ex1())
-    schedule_start.apply_async(queue='master')
-
-
-def ex2():
-
-    dg = nx.DiGraph()
-
-    dg.add_node('rabbitmq_cluster2.join', type='cmd', args=['echo "joining"'], status='PENDING')
-    dg.add_node('rabbitmq_cluster3.join', type='cmd', args=['echo "joining"'], status='PENDING')
-    dg.add_node('rabbitmq_cluster.ready', type='anchor', args=[], status='PENDING')
-
-    dg.add_edge('rabbitmq_cluster2.join', 'rabbitmq_cluster.ready')
-    dg.add_edge('rabbitmq_cluster3.join', 'rabbitmq_cluster.ready')
-
-    return dg
-
-def test_ex2_exec():
-    save_graph('current', ex2())
-    schedule_start.apply_async(queue='master')
-
-
-def test_timelimit_exec():
-
-    dg = nx.DiGraph()
-
-    dg.add_node(
-        'timelimit_test', type='sleep',
-        args=[100], status='PENDING',
-        time_limit=10)
-
-    dg.add_node(
-        'soft_timelimit_test', type='sleep',
-        args=[100], status='PENDING',
-        soft_time_limit=10)
-
-    save_graph('current', dg)
-    schedule_start.apply_async(queue='master')
-
-
-def test_timeout():
-    # TODO(dshulyak) how to handle connectivity issues?
-    # or hardware failure ?
-    dg = nx.DiGraph()
-
-    dg.add_node(
-        'test_timeout', type='echo', target='unreachable',
-        args=['yoyoyo'], status='PENDING',
-        timeout=1)
-
-    save_graph('current', dg)
-    # two tasks will be fired - test_timeout and fire_timeout(test_timeout)
-    # with countdown set to 10 sec
-    schedule_start.apply_async(queue='master')
-    # after 10 seconds fire_timeout will set test_timeout to ERROR
-    time.sleep(1)
-
-    # master host will start listening from unreachable queue, but task will be ignored
-    # e.g it will be acked, and fetched from broker, but not processed
-    assert app.control.add_consumer(
-        'unreachable', reply=True, destination=['celery@master'])
-    dg = get_graph('current')
-    assert dg.node['test_timeout']['status'] == 'ERROR'
-
-
-
-def test_target_exec():
-    dg = nx.DiGraph()
-
-    dg.add_node(
-        'vagrant_reload', type='cmd',
-        args=['vagrant reload solar-dev1'], status='PENDING', target='ipmi')
-    save_graph('current', dg)
-    schedule_start.apply_async(queue='master')
-
-
-def test_limit_concurrency():
-    # - no more than 2 tasks in general
-    dg = nx.DiGraph()
-    dg.graph['concurrency'] = 2
-
-    for i in range(4):
-        dg.add_node(
-            str(i), type='echo',
-            args=[i], status='PENDING')
-
-    save_graph('current', dg)
-    schedule_start.apply_async(queue='master')
-
-
-def test_ignored():
-
-    dg = nx.DiGraph()
-
-    dg.add_node(
-        'test_ignored', type='echo', args=['hello'], status='PENDING')
-    r.sadd('tasks.ignore', 'test_ignored')
-    save_graph('current', dg)
-
-    schedule_start.apply_async(queue='master')
-    ignored = app.AsyncResult('test_ignored')
-    ignored.get()
-    dg = get_graph('current')
-    assert dg.node['test_ignored']['status'] == {}
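--

Notes on the patterns this change touches; none of the code below is part of the patch itself.

The rename of solar_task to report_task, together with the registry lookup switching to '{}.{}'.format(__name__, data['type']), revolves around one Celery idiom: tasks built from a base class that reports its terminal state, registered under module-qualified names. A minimal sketch of that idiom; the broker URL and the report_state stub are illustrative assumptions, not code from this patch:

    from functools import partial

    from celery import Celery
    from celery.app import task

    app = Celery('tasks', broker='redis://10.0.0.2:6379/1')  # assumed broker URL


    def report_state(task_id, status):
        # stand-in for the scheduler callback; the real code enqueues
        # schedule_next on the 'scheduler' queue
        print('{} -> {}'.format(task_id, status))


    class ReportTask(task.Task):
        """Base class: every task reports SUCCESS or ERROR when it ends."""

        def on_success(self, retval, task_id, args, kwargs):
            report_state(task_id, 'SUCCESS')

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            report_state(task_id, 'ERROR')


    # bind=True passes the task instance as the first argument (ctxt),
    # which is how the tasks above read ctxt.request.id
    report_task = partial(app.task, base=ReportTask, bind=True)


    @report_task
    def echo(ctxt, message):
        return message

Celery registers echo in app.tasks under its module-qualified name ('<module>.echo'), which is exactly what '{}.{}'.format(__name__, data['type']) reconstructs; the old hardcoded 'orch.tasks.{0}' prefix could not survive a module rename.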
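The docstring added to traverse spells out the scheduling rule: a node is ready only when every predecessor has reached a terminal state, and an INPROGRESS node is never yielded again. The rule in isolation, runnable as-is (using the current networkx attribute API dg.nodes[n]; the code base in the diff still uses the older dg.node[n]):

    import networkx as nx

    TERMINAL = ('SUCCESS', 'ERROR')  # the docstring allows extending this set


    def traverse(dg):
        """Yield nodes whose predecessors have all reached a terminal state."""
        visited = {n for n in dg if dg.nodes[n]['status'] in TERMINAL}
        for node in dg:
            status = dg.nodes[node]['status']
            if status in TERMINAL or status == 'INPROGRESS':
                continue  # already done, or running and must not be re-fired
            if set(dg.predecessors(node)) <= visited:
                yield node


    dg = nx.DiGraph()
    dg.add_node('a', status='SUCCESS')
    dg.add_node('b', status='PENDING')
    dg.add_node('c', status='PENDING')
    dg.add_edge('a', 'b')
    dg.add_edge('b', 'c')

    assert list(traverse(dg)) == ['b']  # 'c' waits until 'b' finishes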
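graph.py keeps whole plans in redis (StrictRedis(host='10.0.0.2', port=6379, db=1)), which is why json, uuid, networkx, and redis all sit in its import block. A hedged sketch of how a plan graph can round-trip through redis as JSON using networkx's node-link format; the 'plan:{uid}' key scheme and these helper names are assumptions, not solar's actual schema:

    import json
    import uuid

    import networkx as nx
    from networkx.readwrite import json_graph
    import redis

    r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)


    def save_graph(uid, dg):
        # node-link JSON preserves per-node attributes such as 'status',
        # 'type' and 'args'
        r.set('plan:{}'.format(uid), json.dumps(json_graph.node_link_data(dg)))


    def get_graph(uid):
        dg = json_graph.node_link_graph(
            json.loads(r.get('plan:{}'.format(uid))), directed=True)
        dg.graph['uid'] = uid
        return dg


    uid = str(uuid.uuid4())
    dg = nx.DiGraph(uid=uid)
    dg.add_node('echo_stuff', type='echo', args=['hello'], status='PENDING')
    save_graph(uid, dg)
    assert get_graph(uid).nodes['echo_stuff']['status'] == 'PENDING'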
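The new comment on anchor ("walked when at least 1/3/an exact number of resources have been visited") gestures at a quorum generalisation of the current all-predecessors-must-succeed check. A hypothetical helper showing what that could look like; quorum_reached and its minimum parameter are illustrative only:

    def quorum_reached(dg, node, minimum):
        """True once at least `minimum` predecessors of `node` are SUCCESS."""
        done = sum(1 for s in dg.predecessors(node)
                   if dg.nodes[s]['status'] == 'SUCCESS')
        return done >= minimum

With minimum set to the full predecessor count this reduces to the strict behaviour anchor implements today.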