Adjusted tests code to OpenStack Style Guidelines

This commit is contained in:
Jedrzej Nowak 2015-11-26 14:09:54 +01:00
parent 67e1f21481
commit ae112eeef3
14 changed files with 447 additions and 469 deletions

View File

@ -15,12 +15,13 @@
import os
import shutil
import tempfile
import unittest
import yaml
import time
import unittest
import yaml
from solar.core.resource import virtual_resource as vr
from solar.dblayer.model import Model, Replacer, ModelMeta, get_bucket
from solar.dblayer.model import Model
def patched_get_bucket_name(cls):

View File

@ -14,11 +14,13 @@
import os
import time
from solar.core.resource import Resource
from solar.dblayer.model import Model, ModelMeta, get_bucket
import pytest
from solar.core.resource import Resource
from solar.dblayer.model import Model
from solar.dblayer.model import ModelMeta
from solar.dblayer.model import get_bucket
def patched_get_bucket_name(cls):
return cls.__name__ + str(time.time())
@ -49,13 +51,6 @@ def setup(request):
model.bucket = get_bucket(None, model, ModelMeta)
@pytest.fixture(autouse=True)
def setup(request):
for model in ModelMeta._defined_models:
model.bucket = get_bucket(None, model, ModelMeta)
def pytest_runtest_teardown(item, nextitem):
ModelMeta.session_end(result=True)
return nextitem

View File

@ -13,8 +13,8 @@
# under the License.
import networkx as nx
from pytest import fixture
from mock import patch
from pytest import fixture
from solar.orchestration import executor
@ -29,7 +29,6 @@ def dg():
@patch.object(executor, 'app')
def test_celery_executor(mapp, dg):
"""Just check that it doesnt fail for now.
"""
"""Just check that it doesnt fail for now."""
assert executor.celery_executor(dg, ['t1'])
assert dg.node['t1']['status'] == 'INPROGRESS'

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from dictdiffer import patch
from dictdiffer import revert
from pytest import fixture
from dictdiffer import revert, patch
import networkx as nx
from solar.system_log import change
@ -28,8 +28,8 @@ def staged():
'metadata': {},
'connections': [
['node.1', 'res.1', ['ip', 'ip']],
['node.1', 'res.1', ['key', 'key']]]
}
['node.1', 'res.1', ['key', 'key']]
]}
@fixture
@ -40,8 +40,8 @@ def commited():
'list_val': [1]},
'metadata': {},
'connections': [
['node.1', 'res.1', ['ip', 'ip']]]
}
['node.1', 'res.1', ['ip', 'ip']]
]}
@fixture
@ -56,17 +56,24 @@ def diff_for_update(staged, commited):
def test_create_diff_with_empty_commited(full_diff):
# add will be executed
expected = [('add', '', [('connections', [['node.1', 'res.1', ['ip', 'ip']], ['node.1', 'res.1', ['key', 'key']]]), ('input', {
'ip': {'value': '10.0.0.2'}, 'list_val': {'value': [1, 2]}}), ('metadata', {}), ('id', 'res.1'), ('tags', ['res', 'node.1'])])]
expected = [('add', '',
[('connections', [['node.1', 'res.1', ['ip', 'ip']],
['node.1', 'res.1', ['key', 'key']]]),
('input', {
'ip': {'value': '10.0.0.2'},
'list_val': {'value': [1, 2]}
}), ('metadata', {}), ('id', 'res.1'),
('tags', ['res', 'node.1'])])]
assert full_diff == expected
def test_create_diff_modified(diff_for_update):
assert diff_for_update == [
('add', 'connections',
[(1, ['node.1', 'res.1', ['key', 'key']])]),
('change', 'input.ip', ('10.0.0.2', {'value': '10.0.0.2'})),
('change', 'input.list_val', ([1], {'value': [1, 2]}))]
('add', 'connections', [(1, ['node.1', 'res.1', ['key', 'key']])]),
('change', 'input.ip',
('10.0.0.2', {'value': '10.0.0.2'})), ('change', 'input.list_val',
([1], {'value': [1, 2]}))
]
def test_verify_patch_creates_expected(staged, diff_for_update, commited):
@ -81,20 +88,17 @@ def test_revert_update(staged, diff_for_update, commited):
@fixture
def resources():
r = {'n.1':
{'uid': 'n.1',
'args': {'ip': '10.20.0.2'},
'connections': [],
'tags': []},
'r.1':
{'uid': 'r.1',
'args': {'ip': '10.20.0.2'},
'connections': [['n.1', 'r.1', ['ip', 'ip']]],
'tags': []},
'h.1':
{'uid': 'h.1',
'args': {'ip': '10.20.0.2',
'ips': ['10.20.0.2']},
'connections': [['n.1', 'h.1', ['ip', 'ip']]],
'tags': []}}
r = {'n.1': {'uid': 'n.1',
'args': {'ip': '10.20.0.2'},
'connections': [],
'tags': []},
'r.1': {'uid': 'r.1',
'args': {'ip': '10.20.0.2'},
'connections': [['n.1', 'r.1', ['ip', 'ip']]],
'tags': []},
'h.1': {'uid': 'h.1',
'args': {'ip': '10.20.0.2',
'ips': ['10.20.0.2']},
'connections': [['n.1', 'h.1', ['ip', 'ip']]],
'tags': []}}
return r

