Added abduction to top_down_eval suite

Abduction can be applied to compute two things: (i) an explanation (or all
explanations) as to why a tuple belongs to a table in terms
of any collection of other tables and (ii) a collection of
actions that if executed should add/remove a tuple from a table.

All tests pass

Issue: #
Change-Id: I87068133567fcd396962053ada567b93d4737cb6
This commit is contained in:
Tim Hinrichs 2013-10-11 13:08:40 -07:00
parent c3bf002e4c
commit ad9941d0b0
4 changed files with 463 additions and 95 deletions

View File

@ -91,6 +91,9 @@ class Variable (Term):
def __eq__(self, other):
return isinstance(other, Variable) and self.name == other.name
def __ne__(self, other):
return not self == other
def __repr__(self):
return "Variable(name={}, location={})".format(
repr(self.name), repr(self.location))
@ -132,6 +135,9 @@ class ObjectConstant (Term):
self.name == other.name and
self.type == other.type)
def __ne__(self, other):
return not self == other
def is_variable(self):
return False
@ -171,6 +177,9 @@ class Atom (object):
all(self.arguments[i] == other.arguments[i]
for i in xrange(0, len(self.arguments))))
def __ne__(self, other):
return not self == other
def __repr__(self):
return "Atom(table={}, arguments={}, location={})".format(
repr(self.table),
@ -200,8 +209,9 @@ class Atom (object):
def is_ground(self):
return all(not arg.is_variable() for arg in self.arguments)
def plug(self, binding):
def plug(self, binding, caller=None):
"Assumes domain of BINDING is Terms"
logging.debug("Atom.plug({}, {})".format(str(binding), str(caller)))
new = copy.copy(self)
if isinstance(binding, dict):
args = []
@ -213,7 +223,7 @@ class Atom (object):
new.arguments = args
return new
else:
args = [Term.create_from_python(binding.apply(arg))
args = [Term.create_from_python(binding.apply(arg, caller))
for arg in self.arguments]
new.arguments = args
return new
@ -300,9 +310,9 @@ class Rule (object):
def is_rule(self):
return True
def plug(self, binding):
newhead = self.head.plug(binding)
newbody = [lit.plug(binding) for lit in self.body]
def plug(self, binding, caller=None):
newhead = self.head.plug(binding, caller=caller)
newbody = [lit.plug(binding, caller=caller) for lit in self.body]
return Rule(newhead, newbody)
def variables(self):
@ -318,6 +328,12 @@ class Rule (object):
return vs
def formulas_to_string(formulas):
""" Takes an iterable of compiler sentence objects and returns a
string representing that iterable, which the compiler will parse
into the original iterable. """
return " ".join([str(formula) for formula in formulas])
##############################################################################
## Compiler
##############################################################################

View File

@ -140,6 +140,17 @@ class TopDownTheory(object):
str(self.literal_index), str(self.binding),
str(self.previous), str(self.depth))
class TopDownResult(object):
""" Stores a single result for top-down-evaluation """
def __init__(self, binding, support):
self.binding = binding
self.support = support # for abduction
logging.debug(str(self))
def __str__(self):
return "TopDownResult(binding={}, support={})".format(
unify.binding_str(self.binding), iterstr(self.support))
class TopDownCaller(object):
""" Struct for storing info about the original caller of top-down
evaluation.
@ -149,75 +160,144 @@ class TopDownTheory(object):
FIND_ALL controls whether just the first or all answers are found.
ANSWERS is populated by top-down evaluation: it is the list of
VARIABLES instances that the search process proved true."""
def __init__(self, variables, binding, find_all=True):
def __init__(self, variables, binding, find_all=True, save=None):
# an iterable of variable objects
self.variables = variables
# a bi-unifier
self.binding = binding
# a boolean
self.find_all = find_all
# list populated by top-down-eval: the return value
self.answers = []
# The results of top-down-eval: a list of TopDownResults
self.results = []
# a Function that takes a compile.Literal and a unifier and
# returns T iff that literal under the unifier should be
# saved as part of an abductive explanation
self.save = save
# A variable used to store explanations as they are constructed
self.support = []
def __str__(self):
return ("TopDownCaller<query={}, binding={}, answers={}, "
"find_all={}>").format(
str(self.query), str(self.binding), str(self.answers),
str(self.find_all))
return ("TopDownCaller<variables={}, binding={}, find_all={}, "
"results={}, save={}, support={}>".format(
iterstr(self.variables), str(self.binding), str(self.find_all),
iterstr(self.results), repr(self.save), iterstr(self.support)))
def select(self, query):
""" Return tuples in which QUERY is true. """
def select(self, query, find_all=True):
""" Return list of instances of QUERY that are true.
If FIND_ALL is False, the return list has at most 1 element."""
assert (isinstance(query, compile.Atom) or
isinstance(query, compile.Rule)), "Query must be atom/rule"
if isinstance(query, compile.Atom):
literals = [query]
else:
literals = query.body
bindings = self.top_down_evaluation(query.variables(), literals)
# Because our output is instances of QUERY, need all the variables
# in QUERY.
bindings = self.top_down_evaluation(query.variables(), literals,
find_all=find_all)
logging.debug("Top_down_evaluation returned: {}".format(
str(bindings)))
if len(bindings) > 0:
logging.debug("Found answer {}".format(
"[" + ",".join([str(query.plug(x))
for x in bindings]) + "]"))
return [str(query.plug(x)) for x in bindings]
return [query.plug(x) for x in bindings]
# def return_true(*args):
# return True
def explain(self, query, tablenames, find_all=True):
""" Computes additional literals that if true would make
(some instance of) QUERY true. Returns a list of rules
where the head represents an instance of the QUERY and
the body is the collection of literals that must be true
in order to make that instance true. If QUERY is a rule,
each result is an instance of the head of that rule, and
the computed literals if true make the body of that rule
(and hence the head) true. If FIND_ALL is true, the
return list has at most one element.
Limitation: every negative literal relevant to a proof of
QUERY is true, i.e. no literals are saved when proving a negative
literal is true."""
assert (isinstance(query, compile.Atom) or
isinstance(query, compile.Rule)), \
"Explain requires a formula"
if isinstance(query, compile.Atom):
literals = [query]
output = query
else:
literals = query.body
output = query.head
# We need all the variables we will be using in the output, which
# here is just the head of QUERY (or QUERY itself if it is an atom)
abductions = self.top_down_abduction(output.variables(), literals,
find_all=find_all, save=lambda lit,binding: lit.table in tablenames)
results = [compile.Rule(output.plug(abd.binding), abd.support)
for abd in abductions]
logging.debug("MRT's explain result: " + iterstr(results))
return results
# def abduce(self, query, abducibles, consistency=return_true):
# """ Compute a collection of atoms with ABDUCIBLES in the head
# that when added to SELF makes query QUERY true (for some
# instance of QUERY). """
# assert False, "Not yet implemented"
def top_down_evaluation(self, variables, literals, binding=None):
""" Compute all instances of VARIABLES that make LITERALS
true according to the theory (after applying the
unifier BINDING). Returns list. """
# logging.debug("top_down_evaluation(vars={}, lits={}, "
def top_down_evaluation(self, variables, literals,
binding=None, find_all=True):
""" Compute all bindings of VARIABLES that make LITERALS
true according to the theory (after applying the unifier BINDING).
If FIND_ALL is False, stops after finding one such binding.
Returns a list of dictionary bindings. """
# logging.debug("CALL: top_down_evaluation(vars={}, literals={}, "
# "binding={})".format(
# "[" + ",".join([str(x) for x in variables]) + "]",
# "[" + ",".join([str(x) for x in literals]) + "]",
# iterstr(variables), iterstr(literals),
# str(binding)))
results = self.top_down_abduction(variables, literals,
binding=binding, find_all=find_all, save=None)
logging.debug("EXIT: top_down_evaluation(vars={}, literals={}, "
"binding={}) returned {}".format(
iterstr(variables), iterstr(literals),
str(binding), iterstr(results)))
return [x.binding for x in results]
def top_down_abduction(self, variables, literals, binding=None,
find_all=True, save=None):
""" Compute all bindings of VARIABLES that make LITERALS
true according to the theory (after applying the
unifier BINDING), if we add some number of additional
literals. Note: will not save any literals that are
needed to prove a negated literal since the results
would not make sense. Returns a list of TopDownResults. """
if binding is None:
binding = self.new_bi_unifier()
caller = self.TopDownCaller(variables, binding)
caller = self.TopDownCaller(variables, binding, find_all=find_all,
save=save)
if len(literals) == 0:
self.top_down_finish(None, caller)
return caller.answers
# Note: must use same binding in CALLER and CONTEXT
context = self.TopDownContext(literals, 0, binding, None, 0)
self.top_down_eval(context, caller)
return caller.answers
else:
# Note: must use same unifier in CALLER and CONTEXT
context = self.TopDownContext(literals, 0, binding, None, 0)
self.top_down_eval(context, caller)
return list(set(caller.results))
def top_down_eval(self, context, caller):
""" Compute all instances of LITERALS (from LITERAL_INDEX and above)
that are true according to the theory (after applying the
unifier BINDING to LITERALS). Returns False or an answer. """
# no recursive rules, ever; this style of algorithm will never halt.
# no recursive rules, ever; this style of algorithm will not terminate
lit = context.literals[context.literal_index]
# self.log(lit.table, "top_down_eval({})".format(str(context)))
# logging.debug("CALL: top_down_eval({}, {})".format(str(context),
# str(caller)))
# abduction
if caller.save is not None and caller.save(lit, context.binding):
self.print_call(lit, context.binding, context.depth)
# save lit and binding--binding may not be fully flushed out
# when we save (or ever for that matter)
caller.support.append((lit, context.binding))
self.print_save(lit, context.binding, context.depth)
success = self.top_down_finish(context, caller)
caller.support.pop() # pop in either case
if success:
return True
else:
self.print_fail(lit, context.binding, context.depth)
return False
# regular processing
if lit.is_negated():
# logging.debug("{} is negated".format(str(lit)))
# recurse on the negation of the literal
@ -227,9 +307,11 @@ class TopDownTheory(object):
new_context = self.TopDownContext([lit.complement()],
0, context.binding, None, context.depth + 1)
new_caller = self.TopDownCaller(caller.variables, caller.binding,
find_all=False)
# make sure new_caller has find_all=False, so we stop as soon
find_all=False, save=None)
# Make sure new_caller has find_all=False, so we stop as soon
# as we can.
# Ensure save=None so that abduction does not save anything.
# Saving while performing NAF makes no sense.
if self.top_down_includes(new_context, new_caller):
self.print_fail(lit, context.binding, context.depth)
return False
@ -291,12 +373,17 @@ class TopDownTheory(object):
Returns True if the search is finished and False otherwise.
Temporary, transparent modification of CONTEXT."""
if context is None:
# Found an answer; now store it
if caller is not None:
# flatten bindings and store before we undo
# copy caller.support and store before we undo
binding = {}
for var in caller.variables:
binding[var] = caller.binding.apply(var)
caller.answers.append(binding)
result = self.TopDownResult(binding,
[support[0].plug(support[1], caller=caller)
for support in caller.support])
caller.results.append(result)
return True
else:
self.print_exit(context.literals[context.literal_index],
@ -322,6 +409,10 @@ class TopDownTheory(object):
self.log(literal.table, "{}Exit: {} with {}".format("| "*depth,
literal.plug(binding), str(binding)))
def print_save(self, literal, binding, depth):
self.log(literal.table, "{}Save: {} with {}".format("| "*depth,
literal.plug(binding), str(binding)))
def print_fail(self, literal, binding, depth):
self.log(literal.table, "{}Fail: {} with {}".format("| "*depth,
literal.plug(binding), str(binding)))
@ -338,8 +429,9 @@ class TopDownTheory(object):
@classmethod
def new_bi_unifier(cls, dictionary=None):
""" Return a unifier compatible with unify.bi_unify """
return unify.BiUnifier(lambda (index):
compile.Variable("x" + str(index)), dictionary=dictionary)
return unify.BiUnifier(dictionary=dictionary)
# lambda (index):
# compile.Variable("x" + str(index)), dictionary=dictionary)
def head_index(self, table):
""" This routine must return all the formulas pertinent for
@ -831,10 +923,13 @@ class MaterializedRuleTheory(TopDownTheory):
"Delete requires a formula"
return self.modify(formula, is_insert=False)
def explain(self, query):
def explain(self, query, tablenames, find_all):
assert isinstance(query, compile.Atom), \
"Explain requires an atom"
return self.explain_aux(query, 0)
# ignoring TABLENAMES and FIND_ALL
# except that we return the proper type.
proof = self.explain_aux(query, 0)
return [proof]
############### Interface implementation ###############
@ -843,6 +938,8 @@ class MaterializedRuleTheory(TopDownTheory):
def explain_aux(self, query, depth):
self.log(query.table, "Explaining {}".format(str(query)), depth)
# Bail out on negated literals. Need different
# algorithm b/c we need to introduce quantifiers.
if query.is_negated():
return Proof(query, [])
# grab first local proof, since they're all equally good
@ -1102,15 +1199,20 @@ class Runtime (object):
# else:
# return self.select_if_obj(query, temporary_data)
def explain(self, query, target=None):
""" Event handler for explanations. Given a ground query, return
a single proof that it belongs in the database. """
def explain(self, query, tablenames=None, find_all=False, target=None):
""" Event handler for explanations. Given a ground query and
a collection of tablenames that we want the explanation in
terms of, return proof(s) that the query is true. If
FIND_ALL is True, returns list; otherwise, returns single proof."""
if isinstance(query, basestring):
return self.explain_string(query, self.get_target(target))
return self.explain_string(
query, tablenames, find_all, self.get_target(target))
elif isinstance(query, tuple):
return self.explain_tuple(query, self.get_target(target))
return self.explain_tuple(
query, tablenames, find_all, self.get_target(target))
else:
return self.explain_obj(query, self.get_target(target))
return self.explain_obj(
query, tablenames, find_all, self.get_target(target))
def insert(self, formula, target=None):
""" Event handler for arbitrary insertion (rules and facts). """
@ -1131,6 +1233,8 @@ class Runtime (object):
return self.delete_obj(formula, self.get_target(target))
############### Internal interface ###############
## Translate different representations of formulas into
## the compiler's internal representation.
## Only arguments allowed to be strings are suffixed with _string
## All other arguments are instances of Theory, Atom, etc.
@ -1143,22 +1247,24 @@ class Runtime (object):
"Queries can have only 1 statement: {}".format(
[str(x) for x in policy])
results = self.select_obj(policy[0], theory)
return " ".join([str(x) for x in results])
return compile.formulas_to_string(results)
def select_tuple(self, tuple, theory):
return self.select_obj(compile.Atom.create_from_iter(tuple), theory)
def explain_obj(self, query, theory):
return theory.explain(query)
def explain_obj(self, query, tablenames, find_all, theory):
return theory.explain(query, tablenames, find_all)
def explain_string(self, query_string, theory):
def explain_string(self, query_string, tablenames, find_all, theory):
policy = compile.get_parsed([query_string, '--input_string'])
assert len(policy) == 1, "Queries can have only 1 statement"
results = self.explain_obj(policy[0], theory)
return str(results)
results = self.explain_obj(policy[0], tablenames, find_all, theory)
logging.debug("explain_obj results: " + iterstr(results))
return compile.formulas_to_string(results)
def explain_tuple(self, tuple, theory):
self.explain_obj(compile.Atom.create_from_iter(tuple), theory)
def explain_tuple(self, tuple, tablenames, find_all, theory):
self.explain_obj(compile.Atom.create_from_iter(tuple),
tablenames, find_all, theory)
def insert_obj(self, formula, theory):
return theory.insert(formula)
@ -1185,3 +1291,4 @@ class Runtime (object):
def delete_tuple(self, tuple, theory):
self.delete_obj(compile.Atom.create_from_iter(tuple), theory)

