Resurrected more policy tests

Before the alpha release, we mothballed a number of policy tests and features.
We thought we had saved all the tests for features that were
actually included in the alpha, but we missed some.

This change reactivates those tests.

Change-Id: I42f3ea01d8cb07204321d229de864db43f492c66
This commit is contained in:
Tim Hinrichs 2014-09-05 11:27:37 -07:00
parent 49852cc8d5
commit 88470c89a1
10 changed files with 239 additions and 230 deletions

View File

@ -281,7 +281,7 @@ class Theory(object):
def __str__(self): def __str__(self):
s = "" s = ""
for p in self.policy(): for p in self.content():
s += str(p) + '\n' s += str(p) + '\n'
return s + '\n' return s + '\n'
@ -1905,6 +1905,9 @@ class Runtime (object):
"""Dump the contents of the theory called TARGET into """Dump the contents of the theory called TARGET into
the filename FILENAME. the filename FILENAME.
""" """
d = os.path.dirname(filename)
if not os.path.exists(d):
os.makedirs(d)
with open(filename, "w") as f: with open(filename, "w") as f:
f.write(str(self.theory[target])) f.write(str(self.theory[target]))

View File

@ -620,37 +620,7 @@ class TestRuntime(unittest.TestCase):
def close(self, msg): def close(self, msg):
LOG.debug("** Finished: {} **".format(msg)) LOG.debug("** Finished: {} **".format(msg))
def test_theory_inclusion(self): # TODO(tim): add tests for explanations
"""Test evaluation routines when one theory includes another."""
# spread out across inclusions
th1 = runtime.NonrecursiveRuleTheory()
th2 = runtime.NonrecursiveRuleTheory()
th3 = runtime.NonrecursiveRuleTheory()
th1.includes.append(th2)
th2.includes.append(th3)
th1.insert(str2form('p(x) :- q(x), r(x), s(2)'))
th2.insert(str2form('q(1)'))
th1.insert(str2form('r(1)'))
th3.insert(str2form('s(2)'))
self.check_equal(
pol2str(th1.select(str2form('p(x)'))),
'p(1)', 'Data spread across inclusions')
# real deal
actth = runtime.Runtime.ACTION_THEORY
clsth = runtime.Runtime.CLASSIFY_THEORY
run = self.prep_runtime(msg="Theory Inclusion")
run.insert('q(1)', target=actth)
run.insert('q(2)', target=clsth)
run.insert('p(x) :- q(x), r(x)', target=actth)
run.insert('r(1)', target=actth)
run.insert('r(2)', target=clsth)
self.check_equal(run.select('p(x)', target=actth),
"p(1) p(2)", "Real deal")
# TODO(tim): add tests for explanations
def test_materialized_explain(self): def test_materialized_explain(self):
"""Test the explanation event handler.""" """Test the explanation event handler."""
run = self.prep_runtime("p(x) :- q(x), r(x)", "Explanations") run = self.prep_runtime("p(x) :- q(x), r(x)", "Explanations")
@ -665,122 +635,6 @@ class TestRuntime(unittest.TestCase):
LOG.debug(run.explain("p(1)")) LOG.debug(run.explain("p(1)"))
# self.fail() # self.fail()
def test_nonrecursive_abduction(self):
"""Test abduction for NonrecursiveRuleTheory."""
def check(query, code, tablenames, correct, msg, find_all=True):
# We're interacting directly with the runtime's underlying
# theory b/c we haven't yet decided whether Abduce should
# be a top-level API call.
actth = runtime.Runtime.ACTION_THEORY
run = self.prep_runtime()
actiontheory = run.theory[actth]
run.insert(code, target=actth)
query = compile.parse(query)
actual = actiontheory.abduce(
query[0], tablenames=tablenames, find_all=find_all)
# convert result to string, since check_same expects strings
actual = compile.formulas_to_string(actual)
self.check_same(actual, correct, msg)
code = ('p(x) :- q(x), r(x)'
'q(1)'
'q(2)')
check('p(x)', code, ['r'],
'p(1) :- r(1) p(2) :- r(2)', "Basic monadic")
code = ('p(x) :- q(x), r(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['q'],
'p(1) :- q(1) p(2) :- q(2)', "Late, monadic binding")
code = ('p(x) :- q(x)')
check('p(x)', code, ['q'],
'p(x) :- q(x)', "No binding")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s'],
'p(1) :- s(1) p(2) :- s(2)', "Intermediate table")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s', 't'],
'p(1) :- s(1) p(2) :- s(2) p(1) :- t(1) p(2) :- t(2)',
"Intermediate, disjunctive table")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s'],
'p(1) :- s(1) p(2) :- s(2)',
"Intermediate, disjunctive table, but only some saveable")
code = ('p(x) :- q(x), u(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'u(1)'
'u(2)')
check('p(x)', code, ['s', 't', 'r'],
'p(1) :- s(1), r(1) p(2) :- s(2), r(2)'
'p(1) :- t(1), r(1) p(2) :- t(2), r(2)',
"Multiple support literals")
code = ('p(x) :- q(x,y), s(x), r(y, z)'
'r(2,3)'
'r(2,4)'
's(1)'
's(2)')
check('p(x)', code, ['q'],
'p(1) :- q(1,2) p(2) :- q(2,2)',
"Existential variables that become ground")
code = ('p(x) :- q(x,y), r(y, z)'
'r(2,3)'
'r(2,4)')
check('p(x)', code, ['q'],
'p(x) :- q(x,2) p(x) :- q(x,2)',
"Existential variables that do not become ground")
code = ('p+(x) :- q(x), r(z)'
'r(z) :- s(z), q(x)'
's(1)')
check('p+(x)', code, ['q'],
'p+(x) :- q(x), q(x1)',
"Existential variables with name collision")
def test_nonrecursive_consequences(self):
"""Test consequence computation for nonrecursive rule theory."""
def check(code, correct, msg):
# We're interacting directly with the runtime's underlying
# theory b/c we haven't decided whether consequences should
# be a top-level API call.
run = self.prep_runtime()
actth = runtime.Runtime.ACTION_THEORY
run.insert(code, target=actth)
actual = run.theory[actth].consequences()
# convert result to string, since check_same expects strings
actual = compile.formulas_to_string(actual)
self.check_same(actual, correct, msg)
code = ('p+(x) :- q(x)'
'q(1)'
'q(2)')
check(code, 'p+(1) p+(2) q(1) q(2)', 'Monadic')
code = ('p+(x) :- q(x)'
'p-(x) :- r(x)'
'q(1)'
'q(2)')
check(code, 'p+(1) p+(2) q(1) q(2)', 'Monadic with empty tables')
def test_remediation(self): def test_remediation(self):
"""Test remediation computation.""" """Test remediation computation."""
def check(action_code, classify_code, query, correct, msg): def check(action_code, classify_code, query, correct, msg):
@ -1046,80 +900,6 @@ class TestRuntime(unittest.TestCase):
run.delete('p(2)') run.delete('p(2)')
self.check_equal(run.logger.content(), '', 'Delete') self.check_equal(run.logger.content(), '', 'Delete')
def test_dump_load(self):
"""Test if dumping/loading theories works properly."""
run = runtime.Runtime()
run.debug_mode()
service_theory = ('p(4,"a","bcdef ghi", 17.1) '
'p(5,"a","bcdef ghi", 17.1) '
'p(6,"a","bcdef ghi", 17.1)')
run.insert(service_theory, target=run.SERVICE_THEORY)
full_path = os.path.realpath(__file__)
path = os.path.dirname(full_path)
path = os.path.join(path, "snapshot")
run.dump_dir(path)
run = runtime.Runtime()
run.load_dir(path)
self.check_equal(str(run.theory[run.SERVICE_THEORY]),
service_theory, 'Service theory dump/load')
def test_get_arity(self):
run = runtime.Runtime()
run.debug_mode()
run.insert('p(3)', target=run.DATABASE)
run.insert('q(x) :- p(x)', target=run.CLASSIFY_THEORY)
run.insert('s(x) :- t(x)', target=run.ACTION_THEORY)
self.assertEqual(run.theory[run.DATABASE].get_arity('p'), 1)
self.assertEqual(run.theory[run.CLASSIFY_THEORY].get_arity('p'), 1)
self.assertEqual(run.theory[run.CLASSIFY_THEORY].get_arity('q'), 1)
self.assertIsNone(run.theory[run.DATABASE].get_arity('q'))
self.assertEqual(run.theory[run.ACTION_THEORY].get_arity('s'), 1)
self.assertIsNone(run.theory[run.ACTION_THEORY].get_arity('t'))
def test_multi_policy_update(self):
"""Test updates that apply to multiple policies."""
def create(ac_code, class_code):
acth = run.ACCESSCONTROL_THEORY
permitted, errors = run.insert(ac_code, target=acth)
self.assertTrue(permitted,
"Error in access control policy: {}".format(
runtime.iterstr(errors)))
clsth = run.CLASSIFY_THEORY
permitted, errors = run.insert(class_code, target=clsth)
self.assertTrue(permitted, "Error in classifier policy: {}".format(
runtime.iterstr(errors)))
return run
def check_equal(actual, correct):
self.check_equal(actual, correct)
run = self.prep_runtime()
service = compile.parse("p(1) p(2) q(1) q(3)")
clss = compile.parse("r(1) r(2) t(1) t(4)")
service_th = run.SERVICE_THEORY
clss_th = run.CLASSIFY_THEORY
service = [runtime.Event(formula=x, insert=True, target=service_th)
for x in service]
clss = [runtime.Event(formula=x, insert=True, target=clss_th)
for x in clss]
service.extend(clss)
run.update(service)
check_equal(run.select('p(x)', service_th), 'p(1) p(2)')
check_equal(run.select('q(x)', service_th), 'q(1) q(3)')
check_equal(run.select('r(x)', service_th), 'r(1) r(2)')
check_equal(run.select('t(x)', service_th), 't(1) t(4)')
def test_initialize(self):
"""Test initialize() functionality of Runtime."""
run = self.prep_runtime()
run.insert('p(1) p(2)')
run.initialize(['p'], ['p(3)', 'p(4)'])
self.check_equal(run.select('p(x)'), 'p(3) p(4)')
def test_neutron_actions(self): def test_neutron_actions(self):
"""Test our encoding of the Neutron actions. Use simulation. """Test our encoding of the Neutron actions. Use simulation.
Just the basics. Just the basics.

