Merge pull request #395 from dshulyak/f2s_rebased

F2s patches to solar
This commit is contained in:
Jędrzej Nowak 2015-12-01 17:24:38 +01:00
commit 25f7f2049d
23 changed files with 169 additions and 36 deletions

View File

@ -2,18 +2,16 @@ FROM ubuntu:14.04
WORKDIR /
RUN apt-get update
# Install pip's dependency: setuptools:
RUN apt-get install -y python python-dev python-distribute python-pip
RUN pip install ansible
ADD bootstrap/playbooks/celery.yaml /celery.yaml
ADD resources /resources
ADD templates /templates
ADD run.sh /run.sh
RUN apt-get update
RUN apt-get install -y python python-dev python-distribute python-pip \
libyaml-dev vim libffi-dev libssl-dev
RUN pip install ansible
RUN apt-get install -y libffi-dev libssl-dev
RUN pip install https://github.com/Mirantis/solar/archive/master.zip
RUN pip install https://github.com/Mirantis/solar-agent/archive/master.zip

View File

@ -18,6 +18,7 @@ solar-celery:
links:
- riak
- redis
riak:
image: tutum/riak
ports:

View File

@ -3,9 +3,9 @@ ply
click==4.0
jinja2==2.7.3
networkx>=1.10
PyYAML>=3.1.0
PyYAML
jsonschema==2.4.0
requests==2.7.0
requests
dictdiffer==0.4.0
enum34==1.0.4
redis==2.10.3

View File

@ -0,0 +1,10 @@
#!/usr/bin/env python
import sys
import json
data = json.loads(sys.stdin.read())
rst = {'val_x_val': int(data['val'])**2}
sys.stdout.write(json.dumps(rst))

View File

@ -0,0 +1,12 @@
id: managed
handler: none
version: 1.0.0
managers:
- managers/manager.py
input:
val:
schema: int!
value: 2
val_x_val:
schema: int
value:

View File

@ -0,0 +1,7 @@
id: sources
handler: naive_sync
version: 1.0.0
input:
sources:
schema: [{'src': 'str!', 'dst': 'str!'}]
value: []

2
run.sh
View File

