Merge remote-tracking branch 'pigmej/orchestrator_os_hacking_guidelines' into pepmerge

This commit is contained in:
Łukasz Oleś 2015-11-26 14:18:24 +01:00
commit 788a667145
37 changed files with 561 additions and 559 deletions

View File

@ -21,16 +21,18 @@ import collections
import json
import click
import collections
from fabric import api as fabric_api
import networkx as nx
from solar.core import resource as sresource
from solar.core import signals
from solar.cli import base
from solar.cli.events import events
from solar.cli.orch import orchestration
from solar.cli.resource import resource as cli_resource
from solar.cli.system_log import changes
from solar.core import resource as sresource
from solar.core import signals
# HELPERS

View File

@ -18,14 +18,14 @@ import time
import click
from solar.cli.uids_history import remember_uid
from solar.cli.uids_history import SOLARUID
from solar import errors
from solar.orchestration import filters
from solar.orchestration import graph
from solar.orchestration import tasks
from solar.orchestration.traversal import states
from solar.orchestration import utils
from solar.cli.uids_history import remember_uid
from solar.cli.uids_history import SOLARUID
from solar import errors
@click.group(name='orch')

View File

@ -259,6 +259,5 @@ def remove(name, tag, f):
if f:
msg = 'Resource %s removed from database' % res.name
else:
msg = 'Resource %s will be removed after commiting changes.'
msg = msg % res.name
click.echo(msg)
click.echo(
'Resource %s will be removed after commiting changes.' % res.name)

View File

@ -15,6 +15,7 @@
#
import os
import yaml
from bunch import Bunch
import yaml

View File

@ -23,7 +23,7 @@ def setup_logger():
handler = logging.FileHandler('solar.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s') # NOQA
'%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)

View File