View File

@ -16,10 +16,8 @@
import networkx as nx
from pytest import fixture
from solar.events import api as evapi
from solar.dblayer.solar_models import Resource
from .base import BaseResourceTest
from solar.events import api as evapi
@fixture
@ -76,9 +74,7 @@ def test_nova_api_run_after_nova(nova_deps):
def test_nova_api_react_on_update(nova_deps):
"""Test that nova_api:update will be called even if there is no changes
in nova_api
"""
"""Test that nova_api:update will be called even if there is no changes in nova_api""" # NOQA
changes_graph = nx.DiGraph()
changes_graph.add_node('nova.update')
evapi.build_edges(changes_graph, nova_deps)
@ -89,19 +85,25 @@ def test_nova_api_react_on_update(nova_deps):
@fixture
def rmq_deps():
"""Example of a case when defaults are not good enough.
For example we need to run some stuff on first node before two others.
"""
# NOTE(dshulyak) is it possible to put this create/join logic into
# puppet manifest? So that rmq_cluster.2 before joining will check if
# cluster already exists?
return {
'rmq.1': [evapi.Dep('rmq.1', 'run', 'success', 'rmq_cluster.1', 'create')],
'rmq.2': [evapi.Dep('rmq.2', 'run', 'success', 'rmq_cluster.2', 'join')],
'rmq.3': [evapi.Dep('rmq.3', 'run', 'success', 'rmq_cluster.3', 'join')],
'rmq.1':
[evapi.Dep('rmq.1', 'run', 'success', 'rmq_cluster.1', 'create')],
'rmq.2':
[evapi.Dep('rmq.2', 'run', 'success', 'rmq_cluster.2', 'join')],
'rmq.3':
[evapi.Dep('rmq.3', 'run', 'success', 'rmq_cluster.3', 'join')],
'rmq_cluster.1': [
evapi.Dep('rmq_cluster.1', 'create',
'success', 'rmq_cluster.2', 'join'),
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.3', 'join')]}
evapi.Dep('rmq_cluster.1', 'create', 'success', 'rmq_cluster.2',
'join'), evapi.Dep('rmq_cluster.1', 'create', 'success',
'rmq_cluster.3', 'join')
]
}
def test_rmq(rmq_deps):
@ -115,29 +117,35 @@ def test_rmq(rmq_deps):
evapi.build_edges(changes_graph, rmq_deps)
assert set(changes_graph.successors('rmq_cluster.1.create')) == {
'rmq_cluster.2.join', 'rmq_cluster.3.join'}
'rmq_cluster.2.join', 'rmq_cluster.3.join'
}
def test_riak():
events = {
'riak_service1': [
evapi.React('riak_service1', 'run', 'success',
'riak_service2', 'run'),
evapi.React('riak_service1', 'run', 'success', 'riak_service3', 'run')],
evapi.React('riak_service1', 'run', 'success', 'riak_service2',
'run'), evapi.React('riak_service1', 'run', 'success',
'riak_service3', 'run')
],
'riak_service3': [
evapi.React('riak_service3', 'join', 'success',
'riak_service1', 'commit'),
evapi.React('riak_service3', 'run', 'success', 'riak_service3', 'join')],
evapi.React('riak_service3', 'join', 'success', 'riak_service1',
'commit'),
evapi.React('riak_service3', 'run', 'success', 'riak_service3',
'join')
],
'riak_service2': [
evapi.React('riak_service2', 'run', 'success',
'riak_service2', 'join'),
evapi.React('riak_service2', 'join', 'success', 'riak_service1', 'commit')],
evapi.React('riak_service2', 'run', 'success', 'riak_service2',
'join'),
evapi.React('riak_service2', 'join', 'success', 'riak_service1',
'commit')
],
}
changes_graph = nx.MultiDiGraph()
changes_graph.add_node('riak_service1.run')
evapi.build_edges(changes_graph, events)
assert set(changes_graph.predecessors('riak_service1.commit')) == {
'riak_service2.join', 'riak_service3.join'}
'riak_service2.join', 'riak_service3.join'
}

View File

