Added top-down evaluation for NonrecursiveRuleTheory

No support for negation or included theories.

All tests pass.

Issue: #
Change-Id: I2a55d13a699ff497833a2fc72f116f8f217f5861
This commit is contained in:
Tim Hinrichs 2013-10-03 16:14:06 -07:00
parent bcc8dc0417
commit a8778cd4b6
5 changed files with 576 additions and 150 deletions

4
examples/recursion Normal file
View File

@ -0,0 +1,4 @@
connected(x,y) :- link(x,y)
connected(x,y) :- link(x,z), connected(z,y)

View File

@ -7,14 +7,17 @@ import optparse
import CongressLexer
import CongressParser
import antlr3
import runtime
import logging
import copy
import runtime
class CongressException (Exception):
def __init__(self, msg, obj=None, line=None, col=None):
Exception.__init__(self, msg)
self.obj = obj
self.location = Location(line=line, col=col, obj=obj)
def __str__(self):
s = str(self.location)
if len(s) > 0:
@ -46,6 +49,13 @@ class Location (object):
s += " col: {}".format(self.col)
return s
def __repr__(self):
return "Location(line={}, col={})".format(
repr(self.line), repr(self.col))
def __hash__(self):
return hash(self.__repr__())
class Term(object):
""" Represents the union of Variable and ObjectConstant. Should
only be instantiated via factory method. """
@ -71,11 +81,18 @@ class Variable (Term):
self.location = location
def __str__(self):
return '?' + str(self.name)
return str(self.name)
def __eq__(self, other):
return isinstance(other, Variable) and self.name == other.name
def __repr__(self):
return "Variable(name={}, location={})".format(
repr(self.name), repr(self.location))
def __hash__(self):
return hash("Variable(name={})".format(repr(self.name)))
def is_variable(self):
return True
@ -97,6 +114,14 @@ class ObjectConstant (Term):
def __str__(self):
return str(self.name)
def __repr__(self):
return "ObjectConstant(name={}, type={}, location={})".format(
repr(self.name), repr(self.type), repr(self.location))
def __hash__(self):
return hash("ObjectConstant(name={}, type={})".format(
repr(self.name), repr(self.type)))
def __eq__(self, other):
return (isinstance(other, ObjectConstant) and
self.name == other.name and
@ -141,6 +166,17 @@ class Atom (object):
all(self.arguments[i] == other.arguments[i]
for i in xrange(0, len(self.arguments))))
def __repr__(self):
return "Atom(table={}, arguments={}, location={})".format(
repr(self.table),
"[" + ",".join(repr(arg) for arg in self.arguments) + "]",
repr(self.location))
def __hash__(self):
return hash("Atom(table={}, arguments={})".format(
repr(self.table),
"[" + ",".join(repr(arg) for arg in self.arguments) + "]"))
def is_atom(self):
return True
@ -166,6 +202,17 @@ class Atom (object):
args.append(arg)
return Literal(self.table, args)
def plug_new(self, unifier):
# Uses Unifier instead of dictionary. Should replace plug with
# this one
new = copy.copy(self)
args = []
for arg in self.arguments:
args.append(unifier.apply(arg))
new.arguments = args
return new
class Literal(Atom):
""" Represents either a negated atom or an atom. """
def __init__(self, table, arguments, negated=False, location=None):
@ -181,6 +228,19 @@ class Literal(Atom):
def __eq__(self, other):
return (self.negated == other.negated and Atom.__eq__(self, other))
def __repr__(self):
return "Literal(table={}, arguments={}, location={}, negated={})".format(
repr(self.table),
"[" + ",".join(repr(arg) for arg in self.arguments) + "]",
repr(self.location),
repr(self.negated))
def __hash__(self):
return hash("Literal(table={}, arguments={}, negated={})".format(
repr(self.table),
"[" + ",".join(repr(arg) for arg in self.arguments) + "]",
repr(self.negated)))
def is_negated(self):
return self.negated
@ -217,6 +277,18 @@ class Rule (object):
all(self.body[i] == other.body[i]
for i in xrange(0, len(self.body))))
def __repr__(self):
return "Rule(head={}, body={}, location={})".format(
repr(self.head),
"[" + ",".join(repr(arg) for arg in self.body) + "]",
repr(self.location))
def __hash__(self):
return hash("Rule(head={}, body={})".format(
repr(self.head),
"[" + ",".join(repr(arg) for arg in self.body) + "]"))
def is_atom(self):
return False
@ -502,6 +574,17 @@ def print_tree(tree, text, kids, ind=0):
def get_compiled(args):
""" Run compiler as per ARGS and return the resulting Compiler instance. """
compiler = get_compiler(args)
compiler.compute_delta_rules()
return compiler
def get_parsed(args):
""" Run compiler as per ARGS and return the parsed rules. """
compiler = get_compiler(args)
return compiler.theory
def get_compiler(args):
""" Run compiler as per ARGS and return the compiler object. """
# assumes script name is not passed
parser = optparse.OptionParser()
parser.add_option("--input_string", dest="input_string", default=False,
@ -512,9 +595,9 @@ def get_compiled(args):
compiler = Compiler()
for i in inputs:
compiler.read_source(i, input_string=options.input_string)
compiler.compute_delta_rules()
return compiler
def get_runtime(args):
""" Create runtime by running compiler as per ARGS and initializing runtime
with result of compilation. """

View File

@ -3,6 +3,8 @@
import collections
import logging
import compile
import unify
import copy
class Tracer(object):
def __init__(self):
@ -412,95 +414,18 @@ class DeltaRule(object):
all(self.body[i] == other.body[i]
for i in xrange(0, len(self.body))))
class Unifier (object):
""" Store assignments of variables.
Assignments are not transitively closed, so
applying { x->y, y->z } to x yields z. """
def __init__(self):
self.bindings = {}
def add(self, var, value):
""" Add the variable assignment VAR -> VALUE.
Returns an object that when given to DELETE,
undoes the ADD. """
self.bindings[var] = value
return var
def apply(self, thing):
""" Return value that THING is assigned, or THING. """
# transitively close bindings
if self.bindings.has_key(thing):
return self.apply(self.bindings[thing])
else:
return thing
def delete(self, thing):
""" Deletes assignment of THING
from binding list."""
if thing is not None:
del self.bindings[thing]
def unify(self, thing1, thing2):
""" Modify SELF if possible to make thing1 == thing2.
Return 2 values: whether it is possible and the variable
changed to perform the unifier or None. """
val1 = self.apply(thing1)
val2 = self.apply(thing2)
if val1 == val2:
return (True, None)
if self.is_variable(val1):
change = self.add(val1, val2)
return (True, change)
elif self.is_variable(val2):
change = self.add(val2, val1)
return (True, change)
else:
return (False, None)
def is_variable (self, thing):
""" Checks whether THING is a variable. """
try:
return isinstance(thing, basestring) and thing[0] == '?'
except TypeError:
return False
def plug(self, thing):
""" Creates a copy of thing but with all variables replaced
as per the unifier. Given an iterable, recursively applies
itself to each element of the iterable, putting the result
in a list. """
if isinstance(thing, (basestring, int, long, float)):
return self.apply(thing)
elif isinstance(thing, dict):
new = {}
for key in thing:
newkey = self.plug(key)
newvalue = self.plug(thing[key])
new[newkey] = newvalue
return new
elif isinstance(thing, set):
new = set()
for x in thing:
newx = self.plug(x)
new.add(newx)
return new
elif isinstance(thing, list):
return [self.plug(x) for x in thing]
elif isinstance(thing, tuple):
return tuple([self.plug(x) for x in thing])
else:
raise TypeError(
"Plug does not handle type {}".format(type(thing)))
def __str__ (self):
return str(self.bindings)
##############################################################################
## Theories
##############################################################################
class RuleTheory(object):
""" A collection of Rules. """
def new_BiUnifier():
return unify.BiUnifier(lambda (index):
compile.Variable("x" + str(index)))
class NonrecursiveRuleTheory(object):
""" A non-recursive collection of Rules. """
def __init__(self, rules=None):
# dictionary from table name to list of rules with that table in head
self.contents = {}
@ -514,53 +439,95 @@ class RuleTheory(object):
return str(self.contents)
def insert(self, rule):
if isinstance(rule, compile.Atom):
rule = compile.Rule(rule, [], rule.location)
table = rule.head.table
if table in self.contents:
if rule not in self.content[table]: # eliminate dups
if rule not in self.contents[table]: # eliminate dups
self.contents[table].append(rule)
else:
self.contents[table] = [rule]
def delete(self, rule):
if isinstance(rule, compile.Atom):
rule = compile.Rule(rule, [], rule.location)
table = rule.head.table
if table in self.contents:
self.contents[table].remove(rule)
def select(self, query):
""" Return tuples in which QUERY is true. Current implementation
ignores rules with non-empty bodies and requires QUERY to be atomic."""
assert isinstance(query, compile.Atom), "Only have support for atoms"
results = set()
for rule in self.contents[query.table]:
if not isinstance(query, compile.Atom):
continue
if match(query, rule) is not None:
results.add(tuple([arg.name for arg in rule.arguments]))
# simple implementation b/c we're ignoring real rules.
# full implementation will need to check other theories in
# inner evaluation loop.
for theory in self.includes:
results |= theory.select(query)
return results
class TopDownContext(object):
""" Struct for storing the search state of top-down evaluation """
def __init__(self, literals, literal_index, binding, context, depth):
self.literals = literals
self.literal_index = literal_index
self.binding = binding
self.previous = context
self.depth = depth
def match(atom1, atom2):
""" Return Unifier, if it exists, that when applied to
ATOM1 results in the ground ATOM2. """
if len(atom1.arguments) != len(atom2.arguments):
return None
assert (all(not arg.is_variable() for arg in atom2.arguments),
"Match requires ATOM2 have no variables")
binding = Unifier()
for i in xrange(0, len(atom1.arguments)):
arg = atom1.arguments[i]
if arg.is_variable():
if arg.name in binding:
oldval = binding.apply(arg.name)
if oldval != atom2.arguments[i]:
return None
else:
binding.add(arg.name, atom2.arguments[i])
return binding
def __str__(self):
return ("TopDownContext<literals={}, literal_index={}, binding={}, "
"previous={}, depth={}>").format(
"[" + ",".join([str(x) for x in self.literals]) + "]",
str(self.literal_index), str(self.binding),
str(self.previous), str(self.depth))
class TopDownCaller(object):
""" Struct for storing info about the original caller of top-down
evaluation. QUERY is the initial query requested, ANSWERS
is populated by top-down evaluation: it is the list of QUERY
instances that the search process proved true.
MAX_ANSWERS is the largest number of answers top-down should
find; setting to None finds all."""
def __init__(self, query, binding, max_answers=1):
self.query = query
self.binding = binding
self.answers = []
self.max_answers = max_answers
def __str__(self):
return "TopDownCaller<query={}, binding={}, answers={}>".format(
str(self.query), str(self.binding), str(self.answers))
def select(self, query, max_answers=1):
""" Return tuples in which QUERY is true. """
# No unit test for MAX_ANSWERS--don't yet support it in the Runtime
# May not even be necessary.
assert (isinstance(query, compile.Atom) or
isinstance(query, compile.Rule)), "Query must be atom/rule"
if isinstance(query, compile.Atom):
literals = [query]
else:
literals = query.body
unifier = new_BiUnifier()
context = self.TopDownContext(literals, 0, unifier, None, 0)
caller = self.TopDownCaller(query, unifier, max_answers=max_answers)
self.top_down_eval(context, caller)
logging.debug(caller.answers)
if len(caller.answers) > 0:
logging.debug("Found answer {}".format(
"[" + ",".join([str(x) for x in caller.answers]) + "]"))
return [str(x) for x in caller.answers]
else:
return []
# def match(atom1, atom2):
# """ Return Unifier, if it exists, that when applied to
# ATOM1 results in the ground ATOM2. """
# if len(atom1.arguments) != len(atom2.arguments):
# return None
# assert all(not arg.is_variable() for arg in atom2.arguments), \
# "Match requires ATOM2 have no variables"
# binding = Unifier()
# for i in xrange(0, len(atom1.arguments)):
# arg = atom1.arguments[i]
# if arg.is_variable():
# if arg.name in binding:
# oldval = binding.apply(arg.name)
# if oldval != atom2.arguments[i]:
# return None
# else:
# binding.add(arg.name, atom2.arguments[i])
# return binding
def return_true(*args):
return True
@ -571,18 +538,102 @@ class RuleTheory(object):
instance of QUERY). """
assert False, "Not yet implemented"
class TopDownContext(object):
def __init__(self, literals, literal_index, binding):
self.literals = literals
self.literal_index = literal_index
self.binding = binding
def top_down_evaluation(self, literals, literal_index, context, binding):
def top_down_eval(self, context, caller):
""" Compute all instances of LITERALS (from LITERAL_INDEX and above)
that are true according to the theory (after applying the
unifier BINDING to LITERALS). Returns a list of dictionary
bindings. """
assert False, "Not yet implemented"
unifier BINDING to LITERALS). Returns False or an answer. """
# no recursion, ever; this style of algorithm will never halt
# on recursive rules
# no negation/recursion/included theories for now.
return self.top_down_th(context, caller)
def top_down_th(self, context, caller):
""" Top-down evaluation for just the rules in SELF.CONTENTS. """
# logging.debug("top_down_th({})".format(str(context)))
depth = context.depth
binding = context.binding
if context.literal_index > len(context.literals) - 1:
return True
lit = context.literals[context.literal_index]
self.top_down_call(lit, binding, depth)
if lit.table not in self.contents:
return self.top_down_fail(lit, binding, depth)
for rule in self.contents[lit.table]:
unifier = new_BiUnifier()
# Prefer to bind vars in rule head
undo = unify.bi_unify_atoms(rule.head, unifier, lit, binding)
# self.log(lit.table, "Rule: {}, Unifier: {}, Undo: {}".format(
# str(rule), str(unifier), str(undo)))
if undo is None: # no unifier
continue
if len(rule.body) == 0:
if self.top_down_th_finish(context, caller):
unify.undo_all(undo)
return True
else:
unify.undo_all(undo)
else:
new_context = self.TopDownContext(rule.body, 0,
unifier, context, depth + 1)
if self.top_down_eval(new_context, caller):
unify.undo_all(undo)
return True
else:
unify.undo_all(undo)
return self.top_down_fail(lit, binding, depth)
def top_down_th_finish(self, context, caller):
""" Helper that is called once top_down successfully completes
a proof for a literal. Handles (i) continuing search
for those literals still requiring proofs within CONTEXT,
(ii) adding solutions to CALLER once all needed proofs have
been found, and (iii) printing out Redo/Exit during tracing.
Returns True if the search is finished and False otherwise.
Temporary, transparent modification of CONTEXT."""
if context is None:
# plug now before we undo the bindings
caller.answers.append(caller.query.plug_new(caller.binding))
# return True iff the search is finished.
if caller.max_answers is None:
return False
return len(caller.answers) >= caller.max_answers
else:
self.top_down_exit(context.literals[context.literal_index],
context.binding, context.depth)
# continue the search
if context.literal_index < len(context.literals) - 1:
context.literal_index += 1
finished = self.top_down_eval(context, caller)
context.literal_index -= 1 # in case answer is False
else:
finished = self.top_down_th_finish(context.previous, caller)
# return search result (after printing a Redo if failure)
if not finished:
self.top_down_redo(context.literals[context.literal_index],
context.binding, context.depth)
return finished
def top_down_call(self, literal, binding, depth):
self.log(literal.table, "{}Call: {} with {}".format("| "*depth,
literal.plug_new(binding), str(binding)))
def top_down_exit(self, literal, binding, depth):
self.log(literal.table, "{}Exit: {} with {}".format("| "*depth,
literal.plug_new(binding), str(binding)))
def top_down_fail(self, literal, binding, depth):
self.log(literal.table, "{}Fail: {} with {}".format("| "*depth,
literal.plug_new(binding), str(binding)))
return False
def top_down_redo(self, literal, binding, depth):
self.log(literal.table, "{}Redo: {} with {}".format("| "*depth,
literal.plug_new(binding), str(binding)))
return False
def log(self, table, msg, depth=0):
self.tracer.log(table, "RuleTh: " + msg, depth)
class DeltaRuleTheory (object):
""" A collection of DeltaRules. """
@ -636,7 +687,8 @@ class DeltaRuleTheory (object):
return x in self.views
class MaterializedRuleTheory(object):
""" A theory that stores the table contents explicitly. """
""" A theory that stores the table contents explicitly.
Recursive rules are allowed. """
def __init__(self):
# queue of events left to process
@ -653,9 +705,9 @@ class MaterializedRuleTheory(object):
def select(self, query):
# should generalize to at least a conjunction of atoms.
# Need to change compiler a bit, but runtime should be fine.
assert ((isinstance(query, compile.Atom) or
isinstance(query, compile.Rule)),
"Only have support for atoms")
assert (isinstance(query, compile.Atom) or
isinstance(query, compile.Rule)), \
"Only have support for atoms"
return self.database.select(query)
def insert(self, formula):
@ -695,8 +747,8 @@ class MaterializedRuleTheory(object):
def modify(self, formula, is_insert=True):
""" Event handler for arbitrary insertion/deletion (rules and facts). """
if formula.is_atom():
assert (not self.is_view(formula.table),
"Cannot directly modify tables computed from other tables")
assert not self.is_view(formula.table), \
"Cannot directly modify tables computed from other tables"
args = tuple([arg.name for arg in formula.arguments])
self.modify_tables_with_tuple(
formula.table, args, is_insert=is_insert)
@ -838,8 +890,8 @@ class Runtime (object):
# collection of theories
self.theory = {}
self.theory[self.CLASSIFY_THEORY] = MaterializedRuleTheory()
self.theory[self.SERVICE_THEORY] = RuleTheory()
self.theory[self.ACTION_THEORY] = RuleTheory()
self.theory[self.SERVICE_THEORY] = NonrecursiveRuleTheory()
self.theory[self.ACTION_THEORY] = NonrecursiveRuleTheory()
# Service/Action theories build upon Classify theory
self.theory[self.SERVICE_THEORY].includes.append(
self.theory[self.CLASSIFY_THEORY])
@ -926,9 +978,9 @@ class Runtime (object):
s += ')'
return s
c = compile.get_compiled(['--input_string', policy_string])
assert (len(c.theory) == 1,
assert len(c.theory) == 1, \
"Queries can have only 1 statement: {}".format(
[str(x) for x in c.theory]))
[str(x) for x in c.theory])
results = self.select_obj(c.theory[0], theory)
return " ".join([str(x) for x in results])