@ -216,8 +216,7 @@ class Resource(object):
for (emitter_resource, emitter_input), (receiver_resource, receiver_input), meta in self.graph().edges(data=True): # NOQA
if meta:
receiver_input = '{}:{}|{}'.format(receiver_input,
meta['destination_key'],
meta['tag'])
meta['destination_key'], meta['tag'])
rst.add(
(emitter_resource, emitter_input,

View File

@ -112,9 +112,8 @@ def _get_template(name, content, kwargs, inputs):
if input not in kwargs:
missing.append(input)
if missing:
err = '[{0}] Validation error. Missing data in input: {1}'
err = err.format(name, missing)
raise Exception(err)
raise Exception(
'[{0}] Validation error. Missing data in input: {1}'.format(name, missing))
template = Template(content, trim_blocks=True, lstrip_blocks=True)
template = template.render(str=str, zip=zip, **kwargs)
return template

View File

@ -83,8 +83,7 @@ def location_and_transports(emitter, receiver, orig_mapping):
emitter_single_reverse = emitter_single.get('reverse')
receiver_single_reverse = receiver_single.get('reverse')
if inps_receiver is None and inps_emitter is not None:
# we don't connect automaticaly when
# receiver is None and emitter is not None
# we don't connect automaticaly when receiver is None and emitter is not None
# for cases when we connect existing transports to other data
# containers
if receiver_single_reverse:

View File

@ -20,11 +20,8 @@ import os
import sys
import time
import libtorrent as lt
state_str = ['queued', 'checking', 'downloading metadata',
'downloading', 'finished', 'seeding', 'allocating',
'checking fastresume']
'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
class MultiTorrent(object):
@ -150,9 +147,9 @@ def _getter(torrents, max_seed_ratio=3):
# mt.force_reannounce()
s = ses.status()
if i % 5 == 0:
print('%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' %
print '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d) %s' % \
(mt.progress * 100, s.download_rate / 1000, s.upload_rate / 1000,
s.num_peers, mt.numbers()))
s.num_peers, mt.numbers())
now = time.time()
current_state = (now, mt.progress)
if current_state[-1] != last_state[-1]:

View File

@ -18,9 +18,10 @@ __all__ = ['add_dep', 'add_react', 'Dep', 'React', 'add_event']
import networkx as nx
from solar.core.log import log
from solar.events.controls import Dep, React, StateChange
from solar.dblayer.solar_models import Resource
from solar.events.controls import Dep
from solar.events.controls import React
def create_event(event_dict):
@ -99,6 +100,7 @@ def all_events(resource):
def bft_events_graph(start):
"""Builds graph of events traversing events in breadth-first order
This graph doesnt necessary reflect deployment order, it is used
to show dependencies between resources
"""
@ -127,7 +129,8 @@ def bft_events_graph(start):
def build_edges(changes_graph, events):
"""
"""Builds graph edges
:param changes_graph: nx.DiGraph object with actions to be executed
:param events: {res: [controls.Event objects]}
"""
@ -149,7 +152,8 @@ def build_edges(changes_graph, events):
log.debug('No outgoing events based on %s', event_name)
if event_name not in visited:
for parent, child, data in events_graph.edges(event_name, data=True):
for parent, child, data in events_graph.edges(event_name,
data=True):
succ_ev = data['event']
succ_ev.insert(stack, changes_graph)
visited.add(event_name)

View File

@ -23,16 +23,18 @@ if dependent resource isnt changed.
depends_on:
- parent:run -> ok -> dependent:run
*react_on* - relationship which will guarantee that action on dependent resource
will be executed if parent action is going to be executed. This control will
trigger action even if no changes noticed on dependent resource.
*react_on* - relationship which will guarantee that action on dependent
resource will be executed if parent action is going to be executed.
This control will trigger action even
if no changes noticed on dependent resource.
react_on:
- parent:update -> ok -> dependent:update
"""
from solar.dblayer.solar_models import Resource
from solar.dblayer.model import DBLayerNotFound
from solar.dblayer.solar_models import Resource
class Event(object):

View File

@ -14,9 +14,10 @@
import time
from solar.orchestration.runner import app
from celery import group
from solar.orchestration.runner import app
def celery_executor(dg, tasks, control_tasks=()):
to_execute = []
@ -28,7 +29,8 @@ def celery_executor(dg, tasks, control_tasks=()):
task_id = '{}:{}'.format(dg.graph['uid'], task_name)
task = app.tasks[dg.node[task_name]['type']]
if all_success(dg, dg.predecessors(task_name)) or task_name in control_tasks:
all_ok = all_success(dg, dg.predecessors(task_name))
if all_ok or task_name in control_tasks:
dg.node[task_name]['status'] = 'INPROGRESS'
dg.node[task_name]['start_time'] = time.time()
for t in generate_task(task, dg.node[task_name], task_id):

View File

@ -14,7 +14,8 @@
import networkx as nx
from .traversal import VISITED, states
from solar.orchestration.traversal import states
from solar.orchestration.traversal import VISITED
def get_dfs_postorder_subgraph(dg, nodes):
@ -26,6 +27,7 @@ def get_dfs_postorder_subgraph(dg, nodes):
def end_at(dg, nodes):
"""Returns subgraph that will guarantee that predecessors are visited
dg - directed graph
nodes - iterable with node names
"""
@ -33,8 +35,7 @@ def end_at(dg, nodes):
def start_from(dg, start_nodes):
"""Guarantee that all paths starting from specific *nodes* will be visited
"""
"""Ensures that all paths starting from specific *nodes* will be visited"""
visited = {n for n in dg if dg.node[n].get('status') in VISITED}
# sorting nodes in topological order will guarantee that all predecessors
@ -71,7 +72,8 @@ def validate(dg, start_nodes, end_nodes, err_msgs):
def filter(dg, start=None, end=None, tasks=(), skip_with=states.SKIPPED.name):
"""
"""Filters a graph
TODO(dshulyak) skip_with should also support NOOP, which will instead
of blocking task, and its successors, should mark task as visited

View File

@ -12,19 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from collections import Counter
import time
import uuid
import networkx as nx
from solar import utils
from .traversal import states
from solar import errors
from collections import Counter
from solar.dblayer.solar_models import Task
from solar.dblayer.model import clear_cache
from solar.dblayer.solar_models import Task
from solar import errors
from solar import utils
from solar.orchestration.traversal import states
def save_graph(graph):
@ -77,8 +76,7 @@ get_plan = get_graph
def parse_plan(plan_path):
""" parses yaml definition and returns graph
"""
"""parses yaml definition and returns graph"""
plan = utils.yaml_load(plan_path)
dg = nx.MultiDiGraph()
dg.graph['name'] = plan['name']
@ -123,8 +121,6 @@ def show(uid):
def create_plan(plan_path, save=True):
"""
"""
dg = parse_plan(plan_path)
return create_plan_from_graph(dg, save=save)
@ -163,8 +159,9 @@ def report_topo(uid):
def wait_finish(uid, timeout):
"""Wait finish will periodically load graph and check if there is no
PENDING or INPROGRESS
"""Check if graph is finished
Will return when no PENDING or INPROGRESS otherwise yields summary
"""
start_time = time.time()

View File

@ -47,7 +47,9 @@ def get_default_chain(dg, inprogress, added):
def type_based_rule(dg, inprogress, item):
"""condition will be specified like:
"""Checks type based rules
condition should be specified like:
type_limit: 2
"""
_type = dg.node[item].get('resource_type')

View File

@ -14,21 +14,22 @@
from functools import partial
import subprocess
import time
from celery.app import task
from celery.signals import task_prerun, task_postrun
from celery.signals import task_postrun
from celery.signals import task_prerun
from solar.orchestration import graph
from solar.core import actions
from solar.core import resource
from solar.system_log.tasks import commit_logitem, error_logitem
from solar.dblayer import ModelMeta
from solar.orchestration import executor
from solar.orchestration import graph
from solar.orchestration import limits
from solar.orchestration.runner import app
from solar.orchestration.traversal import traverse
from solar.orchestration import limits
from solar.orchestration import executor
from solar.dblayer import ModelMeta
from solar.system_log.tasks import commit_logitem
from solar.system_log.tasks import error_logitem
import time
__all__ = ['solar_resource', 'cmd', 'sleep',
'error', 'fault_tolerance', 'schedule_start', 'schedule_next']

View File

@ -18,7 +18,8 @@ import networkx as nx
def write_graph(plan):
"""
"""Writes graph to dot then to svg
:param plan: networkx Graph object
"""
colors = {

View File

@ -15,19 +15,20 @@
import dictdiffer
import networkx as nx
from solar.system_log import data
from solar.core.log import log
from solar.core import signals
from solar.core import resource
from solar.core.resource.resource import RESOURCE_STATE
from solar.core import signals
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import LogItem
from solar.dblayer.solar_models import StrInt
from solar.events import api as evapi
from solar.orchestration import graph
from solar.system_log import data
from solar import utils
from solar.orchestration import graph
from solar.events import api as evapi
from .consts import CHANGES
from solar.core.resource.resource import RESOURCE_STATE
from solar.errors import CannotFindID
from solar.core.consts import CHANGES
from solar.dblayer.solar_models import Resource, LogItem, CommitedResource, StrInt
def guess_action(from_, to):
@ -135,7 +136,9 @@ def parameters(res, action, data):
def _get_args_to_update(args, connections):
"""For each resource we can update only args that are not provided
"""Returns args to update
For each resource we can update only args that are not provided
by connections
"""
inherited = [i[3].split(':')[0] for i in connections]
@ -146,7 +149,8 @@ def _get_args_to_update(args, connections):
def revert_uids(uids):
"""
"""Reverts uids
:param uids: iterable not generator
"""
items = LogItem.multi_get(uids)
@ -165,15 +169,15 @@ def revert_uids(uids):
def _revert_remove(logitem):
"""Resource should be created with all previous connections
"""
"""Resource should be created with all previous connections"""
commited = CommitedResource.get(logitem.resource)
args = dictdiffer.revert(logitem.diff, commited.inputs)
connections = dictdiffer.revert(
logitem.connections_diff, sorted(commited.connections))
resource.Resource(logitem.resource, logitem.base_path,
args=_get_args_to_update(args, connections), tags=commited.tags)
args=_get_args_to_update(args, connections),
tags=commited.tags)
for emitter, emitter_input, receiver, receiver_input in connections:
emmiter_obj = resource.load(emitter)
receiver_obj = resource.load(receiver)
@ -181,7 +185,7 @@ def _revert_remove(logitem):
emitter_input: receiver_input})
def _update_inputs_connections(res_obj, args, old_connections, new_connections):
def _update_inputs_connections(res_obj, args, old_connections, new_connections): # NOQA
removed = []
for item in old_connections:
@ -204,7 +208,8 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections):
emmiter_obj.connect(receiver_obj, {emitter_input: receiver_input})
if removed or added:
# TODO without save we will get error that some values can not be updated
# TODO without save we will get error
# that some values can not be updated
# even if connection was removed
receiver_obj.db_obj.save()
@ -212,8 +217,7 @@ def _update_inputs_connections(res_obj, args, old_connections, new_connections):
def _revert_update(logitem):
"""Revert of update should update inputs and connections
"""
"""Revert of update should update inputs and connections"""
res_obj = resource.load(logitem.resource)
commited = res_obj.load_commited()
@ -222,7 +226,8 @@ def _revert_update(logitem):
args = dictdiffer.revert(logitem.diff, commited.inputs)
_update_inputs_connections(
res_obj, _get_args_to_update(args, connections), commited.connections, connections)
res_obj, _get_args_to_update(args, connections),
commited.connections, connections)
def _revert_run(logitem):
@ -247,7 +252,9 @@ def _discard_update(item):
args = dictdiffer.revert(item.diff, resource_obj.args)
_update_inputs_connections(
resource_obj, _get_args_to_update(args, new_connections), old_connections, new_connections)
resource_obj, _get_args_to_update(args, new_connections),
old_connections, new_connections)
def _discard_run(item):
@ -279,9 +286,8 @@ def discard_all():
def commit_all():
"""Helper mainly for ease of testing
"""
from .operations import move_to_commited
"""Helper mainly for ease of testing"""
from solar.system_log.operations import move_to_commited
for item in data.SL():
move_to_commited(item.log_action)

View File

@ -12,11 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
from solar.system_log import data
from solar.dblayer.solar_models import CommitedResource
from dictdiffer import patch
from solar.core.resource import resource
from .consts import CHANGES
from solar.dblayer.solar_models import CommitedResource
from solar.system_log import data
from solar.system_log.consts import CHANGES
def set_error(log_action, *args, **kwargs):

View File

@ -13,7 +13,8 @@
# under the License.
from solar.orchestration.runner import app
from solar.system_log.operations import set_error, move_to_commited
from solar.system_log.operations import move_to_commited
from solar.system_log.operations import set_error
__all__ = ['error_logitem', 'commit_logitem']

View File

@ -198,7 +198,8 @@ class ResourceListTemplate(BaseTemplate):
self.add_react(action_state, ResourceTemplate(r), action)
def filter(self, func):
"""Return ResourceListeTemplate instance with resources filtered by func.
"""Return ResourceListeTemplate instance with resources filtered
by func.
func -- predictate function that takes (idx, resource) as parameter
(idx is the index of resource in self.resources list)
@ -233,7 +234,8 @@ class ResourceListTemplate(BaseTemplate):
)
def connect_list_to_each(self, resources, mapping=None, events=None):
"""Connect each resource in self.resources to each resource in resources.
"""Connect each resource in self.resources to each resource in
resources.
mapping -- optional mapping
"{emitter_num}" -- substitutes for emitter's index in mapping (from
@ -260,9 +262,8 @@ class ResourceListTemplate(BaseTemplate):
)
def on_each(self, resource_path, args=None):
"""Create resource form resource_path
on each resource in self.resources
"""Create resource form resource_path on each resource in
self.resources.
"""
args = args or {}

View File

@ -15,12 +15,13 @@
import os
import shutil
import tempfile
import unittest
import yaml
import time
import unittest
import yaml
from solar.core.resource import virtual_resource as vr
from solar.dblayer.model import Model, Replacer, ModelMeta, get_bucket
from solar.dblayer.model import Model
def patched_get_bucket_name(cls):

View File

@ -14,11 +14,13 @@
import os
import time
from solar.core.resource import Resource
from solar.dblayer.model import Model, ModelMeta, get_bucket
import pytest
from solar.core.resource import Resource
from solar.dblayer.model import Model
from solar.dblayer.model import ModelMeta
from solar.dblayer.model import get_bucket
def patched_get_bucket_name(cls):
return cls.__name__ + str(time.time())
@ -49,13 +51,6 @@ def setup(request):
model.bucket = get_bucket(None, model, ModelMeta)
@pytest.fixture(autouse=True)
def setup(request):
for model in ModelMeta._defined_models:
model.bucket = get_bucket(None, model, ModelMeta)
def pytest_runtest_teardown(item, nextitem):
ModelMeta.session_end(result=True)
return nextitem

View File

@ -13,8 +13,8 @@
# under the License.
import networkx as nx
from pytest import fixture
from mock import patch
from pytest import fixture
from solar.orchestration import executor
@ -29,7 +29,6 @@ def dg():
@patch.object(executor, 'app')
def test_celery_executor(mapp, dg):
"""Just check that it doesnt fail for now.
"""
"""Just check that it doesnt fail for now."""
assert executor.celery_executor(dg, ['t1'])
assert dg.node['t1']['status'] == 'INPROGRESS'

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from dictdiffer import patch
from dictdiffer import revert
from pytest import fixture
from dictdiffer import revert, patch
import networkx as nx
from solar.system_log import change
@ -28,8 +28,9 @@ def staged():
'metadata': {},
'connections': [
['node.1', 'res.1', ['ip', 'ip']],
['node.1', 'res.1', ['key', 'key']]]
}
['node.1', 'res.1', ['key', 'key']]
]}
@fixture
@ -40,8 +41,9 @@ def commited():
'list_val': [1]},
'metadata': {},
'connections': [
['node.1', 'res.1', ['ip', 'ip']]]
}
['node.1', 'res.1', ['ip', 'ip']]
]}
@fixture
@ -56,17 +58,24 @@ def diff_for_update(staged, commited):
def test_create_diff_with_empty_commited(full_diff):
# add will be executed
expected = [('add', '', [('connections', [['node.1', 'res.1', ['ip', 'ip']], ['node.1', 'res.1', ['key', 'key']]]), ('input', {
'ip': {'value': '10.0.0.2'}, 'list_val': {'value': [1, 2]}}), ('metadata', {}), ('id', 'res.1'), ('tags', ['res', 'node.1'])])]
expected = [('add', '',
[('connections', [['node.1', 'res.1', ['ip', 'ip']],
['node.1', 'res.1', ['key', 'key']]]),
('input', {
'ip': {'value': '10.0.0.2'},
'list_val': {'value': [1, 2]}
}), ('metadata', {}), ('id', 'res.1'),
('tags', ['res', 'node.1'])])]
assert full_diff == expected
def test_create_diff_modified(diff_for_update):
assert diff_for_update == [
('add', 'connections',
[(1, ['node.1', 'res.1', ['key', 'key']])]),
('change', 'input.ip', ('10.0.0.2', {'value': '10.0.0.2'})),
('change', 'input.list_val', ([1], {'value': [1, 2]}))]
('add', 'connections', [(1, ['node.1', 'res.1', ['key', 'key']])]),
('change', 'input.ip',
('10.0.0.2', {'value': '10.0.0.2'})), ('change', 'input.list_val',
([1], {'value': [1, 2]}))
]
def test_verify_patch_creates_expected(staged, diff_for_update, commited):
@ -81,20 +90,17 @@ def test_revert_update(staged, diff_for_update, commited):
@fixture
def resources():
r = {'n.1':
{'uid': 'n.1',
'args': {'ip': '10.20.0.2'},
'connections': [],
'tags': []},
'r.1':
{'uid': 'r.1',
'args': {'ip': '10.20.0.2'},
'connections': [['n.1', 'r.1', ['ip', 'ip']]],
'tags': []},
'h.1':
{'uid': 'h.1',
'args': {'ip': '10.20.0.2',
'ips': ['10.20.0.2']},
'connections': [['n.1', 'h.1', ['ip', 'ip']]],
'tags': []}}
r = {'n.1': {'uid': 'n.1',
'args': {'ip': '10.20.0.2'},
'connections': [],
'tags': []},
'r.1': {'uid': 'r.1',
'args': {'ip': '10.20.0.2'},
'connections': [['n.1', 'r.1', ['ip', 'ip']]],
'tags': []},
'h.1': {'uid': 'h.1',
'args': {'ip': '10.20.0.2',
'ips': ['10.20.0.2']},
'connections': [['n.1', 'h.1', ['ip', 'ip']]],
'tags': []}}
return r