View File

@ -71,21 +71,32 @@ class TestRuntime(unittest.TestCase):
correct_database, msg)
self.close(msg)
def check_equal(self, actual_code, correct_code, msg=None):
def check_equal(self, actual_code, correct_code, msg=None, equal=None):
def minus(iter1, iter2):
extra = []
for i1 in iter1:
found = False
for i2 in iter2:
if equal(i1, i2):
found = True
break
if not found:
extra.append(i1)
return extra
if equal is None:
equal = lambda x,y: x == y
logging.debug("** Checking equality: {} **".format(msg))
actual = compile.get_compiled([actual_code, '--input_string'])
correct = compile.get_compiled([correct_code, '--input_string'])
extra = []
for formula in actual.theory:
if formula not in correct.theory:
extra.append(formula)
missing = []
for formula in correct.theory:
if formula not in actual.theory:
missing.append(formula)
actual = compile.get_parsed([actual_code, '--input_string'])
correct = compile.get_parsed([correct_code, '--input_string'])
extra = minus(actual, correct)
missing = minus(correct, actual)
self.output_diffs(extra, missing, msg)
logging.debug("** Finished: {} **".format(msg))
def check_same(self, actual_code, correct_code, msg=None):
return self.check_equal(actual_code, correct_code, msg=msg,
equal=lambda x,y: unify.same(x,y) is not None)
def check_proofs(self, run, correct, msg=None):
""" Check that the proofs stored in runtime RUN are exactly
those in CORRECT. """
@ -468,20 +479,6 @@ class TestRuntime(unittest.TestCase):
'q(2,3) q(3,4) q(2,4) q(4,5) q(3,5) q(2,5)',
'Delete from recursive rules')
# TODO(tim): add tests for explanations
def test_materialized_explanations(self):
""" Test the explanation event handler. """
run = self.prep_runtime("p(x) :- q(x), r(x)", "Explanations")
run.insert("q(1) r(1)")
self.showdb(run)
logging.debug(run.explain("p(1)"))
run = self.prep_runtime("p(x) :- q(x), r(x) q(x) :- s(x), t(x)", "Explanations")
run.insert("s(1) r(1) t(1)")
self.showdb(run)
logging.debug(run.explain("p(1)"))
# self.fail()
def open(self, msg):
logging.debug("** Checking: {} **".format(msg))
@ -572,6 +569,26 @@ class TestRuntime(unittest.TestCase):
self.fail()
self.close(msg)
def test_same(self):
""" Test whether the SAME computation is correct. """
def str2form(formula_string):
return compile.get_parsed([formula_string, '--input_string'])[0]
def assertIsNotNone(x):
self.assertTrue(x is not None)
def assertIsNone(x):
self.assertTrue(x is None)
assertIsNotNone(unify.same(str2form('p(x)'), str2form('p(y)')))
assertIsNotNone(unify.same(str2form('p(x)'), str2form('p(x)')))
assertIsNotNone(unify.same(str2form('p(x,y)'), str2form('p(x,y)')))
assertIsNotNone(unify.same(str2form('p(x,y)'), str2form('p(y,x)')))
assertIsNone(unify.same(str2form('p(x,x)'), str2form('p(x,y)')))
assertIsNone(unify.same(str2form('p(x,y)'), str2form('p(x,x)')))
assertIsNotNone(unify.same(str2form('p(x,x)'), str2form('p(y,y)')))
assertIsNotNone(unify.same(str2form('p(x,y,x)'), str2form('p(y,x,y)')))
assertIsNone(unify.same(str2form('p(x,y,z)'), str2form('p(x,y,1)')))
def test_bi_unify(self):
""" Test the bi-unification routine and its supporting routines. """
def var(x):
@ -741,6 +758,14 @@ class TestRuntime(unittest.TestCase):
"p(0,5) p(0,6)",
"Tower of existential variables")
run = self.prep_runtime('p(x) :- q(x), r(z)'
'r(z) :- s(z), q(x)'
's(1)'
'q(x) :- t(x)'
't(1)', target=th)
self.check_equal(run.select('p(x)', target=th), 'p(1)',
"Two layers of existential variables")
# Negation
run = self.prep_runtime('p(x) :- q(x), not r(x)'
'q(1)'
@ -768,6 +793,7 @@ class TestRuntime(unittest.TestCase):
self.check_equal(run.select('p(x)', target=th), "p(2)",
"False Binary negation with existentials")
run = self.prep_runtime('p(x) :- q(x,y), s(y,z)'
's(y,z) :- r(y,w), t(z), not u(w,z)'
'q(1,1)'
@ -784,6 +810,9 @@ class TestRuntime(unittest.TestCase):
self.check_equal(run.select('p(x)', target=th), "p(1)",
"False embedded negation with existentials")
def test_theory_inclusion(self):
""" Test evaluation routines when one theory includes another. """
actth = runtime.Runtime.ACTION_THEORY
@ -797,5 +826,103 @@ class TestRuntime(unittest.TestCase):
self.check_equal(run.select('p(x)', target=actth),
"p(1) p(2)", "Theory inclusion")
# TODO(tim): add tests for explanations
def test_materialized_explain(self):
""" Test the explanation event handler. """
run = self.prep_runtime("p(x) :- q(x), r(x)", "Explanations")
run.insert("q(1) r(1)")
self.showdb(run)
logging.debug(run.explain("p(1)"))
run = self.prep_runtime("p(x) :- q(x), r(x) q(x) :- s(x), t(x)", "Explanations")
run.insert("s(1) r(1) t(1)")
self.showdb(run)
logging.debug(run.explain("p(1)"))
# self.fail()
def test_nonrecursive_explain(self):
""" Test explanations for NonrecursiveRuleTheory. """
def check(query, code, tablenames, correct, msg, find_all=True):
actth = runtime.Runtime.ACTION_THEORY
run = self.prep_runtime()
run.insert(code, target=actth)
actual = run.explain(query, tablenames=tablenames,
find_all=find_all, target=actth)
self.check_same(actual, correct, msg)
code = ('p(x) :- q(x), r(x)'
'q(1)'
'q(2)')
check('p(x)', code, ['r'],
'p(1) :- r(1) p(2) :- r(2)', "Basic monadic")
code = ('p(x) :- q(x), r(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['q'],
'p(1) :- q(1) p(2) :- q(2)', "Late, monadic binding")
code = ('p(x) :- q(x)')
check('p(x)', code, ['q'],
'p(x) :- q(x)', "No binding")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s'],
'p(1) :- s(1) p(2) :- s(2)', "Intermediate table")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s', 't'],
'p(1) :- s(1) p(2) :- s(2) p(1) :- t(1) p(2) :- t(2)',
"Intermediate, disjunctive table")
code = ('p(x) :- q(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'r(1)'
'r(2)')
check('p(x)', code, ['s'],
'p(1) :- s(1) p(2) :- s(2)',
"Intermediate, disjunctive table, but only some saveable")
code = ('p(x) :- q(x), u(x), r(x)'
'q(x) :- s(x)'
'q(x) :- t(x)'
'u(1)'
'u(2)')
check('p(x)', code, ['s', 't', 'r'],
'p(1) :- s(1), r(1) p(2) :- s(2), r(2)'
'p(1) :- t(1), r(1) p(2) :- t(2), r(2)',
"Multiple support literals")
code = ('p(x) :- q(x,y), s(x), r(y, z)'
'r(2,3)'
'r(2,4)'
's(1)'
's(2)')
check('p(x)', code, ['q'],
'p(1) :- q(1,2) p(2) :- q(2,2)',
"Existential variables that become ground")
code = ('p(x) :- q(x,y), r(y, z)'
'r(2,3)'
'r(2,4)')
check('p(x)', code, ['q'],
'p(x) :- q(x,2) p(x) :- q(x,2)',
"Existential variables that do not become ground")
code = ('p(x) :- q(x), r(z)'
'r(z) :- s(z), q(x)'
's(1)')
check('p(x)', code, ['q'],
'p(x) :- q(x), q(x1)',
"Existential variables with name collision")
if __name__ == '__main__':
unittest.main()

