Remove unnecessary code for now with some cleanup

This commit is contained in:
Dmitry Shulyak 2015-07-01 09:29:50 +03:00
parent 71014ae272
commit 517195cef2
12 changed files with 32 additions and 340 deletions

125
cli.py
View File

@ -1,125 +0,0 @@
#!/usr/bin/python
import click
from solar.orchestration import graph
from solar.orchestration import tasks
import networkx as nx
import subprocess
@click.group()
def orchestration():
    # The docstring below is the help text click shows for this group; the
    # backspace marker keeps click from re-wrapping the usage examples.
    # The restart example no longer advertises a ``--reset`` flag because the
    # restart command only accepts the plan uid.
    """
    \b
    ./cli.py create orch/examples/multi.yaml
    <id>
    ./cli.py execute <id>
    ./cli.py report <id>
    <task> -> <status>
    ./cli.py restart <id>
    """
@click.command()
@click.argument('plan', type=click.File('rb'))
def create(plan):
    # Read the whole opened plan file, hand it to graph.create_plan and
    # print whatever that call returns.
    plan_source = plan.read()
    created = graph.create_plan(plan_source)
    click.echo(created)
@click.command()
@click.argument('uid')
@click.argument('plan', type=click.File('rb'))
def update(uid, plan):
    # Pass the content of the opened plan file to graph.update_plan for the
    # plan identified by ``uid``.
    plan_source = plan.read()
    graph.update_plan(uid, plan_source)
@click.command()
@click.argument('uid')
def report(uid):
    # Print one line per entry returned by graph.report_topo(uid), formatted
    # as "<task> -> <status>" with " :: <details>" appended when the third
    # element of the entry is truthy. Lines are colored by status.
    colors = {
        'PENDING': 'blue',
        'ERROR': 'red',
        'SUCCESS': 'green',
        'INPROGRESS': 'yellow'}

    for item in graph.report_topo(uid):
        name, status, details = item[0], item[1], item[2]
        msg = '{} -> {}'.format(name, status)
        if details:
            msg += ' :: {}'.format(details)
        # A status missing from the table is printed without a color rather
        # than aborting the whole report with a KeyError.
        click.echo(click.style(msg, fg=colors.get(status)))
@click.command()
@click.argument('uid')
@click.option('--start', default=None)
@click.option('--end', default=None)
def execute(uid, start, end):
    # Queue tasks.schedule_start for the plan on the 'scheduler' queue,
    # forwarding the optional --start / --end values as keyword arguments.
    bounds = {'start': start, 'end': end}
    tasks.schedule_start.apply_async(
        args=[uid], kwargs=bounds, queue='scheduler')
@click.command()
@click.argument('uid')
def restart(uid):
    # Reset the plan through graph.reset, then queue schedule_start for it
    # on the 'scheduler' queue.
    graph.reset(uid)
    tasks.schedule_start.apply_async(
        args=[uid],
        queue='scheduler')
@click.command()
@click.argument('uid')
def reset(uid):
    # Thin CLI wrapper: delegates to graph.reset for the given plan uid and
    # schedules nothing.
    graph.reset(uid)
@click.command()
@click.argument('uid')
def stop(uid):
    # Only a "soft" stop is available: delegate to graph.soft_stop.
    #
    # TODO(dshulyak) how to do a "hard" stop?
    # Using revoke(terminate=True) would make it impossible to restart the
    # execution; research whether app.control and Panel.register in celery
    # can be customized for this.
    graph.soft_stop(uid)
@click.command()
@click.argument('uid')
def retry(uid):
    # Call graph.reset restricted to the 'ERROR' state, then queue
    # schedule_start for the plan on the 'scheduler' queue.
    states_to_reset = ['ERROR']
    graph.reset(uid, states_to_reset)
    tasks.schedule_start.apply_async(
        args=[uid],
        queue='scheduler')
@click.command()
@click.argument('uid')
def dg(uid):
    # Render the plan as an image: tag every node with a color derived from
    # its status, dump the graph to graph.dot and convert it to graph.png
    # with the external ``dot`` program.
    plan = graph.get_graph(uid)
    status_colors = dict(
        PENDING='blue',
        ERROR='red',
        SUCCESS='green',
        INPROGRESS='yellow')

    for name in plan:
        attrs = plan.node[name]
        attrs['color'] = status_colors[attrs['status']]

    dot_file = 'graph.dot'
    nx.write_dot(plan, dot_file)
    subprocess.call(['dot', '-Tpng', dot_file, '-o', 'graph.png'])