@ -18,14 +18,12 @@ from pytest import fixture
from solar.orchestration import graph
from solar.orchestration.traversal import states
from solar.dblayer.model import ModelMeta
@fixture
def simple():
simple_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
'simple.yaml')
return graph.create_plan(simple_path)
@ -59,7 +57,13 @@ def test_wait_finish(simple):
simple.node[n]['status'] = states.SUCCESS.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 2, 'NOOP': 0, 'ERROR': 0, 'INPROGRESS': 0, 'PENDING': 0}
'SKIPPED': 0,
'SUCCESS': 2,
'NOOP': 0,
'ERROR': 0,
'INPROGRESS': 0,
'PENDING': 0
}
def test_several_updates(simple):
@ -67,10 +71,22 @@ def test_several_updates(simple):
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 1, 'INPROGRESS': 0, 'PENDING': 1}
'SKIPPED': 0,
'SUCCESS': 0,
'NOOP': 0,
'ERROR': 1,
'INPROGRESS': 0,
'PENDING': 1
}
simple.node['echo_stuff']['status'] = states.ERROR.name
graph.update_graph(simple)
assert next(graph.wait_finish(simple.graph['uid'], 10)) == {
'SKIPPED': 0, 'SUCCESS': 0, 'NOOP': 0, 'ERROR': 2, 'INPROGRESS': 0, 'PENDING': 0}
'SKIPPED': 0,
'SUCCESS': 0,
'NOOP': 0,
'ERROR': 2,
'INPROGRESS': 0,
'PENDING': 0
}

View File

@ -14,14 +14,13 @@
import os
import networkx as nx
from pytest import fixture
from pytest import mark
import networkx as nx
from solar.orchestration import graph
from solar.orchestration import filters
from solar.orchestration import graph
from solar.orchestration.traversal import states
from solar.utils import yaml_load
@fixture
@ -44,8 +43,7 @@ def test_end_at(dg_ex1, end_nodes, visited):
@mark.parametrize("start_nodes,visited", [
(['n3'], {'n3'}),
(['n1'], {'n1', 'n2', 'n4'}),
(['n3'], {'n3'}), (['n1'], {'n1', 'n2', 'n4'}),
(['n1', 'n3'], {'n1', 'n2', 'n3', 'n4', 'n5'})
])
def test_start_from(dg_ex1, start_nodes, visited):
@ -63,20 +61,21 @@ def dg_ex2():
@fixture
def riak_plan():
riak_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
'riak.yaml')
return graph.create_plan(riak_path, save=False)
def test_riak_start_node1(riak_plan):
assert filters.start_from(riak_plan, ['node1.run']) == {
'node1.run', 'hosts_file1.run', 'riak_service1.run'}
'node1.run', 'hosts_file1.run', 'riak_service1.run'
}
def test_riak_end_hosts_file1(riak_plan):
assert filters.end_at(riak_plan, ['hosts_file1.run']) == {
'node1.run', 'hosts_file1.run'}
'node1.run', 'hosts_file1.run'
}
def test_start_at_two_nodes(riak_plan):
@ -87,8 +86,10 @@ def test_start_at_two_nodes(riak_plan):
def test_initial_from_node1_traverse(riak_plan):
filters.filter(riak_plan, start=['node1.run'])
pending = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.PENDING.name}
pending = {n
for n in riak_plan
if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file1.run', 'riak_service1.run', 'node1.run'}
@ -97,18 +98,21 @@ def test_second_from_node2_with_node1_walked(riak_plan):
for n in success:
riak_plan.node[n]['status'] = states.SUCCESS.name
filters.filter(riak_plan, start=['node2.run'])
pending = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file2.run', 'riak_service2.run',
'node2.run', 'riak_service2.join'}
pending = {n
for n in riak_plan
if riak_plan.node[
n]['status'] == states.PENDING.name}
assert pending == {'hosts_file2.run', 'riak_service2.run', 'node2.run',
'riak_service2.join'}
def test_end_joins(riak_plan):
filters.filter(
riak_plan,
start=['node1.run', 'node2.run', 'node3.run'],
end=['riak_service2.join', 'riak_service3.join'])
skipped = {n for n in riak_plan if riak_plan.node[
n]['status'] == states.SKIPPED.name}
filters.filter(riak_plan,
start=['node1.run', 'node2.run', 'node3.run'],
end=['riak_service2.join', 'riak_service3.join'])
skipped = {n
for n in riak_plan
if riak_plan.node[
n]['status'] == states.SKIPPED.name}
assert skipped == {'riak_service1.commit'}

View File