View File

@ -1 +0,0 @@

View File

@ -1 +0,0 @@

View File

@ -1 +0,0 @@

View File

@ -1,4 +0,0 @@
p(4, "a", "bcdef ghi", 17.1)
p(5, "a", "bcdef ghi", 17.1)
p(6, "a", "bcdef ghi", 17.1)

View File

@ -326,3 +326,114 @@ class TestRuntime(unittest.TestCase):
LOG.debug(trace) LOG.debug(trace)
lines = trace.split('\n') lines = trace.split('\n')
self.assertEqual(len(lines), 14) self.assertEqual(len(lines), 14)
def test_abduction(self):
"""Test abduction (computation of policy fragments)."""
def check(query, code, tablenames, correct, msg, find_all=True):
# We're interacting directly with the runtime's underlying
# theory b/c we haven't yet decided whether Abduce should
# be a top-level API call.
run = self.prep_runtime()
run.insert(code, target=NREC_THEORY)
query = helper.str2form(query)
actual = run.theory[NREC_THEORY].abduce(
query, tablenames=tablenames, find_all=find_all)
e = helper.datalog_same(helper.pol2str(actual), correct, msg)
self.assertTrue(e)
code = ('p(x) :- q(x), r(x)'
'q(1)'
'q(2)')
check('p(x)', code, ['r'],
'p(1) :- r(1) p(2) :- r(2)', "Basic monadic")
code = ('p(x) :- q(x), r(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['q'],
'p(1) :- q(1) p(2) :- q(2)', "Late, monadic binding")
code = ('p(x) :- q(x)')
check('p(x)', code, ['q'],
'p(x) :- q(x)', "No binding")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s'],
'p(1) :- s(1) p(2) :- s(2)', "Intermediate table")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s', 't'],
'p(1) :- s(1) p(2) :- s(2) p(1) :- t(1) p(2) :- t(2)',
"Intermediate, disjunctive table")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s'],
'p(1) :- s(1) p(2) :- s(2)',
"Intermediate, disjunctive table, but only some saveable")
code = ('p(x) :- q(x), u(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'u(1)'
'u(2)')
check('p(x)', code, ['s', 't', 'r'],
'p(1) :- s(1), r(1) p(2) :- s(2), r(2)'
'p(1) :- t(1), r(1) p(2) :- t(2), r(2)',
"Multiple support literals")
code = ('p(x) :- q(x,y), s(x), r(y, z)'
'r(2,3)'
'r(2,4)'
's(1)'
's(2)')
check('p(x)', code, ['q'],
'p(1) :- q(1,2) p(2) :- q(2,2)',
"Existential variables that become ground")
code = ('p(x) :- q(x,y), r(y, z)'
'r(2,3)'
'r(2,4)')
check('p(x)', code, ['q'],
'p(x) :- q(x,2) p(x) :- q(x,2)',
"Existential variables that do not become ground")
code = ('p+(x) :- q(x), r(z)'
'r(z) :- s(z), q(x)'
's(1)')
check('p+(x)', code, ['q'],
'p+(x) :- q(x), q(x1)',
"Existential variables with name collision")
def test_consequences(self):
"""Test computation of all atoms true in a theory."""
def check(code, correct, msg):
# We're interacting directly with the runtime's underlying
# theory b/c we haven't decided whether consequences should
# be a top-level API call.
run = self.prep_runtime()
run.insert(code, target=NREC_THEORY)
actual = run.theory[NREC_THEORY].consequences()
e = helper.datalog_same(helper.pol2str(actual), correct, msg)
self.assertTrue(e)
code = ('p1(x) :- q(x)'
'q(1)'
'q(2)')
check(code, 'p1(1) p1(2) q(1) q(2)', 'Monadic')
code = ('p1(x) :- q(x)'
'p2(x) :- r(x)'
'q(1)'
'q(2)')
check(code, 'p1(1) p1(2) q(1) q(2)', 'Monadic with empty tables')