# Attach every sub-command to the ``orchestration`` group, in the same order
# as before.
for _command in (create, update, report, execute, restart,
                 reset, stop, retry, dg):
    orchestration.add_command(_command)


if __name__ == '__main__':
    orchestration()

View File

@ -1,14 +1,13 @@
#!/usr/bin/python
import subprocess
import click
import networkx as nx
from solar.orchestration import graph
from solar.orchestration import tasks
import networkx as nx
import subprocess
@click.group(name='orch')
def orchestration():

View File

@ -15,7 +15,9 @@ deployment is not exposed, it is stored only in rabbitmq resources template,
but it exposes several *anchors* - amqp_cluster_ready and amqp_one_node_ready
3. Granular testing
3.1. How to integrate pre/post verifications
3.1. How to integrate pre/post verifications with graph execution
4. Add back timeout support
Orchestration features
-------------------------

View File

@ -1,14 +1,12 @@
import json
import uuid
import networkx as nx
import redis
import json
import yaml
import uuid
r = redis.StrictRedis(host='10.0.0.2', port=6379, db=1)

View File

@ -1,20 +1,18 @@
from functools import partial, wraps
from itertools import islice
import subprocess
import time
from celery import Celery
from celery.app import task
from celery import group
from celery.exceptions import Ignore
from functools import partial, wraps
from itertools import islice
import subprocess
import time
from solar.orchestration import graph
import redis
from solar.orchestration import graph
from solar.core import actions
from solar.core import resource
@ -39,28 +37,16 @@ class ReportTask(task.Task):
queue='scheduler')
solar_task = partial(app.task, base=ReportTask, bind=True)
report_task = partial(app.task, base=ReportTask, bind=True)
def maybe_ignore(func):
    """Decorator that discards queued tasks which were marked as ignored.

    Before calling ``func`` the wrapper checks whether the id of the current
    request is a member of the redis set 'tasks.ignore'; if it is, celery's
    ``Ignore`` is raised instead of running the task body.
    """

    @wraps(func)
    def guarded(ctxt, *args, **kwargs):
        if not r.sismember('tasks.ignore', ctxt.request.id):
            return func(ctxt, *args, **kwargs)
        raise Ignore()

    return guarded
@solar_task
@report_task
def solar_resource(ctxt, resource_name, action):
    """Load the resource called ``resource_name`` and run ``action`` on it.

    Returns whatever ``actions.resource_action`` returns.
    """
    loaded = resource.load(resource_name)
    outcome = actions.resource_action(loaded, action)
    return outcome
@solar_task
@report_task
def cmd(ctxt, cmd):
popen = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
@ -71,17 +57,17 @@ def cmd(ctxt, cmd):
return popen.returncode, out, err
@solar_task
@report_task
def sleep(ctxt, seconds):
    """Block the worker for ``seconds`` seconds; returns nothing."""
    delay = seconds
    time.sleep(delay)
@solar_task
@report_task
def error(ctxt, message):
    """Task that always fails.

    Args:
        ctxt: bound task context (unused).
        message: text carried by the raised exception.

    Raises:
        Exception: always, with ``message`` as its argument. Previously the
            literal string 'message' was raised and the argument was dropped.
    """
    raise Exception(message)
@solar_task
@report_task
def fault_tolerance(ctxt, percent):
task_id = ctxt.request.id
plan_uid, task_name = task_id.rsplit(':', 1)
@ -101,29 +87,20 @@ def fault_tolerance(ctxt, percent):
succes_percent, percent))
@solar_task
@report_task
def echo(ctxt, message):
    """Return ``message`` unchanged; ``ctxt`` is not used."""
    return message
@solar_task
@report_task
def anchor(ctxt, *args):
    """Fail unless every predecessor of this task has status 'SUCCESS'.

    The task id (``ctxt.request.id``) is split on its last ':' into a plan
    uid and a node name, matching the '<plan uid>:<node>' ids produced by
    the scheduler and the way ``fault_tolerance`` decodes them. Ids that
    contain no ':' keep the previous behaviour: the plan 'current' is loaded
    and the whole id is used as the node name.

    Args:
        ctxt: bound task context; only ``ctxt.request.id`` is read.
        *args: accepted but unused.

    Raises:
        Exception: if any predecessor's status is not 'SUCCESS'.
    """
    # it should be configurable to wait for at least 1 / 3 resources
    # such tasks should be walked when at least 1/3/exact number of
    # resources visited
    plan_uid, sep, task_name = ctxt.request.id.rpartition(':')
    if not sep:
        # Legacy id without a plan prefix: rpartition left the whole id in
        # task_name, so only the plan name needs a default.
        plan_uid = 'current'

    dg = graph.get_graph(plan_uid)
    for s in dg.predecessors(task_name):
        if dg.node[s]['status'] != 'SUCCESS':
            raise Exception('One of the tasks erred, cant proceeed')