@ -14,40 +14,49 @@
import os
from pytest import fixture
import networkx as nx
from pytest import fixture
from solar.orchestration import limits
from solar.orchestration import graph
from solar.orchestration import limits
@fixture
def dg():
ex = nx.DiGraph()
ex.add_node('t1', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t2', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t3', status='PENDING', target='1',
resource_type='node', type_limit=2)
ex.add_node('t1',
status='PENDING',
target='1',
resource_type='node',
type_limit=2)
ex.add_node('t2',
status='PENDING',
target='1',
resource_type='node',
type_limit=2)
ex.add_node('t3',
status='PENDING',
target='1',
resource_type='node',
type_limit=2)
return ex
def test_target_rule(dg):
assert limits.target_based_rule(dg, [], 't1') == True
assert limits.target_based_rule(dg, ['t1'], 't2') == False
assert limits.target_based_rule(dg, [], 't1') is True
assert limits.target_based_rule(dg, ['t1'], 't2') is False
def test_type_limit_rule(dg):
assert limits.type_based_rule(dg, ['t1'], 't2') == True
assert limits.type_based_rule(dg, ['t1', 't2'], 't3') == False
assert limits.type_based_rule(dg, ['t1'], 't2') is True
assert limits.type_based_rule(dg, ['t1', 't2'], 't3') is False
def test_items_rule(dg):
assert limits.items_rule(dg, ['1'] * 99, '2')
assert limits.items_rule(dg, ['1'] * 99, '2', limit=10) == False
assert limits.items_rule(dg, ['1'] * 99, '2', limit=10) is False
@fixture
@ -68,8 +77,7 @@ def test_filtering_chain(target_dg):
@fixture
def seq_plan():
seq_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
'orch_fixtures',
os.path.dirname(os.path.realpath(__file__)), 'orch_fixtures',
'sequential.yaml')
return graph.create_plan(seq_path, save=False)
@ -78,6 +86,6 @@ def test_limits_sequential(seq_plan):
stack_to_execute = seq_plan.nodes()
while stack_to_execute:
left = stack_to_execute[0]
assert list(limits.get_default_chain(
seq_plan, [], stack_to_execute)) == [left]
assert list(limits.get_default_chain(seq_plan, [],
stack_to_execute)) == [left]
stack_to_execute.pop(0)

View File

@ -15,8 +15,8 @@
from pytest import fixture
from solar.core import resource
from solar.dblayer.solar_models import Resource
from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import Resource
@fixture

View File

@ -13,13 +13,11 @@
# under the License.
import base
from solar.core import resource
from solar.core import signals
class TestResource(base.BaseResourceTest):
def test_resource_args(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@ -31,9 +29,8 @@ input:
value: 0
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'value': 1})
self.assertEqual(sample1.args['value'], 1)
# test default value
@ -41,7 +38,8 @@ input:
self.assertEqual(sample2.args['value'], 0)
def test_connections_recreated_after_load(self):
"""
"""Test if connections are ok after load
Create resource in some process. Then in other process load it.
All connections should remain the same.
"""
@ -56,12 +54,9 @@ input:
""")
def creating_process():
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir,
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'value': 1})
sample2 = self.create_resource('sample2', sample_meta_dir, )
signals.connect(sample1, sample2)
self.assertEqual(sample1.args['value'], sample2.args['value'])
@ -86,9 +81,7 @@ input:
value: 0
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'value': 1}
)
sample = self.create_resource('sample', sample_meta_dir, {'value': 1})
sample_l = resource.load('sample')
@ -107,12 +100,9 @@ input:
value: 0
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'value': 1}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'value': 1})
sample2 = self.create_resource('sample2', sample_meta_dir, {})
signals.connect(sample1, sample2)
self.assertEqual(sample1.args['value'], sample2.args['value'])

View File