View File

@ -16,10 +16,8 @@
import networkx as nx
from pytest import fixture
from solar.events import api as evapi
from solar.dblayer.solar_models import Resource
from .base import BaseResourceTest
from solar.events import api as evapi
@fixture
@ -76,9 +74,7 @@ def test_nova_api_run_after_nova(nova_deps):
def test_nova_api_react_on_update(nova_deps):
"""Test that nova_api:update will be called even if there is no changes
in nova_api
"""
"""Test that nova_api:update will be called even if there is no changes in nova_api""" # NOQA
changes_graph = nx.DiGraph()
changes_graph.add_node('nova.update')
evapi.build_edges(changes_graph, nova_deps)
@ -89,19 +85,25 @@ def test_nova_api_react_on_update(nova_deps):
@fixture
def rmq_deps():
"""Example of a case when defaults are not good enough.
For example we need to run some stuff on first node before two others.
"""
# NOTE(dshulyak) is it possible to put this create/join logic into
# puppet manifest? So that rmq_cluster.2 before joining will check if
# cluster already exists?
return {
'rmq.1': [evapi.Dep('rmq.1', 'run', 'success', 'rmq_cluster.1', 'create')],
'rmq.2': [evapi.Dep('rmq.2', 'run', 'success', 'rmq_cluster.2', 'join')],
'rmq.3': [evapi.Dep('rmq.3', 'run', 'success', 'rmq_cluster.3', 'join')],
'rmq.1':
[evapi.Dep('rmq.1', 'run', 'success', 'rmq_cluster.1', 'create')],
'rmq.2':
[evapi.Dep('rmq.2', 'run', 'success', 'rmq_cluster.2', 'join')],
'rmq.3':
[evapi.Dep('rmq.3', 'run', 'success', 'rmq_cluster.3', 'join')],
'rmq_cluster.1': [
evapi.Dep('rmq_cluster.1', 'create',
'success', 'rmq_cluster.2', 'join'),
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.3', 'join')]}
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.2',
'join'), evapi.Dep('rmq_cluster.1', 'create', 'success',
'rmq_cluster.3', 'join')
]
}
def test_rmq(rmq_deps):
@ -115,29 +117,35 @@ def test_rmq(rmq_deps):
evapi.build_edges(changes_graph, rmq_deps)
assert set(changes_graph.successors('rmq_cluster.1.create')) == {
'rmq_cluster.2.join', 'rmq_cluster.3.join'}
'rmq_cluster.2.join', 'rmq_cluster.3.join'
}
def test_riak():
events = {
'riak_service1': [
evapi.React('riak_service1', 'run', 'success',
'riak_service2', 'run'),
evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'run')],
evapi.React('riak_service1', 'run', 'success', 'riak_service2',
'run'), evapi.React('riak_service1', 'run', 'success',
'riak_service3', 'run')
],
'riak_service3': [
evapi.React('riak_service3', 'join', 'success',
'riak_service1', 'commit'),
evapi.React('riak_service3', 'run', 'success', 'riak_service3', 'join')],
evapi.React('riak_service3', 'join', 'success', 'riak_service1',
'commit'),
evapi.React('riak_service3', 'run', 'success', 'riak_service3',
'join')
],
'riak_service2': [
evapi.React('riak_service2', 'run', 'success',
'riak_service2', 'join'),
evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')],
evapi.React('riak_service2', 'run', 'success', 'riak_service2',
'join'),
evapi.React('riak_service2', 'join', 'success', 'riak_service1',
'commit')
],
}
changes_graph = nx.MultiDiGraph()
changes_graph.add_node('riak_service1.run')
evapi.build_edges(changes_graph, events)
assert set(changes_graph.predecessors('riak_service1.commit')) == {
'riak_service2.join', 'riak_service3.join'}
'riak_service2.join', 'riak_service3.join'
}