View File

@ -0,0 +1,116 @@
# Copyright (c) 2014 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
import unittest
from congress.openstack.common import log as logging
from congress.policy import runtime
from congress.tests import helper
LOG = logging.getLogger(__name__)
NREC_THEORY = 'non-recursive theory'
class TestRuntime(unittest.TestCase):
"""Tests for Runtime that are not specific to any theory."""
def setUp(self):
pass
def check_equal(self, actual_string, correct_string, msg):
self.assertTrue(helper.datalog_equal(
actual_string, correct_string, msg))
def test_theory_inclusion(self):
"""Test evaluation routines when one theory includes another."""
# spread out across inclusions
th1 = runtime.NonrecursiveRuleTheory()
th2 = runtime.NonrecursiveRuleTheory()
th3 = runtime.NonrecursiveRuleTheory()
th1.includes.append(th2)
th2.includes.append(th3)
th1.insert(helper.str2form('p(x) :- q(x), r(x), s(2)'))
th2.insert(helper.str2form('q(1)'))
th1.insert(helper.str2form('r(1)'))
th3.insert(helper.str2form('s(2)'))
self.check_equal(
helper.pol2str(th1.select(helper.str2form('p(x)'))),
'p(1)', 'Data spread across inclusions')
# TODO(thinrichs): add tests with other types of theories,
# once we get those other theory types cleaned up.
def test_get_arity(self):
run = runtime.Runtime()
run.debug_mode()
th = runtime.NonrecursiveRuleTheory()
th.insert(helper.str2form('q(x) :- p(x)'))
th.insert(helper.str2form('p(x) :- s(x)'))
self.assertEqual(th.get_arity('p'), 1)
self.assertEqual(th.get_arity('q'), 1)
self.assertIsNone(th.get_arity('s'))
self.assertIsNone(th.get_arity('missing'))
def test_multi_policy_update(self):
"""Test updates that apply to multiple policies."""
def check_equal(actual, correct):
e = helper.datalog_equal(actual, correct)
self.assertTrue(e)
run = runtime.Runtime()
run.theory['th1'] = runtime.NonrecursiveRuleTheory()
run.theory['th2'] = runtime.NonrecursiveRuleTheory()
events1 = [runtime.Event(formula=x, insert=True, target='th1')
for x in helper.str2pol("p(1) p(2) q(1) q(3)")]
events2 = [runtime.Event(formula=x, insert=True, target='th2')
for x in helper.str2pol("r(1) r(2) t(1) t(4)")]
run.update(events1 + events2)
check_equal(run.select('p(x)', 'th1'), 'p(1) p(2)')
check_equal(run.select('q(x)', 'th1'), 'q(1) q(3)')
check_equal(run.select('r(x)', 'th2'), 'r(1) r(2)')
check_equal(run.select('t(x)', 'th2'), 't(1) t(4)')
def test_initialize(self):
"""Test initialize() functionality of Runtime."""
run = runtime.Runtime()
run.insert('p(1) p(2)')
run.initialize(['p'], ['p(3)', 'p(4)'])
e = helper.datalog_equal(run.select('p(x)'), 'p(3) p(4)')
self.assertTrue(e)
def test_dump_load(self):
"""Test if dumping/loading theories works properly."""
run = runtime.Runtime()
run.debug_mode()
policy = ('p(4,"a","bcdef ghi", 17.1) '
'p(5,"a","bcdef ghi", 17.1) '
'p(6,"a","bcdef ghi", 17.1)')
run.insert(policy)
full_path = os.path.realpath(__file__)
path = os.path.dirname(full_path)
path = os.path.join(path, "snapshot")
run.dump_dir(path)
run = runtime.Runtime()
run.load_dir(path)
e = helper.datalog_equal(str(run.theory[run.DEFAULT_THEORY]),
policy, 'Service theory dump/load')
self.assertTrue(e)

View File

@ -20,6 +20,7 @@ import time
from congress.openstack.common import log as logging from congress.openstack.common import log as logging
from congress.policy import compile from congress.policy import compile
from congress.policy import runtime from congress.policy import runtime
from congress.policy import unify
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -100,6 +101,12 @@ def pause(factor=1):
time.sleep(factor * 1) time.sleep(factor * 1)
def datalog_same(actual_code, correct_code, msg=None):
return datalog_equal(
actual_code, correct_code, msg=msg,
equal=lambda x, y: unify.same(x, y) is not None)
def datalog_equal(actual_code, correct_code, def datalog_equal(actual_code, correct_code,
msg=None, equal=None): msg=None, equal=None):
"""Check if the strings given by actual_code """Check if the strings given by actual_code