@ -12,15 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import base
from solar.core import signals as xs
import pytest
import base
from solar.core import signals as xs
class TestBaseInput(base.BaseResourceTest):
def test_no_self_connection(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@ -32,13 +30,11 @@ input:
value:
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'value': 'x'}
)
sample = self.create_resource('sample', sample_meta_dir,
{'value': 'x'})
with self.assertRaisesRegexp(
Exception,
'Trying to connect value-.* to itself'):
with self.assertRaisesRegexp(Exception,
'Trying to connect value-.* to itself'):
xs.connect(sample, sample, {'value'})
def test_input_dict_type(self):
@ -52,42 +48,23 @@ input:
value: {}
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'values': {'a': 1, 'b': 2}}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir
)
sample1 = self.create_resource('sample1', sample_meta_dir, {'values':
{'a': 1,
'b': 2}})
sample2 = self.create_resource('sample2', sample_meta_dir)
xs.connect(sample1, sample2)
self.assertEqual(
sample1.args['values'],
sample2.args['values']
)
self.assertEqual(sample1.args['values'], sample2.args['values'])
# Check update
sample1.update({'values': {'a': 2}})
self.assertEqual(
sample1.args['values'],
{'a': 2}
)
self.assertEqual(
sample1.args['values'],
sample2.args['values'],
)
self.assertEqual(sample1.args['values'], {'a': 2})
self.assertEqual(sample1.args['values'], sample2.args['values'], )
# Check disconnect
# TODO: should sample2.value be reverted to original value?
sample1.disconnect(sample2)
sample1.update({'values': {'a': 3}})
self.assertEqual(
sample1.args['values'],
{'a': 3}
)
# self.assertEqual(
# sample2.args['values'],
# {'a': 2}
#)
#self.assertEqual(sample2.args['values'].emitter, None)
self.assertEqual(sample1.args['values'], {'a': 3})
def test_multiple_resource_disjoint_connect(self):
sample_meta_dir = self.make_resource_meta("""
@ -121,31 +98,19 @@ input:
value:
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'ip': None, 'port': None}
)
sample_ip = self.create_resource(
'sample-ip', sample_ip_meta_dir, {'ip': '10.0.0.1'}
)
sample_port = self.create_resource(
'sample-port', sample_port_meta_dir, {'port': 8000}
)
self.assertNotEqual(
sample.resource_inputs()['ip'],
sample_ip.resource_inputs()['ip'],
)
sample = self.create_resource('sample', sample_meta_dir,
{'ip': None,
'port': None})
sample_ip = self.create_resource('sample-ip', sample_ip_meta_dir,
{'ip': '10.0.0.1'})
sample_port = self.create_resource('sample-port', sample_port_meta_dir,
{'port': 8000})
self.assertNotEqual(sample.resource_inputs()['ip'],
sample_ip.resource_inputs()['ip'], )
xs.connect(sample_ip, sample)
xs.connect(sample_port, sample)
self.assertEqual(sample.args['ip'], sample_ip.args['ip'])
self.assertEqual(sample.args['port'], sample_port.args['port'])
# self.assertEqual(
# sample.args['ip'].emitter,
# sample_ip.args['ip']
#)
# self.assertEqual(
# sample.args['port'].emitter,
# sample_port.args['port']
#)
def test_simple_observer_unsubscription(self):
sample_meta_dir = self.make_resource_meta("""
@ -158,29 +123,18 @@ input:
value:
""")
sample = self.create_resource(
'sample', sample_meta_dir, {'ip': None}
)
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2'}
)
sample = self.create_resource('sample', sample_meta_dir, {'ip': None})
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1'})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2'})
xs.connect(sample1, sample)
self.assertEqual(sample1.args['ip'], sample.args['ip'])
#self.assertEqual(len(list(sample1.args['ip'].receivers)), 1)
# self.assertEqual(
# sample.args['ip'].emitter,
# sample1.args['ip']
#)
xs.connect(sample2, sample)
self.assertEqual(sample2.args['ip'], sample.args['ip'])
# sample should be unsubscribed from sample1 and subscribed to sample2
#self.assertEqual(len(list(sample1.args['ip'].receivers)), 0)
#self.assertEqual(sample.args['ip'].emitter, sample2.args['ip'])
sample2.update({'ip': '10.0.0.3'})
self.assertEqual(sample2.args['ip'], sample.args['ip'])
@ -198,20 +152,17 @@ input:
value:
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2'}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1'})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2'})
xs.connect(sample1, sample2)
with self.assertRaises(Exception):
with self.assertRaises(Exception): # NOQA
xs.connect(sample2, sample1)
class TestListInput(base.BaseResourceTest):
def test_list_input_single(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@ -232,68 +183,36 @@ input:
value: []
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1'}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2'}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1'})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2'})
list_input_single = self.create_resource(
'list-input-single', list_input_single_meta_dir, {'ips': []}
)
'list-input-single', list_input_single_meta_dir, {'ips': []})
sample1.connect(list_input_single, mapping={'ip': 'ips'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip')]
#)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
])
sample2.connect(list_input_single, mapping={'ip': 'ips'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip'),
# (sample2.args['ip'].attached_to.name, 'ip')]
#)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
sample2.args['ip'],
])
# Test update
sample2.update({'ip': '10.0.0.3'})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
sample2.args['ip'],
])
# Test disconnect
sample2.disconnect(list_input_single)
self.assertItemsEqual(
#[ip['value'] for ip in list_input_single.args['ips']],
list_input_single.args['ips'],
[
sample1.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_single.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip')]
#)
self.assertItemsEqual(list_input_single.args['ips'], [
sample1.args['ip'],
])
def test_list_input_multi(self):
sample_meta_dir = self.make_resource_meta("""
@ -321,70 +240,58 @@ input:
value:
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, {'ip': '10.0.0.1', 'port': 1000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, {'ip': '10.0.0.2', 'port': 1001}
)
list_input_multi = self.create_resource(
'list-input-multi', list_input_multi_meta_dir, args={'ips': [], 'ports': []}
)
sample1 = self.create_resource('sample1', sample_meta_dir,
{'ip': '10.0.0.1',
'port': 1000})
sample2 = self.create_resource('sample2', sample_meta_dir,
{'ip': '10.0.0.2',
'port': 1001})
list_input_multi = self.create_resource('list-input-multi',
list_input_multi_meta_dir,
args={'ips': [],
'ports': []})
xs.connect(sample1, list_input_multi, mapping={
'ip': 'ips', 'port': 'ports'})
xs.connect(sample1,
list_input_multi,
mapping={
'ip': 'ips',
'port': 'ports'
})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
[sample1.args['ip']]
)
[sample1.args['ip']])
self.assertItemsEqual(
#[p['value'] for p in list_input_multi.args['ports']],
list_input_multi.args['ports'],
[sample1.args['port']]
)
[sample1.args['port']])
xs.connect(sample2, list_input_multi, mapping={
'ip': 'ips', 'port': 'ports'})
xs.connect(sample2,
list_input_multi,
mapping={
'ip': 'ips',
'port': 'ports'
})
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
[
sample1.args['ip'],
sample2.args['ip'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ips']],
# [(sample1.args['ip'].attached_to.name, 'ip'),
# (sample2.args['ip'].attached_to.name, 'ip')]
#)
])
self.assertItemsEqual(
#[p['value'] for p in list_input_multi.args['ports']],
list_input_multi.args['ports'],
[
sample1.args['port'],
sample2.args['port'],
]
)
# self.assertListEqual(
# [(e['emitter_attached_to'], e['emitter']) for e in list_input_multi.args['ports']],
# [(sample1.args['port'].attached_to.name, 'port'),
# (sample2.args['port'].attached_to.name, 'port')]
#)
])
# Test disconnect
sample2.disconnect(list_input_multi)
self.assertItemsEqual(
#[ip['value'] for ip in list_input_multi.args['ips']],
list_input_multi.args['ips'],
[sample1.args['ip']]
)
[sample1.args['ip']])
self.assertItemsEqual(
#[p['value'] for p in list_input_multi.args['ports']],
list_input_multi.args['ports'],
[sample1.args['port']]
)
[sample1.args['port']])
# XXX: not used for now, not implemented in new db (jnowak)
# @pytest.mark.xfail(reason="Nested lists are not supported in new_db")
@ -446,14 +353,13 @@ input:
# sample1.connect(list_input, mapping={'ip': 'ips', 'port': 'ports'})
# sample2.connect(list_input, mapping={'ip': 'ips', 'port': 'ports'})
# list_input.connect(list_input_nested, mapping={'ips': 'ipss', 'ports': 'portss'})
# list_input.connect(list_input_nested,
# mapping={'ips': 'ipss', 'ports': 'portss'})
# self.assertListEqual(
# #[ips['value'] for ips in list_input_nested.args['ipss']],
# list_input_nested.args['ipss'],
# [list_input.args['ips']]
# )
# self.assertListEqual(
# #[ps['value'] for ps in list_input_nested.args['portss']],
# list_input_nested.args['portss'],
# [list_input.args['ports']]
# )
@ -461,7 +367,6 @@ input:
# # Test disconnect
# xs.disconnect(sample1, list_input)
# self.assertListEqual(
# #[[ip['value'] for ip in ips['value']] for ips in list_input_nested.args['ipss']],
# list_input_nested.args['ipss'],
# [[sample2.args['ip']]]
# )
@ -472,8 +377,8 @@ input:
class TestHashInput(base.BaseResourceTest):
@pytest.mark.xfail(reason="Connect should raise an error if already connected")
@pytest.mark.xfail(
reason="Connect should raise an error if already connected")
def test_hash_input_basic(self):
sample_meta_dir = self.make_resource_meta("""
id: sample
@ -496,42 +401,51 @@ input:
schema: {ip: str!, port: int!}
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
sample3 = self.create_resource(
'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
sample1 = self.create_resource('sample1',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
sample2 = self.create_resource('sample2',
sample_meta_dir,
args={'ip': '10.0.0.2',
'port': 5001})
sample3 = self.create_resource('sample3',
sample_meta_dir,
args={'ip': '10.0.0.3',
'port': 5002})
receiver = self.create_resource('receiver', receiver_meta_dir)
xs.connect(sample1,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertDictEqual(
{'ip': sample1.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
{'ip': sample1.args['ip'],
'port': sample1.args['port']},
receiver.args['server'], )
sample1.update({'ip': '10.0.0.2'})
self.assertDictEqual(
{'ip': sample1.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
{'ip': sample1.args['ip'],
'port': sample1.args['port']},
receiver.args['server'], )
# XXX: We need to disconnect first
# XXX: it should raise error when connecting already connected inputs
xs.connect(sample2, receiver, mapping={'ip': 'server:ip'})
self.assertDictEqual(
{'ip': sample2.args['ip'], 'port': sample1.args['port']},
receiver.args['server'],
)
xs.connect(sample3, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
{'ip': sample2.args['ip'],
'port': sample1.args['port']},
receiver.args['server'], )
xs.connect(sample3,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertDictEqual(
{'ip': sample3.args['ip'], 'port': sample3.args['port']},
receiver.args['server'],
)
{'ip': sample3.args['ip'],
'port': sample3.args['port']},
receiver.args['server'], )
def test_hash_input_mixed(self):
sample_meta_dir = self.make_resource_meta("""
@ -555,22 +469,23 @@ input:
schema: {ip: str!, port: int!}
""")
sample = self.create_resource(
'sample', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir, args={'server': {'port': 5001}}
)
sample = self.create_resource('sample',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
receiver = self.create_resource('receiver',
receiver_meta_dir,
args={'server': {'port': 5001}})
sample.connect(receiver, mapping={'ip': 'server:ip'})
self.assertDictEqual(
{'ip': sample.args['ip'], 'port': 5001},
receiver.args['server'],
)
{'ip': sample.args['ip'],
'port': 5001},
receiver.args['server'], )
sample.update({'ip': '10.0.0.2'})
self.assertDictEqual(
{'ip': sample.args['ip'], 'port': 5001},
receiver.args['server'],
)
{'ip': sample.args['ip'],
'port': 5001},
receiver.args['server'], )
def test_hash_input_with_list(self):
sample_meta_dir = self.make_resource_meta("""
@ -594,33 +509,41 @@ input:
schema: [{ip: str!, port: int!}]
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
xs.connect(sample1, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
sample1 = self.create_resource('sample1',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
receiver = self.create_resource('receiver', receiver_meta_dir)
xs.connect(sample1,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample1.args['port']}],
receiver.args['server'],
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
xs.connect(sample2, receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
[{'ip': sample1.args['ip'],
'port': sample1.args['port']}],
receiver.args['server'], )
sample2 = self.create_resource('sample2',
sample_meta_dir,
args={'ip': '10.0.0.2',
'port': 5001})
xs.connect(sample2,
receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample1.args['port']},
{'ip': sample2.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
[{'ip': sample1.args['ip'],
'port': sample1.args['port']}, {'ip': sample2.args['ip'],
'port': sample2.args['port']}],
receiver.args['server'], )
sample1.disconnect(receiver)
self.assertItemsEqual(
[{'ip': sample2.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
[{'ip': sample2.args['ip'],
'port': sample2.args['port']}],
receiver.args['server'], )
def test_hash_input_with_multiple_connections(self):
sample_meta_dir = self.make_resource_meta("""
@ -644,21 +567,14 @@ input:
schema: {ip: str!}
""")
sample = self.create_resource(
'sample', sample_meta_dir, args={'ip': '10.0.0.1'}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
sample = self.create_resource('sample',
sample_meta_dir,
args={'ip': '10.0.0.1'})
receiver = self.create_resource('receiver', receiver_meta_dir)
xs.connect(sample, receiver, mapping={'ip': ['ip', 'server:ip']})
self.assertEqual(
sample.args['ip'],
receiver.args['ip']
)
self.assertDictEqual(
{'ip': sample.args['ip']},
receiver.args['server'],
)
self.assertEqual(sample.args['ip'], receiver.args['ip'])
self.assertDictEqual({'ip': sample.args['ip']},
receiver.args['server'], )
def test_hash_input_multiple_resources_with_tag_connect(self):
sample_meta_dir = self.make_resource_meta("""
@ -682,44 +598,49 @@ input:
schema: [{ip: str!, port: int!}]
""")
sample1 = self.create_resource(
'sample1', sample_meta_dir, args={'ip': '10.0.0.1', 'port': 5000}
)
sample2 = self.create_resource(
'sample2', sample_meta_dir, args={'ip': '10.0.0.2', 'port': 5001}
)
receiver = self.create_resource(
'receiver', receiver_meta_dir
)
sample1 = self.create_resource('sample1',
sample_meta_dir,
args={'ip': '10.0.0.1',
'port': 5000})
sample2 = self.create_resource('sample2',
sample_meta_dir,
args={'ip': '10.0.0.2',
'port': 5001})
receiver = self.create_resource('receiver', receiver_meta_dir)
sample1.connect(receiver, mapping={'ip': 'server:ip'})
sample2.connect(receiver, mapping={'port': 'server:port|sample1'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']}],
receiver.args['server'],
)
sample3 = self.create_resource(
'sample3', sample_meta_dir, args={'ip': '10.0.0.3', 'port': 5002}
)
sample3.connect(receiver, mapping={
'ip': 'server:ip', 'port': 'server:port'})
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}],
receiver.args['server'], )
sample3 = self.create_resource('sample3',
sample_meta_dir,
args={'ip': '10.0.0.3',
'port': 5002})
sample3.connect(receiver,
mapping={
'ip': 'server:ip',
'port': 'server:port'
})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample3.args['ip'], 'port': sample3.args['port']}],
receiver.args['server'],
)
sample4 = self.create_resource(
'sample4', sample_meta_dir, args={'ip': '10.0.0.4', 'port': 5003}
)
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}, {'ip': sample3.args['ip'],
'port': sample3.args['port']}],
receiver.args['server'], )
sample4 = self.create_resource('sample4',
sample_meta_dir,
args={'ip': '10.0.0.4',
'port': 5003})
sample4.connect(receiver, mapping={'port': 'server:port|sample3'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample3.args['ip'], 'port': sample4.args['port']}],
receiver.args['server'],
)
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}, {'ip': sample3.args['ip'],
'port': sample4.args['port']}],
receiver.args['server'], )
# There can be no sample3 connections left now
sample4.connect(receiver, mapping={'ip': 'server:ip|sample3'})
self.assertItemsEqual(
[{'ip': sample1.args['ip'], 'port': sample2.args['port']},
{'ip': sample4.args['ip'], 'port': sample4.args['port']}],
receiver.args['server'],
)
[{'ip': sample1.args['ip'],
'port': sample2.args['port']}, {'ip': sample4.args['ip'],
'port': sample4.args['port']}],
receiver.args['server'], )