View File

@ -4,6 +4,7 @@
import unittest
from policy import compile
from policy import runtime
from policy import unify
from policy.runtime import Database
import logging
@ -12,12 +13,12 @@ class TestRuntime(unittest.TestCase):
def setUp(self):
pass
def prep_runtime(self, code, msg=None):
def prep_runtime(self, code, msg=None, target=None):
# compile source
if msg is not None:
logging.debug(msg)
run = runtime.Runtime()
run.insert(code)
run.insert(code, target=target)
tracer = runtime.Tracer()
tracer.trace('*')
run.tracer = tracer
@ -61,14 +62,14 @@ class TestRuntime(unittest.TestCase):
def check(self, run, correct_database_code, msg=None):
# extract correct answer from correct_database_code
logging.debug("** Checking {} **".format(msg))
logging.debug("** Checking: {} **".format(msg))
correct_database = self.string_to_database(correct_database_code)
self.check_db_diffs(run.theory[run.CLASSIFY_THEORY].database,
correct_database, msg)
logging.debug("** Finished {} **".format(msg))
logging.debug("** Finished: {} **".format(msg))
def check_equal(self, actual_code, correct_code, msg=None):
logging.debug("** Checking equality for {} **".format(msg))
logging.debug("** Checking equality: {} **".format(msg))
actual = compile.get_compiled([actual_code, '--input_string'])
correct = compile.get_compiled([correct_code, '--input_string'])
extra = []
@ -80,7 +81,7 @@ class TestRuntime(unittest.TestCase):
if formula not in actual.theory:
missing.append(formula)
self.output_diffs(extra, missing, msg)
logging.debug("** Finished for {} **".format(msg))
logging.debug("** Finished: {} **".format(msg))
def check_proofs(self, run, correct, msg=None):
""" Check that the proofs stored in runtime RUN are exactly
@ -109,8 +110,6 @@ class TestRuntime(unittest.TestCase):
# logging.debug("Check_proof errors:\n{}".format("\n".join(errs)))
self.fail("\n".join(errs))
def showdb(self, run):
logging.debug("Resulting DB: {}".format(
str(run.theory[run.CLASSIFY_THEORY].database)))
@ -474,5 +473,173 @@ class TestRuntime(unittest.TestCase):
logging.debug(run.explain("p(1)"))
# self.fail()
def check_unify(self, atom_string1, atom_string2, msg, change_num):
""" Check that bi-unification succeeds. """
logging.debug("** Checking: {} **".format(msg))
unifier1 = runtime.new_BiUnifier()
unifier2 = runtime.new_BiUnifier()
p1 = compile.get_parsed([atom_string1, '--input_string'])[0]
p2 = compile.get_parsed([atom_string2, '--input_string'])[0]
changes = unify.bi_unify_atoms(p1, unifier1, p2, unifier2)
self.assertTrue(p1 != p2)
p1p = p1.plug_new(unifier1)
p2p = p2.plug_new(unifier2)
if not p1p == p2p:
logging.debug("Failure: bi-unify({}, {}) produced {} and {}".format(
str(p1), str(p2), str(unifier1), str(unifier2)))
logging.debug("plug({}, {}) = {}".format(
str(p1), str(unifier1), str(p1p)))
logging.debug("plug({}, {}) = {}".format(
str(p2), str(unifier2), str(p2p)))
self.fail()
self.assertTrue(changes is not None)
if change_num is not None:
self.assertEqual(len(changes), change_num)
unify.undo_all(changes)
self.assertEqual(p1.plug_new(unifier1), p1)
self.assertEqual(p2.plug_new(unifier2), p2)
logging.debug("** Finished: {} **".format(msg))
def check_unify_fail(self, atom_string1, atom_string2, msg):
""" Check that the bi-unification fails. """
unifier1 = runtime.new_BiUnifier()
unifier2 = runtime.new_BiUnifier()
p1 = compile.get_parsed([atom_string1, '--input_string'])[0]
p2 = compile.get_parsed([atom_string2, '--input_string'])[0]
changes = unify.bi_unify_atoms(p1, unifier1, p2, unifier2)
if changes is not None:
logging.debug(
"Failure failure: bi-unify({}, {}) produced {} and {}".format(
str(p1), str(p2), str(unifier1), str(unifier2)))
logging.debug("plug({}, {}) = {}".format(
str(p1), str(unifier1), str(p1.plug_new(unifier1))))
logging.debug("plug({}, {}) = {}".format(
str(p2), str(unifier2), str(p2.plug_new(unifier2))))
self.fail()
def test_bi_unify(self):
""" Test the bi-unification routine. """
self.check_unify("p(x)", "p(1)",
"Matching", 1)
self.check_unify("p(x,y)", "p(1,2)",
"Binary Matching", 2)
self.check_unify("p(1,2)", "p(x,y)",
"Binary Matching Reversed", 2)
self.check_unify("p(1,1)", "p(x,y)",
"Binary Matching Many-to-1", 2)
self.check_unify_fail("p(1,2)", "p(x,x)",
"Binary Matching Failure")
self.check_unify("p(1,x)", "p(1,y)",
"Simple Unification", 1)
self.check_unify("p(1,x)", "p(y,1)",
"Separate Namespace Unification", 2)
self.check_unify("p(1,x)", "p(x,2)",
"Namespace Collision Unification", 2)
self.check_unify("p(x,y,z)", "p(t,u,v)",
"Variable-only Unification", 3)
self.check_unify("p(x,y,y)", "p(t,u,t)",
"Repeated Variable Unification", 3)
self.check_unify_fail("p(x,y,y,x,y)", "p(t,u,t,1,2)",
"Repeated Variable Unification Failure")
self.check_unify("p(x,y,y)", "p(x,y,x)",
"Repeated Variable Unification Namespace Collision", 3)
self.check_unify_fail("p(x,y,y,x,y)", "p(x,y,x,1,2)",
"Repeated Variable Unification Namespace Collision Failure")
def test_rule_select(self):
""" Test top-down evaluation for RuleTheory. """
th = runtime.Runtime.ACTION_THEORY
run = self.prep_runtime('p(1)', target=th)
self.check_equal(run.select('p(1)', target=th), "p(1)",
"Simple lookup")
self.check_equal(run.select('p(2)', target=th), "",
"Failed lookup")
run = self.prep_runtime('p(1)', target=th)
self.check_equal(run.select('p(x)', target=th), "p(1)",
"Variablized lookup")
run = self.prep_runtime('p(x) :- q(x)'
'q(x) :- r(x)'
'r(1)', target=th)
self.check_equal(run.select('p(1)', target=th), "p(1)",
"Monadic rules")
self.check_equal(run.select('p(2)', target=th), "",
"False monadic rules")
self.check_equal(run.select('p(x)', target=th), "p(1)",
"Variablized query with monadic rules")
run = self.prep_runtime('p(x) :- q(x)'
'q(x) :- r(x)'
'q(x) :- s(x)'
'r(1)'
's(2)', target=th)
self.check_equal(run.select('p(1)', target=th), "p(1)",
"Monadic, disjunctive rules 1")
self.check_equal(run.select('p(2)', target=th), "p(2)",
"Monadic, disjunctive rules 2")
self.check_equal(run.select('p(x)', target=th), "p(1)",
"Monadic, disjunctive rules 2")
self.check_equal(run.select('p(3)', target=th), "",
"False Monadic, disjunctive rules")
run = self.prep_runtime('p(x) :- q(x), r(x)'
'q(1)'
'r(1)'
'r(2)'
'q(2)'
'q(3)', target=th)
self.check_equal(run.select('p(1)', target=th), "p(1)",
"Multiple literals in body")
self.check_equal(run.select('p(2)', target=th), "p(2)",
"Multiple literals in body answer 2")
self.check_equal(run.select('p(x)', target=th), "p(1)",
"Multiple literals in body variablized")
self.check_equal(run.select('p(3)', target=th), "",
"False multiple literals in body")
run = self.prep_runtime('p(x) :- q(x), r(x)'
'q(1)'
'r(2)', target=th)
self.check_equal(run.select('p(x)', target=th), "",
"False variablized multiple literals in body")
run = self.prep_runtime('p(x,y) :- q(x,z), r(z, y)'
'q(1,1)'
'q(1,2)'
'r(1,3)'
'r(1,4)'
'r(2,5)', target=th)
self.check_equal(run.select('p(1,3)', target=th), "p(1,3)",
"Binary, existential rules 1")
self.check_equal(run.select('p(1,4)', target=th), "p(1,4)",
"Binary, existential rules 2")
self.check_equal(run.select('p(1,5)', target=th), "p(1,5)",
"Binary, existential rules 3")
self.check_equal(run.select('p(1,1)', target=th), "",
"False binary, existential rules")
run = self.prep_runtime('p(x) :- q(x), r(x)'
'q(y) :- t(y), s(x)'
's(1)'
'r(2)'
't(2)', target=th)
self.check_equal(run.select('p(2)', target=th), "p(2)",
"Distinct variable namespaces across rules")
run = self.prep_runtime('p(x,y) :- q(x,z), r(z,y)'
'q(x,y) :- s(x,z), t(z,y)'
's(x,y) :- u(x,z), v(z,y)'
'u(1,2)'
'v(2,3)'
't(3,4)'
'r(4,5)', target=th)
self.check_equal(run.select('p(1,5)', target=th), "p(1,5)",
"Tower of existential variables")
self.check_equal(run.select('p(x,y)', target=th), "p(1,5)",
"Tower of existential variables")
if __name__ == '__main__':
unittest.main()

120
src/policy/unify.py Normal file
View File

@ -0,0 +1,120 @@
#! /usr/bin/python
import logging
# A unifier designed for the bi_unify_atoms routine
# which is used by a backward-chaining style datalog implementation.
# Main goal: minimize memory allocation by manipulating only unifiers
# to keep variable namespaces separate.
class BiUnifier(object):
""" A unifier designed for bi_unify_atoms. Recursive
datastructure. When adding a binding variable u to
variable v, keeps a reference to the unifier for v.
A variable's identity is its name plus its unification context.
This enables a variable with the same name but from two
different atoms to be treated as different variables. """
class Value(object):
def __init__(self, value, unifier):
# actual value
self.value = value
# unifier context
self.unifier = unifier
def __str__(self):
return "<{},{}>".format(
str(self.value), repr(self.unifier))
class Undo(object):
def __init__(self, var, unifier):
self.var = var
self.unifier = unifier
def __str__(self):
return "<var: {}, unifier: {}>".format(
str(self.var), str(self.unifier))
def __init__(self, new_variable_factory):
# each value is a Value
self.contents = {}
self.new_variable_counter = 0
self.new_variable_factory = new_variable_factory
def add(self, var, value, unifier):
value = self.Value(value, unifier)
self.contents[var] = value
# logging.debug("Adding {} -> {} to unifier {}".format(
# str(var), str(value), str(self)))
return self.Undo(var, self)
def delete(self, var):
if var in self.contents:
del self.contents[var]
def apply(self, term):
return self.apply_full(term)[0]
def apply_full(self, term):
""" Recursively apply unifiers to TERM and return
(i) the final value and (ii) the final unifier. """
if not term.is_variable() or term not in self.contents:
return (term, self)
value = self.contents[term]
# logging.debug("Apply({}): has contents: {}".format(term, str(value)))
if value.unifier is not None:
return value.unifier.apply_full(value.value)
else:
return (value.value, self)
def __str__(self):
s = repr(self)
s += "={"
s += ",".join(["{}:{}".format(var, str(val))
for var, val in self.contents.iteritems()])
s += "}"
return s
def undo_all(changes):
""" Undo all the changes in CHANGES. """
# logging.debug("undo_all({})".format(
# "[" + ",".join([str(x) for x in changes]) + "]"))
for change in changes:
if change.unifier is not None:
change.unifier.delete(change.var)
def bi_unify_atoms(atom1, unifier1, atom2, unifier2):
""" If possible, modify BiUnifier UNIFIER1 and BiUnifier UNIFIER2 so that
ATOM1.plug(UNIFIER1) == ATOM2.plug(UNIFIER2).
Returns None if not possible; otherwise, returns
a list of changes to unifiers that can be undone
with undo-all. May alter unifiers besides UNIFIER1 and UNIFIER2. """
# logging.debug("Unifying {} under {} and {} under {}".format(
# str(atom1), str(unifier1), str(atom2), str(unifier2)))
if atom1.table != atom2.table:
return None
if len(atom1.arguments) != len(atom2.arguments):
return None
changes = []
for i in xrange(0, len(atom1.arguments)):
val1, binding1 = unifier1.apply_full(atom1.arguments[i])
val2, binding2 = unifier2.apply_full(atom2.arguments[i])
# logging.debug("val({})={} at {}, val({})={} at {}".format(
# str(atom1.arguments[i]), str(val1), str(unifier1),
# str(atom2.arguments[i]), str(val2), str(unifier2)))
if not val1.is_variable() and val1 == val2:
continue
elif val1 is val2 and binding1 is binding2:
continue
elif val1.is_variable():
changes.append(binding1.add(val1, val2, binding2))
elif val2.is_variable():
changes.append(binding2.add(val2, val1, binding1))
else:
undo_all(changes)
return None
return changes