View File

@ -18,14 +18,12 @@ from pytest import fixture
from solar.orchestration import graph
from solar.orchestration.traversal import states
from solar.dblayer.model import ModelMeta
@fixture
def simple():
simple_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
'simple.yaml')
return graph.create_plan(simple_path)
@ -59,7 +57,13 @@ def test_wait_finish(simple):
simple.node[n]['status'] = states.SUCCESS.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 2, 'NOOP': 0, 'ERROR': 0, 'INPROGRESS': 0, 'PENDING': 0}
'SKIPPED': 0,
'SUCCESS': 2,
'NOOP': 0,
'ERROR': 0,
'INPROGRESS': 0,
'PENDING': 0
}
def test_several_updates(simple):
@ -67,10 +71,22 @@ def test_several_updates(simple):
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 1, 'INPROGRESS': 0, 'PENDING': 1}
'SKIPPED': 0,
'SUCCESS': 0,
'NOOP': 0,
'ERROR': 1,
'INPROGRESS': 0,
'PENDING': 1
}
simple.node['echo_stuff']['status'] = states.ERROR.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 2, 'INPROGRESS': 0, 'PENDING': 0}
'SKIPPED': 0,
'SUCCESS': 0,
'NOOP': 0,
'ERROR': 2,
'INPROGRESS': 0,
'PENDING': 0
}

View File