View File

@ -13,25 +13,27 @@
# under the License.
import mock
from pytest import fixture
from pytest import mark
from solar.core.resource import resource
from solar.core.resource import RESOURCE_STATE
from solar.core import signals
from solar.dblayer.model import ModelMeta
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.solar_models import Resource as DBResource
from solar.system_log import change
from solar.system_log import data
from solar.system_log import operations
from solar.core import signals
from solar.core.resource import resource, RESOURCE_STATE
from solar.dblayer.solar_models import Resource as DBResource
from solar.dblayer.solar_models import CommitedResource
from solar.dblayer.model import ModelMeta
def test_revert_update():
commit = {'a': '10'}
previous = {'a': '9'}
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
{'name': 'test1',
'base_path': 'x',
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.save()
action = 'update'
res.inputs['a'] = '9'
@ -40,9 +42,11 @@ def test_revert_update():
assert resource_obj.args == previous
log = data.SL()
logitem = change.create_logitem(
res.name, action, change.create_diff(commit, previous), [],
base_path=res.base_path)
logitem = change.create_logitem(res.name,
action,
change.create_diff(commit, previous),
[],
base_path=res.base_path)
log.append(logitem)
resource_obj.update(commit)
operations.move_to_commited(logitem.log_action)
@ -56,23 +60,29 @@ def test_revert_update():
def test_revert_update_connected():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = ''
res2.save_lazy()
res3 = DBResource.from_dict('test3',
{'name': 'test3', 'base_path': 'x',
{'name': 'test3',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res3.inputs['a'] = ''
res3.save_lazy()
@ -113,15 +123,16 @@ def test_revert_update_connected():
def test_revert_removal():
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.inputs['a'] = '9'
res.save_lazy()
commited = CommitedResource.from_dict('test1',
{'inputs': {'a': '9'},
'state': 'operational'})
commited = CommitedResource.from_dict('test1', {'inputs': {'a': '9'},
'state': 'operational'})
commited.save_lazy()
resource_obj = resource.load(res.name)
@ -139,23 +150,31 @@ def test_revert_removal():
with mock.patch.object(resource, 'read_meta') as mread:
mread.return_value = {
'input': {'a': {'schema': 'str!'}}, 'id': 'mocked'}
'input': {'a': {'schema': 'str!'}},
'id': 'mocked'
}
change.revert(changes[0].uid)
ModelMeta.save_all_lazy()
assert len(DBResource.bucket.get('test1').siblings) == 1
resource_obj = resource.load('test1')
assert resource_obj.args == {
'a': '9', 'location_id': '', 'transports_id': ''}
'a': '9',
'location_id': '',
'transports_id': ''
}
@mark.xfail(reason='With current approach child will be notice changes after parent is removed')
@mark.xfail(
reason="""With current approach child will
notice changes after parent is removed"""
)
def test_revert_removed_child():
res1 = orm.DBResource(id='test1', name='test1', base_path='x')
res1 = orm.DBResource(id='test1', name='test1', base_path='x') # NOQA
res1.save()
res1.add_input('a', 'str', '9')
res2 = orm.DBResource(id='test2', name='test2', base_path='x')
res2 = orm.DBResource(id='test2', name='test2', base_path='x') # NOQA
res2.save()
res2.add_input('a', 'str', 0)
@ -184,9 +203,11 @@ def test_revert_removed_child():
def test_revert_create():
res = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res.inputs['a'] = '9'
res.save_lazy()
ModelMeta.save_all_lazy()
@ -211,16 +232,20 @@ def test_revert_create():
def test_discard_all_pending_changes_resources_created():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
@ -236,16 +261,20 @@ def test_discard_all_pending_changes_resources_created():
def test_discard_connection():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
res2 = DBResource.from_dict('test2',
{'name': 'test2', 'base_path': 'x',
{'name': 'test2',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res2.inputs['a'] = '0'
res2.save_lazy()
ModelMeta.save_all_lazy()
@ -267,9 +296,11 @@ def test_discard_connection():
def test_discard_removed():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()
@ -289,9 +320,11 @@ def test_discard_removed():
def test_discard_update():
res1 = DBResource.from_dict('test1',
{'name': 'test1', 'base_path': 'x',
{'name': 'test1',
'base_path': 'x',
'state': RESOURCE_STATE.created.name,
'meta_inputs': {'a': {'value': None, 'schema': 'str'}}})
'meta_inputs': {'a': {'value': None,
'schema': 'str'}}})
res1.inputs['a'] = '9'
res1.save_lazy()
ModelMeta.save_all_lazy()

View File

@ -12,10 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from solar.test import base
from solar import errors
from solar.core import validation as sv
from solar.test import base
class TestInputValidation(base.BaseResourceTest):

View File

@ -18,8 +18,9 @@ from StringIO import StringIO
import pytest
import yaml
from solar.events.controls import React, Dep
from solar.core.resource import virtual_resource as vr
from solar.events.controls import Dep
from solar.events.controls import React
@pytest.fixture