@app.task
def fire_timeout(task_id):
    """Mark ``task_id`` as timed out unless it has already finished.

    If the task's celery state is already final, nothing happens. Otherwise
    the id is added to the redis set 'tasks.ignore' and ``schedule_next`` is
    queued on 'master' with the 'ERROR' status for that task.
    """
    result = app.AsyncResult(task_id)
    # Celery reports a failed task as 'FAILURE'; 'ERROR' is kept so any
    # caller relying on the old check still short-circuits.
    if result.state in ('ERROR', 'FAILURE', 'SUCCESS'):
        return
    r.sadd('tasks.ignore', task_id)
    schedule_next.apply_async(args=[task_id, 'ERROR'], queue='master')
def schedule(plan_uid, dg):
if not dg.graph.get('stop'):
next_tasks = list(traverse(dg))
@ -162,6 +139,12 @@ control_tasks = [fault_tolerance, anchor]
def traverse(dg):
"""
1. Node should be visited only when all predecessors already visited
2. Visited nodes should have any state except PENDING, INPROGRESS, for now
is SUCCESS or ERROR, but it can be extended
3. If node is INPROGRESS it should not be visited once again
"""
visited = set()
for node in dg:
data = dg.node[node]
@ -181,7 +164,7 @@ def traverse(dg):
if predecessors <= visited:
task_id = '{}:{}'.format(dg.graph['uid'], node)
task_name = 'orch.tasks.{0}'.format(data['type'])
task_name = '{}.{}'.format(__name__, data['type'])
task = app.tasks[task_name]
if all_success(dg, predecessors) or task in control_tasks:
@ -200,15 +183,8 @@ def generate_task(task, dg, data, task_id):
if data.get('target', None):
subtask.set(queue=data['target'])
timeout = data.get('timeout')
yield subtask
if timeout:
timeout_task = fire_timeout.subtask([task_id], countdown=timeout)
timeout_task.set(queue='scheduler')
yield timeout_task
def all_success(dg, nodes):
    """Return True only when every node in ``nodes`` has status 'SUCCESS'.

    Args:
        dg: graph-like object exposing ``dg.node[name]['status']``.
        nodes: iterable of node names to check.

    Returns:
        bool: True for an empty ``nodes``; False as soon as one node has a
        different status. The previous version filtered the nodes by status
        and then tested the truthiness of their names, so a failed
        predecessor never made it return False.
    """
    return all(dg.node[n]['status'] == 'SUCCESS' for n in nodes)

View File

@ -1,158 +0,0 @@
import networkx as nx
from solar.orchestration.tasks import *
# Module path fixed: the package is ``solar.orchestration`` (it was
# misspelled as ``orchestratoin``, which fails at import time).
from solar.orchestration.graph import *
from pytest import fixture
import time
@fixture(autouse=True)
def clean_ignored():
    """Delete the redis key 'tasks.ignore'."""
    ignore_key = 'tasks.ignore'
    r.delete(ignore_key)
def ex1():
    """Build the first example plan and return it as a DiGraph.

    A rabbitmq cluster (one create, two joins) feeds the
    'rabbitmq_cluster.ready' anchor; five compute nodes (three 'cmd', two
    'error') hang off the anchor and all lead into the 'compute_ready'
    fault_tolerance node. Every node starts as 'PENDING'.
    """
    dg = nx.DiGraph()

    def add(name, kind, args):
        dg.add_node(name, type=kind, args=args, status='PENDING')

    ready = 'rabbitmq_cluster.ready'

    add('rabbitmq_cluster1.create', 'cmd', ['echo "installing cluster"'])
    add('rabbitmq_cluster2.join', 'cmd', ['echo "joining"'])
    add('rabbitmq_cluster3.join', 'cmd', ['echo "joining"'])
    add(ready, 'anchor', [])

    for src, dst in [
            ('rabbitmq_cluster1.create', 'rabbitmq_cluster2.join'),
            ('rabbitmq_cluster1.create', 'rabbitmq_cluster3.join'),
            ('rabbitmq_cluster1.create', ready),
            ('rabbitmq_cluster2.join', ready),
            ('rabbitmq_cluster3.join', ready)]:
        dg.add_edge(src, dst)

    # compute1-3 run a command, compute4-5 are tasks of type 'error'.
    computes = ['compute{}'.format(i) for i in range(1, 6)]
    kinds = ['cmd', 'cmd', 'cmd', 'error', 'error']
    for name, kind in zip(computes, kinds):
        add(name, kind, ['echo "{}"'.format(name)])
    add('compute_ready', 'fault_tolerance', [60])

    for name in computes:
        dg.add_edge(ready, name)
    for name in computes:
        dg.add_edge(name, 'compute_ready')

    return dg