View File

@ -53,11 +53,9 @@ class BiUnifier(object):
def __eq__(self, other):
return self.var == other.var and self.unifier == other.unifier
def __init__(self, new_variable_factory, dictionary=None):
def __init__(self, dictionary=None):
# each value is a Value
self.contents = {}
self.new_variable_counter = 0
self.new_variable_factory = new_variable_factory
if dictionary is not None:
for var, value in dictionary.iteritems():
self.add(var, value, None)
@ -79,25 +77,52 @@ class BiUnifier(object):
else:
return None
def apply(self, term):
return self.apply_full(term)[0]
def apply(self, term, caller=None):
return self.apply_full(term, caller=caller)[0]
def apply_full(self, term):
def apply_full(self, term, caller=None):
""" Recursively apply unifiers to TERM and return
(i) the final value and (ii) the final unifier. """
(i) the final value and (ii) the final unifier.
If the final value is a variable, instantiate
with a new variable if not in KEEP_VARS """
# logging.debug("apply_full({}, {})".format(str(term), str(self)))
val = self.value(term)
if val is None:
return (term, self)
# If result is a variable and this variable is not one of those
# in the top-most calling context, then create a new variable
# name based on this Binding.
# This process avoids improper variable capture.
# Outputting the same variable with the same binding twice will
# generate the same output, but outputting the same variable with
# different bindings will generate different outputs.
# Note that this variable name mangling
# is not done for the top-most variables,
# which makes output a bit easier to read.
# Unfortunately, the process is non-deterministic from one run
# to the next, which makes testing difficult.
if (caller is not None and term.is_variable() and
not (term in caller.variables and caller.binding is self)):
return (compile.Variable(term.name + str(id(self))), self)
else:
return (term, self)
elif val.unifier is None or not val.value.is_variable():
return (val.value, val.unifier)
else:
return val.unifier.apply_full(val.value)
def is_one_to_one(self):
image = set() # set of all things mapped TO
for x in self.contents:
val = self.apply(x)
if val in image:
return False
image.add(val)
return True
def __str__(self):
s = repr(self)
s += "={"
s += ",".join(["{}:{}".format(var, str(val))
s += ",".join(["{}:{}".format(str(var), str(val))
for var, val in self.contents.iteritems()])
s += "}"
return s
@ -113,6 +138,14 @@ class BiUnifier(object):
def __eq__(self, other):
return self.contents == other.contents
def binding_str(binding):
""" Handles string conversion of either dictionary or Unifier. """
if isinstance(binding, dict):
s = ",".join(["{}: {}".format(str(var), str(val))
for var, val in binding.iteritems()])
return '{' + s + '}'
else:
return str(binding)
def undo_all(changes):
""" Undo all the changes in CHANGES. """
@ -147,7 +180,7 @@ def bi_unify_atoms(atom1, unifier1, atom2, unifier2):
# assign variable (if necessary) or fail
if val1.is_variable() and val2.is_variable():
# logging.debug("1 and 2 are variables")
if val1 == val2 and binding1 is binding2:
if bi_var_equal(val1, binding1, val2, binding2):
continue
else:
changes.append(binding1.add(val1, val2, binding2))
@ -196,5 +229,90 @@ def match_tuple_atom(tuple, atom):
binding[arg.name] = tuple[i]
return binding
def bi_var_equal(var1, unifier1, var2, unifier2):
""" Returns True iff variable VAR1 in unifier UNIFIER1 is the same
variable as VAR2 in UNIFIER2. """
return (var1 == var2 and unifier1 is unifier2)
def same(formula1, formula2):
""" Determine if FORMULA1 and FORMULA2 are the same up to a variable
renaming. Treats FORMULA1 and FORMULA2 as having different
variable namespaces. Returns None or the pair of unifiers. """
logging.debug("same({}, {})".format(str(formula1), str(formula2)))
if isinstance(formula1, compile.Atom):
if isinstance(formula2, compile.Rule):
return None
else:
u1 = BiUnifier()
u2 = BiUnifier()
if same_atoms(formula1, u1, formula2, u2, set()) is not None:
return (u1, u2)
return None
elif isinstance(formula1, compile.Rule):
if isinstance(formula2, compile.Atom):
return None
else:
if len(formula1.body) != len(formula2.body):
return None
u1 = BiUnifier()
u2 = BiUnifier()
bound2 = set()
result = same_atoms(formula1.head, u1, formula2.head, u2, bound2)
if result is None:
return None
for i in xrange(0, len(formula1.body)):
result = same_atoms(
formula1.body[i], u1, formula2.body[i], u2, bound2)
if result is None:
return None
return (u1, u2)
else:
return None
def same_atoms(atom1, unifier1, atom2, unifier2, bound2):
""" Modifies UNIFIER1 and UNIFIER2 to demonstrate
that ATOM1 and ATOM2 are identical up to a variable renaming.
Returns None if not possible or the list of changes if it is.
BOUND2 is the set of variables already bound in UNIFIER2 """
def die():
undo_all(changes)
return None
logging.debug("same_atoms({}, {})".format(str(atom1), str(atom2)))
if atom1.table != atom2.table:
return None
if len(atom1.arguments) != len(atom2.arguments):
return None
changes = []
# logging.debug("same_atoms entering loop")
for i in xrange(0, len(atom1.arguments)):
val1, binding1 = unifier1.apply_full(atom1.arguments[i])
val2, binding2 = unifier2.apply_full(atom2.arguments[i])
# logging.debug("val1: {} at {}; val2: {} at {}".format(
# str(val1), str(binding1), str(val2), str(binding2)))
if val1.is_variable() and val2.is_variable():
if bi_var_equal(val1, binding1, val2, binding2):
continue
# if we already bound either of these variables, not SAME
if not bi_var_equal(val1, binding1, atom1.arguments[i], unifier1):
# logging.debug("same_atoms: arg1 already bound")
return die()
if not bi_var_equal(val2, binding2, atom2.arguments[i], unifier2):
# logging.debug("same_atoms: arg2 already bound")
return die()
if val2 in bound2:
# logging.debug("same_atoms: binding is not 1-1")
return die()
changes.append(binding1.add(val1, val2, binding2))
bound2.add(val2)
elif val1.is_variable():
# logging.debug("val1 is a variable")
return die()
elif val2.is_variable():
# logging.debug("val2 is a variable")
return die()
elif val1 != val2:
# one is a variable and one is not or unmatching object constants
# logging.debug("val1 != val2")
return die()
return changes