@ -14,14 +14,13 @@
import os
import networkx as nx
from pytest import fixture
from pytest import mark
import networkx as nx
from solar.orchestration import graph
from solar.orchestration import filters
from solar.orchestration import graph
from solar.orchestration.traversal import states
from solar.utils import yaml_load
@fixture
@ -44,8 +43,7 @@ def test_end_at(dg_ex1, end_nodes, visited):
@mark.parametrize("start_nodes,visited", [
(['n3'], {'n3'}),
(['n1'], {'n1', 'n2', 'n4'}),
(['n3'], {'n3'}), (['n1'], {'n1', 'n2', 'n4'}),
(['n1', 'n3'], {'n1', 'n2', 'n3', 'n4', 'n5'})
])
def test_start_from(dg_ex1, start_nodes, visited):
@ -63,20 +61,21 @@ def dg_ex2():
@fixture
def riak_plan():
riak_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
'riak.yaml')
return graph.create_plan(riak_path, save=False)
def test_riak_start_node1(riak_plan):
assert filters.start_from(riak_plan, ['node1.run']) == {
'node1.run', 'hosts_file1.run', 'riak_service1.run'}
'node1.run', 'hosts_file1.run', 'riak_service1.run'
}
def test_riak_end_hosts_file1(riak_plan):
assert filters.end_at(riak_plan, ['hosts_file1.run']) == {
'node1.run', 'hosts_file1.run'}
'node1.run', 'hosts_file1.run'
}
def test_start_at_two_nodes(riak_plan):
@ -87,8 +86,10 @@ def test_start_at_two_nodes(riak_plan):
def test_initial_from_node1_traverse(riak_plan):
filters.filter(riak_plan, start=['node1.run'])
pending = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.PENDING.name}
pending = {n
for n in riak_plan
if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file1.run', 'riak_service1.run', 'node1.run'}
@ -97,18 +98,21 @@ def test_second_from_node2_with_node1_walked(riak_plan):
for n in success:
riak_plan.node[n]['status'] = states.SUCCESS.name
filters.filter(riak_plan, start=['node2.run'])
pending = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file2.run', 'riak_service2.run',
'node2.run', 'riak_service2.join'}
pending = {n
for n in riak_plan
if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file2.run', 'riak_service2.run', 'node2.run',
'riak_service2.join'}
def test_end_joins(riak_plan):
filters.filter(
riak_plan,
start=['node1.run', 'node2.run', 'node3.run'],
end=['riak_service2.join', 'riak_service3.join'])
skipped = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.SKIPPED.name}
filters.filter(riak_plan,
start=['node1.run', 'node2.run', 'node3.run'],
end=['riak_service2.join', 'riak_service3.join'])
skipped = {n
for n in riak_plan
if riak_plan.node[
n]['status'] == states.SKIPPED.name}
assert skipped == {'riak_service1.commit'}

View File

@ -14,40 +14,49 @@
import os
from pytest import fixture
import networkx as nx
from pytest import fixture
from solar.orchestration import limits
from solar.orchestration import graph
from solar.orchestration import limits
@fixture
def dg():
ex = nx.DiGraph()
ex.add_node('t1', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t2', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t3', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t1',
status='PENDING',
target='1',
resource_type='node',
type_limit=2)
ex.add_node('t2',
status='PENDING',
target='1',
resource_type='node',
type_limit=2)
ex.add_node('t3',
status='PENDING',
target='1',
resource_type='node',
type_limit=2)
return ex
def test_target_rule(dg):
assert limits.target_based_rule(dg, [], 't1') == True
assert limits.target_based_rule(dg, ['t1'], 't2') == False
assert limits.target_based_rule(dg, [], 't1') is True
assert limits.target_based_rule(dg, ['t1'], 't2') is False
def test_type_limit_rule(dg):
assert limits.type_based_rule(dg, ['t1'], 't2') == True
assert limits.type_based_rule(dg, ['t1', 't2'], 't3') == False
assert limits.type_based_rule(dg, ['t1'], 't2') is True
assert limits.type_based_rule(dg, ['t1', 't2'], 't3') is False
def test_items_rule(dg):
assert limits.items_rule(dg, ['1'] * 99, '2')
assert limits.items_rule(dg, ['1'] * 99, '2', limit=10) == False
assert limits.items_rule(dg, ['1'] * 99, '2', limit=10) is False
@fixture
@ -68,8 +77,7 @@ def test_filtering_chain(target_dg):
@fixture
def seq_plan():
seq_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
'sequential.yaml')
return graph.create_plan(seq_path, save=False)
@ -78,6 +86,6 @@ def test_limits_sequential(seq_plan):
stack_to_execute = seq_plan.nodes()
while stack_to_execute:
left = stack_to_execute[0]
assert list(limits.get_default_chain(
seq_plan, [], stack_to_execute)) == [left]
assert list(limits.get_default_chain(seq_plan, [],
stack_to_execute)) == [left]
stack_to_execute.pop(0)

View File

@ -15,8 +15,8 @@
from pytest import fixture
from solar.core import resource
from solar.dblayer.solar_models import Resource
from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import Resource
@fixture

View File

@ -13,7 +13,6 @@
# under the License.
import base
from solar.core import resource
from solar.core import signals
@ -31,9 +30,8 @@ input:
value: 0
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'value': 1})
self.assertEqual(sample1.args['value'], 1)
# test default value
@ -41,7 +39,8 @@ input:
self.assertEqual(sample2.args['value'], 0)
def test_connections_recreated_after_load(self):
"""
"""Test if connections are ok after load
Create resource in some process. Then in other process load it.
All connections should remain the same.
"""
@ -56,12 +55,9 @@ input:
""")
def creating_process():
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir,
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'value': 1})
sample2 = self.create_resource('sample2', sample_meta_dir, )
signals.connect(sample1, sample2)
self.assertEqual(sample1.args['value'], sample2.args['value'])
@ -86,9 +82,7 @@ input:
value: 0
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'value': 1}
)
sample = self.create_resource('sample', sample_meta_dir, {'value': 1})
sample_l = resource.load('sample')
@ -107,12 +101,9 @@ input:
value: 0
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'value': 1})
sample2 = self.create_resource('sample2', sample_meta_dir, {})
signals.connect(sample1, sample2)
self.assertEqual(sample1.args['value'], sample2.args['value'])

View File