def test_ex1_exec():
    """Save the ex1 plan as 'current' and queue schedule_start on 'master'."""
    plan = ex1()
    save_graph('current', plan)
    schedule_start.apply_async(queue='master')
def ex2():
    """Build the second example plan: two join commands feeding one anchor."""
    dg = nx.DiGraph()
    ready = 'rabbitmq_cluster.ready'
    joins = ['rabbitmq_cluster2.join', 'rabbitmq_cluster3.join']

    for name in joins:
        dg.add_node(name, type='cmd', args=['echo "joining"'],
                    status='PENDING')
    dg.add_node(ready, type='anchor', args=[], status='PENDING')

    for name in joins:
        dg.add_edge(name, ready)
    return dg
def test_ex2_exec():
    """Save the ex2 plan as 'current' and queue schedule_start on 'master'."""
    plan = ex2()
    save_graph('current', plan)
    schedule_start.apply_async(queue='master')
def test_timelimit_exec():
    """Schedule two 100-second sleeps, one with time_limit=10 and one with
    soft_time_limit=10. Nothing is asserted here."""
    dg = nx.DiGraph()
    limits = [
        ('timelimit_test', {'time_limit': 10}),
        ('soft_timelimit_test', {'soft_time_limit': 10}),
    ]
    for name, limit in limits:
        dg.add_node(
            name, type='sleep', args=[100], status='PENDING', **limit)

    save_graph('current', dg)
    schedule_start.apply_async(queue='master')
def test_timeout():
    """A task routed to a queue nobody consumes must end up as ERROR once
    its timeout fires."""
    # TODO(dshulyak) how to handle connectivity issues?
    # or hardware failure ?
    dg = nx.DiGraph()
    dg.add_node(
        'test_timeout',
        type='echo',
        target='unreachable',
        args=['yoyoyo'],
        status='PENDING',
        timeout=1)
    save_graph('current', dg)

    # Two tasks are expected to be fired: test_timeout itself and
    # fire_timeout(test_timeout), the latter with a countdown equal to the
    # node's ``timeout`` (1 second here).
    schedule_start.apply_async(queue='master')

    # Give fire_timeout the chance to set test_timeout to ERROR.
    # NOTE(review): the wait equals the countdown, so this looks racy.
    time.sleep(1)

    # The master host starts listening on the 'unreachable' queue, but the
    # task is expected to be ignored: acked and fetched from the broker,
    # yet not processed.
    consumer_added = app.control.add_consumer(
        'unreachable', reply=True, destination=['celery@master'])
    assert consumer_added

    dg = get_graph('current')
    assert dg.node['test_timeout']['status'] == 'ERROR'
def test_target_exec():
    """Schedule a single command node targeted at the 'ipmi' queue.
    Nothing is asserted here."""
    dg = nx.DiGraph()
    node_attrs = dict(
        type='cmd',
        args=['vagrant reload solar-dev1'],
        status='PENDING',
        target='ipmi')
    dg.add_node('vagrant_reload', **node_attrs)

    save_graph('current', dg)
    schedule_start.apply_async(queue='master')
def test_limit_concurrency():
    """Schedule four echo nodes in a plan whose graph-level 'concurrency'
    is 2 (no more than 2 tasks in general). Nothing is asserted here."""
    dg = nx.DiGraph()
    dg.graph['concurrency'] = 2

    for number in range(4):
        dg.add_node(
            str(number), type='echo', args=[number], status='PENDING')

    save_graph('current', dg)
    schedule_start.apply_async(queue='master')
def test_ignored():
    """A task whose id is in 'tasks.ignore' is scheduled, waited for, and
    its node status is then checked."""
    task_name = 'test_ignored'

    dg = nx.DiGraph()
    dg.add_node(task_name, type='echo', args=['hello'], status='PENDING')
    r.sadd('tasks.ignore', task_name)
    save_graph('current', dg)

    schedule_start.apply_async(queue='master')
    app.AsyncResult(task_name).get()

    dg = get_graph('current')
    # NOTE(review): the expected status is an empty dict, not a status
    # string -- confirm this is what the ignore path is meant to produce.
    assert dg.node[task_name]['status'] == {}