@ -6,6 +6,6 @@ if [ -d /solar ]; then
fi
#used only to start celery on docker
ansible-playbook -v -i "localhost," -c local /celery.yaml --skip-tags slave
ansible-playbook -v -i "localhost," -c local /celery.yaml --skip-tags slave,stop
tail -f /var/run/celery/*.log

View File

@ -77,7 +77,8 @@ def click_report(uid):
'ERROR': 'red',
'SUCCESS': 'green',
'INPROGRESS': 'yellow',
'SKIPPED': 'blue'}
'SKIPPED': 'blue',
'NOOP': 'black'}
total = 0.0
report = graph.report_topo(uid)
@ -115,6 +116,13 @@ def filter(uid, start, end):
click.echo('Created {name}.png'.format(name=plan.graph['name']))
@orchestration.command(help='Used to mark task as executed')
@click.argument('uid', type=SOLARUID)
@click.option('--task', '-t', multiple=True)
def noop(uid, task):
graph.set_states(uid, task)
@orchestration.command(name='run-once')
@click.argument('uid', type=SOLARUID, default='last')
@click.option('-w', 'wait', default=0)

View File

@ -261,3 +261,17 @@ def remove(name, tag, f):
else:
msg = 'Resource %s will be removed after commiting changes.' % res.name # NOQA
click.echo(msg)
@resource.command()
@click.option('--name', '-n')
@click.option('--tag', '-t', multiple=True)
def prefetch(name, tag):
if name:
resources = [sresource.load(name)]
elif tag:
resources = sresource.load_by_tags(set(tag))
for res in resources:
res.prefetch()
click.echo(res.color_repr())

View File

@ -18,6 +18,7 @@ from solar.core.handlers.ansible_playbook import AnsiblePlaybook
from solar.core.handlers.base import Empty
from solar.core.handlers.puppet import Puppet, PuppetV2
from solar.core.handlers.shell import Shell
from solar.core.handlers.naive_sync import NaiveSync
HANDLERS = {'ansible': AnsibleTemplate,
@ -25,7 +26,8 @@ HANDLERS = {'ansible': AnsibleTemplate,
'shell': Shell,
'puppet': Puppet,
'none': Empty,
'puppetv2': PuppetV2}
'puppetv2': PuppetV2,
'naive_sync': NaiveSync}
def get(handler_name):

View File

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from solar.core.handlers.base import BaseHandler
class NaiveSync(BaseHandler):
def action(self, resource, action_name):
# it is inconsistent with handlers because action_name
# is totally useless piece of info here
args = resource.args
# this src seems not intuitive to me, wo context it is impossible
# to understand where src comes from
for item in args['sources']:
self.transport_sync.copy(resource, item['src'], item['dst'])
self.transport_sync.sync_all()

View File

@ -82,4 +82,5 @@ class Puppet(TempFileHandler):
class PuppetV2(Puppet):
def _make_args(self, resource):
return resource.args
args = resource.args
return {k: args[k] for k in args if not args[k] is None}

View File

@ -15,6 +15,7 @@
from copy import deepcopy
from hashlib import md5
import json
import os
from uuid import uuid4
@ -89,7 +90,8 @@ class Resource(object):
'version': metadata.get('version', ''),
'meta_inputs': inputs,
'tags': tags,
'state': RESOURCE_STATE.created.name
'state': RESOURCE_STATE.created.name,
'managers': metadata.get('managers', [])
})
self.create_inputs(args)
@ -285,6 +287,16 @@ class Resource(object):
receiver.db_obj.save_lazy()
self.db_obj.save_lazy()
def prefetch(self):
if not self.db_obj.managers:
return
for manager in self.db_obj.managers:
manager_path = os.path.join(self.db_obj.base_path, manager)
rst = utils.communicate([manager_path], json.dumps(self.args))
if rst:
self.update(json.loads(rst))
def load(name):
r = DBResource.get(name)

View File

@ -130,6 +130,7 @@ def create_resources(resources, tags=None):
resource_name = r['id']
args = r.get('values', {})
node = r.get('location', None)
values_from = r.get('values_from')
from_path = r.get('from', None)
tags = r.get('tags', [])
base_path = os.path.join(cwd, from_path)
@ -141,7 +142,13 @@ def create_resources(resources, tags=None):
r = new_resources[0]
node.connect(r, mapping={})
r.add_tags('location={}'.format(node.name))
update_inputs(resource_name, args)
if values_from:
from_resource = load_resource(values_from)
from_resource.connect_with_events(r, use_defaults=False)
return created_resources

View File

@ -53,7 +53,7 @@ class InputsFieldWrp(IndexFieldWrp):
# XXX: it could be worth to precalculate it
if ':' in name:
name = name.split(":", 1)[0]
schema = resource.meta_inputs[name]['schema']
schema = resource.meta_inputs[name].get('schema', None)
if isinstance(schema, self._simple_types):
return InputTypes.simple
if isinstance(schema, list):
@ -715,6 +715,7 @@ class Resource(Model):
meta_inputs = Field(dict, default=dict)
state = Field(str) # on_set/on_get would be useful
events = Field(list, default=list)
managers = Field(list, default=list)
inputs = InputsField(default=dict)
tags = TagsField(default=list)

View File

@ -29,12 +29,10 @@ def celery_executor(dg, tasks, control_tasks=()):
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
task = app.tasks[dg.node[task_name]['type']]
all_ok = all_success(dg, dg.predecessors(task_name))
if all_ok or task_name in control_tasks:
dg.node[task_name]['status'] = 'INPROGRESS'
dg.node[task_name]['start_time'] = time.time()
for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t)
dg.node[task_name]['status'] = 'INPROGRESS'
dg.node[task_name]['start_time'] = time.time()
for t in generate_task(task, dg.node[task_name], task_id):
to_execute.append(t)
return group(to_execute)

View File

@ -12,18 +12,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import Counter
import time
import uuid
from collections import Counter
import networkx as nx
from solar.dblayer.model import clear_cache
from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import Task
from solar import errors
from solar import utils
from solar.orchestration.traversal import states
from solar import utils
def save_graph(graph):
@ -55,6 +56,16 @@ def update_graph(graph):
task.save()
def set_states(uid, tasks):
plan = get_graph(uid)
for t in tasks:
if t not in plan.node:
raise Exception("No task %s in plan %s", t, uid)
plan.node[t]['task'].status = states.NOOP.name
plan.node[t]['task'].save_lazy()
ModelMeta.save_all_lazy()
def get_graph(uid):
dg = nx.MultiDiGraph()
dg.graph['uid'] = uid

View File

@ -12,8 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
import subprocess
import time
from functools import partial
from celery.app import task
from celery.signals import task_postrun
@ -29,7 +31,7 @@ from solar.orchestration.runner import app
from solar.orchestration.traversal import traverse
from solar.system_log.tasks import commit_logitem
from solar.system_log.tasks import error_logitem
import time
__all__ = ['solar_resource', 'cmd', 'sleep',
'error', 'fault_tolerance', 'schedule_start', 'schedule_next']

View File

@ -32,8 +32,8 @@ from enum import Enum
states = Enum('States', 'SUCCESS ERROR NOOP INPROGRESS SKIPPED PENDING')
VISITED = (states.SUCCESS.name, states.ERROR.name, states.NOOP.name)
BLOCKED = (states.INPROGRESS.name, states.SKIPPED.name)
VISITED = (states.SUCCESS.name, states.NOOP.name)
BLOCKED = (states.INPROGRESS.name, states.SKIPPED.name, states.ERROR.name)
def traverse(dg):
@ -43,7 +43,7 @@ def traverse(dg):
data = dg.node[node]
if data['status'] in VISITED:
visited.add(node)
rst = []
for node in dg:
data = dg.node[node]
@ -51,4 +51,5 @@ def traverse(dg):
continue
if set(dg.predecessors(node)) <= visited:
yield node
rst.append(node)
return rst

View File

@ -27,7 +27,8 @@ def write_graph(plan):
'ERROR': 'red',
'SUCCESS': 'green',
'INPROGRESS': 'yellow',
'SKIPPED': 'blue'}
'SKIPPED': 'blue',
'NOOP': 'black'}
for n in plan:
color = colors[plan.node[n]['status']]

View File

@ -17,6 +17,7 @@ import io
import json
import logging
import os
import subprocess
import uuid
from jinja2 import Environment
@ -34,6 +35,14 @@ def to_pretty_json(data):
return json.dumps(data, indent=4)
def communicate(command, data):
popen = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE)
return popen.communicate(input=data)[0]
# Configure jinja2 filters
jinja_env_with_filters = Environment()
jinja_env_with_filters.filters['to_json'] = to_json

View File

@ -6,8 +6,8 @@ resources:
values:
name: mos-holdback
package: '*'
repo: deb http://mirror.fuel-infra.org/mos-repos/ubuntu/7.0/ mos7.0-holdback main restricted
pin: release o=Mirantis,n=mos7.0,a=mos7.0-holdback,l=mos7.0
repo: deb http://mirror.fuel-infra.org/mos-repos/ubuntu/8.0/ mos8.0-holdback main restricted
pin: release o=Mirantis,n=mos8.0,a=mos8.0-holdback,l=mos8.0
pin_priority: 1100
- id: mos_security_{{index}}
from: resources/apt_repo
@ -15,8 +15,8 @@ resources:
values:
name: mos
package: '*'
repo: deb http://mirror.fuel-infra.org/mos-repos/ubuntu/7.0/ mos7.0-security main restricted
pin: release o=Mirantis,n=mos7.0,a=mos7.0-security,l=mos7.0
repo: deb http://mirror.fuel-infra.org/mos-repos/ubuntu/8.0/ mos8.0-security main restricted
pin: release o=Mirantis,n=mos8.0,a=mos8.0-security,l=mos8.0
pin_priority: 1050
- id: mos_updates_{{index}}
from: resources/apt_repo
@ -24,8 +24,8 @@ resources:
values:
name: mos_update
package: '*'
repo: deb http://mirror.fuel-infra.org/mos-repos/ubuntu/7.0/ mos7.0-updates main restricted
pin: release o=Mirantis,a=mos7.0-updates,l=mos7.0,n=mos7.0
repo: deb http://mirror.fuel-infra.org/mos-repos/ubuntu/8.0/ mos8.0-updates main restricted
pin: release o=Mirantis,a=mos8.0-updates,l=mos8.0,n=mos8.0
pin_priority: 1050
- id: managed_apt_{{index}}
from: resources/managed_apt

8
templates/sources.yaml Normal file
View File

@ -0,0 +1,8 @@
id: sources
resources:
- id: sources{{index}}
from: resources/sources
location: {{node}}
values:
sources:
- {src: /tmp/sources_test, dst: /tmp/sources_test}