@ -12,12 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import base
from solar.core import signals as xs
import pytest
import base
from solar.core import signals as xs
class TestBaseInput(base.BaseResourceTest):
@ -32,13 +31,11 @@ input:
value:
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'value': 'x'}
)
sample = self.create_resource('sample', sample_meta_dir,
{'value': 'x'})
with self.assertRaisesRegexp(
Exception,
'Trying to connect value-.* to itself'):
with self.assertRaisesRegexp(Exception,
'Trying to connect value-.* to itself'):
xs.connect(sample, sample, {'value'})
def test_input_dict_type(self):
@ -52,42 +49,23 @@ input:
value: {}
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'values': {'a': 1, 'b': 2}}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir
)
sample1 = self.create_resource('sample1', sample_meta_dir, {'values':
{'a': 1,
'b': 2}})
sample2 = self.create_resource('sample2', sample_meta_dir)
xs.connect(sample1, sample2)
self.assertEqual(
sample1.args['values'],
sample2.args['values']
)
self.assertEqual(sample1.args['values'], sample2.args['values'])
# Check update
sample1.update({'values': {'a': 2}})
self.assertEqual(
sample1.args['values'],
{'a': 2}
)
self.assertEqual(
sample1.args['values'],
sample2.args['values'],
)
self.assertEqual(sample1.args['values'], {'a': 2})
self.assertEqual(sample1.args['values'], sample2.args['values'], )
# Check disconnect
# TODO: should sample2.value be reverted to original value?
sample1.disconnect(sample2)
sample1.update({'values': {'a': 3}})
self.assertEqual(
sample1.args['values'],
{'a': 3}
)
# self.assertEqual(
# sample2.args['values'],
# {'a': 2}
#)
#self.assertEqual(sample2.args['values'].emitter, None)
self.assertEqual(sample1.args['values'], {'a': 3})
def test_multiple_resource_disjoint_connect(self):
sample_meta_dir = self.make_resource_meta("""
@ -121,31 +99,19 @@ input:
value:
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'ip': None, 'port': None}
)
sample_ip = self.create_resource(
'sample-ip', sample_ip_meta_dir, {'ip': '10.0.0.1'}
)
sample_port = self.create_resource(
'sample-port', sample_port_meta_dir, {'port': 8000}
)
self.assertNotEqual(
sample.resource_inputs()['ip'],
sample_ip.resource_inputs()['ip'],
)
sample = self.create_resource('sample', sample_meta_dir,
{'ip': None,
'port': None})
sample_ip = self.create_resource('sample-ip', sample_ip_meta_dir,
{'ip': '10.0.0.1'})
sample_port = self.create_resource('sample-port', sample_port_meta_dir,
{'port': 8000})
self.assertNotEqual(sample.resource_inputs()['ip'],
sample_ip.resource_inputs()['ip'], )
xs.connect(sample_ip, sample)
xs.connect(sample_port, sample)
self.assertEqual(sample.args['ip'], sample_ip.args['ip'])
self.assertEqual(sample.args['port'], sample_port.args['port'])
# self.assertEqual(
# sample.args['ip'].emitter,
# sample_ip.args['ip']
#)
# self.assertEqual(
# sample.args['port'].emitter,
# sample_port.args['port']
#)
def test_simple_observer_unsubscription(self):
sample_meta_dir = self.make_resource_meta("""
@ -158,29 +124,18 @@ input:
value:
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'ip': None}
)
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2'}
)
sample = self.create_resource('sample', sample_meta_dir, {'ip': None})
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1'})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2'})
xs.connect(sample1, sample)
self.assertEqual(sample1.args['ip'], sample.args['ip'])
#self.assertEqual(len(list(sample1.args['ip'].receivers)), 1)
# self.assertEqual(
# sample.args['ip'].emitter,
# sample1.args['ip']
#)
xs.connect(sample2, sample)
self.assertEqual(sample2.args['ip'], sample.args['ip'])
# sample should be unsubscribed from sample1 and subscribed to sample2
#self.assertEqual(len(list(sample1.args['ip'].receivers)), 0)
#self.assertEqual(sample.args['ip'].emitter, sample2.args['ip'])
sample2.update({'ip': '10.0.0.3'})
self.assertEqual(sample2.args['ip'], sample.args['ip'])
@ -198,15 +153,13 @@ input:
value:
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2'}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1'})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2'})
xs.connect(sample1, sample2)
with self.assertRaises(Exception):
with self.assertRaises(Exception): # NOQA
xs.connect(sample2, sample1)
@ -232,68 +185,36 @@ input:
value: []
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2'}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1'})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2'})
list_input_single = self.create_resource(
'list-input-single', list_input_single_meta_dir, {'ips': []}
)
'list-input-single', list_input_single_meta_dir, {'ips': []})
sample1.connect(list_input_single, mapping={'ip': 'ips'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip')]
#)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
])
sample2.connect(list_input_single, mapping={'ip': 'ips'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip'),
# (sample2.args['ip'].attached_to.name, 'ip')]
#)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
sample2.args['ip'],
])
# Test update
sample2.update({'ip': '10.0.0.3'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
sample2.args['ip'],
])
# Test disconnect
sample2.disconnect(list_input_single)
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip')]
#)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
])
def test_list_input_multi(self):
sample_meta_dir = self.make_resource_meta("""
@ -321,70 +242,58 @@ input:
value:
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1', 'port': 1000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2', 'port': 1001}
)
list_input_multi = self.create_resource(
'list-input-multi', list_input_multi_meta_dir, args={'ips': [], 'ports': []}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1',
'port': 1000})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2',
'port': 1001})
list_input_multi = self.create_resource('list-input-multi',
list_input_multi_meta_dir,
args={'ips': [],
'ports': []})
xs.connect(sample1, list_input_multi, mapping={
'ip': 'ips', 'port': 'ports'})
xs.connect(sample1,
list_input_multi,
mapping={
'ip': 'ips',
'port': 'ports'
})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
[sample1.args['ip']]
)
[sample1.args['ip']])
self.assertItemsEqual(
#[p['value'] for p in list_input_multi.args['ports']],
list_input_multi.args['ports'],
[sample1.args['port']]
)
[sample1.args['port']])
xs.connect(sample2, list_input_multi, mapping={
'ip': 'ips', 'port': 'ports'})
xs.connect(sample2,
list_input_multi,
mapping={
'ip': 'ips',
'port': 'ports'
})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip'),
# (sample2.args['ip'].attached_to.name, 'ip')]
#)
])
self.assertItemsEqual(
#[p['value'] for p in list_input_multi.args['ports']],
list_input_multi.args['ports'],
[
sample1.args['port'],
sample2.args['port'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ports']],
# [(sample1.args['port'].attached_to.name, 'port'),
# (sample2.args['port'].attached_to.name, 'port')]
#)
])
# Test disconnect
sample2.disconnect(list_input_multi)
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
[sample1.args['ip']]
)
[sample1.args['ip']])
self.assertItemsEqual(
#[p['value'] for p in list_input_multi.args['ports']],
list_input_multi.args['ports'],
[sample1.args['port']]
)
[sample1.args['port']])
# XXX: not used for now, not implemented in new db (jnowak)
# @pytest.mark.xfail(reason="Nested lists are not supported in new_db")
@ -446,14 +355,13 @@ input:
# sample1.connect(list_input, mapping={'ip': 'ips', 'port': 'ports'})
# sample2.connect(list_input, mapping={'ip': 'ips', 'port': 'ports'})
# list_input.connect(list_input_nested, mapping={'ips': 'ipss', 'ports': 'portss'})
# list_input.connect(list_input_nested,
# mapping={'ips': 'ipss', 'ports': 'portss'})
# self.assertListEqual(
# #[ips['value'] for ips in list_input_nested.args['ipss']],
# list_input_nested.args['ipss'],
# [list_input.args['ips']]
# )
# self.assertListEqual(
# #[ps['value'] for ps in list_input_nested.args['portss']],
# list_input_nested.args['portss'],
# [list_input.args['ports']]
# )
@ -461,7 +369,6 @@ input:
# # Test disconnect
# xs.disconnect(sample1, list_input)
# self.assertListEqual(
# #[[ip['value'] for ip in ips['value']] for ips in list_input_nested.args['ipss']],
# list_input_nested.args['ipss'],
# [[sample2.args['ip']]]
# )
@ -472,8 +379,8 @@ input:
class TestHashInput(base.BaseResourceTest):
@pytest.mark.xfail(reason="Connect should raise an error if already connected")
@pytest.mark.xfail(
reason="Connect should raise an error if already connected")
def test_hash_input_basic(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@ -496,42 +403,51 @@ input:
schema: {ip: str!, port: int!}
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
sample3 = self.create_resource(
'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
sample1 = self.create_resource('sample1',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
sample2 = self.create_resource('sample2',
sample_meta_dir,
args={'ip': '10.0.0.2',
'port': 5001})
sample3 = self.create_resource('sample3',
sample_meta_dir,
args={'ip': '10.0.0.3',
'port': 5002})
receiver = self.create_resource('receiver', receiver_meta_dir)
xs.connect(sample1,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertDictEqual(
{'ip': sample1.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
{'ip': sample1.args['ip'],
'port': sample1.args['port']},
receiver.args['server'], )
sample1.update({'ip': '10.0.0.2'})
self.assertDictEqual(
{'ip': sample1.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
{'ip': sample1.args['ip'],
'port': sample1.args['port']},
receiver.args['server'], )
# XXX: We need to disconnect first
# XXX: it should raise error when connecting already connected inputs
xs.connect(sample2, receiver, mapping={'ip': 'server:ip'})
self.assertDictEqual(
{'ip': sample2.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
xs.connect(sample3, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
{'ip': sample2.args['ip'],
'port': sample1.args['port']},
receiver.args['server'], )
xs.connect(sample3,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertDictEqual(
{'ip': sample3.args['ip'], 'port': sample3.args['port']},
receiver.args['server'],
)
{'ip': sample3.args['ip'],
'port': sample3.args['port']},
receiver.args['server'], )
def test_hash_input_mixed(self):
sample_meta_dir = self.make_resource_meta("""
@ -555,22 +471,23 @@ input:
schema: {ip: str!, port: int!}
""")
sample = self.create_resource(
'sample', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir, args={'server': {'port': 5001}}
)
sample = self.create_resource('sample',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
receiver = self.create_resource('receiver',
receiver_meta_dir,
args={'server': {'port': 5001}})
sample.connect(receiver, mapping={'ip': 'server:ip'})
self.assertDictEqual(
{'ip': sample.args['ip'], 'port': 5001},
receiver.args['server'],
)
{'ip': sample.args['ip'],
'port': 5001},
receiver.args['server'], )
sample.update({'ip': '10.0.0.2'})
self.assertDictEqual(
{'ip': sample.args['ip'], 'port': 5001},
receiver.args['server'],
)
{'ip': sample.args['ip'],
'port': 5001},
receiver.args['server'], )
def test_hash_input_with_list(self):
sample_meta_dir = self.make_resource_meta("""
@ -594,33 +511,41 @@ input:
schema: [{ip: str!, port: int!}]
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
sample1 = self.create_resource('sample1',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
receiver = self.create_resource('receiver', receiver_meta_dir)
xs.connect(sample1,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample1.args['port']}],
receiver.args['server'],
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
xs.connect(sample2, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
[{'ip': sample1.args['ip'],
'port': sample1.args['port']}],
receiver.args['server'], )
sample2 = self.create_resource('sample2',
sample_meta_dir,
args={'ip': '10.0.0.2',
'port': 5001})
xs.connect(sample2,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample1.args['port']},
{'ip': sample2.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
[{'ip': sample1.args['ip'],
'port': sample1.args['port']}, {'ip': sample2.args['ip'],
'port': sample2.args['port']}],
receiver.args['server'], )
sample1.disconnect(receiver)
self.assertItemsEqual(
[{'ip': sample2.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
[{'ip': sample2.args['ip'],
'port': sample2.args['port']}],
receiver.args['server'], )
def test_hash_input_with_multiple_connections(self):
sample_meta_dir = self.make_resource_meta("""
@ -644,21 +569,14 @@ input:
schema: {ip: str!}
""")
sample = self.create_resource(
'sample', sample_meta_dir, args={'ip': '10.0.0.1'}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
sample = self.create_resource('sample',
sample_meta_dir,
args={'ip': '10.0.0.1'})
receiver = self.create_resource('receiver', receiver_meta_dir)
xs.connect(sample, receiver, mapping={'ip': ['ip', 'server:ip']})
self.assertEqual(
sample.args['ip'],
receiver.args['ip']
)
self.assertDictEqual(
{'ip': sample.args['ip']},
receiver.args['server'],
)
self.assertEqual(sample.args['ip'], receiver.args['ip'])
self.assertDictEqual({'ip': sample.args['ip']},
receiver.args['server'], )
def test_hash_input_multiple_resources_with_tag_connect(self):
sample_meta_dir = self.make_resource_meta("""
@ -682,44 +600,49 @@ input:
schema: [{ip: str!, port: int!}]
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
sample1 = self.create_resource('sample1',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
sample2 = self.create_resource('sample2',
sample_meta_dir,
args={'ip': '10.0.0.2',
'port': 5001})
receiver = self.create_resource('receiver', receiver_meta_dir)
sample1.connect(receiver, mapping={'ip': 'server:ip'})
sample2.connect(receiver, mapping={'port': 'server:port|sample1'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
sample3 = self.create_resource(
'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002}
)
sample3.connect(receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}],
receiver.args['server'], )
sample3 = self.create_resource('sample3',
sample_meta_dir,
args={'ip': '10.0.0.3',
'port': 5002})
sample3.connect(receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample3.args['ip'], 'port': sample3.args['port']}],
receiver.args['server'],
)
sample4 = self.create_resource(
'sample4', sample_meta_dir, args={'ip': '10.0.0.4', 'port': 5003}
)
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}, {'ip': sample3.args['ip'],
'port': sample3.args['port']}],
receiver.args['server'], )
sample4 = self.create_resource('sample4',
sample_meta_dir,
args={'ip': '10.0.0.4',
'port': 5003})
sample4.connect(receiver, mapping={'port': 'server:port|sample3'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample3.args['ip'], 'port': sample4.args['port']}],
receiver.args['server'],
)
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}, {'ip': sample3.args['ip'],
'port': sample4.args['port']}],
receiver.args['server'], )
# There can be no sample3 connections left now
sample4.connect(receiver, mapping={'ip': 'server:ip|sample3'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample4.args['ip'], 'port': sample4.args['port']}],
receiver.args['server'],
)
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}, {'ip': sample4.args['ip'],
'port': sample4.args['port']}],
receiver.args['server'], )

View File

@ -13,25 +13,27 @@
# under the License.
import mock
from pytest import fixture
from pytest import mark
from solar.core.resource import resource
from solar.core.resource import RESOURCE_STATE
from solar.core import signals
from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import Resource as DBResource
from solar.system_log import change
from solar.system_log import data
from solar.system_log import operations
from solar.core import signals
from solar.core.resource import resource, RESOURCE_STATE
from solar.dblayer.solar_models import Resource as DBResource
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.model import ModelMeta
def test_revert_update():
commit = {'a': '10'}
previous = {'a': '9'}
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1',
'base_path': 'x',
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.save()
action = 'update'
res.inputs['a'] = '9'
@ -40,9 +42,11 @@ def test_revert_update():
assert resource_obj.args == previous
log = data.SL()
logitem = change.create_logitem(
res.name, action, change.create_diff(commit, previous), [],
base_path=res.base_path)
logitem = change.create_logitem(res.name,
action,
change.create_diff(commit, previous),
[],
base_path=res.base_path)
log.append(logitem)
resource_obj.update(commit)
operations.move_to_commited(logitem.log_action)
@ -56,23 +60,29 @@ def test_revert_update():
def test_revert_update_connected():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = ''
res2.save_lazy()
res3 = DBResource.from_dict('test3',
{'name': 'test3', 'base_path': 'x',
{'name': 'test3',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res3.inputs['a'] = ''
res3.save_lazy()
@ -113,15 +123,16 @@ def test_revert_update_connected():
def test_revert_removal():
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.inputs['a'] = '9'
res.save_lazy()
commited = CommitedResource.from_dict('test1',
{'inputs': {'a': '9'},
'state': 'operational'})
commited = CommitedResource.from_dict('test1', {'inputs': {'a': '9'},
'state': 'operational'})
commited.save_lazy()
resource_obj = resource.load(res.name)
@ -139,23 +150,31 @@ def test_revert_removal():
with mock.patch.object(resource, 'read_meta') as mread:
mread.return_value = {
'input': {'a': {'schema': 'str!'}}, 'id': 'mocked'}
'input': {'a': {'schema': 'str!'}},
'id': 'mocked'
}
change.revert(changes[0].uid)
ModelMeta.save_all_lazy()
assert len(DBResource.bucket.get('test1').siblings) == 1
resource_obj = resource.load('test1')
assert resource_obj.args == {
'a': '9', 'location_id': '', 'transports_id': ''}
'a': '9',
'location_id': '',
'transports_id': ''
}
@mark.xfail(reason='With current approach child will be notice changes after parent is removed')
@mark.xfail(
reason="""With current approach child will
notice changes after parent is removed"""
)
def test_revert_removed_child():
res1 = orm.DBResource(id='test1', name='test1', base_path='x')
res1 = orm.DBResource(id='test1', name='test1', base_path='x') # NOQA
res1.save()
res1.add_input('a', 'str', '9')
res2 = orm.DBResource(id='test2', name='test2', base_path='x')
res2 = orm.DBResource(id='test2', name='test2', base_path='x') # NOQA
res2.save()
res2.add_input('a', 'str', 0)
@ -184,9 +203,11 @@ def test_revert_removed_child():
def test_revert_create():
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.inputs['a'] = '9'
res.save_lazy()
ModelMeta.save_all_lazy()
@ -211,16 +232,20 @@ def test_revert_create():
def test_discard_all_pending_changes_resources_created():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
@ -236,16 +261,20 @@ def test_discard_all_pending_changes_resources_created():
def test_discard_connection():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
@ -267,9 +296,11 @@ def test_discard_connection():
def test_discard_removed():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()
@ -289,9 +320,11 @@ def test_discard_removed():
def test_discard_update():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()

View File

@ -12,10 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from solar.test import base
from solar import errors
from solar.core import validation as sv
from solar.test import base
class TestInputValidation(base.BaseResourceTest):

View File

@ -18,8 +18,9 @@ from StringIO import StringIO
import pytest
import yaml
from solar.events.controls import React, Dep
from solar.core.resource import virtual_resource as vr
from solar.events.controls import Dep
from solar.events.controls import React
@pytest.fixture

View File

@ -18,11 +18,13 @@ import json
import logging
import os
import uuid
import yaml
from jinja2 import Environment
import yaml
logger = logging.getLogger(__name__)

View File

@ -12,7 +12,7 @@ deps = -r{toxinidir}/test-requirements.txt
commands = ostestr --serial
[testenv:pep8]
deps = hacking==0.10.2
deps = hacking==0.9.6
usedevelop = False
commands =
flake8 {posargs:solar/core}
@ -27,7 +27,7 @@ envdir = devenv
usedevelop = True
[flake8]
ignore = H101,H236,E731
ignore = E731
exclude = .venv,.git,.tox,dist,doc,*lib/python*,*egg,build,tools,__init__.py,docs
show-pep8 = True
show-source = True